srt-tokio 0.3.0

SRT implementation in Rust
Documentation
use std::{
    collections::VecDeque,
    time::{Duration, Instant},
};

use anyhow::Error;
use bytes::Bytes;
use futures::{stream, FutureExt, SinkExt, Stream, StreamExt};
use log::{error, info};
use tokio::select;

use srt_tokio::{options::*, SocketStatistics, SrtSocket};

fn stream_exact(duration: Duration) -> impl Stream<Item = Bytes> {
    let message = Bytes::from(vec![5; 1024]);
    let first = tokio::time::Instant::now();
    // This will momentarily double the data rate for the first few seconds, pushing the flow
    // window, send and receive buffers past their limits. The connection should recover once the
    // input data rate is stabilized.
    stream::unfold(
        (message, first, duration / 2),
        move |(message, last, d)| async move {
            tokio::time::sleep_until(last + d).await;
            if first.elapsed() < Duration::from_secs(3) {
                Some((message.clone(), (message, last + d, d)))
            } else {
                Some((message.clone(), (message, last + d, duration)))
            }
        },
    )
}

#[allow(clippy::large_enum_variant)]
enum Select {
    Message((Instant, Bytes)),
    Statistics(SocketStatistics),
}

#[tokio::test]
async fn high_bandwidth() -> Result<(), Error> {
    use srt_protocol::options::LiveBandwidthMode::*;
    let _ = pretty_env_logger::try_init();

    const RATE_MBPS: u64 = 50;
    let latency = Duration::from_millis(150);
    let buffer_size = ByteCount((latency.as_secs_f64() * (RATE_MBPS as f64 * 1_000_000.)) as u64);
    let sender_fut = async move {
        let mut sock = SrtSocket::builder()
            .latency(Duration::from_millis(150))
            .bandwidth(Estimated {
                expected: DataRate(RATE_MBPS * 1_000_000),
                overhead: Percent(20),
            })
            .set(|options| {
                options.sender.buffer_size = buffer_size * 10;
                options.connect.udp_send_buffer_size = ByteCount(5_000_000);
            })
            .call("127.0.0.1:6654", None)
            .await?;

        let mut stream_gbps = stream_exact(Duration::from_micros(1_000_000 / 1024 / RATE_MBPS))
            .map(|bytes| Ok((Instant::now(), bytes)))
            .boxed();

        info!("Sender all connected");

        sock.send_all(&mut stream_gbps).await?;

        sock.close().await
    };

    let recv_fut = async move {
        let mut sock = SrtSocket::builder()
            .set(|options| {
                options.receiver.buffer_size = buffer_size * 2;
                options.connect.udp_recv_buffer_size = ByteCount(5_000_000);
                options.session.statistics_interval = Duration::from_secs(1);
            })
            .latency(latency)
            .listen_on(":6654")
            .await?
            .fuse();

        let mut statistics = sock.get_mut().statistics().clone().fuse();

        let mut window = VecDeque::new();
        let mut bytes_received = 0;
        let window_size = Duration::from_secs(1);

        loop {
            use Select::*;
            let next = futures::select!(
                message = sock.next() => Message(message.unwrap().unwrap()),
                statistics = statistics.next() => Statistics(statistics.unwrap()),
                complete => break,
            );

            match next {
                Message((_, bytes)) => {
                    bytes_received += bytes.len();
                    window.push_back((Instant::now(), bytes.len()));

                    while let Some((a, bytes)) = window.front() {
                        if Instant::now() - *a > window_size {
                            bytes_received -= *bytes;
                            window.pop_front();
                        } else {
                            break;
                        }
                    }

                    print!(
                        "Received {:10.3}MB, rate={:10.3}MB/s\r",
                        bytes_received as f64 / 1024. / 1024.,
                        bytes_received as f64 / 1024. / 1024. / window_size.as_secs_f64(),
                    );
                }
                Statistics(statistics) => {
                    println!("rx_unique_bytes: {:<30}", statistics.rx_unique_bytes);
                }
            }
        }
        Ok::<_, Error>(())
    };

    let send = tokio::spawn(sender_fut).fuse();
    let recv = tokio::spawn(recv_fut).fuse();
    let timeout = tokio::time::sleep(Duration::from_secs(60)).fuse();

    select!(
        result = send => error!("send: {:?}", result),
        result = recv => error!("recv: {:?}", result),
        _ = timeout => { }
    );

    Ok(())
}