asim/network/link.rs
1use crate::time::Duration;
2use crate::TaskRunner;
3
4use std::cmp::Ordering as CmpOrdering;
5use std::rc::{Rc, Weak as WeakRc};
6use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
7
8use crate::network::{Latency, NetworkMessage, Process, ProcessId};
9
10/// Each link consists of two messages queues, one for each direction
11pub struct Link<Message: NetworkMessage> {
12 queue1: Rc<LinkQueue<Message>>,
13 queue2: Rc<LinkQueue<Message>>,
14
15 task_runner: Rc<TaskRunner>,
16
17 active_queues: AtomicU32,
18}
19
20impl<Message: NetworkMessage> Link<Message> {
21 pub(super) fn new(
22 latency: Latency,
23 process1: WeakRc<Process<Message>>,
24 process2: WeakRc<Process<Message>>,
25 task_runner: Rc<TaskRunner>,
26 ) -> Self {
27 let queue1 = Rc::new(LinkQueue::new(latency, process1.clone(), process2.clone()));
28
29 let queue2 = Rc::new(LinkQueue::new(latency, process2, process1));
30
31 let active_queues = AtomicU32::new(0);
32
33 Self {
34 queue1,
35 queue2,
36 task_runner,
37 active_queues,
38 }
39 }
40
41 /// Get the two processs connected with this link
42 /// Always sorted by smallest id first
43 pub fn get_processes(&self) -> (Rc<Process<Message>>, Rc<Process<Message>>) {
44 let process1 = self.queue1.get_source();
45 let process2 = self.queue1.get_destination();
46
47 match process1.get_identifier().cmp(&process2.get_identifier()) {
48 CmpOrdering::Less => (process1, process2),
49 CmpOrdering::Greater => (process2, process1),
50 CmpOrdering::Equal => panic!("Invalid state: src and dst process are the same"),
51 }
52 }
53
54 pub fn send(self_ptr: &Rc<Link<Message>>, source: ProcessId, message: Message) {
55 if self_ptr.queue1.get_source().get_identifier() == source {
56 LinkQueue::send(self_ptr.queue1.clone(), self_ptr.clone(), message);
57 } else if self_ptr.queue2.get_source().get_identifier() == source {
58 LinkQueue::send(self_ptr.queue2.clone(), self_ptr.clone(), message);
59 } else {
60 panic!("Invalid state");
61 }
62 }
63
64 /// Get the number of all messages ever sent through this link
65 pub fn num_total_messages(&self) -> u64 {
66 self.queue1.total_message_count.load(Ordering::SeqCst)
67 + self.queue2.total_message_count.load(Ordering::SeqCst)
68 }
69}
70
71struct LinkQueue<Message: NetworkMessage> {
72 latency: Duration,
73
74 source: WeakRc<Process<Message>>,
75 dest: WeakRc<Process<Message>>,
76
77 current_message_count: AtomicU32,
78 total_message_count: AtomicU64,
79}
80
81impl<Message: NetworkMessage> LinkQueue<Message> {
82 fn new(
83 latency: Latency,
84 source: WeakRc<Process<Message>>,
85 dest: WeakRc<Process<Message>>,
86 ) -> Self {
87 let current_message_count = AtomicU32::new(0);
88 let total_message_count = AtomicU64::new(0);
89
90 Self {
91 latency,
92 total_message_count,
93 source,
94 dest,
95 current_message_count,
96 }
97 }
98
99 fn send(
100 self_ptr: Rc<LinkQueue<Message>>,
101 link: Rc<Link<Message>>,
102 message: Message,
103 ) -> (bool, Duration) {
104 let task_runner = link.task_runner.clone();
105
106 let latency = self_ptr.latency;
107
108 let was_empty = {
109 self_ptr.total_message_count.fetch_add(1, Ordering::SeqCst);
110 let prev = self_ptr
111 .current_message_count
112 .fetch_add(1, Ordering::SeqCst);
113 prev == 0
114 };
115
116 if was_empty {
117 link.active_queues.fetch_add(1, Ordering::SeqCst);
118 }
119
120 task_runner.spawn(async move {
121 //TODO re-add link bandwidth
122
123 let notify_delivery_fn = {
124 let self_ptr = self_ptr.clone();
125 let link = link.clone();
126
127 Box::new(move || {
128 let prev = self_ptr
129 .current_message_count
130 .fetch_sub(1, Ordering::SeqCst);
131 assert!(prev > 0);
132
133 if prev == 1 {
134 link.active_queues.fetch_sub(1, Ordering::SeqCst);
135 }
136 })
137 };
138
139 let dst = self_ptr.get_destination();
140 dst.deliver_message(
141 self_ptr.get_source().get_identifier(),
142 message,
143 notify_delivery_fn,
144 );
145 });
146
147 (was_empty, latency)
148 }
149
150 fn get_source(&self) -> Rc<Process<Message>> {
151 self.source.upgrade().unwrap()
152 }
153
154 fn get_destination(&self) -> Rc<Process<Message>> {
155 self.dest.upgrade().unwrap()
156 }
157}
158
159/*
160#[cfg(test)]
161mod tests {
162 use std::rc::Rc;
163 use std::sync::mpsc;
164
165 use desim::TaskRunner;
166
167 use crate::events::{Event, LinkEvent, EVENT_HANDLER};
168 use crate::logic::DummyLogic;
169 use crate::message::DummyMessage;
170 use crate::process::Node;
171 use crate::Location;
172
173 use task_runner::time::Duration;
174 use task_runner::Timer;
175
176 use super::Link;
177
178 fn get_events(event_receiver: &mpsc::Receiver<Event>) -> Vec<Event> {
179 let mut result = vec![];
180
181 while let Ok(event) = event_receiver.try_recv() {
182 result.push(event);
183 }
184
185 result
186 }
187
188 #[test]
189 fn is_active() {
190 let timer = Rc::new(Timer::new());
191 let task_runner = Rc::new(TaskRunner::default());
192
193 let logic = Rc::new(DummyLogic::default());
194
195 let (event_sender, event_receiver) = mpsc::channel();
196
197 EVENT_HANDLER.with(|hdl| {
198 let mut handler = hdl.borrow_mut();
199 if handler.is_none() {
200 *handler = Some(event_sender);
201 }
202 });
203
204 let process1 = Node::new(
205 2,
206 0,
207 Location::default(),
208 1000,
209 timer.clone(),
210 &task_runner,
211 logic.clone(),
212 );
213
214 let process2 = Node::new(
215 2,
216 1,
217 Location::default(),
218 1000,
219 timer.clone(),
220 &task_runner,
221 logic,
222 );
223
224 let link = Rc::new(Link::new(
225 3,
226 timer.clone(),
227 None,
228 50,
229 process1.clone(),
230 process2.clone(),
231 task_runner.clone(),
232 ));
233
234 let events = get_events(&event_receiver);
235 assert!(events.is_empty());
236 Link::send(&link, 1, DummyMessage::default().into());
237
238 // Sending messages is a two step process (link latency + bandwidth)
239 task_runner.execute_tasks();
240
241 let events = get_events(&event_receiver);
242 assert_eq!(events.len(), 1);
243 assert_eq!(
244 Event::Link {
245 identifier: 3,
246 event: LinkEvent::Active
247 },
248 events[0]
249 );
250
251 timer.advance();
252 task_runner.execute_tasks();
253 task_runner.execute_tasks();
254
255 let events = get_events(&event_receiver);
256 assert_eq!(events.len(), 1);
257 assert_eq!(
258 Event::Link {
259 identifier: 3,
260 event: LinkEvent::Inactive
261 },
262 events[0]
263 );
264 }
265}*/