abcperf_generic_client/proxy/
mod.rs1use std::{collections::HashMap, fmt::Debug, marker::PhantomData, net::SocketAddr, time::Instant};
2
3use crate::cs::{typed::TypedCSTrait, CSTrait};
4use abcperf::stats::ClientStats;
5use anyhow::Result;
6use rand::SeedableRng;
7use serde::{Deserialize, Serialize};
8use shared_ids::ReplicaId;
9use std::time::Duration;
10use tokio::{sync::oneshot, task::JoinHandle};
11use tracing::Instrument;
12
13use abcperf::{config::ClientConfig, ClientEmulator, EndRng, SeedRng};
14
15use crate::{CustomClientConfig, Generate};
16
17use self::{generator::GeneratorClient, n_clients::NClientsClient};
18
19mod generator;
20mod n_clients;
21
22const CLIENT_STUCK_AFTER: Duration = Duration::from_secs(10); pub trait ConfigTrait {
25 fn fallback_timeout(&self) -> Duration;
26}
27
28pub struct GenericClient<
29 I: Generate<C> + Serialize + for<'a> Deserialize<'a> + Debug + Send + Sync + 'static,
30 O: Serialize + for<'a> Deserialize<'a> + Send + Sync + 'static,
31 C: for<'a> Deserialize<'a>
32 + Serialize
33 + Send
34 + Clone
35 + Debug
36 + 'static
37 + Sync
38 + Into<HashMap<String, String>>
39 + AsRef<CustomClientConfig>
40 + ConfigTrait,
41 CS: CSTrait,
42> {
43 phantom_data: PhantomData<(I, O, C)>,
44 cs: TypedCSTrait<CS, I, O>,
45}
46
47impl<
48 I: Generate<C> + Serialize + for<'a> Deserialize<'a> + Debug + Send + Sync + 'static,
49 O: Serialize + for<'a> Deserialize<'a> + Send + Sync + 'static,
50 C: for<'a> Deserialize<'a>
51 + Serialize
52 + Send
53 + Clone
54 + Debug
55 + 'static
56 + Sync
57 + Into<HashMap<String, String>>
58 + AsRef<CustomClientConfig>
59 + ConfigTrait,
60 CS: CSTrait,
61 > GenericClient<I, O, C, CS>
62{
63 pub fn new(cs: TypedCSTrait<CS, I, O>) -> Self {
64 Self {
65 phantom_data: Default::default(),
66 cs,
67 }
68 }
69}
70
71impl<
72 I: Generate<C> + Serialize + for<'a> Deserialize<'a> + Debug + Send + Sync + 'static + Clone,
73 O: Serialize + for<'a> Deserialize<'a> + Send + Sync + 'static,
74 C: for<'a> Deserialize<'a>
75 + Serialize
76 + Send
77 + Clone
78 + Debug
79 + 'static
80 + Sync
81 + Into<HashMap<String, String>>
82 + AsRef<CustomClientConfig>
83 + ConfigTrait,
84 CS: CSTrait,
85 > ClientEmulator for GenericClient<I, O, C, CS>
86{
87 type Config = C;
88
89 fn start(
90 self,
91 config: ClientConfig<Self::Config>,
92 replicas: Vec<(ReplicaId, SocketAddr)>,
93 seed: &mut SeedRng,
94 start_time: Instant,
95 ) -> (oneshot::Sender<()>, JoinHandle<Result<Vec<ClientStats>>>) {
96 let (sender, receiver) = oneshot::channel();
97
98 let replicas = replicas.into_iter().collect();
99 let fallback_timeout = config.client.fallback_timeout();
100
101 let join = match config.client.as_ref() {
102 CustomClientConfig::NClients { number } => {
103 let n_clients =
104 NClientsClient::<I, O, C, CS>::new(*number, replicas, config, self.cs);
105 tokio::spawn(
106 n_clients
107 .run(
108 start_time,
109 receiver,
110 EndRng::from_rng(seed).expect("seed rng should always work"),
111 fallback_timeout,
112 )
113 .in_current_span(),
114 )
115 }
116 CustomClientConfig::Distribution(generator_config) => {
117 let n_clients = GeneratorClient::<I, O, C, CS>::new(
118 replicas,
119 generator_config.clone(),
120 config,
121 seed,
122 receiver,
123 self.cs,
124 );
125 tokio::spawn(
126 n_clients
127 .run(
128 start_time,
129 EndRng::from_rng(seed).expect("seed rng should always work"),
130 fallback_timeout,
131 )
132 .in_current_span(),
133 )
134 }
135 };
136 (sender, join)
137 }
138}