use core::num::NonZeroU64;
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, BTreeSet};
use crate::abi::{EntityId, Principal, Tick, TypeCode};
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
#[serde(transparent)]
pub struct ScheduledActionId(pub NonZeroU64);
impl ScheduledActionId {
#[inline]
pub const fn new(v: u64) -> Option<Self> {
match NonZeroU64::new(v) {
Some(n) => Some(Self(n)),
None => None,
}
}
#[inline]
pub const fn get(self) -> u64 {
self.0.get()
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub(crate) struct SchedKey {
pub at: Tick,
pub seq: u64,
pub id: ScheduledActionId,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct ScheduledEntry {
pub id: ScheduledActionId,
pub at: Tick,
pub actor: Option<EntityId>,
pub principal: Principal,
pub action_type_code: TypeCode,
pub action_bytes: Vec<u8>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct Scheduler {
ready: BTreeMap<SchedKey, ScheduledEntry>,
by_id: BTreeMap<ScheduledActionId, SchedKey>,
by_actor: BTreeMap<EntityId, BTreeSet<ScheduledActionId>>,
next_seq: u64,
next_id: u64,
}
impl Scheduler {
pub(crate) fn new() -> Self {
Self {
ready: BTreeMap::new(),
by_id: BTreeMap::new(),
by_actor: BTreeMap::new(),
next_seq: 0,
next_id: 0,
}
}
pub(crate) fn schedule(
&mut self,
at: Tick,
actor: Option<EntityId>,
principal: Principal,
type_code: TypeCode,
bytes: Vec<u8>,
) -> ScheduledActionId {
self.next_id += 1;
let id = ScheduledActionId(
NonZeroU64::new(self.next_id).expect("next_id incremented before use; never zero"),
);
let seq = self.next_seq;
self.next_seq += 1;
let key = SchedKey { at, seq, id };
let entry = ScheduledEntry {
id,
at,
actor,
principal,
action_type_code: type_code,
action_bytes: bytes,
};
self.ready.insert(key, entry);
self.by_id.insert(id, key);
if let Some(actor_id) = actor {
self.by_actor.entry(actor_id).or_default().insert(id);
}
id
}
pub(crate) fn schedule_with_id(
&mut self,
id: ScheduledActionId,
at: Tick,
actor: Option<EntityId>,
principal: Principal,
type_code: TypeCode,
bytes: Vec<u8>,
) {
if id.get() > self.next_id {
self.next_id = id.get();
}
let seq = self.next_seq;
self.next_seq += 1;
let key = SchedKey { at, seq, id };
let entry = ScheduledEntry {
id,
at,
actor,
principal,
action_type_code: type_code,
action_bytes: bytes,
};
self.ready.insert(key, entry);
self.by_id.insert(id, key);
if let Some(actor_id) = actor {
self.by_actor.entry(actor_id).or_default().insert(id);
}
}
pub(crate) fn cancel(&mut self, id: ScheduledActionId) -> Option<ScheduledEntry> {
let key = self.by_id.remove(&id)?;
let entry = self
.ready
.remove(&key)
.expect("ready/by_id consistency violated");
if let Some(actor_id) = entry.actor {
if let Some(set) = self.by_actor.get_mut(&actor_id) {
set.remove(&id);
if set.is_empty() {
self.by_actor.remove(&actor_id);
}
}
}
Some(entry)
}
#[cfg_attr(not(test), allow(dead_code))]
pub(crate) fn cancel_by_actor(&mut self, actor: EntityId) -> Vec<ScheduledEntry> {
let Some(ids) = self.by_actor.remove(&actor) else {
return Vec::new();
};
let mut cancelled = Vec::with_capacity(ids.len());
for id in ids {
if let Some(key) = self.by_id.remove(&id) {
if let Some(entry) = self.ready.remove(&key) {
cancelled.push(entry);
}
}
}
cancelled
}
pub(crate) fn pop_due(&mut self, now: Tick) -> Option<ScheduledEntry> {
let (&key, _) = self.ready.first_key_value()?;
if key.at > now {
return None;
}
let entry = self
.ready
.remove(&key)
.expect("first_key_value just returned this key");
self.by_id.remove(&entry.id);
if let Some(actor_id) = entry.actor {
if let Some(set) = self.by_actor.get_mut(&actor_id) {
set.remove(&entry.id);
if set.is_empty() {
self.by_actor.remove(&actor_id);
}
}
}
Some(entry)
}
#[cfg_attr(not(test), allow(dead_code))]
#[inline]
pub(crate) fn len(&self) -> usize {
self.ready.len()
}
#[cfg_attr(not(test), allow(dead_code))]
#[inline]
pub(crate) fn is_empty(&self) -> bool {
self.ready.is_empty()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::abi::{EntityId, Tick, TypeCode};
fn p() -> Principal {
Principal::System
}
fn tc() -> TypeCode {
TypeCode(1)
}
#[test]
fn empty_state() {
let s = Scheduler::new();
assert_eq!(s.len(), 0);
assert!(s.is_empty());
}
#[test]
fn schedule_then_pop_due_single() {
let mut s = Scheduler::new();
let id = s.schedule(Tick(5), None, p(), tc(), vec![1, 2, 3]);
assert_eq!(s.len(), 1);
let entry = s.pop_due(Tick(5)).expect("entry due");
assert_eq!(entry.id, id);
assert_eq!(entry.at, Tick(5));
assert_eq!(entry.action_bytes, vec![1, 2, 3]);
assert!(s.is_empty());
}
#[test]
fn pop_due_before_time_returns_none() {
let mut s = Scheduler::new();
s.schedule(Tick(10), None, p(), tc(), vec![]);
assert!(s.pop_due(Tick(9)).is_none());
assert_eq!(s.len(), 1);
}
#[test]
fn pop_due_at_exact_tick_pops() {
let mut s = Scheduler::new();
let id = s.schedule(Tick(5), None, p(), tc(), vec![]);
assert_eq!(s.pop_due(Tick(5)).unwrap().id, id);
}
#[test]
fn pop_due_ordering_by_tick() {
let mut s = Scheduler::new();
let id_late = s.schedule(Tick(20), None, p(), tc(), vec![]);
let id_early = s.schedule(Tick(5), None, p(), tc(), vec![]);
let id_mid = s.schedule(Tick(10), None, p(), tc(), vec![]);
assert_eq!(s.pop_due(Tick(100)).unwrap().id, id_early);
assert_eq!(s.pop_due(Tick(100)).unwrap().id, id_mid);
assert_eq!(s.pop_due(Tick(100)).unwrap().id, id_late);
}
#[test]
fn pop_due_tiebreak_by_seq() {
let mut s = Scheduler::new();
let id1 = s.schedule(Tick(5), None, p(), tc(), vec![1]);
let id2 = s.schedule(Tick(5), None, p(), tc(), vec![2]);
let id3 = s.schedule(Tick(5), None, p(), tc(), vec![3]);
assert_eq!(s.pop_due(Tick(5)).unwrap().id, id1);
assert_eq!(s.pop_due(Tick(5)).unwrap().id, id2);
assert_eq!(s.pop_due(Tick(5)).unwrap().id, id3);
}
#[test]
fn cancel_removes_entry() {
let mut s = Scheduler::new();
let id = s.schedule(Tick(5), None, p(), tc(), vec![]);
let cancelled = s.cancel(id).expect("found");
assert_eq!(cancelled.id, id);
assert!(s.is_empty());
assert!(s.pop_due(Tick(100)).is_none());
}
#[test]
fn cancel_unknown_returns_none() {
let mut s = Scheduler::new();
let bogus = ScheduledActionId::new(999).unwrap();
assert!(s.cancel(bogus).is_none());
}
#[test]
fn cancel_by_actor_removes_all() {
let mut s = Scheduler::new();
let actor = EntityId::new(1).unwrap();
let other = EntityId::new(2).unwrap();
let _ = s.schedule(Tick(5), Some(actor), p(), tc(), vec![]);
let _ = s.schedule(Tick(10), Some(actor), p(), tc(), vec![]);
let id_other = s.schedule(Tick(7), Some(other), p(), tc(), vec![]);
assert_eq!(s.len(), 3);
let cancelled = s.cancel_by_actor(actor);
assert_eq!(cancelled.len(), 2);
assert_eq!(s.len(), 1);
assert_eq!(s.pop_due(Tick(100)).unwrap().id, id_other);
}
#[test]
fn cancel_by_actor_unknown_returns_empty() {
let mut s = Scheduler::new();
let actor = EntityId::new(99).unwrap();
assert!(s.cancel_by_actor(actor).is_empty());
}
#[test]
fn schedule_id_monotonic() {
let mut s = Scheduler::new();
let id1 = s.schedule(Tick(0), None, p(), tc(), vec![]);
let id2 = s.schedule(Tick(0), None, p(), tc(), vec![]);
let id3 = s.schedule(Tick(0), None, p(), tc(), vec![]);
assert!(id1 < id2);
assert!(id2 < id3);
assert_eq!(id1.get(), 1);
assert_eq!(id3.get(), 3);
}
#[test]
fn no_tombstones() {
let mut s = Scheduler::new();
let id1 = s.schedule(Tick(5), None, p(), tc(), vec![]);
let _id2 = s.schedule(Tick(5), None, p(), tc(), vec![]);
assert_eq!(s.len(), 2);
s.cancel(id1);
assert_eq!(s.len(), 1);
}
#[test]
fn determinism_same_sequence() {
fn run() -> Vec<u64> {
let mut s = Scheduler::new();
s.schedule(Tick(3), None, p(), tc(), vec![]);
s.schedule(Tick(1), None, p(), tc(), vec![]);
s.schedule(Tick(2), None, p(), tc(), vec![]);
s.schedule(Tick(1), None, p(), tc(), vec![]);
let mut out = Vec::new();
while let Some(e) = s.pop_due(Tick(100)) {
out.push(e.id.get());
}
out
}
assert_eq!(run(), run());
}
}