1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
use rf_distributed::mailbox::{Mailbox, Messages};
use rf_distributed::message::Message;
use std::collections::{BTreeMap, HashMap};
use std::time::SystemTime;

/// Factory exposing one constructor per [Mailbox] message-processing policy.
pub struct MailboxFactory;

impl MailboxFactory {
    /// Builds a [MemoryLessMailbox].
    ///
    /// Only the last message enqueued for each source is stored: a newer message replaces the
    /// previous one instead of being queued next to it, so at most one message per source is
    /// held in memory.
    pub fn memory_less() -> MemoryLessMailbox {
        MemoryLessMailbox::new()
    }

    /// Builds a [TimeOrderedMailbox] with a most-recent (LIFO) policy.
    ///
    /// Every enqueued message is kept, ordered by timestamp, and each read hands out, for each
    /// source, the stored message with the latest timestamp.
    pub fn most_recent() -> TimeOrderedMailbox {
        Self::time_ordered(false)
    }

    /// Builds a [TimeOrderedMailbox] with a least-recent (FIFO) policy.
    ///
    /// Every enqueued message is kept, ordered by timestamp, and each read hands out, for each
    /// source, the stored message with the earliest timestamp.
    pub fn least_recent() -> TimeOrderedMailbox {
        Self::time_ordered(true)
    }

    /// Shared constructor for both time-ordered policies; `pop_first` selects FIFO (`true`)
    /// or LIFO (`false`).
    fn time_ordered(pop_first: bool) -> TimeOrderedMailbox {
        TimeOrderedMailbox {
            messages: HashMap::new(),
            pop_first,
        }
    }
}

/// A mailbox that stores at most one [Message] per source.
///
/// Messages live in a map keyed by source id, so a source can never own more than one entry.
#[derive(Debug, Clone)]
pub struct MemoryLessMailbox {
    /// The message currently stored for each source id.
    messages: HashMap<i32, Message>,
}

impl MemoryLessMailbox {
    pub fn new() -> Self {
        MemoryLessMailbox {
            messages: HashMap::new(),
        }
    }
}

impl Default for MemoryLessMailbox {
    fn default() -> Self {
        MemoryLessMailbox::new()
    }
}

impl Mailbox for MemoryLessMailbox {
    /// Stores `msg` under its source id, replacing whatever message was previously held for
    /// that source.
    fn enqueue(&mut self, msg: Message) {
        let sender = msg.source;
        self.messages.insert(sender, msg);
    }

    /// Returns a copy of the stored messages.
    ///
    /// The mailbox is not drained: the same messages are returned again by the next call
    /// unless they have been replaced in the meantime.
    fn messages(&mut self) -> Messages {
        self.messages.clone()
    }
}

/// A mailbox that keeps, for every source, its messages ordered by timestamp.
///
/// `pop_first` decides from which end of that ordering messages are taken.
#[derive(Debug, Clone)]
pub struct TimeOrderedMailbox {
    /// For each source id, the stored messages sorted by their timestamp.
    messages: HashMap<i32, BTreeMap<SystemTime, Message>>,
    /// `true` to take the earliest message first (FIFO), `false` to take the latest first (LIFO).
    pop_first: bool,
}

impl Mailbox for TimeOrderedMailbox {
    /// Stores `msg` in the queue of its source, ordered by the message timestamp.
    ///
    /// The timestamp is the key inside the per-source queue, so a message whose source and
    /// timestamp both equal those of an already stored message replaces it.
    fn enqueue(&mut self, msg: Message) {
        let msgs = self.messages.entry(msg.source).or_default();
        msgs.insert(msg.timestamp, msg);
    }

    /// Removes one message per source from the mailbox and returns them keyed by source id.
    ///
    /// The earliest message of each source is taken when `pop_first` is set, the latest one
    /// otherwise. Sources with no stored message do not appear in the result.
    fn messages(&mut self) -> Messages {
        // Copy the flag first so the closure below does not need to borrow `self` while
        // `self.messages` is mutably borrowed by `retain`.
        let pop_first = self.pop_first;
        let mut popped = HashMap::with_capacity(self.messages.len());
        self.messages.retain(|id, queue| {
            let entry = if pop_first {
                queue.pop_first()
            } else {
                queue.pop_last()
            };
            // The popped message is already owned, so it is moved rather than cloned.
            if let Some((_, msg)) = entry {
                popped.insert(*id, msg);
            }
            // Drop queues that became empty so sources that stopped sending do not keep an
            // entry alive forever; `enqueue` recreates the queue on demand.
            !queue.is_empty()
        });
        popped
    }
}

#[cfg(test)]
mod test {
    use crate::mailbox::MailboxFactory;
    use rf_core::export;
    use rf_core::export::Export;
    use rf_core::path::Path;
    use rf_distributed::mailbox::Mailbox;
    use rf_distributed::message::Message;
    use std::any::Any;
    use std::collections::HashMap;
    use std::time::{Duration, SystemTime};

    /// Returns a fixed instant `secs` seconds after the Unix epoch.
    ///
    /// The tests use explicit, distinct timestamps instead of `SystemTime::now()`: the system
    /// clock is not guaranteed to advance between two consecutive reads, and the time-ordered
    /// mailbox keys messages by timestamp.
    fn at(secs: u64) -> SystemTime {
        SystemTime::UNIX_EPOCH + Duration::from_secs(secs)
    }

    /// The memory-less mailbox keeps only the last message of each source and does not drain
    /// on read.
    #[test]
    fn test_memory_less() {
        let mut mailbox = MailboxFactory::memory_less();
        let msg_2 = Message::new(2, export!((Path::new(), 2)), at(1));
        let msg_3 = Message::new(3, export!((Path::new(), 3)), at(2));
        mailbox.enqueue(msg_2.clone());
        mailbox.enqueue(msg_3.clone());
        let messages = mailbox.messages();
        assert_eq!(messages, HashMap::from([(2, msg_2), (3, msg_3.clone())]));

        // update msg_2
        let new_msg_2 = Message::new(2, export!((Path::new(), 2 + 2)), at(3));
        mailbox.enqueue(new_msg_2.clone());
        let messages = mailbox.messages();
        assert_eq!(messages, HashMap::from([(2, new_msg_2), (3, msg_3)]));
    }

    /// The most-recent mailbox hands out the newest message of each source first.
    #[test]
    fn test_most_recent() {
        let mut mailbox = MailboxFactory::most_recent();

        // add the first round of messages
        let msg_2 = Message::new(2, export!((Path::new(), 2)), at(1));
        let msg_3 = Message::new(3, export!((Path::new(), 3)), at(2));
        mailbox.enqueue(msg_2.clone());
        mailbox.enqueue(msg_3.clone());

        // add the second round of messages, strictly later than the first round
        let new_msg_2 = Message::new(2, export!((Path::new(), 2 + 2)), at(3));
        let new_msg_3 = Message::new(3, export!((Path::new(), 3 + 3)), at(4));
        mailbox.enqueue(new_msg_2.clone());
        mailbox.enqueue(new_msg_3.clone());

        // pop once
        let messages = mailbox.messages();
        assert_eq!(messages, HashMap::from([(2, new_msg_2), (3, new_msg_3)]));

        // pop the second time
        let messages = mailbox.messages();
        assert_eq!(messages, HashMap::from([(2, msg_2), (3, msg_3)]));
    }

    /// The least-recent mailbox hands out the oldest message of each source first.
    #[test]
    fn test_least_recent() {
        let mut mailbox = MailboxFactory::least_recent();

        // add the first round of messages
        let msg_2 = Message::new(2, export!((Path::new(), 2)), at(1));
        let msg_3 = Message::new(3, export!((Path::new(), 3)), at(2));
        mailbox.enqueue(msg_2.clone());
        mailbox.enqueue(msg_3.clone());

        // add the second round of messages, strictly later than the first round
        let new_msg_2 = Message::new(2, export!((Path::new(), 2 + 2)), at(3));
        let new_msg_3 = Message::new(3, export!((Path::new(), 3 + 3)), at(4));
        mailbox.enqueue(new_msg_2.clone());
        mailbox.enqueue(new_msg_3.clone());

        // pop once
        let messages = mailbox.messages();
        assert_eq!(messages, HashMap::from([(2, msg_2), (3, msg_3)]));

        // pop the second time
        let messages = mailbox.messages();
        assert_eq!(messages, HashMap::from([(2, new_msg_2), (3, new_msg_3)]));
    }
}