1use std::collections::BTreeMap;
2use std::sync::Arc;
3use std::{fmt, result};
4
5use derivative::Derivative;
6use hex_fmt::HexFmt;
7use log::debug;
8use serde::Serialize;
9
10use super::proposal_state::{ProposalState, Step as ProposalStep};
11use super::{Error, FaultKind, Message, MessageContent, Result};
12use crate::{util, ConsensusProtocol, NetworkInfo, NodeIdT, SessionIdT};
13use rand::Rng;
14
15pub type Step<N> = crate::Step<Message<N>, SubsetOutput<N>, N, FaultKind>;
17
18#[derive(Derivative, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
20#[derivative(Debug)]
21pub enum SubsetOutput<N> {
22 Contribution(
24 N,
25 #[derivative(Debug(format_with = "util::fmt_hex"))] Vec<u8>,
26 ),
27 Done,
29}
30
31#[derive(Debug)]
33pub struct Subset<N, S> {
34 netinfo: Arc<NetworkInfo<N>>,
36 session_id: S,
38 proposal_states: BTreeMap<N, ProposalState<N, S>>,
40 decided: bool,
42}
43
44impl<N: NodeIdT, S: SessionIdT> ConsensusProtocol for Subset<N, S> {
45 type NodeId = N;
46 type Input = Vec<u8>;
47 type Output = SubsetOutput<N>;
48 type Message = Message<N>;
49 type Error = Error;
50 type FaultKind = FaultKind;
51
52 fn handle_input<R: Rng>(&mut self, input: Self::Input, _rng: &mut R) -> Result<Step<N>> {
53 self.propose(input)
54 }
55
56 fn handle_message<R: Rng>(
57 &mut self,
58 sender_id: &N,
59 message: Message<N>,
60 _rng: &mut R,
61 ) -> Result<Step<N>> {
62 self.handle_message(sender_id, message)
63 }
64
65 fn terminated(&self) -> bool {
66 self.decided
67 }
68
69 fn our_id(&self) -> &Self::NodeId {
70 self.netinfo.our_id()
71 }
72}
73
74impl<N: NodeIdT, S: SessionIdT> Subset<N, S> {
75 pub fn new(netinfo: Arc<NetworkInfo<N>>, session_id: S) -> Result<Self> {
80 let mut proposal_states = BTreeMap::new();
81 for (proposer_idx, proposer_id) in netinfo.all_ids().enumerate() {
82 let ba_id = BaSessionId {
83 subset_id: session_id.clone(),
84 proposer_idx: proposer_idx as u32,
85 };
86 proposal_states.insert(
87 proposer_id.clone(),
88 ProposalState::new(netinfo.clone(), ba_id, proposer_id.clone())?,
89 );
90 }
91
92 Ok(Subset {
93 netinfo,
94 session_id,
95 proposal_states,
96 decided: false,
97 })
98 }
99
100 pub fn propose(&mut self, value: Vec<u8>) -> Result<Step<N>> {
104 if !self.netinfo.is_validator() {
105 return Ok(Step::default());
106 }
107 debug!("{} proposing {:0.10}", self, HexFmt(&value));
108 let prop_step = self
109 .proposal_states
110 .get_mut(self.netinfo.our_id())
111 .ok_or(Error::UnknownProposer)?
112 .propose(value)?;
113 let step = Self::convert_step(self.netinfo.our_id(), prop_step);
114 Ok(step.join(self.try_output()?))
115 }
116
117 pub fn handle_message(&mut self, sender_id: &N, msg: Message<N>) -> Result<Step<N>> {
121 let prop_step = self
122 .proposal_states
123 .get_mut(&msg.proposer_id)
124 .ok_or(Error::UnknownProposer)?
125 .handle_message(sender_id, msg.content)?;
126 let step = Self::convert_step(&msg.proposer_id, prop_step);
127 Ok(step.join(self.try_output()?))
128 }
129
130 pub fn received_proposals(&self) -> usize {
132 let received = |state: &&ProposalState<N, S>| state.received();
133 self.proposal_states.values().filter(received).count()
134 }
135
136 fn convert_step(proposer_id: &N, prop_step: ProposalStep<N>) -> Step<N> {
137 let from_p_msg = |p_msg: MessageContent| p_msg.with(proposer_id.clone());
138 let mut step = Step::default();
139 if let Some(value) = step.extend_with(prop_step, |fault| fault, from_p_msg).pop() {
140 let contribution = SubsetOutput::Contribution(proposer_id.clone(), value);
141 step.output.push(contribution);
142 }
143 step
144 }
145
146 fn count_accepted(&self) -> usize {
148 let accepted = |state: &&ProposalState<N, S>| state.accepted();
149 self.proposal_states.values().filter(accepted).count()
150 }
151
152 fn try_output(&mut self) -> Result<Step<N>> {
155 if self.decided || self.count_accepted() < self.netinfo.num_correct() {
156 return Ok(Step::default());
157 }
158 let mut step = Step::default();
159 if self.count_accepted() == self.netinfo.num_correct() {
160 for (proposer_id, state) in &mut self.proposal_states {
161 step.extend(Self::convert_step(proposer_id, state.vote_false()?));
162 }
163 }
164 if self.proposal_states.values().all(ProposalState::complete) {
165 self.decided = true;
166 step.output.push(SubsetOutput::Done);
167 }
168 Ok(step)
169 }
170}
171
172impl<N: NodeIdT, S: SessionIdT> fmt::Display for Subset<N, S> {
173 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> result::Result<(), fmt::Error> {
174 write!(f, "{:?} Subset({})", self.our_id(), self.session_id)
175 }
176}
177
178#[derive(Clone, Debug, Serialize)]
182pub struct BaSessionId<S> {
183 subset_id: S,
184 proposer_idx: u32,
185}
186
187impl<S: fmt::Display> fmt::Display for BaSessionId<S> {
188 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> result::Result<(), fmt::Error> {
189 write!(
190 f,
191 "subset {}, proposer #{}",
192 self.subset_id, self.proposer_idx
193 )
194 }
195}