1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
use std::{
borrow::Cow,
cmp::Ordering,
collections::{binary_heap::PeekMut, BinaryHeap},
iter,
};
use either::Either;
use usig::{Count, Counter};
use crate::peer_message::usig_message::UsigMessage;
/// Defines a Wrapper for messages of type UsigMessage.
#[derive(Debug, Clone)]
#[repr(transparent)]
struct UsigMessageWrapper<P, Sig>(UsigMessage<P, Sig>);
impl<P, Sig> From<UsigMessageWrapper<P, Sig>> for UsigMessage<P, Sig> {
/// Convert the given UsigMessageWrapper to a UsigMessage.
fn from(usig_message_wrapper: UsigMessageWrapper<P, Sig>) -> Self {
usig_message_wrapper.0
}
}
impl<P, Sig> From<UsigMessage<P, Sig>> for UsigMessageWrapper<P, Sig> {
/// Convert the given UsigMessage to a UsigMessageWrapper.
fn from(usig_message: UsigMessage<P, Sig>) -> Self {
Self(usig_message)
}
}
impl<P, Sig: Counter> Counter for UsigMessageWrapper<P, Sig> {
/// Returns the counter of the UsigMessage.
fn counter(&self) -> Count {
self.0.counter()
}
}
impl<P, Sig: Counter> PartialEq for UsigMessageWrapper<P, Sig> {
/// Returns true if the counters of the UsigMessageWrappers are equal, otherwise false.
fn eq(&self, other: &Self) -> bool {
self.counter().eq(&other.counter())
}
}
impl<P, Sig: Counter> Eq for UsigMessageWrapper<P, Sig> {}
impl<P, Sig: Counter> PartialOrd for UsigMessageWrapper<P, Sig> {
/// Partially compares the counters of the UsigMessageWrappers.
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
self.counter()
.partial_cmp(&other.counter())
.map(|c| c.reverse())
}
}
impl<P, Sig: Counter> Ord for UsigMessageWrapper<P, Sig> {
/// Compares the counters of the UsigMessageWrappers.
fn cmp(&self, other: &Self) -> Ordering {
self.counter().cmp(&other.counter()).reverse()
}
}
/// Defines errors regarding the counter of UsigMessages.
enum CountCheckerError {
/// Already seen UsigMessage based on its counter.
AlreadySeen,
/// Not the next expected UsigMessage based on its counter.
NotNext,
}
/// The checker for counters of messages of type UsigMessage.
#[derive(Debug, Clone)]
struct CountChecker {
/// The next expected Counter.
next_count: Count,
}
impl CountChecker {
/// Creates a new CountChecker.
fn new() -> Self {
Self {
next_count: Count(0),
}
}
/// Increments the next expected count if the given count is the expected one.
fn next(&mut self, count: Count) -> Result<(), CountCheckerError> {
match count.cmp(&self.next_count) {
Ordering::Less => Err(CountCheckerError::AlreadySeen),
Ordering::Equal => {
self.next_count += 1;
Ok(())
}
Ordering::Greater => Err(CountCheckerError::NotNext),
}
}
}
/// Defines the state of the UsigMessageHandler.
#[derive(Debug, Clone)]
pub(super) struct UsigMsgOrderEnforcer<P, Sig> {
/// Used for assuring the UsigMessages are handled and processed in the right order.
count_checker: CountChecker,
/// Collects already received, but yet to process UsigMessages
/// (UsigMessages that have a smaller Counter are yet to be received).
unprocessed: BinaryHeap<UsigMessageWrapper<P, Sig>>, // TODO limit
}
impl<P, Sig: Counter> Default for UsigMsgOrderEnforcer<P, Sig> {
/// Creates a new default UsigMessageHandlerState.
fn default() -> Self {
Self {
count_checker: CountChecker::new(),
unprocessed: BinaryHeap::new(),
}
}
}
impl<P: Clone, Sig: Counter + Clone> UsigMsgOrderEnforcer<P, Sig> {
/// Check if the given UsigMessage is the next one expected based on its
/// counter.
/// case 1: If the given UsigMessage is the next one expected, an Iterator
/// is returned over the given UsigMessage and all other received messages
/// of type UsigMessage that have yet to be processed and have counters that
/// follow directly.
/// case 2: If the given UsigMessage is not the one expected and was already
/// seen, it is ignored.
/// case 3: If the given UsigMessage is not the one expected and was not yet
/// seen, it is stored as unprocessed.
///
/// In cases 2 and 3 an empty Iterator is returned.
pub(super) fn push_to_handle<'a>(
&'a mut self,
msg: Cow<'_, impl Into<UsigMessage<P, Sig>> + Clone + Counter>,
) -> impl Iterator<Item = UsigMessage<P, Sig>> + 'a {
match self.count_checker.next(msg.counter()) {
Ok(()) => {
// we have the next message, so yield it and any messages that follow directly
Either::Left(
iter::once(msg.into_owned().into()).chain(self.yield_to_be_processed()),
)
}
Err(e) => {
match e {
CountCheckerError::AlreadySeen => {
// the given UsigMessage is an old one, so it is ignored
}
CountCheckerError::NotNext => {
// the given UsigMessage is not the next expected message, so it is put in the unprocessed heap
// (we might add a message mulitple times to the heap, but those get filtered out in yield_processed)
self.unprocessed.push(msg.into_owned().into().into())
}
}
Either::Right(iter::empty())
}
}
}
/// Returns to-be-processed UsigMessages.
fn yield_to_be_processed(&mut self) -> impl Iterator<Item = UsigMessage<P, Sig>> + '_ {
iter::from_fn(|| {
while let Some(head) = self.unprocessed.peek_mut() {
match self.count_checker.next(head.counter()) {
Ok(()) => {
// we found the next message, so return it
return Some(PeekMut::pop(head).into());
}
Err(CountCheckerError::AlreadySeen) => {
// the message is a duplicate, i.e. it was already seen
// pop, but ignore it
PeekMut::pop(head);
continue;
}
Err(CountCheckerError::NotNext) => {
// a hole was found, i.e. there are still missing UsigMessages
// therefore, end the iterator
return None;
}
}
}
// no (more) to-be-processed messages, so end the iterator
None
})
}
/// Update the last seen counter after a unique [crate::Prepare] is accepted
/// when processing a valid NewView.
pub(crate) fn update_in_new_view(&mut self, counter_accepted_prep: Count) {
while self.count_checker.next_count.0 <= counter_accepted_prep.0 {
self.count_checker.next_count += 1;
self.unprocessed.pop();
}
}
}