selene-daemon 0.9.0-alpha.2

Official music player daemon for Selene
Documentation
use std::{
    io::{self, Write},
    sync::atomic::Ordering,
    thread::{self, JoinHandle},
    time::Duration,
};

use anyhow::anyhow;
use crossbeam::channel::{self, Receiver};
use lunar_lib::{database::DatabaseError, log::error};
use thiserror::Error;

#[cfg(feature = "local-session")]
use crate::{
    cpal_thread::PlaybackError,
    local_session::{LocalSessionHandle, local_session},
};

use crate::{
    Method, SHUTDOWN,
    common::{
        AudioHandshakeRequest, AudioHandshakeResponse, HandshakeError, HandshakeType,
        IPC_PROTOCOL_VERSION, SessionHandshakeRequest, SessionHandshakeResponse, SessionTarget,
        SessionToken, Stream,
    },
    init_images_db, init_library_db,
    ipc_common::Packetable,
    listener::{ConnectionListener, SessionEventSender},
    process::ProcessThread,
    session::{SessionHandle, session_registry},
    should_shutdown,
};

/// Process-wide slot for the plugin libraries supplied through
/// `SeleneDaemonBuilder::with_plugins`.
///
/// `SeleneDaemonBuilder::start` moves the container into this slot and then
/// borrows it back with `get()`; because this is a `static`, that borrow is
/// `'static` and can be iterated while the plugins are initialised and started.
///
/// NOTE(review): presumably this also keeps the libraries loaded for as long
/// as plugin code may run — confirm against `selene_plugin_sdk`.
#[cfg(feature = "plugin-support")]
static PLUGIN_HOLD: std::sync::OnceLock<selene_plugin_sdk::LoadedLibraries> =
    std::sync::OnceLock::new();

/// Errors that `SeleneDaemonBuilder::start` can return while bringing the daemon up.
///
/// Every variant has a `#[from]` conversion, so the `?` operator in `start`
/// wraps the underlying error automatically.
#[derive(Debug, Error)]
pub enum DaemonStartError {
    /// An I/O failure, wrapping [`std::io::Error`].
    #[error("Io: {0}")]
    Io(#[from] io::Error),

    /// A database failure, wrapping `DatabaseError` from `lunar_lib`.
    #[error("Database: {0}")]
    Database(#[from] DatabaseError),

    /// A playback failure, wrapping `PlaybackError` from the `cpal_thread` module.
    /// Only present when the `local-session` feature is enabled.
    #[cfg(feature = "local-session")]
    #[error("Cpal: {0}")]
    Cpal(#[from] PlaybackError),
}

/// State of a running daemon: the receiving end of the incoming-connection
/// channel and the handle of the process thread.
///
/// Instances are assembled by `SeleneDaemonBuilder::start`.
pub struct SeleneDaemon {
    /// Receives client streams; the matching sender is given to
    /// `ConnectionListener::open` in `SeleneDaemonBuilder::start`.
    client_rx: Receiver<Box<dyn Stream>>,
    /// Handle returned by `ProcessThread::open`; joined at the end of `start`.
    process_thread: JoinHandle<()>,
}

/// Builder handed out by `SeleneDaemon::init`; configure it and then call
/// `start` to launch the daemon.
pub struct SeleneDaemonBuilder {
    /// Plugin libraries to initialise during `start`, or `None` if
    /// `with_plugins` was never called. Only present with the
    /// `plugin-support` feature.
    #[cfg(feature = "plugin-support")]
    plugins: Option<selene_plugin_sdk::LoadedLibraries>,
}

impl SeleneDaemonBuilder {
    /// Creates a builder with no plugins attached.
    fn new() -> Self {
        Self {
            #[cfg(feature = "plugin-support")]
            plugins: None,
        }
    }

    /// Attaches the plugin libraries that `start` should initialise.
    ///
    /// Calling this again replaces any container supplied earlier.
    #[cfg(feature = "plugin-support")]
    #[must_use]
    pub fn with_plugins(mut self, plugin_container: selene_plugin_sdk::LoadedLibraries) -> Self {
        self.plugins = Some(plugin_container);
        self
    }

    /// Brings the daemon up and runs it on the calling thread.
    ///
    /// In order: opens the connection listener and the process thread,
    /// initialises the library and images databases, sets up the local session
    /// (with plugin callbacks when plugins were supplied), and finally runs
    /// `SeleneDaemon::start`. This call therefore only returns `Ok(())` after
    /// the daemon's accept loop has ended.
    ///
    /// # Errors
    ///
    /// Returns a [`DaemonStartError`] if any of the setup steps fails, or if
    /// plugins were supplied but plugin libraries have already been installed
    /// by an earlier `start` in this process (reported as
    /// `io::ErrorKind::AlreadyExists`). The listener and process thread opened
    /// at the top are not torn down on an early error return.
    pub fn start(self) -> Result<(), DaemonStartError> {
        let (stream_tx, stream_rx) = channel::unbounded();
        ConnectionListener::open(stream_tx)?;
        let process_thread = ProcessThread::open();

        init_library_db()?;
        init_images_db()?;

        #[cfg(feature = "plugin-support")]
        if let Some(plugin_container) = self.plugins {
            use crate::{local_session::plugin_session, plugin_sdk_ext::StableLogger};
            use selene_plugin_sdk::{
                HostInfo,
                local_session::{StableLocalSessionPluginDynMut, init_local_session_plugin},
            };

            // The slot can only be filled once per process. Report a repeated
            // start as an error instead of panicking inside the host.
            if PLUGIN_HOLD.set(plugin_container).is_err() {
                return Err(DaemonStartError::Io(io::Error::new(
                    io::ErrorKind::AlreadyExists,
                    "plugin libraries were already installed by an earlier start",
                )));
            }
            let plugin_container = PLUGIN_HOLD
                .get()
                .expect("PLUGIN_HOLD was populated just above");

            // Collect every plugin that initialises as a local-session plugin,
            // together with its event callback.
            let mut plugin_handles = Vec::new();
            let mut plugin_event_callbacks = Vec::new();
            for plugin in plugin_container.plugins() {
                if let Some(mut plugin_entry) = init_local_session_plugin(plugin) {
                    plugin_event_callbacks.push(plugin_entry.get_event_callback());
                    plugin_handles.push(plugin_entry);
                }
            }

            // The local session is initialised with the callbacks before any
            // plugin is started, so `plugin_session()` is requested only afterwards.
            LocalSessionHandle::init_with_plugins(plugin_event_callbacks)?;

            for plugin in &mut plugin_handles {
                plugin.start(
                    plugin_session(),
                    HostInfo::new(env!("CARGO_PKG_VERSION").to_owned(), StableLogger),
                );
            }
        } else {
            LocalSessionHandle::init()?;
        };

        #[cfg(all(feature = "local-session", not(feature = "plugin-support")))]
        LocalSessionHandle::init()?;

        let daemon = SeleneDaemon {
            client_rx: stream_rx,
            process_thread,
        };

        daemon.start();

        Ok(())
    }
}

impl SeleneDaemon {
    /// Hands out a fresh [`SeleneDaemonBuilder`].
    ///
    /// The body cannot fail as written; the `Result` in the signature is
    /// always `Ok`.
    pub fn init() -> Result<SeleneDaemonBuilder, DatabaseError> {
        let builder = SeleneDaemonBuilder::new();
        Ok(builder)
    }

    /// Runs the accept loop on the calling thread until shutdown.
    ///
    /// Each received stream is passed to `accept_client`; a failure there is
    /// logged and the loop carries on. The loop stops when `should_shutdown()`
    /// reports true or when receiving from the channel fails. Afterwards the
    /// shutdown flag is set and the process thread is joined.
    pub fn start(mut self) {
        loop {
            // The flag is consulted before every blocking receive.
            if should_shutdown() {
                break;
            }

            let Ok(incoming) = self.client_rx.recv() else {
                break;
            };

            match self.accept_client(incoming) {
                Ok(()) => {}
                Err(err) => {
                    error!("Failed to accept client stream: {err}");
                }
            }
        }

        // Raise the flag ourselves in case the loop ended because the channel
        // failed rather than because shutdown was requested.
        SHUTDOWN.store(true, Ordering::Relaxed);

        let joined = self.process_thread.join();
        if let Err(err) = joined {
            error!("Process thread panicked: {err:?}");
        }
    }
}

impl SeleneDaemon {
    /// Resolves a handshake's `SessionTarget` against the session registry.
    ///
    /// * `SessionTarget::Create`: generates tokens until one is neither in the
    ///   registry nor (with the `local-session` feature) the reserved
    ///   `SessionToken::LOCAL_SESSION`, builds the session with `create_fn`,
    ///   and registers it under that token.
    /// * `SessionTarget::Join`: looks the token up in the registry, falling
    ///   back to `local_session()` for the reserved token when the
    ///   `local-session` feature is on, and hands the client to `join_fn`.
    ///   For an unknown token a `HandshakeError::Refused` reply is attempted
    ///   (its write result is ignored) and `Ok(())` is returned.
    ///
    /// The value returned by `session_registry()` is held for the whole call,
    /// including while the callbacks run.
    ///
    /// # Errors
    ///
    /// Propagates any `io::Error` returned by `create_fn` or `join_fn`.
    fn route_session(
        &mut self,
        target: SessionTarget,
        mut client: Box<dyn Stream>,
        create_fn: impl FnOnce(Box<dyn Stream>, SessionToken) -> io::Result<SessionHandle>,
        join_fn: impl FnOnce(Box<dyn Stream>, &SessionHandle, SessionToken) -> io::Result<()>,
    ) -> io::Result<()> {
        let mut registry = session_registry();

        // Creation is handled completely inside this match; only a join
        // request falls through with the token the client asked for.
        let requested = match target {
            SessionTarget::Create => {
                let fresh = loop {
                    let candidate = SessionToken::generate();

                    // The reserved local-session token is never handed out.
                    #[cfg(feature = "local-session")]
                    if candidate == SessionToken::LOCAL_SESSION {
                        continue;
                    }

                    if !registry.contains_key(&candidate) {
                        break candidate;
                    }
                };

                let created = create_fn(client, fresh)?;
                registry
                    .insert(fresh, created)
                    .expect("Token was verified to not exist");

                return Ok(());
            }
            SessionTarget::Join { token } => token,
        };

        let session = match registry.get(&requested) {
            Some(existing) => existing,
            #[cfg(feature = "local-session")]
            None if requested == SessionToken::LOCAL_SESSION => local_session(),
            None => {
                // Best effort: the client may already be gone.
                let _ = Err::<(), HandshakeError>(HandshakeError::Refused)
                    .serialize_into_writer(&mut client);
                return Ok(());
            }
        };

        join_fn(client, session, requested)
    }

    pub fn accept_client(&mut self, mut client: Box<dyn Stream>) -> anyhow::Result<()> {
        client.set_read_timeout(Some(Duration::from_secs(10)))?;

        let mut header_buf = [0u8; 4];

        // Consumes version header
        client.read_exact(&mut header_buf)?;
        client.write_all(&IPC_PROTOCOL_VERSION)?;
        if header_buf != IPC_PROTOCOL_VERSION {
            return Ok(());
        }

        // Consumes type header
        client.read_exact(&mut header_buf)?;

        match header_buf {
            HandshakeType::SESSION => {
                let data = client.read_data()?;
                let SessionHandshakeRequest { session_target } =
                    SessionHandshakeRequest::deserialize_packet(&data)?;

                self.route_session(
                    session_target,
                    client,
                    |mut client, token| {
                        Ok::<_, HandshakeError>(SessionHandshakeResponse { token })
                            .serialize_into_writer(&mut client)?;

                        let handle = SessionEventSender::connect(client, token);
                        let session = SessionHandle::create_with_event_client(handle);

                        Ok(session)
                    },
                    |mut client, session, token| {
                        Ok::<_, HandshakeError>(SessionHandshakeResponse { token })
                            .serialize_into_writer(&mut client)?;

                        let handle = SessionEventSender::connect(client, token);
                        session
                            .controller_tx
                            .send(handle)
                            .expect("Session handle exists, channel must be open");

                        Ok(())
                    },
                )?;
            }
            HandshakeType::AUDIO => {
                let data = client.read_data()?;
                let AudioHandshakeRequest { session_target } =
                    AudioHandshakeRequest::deserialize_packet(&data)?;

                self.route_session(
                    session_target,
                    client,
                    move |mut client, _| {
                        Ok::<_, HandshakeError>(AudioHandshakeResponse { track_info: None })
                            .serialize_into_writer(&mut client)?;
                        let session = SessionHandle::create_with_audio_client(client);

                        Ok(session)
                    },
                    move |client, session, _| {
                        session
                            .sink_tx
                            .send(client)
                            .expect("Session handle exists, channel must be open");

                        Ok(())
                    },
                )?;
            }
            HandshakeType::METHOD => {
                let method_data = client.read_data()?;
                thread::spawn(move || {
                    let mut response = Method::resolve(method_data);
                    response.extend((response.len() as u32).to_be_bytes());
                    response.rotate_right(4);
                    let _ = client.write_all(&response);
                });
            }
            _ => {
                return Err(anyhow!("Client didn't send handshake packet"));
            }
        }

        Ok(())
    }
}