aleph-bft 0.45.4

AlephBFT is an asynchronous and Byzantine fault tolerant consensus protocol aimed at ordering arbitrary messages (transactions). It has been designed to continuously operate even in the harshest conditions: with no bounds on message-delivery delays and in the presence of malicious actors. This makes it an excellent fit for blockchain-related applications.
Documentation
use std::{
    collections::{HashSet, VecDeque},
    time::{Duration, Instant},
};

use crate::{
    testing::{init_log, spawn_honest_member, HonestMember, NetworkData},
    NodeCount, NodeIndex, SpawnHandle,
};
use aleph_bft_mock::{DataProvider, NetworkHook, Router, Spawner};
use futures::StreamExt;
use log::info;

/// Hook state that holds back every message sent by or addressed to one chosen node.
struct Latency {
    /// Index of the node whose incoming and outgoing messages are delayed.
    who: NodeIndex,
    /// Held-back messages in arrival order, each stored as
    /// `(time it was buffered, (data, sender, recipient))`.
    buffer: VecDeque<(Instant, (NetworkData, NodeIndex, NodeIndex))>,
}

/// Minimum time a message stays in the `Latency` buffer before it may be released.
const LATENCY: Duration = Duration::from_millis(300);

impl Latency {
    /// Creates a hook that delays traffic involving `who`; nothing is buffered initially.
    pub fn new(who: NodeIndex) -> Self {
        Self {
            who,
            buffer: VecDeque::new(),
        }
    }

    /// Decides what to do with a freshly seen message.
    ///
    /// A message whose sender or recipient is the delayed node is timestamped, appended to
    /// the buffer, and an empty vector is returned. Any other message is returned right away
    /// as the only element of the result.
    fn add_message(
        &mut self,
        data: NetworkData,
        sender: NodeIndex,
        recipient: NodeIndex,
    ) -> Vec<(NetworkData, NodeIndex, NodeIndex)> {
        let involves_delayed_node = sender == self.who || recipient == self.who;
        if !involves_delayed_node {
            return vec![(data, sender, recipient)];
        }
        self.buffer
            .push_back((Instant::now(), (data, sender, recipient)));
        Vec::new()
    }

    /// Removes and returns, oldest first, the buffered messages that have waited at least
    /// `LATENCY`.
    ///
    /// Scanning stops at the first message that is still too fresh; it and everything behind
    /// it stay in the buffer.
    fn messages_to_send(&mut self) -> Vec<(NetworkData, NodeIndex, NodeIndex)> {
        let mut ready = Vec::new();
        // Peek instead of popping so a too-fresh message never has to be put back.
        while let Some((queued_at, _)) = self.buffer.front() {
            if queued_at.elapsed() < LATENCY {
                break;
            }
            let (_, msg) = self
                .buffer
                .pop_front()
                .expect("just checked it is not empty");
            ready.push(msg);
        }
        ready
    }
}

impl NetworkHook<NetworkData> for Latency {
    /// Passes the incoming message through the delay logic and returns everything that is
    /// ready to go out now.
    ///
    /// The result starts with the incoming message itself if it was not buffered, followed by
    /// any previously buffered messages that have waited long enough. Buffered messages are
    /// only released from here, i.e. when some later message is processed.
    fn process_message(
        &mut self,
        data: NetworkData,
        sender: NodeIndex,
        recipient: NodeIndex,
    ) -> Vec<(NetworkData, NodeIndex, NodeIndex)> {
        let mut outgoing = self.add_message(data, sender, recipient);
        let released = self.messages_to_send();
        outgoing.extend(released);
        outgoing
    }
}

/// Checks that every member finalizes all expected data even though the traffic of node 0
/// is held back by the `Latency` hook.
///
/// One honest member is spawned per network handed out by the router, each with a data
/// provider built by `new_range(ix * 50, (ix + 1) * 50)`. Every finalization stream is then
/// read until each number in `0..n_members * 50` has been removed from a local copy of the
/// expected set; a number that is not (or no longer) in the set fails the assertion.
#[tokio::test(flavor = "multi_thread")]
async fn delayed_finalized() {
    init_log();
    let n_members = NodeCount(7);
    let australian = NodeIndex(0);
    let spawner = Spawner::new();

    // Network: install the delaying hook before the hub starts running.
    let (mut net_hub, networks) = Router::new(n_members);
    net_hub.add_hook(Latency::new(australian));
    spawner.spawn("network-hub", net_hub);

    // Members: keep what is needed to observe and later stop each of them.
    let mut finalization_streams = Vec::new();
    let mut exit_senders = Vec::new();
    let mut member_handles = Vec::new();
    for (network, _) in networks {
        let ix = network.index();
        let provider = DataProvider::new_range(ix.0 * 50, (ix.0 + 1) * 50);
        let HonestMember {
            finalization_rx,
            exit_tx,
            handle,
            ..
        } = spawn_honest_member(spawner, ix, n_members, vec![], provider, network);
        finalization_streams.push(finalization_rx);
        exit_senders.push(exit_tx);
        member_handles.push(handle);
    }

    let expected: HashSet<u32> = (0..((n_members.0) * 50))
        .map(|number| number as u32)
        .collect();

    for mut finalized in finalization_streams {
        let mut remaining = expected.clone();
        while !remaining.is_empty() {
            let number = finalized.next().await.unwrap();
            info!("finalizing {}", number);
            assert!(remaining.remove(&number));
        }
        info!("finished one node");
    }

    // Shutdown is best effort: send errors and join results are deliberately ignored.
    exit_senders.into_iter().for_each(|exit_tx| {
        let _ = exit_tx.send(());
    });
    for handle in member_handles {
        let _ = handle.await;
    }
}