udp_prague 0.1.1

A Rust implementation of the Prague congestion control protocol for UDP-based applications.
Documentation
#![cfg(feature = "session")]

use std::thread;
use std::time::Duration;

use udp_prague::core::{
    PragueReceivedPacket, PragueReceivedPacketView, PragueReceiverSession,
    PragueSegmentReceiverSession, PragueSegmentSenderSession, PragueSenderSession,
    PragueSessionConfig, PragueVideoReceiverSession, PragueVideoSenderSession,
    PragueVideoSessionConfig,
};

#[test]
fn session_wrapper_bulk_roundtrip_smoke() {
    let port = 38084u16;
    let payload = b"audio-or-message-payload".to_vec();

    let receiver = thread::spawn(move || {
        let mut receiver = PragueReceiverSession::bind("0.0.0.0", port).expect("bind receiver");
        let received = receiver
            .receive_and_ack(500_000)
            .expect("receive and ack")
            .expect("packet");

        match received.packet {
            PragueReceivedPacket::Bulk(packet) => {
                assert_eq!(packet.app_data, payload);
                assert_eq!(received.ack.acked_sequence_number, packet.sequence_number);
            }
            PragueReceivedPacket::Frame(_) => panic!("unexpected frame packet"),
        }
    });

    thread::sleep(Duration::from_millis(50));

    let mut sender =
        PragueSenderSession::connect("127.0.0.1", port, PragueSessionConfig::default())
            .expect("connect sender");

    let sent = sender
        .send_bulk(b"audio-or-message-payload")
        .expect("send bulk");
    let feedback = sender
        .receive_feedback(500_000)
        .expect("feedback result")
        .expect("ack feedback");

    assert_eq!(feedback.acked_sequence_number, sent.sequence_number);
    assert_eq!(sender.inflight_packets(), 0);

    receiver.join().expect("receiver join");
}

#[test]
fn session_wrapper_large_bulk_blocking_roundtrip_smoke() {
    let port = 38086u16;
    let payload = (0..5000u32)
        .map(|value| (value % 251) as u8)
        .collect::<Vec<_>>();
    let expected_payload = payload.clone();

    let receiver = thread::spawn(move || {
        let mut receiver = PragueReceiverSession::bind("0.0.0.0", port).expect("bind receiver");
        let mut assembled = Vec::new();

        while assembled.len() < expected_payload.len() {
            let received = receiver
                .receive_and_ack(500_000)
                .expect("receive and ack")
                .expect("bulk packet");

            match received.packet {
                PragueReceivedPacket::Bulk(packet) => {
                    assembled.extend_from_slice(&packet.app_data);
                }
                PragueReceivedPacket::Frame(_) => panic!("unexpected frame packet"),
            }
        }

        assert_eq!(assembled, expected_payload);
    });

    thread::sleep(Duration::from_millis(50));

    let mut sender = PragueSenderSession::connect(
        "127.0.0.1",
        port,
        PragueSessionConfig {
            max_packet_size: 256,
            ..PragueSessionConfig::default()
        },
    )
    .expect("connect sender");

    let report = sender
        .send_large_bulk_blocking(&payload, 500_000)
        .expect("send large bulk");

    assert_eq!(report.app_bytes_sent, payload.len() as u64);
    assert!(report.packets_sent > 1);
    assert_eq!(sender.inflight_packets(), 0);

    receiver.join().expect("receiver join");
}

#[test]
fn session_wrapper_borrowed_bulk_roundtrip_smoke() {
    let port = 38089u16;
    let payload = b"audio-or-message-payload".to_vec();

    let receiver = thread::spawn(move || {
        let mut receiver = PragueReceiverSession::bind("0.0.0.0", port).expect("bind receiver");
        let received = receiver
            .receive_and_ack_borrowed(500_000)
            .expect("receive and ack")
            .expect("packet");

        match received.packet {
            PragueReceivedPacketView::Bulk(packet) => {
                assert_eq!(packet.app_data, payload.as_slice());
                assert_eq!(received.ack.acked_sequence_number, packet.sequence_number);
            }
            PragueReceivedPacketView::Frame(_) => panic!("unexpected frame packet"),
        }
    });

    thread::sleep(Duration::from_millis(50));

    let mut sender =
        PragueSenderSession::connect("127.0.0.1", port, PragueSessionConfig::default())
            .expect("connect sender");

    let sent = sender
        .send_bulk(b"audio-or-message-payload")
        .expect("send bulk");
    let feedback = sender
        .receive_feedback(500_000)
        .expect("feedback result")
        .expect("ack feedback");

    assert_eq!(feedback.acked_sequence_number, sent.sequence_number);
    assert_eq!(sender.inflight_packets(), 0);

    receiver.join().expect("receiver join");
}

#[test]
fn segmented_bulk_wrapper_roundtrip_smoke() {
    let port = 38087u16;
    let content_tag = 23u16;
    let payload = (0..7000u32)
        .map(|value| ((value * 17) % 251) as u8)
        .collect::<Vec<_>>();
    let expected_payload = payload.clone();

    let receiver = thread::spawn(move || {
        let mut receiver =
            PragueSegmentReceiverSession::bind("0.0.0.0", port).expect("bind receiver");
        let received = receiver
            .receive_segment_and_ack(500_000)
            .expect("receive segment result")
            .expect("received segment");

        assert_eq!(received.content_tag, content_tag);
        assert_eq!(received.payload, expected_payload);
    });

    thread::sleep(Duration::from_millis(50));

    let mut sender = PragueSegmentSenderSession::connect(
        "127.0.0.1",
        port,
        PragueSessionConfig {
            max_packet_size: 256,
            ..PragueSessionConfig::default()
        },
    )
    .expect("connect sender");

    let report = sender
        .send_segment_blocking(content_tag, &payload, 500_000)
        .expect("send segment");

    assert_eq!(report.content_tag, content_tag);
    assert_eq!(report.segment_size_bytes, payload.len() as u64);
    assert!(report.packets_sent > 1);
    assert_eq!(sender.inflight_packets(), 0);

    receiver.join().expect("receiver join");
}

#[test]
fn video_session_wrapper_frame_roundtrip_smoke() {
    let port = 38085u16;
    let frame = vec![0x5a; 400];
    let expected_frame = frame.clone();

    let receiver = thread::spawn(move || {
        let mut receiver = PragueReceiverSession::bind("0.0.0.0", port).expect("bind receiver");
        let mut assembled = Vec::new();
        let mut expected_frame_len = None;

        while expected_frame_len.is_none_or(|len| assembled.len() < len) {
            let received = receiver
                .receive_and_ack(500_000)
                .expect("receive and ack")
                .expect("frame packet");

            match received.packet {
                PragueReceivedPacket::Frame(packet) => {
                    expected_frame_len.get_or_insert(packet.frame_size_bytes as usize);
                    assembled.extend_from_slice(&packet.app_data);
                }
                PragueReceivedPacket::Bulk(_) => panic!("unexpected bulk packet"),
            }
        }

        assert_eq!(assembled, expected_frame);
    });

    thread::sleep(Duration::from_millis(50));

    let mut sender = PragueVideoSenderSession::connect(
        "127.0.0.1",
        port,
        PragueVideoSessionConfig {
            max_packet_size: 256,
            ..PragueVideoSessionConfig::default()
        },
    )
    .expect("connect video sender");

    let queued = sender.queue_frame(&frame).expect("queue frame");
    assert_eq!(queued.actual_frame_size_bytes, frame.len() as u64);

    while sender.has_pending_frame() || sender.inflight_packets() > 0 {
        match sender.transmit_ready_frame_fragments() {
            Ok(Some(_report)) => {}
            Ok(None) => {}
            Err(udp_prague::SessionError::WouldBlock { .. }) => {}
            Err(err) => panic!("unexpected transmit error: {err}"),
        }

        if let Some(_feedback) = sender
            .receive_feedback(20_000)
            .expect("video feedback result")
        {}
    }

    assert_eq!(sender.inflight_packets(), 0);
    assert_eq!(sender.inflight_frames(), 0);

    receiver.join().expect("receiver join");
}

#[test]
fn video_receiver_wrapper_reassembles_full_frame_smoke() {
    let port = 38088u16;
    let frame = (0..1600u32)
        .map(|value| ((value * 19) % 251) as u8)
        .collect::<Vec<_>>();
    let expected_frame = frame.clone();

    let receiver = thread::spawn(move || {
        let mut receiver =
            PragueVideoReceiverSession::bind("0.0.0.0", port).expect("bind video receiver");
        let received = receiver
            .receive_frame_and_ack(500_000)
            .expect("receive frame result")
            .expect("received frame");

        assert_eq!(received.frame_size_bytes, expected_frame.len() as u64);
        assert_eq!(received.payload, expected_frame);
    });

    thread::sleep(Duration::from_millis(50));

    let mut sender = PragueVideoSenderSession::connect(
        "127.0.0.1",
        port,
        PragueVideoSessionConfig {
            max_packet_size: 256,
            ..PragueVideoSessionConfig::default()
        },
    )
    .expect("connect video sender");

    let queued = sender.queue_frame(&frame).expect("queue frame");
    assert_eq!(queued.actual_frame_size_bytes, frame.len() as u64);

    while sender.has_pending_frame() || sender.inflight_packets() > 0 {
        match sender.transmit_ready_frame_fragments() {
            Ok(Some(_report)) => {}
            Ok(None) => {}
            Err(udp_prague::SessionError::WouldBlock { .. }) => {}
            Err(err) => panic!("unexpected transmit error: {err}"),
        }

        if let Some(_feedback) = sender
            .receive_feedback(20_000)
            .expect("video feedback result")
        {}
    }

    assert_eq!(sender.inflight_packets(), 0);
    assert_eq!(sender.inflight_frames(), 0);

    receiver.join().expect("receiver join");
}