vented 0.11.5

Event driven encrypted tcp communicaton
Documentation
use async_std::sync::Mutex;
use async_std::task;
use crypto_box::SecretKey;
use log::LevelFilter;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use vented::event::Event;
use vented::server::data::{Node, ServerTimeouts};
use vented::server::server_events::NODE_LIST_REQUEST_EVENT;
use vented::server::VentedServer;

fn setup() {
    let _ = simple_logger::SimpleLogger::new()
        .with_module_level("async_std", LevelFilter::Warn)
        .with_module_level("async_io", LevelFilter::Warn)
        .with_module_level("polling", LevelFilter::Warn)
        .init();
}

#[test]
fn test_server_communication() {
    setup();
    let ping_count = Arc::new(AtomicUsize::new(0));
    let pong_count = Arc::new(AtomicUsize::new(0));
    let c_pinged = Arc::new(AtomicBool::new(false));
    let mut rng = rand::thread_rng();
    let global_secret_a = SecretKey::generate(&mut rng);
    let global_secret_b = SecretKey::generate(&mut rng);
    let global_secret_c = SecretKey::generate(&mut rng);

    let nodes = vec![
        Node {
            id: "A".to_string(),
            addresses: vec!["localhost:22222".to_string()],
            public_key: global_secret_a.public_key(),
            trusted: true,
        },
        Node {
            id: "B".to_string(),
            addresses: vec![],
            public_key: global_secret_b.public_key(),
            trusted: false,
        },
        Node {
            id: "C".to_string(),
            addresses: vec![],
            public_key: global_secret_c.public_key(),
            trusted: false,
        },
    ];
    let mut nodes_a = nodes.clone();
    for i in 0..10 {
        nodes_a.push(Node {
            id: format!("Node-{}", i),
            addresses: vec!["192.168.178.1".to_string()],
            public_key: global_secret_c.public_key(),
            trusted: false,
        })
    }

    task::block_on(async {
        let mut server_a = VentedServer::new(
            "A".to_string(),
            global_secret_a,
            nodes_a,
            ServerTimeouts::default(),
        );
        let mut server_b = VentedServer::new(
            "B".to_string(),
            global_secret_b,
            nodes.clone(),
            ServerTimeouts::default(),
        );
        let mut server_c = VentedServer::new(
            "C".to_string(),
            global_secret_c,
            nodes,
            ServerTimeouts::default(),
        );
        server_a.listen("localhost:22222".to_string());

        server_a.on("ping", {
            let ping_count = Arc::clone(&ping_count);
            move |_| {
                let ping_count = Arc::clone(&ping_count);
                Box::pin(async move {
                    ping_count.fetch_add(1, Ordering::Relaxed);

                    Some(Event::new("pong".to_string()))
                })
            }
        });
        server_b.on("pong", {
            let pong_count = Arc::clone(&pong_count);
            move |_| {
                let pong_count = Arc::clone(&pong_count);
                Box::pin(async move {
                    pong_count.fetch_add(1, Ordering::Relaxed);
                    None
                })
            }
        });
        server_c.on("ping", {
            let c_pinged = Arc::clone(&c_pinged);
            move |_| {
                let c_pinged = Arc::clone(&c_pinged);
                Box::pin(async move {
                    c_pinged.store(true, Ordering::Relaxed);
                    None
                })
            }
        });
        for i in 0..10 {
            assert!(server_a
                .emit(format!("Nodes-{}", i), Event::new("ping"))
                .await
                .is_err());
        }
        server_b
            .emit("A", Event::new(NODE_LIST_REQUEST_EVENT))
            .await
            .unwrap();
        server_c
            .emit("A", Event::new("ping".to_string()))
            .await
            .unwrap();
        server_b
            .emit("C", Event::new("ping".to_string()))
            .await
            .unwrap();
        for _ in 0..9 {
            server_b
                .emit("A", Event::new("ping".to_string()))
                .await
                .unwrap();
        }
        server_a
            .emit("B", Event::new("pong".to_string()))
            .await
            .unwrap();
        task::sleep(Duration::from_secs(2)).await;
    });
    // wait one second to make sure the servers were able to process the events

    assert_eq!(ping_count.load(Ordering::SeqCst), 10);
    assert_eq!(pong_count.load(Ordering::SeqCst), 10);
    assert!(c_pinged.load(Ordering::SeqCst));
}

const COUNT: usize = 20000;

#[test]
fn test_high_traffic() {
    setup();
    let ping_count = Arc::new(AtomicUsize::new(0));
    let pong_count = Arc::new(AtomicUsize::new(0));
    let last_pong = Arc::new(Mutex::new(Instant::now()));
    let running = Arc::new(AtomicBool::new(true));
    let mut rng = rand::thread_rng();
    let global_secret_a = SecretKey::generate(&mut rng);
    let global_secret_b = SecretKey::generate(&mut rng);

    let nodes = vec![
        Node {
            id: "A".to_string(),
            addresses: vec!["localhost:22223".to_string()],
            public_key: global_secret_a.public_key(),
            trusted: true,
        },
        Node {
            id: "B".to_string(),
            addresses: vec![],
            public_key: global_secret_b.public_key(),
            trusted: false,
        },
    ];

    task::block_on(async {
        let mut server_a = VentedServer::new(
            "A".to_string(),
            global_secret_a,
            nodes.clone(),
            ServerTimeouts::default(),
        );
        let mut server_b = VentedServer::new(
            "B".to_string(),
            global_secret_b,
            nodes,
            ServerTimeouts::default(),
        );
        server_a.listen("localhost:22223".to_string());
        task::sleep(Duration::from_millis(10)).await;

        server_a.on("ping", {
            let ping_count = Arc::clone(&ping_count);
            move |_| {
                let ping_count = Arc::clone(&ping_count);
                Box::pin(async move {
                    ping_count.fetch_add(1, Ordering::Relaxed);

                    Some(Event::new("pong".to_string()))
                })
            }
        });
        server_b.on("pong", {
            let pong_count = Arc::clone(&pong_count);
            let running = Arc::clone(&running);
            let last_pong = Arc::clone(&last_pong);

            move |_| {
                let pong_count = Arc::clone(&pong_count);
                let running = Arc::clone(&running);
                let last_pong = Arc::clone(&last_pong);

                Box::pin(async move {
                    *last_pong.lock().await = Instant::now();
                    let num = pong_count.fetch_add(1, Ordering::Relaxed);
                    log::info!("Received pong nr. {}", num);
                    if num >= COUNT - (COUNT / 100) {
                        running.store(true, Ordering::Relaxed);
                        None
                    } else {
                        Some(Event::new("ping".to_string()))
                    }
                })
            }
        });
        let mut promises = Vec::new();

        for _ in 0..COUNT / 100 {
            promises.push(server_b.emit("A", Event::new("ping")));
        }
        futures::future::join_all(promises).await;

        while running.load(Ordering::Relaxed)
            && last_pong.lock().await.elapsed() < Duration::from_secs(2)
        {
            task::sleep(Duration::from_micros(10)).await;
        }
    });
    // wait one second to make sure the servers were able to process the events

    assert_eq!(ping_count.load(Ordering::SeqCst), COUNT);
    assert_eq!(pong_count.load(Ordering::SeqCst), COUNT);
}