asim/network/
link.rs

1use crate::time::Duration;
2use crate::TaskRunner;
3
4use std::cmp::Ordering as CmpOrdering;
5use std::rc::{Rc, Weak as WeakRc};
6use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
7
8use crate::network::{Latency, NetworkMessage, Process, ProcessId};
9
10/// Each link consists of two messages queues, one for each direction
11pub struct Link<Message: NetworkMessage> {
12    queue1: Rc<LinkQueue<Message>>,
13    queue2: Rc<LinkQueue<Message>>,
14
15    task_runner: Rc<TaskRunner>,
16
17    active_queues: AtomicU32,
18}
19
20impl<Message: NetworkMessage> Link<Message> {
21    pub(super) fn new(
22        latency: Latency,
23        process1: WeakRc<Process<Message>>,
24        process2: WeakRc<Process<Message>>,
25        task_runner: Rc<TaskRunner>,
26    ) -> Self {
27        let queue1 = Rc::new(LinkQueue::new(latency, process1.clone(), process2.clone()));
28
29        let queue2 = Rc::new(LinkQueue::new(latency, process2, process1));
30
31        let active_queues = AtomicU32::new(0);
32
33        Self {
34            queue1,
35            queue2,
36            task_runner,
37            active_queues,
38        }
39    }
40
41    /// Get the two processs connected with this link
42    /// Always sorted by smallest id first
43    pub fn get_processes(&self) -> (Rc<Process<Message>>, Rc<Process<Message>>) {
44        let process1 = self.queue1.get_source();
45        let process2 = self.queue1.get_destination();
46
47        match process1.get_identifier().cmp(&process2.get_identifier()) {
48            CmpOrdering::Less => (process1, process2),
49            CmpOrdering::Greater => (process2, process1),
50            CmpOrdering::Equal => panic!("Invalid state: src and dst process are the same"),
51        }
52    }
53
54    pub fn send(self_ptr: &Rc<Link<Message>>, source: ProcessId, message: Message) {
55        if self_ptr.queue1.get_source().get_identifier() == source {
56            LinkQueue::send(self_ptr.queue1.clone(), self_ptr.clone(), message);
57        } else if self_ptr.queue2.get_source().get_identifier() == source {
58            LinkQueue::send(self_ptr.queue2.clone(), self_ptr.clone(), message);
59        } else {
60            panic!("Invalid state");
61        }
62    }
63
64    /// Get the number of all messages ever sent through this link
65    pub fn num_total_messages(&self) -> u64 {
66        self.queue1.total_message_count.load(Ordering::SeqCst)
67            + self.queue2.total_message_count.load(Ordering::SeqCst)
68    }
69}
70
71struct LinkQueue<Message: NetworkMessage> {
72    latency: Duration,
73
74    source: WeakRc<Process<Message>>,
75    dest: WeakRc<Process<Message>>,
76
77    current_message_count: AtomicU32,
78    total_message_count: AtomicU64,
79}
80
81impl<Message: NetworkMessage> LinkQueue<Message> {
82    fn new(
83        latency: Latency,
84        source: WeakRc<Process<Message>>,
85        dest: WeakRc<Process<Message>>,
86    ) -> Self {
87        let current_message_count = AtomicU32::new(0);
88        let total_message_count = AtomicU64::new(0);
89
90        Self {
91            latency,
92            total_message_count,
93            source,
94            dest,
95            current_message_count,
96        }
97    }
98
99    fn send(
100        self_ptr: Rc<LinkQueue<Message>>,
101        link: Rc<Link<Message>>,
102        message: Message,
103    ) -> (bool, Duration) {
104        let task_runner = link.task_runner.clone();
105
106        let latency = self_ptr.latency;
107
108        let was_empty = {
109            self_ptr.total_message_count.fetch_add(1, Ordering::SeqCst);
110            let prev = self_ptr
111                .current_message_count
112                .fetch_add(1, Ordering::SeqCst);
113            prev == 0
114        };
115
116        if was_empty {
117            link.active_queues.fetch_add(1, Ordering::SeqCst);
118        }
119
120        task_runner.spawn(async move {
121            //TODO re-add link bandwidth
122
123            let notify_delivery_fn = {
124                let self_ptr = self_ptr.clone();
125                let link = link.clone();
126
127                Box::new(move || {
128                    let prev = self_ptr
129                        .current_message_count
130                        .fetch_sub(1, Ordering::SeqCst);
131                    assert!(prev > 0);
132
133                    if prev == 1 {
134                        link.active_queues.fetch_sub(1, Ordering::SeqCst);
135                    }
136                })
137            };
138
139            let dst = self_ptr.get_destination();
140            dst.deliver_message(
141                self_ptr.get_source().get_identifier(),
142                message,
143                notify_delivery_fn,
144            );
145        });
146
147        (was_empty, latency)
148    }
149
150    fn get_source(&self) -> Rc<Process<Message>> {
151        self.source.upgrade().unwrap()
152    }
153
154    fn get_destination(&self) -> Rc<Process<Message>> {
155        self.dest.upgrade().unwrap()
156    }
157}
158
159/*
160#[cfg(test)]
161mod tests {
162    use std::rc::Rc;
163    use std::sync::mpsc;
164
165    use desim::TaskRunner;
166
167    use crate::events::{Event, LinkEvent, EVENT_HANDLER};
168    use crate::logic::DummyLogic;
169    use crate::message::DummyMessage;
170    use crate::process::Node;
171    use crate::Location;
172
173    use task_runner::time::Duration;
174    use task_runner::Timer;
175
176    use super::Link;
177
178    fn get_events(event_receiver: &mpsc::Receiver<Event>) -> Vec<Event> {
179        let mut result = vec![];
180
181        while let Ok(event) = event_receiver.try_recv() {
182            result.push(event);
183        }
184
185        result
186    }
187
188    #[test]
189    fn is_active() {
190        let timer = Rc::new(Timer::new());
191        let task_runner = Rc::new(TaskRunner::default());
192
193        let logic = Rc::new(DummyLogic::default());
194
195        let (event_sender, event_receiver) = mpsc::channel();
196
197        EVENT_HANDLER.with(|hdl| {
198            let mut handler = hdl.borrow_mut();
199            if handler.is_none() {
200                *handler = Some(event_sender);
201            }
202        });
203
204        let process1 = Node::new(
205            2,
206            0,
207            Location::default(),
208            1000,
209            timer.clone(),
210            &task_runner,
211            logic.clone(),
212        );
213
214        let process2 = Node::new(
215            2,
216            1,
217            Location::default(),
218            1000,
219            timer.clone(),
220            &task_runner,
221            logic,
222        );
223
224        let link = Rc::new(Link::new(
225            3,
226            timer.clone(),
227            None,
228            50,
229            process1.clone(),
230            process2.clone(),
231            task_runner.clone(),
232        ));
233
234        let events = get_events(&event_receiver);
235        assert!(events.is_empty());
236        Link::send(&link, 1, DummyMessage::default().into());
237
238        // Sending messages is a two step process (link latency + bandwidth)
239        task_runner.execute_tasks();
240
241        let events = get_events(&event_receiver);
242        assert_eq!(events.len(), 1);
243        assert_eq!(
244            Event::Link {
245                identifier: 3,
246                event: LinkEvent::Active
247            },
248            events[0]
249        );
250
251        timer.advance();
252        task_runner.execute_tasks();
253        task_runner.execute_tasks();
254
255        let events = get_events(&event_receiver);
256        assert_eq!(events.len(), 1);
257        assert_eq!(
258            Event::Link {
259                identifier: 3,
260                event: LinkEvent::Inactive
261            },
262            events[0]
263        );
264    }
265}*/