rf_distributed_impl/
mailbox.rs

1use rf_distributed::mailbox::{Mailbox, Messages};
2use rf_distributed::message::Message;
3use std::collections::{BTreeMap, HashMap};
4use std::time::SystemTime;
5
/// Stateless factory for the [`Mailbox`] implementations defined in this module.
///
/// The type carries no data; every constructor is an associated function, so it is
/// never necessary to create a value of it. The common traits are derived so that it
/// can still be printed, copied, or default-constructed when embedded in other types.
#[derive(Debug, Clone, Copy, Default)]
pub struct MailboxFactory;
8
9impl MailboxFactory {
10    /// Creates a new [MemoryLessMailbox] with a memory less message processing policy.
11    ///For each neighbouring message, only the last one received is kept. This policy, from the user's viewpoint, acts similarly to
12    /// the [MostRecent] version, but it is more memory efficient since the other messages received are substituted
13    /// with the last received.
14    pub fn memory_less() -> MemoryLessMailbox {
15        MemoryLessMailbox::new()
16    }
17
18    /// Creates a new [TimeOrderedMailbox] with a most recent message processing policy.
19    /// Keeps every message received from each neighbor, but returns only the most recent one. Used
20    /// for processing in a LIFO order.
21    pub fn most_recent() -> TimeOrderedMailbox {
22        TimeOrderedMailbox {
23            messages: HashMap::new(),
24            pop_first: false,
25        }
26    }
27
28    /// Creates a new [TimeOrderedMailbox] with a least recent message processing policy.
29    /// Keeps every message received from each neighbor, but returns only the least recent one. Used
30    /// for processing in a FIFO order.
31    pub fn least_recent() -> TimeOrderedMailbox {
32        TimeOrderedMailbox {
33            messages: HashMap::new(),
34            pop_first: true,
35        }
36    }
37}
38
39pub struct MemoryLessMailbox {
40    messages: HashMap<i32, Message>,
41}
42
43impl MemoryLessMailbox {
44    pub fn new() -> Self {
45        MemoryLessMailbox {
46            messages: HashMap::new(),
47        }
48    }
49}
50
51impl Default for MemoryLessMailbox {
52    fn default() -> Self {
53        MemoryLessMailbox::new()
54    }
55}
56
57impl Mailbox for MemoryLessMailbox {
58    fn enqueue(&mut self, msg: Message) {
59        self.messages.insert(msg.source, msg);
60    }
61
62    fn messages(&mut self) -> Messages {
63        self.messages.clone()
64    }
65}
66
67pub struct TimeOrderedMailbox {
68    messages: HashMap<i32, BTreeMap<SystemTime, Message>>,
69    pop_first: bool,
70}
71
72impl Mailbox for TimeOrderedMailbox {
73    fn enqueue(&mut self, msg: Message) {
74        let msgs = self.messages.entry(msg.source).or_default();
75        msgs.insert(msg.timestamp, msg);
76    }
77
78    fn messages(&mut self) -> Messages {
79        let mut messages = HashMap::new();
80        for (id, msgs) in self.messages.iter_mut() {
81            if self.pop_first {
82                //get the first entry of the BTreeMap
83                if let Some((_, msg)) = msgs.pop_first() {
84                    messages.insert(*id, msg.clone());
85                }
86            } else {
87                //get the last entry of the BTreeMap
88                if let Some((_, msg)) = msgs.pop_last() {
89                    messages.insert(*id, msg.clone());
90                }
91            }
92        }
93        messages
94    }
95}
96
#[cfg(test)]
mod test {
    use crate::mailbox::MailboxFactory;
    use rf_core::export;
    use rf_core::export::Export;
    use rf_core::path::Path;
    use rf_distributed::mailbox::Mailbox;
    use rf_distributed::message::Message;
    use std::any::Any;
    use std::collections::HashMap;
    use std::time::{Duration, SystemTime};

    /// Returns an `(earlier, later)` pair of timestamps that are strictly ordered.
    ///
    /// The time-ordered mailboxes key messages by timestamp, and the wall clock is not
    /// guaranteed to advance between two consecutive `SystemTime::now()` calls. The
    /// second timestamp is therefore derived from the first instead of read again.
    fn ordered_timestamps() -> (SystemTime, SystemTime) {
        let earlier = SystemTime::now();
        (earlier, earlier + Duration::from_secs(1))
    }

    /// A newer message from a sender replaces the stored one; other senders are untouched.
    #[test]
    fn test_memory_less() {
        let mut mailbox = MailboxFactory::memory_less();
        let (earlier, later) = ordered_timestamps();

        let msg_2 = Message::new(2, export!((Path::new(), 2)), earlier);
        let msg_3 = Message::new(3, export!((Path::new(), 3)), earlier);
        mailbox.enqueue(msg_2.clone());
        mailbox.enqueue(msg_3.clone());
        let messages = mailbox.messages();
        assert_eq!(messages, HashMap::from([(2, msg_2), (3, msg_3.clone())]));

        // update msg_2
        let new_msg_2 = Message::new(2, export!((Path::new(), 2 + 2)), later);
        mailbox.enqueue(new_msg_2.clone());
        let messages = mailbox.messages();
        assert_eq!(messages, HashMap::from([(2, new_msg_2), (3, msg_3)]));
    }

    /// Newest messages are returned first, then the older ones.
    #[test]
    fn test_most_recent() {
        let mut mailbox = MailboxFactory::most_recent();
        let (earlier, later) = ordered_timestamps();

        // add the first round of messages
        let msg_2 = Message::new(2, export!((Path::new(), 2)), earlier);
        let msg_3 = Message::new(3, export!((Path::new(), 3)), earlier);
        mailbox.enqueue(msg_2.clone());
        mailbox.enqueue(msg_3.clone());

        // add the second round of messages
        let new_msg_2 = Message::new(2, export!((Path::new(), 2 + 2)), later);
        let new_msg_3 = Message::new(3, export!((Path::new(), 3 + 3)), later);
        mailbox.enqueue(new_msg_2.clone());
        mailbox.enqueue(new_msg_3.clone());

        // pop once
        let messages = mailbox.messages();
        assert_eq!(messages, HashMap::from([(2, new_msg_2), (3, new_msg_3)]));

        // pop the second time
        let messages = mailbox.messages();
        assert_eq!(messages, HashMap::from([(2, msg_2), (3, msg_3)]));
    }

    /// Oldest messages are returned first, then the newer ones.
    #[test]
    fn test_least_recent() {
        let mut mailbox = MailboxFactory::least_recent();
        let (earlier, later) = ordered_timestamps();

        // add the first round of messages
        let msg_2 = Message::new(2, export!((Path::new(), 2)), earlier);
        let msg_3 = Message::new(3, export!((Path::new(), 3)), earlier);
        mailbox.enqueue(msg_2.clone());
        mailbox.enqueue(msg_3.clone());

        // add the second round of messages
        let new_msg_2 = Message::new(2, export!((Path::new(), 2 + 2)), later);
        let new_msg_3 = Message::new(3, export!((Path::new(), 3 + 3)), later);
        mailbox.enqueue(new_msg_2.clone());
        mailbox.enqueue(new_msg_3.clone());

        // pop once
        let messages = mailbox.messages();
        assert_eq!(messages, HashMap::from([(2, msg_2), (3, msg_3)]));

        // pop the second time
        let messages = mailbox.messages();
        assert_eq!(messages, HashMap::from([(2, new_msg_2), (3, new_msg_3)]));
    }
}