use std::rc::Rc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use pipewire::{
core::CoreRc,
properties::properties,
stream::{StreamListener, StreamRc},
};
use libspa::{
param::audio::{AudioFormat, AudioInfoRaw},
param::format::{MediaSubtype, MediaType},
param::{format_utils, ParamType},
pod::{Object, Pod},
};
use pulp::{Arch, Simd, WithSimd};
use crate::atomic_f32::AtomicF32;
use crate::wirehose::event_sender::EventSender;
use crate::wirehose::{ObjectId, StateEvent};
pub trait PeakProcessor: Send + Sync {
fn process_peak(
&self,
current_peak: f32,
previous_peak: f32,
sample_count: u32,
sample_rate: u32,
) -> f32;
}
impl<F> PeakProcessor for F
where
F: Fn(f32, f32, u32, u32) -> f32 + Send + Sync,
{
fn process_peak(
&self,
current_peak: f32,
previous_peak: f32,
sample_count: u32,
sample_rate: u32,
) -> f32 {
self(current_peak, previous_peak, sample_count, sample_rate)
}
}
#[derive(Default)]
pub struct StreamData {
format: AudioInfoRaw,
peaks: Arc<[AtomicF32]>,
peaks_dirty: Arc<AtomicBool>,
peak_processor: Option<Arc<dyn PeakProcessor>>,
buffers_seen: u8,
}
fn find_peak(samples: &[f32]) -> f32 {
struct Max<'a>(&'a [f32]);
impl WithSimd for Max<'_> {
type Output = f32;
#[inline(always)]
fn with_simd<S: Simd>(self, simd: S) -> Self::Output {
let v = self.0;
let (head, tail) = S::as_simd_f32s(v);
let mut head_max = simd.splat_f32s(0.0);
for x in head {
head_max = simd.max_f32s(head_max, simd.abs_f32s(*x));
}
let head_max = head_max;
let mut tail_max = simd.reduce_max_f32s(head_max);
for x in tail {
tail_max = tail_max.max(x.abs());
}
tail_max
}
}
Arch::new().dispatch(Max(samples))
}
pub fn capture_node(
core: &CoreRc,
sender: &Rc<EventSender>,
object_id: ObjectId,
serial: &str,
capture_sink: bool,
peaks_dirty: Arc<AtomicBool>,
peak_processor: Option<Arc<dyn PeakProcessor>>,
) -> Option<(StreamRc, StreamListener<StreamData>)> {
let mut props = properties! {
*pipewire::keys::TARGET_OBJECT => String::from(serial),
*pipewire::keys::STREAM_MONITOR => "true",
*pipewire::keys::NODE_NAME => "wiremix-capture",
};
if capture_sink {
props.insert(*pipewire::keys::STREAM_CAPTURE_SINK, "true");
}
let data = StreamData {
format: Default::default(),
peaks: Default::default(),
peaks_dirty,
peak_processor,
buffers_seen: 0,
};
let stream = StreamRc::new(core.clone(), "wiremix-capture", props).ok()?;
let listener = stream
.add_local_listener_with_user_data(data)
.param_changed({
let sender_weak = Rc::downgrade(sender);
move |_stream, user_data, id, param| {
let Some(param) = param else {
return;
};
if id != ParamType::Format.as_raw() {
return;
}
let (media_type, media_subtype) =
match format_utils::parse_format(param) {
Ok(v) => v,
Err(_) => return,
};
if media_type != MediaType::Audio
|| media_subtype != MediaSubtype::Raw
{
return;
}
let _ = user_data.format.parse(param);
user_data.peaks = (0..user_data.format.channels())
.map(|_| AtomicF32::new(0.0))
.collect::<Arc<[_]>>();
let Some(sender) = sender_weak.upgrade() else {
return;
};
sender.send(StateEvent::NodeStreamStarted {
object_id,
rate: user_data.format.rate(),
peaks: Arc::clone(&user_data.peaks),
});
}
})
.process({
let sender_weak = Rc::downgrade(sender);
move |stream, user_data| {
let Some(mut buffer) = stream.dequeue_buffer() else {
return;
};
user_data.buffers_seen =
user_data.buffers_seen.saturating_add(1);
let Some(sender) = sender_weak.upgrade() else {
return;
};
if user_data.buffers_seen == 1 {
return;
}
let datas = buffer.datas_mut();
if datas.is_empty() {
return;
}
let n_channels = user_data.format.channels() as usize;
let mut n_samples = 0u32;
for c in 0..n_channels {
let Some(data) = datas.get_mut(c) else {
user_data.peaks[c].store(0.0);
continue;
};
let chunk_size = data.chunk().size() as usize;
let Some(samples) = data.data() else {
user_data.peaks[c].store(0.0);
continue;
};
let samples: &[f32] =
bytemuck::cast_slice(&samples[..chunk_size]);
if c == 0 {
n_samples = samples.len() as u32;
}
let new_peak = find_peak(samples);
if let Some(peak_processor) = &user_data.peak_processor {
let _ = user_data.peaks[c].fetch_update(|current| {
let current = if user_data.buffers_seen == 2 {
new_peak
} else {
current
};
Some(peak_processor.process_peak(
new_peak,
current,
n_samples,
user_data.format.rate(),
))
});
} else {
user_data.peaks[c].store(new_peak);
}
}
if !user_data.peaks_dirty.swap(true, Ordering::Relaxed) {
sender.send(StateEvent::NodePeaksDirty { object_id });
}
}
})
.register()
.ok()?;
let mut audio_info = AudioInfoRaw::new();
audio_info.set_format(AudioFormat::F32P);
let pod_object = Object {
type_: pipewire::spa::utils::SpaTypes::ObjectParamFormat.as_raw(),
id: ParamType::EnumFormat.as_raw(),
properties: audio_info.into(),
};
let values: Vec<u8> =
pipewire::spa::pod::serialize::PodSerializer::serialize(
std::io::Cursor::new(Vec::new()),
&pipewire::spa::pod::Value::Object(pod_object),
)
.ok()?
.0
.into_inner();
let mut params = [Pod::from_bytes(&values)?];
stream
.connect(
libspa::utils::Direction::Input,
None,
pipewire::stream::StreamFlags::AUTOCONNECT
| pipewire::stream::StreamFlags::MAP_BUFFERS,
&mut params,
)
.ok()?;
Some((stream, listener))
}