netropy/
iface.rs

1use crate::{
2    error::SimError,
3    link::Packet,
4    network::NodeId,
5    time::{ScheduledEvent, SimTime},
6};
7use bytes::Bytes;
8use std::{
9    collections::BinaryHeap,
10    sync::{Arc, atomic::{AtomicU64, Ordering}},
11    time::Duration,
12};
13use tokio::sync::{mpsc::UnboundedReceiver, Mutex, RwLock};
14use crate::link::LinkConfig;
15
/// Counter that `SimInterface::send` bumps to derive each packet's id.
///
/// Being a `static`, it is shared by every interface in the process, so ids
/// keep increasing across all senders.
static CURRENT_PACKET_ID: AtomicU64 = AtomicU64::new(0);
17
/// Commands for the simulation; `Shutdown` is the only one defined so far.
///
/// NOTE(review): nothing in this file consumes these values — presumably they
/// are handled by the simulation driver; confirm against the caller.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SimCommand {
    /// Shutdown command (handling is not visible in this file).
    Shutdown,
}
21
22pub struct SimInterface {
23    me: NodeId,
24    rx: UnboundedReceiver<Packet>,
25    links: Arc<RwLock<Vec<(NodeId, NodeId, LinkConfig)>>>,
26    queue: Arc<Mutex<BinaryHeap<ScheduledEvent<Packet>>>>,
27    now: SimTime,
28}
29
30impl SimInterface {
31    pub(crate) fn new(
32        me: NodeId,
33        rx: UnboundedReceiver<Packet>,
34        links: Arc<RwLock<Vec<(NodeId, NodeId, LinkConfig)>>>,
35        queue: Arc<Mutex<BinaryHeap<ScheduledEvent<Packet>>>>,
36        now: SimTime,
37    ) -> Self {
38        Self { me, rx, links, queue, now }
39    }
40
41    pub async fn send(
42        &self,
43        dst: &NodeId,
44        data: Bytes,
45    ) -> Result<(), SimError> {
46        let link_read = self.links.read().await;
47        let cfg = link_read
48            .iter()
49            .find(|(a, b, _)| &self.me == a && dst == b)
50            .map(|(_, _, cfg)| cfg.clone())
51            .ok_or_else(|| SimError::UnknownNode(dst.0.clone()))?;
52
53        let jitter_ns = rand::random::<u64>() % cfg.jitter.as_nanos() as u64;
54        let at = self
55            .now
56            .checked_add(cfg.latency)
57            .and_then(|t| t.checked_add(Duration::from_nanos(jitter_ns)))
58            .unwrap();
59
60        CURRENT_PACKET_ID.fetch_add(1, Ordering::SeqCst);
61
62        let pkt = Packet {
63            src: self.me.clone(),
64            dst: dst.clone(),
65            data,
66            at,
67            id: CURRENT_PACKET_ID.load(Ordering::SeqCst).to_string()
68        };
69
70        let mut q = self.queue.lock().await;
71        Self::packet_log(&format!("tx {pkt:?}"));
72        q.push(ScheduledEvent { when: at, payload: pkt });
73        Ok(())
74    }
75
76    pub async fn recv(&mut self) -> Result<(NodeId, Bytes), SimError> {
77        match self.rx.recv().await {
78            Some(pkt) => Ok((pkt.src, pkt.data)),
79            None => Err(SimError::SimulationEnded),
80        }
81    }
82
83    pub async fn sleep(&self, dur: Duration) {
84        tokio::time::sleep(dur).await;
85    }
86
87    #[cfg(feature = "packet_tracing")]
88    fn packet_log(msg: &str) {
89        tracing::debug!("{msg}");
90    }
91
92    #[cfg(not(feature = "packet_tracing"))]
93    fn packet_log(msg: &str) {
94    }
95}