1use std::any::{Any, TypeId};
123use std::hash::Hash;
124
125use nexus_notify::local::LocalNotify;
126use nexus_notify::{Events, Token};
127use rustc_hash::FxHashMap;
128
129use crate::ctx_pipeline::CtxStepCall;
130use crate::handler::Param;
131use crate::world::{Registry, Resource, ResourceId, World};
132
/// Handle identifying one data source inside a [`ReactorNotify`].
///
/// The wrapped `usize` is the slab key handed out by
/// [`ReactorNotify::register_source`]. The handle is a plain value: it is
/// `Copy`, comparable, and hashable.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct DataSource(pub usize);
143
/// Routes data-source notifications to the reactors subscribed to them.
///
/// Sources and reactors are both addressed by slab keys: a [`DataSource`]
/// wraps a key into `interests`, and the index of a reactor's [`Token`] is
/// its key in `reactors`.
pub struct ReactorNotify {
    /// Underlying notifier. `mark` forwards subscriber tokens to it and
    /// `poll` collects the marked tokens from it.
    notify: LocalNotify,

    /// Subscriber tokens per source, keyed by `DataSource.0`.
    interests: slab::Slab<Vec<Token>>,

    /// Reverse index: for each reactor token index, the sources that reactor
    /// has subscribed to. `remove_reactor` walks this list to unsubscribe the
    /// reactor from each of its sources.
    reactor_sources: Vec<Vec<DataSource>>,

    /// Reactor storage keyed by token index. A `None` entry is a slot that is
    /// allocated but currently holds no reactor: either reserved by
    /// `create_reactor` and not yet filled, or emptied by `take_reactor`.
    reactors: slab::Slab<Option<Box<dyn Reactor>>>,
}
176
177impl std::fmt::Debug for ReactorNotify {
179 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
180 f.debug_struct("ReactorNotify")
181 .field("num_sources", &self.interests.len())
182 .field("num_reactors", &self.reactors.len())
183 .field("notify", &self.notify)
184 .finish()
185 }
186}
187
188impl ReactorNotify {
189 pub fn new(source_capacity: usize, reactor_capacity: usize) -> Self {
191 Self {
192 notify: LocalNotify::with_capacity(reactor_capacity),
193 interests: slab::Slab::with_capacity(source_capacity),
194 reactor_sources: Vec::with_capacity(reactor_capacity),
195 reactors: slab::Slab::with_capacity(reactor_capacity),
196 }
197 }
198
199 pub fn register_source(&mut self) -> DataSource {
205 DataSource(self.interests.insert(Vec::new()))
206 }
207
208 pub fn remove_source(&mut self, source: DataSource) {
213 if self.interests.contains(source.0) {
214 self.interests.remove(source.0);
215 }
216 }
217
218 pub fn create_reactor(&mut self) -> Token {
245 let key = self.reactors.insert(None);
248 self.notify.ensure_capacity(key);
249 if key >= self.reactor_sources.len() {
251 self.reactor_sources.resize_with(key + 1, Vec::new);
252 }
253 Token::new(key)
254 }
255
256 pub fn insert_reactor(
266 &mut self,
267 token: Token,
268 reactor: impl Reactor + 'static,
269 ) -> ReactorRegistration<'_> {
270 let idx = token.index();
271 assert!(
272 self.reactors.contains(idx),
273 "token {} was not allocated by create_reactor",
274 idx,
275 );
276 assert!(
277 self.reactors[idx].is_none(),
278 "token {} was already filled",
279 idx,
280 );
281 self.reactors[idx] = Some(Box::new(reactor));
282 ReactorRegistration {
283 token,
284 notify: self,
285 }
286 }
287
288 pub fn register<C, Params, F: IntoReactor<C, Params>>(
295 &mut self,
296 ctx_fn: impl FnOnce(Token) -> C,
297 step: F,
298 registry: &Registry,
299 ) -> ReactorRegistration<'_> {
300 let key = self.reactors.vacant_key();
301 let token = Token::new(key);
302 self.notify.ensure_capacity(key);
303 if key >= self.reactor_sources.len() {
304 self.reactor_sources.resize_with(key + 1, Vec::new);
305 }
306 let ctx = ctx_fn(token);
307 let reactor = step.into_reactor(ctx, registry);
308 let inserted = self.reactors.insert(Some(Box::new(reactor)));
309 debug_assert_eq!(inserted, key);
310 ReactorRegistration {
311 token,
312 notify: self,
313 }
314 }
315
316 pub fn register_built(&mut self, reactor: impl Reactor + 'static) -> ReactorRegistration<'_> {
323 let key = self.reactors.vacant_key();
324 let token = Token::new(key);
325 self.notify.ensure_capacity(key);
326 if key >= self.reactor_sources.len() {
327 self.reactor_sources.resize_with(key + 1, Vec::new);
328 }
329 let inserted = self.reactors.insert(Some(Box::new(reactor)));
330 debug_assert_eq!(inserted, key);
331 ReactorRegistration {
332 token,
333 notify: self,
334 }
335 }
336
337 pub fn subscribe(&mut self, reactor: Token, source: DataSource) {
344 if let Some(subscribers) = self.interests.get_mut(source.0) {
345 if !subscribers.contains(&reactor) {
346 subscribers.push(reactor);
347 let idx = reactor.index();
349 debug_assert!(
350 idx < self.reactor_sources.len(),
351 "reactor_sources missing entry for reactor token {}",
352 idx,
353 );
354 self.reactor_sources[idx].push(source);
355 }
356 }
357 }
358
359 pub fn unsubscribe(&mut self, reactor: Token, source: DataSource) {
361 if let Some(subscribers) = self.interests.get_mut(source.0) {
362 subscribers.retain(|&t| t != reactor);
363 }
364 if let Some(sources) = self.reactor_sources.get_mut(reactor.index()) {
365 sources.retain(|&s| s != source);
366 }
367 }
368
369 #[inline]
376 pub fn mark(&mut self, source: DataSource) {
377 if let Some(subscribers) = self.interests.get(source.0) {
378 for &reactor_token in subscribers {
379 self.notify.mark(reactor_token);
380 }
381 }
382 }
383
    /// Polls the inner notifier, delivering the marked reactor tokens into
    /// `events`.
    #[inline]
    pub(crate) fn poll(&mut self, events: &mut Events) {
        self.notify.poll(events);
    }
389
390 #[inline]
393 pub(crate) fn take_reactor(&mut self, idx: usize) -> Option<Box<dyn Reactor>> {
394 self.reactors.get_mut(idx).and_then(Option::take)
395 }
396
397 #[inline]
403 pub(crate) fn put_reactor(&mut self, idx: usize, reactor: Box<dyn Reactor>) {
404 self.reactors[idx] = Some(reactor);
405 }
406
407 pub fn remove_reactor(&mut self, token: Token) {
412 let idx = token.index();
413 if self.reactors.contains(idx) {
414 self.reactors.remove(idx);
415 if let Some(sources) = self.reactor_sources.get_mut(idx) {
417 for &source in sources.iter() {
418 if let Some(subscribers) = self.interests.get_mut(source.0) {
419 subscribers.retain(|&t| t != token);
420 }
421 }
422 sources.clear();
423 }
424 }
425 }
426
    /// Reports whether the inner notifier has any notification pending
    /// (delegates to `LocalNotify::has_notified`).
    pub fn has_notified(&self) -> bool {
        self.notify.has_notified()
    }
433
    /// Returns the inner notifier's pending-notification count (delegates to
    /// `LocalNotify::notified_count`).
    pub fn notified_count(&self) -> usize {
        self.notify.notified_count()
    }
438
    /// Returns the number of currently registered data sources.
    pub fn source_count(&self) -> usize {
        self.interests.len()
    }
443
    /// Returns the number of allocated reactor slots. This includes slots
    /// reserved by `create_reactor` that have not been filled yet.
    pub fn reactor_count(&self) -> usize {
        self.reactors.len()
    }
448}
449
// Marker impl so `ReactorNotify` can be stored in and fetched from a `World`.
impl Resource for ReactorNotify {}
451
/// Builder-style handle returned by the `ReactorNotify` registration methods.
///
/// It borrows the notifier mutably so subscriptions for the freshly
/// registered reactor can be chained.
pub struct ReactorRegistration<'a> {
    /// Token of the reactor that was just registered.
    token: Token,
    /// The notifier the reactor was registered with.
    notify: &'a mut ReactorNotify,
}
461
462impl ReactorRegistration<'_> {
463 pub fn subscribe(self, source: DataSource) -> Self {
465 self.notify.subscribe(self.token, source);
466 self
467 }
468
469 pub fn token(&self) -> Token {
471 self.token
472 }
473}
474
/// A unit of work that is run against the [`World`] when it is woken.
///
/// Implementors must be `Send`.
pub trait Reactor: Send {
    /// Executes the reactor with mutable access to the world.
    fn run(&mut self, world: &mut World);

    /// Returns a diagnostic name for the reactor; defaults to `"<unnamed>"`.
    fn name(&self) -> &'static str {
        "<unnamed>"
    }
}
497
/// A reactor built from a step function plus an owned context.
///
/// `Params` is the tuple of resource parameters the function takes after the
/// context (`()` for a context-only function).
pub struct ReactorFn<C, F, Params: Param> {
    /// Per-reactor context, passed to the step function as `&mut C`.
    pub ctx: C,
    /// The step function.
    f: F,
    /// Parameter state produced by `Param::init` when the reactor was built.
    state: Params::State,
    /// Diagnostic name returned by `Reactor::name`.
    name: &'static str,
}
514
/// A reactor whose body is a boxed, type-erased [`CtxStepCall`].
pub struct PipelineReactor<C> {
    /// Per-reactor context, passed to the body as `&mut C`.
    pub ctx: C,
    /// The boxed body; it takes `()` as input and produces `()`.
    body: Box<dyn CtxStepCall<C, (), Out = ()> + Send>,
}
546
547impl<C: Send + 'static> PipelineReactor<C> {
548 pub fn new(ctx: C, body: impl CtxStepCall<C, (), Out = ()> + Send + 'static) -> Self {
554 Self {
555 ctx,
556 body: Box::new(body),
557 }
558 }
559}
560
561impl<C: Send + 'static> Reactor for PipelineReactor<C> {
562 fn run(&mut self, world: &mut World) {
563 self.body.call(&mut self.ctx, world, ());
564 }
565
566 fn name(&self) -> &'static str {
567 std::any::type_name::<C>()
568 }
569}
570
/// Conversion from a step function into a concrete [`Reactor`].
///
/// `C` is the context type the function receives first; `Params` is the
/// tuple of resource parameters that follow it.
#[diagnostic::on_unimplemented(
    message = "this function cannot be used as a reactor step",
    note = "reactor step signature: `fn(&mut C, Params...)` — context first, then resources",
    note = "closures with resource parameters are not supported — use a named `fn`"
)]
pub trait IntoReactor<C, Params> {
    /// The reactor type produced by the conversion.
    type Reactor: Reactor + 'static;

    /// Builds the reactor from `self`, taking ownership of `ctx` and using
    /// `registry` to initialise parameter state.
    fn into_reactor(self, ctx: C, registry: &Registry) -> Self::Reactor;
}
603
604impl<C: Send + 'static, F: FnMut(&mut C) + Send + 'static> Reactor for ReactorFn<C, F, ()> {
609 fn run(&mut self, _world: &mut World) {
610 (self.f)(&mut self.ctx);
611 }
612
613 fn name(&self) -> &'static str {
614 self.name
615 }
616}
617
618impl<C: Send + 'static, F: FnMut(&mut C) + Send + 'static> IntoReactor<C, ()> for F {
619 type Reactor = ReactorFn<C, F, ()>;
620
621 fn into_reactor(self, ctx: C, registry: &Registry) -> Self::Reactor {
622 ReactorFn {
623 ctx,
624 f: self,
625 state: <() as Param>::init(registry),
626 name: std::any::type_name::<F>(),
627 }
628 }
629}
630
// Generates, for one tuple of parameter type names `$P...`:
//   * `Reactor for ReactorFn<C, F, (P...,)>`, which fetches the parameters
//     from the world and calls the step function, and
//   * `IntoReactor<C, (P...,)> for F`, which builds that `ReactorFn`.
macro_rules! impl_into_reactor {
    ($($P:ident),+) => {
        impl<C: Send + 'static, F: Send + 'static, $($P: Param + 'static),+>
            Reactor for ReactorFn<C, F, ($($P,)+)>
        where
            // `F` must be callable both with the parameter types as written
            // and with their fetched `Item<'a>` forms.
            for<'a> &'a mut F:
                FnMut(&mut C, $($P,)+) +
                FnMut(&mut C, $($P::Item<'a>,)+),
        {
            // The type names `$P` are reused as variable names below.
            #[allow(non_snake_case)]
            fn run(&mut self, world: &mut World) {
                // Helper that performs the actual call.
                // NOTE(review): presumably exists to steer inference towards
                // one of the two `FnMut` bounds above — confirm.
                #[allow(clippy::too_many_arguments)]
                fn call_inner<Ctx, $($P,)+>(
                    mut f: impl FnMut(&mut Ctx, $($P,)+),
                    ctx: &mut Ctx,
                    $($P: $P,)+
                ) {
                    f(ctx, $($P,)+);
                }

                // Debug builds only: reset the world's borrow tracking
                // before fetching.
                #[cfg(debug_assertions)]
                world.clear_borrows();
                // NOTE(review): `Param::fetch` is unsafe; its contract is not
                // visible here. Presumably soundness relies on the access
                // check performed in `into_reactor` — confirm.
                let ($($P,)+) = unsafe {
                    <($($P,)+) as Param>::fetch(world, &mut self.state)
                };
                call_inner(&mut self.f, &mut self.ctx, $($P,)+);
            }

            // Returns the name recorded when the reactor was built.
            fn name(&self) -> &'static str {
                self.name
            }
        }

        impl<C: Send + 'static, F: Send + 'static, $($P: Param + 'static),+>
            IntoReactor<C, ($($P,)+)> for F
        where
            for<'a> &'a mut F:
                FnMut(&mut C, $($P,)+) +
                FnMut(&mut C, $($P::Item<'a>,)+),
        {
            type Reactor = ReactorFn<C, F, ($($P,)+)>;

            fn into_reactor(self, ctx: C, registry: &Registry) -> Self::Reactor {
                let state = <($($P,)+) as Param>::init(registry);
                {
                    // Hand the registry each parameter's resource id and type
                    // name so it can check the combined access set.
                    #[allow(non_snake_case)]
                    let ($($P,)+) = &state;
                    registry.check_access(&[
                        $(
                            (<$P as Param>::resource_id($P),
                             std::any::type_name::<$P>()),
                        )+
                    ]);
                }
                ReactorFn {
                    ctx,
                    f: self,
                    state,
                    // The function's type name doubles as the reactor name.
                    name: std::any::type_name::<F>(),
                }
            }
        }
    };
}

// Instantiate the impls for every tuple arity that `all_tuples!` provides.
all_tuples!(impl_into_reactor);
704
/// Queue of reactor tokens to remove once the current dispatch has finished
/// running reactors.
#[derive(Default)]
pub struct DeferredRemovals {
    /// Tokens queued by `deregister`, drained by `ReactorSystem::dispatch`.
    tokens: Vec<Token>,
}
718
impl DeferredRemovals {
    /// Queues `token` for removal. The reactor is not removed here; removal
    /// happens when the queue is drained.
    pub fn deregister(&mut self, token: Token) {
        self.tokens.push(token);
    }

    /// Takes the queued tokens, leaving an empty queue behind.
    #[inline]
    pub(crate) fn take(&mut self) -> Vec<Token> {
        std::mem::take(&mut self.tokens)
    }

    /// Installs `tokens` as the queue's buffer, replacing the current one.
    ///
    /// `tokens` is expected to be empty (checked in debug builds only).
    /// `dispatch` passes back the drained buffer it obtained from `take`.
    #[inline]
    pub(crate) fn put(&mut self, tokens: Vec<Token>) {
        debug_assert!(tokens.is_empty(), "put() expects a drained Vec");
        self.tokens = tokens;
    }

    /// Returns `true` when no removals are queued.
    pub fn is_empty(&self) -> bool {
        self.tokens.is_empty()
    }
}
747
// Marker impl so `DeferredRemovals` can be stored in and fetched from a `World`.
impl Resource for DeferredRemovals {}
749
/// Lookup table from user-defined keys to [`DataSource`] handles.
///
/// Keys of different types live in separate maps, so equal-looking keys of
/// different types never collide.
#[derive(Default)]
pub struct SourceRegistry {
    /// One type-erased `FxHashMap<K, DataSource>` per key type `K`, keyed by
    /// `TypeId::of::<K>()`.
    maps: FxHashMap<TypeId, Box<dyn Any + Send>>,
}
783
784impl SourceRegistry {
785 pub fn new() -> Self {
787 Self::default()
788 }
789
790 pub fn insert<K: Hash + Eq + Send + 'static>(&mut self, key: K, source: DataSource) {
794 self.get_or_create_map::<K>().insert(key, source);
795 }
796
797 pub fn get<K: Hash + Eq + Send + 'static>(&self, key: &K) -> Option<DataSource> {
801 self.get_map::<K>().and_then(|map| map.get(key)).copied()
802 }
803
804 pub fn remove<K: Hash + Eq + Send + 'static>(&mut self, key: &K) -> Option<DataSource> {
807 self.get_map_mut::<K>().and_then(|map| map.remove(key))
808 }
809
810 pub fn contains<K: Hash + Eq + Send + 'static>(&self, key: &K) -> bool {
812 self.get_map::<K>().is_some_and(|map| map.contains_key(key))
813 }
814
815 fn get_map<K: Hash + Eq + Send + 'static>(&self) -> Option<&FxHashMap<K, DataSource>> {
816 self.maps.get(&TypeId::of::<K>()).map(|boxed| {
817 boxed.downcast_ref::<FxHashMap<K, DataSource>>().unwrap()
819 })
820 }
821
822 fn get_map_mut<K: Hash + Eq + Send + 'static>(
823 &mut self,
824 ) -> Option<&mut FxHashMap<K, DataSource>> {
825 self.maps.get_mut(&TypeId::of::<K>()).map(|boxed| {
826 boxed.downcast_mut::<FxHashMap<K, DataSource>>().unwrap()
828 })
829 }
830
831 fn get_or_create_map<K: Hash + Eq + Send + 'static>(
832 &mut self,
833 ) -> &mut FxHashMap<K, DataSource> {
834 self.maps
835 .entry(TypeId::of::<K>())
836 .or_insert_with(|| Box::<FxHashMap<K, DataSource>>::default())
837 .downcast_mut::<FxHashMap<K, DataSource>>()
839 .unwrap()
840 }
841}
842
// Marker impl so `SourceRegistry` can be stored in and fetched from a `World`.
impl Resource for SourceRegistry {}
844
/// Drives reactors: polls `ReactorNotify`, runs the woken reactors, and
/// applies `DeferredRemovals`.
pub struct ReactorSystem {
    /// Event buffer that each dispatch polls into.
    events: Events,

    /// Resource id of `ReactorNotify`, resolved once at construction.
    notify_id: ResourceId,
    /// Resource id of `DeferredRemovals`, resolved once at construction.
    removals_id: ResourceId,
}
863
864impl ReactorSystem {
865 pub fn new(world: &World) -> Self {
869 Self {
870 events: Events::with_capacity(256),
871 notify_id: world.id::<ReactorNotify>(),
872 removals_id: world.id::<DeferredRemovals>(),
873 }
874 }
875
    /// Runs every reactor woken since the previous dispatch, then applies
    /// the deferred removals.
    ///
    /// Steps, in order:
    /// 1. poll `ReactorNotify` into the internal event buffer;
    /// 2. for each polled token, move the reactor out of its slot, run it
    ///    against `world`, and put it back;
    /// 3. drain `DeferredRemovals` and remove each listed reactor.
    ///
    /// Returns `true` if the poll produced at least one token, even when the
    /// slot for a token turned out to be empty.
    pub fn dispatch(&mut self, world: &mut World) -> bool {
        // Keep a raw pointer rather than a reference so `world` can be lent
        // to reactors while the notifier is still reachable between runs.
        // NOTE(review): `World::get_mut` is unsafe and its contract is not
        // visible here; presumably the id must be valid and the resource
        // must not be borrowed elsewhere — confirm.
        let notify_ptr: *mut ReactorNotify =
            unsafe { world.get_mut::<ReactorNotify>(self.notify_id) };

        {
            // Short-lived reference, used only for the poll.
            let notify = unsafe { &mut *notify_ptr };
            notify.poll(&mut self.events);
        }
        let ran = !self.events.is_empty();

        for token in self.events.iter() {
            let idx = token.index();
            // Move the reactor out so no reference into the notifier is live
            // while the reactor runs with `&mut World`.
            let reactor = {
                let notify = unsafe { &mut *notify_ptr };
                notify.take_reactor(idx)
            };
            if let Some(mut reactor) = reactor {
                reactor.run(world);
                // Re-derive the reference after the run and return the reactor.
                let notify = unsafe { &mut *notify_ptr };
                notify.put_reactor(idx, reactor);
            }
        }

        // Apply the removals that reactors queued during this dispatch.
        let removals = unsafe { world.get_mut::<DeferredRemovals>(self.removals_id) };
        let mut pending = removals.take();
        if !pending.is_empty() {
            let notify = unsafe { &mut *notify_ptr };
            // Popping leaves `pending` empty while keeping its allocation.
            while let Some(token) = pending.pop() {
                notify.remove_reactor(token);
            }
        }
        // Hand the drained buffer back so its capacity is reused.
        let removals = unsafe { world.get_mut::<DeferredRemovals>(self.removals_id) };
        removals.put(pending);

        ran
    }
933
    /// Returns the number of reactor slots in `world`'s `ReactorNotify`.
    pub fn reactor_count(&self, world: &World) -> usize {
        world.resource::<ReactorNotify>().reactor_count()
    }
938}
939
940#[cfg(test)]
945mod tests {
946 use super::*;
947 use crate::{Res, ResMut, WorldBuilder};
948
    /// A context-only step function becomes a reactor and mutates its context.
    #[test]
    fn reactor_fn_arity0() {
        let wb = WorldBuilder::new();
        let mut world = wb.build();
        let reg = world.registry();

        struct Ctx {
            count: u32,
        }

        fn step(ctx: &mut Ctx) {
            ctx.count += 1;
        }

        let mut reactor = step.into_reactor(Ctx { count: 0 }, reg);
        reactor.run(&mut world);
        // One run increments the counter exactly once.
        assert_eq!(reactor.ctx.count, 1);
    }
969
    /// A step function with resource parameters reads and writes world
    /// resources.
    #[test]
    fn reactor_fn_with_params() {
        let mut wb = WorldBuilder::new();
        wb.register::<u64>(10);
        wb.register::<u32>(0);
        let mut world = wb.build();
        let reg = world.registry();

        struct Ctx {
            multiplier: u64,
        }

        fn step(ctx: &mut Ctx, val: Res<u64>, mut out: ResMut<u32>) {
            *out = (*val * ctx.multiplier) as u32;
        }

        let mut reactor = step.into_reactor(Ctx { multiplier: 5 }, reg);
        reactor.run(&mut world);
        // 10 (the u64 resource) * 5 (the multiplier) is written to the u32.
        assert_eq!(*world.resource::<u32>(), 50);
    }
990
    /// Builds a no-op reactor with a unit context, for tests that only
    /// exercise notification routing.
    fn dummy_reactor() -> ReactorFn<(), fn(&mut ()), ()> {
        ReactorFn {
            ctx: (),
            f: (|_: &mut ()| {}) as fn(&mut ()),
            state: (),
            name: "dummy",
        }
    }
1001
    /// Marking a source wakes all of its subscribers and nobody else.
    #[test]
    fn reactor_notify_mark_fans_out() {
        let mut notify = ReactorNotify::new(4, 8);
        let mut events = Events::with_capacity(8);

        let src = notify.register_source();
        let a1 = notify.register_built(dummy_reactor()).token();
        let a2 = notify.register_built(dummy_reactor()).token();
        // Registered but never subscribed: must not be woken.
        let _a3 = notify.register_built(dummy_reactor()).token();

        notify.subscribe(a1, src);
        notify.subscribe(a2, src);
        notify.mark(src);
        notify.notify.poll(&mut events);

        assert_eq!(events.len(), 2);
        assert!(events.as_slice().contains(&a1));
        assert!(events.as_slice().contains(&a2));
    }
1023
    /// A reactor subscribed to two sources that are both marked is reported
    /// once per poll.
    #[test]
    fn reactor_notify_dedup_across_sources() {
        let mut notify = ReactorNotify::new(4, 8);
        let mut events = Events::with_capacity(8);

        let src1 = notify.register_source();
        let src2 = notify.register_source();
        let reactor = notify.register_built(dummy_reactor()).token();

        notify.subscribe(reactor, src1);
        notify.subscribe(reactor, src2);

        notify.mark(src1);
        notify.mark(src2);

        notify.notify.poll(&mut events);
        // Two marks, one event.
        assert_eq!(events.len(), 1);
        assert_eq!(events.as_slice()[0], reactor);
    }
1043
    /// Removing a reactor drops its subscriptions: marking the source
    /// afterwards produces no events.
    #[test]
    fn reactor_notify_remove_reactor() {
        let mut notify = ReactorNotify::new(4, 8);
        let mut events = Events::with_capacity(8);

        let src = notify.register_source();

        struct Ctx;
        let token = notify
            .register_built(ReactorFn {
                ctx: Ctx,
                f: (|_: &mut Ctx| {}) as fn(&mut Ctx),
                state: (),
                name: "test",
            })
            .token();
        notify.subscribe(token, src);

        notify.remove_reactor(token);
        notify.mark(src);
        notify.notify.poll(&mut events);
        assert!(events.is_empty());
    }
1067
    /// Test helper: fetches `ReactorNotify` mutably through a shared `World`
    /// reference.
    ///
    /// NOTE(review): this hands out `&mut` from `&World` via the unsafe
    /// `World::get_mut`. The tests appear to rely on never holding two such
    /// references at once — confirm against `World::get_mut`'s contract.
    fn notify_mut(world: &World, id: ResourceId) -> &mut ReactorNotify {
        unsafe { world.get_mut::<ReactorNotify>(id) }
    }
1076
    /// Two reactors on one source both run in a single dispatch.
    #[test]
    fn reactor_system_dispatch() {
        let mut wb = WorldBuilder::new();
        wb.register::<u64>(0);
        wb.register(ReactorNotify::new(4, 8));
        wb.register(DeferredRemovals::default());
        let mut world = wb.build();
        let reg = world.registry();
        let nid = world.id::<ReactorNotify>();

        let mut system = ReactorSystem::new(&world);

        struct Ctx {
            _reactor_id: Token,
            increment: u64,
        }

        fn step(ctx: &mut Ctx, mut val: ResMut<u64>) {
            *val += ctx.increment;
        }

        let notify = notify_mut(&world, nid);
        let src = notify.register_source();
        notify
            .register(
                |t| Ctx {
                    _reactor_id: t,
                    increment: 10,
                },
                step,
                reg,
            )
            .subscribe(src);
        notify
            .register(
                |t| Ctx {
                    _reactor_id: t,
                    increment: 5,
                },
                step,
                reg,
            )
            .subscribe(src);

        notify_mut(&world, nid).mark(src);
        let ran = system.dispatch(&mut world);

        assert!(ran);
        // Both increments were applied: 10 + 5.
        assert_eq!(*world.resource::<u64>(), 15);
    }
1128
    /// A reactor that deregisters itself through `DeferredRemovals` is
    /// removed at the end of that dispatch and never runs again.
    #[test]
    fn reactor_system_deferred_removal() {
        let mut wb = WorldBuilder::new();
        wb.register::<u64>(0);
        wb.register(ReactorNotify::new(4, 8));
        wb.register(DeferredRemovals::default());
        let mut world = wb.build();
        let reg = world.registry();
        let nid = world.id::<ReactorNotify>();

        let mut system = ReactorSystem::new(&world);

        struct Ctx {
            reactor_id: Token,
            runs: u64,
        }

        // Increments the counter; asks to be removed on its second run.
        fn step(ctx: &mut Ctx, mut val: ResMut<u64>, mut removals: ResMut<DeferredRemovals>) {
            *val += 1;
            ctx.runs += 1;
            if ctx.runs >= 2 {
                removals.deregister(ctx.reactor_id);
            }
        }

        let notify = notify_mut(&world, nid);
        let src = notify.register_source();
        notify
            .register(
                |t| Ctx {
                    reactor_id: t,
                    runs: 0,
                },
                step,
                reg,
            )
            .subscribe(src);

        assert_eq!(system.reactor_count(&world), 1);

        // First run: still registered.
        notify_mut(&world, nid).mark(src);
        system.dispatch(&mut world);
        assert_eq!(*world.resource::<u64>(), 1);
        assert_eq!(system.reactor_count(&world), 1);

        // Second run: the reactor runs, then is removed.
        notify_mut(&world, nid).mark(src);
        system.dispatch(&mut world);
        assert_eq!(*world.resource::<u64>(), 2);
        assert_eq!(system.reactor_count(&world), 0);

        // Third mark: nothing is subscribed any more, so nothing runs.
        notify_mut(&world, nid).mark(src);
        let ran = system.dispatch(&mut world);
        assert!(!ran);
        assert_eq!(*world.resource::<u64>(), 2);
    }
1187
    /// Marking one source wakes only the reactors subscribed to that source.
    #[test]
    fn reactor_system_only_subscribed_wake() {
        let mut wb = WorldBuilder::new();
        wb.register::<u64>(0);
        wb.register(ReactorNotify::new(4, 8));
        wb.register(DeferredRemovals::default());
        let mut world = wb.build();
        let reg = world.registry();
        let nid = world.id::<ReactorNotify>();

        let mut system = ReactorSystem::new(&world);

        struct Ctx {
            _reactor_id: Token,
            value: u64,
        }

        fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
            *out += ctx.value;
        }

        let notify = notify_mut(&world, nid);
        let btc = notify.register_source();
        let eth = notify.register_source();

        notify
            .register(
                |t| Ctx {
                    _reactor_id: t,
                    value: 10,
                },
                step,
                reg,
            )
            .subscribe(btc);
        notify
            .register(
                |t| Ctx {
                    _reactor_id: t,
                    value: 100,
                },
                step,
                reg,
            )
            .subscribe(eth);

        // Only the btc subscriber (+10) runs.
        notify_mut(&world, nid).mark(btc);
        system.dispatch(&mut world);
        assert_eq!(*world.resource::<u64>(), 10);

        // Only the eth subscriber (+100) runs.
        notify_mut(&world, nid).mark(eth);
        system.dispatch(&mut world);
        assert_eq!(*world.resource::<u64>(), 110);
    }
1244
    /// Reactors can be registered after dispatching has already started.
    #[test]
    fn runtime_registration() {
        let mut wb = WorldBuilder::new();
        wb.register::<u64>(0);
        wb.register(ReactorNotify::new(4, 8));
        wb.register(DeferredRemovals::default());
        let mut world = wb.build();
        let nid = world.id::<ReactorNotify>();

        let mut system = ReactorSystem::new(&world);

        struct Ctx {
            _reactor_id: Token,
            value: u64,
        }

        fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
            *out += ctx.value;
        }

        // Initial registration: one reactor adding 10.
        let src = {
            let reg = world.registry();
            let notify = notify_mut(&world, nid);
            let src = notify.register_source();
            notify
                .register(
                    |t| Ctx {
                        _reactor_id: t,
                        value: 10,
                    },
                    step,
                    reg,
                )
                .subscribe(src);
            src
        };

        notify_mut(&world, nid).mark(src);
        system.dispatch(&mut world);
        assert_eq!(*world.resource::<u64>(), 10);

        // Register a second reactor (adding 100) on the same source.
        {
            let reg = world.registry();
            notify_mut(&world, nid)
                .register(
                    |t| Ctx {
                        _reactor_id: t,
                        value: 100,
                    },
                    step,
                    reg,
                )
                .subscribe(src);
        }

        notify_mut(&world, nid).mark(src);
        system.dispatch(&mut world);
        // 10 from the first dispatch, plus 10 + 100 from the second.
        assert_eq!(*world.resource::<u64>(), 120);
    }
1308
    /// After a reactor is removed, the next registration reuses its slab
    /// key, and the new reactor works under the reused token.
    #[test]
    fn register_after_remove_reuses_key() {
        let mut wb = WorldBuilder::new();
        wb.register::<u64>(0);
        wb.register(ReactorNotify::new(4, 8));
        wb.register(DeferredRemovals::default());
        let mut world = wb.build();
        let nid = world.id::<ReactorNotify>();

        let mut system = ReactorSystem::new(&world);

        struct Ctx {
            reactor_id: Token,
            value: u64,
        }

        // Adds its value; the reactor with value 10 deregisters itself.
        fn step(ctx: &mut Ctx, mut out: ResMut<u64>, mut removals: ResMut<DeferredRemovals>) {
            *out += ctx.value;
            if ctx.value == 10 {
                removals.deregister(ctx.reactor_id);
            }
        }

        let src = {
            let reg = world.registry();
            let notify = notify_mut(&world, nid);
            let src = notify.register_source();
            notify
                .register(
                    |t| Ctx {
                        reactor_id: t,
                        value: 10,
                    },
                    step,
                    reg,
                )
                .subscribe(src);
            src
        };

        // The first reactor runs once and removes itself.
        notify_mut(&world, nid).mark(src);
        system.dispatch(&mut world);
        assert_eq!(*world.resource::<u64>(), 10);
        assert_eq!(system.reactor_count(&world), 0);

        {
            let reg = world.registry();
            let notify = notify_mut(&world, nid);
            let token = notify
                .register(
                    |t| Ctx {
                        reactor_id: t,
                        value: 100,
                    },
                    step,
                    reg,
                )
                .token();
            notify.subscribe(token, src);
            // The freed key 0 is handed out again.
            assert_eq!(token.index(), 0);
        }

        notify_mut(&world, nid).mark(src);
        system.dispatch(&mut world);
        // 10 from the removed reactor plus 100 from the new one.
        assert_eq!(*world.resource::<u64>(), 110);
        assert_eq!(system.reactor_count(&world), 1);
    }
1379
    /// A reactor may take `ResMut<ReactorNotify>` as a parameter while being
    /// dispatched. The test makes no assertion after the dispatch; it only
    /// requires that the dispatch completes.
    #[test]
    fn reactor_can_access_actor_notify() {
        let mut wb = WorldBuilder::new();
        wb.register::<u64>(0);
        wb.register(ReactorNotify::new(4, 8));
        wb.register(DeferredRemovals::default());
        let mut world = wb.build();
        let nid = world.id::<ReactorNotify>();

        let mut system = ReactorSystem::new(&world);

        struct Ctx {
            _reactor_id: Token,
        }

        // Reads the notifier from inside a running reactor.
        fn step(_ctx: &mut Ctx, notify: ResMut<ReactorNotify>, mut out: ResMut<u64>) {
            *out = notify.reactor_count() as u64;
        }

        let src = {
            let reg = world.registry();
            let notify = notify_mut(&world, nid);
            let src = notify.register_source();
            notify
                .register(|t| Ctx { _reactor_id: t }, step, reg)
                .subscribe(src);
            src
        };

        notify_mut(&world, nid).mark(src);
        system.dispatch(&mut world);
    }
1420
    /// Three reactors each subscribe to their own source plus one shared
    /// source; checks per-source wake-ups, shared fan-out, and per-dispatch
    /// deduplication.
    #[test]
    fn multi_instrument_with_shared_source() {
        let mut wb = WorldBuilder::new();
        wb.register::<u64>(0);
        wb.register(ReactorNotify::new(8, 16));
        wb.register(DeferredRemovals::default());
        let mut world = wb.build();
        let nid = world.id::<ReactorNotify>();
        let mut system = ReactorSystem::new(&world);

        struct Ctx {
            _reactor_id: Token,
            instrument: &'static str,
        }

        // Each instrument adds a distinct decimal digit so the total shows
        // which reactors ran.
        fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
            *out += match ctx.instrument {
                "BTC" => 100,
                "ETH" => 10,
                "SOL" => 1,
                _ => 0,
            };
        }

        let (btc_md, eth_md, sol_md, positions) = {
            let reg = world.registry();
            let notify = notify_mut(&world, nid);

            let btc_md = notify.register_source();
            let eth_md = notify.register_source();
            let sol_md = notify.register_source();
            // Shared source: every reactor subscribes to it.
            let positions = notify.register_source();

            notify
                .register(
                    |t| Ctx {
                        _reactor_id: t,
                        instrument: "BTC",
                    },
                    step,
                    reg,
                )
                .subscribe(btc_md)
                .subscribe(positions);

            notify
                .register(
                    |t| Ctx {
                        _reactor_id: t,
                        instrument: "ETH",
                    },
                    step,
                    reg,
                )
                .subscribe(eth_md)
                .subscribe(positions);

            notify
                .register(
                    |t| Ctx {
                        _reactor_id: t,
                        instrument: "SOL",
                    },
                    step,
                    reg,
                )
                .subscribe(sol_md)
                .subscribe(positions);

            (btc_md, eth_md, sol_md, positions)
        };

        // Only BTC runs: +100.
        notify_mut(&world, nid).mark(btc_md);
        system.dispatch(&mut world);
        assert_eq!(*world.resource::<u64>(), 100);

        // The shared source wakes all three: +111.
        notify_mut(&world, nid).mark(positions);
        system.dispatch(&mut world);
        assert_eq!(*world.resource::<u64>(), 211);

        // BTC and ETH run: +110.
        notify_mut(&world, nid).mark(btc_md);
        notify_mut(&world, nid).mark(eth_md);
        system.dispatch(&mut world);
        assert_eq!(*world.resource::<u64>(), 321);

        // BTC is marked via two sources but runs once; all three run: +111.
        notify_mut(&world, nid).mark(btc_md);
        notify_mut(&world, nid).mark(positions);
        system.dispatch(&mut world);
        assert_eq!(*world.resource::<u64>(), 432);

        // Nothing was marked: the dispatch is a no-op.
        let ran = system.dispatch(&mut world);
        assert!(!ran);
        assert_eq!(*world.resource::<u64>(), 432);

        // Only SOL runs: +1.
        notify_mut(&world, nid).mark(sol_md);
        system.dispatch(&mut world);
        assert_eq!(*world.resource::<u64>(), 433);
    }
1536
    /// Each reactor gets a private source, and an external id (here the
    /// token index) is mapped to that source to wake exactly one reactor.
    #[test]
    fn per_reactor_fill_routing() {
        use std::collections::HashMap;

        let mut wb = WorldBuilder::new();
        wb.register::<u64>(0);
        wb.register(ReactorNotify::new(8, 16));
        wb.register(DeferredRemovals::default());
        let mut world = wb.build();
        let nid = world.id::<ReactorNotify>();
        let mut system = ReactorSystem::new(&world);

        struct Ctx {
            reactor_id: Token,
        }

        // Adds (token index + 1) so the total identifies which reactor ran.
        fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
            *out += ctx.reactor_id.index() as u64 + 1;
        }

        // Token index -> that reactor's private source.
        let mut fill_sources: HashMap<usize, DataSource> = HashMap::new();

        {
            let reg = world.registry();
            let notify = notify_mut(&world, nid);

            for _ in 0..3 {
                let fill_src = notify.register_source();
                let token = notify
                    .register(|t| Ctx { reactor_id: t }, step, reg)
                    .subscribe(fill_src)
                    .token();

                fill_sources.insert(token.index(), fill_src);
            }
        }

        // Route to reactor 1 only: +2.
        let wire_client_id: usize = 1;
        let fill_source = fill_sources[&wire_client_id];
        notify_mut(&world, nid).mark(fill_source);
        system.dispatch(&mut world);
        assert_eq!(*world.resource::<u64>(), 2);

        // Route to reactor 0 only: +1.
        let fill_source = fill_sources[&0];
        notify_mut(&world, nid).mark(fill_source);
        system.dispatch(&mut world);
        assert_eq!(*world.resource::<u64>(), 3);
    }
1594
    /// Sources can be added after dispatching has started, and old and new
    /// sources both keep working.
    #[test]
    fn dynamic_source_registration() {
        let mut wb = WorldBuilder::new();
        wb.register::<u64>(0);
        wb.register(ReactorNotify::new(4, 8));
        wb.register(DeferredRemovals::default());
        let mut world = wb.build();
        let nid = world.id::<ReactorNotify>();
        let mut system = ReactorSystem::new(&world);

        struct Ctx {
            _reactor_id: Token,
            value: u64,
        }

        fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
            *out += ctx.value;
        }

        // First source with a reactor adding 10.
        let btc_md = {
            let reg = world.registry();
            let notify = notify_mut(&world, nid);
            let btc_md = notify.register_source();
            notify
                .register(
                    |t| Ctx {
                        _reactor_id: t,
                        value: 10,
                    },
                    step,
                    reg,
                )
                .subscribe(btc_md);
            btc_md
        };

        notify_mut(&world, nid).mark(btc_md);
        system.dispatch(&mut world);
        assert_eq!(*world.resource::<u64>(), 10);

        // Second source, added later, with a reactor adding 100.
        let eth_md = {
            let reg = world.registry();
            let notify = notify_mut(&world, nid);
            let eth_md = notify.register_source();
            notify
                .register(
                    |t| Ctx {
                        _reactor_id: t,
                        value: 100,
                    },
                    step,
                    reg,
                )
                .subscribe(eth_md);
            eth_md
        };

        notify_mut(&world, nid).mark(btc_md);
        notify_mut(&world, nid).mark(eth_md);
        system.dispatch(&mut world);
        // 10 from the first dispatch, plus 10 + 100 from the second.
        assert_eq!(*world.resource::<u64>(), 120);
    }
1662
    /// Removing a source silences it, and its slab key is reused by the next
    /// registered source without reviving the old subscribers.
    #[test]
    fn remove_source_and_reuse_slot() {
        let mut wb = WorldBuilder::new();
        wb.register::<u64>(0);
        wb.register(ReactorNotify::new(4, 8));
        wb.register(DeferredRemovals::default());
        let mut world = wb.build();
        let nid = world.id::<ReactorNotify>();
        let mut system = ReactorSystem::new(&world);

        struct Ctx {
            _reactor_id: Token,
            value: u64,
        }

        fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
            *out += ctx.value;
        }

        // src_a -> reactor adding 10, src_b -> reactor adding 100.
        let (src_a, src_b) = {
            let reg = world.registry();
            let notify = notify_mut(&world, nid);
            let src_a = notify.register_source();
            let src_b = notify.register_source();
            notify
                .register(
                    |t| Ctx {
                        _reactor_id: t,
                        value: 10,
                    },
                    step,
                    reg,
                )
                .subscribe(src_a);
            notify
                .register(
                    |t| Ctx {
                        _reactor_id: t,
                        value: 100,
                    },
                    step,
                    reg,
                )
                .subscribe(src_b);
            (src_a, src_b)
        };

        notify_mut(&world, nid).remove_source(src_a);

        // Marking a removed source wakes nobody.
        notify_mut(&world, nid).mark(src_a);
        let ran = system.dispatch(&mut world);
        assert!(!ran);

        // The other source is unaffected.
        notify_mut(&world, nid).mark(src_b);
        system.dispatch(&mut world);
        assert_eq!(*world.resource::<u64>(), 100);

        // The next registered source reuses src_a's key.
        let src_c = notify_mut(&world, nid).register_source();
        assert_eq!(src_c.0, src_a.0);

        let reg = world.registry();
        let notify = notify_mut(&world, nid);
        notify
            .register(
                |t| Ctx {
                    _reactor_id: t,
                    value: 1,
                },
                step,
                reg,
            )
            .subscribe(src_c);

        notify_mut(&world, nid).mark(src_c);
        system.dispatch(&mut world);
        // Only the new subscriber (+1) runs; src_a's old subscriber does not.
        assert_eq!(*world.resource::<u64>(), 101);
    }
1748
    /// Insert, lookup, and membership checks with a single key type.
    #[test]
    fn source_registry_basic() {
        let mut registry = SourceRegistry::new();

        #[derive(Hash, Eq, PartialEq, Debug)]
        struct InstrumentId(u32);

        let src_a = DataSource(0);
        let src_b = DataSource(1);

        registry.insert(InstrumentId(1), src_a);
        registry.insert(InstrumentId(2), src_b);

        assert_eq!(registry.get(&InstrumentId(1)), Some(src_a));
        assert_eq!(registry.get(&InstrumentId(2)), Some(src_b));
        // Unknown key.
        assert_eq!(registry.get(&InstrumentId(3)), None);
        assert!(registry.contains(&InstrumentId(1)));
        assert!(!registry.contains(&InstrumentId(3)));
    }
1770
    /// Keys of different types with the same inner value do not collide.
    #[test]
    fn source_registry_multiple_key_types() {
        let mut registry = SourceRegistry::new();

        #[derive(Hash, Eq, PartialEq)]
        struct InstrumentId(u32);

        #[derive(Hash, Eq, PartialEq)]
        struct StrategyId(u32);

        let src_a = DataSource(0);
        let src_b = DataSource(1);

        // Same inner value (1) under two different key types.
        registry.insert(InstrumentId(1), src_a);
        registry.insert(StrategyId(1), src_b);

        assert_eq!(registry.get(&InstrumentId(1)), Some(src_a));
        assert_eq!(registry.get(&StrategyId(1)), Some(src_b));
    }
1792
    /// Tuples work as composite keys.
    #[test]
    fn source_registry_tuple_keys() {
        let mut registry = SourceRegistry::new();

        let src = DataSource(42);
        registry.insert(("BTC", "Binance"), src);

        assert_eq!(registry.get(&("BTC", "Binance")), Some(src));
        assert_eq!(registry.get(&("ETH", "Binance")), None);
    }
1803
/// `remove` hands back the stored source once; afterwards both `get` and a
/// second `remove` for the same key return `None`.
#[test]
fn source_registry_remove() {
    let key = 42u64;
    let stored = DataSource(0);

    let mut sources = SourceRegistry::new();
    sources.insert(key, stored);

    assert_eq!(sources.remove(&key), Some(stored));
    assert_eq!(sources.get(&key), None);
    // Removing an already-removed key yields nothing.
    assert_eq!(sources.remove(&key), None);
}
1815
/// End-to-end use of `SourceRegistry` as a world resource: sources are
/// created on `ReactorNotify`, indexed by instrument, looked up again to
/// subscribe a reactor, and finally removed. After removal, marking the old
/// source is expected to run nothing.
#[test]
fn source_registry_integrated_with_reactor_system() {
    #[derive(Hash, Eq, PartialEq, Clone, Copy)]
    struct Instrument(u32);
    const BTC: Instrument = Instrument(0);
    const ETH: Instrument = Instrument(1);

    struct Ctx {
        _reactor_id: Token,
        value: u64,
    }

    fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
        *out += ctx.value;
    }

    let mut builder = WorldBuilder::new();
    builder.register::<u64>(0);
    builder.register(ReactorNotify::new(4, 8));
    builder.register(DeferredRemovals::default());
    builder.register(SourceRegistry::new());
    let mut world = builder.build();
    let notify_id = world.id::<ReactorNotify>();
    let mut system = ReactorSystem::new(&world);

    // One source per instrument, each indexed under its key.
    let btc_src = notify_mut(&world, notify_id).register_source();
    let eth_src = notify_mut(&world, notify_id).register_source();

    world.resource_mut::<SourceRegistry>().insert(BTC, btc_src);
    world.resource_mut::<SourceRegistry>().insert(ETH, eth_src);

    {
        // Subscribe through the registry lookup rather than the local handle.
        let registry = world.registry();
        let looked_up = world.resource::<SourceRegistry>().get(&BTC).unwrap();
        notify_mut(&world, notify_id)
            .register(
                |token| Ctx {
                    _reactor_id: token,
                    value: 10,
                },
                step,
                registry,
            )
            .subscribe(looked_up);
    }

    // Marking BTC runs the reactor once: 0 + 10.
    notify_mut(&world, notify_id).mark(btc_src);
    system.dispatch(&mut world);
    assert_eq!(*world.resource::<u64>(), 10);

    // Drop the registry entry, then the source itself.
    let removed = world.resource_mut::<SourceRegistry>().remove(&BTC);
    assert!(removed.is_some());
    notify_mut(&world, notify_id).remove_source(removed.unwrap());

    // Marking the removed source must not run anything.
    notify_mut(&world, notify_id).mark(btc_src);
    let ran = system.dispatch(&mut world);
    assert!(!ran);
}
1880
/// Inserting under an existing key replaces the previously stored source.
#[test]
fn source_registry_overwrite_key() {
    let key = 42u32;
    let original = DataSource(0);
    let replacement = DataSource(1);

    let mut sources = SourceRegistry::new();

    sources.insert(key, original);
    assert_eq!(sources.get(&key), Some(original));

    // A second insert with the same key wins.
    sources.insert(key, replacement);
    assert_eq!(sources.get(&key), Some(replacement));
}
1896
/// Lookups on a freshly created registry find nothing.
#[test]
fn source_registry_empty_get() {
    let empty = SourceRegistry::new();
    let key = 42u32;

    assert_eq!(empty.get(&key), None);
    assert!(!empty.contains(&key));
}
1904
/// Enum variants work as keys; a variant that was never inserted is a miss.
#[test]
fn source_registry_enum_keys() {
    #[derive(Hash, Eq, PartialEq)]
    enum Venue {
        Binance,
        Coinbase,
    }

    let binance_src = DataSource(0);

    let mut sources = SourceRegistry::new();
    sources.insert(Venue::Binance, binance_src);

    assert_eq!(sources.get(&Venue::Binance), Some(binance_src));
    assert_eq!(sources.get(&Venue::Coinbase), None);
}
1920
/// A three-part tuple of newtypes acts as a composite key; changing a single
/// component produces a miss.
#[test]
fn source_registry_composite_key() {
    #[derive(Hash, Eq, PartialEq)]
    struct StrategyId(u32);
    #[derive(Hash, Eq, PartialEq)]
    struct InstrumentId(u32);
    #[derive(Hash, Eq, PartialEq)]
    struct VenueId(u32);

    let stored = DataSource(5);

    let mut sources = SourceRegistry::new();
    sources.insert((StrategyId(1), InstrumentId(0), VenueId(2)), stored);

    // Exact composite key: hit.
    let hit = sources.get(&(StrategyId(1), InstrumentId(0), VenueId(2)));
    assert_eq!(hit, Some(stored));

    // Same instrument and venue, different strategy: miss.
    let miss = sources.get(&(StrategyId(2), InstrumentId(0), VenueId(2)));
    assert_eq!(miss, None);
}
1945
/// Walks through a complete instrument lifecycle on one world:
/// add BTC, add ETH, remove BTC, then add SOL (whose source is expected to
/// reuse BTC's freed key). The `u64` resource accumulates each reactor's
/// `value`, so every phase is checked through the running total.
#[test]
fn full_lifecycle_add_trade_remove() {
    #[derive(Hash, Eq, PartialEq, Clone, Copy)]
    struct Instrument(u32);

    struct Ctx {
        _reactor_id: Token,
        value: u64,
    }

    fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
        *out += ctx.value;
    }

    let mut builder = WorldBuilder::new();
    builder.register::<u64>(0);
    builder.register(ReactorNotify::new(4, 8));
    builder.register(DeferredRemovals::default());
    builder.register(SourceRegistry::new());
    let mut world = builder.build();
    let notify_id = world.id::<ReactorNotify>();
    let mut system = ReactorSystem::new(&world);

    // Phase 1: BTC comes online with a reactor worth 10.
    let btc_src = notify_mut(&world, notify_id).register_source();
    world
        .resource_mut::<SourceRegistry>()
        .insert(Instrument(0), btc_src);

    {
        let registry = world.registry();
        notify_mut(&world, notify_id)
            .register(
                |token| Ctx {
                    _reactor_id: token,
                    value: 10,
                },
                step,
                registry,
            )
            .subscribe(btc_src);
    }

    notify_mut(&world, notify_id).mark(btc_src);
    system.dispatch(&mut world);
    assert_eq!(*world.resource::<u64>(), 10);

    // Phase 2: ETH comes online with a reactor worth 100.
    let eth_src = notify_mut(&world, notify_id).register_source();
    world
        .resource_mut::<SourceRegistry>()
        .insert(Instrument(1), eth_src);

    {
        let registry = world.registry();
        notify_mut(&world, notify_id)
            .register(
                |token| Ctx {
                    _reactor_id: token,
                    value: 100,
                },
                step,
                registry,
            )
            .subscribe(eth_src);
    }

    // Both sources fire: 10 + 10 + 100.
    notify_mut(&world, notify_id).mark(btc_src);
    notify_mut(&world, notify_id).mark(eth_src);
    system.dispatch(&mut world);
    assert_eq!(*world.resource::<u64>(), 120);

    // Phase 3: BTC is delisted, both from the registry and from the notifier.
    let removed = world
        .resource_mut::<SourceRegistry>()
        .remove(&Instrument(0));
    notify_mut(&world, notify_id).remove_source(removed.unwrap());

    // Only ETH fires now: 120 + 100.
    notify_mut(&world, notify_id).mark(eth_src);
    system.dispatch(&mut world);
    assert_eq!(*world.resource::<u64>(), 220);

    // Phase 4: SOL is added; its source is expected to take BTC's old key.
    let sol_src = notify_mut(&world, notify_id).register_source();
    world
        .resource_mut::<SourceRegistry>()
        .insert(Instrument(2), sol_src);
    assert_eq!(sol_src.0, btc_src.0);

    {
        let registry = world.registry();
        notify_mut(&world, notify_id)
            .register(
                |token| Ctx {
                    _reactor_id: token,
                    value: 1000,
                },
                step,
                registry,
            )
            .subscribe(sol_src);
    }

    // SOL and ETH fire: 220 + 1000 + 100. The old BTC reactor contributes
    // nothing even though SOL shares BTC's former key.
    notify_mut(&world, notify_id).mark(sol_src);
    notify_mut(&world, notify_id).mark(eth_src);
    system.dispatch(&mut world);
    assert_eq!(*world.resource::<u64>(), 1320);
}
2064
/// Several strategies trade overlapping instruments, each (strategy,
/// instrument) pair owning its own source. Marking the two BTC sources must
/// run only the two BTC reactors (1 + 100), leaving the ETH reactor idle.
#[test]
fn multi_strategy_same_instrument() {
    #[derive(Hash, Eq, PartialEq, Clone, Copy)]
    struct StrategyInstrument(&'static str, &'static str);

    struct Ctx {
        _reactor_id: Token,
        value: u64,
    }

    fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
        *out += ctx.value;
    }

    let mut builder = WorldBuilder::new();
    builder.register::<u64>(0);
    builder.register(ReactorNotify::new(8, 16));
    builder.register(DeferredRemovals::default());
    builder.register(SourceRegistry::new());
    let mut world = builder.build();
    let notify_id = world.id::<ReactorNotify>();
    let mut system = ReactorSystem::new(&world);

    let registry = world.registry();
    let notify = notify_mut(&world, notify_id);

    // One source per (strategy, instrument) pair.
    let mm_btc = notify.register_source();
    let mm_eth = notify.register_source();
    let arb_btc = notify.register_source();

    // Market-maker reactors: BTC adds 1, ETH adds 2.
    notify
        .register(
            |token| Ctx {
                _reactor_id: token,
                value: 1,
            },
            step,
            registry,
        )
        .subscribe(mm_btc);
    notify
        .register(
            |token| Ctx {
                _reactor_id: token,
                value: 2,
            },
            step,
            registry,
        )
        .subscribe(mm_eth);

    // Arbitrage reactor on BTC adds 100.
    notify
        .register(
            |token| Ctx {
                _reactor_id: token,
                value: 100,
            },
            step,
            registry,
        )
        .subscribe(arb_btc);

    // Index every source under its composite key.
    world
        .resource_mut::<SourceRegistry>()
        .insert(StrategyInstrument("MM", "BTC"), mm_btc);
    world
        .resource_mut::<SourceRegistry>()
        .insert(StrategyInstrument("MM", "ETH"), mm_eth);
    world
        .resource_mut::<SourceRegistry>()
        .insert(StrategyInstrument("ARB", "BTC"), arb_btc);

    // Resolve the two BTC sources back out of the registry.
    let mm_btc_src = world
        .resource::<SourceRegistry>()
        .get(&StrategyInstrument("MM", "BTC"))
        .unwrap();
    let arb_btc_src = world
        .resource::<SourceRegistry>()
        .get(&StrategyInstrument("ARB", "BTC"))
        .unwrap();

    notify_mut(&world, notify_id).mark(mm_btc_src);
    notify_mut(&world, notify_id).mark(arb_btc_src);
    system.dispatch(&mut world);

    // 1 (MM/BTC) + 100 (ARB/BTC); MM/ETH was not marked.
    assert_eq!(*world.resource::<u64>(), 101);
}
2158
/// A one-shot reactor deregisters itself through `DeferredRemovals` while it
/// runs. Afterwards the reactor count is zero, the `SourceRegistry` entry is
/// still present (the test does not expect it to be cleaned up
/// automatically), and marking the source again runs nothing.
#[test]
fn reactor_self_removal_with_registry_cleanup() {
    struct Ctx {
        reactor_id: Token,
    }

    fn one_shot(ctx: &mut Ctx, mut out: ResMut<u64>, mut removals: ResMut<DeferredRemovals>) {
        *out += 1;
        removals.deregister(ctx.reactor_id);
    }

    let mut builder = WorldBuilder::new();
    builder.register::<u64>(0);
    builder.register(ReactorNotify::new(4, 8));
    builder.register(DeferredRemovals::default());
    builder.register(SourceRegistry::new());
    let mut world = builder.build();
    let notify_id = world.id::<ReactorNotify>();
    let mut system = ReactorSystem::new(&world);

    let source = notify_mut(&world, notify_id).register_source();
    world
        .resource_mut::<SourceRegistry>()
        .insert("one-shot", source);

    {
        let registry = world.registry();
        notify_mut(&world, notify_id)
            .register(|token| Ctx { reactor_id: token }, one_shot, registry)
            .subscribe(source);
    }

    // First dispatch: the reactor runs once and requests its own removal.
    notify_mut(&world, notify_id).mark(source);
    system.dispatch(&mut world);
    assert_eq!(*world.resource::<u64>(), 1);
    assert_eq!(system.reactor_count(&world), 0);

    // The registry mapping outlives the reactor.
    assert!(world.resource::<SourceRegistry>().contains(&"one-shot"));

    // Second dispatch: nobody is left to wake.
    notify_mut(&world, notify_id).mark(source);
    let ran = system.dispatch(&mut world);
    assert!(!ran);
}
2207
/// Fifty reactors subscribe to a single source; one mark followed by one
/// dispatch must run each of them exactly once.
#[test]
fn many_reactors_same_source() {
    struct Ctx {
        _reactor_id: Token,
    }

    fn step(_ctx: &mut Ctx, mut out: ResMut<u64>) {
        *out += 1;
    }

    let mut builder = WorldBuilder::new();
    builder.register::<u64>(0);
    builder.register(ReactorNotify::new(4, 64));
    builder.register(DeferredRemovals::default());
    let mut world = builder.build();
    let notify_id = world.id::<ReactorNotify>();
    let mut system = ReactorSystem::new(&world);

    let shared_src = notify_mut(&world, notify_id).register_source();

    {
        let registry = world.registry();
        let notify = notify_mut(&world, notify_id);
        let mut remaining = 50;
        while remaining > 0 {
            notify
                .register(|token| Ctx { _reactor_id: token }, step, registry)
                .subscribe(shared_src);
            remaining -= 1;
        }
    }

    assert_eq!(system.reactor_count(&world), 50);

    notify_mut(&world, notify_id).mark(shared_src);
    system.dispatch(&mut world);

    // Every reactor added 1.
    assert_eq!(*world.resource::<u64>(), 50);
}
2245
/// One reactor subscribes to five sources. Marking all five before a single
/// dispatch must still run the reactor only once.
#[test]
fn reactor_subscribes_to_multiple_sources() {
    struct Ctx {
        _reactor_id: Token,
    }

    fn step(_ctx: &mut Ctx, mut out: ResMut<u64>) {
        *out += 1;
    }

    let mut builder = WorldBuilder::new();
    builder.register::<u64>(0);
    builder.register(ReactorNotify::new(8, 8));
    builder.register(DeferredRemovals::default());
    let mut world = builder.build();
    let notify_id = world.id::<ReactorNotify>();
    let mut system = ReactorSystem::new(&world);

    // Five independent sources.
    let notify = notify_mut(&world, notify_id);
    let sources: Vec<_> = (0..5).map(|_| notify.register_source()).collect();

    {
        // A single registration, chained through every source.
        let registry = world.registry();
        let notify = notify_mut(&world, notify_id);
        let mut chained = notify.register(|token| Ctx { _reactor_id: token }, step, registry);
        for &source in &sources {
            chained = chained.subscribe(source);
        }
    }

    // Mark all five in the same frame.
    for &source in &sources {
        notify_mut(&world, notify_id).mark(source);
    }

    system.dispatch(&mut world);

    // The reactor ran once, not five times.
    assert_eq!(*world.resource::<u64>(), 1);
}
2289
/// Marking a source handle after the source has been removed must not panic
/// and must not cause any reactor to run.
#[test]
fn stale_data_source_is_noop() {
    let mut builder = WorldBuilder::new();
    builder.register(ReactorNotify::new(4, 4));
    builder.register(DeferredRemovals::default());
    let mut world = builder.build();
    let notify_id = world.id::<ReactorNotify>();
    let mut system = ReactorSystem::new(&world);

    // Create a source and immediately remove it, leaving a stale handle.
    let stale = notify_mut(&world, notify_id).register_source();
    notify_mut(&world, notify_id).remove_source(stale);

    notify_mut(&world, notify_id).mark(stale);
    let ran = system.dispatch(&mut world);
    assert!(!ran);
}
2308
/// Removing the same source twice must not panic; the test passes simply by
/// reaching the end.
#[test]
fn double_remove_source_is_noop() {
    let mut notifier = ReactorNotify::new(4, 4);
    let source = notifier.register_source();

    notifier.remove_source(source);
    // Second removal of an already-removed source.
    notifier.remove_source(source);
}
2316
/// A `PipelineReactor` wrapping a three-step context pipeline
/// (read -> double -> store) is registered with `register_built` and woken
/// through a source. Starting from 10 the resource must end at 20.
#[test]
fn pipeline_reactor_dispatch() {
    use crate::CtxPipelineBuilder;

    struct Ctx {
        _reactor_id: Token,
        instrument: &'static str,
    }

    fn read_data(ctx: &mut Ctx, val: Res<u64>, _input: ()) -> u64 {
        let _ = ctx.instrument;
        *val
    }

    fn double(_ctx: &mut Ctx, x: u64) -> u64 {
        x * 2
    }

    fn store(_ctx: &mut Ctx, mut out: ResMut<u64>, x: u64) {
        *out = x;
    }

    let mut builder = WorldBuilder::new();
    builder.register::<u64>(0);
    builder.register(ReactorNotify::new(4, 8));
    builder.register(DeferredRemovals::default());
    let mut world = builder.build();
    let notify_id = world.id::<ReactorNotify>();
    let mut system = ReactorSystem::new(&world);

    let registry = world.registry();

    let body = CtxPipelineBuilder::<Ctx, ()>::new()
        .then(read_data, registry)
        .then(double, registry)
        .then(store, registry)
        .build();

    let notify = notify_mut(&world, notify_id);
    let source = notify.register_source();

    // The context is built by hand here, with a fixed token.
    let pipeline_reactor = PipelineReactor::new(
        Ctx {
            _reactor_id: Token::new(0),
            instrument: "BTC",
        },
        body,
    );
    notify.register_built(pipeline_reactor).subscribe(source);

    // Seed the input, wake the reactor, and run it.
    *world.resource_mut::<u64>() = 10;
    notify_mut(&world, notify_id).mark(source);
    system.dispatch(&mut world);

    // 10 read, doubled, stored back.
    assert_eq!(*world.resource::<u64>(), 20);
}
2377
/// A `PipelineReactor` wrapping a context DAG: the root reads the resource,
/// two arms compute `x * 2` and `x + 10`, and the merge step stores their
/// sum. Starting from 5 the resource must end at 10 + 15 = 25.
#[test]
fn dag_reactor_dispatch() {
    use crate::CtxDagBuilder;

    struct Ctx {
        _reactor_id: Token,
    }

    fn root(ctx: &mut Ctx, val: Res<u64>, _input: ()) -> u64 {
        let _ = ctx;
        *val
    }

    fn arm_double(_ctx: &mut Ctx, val: &u64) -> u64 {
        *val * 2
    }

    fn arm_add(_ctx: &mut Ctx, val: &u64) -> u64 {
        *val + 10
    }

    fn merge(_ctx: &mut Ctx, mut out: ResMut<u64>, a: &u64, b: &u64) {
        *out = *a + *b;
    }

    let mut builder = WorldBuilder::new();
    builder.register::<u64>(0);
    builder.register(ReactorNotify::new(4, 8));
    builder.register(DeferredRemovals::default());
    let mut world = builder.build();
    let notify_id = world.id::<ReactorNotify>();
    let mut system = ReactorSystem::new(&world);

    let registry = world.registry();

    let graph = CtxDagBuilder::<Ctx, ()>::new()
        .root(root, registry)
        .fork()
        .arm(|seed| seed.then(arm_double, registry))
        .arm(|seed| seed.then(arm_add, registry))
        .merge(merge, registry)
        .build();

    let notify = notify_mut(&world, notify_id);
    let source = notify.register_source();

    let dag_reactor = PipelineReactor::new(
        Ctx {
            _reactor_id: Token::new(0),
        },
        graph,
    );
    notify.register_built(dag_reactor).subscribe(source);

    // Seed the input, wake the reactor, and run it.
    *world.resource_mut::<u64>() = 5;
    notify_mut(&world, notify_id).mark(source);
    system.dispatch(&mut world);

    // (5 * 2) + (5 + 10)
    assert_eq!(*world.resource::<u64>(), 25);
}
2439
/// Two `PipelineReactor`s built from separate pipelines share one source.
/// Each reads the current resource value, multiplies it by its own `factor`,
/// and adds the product back, i.e. it maps `x` to `x * (1 + factor)`.
///
/// That mapping commutes, so the final value does not depend on which
/// reactor runs first: `5 * (1 + 2) * (1 + 10) = 165`. If only one reactor
/// ran the result would be 15 or 55, so asserting the exact value is what
/// actually proves both ran.
#[test]
fn multiple_pipeline_reactors_different_bodies() {
    use crate::CtxPipelineBuilder;

    let mut wb = WorldBuilder::new();
    wb.register::<u64>(0);
    wb.register(ReactorNotify::new(4, 8));
    wb.register(DeferredRemovals::default());
    let mut world = wb.build();
    let nid = world.id::<ReactorNotify>();
    let mut system = ReactorSystem::new(&world);

    struct Ctx {
        _reactor_id: Token,
        factor: u64,
    }

    fn multiply(ctx: &mut Ctx, val: Res<u64>, _input: ()) -> u64 {
        *val * ctx.factor
    }

    fn accumulate(_ctx: &mut Ctx, mut out: ResMut<u64>, val: u64) {
        *out += val;
    }

    let reg = world.registry();

    // Two independently built pipelines with the same steps.
    let pipeline_a = CtxPipelineBuilder::<Ctx, ()>::new()
        .then(multiply, reg)
        .then(accumulate, reg)
        .build();

    let pipeline_b = CtxPipelineBuilder::<Ctx, ()>::new()
        .then(multiply, reg)
        .then(accumulate, reg)
        .build();

    let notify = notify_mut(&world, nid);
    let src = notify.register_source();

    // Reactor A: x -> x + 2x.
    notify
        .register_built(PipelineReactor::new(
            Ctx {
                _reactor_id: Token::new(0),
                factor: 2,
            },
            pipeline_a,
        ))
        .subscribe(src);

    // Reactor B: x -> x + 10x.
    notify
        .register_built(PipelineReactor::new(
            Ctx {
                _reactor_id: Token::new(1),
                factor: 10,
            },
            pipeline_b,
        ))
        .subscribe(src);

    *world.resource_mut::<u64>() = 5;
    notify_mut(&world, nid).mark(src);
    system.dispatch(&mut world);

    // A then B: 5 -> 15 -> 165. B then A: 5 -> 55 -> 165.
    let val = *world.resource::<u64>();
    assert_eq!(val, 165, "both reactors should have run, got {val}");
}
2513
/// A pipeline with a `guard` step: the write step only takes effect when the
/// value read is above the context's threshold (10). With 5 the resource is
/// left untouched; with 20 it is overwritten with 999.
#[test]
fn pipeline_reactor_with_guard() {
    use crate::CtxPipelineBuilder;

    struct Ctx {
        _reactor_id: Token,
        threshold: u64,
    }

    fn read(_ctx: &mut Ctx, val: Res<u64>, _input: ()) -> u64 {
        *val
    }

    fn above_threshold(ctx: &mut Ctx, val: &u64) -> bool {
        *val > ctx.threshold
    }

    fn write(_ctx: &mut Ctx, mut out: ResMut<u64>, _val: u64) {
        *out = 999;
    }

    let mut builder = WorldBuilder::new();
    builder.register::<u64>(0);
    builder.register(ReactorNotify::new(4, 8));
    builder.register(DeferredRemovals::default());
    let mut world = builder.build();
    let notify_id = world.id::<ReactorNotify>();
    let mut system = ReactorSystem::new(&world);

    let registry = world.registry();

    let guarded = CtxPipelineBuilder::<Ctx, ()>::new()
        .then(read, registry)
        .guard(above_threshold, registry)
        .map(write, registry)
        .build();

    let notify = notify_mut(&world, notify_id);
    let source = notify.register_source();

    let guarded_reactor = PipelineReactor::new(
        Ctx {
            _reactor_id: Token::new(0),
            threshold: 10,
        },
        guarded,
    );
    notify.register_built(guarded_reactor).subscribe(source);

    // Below the threshold: the guard blocks the write.
    *world.resource_mut::<u64>() = 5;
    notify_mut(&world, notify_id).mark(source);
    system.dispatch(&mut world);
    assert_eq!(*world.resource::<u64>(), 5);

    // Above the threshold: the write goes through.
    *world.resource_mut::<u64>() = 20;
    notify_mut(&world, notify_id).mark(source);
    system.dispatch(&mut world);
    assert_eq!(*world.resource::<u64>(), 999);
}
2576
/// Two-phase registration through the safe `World` API only:
/// `create_reactor` reserves a token, the reactor is built with that token
/// in its context, and `insert_reactor` fills the reserved slot.
///
/// Each reactor adds `token.index() + 1`, so the totals below also confirm
/// that the tokens handed out were index 0 and then index 1.
#[test]
fn two_phase_registration_safe_api() {
    struct Ctx {
        reactor_id: Token,
        instrument: &'static str,
    }

    fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
        let _ = ctx.instrument;
        *out += ctx.reactor_id.index() as u64 + 1;
    }

    let mut builder = WorldBuilder::new();
    builder.register::<u64>(0);
    builder.register(ReactorNotify::new(4, 8));
    builder.register(DeferredRemovals::default());
    let mut world = builder.build();

    let mut system = ReactorSystem::new(&world);

    let source = world.resource_mut::<ReactorNotify>().register_source();

    // Phase 1: reserve a slot and learn its token.
    let first_token = world.resource_mut::<ReactorNotify>().create_reactor();

    // Phase 2: build the reactor around that token and insert it.
    let first_reactor = step.into_reactor(
        Ctx {
            reactor_id: first_token,
            instrument: "BTC",
        },
        world.registry(),
    );

    world
        .resource_mut::<ReactorNotify>()
        .insert_reactor(first_token, first_reactor)
        .subscribe(source);

    // Only the first reactor exists: 0 + (0 + 1).
    world.resource_mut::<ReactorNotify>().mark(source);
    system.dispatch(&mut world);
    assert_eq!(*world.resource::<u64>(), 1);

    // Repeat both phases for a second reactor on the same source.
    let second_token = world.resource_mut::<ReactorNotify>().create_reactor();
    let second_reactor = step.into_reactor(
        Ctx {
            reactor_id: second_token,
            instrument: "ETH",
        },
        world.registry(),
    );
    world
        .resource_mut::<ReactorNotify>()
        .insert_reactor(second_token, second_reactor)
        .subscribe(source);

    // Both reactors run: 1 + (0 + 1) + (1 + 1).
    world.resource_mut::<ReactorNotify>().mark(source);
    system.dispatch(&mut world);
    assert_eq!(*world.resource::<u64>(), 4);
}
2644
/// Two-phase registration combined with a `PipelineReactor`: the token from
/// `create_reactor` goes into the pipeline's context, and the finished
/// reactor is placed with `insert_reactor`. The read -> double -> store
/// pipeline must turn 10 into 20.
#[test]
fn two_phase_with_pipeline_reactor() {
    use crate::CtxPipelineBuilder;

    struct Ctx {
        _reactor_id: Token,
    }

    fn read(ctx: &mut Ctx, val: Res<u64>, _: ()) -> u64 {
        let _ = ctx;
        *val
    }

    fn double(_ctx: &mut Ctx, x: u64) -> u64 {
        x * 2
    }

    fn store(_ctx: &mut Ctx, mut out: ResMut<u64>, x: u64) {
        *out = x;
    }

    let mut builder = WorldBuilder::new();
    builder.register::<u64>(0);
    builder.register(ReactorNotify::new(4, 8));
    builder.register(DeferredRemovals::default());
    let mut world = builder.build();

    let mut system = ReactorSystem::new(&world);

    let source = world.resource_mut::<ReactorNotify>().register_source();

    // Phase 1: reserve the slot.
    let token = world.resource_mut::<ReactorNotify>().create_reactor();

    // Phase 2: build the pipeline reactor with the reserved token.
    let registry = world.registry();
    let body = CtxPipelineBuilder::<Ctx, ()>::new()
        .then(read, registry)
        .then(double, registry)
        .then(store, registry)
        .build();
    let pipeline_reactor = PipelineReactor::new(Ctx { _reactor_id: token }, body);

    world
        .resource_mut::<ReactorNotify>()
        .insert_reactor(token, pipeline_reactor)
        .subscribe(source);

    // Seed the input, wake the reactor, and run it.
    *world.resource_mut::<u64>() = 10;
    world.resource_mut::<ReactorNotify>().mark(source);
    system.dispatch(&mut world);

    // 10 read, doubled, stored back.
    assert_eq!(*world.resource::<u64>(), 20);
}
2698}