1use crate::{
2 error::SimError,
3 link::Packet,
4 network::NodeId,
5 time::{ScheduledEvent, SimTime},
6};
7use bytes::Bytes;
8use std::{
9 collections::BinaryHeap,
10 sync::{Arc, atomic::{AtomicU64, Ordering}},
11 time::Duration,
12};
13use tokio::sync::{mpsc::UnboundedReceiver, Mutex, RwLock};
14use crate::link::LinkConfig;
15
/// Process-wide counter from which [`SimInterface::send`] derives packet ids.
///
/// Starts at zero; `send` bumps it once per packet, so it is shared by every
/// `SimInterface` in the process.
static CURRENT_PACKET_ID: AtomicU64 = AtomicU64::new(0);
17
/// Command message for the simulation.
///
/// Only [`SimCommand::Shutdown`] is defined at present.
pub enum SimCommand {
    /// NOTE(review): presumably asks the simulation to stop; the consumer of
    /// this variant is not visible in this part of the file — confirm.
    Shutdown,
}
21
22pub struct SimInterface {
23 me: NodeId,
24 rx: UnboundedReceiver<Packet>,
25 links: Arc<RwLock<Vec<(NodeId, NodeId, LinkConfig)>>>,
26 queue: Arc<Mutex<BinaryHeap<ScheduledEvent<Packet>>>>,
27 now: SimTime,
28}
29
30impl SimInterface {
31 pub(crate) fn new(
32 me: NodeId,
33 rx: UnboundedReceiver<Packet>,
34 links: Arc<RwLock<Vec<(NodeId, NodeId, LinkConfig)>>>,
35 queue: Arc<Mutex<BinaryHeap<ScheduledEvent<Packet>>>>,
36 now: SimTime,
37 ) -> Self {
38 Self { me, rx, links, queue, now }
39 }
40
41 pub async fn send(
42 &self,
43 dst: &NodeId,
44 data: Bytes,
45 ) -> Result<(), SimError> {
46 let link_read = self.links.read().await;
47 let cfg = link_read
48 .iter()
49 .find(|(a, b, _)| &self.me == a && dst == b)
50 .map(|(_, _, cfg)| cfg.clone())
51 .ok_or_else(|| SimError::UnknownNode(dst.0.clone()))?;
52
53 let jitter_ns = rand::random::<u64>() % cfg.jitter.as_nanos() as u64;
54 let at = self
55 .now
56 .checked_add(cfg.latency)
57 .and_then(|t| t.checked_add(Duration::from_nanos(jitter_ns)))
58 .unwrap();
59
60 CURRENT_PACKET_ID.fetch_add(1, Ordering::SeqCst);
61
62 let pkt = Packet {
63 src: self.me.clone(),
64 dst: dst.clone(),
65 data,
66 at,
67 id: CURRENT_PACKET_ID.load(Ordering::SeqCst).to_string()
68 };
69
70 let mut q = self.queue.lock().await;
71 Self::packet_log(&format!("tx {pkt:?}"));
72 q.push(ScheduledEvent { when: at, payload: pkt });
73 Ok(())
74 }
75
76 pub async fn recv(&mut self) -> Result<(NodeId, Bytes), SimError> {
77 match self.rx.recv().await {
78 Some(pkt) => Ok((pkt.src, pkt.data)),
79 None => Err(SimError::SimulationEnded),
80 }
81 }
82
83 pub async fn sleep(&self, dur: Duration) {
84 tokio::time::sleep(dur).await;
85 }
86
87 #[cfg(feature = "packet_tracing")]
88 fn packet_log(msg: &str) {
89 tracing::debug!("{msg}");
90 }
91
92 #[cfg(not(feature = "packet_tracing"))]
93 fn packet_log(msg: &str) {
94 }
95}