1use core::num::NonZeroU64;
16use serde::{Deserialize, Serialize};
17use std::collections::{BTreeMap, BTreeSet};
18
19use crate::abi::{EntityId, Principal, Tick, TypeCode};
20
/// Identifier of a scheduled action.
///
/// Wraps a [`NonZeroU64`], so zero is never a valid id. Because of
/// `#[serde(transparent)]` the value serializes as the bare inner integer rather
/// than as a one-field tuple struct.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
#[serde(transparent)]
pub struct ScheduledActionId(pub NonZeroU64);
25
26impl ScheduledActionId {
27 #[inline]
29 pub const fn new(v: u64) -> Option<Self> {
30 match NonZeroU64::new(v) {
31 Some(n) => Some(Self(n)),
32 None => None,
33 }
34 }
35
36 #[inline]
38 pub const fn get(self) -> u64 {
39 self.0.get()
40 }
41}
42
/// Ordering key of the scheduler's `ready` map.
///
/// The derived `Ord` compares fields in declaration order: `at`, then `seq`,
/// then `id`. Entries therefore come out by due tick first and, within one tick,
/// in insertion order. Do not reorder the fields: doing so changes pop order.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub(crate) struct SchedKey {
    /// Tick the entry is scheduled for; `pop_due` releases it once `at <= now`.
    pub at: Tick,
    /// Insertion sequence number taken from the scheduler's `next_seq` counter;
    /// breaks ties between entries that share the same `at`.
    pub seq: u64,
    /// Id of the entry this key points at.
    pub id: ScheduledActionId,
}
51
/// A pending action together with the metadata it was scheduled with.
///
/// The scheduler stores these fields verbatim and hands the whole entry back
/// from `pop_due`, `cancel` and `cancel_by_actor`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct ScheduledEntry {
    /// Id the entry was scheduled under.
    pub id: ScheduledActionId,
    /// Tick the entry becomes due.
    pub at: Tick,
    /// Entity the action is attached to, if any. When `Some`, the entry is also
    /// indexed by actor so that `cancel_by_actor` can find it.
    pub actor: Option<EntityId>,
    /// Principal supplied at scheduling time; the scheduler never inspects it.
    pub principal: Principal,
    /// Type code supplied at scheduling time.
    // NOTE(review): presumably selects how `action_bytes` is decoded — confirm against callers.
    pub action_type_code: TypeCode,
    /// Payload bytes supplied at scheduling time; opaque to the scheduler.
    pub action_bytes: Vec<u8>,
}
63
/// Queue of pending actions ordered by due tick, with secondary indexes for
/// lookup by id and by actor.
///
/// All containers are `BTreeMap`/`BTreeSet`, so iteration order depends only on
/// the keys. The three maps are kept in step by the methods on this type: every
/// entry in `ready` has a matching `by_id` record and, when it has an actor, a
/// matching id in that actor's `by_actor` set.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct Scheduler {
    /// Pending entries, ordered by `SchedKey` (`at`, then `seq`, then `id`).
    ready: BTreeMap<SchedKey, ScheduledEntry>,
    /// Maps each pending id to the key of its entry in `ready`.
    by_id: BTreeMap<ScheduledActionId, SchedKey>,
    /// Maps each actor to the ids of its pending entries. A set is removed as
    /// soon as it becomes empty.
    by_actor: BTreeMap<EntityId, BTreeSet<ScheduledActionId>>,
    /// Sequence number the next inserted entry will receive.
    next_seq: u64,
    /// Highest id handed out by `schedule` or seen by `schedule_with_id`;
    /// `0` while no id has been used yet.
    next_id: u64,
}
74
75impl Scheduler {
76 pub(crate) fn new() -> Self {
77 Self {
78 ready: BTreeMap::new(),
79 by_id: BTreeMap::new(),
80 by_actor: BTreeMap::new(),
81 next_seq: 0,
82 next_id: 0,
83 }
84 }
85
86 pub(crate) fn schedule(
88 &mut self,
89 at: Tick,
90 actor: Option<EntityId>,
91 principal: Principal,
92 type_code: TypeCode,
93 bytes: Vec<u8>,
94 ) -> ScheduledActionId {
95 self.next_id += 1;
96 let id = ScheduledActionId(
97 NonZeroU64::new(self.next_id).expect("next_id incremented before use; never zero"),
98 );
99
100 let seq = self.next_seq;
101 self.next_seq += 1;
102
103 let key = SchedKey { at, seq, id };
104 let entry = ScheduledEntry {
105 id,
106 at,
107 actor,
108 principal,
109 action_type_code: type_code,
110 action_bytes: bytes,
111 };
112
113 self.ready.insert(key, entry);
114 self.by_id.insert(id, key);
115 if let Some(actor_id) = actor {
116 self.by_actor.entry(actor_id).or_default().insert(id);
117 }
118
119 id
120 }
121
122 pub(crate) fn schedule_with_id(
126 &mut self,
127 id: ScheduledActionId,
128 at: Tick,
129 actor: Option<EntityId>,
130 principal: Principal,
131 type_code: TypeCode,
132 bytes: Vec<u8>,
133 ) {
134 if id.get() > self.next_id {
135 self.next_id = id.get();
136 }
137
138 let seq = self.next_seq;
139 self.next_seq += 1;
140
141 let key = SchedKey { at, seq, id };
142 let entry = ScheduledEntry {
143 id,
144 at,
145 actor,
146 principal,
147 action_type_code: type_code,
148 action_bytes: bytes,
149 };
150
151 self.ready.insert(key, entry);
152 self.by_id.insert(id, key);
153 if let Some(actor_id) = actor {
154 self.by_actor.entry(actor_id).or_default().insert(id);
155 }
156 }
157
158 pub(crate) fn cancel(&mut self, id: ScheduledActionId) -> Option<ScheduledEntry> {
163 let key = self.by_id.remove(&id)?;
164 let entry = self
165 .ready
166 .remove(&key)
167 .expect("ready/by_id consistency violated");
168 if let Some(actor_id) = entry.actor {
169 if let Some(set) = self.by_actor.get_mut(&actor_id) {
170 set.remove(&id);
171 if set.is_empty() {
172 self.by_actor.remove(&actor_id);
173 }
174 }
175 }
176 Some(entry)
177 }
178
179 #[cfg_attr(not(test), allow(dead_code))]
184 pub(crate) fn cancel_by_actor(&mut self, actor: EntityId) -> Vec<ScheduledEntry> {
185 let Some(ids) = self.by_actor.remove(&actor) else {
186 return Vec::new();
187 };
188 let mut cancelled = Vec::with_capacity(ids.len());
189 for id in ids {
190 if let Some(key) = self.by_id.remove(&id) {
191 if let Some(entry) = self.ready.remove(&key) {
192 cancelled.push(entry);
193 }
194 }
195 }
196 cancelled
197 }
198
199 pub(crate) fn pop_due(&mut self, now: Tick) -> Option<ScheduledEntry> {
202 let (&key, _) = self.ready.first_key_value()?;
203 if key.at > now {
204 return None;
205 }
206 let entry = self
207 .ready
208 .remove(&key)
209 .expect("first_key_value just returned this key");
210 self.by_id.remove(&entry.id);
211 if let Some(actor_id) = entry.actor {
212 if let Some(set) = self.by_actor.get_mut(&actor_id) {
213 set.remove(&entry.id);
214 if set.is_empty() {
215 self.by_actor.remove(&actor_id);
216 }
217 }
218 }
219 Some(entry)
220 }
221
222 #[cfg_attr(not(test), allow(dead_code))]
225 #[inline]
226 pub(crate) fn len(&self) -> usize {
227 self.ready.len()
228 }
229
230 #[cfg_attr(not(test), allow(dead_code))]
231 #[inline]
232 pub(crate) fn is_empty(&self) -> bool {
233 self.ready.is_empty()
234 }
235}
236
#[cfg(test)]
mod tests {
    //! Unit tests for the scheduler: pop ordering, cancellation, id allocation,
    //! caller-supplied ids, and actor-index cleanup.

    use super::*;
    use crate::abi::{EntityId, Tick, TypeCode};

    /// Principal used by every test; the scheduler never inspects it.
    fn p() -> Principal {
        Principal::System
    }

    /// Arbitrary type code used by every test.
    fn tc() -> TypeCode {
        TypeCode(1)
    }

    /// Builds a caller-supplied id from a non-zero literal.
    fn aid(raw: u64) -> ScheduledActionId {
        ScheduledActionId::new(raw).expect("test ids are non-zero")
    }

    #[test]
    fn empty_state() {
        let s = Scheduler::new();
        assert_eq!(s.len(), 0);
        assert!(s.is_empty());
    }

    #[test]
    fn schedule_then_pop_due_single() {
        let mut s = Scheduler::new();
        let id = s.schedule(Tick(5), None, p(), tc(), vec![1, 2, 3]);
        assert_eq!(s.len(), 1);
        let entry = s.pop_due(Tick(5)).expect("entry due");
        assert_eq!(entry.id, id);
        assert_eq!(entry.at, Tick(5));
        assert_eq!(entry.action_bytes, vec![1, 2, 3]);
        assert!(s.is_empty());
    }

    #[test]
    fn pop_due_before_time_returns_none() {
        let mut s = Scheduler::new();
        s.schedule(Tick(10), None, p(), tc(), vec![]);
        assert!(s.pop_due(Tick(9)).is_none());
        assert_eq!(s.len(), 1);
    }

    #[test]
    fn pop_due_at_exact_tick_pops() {
        let mut s = Scheduler::new();
        let id = s.schedule(Tick(5), None, p(), tc(), vec![]);
        assert_eq!(s.pop_due(Tick(5)).unwrap().id, id);
    }

    #[test]
    fn pop_due_ordering_by_tick() {
        let mut s = Scheduler::new();
        let id_late = s.schedule(Tick(20), None, p(), tc(), vec![]);
        let id_early = s.schedule(Tick(5), None, p(), tc(), vec![]);
        let id_mid = s.schedule(Tick(10), None, p(), tc(), vec![]);
        assert_eq!(s.pop_due(Tick(100)).unwrap().id, id_early);
        assert_eq!(s.pop_due(Tick(100)).unwrap().id, id_mid);
        assert_eq!(s.pop_due(Tick(100)).unwrap().id, id_late);
    }

    #[test]
    fn pop_due_tiebreak_by_seq() {
        let mut s = Scheduler::new();
        let id1 = s.schedule(Tick(5), None, p(), tc(), vec![1]);
        let id2 = s.schedule(Tick(5), None, p(), tc(), vec![2]);
        let id3 = s.schedule(Tick(5), None, p(), tc(), vec![3]);
        assert_eq!(s.pop_due(Tick(5)).unwrap().id, id1);
        assert_eq!(s.pop_due(Tick(5)).unwrap().id, id2);
        assert_eq!(s.pop_due(Tick(5)).unwrap().id, id3);
    }

    #[test]
    fn cancel_removes_entry() {
        let mut s = Scheduler::new();
        let id = s.schedule(Tick(5), None, p(), tc(), vec![]);
        let cancelled = s.cancel(id).expect("found");
        assert_eq!(cancelled.id, id);
        assert!(s.is_empty());
        assert!(s.pop_due(Tick(100)).is_none());
    }

    #[test]
    fn cancel_unknown_returns_none() {
        let mut s = Scheduler::new();
        let bogus = ScheduledActionId::new(999).unwrap();
        assert!(s.cancel(bogus).is_none());
    }

    #[test]
    fn cancel_by_actor_removes_all() {
        let mut s = Scheduler::new();
        let actor = EntityId::new(1).unwrap();
        let other = EntityId::new(2).unwrap();
        let _ = s.schedule(Tick(5), Some(actor), p(), tc(), vec![]);
        let _ = s.schedule(Tick(10), Some(actor), p(), tc(), vec![]);
        let id_other = s.schedule(Tick(7), Some(other), p(), tc(), vec![]);
        assert_eq!(s.len(), 3);
        let cancelled = s.cancel_by_actor(actor);
        assert_eq!(cancelled.len(), 2);
        assert_eq!(s.len(), 1);
        assert_eq!(s.pop_due(Tick(100)).unwrap().id, id_other);
    }

    #[test]
    fn cancel_by_actor_unknown_returns_empty() {
        let mut s = Scheduler::new();
        let actor = EntityId::new(99).unwrap();
        assert!(s.cancel_by_actor(actor).is_empty());
    }

    #[test]
    fn schedule_id_monotonic() {
        let mut s = Scheduler::new();
        let id1 = s.schedule(Tick(0), None, p(), tc(), vec![]);
        let id2 = s.schedule(Tick(0), None, p(), tc(), vec![]);
        let id3 = s.schedule(Tick(0), None, p(), tc(), vec![]);
        assert!(id1 < id2);
        assert!(id2 < id3);
        assert_eq!(id1.get(), 1);
        assert_eq!(id3.get(), 3);
    }

    #[test]
    fn no_tombstones() {
        let mut s = Scheduler::new();
        // Cancelling must shrink the queue immediately rather than leave a marker behind.
        let id1 = s.schedule(Tick(5), None, p(), tc(), vec![]);
        let _id2 = s.schedule(Tick(5), None, p(), tc(), vec![]);
        assert_eq!(s.len(), 2);
        s.cancel(id1);
        assert_eq!(s.len(), 1);
    }

    #[test]
    fn determinism_same_sequence() {
        fn run() -> Vec<u64> {
            let mut s = Scheduler::new();
            s.schedule(Tick(3), None, p(), tc(), vec![]);
            s.schedule(Tick(1), None, p(), tc(), vec![]);
            s.schedule(Tick(2), None, p(), tc(), vec![]);
            s.schedule(Tick(1), None, p(), tc(), vec![]);
            let mut out = Vec::new();
            while let Some(e) = s.pop_due(Tick(100)) {
                out.push(e.id.get());
            }
            out
        }
        assert_eq!(run(), run());
    }

    #[test]
    fn schedule_with_id_round_trips() {
        let mut s = Scheduler::new();
        let id = aid(42);
        s.schedule_with_id(id, Tick(5), None, p(), tc(), vec![9]);
        assert_eq!(s.len(), 1);
        assert!(s.pop_due(Tick(4)).is_none());
        let entry = s.pop_due(Tick(5)).expect("entry due");
        assert_eq!(entry.id, id);
        assert_eq!(entry.at, Tick(5));
        assert_eq!(entry.action_bytes, vec![9]);
        assert!(s.is_empty());
    }

    #[test]
    fn schedule_with_id_advances_id_counter() {
        let mut s = Scheduler::new();
        s.schedule_with_id(aid(10), Tick(1), None, p(), tc(), vec![]);
        // A smaller id must not pull the counter back down.
        s.schedule_with_id(aid(4), Tick(1), None, p(), tc(), vec![]);
        let fresh = s.schedule(Tick(1), None, p(), tc(), vec![]);
        assert_eq!(fresh.get(), 11);
    }

    #[test]
    fn schedule_with_id_shares_seq_tiebreak() {
        let mut s = Scheduler::new();
        let first = s.schedule(Tick(5), None, p(), tc(), vec![]);
        let second = aid(100);
        s.schedule_with_id(second, Tick(5), None, p(), tc(), vec![]);
        let third = s.schedule(Tick(5), None, p(), tc(), vec![]);
        assert_eq!(s.pop_due(Tick(5)).unwrap().id, first);
        assert_eq!(s.pop_due(Tick(5)).unwrap().id, second);
        assert_eq!(s.pop_due(Tick(5)).unwrap().id, third);
    }

    #[test]
    fn pop_due_clears_actor_index() {
        let mut s = Scheduler::new();
        let actor = EntityId::new(1).unwrap();
        s.schedule(Tick(1), Some(actor), p(), tc(), vec![]);
        assert!(s.pop_due(Tick(1)).is_some());
        assert!(s.cancel_by_actor(actor).is_empty());
        assert!(s.is_empty());
    }

    #[test]
    fn cancel_clears_actor_index() {
        let mut s = Scheduler::new();
        let actor = EntityId::new(1).unwrap();
        let gone = s.schedule(Tick(1), Some(actor), p(), tc(), vec![]);
        let kept = s.schedule(Tick(2), Some(actor), p(), tc(), vec![]);
        assert!(s.cancel(gone).is_some());
        let rest = s.cancel_by_actor(actor);
        assert_eq!(rest.len(), 1);
        assert_eq!(rest[0].id, kept);
        assert!(s.is_empty());
    }
}