use std::{
io::{self, Read},
os::unix::net::UnixStream,
sync::mpsc::{Receiver, Sender, TryRecvError, channel},
thread,
};
use lunar_lib::trace;
use crate::{
client::{
ConnectErrorKind, IpcHandleError, IpcReader, IpcRequest, IpcRx, PacketType,
SeleneIpcHandle, SeleneIpcRunner,
},
config::{daemon_config, initialize_daemon_config},
listener::unix_socket_listener::{SELENE_UNIX_SOCKET_PATH, selene_unix_socket_path},
player::PlayerEvent,
};
/// Client-side IPC handle that talks to the daemon over a Unix domain socket.
///
/// An instance is created by `connect`, moved onto a spawned thread and driven
/// there by `run`; callers interact with it only through the `Sender<IpcRequest>`
/// that `connect` returns.
pub struct UnixSocketHandle {
/// `(reader, writer)` pair for the current connection: the first element is a
/// `try_clone` of the second. `None` while disconnected (set after a failed
/// cycle in `run`, restored by `reconnect`).
stream_handle: Option<(UnixStream, UnixStream)>,
/// Receiving end of the channel whose `Sender` is handed back by `connect`.
rx: Receiver<IpcRequest>,
/// Runner constructed from the caller's event callback; `run` delegates each
/// cycle to it.
inner: SeleneIpcRunner,
}
impl SeleneIpcHandle for UnixSocketHandle {
    /// Connects to the daemon's Unix socket and starts a background thread that
    /// drives the connection.
    ///
    /// The daemon configuration is initialised first and its socket path is
    /// stored in `SELENE_UNIX_SOCKET_PATH` (the result of `set` is deliberately
    /// ignored). On success the returned `Sender` feeds requests to the spawned
    /// thread.
    ///
    /// # Errors
    ///
    /// Propagates any error from `initialize_daemon_config`, and returns
    /// `IpcHandleError::FailedToConnect` when the socket cannot be opened or its
    /// handle cannot be duplicated.
    fn connect<F>(event_callback: F) -> Result<Sender<IpcRequest>, IpcHandleError>
    where
        Self: Sized,
        F: FnMut(PlayerEvent) + Send + Sync + 'static,
    {
        trace!("Connecting to UnixSocketHandle");
        initialize_daemon_config()?;
        let _ = SELENE_UNIX_SOCKET_PATH.set(daemon_config().main.socket_path.clone());

        let streams = Self::open_stream_pair()?;
        let (tx, rx) = channel();
        let handle = Self {
            stream_handle: Some(streams),
            rx,
            inner: SeleneIpcRunner::new(event_callback),
        };
        thread::spawn(move || {
            handle.run();
        });
        Ok(tx)
    }

    /// Re-establishes the socket connection if it is currently absent.
    ///
    /// Returns `Ok(())` immediately when a connection is already present.
    ///
    /// # Errors
    ///
    /// Returns `IpcHandleError::FailedToConnect` when the socket cannot be opened
    /// or its handle cannot be duplicated.
    fn reconnect(&mut self) -> Result<(), IpcHandleError> {
        if self.stream_handle.is_some() {
            return Ok(());
        }
        self.stream_handle = Some(Self::open_stream_pair()?);
        Ok(())
    }

    /// Drives the handle until the request channel goes away.
    ///
    /// While connected, each iteration polls the channel without blocking (only
    /// when the runner reports it is accepting IPC) and hands the optional
    /// request plus both stream halves to the runner. A failed cycle drops the
    /// connection; the loop then waits for a reconnect request and reports the
    /// outcome of `reconnect` through the supplied callback sender.
    fn run(mut self) {
        loop {
            let Some((reader, writer)) = &mut self.stream_handle else {
                match self.rx.wait_for_reconnect_request() {
                    Some(callback) => {
                        // The requester may have stopped listening; that is not an error here.
                        let _ = callback.send(self.reconnect());
                        continue;
                    }
                    None => return,
                }
            };

            let command = if self.inner.accepting_ipc() {
                match self.rx.try_recv() {
                    Ok(command) => Some(command),
                    Err(TryRecvError::Empty) => None,
                    Err(TryRecvError::Disconnected) => return,
                }
            } else {
                None
            };

            // Any failure invalidates the connection; the next iteration takes
            // the disconnected path above.
            if self.inner.run_cycle(command, reader, writer).is_err() {
                self.stream_handle = None;
            }
        }
    }
}

impl UnixSocketHandle {
    /// Opens the daemon socket and returns a `(reader, writer)` pair that share
    /// the same underlying connection.
    ///
    /// `NotFound` and `ConnectionRefused` are mapped to their dedicated
    /// `ConnectErrorKind` variants; every other I/O error, including a failure
    /// to duplicate the handle, is reported as `ConnectErrorKind::Other` rather
    /// than panicking.
    fn open_stream_pair() -> Result<(UnixStream, UnixStream), IpcHandleError> {
        let writer = UnixStream::connect(selene_unix_socket_path()).map_err(|err| {
            let reason = match err.kind() {
                io::ErrorKind::NotFound => ConnectErrorKind::DaemonNotRunning,
                io::ErrorKind::ConnectionRefused => ConnectErrorKind::ConnectionRefused,
                _ => ConnectErrorKind::Other(err),
            };
            IpcHandleError::FailedToConnect(reason)
        })?;
        let reader = writer
            .try_clone()
            .map_err(|err| IpcHandleError::FailedToConnect(ConnectErrorKind::Other(err)))?;
        Ok((reader, writer))
    }
}
impl IpcReader for UnixStream {
    /// Attempts to read a one-byte packet type without blocking.
    ///
    /// The socket is switched to non-blocking mode for the duration of the read
    /// and switched back to blocking mode afterwards.
    ///
    /// Returns `Ok(Some(_))` when a byte was read, and `Ok(None)` when the read
    /// would have blocked.
    ///
    /// # Errors
    ///
    /// Returns any other read error, as well as any failure to change the
    /// socket's blocking mode (previously a panic).
    fn try_read_packet_type(&mut self) -> io::Result<Option<PacketType>> {
        let mut type_buf = [0_u8];

        self.set_nonblocking(true)?;
        let read_result = self.read_exact(&mut type_buf);
        // Restore blocking mode before looking at the outcome so that every
        // return path below leaves the socket in blocking mode.
        self.set_nonblocking(false)?;

        match read_result {
            Ok(()) => Ok(Some(PacketType::from(type_buf[0]))),
            Err(err) if err.kind() == io::ErrorKind::WouldBlock => Ok(None),
            Err(err) => Err(err),
        }
    }
}