1use alloc::collections::VecDeque;
64use bevy_app::PostUpdate;
65use bevy_derive::Deref;
66use bevy_ecs::{
67 lifecycle::HookContext,
68 prelude::*,
69 query::{QueryData, QueryFilter},
70 schedule::{InternedScheduleLabel, ScheduleLabel},
71 system::{SystemId, SystemState},
72 world::DeferredWorld,
73};
74use bevy_platform::{
75 collections::{HashMap, HashSet},
76 prelude::*,
77 sync::{
78 Arc, LazyLock, Mutex, RwLock,
79 atomic::{AtomicUsize, Ordering},
80 },
81};
82use core::{any::Any, hash::Hash, marker::PhantomData};
83use dyn_clone::{DynClone, clone_trait_object};
84
/// Identifies a signal by the [`Entity`] that backs its registered system.
///
/// Dereferences to the wrapped [`Entity`], so it can be passed wherever an
/// entity id is expected via `*signal`.
#[derive(Clone, Copy, Deref, Debug, PartialEq, Eq, Hash)]
pub struct SignalSystem(pub Entity);
88
89impl From<Entity> for SignalSystem {
90 fn from(entity: Entity) -> Self {
91 Self(entity)
92 }
93}
94
95impl<I: 'static, O> From<SystemId<In<I>, O>> for SignalSystem {
96 fn from(system_id: SystemId<In<I>, O>) -> Self {
97 system_id.entity().into()
98 }
99}
100
/// Component counting how many times a signal has been registered.
///
/// Starts at 1 when the signal is spawned, is incremented on every repeated
/// registration and decremented on cleanup; a count of zero marks the signal
/// as eligible for teardown.
#[derive(Component, Deref)]
pub(crate) struct SignalRegistrationCount(i32);
103
104impl SignalRegistrationCount {
105 fn new() -> Self {
106 Self(1)
107 }
108
109 fn increment(&mut self) {
110 self.0 += 1
111 }
112
113 fn decrement(&mut self) {
114 self.0 -= 1
115 }
116}
117
/// Component naming the schedule a signal is bucketed under.
///
/// Read when level buckets are built; signals without this component fall
/// back to the graph's default schedule.
#[derive(Component, Clone, Copy)]
pub(crate) struct ScheduleTag(pub(crate) InternedScheduleLabel);
124
/// Component carrying a schedule that should be inherited by downstream
/// signals.
///
/// When a signal is piped into a target that has no [`ScheduleTag`], the
/// source's hint is copied onto the target as both its tag and its hint.
#[derive(Component, Clone, Copy)]
pub(crate) struct ScheduleHint(pub(crate) InternedScheduleLabel);
132
133pub(crate) fn apply_schedule_to_signal(world: &mut World, signal: SignalSystem, schedule: InternedScheduleLabel) {
140 let is_registered = world
141 .resource::<SignalGraphState>()
142 .registered_schedules
143 .contains(&schedule);
144 if !is_registered {
145 panic!(
146 "schedule `{schedule:?}` has not been registered with `JonmoPlugin`; \
147 add `.with_schedule::<{schedule:?}>()` to your `JonmoPlugin` configuration"
148 );
149 }
150 world.entity_mut(*signal).insert(ScheduleTag(schedule));
152 tag_unscheduled_upstreams(world, signal, schedule);
154 world.entity_mut(*signal).insert(ScheduleHint(schedule));
156}
157
158pub(crate) fn tag_unscheduled_upstreams(world: &mut World, start: SignalSystem, schedule: InternedScheduleLabel) {
164 let mut stack = vec![start];
165 let mut visited = HashSet::new();
166 while let Some(current) = stack.pop() {
167 if !visited.insert(current) {
168 continue;
169 }
170 if world.get::<ScheduleTag>(*current).is_none() {
172 world.entity_mut(*current).insert(ScheduleTag(schedule));
173 }
174 if let Some(upstream) = world.get::<Upstream>(*current) {
176 for &up in upstream.iter() {
177 if !visited.contains(&up) {
178 stack.push(up);
179 }
180 }
181 }
182 }
183}
184
185pub(crate) fn register_signal<I, O, IOO, F, M>(world: &mut World, system: F) -> SignalSystem
186where
187 I: 'static,
188 O: Clone + Send + Sync + 'static,
189 IOO: Into<Option<O>> + 'static,
190 F: IntoSystem<In<I>, IOO, M> + Send + Sync + 'static,
191{
192 lazy_signal_from_system(system).register(world)
193}
194
/// Component listing the signals that feed into this signal.
#[derive(Component, Deref, Clone)]
pub(crate) struct Upstream(pub(crate) HashSet<SignalSystem>);
198
199impl<'a> IntoIterator for &'a Upstream {
200 type Item = <Self::IntoIter as Iterator>::Item;
201 type IntoIter = bevy_platform::collections::hash_set::Iter<'a, SignalSystem>;
202
203 #[inline(always)]
204 fn into_iter(self) -> Self::IntoIter {
205 self.0.iter()
206 }
207}
208
/// Component listing the signals that this signal feeds into.
#[derive(Component, Deref, Clone)]
pub(crate) struct Downstream(HashSet<SignalSystem>);
212
213impl<'a> IntoIterator for &'a Downstream {
214 type Item = <Self::IntoIter as Iterator>::Item;
215 type IntoIter = bevy_platform::collections::hash_set::Iter<'a, SignalSystem>;
216
217 #[inline(always)]
218 fn into_iter(self) -> Self::IntoIter {
219 self.0.iter()
220 }
221}
222
223fn would_create_cycle(world: &World, source: SignalSystem, target: SignalSystem) -> bool {
225 if source == target {
226 return true;
227 }
228 let mut stack = vec![target];
229 let mut visited = HashSet::new();
230 while let Some(node) = stack.pop() {
231 if node == source {
232 return true;
233 }
234 if visited.insert(node)
235 && let Some(down) = world.get::<Downstream>(*node)
236 {
237 stack.extend(down.iter().copied());
238 }
239 }
240 false
241}
242
243pub(crate) fn pipe_signal(world: &mut World, source: SignalSystem, target: SignalSystem) {
244 if would_create_cycle(world, source, target) {
245 #[cfg(feature = "tracing")]
247 bevy_log::error!("cycle detected when attempting to pipe {:?} → {:?}", source, target);
248 return;
249 }
250 let mut upstream = world.entity_mut(*source);
251 if let Some(mut downstream) = upstream.get_mut::<Downstream>() {
252 downstream.0.insert(target);
253 } else {
254 upstream.insert(Downstream(HashSet::from([target])));
255 }
256 let mut downstream = world.entity_mut(*target);
257 if let Some(mut upstream) = downstream.get_mut::<Upstream>() {
258 upstream.0.insert(source);
259 } else {
260 downstream.insert(Upstream(HashSet::from([source])));
261 }
262
263 if world.get::<ScheduleTag>(*target).is_none()
265 && let Some(hint) = world.get::<ScheduleHint>(*source).copied()
266 {
267 world
268 .entity_mut(*target)
269 .insert(ScheduleTag(hint.0))
270 .insert(ScheduleHint(hint.0)); }
272
273 world
274 .resource_mut::<SignalGraphState>()
275 .edge_change_seeds
276 .insert(target);
277}
278
/// Component holding the type-erased callback that runs a signal's system.
///
/// The callback receives the world plus a boxed input and returns the boxed
/// output, or `None` when the system produced nothing.
#[derive(Component, Clone)]
struct SystemRunner {
    #[allow(clippy::type_complexity)]
    runner: Arc<Box<dyn Fn(&mut World, Box<dyn Any>) -> Option<Box<dyn AnyClone>> + Send + Sync>>,
}
284
/// Type-erased interface for running a signal's system with a boxed input.
trait Runnable: Send + Sync {
    /// Runs the system in `w` with input `i`, returning its boxed output if
    /// there is one.
    fn run(&self, w: &mut World, i: Box<dyn Any>) -> Option<Box<dyn AnyClone>>;
}
288
/// Remembers the concrete input (`I`), output (`O`) and raw return (`S`) types
/// of a registered system so it can be run through the type-erased
/// [`Runnable`] interface.
struct SystemHolder<I, O, S>
where
    I: 'static,
    O: 'static,
    S: Into<Option<O>>,
{
    /// Id of the registered one-shot system.
    system: SystemId<In<I>, S>,
    /// Ties the output type `O` to this holder without storing a value of it.
    _marker: PhantomData<fn() -> O>,
}
298
299impl<I, O, S> Runnable for SystemHolder<I, O, S>
300where
301 I: 'static,
302 O: Clone + Send + Sync,
303 S: Into<Option<O>> + 'static,
304{
305 fn run(&self, world: &mut World, input: Box<dyn Any>) -> Option<Box<dyn AnyClone>> {
306 match input.downcast::<I>() {
307 Ok(bx) => world
308 .run_system_with(self.system, *bx)
309 .ok()
310 .and_then(Into::into)
311 .map(|o| Box::new(o) as Box<dyn AnyClone>),
312 Err(_) => {
313 cfg_if::cfg_if! {
314 if #[cfg(feature = "tracing")] {
315 let expected_type = core::any::type_name::<I>();
316 bevy_log::error!(
317 "failed to downcast input for system {:?}. expected input type: `{}`",
318 self.system, expected_type
319 );
320 }
321 }
322 None
323 }
324 }
325 }
326}
327
328impl SystemRunner {
329 fn run(&self, world: &mut World, input: Box<dyn Any>) -> Option<Box<dyn AnyClone>> {
330 (self.runner)(world, input)
331 }
332}
333
/// Object-safe combination of [`Any`], cloning and thread-safety, used for the
/// boxed values that travel between signals.
pub trait AnyClone: Any + DynClone + Send + Sync {}

// Makes `Box<dyn AnyClone>` itself implement `Clone`.
clone_trait_object!(AnyClone);

// Every clonable, thread-safe, `'static` type is automatically an `AnyClone`.
impl<T: Clone + Send + Sync + 'static> AnyClone for T {}
340
/// Component queueing the boxed inputs that are waiting to be fed to a signal.
#[derive(Component, Default)]
pub(crate) struct SignalInputBuffer(pub(crate) Vec<Box<dyn AnyClone>>);
347
348impl SignalInputBuffer {
349 fn take(&mut self) -> Vec<Box<dyn AnyClone>> {
351 core::mem::take(&mut self.0)
352 }
353
354 fn push(&mut self, value: Box<dyn AnyClone>) {
356 self.0.push(value);
357 }
358
359 fn clear(&mut self) {
361 self.0.clear();
362 }
363}
364
/// What to do when processing a schedule needs more follow-up passes for newly
/// registered signals than the configured recursion limit allows.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum RecursionLimitBehavior {
    /// Panic with a message describing the exceeded limit (the default).
    #[default]
    Panic,
    /// Stop processing further passes; a warning is logged when the `tracing`
    /// feature is enabled.
    Warn,
    /// Stop processing further passes without any diagnostics.
    Silent,
}
383
/// Resource caching the signal graph's level assignment and per-schedule
/// processing order, plus bookkeeping for pending changes.
#[derive(Resource)]
pub(crate) struct SignalGraphState {
    /// Cached level of every known signal.
    levels: HashMap<SignalSystem, u32>,
    /// For each schedule, the signals grouped by level (index = level), each
    /// group kept sorted by entity index.
    by_schedule: HashMap<InternedScheduleLabel, Vec<Vec<SignalSystem>>>,
    /// The schedule each signal was bucketed under.
    signal_schedules: HashMap<SignalSystem, InternedScheduleLabel>,
    /// Signals whose levels must be recomputed (new signals or changed edges).
    edge_change_seeds: HashSet<SignalSystem>,
    /// Signals whose removal was requested while the graph was being processed.
    deferred_removals: HashSet<SignalSystem>,
    /// `true` while a schedule's signals are being processed.
    is_processing: bool,
    /// Schedule used for signals that carry no `ScheduleTag`.
    default_schedule: InternedScheduleLabel,
    /// Maximum number of follow-up passes for newly registered signals.
    registration_recursion_limit: usize,
    /// Reaction when `registration_recursion_limit` is exceeded.
    on_recursion_limit_exceeded: RecursionLimitBehavior,
    /// Schedules that signals are allowed to be assigned to.
    registered_schedules: HashSet<InternedScheduleLabel>,
}
409
/// Default number of follow-up passes allowed for signals that are registered
/// while a schedule is being processed.
pub const DEFAULT_REGISTRATION_RECURSION_LIMIT: usize = 100;
412
413impl Default for SignalGraphState {
414 fn default() -> Self {
415 Self::with_options(
416 PostUpdate.intern(),
417 DEFAULT_REGISTRATION_RECURSION_LIMIT,
418 RecursionLimitBehavior::default(),
419 )
420 }
421}
422
423impl SignalGraphState {
424 #[allow(unused)] pub(crate) fn new(default_schedule: InternedScheduleLabel) -> Self {
427 Self::with_options(
428 default_schedule,
429 DEFAULT_REGISTRATION_RECURSION_LIMIT,
430 RecursionLimitBehavior::default(),
431 )
432 }
433
434 pub(crate) fn with_options(
436 default_schedule: InternedScheduleLabel,
437 registration_recursion_limit: usize,
438 on_recursion_limit_exceeded: RecursionLimitBehavior,
439 ) -> Self {
440 Self {
441 levels: HashMap::default(),
442 by_schedule: HashMap::default(),
443 signal_schedules: HashMap::default(),
444 edge_change_seeds: HashSet::default(),
445 deferred_removals: HashSet::default(),
446 is_processing: false,
447 default_schedule,
448 registration_recursion_limit,
449 on_recursion_limit_exceeded,
450 registered_schedules: HashSet::from([default_schedule]),
451 }
452 }
453
454 pub(crate) fn register_schedule(&mut self, schedule: InternedScheduleLabel) {
456 self.registered_schedules.insert(schedule);
457 }
458}
459
460fn get_upstreams(world: &World, signal: SignalSystem) -> Vec<SignalSystem> {
461 world
462 .get::<Upstream>(*signal)
463 .map(|u| u.0.iter().copied().collect())
464 .unwrap_or_default()
465}
466
467fn get_downstreams(world: &World, signal: SignalSystem) -> Vec<SignalSystem> {
468 world
469 .get::<Downstream>(*signal)
470 .map(|d| d.0.iter().copied().collect())
471 .unwrap_or_default()
472}
473
474fn insert_sorted_by_index(bucket: &mut Vec<SignalSystem>, signal: SignalSystem) {
475 if bucket.contains(&signal) {
476 return;
477 }
478 let key = signal.index();
479 let index = bucket.binary_search_by_key(&key, |s| s.index()).unwrap_or_else(|i| i);
480 bucket.insert(index, signal);
481}
482
/// Outcome of a level computation over a set of signals.
struct LevelComputeResult {
    /// Level assigned to each signal that could be processed.
    levels: HashMap<SignalSystem, u32>,
    /// Number of signals that were assigned a level.
    processed: usize,
    /// Number of signals the computation was asked to handle.
    total: usize,
}
492
493impl LevelComputeResult {
494 fn is_complete(&self) -> bool {
495 self.processed == self.total
496 }
497}
498
499fn compute_signal_levels(
506 world: &World,
507 signals: &HashSet<SignalSystem>,
508 upstream_filter: impl Fn(SignalSystem) -> bool,
509 external_level: impl Fn(SignalSystem) -> Option<u32>,
510) -> LevelComputeResult {
511 if signals.is_empty() {
512 return LevelComputeResult {
513 levels: HashMap::new(),
514 processed: 0,
515 total: 0,
516 };
517 }
518
519 let mut in_degree: HashMap<SignalSystem, usize> = HashMap::new();
520 let mut upstreams_map: HashMap<SignalSystem, Vec<SignalSystem>> = HashMap::new();
521 let mut downstreams_map: HashMap<SignalSystem, Vec<SignalSystem>> = HashMap::new();
522
523 for &signal in signals {
524 let upstreams = get_upstreams(world, signal);
525 let local_in_degree = upstreams.iter().filter(|u| upstream_filter(**u)).count();
526 in_degree.insert(signal, local_in_degree);
527 for &upstream in upstreams.iter().filter(|u| signals.contains(*u)) {
530 downstreams_map.entry(upstream).or_default().push(signal);
531 }
532 upstreams_map.insert(signal, upstreams);
533 }
534
535 let mut queue: VecDeque<SignalSystem> = in_degree.iter().filter_map(|(s, d)| (*d == 0).then_some(*s)).collect();
536
537 let mut levels: HashMap<SignalSystem, u32> = HashMap::new();
538 let mut processed = 0usize;
539
540 while let Some(signal) = queue.pop_front() {
541 processed += 1;
542 let upstreams = upstreams_map.get(&signal).cloned().unwrap_or_default();
543 let level = upstreams
544 .iter()
545 .filter_map(|u| levels.get(u).copied().or_else(|| external_level(*u)))
546 .max()
547 .map(|m| m + 1)
548 .unwrap_or(0);
549
550 levels.insert(signal, level);
551
552 if let Some(downstreams) = downstreams_map.get(&signal) {
555 for &downstream in downstreams {
556 if let Some(count) = in_degree.get_mut(&downstream) {
557 *count = count.saturating_sub(1);
558 if *count == 0 {
559 queue.push_back(downstream);
560 }
561 }
562 }
563 }
564 }
565
566 LevelComputeResult {
567 levels,
568 processed,
569 total: signals.len(),
570 }
571}
572
573fn bucket_levels_sorted(levels: &HashMap<SignalSystem, u32>) -> Vec<Vec<SignalSystem>> {
574 let mut by_level: Vec<Vec<SignalSystem>> = Vec::new();
575 for (&signal, &level) in levels {
576 while by_level.len() <= level as usize {
577 by_level.push(Vec::new());
578 }
579 by_level[level as usize].push(signal);
580 }
581 for bucket in &mut by_level {
582 bucket.sort_by_key(|s| s.index());
583 }
584 by_level
585}
586
587fn downstream_levels_from_seeds(world: &World, seeds: &[SignalSystem]) -> Vec<Vec<SignalSystem>> {
590 let state = world.resource::<SignalGraphState>();
591
592 let mut reachable: HashSet<SignalSystem> = HashSet::new();
594 let mut queue: VecDeque<SignalSystem> = seeds.iter().copied().collect();
595 while let Some(signal) = queue.pop_front() {
596 if reachable.insert(signal) {
597 queue.extend(get_downstreams(world, signal));
598 }
599 }
600
601 let all_cached = reachable.iter().all(|s| state.levels.contains_key(s));
603
604 if all_cached {
605 let mut by_level: Vec<Vec<SignalSystem>> = Vec::new();
607 for signal in reachable {
608 let level = state.levels.get(&signal).copied().unwrap_or(0) as usize;
609 while by_level.len() <= level {
610 by_level.push(Vec::new());
611 }
612 by_level[level].push(signal);
613 }
614 for bucket in &mut by_level {
615 bucket.sort_by_key(|s| s.index());
616 }
617 by_level
618 } else {
619 let result = compute_signal_levels(
622 world,
623 &reachable,
624 |u| reachable.contains(&u), |u| state.levels.get(&u).copied(), );
627 bucket_levels_sorted(&result.levels)
628 }
629}
630
631fn rebuild_levels(world: &mut World, state: &mut SignalGraphState) {
639 state.levels.clear();
640 state.by_schedule.clear();
641 state.signal_schedules.clear();
642
643 let mut all_signals_state = SystemState::<Query<Entity, With<SystemRunner>>>::new(world);
644 let all_signals: HashSet<SignalSystem> = all_signals_state.get(world).iter().map(SignalSystem).collect();
645
646 let result = compute_signal_levels(
647 world,
648 &all_signals,
649 |u| all_signals.contains(&u), |_| None, );
652
653 if !result.is_complete() {
654 panic!("signal graph contains a cycle or inconsistent edges during level rebuild");
655 }
656
657 state.levels = result.levels;
658
659 for (&signal, &level) in &state.levels {
661 let schedule = world
662 .get::<ScheduleTag>(*signal)
663 .map(|tag| tag.0)
664 .unwrap_or(state.default_schedule);
665
666 state.signal_schedules.insert(signal, schedule);
668
669 let schedule_levels = state.by_schedule.entry(schedule).or_default();
670 while schedule_levels.len() <= level as usize {
671 schedule_levels.push(Vec::new());
672 }
673 insert_sorted_by_index(&mut schedule_levels[level as usize], signal);
674 }
675}
676
677fn remove_signal_from_buckets(state: &mut SignalGraphState, signal: SignalSystem, old_level: u32) {
679 if let Some(&schedule) = state.signal_schedules.get(&signal)
681 && let Some(schedule_levels) = state.by_schedule.get_mut(&schedule)
682 && let Some(bucket) = schedule_levels.get_mut(old_level as usize)
683 && let Some(pos) = bucket.iter().position(|s| *s == signal)
684 {
685 bucket.remove(pos);
686 }
687}
688
689fn insert_signal_into_buckets(world: &World, state: &mut SignalGraphState, signal: SignalSystem, new_level: u32) {
691 let schedule = world
693 .get::<ScheduleTag>(*signal)
694 .map(|tag| tag.0)
695 .unwrap_or(state.default_schedule);
696
697 state.signal_schedules.insert(signal, schedule);
699
700 let schedule_levels = state.by_schedule.entry(schedule).or_default();
701 while schedule_levels.len() <= new_level as usize {
702 schedule_levels.push(Vec::new());
703 }
704 insert_sorted_by_index(&mut schedule_levels[new_level as usize], signal);
705}
706
707fn update_levels_incremental(world: &mut World, state: &mut SignalGraphState, seeds: &[SignalSystem]) -> bool {
708 let mut affected: HashSet<SignalSystem> = HashSet::new();
710 let mut queue: VecDeque<SignalSystem> = seeds.iter().copied().collect();
711
712 while let Some(signal) = queue.pop_front() {
713 if affected.insert(signal) {
714 for downstream in get_downstreams(world, signal) {
715 queue.push_back(downstream);
716 }
717 }
718 }
719 if affected.is_empty() {
720 return true;
721 }
722
723 let result = compute_signal_levels(
725 world,
726 &affected,
727 |u| affected.contains(&u), |u| state.levels.get(&u).copied(), );
730
731 if !result.is_complete() {
732 return false;
733 }
734
735 for &signal in &affected {
738 let old_level = state.levels.get(&signal).copied();
739 let new_level = result.levels.get(&signal).copied();
740
741 if old_level == new_level {
743 continue;
744 }
745
746 if let Some(old) = old_level {
748 remove_signal_from_buckets(state, signal, old);
749 }
750
751 if let Some(new_level) = new_level {
753 state.levels.insert(signal, new_level);
754 insert_signal_into_buckets(world, state, signal, new_level);
755 } else {
756 state.levels.remove(&signal);
757 state.signal_schedules.remove(&signal);
758 }
759 }
760
761 true
762}
763
764fn update_edge_change_levels(world: &mut World, state: &mut SignalGraphState) -> Vec<SignalSystem> {
769 if state.levels.is_empty() {
770 rebuild_levels(world, state);
771 return state.edge_change_seeds.drain().collect();
772 }
773
774 if state.edge_change_seeds.is_empty() {
775 return Vec::new();
776 }
777
778 let seeds: Vec<SignalSystem> = state.edge_change_seeds.drain().collect();
780
781 if !update_levels_incremental(world, state, &seeds) {
782 panic!("signal graph contains a cycle or inconsistent edges during incremental update");
783 }
784
785 seeds
786}
787
788fn remove_signal_from_graph_state_internal(state: &mut SignalGraphState, signal: SignalSystem) {
789 if let Some(level) = state.levels.remove(&signal) {
790 remove_signal_from_buckets(state, signal, level);
791 }
792 state.signal_schedules.remove(&signal);
793 state.edge_change_seeds.remove(&signal);
794}
795
796fn remove_signal_from_graph_state(world: &mut World, signal: SignalSystem) {
797 if let Some(mut state) = world.get_resource_mut::<SignalGraphState>() {
798 if state.is_processing {
799 state.deferred_removals.insert(signal);
800 } else {
801 remove_signal_from_graph_state_internal(&mut state, signal);
802 }
803 }
804}
805
806fn apply_deferred_removals(state: &mut SignalGraphState) {
807 if state.deferred_removals.is_empty() {
808 return;
809 }
810 let removals = state.deferred_removals.drain().collect::<Vec<_>>();
811 for signal in removals {
812 remove_signal_from_graph_state_internal(state, signal);
813 }
814}
815
816fn run_signal_node(world: &mut World, signal: SignalSystem) {
822 let (runner, signal_inputs, upstreams) = {
824 let runner = match world.get::<SystemRunner>(*signal).cloned() {
825 Some(runner) => runner,
826 None => {
827 if world.get_entity(*signal).is_err() {
828 return;
830 }
831 let upstreams = get_upstreams(world, signal);
832 let downstreams = get_downstreams(world, signal);
833 panic!(
834 "missing SystemRunner for signal {:?} during processing (entity exists). upstreams={:?}, downstreams={:?}",
835 signal, upstreams, downstreams
836 );
837 }
838 };
839
840 let signal_inputs = world
842 .get_mut::<SignalInputBuffer>(*signal)
843 .map(|mut buffer| buffer.take())
844 .unwrap_or_default();
845
846 let upstreams = get_upstreams(world, signal);
847
848 (runner, signal_inputs, upstreams)
849 };
850
851 let final_output = if upstreams.is_empty() {
853 runner.run(world, Box::new(()))
855 } else if !signal_inputs.is_empty() {
856 let mut output = None;
860 for input in signal_inputs {
861 if let Some(o) = runner.run(world, input) {
862 output = Some(o);
863 }
864 }
865 output
866 } else {
867 None
868 };
869
870 if let Some(output) = final_output {
872 let downstreams = get_downstreams(world, signal);
873 if let Some((last, rest)) = downstreams.split_last() {
874 for downstream in rest {
876 if let Ok(mut entity) = world.get_entity_mut(**downstream)
877 && let Some(mut buffer) = entity.get_mut::<SignalInputBuffer>()
878 {
879 buffer.push(output.clone());
880 }
881 }
882 if let Ok(mut entity) = world.get_entity_mut(**last)
884 && let Some(mut buffer) = entity.get_mut::<SignalInputBuffer>()
885 {
886 buffer.push(output);
887 }
888 }
889 }
890}
891
892pub(crate) fn trigger_signal_subgraph(
893 world: &mut World,
894 signals: impl AsRef<[SignalSystem]>,
895 input: Box<dyn AnyClone>,
896) {
897 let signals = signals.as_ref();
898 if signals.is_empty() {
899 return;
900 }
901
902 if let Some((last, rest)) = signals.split_last() {
904 for signal in rest {
905 if let Ok(mut entity) = world.get_entity_mut(**signal)
906 && let Some(mut buffer) = entity.get_mut::<SignalInputBuffer>()
907 {
908 buffer.push(input.clone());
909 }
910 }
911 if let Ok(mut entity) = world.get_entity_mut(**last)
913 && let Some(mut buffer) = entity.get_mut::<SignalInputBuffer>()
914 {
915 buffer.push(input);
916 }
917 }
918
919 let by_level = downstream_levels_from_seeds(world, signals);
921 for level in by_level {
922 for signal in level {
923 run_signal_node(world, signal);
924 }
925 }
926}
927
/// Builds the closure that processes all signals belonging to `schedule`.
///
/// Each invocation of the returned closure:
/// 1. applies pending edge changes and takes this schedule's level buckets,
/// 2. runs every signal in level order,
/// 3. repeatedly picks up signals that were registered while running (and that
///    are tagged with this schedule) and runs them plus their downstream
///    signals, bounded by the configured recursion limit,
/// 4. clears the processing flag and applies removals deferred in the meantime.
///
/// # Panics
///
/// The closure panics when the recursion limit is exceeded and the configured
/// behavior is [`RecursionLimitBehavior::Panic`].
pub(crate) fn process_signal_graph_for_schedule(schedule: InternedScheduleLabel) -> impl FnMut(&mut World) {
    move |world: &mut World| {
        let levels_for_schedule: Vec<Vec<SignalSystem>> = {
            world.resource_scope(|world, mut state: Mut<SignalGraphState>| {
                // Make sure levels reflect all edges added since the last run.
                let _ = update_edge_change_levels(world, &mut state);
                // From here on, removals are deferred until processing ends.
                state.is_processing = true;

                // Take the buckets out of the resource; they are put back after the first pass.
                state.by_schedule.remove(&schedule).unwrap_or_default()
            })
        };

        // Signals that have been visited during this invocation.
        let mut processed: HashSet<SignalSystem> = HashSet::default();

        for level in &levels_for_schedule {
            for &signal in level {
                processed.insert(signal);
                run_signal_node(world, signal);
            }
        }

        // Return the buckets taken above.
        world
            .resource_mut::<SignalGraphState>()
            .by_schedule
            .insert(schedule, levels_for_schedule);

        let (recursion_limit, limit_behavior) = {
            let state = world.resource::<SignalGraphState>();
            (state.registration_recursion_limit, state.on_recursion_limit_exceeded)
        };
        let mut recursion_pass = 0usize;

        // Follow-up passes for signals registered while the graph was running.
        loop {
            recursion_pass += 1;

            if recursion_pass > recursion_limit {
                match limit_behavior {
                    RecursionLimitBehavior::Panic => {
                        panic!(
                            "Signal registration recursion limit exceeded ({} passes) in schedule {:?}. \
                             This usually indicates an infinite loop where signals keep spawning new signals. \
                             Processed {} signals before limit was reached. \
                             Use `JonmoPlugin::on_recursion_limit_exceeded` to change this behavior.",
                            recursion_limit,
                            schedule,
                            processed.len()
                        );
                    }
                    RecursionLimitBehavior::Warn => {
                        #[cfg(feature = "tracing")]
                        bevy_log::warn!(
                            "Signal registration recursion limit exceeded ({} passes) in schedule {:?}. \
                             This may indicate an infinite loop where signals keep spawning new signals. \
                             Processed {} signals so far.",
                            recursion_limit,
                            schedule,
                            processed.len()
                        );
                        break;
                    }
                    RecursionLimitBehavior::Silent => {
                        break;
                    }
                }
            }

            // Seeds consumed here are signals spawned or re-wired since the previous pass.
            let new_seeds: Vec<SignalSystem> = world
                .resource_scope(|world, mut state: Mut<SignalGraphState>| update_edge_change_levels(world, &mut state));

            if new_seeds.is_empty() {
                break;
            }

            // Keep only unvisited seeds that are explicitly tagged with this schedule.
            let new_signals: Vec<SignalSystem> = new_seeds
                .into_iter()
                .filter(|s| {
                    !processed.contains(s)
                        && world
                            .get::<ScheduleTag>(**s)
                            .map(|tag| tag.0 == schedule)
                            .unwrap_or(false)
                })
                .collect();

            if new_signals.is_empty() {
                break;
            }

            let new_levels = downstream_levels_from_seeds(world, &new_signals);
            for level in new_levels {
                for signal in level {
                    // Every reachable signal is marked as visited, but only those
                    // tagged with this schedule are actually run.
                    if processed.insert(signal) {
                        if world
                            .get::<ScheduleTag>(*signal)
                            .map(|tag| tag.0 == schedule)
                            .unwrap_or(false)
                        {
                            run_signal_node(world, signal);
                        }
                    }
                }
            }
        }

        let mut state = world.resource_mut::<SignalGraphState>();
        state.is_processing = false;
        apply_deferred_removals(&mut state);
    }
}
1057
1058pub(crate) fn clear_signal_inputs(mut buffers: Query<&mut SignalInputBuffer>) {
1065 for mut buffer in &mut buffers {
1066 buffer.clear();
1067 }
1068}
1069
/// Handle to a registered signal; call [`SignalHandle::cleanup`] to release the
/// registration it represents.
#[derive(Clone, Deref, Debug)]
pub struct SignalHandle(pub SignalSystem);
1082
1083impl From<SignalSystem> for SignalHandle {
1084 fn from(signal: SignalSystem) -> Self {
1085 Self(signal)
1086 }
1087}
1088
1089impl SignalHandle {
1090 #[allow(missing_docs)]
1091 pub(crate) fn new(signal: SignalSystem) -> Self {
1092 Self(signal)
1093 }
1094
1095 pub fn cleanup(self, world: &mut World) {
1098 cleanup_recursive(world, *self);
1099 }
1100}
1101
1102fn cleanup_signal_handles(mut world: DeferredWorld, HookContext { entity, .. }: HookContext) {
1103 let handles: Vec<_> = world
1104 .entity_mut(entity)
1105 .get_mut::<SignalHandles>()
1106 .unwrap()
1107 .0
1108 .drain(..)
1109 .collect();
1110 let mut commands = world.commands();
1111 for handle in handles {
1112 commands.queue(|world: &mut World| handle.cleanup(world));
1113 }
1114}
1115
/// Component owning a list of [`SignalHandle`]s; when the component is removed
/// from its entity, every stored handle is cleaned up.
#[derive(Component, Default, Deref, Clone)]
#[component(on_remove = cleanup_signal_handles)]
pub struct SignalHandles(Vec<SignalHandle>);
1121
1122impl<T> From<T> for SignalHandles
1123where
1124 Vec<SignalHandle>: From<T>,
1125{
1126 #[inline]
1127 fn from(values: T) -> Self {
1128 SignalHandles(values.into())
1129 }
1130}
1131
1132impl SignalHandles {
1133 #[allow(missing_docs)]
1134 pub fn add(&mut self, handle: SignalHandle) {
1135 self.0.push(handle);
1136 }
1137}
1138
1139fn spawn_signal<I, O, IOO, F, M>(world: &mut World, system: F) -> SignalSystem
1140where
1141 I: 'static,
1142 O: Clone + Send + Sync + 'static,
1143 IOO: Into<Option<O>> + 'static,
1144 F: IntoSystem<In<I>, IOO, M> + 'static,
1145{
1146 let signal_system = world.register_system(system);
1147 let runner: Arc<Box<dyn Runnable>> = Arc::new(Box::new(SystemHolder::<I, O, IOO> {
1148 system: signal_system,
1149 _marker: PhantomData,
1150 }));
1151 world.entity_mut(signal_system.entity()).insert((
1152 SignalRegistrationCount::new(),
1153 SignalInputBuffer::default(),
1154 SystemRunner {
1155 runner: Arc::new(Box::new(move |world, input| runner.run(world, input))),
1156 },
1157 ));
1158 if let Some(mut state) = world.get_resource_mut::<SignalGraphState>() {
1159 state.edge_change_seeds.insert(signal_system.entity().into());
1160 }
1161 signal_system.into()
1162}
1163
/// Shared state behind all clones of a [`LazySignal`].
pub(crate) struct LazySignalState {
    /// Number of live `LazySignal` values sharing this state.
    references: AtomicUsize,
    /// The not-yet-registered constructor, or the signal once registered.
    pub(crate) system: RwLock<LazySystem>,
}
1168
/// Registration state of a lazily created signal.
pub(crate) enum LazySystem {
    /// Not registered yet; holds the one-shot constructor that spawns the
    /// signal. The option is emptied when the constructor is consumed.
    #[allow(clippy::type_complexity)]
    System(Option<Box<dyn FnOnce(&mut World) -> SignalSystem + Send + Sync>>),
    /// Already registered as the contained signal.
    Registered(SignalSystem),
}
1174
1175impl LazySystem {
1176 fn register(&mut self, world: &mut World) -> SignalSystem {
1177 match self {
1178 LazySystem::System(f) => {
1179 let signal = f.take().unwrap()(world);
1180 *self = LazySystem::Registered(signal);
1181 signal
1182 }
1183 LazySystem::Registered(signal) => {
1184 world
1185 .entity_mut(**signal)
1186 .get_mut::<SignalRegistrationCount>()
1187 .unwrap()
1188 .increment();
1189 *signal
1190 }
1191 }
1192 }
1193}
1194
/// A signal that is only spawned into the world when it is first registered.
///
/// Clones share the same underlying state and therefore the same signal.
pub(crate) struct LazySignal {
    pub(crate) inner: Arc<LazySignalState>,
}
1198
1199impl LazySignal {
1200 pub(crate) fn new<F: FnOnce(&mut World) -> SignalSystem + Send + Sync + 'static>(system: F) -> Self {
1201 LazySignal {
1202 inner: Arc::new(LazySignalState {
1203 references: AtomicUsize::new(1),
1204 system: RwLock::new(LazySystem::System(Some(Box::new(system)))),
1205 }),
1206 }
1207 }
1208
1209 pub(crate) fn register(self, world: &mut World) -> SignalSystem {
1210 let signal = self.inner.system.write().unwrap().register(world);
1211 let mut entity = world.entity_mut(*signal);
1212 if !entity.contains::<LazySignalHolder>() {
1213 entity.insert(LazySignalHolder(self));
1214 }
1215 signal
1216 }
1217}
1218
1219impl Clone for LazySignal {
1220 fn clone(&self) -> Self {
1221 self.inner.references.fetch_add(1, Ordering::SeqCst);
1222 LazySignal {
1223 inner: self.inner.clone(),
1224 }
1225 }
1226}
1227
1228impl Drop for LazySignal {
1229 fn drop(&mut self) {
1230 if self.inner.references.fetch_sub(1, Ordering::SeqCst) <= 2
1232 && let LazySystem::Registered(signal) = *self.inner.system.read().unwrap()
1233 {
1234 STALE_SIGNALS.lock().unwrap().push(signal);
1235 }
1236 }
1237}
1238
/// Component that stores a signal's [`LazySignal`] on the signal's own entity.
#[derive(Component)]
pub(crate) struct LazySignalHolder(LazySignal);
1241
/// Global queue of signals reported as potentially stale when a [`LazySignal`]
/// is dropped; drained by `despawn_stale_signals`.
pub(crate) static STALE_SIGNALS: LazyLock<Mutex<Vec<SignalSystem>>> = LazyLock::new(|| Mutex::new(Vec::new()));
1243
1244fn unlink_downstreams_and_mark(world: &mut World, signal: SignalSystem) {
1245 if let Some(downstreams) = world.get::<Downstream>(*signal).cloned() {
1246 for &downstream in downstreams.iter() {
1247 if let Ok(mut downstream_entity) = world.get_entity_mut(*downstream)
1248 && let Some(mut upstream) = downstream_entity.get_mut::<Upstream>()
1249 {
1250 upstream.0.remove(&signal);
1251 if upstream.0.is_empty() {
1252 downstream_entity.remove::<Upstream>();
1253 }
1254 }
1255 world
1256 .resource_mut::<SignalGraphState>()
1257 .edge_change_seeds
1258 .insert(downstream);
1259 }
1260 }
1261}
1262
1263pub(crate) fn despawn_stale_signals(world: &mut World) {
1264 let signals = STALE_SIGNALS.lock().unwrap().drain(..).collect::<Vec<_>>();
1265 for signal in signals {
1266 let should_despawn = world
1267 .get::<SignalRegistrationCount>(*signal)
1268 .map(|registration_count| **registration_count == 0)
1269 .unwrap_or(false);
1270 if should_despawn {
1271 unlink_downstreams_and_mark(world, signal);
1272 remove_signal_from_graph_state(world, signal);
1273 world.entity_mut(*signal).despawn();
1274 }
1275 }
1276}
1277
1278pub(crate) fn lazy_signal_from_system<I, O, IOO, F, M>(system: F) -> LazySignal
1279where
1280 I: 'static,
1281 O: Clone + Send + Sync + 'static,
1282 IOO: Into<Option<O>> + 'static,
1283 F: IntoSystem<In<I>, IOO, M> + Send + Sync + 'static,
1284{
1285 LazySignal::new(move |world: &mut World| spawn_signal(world, system))
1286}
1287
/// Iterator over all signals upstream of a starting signal, resolved through a
/// query for [`Upstream`].
///
/// Visited signals are not tracked, so a signal reachable via several paths is
/// yielded once per path.
#[allow(dead_code)]
pub(crate) struct UpstreamIter<'w, 's, D: QueryData, F: QueryFilter>
where
    D::ReadOnly: QueryData<Item<'w, 's> = &'w Upstream>,
{
    /// Query used to look up the upstreams of each yielded signal.
    upstreams_query: &'w Query<'w, 's, D, F>,
    /// Signals still waiting to be yielded (used as a stack).
    upstreams: Vec<SignalSystem>,
}
1296
1297#[allow(dead_code)]
1298impl<'w, 's, D: QueryData, F: QueryFilter> UpstreamIter<'w, 's, D, F>
1299where
1300 D::ReadOnly: QueryData<Item<'w, 's> = &'w Upstream>,
1301{
1302 pub fn new(upstreams_query: &'w Query<'w, 's, D, F>, signal: SignalSystem) -> Self {
1304 UpstreamIter {
1305 upstreams_query,
1306 upstreams: upstreams_query.get(*signal).into_iter().flatten().copied().collect(),
1307 }
1308 }
1309}
1310
1311impl<'w, 's, D: QueryData, F: QueryFilter> Iterator for UpstreamIter<'w, 's, D, F>
1312where
1313 D::ReadOnly: QueryData<Item<'w, 's> = &'w Upstream>,
1314{
1315 type Item = SignalSystem;
1316
1317 fn next(&mut self) -> Option<Self::Item> {
1318 let signal = self.upstreams.pop()?;
1319 if let Ok(upstream) = self.upstreams_query.get(*signal) {
1320 self.upstreams.extend(upstream);
1321 }
1322 Some(signal)
1323 }
1324}
1325
/// Iterator over all signals downstream of a starting signal, resolved through
/// a query for [`Downstream`].
///
/// Visited signals are not tracked, so a signal reachable via several paths is
/// yielded once per path.
#[allow(dead_code)]
pub(crate) struct DownstreamIter<'w, 's, D: QueryData, F: QueryFilter>
where
    D::ReadOnly: QueryData<Item<'w, 's> = &'w Downstream>,
{
    /// Query used to look up the downstreams of each yielded signal.
    downstreams_query: &'w Query<'w, 's, D, F>,
    /// Signals still waiting to be yielded (used as a stack).
    downstreams: Vec<SignalSystem>,
}
1334
1335impl<'w, 's, D: QueryData, F: QueryFilter> DownstreamIter<'w, 's, D, F>
1336where
1337 D::ReadOnly: QueryData<Item<'w, 's> = &'w Downstream>,
1338{
1339 #[allow(dead_code)]
1340 pub fn new(downstreams_query: &'w Query<'w, 's, D, F>, signal: SignalSystem) -> Self {
1341 DownstreamIter {
1342 downstreams_query,
1343 downstreams: downstreams_query.get(*signal).into_iter().flatten().copied().collect(),
1344 }
1345 }
1346}
1347
1348impl<'w, 's, D: QueryData, F: QueryFilter> Iterator for DownstreamIter<'w, 's, D, F>
1349where
1350 D::ReadOnly: QueryData<Item<'w, 's> = &'w Downstream>,
1351{
1352 type Item = SignalSystem;
1353
1354 fn next(&mut self) -> Option<Self::Item> {
1355 let signal = self.downstreams.pop()?;
1356 if let Ok(downstream) = self.downstreams_query.get(*signal) {
1357 self.downstreams.extend(downstream);
1358 }
1359 Some(signal)
1360 }
1361}
1362
1363fn decrement_registration_and_needs_cleanup(world: &mut World, signal: SignalSystem) -> bool {
1364 if let Ok(mut entity) = world.get_entity_mut(*signal)
1365 && let Some(mut count) = entity.get_mut::<SignalRegistrationCount>()
1366 {
1367 count.decrement();
1368 return **count == 0;
1369 }
1370 false
1371}
1372
1373fn should_despawn_signal(world: &World, signal: SignalSystem) -> bool {
1374 world
1375 .get_entity(*signal)
1376 .ok()
1377 .map(|entity| {
1378 if let Some(lazy_holder) = entity.get::<LazySignalHolder>() {
1379 lazy_holder.0.inner.references.load(Ordering::SeqCst) == 1
1380 } else {
1381 true
1384 }
1385 })
1386 .unwrap_or(false)
1387}
1388
1389fn unlink_from_upstream(world: &mut World, upstream_system: SignalSystem, signal: SignalSystem) {
1390 if let Ok(mut upstream_entity) = world.get_entity_mut(*upstream_system)
1391 && let Some(mut downstream) = upstream_entity.get_mut::<Downstream>()
1392 {
1393 downstream.0.remove(&signal);
1394 if downstream.0.is_empty() {
1395 upstream_entity.remove::<Downstream>();
1396 }
1397 }
1398}
1399
1400fn cleanup_recursive(world: &mut World, signal: SignalSystem) {
1401 if !decrement_registration_and_needs_cleanup(world, signal) {
1403 return;
1404 }
1405
1406 let upstreams = world.get::<Upstream>(*signal).cloned();
1408
1409 unlink_downstreams_and_mark(world, signal);
1411
1412 if should_despawn_signal(world, signal) {
1413 remove_signal_from_graph_state(world, signal);
1414 if let Ok(entity) = world.get_entity_mut(*signal) {
1415 entity.despawn();
1416 }
1417 }
1418
1419 if let Some(upstreams) = upstreams {
1421 for &upstream_system in upstreams.iter() {
1422 unlink_from_upstream(world, upstream_system, signal);
1423 cleanup_recursive(world, upstream_system);
1424 }
1425 }
1426}
1427
1428fn compute_levels_for_uncached(
1431 world: &World,
1432 reachable: &HashSet<SignalSystem>,
1433 cached_levels: &HashMap<SignalSystem, u32>,
1434) -> HashMap<SignalSystem, u32> {
1435 let uncached: HashSet<SignalSystem> = reachable
1436 .iter()
1437 .filter(|s| !cached_levels.contains_key(*s))
1438 .copied()
1439 .collect();
1440
1441 let result = compute_signal_levels(
1445 world,
1446 &uncached,
1447 |u| reachable.contains(&u) && !cached_levels.contains_key(&u),
1448 |u| cached_levels.get(&u).copied(),
1449 );
1450
1451 result.levels
1452}
1453
1454fn poll_signal_one_shot(In(target): In<SignalSystem>, world: &mut World) -> Option<Box<dyn AnyClone>> {
1455 let mut reachable: HashSet<SignalSystem> = HashSet::new();
1457 let mut queue: VecDeque<SignalSystem> = VecDeque::new();
1458 queue.push_back(target);
1459 reachable.insert(target);
1460
1461 while let Some(signal) = queue.pop_front() {
1462 for upstream in get_upstreams(world, signal) {
1463 if reachable.insert(upstream) {
1464 queue.push_back(upstream);
1465 }
1466 }
1467 }
1468
1469 let uncached: HashSet<SignalSystem> = {
1474 let state = world.resource::<SignalGraphState>();
1475 reachable
1476 .iter()
1477 .filter(|s| !state.levels.contains_key(*s))
1478 .copied()
1479 .collect()
1480 };
1481
1482 let uncached_levels = if uncached.is_empty() {
1484 HashMap::default()
1485 } else {
1486 compute_levels_for_uncached(world, &reachable, &world.resource::<SignalGraphState>().levels)
1487 };
1488
1489 let by_level = {
1491 let state = world.resource::<SignalGraphState>();
1492 let mut by_level: Vec<Vec<SignalSystem>> = Vec::new();
1493 for signal in &reachable {
1494 let level = state
1495 .levels
1496 .get(signal)
1497 .or_else(|| uncached_levels.get(signal))
1498 .copied()
1499 .unwrap_or(0) as usize;
1500 while by_level.len() <= level {
1501 by_level.push(Vec::new());
1502 }
1503 by_level[level].push(*signal);
1504 }
1505
1506 for level in &mut by_level {
1508 level.sort_by_key(|s| s.index());
1509 }
1510
1511 by_level
1512 };
1513
1514 let mut inputs: HashMap<SignalSystem, Vec<Box<dyn AnyClone>>> = HashMap::new();
1517 let mut target_output: Option<Box<dyn AnyClone>> = None;
1518
1519 for level in by_level {
1520 for signal in level {
1521 let runner = world
1522 .get::<SystemRunner>(*signal)
1523 .cloned()
1524 .unwrap_or_else(|| panic!("missing SystemRunner for signal {:?} during poll", signal));
1525
1526 let upstreams = get_upstreams(world, signal);
1527
1528 let output = if upstreams.is_empty() {
1529 runner.run(world, Box::new(()))
1531 } else if let Some(input_list) = inputs.remove(&signal) {
1532 let mut final_output = None;
1534 for input in input_list {
1535 if let Some(out) = runner.run(world, input) {
1536 final_output = Some(out);
1537 }
1538 }
1539 final_output
1540 } else {
1541 None
1543 };
1544
1545 if signal == target {
1547 target_output = output;
1548 continue;
1550 }
1551
1552 if let Some(out) = output {
1554 let downstreams = get_downstreams(world, signal);
1555 if let Some((last, rest)) = downstreams.split_last() {
1556 for downstream in rest {
1557 inputs.entry(*downstream).or_default().push(out.clone());
1558 }
1559 inputs.entry(*last).or_default().push(out);
1560 }
1561 }
1562 }
1563 }
1564
1565 target_output
1566}
1567
1568pub fn poll_signal(world: &mut World, signal: SignalSystem) -> Option<Box<dyn AnyClone>> {
1584 world
1585 .run_system_cached_with(poll_signal_one_shot, signal)
1586 .ok()
1587 .flatten()
1588}
1589
1590pub fn downcast_any_clone<T: 'static>(any_clone: Box<dyn AnyClone>) -> Option<T> {
1605 (any_clone as Box<dyn Any>).downcast::<T>().map(|o| *o).ok()
1606}
1607
// Unit tests for level computation, schedule tagging and per-schedule processing of the signal
// graph. Each test builds a bare `World` holding a `SignalGraphState` and spawns signals directly.
#[cfg(test)]
mod tests {
    use super::*;

    use bevy_ecs::{
        prelude::{In, Mut, World},
        schedule::ScheduleLabel,
    };

    /// Records the order in which test signals ran.
    #[derive(Resource, Default)]
    struct Order(Vec<&'static str>);

    /// Runs one processing pass for the default schedule stored in `SignalGraphState`.
    fn process_signals(world: &mut World) {
        let default_schedule = world.resource::<SignalGraphState>().default_schedule;
        let mut system = process_signal_graph_for_schedule(default_schedule);
        system(world);
    }

    /// Hand-wires two signals into a two-node cycle and expects the incremental level update to
    /// panic.
    #[test]
    #[should_panic(expected = "signal graph contains a cycle or inconsistent edges during incremental update")]
    fn incremental_update_panics_on_cycle() {
        let mut world = World::new();
        world.insert_resource(SignalGraphState::default());

        let signal_a = spawn_signal::<(), i32, Option<i32>, _, _>(&mut world, |In(_)| Some(1));
        let signal_b = spawn_signal::<(), i32, Option<i32>, _, _>(&mut world, |In(_)| Some(2));

        // Establish levels for the two still-unconnected signals.
        world.resource_scope(|world, mut state: Mut<SignalGraphState>| {
            rebuild_levels(world, &mut state);
        });

        // Insert the edge components directly so that each signal is both upstream and downstream
        // of the other.
        world
            .entity_mut(*signal_a)
            .insert(Downstream(HashSet::from([signal_b])))
            .insert(Upstream(HashSet::from([signal_b])));
        world
            .entity_mut(*signal_b)
            .insert(Downstream(HashSet::from([signal_a])))
            .insert(Upstream(HashSet::from([signal_a])));

        // Seed the incremental update with one of the cycle members.
        world
            .resource_mut::<SignalGraphState>()
            .edge_change_seeds
            .insert(signal_a);

        world.resource_scope(|world, mut state: Mut<SignalGraphState>| {
            update_edge_change_levels(world, &mut state);
        });
    }

    /// Two unconnected signals must run in ascending entity-index order.
    #[test]
    fn ordering_is_deterministic_with_multiple_roots() {
        let mut world = World::new();
        world.insert_resource(SignalGraphState::default());
        world.insert_resource(Order::default());

        let signal_a = spawn_signal::<(), (), Option<()>, _, _>(&mut world, |In(_), mut order: ResMut<Order>| {
            order.0.push("a");
            Some(())
        });
        let signal_b = spawn_signal::<(), (), Option<()>, _, _>(&mut world, |In(_), mut order: ResMut<Order>| {
            order.0.push("b");
            Some(())
        });

        process_signals(&mut world);

        // The expected order depends on which entity received the lower index.
        let order = world.resource::<Order>().0.clone();
        if signal_a.0.index() < signal_b.0.index() {
            assert_eq!(order, vec!["a", "b"]);
        } else {
            assert_eq!(order, vec!["b", "a"]);
        }
    }

    /// After piping `a` into `b`, the very next processing pass must run `a` before `b`.
    #[test]
    fn piping_updates_levels_for_same_frame_execution() {
        let mut world = World::new();
        world.insert_resource(SignalGraphState::default());
        world.insert_resource(Order::default());

        let signal_a = spawn_signal::<(), (), Option<()>, _, _>(&mut world, |In(_), mut order: ResMut<Order>| {
            order.0.push("a");
            Some(())
        });
        let signal_b = spawn_signal::<(), (), Option<()>, _, _>(&mut world, |In(_), mut order: ResMut<Order>| {
            order.0.push("b");
            Some(())
        });

        // First pass with both signals unconnected; its recorded order is discarded.
        process_signals(&mut world);
        world.resource_mut::<Order>().0.clear();

        pipe_signal(&mut world, signal_a, signal_b);
        process_signals(&mut world);

        let order = world.resource::<Order>().0.clone();
        assert_eq!(order, vec!["a", "b"]);
    }

    /// Levels produced by the incremental update after adding an edge must equal the levels of a
    /// full rebuild.
    #[test]
    fn incremental_matches_full_rebuild_after_edge_change() {
        let mut world = World::new();
        world.insert_resource(SignalGraphState::default());

        let a = spawn_signal::<(), (), Option<()>, _, _>(&mut world, |In(_)| Some(()));
        let b = spawn_signal::<(), (), Option<()>, _, _>(&mut world, |In(_)| Some(()));
        let c = spawn_signal::<(), (), Option<()>, _, _>(&mut world, |In(_)| Some(()));
        let d = spawn_signal::<(), (), Option<()>, _, _>(&mut world, |In(_)| Some(()));

        // Initial graph: a -> b -> c and a -> d.
        pipe_signal(&mut world, a, b);
        pipe_signal(&mut world, b, c);
        pipe_signal(&mut world, a, d);

        world.resource_scope(|world, mut state: Mut<SignalGraphState>| {
            rebuild_levels(world, &mut state);
        });

        // New edge c -> d, added after the levels were built.
        pipe_signal(&mut world, c, d);

        let incremental_levels = world.resource_scope(|world, mut state: Mut<SignalGraphState>| {
            update_edge_change_levels(world, &mut state);
            state.levels.clone()
        });

        // Reference result: rebuild from scratch into a fresh state.
        let mut expected = SignalGraphState::default();
        rebuild_levels(&mut world, &mut expected);

        assert_eq!(incremental_levels, expected.levels);
    }

    /// Cleaning up the root of a chain a -> b -> c must drop `a` from the level map and lower the
    /// levels of `b` and `c` by one.
    #[test]
    fn cleanup_updates_downstream_levels() {
        let mut world = World::new();
        world.insert_resource(SignalGraphState::default());

        let a = spawn_signal::<(), (), Option<()>, _, _>(&mut world, |In(_)| Some(()));
        let b = spawn_signal::<(), (), Option<()>, _, _>(&mut world, |In(_)| Some(()));
        let c = spawn_signal::<(), (), Option<()>, _, _>(&mut world, |In(_)| Some(()));

        pipe_signal(&mut world, a, b);
        pipe_signal(&mut world, b, c);

        world.resource_scope(|world, mut state: Mut<SignalGraphState>| {
            rebuild_levels(world, &mut state);
        });

        cleanup_recursive(&mut world, a);

        let levels_after = world.resource_scope(|world, mut state: Mut<SignalGraphState>| {
            update_edge_change_levels(world, &mut state);
            state.levels.clone()
        });

        assert_eq!(levels_after.get(&b), Some(&0));
        assert_eq!(levels_after.get(&c), Some(&1));
        assert!(!levels_after.contains_key(&a));
    }

    /// A signal carrying `ScheduleTag(Update)` must show up under `Update` in `by_schedule` after
    /// a rebuild.
    #[test]
    fn schedule_tag_assigns_signal_to_schedule() {
        use bevy_app::Update;

        let mut world = World::new();
        world.insert_resource(SignalGraphState::default());

        let signal_a = spawn_signal::<(), i32, Option<i32>, _, _>(&mut world, |In(_)| Some(1));

        world.entity_mut(*signal_a).insert(ScheduleTag(Update.intern()));

        world.resource_scope(|world, mut state: Mut<SignalGraphState>| {
            rebuild_levels(world, &mut state);
        });

        let state = world.resource::<SignalGraphState>();
        let update_signals = state.by_schedule.get(&Update.intern());
        assert!(update_signals.is_some());
        assert!(update_signals.unwrap().iter().flatten().any(|s| *s == signal_a));
    }

    /// Piping from a signal that carries a `ScheduleHint` must tag an untagged downstream with
    /// the hinted schedule.
    #[test]
    fn schedule_hint_propagates_to_downstream_via_pipe() {
        use bevy_app::Update;

        let mut world = World::new();
        world.insert_resource(SignalGraphState::default());

        let signal_a = spawn_signal::<(), i32, Option<i32>, _, _>(&mut world, |In(_)| Some(1));
        let signal_b = spawn_signal::<(), i32, Option<i32>, _, _>(&mut world, |In(_)| Some(2));

        // The upstream gets both the tag and the hint for `Update`.
        world
            .entity_mut(*signal_a)
            .insert(ScheduleTag(Update.intern()))
            .insert(ScheduleHint(Update.intern()));

        pipe_signal(&mut world, signal_a, signal_b);

        let tag = world.get::<ScheduleTag>(*signal_b);
        assert!(tag.is_some());
        assert_eq!(tag.unwrap().0, Update.intern());
    }

    /// A downstream that already has a `ScheduleTag` must keep it when piped from an upstream with
    /// a different `ScheduleHint`.
    #[test]
    fn schedule_hint_does_not_override_existing_tag() {
        use bevy_app::{Last, Update};

        let mut world = World::new();
        world.insert_resource(SignalGraphState::default());

        let signal_a = spawn_signal::<(), i32, Option<i32>, _, _>(&mut world, |In(_)| Some(1));
        let signal_b = spawn_signal::<(), i32, Option<i32>, _, _>(&mut world, |In(_)| Some(2));

        world
            .entity_mut(*signal_a)
            .insert(ScheduleTag(Update.intern()))
            .insert(ScheduleHint(Update.intern()));

        // Pre-existing tag on the downstream, for a different schedule than the upstream's hint.
        world.entity_mut(*signal_b).insert(ScheduleTag(Last.intern()));

        pipe_signal(&mut world, signal_a, signal_b);

        let tag = world.get::<ScheduleTag>(*signal_b);
        assert!(tag.is_some());
        assert_eq!(tag.unwrap().0, Last.intern());
    }

    /// `apply_schedule_to_signal` on a signal with three direct upstreams must tag the signal and
    /// all three upstreams.
    #[test]
    fn schedule_propagates_to_all_upstreams_for_multi_upstream_signal() {
        use bevy_app::Update;

        let mut world = World::new();
        // `apply_schedule_to_signal` panics on schedules missing from `registered_schedules`.
        let mut state = SignalGraphState::default();
        state.registered_schedules.insert(Update.intern());
        world.insert_resource(state);

        let signal_a = spawn_signal::<(), i32, Option<i32>, _, _>(&mut world, |In(_)| Some(1));
        let signal_b = spawn_signal::<(), i32, Option<i32>, _, _>(&mut world, |In(_)| Some(2));
        let signal_c = spawn_signal::<(), i32, Option<i32>, _, _>(&mut world, |In(_)| Some(3));

        let combined = spawn_signal::<(), i32, Option<i32>, _, _>(&mut world, |In(_)| Some(100));

        pipe_signal(&mut world, signal_a, combined);
        pipe_signal(&mut world, signal_b, combined);
        pipe_signal(&mut world, signal_c, combined);

        // Sanity check: all three edges were recorded on `combined`.
        let upstreams = world.get::<Upstream>(*combined).unwrap();
        assert_eq!(upstreams.iter().count(), 3);

        apply_schedule_to_signal(&mut world, combined, Update.intern());

        let combined_tag = world.get::<ScheduleTag>(*combined);
        assert!(combined_tag.is_some());
        assert_eq!(combined_tag.unwrap().0, Update.intern());

        let tag_a = world.get::<ScheduleTag>(*signal_a);
        assert!(tag_a.is_some(), "signal_a should have schedule tag");
        assert_eq!(tag_a.unwrap().0, Update.intern());

        let tag_b = world.get::<ScheduleTag>(*signal_b);
        assert!(tag_b.is_some(), "signal_b should have schedule tag");
        assert_eq!(tag_b.unwrap().0, Update.intern());

        let tag_c = world.get::<ScheduleTag>(*signal_c);
        assert!(tag_c.is_some(), "signal_c should have schedule tag");
        assert_eq!(tag_c.unwrap().0, Update.intern());
    }

    /// `apply_schedule_to_signal` on the sink of a two-level graph must tag every signal in the
    /// graph, not just the direct upstreams.
    #[test]
    fn schedule_propagates_to_deep_multi_upstream_graph() {
        use bevy_app::Update;

        let mut world = World::new();
        let mut state = SignalGraphState::default();
        state.registered_schedules.insert(Update.intern());
        world.insert_resource(state);

        // Graph under test:
        //   root_a, root_b -> mid_ab -> final_signal
        //   root_c         -> mid_c  -> final_signal
        let root_a = spawn_signal::<(), i32, Option<i32>, _, _>(&mut world, |In(_)| Some(1));
        let root_b = spawn_signal::<(), i32, Option<i32>, _, _>(&mut world, |In(_)| Some(2));
        let root_c = spawn_signal::<(), i32, Option<i32>, _, _>(&mut world, |In(_)| Some(3));

        let mid_ab = spawn_signal::<(), i32, Option<i32>, _, _>(&mut world, |In(_)| Some(10));
        let mid_c = spawn_signal::<(), i32, Option<i32>, _, _>(&mut world, |In(_)| Some(20));

        let final_signal = spawn_signal::<(), i32, Option<i32>, _, _>(&mut world, |In(_)| Some(100));

        pipe_signal(&mut world, root_a, mid_ab);
        pipe_signal(&mut world, root_b, mid_ab);
        pipe_signal(&mut world, root_c, mid_c);
        pipe_signal(&mut world, mid_ab, final_signal);
        pipe_signal(&mut world, mid_c, final_signal);

        apply_schedule_to_signal(&mut world, final_signal, Update.intern());

        for (name, signal) in [
            ("root_a", root_a),
            ("root_b", root_b),
            ("root_c", root_c),
            ("mid_ab", mid_ab),
            ("mid_c", mid_c),
            ("final", final_signal),
        ] {
            let tag = world.get::<ScheduleTag>(*signal);
            assert!(tag.is_some(), "{name} should have schedule tag");
            assert_eq!(tag.unwrap().0, Update.intern(), "{name} should have Update schedule");
        }
    }

    /// After a rebuild, a tagged signal must appear only under its tagged schedule and an untagged
    /// signal only under the default schedule.
    #[test]
    fn by_schedule_is_partitioned_correctly() {
        use bevy_app::Update;

        let mut world = World::new();
        world.insert_resource(SignalGraphState::default());

        let signal_update = spawn_signal::<(), i32, Option<i32>, _, _>(&mut world, |In(_)| Some(1));
        let signal_default = spawn_signal::<(), i32, Option<i32>, _, _>(&mut world, |In(_)| Some(2));

        // Only `signal_update` is tagged; `signal_default` stays untagged.
        world.entity_mut(*signal_update).insert(ScheduleTag(Update.intern()));

        world.resource_scope(|world, mut state: Mut<SignalGraphState>| {
            rebuild_levels(world, &mut state);
        });

        let state = world.resource::<SignalGraphState>();

        let update_signals = state.by_schedule.get(&Update.intern());
        assert!(update_signals.is_some());
        let update_flat: Vec<_> = update_signals.unwrap().iter().flatten().collect();
        assert!(update_flat.contains(&&signal_update));
        assert!(!update_flat.contains(&&signal_default));

        let default_signals = state.by_schedule.get(&state.default_schedule);
        assert!(default_signals.is_some());
        let default_flat: Vec<_> = default_signals.unwrap().iter().flatten().collect();
        assert!(default_flat.contains(&&signal_default));
        assert!(!default_flat.contains(&&signal_update));
    }

    /// Processing a schedule must run only the signals assigned to that schedule.
    #[test]
    fn process_for_schedule_only_runs_scheduled_signals() {
        use bevy_app::Update;

        let mut world = World::new();
        world.insert_resource(SignalGraphState::default());
        world.insert_resource(Order::default());

        let signal_update = spawn_signal::<(), (), Option<()>, _, _>(&mut world, |In(_), mut order: ResMut<Order>| {
            order.0.push("update");
            Some(())
        });
        let _signal_default =
            spawn_signal::<(), (), Option<()>, _, _>(&mut world, |In(_), mut order: ResMut<Order>| {
                order.0.push("default");
                Some(())
            });

        world.entity_mut(*signal_update).insert(ScheduleTag(Update.intern()));

        world.resource_scope(|world, mut state: Mut<SignalGraphState>| {
            rebuild_levels(world, &mut state);
        });

        // Pass 1: `Update` only.
        let mut process_update = process_signal_graph_for_schedule(Update.intern());
        process_update(&mut world);

        let order = world.resource::<Order>().0.clone();
        assert_eq!(order, vec!["update"]);

        // Pass 2: the default schedule only.
        world.resource_mut::<Order>().0.clear();
        let default_schedule = world.resource::<SignalGraphState>().default_schedule;
        let mut process_default = process_signal_graph_for_schedule(default_schedule);
        process_default(&mut world);

        let order = world.resource::<Order>().0.clone();
        assert_eq!(order, vec!["default"]);
    }

    /// A value produced while processing `Update` must reach a downstream that runs in the default
    /// schedule once that schedule is processed.
    #[test]
    fn cross_schedule_data_flow_via_inputs() {
        use bevy_app::Update;

        let mut world = World::new();
        world.insert_resource(SignalGraphState::default());

        // Values received by the downstream signal.
        #[derive(Resource, Default)]
        struct CollectedValues(Vec<i32>);
        world.insert_resource(CollectedValues::default());

        let signal_a = spawn_signal::<(), i32, Option<i32>, _, _>(&mut world, |In(_)| Some(42));

        let signal_b = spawn_signal::<i32, (), Option<()>, _, _>(
            &mut world,
            |In(value): In<i32>, mut collected: ResMut<CollectedValues>| {
                collected.0.push(value);
                Some(())
            },
        );

        // Only the producer is tagged for `Update`; the consumer is left untagged.
        world.entity_mut(*signal_a).insert(ScheduleTag(Update.intern()));

        pipe_signal(&mut world, signal_a, signal_b);

        world.resource_scope(|world, mut state: Mut<SignalGraphState>| {
            rebuild_levels(world, &mut state);
        });

        let mut process_update = process_signal_graph_for_schedule(Update.intern());
        process_update(&mut world);

        // The consumer has not run yet, so nothing was collected.
        assert!(world.resource::<CollectedValues>().0.is_empty());

        let default_schedule = world.resource::<SignalGraphState>().default_schedule;
        let mut process_default = process_signal_graph_for_schedule(default_schedule);
        process_default(&mut world);

        assert_eq!(world.resource::<CollectedValues>().0, vec![42]);
    }

    /// `SignalInputBuffer::clear` must empty a buffer that had a value pushed into it.
    #[test]
    fn clear_signal_inputs_clears_inputs() {
        let mut world = World::new();
        world.insert_resource(SignalGraphState::default());

        let signal_a = spawn_signal::<(), i32, Option<i32>, _, _>(&mut world, |In(_)| Some(1));

        world
            .get_mut::<SignalInputBuffer>(*signal_a)
            .unwrap()
            .push(Box::new(42i32) as Box<dyn AnyClone>);

        assert!(!world.get::<SignalInputBuffer>(*signal_a).unwrap().0.is_empty());

        world.get_mut::<SignalInputBuffer>(*signal_a).unwrap().clear();

        assert!(world.get::<SignalInputBuffer>(*signal_a).unwrap().0.is_empty());
    }

    /// A signal spawned by another signal while the schedule is being processed must run within
    /// the same processing call.
    #[test]
    fn signals_registered_during_processing_are_processed_same_frame() {
        use bevy_app::Update;

        // Names of the signals that ran, in order.
        #[derive(Resource, Default)]
        struct ProcessOrder(Vec<&'static str>);

        // Remembers the spawned child so the parent spawns it only once.
        #[derive(Resource)]
        struct ChildSignalHandle(Option<SignalSystem>);

        let mut world = World::new();
        world.insert_resource(SignalGraphState::new(Update.intern()));
        world.insert_resource(ProcessOrder::default());
        world.insert_resource(ChildSignalHandle(None));

        let parent_signal = spawn_signal::<(), (), Option<()>, _, _>(&mut world, |In(_), world: &mut World| {
            world.resource_mut::<ProcessOrder>().0.push("parent");

            if world.resource::<ChildSignalHandle>().0.is_none() {
                let child_signal =
                    spawn_signal::<(), (), Option<()>, _, _>(world, |In(_), mut order: ResMut<ProcessOrder>| {
                        order.0.push("child");
                        Some(())
                    });
                world.entity_mut(*child_signal).insert(ScheduleTag(Update.intern()));
                world.resource_mut::<ChildSignalHandle>().0 = Some(child_signal);
            }
            Some(())
        });

        world.entity_mut(*parent_signal).insert(ScheduleTag(Update.intern()));

        // A single processing call; no explicit level rebuild is done beforehand.
        let mut process_system = process_signal_graph_for_schedule(Update.intern());
        process_system(&mut world);

        let order = &world.resource::<ProcessOrder>().0;
        assert!(order.contains(&"parent"), "Parent signal should have been processed");
        assert!(
            order.contains(&"child"),
            "Child signal registered during processing should also be processed"
        );
    }

    /// A chain of spawns (A spawns B, B spawns C) must be fully processed by one processing call.
    #[test]
    fn fixpoint_loop_handles_multiple_levels_of_spawning() {
        use bevy_app::Update;

        // Names of the signals that ran, in order.
        #[derive(Resource, Default)]
        struct ProcessOrder(Vec<&'static str>);

        // Signals spawned so far; its length gates each spawn so it happens only once.
        #[derive(Resource, Default)]
        struct SpawnedSignals(Vec<SignalSystem>);

        let mut world = World::new();
        world.insert_resource(SignalGraphState::new(Update.intern()));
        world.insert_resource(ProcessOrder::default());
        world.insert_resource(SpawnedSignals::default());

        let signal_a = spawn_signal::<(), (), Option<()>, _, _>(&mut world, |In(_), world: &mut World| {
            world.resource_mut::<ProcessOrder>().0.push("A");

            // A spawns B only while nothing has been spawned yet.
            let spawned = &world.resource::<SpawnedSignals>().0;
            if spawned.is_empty() {
                let signal_b = spawn_signal::<(), (), Option<()>, _, _>(world, |In(_), world: &mut World| {
                    world.resource_mut::<ProcessOrder>().0.push("B");

                    // B spawns C only while B is the sole recorded spawn.
                    let spawned = &world.resource::<SpawnedSignals>().0;
                    if spawned.len() == 1 {
                        let signal_c = spawn_signal::<(), (), Option<()>, _, _>(
                            world,
                            |In(_), mut order: ResMut<ProcessOrder>| {
                                order.0.push("C");
                                Some(())
                            },
                        );
                        world.entity_mut(*signal_c).insert(ScheduleTag(Update.intern()));
                        world.resource_mut::<SpawnedSignals>().0.push(signal_c);
                    }
                    Some(())
                });
                world.entity_mut(*signal_b).insert(ScheduleTag(Update.intern()));
                world.resource_mut::<SpawnedSignals>().0.push(signal_b);
            }
            Some(())
        });

        world.entity_mut(*signal_a).insert(ScheduleTag(Update.intern()));

        let mut process_system = process_signal_graph_for_schedule(Update.intern());
        process_system(&mut world);

        let order = &world.resource::<ProcessOrder>().0;
        assert!(order.contains(&"A"), "Signal A should have been processed");
        assert!(
            order.contains(&"B"),
            "Signal B (spawned by A) should have been processed"
        );
        assert!(
            order.contains(&"C"),
            "Signal C (spawned by B) should have been processed"
        );
    }

    /// `apply_schedule_to_signal` must panic when the schedule was never added to
    /// `registered_schedules`.
    #[test]
    #[should_panic(expected = "has not been registered with `JonmoPlugin`")]
    fn apply_schedule_to_signal_panics_on_unregistered_schedule() {
        use bevy_app::Update;

        let mut world = World::new();
        // `Update` is not inserted into `registered_schedules` here.
        world.insert_resource(SignalGraphState::default());

        let signal = spawn_signal::<(), i32, Option<i32>, _, _>(&mut world, |In(_)| Some(1));

        apply_schedule_to_signal(&mut world, signal, Update.intern());
    }
}