Skip to main content

rns_net/
event.rs

//! Event types for the driver loop — concrete sync instantiation.
2
3pub use crate::common::event::{
4    BackboneInterfaceEntry, BackbonePeerHookEvent, BackbonePeerPoolMemberStatus,
5    BackbonePeerPoolStatus, BackbonePeerStateEntry, BlackholeInfo, DrainStatus, HolePunchPolicy,
6    HookInfo, InterfaceStatsResponse, KnownDestinationEntry, LifecycleState, LinkInfoEntry,
7    LocalDestinationEntry, NextHopResponse, PathTableEntry, ProviderBridgeConsumerStats,
8    ProviderBridgeStats, QueryRequest, QueryResponse, RateTableEntry, ResourceInfoEntry,
9    RuntimeConfigApplyMode, RuntimeConfigEntry, RuntimeConfigError, RuntimeConfigErrorCode,
10    RuntimeConfigSource, RuntimeConfigValue, SingleInterfaceStat,
11};
12
13/// Concrete Event type using boxed sync Writer.
14pub type Event = crate::common::event::Event<Box<dyn crate::interface::Writer>>;
15
16pub const DEFAULT_EVENT_QUEUE_CAPACITY: usize = 8192;
17
18pub type EventSender = std::sync::mpsc::SyncSender<Event>;
19pub type EventReceiver = std::sync::mpsc::Receiver<Event>;
20
21pub fn channel() -> (EventSender, EventReceiver) {
22    channel_with_capacity(DEFAULT_EVENT_QUEUE_CAPACITY)
23}
24
25pub fn channel_with_capacity(capacity: usize) -> (EventSender, EventReceiver) {
26    std::sync::mpsc::sync_channel(capacity.max(1))
27}
28
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::mpsc::TrySendError;
    use std::time::Duration;

    /// A full queue rejects `try_send` with `Full` (handing the event back)
    /// and accepts new events again once a slot has been drained.
    #[test]
    fn bounded_event_queue_backpressures_when_full() {
        let (tx, rx) = channel_with_capacity(1);

        // Occupy the only slot.
        tx.try_send(Event::Tick).unwrap();

        // With the slot taken, the second event must be refused, not queued.
        let other = tx.try_send(Event::Shutdown);
        assert!(
            matches!(other, Err(TrySendError::Full(Event::Shutdown))),
            "expected full queue for second event, got {other:?}"
        );

        // Draining the first event frees the slot...
        assert!(matches!(
            rx.recv_timeout(Duration::from_secs(1)).unwrap(),
            Event::Tick
        ));

        // ...so the previously rejected event now goes through and arrives.
        tx.try_send(Event::Shutdown).unwrap();
        assert!(matches!(
            rx.recv_timeout(Duration::from_secs(1)).unwrap(),
            Event::Shutdown
        ));
    }
}