raknet-rust 0.2.0

Asynchronous, high-performance RakNet transport library for Rust.
Documentation
use std::collections::VecDeque;

use crate::protocol::frame::Frame;
use crate::session::SessionState;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PipelineFrameAction {
    Deliver,
    Queued,
    Overflow,
}

#[derive(Debug, Clone, Copy, Default)]
pub struct SessionPipelineMetricsSnapshot {
    pub pending_unhandled_frames: usize,
    pub pending_unhandled_bytes: usize,
    pub unhandled_frames_queued: u64,
    pub unhandled_frames_flushed: u64,
    pub unhandled_frames_dropped: u64,
}

#[derive(Debug, Clone)]
pub struct SessionPipeline {
    pending_unhandled: VecDeque<Frame>,
    pending_unhandled_bytes: usize,
    max_unhandled_frames: usize,
    max_unhandled_bytes: usize,
    unhandled_frames_queued: u64,
    unhandled_frames_flushed: u64,
    unhandled_frames_dropped: u64,
}

impl SessionPipeline {
    pub fn new(max_unhandled_frames: usize, max_unhandled_bytes: usize) -> Self {
        Self {
            pending_unhandled: VecDeque::new(),
            pending_unhandled_bytes: 0,
            max_unhandled_frames: max_unhandled_frames.max(1),
            max_unhandled_bytes: max_unhandled_bytes.max(1),
            unhandled_frames_queued: 0,
            unhandled_frames_flushed: 0,
            unhandled_frames_dropped: 0,
        }
    }

    pub fn route_inbound_app_frame(
        &mut self,
        state: SessionState,
        frame: Frame,
        out: &mut Vec<Frame>,
    ) -> PipelineFrameAction {
        if state == SessionState::Connected {
            out.push(frame);
            return PipelineFrameAction::Deliver;
        }

        let frame_bytes = frame.payload.len();
        if self.pending_unhandled.len() >= self.max_unhandled_frames
            || self.pending_unhandled_bytes.saturating_add(frame_bytes) > self.max_unhandled_bytes
        {
            self.unhandled_frames_dropped = self.unhandled_frames_dropped.saturating_add(1);
            return PipelineFrameAction::Overflow;
        }

        self.pending_unhandled_bytes = self.pending_unhandled_bytes.saturating_add(frame_bytes);
        self.pending_unhandled.push_back(frame);
        self.unhandled_frames_queued = self.unhandled_frames_queued.saturating_add(1);
        PipelineFrameAction::Queued
    }

    pub fn flush_if_connected(&mut self, state: SessionState, out: &mut Vec<Frame>) -> usize {
        if state != SessionState::Connected || self.pending_unhandled.is_empty() {
            return 0;
        }

        let mut flushed = 0usize;
        while let Some(frame) = self.pending_unhandled.pop_front() {
            self.pending_unhandled_bytes = self
                .pending_unhandled_bytes
                .saturating_sub(frame.payload.len());
            out.push(frame);
            flushed = flushed.saturating_add(1);
        }
        self.unhandled_frames_flushed =
            self.unhandled_frames_flushed.saturating_add(flushed as u64);
        flushed
    }

    pub fn metrics_snapshot(&self) -> SessionPipelineMetricsSnapshot {
        SessionPipelineMetricsSnapshot {
            pending_unhandled_frames: self.pending_unhandled.len(),
            pending_unhandled_bytes: self.pending_unhandled_bytes,
            unhandled_frames_queued: self.unhandled_frames_queued,
            unhandled_frames_flushed: self.unhandled_frames_flushed,
            unhandled_frames_dropped: self.unhandled_frames_dropped,
        }
    }
}

#[cfg(test)]
mod tests {
    use bytes::Bytes;

    use super::{PipelineFrameAction, SessionPipeline};
    use crate::protocol::frame::Frame;
    use crate::protocol::frame_header::FrameHeader;
    use crate::protocol::reliability::Reliability;
    use crate::session::SessionState;

    fn test_frame(payload: &'static [u8]) -> Frame {
        Frame {
            header: FrameHeader::new(Reliability::ReliableOrdered, false, false),
            bit_length: (payload.len() as u16) << 3,
            reliable_index: None,
            sequence_index: None,
            ordering_index: None,
            ordering_channel: None,
            split: None,
            payload: Bytes::from_static(payload),
        }
    }

    #[test]
    fn unhandled_frames_are_buffered_and_flushed_on_connected() {
        let mut pipeline = SessionPipeline::new(4, 1024);
        let mut delivered = Vec::new();

        let action = pipeline.route_inbound_app_frame(
            SessionState::Reply2Sent,
            test_frame(b"\xfehello"),
            &mut delivered,
        );
        assert_eq!(action, PipelineFrameAction::Queued);
        assert!(delivered.is_empty());

        assert_eq!(
            pipeline.flush_if_connected(SessionState::Reply2Sent, &mut delivered),
            0
        );
        assert_eq!(
            pipeline.flush_if_connected(SessionState::Connected, &mut delivered),
            1
        );
        assert_eq!(delivered.len(), 1);

        let metrics = pipeline.metrics_snapshot();
        assert_eq!(metrics.pending_unhandled_frames, 0);
        assert_eq!(metrics.unhandled_frames_queued, 1);
        assert_eq!(metrics.unhandled_frames_flushed, 1);
        assert_eq!(metrics.unhandled_frames_dropped, 0);
    }

    #[test]
    fn overflow_drops_frame_and_reports_overflow() {
        let mut pipeline = SessionPipeline::new(1, 1024);
        let mut delivered = Vec::new();

        let first = pipeline.route_inbound_app_frame(
            SessionState::Reply2Sent,
            test_frame(b"\xfeone"),
            &mut delivered,
        );
        let second = pipeline.route_inbound_app_frame(
            SessionState::Reply2Sent,
            test_frame(b"\xfetwo"),
            &mut delivered,
        );

        assert_eq!(first, PipelineFrameAction::Queued);
        assert_eq!(second, PipelineFrameAction::Overflow);

        let metrics = pipeline.metrics_snapshot();
        assert_eq!(metrics.pending_unhandled_frames, 1);
        assert_eq!(metrics.unhandled_frames_queued, 1);
        assert_eq!(metrics.unhandled_frames_dropped, 1);
    }

    #[test]
    fn connected_state_delivers_immediately_without_queueing() {
        let mut pipeline = SessionPipeline::new(2, 32);
        let mut delivered = Vec::new();

        let action = pipeline.route_inbound_app_frame(
            SessionState::Connected,
            test_frame(b"\xfelive"),
            &mut delivered,
        );

        assert_eq!(action, PipelineFrameAction::Deliver);
        assert_eq!(delivered.len(), 1);

        let metrics = pipeline.metrics_snapshot();
        assert_eq!(metrics.pending_unhandled_frames, 0);
        assert_eq!(metrics.pending_unhandled_bytes, 0);
        assert_eq!(metrics.unhandled_frames_queued, 0);
        assert_eq!(metrics.unhandled_frames_flushed, 0);
        assert_eq!(metrics.unhandled_frames_dropped, 0);
    }

    #[test]
    fn overflow_can_be_triggered_by_byte_budget() {
        let mut pipeline = SessionPipeline::new(4, 6);
        let mut delivered = Vec::new();

        let first = pipeline.route_inbound_app_frame(
            SessionState::Reply2Sent,
            test_frame(b"1234"),
            &mut delivered,
        );
        let second = pipeline.route_inbound_app_frame(
            SessionState::Reply2Sent,
            test_frame(b"5678"),
            &mut delivered,
        );

        assert_eq!(first, PipelineFrameAction::Queued);
        assert_eq!(second, PipelineFrameAction::Overflow);

        let metrics = pipeline.metrics_snapshot();
        assert_eq!(metrics.pending_unhandled_frames, 1);
        assert_eq!(metrics.pending_unhandled_bytes, 4);
        assert_eq!(metrics.unhandled_frames_dropped, 1);
    }
}