nexus_rt/scheduler.rs
1//! DAG scheduler with boolean propagation.
2//!
3//! The scheduler is installed as a **driver** via [`SchedulerInstaller`].
4//! After event handlers process incoming data and write to resources,
5//! the scheduler runs reconciliation [`System`]s
6//! in topological order. This two-phase pattern (event → reconcile)
7//! separates reactive logic from derived-state computation.
8//!
9//! Root systems (no upstream dependencies) always run. Non-root
10//! systems run only if at least one upstream system returned `true`.
11//!
12//! # Propagation model
13//!
14//! Each system returns `bool`. `true` means "my outputs changed, run
15//! downstream systems." `false` means "nothing changed, skip downstream."
16//! When a system has multiple upstreams, it runs if **any** upstream
17//! returned `true` (OR / `any` semantics).
18//!
19//! Propagation is checked via a `u64` bitmask — each system occupies one
20//! bit, and the upstream check is a single AND instruction. This limits
21//! the scheduler to [`MAX_SYSTEMS`] (64) systems.
22//!
23//! # Sequence mechanics
24//!
25//! The global sequence counter is event-only — the scheduler never
26//! calls [`next_sequence`](crate::World::next_sequence).
27//!
28//! # Invariants
29//!
30//! - **Topological order**: Systems execute in an order where all
31//! upstreams of a system have already completed. Ties are broken by
32//! insertion order (Kahn's algorithm with FIFO queue).
33//! - **No cycles**: The edge graph must be a DAG. Cycles panic at
34//! install time with a diagnostic message.
35//! - **Capacity**: At most [`MAX_SYSTEMS`] (64) systems. Exceeding
36//! this panics at install time.
37//! - **Deterministic**: Same inputs produce the same execution order
38//! and results. No randomness, no thread-dependent ordering.
39//! - **No sequence bump**: The scheduler never advances the global
40//! sequence. Event handlers own sequencing; the scheduler observes.
41//!
42//! # Examples
43//!
44//! ```
45//! use nexus_rt::{WorldBuilder, Res, ResMut, Installer, Resource};
46//! use nexus_rt::scheduler::SchedulerInstaller;
47//! use nexus_rt::system::IntoSystem;
48//!
49//! #[derive(Resource)]
50//! struct Val(u64);
51//!
52//! fn source(mut val: ResMut<Val>) -> bool {
53//! val.0 += 1;
54//! true
55//! }
56//!
57//! fn sink(val: Res<Val>) -> bool {
58//! val.0 > 0
59//! }
60//!
61//! let mut builder = WorldBuilder::new();
62//! builder.register(Val(0));
63//!
64//! let mut installer = SchedulerInstaller::new();
65//! let a = installer.add(source, builder.registry());
66//! let b = installer.add(sink, builder.registry());
67//! installer.after(b, a);
68//!
69//! let mut scheduler = builder.install_driver(installer);
70//! let mut world = builder.build();
71//!
72//! assert_eq!(scheduler.run(&mut world), 2);
73//! ```
74
75use std::collections::VecDeque;
76
77use crate::driver::Installer;
78use crate::system::{IntoSystem, System};
79use crate::world::{Registry, World, WorldBuilder};
80
81// =============================================================================
82// SystemId
83// =============================================================================
84
85/// Opaque handle identifying a system within a [`SchedulerInstaller`].
86///
87/// Used with [`after`](SchedulerInstaller::after) and
88/// [`before`](SchedulerInstaller::before) to declare ordering.
89#[derive(Clone, Copy, PartialEq, Eq, Debug)]
90pub struct SystemId(usize);
91
92// =============================================================================
93// SchedulerInstaller
94// =============================================================================
95
96/// Builder for a [`SystemScheduler`].
97///
98/// Systems are added with [`add`](Self::add) and ordering declared with
99/// [`after`](Self::after) / [`before`](Self::before). On
100/// [`install`](Installer::install), a topological sort (Kahn's algorithm
101/// with FIFO queue for deterministic insertion-order tie-breaking)
102/// determines execution order.
103///
104/// Implements [`Installer`] — consume via
105/// [`WorldBuilder::install_driver`](crate::WorldBuilder::install_driver).
106///
107/// # Examples
108///
109/// ```
110/// use nexus_rt::{WorldBuilder, Res, ResMut, Installer, Resource};
111/// use nexus_rt::scheduler::SchedulerInstaller;
112/// use nexus_rt::system::IntoSystem;
113///
114/// #[derive(Resource)]
115/// struct Val(u64);
116///
117/// fn step_a(mut val: ResMut<Val>) -> bool {
118/// val.0 += 1;
119/// true
120/// }
121///
122/// fn step_b(val: Res<Val>) -> bool {
123/// val.0 > 0
124/// }
125///
126/// let mut builder = WorldBuilder::new();
127/// builder.register(Val(0));
128///
129/// let mut installer = SchedulerInstaller::new();
130/// let a = installer.add(step_a, builder.registry());
131/// let b = installer.add(step_b, builder.registry());
132/// installer.after(b, a);
133///
134/// let mut scheduler = builder.install_driver(installer);
135/// let mut world = builder.build();
136///
137/// assert_eq!(scheduler.run(&mut world), 2);
138/// ```
139///
140/// # Panics
141///
142/// [`install`](Installer::install) panics if:
143/// - The declared edges form a cycle.
144/// - More than [`MAX_SYSTEMS`] (64) systems were added.
145pub struct SchedulerInstaller {
146 systems: Vec<Box<dyn System>>,
147 edges: Vec<(usize, usize)>, // (upstream, downstream)
148}
149
150impl SchedulerInstaller {
151 /// Create an empty installer.
152 pub fn new() -> Self {
153 Self {
154 systems: Vec::new(),
155 edges: Vec::new(),
156 }
157 }
158
159 /// Add a system, returning a [`SystemId`] for ordering.
160 ///
161 /// The function is converted via [`IntoSystem`] and parameters
162 /// are resolved from the registry immediately.
163 ///
164 /// # Panics
165 ///
166 /// Panics if any [`Param`](crate::Param) resource required by `f`
167 /// is not registered in the [`Registry`], or if parameters create
168 /// a conflicting access (e.g. `Res<T>` + `ResMut<T>`).
169 pub fn add<F, Params>(&mut self, f: F, registry: &Registry) -> SystemId
170 where
171 F: IntoSystem<Params>,
172 F::System: 'static,
173 {
174 let id = SystemId(self.systems.len());
175 self.systems.push(Box::new(f.into_system(registry)));
176 id
177 }
178
179 /// Declare that `downstream` runs after `upstream`.
180 ///
181 /// If `upstream` returns `false`, `downstream` is skipped — unless
182 /// another upstream of `downstream` returned `true` (`any` semantics).
183 pub fn after(&mut self, downstream: SystemId, upstream: SystemId) {
184 self.edges.push((upstream.0, downstream.0));
185 }
186
187 /// Declare that `upstream` runs before `downstream`.
188 ///
189 /// Equivalent to `self.after(downstream, upstream)`.
190 pub fn before(&mut self, upstream: SystemId, downstream: SystemId) {
191 self.after(downstream, upstream);
192 }
193}
194
195impl Default for SchedulerInstaller {
196 fn default() -> Self {
197 Self::new()
198 }
199}
200
201/// Maximum number of systems supported by the scheduler.
202///
203/// Propagation uses a `u64` bitmask where each bit represents one
204/// system's result. The upstream check for any system is a single AND
205/// against this bitmask — no iteration over dependency lists.
206///
207/// 64 systems is well beyond typical reconciliation DAG sizes. If a
208/// future use case requires more, the bitmask can be widened to `u128`
209/// or `[u64; N]` without changing the public API.
210pub const MAX_SYSTEMS: usize = 64;
211
212impl Installer for SchedulerInstaller {
213 type Poller = SystemScheduler;
214
215 fn install(self, _world: &mut WorldBuilder) -> SystemScheduler {
216 let n = self.systems.len();
217
218 assert!(
219 n <= MAX_SYSTEMS,
220 "system scheduler supports at most {MAX_SYSTEMS} systems, got {n}",
221 );
222
223 // Build adjacency list and in-degree counts.
224 let mut in_degree = vec![0usize; n];
225 let mut adj: Vec<Vec<usize>> = vec![Vec::new(); n];
226
227 for &(up, down) in &self.edges {
228 adj[up].push(down);
229 in_degree[down] += 1;
230 }
231
232 // Kahn's algorithm — FIFO queue for stable insertion-order.
233 let mut queue = VecDeque::new();
234 for (i, deg) in in_degree.iter().enumerate() {
235 if *deg == 0 {
236 queue.push_back(i);
237 }
238 }
239
240 let mut order: Vec<usize> = Vec::with_capacity(n);
241 while let Some(node) = queue.pop_front() {
242 order.push(node);
243 for &succ in &adj[node] {
244 in_degree[succ] -= 1;
245 if in_degree[succ] == 0 {
246 queue.push_back(succ);
247 }
248 }
249 }
250
251 assert!(
252 order.len() == n,
253 "cycle detected in system scheduler: {} systems in graph, \
254 but only {} reachable in topological order",
255 n,
256 order.len(),
257 );
258
259 // Build the old→new position mapping.
260 let mut old_to_new = vec![0usize; n];
261 for (new_pos, &old_pos) in order.iter().enumerate() {
262 old_to_new[old_pos] = new_pos;
263 }
264
265 // Reorder systems into topological order.
266 let mut sorted_systems: Vec<Option<Box<dyn System>>> =
267 self.systems.into_iter().map(Some).collect();
268 let systems: Vec<Box<dyn System>> = order
269 .iter()
270 .map(|&old_pos| sorted_systems[old_pos].take().unwrap())
271 .collect();
272
273 // Build upstream bitmasks: for each system in topological order,
274 // a u64 where bit j is set if system j is an upstream dependency.
275 // Roots have mask 0 (always run).
276 let mut upstream_masks = vec![0u64; n];
277 for &(up, down) in &self.edges {
278 upstream_masks[old_to_new[down]] |= 1 << old_to_new[up];
279 }
280
281 SystemScheduler {
282 systems,
283 upstream_masks,
284 }
285 }
286}
287
288// =============================================================================
289// SystemScheduler
290// =============================================================================
291
292/// DAG scheduler that runs systems in topological order with boolean
293/// propagation.
294///
295/// Created by [`SchedulerInstaller`] via
296/// [`WorldBuilder::install_driver`](crate::WorldBuilder::install_driver).
297/// This is a user-space driver — the caller decides when and whether
298/// to call [`run`](Self::run).
299///
300/// # Propagation
301///
302/// Root systems (no upstream) always run. Non-root systems run only if
303/// at least one upstream returned `true`.
304///
305/// # Bitmask implementation
306///
307/// Each system occupies one bit position in a `u64`. During
308/// [`run`](Self::run), a local `results: u64` accumulates which systems
309/// returned `true`. Each system's upstream check is:
310///
311/// ```text
312/// mask == 0 → root, always run
313/// (mask & results) != 0 → at least one upstream returned true
314/// ```
315///
316/// One load, one AND, one branch per system — no heap access for the
317/// propagation check itself. The `results` bitmask lives in a register
318/// for the duration of the loop.
319pub struct SystemScheduler {
320 systems: Vec<Box<dyn System>>,
321 /// Per-system: bit `j` set means system `j` is an upstream dependency.
322 /// `0` = root (always runs).
323 upstream_masks: Vec<u64>,
324}
325
326impl SystemScheduler {
327 /// Run all systems with boolean propagation.
328 ///
329 /// Iterates systems in topological order. For each system:
330 ///
331 /// 1. **Root** (`upstream_mask == 0`): always runs.
332 /// 2. **Non-root** (`upstream_mask & results != 0`): runs if any
333 /// upstream returned `true`. Skipped otherwise.
334 /// 3. If the system runs and returns `true`, its bit is set in
335 /// `results`, enabling downstream systems to run.
336 ///
337 /// The `results` bitmask is a local `u64` — the entire propagation
338 /// state fits in a single register. Per-system overhead is one
339 /// load + one AND + one branch (~4 cycles).
340 ///
341 /// Does NOT call [`next_sequence`](World::next_sequence) — the global
342 /// sequence is event-only.
343 ///
344 /// Returns the number of systems that actually ran.
345 pub fn run(&mut self, world: &mut World) -> usize {
346 let mut ran = 0;
347 let mut results: u64 = 0;
348
349 for i in 0..self.systems.len() {
350 let mask = self.upstream_masks[i];
351 if mask == 0 || (mask & results) != 0 {
352 if self.systems[i].run(world) {
353 results |= 1 << i;
354 }
355 ran += 1;
356 }
357 }
358
359 ran
360 }
361
362 /// Returns the number of systems in the scheduler.
363 pub fn len(&self) -> usize {
364 self.systems.len()
365 }
366
367 /// Returns `true` if the scheduler contains no systems.
368 pub fn is_empty(&self) -> bool {
369 self.systems.is_empty()
370 }
371}
372
373#[cfg(test)]
374mod tests {
375 use super::*;
376 use crate::ResMut;
377
378 // -- Empty scheduler --------------------------------------------------
379
380 #[test]
381 fn empty_scheduler() {
382 let mut builder = WorldBuilder::new();
383 let installer = SchedulerInstaller::new();
384 let mut scheduler = builder.install_driver(installer);
385 let mut world = builder.build();
386
387 assert_eq!(scheduler.run(&mut world), 0);
388 assert!(scheduler.is_empty());
389 }
390
391 // -- Single root ------------------------------------------------------
392
393 fn increment(mut val: ResMut<u64>) -> bool {
394 *val += 1;
395 true
396 }
397
398 #[test]
399 fn single_root_always_runs() {
400 let mut builder = WorldBuilder::new();
401 builder.register::<u64>(0);
402 let mut installer = SchedulerInstaller::new();
403 installer.add(increment, builder.registry());
404 let mut scheduler = builder.install_driver(installer);
405 let mut world = builder.build();
406
407 assert_eq!(scheduler.run(&mut world), 1);
408 assert_eq!(*world.resource::<u64>(), 1);
409 }
410
411 // -- Linear chain: A → B → C -----------------------------------------
412
413 fn source(mut val: ResMut<u64>) -> bool {
414 *val += 1;
415 *val <= 2 // propagate first two times
416 }
417
418 fn middle(mut val: ResMut<u64>) -> bool {
419 *val += 10;
420 true
421 }
422
423 fn leaf(mut val: ResMut<u64>) -> bool {
424 *val += 100;
425 true
426 }
427
428 #[test]
429 fn linear_chain_propagation() {
430 let mut builder = WorldBuilder::new();
431 builder.register::<u64>(0);
432 let mut installer = SchedulerInstaller::new();
433 let a = installer.add(source, builder.registry());
434 let b = installer.add(middle, builder.registry());
435 let c = installer.add(leaf, builder.registry());
436 installer.after(b, a);
437 installer.after(c, b);
438 let mut scheduler = builder.install_driver(installer);
439 let mut world = builder.build();
440
441 // Pass 1: source returns true → all 3 run
442 assert_eq!(scheduler.run(&mut world), 3);
443 // 0 + 1 + 10 + 100 = 111
444 assert_eq!(*world.resource::<u64>(), 111);
445 }
446
447 // -- Propagation stops ------------------------------------------------
448
449 fn false_source() -> bool {
450 false
451 }
452
453 fn should_not_run(mut val: ResMut<u64>) -> bool {
454 *val = 999;
455 true
456 }
457
458 #[test]
459 fn propagation_stops_on_false() {
460 let mut builder = WorldBuilder::new();
461 builder.register::<u64>(0);
462 let mut installer = SchedulerInstaller::new();
463 let a = installer.add(false_source, builder.registry());
464 let b = installer.add(should_not_run, builder.registry());
465 installer.after(b, a);
466 let mut scheduler = builder.install_driver(installer);
467 let mut world = builder.build();
468
469 // Only root runs, downstream skipped
470 assert_eq!(scheduler.run(&mut world), 1);
471 assert_eq!(*world.resource::<u64>(), 0);
472 }
473
474 // -- Diamond DAG: A → B, A → C, B → D, C → D -------------------------
475
476 fn set_flag(mut flag: ResMut<bool>) -> bool {
477 *flag = true;
478 true
479 }
480
481 #[test]
482 fn diamond_dag() {
483 let mut builder = WorldBuilder::new();
484 builder.register::<u64>(0);
485 builder.register::<bool>(false);
486 let mut installer = SchedulerInstaller::new();
487 let a = installer.add(increment, builder.registry());
488 let b = installer.add(increment, builder.registry());
489 let c = installer.add(set_flag, builder.registry());
490 let d = installer.add(increment, builder.registry());
491 installer.after(b, a);
492 installer.after(c, a);
493 installer.after(d, b);
494 installer.after(d, c);
495 let mut scheduler = builder.install_driver(installer);
496 let mut world = builder.build();
497
498 assert_eq!(scheduler.run(&mut world), 4);
499 assert!(*world.resource::<bool>());
500 // u64: increment runs 3 times (a, b, d) → 3
501 assert_eq!(*world.resource::<u64>(), 3);
502 }
503
504 // -- Multiple roots ---------------------------------------------------
505
506 #[test]
507 fn multiple_roots() {
508 let mut builder = WorldBuilder::new();
509 builder.register::<u64>(0);
510 let mut installer = SchedulerInstaller::new();
511 installer.add(increment, builder.registry());
512 installer.add(increment, builder.registry());
513 installer.add(increment, builder.registry());
514 let mut scheduler = builder.install_driver(installer);
515 let mut world = builder.build();
516
517 assert_eq!(scheduler.run(&mut world), 3);
518 assert_eq!(*world.resource::<u64>(), 3);
519 }
520
521 // -- Cycle detection --------------------------------------------------
522
523 #[test]
524 #[should_panic(expected = "cycle detected")]
525 fn cycle_panics() {
526 let mut builder = WorldBuilder::new();
527 let mut installer = SchedulerInstaller::new();
528 let a = installer.add(false_source, builder.registry());
529 let b = installer.add(false_source, builder.registry());
530 installer.after(b, a);
531 installer.after(a, b);
532 let _scheduler = builder.install_driver(installer);
533 }
534
535 // -- Sequence not bumped by scheduler ---------------------------------
536
537 #[test]
538 fn scheduler_does_not_bump_sequence() {
539 let mut builder = WorldBuilder::new();
540 builder.register::<u64>(0);
541 let mut installer = SchedulerInstaller::new();
542 installer.add(increment, builder.registry());
543 let mut scheduler = builder.install_driver(installer);
544 let mut world = builder.build();
545
546 let before = world.current_sequence();
547 scheduler.run(&mut world);
548 assert_eq!(world.current_sequence(), before);
549 }
550
551 // -- System mutations visible to later systems ------------------------
552
553 fn double(mut val: ResMut<u64>) -> bool {
554 *val *= 2;
555 true
556 }
557
558 #[test]
559 fn mutations_visible_downstream() {
560 let mut builder = WorldBuilder::new();
561 builder.register::<u64>(1);
562 let mut installer = SchedulerInstaller::new();
563 let a = installer.add(double, builder.registry());
564 let b = installer.add(double, builder.registry());
565 installer.after(b, a);
566 let mut scheduler = builder.install_driver(installer);
567 let mut world = builder.build();
568
569 scheduler.run(&mut world);
570 // 1 * 2 = 2, then 2 * 2 = 4
571 assert_eq!(*world.resource::<u64>(), 4);
572 }
573
574 // -- Capacity limit ---------------------------------------------------
575
576 #[test]
577 #[should_panic(expected = "at most 64 systems")]
578 fn exceeding_max_systems_panics() {
579 let mut builder = WorldBuilder::new();
580 let mut installer = SchedulerInstaller::new();
581 for _ in 0..=MAX_SYSTEMS {
582 installer.add(false_source, builder.registry());
583 }
584 let _scheduler = builder.install_driver(installer);
585 }
586}