bb_runtime/framework/
scheduler.rs1use std::cmp::Ordering;
10use std::collections::BinaryHeap;
11
12use crate::ids::CommandId;
13
14#[derive(Clone, Copy, Debug, PartialEq, Eq)]
17pub enum TimerKind {
18 Sleep(CommandId),
20 Interval {
24 period_ns: u64,
26 key: u64,
28 },
29 After {
31 key: u64,
33 },
34 Completion(CommandId),
37}
38
39#[derive(Debug)]
43struct Entry {
44 maturity_ns: u64,
45 kind: TimerKind,
46}
47
48impl Ord for Entry {
49 fn cmp(&self, other: &Self) -> Ordering {
50 other.maturity_ns.cmp(&self.maturity_ns)
51 }
52}
53
54impl PartialOrd for Entry {
55 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
56 Some(self.cmp(other))
57 }
58}
59
60impl PartialEq for Entry {
61 fn eq(&self, other: &Self) -> bool {
62 self.maturity_ns == other.maturity_ns
63 }
64}
65
66impl Eq for Entry {}
67
/// Default limit on the number of pending timers a `Scheduler` will hold
/// (65 536 entries). `Scheduler::default` starts with this cap.
pub const DEFAULT_TIMER_HEAP_CAP: usize = 1 << 16;
72
73pub struct Scheduler {
75 heap: BinaryHeap<Entry>,
76 now_ns: u64,
77 cap: usize,
81 dropped: u64,
83}
84
85impl Default for Scheduler {
86 fn default() -> Self {
87 Self {
88 heap: BinaryHeap::new(),
89 now_ns: 0,
90 cap: DEFAULT_TIMER_HEAP_CAP,
91 dropped: 0,
92 }
93 }
94}
95
96impl Scheduler {
97 pub fn new() -> Self {
99 Self::default()
100 }
101
102 pub fn set_cap(&mut self, cap: usize) {
105 self.cap = cap.max(1);
106 }
107
108 pub fn dropped(&self) -> u64 {
110 self.dropped
111 }
112
113 pub fn set_now(&mut self, now_ns: u64) {
115 self.now_ns = now_ns;
116 }
117
118 pub fn now_ns(&self) -> u64 {
120 self.now_ns
121 }
122
123 pub fn schedule(&mut self, maturity_ns: u64, kind: TimerKind) {
127 if self.heap.len() >= self.cap {
128 self.dropped = self.dropped.saturating_add(1);
129 tracing::warn!(
130 cap = self.cap,
131 dropped_total = self.dropped,
132 ?kind,
133 "Scheduler: timer dropped, heap at cap",
134 );
135 return;
136 }
137 self.heap.push(Entry { maturity_ns, kind });
138 }
139
140 pub fn has_matured(&self, now_ns: u64) -> bool {
142 self.heap.peek().is_some_and(|e| e.maturity_ns <= now_ns)
143 }
144
145 pub fn poll_matured(&mut self, now_ns: u64) -> Vec<TimerKind> {
147 let mut out = Vec::new();
148 while let Some(entry) = self.heap.peek() {
149 if entry.maturity_ns > now_ns {
150 break;
151 }
152 out.push(self.heap.pop().expect("just peeked").kind);
153 }
154 out
155 }
156
157 pub fn len(&self) -> usize {
159 self.heap.len()
160 }
161
162 pub fn is_empty(&self) -> bool {
164 self.heap.is_empty()
165 }
166}
167
#[cfg(test)]
mod cap_tests {
    use super::*;
    use crate::ids::CommandId;

    /// Once the heap holds `cap` timers, one more `schedule` call is discarded
    /// and shows up in the drop counter.
    #[test]
    fn schedule_drops_at_cap() {
        let mut sched = Scheduler::new();
        sched.set_cap(3);
        for id in 0..3 {
            sched.schedule(id, TimerKind::Sleep(CommandId::from(id)));
        }
        assert_eq!(sched.len(), 3);

        sched.schedule(100, TimerKind::Sleep(CommandId::from(100)));
        assert_eq!(sched.len(), 3, "4th schedule dropped");
        assert_eq!(sched.dropped(), 1);
    }

    /// After polling frees a slot, scheduling succeeds again without
    /// registering a drop.
    #[test]
    fn schedule_resumes_when_heap_drains_below_cap() {
        let mut sched = Scheduler::new();
        sched.set_cap(2);
        sched.schedule(0, TimerKind::Sleep(CommandId::from(0)));
        sched.schedule(1, TimerKind::Sleep(CommandId::from(1)));

        // Polling at t = 0 removes the timer that matures at 0, leaving room for one more.
        let _fired = sched.poll_matured(0);

        sched.schedule(2, TimerKind::Sleep(CommandId::from(2)));
        assert_eq!(sched.dropped(), 0);
    }
}
197