aleph-bft 0.45.4

AlephBFT is an asynchronous and Byzantine fault tolerant consensus protocol aimed at ordering arbitrary messages (transactions). It has been designed to continuously operate even in the harshest conditions: with no bounds on message-delivery delays and in the presence of malicious actors. This makes it an excellent fit for blockchain-related applications.
Documentation
use crate::{
    alerts::{
        handler::{Handler, RmcResponse},
        Alert, AlertMessage, ForkingNotification, NetworkMessage,
    },
    Data, Hasher, MultiKeychain, Multisigned, NodeIndex, Receiver, Recipient, Sender, Terminator,
};
use aleph_bft_rmc::{DoublingDelayScheduler, Message as RmcMessage};
use futures::{FutureExt, StreamExt};
use log::{debug, error, trace, warn};
use std::time::Duration;

const LOG_TARGET: &str = "AlephBFT-alerter";
type RmcService<H, MK, S, M> =
    aleph_bft_rmc::Service<H, MK, DoublingDelayScheduler<RmcMessage<H, S, M>>>;

pub struct Service<H: Hasher, D: Data, MK: MultiKeychain> {
    messages_for_network: Sender<(NetworkMessage<H, D, MK>, Recipient)>,
    messages_from_network: Receiver<NetworkMessage<H, D, MK>>,
    notifications_for_units: Sender<ForkingNotification<H, D, MK::Signature>>,
    alerts_from_units: Receiver<Alert<H, D, MK::Signature>>,
    node_index: NodeIndex,
    exiting: bool,
    handler: Handler<H, D, MK>,
    rmc_service: RmcService<H::Hash, MK, MK::Signature, MK::PartialMultisignature>,
}

pub struct IO<H: Hasher, D: Data, MK: MultiKeychain> {
    pub messages_for_network: Sender<(NetworkMessage<H, D, MK>, Recipient)>,
    pub messages_from_network: Receiver<NetworkMessage<H, D, MK>>,
    pub notifications_for_units: Sender<ForkingNotification<H, D, MK::Signature>>,
    pub alerts_from_units: Receiver<Alert<H, D, MK::Signature>>,
}

impl<H: Hasher, D: Data, MK: MultiKeychain> Service<H, D, MK> {
    pub fn new(keychain: MK, io: IO<H, D, MK>, handler: Handler<H, D, MK>) -> Service<H, D, MK> {
        let IO {
            messages_for_network,
            messages_from_network,
            notifications_for_units,
            alerts_from_units,
        } = io;

        let node_index = keychain.index();
        let rmc_handler = aleph_bft_rmc::Handler::new(keychain);
        let rmc_service = aleph_bft_rmc::Service::new(
            DoublingDelayScheduler::new(Duration::from_millis(500)),
            rmc_handler,
        );

        Service {
            messages_for_network,
            messages_from_network,
            notifications_for_units,
            alerts_from_units,
            node_index,
            exiting: false,
            handler,
            rmc_service,
        }
    }

    fn rmc_message_to_network(
        &mut self,
        message: RmcMessage<H::Hash, MK::Signature, MK::PartialMultisignature>,
    ) {
        self.send_message_for_network(
            AlertMessage::RmcMessage(self.node_index, message),
            Recipient::Everyone,
        );
    }

    fn send_notification_for_units(
        &mut self,
        notification: ForkingNotification<H, D, MK::Signature>,
    ) {
        if self
            .notifications_for_units
            .unbounded_send(notification)
            .is_err()
        {
            warn!(
                target: LOG_TARGET,
                "Channel with forking notifications should be open"
            );
            self.exiting = true;
        }
    }

    fn send_message_for_network(
        &mut self,
        message: AlertMessage<H, D, MK::Signature, MK::PartialMultisignature>,
        recipient: Recipient,
    ) {
        if self
            .messages_for_network
            .unbounded_send((message, recipient))
            .is_err()
        {
            warn!(
                target: LOG_TARGET,
                "Channel with notifications for network should be open"
            );
            self.exiting = true;
        }
    }

    fn handle_message_from_network(
        &mut self,
        message: AlertMessage<H, D, MK::Signature, MK::PartialMultisignature>,
    ) {
        match message {
            AlertMessage::ForkAlert(alert) => match self.handler.on_network_alert(alert.clone()) {
                Ok((maybe_notification, hash)) => {
                    if let Some(multisigned) = self.rmc_service.start_rmc(hash) {
                        self.handle_multisigned(multisigned);
                    }
                    if let Some(notification) = maybe_notification {
                        self.send_notification_for_units(notification);
                    }
                }
                Err(error) => debug!(target: LOG_TARGET, "{}", error),
            },
            AlertMessage::RmcMessage(sender, message) => {
                match self.handler.on_rmc_message(sender, message) {
                    RmcResponse::RmcMessage(message) => {
                        if let Some(multisigned) = self.rmc_service.process_message(message) {
                            self.handle_multisigned(multisigned);
                        }
                    }
                    RmcResponse::AlertRequest(hash, recipient) => {
                        let message = AlertMessage::AlertRequest(self.node_index, hash);
                        self.send_message_for_network(message, recipient);
                    }
                    RmcResponse::Noop => {}
                }
            }
            AlertMessage::AlertRequest(node, hash) => {
                match self.handler.on_alert_request(node, hash) {
                    Ok((alert, recipient)) => {
                        self.send_message_for_network(AlertMessage::ForkAlert(alert), recipient);
                    }
                    Err(error) => debug!(target: LOG_TARGET, "{}", error),
                }
            }
        }
    }

    fn handle_alert_from_consensus(&mut self, alert: Alert<H, D, MK::Signature>) {
        trace!(target: LOG_TARGET, "Handling alert {:?}.", alert);
        let (message, recipient, hash) = self.handler.on_own_alert(alert.clone());
        self.send_message_for_network(message, recipient);
        if let Some(multisigned) = self.rmc_service.start_rmc(hash) {
            self.handle_multisigned(multisigned);
        }
    }

    fn handle_multisigned(&mut self, multisigned: Multisigned<H::Hash, MK>) {
        match self.handler.alert_confirmed(multisigned.clone()) {
            Ok(notification) => {
                self.send_notification_for_units(notification);
            }
            Err(error) => warn!(target: LOG_TARGET, "{}", error),
        }
    }

    pub async fn run(&mut self, mut terminator: Terminator) {
        loop {
            futures::select! {
                message = self.messages_from_network.next() => match message {
                    Some(message) => self.handle_message_from_network(message),
                    None => {
                        error!(target: LOG_TARGET, "Message stream closed.");
                        break;
                    }
                },
                alert = self.alerts_from_units.next() => match alert {
                    Some(alert) => self.handle_alert_from_consensus(alert),
                    None => {
                        error!(target: LOG_TARGET, "Alert stream closed.");
                        break;
                    }
                },
                message = self.rmc_service.next_message().fuse() => {
                    self.rmc_message_to_network(message);
                },
                _ = terminator.get_exit().fuse() => {
                    debug!(target: LOG_TARGET, "Received exit signal.");
                    self.exiting = true;
                },
            }
            if self.exiting {
                debug!(target: LOG_TARGET, "Alerter decided to exit.");
                terminator.terminate_sync().await;
                break;
            }
        }
    }
}