selene-daemon 0.8.2

Official music player daemon for Selene
Documentation
use std::sync::mpsc::{Receiver, Sender, channel};

use blake3::Hash;
use selene_core::library::collectable::Collectable;
use serde::{Deserialize, Serialize, de::DeserializeOwned};
use thiserror::Error;

use crate::{
    LoopMode, ShuffleMode,
    client::{CallbackFn, IpcHandleError, SeleneClient},
    player::{PlaybackStatus, PlayerQueryFlags, QueryResult},
};

#[repr(u8)]
pub(crate) enum PacketType {
    /// Unknown packet. Either the client is out of date, or the daemon has a logic bug
    Unknown,

    /// An event, can be sent without input
    Event,

    /// A response to a command
    Response,

    /// A [`PacketError`]
    Error,

    /// Client has been disconnected from the daemon. This can happen from a manual disconnect, or from the daemon shutting down
    Disconnect,
}

#[derive(Debug, Error, Serialize, Deserialize, Clone, Copy)]
pub enum PacketError {
    #[error("Packet size '{size}' too large: Max size is {max_size}")]
    PacketTooLarge { size: usize, max_size: usize },

    #[error("Client was disconnected while waiting for a packet")]
    Disconnect,
}

impl From<u8> for PacketType {
    fn from(value: u8) -> Self {
        match value {
            1 => Self::Event,
            2 => Self::Response,
            3 => Self::Error,
            4 => Self::Disconnect,
            _ => Self::Unknown,
        }
    }
}
impl SeleneClient {
    // Generic
    pub fn ipc_generic(&self, command: IpcCommand) -> Result<(), PacketError> {
        self.handle_tx.no_response(command)
    }

    pub fn ipc_flush(&self) -> Result<(), PacketError> {
        self.handle_tx.request(IpcCommand::Flush)
    }

    pub fn ipc_disconnect(&self) {
        self.handle_tx.action(IpcCommand::Disconnect);
    }

    pub fn ipc_reload_config(&self) -> Result<Result<(), String>, PacketError> {
        self.handle_tx.request(IpcCommand::ReloadConfig)
    }

    // Playback
    pub fn ipc_play(&self, collectable: Vec<Collectable>) {
        self.handle_tx.action(IpcCommand::Play {
            collectables: collectable,
        });
    }

    pub fn ipc_stop(&self) {
        self.handle_tx.action(IpcCommand::Stop);
    }

    pub fn ipc_next(&self) {
        self.handle_tx.action(IpcCommand::Next);
    }

    pub fn ipc_previous(&self) {
        self.handle_tx.action(IpcCommand::Previous);
    }

    pub fn ipc_set_is_playing(&self, is_playing: bool) {
        self.handle_tx
            .action(IpcCommand::SetIsPlaying { is_playing });
    }

    pub fn ipc_toggle_is_playing(&self) -> Result<PlaybackStatus, PacketError> {
        self.handle_tx.request(IpcCommand::TogglePlaying)
    }

    pub fn ipc_seek(&self, time: f64, increment: bool) -> Result<Option<f64>, PacketError> {
        self.handle_tx.request(IpcCommand::Seek {
            seconds: time,
            increment,
        })
    }

    pub fn ipc_set_volume(&self, volume: f32, increment: bool) -> Result<f32, PacketError> {
        self.handle_tx
            .request(IpcCommand::SetVolume { volume, increment })
    }

    // Queue
    pub fn ipc_queue_set(
        &self,
        tracks: Vec<Collectable>,
        expected_state: Hash,
    ) -> Result<bool, PacketError> {
        self.handle_tx.request(IpcCommand::QueueSet {
            tracks,
            expected_state,
        })
    }

    pub fn ipc_queue_extend(&self, tracks: Vec<Collectable>) {
        self.handle_tx.action(IpcCommand::QueueExtend(tracks));
    }

    pub fn ipc_queue_shuffle(&self) {
        self.handle_tx.action(IpcCommand::QueueShuffle);
    }

    pub fn ipc_queue_clear(&self) {
        self.handle_tx.action(IpcCommand::QueueClear);
    }

    // Playlist
    pub fn ipc_playlist_set(
        &self,
        collectables: Vec<Collectable>,
        expected_state: Hash,
    ) -> Result<bool, PacketError> {
        self.handle_tx.request(IpcCommand::PlaylistSet {
            collectables,
            expected_state,
        })
    }

    pub fn ipc_playlist_extend(&self, collectables: Vec<Collectable>) {
        self.handle_tx
            .action(IpcCommand::PlaylistExtend(collectables));
    }

    pub fn ipc_playlist_clear(&self) {
        self.handle_tx.action(IpcCommand::PlaylistClear);
    }

    pub fn ipc_playlist_set_shuffle_mode(&self, shuffle_mode: ShuffleMode) {
        self.handle_tx
            .action(IpcCommand::PlaylistSetShuffleMode { shuffle_mode });
    }

    pub fn ipc_playlist_set_loop_mode(&self, loop_mode: LoopMode) {
        self.handle_tx
            .action(IpcCommand::PlaylistSetLoopMode { loop_mode });
    }

    // Tracklist
    pub fn ipc_tracklist_seek(&self, index: isize, increment: bool) -> Result<usize, PacketError> {
        self.handle_tx
            .request(IpcCommand::TracklistSeek { index, increment })
    }

    // Queries
    pub fn ipc_query(&self, flags: PlayerQueryFlags) -> Result<QueryResult, PacketError> {
        self.handle_tx.request(IpcCommand::GetState { flags })
    }
}

#[derive(Debug, Serialize, Deserialize, Clone)]
#[non_exhaustive]
pub enum IpcCommand {
    // Generic
    Flush,
    Disconnect,
    ReloadConfig,

    // Playback
    Play {
        collectables: Vec<Collectable>,
    },
    Stop,
    SetIsPlaying {
        is_playing: bool,
    },
    TogglePlaying,
    Seek {
        seconds: f64,
        increment: bool,
    },

    SetVolume {
        volume: f32,
        increment: bool,
    },

    // The override queue
    QueueSet {
        tracks: Vec<Collectable>,
        expected_state: Hash,
    },
    QueueExtend(Vec<Collectable>),
    QueueShuffle,
    QueueClear,

    // Playlist
    PlaylistSet {
        collectables: Vec<Collectable>,
        expected_state: Hash,
    },
    PlaylistExtend(Vec<Collectable>),
    PlaylistClear,

    // Playlist // Shuffle mode
    PlaylistSetShuffleMode {
        shuffle_mode: ShuffleMode,
    },

    // Playlist // Loop mode
    PlaylistSetLoopMode {
        loop_mode: LoopMode,
    },

    // Tracklist
    TracklistSeek {
        index: isize,
        increment: bool,
    },
    Next,
    Previous,

    // Queries
    GetState {
        flags: PlayerQueryFlags,
    },
}

impl IpcCommand {
    #[must_use]
    pub fn responds(&self) -> bool {
        matches!(
            self,
            IpcCommand::Flush
                | IpcCommand::TogglePlaying
                | IpcCommand::Seek { .. }
                | IpcCommand::SetVolume { .. }
                | IpcCommand::QueueSet { .. }
                | IpcCommand::PlaylistSet { .. }
                | IpcCommand::TracklistSeek { .. }
                | IpcCommand::GetState { .. }
        )
    }
}

pub enum IpcRequest {
    Ipc {
        command: IpcCommand,
        callback: Option<CallbackFn>,
    },
    Reconnect {
        callback: Sender<Result<(), IpcHandleError>>,
    },
}

pub trait IpcTx {
    fn no_response(&self, command: IpcCommand) -> Result<(), PacketError>;

    fn request<T: DeserializeOwned + Send + 'static>(
        &self,
        command: IpcCommand,
    ) -> Result<T, PacketError>;

    fn action(&self, command: IpcCommand);

    fn disconnect(&self);

    fn reconnect(&self) -> Result<(), IpcHandleError>;
}

impl IpcTx for Sender<IpcRequest> {
    fn no_response(&self, command: IpcCommand) -> Result<(), PacketError> {
        self.send(IpcRequest::Ipc {
            command,
            callback: None,
        })
        .unwrap();

        Ok(())
    }

    fn request<T: DeserializeOwned + Send + 'static>(
        &self,
        command: IpcCommand,
    ) -> Result<T, PacketError> {
        assert!(
            command.responds(),
            "Commands must always respond with request()"
        );

        let (tx, rx) = channel();
        let callback: CallbackFn = Box::new(move |result| {
            let _ = tx.send(
                result.map(|bytes| postcard::from_bytes(bytes).expect("Daemon sent invalid bytes")),
            );
        });

        self.send(IpcRequest::Ipc {
            command,
            callback: Some(Box::new(callback)),
        })
        .unwrap();

        rx.recv().unwrap()
    }

    fn action(&self, command: IpcCommand) {
        assert!(
            !command.responds(),
            "Commands must never respond with action()"
        );

        self.send(IpcRequest::Ipc {
            command,
            callback: None,
        })
        .unwrap();
    }

    fn disconnect(&self) {
        let _ = self.send(IpcRequest::Ipc {
            command: IpcCommand::Disconnect,
            callback: None,
        });
    }

    fn reconnect(&self) -> Result<(), IpcHandleError> {
        let (tx, rx) = channel();
        self.send(IpcRequest::Reconnect { callback: tx })
            .map_err(|_| IpcHandleError::HandleDied)?;
        rx.recv().map_err(|_| IpcHandleError::HandleDied)?
    }
}

pub(crate) trait IpcRx {
    fn wait_for_reconnect_request(&self) -> Option<Sender<Result<(), IpcHandleError>>>;
}

impl IpcRx for Receiver<IpcRequest> {
    fn wait_for_reconnect_request(&self) -> Option<Sender<Result<(), IpcHandleError>>> {
        loop {
            match self.recv() {
                Ok(IpcRequest::Ipc { callback, .. }) => {
                    if let Some(callback) = callback {
                        callback(Err(PacketError::Disconnect));
                    }

                    continue;
                }
                Ok(IpcRequest::Reconnect { callback }) => return Some(callback),
                Err(_) => return None,
            }
        }
    }
}