Skip to main content

arkhe_kernel/state/
scheduler.rs

1//! BTreeMap-based scheduler with immediate-remove cancellation.
2//!
3//! Three indexed tables maintained in lockstep:
4//! - `ready: BTreeMap<SchedKey, ScheduledEntry>` — primary execution queue,
5//!   ordered by `(at, seq, id)`.
6//! - `by_id: BTreeMap<ScheduledActionId, SchedKey>` — O(log n) cancel lookup.
7//! - `by_actor: BTreeMap<EntityId, BTreeSet<ScheduledActionId>>` — O(k log n)
8//!   actor-scoped cancel.
9//!
10//! No tombstones — `cancel` immediately removes from all three tables.
11//! Determinism: BTreeMap iteration order is total over `SchedKey`, and
12//! `seq` is monotonic per kernel lifetime, so identical schedule sequences
13//! produce identical pop_due streams (deterministic).
14
15use core::num::NonZeroU64;
16use serde::{Deserialize, Serialize};
17use std::collections::{BTreeMap, BTreeSet};
18
19use crate::abi::{EntityId, Principal, Tick, TypeCode};
20
/// Sentinel-free scheduled-action handle (A6).
///
/// Wraps a [`NonZeroU64`], so the value `0` is unrepresentable and
/// `Option<ScheduledActionId>` can express "no id" without a magic number.
/// `#[serde(transparent)]` makes the handle (de)serialize as the bare inner
/// integer. The derived `Ord` compares the inner integers, which is what the
/// scheduler's `BTreeMap`/`BTreeSet` indexes rely on.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
#[serde(transparent)]
pub struct ScheduledActionId(pub NonZeroU64);
25
26impl ScheduledActionId {
27    /// Returns `Some(_)` iff `v != 0`.
28    #[inline]
29    pub const fn new(v: u64) -> Option<Self> {
30        match NonZeroU64::new(v) {
31            Some(n) => Some(Self(n)),
32            None => None,
33        }
34    }
35
36    /// Underlying non-zero `u64`.
37    #[inline]
38    pub const fn get(self) -> u64 {
39        self.0.get()
40    }
41}
42
/// Total-ordered key — `(at, seq, id)`. Tick first; same-tick FIFO by `seq`;
/// final disambiguator by `id` (defensive — `seq` alone is unique).
///
/// The ordering comes from `#[derive(PartialOrd, Ord)]`, which compares
/// fields lexicographically in declaration order. Reordering the fields
/// below would therefore change the execution order of the `ready` queue.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub(crate) struct SchedKey {
    /// Tick at which the entry becomes due; most significant sort component.
    pub at: Tick,
    /// Insertion sequence number taken from `Scheduler::next_seq`.
    pub seq: u64,
    /// Id of the entry this key points at.
    pub id: ScheduledActionId,
}
51
/// One pending action as stored in the scheduler's `ready` queue and handed
/// back by `cancel`, `cancel_by_actor` and `pop_due`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct ScheduledEntry {
    /// Handle under which this entry is indexed in `by_id`.
    pub id: ScheduledActionId,
    /// Tick at which the entry becomes due (mirrors `SchedKey::at`).
    pub at: Tick,
    /// Owning actor, if any. Entries with `None` are not tracked in the
    /// scheduler's `by_actor` index and so are unaffected by actor-scoped
    /// cancellation.
    pub actor: Option<EntityId>,
    /// Principal recorded at scheduling time; the scheduler stores it
    /// verbatim and never inspects it.
    pub principal: Principal,
    /// Type code stored alongside the payload.
    /// NOTE(review): presumably selects the decoder in `ActionRegistry` —
    /// confirm against the dispatch path.
    pub action_type_code: TypeCode,
    /// Canonical bytes (postcard); deserialization through `ActionRegistry`
    /// happens at dispatch time.
    pub action_bytes: Vec<u8>,
}
63
/// Pending-action queue plus two secondary indexes used for cancellation.
///
/// `ready` holds the entries themselves; `by_id` and `by_actor` are lookup
/// indexes that the methods on this type keep in step with `ready`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct Scheduler {
    /// Primary queue; iteration order follows `SchedKey`'s derived ordering.
    ready: BTreeMap<SchedKey, ScheduledEntry>,
    /// Reverse index from an id to its key in `ready`, used by `cancel`.
    by_id: BTreeMap<ScheduledActionId, SchedKey>,
    /// Ids of pending entries grouped by owning actor. Entries scheduled
    /// with `actor == None` do not appear here.
    by_actor: BTreeMap<EntityId, BTreeSet<ScheduledActionId>>,
    /// Monotonic per kernel lifetime; same-tick FIFO discriminator.
    /// Holds the `seq` value the next scheduled entry will receive.
    next_seq: u64,
    /// Highest id allocated or observed so far; `0` means none yet.
    /// `schedule` increments this *before* use, so the first automatically
    /// allocated id is `1` and a zero id is never produced.
    next_id: u64,
}
74
75impl Scheduler {
76    pub(crate) fn new() -> Self {
77        Self {
78            ready: BTreeMap::new(),
79            by_id: BTreeMap::new(),
80            by_actor: BTreeMap::new(),
81            next_seq: 0,
82            next_id: 0,
83        }
84    }
85
86    /// Insert into `ready` + `by_id` + `by_actor` atomically.
87    pub(crate) fn schedule(
88        &mut self,
89        at: Tick,
90        actor: Option<EntityId>,
91        principal: Principal,
92        type_code: TypeCode,
93        bytes: Vec<u8>,
94    ) -> ScheduledActionId {
95        self.next_id += 1;
96        let id = ScheduledActionId(
97            NonZeroU64::new(self.next_id).expect("next_id incremented before use; never zero"),
98        );
99
100        let seq = self.next_seq;
101        self.next_seq += 1;
102
103        let key = SchedKey { at, seq, id };
104        let entry = ScheduledEntry {
105            id,
106            at,
107            actor,
108            principal,
109            action_type_code: type_code,
110            action_bytes: bytes,
111        };
112
113        self.ready.insert(key, entry);
114        self.by_id.insert(id, key);
115        if let Some(actor_id) = actor {
116            self.by_actor.entry(actor_id).or_default().insert(id);
117        }
118
119        id
120    }
121
122    /// Schedule with a caller-provided `id` (e.g. when Kernel pre-allocates
123    /// the ScheduledActionId so it can be returned from `submit`). Internal
124    /// `next_id` is bumped so future auto-allocations stay monotonic.
125    pub(crate) fn schedule_with_id(
126        &mut self,
127        id: ScheduledActionId,
128        at: Tick,
129        actor: Option<EntityId>,
130        principal: Principal,
131        type_code: TypeCode,
132        bytes: Vec<u8>,
133    ) {
134        if id.get() > self.next_id {
135            self.next_id = id.get();
136        }
137
138        let seq = self.next_seq;
139        self.next_seq += 1;
140
141        let key = SchedKey { at, seq, id };
142        let entry = ScheduledEntry {
143            id,
144            at,
145            actor,
146            principal,
147            action_type_code: type_code,
148            action_bytes: bytes,
149        };
150
151        self.ready.insert(key, entry);
152        self.by_id.insert(id, key);
153        if let Some(actor_id) = actor {
154            self.by_actor.entry(actor_id).or_default().insert(id);
155        }
156    }
157
158    /// Immediate cancel — three-table consistent removal.
159    /// Returns the removed entry, or `None` if `id` was not scheduled
160    /// (collapsed `CancelMiss` semantics — never-scheduled,
161    /// already-executed, already-cancelled all return `None`).
162    pub(crate) fn cancel(&mut self, id: ScheduledActionId) -> Option<ScheduledEntry> {
163        let key = self.by_id.remove(&id)?;
164        let entry = self
165            .ready
166            .remove(&key)
167            .expect("ready/by_id consistency violated");
168        if let Some(actor_id) = entry.actor {
169            if let Some(set) = self.by_actor.get_mut(&actor_id) {
170                set.remove(&id);
171                if set.is_empty() {
172                    self.by_actor.remove(&actor_id);
173                }
174            }
175        }
176        Some(entry)
177    }
178
179    /// Bulk-cancel every entry owned by `actor`. Returns removed entries
180    /// in scheduler order (BTreeSet iteration over IDs is ascending).
181    /// Production wiring (entity-despawn cascade) lands with the
182    /// per-entity ownership refinement (deferred).
183    #[cfg_attr(not(test), allow(dead_code))]
184    pub(crate) fn cancel_by_actor(&mut self, actor: EntityId) -> Vec<ScheduledEntry> {
185        let Some(ids) = self.by_actor.remove(&actor) else {
186            return Vec::new();
187        };
188        let mut cancelled = Vec::with_capacity(ids.len());
189        for id in ids {
190            if let Some(key) = self.by_id.remove(&id) {
191                if let Some(entry) = self.ready.remove(&key) {
192                    cancelled.push(entry);
193                }
194            }
195        }
196        cancelled
197    }
198
199    /// Pop the earliest-due entry whose `at <= now`. Returns `None` if the
200    /// queue is empty or the head is in the future.
201    pub(crate) fn pop_due(&mut self, now: Tick) -> Option<ScheduledEntry> {
202        let (&key, _) = self.ready.first_key_value()?;
203        if key.at > now {
204            return None;
205        }
206        let entry = self
207            .ready
208            .remove(&key)
209            .expect("first_key_value just returned this key");
210        self.by_id.remove(&entry.id);
211        if let Some(actor_id) = entry.actor {
212            if let Some(set) = self.by_actor.get_mut(&actor_id) {
213                set.remove(&entry.id);
214                if set.is_empty() {
215                    self.by_actor.remove(&actor_id);
216                }
217            }
218        }
219        Some(entry)
220    }
221
222    // Test-only observability accessors. Production introspection wiring
223    // lands with the future IntrospectHandle interface (deferred).
224    #[cfg_attr(not(test), allow(dead_code))]
225    #[inline]
226    pub(crate) fn len(&self) -> usize {
227        self.ready.len()
228    }
229
230    #[cfg_attr(not(test), allow(dead_code))]
231    #[inline]
232    pub(crate) fn is_empty(&self) -> bool {
233        self.ready.is_empty()
234    }
235}
236
#[cfg(test)]
mod tests {
    //! Unit tests for the scheduler: ordering, cancellation, id allocation
    //! and consistency of the secondary indexes as observed through the
    //! crate-visible API.

    use super::*;
    use crate::abi::{EntityId, Tick, TypeCode};

    /// Principal used by every test; its value is irrelevant to scheduling.
    fn p() -> Principal {
        Principal::System
    }

    /// Arbitrary type code; the scheduler never interprets it.
    fn tc() -> TypeCode {
        TypeCode(1)
    }

    #[test]
    fn empty_state() {
        let s = Scheduler::new();
        assert_eq!(s.len(), 0);
        assert!(s.is_empty());
    }

    #[test]
    fn schedule_then_pop_due_single() {
        let mut s = Scheduler::new();
        let id = s.schedule(Tick(5), None, p(), tc(), vec![1, 2, 3]);
        assert_eq!(s.len(), 1);
        let entry = s.pop_due(Tick(5)).expect("entry due");
        assert_eq!(entry.id, id);
        assert_eq!(entry.at, Tick(5));
        assert_eq!(entry.action_bytes, vec![1, 2, 3]);
        assert!(s.is_empty());
    }

    #[test]
    fn pop_due_before_time_returns_none() {
        let mut s = Scheduler::new();
        s.schedule(Tick(10), None, p(), tc(), vec![]);
        assert!(s.pop_due(Tick(9)).is_none());
        assert_eq!(s.len(), 1);
    }

    #[test]
    fn pop_due_at_exact_tick_pops() {
        let mut s = Scheduler::new();
        let id = s.schedule(Tick(5), None, p(), tc(), vec![]);
        assert_eq!(s.pop_due(Tick(5)).unwrap().id, id);
    }

    #[test]
    fn pop_due_ordering_by_tick() {
        let mut s = Scheduler::new();
        let id_late = s.schedule(Tick(20), None, p(), tc(), vec![]);
        let id_early = s.schedule(Tick(5), None, p(), tc(), vec![]);
        let id_mid = s.schedule(Tick(10), None, p(), tc(), vec![]);
        assert_eq!(s.pop_due(Tick(100)).unwrap().id, id_early);
        assert_eq!(s.pop_due(Tick(100)).unwrap().id, id_mid);
        assert_eq!(s.pop_due(Tick(100)).unwrap().id, id_late);
    }

    #[test]
    fn pop_due_tiebreak_by_seq() {
        let mut s = Scheduler::new();
        let id1 = s.schedule(Tick(5), None, p(), tc(), vec![1]);
        let id2 = s.schedule(Tick(5), None, p(), tc(), vec![2]);
        let id3 = s.schedule(Tick(5), None, p(), tc(), vec![3]);
        assert_eq!(s.pop_due(Tick(5)).unwrap().id, id1);
        assert_eq!(s.pop_due(Tick(5)).unwrap().id, id2);
        assert_eq!(s.pop_due(Tick(5)).unwrap().id, id3);
    }

    #[test]
    fn cancel_removes_entry() {
        let mut s = Scheduler::new();
        let id = s.schedule(Tick(5), None, p(), tc(), vec![]);
        let cancelled = s.cancel(id).expect("found");
        assert_eq!(cancelled.id, id);
        assert!(s.is_empty());
        assert!(s.pop_due(Tick(100)).is_none());
    }

    #[test]
    fn cancel_unknown_returns_none() {
        let mut s = Scheduler::new();
        let bogus = ScheduledActionId::new(999).unwrap();
        assert!(s.cancel(bogus).is_none());
    }

    #[test]
    fn cancel_twice_returns_none() {
        // The second cancel hits the collapsed "miss" path.
        let mut s = Scheduler::new();
        let id = s.schedule(Tick(5), None, p(), tc(), vec![]);
        assert!(s.cancel(id).is_some());
        assert!(s.cancel(id).is_none());
    }

    #[test]
    fn cancel_after_pop_returns_none() {
        // An executed entry must no longer be reachable through `by_id`.
        let mut s = Scheduler::new();
        let id = s.schedule(Tick(1), None, p(), tc(), vec![]);
        assert_eq!(s.pop_due(Tick(1)).unwrap().id, id);
        assert!(s.cancel(id).is_none());
    }

    #[test]
    fn cancel_by_actor_removes_all() {
        let mut s = Scheduler::new();
        let actor = EntityId::new(1).unwrap();
        let other = EntityId::new(2).unwrap();
        let _ = s.schedule(Tick(5), Some(actor), p(), tc(), vec![]);
        let _ = s.schedule(Tick(10), Some(actor), p(), tc(), vec![]);
        let id_other = s.schedule(Tick(7), Some(other), p(), tc(), vec![]);
        assert_eq!(s.len(), 3);
        let cancelled = s.cancel_by_actor(actor);
        assert_eq!(cancelled.len(), 2);
        assert_eq!(s.len(), 1);
        assert_eq!(s.pop_due(Tick(100)).unwrap().id, id_other);
    }

    #[test]
    fn cancel_by_actor_unknown_returns_empty() {
        let mut s = Scheduler::new();
        let actor = EntityId::new(99).unwrap();
        assert!(s.cancel_by_actor(actor).is_empty());
    }

    #[test]
    fn cancel_by_actor_returns_entries_in_ascending_id_order() {
        // Scheduled latest-tick first, so id order and due order differ.
        let mut s = Scheduler::new();
        let actor = EntityId::new(1).unwrap();
        let first = s.schedule(Tick(10), Some(actor), p(), tc(), vec![]);
        let second = s.schedule(Tick(5), Some(actor), p(), tc(), vec![]);
        let cancelled = s.cancel_by_actor(actor);
        let ids: Vec<_> = cancelled.iter().map(|e| e.id).collect();
        assert_eq!(ids, vec![first, second]);
        assert!(s.is_empty());
    }

    #[test]
    fn cancel_single_entry_then_cancel_by_actor_skips_it() {
        let mut s = Scheduler::new();
        let actor = EntityId::new(1).unwrap();
        let gone = s.schedule(Tick(5), Some(actor), p(), tc(), vec![]);
        let kept = s.schedule(Tick(6), Some(actor), p(), tc(), vec![]);
        assert!(s.cancel(gone).is_some());
        let cancelled = s.cancel_by_actor(actor);
        assert_eq!(cancelled.len(), 1);
        assert_eq!(cancelled[0].id, kept);
        assert!(s.is_empty());
    }

    #[test]
    fn pop_due_releases_actor_index() {
        // After the only entry of an actor has run, nothing is left to
        // cancel for that actor.
        let mut s = Scheduler::new();
        let actor = EntityId::new(1).unwrap();
        s.schedule(Tick(1), Some(actor), p(), tc(), vec![]);
        assert!(s.pop_due(Tick(1)).is_some());
        assert!(s.cancel_by_actor(actor).is_empty());
        assert!(s.is_empty());
    }

    #[test]
    fn schedule_id_monotonic() {
        let mut s = Scheduler::new();
        let id1 = s.schedule(Tick(0), None, p(), tc(), vec![]);
        let id2 = s.schedule(Tick(0), None, p(), tc(), vec![]);
        let id3 = s.schedule(Tick(0), None, p(), tc(), vec![]);
        assert!(id1 < id2);
        assert!(id2 < id3);
        assert_eq!(id1.get(), 1);
        assert_eq!(id3.get(), 3);
    }

    #[test]
    fn schedule_with_id_bumps_auto_allocation() {
        let mut s = Scheduler::new();
        let pre = ScheduledActionId::new(10).unwrap();
        s.schedule_with_id(pre, Tick(5), None, p(), tc(), vec![]);
        let next = s.schedule(Tick(5), None, p(), tc(), vec![]);
        assert_eq!(next.get(), 11);
        // Same tick: insertion order decides, not the numeric id.
        assert_eq!(s.pop_due(Tick(5)).unwrap().id, pre);
        assert_eq!(s.pop_due(Tick(5)).unwrap().id, next);
    }

    #[test]
    fn schedule_with_id_lower_id_does_not_rewind_counter() {
        let mut s = Scheduler::new();
        let high = ScheduledActionId::new(5).unwrap();
        let low = ScheduledActionId::new(3).unwrap();
        s.schedule_with_id(high, Tick(1), None, p(), tc(), vec![]);
        s.schedule_with_id(low, Tick(1), None, p(), tc(), vec![]);
        let next = s.schedule(Tick(1), None, p(), tc(), vec![]);
        assert_eq!(next.get(), 6);
        assert_eq!(s.len(), 3);
    }

    #[test]
    fn schedule_with_id_is_cancellable() {
        let mut s = Scheduler::new();
        let actor = EntityId::new(7).unwrap();
        let pre = ScheduledActionId::new(42).unwrap();
        s.schedule_with_id(pre, Tick(9), Some(actor), p(), tc(), vec![4]);
        let entry = s.cancel(pre).expect("pre-allocated id is indexed");
        assert_eq!(entry.id, pre);
        assert_eq!(entry.action_bytes, vec![4]);
        assert!(s.cancel_by_actor(actor).is_empty());
        assert!(s.is_empty());
    }

    #[test]
    fn no_tombstones() {
        // After cancel, len decrements immediately — no lazy deletion.
        let mut s = Scheduler::new();
        let id1 = s.schedule(Tick(5), None, p(), tc(), vec![]);
        let _id2 = s.schedule(Tick(5), None, p(), tc(), vec![]);
        assert_eq!(s.len(), 2);
        s.cancel(id1);
        assert_eq!(s.len(), 1);
    }

    #[test]
    fn determinism_same_sequence() {
        fn run() -> Vec<u64> {
            let mut s = Scheduler::new();
            s.schedule(Tick(3), None, p(), tc(), vec![]);
            s.schedule(Tick(1), None, p(), tc(), vec![]);
            s.schedule(Tick(2), None, p(), tc(), vec![]);
            s.schedule(Tick(1), None, p(), tc(), vec![]);
            let mut out = Vec::new();
            while let Some(e) = s.pop_due(Tick(100)) {
                out.push(e.id.get());
            }
            out
        }
        assert_eq!(run(), run());
    }
}