Skip to main content

nexus_rt/
scheduler.rs

1//! DAG scheduler with boolean propagation.
2//!
3//! The scheduler is installed as a **driver** via [`SchedulerInstaller`].
4//! After event handlers process incoming data and write to resources,
5//! the scheduler runs reconciliation [`System`]s
6//! in topological order. This two-phase pattern (event → reconcile)
7//! separates reactive logic from derived-state computation.
8//!
9//! Root systems (no upstream dependencies) always run. Non-root
10//! systems run only if at least one upstream system returned `true`.
11//!
12//! # Propagation model
13//!
14//! Each system returns `bool`. `true` means "my outputs changed, run
15//! downstream systems." `false` means "nothing changed, skip downstream."
16//! When a system has multiple upstreams, it runs if **any** upstream
17//! returned `true` (OR / `any` semantics).
18//!
19//! Propagation is checked via a `u64` bitmask — each system occupies one
20//! bit, and the upstream check is a single AND instruction. This limits
21//! the scheduler to [`MAX_SYSTEMS`] (64) systems.
22//!
23//! # Sequence mechanics
24//!
25//! The global sequence counter is event-only — the scheduler never
26//! calls [`next_sequence`](crate::World::next_sequence).
27//!
28//! # Invariants
29//!
30//! - **Topological order**: Systems execute in an order where all
31//!   upstreams of a system have already completed. Ties are broken by
32//!   insertion order (Kahn's algorithm with FIFO queue).
33//! - **No cycles**: The edge graph must be a DAG. Cycles panic at
34//!   install time with a diagnostic message.
35//! - **Capacity**: At most [`MAX_SYSTEMS`] (64) systems. Exceeding
36//!   this panics at install time.
37//! - **Deterministic**: Same inputs produce the same execution order
38//!   and results. No randomness, no thread-dependent ordering.
39//! - **No sequence bump**: The scheduler never advances the global
40//!   sequence. Event handlers own sequencing; the scheduler observes.
41//!
42//! # Examples
43//!
44//! ```
45//! use nexus_rt::{WorldBuilder, Res, ResMut, Installer, Resource};
46//! use nexus_rt::scheduler::SchedulerInstaller;
47//! use nexus_rt::system::IntoSystem;
48//!
49//! #[derive(Resource)]
50//! struct Val(u64);
51//!
52//! fn source(mut val: ResMut<Val>) -> bool {
53//!     val.0 += 1;
54//!     true
55//! }
56//!
57//! fn sink(val: Res<Val>) -> bool {
58//!     val.0 > 0
59//! }
60//!
61//! let mut builder = WorldBuilder::new();
62//! builder.register(Val(0));
63//!
64//! let mut installer = SchedulerInstaller::new();
65//! let a = installer.add(source, builder.registry());
66//! let b = installer.add(sink, builder.registry());
67//! installer.after(b, a);
68//!
69//! let mut scheduler = builder.install_driver(installer);
70//! let mut world = builder.build();
71//!
72//! assert_eq!(scheduler.run(&mut world), 2);
73//! ```
74
75use std::collections::VecDeque;
76
77use crate::driver::Installer;
78use crate::system::{IntoSystem, System};
79use crate::world::{Registry, World, WorldBuilder};
80
81// =============================================================================
82// SystemId
83// =============================================================================
84
85/// Opaque handle identifying a system within a [`SchedulerInstaller`].
86///
87/// Used with [`after`](SchedulerInstaller::after) and
88/// [`before`](SchedulerInstaller::before) to declare ordering.
89#[derive(Clone, Copy, PartialEq, Eq, Debug)]
90pub struct SystemId(usize);
91
92// =============================================================================
93// SchedulerInstaller
94// =============================================================================
95
96/// Builder for a [`SystemScheduler`].
97///
98/// Systems are added with [`add`](Self::add) and ordering declared with
99/// [`after`](Self::after) / [`before`](Self::before). On
100/// [`install`](Installer::install), a topological sort (Kahn's algorithm
101/// with FIFO queue for deterministic insertion-order tie-breaking)
102/// determines execution order.
103///
104/// Implements [`Installer`] — consume via
105/// [`WorldBuilder::install_driver`](crate::WorldBuilder::install_driver).
106///
107/// # Examples
108///
109/// ```
110/// use nexus_rt::{WorldBuilder, Res, ResMut, Installer, Resource};
111/// use nexus_rt::scheduler::SchedulerInstaller;
112/// use nexus_rt::system::IntoSystem;
113///
114/// #[derive(Resource)]
115/// struct Val(u64);
116///
117/// fn step_a(mut val: ResMut<Val>) -> bool {
118///     val.0 += 1;
119///     true
120/// }
121///
122/// fn step_b(val: Res<Val>) -> bool {
123///     val.0 > 0
124/// }
125///
126/// let mut builder = WorldBuilder::new();
127/// builder.register(Val(0));
128///
129/// let mut installer = SchedulerInstaller::new();
130/// let a = installer.add(step_a, builder.registry());
131/// let b = installer.add(step_b, builder.registry());
132/// installer.after(b, a);
133///
134/// let mut scheduler = builder.install_driver(installer);
135/// let mut world = builder.build();
136///
137/// assert_eq!(scheduler.run(&mut world), 2);
138/// ```
139///
140/// # Panics
141///
142/// [`install`](Installer::install) panics if:
143/// - The declared edges form a cycle.
144/// - More than [`MAX_SYSTEMS`] (64) systems were added.
145pub struct SchedulerInstaller {
146    systems: Vec<Box<dyn System>>,
147    edges: Vec<(usize, usize)>, // (upstream, downstream)
148}
149
150impl SchedulerInstaller {
151    /// Create an empty installer.
152    pub fn new() -> Self {
153        Self {
154            systems: Vec::new(),
155            edges: Vec::new(),
156        }
157    }
158
159    /// Add a system, returning a [`SystemId`] for ordering.
160    ///
161    /// The function is converted via [`IntoSystem`] and parameters
162    /// are resolved from the registry immediately.
163    ///
164    /// # Panics
165    ///
166    /// Panics if any [`Param`](crate::Param) resource required by `f`
167    /// is not registered in the [`Registry`], or if parameters create
168    /// a conflicting access (e.g. `Res<T>` + `ResMut<T>`).
169    pub fn add<F, Params>(&mut self, f: F, registry: &Registry) -> SystemId
170    where
171        F: IntoSystem<Params>,
172        F::System: 'static,
173    {
174        let id = SystemId(self.systems.len());
175        self.systems.push(Box::new(f.into_system(registry)));
176        id
177    }
178
179    /// Declare that `downstream` runs after `upstream`.
180    ///
181    /// If `upstream` returns `false`, `downstream` is skipped — unless
182    /// another upstream of `downstream` returned `true` (`any` semantics).
183    pub fn after(&mut self, downstream: SystemId, upstream: SystemId) {
184        self.edges.push((upstream.0, downstream.0));
185    }
186
187    /// Declare that `upstream` runs before `downstream`.
188    ///
189    /// Equivalent to `self.after(downstream, upstream)`.
190    pub fn before(&mut self, upstream: SystemId, downstream: SystemId) {
191        self.after(downstream, upstream);
192    }
193}
194
195impl Default for SchedulerInstaller {
196    fn default() -> Self {
197        Self::new()
198    }
199}
200
201/// Maximum number of systems supported by the scheduler.
202///
203/// Propagation uses a `u64` bitmask where each bit represents one
204/// system's result. The upstream check for any system is a single AND
205/// against this bitmask — no iteration over dependency lists.
206///
207/// 64 systems is well beyond typical reconciliation DAG sizes. If a
208/// future use case requires more, the bitmask can be widened to `u128`
209/// or `[u64; N]` without changing the public API.
210pub const MAX_SYSTEMS: usize = 64;
211
212impl Installer for SchedulerInstaller {
213    type Poller = SystemScheduler;
214
215    fn install(self, _world: &mut WorldBuilder) -> SystemScheduler {
216        let n = self.systems.len();
217
218        assert!(
219            n <= MAX_SYSTEMS,
220            "system scheduler supports at most {MAX_SYSTEMS} systems, got {n}",
221        );
222
223        // Build adjacency list and in-degree counts.
224        let mut in_degree = vec![0usize; n];
225        let mut adj: Vec<Vec<usize>> = vec![Vec::new(); n];
226
227        for &(up, down) in &self.edges {
228            adj[up].push(down);
229            in_degree[down] += 1;
230        }
231
232        // Kahn's algorithm — FIFO queue for stable insertion-order.
233        let mut queue = VecDeque::new();
234        for (i, deg) in in_degree.iter().enumerate() {
235            if *deg == 0 {
236                queue.push_back(i);
237            }
238        }
239
240        let mut order: Vec<usize> = Vec::with_capacity(n);
241        while let Some(node) = queue.pop_front() {
242            order.push(node);
243            for &succ in &adj[node] {
244                in_degree[succ] -= 1;
245                if in_degree[succ] == 0 {
246                    queue.push_back(succ);
247                }
248            }
249        }
250
251        assert!(
252            order.len() == n,
253            "cycle detected in system scheduler: {} systems in graph, \
254             but only {} reachable in topological order",
255            n,
256            order.len(),
257        );
258
259        // Build the old→new position mapping.
260        let mut old_to_new = vec![0usize; n];
261        for (new_pos, &old_pos) in order.iter().enumerate() {
262            old_to_new[old_pos] = new_pos;
263        }
264
265        // Reorder systems into topological order.
266        let mut sorted_systems: Vec<Option<Box<dyn System>>> =
267            self.systems.into_iter().map(Some).collect();
268        let systems: Vec<Box<dyn System>> = order
269            .iter()
270            .map(|&old_pos| sorted_systems[old_pos].take().unwrap())
271            .collect();
272
273        // Build upstream bitmasks: for each system in topological order,
274        // a u64 where bit j is set if system j is an upstream dependency.
275        // Roots have mask 0 (always run).
276        let mut upstream_masks = vec![0u64; n];
277        for &(up, down) in &self.edges {
278            upstream_masks[old_to_new[down]] |= 1 << old_to_new[up];
279        }
280
281        SystemScheduler {
282            systems,
283            upstream_masks,
284        }
285    }
286}
287
288// =============================================================================
289// SystemScheduler
290// =============================================================================
291
292/// DAG scheduler that runs systems in topological order with boolean
293/// propagation.
294///
295/// Created by [`SchedulerInstaller`] via
296/// [`WorldBuilder::install_driver`](crate::WorldBuilder::install_driver).
297/// This is a user-space driver — the caller decides when and whether
298/// to call [`run`](Self::run).
299///
300/// # Propagation
301///
302/// Root systems (no upstream) always run. Non-root systems run only if
303/// at least one upstream returned `true`.
304///
305/// # Bitmask implementation
306///
307/// Each system occupies one bit position in a `u64`. During
308/// [`run`](Self::run), a local `results: u64` accumulates which systems
309/// returned `true`. Each system's upstream check is:
310///
311/// ```text
312/// mask == 0              → root, always run
313/// (mask & results) != 0  → at least one upstream returned true
314/// ```
315///
316/// One load, one AND, one branch per system — no heap access for the
317/// propagation check itself. The `results` bitmask lives in a register
318/// for the duration of the loop.
319pub struct SystemScheduler {
320    systems: Vec<Box<dyn System>>,
321    /// Per-system: bit `j` set means system `j` is an upstream dependency.
322    /// `0` = root (always runs).
323    upstream_masks: Vec<u64>,
324}
325
326impl SystemScheduler {
327    /// Run all systems with boolean propagation.
328    ///
329    /// Iterates systems in topological order. For each system:
330    ///
331    /// 1. **Root** (`upstream_mask == 0`): always runs.
332    /// 2. **Non-root** (`upstream_mask & results != 0`): runs if any
333    ///    upstream returned `true`. Skipped otherwise.
334    /// 3. If the system runs and returns `true`, its bit is set in
335    ///    `results`, enabling downstream systems to run.
336    ///
337    /// The `results` bitmask is a local `u64` — the entire propagation
338    /// state fits in a single register. Per-system overhead is one
339    /// load + one AND + one branch (~4 cycles).
340    ///
341    /// Does NOT call [`next_sequence`](World::next_sequence) — the global
342    /// sequence is event-only.
343    ///
344    /// Returns the number of systems that actually ran.
345    pub fn run(&mut self, world: &mut World) -> usize {
346        let mut ran = 0;
347        let mut results: u64 = 0;
348
349        for i in 0..self.systems.len() {
350            let mask = self.upstream_masks[i];
351            if mask == 0 || (mask & results) != 0 {
352                if self.systems[i].run(world) {
353                    results |= 1 << i;
354                }
355                ran += 1;
356            }
357        }
358
359        ran
360    }
361
362    /// Returns the number of systems in the scheduler.
363    pub fn len(&self) -> usize {
364        self.systems.len()
365    }
366
367    /// Returns `true` if the scheduler contains no systems.
368    pub fn is_empty(&self) -> bool {
369        self.systems.is_empty()
370    }
371}
372
373#[cfg(test)]
374mod tests {
375    use super::*;
376    use crate::ResMut;
377
378    // -- Empty scheduler --------------------------------------------------
379
380    #[test]
381    fn empty_scheduler() {
382        let mut builder = WorldBuilder::new();
383        let installer = SchedulerInstaller::new();
384        let mut scheduler = builder.install_driver(installer);
385        let mut world = builder.build();
386
387        assert_eq!(scheduler.run(&mut world), 0);
388        assert!(scheduler.is_empty());
389    }
390
391    // -- Single root ------------------------------------------------------
392
393    fn increment(mut val: ResMut<u64>) -> bool {
394        *val += 1;
395        true
396    }
397
398    #[test]
399    fn single_root_always_runs() {
400        let mut builder = WorldBuilder::new();
401        builder.register::<u64>(0);
402        let mut installer = SchedulerInstaller::new();
403        installer.add(increment, builder.registry());
404        let mut scheduler = builder.install_driver(installer);
405        let mut world = builder.build();
406
407        assert_eq!(scheduler.run(&mut world), 1);
408        assert_eq!(*world.resource::<u64>(), 1);
409    }
410
411    // -- Linear chain: A → B → C -----------------------------------------
412
413    fn source(mut val: ResMut<u64>) -> bool {
414        *val += 1;
415        *val <= 2 // propagate first two times
416    }
417
418    fn middle(mut val: ResMut<u64>) -> bool {
419        *val += 10;
420        true
421    }
422
423    fn leaf(mut val: ResMut<u64>) -> bool {
424        *val += 100;
425        true
426    }
427
428    #[test]
429    fn linear_chain_propagation() {
430        let mut builder = WorldBuilder::new();
431        builder.register::<u64>(0);
432        let mut installer = SchedulerInstaller::new();
433        let a = installer.add(source, builder.registry());
434        let b = installer.add(middle, builder.registry());
435        let c = installer.add(leaf, builder.registry());
436        installer.after(b, a);
437        installer.after(c, b);
438        let mut scheduler = builder.install_driver(installer);
439        let mut world = builder.build();
440
441        // Pass 1: source returns true → all 3 run
442        assert_eq!(scheduler.run(&mut world), 3);
443        // 0 + 1 + 10 + 100 = 111
444        assert_eq!(*world.resource::<u64>(), 111);
445    }
446
447    // -- Propagation stops ------------------------------------------------
448
449    fn false_source() -> bool {
450        false
451    }
452
453    fn should_not_run(mut val: ResMut<u64>) -> bool {
454        *val = 999;
455        true
456    }
457
458    #[test]
459    fn propagation_stops_on_false() {
460        let mut builder = WorldBuilder::new();
461        builder.register::<u64>(0);
462        let mut installer = SchedulerInstaller::new();
463        let a = installer.add(false_source, builder.registry());
464        let b = installer.add(should_not_run, builder.registry());
465        installer.after(b, a);
466        let mut scheduler = builder.install_driver(installer);
467        let mut world = builder.build();
468
469        // Only root runs, downstream skipped
470        assert_eq!(scheduler.run(&mut world), 1);
471        assert_eq!(*world.resource::<u64>(), 0);
472    }
473
474    // -- Diamond DAG: A → B, A → C, B → D, C → D -------------------------
475
476    fn set_flag(mut flag: ResMut<bool>) -> bool {
477        *flag = true;
478        true
479    }
480
481    #[test]
482    fn diamond_dag() {
483        let mut builder = WorldBuilder::new();
484        builder.register::<u64>(0);
485        builder.register::<bool>(false);
486        let mut installer = SchedulerInstaller::new();
487        let a = installer.add(increment, builder.registry());
488        let b = installer.add(increment, builder.registry());
489        let c = installer.add(set_flag, builder.registry());
490        let d = installer.add(increment, builder.registry());
491        installer.after(b, a);
492        installer.after(c, a);
493        installer.after(d, b);
494        installer.after(d, c);
495        let mut scheduler = builder.install_driver(installer);
496        let mut world = builder.build();
497
498        assert_eq!(scheduler.run(&mut world), 4);
499        assert!(*world.resource::<bool>());
500        // u64: increment runs 3 times (a, b, d) → 3
501        assert_eq!(*world.resource::<u64>(), 3);
502    }
503
504    // -- Multiple roots ---------------------------------------------------
505
506    #[test]
507    fn multiple_roots() {
508        let mut builder = WorldBuilder::new();
509        builder.register::<u64>(0);
510        let mut installer = SchedulerInstaller::new();
511        installer.add(increment, builder.registry());
512        installer.add(increment, builder.registry());
513        installer.add(increment, builder.registry());
514        let mut scheduler = builder.install_driver(installer);
515        let mut world = builder.build();
516
517        assert_eq!(scheduler.run(&mut world), 3);
518        assert_eq!(*world.resource::<u64>(), 3);
519    }
520
521    // -- Cycle detection --------------------------------------------------
522
523    #[test]
524    #[should_panic(expected = "cycle detected")]
525    fn cycle_panics() {
526        let mut builder = WorldBuilder::new();
527        let mut installer = SchedulerInstaller::new();
528        let a = installer.add(false_source, builder.registry());
529        let b = installer.add(false_source, builder.registry());
530        installer.after(b, a);
531        installer.after(a, b);
532        let _scheduler = builder.install_driver(installer);
533    }
534
535    // -- Sequence not bumped by scheduler ---------------------------------
536
537    #[test]
538    fn scheduler_does_not_bump_sequence() {
539        let mut builder = WorldBuilder::new();
540        builder.register::<u64>(0);
541        let mut installer = SchedulerInstaller::new();
542        installer.add(increment, builder.registry());
543        let mut scheduler = builder.install_driver(installer);
544        let mut world = builder.build();
545
546        let before = world.current_sequence();
547        scheduler.run(&mut world);
548        assert_eq!(world.current_sequence(), before);
549    }
550
551    // -- System mutations visible to later systems ------------------------
552
553    fn double(mut val: ResMut<u64>) -> bool {
554        *val *= 2;
555        true
556    }
557
558    #[test]
559    fn mutations_visible_downstream() {
560        let mut builder = WorldBuilder::new();
561        builder.register::<u64>(1);
562        let mut installer = SchedulerInstaller::new();
563        let a = installer.add(double, builder.registry());
564        let b = installer.add(double, builder.registry());
565        installer.after(b, a);
566        let mut scheduler = builder.install_driver(installer);
567        let mut world = builder.build();
568
569        scheduler.run(&mut world);
570        // 1 * 2 = 2, then 2 * 2 = 4
571        assert_eq!(*world.resource::<u64>(), 4);
572    }
573
574    // -- Capacity limit ---------------------------------------------------
575
576    #[test]
577    #[should_panic(expected = "at most 64 systems")]
578    fn exceeding_max_systems_panics() {
579        let mut builder = WorldBuilder::new();
580        let mut installer = SchedulerInstaller::new();
581        for _ in 0..=MAX_SYSTEMS {
582            installer.add(false_source, builder.registry());
583        }
584        let _scheduler = builder.install_driver(installer);
585    }
586}