fedimint_hbbft/sender_queue/
mod.rs1mod dynamic_honey_badger;
9mod error;
10mod honey_badger;
11mod message;
12mod queueing_honey_badger;
13
14use rand::Rng;
15use std::collections::{BTreeMap, BTreeSet};
16use std::fmt::Debug;
17
18use log::debug;
19
20use crate::traits::EpochT;
21use crate::{ConsensusProtocol, CpStep, Epoched, NodeIdT, Target};
22
23pub use self::error::Error;
24pub use self::message::Message;
25
26pub trait SenderQueueableMessage {
28 type Epoch: EpochT;
30
31 fn is_premature(&self, them: Self::Epoch, max_future_epochs: u64) -> bool;
33
34 fn is_obsolete(&self, them: Self::Epoch) -> bool;
36
37 fn is_accepted(&self, them: Self::Epoch, max_future_epochs: u64) -> bool {
39 !self.is_premature(them, max_future_epochs) && !self.is_obsolete(them)
40 }
41
42 fn first_epoch(&self) -> Self::Epoch;
44}
45
46pub trait SenderQueueableOutput<N, E>
48where
49 N: NodeIdT,
50{
51 fn participant_change(&self) -> Option<BTreeSet<N>>;
59
60 fn output_epoch(&self) -> E;
62}
63
64pub trait SenderQueueableConsensusProtocol: Epoched + ConsensusProtocol {
66 fn max_future_epochs(&self) -> u64;
69}
70
71pub type OutgoingQueue<D> = BTreeMap<
73 <D as ConsensusProtocol>::NodeId,
74 BTreeMap<<D as Epoched>::Epoch, Vec<<D as ConsensusProtocol>::Message>>,
75>;
76
77#[derive(Debug)]
84pub struct SenderQueue<D>
85where
86 D: SenderQueueableConsensusProtocol,
87{
88 algo: D,
90 our_id: D::NodeId,
92 outgoing_queue: OutgoingQueue<D>,
94 peer_epochs: BTreeMap<D::NodeId, D::Epoch>,
97 last_epochs: BTreeMap<D::NodeId, D::Epoch>,
102 participants_after_change: BTreeSet<D::NodeId>,
108 is_removed: bool,
114}
115
116pub type Step<D> = crate::CpStep<SenderQueue<D>>;
118
119impl<D> ConsensusProtocol for SenderQueue<D>
120where
121 D: SenderQueueableConsensusProtocol + Debug,
122 D::Message: Clone + SenderQueueableMessage<Epoch = D::Epoch>,
123 D::NodeId: NodeIdT,
124 D::Output: SenderQueueableOutput<D::NodeId, D::Epoch>,
125{
126 type NodeId = D::NodeId;
127 type Input = D::Input;
128 type Output = D::Output;
129 type Message = Message<D::Message>;
130 type Error = Error<D::Error>;
131 type FaultKind = D::FaultKind;
132
133 fn handle_input<R: Rng>(
134 &mut self,
135 input: Self::Input,
136 rng: &mut R,
137 ) -> Result<CpStep<Self>, Error<D::Error>> {
138 self.handle_input(input, rng)
139 }
140
141 fn handle_message<R: Rng>(
142 &mut self,
143 sender_id: &D::NodeId,
144 message: Self::Message,
145 rng: &mut R,
146 ) -> Result<CpStep<Self>, Error<D::Error>> {
147 self.handle_message(sender_id, message, rng)
148 }
149
150 fn terminated(&self) -> bool {
151 self.is_removed
152 }
153
154 fn our_id(&self) -> &D::NodeId {
155 &self.our_id
156 }
157}
158
159impl<D> SenderQueue<D>
160where
161 D: SenderQueueableConsensusProtocol + Debug,
162 D::Message: Clone + SenderQueueableMessage<Epoch = D::Epoch>,
163 D::NodeId: NodeIdT,
164 D::Output: SenderQueueableOutput<D::NodeId, D::Epoch>,
165{
166 pub fn builder<I>(algo: D, peer_ids: I) -> SenderQueueBuilder<D>
169 where
170 I: Iterator<Item = D::NodeId>,
171 {
172 SenderQueueBuilder::new(algo, peer_ids)
173 }
174
175 pub fn handle_input<R: Rng>(
177 &mut self,
178 input: D::Input,
179 rng: &mut R,
180 ) -> Result<CpStep<Self>, Error<D::Error>> {
181 if self.is_removed {
182 return Ok(Step::<D>::default());
183 }
184 self.apply(|algo| algo.handle_input(input, rng))
185 }
186
187 pub fn handle_message<R: Rng>(
191 &mut self,
192 sender_id: &D::NodeId,
193 message: Message<D::Message>,
194 rng: &mut R,
195 ) -> Result<CpStep<Self>, Error<D::Error>> {
196 if self.is_removed {
197 return Ok(Step::<D>::default());
198 }
199 match message {
200 Message::EpochStarted(epoch) => Ok(self.handle_epoch_started(sender_id, epoch)),
201 Message::Algo(msg) => self.handle_message_content(sender_id, msg, rng),
202 }
203 }
204
205 pub fn inner(&self) -> &D {
207 &self.algo
208 }
209
210 pub fn is_removed(&self) -> bool {
212 self.is_removed
213 }
214
215 fn apply<F>(&mut self, f: F) -> Result<CpStep<Self>, Error<D::Error>>
218 where
219 F: FnOnce(&mut D) -> Result<CpStep<D>, D::Error>,
220 {
221 let mut step = f(&mut self.algo).map_err(Error::Apply)?;
222 let mut sender_queue_step = self.update_epoch(&step);
223 self.defer_messages(&mut step);
224 sender_queue_step.extend(step.map(|output| output, |fault| fault, Message::from));
225 Ok(sender_queue_step)
226 }
227
228 fn handle_epoch_started(&mut self, sender_id: &D::NodeId, epoch: D::Epoch) -> CpStep<Self> {
230 self.peer_epochs
231 .entry(sender_id.clone())
232 .and_modify(|e| {
233 if *e < epoch {
234 *e = epoch;
235 }
236 })
237 .or_insert(epoch);
238 if !self.remove_participant_if_old(sender_id) {
239 self.process_new_epoch(sender_id, epoch)
240 } else {
241 Step::<D>::default()
242 }
243 }
244
245 #[allow(clippy::needless_collect)]
247 fn process_new_epoch(&mut self, sender_id: &D::NodeId, epoch: D::Epoch) -> CpStep<Self> {
248 let queue = match self.outgoing_queue.get_mut(sender_id) {
249 None => return CpStep::<Self>::default(),
250 Some(queue) => queue,
251 };
252 let earlier_keys: Vec<_> = queue
253 .keys()
254 .cloned()
255 .take_while(|this_epoch| *this_epoch <= epoch)
256 .collect();
257 earlier_keys
258 .into_iter()
259 .filter_map(|key| queue.remove(&key))
260 .flatten()
261 .filter(|msg| !msg.is_obsolete(epoch))
262 .map(|msg| Target::node(sender_id.clone()).message(Message::Algo(msg)))
263 .into()
264 }
265
266 fn handle_message_content<R: Rng>(
268 &mut self,
269 sender_id: &D::NodeId,
270 content: D::Message,
271 rng: &mut R,
272 ) -> Result<CpStep<Self>, Error<D::Error>> {
273 self.apply(|algo| algo.handle_message(sender_id, content, rng))
274 }
275
276 fn update_epoch(&mut self, step: &CpStep<D>) -> CpStep<Self> {
278 if step.output.is_empty() {
279 return Step::<D>::default();
280 }
281 let mut send_last_epoch_started = false;
284 for batch in &step.output {
286 if let Some(next_participants) = batch.participant_change() {
287 for id in &next_participants {
289 if id != self.our_id() {
290 self.peer_epochs.entry(id.clone()).or_default();
291 if let Some(&last) = self.last_epochs.get(id) {
292 if last < batch.output_epoch() {
293 self.last_epochs.remove(id);
294 }
295 }
296 }
297 }
298 debug!(
299 "Participants after the last change: {:?}",
300 self.participants_after_change
301 );
302 debug!("Next participants: {:?}", next_participants);
303 for id in self
305 .participants_after_change
306 .clone()
307 .difference(&next_participants)
308 {
309 self.remove_participant_after(id, &batch.output_epoch());
311 }
312 if self.participants_after_change.contains(&self.our_id)
313 && !next_participants.contains(&self.our_id)
314 {
315 send_last_epoch_started = true;
316 }
317 self.participants_after_change = next_participants;
318 }
319 }
320 if !self.is_removed || send_last_epoch_started {
321 let msg = Message::EpochStarted(self.algo.epoch());
323 Target::all().message(msg).into()
324 } else {
325 Step::<D>::default()
328 }
329 }
330
331 fn defer_messages(&mut self, step: &mut CpStep<D>) {
336 let max_future_epochs = self.algo.max_future_epochs();
337 for (id, message) in step.defer_messages(&self.peer_epochs, max_future_epochs) {
339 self.outgoing_queue
340 .entry(id)
341 .or_default()
342 .entry(message.first_epoch())
343 .or_default()
344 .push(message);
345 }
346 }
347
348 fn remove_participant_after(&mut self, id: &D::NodeId, last_epoch: &D::Epoch) -> bool {
352 self.last_epochs.insert(id.clone(), *last_epoch);
353 self.remove_participant_if_old(id)
354 }
355
356 fn remove_participant_if_old(&mut self, id: &D::NodeId) -> bool {
367 let last_epoch = if let Some(epoch) = self.last_epochs.get(id) {
368 *epoch
369 } else {
370 return false;
371 };
372 if last_epoch >= self.algo.epoch() {
373 return false;
374 }
375 if id == self.our_id() {
376 self.is_removed = true;
377 } else {
378 if let Some(peer_epoch) = self.peer_epochs.get(id) {
379 if last_epoch >= *peer_epoch {
380 return false;
381 }
382 }
383 self.peer_epochs.remove(id);
384 self.outgoing_queue.remove(id);
385 }
386 self.last_epochs.remove(id);
387 true
388 }
389
390 pub fn algo(&self) -> &D {
392 &self.algo
393 }
394
395 pub fn algo_mut(&mut self) -> &mut D {
397 &mut self.algo
398 }
399}
400
401pub struct SenderQueueBuilder<D>
404where
405 D: SenderQueueableConsensusProtocol,
406{
407 algo: D,
408 peer_epochs: BTreeMap<D::NodeId, D::Epoch>,
409}
410
411impl<D> SenderQueueBuilder<D>
412where
413 D: SenderQueueableConsensusProtocol + Debug,
414 D::Message: Clone + SenderQueueableMessage<Epoch = D::Epoch>,
415 D::NodeId: NodeIdT,
416 D::Output: SenderQueueableOutput<D::NodeId, D::Epoch>,
417{
418 pub fn new<I>(algo: D, peer_ids: I) -> Self
420 where
421 I: Iterator<Item = D::NodeId>,
422 {
423 SenderQueueBuilder {
424 algo,
425 peer_epochs: peer_ids.map(|id| (id, D::Epoch::default())).collect(),
426 }
427 }
428
429 pub fn peer_epochs(mut self, peer_epochs: BTreeMap<D::NodeId, D::Epoch>) -> Self {
431 self.peer_epochs = peer_epochs;
432 self
433 }
434
435 pub fn build(self, our_id: D::NodeId) -> (SenderQueue<D>, CpStep<SenderQueue<D>>) {
437 let epoch = self.algo.epoch();
438 let sq = SenderQueue {
439 algo: self.algo,
440 our_id,
441 outgoing_queue: BTreeMap::new(),
442 peer_epochs: self.peer_epochs,
443 last_epochs: BTreeMap::new(),
444 participants_after_change: BTreeSet::new(),
445 is_removed: false,
446 };
447 let step = Target::all().message(Message::EpochStarted(epoch)).into();
448 (sq, step)
449 }
450}