libfoksalaudio 1.2.9

foksal audio library
Documentation
use anyhow::Result;
use crossbeam_channel as cbeam_chan;
use rkyv::{access, rancor::Error as RkyvError, util::AlignedVec};
use serde::Serialize;
use std::{path::PathBuf, thread};
use thiserror::Error;
use tokio::sync::{broadcast, mpsc as tokio_chan, oneshot};
use tokio_tungstenite::tungstenite::Bytes;

use crate::{Volume, device::Device, resampler::ResamplerWrapper};
use libfoksalcommon::{
    AUDIO_BUF_LEN, ArchivedAudioChunk, AudioSpec, CommonSample,
    net::request::{FileRequest, RawFileRequest},
};

const REQUEST_SIZE: usize = 8; // multiple of AUDIO_BUF_LEN

#[derive(Clone, Debug, Error, Serialize)]
#[serde(tag = "error", rename_all = "snake_case")]
pub enum SinkError {
    #[error("decoder error ({reason})")]
    Decoder { reason: String },
    #[error("resampler error ({reason})")]
    Resampler { reason: String },
}

pub enum SinkRequest {
    GetState(oneshot::Sender<PlaybackState>),
    GetVolume(oneshot::Sender<Volume>),
    GetElapsed(oneshot::Sender<u64>),
    Play(PathBuf),
    VolChange(i8),
    VolSet(u8),
    SeekBy(isize),
    SeekTo(usize),
    Stop,
    Pause,
    Resume,
    Toggle,
}

pub enum SinkResponse {
    SongOver,
    StateChanged(PlaybackState),
    VolumeChanged(Volume),
    Elapsed(u64),
}

#[derive(Default)]
struct Samples {
    inner: Vec<CommonSample>,
    ptr: usize,
    got_all: bool,
}

#[derive(Default)]
struct PlaybackData {
    samples: Samples,
    prev_elapsed: u64,
    uri: Option<PathBuf>,
    audio_spec: Option<AudioSpec>,
    resampler: Option<ResamplerWrapper>,
    rx_chunks: Option<oneshot::Receiver<Bytes>>,
}

#[derive(Clone, Copy, Debug, Default, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum PlaybackState {
    #[default]
    Stopped,
    Playing,
    Paused,
    AwaitingNext,
}

struct Sink {
    device: Device,
    state: PlaybackState,
    data: PlaybackData,
    volume: Volume,
    tx_response: tokio_chan::UnboundedSender<SinkResponse>,
    tx_error: broadcast::Sender<SinkError>,
}

impl SinkError {
    pub fn to_bytes(&self) -> Result<Bytes> {
        let s = serde_json::to_vec(&self)?;
        Ok(s.into())
    }
}

impl Samples {
    fn clear(&mut self) {
        self.inner.clear();
        self.ptr = 0;
        self.got_all = false;
    }
}

impl Sink {
    fn new(
        device: Device,
        tx_response: tokio_chan::UnboundedSender<SinkResponse>,
        tx_error: broadcast::Sender<SinkError>,
    ) -> Self {
        Self {
            device,
            state: Default::default(),
            data: Default::default(),
            volume: Default::default(),
            tx_response,
            tx_error,
        }
    }

    fn change_state(&mut self, new_state: PlaybackState) {
        self.state = new_state;
        if matches!(
            new_state,
            PlaybackState::Stopped | PlaybackState::AwaitingNext
        ) {
            self.data = Default::default();
        }
        if !matches!(new_state, PlaybackState::AwaitingNext) {
            // AwaitingNext is a special state hidden from the outside world
            // in reality the sink stops for a moment before playing the next song
            // but we want the outside world to think that there were no gaps
            let _ = self.tx_response.send(SinkResponse::StateChanged(new_state));
        }
    }

    fn err_and_stop(&mut self, err: SinkError) {
        let _ = self.tx_error.send(err);
        self.change_state(PlaybackState::Stopped);
    }

    fn check_song_end(&mut self) {
        let samples = &self.data.samples;
        if samples.ptr >= samples.inner.len() && samples.got_all {
            let _ = self.tx_response.send(SinkResponse::SongOver);
            self.change_state(PlaybackState::AwaitingNext);
        }
    }

    fn elapsed(&self) -> u64 {
        match self.data.audio_spec {
            Some(AudioSpec {
                n_channels,
                sample_rate,
            }) => self.data.samples.ptr as u64 / (n_channels * sample_rate) as u64,
            None => 0,
        }
    }

    fn recalc_elapsed(&mut self) {
        let new_elapsed = self.elapsed();
        if new_elapsed != self.data.prev_elapsed {
            self.data.prev_elapsed = new_elapsed;
            let _ = self.tx_response.send(SinkResponse::Elapsed(new_elapsed));
        }
    }

    fn seek_by(&mut self, seconds: isize) {
        let Some(AudioSpec {
            n_channels,
            sample_rate,
        }) = self.data.audio_spec.as_ref()
        else {
            return;
        };
        let offset = (n_channels * sample_rate) as isize * seconds;
        self.data.samples.ptr = self.data.samples.ptr.saturating_add_signed(offset);
        self.check_song_end();
    }

    fn seek_to(&mut self, seconds: usize) {
        let Some(AudioSpec {
            n_channels,
            sample_rate,
        }) = self.data.audio_spec.as_ref()
        else {
            return;
        };
        self.data.samples.ptr = n_channels * sample_rate * seconds;
        self.check_song_end();
    }

    fn handle_request(&mut self, request: SinkRequest) {
        match request {
            SinkRequest::GetState(respond_to) => {
                let _ = respond_to.send(self.state);
            }
            SinkRequest::GetVolume(respond_to) => {
                let _ = respond_to.send(self.volume);
            }
            SinkRequest::GetElapsed(respond_to) => {
                let _ = respond_to.send(self.elapsed());
            }
            SinkRequest::VolChange(delta) => {
                self.volume.change(delta);
                let _ = self
                    .tx_response
                    .send(SinkResponse::VolumeChanged(self.volume));
            }
            SinkRequest::VolSet(volume) => {
                self.volume.set(volume);
                let _ = self
                    .tx_response
                    .send(SinkResponse::VolumeChanged(self.volume));
            }
            SinkRequest::SeekBy(seconds) => {
                self.seek_by(seconds);
                self.recalc_elapsed();
            }
            SinkRequest::SeekTo(seconds) => {
                self.seek_to(seconds);
                self.recalc_elapsed();
            }
            SinkRequest::Play(uri) => {
                self.data = Default::default();
                self.data.uri.replace(uri);
                self.data.samples.clear();
                self.change_state(PlaybackState::Playing);
            }
            SinkRequest::Pause => {
                if let PlaybackState::Playing = self.state {
                    self.change_state(PlaybackState::Paused);
                }
            }
            SinkRequest::Resume => {
                if let PlaybackState::Paused = self.state {
                    self.change_state(PlaybackState::Playing);
                }
            }
            SinkRequest::Toggle => {
                match self.state {
                    PlaybackState::Paused => self.change_state(PlaybackState::Playing),
                    PlaybackState::Playing => self.change_state(PlaybackState::Paused),
                    _ => (),
                };
            }
            SinkRequest::Stop => self.change_state(PlaybackState::Stopped),
        }
    }

    fn request_more_samples(
        &mut self,
        uri: PathBuf,
        tx_file_request: &tokio_chan::UnboundedSender<FileRequest>,
    ) {
        let start = self.data.samples.inner.len();
        let end = start + REQUEST_SIZE * AUDIO_BUF_LEN;
        let (tx, rx) = oneshot::channel();
        let _ = tx_file_request.send(FileRequest::new(
            RawFileRequest::GetChunk { uri, start, end },
            tx,
        ));
        self.data.rx_chunks.replace(rx);
    }

    fn append_samples(&mut self, chunk: &ArchivedAudioChunk) {
        let (n_channels, sample_rate) = (
            chunk.n_channels.to_native() as usize,
            chunk.sample_rate.to_native() as usize,
        );
        if self.data.audio_spec.is_none() {
            self.data.audio_spec = Some(AudioSpec::new(n_channels, sample_rate));
        }
        if self.data.resampler.is_none() {
            match ResamplerWrapper::try_new(sample_rate, self.device.sample_rate(), n_channels) {
                Ok(resampler) => self.data.resampler = Some(resampler),
                Err(e) => {
                    self.err_and_stop(SinkError::Resampler {
                        reason: e.to_string(),
                    });
                    return;
                }
            }
        }
        let new_samples: Vec<_> = chunk
            .samples
            .as_slice()
            .iter()
            .map(|x| x.to_native())
            .collect();
        self.data.samples.inner.extend(new_samples);
        if chunk.is_final {
            self.data.samples.got_all = true;
        }
    }

    fn run(
        mut self,
        tx_samples: cbeam_chan::Sender<CommonSample>,
        tx_file_request: tokio_chan::UnboundedSender<FileRequest>,
        rx_sink_request: cbeam_chan::Receiver<SinkRequest>,
    ) {
        loop {
            let request = match self.state {
                PlaybackState::Playing => match rx_sink_request.try_recv() {
                    Ok(request) => Some(request),
                    Err(cbeam_chan::TryRecvError::Disconnected) => break,
                    _ => None,
                },
                _ => match rx_sink_request.recv() {
                    Ok(request) => Some(request),
                    Err(_) => break,
                },
            };
            if let Some(request) = request {
                self.handle_request(request);
            }
            if !self.data.samples.got_all
                && let Some(uri) = self.data.uri.as_ref().cloned()
            {
                match self.data.rx_chunks {
                    Some(ref mut rx) => {
                        if let Ok(bytes) = rx.try_recv() {
                            let mut aligned: AlignedVec = AlignedVec::with_capacity(bytes.len());
                            aligned.extend_from_slice(&bytes);
                            match access::<ArchivedAudioChunk, RkyvError>(&aligned) {
                                Ok(chunk) => {
                                    self.append_samples(chunk);
                                    self.data.rx_chunks = None;
                                }
                                Err(_) => {
                                    self.err_and_stop(SinkError::Decoder {
                                        reason: String::from_utf8_lossy(&bytes).to_string(),
                                    });
                                }
                            }
                        }
                    }
                    None => self.request_more_samples(uri, &tx_file_request),
                }
            }
            if let PlaybackState::Playing = self.state
                && let Some(ref mut resampler) = self.data.resampler
            {
                if self.data.samples.ptr >= self.data.samples.inner.len() {
                    if self.data.samples.got_all {
                        // seeked past the song's duration
                        self.check_song_end();
                    }
                } else {
                    let samples = &mut self.data.samples;
                    let end =
                        (samples.ptr + resampler.input_len() - 1).min(samples.inner.len() - 1);
                    let buf = &samples.inner[samples.ptr..=end];
                    match resampler.resample(buf) {
                        Ok(resampled) => {
                            let mult = self.volume.to_mult();
                            for sample in resampled {
                                let _ = tx_samples.send(*sample * (mult as CommonSample));
                            }
                            samples.ptr = end + 1;
                            self.recalc_elapsed();
                            self.check_song_end();
                        }
                        Err(e) => {
                            self.err_and_stop(SinkError::Resampler {
                                reason: e.to_string(),
                            });
                        }
                    }
                }
            }
        }
    }
}

pub fn spawn_blocking(
    device_name: impl AsRef<str>,
    tx_file_request: tokio_chan::UnboundedSender<FileRequest>,
    rx_sink_request: cbeam_chan::Receiver<SinkRequest>,
    tx_sink_response: tokio_chan::UnboundedSender<SinkResponse>,
    tx_error: broadcast::Sender<SinkError>,
) -> Result<()> {
    let mut device = Device::try_new(device_name).or_else(|_| Device::try_default())?;
    let (tx_samples, rx_samples) = cbeam_chan::bounded(AUDIO_BUF_LEN);
    device.init(rx_samples)?;
    let sink = Sink::new(device, tx_sink_response, tx_error);
    thread::spawn(move || sink.run(tx_samples, tx_file_request, rx_sink_request));

    Ok(())
}