1use std::{collections::HashMap, fmt::Debug, marker::PhantomData, num::NonZeroU64, time::Duration};
2
3use abcperf::{
4 atomic_broadcast::{AtomicBroadcast, AtomicBroadcastChannels, AtomicBroadcastConfiguration},
5 MessageType,
6};
7use anyhow::{Error, Result};
8use minbft::{timeout::TimeoutType, Config as MinBftConfig, MinBft, PeerMessage, RequestPayload};
9use output_handler::OutputHandler;
10use serde::{Deserialize, Serialize};
11use shared_ids::{ClientId, ReplicaId, RequestId};
12use timeout_handler::TimeoutHandler;
13use tokio::{select, sync::mpsc, task::JoinHandle};
14use tracing::Instrument;
15use usig::Usig;
16
17mod output_handler;
18mod timeout_handler;
19
/// Adapter that plugs MinBFT into ABCperf's `AtomicBroadcast` interface.
///
/// * `P` - request payload type; it is the `Transaction` type of the trait impl.
/// * `U` - USIG implementation that is handed to MinBFT when the protocol starts.
pub struct ABCperfMinbft<P, U> {
    /// USIG instance, moved into `MinBft::new` by `start`.
    usig: U,
    /// Zero-sized marker: `P` does not appear in any other field.
    phantom_data: PhantomData<P>,
}
24
25#[derive(Debug, Deserialize, Serialize, Clone)]
26pub struct ConfigurationExtension {
27 batch_timeout: f64,
28 max_batch_size: Option<NonZeroU64>,
29 backoff_multiplier: f64,
30 initial_timeout_duration: f64,
31 checkpoint_period: NonZeroU64,
32}
33
34impl From<ConfigurationExtension> for HashMap<String, String> {
35 fn from(config: ConfigurationExtension) -> Self {
36 let ConfigurationExtension {
37 batch_timeout,
38 max_batch_size,
39 backoff_multiplier,
40 initial_timeout_duration,
41 checkpoint_period,
42 } = config;
43
44 let mut map = HashMap::new();
45
46 map.insert("batch_timeout".to_owned(), batch_timeout.to_string());
47 if let Some(max_batch_size) = max_batch_size {
48 map.insert("max_batch_size".to_owned(), max_batch_size.to_string());
49 }
50 map.insert(
51 "backoff_multiplier".to_owned(),
52 backoff_multiplier.to_string(),
53 );
54 map.insert(
55 "initial_timeout_duration".to_owned(),
56 initial_timeout_duration.to_string(),
57 );
58 map.insert(
59 "checkpoint_period".to_owned(),
60 checkpoint_period.to_string(),
61 );
62
63 map
64 }
65}
66
67impl<P, U> ABCperfMinbft<P, U> {
68 pub fn new(usig: U) -> Self {
69 Self {
70 usig,
71 phantom_data: PhantomData::default(),
72 }
73 }
74}
75
76impl<
77 P: RequestPayload + 'static + Unpin + Send + Sync + AsRef<RequestId>,
78 U: Usig + 'static + Send,
79 > AtomicBroadcast for ABCperfMinbft<P, U>
80where
81 U::Attestation: Serialize + for<'a> Deserialize<'a> + Debug + Unpin + Send + Clone + Sync,
82 U::Signature: Debug + Serialize + for<'a> Deserialize<'a> + Unpin + Send + Clone + Sync,
83{
84 type Config = ConfigurationExtension;
85 type ReplicaMessage = PeerMessage<U::Attestation, P, U::Signature>;
86 type Transaction = P;
87 type Decision = (ClientId, P);
88
89 fn start(
90 self,
91 config: AtomicBroadcastConfiguration<Self::Config>,
92 channels: AtomicBroadcastChannels<Self::ReplicaMessage, Self::Transaction, Self::Decision>,
93 ready_for_clients: impl Send + 'static + FnOnce(),
94 ) -> JoinHandle<Result<(), Error>> {
95 let config = MinBftConfig {
96 id: config.replica_id,
97 n: config.n,
98 t: config.t,
99 max_batch_size: config
100 .extension
101 .max_batch_size
102 .map(|n| n.try_into().expect("number should not get that big")),
103 batch_timeout: Duration::from_secs_f64(config.extension.batch_timeout),
104 initial_timeout_duration: Duration::from_secs_f64(
105 config.extension.initial_timeout_duration,
106 ),
107 checkpoint_period: config.extension.checkpoint_period,
108 };
109 let (minbft, initial_output) = MinBft::new(self.usig, config).unwrap();
110
111 let AtomicBroadcastChannels {
112 incoming_replica_messages,
113 outgoing_replica_messages,
114 requests,
115 responses,
116 } = channels;
117
118 tokio::spawn(
119 async move {
120 let (timeout_handler_task, set_timeout, timeouts) = TimeoutHandler::start();
121
122 let (mut output_handler, hello_done_recv) =
123 OutputHandler::new(outgoing_replica_messages, responses, set_timeout);
124
125 output_handler.handle_output(initial_output).await.unwrap();
126
127 let minbft_task = tokio::spawn(
128 run_minbft(
129 minbft,
130 incoming_replica_messages,
131 requests,
132 timeouts,
133 output_handler,
134 )
135 .in_current_span(),
136 );
137
138 hello_done_recv
139 .await
140 .expect("hello done channel sender should not be dropped");
141 ready_for_clients();
142
143 minbft_task.await.unwrap().unwrap();
144 timeout_handler_task.await.unwrap().unwrap();
145 Ok(())
146 }
147 .in_current_span(),
148 )
149 }
150}
151
152type IncomingPeerMessageChannel<A, P, S> =
153 mpsc::Receiver<(MessageType, ReplicaId, PeerMessage<A, P, S>)>;
154
155async fn run_minbft<P: Send + Sync + RequestPayload + 'static, U: Usig>(
156 mut minbft: MinBft<P, U>,
157 mut replica_messages: IncomingPeerMessageChannel<U::Attestation, P, U::Signature>,
158 mut client_messages: mpsc::Receiver<(ClientId, P)>,
159 mut timeouts: mpsc::Receiver<TimeoutType>,
160 mut output_handler: OutputHandler<P, U>,
161) -> Result<()>
162where
163 U::Attestation: Send + Sync + Clone + Debug + 'static,
164 U::Signature: Send + Sync + Clone + Serialize + Debug + 'static,
165{
166 let mut replica_open = true;
167 let mut client_open = true;
168
169 loop {
170 let output = select! {
171 msg = replica_messages.recv(), if replica_open => {
172 if let Some((_msg_type, replica, message)) = msg {
173 minbft.handle_peer_message(replica, message)
174 } else {
175 replica_open = false;
176 continue
177 }
178 }
179 msg = client_messages.recv(), if output_handler.is_hello_done() && client_open => {
180 if let Some((client_id, request)) = msg {
181 minbft.handle_client_message(client_id, request)
182 } else {
183 client_open = false;
184 continue
185 }
186 }
187 Some(timeout_type) = timeouts.recv(), if replica_open || client_open => {
188 minbft.handle_timeout(timeout_type)
189 }
190 else => break
191 };
192
193 output_handler.handle_output(output).await?;
194 }
195
196 Ok(())
197}
198
#[cfg(test)]
mod tests {
    use shared_ids::{AnyId, RequestId};

    use super::*;

    /// Payload without any content; every instance reports the same request id.
    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
    struct DummyPayload {}

    impl RequestPayload for DummyPayload {
        fn id(&self) -> RequestId {
            *self.as_ref()
        }

        fn verify(&self, _id: ClientId) -> Result<()> {
            Ok(())
        }
    }

    impl AsRef<RequestId> for DummyPayload {
        fn as_ref(&self) -> &RequestId {
            &RequestId::FIRST
        }
    }

    /// Time the spawned tasks get to react before their state is inspected.
    const SETTLE_TIME: Duration = Duration::from_secs(1);

    /// The input channel whose sender is dropped first.
    #[derive(Debug, Clone, Copy)]
    enum ClosedFirst {
        Replica,
        Client,
    }

    /// Starts a single-replica instance and checks that its task keeps running while
    /// at least one input channel is open and finishes successfully once both are
    /// closed.
    async fn assert_runs_until_both_inputs_closed(closed_first: ClosedFirst) {
        let abcperf_minbft = ABCperfMinbft::<DummyPayload, _>::new(usig::noop::UsigNoOp::default());

        // The receivers of the outgoing channels are bound (not `_`) so they stay
        // alive for the whole test.
        let (from_replica_send, from_replica_recv) = mpsc::channel(1000);
        let (to_replica_send, _to_replica_recv) = mpsc::channel(1000);
        let (from_client_send, from_client_recv) = mpsc::channel(1000);
        let (to_client_send, _to_client_recv) = mpsc::channel(1000);

        let task = abcperf_minbft.start(
            AtomicBroadcastConfiguration {
                replica_id: ReplicaId::from_u64(0),
                extension: ConfigurationExtension {
                    batch_timeout: 1_000_000f64,
                    max_batch_size: None,
                    backoff_multiplier: 1f64,
                    initial_timeout_duration: 1_000_000f64,
                    checkpoint_period: NonZeroU64::new(2).unwrap(),
                },
                n: 1.try_into().expect("> 0"),
                t: 0,
            },
            AtomicBroadcastChannels {
                incoming_replica_messages: from_replica_recv,
                outgoing_replica_messages: to_replica_send,
                requests: from_client_recv,
                responses: to_client_send,
            },
            || {},
        );

        tokio::time::sleep(SETTLE_TIME).await;
        assert!(!task.is_finished());

        // Both steps live in one `match` because the two senders have different
        // types and each is moved exactly once per arm.
        match closed_first {
            ClosedFirst::Replica => {
                drop(from_replica_send);
                tokio::time::sleep(SETTLE_TIME).await;
                assert!(!task.is_finished());
                drop(from_client_send);
            }
            ClosedFirst::Client => {
                drop(from_client_send);
                tokio::time::sleep(SETTLE_TIME).await;
                assert!(!task.is_finished());
                drop(from_replica_send);
            }
        }
        tokio::time::sleep(SETTLE_TIME).await;

        assert!(task.is_finished());

        task.await.unwrap().unwrap();
    }

    #[tokio::test]
    async fn exit_replica_first() {
        assert_runs_until_both_inputs_closed(ClosedFirst::Replica).await;
    }

    #[tokio::test]
    async fn exit_client_first() {
        assert_runs_until_both_inputs_closed(ClosedFirst::Client).await;
    }
}