wiremix 0.11.0

A TUI mixer for PipeWire
use std::mem;
use std::rc::Rc;

use pipewire::{
    core::Core,
    properties::properties,
    stream::{Stream, StreamListener},
};

use libspa::{
    param::audio::{AudioFormat, AudioInfoRaw},
    param::format::{MediaSubtype, MediaType},
    param::{format_utils, ParamType},
    pod::{Object, Pod},
};

use crate::wirehose::event_sender::EventSender;
use crate::wirehose::{ObjectId, StateEvent};

#[derive(Default)]
pub struct StreamData {
    format: AudioInfoRaw,
}

pub fn capture_node(
    core: &Core,
    sender: &Rc<EventSender>,
    object_id: ObjectId,
    serial: &str,
    capture_sink: bool,
) -> Option<(Rc<Stream>, StreamListener<StreamData>)> {
    let mut props = properties! {
        *pipewire::keys::TARGET_OBJECT => String::from(serial),
        *pipewire::keys::STREAM_MONITOR => "true",
        *pipewire::keys::NODE_NAME => "wiremix-capture",
    };
    if capture_sink {
        props.insert(*pipewire::keys::STREAM_CAPTURE_SINK, "true");
    }

    let data = StreamData {
        format: Default::default(),
    };

    let stream = Stream::new(core, "wiremix-capture", props).ok()?;
    let stream = Rc::new(stream);
    let listener = stream
        .add_local_listener_with_user_data(data)
        .param_changed({
            let sender_weak = Rc::downgrade(sender);

            move |_stream, user_data, id, param| {
                // NULL means to clear the format
                let Some(param) = param else {
                    return;
                };
                if id != ParamType::Format.as_raw() {
                    return;
                }

                let (media_type, media_subtype) =
                    match format_utils::parse_format(param) {
                        Ok(v) => v,
                        Err(_) => return,
                    };

                // only accept raw audio
                if media_type != MediaType::Audio
                    || media_subtype != MediaSubtype::Raw
                {
                    return;
                }

                // call a helper function to parse the format for us.
                let _ = user_data.format.parse(param);

                let Some(sender) = sender_weak.upgrade() else {
                    return;
                };
                sender.send(StateEvent::NodeRate {
                    object_id,
                    rate: user_data.format.rate(),
                });
            }
        })
        .process({
            let sender_weak = Rc::downgrade(sender);

            move |stream, user_data| {
                let Some(mut buffer) = stream.dequeue_buffer() else {
                    return;
                };
                let Some(sender) = sender_weak.upgrade() else {
                    return;
                };
                let datas = buffer.datas_mut();
                if datas.is_empty() {
                    return;
                }

                let data = &mut datas[0];
                let n_channels = user_data.format.channels();
                let n_samples =
                    data.chunk().size() / (mem::size_of::<f32>() as u32);

                if let Some(samples) = data.data() {
                    let mut peaks = Vec::new();
                    for c in 0..n_channels {
                        let mut max: f32 = 0.0;
                        for n in (c..n_samples).step_by(n_channels as usize) {
                            let start = n as usize * mem::size_of::<f32>();
                            let end = start + mem::size_of::<f32>();
                            let chan = &samples[start..end];
                            let f = f32::from_le_bytes(
                                chan.try_into().unwrap_or([0; 4]),
                            );
                            max = max.max(f.abs());
                        }

                        peaks.push(max);
                    }
                    sender.send(StateEvent::NodePeaks {
                        object_id,
                        peaks,
                        samples: n_samples,
                    });
                }
            }
        })
        .register()
        .ok()?;

    let mut audio_info = AudioInfoRaw::new();
    audio_info.set_format(AudioFormat::F32LE);
    let pod_object = Object {
        type_: pipewire::spa::utils::SpaTypes::ObjectParamFormat.as_raw(),
        id: ParamType::EnumFormat.as_raw(),
        properties: audio_info.into(),
    };
    let values: Vec<u8> =
        pipewire::spa::pod::serialize::PodSerializer::serialize(
            std::io::Cursor::new(Vec::new()),
            &pipewire::spa::pod::Value::Object(pod_object),
        )
        .ok()?
        .0
        .into_inner();

    let mut params = [Pod::from_bytes(&values)?];

    stream
        .connect(
            libspa::utils::Direction::Input,
            None,
            pipewire::stream::StreamFlags::AUTOCONNECT
                | pipewire::stream::StreamFlags::MAP_BUFFERS,
            &mut params,
        )
        .ok()?;

    Some((stream, listener))
}