stateright/actor/
network.rs

1//! Private module for selective re-export.
2
3// This can be made more efficient by introducing a `Network` trait once
4// https://github.com/rust-lang/rust/issues/44265 stabilizes, enabling a
5// `Network::Iterator<'a>` type constructor.
6//
7// ```
8// trait Network<Msg> {
9//     type Iterator<'a> ...
10//     ...
11// }
12// ```
13
14use crate::actor::Id;
15use crate::util::{HashableHashMap, HashableHashSet};
16use crate::{Rewrite, RewritePlan};
17use std::collections::btree_map::Entry;
18use std::collections::{btree_map, hash_map, hash_set};
19use std::collections::{BTreeMap, VecDeque};
20use std::hash::Hash;
21use std::str::FromStr;
22
23/// Indicates the source and destination for a message.
24#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd, serde::Serialize)]
25pub struct Envelope<Msg> {
26    pub src: Id,
27    pub dst: Id,
28    pub msg: Msg,
29}
30
31impl<Msg> Envelope<&Msg> {
32    /// Converts an [`Envelope`] with a message reference to one that owns the message.
33    pub fn to_cloned_msg(&self) -> Envelope<Msg>
34    where
35        Msg: Clone,
36    {
37        Envelope {
38            src: self.src,
39            dst: self.dst,
40            msg: self.msg.clone(),
41        }
42    }
43}
44
45/// Represents a network of messages.
46#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd, serde::Serialize)]
47pub enum Network<Msg>
48where
49    Msg: Eq + Hash,
50{
51    /// Indicates that messages have no ordering (racing one another), and can be redelivered.
52    UnorderedDuplicating(HashableHashSet<Envelope<Msg>>, Option<Envelope<Msg>>),
53
54    /// Indicates that messages have no ordering (racing one another), and will not be redelivered.
55    UnorderedNonDuplicating(HashableHashMap<Envelope<Msg>, usize>),
56
57    /// Indicates that directed message flows between pairs of actors are ordered. Does not
58    /// indicate any ordering across different flows. Each direction for a pair of actors is a
59    /// different flow.
60    ///
61    /// # See Also
62    ///
63    /// The [`ordered_reliable_link`] module partially implements this contract as long as actors do
64    /// not restart. A later version of the module and checker will account for actor restarts.
65    ///
66    /// [`ordered_reliable_link`]: crate::actor::ordered_reliable_link
67    Ordered(BTreeMap<(Id, Id), VecDeque<Msg>>),
68}
69
70impl<Msg> Network<Msg>
71where
72    Msg: Eq + Hash,
73{
74    /// Indicates that directed message flows between pairs of actors are ordered. Does not
75    /// indicate any ordering across different flows. Each direction for a pair of actors is a
76    /// different flow.
77    ///
78    /// # See Also
79    ///
80    /// The [`ordered_reliable_link`] module partially implements this contract as long as actors do
81    /// not restart. A later version of the module and checker will account for actor restarts.
82    ///
83    /// [`ordered_reliable_link`]: crate::actor::ordered_reliable_link
84    pub fn new_ordered(envelopes: impl IntoIterator<Item = Envelope<Msg>>) -> Self {
85        let mut this = Self::Ordered(BTreeMap::new());
86        for env in envelopes {
87            this.send(env);
88        }
89        this
90    }
91
92    /// Indicates that messages have no ordering (racing one another), and can be redelivered.
93    /// It also sets the last message delivered.
94    ///
95    /// See also: [`Self::new_unordered_duplicating`]
96    pub fn new_unordered_duplicating_with_last_msg(
97        envelopes: impl IntoIterator<Item = Envelope<Msg>>,
98        last_msg: Option<Envelope<Msg>>,
99    ) -> Self {
100        let mut this = Self::UnorderedDuplicating(
101            HashableHashSet::with_hasher(crate::stable::build_hasher()),
102            last_msg,
103        );
104        for env in envelopes {
105            this.send(env);
106        }
107        this
108    }
109
110    /// Indicates that messages have no ordering (racing one another), and can be redelivered.
111    ///
112    /// See also: [`Self::new_unordered_nonduplicating`]
113    pub fn new_unordered_duplicating(envelopes: impl IntoIterator<Item = Envelope<Msg>>) -> Self {
114        let mut this = Self::UnorderedDuplicating(
115            HashableHashSet::with_hasher(crate::stable::build_hasher()),
116            None,
117        );
118        for env in envelopes {
119            this.send(env);
120        }
121        this
122    }
123
124    /// Indicates that messages have no ordering (racing one another), and will not be redelivered.
125    ///
126    /// See also: [`Self::new_unordered_duplicating`]
127    pub fn new_unordered_nonduplicating(
128        envelopes: impl IntoIterator<Item = Envelope<Msg>>,
129    ) -> Self {
130        let mut this = Self::UnorderedNonDuplicating(HashableHashMap::with_hasher(
131            crate::stable::build_hasher(),
132        ));
133        for env in envelopes {
134            this.send(env);
135        }
136        this
137    }
138
139    /// Returns a vector of names that can be parsed using [`FromStr`].
140    pub fn names() -> Vec<&'static str> {
141        struct IterStr<Msg: Eq + Hash>(Option<Network<Msg>>);
142        impl<Msg: Eq + Hash> Iterator for IterStr<Msg> {
143            type Item = &'static str;
144            fn next(&mut self) -> Option<Self::Item> {
145                if let Some(network) = &self.0 {
146                    match network {
147                        Network::Ordered(_) => {
148                            self.0 = Some(Network::UnorderedDuplicating(Default::default(), None));
149                            Some("ordered")
150                        }
151                        Network::UnorderedDuplicating(_, _) => {
152                            self.0 = Some(Network::UnorderedNonDuplicating(Default::default()));
153                            Some("unordered_duplicating")
154                        }
155                        Network::UnorderedNonDuplicating(_) => {
156                            self.0 = None;
157                            Some("unordered_nonduplicating")
158                        }
159                    }
160                } else {
161                    None
162                }
163            }
164        }
165        IterStr::<Msg>(Some(Network::Ordered(Default::default()))).collect()
166    }
167
168    /// Returns an iterator over all envelopes in the network.
169    pub fn iter_all(&self) -> NetworkIter<Msg> {
170        match self {
171            Network::UnorderedDuplicating(set, _) => NetworkIter::UnorderedDuplicating(set.iter()),
172            Network::UnorderedNonDuplicating(multiset) => {
173                NetworkIter::UnorderedNonDuplicating(None, multiset.iter())
174            }
175            Network::Ordered(map) => NetworkIter::Ordered(None, map.iter()),
176        }
177    }
178
179    /// Returns an iterator over all distinct deliverable envelopes in the network.
180    pub fn iter_deliverable(&self) -> NetworkDeliverableIter<Msg> {
181        match self {
182            Network::UnorderedDuplicating(set, _) => {
183                NetworkDeliverableIter::UnorderedDuplicating(set.iter())
184            }
185            Network::UnorderedNonDuplicating(multiset) => {
186                NetworkDeliverableIter::UnorderedNonDuplicating(multiset.keys())
187            }
188            Network::Ordered(map) => NetworkDeliverableIter::Ordered(map.iter()),
189        }
190    }
191
192    /// Returns the number of messages in the network.
193    #[allow(clippy::len_without_is_empty)]
194    pub fn len(&self) -> usize {
195        match self {
196            Network::UnorderedDuplicating(set, _) => set.len(),
197            Network::UnorderedNonDuplicating(multiset) => multiset.values().sum(),
198            Network::Ordered(map) => map.values().map(VecDeque::len).sum(),
199        }
200    }
201
202    /// Sends a message.
203    pub(crate) fn send(&mut self, envelope: Envelope<Msg>) {
204        match self {
205            Network::UnorderedDuplicating(set, _) => {
206                set.insert(envelope);
207            }
208            Network::UnorderedNonDuplicating(multiset) => {
209                *multiset.entry(envelope).or_insert(0) += 1;
210            }
211            Network::Ordered(map) => {
212                map.entry((envelope.src, envelope.dst))
213                    .or_insert_with(|| VecDeque::with_capacity(1))
214                    .push_back(envelope.msg);
215            }
216        }
217    }
218
219    pub(crate) fn on_deliver(&mut self, envelope: Envelope<Msg>)
220    where
221        Msg: PartialEq,
222    {
223        match self {
224            Network::UnorderedDuplicating(_, last_msg) => {
225                // by remembering the last message delivered, a message that does not change the actor
226                // state can also produce a different fingerprint
227                last_msg.replace(envelope);
228            }
229            Network::UnorderedNonDuplicating(multiset) => match multiset.entry(envelope) {
230                hash_map::Entry::Occupied(mut entry) => {
231                    let value = *entry.get();
232                    assert!(value > 0);
233                    if value == 1 {
234                        entry.remove();
235                    } else {
236                        *entry.get_mut() -= 1;
237                    }
238                }
239                hash_map::Entry::Vacant(_) => {
240                    panic!("envelope not found");
241                }
242            },
243            Network::Ordered(map) => {
244                // Find the flow, then find the message in the flow, and finally remove the message
245                // from the flow. Flows must be non-empty (to ensure removing a message is the
246                // inverse of adding it), so also canonicalize by deleting the entire flow if it
247                // would be empty after removing the message.
248                let flow_entry = match map.entry((envelope.src, envelope.dst)) {
249                    Entry::Vacant(_) => panic!(
250                        "flow not found. src={:?}, dst={:?}",
251                        envelope.src, envelope.dst
252                    ),
253                    Entry::Occupied(flow) => flow,
254                };
255                let i = flow_entry
256                    .get()
257                    .iter()
258                    .position(|x| x == &envelope.msg)
259                    .expect("message not found");
260                if flow_entry.get().len() > 1 {
261                    flow_entry.into_mut().remove(i);
262                } else {
263                    flow_entry.remove();
264                }
265            }
266        }
267    }
268
269    pub(crate) fn on_drop(&mut self, envelope: Envelope<Msg>)
270    where
271        Msg: PartialEq,
272    {
273        match self {
274            Network::UnorderedDuplicating(set, _) => {
275                set.remove(&envelope);
276            }
277            Network::UnorderedNonDuplicating(multiset) => match multiset.entry(envelope) {
278                hash_map::Entry::Occupied(mut entry) => {
279                    let value = *entry.get();
280                    assert!(value > 0);
281                    if value == 1 {
282                        entry.remove();
283                    } else {
284                        *entry.get_mut() -= 1;
285                    }
286                }
287                hash_map::Entry::Vacant(_) => {
288                    panic!("envelope not found");
289                }
290            },
291            Network::Ordered(map) => {
292                // Find the flow, then find the message in the flow, and finally remove the message
293                // from the flow. Flows must be non-empty (to ensure removing a message is the
294                // inverse of adding it), so also canonicalize by deleting the entire flow if it
295                // would be empty after removing the message.
296                let flow_entry = match map.entry((envelope.src, envelope.dst)) {
297                    Entry::Vacant(_) => panic!(
298                        "flow not found. src={:?}, dst={:?}",
299                        envelope.src, envelope.dst
300                    ),
301                    Entry::Occupied(flow) => flow,
302                };
303                let i = flow_entry
304                    .get()
305                    .iter()
306                    .position(|x| x == &envelope.msg)
307                    .expect("message not found");
308                if flow_entry.get().len() > 1 {
309                    flow_entry.into_mut().remove(i);
310                } else {
311                    flow_entry.remove();
312                }
313            }
314        }
315    }
316}
317
318impl<Msg> FromStr for Network<Msg>
319where
320    Msg: Eq + Hash,
321{
322    type Err = String;
323    fn from_str(s: &str) -> Result<Self, Self::Err> {
324        match s {
325            "ordered" => Ok(Self::new_ordered([])),
326            "unordered_duplicating" => Ok(Self::new_unordered_duplicating([])),
327            "unordered_nonduplicating" => Ok(Self::new_unordered_nonduplicating([])),
328            _ => Err(format!("unable to parse network name: {s}")),
329        }
330    }
331}
332
333impl<Msg> Rewrite<Id> for Network<Msg>
334where
335    Msg: Eq + Hash + Rewrite<Id>,
336{
337    fn rewrite<S>(&self, plan: &RewritePlan<Id, S>) -> Self {
338        match self {
339            Network::UnorderedDuplicating(set, last_msg) => {
340                Network::UnorderedDuplicating(set.rewrite(plan), last_msg.rewrite(plan))
341            }
342            Network::UnorderedNonDuplicating(multiset) => {
343                Network::UnorderedNonDuplicating(multiset.rewrite(plan))
344            }
345            Network::Ordered(map) => Network::Ordered(map.rewrite(plan)),
346        }
347    }
348}
349
350pub enum NetworkIter<'a, Msg> {
351    UnorderedDuplicating(hash_set::Iter<'a, Envelope<Msg>>),
352    UnorderedNonDuplicating(
353        // active env/count to iterate over repeated sends
354        Option<(Envelope<&'a Msg>, usize)>,
355        std::collections::hash_map::Iter<'a, Envelope<Msg>, usize>,
356    ),
357    Ordered(
358        // active channel/cursor to iterate over all messages of a channel
359        Option<(Id, Id, &'a VecDeque<Msg>, usize)>,
360        btree_map::Iter<'a, (Id, Id), VecDeque<Msg>>,
361    ),
362}
363
364impl<'a, Msg> Iterator for NetworkIter<'a, Msg> {
365    type Item = Envelope<&'a Msg>;
366    fn next(&mut self) -> Option<Self::Item> {
367        match self {
368            NetworkIter::UnorderedDuplicating(it) => it.next().map(|env| Envelope {
369                src: env.src,
370                dst: env.dst,
371                msg: &env.msg,
372            }),
373            NetworkIter::UnorderedNonDuplicating(active, it) => {
374                if let Some((env, count)) = active {
375                    // invariant: count > 1
376                    let env = *env; // to avoid holding a reference inside active
377                    *count -= 1;
378                    if *count == 0 {
379                        *active = None;
380                    }
381                    return Some(env);
382                }
383                it.next().map(|(env, count)| {
384                    let env = Envelope {
385                        src: env.src,
386                        dst: env.dst,
387                        msg: &env.msg,
388                    };
389                    if *count > 1 {
390                        *active = Some((env, *count));
391                    }
392                    env
393                })
394            }
395            NetworkIter::Ordered(active, it) => {
396                if let Some((src, dst, messages, index)) = active {
397                    let msg = messages.get(*index).unwrap(); // messages.len() > 1
398                    return Some(Envelope {
399                        src: *src,
400                        dst: *dst,
401                        msg,
402                    });
403                }
404                it.next().map(|(&(src, dst), messages)| {
405                    let msg = messages.front().unwrap(); // messages.len() > 1
406                    *active = Some((src, dst, messages, 0));
407                    Envelope { src, dst, msg }
408                })
409            }
410        }
411    }
412}
413
414pub enum NetworkDeliverableIter<'a, Msg> {
415    UnorderedDuplicating(hash_set::Iter<'a, Envelope<Msg>>),
416    UnorderedNonDuplicating(hash_map::Keys<'a, Envelope<Msg>, usize>),
417    Ordered(btree_map::Iter<'a, (Id, Id), VecDeque<Msg>>),
418}
419
420impl<'a, Msg> Iterator for NetworkDeliverableIter<'a, Msg> {
421    type Item = Envelope<&'a Msg>;
422    fn next(&mut self) -> Option<Self::Item> {
423        match self {
424            NetworkDeliverableIter::UnorderedDuplicating(it) => it.next().map(|env| Envelope {
425                src: env.src,
426                dst: env.dst,
427                msg: &env.msg,
428            }),
429            NetworkDeliverableIter::UnorderedNonDuplicating(it) => it.next().map(|env| Envelope {
430                src: env.src,
431                dst: env.dst,
432                msg: &env.msg,
433            }),
434            NetworkDeliverableIter::Ordered(it) => it.next().map(|(&(src, dst), messages)| {
435                let msg = messages.front().expect("empty channel");
436                Envelope { src, dst, msg }
437            }),
438        }
439    }
440}
441
#[cfg(test)]
mod test {
    use super::*;
    use std::collections::BTreeSet;

    /// Every name reported by `Network::names` must parse, and the parsed networks must be
    /// exactly the three empty networks, one per variant.
    #[test]
    fn can_enumerate_and_parse_names() {
        let parsed: BTreeSet<Network<()>> = Network::<()>::names()
            .into_iter()
            .map(|name| Network::<()>::from_str(name).unwrap())
            .collect();
        let expected: BTreeSet<Network<()>> = vec![
            Network::new_ordered([]),
            Network::new_unordered_duplicating([]),
            Network::new_unordered_nonduplicating([]),
        ]
        .into_iter()
        .collect();
        assert_eq!(parsed, expected);
    }
}