// redpine 0.3.0
//
// Connection-oriented UDP data transfer for real-time applications.
//
use std::sync::mpsc::{self, TryRecvError};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time;

use super::util::{DataPacket, Packet, SummaryPacket};

/// Runs the test server on `127.0.0.1:port` until a shutdown signal arrives on `rx`.
///
/// The server thread is pinned to the first reported CPU core. For every connected peer it keeps
/// two independent streams of bookkeeping (reliable and unreliable): a running MD5 over the
/// `(id, data)` of each received data packet, the next expected packet id, and a per-id
/// "was received" log. When a peer sends `Packet::SummaryReq`, the digests and logs are sent back
/// as a reliable `Packet::Summary`.
///
/// Shutdown is polled only when `wait_event_timeout` returns `None`, i.e. after the server has
/// been idle for one second. Both an explicit message on `rx` and a dropped sender stop the
/// server.
///
/// # Panics
///
/// Panics if the server cannot be created or pinned, if a reliable packet arrives with an id other
/// than the next expected one, if an unreliable packet arrives with an id lower than the next
/// expected one, if a packet id is seen twice, if a peer sends data or a second summary request
/// after its summary was already produced, on any unexpected packet type, and on
/// `ServerEvent::Error`.
fn run_server(port: u16, config: redpine::ServerConfig, rx: mpsc::Receiver<()>) {
    /// Receive-side state for one stream (reliable or unreliable) of one peer.
    struct StreamData {
        /// Running digest; taken (left as `None`) once the summary has been produced.
        md5_ctx: Option<md5::Context>,
        /// Next expected packet id. Wider than the 16-bit wire id so that receiving id
        /// `u16::MAX` does not overflow when advancing.
        next_id: u32,
        /// `log[id]` is true once packet `id` has been received.
        log: Vec<bool>,
    }

    impl StreamData {
        fn new() -> Self {
            StreamData {
                md5_ctx: Some(md5::Context::new()),
                next_id: 0,
                log: Vec::new(),
            }
        }
    }

    /// State attached to each connected peer.
    struct PeerData {
        rel_stream: StreamData,
        unrel_stream: StreamData,
    }

    // Always bind to core zero
    let core_ids = core_affinity::get_core_ids().expect("failed to query core IDs");
    let core_id = core_ids
        .first()
        .copied()
        .expect("no CPU cores were reported");

    let mut server = redpine::Server::bind_with_config(("127.0.0.1", port), config)
        .expect("failed to create redpine server");

    let pinned = core_affinity::set_for_current(core_id);
    assert!(pinned, "failed to pin the server thread to a core");

    let event_timeout = time::Duration::from_millis(1000);

    loop {
        // Drain events until the server has been idle for `event_timeout`.
        while let Some(event) = server.wait_event_timeout(event_timeout) {
            match event {
                redpine::ServerEvent::Connect(peer) => {
                    let peer_data = Arc::new(Mutex::new(PeerData {
                        rel_stream: StreamData::new(),
                        unrel_stream: StreamData::new(),
                    }));

                    peer.set_data(Some(peer_data as Arc<dyn std::any::Any + Send + Sync>));
                }
                redpine::ServerEvent::Disconnect(_) => {}
                redpine::ServerEvent::Receive(mut peer, bytes) => {
                    let peer_data = peer
                        .data()
                        .expect("peer has no attached data")
                        .downcast::<Mutex<PeerData>>()
                        .expect("peer data has an unexpected type");

                    let mut peer_data = peer_data.lock().unwrap();

                    match Packet::from_bytes(&bytes) {
                        Packet::Data(data_packet) => {
                            let stream_data = if data_packet.unrel {
                                &mut peer_data.unrel_stream
                            } else {
                                &mut peer_data.rel_stream
                            };

                            let md5_ctx = stream_data
                                .md5_ctx
                                .as_mut()
                                .expect("data packet received after the summary was produced");
                            md5_ctx.consume(&data_packet.id.to_le_bytes());
                            md5_ctx.consume(&data_packet.data);

                            // Unreliable packets may be dropped (gaps are fine) but must never
                            // go backwards; reliable packets must arrive strictly in sequence.
                            let id = u32::from(data_packet.id);
                            if data_packet.unrel && id < stream_data.next_id {
                                panic!("unreliable packet received out of order");
                            }
                            if !data_packet.unrel && id != stream_data.next_id {
                                panic!("reliable packet received out of order");
                            }
                            stream_data.next_id = id + 1;

                            let log_idx = usize::from(data_packet.id);
                            let log = &mut stream_data.log;
                            // Only ever grow the log; an unconditional resize could truncate it.
                            if log.len() <= log_idx {
                                log.resize(log_idx + 1, false);
                            }
                            if log[log_idx] {
                                panic!("duplicate packet received");
                            }
                            log[log_idx] = true;
                        }
                        Packet::SummaryReq => {
                            let rel_md5 = peer_data
                                .rel_stream
                                .md5_ctx
                                .take()
                                .expect("summary was already produced for this peer")
                                .compute()
                                .into();
                            let unrel_md5 = peer_data
                                .unrel_stream
                                .md5_ctx
                                .take()
                                .expect("summary was already produced for this peer")
                                .compute()
                                .into();

                            let summary = Packet::Summary(SummaryPacket {
                                rel_md5,
                                unrel_md5,
                                rel_log: peer_data.rel_stream.log.clone().into(),
                                unrel_log: peer_data.unrel_stream.log.clone().into(),
                            });

                            peer.send(summary.to_bytes(), redpine::SendMode::Reliable);
                        }
                        other => panic!("unexpected {:?}", other),
                    }
                }
                redpine::ServerEvent::Error(_, _) => {
                    panic!("ServerEvent::Error");
                }
            }
        }

        // Idle: stop if asked to, or if the controlling side has gone away.
        match rx.try_recv() {
            Ok(_) | Err(TryRecvError::Disconnected) => break,
            Err(TryRecvError::Empty) => {}
        }
    }
}

fn run_client(id: usize, server_port: u16, config: Config) {
    // Stay off the server's core (zero)
    let core_ids = core_affinity::get_core_ids().expect("failed to query core IDs");
    if core_ids.len() > 1 {
        let core_idx = 1 + (id % (core_ids.len() - 1));
        let core_id = core_ids[core_idx];

        let success = core_affinity::set_for_current(core_id);
        assert_eq!(success, true);
    } else {
        // ¯\_(ツ)_/¯
    }

    let connect_timeout = time::Duration::from_millis(config.client_connect_timeout_ms);
    let disconnect_timeout = time::Duration::from_millis(config.client_disconnect_timeout_ms);

    let mut client = redpine::Client::connect(("127.0.0.1", server_port))
        .expect("failed to create redpine client");

    let mut connected = false;

    while let Some(event) = client.wait_event_timeout(connect_timeout) {
        match event {
            redpine::ClientEvent::Connect => {
                println!("client {} connected", id);
                connected = true;
                break;
            }
            other => {
                panic!("unexpected {:?}", other);
            }
        }
    }

    if !connected {
        panic!("timed out waiting for connect");
    }

    let mut rel_md5_ctx = md5::Context::new();
    let mut unrel_md5_ctx = md5::Context::new();
    let mut rel_send_log = Vec::new();
    let mut unrel_send_log = Vec::new();

    match config.client_action {
        ClientAction::HurryUpAndWait(delay_ms) => {
            thread::sleep(time::Duration::from_millis(delay_ms));
        }
        ClientAction::Transfer(config) => {
            let pre_delay = time::Duration::from_millis(config.pre_delay_ms);
            let post_delay = time::Duration::from_millis(config.post_delay_ms);
            let spacing_delay = time::Duration::from_millis(config.spacing_delay_ms);

            if let Some(event) = client.wait_event_timeout(pre_delay) {
                panic!("unexpected {:?}", event);
            }

            println!("client {} starting transfer", id);

            for _ in 0..config.packet_count {
                let max_size = config.packet_distribution
                    [rand::random::<usize>() % config.packet_distribution.len()];
                let size = rand::random::<usize>() % max_size;
                let data = (0..size)
                    .map(|_| rand::random::<u8>())
                    .collect::<Vec<_>>()
                    .into_boxed_slice();

                let unrel = (rand::random::<usize>() % 2) == 1;

                let (packet_id, mode) = if unrel {
                    (unrel_send_log.len(), redpine::SendMode::Unreliable(500))
                } else {
                    (rel_send_log.len(), redpine::SendMode::Reliable)
                };

                let packet_id_u16: u16 = packet_id.try_into().unwrap();
                let packet = Packet::Data(DataPacket {
                    unrel,
                    id: packet_id_u16,
                    data: data.clone(),
                });

                client.send(packet.to_bytes(), mode.clone());

                if unrel {
                    unrel_send_log.push(data);
                } else {
                    rel_send_log.push(data);
                }

                if let Some(event) = client.wait_event_timeout(spacing_delay) {
                    panic!("unexpected {:?}", event);
                }
            }

            println!("client {} finished transfer", id);

            if let Some(event) = client.wait_event_timeout(post_delay) {
                panic!("unexpected {:?}", event);
            }

            let packet = Packet::SummaryReq;

            println!("client {} requesting summary", id);

            client.send(packet.to_bytes(), redpine::SendMode::Reliable);

            let summary_timeout = time::Duration::from_millis(config.summary_timeout_ms);

            if let Some(event) = client.wait_event_timeout(summary_timeout) {
                match event {
                    redpine::ClientEvent::Receive(bytes) => {
                        let packet = Packet::from_bytes(&bytes);

                        match packet {
                            Packet::Summary(packet) => {
                                let mut received_packet_count = 0;

                                for (idx, ref data) in rel_send_log.iter().enumerate() {
                                    if idx < packet.rel_log.len() {
                                        let was_received = packet.rel_log[idx];

                                        if was_received {
                                            let id: u16 = idx.try_into().unwrap();

                                            rel_md5_ctx.consume(&id.to_le_bytes());
                                            rel_md5_ctx.consume(&data);

                                            received_packet_count += 1;
                                        } else {
                                            panic!("reliable packet {} was not received", idx);
                                        }
                                    }
                                }

                                for (idx, ref data) in unrel_send_log.iter().enumerate() {
                                    if idx < packet.unrel_log.len() {
                                        let was_received = packet.unrel_log[idx];

                                        if was_received {
                                            let id: u16 = idx.try_into().unwrap();

                                            unrel_md5_ctx.consume(&id.to_le_bytes());
                                            unrel_md5_ctx.consume(&data);

                                            received_packet_count += 1;
                                        }
                                    }
                                }

                                if received_packet_count < config.packet_count {
                                    println!(
                                        "warning: client {} sent {} packets, but only {} were received",
                                        id, config.packet_count, received_packet_count
                                    );
                                }

                                // Expect that within each stream, the packets which were reported
                                // received had the expected bytes and were received in the correct
                                // order
                                assert_eq!(rel_md5_ctx.compute(), md5::Digest(packet.rel_md5));
                                assert_eq!(unrel_md5_ctx.compute(), md5::Digest(packet.unrel_md5));
                            }
                            other => panic!("unexpected {:?}", other),
                        }
                    }
                    other => panic!("unexpected {:?}", other),
                }
            }
        }
    }

    client.disconnect();

    while let Some(event) = client.wait_event_timeout(disconnect_timeout) {
        match event {
            redpine::ClientEvent::Disconnect => {
                println!("client {} disconnected", id);
                connected = false;
                break;
            }
            other => {
                panic!("unexpected {:?}", other);
            }
        }
    }

    if connected {
        panic!("timed out waiting for disconnect");
    }
}

/// Parameters for the `ClientAction::Transfer` behavior of a test client.
///
/// All durations are given in milliseconds.
#[derive(Clone)]
pub struct TransferConfig {
    /// Number of data packets each client sends.
    pub packet_count: usize,
    /// Candidate payload size bounds. For every packet one entry is picked at random and the
    /// payload length is then drawn at random below that entry. Should be non-empty and should
    /// contain only non-zero values.
    pub packet_distribution: Vec<usize>,
    /// How long the client waits after connecting before it starts sending. No event is expected
    /// during this window.
    pub pre_delay_ms: u64,
    /// How long the client waits after the last data packet before requesting the summary. No
    /// event is expected during this window.
    pub post_delay_ms: u64,
    /// How long the client waits after each data packet before sending the next one. No event is
    /// expected during this window.
    pub spacing_delay_ms: u64,
    /// How long the client waits for the server's summary after requesting it.
    pub summary_timeout_ms: u64,
}

/// What a test client does between connecting and disconnecting.
#[derive(Clone)]
pub enum ClientAction {
    /// Stay connected and idle (sleep) for the given number of milliseconds.
    HurryUpAndWait(u64),
    /// Send a batch of random packets and verify the server's summary of what it received.
    Transfer(TransferConfig),
}

/// Top-level configuration for one test run (see `run`).
#[derive(Clone)]
pub struct Config {
    /// Port on `127.0.0.1` that the server binds to and the clients connect to.
    pub server_port: u16,
    /// Number of client threads to spawn; also used as the server's `peer_count_max`.
    pub client_count: usize,
    /// How long, in milliseconds, a client waits for its connect event before failing.
    pub client_connect_timeout_ms: u64,
    /// How long, in milliseconds, a client waits for its disconnect event before failing.
    pub client_disconnect_timeout_ms: u64,
    /// What every client does while connected.
    pub client_action: ClientAction,
}

/// Executes one complete test run: a server thread plus `config.client_count` client threads.
///
/// The server is started first, sized to accept exactly `client_count` peers. Every client gets
/// its own copy of `config` and a sequential id starting at zero. Once all clients have finished,
/// the server is told to shut down and is joined.
///
/// # Panics
///
/// Panics if any client thread or the server thread panicked.
pub fn run(config: Config) {
    let server_port = config.server_port;

    let (server_tx, server_rx) = mpsc::channel();

    let server_config = redpine::ServerConfig {
        peer_count_max: config.client_count,
        ..Default::default()
    };
    let server_thread = thread::spawn(move || run_server(server_port, server_config, server_rx));

    // Spawn every client before joining any of them so that they all run concurrently.
    let client_threads: Vec<_> = (0..config.client_count)
        .map(|client_id| {
            let client_config = config.clone();
            thread::spawn(move || run_client(client_id, server_port, client_config))
        })
        .collect();

    for handle in client_threads {
        handle.join().unwrap();
    }

    // The server may already be gone; a failed send is not an error here.
    let _ = server_tx.send(());

    server_thread.join().unwrap();
}