1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
use {
    corosensei::Yielder,
    fibril_core::{Command, Event, Expectation, Id},
    std::collections::{BTreeMap, BTreeSet},
};

pub struct Sdk<'a, M>(pub(crate) &'a Yielder<Event<M>, Command<M>>, pub(crate) Id);

impl<'a, M> Sdk<'a, M> {
    pub fn exit(&self) -> ! {
        self.0.suspend(Command::Exit);
        unreachable!();
    }

    #[must_use]
    pub fn expect(&self, description: impl ToString) -> Expectation {
        let description = description.to_string();
        match self.0.suspend(Command::Expect(description)) {
            Event::ExpectOk(expectation) => expectation,
            _ => unreachable!(),
        }
    }

    pub fn expectation_met(&self, expectation: Expectation) {
        match self.0.suspend(Command::ExpectationMet(expectation)) {
            Event::ExpectationMetOk => {}
            _ => unreachable!(),
        }
    }

    pub fn id(&self) -> Id {
        self.1
    }

    pub fn recv(&self) -> (Id, M) {
        match self.0.suspend(Command::Recv) {
            Event::RecvOk(src, m) => (src, m),
            _ => unreachable!(),
        }
    }

    pub fn recv_btree_map<K, V>(
        &self,
        count: usize,
        filter_map: impl Fn(Id, M) -> Option<(K, V)>,
    ) -> BTreeMap<K, V>
    where
        K: Ord,
    {
        let mut collected = BTreeMap::new();
        while collected.len() < count {
            let (src, msg) = self.recv();
            if let Some((key, value)) = filter_map(src, msg) {
                collected.insert(key, value);
            }
        }
        collected
    }

    pub fn recv_btree_set<V>(
        &self,
        count: usize,
        filter_map: impl Fn(Id, M) -> Option<V>,
    ) -> BTreeSet<V>
    where
        V: Ord,
    {
        let mut collected = BTreeSet::new();
        while collected.len() < count {
            let (src, msg) = self.recv();
            if let Some(value) = filter_map(src, msg) {
                collected.insert(value);
            }
        }
        collected
    }

    /// This is a helper based on [`Sdk::recv_btree_set`] for the case where an actor needs to
    /// wait for `count` messages from distinct recipients (e.g. when awaiting
    /// [quorum](https://en.wikipedia.org/wiki/Quorum_%28distributed_computing%29)).
    ///
    /// `count` will match the number of distinct [`Id`]s for which the `filter` returned
    /// `true` (i.e. accepted messages from the same [`Id`] are only counted once).
    pub fn recv_response_count(&self, count: usize, filter: impl Fn(Id, M) -> bool) {
        self.recv_btree_set(count, |src, msg| filter(src, msg).then_some(src));
    }

    /// This is a helper based on [`Sdk::recv_btree_map`] for the case where an actor needs to
    /// wait for `count` messages from distinct recipients (e.g. when awaiting
    /// [quorum](https://en.wikipedia.org/wiki/Quorum_%28distributed_computing%29)).
    ///
    /// `count` will match the number of distinct [`Id`]s for which the `filter_map` returned
    /// `Some(...)` (i.e. accepted messages from the same [`Id`] are only counted once).
    pub fn recv_responses<V>(
        &self,
        count: usize,
        filter_map: impl Fn(Id, M) -> Option<V>,
    ) -> BTreeMap<Id, V> {
        self.recv_btree_map(count, |src, msg| filter_map(src, msg).map(|v| (src, v)))
    }

    pub fn send(&self, dst: Id, m: M) {
        let input = self.0.suspend(Command::Send(dst, m));
        assert!(matches!(input, Event::SendOk));
    }

    pub fn send_broadcast(&self, dst: impl IntoIterator<Item = Id>, m: &M)
    where
        M: Clone,
    {
        for dst in dst {
            self.send(dst, m.clone());
        }
    }
}