pipenet 0.1.5

Non blocking tcp stream wrapper using channels
Documentation
use super::*;
use serde::{Deserialize, Serialize};
use serial_test::serial;
use std::{
    net::{SocketAddr, TcpListener, TcpStream},
    thread::sleep,
    time::Duration,
};

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
struct Msg {
    data: Vec<u8>,
}

const TEST_SIZE: usize = 10_000_000; //10mb

#[test]
#[serial]
fn test_big_payload() {
    let (mut h, mut c) = build_stream_pair();

    let m = Msg {
        data: vec![1; TEST_SIZE],
    };
    h.write(m.clone().into()).unwrap();

    let msg_rec = wait_msg(&mut c).unwrap();
    assert_eq!(m, *msg_rec);
}

#[test]
#[serial]
fn test_multichannels() {
    let (mut h1, mut h2, mut c1, mut c2) = build_stream_triple();

    // Do it a few times to build up buffers
    for _ in 0..10 {
        let m = Msg { data: vec![1; 100] };

        h1.write(m.clone().into()).unwrap();
        h2.write(m.clone().into()).unwrap();
        let msg_rec1 = wait_msg(&mut c1).unwrap();
        let msg_rec2 = wait_msg(&mut c2).unwrap();

        assert_eq!(m, *msg_rec1);
        assert_eq!(m, *msg_rec2);
    }
}

#[test]
#[serial]
#[cfg(all(feature = "compression", feature = "encryption"))]
fn test_compresssion_and_encryption() {
    let (mut h1, mut h2, mut c1, mut c2) = build_stream_triple_comp_enc();

    // Do it a few times to build up buffers
    for _ in 0..10 {
        let m = Msg { data: vec![1; 100] };

        h1.write(m.clone().into()).unwrap();
        h2.write(m.clone().into()).unwrap();
        let msg_rec1 = wait_msg(&mut c1).unwrap();
        let msg_rec2 = wait_msg(&mut c2).unwrap();

        assert_eq!(m, *msg_rec1);
        assert_eq!(m, *msg_rec2);
    }
}

fn wait_msg(c: &mut NonBlockStream<Msg>) -> Option<Box<Msg>> {
    let mut count = 0;
    sleep(Duration::from_millis(100));
    let mut msg_rec = c.read().unwrap();
    while msg_rec.is_none() && count < 100 {
        sleep(Duration::from_millis(100));
        count += 1;
        msg_rec = c.read().unwrap();
    }
    msg_rec
}

fn build_stream_pair() -> (NonBlockStream<Msg>, NonBlockStream<Msg>) {
    let p = find_port();
    let s = SocketAddr::from(([127, 0, 0, 1], p));
    let l = TcpListener::bind(s).unwrap();
    let c = TcpStream::connect(s).unwrap();
    let (h, _) = l.accept().unwrap();
    (h.into(), c.into())
}

fn build_stream_triple() -> (
    NonBlockStream<Msg>,
    NonBlockStream<Msg>,
    NonBlockStream<Msg>,
    NonBlockStream<Msg>,
) {
    let (c1, h_to_c1, c2, h_to_c2) = create_connections();
    (h_to_c1.into(), h_to_c2.into(), c1.into(), c2.into())
}

fn create_connections() -> (TcpStream, TcpStream, TcpStream, TcpStream) {
    let p = find_port();
    let s = SocketAddr::from(([127, 0, 0, 1], p));
    let l = TcpListener::bind(s).unwrap();
    let c1 = TcpStream::connect(s).unwrap();
    let (h_to_c1, _) = l.accept().unwrap();
    let c2 = TcpStream::connect(s).unwrap();
    let (h_to_c2, _) = l.accept().unwrap();
    (c1, h_to_c1, c2, h_to_c2)
}

// With compression and encryption
#[cfg(all(feature = "compression", feature = "encryption"))]
fn build_stream_triple_comp_enc() -> (
    NonBlockStream<Msg>,
    NonBlockStream<Msg>,
    NonBlockStream<Msg>,
    NonBlockStream<Msg>,
) {
    let (c1, h_to_c1, c2, h_to_c2) = create_connections();
    let key = [0u8; 32];
    let nb_to_c1 = NonBlockStream::from_version_packs(
        Default::default(),
        Packs::default().compress().encrypt(&key),
        h_to_c1,
    );
    let nb_to_c2 = NonBlockStream::from_version_packs(
        Default::default(),
        Packs::default().compress().encrypt(&key),
        h_to_c2,
    );
    let nbc1 = NonBlockStream::from_version_packs(
        Default::default(),
        Packs::default().compress().encrypt(&key),
        c1,
    );
    let nbc2 = NonBlockStream::from_version_packs(
        Default::default(),
        Packs::default().compress().encrypt(&key),
        c2,
    );
    (nb_to_c1, nb_to_c2, nbc1, nbc2)
}

fn find_port() -> u16 {
    (10000..=20000)
        .find(|p| TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], *p))).is_ok())
        .unwrap()
}