selene_daemon/client/
unix_socket_handle.rs1use std::{
2 io::{self, Read},
3 os::unix::net::UnixStream,
4 sync::mpsc::{Receiver, Sender, TryRecvError, channel},
5 thread,
6};
7
8use lunar_lib::trace;
9
10use crate::{
11 client::{
12 ConnectErrorKind, IpcHandleError, IpcReader, IpcRequest, IpcRx, PacketType,
13 SeleneIpcHandle, SeleneIpcRunner,
14 },
15 config::{daemon_config, initialize_daemon_config},
16 listener::unix_socket_listener::{SELENE_UNIX_SOCKET_PATH, selene_unix_socket_path},
17 player::PlayerEvent,
18};
19
20pub struct UnixSocketHandle {
21 stream_handle: Option<(UnixStream, UnixStream)>,
22 rx: Receiver<IpcRequest>,
23
24 inner: SeleneIpcRunner,
25}
26
27impl SeleneIpcHandle for UnixSocketHandle {
28 fn connect<F>(event_callback: F) -> Result<Sender<IpcRequest>, IpcHandleError>
29 where
30 Self: Sized,
31 F: FnMut(PlayerEvent) + Send + Sync + 'static,
32 {
33 trace!("Connecting to UnixSocketHandle");
34
35 initialize_daemon_config()?;
36 let _ = SELENE_UNIX_SOCKET_PATH.set(daemon_config().main.socket_path.clone());
37
38 let stream = UnixStream::connect(selene_unix_socket_path()).map_err(|err| {
39 let reason = match err.kind() {
40 io::ErrorKind::NotFound => ConnectErrorKind::DaemonNotRunning,
41 io::ErrorKind::ConnectionRefused => ConnectErrorKind::ConnectionRefused,
42 _ => ConnectErrorKind::Other(err),
43 };
44 IpcHandleError::FailedToConnect(reason)
45 })?;
46
47 let (tx, rx) = channel();
48
49 let handle = Self {
50 stream_handle: Some((stream.try_clone().unwrap(), stream)),
51 rx,
52 inner: SeleneIpcRunner::new(event_callback),
53 };
54
55 thread::spawn(move || {
56 handle.run();
57 });
58
59 Ok(tx)
60 }
61
62 fn reconnect(&mut self) -> Result<(), IpcHandleError> {
63 if self.stream_handle.is_some() {
64 return Ok(());
65 }
66
67 let stream = UnixStream::connect(selene_unix_socket_path()).map_err(|err| {
68 let reason = match err.kind() {
69 io::ErrorKind::NotFound => ConnectErrorKind::DaemonNotRunning,
70 io::ErrorKind::ConnectionRefused => ConnectErrorKind::ConnectionRefused,
71 _ => ConnectErrorKind::Other(err),
72 };
73 IpcHandleError::FailedToConnect(reason)
74 })?;
75
76 self.stream_handle = Some((stream.try_clone().unwrap(), stream));
77 Ok(())
78 }
79
80 fn run(mut self) {
81 loop {
82 let Some((reader, writer)) = &mut self.stream_handle else {
83 match self.rx.wait_for_reconnect_request() {
84 Some(callback) => {
85 let _ = callback.send(self.reconnect());
86 continue;
87 }
88 None => return,
89 }
90 };
91
92 let command = if self.inner.accepting_ipc() {
93 match self.rx.try_recv() {
94 Ok(command) => Some(command),
95 Err(TryRecvError::Empty) => None,
96 Err(TryRecvError::Disconnected) => return,
97 }
98 } else {
99 None
100 };
101
102 if let Ok(()) = self.inner.run_cycle(command, reader, writer) { } else {
103 self.stream_handle = None;
104 continue;
105 }
106 }
107 }
108}
109
110impl IpcReader for UnixStream {
111 fn try_read_packet_type(&mut self) -> io::Result<Option<PacketType>> {
112 self.set_nonblocking(true).unwrap();
113 let mut type_buf = [0_u8];
114
115 let result = self.read_exact(&mut type_buf);
116 self.set_nonblocking(false).unwrap();
117
118 if let Err(err) = result {
119 if matches!(err.kind(), io::ErrorKind::WouldBlock) {
120 self.set_nonblocking(false).unwrap();
121 return Ok(None);
122 }
123 return Err(err);
124 }
125
126 Ok(Some(PacketType::from(type_buf[0])))
127 }
128}