minbft/
usig_msg_order_enforcer.rs

1use std::{
2    borrow::Cow,
3    cmp::Ordering,
4    collections::{binary_heap::PeekMut, BinaryHeap},
5    iter,
6};
7
8use either::Either;
9use usig::{Count, Counter};
10
11use crate::peer_message::usig_message::UsigMessage;
12
13/// Defines a Wrapper for messages of type UsigMessage.
14#[derive(Debug, Clone)]
15#[repr(transparent)]
16struct UsigMessageWrapper<P, Sig>(UsigMessage<P, Sig>);
17
18impl<P, Sig> From<UsigMessageWrapper<P, Sig>> for UsigMessage<P, Sig> {
19    /// Convert the given UsigMessageWrapper to a UsigMessage.
20    fn from(usig_message_wrapper: UsigMessageWrapper<P, Sig>) -> Self {
21        usig_message_wrapper.0
22    }
23}
24
25impl<P, Sig> From<UsigMessage<P, Sig>> for UsigMessageWrapper<P, Sig> {
26    /// Convert the given UsigMessage to a UsigMessageWrapper.
27    fn from(usig_message: UsigMessage<P, Sig>) -> Self {
28        Self(usig_message)
29    }
30}
31
32impl<P, Sig: Counter> Counter for UsigMessageWrapper<P, Sig> {
33    /// Returns the counter of the UsigMessage.
34    fn counter(&self) -> Count {
35        self.0.counter()
36    }
37}
38
39impl<P, Sig: Counter> PartialEq for UsigMessageWrapper<P, Sig> {
40    /// Returns true if the counters of the UsigMessageWrappers are equal, otherwise false.
41    fn eq(&self, other: &Self) -> bool {
42        self.counter().eq(&other.counter())
43    }
44}
45
46impl<P, Sig: Counter> Eq for UsigMessageWrapper<P, Sig> {}
47
48impl<P, Sig: Counter> PartialOrd for UsigMessageWrapper<P, Sig> {
49    /// Partially compares the counters of the UsigMessageWrappers.
50    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
51        self.counter()
52            .partial_cmp(&other.counter())
53            .map(|c| c.reverse())
54    }
55}
56
57impl<P, Sig: Counter> Ord for UsigMessageWrapper<P, Sig> {
58    /// Compares the counters of the UsigMessageWrappers.
59    fn cmp(&self, other: &Self) -> Ordering {
60        self.counter().cmp(&other.counter()).reverse()
61    }
62}
63
64/// Defines errors regarding the counter of UsigMessages.
65enum CountCheckerError {
66    /// Already seen UsigMessage based on its counter.
67    AlreadySeen,
68    /// Not the next expected UsigMessage based on its counter.
69    NotNext,
70}
71
72/// The checker for counters of messages of type UsigMessage.
73#[derive(Debug, Clone)]
74struct CountChecker {
75    /// The next expected Counter.
76    next_count: Count,
77}
78
79impl CountChecker {
80    /// Creates a new CountChecker.
81    fn new() -> Self {
82        Self {
83            next_count: Count(0),
84        }
85    }
86
87    /// Increments the next expected count if the given count is the expected one.
88    fn next(&mut self, count: Count) -> Result<(), CountCheckerError> {
89        match count.cmp(&self.next_count) {
90            Ordering::Less => Err(CountCheckerError::AlreadySeen),
91            Ordering::Equal => {
92                self.next_count += 1;
93                Ok(())
94            }
95            Ordering::Greater => Err(CountCheckerError::NotNext),
96        }
97    }
98}
99
100/// Defines the state of the UsigMessageHandler.
101#[derive(Debug, Clone)]
102pub(super) struct UsigMsgOrderEnforcer<P, Sig> {
103    /// Used for assuring the UsigMessages are handled and processed in the right order.
104    count_checker: CountChecker,
105    /// Collects already received, but yet to process UsigMessages
106    /// (UsigMessages that have a smaller Counter are yet to be received).
107    unprocessed: BinaryHeap<UsigMessageWrapper<P, Sig>>, // TODO limit
108}
109
110impl<P, Sig: Counter> Default for UsigMsgOrderEnforcer<P, Sig> {
111    /// Creates a new default UsigMessageHandlerState.
112    fn default() -> Self {
113        Self {
114            count_checker: CountChecker::new(),
115            unprocessed: BinaryHeap::new(),
116        }
117    }
118}
119
120impl<P: Clone, Sig: Counter + Clone> UsigMsgOrderEnforcer<P, Sig> {
121    /// Check if the given UsigMessage is the next one expected based on its
122    /// counter.
123    /// case 1: If the given UsigMessage is the next one expected, an Iterator
124    /// is returned over the given UsigMessage and all other received messages
125    /// of type UsigMessage that have yet to be processed and have counters that
126    /// follow directly.
127    /// case 2: If the given UsigMessage is not the one expected and was already
128    /// seen, it is ignored.
129    /// case 3: If the given UsigMessage is not the one expected and was not yet
130    /// seen, it is stored as unprocessed.
131    ///
132    /// In cases 2 and 3 an empty Iterator is returned.
133    pub(super) fn push_to_handle<'a>(
134        &'a mut self,
135        msg: Cow<'_, impl Into<UsigMessage<P, Sig>> + Clone + Counter>,
136    ) -> impl Iterator<Item = UsigMessage<P, Sig>> + 'a {
137        match self.count_checker.next(msg.counter()) {
138            Ok(()) => {
139                // we have the next message, so yield it and any messages that follow directly
140                Either::Left(
141                    iter::once(msg.into_owned().into()).chain(self.yield_to_be_processed()),
142                )
143            }
144            Err(e) => {
145                match e {
146                    CountCheckerError::AlreadySeen => {
147                        // the given UsigMessage is an old one, so it is ignored
148                    }
149                    CountCheckerError::NotNext => {
150                        // the given UsigMessage is not the next expected message, so it is put in the unprocessed heap
151                        // (we might add a message mulitple times to the heap, but those get filtered out in yield_processed)
152                        self.unprocessed.push(msg.into_owned().into().into())
153                    }
154                }
155                Either::Right(iter::empty())
156            }
157        }
158    }
159
160    /// Returns to-be-processed UsigMessages.
161    fn yield_to_be_processed(&mut self) -> impl Iterator<Item = UsigMessage<P, Sig>> + '_ {
162        iter::from_fn(|| {
163            while let Some(head) = self.unprocessed.peek_mut() {
164                match self.count_checker.next(head.counter()) {
165                    Ok(()) => {
166                        // we found the next message, so return it
167                        return Some(PeekMut::pop(head).into());
168                    }
169                    Err(CountCheckerError::AlreadySeen) => {
170                        // the message is a duplicate, i.e. it was already seen
171                        // pop, but ignore it
172                        PeekMut::pop(head);
173                        continue;
174                    }
175                    Err(CountCheckerError::NotNext) => {
176                        // a hole was found, i.e. there are still missing UsigMessages
177                        // therefore, end the iterator
178                        return None;
179                    }
180                }
181            }
182            // no (more) to-be-processed messages, so end the iterator
183            None
184        })
185    }
186
187    /// Update the last seen counter after a unique [crate::Prepare] is accepted
188    /// when processing a valid NewView.
189    pub(crate) fn update_in_new_view(&mut self, counter_accepted_prep: Count) {
190        while self.count_checker.next_count.0 <= counter_accepted_prep.0 {
191            self.count_checker.next_count += 1;
192            self.unprocessed.pop();
193        }
194    }
195}