abcperf_generic_client/
response.rs

1use std::{cmp::Ordering, collections::HashMap, mem};
2
3use shared_ids::{AnyId, ClientId, RequestId};
4use tokio::sync::oneshot;
5
6#[derive(Debug)]
7pub struct ResponseHandler<P> {
8    clients: HashMap<ClientId, ClientHandlerState<P>>,
9}
10
11impl<P> Default for ResponseHandler<P> {
12    fn default() -> Self {
13        Self {
14            clients: HashMap::new(),
15        }
16    }
17}
18
19impl<P> ResponseHandler<P> {
20    pub fn request(
21        &mut self,
22        client_id: ClientId,
23        request_id: RequestId,
24        channel: oneshot::Sender<P>,
25    ) {
26        self.clients
27            .entry(client_id)
28            .or_default()
29            .request(request_id, channel)
30    }
31
32    pub fn response(&mut self, client_id: ClientId, request_id: RequestId, payload: P) {
33        self.clients
34            .entry(client_id)
35            .or_default()
36            .response(request_id, payload)
37    }
38}
39
40#[derive(Debug)]
41struct ClientHandlerState<P> {
42    id: RequestId,
43    inner: InnerClientHandlerState<P>,
44}
45
46impl<P> Default for ClientHandlerState<P> {
47    fn default() -> Self {
48        Self {
49            id: RequestId::from_u64(0),
50            inner: InnerClientHandlerState::Neither,
51        }
52    }
53}
54
55#[derive(Debug)]
56enum InnerClientHandlerState<P> {
57    Neither,
58    Channel(oneshot::Sender<P>),
59    Response(P),
60}
61
62impl<P> InnerClientHandlerState<P> {
63    fn take(&mut self) -> Self {
64        mem::replace(self, Self::Neither)
65    }
66}
67
68impl<P> ClientHandlerState<P> {
69    fn request(&mut self, id: RequestId, channel: oneshot::Sender<P>) {
70        match id.cmp(&self.id) {
71            Ordering::Less => {} // ignore
72            Ordering::Equal => match self.inner.take() {
73                InnerClientHandlerState::Neither | InnerClientHandlerState::Channel(_) => {
74                    self.inner = InnerClientHandlerState::Channel(channel)
75                }
76                InnerClientHandlerState::Response(payload) => {
77                    let _ = channel.send(payload);
78                    self.next_id();
79                }
80            },
81            Ordering::Greater => {
82                *self = Self {
83                    id,
84                    inner: InnerClientHandlerState::Channel(channel),
85                }
86            }
87        }
88    }
89
90    fn response(&mut self, id: RequestId, payload: P) {
91        match id.cmp(&self.id) {
92            Ordering::Less => {} // ignore
93            Ordering::Equal => match self.inner.take() {
94                InnerClientHandlerState::Neither => {
95                    self.inner = InnerClientHandlerState::Response(payload)
96                }
97                InnerClientHandlerState::Channel(channel) => {
98                    let _ = channel.send(payload);
99                    self.next_id();
100                }
101                InnerClientHandlerState::Response(_) => {} // ignore
102            },
103            Ordering::Greater => {
104                *self = Self {
105                    id,
106                    inner: InnerClientHandlerState::Response(payload),
107                }
108            }
109        }
110    }
111
112    fn next_id(&mut self) {
113        let id = self.id.as_mut_u64();
114        *id = id.checked_add(1).unwrap();
115    }
116}