udx 0.2.0

Rust port of libudx, a protocol for reliable, multiplex, and congestion controlled streams over udp
Documentation
use pretty_bytes::converter::convert as fmtbytes;
use std::{io, time::Instant};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use udx::UdxSocket;

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt::init();
    let len = usize_from_env("LENGTH", 1024 * 1024 * 64);
    let num_streams = usize_from_env("STREAMS", 4);
    let iterations = usize_from_env("ITERATIONS", 1);
    for _i in 0..iterations {
        run(len, num_streams).await.unwrap();
    }
}

async fn run(total: usize, num_streams: usize) -> io::Result<()> {
    let socka = UdxSocket::bind("127.0.0.1:0")?;
    let sockb = UdxSocket::bind("127.0.0.1:0")?;
    let addra = socka.local_addr()?;
    let addrb = sockb.local_addr()?;

    let start = Instant::now();
    let len = total / num_streams;
    eprintln!(
        "sending {} over {} streams (each stream {})",
        fmtbytes(total as f64),
        num_streams,
        fmtbytes(len as f64),
    );
    let mut readers = vec![];
    let mut writers = vec![];
    for i in 1..=num_streams as u32 {
        let streama = socka.connect(addrb, i, i)?;
        let streamb = sockb.connect(addra, i, i)?;
        let read_buf = vec![0u8; len];
        let write_buf = vec![i as u8; len];
        if i % 2 == 0 {
            readers.push((streama, read_buf));
            writers.push((streamb, write_buf));
        } else {
            readers.push((streamb, write_buf));
            writers.push((streama, read_buf));
        }
    }

    let mut tasks: Vec<tokio::task::JoinHandle<io::Result<()>>> = vec![];

    while let Some(writer) = writers.pop() {
        let task = tokio::spawn(async move {
            let (mut writer, message) = writer;
            writer.write_all(&message).await?;
            Ok(())
        });
        tasks.push(task);
    }

    while let Some(reader) = readers.pop() {
        let task = tokio::spawn(async move {
            let (mut reader, mut read_buf) = reader;
            reader.read_exact(&mut read_buf).await?;
            Ok(())
        });
        tasks.push(task);
    }

    for task in tasks {
        task.await??;
    }
    let time = start.elapsed();
    let throughput = total as f64 / time.as_secs_f64();
    eprintln!("throughput: {}/s   time: {:?}", fmtbytes(throughput), time);
    Ok(())
}

fn usize_from_env(name: &str, default: usize) -> usize {
    std::env::var(name)
        .map(|x| {
            x.parse::<usize>()
                .unwrap_or_else(|_| panic!("{} must be a number", name))
        })
        .unwrap_or(default)
}