use std::{
collections::VecDeque,
io::{self, Read, Write},
os::unix::net::UnixStream,
path::PathBuf,
sync::{
LazyLock,
mpsc::{Receiver, RecvError, Sender, TryRecvError, channel},
},
thread,
};
use lunar_lib::{error, trace};
use selene_core::runtime_dir;
use crate::{
ConnectErrorKind, IpcActionError, IpcCommand, IpcHandleError, PacketType, PlayerEvent,
PlayerResponse,
daemon::{DaemonHandle, IpcRequest},
};
pub static SELENE_UNIX_SOCKET_PATH: LazyLock<PathBuf> = LazyLock::new(|| runtime_dir());
pub(crate) struct UnixSocketHandle {
rx: Receiver<IpcRequest>,
stream: Option<UnixStream>,
}
impl DaemonHandle<IpcCommand, PlayerResponse> for UnixSocketHandle {
fn connect<F>(callback: Option<F>) -> Result<Sender<IpcRequest>, IpcHandleError>
where
Self: Sized,
F: FnMut(PlayerEvent) + Send + Sync + 'static,
{
trace!("Connecting to UnixSocketHandle");
let stream = UnixStream::connect(SELENE_UNIX_SOCKET_PATH.clone()).map_err(|err| {
let reason = match err.kind() {
io::ErrorKind::NotFound => ConnectErrorKind::DaemonNotRunning,
io::ErrorKind::ConnectionRefused => ConnectErrorKind::ConnectionRefused,
_ => ConnectErrorKind::Other(err),
};
IpcHandleError::FailedToConnect(reason)
})?;
let (tx, rx) = channel();
let handle = UnixSocketHandle {
rx,
stream: Some(stream),
};
thread::spawn(|| handle.run(callback));
Ok(tx)
}
fn reconnect(&mut self) -> bool {
let Ok(stream) = UnixStream::connect(SELENE_UNIX_SOCKET_PATH.clone()) else {
return false;
};
self.stream = Some(stream);
true
}
fn run<F>(mut self, mut callback: Option<F>) -> Result<(), IpcHandleError>
where
F: FnMut(PlayerEvent) + Send + Sync + 'static,
{
let mut queue: VecDeque<Sender<Result<PlayerResponse, IpcActionError>>> = VecDeque::new();
'stream_loop: loop {
let Some(stream) = &mut self.stream else {
loop {
match self.rx.recv() {
Ok(IpcRequest::Reconnect { callback }) => {
let success = self.reconnect();
let _ = callback.send(success);
if success {
continue 'stream_loop;
}
}
Ok(IpcRequest::Ipc {
command: _,
callback,
}) => {
let _ = callback.send(Err(IpcActionError::Disconnected));
}
Err(RecvError) => return Ok(()),
}
}
};
loop {
match self.rx.try_recv() {
Ok(IpcRequest::Reconnect { callback }) => {
let _ = callback.send(true);
}
Ok(IpcRequest::Ipc { command, callback }) => {
let mut command_buf = Vec::new();
ciborium::into_writer(&command, &mut command_buf).unwrap();
let len = command_buf.len() as u32;
if stream.write_all(&len.to_be_bytes()).is_err() {
let _ = callback.send(Err(IpcActionError::Disconnected));
self.stream.take();
continue 'stream_loop;
};
if stream.write_all(&command_buf).is_err() {
let _ = callback.send(Err(IpcActionError::Disconnected));
self.stream.take();
continue 'stream_loop;
};
queue.push_back(callback);
}
Err(TryRecvError::Empty) => break,
Err(TryRecvError::Disconnected) => return Ok(()),
}
}
stream.set_nonblocking(true).unwrap();
let mut read = [0u8; 1];
match stream.read_exact(&mut read) {
Ok(()) => (),
Err(err) if matches!(err.kind(), io::ErrorKind::WouldBlock) => {
continue;
}
Err(err) if matches!(err.kind(), io::ErrorKind::UnexpectedEof) => {
self.stream.take();
continue 'stream_loop;
}
Err(err) => {
panic!("Ran into an unexpected error while reading from the stream: {err}")
}
}
let packet_type = PacketType::from(read[0]);
stream.set_nonblocking(false).unwrap();
let mut len_buf = [0u8; 4];
if stream.read_exact(&mut len_buf).is_err() {
self.stream.take();
continue 'stream_loop;
}
let packet_len = u32::from_be_bytes(len_buf) as usize;
let mut packet_buf = vec![0u8; packet_len];
if stream.read_exact(&mut packet_buf).is_err() {
self.stream.take();
continue 'stream_loop;
}
match packet_type {
PacketType::Unknown => {
continue;
}
PacketType::PlayerResponse => {
if let Some(pop_front) = queue.pop_front() {
let response: Result<PlayerResponse, IpcActionError> =
match ciborium::from_reader(packet_buf.as_slice()) {
Ok(v) => v,
Err(err) => {
panic!("Daemon sent corrupted bytes: {err}")
}
};
let _ = pop_front.send(response);
} else {
error!(
"Daemon sent a 'PlayerResponse' packet when no response was expected"
)
}
continue;
}
PacketType::PlayerEvent => {
let event: PlayerEvent = match ciborium::from_reader(packet_buf.as_slice()) {
Ok(v) => v,
Err(err) => {
panic!("Daemon sent corrupted bytes: {err}")
}
};
if let Some(callback) = &mut callback {
callback(event);
}
continue;
}
}
}
}
}