selene-daemon 0.9.0-alpha.2

Official music player daemon for Selene
Documentation
use std::{
    io::{self, Write},
    net::TcpListener,
    sync::Arc,
    thread::{self},
    time::Duration,
};

use crossbeam::channel::{self, Receiver, Sender};
use interprocess::local_socket::{
    GenericNamespaced, ListenerOptions, ToNsName, traits::ListenerExt,
};
use lunar_lib::log::{error, info};

use crate::{
    common::{SessionHandshakeResponse, SessionToken, Stream},
    config::daemon_config,
    ipc_common::Packetable,
    should_shutdown, shutdown,
};

pub(crate) struct ConnectionListener;
impl ConnectionListener {
    pub(crate) fn open(stream_tx: Sender<Box<dyn Stream>>) -> io::Result<()> {
        let config = daemon_config();

        // Local
        let socket_listener = ListenerOptions::new()
            .name(
                config
                    .main
                    .socket_name
                    .as_str()
                    .to_ns_name::<GenericNamespaced>()?,
            )
            .create_sync()?;

        // Remote
        let tcp_listener = TcpListener::bind(config.main.host)?;

        let thread_tx = stream_tx.clone();
        thread::spawn(move || {
            while !should_shutdown()
                && let Some(connection) = socket_listener.incoming().next()
            {
                match connection {
                    Ok(stream) => thread_tx
                        .send(Box::new(stream))
                        .expect("Channel cannot die before thread closes"),
                    Err(err) => {
                        shutdown(format!(
                            "Local socket listener thread failed with error: {err}"
                        ));
                        break;
                    }
                }
            }
        });

        thread::spawn(move || {
            while !should_shutdown()
                && let Some(connection) = tcp_listener.incoming().next()
            {
                match connection {
                    Ok(stream) => stream_tx
                        .send(Box::new(stream))
                        .expect("Channel cannot die before thread closes"),
                    Err(err) => {
                        shutdown(format!(
                            "Local socket listener thread failed with error: {err}"
                        ));
                        break;
                    }
                }
            }
        });

        Ok(())
    }
}

pub(crate) struct SessionEventSender {
    event_rx: Receiver<Arc<[u8]>>,
    stream: Box<dyn Stream>,
    token: SessionToken,
}

impl SessionEventSender {
    #[must_use]
    pub(crate) fn connect(stream: Box<dyn Stream>, token: SessionToken) -> Sender<Arc<[u8]>> {
        let (event_tx, event_rx) = channel::unbounded();
        thread::spawn(move || {
            let client = Self {
                event_rx,
                stream,
                token,
            };

            if let Err(err) = client.start() {
                error!("Client disconnected with error: {err}");
            } else {
                info!("Client disconnected");
            }
        });

        event_tx
    }

    fn start(mut self) -> io::Result<()> {
        SessionHandshakeResponse { token: self.token }.serialize_into_writer(&mut self.stream)?;

        const TIMEOUT: Duration = Duration::from_secs(5);

        while !should_shutdown() {
            match self.event_rx.recv_timeout(TIMEOUT) {
                Ok(event) => {
                    self.stream.write_all(&event)?;
                }
                Err(channel::RecvTimeoutError::Timeout) => {
                    if self.stream.read(&mut [0u8])? != 0 {
                        return Err(io::Error::other(
                            "Client sent data when they shouldn't have",
                        ));
                    }
                }
                Err(channel::RecvTimeoutError::Disconnected) => break,
            }
        }

        Ok(())
    }
}

// impl ControllerThread {
//     fn run_ipc(&mut self, command: SessionRequest) -> anyhow::Result<Vec<u8>> {
//         let bytes = match command {
//             SessionRequest::Flush => ().serialize_packet(),
//             SessionRequest::ReloadConfig => DaemonConfig::reload().is_ok().serialize_packet(),
//             SessionRequest::GetState { flags } => {
//                 let (callback, rx) = oneshot::channel();
//                 self.session_tx
//                     .send(SessionCommand::GetState { flags, callback })?;
//                 rx.recv()?.serialize_packet()
//             }

//             SessionRequest::Play { collectables } => {
//                 let (callback, rx) = oneshot::channel();

//                 let playables = {
//                     let db = DbHandle::<LibraryDb>::open()?;
//                     collectables
//                         .into_iter()
//                         .flat_map(|p| Playable::from_collectable(p, &db))
//                         .collect()
//                 };

//                 self.session_tx.send(SessionCommand::Play {
//                     playables,
//                     callback,
//                 })?;
//                 rx.recv()?.serialize_packet()
//             }
//             SessionRequest::Stop => {
//                 let (callback, rx) = oneshot::channel();
//                 self.session_tx.send(SessionCommand::Stop { callback })?;
//                 rx.recv()?;
//                 ().serialize_packet()
//             }
//             SessionRequest::SetIsPlaying { is_playing } => {
//                 let (callback, rx) = oneshot::channel();
//                 self.session_tx.send(SessionCommand::SetIsPlaying {
//                     is_playing,
//                     callback,
//                 })?;
//                 rx.recv()?.serialize_packet()
//             }
//             SessionRequest::TogglePlaying => {
//                 let (callback, rx) = oneshot::channel();
//                 self.session_tx
//                     .send(SessionCommand::TogglePlaying { callback })?;
//                 rx.recv()?.serialize_packet()
//             }
//             SessionRequest::Seek { seconds, increment } => {
//                 let (callback, rx) = oneshot::channel();
//                 self.session_tx.send(SessionCommand::Seek {
//                     seconds,
//                     increment,
//                     callback,
//                 })?;
//                 rx.recv()?.serialize_packet()
//             }
//             SessionRequest::QueueSet {
//                 collectables: tracks,
//             } => {
//                 let (callback, rx) = oneshot::channel();
//                 let tracks = {
//                     let db = DbHandle::<LibraryDb>::open()?;
//                     tracks
//                         .into_iter()
//                         .flat_map(|p| Playable::from_collectable(p, &db))
//                         .collect()
//                 };

//                 self.session_tx
//                     .send(SessionCommand::QueueSet { tracks, callback })?;
//                 rx.recv()?;
//                 ().serialize_packet()
//             }
//             SessionRequest::QueueExtend {
//                 collectables: tracks,
//             } => {
//                 let (callback, rx) = oneshot::channel();
//                 let tracks = {
//                     let db = DbHandle::<LibraryDb>::open()?;
//                     tracks
//                         .into_iter()
//                         .flat_map(|p| Playable::from_collectable(p, &db))
//                         .collect()
//                 };

//                 self.session_tx
//                     .send(SessionCommand::QueueExtend { tracks, callback })?;
//                 rx.recv()?;
//                 ().serialize_packet()
//             }
//             SessionRequest::QueueShuffle => {
//                 let (callback, rx) = oneshot::channel();
//                 self.session_tx
//                     .send(SessionCommand::QueueShuffle { callback })?;
//                 rx.recv()?;
//                 ().serialize_packet()
//             }
//             SessionRequest::QueueClear => {
//                 let (callback, rx) = oneshot::channel();
//                 self.session_tx
//                     .send(SessionCommand::QueueClear { callback })?;
//                 rx.recv()?;
//                 ().serialize_packet()
//             }
//             SessionRequest::PlaylistSet { collectables } => {
//                 let (callback, rx) = oneshot::channel();
//                 let playables = {
//                     let db = DbHandle::<LibraryDb>::open()?;
//                     collectables
//                         .into_iter()
//                         .flat_map(|p| Playable::from_collectable(p, &db))
//                         .collect()
//                 };

//                 self.session_tx.send(SessionCommand::PlaylistSet {
//                     playables,
//                     callback,
//                 })?;
//                 rx.recv()?;
//                 ().serialize_packet()
//             }
//             SessionRequest::PlaylistExtend { collectables } => {
//                 let (callback, rx) = oneshot::channel();
//                 let playables = {
//                     let db = DbHandle::<LibraryDb>::open()?;
//                     collectables
//                         .into_iter()
//                         .flat_map(|p| Playable::from_collectable(p, &db))
//                         .collect()
//                 };

//                 self.session_tx.send(SessionCommand::PlaylistExtend {
//                     playables,
//                     callback,
//                 })?;
//                 rx.recv()?;
//                 ().serialize_packet()
//             }
//             SessionRequest::PlaylistClear => {
//                 let (callback, rx) = oneshot::channel();
//                 self.session_tx
//                     .send(SessionCommand::PlaylistClear { callback })?;
//                 rx.recv()?;
//                 ().serialize_packet()
//             }
//             SessionRequest::PlaylistSetShuffleMode { shuffle_mode } => {
//                 let (callback, rx) = oneshot::channel();
//                 self.session_tx
//                     .send(SessionCommand::PlaylistSetShuffleMode {
//                         shuffle_mode,
//                         callback,
//                     })?;
//                 rx.recv()?;
//                 ().serialize_packet()
//             }
//             SessionRequest::PlaylistSetLoopMode { loop_mode } => {
//                 let (callback, rx) = oneshot::channel();
//                 self.session_tx.send(SessionCommand::PlaylistSetLoopMode {
//                     loop_mode,
//                     callback,
//                 })?;
//                 rx.recv()?;
//                 ().serialize_packet()
//             }
//             SessionRequest::TracklistSeek { index, increment } => {
//                 let (callback, rx) = oneshot::channel();
//                 self.session_tx.send(SessionCommand::TracklistSeek {
//                     index,
//                     increment,
//                     callback,
//                 })?;
//                 rx.recv()?.serialize_packet()
//             }
//             SessionRequest::Next => {
//                 let (callback, rx) = oneshot::channel();
//                 self.session_tx.send(SessionCommand::Next { callback })?;
//                 rx.recv()?;
//                 ().serialize_packet()
//             }
//             SessionRequest::Previous => {
//                 let (callback, rx) = oneshot::channel();
//                 self.session_tx
//                     .send(SessionCommand::Previous { callback })?;
//                 rx.recv()?;
//                 ().serialize_packet()
//             }
//         };

//         Ok(bytes)
//     }
// }