chokepoint 0.5.1

Generic stream transformer to simulate traffic / network congestion
Documentation
use bytes::Bytes;
use chokepoint::{
    ChokeSettings,
    ChokeSettingsOrder,
    ChokeStream,
};
use futures::stream::StreamExt;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;

#[tokio::test]
async fn delivery_without_modifications() {
    let (tx, rx) = mpsc::unbounded_channel();
    let traffic_shaper = ChokeStream::new(Box::new(UnboundedReceiverStream::new(rx)), Default::default());

    tokio::spawn(async move {
        for i in 0..10usize {
            tx.send(Bytes::from(i.to_le_bytes().to_vec())).unwrap();
        }
    });

    let output = traffic_shaper
        .map(|packet| usize::from_le_bytes(packet[0..8].try_into().unwrap()))
        .collect::<Vec<_>>()
        .await;

    assert_eq!(output, (0..10).collect::<Vec<_>>());
}

#[yare::parameterized(
        unordered = { ChokeSettingsOrder::Unordered, vec![2, 3, 1] },
        ordered = { ChokeSettingsOrder::Ordered, vec![1, 2, 3] },
        // backpressure = { ChokeSettingsOrder::Backpressure, vec![1, 2, 3] }
    )]
#[test_macro(tokio::test)]
async fn ordering(ordering: ChokeSettingsOrder, expected: Vec<usize>) {
    let (tx, rx) = mpsc::unbounded_channel();
    let stream = ChokeStream::new(
        Box::new(UnboundedReceiverStream::new(rx)),
        ChokeSettings::default()
            .set_ordering(Some(ordering))
            .set_latency_distribution(Some({
                let mut n = 0;
                move || {
                    n += 1;
                    match n {
                        1 => Some(Duration::from_millis(150)),
                        2 => Some(Duration::from_millis(50)),
                        3 => Some(Duration::from_millis(100)),
                        _ => None,
                    }
                }
            })),
    );

    tx.send(Bytes::from(1usize.to_le_bytes().to_vec())).unwrap();
    tx.send(Bytes::from(2usize.to_le_bytes().to_vec())).unwrap();
    tx.send(Bytes::from(3usize.to_le_bytes().to_vec())).unwrap();
    drop(tx);

    let output = stream
        .map(|packet| usize::from_le_bytes(packet[0..8].try_into().unwrap()))
        .collect::<Vec<_>>()
        .await;

    assert_eq!(output, expected);
}