// fedimint_hbbft/sender_queue/mod.rs

//! # Sender queue
//!
//! A sender queue allows a `ConsensusProtocol` that outputs `Epoched` messages to buffer those outgoing
//! messages based on their epochs. A message is sent to its recipient only when the recipient's
//! epoch matches the epoch of the message. Thus no queueing is required for incoming messages since
//! any incoming messages with non-matching epochs can be safely discarded.
7
mod dynamic_honey_badger;
mod error;
mod honey_badger;
mod message;
mod queueing_honey_badger;

use rand::Rng;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Debug;

use log::debug;

use crate::traits::EpochT;
use crate::{ConsensusProtocol, CpStep, Epoched, NodeIdT, Target};

pub use self::error::Error;
pub use self::message::Message;
25
26/// A message type that is suitable for use with a sender queue.
27pub trait SenderQueueableMessage {
28    /// The epoch type of the wrapped algorithm.
29    type Epoch: EpochT;
30
31    /// Whether the message needs to be deferred.
32    fn is_premature(&self, them: Self::Epoch, max_future_epochs: u64) -> bool;
33
34    /// Whether the epoch of the message is behind `them`.
35    fn is_obsolete(&self, them: Self::Epoch) -> bool;
36
37    /// Whether the message is neither obsolete nor premature.
38    fn is_accepted(&self, them: Self::Epoch, max_future_epochs: u64) -> bool {
39        !self.is_premature(them, max_future_epochs) && !self.is_obsolete(them)
40    }
41
42    /// Returns the earliest epoch in which this message can be handled.
43    fn first_epoch(&self) -> Self::Epoch;
44}
45
46/// An output type compatible with the sender queue.
47pub trait SenderQueueableOutput<N, E>
48where
49    N: NodeIdT,
50{
51    /// Returns the set of participants in the next epoch. New participants should be added to the
52    /// set of peers for tracking their epochs. Old participants - ones that appear only among
53    /// current participants - should be scheduled for removal from the set of peers in an orderly
54    /// manner making sure that all messages those participants are entitled to are delivered to
55    /// them.
56    ///
57    /// The common case of no change in the set of participants is denoted by `None`.
58    fn participant_change(&self) -> Option<BTreeSet<N>>;
59
60    /// The epoch in which the output was produced.
61    fn output_epoch(&self) -> E;
62}
63
64/// A `ConsensusProtocol` that can be wrapped by a sender queue.
65pub trait SenderQueueableConsensusProtocol: Epoched + ConsensusProtocol {
66    /// The maximum number of subsequent future epochs that the `ConsensusProtocol` is allowed to handle
67    /// messages for.
68    fn max_future_epochs(&self) -> u64;
69}
70
71/// A map with outgoing messages, per epoch and per target node.
72pub type OutgoingQueue<D> = BTreeMap<
73    <D as ConsensusProtocol>::NodeId,
74    BTreeMap<<D as Epoched>::Epoch, Vec<<D as ConsensusProtocol>::Message>>,
75>;
76
77/// An instance of `ConsensusProtocol` wrapped with a queue of outgoing messages, that is, a sender
78/// queue. This wrapping ensures that the messages sent to remote instances lead to progress of the
79/// entire consensus network. In particular, messages to lagging remote nodes are queued and sent
80/// only when those nodes' epochs match the queued messages' epochs. Thus all nodes can handle
81/// incoming messages without queueing them and can ignore messages whose epochs are not currently
82/// acccepted.
83#[derive(Debug)]
84pub struct SenderQueue<D>
85where
86    D: SenderQueueableConsensusProtocol,
87{
88    /// The managed `ConsensusProtocol` instance.
89    algo: D,
90    /// Our node ID.
91    our_id: D::NodeId,
92    /// Messages that couldn't be handled yet by remote nodes.
93    outgoing_queue: OutgoingQueue<D>,
94    /// The set of all remote nodes on the network including validator as well as non-validator
95    /// (observer) nodes together with their epochs as of the last communication.
96    peer_epochs: BTreeMap<D::NodeId, D::Epoch>,
97    /// The set of previously participating nodes now removed from the network. Each node is marked
98    /// with an epoch after which it left. The node is a member of this set from the epoch when it
99    /// was voted to be removed and until all messages have been delivered to it for all epochs in
100    /// which it was still a participant.
101    last_epochs: BTreeMap<D::NodeId, D::Epoch>,
102    /// Participants of the managed algorithm after the latest change of the participant set. If the
103    /// set of participants never changes, this set remains empty and unused. If the algorithm
104    /// initiates a ballot to change the validators, the sender queue has to remember the new set of
105    /// participants (validators both current and proposed) in order to roll the ballot back if it
106    /// fails to progress.
107    participants_after_change: BTreeSet<D::NodeId>,
108    /// A flag that gets set when this node is removed from the set of participants. When it is set,
109    /// the node broadcasts the last `EpochStarted` and then no more messages. The flag can be reset
110    /// to `false` only by restarting the node. In case of `DynamicHoneyBadger` or
111    /// `QueueingHoneyBadger`, it can be restarted on receipt of a join plan where this node is a
112    /// validator.
113    is_removed: bool,
114}
115
116/// A `SenderQueue` step. The output corresponds to the wrapped algorithm.
117pub type Step<D> = crate::CpStep<SenderQueue<D>>;
118
119impl<D> ConsensusProtocol for SenderQueue<D>
120where
121    D: SenderQueueableConsensusProtocol + Debug,
122    D::Message: Clone + SenderQueueableMessage<Epoch = D::Epoch>,
123    D::NodeId: NodeIdT,
124    D::Output: SenderQueueableOutput<D::NodeId, D::Epoch>,
125{
126    type NodeId = D::NodeId;
127    type Input = D::Input;
128    type Output = D::Output;
129    type Message = Message<D::Message>;
130    type Error = Error<D::Error>;
131    type FaultKind = D::FaultKind;
132
133    fn handle_input<R: Rng>(
134        &mut self,
135        input: Self::Input,
136        rng: &mut R,
137    ) -> Result<CpStep<Self>, Error<D::Error>> {
138        self.handle_input(input, rng)
139    }
140
141    fn handle_message<R: Rng>(
142        &mut self,
143        sender_id: &D::NodeId,
144        message: Self::Message,
145        rng: &mut R,
146    ) -> Result<CpStep<Self>, Error<D::Error>> {
147        self.handle_message(sender_id, message, rng)
148    }
149
150    fn terminated(&self) -> bool {
151        self.is_removed
152    }
153
154    fn our_id(&self) -> &D::NodeId {
155        &self.our_id
156    }
157}
158
159impl<D> SenderQueue<D>
160where
161    D: SenderQueueableConsensusProtocol + Debug,
162    D::Message: Clone + SenderQueueableMessage<Epoch = D::Epoch>,
163    D::NodeId: NodeIdT,
164    D::Output: SenderQueueableOutput<D::NodeId, D::Epoch>,
165{
166    /// Returns a new `SenderQueueBuilder` configured to manage a given `DynamicHoneyBadger`
167    /// instance.
168    pub fn builder<I>(algo: D, peer_ids: I) -> SenderQueueBuilder<D>
169    where
170        I: Iterator<Item = D::NodeId>,
171    {
172        SenderQueueBuilder::new(algo, peer_ids)
173    }
174
175    /// Handles an input. This will call the wrapped algorithm's `handle_input`.
176    pub fn handle_input<R: Rng>(
177        &mut self,
178        input: D::Input,
179        rng: &mut R,
180    ) -> Result<CpStep<Self>, Error<D::Error>> {
181        if self.is_removed {
182            return Ok(Step::<D>::default());
183        }
184        self.apply(|algo| algo.handle_input(input, rng))
185    }
186
187    /// Handles a message received from `sender_id`.
188    ///
189    /// This must be called with every message we receive from another node.
190    pub fn handle_message<R: Rng>(
191        &mut self,
192        sender_id: &D::NodeId,
193        message: Message<D::Message>,
194        rng: &mut R,
195    ) -> Result<CpStep<Self>, Error<D::Error>> {
196        if self.is_removed {
197            return Ok(Step::<D>::default());
198        }
199        match message {
200            Message::EpochStarted(epoch) => Ok(self.handle_epoch_started(sender_id, epoch)),
201            Message::Algo(msg) => self.handle_message_content(sender_id, msg, rng),
202        }
203    }
204
205    /// Returns an immutable reference to the wrapped algorithm.
206    pub fn inner(&self) -> &D {
207        &self.algo
208    }
209
210    /// Returns `true` iff the node has been removed from the list of participants.
211    pub fn is_removed(&self) -> bool {
212        self.is_removed
213    }
214
215    /// Applies `f` to the wrapped algorithm and converts the step in the result to a sender queue
216    /// step, deferring or dropping messages, where necessary.
217    fn apply<F>(&mut self, f: F) -> Result<CpStep<Self>, Error<D::Error>>
218    where
219        F: FnOnce(&mut D) -> Result<CpStep<D>, D::Error>,
220    {
221        let mut step = f(&mut self.algo).map_err(Error::Apply)?;
222        let mut sender_queue_step = self.update_epoch(&step);
223        self.defer_messages(&mut step);
224        sender_queue_step.extend(step.map(|output| output, |fault| fault, Message::from));
225        Ok(sender_queue_step)
226    }
227
228    /// Handles an epoch start announcement.
229    fn handle_epoch_started(&mut self, sender_id: &D::NodeId, epoch: D::Epoch) -> CpStep<Self> {
230        self.peer_epochs
231            .entry(sender_id.clone())
232            .and_modify(|e| {
233                if *e < epoch {
234                    *e = epoch;
235                }
236            })
237            .or_insert(epoch);
238        if !self.remove_participant_if_old(sender_id) {
239            self.process_new_epoch(sender_id, epoch)
240        } else {
241            Step::<D>::default()
242        }
243    }
244
245    /// Processes an announcement of a new epoch update received from a remote node.
246    #[allow(clippy::needless_collect)]
247    fn process_new_epoch(&mut self, sender_id: &D::NodeId, epoch: D::Epoch) -> CpStep<Self> {
248        let queue = match self.outgoing_queue.get_mut(sender_id) {
249            None => return CpStep::<Self>::default(),
250            Some(queue) => queue,
251        };
252        let earlier_keys: Vec<_> = queue
253            .keys()
254            .cloned()
255            .take_while(|this_epoch| *this_epoch <= epoch)
256            .collect();
257        earlier_keys
258            .into_iter()
259            .filter_map(|key| queue.remove(&key))
260            .flatten()
261            .filter(|msg| !msg.is_obsolete(epoch))
262            .map(|msg| Target::node(sender_id.clone()).message(Message::Algo(msg)))
263            .into()
264    }
265
266    /// Handles a Honey Badger algorithm message in a given epoch.
267    fn handle_message_content<R: Rng>(
268        &mut self,
269        sender_id: &D::NodeId,
270        content: D::Message,
271        rng: &mut R,
272    ) -> Result<CpStep<Self>, Error<D::Error>> {
273        self.apply(|algo| algo.handle_message(sender_id, content, rng))
274    }
275
276    /// Updates the current Honey Badger epoch.
277    fn update_epoch(&mut self, step: &CpStep<D>) -> CpStep<Self> {
278        if step.output.is_empty() {
279            return Step::<D>::default();
280        }
281        // If this node removes itself after this epoch, it should send an `EpochStarted` with the
282        // next epoch and then go offline.
283        let mut send_last_epoch_started = false;
284        // Look up `DynamicHoneyBadger` epoch updates and collect any added peers.
285        for batch in &step.output {
286            if let Some(next_participants) = batch.participant_change() {
287                // Insert candidates.
288                for id in &next_participants {
289                    if id != self.our_id() {
290                        self.peer_epochs.entry(id.clone()).or_default();
291                        if let Some(&last) = self.last_epochs.get(id) {
292                            if last < batch.output_epoch() {
293                                self.last_epochs.remove(id);
294                            }
295                        }
296                    }
297                }
298                debug!(
299                    "Participants after the last change: {:?}",
300                    self.participants_after_change
301                );
302                debug!("Next participants: {:?}", next_participants);
303                // Remove obsolete participants.
304                for id in self
305                    .participants_after_change
306                    .clone()
307                    .difference(&next_participants)
308                {
309                    // Begin the participant removal process.
310                    self.remove_participant_after(id, &batch.output_epoch());
311                }
312                if self.participants_after_change.contains(&self.our_id)
313                    && !next_participants.contains(&self.our_id)
314                {
315                    send_last_epoch_started = true;
316                }
317                self.participants_after_change = next_participants;
318            }
319        }
320        if !self.is_removed || send_last_epoch_started {
321            // Announce the new epoch.
322            let msg = Message::EpochStarted(self.algo.epoch());
323            Target::all().message(msg).into()
324        } else {
325            // If removed, do not announce the new epoch to prevent peers from sending messages to
326            // this node.
327            Step::<D>::default()
328        }
329    }
330
331    /// Removes any messages to nodes at earlier epochs from the given `Step`. This may involve
332    /// decomposing a `Target::all()` message into `Target::Nodes` messages and sending some of the
333    /// resulting messages while placing onto the queue those remaining messages whose recipient is
334    /// currently at an earlier epoch.
335    fn defer_messages(&mut self, step: &mut CpStep<D>) {
336        let max_future_epochs = self.algo.max_future_epochs();
337        // Append the deferred messages onto the queues.
338        for (id, message) in step.defer_messages(&self.peer_epochs, max_future_epochs) {
339            self.outgoing_queue
340                .entry(id)
341                .or_default()
342                .entry(message.first_epoch())
343                .or_default()
344                .push(message);
345        }
346    }
347
348    /// Removes a given old participant after a specified epoch if that participant has become
349    /// superseded by a new set of participants of which it is not a member. Returns `true` if the
350    /// participant has been removed and `false` otherwise.
351    fn remove_participant_after(&mut self, id: &D::NodeId, last_epoch: &D::Epoch) -> bool {
352        self.last_epochs.insert(id.clone(), *last_epoch);
353        self.remove_participant_if_old(id)
354    }
355
356    /// Removes a given old participant if it has been scheduled for removal as a result of being
357    /// superseded by a new set of participants of which it is not a member. The participant is
358    /// removed if
359    ///
360    /// 1. its epoch is newer than its last epoch, or
361    ///
362    /// 2. the epoch of the managed algorithm instance is newer than the last epoch and the sender
363    /// queue has sent all messages for all epochs up to the last epoch to the participant.
364    ///
365    /// Returns `true` if the participant has been removed and `false` otherwise.
366    fn remove_participant_if_old(&mut self, id: &D::NodeId) -> bool {
367        let last_epoch = if let Some(epoch) = self.last_epochs.get(id) {
368            *epoch
369        } else {
370            return false;
371        };
372        if last_epoch >= self.algo.epoch() {
373            return false;
374        }
375        if id == self.our_id() {
376            self.is_removed = true;
377        } else {
378            if let Some(peer_epoch) = self.peer_epochs.get(id) {
379                if last_epoch >= *peer_epoch {
380                    return false;
381                }
382            }
383            self.peer_epochs.remove(id);
384            self.outgoing_queue.remove(id);
385        }
386        self.last_epochs.remove(id);
387        true
388    }
389
390    /// Returns a reference to the managed algorithm.
391    pub fn algo(&self) -> &D {
392        &self.algo
393    }
394
395    /// Returns a mutable reference to the managed algorithm.
396    pub fn algo_mut(&mut self) -> &mut D {
397        &mut self.algo
398    }
399}
400
401/// A builder of a Honey Badger with a sender queue. It configures the parameters and creates a new
402/// instance of `SenderQueue`.
403pub struct SenderQueueBuilder<D>
404where
405    D: SenderQueueableConsensusProtocol,
406{
407    algo: D,
408    peer_epochs: BTreeMap<D::NodeId, D::Epoch>,
409}
410
411impl<D> SenderQueueBuilder<D>
412where
413    D: SenderQueueableConsensusProtocol + Debug,
414    D::Message: Clone + SenderQueueableMessage<Epoch = D::Epoch>,
415    D::NodeId: NodeIdT,
416    D::Output: SenderQueueableOutput<D::NodeId, D::Epoch>,
417{
418    /// Creates a new builder, with an empty outgoing queue and the specified known peers.
419    pub fn new<I>(algo: D, peer_ids: I) -> Self
420    where
421        I: Iterator<Item = D::NodeId>,
422    {
423        SenderQueueBuilder {
424            algo,
425            peer_epochs: peer_ids.map(|id| (id, D::Epoch::default())).collect(),
426        }
427    }
428
429    /// Sets the peer epochs.
430    pub fn peer_epochs(mut self, peer_epochs: BTreeMap<D::NodeId, D::Epoch>) -> Self {
431        self.peer_epochs = peer_epochs;
432        self
433    }
434
435    /// Creates a new sender queue and returns the `Step` with the initial message.
436    pub fn build(self, our_id: D::NodeId) -> (SenderQueue<D>, CpStep<SenderQueue<D>>) {
437        let epoch = self.algo.epoch();
438        let sq = SenderQueue {
439            algo: self.algo,
440            our_id,
441            outgoing_queue: BTreeMap::new(),
442            peer_epochs: self.peer_epochs,
443            last_epochs: BTreeMap::new(),
444            participants_after_change: BTreeSet::new(),
445            is_removed: false,
446        };
447        let step = Target::all().message(Message::EpochStarted(epoch)).into();
448        (sq, step)
449    }
450}