use std::{
io::{self, Write},
sync::atomic::Ordering,
thread::{self, JoinHandle},
time::Duration,
};
use anyhow::anyhow;
use crossbeam::channel::{self, Receiver};
use lunar_lib::{database::DatabaseError, log::error};
use thiserror::Error;
#[cfg(feature = "local-session")]
use crate::{
cpal_thread::PlaybackError,
local_session::{LocalSessionHandle, local_session},
};
use crate::{
Method, SHUTDOWN,
common::{
AudioHandshakeRequest, AudioHandshakeResponse, HandshakeError, HandshakeType,
IPC_PROTOCOL_VERSION, SessionHandshakeRequest, SessionHandshakeResponse, SessionTarget,
SessionToken, Stream,
},
init_images_db, init_library_db,
ipc_common::Packetable,
listener::{ConnectionListener, SessionEventSender},
process::ProcessThread,
session::{SessionHandle, session_registry},
should_shutdown,
};
#[cfg(feature = "plugin-support")]
static PLUGIN_HOLD: std::sync::OnceLock<selene_plugin_sdk::LoadedLibraries> =
std::sync::OnceLock::new();
#[derive(Debug, Error)]
pub enum DaemonStartError {
#[error("Io: {0}")]
Io(#[from] io::Error),
#[error("Database: {0}")]
Database(#[from] DatabaseError),
#[cfg(feature = "local-session")]
#[error("Cpal: {0}")]
Cpal(#[from] PlaybackError),
}
pub struct SeleneDaemon {
client_rx: Receiver<Box<dyn Stream>>,
process_thread: JoinHandle<()>,
}
pub struct SeleneDaemonBuilder {
#[cfg(feature = "plugin-support")]
plugins: Option<selene_plugin_sdk::LoadedLibraries>,
}
impl SeleneDaemonBuilder {
fn new() -> Self {
Self {
#[cfg(feature = "plugin-support")]
plugins: None,
}
}
#[cfg(feature = "plugin-support")]
#[allow(unsafe_code)]
#[must_use]
pub fn with_plugins(mut self, plugin_container: selene_plugin_sdk::LoadedLibraries) -> Self {
self.plugins = Some(plugin_container);
self
}
pub fn start(self) -> Result<(), DaemonStartError> {
let (stream_tx, stream_rx) = channel::unbounded();
ConnectionListener::open(stream_tx)?;
let process_thread = ProcessThread::open();
init_library_db()?;
init_images_db()?;
#[cfg(feature = "plugin-support")]
if let Some(plugin_container) = self.plugins {
use crate::{local_session::plugin_session, plugin_sdk_ext::StableLogger};
use selene_plugin_sdk::{
HostInfo,
local_session::{StableLocalSessionPluginDynMut, init_local_session_plugin},
};
PLUGIN_HOLD.set(plugin_container).unwrap();
let plugin_container = PLUGIN_HOLD.get().unwrap();
let mut plugin_handles = Vec::new();
let mut plugin_event_callbacks = Vec::new();
for plugin in plugin_container.plugins() {
if let Some(mut plugin_entry) = init_local_session_plugin(plugin) {
plugin_event_callbacks.push(plugin_entry.get_event_callback());
plugin_handles.push(plugin_entry);
}
}
LocalSessionHandle::init_with_plugins(plugin_event_callbacks)?;
for plugin in &mut plugin_handles {
plugin.start(
plugin_session(),
HostInfo::new(env!("CARGO_PKG_VERSION").to_owned(), StableLogger),
);
}
} else {
LocalSessionHandle::init()?;
};
#[cfg(all(feature = "local-session", not(feature = "plugin-support")))]
LocalSessionHandle::init()?;
let daemon = SeleneDaemon {
client_rx: stream_rx,
process_thread,
};
daemon.start();
Ok(())
}
}
impl SeleneDaemon {
pub fn init() -> Result<SeleneDaemonBuilder, DatabaseError> {
Ok(SeleneDaemonBuilder::new())
}
pub fn start(mut self) {
while !should_shutdown()
&& let Ok(client_stream) = self.client_rx.recv()
{
if let Err(err) = self.accept_client(client_stream) {
error!("Failed to accept client stream: {err}");
}
}
SHUTDOWN.store(true, Ordering::Relaxed);
if let Err(err) = self.process_thread.join() {
error!("Process thread panicked: {err:?}");
}
}
}
impl SeleneDaemon {
fn route_session(
&mut self,
target: SessionTarget,
mut client: Box<dyn Stream>,
create_fn: impl FnOnce(Box<dyn Stream>, SessionToken) -> io::Result<SessionHandle>,
join_fn: impl FnOnce(Box<dyn Stream>, &SessionHandle, SessionToken) -> io::Result<()>,
) -> io::Result<()> {
let mut session_registry = session_registry();
match target {
SessionTarget::Create => {
let token = loop {
let token = SessionToken::generate();
#[cfg(feature = "local-session")]
let valid = !session_registry.contains_key(&token)
&& token != SessionToken::LOCAL_SESSION;
#[cfg(not(feature = "local-session"))]
let valid = !session_registry.contains_key(&token);
if valid {
break token;
}
};
let session = create_fn(client, token)?;
session_registry
.insert(token, session)
.expect("Token was verified to not exist");
Ok(())
}
SessionTarget::Join { token } => {
#[cfg(feature = "local-session")]
let session = if let Some(session) = session_registry.get(&token) {
session
} else if token == SessionToken::LOCAL_SESSION {
local_session()
} else {
let _ = Err::<(), HandshakeError>(HandshakeError::Refused)
.serialize_into_writer(&mut client);
return Ok(());
};
#[cfg(not(feature = "local-session"))]
let session = if let Some(session) = session_registry.get(&token) {
session
} else {
let _ = Err::<(), HandshakeError>(HandshakeError::Refused)
.serialize_into_writer(&mut client);
return Ok(());
};
join_fn(client, session, token)
}
}
}
pub fn accept_client(&mut self, mut client: Box<dyn Stream>) -> anyhow::Result<()> {
client.set_read_timeout(Some(Duration::from_secs(10)))?;
let mut header_buf = [0u8; 4];
client.read_exact(&mut header_buf)?;
client.write_all(&IPC_PROTOCOL_VERSION)?;
if header_buf != IPC_PROTOCOL_VERSION {
return Ok(());
}
client.read_exact(&mut header_buf)?;
match header_buf {
HandshakeType::SESSION => {
let data = client.read_data()?;
let SessionHandshakeRequest { session_target } =
SessionHandshakeRequest::deserialize_packet(&data)?;
self.route_session(
session_target,
client,
|mut client, token| {
Ok::<_, HandshakeError>(SessionHandshakeResponse { token })
.serialize_into_writer(&mut client)?;
let handle = SessionEventSender::connect(client, token);
let session = SessionHandle::create_with_event_client(handle);
Ok(session)
},
|mut client, session, token| {
Ok::<_, HandshakeError>(SessionHandshakeResponse { token })
.serialize_into_writer(&mut client)?;
let handle = SessionEventSender::connect(client, token);
session
.controller_tx
.send(handle)
.expect("Session handle exists, channel must be open");
Ok(())
},
)?;
}
HandshakeType::AUDIO => {
let data = client.read_data()?;
let AudioHandshakeRequest { session_target } =
AudioHandshakeRequest::deserialize_packet(&data)?;
self.route_session(
session_target,
client,
move |mut client, _| {
Ok::<_, HandshakeError>(AudioHandshakeResponse { track_info: None })
.serialize_into_writer(&mut client)?;
let session = SessionHandle::create_with_audio_client(client);
Ok(session)
},
move |client, session, _| {
session
.sink_tx
.send(client)
.expect("Session handle exists, channel must be open");
Ok(())
},
)?;
}
HandshakeType::METHOD => {
let method_data = client.read_data()?;
thread::spawn(move || {
let mut response = Method::resolve(method_data);
response.extend((response.len() as u32).to_be_bytes());
response.rotate_right(4);
let _ = client.write_all(&response);
});
}
_ => {
return Err(anyhow!("Client didn't send handshake packet"));
}
}
Ok(())
}
}