1#![allow(clippy::too_many_arguments)]
127
128use std::any::{Any, TypeId};
129use std::hash::Hash;
130
131use nexus_notify::local::LocalNotify;
132use nexus_notify::{Events, Token};
133use rustc_hash::FxHashMap;
134
135use crate::ctx_pipeline::CtxStepCall;
136use crate::handler::Param;
137use crate::world::{Registry, Resource, ResourceId, World};
138
/// Opaque handle naming a data source registered with [`ReactorNotify`].
///
/// The wrapped `usize` is the key returned by
/// [`ReactorNotify::register_source`]; keys may be handed out again after
/// [`ReactorNotify::remove_source`] frees them.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
pub struct DataSource(pub usize);
157
/// Subscription table and storage for reactors.
///
/// Tracks which reactors are interested in which [`DataSource`], owns the
/// boxed reactors themselves, and records which reactors have been marked
/// for the next dispatch.
pub struct ReactorNotify {
    /// Records which reactor tokens have been marked since the last poll.
    notify: LocalNotify,

    /// Per-source subscriber list, keyed by `DataSource.0`.
    interests: slab::Slab<Vec<Token>>,

    /// Reverse index: for each reactor token index, the sources it is
    /// subscribed to. Used to unsubscribe a reactor when it is removed.
    reactor_sources: Vec<Vec<DataSource>>,

    /// Reactor storage keyed by token index. A slot holds `None` while it is
    /// a placeholder from `create_reactor` or while the reactor is taken out
    /// for dispatch.
    reactors: slab::Slab<Option<Box<dyn Reactor>>>,
}
190
191impl std::fmt::Debug for ReactorNotify {
193 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
194 f.debug_struct("ReactorNotify")
195 .field("num_sources", &self.interests.len())
196 .field("num_reactors", &self.reactors.len())
197 .field("notify", &self.notify)
198 .finish()
199 }
200}
201
202impl ReactorNotify {
203 pub fn new(source_capacity: usize, reactor_capacity: usize) -> Self {
205 Self {
206 notify: LocalNotify::with_capacity(reactor_capacity),
207 interests: slab::Slab::with_capacity(source_capacity),
208 reactor_sources: Vec::with_capacity(reactor_capacity),
209 reactors: slab::Slab::with_capacity(reactor_capacity),
210 }
211 }
212
213 pub fn register_source(&mut self) -> DataSource {
219 DataSource(self.interests.insert(Vec::new()))
220 }
221
222 pub fn remove_source(&mut self, source: DataSource) {
227 if self.interests.contains(source.0) {
228 self.interests.remove(source.0);
229 }
230 }
231
232 pub fn create_reactor(&mut self) -> Token {
259 let key = self.reactors.insert(None);
262 self.notify.ensure_capacity(key);
263 if key >= self.reactor_sources.len() {
265 self.reactor_sources.resize_with(key + 1, Vec::new);
266 }
267 Token::new(key)
268 }
269
270 pub fn insert_reactor(
280 &mut self,
281 token: Token,
282 reactor: impl Reactor + 'static,
283 ) -> ReactorRegistration<'_> {
284 let idx = token.index();
285 assert!(
286 self.reactors.contains(idx),
287 "token {} was not allocated by create_reactor",
288 idx,
289 );
290 assert!(
291 self.reactors[idx].is_none(),
292 "token {} was already filled",
293 idx,
294 );
295 self.reactors[idx] = Some(Box::new(reactor));
296 ReactorRegistration {
297 token,
298 notify: self,
299 }
300 }
301
302 pub fn register<C, Params, F: IntoReactor<C, Params>>(
309 &mut self,
310 ctx_fn: impl FnOnce(Token) -> C,
311 step: F,
312 registry: &Registry,
313 ) -> ReactorRegistration<'_> {
314 let key = self.reactors.vacant_key();
315 let token = Token::new(key);
316 self.notify.ensure_capacity(key);
317 if key >= self.reactor_sources.len() {
318 self.reactor_sources.resize_with(key + 1, Vec::new);
319 }
320 let ctx = ctx_fn(token);
321 let reactor = step.into_reactor(ctx, registry);
322 let inserted = self.reactors.insert(Some(Box::new(reactor)));
323 debug_assert_eq!(inserted, key);
324 ReactorRegistration {
325 token,
326 notify: self,
327 }
328 }
329
330 pub fn register_built(&mut self, reactor: impl Reactor + 'static) -> ReactorRegistration<'_> {
337 let key = self.reactors.vacant_key();
338 let token = Token::new(key);
339 self.notify.ensure_capacity(key);
340 if key >= self.reactor_sources.len() {
341 self.reactor_sources.resize_with(key + 1, Vec::new);
342 }
343 let inserted = self.reactors.insert(Some(Box::new(reactor)));
344 debug_assert_eq!(inserted, key);
345 ReactorRegistration {
346 token,
347 notify: self,
348 }
349 }
350
351 pub fn subscribe(&mut self, reactor: Token, source: DataSource) {
358 if let Some(subscribers) = self.interests.get_mut(source.0)
359 && !subscribers.contains(&reactor)
360 {
361 subscribers.push(reactor);
362 let idx = reactor.index();
364 debug_assert!(
365 idx < self.reactor_sources.len(),
366 "reactor_sources missing entry for reactor token {}",
367 idx,
368 );
369 self.reactor_sources[idx].push(source);
370 }
371 }
372
373 pub fn unsubscribe(&mut self, reactor: Token, source: DataSource) {
375 if let Some(subscribers) = self.interests.get_mut(source.0) {
376 subscribers.retain(|&t| t != reactor);
377 }
378 if let Some(sources) = self.reactor_sources.get_mut(reactor.index()) {
379 sources.retain(|&s| s != source);
380 }
381 }
382
383 #[inline]
390 pub fn mark(&mut self, source: DataSource) {
391 if let Some(subscribers) = self.interests.get(source.0) {
392 for &reactor_token in subscribers {
393 self.notify.mark(reactor_token);
394 }
395 }
396 }
397
398 #[inline]
400 pub(crate) fn poll(&mut self, events: &mut Events) {
401 self.notify.poll(events);
402 }
403
404 #[inline]
407 pub(crate) fn take_reactor(&mut self, idx: usize) -> Option<Box<dyn Reactor>> {
408 self.reactors.get_mut(idx).and_then(Option::take)
409 }
410
411 #[inline]
417 pub(crate) fn put_reactor(&mut self, idx: usize, reactor: Box<dyn Reactor>) {
418 self.reactors[idx] = Some(reactor);
419 }
420
421 pub fn remove_reactor(&mut self, token: Token) {
426 let idx = token.index();
427 if self.reactors.contains(idx) {
428 self.reactors.remove(idx);
429 if let Some(sources) = self.reactor_sources.get_mut(idx) {
431 for &source in sources.iter() {
432 if let Some(subscribers) = self.interests.get_mut(source.0) {
433 subscribers.retain(|&t| t != token);
434 }
435 }
436 sources.clear();
437 }
438 }
439 }
440
441 pub fn has_notified(&self) -> bool {
445 self.notify.has_notified()
446 }
447
448 pub fn notified_count(&self) -> usize {
450 self.notify.notified_count()
451 }
452
453 pub fn source_count(&self) -> usize {
455 self.interests.len()
456 }
457
458 pub fn reactor_count(&self) -> usize {
460 self.reactors.len()
461 }
462}
463
// Marker impl so `ReactorNotify` can be stored in the `World` as a resource.
impl Resource for ReactorNotify {}
465
/// Result of registering a reactor: carries the new token and a borrow of
/// the notifier so subscriptions can be chained fluently.
pub struct ReactorRegistration<'a> {
    /// Token of the reactor that was just registered.
    token: Token,
    /// Notifier the reactor was registered with.
    notify: &'a mut ReactorNotify,
}
475
476impl ReactorRegistration<'_> {
477 pub fn subscribe(self, source: DataSource) -> Self {
479 self.notify.subscribe(self.token, source);
480 self
481 }
482
483 pub fn token(&self) -> Token {
485 self.token
486 }
487}
488
/// A unit of work that is run when one of its subscribed data sources is
/// marked.
pub trait Reactor: Send {
    /// Runs the reactor once against `world`.
    fn run(&mut self, world: &mut World);

    /// Human-readable name for diagnostics; defaults to `"<unnamed>"`.
    fn name(&self) -> &'static str {
        "<unnamed>"
    }
}
511
/// Reactor backed by a plain function: a per-reactor context `C`, the step
/// function `F`, and the resolved state for its `Params`.
pub struct ReactorFn<C, F, Params: Param> {
    /// Per-reactor context passed to the step function as `&mut C`.
    pub ctx: C,
    /// The step function itself.
    f: F,
    /// Parameter state produced by `Param::init` when the reactor is built.
    state: Params::State,
    /// Name reported by `Reactor::name`.
    name: &'static str,
}
528
/// Reactor whose body is a type-erased context pipeline step.
pub struct PipelineReactor<C> {
    /// Per-reactor context handed to the body on every run.
    pub ctx: C,
    /// Boxed step invoked with `()` input and `()` output.
    body: Box<dyn CtxStepCall<C, (), Out = ()> + Send>,
}
560
561impl<C: Send + 'static> PipelineReactor<C> {
562 pub fn new(ctx: C, body: impl CtxStepCall<C, (), Out = ()> + Send + 'static) -> Self {
568 Self {
569 ctx,
570 body: Box::new(body),
571 }
572 }
573}
574
575impl<C: Send + 'static> Reactor for PipelineReactor<C> {
576 fn run(&mut self, world: &mut World) {
577 self.body.call(&mut self.ctx, world, ());
578 }
579
580 fn name(&self) -> &'static str {
581 std::any::type_name::<C>()
582 }
583}
584
/// Conversion from a step function into a [`Reactor`].
///
/// `C` is the per-reactor context type and `Params` the tuple of resource
/// parameters the step takes after the context.
#[diagnostic::on_unimplemented(
    message = "this function cannot be used as a reactor step",
    note = "reactor step signature: `fn(&mut C, Params...)` — context first, then resources",
    note = "closures with resource parameters are not supported — use a named `fn`"
)]
pub trait IntoReactor<C, Params> {
    /// Concrete reactor type produced by the conversion.
    type Reactor: Reactor + 'static;

    /// Builds the reactor from `self`, taking ownership of `ctx` and
    /// resolving parameter state against `registry`.
    fn into_reactor(self, ctx: C, registry: &Registry) -> Self::Reactor;
}
617
618impl<C: Send + 'static, F: FnMut(&mut C) + Send + 'static> Reactor for ReactorFn<C, F, ()> {
623 fn run(&mut self, _world: &mut World) {
624 (self.f)(&mut self.ctx);
625 }
626
627 fn name(&self) -> &'static str {
628 self.name
629 }
630}
631
632impl<C: Send + 'static, F: FnMut(&mut C) + Send + 'static> IntoReactor<C, ()> for F {
633 type Reactor = ReactorFn<C, F, ()>;
634
635 fn into_reactor(self, ctx: C, registry: &Registry) -> Self::Reactor {
636 ReactorFn {
637 ctx,
638 f: self,
639 state: <() as Param>::init(registry),
640 name: std::any::type_name::<F>(),
641 }
642 }
643}
644
// Generates, for one tuple arity of resource parameters, both the `Reactor`
// impl for `ReactorFn` and the matching `IntoReactor` impl for the step
// function. Invoked for every supported arity via `all_tuples!` below.
macro_rules! impl_into_reactor {
    ($($P:ident),+) => {
        impl<C: Send + 'static, F: Send + 'static, $($P: Param + 'static),+>
            Reactor for ReactorFn<C, F, ($($P,)+)>
        where
            // The step must be callable both with the declared parameter
            // types and with the items actually fetched from the world.
            for<'a> &'a mut F:
                FnMut(&mut C, $($P,)+) +
                FnMut(&mut C, $($P::Item<'a>,)+),
        {
            #[allow(non_snake_case)]
            fn run(&mut self, world: &mut World) {
                // Local shim so the call goes through a plain `impl FnMut`
                // with the fetched item types.
                fn call_inner<Ctx, $($P,)+>(
                    mut f: impl FnMut(&mut Ctx, $($P,)+),
                    ctx: &mut Ctx,
                    $($P: $P,)+
                ) {
                    f(ctx, $($P,)+);
                }

                // Debug builds only: reset the world's borrow tracking before
                // fetching parameters.
                #[cfg(debug_assertions)]
                world.clear_borrows();
                // SAFETY: NOTE(review) — relies on `Param::fetch`'s contract
                // (not visible here); presumably upheld by `state` coming from
                // `Param::init` and the `check_access` call in `into_reactor`.
                // Confirm against the `Param` trait documentation.
                let ($($P,)+) = unsafe {
                    <($($P,)+) as Param>::fetch(world, &mut self.state)
                };
                call_inner(&mut self.f, &mut self.ctx, $($P,)+);
            }

            fn name(&self) -> &'static str {
                self.name
            }
        }

        impl<C: Send + 'static, F: Send + 'static, $($P: Param + 'static),+>
            IntoReactor<C, ($($P,)+)> for F
        where
            for<'a> &'a mut F:
                FnMut(&mut C, $($P,)+) +
                FnMut(&mut C, $($P::Item<'a>,)+),
        {
            type Reactor = ReactorFn<C, F, ($($P,)+)>;

            fn into_reactor(self, ctx: C, registry: &Registry) -> Self::Reactor {
                let state = <($($P,)+) as Param>::init(registry);
                {
                    // Hand every parameter's resource id and type name to the
                    // registry's access check at build time.
                    #[allow(non_snake_case)]
                    let ($($P,)+) = &state;
                    registry.check_access(&[
                        $(
                            (<$P as Param>::resource_id($P),
                             std::any::type_name::<$P>()),
                        )+
                    ]);
                }
                ReactorFn {
                    ctx,
                    f: self,
                    state,
                    name: std::any::type_name::<F>(),
                }
            }
        }
    };
}

// Instantiate the impls above for every supported parameter-tuple arity.
all_tuples!(impl_into_reactor);
717
/// Queue of reactor tokens scheduled for removal.
///
/// Reactors push tokens here while running; `ReactorSystem::dispatch`
/// applies the removals after all notified reactors have run.
#[derive(Default)]
pub struct DeferredRemovals {
    /// Tokens waiting to be removed.
    tokens: Vec<Token>,
}
731
732impl DeferredRemovals {
733 pub fn deregister(&mut self, token: Token) {
738 self.tokens.push(token);
739 }
740
741 #[inline]
744 pub(crate) fn take(&mut self) -> Vec<Token> {
745 std::mem::take(&mut self.tokens)
746 }
747
748 #[inline]
750 pub(crate) fn put(&mut self, tokens: Vec<Token>) {
751 debug_assert!(tokens.is_empty(), "put() expects a drained Vec");
752 self.tokens = tokens;
753 }
754
755 pub fn is_empty(&self) -> bool {
757 self.tokens.is_empty()
758 }
759}
760
// Marker impl so `DeferredRemovals` can be stored in the `World` as a resource.
impl Resource for DeferredRemovals {}
762
/// Lookup table from arbitrary typed keys to [`DataSource`] handles.
///
/// Each key type `K` gets its own `FxHashMap<K, DataSource>`, so keys of
/// different types never collide.
#[derive(Default)]
pub struct SourceRegistry {
    /// One type-erased `FxHashMap<K, DataSource>` per key type, indexed by
    /// the `TypeId` of `K`.
    maps: FxHashMap<TypeId, Box<dyn Any + Send>>,
}
796
797impl SourceRegistry {
798 pub fn new() -> Self {
800 Self::default()
801 }
802
803 pub fn insert<K: Hash + Eq + Send + 'static>(&mut self, key: K, source: DataSource) {
807 self.get_or_create_map::<K>().insert(key, source);
808 }
809
810 pub fn get<K: Hash + Eq + Send + 'static>(&self, key: &K) -> Option<DataSource> {
814 self.get_map::<K>().and_then(|map| map.get(key)).copied()
815 }
816
817 pub fn remove<K: Hash + Eq + Send + 'static>(&mut self, key: &K) -> Option<DataSource> {
820 self.get_map_mut::<K>().and_then(|map| map.remove(key))
821 }
822
823 pub fn contains<K: Hash + Eq + Send + 'static>(&self, key: &K) -> bool {
825 self.get_map::<K>().is_some_and(|map| map.contains_key(key))
826 }
827
828 fn get_map<K: Hash + Eq + Send + 'static>(&self) -> Option<&FxHashMap<K, DataSource>> {
829 self.maps.get(&TypeId::of::<K>()).map(|boxed| {
830 boxed
832 .downcast_ref::<FxHashMap<K, DataSource>>()
833 .expect("invariant: TypeId matches stored map type")
834 })
835 }
836
837 fn get_map_mut<K: Hash + Eq + Send + 'static>(
838 &mut self,
839 ) -> Option<&mut FxHashMap<K, DataSource>> {
840 self.maps.get_mut(&TypeId::of::<K>()).map(|boxed| {
841 boxed
843 .downcast_mut::<FxHashMap<K, DataSource>>()
844 .expect("invariant: TypeId matches stored map type")
845 })
846 }
847
848 fn get_or_create_map<K: Hash + Eq + Send + 'static>(
849 &mut self,
850 ) -> &mut FxHashMap<K, DataSource> {
851 self.maps
852 .entry(TypeId::of::<K>())
853 .or_insert_with(|| Box::<FxHashMap<K, DataSource>>::default())
854 .downcast_mut::<FxHashMap<K, DataSource>>()
856 .unwrap()
857 }
858}
859
// Marker impl so `SourceRegistry` can be stored in the `World` as a resource.
impl Resource for SourceRegistry {}
861
/// Driver that runs notified reactors against a `World`.
pub struct ReactorSystem {
    /// Reusable buffer the notified tokens are polled into.
    events: Events,

    /// Resource id of the world's `ReactorNotify`, resolved once in `new`.
    notify_id: ResourceId,
    /// Resource id of the world's `DeferredRemovals`, resolved once in `new`.
    removals_id: ResourceId,
}
880
impl ReactorSystem {
    /// Creates a dispatcher for `world`.
    ///
    /// Resolves the ids of the `ReactorNotify` and `DeferredRemovals`
    /// resources up front, so both must already be registered in `world`
    /// (NOTE(review): behavior of `World::id` for a missing resource is not
    /// visible here — confirm).
    pub fn new(world: &World) -> Self {
        Self {
            events: Events::with_capacity(256),
            notify_id: world.id::<ReactorNotify>(),
            removals_id: world.id::<DeferredRemovals>(),
        }
    }

    /// Runs every reactor that was marked since the last dispatch, then
    /// applies the removals the reactors queued in `DeferredRemovals`.
    ///
    /// Returns `true` if at least one token was polled, `false` otherwise.
    pub fn dispatch(&mut self, world: &mut World) -> bool {
        // Hold the notifier as a raw pointer so no reference derived from
        // `world` stays alive while reactors run with `&mut World` below.
        // SAFETY: NOTE(review) — relies on `World::get_mut`'s contract (not
        // visible here) and on `notify_id` belonging to this world; confirm.
        let notify_ptr: *mut ReactorNotify =
            unsafe { world.get_mut::<ReactorNotify>(self.notify_id) };

        {
            // SAFETY: pointer was just obtained above; the reference is
            // confined to this block.
            let notify = unsafe { &mut *notify_ptr };
            notify.poll(&mut self.events);
        }
        let ran = !self.events.is_empty();

        for token in self.events.iter() {
            let idx = token.index();
            let reactor = {
                // SAFETY: short-lived reborrow that ends before the reactor
                // runs.
                let notify = unsafe { &mut *notify_ptr };
                notify.take_reactor(idx)
            };
            // The reactor is moved out of its slot for the duration of the
            // run; tokens whose slot is empty or gone are skipped.
            if let Some(mut reactor) = reactor {
                reactor.run(world);
                // SAFETY: the reactor has finished running; fresh reborrow to
                // put it back.
                let notify = unsafe { &mut *notify_ptr };
                notify.put_reactor(idx, reactor);
            }
        }

        // Apply removals only after every notified reactor has run.
        // SAFETY: NOTE(review) — same `World::get_mut` contract as above,
        // with `removals_id` resolved from this world in `new`.
        let removals = unsafe { world.get_mut::<DeferredRemovals>(self.removals_id) };
        let mut pending = removals.take();
        if !pending.is_empty() {
            // SAFETY: no reactor is running; short-lived reborrow.
            let notify = unsafe { &mut *notify_ptr };
            while let Some(token) = pending.pop() {
                notify.remove_reactor(token);
            }
        }
        // Return the now-empty buffer so its allocation is reused.
        // SAFETY: see the previous `get_mut::<DeferredRemovals>` call.
        let removals = unsafe { world.get_mut::<DeferredRemovals>(self.removals_id) };
        removals.put(pending);

        ran
    }

    /// Returns the number of reactor slots in the world's `ReactorNotify`.
    pub fn reactor_count(&self, world: &World) -> usize {
        world.resource::<ReactorNotify>().reactor_count()
    }
}
956
957#[cfg(test)]
962mod tests {
963 use super::*;
964 use crate::{Res, ResMut, WorldBuilder};
965
966 #[test]
969 fn reactor_fn_arity0() {
970 let wb = WorldBuilder::new();
971 let mut world = wb.build();
972 let reg = world.registry();
973
974 struct Ctx {
975 count: u32,
976 }
977
978 fn step(ctx: &mut Ctx) {
979 ctx.count += 1;
980 }
981
982 let mut reactor = step.into_reactor(Ctx { count: 0 }, reg);
983 reactor.run(&mut world);
984 assert_eq!(reactor.ctx.count, 1);
985 }
986
987 #[test]
988 fn reactor_fn_with_params() {
989 let mut wb = WorldBuilder::new();
990 wb.register::<u64>(10);
991 wb.register::<u32>(0);
992 let mut world = wb.build();
993 let reg = world.registry();
994
995 struct Ctx {
996 multiplier: u64,
997 }
998
999 fn step(ctx: &mut Ctx, val: Res<u64>, mut out: ResMut<u32>) {
1000 *out = (*val * ctx.multiplier) as u32;
1001 }
1002
1003 let mut reactor = step.into_reactor(Ctx { multiplier: 5 }, reg);
1004 reactor.run(&mut world);
1005 assert_eq!(*world.resource::<u32>(), 50);
1006 }
1007
1008 fn dummy_reactor() -> ReactorFn<(), fn(&mut ()), ()> {
1011 ReactorFn {
1012 ctx: (),
1013 f: (|(): &mut ()| {}) as fn(&mut ()),
1014 state: (),
1015 name: "dummy",
1016 }
1017 }
1018
1019 #[test]
1020 fn reactor_notify_mark_fans_out() {
1021 let mut notify = ReactorNotify::new(4, 8);
1022 let mut events = Events::with_capacity(8);
1023
1024 let src = notify.register_source();
1025 let a1 = notify.register_built(dummy_reactor()).token();
1026 let a2 = notify.register_built(dummy_reactor()).token();
1027 let _a3 = notify.register_built(dummy_reactor()).token();
1028
1029 notify.subscribe(a1, src);
1030 notify.subscribe(a2, src);
1031 notify.mark(src);
1034 notify.notify.poll(&mut events);
1035
1036 assert_eq!(events.len(), 2);
1037 assert!(events.as_slice().contains(&a1));
1038 assert!(events.as_slice().contains(&a2));
1039 }
1040
1041 #[test]
1042 fn reactor_notify_dedup_across_sources() {
1043 let mut notify = ReactorNotify::new(4, 8);
1044 let mut events = Events::with_capacity(8);
1045
1046 let src1 = notify.register_source();
1047 let src2 = notify.register_source();
1048 let reactor = notify.register_built(dummy_reactor()).token();
1049
1050 notify.subscribe(reactor, src1);
1051 notify.subscribe(reactor, src2);
1052
1053 notify.mark(src1);
1054 notify.mark(src2);
1055
1056 notify.notify.poll(&mut events);
1057 assert_eq!(events.len(), 1);
1058 assert_eq!(events.as_slice()[0], reactor);
1059 }
1060
1061 #[test]
1062 fn reactor_notify_remove_reactor() {
1063 let mut notify = ReactorNotify::new(4, 8);
1064 let mut events = Events::with_capacity(8);
1065
1066 let src = notify.register_source();
1067
1068 struct Ctx;
1069 let token = notify
1070 .register_built(ReactorFn {
1071 ctx: Ctx,
1072 f: (|_: &mut Ctx| {}) as fn(&mut Ctx),
1073 state: (),
1074 name: "test",
1075 })
1076 .token();
1077 notify.subscribe(token, src);
1078
1079 notify.remove_reactor(token);
1080 notify.mark(src);
1081 notify.notify.poll(&mut events);
1082 assert!(events.is_empty());
1083 }
1084
    /// Test helper: fetches the world's `ReactorNotify` mutably through a
    /// shared `World` reference.
    // `mut_from_ref` is allowed on purpose: the helper exists to reach the
    // notifier without holding `&mut World`.
    #[allow(clippy::mut_from_ref)]
    fn notify_mut(world: &World, id: ResourceId) -> &mut ReactorNotify {
        // SAFETY: NOTE(review) — relies on `World::get_mut`'s contract (not
        // visible here); callers in these tests do not keep two notifier
        // references in use at the same time.
        unsafe { world.get_mut::<ReactorNotify>(id) }
    }
1094
    /// Two reactors subscribed to the same source both run on dispatch.
    #[test]
    fn reactor_system_dispatch() {
        let mut wb = WorldBuilder::new();
        wb.register::<u64>(0);
        wb.register(ReactorNotify::new(4, 8));
        wb.register(DeferredRemovals::default());
        let mut world = wb.build();
        let reg = world.registry();
        let nid = world.id::<ReactorNotify>();

        let mut system = ReactorSystem::new(&world);

        struct Ctx {
            _reactor_id: Token,
            increment: u64,
        }

        fn step(ctx: &mut Ctx, mut val: ResMut<u64>) {
            *val += ctx.increment;
        }

        let notify = notify_mut(&world, nid);
        let src = notify.register_source();
        notify
            .register(
                |t| Ctx {
                    _reactor_id: t,
                    increment: 10,
                },
                step,
                reg,
            )
            .subscribe(src);
        notify
            .register(
                |t| Ctx {
                    _reactor_id: t,
                    increment: 5,
                },
                step,
                reg,
            )
            .subscribe(src);

        notify_mut(&world, nid).mark(src);
        let ran = system.dispatch(&mut world);

        assert!(ran);
        // Both reactors ran: 10 + 5.
        assert_eq!(*world.resource::<u64>(), 15);
    }
1146
    /// A reactor that deregisters itself is removed after the dispatch in
    /// which it asked for removal, and never runs again.
    #[test]
    fn reactor_system_deferred_removal() {
        let mut wb = WorldBuilder::new();
        wb.register::<u64>(0);
        wb.register(ReactorNotify::new(4, 8));
        wb.register(DeferredRemovals::default());
        let mut world = wb.build();
        let reg = world.registry();
        let nid = world.id::<ReactorNotify>();

        let mut system = ReactorSystem::new(&world);

        struct Ctx {
            reactor_id: Token,
            runs: u64,
        }

        // Increments the counter and requests its own removal on the second run.
        fn step(ctx: &mut Ctx, mut val: ResMut<u64>, mut removals: ResMut<DeferredRemovals>) {
            *val += 1;
            ctx.runs += 1;
            if ctx.runs >= 2 {
                removals.deregister(ctx.reactor_id);
            }
        }

        let notify = notify_mut(&world, nid);
        let src = notify.register_source();
        notify
            .register(
                |t| Ctx {
                    reactor_id: t,
                    runs: 0,
                },
                step,
                reg,
            )
            .subscribe(src);

        assert_eq!(system.reactor_count(&world), 1);

        // First run: reactor stays registered.
        notify_mut(&world, nid).mark(src);
        system.dispatch(&mut world);
        assert_eq!(*world.resource::<u64>(), 1);
        assert_eq!(system.reactor_count(&world), 1);

        // Second run: reactor deregisters itself and is removed afterwards.
        notify_mut(&world, nid).mark(src);
        system.dispatch(&mut world);
        assert_eq!(*world.resource::<u64>(), 2);
        assert_eq!(system.reactor_count(&world), 0);

        // Third mark: nothing is subscribed any more, so nothing runs.
        notify_mut(&world, nid).mark(src);
        let ran = system.dispatch(&mut world);
        assert!(!ran);
        assert_eq!(*world.resource::<u64>(), 2);
    }
1205
    /// Marking one source runs only the reactors subscribed to that source.
    #[test]
    fn reactor_system_only_subscribed_wake() {
        let mut wb = WorldBuilder::new();
        wb.register::<u64>(0);
        wb.register(ReactorNotify::new(4, 8));
        wb.register(DeferredRemovals::default());
        let mut world = wb.build();
        let reg = world.registry();
        let nid = world.id::<ReactorNotify>();

        let mut system = ReactorSystem::new(&world);

        struct Ctx {
            _reactor_id: Token,
            value: u64,
        }

        fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
            *out += ctx.value;
        }

        let notify = notify_mut(&world, nid);
        let btc = notify.register_source();
        let eth = notify.register_source();

        notify
            .register(
                |t| Ctx {
                    _reactor_id: t,
                    value: 10,
                },
                step,
                reg,
            )
            .subscribe(btc);
        notify
            .register(
                |t| Ctx {
                    _reactor_id: t,
                    value: 100,
                },
                step,
                reg,
            )
            .subscribe(eth);

        // Only the BTC reactor runs.
        notify_mut(&world, nid).mark(btc);
        system.dispatch(&mut world);
        assert_eq!(*world.resource::<u64>(), 10);

        // Only the ETH reactor runs: 10 + 100.
        notify_mut(&world, nid).mark(eth);
        system.dispatch(&mut world);
        assert_eq!(*world.resource::<u64>(), 110);
    }
1262
    /// Reactors can be registered after the system has already dispatched.
    #[test]
    fn runtime_registration() {
        let mut wb = WorldBuilder::new();
        wb.register::<u64>(0);
        wb.register(ReactorNotify::new(4, 8));
        wb.register(DeferredRemovals::default());
        let mut world = wb.build();
        let nid = world.id::<ReactorNotify>();

        let mut system = ReactorSystem::new(&world);

        struct Ctx {
            _reactor_id: Token,
            value: u64,
        }

        fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
            *out += ctx.value;
        }

        // Initial registration, scoped so the registry borrow ends before dispatch.
        let src = {
            let reg = world.registry();
            let notify = notify_mut(&world, nid);
            let src = notify.register_source();
            notify
                .register(
                    |t| Ctx {
                        _reactor_id: t,
                        value: 10,
                    },
                    step,
                    reg,
                )
                .subscribe(src);
            src
        };

        notify_mut(&world, nid).mark(src);
        system.dispatch(&mut world);
        assert_eq!(*world.resource::<u64>(), 10);

        // Second reactor added at runtime, subscribed to the same source.
        {
            let reg = world.registry();
            notify_mut(&world, nid)
                .register(
                    |t| Ctx {
                        _reactor_id: t,
                        value: 100,
                    },
                    step,
                    reg,
                )
                .subscribe(src);
        }

        notify_mut(&world, nid).mark(src);
        system.dispatch(&mut world);
        // Previous 10, plus both reactors: 10 + 10 + 100.
        assert_eq!(*world.resource::<u64>(), 120);
    }
1326
    /// After a reactor is removed, the next registration reuses its key and
    /// the new reactor works normally under that key.
    #[test]
    fn register_after_remove_reuses_key() {
        let mut wb = WorldBuilder::new();
        wb.register::<u64>(0);
        wb.register(ReactorNotify::new(4, 8));
        wb.register(DeferredRemovals::default());
        let mut world = wb.build();
        let nid = world.id::<ReactorNotify>();

        let mut system = ReactorSystem::new(&world);

        struct Ctx {
            reactor_id: Token,
            value: u64,
        }

        // Only the reactor with value 10 deregisters itself.
        fn step(ctx: &mut Ctx, mut out: ResMut<u64>, mut removals: ResMut<DeferredRemovals>) {
            *out += ctx.value;
            if ctx.value == 10 {
                removals.deregister(ctx.reactor_id);
            }
        }

        let src = {
            let reg = world.registry();
            let notify = notify_mut(&world, nid);
            let src = notify.register_source();
            notify
                .register(
                    |t| Ctx {
                        reactor_id: t,
                        value: 10,
                    },
                    step,
                    reg,
                )
                .subscribe(src);
            src
        };

        // First reactor runs once and removes itself.
        notify_mut(&world, nid).mark(src);
        system.dispatch(&mut world);
        assert_eq!(*world.resource::<u64>(), 10);
        assert_eq!(system.reactor_count(&world), 0);

        {
            let reg = world.registry();
            let notify = notify_mut(&world, nid);
            let token = notify
                .register(
                    |t| Ctx {
                        reactor_id: t,
                        value: 100,
                    },
                    step,
                    reg,
                )
                .token();
            notify.subscribe(token, src);
            // The freed key 0 is handed out again.
            assert_eq!(token.index(), 0);
        }

        notify_mut(&world, nid).mark(src);
        system.dispatch(&mut world);
        // 10 from the removed reactor plus 100 from its replacement.
        assert_eq!(*world.resource::<u64>(), 110);
        // The replacement does not deregister itself.
        assert_eq!(system.reactor_count(&world), 1);
    }
1397
    /// A reactor may take `ResMut<ReactorNotify>` as a parameter while it is
    /// being dispatched; the test passes if dispatch completes without
    /// panicking.
    #[test]
    fn reactor_can_access_actor_notify() {
        let mut wb = WorldBuilder::new();
        wb.register::<u64>(0);
        wb.register(ReactorNotify::new(4, 8));
        wb.register(DeferredRemovals::default());
        let mut world = wb.build();
        let nid = world.id::<ReactorNotify>();

        let mut system = ReactorSystem::new(&world);

        struct Ctx {
            _reactor_id: Token,
        }

        // Reads the notifier from inside a running reactor.
        fn step(_ctx: &mut Ctx, notify: ResMut<ReactorNotify>, mut out: ResMut<u64>) {
            *out = notify.reactor_count() as u64;
        }

        let src = {
            let reg = world.registry();
            let notify = notify_mut(&world, nid);
            let src = notify.register_source();
            notify
                .register(|t| Ctx { _reactor_id: t }, step, reg)
                .subscribe(src);
            src
        };

        notify_mut(&world, nid).mark(src);
        system.dispatch(&mut world);
    }
1438
    /// Three per-instrument reactors each subscribe to their own market-data
    /// source plus one shared `positions` source; each reactor runs at most
    /// once per dispatch regardless of how many of its sources were marked.
    #[test]
    fn multi_instrument_with_shared_source() {
        let mut wb = WorldBuilder::new();
        wb.register::<u64>(0);
        wb.register(ReactorNotify::new(8, 16));
        wb.register(DeferredRemovals::default());
        let mut world = wb.build();
        let nid = world.id::<ReactorNotify>();
        let mut system = ReactorSystem::new(&world);

        struct Ctx {
            _reactor_id: Token,
            instrument: &'static str,
        }

        // Each instrument adds a distinct decimal digit so the total shows
        // which reactors ran.
        fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
            *out += match ctx.instrument {
                "BTC" => 100,
                "ETH" => 10,
                "SOL" => 1,
                _ => 0,
            };
        }

        let (btc_md, eth_md, sol_md, positions) = {
            let reg = world.registry();
            let notify = notify_mut(&world, nid);

            let btc_md = notify.register_source();
            let eth_md = notify.register_source();
            let sol_md = notify.register_source();
            let positions = notify.register_source();

            notify
                .register(
                    |t| Ctx {
                        _reactor_id: t,
                        instrument: "BTC",
                    },
                    step,
                    reg,
                )
                .subscribe(btc_md)
                .subscribe(positions);

            notify
                .register(
                    |t| Ctx {
                        _reactor_id: t,
                        instrument: "ETH",
                    },
                    step,
                    reg,
                )
                .subscribe(eth_md)
                .subscribe(positions);

            notify
                .register(
                    |t| Ctx {
                        _reactor_id: t,
                        instrument: "SOL",
                    },
                    step,
                    reg,
                )
                .subscribe(sol_md)
                .subscribe(positions);

            (btc_md, eth_md, sol_md, positions)
        };

        // BTC market data only: +100.
        notify_mut(&world, nid).mark(btc_md);
        system.dispatch(&mut world);
        assert_eq!(*world.resource::<u64>(), 100);

        // Shared source wakes all three: +111.
        notify_mut(&world, nid).mark(positions);
        system.dispatch(&mut world);
        assert_eq!(*world.resource::<u64>(), 211);

        // BTC and ETH market data: +110.
        notify_mut(&world, nid).mark(btc_md);
        notify_mut(&world, nid).mark(eth_md);
        system.dispatch(&mut world);
        assert_eq!(*world.resource::<u64>(), 321);

        // BTC is marked via two sources but still runs once: +111.
        notify_mut(&world, nid).mark(btc_md);
        notify_mut(&world, nid).mark(positions);
        system.dispatch(&mut world);
        assert_eq!(*world.resource::<u64>(), 432);

        // Nothing marked: dispatch reports no work.
        let ran = system.dispatch(&mut world);
        assert!(!ran);
        assert_eq!(*world.resource::<u64>(), 432);

        // SOL market data only: +1.
        notify_mut(&world, nid).mark(sol_md);
        system.dispatch(&mut world);
        assert_eq!(*world.resource::<u64>(), 433);
    }
1554
    /// Each reactor gets its own private source, and an external id (here
    /// the token index) is used to route an event to exactly one reactor.
    #[test]
    fn per_reactor_fill_routing() {
        use std::collections::HashMap;

        let mut wb = WorldBuilder::new();
        wb.register::<u64>(0);
        wb.register(ReactorNotify::new(8, 16));
        wb.register(DeferredRemovals::default());
        let mut world = wb.build();
        let nid = world.id::<ReactorNotify>();
        let mut system = ReactorSystem::new(&world);

        struct Ctx {
            reactor_id: Token,
        }

        // Adds (token index + 1) so the total reveals which reactor ran.
        fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
            *out += ctx.reactor_id.index() as u64 + 1;
        }

        // Routing table: token index -> that reactor's private source.
        let mut fill_sources: HashMap<usize, DataSource> = HashMap::new();

        {
            let reg = world.registry();
            let notify = notify_mut(&world, nid);

            for _ in 0..3 {
                let fill_src = notify.register_source();
                let token = notify
                    .register(|t| Ctx { reactor_id: t }, step, reg)
                    .subscribe(fill_src)
                    .token();

                fill_sources.insert(token.index(), fill_src);
            }
        }

        // Route to reactor 1: adds 1 + 1 = 2.
        let wire_client_id: usize = 1;
        let fill_source = fill_sources[&wire_client_id];
        notify_mut(&world, nid).mark(fill_source);
        system.dispatch(&mut world);
        assert_eq!(*world.resource::<u64>(), 2);

        // Route to reactor 0: adds 0 + 1, total 3.
        let fill_source = fill_sources[&0];
        notify_mut(&world, nid).mark(fill_source);
        system.dispatch(&mut world);
        assert_eq!(*world.resource::<u64>(), 3);
    }
1612
    /// Sources can be added after the system has started dispatching.
    #[test]
    fn dynamic_source_registration() {
        let mut wb = WorldBuilder::new();
        wb.register::<u64>(0);
        wb.register(ReactorNotify::new(4, 8));
        wb.register(DeferredRemovals::default());
        let mut world = wb.build();
        let nid = world.id::<ReactorNotify>();
        let mut system = ReactorSystem::new(&world);

        struct Ctx {
            _reactor_id: Token,
            value: u64,
        }

        fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
            *out += ctx.value;
        }

        // First source and its reactor.
        let btc_md = {
            let reg = world.registry();
            let notify = notify_mut(&world, nid);
            let btc_md = notify.register_source();
            notify
                .register(
                    |t| Ctx {
                        _reactor_id: t,
                        value: 10,
                    },
                    step,
                    reg,
                )
                .subscribe(btc_md);
            btc_md
        };

        notify_mut(&world, nid).mark(btc_md);
        system.dispatch(&mut world);
        assert_eq!(*world.resource::<u64>(), 10);

        // Second source registered at runtime, with its own reactor.
        let eth_md = {
            let reg = world.registry();
            let notify = notify_mut(&world, nid);
            let eth_md = notify.register_source();
            notify
                .register(
                    |t| Ctx {
                        _reactor_id: t,
                        value: 100,
                    },
                    step,
                    reg,
                )
                .subscribe(eth_md);
            eth_md
        };

        notify_mut(&world, nid).mark(btc_md);
        notify_mut(&world, nid).mark(eth_md);
        system.dispatch(&mut world);
        // Previous 10, plus both reactors: 10 + 10 + 100.
        assert_eq!(*world.resource::<u64>(), 120);
    }
1680
    /// Removing a source silences it, and a later `register_source` reuses
    /// the freed key without inheriting the old subscribers.
    #[test]
    fn remove_source_and_reuse_slot() {
        let mut wb = WorldBuilder::new();
        wb.register::<u64>(0);
        wb.register(ReactorNotify::new(4, 8));
        wb.register(DeferredRemovals::default());
        let mut world = wb.build();
        let nid = world.id::<ReactorNotify>();
        let mut system = ReactorSystem::new(&world);

        struct Ctx {
            _reactor_id: Token,
            value: u64,
        }

        fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
            *out += ctx.value;
        }

        let (src_a, src_b) = {
            let reg = world.registry();
            let notify = notify_mut(&world, nid);
            let src_a = notify.register_source();
            let src_b = notify.register_source();
            notify
                .register(
                    |t| Ctx {
                        _reactor_id: t,
                        value: 10,
                    },
                    step,
                    reg,
                )
                .subscribe(src_a);
            notify
                .register(
                    |t| Ctx {
                        _reactor_id: t,
                        value: 100,
                    },
                    step,
                    reg,
                )
                .subscribe(src_b);
            (src_a, src_b)
        };

        notify_mut(&world, nid).remove_source(src_a);

        // Marking a removed source is a no-op.
        notify_mut(&world, nid).mark(src_a);
        let ran = system.dispatch(&mut world);
        assert!(!ran);

        // The other source is unaffected.
        notify_mut(&world, nid).mark(src_b);
        system.dispatch(&mut world);
        assert_eq!(*world.resource::<u64>(), 100);

        // The freed key is handed out again.
        let src_c = notify_mut(&world, nid).register_source();
        assert_eq!(src_c.0, src_a.0);

        let reg = world.registry();
        let notify = notify_mut(&world, nid);
        notify
            .register(
                |t| Ctx {
                    _reactor_id: t,
                    value: 1,
                },
                step,
                reg,
            )
            .subscribe(src_c);

        notify_mut(&world, nid).mark(src_c);
        system.dispatch(&mut world);
        // Only the new subscriber ran (+1); the reactor once subscribed to
        // `src_a` did not.
        assert_eq!(*world.resource::<u64>(), 101);
    }
1766
1767 #[test]
1770 fn source_registry_basic() {
1771 let mut registry = SourceRegistry::new();
1772
1773 #[derive(Hash, Eq, PartialEq, Debug)]
1774 struct InstrumentId(u32);
1775
1776 let src_a = DataSource(0);
1777 let src_b = DataSource(1);
1778
1779 registry.insert(InstrumentId(1), src_a);
1780 registry.insert(InstrumentId(2), src_b);
1781
1782 assert_eq!(registry.get(&InstrumentId(1)), Some(src_a));
1783 assert_eq!(registry.get(&InstrumentId(2)), Some(src_b));
1784 assert_eq!(registry.get(&InstrumentId(3)), None);
1785 assert!(registry.contains(&InstrumentId(1)));
1786 assert!(!registry.contains(&InstrumentId(3)));
1787 }
1788
1789 #[test]
1790 fn source_registry_multiple_key_types() {
1791 let mut registry = SourceRegistry::new();
1792
1793 #[derive(Hash, Eq, PartialEq)]
1794 struct InstrumentId(u32);
1795
1796 #[derive(Hash, Eq, PartialEq)]
1797 struct StrategyId(u32);
1798
1799 let src_a = DataSource(0);
1800 let src_b = DataSource(1);
1801
1802 registry.insert(InstrumentId(1), src_a);
1804 registry.insert(StrategyId(1), src_b);
1805
1806 assert_eq!(registry.get(&InstrumentId(1)), Some(src_a));
1808 assert_eq!(registry.get(&StrategyId(1)), Some(src_b));
1809 }
1810
1811 #[test]
1812 fn source_registry_tuple_keys() {
1813 let mut registry = SourceRegistry::new();
1814
1815 let src = DataSource(42);
1816 registry.insert(("BTC", "Binance"), src);
1817
1818 assert_eq!(registry.get(&("BTC", "Binance")), Some(src));
1819 assert_eq!(registry.get(&("ETH", "Binance")), None);
1820 }
1821
1822 #[test]
1823 fn source_registry_remove() {
1824 let mut registry = SourceRegistry::new();
1825
1826 let src = DataSource(0);
1827 registry.insert(42u64, src);
1828
1829 assert_eq!(registry.remove(&42u64), Some(src));
1830 assert_eq!(registry.get(&42u64), None);
1831 assert_eq!(registry.remove(&42u64), None); }
1833
1834 #[test]
1835 fn source_registry_integrated_with_reactor_system() {
1836 let mut wb = WorldBuilder::new();
1837 wb.register::<u64>(0);
1838 wb.register(ReactorNotify::new(4, 8));
1839 wb.register(DeferredRemovals::default());
1840 wb.register(SourceRegistry::new());
1841 let mut world = wb.build();
1842 let nid = world.id::<ReactorNotify>();
1843 let mut system = ReactorSystem::new(&world);
1844
1845 #[derive(Hash, Eq, PartialEq, Clone, Copy)]
1846 struct Instrument(u32);
1847 const BTC: Instrument = Instrument(0);
1848 const ETH: Instrument = Instrument(1);
1849
1850 struct Ctx {
1851 _reactor_id: Token,
1852 value: u64,
1853 }
1854
1855 fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
1856 *out += ctx.value;
1857 }
1858
1859 let btc_src = notify_mut(&world, nid).register_source();
1861 let eth_src = notify_mut(&world, nid).register_source();
1862
1863 world.resource_mut::<SourceRegistry>().insert(BTC, btc_src);
1864 world.resource_mut::<SourceRegistry>().insert(ETH, eth_src);
1865
1866 {
1868 let reg = world.registry();
1869 let btc = world.resource::<SourceRegistry>().get(&BTC).unwrap();
1870 let notify = notify_mut(&world, nid);
1871 notify
1872 .register(
1873 |t| Ctx {
1874 _reactor_id: t,
1875 value: 10,
1876 },
1877 step,
1878 reg,
1879 )
1880 .subscribe(btc);
1881 }
1882
1883 notify_mut(&world, nid).mark(btc_src);
1885 system.dispatch(&mut world);
1886 assert_eq!(*world.resource::<u64>(), 10);
1887
1888 let removed = world.resource_mut::<SourceRegistry>().remove(&BTC);
1890 assert!(removed.is_some());
1891 notify_mut(&world, nid).remove_source(removed.unwrap());
1892
1893 notify_mut(&world, nid).mark(btc_src);
1895 let ran = system.dispatch(&mut world);
1896 assert!(!ran);
1897 }
1898
1899 #[test]
1902 fn source_registry_overwrite_key() {
1903 let mut registry = SourceRegistry::new();
1904 let src_a = DataSource(0);
1905 let src_b = DataSource(1);
1906
1907 registry.insert(42u32, src_a);
1908 assert_eq!(registry.get(&42u32), Some(src_a));
1909
1910 registry.insert(42u32, src_b);
1912 assert_eq!(registry.get(&42u32), Some(src_b));
1913 }
1914
1915 #[test]
1916 fn source_registry_empty_get() {
1917 let registry = SourceRegistry::new();
1918 assert_eq!(registry.get(&42u32), None);
1920 assert!(!registry.contains(&42u32));
1921 }
1922
1923 #[test]
1924 fn source_registry_enum_keys() {
1925 #[derive(Hash, Eq, PartialEq)]
1926 enum Venue {
1927 Binance,
1928 Coinbase,
1929 }
1930
1931 let mut registry = SourceRegistry::new();
1932 let src = DataSource(0);
1933 registry.insert(Venue::Binance, src);
1934
1935 assert_eq!(registry.get(&Venue::Binance), Some(src));
1936 assert_eq!(registry.get(&Venue::Coinbase), None);
1937 }
1938
1939 #[test]
1940 fn source_registry_composite_key() {
1941 #[derive(Hash, Eq, PartialEq)]
1943 struct StrategyId(u32);
1944 #[derive(Hash, Eq, PartialEq)]
1945 struct InstrumentId(u32);
1946 #[derive(Hash, Eq, PartialEq)]
1947 struct VenueId(u32);
1948
1949 let mut registry = SourceRegistry::new();
1950 let src = DataSource(5);
1951 registry.insert((StrategyId(1), InstrumentId(0), VenueId(2)), src);
1952
1953 assert_eq!(
1954 registry.get(&(StrategyId(1), InstrumentId(0), VenueId(2))),
1955 Some(src)
1956 );
1957 assert_eq!(
1959 registry.get(&(StrategyId(2), InstrumentId(0), VenueId(2))),
1960 None
1961 );
1962 }
1963
1964 #[test]
1967 fn full_lifecycle_add_trade_remove() {
1968 let mut wb = WorldBuilder::new();
1970 wb.register::<u64>(0);
1971 wb.register(ReactorNotify::new(4, 8));
1972 wb.register(DeferredRemovals::default());
1973 wb.register(SourceRegistry::new());
1974 let mut world = wb.build();
1975 let nid = world.id::<ReactorNotify>();
1976 let mut system = ReactorSystem::new(&world);
1977
1978 #[derive(Hash, Eq, PartialEq, Clone, Copy)]
1979 struct Instrument(u32);
1980
1981 struct Ctx {
1982 _reactor_id: Token,
1983 value: u64,
1984 }
1985
1986 fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
1987 *out += ctx.value;
1988 }
1989
1990 let btc_src = notify_mut(&world, nid).register_source();
1992 world
1993 .resource_mut::<SourceRegistry>()
1994 .insert(Instrument(0), btc_src);
1995
1996 {
1997 let reg = world.registry();
1998 let notify = notify_mut(&world, nid);
1999 notify
2000 .register(
2001 |t| Ctx {
2002 _reactor_id: t,
2003 value: 10,
2004 },
2005 step,
2006 reg,
2007 )
2008 .subscribe(btc_src);
2009 }
2010
2011 notify_mut(&world, nid).mark(btc_src);
2013 system.dispatch(&mut world);
2014 assert_eq!(*world.resource::<u64>(), 10);
2015
2016 let eth_src = notify_mut(&world, nid).register_source();
2018 world
2019 .resource_mut::<SourceRegistry>()
2020 .insert(Instrument(1), eth_src);
2021
2022 {
2023 let reg = world.registry();
2024 let notify = notify_mut(&world, nid);
2025 notify
2026 .register(
2027 |t| Ctx {
2028 _reactor_id: t,
2029 value: 100,
2030 },
2031 step,
2032 reg,
2033 )
2034 .subscribe(eth_src);
2035 }
2036
2037 notify_mut(&world, nid).mark(btc_src);
2039 notify_mut(&world, nid).mark(eth_src);
2040 system.dispatch(&mut world);
2041 assert_eq!(*world.resource::<u64>(), 120);
2042
2043 let removed = world
2045 .resource_mut::<SourceRegistry>()
2046 .remove(&Instrument(0));
2047 notify_mut(&world, nid).remove_source(removed.unwrap());
2048
2049 notify_mut(&world, nid).mark(eth_src);
2051 system.dispatch(&mut world);
2052 assert_eq!(*world.resource::<u64>(), 220);
2053
2054 let sol_src = notify_mut(&world, nid).register_source();
2056 world
2057 .resource_mut::<SourceRegistry>()
2058 .insert(Instrument(2), sol_src);
2059 assert_eq!(sol_src.0, btc_src.0); {
2062 let reg = world.registry();
2063 let notify = notify_mut(&world, nid);
2064 notify
2065 .register(
2066 |t| Ctx {
2067 _reactor_id: t,
2068 value: 1000,
2069 },
2070 step,
2071 reg,
2072 )
2073 .subscribe(sol_src);
2074 }
2075
2076 notify_mut(&world, nid).mark(sol_src);
2078 notify_mut(&world, nid).mark(eth_src);
2079 system.dispatch(&mut world);
2080 assert_eq!(*world.resource::<u64>(), 1320); }
2082
2083 #[test]
2084 fn multi_strategy_same_instrument() {
2085 let mut wb = WorldBuilder::new();
2087 wb.register::<u64>(0);
2088 wb.register(ReactorNotify::new(8, 16));
2089 wb.register(DeferredRemovals::default());
2090 wb.register(SourceRegistry::new());
2091 let mut world = wb.build();
2092 let nid = world.id::<ReactorNotify>();
2093 let mut system = ReactorSystem::new(&world);
2094
2095 #[derive(Hash, Eq, PartialEq, Clone, Copy)]
2096 struct StrategyInstrument(&'static str, &'static str);
2097
2098 struct Ctx {
2099 _reactor_id: Token,
2100 value: u64,
2101 }
2102
2103 fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
2104 *out += ctx.value;
2105 }
2106
2107 let reg = world.registry();
2109 let notify = notify_mut(&world, nid);
2110
2111 let mm_btc = notify.register_source();
2112 let mm_eth = notify.register_source();
2113 let arb_btc = notify.register_source();
2114
2115 notify
2117 .register(
2118 |t| Ctx {
2119 _reactor_id: t,
2120 value: 1,
2121 },
2122 step,
2123 reg,
2124 )
2125 .subscribe(mm_btc);
2126 notify
2127 .register(
2128 |t| Ctx {
2129 _reactor_id: t,
2130 value: 2,
2131 },
2132 step,
2133 reg,
2134 )
2135 .subscribe(mm_eth);
2136
2137 notify
2139 .register(
2140 |t| Ctx {
2141 _reactor_id: t,
2142 value: 100,
2143 },
2144 step,
2145 reg,
2146 )
2147 .subscribe(arb_btc);
2148
2149 world
2151 .resource_mut::<SourceRegistry>()
2152 .insert(StrategyInstrument("MM", "BTC"), mm_btc);
2153 world
2154 .resource_mut::<SourceRegistry>()
2155 .insert(StrategyInstrument("MM", "ETH"), mm_eth);
2156 world
2157 .resource_mut::<SourceRegistry>()
2158 .insert(StrategyInstrument("ARB", "BTC"), arb_btc);
2159
2160 let mm_btc_src = world
2163 .resource::<SourceRegistry>()
2164 .get(&StrategyInstrument("MM", "BTC"))
2165 .unwrap();
2166 let arb_btc_src = world
2167 .resource::<SourceRegistry>()
2168 .get(&StrategyInstrument("ARB", "BTC"))
2169 .unwrap();
2170
2171 notify_mut(&world, nid).mark(mm_btc_src);
2172 notify_mut(&world, nid).mark(arb_btc_src);
2173 system.dispatch(&mut world);
2174 assert_eq!(*world.resource::<u64>(), 101); }
2176
2177 #[test]
2178 fn reactor_self_removal_with_registry_cleanup() {
2179 let mut wb = WorldBuilder::new();
2181 wb.register::<u64>(0);
2182 wb.register(ReactorNotify::new(4, 8));
2183 wb.register(DeferredRemovals::default());
2184 wb.register(SourceRegistry::new());
2185 let mut world = wb.build();
2186 let nid = world.id::<ReactorNotify>();
2187 let mut system = ReactorSystem::new(&world);
2188
2189 struct Ctx {
2190 reactor_id: Token,
2191 }
2192
2193 fn one_shot(ctx: &mut Ctx, mut out: ResMut<u64>, mut removals: ResMut<DeferredRemovals>) {
2194 *out += 1;
2195 removals.deregister(ctx.reactor_id);
2196 }
2197
2198 let src = notify_mut(&world, nid).register_source();
2199 world
2200 .resource_mut::<SourceRegistry>()
2201 .insert("one-shot", src);
2202
2203 {
2204 let reg = world.registry();
2205 let notify = notify_mut(&world, nid);
2206 notify
2207 .register(|t| Ctx { reactor_id: t }, one_shot, reg)
2208 .subscribe(src);
2209 }
2210
2211 notify_mut(&world, nid).mark(src);
2213 system.dispatch(&mut world);
2214 assert_eq!(*world.resource::<u64>(), 1);
2215 assert_eq!(system.reactor_count(&world), 0);
2216
2217 assert!(world.resource::<SourceRegistry>().contains(&"one-shot"));
2219
2220 notify_mut(&world, nid).mark(src);
2222 let ran = system.dispatch(&mut world);
2223 assert!(!ran);
2224 }
2225
2226 #[test]
2227 fn many_reactors_same_source() {
2228 let mut wb = WorldBuilder::new();
2230 wb.register::<u64>(0);
2231 wb.register(ReactorNotify::new(4, 64));
2232 wb.register(DeferredRemovals::default());
2233 let mut world = wb.build();
2234 let nid = world.id::<ReactorNotify>();
2235 let mut system = ReactorSystem::new(&world);
2236
2237 struct Ctx {
2238 _reactor_id: Token,
2239 }
2240
2241 fn step(_ctx: &mut Ctx, mut out: ResMut<u64>) {
2242 *out += 1;
2243 }
2244
2245 let src = notify_mut(&world, nid).register_source();
2246
2247 {
2248 let reg = world.registry();
2249 let notify = notify_mut(&world, nid);
2250 for _ in 0..50 {
2251 notify
2252 .register(|t| Ctx { _reactor_id: t }, step, reg)
2253 .subscribe(src);
2254 }
2255 }
2256
2257 assert_eq!(system.reactor_count(&world), 50);
2258
2259 notify_mut(&world, nid).mark(src);
2260 system.dispatch(&mut world);
2261 assert_eq!(*world.resource::<u64>(), 50); }
2263
2264 #[test]
2265 fn reactor_subscribes_to_multiple_sources() {
2266 let mut wb = WorldBuilder::new();
2269 wb.register::<u64>(0);
2270 wb.register(ReactorNotify::new(8, 8));
2271 wb.register(DeferredRemovals::default());
2272 let mut world = wb.build();
2273 let nid = world.id::<ReactorNotify>();
2274 let mut system = ReactorSystem::new(&world);
2275
2276 struct Ctx {
2277 _reactor_id: Token,
2278 }
2279
2280 fn step(_ctx: &mut Ctx, mut out: ResMut<u64>) {
2281 *out += 1;
2282 }
2283
2284 let mut sources = Vec::new();
2285 let notify = notify_mut(&world, nid);
2286 for _ in 0..5 {
2287 sources.push(notify.register_source());
2288 }
2289
2290 {
2291 let reg = world.registry();
2292 let notify = notify_mut(&world, nid);
2293 let mut registration = notify.register(|t| Ctx { _reactor_id: t }, step, reg);
2294 for &src in &sources {
2295 registration = registration.subscribe(src);
2296 }
2297 }
2298
2299 for &src in &sources {
2301 notify_mut(&world, nid).mark(src);
2302 }
2303
2304 system.dispatch(&mut world);
2305 assert_eq!(*world.resource::<u64>(), 1); }
2307
2308 #[test]
2309 fn stale_data_source_is_noop() {
2310 let mut wb = WorldBuilder::new();
2312 wb.register(ReactorNotify::new(4, 4));
2313 wb.register(DeferredRemovals::default());
2314 let mut world = wb.build();
2315 let nid = world.id::<ReactorNotify>();
2316 let mut system = ReactorSystem::new(&world);
2317
2318 let src = notify_mut(&world, nid).register_source();
2319 notify_mut(&world, nid).remove_source(src);
2320
2321 notify_mut(&world, nid).mark(src);
2323 let ran = system.dispatch(&mut world);
2324 assert!(!ran);
2325 }
2326
2327 #[test]
2328 fn double_remove_source_is_noop() {
2329 let mut notify = ReactorNotify::new(4, 4);
2330 let src = notify.register_source();
2331 notify.remove_source(src);
2332 notify.remove_source(src); }
2334
2335 #[test]
2338 fn pipeline_reactor_dispatch() {
2339 use crate::CtxPipelineBuilder;
2340
2341 let mut wb = WorldBuilder::new();
2342 wb.register::<u64>(0);
2343 wb.register(ReactorNotify::new(4, 8));
2344 wb.register(DeferredRemovals::default());
2345 let mut world = wb.build();
2346 let nid = world.id::<ReactorNotify>();
2347 let mut system = ReactorSystem::new(&world);
2348
2349 struct Ctx {
2350 _reactor_id: Token,
2351 instrument: &'static str,
2352 }
2353
2354 fn read_data(ctx: &mut Ctx, val: Res<u64>, _input: ()) -> u64 {
2355 let _ = ctx.instrument;
2356 *val
2357 }
2358
2359 fn double(_ctx: &mut Ctx, x: u64) -> u64 {
2360 x * 2
2361 }
2362
2363 fn store(_ctx: &mut Ctx, mut out: ResMut<u64>, x: u64) {
2364 *out = x;
2365 }
2366
2367 let reg = world.registry();
2368
2369 let pipeline = CtxPipelineBuilder::<Ctx, ()>::new()
2370 .then(read_data, reg)
2371 .then(double, reg)
2372 .then(store, reg)
2373 .build();
2374
2375 let notify = notify_mut(&world, nid);
2376 let src = notify.register_source();
2377
2378 let reactor = PipelineReactor::new(
2380 Ctx {
2381 _reactor_id: Token::new(0),
2382 instrument: "BTC",
2383 },
2384 pipeline,
2385 );
2386 notify.register_built(reactor).subscribe(src);
2387
2388 *world.resource_mut::<u64>() = 10;
2390 notify_mut(&world, nid).mark(src);
2391 system.dispatch(&mut world);
2392
2393 assert_eq!(*world.resource::<u64>(), 20); }
2395
2396 #[test]
2397 fn dag_reactor_dispatch() {
2398 use crate::CtxDagBuilder;
2399
2400 let mut wb = WorldBuilder::new();
2401 wb.register::<u64>(0);
2402 wb.register(ReactorNotify::new(4, 8));
2403 wb.register(DeferredRemovals::default());
2404 let mut world = wb.build();
2405 let nid = world.id::<ReactorNotify>();
2406 let mut system = ReactorSystem::new(&world);
2407
2408 struct Ctx {
2409 _reactor_id: Token,
2410 }
2411
2412 fn root(ctx: &mut Ctx, val: Res<u64>, _input: ()) -> u64 {
2413 let _ = ctx;
2414 *val
2415 }
2416
2417 fn arm_double(_ctx: &mut Ctx, val: &u64) -> u64 {
2418 *val * 2
2419 }
2420
2421 fn arm_add(_ctx: &mut Ctx, val: &u64) -> u64 {
2422 *val + 10
2423 }
2424
2425 fn merge(_ctx: &mut Ctx, mut out: ResMut<u64>, a: &u64, b: &u64) {
2426 *out = *a + *b;
2427 }
2428
2429 let reg = world.registry();
2430
2431 let dag = CtxDagBuilder::<Ctx, ()>::new()
2432 .root(root, reg)
2433 .fork()
2434 .arm(|seed| seed.then(arm_double, reg))
2435 .arm(|seed| seed.then(arm_add, reg))
2436 .merge(merge, reg)
2437 .build();
2438
2439 let notify = notify_mut(&world, nid);
2440 let src = notify.register_source();
2441
2442 let reactor = PipelineReactor::new(
2443 Ctx {
2444 _reactor_id: Token::new(0),
2445 },
2446 dag,
2447 );
2448 notify.register_built(reactor).subscribe(src);
2449
2450 *world.resource_mut::<u64>() = 5;
2451 notify_mut(&world, nid).mark(src);
2452 system.dispatch(&mut world);
2453
2454 assert_eq!(*world.resource::<u64>(), 25);
2456 }
2457
2458 #[test]
2459 fn multiple_pipeline_reactors_different_bodies() {
2460 use crate::CtxPipelineBuilder;
2461
2462 let mut wb = WorldBuilder::new();
2463 wb.register::<u64>(0);
2464 wb.register(ReactorNotify::new(4, 8));
2465 wb.register(DeferredRemovals::default());
2466 let mut world = wb.build();
2467 let nid = world.id::<ReactorNotify>();
2468 let mut system = ReactorSystem::new(&world);
2469
2470 struct Ctx {
2471 _reactor_id: Token,
2472 factor: u64,
2473 }
2474
2475 fn multiply(ctx: &mut Ctx, val: Res<u64>, _input: ()) -> u64 {
2476 *val * ctx.factor
2477 }
2478
2479 fn accumulate(_ctx: &mut Ctx, mut out: ResMut<u64>, val: u64) {
2480 *out += val;
2481 }
2482
2483 let reg = world.registry();
2484
2485 let pipeline_a = CtxPipelineBuilder::<Ctx, ()>::new()
2487 .then(multiply, reg)
2488 .then(accumulate, reg)
2489 .build();
2490
2491 let pipeline_b = CtxPipelineBuilder::<Ctx, ()>::new()
2493 .then(multiply, reg)
2494 .then(accumulate, reg)
2495 .build();
2496
2497 let notify = notify_mut(&world, nid);
2498 let src = notify.register_source();
2499
2500 notify
2501 .register_built(PipelineReactor::new(
2502 Ctx {
2503 _reactor_id: Token::new(0),
2504 factor: 2,
2505 },
2506 pipeline_a,
2507 ))
2508 .subscribe(src);
2509
2510 notify
2511 .register_built(PipelineReactor::new(
2512 Ctx {
2513 _reactor_id: Token::new(1),
2514 factor: 10,
2515 },
2516 pipeline_b,
2517 ))
2518 .subscribe(src);
2519
2520 *world.resource_mut::<u64>() = 5;
2521 notify_mut(&world, nid).mark(src);
2522 system.dispatch(&mut world);
2523
2524 let val = *world.resource::<u64>();
2529 assert!(val > 5, "both reactors should have run, got {val}");
2530 }
2531
2532 #[test]
2533 fn pipeline_reactor_with_guard() {
2534 use crate::CtxPipelineBuilder;
2535
2536 let mut wb = WorldBuilder::new();
2537 wb.register::<u64>(0);
2538 wb.register(ReactorNotify::new(4, 8));
2539 wb.register(DeferredRemovals::default());
2540 let mut world = wb.build();
2541 let nid = world.id::<ReactorNotify>();
2542 let mut system = ReactorSystem::new(&world);
2543
2544 struct Ctx {
2545 _reactor_id: Token,
2546 threshold: u64,
2547 }
2548
2549 fn read(_ctx: &mut Ctx, val: Res<u64>, _input: ()) -> u64 {
2550 *val
2551 }
2552
2553 fn above_threshold(ctx: &mut Ctx, val: &u64) -> bool {
2554 *val > ctx.threshold
2555 }
2556
2557 fn write(_ctx: &mut Ctx, mut out: ResMut<u64>, _val: u64) {
2558 *out = 999;
2559 }
2560
2561 let reg = world.registry();
2562
2563 let pipeline = CtxPipelineBuilder::<Ctx, ()>::new()
2564 .then(read, reg)
2565 .guard(above_threshold, reg)
2566 .map(write, reg)
2567 .build();
2568
2569 let notify = notify_mut(&world, nid);
2570 let src = notify.register_source();
2571
2572 notify
2573 .register_built(PipelineReactor::new(
2574 Ctx {
2575 _reactor_id: Token::new(0),
2576 threshold: 10,
2577 },
2578 pipeline,
2579 ))
2580 .subscribe(src);
2581
2582 *world.resource_mut::<u64>() = 5;
2584 notify_mut(&world, nid).mark(src);
2585 system.dispatch(&mut world);
2586 assert_eq!(*world.resource::<u64>(), 5); *world.resource_mut::<u64>() = 20;
2590 notify_mut(&world, nid).mark(src);
2591 system.dispatch(&mut world);
2592 assert_eq!(*world.resource::<u64>(), 999);
2593 }
2594
2595 #[test]
2598 fn two_phase_registration_safe_api() {
2599 let mut wb = WorldBuilder::new();
2602 wb.register::<u64>(0);
2603 wb.register(ReactorNotify::new(4, 8));
2604 wb.register(DeferredRemovals::default());
2605 let mut world = wb.build();
2606
2607 let mut system = ReactorSystem::new(&world);
2608
2609 struct Ctx {
2610 reactor_id: Token,
2611 instrument: &'static str,
2612 }
2613
2614 fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
2615 let _ = ctx.instrument;
2616 *out += ctx.reactor_id.index() as u64 + 1;
2617 }
2618
2619 let src = world.resource_mut::<ReactorNotify>().register_source();
2621 let token = world.resource_mut::<ReactorNotify>().create_reactor();
2622
2623 let reactor = step.into_reactor(
2625 Ctx {
2626 reactor_id: token,
2627 instrument: "BTC",
2628 },
2629 world.registry(),
2630 );
2631
2632 world
2634 .resource_mut::<ReactorNotify>()
2635 .insert_reactor(token, reactor)
2636 .subscribe(src);
2637
2638 world.resource_mut::<ReactorNotify>().mark(src);
2640 system.dispatch(&mut world);
2641 assert_eq!(*world.resource::<u64>(), 1); let token2 = world.resource_mut::<ReactorNotify>().create_reactor();
2645 let actor2 = step.into_reactor(
2646 Ctx {
2647 reactor_id: token2,
2648 instrument: "ETH",
2649 },
2650 world.registry(),
2651 );
2652 world
2653 .resource_mut::<ReactorNotify>()
2654 .insert_reactor(token2, actor2)
2655 .subscribe(src);
2656
2657 world.resource_mut::<ReactorNotify>().mark(src);
2659 system.dispatch(&mut world);
2660 assert_eq!(*world.resource::<u64>(), 4); }
2662
2663 #[test]
2664 fn two_phase_with_pipeline_reactor() {
2665 use crate::CtxPipelineBuilder;
2666
2667 let mut wb = WorldBuilder::new();
2668 wb.register::<u64>(0);
2669 wb.register(ReactorNotify::new(4, 8));
2670 wb.register(DeferredRemovals::default());
2671 let mut world = wb.build();
2672
2673 let mut system = ReactorSystem::new(&world);
2674
2675 struct Ctx {
2676 _reactor_id: Token,
2677 }
2678
2679 fn read(ctx: &mut Ctx, val: Res<u64>) -> u64 {
2680 let _ = ctx;
2681 *val
2682 }
2683
2684 fn double(_ctx: &mut Ctx, x: u64) -> u64 {
2685 x * 2
2686 }
2687
2688 fn store(_ctx: &mut Ctx, mut out: ResMut<u64>, x: u64) {
2689 *out = x;
2690 }
2691
2692 let src = world.resource_mut::<ReactorNotify>().register_source();
2694 let token = world.resource_mut::<ReactorNotify>().create_reactor();
2695
2696 let reg = world.registry();
2698 let pipeline = CtxPipelineBuilder::<Ctx, ()>::new()
2699 .then(crate::no_event(read), reg)
2700 .then(double, reg)
2701 .then(store, reg)
2702 .build();
2703 let reactor = PipelineReactor::new(Ctx { _reactor_id: token }, pipeline);
2704
2705 world
2707 .resource_mut::<ReactorNotify>()
2708 .insert_reactor(token, reactor)
2709 .subscribe(src);
2710
2711 *world.resource_mut::<u64>() = 10;
2712 world.resource_mut::<ReactorNotify>().mark(src);
2713 system.dispatch(&mut world);
2714 assert_eq!(*world.resource::<u64>(), 20); }
2716}