selene-daemon 0.5.7

Official music player daemon for Selene
Documentation
use std::{
    io::{self, Read},
    os::unix::net::UnixStream,
    sync::mpsc::{Receiver, Sender, TryRecvError, channel},
    thread,
};

use lunar_lib::trace;

use crate::{
    ConnectErrorKind, IpcHandleError, IpcRequest, PacketError, PacketType, PlayerEvent,
    config::{daemon_config, initialize_daemon_config},
    daemon::{SeleneDaemonHandle, read_packet_data, send_command},
    listener::unix_socket_listener::{SELENE_UNIX_SOCKET_PATH, selene_unix_socket_path},
    wait,
};

pub(crate) struct UnixSocketHandle {
    rx: Receiver<IpcRequest>,
    stream: Option<UnixStream>,
}

pub type CallbackFn = Box<dyn FnOnce(Result<&[u8], PacketError>) + Send>;

impl SeleneDaemonHandle for UnixSocketHandle {
    fn connect<F>(callback: Option<F>) -> Result<Sender<IpcRequest>, IpcHandleError>
    where
        Self: Sized,
        F: FnMut(PlayerEvent) + Send + Sync + 'static,
    {
        trace!("Connecting to UnixSocketHandle");

        initialize_daemon_config()?;
        let _ = SELENE_UNIX_SOCKET_PATH.set(daemon_config().main.socket_path.clone());

        let stream = UnixStream::connect(selene_unix_socket_path()).map_err(|err| {
            let reason = match err.kind() {
                io::ErrorKind::NotFound => ConnectErrorKind::DaemonNotRunning,
                io::ErrorKind::ConnectionRefused => ConnectErrorKind::ConnectionRefused,
                _ => ConnectErrorKind::Other(err),
            };
            IpcHandleError::FailedToConnect(reason)
        })?;

        let (tx, rx) = channel();

        let handle = UnixSocketHandle {
            rx,
            stream: Some(stream),
        };

        thread::spawn(|| handle.run(callback));

        Ok(tx)
    }

    fn reconnect(&mut self) -> bool {
        let Ok(stream) = UnixStream::connect(selene_unix_socket_path()) else {
            return false;
        };

        self.stream = Some(stream);
        true
    }

    fn run<F>(mut self, mut callback: Option<F>)
    where
        F: FnMut(PlayerEvent) + Send + Sync + 'static,
    {
        let mut current_request_callback: Option<CallbackFn> = None;

        loop {
            let Some(mut stream) = self.stream.take() else {
                todo!()
            };

            if current_request_callback.is_none() {
                match self.rx.try_recv() {
                    Ok(IpcRequest { command, callback }) => {
                        if send_command(&mut stream, &command).is_err() {
                            continue;
                        }

                        if command.responds() {
                            current_request_callback = callback;
                        }
                    }
                    Err(TryRecvError::Empty) => (),
                    Err(TryRecvError::Disconnected) => return,
                }
            }

            stream.set_nonblocking(true).unwrap();
            let mut type_buf = [0u8; 1];
            let packet_type = match stream.read_exact(&mut type_buf) {
                Ok(()) => PacketType::from(type_buf[0]),
                Err(err) if matches!(err.kind(), io::ErrorKind::WouldBlock) => {
                    wait();
                    self.stream = Some(stream);
                    continue;
                }
                Err(_) => continue,
            };

            stream.set_nonblocking(false).unwrap();

            let Ok(data) = read_packet_data(&mut stream) else {
                continue;
            };

            match packet_type {
                PacketType::Unknown => panic!("Daemon sent unknown bytes"),
                PacketType::Event => {
                    if let Some(ref mut callback) = callback {
                        let event: PlayerEvent = ciborium::from_reader(data.as_slice())
                            .expect("Daemon sent corrupted bytes");

                        callback(event);
                    }
                }
                PacketType::Response => {
                    if let Some(ipc_callback) = current_request_callback.take() {
                        ipc_callback(Ok(&data));
                    }
                }
                PacketType::Error => {
                    let err: PacketError = ciborium::from_reader(data.as_slice())
                        .expect("Daemon sent corrupted bytes");

                    if let Some(ipc_callback) = current_request_callback.take() {
                        ipc_callback(Err(err));
                    }
                }
                PacketType::Disconnect => {
                    if let Some(ipc_callback) = current_request_callback.take() {
                        ipc_callback(Err(PacketError::Disconnect));
                    }
                    continue;
                }
            }

            self.stream = Some(stream);
        }
    }
}