Skip to main content

jonmo/
graph.rs

1//! Signal graph management and runtime.
2//!
3//! This module contains the core infrastructure for [jonmo](crate)'s reactive signal graph,
4//! including graph topology tracking, signal registration, multi-schedule processing, and
5//! the per-frame processing loop.
6//!
7//! # Graph Structure
8//!
9//! The signal graph is a directed acyclic graph (DAG) where each node is backed by a Bevy
10//! [`System`]. Edges represent data dependencies: when an upstream signal produces output, that
11//! output becomes the input to its downstream signals. Cycles are detected and rejected at
12//! edge-creation time.
13//!
14//! # Multi-Schedule Processing
15//!
16//! Signals can be assigned to run in different Bevy schedules (e.g., `Update`, `PostUpdate`)
17//! using [`SignalExt::schedule`](super::signal::SignalExt::schedule). This enables fine-grained
18//! control over when signals execute within each frame.
19//!
20//! - **Schedule assignment**: Each signal can be assigned to a specific schedule. Unassigned
21//!   signals inherit the schedule from upstream signals or default to the
22//!   [`JonmoPlugin`](crate::JonmoPlugin)'s default schedule, see
23//!   [`SignalExt::schedule`](super::signal::SignalExt::schedule) for the exact semantics.
24//! - **Cross-schedule data flow**: Signals in different schedules can still be connected. Outputs
25//!   from earlier schedules are available as inputs to signals in later schedules.
26//! - **Per-schedule graph partitioning**: The graph is partitioned by schedule, with each schedule
27//!   processing only its assigned signals in topological order.
28//!
29//! # Processing Semantics
30//!
31//! Each frame, within each configured schedule, signals are processed in topological order:
32//!
33//! 1. **Level assignment**: Each signal is assigned a level equal to 1 + the maximum level of its
34//!    upstreams (roots have level 0). This is recomputed incrementally when edges change.
35//! 2. **Level-order execution**: Signals are processed level by level, lowest first, ensuring every
36//!    signal's upstreams have already run before it executes. Within each level, signals are
37//!    processed in deterministic order (sorted by entity index) for reproducible behavior.
38//! 3. **Output forwarding**: When a signal produces [`Some`] value, that value is forwarded as
39//!    input to all downstream signals. Returning [`None`] terminates propagation for that branch.
40//! 4. **Dynamic registration**: Signals registered during processing (e.g., from UI element
41//!    spawning) are integrated and processed within the same frame, avoiding one-frame delays.
42//!
43//! A signal with multiple upstreams runs **once per upstream** that fires in a given frame. This
44//! allows a signal to act as a collection point, processing each upstream's output in turn.
45//! However, only the **final output** (from the last run) is forwarded to downstream signals,
46//! ensuring that downstreams see a single, consolidated result rather than receiving multiple
47//! inputs.
48//!
49//! # Lifecycle
50//!
51//! Signal systems are usage-tracked; each call to [`.register`](super::signal::SignalExt::register)
52//! increments a registration count on the signal and its upstreams. To release a signal,
53//! [`SignalHandle::cleanup`] must be called, either explicitly or implicitly by storing handles in
54//! a [`SignalHandles`] component (which calls cleanup when the entity is despawned). When a
55//! signal's registration count reaches zero and no downstream dependents remain, the signal's
56//! backing system is automatically despawned.
57//!
58//! # Polling
59//!
60//! In addition to the standard push-based flow, signals can be polled synchronously to retrieve
61//! their most recent output. This is useful when a system needs to read signal state on-demand
62//! rather than receiving it as pushed input, see [`poll_signal`].
63use alloc::collections::VecDeque;
64use bevy_app::PostUpdate;
65use bevy_derive::Deref;
66use bevy_ecs::{
67    lifecycle::HookContext,
68    prelude::*,
69    query::{QueryData, QueryFilter},
70    schedule::{InternedScheduleLabel, ScheduleLabel},
71    system::{SystemId, SystemState},
72    world::DeferredWorld,
73};
74use bevy_platform::{
75    collections::{HashMap, HashSet},
76    prelude::*,
77    sync::{
78        Arc, LazyLock, Mutex, RwLock,
79        atomic::{AtomicUsize, Ordering},
80    },
81};
82use core::{any::Any, hash::Hash, marker::PhantomData};
83use dyn_clone::{DynClone, clone_trait_object};
84
85/// Newtype wrapper for [`Entity`]s that hold systems in the signal graph.
86#[derive(Clone, Copy, Deref, Debug, PartialEq, Eq, Hash)]
87pub struct SignalSystem(pub Entity);
88
89impl From<Entity> for SignalSystem {
90    fn from(entity: Entity) -> Self {
91        Self(entity)
92    }
93}
94
95impl<I: 'static, O> From<SystemId<In<I>, O>> for SignalSystem {
96    fn from(system_id: SystemId<In<I>, O>) -> Self {
97        system_id.entity().into()
98    }
99}
100
101#[derive(Component, Deref)]
102pub(crate) struct SignalRegistrationCount(i32);
103
104impl SignalRegistrationCount {
105    fn new() -> Self {
106        Self(1)
107    }
108
109    fn increment(&mut self) {
110        self.0 += 1
111    }
112
113    fn decrement(&mut self) {
114        self.0 -= 1
115    }
116}
117
118/// Component on signal system entities indicating which schedule they run in.
119///
120/// Used by the multi-schedule processing system to determine which signals
121/// to run during each schedule's processing pass.
122#[derive(Component, Clone, Copy)]
123pub(crate) struct ScheduleTag(pub(crate) InternedScheduleLabel);
124
125/// Component for downstream schedule inheritance during registration.
126///
127/// When a signal is connected to an upstream via [`pipe_signal`], if the upstream
128/// has a `ScheduleHint`, the downstream inherits the schedule (unless it already
129/// has a [`ScheduleTag`]).
130#[derive(Component, Clone, Copy)]
131pub(crate) struct ScheduleHint(pub(crate) InternedScheduleLabel);
132
133/// Apply schedule tagging to a signal: tag the signal itself, propagate to unscheduled
134/// upstreams, and set hint for downstream inheritance.
135///
136/// This is the common logic used by [`SignalExt::schedule`](super::signal::SignalExt::schedule),
137/// [`SignalVecExt::schedule`](super::signal_vec::SignalVecExt::schedule), and
138/// [`SignalMapExt::schedule`](super::signal_map::SignalMapExt::schedule).
139pub(crate) fn apply_schedule_to_signal(world: &mut World, signal: SignalSystem, schedule: InternedScheduleLabel) {
140    let is_registered = world
141        .resource::<SignalGraphState>()
142        .registered_schedules
143        .contains(&schedule);
144    if !is_registered {
145        panic!(
146            "schedule `{schedule:?}` has not been registered with `JonmoPlugin`; \
147             add `.with_schedule::<{schedule:?}>()` to your `JonmoPlugin` configuration"
148        );
149    }
150    // Directly tag caller (overwrites any inherited schedule)
151    world.entity_mut(*signal).insert(ScheduleTag(schedule));
152    // Propagate to unscheduled upstreams
153    tag_unscheduled_upstreams(world, signal, schedule);
154    // Set hint for downstream inheritance
155    world.entity_mut(*signal).insert(ScheduleHint(schedule));
156}
157
158/// Tags all upstream signals that don't already have a [`ScheduleTag`].
159///
160/// Traverses the upstream graph from `start` and applies the given `schedule` to any
161/// signal that hasn't been explicitly scheduled. This ensures that when a downstream
162/// signal is scheduled, its entire upstream chain runs in a compatible schedule.
163pub(crate) fn tag_unscheduled_upstreams(world: &mut World, start: SignalSystem, schedule: InternedScheduleLabel) {
164    let mut stack = vec![start];
165    let mut visited = HashSet::new();
166    while let Some(current) = stack.pop() {
167        if !visited.insert(current) {
168            continue;
169        }
170        // Tag if not already tagged
171        if world.get::<ScheduleTag>(*current).is_none() {
172            world.entity_mut(*current).insert(ScheduleTag(schedule));
173        }
174        // Continue to upstreams
175        if let Some(upstream) = world.get::<Upstream>(*current) {
176            for &up in upstream.iter() {
177                if !visited.contains(&up) {
178                    stack.push(up);
179                }
180            }
181        }
182    }
183}
184
185pub(crate) fn register_signal<I, O, IOO, F, M>(world: &mut World, system: F) -> SignalSystem
186where
187    I: 'static,
188    O: Clone + Send + Sync + 'static,
189    IOO: Into<Option<O>> + 'static,
190    F: IntoSystem<In<I>, IOO, M> + Send + Sync + 'static,
191{
192    lazy_signal_from_system(system).register(world)
193}
194
195// TODO: many to many relationships
196#[derive(Component, Deref, Clone)]
197pub(crate) struct Upstream(pub(crate) HashSet<SignalSystem>);
198
199impl<'a> IntoIterator for &'a Upstream {
200    type Item = <Self::IntoIter as Iterator>::Item;
201    type IntoIter = bevy_platform::collections::hash_set::Iter<'a, SignalSystem>;
202
203    #[inline(always)]
204    fn into_iter(self) -> Self::IntoIter {
205        self.0.iter()
206    }
207}
208
209// TODO: many to many relationships
210#[derive(Component, Deref, Clone)]
211pub(crate) struct Downstream(HashSet<SignalSystem>);
212
213impl<'a> IntoIterator for &'a Downstream {
214    type Item = <Self::IntoIter as Iterator>::Item;
215    type IntoIter = bevy_platform::collections::hash_set::Iter<'a, SignalSystem>;
216
217    #[inline(always)]
218    fn into_iter(self) -> Self::IntoIter {
219        self.0.iter()
220    }
221}
222
223// TODO: this isn't sensitive to switchers ?
224fn would_create_cycle(world: &World, source: SignalSystem, target: SignalSystem) -> bool {
225    if source == target {
226        return true;
227    }
228    let mut stack = vec![target];
229    let mut visited = HashSet::new();
230    while let Some(node) = stack.pop() {
231        if node == source {
232            return true;
233        }
234        if visited.insert(node)
235            && let Some(down) = world.get::<Downstream>(*node)
236        {
237            stack.extend(down.iter().copied());
238        }
239    }
240    false
241}
242
243pub(crate) fn pipe_signal(world: &mut World, source: SignalSystem, target: SignalSystem) {
244    if would_create_cycle(world, source, target) {
245        // TODO: panic instead ?
246        #[cfg(feature = "tracing")]
247        bevy_log::error!("cycle detected when attempting to pipe {:?} → {:?}", source, target);
248        return;
249    }
250    let mut upstream = world.entity_mut(*source);
251    if let Some(mut downstream) = upstream.get_mut::<Downstream>() {
252        downstream.0.insert(target);
253    } else {
254        upstream.insert(Downstream(HashSet::from([target])));
255    }
256    let mut downstream = world.entity_mut(*target);
257    if let Some(mut upstream) = downstream.get_mut::<Upstream>() {
258        upstream.0.insert(source);
259    } else {
260        downstream.insert(Upstream(HashSet::from([source])));
261    }
262
263    // Inherit schedule from upstream if target doesn't already have one
264    if world.get::<ScheduleTag>(*target).is_none()
265        && let Some(hint) = world.get::<ScheduleHint>(*source).copied()
266    {
267        world
268            .entity_mut(*target)
269            .insert(ScheduleTag(hint.0))
270            .insert(ScheduleHint(hint.0)); // Pass it on to further downstreams
271    }
272
273    world
274        .resource_mut::<SignalGraphState>()
275        .edge_change_seeds
276        .insert(target);
277}
278
279#[derive(Component, Clone)]
280struct SystemRunner {
281    #[allow(clippy::type_complexity)]
282    runner: Arc<Box<dyn Fn(&mut World, Box<dyn Any>) -> Option<Box<dyn AnyClone>> + Send + Sync>>,
283}
284
285trait Runnable: Send + Sync {
286    fn run(&self, w: &mut World, i: Box<dyn Any>) -> Option<Box<dyn AnyClone>>;
287}
288
289struct SystemHolder<I, O, S>
290where
291    I: 'static,
292    O: 'static,
293    S: Into<Option<O>>,
294{
295    system: SystemId<In<I>, S>,
296    _marker: PhantomData<fn() -> O>,
297}
298
299impl<I, O, S> Runnable for SystemHolder<I, O, S>
300where
301    I: 'static,
302    O: Clone + Send + Sync,
303    S: Into<Option<O>> + 'static,
304{
305    fn run(&self, world: &mut World, input: Box<dyn Any>) -> Option<Box<dyn AnyClone>> {
306        match input.downcast::<I>() {
307            Ok(bx) => world
308                .run_system_with(self.system, *bx)
309                .ok()
310                .and_then(Into::into)
311                .map(|o| Box::new(o) as Box<dyn AnyClone>),
312            Err(_) => {
313                cfg_if::cfg_if! {
314                    if #[cfg(feature = "tracing")] {
315                        let expected_type = core::any::type_name::<I>();
316                        bevy_log::error!(
317                            "failed to downcast input for system {:?}. expected input type: `{}`",
318                            self.system, expected_type
319                        );
320                    }
321                }
322                None
323            }
324        }
325    }
326}
327
328impl SystemRunner {
329    fn run(&self, world: &mut World, input: Box<dyn Any>) -> Option<Box<dyn AnyClone>> {
330        (self.runner)(world, input)
331    }
332}
333
334/// An extension trait for [`Any`] types that implement [`Clone`], [`Send`], and [`Sync`].
335pub trait AnyClone: Any + DynClone + Send + Sync {}
336
337clone_trait_object!(AnyClone);
338
339impl<T: Clone + Send + Sync + 'static> AnyClone for T {}
340
341/// Component that stores pending inputs for a signal.
342///
343/// Inputs are written by upstream signals and read by the signal during processing.
344/// The buffer is cleared at the end of each frame.
345#[derive(Component, Default)]
346pub(crate) struct SignalInputBuffer(pub(crate) Vec<Box<dyn AnyClone>>);
347
348impl SignalInputBuffer {
349    /// Take all inputs, leaving the buffer empty.
350    fn take(&mut self) -> Vec<Box<dyn AnyClone>> {
351        core::mem::take(&mut self.0)
352    }
353
354    /// Push an input value.
355    fn push(&mut self, value: Box<dyn AnyClone>) {
356        self.0.push(value);
357    }
358
359    /// Clear the buffer.
360    fn clear(&mut self) {
361        self.0.clear();
362    }
363}
364
365/// Behavior when the signal registration recursion limit is exceeded.
366///
367/// During signal processing, signals can spawn new signals (e.g., UI elements registering
368/// child signals). These new signals are processed in the same frame via recursive passes.
369/// If signals keep spawning more signals indefinitely, this limit prevents infinite loops.
370#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
371pub enum RecursionLimitBehavior {
372    /// Panic with an error message. This is the default because hitting the limit
373    /// almost always indicates a bug (infinite signal spawning loop).
374    #[default]
375    Panic,
376    /// Log a warning (if the `tracing` feature is enabled) and stop processing
377    /// new signals for this frame. The graph may be left in an incomplete state.
378    Warn,
379    /// Silently stop processing new signals. Use this only if you understand the
380    /// implications and have a specific reason to suppress the error.
381    Silent,
382}
383
384/// Tracks signal graph topology for level-based processing.
385#[derive(Resource)]
386pub(crate) struct SignalGraphState {
387    /// Cached level for each signal (max distance from any root).
388    levels: HashMap<SignalSystem, u32>,
389    /// Signals partitioned by schedule for efficient per-schedule iteration.
390    /// Updated incrementally alongside level updates.
391    by_schedule: HashMap<InternedScheduleLabel, Vec<Vec<SignalSystem>>>,
392    /// Cache of which schedule each signal belongs to, for O(1) lookup during removal.
393    signal_schedules: HashMap<SignalSystem, InternedScheduleLabel>,
394    /// Signals that seed level recomputation after edge changes.
395    edge_change_seeds: HashSet<SignalSystem>,
396    /// Signals queued for removal while the graph is being processed.
397    deferred_removals: HashSet<SignalSystem>,
398    /// Whether the signal graph is currently being processed.
399    is_processing: bool,
400    /// Default schedule for signals without explicit scheduling.
401    default_schedule: InternedScheduleLabel,
402    /// Maximum number of recursive signal registration passes per frame.
403    registration_recursion_limit: usize,
404    /// What to do when the recursion limit is exceeded.
405    on_recursion_limit_exceeded: RecursionLimitBehavior,
406    /// Schedules that have been registered with `JonmoPlugin`.
407    registered_schedules: HashSet<InternedScheduleLabel>,
408}
409
410/// Default recursion limit for signal registration passes.
411pub const DEFAULT_REGISTRATION_RECURSION_LIMIT: usize = 100;
412
413impl Default for SignalGraphState {
414    fn default() -> Self {
415        Self::with_options(
416            PostUpdate.intern(),
417            DEFAULT_REGISTRATION_RECURSION_LIMIT,
418            RecursionLimitBehavior::default(),
419        )
420    }
421}
422
423impl SignalGraphState {
424    /// Create a new SignalGraphState with the specified default schedule.
425    #[allow(unused)] // used in tests
426    pub(crate) fn new(default_schedule: InternedScheduleLabel) -> Self {
427        Self::with_options(
428            default_schedule,
429            DEFAULT_REGISTRATION_RECURSION_LIMIT,
430            RecursionLimitBehavior::default(),
431        )
432    }
433
434    /// Create a new SignalGraphState with full configuration options.
435    pub(crate) fn with_options(
436        default_schedule: InternedScheduleLabel,
437        registration_recursion_limit: usize,
438        on_recursion_limit_exceeded: RecursionLimitBehavior,
439    ) -> Self {
440        Self {
441            levels: HashMap::default(),
442            by_schedule: HashMap::default(),
443            signal_schedules: HashMap::default(),
444            edge_change_seeds: HashSet::default(),
445            deferred_removals: HashSet::default(),
446            is_processing: false,
447            default_schedule,
448            registration_recursion_limit,
449            on_recursion_limit_exceeded,
450            registered_schedules: HashSet::from([default_schedule]),
451        }
452    }
453
454    /// Register a schedule as valid for signal processing.
455    pub(crate) fn register_schedule(&mut self, schedule: InternedScheduleLabel) {
456        self.registered_schedules.insert(schedule);
457    }
458}
459
460fn get_upstreams(world: &World, signal: SignalSystem) -> Vec<SignalSystem> {
461    world
462        .get::<Upstream>(*signal)
463        .map(|u| u.0.iter().copied().collect())
464        .unwrap_or_default()
465}
466
467fn get_downstreams(world: &World, signal: SignalSystem) -> Vec<SignalSystem> {
468    world
469        .get::<Downstream>(*signal)
470        .map(|d| d.0.iter().copied().collect())
471        .unwrap_or_default()
472}
473
474fn insert_sorted_by_index(bucket: &mut Vec<SignalSystem>, signal: SignalSystem) {
475    if bucket.contains(&signal) {
476        return;
477    }
478    let key = signal.index();
479    let index = bucket.binary_search_by_key(&key, |s| s.index()).unwrap_or_else(|i| i);
480    bucket.insert(index, signal);
481}
482
483/// Result of computing signal levels via Kahn's algorithm.
484struct LevelComputeResult {
485    /// Computed levels for each signal in the working set.
486    levels: HashMap<SignalSystem, u32>,
487    /// Number of signals successfully processed.
488    processed: usize,
489    /// Total number of signals in the working set.
490    total: usize,
491}
492
493impl LevelComputeResult {
494    fn is_complete(&self) -> bool {
495        self.processed == self.total
496    }
497}
498
499/// Core Kahn's algorithm for computing topological levels on a subset of signals.
500///
501/// - `signals`: the working set of signals to compute levels for
502/// - `upstream_filter`: determines which upstreams count toward in-degree (typically the working
503///   set)
504/// - `external_level`: provides levels for upstreams outside the working set
505fn compute_signal_levels(
506    world: &World,
507    signals: &HashSet<SignalSystem>,
508    upstream_filter: impl Fn(SignalSystem) -> bool,
509    external_level: impl Fn(SignalSystem) -> Option<u32>,
510) -> LevelComputeResult {
511    if signals.is_empty() {
512        return LevelComputeResult {
513            levels: HashMap::new(),
514            processed: 0,
515            total: 0,
516        };
517    }
518
519    let mut in_degree: HashMap<SignalSystem, usize> = HashMap::new();
520    let mut upstreams_map: HashMap<SignalSystem, Vec<SignalSystem>> = HashMap::new();
521    let mut downstreams_map: HashMap<SignalSystem, Vec<SignalSystem>> = HashMap::new();
522
523    for &signal in signals {
524        let upstreams = get_upstreams(world, signal);
525        let local_in_degree = upstreams.iter().filter(|u| upstream_filter(**u)).count();
526        in_degree.insert(signal, local_in_degree);
527        // Build local downstreams_map by inverting upstream relationships.
528        // This ensures we only traverse to signals in our working set.
529        for &upstream in upstreams.iter().filter(|u| signals.contains(*u)) {
530            downstreams_map.entry(upstream).or_default().push(signal);
531        }
532        upstreams_map.insert(signal, upstreams);
533    }
534
535    let mut queue: VecDeque<SignalSystem> = in_degree.iter().filter_map(|(s, d)| (*d == 0).then_some(*s)).collect();
536
537    let mut levels: HashMap<SignalSystem, u32> = HashMap::new();
538    let mut processed = 0usize;
539
540    while let Some(signal) = queue.pop_front() {
541        processed += 1;
542        let upstreams = upstreams_map.get(&signal).cloned().unwrap_or_default();
543        let level = upstreams
544            .iter()
545            .filter_map(|u| levels.get(u).copied().or_else(|| external_level(*u)))
546            .max()
547            .map(|m| m + 1)
548            .unwrap_or(0);
549
550        levels.insert(signal, level);
551
552        // Use local downstreams_map instead of get_downstreams to ensure we only
553        // consider signals in our working set.
554        if let Some(downstreams) = downstreams_map.get(&signal) {
555            for &downstream in downstreams {
556                if let Some(count) = in_degree.get_mut(&downstream) {
557                    *count = count.saturating_sub(1);
558                    if *count == 0 {
559                        queue.push_back(downstream);
560                    }
561                }
562            }
563        }
564    }
565
566    LevelComputeResult {
567        levels,
568        processed,
569        total: signals.len(),
570    }
571}
572
573fn bucket_levels_sorted(levels: &HashMap<SignalSystem, u32>) -> Vec<Vec<SignalSystem>> {
574    let mut by_level: Vec<Vec<SignalSystem>> = Vec::new();
575    for (&signal, &level) in levels {
576        while by_level.len() <= level as usize {
577            by_level.push(Vec::new());
578        }
579        by_level[level as usize].push(signal);
580    }
581    for bucket in &mut by_level {
582        bucket.sort_by_key(|s| s.index());
583    }
584    by_level
585}
586
587// Computes a topological ordering of signals reachable downstream from `seeds`.
588// Uses cached levels when available, falls back to BFS level computation for uncached signals.
589fn downstream_levels_from_seeds(world: &World, seeds: &[SignalSystem]) -> Vec<Vec<SignalSystem>> {
590    let state = world.resource::<SignalGraphState>();
591
592    // Collect all reachable signals via BFS.
593    let mut reachable: HashSet<SignalSystem> = HashSet::new();
594    let mut queue: VecDeque<SignalSystem> = seeds.iter().copied().collect();
595    while let Some(signal) = queue.pop_front() {
596        if reachable.insert(signal) {
597            queue.extend(get_downstreams(world, signal));
598        }
599    }
600
601    // Check if all reachable signals have cached levels.
602    let all_cached = reachable.iter().all(|s| state.levels.contains_key(s));
603
604    if all_cached {
605        // Fast path: use cached levels.
606        let mut by_level: Vec<Vec<SignalSystem>> = Vec::new();
607        for signal in reachable {
608            let level = state.levels.get(&signal).copied().unwrap_or(0) as usize;
609            while by_level.len() <= level {
610                by_level.push(Vec::new());
611            }
612            by_level[level].push(signal);
613        }
614        for bucket in &mut by_level {
615            bucket.sort_by_key(|s| s.index());
616        }
617        by_level
618    } else {
619        // Slow path: compute levels via Kahn's algorithm on the reachable subgraph.
620        // This handles newly created signals that aren't in the cache yet.
621        let result = compute_signal_levels(
622            world,
623            &reachable,
624            |u| reachable.contains(&u),        // only count upstreams in the subgraph
625            |u| state.levels.get(&u).copied(), // use cached levels for external upstreams
626        );
627        bucket_levels_sorted(&result.levels)
628    }
629}
630
631// Rebuilds per-signal levels using a Kahn-style topological traversal.
632//
633// - Roots (in-degree 0) start at level 0.
634// - Each node's level is 1 + max(level of its upstreams).
635// - Nodes are bucketed by level for deterministic per-level iteration.
636// - If a cycle or inconsistent edges are detected (not all nodes processed), this panics because
637//   the graph invariants were violated.
638fn rebuild_levels(world: &mut World, state: &mut SignalGraphState) {
639    state.levels.clear();
640    state.by_schedule.clear();
641    state.signal_schedules.clear();
642
643    let mut all_signals_state = SystemState::<Query<Entity, With<SystemRunner>>>::new(world);
644    let all_signals: HashSet<SignalSystem> = all_signals_state.get(world).iter().map(SignalSystem).collect();
645
646    let result = compute_signal_levels(
647        world,
648        &all_signals,
649        |u| all_signals.contains(&u), // all upstreams count
650        |_| None,                     // no external levels
651    );
652
653    if !result.is_complete() {
654        panic!("signal graph contains a cycle or inconsistent edges during level rebuild");
655    }
656
657    state.levels = result.levels;
658
659    // Build by_schedule partitioning and signal_schedules cache
660    for (&signal, &level) in &state.levels {
661        let schedule = world
662            .get::<ScheduleTag>(*signal)
663            .map(|tag| tag.0)
664            .unwrap_or(state.default_schedule);
665
666        // Cache the schedule for O(1) lookup during removal
667        state.signal_schedules.insert(signal, schedule);
668
669        let schedule_levels = state.by_schedule.entry(schedule).or_default();
670        while schedule_levels.len() <= level as usize {
671            schedule_levels.push(Vec::new());
672        }
673        insert_sorted_by_index(&mut schedule_levels[level as usize], signal);
674    }
675}
676
677/// Remove a signal from its current position in by_schedule.
678fn remove_signal_from_buckets(state: &mut SignalGraphState, signal: SignalSystem, old_level: u32) {
679    // Remove from by_schedule using cached schedule for O(1) lookup.
680    if let Some(&schedule) = state.signal_schedules.get(&signal)
681        && let Some(schedule_levels) = state.by_schedule.get_mut(&schedule)
682        && let Some(bucket) = schedule_levels.get_mut(old_level as usize)
683        && let Some(pos) = bucket.iter().position(|s| *s == signal)
684    {
685        bucket.remove(pos);
686    }
687}
688
689/// Insert a signal at its new level in by_schedule.
690fn insert_signal_into_buckets(world: &World, state: &mut SignalGraphState, signal: SignalSystem, new_level: u32) {
691    // Get schedule and insert into by_schedule
692    let schedule = world
693        .get::<ScheduleTag>(*signal)
694        .map(|tag| tag.0)
695        .unwrap_or(state.default_schedule);
696
697    // Update the schedule cache
698    state.signal_schedules.insert(signal, schedule);
699
700    let schedule_levels = state.by_schedule.entry(schedule).or_default();
701    while schedule_levels.len() <= new_level as usize {
702        schedule_levels.push(Vec::new());
703    }
704    insert_sorted_by_index(&mut schedule_levels[new_level as usize], signal);
705}
706
707fn update_levels_incremental(world: &mut World, state: &mut SignalGraphState, seeds: &[SignalSystem]) -> bool {
708    // Collect all signals affected by edge changes (seeds + all their downstreams).
709    let mut affected: HashSet<SignalSystem> = HashSet::new();
710    let mut queue: VecDeque<SignalSystem> = seeds.iter().copied().collect();
711
712    while let Some(signal) = queue.pop_front() {
713        if affected.insert(signal) {
714            for downstream in get_downstreams(world, signal) {
715                queue.push_back(downstream);
716            }
717        }
718    }
719    if affected.is_empty() {
720        return true;
721    }
722
723    // Use Kahn's algorithm to compute new levels for affected signals
724    let result = compute_signal_levels(
725        world,
726        &affected,
727        |u| affected.contains(&u),         // only affected upstreams count for in-degree
728        |u| state.levels.get(&u).copied(), // unaffected upstreams use cached levels
729    );
730
731    if !result.is_complete() {
732        return false;
733    }
734
735    // Update state with short-circuit optimization:
736    // Only update buckets for signals whose level actually changed.
737    for &signal in &affected {
738        let old_level = state.levels.get(&signal).copied();
739        let new_level = result.levels.get(&signal).copied();
740
741        // Short-circuit: if level didn't change, skip bucket updates
742        if old_level == new_level {
743            continue;
744        }
745
746        // Remove from old position
747        if let Some(old) = old_level {
748            remove_signal_from_buckets(state, signal, old);
749        }
750
751        // Insert at new position
752        if let Some(new_level) = new_level {
753            state.levels.insert(signal, new_level);
754            insert_signal_into_buckets(world, state, signal, new_level);
755        } else {
756            state.levels.remove(&signal);
757            state.signal_schedules.remove(&signal);
758        }
759    }
760
761    true
762}
763
764/// Updates signal levels based on edge changes and returns the seeds that were processed.
765///
766/// Returns the signals that were in `edge_change_seeds` before processing. This allows
767/// callers to know which signals triggered the update without needing to drain and re-add.
768fn update_edge_change_levels(world: &mut World, state: &mut SignalGraphState) -> Vec<SignalSystem> {
769    if state.levels.is_empty() {
770        rebuild_levels(world, state);
771        return state.edge_change_seeds.drain().collect();
772    }
773
774    if state.edge_change_seeds.is_empty() {
775        return Vec::new();
776    }
777
778    // Drain seeds once and pass directly to update function
779    let seeds: Vec<SignalSystem> = state.edge_change_seeds.drain().collect();
780
781    if !update_levels_incremental(world, state, &seeds) {
782        panic!("signal graph contains a cycle or inconsistent edges during incremental update");
783    }
784
785    seeds
786}
787
788fn remove_signal_from_graph_state_internal(state: &mut SignalGraphState, signal: SignalSystem) {
789    if let Some(level) = state.levels.remove(&signal) {
790        remove_signal_from_buckets(state, signal, level);
791    }
792    state.signal_schedules.remove(&signal);
793    state.edge_change_seeds.remove(&signal);
794}
795
796fn remove_signal_from_graph_state(world: &mut World, signal: SignalSystem) {
797    if let Some(mut state) = world.get_resource_mut::<SignalGraphState>() {
798        if state.is_processing {
799            state.deferred_removals.insert(signal);
800        } else {
801            remove_signal_from_graph_state_internal(&mut state, signal);
802        }
803    }
804}
805
806fn apply_deferred_removals(state: &mut SignalGraphState) {
807    if state.deferred_removals.is_empty() {
808        return;
809    }
810    let removals = state.deferred_removals.drain().collect::<Vec<_>>();
811    for signal in removals {
812        remove_signal_from_graph_state_internal(state, signal);
813    }
814}
815
816/// Runs a signal node: reads inputs from its [`SignalInputBuffer`], executes the system,
817/// and writes outputs to downstream signals' buffers.
818///
819/// Avoids holding references to [`SignalGraphState`] during execution, since signal
820/// systems may spawn new signals that call [`pipe_signal`].
821fn run_signal_node(world: &mut World, signal: SignalSystem) {
822    // Get runner and inputs before running (to avoid borrow conflicts)
823    let (runner, signal_inputs, upstreams) = {
824        let runner = match world.get::<SystemRunner>(*signal).cloned() {
825            Some(runner) => runner,
826            None => {
827                if world.get_entity(*signal).is_err() {
828                    // Re-entrant combinators can despawn signals during the same frame; skip these stale IDs.
829                    return;
830                }
831                let upstreams = get_upstreams(world, signal);
832                let downstreams = get_downstreams(world, signal);
833                panic!(
834                    "missing SystemRunner for signal {:?} during processing (entity exists). upstreams={:?}, downstreams={:?}",
835                    signal, upstreams, downstreams
836                );
837            }
838        };
839
840        // Take inputs from the signal's component
841        let signal_inputs = world
842            .get_mut::<SignalInputBuffer>(*signal)
843            .map(|mut buffer| buffer.take())
844            .unwrap_or_default();
845
846        let upstreams = get_upstreams(world, signal);
847
848        (runner, signal_inputs, upstreams)
849    };
850
851    // Run the signal system
852    let final_output = if upstreams.is_empty() {
853        // Root signal, run with unit input
854        runner.run(world, Box::new(()))
855    } else if !signal_inputs.is_empty() {
856        // Run once per upstream input received, forwarding only the final output.
857        // This means a signal with multiple upstreams acts as a "collection" point:
858        // it processes each upstream's output, but only its last output propagates downstream.
859        let mut output = None;
860        for input in signal_inputs {
861            if let Some(o) = runner.run(world, input) {
862                output = Some(o);
863            }
864        }
865        output
866    } else {
867        None
868    };
869
870    // Write outputs directly to downstream components
871    if let Some(output) = final_output {
872        let downstreams = get_downstreams(world, signal);
873        if let Some((last, rest)) = downstreams.split_last() {
874            // Clone for all but last
875            for downstream in rest {
876                if let Ok(mut entity) = world.get_entity_mut(**downstream)
877                    && let Some(mut buffer) = entity.get_mut::<SignalInputBuffer>()
878                {
879                    buffer.push(output.clone());
880                }
881            }
882            // Last downstream gets the original (no clone)
883            if let Ok(mut entity) = world.get_entity_mut(**last)
884                && let Some(mut buffer) = entity.get_mut::<SignalInputBuffer>()
885            {
886                buffer.push(output);
887            }
888        }
889    }
890}
891
892pub(crate) fn trigger_signal_subgraph(
893    world: &mut World,
894    signals: impl AsRef<[SignalSystem]>,
895    input: Box<dyn AnyClone>,
896) {
897    let signals = signals.as_ref();
898    if signals.is_empty() {
899        return;
900    }
901
902    // Pre-populate inputs for seed signals by writing to their components
903    if let Some((last, rest)) = signals.split_last() {
904        for signal in rest {
905            if let Ok(mut entity) = world.get_entity_mut(**signal)
906                && let Some(mut buffer) = entity.get_mut::<SignalInputBuffer>()
907            {
908                buffer.push(input.clone());
909            }
910        }
911        // Last signal gets the original (no clone)
912        if let Ok(mut entity) = world.get_entity_mut(**last)
913            && let Some(mut buffer) = entity.get_mut::<SignalInputBuffer>()
914        {
915            buffer.push(input);
916        }
917    }
918
919    // Process seeds and all their downstreams in topological order.
920    let by_level = downstream_levels_from_seeds(world, signals);
921    for level in by_level {
922        for signal in level {
923            run_signal_node(world, signal);
924        }
925    }
926}
927
928/// Creates a system that processes only signals tagged for the specified schedule.
929///
930/// Uses persistent inputs stored in [`SignalGraphState`] to enable cross-schedule data flow.
931/// Includes a fixpoint loop to process signals that are registered during processing.
932pub(crate) fn process_signal_graph_for_schedule(schedule: InternedScheduleLabel) -> impl FnMut(&mut World) {
933    move |world: &mut World| {
934        // Phase 1: Update graph if needed, take this schedule's signals (avoids cloning)
935        let levels_for_schedule: Vec<Vec<SignalSystem>> = {
936            world.resource_scope(|world, mut state: Mut<SignalGraphState>| {
937                // Recompute levels if edges changed (also updates partition incrementally)
938                let _ = update_edge_change_levels(world, &mut state);
939                state.is_processing = true;
940
941                // Take this schedule's signals to avoid cloning; we'll put them back after processing
942                state.by_schedule.remove(&schedule).unwrap_or_default()
943            })
944        };
945
946        // Track signals we've already processed this frame to avoid double-processing
947        let mut processed: HashSet<SignalSystem> = HashSet::default();
948
949        // Phase 2: Process signals level-by-level using persistent inputs
950        // Note: We don't use resource_scope here because signal processing may spawn
951        // new elements that register new signals, which calls pipe_signal, which needs
952        // to access SignalGraphState. Using resource_scope would temporarily remove
953        // the resource and cause a panic.
954        for level in &levels_for_schedule {
955            for &signal in level {
956                processed.insert(signal);
957                run_signal_node(world, signal);
958            }
959        }
960
961        // Put levels back
962        world
963            .resource_mut::<SignalGraphState>()
964            .by_schedule
965            .insert(schedule, levels_for_schedule);
966
967        // Phase 2b: Process signals registered during processing (registration recursion).
968        // This handles cases like child_signal spawning elements with their own signals.
969        let (recursion_limit, limit_behavior) = {
970            let state = world.resource::<SignalGraphState>();
971            (state.registration_recursion_limit, state.on_recursion_limit_exceeded)
972        };
973        let mut recursion_pass = 0usize;
974
975        loop {
976            recursion_pass += 1;
977
978            if recursion_pass > recursion_limit {
979                match limit_behavior {
980                    RecursionLimitBehavior::Panic => {
981                        panic!(
982                            "Signal registration recursion limit exceeded ({} passes) in schedule {:?}. \
983                             This usually indicates an infinite loop where signals keep spawning new signals. \
984                             Processed {} signals before limit was reached. \
985                             Use `JonmoPlugin::on_recursion_limit_exceeded` to change this behavior.",
986                            recursion_limit,
987                            schedule,
988                            processed.len()
989                        );
990                    }
991                    RecursionLimitBehavior::Warn => {
992                        #[cfg(feature = "tracing")]
993                        bevy_log::warn!(
994                            "Signal registration recursion limit exceeded ({} passes) in schedule {:?}. \
995                             This may indicate an infinite loop where signals keep spawning new signals. \
996                             Processed {} signals so far.",
997                            recursion_limit,
998                            schedule,
999                            processed.len()
1000                        );
1001                        break;
1002                    }
1003                    RecursionLimitBehavior::Silent => {
1004                        break;
1005                    }
1006                }
1007            }
1008
1009            // Update levels for new signals and get the seeds that were processed
1010            let new_seeds: Vec<SignalSystem> = world
1011                .resource_scope(|world, mut state: Mut<SignalGraphState>| update_edge_change_levels(world, &mut state));
1012
1013            if new_seeds.is_empty() {
1014                break;
1015            }
1016
1017            // Get signals in this schedule that we haven't processed yet
1018            let new_signals: Vec<SignalSystem> = new_seeds
1019                .into_iter()
1020                .filter(|s| {
1021                    !processed.contains(s)
1022                        && world
1023                            .get::<ScheduleTag>(**s)
1024                            .map(|tag| tag.0 == schedule)
1025                            .unwrap_or(false)
1026                })
1027                .collect();
1028
1029            if new_signals.is_empty() {
1030                break;
1031            }
1032
1033            // Process new signals and their downstreams in topological order
1034            let new_levels = downstream_levels_from_seeds(world, &new_signals);
1035            for level in new_levels {
1036                for signal in level {
1037                    if processed.insert(signal) {
1038                        // Only process if we haven't already
1039                        if world
1040                            .get::<ScheduleTag>(*signal)
1041                            .map(|tag| tag.0 == schedule)
1042                            .unwrap_or(false)
1043                        {
1044                            run_signal_node(world, signal);
1045                        }
1046                    }
1047                }
1048            }
1049        }
1050
1051        // Phase 3: Cleanup
1052        let mut state = world.resource_mut::<SignalGraphState>();
1053        state.is_processing = false;
1054        apply_deferred_removals(&mut state);
1055    }
1056}
1057
1058/// Clears persistent inputs at the end of each frame.
1059///
1060/// This should run after all signal processing systems in the frame.
1061/// Clears all signal input buffers at the end of each frame.
1062///
1063/// This should run after all signal processing systems in the frame.
1064pub(crate) fn clear_signal_inputs(mut buffers: Query<&mut SignalInputBuffer>) {
1065    for mut buffer in &mut buffers {
1066        buffer.clear();
1067    }
1068}
1069
1070/// Handle to a particular node of the signal graph, returned by
1071/// [`SignalExt::register`](super::signal::SignalExt),
1072/// [`SignalVecExt::register`](super::signal_vec::SignalVecExt::register), and
1073/// [`SignalMapExt::register`](super::signal_map::SignalMapExt::register). In order
1074/// for signals to be appropriately cleaned up, for every call to `.register` made
1075/// to some particular signal or its clones, [`SignalHandle::cleanup`] must be
1076/// called on a corresponding [`SignalHandle`] or a downstream [`SignalHandle`].
1077/// Adding [`SignalHandle`]s to the [`SignalHandles`] [`Component`] will take care
1078/// of this when the corresponding [`Entity`] is despawned, and using the
1079/// [`jonmo::Builder`](super::builder::Builder) will manage this internally.
1080#[derive(Clone, Deref, Debug)]
1081pub struct SignalHandle(pub SignalSystem);
1082
1083impl From<SignalSystem> for SignalHandle {
1084    fn from(signal: SignalSystem) -> Self {
1085        Self(signal)
1086    }
1087}
1088
1089impl SignalHandle {
1090    #[allow(missing_docs)]
1091    pub(crate) fn new(signal: SignalSystem) -> Self {
1092        Self(signal)
1093    }
1094
1095    /// Decrements the usage tracking of the corresponding signal and all its
1096    /// upstreams, potentially despawning the backing [`System`], see [`SignalHandle`].
1097    pub fn cleanup(self, world: &mut World) {
1098        cleanup_recursive(world, *self);
1099    }
1100}
1101
1102fn cleanup_signal_handles(mut world: DeferredWorld, HookContext { entity, .. }: HookContext) {
1103    let handles: Vec<_> = world
1104        .entity_mut(entity)
1105        .get_mut::<SignalHandles>()
1106        .unwrap()
1107        .0
1108        .drain(..)
1109        .collect();
1110    let mut commands = world.commands();
1111    for handle in handles {
1112        commands.queue(|world: &mut World| handle.cleanup(world));
1113    }
1114}
1115
1116/// Stores [`SignalHandle`]s tied to the lifetime of some [`Entity`],
1117/// [`.cleanup`](SignalHandle::cleanup)-ing them when the [`Entity`] is despawned.
1118#[derive(Component, Default, Deref, Clone)]
1119#[component(on_remove = cleanup_signal_handles)]
1120pub struct SignalHandles(Vec<SignalHandle>);
1121
1122impl<T> From<T> for SignalHandles
1123where
1124    Vec<SignalHandle>: From<T>,
1125{
1126    #[inline]
1127    fn from(values: T) -> Self {
1128        SignalHandles(values.into())
1129    }
1130}
1131
1132impl SignalHandles {
1133    #[allow(missing_docs)]
1134    pub fn add(&mut self, handle: SignalHandle) {
1135        self.0.push(handle);
1136    }
1137}
1138
1139fn spawn_signal<I, O, IOO, F, M>(world: &mut World, system: F) -> SignalSystem
1140where
1141    I: 'static,
1142    O: Clone + Send + Sync + 'static,
1143    IOO: Into<Option<O>> + 'static,
1144    F: IntoSystem<In<I>, IOO, M> + 'static,
1145{
1146    let signal_system = world.register_system(system);
1147    let runner: Arc<Box<dyn Runnable>> = Arc::new(Box::new(SystemHolder::<I, O, IOO> {
1148        system: signal_system,
1149        _marker: PhantomData,
1150    }));
1151    world.entity_mut(signal_system.entity()).insert((
1152        SignalRegistrationCount::new(),
1153        SignalInputBuffer::default(),
1154        SystemRunner {
1155            runner: Arc::new(Box::new(move |world, input| runner.run(world, input))),
1156        },
1157    ));
1158    if let Some(mut state) = world.get_resource_mut::<SignalGraphState>() {
1159        state.edge_change_seeds.insert(signal_system.entity().into());
1160    }
1161    signal_system.into()
1162}
1163
1164pub(crate) struct LazySignalState {
1165    references: AtomicUsize,
1166    pub(crate) system: RwLock<LazySystem>,
1167}
1168
1169pub(crate) enum LazySystem {
1170    #[allow(clippy::type_complexity)]
1171    System(Option<Box<dyn FnOnce(&mut World) -> SignalSystem + Send + Sync>>),
1172    Registered(SignalSystem),
1173}
1174
1175impl LazySystem {
1176    fn register(&mut self, world: &mut World) -> SignalSystem {
1177        match self {
1178            LazySystem::System(f) => {
1179                let signal = f.take().unwrap()(world);
1180                *self = LazySystem::Registered(signal);
1181                signal
1182            }
1183            LazySystem::Registered(signal) => {
1184                world
1185                    .entity_mut(**signal)
1186                    .get_mut::<SignalRegistrationCount>()
1187                    .unwrap()
1188                    .increment();
1189                *signal
1190            }
1191        }
1192    }
1193}
1194
1195pub(crate) struct LazySignal {
1196    pub(crate) inner: Arc<LazySignalState>,
1197}
1198
1199impl LazySignal {
1200    pub(crate) fn new<F: FnOnce(&mut World) -> SignalSystem + Send + Sync + 'static>(system: F) -> Self {
1201        LazySignal {
1202            inner: Arc::new(LazySignalState {
1203                references: AtomicUsize::new(1),
1204                system: RwLock::new(LazySystem::System(Some(Box::new(system)))),
1205            }),
1206        }
1207    }
1208
1209    pub(crate) fn register(self, world: &mut World) -> SignalSystem {
1210        let signal = self.inner.system.write().unwrap().register(world);
1211        let mut entity = world.entity_mut(*signal);
1212        if !entity.contains::<LazySignalHolder>() {
1213            entity.insert(LazySignalHolder(self));
1214        }
1215        signal
1216    }
1217}
1218
1219impl Clone for LazySignal {
1220    fn clone(&self) -> Self {
1221        self.inner.references.fetch_add(1, Ordering::SeqCst);
1222        LazySignal {
1223            inner: self.inner.clone(),
1224        }
1225    }
1226}
1227
1228impl Drop for LazySignal {
1229    fn drop(&mut self) {
1230        // <= 2 because we also wna queue if only the holder remains
1231        if self.inner.references.fetch_sub(1, Ordering::SeqCst) <= 2
1232            && let LazySystem::Registered(signal) = *self.inner.system.read().unwrap()
1233        {
1234            STALE_SIGNALS.lock().unwrap().push(signal);
1235        }
1236    }
1237}
1238
1239#[derive(Component)]
1240pub(crate) struct LazySignalHolder(LazySignal);
1241
1242pub(crate) static STALE_SIGNALS: LazyLock<Mutex<Vec<SignalSystem>>> = LazyLock::new(|| Mutex::new(Vec::new()));
1243
1244fn unlink_downstreams_and_mark(world: &mut World, signal: SignalSystem) {
1245    if let Some(downstreams) = world.get::<Downstream>(*signal).cloned() {
1246        for &downstream in downstreams.iter() {
1247            if let Ok(mut downstream_entity) = world.get_entity_mut(*downstream)
1248                && let Some(mut upstream) = downstream_entity.get_mut::<Upstream>()
1249            {
1250                upstream.0.remove(&signal);
1251                if upstream.0.is_empty() {
1252                    downstream_entity.remove::<Upstream>();
1253                }
1254            }
1255            world
1256                .resource_mut::<SignalGraphState>()
1257                .edge_change_seeds
1258                .insert(downstream);
1259        }
1260    }
1261}
1262
1263pub(crate) fn despawn_stale_signals(world: &mut World) {
1264    let signals = STALE_SIGNALS.lock().unwrap().drain(..).collect::<Vec<_>>();
1265    for signal in signals {
1266        let should_despawn = world
1267            .get::<SignalRegistrationCount>(*signal)
1268            .map(|registration_count| **registration_count == 0)
1269            .unwrap_or(false);
1270        if should_despawn {
1271            unlink_downstreams_and_mark(world, signal);
1272            remove_signal_from_graph_state(world, signal);
1273            world.entity_mut(*signal).despawn();
1274        }
1275    }
1276}
1277
1278pub(crate) fn lazy_signal_from_system<I, O, IOO, F, M>(system: F) -> LazySignal
1279where
1280    I: 'static,
1281    O: Clone + Send + Sync + 'static,
1282    IOO: Into<Option<O>> + 'static,
1283    F: IntoSystem<In<I>, IOO, M> + Send + Sync + 'static,
1284{
1285    LazySignal::new(move |world: &mut World| spawn_signal(world, system))
1286}
1287
1288#[allow(dead_code)]
1289pub(crate) struct UpstreamIter<'w, 's, D: QueryData, F: QueryFilter>
1290where
1291    D::ReadOnly: QueryData<Item<'w, 's> = &'w Upstream>,
1292{
1293    upstreams_query: &'w Query<'w, 's, D, F>,
1294    upstreams: Vec<SignalSystem>,
1295}
1296
1297#[allow(dead_code)]
1298impl<'w, 's, D: QueryData, F: QueryFilter> UpstreamIter<'w, 's, D, F>
1299where
1300    D::ReadOnly: QueryData<Item<'w, 's> = &'w Upstream>,
1301{
1302    /// Returns a new [`DescendantIter`].
1303    pub fn new(upstreams_query: &'w Query<'w, 's, D, F>, signal: SignalSystem) -> Self {
1304        UpstreamIter {
1305            upstreams_query,
1306            upstreams: upstreams_query.get(*signal).into_iter().flatten().copied().collect(),
1307        }
1308    }
1309}
1310
1311impl<'w, 's, D: QueryData, F: QueryFilter> Iterator for UpstreamIter<'w, 's, D, F>
1312where
1313    D::ReadOnly: QueryData<Item<'w, 's> = &'w Upstream>,
1314{
1315    type Item = SignalSystem;
1316
1317    fn next(&mut self) -> Option<Self::Item> {
1318        let signal = self.upstreams.pop()?;
1319        if let Ok(upstream) = self.upstreams_query.get(*signal) {
1320            self.upstreams.extend(upstream);
1321        }
1322        Some(signal)
1323    }
1324}
1325
1326#[allow(dead_code)]
1327pub(crate) struct DownstreamIter<'w, 's, D: QueryData, F: QueryFilter>
1328where
1329    D::ReadOnly: QueryData<Item<'w, 's> = &'w Downstream>,
1330{
1331    downstreams_query: &'w Query<'w, 's, D, F>,
1332    downstreams: Vec<SignalSystem>,
1333}
1334
1335impl<'w, 's, D: QueryData, F: QueryFilter> DownstreamIter<'w, 's, D, F>
1336where
1337    D::ReadOnly: QueryData<Item<'w, 's> = &'w Downstream>,
1338{
1339    #[allow(dead_code)]
1340    pub fn new(downstreams_query: &'w Query<'w, 's, D, F>, signal: SignalSystem) -> Self {
1341        DownstreamIter {
1342            downstreams_query,
1343            downstreams: downstreams_query.get(*signal).into_iter().flatten().copied().collect(),
1344        }
1345    }
1346}
1347
1348impl<'w, 's, D: QueryData, F: QueryFilter> Iterator for DownstreamIter<'w, 's, D, F>
1349where
1350    D::ReadOnly: QueryData<Item<'w, 's> = &'w Downstream>,
1351{
1352    type Item = SignalSystem;
1353
1354    fn next(&mut self) -> Option<Self::Item> {
1355        let signal = self.downstreams.pop()?;
1356        if let Ok(downstream) = self.downstreams_query.get(*signal) {
1357            self.downstreams.extend(downstream);
1358        }
1359        Some(signal)
1360    }
1361}
1362
1363fn decrement_registration_and_needs_cleanup(world: &mut World, signal: SignalSystem) -> bool {
1364    if let Ok(mut entity) = world.get_entity_mut(*signal)
1365        && let Some(mut count) = entity.get_mut::<SignalRegistrationCount>()
1366    {
1367        count.decrement();
1368        return **count == 0;
1369    }
1370    false
1371}
1372
1373fn should_despawn_signal(world: &World, signal: SignalSystem) -> bool {
1374    world
1375        .get_entity(*signal)
1376        .ok()
1377        .map(|entity| {
1378            if let Some(lazy_holder) = entity.get::<LazySignalHolder>() {
1379                lazy_holder.0.inner.references.load(Ordering::SeqCst) == 1
1380            } else {
1381                // This is a dynamically created signal without a holder, it can be despawned once
1382                // its registrations are gone.
1383                true
1384            }
1385        })
1386        .unwrap_or(false)
1387}
1388
1389fn unlink_from_upstream(world: &mut World, upstream_system: SignalSystem, signal: SignalSystem) {
1390    if let Ok(mut upstream_entity) = world.get_entity_mut(*upstream_system)
1391        && let Some(mut downstream) = upstream_entity.get_mut::<Downstream>()
1392    {
1393        downstream.0.remove(&signal);
1394        if downstream.0.is_empty() {
1395            upstream_entity.remove::<Downstream>();
1396        }
1397    }
1398}
1399
1400fn cleanup_recursive(world: &mut World, signal: SignalSystem) {
1401    // Decrement registration and bail if the node is still in use.
1402    if !decrement_registration_and_needs_cleanup(world, signal) {
1403        return;
1404    }
1405
1406    // The count is zero. Perform the full cleanup. First, get the list of parents.
1407    let upstreams = world.get::<Upstream>(*signal).cloned();
1408
1409    // Unlink downstream edges and mark affected nodes for level recomputation.
1410    unlink_downstreams_and_mark(world, signal);
1411
1412    if should_despawn_signal(world, signal) {
1413        remove_signal_from_graph_state(world, signal);
1414        if let Ok(entity) = world.get_entity_mut(*signal) {
1415            entity.despawn();
1416        }
1417    }
1418
1419    // Notify parents and recurse after processing this node.
1420    if let Some(upstreams) = upstreams {
1421        for &upstream_system in upstreams.iter() {
1422            unlink_from_upstream(world, upstream_system, signal);
1423            cleanup_recursive(world, upstream_system);
1424        }
1425    }
1426}
1427
1428/// Computes topological levels for signals upstream of a target that aren't in the cached state.
1429/// Uses Kahn's algorithm on the reachable subgraph.
1430fn compute_levels_for_uncached(
1431    world: &World,
1432    reachable: &HashSet<SignalSystem>,
1433    cached_levels: &HashMap<SignalSystem, u32>,
1434) -> HashMap<SignalSystem, u32> {
1435    let uncached: HashSet<SignalSystem> = reachable
1436        .iter()
1437        .filter(|s| !cached_levels.contains_key(*s))
1438        .copied()
1439        .collect();
1440
1441    // Note: upstream_filter uses `reachable` (not `uncached`) because we need to count
1442    // in-degree based on the full reachable subgraph, but only compute levels for uncached signals.
1443    // Cached upstreams don't contribute to in-degree since their levels are already known.
1444    let result = compute_signal_levels(
1445        world,
1446        &uncached,
1447        |u| reachable.contains(&u) && !cached_levels.contains_key(&u),
1448        |u| cached_levels.get(&u).copied(),
1449    );
1450
1451    result.levels
1452}
1453
1454fn poll_signal_one_shot(In(target): In<SignalSystem>, world: &mut World) -> Option<Box<dyn AnyClone>> {
1455    // Collect all signals reachable upstream from target
1456    let mut reachable: HashSet<SignalSystem> = HashSet::new();
1457    let mut queue: VecDeque<SignalSystem> = VecDeque::new();
1458    queue.push_back(target);
1459    reachable.insert(target);
1460
1461    while let Some(signal) = queue.pop_front() {
1462        for upstream in get_upstreams(world, signal) {
1463            if reachable.insert(upstream) {
1464                queue.push_back(upstream);
1465            }
1466        }
1467    }
1468
1469    // Get cached levels and compute levels for any uncached signals.
1470    // We avoid cloning the entire levels HashMap by collecting only the levels we need.
1471
1472    // First pass: check which signals need level computation
1473    let uncached: HashSet<SignalSystem> = {
1474        let state = world.resource::<SignalGraphState>();
1475        reachable
1476            .iter()
1477            .filter(|s| !state.levels.contains_key(*s))
1478            .copied()
1479            .collect()
1480    };
1481
1482    // Compute levels for uncached signals if any
1483    let uncached_levels = if uncached.is_empty() {
1484        HashMap::default()
1485    } else {
1486        compute_levels_for_uncached(world, &reachable, &world.resource::<SignalGraphState>().levels)
1487    };
1488
1489    // Bucket by level
1490    let by_level = {
1491        let state = world.resource::<SignalGraphState>();
1492        let mut by_level: Vec<Vec<SignalSystem>> = Vec::new();
1493        for signal in &reachable {
1494            let level = state
1495                .levels
1496                .get(signal)
1497                .or_else(|| uncached_levels.get(signal))
1498                .copied()
1499                .unwrap_or(0) as usize;
1500            while by_level.len() <= level {
1501                by_level.push(Vec::new());
1502            }
1503            by_level[level].push(*signal);
1504        }
1505
1506        // Sort each level for determinism
1507        for level in &mut by_level {
1508            level.sort_by_key(|s| s.index());
1509        }
1510
1511        by_level
1512    };
1513
1514    // Process level by level, running each signal once per upstream input received.
1515    // We only track the target's output directly instead of storing all outputs.
1516    let mut inputs: HashMap<SignalSystem, Vec<Box<dyn AnyClone>>> = HashMap::new();
1517    let mut target_output: Option<Box<dyn AnyClone>> = None;
1518
1519    for level in by_level {
1520        for signal in level {
1521            let runner = world
1522                .get::<SystemRunner>(*signal)
1523                .cloned()
1524                .unwrap_or_else(|| panic!("missing SystemRunner for signal {:?} during poll", signal));
1525
1526            let upstreams = get_upstreams(world, signal);
1527
1528            let output = if upstreams.is_empty() {
1529                // Source signal, run with unit input
1530                runner.run(world, Box::new(()))
1531            } else if let Some(input_list) = inputs.remove(&signal) {
1532                // Has inputs from upstreams, run once per input, keep final output
1533                let mut final_output = None;
1534                for input in input_list {
1535                    if let Some(out) = runner.run(world, input) {
1536                        final_output = Some(out);
1537                    }
1538                }
1539                final_output
1540            } else {
1541                // No input received, signal doesn't fire
1542                None
1543            };
1544
1545            // Only store output if this is the target we're polling
1546            if signal == target {
1547                target_output = output;
1548                // Target found, no need to propagate further since we're done
1549                continue;
1550            }
1551
1552            // Propagate output to downstreams
1553            if let Some(out) = output {
1554                let downstreams = get_downstreams(world, signal);
1555                if let Some((last, rest)) = downstreams.split_last() {
1556                    for downstream in rest {
1557                        inputs.entry(*downstream).or_default().push(out.clone());
1558                    }
1559                    inputs.entry(*last).or_default().push(out);
1560                }
1561            }
1562        }
1563    }
1564
1565    target_output
1566}
1567
1568/// Get a signal's current value by running all of it's dependencies. Use
1569/// `.and_then(downcast_any_clone::<T>)` to downcast the output to the expected type.
1570///
1571/// # Example
1572///
1573/// ```
1574/// use bevy::prelude::*;
1575/// use jonmo::{graph::*, prelude::*};
1576///
1577/// let mut app = App::new();
1578/// app.add_plugins((MinimalPlugins, JonmoPlugin::default()));
1579/// let signal = *signal::once(42usize).register(app.world_mut());
1580/// let value = poll_signal(app.world_mut(), signal).and_then(downcast_any_clone::<usize>);
1581/// assert_eq!(value, Some(42));
1582/// ```
1583pub fn poll_signal(world: &mut World, signal: SignalSystem) -> Option<Box<dyn AnyClone>> {
1584    world
1585        .run_system_cached_with(poll_signal_one_shot, signal)
1586        .ok()
1587        .flatten()
1588}
1589
1590/// Utility function for extracting values from [`AnyClone`]s, e.g. those returned by
1591/// [`poll_signal`].
1592///
1593/// # Example
1594///
1595/// ```
1596/// use bevy::prelude::*;
1597/// use jonmo::{prelude::*, graph::*};
1598///
1599/// let mut app = App::new();
1600/// app.add_plugins((MinimalPlugins, JonmoPlugin::default()));
1601/// let signal = *signal::once(1).register(app.world_mut());
1602/// poll_signal(app.world_mut(), signal).and_then(downcast_any_clone::<usize>); // outputs an `Option<usize>`
1603/// ```
1604pub fn downcast_any_clone<T: 'static>(any_clone: Box<dyn AnyClone>) -> Option<T> {
1605    (any_clone as Box<dyn Any>).downcast::<T>().map(|o| *o).ok()
1606}
1607
1608#[cfg(test)]
1609mod tests {
1610    use super::*;
1611
1612    use bevy_ecs::{
1613        prelude::{In, Mut, World},
1614        schedule::ScheduleLabel,
1615    };
1616
1617    #[derive(Resource, Default)]
1618    struct Order(Vec<&'static str>);
1619
1620    /// Helper to process all signals in the default schedule for tests.
1621    fn process_signals(world: &mut World) {
1622        let default_schedule = world.resource::<SignalGraphState>().default_schedule;
1623        let mut system = process_signal_graph_for_schedule(default_schedule);
1624        system(world);
1625    }
1626
1627    #[test]
1628    #[should_panic(expected = "signal graph contains a cycle or inconsistent edges during incremental update")]
1629    fn incremental_update_panics_on_cycle() {
1630        let mut world = World::new();
1631        world.insert_resource(SignalGraphState::default());
1632
1633        let signal_a = spawn_signal::<(), i32, Option<i32>, _, _>(&mut world, |In(_)| Some(1));
1634        let signal_b = spawn_signal::<(), i32, Option<i32>, _, _>(&mut world, |In(_)| Some(2));
1635
1636        world.resource_scope(|world, mut state: Mut<SignalGraphState>| {
1637            rebuild_levels(world, &mut state);
1638        });
1639
1640        world
1641            .entity_mut(*signal_a)
1642            .insert(Downstream(HashSet::from([signal_b])))
1643            .insert(Upstream(HashSet::from([signal_b])));
1644        world
1645            .entity_mut(*signal_b)
1646            .insert(Downstream(HashSet::from([signal_a])))
1647            .insert(Upstream(HashSet::from([signal_a])));
1648
1649        world
1650            .resource_mut::<SignalGraphState>()
1651            .edge_change_seeds
1652            .insert(signal_a);
1653
1654        world.resource_scope(|world, mut state: Mut<SignalGraphState>| {
1655            update_edge_change_levels(world, &mut state);
1656        });
1657    }
1658
1659    #[test]
1660    fn ordering_is_deterministic_with_multiple_roots() {
1661        let mut world = World::new();
1662        world.insert_resource(SignalGraphState::default());
1663        world.insert_resource(Order::default());
1664
1665        let signal_a = spawn_signal::<(), (), Option<()>, _, _>(&mut world, |In(_), mut order: ResMut<Order>| {
1666            order.0.push("a");
1667            Some(())
1668        });
1669        let signal_b = spawn_signal::<(), (), Option<()>, _, _>(&mut world, |In(_), mut order: ResMut<Order>| {
1670            order.0.push("b");
1671            Some(())
1672        });
1673
1674        process_signals(&mut world);
1675
1676        let order = world.resource::<Order>().0.clone();
1677        if signal_a.0.index() < signal_b.0.index() {
1678            assert_eq!(order, vec!["a", "b"]);
1679        } else {
1680            assert_eq!(order, vec!["b", "a"]);
1681        }
1682    }
1683
1684    #[test]
1685    fn piping_updates_levels_for_same_frame_execution() {
1686        let mut world = World::new();
1687        world.insert_resource(SignalGraphState::default());
1688        world.insert_resource(Order::default());
1689
1690        let signal_a = spawn_signal::<(), (), Option<()>, _, _>(&mut world, |In(_), mut order: ResMut<Order>| {
1691            order.0.push("a");
1692            Some(())
1693        });
1694        let signal_b = spawn_signal::<(), (), Option<()>, _, _>(&mut world, |In(_), mut order: ResMut<Order>| {
1695            order.0.push("b");
1696            Some(())
1697        });
1698
1699        process_signals(&mut world);
1700        world.resource_mut::<Order>().0.clear();
1701
1702        pipe_signal(&mut world, signal_a, signal_b);
1703        process_signals(&mut world);
1704
1705        let order = world.resource::<Order>().0.clone();
1706        assert_eq!(order, vec!["a", "b"]);
1707    }
1708
1709    #[test]
1710    fn incremental_matches_full_rebuild_after_edge_change() {
1711        let mut world = World::new();
1712        world.insert_resource(SignalGraphState::default());
1713
1714        let a = spawn_signal::<(), (), Option<()>, _, _>(&mut world, |In(_)| Some(()));
1715        let b = spawn_signal::<(), (), Option<()>, _, _>(&mut world, |In(_)| Some(()));
1716        let c = spawn_signal::<(), (), Option<()>, _, _>(&mut world, |In(_)| Some(()));
1717        let d = spawn_signal::<(), (), Option<()>, _, _>(&mut world, |In(_)| Some(()));
1718
1719        pipe_signal(&mut world, a, b);
1720        pipe_signal(&mut world, b, c);
1721        pipe_signal(&mut world, a, d);
1722
1723        world.resource_scope(|world, mut state: Mut<SignalGraphState>| {
1724            rebuild_levels(world, &mut state);
1725        });
1726
1727        pipe_signal(&mut world, c, d);
1728
1729        let incremental_levels = world.resource_scope(|world, mut state: Mut<SignalGraphState>| {
1730            update_edge_change_levels(world, &mut state);
1731            state.levels.clone()
1732        });
1733
1734        let mut expected = SignalGraphState::default();
1735        rebuild_levels(&mut world, &mut expected);
1736
1737        assert_eq!(incremental_levels, expected.levels);
1738    }
1739
1740    #[test]
1741    fn cleanup_updates_downstream_levels() {
1742        let mut world = World::new();
1743        world.insert_resource(SignalGraphState::default());
1744
1745        let a = spawn_signal::<(), (), Option<()>, _, _>(&mut world, |In(_)| Some(()));
1746        let b = spawn_signal::<(), (), Option<()>, _, _>(&mut world, |In(_)| Some(()));
1747        let c = spawn_signal::<(), (), Option<()>, _, _>(&mut world, |In(_)| Some(()));
1748
1749        pipe_signal(&mut world, a, b);
1750        pipe_signal(&mut world, b, c);
1751
1752        world.resource_scope(|world, mut state: Mut<SignalGraphState>| {
1753            rebuild_levels(world, &mut state);
1754        });
1755
1756        cleanup_recursive(&mut world, a);
1757
1758        let levels_after = world.resource_scope(|world, mut state: Mut<SignalGraphState>| {
1759            update_edge_change_levels(world, &mut state);
1760            state.levels.clone()
1761        });
1762
1763        assert_eq!(levels_after.get(&b), Some(&0));
1764        assert_eq!(levels_after.get(&c), Some(&1));
1765        assert!(!levels_after.contains_key(&a));
1766    }
1767
1768    #[test]
1769    fn schedule_tag_assigns_signal_to_schedule() {
1770        use bevy_app::Update;
1771
1772        let mut world = World::new();
1773        world.insert_resource(SignalGraphState::default());
1774
1775        let signal_a = spawn_signal::<(), i32, Option<i32>, _, _>(&mut world, |In(_)| Some(1));
1776
1777        // Tag the signal with Update schedule
1778        world.entity_mut(*signal_a).insert(ScheduleTag(Update.intern()));
1779
1780        world.resource_scope(|world, mut state: Mut<SignalGraphState>| {
1781            rebuild_levels(world, &mut state);
1782        });
1783
1784        let state = world.resource::<SignalGraphState>();
1785        let update_signals = state.by_schedule.get(&Update.intern());
1786        assert!(update_signals.is_some());
1787        assert!(update_signals.unwrap().iter().flatten().any(|s| *s == signal_a));
1788    }
1789
1790    #[test]
1791    fn schedule_hint_propagates_to_downstream_via_pipe() {
1792        use bevy_app::Update;
1793
1794        let mut world = World::new();
1795        world.insert_resource(SignalGraphState::default());
1796
1797        let signal_a = spawn_signal::<(), i32, Option<i32>, _, _>(&mut world, |In(_)| Some(1));
1798        let signal_b = spawn_signal::<(), i32, Option<i32>, _, _>(&mut world, |In(_)| Some(2));
1799
1800        // Tag signal_a with Update schedule and set a hint for downstream
1801        world
1802            .entity_mut(*signal_a)
1803            .insert(ScheduleTag(Update.intern()))
1804            .insert(ScheduleHint(Update.intern()));
1805
1806        // Pipe a -> b (b should inherit the schedule from hint)
1807        pipe_signal(&mut world, signal_a, signal_b);
1808
1809        // signal_b should now have the SignalScheduleTag from the hint
1810        let tag = world.get::<ScheduleTag>(*signal_b);
1811        assert!(tag.is_some());
1812        assert_eq!(tag.unwrap().0, Update.intern());
1813    }
1814
1815    #[test]
1816    fn schedule_hint_does_not_override_existing_tag() {
1817        use bevy_app::{Last, Update};
1818
1819        let mut world = World::new();
1820        world.insert_resource(SignalGraphState::default());
1821
1822        let signal_a = spawn_signal::<(), i32, Option<i32>, _, _>(&mut world, |In(_)| Some(1));
1823        let signal_b = spawn_signal::<(), i32, Option<i32>, _, _>(&mut world, |In(_)| Some(2));
1824
1825        // Tag signal_a with Update schedule and set a hint
1826        world
1827            .entity_mut(*signal_a)
1828            .insert(ScheduleTag(Update.intern()))
1829            .insert(ScheduleHint(Update.intern()));
1830
1831        // Tag signal_b with Last schedule (explicit tag)
1832        world.entity_mut(*signal_b).insert(ScheduleTag(Last.intern()));
1833
1834        // Pipe a -> b (b's explicit tag should NOT be overridden)
1835        pipe_signal(&mut world, signal_a, signal_b);
1836
1837        // signal_b should still have Last schedule
1838        let tag = world.get::<ScheduleTag>(*signal_b);
1839        assert!(tag.is_some());
1840        assert_eq!(tag.unwrap().0, Last.intern());
1841    }
1842
1843    #[test]
1844    fn schedule_propagates_to_all_upstreams_for_multi_upstream_signal() {
1845        use bevy_app::Update;
1846
1847        let mut world = World::new();
1848        let mut state = SignalGraphState::default();
1849        state.registered_schedules.insert(Update.intern());
1850        world.insert_resource(state);
1851
1852        // Create three independent root signals (simulating branches that will be combined)
1853        let signal_a = spawn_signal::<(), i32, Option<i32>, _, _>(&mut world, |In(_)| Some(1));
1854        let signal_b = spawn_signal::<(), i32, Option<i32>, _, _>(&mut world, |In(_)| Some(2));
1855        let signal_c = spawn_signal::<(), i32, Option<i32>, _, _>(&mut world, |In(_)| Some(3));
1856
1857        // Create a multi-upstream signal that depends on all three
1858        // (simulating what happens with signal::zip or signal::any!)
1859        let combined = spawn_signal::<(), i32, Option<i32>, _, _>(&mut world, |In(_)| Some(100));
1860
1861        // Manually connect all three as upstreams of `combined`
1862        pipe_signal(&mut world, signal_a, combined);
1863        pipe_signal(&mut world, signal_b, combined);
1864        pipe_signal(&mut world, signal_c, combined);
1865
1866        // Verify the combined signal has all three upstreams
1867        let upstreams = world.get::<Upstream>(*combined).unwrap();
1868        assert_eq!(upstreams.iter().count(), 3);
1869
1870        // Now apply schedule to the combined signal - this should propagate to ALL upstreams
1871        apply_schedule_to_signal(&mut world, combined, Update.intern());
1872
1873        // Verify the combined signal itself has the schedule
1874        let combined_tag = world.get::<ScheduleTag>(*combined);
1875        assert!(combined_tag.is_some());
1876        assert_eq!(combined_tag.unwrap().0, Update.intern());
1877
1878        // Verify ALL upstream signals got the schedule tag
1879        let tag_a = world.get::<ScheduleTag>(*signal_a);
1880        assert!(tag_a.is_some(), "signal_a should have schedule tag");
1881        assert_eq!(tag_a.unwrap().0, Update.intern());
1882
1883        let tag_b = world.get::<ScheduleTag>(*signal_b);
1884        assert!(tag_b.is_some(), "signal_b should have schedule tag");
1885        assert_eq!(tag_b.unwrap().0, Update.intern());
1886
1887        let tag_c = world.get::<ScheduleTag>(*signal_c);
1888        assert!(tag_c.is_some(), "signal_c should have schedule tag");
1889        assert_eq!(tag_c.unwrap().0, Update.intern());
1890    }
1891
1892    #[test]
1893    fn schedule_propagates_to_deep_multi_upstream_graph() {
1894        use bevy_app::Update;
1895
1896        let mut world = World::new();
1897        let mut state = SignalGraphState::default();
1898        state.registered_schedules.insert(Update.intern());
1899        world.insert_resource(state);
1900
1901        // Create a deeper graph:
1902        //       root_a    root_b    root_c
1903        //          \       /           |
1904        //         mid_ab              mid_c
1905        //              \             /
1906        //               \           /
1907        //                  final
1908        let root_a = spawn_signal::<(), i32, Option<i32>, _, _>(&mut world, |In(_)| Some(1));
1909        let root_b = spawn_signal::<(), i32, Option<i32>, _, _>(&mut world, |In(_)| Some(2));
1910        let root_c = spawn_signal::<(), i32, Option<i32>, _, _>(&mut world, |In(_)| Some(3));
1911
1912        let mid_ab = spawn_signal::<(), i32, Option<i32>, _, _>(&mut world, |In(_)| Some(10));
1913        let mid_c = spawn_signal::<(), i32, Option<i32>, _, _>(&mut world, |In(_)| Some(20));
1914
1915        let final_signal = spawn_signal::<(), i32, Option<i32>, _, _>(&mut world, |In(_)| Some(100));
1916
1917        // Connect the graph
1918        pipe_signal(&mut world, root_a, mid_ab);
1919        pipe_signal(&mut world, root_b, mid_ab);
1920        pipe_signal(&mut world, root_c, mid_c);
1921        pipe_signal(&mut world, mid_ab, final_signal);
1922        pipe_signal(&mut world, mid_c, final_signal);
1923
1924        // Apply schedule only to the final signal
1925        apply_schedule_to_signal(&mut world, final_signal, Update.intern());
1926
1927        // ALL signals in the graph should now have the schedule tag
1928        for (name, signal) in [
1929            ("root_a", root_a),
1930            ("root_b", root_b),
1931            ("root_c", root_c),
1932            ("mid_ab", mid_ab),
1933            ("mid_c", mid_c),
1934            ("final", final_signal),
1935        ] {
1936            let tag = world.get::<ScheduleTag>(*signal);
1937            assert!(tag.is_some(), "{name} should have schedule tag");
1938            assert_eq!(tag.unwrap().0, Update.intern(), "{name} should have Update schedule");
1939        }
1940    }
1941
1942    #[test]
1943    fn by_schedule_is_partitioned_correctly() {
1944        use bevy_app::Update;
1945
1946        let mut world = World::new();
1947        world.insert_resource(SignalGraphState::default());
1948
1949        let signal_update = spawn_signal::<(), i32, Option<i32>, _, _>(&mut world, |In(_)| Some(1));
1950        let signal_default = spawn_signal::<(), i32, Option<i32>, _, _>(&mut world, |In(_)| Some(2));
1951
1952        // Tag one signal with Update
1953        world.entity_mut(*signal_update).insert(ScheduleTag(Update.intern()));
1954
1955        world.resource_scope(|world, mut state: Mut<SignalGraphState>| {
1956            rebuild_levels(world, &mut state);
1957        });
1958
1959        let state = world.resource::<SignalGraphState>();
1960
1961        // Check Update schedule has signal_update
1962        let update_signals = state.by_schedule.get(&Update.intern());
1963        assert!(update_signals.is_some());
1964        let update_flat: Vec<_> = update_signals.unwrap().iter().flatten().collect();
1965        assert!(update_flat.contains(&&signal_update));
1966        assert!(!update_flat.contains(&&signal_default));
1967
1968        // Check default schedule (PostUpdate) has signal_default
1969        let default_signals = state.by_schedule.get(&state.default_schedule);
1970        assert!(default_signals.is_some());
1971        let default_flat: Vec<_> = default_signals.unwrap().iter().flatten().collect();
1972        assert!(default_flat.contains(&&signal_default));
1973        assert!(!default_flat.contains(&&signal_update));
1974    }
1975
1976    #[test]
1977    fn process_for_schedule_only_runs_scheduled_signals() {
1978        use bevy_app::Update;
1979
1980        let mut world = World::new();
1981        world.insert_resource(SignalGraphState::default());
1982        world.insert_resource(Order::default());
1983
1984        let signal_update = spawn_signal::<(), (), Option<()>, _, _>(&mut world, |In(_), mut order: ResMut<Order>| {
1985            order.0.push("update");
1986            Some(())
1987        });
1988        let _signal_default =
1989            spawn_signal::<(), (), Option<()>, _, _>(&mut world, |In(_), mut order: ResMut<Order>| {
1990                order.0.push("default");
1991                Some(())
1992            });
1993
1994        // Tag one signal with Update schedule
1995        world.entity_mut(*signal_update).insert(ScheduleTag(Update.intern()));
1996
1997        // Build levels first
1998        world.resource_scope(|world, mut state: Mut<SignalGraphState>| {
1999            rebuild_levels(world, &mut state);
2000        });
2001
2002        // Process only Update schedule
2003        let mut process_update = process_signal_graph_for_schedule(Update.intern());
2004        process_update(&mut world);
2005
2006        let order = world.resource::<Order>().0.clone();
2007        assert_eq!(order, vec!["update"]);
2008
2009        // Now process default schedule
2010        world.resource_mut::<Order>().0.clear();
2011        let default_schedule = world.resource::<SignalGraphState>().default_schedule;
2012        let mut process_default = process_signal_graph_for_schedule(default_schedule);
2013        process_default(&mut world);
2014
2015        let order = world.resource::<Order>().0.clone();
2016        assert_eq!(order, vec!["default"]);
2017    }
2018
2019    #[test]
2020    fn cross_schedule_data_flow_via_inputs() {
2021        use bevy_app::Update;
2022
2023        let mut world = World::new();
2024        world.insert_resource(SignalGraphState::default());
2025
2026        #[derive(Resource, Default)]
2027        struct CollectedValues(Vec<i32>);
2028        world.insert_resource(CollectedValues::default());
2029
2030        // signal_a runs in Update, outputs 42
2031        let signal_a = spawn_signal::<(), i32, Option<i32>, _, _>(&mut world, |In(_)| Some(42));
2032
2033        // signal_b runs in PostUpdate (default), collects input
2034        let signal_b = spawn_signal::<i32, (), Option<()>, _, _>(
2035            &mut world,
2036            |In(value): In<i32>, mut collected: ResMut<CollectedValues>| {
2037                collected.0.push(value);
2038                Some(())
2039            },
2040        );
2041
2042        // Tag signal_a with Update
2043        world.entity_mut(*signal_a).insert(ScheduleTag(Update.intern()));
2044
2045        // Pipe a -> b (cross-schedule dependency)
2046        pipe_signal(&mut world, signal_a, signal_b);
2047
2048        // Build levels
2049        world.resource_scope(|world, mut state: Mut<SignalGraphState>| {
2050            rebuild_levels(world, &mut state);
2051        });
2052
2053        // Process Update schedule first (signal_a runs, stores output in inputs)
2054        let mut process_update = process_signal_graph_for_schedule(Update.intern());
2055        process_update(&mut world);
2056
2057        // signal_b hasn't run yet
2058        assert!(world.resource::<CollectedValues>().0.is_empty());
2059
2060        // Process PostUpdate schedule (signal_b runs, gets input from signal_a)
2061        let default_schedule = world.resource::<SignalGraphState>().default_schedule;
2062        let mut process_default = process_signal_graph_for_schedule(default_schedule);
2063        process_default(&mut world);
2064
2065        // signal_b should have received 42 from signal_a
2066        assert_eq!(world.resource::<CollectedValues>().0, vec![42]);
2067    }
2068
2069    #[test]
2070    fn clear_signal_inputs_clears_inputs() {
2071        let mut world = World::new();
2072        world.insert_resource(SignalGraphState::default());
2073
2074        let signal_a = spawn_signal::<(), i32, Option<i32>, _, _>(&mut world, |In(_)| Some(1));
2075
2076        // Manually insert some inputs into the signal's buffer component
2077        world
2078            .get_mut::<SignalInputBuffer>(*signal_a)
2079            .unwrap()
2080            .push(Box::new(42i32) as Box<dyn AnyClone>);
2081
2082        assert!(!world.get::<SignalInputBuffer>(*signal_a).unwrap().0.is_empty());
2083
2084        // Clear inputs by clearing the component
2085        world.get_mut::<SignalInputBuffer>(*signal_a).unwrap().clear();
2086
2087        assert!(world.get::<SignalInputBuffer>(*signal_a).unwrap().0.is_empty());
2088    }
2089
2090    #[test]
2091    fn signals_registered_during_processing_are_processed_same_frame() {
2092        use bevy_app::Update;
2093
2094        // This test verifies the fixpoint loop behavior:
2095        // When a signal spawns new elements that register new signals during processing,
2096        // those new signals should be processed in the same frame.
2097
2098        #[derive(Resource, Default)]
2099        struct ProcessOrder(Vec<&'static str>);
2100
2101        #[derive(Resource)]
2102        struct ChildSignalHandle(Option<SignalSystem>);
2103
2104        let mut world = World::new();
2105        world.insert_resource(SignalGraphState::new(Update.intern()));
2106        world.insert_resource(ProcessOrder::default());
2107        world.insert_resource(ChildSignalHandle(None));
2108
2109        // Create a "parent" signal that, when processed, registers a new "child" signal
2110        let parent_signal = spawn_signal::<(), (), Option<()>, _, _>(&mut world, |In(_), world: &mut World| {
2111            world.resource_mut::<ProcessOrder>().0.push("parent");
2112
2113            // Check if child already exists (to avoid infinite loop)
2114            if world.resource::<ChildSignalHandle>().0.is_none() {
2115                // Register a new child signal during parent's processing
2116                let child_signal =
2117                    spawn_signal::<(), (), Option<()>, _, _>(world, |In(_), mut order: ResMut<ProcessOrder>| {
2118                        order.0.push("child");
2119                        Some(())
2120                    });
2121                // Tag child with same schedule as parent
2122                world.entity_mut(*child_signal).insert(ScheduleTag(Update.intern()));
2123                world.resource_mut::<ChildSignalHandle>().0 = Some(child_signal);
2124            }
2125            Some(())
2126        });
2127
2128        // Tag parent signal
2129        world.entity_mut(*parent_signal).insert(ScheduleTag(Update.intern()));
2130
2131        // Process signals - the fixpoint loop should process both parent and child
2132        let mut process_system = process_signal_graph_for_schedule(Update.intern());
2133        process_system(&mut world);
2134
2135        // Both parent and child should have been processed in the same frame
2136        let order = &world.resource::<ProcessOrder>().0;
2137        assert!(order.contains(&"parent"), "Parent signal should have been processed");
2138        assert!(
2139            order.contains(&"child"),
2140            "Child signal registered during processing should also be processed"
2141        );
2142    }
2143
2144    #[test]
2145    fn fixpoint_loop_handles_multiple_levels_of_spawning() {
2146        use bevy_app::Update;
2147
2148        // Test that the fixpoint loop can handle chains: A spawns B, B spawns C
2149
2150        #[derive(Resource, Default)]
2151        struct ProcessOrder(Vec<&'static str>);
2152
2153        #[derive(Resource, Default)]
2154        struct SpawnedSignals(Vec<SignalSystem>);
2155
2156        let mut world = World::new();
2157        world.insert_resource(SignalGraphState::new(Update.intern()));
2158        world.insert_resource(ProcessOrder::default());
2159        world.insert_resource(SpawnedSignals::default());
2160
2161        // Signal A: spawns signal B
2162        let signal_a = spawn_signal::<(), (), Option<()>, _, _>(&mut world, |In(_), world: &mut World| {
2163            world.resource_mut::<ProcessOrder>().0.push("A");
2164
2165            let spawned = &world.resource::<SpawnedSignals>().0;
2166            if spawned.is_empty() {
2167                // Spawn B
2168                let signal_b = spawn_signal::<(), (), Option<()>, _, _>(world, |In(_), world: &mut World| {
2169                    world.resource_mut::<ProcessOrder>().0.push("B");
2170
2171                    let spawned = &world.resource::<SpawnedSignals>().0;
2172                    if spawned.len() == 1 {
2173                        // B spawns C
2174                        let signal_c = spawn_signal::<(), (), Option<()>, _, _>(
2175                            world,
2176                            |In(_), mut order: ResMut<ProcessOrder>| {
2177                                order.0.push("C");
2178                                Some(())
2179                            },
2180                        );
2181                        world.entity_mut(*signal_c).insert(ScheduleTag(Update.intern()));
2182                        world.resource_mut::<SpawnedSignals>().0.push(signal_c);
2183                    }
2184                    Some(())
2185                });
2186                world.entity_mut(*signal_b).insert(ScheduleTag(Update.intern()));
2187                world.resource_mut::<SpawnedSignals>().0.push(signal_b);
2188            }
2189            Some(())
2190        });
2191
2192        world.entity_mut(*signal_a).insert(ScheduleTag(Update.intern()));
2193
2194        // Process signals
2195        let mut process_system = process_signal_graph_for_schedule(Update.intern());
2196        process_system(&mut world);
2197
2198        // All three should have been processed
2199        let order = &world.resource::<ProcessOrder>().0;
2200        assert!(order.contains(&"A"), "Signal A should have been processed");
2201        assert!(
2202            order.contains(&"B"),
2203            "Signal B (spawned by A) should have been processed"
2204        );
2205        assert!(
2206            order.contains(&"C"),
2207            "Signal C (spawned by B) should have been processed"
2208        );
2209    }
2210
2211    #[test]
2212    #[should_panic(expected = "has not been registered with `JonmoPlugin`")]
2213    fn apply_schedule_to_signal_panics_on_unregistered_schedule() {
2214        use bevy_app::Update;
2215
2216        let mut world = World::new();
2217        world.insert_resource(SignalGraphState::default());
2218
2219        let signal = spawn_signal::<(), i32, Option<i32>, _, _>(&mut world, |In(_)| Some(1));
2220
2221        // Update was not registered, this should panic
2222        apply_schedule_to_signal(&mut world, signal, Update.intern());
2223    }
2224}