minbft/usig_msg_order_enforcer.rs
1use std::{
2 borrow::Cow,
3 cmp::Ordering,
4 collections::{binary_heap::PeekMut, BinaryHeap},
5 iter,
6};
7
8use either::Either;
9use usig::{Count, Counter};
10
11use crate::peer_message::usig_message::UsigMessage;
12
13/// Defines a Wrapper for messages of type UsigMessage.
14#[derive(Debug, Clone)]
15#[repr(transparent)]
16struct UsigMessageWrapper<P, Sig>(UsigMessage<P, Sig>);
17
18impl<P, Sig> From<UsigMessageWrapper<P, Sig>> for UsigMessage<P, Sig> {
19 /// Convert the given UsigMessageWrapper to a UsigMessage.
20 fn from(usig_message_wrapper: UsigMessageWrapper<P, Sig>) -> Self {
21 usig_message_wrapper.0
22 }
23}
24
25impl<P, Sig> From<UsigMessage<P, Sig>> for UsigMessageWrapper<P, Sig> {
26 /// Convert the given UsigMessage to a UsigMessageWrapper.
27 fn from(usig_message: UsigMessage<P, Sig>) -> Self {
28 Self(usig_message)
29 }
30}
31
32impl<P, Sig: Counter> Counter for UsigMessageWrapper<P, Sig> {
33 /// Returns the counter of the UsigMessage.
34 fn counter(&self) -> Count {
35 self.0.counter()
36 }
37}
38
39impl<P, Sig: Counter> PartialEq for UsigMessageWrapper<P, Sig> {
40 /// Returns true if the counters of the UsigMessageWrappers are equal, otherwise false.
41 fn eq(&self, other: &Self) -> bool {
42 self.counter().eq(&other.counter())
43 }
44}
45
46impl<P, Sig: Counter> Eq for UsigMessageWrapper<P, Sig> {}
47
48impl<P, Sig: Counter> PartialOrd for UsigMessageWrapper<P, Sig> {
49 /// Partially compares the counters of the UsigMessageWrappers.
50 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
51 self.counter()
52 .partial_cmp(&other.counter())
53 .map(|c| c.reverse())
54 }
55}
56
57impl<P, Sig: Counter> Ord for UsigMessageWrapper<P, Sig> {
58 /// Compares the counters of the UsigMessageWrappers.
59 fn cmp(&self, other: &Self) -> Ordering {
60 self.counter().cmp(&other.counter()).reverse()
61 }
62}
63
/// Defines errors regarding the counter of UsigMessages.
///
/// The common traits are derived so the error can be printed, copied and
/// compared (e.g. via `unwrap`, `expect` or `assert_eq!` on a `Result`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CountCheckerError {
    /// The counter is smaller than the expected one, i.e. the UsigMessage was
    /// already seen.
    AlreadySeen,
    /// The counter is greater than the expected one, i.e. the UsigMessage is
    /// not the next one in line.
    NotNext,
}
71
72/// The checker for counters of messages of type UsigMessage.
73#[derive(Debug, Clone)]
74struct CountChecker {
75 /// The next expected Counter.
76 next_count: Count,
77}
78
79impl CountChecker {
80 /// Creates a new CountChecker.
81 fn new() -> Self {
82 Self {
83 next_count: Count(0),
84 }
85 }
86
87 /// Increments the next expected count if the given count is the expected one.
88 fn next(&mut self, count: Count) -> Result<(), CountCheckerError> {
89 match count.cmp(&self.next_count) {
90 Ordering::Less => Err(CountCheckerError::AlreadySeen),
91 Ordering::Equal => {
92 self.next_count += 1;
93 Ok(())
94 }
95 Ordering::Greater => Err(CountCheckerError::NotNext),
96 }
97 }
98}
99
100/// Defines the state of the UsigMessageHandler.
101#[derive(Debug, Clone)]
102pub(super) struct UsigMsgOrderEnforcer<P, Sig> {
103 /// Used for assuring the UsigMessages are handled and processed in the right order.
104 count_checker: CountChecker,
105 /// Collects already received, but yet to process UsigMessages
106 /// (UsigMessages that have a smaller Counter are yet to be received).
107 unprocessed: BinaryHeap<UsigMessageWrapper<P, Sig>>, // TODO limit
108}
109
110impl<P, Sig: Counter> Default for UsigMsgOrderEnforcer<P, Sig> {
111 /// Creates a new default UsigMessageHandlerState.
112 fn default() -> Self {
113 Self {
114 count_checker: CountChecker::new(),
115 unprocessed: BinaryHeap::new(),
116 }
117 }
118}
119
120impl<P: Clone, Sig: Counter + Clone> UsigMsgOrderEnforcer<P, Sig> {
121 /// Check if the given UsigMessage is the next one expected based on its
122 /// counter.
123 /// case 1: If the given UsigMessage is the next one expected, an Iterator
124 /// is returned over the given UsigMessage and all other received messages
125 /// of type UsigMessage that have yet to be processed and have counters that
126 /// follow directly.
127 /// case 2: If the given UsigMessage is not the one expected and was already
128 /// seen, it is ignored.
129 /// case 3: If the given UsigMessage is not the one expected and was not yet
130 /// seen, it is stored as unprocessed.
131 ///
132 /// In cases 2 and 3 an empty Iterator is returned.
133 pub(super) fn push_to_handle<'a>(
134 &'a mut self,
135 msg: Cow<'_, impl Into<UsigMessage<P, Sig>> + Clone + Counter>,
136 ) -> impl Iterator<Item = UsigMessage<P, Sig>> + 'a {
137 match self.count_checker.next(msg.counter()) {
138 Ok(()) => {
139 // we have the next message, so yield it and any messages that follow directly
140 Either::Left(
141 iter::once(msg.into_owned().into()).chain(self.yield_to_be_processed()),
142 )
143 }
144 Err(e) => {
145 match e {
146 CountCheckerError::AlreadySeen => {
147 // the given UsigMessage is an old one, so it is ignored
148 }
149 CountCheckerError::NotNext => {
150 // the given UsigMessage is not the next expected message, so it is put in the unprocessed heap
151 // (we might add a message mulitple times to the heap, but those get filtered out in yield_processed)
152 self.unprocessed.push(msg.into_owned().into().into())
153 }
154 }
155 Either::Right(iter::empty())
156 }
157 }
158 }
159
160 /// Returns to-be-processed UsigMessages.
161 fn yield_to_be_processed(&mut self) -> impl Iterator<Item = UsigMessage<P, Sig>> + '_ {
162 iter::from_fn(|| {
163 while let Some(head) = self.unprocessed.peek_mut() {
164 match self.count_checker.next(head.counter()) {
165 Ok(()) => {
166 // we found the next message, so return it
167 return Some(PeekMut::pop(head).into());
168 }
169 Err(CountCheckerError::AlreadySeen) => {
170 // the message is a duplicate, i.e. it was already seen
171 // pop, but ignore it
172 PeekMut::pop(head);
173 continue;
174 }
175 Err(CountCheckerError::NotNext) => {
176 // a hole was found, i.e. there are still missing UsigMessages
177 // therefore, end the iterator
178 return None;
179 }
180 }
181 }
182 // no (more) to-be-processed messages, so end the iterator
183 None
184 })
185 }
186
187 /// Update the last seen counter after a unique [crate::Prepare] is accepted
188 /// when processing a valid NewView.
189 pub(crate) fn update_in_new_view(&mut self, counter_accepted_prep: Count) {
190 while self.count_checker.next_count.0 <= counter_accepted_prep.0 {
191 self.count_checker.next_count += 1;
192 self.unprocessed.pop();
193 }
194 }
195}