crossio-core 0.1.0

Core abstractions for the crossio async I/O backend
Documentation
//! A small, in-memory timer queue used by reactors and runtimes to track
//! upcoming wakeups. It stays backend-agnostic so every platform can share
//! the same scheduling logic.

use std::{
    cmp::Ordering,
    collections::{BinaryHeap, HashMap},
    time::{Duration, Instant},
};

/// Opaque handle identifying one scheduled timer.
///
/// The wrapped counter value is private; callers can only copy, compare,
/// hash, and debug-print the handle.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct TimerId(u64);

impl TimerId {
    /// Wraps a raw counter value in a `TimerId`.
    fn new(value: u64) -> Self {
        Self(value)
    }
}

/// Internal representation of a scheduled timer.
///
/// Pairs a timer's identifier with the instant at which it becomes due.
#[derive(Debug, Clone)]
struct TimerEntry {
    /// Identifier of the timer this entry belongs to.
    id: TimerId,
    /// Instant at which the timer is due.
    deadline: Instant,
}

/// Wrapper used to turn the `BinaryHeap` into a min-heap on deadline.
///
/// `BinaryHeap` is a max-heap, so the ordering implemented here is reversed:
/// the entry with the earliest deadline compares as the greatest and is the
/// one the heap surfaces first. Entries that share a deadline are ordered by
/// id, smaller id first. That tie-break keeps `Ord`, `PartialOrd`, and
/// `PartialEq` in agreement (two items are `Equal` exactly when they are
/// `==`), and makes the pop order of simultaneous timers deterministic.
#[derive(Debug)]
struct HeapItem(TimerEntry);

impl PartialEq for HeapItem {
    /// Two items are equal when both their deadline and their id match.
    fn eq(&self, other: &Self) -> bool {
        self.0.deadline.eq(&other.0.deadline) && self.0.id.eq(&other.0.id)
    }
}

impl Eq for HeapItem {}

impl PartialOrd for HeapItem {
    /// Delegates to [`Ord::cmp`] so the two orderings can never diverge.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for HeapItem {
    /// Reversed comparison: earlier deadline (then smaller id) is "greater".
    fn cmp(&self, other: &Self) -> Ordering {
        // Operands are swapped on purpose to invert the natural order.
        let by_deadline = other.0.deadline.cmp(&self.0.deadline);
        // Break ties on the raw id so that `cmp` returns `Equal` only for
        // items that `eq` also considers equal.
        by_deadline.then_with(|| other.0.id.0.cmp(&self.0.id.0))
    }
}

/// Small timer queue backed by a binary heap.
///
/// The queue knows nothing about sockets or backends – it simply tracks
/// deadlines. Reactors can use it to pick reasonable poll timeouts and decide
/// which tasks to wake up next.
///
/// Cancellation is lazy: `cancel` only forgets the id. The matching heap
/// entry is discarded later, either when it reaches the front of the heap or
/// when enough cancelled entries have piled up to trigger a compaction.
#[derive(Debug, Default)]
pub struct TimerQueue {
    /// Raw value handed to the next scheduled timer; wraps on overflow.
    next_id: u64,
    /// Every entry that has been scheduled and not yet discarded. May still
    /// contain entries whose timer has been cancelled.
    heap: BinaryHeap<HeapItem>,
    /// Tracks which timers are still active so cancellations can be applied
    /// lazily without mutating the heap in-place. Membership in this map is
    /// what distinguishes a live heap entry from a cancelled one.
    active: HashMap<TimerId, Instant>,
}

impl TimerQueue {
    /// Minimum number of cancelled-but-unpurged heap entries before `cancel`
    /// considers rebuilding the heap. Keeps small queues from compacting.
    const COMPACT_MIN_STALE: usize = 64;

    /// Creates an empty timer queue.
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns true when no timers are scheduled.
    pub fn is_empty(&self) -> bool {
        self.active.is_empty()
    }

    /// Returns the number of active (scheduled and not cancelled) timers.
    pub fn len(&self) -> usize {
        self.active.len()
    }

    /// Returns true if the given timer id is still scheduled.
    pub fn contains(&self, id: TimerId) -> bool {
        self.active.contains_key(&id)
    }

    /// Schedules a timer to fire `delay` after `now`.
    ///
    /// # Panics
    ///
    /// Panics if `now + delay` cannot be represented as an `Instant`.
    pub fn schedule_after(&mut self, delay: Duration, now: Instant) -> TimerId {
        self.schedule_deadline(now + delay)
    }

    /// Schedules a timer for the given absolute deadline and returns its id.
    pub fn schedule_deadline(&mut self, deadline: Instant) -> TimerId {
        let id = TimerId::new(self.next_id);
        self.next_id = self.next_id.wrapping_add(1);

        self.active.insert(id, deadline);
        self.heap.push(HeapItem(TimerEntry { id, deadline }));

        id
    }

    /// Cancels a previously scheduled timer. Returns true if the timer was
    /// active at the time of the call.
    ///
    /// The heap entry is normally left in place and skipped later. When
    /// cancelled entries outnumber live ones (and there are at least
    /// `COMPACT_MIN_STALE` of them) the heap is rebuilt without them, so
    /// timers that are scheduled far in the future and then cancelled do not
    /// accumulate indefinitely.
    pub fn cancel(&mut self, id: TimerId) -> bool {
        let was_active = self.active.remove(&id).is_some();
        if was_active {
            self.purge_cancelled_if_worthwhile();
        }
        was_active
    }

    /// Returns the deadline of the next timer to fire, if any.
    ///
    /// Takes `&mut self` because cancelled entries sitting at the front of
    /// the heap are discarded along the way.
    pub fn next_deadline(&mut self) -> Option<Instant> {
        self.peek_active().map(|entry| entry.deadline)
    }

    /// Drains all timers whose deadlines are <= `now`, returning their ids
    /// ordered by non-decreasing deadline.
    pub fn poll_expired(&mut self, now: Instant) -> Vec<TimerId> {
        let mut expired = Vec::new();

        while let Some(entry) = self.peek_active() {
            if entry.deadline > now {
                break;
            }

            // `peek_active` returns a copy of the current heap front without
            // removing it, so this pop discards exactly the entry being
            // reported.
            let _ = self.heap.pop();
            // Forget the id so later calls do not surface it again.
            self.active.remove(&entry.id);
            expired.push(entry.id);
        }

        expired
    }

    /// Returns a copy of the earliest entry that is still active, discarding
    /// any cancelled entries found in front of it.
    fn peek_active(&mut self) -> Option<TimerEntry> {
        while let Some(front) = self.heap.peek() {
            if self.active.contains_key(&front.0.id) {
                return Some(front.0.clone());
            }

            // Timer was cancelled; remove it from the heap and keep searching.
            let _ = self.heap.pop();
        }

        None
    }

    /// Rebuilds the heap without cancelled entries once they dominate it.
    ///
    /// The rebuild is linear in the heap size, and it only runs after more
    /// cancellations than there are live timers, so the cost is amortized
    /// over those cancellations.
    fn purge_cancelled_if_worthwhile(&mut self) {
        let live = self.active.len();
        let stale = self.heap.len().saturating_sub(live);
        if stale < Self::COMPACT_MIN_STALE || stale <= live {
            return;
        }

        let active = &self.active;
        let survivors: BinaryHeap<HeapItem> = std::mem::take(&mut self.heap)
            .into_iter()
            .filter(|item| active.contains_key(&item.0.id))
            .collect();
        self.heap = survivors;
    }
}

#[cfg(test)]
mod tests {
    use super::TimerQueue;
    use std::time::{Duration, Instant};

    /// Shorthand for a millisecond duration.
    fn ms(millis: u64) -> Duration {
        Duration::from_millis(millis)
    }

    /// Timers are reported only once the polled instant has reached their
    /// deadline, earliest first.
    #[test]
    fn schedule_and_expire_in_order() {
        let origin = Instant::now();
        let mut queue = TimerQueue::new();

        let first = queue.schedule_after(ms(10), origin);
        let second = queue.schedule_after(ms(20), origin);
        assert_eq!(queue.len(), 2);

        let fired = queue.poll_expired(origin + ms(15));
        assert_eq!(fired, vec![first]);

        let fired = queue.poll_expired(origin + ms(25));
        assert_eq!(fired, vec![second]);
        assert!(queue.is_empty());
    }

    /// A cancelled timer is never reported as expired.
    #[test]
    fn cancellation_prevents_expiry() {
        let origin = Instant::now();
        let mut queue = TimerQueue::new();

        let cancelled = queue.schedule_after(ms(10), origin);
        let _survivor = queue.schedule_after(ms(20), origin);

        assert!(queue.cancel(cancelled));

        let fired = queue.poll_expired(origin + ms(25));
        assert_eq!(fired.len(), 1);
        assert_ne!(fired[0], cancelled);
    }

    /// `next_deadline` reports the earliest deadline regardless of the order
    /// in which timers were scheduled.
    #[test]
    fn next_deadline_reports_earliest() {
        let origin = Instant::now();
        let mut queue = TimerQueue::new();

        let later = origin + ms(30);
        let sooner = origin + ms(10);

        queue.schedule_deadline(later);
        queue.schedule_deadline(sooner);

        assert_eq!(queue.next_deadline(), Some(sooner));
    }
}