raknet-rust 0.2.0

Asynchronous, high-performance RakNet transport library for Rust.
Documentation
use std::time::{Duration, Instant};

use bytes::Bytes;
use raknet_rust::low_level::protocol::ack::{AckNackPayload, SequenceRange};
use raknet_rust::low_level::protocol::reliability::Reliability;
use raknet_rust::low_level::session::tunables::{CongestionProfile, SessionTunables};
use raknet_rust::low_level::session::{QueuePayloadResult, RakPriority, Session};

fn default_tick(session: &mut Session, now: Instant, max_new_datagrams: usize) -> usize {
    session
        .on_tick(now, max_new_datagrams, 64 * 1024, 0, 0)
        .len()
}

#[test]
fn reliable_send_is_throttled_by_congestion_window_until_ack() {
    let tunables = SessionTunables {
        congestion_profile: CongestionProfile::Custom,
        initial_congestion_window: 1.0,
        min_congestion_window: 1.0,
        max_congestion_window: 1.0,
        ..SessionTunables::default()
    };

    let mut session = Session::with_tunables(200, tunables);
    let payload = Bytes::from(vec![0xAA; 150]);

    assert!(matches!(
        session.queue_payload(
            payload.clone(),
            Reliability::ReliableOrdered,
            0,
            RakPriority::High
        ),
        QueuePayloadResult::Enqueued { .. }
    ));
    assert!(matches!(
        session.queue_payload(payload, Reliability::ReliableOrdered, 0, RakPriority::High),
        QueuePayloadResult::Enqueued { .. }
    ));

    let now = Instant::now();
    let first = session.on_tick(now, 2, 64 * 1024, 0, 0);
    assert_eq!(
        first.len(),
        1,
        "cwnd=1 must allow only one reliable datagram"
    );
    let seq = first[0].header.sequence;

    let blocked = default_tick(&mut session, now + Duration::from_millis(1), 2);
    assert_eq!(blocked, 0, "second datagram must wait for ACK");

    session.handle_ack_payload(AckNackPayload {
        ranges: vec![SequenceRange {
            start: seq,
            end: seq,
        }],
    });
    let _ = session.process_incoming_receipts(now + Duration::from_millis(30));

    let after_ack = default_tick(&mut session, now + Duration::from_millis(31), 2);
    assert_eq!(
        after_ack, 1,
        "after ACK, queued reliable datagram must flow"
    );
}

#[test]
fn karn_rule_skips_srtt_update_for_retransmitted_datagram() {
    let mut session = Session::new(200);
    let now = Instant::now();

    assert!(matches!(
        session.queue_payload(
            Bytes::from(vec![0xBB; 150]),
            Reliability::ReliableOrdered,
            0,
            RakPriority::High
        ),
        QueuePayloadResult::Enqueued { .. }
    ));

    let sent = session.on_tick(now, 1, 64 * 1024, 0, 0);
    assert_eq!(sent.len(), 1);
    let seq = sent[0].header.sequence;

    let resent = session.collect_resendable(now + Duration::from_secs(2), 4, usize::MAX);
    assert_eq!(resent.len(), 1, "timeout should produce one resend");

    session.handle_ack_payload(AckNackPayload {
        ranges: vec![SequenceRange {
            start: seq,
            end: seq,
        }],
    });
    let _ =
        session.process_incoming_receipts(now + Duration::from_secs(2) + Duration::from_millis(20));

    let snapshot = session.metrics_snapshot();
    assert_eq!(
        snapshot.srtt_ms, 0.0,
        "Karn rule: retransmitted packet ACK must not sample RTT"
    );
}

#[test]
fn resend_rto_is_clamped_to_configured_max() {
    let tunables = SessionTunables {
        congestion_profile: CongestionProfile::Custom,
        resend_rto: Duration::from_millis(100),
        min_resend_rto: Duration::from_millis(80),
        max_resend_rto: Duration::from_millis(500),
        ..SessionTunables::default()
    };

    let mut session = Session::with_tunables(200, tunables);
    let start = Instant::now();

    assert!(matches!(
        session.queue_payload(
            Bytes::from(vec![0xCC; 150]),
            Reliability::ReliableOrdered,
            0,
            RakPriority::High
        ),
        QueuePayloadResult::Enqueued { .. }
    ));
    let _ = session.on_tick(start, 1, 64 * 1024, 0, 0);

    for step in 1..=6 {
        let now = start + Duration::from_secs(step * 2);
        let resent = session.collect_resendable(now, 8, usize::MAX);
        assert_eq!(resent.len(), 1);
    }

    let snapshot = session.metrics_snapshot();
    assert!(
        snapshot.resend_rto_ms <= 500.0,
        "RTO must stay under configured max: {}",
        snapshot.resend_rto_ms
    );
}

#[test]
fn resend_rto_is_clamped_to_configured_min_after_fast_ack_samples() {
    let tunables = SessionTunables {
        congestion_profile: CongestionProfile::Custom,
        resend_rto: Duration::from_millis(400),
        min_resend_rto: Duration::from_millis(200),
        max_resend_rto: Duration::from_millis(2_000),
        ..SessionTunables::default()
    };

    let mut session = Session::with_tunables(200, tunables);
    let start = Instant::now();

    for i in 0..5 {
        assert!(matches!(
            session.queue_payload(
                Bytes::from(vec![0xCD; 120]),
                Reliability::ReliableOrdered,
                0,
                RakPriority::High
            ),
            QueuePayloadResult::Enqueued { .. }
        ));
        let send_at = start + Duration::from_millis((i * 10) as u64);
        let sent = session.on_tick(send_at, 1, 64 * 1024, 0, 0);
        assert_eq!(sent.len(), 1, "payload must be emitted for RTT sample");
        let seq = sent[0].header.sequence;

        session.handle_ack_payload(AckNackPayload {
            ranges: vec![SequenceRange {
                start: seq,
                end: seq,
            }],
        });
        let _ = session.process_incoming_receipts(send_at + Duration::from_millis(4));
    }

    let snapshot = session.metrics_snapshot();
    assert!(
        snapshot.resend_rto_ms >= 200.0,
        "RTO must stay above configured min: {}",
        snapshot.resend_rto_ms
    );
}

#[test]
fn high_latency_profile_is_less_aggressive_on_nack_loss_than_conservative() {
    let now = Instant::now();

    let mut conservative = Session::with_tunables(
        200,
        SessionTunables {
            congestion_profile: CongestionProfile::Conservative,
            ..SessionTunables::default()
        },
    );

    let mut high_latency = Session::with_tunables(
        200,
        SessionTunables {
            congestion_profile: CongestionProfile::HighLatency,
            ..SessionTunables::default()
        },
    );

    for session in [&mut conservative, &mut high_latency] {
        assert!(matches!(
            session.queue_payload(
                Bytes::from(vec![0xCE; 120]),
                Reliability::ReliableOrdered,
                0,
                RakPriority::High
            ),
            QueuePayloadResult::Enqueued { .. }
        ));
    }

    let conservative_sent = conservative.on_tick(now, 1, 64 * 1024, 0, 0);
    let high_latency_sent = high_latency.on_tick(now, 1, 64 * 1024, 0, 0);
    assert_eq!(conservative_sent.len(), 1);
    assert_eq!(high_latency_sent.len(), 1);

    let conservative_before = conservative.metrics_snapshot().congestion_window_packets;
    let high_latency_before = high_latency.metrics_snapshot().congestion_window_packets;

    conservative.handle_nack_payload(AckNackPayload {
        ranges: vec![SequenceRange {
            start: conservative_sent[0].header.sequence,
            end: conservative_sent[0].header.sequence,
        }],
    });
    let _ = conservative.process_incoming_receipts(now + Duration::from_millis(1));
    let conservative_after = conservative.metrics_snapshot().congestion_window_packets;

    high_latency.handle_nack_payload(AckNackPayload {
        ranges: vec![SequenceRange {
            start: high_latency_sent[0].header.sequence,
            end: high_latency_sent[0].header.sequence,
        }],
    });
    let _ = high_latency.process_incoming_receipts(now + Duration::from_millis(1));
    let high_latency_after = high_latency.metrics_snapshot().congestion_window_packets;

    let conservative_ratio = conservative_after / conservative_before.max(1.0);
    let high_latency_ratio = high_latency_after / high_latency_before.max(1.0);
    assert!(
        high_latency_ratio >= conservative_ratio,
        "high-latency profile should be at least as conservative on NACK loss (high={}, conservative={})",
        high_latency_ratio,
        conservative_ratio
    );
}

#[test]
fn pending_outgoing_bytes_return_to_zero_after_flush() {
    let mut session = Session::new(200);
    let payload = Bytes::from(vec![0xDD; 150]);

    let _ = session.queue_payload(
        payload.clone(),
        Reliability::ReliableOrdered,
        0,
        RakPriority::High,
    );
    let _ = session.queue_payload(payload, Reliability::ReliableOrdered, 0, RakPriority::High);

    assert!(
        session.pending_outgoing_bytes() > 0,
        "queued bytes must increase after enqueue"
    );

    let sent = session.on_tick(Instant::now(), 8, 64 * 1024, 0, 0);
    assert!(!sent.is_empty());
    assert_eq!(session.pending_outgoing_frames(), 0);
    assert_eq!(session.pending_outgoing_bytes(), 0);
}

#[test]
fn soft_backpressure_defers_low_priority_reliable() {
    let tunables = SessionTunables {
        outgoing_queue_max_frames: 4,
        outgoing_queue_max_bytes: 8 * 1024,
        outgoing_queue_soft_ratio: 0.5,
        ..SessionTunables::default()
    };

    let mut session = Session::with_tunables(200, tunables);

    let _ = session.queue_payload(
        Bytes::from_static(b"a"),
        Reliability::Reliable,
        0,
        RakPriority::High,
    );
    let _ = session.queue_payload(
        Bytes::from_static(b"b"),
        Reliability::Reliable,
        0,
        RakPriority::High,
    );

    assert!(matches!(
        session.queue_payload(
            Bytes::from_static(b"c"),
            Reliability::Reliable,
            0,
            RakPriority::Low
        ),
        QueuePayloadResult::Deferred
    ));

    let snapshot = session.metrics_snapshot();
    assert_eq!(snapshot.outgoing_queue_defers, 1);
}

#[test]
fn nack_for_reliable_datagram_triggers_resend_before_timeout() {
    let mut session = Session::new(200);
    let now = Instant::now();

    assert!(matches!(
        session.queue_payload(
            Bytes::from(vec![0xEE; 150]),
            Reliability::ReliableOrdered,
            0,
            RakPriority::High
        ),
        QueuePayloadResult::Enqueued { .. }
    ));

    let sent = session.on_tick(now, 1, 64 * 1024, 0, 0);
    assert_eq!(sent.len(), 1);
    let seq = sent[0].header.sequence;

    session.handle_nack_payload(AckNackPayload {
        ranges: vec![SequenceRange {
            start: seq,
            end: seq,
        }],
    });
    let progress = session.process_incoming_receipts(now + Duration::from_millis(5));
    assert_eq!(progress.nacked, 1);

    let resent = session.on_tick(now + Duration::from_millis(5), 0, 0, 4, usize::MAX);
    assert!(
        resent.iter().any(|d| d.header.sequence == seq),
        "nack should schedule immediate resend of the same datagram sequence"
    );
}

#[test]
fn unreliable_with_ack_receipt_reports_receipt_completion_on_ack() {
    let mut session = Session::new(200);
    let now = Instant::now();

    assert!(matches!(
        session.queue_payload_with_receipt(
            Bytes::from_static(b"hello"),
            Reliability::UnreliableWithAckReceipt,
            0,
            RakPriority::Normal,
            Some(555)
        ),
        QueuePayloadResult::Enqueued { .. }
    ));

    let sent = session.on_tick(now, 1, 64 * 1024, 0, 0);
    assert_eq!(sent.len(), 1);
    let seq = sent[0].header.sequence;

    session.handle_ack_payload(AckNackPayload {
        ranges: vec![SequenceRange {
            start: seq,
            end: seq,
        }],
    });
    let progress = session.process_incoming_receipts(now + Duration::from_millis(15));
    assert_eq!(progress.acked, 1);
    assert_eq!(progress.acked_receipt_ids, vec![555]);
}