Skip to main content

bb_runtime/framework/
scheduler.rs

1//! `Scheduler` - sorted timer heap.
2//!
//! This module ships the real `BinaryHeap`-backed scheduler that
//! drives the `Interval` / `After` / `Sleep` /
//! `TimerKind::Completion` syscall ops. `has_matured(now_ns)`
//! reports whether any timer has matured by the supplied time;
//! `poll_matured(now_ns)` drains every matured timer, earliest
//! maturity first.
8
9use std::cmp::Ordering;
10use std::collections::BinaryHeap;
11
12use crate::ids::CommandId;
13
14/// What a matured timer signals covers Sleep / Interval /
15/// After / Completion.
16#[derive(Clone, Copy, Debug, PartialEq, Eq)]
17pub enum TimerKind {
18    /// One-shot sleep maturity - fulfills `CommandId`.
19    Sleep(CommandId),
20    /// Periodic timer - re-armed after maturity. Carries the
21    /// owning Op's site name as a u64 key for the matured-timer
22    /// routing.
23    Interval {
24        /// Period in nanoseconds.
25        period_ns: u64,
26        /// Stable id for the owning Op (caller-supplied u64).
27        key: u64,
28    },
29    /// One-shot delayed Trigger emission.
30    After {
31        /// Stable id for the owning Op.
32        key: u64,
33    },
34    /// External-completion shim (used by `Sleep`-like ops that
35    /// re-route through `handle_completion`).
36    Completion(CommandId),
37}
38
39/// Internal heap entry. `BinaryHeap` is a max-heap by default;
40/// we reverse the ordering on `maturity_ns` so the earliest
41/// timer is at the top.
42#[derive(Debug)]
43struct Entry {
44    maturity_ns: u64,
45    kind: TimerKind,
46}
47
48impl Ord for Entry {
49    fn cmp(&self, other: &Self) -> Ordering {
50        other.maturity_ns.cmp(&self.maturity_ns)
51    }
52}
53
54impl PartialOrd for Entry {
55    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
56        Some(self.cmp(other))
57    }
58}
59
60impl PartialEq for Entry {
61    fn eq(&self, other: &Self) -> bool {
62        self.maturity_ns == other.maturity_ns
63    }
64}
65
66impl Eq for Entry {}
67
/// Default upper bound on the number of pending timers
/// (2^16 = 65 536).
///
/// Without a bound, a runaway Interval / After / Pulse / Sleep
/// call site would keep growing the heap and could drag a
/// long-running Node into OOM.
pub const DEFAULT_TIMER_HEAP_CAP: usize = 1 << 16;
72
73/// Sorted timer heap.
74pub struct Scheduler {
75    heap: BinaryHeap<Entry>,
76    now_ns: u64,
77    /// Maximum permitted heap depth. `schedule` drops the latest
78    /// timer (with a `tracing::warn`) once `heap.len() >= cap`.
79    /// Operators tune via [`Self::set_cap`].
80    cap: usize,
81    /// Count of timers dropped due to cap. Exposed for telemetry.
82    dropped: u64,
83}
84
85impl Default for Scheduler {
86    fn default() -> Self {
87        Self {
88            heap: BinaryHeap::new(),
89            now_ns: 0,
90            cap: DEFAULT_TIMER_HEAP_CAP,
91            dropped: 0,
92        }
93    }
94}
95
96impl Scheduler {
97    /// Construct a fresh scheduler with `now_ns = 0`.
98    pub fn new() -> Self {
99        Self::default()
100    }
101
102    /// Override the timer heap cap. Production paths size from
103    /// `NodeConfig` if the host advertises a different bound.
104    pub fn set_cap(&mut self, cap: usize) {
105        self.cap = cap.max(1);
106    }
107
108    /// Number of timers dropped on overflow since construction.
109    pub fn dropped(&self) -> u64 {
110        self.dropped
111    }
112
113    /// Advance the scheduler's notion of current time.
114    pub fn set_now(&mut self, now_ns: u64) {
115        self.now_ns = now_ns;
116    }
117
118    /// Read the current time.
119    pub fn now_ns(&self) -> u64 {
120        self.now_ns
121    }
122
123    /// Schedule a timer. Drops with a `tracing::warn` once the
124    /// heap reaches its cap so a runaway Interval/After loop
125    /// can't grow the heap to OOM.
126    pub fn schedule(&mut self, maturity_ns: u64, kind: TimerKind) {
127        if self.heap.len() >= self.cap {
128            self.dropped = self.dropped.saturating_add(1);
129            tracing::warn!(
130                cap = self.cap,
131                dropped_total = self.dropped,
132                ?kind,
133                "Scheduler: timer dropped, heap at cap",
134            );
135            return;
136        }
137        self.heap.push(Entry { maturity_ns, kind });
138    }
139
140    /// Whether any timer has matured by `now_ns`.
141    pub fn has_matured(&self, now_ns: u64) -> bool {
142        self.heap.peek().is_some_and(|e| e.maturity_ns <= now_ns)
143    }
144
145    /// Drain every timer whose `maturity_ns <= now_ns` in age order.
146    pub fn poll_matured(&mut self, now_ns: u64) -> Vec<TimerKind> {
147        let mut out = Vec::new();
148        while let Some(entry) = self.heap.peek() {
149            if entry.maturity_ns > now_ns {
150                break;
151            }
152            out.push(self.heap.pop().expect("just peeked").kind);
153        }
154        out
155    }
156
157    /// Number of pending (un-matured + matured) timers.
158    pub fn len(&self) -> usize {
159        self.heap.len()
160    }
161
162    /// Whether the heap is empty.
163    pub fn is_empty(&self) -> bool {
164        self.heap.is_empty()
165    }
166}
167
#[cfg(test)]
mod cap_tests {
    use super::*;
    use crate::ids::CommandId;

    /// Shorthand for a `Sleep` timer whose command id is `id`.
    fn sleep(id: u64) -> TimerKind {
        TimerKind::Sleep(CommandId::from(id))
    }

    /// Fresh scheduler with its heap cap overridden to `cap`.
    fn capped(cap: usize) -> Scheduler {
        let mut sched = Scheduler::new();
        sched.set_cap(cap);
        sched
    }

    /// Once the heap holds `cap` timers, a further `schedule` is
    /// rejected and counted as dropped.
    #[test]
    fn schedule_drops_at_cap() {
        let mut sched = capped(3);
        (0..3u64).for_each(|t| sched.schedule(t, sleep(t)));
        assert_eq!(sched.len(), 3);

        sched.schedule(100, sleep(100));
        assert_eq!(sched.len(), 3, "4th schedule dropped");
        assert_eq!(sched.dropped(), 1);
    }

    /// Draining a matured timer frees a slot, so the next
    /// `schedule` is accepted without a drop.
    #[test]
    fn schedule_resumes_when_heap_drains_below_cap() {
        let mut sched = capped(2);
        for t in [0u64, 1] {
            sched.schedule(t, sleep(t));
        }
        let _matured = sched.poll_matured(0);

        sched.schedule(2, sleep(2));
        assert_eq!(sched.dropped(), 0);
    }
}
197