#![cfg(target_os = "linux")]
#![warn(missing_docs)]
use std::collections::VecDeque;
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
use flexaudio_core::backend::{CaptureBackend, RawSink};
use flexaudio_core::clock::monotonic_now_ns;
use flexaudio_core::types::{DeviceEvent, DeviceInfo, Error, ProcessMode, Result, SourceKind};
use pipewire as pw;
use pw::spa;
use pw::{properties::properties, stream::StreamFlags};
use spa::param::format::{MediaSubtype, MediaType};
use spa::param::format_utils;
use spa::pod::Pod;
/// Sample rate in Hz reported by `native_format` and requested in the capture format pod.
const NATIVE_RATE: u32 = 48_000;
/// Channel count reported by `native_format` and requested in the capture format pod.
const NATIVE_CHANNELS: u16 = 2;
// NOTE(review): not referenced in the visible part of this file; presumably the cap on
// the watcher's pending-event queue — confirm against `enqueue_event`.
const MAX_WATCH_EVENTS: usize = 1024;
/// Time budget in milliseconds that `enumerate_pw` compares its elapsed time against.
const ENUMERATE_DEADLINE_MS: u128 = 2_000;
/// Initializes the PipeWire library the first time it is called.
///
/// Every setup path calls this; later calls are no-ops thanks to the `Once` guard.
fn pw_init_once() {
    static PW_INIT: std::sync::Once = std::sync::Once::new();
    PW_INIT.call_once(pw::init);
}
/// PipeWire capture backend for system audio.
///
/// Depending on `exclude_self`, `start` runs either the plain capture loop
/// (`run_pw_loop`) or the per-process linking loop (`run_pw_process_loop`) with
/// this process's own PID excluded.
pub struct PwSystemBackend {
/// When true, `start` uses `PidSelect::Exclude(std::process::id())`.
exclude_self: bool,
/// Optional device id; `start` checks it against the loopback devices returned by
/// `enumerate_pw` and forwards it to `run_pw_loop`.
device_id: Option<String>,
/// Flag consulted by `start` and `stop` to decide whether a capture thread is active.
running: Arc<AtomicBool>,
/// Sender `stop` uses to deliver `Terminate` to the capture thread.
stop_tx: Option<pw::channel::Sender<Terminate>>,
/// Join handle of the capture thread; joined by `stop`.
handle: Option<JoinHandle<()>>,
}
/// Message sent over a `pw::channel` to make a worker thread's main loop quit.
struct Terminate;
impl PwSystemBackend {
    /// Creates a backend in the stopped state.
    ///
    /// `exclude_self` selects the capture path that leaves out this process's own
    /// audio; `device_id` optionally names the device to capture from.
    pub fn new(exclude_self: bool, device_id: Option<String>) -> Self {
        let running = Arc::new(AtomicBool::new(false));
        Self {
            exclude_self,
            device_id,
            running,
            stop_tx: None,
            handle: None,
        }
    }

    /// Returns whether this backend was created with `exclude_self` set.
    pub fn exclude_self(&self) -> bool {
        self.exclude_self
    }
}
impl Default for PwSystemBackend {
fn default() -> Self {
Self::new(false, None)
}
}
impl CaptureBackend for PwSystemBackend {
    /// Returns the fixed `(sample_rate, channels)` pair this backend requests.
    fn native_format(&self) -> (u32, u16) {
        (NATIVE_RATE, NATIVE_CHANNELS)
    }

    /// Spawns the capture thread and blocks until it reports whether setup succeeded.
    ///
    /// Calling `start` while the backend is already running is a no-op that
    /// returns `Ok(())`.
    ///
    /// # Errors
    /// - `Error::DeviceNotFound` when a `device_id` was given (and `exclude_self` is
    ///   off), enumeration succeeded, and no loopback device has that id.
    /// - `Error::Backend` when the thread cannot be spawned, when setup on the
    ///   thread fails, or when the thread exits before signaling readiness.
    fn start(&mut self, sink: RawSink) -> Result<()> {
        if self.running.load(Ordering::SeqCst) {
            return Ok(());
        }
        let device_id = self.device_id.clone();
        if !self.exclude_self {
            if let Some(id) = device_id.as_deref() {
                // Best effort: if enumeration itself fails, the check is skipped and
                // the stream setup gets to decide.
                if let Ok(devs) = enumerate_pw() {
                    let found = devs.iter().any(|d| d.is_loopback && d.id == id);
                    if !found {
                        return Err(Error::DeviceNotFound);
                    }
                }
            }
        }
        let (stop_tx, stop_rx) = pw::channel::channel::<Terminate>();
        let (ready_tx, ready_rx) = mpsc::channel::<std::result::Result<(), String>>();
        let exclude_self = self.exclude_self;
        let thread_name = if exclude_self {
            "flexaudio-pw-system-excl"
        } else {
            "flexaudio-pw-system"
        };
        // `running` is only raised once the thread has reported success. Raising it
        // before the spawn left it stuck at `true` when the spawn failed, which
        // turned every later `start` into a silent no-op.
        let handle = thread::Builder::new()
            .name(thread_name.into())
            .spawn(move || {
                if exclude_self {
                    run_pw_process_loop(
                        PidSelect::Exclude(std::process::id()),
                        sink,
                        stop_rx,
                        &ready_tx,
                    );
                } else {
                    run_pw_loop(device_id, sink, stop_rx, &ready_tx);
                }
            })
            .map_err(|e| Error::Backend(format!("spawn pipewire thread: {e}")))?;
        // A closed channel means the thread ended without ever reporting.
        let outcome = match ready_rx.recv() {
            Ok(reported) => reported,
            Err(_) => Err(
                "pipewire setup thread terminated before signaling readiness".to_string(),
            ),
        };
        match outcome {
            Ok(()) => {
                self.running.store(true, Ordering::SeqCst);
                self.stop_tx = Some(stop_tx);
                self.handle = Some(handle);
                Ok(())
            }
            Err(msg) => {
                let _ = handle.join();
                Err(Error::Backend(msg))
            }
        }
    }

    /// Asks the capture thread to quit (if it is running) and joins it.
    ///
    /// Safe to call repeatedly and on a backend that was never started.
    fn stop(&mut self) {
        let was_running = self.running.swap(false, Ordering::SeqCst);
        if was_running {
            if let Some(tx) = self.stop_tx.take() {
                let _ = tx.send(Terminate);
            }
        }
        if let Some(h) = self.handle.take() {
            let _ = h.join();
        }
        // Drops a leftover sender when the backend was not marked running.
        self.stop_tx = None;
    }
}
impl Drop for PwSystemBackend {
    /// Stops the backend when it goes out of scope, so the capture thread is
    /// signaled and joined.
    fn drop(&mut self) {
        Self::stop(self);
    }
}
/// PipeWire capture backend that selects audio by process id.
///
/// `mode` decides whether `target_pid` is the process to capture
/// (`ProcessMode::Include`) or the process to leave out (`ProcessMode::Exclude`).
pub struct PwProcessBackend {
/// Process id the selection is based on.
target_pid: u32,
/// Whether `target_pid` is included or excluded.
mode: ProcessMode,
/// Flag consulted by `start` and `stop` to decide whether a capture thread is active.
running: Arc<AtomicBool>,
/// Sender `stop` uses to deliver `Terminate` to the capture thread.
stop_tx: Option<pw::channel::Sender<Terminate>>,
/// Join handle of the capture thread; joined by `stop`.
handle: Option<JoinHandle<()>>,
}
impl PwProcessBackend {
    /// Creates a backend in the stopped state for the given process id and mode.
    pub fn new(target_pid: u32, mode: ProcessMode) -> Self {
        let running = Arc::new(AtomicBool::new(false));
        Self {
            target_pid,
            mode,
            running,
            stop_tx: None,
            handle: None,
        }
    }

    /// Returns the process id this backend was created with.
    pub fn target_pid(&self) -> u32 {
        self.target_pid
    }

    /// Returns the include/exclude mode this backend was created with.
    pub fn mode(&self) -> ProcessMode {
        self.mode
    }
}
impl CaptureBackend for PwProcessBackend {
    /// Returns the fixed `(sample_rate, channels)` pair this backend requests.
    fn native_format(&self) -> (u32, u16) {
        (NATIVE_RATE, NATIVE_CHANNELS)
    }

    /// Spawns the per-process capture thread and blocks until it reports whether
    /// setup succeeded.
    ///
    /// Calling `start` while the backend is already running is a no-op that
    /// returns `Ok(())`.
    ///
    /// # Errors
    /// `Error::Backend` when the thread cannot be spawned, when setup on the thread
    /// fails, or when the thread exits before signaling readiness.
    fn start(&mut self, sink: RawSink) -> Result<()> {
        if self.running.load(Ordering::SeqCst) {
            return Ok(());
        }
        let select = match self.mode {
            ProcessMode::Include => PidSelect::Include(self.target_pid),
            ProcessMode::Exclude => PidSelect::Exclude(self.target_pid),
        };
        let (stop_tx, stop_rx) = pw::channel::channel::<Terminate>();
        let (ready_tx, ready_rx) = mpsc::channel::<std::result::Result<(), String>>();
        // `running` is only raised once the thread has reported success. Raising it
        // before the spawn left it stuck at `true` when the spawn failed, which
        // turned every later `start` into a silent no-op.
        let handle = thread::Builder::new()
            .name("flexaudio-pw-process".into())
            .spawn(move || {
                run_pw_process_loop(select, sink, stop_rx, &ready_tx);
            })
            .map_err(|e| Error::Backend(format!("spawn pipewire process thread: {e}")))?;
        // A closed channel means the thread ended without ever reporting.
        let outcome = match ready_rx.recv() {
            Ok(reported) => reported,
            Err(_) => Err(
                "pipewire process setup thread terminated before signaling readiness"
                    .to_string(),
            ),
        };
        match outcome {
            Ok(()) => {
                self.running.store(true, Ordering::SeqCst);
                self.stop_tx = Some(stop_tx);
                self.handle = Some(handle);
                Ok(())
            }
            Err(msg) => {
                let _ = handle.join();
                Err(Error::Backend(msg))
            }
        }
    }

    /// Asks the capture thread to quit (if it is running) and joins it.
    ///
    /// Safe to call repeatedly and on a backend that was never started.
    fn stop(&mut self) {
        let was_running = self.running.swap(false, Ordering::SeqCst);
        if was_running {
            if let Some(tx) = self.stop_tx.take() {
                let _ = tx.send(Terminate);
            }
        }
        if let Some(h) = self.handle.take() {
            let _ = h.join();
        }
        // Drops a leftover sender when the backend was not marked running.
        self.stop_tx = None;
    }
}
impl Drop for PwProcessBackend {
    /// Stops the backend when it goes out of scope, so the capture thread is
    /// signaled and joined.
    fn drop(&mut self) {
        Self::stop(self);
    }
}
/// Thread body for per-process capture.
///
/// Runs `setup_pw_process`, reports the outcome on `ready_tx`, and then runs the
/// main loop until a `Terminate` message arrives on `stop_rx`. If setup fails, or
/// nobody is listening for the readiness report any more, it returns without
/// running the loop.
fn run_pw_process_loop(
    select: PidSelect,
    sink: RawSink,
    stop_rx: pw::channel::Receiver<Terminate>,
    ready_tx: &mpsc::Sender<std::result::Result<(), String>>,
) {
    let (main_loop, _keep) = match setup_pw_process(select, sink) {
        Err(reason) => {
            let _ = ready_tx.send(Err(reason));
            return;
        }
        Ok(parts) => parts,
    };
    // The stop message is delivered on the loop itself; its handler quits it.
    let quit_handle = main_loop.clone();
    let _attached = stop_rx.attach(main_loop.loop_(), move |_msg| {
        quit_handle.quit();
    });
    let reported = ready_tx.send(Ok(()));
    if reported.is_ok() {
        main_loop.run();
    }
}
/// Bundle of PipeWire objects returned by `setup_pw_process` next to the main loop.
///
/// `run_pw_process_loop` binds it to `_keep` so the objects stay owned for as long
/// as the loop runs. None of the fields are read after construction.
#[allow(clippy::type_complexity)]
struct ProcessKeep {
/// The capture stream.
_stream: pw::stream::StreamRc,
/// Listener registered on the capture stream.
_listener: pw::stream::StreamListener<UserData>,
/// Registry handle the global listeners were registered on.
_registry: pw::registry::RegistryRc,
/// Listener for registry `global` / `global_remove` events.
_registry_listener: pw::registry::Listener,
/// Links created so far, keyed by the id of the node they originate from.
_links: std::rc::Rc<std::cell::RefCell<std::collections::HashMap<u32, Vec<pw::link::Link>>>>,
/// Core connection.
_core: pw::core::CoreRc,
}
/// What the registry listener remembers about a `Stream/Output/Audio` node.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct NodeEntry {
/// Parsed from the node's `CLIENT_ID` property, when present and numeric.
owning_client_id: Option<u32>,
/// Parsed from the node's `SEC_PID` property, when present and numeric.
app_pid: Option<u32>,
}
/// What the registry listener remembers about a port.
#[derive(Clone, Debug, PartialEq, Eq)]
struct PortEntry {
/// Id of the node the port belongs to (from the `NODE_ID` property).
node_id: u32,
/// Port direction; the listener only stores ports whose value is `"in"` or `"out"`.
direction: String,
/// Value of the `AUDIO_CHANNEL` property, or an empty string when it is absent.
channel: String,
}
/// Decides which output port gets linked to which input port.
///
/// Both slices hold `(port_id, channel_name)` pairs. The result is a list of
/// `(output_port_id, input_port_id)` pairs built in three steps:
///
/// 1. Every output with a non-empty channel name takes the first still-free input
///    that has the same channel name.
/// 2. If there is exactly one output port, it is additionally paired with every
///    input that is still free, and the function returns.
/// 3. Otherwise each output that has no pair yet takes the first free input, in
///    slice order, until inputs run out.
fn pair_ports(out_ports: &[(u32, String)], in_ports: &[(u32, String)]) -> Vec<(u32, u32)> {
    let mut links: Vec<(u32, u32)> = Vec::new();
    let mut taken = vec![false; in_ports.len()];

    // Step 1: match by channel name.
    for (src, label) in out_ports {
        if label.is_empty() {
            continue;
        }
        let hit = (0..in_ports.len()).find(|&i| !taken[i] && in_ports[i].1 == *label);
        if let Some(i) = hit {
            taken[i] = true;
            links.push((*src, in_ports[i].0));
        }
    }

    // Step 2: a lone output fans out to every input that is still free.
    if let [(only_out, _)] = out_ports {
        for (slot, (dst, _)) in taken.iter_mut().zip(in_ports) {
            if !*slot {
                *slot = true;
                links.push((*only_out, *dst));
            }
        }
        return links;
    }

    // Step 3: positional fallback for outputs that found no name match.
    let mut served: Vec<u32> = links.iter().map(|&(src, _)| src).collect();
    for (src, _) in out_ports {
        if served.contains(src) {
            continue;
        }
        let Some(free) = taken.iter().position(|&t| !t) else {
            continue;
        };
        taken[free] = true;
        served.push(*src);
        links.push((*src, in_ports[free].0));
    }
    links
}
/// Works out which process a node belongs to.
///
/// The node's own `app_pid` wins when it is set. Otherwise the node's owning
/// client id is looked up in `client_pid` (client id → pid). Returns `None` when
/// neither route yields a pid.
fn resolve_node_pid(
    entry: &NodeEntry,
    client_pid: &std::collections::HashMap<u32, u32>,
) -> Option<u32> {
    entry.app_pid.or_else(|| {
        entry
            .owning_client_id
            .and_then(|client_id| client_pid.get(&client_id).copied())
    })
}
/// Builds the node name used for the capture stream of a given pid:
/// `flexaudio-capture-<pid>`.
fn capture_node_name(target_pid: u32) -> String {
    let mut name = String::from("flexaudio-capture-");
    name.push_str(&target_pid.to_string());
    name
}
/// Which nodes the per-process capture links to, expressed relative to one pid.
#[derive(Clone, Copy, PartialEq, Eq)]
enum PidSelect {
    /// Capture the process with this pid.
    Include(u32),
    /// Capture every process except the one with this pid.
    Exclude(u32),
}

impl PidSelect {
    /// Returns the pid carried by either variant.
    fn pid(self) -> u32 {
        let (Self::Include(pid) | Self::Exclude(pid)) = self;
        pid
    }
}
/// Builds everything needed for per-process capture and returns the main loop
/// together with a `ProcessKeep` holding the objects created here.
///
/// Steps, in order:
/// 1. Create the main loop, context, core connection and registry.
/// 2. Create a `Stream/Input/Audio` stream named via `capture_node_name`, attach the
///    capture listener, and connect it as an input WITHOUT `AUTOCONNECT` and with
///    no target id.
/// 3. Register registry listeners that track clients, `Stream/Output/Audio` nodes
///    and ports, and call `try_link` to create links from selected nodes' output
///    ports to this stream's input ports.
///
/// `select` decides which nodes are linked: the first node found for the pid
/// (`Include`) or every node whose resolved pid differs from it (`Exclude`).
///
/// Every failure is reported as a human-readable `String`.
#[allow(clippy::type_complexity)]
fn setup_pw_process(
select: PidSelect,
sink: RawSink,
) -> std::result::Result<(pw::main_loop::MainLoopRc, ProcessKeep), String> {
use std::cell::{Cell, RefCell};
use std::collections::HashMap;
use std::rc::Rc;
pw_init_once();
let main_loop = pw::main_loop::MainLoopRc::new(None)
.map_err(|e| format!("create pipewire main loop failed: {e}"))?;
let context = pw::context::ContextRc::new(&main_loop, None)
.map_err(|e| format!("create pipewire context failed: {e}"))?;
let core = context
.connect_rc(None)
.map_err(|e| format!("connect to pipewire daemon failed (is PipeWire running?): {e}"))?;
let registry = core
.get_registry_rc()
.map_err(|e| format!("get pipewire registry failed: {e}"))?;
let node_name = capture_node_name(select.pid());
let props = properties! {
*pw::keys::MEDIA_TYPE => "Audio",
*pw::keys::MEDIA_CATEGORY => "Capture",
*pw::keys::MEDIA_CLASS => "Stream/Input/Audio",
*pw::keys::MEDIA_ROLE => "Music",
*pw::keys::NODE_NAME => node_name.as_str(),
};
let stream = pw::stream::StreamRc::new(core.clone(), "flexaudio-process-capture", props)
.map_err(|e| format!("create pipewire capture stream failed: {e}"))?;
let user_data = UserData {
format: spa::param::audio::AudioInfoRaw::new(),
sink,
};
let listener = add_capture_listener(&stream, user_data)?;
{
// No AUTOCONNECT flag and no target id here: the links feeding this stream are
// created explicitly by `try_link` below.
let values = build_format_pod_bytes()?;
let pod = Pod::from_bytes(&values)
.ok_or_else(|| "build audio format pod from bytes failed".to_string())?;
let mut params = [pod];
stream
.connect(
spa::utils::Direction::Input,
None,
StreamFlags::MAP_BUFFERS | StreamFlags::RT_PROCESS,
&mut params,
)
.map_err(|e| format!("connect pipewire capture stream failed: {e}"))?;
}
// State shared between the two registry callbacks (both run on this thread's loop).
// Node id of our own capture stream, once the stream reports one.
let self_node_id: Rc<Cell<Option<u32>>> = Rc::new(Cell::new(None));
// Known `Stream/Output/Audio` nodes, keyed by registry global id.
let nodes: Rc<RefCell<HashMap<u32, NodeEntry>>> = Rc::new(RefCell::new(HashMap::new()));
// Client global id -> pid parsed from the client's `SEC_PID` property.
let client_pid: Rc<RefCell<HashMap<u32, u32>>> = Rc::new(RefCell::new(HashMap::new()));
// Global id of the client whose pid equals `select.pid()`, if seen.
let target_client_id: Rc<Cell<Option<u32>>> = Rc::new(Cell::new(None));
// Known ports (direction "in" or "out"), keyed by registry global id.
let ports: Rc<RefCell<HashMap<u32, PortEntry>>> = Rc::new(RefCell::new(HashMap::new()));
// Links created so far, keyed by the source node id; a key's presence means
// "this node is fully linked".
let linked: Rc<RefCell<HashMap<u32, Vec<pw::link::Link>>>> =
Rc::new(RefCell::new(HashMap::new()));
// Tries to create links from every selected, not-yet-linked node to our stream.
//
// Returns silently when nothing can be done yet: our own node id is unknown, no
// node matches, or our input ports have not appeared. It is called again after
// every relevant registry event, so missing pieces are picked up later.
#[allow(clippy::too_many_arguments)]
fn try_link(
core: &pw::core::CoreRc,
stream: &pw::stream::StreamRc,
select: PidSelect,
self_node_id: &Cell<Option<u32>>,
nodes: &RefCell<HashMap<u32, NodeEntry>>,
client_pid: &RefCell<HashMap<u32, u32>>,
ports: &RefCell<HashMap<u32, PortEntry>>,
linked: &RefCell<HashMap<u32, Vec<pw::link::Link>>>,
) {
// Include mode links at most one node: once anything is linked, stop.
if let PidSelect::Include(_) = select {
if !linked.borrow().is_empty() {
return;
}
}
// Refresh our own node id; 0 and ID_ANY are treated as "not assigned yet".
let sid = stream.node_id();
if sid != 0 && sid != pw::constants::ID_ANY {
self_node_id.set(Some(sid));
}
let Some(self_nid) = self_node_id.get() else {
return;
};
// Pick the source nodes to link, skipping ones that already have links.
let targets: Vec<u32> = {
let nodes = nodes.borrow();
let client_pid = client_pid.borrow();
let linked = linked.borrow();
match select {
// Include: the first node (in map iteration order) whose pid matches.
PidSelect::Include(pid) => nodes
.iter()
.find(|(id, entry)| {
!linked.contains_key(id)
&& resolve_node_pid(entry, &client_pid) == Some(pid)
})
.map(|(&node_id, _)| node_id)
.into_iter()
.collect(),
// Exclude: every node with a KNOWN pid that differs; nodes whose pid cannot
// be resolved are left out.
PidSelect::Exclude(pid) => nodes
.iter()
.filter(|(id, entry)| {
if linked.contains_key(id) {
return false;
}
matches!(resolve_node_pid(entry, &client_pid), Some(other) if other != pid)
})
.map(|(&node_id, _)| node_id)
.collect(),
}
};
if targets.is_empty() {
return;
}
// Our own input ports as (port id, channel name).
let in_ports: Vec<(u32, String)> = {
let ports = ports.borrow();
ports
.iter()
.filter(|(_pid, p)| p.node_id == self_nid && p.direction == "in")
.map(|(&pid, p)| (pid, p.channel.clone()))
.collect()
};
if in_ports.is_empty() {
return;
}
for target_node_id in targets {
// The target node's output ports as (port id, channel name).
let out_ports: Vec<(u32, String)> = {
let ports = ports.borrow();
ports
.iter()
.filter(|(_pid, p)| p.node_id == target_node_id && p.direction == "out")
.map(|(&pid, p)| (pid, p.channel.clone()))
.collect()
};
if out_ports.is_empty() {
continue;
}
let pairs = pair_ports(&out_ports, &in_ports);
if pairs.is_empty() {
continue;
}
let want = pairs.len();
let mut created: Vec<pw::link::Link> = Vec::with_capacity(want);
for (out_port_id, in_port_id) in pairs {
let link_props = properties! {
*pw::keys::LINK_OUTPUT_NODE => target_node_id.to_string(),
*pw::keys::LINK_OUTPUT_PORT => out_port_id.to_string(),
*pw::keys::LINK_INPUT_NODE => self_nid.to_string(),
*pw::keys::LINK_INPUT_PORT => in_port_id.to_string(),
};
match core.create_object::<pw::link::Link>("link-factory", &link_props) {
Ok(link) => created.push(link),
Err(_e) => {
break;
}
}
}
// All or nothing per node: if any link failed, drop the ones that were created
// and leave the node unrecorded so a later call tries again.
if created.len() != want {
drop(created);
continue;
}
linked.borrow_mut().insert(target_node_id, created);
}
}
// Each callback is a `move` closure, so each gets its own set of handles.
let core_for_global = core.clone();
let stream_for_global = stream.clone();
let self_node_for_global = self_node_id.clone();
let nodes_for_global = nodes.clone();
let client_pid_for_global = client_pid.clone();
let target_client_for_global = target_client_id.clone();
let ports_for_global = ports.clone();
let linked_for_global = linked.clone();
let core_for_remove = core.clone();
let stream_for_remove = stream.clone();
let self_node_for_remove = self_node_id.clone();
let nodes_for_remove = nodes.clone();
let client_pid_for_remove = client_pid.clone();
let target_client_for_remove = target_client_id.clone();
let ports_for_remove = ports.clone();
let linked_for_remove = linked.clone();
let _registry_listener = registry
.add_listener_local()
.global(move |global| {
// A panic inside the callback is caught and discarded rather than propagated.
let _ = catch_unwind(AssertUnwindSafe(|| {
let Some(props) = global.props else {
return;
};
match global.type_ {
// Clients: remember the pid and note the client that matches the selected pid.
pw::types::ObjectType::Client => {
let Some(pid_str) = props.get(*pw::keys::SEC_PID) else {
return;
};
let Ok(pid) = pid_str.parse::<u32>() else {
return;
};
client_pid_for_global.borrow_mut().insert(global.id, pid);
if pid == select.pid() {
target_client_for_global.set(Some(global.id));
}
}
// Nodes: only application playback streams are candidates for linking.
pw::types::ObjectType::Node => {
let media_class = props.get(*pw::keys::MEDIA_CLASS).unwrap_or("");
if media_class != "Stream/Output/Audio" {
return;
}
let owning_client_id = props
.get(*pw::keys::CLIENT_ID)
.and_then(|s| s.parse::<u32>().ok());
let app_pid = props
.get(*pw::keys::SEC_PID)
.and_then(|s| s.parse::<u32>().ok());
nodes_for_global.borrow_mut().insert(
global.id,
NodeEntry {
owning_client_id,
app_pid,
},
);
}
// Ports: stored for every node (ours included); filtered by node id in `try_link`.
pw::types::ObjectType::Port => {
let Some(node_id) = props
.get(*pw::keys::NODE_ID)
.and_then(|s| s.parse::<u32>().ok())
else {
return;
};
let direction = props
.get(*pw::keys::PORT_DIRECTION)
.unwrap_or("")
.to_string();
if direction != "out" && direction != "in" {
return;
}
let channel = props
.get(*pw::keys::AUDIO_CHANNEL)
.unwrap_or("")
.to_string();
ports_for_global.borrow_mut().insert(
global.id,
PortEntry {
node_id,
direction,
channel,
},
);
}
_ => return,
}
// Something relevant was recorded: see whether new links can be made now.
try_link(
&core_for_global,
&stream_for_global,
select,
&self_node_for_global,
&nodes_for_global,
&client_pid_for_global,
&ports_for_global,
&linked_for_global,
);
}));
})
.global_remove(move |id| {
let _ = catch_unwind(AssertUnwindSafe(|| {
let mut relink_needed = false;
// Classify the removed id BEFORE forgetting it below.
let was_linked_node = linked_for_remove.borrow().contains_key(&id);
let was_target_client = target_client_for_remove.get() == Some(id);
let was_self_node = self_node_for_remove.get() == Some(id);
// `linked_out_owner`: the linked node that owned the removed output port, if any.
// `was_self_in_port`: whether the removed id was one of our own input ports.
let (linked_out_owner, was_self_in_port): (Option<u32>, bool) = {
let ports = ports_for_remove.borrow();
let owner = ports.get(&id).and_then(|p| {
if p.direction == "out"
&& linked_for_remove.borrow().contains_key(&p.node_id)
{
Some(p.node_id)
} else {
None
}
});
let self_in = if let Some(self_nid) = self_node_for_remove.get() {
ports
.get(&id)
.map(|p| p.node_id == self_nid && p.direction == "in")
.unwrap_or(false)
} else {
false
};
(owner, self_in)
};
// Losing our own node, one of our input ports, or the target client drops every
// recorded link; otherwise only the affected node's links are dropped.
if was_self_node || was_self_in_port || was_target_client {
linked_for_remove.borrow_mut().clear();
relink_needed = true;
} else {
if was_linked_node {
linked_for_remove.borrow_mut().remove(&id);
relink_needed = true;
}
if let Some(owner) = linked_out_owner {
linked_for_remove.borrow_mut().remove(&owner);
relink_needed = true;
}
}
if was_target_client {
target_client_for_remove.set(None);
}
if was_self_node {
self_node_for_remove.set(None);
}
// The id may be in any one of the maps; removing from all of them is harmless.
nodes_for_remove.borrow_mut().remove(&id);
client_pid_for_remove.borrow_mut().remove(&id);
ports_for_remove.borrow_mut().remove(&id);
if relink_needed {
try_link(
&core_for_remove,
&stream_for_remove,
select,
&self_node_for_remove,
&nodes_for_remove,
&client_pid_for_remove,
&ports_for_remove,
&linked_for_remove,
);
}
}));
})
.register();
Ok((
main_loop,
ProcessKeep {
_stream: stream,
_listener: listener,
_registry: registry,
_registry_listener,
_links: linked,
_core: core,
},
))
}
/// Per-stream state handed to the stream listener callbacks.
struct UserData {
/// Audio format last parsed in the `param_changed` callback.
format: spa::param::audio::AudioInfoRaw,
/// Destination that the `process` callback pushes captured samples into.
sink: RawSink,
}
/// Registers the `param_changed` and `process` callbacks on a capture stream.
///
/// * `param_changed` stores the negotiated raw-audio format in `user_data.format`.
/// * `process` dequeues one buffer, decodes the valid byte range of its first data
///   plane as little-endian `f32` samples, and pushes them to `user_data.sink`
///   together with `monotonic_now_ns()`.
///
/// Both callbacks catch and discard panics. Malformed or empty buffers are
/// skipped without reporting.
///
/// # Errors
/// Returns a message when registering the listener fails.
fn add_capture_listener(
    stream: &pw::stream::StreamRc,
    user_data: UserData,
) -> std::result::Result<pw::stream::StreamListener<UserData>, String> {
    // Pre-size this thread's scratch buffer. `Vec::reserve` counts from `len`, not
    // from `capacity`, so the request is "whatever is missing on top of `len`".
    // NOTE(review): the streams here are connected with `RT_PROCESS`; if that runs
    // the `process` callback on another thread, this warms a different
    // thread-local than the one the callback uses — confirm.
    PROC_SCRATCH.with(|cell| {
        let mut s = cell.borrow_mut();
        let missing = PROC_SCRATCH_CAP.saturating_sub(s.len());
        s.reserve(missing);
    });
    stream
        .add_local_listener_with_user_data(user_data)
        .param_changed(|_stream, user_data, id, param| {
            let _ = catch_unwind(AssertUnwindSafe(|| {
                let Some(param) = param else {
                    return;
                };
                if id != pw::spa::param::ParamType::Format.as_raw() {
                    return;
                }
                let (media_type, media_subtype) = match format_utils::parse_format(param) {
                    Ok(v) => v,
                    Err(_) => return,
                };
                if media_type != MediaType::Audio || media_subtype != MediaSubtype::Raw {
                    return;
                }
                // Best effort: nothing reads `format` later in this file, so a
                // failed parse is deliberately ignored.
                let _ = user_data.format.parse(param);
            }));
        })
        .process(|stream, user_data| {
            let _ = catch_unwind(AssertUnwindSafe(|| {
                let Some(mut buffer) = stream.dequeue_buffer() else {
                    return;
                };
                let datas = buffer.datas_mut();
                if datas.is_empty() {
                    return;
                }
                let data = &mut datas[0];
                let chunk = data.chunk();
                let size = chunk.size() as usize;
                let offset = chunk.offset() as usize;
                if size == 0 {
                    return;
                }
                let Some(bytes) = data.data() else {
                    return;
                };
                // Reject chunks that claim more bytes than the mapped buffer holds.
                let end = offset.saturating_add(size);
                if end > bytes.len() {
                    return;
                }
                let valid = &bytes[offset..end];
                const SAMPLE_BYTES: usize = std::mem::size_of::<f32>();
                let n_floats = valid.len() / SAMPLE_BYTES;
                if n_floats == 0 {
                    return;
                }
                PROC_SCRATCH.with(|cell| {
                    let mut scratch = cell.borrow_mut();
                    // Clear first so `reserve` guarantees room for the whole chunk.
                    // Reserving `n_floats - capacity` on a non-empty buffer could
                    // be a no-op and leave the pushes below to reallocate.
                    scratch.clear();
                    scratch.reserve(n_floats);
                    // `chunks_exact` drops a trailing partial sample, matching the
                    // integer division above.
                    scratch.extend(
                        valid
                            .chunks_exact(SAMPLE_BYTES)
                            .map(|b| f32::from_le_bytes([b[0], b[1], b[2], b[3]])),
                    );
                    user_data.sink.push(&scratch, monotonic_now_ns());
                });
            }));
        })
        .register()
        .map_err(|e| format!("register pipewire stream listener failed: {e}"))
}
/// Serializes the capture format request into SPA pod bytes.
///
/// The request is an `EnumFormat` object describing raw `F32LE` audio at
/// `NATIVE_RATE` Hz with `NATIVE_CHANNELS` channels.
///
/// # Errors
/// Returns a message when pod serialization fails.
fn build_format_pod_bytes() -> std::result::Result<Vec<u8>, String> {
    let mut info = spa::param::audio::AudioInfoRaw::new();
    info.set_format(spa::param::audio::AudioFormat::F32LE);
    info.set_rate(NATIVE_RATE);
    info.set_channels(u32::from(NATIVE_CHANNELS));

    let format_value = pw::spa::pod::Value::Object(pw::spa::pod::Object {
        type_: pw::spa::utils::SpaTypes::ObjectParamFormat.as_raw(),
        id: pw::spa::param::ParamType::EnumFormat.as_raw(),
        properties: info.into(),
    });

    let buffer = std::io::Cursor::new(Vec::new());
    let serialized = pw::spa::pod::serialize::PodSerializer::serialize(buffer, &format_value)
        .map_err(|e| format!("serialize audio format pod failed: {e}"))?;
    let bytes: Vec<u8> = serialized.0.into_inner();
    Ok(bytes)
}
/// Thread body for plain system capture.
///
/// Runs `setup_pw`, reports the outcome on `ready_tx`, and then runs the main loop
/// until a `Terminate` message arrives on `stop_rx`. If setup fails, or nobody is
/// listening for the readiness report any more, it returns without running the
/// loop.
fn run_pw_loop(
    device_id: Option<String>,
    sink: RawSink,
    stop_rx: pw::channel::Receiver<Terminate>,
    ready_tx: &mpsc::Sender<std::result::Result<(), String>>,
) {
    let (main_loop, _stream, _listener) = match setup_pw(device_id, sink) {
        Err(reason) => {
            let _ = ready_tx.send(Err(reason));
            return;
        }
        Ok(parts) => parts,
    };
    // The stop message is delivered on the loop itself; its handler quits it.
    let quit_handle = main_loop.clone();
    let _attached = stop_rx.attach(main_loop.loop_(), move |_msg| {
        quit_handle.quit();
    });
    let reported = ready_tx.send(Ok(()));
    if reported.is_ok() {
        main_loop.run();
    }
}
/// Builds the main loop, stream and stream listener for plain system capture.
///
/// The stream is a `Stream/Input/Audio` capture stream with
/// `STREAM_CAPTURE_SINK` set to `"true"`. When `device_id` is given it is set as
/// the `target.object` property. The stream is connected as an input with
/// `AUTOCONNECT`.
///
/// # Errors
/// Every failure is reported as a human-readable `String`.
#[allow(clippy::type_complexity)]
fn setup_pw(
    device_id: Option<String>,
    sink: RawSink,
) -> std::result::Result<
    (
        pw::main_loop::MainLoopRc,
        pw::stream::StreamRc,
        pw::stream::StreamListener<UserData>,
    ),
    String,
> {
    pw_init_once();

    let main_loop = pw::main_loop::MainLoopRc::new(None)
        .map_err(|e| format!("create pipewire main loop failed: {e}"))?;
    let context = pw::context::ContextRc::new(&main_loop, None)
        .map_err(|e| format!("create pipewire context failed: {e}"))?;
    let core = context
        .connect_rc(None)
        .map_err(|e| format!("connect to pipewire daemon failed (is PipeWire running?): {e}"))?;

    let mut stream_props = properties! {
        *pw::keys::MEDIA_TYPE => "Audio",
        *pw::keys::MEDIA_CATEGORY => "Capture",
        *pw::keys::MEDIA_CLASS => "Stream/Input/Audio",
        *pw::keys::MEDIA_ROLE => "Music",
        *pw::keys::STREAM_CAPTURE_SINK => "true",
    };
    if let Some(target) = device_id {
        stream_props.insert("target.object", target);
    }

    let stream = pw::stream::StreamRc::new(core, "flexaudio-system-capture", stream_props)
        .map_err(|e| format!("create pipewire capture stream failed: {e}"))?;
    let listener = add_capture_listener(
        &stream,
        UserData {
            format: spa::param::audio::AudioInfoRaw::new(),
            sink,
        },
    )?;

    let pod_bytes = build_format_pod_bytes()?;
    let Some(format_pod) = Pod::from_bytes(&pod_bytes) else {
        return Err("build audio format pod from bytes failed".to_string());
    };
    let mut params = [format_pod];
    let connect_flags =
        StreamFlags::AUTOCONNECT | StreamFlags::MAP_BUFFERS | StreamFlags::RT_PROCESS;
    stream
        .connect(
            spa::utils::Direction::Input,
            None,
            connect_flags,
            &mut params,
        )
        .map_err(|e| format!("connect pipewire capture stream failed: {e}"))?;

    Ok((main_loop, stream, listener))
}
/// Number of `f32` samples `add_capture_listener` pre-reserves in `PROC_SCRATCH`:
/// `NATIVE_RATE * NATIVE_CHANNELS`.
const PROC_SCRATCH_CAP: usize = (NATIVE_RATE as usize) * (NATIVE_CHANNELS as usize);
thread_local! {
// Per-thread scratch buffer the `process` callback decodes samples into before
// pushing them to the sink.
static PROC_SCRATCH: std::cell::RefCell<Vec<f32>> = const { std::cell::RefCell::new(Vec::new()) };
}
/// One `Audio/Sink` or `Audio/Source` node collected during enumeration.
struct NodeRecord {
/// Value of the `NODE_NAME` property; becomes `DeviceInfo::id`.
node_name: String,
/// `NODE_DESCRIPTION` when non-empty, otherwise the node name; becomes `DeviceInfo::name`.
description: String,
/// Either `"Audio/Sink"` or `"Audio/Source"`.
media_class: String,
/// Parsed `audio.rate` property, if present and numeric.
rate: Option<u32>,
/// Parsed `AUDIO_CHANNELS` property, if present and numeric.
channels: Option<u16>,
}
/// Everything the enumeration callbacks accumulate while the loop runs.
#[derive(Default)]
struct EnumState {
/// Nodes in the order their `global` events arrived.
nodes: Vec<NodeRecord>,
/// Name extracted from the `default.audio.sink` metadata value, if any.
default_sink: Option<String>,
/// Name extracted from the `default.audio.source` metadata value, if any.
default_source: Option<String>,
}
/// Lists the audio sinks and sources PipeWire currently exposes.
///
/// Enumeration is best effort: if it fails for any reason, the failure is
/// swallowed and an empty list is returned, so this function never returns `Err`.
pub fn list_devices() -> Result<Vec<DeviceInfo>> {
    let devices = enumerate_pw().unwrap_or_default();
    Ok(devices)
}
/// Connects to PipeWire, collects every `Audio/Sink` and `Audio/Source` node plus
/// the default sink/source names, and converts them to `DeviceInfo` values.
///
/// The function runs its own main loop and stops it after two `core.sync`
/// round-trips: the first lets the registry deliver its globals (which is also
/// when the `default` metadata object gets bound), the second lets that
/// metadata's properties arrive.
///
/// A loop timer bounds the whole wait to `ENUMERATE_DEADLINE_MS`. On timeout,
/// whatever was collected so far is returned as `Ok`. Without the timer
/// `main_loop.run()` would never return if the daemon stopped answering, and the
/// elapsed-time check after it would never be reached.
///
/// # Errors
/// Returns a message when any PipeWire object cannot be created, the initial sync
/// cannot be issued, or the timer cannot be armed.
fn enumerate_pw() -> std::result::Result<Vec<DeviceInfo>, String> {
    use std::cell::{Cell, RefCell};
    use std::rc::Rc;
    pw_init_once();
    let main_loop = pw::main_loop::MainLoopRc::new(None)
        .map_err(|e| format!("create pipewire main loop failed: {e}"))?;
    let context = pw::context::ContextRc::new(&main_loop, None)
        .map_err(|e| format!("create pipewire context failed: {e}"))?;
    let core = context
        .connect_rc(None)
        .map_err(|e| format!("connect to pipewire daemon failed (is PipeWire running?): {e}"))?;
    let registry = core
        .get_registry_rc()
        .map_err(|e| format!("get pipewire registry failed: {e}"))?;
    let state = Rc::new(RefCell::new(EnumState::default()));
    // Bound metadata proxies and their listeners must stay alive until the loop is
    // done, so they are parked here.
    type MetaKeep = (Box<dyn pw::proxy::ProxyT>, Box<dyn pw::proxy::Listener>);
    let meta_keep: Rc<RefCell<Vec<MetaKeep>>> = Rc::new(RefCell::new(Vec::new()));
    let state_for_global = state.clone();
    let registry_for_global = registry.clone();
    let meta_keep_for_global = meta_keep.clone();
    let _reg_listener = registry
        .add_listener_local()
        .global(move |global| {
            // A panic inside the callback is caught and discarded.
            let _ = catch_unwind(AssertUnwindSafe(|| {
                let Some(props) = global.props else {
                    return;
                };
                match global.type_ {
                    pw::types::ObjectType::Node => {
                        let media_class = props.get(*pw::keys::MEDIA_CLASS).unwrap_or("");
                        if media_class != "Audio/Sink" && media_class != "Audio/Source" {
                            return;
                        }
                        let node_name = props.get(*pw::keys::NODE_NAME).unwrap_or("");
                        if node_name.is_empty() {
                            return;
                        }
                        // Fall back to the node name when there is no description.
                        let description = props
                            .get(*pw::keys::NODE_DESCRIPTION)
                            .filter(|s| !s.is_empty())
                            .unwrap_or(node_name);
                        let rate = props.get("audio.rate").and_then(|s| s.parse::<u32>().ok());
                        let channels = props
                            .get(*pw::keys::AUDIO_CHANNELS)
                            .and_then(|s| s.parse::<u16>().ok());
                        state_for_global.borrow_mut().nodes.push(NodeRecord {
                            node_name: node_name.to_string(),
                            description: description.to_string(),
                            media_class: media_class.to_string(),
                            rate,
                            channels,
                        });
                    }
                    pw::types::ObjectType::Metadata => {
                        // Only the metadata object named "default" is of interest.
                        let meta_name = props.get("metadata.name").unwrap_or("");
                        if meta_name != "default" {
                            return;
                        }
                        let metadata: pw::metadata::Metadata =
                            match registry_for_global.bind(global) {
                                Ok(m) => m,
                                Err(_) => return,
                            };
                        let state_for_meta = state_for_global.clone();
                        let listener = metadata
                            .add_listener_local()
                            .property(move |_subject, key, _type, value| {
                                catch_unwind(AssertUnwindSafe(|| {
                                    if let (Some(key), Some(value)) = (key, value) {
                                        if key == "default.audio.sink" {
                                            state_for_meta.borrow_mut().default_sink =
                                                extract_json_name(value);
                                        } else if key == "default.audio.source" {
                                            state_for_meta.borrow_mut().default_source =
                                                extract_json_name(value);
                                        }
                                    }
                                }))
                                .ok();
                                0
                            })
                            .register();
                        meta_keep_for_global
                            .borrow_mut()
                            .push((Box::new(metadata), Box::new(listener)));
                    }
                    _ => {}
                }
            }));
        })
        .register();
    // `stage` 0 = waiting for the first sync, 1 = waiting for the second one.
    let done = Rc::new(Cell::new(false));
    let stage = Rc::new(Cell::new(0u8));
    let pending1 = core
        .sync(0)
        .map_err(|e| format!("pipewire sync failed: {e}"))?;
    let pending1 = Rc::new(Cell::new(pending1.seq()));
    let done_for_cb = done.clone();
    let stage_for_cb = stage.clone();
    let pending1_for_cb = pending1.clone();
    let loop_for_cb = main_loop.clone();
    let core_weak = core.downgrade();
    let _core_listener = core
        .add_listener_local()
        .done(move |id, seq| {
            if id != pw::core::PW_ID_CORE {
                return;
            }
            let seq = seq.seq();
            match stage_for_cb.get() {
                0 if seq == pending1_for_cb.get() => {
                    // First round-trip finished: issue the second sync, or give up
                    // and finish now if that is not possible.
                    stage_for_cb.set(1);
                    if let Some(core) = core_weak.upgrade() {
                        match core.sync(0) {
                            Ok(p) => pending1_for_cb.set(p.seq()),
                            Err(_) => {
                                done_for_cb.set(true);
                                loop_for_cb.quit();
                            }
                        }
                    } else {
                        done_for_cb.set(true);
                        loop_for_cb.quit();
                    }
                }
                1 if seq == pending1_for_cb.get() => {
                    done_for_cb.set(true);
                    loop_for_cb.quit();
                }
                _ => {}
            }
        })
        .register();
    // Watchdog: quits the loop once the time budget is used up, so a daemon that
    // never answers cannot block the caller forever.
    let timed_out = Rc::new(Cell::new(false));
    let timed_out_for_timer = timed_out.clone();
    let loop_for_timer = main_loop.clone();
    let watchdog = main_loop.loop_().add_timer(move |_expirations| {
        timed_out_for_timer.set(true);
        loop_for_timer.quit();
    });
    let budget = std::time::Duration::from_millis(
        u64::try_from(ENUMERATE_DEADLINE_MS).unwrap_or(u64::MAX),
    );
    watchdog
        .update_timer(Some(budget), None)
        .into_result()
        .map_err(|e| format!("arm pipewire enumerate timer failed: {e}"))?;
    let started = std::time::Instant::now();
    while !done.get() && !timed_out.get() {
        main_loop.run();
        if started.elapsed().as_millis() >= ENUMERATE_DEADLINE_MS {
            break;
        }
    }
    let state = state.borrow();
    let mut out = Vec::with_capacity(state.nodes.len());
    for n in &state.nodes {
        // Sinks are reported as loopback capture devices, sources as microphones.
        let is_loopback = n.media_class == "Audio/Sink";
        let source_kind = if is_loopback {
            SourceKind::SystemLoopback
        } else {
            SourceKind::Mic
        };
        let is_default = if is_loopback {
            state.default_sink.as_deref() == Some(n.node_name.as_str())
        } else {
            state.default_source.as_deref() == Some(n.node_name.as_str())
        };
        out.push(DeviceInfo {
            id: n.node_name.clone(),
            name: n.description.clone(),
            source_kind,
            // Nodes that do not advertise rate/channels get the native values.
            sample_rate: n.rate.unwrap_or(NATIVE_RATE),
            channels: n.channels.unwrap_or(NATIVE_CHANNELS),
            is_loopback,
            is_default,
        });
    }
    Ok(out)
}
/// Pulls the string value of the first `"name"` key out of a small JSON object
/// such as `{"name":"alsa_output.pci-0000_00_1f.3.analog-stereo"}`.
///
/// This is a lightweight scan, not a JSON parser: it takes the text after the
/// first `"name"`, skips to the first `:`, and returns what sits between the next
/// pair of double quotes. Escaped quotes inside the value are not understood.
///
/// Everything after the first colon is kept, so a name that itself contains `:`
/// is returned in full. The earlier `split(':').nth(1)` cut the text at the
/// second colon and returned `None` for such names.
///
/// Returns `None` when the key, the colon or the quotes are missing, or when the
/// name is empty.
fn extract_json_name(value: &str) -> Option<String> {
    let (_, after_key) = value.split_once("\"name\"")?;
    let (_, after_colon) = after_key.split_once(':')?;
    let (_, after_open_quote) = after_colon.split_once('"')?;
    let (name, _) = after_open_quote.split_once('"')?;
    (!name.is_empty()).then(|| name.to_string())
}
/// Watches PipeWire for device changes on a background thread and queues the
/// resulting `DeviceEvent`s for polling.
pub struct PwDeviceWatcher {
/// Event queue shared with the watch thread; drained by `poll_event`.
events: Arc<Mutex<VecDeque<DeviceEvent>>>,
/// Flag consulted by `stop` to decide whether a `Terminate` still has to be sent.
running: Arc<AtomicBool>,
/// Sender `stop` uses to deliver `Terminate` to the watch thread.
stop_tx: Option<pw::channel::Sender<Terminate>>,
/// Join handle of the watch thread; joined by `stop`.
handle: Option<JoinHandle<()>>,
}
impl PwDeviceWatcher {
    /// Spawns the watch thread and blocks until it reports whether setup succeeded.
    ///
    /// # Errors
    /// `Error::Backend` when the thread cannot be spawned, when setup on the thread
    /// fails, or when the thread exits before signaling readiness.
    pub fn start() -> Result<Self> {
        let events: Arc<Mutex<VecDeque<DeviceEvent>>> = Arc::new(Mutex::new(VecDeque::new()));
        let (stop_tx, stop_rx) = pw::channel::channel::<Terminate>();
        let (ready_tx, ready_rx) = mpsc::channel::<std::result::Result<(), String>>();
        let running = Arc::new(AtomicBool::new(true));
        let thread_events = Arc::clone(&events);
        let handle = thread::Builder::new()
            .name("flexaudio-pw-watch".into())
            .spawn(move || run_watch_loop(thread_events, stop_rx, &ready_tx))
            .map_err(|e| Error::Backend(format!("spawn pipewire watch thread: {e}")))?;

        // A closed channel means the thread ended without ever reporting.
        let outcome = match ready_rx.recv() {
            Ok(reported) => reported,
            Err(_) => Err(
                "pipewire watch thread terminated before signaling readiness".to_string(),
            ),
        };
        match outcome {
            Ok(()) => Ok(Self {
                events,
                running,
                stop_tx: Some(stop_tx),
                handle: Some(handle),
            }),
            Err(msg) => {
                running.store(false, Ordering::SeqCst);
                let _ = handle.join();
                Err(Error::Backend(msg))
            }
        }
    }

    /// Removes and returns the oldest queued event, if there is one.
    ///
    /// Also returns `None` when the queue's mutex is poisoned.
    pub fn poll_event(&mut self) -> Option<DeviceEvent> {
        let next = match self.events.lock() {
            Ok(mut queue) => queue.pop_front(),
            Err(_) => None,
        };
        next
    }

    /// Asks the watch thread to quit (if it is still marked running) and joins it.
    ///
    /// Safe to call more than once.
    pub fn stop(&mut self) {
        let was_running = self.running.swap(false, Ordering::SeqCst);
        if was_running {
            if let Some(tx) = self.stop_tx.take() {
                let _ = tx.send(Terminate);
            }
        }
        if let Some(h) = self.handle.take() {
            let _ = h.join();
        }
        // Drops a leftover sender when the watcher was not marked running.
        self.stop_tx = None;
    }
}
impl Drop for PwDeviceWatcher {
    /// Stops the watcher when it goes out of scope, so the watch thread is
    /// signaled and joined.
    fn drop(&mut self) {
        Self::stop(self);
    }
}
/// State shared by the watcher's registry, metadata and core callbacks.
#[derive(Default)]
struct WatchState {
/// Devices currently known, keyed by registry global id, so a `global_remove`
/// can be turned into a `DeviceEvent::Removed` carrying the device id.
by_global_id: std::collections::HashMap<u32, DeviceInfo>,
/// While false, newly seen devices and default changes are recorded but no events
/// are queued for them.
initial_scan_done: bool,
/// Name last extracted from the `default.audio.sink` metadata value.
default_sink: Option<String>,
/// Name last extracted from the `default.audio.source` metadata value.
default_source: Option<String>,
}
/// Thread body for the device watcher.
///
/// Runs `setup_watch`, reports the outcome on `ready_tx`, and then runs the main
/// loop until a `Terminate` message arrives on `stop_rx`. If setup fails, or
/// nobody is listening for the readiness report any more, it returns without
/// running the loop.
fn run_watch_loop(
    events: Arc<Mutex<VecDeque<DeviceEvent>>>,
    stop_rx: pw::channel::Receiver<Terminate>,
    ready_tx: &mpsc::Sender<std::result::Result<(), String>>,
) {
    let (main_loop, _core, _registry, _listeners) = match setup_watch(events) {
        Err(reason) => {
            let _ = ready_tx.send(Err(reason));
            return;
        }
        Ok(parts) => parts,
    };
    // The stop message is delivered on the loop itself; its handler quits it.
    let quit_handle = main_loop.clone();
    let _attached = stop_rx.attach(main_loop.loop_(), move |_msg| {
        quit_handle.quit();
    });
    let reported = ready_tx.send(Ok(()));
    if reported.is_ok() {
        main_loop.run();
    }
}
/// What `setup_watch` returns on success: the main loop plus the objects
/// `run_watch_loop` keeps bound while the loop runs.
#[allow(clippy::type_complexity)]
type WatchKeep = (
pw::main_loop::MainLoopRc,
pw::core::CoreRc,
pw::registry::RegistryRc,
WatchListeners,
);
/// A bound metadata proxy together with the listener registered on it.
type MetaKeepEntry = (Box<dyn pw::proxy::ProxyT>, Box<dyn pw::proxy::Listener>);
/// Shared, growable store of `MetaKeepEntry` values.
type MetaKeepStore = std::rc::Rc<std::cell::RefCell<Vec<MetaKeepEntry>>>;
/// Listener handles owned on behalf of the watcher; none of the fields are read
/// in the visible part of this file.
struct WatchListeners {
/// Listener for registry `global` / `global_remove` events.
_registry_listener: pw::registry::Listener,
/// Listener for core `done` events.
_core_listener: pw::core::Listener,
/// Bound metadata proxies and their listeners.
_meta_keep: MetaKeepStore,
}
#[allow(clippy::type_complexity)]
fn setup_watch(
events: Arc<Mutex<VecDeque<DeviceEvent>>>,
) -> std::result::Result<WatchKeep, String> {
use std::cell::{Cell, RefCell};
use std::rc::Rc;
pw_init_once();
let main_loop = pw::main_loop::MainLoopRc::new(None)
.map_err(|e| format!("create pipewire main loop failed: {e}"))?;
let context = pw::context::ContextRc::new(&main_loop, None)
.map_err(|e| format!("create pipewire context failed: {e}"))?;
let core = context
.connect_rc(None)
.map_err(|e| format!("connect to pipewire daemon failed (is PipeWire running?): {e}"))?;
let registry = core
.get_registry_rc()
.map_err(|e| format!("get pipewire registry failed: {e}"))?;
let state = Rc::new(RefCell::new(WatchState::default()));
let meta_keep: MetaKeepStore = Rc::new(RefCell::new(Vec::new()));
let state_for_global = state.clone();
let events_for_global = events.clone();
let registry_for_global = registry.clone();
let meta_keep_for_global = meta_keep.clone();
let state_for_remove = state.clone();
let events_for_remove = events.clone();
let _registry_listener = registry
.add_listener_local()
.global(move |global| {
let _ = catch_unwind(AssertUnwindSafe(|| {
let Some(props) = global.props else {
return;
};
match global.type_ {
pw::types::ObjectType::Node => {
let media_class = props.get(*pw::keys::MEDIA_CLASS).unwrap_or("");
if media_class != "Audio/Sink" && media_class != "Audio/Source" {
return;
}
let node_name = props.get(*pw::keys::NODE_NAME).unwrap_or("");
if node_name.is_empty() {
return;
}
let description = props
.get(*pw::keys::NODE_DESCRIPTION)
.filter(|s| !s.is_empty())
.unwrap_or(node_name);
let rate = props.get("audio.rate").and_then(|s| s.parse::<u32>().ok());
let channels = props
.get(*pw::keys::AUDIO_CHANNELS)
.and_then(|s| s.parse::<u16>().ok());
let is_loopback = media_class == "Audio/Sink";
let source_kind = if is_loopback {
SourceKind::SystemLoopback
} else {
SourceKind::Mic
};
let mut st = state_for_global.borrow_mut();
let is_default = if is_loopback {
st.default_sink.as_deref() == Some(node_name)
} else {
st.default_source.as_deref() == Some(node_name)
};
let info = DeviceInfo {
id: node_name.to_string(),
name: description.to_string(),
source_kind,
sample_rate: rate.unwrap_or(NATIVE_RATE),
channels: channels.unwrap_or(NATIVE_CHANNELS),
is_loopback,
is_default,
};
st.by_global_id.insert(global.id, info.clone());
let initial_scan_done = st.initial_scan_done;
drop(st);
if initial_scan_done {
enqueue_event(&events_for_global, DeviceEvent::Added(info));
}
}
pw::types::ObjectType::Metadata => {
let meta_name = props.get("metadata.name").unwrap_or("");
if meta_name != "default" {
return;
}
let metadata: pw::metadata::Metadata =
match registry_for_global.bind(global) {
Ok(m) => m,
Err(_) => return,
};
let state_for_meta = state_for_global.clone();
let events_for_meta = events_for_global.clone();
let listener = metadata
.add_listener_local()
.property(move |_subject, key, _type, value| {
catch_unwind(AssertUnwindSafe(|| {
if let (Some(key), Some(value)) = (key, value) {
let new_name = extract_json_name(value);
let mut st = state_for_meta.borrow_mut();
if key == "default.audio.sink" {
if st.default_sink != new_name {
st.default_sink = new_name.clone();
if st.initial_scan_done {
if let Some(id) = new_name {
drop(st);
enqueue_event(
&events_for_meta,
DeviceEvent::DefaultChanged {
kind: SourceKind::SystemLoopback,
id,
},
);
}
}
}
} else if key == "default.audio.source"
&& st.default_source != new_name
{
st.default_source = new_name.clone();
if st.initial_scan_done {
if let Some(id) = new_name {
drop(st);
enqueue_event(
&events_for_meta,
DeviceEvent::DefaultChanged {
kind: SourceKind::Mic,
id,
},
);
}
}
}
}
}))
.ok();
0
})
.register();
meta_keep_for_global
.borrow_mut()
.push((Box::new(metadata), Box::new(listener)));
}
_ => {}
}
}));
})
.global_remove(move |id| {
let _ = catch_unwind(AssertUnwindSafe(|| {
let removed = state_for_remove.borrow_mut().by_global_id.remove(&id);
if let Some(info) = removed {
enqueue_event(&events_for_remove, DeviceEvent::Removed { id: info.id });
}
}));
})
.register();
let stage = Rc::new(Cell::new(0u8));
let pending = core
.sync(0)
.map_err(|e| format!("pipewire sync failed: {e}"))?;
let pending = Rc::new(Cell::new(pending.seq()));
let stage_for_cb = stage.clone();
let pending_for_cb = pending.clone();
let state_for_done = state.clone();
let loop_for_done = main_loop.clone();
let core_weak = core.downgrade();
let _core_listener = core
.add_listener_local()
.done(move |id, seq| {
if id != pw::core::PW_ID_CORE {
return;
}
let seq = seq.seq();
match stage_for_cb.get() {
0 if seq == pending_for_cb.get() => {
stage_for_cb.set(1);
if let Some(core) = core_weak.upgrade() {
match core.sync(0) {
Ok(p) => pending_for_cb.set(p.seq()),
Err(_) => {
stage_for_cb.set(2);
state_for_done.borrow_mut().initial_scan_done = true;
loop_for_done.quit();
}
}
} else {
stage_for_cb.set(2);
state_for_done.borrow_mut().initial_scan_done = true;
loop_for_done.quit();
}
}
1 if seq == pending_for_cb.get() => {
stage_for_cb.set(2);
state_for_done.borrow_mut().initial_scan_done = true;
loop_for_done.quit();
}
_ => {}
}
})
.register();
while !state.borrow().initial_scan_done {
main_loop.run();
}
Ok((
main_loop,
core,
registry,
WatchListeners {
_registry_listener,
_core_listener,
_meta_keep: meta_keep,
},
))
}
/// Appends `ev` to the shared watcher queue, keeping the queue bounded.
///
/// Before the new event is pushed, the oldest entries are discarded so that
/// the queue holds at most `MAX_WATCH_EVENTS` items afterwards. If the mutex
/// cannot be locked (it is poisoned), the event is dropped without any
/// further action.
fn enqueue_event(events: &Arc<Mutex<VecDeque<DeviceEvent>>>, ev: DeviceEvent) {
    let Ok(mut queue) = events.lock() else {
        return;
    };
    // Number of front entries to evict so that one free slot remains below the cap.
    let excess = (queue.len() + 1).saturating_sub(MAX_WATCH_EVENTS);
    queue.drain(..excess);
    queue.push_back(ev);
}
#[cfg(test)]
mod tests {
    // Unit tests for the PipeWire backends, the device watcher and the pure helpers.
    // Tests that touch a live PipeWire daemon accept `Error::Backend` as a valid outcome.
    use super::*;
    use flexaudio_core::raw_ring::raw_ring;

    /// Compile-time assertion that `T` implements `Send`; does nothing at runtime.
    fn assert_send<T: Send>() {}

    /// Unwraps a result that is only allowed to fail with `Error::Backend`.
    ///
    /// Returns `Some(value)` for `Ok(value)` and `None` for `Error::Backend(_)`;
    /// any other error variant panics and thereby fails the calling test.
    fn ok_unless_backend_err<T>(outcome: std::result::Result<T, Error>) -> Option<T> {
        match outcome {
            Ok(value) => Some(value),
            Err(Error::Backend(_)) => None,
            Err(other) => panic!("unexpected error variant: {other:?}"),
        }
    }

    /// Shared body of the process-backend start tests.
    ///
    /// Starts a `PwProcessBackend` targeting PID `u32::MAX` in the given mode.
    /// When the first start succeeds, a second start must also return `Ok`,
    /// and two consecutive `stop` calls are then issued.
    fn assert_process_start_is_graceful(mode: ProcessMode) {
        let (producer, _consumer) = raw_ring(1 << 16);
        let first_sink = RawSink::new(producer, NATIVE_RATE, NATIVE_CHANNELS);
        let mut backend = PwProcessBackend::new(u32::MAX, mode);
        if ok_unless_backend_err(backend.start(first_sink)).is_some() {
            let (second_producer, _second_consumer) = raw_ring(1 << 16);
            let second_sink = RawSink::new(second_producer, NATIVE_RATE, NATIVE_CHANNELS);
            assert!(backend.start(second_sink).is_ok());
            backend.stop();
            backend.stop();
        }
    }

    /// `PwSystemBackend` must be `Send`.
    #[test]
    fn backend_is_send() {
        assert_send::<PwSystemBackend>();
    }

    /// The system backend reports the native format constants (48 kHz, 2 channels)
    /// and echoes back the `exclude_self` flag it was constructed with.
    #[test]
    fn native_format_is_48k_stereo() {
        let backend = PwSystemBackend::new(false, None);
        assert_eq!(backend.native_format(), (NATIVE_RATE, NATIVE_CHANNELS));
        assert_eq!(backend.native_format(), (48_000, 2));
        assert!(!backend.exclude_self());
    }

    /// Calling `stop` on a backend that was never started must not panic, even twice.
    #[test]
    fn stop_without_start_is_safe() {
        let mut backend = PwSystemBackend::new(false, None);
        for _ in 0..2 {
            backend.stop();
        }
    }

    /// Starting with `exclude_self = true` either succeeds or fails with `Error::Backend`.
    #[test]
    fn system_exclude_self_is_graceful() {
        let (producer, _consumer) = raw_ring(1 << 16);
        let sink = RawSink::new(producer, NATIVE_RATE, NATIVE_CHANNELS);
        let mut backend = PwSystemBackend::new(true, None);
        assert!(backend.exclude_self());
        if ok_unless_backend_err(backend.start(sink)).is_some() {
            backend.stop();
        }
    }

    /// `extract_json_name` yields the non-empty `"name"` value and `None` otherwise.
    #[test]
    fn extract_json_name_parses_default_metadata_value() {
        let cases: [(&str, Option<&str>); 5] = [
            (
                r#"{"name":"alsa_output.pci-0000_00_1f.3.analog-stereo"}"#,
                Some("alsa_output.pci-0000_00_1f.3.analog-stereo"),
            ),
            (r#"{ "name" : "foo.bar" }"#, Some("foo.bar")),
            (r#"{"other":"x"}"#, None),
            (r#"{"name":""}"#, None),
            ("not json", None),
        ];
        for (input, expected) in cases {
            assert_eq!(extract_json_name(input).as_deref(), expected);
        }
    }

    /// `list_devices` must return `Ok`; every returned entry must be internally
    /// consistent, and at most one default may exist per direction.
    #[test]
    fn list_devices_is_graceful_without_pipewire() {
        let devices = list_devices().expect("list_devices は Err を返さない設計");
        for device in &devices {
            assert!(!device.id.is_empty(), "id(=node.name)は空でない");
            match device.source_kind {
                SourceKind::SystemLoopback => assert!(device.is_loopback, "Sink はループバック"),
                SourceKind::Mic => assert!(!device.is_loopback, "Source はループバックでない"),
                other => panic!("想定外の source_kind: {other:?}"),
            }
            assert!(device.sample_rate > 0);
            assert!(device.channels > 0);
        }
        // Counts the default devices whose loopback flag equals `loopback`.
        let count_defaults = |loopback: bool| {
            devices
                .iter()
                .filter(|d| d.is_default && d.is_loopback == loopback)
                .count()
        };
        let default_loopback = count_defaults(true);
        let default_mic = count_defaults(false);
        assert!(default_loopback <= 1);
        assert!(default_mic <= 1);
    }

    /// A plain system-backend start either succeeds or fails with `Error::Backend`.
    #[test]
    fn start_is_graceful_without_pipewire() {
        let (producer, _consumer) = raw_ring(1 << 16);
        let sink = RawSink::new(producer, NATIVE_RATE, NATIVE_CHANNELS);
        let mut backend = PwSystemBackend::new(false, None);
        if ok_unless_backend_err(backend.start(sink)).is_some() {
            backend.stop();
        }
    }

    /// Starting with a device id that does not exist must fail with
    /// `Error::DeviceNotFound` or `Error::Backend`; success is a test failure.
    #[test]
    fn start_with_unknown_device_id_is_not_found_or_backend() {
        let (producer, _consumer) = raw_ring(1 << 16);
        let sink = RawSink::new(producer, NATIVE_RATE, NATIVE_CHANNELS);
        let unknown_id = "flexaudio-no-such-sink-zzz".to_string();
        let mut backend = PwSystemBackend::new(false, Some(unknown_id));
        match backend.start(sink) {
            Err(Error::DeviceNotFound) | Err(Error::Backend(_)) => {}
            Err(other) => panic!("unexpected error variant: {other:?}"),
            Ok(()) => {
                backend.stop();
                panic!("start should not succeed for an unknown device_id");
            }
        }
    }

    /// Manual smoke test: capture for 500 ms and expect at least one sample.
    #[test]
    #[ignore = "requires a running PipeWire session with audio playing (desktop/laptop)"]
    fn capture_smoke() {
        let (producer, mut consumer) = raw_ring(1 << 18);
        let sink = RawSink::new(producer, NATIVE_RATE, NATIVE_CHANNELS);
        let mut backend = PwSystemBackend::new(false, None);
        backend
            .start(sink)
            .expect("start should succeed on a PipeWire desktop");
        thread::sleep(std::time::Duration::from_millis(500));
        backend.stop();
        let mut samples = vec![0.0f32; 1920];
        let got = consumer.pop_slice(&mut samples);
        assert!(
            got > 0,
            "expected captured samples from the default sink monitor"
        );
    }

    /// `PwProcessBackend` must be `Send`.
    #[test]
    fn process_backend_is_send() {
        assert_send::<PwProcessBackend>();
    }

    /// The process backend reports the native format constants and echoes back
    /// the PID and mode it was constructed with.
    #[test]
    fn process_native_format_is_48k_stereo() {
        let excluding = PwProcessBackend::new(4242, ProcessMode::Exclude);
        assert_eq!(excluding.native_format(), (NATIVE_RATE, NATIVE_CHANNELS));
        assert_eq!(excluding.native_format(), (48_000, 2));
        assert_eq!(excluding.target_pid(), 4242);
        assert_eq!(excluding.mode(), ProcessMode::Exclude);
        let including = PwProcessBackend::new(1, ProcessMode::Include);
        assert_eq!(including.mode(), ProcessMode::Include);
    }

    /// Calling `stop` on a never-started process backend must not panic, even twice.
    #[test]
    fn process_stop_without_start_is_safe() {
        let mut backend = PwProcessBackend::new(1234, ProcessMode::Include);
        for _ in 0..2 {
            backend.stop();
        }
    }

    /// Exclude mode: start succeeds (and tolerates a restart) or fails with `Error::Backend`.
    #[test]
    fn process_exclude_mode_is_graceful() {
        assert_process_start_is_graceful(ProcessMode::Exclude);
    }

    /// `resolve_node_pid` prefers the node's own PID and otherwise looks the
    /// owning client id up in the client → PID table.
    #[test]
    fn resolve_node_pid_via_client_table() {
        use std::collections::HashMap;
        let client_owned = NodeEntry {
            owning_client_id: Some(60),
            app_pid: None,
        };
        let mut pid_by_client: HashMap<u32, u32> = HashMap::new();
        assert_eq!(
            resolve_node_pid(&client_owned, &pid_by_client),
            None,
            "client.id に対応する Client がまだ無ければ PID 未解決"
        );
        pid_by_client.insert(60, 13394);
        assert_eq!(
            resolve_node_pid(&client_owned, &pid_by_client),
            Some(13394),
            "client.id=60 → Client の pid=13394 を二段で解決"
        );

        // A node with neither a client id nor its own PID stays unresolved.
        let orphan = NodeEntry {
            owning_client_id: None,
            app_pid: None,
        };
        assert_eq!(resolve_node_pid(&orphan, &pid_by_client), None);

        // A node carrying its own PID resolves even with an empty client table.
        let self_identified = NodeEntry {
            owning_client_id: Some(99),
            app_pid: Some(424242),
        };
        let no_clients: HashMap<u32, u32> = HashMap::new();
        assert_eq!(
            resolve_node_pid(&self_identified, &no_clients),
            Some(424242),
            "ノード自身の PID を優先して直解決"
        );

        // Adding a second client must not disturb the first node's resolution.
        let second_client_owned = NodeEntry {
            owning_client_id: Some(61),
            app_pid: None,
        };
        assert_eq!(resolve_node_pid(&second_client_owned, &pid_by_client), None);
        pid_by_client.insert(61, 555);
        assert_eq!(
            resolve_node_pid(&second_client_owned, &pid_by_client),
            Some(555)
        );
        assert_eq!(resolve_node_pid(&client_owned, &pid_by_client), Some(13394));
    }

    /// `pair_ports` pairs output and input ports for several channel layouts.
    #[test]
    fn pair_ports_maps_channels() {
        // Builds an owned `(port id, channel name)` list from literals.
        let ports = |spec: &[(u32, &str)]| -> Vec<(u32, String)> {
            spec.iter()
                .map(|&(id, channel)| (id, channel.to_string()))
                .collect()
        };

        let stereo_out = ports(&[(10, "FL"), (11, "FR")]);
        let stereo_in = ports(&[(20, "FL"), (21, "FR")]);
        let mut matched = pair_ports(&stereo_out, &stereo_in);
        matched.sort();
        assert_eq!(matched, vec![(10, 20), (11, 21)], "FL→FL / FR→FR");

        let reversed_in = ports(&[(21, "FR"), (20, "FL")]);
        let mut matched = pair_ports(&stereo_out, &reversed_in);
        matched.sort();
        assert_eq!(matched, vec![(10, 20), (11, 21)], "並び逆でも FL→FL / FR→FR");

        let mono_out = ports(&[(30, "MONO")]);
        let wide_in = ports(&[(40, "FL"), (41, "FR")]);
        let mut matched = pair_ports(&mono_out, &wide_in);
        matched.sort();
        assert_eq!(matched, vec![(30, 40), (30, 41)], "モノは FL/FR へ複製");

        // Ports without channel names: two links, each input used at most once.
        let unnamed_out = ports(&[(50, ""), (51, "")]);
        let unnamed_in = ports(&[(60, ""), (61, "")]);
        let matched = pair_ports(&unnamed_out, &unnamed_in);
        assert_eq!(matched.len(), 2);
        let used_inputs: std::collections::HashSet<u32> =
            matched.iter().map(|(_, input)| *input).collect();
        assert_eq!(used_inputs.len(), 2, "各入力ポートは高々 1 回");

        // An empty side yields no pairs.
        assert!(pair_ports(&[], &stereo_in).is_empty());
        assert!(pair_ports(&stereo_out, &[]).is_empty());

        let left_only_out = ports(&[(70, "FL")]);
        let right_only_in = ports(&[(80, "FR")]);
        let matched = pair_ports(&left_only_out, &right_only_in);
        assert_eq!(matched, vec![(70, 80)], "出力1ポートは残り入力へ複製");
    }

    /// Include mode: start succeeds (and tolerates a restart) or fails with `Error::Backend`.
    #[test]
    fn process_start_is_graceful_without_pipewire() {
        assert_process_start_is_graceful(ProcessMode::Include);
    }

    /// Manual smoke test: capture from the PID named by `FLEXAUDIO_TEST_PID`
    /// for 800 ms and expect at least one sample. Returns early when the
    /// variable is not set.
    #[test]
    #[ignore = "requires a running PipeWire session with the target PID playing audio (set FLEXAUDIO_TEST_PID)"]
    fn process_capture_smoke() {
        let Ok(raw_pid) = std::env::var("FLEXAUDIO_TEST_PID") else {
            eprintln!("FLEXAUDIO_TEST_PID 未指定のためスキップ");
            return;
        };
        let pid: u32 = raw_pid.parse().expect("FLEXAUDIO_TEST_PID は u32");
        let (producer, mut consumer) = raw_ring(1 << 18);
        let sink = RawSink::new(producer, NATIVE_RATE, NATIVE_CHANNELS);
        let mut backend = PwProcessBackend::new(pid, ProcessMode::Include);
        backend
            .start(sink)
            .expect("start should succeed on a PipeWire desktop");
        thread::sleep(std::time::Duration::from_millis(800));
        backend.stop();
        let mut samples = vec![0.0f32; 1920];
        let got = consumer.pop_slice(&mut samples);
        assert!(
            got > 0,
            "expected captured samples link-factory-linked from PID {pid}"
        );
    }

    /// `PwDeviceWatcher` must be `Send`.
    #[test]
    fn watcher_is_send() {
        assert_send::<PwDeviceWatcher>();
    }

    /// Starting the watcher either succeeds (then one poll and a stop are issued)
    /// or fails with `Error::Backend`.
    #[test]
    fn watcher_graceful_without_pipewire() {
        if let Some(mut watcher) = ok_unless_backend_err(PwDeviceWatcher::start()) {
            let _ = watcher.poll_event();
            watcher.stop();
        }
    }

    /// Stopping a started watcher twice must not panic.
    #[test]
    fn watcher_double_stop_is_safe() {
        if let Ok(mut watcher) = PwDeviceWatcher::start() {
            for _ in 0..2 {
                watcher.stop();
            }
        }
    }

    /// Events come out of the queue in the order `enqueue_event` put them in.
    #[test]
    fn enqueue_and_drain_is_fifo() {
        let events: Arc<Mutex<VecDeque<DeviceEvent>>> = Arc::new(Mutex::new(VecDeque::new()));
        let mic = DeviceInfo {
            id: "mic.a".into(),
            name: "Mic A".into(),
            source_kind: SourceKind::Mic,
            sample_rate: NATIVE_RATE,
            channels: NATIVE_CHANNELS,
            is_loopback: false,
            is_default: false,
        };
        let default_changed = || DeviceEvent::DefaultChanged {
            kind: SourceKind::SystemLoopback,
            id: "sink.x".into(),
        };
        enqueue_event(&events, DeviceEvent::Added(mic.clone()));
        enqueue_event(&events, DeviceEvent::Removed { id: "mic.a".into() });
        enqueue_event(&events, default_changed());

        let drained: Vec<DeviceEvent> = events.lock().unwrap().drain(..).collect();
        assert_eq!(
            drained,
            vec![
                DeviceEvent::Added(mic),
                DeviceEvent::Removed { id: "mic.a".into() },
                default_changed(),
            ]
        );
    }

    /// Pushing more than `MAX_WATCH_EVENTS` events keeps the queue at the cap
    /// and discards the oldest entries first.
    #[test]
    fn enqueue_event_caps_queue_and_drops_oldest() {
        let events: Arc<Mutex<VecDeque<DeviceEvent>>> = Arc::new(Mutex::new(VecDeque::new()));
        let total = MAX_WATCH_EVENTS + 10;
        (0..total).for_each(|i| {
            enqueue_event(
                &events,
                DeviceEvent::Removed {
                    id: format!("n{i}"),
                },
            );
        });
        let queue = events.lock().unwrap();
        assert_eq!(queue.len(), MAX_WATCH_EVENTS, "キュー長は上限で頭打ち");
        match queue.front().unwrap() {
            DeviceEvent::Removed { id: oldest } => {
                assert_eq!(oldest, "n10", "最古から捨てられる")
            }
            other => panic!("想定外イベント: {other:?}"),
        }
        match queue.back().unwrap() {
            DeviceEvent::Removed { id: newest } => {
                assert_eq!(newest, &format!("n{}", total - 1))
            }
            other => panic!("想定外イベント: {other:?}"),
        }
    }
}