// nebulus 0.1.29
//
// Low-latency native OpenIPC FPV ground station built with egui
use std::{
    collections::VecDeque,
    sync::{
        atomic::{AtomicU64, Ordering},
        Arc, Mutex,
    },
};

use cpal::{
    traits::{DeviceTrait, HostTrait, StreamTrait},
    BufferSize, FromSample, SampleFormat, SizedSample, Stream, StreamConfig, SupportedBufferSize,
};
use openipc_core::rtp::RtpHeader;
use ropus::{Channels, DecodeMode, Decoder};

use crate::model::AudioStats;

/// Opus-over-RTP audio playback pipeline: decodes incoming packets, converts
/// them to the output device's rate and channel count, and hands the samples
/// to a CPAL output stream through a shared queue.
pub(crate) struct AudioPlayer {
    /// Opus decoder, created for the source sample rate and channel layout.
    decoder: Decoder,
    /// Sample rate of the incoming stream, as passed to `new`.
    source_rate: u32,
    /// Channel count of the incoming stream (`new` clamps it to 1 or 2).
    source_channels: usize,
    /// Sample rate reported by the device's default output configuration.
    output_rate: u32,
    /// Channel count reported by the device's default output configuration.
    output_channels: usize,
    /// Linear gain in `0.0..=1.0`, derived from a 0-100 volume setting.
    volume: f32,
    /// Reusable scratch buffer holding the decoder's output for one packet.
    decoded: Vec<f32>,
    /// Reusable scratch buffer holding one packet after resampling/remixing.
    mixed: Vec<f32>,
    /// Interleaved output samples shared with the stream's data callback.
    queue: Arc<Mutex<VecDeque<f32>>>,
    /// Number of times the stream's error callback has fired.
    stream_errors: Arc<AtomicU64>,
    /// Packet, byte, frame and error counters reported by `stats`.
    stats: AudioStats,
    /// Never read; owned so the output stream lives as long as the player.
    _stream: Stream,
}

impl AudioPlayer {
    /// Opens the default output device and prepares an Opus decoder for a
    /// stream with the given `sample_rate` and `channels` (clamped to 1 or 2).
    ///
    /// `volume` is a percentage; values above 100 are treated as 100.
    ///
    /// # Errors
    ///
    /// Returns a description of the failure when the decoder cannot be
    /// created, no default output device or configuration is available, the
    /// stream cannot be built with either buffer size, or playback fails to
    /// start.
    pub(crate) fn new(sample_rate: u32, channels: u8, volume: u8) -> Result<Self, String> {
        let source_channels = usize::from(channels.clamp(1, 2));
        let decoder = Decoder::new(sample_rate, opus_channels(source_channels))
            .map_err(|error| format!("Opus decoder init failed: {error}"))?;

        let host = cpal::default_host();
        let Some(device) = host.default_output_device() else {
            return Err("no default audio output device".to_owned());
        };
        let supported = match device.default_output_config() {
            Ok(supported) => supported,
            Err(error) => return Err(format!("audio output config unavailable: {error}")),
        };

        // Read everything needed from the supported configuration before it
        // is converted into the stream configuration.
        let sample_format = supported.sample_format();
        let output_rate = supported.sample_rate();
        let output_channels = usize::from(supported.channels());
        let requested_frames = match supported.buffer_size() {
            SupportedBufferSize::Unknown => 256,
            SupportedBufferSize::Range { min, max } => 256_u32.clamp(*min, *max),
        };

        // CPAL's `realtime` feature requests AAudio's low-latency mode. A
        // fixed callback size also avoids oversized host defaults.
        let mut config: StreamConfig = supported.into();
        config.buffer_size = BufferSize::Fixed(requested_frames);

        let queue = Arc::new(Mutex::new(VecDeque::new()));
        let stream_errors = Arc::new(AtomicU64::new(0));

        let stream = match build_stream_for_format(
            &device,
            &config,
            sample_format,
            &queue,
            &stream_errors,
        ) {
            Ok(stream) => stream,
            Err(low_latency_error) => {
                // The fixed buffer size was rejected; retry with the host default.
                config.buffer_size = BufferSize::Default;
                build_stream_for_format(&device, &config, sample_format, &queue, &stream_errors)
                    .map_err(|fallback_error| {
                        format!(
                            "low-latency audio output failed ({low_latency_error}); default buffer failed ({fallback_error})"
                        )
                    })?
            }
        };
        if let Err(error) = stream.play() {
            return Err(format!("audio output start failed: {error}"));
        }

        let stats = AudioStats {
            enabled: true,
            supported: true,
            decoder_name: "ropus / CPAL".to_owned(),
            ..AudioStats::default()
        };
        Ok(Self {
            decoder,
            source_rate: sample_rate,
            source_channels,
            output_rate,
            output_channels,
            volume: f32::from(volume.min(100)) / 100.0,
            decoded: Vec::new(),
            mixed: Vec::new(),
            queue,
            stream_errors,
            stats,
            _stream: stream,
        })
    }

    /// Decodes one RTP packet carrying Opus audio and queues the result for
    /// playback.
    ///
    /// The packet and byte counters are updated before parsing, so malformed
    /// packets are still counted. If adding the new samples would push the
    /// queue past about 40 ms of output audio, the oldest queued samples are
    /// discarded first; the new packet itself is always queued in full.
    ///
    /// # Errors
    ///
    /// Returns an error string if the RTP header is invalid, Opus decoding
    /// fails, or the queue mutex is poisoned.
    pub(crate) fn push_rtp(&mut self, packet: &[u8]) -> Result<(), String> {
        self.stats.packets = self.stats.packets.saturating_add(1);
        self.stats.bytes = self.stats.bytes.saturating_add(packet.len() as u64);

        let header = match RtpHeader::parse(packet) {
            Ok(header) => header,
            Err(error) => return Err(format!("invalid audio RTP: {error:?}")),
        };
        let payload = header.payload(packet);

        // Give the decoder room for 120 ms of audio (presumably the longest
        // packet it can produce), then shrink to what was actually decoded.
        let frame_capacity = (self.source_rate as usize * 120) / 1_000;
        self.decoded
            .resize(frame_capacity * self.source_channels, 0.0);
        let decoded_frames = self
            .decoder
            .decode_float(payload, &mut self.decoded, DecodeMode::Normal)
            .map_err(|error| format!("Opus decode failed: {error}"))?;
        self.decoded
            .truncate(decoded_frames * self.source_channels);

        resample_and_remix_into(
            &self.decoded,
            self.source_rate,
            self.source_channels,
            self.output_rate,
            self.output_channels,
            self.volume,
            &mut self.mixed,
        );

        // Queue budget: 40 ms worth of interleaved output samples.
        let capacity = self.output_rate as usize * self.output_channels * 40 / 1_000;
        let samples_per_second = self.output_rate as f64 * self.output_channels as f64;

        let mut pending = self.queue.lock().map_err(|_| "audio queue poisoned")?;
        let excess = pending
            .len()
            .saturating_add(self.mixed.len())
            .saturating_sub(capacity);
        // Only already-queued samples can be dropped, never the new ones.
        let stale = excess.min(pending.len());
        if stale > 0 {
            pending.drain(..stale);
        }
        pending.extend(self.mixed.iter().copied());

        self.stats.decoded_frames = self.stats.decoded_frames.saturating_add(1);
        self.stats.queued_ms = pending.len() as f64 * 1_000.0 / samples_per_second;
        Ok(())
    }

    /// Counts one error reported by the caller; the counter saturates rather
    /// than wrapping.
    pub(crate) fn record_error(&mut self) {
        let errors = &mut self.stats.errors;
        *errors = errors.saturating_add(1);
    }

    /// Sets the playback gain from a percentage; values above 100 are capped
    /// at 100. The gain is applied to packets decoded after this call.
    pub(crate) fn set_volume(&mut self, volume: u8) {
        let percent = volume.min(100);
        self.volume = f32::from(percent) / 100.0;
    }

    /// Returns a snapshot of the counters, with stream callback errors added
    /// to the error total and the queue depth refreshed from the live queue.
    pub(crate) fn stats(&self) -> AudioStats {
        let stream_errors = self.stream_errors.load(Ordering::Relaxed);
        let mut snapshot = self.stats.clone();
        snapshot.errors = snapshot.errors.saturating_add(stream_errors);
        // If the lock is poisoned, the depth recorded by the last push is kept.
        if let Ok(pending) = self.queue.lock() {
            let samples_per_second = self.output_rate as f64 * self.output_channels as f64;
            snapshot.queued_ms = pending.len() as f64 * 1_000.0 / samples_per_second;
        }
        snapshot
    }
}

fn build_stream_for_format(
    device: &cpal::Device,
    config: &StreamConfig,
    format: SampleFormat,
    queue: &Arc<Mutex<VecDeque<f32>>>,
    errors: &Arc<AtomicU64>,
) -> Result<Stream, String> {
    match format {
        SampleFormat::I8 => build_stream::<i8>(device, config, queue, errors),
        SampleFormat::I16 => build_stream::<i16>(device, config, queue, errors),
        SampleFormat::I24 => build_stream::<cpal::I24>(device, config, queue, errors),
        SampleFormat::I32 => build_stream::<i32>(device, config, queue, errors),
        SampleFormat::I64 => build_stream::<i64>(device, config, queue, errors),
        SampleFormat::U8 => build_stream::<u8>(device, config, queue, errors),
        SampleFormat::U16 => build_stream::<u16>(device, config, queue, errors),
        SampleFormat::U32 => build_stream::<u32>(device, config, queue, errors),
        SampleFormat::U64 => build_stream::<u64>(device, config, queue, errors),
        SampleFormat::F32 => build_stream::<f32>(device, config, queue, errors),
        SampleFormat::F64 => build_stream::<f64>(device, config, queue, errors),
        format => Err(format!("unsupported audio sample format {format}")),
    }
}

/// Creates an output stream whose data callback moves samples from `queue`
/// into the device buffer, converting each `f32` to the sample type `T`.
///
/// The data callback uses `try_lock` rather than `lock`: when the lock
/// cannot be taken the whole buffer is filled with silence, and when the
/// queue runs out the remaining samples are silence too. Every invocation of
/// the stream's error callback increments `errors`.
fn build_stream<T>(
    device: &cpal::Device,
    config: &StreamConfig,
    queue: &Arc<Mutex<VecDeque<f32>>>,
    errors: &Arc<AtomicU64>,
) -> Result<Stream, String>
where
    T: SizedSample + FromSample<f32>,
{
    let samples = Arc::clone(queue);
    let error_count = Arc::clone(errors);
    device
        .build_output_stream(
            *config,
            move |output: &mut [T], _| {
                let Ok(mut pending) = samples.try_lock() else {
                    for slot in output.iter_mut() {
                        *slot = T::from_sample(0.0);
                    }
                    return;
                };
                output.fill_with(|| T::from_sample(pending.pop_front().unwrap_or(0.0)));
            },
            move |_| {
                error_count.fetch_add(1, Ordering::Relaxed);
            },
            None,
        )
        .map_err(|error| format!("audio output creation failed: {error}"))
}

/// Maps a channel count to the decoder's channel layout: exactly 2 selects
/// stereo, every other count selects mono.
fn opus_channels(channels: usize) -> Channels {
    match channels {
        2 => Channels::Stereo,
        _ => Channels::Mono,
    }
}

/// Test-only convenience wrapper around `resample_and_remix_into` that
/// returns the converted samples in a freshly allocated vector.
#[cfg(test)]
fn resample_and_remix(
    input: &[f32],
    input_rate: u32,
    input_channels: usize,
    output_rate: u32,
    output_channels: usize,
    volume: f32,
) -> Vec<f32> {
    let mut converted = Vec::new();
    resample_and_remix_into(
        input,
        input_rate,
        input_channels,
        output_rate,
        output_channels,
        volume,
        &mut converted,
    );
    converted
}

/// Converts interleaved `input` samples to the output rate and channel count,
/// scales them by `volume`, and writes the interleaved result to `output`.
///
/// `output` is cleared first, so it only ever holds the result of this call.
/// Resampling is linear interpolation between neighbouring input frames; the
/// last input frame is held at the end of the buffer. Output channels beyond
/// the input channel count repeat the last input channel, and input channels
/// beyond the output channel count are dropped. Trailing samples that do not
/// form a complete input frame are ignored.
///
/// A zero rate or channel count leaves `output` empty instead of causing a
/// division-by-zero panic or producing NaN samples.
fn resample_and_remix_into(
    input: &[f32],
    input_rate: u32,
    input_channels: usize,
    output_rate: u32,
    output_channels: usize,
    volume: f32,
    output: &mut Vec<f32>,
) {
    output.clear();
    // Every division below uses one of these values as a divisor, so a
    // degenerate configuration has no meaningful output.
    if input_rate == 0 || output_rate == 0 || input_channels == 0 || output_channels == 0 {
        return;
    }
    let input_frames = input.len() / input_channels;
    if input_frames == 0 {
        return;
    }
    // Always emit at least one frame so a very short packet is not lost.
    let output_frames =
        (input_frames as u64 * u64::from(output_rate) / u64::from(input_rate)).max(1) as usize;
    output.reserve(output_frames * output_channels);

    let last_frame = input_frames - 1;
    let last_channel = input_channels - 1;
    for output_frame in 0..output_frames {
        // Fractional position of this output frame on the input timeline.
        let source = output_frame as f64 * input_rate as f64 / output_rate as f64;
        let left = source.floor() as usize;
        let right = (left + 1).min(last_frame);
        let mix = (source - left as f64) as f32;
        output.extend((0..output_channels).map(|output_channel| {
            let input_channel = output_channel.min(last_channel);
            let a = input[left * input_channels + input_channel];
            let b = input[right * input_channels + input_channel];
            (a + (b - a) * mix) * volume
        }));
    }
}

#[cfg(test)]
mod tests {
    use super::resample_and_remix;

    /// With equal input and output rates, upmixing mono to stereo copies
    /// every sample to both channels.
    #[test]
    fn mono_is_remixed_to_stereo() {
        let mono = [0.5_f32, -0.5];
        let stereo = resample_and_remix(&mono, 48_000, 1, 48_000, 2, 1.0);
        assert_eq!(stereo, [0.5, 0.5, -0.5, -0.5]);
    }
}