abcperf_generic_client/no_proxy/
mod.rs

1use std::{collections::HashMap, fmt::Debug, marker::PhantomData, net::SocketAddr, time::Instant};
2
3use crate::cs::{typed::TypedCSTrait, CSTrait};
4use abcperf::stats::ClientStats;
5use anyhow::Result;
6use rand::SeedableRng;
7use serde::{Deserialize, Serialize};
8use shared_ids::ReplicaId;
9use std::time::Duration;
10use tokio::{sync::oneshot, task::JoinHandle};
11use tracing::Instrument;
12
13use abcperf::{config::ClientConfig, ClientEmulator, EndRng, SeedRng};
14
15use crate::{CustomClientConfig, Generate};
16
17use self::{generator::GeneratorClient, n_clients::NClientsClient};
18
19mod generator;
20mod n_clients;
21
22const CLIENT_STUCK_AFTER: Duration = Duration::from_secs(10); // TODO maybe config option
23
24pub struct GenericClient<
25    I: Generate<C> + Serialize + for<'a> Deserialize<'a> + Debug + Send + Sync + 'static,
26    O: Serialize + for<'a> Deserialize<'a> + Send + Sync + 'static,
27    C: for<'a> Deserialize<'a>
28        + Serialize
29        + Send
30        + Clone
31        + Debug
32        + 'static
33        + Sync
34        + Into<HashMap<String, String>>
35        + AsRef<CustomClientConfig>,
36    CS: CSTrait,
37> {
38    phantom_data: PhantomData<(I, O, C)>,
39    cs: TypedCSTrait<CS, I, O>,
40}
41
42impl<
43        I: Generate<C> + Serialize + for<'a> Deserialize<'a> + Debug + Send + Sync + 'static,
44        O: Serialize + for<'a> Deserialize<'a> + Send + Sync + 'static,
45        C: for<'a> Deserialize<'a>
46            + Serialize
47            + Send
48            + Clone
49            + Debug
50            + 'static
51            + Sync
52            + Into<HashMap<String, String>>
53            + AsRef<CustomClientConfig>,
54        CS: CSTrait,
55    > GenericClient<I, O, C, CS>
56{
57    pub fn new(cs: TypedCSTrait<CS, I, O>) -> Self {
58        Self {
59            phantom_data: Default::default(),
60            cs,
61        }
62    }
63}
64
65impl<
66        I: Generate<C> + Serialize + for<'a> Deserialize<'a> + Debug + Send + Sync + 'static + Clone,
67        O: Serialize + for<'a> Deserialize<'a> + Send + Sync + 'static,
68        C: for<'a> Deserialize<'a>
69            + Serialize
70            + Send
71            + Clone
72            + Debug
73            + 'static
74            + Sync
75            + Into<HashMap<String, String>>
76            + AsRef<CustomClientConfig>,
77        CS: CSTrait,
78    > ClientEmulator for GenericClient<I, O, C, CS>
79{
80    type Config = C;
81
82    fn start(
83        self,
84        config: ClientConfig<Self::Config>,
85        replicas: Vec<(ReplicaId, SocketAddr)>,
86        seed: &mut SeedRng,
87        start_time: Instant,
88    ) -> (oneshot::Sender<()>, JoinHandle<Result<Vec<ClientStats>>>) {
89        let (sender, receiver) = oneshot::channel();
90
91        let t = config.t;
92
93        let replicas = replicas.into_iter().collect();
94
95        let join = match config.client.as_ref() {
96            CustomClientConfig::NClients { number } => {
97                let n_clients =
98                    NClientsClient::<I, O, C, CS>::new(*number, replicas, config, self.cs);
99                tokio::spawn(
100                    n_clients
101                        .run(
102                            start_time,
103                            receiver,
104                            EndRng::from_rng(seed).expect("seed rng should always work"),
105                            t,
106                        )
107                        .in_current_span(),
108                )
109            }
110            CustomClientConfig::Distribution(generator_config) => {
111                let n_clients = GeneratorClient::<I, O, C, CS>::new(
112                    replicas,
113                    generator_config.clone(),
114                    config,
115                    seed,
116                    receiver,
117                    self.cs,
118                );
119                tokio::spawn(
120                    n_clients
121                        .run(
122                            start_time,
123                            EndRng::from_rng(seed).expect("seed rng should always work"),
124                            t,
125                        )
126                        .in_current_span(),
127                )
128            }
129        };
130        (sender, join)
131    }
132}