abcperf_generic_client/
response.rs1use std::{cmp::Ordering, collections::HashMap, mem};
2
3use shared_ids::{AnyId, ClientId, RequestId};
4use tokio::sync::oneshot;
5
6#[derive(Debug)]
7pub struct ResponseHandler<P> {
8 clients: HashMap<ClientId, ClientHandlerState<P>>,
9}
10
11impl<P> Default for ResponseHandler<P> {
12 fn default() -> Self {
13 Self {
14 clients: HashMap::new(),
15 }
16 }
17}
18
19impl<P> ResponseHandler<P> {
20 pub fn request(
21 &mut self,
22 client_id: ClientId,
23 request_id: RequestId,
24 channel: oneshot::Sender<P>,
25 ) {
26 self.clients
27 .entry(client_id)
28 .or_default()
29 .request(request_id, channel)
30 }
31
32 pub fn response(&mut self, client_id: ClientId, request_id: RequestId, payload: P) {
33 self.clients
34 .entry(client_id)
35 .or_default()
36 .response(request_id, payload)
37 }
38}
39
40#[derive(Debug)]
41struct ClientHandlerState<P> {
42 id: RequestId,
43 inner: InnerClientHandlerState<P>,
44}
45
46impl<P> Default for ClientHandlerState<P> {
47 fn default() -> Self {
48 Self {
49 id: RequestId::from_u64(0),
50 inner: InnerClientHandlerState::Neither,
51 }
52 }
53}
54
55#[derive(Debug)]
56enum InnerClientHandlerState<P> {
57 Neither,
58 Channel(oneshot::Sender<P>),
59 Response(P),
60}
61
62impl<P> InnerClientHandlerState<P> {
63 fn take(&mut self) -> Self {
64 mem::replace(self, Self::Neither)
65 }
66}
67
68impl<P> ClientHandlerState<P> {
69 fn request(&mut self, id: RequestId, channel: oneshot::Sender<P>) {
70 match id.cmp(&self.id) {
71 Ordering::Less => {} Ordering::Equal => match self.inner.take() {
73 InnerClientHandlerState::Neither | InnerClientHandlerState::Channel(_) => {
74 self.inner = InnerClientHandlerState::Channel(channel)
75 }
76 InnerClientHandlerState::Response(payload) => {
77 let _ = channel.send(payload);
78 self.next_id();
79 }
80 },
81 Ordering::Greater => {
82 *self = Self {
83 id,
84 inner: InnerClientHandlerState::Channel(channel),
85 }
86 }
87 }
88 }
89
90 fn response(&mut self, id: RequestId, payload: P) {
91 match id.cmp(&self.id) {
92 Ordering::Less => {} Ordering::Equal => match self.inner.take() {
94 InnerClientHandlerState::Neither => {
95 self.inner = InnerClientHandlerState::Response(payload)
96 }
97 InnerClientHandlerState::Channel(channel) => {
98 let _ = channel.send(payload);
99 self.next_id();
100 }
101 InnerClientHandlerState::Response(_) => {} },
103 Ordering::Greater => {
104 *self = Self {
105 id,
106 inner: InnerClientHandlerState::Response(payload),
107 }
108 }
109 }
110 }
111
112 fn next_id(&mut self) {
113 let id = self.id.as_mut_u64();
114 *id = id.checked_add(1).unwrap();
115 }
116}