// selene-daemon 0.1.0
//
// Official music player daemon for Selene.
// Documentation
use std::{
    collections::VecDeque,
    io::{self, Read, Write},
    os::unix::net::UnixStream,
    sync::mpsc::{Receiver, RecvError, Sender, TryRecvError, channel},
    thread,
};

use lunar_lib::{error, trace};

use crate::{
    ConnectErrorKind, IpcActionError, IpcCommand, IpcHandleError, PacketType, PlayerEvent,
    PlayerResponse,
    daemon::{DaemonHandle, IpcRequest},
};

/// Filesystem path of the Unix domain socket that [`UnixSocketHandle`] dials
/// when connecting and reconnecting.
///
/// NOTE(review): the path lives in `/tmp`; presumably the daemon binds the
/// same path — confirm against the daemon's listener.
pub const SELENE_UNIX_SOCKET_PATH: &str = "/tmp/selene.sock";

/// Client-side IPC handle that talks to the daemon over a Unix domain socket.
///
/// An instance is created by `connect` and then moved onto its own thread,
/// where `run` services requests until every request sender is dropped.
pub(crate) struct UnixSocketHandle {
    /// Receiving end of the request channel whose `Sender` is returned by `connect`.
    rx: Receiver<IpcRequest>,
    /// The live connection; `None` after a disconnect until `reconnect` succeeds.
    stream: Option<UnixStream>,
}

impl DaemonHandle<IpcCommand, PlayerResponse> for UnixSocketHandle {
    fn connect<F>(callback: Option<F>) -> Result<Sender<IpcRequest>, IpcHandleError>
    where
        Self: Sized,
        F: FnMut(PlayerEvent) + Send + Sync + 'static,
    {
        trace!("Connecting to UnixSocketHandle");
        let stream = UnixStream::connect(SELENE_UNIX_SOCKET_PATH).map_err(|err| {
            let reason = match err.kind() {
                io::ErrorKind::NotFound => ConnectErrorKind::DaemonNotRunning,
                io::ErrorKind::ConnectionRefused => ConnectErrorKind::ConnectionRefused,
                _ => ConnectErrorKind::Other(err),
            };
            IpcHandleError::FailedToConnect(reason)
        })?;

        let (tx, rx) = channel();

        let handle = UnixSocketHandle {
            rx,
            stream: Some(stream),
        };

        thread::spawn(|| handle.run(callback));

        Ok(tx)
    }

    fn reconnect(&mut self) -> bool {
        let Ok(stream) = UnixStream::connect(SELENE_UNIX_SOCKET_PATH) else {
            return false;
        };

        self.stream = Some(stream);
        true
    }

    fn run<F>(mut self, mut callback: Option<F>) -> Result<(), IpcHandleError>
    where
        F: FnMut(PlayerEvent) + Send + Sync + 'static,
    {
        let mut queue: VecDeque<Sender<Result<PlayerResponse, IpcActionError>>> = VecDeque::new();

        'stream_loop: loop {
            let Some(stream) = &mut self.stream else {
                // Loop until reconnected or dropped
                loop {
                    match self.rx.recv() {
                        Ok(IpcRequest::Reconnect { callback }) => {
                            let success = self.reconnect();

                            let _ = callback.send(success);

                            if success {
                                continue 'stream_loop;
                            }
                        }
                        Ok(IpcRequest::Ipc {
                            command: _,
                            callback,
                        }) => {
                            let _ = callback.send(Err(IpcActionError::Disconnected));
                        }
                        Err(RecvError) => return Ok(()),
                    }
                }
            };

            // Loop until all commands have been sent
            // Break = Try to recieve responses
            loop {
                match self.rx.try_recv() {
                    // Reconnect attempt when already reconnected, just say it succeeded
                    Ok(IpcRequest::Reconnect { callback }) => {
                        let _ = callback.send(true);
                    }
                    // Command requested, send command
                    Ok(IpcRequest::Ipc { command, callback }) => {
                        let mut command_buf = Vec::new();
                        ciborium::into_writer(&command, &mut command_buf).unwrap();

                        let len = command_buf.len() as u32;

                        if stream.write_all(&len.to_be_bytes()).is_err() {
                            let _ = callback.send(Err(IpcActionError::Disconnected));
                            self.stream.take();
                            continue 'stream_loop;
                        };

                        if stream.write_all(&command_buf).is_err() {
                            let _ = callback.send(Err(IpcActionError::Disconnected));
                            self.stream.take();
                            continue 'stream_loop;
                        };

                        queue.push_back(callback);
                    }
                    // No commands in queue, break
                    Err(TryRecvError::Empty) => break,
                    // All senders disconnected, shutdown
                    Err(TryRecvError::Disconnected) => return Ok(()),
                }
            }

            // Try to deserialize all packets in the stream
            stream.set_nonblocking(true).unwrap();

            let mut read = [0u8; 1];
            match stream.read_exact(&mut read) {
                // Packet read, there is data
                Ok(()) => (),
                // No packet, just break
                Err(err) if matches!(err.kind(), io::ErrorKind::WouldBlock) => {
                    continue;
                }
                // Disconnected, continue and wait for reconnect
                Err(err) if matches!(err.kind(), io::ErrorKind::UnexpectedEof) => {
                    self.stream.take();
                    continue 'stream_loop;
                }
                // Unexpected error
                Err(err) => {
                    panic!("Ran into an unexpected error while reading from the stream: {err}")
                }
            }

            let packet_type = PacketType::from(read[0]);

            stream.set_nonblocking(false).unwrap();

            let mut len_buf = [0u8; 4];
            if stream.read_exact(&mut len_buf).is_err() {
                self.stream.take();
                continue 'stream_loop;
            }

            let packet_len = u32::from_be_bytes(len_buf) as usize;
            let mut packet_buf = vec![0u8; packet_len];
            if stream.read_exact(&mut packet_buf).is_err() {
                self.stream.take();
                continue 'stream_loop;
            }

            match packet_type {
                // Unknown packet, do nothing and continue to the next main loop
                PacketType::Unknown => {
                    continue;
                }
                // Response packet, send the response to the first item in the queue (FIFO) and continue to the next main loop
                PacketType::PlayerResponse => {
                    if let Some(pop_front) = queue.pop_front() {
                        let response: Result<PlayerResponse, IpcActionError> =
                            match ciborium::from_reader(packet_buf.as_slice()) {
                                Ok(v) => v,
                                Err(err) => {
                                    panic!("Daemon sent corrupted bytes: {err}")
                                }
                            };

                        let _ = pop_front.send(response);
                    } else {
                        error!(
                            "Daemon sent a 'PlayerResponse' packet when no response was expected"
                        )
                    }
                    continue;
                }
                // Event packet, send the event to the callback function and continue to the next main loop
                PacketType::PlayerEvent => {
                    let event: PlayerEvent = match ciborium::from_reader(packet_buf.as_slice()) {
                        Ok(v) => v,
                        Err(err) => {
                            panic!("Daemon sent corrupted bytes: {err}")
                        }
                    };

                    if let Some(callback) = &mut callback {
                        callback(event);
                    }
                    continue;
                }
            }
        }
    }
}