abcperf/
atomic_broadcast.rs1use std::{collections::HashMap, fmt::Debug, num::NonZeroU64};
2
3use anyhow::Error;
4use serde::{Deserialize, Serialize};
5use shared_ids::{ClientId, IdIter};
6use tokio::sync::mpsc;
7use tokio::task::JoinHandle;
8use trait_alias_macro::pub_trait_alias_macro;
9
10use crate::{MessageDestination, MessageType, ReplicaId};
11
12pub trait AtomicBroadcast: 'static {
14 type Config: ConfigurationExtension;
16
17 type ReplicaMessage: ReplicaMessage;
19
20 type Transaction: Transaction;
22
23 type Decision: Decision;
25
26 fn start(
28 self,
29 config: AtomicBroadcastConfiguration<Self::Config>,
30 channels: AtomicBroadcastChannels<Self::ReplicaMessage, Self::Transaction, Self::Decision>,
31 ready_for_clients: impl Send + 'static + FnOnce() + Sync,
32 ) -> JoinHandle<Result<(), Error>>;
33}
34
35pub_trait_alias_macro!(ConfigurationExtension = for<'a> Deserialize<'a> + Serialize + Send + Clone + Debug + 'static + Sync + Into<HashMap<String, String>>);
36pub_trait_alias_macro!(ReplicaMessage = for<'a> Deserialize<'a> + Serialize + Debug + Send + Unpin);
37pub_trait_alias_macro!(Transaction = for<'a> Deserialize<'a> + Serialize + Debug + Send + Sync + 'static);
38pub_trait_alias_macro!(Decision = for<'a> Deserialize<'a> + Serialize + Debug + Send + Sync + 'static + Clone);
39
40pub struct AtomicBroadcastConfiguration<A: ConfigurationExtension> {
41 pub replica_id: ReplicaId,
42 pub n: NonZeroU64,
43 pub t: u64,
44 pub extension: A,
45}
46
47impl<A: ConfigurationExtension> AtomicBroadcastConfiguration<A> {
48 pub fn replicas(&self) -> impl Iterator<Item = ReplicaId> {
49 IdIter::default().take(self.n.get().try_into().unwrap())
50 }
51}
52
53pub struct AtomicBroadcastChannels<RM: ReplicaMessage, Req: Transaction, Resp: Decision> {
54 pub incoming_replica_messages: mpsc::Receiver<(MessageType, ReplicaId, RM)>,
55 pub outgoing_replica_messages: mpsc::Sender<(MessageDestination, RM)>,
56 pub requests: mpsc::Receiver<(ClientId, Req)>,
57 pub responses: mpsc::Sender<Resp>,
58}