kadmium 0.6.0

A broadcast protocol implementation based on Kadcast.
Documentation
#![cfg(all(feature = "codec", feature = "sync"))]

use kadmium::{
    message::{FindKNodes, KNodes, Message, Ping, Pong},
    Id,
};
use pea2pea::{
    protocols::{Handshake, Reading, Writing},
    Pea2Pea,
};
use rand::{thread_rng, Rng};

mod common;
#[allow(unused_imports)]
use crate::common::{enable_tracing, KadNode};

#[tokio::test]
async fn connect_two_nodes() {
    let mut rng = thread_rng();
    let id_a = Id::new(rng.gen());
    let id_b = Id::new(rng.gen());
    // Unlikely but just in case we hit this case.
    assert_ne!(id_a, id_b);

    // Start two nodes and have them establish a connection.
    let node_a = KadNode::new(id_a).await;
    node_a.enable_handshake().await;

    let node_b = KadNode::new(id_b).await;
    node_b.enable_handshake().await;

    // The handshake is enacted here, assertions are checked in the handshake protocol
    // implementation.
    assert!(node_a
        .node()
        .connect(node_b.node().listening_addr().unwrap())
        .await
        .is_ok());
}

#[tokio::test]
async fn ping_pong_two_nodes() {
    // enable_tracing();

    let mut rng = thread_rng();
    let id_a = Id::new(rng.gen());
    let id_b = Id::new(rng.gen());
    // Unlikely but just in case we hit this case.
    assert_ne!(id_a, id_b);

    // Start two nodes and have them establish a connection.
    let node_a = KadNode::new(id_a).await;
    node_a.enable_handshake().await;
    node_a.enable_reading().await;
    node_a.enable_writing().await;

    let node_b = KadNode::new(id_b).await;
    node_b.enable_handshake().await;
    node_b.enable_reading().await;
    node_b.enable_writing().await;

    node_a
        .node()
        .connect(node_b.node().listening_addr().unwrap())
        .await
        .unwrap();

    let nonce = rng.gen();
    let ping = Message::Ping(Ping {
        nonce,
        id: node_a.router.local_id(),
    });

    let pong = Message::Pong(Pong {
        nonce,
        id: node_b.router.local_id(),
    });

    assert!(node_a
        .unicast(node_b.node().listening_addr().unwrap(), ping.clone())
        .unwrap()
        .await
        .is_ok());

    // Wait for PING to be received and PONG to come back.
    tokio::time::sleep(std::time::Duration::from_millis(10)).await;
    assert_eq!(node_b.received_messages.read().get(&nonce), Some(&ping));
    assert_eq!(node_a.received_messages.read().get(&nonce), Some(&pong));
}

#[tokio::test]
async fn k_nodes_two_nodes() {
    enable_tracing();

    let mut rng = thread_rng();
    let id_a = Id::new(rng.gen());
    let id_b = Id::new(rng.gen());
    // Unlikely but just in case we hit this case.
    assert_ne!(id_a, id_b);

    // Start two nodes and have them establish a connection.
    let node_a = KadNode::new(id_a).await;
    node_a.enable_handshake().await;
    node_a.enable_reading().await;
    node_a.enable_writing().await;

    let node_b = KadNode::new(id_b).await;
    node_b.enable_handshake().await;
    node_b.enable_reading().await;
    node_b.enable_writing().await;

    node_a
        .node()
        .connect(node_b.node().listening_addr().unwrap())
        .await
        .unwrap();

    let nonce = rng.gen();
    let find_k_nodes = Message::FindKNodes(FindKNodes {
        nonce,
        id: node_a.router.local_id(),
    });

    let k_nodes = Message::KNodes(KNodes {
        nonce,
        nodes: vec![(
            node_a.router.local_id(),
            node_a.node().listening_addr().unwrap(),
        )],
    });

    assert!(node_a
        .unicast(
            node_b.node().listening_addr().unwrap(),
            find_k_nodes.clone()
        )
        .unwrap()
        .await
        .is_ok());

    tokio::time::sleep(std::time::Duration::from_millis(10)).await;
    assert_eq!(
        node_b.received_messages.read().get(&nonce),
        Some(&find_k_nodes)
    );
    assert_eq!(node_a.received_messages.read().get(&nonce), Some(&k_nodes));
}