use std::net::SocketAddr;
use tokio::sync::broadcast;
pub type ConnId = u64;
pub type PeerId = u32;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum ConnRoleTag {
Proxy,
Client,
Server,
DnodeProxy,
DnodeClient,
DnodeServer,
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum CloseReason {
PeerEof,
LocalClose,
IoError,
Timeout,
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum PeerDownReason {
FailureDetector,
AutoEjected,
Reconfigured,
Leaving,
}
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum ServerEvent {
PeerUp(PeerId),
PeerDown {
peer: PeerId,
reason: PeerDownReason,
},
ConfigReloaded {
generation: u64,
},
GossipRound {
round: u64,
peers: u32,
},
AutoEjected {
peer: PeerId,
failures: u32,
},
RepairTriggered {
key_hash: u64,
dc: String,
},
ConnectionAccepted {
conn_id: ConnId,
role: ConnRoleTag,
local_addr: Option<SocketAddr>,
},
ConnectionClosed {
conn_id: ConnId,
reason: CloseReason,
},
Lagged {
missed: u64,
},
}
#[derive(Debug, Clone)]
pub struct EventBus {
tx: broadcast::Sender<ServerEvent>,
}
impl EventBus {
#[must_use]
pub fn new(capacity: usize) -> Self {
let cap = capacity.max(1);
let (tx, _) = broadcast::channel(cap);
Self { tx }
}
#[must_use]
pub fn subscribe(&self) -> EventStream {
EventStream {
rx: self.tx.subscribe(),
}
}
pub(crate) fn send(&self, event: ServerEvent) -> usize {
self.tx.send(event).unwrap_or(0)
}
#[must_use]
pub fn subscriber_count(&self) -> usize {
self.tx.receiver_count()
}
}
#[derive(Debug)]
pub struct EventStream {
rx: broadcast::Receiver<ServerEvent>,
}
impl EventStream {
pub async fn recv(&mut self) -> Option<ServerEvent> {
match self.rx.recv().await {
Ok(evt) => Some(evt),
Err(broadcast::error::RecvError::Closed) => None,
Err(broadcast::error::RecvError::Lagged(missed)) => {
Some(ServerEvent::Lagged { missed })
}
}
}
pub fn try_recv(&mut self) -> Option<ServerEvent> {
match self.rx.try_recv() {
Ok(evt) => Some(evt),
Err(broadcast::error::TryRecvError::Empty | broadcast::error::TryRecvError::Closed) => {
None
}
Err(broadcast::error::TryRecvError::Lagged(missed)) => {
Some(ServerEvent::Lagged { missed })
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn round_trip_ok() {
let bus = EventBus::new(4);
let mut s = bus.subscribe();
bus.send(ServerEvent::PeerUp(1));
let evt = s.recv().await.unwrap();
assert!(matches!(evt, ServerEvent::PeerUp(1)));
}
#[tokio::test]
async fn lagged_synthesised() {
let bus = EventBus::new(2);
let mut s = bus.subscribe();
for i in 0..8u64 {
bus.send(ServerEvent::ConfigReloaded { generation: i });
}
let first = s.recv().await.unwrap();
assert!(matches!(
first,
ServerEvent::Lagged { .. } | ServerEvent::ConfigReloaded { .. }
));
}
#[test]
fn try_recv_empty() {
let bus = EventBus::new(2);
let mut s = bus.subscribe();
assert!(s.try_recv().is_none());
}
}