use std::{
cell::{Cell, RefCell},
convert::TryInto,
mem,
rc::Rc,
sync::{mpsc, Arc, Mutex},
thread::{self, JoinHandle},
time::Duration,
};
use ::pipewire as pw;
use pw::{
properties::properties,
spa::{
self,
param::{
audio::{AudioFormat, AudioInfoRaw},
format::{MediaSubtype, MediaType},
format_utils, ParamType,
},
pod::{serialize::PodSerializer, Object, Pod, Value},
utils::{Direction, SpaTypes},
},
stream::{StreamFlags, StreamState},
types::ObjectType,
};
use tauri::ipc::Channel;
use super::{
devices::AudioDevice,
stream::{StreamErrorEvent, StreamEvent, WaveformState},
};
const PIPEWIRE_OUTPUT_PREFIX: &str = "pipewire-output:";
const PIPEWIRE_FORMAT_TIMEOUT: Duration = Duration::from_secs(3);
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(super) struct PipewireStreamFormat {
pub sample_rate: u32,
pub channels: u16,
}
pub(super) struct PipewireOutputStream {
stop: pw::channel::Sender<PipewireControl>,
worker: Option<JoinHandle<()>>,
format: PipewireStreamFormat,
}
impl PipewireOutputStream {
pub(super) fn start(device_id: &str, on_event: Channel<StreamEvent>) -> Result<Self, String> {
let sink_node_id = pipewire_output_node_id(device_id)?;
let (stop, receiver) = pw::channel::channel();
let (ready_sender, ready_receiver) = mpsc::channel();
let worker = thread::spawn(move || {
if let Err(error) =
run_output_capture(sink_node_id, on_event, receiver, ready_sender.clone())
{
let _ = ready_sender.send(Err(error));
}
});
let format = match ready_receiver.recv_timeout(PIPEWIRE_FORMAT_TIMEOUT) {
Ok(Ok(format)) => format,
Ok(Err(error)) => {
stop_output_worker(&stop, Some(worker));
return Err(error);
}
Err(mpsc::RecvTimeoutError::Timeout) => {
stop_output_worker(&stop, Some(worker));
return Err("Timed out waiting for PipeWire output capture format".to_string());
}
Err(mpsc::RecvTimeoutError::Disconnected) => {
stop_output_worker(&stop, Some(worker));
return Err(
"PipeWire output capture stopped before negotiating a format".to_string(),
);
}
};
Ok(Self {
stop,
worker: Some(worker),
format,
})
}
pub(super) fn format(&self) -> PipewireStreamFormat {
self.format
}
}
impl Drop for PipewireOutputStream {
fn drop(&mut self) {
stop_output_worker(&self.stop, self.worker.take());
}
}
pub(super) fn output_devices() -> Vec<AudioDevice> {
collect_pipewire_snapshot()
.map(|snapshot| pipewire_devices_from_snapshot(&snapshot))
.unwrap_or_default()
}
fn stop_output_worker(stop: &pw::channel::Sender<PipewireControl>, worker: Option<JoinHandle<()>>) {
let _ = stop.send(PipewireControl::Stop);
if let Some(worker) = worker {
let _ = worker.join();
}
}
enum PipewireControl {
Stop,
}
type ReadySender = Arc<Mutex<Option<mpsc::Sender<Result<PipewireStreamFormat, String>>>>>;
struct PipewireCaptureData {
format: AudioInfoRaw,
negotiated_format: Option<PipewireStreamFormat>,
waveform: WaveformState,
ready_sender: ReadySender,
}
fn run_output_capture(
sink_node_id: u32,
on_event: Channel<StreamEvent>,
receiver: pw::channel::Receiver<PipewireControl>,
ready_sender: mpsc::Sender<Result<PipewireStreamFormat, String>>,
) -> Result<(), String> {
let mainloop = pw::main_loop::MainLoopRc::new(None)
.map_err(|error| format!("Failed to create PipeWire main loop: {error}"))?;
let _receiver = receiver.attach(mainloop.loop_(), {
let mainloop = mainloop.clone();
move |message| match message {
PipewireControl::Stop => mainloop.quit(),
}
});
let context = pw::context::ContextRc::new(&mainloop, None)
.map_err(|error| format!("Failed to create PipeWire context: {error}"))?;
let core = context
.connect_rc(None)
.map_err(|error| format!("Failed to connect to PipeWire: {error}"))?;
let stream = pw::stream::StreamRc::new(
core,
"try-cpal-output-capture",
properties! {
*pw::keys::MEDIA_TYPE => "Audio",
*pw::keys::MEDIA_CATEGORY => "Capture",
*pw::keys::MEDIA_ROLE => "Music",
*pw::keys::STREAM_CAPTURE_SINK => "true",
},
)
.map_err(|error| format!("Failed to create PipeWire output capture stream: {error}"))?;
let ready_sender = Arc::new(Mutex::new(Some(ready_sender)));
let data = PipewireCaptureData {
format: AudioInfoRaw::new(),
negotiated_format: None,
waveform: WaveformState::new(on_event.clone()),
ready_sender,
};
let _listener = stream
.add_local_listener_with_user_data(data)
.state_changed(|_, user_data, _, new| {
if let StreamState::Error(error) = new {
send_ready(
&user_data.ready_sender,
Err(format!("PipeWire output capture stream error: {error}")),
);
let _ = user_data
.waveform
.on_event
.send(StreamEvent::Error(StreamErrorEvent { message: error }));
}
})
.param_changed(|_, user_data, id, param| {
let Some(param) = param else {
return;
};
if id != ParamType::Format.as_raw() {
return;
}
match parse_audio_format(param) {
Ok((format, negotiated_format)) => {
user_data.format = format;
user_data.negotiated_format = Some(negotiated_format);
send_ready(&user_data.ready_sender, Ok(negotiated_format));
}
Err(error) => send_ready(&user_data.ready_sender, Err(error)),
}
})
.process(|stream, user_data| {
let Some(format) = user_data.negotiated_format else {
return;
};
let Some(mut buffer) = stream.dequeue_buffer() else {
return;
};
let Some(data) = buffer.datas_mut().first_mut() else {
return;
};
let samples = pipewire_f32_samples(data);
user_data
.waveform
.process(&samples, usize::from(format.channels));
})
.register();
let format_param = f32_format_param()?;
let mut params = [Pod::from_bytes(&format_param)
.ok_or_else(|| "Failed to build PipeWire F32 format param".to_string())?];
stream
.connect(
Direction::Input,
Some(sink_node_id),
StreamFlags::AUTOCONNECT | StreamFlags::MAP_BUFFERS,
&mut params,
)
.map_err(|error| {
format!(
"Failed to connect PipeWire output capture stream to node {sink_node_id}: {error}"
)
})?;
mainloop.run();
let _ = stream.disconnect();
Ok(())
}
fn send_ready(ready_sender: &ReadySender, result: Result<PipewireStreamFormat, String>) {
let Ok(mut ready_sender) = ready_sender.lock() else {
return;
};
let Some(ready_sender) = ready_sender.take() else {
return;
};
let _ = ready_sender.send(result);
}
fn pipewire_output_node_id(device_id: &str) -> Result<u32, String> {
let node_id = device_id
.strip_prefix(PIPEWIRE_OUTPUT_PREFIX)
.filter(|target| !target.is_empty())
.ok_or_else(|| {
"Select a PipeWire output monitor device from the Out source list".to_string()
})?;
node_id
.parse()
.map_err(|_| format!("Invalid PipeWire output node id: {node_id}"))
}
fn parse_audio_format(param: &Pod) -> Result<(AudioInfoRaw, PipewireStreamFormat), String> {
let (media_type, media_subtype) = format_utils::parse_format(param)
.map_err(|error| format!("Failed to parse PipeWire stream format: {error:?}"))?;
if media_type != MediaType::Audio || media_subtype != MediaSubtype::Raw {
return Err("PipeWire output capture negotiated a non-raw-audio format".to_string());
}
let mut format = AudioInfoRaw::new();
format
.parse(param)
.map_err(|error| format!("Failed to parse PipeWire raw audio format: {error:?}"))?;
let sample_rate = format.rate();
let channels = u16::try_from(format.channels())
.map_err(|_| format!("PipeWire channel count is too large: {}", format.channels()))?;
if sample_rate == 0 || channels == 0 {
return Err(format!(
"PipeWire output capture negotiated invalid format: {sample_rate} Hz, {channels} channels"
));
}
Ok((
format,
PipewireStreamFormat {
sample_rate,
channels,
},
))
}
fn pipewire_f32_samples(data: &mut spa::buffer::Data) -> Vec<f32> {
let chunk = data.chunk();
let offset = chunk.offset() as usize;
let size = chunk.size() as usize;
let Some(bytes) = data.data() else {
return Vec::new();
};
if offset >= bytes.len() {
return Vec::new();
}
let end = offset.saturating_add(size).min(bytes.len());
let complete_len = (end - offset) - ((end - offset) % mem::size_of::<f32>());
bytes[offset..offset + complete_len]
.chunks_exact(mem::size_of::<f32>())
.map(|chunk| f32::from_le_bytes(chunk.try_into().expect("chunk size is checked")))
.collect()
}
fn f32_format_param() -> Result<Vec<u8>, String> {
let mut audio_info = AudioInfoRaw::new();
audio_info.set_format(AudioFormat::F32LE);
PodSerializer::serialize(
std::io::Cursor::new(Vec::new()),
&Value::Object(Object {
type_: SpaTypes::ObjectParamFormat.as_raw(),
id: ParamType::EnumFormat.as_raw(),
properties: audio_info.into(),
}),
)
.map(|serialized| serialized.0.into_inner())
.map_err(|error| format!("Failed to serialize PipeWire F32 format param: {error:?}"))
}
#[derive(Clone, Debug, PartialEq, Eq)]
struct PipewireNode {
id: u32,
media_class: Option<String>,
description: Option<String>,
name: Option<String>,
}
#[derive(Default)]
struct PipewireRegistrySnapshot {
nodes: Vec<PipewireNode>,
default_sink: DefaultSinkMetadata,
}
#[derive(Clone, Debug, Default, PartialEq, Eq)]
struct DefaultSinkMetadata {
active_name: Option<String>,
configured_name: Option<String>,
}
impl DefaultSinkMetadata {
fn apply_property(&mut self, key: Option<&str>, value: Option<&str>) {
match key {
None => *self = Self::default(),
Some("default.audio.sink") => self.active_name = value.and_then(default_sink_name),
Some("default.configured.audio.sink") => {
self.configured_name = value.and_then(default_sink_name)
}
_ => {}
}
}
fn selected_name(&self) -> Option<&str> {
self.active_name
.as_deref()
.or(self.configured_name.as_deref())
}
}
struct BoundMetadata {
#[allow(dead_code)]
listener: pw::metadata::MetadataListener,
#[allow(dead_code)]
metadata: pw::metadata::Metadata,
}
fn collect_pipewire_snapshot() -> Result<PipewireRegistrySnapshot, String> {
let mainloop = pw::main_loop::MainLoopRc::new(None)
.map_err(|error| format!("Failed to create PipeWire main loop: {error}"))?;
let context = pw::context::ContextRc::new(&mainloop, None)
.map_err(|error| format!("Failed to create PipeWire context: {error}"))?;
let core = context
.connect_rc(None)
.map_err(|error| format!("Failed to connect to PipeWire: {error}"))?;
let registry = core
.get_registry_rc()
.map_err(|error| format!("Failed to read PipeWire registry: {error}"))?;
let nodes = Rc::new(RefCell::new(Vec::new()));
let default_sink = Rc::new(RefCell::new(DefaultSinkMetadata::default()));
let metadata_globals = Rc::new(RefCell::new(Vec::<
pw::registry::GlobalObject<pw::properties::PropertiesBox>,
>::new()));
let nodes_for_listener = Rc::clone(&nodes);
let metadata_globals_for_listener = Rc::clone(&metadata_globals);
let _registry_listener = registry
.add_listener_local()
.global(move |global| match &global.type_ {
ObjectType::Node => {
if let Some(node) = node_from_global(global) {
nodes_for_listener.borrow_mut().push(node);
}
}
ObjectType::Metadata => {
if is_default_metadata(global) {
metadata_globals_for_listener
.borrow_mut()
.push(global.to_owned());
}
}
_ => {}
})
.register();
sync_pipewire_core(&core, &mainloop)?;
let mut bound_metadata = Vec::new();
for global in metadata_globals.borrow().iter() {
let Ok(metadata) = registry.bind::<pw::metadata::Metadata, _>(global) else {
continue;
};
let default_sink = Rc::clone(&default_sink);
let listener = metadata
.add_listener_local()
.property(move |_, key, _, value| {
default_sink.borrow_mut().apply_property(key, value);
0
})
.register();
bound_metadata.push(BoundMetadata { listener, metadata });
}
sync_pipewire_core(&core, &mainloop)?;
let snapshot_nodes = nodes.borrow().clone();
let snapshot_default_sink = default_sink.borrow().clone();
Ok(PipewireRegistrySnapshot {
nodes: snapshot_nodes,
default_sink: snapshot_default_sink,
})
}
fn sync_pipewire_core(
core: &pw::core::CoreRc,
mainloop: &pw::main_loop::MainLoopRc,
) -> Result<(), String> {
let done = Rc::new(Cell::new(false));
let pending = core
.sync(0)
.map_err(|error| format!("Failed to sync PipeWire registry: {error}"))?;
let _core_listener = core
.add_listener_local()
.done({
let done = Rc::clone(&done);
let mainloop = mainloop.clone();
move |id, seq| {
if id == pw::core::PW_ID_CORE && seq == pending {
done.set(true);
mainloop.quit();
}
}
})
.register();
while !done.get() {
mainloop.run();
}
Ok(())
}
fn node_from_global<P: AsRef<spa::utils::dict::DictRef>>(
global: &pw::registry::GlobalObject<P>,
) -> Option<PipewireNode> {
let props = global.props.as_ref()?.as_ref();
Some(PipewireNode {
id: global.id,
media_class: props.get("media.class").map(ToOwned::to_owned),
description: props.get("node.description").map(ToOwned::to_owned),
name: props.get("node.name").map(ToOwned::to_owned),
})
}
fn is_default_metadata<P: AsRef<spa::utils::dict::DictRef>>(
global: &pw::registry::GlobalObject<P>,
) -> bool {
global
.props
.as_ref()
.and_then(|props| props.as_ref().get("metadata.name"))
== Some("default")
}
fn pipewire_devices_from_snapshot(snapshot: &PipewireRegistrySnapshot) -> Vec<AudioDevice> {
let default_sink_name = snapshot.default_sink.selected_name();
let mut sinks = snapshot
.nodes
.iter()
.filter(|node| node.media_class.as_deref() == Some("Audio/Sink"))
.collect::<Vec<_>>();
sinks.sort_by_key(|node| node.id);
sinks
.into_iter()
.map(|node| pipewire_device_from_sink(node, default_sink_name))
.collect()
}
fn pipewire_device_from_sink(node: &PipewireNode, default_sink_name: Option<&str>) -> AudioDevice {
let id = node.id.to_string();
let name = node
.description
.as_ref()
.or(node.name.as_ref())
.map(|label| format!("Output: {label}"))
.unwrap_or_else(|| format!("Output: PipeWire sink {id}"));
AudioDevice {
id: Some(format!("{PIPEWIRE_OUTPUT_PREFIX}{id}")),
name,
supports_input: false,
supports_output: true,
can_capture_input: false,
can_capture_output: true,
capture_input_unavailable_reason: Some("Output monitor device".to_string()),
capture_output_unavailable_reason: None,
is_default_input: false,
is_default_output: node.name.as_deref() == default_sink_name,
}
}
fn default_sink_name(value: &str) -> Option<String> {
serde_json::from_str::<serde_json::Value>(value)
.ok()?
.get("name")?
.as_str()
.filter(|name| !name.is_empty())
.map(ToOwned::to_owned)
}
#[cfg(test)]
mod tests {
use super::{
default_sink_name, pipewire_devices_from_snapshot, DefaultSinkMetadata, PipewireNode,
PipewireRegistrySnapshot,
};
#[test]
fn includes_only_audio_sink_nodes() {
let snapshot = PipewireRegistrySnapshot {
nodes: vec![
PipewireNode {
id: 43,
media_class: Some("Audio/Source".to_string()),
description: Some("Brio 300 Mono".to_string()),
name: Some("alsa_input.usb-046d_Brio_300".to_string()),
},
PipewireNode {
id: 54,
media_class: Some("Audio/Sink".to_string()),
description: Some("Built-in Audio Digital Stereo (IEC958)".to_string()),
name: Some("alsa_output.pci-0000_00_1f.3.iec958-stereo".to_string()),
},
],
default_sink: DefaultSinkMetadata::default(),
};
let devices = pipewire_devices_from_snapshot(&snapshot);
assert_eq!(devices.len(), 1);
assert_eq!(devices[0].id.as_deref(), Some("pipewire-output:54"));
assert_eq!(
devices[0].name,
"Output: Built-in Audio Digital Stereo (IEC958)"
);
assert!(devices[0].can_capture_output);
}
#[test]
fn parses_default_sink_metadata_json() {
assert_eq!(
default_sink_name(r#"{ "name": "alsa_output.pci-0000_00_1f.3.iec958-stereo" }"#)
.as_deref(),
Some("alsa_output.pci-0000_00_1f.3.iec958-stereo")
);
assert_eq!(default_sink_name(r#"{ "name": "" }"#), None);
assert_eq!(default_sink_name("not-json"), None);
}
#[test]
fn prefers_active_default_sink_over_configured_fallback() {
let mut metadata = DefaultSinkMetadata::default();
metadata.apply_property(
Some("default.configured.audio.sink"),
Some(r#"{ "name": "configured.sink" }"#),
);
assert_eq!(metadata.selected_name(), Some("configured.sink"));
metadata.apply_property(
Some("default.audio.sink"),
Some(r#"{ "name": "active.sink" }"#),
);
assert_eq!(metadata.selected_name(), Some("active.sink"));
}
#[test]
fn marks_matching_sink_name_as_default_and_ignores_sources() {
let snapshot = PipewireRegistrySnapshot {
nodes: vec![
PipewireNode {
id: 43,
media_class: Some("Audio/Source".to_string()),
description: Some("Brio 300 Mono".to_string()),
name: Some("alsa_input.usb-046d_Brio_300".to_string()),
},
PipewireNode {
id: 54,
media_class: Some("Audio/Sink".to_string()),
description: Some("Built-in Audio Digital Stereo (IEC958)".to_string()),
name: Some("alsa_output.pci-0000_00_1f.3.iec958-stereo".to_string()),
},
PipewireNode {
id: 80,
media_class: Some("Audio/Sink".to_string()),
description: Some("HDMI Audio".to_string()),
name: Some("alsa_output.pci-0000_01_00.1.hdmi-stereo".to_string()),
},
],
default_sink: DefaultSinkMetadata {
active_name: Some("alsa_output.pci-0000_00_1f.3.iec958-stereo".to_string()),
configured_name: None,
},
};
let devices = pipewire_devices_from_snapshot(&snapshot);
assert_eq!(devices.len(), 2);
let default_count = devices
.iter()
.filter(|device| device.is_default_output)
.count();
assert_eq!(default_count, 1);
assert_eq!(
devices
.iter()
.find(|device| device.is_default_output)
.and_then(|device| device.id.as_deref()),
Some("pipewire-output:54")
);
}
}