abcperf_generic_client/proxy/
mod.rs

1use std::{collections::HashMap, fmt::Debug, marker::PhantomData, net::SocketAddr, time::Instant};
2
3use crate::cs::{typed::TypedCSTrait, CSTrait};
4use abcperf::stats::ClientStats;
5use anyhow::Result;
6use rand::SeedableRng;
7use serde::{Deserialize, Serialize};
8use shared_ids::ReplicaId;
9use std::time::Duration;
10use tokio::{sync::oneshot, task::JoinHandle};
11use tracing::Instrument;
12
13use abcperf::{config::ClientConfig, ClientEmulator, EndRng, SeedRng};
14
15use crate::{CustomClientConfig, Generate};
16
17use self::{generator::GeneratorClient, n_clients::NClientsClient};
18
19mod generator;
20mod n_clients;
21
/// Fixed duration of ten seconds.
///
/// NOTE(review): the name suggests this is how long a client may go without
/// progress before it is treated as stuck. Nothing in this module's visible
/// code reads it, so it is presumably consumed by the `generator` /
/// `n_clients` submodules — confirm before changing or removing it.
const CLIENT_STUCK_AFTER: Duration = Duration::from_secs(10); // TODO maybe config option
23
/// Accessor a client configuration type must provide so that
/// [`GenericClient`] can obtain its fallback timeout.
pub trait ConfigTrait {
    /// Returns the fallback timeout stored in the configuration.
    ///
    /// `GenericClient::start` reads this once and forwards the value to the
    /// `run` call of the client it spawns.
    /// NOTE(review): what exactly "fallback" triggers is decided by the
    /// submodules, not by this file — confirm there.
    fn fallback_timeout(&self) -> Duration;
}
27
28pub struct GenericClient<
29    I: Generate<C> + Serialize + for<'a> Deserialize<'a> + Debug + Send + Sync + 'static,
30    O: Serialize + for<'a> Deserialize<'a> + Send + Sync + 'static,
31    C: for<'a> Deserialize<'a>
32        + Serialize
33        + Send
34        + Clone
35        + Debug
36        + 'static
37        + Sync
38        + Into<HashMap<String, String>>
39        + AsRef<CustomClientConfig>
40        + ConfigTrait,
41    CS: CSTrait,
42> {
43    phantom_data: PhantomData<(I, O, C)>,
44    cs: TypedCSTrait<CS, I, O>,
45}
46
47impl<
48        I: Generate<C> + Serialize + for<'a> Deserialize<'a> + Debug + Send + Sync + 'static,
49        O: Serialize + for<'a> Deserialize<'a> + Send + Sync + 'static,
50        C: for<'a> Deserialize<'a>
51            + Serialize
52            + Send
53            + Clone
54            + Debug
55            + 'static
56            + Sync
57            + Into<HashMap<String, String>>
58            + AsRef<CustomClientConfig>
59            + ConfigTrait,
60        CS: CSTrait,
61    > GenericClient<I, O, C, CS>
62{
63    pub fn new(cs: TypedCSTrait<CS, I, O>) -> Self {
64        Self {
65            phantom_data: Default::default(),
66            cs,
67        }
68    }
69}
70
71impl<
72        I: Generate<C> + Serialize + for<'a> Deserialize<'a> + Debug + Send + Sync + 'static + Clone,
73        O: Serialize + for<'a> Deserialize<'a> + Send + Sync + 'static,
74        C: for<'a> Deserialize<'a>
75            + Serialize
76            + Send
77            + Clone
78            + Debug
79            + 'static
80            + Sync
81            + Into<HashMap<String, String>>
82            + AsRef<CustomClientConfig>
83            + ConfigTrait,
84        CS: CSTrait,
85    > ClientEmulator for GenericClient<I, O, C, CS>
86{
87    type Config = C;
88
89    fn start(
90        self,
91        config: ClientConfig<Self::Config>,
92        replicas: Vec<(ReplicaId, SocketAddr)>,
93        seed: &mut SeedRng,
94        start_time: Instant,
95    ) -> (oneshot::Sender<()>, JoinHandle<Result<Vec<ClientStats>>>) {
96        let (sender, receiver) = oneshot::channel();
97
98        let replicas = replicas.into_iter().collect();
99        let fallback_timeout = config.client.fallback_timeout();
100
101        let join = match config.client.as_ref() {
102            CustomClientConfig::NClients { number } => {
103                let n_clients =
104                    NClientsClient::<I, O, C, CS>::new(*number, replicas, config, self.cs);
105                tokio::spawn(
106                    n_clients
107                        .run(
108                            start_time,
109                            receiver,
110                            EndRng::from_rng(seed).expect("seed rng should always work"),
111                            fallback_timeout,
112                        )
113                        .in_current_span(),
114                )
115            }
116            CustomClientConfig::Distribution(generator_config) => {
117                let n_clients = GeneratorClient::<I, O, C, CS>::new(
118                    replicas,
119                    generator_config.clone(),
120                    config,
121                    seed,
122                    receiver,
123                    self.cs,
124                );
125                tokio::spawn(
126                    n_clients
127                        .run(
128                            start_time,
129                            EndRng::from_rng(seed).expect("seed rng should always work"),
130                            fallback_timeout,
131                        )
132                        .in_current_span(),
133                )
134            }
135        };
136        (sender, join)
137    }
138}