mod event;
use std::io::ErrorKind as IoErrorKind;
use std::net::{SocketAddr, ToSocketAddrs, UdpSocket};
use std::sync::{atomic::*, Arc};
use std::thread;
use std::time::Duration;
use ahash::RandomState;
use dashmap::DashMap;
use quanta::Upkeep;
use rmp_serde as rmps;
pub(crate) use self::event::*;
use crate::type_alias::*;
/// State shared (behind an `Arc`) between a `Nexus` and its SM listener thread.
struct NexusSm {
/// Socket address resolved from the URI passed to `Nexus::new`; returned by `Nexus::uri`.
uri: SocketAddr,
/// Per-RPC event senders, keyed by RPC id. The listener looks up the
/// destination RPC id of each decoded event here and forwards the event
/// through the matching `SmEventTx`.
sm_evt_tx: DashMap<RpcId, SmEventTx, RandomState>,
/// Stop flag polled by the listener loop; set when the owning `Nexus` is dropped.
sm_should_stop: AtomicBool,
}
impl NexusSm {
    /// Runs the SM event listener loop on `socket` until `sm_should_stop` is set.
    ///
    /// Every received datagram is decoded as an [`SmEvent`] with `rmp_serde`
    /// and forwarded to the sender registered for its `dst_rpc_id`. Datagrams
    /// that fail to decode, or that target an RPC id with no registered
    /// sender, are logged at debug level and dropped.
    ///
    /// The stop flag is only re-checked between `recv_from` calls, so the
    /// loop relies on `socket` having a read timeout to notice a stop request
    /// when no traffic arrives.
    ///
    /// # Panics
    ///
    /// Panics if `recv_from` fails with an error other than a timeout
    /// (`WouldBlock` / `TimedOut`) or a signal interruption (`Interrupted`).
    fn listen(self: Arc<Self>, socket: UdpSocket) {
        /// Size of the receive buffer for a single event datagram (4 KiB).
        const EVENT_MSG_SIZE_LIMIT: usize = 4 << 10;
        let mut buf = [0u8; EVENT_MSG_SIZE_LIMIT];

        while !self.sm_should_stop.load(Ordering::Relaxed) {
            let (amt, src) = match socket.recv_from(&mut buf) {
                Ok(v) => v,
                // Timeouts are the normal "nothing arrived" case and exist
                // only so the stop flag gets re-checked. `Interrupted` must be
                // retried as well: a receive on a socket with a timeout set is
                // not restarted after a signal handler runs, and std does not
                // retry it for us, so treating it as fatal would kill the
                // listener on any handled signal.
                Err(ref e)
                    if matches!(
                        e.kind(),
                        IoErrorKind::WouldBlock | IoErrorKind::TimedOut | IoErrorKind::Interrupted
                    ) =>
                {
                    continue
                }
                Err(e) => panic!("failed to receive UDP packet: {}", e),
            };

            // Anyone can send to this port, so undecodable input is ignored
            // rather than treated as an error.
            let Ok(evt) = rmps::from_slice::<SmEvent>(&buf[..amt]) else {
                log::debug!("Nexus SM: ignoring malformed event from {}", src);
                continue;
            };

            let dst = evt.dst_rpc_id;
            match self.sm_evt_tx.get(&dst) {
                Some(tx) => tx.send(evt),
                None => log::debug!("Nexus SM: ignoring event to non-existent RPC {}", dst),
            };
        }
    }
}
/// Owner of the SM listener thread and of the per-RPC event channel registry.
///
/// Dropping a `Nexus` sets the listener's stop flag and joins the listener thread.
pub struct Nexus {
/// State shared with the listener thread.
sm: Arc<NexusSm>,
/// Join handle of the listener thread; wrapped in `Option` so `Drop` can
/// take ownership of it in order to join.
sm_thread: Option<thread::JoinHandle<()>>,
/// Handle returned by starting the quanta upkeep thread, or `None` when
/// starting it returned an error other than a failed thread spawn. It is
/// never read; it is stored so it lives as long as the `Nexus`.
/// NOTE(review): per the quanta docs, dropping the handle stops the upkeep
/// thread — confirm against the quanta version in use.
_upkeeper: Option<quanta::Handle>,
}
impl Nexus {
    /// Creates a new SM event channel for `rpc_id`, stores its sending side in
    /// the shared registry so the listener can route events to it, and returns
    /// the receiving side.
    ///
    /// # Panics
    ///
    /// Panics if the registry already held a sender for `rpc_id`. The new
    /// sender has already been inserted by the time the assertion fires.
    pub(crate) fn register_event_channel(&self, rpc_id: RpcId) -> SmEventRx {
        let (tx, rx) = sm_event_channel();
        // The insertion is the side effect; the assertion only checks that no
        // earlier registration existed for this id.
        assert!(self.sm.sm_evt_tx.insert(rpc_id, tx).is_none());
        rx
    }

    /// Removes the sender registered for `rpc_id`, if any. Events that later
    /// arrive for this id are ignored by the listener.
    pub(crate) fn destroy_event_channel(&self, rpc_id: RpcId) {
        let _ = self.sm.sm_evt_tx.remove(&rpc_id);
    }
}
impl Nexus {
pub fn new(uri: impl ToSocketAddrs) -> Arc<Self> {
let uri = uri
.to_socket_addrs()
.expect("failed to resolve remote URI")
.next()
.expect("no such remote URI");
let unspecified = match uri {
SocketAddr::V4(_) => "0.0.0.0",
SocketAddr::V6(_) => "::0",
};
let socket = UdpSocket::bind((unspecified, uri.port())).unwrap();
const SOCKET_READ_TIMEOUT: Duration = Duration::from_millis(100);
socket.set_read_timeout(Some(SOCKET_READ_TIMEOUT)).unwrap();
let sm = Arc::new(NexusSm {
uri,
sm_evt_tx: DashMap::with_capacity_and_hasher(256, RandomState::new()),
sm_should_stop: AtomicBool::new(false),
});
let sm_listener = {
let sm = sm.clone();
thread::spawn(move || sm.listen(socket))
};
const UPKEEP_INTERVAL: Duration = Duration::from_millis(1);
let upkeeper = Upkeep::new(UPKEEP_INTERVAL).start();
if let Err(quanta::Error::FailedToSpawnUpkeepThread(ref e)) = upkeeper {
panic!("failed to spawn clock upkeep thread: {}", e);
}
Arc::new(Self {
sm,
sm_thread: Some(sm_listener),
_upkeeper: upkeeper.ok(),
})
}
#[inline]
pub fn uri(&self) -> SocketAddr {
self.sm.uri
}
}
impl Drop for Nexus {
fn drop(&mut self) {
self.sm.sm_should_stop.store(true, Ordering::SeqCst);
self.sm_thread.take().unwrap().join().unwrap();
}
}