use super::analyzer::{create_shared_analyzer, SharedAnalyzer, SpectrumData};
use pipewire as pw;
use pw::spa::param::audio::AudioInfoRaw;
use pw::spa::pod::Pod;
use std::mem;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::Arc;
use std::thread;
pub struct PipeWireCapture {
analyzer: SharedAnalyzer,
active: Arc<AtomicBool>,
_thread: thread::JoinHandle<()>,
}
impl PipeWireCapture {
pub fn new() -> Option<Self> {
let analyzer = create_shared_analyzer();
let active = Arc::new(AtomicBool::new(true));
let analyzer_clone = analyzer.clone();
let active_clone = active.clone();
let thread = thread::spawn(move || {
if let Err(e) = run_pipewire_capture(analyzer_clone, active_clone) {
eprintln!("[audio-viz] PipeWire capture error: {:?}", e);
}
});
std::thread::sleep(std::time::Duration::from_millis(300));
if active.load(Ordering::Relaxed) {
Some(Self {
analyzer,
active,
_thread: thread,
})
} else {
None
}
}
pub fn get_spectrum(&self) -> Option<SpectrumData> {
if !self.active.load(Ordering::Relaxed) {
return None;
}
if let Ok(mut analyzer) = self.analyzer.lock() {
Some(analyzer.process())
} else {
None
}
}
pub fn is_active(&self) -> bool {
self.active.load(Ordering::Relaxed)
}
}
impl Drop for PipeWireCapture {
fn drop(&mut self) {
self.active.store(false, Ordering::Relaxed);
}
}
#[derive(Default)]
struct StreamData {
channels: AtomicU32,
format: std::sync::Mutex<AudioInfoRaw>,
}
fn run_pipewire_capture(
analyzer: SharedAnalyzer,
active: Arc<AtomicBool>,
) -> Result<(), pw::Error> {
let mainloop = pw::main_loop::MainLoopBox::new(None)?;
let context = pw::context::ContextBox::new(mainloop.loop_(), None)?;
let core = context.connect(None)?;
let props = pw::properties::properties! {
*pw::keys::MEDIA_TYPE => "Audio",
*pw::keys::MEDIA_CATEGORY => "Capture",
*pw::keys::MEDIA_ROLE => "Music",
*pw::keys::STREAM_CAPTURE_SINK => "true",
};
let stream = pw::stream::StreamBox::new(&core, "spotatui-audio-viz", props)?;
let active_clone = active.clone();
let analyzer_clone = analyzer.clone();
let _listener = stream
.add_local_listener_with_user_data(StreamData::default())
.param_changed(|_, user_data, id, param| {
let Some(param) = param else { return };
if id != pw::spa::param::ParamType::Format.as_raw() {
return;
}
if let Ok(mut format) = user_data.format.lock() {
if format.parse(param).is_ok() {
let channels = format.channels();
user_data.channels.store(channels, Ordering::Relaxed);
}
}
})
.process(move |stream, user_data| {
if !active_clone.load(Ordering::Relaxed) {
return;
}
let Some(mut buffer) = stream.dequeue_buffer() else {
return;
};
let datas = buffer.datas_mut();
if datas.is_empty() {
return;
}
let data = &mut datas[0];
let n_channels = user_data.channels.load(Ordering::Relaxed).max(1) as usize;
let chunk = data.chunk();
let n_bytes = chunk.size() as usize;
if n_bytes == 0 {
return;
}
if let Some(samples_bytes) = data.data() {
let valid_bytes = &samples_bytes[..n_bytes.min(samples_bytes.len())];
let mono_samples: Vec<f32> = valid_bytes
.chunks_exact(mem::size_of::<f32>())
.enumerate()
.filter_map(|(i, chunk)| {
if i % n_channels == 0 {
Some(f32::from_le_bytes(chunk.try_into().unwrap_or([0; 4])))
} else {
None
}
})
.collect();
if !mono_samples.is_empty() {
if let Ok(mut analyzer) = analyzer_clone.lock() {
analyzer.push_samples(&mono_samples);
}
}
}
})
.register()?;
let mut audio_info = AudioInfoRaw::new();
audio_info.set_format(pw::spa::param::audio::AudioFormat::F32LE);
audio_info.set_rate(48000);
audio_info.set_channels(2);
let obj = pw::spa::pod::Object {
type_: pw::spa::utils::SpaTypes::ObjectParamFormat.as_raw(),
id: pw::spa::param::ParamType::EnumFormat.as_raw(),
properties: audio_info.into(),
};
let values: Vec<u8> = pw::spa::pod::serialize::PodSerializer::serialize(
std::io::Cursor::new(Vec::new()),
&pw::spa::pod::Value::Object(obj),
)
.map_err(|_| pw::Error::CreationFailed)?
.0
.into_inner();
let mut params = [Pod::from_bytes(&values).unwrap()];
stream.connect(
pw::spa::utils::Direction::Input,
None,
pw::stream::StreamFlags::AUTOCONNECT
| pw::stream::StreamFlags::MAP_BUFFERS
| pw::stream::StreamFlags::RT_PROCESS,
&mut params,
)?;
mainloop.run();
Ok(())
}