uniudp 1.0.0

Unidirectional UDP transport with chunking, redundancy, and Reed-Solomon FEC.
Documentation
use std::mem::size_of;

use crate::types::{MessageKey, ReceiverRuntimeConfig, SourcePolicy};

use super::{MessageState, ReceiverState};

impl ReceiverState {
    pub(super) fn pending_cost(state: &MessageState) -> usize {
        state
            .pending_bytes_estimate()
            .saturating_add(size_of::<MessageKey>())
    }

    pub(super) fn insert_pending(&mut self, key: MessageKey, state: MessageState) {
        let is_complete = state.is_complete();
        let created_at = state.created_at;
        if self.pending.contains_key(&key) {
            self.remove_pending(&key);
        }
        self.pending_estimated_bytes = self
            .pending_estimated_bytes
            .saturating_add(Self::pending_cost(&state));
        self.pending.insert(key, state);
        self.pending_index.insert(key, created_at);
        if is_complete {
            self.enqueue_complete(key);
        }
        self.assert_index_invariants();
    }

    pub(super) fn remove_pending(&mut self, key: &MessageKey) -> Option<MessageState> {
        let state = self.pending.remove(key)?;
        self.pending_estimated_bytes = self
            .pending_estimated_bytes
            .saturating_sub(Self::pending_cost(&state));
        self.pending_index.remove(key);
        self.remove_completion_tracking(*key);
        self.assert_index_invariants();
        Some(state)
    }

    pub(super) fn enqueue_complete(&mut self, key: MessageKey) {
        self.completion_index.insert_if_absent(key);
        self.assert_index_invariants();
    }

    fn remove_completion_tracking(&mut self, key: MessageKey) {
        self.completion_index.remove(&key);
    }

    fn evict_least_active(&mut self, protected_key: Option<MessageKey>) -> bool {
        let victim = self
            .pending
            .iter()
            .filter(|&(k, _)| Some(*k) != protected_key)
            .min_by_key(|(_, state)| state.last_activity_at)
            .map(|(k, _)| *k);
        let Some(key) = victim else {
            return false;
        };
        self.remove_pending(&key);
        true
    }

    pub(in crate::receiver) fn remove_pending_if_allowed(
        &mut self,
        key: MessageKey,
        policy: SourcePolicy,
    ) -> Option<MessageState> {
        let allowed = self
            .pending
            .get(&key)
            .is_some_and(|state| policy.allows_buffered(state.first_source));
        if !allowed {
            return None;
        }
        self.remove_pending(&key)
    }

    pub(super) fn evict_oldest_if_needed(
        &mut self,
        incoming_message_bytes: usize,
        config: &ReceiverRuntimeConfig,
        protected_key: Option<MessageKey>,
    ) -> bool {
        if incoming_message_bytes > config.max_pending_bytes() {
            return false;
        }

        while self.pending.len() >= config.max_pending_messages()
            || self
                .pending_estimated_bytes
                .saturating_add(incoming_message_bytes)
                > config.max_pending_bytes()
        {
            if !self.evict_least_active(protected_key) {
                return false;
            }
        }
        self.assert_index_invariants();
        true
    }

    pub(in crate::receiver) fn find_complete_message(
        &mut self,
        filter_key: Option<MessageKey>,
        policy: SourcePolicy,
    ) -> Option<MessageState> {
        if let Some(key) = filter_key {
            if self.pending.get(&key).is_some_and(|state| {
                state.is_complete() && policy.allows_buffered(state.first_source)
            }) {
                return self.remove_pending(&key);
            }
            return None;
        }

        let mut stale: Vec<(u64, MessageKey)> = Vec::new();
        let mut selected_key: Option<MessageKey> = None;
        for (seq, key) in self.completion_index.entries() {
            let Some(state) = self.pending.get(&key) else {
                stale.push((seq, key));
                continue;
            };
            if !state.is_complete() {
                stale.push((seq, key));
                continue;
            }
            if !policy.allows_buffered(state.first_source) {
                continue;
            }
            selected_key = Some(key);
            break;
        }

        for (seq, key) in stale {
            self.completion_index.remove_exact(seq, key);
        }

        if let Some(key) = selected_key {
            return self.remove_pending(&key);
        }
        self.assert_index_invariants();
        None
    }

    pub(in crate::receiver) fn is_pending_complete(&self, key: MessageKey) -> bool {
        self.pending
            .get(&key)
            .is_some_and(MessageState::is_complete)
    }
}