// uniudp 1.0.0
//
// Unidirectional UDP transport with chunking, redundancy, and Reed-Solomon FEC.
// Documentation
use std::time::Instant;

use crate::receiver::session::SessionNonceOutcome;
use crate::types::{MessageKey, ReceiverRuntimeConfig, SenderId};

use super::{ReceiverState, SenderSessionState};

impl ReceiverState {
    /// Reports whether state may be kept for `session_nonce` of `sender_id`.
    ///
    /// A session that is already tracked is always accepted. A session that is
    /// not yet tracked is accepted only while both the receiver-wide session
    /// count and this sender's session count are strictly below the limits
    /// reported by `config`.
    pub(in crate::receiver) fn can_track_session(
        &self,
        sender_id: SenderId,
        session_nonce: u64,
        config: &ReceiverRuntimeConfig,
    ) -> bool {
        let sender_sessions = self.sender_sessions.get(&sender_id);
        // Already-tracked sessions never count against the limits again.
        if sender_sessions.is_some_and(|sessions| sessions.contains_key(&session_nonce)) {
            return true;
        }
        // An unknown sender currently tracks zero sessions.
        let tracked_for_sender = sender_sessions.map_or(0, std::collections::HashMap::len);
        self.tracked_sessions < config.max_tracked_sessions_total()
            && tracked_for_sender < config.max_tracked_sessions_per_sender()
    }

    /// Classifies `session_nonce` against the sessions recorded for `sender_id`.
    ///
    /// Returns:
    /// - [`SessionNonceOutcome::Initialize`] when nothing is recorded for the
    ///   sender at all,
    /// - [`SessionNonceOutcome::Current`] when the sender already has an entry
    ///   for this nonce,
    /// - [`SessionNonceOutcome::Advance`] when the sender is known but this
    ///   nonce is not.
    pub(in crate::receiver) fn check_session_nonce(
        &self,
        sender_id: SenderId,
        session_nonce: u64,
    ) -> SessionNonceOutcome {
        let Some(existing_sessions) = self.sender_sessions.get(&sender_id) else {
            return SessionNonceOutcome::Initialize;
        };
        if existing_sessions.contains_key(&session_nonce) {
            SessionNonceOutcome::Current
        } else {
            SessionNonceOutcome::Advance
        }
    }

    /// Decides whether the message identified by `key` is fresh enough to accept.
    ///
    /// For `Initialize` and `Advance` outcomes the message is always fresh,
    /// because no state exists yet for that session. For `Current`:
    /// - if no session state is found, the message is treated as fresh;
    /// - with strict ordering enabled, the message id must be strictly greater
    ///   than the highest id recorded for the session;
    /// - otherwise an id at or above the recorded maximum is fresh, and an
    ///   older id is fresh only when freshness is unbounded or the id lags the
    ///   maximum by at most the configured freshness window.
    pub(in crate::receiver) fn is_message_fresh(
        &self,
        key: MessageKey,
        config: &ReceiverRuntimeConfig,
        session_outcome: SessionNonceOutcome,
    ) -> bool {
        match session_outcome {
            SessionNonceOutcome::Initialize | SessionNonceOutcome::Advance => true,
            SessionNonceOutcome::Current => {
                let Some(session) = self
                    .sender_sessions
                    .get(&key.sender_id)
                    .and_then(|sessions| sessions.get(&key.session_nonce))
                else {
                    // Nothing recorded to compare against.
                    return true;
                };
                if config.strict_message_ordering() {
                    // Strict mode rejects the current maximum itself as well.
                    return key.message_id > session.max_message_id;
                }
                if key.message_id >= session.max_message_id {
                    return true;
                }
                // Older than the maximum: accept only inside the allowed lag.
                config.unbounded_message_freshness()
                    || session.max_message_id.saturating_sub(key.message_id)
                        <= config.message_freshness_window()
            }
        }
    }

    /// Records that the message identified by `key` was observed at `now`.
    ///
    /// For `Advance` and `Initialize` outcomes a new session entry is stored
    /// with `key.message_id` as its maximum; any entry already present under
    /// the same nonce is replaced. The tracked-session counter is incremented
    /// only when no entry existed before.
    ///
    /// For `Current` the existing entry's maximum message id is raised to
    /// `key.message_id` if that is larger, and its last-seen time is set to
    /// `now`.
    ///
    /// # Panics
    ///
    /// Panics when `session_outcome` is `Current` but no state exists for the
    /// sender/session pair in `key`.
    pub(in crate::receiver) fn note_message_seen(
        &mut self,
        key: MessageKey,
        now: Instant,
        session_outcome: SessionNonceOutcome,
    ) {
        match session_outcome {
            SessionNonceOutcome::Advance | SessionNonceOutcome::Initialize => {
                let inserted = self
                    .sender_sessions
                    .entry(key.sender_id)
                    .or_default()
                    .insert(
                        key.session_nonce,
                        SenderSessionState {
                            max_message_id: key.message_id,
                            last_seen: now,
                        },
                    );
                // Only a genuinely new entry grows the receiver-wide count.
                if inserted.is_none() {
                    self.tracked_sessions = self.tracked_sessions.saturating_add(1);
                }
            }
            SessionNonceOutcome::Current => {
                // Pure lookup: do not create an empty per-sender map when the
                // invariant is violated and the lookup is about to panic.
                let session = self
                    .sender_sessions
                    .get_mut(&key.sender_id)
                    .and_then(|sessions| sessions.get_mut(&key.session_nonce))
                    .expect("current session outcome requires existing sender/session state");
                session.max_message_id = session.max_message_id.max(key.message_id);
                session.last_seen = now;
            }
        }
    }
}