aleph-bft 0.45.4

AlephBFT is an asynchronous and Byzantine fault tolerant consensus protocol aimed at ordering arbitrary messages (transactions). It has been designed to continuously operate even in the harshest conditions: with no bounds on message-delivery delays and in the presence of malicious actors. This makes it an excellent fit for blockchain-related applications.
Documentation
use crate::{
    alerts::{Alert, ForkingNotification},
    collection::CollectionResponse,
    consensus::{
        handler::{Consensus, ConsensusResult},
        LOG_TARGET,
    },
    dag::DagUnit,
    dissemination::{Addressed, DisseminationMessage},
    network::{UnitMessage, UnitMessageTo},
    units::{SignedUnit, UncheckedSignedUnit, Unit},
    Data, Hasher, Index, MultiKeychain, Receiver, Sender, Terminator, UnitFinalizationHandler,
};
use futures::{FutureExt, StreamExt};
use futures_timer::Delay;
use log::{debug, error, info, trace, warn};
use std::time::Duration;

pub struct Service<UFH, MK>
where
    UFH: UnitFinalizationHandler,
    MK: MultiKeychain,
{
    handler: Consensus<UFH, MK>,
    alerts_for_alerter: Sender<Alert<UFH::Hasher, UFH::Data, MK::Signature>>,
    notifications_from_alerter:
        Receiver<ForkingNotification<UFH::Hasher, UFH::Data, MK::Signature>>,
    unit_messages_for_network: Sender<UnitMessageTo<UFH::Hasher, UFH::Data, MK::Signature>>,
    unit_messages_from_network: Receiver<UnitMessage<UFH::Hasher, UFH::Data, MK::Signature>>,
    responses_for_collection: Sender<CollectionResponse<UFH::Hasher, UFH::Data, MK>>,
    parents_for_creator: Sender<DagUnit<UFH::Hasher, UFH::Data, MK>>,
    backup_units_for_saver: Sender<DagUnit<UFH::Hasher, UFH::Data, MK>>,
    backup_units_from_saver: Receiver<DagUnit<UFH::Hasher, UFH::Data, MK>>,
    new_units_from_creator: Receiver<SignedUnit<UFH::Hasher, UFH::Data, MK>>,
    exiting: bool,
}

pub struct IO<H: Hasher, D: Data, MK: MultiKeychain> {
    pub backup_units_for_saver: Sender<DagUnit<H, D, MK>>,
    pub backup_units_from_saver: Receiver<DagUnit<H, D, MK>>,
    pub alerts_for_alerter: Sender<Alert<H, D, MK::Signature>>,
    pub notifications_from_alerter: Receiver<ForkingNotification<H, D, MK::Signature>>,
    pub unit_messages_for_network: Sender<UnitMessageTo<H, D, MK::Signature>>,
    pub unit_messages_from_network: Receiver<UnitMessage<H, D, MK::Signature>>,
    pub responses_for_collection: Sender<CollectionResponse<H, D, MK>>,
    pub parents_for_creator: Sender<DagUnit<H, D, MK>>,
    pub new_units_from_creator: Receiver<SignedUnit<H, D, MK>>,
}

impl<UFH, MK> Service<UFH, MK>
where
    UFH: UnitFinalizationHandler,
    MK: MultiKeychain,
{
    pub fn new(handler: Consensus<UFH, MK>, io: IO<UFH::Hasher, UFH::Data, MK>) -> Self {
        let IO {
            backup_units_for_saver,
            backup_units_from_saver,
            alerts_for_alerter,
            notifications_from_alerter,
            unit_messages_from_network,
            unit_messages_for_network,
            responses_for_collection,
            parents_for_creator,
            new_units_from_creator,
        } = io;

        Service {
            handler,
            alerts_for_alerter,
            notifications_from_alerter,
            unit_messages_from_network,
            unit_messages_for_network,
            parents_for_creator,
            backup_units_for_saver,
            backup_units_from_saver,
            responses_for_collection,
            new_units_from_creator,
            exiting: false,
        }
    }

    fn crucial_channel_closed(&mut self, channel_name: &str) {
        warn!(target: LOG_TARGET, "{} channel unexpectedly closed, exiting.", channel_name);
        self.exiting = true;
    }

    fn handle_result(&mut self, result: ConsensusResult<UFH::Hasher, UFH::Data, MK>) {
        let ConsensusResult {
            units,
            alerts,
            messages,
        } = result;
        for unit in units {
            self.on_unit_reconstructed(unit);
        }
        for alert in alerts {
            if self.alerts_for_alerter.unbounded_send(alert).is_err() {
                self.crucial_channel_closed("Alerter");
            }
        }
        for message in messages {
            self.send_message_for_network(message.into())
        }
    }

    fn on_unit_received(
        &mut self,
        unit: UncheckedSignedUnit<UFH::Hasher, UFH::Data, MK::Signature>,
    ) {
        let result = self.handler.process_incoming_unit(unit);
        self.handle_result(result);
    }

    fn on_unit_message(
        &mut self,
        message: DisseminationMessage<UFH::Hasher, UFH::Data, MK::Signature>,
    ) {
        use DisseminationMessage::*;
        match message {
            Unit(u) => {
                trace!(target: LOG_TARGET, "New unit received {:?}.", &u);
                self.on_unit_received(u)
            }
            Request(node_id, request) => {
                trace!(target: LOG_TARGET, "Request {:?} received from {:?}.", request, node_id);
                if let Some(message) = self.handler.process_request(request, node_id) {
                    self.send_message_for_network(message.into());
                }
            }
            ParentsResponse(u_hash, parents) => {
                trace!(target: LOG_TARGET, "Response parents received for unit {:?}.", u_hash);
                let result = self.handler.process_parents(u_hash, parents);
                self.handle_result(result);
            }
            NewestUnitRequest(node_id, salt) => {
                trace!(target: LOG_TARGET, "Newest unit request received from {:?}.", node_id);
                let message = self.handler.process_newest_unit_request(salt, node_id);
                self.send_message_for_network(message.into())
            }
            NewestUnitResponse(response) => {
                trace!(target: LOG_TARGET, "Response newest unit received from {:?}.", response.index());
                if self
                    .responses_for_collection
                    .unbounded_send(response)
                    .is_err()
                {
                    debug!(target: LOG_TARGET, "Initial unit collection channel closed, dropping response.")
                }
            }
        }
    }

    fn on_forking_notification(
        &mut self,
        notification: ForkingNotification<UFH::Hasher, UFH::Data, MK::Signature>,
    ) {
        let result = self.handler.process_forking_notification(notification);
        self.handle_result(result);
    }

    fn trigger_tasks(&mut self) {
        for message in self.handler.trigger_tasks() {
            self.send_message_for_network(message);
        }
    }

    fn on_unit_reconstructed(&mut self, unit: DagUnit<UFH::Hasher, UFH::Data, MK>) {
        trace!(target: LOG_TARGET, "Unit {:?} {} reconstructed.", unit.hash(), unit.coord());
        if self.backup_units_for_saver.unbounded_send(unit).is_err() {
            self.crucial_channel_closed("Backup");
        }
    }

    fn on_unit_backup_saved(&mut self, unit: DagUnit<UFH::Hasher, UFH::Data, MK>) {
        if self
            .parents_for_creator
            .unbounded_send(unit.clone())
            .is_err()
        {
            self.crucial_channel_closed("Creator");
        }
        if let Some(message) = self.handler.on_unit_backup_saved(unit) {
            self.send_message_for_network(message);
        }
    }

    fn send_message_for_network(
        &mut self,
        notification: Addressed<DisseminationMessage<UFH::Hasher, UFH::Data, MK::Signature>>,
    ) {
        for recipient in notification.recipients() {
            if self
                .unit_messages_for_network
                .unbounded_send((notification.message().clone().into(), recipient.clone()))
                .is_err()
            {
                self.crucial_channel_closed("Network");
            }
        }
    }

    fn status_report(&self) {
        info!(target: LOG_TARGET, "Consensus status report: {}.", self.handler.status());
    }

    pub async fn run(
        mut self,
        data_from_backup: Vec<UncheckedSignedUnit<UFH::Hasher, UFH::Data, MK::Signature>>,
        mut terminator: Terminator,
    ) {
        let status_ticker_delay = Duration::from_secs(10);
        let mut status_ticker = Delay::new(status_ticker_delay).fuse();
        let mut task_ticker = Delay::new(self.handler.next_tick()).fuse();

        for unit in data_from_backup {
            self.on_unit_received(unit);
        }

        debug!(target: LOG_TARGET, "Consensus started.");
        loop {
            futures::select! {
                signed_unit = self.new_units_from_creator.next() => match signed_unit {
                    Some(signed_unit) => {
                        self.on_unit_received(signed_unit.into())
                    },
                    None => {
                        error!(target: LOG_TARGET, "Creation stream closed.");
                        break;
                    }
                },

                notification = self.notifications_from_alerter.next() => match notification {
                    Some(notification) => {
                        trace!(target: LOG_TARGET, "Received alerter notification: {:?}.", notification);
                        self.on_forking_notification(notification);
                    },
                    None => {
                        error!(target: LOG_TARGET, "Alert notification stream closed.");
                        break;
                    }
                },

                event = self.unit_messages_from_network.next() => match event {
                    Some(event) => self.on_unit_message(event.into()),
                    None => {
                        error!(target: LOG_TARGET, "Unit message stream closed.");
                        break;
                    }
                },

                message = self.backup_units_from_saver.next() => match message {
                    Some(unit) => self.on_unit_backup_saved(unit),
                    None => {
                        error!(target: LOG_TARGET, "Saved units receiver closed.");
                    }
                },

                _ = &mut task_ticker => {
                    self.trigger_tasks();
                    task_ticker = Delay::new(self.handler.next_tick()).fuse();
                },

                _ = &mut status_ticker => {
                    self.status_report();
                    status_ticker = Delay::new(status_ticker_delay).fuse();
                },

                _ = terminator.get_exit().fuse() => {
                    debug!(target: LOG_TARGET, "Consensus received exit signal.");
                    self.exiting = true;
                }
            }

            if self.exiting {
                debug!(target: LOG_TARGET, "Consensus decided to exit.");
                terminator.terminate_sync().await;
                break;
            }
        }

        debug!(target: LOG_TARGET, "Consensus run ended.");
    }
}