use std::{
cell::RefCell,
fmt,
hash::{Hash, Hasher},
rc::Rc,
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
mpsc, Arc,
},
thread,
time::Duration,
};
use pipewire::{
self as pw,
context::ContextRc,
core::PW_ID_CORE,
main_loop::MainLoopRc,
metadata::{Metadata, MetadataListener},
node::{Node, NodeListener},
properties::PropertiesBox,
proxy::ProxyT,
spa::utils::result::AsyncSeq,
types::ObjectType,
};
use super::stream::Stream;
use crate::{
host::{
emit_error,
latch::Latch,
pipewire::{
stream::{
DefaultDeviceMonitor, PwInitGuard, StreamCommand, StreamData, SUPPORTED_FORMATS,
},
utils::{audio, clock, default, node, DEVICE_ICON_NAME, METADATA_NAME},
},
},
iter::{SupportedInputConfigs, SupportedOutputConfigs},
traits::DeviceTrait,
BufferSize, ChannelCount, Data, DeviceDescription, DeviceDescriptionBuilder, DeviceDirection,
DeviceId, DeviceType, Error, ErrorKind, FrameCount, HostId, InputCallbackInfo, InterfaceType,
OutputCallbackInfo, SampleFormat, SampleRate, StreamConfig, SupportedBufferSize,
SupportedStreamConfig, SupportedStreamConfigRange,
};
pub type Devices = std::vec::IntoIter<Device>;
const INIT_TIMEOUT: Duration = Duration::from_secs(2);
#[derive(Clone, Debug, Default, Copy)]
pub(crate) enum Class {
#[default]
Node,
DefaultSink,
DefaultInput,
DefaultOutput,
}
#[derive(Clone, Debug, Default, Copy)]
enum Role {
#[default]
Sink,
Source,
Duplex,
StreamOutput,
StreamInput,
}
#[derive(Clone, Debug, Default)]
pub struct Device {
node_name: String,
nick_name: String,
description: String,
direction: DeviceDirection,
channels: ChannelCount,
rate: SampleRate,
allow_rates: Arc<[SampleRate]>,
quantum: FrameCount,
min_quantum: FrameCount,
max_quantum: FrameCount,
class: Class,
role: Role,
icon_name: String,
object_serial: u32,
interface_type: InterfaceType,
address: Option<String>,
driver: Option<String>,
connect_automatically: Arc<AtomicBool>,
}
impl Device {
pub(crate) fn class(&self) -> Class {
self.class
}
fn sink_default() -> Self {
Self {
node_name: "sink_default".to_owned(),
nick_name: "sink_default".to_owned(),
description: "default_sink".to_owned(),
direction: DeviceDirection::Duplex,
channels: 2,
class: Class::DefaultSink,
role: Role::Sink,
..Default::default()
}
}
fn input_default() -> Self {
Self {
node_name: "input_default".to_owned(),
nick_name: "input_default".to_owned(),
description: "default_input".to_owned(),
direction: DeviceDirection::Input,
channels: 2,
class: Class::DefaultInput,
role: Role::Source,
..Default::default()
}
}
fn output_default() -> Self {
Self {
node_name: "output_default".to_owned(),
nick_name: "output_default".to_owned(),
description: "default_output".to_owned(),
direction: DeviceDirection::Output,
channels: 2,
class: Class::DefaultOutput,
role: Role::Sink,
..Default::default()
}
}
fn device_type(&self) -> DeviceType {
match self.icon_name.as_str() {
"audio-headphones" => DeviceType::Headphones,
"audio-headset" => DeviceType::Headset,
"audio-input-microphone" => DeviceType::Microphone,
"audio-speakers" => DeviceType::Speaker,
_ => DeviceType::Unknown,
}
}
pub(crate) fn default_metadata_key(&self) -> Option<&'static str> {
match self.class {
Class::DefaultOutput | Class::DefaultSink => Some(default::SINK),
Class::DefaultInput => Some(default::SOURCE),
Class::Node => None,
}
}
pub(crate) fn pw_properties(
&self,
direction: DeviceDirection,
config: &StreamConfig,
) -> PropertiesBox {
let mut properties = match direction {
DeviceDirection::Output => pw::properties::properties! {
*pw::keys::MEDIA_TYPE => "Audio",
*pw::keys::MEDIA_CATEGORY => "Playback",
},
DeviceDirection::Input => pw::properties::properties! {
*pw::keys::MEDIA_TYPE => "Audio",
*pw::keys::MEDIA_CATEGORY => "Capture",
},
_ => unreachable!(),
};
if matches!(self.role, Role::Sink) && matches!(direction, DeviceDirection::Input) {
properties.insert(*pw::keys::STREAM_CAPTURE_SINK, "true");
}
if matches!(self.class, Class::Node) {
properties.insert(*pw::keys::TARGET_OBJECT, self.object_serial.to_string());
}
properties.insert("node.group", format!("cpal-{}", std::process::id()));
if let BufferSize::Fixed(buffer_size) = config.buffer_size {
properties.insert(
*pw::keys::NODE_LATENCY,
format!("{buffer_size}/{rate}", rate = config.sample_rate),
);
}
properties
}
}
impl PartialEq for Device {
fn eq(&self, other: &Self) -> bool {
self.node_name == other.node_name
}
}
impl Eq for Device {}
impl fmt::Display for Device {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let desc = self.description().map_err(|_| fmt::Error)?;
f.write_str(desc.name())
}
}
impl Hash for Device {
fn hash<H: Hasher>(&self, state: &mut H) {
self.node_name.hash(state);
}
}
impl DeviceTrait for Device {
type Stream = Stream;
type SupportedInputConfigs = SupportedInputConfigs;
type SupportedOutputConfigs = SupportedOutputConfigs;
fn id(&self) -> Result<DeviceId, Error> {
Ok(DeviceId::new(HostId::PipeWire, &self.node_name))
}
fn description(&self) -> Result<DeviceDescription, Error> {
let mut builder = DeviceDescriptionBuilder::new(&self.nick_name)
.direction(self.direction)
.device_type(self.device_type())
.interface_type(self.interface_type);
if let Some(address) = self.address.as_ref() {
builder = builder.address(address);
}
if let Some(driver) = self.driver.as_ref() {
builder = builder.driver(driver);
}
if !self.description.is_empty() && self.description != self.nick_name {
builder = builder.add_extended_line(&self.description);
}
Ok(builder.build())
}
fn supports_input(&self) -> bool {
matches!(
self.direction,
DeviceDirection::Input | DeviceDirection::Duplex
)
}
fn supports_output(&self) -> bool {
matches!(
self.direction,
DeviceDirection::Output | DeviceDirection::Duplex
)
}
fn supported_input_configs(&self) -> Result<Self::SupportedInputConfigs, Error> {
if !self.supports_input() {
return Ok(vec![].into_iter());
}
let rates: &[SampleRate] = if self.allow_rates.is_empty() {
&[self.rate]
} else {
&self.allow_rates
};
Ok(rates
.iter()
.flat_map(|&rate| {
SUPPORTED_FORMATS
.iter()
.map(move |sample_format| SupportedStreamConfigRange {
channels: self.channels,
min_sample_rate: rate,
max_sample_rate: rate,
buffer_size: SupportedBufferSize::Range {
min: self.min_quantum,
max: self.max_quantum,
},
sample_format: *sample_format,
})
})
.collect::<Vec<_>>()
.into_iter())
}
fn supported_output_configs(&self) -> Result<Self::SupportedOutputConfigs, Error> {
if !self.supports_output() {
return Ok(vec![].into_iter());
}
let rates: &[SampleRate] = if self.allow_rates.is_empty() {
&[self.rate]
} else {
&self.allow_rates
};
Ok(rates
.iter()
.flat_map(|&rate| {
SUPPORTED_FORMATS
.iter()
.map(move |sample_format| SupportedStreamConfigRange {
channels: self.channels,
min_sample_rate: rate,
max_sample_rate: rate,
buffer_size: SupportedBufferSize::Range {
min: self.min_quantum,
max: self.max_quantum,
},
sample_format: *sample_format,
})
})
.collect::<Vec<_>>()
.into_iter())
}
fn default_input_config(&self) -> Result<SupportedStreamConfig, Error> {
if !self.supports_input() {
return Err(Error::with_message(
ErrorKind::UnsupportedOperation,
"Device does not support input",
));
}
Ok(SupportedStreamConfig {
channels: self.channels,
sample_format: SampleFormat::F32,
sample_rate: self.rate,
buffer_size: SupportedBufferSize::Range {
min: self.min_quantum,
max: self.max_quantum,
},
})
}
fn default_output_config(&self) -> Result<SupportedStreamConfig, Error> {
if !self.supports_output() {
return Err(Error::with_message(
ErrorKind::UnsupportedOperation,
"Device does not support output",
));
}
Ok(SupportedStreamConfig {
channels: self.channels,
sample_format: SampleFormat::F32,
sample_rate: self.rate,
buffer_size: SupportedBufferSize::Range {
min: self.min_quantum,
max: self.max_quantum,
},
})
}
fn build_input_stream_raw<D, E>(
&self,
config: StreamConfig,
sample_format: SampleFormat,
data_callback: D,
error_callback: E,
timeout: Option<std::time::Duration>,
) -> Result<Self::Stream, Error>
where
D: FnMut(&Data, &InputCallbackInfo) + Send + 'static,
E: FnMut(Error) + Send + 'static,
{
crate::validate_stream_config(&config)?;
if let BufferSize::Fixed(n) = config.buffer_size {
if self.max_quantum > 0 && !(self.min_quantum..=self.max_quantum).contains(&n) {
return Err(Error::with_message(
ErrorKind::UnsupportedConfig,
format!(
"Buffer size {n} is not in the supported quantum range {min}..={max}",
min = self.min_quantum,
max = self.max_quantum
),
));
}
}
let (pw_play_tx, pw_play_rx) = pw::channel::channel::<StreamCommand>();
let (init_tx, init_rx) = mpsc::channel::<Result<(), Error>>();
let mut latch = Latch::new();
let waiter = latch.waiter();
let device = self.clone();
let wait_timeout = timeout.unwrap_or(Duration::from_secs(2));
let initial_quantum = match config.buffer_size {
BufferSize::Fixed(n) => n as u64,
BufferSize::Default => self.quantum as u64,
};
let last_quantum = Arc::new(AtomicU64::new(initial_quantum));
let last_quantum_clone = last_quantum.clone();
let start = std::time::Instant::now();
let handle = thread::Builder::new()
.name("pw_in".to_owned())
.spawn(move || {
let _pw = PwInitGuard::new();
let properties = device.pw_properties(DeviceDirection::Input, &config);
let stream_data = match super::stream::connect_input(
super::stream::ConnectParams {
config,
properties,
sample_format,
last_quantum: last_quantum_clone,
start,
connect_automatically: device.connect_automatically.load(Ordering::Relaxed),
},
data_callback,
error_callback,
) {
Ok(d) => d,
Err(e) => {
let _ = init_tx.send(Err(Error::with_message(
ErrorKind::UnsupportedConfig,
format!("PipeWire stream connection failed: {e}"),
)));
return;
}
};
let StreamData {
mainloop,
listener,
stream,
context,
core,
core_monitor,
error_callback,
pending_device_changed,
invalidated,
is_default_device,
} = stream_data;
let default_monitor = if let Some(key) = device.default_metadata_key() {
match core.get_registry_rc() {
Ok(registry) => Some(DefaultDeviceMonitor::new(
registry,
key,
error_callback.clone(),
invalidated,
pending_device_changed,
)),
Err(e) => {
let _ = init_tx.send(Err(Error::with_message(
ErrorKind::BackendError,
format!("PipeWire: could not acquire registry: {e}"),
)));
return;
}
}
} else {
None
};
is_default_device.store(default_monitor.is_some(), Ordering::Relaxed);
let stream_clone = stream.clone();
let mainloop_rc1 = mainloop.clone();
let error_callback_cmd = error_callback.clone();
let _receiver = pw_play_rx.attach(mainloop.loop_(), move |play| match play {
StreamCommand::Toggle(state) => {
if let Err(e) = stream_clone.set_active(state) {
emit_error(
&error_callback_cmd,
Error::with_message(
ErrorKind::StreamInvalidated,
format!("PipeWire: set_active({state}) failed: {e}"),
),
);
}
}
StreamCommand::Stop => {
if let Err(e) = stream_clone.disconnect() {
emit_error(
&error_callback_cmd,
Error::with_message(
ErrorKind::StreamInvalidated,
format!("PipeWire: stream disconnect failed: {e}"),
),
);
}
mainloop_rc1.quit();
}
});
if init_tx.send(Ok(())).is_err() {
return;
}
if !waiter.wait() {
return;
}
mainloop.run();
drop(listener);
drop(default_monitor);
drop(core_monitor);
drop(core);
drop(context);
})
.map_err(|e| {
Error::with_message(
ErrorKind::ResourceExhausted,
format!("Failed to create thread: {e}"),
)
})?;
let init_result = init_rx.recv_timeout(wait_timeout).unwrap_or_else(|_| {
Err(Error::with_message(
ErrorKind::DeviceNotAvailable,
"PipeWire timed out",
))
});
if let Err(e) = init_result {
drop(latch);
return Err(e);
}
latch.add_thread(handle.thread().clone());
let stream = Stream::new(handle, pw_play_tx, last_quantum, start, latch);
stream.signal_ready();
Ok(stream)
}
fn build_output_stream_raw<D, E>(
&self,
config: StreamConfig,
sample_format: SampleFormat,
data_callback: D,
error_callback: E,
timeout: Option<std::time::Duration>,
) -> Result<Self::Stream, Error>
where
D: FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static,
E: FnMut(Error) + Send + 'static,
{
crate::validate_stream_config(&config)?;
if let BufferSize::Fixed(n) = config.buffer_size {
if self.max_quantum > 0 && !(self.min_quantum..=self.max_quantum).contains(&n) {
return Err(Error::with_message(
ErrorKind::UnsupportedConfig,
format!(
"Buffer size {n} is not in the supported quantum range {min}..={max}",
min = self.min_quantum,
max = self.max_quantum
),
));
}
}
let (pw_play_tx, pw_play_rx) = pw::channel::channel::<StreamCommand>();
let (init_tx, init_rx) = mpsc::channel::<Result<(), Error>>();
let mut latch = Latch::new();
let waiter = latch.waiter();
let device = self.clone();
let wait_timeout = timeout.unwrap_or(Duration::from_secs(2));
let initial_quantum = match config.buffer_size {
BufferSize::Fixed(n) => n as u64,
BufferSize::Default => self.quantum as u64,
};
let last_quantum = Arc::new(AtomicU64::new(initial_quantum));
let last_quantum_clone = last_quantum.clone();
let start = std::time::Instant::now();
let handle = thread::Builder::new()
.name("pw_out".to_owned())
.spawn(move || {
let _pw = PwInitGuard::new();
let properties = device.pw_properties(DeviceDirection::Output, &config);
let stream_data = match super::stream::connect_output(
super::stream::ConnectParams {
config,
properties,
sample_format,
last_quantum: last_quantum_clone,
start,
connect_automatically: device.connect_automatically.load(Ordering::Relaxed),
},
data_callback,
error_callback,
) {
Ok(d) => d,
Err(e) => {
let _ = init_tx.send(Err(Error::with_message(
ErrorKind::UnsupportedConfig,
format!("PipeWire stream connection failed: {e}"),
)));
return;
}
};
let StreamData {
mainloop,
listener,
stream,
context,
core,
core_monitor,
error_callback,
pending_device_changed,
invalidated,
is_default_device,
} = stream_data;
let default_monitor = if let Some(key) = device.default_metadata_key() {
match core.get_registry_rc() {
Ok(registry) => Some(DefaultDeviceMonitor::new(
registry,
key,
error_callback.clone(),
invalidated,
pending_device_changed,
)),
Err(e) => {
let _ = init_tx.send(Err(Error::with_message(
ErrorKind::BackendError,
format!("PipeWire: could not acquire registry: {e}"),
)));
return;
}
}
} else {
None
};
is_default_device.store(default_monitor.is_some(), Ordering::Relaxed);
let stream_clone = stream.clone();
let mainloop_rc1 = mainloop.clone();
let error_callback_cmd = error_callback.clone();
let _receiver = pw_play_rx.attach(mainloop.loop_(), move |play| match play {
StreamCommand::Toggle(state) => {
if let Err(e) = stream_clone.set_active(state) {
emit_error(
&error_callback_cmd,
Error::with_message(
ErrorKind::StreamInvalidated,
format!("PipeWire: set_active({state}) failed: {e}"),
),
);
}
}
StreamCommand::Stop => {
if let Err(e) = stream_clone.disconnect() {
emit_error(
&error_callback_cmd,
Error::with_message(
ErrorKind::StreamInvalidated,
format!("PipeWire: stream disconnect failed: {e}"),
),
);
}
mainloop_rc1.quit();
}
});
if init_tx.send(Ok(())).is_err() {
return;
}
if !waiter.wait() {
return;
}
mainloop.run();
drop(listener);
drop(default_monitor);
drop(core_monitor);
drop(core);
drop(context);
})
.map_err(|e| {
Error::with_message(
ErrorKind::ResourceExhausted,
format!("Failed to create thread: {e}"),
)
})?;
let init_result = init_rx.recv_timeout(wait_timeout).unwrap_or_else(|_| {
Err(Error::with_message(
ErrorKind::DeviceNotAvailable,
"PipeWire timed out",
))
});
if let Err(e) = init_result {
drop(latch);
return Err(e);
}
latch.add_thread(handle.thread().clone());
let stream = Stream::new(handle, pw_play_tx, last_quantum, start, latch);
stream.signal_ready();
Ok(stream)
}
}
#[derive(Clone, Default)]
struct Settings {
rate: SampleRate,
allow_rates: Box<[SampleRate]>,
quantum: FrameCount,
min_quantum: FrameCount,
max_quantum: FrameCount,
}
#[allow(dead_code)]
enum Request {
Node(NodeListener),
Meta(MetadataListener),
}
impl From<NodeListener> for Request {
fn from(value: NodeListener) -> Self {
Self::Node(value)
}
}
impl From<MetadataListener> for Request {
fn from(value: MetadataListener) -> Self {
Self::Meta(value)
}
}
struct NodeOverrides {
rate: Option<SampleRate>,
quantum: Option<FrameCount>,
}
fn parse_fraction(s: &str) -> Option<(u32, u32)> {
let mut it = s.splitn(2, '/');
let num: u32 = it.next()?.parse().ok()?;
let den: u32 = it.next()?.parse().ok()?;
Some((num, den))
}
fn remote_props() -> Option<PropertiesBox> {
let socket = super::utils::find_socket_path()?;
let mut props = PropertiesBox::new();
props.insert(*pw::keys::REMOTE_NAME, socket.to_string_lossy().as_ref());
Some(props)
}
pub fn init_devices(connect_automatically: Arc<AtomicBool>) -> Option<Vec<Device>> {
let _pw = PwInitGuard::new();
let mainloop = MainLoopRc::new(None).ok()?;
let context = ContextRc::new(&mainloop, None).ok()?;
let core = context.connect_rc(remote_props()).ok()?;
let registry = core.get_registry_rc().ok()?;
let discovered: Rc<RefCell<Vec<(Device, NodeOverrides)>>> = Rc::new(RefCell::new(vec![]));
let requests = Rc::new(RefCell::new(vec![]));
let settings = Rc::new(RefCell::new(Settings::default()));
let loop_clone = mainloop.clone();
let pending_events: Rc<RefCell<Vec<AsyncSeq>>> = Rc::new(RefCell::new(vec![]));
let pending = core.sync(0).ok()?;
pending_events.borrow_mut().push(pending);
let _listener_core = core
.add_listener_local()
.done({
let pending_events = pending_events.clone();
move |id, seq| {
if id != PW_ID_CORE {
return;
}
let mut pendinglist = pending_events.borrow_mut();
let Some(index) = pendinglist.iter().position(|o_seq| *o_seq == seq) else {
return;
};
pendinglist.remove(index);
if !pendinglist.is_empty() {
return;
}
loop_clone.quit();
}
})
.register();
let _listener_reg = registry
.add_listener_local()
.global({
let discovered = discovered.clone();
let registry = registry.clone();
let requests = requests.clone();
let settings = settings.clone();
move |global| match global.type_ {
ObjectType::Metadata => {
if !global.props.is_some_and(|props| {
props
.get(METADATA_NAME)
.is_some_and(|name| name == "settings")
}) {
return;
}
let meta_settings: Metadata = match registry.bind(global) {
Ok(meta_settings) => meta_settings,
Err(_) => {
return;
}
};
let settings = settings.clone();
let listener = meta_settings
.add_listener_local()
.property(move |_, key, _, value| {
match (key, value) {
(Some(clock::RATE), Some(rate)) => {
let Ok(rate) = rate.parse() else {
return 0;
};
settings.borrow_mut().rate = rate;
}
(Some(clock::ALLOWED_RATES), Some(list)) => {
let Some(allow_rates) = parse_allow_rates(list) else {
return 0;
};
settings.borrow_mut().allow_rates =
allow_rates.into_boxed_slice();
}
(Some(clock::QUANTUM), Some(quantum)) => {
let Ok(quantum) = quantum.parse() else {
return 0;
};
settings.borrow_mut().quantum = quantum;
}
(Some(clock::MIN_QUANTUM), Some(min_quantum)) => {
let Ok(min_quantum) = min_quantum.parse() else {
return 0;
};
settings.borrow_mut().min_quantum = min_quantum;
}
(Some(clock::MAX_QUANTUM), Some(max_quantum)) => {
let Ok(max_quantum) = max_quantum.parse() else {
return 0;
};
settings.borrow_mut().max_quantum = max_quantum;
}
_ => {}
}
0
})
.register();
let Ok(pending) = core.sync(0) else {
return;
};
pending_events.borrow_mut().push(pending);
requests
.borrow_mut()
.push((meta_settings.upcast(), Request::Meta(listener)));
}
ObjectType::Node => {
let Some(props) = global.props else {
return;
};
let Some(media_class) = props.get(*pw::keys::MEDIA_CLASS) else {
return;
};
if !matches!(
media_class,
audio::SINK
| audio::SOURCE
| audio::DUPLEX
| audio::STREAM_INPUT
| audio::STREAM_OUTPUT
) {
return;
}
let node: Node = match registry.bind(global) {
Ok(node) => node,
Err(_) => {
return;
}
};
let discovered = discovered.clone();
let listener = node
.add_listener_local()
.info(move |info| {
let Some(props) = info.props() else {
return;
};
let Some(media_class) = props.get(*pw::keys::MEDIA_CLASS) else {
return;
};
let role = match media_class {
audio::SINK => Role::Sink,
audio::SOURCE => Role::Source,
audio::DUPLEX => Role::Duplex,
audio::STREAM_OUTPUT => Role::StreamOutput,
audio::STREAM_INPUT => Role::StreamInput,
_ => {
return;
}
};
let direction = match role {
Role::Sink => DeviceDirection::Duplex,
Role::Source => DeviceDirection::Input,
Role::Duplex => DeviceDirection::Duplex,
Role::StreamOutput => DeviceDirection::Output,
Role::StreamInput => DeviceDirection::Input,
};
let Some(object_serial) = props
.get(*pw::keys::OBJECT_SERIAL)
.and_then(|serial| serial.parse().ok())
else {
return;
};
let node_name = props
.get(*pw::keys::NODE_NAME)
.unwrap_or("unknown")
.to_owned();
let description = props
.get(*pw::keys::NODE_DESCRIPTION)
.unwrap_or("unknown")
.to_owned();
let nick_name = props
.get(*pw::keys::NODE_NICK)
.unwrap_or(description.as_str())
.to_owned();
let channels = props
.get(*pw::keys::AUDIO_CHANNELS)
.and_then(|channels| channels.parse().ok())
.unwrap_or(2);
let icon_name =
props.get(DEVICE_ICON_NAME).unwrap_or("default").to_owned();
let interface_type = match props.get(*pw::keys::DEVICE_API) {
Some("bluez5") => InterfaceType::Bluetooth,
_ => match props.get("device.bus") {
Some("pci") => InterfaceType::Pci,
Some("usb") => InterfaceType::Usb,
Some("firewire") => InterfaceType::FireWire,
Some("thunderbolt") => InterfaceType::Thunderbolt,
_ => InterfaceType::Unknown,
},
};
let address = props
.get("api.bluez5.address")
.or_else(|| props.get("api.alsa.path"))
.map(|s| s.to_owned());
let driver = props.get(*pw::keys::FACTORY_NAME).map(|s| s.to_owned());
let node_rate: Option<SampleRate> = props
.get(node::RATE)
.and_then(parse_fraction)
.filter(|(_, den)| *den > 0)
.map(|(_, den)| den);
let (node_quantum, latency_rate): (
Option<FrameCount>,
Option<SampleRate>,
) = props
.get(node::LATENCY)
.and_then(parse_fraction)
.filter(|(num, den)| *num > 0 && *den > 0)
.unzip();
let rate_override = node_rate.or(latency_rate);
let device = Device {
node_name,
nick_name,
description,
direction,
role,
channels,
icon_name,
object_serial,
interface_type,
address,
driver,
..Default::default()
};
discovered.borrow_mut().push((
device,
NodeOverrides {
rate: rate_override,
quantum: node_quantum,
},
));
})
.register();
let Ok(pending) = core.sync(0) else {
return;
};
pending_events.borrow_mut().push(pending);
requests
.borrow_mut()
.push((node.upcast(), Request::Node(listener)));
}
_ => {}
}
})
.register();
let (cancel_tx, cancel_rx) = mpsc::channel::<()>();
let (timeout_tx, timeout_rx) = pw::channel::channel::<()>();
let loop_quit = mainloop.clone();
let _timeout_watcher = timeout_rx.attach(mainloop.loop_(), move |_| {
loop_quit.quit();
});
thread::spawn(move || {
if cancel_rx.recv_timeout(INIT_TIMEOUT).is_err() {
let _ = timeout_tx.send(());
}
});
mainloop.run();
let _ = cancel_tx.send(());
if discovered.borrow().is_empty() {
return None;
}
let settings = settings.take();
let mut devices = vec![
Device::sink_default(),
Device::input_default(),
Device::output_default(),
];
let shared_rates: Arc<[SampleRate]> = Arc::from(settings.allow_rates.as_ref());
for device in devices.iter_mut() {
device.rate = settings.rate;
device.allow_rates = Arc::clone(&shared_rates);
device.quantum = settings.quantum;
device.min_quantum = settings.min_quantum;
device.max_quantum = settings.max_quantum;
device.connect_automatically = connect_automatically.clone();
}
devices.extend(
discovered
.take()
.into_iter()
.map(|(mut device, overrides)| {
device.rate = overrides.rate.unwrap_or(settings.rate);
device.allow_rates = Arc::clone(&shared_rates);
device.quantum = overrides.quantum.unwrap_or(settings.quantum);
device.min_quantum = settings.min_quantum;
device.max_quantum = settings.max_quantum;
device.connect_automatically = connect_automatically.clone();
device
}),
);
Some(devices)
}
fn parse_allow_rates(list: &str) -> Option<Vec<SampleRate>> {
list.trim()
.strip_prefix("[")?
.strip_suffix("]")?
.split([' ', ','])
.filter(|s| !s.is_empty())
.map(|s| s.parse().ok())
.collect()
}
#[cfg(test)]
mod test {
use super::{parse_allow_rates, parse_fraction, Class, Device};
use crate::host::pipewire::utils::default;
#[test]
fn rate_parse() {
let rate_str = r#" [ 44100 48000 88200 96000 176400 192000 ] "#;
let rates = parse_allow_rates(rate_str).unwrap();
assert_eq!(rates, vec![44100, 48000, 88200, 96000, 176400, 192000]);
let rate_str = r#" [ 44100, 48000, 88200, 96000 ,176400 ,192000 ] "#;
let rates = parse_allow_rates(rate_str).unwrap();
assert_eq!(rates, vec![44100, 48000, 88200, 96000, 176400, 192000]);
assert_eq!(rates, vec![44100, 48000, 88200, 96000, 176400, 192000]);
let rate_str = r#" { 44100, 48000, 88200, 96000 ,176400 ,192000 } "#;
let rates = parse_allow_rates(rate_str);
assert_eq!(rates, None);
}
#[test]
fn fraction_parse() {
assert_eq!(parse_fraction("1/48000"), Some((1, 48000)));
assert_eq!(parse_fraction("256/48000"), Some((256, 48000)));
assert_eq!(parse_fraction("0/48000"), Some((0, 48000)));
assert_eq!(parse_fraction("256/0"), Some((256, 0)));
assert_eq!(parse_fraction(""), None);
assert_eq!(parse_fraction("48000"), None);
assert_eq!(parse_fraction("abc/def"), None);
assert_eq!(parse_fraction("/48000"), None);
assert_eq!(parse_fraction("256/"), None);
}
#[test]
fn default_metadata_key_mapping() {
assert_eq!(
Device::output_default().default_metadata_key(),
Some(default::SINK)
);
assert_eq!(
Device::sink_default().default_metadata_key(),
Some(default::SINK)
);
assert_eq!(
Device::input_default().default_metadata_key(),
Some(default::SOURCE)
);
let node = Device {
class: Class::Node,
..Default::default()
};
assert_eq!(node.default_metadata_key(), None);
}
}