//! abcperf-client-proxy 0.10.0
//!
//! Client Request Proxy for ABCperf
//!
//! Documentation
use std::{
    cmp::Ordering,
    collections::{HashMap, HashSet},
    fmt::Debug,
    mem,
};

use anyhow::Error;
use blake2::Blake2b;
pub use blake2::Digest;
pub use ed25519_dalek::Signature;
use rand::rngs::OsRng;

use abcperf::{
    atomic_broadcast::{AtomicBroadcast, AtomicBroadcastChannels, AtomicBroadcastConfiguration},
    MessageDestination,
};
use async_trait::async_trait;
use ed25519_dalek::SigningKey;
use relay::ResponseRelay;
use serde::{Deserialize, Serialize};
use shared_ids::{AnyId, ClientId, ReplicaId, RequestId};
use tokio::{select, sync::mpsc, task::JoinHandle};
use tracing::Instrument;

mod relay;

/// Context bytes passed to `SigningKey::sign_prehashed` when this module signs a response.
static ED25519_CONTEXT: &[u8] = b"abcperf-client-proxy-sig";

/// A message of type `S` bundled with an Ed25519 [`Signature`].
///
/// The type itself does not verify the signature; it only carries both values.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct SignedResponse<S> {
    // NOTE(review): `messsage` is misspelled. Renaming it requires updating every
    // destructuring pattern of this struct in the file and changes the field name
    // seen by self-describing serde formats.
    /// The payload the signature belongs to.
    messsage: S,
    /// Signature accompanying `messsage`.
    signature: Signature,
}

impl<S> SignedResponse<S> {
    /// Bundles `messsage` with `signature` without performing any verification.
    fn new(messsage: S, signature: Signature) -> Self {
        Self {
            messsage,
            signature,
        }
    }
}

/// Replica-to-replica message used by [`ClientProxyAdapter`].
///
/// * `M` – replica message type of the wrapped atomic broadcast.
/// * `R` – client request (transaction) type.
/// * `S` – response (decision) type carried inside a [`SignedResponse`].
#[derive(Debug, Serialize, Deserialize)]
pub enum CustomReplicaMessage<M, R, S> {
    /// A message of the wrapped atomic broadcast, passed through unchanged.
    ReplicaMessage(M),
    /// A client request together with the id of the client it belongs to.
    RequestBroadcast((ClientId, R)),
    /// A response accompanied by a signature.
    Response(SignedResponse<S>),
}

/// Metadata the proxy needs from a response in order to track and sign it.
pub trait ResponseInfo {
    /// Id of the client this response belongs to.
    fn client_id(&self) -> ClientId;
    /// Id of the request this response belongs to.
    fn request_id(&self) -> RequestId;
    /// Feeds this response into `digest`; the proxy signs the resulting hash state.
    fn hash_with_digest<D: Digest>(&self, digest: &mut D);
}

/// Like [`ResponseInfo`] but for response types that do not carry a client id themselves.
///
/// A `(ClientId, T)` tuple of such a type implements [`ResponseInfo`].
pub trait ResponseInfoNoClientId {
    /// Id of the request this response belongs to.
    fn request_id(&self) -> RequestId;
    /// Feeds this response into `digest`.
    fn hash_with_digest<D: Digest>(&self, digest: &mut D);
}

/// Lifts a response without client id into a full [`ResponseInfo`] by pairing it with the
/// [`ClientId`] stored in the first tuple element.
impl<T: ResponseInfoNoClientId> ResponseInfo for (ClientId, T) {
    /// Returns the first tuple element.
    fn client_id(&self) -> ClientId {
        let (client_id, _) = self;
        *client_id
    }

    /// Delegates to the wrapped response in the second tuple element.
    fn request_id(&self) -> RequestId {
        let (_, inner) = self;
        inner.request_id()
    }

    /// Hashes the client id as eight big-endian bytes, followed by the wrapped response.
    fn hash_with_digest<D: Digest>(&self, digest: &mut D) {
        let client_id: ClientId = self.0;
        let client_bytes = client_id.as_u64().to_be_bytes();
        digest.update(client_bytes);

        let (_, inner) = self;
        inner.hash_with_digest(digest);
    }
}

/// Wraps an [`AtomicBroadcast`] implementation `A`; the adapter itself implements
/// [`AtomicBroadcast`] and delegates to `inner` when started.
pub struct ClientProxyAdapter<A: AtomicBroadcast> {
    /// The wrapped atomic broadcast implementation.
    inner: A,
}

impl<A: AtomicBroadcast> ClientProxyAdapter<A> {
    /// Creates an adapter around `inner`.
    pub fn new(inner: A) -> Self {
        Self { inner }
    }
}

/// Per-client bookkeeping of the request whose responses are currently being collected.
enum ClientState<R> {
    /// No response has been recorded for this client yet.
    Empty,
    /// Responses for `request_id` are being collected in `response_state`.
    Filled {
        /// Request id the collected responses belong to.
        request_id: RequestId,
        /// Responses and signatures gathered so far for `request_id`.
        response_state: ResponseState<R>,
    },
}

/// A new client entry starts out as [`ClientState::Empty`].
impl<R> Default for ClientState<R> {
    fn default() -> Self {
        Self::Empty
    }
}

impl<R: ResponseInfo + Eq> ClientState<R> {
    /// Records the response produced locally.
    ///
    /// * Older request id than the tracked one: the response is ignored.
    /// * Same request id: the response is merged into the current [`ResponseState`].
    /// * Newer request id (or nothing tracked yet): tracking restarts from this response.
    fn local_response(&mut self, response: R) {
        let incoming_id = response.request_id();

        if let Self::Filled {
            request_id,
            response_state,
        } = self
        {
            match incoming_id.cmp(request_id) {
                Ordering::Less => return,
                Ordering::Equal => {
                    response_state.local_response(response);
                    return;
                }
                // Fall through and replace the tracked request below.
                Ordering::Greater => {}
            }
        }

        *self = Self::Filled {
            request_id: incoming_id,
            response_state: ResponseState::from_local(response),
        };
    }

    /// Records a response received from replica `replica_id` together with its `signature`.
    ///
    /// Request ids are handled exactly as in [`Self::local_response`].
    fn remote_response(&mut self, replica_id: ReplicaId, response: R, signature: Signature) {
        let incoming_id = response.request_id();

        if let Self::Filled {
            request_id,
            response_state,
        } = self
        {
            match incoming_id.cmp(request_id) {
                Ordering::Less => return,
                Ordering::Equal => {
                    response_state.remote_response(replica_id, response, signature);
                    return;
                }
                // Fall through and replace the tracked request below.
                Ordering::Greater => {}
            }
        }

        *self = Self::Filled {
            request_id: incoming_id,
            response_state: ResponseState::from_remote(replica_id, response, signature),
        };
    }

    /// Hands out the response and its collected signatures once at least `t` signatures
    /// have been confirmed; returns `None` otherwise.
    fn get_response(&mut self, t: u64) -> Option<(R, Box<[Signature]>)> {
        match self {
            Self::Empty => None,
            Self::Filled { response_state, .. } => {
                let mut delivered = None;
                // `ResponseState::get_response` consumes the state, so move it out
                // temporarily and store whatever state it returns.
                let current = mem::replace(response_state, ResponseState::Empty);
                *response_state = current.get_response(t, &mut delivered);
                delivered
            }
        }
    }
}

/// Responses and signatures collected for a single request.
enum ResponseState<R> {
    /// Nothing is recorded. Also the state left behind after a response was handed out.
    Empty,
    /// The local response is known; remote signatures are only kept if the remote
    /// response equals `local`.
    Local {
        /// The locally produced response.
        local: R,
        /// Signatures of remote responses equal to `local`, at most one per replica.
        confirmed_signatures: Vec<Signature>,
        /// Replicas that contributed an entry to `confirmed_signatures`.
        confirmed_replicas: HashSet<ReplicaId>,
    },
    /// Only remote responses are known so far; they cannot be compared against a local
    /// response yet and are therefore kept in full.
    RemoteOnly {
        /// Every remote response received so far, with its sender and signature.
        unconfirmed_signatures: Vec<(ReplicaId, R, Signature)>,
    },
}

impl<R: Eq> ResponseState<R> {
    /// Creates the state for a request whose first known response is the local one.
    fn from_local(response: R) -> Self {
        Self::Local {
            local: response,
            confirmed_signatures: Default::default(),
            confirmed_replicas: Default::default(),
        }
    }

    /// Creates the state for a request whose first known response came from `replica_id`.
    fn from_remote(replica_id: ReplicaId, response: R, signature: Signature) -> Self {
        Self::RemoteOnly {
            unconfirmed_signatures: vec![(replica_id, response, signature)],
        }
    }

    /// Records the local response.
    ///
    /// Buffered remote responses are compared against it: signatures of equal responses
    /// are kept (one per replica), everything else is dropped.
    ///
    /// # Panics
    ///
    /// Panics if a local response is already recorded for this request.
    fn local_response(&mut self, response: R) {
        match self {
            ResponseState::Local { .. } => {
                unreachable!("a local response is already recorded for this request")
            }
            ResponseState::RemoteOnly {
                unconfirmed_signatures,
            } => {
                let mut confirmed_signatures = Vec::new();
                let mut confirmed_replicas = HashSet::new();
                for (id, r, s) in mem::take(unconfirmed_signatures) {
                    // `insert` returns `false` for a replica that was already counted.
                    if r == response && confirmed_replicas.insert(id) {
                        confirmed_signatures.push(s);
                    }
                }
                *self = Self::Local {
                    confirmed_signatures,
                    confirmed_replicas,
                    local: response,
                }
            }
            ResponseState::Empty => *self = Self::from_local(response),
        }
    }

    /// Records a response received from `replica_id`.
    ///
    /// With a known local response the signature is kept only if the responses are equal
    /// and the replica was not counted before; otherwise the response is buffered.
    fn remote_response(&mut self, replica_id: ReplicaId, response: R, signature: Signature) {
        match self {
            ResponseState::Local {
                local,
                confirmed_signatures,
                confirmed_replicas,
            } => {
                if local == &response && confirmed_replicas.insert(replica_id) {
                    confirmed_signatures.push(signature);
                }
            }
            ResponseState::RemoteOnly {
                unconfirmed_signatures,
            } => {
                unconfirmed_signatures.push((replica_id, response, signature));
            }
            ResponseState::Empty => *self = Self::from_remote(replica_id, response, signature),
        }
    }

    /// Consumes the state and returns its successor.
    ///
    /// If the local response is known and at least `t` signatures are confirmed, the
    /// response and its signatures are written to `result` and [`Self::Empty`] is
    /// returned. In every other case the state is returned unchanged and `result` is
    /// left untouched.
    fn get_response(self, t: u64, result: &mut Option<(R, Box<[Signature]>)>) -> Self {
        match self {
            ResponseState::Local {
                local,
                confirmed_signatures,
                confirmed_replicas,
            } if confirmed_signatures.len() as u64 >= t => {
                // Each confirmed signature must stem from a distinct replica.
                assert_eq!(confirmed_signatures.len(), confirmed_replicas.len());
                *result = Some((local, confirmed_signatures.into()));
                Self::Empty
            }
            this => this,
        }
    }
}

#[async_trait]
impl<A: AtomicBroadcast + Send> AtomicBroadcast for ClientProxyAdapter<A>
where
    A::Transaction: Unpin + Clone + AsRef<RequestId>,
    A::Decision: Unpin + Clone + Eq + ResponseInfo,
{
    type Config = A::Config;

    type ReplicaMessage = CustomReplicaMessage<A::ReplicaMessage, A::Transaction, A::Decision>;

    type Transaction = A::Transaction;

    type Decision = (A::Decision, Box<[Signature]>);

    fn start(
        self,
        config: AtomicBroadcastConfiguration<Self::Config>,
        channels: AtomicBroadcastChannels<Self::ReplicaMessage, Self::Transaction, Self::Decision>,
        ready_for_clients: impl Send + 'static + FnOnce() + Sync,
    ) -> JoinHandle<Result<(), Error>> {
        tokio::spawn(async move {
            let AtomicBroadcastChannels {
                mut incoming_replica_messages,
                outgoing_replica_messages,
                mut requests,
                responses,
            } = channels;

            let priv_key = SigningKey::generate(&mut OsRng::default());
            let t = config.t;

            let (client_requests_send, client_requests_recv) = mpsc::channel(1000);

            let (incoming_send, incoming_recv) = mpsc::channel(1000);
            let (client_responses_send, mut client_responses_recv) =
                mpsc::channel::<A::Decision>(1000);

            let incoming_handler = tokio::spawn({
                let outgoing_replica_messages = outgoing_replica_messages.clone();
                async move {
                    let mut state: HashMap<ClientId, ClientState<<A as AtomicBroadcast>::Decision>> = HashMap::<ClientId, ClientState<A::Decision>>::new();
                    let mut relay = ResponseRelay::<A>::new(outgoing_replica_messages.clone());
                    loop {
                        select! {
                            Some((msg_type, id, m)) = incoming_replica_messages.recv() => {
                                match m {
                                    CustomReplicaMessage::ReplicaMessage(m) => {
                                        let _ = incoming_send.send((msg_type, id, m)).await;
                                    }
                                    CustomReplicaMessage::RequestBroadcast(r) => {
                                        relay.on_remote_request(r.0, *r.1.as_ref(), id).await;
                                        let _ = client_requests_send.try_send(r);
                                    }
                                    CustomReplicaMessage::Response(SignedResponse { messsage, signature }) => {
                                        let state = state.entry(messsage.client_id()).or_default();
                                        state.remote_response(id, messsage, signature);
                                        if let Some(m) = state.get_response(t) {
                                            let _ = responses.send(m).await;
                                        }
                                    }
                                }
                            }
                            Some(r) = client_responses_recv.recv() => {
                                let state = state.entry(r.client_id()).or_default();
                                state.local_response(r.clone());
                                if let Some(m) = state.get_response(t) {
                                    let _ = responses.send(m).await;
                                }
                                let mut hasher = Blake2b::new();
                                r.hash_with_digest(&mut hasher);
                                let sig = priv_key.sign_prehashed(hasher, Some(ED25519_CONTEXT)).unwrap();
                                let client_id = r.client_id();
                                let request_id = r.request_id();
                                let response = SignedResponse::new(r, sig);
                                relay.on_response(client_id, request_id, response).await
                            }
                            Some(r) = requests.recv() => {
                                relay.on_local_request(r.0, *r.1.as_ref());
                                let _ = client_requests_send.try_send(r.clone());
                                match outgoing_replica_messages
                                    .send((
                                        MessageDestination::Broadcast,
                                        CustomReplicaMessage::RequestBroadcast(r),
                                    ))
                                    .await
                                {
                                    Ok(()) => {}
                                    Err(_) => break,
                                }
                            }
                            else => break
                        }
                    }
                }.in_current_span()
            });

            let (outgoing_send, mut outgoing_recv) = mpsc::channel(1000);
            let outgoing_handler = tokio::spawn(
                async move {
                    while let Some((dest, m)) = outgoing_recv.recv().await {
                        match outgoing_replica_messages
                            .send((dest, CustomReplicaMessage::ReplicaMessage(m)))
                            .await
                        {
                            Ok(()) => {}
                            Err(_) => break,
                        }
                    }
                }
                .in_current_span(),
            );

            self.inner
                .start(
                    config,
                    AtomicBroadcastChannels {
                        incoming_replica_messages: incoming_recv,
                        outgoing_replica_messages: outgoing_send,
                        requests: client_requests_recv,
                        responses: client_responses_send,
                    },
                    ready_for_clients,
                )
                .await
                .unwrap()
                .unwrap();

            incoming_handler.await.unwrap();
            outgoing_handler.await.unwrap();
            Ok(())
        })
    }
}