abcperf/
atomic_broadcast.rs

1use std::{collections::HashMap, fmt::Debug, num::NonZeroU64};
2
3use anyhow::Error;
4use serde::{Deserialize, Serialize};
5use shared_ids::{ClientId, IdIter};
6use tokio::sync::mpsc;
7use tokio::task::JoinHandle;
8use trait_alias_macro::pub_trait_alias_macro;
9
10use crate::{MessageDestination, MessageType, ReplicaId};
11
12/// Used to integrate an Atomic Broadcast into abcperf
13pub trait AtomicBroadcast: 'static {
14    /// Custom configuration for the algorithm
15    type Config: ConfigurationExtension;
16
17    /// Messages the algorithm sends between replicas
18    type ReplicaMessage: ReplicaMessage;
19
20    /// Requests for the algorithm
21    type Transaction: Transaction;
22
23    /// Responses by the algorithm
24    type Decision: Decision;
25
26    /// Start the atomic broadcast
27    fn start(
28        self,
29        config: AtomicBroadcastConfiguration<Self::Config>,
30        channels: AtomicBroadcastChannels<Self::ReplicaMessage, Self::Transaction, Self::Decision>,
31        ready_for_clients: impl Send + 'static + FnOnce() + Sync,
32    ) -> JoinHandle<Result<(), Error>>;
33}
34
35pub_trait_alias_macro!(ConfigurationExtension = for<'a> Deserialize<'a> + Serialize + Send + Clone + Debug + 'static + Sync + Into<HashMap<String, String>>);
36pub_trait_alias_macro!(ReplicaMessage = for<'a> Deserialize<'a> + Serialize + Debug + Send + Unpin);
37pub_trait_alias_macro!(Transaction = for<'a> Deserialize<'a> + Serialize + Debug + Send + Sync + 'static);
38pub_trait_alias_macro!(Decision = for<'a> Deserialize<'a> + Serialize + Debug + Send + Sync + 'static + Clone);
39
40pub struct AtomicBroadcastConfiguration<A: ConfigurationExtension> {
41    pub replica_id: ReplicaId,
42    pub n: NonZeroU64,
43    pub t: u64,
44    pub extension: A,
45}
46
47impl<A: ConfigurationExtension> AtomicBroadcastConfiguration<A> {
48    pub fn replicas(&self) -> impl Iterator<Item = ReplicaId> {
49        IdIter::default().take(self.n.get().try_into().unwrap())
50    }
51}
52
53pub struct AtomicBroadcastChannels<RM: ReplicaMessage, Req: Transaction, Resp: Decision> {
54    pub incoming_replica_messages: mpsc::Receiver<(MessageType, ReplicaId, RM)>,
55    pub outgoing_replica_messages: mpsc::Sender<(MessageDestination, RM)>,
56    pub requests: mpsc::Receiver<(ClientId, Req)>,
57    pub responses: mpsc::Sender<Resp>,
58}