fedimint_hbbft/
traits.rs

1//! Common supertraits for consensus protocols.
2
3use std::collections::BTreeMap;
4use std::error::Error;
5use std::fmt::{Debug, Display};
6use std::hash::Hash;
7use std::iter::once;
8
9use rand::Rng;
10use serde::{de::DeserializeOwned, Serialize};
11
12use crate::fault_log::{Fault, FaultLog};
13use crate::sender_queue::SenderQueueableMessage;
14use crate::{Target, TargetedMessage};
15
/// User data handled by the protocols: a transaction, a user message, or anything similar.
///
/// This is a marker trait; the blanket impl below covers every type that meets the bounds.
pub trait Contribution: Eq + Debug + Hash + Send + Sync {}
impl<T: Eq + Debug + Hash + Send + Sync> Contribution for T {}
19
/// The unique identifier of a peer node.
///
/// This is a marker trait; the blanket impl below covers every type that meets the bounds.
pub trait NodeIdT: Eq + Ord + Clone + Debug + Hash + Send + Sync {}
impl<T: Eq + Ord + Clone + Debug + Hash + Send + Sync> NodeIdT for T {}
23
/// A kind of fault detected by a consensus protocol.
///
/// This is a marker trait; the blanket impl below covers every type that meets the bounds.
pub trait FaultT: Clone + Debug + Error + PartialEq {}
impl<F: Clone + Debug + Error + PartialEq> FaultT for F {}
27
/// A protocol message.
///
/// This is a marker trait; the blanket impl below covers every type that meets the bounds.
pub trait Message: Debug + Send + Sync {}
impl<T: Debug + Send + Sync> Message for T {}
31
32/// Session identifiers.
33pub trait SessionIdT: Display + Serialize + Send + Sync + Clone + Debug {}
34impl<S> SessionIdT for S where S: Display + Serialize + Send + Sync + Clone + Debug {}
35
36/// Epochs.
37pub trait EpochT: Copy + Message + Default + Eq + Ord + Serialize + DeserializeOwned {}
38impl<E> EpochT for E where E: Copy + Message + Default + Eq + Ord + Serialize + DeserializeOwned {}
39
40/// Single algorithm step outcome.
41///
42/// Each time input (typically in the form of user input or incoming network messages) is provided
43/// to an instance of an algorithm, a `Step` is produced, potentially containing output values,
44/// a fault log, and network messages.
45///
46/// Any `Step` **must always be used** by the client application; at the very least the resulting
47/// messages must be queued.
48///
49/// ## Handling unused Steps
50///
51/// In the (rare) case of a `Step` not being of any interest at all, instead of discarding it
52/// through `let _ = ...` or similar constructs, the implicit assumption should explicitly be
53/// checked instead:
54///
55/// ```ignore
56/// assert!(alg.propose(123).expect("Could not propose value").is_empty(),
57///         "Algorithm will never output anything on first proposal");
58/// ```
59///
60/// If an edge case occurs and outgoing messages are generated as a result, the `assert!` will
61/// catch it, instead of potentially stalling the algorithm.
62#[must_use = "The algorithm step result must be used."]
63#[derive(Debug)]
64pub struct Step<M, O, N, F: Error> {
65    /// The algorithm's output, after consensus has been reached. This is guaranteed to be the same
66    /// in all nodes.
67    pub output: Vec<O>,
68    /// A list of nodes that are not following consensus, together with information about the
69    /// detected misbehavior.
70    pub fault_log: FaultLog<N, F>,
71    /// A list of messages that must be sent to other nodes. Each entry contains a message and a
72    /// `Target`.
73    pub messages: Vec<TargetedMessage<M, N>>,
74}
75
76impl<M, O, N, F> Default for Step<M, O, N, F>
77where
78    F: Error,
79{
80    fn default() -> Self {
81        Step {
82            output: Vec::default(),
83            fault_log: FaultLog::default(),
84            messages: Vec::default(),
85        }
86    }
87}
88
89impl<M, O, N, F> Step<M, O, N, F>
90where
91    F: Error,
92{
93    /// Returns the same step, with the given additional output.
94    pub fn with_output<T: Into<Option<O>>>(mut self, output: T) -> Self {
95        self.output.extend(output.into());
96        self
97    }
98
99    /// Converts `self` into a step of another type, given conversion methods for output, faults,
100    /// and messages.
101    pub fn map<M2, O2, F2, FO, FF, FM>(
102        self,
103        f_out: FO,
104        f_fail: FF,
105        mut f_msg: FM,
106    ) -> Step<M2, O2, N, F2>
107    where
108        F2: Error,
109        FO: FnMut(O) -> O2,
110        FF: FnMut(F) -> F2,
111        FM: FnMut(M) -> M2,
112    {
113        Step {
114            output: self.output.into_iter().map(f_out).collect(),
115            fault_log: self.fault_log.map(f_fail),
116            messages: self
117                .messages
118                .into_iter()
119                .map(|tm| tm.map(&mut f_msg))
120                .collect(),
121        }
122    }
123
124    /// Extends `self` with `other`s messages and fault logs, and returns `other.output`.
125    #[must_use]
126    pub fn extend_with<M2, O2, F2, FF, FM>(
127        &mut self,
128        other: Step<M2, O2, N, F2>,
129        f_fail: FF,
130        mut f_msg: FM,
131    ) -> Vec<O2>
132    where
133        F2: Error,
134        FF: FnMut(F2) -> F,
135        FM: FnMut(M2) -> M,
136    {
137        let fails = other.fault_log.map(f_fail);
138        self.fault_log.extend(fails);
139        let msgs = other.messages.into_iter().map(|tm| tm.map(&mut f_msg));
140        self.messages.extend(msgs);
141        other.output
142    }
143
144    /// Adds the outputs, fault logs and messages of `other` to `self`.
145    pub fn extend(&mut self, other: Self) {
146        self.output.extend(other.output);
147        self.fault_log.extend(other.fault_log);
148        self.messages.extend(other.messages);
149    }
150
151    /// Extends this step with `other` and returns the result.
152    pub fn join(mut self, other: Self) -> Self {
153        self.extend(other);
154        self
155    }
156
157    /// Returns `true` if there are no messages, faults or outputs.
158    pub fn is_empty(&self) -> bool {
159        self.output.is_empty() && self.fault_log.is_empty() && self.messages.is_empty()
160    }
161}
162
163impl<M, O, N, F> From<FaultLog<N, F>> for Step<M, O, N, F>
164where
165    F: Error,
166{
167    fn from(fault_log: FaultLog<N, F>) -> Self {
168        Step {
169            fault_log,
170            ..Step::default()
171        }
172    }
173}
174
175impl<M, O, N, F> From<Fault<N, F>> for Step<M, O, N, F>
176where
177    F: Error,
178{
179    fn from(fault: Fault<N, F>) -> Self {
180        Step {
181            fault_log: fault.into(),
182            ..Step::default()
183        }
184    }
185}
186
187impl<M, O, N, F> From<TargetedMessage<M, N>> for Step<M, O, N, F>
188where
189    F: Error,
190{
191    fn from(msg: TargetedMessage<M, N>) -> Self {
192        Step {
193            messages: once(msg).collect(),
194            ..Step::default()
195        }
196    }
197}
198
199impl<I, M, O, N, F> From<I> for Step<M, O, N, F>
200where
201    I: IntoIterator<Item = TargetedMessage<M, N>>,
202    F: Error,
203{
204    fn from(msgs: I) -> Self {
205        Step {
206            messages: msgs.into_iter().collect(),
207            ..Step::default()
208        }
209    }
210}
211
212/// An interface to objects with epoch numbers. Different algorithms may have different internal
213/// notion of _epoch_. This interface summarizes the properties that are essential for the message
214/// sender queue.
215pub trait Epoched {
216    /// Type of epoch.
217    type Epoch: EpochT;
218
219    /// Returns the object's epoch number.
220    fn epoch(&self) -> Self::Epoch;
221}
222
223/// An alias for the type of `Step` returned by `D`'s methods.
224pub type CpStep<D> = Step<
225    <D as ConsensusProtocol>::Message,
226    <D as ConsensusProtocol>::Output,
227    <D as ConsensusProtocol>::NodeId,
228    <D as ConsensusProtocol>::FaultKind,
229>;
230
231impl<'i, M, O, N, F> Step<M, O, N, F>
232where
233    N: NodeIdT,
234    M: 'i + Clone + SenderQueueableMessage,
235    F: Error,
236{
237    /// Removes and returns any messages that are not yet accepted by remote nodes according to the
238    /// mapping `remote_epochs`. This way the returned messages are postponed until later, and the
239    /// remaining messages can be sent to remote nodes without delay.
240    pub fn defer_messages(
241        &mut self,
242        peer_epochs: &BTreeMap<N, M::Epoch>,
243        max_future_epochs: u64,
244    ) -> Vec<(N, M)> {
245        let mut deferred_msgs: Vec<(N, M)> = Vec::new();
246        let mut passed_msgs: Vec<_> = Vec::new();
247        for msg in self.messages.drain(..) {
248            match msg.target.clone() {
249                Target::Nodes(mut ids) => {
250                    let is_premature = |&them| msg.message.is_premature(them, max_future_epochs);
251                    let is_obsolete = |&them| msg.message.is_obsolete(them);
252                    for (id, them) in peer_epochs {
253                        if ids.contains(id) {
254                            if is_premature(them) {
255                                deferred_msgs.push((id.clone(), msg.message.clone()));
256                                ids.remove(id);
257                            } else if is_obsolete(them) {
258                                ids.remove(id);
259                            }
260                        }
261                    }
262                    if !ids.is_empty() {
263                        passed_msgs.push(Target::Nodes(ids).message(msg.message));
264                    }
265                }
266                Target::AllExcept(mut exclude) => {
267                    let is_premature = |&them| msg.message.is_premature(them, max_future_epochs);
268                    let is_obsolete = |&them| msg.message.is_obsolete(them);
269                    for (id, them) in peer_epochs {
270                        if !exclude.contains(id) {
271                            if is_premature(them) {
272                                deferred_msgs.push((id.clone(), msg.message.clone()));
273                                exclude.insert(id.clone());
274                            } else if is_obsolete(them) {
275                                exclude.insert(id.clone());
276                            }
277                        }
278                    }
279                    passed_msgs.push(Target::AllExcept(exclude).message(msg.message));
280                }
281            }
282        }
283        self.messages.extend(passed_msgs);
284        deferred_msgs
285    }
286}
287
288/// A consensus protocol that defines a message flow.
289///
290/// Many algorithms require an RNG which must be supplied on each call. It is up to the caller to
291/// ensure that this random number generator is cryptographically secure.
292pub trait ConsensusProtocol: Send + Sync {
293    /// Unique node identifier.
294    type NodeId: NodeIdT;
295    /// The input provided by the user.
296    type Input;
297    /// The output type. Some algorithms return an output exactly once, others return multiple
298    /// times.
299    type Output;
300    /// The messages that need to be exchanged between the instances in the participating nodes.
301    type Message: Message;
302    /// The errors that can occur during execution.
303    type Error: Error;
304    /// The kinds of message faults that can be detected during execution.
305    type FaultKind: FaultT;
306
307    /// Handles an input provided by the user, and returns
308    fn handle_input<R: Rng>(
309        &mut self,
310        input: Self::Input,
311        rng: &mut R,
312    ) -> Result<CpStep<Self>, Self::Error>
313    where
314        Self: Sized;
315
316    /// Handles a message received from node `sender_id`.
317    fn handle_message<R: Rng>(
318        &mut self,
319        sender_id: &Self::NodeId,
320        message: Self::Message,
321        rng: &mut R,
322    ) -> Result<CpStep<Self>, Self::Error>
323    where
324        Self: Sized;
325
326    /// Returns `true` if execution has completed and this instance can be dropped.
327    fn terminated(&self) -> bool;
328
329    /// Returns this node's own ID.
330    fn our_id(&self) -> &Self::NodeId;
331}