1use std::{
2 cmp::Ordering,
3 collections::{HashMap, HashSet},
4 fmt::Debug,
5 mem,
6};
7
8use anyhow::Error;
9use blake2::Blake2b;
10pub use blake2::Digest;
11pub use ed25519_dalek::Signature;
12use rand::rngs::OsRng;
13
14use abcperf::{
15 atomic_broadcast::{AtomicBroadcast, AtomicBroadcastChannels, AtomicBroadcastConfiguration},
16 MessageDestination,
17};
18use async_trait::async_trait;
19use ed25519_dalek::SigningKey;
20use relay::ResponseRelay;
21use serde::{Deserialize, Serialize};
22use shared_ids::{AnyId, ClientId, ReplicaId, RequestId};
23use tokio::{select, sync::mpsc, task::JoinHandle};
24use tracing::Instrument;
25
26mod relay;
27
// Context bytes handed to `sign_prehashed` whenever this proxy signs a response.
static ED25519_CONTEXT: &[u8] = b"abcperf-client-proxy-sig";
29
/// A response of type `S` bundled with a [`Signature`].
///
/// Replicas exchange this inside [`CustomReplicaMessage::Response`].
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct SignedResponse<S> {
    // NOTE(review): `messsage` is misspelled, but the field is destructured by
    // name where responses are received and is covered by the serde derives, so
    // renaming it presumably changes the serialized form — confirm before fixing.
    messsage: S,
    // Signature that accompanies `messsage`.
    signature: Signature,
}
35
36impl<S> SignedResponse<S> {
37 fn new(messsage: S, signature: Signature) -> Self {
38 Self {
39 messsage,
40 signature,
41 }
42 }
43}
44
/// Replica-to-replica message of the proxy layer.
///
/// `M` is the inner algorithm's replica message, `R` its transaction type and
/// `S` its decision (response) type.
#[derive(Debug, Serialize, Deserialize)]
pub enum CustomReplicaMessage<M, R, S> {
    /// A message of the wrapped algorithm; passed through unchanged.
    ReplicaMessage(M),
    /// A client request, together with the id of the issuing client, that a
    /// replica broadcasts to the others.
    RequestBroadcast((ClientId, R)),
    /// A response signed by the sending replica.
    Response(SignedResponse<S>),
}
51
/// Metadata the proxy needs from a response in order to track and sign it.
pub trait ResponseInfo {
    /// Id of the client the response is addressed to.
    fn client_id(&self) -> ClientId;
    /// Id of the request this response answers.
    fn request_id(&self) -> RequestId;
    /// Feeds the response into `digest`; the proxy signs the resulting digest.
    fn hash_with_digest<D: Digest>(&self, digest: &mut D);
}
57
/// Variant of [`ResponseInfo`] for responses that do not carry a client id
/// themselves. A `(ClientId, T)` pair of such a response implements
/// [`ResponseInfo`].
pub trait ResponseInfoNoClientId {
    /// Id of the request this response answers.
    fn request_id(&self) -> RequestId;
    /// Feeds the response into `digest`.
    fn hash_with_digest<D: Digest>(&self, digest: &mut D);
}
62
63impl<T: ResponseInfoNoClientId> ResponseInfo for (ClientId, T) {
64 fn client_id(&self) -> ClientId {
65 self.0
66 }
67
68 fn request_id(&self) -> RequestId {
69 self.1.request_id()
70 }
71
72 fn hash_with_digest<D: Digest>(&self, digest: &mut D) {
73 digest.update(self.client_id().as_u64().to_be_bytes());
74 self.1.hash_with_digest(digest);
75 }
76}
77
/// Adapter that wraps an [`AtomicBroadcast`] implementation `A` and itself
/// implements [`AtomicBroadcast`], adding request broadcasting and signed
/// response collection around the inner algorithm.
pub struct ClientProxyAdapter<A: AtomicBroadcast> {
    // The wrapped atomic broadcast algorithm.
    inner: A,
}
81
82impl<A: AtomicBroadcast> ClientProxyAdapter<A> {
83 pub fn new(inner: A) -> Self {
84 Self { inner }
85 }
86}
87
/// Per-client bookkeeping: the request id currently being tracked for a
/// client and the responses collected for it.
enum ClientState<R> {
    /// No response has been seen for this client yet.
    Empty,
    /// Responses for `request_id` are being collected.
    Filled {
        // Request id the collected responses belong to.
        request_id: RequestId,
        // Responses and signatures gathered so far for `request_id`.
        response_state: ResponseState<R>,
    },
}
95
96impl<R> Default for ClientState<R> {
97 fn default() -> Self {
98 Self::Empty
99 }
100}
101
102impl<R: ResponseInfo + Eq> ClientState<R> {
103 fn local_response(&mut self, response: R) {
104 let new_id = if let Self::Filled {
105 request_id,
106 response_state,
107 } = self
108 {
109 let id = response.request_id();
110 match id.cmp(request_id) {
111 Ordering::Less => return,
112 Ordering::Equal => {
113 response_state.local_response(response);
114 return;
115 }
116 Ordering::Greater => id,
117 }
118 } else {
119 response.request_id()
120 };
121 *self = Self::Filled {
122 request_id: new_id,
123 response_state: ResponseState::from_local(response),
124 }
125 }
126
127 fn remote_response(&mut self, replica_id: ReplicaId, response: R, signature: Signature) {
128 let new_id = if let Self::Filled {
129 request_id,
130 response_state,
131 } = self
132 {
133 let id = response.request_id();
134 match id.cmp(request_id) {
135 Ordering::Less => return,
136 Ordering::Equal => {
137 response_state.remote_response(replica_id, response, signature);
138 return;
139 }
140 Ordering::Greater => id,
141 }
142 } else {
143 response.request_id()
144 };
145 *self = Self::Filled {
146 request_id: new_id,
147 response_state: ResponseState::from_remote(replica_id, response, signature),
148 }
149 }
150
151 fn get_response(&mut self, t: u64) -> Option<(R, Box<[Signature]>)> {
152 if let Self::Filled {
153 request_id: _,
154 response_state: response_state_ref,
155 } = self
156 {
157 let mut result = None;
158 let response_state = mem::replace(response_state_ref, ResponseState::Empty);
159 let response_state = response_state.get_response(t, &mut result);
160 *response_state_ref = response_state;
161 result
162 } else {
163 None
164 }
165 }
166}
167
168enum ResponseState<R> {
169 Empty,
170 Local {
171 local: R,
172 confirmed_singatures: Vec<Signature>,
173 confirmed_replicas: HashSet<ReplicaId>,
174 },
175 RemoteOnly {
176 unconfirmed_signatures: Vec<(ReplicaId, R, Signature)>,
177 },
178}
179
180impl<R: Eq> ResponseState<R> {
181 fn from_local(response: R) -> Self {
182 Self::Local {
183 local: response,
184 confirmed_singatures: Default::default(),
185 confirmed_replicas: Default::default(),
186 }
187 }
188
189 fn from_remote(replica_id: ReplicaId, response: R, signature: Signature) -> Self {
190 Self::RemoteOnly {
191 unconfirmed_signatures: vec![(replica_id, response, signature)],
192 }
193 }
194
195 fn local_response(&mut self, response: R) {
196 match self {
197 ResponseState::Local { .. } => unreachable!(),
198 ResponseState::RemoteOnly {
199 unconfirmed_signatures,
200 } => {
201 let mut confirmed_singatures = Vec::new();
202 let mut confirmed_replicas = HashSet::new();
203 for (id, r, s) in mem::take(unconfirmed_signatures).into_iter() {
204 if r == response && confirmed_replicas.insert(id) {
205 confirmed_singatures.push(s);
206 }
207 }
208 *self = Self::Local {
209 confirmed_singatures,
210 confirmed_replicas,
211 local: response,
212 }
213 }
214 ResponseState::Empty => *self = Self::from_local(response),
215 }
216 }
217
218 fn remote_response(&mut self, replica_id: ReplicaId, response: R, signature: Signature) {
219 match self {
220 ResponseState::Local {
221 local,
222 confirmed_singatures,
223 confirmed_replicas,
224 } => {
225 if local == &response && confirmed_replicas.insert(replica_id) {
226 confirmed_singatures.push(signature);
227 }
228 }
229 ResponseState::RemoteOnly {
230 unconfirmed_signatures,
231 } => {
232 unconfirmed_signatures.push((replica_id, response, signature));
233 }
234 ResponseState::Empty => *self = Self::from_remote(replica_id, response, signature),
235 }
236 }
237
238 fn get_response(self, t: u64, result: &mut Option<(R, Box<[Signature]>)>) -> Self {
239 match self {
240 ResponseState::Local {
241 local,
242 confirmed_singatures,
243 confirmed_replicas,
244 } if confirmed_singatures.len() as u64 >= t => {
245 assert_eq!(confirmed_singatures.len(), confirmed_replicas.len());
246 *result = Some((local, confirmed_singatures.into()));
247 Self::Empty
248 }
249 this => this,
250 }
251 }
252}
253
254#[async_trait]
255impl<A: AtomicBroadcast + Send> AtomicBroadcast for ClientProxyAdapter<A>
256where
257 A::Transaction: Unpin + Clone + AsRef<RequestId>,
258 A::Decision: Unpin + Clone + Eq + ResponseInfo,
259{
260 type Config = A::Config;
261
262 type ReplicaMessage = CustomReplicaMessage<A::ReplicaMessage, A::Transaction, A::Decision>;
263
264 type Transaction = A::Transaction;
265
266 type Decision = (A::Decision, Box<[Signature]>);
267
268 fn start(
269 self,
270 config: AtomicBroadcastConfiguration<Self::Config>,
271 channels: AtomicBroadcastChannels<Self::ReplicaMessage, Self::Transaction, Self::Decision>,
272 ready_for_clients: impl Send + 'static + FnOnce() + Sync,
273 ) -> JoinHandle<Result<(), Error>> {
274 tokio::spawn(async move {
275 let AtomicBroadcastChannels {
276 mut incoming_replica_messages,
277 outgoing_replica_messages,
278 mut requests,
279 responses,
280 } = channels;
281
282 let priv_key = SigningKey::generate(&mut OsRng::default());
283 let t = config.t;
284
285 let (client_requests_send, client_requests_recv) = mpsc::channel(1000);
286
287 let (incoming_send, incoming_recv) = mpsc::channel(1000);
288 let (client_responses_send, mut client_responses_recv) =
289 mpsc::channel::<A::Decision>(1000);
290
291 let incoming_handler = tokio::spawn({
292 let outgoing_replica_messages = outgoing_replica_messages.clone();
293 async move {
294 let mut state: HashMap<ClientId, ClientState<<A as AtomicBroadcast>::Decision>> = HashMap::<ClientId, ClientState<A::Decision>>::new();
295 let mut relay = ResponseRelay::<A>::new(outgoing_replica_messages.clone());
296 loop {
297 select! {
298 Some((msg_type, id, m)) = incoming_replica_messages.recv() => {
299 match m {
300 CustomReplicaMessage::ReplicaMessage(m) => {
301 let _ = incoming_send.send((msg_type, id, m)).await;
302 }
303 CustomReplicaMessage::RequestBroadcast(r) => {
304 relay.on_remote_request(r.0, *r.1.as_ref(), id).await;
305 let _ = client_requests_send.try_send(r);
306 }
307 CustomReplicaMessage::Response(SignedResponse { messsage, signature }) => {
308 let state = state.entry(messsage.client_id()).or_default();
309 state.remote_response(id, messsage, signature);
310 if let Some(m) = state.get_response(t) {
311 let _ = responses.send(m).await;
312 }
313 }
314 }
315 }
316 Some(r) = client_responses_recv.recv() => {
317 let state = state.entry(r.client_id()).or_default();
318 state.local_response(r.clone());
319 if let Some(m) = state.get_response(t) {
320 let _ = responses.send(m).await;
321 }
322 let mut hasher = Blake2b::new();
323 r.hash_with_digest(&mut hasher);
324 let sig = priv_key.sign_prehashed(hasher, Some(ED25519_CONTEXT)).unwrap();
325 let client_id = r.client_id();
326 let request_id = r.request_id();
327 let response = SignedResponse::new(r, sig);
328 relay.on_response(client_id, request_id, response).await
329 }
330 Some(r) = requests.recv() => {
331 relay.on_local_request(r.0, *r.1.as_ref());
332 let _ = client_requests_send.try_send(r.clone());
333 match outgoing_replica_messages
334 .send((
335 MessageDestination::Broadcast,
336 CustomReplicaMessage::RequestBroadcast(r),
337 ))
338 .await
339 {
340 Ok(()) => {}
341 Err(_) => break,
342 }
343 }
344 else => break
345 }
346 }
347 }.in_current_span()
348 });
349
350 let (outgoing_send, mut outgoing_recv) = mpsc::channel(1000);
351 let outgoing_handler = tokio::spawn(
352 async move {
353 while let Some((dest, m)) = outgoing_recv.recv().await {
354 match outgoing_replica_messages
355 .send((dest, CustomReplicaMessage::ReplicaMessage(m)))
356 .await
357 {
358 Ok(()) => {}
359 Err(_) => break,
360 }
361 }
362 }
363 .in_current_span(),
364 );
365
366 self.inner
367 .start(
368 config,
369 AtomicBroadcastChannels {
370 incoming_replica_messages: incoming_recv,
371 outgoing_replica_messages: outgoing_send,
372 requests: client_requests_recv,
373 responses: client_responses_send,
374 },
375 ready_for_clients,
376 )
377 .await
378 .unwrap()
379 .unwrap();
380
381 incoming_handler.await.unwrap();
382 outgoing_handler.await.unwrap();
383 Ok(())
384 })
385 }
386}