Skip to main content

selene_daemon/client/
unix_socket_handle.rs

1use std::{
2    io::{self, Read},
3    os::unix::net::UnixStream,
4    sync::mpsc::{Receiver, Sender, TryRecvError, channel},
5    thread,
6};
7
8use lunar_lib::trace;
9
10use crate::{
11    client::{
12        ConnectErrorKind, IpcHandleError, IpcReader, IpcRequest, IpcRx, PacketType,
13        SeleneIpcHandle, SeleneIpcRunner,
14    },
15    config::{daemon_config, initialize_daemon_config},
16    listener::unix_socket_listener::{SELENE_UNIX_SOCKET_PATH, selene_unix_socket_path},
17    player::PlayerEvent,
18};
19
20pub struct UnixSocketHandle {
21    stream_handle: Option<(UnixStream, UnixStream)>,
22    rx: Receiver<IpcRequest>,
23
24    inner: SeleneIpcRunner,
25}
26
27impl SeleneIpcHandle for UnixSocketHandle {
28    fn connect<F>(event_callback: F) -> Result<Sender<IpcRequest>, IpcHandleError>
29    where
30        Self: Sized,
31        F: FnMut(PlayerEvent) + Send + Sync + 'static,
32    {
33        trace!("Connecting to UnixSocketHandle");
34
35        initialize_daemon_config()?;
36        let _ = SELENE_UNIX_SOCKET_PATH.set(daemon_config().main.socket_path.clone());
37
38        let stream = UnixStream::connect(selene_unix_socket_path()).map_err(|err| {
39            let reason = match err.kind() {
40                io::ErrorKind::NotFound => ConnectErrorKind::DaemonNotRunning,
41                io::ErrorKind::ConnectionRefused => ConnectErrorKind::ConnectionRefused,
42                _ => ConnectErrorKind::Other(err),
43            };
44            IpcHandleError::FailedToConnect(reason)
45        })?;
46
47        let (tx, rx) = channel();
48
49        let handle = Self {
50            stream_handle: Some((stream.try_clone().unwrap(), stream)),
51            rx,
52            inner: SeleneIpcRunner::new(event_callback),
53        };
54
55        thread::spawn(move || {
56            handle.run();
57        });
58
59        Ok(tx)
60    }
61
62    fn reconnect(&mut self) -> Result<(), IpcHandleError> {
63        if self.stream_handle.is_some() {
64            return Ok(());
65        }
66
67        let stream = UnixStream::connect(selene_unix_socket_path()).map_err(|err| {
68            let reason = match err.kind() {
69                io::ErrorKind::NotFound => ConnectErrorKind::DaemonNotRunning,
70                io::ErrorKind::ConnectionRefused => ConnectErrorKind::ConnectionRefused,
71                _ => ConnectErrorKind::Other(err),
72            };
73            IpcHandleError::FailedToConnect(reason)
74        })?;
75
76        self.stream_handle = Some((stream.try_clone().unwrap(), stream));
77        Ok(())
78    }
79
80    fn run(mut self) {
81        loop {
82            let Some((reader, writer)) = &mut self.stream_handle else {
83                match self.rx.wait_for_reconnect_request() {
84                    Some(callback) => {
85                        let _ = callback.send(self.reconnect());
86                        continue;
87                    }
88                    None => return,
89                }
90            };
91
92            let command = if self.inner.accepting_ipc() {
93                match self.rx.try_recv() {
94                    Ok(command) => Some(command),
95                    Err(TryRecvError::Empty) => None,
96                    Err(TryRecvError::Disconnected) => return,
97                }
98            } else {
99                None
100            };
101
102            if let Ok(()) = self.inner.run_cycle(command, reader, writer) {  } else {
103                self.stream_handle = None;
104                continue;
105            }
106        }
107    }
108}
109
110impl IpcReader for UnixStream {
111    fn try_read_packet_type(&mut self) -> io::Result<Option<PacketType>> {
112        self.set_nonblocking(true).unwrap();
113        let mut type_buf = [0_u8];
114
115        let result = self.read_exact(&mut type_buf);
116        self.set_nonblocking(false).unwrap();
117
118        if let Err(err) = result {
119            if matches!(err.kind(), io::ErrorKind::WouldBlock) {
120                self.set_nonblocking(false).unwrap();
121                return Ok(None);
122            }
123            return Err(err);
124        }
125
126        Ok(Some(PacketType::from(type_buf[0])))
127    }
128}