nebulus 0.1.27

Low-latency native OpenIPC FPV ground station built with egui
use std::{
    collections::VecDeque,
    sync::{
        atomic::{AtomicU64, Ordering},
        Arc, Mutex,
    },
};

use cpal::{
    traits::{DeviceTrait, HostTrait, StreamTrait},
    FromSample, SampleFormat, SizedSample, Stream, StreamConfig,
};
use openipc_core::rtp::RtpHeader;
use ropus::{Channels, DecodeMode, Decoder};

use crate::model::AudioStats;

pub(crate) struct AudioPlayer {
    decoder: Decoder,
    source_rate: u32,
    source_channels: usize,
    output_rate: u32,
    output_channels: usize,
    volume: f32,
    queue: Arc<Mutex<VecDeque<f32>>>,
    stream_errors: Arc<AtomicU64>,
    stats: AudioStats,
    _stream: Stream,
}

impl AudioPlayer {
    pub(crate) fn new(sample_rate: u32, channels: u8, volume: u8) -> Result<Self, String> {
        let channels = channels.clamp(1, 2) as usize;
        let decoder = Decoder::new(sample_rate, opus_channels(channels))
            .map_err(|error| format!("Opus decoder init failed: {error}"))?;
        let host = cpal::default_host();
        let device = host
            .default_output_device()
            .ok_or_else(|| "no default audio output device".to_owned())?;
        let supported = device
            .default_output_config()
            .map_err(|error| format!("audio output config unavailable: {error}"))?;
        let output_rate = supported.sample_rate();
        let output_channels = usize::from(supported.channels());
        let config: StreamConfig = supported.into();
        let queue = Arc::new(Mutex::new(VecDeque::new()));
        let stream_errors = Arc::new(AtomicU64::new(0));
        let stream = match supported.sample_format() {
            SampleFormat::I8 => build_stream::<i8>(&device, &config, &queue, &stream_errors),
            SampleFormat::I16 => build_stream::<i16>(&device, &config, &queue, &stream_errors),
            SampleFormat::I24 => {
                build_stream::<cpal::I24>(&device, &config, &queue, &stream_errors)
            }
            SampleFormat::I32 => build_stream::<i32>(&device, &config, &queue, &stream_errors),
            SampleFormat::I64 => build_stream::<i64>(&device, &config, &queue, &stream_errors),
            SampleFormat::U8 => build_stream::<u8>(&device, &config, &queue, &stream_errors),
            SampleFormat::U16 => build_stream::<u16>(&device, &config, &queue, &stream_errors),
            SampleFormat::U32 => build_stream::<u32>(&device, &config, &queue, &stream_errors),
            SampleFormat::U64 => build_stream::<u64>(&device, &config, &queue, &stream_errors),
            SampleFormat::F32 => build_stream::<f32>(&device, &config, &queue, &stream_errors),
            SampleFormat::F64 => build_stream::<f64>(&device, &config, &queue, &stream_errors),
            format => Err(format!("unsupported audio sample format {format}")),
        }?;
        stream
            .play()
            .map_err(|error| format!("audio output start failed: {error}"))?;

        Ok(Self {
            decoder,
            source_rate: sample_rate,
            source_channels: channels,
            output_rate,
            output_channels,
            volume: f32::from(volume.min(100)) / 100.0,
            queue,
            stream_errors,
            stats: AudioStats {
                enabled: true,
                supported: true,
                decoder_name: "ropus / CPAL".to_owned(),
                ..AudioStats::default()
            },
            _stream: stream,
        })
    }

    pub(crate) fn push_rtp(&mut self, packet: &[u8]) -> Result<(), String> {
        self.stats.packets = self.stats.packets.saturating_add(1);
        self.stats.bytes = self.stats.bytes.saturating_add(packet.len() as u64);
        let header =
            RtpHeader::parse(packet).map_err(|error| format!("invalid audio RTP: {error:?}"))?;
        let payload = header.payload(packet);
        let max_frames = (self.source_rate as usize * 120) / 1_000;
        let mut decoded = vec![0.0; max_frames * self.source_channels];
        let frames = self
            .decoder
            .decode_float(payload, &mut decoded, DecodeMode::Normal)
            .map_err(|error| format!("Opus decode failed: {error}"))?;
        decoded.truncate(frames * self.source_channels);
        let output = resample_and_remix(
            &decoded,
            self.source_rate,
            self.source_channels,
            self.output_rate,
            self.output_channels,
            self.volume,
        );
        let mut queue = self.queue.lock().map_err(|_| "audio queue poisoned")?;
        let max_samples = self.output_rate as usize * self.output_channels / 4;
        let overflow = queue
            .len()
            .saturating_add(output.len())
            .saturating_sub(max_samples);
        if overflow > 0 {
            let drop_count = overflow.min(queue.len());
            queue.drain(..drop_count);
        }
        queue.extend(output);
        self.stats.decoded_frames = self.stats.decoded_frames.saturating_add(1);
        self.stats.queued_ms =
            queue.len() as f64 * 1_000.0 / (self.output_rate as f64 * self.output_channels as f64);
        Ok(())
    }

    pub(crate) fn record_error(&mut self) {
        self.stats.errors = self.stats.errors.saturating_add(1);
    }

    pub(crate) fn set_volume(&mut self, volume: u8) {
        self.volume = f32::from(volume.min(100)) / 100.0;
    }

    pub(crate) fn stats(&self) -> AudioStats {
        let mut stats = self.stats.clone();
        stats.errors = stats
            .errors
            .saturating_add(self.stream_errors.load(Ordering::Relaxed));
        if let Ok(queue) = self.queue.lock() {
            stats.queued_ms = queue.len() as f64 * 1_000.0
                / (self.output_rate as f64 * self.output_channels as f64);
        }
        stats
    }
}

fn build_stream<T>(
    device: &cpal::Device,
    config: &StreamConfig,
    queue: &Arc<Mutex<VecDeque<f32>>>,
    errors: &Arc<AtomicU64>,
) -> Result<Stream, String>
where
    T: SizedSample + FromSample<f32>,
{
    let queue = Arc::clone(queue);
    let errors_for_stream = Arc::clone(errors);
    device
        .build_output_stream(
            *config,
            move |output: &mut [T], _| {
                if let Ok(mut queue) = queue.try_lock() {
                    for sample in output {
                        *sample = T::from_sample(queue.pop_front().unwrap_or(0.0));
                    }
                } else {
                    output.fill_with(|| T::from_sample(0.0));
                }
            },
            move |_| {
                errors_for_stream.fetch_add(1, Ordering::Relaxed);
            },
            None,
        )
        .map_err(|error| format!("audio output creation failed: {error}"))
}

fn opus_channels(channels: usize) -> Channels {
    if channels == 2 {
        Channels::Stereo
    } else {
        Channels::Mono
    }
}

fn resample_and_remix(
    input: &[f32],
    input_rate: u32,
    input_channels: usize,
    output_rate: u32,
    output_channels: usize,
    volume: f32,
) -> Vec<f32> {
    let input_frames = input.len() / input_channels;
    if input_frames == 0 {
        return Vec::new();
    }
    let output_frames =
        (input_frames as u64 * u64::from(output_rate) / u64::from(input_rate)).max(1) as usize;
    let mut output = Vec::with_capacity(output_frames * output_channels);
    for output_frame in 0..output_frames {
        let source = output_frame as f64 * input_rate as f64 / output_rate as f64;
        let left = source.floor() as usize;
        let right = (left + 1).min(input_frames - 1);
        let mix = (source - left as f64) as f32;
        for output_channel in 0..output_channels {
            let input_channel = output_channel.min(input_channels - 1);
            let a = input[left * input_channels + input_channel];
            let b = input[right * input_channels + input_channel];
            output.push((a + (b - a) * mix) * volume);
        }
    }
    output
}

#[cfg(test)]
mod tests {
    use super::resample_and_remix;

    #[test]
    fn mono_is_remixed_to_stereo() {
        let output = resample_and_remix(&[0.5, -0.5], 48_000, 1, 48_000, 2, 1.0);
        assert_eq!(output, [0.5, 0.5, -0.5, -0.5]);
    }
}