use std::fmt;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::Arc;
use pipewire as pw;
use pw::properties::properties;
use pw::spa;
use pw::spa::sys as spa_sys;
use crate::backend::{AudioBackend, BackendType};
use crate::buffer::IoRingBuffer;
use crate::config::AudioConfig;
use crate::error::{IoError, IoResult};
use crate::output_window::{OutputSlot, OutputWindow};
use crate::PwBuffers;
use rill_core::io::IoBackend;
/// Length, in `f32` samples, of the stack scratch arrays used to stage
/// interleaved capture data on its way into and out of the input ring buffer.
const MAX_BLOCK_SAMPLES: usize = 8192;
/// Heap cell behind a [`CbSlot`]: the optional process callback.
type CbStorage = Option<Box<dyn Fn()>>;

/// Copyable handle to a heap-allocated, optionally-set process callback.
///
/// The handle stores the address of a leaked `Box<CbStorage>` as a plain
/// integer, so copies of the handle all refer to the same cell. The cell is
/// only released by an explicit [`CbSlot::drop_box`] call.
#[derive(Copy, Clone)]
struct CbSlot(usize);

// NOTE(review): these impls let the handle cross threads even though the
// stored closure is not required to be `Send`/`Sync`, and nothing here
// synchronizes `set` against `call`. Callers must provide that ordering.
unsafe impl Send for CbSlot {}
unsafe impl Sync for CbSlot {}

impl CbSlot {
    /// Allocates an empty cell and returns a handle to it.
    fn new() -> Self {
        let cell: Box<CbStorage> = Box::new(None);
        Self(Box::into_raw(cell) as usize)
    }

    /// Reinterprets the stored address as a pointer to the callback cell.
    fn cell(&self) -> *mut CbStorage {
        self.0 as *mut CbStorage
    }

    /// Installs `cb`, dropping any callback that was installed before.
    ///
    /// # Safety
    /// The cell must still be allocated (no prior `drop_box`) and no other
    /// access to it may be in progress.
    unsafe fn set(&self, cb: Box<dyn Fn()>) {
        *self.cell() = Some(cb);
    }

    /// Invokes the installed callback; does nothing when none is installed.
    ///
    /// # Safety
    /// The cell must still be allocated and must not be mutated concurrently.
    unsafe fn call(&self) {
        if let Some(cb) = &*self.cell() {
            cb();
        }
    }

    /// Frees the cell together with any installed callback.
    ///
    /// # Safety
    /// Must be called at most once across all copies of the handle, and no
    /// copy may be used afterwards.
    unsafe fn drop_box(&self) {
        drop(Box::from_raw(self.cell()));
    }
}
/// Audio backend that exchanges `f32` samples with PipeWire streams.
pub struct PipewireBackend {
/// Configuration supplied to `new`.
config: AudioConfig,
/// Ring of interleaved capture samples: written by the input stream's
/// process callback in `run`, drained by the `read` methods.
input_buffer: Arc<IoRingBuffer>,
/// Slot holding the user process callback installed via
/// `set_process_callback` and invoked from the stream callbacks in `run`.
process_cb: CbSlot,
/// Counter incremented when the input stream has no buffer to dequeue.
xruns: Arc<AtomicU32>,
/// Backend-owned running flag: set by `run`/`start`, cleared by `stop` and
/// on drop. Distinct from the `running` handle passed into `run`.
running: Arc<AtomicBool>,
/// Slot through which `run` exposes the current output window to `write`.
output_slot: OutputSlot,
/// Channel count stored by the input stream's format callback; 0 until a
/// format has been parsed.
negotiated_input_channels: Arc<AtomicU32>,
/// Sample rate stored by the input stream's format callback; 0 until a
/// format has been parsed.
negotiated_input_rate: Arc<AtomicU32>,
}
impl fmt::Debug for PipewireBackend {
    /// Renders the backend as `PipewireBackend { config: .. }`.
    ///
    /// Only the configuration is shown; the buffers, flags and callback slot
    /// are left out of the output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut repr = f.debug_struct("PipewireBackend");
        repr.field("config", &self.config);
        repr.finish()
    }
}
impl PipewireBackend {
    /// Creates a backend for `config`.
    ///
    /// No PipeWire objects are created here; that happens in `run`. The input
    /// ring is sized to `buffer_size * max(input_channels, 1) * 32` samples.
    ///
    /// # Errors
    /// Returns [`IoError::Unsupported`] when built for a non-Linux target.
    pub fn new(config: AudioConfig) -> IoResult<Self> {
        if !cfg!(target_os = "linux") {
            return Err(IoError::Unsupported(
                "PipeWire is only available on Linux".into(),
            ));
        }
        // Widen to `usize` before multiplying: doing the product in the
        // config's narrower integer type could wrap silently in release
        // builds for large buffer sizes.
        let ring_samples = (config.buffer_size as usize)
            .saturating_mul(config.input_channels.max(1) as usize)
            .saturating_mul(32);
        Ok(Self {
            input_buffer: Arc::new(IoRingBuffer::new(ring_samples)),
            config,
            process_cb: CbSlot::new(),
            xruns: Arc::new(AtomicU32::new(0)),
            running: Arc::new(AtomicBool::new(false)),
            output_slot: OutputSlot::new(),
            negotiated_input_channels: Arc::new(AtomicU32::new(0)),
            negotiated_input_rate: Arc::new(AtomicU32::new(0)),
        })
    }

    /// Returns the capture sample rate stored by the input stream's format
    /// callback, or 0 if no format has been parsed yet.
    pub fn negotiated_rate(&self) -> u32 {
        self.negotiated_input_rate.load(Ordering::Relaxed)
    }

    /// Returns the capture channel count stored by the input stream's format
    /// callback, or 0 if no format has been parsed yet.
    pub fn negotiated_channels(&self) -> u32 {
        self.negotiated_input_channels.load(Ordering::Relaxed)
    }

    /// Builds a new [`PwBuffers`] on every call.
    ///
    /// `input` shares this backend's capture ring; `output` is a freshly
    /// created ring constructed with capacity 0.
    pub fn rings(&self) -> Arc<PwBuffers> {
        Arc::new(PwBuffers {
            input: self.input_buffer.clone(),
            output: Arc::new(IoRingBuffer::new(0)),
        })
    }
}
impl IoBackend<f32> for PipewireBackend {
/// Installs `cb` as the process callback, replacing any previous one.
fn set_process_callback(&self, cb: Box<dyn Fn()>) {
    // SAFETY: the slot was allocated in `new` and is only freed in `Drop`,
    // so it is live for as long as `self` is.
    // NOTE(review): nothing synchronizes this with `call` inside the stream
    // callbacks; presumably this must be invoked before `run` — confirm.
    unsafe { self.process_cb.set(cb) }
}
/// Pulls captured audio out of the input ring and de-interleaves it into
/// `channels`.
///
/// Only the first two destination channels are filled. Channel 0 receives
/// source channel 0; channel 1 receives source channel 1, or a copy of
/// channel 0 when the source is mono. Any further destination channels are
/// left untouched.
///
/// Returns the number of frames written to each filled channel. This can be
/// less than requested when the ring holds less data or the request exceeds
/// the internal scratch buffer.
fn read(&self, channels: &mut [&mut [f32]]) -> usize {
    // Bound the request by the shorter of the channels we write to, so a
    // short second channel cannot be indexed out of range.
    let frames = channels
        .iter()
        .take(2)
        .map(|c| c.len())
        .min()
        .unwrap_or(0);
    if frames == 0 {
        return 0;
    }

    // Prefer the negotiated layout; fall back to the configured one (at
    // least mono) until a format has been negotiated.
    let src_channels = match self.negotiated_input_channels.load(Ordering::Relaxed) {
        0 => self.config.input_channels.max(1) as usize,
        c => c as usize,
    };

    let mut scratch = [0.0f32; MAX_BLOCK_SAMPLES];
    // Request whole frames only: clamping the sample count directly could
    // consume a partial frame and shift the channel alignment of every
    // subsequent read.
    let want_frames = frames.min(MAX_BLOCK_SAMPLES / src_channels);
    let want_samples = want_frames * src_channels;
    let got_samples = self.input_buffer.read(&mut scratch[..want_samples]);
    let got_frames = (got_samples / src_channels).min(frames);

    let interleaved = &scratch[..got_frames * src_channels];
    for (dst_idx, dst) in channels.iter_mut().take(2).enumerate() {
        // Mono sources feed both destinations from source channel 0.
        let src_idx = dst_idx.min(src_channels - 1);
        for (sample, frame) in dst[..got_frames]
            .iter_mut()
            .zip(interleaved.chunks_exact(src_channels))
        {
            *sample = frame[src_idx];
        }
    }
    got_frames
}
/// Interleaves up to two source channels into the current output window as
/// stereo frames (`[L, R, L, R, ...]`), starting at the window's beginning.
///
/// The frame count is limited by the window capacity and by the length of
/// the first channel. When the second channel is missing or shorter than the
/// first, the right sample of the affected frames is written as silence so
/// that no stale buffer content is played.
///
/// Returns the number of frames written, or 0 when no output window is
/// currently installed or `channels` is empty.
fn write(&self, channels: &[&[f32]]) -> usize {
    let left = match channels.first().copied() {
        Some(ch) => ch,
        None => return 0,
    };
    let right = channels.get(1).copied();

    // SAFETY(review): relies on `OutputSlot::as_mut`'s contract; presumably
    // the window is only installed while the output process callback in
    // `run` is executing — confirm against `OutputSlot`.
    let win = match unsafe { self.output_slot.as_mut() } {
        Some(w) => w,
        None => return 0,
    };

    let frames = win.capacity().min(left.len().saturating_mul(2)) / 2;
    let dst = win.as_mut_slice();
    for (i, &l) in left.iter().enumerate().take(frames) {
        dst[i * 2] = l;
        dst[i * 2 + 1] = right.and_then(|r| r.get(i)).copied().unwrap_or(0.0);
    }
    frames
}
fn run(&self, running: Arc<AtomicBool>) -> Result<(), String> {
let process_cb = self.process_cb;
let oslot = self.output_slot.clone();
let ibuf = self.input_buffer.clone();
let xruns = self.xruns.clone();
let sample_rate = self.config.sample_rate;
let out_channels = self.config.output_channels;
let in_channels = self.config.input_channels;
let out_device = self.config.output_device.clone();
let in_device = self.config.input_device.clone();
pw::init();
let mainloop =
pw::main_loop::MainLoopRc::new(None).map_err(|e| format!("PW MainLoopRc::new: {e}"))?;
let context = pw::context::ContextRc::new(&mainloop, None)
.map_err(|e| format!("PW ContextRc::new: {e}"))?;
let core = context
.connect_rc(None)
.map_err(|e| format!("PW core.connect_rc: {e}"))?;
let _out_stream;
let _out_listener;
let ml = mainloop.clone();
let ml2 = ml.clone();
let running2 = running.clone();
if out_channels > 0 {
let out_chan = out_channels;
let out_node = out_device.as_deref().unwrap_or("rill-output");
let out_desc = format!("Rill Audio Output ({out_node})");
let mut out_props = properties! {
*pw::keys::MEDIA_TYPE => "Audio",
*pw::keys::MEDIA_ROLE => "Music",
*pw::keys::MEDIA_CATEGORY => "Playback",
*pw::keys::NODE_NAME => out_node,
*pw::keys::NODE_DESCRIPTION => out_desc.as_str(),
};
out_props.insert("audio.channels", out_chan.to_string());
let stream =
pw::stream::StreamBox::new(&core, &format!("{out_node}-output"), out_props)
.map_err(|e| format!("PW StreamBox output: {e}"))?;
let out_running = running.clone();
let out_ml = ml.clone();
let listener = stream
.add_local_listener_with_user_data(())
.process(move |s, _| {
if !out_running.load(Ordering::Acquire) {
out_ml.quit();
return;
}
let mut buf = match s.dequeue_buffer() {
Some(b) => b,
None => return,
};
let datas = buf.datas_mut();
if datas.is_empty() {
return;
}
let data = &mut datas[0];
let slice = match data.data() {
Some(s) => s,
None => return,
};
let stride = out_channels as usize * 4;
let n_frames = slice.len() / stride;
let chunk_bytes = 512 * 4;
let mut offset = 0usize;
while offset + chunk_bytes <= slice.len() {
let chunk = &mut slice[offset..offset + chunk_bytes];
unsafe {
oslot.set(OutputWindow::new(chunk.as_mut_ptr() as *mut f32, 512));
process_cb.call();
oslot.clear();
}
offset += chunk_bytes;
}
if offset < slice.len() {
slice[offset..].fill(0);
}
let ck = data.chunk_mut();
*ck.offset_mut() = 0;
*ck.stride_mut() = stride as i32;
*ck.size_mut() = (stride * n_frames) as u32;
})
.register()
.map_err(|e| format!("PW output listener: {e}"))?;
_out_listener = Some(listener);
let mut audio_info = spa::param::audio::AudioInfoRaw::new();
audio_info.set_format(spa::param::audio::AudioFormat::F32LE);
audio_info.set_rate(sample_rate);
audio_info.set_channels(out_chan);
let mut position = [0; spa::param::audio::MAX_CHANNELS];
if out_chan >= 1 {
position[0] = spa_sys::SPA_AUDIO_CHANNEL_FL;
}
if out_chan >= 2 {
position[1] = spa_sys::SPA_AUDIO_CHANNEL_FR;
}
audio_info.set_position(position);
let params_bytes: Vec<u8> = spa::pod::serialize::PodSerializer::serialize(
std::io::Cursor::new(Vec::new()),
&spa::pod::Value::Object(spa::pod::Object {
type_: spa_sys::SPA_TYPE_OBJECT_Format,
id: spa_sys::SPA_PARAM_EnumFormat,
properties: audio_info.into(),
}),
)
.unwrap()
.0
.into_inner();
let mut out_params = [spa::pod::Pod::from_bytes(¶ms_bytes).unwrap()];
if let Err(e) = stream.connect(
spa::utils::Direction::Output,
None,
pw::stream::StreamFlags::AUTOCONNECT
| pw::stream::StreamFlags::MAP_BUFFERS
| pw::stream::StreamFlags::RT_PROCESS,
&mut out_params,
) {
return Err(format!("PW output connect: {e}"));
}
_out_stream = Some(stream);
} else {
_out_listener = None;
_out_stream = None;
}
self.running.store(true, Ordering::Release);
let in_node = in_device.as_deref().unwrap_or("rill-input");
let in_desc = format!("Rill Audio Input ({in_node})");
let mut in_props = properties! {
*pw::keys::MEDIA_TYPE => "Audio",
*pw::keys::MEDIA_ROLE => "Music",
*pw::keys::MEDIA_CATEGORY => "Capture",
*pw::keys::NODE_NAME => in_node,
*pw::keys::NODE_DESCRIPTION => in_desc.as_str(),
};
in_props.insert("audio.channels", in_channels.to_string());
in_props.insert(
*pw::keys::NODE_LATENCY,
format!("{}/{}", self.config.buffer_size, sample_rate),
);
let in_stream =
match pw::stream::StreamBox::new(&core, &format!("{in_node}-input"), in_props) {
Ok(s) => Some(s),
Err(e) => {
log::warn!("PW StreamBox input: {e} — capture disabled");
None
}
};
let _in_listener;
if let Some(ref in_st) = in_stream {
let mut in_ai = spa::param::audio::AudioInfoRaw::new();
in_ai.set_format(spa::param::audio::AudioFormat::F32LE);
let in_params_bytes: Vec<u8> = spa::pod::serialize::PodSerializer::serialize(
std::io::Cursor::new(Vec::new()),
&spa::pod::Value::Object(spa::pod::Object {
type_: spa_sys::SPA_TYPE_OBJECT_Format,
id: spa_sys::SPA_PARAM_EnumFormat,
properties: in_ai.into(),
}),
)
.unwrap()
.0
.into_inner();
let mut in_params = [spa::pod::Pod::from_bytes(&in_params_bytes).unwrap()];
let buf_frames = self.config.buffer_size as usize;
let nch_fmt = self.negotiated_input_channels.clone();
let nrate_fmt = self.negotiated_input_rate.clone();
let nch_proc = self.negotiated_input_channels.clone();
let _nrate_proc = self.negotiated_input_rate.clone();
let listener = in_st
.add_local_listener_with_user_data(())
.param_changed(move |_stream, _data, id, param| {
if id == spa_sys::SPA_PARAM_Format {
if let Some(param) = param {
let mut ai = spa::param::audio::AudioInfoRaw::new();
if ai.parse(param).is_ok() {
nch_fmt.store(ai.channels(), std::sync::atomic::Ordering::Relaxed);
nrate_fmt.store(ai.rate(), std::sync::atomic::Ordering::Relaxed);
}
}
}
})
.process(move |stream, _| {
if !running2.load(Ordering::Acquire) {
ml2.quit();
return;
}
let mut buf = match stream.dequeue_buffer() {
Some(b) => b,
None => {
xruns.fetch_add(1, Ordering::Relaxed);
return;
}
};
let datas = buf.datas_mut();
if datas.is_empty() {
return;
}
let data = &mut datas[0];
let actual_channels = {
let c = nch_proc.load(std::sync::atomic::Ordering::Relaxed);
if c > 0 {
c as usize
} else {
in_channels as usize
}
};
let (_chunk_offset, chunk_size) = {
let ck = data.chunk_mut();
(*ck.offset_mut(), *ck.size_mut())
};
if chunk_size == 0 {
return;
}
let slice = match data.data() {
Some(s) => s,
None => return,
};
let stride = actual_channels * 4;
let n_samp = (chunk_size as usize / stride) * actual_channels;
let len = n_samp.min(MAX_BLOCK_SAMPLES);
let mut temp = [0.0f32; MAX_BLOCK_SAMPLES];
for (i, item) in temp.iter_mut().enumerate().take(len) {
let off = i * 4;
if off + 4 <= slice.len() {
let mut bytes = [0u8; 4];
bytes.copy_from_slice(&slice[off..off + 4]);
*item = f32::from_le_bytes(bytes);
}
}
let block_samps = buf_frames * actual_channels;
ibuf.write(&temp[..len]);
if out_channels == 0 {
while ibuf.len() >= block_samps {
unsafe {
process_cb.call();
}
}
}
})
.register()
.map_err(|e| format!("PW input listener: {e}"))?;
_in_listener = Some(listener);
if let Err(e) = in_st.connect(
spa::utils::Direction::Input,
None,
pw::stream::StreamFlags::AUTOCONNECT
| pw::stream::StreamFlags::MAP_BUFFERS
| pw::stream::StreamFlags::DRIVER,
&mut in_params,
) {
log::warn!("PW input connect: disabled — {e}");
}
} else {
_in_listener = None;
}
mainloop.run();
if let Some(ref s) = in_stream {
let _ = s.disconnect();
}
Ok(())
}
/// Clears the backend's own `running` flag (release ordering) and returns
/// `Ok(())`. This flag is separate from the `running` handle passed to `run`.
fn stop(&self) -> Result<(), String> {
self.running.store(false, Ordering::Release);
Ok(())
}
}
impl AudioBackend for PipewireBackend {
    /// Identifies this backend as [`BackendType::PipeWire`].
    fn backend_type(&self) -> BackendType {
        BackendType::PipeWire
    }

    /// Returns the configuration this backend was created with.
    fn config(&self) -> &AudioConfig {
        &self.config
    }

    /// Returns the configuration for in-place modification.
    fn config_mut(&mut self) -> &mut AudioConfig {
        &mut self.config
    }

    /// No-op; PipeWire objects are created in `run`.
    fn init(&mut self) -> IoResult<()> {
        Ok(())
    }

    /// Sets the backend's own running flag.
    fn start(&mut self) -> IoResult<()> {
        self.running.store(true, Ordering::Release);
        Ok(())
    }

    /// Clears the backend's own running flag.
    fn stop(&mut self) -> IoResult<()> {
        self.running.store(false, Ordering::Release);
        Ok(())
    }

    /// Drains interleaved capture samples from the input ring into `buffer`
    /// and returns how many samples were copied.
    fn read(&mut self, buffer: &mut [f32]) -> IoResult<usize> {
        Ok(self.input_buffer.read(buffer))
    }

    /// Reports the whole buffer as written without storing it.
    ///
    /// Playback data reaches PipeWire through the output window used by the
    /// process callback, not through this method.
    fn write(&mut self, buffer: &[f32]) -> IoResult<usize> {
        Ok(buffer.len())
    }

    /// Returns the number of times the input stream had no buffer available.
    fn xruns(&self) -> u32 {
        self.xruns.load(Ordering::Acquire)
    }

    /// Returns the duration of one configured block
    /// (`buffer_size / sample_rate`), truncated to whole microseconds.
    ///
    /// A sample rate of 0 yields [`std::time::Duration::ZERO`] rather than
    /// the saturated, effectively infinite value the division would produce.
    fn latency(&self) -> std::time::Duration {
        let rate = self.config.sample_rate;
        if rate == 0 {
            return std::time::Duration::ZERO;
        }
        std::time::Duration::from_micros(
            (1_000_000.0 * self.config.buffer_size as f64 / rate as f64) as u64,
        )
    }

    /// Returns a fixed single-entry list; devices are not enumerated.
    fn list_input_devices(&self) -> Vec<String> {
        vec!["default".to_string()]
    }

    /// Returns a fixed single-entry list; devices are not enumerated.
    fn list_output_devices(&self) -> Vec<String> {
        vec!["default".to_string()]
    }
}
impl Drop for PipewireBackend {
    /// Clears the running flag and frees the process-callback slot.
    fn drop(&mut self) {
        self.running.store(false, Ordering::Release);
        // SAFETY: the slot was allocated in `new` and this is the only place
        // it is freed. `run` borrows `self`, so the copies it hands to the
        // stream callbacks cannot outlive this point.
        unsafe { self.process_cb.drop_box() };
    }
}