hbbft/subset/
subset.rs

1use std::collections::BTreeMap;
2use std::sync::Arc;
3use std::{fmt, result};
4
5use derivative::Derivative;
6use hex_fmt::HexFmt;
7use log::debug;
8use serde::Serialize;
9
10use super::proposal_state::{ProposalState, Step as ProposalStep};
11use super::{Error, FaultKind, Message, MessageContent, Result};
12use crate::{util, ConsensusProtocol, NetworkInfo, NodeIdT, SessionIdT};
13use rand::Rng;
14
/// A `Subset` step, possibly containing several outputs.
///
/// This is the crate-wide `Step` type specialized for this algorithm: it carries
/// `Message<N>` messages, `SubsetOutput<N>` outputs, node IDs of type `N`, and faults of
/// kind `FaultKind`.
pub type Step<N> = crate::Step<Message<N>, SubsetOutput<N>, N, FaultKind>;
17
/// An output with an accepted contribution or the end of the set.
///
/// `Subset` emits one `Contribution` per value handed up by a proposal state, and a single
/// `Done` once every proposal state reports that it is complete.
#[derive(Derivative, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[derivative(Debug)]
pub enum SubsetOutput<N> {
    /// A contribution was accepted into the set.
    ///
    /// The first field is the ID of the proposer; the second is the contributed value as raw
    /// bytes, which `Debug` renders through `util::fmt_hex`.
    Contribution(
        N,
        #[derivative(Debug(format_with = "util::fmt_hex"))] Vec<u8>,
    ),
    /// The set is complete.
    Done,
}
30
/// Subset algorithm instance
///
/// Holds one `ProposalState` per node ID returned by `NetworkInfo::all_ids` and tracks whether
/// the overall set has been finalized.
#[derive(Debug)]
pub struct Subset<N, S> {
    /// Shared network information.
    netinfo: Arc<NetworkInfo<N>>,
    /// The session identifier.
    session_id: S,
    /// A map that assigns to each validator the progress of their contribution.
    proposal_states: BTreeMap<N, ProposalState<N, S>>,
    /// Whether the instance has decided on a value.
    ///
    /// Set to `true` at the moment `SubsetOutput::Done` is emitted; this is also what
    /// `terminated()` reports.
    decided: bool,
}
43
44impl<N: NodeIdT, S: SessionIdT> ConsensusProtocol for Subset<N, S> {
45    type NodeId = N;
46    type Input = Vec<u8>;
47    type Output = SubsetOutput<N>;
48    type Message = Message<N>;
49    type Error = Error;
50    type FaultKind = FaultKind;
51
52    fn handle_input<R: Rng>(&mut self, input: Self::Input, _rng: &mut R) -> Result<Step<N>> {
53        self.propose(input)
54    }
55
56    fn handle_message<R: Rng>(
57        &mut self,
58        sender_id: &N,
59        message: Message<N>,
60        _rng: &mut R,
61    ) -> Result<Step<N>> {
62        self.handle_message(sender_id, message)
63    }
64
65    fn terminated(&self) -> bool {
66        self.decided
67    }
68
69    fn our_id(&self) -> &Self::NodeId {
70        self.netinfo.our_id()
71    }
72}
73
74impl<N: NodeIdT, S: SessionIdT> Subset<N, S> {
75    /// Creates a new `Subset` instance with the given session identifier.
76    ///
77    /// If multiple `Subset`s are instantiated within a single network, they must use different
78    /// session identifiers to foil replay attacks.
79    pub fn new(netinfo: Arc<NetworkInfo<N>>, session_id: S) -> Result<Self> {
80        let mut proposal_states = BTreeMap::new();
81        for (proposer_idx, proposer_id) in netinfo.all_ids().enumerate() {
82            let ba_id = BaSessionId {
83                subset_id: session_id.clone(),
84                proposer_idx: proposer_idx as u32,
85            };
86            proposal_states.insert(
87                proposer_id.clone(),
88                ProposalState::new(netinfo.clone(), ba_id, proposer_id.clone())?,
89            );
90        }
91
92        Ok(Subset {
93            netinfo,
94            session_id,
95            proposal_states,
96            decided: false,
97        })
98    }
99
100    /// Proposes a value for the subset.
101    ///
102    /// Returns an error if we already made a proposal.
103    pub fn propose(&mut self, value: Vec<u8>) -> Result<Step<N>> {
104        if !self.netinfo.is_validator() {
105            return Ok(Step::default());
106        }
107        debug!("{} proposing {:0.10}", self, HexFmt(&value));
108        let prop_step = self
109            .proposal_states
110            .get_mut(self.netinfo.our_id())
111            .ok_or(Error::UnknownProposer)?
112            .propose(value)?;
113        let step = Self::convert_step(self.netinfo.our_id(), prop_step);
114        Ok(step.join(self.try_output()?))
115    }
116
117    /// Handles a message received from `sender_id`.
118    ///
119    /// This must be called with every message we receive from another node.
120    pub fn handle_message(&mut self, sender_id: &N, msg: Message<N>) -> Result<Step<N>> {
121        let prop_step = self
122            .proposal_states
123            .get_mut(&msg.proposer_id)
124            .ok_or(Error::UnknownProposer)?
125            .handle_message(sender_id, msg.content)?;
126        let step = Self::convert_step(&msg.proposer_id, prop_step);
127        Ok(step.join(self.try_output()?))
128    }
129
130    /// Returns the number of validators from which we have already received a proposal.
131    pub fn received_proposals(&self) -> usize {
132        let received = |state: &&ProposalState<N, S>| state.received();
133        self.proposal_states.values().filter(received).count()
134    }
135
136    fn convert_step(proposer_id: &N, prop_step: ProposalStep<N>) -> Step<N> {
137        let from_p_msg = |p_msg: MessageContent| p_msg.with(proposer_id.clone());
138        let mut step = Step::default();
139        if let Some(value) = step.extend_with(prop_step, |fault| fault, from_p_msg).pop() {
140            let contribution = SubsetOutput::Contribution(proposer_id.clone(), value);
141            step.output.push(contribution);
142        }
143        step
144    }
145
146    /// Returns the number of Binary Agreement instances that have decided "yes".
147    fn count_accepted(&self) -> usize {
148        let accepted = |state: &&ProposalState<N, S>| state.accepted();
149        self.proposal_states.values().filter(accepted).count()
150    }
151
152    /// Checks the voting and termination conditions: If enough proposals have been accepted, votes
153    /// "no" for the remaining ones. If all proposals have been decided, outputs `Done`.
154    fn try_output(&mut self) -> Result<Step<N>> {
155        if self.decided || self.count_accepted() < self.netinfo.num_correct() {
156            return Ok(Step::default());
157        }
158        let mut step = Step::default();
159        if self.count_accepted() == self.netinfo.num_correct() {
160            for (proposer_id, state) in &mut self.proposal_states {
161                step.extend(Self::convert_step(proposer_id, state.vote_false()?));
162            }
163        }
164        if self.proposal_states.values().all(ProposalState::complete) {
165            self.decided = true;
166            step.output.push(SubsetOutput::Done);
167        }
168        Ok(step)
169    }
170}
171
172impl<N: NodeIdT, S: SessionIdT> fmt::Display for Subset<N, S> {
173    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> result::Result<(), fmt::Error> {
174        write!(f, "{:?} Subset({})", self.our_id(), self.session_id)
175    }
176}
177
/// A session identifier for a `BinaryAgreement` instance run as a `Subset` sub-algorithm. It
/// consists of the `Subset` instance's own session ID, and the index of the proposer whose
/// contribution this `BinaryAgreement` is about.
#[derive(Clone, Debug, Serialize)]
pub struct BaSessionId<S> {
    /// Session ID of the enclosing `Subset` instance.
    subset_id: S,
    /// Position of the proposer in the iteration order of `NetworkInfo::all_ids`, as assigned
    /// in `Subset::new`.
    proposer_idx: u32,
}
186
187impl<S: fmt::Display> fmt::Display for BaSessionId<S> {
188    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> result::Result<(), fmt::Error> {
189        write!(
190            f,
191            "subset {}, proposer #{}",
192            self.subset_id, self.proposer_idx
193        )
194    }
195}