abcperf_client_proxy/
lib.rs

1use std::{
2    cmp::Ordering,
3    collections::{HashMap, HashSet},
4    fmt::Debug,
5    mem,
6};
7
8use anyhow::Error;
9use blake2::Blake2b;
10pub use blake2::Digest;
11pub use ed25519_dalek::Signature;
12use rand::rngs::OsRng;
13
14use abcperf::{
15    atomic_broadcast::{AtomicBroadcast, AtomicBroadcastChannels, AtomicBroadcastConfiguration},
16    MessageDestination,
17};
18use async_trait::async_trait;
19use ed25519_dalek::SigningKey;
20use relay::ResponseRelay;
21use serde::{Deserialize, Serialize};
22use shared_ids::{AnyId, ClientId, ReplicaId, RequestId};
23use tokio::{select, sync::mpsc, task::JoinHandle};
24use tracing::Instrument;
25
26mod relay;
27
28static ED25519_CONTEXT: &[u8] = b"abcperf-client-proxy-sig";
29
30#[derive(Debug, Serialize, Deserialize, Clone)]
31pub struct SignedResponse<S> {
32    messsage: S,
33    signature: Signature,
34}
35
36impl<S> SignedResponse<S> {
37    fn new(messsage: S, signature: Signature) -> Self {
38        Self {
39            messsage,
40            signature,
41        }
42    }
43}
44
45#[derive(Debug, Serialize, Deserialize)]
46pub enum CustomReplicaMessage<M, R, S> {
47    ReplicaMessage(M),
48    RequestBroadcast((ClientId, R)),
49    Response(SignedResponse<S>),
50}
51
52pub trait ResponseInfo {
53    fn client_id(&self) -> ClientId;
54    fn request_id(&self) -> RequestId;
55    fn hash_with_digest<D: Digest>(&self, digest: &mut D);
56}
57
58pub trait ResponseInfoNoClientId {
59    fn request_id(&self) -> RequestId;
60    fn hash_with_digest<D: Digest>(&self, digest: &mut D);
61}
62
63impl<T: ResponseInfoNoClientId> ResponseInfo for (ClientId, T) {
64    fn client_id(&self) -> ClientId {
65        self.0
66    }
67
68    fn request_id(&self) -> RequestId {
69        self.1.request_id()
70    }
71
72    fn hash_with_digest<D: Digest>(&self, digest: &mut D) {
73        digest.update(self.client_id().as_u64().to_be_bytes());
74        self.1.hash_with_digest(digest);
75    }
76}
77
78pub struct ClientProxyAdapter<A: AtomicBroadcast> {
79    inner: A,
80}
81
82impl<A: AtomicBroadcast> ClientProxyAdapter<A> {
83    pub fn new(inner: A) -> Self {
84        Self { inner }
85    }
86}
87
88enum ClientState<R> {
89    Empty,
90    Filled {
91        request_id: RequestId,
92        response_state: ResponseState<R>,
93    },
94}
95
96impl<R> Default for ClientState<R> {
97    fn default() -> Self {
98        Self::Empty
99    }
100}
101
102impl<R: ResponseInfo + Eq> ClientState<R> {
103    fn local_response(&mut self, response: R) {
104        let new_id = if let Self::Filled {
105            request_id,
106            response_state,
107        } = self
108        {
109            let id = response.request_id();
110            match id.cmp(request_id) {
111                Ordering::Less => return,
112                Ordering::Equal => {
113                    response_state.local_response(response);
114                    return;
115                }
116                Ordering::Greater => id,
117            }
118        } else {
119            response.request_id()
120        };
121        *self = Self::Filled {
122            request_id: new_id,
123            response_state: ResponseState::from_local(response),
124        }
125    }
126
127    fn remote_response(&mut self, replica_id: ReplicaId, response: R, signature: Signature) {
128        let new_id = if let Self::Filled {
129            request_id,
130            response_state,
131        } = self
132        {
133            let id = response.request_id();
134            match id.cmp(request_id) {
135                Ordering::Less => return,
136                Ordering::Equal => {
137                    response_state.remote_response(replica_id, response, signature);
138                    return;
139                }
140                Ordering::Greater => id,
141            }
142        } else {
143            response.request_id()
144        };
145        *self = Self::Filled {
146            request_id: new_id,
147            response_state: ResponseState::from_remote(replica_id, response, signature),
148        }
149    }
150
151    fn get_response(&mut self, t: u64) -> Option<(R, Box<[Signature]>)> {
152        if let Self::Filled {
153            request_id: _,
154            response_state: response_state_ref,
155        } = self
156        {
157            let mut result = None;
158            let response_state = mem::replace(response_state_ref, ResponseState::Empty);
159            let response_state = response_state.get_response(t, &mut result);
160            *response_state_ref = response_state;
161            result
162        } else {
163            None
164        }
165    }
166}
167
168enum ResponseState<R> {
169    Empty,
170    Local {
171        local: R,
172        confirmed_singatures: Vec<Signature>,
173        confirmed_replicas: HashSet<ReplicaId>,
174    },
175    RemoteOnly {
176        unconfirmed_signatures: Vec<(ReplicaId, R, Signature)>,
177    },
178}
179
180impl<R: Eq> ResponseState<R> {
181    fn from_local(response: R) -> Self {
182        Self::Local {
183            local: response,
184            confirmed_singatures: Default::default(),
185            confirmed_replicas: Default::default(),
186        }
187    }
188
189    fn from_remote(replica_id: ReplicaId, response: R, signature: Signature) -> Self {
190        Self::RemoteOnly {
191            unconfirmed_signatures: vec![(replica_id, response, signature)],
192        }
193    }
194
195    fn local_response(&mut self, response: R) {
196        match self {
197            ResponseState::Local { .. } => unreachable!(),
198            ResponseState::RemoteOnly {
199                unconfirmed_signatures,
200            } => {
201                let mut confirmed_singatures = Vec::new();
202                let mut confirmed_replicas = HashSet::new();
203                for (id, r, s) in mem::take(unconfirmed_signatures).into_iter() {
204                    if r == response && confirmed_replicas.insert(id) {
205                        confirmed_singatures.push(s);
206                    }
207                }
208                *self = Self::Local {
209                    confirmed_singatures,
210                    confirmed_replicas,
211                    local: response,
212                }
213            }
214            ResponseState::Empty => *self = Self::from_local(response),
215        }
216    }
217
218    fn remote_response(&mut self, replica_id: ReplicaId, response: R, signature: Signature) {
219        match self {
220            ResponseState::Local {
221                local,
222                confirmed_singatures,
223                confirmed_replicas,
224            } => {
225                if local == &response && confirmed_replicas.insert(replica_id) {
226                    confirmed_singatures.push(signature);
227                }
228            }
229            ResponseState::RemoteOnly {
230                unconfirmed_signatures,
231            } => {
232                unconfirmed_signatures.push((replica_id, response, signature));
233            }
234            ResponseState::Empty => *self = Self::from_remote(replica_id, response, signature),
235        }
236    }
237
238    fn get_response(self, t: u64, result: &mut Option<(R, Box<[Signature]>)>) -> Self {
239        match self {
240            ResponseState::Local {
241                local,
242                confirmed_singatures,
243                confirmed_replicas,
244            } if confirmed_singatures.len() as u64 >= t => {
245                assert_eq!(confirmed_singatures.len(), confirmed_replicas.len());
246                *result = Some((local, confirmed_singatures.into()));
247                Self::Empty
248            }
249            this => this,
250        }
251    }
252}
253
254#[async_trait]
255impl<A: AtomicBroadcast + Send> AtomicBroadcast for ClientProxyAdapter<A>
256where
257    A::Transaction: Unpin + Clone + AsRef<RequestId>,
258    A::Decision: Unpin + Clone + Eq + ResponseInfo,
259{
260    type Config = A::Config;
261
262    type ReplicaMessage = CustomReplicaMessage<A::ReplicaMessage, A::Transaction, A::Decision>;
263
264    type Transaction = A::Transaction;
265
266    type Decision = (A::Decision, Box<[Signature]>);
267
268    fn start(
269        self,
270        config: AtomicBroadcastConfiguration<Self::Config>,
271        channels: AtomicBroadcastChannels<Self::ReplicaMessage, Self::Transaction, Self::Decision>,
272        ready_for_clients: impl Send + 'static + FnOnce() + Sync,
273    ) -> JoinHandle<Result<(), Error>> {
274        tokio::spawn(async move {
275            let AtomicBroadcastChannels {
276                mut incoming_replica_messages,
277                outgoing_replica_messages,
278                mut requests,
279                responses,
280            } = channels;
281
282            let priv_key = SigningKey::generate(&mut OsRng::default());
283            let t = config.t;
284
285            let (client_requests_send, client_requests_recv) = mpsc::channel(1000);
286
287            let (incoming_send, incoming_recv) = mpsc::channel(1000);
288            let (client_responses_send, mut client_responses_recv) =
289                mpsc::channel::<A::Decision>(1000);
290
291            let incoming_handler = tokio::spawn({
292                let outgoing_replica_messages = outgoing_replica_messages.clone();
293                async move {
294                    let mut state: HashMap<ClientId, ClientState<<A as AtomicBroadcast>::Decision>> = HashMap::<ClientId, ClientState<A::Decision>>::new();
295                    let mut relay = ResponseRelay::<A>::new(outgoing_replica_messages.clone());
296                    loop {
297                        select! {
298                            Some((msg_type, id, m)) = incoming_replica_messages.recv() => {
299                                match m {
300                                    CustomReplicaMessage::ReplicaMessage(m) => {
301                                        let _ = incoming_send.send((msg_type, id, m)).await;
302                                    }
303                                    CustomReplicaMessage::RequestBroadcast(r) => {
304                                        relay.on_remote_request(r.0, *r.1.as_ref(), id).await;
305                                        let _ = client_requests_send.try_send(r);
306                                    }
307                                    CustomReplicaMessage::Response(SignedResponse { messsage, signature }) => {
308                                        let state = state.entry(messsage.client_id()).or_default();
309                                        state.remote_response(id, messsage, signature);
310                                        if let Some(m) = state.get_response(t) {
311                                            let _ = responses.send(m).await;
312                                        }
313                                    }
314                                }
315                            }
316                            Some(r) = client_responses_recv.recv() => {
317                                let state = state.entry(r.client_id()).or_default();
318                                state.local_response(r.clone());
319                                if let Some(m) = state.get_response(t) {
320                                    let _ = responses.send(m).await;
321                                }
322                                let mut hasher = Blake2b::new();
323                                r.hash_with_digest(&mut hasher);
324                                let sig = priv_key.sign_prehashed(hasher, Some(ED25519_CONTEXT)).unwrap();
325                                let client_id = r.client_id();
326                                let request_id = r.request_id();
327                                let response = SignedResponse::new(r, sig);
328                                relay.on_response(client_id, request_id, response).await
329                            }
330                            Some(r) = requests.recv() => {
331                                relay.on_local_request(r.0, *r.1.as_ref());
332                                let _ = client_requests_send.try_send(r.clone());
333                                match outgoing_replica_messages
334                                    .send((
335                                        MessageDestination::Broadcast,
336                                        CustomReplicaMessage::RequestBroadcast(r),
337                                    ))
338                                    .await
339                                {
340                                    Ok(()) => {}
341                                    Err(_) => break,
342                                }
343                            }
344                            else => break
345                        }
346                    }
347                }.in_current_span()
348            });
349
350            let (outgoing_send, mut outgoing_recv) = mpsc::channel(1000);
351            let outgoing_handler = tokio::spawn(
352                async move {
353                    while let Some((dest, m)) = outgoing_recv.recv().await {
354                        match outgoing_replica_messages
355                            .send((dest, CustomReplicaMessage::ReplicaMessage(m)))
356                            .await
357                        {
358                            Ok(()) => {}
359                            Err(_) => break,
360                        }
361                    }
362                }
363                .in_current_span(),
364            );
365
366            self.inner
367                .start(
368                    config,
369                    AtomicBroadcastChannels {
370                        incoming_replica_messages: incoming_recv,
371                        outgoing_replica_messages: outgoing_send,
372                        requests: client_requests_recv,
373                        responses: client_responses_send,
374                    },
375                    ready_for_clients,
376                )
377                .await
378                .unwrap()
379                .unwrap();
380
381            incoming_handler.await.unwrap();
382            outgoing_handler.await.unwrap();
383            Ok(())
384        })
385    }
386}