irtt-client 0.1.1

IRTT-compatible Rust client library
Documentation
use std::{
    collections::{HashMap, VecDeque},
    time::Instant,
};

use crate::{error::ClientError, timing::ClientTimestamp};

#[derive(Debug, Clone)]
pub(crate) struct PendingProbe {
    pub logical_seq: u64,
    pub wire_seq: u32,
    pub sent_at: ClientTimestamp,
    pub timeout_at: Instant,
}

#[derive(Debug)]
pub(crate) struct PendingMap {
    map: HashMap<u32, PendingProbe>,
    max_capacity: usize,
}

impl PendingMap {
    pub fn new(max_capacity: usize) -> Self {
        Self {
            map: HashMap::new(),
            max_capacity,
        }
    }

    pub fn check_capacity(&self) -> Result<(), ClientError> {
        if self.map.len() >= self.max_capacity {
            return Err(ClientError::PendingLimitExceeded {
                limit: self.max_capacity,
            });
        }
        Ok(())
    }

    pub fn insert(&mut self, probe: PendingProbe) -> Result<(), ClientError> {
        self.check_capacity()?;
        self.map.insert(probe.wire_seq, probe);
        Ok(())
    }

    pub fn remove(&mut self, wire_seq: u32) -> Option<PendingProbe> {
        self.map.remove(&wire_seq)
    }

    pub fn drain_expired(&mut self, now: Instant) -> Vec<PendingProbe> {
        let expired_keys: Vec<u32> = self
            .map
            .iter()
            .filter(|(_, probe)| probe.timeout_at <= now)
            .map(|(key, _)| *key)
            .collect();
        let mut expired = Vec::with_capacity(expired_keys.len());
        for key in expired_keys {
            if let Some(probe) = self.map.remove(&key) {
                expired.push(probe);
            }
        }
        expired.sort_by_key(|p| p.logical_seq);
        expired
    }

    #[allow(dead_code)]
    pub fn len(&self) -> usize {
        self.map.len()
    }
}

#[derive(Debug)]
pub(crate) struct TimedOutMap {
    map: HashMap<u32, PendingProbe>,
    insertion_order: VecDeque<u32>,
    max_capacity: usize,
}

impl TimedOutMap {
    pub fn new(max_capacity: usize) -> Self {
        Self {
            map: HashMap::new(),
            insertion_order: VecDeque::new(),
            max_capacity,
        }
    }

    pub fn insert(&mut self, probe: PendingProbe) {
        if self.max_capacity == 0 {
            return;
        }
        if let std::collections::hash_map::Entry::Occupied(mut entry) =
            self.map.entry(probe.wire_seq)
        {
            entry.insert(probe);
            return;
        }
        while self.map.len() >= self.max_capacity {
            self.evict_oldest();
        }
        self.insertion_order.push_back(probe.wire_seq);
        self.map.insert(probe.wire_seq, probe);
    }

    pub fn remove(&mut self, wire_seq: u32) -> Option<PendingProbe> {
        let removed = self.map.remove(&wire_seq);
        if removed.is_some() {
            self.insertion_order.retain(|seq| *seq != wire_seq);
        }
        removed
    }

    pub fn clear(&mut self) {
        self.map.clear();
        self.insertion_order.clear();
    }

    #[allow(dead_code)]
    pub fn len(&self) -> usize {
        self.map.len()
    }

    #[cfg(test)]
    fn insertion_order_len(&self) -> usize {
        self.insertion_order.len()
    }

    fn evict_oldest(&mut self) {
        while let Some(oldest_key) = self.insertion_order.pop_front() {
            if self.map.remove(&oldest_key).is_some() {
                break;
            }
        }
    }
}

#[derive(Debug)]
pub(crate) struct CompletedSet {
    set: HashMap<u32, u64>,
    insertion_order: VecDeque<u32>,
    max_capacity: usize,
}

impl CompletedSet {
    pub fn new(max_capacity: usize) -> Self {
        Self {
            set: HashMap::new(),
            insertion_order: VecDeque::new(),
            max_capacity,
        }
    }

    pub fn insert(&mut self, wire_seq: u32, logical_seq: u64) {
        if let Some(existing) = self.set.get_mut(&wire_seq) {
            *existing = logical_seq;
            return;
        }
        if self.set.len() >= self.max_capacity {
            self.evict_oldest();
        }
        self.insertion_order.push_back(wire_seq);
        self.set.insert(wire_seq, logical_seq);
    }

    pub fn contains(&self, wire_seq: u32) -> bool {
        self.set.contains_key(&wire_seq)
    }

    fn evict_oldest(&mut self) {
        while let Some(oldest_key) = self.insertion_order.pop_front() {
            if self.set.remove(&oldest_key).is_some() {
                break;
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::time::{Duration, SystemTime};

    fn ts(mono: Instant) -> ClientTimestamp {
        ClientTimestamp {
            mono,
            wall: SystemTime::now(),
        }
    }

    fn pending(seq: u32, logical: u64, timeout_at: Instant) -> PendingProbe {
        PendingProbe {
            logical_seq: logical,
            wire_seq: seq,
            sent_at: ts(timeout_at - Duration::from_secs(1)),
            timeout_at,
        }
    }

    #[test]
    fn pending_map_insert_and_remove() {
        let mut map = PendingMap::new(10);
        let now = Instant::now();
        let probe = pending(0, 0, now + Duration::from_secs(4));
        map.insert(probe).unwrap();
        assert_eq!(map.len(), 1);
        assert!(map.remove(0).is_some());
        assert_eq!(map.len(), 0);
        assert!(map.remove(0).is_none());
    }

    #[test]
    fn pending_map_rejects_over_capacity() {
        let mut map = PendingMap::new(2);
        let now = Instant::now();
        map.insert(pending(0, 0, now + Duration::from_secs(4)))
            .unwrap();
        map.insert(pending(1, 1, now + Duration::from_secs(4)))
            .unwrap();
        assert!(matches!(
            map.insert(pending(2, 2, now + Duration::from_secs(4))),
            Err(ClientError::PendingLimitExceeded { limit: 2 })
        ));
    }

    #[test]
    fn drain_expired_returns_in_logical_order() {
        let mut map = PendingMap::new(10);
        let now = Instant::now();
        let past = now - Duration::from_secs(1);
        map.insert(pending(5, 5, past)).unwrap();
        map.insert(pending(2, 2, past)).unwrap();
        map.insert(pending(8, 8, now + Duration::from_secs(10)))
            .unwrap();
        let expired = map.drain_expired(now);
        assert_eq!(expired.len(), 2);
        assert_eq!(expired[0].wire_seq, 2);
        assert_eq!(expired[1].wire_seq, 5);
        assert_eq!(map.len(), 1);
    }

    #[test]
    fn completed_set_tracks_and_evicts() {
        let mut set = CompletedSet::new(3);
        set.insert(0, 0);
        set.insert(1, 1);
        set.insert(2, 2);
        assert!(set.contains(0));
        assert!(set.contains(1));
        assert!(set.contains(2));
        set.insert(3, 3);
        assert_eq!(set.set.len(), 3);
        assert!(!set.contains(0));
        assert!(set.contains(3));
    }

    #[test]
    fn timed_out_map_tracks_and_evicts() {
        let mut map = TimedOutMap::new(2);
        let now = Instant::now();
        map.insert(pending(0, 0, now));
        map.insert(pending(1, 1, now));
        map.insert(pending(2, 2, now));

        assert_eq!(map.len(), 2);
        assert!(map.remove(0).is_none());
        assert!(map.remove(1).is_some());
        assert!(map.remove(2).is_some());
    }

    #[test]
    fn timed_out_map_remove_prunes_insertion_order() {
        let mut map = TimedOutMap::new(4);
        let now = Instant::now();

        for i in 0..20 {
            map.insert(pending(i, u64::from(i), now));
            assert!(map.remove(i).is_some());
            assert_eq!(map.len(), 0);
            assert_eq!(map.insertion_order_len(), 0);
        }
    }
}