webrtc-util 0.17.1

Utilities for WebRTC.rs stack
Documentation
use std::collections::VecDeque;
use std::sync::Arc;

use bytes::Bytes;
use tokio::sync::mpsc;

use super::conn_bridge::*;
use super::*;

static MSG1: Bytes = Bytes::from_static(b"ADC");
static MSG2: Bytes = Bytes::from_static(b"DEFG");

#[tokio::test]
async fn test_bridge_normal() -> Result<()> {
    let (br, conn0, conn1) = Bridge::new(0, None, None);

    let n = conn0.send(&MSG1).await?;
    assert_eq!(n, MSG1.len(), "unexpected length");

    let (tx, mut rx) = mpsc::channel(1);

    tokio::spawn(async move {
        let mut buf = vec![0u8; 256];
        let n = conn1.recv(&mut buf).await?;
        let _ = tx.send(n).await;
        Result::<()>::Ok(())
    });

    br.process().await;

    let n = rx.recv().await.unwrap();
    assert_eq!(n, MSG1.len(), "unexpected length");

    Ok(())
}

#[tokio::test]
async fn test_bridge_drop_first_packet_from_conn0() -> Result<()> {
    let (br, conn0, conn1) = Bridge::new(0, None, None);

    let n = conn0.send(&MSG1).await?;
    assert_eq!(n, MSG1.len(), "unexpected length");
    let n = conn0.send(&MSG2).await?;
    assert_eq!(n, MSG2.len(), "unexpected length");

    let (tx, mut rx) = mpsc::channel(1);

    tokio::spawn(async move {
        let mut buf = vec![0u8; 256];
        let n = conn1.recv(&mut buf).await?;
        let _ = tx.send(n).await;
        Result::<()>::Ok(())
    });

    br.drop_offset(0, 0, 1).await;
    br.process().await;

    let n = rx.recv().await.unwrap();
    assert_eq!(n, MSG2.len(), "unexpected length");

    Ok(())
}

#[tokio::test]
async fn test_bridge_drop_second_packet_from_conn0() -> Result<()> {
    let (br, conn0, conn1) = Bridge::new(0, None, None);

    let n = conn0.send(&MSG1).await?;
    assert_eq!(n, MSG1.len(), "unexpected length");
    let n = conn0.send(&MSG2).await?;
    assert_eq!(n, MSG2.len(), "unexpected length");

    let (tx, mut rx) = mpsc::channel(1);

    tokio::spawn(async move {
        let mut buf = vec![0u8; 256];
        let n = conn1.recv(&mut buf).await?;
        let _ = tx.send(n).await;
        Result::<()>::Ok(())
    });

    br.drop_offset(0, 1, 1).await;
    br.process().await;

    let n = rx.recv().await.unwrap();
    assert_eq!(n, MSG1.len(), "unexpected length");

    Ok(())
}

#[tokio::test]
async fn test_bridge_drop_first_packet_from_conn1() -> Result<()> {
    let (br, conn0, conn1) = Bridge::new(0, None, None);

    let n = conn1.send(&MSG1).await?;
    assert_eq!(n, MSG1.len(), "unexpected length");
    let n = conn1.send(&MSG2).await?;
    assert_eq!(n, MSG2.len(), "unexpected length");

    let (tx, mut rx) = mpsc::channel(1);

    tokio::spawn(async move {
        let mut buf = vec![0u8; 256];
        let n = conn0.recv(&mut buf).await?;
        let _ = tx.send(n).await;
        Result::<()>::Ok(())
    });

    br.drop_offset(1, 0, 1).await;
    br.process().await;

    let n = rx.recv().await.unwrap();
    assert_eq!(n, MSG2.len(), "unexpected length");

    Ok(())
}

#[tokio::test]
async fn test_bridge_drop_second_packet_from_conn1() -> Result<()> {
    let (br, conn0, conn1) = Bridge::new(0, None, None);

    let n = conn1.send(&MSG1).await?;
    assert_eq!(n, MSG1.len(), "unexpected length");
    let n = conn1.send(&MSG2).await?;
    assert_eq!(n, MSG2.len(), "unexpected length");

    let (tx, mut rx) = mpsc::channel(1);

    tokio::spawn(async move {
        let mut buf = vec![0u8; 256];
        let n = conn0.recv(&mut buf).await?;
        let _ = tx.send(n).await;
        Result::<()>::Ok(())
    });

    br.drop_offset(1, 1, 1).await;
    br.process().await;

    let n = rx.recv().await.unwrap();
    assert_eq!(n, MSG1.len(), "unexpected length");

    Ok(())
}

#[tokio::test]
async fn test_bridge_reorder_packets_from_conn0() -> Result<()> {
    let (br, conn0, conn1) = Bridge::new(0, None, None);

    let n = conn0.send(&MSG1).await?;
    assert_eq!(n, MSG1.len(), "unexpected length");
    let n = conn0.send(&MSG2).await?;
    assert_eq!(n, MSG2.len(), "unexpected length");

    let (tx, mut rx) = mpsc::channel(1);

    tokio::spawn(async move {
        let mut buf = vec![0u8; 256];
        let n = conn1.recv(&mut buf).await?;
        assert_eq!(n, MSG2.len(), "unexpected length");
        let n = conn1.recv(&mut buf).await?;
        assert_eq!(n, MSG1.len(), "unexpected length");

        let _ = rx.recv().await;

        Result::<()>::Ok(())
    });

    br.reorder(0).await;
    br.process().await;

    let _ = tx.send(()).await;

    Ok(())
}

#[tokio::test]
async fn test_bridge_reorder_packets_from_conn1() -> Result<()> {
    let (br, conn0, conn1) = Bridge::new(0, None, None);

    let n = conn1.send(&MSG1).await?;
    assert_eq!(n, MSG1.len(), "unexpected length");
    let n = conn1.send(&MSG2).await?;
    assert_eq!(n, MSG2.len(), "unexpected length");

    let (tx, mut rx) = mpsc::channel(1);

    tokio::spawn(async move {
        let mut buf = vec![0u8; 256];
        let n = conn0.recv(&mut buf).await?;
        assert_eq!(n, MSG2.len(), "unexpected length");
        let n = conn0.recv(&mut buf).await?;
        assert_eq!(n, MSG1.len(), "unexpected length");

        let _ = rx.recv().await;

        Result::<()>::Ok(())
    });

    br.reorder(1).await;
    br.process().await;

    let _ = tx.send(()).await;

    Ok(())
}

#[tokio::test]
async fn test_bridge_inverse_error() -> Result<()> {
    let mut q = VecDeque::new();
    q.push_back(MSG1.clone());
    assert!(!inverse(&mut q));
    Ok(())
}

#[tokio::test]
async fn test_bridge_drop_next_n_packets() -> Result<()> {
    for id in 0..2 {
        let (br, conn0, conn1) = Bridge::new(0, None, None);
        br.drop_next_nwrites(id, 3);
        let conns: Vec<Arc<dyn Conn + Send + Sync>> = vec![Arc::new(conn0), Arc::new(conn1)];
        let src_conn = Arc::clone(&conns[id]);
        let dst_conn = Arc::clone(&conns[1 - id]);

        let (tx, mut rx) = mpsc::channel(5);

        tokio::spawn(async move {
            let mut buf = vec![0u8; 256];
            for _ in 0..2u8 {
                let n = dst_conn.recv(&mut buf).await?;
                let _ = tx.send(buf[..n].to_vec()).await;
            }

            Result::<()>::Ok(())
        });

        let mut msgs = vec![];
        for i in 0..5u8 {
            let msg = format!("msg{i}");
            let n = src_conn.send(msg.as_bytes()).await?;
            assert_eq!(n, msg.len(), "[{id}] unexpected length");
            msgs.push(msg);
            br.process().await;
        }

        for i in 0..2 {
            if let Some(buf) = rx.recv().await {
                assert_eq!(msgs[i + 3].as_bytes(), &buf);
            } else {
                panic!("{id} unexpected number of packets");
            }
        }
    }

    Ok(())
}