abcperf_minbft/
lib.rs

1use std::{collections::HashMap, fmt::Debug, marker::PhantomData, num::NonZeroU64, time::Duration};
2
3use abcperf::{
4    atomic_broadcast::{AtomicBroadcast, AtomicBroadcastChannels, AtomicBroadcastConfiguration},
5    MessageType,
6};
7use anyhow::{Error, Result};
8use minbft::{timeout::TimeoutType, Config as MinBftConfig, MinBft, PeerMessage, RequestPayload};
9use output_handler::OutputHandler;
10use serde::{Deserialize, Serialize};
11use shared_ids::{ClientId, ReplicaId, RequestId};
12use timeout_handler::TimeoutHandler;
13use tokio::{select, sync::mpsc, task::JoinHandle};
14use tracing::Instrument;
15use usig::Usig;
16
17mod output_handler;
18mod timeout_handler;
19
/// Adapter that exposes MinBFT through the [`AtomicBroadcast`] interface.
///
/// `P` is the request payload type; `U` is the USIG implementation.
pub struct ABCperfMinbft<P, U> {
    /// Zero-sized marker that ties the payload type `P` to the adapter; no `P` is stored.
    phantom_data: PhantomData<P>,
    /// USIG instance handed to `MinBft::new` when `start` is called.
    usig: U,
}
24
/// MinBFT-specific settings; used as the `Config` type of the `AtomicBroadcast` impl and
/// read from the `extension` field of the atomic-broadcast configuration in `start`.
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct ConfigurationExtension {
    /// Batch timeout in seconds (`start` converts it with `Duration::from_secs_f64`).
    batch_timeout: f64,
    /// Optional batch-size limit; `None` is forwarded as `None` to the MinBFT config.
    max_batch_size: Option<NonZeroU64>,
    /// NOTE(review): this value is exported by the `HashMap` conversion but is not read
    /// when `start` builds the MinBFT config in this file — confirm whether it should be
    /// forwarded somewhere.
    backoff_multiplier: f64,
    /// Initial timeout duration in seconds (`start` converts it with
    /// `Duration::from_secs_f64`).
    initial_timeout_duration: f64,
    /// Checkpoint period, forwarded unchanged to the MinBFT config.
    checkpoint_period: NonZeroU64,
}
33
34impl From<ConfigurationExtension> for HashMap<String, String> {
35    fn from(config: ConfigurationExtension) -> Self {
36        let ConfigurationExtension {
37            batch_timeout,
38            max_batch_size,
39            backoff_multiplier,
40            initial_timeout_duration,
41            checkpoint_period,
42        } = config;
43
44        let mut map = HashMap::new();
45
46        map.insert("batch_timeout".to_owned(), batch_timeout.to_string());
47        if let Some(max_batch_size) = max_batch_size {
48            map.insert("max_batch_size".to_owned(), max_batch_size.to_string());
49        }
50        map.insert(
51            "backoff_multiplier".to_owned(),
52            backoff_multiplier.to_string(),
53        );
54        map.insert(
55            "initial_timeout_duration".to_owned(),
56            initial_timeout_duration.to_string(),
57        );
58        map.insert(
59            "checkpoint_period".to_owned(),
60            checkpoint_period.to_string(),
61        );
62
63        map
64    }
65}
66
67impl<P, U> ABCperfMinbft<P, U> {
68    pub fn new(usig: U) -> Self {
69        Self {
70            usig,
71            phantom_data: PhantomData::default(),
72        }
73    }
74}
75
76impl<
77        P: RequestPayload + 'static + Unpin + Send + Sync + AsRef<RequestId>,
78        U: Usig + 'static + Send,
79    > AtomicBroadcast for ABCperfMinbft<P, U>
80where
81    U::Attestation: Serialize + for<'a> Deserialize<'a> + Debug + Unpin + Send + Clone + Sync,
82    U::Signature: Debug + Serialize + for<'a> Deserialize<'a> + Unpin + Send + Clone + Sync,
83{
84    type Config = ConfigurationExtension;
85    type ReplicaMessage = PeerMessage<U::Attestation, P, U::Signature>;
86    type Transaction = P;
87    type Decision = (ClientId, P);
88
89    fn start(
90        self,
91        config: AtomicBroadcastConfiguration<Self::Config>,
92        channels: AtomicBroadcastChannels<Self::ReplicaMessage, Self::Transaction, Self::Decision>,
93        ready_for_clients: impl Send + 'static + FnOnce(),
94    ) -> JoinHandle<Result<(), Error>> {
95        let config = MinBftConfig {
96            id: config.replica_id,
97            n: config.n,
98            t: config.t,
99            max_batch_size: config
100                .extension
101                .max_batch_size
102                .map(|n| n.try_into().expect("number should not get that big")),
103            batch_timeout: Duration::from_secs_f64(config.extension.batch_timeout),
104            initial_timeout_duration: Duration::from_secs_f64(
105                config.extension.initial_timeout_duration,
106            ),
107            checkpoint_period: config.extension.checkpoint_period,
108        };
109        let (minbft, initial_output) = MinBft::new(self.usig, config).unwrap();
110
111        let AtomicBroadcastChannels {
112            incoming_replica_messages,
113            outgoing_replica_messages,
114            requests,
115            responses,
116        } = channels;
117
118        tokio::spawn(
119            async move {
120                let (timeout_handler_task, set_timeout, timeouts) = TimeoutHandler::start();
121
122                let (mut output_handler, hello_done_recv) =
123                    OutputHandler::new(outgoing_replica_messages, responses, set_timeout);
124
125                output_handler.handle_output(initial_output).await.unwrap();
126
127                let minbft_task = tokio::spawn(
128                    run_minbft(
129                        minbft,
130                        incoming_replica_messages,
131                        requests,
132                        timeouts,
133                        output_handler,
134                    )
135                    .in_current_span(),
136                );
137
138                hello_done_recv
139                    .await
140                    .expect("hello done channel sender should not be dropped");
141                ready_for_clients();
142
143                minbft_task.await.unwrap().unwrap();
144                timeout_handler_task.await.unwrap().unwrap();
145                Ok(())
146            }
147            .in_current_span(),
148        )
149    }
150}
151
/// Receiver for incoming peer messages; every item is a
/// `(MessageType, ReplicaId, PeerMessage)` tuple.
type IncomingPeerMessageChannel<A, P, S> =
    mpsc::Receiver<(MessageType, ReplicaId, PeerMessage<A, P, S>)>;
154
155async fn run_minbft<P: Send + Sync + RequestPayload + 'static, U: Usig>(
156    mut minbft: MinBft<P, U>,
157    mut replica_messages: IncomingPeerMessageChannel<U::Attestation, P, U::Signature>,
158    mut client_messages: mpsc::Receiver<(ClientId, P)>,
159    mut timeouts: mpsc::Receiver<TimeoutType>,
160    mut output_handler: OutputHandler<P, U>,
161) -> Result<()>
162where
163    U::Attestation: Send + Sync + Clone + Debug + 'static,
164    U::Signature: Send + Sync + Clone + Serialize + Debug + 'static,
165{
166    let mut replica_open = true;
167    let mut client_open = true;
168
169    loop {
170        let output = select! {
171            msg = replica_messages.recv(), if replica_open => {
172                if let Some((_msg_type, replica, message)) = msg {
173                    minbft.handle_peer_message(replica, message)
174                } else {
175                    replica_open = false;
176                    continue
177                }
178            }
179            msg = client_messages.recv(), if output_handler.is_hello_done() && client_open => {
180                if let Some((client_id, request)) = msg {
181                    minbft.handle_client_message(client_id, request)
182                } else {
183                    client_open = false;
184                    continue
185                }
186            }
187            Some(timeout_type) = timeouts.recv(), if replica_open || client_open => {
188                minbft.handle_timeout(timeout_type)
189            }
190            else => break
191        };
192
193        output_handler.handle_output(output).await?;
194    }
195
196    Ok(())
197}
198
#[cfg(test)]
mod tests {
    use shared_ids::{AnyId, RequestId};

    use super::*;

    /// Empty payload used to drive the adapter; it always reports `RequestId::FIRST`.
    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
    struct DummyPayload {}

    impl AsRef<RequestId> for DummyPayload {
        fn as_ref(&self) -> &RequestId {
            &RequestId::FIRST
        }
    }

    impl RequestPayload for DummyPayload {
        fn id(&self) -> RequestId {
            *self.as_ref()
        }

        fn verify(&self, _id: ClientId) -> Result<()> {
            Ok(())
        }
    }

    /// Starts a single-replica instance and closes its two input channels one after the
    /// other, checking that the task keeps running until both are closed.
    ///
    /// `replica_first` selects which sender is dropped first.
    async fn assert_exits_after_both_inputs_close(replica_first: bool) {
        let pause = Duration::from_secs(1);

        let abcperf_minbft = ABCperfMinbft::<DummyPayload, _>::new(usig::noop::UsigNoOp::default());

        let (from_replica_send, from_replica_recv) = mpsc::channel(1000);
        let (to_replica_send, _to_replica_recv) = mpsc::channel(1000);
        let (from_client_send, from_client_recv) = mpsc::channel(1000);
        let (to_client_send, _to_client_recv) = mpsc::channel(1000);

        let extension = ConfigurationExtension {
            batch_timeout: 1_000_000f64,
            max_batch_size: None,
            backoff_multiplier: 1f64,
            initial_timeout_duration: 1_000_000f64,
            checkpoint_period: NonZeroU64::new(2).unwrap(),
        };
        let configuration = AtomicBroadcastConfiguration {
            replica_id: ReplicaId::from_u64(0),
            extension,
            n: 1.try_into().expect("> 0"),
            t: 0,
        };
        let channels = AtomicBroadcastChannels {
            incoming_replica_messages: from_replica_recv,
            outgoing_replica_messages: to_replica_send,
            requests: from_client_recv,
            responses: to_client_send,
        };

        let task = abcperf_minbft.start(configuration, channels, || {});

        tokio::time::sleep(pause).await;
        assert!(!task.is_finished());

        // Wrapped in `Option` so that either sender can be dropped first.
        let mut replica_sender = Some(from_replica_send);
        let mut client_sender = Some(from_client_send);

        if replica_first {
            drop(replica_sender.take());
        } else {
            drop(client_sender.take());
        }
        tokio::time::sleep(pause).await;
        assert!(!task.is_finished());

        // Whichever sender is still alive goes away here.
        drop(replica_sender);
        drop(client_sender);
        tokio::time::sleep(pause).await;

        assert!(task.is_finished());

        task.await.unwrap().unwrap();
    }

    #[tokio::test]
    async fn exit_replica_first() {
        assert_exits_after_both_inputs_close(true).await;
    }

    #[tokio::test]
    async fn exit_client_first() {
        assert_exits_after_both_inputs_close(false).await;
    }
}