selene-daemon 0.5.1

Official music player daemon for Selene
Documentation
use std::{
    sync::{
        Arc, Mutex,
        atomic::{AtomicU32, Ordering},
        mpsc::{Receiver, Sender, TryRecvError, channel},
    },
    thread,
};

use lunar_lib::error;
use ringbuf::traits::Observer;

use crate::{
    PlayerEvent, SHUTDOWN,
    event_handler::EventTx,
    player::{
        OpenedDecoder, PlayerError, PlayerRequest, PlayerTx,
        playback::{CpalHandle, DeviceConfig},
    },
    playlist::{AtomicPlaybackStatus, PlaybackStatus, ResolvedTrack},
    wait,
};

/// State owned by the decoder thread.
///
/// Created by `DecoderHandle::open`, which moves it onto a spawned thread and
/// hands the caller the sending half of the command channel.
pub struct DecoderHandle {
    /// Incoming [`DecoderCommand`]s; drained with `try_recv` at the top of every `run` iteration.
    rx: Receiver<DecoderCommand>,
    /// Channel to the player; used here to send [`DecoderEvent`]s.
    player_tx: Sender<PlayerRequest>,
    /// Channel on which [`PlayerEvent`]s (track changed, play/pause, seek, stop) are emitted.
    event_tx: Sender<PlayerEvent>,

    /// Audio output handle. Opened on demand by `ensure_cpal_handle` and reset to `None`
    /// when `run` finds no current decoder and no buffered audio left.
    cpal_handle: Option<CpalHandle>,
    /// Device configuration, locked whenever a new [`CpalHandle`] is opened.
    device_config: Arc<Mutex<DeviceConfig>>,

    /// Decoder that packets are currently decoded from.
    current_decoder: Option<OpenedDecoder>,
    /// Decoder that replaces `current_decoder` when it reaches end-of-file or on `Skip`.
    preload_decoder: Option<OpenedDecoder>,

    /// Atomic playback status; read every loop iteration and written by the command handlers.
    playback_state: Arc<AtomicPlaybackStatus>,
    /// Volume stored as the bit pattern of an `f32` (decoded with `f32::from_bits`).
    volume: Arc<AtomicU32>,
}

impl DecoderHandle {
    /// Spawns the decoder thread and returns the sender used to command it.
    ///
    /// The new thread owns a fresh [`DecoderHandle`] (no output handle, no decoders)
    /// and runs its loop until it returns. Whatever the outcome, `SHUTDOWN` is set
    /// once the loop has ended; a failure is logged first.
    ///
    /// # Errors
    ///
    /// The current implementation always returns `Ok`.
    pub(crate) fn open(
        player_tx: Sender<PlayerRequest>,
        event_tx: Sender<PlayerEvent>,
        device_config: Arc<Mutex<DeviceConfig>>,
        playback_state: Arc<AtomicPlaybackStatus>,
        volume: Arc<AtomicU32>,
    ) -> Result<Sender<DecoderCommand>, PlayerError> {
        let (command_tx, command_rx) = channel();

        let worker = Self {
            // Channels
            rx: command_rx,
            player_tx,
            event_tx,

            // Output: opened lazily once there is something to play
            cpal_handle: None,
            device_config,

            // Nothing is loaded yet
            current_decoder: None,
            preload_decoder: None,

            // State shared with the rest of the player
            playback_state,
            volume,
        };

        // The thread is detached; its end is signalled through `SHUTDOWN`
        thread::spawn(move || {
            match worker.run() {
                Ok(()) => {}
                Err(err) => {
                    error!("DecoderHandle failed with error: {err}");
                }
            }
            SHUTDOWN.store(true, Ordering::Relaxed);
        });

        Ok(command_tx)
    }

    fn run(mut self) -> Result<(), PlayerError> {
        loop {
            if SHUTDOWN.load(Ordering::Relaxed) {
                return Ok(());
            }

            loop {
                match self.rx.try_recv() {
                    Ok(command) => self.run_command(command)?,
                    Err(TryRecvError::Empty) => break,
                    Err(TryRecvError::Disconnected) => return Ok(()),
                }
            }

            if !matches!(
                self.playback_state.load(Ordering::Relaxed),
                PlaybackStatus::Playing
            ) {
                wait();
                continue;
            }

            let volume = f32::from_bits(self.volume.load(Ordering::Relaxed));
            if let Some(ref mut cpal_handle) = self.cpal_handle
                && let Some(finished_consuming) = cpal_handle.consume_packet(volume)
                && !finished_consuming
            {
                wait();
                continue;
            }

            if self.current_decoder.is_none() {
                if let Some(ref cpal_handle) = self.cpal_handle
                    && !cpal_handle.audio_buf.is_empty()
                {
                    wait();
                    continue;
                }

                self.cpal_handle = None;
                self.playback_state
                    .store(PlaybackStatus::Stopped, Ordering::SeqCst);
                self.event_tx.event(PlayerEvent::PlaybackStopped);
                continue;
            }

            // Use the current CpalHandle, or open a new one if one doesnt exist
            self.ensure_cpal_handle()?;

            if self.current_decoder.as_ref().unwrap().at_eof {
                if let Some(take) = self.preload_decoder.take() {
                    self.player_tx
                        .decoder_event(DecoderEvent::PreloadConsumed)?;
                    let currently_playing = take.decoded_from.clone();
                    self.current_decoder = Some(take);
                    self.event_tx.event(PlayerEvent::CurrentlyPlayingChanged {
                        currently_playing: Box::new(currently_playing),
                    });
                } else {
                    self.current_decoder = None;
                    continue;
                }
            }

            let packet = self
                .current_decoder
                .as_mut()
                .unwrap()
                .decode_next_packet()?;
            self.cpal_handle.as_mut().unwrap().pending_packet = packet;
        }
    }

    fn run_command(&mut self, command: DecoderCommand) -> Result<(), PlayerError> {
        match command {
            DecoderCommand::Load { decoder } => {
                let currently_playing = decoder.decoded_from.clone();
                self.current_decoder = Some(*decoder);
                self.event_tx.event(PlayerEvent::CurrentlyPlayingChanged {
                    currently_playing: Box::new(currently_playing),
                });
            }
            DecoderCommand::Preload { decoder } => self.preload_decoder = Some(*decoder),
            DecoderCommand::LoadAndPreload { load, preload } => {
                let currently_playing = load.decoded_from.clone();
                self.current_decoder = Some(*load);
                self.preload_decoder = Some(*preload);
                self.event_tx.event(PlayerEvent::CurrentlyPlayingChanged {
                    currently_playing: Box::new(currently_playing),
                });
            }
            DecoderCommand::SetPlaying(playing) => {
                if let Some(ref decoder) = self.current_decoder {
                    let new_state = if playing {
                        PlaybackStatus::Playing
                    } else {
                        PlaybackStatus::Paused
                    };

                    if self.playback_state.load(Ordering::SeqCst) == new_state {
                        return Ok(());
                    }

                    self.playback_state.store(new_state, Ordering::SeqCst);

                    self.event_tx.event(PlayerEvent::PlaybackIsPlayingChanged {
                        is_playing: playing,
                        changed_at: decoder.time(),
                    });
                }
            }
            DecoderCommand::TogglePlaying { callback } => {
                let current_state = self.playback_state.load(Ordering::SeqCst);
                if let Some(ref decoder) = self.current_decoder {
                    let new_state = match current_state {
                        PlaybackStatus::Playing => PlaybackStatus::Paused,
                        PlaybackStatus::Paused | PlaybackStatus::Stopped => PlaybackStatus::Playing,
                    };

                    self.playback_state.store(new_state, Ordering::SeqCst);
                    callback.send(new_state)?;

                    self.event_tx.event(PlayerEvent::PlaybackIsPlayingChanged {
                        is_playing: matches!(new_state, PlaybackStatus::Playing),
                        changed_at: decoder.time(),
                    });
                } else {
                    callback.send(PlaybackStatus::Stopped)?;
                }
            }
            DecoderCommand::Seek {
                time,
                increment,
                callback,
            } => {
                if let Some(ref mut opened_decoder) = self.current_decoder {
                    let time = opened_decoder.seek(time, increment)?;
                    self.event_tx.event(PlayerEvent::SeekOccured { time });
                    callback.send(Some(time))?;
                } else {
                    callback.send(None)?;
                }
            }
            DecoderCommand::GetTime { callback } => {
                if let Some(ref opened_decoder) = self.current_decoder {
                    callback.send(Some(opened_decoder.time()))?;
                } else {
                    callback.send(None)?;
                }
            }
            DecoderCommand::GetPlaying { callback } => {
                if let Some(ref opened_decoder) = self.current_decoder {
                    callback.send(Some(opened_decoder.decoded_from.clone()))?;
                } else {
                    callback.send(None)?;
                }
            }
            DecoderCommand::Stop => {
                self.current_decoder = None;
                self.preload_decoder = None;
                self.playback_state
                    .store(PlaybackStatus::Stopped, Ordering::SeqCst);
                self.event_tx.event(PlayerEvent::PlaybackStopped);
            }
            DecoderCommand::Skip => {
                if let Some(take) = self.preload_decoder.take() {
                    self.player_tx
                        .decoder_event(DecoderEvent::PreloadConsumed)?;
                    let currently_playing = take.decoded_from.clone();
                    self.current_decoder = Some(take);
                    self.event_tx.event(PlayerEvent::CurrentlyPlayingChanged {
                        currently_playing: Box::new(currently_playing),
                    });
                } else {
                    self.current_decoder = None;
                }
            }
        }
        Ok(())
    }
}

impl DecoderHandle {
    /// Makes sure `self.cpal_handle` holds a [`CpalHandle`], opening one from the
    /// current device config if none exists. An existing handle is left untouched.
    ///
    /// # Errors
    ///
    /// Errors if the device config cannot be locked or the [`CpalHandle`] fails to open
    pub fn ensure_cpal_handle(&mut self) -> Result<(), PlayerError> {
        if self.cpal_handle.is_some() {
            return Ok(());
        }

        // The config stays locked for the duration of the open call
        let device_config = self.device_config.lock()?;
        self.cpal_handle = Some(CpalHandle::open(&*device_config)?);

        Ok(())
    }
}

/// Notifications the decoder thread sends to the player through `player_tx`.
pub enum DecoderEvent {
    /// The preloaded decoder has been promoted to the current decoder, either
    /// because the previous one reached end-of-file or because of a `Skip`.
    PreloadConsumed,
}

/// Commands accepted by the decoder thread.
///
/// Variants with a `callback` are answered with exactly one value on that channel.
pub enum DecoderCommand {
    /// Replace the current decoder and emit `CurrentlyPlayingChanged`.
    /// The playback state is not changed.
    Load {
        decoder: Box<OpenedDecoder>,
    },
    /// Store a decoder to switch to when the current one reaches end-of-file or on `Skip`.
    Preload {
        decoder: Box<OpenedDecoder>,
    },
    /// `Load` and `Preload` in a single command.
    LoadAndPreload {
        load: Box<OpenedDecoder>,
        preload: Box<OpenedDecoder>,
    },
    /// Switch to playing (`true`) or paused (`false`).
    /// Ignored when no decoder is loaded or the state already matches.
    SetPlaying(bool),
    /// Flip between playing and paused (a stopped state becomes playing) and reply
    /// with the new state. Replies `Stopped` without changing anything when no
    /// decoder is loaded.
    TogglePlaying {
        callback: Sender<PlaybackStatus>,
    },
    /// Seek the current decoder; `time` and `increment` are passed to its `seek` unchanged.
    /// Replies with the time returned by the decoder, or `None` when no decoder is loaded.
    // NOTE(review): `increment` presumably selects relative instead of absolute
    // seeking, and `time` is presumably in seconds — confirm against `OpenedDecoder::seek`.
    Seek {
        time: f64,
        increment: bool,
        callback: Sender<Option<f64>>,
    },
    /// Reply with the current decoder's `time()`, or `None` when no decoder is loaded.
    GetTime {
        callback: Sender<Option<f64>>,
    },
    /// Reply with a clone of the current decoder's `decoded_from` track, or `None`
    /// when no decoder is loaded.
    GetPlaying {
        callback: Sender<Option<ResolvedTrack>>,
    },
    /// Switch to the preloaded decoder (reporting `PreloadConsumed` to the player),
    /// or drop the current decoder when nothing is preloaded.
    Skip,
    /// Drop both decoders, set the state to stopped and emit `PlaybackStopped`.
    Stop,
}

/// Typed front-end for talking to the decoder thread.
///
/// Implemented in this file for `Sender<DecoderCommand>`: every method sends the
/// matching [`DecoderCommand`], and the methods that return a value additionally
/// block until the decoder thread has replied. That implementation fails when the
/// command cannot be sent or the reply cannot be received.
pub trait DecoderTx {
    /// Load `decoder` as the current decoder.
    fn load(&self, decoder: OpenedDecoder) -> Result<(), PlayerError>;
    /// Queue `decoder` as the decoder to play after the current one.
    fn preload(&self, decoder: OpenedDecoder) -> Result<(), PlayerError>;
    /// Load `load` as the current decoder and queue `preload` behind it.
    fn load_and_preload(
        &self,
        load: OpenedDecoder,
        preload: OpenedDecoder,
    ) -> Result<(), PlayerError>;
    /// Request playing (`true`) or paused (`false`).
    fn set_playing(&self, is_playing: bool) -> Result<(), PlayerError>;
    /// Toggle between playing and paused; returns the state reported by the decoder thread.
    fn toggle_playing(&self) -> Result<PlaybackStatus, PlayerError>;
    /// Seek the current track; returns the resulting time, or `None` if nothing is loaded.
    fn seek(&self, time: f64, increment: bool) -> Result<Option<f64>, PlayerError>;
    /// Returns the current decoder's time, or `None` if nothing is loaded.
    fn get_time(&self) -> Result<Option<f64>, PlayerError>;
    /// Returns the track of the current decoder, or `None` if nothing is loaded.
    fn get_playing(&self) -> Result<Option<ResolvedTrack>, PlayerError>;
    /// Stop playback and drop the loaded decoders.
    fn stop(&self) -> Result<(), PlayerError>;
    /// Skip to the preloaded decoder, if any.
    fn skip(&self) -> Result<(), PlayerError>;
}

/// Command-channel implementation: fire-and-forget methods only enqueue their
/// command, while the querying methods go through `request_reply` and block
/// until the decoder thread has answered.
impl DecoderTx for Sender<DecoderCommand> {
    fn load(&self, decoder: OpenedDecoder) -> Result<(), PlayerError> {
        let decoder = Box::new(decoder);
        Ok(self.send(DecoderCommand::Load { decoder })?)
    }

    fn preload(&self, decoder: OpenedDecoder) -> Result<(), PlayerError> {
        let decoder = Box::new(decoder);
        Ok(self.send(DecoderCommand::Preload { decoder })?)
    }

    fn load_and_preload(
        &self,
        load: OpenedDecoder,
        preload: OpenedDecoder,
    ) -> Result<(), PlayerError> {
        let load = Box::new(load);
        let preload = Box::new(preload);
        Ok(self.send(DecoderCommand::LoadAndPreload { load, preload })?)
    }

    fn set_playing(&self, is_playing: bool) -> Result<(), PlayerError> {
        Ok(self.send(DecoderCommand::SetPlaying(is_playing))?)
    }

    fn toggle_playing(&self) -> Result<PlaybackStatus, PlayerError> {
        request_reply(self, |callback| DecoderCommand::TogglePlaying { callback })
    }

    fn seek(&self, time: f64, increment: bool) -> Result<Option<f64>, PlayerError> {
        request_reply(self, |callback| DecoderCommand::Seek {
            time,
            increment,
            callback,
        })
    }

    fn get_time(&self) -> Result<Option<f64>, PlayerError> {
        request_reply(self, |callback| DecoderCommand::GetTime { callback })
    }

    fn get_playing(&self) -> Result<Option<ResolvedTrack>, PlayerError> {
        request_reply(self, |callback| DecoderCommand::GetPlaying { callback })
    }

    fn stop(&self) -> Result<(), PlayerError> {
        Ok(self.send(DecoderCommand::Stop)?)
    }

    fn skip(&self) -> Result<(), PlayerError> {
        Ok(self.send(DecoderCommand::Skip)?)
    }
}

/// Sends the command produced by `build` (which receives the sending half of a
/// fresh reply channel) and blocks until a reply arrives on that channel.
fn request_reply<T>(
    commands: &Sender<DecoderCommand>,
    build: impl FnOnce(Sender<T>) -> DecoderCommand,
) -> Result<T, PlayerError> {
    let (reply_tx, reply_rx) = channel();
    commands.send(build(reply_tx))?;
    Ok(reply_rx.recv()?)
}