use std::thread::{Builder, JoinHandle};
use std::time::Duration;
use crossbeam::channel::{Receiver, Sender, TryRecvError, unbounded};
use crate::SOCKET_BUFFER_LEN;
use crate::controller::context::ControllerCtx;
use deimos_shared::peripherals::PeripheralId;
use super::{Socket, SocketAddrToken, SocketId, SocketPacketMeta};
/// Instructions accepted by a [`SocketWorker`] over its command channel.
///
/// Commands are drained and applied in the order they were queued, before
/// the worker polls the socket for incoming data.
pub enum SocketWorkerCommand {
/// Send `payload` to the peripheral `id` through the socket's `send`.
Send { id: PeripheralId, payload: Vec<u8> },
/// Hand `payload` to the socket's `broadcast`.
Broadcast { payload: Vec<u8> },
/// Forward the `id` / `token` pair to the socket's `update_map`.
UpdateMap {
id: PeripheralId,
token: SocketAddrToken,
},
/// Stop the worker loop. The worker then closes the socket and emits
/// [`SocketWorkerEvent::Closed`]; commands queued after this one are not
/// processed.
Close,
}
/// Notifications a [`SocketWorker`] publishes on its event channel.
///
/// Every variant carries the `socket_id` of the worker that produced it.
pub enum SocketWorkerEvent {
/// The socket's `recv` produced data. `payload` holds the received bytes,
/// cut down to `meta.size`.
Packet {
socket_id: SocketId,
meta: SocketPacketMeta,
payload: Vec<u8>,
},
/// Opening the socket or executing a command failed; `error` is the
/// message returned by the socket.
Error { socket_id: SocketId, error: String },
/// The worker has stopped, either because the socket could not be opened
/// or because its service loop ended.
Closed { socket_id: SocketId },
}
/// State for one socket-servicing loop: the socket itself plus the channels
/// that connect it to the rest of the program.
///
/// Consumed by [`SocketWorker::run`], which returns the socket when it ends.
pub struct SocketWorker {
// Identifier attached to every event this worker emits.
socket_id: SocketId,
// The socket being serviced; handed back to the caller when `run` returns.
socket: Box<dyn Socket>,
// Passed to the socket's `open` when the socket is not already open.
ctx: ControllerCtx,
// Timeout given to `recv` on loop iterations where no command was handled.
recv_timeout: Duration,
// Incoming commands; a disconnected channel stops the worker.
cmd_rx: Receiver<SocketWorkerCommand>,
// Outgoing events; a failed packet or error send stops the worker.
event_tx: Sender<SocketWorkerEvent>,
}
impl SocketWorker {
    /// Bundles a socket with the channels and settings its service loop needs.
    ///
    /// Nothing is opened or started here; call [`SocketWorker::run`] to begin
    /// servicing the socket.
    pub fn new(
        socket_id: SocketId,
        socket: Box<dyn Socket>,
        ctx: ControllerCtx,
        recv_timeout: Duration,
        cmd_rx: Receiver<SocketWorkerCommand>,
        event_tx: Sender<SocketWorkerEvent>,
    ) -> Self {
        Self {
            socket_id,
            socket,
            ctx,
            recv_timeout,
            cmd_rx,
            event_tx,
        }
    }

    /// Services the socket until told to stop, then returns the socket.
    ///
    /// The socket is opened first if it is not open already. If opening
    /// fails, an `Error` event followed by a `Closed` event is emitted and
    /// the socket is returned immediately (without calling `close`).
    ///
    /// Otherwise each loop iteration drains all pending commands and then
    /// polls the socket once, forwarding any received data as a `Packet`
    /// event. The loop ends when a `Close` command arrives, when the command
    /// channel is disconnected, or when an event can no longer be delivered.
    /// On exit the socket is closed and a `Closed` event is emitted
    /// (best-effort).
    pub fn run(mut self) -> Box<dyn Socket> {
        if !self.socket.is_open()
            && let Err(err) = self.socket.open(&self.ctx)
        {
            // Event delivery is best-effort here: the worker is stopping
            // regardless of whether anyone is still listening.
            let _ = self.event_tx.send(SocketWorkerEvent::Error {
                socket_id: self.socket_id,
                error: err,
            });
            let _ = self.event_tx.send(SocketWorkerEvent::Closed {
                socket_id: self.socket_id,
            });
            return self.socket;
        }

        // One receive buffer for the lifetime of the loop. Allocating (and
        // zeroing) a full-size buffer per iteration is wasted work on every
        // timeout, and a truncated Vec keeps its full capacity, so each
        // forwarded packet would pin SOCKET_BUFFER_LEN bytes while queued.
        let mut buffer = vec![0_u8; SOCKET_BUFFER_LEN];

        loop {
            let (keep_running, handled_commands) = self.drain_commands();
            if !keep_running {
                break;
            }

            // Right after handling commands, poll without blocking and go
            // straight back to the command queue; only wait on the socket
            // for the full timeout when the queue was idle.
            let timeout = if handled_commands {
                Duration::ZERO
            } else {
                self.recv_timeout
            };

            let Some(meta) = self.socket.recv(&mut buffer, timeout) else {
                continue;
            };

            // Copy out exactly the bytes that were received. The clamp keeps
            // an oversized `meta.size` from panicking on the slice, matching
            // `Vec::truncate`, which ignores lengths beyond the current one.
            let len = meta.size.min(buffer.len());
            let payload = buffer[..len].to_vec();

            let delivered = self
                .event_tx
                .send(SocketWorkerEvent::Packet {
                    socket_id: self.socket_id,
                    meta,
                    payload,
                })
                .is_ok();
            if !delivered {
                // The event receiver is gone; nobody is left to serve.
                break;
            }
        }

        self.socket.close();
        let _ = self.event_tx.send(SocketWorkerEvent::Closed {
            socket_id: self.socket_id,
        });
        self.socket
    }

    /// Applies every command currently queued, without blocking.
    ///
    /// Returns `(keep_running, handled_any)`:
    /// * `keep_running` is `false` when a command asked the worker to stop
    ///   or the command channel is disconnected.
    /// * `handled_any` is `true` when at least one command was taken off the
    ///   queue during this call.
    fn drain_commands(&mut self) -> (bool, bool) {
        let mut handled_any = false;
        loop {
            match self.cmd_rx.try_recv() {
                Ok(command) => {
                    handled_any = true;
                    if !self.handle_command(command) {
                        return (false, handled_any);
                    }
                }
                Err(TryRecvError::Empty) => return (true, handled_any),
                Err(TryRecvError::Disconnected) => return (false, handled_any),
            }
        }
    }

    /// Executes a single command against the socket.
    ///
    /// Returns `false` when the worker should stop: either the command was
    /// `Close`, or the command failed and the resulting `Error` event could
    /// not be delivered. A failed command whose error was delivered does not
    /// stop the worker.
    fn handle_command(&mut self, command: SocketWorkerCommand) -> bool {
        let result = match command {
            SocketWorkerCommand::Send { id, payload } => self.socket.send(id, payload.as_slice()),
            SocketWorkerCommand::Broadcast { payload } => self.socket.broadcast(payload.as_slice()),
            SocketWorkerCommand::UpdateMap { id, token } => self.socket.update_map(id, token),
            SocketWorkerCommand::Close => return false,
        };

        match result {
            Ok(_) => true,
            Err(err) => self
                .event_tx
                .send(SocketWorkerEvent::Error {
                    socket_id: self.socket_id,
                    error: err,
                })
                .is_ok(),
        }
    }
}
/// Owner-side handle to a running [`SocketWorker`] thread.
pub struct SocketWorkerHandle {
/// Sending half of the worker's command channel.
pub cmd_tx: Sender<SocketWorkerCommand>,
// Join handle for the worker thread; the thread's result is the socket
// returned by `SocketWorker::run`.
thread: JoinHandle<Box<dyn Socket>>,
}
impl SocketWorkerHandle {
    /// Starts a [`SocketWorker`] on its own thread and returns a handle to it.
    ///
    /// A fresh unbounded command channel is created; its sending half is
    /// exposed as `cmd_tx`. The thread is named `socket-worker-{socket_id}`.
    ///
    /// # Panics
    ///
    /// Panics if the operating system fails to spawn the thread.
    pub fn spawn(
        socket_id: SocketId,
        socket: Box<dyn Socket>,
        ctx: ControllerCtx,
        recv_timeout: Duration,
        event_tx: Sender<SocketWorkerEvent>,
    ) -> Self {
        let (cmd_tx, cmd_rx) = unbounded();
        let worker = SocketWorker::new(socket_id, socket, ctx, recv_timeout, cmd_rx, event_tx);
        let thread = Builder::new()
            .name(format!("socket-worker-{socket_id}"))
            .spawn(move || worker.run())
            .expect("Failed to spawn socket worker thread");
        Self { cmd_tx, thread }
    }

    /// Waits for the worker thread to finish and returns the socket it owned.
    ///
    /// This handle's command sender is released before waiting. If no other
    /// clone of `cmd_tx` exists, the worker sees its command channel as
    /// disconnected and shuts down even if `Close` was never sent. If clones
    /// do exist, this call blocks until the worker stops for another reason.
    ///
    /// # Errors
    ///
    /// Returns `Err` with a fixed message if the worker thread panicked.
    pub fn join(self) -> Result<Box<dyn Socket>, String> {
        let Self { cmd_tx, thread } = self;
        // Holding the sender across the join would keep the channel
        // connected for as long as we wait, so a worker that was never told
        // to close could never notice the disconnect and the join would hang.
        drop(cmd_tx);
        thread
            .join()
            .map_err(|_| "Socket worker thread panicked".to_string())
    }
}