1use std::{
2 borrow::Cow,
3 env,
4 fmt::Debug,
5 net::SocketAddr,
6 sync::Arc,
7 time::{Duration, Instant},
8};
9
10use crate::stats::ClientStats;
11use async_trait::async_trait;
12pub use atomic_broadcast::AtomicBroadcast;
13pub use atomic_broadcast::AtomicBroadcastChannels;
14pub use atomic_broadcast::AtomicBroadcastConfiguration;
15use atomic_broadcast::{Decision, Transaction};
16use config::{ClientConfig, ClientConfigExt};
17use futures::Future;
18use ip_family::IpFamilyExt;
19use serde::{Deserialize, Serialize};
20use shared_ids::{ClientId, ReplicaId};
21use tokio::{
22 sync::{mpsc, oneshot},
23 task::JoinHandle,
24};
25
26use clap::{Args, Parser, Subcommand};
27
28use anyhow::Result;
29
30use quinn::{
31 ClientConfig as QuinnClientConfig, Connection, Endpoint, ServerConfig, TransportConfig, VarInt,
32};
33use rustls::{Certificate, PrivateKey, RootCertStore};
34use tracing::Level;
35use tracing_subscriber::FmtSubscriber;
36use trait_alias_macro::pub_trait_alias_macro;
37
38pub mod atomic_broadcast;
39mod client;
40pub mod config;
41mod connection;
42pub mod generator;
43mod message;
44pub mod replica;
45pub mod stats;
46#[cfg(test)]
47mod tests;
48mod time;
49
50pub type MainSeed = [u8; 32];
51pub type SeedRng = rand_chacha::ChaCha20Rng;
52pub type EndRng = rand_pcg::Pcg64Mcg;
53
54const DEFAULT_LEVEL: &str = "info";
58
59pub const READ_TO_END_LIMIT: usize = 128 * 1024 * 1024;
60pub const CERTIFICATE_AUTHORITY: &[u8] = include_bytes!("../debug-certs/root-cert.der");
61pub const PRIVATE_KEY: &[u8] = include_bytes!("../debug-certs/key.der");
62pub const CERTIFICATE: &[u8] = include_bytes!("../debug-certs/cert.der");
63
64#[derive(Parser)]
65struct ABCperfOpt<O: Args> {
66 #[clap(flatten)]
67 algo_opt: O,
68
69 #[clap(flatten)]
70 own_opt: ABCperfArgs,
71}
72
73#[doc(hidden)]
74#[derive(Args)]
75pub struct ABCperfArgs {
76 #[clap(subcommand)]
77 command: Command,
78
79 #[clap(long, default_value = DEFAULT_LEVEL)]
80 log_level: Level,
81}
82
83#[derive(Subcommand)]
84enum Command {
85 Info,
87
88 #[command(flatten)]
90 Async(AsyncCommand),
91}
92
93#[derive(Subcommand)]
94enum AsyncCommand {
95 Client(Box<client::ClientOpt>),
97 Replica(Box<replica::ReplicaOpt>),
99}
100
101pub trait ClientEmulator {
103 type Config: ClientConfigExt;
105
106 fn start(
108 self,
109 config: ClientConfig<Self::Config>,
110 replicas: Vec<(ReplicaId, SocketAddr)>,
111 seed: &mut SeedRng,
112 start_time: Instant,
113 ) -> (oneshot::Sender<()>, JoinHandle<Result<Vec<ClientStats>>>);
114}
115
116#[async_trait]
118pub trait Server: 'static {
119 type AlgoRequest: Transaction;
121
122 type AlgoResponse: Decision;
124
125 type ReplicaMessage: ServerReplicaMessage;
127
128 type Config: ClientConfigExt;
130
131 #[allow(clippy::too_many_arguments)]
132 async fn run<F: Send + Future<Output = ()>>(
133 self,
134 config: ClientConfig<Self::Config>,
135 requests: mpsc::Sender<(ClientId, Self::AlgoRequest)>,
136 responses: mpsc::Receiver<Self::AlgoResponse>,
137 exit: oneshot::Receiver<()>,
138 ready: oneshot::Sender<SocketAddr>,
139 local_socket: SocketAddr,
140 replica_send: impl 'static + Sync + Send + Fn(MessageDestination, Self::ReplicaMessage) -> F,
141 replica_recv: mpsc::Receiver<(MessageType, ReplicaId, Self::ReplicaMessage)>,
142 );
143}
144
145pub_trait_alias_macro!(ServerReplicaMessage = for<'a> Deserialize<'a> + Serialize + Send + Debug);
146
147pub fn main<
148 A: AtomicBroadcast,
149 S: Server<AlgoRequest = A::Transaction, AlgoResponse = A::Decision>,
150 C: ClientEmulator<Config = S::Config>,
151 N: Into<Cow<'static, str>>,
152>(
153 name: N,
154 version: &'static str,
155 algo: impl FnOnce() -> A,
156 server: impl FnOnce() -> S,
157 client_emulator: impl FnOnce() -> C,
158) -> Result<()> {
159 #[derive(Args)]
160 struct Empty;
161
162 let (Empty, abcperf_args) = parse_args()?;
163
164 main_with_args(abcperf_args, name, version, algo, server, client_emulator)
165}
166
167pub fn parse_args<O: Args>() -> Result<(O, ABCperfArgs)> {
169 let opt = ABCperfOpt::<O>::try_parse()?;
170 Ok((opt.algo_opt, opt.own_opt))
171}
172
173pub fn main_with_args<
175 A: AtomicBroadcast,
176 S: Server<AlgoRequest = A::Transaction, AlgoResponse = A::Decision>,
177 C: ClientEmulator<Config = S::Config>,
178 N: Into<Cow<'static, str>>,
179>(
180 abcperf_args: ABCperfArgs,
181 name: N,
182 version: &'static str,
183 algo: impl FnOnce() -> A,
184 server: impl FnOnce() -> S,
185 client_emulator: impl FnOnce() -> C,
186) -> Result<()> {
187 tracing::subscriber::set_global_default(
188 FmtSubscriber::builder()
189 .with_max_level(abcperf_args.log_level)
190 .finish(),
191 )?;
192
193 let abcperf_version = env!("CARGO_PKG_VERSION");
194 let info = VersionInfo {
195 abcperf_version: abcperf_version.into(),
196 name: name.into(),
197 version: version.into(),
198 };
199
200 match abcperf_args.command {
201 Command::Info => print_info(info),
202 Command::Async(command) => {
203 let rt = tokio::runtime::Builder::new_multi_thread()
204 .enable_all()
205 .build()
206 .unwrap();
207
208 match command {
209 AsyncCommand::Client(opt) => {
210 rt.block_on(client::main::<A, _>(*opt, info, client_emulator()))
211 }
212 AsyncCommand::Replica(opt) => rt.block_on(replica::main(*opt, info, algo, server)),
213 }
214 }
215 }
216}
217
218fn print_info(
219 VersionInfo {
220 name,
221 version,
222 abcperf_version,
223 }: VersionInfo,
224) -> Result<()> {
225 println!("{}", name);
226 println!("Version: {}", version);
227 println!("ABCperf Version: {}", abcperf_version);
228 Ok(())
229}
230
231fn server_config() -> Result<ServerConfig> {
232 let priv_key = PrivateKey(PRIVATE_KEY.to_vec());
233 let cert_chain = vec![Certificate(CERTIFICATE.to_vec())];
234 let mut server_config = ServerConfig::with_single_cert(cert_chain, priv_key)?;
235 Arc::get_mut(&mut server_config.transport)
236 .expect("config was just created")
237 .max_idle_timeout(Some(VarInt::from_u32(30_000).into()));
238 Ok(server_config)
239}
240
241fn client_config() -> Result<QuinnClientConfig> {
242 let mut roots = RootCertStore::empty();
243 roots.add(&Certificate(CERTIFICATE_AUTHORITY.to_vec()))?;
244 let mut client_config = QuinnClientConfig::with_root_certificates(roots);
245
246 let mut transport_config = TransportConfig::default();
247 transport_config.max_idle_timeout(Some(VarInt::from_u32(30_000).into()));
248 transport_config.keep_alive_interval(Some(Duration::from_secs(10)));
249 client_config.transport_config(Arc::new(transport_config));
250
251 Ok(client_config)
252}
253
254async fn quic_new_client_connect(addr: SocketAddr, server_name: &str) -> Result<Connection> {
255 let endpoint = quic_new_client((addr.family().unspecified(), 0))?;
256
257 let connection = endpoint.connect(addr, server_name)?.await?;
258
259 Ok(connection)
260}
261
262pub fn quic_new_client(bind: impl Into<SocketAddr>) -> Result<Endpoint> {
263 let mut endpoint = Endpoint::client(bind.into())?;
264 endpoint.set_default_client_config(crate::client_config()?);
265 Ok(endpoint)
266}
267
268pub fn quic_server(addr: impl Into<SocketAddr>) -> Result<Endpoint> {
269 let mut endpoint = Endpoint::server(server_config()?, addr.into())?;
270 endpoint.set_default_client_config(crate::client_config()?);
271 Ok(endpoint)
272}
273
274#[cfg(not(target_os = "windows"))]
275const USER_VAR: &str = "USER";
276#[cfg(target_os = "windows")]
277const USER_VAR: &str = "USERNAME";
278
279#[derive(Debug, Deserialize, Serialize, Eq, PartialEq, Clone)]
280struct VersionInfo {
281 abcperf_version: Cow<'static, str>,
282 name: Cow<'static, str>,
283 version: Cow<'static, str>,
284}
285
286#[derive(Debug)]
287pub enum MessageDestination {
288 Unicast(ReplicaId),
289 Broadcast,
290}
291
292#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, Serialize, Deserialize)]
293pub enum MessageType {
294 Unicast,
295 Broadcast,
296}
297
298impl AsRef<MessageType> for MessageDestination {
299 fn as_ref(&self) -> &MessageType {
300 match self {
301 Self::Unicast(_) => &MessageType::Unicast,
302 Self::Broadcast => &MessageType::Broadcast,
303 }
304 }
305}
306
307impl<T: AsRef<MessageType>> From<T> for MessageType {
308 fn from(value: T) -> Self {
309 *value.as_ref()
310 }
311}