abcperf_generic_client/no_proxy/
mod.rs1use std::{collections::HashMap, fmt::Debug, marker::PhantomData, net::SocketAddr, time::Instant};
2
3use crate::cs::{typed::TypedCSTrait, CSTrait};
4use abcperf::stats::ClientStats;
5use anyhow::Result;
6use rand::SeedableRng;
7use serde::{Deserialize, Serialize};
8use shared_ids::ReplicaId;
9use std::time::Duration;
10use tokio::{sync::oneshot, task::JoinHandle};
11use tracing::Instrument;
12
13use abcperf::{config::ClientConfig, ClientEmulator, EndRng, SeedRng};
14
15use crate::{CustomClientConfig, Generate};
16
17use self::{generator::GeneratorClient, n_clients::NClientsClient};
18
19mod generator;
20mod n_clients;
21
22const CLIENT_STUCK_AFTER: Duration = Duration::from_secs(10); pub struct GenericClient<
25 I: Generate<C> + Serialize + for<'a> Deserialize<'a> + Debug + Send + Sync + 'static,
26 O: Serialize + for<'a> Deserialize<'a> + Send + Sync + 'static,
27 C: for<'a> Deserialize<'a>
28 + Serialize
29 + Send
30 + Clone
31 + Debug
32 + 'static
33 + Sync
34 + Into<HashMap<String, String>>
35 + AsRef<CustomClientConfig>,
36 CS: CSTrait,
37> {
38 phantom_data: PhantomData<(I, O, C)>,
39 cs: TypedCSTrait<CS, I, O>,
40}
41
42impl<
43 I: Generate<C> + Serialize + for<'a> Deserialize<'a> + Debug + Send + Sync + 'static,
44 O: Serialize + for<'a> Deserialize<'a> + Send + Sync + 'static,
45 C: for<'a> Deserialize<'a>
46 + Serialize
47 + Send
48 + Clone
49 + Debug
50 + 'static
51 + Sync
52 + Into<HashMap<String, String>>
53 + AsRef<CustomClientConfig>,
54 CS: CSTrait,
55 > GenericClient<I, O, C, CS>
56{
57 pub fn new(cs: TypedCSTrait<CS, I, O>) -> Self {
58 Self {
59 phantom_data: Default::default(),
60 cs,
61 }
62 }
63}
64
65impl<
66 I: Generate<C> + Serialize + for<'a> Deserialize<'a> + Debug + Send + Sync + 'static + Clone,
67 O: Serialize + for<'a> Deserialize<'a> + Send + Sync + 'static,
68 C: for<'a> Deserialize<'a>
69 + Serialize
70 + Send
71 + Clone
72 + Debug
73 + 'static
74 + Sync
75 + Into<HashMap<String, String>>
76 + AsRef<CustomClientConfig>,
77 CS: CSTrait,
78 > ClientEmulator for GenericClient<I, O, C, CS>
79{
80 type Config = C;
81
82 fn start(
83 self,
84 config: ClientConfig<Self::Config>,
85 replicas: Vec<(ReplicaId, SocketAddr)>,
86 seed: &mut SeedRng,
87 start_time: Instant,
88 ) -> (oneshot::Sender<()>, JoinHandle<Result<Vec<ClientStats>>>) {
89 let (sender, receiver) = oneshot::channel();
90
91 let t = config.t;
92
93 let replicas = replicas.into_iter().collect();
94
95 let join = match config.client.as_ref() {
96 CustomClientConfig::NClients { number } => {
97 let n_clients =
98 NClientsClient::<I, O, C, CS>::new(*number, replicas, config, self.cs);
99 tokio::spawn(
100 n_clients
101 .run(
102 start_time,
103 receiver,
104 EndRng::from_rng(seed).expect("seed rng should always work"),
105 t,
106 )
107 .in_current_span(),
108 )
109 }
110 CustomClientConfig::Distribution(generator_config) => {
111 let n_clients = GeneratorClient::<I, O, C, CS>::new(
112 replicas,
113 generator_config.clone(),
114 config,
115 seed,
116 receiver,
117 self.cs,
118 );
119 tokio::spawn(
120 n_clients
121 .run(
122 start_time,
123 EndRng::from_rng(seed).expect("seed rng should always work"),
124 t,
125 )
126 .in_current_span(),
127 )
128 }
129 };
130 (sender, join)
131 }
132}