1use std::collections::BTreeMap;
4use std::error::Error;
5use std::fmt::{Debug, Display};
6use std::hash::Hash;
7use std::iter::once;
8
9use rand::Rng;
10use serde::{de::DeserializeOwned, Serialize};
11
12use crate::fault_log::{Fault, FaultLog};
13use crate::sender_queue::SenderQueueableMessage;
14use crate::{Target, TargetedMessage};
15
/// Marker trait bundling the bounds `Eq + Debug + Hash + Send + Sync`.
///
/// It declares no items of its own and is implemented automatically for every type that
/// satisfies those bounds, so it acts as a shorthand in generic signatures.
pub trait Contribution: Eq + Debug + Hash + Send + Sync {}

impl<T: Eq + Debug + Hash + Send + Sync> Contribution for T {}
19
/// Marker trait bundling the bounds `Eq + Ord + Clone + Debug + Hash + Send + Sync`.
///
/// It declares no items of its own; any type meeting the bounds implements it automatically.
/// The `Ord` bound allows such values to be used as keys of ordered collections.
pub trait NodeIdT: Eq + Ord + Clone + Debug + Hash + Send + Sync {}

impl<T: Eq + Ord + Clone + Debug + Hash + Send + Sync> NodeIdT for T {}
23
/// Marker trait bundling the bounds `Clone + Debug + Error + PartialEq`.
///
/// It declares no items of its own; any error type meeting the bounds implements it
/// automatically.
pub trait FaultT: Clone + Debug + Error + PartialEq {}

impl<T: Clone + Debug + Error + PartialEq> FaultT for T {}
27
/// Marker trait bundling the bounds `Debug + Send + Sync`.
///
/// It declares no items of its own; any type meeting the bounds implements it automatically.
pub trait Message: Debug + Send + Sync {}

impl<T: Debug + Send + Sync> Message for T {}
31
32pub trait SessionIdT: Display + Serialize + Send + Sync + Clone + Debug {}
34impl<S> SessionIdT for S where S: Display + Serialize + Send + Sync + Clone + Debug {}
35
36pub trait EpochT: Copy + Message + Default + Eq + Ord + Serialize + DeserializeOwned {}
38impl<E> EpochT for E where E: Copy + Message + Default + Eq + Ord + Serialize + DeserializeOwned {}
39
40#[must_use = "The algorithm step result must be used."]
63#[derive(Debug)]
64pub struct Step<M, O, N, F: Error> {
65 pub output: Vec<O>,
68 pub fault_log: FaultLog<N, F>,
71 pub messages: Vec<TargetedMessage<M, N>>,
74}
75
76impl<M, O, N, F> Default for Step<M, O, N, F>
77where
78 F: Error,
79{
80 fn default() -> Self {
81 Step {
82 output: Vec::default(),
83 fault_log: FaultLog::default(),
84 messages: Vec::default(),
85 }
86 }
87}
88
89impl<M, O, N, F> Step<M, O, N, F>
90where
91 F: Error,
92{
93 pub fn with_output<T: Into<Option<O>>>(mut self, output: T) -> Self {
95 self.output.extend(output.into());
96 self
97 }
98
99 pub fn map<M2, O2, F2, FO, FF, FM>(
102 self,
103 f_out: FO,
104 f_fail: FF,
105 mut f_msg: FM,
106 ) -> Step<M2, O2, N, F2>
107 where
108 F2: Error,
109 FO: FnMut(O) -> O2,
110 FF: FnMut(F) -> F2,
111 FM: FnMut(M) -> M2,
112 {
113 Step {
114 output: self.output.into_iter().map(f_out).collect(),
115 fault_log: self.fault_log.map(f_fail),
116 messages: self
117 .messages
118 .into_iter()
119 .map(|tm| tm.map(&mut f_msg))
120 .collect(),
121 }
122 }
123
124 #[must_use]
126 pub fn extend_with<M2, O2, F2, FF, FM>(
127 &mut self,
128 other: Step<M2, O2, N, F2>,
129 f_fail: FF,
130 mut f_msg: FM,
131 ) -> Vec<O2>
132 where
133 F2: Error,
134 FF: FnMut(F2) -> F,
135 FM: FnMut(M2) -> M,
136 {
137 let fails = other.fault_log.map(f_fail);
138 self.fault_log.extend(fails);
139 let msgs = other.messages.into_iter().map(|tm| tm.map(&mut f_msg));
140 self.messages.extend(msgs);
141 other.output
142 }
143
144 pub fn extend(&mut self, other: Self) {
146 self.output.extend(other.output);
147 self.fault_log.extend(other.fault_log);
148 self.messages.extend(other.messages);
149 }
150
151 pub fn join(mut self, other: Self) -> Self {
153 self.extend(other);
154 self
155 }
156
157 pub fn is_empty(&self) -> bool {
159 self.output.is_empty() && self.fault_log.is_empty() && self.messages.is_empty()
160 }
161}
162
163impl<M, O, N, F> From<FaultLog<N, F>> for Step<M, O, N, F>
164where
165 F: Error,
166{
167 fn from(fault_log: FaultLog<N, F>) -> Self {
168 Step {
169 fault_log,
170 ..Step::default()
171 }
172 }
173}
174
175impl<M, O, N, F> From<Fault<N, F>> for Step<M, O, N, F>
176where
177 F: Error,
178{
179 fn from(fault: Fault<N, F>) -> Self {
180 Step {
181 fault_log: fault.into(),
182 ..Step::default()
183 }
184 }
185}
186
187impl<M, O, N, F> From<TargetedMessage<M, N>> for Step<M, O, N, F>
188where
189 F: Error,
190{
191 fn from(msg: TargetedMessage<M, N>) -> Self {
192 Step {
193 messages: once(msg).collect(),
194 ..Step::default()
195 }
196 }
197}
198
199impl<I, M, O, N, F> From<I> for Step<M, O, N, F>
200where
201 I: IntoIterator<Item = TargetedMessage<M, N>>,
202 F: Error,
203{
204 fn from(msgs: I) -> Self {
205 Step {
206 messages: msgs.into_iter().collect(),
207 ..Step::default()
208 }
209 }
210}
211
/// A type whose values carry an epoch that can be queried with `epoch`.
pub trait Epoched {
    /// The epoch type; it must satisfy the `EpochT` bounds.
    type Epoch: EpochT;

    /// Returns the epoch of this value.
    fn epoch(&self) -> Self::Epoch;
}
222
/// Shorthand for the `Step` type belonging to a `ConsensusProtocol` implementation `D`.
///
/// The step's parameters are `D`'s `Message`, `Output`, `NodeId` and `FaultKind`
/// associated types, in that order.
pub type CpStep<D> = Step<
    <D as ConsensusProtocol>::Message,
    <D as ConsensusProtocol>::Output,
    <D as ConsensusProtocol>::NodeId,
    <D as ConsensusProtocol>::FaultKind,
>;
230
231impl<'i, M, O, N, F> Step<M, O, N, F>
232where
233 N: NodeIdT,
234 M: 'i + Clone + SenderQueueableMessage,
235 F: Error,
236{
237 pub fn defer_messages(
241 &mut self,
242 peer_epochs: &BTreeMap<N, M::Epoch>,
243 max_future_epochs: u64,
244 ) -> Vec<(N, M)> {
245 let mut deferred_msgs: Vec<(N, M)> = Vec::new();
246 let mut passed_msgs: Vec<_> = Vec::new();
247 for msg in self.messages.drain(..) {
248 match msg.target.clone() {
249 Target::Nodes(mut ids) => {
250 let is_premature = |&them| msg.message.is_premature(them, max_future_epochs);
251 let is_obsolete = |&them| msg.message.is_obsolete(them);
252 for (id, them) in peer_epochs {
253 if ids.contains(id) {
254 if is_premature(them) {
255 deferred_msgs.push((id.clone(), msg.message.clone()));
256 ids.remove(id);
257 } else if is_obsolete(them) {
258 ids.remove(id);
259 }
260 }
261 }
262 if !ids.is_empty() {
263 passed_msgs.push(Target::Nodes(ids).message(msg.message));
264 }
265 }
266 Target::AllExcept(mut exclude) => {
267 let is_premature = |&them| msg.message.is_premature(them, max_future_epochs);
268 let is_obsolete = |&them| msg.message.is_obsolete(them);
269 for (id, them) in peer_epochs {
270 if !exclude.contains(id) {
271 if is_premature(them) {
272 deferred_msgs.push((id.clone(), msg.message.clone()));
273 exclude.insert(id.clone());
274 } else if is_obsolete(them) {
275 exclude.insert(id.clone());
276 }
277 }
278 }
279 passed_msgs.push(Target::AllExcept(exclude).message(msg.message));
280 }
281 }
282 }
283 self.messages.extend(passed_msgs);
284 deferred_msgs
285 }
286}
287
288pub trait ConsensusProtocol: Send + Sync {
293 type NodeId: NodeIdT;
295 type Input;
297 type Output;
300 type Message: Message;
302 type Error: Error;
304 type FaultKind: FaultT;
306
307 fn handle_input<R: Rng>(
309 &mut self,
310 input: Self::Input,
311 rng: &mut R,
312 ) -> Result<CpStep<Self>, Self::Error>
313 where
314 Self: Sized;
315
316 fn handle_message<R: Rng>(
318 &mut self,
319 sender_id: &Self::NodeId,
320 message: Self::Message,
321 rng: &mut R,
322 ) -> Result<CpStep<Self>, Self::Error>
323 where
324 Self: Sized;
325
326 fn terminated(&self) -> bool;
328
329 fn our_id(&self) -> &Self::NodeId;
331}