use std::{
io::{self, Read},
os::unix::net::UnixStream,
sync::mpsc::{Receiver, Sender, TryRecvError, channel},
thread,
};
use lunar_lib::trace;
use crate::{
ConnectErrorKind, IpcHandleError, IpcRequest, PacketError, PacketType, PlayerEvent,
config::{daemon_config, initialize_daemon_config},
daemon::{SeleneDaemonHandle, read_packet_data, send_command},
listener::unix_socket_listener::{SELENE_UNIX_SOCKET_PATH, selene_unix_socket_path},
wait,
};
/// Client-side state for one connection to the daemon's Unix socket.
///
/// An instance is built by `connect` and then consumed by `run` on a
/// spawned thread.
pub(crate) struct UnixSocketHandle {
/// Receiving half of the request channel; `connect` hands the matching
/// `Sender` back to the caller.
rx: Receiver<IpcRequest>,
/// The current socket. `run` takes it out at the start of every loop
/// iteration and only puts it back when the iteration ends with the
/// connection still usable, so `None` means there is no live stream.
stream: Option<UnixStream>,
}
/// Boxed one-shot callback that receives the outcome of a single request:
/// `Ok` with the raw payload bytes of the response packet, or `Err` with the
/// `PacketError` that ended the request.
pub type CallbackFn = Box<dyn FnOnce(Result<&[u8], PacketError>) + Send>;
impl SeleneDaemonHandle for UnixSocketHandle {
    /// Connects to the daemon's Unix socket and starts the background I/O thread.
    ///
    /// The daemon configuration is initialized first and its
    /// `main.socket_path` is stored in `SELENE_UNIX_SOCKET_PATH`. After a
    /// successful connection a thread is spawned that executes `run` with
    /// `callback`, and the sending half of the request channel is returned.
    ///
    /// # Errors
    ///
    /// Propagates the error of `initialize_daemon_config`, and returns
    /// `IpcHandleError::FailedToConnect` when the socket cannot be opened:
    /// `DaemonNotRunning` if the socket path does not exist,
    /// `ConnectionRefused` if the connection is refused, `Other` with the
    /// underlying I/O error otherwise.
    fn connect<F>(callback: Option<F>) -> Result<Sender<IpcRequest>, IpcHandleError>
    where
        Self: Sized,
        F: FnMut(PlayerEvent) + Send + Sync + 'static,
    {
        trace!("Connecting to UnixSocketHandle");
        initialize_daemon_config()?;

        // The result is ignored on purpose.
        // NOTE(review): presumably `set` only fails when the path was already
        // stored by an earlier call - confirm against the static's type.
        let _ = SELENE_UNIX_SOCKET_PATH.set(daemon_config().main.socket_path.clone());

        let stream = match UnixStream::connect(selene_unix_socket_path()) {
            Ok(stream) => stream,
            Err(err) => {
                let reason = match err.kind() {
                    io::ErrorKind::NotFound => ConnectErrorKind::DaemonNotRunning,
                    io::ErrorKind::ConnectionRefused => ConnectErrorKind::ConnectionRefused,
                    _ => ConnectErrorKind::Other(err),
                };
                return Err(IpcHandleError::FailedToConnect(reason));
            }
        };

        let (tx, rx) = channel();
        let handle = UnixSocketHandle {
            rx,
            stream: Some(stream),
        };
        // The thread is detached; it ends when `run` returns.
        thread::spawn(move || handle.run(callback));
        Ok(tx)
    }

    /// Tries to open a fresh connection to the daemon socket.
    ///
    /// On success the new stream replaces `self.stream` and `true` is
    /// returned; on any connection error `self.stream` is left untouched and
    /// `false` is returned.
    fn reconnect(&mut self) -> bool {
        match UnixStream::connect(selene_unix_socket_path()) {
            Ok(stream) => {
                self.stream = Some(stream);
                true
            }
            Err(_) => false,
        }
    }

    /// Drives the connection: forwards queued requests to the daemon and
    /// dispatches the packets it sends back.
    ///
    /// At most one request is in flight at a time; the next request is only
    /// taken from the channel once the previous one has been answered (or did
    /// not expect an answer). Event packets are decoded and passed to
    /// `callback` when one was supplied.
    ///
    /// When the connection is lost (send/read failure, or a disconnect packet)
    /// the in-flight request, if any, is completed with
    /// `PacketError::Disconnect` and the loop keeps trying to `reconnect`.
    /// Requests received while no connection can be established are rejected
    /// with `PacketError::Disconnect` as well.
    ///
    /// The function returns once every `Sender` of the request channel has
    /// been dropped.
    ///
    /// # Panics
    ///
    /// Panics if the daemon sends a packet of unknown type, or an event/error
    /// payload that cannot be decoded.
    fn run<F>(mut self, mut callback: Option<F>)
    where
        F: FnMut(PlayerEvent) + Send + Sync + 'static,
    {
        /// Completes a request callback, if there is one, with a disconnect error.
        fn notify_disconnected(callback: Option<CallbackFn>) {
            if let Some(callback) = callback {
                callback(Err(PacketError::Disconnect));
            }
        }

        let mut current_request_callback: Option<CallbackFn> = None;
        loop {
            let Some(mut stream) = self.stream.take() else {
                // The previous iteration gave up on the stream, so the answer
                // to an in-flight request can no longer arrive. Clearing it
                // also unblocks the request intake below after reconnecting.
                notify_disconnected(current_request_callback.take());

                if !self.reconnect() {
                    // Still poll the channel while offline: this is the only
                    // way to notice that all senders are gone, and a request
                    // that cannot be delivered is reported instead of lost.
                    match self.rx.try_recv() {
                        Ok(IpcRequest {
                            callback: rejected, ..
                        }) => notify_disconnected(rejected),
                        Err(TryRecvError::Empty) => (),
                        Err(TryRecvError::Disconnected) => return,
                    }
                    // Back off before the next attempt instead of spinning.
                    // NOTE(review): `wait` is defined elsewhere in the crate;
                    // presumably a short sleep - confirm.
                    wait();
                }
                continue;
            };

            if current_request_callback.is_none() {
                match self.rx.try_recv() {
                    Ok(IpcRequest {
                        command,
                        callback: request_callback,
                    }) => {
                        // The polling read below may have left the socket in
                        // non-blocking mode; send in blocking mode so a full
                        // socket buffer is not mistaken for a dead connection.
                        let sent = stream.set_nonblocking(false).is_ok()
                            && send_command(&mut stream, &command).is_ok();
                        if !sent {
                            notify_disconnected(request_callback);
                            // `stream` is dropped here; reconnect next iteration.
                            continue;
                        }
                        if command.responds() {
                            current_request_callback = request_callback;
                        }
                    }
                    Err(TryRecvError::Empty) => (),
                    Err(TryRecvError::Disconnected) => return,
                }
            }

            // Poll for the one-byte packet type without blocking, so that new
            // requests keep being picked up while the daemon is silent.
            if stream.set_nonblocking(true).is_err() {
                continue;
            }
            let mut type_buf = [0u8; 1];
            let packet_type = match stream.read_exact(&mut type_buf) {
                Ok(()) => PacketType::from(type_buf[0]),
                Err(err) if err.kind() == io::ErrorKind::WouldBlock => {
                    // Nothing to read yet: keep the stream and try again later.
                    wait();
                    self.stream = Some(stream);
                    continue;
                }
                // EOF or any other failure: drop the stream and reconnect.
                Err(_) => continue,
            };

            // A packet has started; read the rest of it in blocking mode.
            if stream.set_nonblocking(false).is_err() {
                continue;
            }
            let Ok(data) = read_packet_data(&mut stream) else {
                continue;
            };

            match packet_type {
                PacketType::Unknown => panic!("Daemon sent unknown bytes"),
                PacketType::Event => {
                    // The payload is only decoded when somebody listens.
                    if let Some(ref mut callback) = callback {
                        let event: PlayerEvent = ciborium::from_reader(data.as_slice())
                            .expect("Daemon sent corrupted bytes");
                        callback(event);
                    }
                }
                PacketType::Response => {
                    if let Some(ipc_callback) = current_request_callback.take() {
                        ipc_callback(Ok(&data));
                    }
                }
                PacketType::Error => {
                    let err: PacketError = ciborium::from_reader(data.as_slice())
                        .expect("Daemon sent corrupted bytes");
                    if let Some(ipc_callback) = current_request_callback.take() {
                        ipc_callback(Err(err));
                    }
                }
                PacketType::Disconnect => {
                    notify_disconnected(current_request_callback.take());
                    // Do not keep the stream: the daemon closed this session.
                    continue;
                }
            }

            self.stream = Some(stream);
        }
    }
}