1use std::any::{Any, TypeId};
123use std::hash::Hash;
124
125use nexus_notify::local::LocalNotify;
126use nexus_notify::{Events, Token};
127use rustc_hash::FxHashMap;
128
129use crate::ctx_pipeline::CtxStepCall;
130use crate::handler::Param;
131use crate::world::{Registry, Resource, ResourceId, World};
132
/// Opaque handle identifying one data source.
///
/// Wraps the `usize` key under which the source is stored; it is a plain
/// value type that can be copied, compared and used as a hash-map key.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct DataSource(pub usize);
143
144pub struct ReactorNotify {
158 notify: LocalNotify,
160
161 interests: slab::Slab<Vec<Token>>,
165
166 reactor_sources: Vec<Vec<DataSource>>,
169
170 reactors: slab::Slab<Option<Box<dyn Reactor>>>,
175}
176
177impl std::fmt::Debug for ReactorNotify {
179 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
180 f.debug_struct("ReactorNotify")
181 .field("num_sources", &self.interests.len())
182 .field("num_reactors", &self.reactors.len())
183 .field("notify", &self.notify)
184 .finish()
185 }
186}
187
188impl ReactorNotify {
189 pub fn new(source_capacity: usize, reactor_capacity: usize) -> Self {
191 Self {
192 notify: LocalNotify::with_capacity(reactor_capacity),
193 interests: slab::Slab::with_capacity(source_capacity),
194 reactor_sources: Vec::with_capacity(reactor_capacity),
195 reactors: slab::Slab::with_capacity(reactor_capacity),
196 }
197 }
198
199 pub fn register_source(&mut self) -> DataSource {
205 DataSource(self.interests.insert(Vec::new()))
206 }
207
208 pub fn remove_source(&mut self, source: DataSource) {
213 if self.interests.contains(source.0) {
214 self.interests.remove(source.0);
215 }
216 }
217
218 pub fn create_reactor(&mut self) -> Token {
245 let key = self.reactors.insert(None);
248 self.notify.ensure_capacity(key);
249 if key >= self.reactor_sources.len() {
251 self.reactor_sources.resize_with(key + 1, Vec::new);
252 }
253 Token::new(key)
254 }
255
256 pub fn insert_reactor(
266 &mut self,
267 token: Token,
268 reactor: impl Reactor + 'static,
269 ) -> ReactorRegistration<'_> {
270 let idx = token.index();
271 assert!(
272 self.reactors.contains(idx),
273 "token {} was not allocated by create_reactor",
274 idx,
275 );
276 assert!(
277 self.reactors[idx].is_none(),
278 "token {} was already filled",
279 idx,
280 );
281 self.reactors[idx] = Some(Box::new(reactor));
282 ReactorRegistration {
283 token,
284 notify: self,
285 }
286 }
287
288 pub fn register<C, Params, F: IntoReactor<C, Params>>(
295 &mut self,
296 ctx_fn: impl FnOnce(Token) -> C,
297 step: F,
298 registry: &Registry,
299 ) -> ReactorRegistration<'_> {
300 let key = self.reactors.vacant_key();
301 let token = Token::new(key);
302 self.notify.ensure_capacity(key);
303 if key >= self.reactor_sources.len() {
304 self.reactor_sources.resize_with(key + 1, Vec::new);
305 }
306 let ctx = ctx_fn(token);
307 let reactor = step.into_reactor(ctx, registry);
308 let inserted = self.reactors.insert(Some(Box::new(reactor)));
309 debug_assert_eq!(inserted, key);
310 ReactorRegistration {
311 token,
312 notify: self,
313 }
314 }
315
316 pub fn register_built(&mut self, reactor: impl Reactor + 'static) -> ReactorRegistration<'_> {
323 let key = self.reactors.vacant_key();
324 let token = Token::new(key);
325 self.notify.ensure_capacity(key);
326 if key >= self.reactor_sources.len() {
327 self.reactor_sources.resize_with(key + 1, Vec::new);
328 }
329 let inserted = self.reactors.insert(Some(Box::new(reactor)));
330 debug_assert_eq!(inserted, key);
331 ReactorRegistration {
332 token,
333 notify: self,
334 }
335 }
336
337 pub fn subscribe(&mut self, reactor: Token, source: DataSource) {
344 if let Some(subscribers) = self.interests.get_mut(source.0) {
345 if !subscribers.contains(&reactor) {
346 subscribers.push(reactor);
347 let idx = reactor.index();
349 debug_assert!(
350 idx < self.reactor_sources.len(),
351 "reactor_sources missing entry for reactor token {}",
352 idx,
353 );
354 self.reactor_sources[idx].push(source);
355 }
356 }
357 }
358
359 pub fn unsubscribe(&mut self, reactor: Token, source: DataSource) {
361 if let Some(subscribers) = self.interests.get_mut(source.0) {
362 subscribers.retain(|&t| t != reactor);
363 }
364 if let Some(sources) = self.reactor_sources.get_mut(reactor.index()) {
365 sources.retain(|&s| s != source);
366 }
367 }
368
369 #[inline]
376 pub fn mark(&mut self, source: DataSource) {
377 if let Some(subscribers) = self.interests.get(source.0) {
378 for &reactor_token in subscribers {
379 self.notify.mark(reactor_token);
380 }
381 }
382 }
383
384 #[inline]
386 pub(crate) fn poll(&mut self, events: &mut Events) {
387 self.notify.poll(events);
388 }
389
390 #[inline]
393 pub(crate) fn take_reactor(&mut self, idx: usize) -> Option<Box<dyn Reactor>> {
394 self.reactors.get_mut(idx).and_then(Option::take)
395 }
396
397 #[inline]
403 pub(crate) fn put_reactor(&mut self, idx: usize, reactor: Box<dyn Reactor>) {
404 self.reactors[idx] = Some(reactor);
405 }
406
407 pub fn remove_reactor(&mut self, token: Token) {
412 let idx = token.index();
413 if self.reactors.contains(idx) {
414 self.reactors.remove(idx);
415 if let Some(sources) = self.reactor_sources.get_mut(idx) {
417 for &source in sources.iter() {
418 if let Some(subscribers) = self.interests.get_mut(source.0) {
419 subscribers.retain(|&t| t != token);
420 }
421 }
422 sources.clear();
423 }
424 }
425 }
426
427 pub fn has_notified(&self) -> bool {
431 self.notify.has_notified()
432 }
433
434 pub fn notified_count(&self) -> usize {
436 self.notify.notified_count()
437 }
438
439 pub fn source_count(&self) -> usize {
441 self.interests.len()
442 }
443
444 pub fn reactor_count(&self) -> usize {
446 self.reactors.len()
447 }
448}
449
450impl Resource for ReactorNotify {}
451
452pub struct ReactorRegistration<'a> {
458 token: Token,
459 notify: &'a mut ReactorNotify,
460}
461
462impl ReactorRegistration<'_> {
463 pub fn subscribe(self, source: DataSource) -> Self {
465 self.notify.subscribe(self.token, source);
466 self
467 }
468
469 pub fn token(&self) -> Token {
471 self.token
472 }
473}
474
/// A unit of work that can be run against the [`World`].
///
/// Implementors must be `Send`.
pub trait Reactor: Send {
    /// Executes the reactor with mutable access to the world.
    fn run(&mut self, world: &mut World);

    /// Returns a static name for the reactor; defaults to `"<unnamed>"`.
    fn name(&self) -> &'static str {
        "<unnamed>"
    }
}
497
/// Reactor built from a step function `F`, a context `C` and the state of
/// its resource parameters `Params`.
pub struct ReactorFn<C, F, Params: Param> {
    /// Context handed to the step function as its first argument.
    pub ctx: C,
    /// The step function itself.
    f: F,
    /// Parameter state produced by `Param::init` and passed to `Param::fetch`.
    state: Params::State,
    /// Name reported by `Reactor::name`.
    name: &'static str,
}
514
515pub struct PipelineReactor<C> {
542 pub ctx: C,
544 body: Box<dyn CtxStepCall<C, (), Out = ()> + Send>,
545}
546
547impl<C: Send + 'static> PipelineReactor<C> {
548 pub fn new(ctx: C, body: impl CtxStepCall<C, (), Out = ()> + Send + 'static) -> Self {
554 Self {
555 ctx,
556 body: Box::new(body),
557 }
558 }
559}
560
561impl<C: Send + 'static> Reactor for PipelineReactor<C> {
562 fn run(&mut self, world: &mut World) {
563 self.body.call(&mut self.ctx, world, ());
564 }
565
566 fn name(&self) -> &'static str {
567 std::any::type_name::<C>()
568 }
569}
570
/// Conversion of a step function into a [`Reactor`].
///
/// `C` is the context type and `Params` the tuple of resource parameters
/// the step takes after the context. The `on_unimplemented` attribute
/// customises the compiler error shown when a value does not implement
/// this trait.
#[diagnostic::on_unimplemented(
    message = "this function cannot be used as a reactor step",
    note = "reactor step signature: `fn(&mut C, Params...)` — context first, then resources",
    note = "closures with resource parameters are not supported — use a named `fn`"
)]
pub trait IntoReactor<C, Params> {
    /// Concrete reactor type produced by the conversion.
    type Reactor: Reactor + 'static;

    /// Consumes the step and the context, using `registry` to set up
    /// parameter state, and returns the reactor.
    fn into_reactor(self, ctx: C, registry: &Registry) -> Self::Reactor;
}
603
604impl<C: Send + 'static, F: FnMut(&mut C) + Send + 'static> Reactor for ReactorFn<C, F, ()> {
609 fn run(&mut self, _world: &mut World) {
610 (self.f)(&mut self.ctx);
611 }
612
613 fn name(&self) -> &'static str {
614 self.name
615 }
616}
617
618impl<C: Send + 'static, F: FnMut(&mut C) + Send + 'static> IntoReactor<C, ()> for F {
619 type Reactor = ReactorFn<C, F, ()>;
620
621 fn into_reactor(self, ctx: C, registry: &Registry) -> Self::Reactor {
622 ReactorFn {
623 ctx,
624 f: self,
625 state: <() as Param>::init(registry),
626 name: std::any::type_name::<F>(),
627 }
628 }
629}
630
// Generates, for one parameter tuple `($P, ...)`, both the `Reactor` impl for
// `ReactorFn<C, F, ($P, ...)>` and the matching `IntoReactor` impl for `F`.
macro_rules! impl_into_reactor {
    ($($P:ident),+) => {
        impl<C: Send + 'static, F: Send + 'static, $($P: Param + 'static),+>
            Reactor for ReactorFn<C, F, ($($P,)+)>
        where
            // The step must be callable both with the declared parameter
            // types and with the fetched `Item<'a>` forms of those types.
            for<'a> &'a mut F:
                FnMut(&mut C, $($P,)+) +
                FnMut(&mut C, $($P::Item<'a>,)+),
        {
            // The parameter type identifiers are reused as variable names.
            #[allow(non_snake_case)]
            fn run(&mut self, world: &mut World) {
                // Helper that pins the closure signature to the concrete
                // argument types it is invoked with.
                #[allow(clippy::too_many_arguments)]
                fn call_inner<Ctx, $($P,)+>(
                    mut f: impl FnMut(&mut Ctx, $($P,)+),
                    ctx: &mut Ctx,
                    $($P: $P,)+
                ) {
                    f(ctx, $($P,)+);
                }

                // Debug builds only.
                // NOTE(review): presumably resets a debug-time borrow tracker
                // before fetching — confirm against `World::clear_borrows`.
                #[cfg(debug_assertions)]
                world.clear_borrows();
                // NOTE(review): the safety contract of `Param::fetch` is not
                // visible in this file — confirm its requirements.
                let ($($P,)+) = unsafe {
                    <($($P,)+) as Param>::fetch(world, &mut self.state)
                };
                call_inner(&mut self.f, &mut self.ctx, $($P,)+);
            }

            fn name(&self) -> &'static str {
                self.name
            }
        }

        impl<C: Send + 'static, F: Send + 'static, $($P: Param + 'static),+>
            IntoReactor<C, ($($P,)+)> for F
        where
            for<'a> &'a mut F:
                FnMut(&mut C, $($P,)+) +
                FnMut(&mut C, $($P::Item<'a>,)+),
        {
            type Reactor = ReactorFn<C, F, ($($P,)+)>;

            fn into_reactor(self, ctx: C, registry: &Registry) -> Self::Reactor {
                let state = <($($P,)+) as Param>::init(registry);
                {
                    // Hand the registry each parameter's resource id and type
                    // name. NOTE(review): presumably this rejects conflicting
                    // access between parameters — confirm in `check_access`.
                    #[allow(non_snake_case)]
                    let ($($P,)+) = &state;
                    registry.check_access(&[
                        $(
                            (<$P as Param>::resource_id($P),
                             std::any::type_name::<$P>()),
                        )+
                    ]);
                }
                ReactorFn {
                    ctx,
                    f: self,
                    state,
                    name: std::any::type_name::<F>(),
                }
            }
        }
    };
}

// Instantiate the impls for every tuple arity that `all_tuples!` provides.
all_tuples!(impl_into_reactor);
704
705#[derive(Default)]
715pub struct DeferredRemovals {
716 tokens: Vec<Token>,
717}
718
719impl DeferredRemovals {
720 pub fn deregister(&mut self, token: Token) {
725 self.tokens.push(token);
726 }
727
728 #[inline]
731 pub(crate) fn take(&mut self) -> Vec<Token> {
732 std::mem::take(&mut self.tokens)
733 }
734
735 #[inline]
737 pub(crate) fn put(&mut self, tokens: Vec<Token>) {
738 debug_assert!(tokens.is_empty(), "put() expects a drained Vec");
739 self.tokens = tokens;
740 }
741
742 pub fn is_empty(&self) -> bool {
744 self.tokens.is_empty()
745 }
746}
747
748impl Resource for DeferredRemovals {}
749
750#[derive(Default)]
780pub struct SourceRegistry {
781 maps: FxHashMap<TypeId, Box<dyn Any + Send>>,
782}
783
784impl SourceRegistry {
785 pub fn new() -> Self {
787 Self::default()
788 }
789
790 pub fn insert<K: Hash + Eq + Send + 'static>(&mut self, key: K, source: DataSource) {
794 self.get_or_create_map::<K>().insert(key, source);
795 }
796
797 pub fn get<K: Hash + Eq + Send + 'static>(&self, key: &K) -> Option<DataSource> {
801 self.get_map::<K>().and_then(|map| map.get(key)).copied()
802 }
803
804 pub fn remove<K: Hash + Eq + Send + 'static>(&mut self, key: &K) -> Option<DataSource> {
807 self.get_map_mut::<K>().and_then(|map| map.remove(key))
808 }
809
810 pub fn contains<K: Hash + Eq + Send + 'static>(&self, key: &K) -> bool {
812 self.get_map::<K>().is_some_and(|map| map.contains_key(key))
813 }
814
815 fn get_map<K: Hash + Eq + Send + 'static>(&self) -> Option<&FxHashMap<K, DataSource>> {
816 self.maps.get(&TypeId::of::<K>()).map(|boxed| {
817 boxed
819 .downcast_ref::<FxHashMap<K, DataSource>>()
820 .expect("invariant: TypeId matches stored map type")
821 })
822 }
823
824 fn get_map_mut<K: Hash + Eq + Send + 'static>(
825 &mut self,
826 ) -> Option<&mut FxHashMap<K, DataSource>> {
827 self.maps.get_mut(&TypeId::of::<K>()).map(|boxed| {
828 boxed
830 .downcast_mut::<FxHashMap<K, DataSource>>()
831 .expect("invariant: TypeId matches stored map type")
832 })
833 }
834
835 fn get_or_create_map<K: Hash + Eq + Send + 'static>(
836 &mut self,
837 ) -> &mut FxHashMap<K, DataSource> {
838 self.maps
839 .entry(TypeId::of::<K>())
840 .or_insert_with(|| Box::<FxHashMap<K, DataSource>>::default())
841 .downcast_mut::<FxHashMap<K, DataSource>>()
843 .unwrap()
844 }
845}
846
847impl Resource for SourceRegistry {}
848
849pub struct ReactorSystem {
860 events: Events,
862
863 notify_id: ResourceId,
865 removals_id: ResourceId,
866}
867
868impl ReactorSystem {
869 pub fn new(world: &World) -> Self {
873 Self {
874 events: Events::with_capacity(256),
875 notify_id: world.id::<ReactorNotify>(),
876 removals_id: world.id::<DeferredRemovals>(),
877 }
878 }
879
880 pub fn dispatch(&mut self, world: &mut World) -> bool {
888 let notify_ptr: *mut ReactorNotify =
891 unsafe { world.get_mut::<ReactorNotify>(self.notify_id) };
892
893 {
895 let notify = unsafe { &mut *notify_ptr };
896 notify.poll(&mut self.events);
897 }
898 let ran = !self.events.is_empty();
899
900 for token in self.events.iter() {
903 let idx = token.index();
904 let reactor = {
907 let notify = unsafe { &mut *notify_ptr };
908 notify.take_reactor(idx)
909 };
910 if let Some(mut reactor) = reactor {
911 reactor.run(world);
912 let notify = unsafe { &mut *notify_ptr };
915 notify.put_reactor(idx, reactor);
916 }
917 }
918
919 let removals = unsafe { world.get_mut::<DeferredRemovals>(self.removals_id) };
923 let mut pending = removals.take();
924 if !pending.is_empty() {
925 let notify = unsafe { &mut *notify_ptr };
927 while let Some(token) = pending.pop() {
928 notify.remove_reactor(token);
929 }
930 }
931 let removals = unsafe { world.get_mut::<DeferredRemovals>(self.removals_id) };
933 removals.put(pending);
934
935 ran
936 }
937
938 pub fn reactor_count(&self, world: &World) -> usize {
940 world.resource::<ReactorNotify>().reactor_count()
941 }
942}
943
944#[cfg(test)]
949mod tests {
950 use super::*;
951 use crate::{Res, ResMut, WorldBuilder};
952
953 #[test]
956 fn reactor_fn_arity0() {
957 let wb = WorldBuilder::new();
958 let mut world = wb.build();
959 let reg = world.registry();
960
961 struct Ctx {
962 count: u32,
963 }
964
965 fn step(ctx: &mut Ctx) {
966 ctx.count += 1;
967 }
968
969 let mut reactor = step.into_reactor(Ctx { count: 0 }, reg);
970 reactor.run(&mut world);
971 assert_eq!(reactor.ctx.count, 1);
972 }
973
974 #[test]
975 fn reactor_fn_with_params() {
976 let mut wb = WorldBuilder::new();
977 wb.register::<u64>(10);
978 wb.register::<u32>(0);
979 let mut world = wb.build();
980 let reg = world.registry();
981
982 struct Ctx {
983 multiplier: u64,
984 }
985
986 fn step(ctx: &mut Ctx, val: Res<u64>, mut out: ResMut<u32>) {
987 *out = (*val * ctx.multiplier) as u32;
988 }
989
990 let mut reactor = step.into_reactor(Ctx { multiplier: 5 }, reg);
991 reactor.run(&mut world);
992 assert_eq!(*world.resource::<u32>(), 50);
993 }
994
995 fn dummy_reactor() -> ReactorFn<(), fn(&mut ()), ()> {
998 ReactorFn {
999 ctx: (),
1000 f: (|_: &mut ()| {}) as fn(&mut ()),
1001 state: (),
1002 name: "dummy",
1003 }
1004 }
1005
1006 #[test]
1007 fn reactor_notify_mark_fans_out() {
1008 let mut notify = ReactorNotify::new(4, 8);
1009 let mut events = Events::with_capacity(8);
1010
1011 let src = notify.register_source();
1012 let a1 = notify.register_built(dummy_reactor()).token();
1013 let a2 = notify.register_built(dummy_reactor()).token();
1014 let _a3 = notify.register_built(dummy_reactor()).token();
1015
1016 notify.subscribe(a1, src);
1017 notify.subscribe(a2, src);
1018 notify.mark(src);
1021 notify.notify.poll(&mut events);
1022
1023 assert_eq!(events.len(), 2);
1024 assert!(events.as_slice().contains(&a1));
1025 assert!(events.as_slice().contains(&a2));
1026 }
1027
1028 #[test]
1029 fn reactor_notify_dedup_across_sources() {
1030 let mut notify = ReactorNotify::new(4, 8);
1031 let mut events = Events::with_capacity(8);
1032
1033 let src1 = notify.register_source();
1034 let src2 = notify.register_source();
1035 let reactor = notify.register_built(dummy_reactor()).token();
1036
1037 notify.subscribe(reactor, src1);
1038 notify.subscribe(reactor, src2);
1039
1040 notify.mark(src1);
1041 notify.mark(src2);
1042
1043 notify.notify.poll(&mut events);
1044 assert_eq!(events.len(), 1);
1045 assert_eq!(events.as_slice()[0], reactor);
1046 }
1047
1048 #[test]
1049 fn reactor_notify_remove_reactor() {
1050 let mut notify = ReactorNotify::new(4, 8);
1051 let mut events = Events::with_capacity(8);
1052
1053 let src = notify.register_source();
1054
1055 struct Ctx;
1056 let token = notify
1057 .register_built(ReactorFn {
1058 ctx: Ctx,
1059 f: (|_: &mut Ctx| {}) as fn(&mut Ctx),
1060 state: (),
1061 name: "test",
1062 })
1063 .token();
1064 notify.subscribe(token, src);
1065
1066 notify.remove_reactor(token);
1067 notify.mark(src);
1068 notify.notify.poll(&mut events);
1069 assert!(events.is_empty());
1070 }
1071
1072 fn notify_mut(world: &World, id: ResourceId) -> &mut ReactorNotify {
1078 unsafe { world.get_mut::<ReactorNotify>(id) }
1079 }
1080
1081 #[test]
1082 fn reactor_system_dispatch() {
1083 let mut wb = WorldBuilder::new();
1084 wb.register::<u64>(0);
1085 wb.register(ReactorNotify::new(4, 8));
1086 wb.register(DeferredRemovals::default());
1087 let mut world = wb.build();
1088 let reg = world.registry();
1089 let nid = world.id::<ReactorNotify>();
1090
1091 let mut system = ReactorSystem::new(&world);
1092
1093 struct Ctx {
1094 _reactor_id: Token,
1095 increment: u64,
1096 }
1097
1098 fn step(ctx: &mut Ctx, mut val: ResMut<u64>) {
1099 *val += ctx.increment;
1100 }
1101
1102 let notify = notify_mut(&world, nid);
1103 let src = notify.register_source();
1104 notify
1105 .register(
1106 |t| Ctx {
1107 _reactor_id: t,
1108 increment: 10,
1109 },
1110 step,
1111 reg,
1112 )
1113 .subscribe(src);
1114 notify
1115 .register(
1116 |t| Ctx {
1117 _reactor_id: t,
1118 increment: 5,
1119 },
1120 step,
1121 reg,
1122 )
1123 .subscribe(src);
1124
1125 notify_mut(&world, nid).mark(src);
1127 let ran = system.dispatch(&mut world);
1128
1129 assert!(ran);
1130 assert_eq!(*world.resource::<u64>(), 15); }
1132
1133 #[test]
1134 fn reactor_system_deferred_removal() {
1135 let mut wb = WorldBuilder::new();
1136 wb.register::<u64>(0);
1137 wb.register(ReactorNotify::new(4, 8));
1138 wb.register(DeferredRemovals::default());
1139 let mut world = wb.build();
1140 let reg = world.registry();
1141 let nid = world.id::<ReactorNotify>();
1142
1143 let mut system = ReactorSystem::new(&world);
1144
1145 struct Ctx {
1146 reactor_id: Token,
1147 runs: u64,
1148 }
1149
1150 fn step(ctx: &mut Ctx, mut val: ResMut<u64>, mut removals: ResMut<DeferredRemovals>) {
1151 *val += 1;
1152 ctx.runs += 1;
1153 if ctx.runs >= 2 {
1154 removals.deregister(ctx.reactor_id);
1155 }
1156 }
1157
1158 let notify = notify_mut(&world, nid);
1159 let src = notify.register_source();
1160 notify
1161 .register(
1162 |t| Ctx {
1163 reactor_id: t,
1164 runs: 0,
1165 },
1166 step,
1167 reg,
1168 )
1169 .subscribe(src);
1170
1171 assert_eq!(system.reactor_count(&world), 1);
1172
1173 notify_mut(&world, nid).mark(src);
1175 system.dispatch(&mut world);
1176 assert_eq!(*world.resource::<u64>(), 1);
1177 assert_eq!(system.reactor_count(&world), 1);
1178
1179 notify_mut(&world, nid).mark(src);
1181 system.dispatch(&mut world);
1182 assert_eq!(*world.resource::<u64>(), 2);
1183 assert_eq!(system.reactor_count(&world), 0);
1184
1185 notify_mut(&world, nid).mark(src);
1187 let ran = system.dispatch(&mut world);
1188 assert!(!ran);
1189 assert_eq!(*world.resource::<u64>(), 2);
1190 }
1191
1192 #[test]
1193 fn reactor_system_only_subscribed_wake() {
1194 let mut wb = WorldBuilder::new();
1195 wb.register::<u64>(0);
1196 wb.register(ReactorNotify::new(4, 8));
1197 wb.register(DeferredRemovals::default());
1198 let mut world = wb.build();
1199 let reg = world.registry();
1200 let nid = world.id::<ReactorNotify>();
1201
1202 let mut system = ReactorSystem::new(&world);
1203
1204 struct Ctx {
1205 _reactor_id: Token,
1206 value: u64,
1207 }
1208
1209 fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
1210 *out += ctx.value;
1211 }
1212
1213 let notify = notify_mut(&world, nid);
1214 let btc = notify.register_source();
1215 let eth = notify.register_source();
1216
1217 notify
1218 .register(
1219 |t| Ctx {
1220 _reactor_id: t,
1221 value: 10,
1222 },
1223 step,
1224 reg,
1225 )
1226 .subscribe(btc);
1227 notify
1228 .register(
1229 |t| Ctx {
1230 _reactor_id: t,
1231 value: 100,
1232 },
1233 step,
1234 reg,
1235 )
1236 .subscribe(eth);
1237
1238 notify_mut(&world, nid).mark(btc);
1240 system.dispatch(&mut world);
1241 assert_eq!(*world.resource::<u64>(), 10);
1242
1243 notify_mut(&world, nid).mark(eth);
1245 system.dispatch(&mut world);
1246 assert_eq!(*world.resource::<u64>(), 110);
1247 }
1248
1249 #[test]
1250 fn runtime_registration() {
1251 let mut wb = WorldBuilder::new();
1252 wb.register::<u64>(0);
1253 wb.register(ReactorNotify::new(4, 8));
1254 wb.register(DeferredRemovals::default());
1255 let mut world = wb.build();
1256 let nid = world.id::<ReactorNotify>();
1257
1258 let mut system = ReactorSystem::new(&world);
1259
1260 struct Ctx {
1261 _reactor_id: Token,
1262 value: u64,
1263 }
1264
1265 fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
1266 *out += ctx.value;
1267 }
1268
1269 let src = {
1271 let reg = world.registry();
1272 let notify = notify_mut(&world, nid);
1273 let src = notify.register_source();
1274 notify
1275 .register(
1276 |t| Ctx {
1277 _reactor_id: t,
1278 value: 10,
1279 },
1280 step,
1281 reg,
1282 )
1283 .subscribe(src);
1284 src
1285 };
1286
1287 notify_mut(&world, nid).mark(src);
1289 system.dispatch(&mut world);
1290 assert_eq!(*world.resource::<u64>(), 10);
1291
1292 {
1294 let reg = world.registry();
1295 notify_mut(&world, nid)
1296 .register(
1297 |t| Ctx {
1298 _reactor_id: t,
1299 value: 100,
1300 },
1301 step,
1302 reg,
1303 )
1304 .subscribe(src);
1305 }
1306
1307 notify_mut(&world, nid).mark(src);
1309 system.dispatch(&mut world);
1310 assert_eq!(*world.resource::<u64>(), 120); }
1312
1313 #[test]
1314 fn register_after_remove_reuses_key() {
1315 let mut wb = WorldBuilder::new();
1316 wb.register::<u64>(0);
1317 wb.register(ReactorNotify::new(4, 8));
1318 wb.register(DeferredRemovals::default());
1319 let mut world = wb.build();
1320 let nid = world.id::<ReactorNotify>();
1321
1322 let mut system = ReactorSystem::new(&world);
1323
1324 struct Ctx {
1325 reactor_id: Token,
1326 value: u64,
1327 }
1328
1329 fn step(ctx: &mut Ctx, mut out: ResMut<u64>, mut removals: ResMut<DeferredRemovals>) {
1330 *out += ctx.value;
1331 if ctx.value == 10 {
1332 removals.deregister(ctx.reactor_id);
1333 }
1334 }
1335
1336 let src = {
1337 let reg = world.registry();
1338 let notify = notify_mut(&world, nid);
1339 let src = notify.register_source();
1340 notify
1341 .register(
1342 |t| Ctx {
1343 reactor_id: t,
1344 value: 10,
1345 },
1346 step,
1347 reg,
1348 )
1349 .subscribe(src);
1350 src
1351 };
1352
1353 notify_mut(&world, nid).mark(src);
1355 system.dispatch(&mut world);
1356 assert_eq!(*world.resource::<u64>(), 10);
1357 assert_eq!(system.reactor_count(&world), 0);
1358
1359 {
1361 let reg = world.registry();
1362 let notify = notify_mut(&world, nid);
1363 let token = notify
1364 .register(
1365 |t| Ctx {
1366 reactor_id: t,
1367 value: 100,
1368 },
1369 step,
1370 reg,
1371 )
1372 .token();
1373 notify.subscribe(token, src);
1374 assert_eq!(token.index(), 0); }
1376
1377 notify_mut(&world, nid).mark(src);
1379 system.dispatch(&mut world);
1380 assert_eq!(*world.resource::<u64>(), 110); assert_eq!(system.reactor_count(&world), 1); }
1383
1384 #[test]
1385 fn reactor_can_access_actor_notify() {
1386 let mut wb = WorldBuilder::new();
1389 wb.register::<u64>(0);
1390 wb.register(ReactorNotify::new(4, 8));
1391 wb.register(DeferredRemovals::default());
1392 let mut world = wb.build();
1393 let nid = world.id::<ReactorNotify>();
1394
1395 let mut system = ReactorSystem::new(&world);
1396
1397 struct Ctx {
1398 _reactor_id: Token,
1399 }
1400
1401 fn step(_ctx: &mut Ctx, notify: ResMut<ReactorNotify>, mut out: ResMut<u64>) {
1402 *out = notify.reactor_count() as u64;
1404 }
1405
1406 let src = {
1407 let reg = world.registry();
1408 let notify = notify_mut(&world, nid);
1409 let src = notify.register_source();
1410 notify
1411 .register(|t| Ctx { _reactor_id: t }, step, reg)
1412 .subscribe(src);
1413 src
1414 };
1415
1416 notify_mut(&world, nid).mark(src);
1417 system.dispatch(&mut world);
1418 }
1424
1425 #[test]
1428 fn multi_instrument_with_shared_source() {
1429 let mut wb = WorldBuilder::new();
1432 wb.register::<u64>(0);
1433 wb.register(ReactorNotify::new(8, 16));
1434 wb.register(DeferredRemovals::default());
1435 let mut world = wb.build();
1436 let nid = world.id::<ReactorNotify>();
1437 let mut system = ReactorSystem::new(&world);
1438
1439 struct Ctx {
1440 _reactor_id: Token,
1441 instrument: &'static str,
1442 }
1443
1444 fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
1445 *out += match ctx.instrument {
1447 "BTC" => 100,
1448 "ETH" => 10,
1449 "SOL" => 1,
1450 _ => 0,
1451 };
1452 }
1453
1454 let (btc_md, eth_md, sol_md, positions) = {
1455 let reg = world.registry();
1456 let notify = notify_mut(&world, nid);
1457
1458 let btc_md = notify.register_source();
1460 let eth_md = notify.register_source();
1461 let sol_md = notify.register_source();
1462 let positions = notify.register_source();
1464
1465 notify
1467 .register(
1468 |t| Ctx {
1469 _reactor_id: t,
1470 instrument: "BTC",
1471 },
1472 step,
1473 reg,
1474 )
1475 .subscribe(btc_md)
1476 .subscribe(positions);
1477
1478 notify
1480 .register(
1481 |t| Ctx {
1482 _reactor_id: t,
1483 instrument: "ETH",
1484 },
1485 step,
1486 reg,
1487 )
1488 .subscribe(eth_md)
1489 .subscribe(positions);
1490
1491 notify
1493 .register(
1494 |t| Ctx {
1495 _reactor_id: t,
1496 instrument: "SOL",
1497 },
1498 step,
1499 reg,
1500 )
1501 .subscribe(sol_md)
1502 .subscribe(positions);
1503
1504 (btc_md, eth_md, sol_md, positions)
1505 };
1506
1507 notify_mut(&world, nid).mark(btc_md);
1509 system.dispatch(&mut world);
1510 assert_eq!(*world.resource::<u64>(), 100);
1511
1512 notify_mut(&world, nid).mark(positions);
1514 system.dispatch(&mut world);
1515 assert_eq!(*world.resource::<u64>(), 211); notify_mut(&world, nid).mark(btc_md);
1519 notify_mut(&world, nid).mark(eth_md);
1520 system.dispatch(&mut world);
1521 assert_eq!(*world.resource::<u64>(), 321); notify_mut(&world, nid).mark(btc_md);
1525 notify_mut(&world, nid).mark(positions);
1526 system.dispatch(&mut world);
1527 assert_eq!(*world.resource::<u64>(), 432); let ran = system.dispatch(&mut world);
1532 assert!(!ran);
1533 assert_eq!(*world.resource::<u64>(), 432);
1534
1535 notify_mut(&world, nid).mark(sol_md);
1537 system.dispatch(&mut world);
1538 assert_eq!(*world.resource::<u64>(), 433);
1539 }
1540
1541 #[test]
1542 fn per_reactor_fill_routing() {
1543 use std::collections::HashMap;
1547
1548 let mut wb = WorldBuilder::new();
1549 wb.register::<u64>(0);
1550 wb.register(ReactorNotify::new(8, 16));
1551 wb.register(DeferredRemovals::default());
1552 let mut world = wb.build();
1553 let nid = world.id::<ReactorNotify>();
1554 let mut system = ReactorSystem::new(&world);
1555
1556 struct Ctx {
1557 reactor_id: Token,
1558 }
1559
1560 fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
1561 *out += ctx.reactor_id.index() as u64 + 1;
1562 }
1563
1564 let mut fill_sources: HashMap<usize, DataSource> = HashMap::new();
1566
1567 {
1568 let reg = world.registry();
1569 let notify = notify_mut(&world, nid);
1570
1571 for _ in 0..3 {
1572 let fill_src = notify.register_source();
1574 let token = notify
1575 .register(|t| Ctx { reactor_id: t }, step, reg)
1576 .subscribe(fill_src)
1577 .token();
1578
1579 fill_sources.insert(token.index(), fill_src);
1580 }
1581 }
1582
1583 let wire_client_id: usize = 1;
1585 let fill_source = fill_sources[&wire_client_id];
1586 notify_mut(&world, nid).mark(fill_source);
1587 system.dispatch(&mut world);
1588 assert_eq!(*world.resource::<u64>(), 2);
1590
1591 let fill_source = fill_sources[&0];
1593 notify_mut(&world, nid).mark(fill_source);
1594 system.dispatch(&mut world);
1595 assert_eq!(*world.resource::<u64>(), 3);
1597 }
1598
1599 #[test]
1600 fn dynamic_source_registration() {
1601 let mut wb = WorldBuilder::new();
1604 wb.register::<u64>(0);
1605 wb.register(ReactorNotify::new(4, 8));
1606 wb.register(DeferredRemovals::default());
1607 let mut world = wb.build();
1608 let nid = world.id::<ReactorNotify>();
1609 let mut system = ReactorSystem::new(&world);
1610
1611 struct Ctx {
1612 _reactor_id: Token,
1613 value: u64,
1614 }
1615
1616 fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
1617 *out += ctx.value;
1618 }
1619
1620 let btc_md = {
1622 let reg = world.registry();
1623 let notify = notify_mut(&world, nid);
1624 let btc_md = notify.register_source();
1625 notify
1626 .register(
1627 |t| Ctx {
1628 _reactor_id: t,
1629 value: 10,
1630 },
1631 step,
1632 reg,
1633 )
1634 .subscribe(btc_md);
1635 btc_md
1636 };
1637
1638 notify_mut(&world, nid).mark(btc_md);
1639 system.dispatch(&mut world);
1640 assert_eq!(*world.resource::<u64>(), 10);
1641
1642 let eth_md = {
1644 let reg = world.registry();
1645 let notify = notify_mut(&world, nid);
1646 let eth_md = notify.register_source();
1647 notify
1648 .register(
1649 |t| Ctx {
1650 _reactor_id: t,
1651 value: 100,
1652 },
1653 step,
1654 reg,
1655 )
1656 .subscribe(eth_md);
1657 eth_md
1658 };
1659
1660 notify_mut(&world, nid).mark(btc_md);
1662 notify_mut(&world, nid).mark(eth_md);
1663 system.dispatch(&mut world);
1664 assert_eq!(*world.resource::<u64>(), 120); }
1666
1667 #[test]
1670 fn remove_source_and_reuse_slot() {
1671 let mut wb = WorldBuilder::new();
1672 wb.register::<u64>(0);
1673 wb.register(ReactorNotify::new(4, 8));
1674 wb.register(DeferredRemovals::default());
1675 let mut world = wb.build();
1676 let nid = world.id::<ReactorNotify>();
1677 let mut system = ReactorSystem::new(&world);
1678
1679 struct Ctx {
1680 _reactor_id: Token,
1681 value: u64,
1682 }
1683
1684 fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
1685 *out += ctx.value;
1686 }
1687
1688 let (src_a, src_b) = {
1690 let reg = world.registry();
1691 let notify = notify_mut(&world, nid);
1692 let src_a = notify.register_source();
1693 let src_b = notify.register_source();
1694 notify
1695 .register(
1696 |t| Ctx {
1697 _reactor_id: t,
1698 value: 10,
1699 },
1700 step,
1701 reg,
1702 )
1703 .subscribe(src_a);
1704 notify
1705 .register(
1706 |t| Ctx {
1707 _reactor_id: t,
1708 value: 100,
1709 },
1710 step,
1711 reg,
1712 )
1713 .subscribe(src_b);
1714 (src_a, src_b)
1715 };
1716
1717 notify_mut(&world, nid).remove_source(src_a);
1719
1720 notify_mut(&world, nid).mark(src_a);
1722 let ran = system.dispatch(&mut world);
1723 assert!(!ran);
1724
1725 notify_mut(&world, nid).mark(src_b);
1727 system.dispatch(&mut world);
1728 assert_eq!(*world.resource::<u64>(), 100);
1729
1730 let src_c = notify_mut(&world, nid).register_source();
1732 assert_eq!(src_c.0, src_a.0); let reg = world.registry();
1736 let notify = notify_mut(&world, nid);
1737 notify
1738 .register(
1739 |t| Ctx {
1740 _reactor_id: t,
1741 value: 1,
1742 },
1743 step,
1744 reg,
1745 )
1746 .subscribe(src_c);
1747
1748 notify_mut(&world, nid).mark(src_c);
1749 system.dispatch(&mut world);
1750 assert_eq!(*world.resource::<u64>(), 101); }
1752
1753 #[test]
1756 fn source_registry_basic() {
1757 let mut registry = SourceRegistry::new();
1758
1759 #[derive(Hash, Eq, PartialEq, Debug)]
1760 struct InstrumentId(u32);
1761
1762 let src_a = DataSource(0);
1763 let src_b = DataSource(1);
1764
1765 registry.insert(InstrumentId(1), src_a);
1766 registry.insert(InstrumentId(2), src_b);
1767
1768 assert_eq!(registry.get(&InstrumentId(1)), Some(src_a));
1769 assert_eq!(registry.get(&InstrumentId(2)), Some(src_b));
1770 assert_eq!(registry.get(&InstrumentId(3)), None);
1771 assert!(registry.contains(&InstrumentId(1)));
1772 assert!(!registry.contains(&InstrumentId(3)));
1773 }
1774
1775 #[test]
1776 fn source_registry_multiple_key_types() {
1777 let mut registry = SourceRegistry::new();
1778
1779 #[derive(Hash, Eq, PartialEq)]
1780 struct InstrumentId(u32);
1781
1782 #[derive(Hash, Eq, PartialEq)]
1783 struct StrategyId(u32);
1784
1785 let src_a = DataSource(0);
1786 let src_b = DataSource(1);
1787
1788 registry.insert(InstrumentId(1), src_a);
1790 registry.insert(StrategyId(1), src_b);
1791
1792 assert_eq!(registry.get(&InstrumentId(1)), Some(src_a));
1794 assert_eq!(registry.get(&StrategyId(1)), Some(src_b));
1795 }
1796
/// A tuple of string slices works as a registry key; a tuple differing in
/// one component is a different (absent) key.
#[test]
fn source_registry_tuple_keys() {
    let binance_btc = DataSource(42);

    let mut registry = SourceRegistry::new();
    registry.insert(("BTC", "Binance"), binance_btc);

    assert_eq!(registry.get(&("BTC", "Binance")), Some(binance_btc));
    assert_eq!(registry.get(&("ETH", "Binance")), None);
}
1807
/// `remove` hands back the stored source once; afterwards both `get` and a
/// second `remove` of the same key yield `None`.
#[test]
fn source_registry_remove() {
    let key = 42u64;
    let stored = DataSource(0);

    let mut registry = SourceRegistry::new();
    registry.insert(key, stored);

    assert_eq!(registry.remove(&key), Some(stored));
    assert_eq!(registry.get(&key), None);
    // Removing again finds nothing.
    assert_eq!(registry.remove(&key), None);
}
1819
/// Drives `ReactorNotify` through handles looked up in a `SourceRegistry`
/// that lives in the world as a resource.
///
/// Flow: register BTC and ETH sources, subscribe one reactor (adds 10) to
/// the BTC handle obtained from the registry, dispatch once, then remove BTC
/// from both the registry and the notifier. Marking the removed handle
/// afterwards is expected to make `dispatch` report `false`.
#[test]
fn source_registry_integrated_with_reactor_system() {
    let mut wb = WorldBuilder::new();
    wb.register::<u64>(0);
    wb.register(ReactorNotify::new(4, 8));
    wb.register(DeferredRemovals::default());
    wb.register(SourceRegistry::new());
    let mut world = wb.build();
    let nid = world.id::<ReactorNotify>();
    let mut system = ReactorSystem::new(&world);

    #[derive(Hash, Eq, PartialEq, Clone, Copy)]
    struct Instrument(u32);
    const BTC: Instrument = Instrument(0);
    const ETH: Instrument = Instrument(1);

    struct Ctx {
        _reactor_id: Token,
        value: u64,
    }

    fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
        *out += ctx.value;
    }

    let btc_src = notify_mut(&world, nid).register_source();
    let eth_src = notify_mut(&world, nid).register_source();

    world.resource_mut::<SourceRegistry>().insert(BTC, btc_src);
    world.resource_mut::<SourceRegistry>().insert(ETH, eth_src);

    {
        // Subscribe via the handle resolved from the registry, not the local.
        let reg = world.registry();
        let btc = world
            .resource::<SourceRegistry>()
            .get(&BTC)
            .expect("BTC was inserted into the registry above");
        let notify = notify_mut(&world, nid);
        notify
            .register(
                |t| Ctx {
                    _reactor_id: t,
                    value: 10,
                },
                step,
                reg,
            )
            .subscribe(btc);
    }

    notify_mut(&world, nid).mark(btc_src);
    system.dispatch(&mut world);
    assert_eq!(*world.resource::<u64>(), 10);

    // Take BTC out of the registry; the returned handle is the one to retire.
    let removed = world
        .resource_mut::<SourceRegistry>()
        .remove(&BTC)
        .expect("BTC was inserted into the registry above");
    assert_eq!(removed, btc_src);
    // Removing one key must leave the other entry in place.
    assert!(world.resource::<SourceRegistry>().contains(&ETH));
    notify_mut(&world, nid).remove_source(removed);

    // Marking the retired handle must not run anything.
    notify_mut(&world, nid).mark(btc_src);
    let ran = system.dispatch(&mut world);
    assert!(!ran);
}
1884
/// Inserting twice under the same key replaces the stored source.
#[test]
fn source_registry_overwrite_key() {
    let key = 42u32;
    let original = DataSource(0);
    let replacement = DataSource(1);

    let mut registry = SourceRegistry::new();

    registry.insert(key, original);
    assert_eq!(registry.get(&key), Some(original));

    // The second insert wins.
    registry.insert(key, replacement);
    assert_eq!(registry.get(&key), Some(replacement));
}
1900
/// Lookups on a freshly constructed registry find nothing.
#[test]
fn source_registry_empty_get() {
    let empty = SourceRegistry::new();
    let probe = 42u32;

    assert_eq!(empty.get(&probe), None);
    assert!(!empty.contains(&probe));
}
1908
/// Enum variants work as keys; only the inserted variant resolves.
#[test]
fn source_registry_enum_keys() {
    #[derive(Hash, Eq, PartialEq)]
    enum Venue {
        Binance,
        Coinbase,
    }

    let binance_src = DataSource(0);

    let mut registry = SourceRegistry::new();
    registry.insert(Venue::Binance, binance_src);

    assert_eq!(registry.get(&Venue::Binance), Some(binance_src));
    // `Coinbase` was never inserted.
    assert_eq!(registry.get(&Venue::Coinbase), None);
}
1924
/// A three-part tuple of newtypes acts as a single key; changing one
/// component (the strategy) produces a key that is not present.
#[test]
fn source_registry_composite_key() {
    #[derive(Hash, Eq, PartialEq)]
    struct StrategyId(u32);
    #[derive(Hash, Eq, PartialEq)]
    struct InstrumentId(u32);
    #[derive(Hash, Eq, PartialEq)]
    struct VenueId(u32);

    // Builds the composite key, varying only the strategy component.
    let key_for = |strategy: u32| (StrategyId(strategy), InstrumentId(0), VenueId(2));

    let stored = DataSource(5);
    let mut registry = SourceRegistry::new();
    registry.insert(key_for(1), stored);

    assert_eq!(registry.get(&key_for(1)), Some(stored));
    assert_eq!(registry.get(&key_for(2)), None);
}
1949
/// Walks a registry-backed reactor system through a whole lifecycle:
///
/// 1. add a BTC source with a reactor adding 10, then dispatch (total 10);
/// 2. add an ETH source with a reactor adding 100, mark both (total 120);
/// 3. remove BTC from registry and notifier, mark ETH only (total 220);
/// 4. add a SOL source, which the test expects to reuse BTC's index, with
///    a reactor adding 1000; mark SOL and ETH (total 1320).
#[test]
fn full_lifecycle_add_trade_remove() {
    #[derive(Hash, Eq, PartialEq, Clone, Copy)]
    struct Instrument(u32);

    struct Ctx {
        _reactor_id: Token,
        value: u64,
    }

    fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
        *out += ctx.value;
    }

    let mut builder = WorldBuilder::new();
    builder.register::<u64>(0);
    builder.register(ReactorNotify::new(4, 8));
    builder.register(DeferredRemovals::default());
    builder.register(SourceRegistry::new());
    let mut world = builder.build();
    let nid = world.id::<ReactorNotify>();
    let mut system = ReactorSystem::new(&world);

    // --- Phase 1: BTC comes online -------------------------------------
    let btc_src = notify_mut(&world, nid).register_source();
    world
        .resource_mut::<SourceRegistry>()
        .insert(Instrument(0), btc_src);

    {
        let reg = world.registry();
        notify_mut(&world, nid)
            .register(
                |token| Ctx {
                    _reactor_id: token,
                    value: 10,
                },
                step,
                reg,
            )
            .subscribe(btc_src);
    }

    notify_mut(&world, nid).mark(btc_src);
    system.dispatch(&mut world);
    assert_eq!(*world.resource::<u64>(), 10);

    // --- Phase 2: ETH is added while BTC is live -----------------------
    let eth_src = notify_mut(&world, nid).register_source();
    world
        .resource_mut::<SourceRegistry>()
        .insert(Instrument(1), eth_src);

    {
        let reg = world.registry();
        notify_mut(&world, nid)
            .register(
                |token| Ctx {
                    _reactor_id: token,
                    value: 100,
                },
                step,
                reg,
            )
            .subscribe(eth_src);
    }

    notify_mut(&world, nid).mark(btc_src);
    notify_mut(&world, nid).mark(eth_src);
    system.dispatch(&mut world);
    // 10 (previous) + 10 (BTC) + 100 (ETH)
    assert_eq!(*world.resource::<u64>(), 120);

    // --- Phase 3: BTC is retired ---------------------------------------
    let retired = world
        .resource_mut::<SourceRegistry>()
        .remove(&Instrument(0));
    notify_mut(&world, nid).remove_source(retired.unwrap());

    notify_mut(&world, nid).mark(eth_src);
    system.dispatch(&mut world);
    // Only the ETH reactor contributes: 120 + 100
    assert_eq!(*world.resource::<u64>(), 220);

    // --- Phase 4: SOL takes over the freed slot ------------------------
    let sol_src = notify_mut(&world, nid).register_source();
    world
        .resource_mut::<SourceRegistry>()
        .insert(Instrument(2), sol_src);
    // The new handle is expected to carry the index BTC used to have.
    assert_eq!(sol_src.0, btc_src.0);

    {
        let reg = world.registry();
        notify_mut(&world, nid)
            .register(
                |token| Ctx {
                    _reactor_id: token,
                    value: 1000,
                },
                step,
                reg,
            )
            .subscribe(sol_src);
    }

    notify_mut(&world, nid).mark(sol_src);
    notify_mut(&world, nid).mark(eth_src);
    system.dispatch(&mut world);
    // 220 + 1000 (SOL) + 100 (ETH); the old BTC reactor must not fire.
    assert_eq!(*world.resource::<u64>(), 1320);
}
2068
/// Two strategies ("MM" and "ARB") each own a separate source for the same
/// instrument. Marking the MM/BTC and ARB/BTC sources, resolved through the
/// registry, runs exactly those two reactors (1 + 100) and leaves the
/// MM/ETH reactor (2) untouched.
#[test]
fn multi_strategy_same_instrument() {
    #[derive(Hash, Eq, PartialEq, Clone, Copy)]
    struct StrategyInstrument(&'static str, &'static str);

    const MM_BTC: StrategyInstrument = StrategyInstrument("MM", "BTC");
    const MM_ETH: StrategyInstrument = StrategyInstrument("MM", "ETH");
    const ARB_BTC: StrategyInstrument = StrategyInstrument("ARB", "BTC");

    struct Ctx {
        _reactor_id: Token,
        value: u64,
    }

    fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
        *out += ctx.value;
    }

    let mut builder = WorldBuilder::new();
    builder.register::<u64>(0);
    builder.register(ReactorNotify::new(8, 16));
    builder.register(DeferredRemovals::default());
    builder.register(SourceRegistry::new());
    let mut world = builder.build();
    let nid = world.id::<ReactorNotify>();
    let mut system = ReactorSystem::new(&world);

    let reg = world.registry();
    let notify = notify_mut(&world, nid);

    // One source per (strategy, instrument) pair.
    let mm_btc_source = notify.register_source();
    let mm_eth_source = notify.register_source();
    let arb_btc_source = notify.register_source();

    // One reactor per source, each adding a distinct amount.
    notify
        .register(
            |token| Ctx {
                _reactor_id: token,
                value: 1,
            },
            step,
            reg,
        )
        .subscribe(mm_btc_source);
    notify
        .register(
            |token| Ctx {
                _reactor_id: token,
                value: 2,
            },
            step,
            reg,
        )
        .subscribe(mm_eth_source);
    notify
        .register(
            |token| Ctx {
                _reactor_id: token,
                value: 100,
            },
            step,
            reg,
        )
        .subscribe(arb_btc_source);

    world
        .resource_mut::<SourceRegistry>()
        .insert(MM_BTC, mm_btc_source);
    world
        .resource_mut::<SourceRegistry>()
        .insert(MM_ETH, mm_eth_source);
    world
        .resource_mut::<SourceRegistry>()
        .insert(ARB_BTC, arb_btc_source);

    // Resolve the two BTC handles through the registry before marking.
    let looked_up_mm_btc = world.resource::<SourceRegistry>().get(&MM_BTC).unwrap();
    let looked_up_arb_btc = world.resource::<SourceRegistry>().get(&ARB_BTC).unwrap();

    notify_mut(&world, nid).mark(looked_up_mm_btc);
    notify_mut(&world, nid).mark(looked_up_arb_btc);
    system.dispatch(&mut world);
    // 1 (MM/BTC) + 100 (ARB/BTC); MM/ETH was not marked.
    assert_eq!(*world.resource::<u64>(), 101);
}
2162
/// A reactor that queues its own token on `DeferredRemovals` runs once and
/// is gone afterwards (`reactor_count` drops to 0). The registry entry for
/// its source is still present after the reactor is removed, and marking
/// that source again makes `dispatch` report `false`.
#[test]
fn reactor_self_removal_with_registry_cleanup() {
    struct Ctx {
        reactor_id: Token,
    }

    // Bumps the counter and asks for its own deregistration.
    fn one_shot(ctx: &mut Ctx, mut out: ResMut<u64>, mut removals: ResMut<DeferredRemovals>) {
        *out += 1;
        removals.deregister(ctx.reactor_id);
    }

    let mut builder = WorldBuilder::new();
    builder.register::<u64>(0);
    builder.register(ReactorNotify::new(4, 8));
    builder.register(DeferredRemovals::default());
    builder.register(SourceRegistry::new());
    let mut world = builder.build();
    let nid = world.id::<ReactorNotify>();
    let mut system = ReactorSystem::new(&world);

    let trigger = notify_mut(&world, nid).register_source();
    world
        .resource_mut::<SourceRegistry>()
        .insert("one-shot", trigger);

    {
        let reg = world.registry();
        notify_mut(&world, nid)
            .register(|token| Ctx { reactor_id: token }, one_shot, reg)
            .subscribe(trigger);
    }

    // First dispatch: the reactor fires and removes itself.
    notify_mut(&world, nid).mark(trigger);
    system.dispatch(&mut world);
    assert_eq!(*world.resource::<u64>(), 1);
    assert_eq!(system.reactor_count(&world), 0);

    // The registry mapping is still there after the reactor is gone.
    assert!(world.resource::<SourceRegistry>().contains(&"one-shot"));

    // Second dispatch: nothing is left to run.
    notify_mut(&world, nid).mark(trigger);
    assert!(!system.dispatch(&mut world));
}
2211
/// Fifty reactors subscribed to one source all run on a single mark: each
/// adds 1, so the counter ends at 50.
#[test]
fn many_reactors_same_source() {
    struct Ctx {
        _reactor_id: Token,
    }

    fn step(_ctx: &mut Ctx, mut out: ResMut<u64>) {
        *out += 1;
    }

    let mut builder = WorldBuilder::new();
    builder.register::<u64>(0);
    builder.register(ReactorNotify::new(4, 64));
    builder.register(DeferredRemovals::default());
    let mut world = builder.build();
    let nid = world.id::<ReactorNotify>();
    let mut system = ReactorSystem::new(&world);

    let shared = notify_mut(&world, nid).register_source();

    {
        let reg = world.registry();
        let notify = notify_mut(&world, nid);
        (0..50).for_each(|_| {
            notify
                .register(|token| Ctx { _reactor_id: token }, step, reg)
                .subscribe(shared);
        });
    }

    assert_eq!(system.reactor_count(&world), 50);

    notify_mut(&world, nid).mark(shared);
    system.dispatch(&mut world);
    assert_eq!(*world.resource::<u64>(), 50);
}
2249
/// One reactor subscribed to five sources runs a single time when all five
/// are marked before one dispatch: the counter ends at 1, not 5.
#[test]
fn reactor_subscribes_to_multiple_sources() {
    struct Ctx {
        _reactor_id: Token,
    }

    fn step(_ctx: &mut Ctx, mut out: ResMut<u64>) {
        *out += 1;
    }

    let mut builder = WorldBuilder::new();
    builder.register::<u64>(0);
    builder.register(ReactorNotify::new(8, 8));
    builder.register(DeferredRemovals::default());
    let mut world = builder.build();
    let nid = world.id::<ReactorNotify>();
    let mut system = ReactorSystem::new(&world);

    let notify = notify_mut(&world, nid);
    let sources: Vec<DataSource> = (0..5).map(|_| notify.register_source()).collect();

    {
        // Chain every subscription onto the same registration handle.
        let reg = world.registry();
        let notify = notify_mut(&world, nid);
        let mut registration = notify.register(|token| Ctx { _reactor_id: token }, step, reg);
        for source in sources.iter().copied() {
            registration = registration.subscribe(source);
        }
    }

    for source in sources.iter().copied() {
        notify_mut(&world, nid).mark(source);
    }

    system.dispatch(&mut world);
    assert_eq!(*world.resource::<u64>(), 1);
}
2293
/// Marking a handle whose source was already removed does not panic, and
/// the following `dispatch` reports `false`.
#[test]
fn stale_data_source_is_noop() {
    let mut builder = WorldBuilder::new();
    builder.register(ReactorNotify::new(4, 4));
    builder.register(DeferredRemovals::default());
    let mut world = builder.build();
    let nid = world.id::<ReactorNotify>();
    let mut system = ReactorSystem::new(&world);

    // Create a source and retire it straight away, keeping the stale handle.
    let stale = notify_mut(&world, nid).register_source();
    notify_mut(&world, nid).remove_source(stale);

    notify_mut(&world, nid).mark(stale);
    assert!(!system.dispatch(&mut world));
}
2312
/// Removing the same source twice must not panic; the test passes simply by
/// running to completion.
#[test]
fn double_remove_source_is_noop() {
    let mut notify = ReactorNotify::new(4, 4);
    let handle = notify.register_source();

    for _ in 0..2 {
        notify.remove_source(handle);
    }
}
2320
/// A three-step context pipeline (read → double → store) wrapped in a
/// `PipelineReactor` runs on dispatch: with the resource set to 10 the
/// stored result is 20.
#[test]
fn pipeline_reactor_dispatch() {
    use crate::CtxPipelineBuilder;

    struct Ctx {
        _reactor_id: Token,
        instrument: &'static str,
    }

    // Step 1: fetch the current resource value (touching the context field).
    fn read_data(ctx: &mut Ctx, val: Res<u64>, _input: ()) -> u64 {
        let _ = ctx.instrument;
        *val
    }

    // Step 2: pure transformation.
    fn double(_ctx: &mut Ctx, x: u64) -> u64 {
        x * 2
    }

    // Step 3: write the result back to the resource.
    fn store(_ctx: &mut Ctx, mut out: ResMut<u64>, x: u64) {
        *out = x;
    }

    let mut builder = WorldBuilder::new();
    builder.register::<u64>(0);
    builder.register(ReactorNotify::new(4, 8));
    builder.register(DeferredRemovals::default());
    let mut world = builder.build();
    let nid = world.id::<ReactorNotify>();
    let mut system = ReactorSystem::new(&world);

    let reg = world.registry();

    let body = CtxPipelineBuilder::<Ctx, ()>::new()
        .then(read_data, reg)
        .then(double, reg)
        .then(store, reg)
        .build();

    let notify = notify_mut(&world, nid);
    let trigger = notify.register_source();

    let ctx = Ctx {
        _reactor_id: Token::new(0),
        instrument: "BTC",
    };
    notify
        .register_built(PipelineReactor::new(ctx, body))
        .subscribe(trigger);

    *world.resource_mut::<u64>() = 10;
    notify_mut(&world, nid).mark(trigger);
    system.dispatch(&mut world);

    // 10 read, doubled, stored.
    assert_eq!(*world.resource::<u64>(), 20);
}
2381
/// A fork/merge DAG wrapped in a `PipelineReactor` runs on dispatch. The
/// root reads the resource (5), one arm doubles it (10), the other adds ten
/// (15), and the merge stores their sum (25).
#[test]
fn dag_reactor_dispatch() {
    use crate::CtxDagBuilder;

    struct Ctx {
        _reactor_id: Token,
    }

    fn root(ctx: &mut Ctx, val: Res<u64>, _input: ()) -> u64 {
        let _ = ctx;
        *val
    }

    fn arm_double(_ctx: &mut Ctx, val: &u64) -> u64 {
        *val * 2
    }

    fn arm_add(_ctx: &mut Ctx, val: &u64) -> u64 {
        *val + 10
    }

    fn merge(_ctx: &mut Ctx, mut out: ResMut<u64>, a: &u64, b: &u64) {
        *out = *a + *b;
    }

    let mut builder = WorldBuilder::new();
    builder.register::<u64>(0);
    builder.register(ReactorNotify::new(4, 8));
    builder.register(DeferredRemovals::default());
    let mut world = builder.build();
    let nid = world.id::<ReactorNotify>();
    let mut system = ReactorSystem::new(&world);

    let reg = world.registry();

    let graph = CtxDagBuilder::<Ctx, ()>::new()
        .root(root, reg)
        .fork()
        .arm(|seed| seed.then(arm_double, reg))
        .arm(|seed| seed.then(arm_add, reg))
        .merge(merge, reg)
        .build();

    let notify = notify_mut(&world, nid);
    let trigger = notify.register_source();

    let ctx = Ctx {
        _reactor_id: Token::new(0),
    };
    notify
        .register_built(PipelineReactor::new(ctx, graph))
        .subscribe(trigger);

    *world.resource_mut::<u64>() = 5;
    notify_mut(&world, nid).mark(trigger);
    system.dispatch(&mut world);

    // (5 * 2) + (5 + 10)
    assert_eq!(*world.resource::<u64>(), 25);
}
2443
/// Two `PipelineReactor`s with separately built pipelines and different
/// contexts (factor 2 and factor 10) share one source; a single mark must
/// run both.
///
/// Each reactor reads the current value `v`, multiplies it by its factor and
/// adds the product back, i.e. `v -> v * (1 + factor)`. Starting from 5 the
/// result is `5 * 3 * 11 = 165` whichever reactor runs first, so the exact
/// value can be asserted. A looser `val > 5` check would also pass if only
/// one reactor ran (15 or 55).
#[test]
fn multiple_pipeline_reactors_different_bodies() {
    use crate::CtxPipelineBuilder;

    let mut wb = WorldBuilder::new();
    wb.register::<u64>(0);
    wb.register(ReactorNotify::new(4, 8));
    wb.register(DeferredRemovals::default());
    let mut world = wb.build();
    let nid = world.id::<ReactorNotify>();
    let mut system = ReactorSystem::new(&world);

    struct Ctx {
        _reactor_id: Token,
        factor: u64,
    }

    // Reads the resource and scales it by this reactor's factor.
    fn multiply(ctx: &mut Ctx, val: Res<u64>, _input: ()) -> u64 {
        *val * ctx.factor
    }

    // Adds the scaled value back onto the resource.
    fn accumulate(_ctx: &mut Ctx, mut out: ResMut<u64>, val: u64) {
        *out += val;
    }

    let reg = world.registry();

    let pipeline_a = CtxPipelineBuilder::<Ctx, ()>::new()
        .then(multiply, reg)
        .then(accumulate, reg)
        .build();

    let pipeline_b = CtxPipelineBuilder::<Ctx, ()>::new()
        .then(multiply, reg)
        .then(accumulate, reg)
        .build();

    let notify = notify_mut(&world, nid);
    let src = notify.register_source();

    notify
        .register_built(PipelineReactor::new(
            Ctx {
                _reactor_id: Token::new(0),
                factor: 2,
            },
            pipeline_a,
        ))
        .subscribe(src);

    notify
        .register_built(PipelineReactor::new(
            Ctx {
                _reactor_id: Token::new(1),
                factor: 10,
            },
            pipeline_b,
        ))
        .subscribe(src);

    *world.resource_mut::<u64>() = 5;
    notify_mut(&world, nid).mark(src);
    system.dispatch(&mut world);

    let val = *world.resource::<u64>();
    // 5 * (1 + 2) * (1 + 10): order-independent, and only reachable when
    // both reactors have run.
    assert_eq!(val, 165, "both reactors should have run, got {val}");
}
2517
/// A pipeline with a `guard` step only reaches the write behind it when the
/// predicate holds. With threshold 10, a resource value of 5 is left
/// untouched, while a value of 20 is overwritten with 999.
#[test]
fn pipeline_reactor_with_guard() {
    use crate::CtxPipelineBuilder;

    struct Ctx {
        _reactor_id: Token,
        threshold: u64,
    }

    fn read(_ctx: &mut Ctx, val: Res<u64>, _input: ()) -> u64 {
        *val
    }

    // Predicate used by the guard step.
    fn above_threshold(ctx: &mut Ctx, val: &u64) -> bool {
        *val > ctx.threshold
    }

    fn write(_ctx: &mut Ctx, mut out: ResMut<u64>, _val: u64) {
        *out = 999;
    }

    let mut builder = WorldBuilder::new();
    builder.register::<u64>(0);
    builder.register(ReactorNotify::new(4, 8));
    builder.register(DeferredRemovals::default());
    let mut world = builder.build();
    let nid = world.id::<ReactorNotify>();
    let mut system = ReactorSystem::new(&world);

    let reg = world.registry();

    let guarded = CtxPipelineBuilder::<Ctx, ()>::new()
        .then(read, reg)
        .guard(above_threshold, reg)
        .map(write, reg)
        .build();

    let notify = notify_mut(&world, nid);
    let trigger = notify.register_source();

    let ctx = Ctx {
        _reactor_id: Token::new(0),
        threshold: 10,
    };
    notify
        .register_built(PipelineReactor::new(ctx, guarded))
        .subscribe(trigger);

    // Below the threshold: the write must not happen.
    *world.resource_mut::<u64>() = 5;
    notify_mut(&world, nid).mark(trigger);
    system.dispatch(&mut world);
    assert_eq!(*world.resource::<u64>(), 5);

    // Above the threshold: the write goes through.
    *world.resource_mut::<u64>() = 20;
    notify_mut(&world, nid).mark(trigger);
    system.dispatch(&mut world);
    assert_eq!(*world.resource::<u64>(), 999);
}
2580
/// Two-phase registration through `World::resource_mut` only:
/// `create_reactor` reserves a token, the reactor is built with that token
/// in its context, and `insert_reactor` fills the reserved slot.
///
/// Each reactor adds `token.index() + 1`. The test expects a total of 1
/// after the first reactor runs alone and 4 after a second reactor is added
/// and both run on the same source.
#[test]
fn two_phase_registration_safe_api() {
    struct Ctx {
        reactor_id: Token,
        instrument: &'static str,
    }

    fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
        let _ = ctx.instrument;
        *out += ctx.reactor_id.index() as u64 + 1;
    }

    let mut builder = WorldBuilder::new();
    builder.register::<u64>(0);
    builder.register(ReactorNotify::new(4, 8));
    builder.register(DeferredRemovals::default());
    let mut world = builder.build();

    let mut system = ReactorSystem::new(&world);

    let src = world.resource_mut::<ReactorNotify>().register_source();

    // Phase 1: reserve the slot so the token is known up front.
    let first_token = world.resource_mut::<ReactorNotify>().create_reactor();

    // Phase 2: build the reactor around that token and install it.
    let first_reactor = step.into_reactor(
        Ctx {
            reactor_id: first_token,
            instrument: "BTC",
        },
        world.registry(),
    );
    world
        .resource_mut::<ReactorNotify>()
        .insert_reactor(first_token, first_reactor)
        .subscribe(src);

    world.resource_mut::<ReactorNotify>().mark(src);
    system.dispatch(&mut world);
    assert_eq!(*world.resource::<u64>(), 1);

    // Same two phases for a second reactor on the same source.
    let second_token = world.resource_mut::<ReactorNotify>().create_reactor();
    let second_reactor = step.into_reactor(
        Ctx {
            reactor_id: second_token,
            instrument: "ETH",
        },
        world.registry(),
    );
    world
        .resource_mut::<ReactorNotify>()
        .insert_reactor(second_token, second_reactor)
        .subscribe(src);

    world.resource_mut::<ReactorNotify>().mark(src);
    system.dispatch(&mut world);
    // Both reactors run this time, adding 3 between them.
    assert_eq!(*world.resource::<u64>(), 4);
}
2648
/// Two-phase registration also works with a `PipelineReactor`: the token
/// from `create_reactor` goes into the context, the read → double → store
/// pipeline is installed via `insert_reactor`, and dispatch turns 10 into 20.
#[test]
fn two_phase_with_pipeline_reactor() {
    use crate::CtxPipelineBuilder;

    struct Ctx {
        _reactor_id: Token,
    }

    fn read(ctx: &mut Ctx, val: Res<u64>, _: ()) -> u64 {
        let _ = ctx;
        *val
    }

    fn double(_ctx: &mut Ctx, x: u64) -> u64 {
        x * 2
    }

    fn store(_ctx: &mut Ctx, mut out: ResMut<u64>, x: u64) {
        *out = x;
    }

    let mut builder = WorldBuilder::new();
    builder.register::<u64>(0);
    builder.register(ReactorNotify::new(4, 8));
    builder.register(DeferredRemovals::default());
    let mut world = builder.build();

    let mut system = ReactorSystem::new(&world);

    let trigger = world.resource_mut::<ReactorNotify>().register_source();

    // Reserve the slot first so the context can hold the real token.
    let token = world.resource_mut::<ReactorNotify>().create_reactor();

    let reg = world.registry();
    let body = CtxPipelineBuilder::<Ctx, ()>::new()
        .then(read, reg)
        .then(double, reg)
        .then(store, reg)
        .build();
    let reactor = PipelineReactor::new(Ctx { _reactor_id: token }, body);

    world
        .resource_mut::<ReactorNotify>()
        .insert_reactor(token, reactor)
        .subscribe(trigger);

    *world.resource_mut::<u64>() = 10;
    world.resource_mut::<ReactorNotify>().mark(trigger);
    system.dispatch(&mut world);
    assert_eq!(*world.resource::<u64>(), 20);
}
2702}