termwaves 0.1.0

Real-time audio capture and visualization (FFT, oscilloscope, spectrum) driven by PipeWire
use std::convert::TryInto;
use std::mem;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};

use pipewire as pw;
use pw::{properties::properties, spa};
use spa::param::format::{MediaSubtype, MediaType};
use spa::param::format_utils;
use spa::pod::Pod;

use ringbuf::HeapRb;
use ringbuf::traits::{Consumer, Producer as _, Split};

const RING_CAPACITY: usize = 96_000;

type RingProducer = ringbuf::wrap::caching::Caching<Arc<HeapRb<f32>>, true, false>;
type RingConsumer = ringbuf::wrap::caching::Caching<Arc<HeapRb<f32>>, false, true>;

#[derive(Default)]
struct SharedFormat {
    channels: AtomicU32,
    rate: AtomicU32,
}

pub struct CaptureHandle {
    consumer: RingConsumer,
    format: Arc<SharedFormat>,
}

impl CaptureHandle {
    pub fn channels(&self) -> u32 {
        self.format.channels.load(Ordering::Relaxed)
    }

    pub fn sample_rate(&self) -> u32 {
        self.format.rate.load(Ordering::Relaxed)
    }

    pub fn is_ready(&self) -> bool {
        self.channels() > 0
    }

    pub fn read(&mut self, out: &mut [f32]) -> usize {
        self.consumer.pop_slice(out)
    }
}

struct CaptureState {
    info: spa::param::audio::AudioInfoRaw,
    format: Arc<SharedFormat>,
    producer: RingProducer,
    scratch: Vec<f32>,
}

pub fn start() -> CaptureHandle {
    let ring = Arc::new(HeapRb::<f32>::new(RING_CAPACITY));
    let (producer, consumer) = ring.split();

    let format = Arc::new(SharedFormat::default());

    let thread_format = format.clone();
    std::thread::Builder::new()
        .name("termwaves-capture".into())
        .spawn(move || {
            if let Err(e) = run_capture(producer, thread_format) {
                eprintln!("capture thread error: {e}");
            }
        })
        .expect("failed to spawn capture thread");

    CaptureHandle { consumer, format }
}

fn run_capture(
    producer: RingProducer,
    format: Arc<SharedFormat>,
) -> Result<(), Box<dyn std::error::Error>> {
    pw::init();

    let mainloop = pw::main_loop::MainLoopRc::new(None)?;
    let context = pw::context::ContextRc::new(&mainloop, None)?;
    let core = context.connect_rc(None)?;

    let mut props = properties! {
        *pw::keys::MEDIA_TYPE => "Audio",
        *pw::keys::MEDIA_CATEGORY => "Capture",
        *pw::keys::MEDIA_ROLE => "Music",
    };
    props.insert(*pw::keys::STREAM_CAPTURE_SINK, "true");

    let stream = pw::stream::StreamBox::new(&core, "termwaves-capture", props)?;

    let state = CaptureState {
        info: Default::default(),
        format,
        producer,
        scratch: Vec::with_capacity(8192),
    };

    let _listener = stream
        .add_local_listener_with_user_data(state)
        .param_changed(|_, state, id, param| {
            let Some(param) = param else { return };
            if id != pw::spa::param::ParamType::Format.as_raw() {
                return;
            }
            let (media_type, media_subtype) = match format_utils::parse_format(param) {
                Ok(v) => v,
                Err(_) => return,
            };
            if media_type != MediaType::Audio || media_subtype != MediaSubtype::Raw {
                return;
            }
            state
                .info
                .parse(param)
                .expect("failed to parse AudioInfoRaw");
            state
                .format
                .channels
                .store(state.info.channels(), Ordering::Relaxed);
            state
                .format
                .rate
                .store(state.info.rate(), Ordering::Relaxed);
        })
        .process(|stream, state| {
            let Some(mut buffer) = stream.dequeue_buffer() else {
                return;
            };
            let datas = buffer.datas_mut();
            if datas.is_empty() {
                return;
            }
            let data = &mut datas[0];
            let n_bytes = data.chunk().size() as usize;
            let Some(bytes) = data.data() else { return };

            state.scratch.clear();
            state.scratch.extend(
                bytes[..n_bytes]
                    .chunks_exact(mem::size_of::<f32>())
                    .map(|b| f32::from_le_bytes(b.try_into().unwrap())),
            );
            let _ = state.producer.push_slice(&state.scratch);
        })
        .register()?;

    let mut audio_info = spa::param::audio::AudioInfoRaw::new();
    audio_info.set_format(spa::param::audio::AudioFormat::F32LE);
    let obj = pw::spa::pod::Object {
        type_: pw::spa::utils::SpaTypes::ObjectParamFormat.as_raw(),
        id: pw::spa::param::ParamType::EnumFormat.as_raw(),
        properties: audio_info.into(),
    };
    let values: Vec<u8> = pw::spa::pod::serialize::PodSerializer::serialize(
        std::io::Cursor::new(Vec::new()),
        &pw::spa::pod::Value::Object(obj),
    )?
    .0
    .into_inner();
    let mut params = [Pod::from_bytes(&values).unwrap()];

    stream.connect(
        spa::utils::Direction::Input,
        None,
        pw::stream::StreamFlags::AUTOCONNECT
            | pw::stream::StreamFlags::MAP_BUFFERS
            | pw::stream::StreamFlags::RT_PROCESS,
        &mut params,
    )?;

    mainloop.run();
    Ok(())
}