1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
// Copyright (C) 2026 SiputBiru <hillsforrest03@gmail.com>
// SPDX-License-Identifier: GPL-2.0-only
use std::cell::Cell;
use std::mem;
use std::os::raw::c_void;
use std::rc::Rc;
use std::sync::Arc;
use std::sync::mpsc;
use std::time::Duration;
use pipewire::channel::Receiver;
use pipewire::context::ContextRc;
use pipewire::main_loop::MainLoopRc;
use crate::pipeline::Pipeline;
use crate::state::{DeviceClass, NodeInfo, PwCommand, PwEvent};
use super::filter::{FilterHandle, create_eq_filter};
use super::links::{
check_null_sink_input_source, create_device_output_links, remove_device_output_links,
};
use super::null_sink::{NullSinkHandle, NullSinkListenerData, bound_cb, create_null_sink};
/// Runs the PipeWire main loop for the equalizer on the calling thread.
///
/// Setup happens in this order:
/// 1. main loop, context, core connection and registry;
/// 2. a registry listener that tracks `Audio/Sink` / `Audio/Source` globals;
/// 3. a timer that sends the tracked node list (and, once the null sink id is
///    known, whether something is linked to its input);
/// 4. the virtual null-audio-sink, whose proxy `bound` callback creates the
///    equalizer filter (or, if the sink cannot be created, a filter without
///    a null-sink target);
/// 5. the command receiver for [`PwCommand`]s.
///
/// The function then sends [`PwEvent::Connected`] and blocks in
/// `mainloop.run()` until a [`PwCommand::Terminate`] quits the loop.
///
/// # Parameters
/// * `tx` - channel used to report events and errors to the rest of the app.
/// * `rx` - PipeWire channel receiver delivering commands to this loop.
/// * `pipeline` - shared pipeline handed to the equalizer filter.
///
/// # Errors
/// Nothing is returned. Failures while creating the main loop, context, core
/// or registry are reported as `PwEvent::Error` and the function returns
/// early. If neither the null sink nor the fallback filter can be created the
/// function also returns early (after sending `PwEvent::NullSinkError`).
pub fn run(tx: mpsc::Sender<PwEvent>, rx: Receiver<PwCommand>, pipeline: Arc<Pipeline>) {
    let mainloop = match MainLoopRc::new(None) {
        Ok(ml) => ml,
        Err(e) => {
            let _ = tx.send(PwEvent::Error(format!("mainloop: {e}")));
            return;
        }
    };
    let context = match ContextRc::new(&mainloop, None) {
        Ok(ctx) => ctx,
        Err(e) => {
            let _ = tx.send(PwEvent::Error(format!("context: {e}")));
            return;
        }
    };
    let core = match context.connect_rc(None) {
        Ok(c) => c,
        Err(e) => {
            let _ = tx.send(PwEvent::Error(format!("connect: {e}")));
            return;
        }
    };
    let registry = match core.get_registry_rc() {
        Ok(r) => r,
        Err(e) => {
            let _ = tx.send(PwEvent::Error(format!("registry: {e}")));
            return;
        }
    };

    // Audio nodes currently announced by the registry. Shared between the
    // add/remove listeners and the snapshot timer (all on this thread).
    let nodes: Rc<std::cell::RefCell<Vec<NodeInfo>>> =
        Rc::new(std::cell::RefCell::new(Vec::new()));
    let nodes_reg_add = nodes.clone();
    let nodes_reg_rem = nodes.clone();
    let _reg_listener = registry
        .add_listener_local()
        .global(move |global| {
            let Some(props) = &global.props else {
                return;
            };
            let class = props.get(&pipewire::keys::MEDIA_CLASS).unwrap_or("");
            if class != "Audio/Sink" && class != "Audio/Source" {
                return;
            }
            let name = props
                .get(&pipewire::keys::NODE_NAME)
                .unwrap_or("?")
                .to_string();
            let description = props
                .get(&pipewire::keys::NODE_DESCRIPTION)
                .unwrap_or("")
                .to_string();
            // Case-insensitive heuristic: a sink whose name or description
            // mentions "headphone"/"headset" is classified as a headphone.
            let mentions_headphones = |text: &str| {
                let lower = text.to_lowercase();
                lower.contains("headphone") || lower.contains("headset")
            };
            let device_class = if class == "Audio/Source" {
                DeviceClass::Input
            } else if mentions_headphones(&name) || mentions_headphones(&description) {
                DeviceClass::Headphone
            } else {
                DeviceClass::Speaker
            };
            nodes_reg_add.borrow_mut().push(NodeInfo {
                id: global.id,
                name,
                description,
                class: device_class,
            });
        })
        .global_remove(move |id| {
            nodes_reg_rem.borrow_mut().retain(|n| n.id != id);
        })
        .register();

    let tx_snapshot = tx.clone();
    let nodes_timer = nodes.clone();
    // Declare cells BEFORE the timer so they can be captured.
    // Use Rc so the timer closure and the null-sink listener can share.
    let null_sink_id_cell: Rc<Cell<Option<u32>>> = Rc::new(Cell::new(None));
    let ns_timer = null_sink_id_cell.clone();
    let timer = mainloop.loop_().add_timer(move |_| {
        let list: Vec<NodeInfo> = nodes_timer.borrow().clone();
        let _ = tx_snapshot.send(PwEvent::NodeList(list));
        // Poll whether an audio source is linked to the null sink's
        // playback ports. `pw-link -I` lists all links as
        // {out_id}:{out_port} -> {in_id}:{in_port}
        // Checks if any link targets the null sink's input.
        if let Some(ns_id) = ns_timer.get() {
            let has_source = check_null_sink_input_source(ns_id);
            let _ = tx_snapshot.send(PwEvent::NullSinkInputState { has_source });
        }
    });
    // NOTE(review): the interval argument is `None`, which appears to arm a
    // one-shot timer (fires once after 500 ms) rather than a periodic poll.
    // Confirm against the pipewire-rs `update_timer` documentation.
    timer.update_timer(Some(Duration::from_millis(500)), None);

    let core_raw = core.as_raw_ptr().cast::<pipewire_sys::pw_core>();
    // The filter cell is written through a raw pointer by the null-sink
    // `bound` callback and read by the command closure below. It lives in an
    // Rc so both refer to the same, address-stable heap location; a plain
    // stack `Cell` would be moved into the `move` closure and leave the raw
    // pointer referring to a moved-from slot.
    let filter_cell: Rc<Cell<Option<FilterHandle>>> = Rc::new(Cell::new(None));
    let nullsink_cell: Cell<Option<NullSinkHandle>> = Cell::new(None);

    // Create the virtual null-audio-sink BEFORE the equalizer filter.
    // Attaching a proxy-listener that fires when the proxy is bound to a
    // server-side global id; that callback then creates the filter wired
    // to the null sink's monitor ports. This ordering ensures wiremix can
    // discover eqtui as a selectable Audio/Sink.
    let nullsink_handle = create_null_sink(core_raw, &tx);
    if let Some(mut handle) = nullsink_handle {
        // Heap-allocate listener data. This Box is leaked into a raw
        // pointer and freed during shutdown (NullSinkHandle::destroy).
        let listener_data = Box::new(NullSinkListenerData {
            tx: tx.clone(),
            core_raw,
            pipeline: pipeline.clone(),
            // Safety: both pointers refer to Rc heap allocations. run()
            // keeps an Rc to each until it returns, which is after the
            // mainloop has stopped (it only quits on Terminate).
            filter_cell_ptr: Rc::as_ptr(&filter_cell).cast_mut(),
            null_sink_id_cell_ptr: Rc::as_ptr(&null_sink_id_cell).cast_mut(),
            filter_created: Cell::new(false),
        });
        let data_ptr = Box::into_raw(listener_data);
        // Allocate spa_hook for the proxy listener.
        let listener_box = Box::new(unsafe { mem::zeroed::<libspa_sys::spa_hook>() });
        let listener_ptr = Box::into_raw(listener_box);
        // Set up pw_proxy_events with the bound callback. When the
        // null-sink proxy is bound, bound_cb reads the global id and
        // creates the equalizer filter wired to it.
        let mut events_box = Box::new(unsafe { mem::zeroed::<pipewire_sys::pw_proxy_events>() });
        events_box.version = pipewire_sys::PW_VERSION_PROXY_EVENTS;
        events_box.bound = Some(bound_cb);
        let events_ptr = Box::into_raw(events_box);
        // Safety: proxy is non-null (create_null_sink guarantees this).
        // listener_ptr and events_ptr point to freshly allocated,
        // heap-stable memory that outlives the proxy (freed on destroy).
        // data_ptr holds cloned/ref-counted resources valid for the
        // mainloop lifetime.
        unsafe {
            pipewire_sys::pw_proxy_add_listener(
                handle.proxy,
                listener_ptr,
                events_ptr,
                data_ptr.cast::<c_void>(),
            );
        }
        // Stash the listener pointers in the handle for cleanup.
        handle.listener_ptr = listener_ptr;
        handle.events_ptr = events_ptr;
        handle.data_ptr = data_ptr;
        nullsink_cell.set(Some(handle));
    } else {
        let _ = tx.send(PwEvent::NullSinkError(
            "failed to create null-audio-sink node".into(),
        ));
        // Fallback: create filter without a null sink target so the
        // equalizer remains functional even without wiremix visibility.
        if let Some(handle) = create_eq_filter(core_raw, &pipeline, &tx, None) {
            filter_cell.set(Some(handle));
        } else {
            return;
        }
    }

    let mainloop_cmd = mainloop.clone();
    // The closure gets its own Rc to the shared filter cell, so Terminate
    // sees a filter that the bound callback stored after this point.
    let filter_cell_cmd = Rc::clone(&filter_cell);
    let _cmd_receiver = rx.attach(mainloop.loop_(), move |cmd| match cmd {
        PwCommand::Terminate => {
            // Teardown order: deactivate/destroy filter first, then destroy the
            // null-audio-sink. The filter consumer must be torn down before the
            // source node to avoid dangling PipeWire references.
            if let Some(handle) = filter_cell_cmd.take() {
                // Safety: running on the mainloop thread while the core is
                // still connected. The filter pointer and its allocations
                // are valid — FilterHandle::destroy deactivates, disconnects,
                // and frees all resources.
                unsafe {
                    handle.destroy();
                }
            }
            if let Some(handle) = nullsink_cell.take() {
                // Safety: running on the mainloop thread while the core
                // is still connected. pw_proxy_destroy frees the client-side
                // proxy and destroys the server-side node.
                unsafe {
                    handle.destroy();
                }
            }
            mainloop_cmd.quit();
        }
        PwCommand::ConnectDevice { filter_id, node_id } => {
            tracing::info!(
                filter_id,
                device_id = node_id,
                "Connecting device to filter"
            );
            create_device_output_links(filter_id, node_id);
        }
        PwCommand::DisconnectDevice { filter_id, node_id } => {
            tracing::info!(
                filter_id,
                device_id = node_id,
                "Disconnecting device from filter"
            );
            remove_device_output_links(filter_id, node_id);
        }
    });

    let _ = tx.send(PwEvent::Connected);
    mainloop.run();
}