abcperf/
lib.rs

1use std::{
2    borrow::Cow,
3    env,
4    fmt::Debug,
5    net::SocketAddr,
6    sync::Arc,
7    time::{Duration, Instant},
8};
9
10use crate::stats::ClientStats;
11use async_trait::async_trait;
12pub use atomic_broadcast::AtomicBroadcast;
13pub use atomic_broadcast::AtomicBroadcastChannels;
14pub use atomic_broadcast::AtomicBroadcastConfiguration;
15use atomic_broadcast::{Decision, Transaction};
16use config::{ClientConfig, ClientConfigExt};
17use futures::Future;
18use ip_family::IpFamilyExt;
19use serde::{Deserialize, Serialize};
20use shared_ids::{ClientId, ReplicaId};
21use tokio::{
22    sync::{mpsc, oneshot},
23    task::JoinHandle,
24};
25
26use clap::{Args, Parser, Subcommand};
27
28use anyhow::Result;
29
30use quinn::{
31    ClientConfig as QuinnClientConfig, Connection, Endpoint, ServerConfig, TransportConfig, VarInt,
32};
33use rustls::{Certificate, PrivateKey, RootCertStore};
34use tracing::Level;
35use tracing_subscriber::FmtSubscriber;
36use trait_alias_macro::pub_trait_alias_macro;
37
38pub mod atomic_broadcast;
39mod client;
40pub mod config;
41mod connection;
42pub mod generator;
43mod message;
44pub mod replica;
45pub mod stats;
46#[cfg(test)]
47mod tests;
48mod time;
49
/// Raw seed material: a fixed-size array of exactly 32 bytes.
pub type MainSeed = [u8; 32];
51pub type SeedRng = rand_chacha::ChaCha20Rng;
52pub type EndRng = rand_pcg::Pcg64Mcg;
53
/// Default value of the `--log-level` command line option.
///
/// A build-dependent variant (`"debug"` when `debug_assertions` is enabled) used to be
/// sketched here as commented-out code; it is not active, so every build profile
/// defaults to `"info"`.
const DEFAULT_LEVEL: &str = "info";
58
/// Size limit in bytes: 128 MiB (`128 * 1024 * 1024`).
// NOTE(review): presumably the upper bound passed when reading a stream to its end — confirm at the call sites.
pub const READ_TO_END_LIMIT: usize = 128 << 20;
60pub const CERTIFICATE_AUTHORITY: &[u8] = include_bytes!("../debug-certs/root-cert.der");
61pub const PRIVATE_KEY: &[u8] = include_bytes!("../debug-certs/key.der");
62pub const CERTIFICATE: &[u8] = include_bytes!("../debug-certs/cert.der");
63
64#[derive(Parser)]
65struct ABCperfOpt<O: Args> {
66    #[clap(flatten)]
67    algo_opt: O,
68
69    #[clap(flatten)]
70    own_opt: ABCperfArgs,
71}
72
73#[doc(hidden)]
74#[derive(Args)]
75pub struct ABCperfArgs {
76    #[clap(subcommand)]
77    command: Command,
78
79    #[clap(long, default_value = DEFAULT_LEVEL)]
80    log_level: Level,
81}
82
83#[derive(Subcommand)]
84enum Command {
85    /// Information about this binary
86    Info,
87
88    /// Commands that need a async runtime
89    #[command(flatten)]
90    Async(AsyncCommand),
91}
92
93#[derive(Subcommand)]
94enum AsyncCommand {
95    /// Run in client mode
96    Client(Box<client::ClientOpt>),
97    /// Run in replica mode
98    Replica(Box<replica::ReplicaOpt>),
99}
100
101/// Used to integrate a client emulator into abcperf
102pub trait ClientEmulator {
103    /// Custom configuration for the client emulator
104    type Config: ClientConfigExt;
105
106    /// Run the client emulator
107    fn start(
108        self,
109        config: ClientConfig<Self::Config>,
110        replicas: Vec<(ReplicaId, SocketAddr)>,
111        seed: &mut SeedRng,
112        start_time: Instant,
113    ) -> (oneshot::Sender<()>, JoinHandle<Result<Vec<ClientStats>>>);
114}
115
116/// Used to integrate a server into abcperf
117#[async_trait]
118pub trait Server: 'static {
119    /// Requests send to the algorithm by the server
120    type AlgoRequest: Transaction;
121
122    /// Responses recived from the algorithm by the server
123    type AlgoResponse: Decision;
124
125    /// Messages the server sends between replicas
126    type ReplicaMessage: ServerReplicaMessage;
127
128    /// Custom configuration for the server
129    type Config: ClientConfigExt;
130
131    #[allow(clippy::too_many_arguments)]
132    async fn run<F: Send + Future<Output = ()>>(
133        self,
134        config: ClientConfig<Self::Config>,
135        requests: mpsc::Sender<(ClientId, Self::AlgoRequest)>,
136        responses: mpsc::Receiver<Self::AlgoResponse>,
137        exit: oneshot::Receiver<()>,
138        ready: oneshot::Sender<SocketAddr>,
139        local_socket: SocketAddr,
140        replica_send: impl 'static + Sync + Send + Fn(MessageDestination, Self::ReplicaMessage) -> F,
141        replica_recv: mpsc::Receiver<(MessageType, ReplicaId, Self::ReplicaMessage)>,
142    );
143}
144
145pub_trait_alias_macro!(ServerReplicaMessage = for<'a> Deserialize<'a> + Serialize + Send + Debug);
146
147pub fn main<
148    A: AtomicBroadcast,
149    S: Server<AlgoRequest = A::Transaction, AlgoResponse = A::Decision>,
150    C: ClientEmulator<Config = S::Config>,
151    N: Into<Cow<'static, str>>,
152>(
153    name: N,
154    version: &'static str,
155    algo: impl FnOnce() -> A,
156    server: impl FnOnce() -> S,
157    client_emulator: impl FnOnce() -> C,
158) -> Result<()> {
159    #[derive(Args)]
160    struct Empty;
161
162    let (Empty, abcperf_args) = parse_args()?;
163
164    main_with_args(abcperf_args, name, version, algo, server, client_emulator)
165}
166
167/// parse the command line arguments with the provided additional args
168pub fn parse_args<O: Args>() -> Result<(O, ABCperfArgs)> {
169    let opt = ABCperfOpt::<O>::try_parse()?;
170    Ok((opt.algo_opt, opt.own_opt))
171}
172
173/// run abcperf with the provided algorithms and command line arguments
174pub fn main_with_args<
175    A: AtomicBroadcast,
176    S: Server<AlgoRequest = A::Transaction, AlgoResponse = A::Decision>,
177    C: ClientEmulator<Config = S::Config>,
178    N: Into<Cow<'static, str>>,
179>(
180    abcperf_args: ABCperfArgs,
181    name: N,
182    version: &'static str,
183    algo: impl FnOnce() -> A,
184    server: impl FnOnce() -> S,
185    client_emulator: impl FnOnce() -> C,
186) -> Result<()> {
187    tracing::subscriber::set_global_default(
188        FmtSubscriber::builder()
189            .with_max_level(abcperf_args.log_level)
190            .finish(),
191    )?;
192
193    let abcperf_version = env!("CARGO_PKG_VERSION");
194    let info = VersionInfo {
195        abcperf_version: abcperf_version.into(),
196        name: name.into(),
197        version: version.into(),
198    };
199
200    match abcperf_args.command {
201        Command::Info => print_info(info),
202        Command::Async(command) => {
203            let rt = tokio::runtime::Builder::new_multi_thread()
204                .enable_all()
205                .build()
206                .unwrap();
207
208            match command {
209                AsyncCommand::Client(opt) => {
210                    rt.block_on(client::main::<A, _>(*opt, info, client_emulator()))
211                }
212                AsyncCommand::Replica(opt) => rt.block_on(replica::main(*opt, info, algo, server)),
213            }
214        }
215    }
216}
217
218fn print_info(
219    VersionInfo {
220        name,
221        version,
222        abcperf_version,
223    }: VersionInfo,
224) -> Result<()> {
225    println!("{}", name);
226    println!("Version: {}", version);
227    println!("ABCperf Version: {}", abcperf_version);
228    Ok(())
229}
230
231fn server_config() -> Result<ServerConfig> {
232    let priv_key = PrivateKey(PRIVATE_KEY.to_vec());
233    let cert_chain = vec![Certificate(CERTIFICATE.to_vec())];
234    let mut server_config = ServerConfig::with_single_cert(cert_chain, priv_key)?;
235    Arc::get_mut(&mut server_config.transport)
236        .expect("config was just created")
237        .max_idle_timeout(Some(VarInt::from_u32(30_000).into()));
238    Ok(server_config)
239}
240
241fn client_config() -> Result<QuinnClientConfig> {
242    let mut roots = RootCertStore::empty();
243    roots.add(&Certificate(CERTIFICATE_AUTHORITY.to_vec()))?;
244    let mut client_config = QuinnClientConfig::with_root_certificates(roots);
245
246    let mut transport_config = TransportConfig::default();
247    transport_config.max_idle_timeout(Some(VarInt::from_u32(30_000).into()));
248    transport_config.keep_alive_interval(Some(Duration::from_secs(10)));
249    client_config.transport_config(Arc::new(transport_config));
250
251    Ok(client_config)
252}
253
254async fn quic_new_client_connect(addr: SocketAddr, server_name: &str) -> Result<Connection> {
255    let endpoint = quic_new_client((addr.family().unspecified(), 0))?;
256
257    let connection = endpoint.connect(addr, server_name)?.await?;
258
259    Ok(connection)
260}
261
262pub fn quic_new_client(bind: impl Into<SocketAddr>) -> Result<Endpoint> {
263    let mut endpoint = Endpoint::client(bind.into())?;
264    endpoint.set_default_client_config(crate::client_config()?);
265    Ok(endpoint)
266}
267
268pub fn quic_server(addr: impl Into<SocketAddr>) -> Result<Endpoint> {
269    let mut endpoint = Endpoint::server(server_config()?, addr.into())?;
270    endpoint.set_default_client_config(crate::client_config()?);
271    Ok(endpoint)
272}
273
/// Name of an environment variable that depends on the target platform:
/// `USERNAME` on Windows, `USER` everywhere else.
// NOTE(review): by convention these variables hold the current user's name — confirm how it is used.
const USER_VAR: &str = if cfg!(target_os = "windows") {
    "USERNAME"
} else {
    "USER"
};
278
279#[derive(Debug, Deserialize, Serialize, Eq, PartialEq, Clone)]
280struct VersionInfo {
281    abcperf_version: Cow<'static, str>,
282    name: Cow<'static, str>,
283    version: Cow<'static, str>,
284}
285
286#[derive(Debug)]
287pub enum MessageDestination {
288    Unicast(ReplicaId),
289    Broadcast,
290}
291
292#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, Serialize, Deserialize)]
293pub enum MessageType {
294    Unicast,
295    Broadcast,
296}
297
298impl AsRef<MessageType> for MessageDestination {
299    fn as_ref(&self) -> &MessageType {
300        match self {
301            Self::Unicast(_) => &MessageType::Unicast,
302            Self::Broadcast => &MessageType::Broadcast,
303        }
304    }
305}
306
307impl<T: AsRef<MessageType>> From<T> for MessageType {
308    fn from(value: T) -> Self {
309        *value.as_ref()
310    }
311}