Skip to main content

selene_daemon/
client.rs

1use std::{
2    fmt::Display,
3    io::{self, Read, Write},
4    sync::mpsc::Sender,
5};
6
7pub type CallbackFn = Box<dyn FnOnce(Result<&[u8], PacketError>) + Send>;
8
9/// `DaemonHandle` implementation for unix using unix sockets
10#[cfg(unix)]
11pub mod unix_socket_handle;
12
13mod ipc;
14pub use ipc::*;
15use lunar_lib::config::ConfigError;
16use thiserror::Error;
17
18use crate::{player::PlayerEvent, wait};
19
20#[derive(Debug, Error)]
21pub enum IpcHandleError {
22    #[error("Failed to connect: {0}")]
23    FailedToConnect(ConnectErrorKind),
24
25    #[error("The handling thread cannot be communicated with")]
26    HandleDied,
27
28    #[error("The current platform is not supported")]
29    UnsupportedPlatform,
30}
31
32impl From<ConfigError> for IpcHandleError {
33    fn from(value: ConfigError) -> Self {
34        Self::FailedToConnect(ConnectErrorKind::FailedToLoadConfig(value.to_string()))
35    }
36}
37
38#[derive(Debug)]
39pub enum ConnectErrorKind {
40    DaemonNotRunning,
41    ConnectionRefused,
42    FailedToLoadConfig(String),
43    Other(io::Error),
44}
45
46impl Display for ConnectErrorKind {
47    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
48        match self {
49            ConnectErrorKind::DaemonNotRunning => f.write_str("The daemon is not running"),
50            ConnectErrorKind::ConnectionRefused => {
51                f.write_str("The daemon listener thread halted and must be restarted")
52            }
53            ConnectErrorKind::FailedToLoadConfig(string) => string.fmt(f),
54            ConnectErrorKind::Other(error) => error.fmt(f),
55        }
56    }
57}
58
59// Interface stuff
60/// Daemon connection abstraction
61///
62/// Allow clients to connect to selene-daemon using one of the handles compatible with their system. Defines wrapper functions around all IPC calls to allow for easy get/set calls
63pub struct SeleneClient {
64    handle_tx: Sender<IpcRequest>,
65}
66
67// Daemon impls
68impl SeleneClient {
69    /// Create a new connection to the daemon
70    ///
71    /// # Errors
72    ///
73    /// This function will error if:
74    /// - There is no [`DaemonHandle`] configured for the current system
75    /// - The connection could not be established (May be because the daemon is not running)
76    pub fn connect<F>(event_callback: F) -> Result<SeleneClient, IpcHandleError>
77    where
78        Self: Sized,
79        F: FnMut(PlayerEvent) + Send + Sync + 'static,
80    {
81        #[cfg(unix)]
82        {
83            Ok(SeleneClient {
84                handle_tx: unix_socket_handle::UnixSocketHandle::connect(event_callback)?,
85            })
86        }
87
88        #[cfg(not(unix))]
89        {
90            Err(IpcHandleError::UnsupportedPlatform)
91        }
92    }
93}
94
95impl Drop for SeleneClient {
96    fn drop(&mut self) {
97        let () = self.handle_tx.disconnect();
98    }
99}
100
101trait SeleneIpcHandle {
102    fn connect<F>(event_callback: F) -> Result<Sender<IpcRequest>, IpcHandleError>
103    where
104        Self: Sized,
105        F: FnMut(PlayerEvent) + Send + Sync + 'static;
106
107    fn reconnect(&mut self) -> Result<(), IpcHandleError>;
108
109    fn run(self);
110}
111
112trait IpcReader: Read {
113    fn try_read_packet_type(&mut self) -> io::Result<Option<PacketType>>;
114}
115
116struct SeleneIpcRunner {
117    pending_ipc_callback: Option<CallbackFn>,
118    event_callback: Box<dyn FnMut(PlayerEvent) + Send + Sync + 'static>,
119}
120
121impl SeleneIpcRunner {
122    fn new<F>(event_callback: F) -> Self
123    where
124        F: FnMut(PlayerEvent) + Send + Sync + 'static,
125    {
126        Self {
127            pending_ipc_callback: None,
128            event_callback: Box::new(event_callback),
129        }
130    }
131
132    fn accepting_ipc(&self) -> bool {
133        self.pending_ipc_callback.is_none()
134    }
135
136    /// Runs a single cycle using the current handle
137    ///
138    /// Returns [`true`] if there was a disconnect
139    fn run_cycle<R, W>(
140        &mut self,
141        command: Option<IpcRequest>,
142        reader: &mut R,
143        writer: &mut W,
144    ) -> Result<(), ()>
145    where
146        R: IpcReader,
147        W: Write,
148    {
149        if self.pending_ipc_callback.is_none()
150            && let Some(command) = command
151        {
152            match command {
153                IpcRequest::Ipc { command, callback } => {
154                    if send_command(writer, &command).is_err() {
155                        return Ok(());
156                    }
157
158                    if command.responds() {
159                        self.pending_ipc_callback = callback;
160                    }
161                }
162                IpcRequest::Reconnect { callback } => {
163                    let _ = callback.send(Ok(()));
164                }
165            }
166        }
167
168        let packet_type = match reader.try_read_packet_type() {
169            Ok(Some(packet_type)) => packet_type,
170            Ok(None) => {
171                wait();
172                return Ok(());
173            }
174            Err(_) => return Err(()),
175        };
176
177        let Ok(data) = read_packet_data(reader) else {
178            return Ok(());
179        };
180
181        match packet_type {
182            PacketType::Unknown => panic!("Daemon sent unknown bytes"),
183            PacketType::Event => {
184                let event: PlayerEvent =
185                    postcard::from_bytes(data.as_slice()).expect("Daemon sent corrupted bytes");
186
187                (self.event_callback)(event);
188            }
189            PacketType::Response => {
190                if let Some(ipc_callback) = self.pending_ipc_callback.take() {
191                    ipc_callback(Ok(&data));
192                }
193            }
194            PacketType::Error => {
195                let err: PacketError =
196                    postcard::from_bytes(data.as_slice()).expect("Daemon sent corrupted bytes");
197
198                if let Some(ipc_callback) = self.pending_ipc_callback.take() {
199                    ipc_callback(Err(err));
200                }
201            }
202            PacketType::Disconnect => {
203                if let Some(ipc_callback) = self.pending_ipc_callback.take() {
204                    ipc_callback(Err(PacketError::Disconnect));
205                }
206
207                return Err(());
208            }
209        }
210
211        Ok(())
212    }
213}
214
215/// Assumes the next two byte groups are \[len\]\[data\] and that the packet type has already been read
216///
217/// Returns just the packet data from the buffer
218fn read_packet_data(reader: &mut impl Read) -> io::Result<Vec<u8>> {
219    let mut len_buf = [0u8; 4];
220    reader.read_exact(&mut len_buf)?;
221    let len = u32::from_be_bytes(len_buf) as usize;
222
223    let mut data = vec![0u8; len];
224    reader.read_exact(&mut data)?;
225
226    Ok(data)
227}
228
229fn send_command(
230    writer: &mut impl Write,
231    command: &IpcCommand,
232) -> Result<(), Box<dyn std::error::Error>> {
233    let buf = postcard::to_stdvec(&command).expect("Serialization should not fail.");
234    writer.write_all(&(buf.len() as u32).to_be_bytes())?;
235    writer.write_all(&buf)?;
236    Ok(())
237}