1#![allow(clippy::too_many_arguments)]
127
128use std::any::{Any, TypeId};
129use std::hash::Hash;
130
131use nexus_notify::local::LocalNotify;
132use nexus_notify::{Events, Token};
133use rustc_hash::FxHashMap;
134
135use crate::ctx_pipeline::CtxStepCall;
136use crate::handler::Param;
137use crate::world::{Registry, Resource, ResourceId, World};
138
/// Handle identifying a data source registered with [`ReactorNotify`].
///
/// The wrapped `usize` is the slab key of the source's subscriber list
/// (see `ReactorNotify::register_source`). Keys can be handed out again
/// after `ReactorNotify::remove_source`, so a handle kept past removal may
/// end up referring to a different, newer source.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
pub struct DataSource(pub usize);
157
/// Registry linking data sources to the reactors subscribed to them.
///
/// Marking a source marks every subscribed reactor token in the underlying
/// notifier; polling then yields the marked tokens.
pub struct ReactorNotify {
    /// Underlying notifier that records which reactor tokens were marked.
    notify: LocalNotify,

    /// Subscriber tokens per source, keyed by `DataSource.0`.
    interests: slab::Slab<Vec<Token>>,

    /// Reverse index: sources each reactor subscribed to, indexed by the
    /// reactor's token index. Used to unlink a reactor on removal.
    reactor_sources: Vec<Vec<DataSource>>,

    /// Reactor storage keyed by token index. An occupied slot holding `None`
    /// is either reserved by `create_reactor` and not yet filled, or its
    /// reactor is currently taken out via `take_reactor`.
    reactors: slab::Slab<Option<Box<dyn Reactor>>>,
}
190
191impl std::fmt::Debug for ReactorNotify {
193 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
194 f.debug_struct("ReactorNotify")
195 .field("num_sources", &self.interests.len())
196 .field("num_reactors", &self.reactors.len())
197 .field("notify", &self.notify)
198 .finish()
199 }
200}
201
202impl ReactorNotify {
203 pub fn new(source_capacity: usize, reactor_capacity: usize) -> Self {
205 Self {
206 notify: LocalNotify::with_capacity(reactor_capacity),
207 interests: slab::Slab::with_capacity(source_capacity),
208 reactor_sources: Vec::with_capacity(reactor_capacity),
209 reactors: slab::Slab::with_capacity(reactor_capacity),
210 }
211 }
212
213 pub fn register_source(&mut self) -> DataSource {
219 DataSource(self.interests.insert(Vec::new()))
220 }
221
222 pub fn remove_source(&mut self, source: DataSource) {
227 if self.interests.contains(source.0) {
228 self.interests.remove(source.0);
229 }
230 }
231
232 pub fn create_reactor(&mut self) -> Token {
259 let key = self.reactors.insert(None);
262 self.notify.ensure_capacity(key);
263 if key >= self.reactor_sources.len() {
265 self.reactor_sources.resize_with(key + 1, Vec::new);
266 }
267 Token::new(key)
268 }
269
270 pub fn insert_reactor(
280 &mut self,
281 token: Token,
282 reactor: impl Reactor + 'static,
283 ) -> ReactorRegistration<'_> {
284 let idx = token.index();
285 assert!(
286 self.reactors.contains(idx),
287 "token {} was not allocated by create_reactor",
288 idx,
289 );
290 assert!(
291 self.reactors[idx].is_none(),
292 "token {} was already filled",
293 idx,
294 );
295 self.reactors[idx] = Some(Box::new(reactor));
296 ReactorRegistration {
297 token,
298 notify: self,
299 }
300 }
301
302 pub fn register<C, Params, F: IntoReactor<C, Params>>(
309 &mut self,
310 ctx_fn: impl FnOnce(Token) -> C,
311 step: F,
312 registry: &Registry,
313 ) -> ReactorRegistration<'_> {
314 let key = self.reactors.vacant_key();
315 let token = Token::new(key);
316 self.notify.ensure_capacity(key);
317 if key >= self.reactor_sources.len() {
318 self.reactor_sources.resize_with(key + 1, Vec::new);
319 }
320 let ctx = ctx_fn(token);
321 let reactor = step.into_reactor(ctx, registry);
322 let inserted = self.reactors.insert(Some(Box::new(reactor)));
323 debug_assert_eq!(inserted, key);
324 ReactorRegistration {
325 token,
326 notify: self,
327 }
328 }
329
330 pub fn register_built(&mut self, reactor: impl Reactor + 'static) -> ReactorRegistration<'_> {
337 let key = self.reactors.vacant_key();
338 let token = Token::new(key);
339 self.notify.ensure_capacity(key);
340 if key >= self.reactor_sources.len() {
341 self.reactor_sources.resize_with(key + 1, Vec::new);
342 }
343 let inserted = self.reactors.insert(Some(Box::new(reactor)));
344 debug_assert_eq!(inserted, key);
345 ReactorRegistration {
346 token,
347 notify: self,
348 }
349 }
350
351 pub fn subscribe(&mut self, reactor: Token, source: DataSource) {
358 if let Some(subscribers) = self.interests.get_mut(source.0) {
359 if !subscribers.contains(&reactor) {
360 subscribers.push(reactor);
361 let idx = reactor.index();
363 debug_assert!(
364 idx < self.reactor_sources.len(),
365 "reactor_sources missing entry for reactor token {}",
366 idx,
367 );
368 self.reactor_sources[idx].push(source);
369 }
370 }
371 }
372
373 pub fn unsubscribe(&mut self, reactor: Token, source: DataSource) {
375 if let Some(subscribers) = self.interests.get_mut(source.0) {
376 subscribers.retain(|&t| t != reactor);
377 }
378 if let Some(sources) = self.reactor_sources.get_mut(reactor.index()) {
379 sources.retain(|&s| s != source);
380 }
381 }
382
383 #[inline]
390 pub fn mark(&mut self, source: DataSource) {
391 if let Some(subscribers) = self.interests.get(source.0) {
392 for &reactor_token in subscribers {
393 self.notify.mark(reactor_token);
394 }
395 }
396 }
397
398 #[inline]
400 pub(crate) fn poll(&mut self, events: &mut Events) {
401 self.notify.poll(events);
402 }
403
404 #[inline]
407 pub(crate) fn take_reactor(&mut self, idx: usize) -> Option<Box<dyn Reactor>> {
408 self.reactors.get_mut(idx).and_then(Option::take)
409 }
410
411 #[inline]
417 pub(crate) fn put_reactor(&mut self, idx: usize, reactor: Box<dyn Reactor>) {
418 self.reactors[idx] = Some(reactor);
419 }
420
421 pub fn remove_reactor(&mut self, token: Token) {
426 let idx = token.index();
427 if self.reactors.contains(idx) {
428 self.reactors.remove(idx);
429 if let Some(sources) = self.reactor_sources.get_mut(idx) {
431 for &source in sources.iter() {
432 if let Some(subscribers) = self.interests.get_mut(source.0) {
433 subscribers.retain(|&t| t != token);
434 }
435 }
436 sources.clear();
437 }
438 }
439 }
440
441 pub fn has_notified(&self) -> bool {
445 self.notify.has_notified()
446 }
447
448 pub fn notified_count(&self) -> usize {
450 self.notify.notified_count()
451 }
452
453 pub fn source_count(&self) -> usize {
455 self.interests.len()
456 }
457
458 pub fn reactor_count(&self) -> usize {
460 self.reactors.len()
461 }
462}
463
// Implements the `Resource` trait for `ReactorNotify` (no items to define).
impl Resource for ReactorNotify {}
465
/// Builder returned when a reactor is registered; allows chaining
/// subscriptions for the new reactor while the registry is still borrowed.
pub struct ReactorRegistration<'a> {
    /// Token of the reactor that was just registered.
    token: Token,
    /// Registry the reactor lives in.
    notify: &'a mut ReactorNotify,
}
475
476impl ReactorRegistration<'_> {
477 pub fn subscribe(self, source: DataSource) -> Self {
479 self.notify.subscribe(self.token, source);
480 self
481 }
482
483 pub fn token(&self) -> Token {
485 self.token
486 }
487}
488
/// A unit of work that can be run against the [`World`].
///
/// Implementors must be `Send`.
pub trait Reactor: Send {
    /// Runs the reactor once with mutable access to `world`.
    fn run(&mut self, world: &mut World);

    /// Name used for diagnostics; defaults to `"<unnamed>"`.
    fn name(&self) -> &'static str {
        "<unnamed>"
    }
}
511
/// Reactor built from a step function plus a per-reactor context.
///
/// `Params` is the tuple of resource parameters the function takes after the
/// context (`()` for a function that takes only the context).
pub struct ReactorFn<C, F, Params: Param> {
    /// Per-reactor context passed to the function as `&mut C`.
    pub ctx: C,
    /// The step function.
    f: F,
    /// Parameter state produced by `Param::init` at construction time.
    state: Params::State,
    /// Name reported by `Reactor::name`.
    name: &'static str,
}
528
/// Reactor whose body is a boxed, type-erased [`CtxStepCall`] taking `()` as
/// input and producing `()`.
pub struct PipelineReactor<C> {
    /// Per-reactor context passed to the body as `&mut C`.
    pub ctx: C,
    /// Type-erased body invoked on every run.
    body: Box<dyn CtxStepCall<C, (), Out = ()> + Send>,
}
560
561impl<C: Send + 'static> PipelineReactor<C> {
562 pub fn new(ctx: C, body: impl CtxStepCall<C, (), Out = ()> + Send + 'static) -> Self {
568 Self {
569 ctx,
570 body: Box::new(body),
571 }
572 }
573}
574
575impl<C: Send + 'static> Reactor for PipelineReactor<C> {
576 fn run(&mut self, world: &mut World) {
577 self.body.call(&mut self.ctx, world, ());
578 }
579
580 fn name(&self) -> &'static str {
581 std::any::type_name::<C>()
582 }
583}
584
/// Conversion of a step function into a [`Reactor`].
///
/// `C` is the context type the function receives first; `Params` is the
/// tuple of resource parameters that follow it.
#[diagnostic::on_unimplemented(
    message = "this function cannot be used as a reactor step",
    note = "reactor step signature: `fn(&mut C, Params...)` — context first, then resources",
    note = "closures with resource parameters are not supported — use a named `fn`"
)]
pub trait IntoReactor<C, Params> {
    /// Concrete reactor type produced by the conversion.
    type Reactor: Reactor + 'static;

    /// Consumes the function and `ctx`, using `registry` to initialise any
    /// parameter state, and returns the reactor.
    fn into_reactor(self, ctx: C, registry: &Registry) -> Self::Reactor;
}
617
618impl<C: Send + 'static, F: FnMut(&mut C) + Send + 'static> Reactor for ReactorFn<C, F, ()> {
623 fn run(&mut self, _world: &mut World) {
624 (self.f)(&mut self.ctx);
625 }
626
627 fn name(&self) -> &'static str {
628 self.name
629 }
630}
631
632impl<C: Send + 'static, F: FnMut(&mut C) + Send + 'static> IntoReactor<C, ()> for F {
633 type Reactor = ReactorFn<C, F, ()>;
634
635 fn into_reactor(self, ctx: C, registry: &Registry) -> Self::Reactor {
636 ReactorFn {
637 ctx,
638 f: self,
639 state: <() as Param>::init(registry),
640 name: std::any::type_name::<F>(),
641 }
642 }
643}
644
// Generates, for one tuple of parameter types `($P, ...)`:
//   * `Reactor for ReactorFn<C, F, ($P, ...)>`, which fetches the parameters
//     from the world and calls the step function, and
//   * `IntoReactor<C, ($P, ...)> for F`, which initialises the parameter
//     state and checks access before building the `ReactorFn`.
// The macro is instantiated through `all_tuples!` at the bottom.
macro_rules! impl_into_reactor {
    ($($P:ident),+) => {
        impl<C: Send + 'static, F: Send + 'static, $($P: Param + 'static),+>
            Reactor for ReactorFn<C, F, ($($P,)+)>
        where
            // `F` must be callable both with the parameter types themselves
            // and with their per-fetch `Item<'a>` forms.
            for<'a> &'a mut F:
                FnMut(&mut C, $($P,)+) +
                FnMut(&mut C, $($P::Item<'a>,)+),
        {
            // The type identifiers `$P` double as local variable names below.
            #[allow(non_snake_case)]
            fn run(&mut self, world: &mut World) {
                // Helper that pins the closure signature so the call below
                // type-checks against the fetched items.
                fn call_inner<Ctx, $($P,)+>(
                    mut f: impl FnMut(&mut Ctx, $($P,)+),
                    ctx: &mut Ctx,
                    $($P: $P,)+
                ) {
                    f(ctx, $($P,)+);
                }

                // Debug builds only: reset the world's borrow tracking before
                // fetching this reactor's parameters.
                #[cfg(debug_assertions)]
                world.clear_borrows();
                // NOTE(review): soundness relies on the contract of
                // `Param::fetch`, which is not visible in this file; presumably
                // the access check done in `into_reactor` is part of it — confirm.
                let ($($P,)+) = unsafe {
                    <($($P,)+) as Param>::fetch(world, &mut self.state)
                };
                call_inner(&mut self.f, &mut self.ctx, $($P,)+);
            }

            fn name(&self) -> &'static str {
                self.name
            }
        }

        impl<C: Send + 'static, F: Send + 'static, $($P: Param + 'static),+>
            IntoReactor<C, ($($P,)+)> for F
        where
            for<'a> &'a mut F:
                FnMut(&mut C, $($P,)+) +
                FnMut(&mut C, $($P::Item<'a>,)+),
        {
            type Reactor = ReactorFn<C, F, ($($P,)+)>;

            fn into_reactor(self, ctx: C, registry: &Registry) -> Self::Reactor {
                let state = <($($P,)+) as Param>::init(registry);
                {
                    // Destructure the state tuple so each parameter's state
                    // can report its resource id to the access check.
                    #[allow(non_snake_case)]
                    let ($($P,)+) = &state;
                    registry.check_access(&[
                        $(
                            (<$P as Param>::resource_id($P),
                             std::any::type_name::<$P>()),
                        )+
                    ]);
                }
                ReactorFn {
                    ctx,
                    f: self,
                    state,
                    name: std::any::type_name::<F>(),
                }
            }
        }
    };
}

// Instantiate the impls for every tuple arity that `all_tuples!` expands to.
all_tuples!(impl_into_reactor);
717
718#[derive(Default)]
728pub struct DeferredRemovals {
729 tokens: Vec<Token>,
730}
731
732impl DeferredRemovals {
733 pub fn deregister(&mut self, token: Token) {
738 self.tokens.push(token);
739 }
740
741 #[inline]
744 pub(crate) fn take(&mut self) -> Vec<Token> {
745 std::mem::take(&mut self.tokens)
746 }
747
748 #[inline]
750 pub(crate) fn put(&mut self, tokens: Vec<Token>) {
751 debug_assert!(tokens.is_empty(), "put() expects a drained Vec");
752 self.tokens = tokens;
753 }
754
755 pub fn is_empty(&self) -> bool {
757 self.tokens.is_empty()
758 }
759}
760
// Implements the `Resource` trait for `DeferredRemovals` (no items to define).
impl Resource for DeferredRemovals {}
762
/// Lookup table from arbitrary typed keys to [`DataSource`] handles.
///
/// Each key type `K` gets its own `FxHashMap<K, DataSource>`, stored
/// type-erased and looked up by `TypeId::of::<K>()`.
#[derive(Default)]
pub struct SourceRegistry {
    /// One boxed `FxHashMap<K, DataSource>` per key type `K`.
    maps: FxHashMap<TypeId, Box<dyn Any + Send>>,
}
796
797impl SourceRegistry {
798 pub fn new() -> Self {
800 Self::default()
801 }
802
803 pub fn insert<K: Hash + Eq + Send + 'static>(&mut self, key: K, source: DataSource) {
807 self.get_or_create_map::<K>().insert(key, source);
808 }
809
810 pub fn get<K: Hash + Eq + Send + 'static>(&self, key: &K) -> Option<DataSource> {
814 self.get_map::<K>().and_then(|map| map.get(key)).copied()
815 }
816
817 pub fn remove<K: Hash + Eq + Send + 'static>(&mut self, key: &K) -> Option<DataSource> {
820 self.get_map_mut::<K>().and_then(|map| map.remove(key))
821 }
822
823 pub fn contains<K: Hash + Eq + Send + 'static>(&self, key: &K) -> bool {
825 self.get_map::<K>().is_some_and(|map| map.contains_key(key))
826 }
827
828 fn get_map<K: Hash + Eq + Send + 'static>(&self) -> Option<&FxHashMap<K, DataSource>> {
829 self.maps.get(&TypeId::of::<K>()).map(|boxed| {
830 boxed
832 .downcast_ref::<FxHashMap<K, DataSource>>()
833 .expect("invariant: TypeId matches stored map type")
834 })
835 }
836
837 fn get_map_mut<K: Hash + Eq + Send + 'static>(
838 &mut self,
839 ) -> Option<&mut FxHashMap<K, DataSource>> {
840 self.maps.get_mut(&TypeId::of::<K>()).map(|boxed| {
841 boxed
843 .downcast_mut::<FxHashMap<K, DataSource>>()
844 .expect("invariant: TypeId matches stored map type")
845 })
846 }
847
848 fn get_or_create_map<K: Hash + Eq + Send + 'static>(
849 &mut self,
850 ) -> &mut FxHashMap<K, DataSource> {
851 self.maps
852 .entry(TypeId::of::<K>())
853 .or_insert_with(|| Box::<FxHashMap<K, DataSource>>::default())
854 .downcast_mut::<FxHashMap<K, DataSource>>()
856 .unwrap()
857 }
858}
859
// Implements the `Resource` trait for `SourceRegistry` (no items to define).
impl Resource for SourceRegistry {}
861
/// Driver that polls [`ReactorNotify`], runs the woken reactors and then
/// applies the removals queued in [`DeferredRemovals`].
pub struct ReactorSystem {
    /// Reusable buffer that receives the woken reactor tokens on each poll.
    events: Events,

    /// Resource id of the `ReactorNotify` in the world, resolved once in `new`.
    notify_id: ResourceId,
    /// Resource id of the `DeferredRemovals` in the world, resolved once in `new`.
    removals_id: ResourceId,
}
880
impl ReactorSystem {
    /// Creates a system bound to `world`, resolving the ids of the
    /// `ReactorNotify` and `DeferredRemovals` resources up front and
    /// allocating an event buffer with capacity 256.
    pub fn new(world: &World) -> Self {
        Self {
            events: Events::with_capacity(256),
            notify_id: world.id::<ReactorNotify>(),
            removals_id: world.id::<DeferredRemovals>(),
        }
    }

    /// Runs one dispatch pass: poll, run every woken reactor, then process
    /// deferred removals.
    ///
    /// Returns `true` if the poll produced at least one token. Tokens whose
    /// slot no longer holds a reactor are skipped but still count here.
    pub fn dispatch(&mut self, world: &mut World) -> bool {
        // Keep a raw pointer so the registry can be re-borrowed in short
        // scopes between reactor runs, each of which needs `&mut World`.
        // NOTE(review): soundness depends on the contract of `World::get_mut`
        // and on the resource not moving while reactors run — confirm.
        let notify_ptr: *mut ReactorNotify =
            unsafe { world.get_mut::<ReactorNotify>(self.notify_id) };

        {
            // Short-lived reborrow: collect the woken tokens into `events`.
            let notify = unsafe { &mut *notify_ptr };
            notify.poll(&mut self.events);
        }
        let ran = !self.events.is_empty();

        for token in self.events.iter() {
            let idx = token.index();
            let reactor = {
                // Move the reactor out of its slot so no borrow of the
                // registry is held while the reactor runs.
                let notify = unsafe { &mut *notify_ptr };
                notify.take_reactor(idx)
            };
            if let Some(mut reactor) = reactor {
                reactor.run(world);
                // Return the reactor to its slot; `put_reactor` indexes the
                // slot directly.
                let notify = unsafe { &mut *notify_ptr };
                notify.put_reactor(idx, reactor);
            }
        }

        // Apply removals requested during this pass, after all reactors ran.
        let removals = unsafe { world.get_mut::<DeferredRemovals>(self.removals_id) };
        let mut pending = removals.take();
        if !pending.is_empty() {
            let notify = unsafe { &mut *notify_ptr };
            while let Some(token) = pending.pop() {
                notify.remove_reactor(token);
            }
        }
        // Hand the (now empty) buffer back so its allocation is reused.
        let removals = unsafe { world.get_mut::<DeferredRemovals>(self.removals_id) };
        removals.put(pending);

        ran
    }

    /// Number of occupied reactor slots in the world's `ReactorNotify`.
    pub fn reactor_count(&self, world: &World) -> usize {
        world.resource::<ReactorNotify>().reactor_count()
    }
}
956
957#[cfg(test)]
962mod tests {
963 use super::*;
964 use crate::{Res, ResMut, WorldBuilder};
965
966 #[test]
969 fn reactor_fn_arity0() {
970 let wb = WorldBuilder::new();
971 let mut world = wb.build();
972 let reg = world.registry();
973
974 struct Ctx {
975 count: u32,
976 }
977
978 fn step(ctx: &mut Ctx) {
979 ctx.count += 1;
980 }
981
982 let mut reactor = step.into_reactor(Ctx { count: 0 }, reg);
983 reactor.run(&mut world);
984 assert_eq!(reactor.ctx.count, 1);
985 }
986
987 #[test]
988 fn reactor_fn_with_params() {
989 let mut wb = WorldBuilder::new();
990 wb.register::<u64>(10);
991 wb.register::<u32>(0);
992 let mut world = wb.build();
993 let reg = world.registry();
994
995 struct Ctx {
996 multiplier: u64,
997 }
998
999 fn step(ctx: &mut Ctx, val: Res<u64>, mut out: ResMut<u32>) {
1000 *out = (*val * ctx.multiplier) as u32;
1001 }
1002
1003 let mut reactor = step.into_reactor(Ctx { multiplier: 5 }, reg);
1004 reactor.run(&mut world);
1005 assert_eq!(*world.resource::<u32>(), 50);
1006 }
1007
    // Builds a no-op arity-0 reactor with a unit context, for tests that only
    // care about notification bookkeeping.
    fn dummy_reactor() -> ReactorFn<(), fn(&mut ()), ()> {
        ReactorFn {
            ctx: (),
            // Cast the non-capturing closure to a plain fn pointer so the
            // return type can be named.
            f: (|_: &mut ()| {}) as fn(&mut ()),
            state: (),
            name: "dummy",
        }
    }
1018
1019 #[test]
1020 fn reactor_notify_mark_fans_out() {
1021 let mut notify = ReactorNotify::new(4, 8);
1022 let mut events = Events::with_capacity(8);
1023
1024 let src = notify.register_source();
1025 let a1 = notify.register_built(dummy_reactor()).token();
1026 let a2 = notify.register_built(dummy_reactor()).token();
1027 let _a3 = notify.register_built(dummy_reactor()).token();
1028
1029 notify.subscribe(a1, src);
1030 notify.subscribe(a2, src);
1031 notify.mark(src);
1034 notify.notify.poll(&mut events);
1035
1036 assert_eq!(events.len(), 2);
1037 assert!(events.as_slice().contains(&a1));
1038 assert!(events.as_slice().contains(&a2));
1039 }
1040
1041 #[test]
1042 fn reactor_notify_dedup_across_sources() {
1043 let mut notify = ReactorNotify::new(4, 8);
1044 let mut events = Events::with_capacity(8);
1045
1046 let src1 = notify.register_source();
1047 let src2 = notify.register_source();
1048 let reactor = notify.register_built(dummy_reactor()).token();
1049
1050 notify.subscribe(reactor, src1);
1051 notify.subscribe(reactor, src2);
1052
1053 notify.mark(src1);
1054 notify.mark(src2);
1055
1056 notify.notify.poll(&mut events);
1057 assert_eq!(events.len(), 1);
1058 assert_eq!(events.as_slice()[0], reactor);
1059 }
1060
1061 #[test]
1062 fn reactor_notify_remove_reactor() {
1063 let mut notify = ReactorNotify::new(4, 8);
1064 let mut events = Events::with_capacity(8);
1065
1066 let src = notify.register_source();
1067
1068 struct Ctx;
1069 let token = notify
1070 .register_built(ReactorFn {
1071 ctx: Ctx,
1072 f: (|_: &mut Ctx| {}) as fn(&mut Ctx),
1073 state: (),
1074 name: "test",
1075 })
1076 .token();
1077 notify.subscribe(token, src);
1078
1079 notify.remove_reactor(token);
1080 notify.mark(src);
1081 notify.notify.poll(&mut events);
1082 assert!(events.is_empty());
1083 }
1084
    // Test-only shortcut that hands out `&mut ReactorNotify` from a shared
    // `&World`, so tests can register and mark without `&mut World`.
    // NOTE(review): this returns a mutable reference from a shared one; its
    // soundness rests on the contract of `World::get_mut` (not visible here)
    // and on callers not using two of these references at the same time.
    fn notify_mut(world: &World, id: ResourceId) -> &mut ReactorNotify {
        unsafe { world.get_mut::<ReactorNotify>(id) }
    }
1093
    // Two reactors subscribed to the same source both run in one dispatch.
    #[test]
    fn reactor_system_dispatch() {
        let mut wb = WorldBuilder::new();
        wb.register::<u64>(0);
        wb.register(ReactorNotify::new(4, 8));
        wb.register(DeferredRemovals::default());
        let mut world = wb.build();
        let reg = world.registry();
        let nid = world.id::<ReactorNotify>();

        let mut system = ReactorSystem::new(&world);

        struct Ctx {
            _reactor_id: Token,
            increment: u64,
        }

        fn step(ctx: &mut Ctx, mut val: ResMut<u64>) {
            *val += ctx.increment;
        }

        let notify = notify_mut(&world, nid);
        let src = notify.register_source();
        notify
            .register(
                |t| Ctx {
                    _reactor_id: t,
                    increment: 10,
                },
                step,
                reg,
            )
            .subscribe(src);
        notify
            .register(
                |t| Ctx {
                    _reactor_id: t,
                    increment: 5,
                },
                step,
                reg,
            )
            .subscribe(src);

        notify_mut(&world, nid).mark(src);
        let ran = system.dispatch(&mut world);

        assert!(ran);
        // 10 + 5: both reactors ran exactly once.
        assert_eq!(*world.resource::<u64>(), 15);
    }
1145
    // A reactor that deregisters itself on its second run is removed after
    // that dispatch and no longer wakes afterwards.
    #[test]
    fn reactor_system_deferred_removal() {
        let mut wb = WorldBuilder::new();
        wb.register::<u64>(0);
        wb.register(ReactorNotify::new(4, 8));
        wb.register(DeferredRemovals::default());
        let mut world = wb.build();
        let reg = world.registry();
        let nid = world.id::<ReactorNotify>();

        let mut system = ReactorSystem::new(&world);

        struct Ctx {
            reactor_id: Token,
            runs: u64,
        }

        fn step(ctx: &mut Ctx, mut val: ResMut<u64>, mut removals: ResMut<DeferredRemovals>) {
            *val += 1;
            ctx.runs += 1;
            // Request own removal once the second run has happened.
            if ctx.runs >= 2 {
                removals.deregister(ctx.reactor_id);
            }
        }

        let notify = notify_mut(&world, nid);
        let src = notify.register_source();
        notify
            .register(
                |t| Ctx {
                    reactor_id: t,
                    runs: 0,
                },
                step,
                reg,
            )
            .subscribe(src);

        assert_eq!(system.reactor_count(&world), 1);

        // First run: reactor stays registered.
        notify_mut(&world, nid).mark(src);
        system.dispatch(&mut world);
        assert_eq!(*world.resource::<u64>(), 1);
        assert_eq!(system.reactor_count(&world), 1);

        // Second run: reactor deregisters itself and is removed afterwards.
        notify_mut(&world, nid).mark(src);
        system.dispatch(&mut world);
        assert_eq!(*world.resource::<u64>(), 2);
        assert_eq!(system.reactor_count(&world), 0);

        // Third mark: no subscribers remain, so nothing is woken.
        notify_mut(&world, nid).mark(src);
        let ran = system.dispatch(&mut world);
        assert!(!ran);
        assert_eq!(*world.resource::<u64>(), 2);
    }
1204
    // Marking one source wakes only the reactor subscribed to that source.
    #[test]
    fn reactor_system_only_subscribed_wake() {
        let mut wb = WorldBuilder::new();
        wb.register::<u64>(0);
        wb.register(ReactorNotify::new(4, 8));
        wb.register(DeferredRemovals::default());
        let mut world = wb.build();
        let reg = world.registry();
        let nid = world.id::<ReactorNotify>();

        let mut system = ReactorSystem::new(&world);

        struct Ctx {
            _reactor_id: Token,
            value: u64,
        }

        fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
            *out += ctx.value;
        }

        let notify = notify_mut(&world, nid);
        let btc = notify.register_source();
        let eth = notify.register_source();

        notify
            .register(
                |t| Ctx {
                    _reactor_id: t,
                    value: 10,
                },
                step,
                reg,
            )
            .subscribe(btc);
        notify
            .register(
                |t| Ctx {
                    _reactor_id: t,
                    value: 100,
                },
                step,
                reg,
            )
            .subscribe(eth);

        // Only the BTC reactor (value 10) runs.
        notify_mut(&world, nid).mark(btc);
        system.dispatch(&mut world);
        assert_eq!(*world.resource::<u64>(), 10);

        // Only the ETH reactor (value 100) runs: 10 + 100.
        notify_mut(&world, nid).mark(eth);
        system.dispatch(&mut world);
        assert_eq!(*world.resource::<u64>(), 110);
    }
1261
    // Reactors can be registered after the system has already dispatched.
    #[test]
    fn runtime_registration() {
        let mut wb = WorldBuilder::new();
        wb.register::<u64>(0);
        wb.register(ReactorNotify::new(4, 8));
        wb.register(DeferredRemovals::default());
        let mut world = wb.build();
        let nid = world.id::<ReactorNotify>();

        let mut system = ReactorSystem::new(&world);

        struct Ctx {
            _reactor_id: Token,
            value: u64,
        }

        fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
            *out += ctx.value;
        }

        // Initial setup: one source, one reactor adding 10.
        let src = {
            let reg = world.registry();
            let notify = notify_mut(&world, nid);
            let src = notify.register_source();
            notify
                .register(
                    |t| Ctx {
                        _reactor_id: t,
                        value: 10,
                    },
                    step,
                    reg,
                )
                .subscribe(src);
            src
        };

        notify_mut(&world, nid).mark(src);
        system.dispatch(&mut world);
        assert_eq!(*world.resource::<u64>(), 10);

        // Register a second reactor (adding 100) on the same source later on.
        {
            let reg = world.registry();
            notify_mut(&world, nid)
                .register(
                    |t| Ctx {
                        _reactor_id: t,
                        value: 100,
                    },
                    step,
                    reg,
                )
                .subscribe(src);
        }

        notify_mut(&world, nid).mark(src);
        system.dispatch(&mut world);
        // 10 (before) + 10 + 100 (both reactors this pass).
        assert_eq!(*world.resource::<u64>(), 120);
    }
1325
    // After a reactor is removed, the next registration gets the same slot
    // index and works normally.
    #[test]
    fn register_after_remove_reuses_key() {
        let mut wb = WorldBuilder::new();
        wb.register::<u64>(0);
        wb.register(ReactorNotify::new(4, 8));
        wb.register(DeferredRemovals::default());
        let mut world = wb.build();
        let nid = world.id::<ReactorNotify>();

        let mut system = ReactorSystem::new(&world);

        struct Ctx {
            reactor_id: Token,
            value: u64,
        }

        fn step(ctx: &mut Ctx, mut out: ResMut<u64>, mut removals: ResMut<DeferredRemovals>) {
            *out += ctx.value;
            // Only the first reactor (value 10) removes itself.
            if ctx.value == 10 {
                removals.deregister(ctx.reactor_id);
            }
        }

        let src = {
            let reg = world.registry();
            let notify = notify_mut(&world, nid);
            let src = notify.register_source();
            notify
                .register(
                    |t| Ctx {
                        reactor_id: t,
                        value: 10,
                    },
                    step,
                    reg,
                )
                .subscribe(src);
            src
        };

        // First reactor runs once and is removed at the end of the dispatch.
        notify_mut(&world, nid).mark(src);
        system.dispatch(&mut world);
        assert_eq!(*world.resource::<u64>(), 10);
        assert_eq!(system.reactor_count(&world), 0);

        {
            let reg = world.registry();
            let notify = notify_mut(&world, nid);
            let token = notify
                .register(
                    |t| Ctx {
                        reactor_id: t,
                        value: 100,
                    },
                    step,
                    reg,
                )
                .token();
            notify.subscribe(token, src);
            // The freed slot 0 is handed out again.
            assert_eq!(token.index(), 0);
        }

        notify_mut(&world, nid).mark(src);
        system.dispatch(&mut world);
        // 10 + 100; the second reactor does not deregister itself.
        assert_eq!(*world.resource::<u64>(), 110);
        assert_eq!(system.reactor_count(&world), 1);
    }
1396
1397 #[test]
1398 fn reactor_can_access_actor_notify() {
1399 let mut wb = WorldBuilder::new();
1402 wb.register::<u64>(0);
1403 wb.register(ReactorNotify::new(4, 8));
1404 wb.register(DeferredRemovals::default());
1405 let mut world = wb.build();
1406 let nid = world.id::<ReactorNotify>();
1407
1408 let mut system = ReactorSystem::new(&world);
1409
1410 struct Ctx {
1411 _reactor_id: Token,
1412 }
1413
1414 fn step(_ctx: &mut Ctx, notify: ResMut<ReactorNotify>, mut out: ResMut<u64>) {
1415 *out = notify.reactor_count() as u64;
1417 }
1418
1419 let src = {
1420 let reg = world.registry();
1421 let notify = notify_mut(&world, nid);
1422 let src = notify.register_source();
1423 notify
1424 .register(|t| Ctx { _reactor_id: t }, step, reg)
1425 .subscribe(src);
1426 src
1427 };
1428
1429 notify_mut(&world, nid).mark(src);
1430 system.dispatch(&mut world);
1431 }
1437
    // Three reactors each subscribe to their own market-data source plus one
    // shared `positions` source; checks fan-out and per-dispatch dedup.
    #[test]
    fn multi_instrument_with_shared_source() {
        let mut wb = WorldBuilder::new();
        wb.register::<u64>(0);
        wb.register(ReactorNotify::new(8, 16));
        wb.register(DeferredRemovals::default());
        let mut world = wb.build();
        let nid = world.id::<ReactorNotify>();
        let mut system = ReactorSystem::new(&world);

        struct Ctx {
            _reactor_id: Token,
            instrument: &'static str,
        }

        fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
            // Each instrument contributes a distinct decimal digit, so the
            // sum shows which reactors ran.
            *out += match ctx.instrument {
                "BTC" => 100,
                "ETH" => 10,
                "SOL" => 1,
                _ => 0,
            };
        }

        let (btc_md, eth_md, sol_md, positions) = {
            let reg = world.registry();
            let notify = notify_mut(&world, nid);

            let btc_md = notify.register_source();
            let eth_md = notify.register_source();
            let sol_md = notify.register_source();
            let positions = notify.register_source();

            notify
                .register(
                    |t| Ctx {
                        _reactor_id: t,
                        instrument: "BTC",
                    },
                    step,
                    reg,
                )
                .subscribe(btc_md)
                .subscribe(positions);

            notify
                .register(
                    |t| Ctx {
                        _reactor_id: t,
                        instrument: "ETH",
                    },
                    step,
                    reg,
                )
                .subscribe(eth_md)
                .subscribe(positions);

            notify
                .register(
                    |t| Ctx {
                        _reactor_id: t,
                        instrument: "SOL",
                    },
                    step,
                    reg,
                )
                .subscribe(sol_md)
                .subscribe(positions);

            (btc_md, eth_md, sol_md, positions)
        };

        // BTC market data only: +100.
        notify_mut(&world, nid).mark(btc_md);
        system.dispatch(&mut world);
        assert_eq!(*world.resource::<u64>(), 100);

        // Shared source wakes all three: +111.
        notify_mut(&world, nid).mark(positions);
        system.dispatch(&mut world);
        assert_eq!(*world.resource::<u64>(), 211);

        // BTC and ETH market data: +110.
        notify_mut(&world, nid).mark(btc_md);
        notify_mut(&world, nid).mark(eth_md);
        system.dispatch(&mut world);
        assert_eq!(*world.resource::<u64>(), 321);

        // BTC is marked via two sources but runs once: +111.
        notify_mut(&world, nid).mark(btc_md);
        notify_mut(&world, nid).mark(positions);
        system.dispatch(&mut world);
        assert_eq!(*world.resource::<u64>(), 432);

        // Nothing marked: nothing runs.
        let ran = system.dispatch(&mut world);
        assert!(!ran);
        assert_eq!(*world.resource::<u64>(), 432);

        // SOL market data only: +1.
        notify_mut(&world, nid).mark(sol_md);
        system.dispatch(&mut world);
        assert_eq!(*world.resource::<u64>(), 433);
    }
1553
    // Each reactor gets a private source; a lookup keyed by token index
    // routes an event to exactly one reactor.
    #[test]
    fn per_reactor_fill_routing() {
        use std::collections::HashMap;

        let mut wb = WorldBuilder::new();
        wb.register::<u64>(0);
        wb.register(ReactorNotify::new(8, 16));
        wb.register(DeferredRemovals::default());
        let mut world = wb.build();
        let nid = world.id::<ReactorNotify>();
        let mut system = ReactorSystem::new(&world);

        struct Ctx {
            reactor_id: Token,
        }

        fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
            // Adds (token index + 1) so the sum identifies which reactor ran.
            *out += ctx.reactor_id.index() as u64 + 1;
        }

        // Token index -> that reactor's private source.
        let mut fill_sources: HashMap<usize, DataSource> = HashMap::new();

        {
            let reg = world.registry();
            let notify = notify_mut(&world, nid);

            for _ in 0..3 {
                let fill_src = notify.register_source();
                let token = notify
                    .register(|t| Ctx { reactor_id: t }, step, reg)
                    .subscribe(fill_src)
                    .token();

                fill_sources.insert(token.index(), fill_src);
            }
        }

        // Route to the reactor with index 1: adds 1 + 1 = 2.
        let wire_client_id: usize = 1;
        let fill_source = fill_sources[&wire_client_id];
        notify_mut(&world, nid).mark(fill_source);
        system.dispatch(&mut world);
        assert_eq!(*world.resource::<u64>(), 2);

        // Route to the reactor with index 0: adds 0 + 1, total 3.
        let fill_source = fill_sources[&0];
        notify_mut(&world, nid).mark(fill_source);
        system.dispatch(&mut world);
        assert_eq!(*world.resource::<u64>(), 3);
    }
1611
    // Sources (and reactors subscribed to them) can be added after the
    // system has already dispatched.
    #[test]
    fn dynamic_source_registration() {
        let mut wb = WorldBuilder::new();
        wb.register::<u64>(0);
        wb.register(ReactorNotify::new(4, 8));
        wb.register(DeferredRemovals::default());
        let mut world = wb.build();
        let nid = world.id::<ReactorNotify>();
        let mut system = ReactorSystem::new(&world);

        struct Ctx {
            _reactor_id: Token,
            value: u64,
        }

        fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
            *out += ctx.value;
        }

        // Start with a single source and a reactor adding 10.
        let btc_md = {
            let reg = world.registry();
            let notify = notify_mut(&world, nid);
            let btc_md = notify.register_source();
            notify
                .register(
                    |t| Ctx {
                        _reactor_id: t,
                        value: 10,
                    },
                    step,
                    reg,
                )
                .subscribe(btc_md);
            btc_md
        };

        notify_mut(&world, nid).mark(btc_md);
        system.dispatch(&mut world);
        assert_eq!(*world.resource::<u64>(), 10);

        // Add a second source later, with a reactor adding 100.
        let eth_md = {
            let reg = world.registry();
            let notify = notify_mut(&world, nid);
            let eth_md = notify.register_source();
            notify
                .register(
                    |t| Ctx {
                        _reactor_id: t,
                        value: 100,
                    },
                    step,
                    reg,
                )
                .subscribe(eth_md);
            eth_md
        };

        notify_mut(&world, nid).mark(btc_md);
        notify_mut(&world, nid).mark(eth_md);
        system.dispatch(&mut world);
        // 10 (before) + 10 + 100.
        assert_eq!(*world.resource::<u64>(), 120);
    }
1679
    // Removing a source silences it; the next registered source reuses the
    // freed slot and works independently of the old subscriptions.
    #[test]
    fn remove_source_and_reuse_slot() {
        let mut wb = WorldBuilder::new();
        wb.register::<u64>(0);
        wb.register(ReactorNotify::new(4, 8));
        wb.register(DeferredRemovals::default());
        let mut world = wb.build();
        let nid = world.id::<ReactorNotify>();
        let mut system = ReactorSystem::new(&world);

        struct Ctx {
            _reactor_id: Token,
            value: u64,
        }

        fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
            *out += ctx.value;
        }

        // Two sources, one reactor each (adding 10 and 100).
        let (src_a, src_b) = {
            let reg = world.registry();
            let notify = notify_mut(&world, nid);
            let src_a = notify.register_source();
            let src_b = notify.register_source();
            notify
                .register(
                    |t| Ctx {
                        _reactor_id: t,
                        value: 10,
                    },
                    step,
                    reg,
                )
                .subscribe(src_a);
            notify
                .register(
                    |t| Ctx {
                        _reactor_id: t,
                        value: 100,
                    },
                    step,
                    reg,
                )
                .subscribe(src_b);
            (src_a, src_b)
        };

        notify_mut(&world, nid).remove_source(src_a);

        // Marking the removed source is a no-op.
        notify_mut(&world, nid).mark(src_a);
        let ran = system.dispatch(&mut world);
        assert!(!ran);

        // The other source is unaffected.
        notify_mut(&world, nid).mark(src_b);
        system.dispatch(&mut world);
        assert_eq!(*world.resource::<u64>(), 100);

        // A new source takes over the freed slot.
        let src_c = notify_mut(&world, nid).register_source();
        assert_eq!(src_c.0, src_a.0);

        let reg = world.registry();
        let notify = notify_mut(&world, nid);
        notify
            .register(
                |t| Ctx {
                    _reactor_id: t,
                    value: 1,
                },
                step,
                reg,
            )
            .subscribe(src_c);

        notify_mut(&world, nid).mark(src_c);
        system.dispatch(&mut world);
        // Only the new reactor runs (+1); the old subscriber of the slot
        // (value 10) is not woken.
        assert_eq!(*world.resource::<u64>(), 101);
    }
1765
1766 #[test]
1769 fn source_registry_basic() {
1770 let mut registry = SourceRegistry::new();
1771
1772 #[derive(Hash, Eq, PartialEq, Debug)]
1773 struct InstrumentId(u32);
1774
1775 let src_a = DataSource(0);
1776 let src_b = DataSource(1);
1777
1778 registry.insert(InstrumentId(1), src_a);
1779 registry.insert(InstrumentId(2), src_b);
1780
1781 assert_eq!(registry.get(&InstrumentId(1)), Some(src_a));
1782 assert_eq!(registry.get(&InstrumentId(2)), Some(src_b));
1783 assert_eq!(registry.get(&InstrumentId(3)), None);
1784 assert!(registry.contains(&InstrumentId(1)));
1785 assert!(!registry.contains(&InstrumentId(3)));
1786 }
1787
#[test]
fn source_registry_multiple_key_types() {
    // Two distinct newtypes that wrap the same raw value.
    #[derive(Hash, Eq, PartialEq)]
    struct InstrumentId(u32);

    #[derive(Hash, Eq, PartialEq)]
    struct StrategyId(u32);

    let instrument_src = DataSource(0);
    let strategy_src = DataSource(1);

    let mut registry = SourceRegistry::new();
    registry.insert(InstrumentId(1), instrument_src);
    registry.insert(StrategyId(1), strategy_src);

    // Same inner value, different key types: each lookup must return the
    // source that was stored under that exact type.
    let by_instrument = registry.get(&InstrumentId(1));
    let by_strategy = registry.get(&StrategyId(1));
    assert_eq!(by_instrument, Some(instrument_src));
    assert_eq!(by_strategy, Some(strategy_src));
}
1809
#[test]
fn source_registry_tuple_keys() {
    let btc_binance = DataSource(42);

    // A plain tuple works as a key without a dedicated newtype.
    let mut registry = SourceRegistry::new();
    registry.insert(("BTC", "Binance"), btc_binance);

    let hit = registry.get(&("BTC", "Binance"));
    let miss = registry.get(&("ETH", "Binance"));
    assert_eq!(hit, Some(btc_binance));
    assert_eq!(miss, None);
}
1820
#[test]
fn source_registry_remove() {
    let source = DataSource(0);
    let mut registry = SourceRegistry::new();
    registry.insert(42u64, source);

    // The first removal hands back the stored source.
    let first_removal = registry.remove(&42u64);
    assert_eq!(first_removal, Some(source));

    // Afterwards the key is gone: lookup misses and a second removal
    // returns nothing.
    assert_eq!(registry.get(&42u64), None);
    let second_removal = registry.remove(&42u64);
    assert_eq!(second_removal, None);
}
1832
#[test]
fn source_registry_integrated_with_reactor_system() {
    // Domain key under which sources are filed in the registry.
    #[derive(Hash, Eq, PartialEq, Clone, Copy)]
    struct Instrument(u32);
    const BTC: Instrument = Instrument(0);
    const ETH: Instrument = Instrument(1);

    struct Ctx {
        _reactor_id: Token,
        value: u64,
    }

    // Adds the reactor's configured value to the shared counter.
    fn add_value(ctx: &mut Ctx, mut total: ResMut<u64>) {
        *total += ctx.value;
    }

    let mut builder = WorldBuilder::new();
    builder.register::<u64>(0);
    builder.register(ReactorNotify::new(4, 8));
    builder.register(DeferredRemovals::default());
    builder.register(SourceRegistry::new());
    let mut world = builder.build();
    let notify_id = world.id::<ReactorNotify>();
    let mut system = ReactorSystem::new(&world);

    // One source per instrument, each filed under its domain key.
    let btc_src = notify_mut(&world, notify_id).register_source();
    let eth_src = notify_mut(&world, notify_id).register_source();
    world.resource_mut::<SourceRegistry>().insert(BTC, btc_src);
    world.resource_mut::<SourceRegistry>().insert(ETH, eth_src);

    {
        // Resolve the subscription target through the registry rather than
        // using the raw handle obtained above.
        let registry = world.registry();
        let looked_up = world.resource::<SourceRegistry>().get(&BTC).unwrap();
        notify_mut(&world, notify_id)
            .register(
                |token| Ctx {
                    _reactor_id: token,
                    value: 10,
                },
                add_value,
                registry,
            )
            .subscribe(looked_up);
    }

    // Marking the BTC source runs the subscribed reactor once.
    notify_mut(&world, notify_id).mark(btc_src);
    system.dispatch(&mut world);
    let total = *world.resource::<u64>();
    assert_eq!(total, 10);

    // Drop the registry entry, then retire the source it pointed at.
    let removed = world.resource_mut::<SourceRegistry>().remove(&BTC);
    assert!(removed.is_some());
    notify_mut(&world, notify_id).remove_source(removed.unwrap());

    // Marking the retired source must not wake anything.
    notify_mut(&world, notify_id).mark(btc_src);
    assert!(!system.dispatch(&mut world));
}
1897
#[test]
fn source_registry_overwrite_key() {
    let original = DataSource(0);
    let replacement = DataSource(1);
    let mut registry = SourceRegistry::new();

    registry.insert(42u32, original);
    let before = registry.get(&42u32);
    assert_eq!(before, Some(original));

    // Inserting under the same key again replaces the stored source.
    registry.insert(42u32, replacement);
    let after = registry.get(&42u32);
    assert_eq!(after, Some(replacement));
}
1913
#[test]
fn source_registry_empty_get() {
    // Nothing has been inserted, so every query must miss.
    let empty = SourceRegistry::new();

    let lookup = empty.get(&42u32);
    assert_eq!(lookup, None);

    let present = empty.contains(&42u32);
    assert!(!present);
}
1921
#[test]
fn source_registry_enum_keys() {
    // Fieldless enum used directly as a registry key.
    #[derive(Hash, Eq, PartialEq)]
    enum Venue {
        Binance,
        Coinbase,
    }

    let binance_src = DataSource(0);
    let mut registry = SourceRegistry::new();
    registry.insert(Venue::Binance, binance_src);

    // Only the inserted variant resolves; the other variant misses.
    let hit = registry.get(&Venue::Binance);
    let miss = registry.get(&Venue::Coinbase);
    assert_eq!(hit, Some(binance_src));
    assert_eq!(miss, None);
}
1937
#[test]
fn source_registry_composite_key() {
    #[derive(Hash, Eq, PartialEq)]
    struct StrategyId(u32);
    #[derive(Hash, Eq, PartialEq)]
    struct InstrumentId(u32);
    #[derive(Hash, Eq, PartialEq)]
    struct VenueId(u32);

    // Builds the three-part key, varying only the strategy component.
    let key_for = |strategy: u32| (StrategyId(strategy), InstrumentId(0), VenueId(2));

    let source = DataSource(5);
    let mut registry = SourceRegistry::new();
    registry.insert(key_for(1), source);

    // The identical triple resolves to the stored source.
    let exact = registry.get(&key_for(1));
    assert_eq!(exact, Some(source));

    // Changing a single component of the triple is enough to miss.
    let different_strategy = registry.get(&key_for(2));
    assert_eq!(different_strategy, None);
}
1962
#[test]
fn full_lifecycle_add_trade_remove() {
    #[derive(Hash, Eq, PartialEq, Clone, Copy)]
    struct Instrument(u32);

    struct Ctx {
        _reactor_id: Token,
        value: u64,
    }

    // Adds the reactor's configured value to the shared counter.
    fn add_value(ctx: &mut Ctx, mut total: ResMut<u64>) {
        *total += ctx.value;
    }

    let mut builder = WorldBuilder::new();
    builder.register::<u64>(0);
    builder.register(ReactorNotify::new(4, 8));
    builder.register(DeferredRemovals::default());
    builder.register(SourceRegistry::new());
    let mut world = builder.build();
    let notify_id = world.id::<ReactorNotify>();
    let mut system = ReactorSystem::new(&world);

    // Step 1: add the first instrument with a reactor worth 10.
    let btc_src = notify_mut(&world, notify_id).register_source();
    world
        .resource_mut::<SourceRegistry>()
        .insert(Instrument(0), btc_src);

    {
        let registry = world.registry();
        notify_mut(&world, notify_id)
            .register(
                |token| Ctx {
                    _reactor_id: token,
                    value: 10,
                },
                add_value,
                registry,
            )
            .subscribe(btc_src);
    }

    notify_mut(&world, notify_id).mark(btc_src);
    system.dispatch(&mut world);
    assert_eq!(*world.resource::<u64>(), 10);

    // Step 2: add a second instrument at runtime with a reactor worth 100.
    let eth_src = notify_mut(&world, notify_id).register_source();
    world
        .resource_mut::<SourceRegistry>()
        .insert(Instrument(1), eth_src);

    {
        let registry = world.registry();
        notify_mut(&world, notify_id)
            .register(
                |token| Ctx {
                    _reactor_id: token,
                    value: 100,
                },
                add_value,
                registry,
            )
            .subscribe(eth_src);
    }

    // Both sources fire: 10 + 10 + 100.
    notify_mut(&world, notify_id).mark(btc_src);
    notify_mut(&world, notify_id).mark(eth_src);
    system.dispatch(&mut world);
    assert_eq!(*world.resource::<u64>(), 120);

    // Step 3: retire the first instrument (registry entry, then source).
    let removed = world
        .resource_mut::<SourceRegistry>()
        .remove(&Instrument(0));
    notify_mut(&world, notify_id).remove_source(removed.unwrap());

    // Only the second instrument's reactor is left to fire: 120 + 100.
    notify_mut(&world, notify_id).mark(eth_src);
    system.dispatch(&mut world);
    assert_eq!(*world.resource::<u64>(), 220);

    // Step 4: a newly registered source gets the retired source's index.
    let sol_src = notify_mut(&world, notify_id).register_source();
    world
        .resource_mut::<SourceRegistry>()
        .insert(Instrument(2), sol_src);
    assert_eq!(sol_src.0, btc_src.0);

    {
        let registry = world.registry();
        notify_mut(&world, notify_id)
            .register(
                |token| Ctx {
                    _reactor_id: token,
                    value: 1000,
                },
                add_value,
                registry,
            )
            .subscribe(sol_src);
    }

    // The reused index wakes only the new reactor, not the one that was
    // subscribed to the retired source: 220 + 1000 + 100.
    notify_mut(&world, notify_id).mark(sol_src);
    notify_mut(&world, notify_id).mark(eth_src);
    system.dispatch(&mut world);
    assert_eq!(*world.resource::<u64>(), 1320);
}
2081
#[test]
fn multi_strategy_same_instrument() {
    // (strategy, instrument) pair used as the registry key.
    #[derive(Hash, Eq, PartialEq, Clone, Copy)]
    struct StrategyInstrument(&'static str, &'static str);

    struct Ctx {
        _reactor_id: Token,
        value: u64,
    }

    // Adds the reactor's configured value to the shared counter.
    fn add_value(ctx: &mut Ctx, mut total: ResMut<u64>) {
        *total += ctx.value;
    }

    let mut builder = WorldBuilder::new();
    builder.register::<u64>(0);
    builder.register(ReactorNotify::new(8, 16));
    builder.register(DeferredRemovals::default());
    builder.register(SourceRegistry::new());
    let mut world = builder.build();
    let notify_id = world.id::<ReactorNotify>();
    let mut system = ReactorSystem::new(&world);

    let registry = world.registry();
    let notifier = notify_mut(&world, notify_id);

    // Three independent sources: two for "MM", one for "ARB".
    let mm_btc = notifier.register_source();
    let mm_eth = notifier.register_source();
    let arb_btc = notifier.register_source();

    // One reactor per source, each with a distinguishable contribution.
    notifier
        .register(
            |token| Ctx {
                _reactor_id: token,
                value: 1,
            },
            add_value,
            registry,
        )
        .subscribe(mm_btc);
    notifier
        .register(
            |token| Ctx {
                _reactor_id: token,
                value: 2,
            },
            add_value,
            registry,
        )
        .subscribe(mm_eth);
    notifier
        .register(
            |token| Ctx {
                _reactor_id: token,
                value: 100,
            },
            add_value,
            registry,
        )
        .subscribe(arb_btc);

    // File every source under its (strategy, instrument) key.
    world
        .resource_mut::<SourceRegistry>()
        .insert(StrategyInstrument("MM", "BTC"), mm_btc);
    world
        .resource_mut::<SourceRegistry>()
        .insert(StrategyInstrument("MM", "ETH"), mm_eth);
    world
        .resource_mut::<SourceRegistry>()
        .insert(StrategyInstrument("ARB", "BTC"), arb_btc);

    // Look the two BTC sources back up through the registry.
    let mm_btc_lookup = world
        .resource::<SourceRegistry>()
        .get(&StrategyInstrument("MM", "BTC"))
        .unwrap();
    let arb_btc_lookup = world
        .resource::<SourceRegistry>()
        .get(&StrategyInstrument("ARB", "BTC"))
        .unwrap();

    // Only the two BTC reactors fire (1 + 100); the "MM"/"ETH" source is
    // never marked, so its reactor contributes nothing.
    notify_mut(&world, notify_id).mark(mm_btc_lookup);
    notify_mut(&world, notify_id).mark(arb_btc_lookup);
    system.dispatch(&mut world);
    let total = *world.resource::<u64>();
    assert_eq!(total, 101);
}
2175
#[test]
fn reactor_self_removal_with_registry_cleanup() {
    struct Ctx {
        reactor_id: Token,
    }

    // Bumps the counter once, then queues its own deregistration.
    fn fire_once(ctx: &mut Ctx, mut total: ResMut<u64>, mut pending: ResMut<DeferredRemovals>) {
        *total += 1;
        pending.deregister(ctx.reactor_id);
    }

    let mut builder = WorldBuilder::new();
    builder.register::<u64>(0);
    builder.register(ReactorNotify::new(4, 8));
    builder.register(DeferredRemovals::default());
    builder.register(SourceRegistry::new());
    let mut world = builder.build();
    let notify_id = world.id::<ReactorNotify>();
    let mut system = ReactorSystem::new(&world);

    let src = notify_mut(&world, notify_id).register_source();
    world
        .resource_mut::<SourceRegistry>()
        .insert("one-shot", src);

    {
        let registry = world.registry();
        notify_mut(&world, notify_id)
            .register(|token| Ctx { reactor_id: token }, fire_once, registry)
            .subscribe(src);
    }

    // First dispatch: the reactor runs and no reactor is left afterwards.
    notify_mut(&world, notify_id).mark(src);
    system.dispatch(&mut world);
    let total = *world.resource::<u64>();
    assert_eq!(total, 1);
    assert_eq!(system.reactor_count(&world), 0);

    // The registry entry is still there after the reactor removed itself.
    let still_registered = world.resource::<SourceRegistry>().contains(&"one-shot");
    assert!(still_registered);

    // With no subscriber left, marking the source wakes nothing.
    notify_mut(&world, notify_id).mark(src);
    assert!(!system.dispatch(&mut world));
}
2224
#[test]
fn many_reactors_same_source() {
    struct Ctx {
        _reactor_id: Token,
    }

    // Every invocation bumps the shared counter by one.
    fn bump(_ctx: &mut Ctx, mut total: ResMut<u64>) {
        *total += 1;
    }

    let mut builder = WorldBuilder::new();
    builder.register::<u64>(0);
    builder.register(ReactorNotify::new(4, 64));
    builder.register(DeferredRemovals::default());
    let mut world = builder.build();
    let notify_id = world.id::<ReactorNotify>();
    let mut system = ReactorSystem::new(&world);

    let shared_src = notify_mut(&world, notify_id).register_source();

    {
        // Fan-out: fifty reactors all subscribed to the single source.
        let registry = world.registry();
        let notifier = notify_mut(&world, notify_id);
        (0..50).for_each(|_| {
            notifier
                .register(|token| Ctx { _reactor_id: token }, bump, registry)
                .subscribe(shared_src);
        });
    }

    assert_eq!(system.reactor_count(&world), 50);

    // A single mark wakes every subscriber exactly once.
    notify_mut(&world, notify_id).mark(shared_src);
    system.dispatch(&mut world);
    let total = *world.resource::<u64>();
    assert_eq!(total, 50);
}
2262
#[test]
fn reactor_subscribes_to_multiple_sources() {
    struct Ctx {
        _reactor_id: Token,
    }

    // Every invocation bumps the shared counter by one.
    fn bump(_ctx: &mut Ctx, mut total: ResMut<u64>) {
        *total += 1;
    }

    let mut builder = WorldBuilder::new();
    builder.register::<u64>(0);
    builder.register(ReactorNotify::new(8, 8));
    builder.register(DeferredRemovals::default());
    let mut world = builder.build();
    let notify_id = world.id::<ReactorNotify>();
    let mut system = ReactorSystem::new(&world);

    // Five independent sources.
    let notifier = notify_mut(&world, notify_id);
    let sources: Vec<_> = (0..5).map(|_| notifier.register_source()).collect();

    {
        // Fan-in: one reactor subscribed to every source, threading the
        // registration handle through each `subscribe` call.
        let registry = world.registry();
        let notifier = notify_mut(&world, notify_id);
        let initial = notifier.register(|token| Ctx { _reactor_id: token }, bump, registry);
        let _registration = sources
            .iter()
            .fold(initial, |registration, &src| registration.subscribe(src));
    }

    // Mark all five sources within the same cycle.
    sources.iter().copied().for_each(|src| {
        notify_mut(&world, notify_id).mark(src);
    });

    // The reactor still runs only once for the whole dispatch.
    system.dispatch(&mut world);
    let total = *world.resource::<u64>();
    assert_eq!(total, 1);
}
2306
#[test]
fn stale_data_source_is_noop() {
    let mut builder = WorldBuilder::new();
    builder.register(ReactorNotify::new(4, 4));
    builder.register(DeferredRemovals::default());
    let mut world = builder.build();
    let notify_id = world.id::<ReactorNotify>();
    let mut system = ReactorSystem::new(&world);

    // Register a source and immediately retire it, keeping the stale handle.
    let stale = notify_mut(&world, notify_id).register_source();
    notify_mut(&world, notify_id).remove_source(stale);

    // Marking through the stale handle must not panic or wake anything.
    notify_mut(&world, notify_id).mark(stale);
    assert!(!system.dispatch(&mut world));
}
2325
#[test]
fn double_remove_source_is_noop() {
    let mut notifier = ReactorNotify::new(4, 4);
    let src = notifier.register_source();

    // Removing the same source twice must not panic; there is nothing else
    // to assert, the test passes by completing.
    for _ in 0..2 {
        notifier.remove_source(src);
    }
}
2333
#[test]
fn pipeline_reactor_dispatch() {
    use crate::CtxPipelineBuilder;

    struct Ctx {
        _reactor_id: Token,
        instrument: &'static str,
    }

    // Stage 1: read the current counter value from the world.
    fn load(ctx: &mut Ctx, current: Res<u64>, _input: ()) -> u64 {
        let _ = ctx.instrument;
        *current
    }

    // Stage 2: pure transformation of the value flowing through.
    fn twice(_ctx: &mut Ctx, x: u64) -> u64 {
        x * 2
    }

    // Stage 3: write the result back into the world.
    fn save(_ctx: &mut Ctx, mut slot: ResMut<u64>, x: u64) {
        *slot = x;
    }

    let mut builder = WorldBuilder::new();
    builder.register::<u64>(0);
    builder.register(ReactorNotify::new(4, 8));
    builder.register(DeferredRemovals::default());
    let mut world = builder.build();
    let notify_id = world.id::<ReactorNotify>();
    let mut system = ReactorSystem::new(&world);

    let registry = world.registry();

    let body = CtxPipelineBuilder::<Ctx, ()>::new()
        .then(load, registry)
        .then(twice, registry)
        .then(save, registry)
        .build();

    let notifier = notify_mut(&world, notify_id);
    let src = notifier.register_source();

    // Wrap the pre-built pipeline and its context into a reactor.
    let ctx = Ctx {
        _reactor_id: Token::new(0),
        instrument: "BTC",
    };
    notifier
        .register_built(PipelineReactor::new(ctx, body))
        .subscribe(src);

    // Seed 10, dispatch once, expect the doubled value written back.
    *world.resource_mut::<u64>() = 10;
    notify_mut(&world, notify_id).mark(src);
    system.dispatch(&mut world);

    let result = *world.resource::<u64>();
    assert_eq!(result, 20);
}
2394
#[test]
fn dag_reactor_dispatch() {
    use crate::CtxDagBuilder;

    struct Ctx {
        _reactor_id: Token,
    }

    // Root node: read the current counter value from the world.
    fn load(ctx: &mut Ctx, current: Res<u64>, _input: ()) -> u64 {
        let _ = ctx;
        *current
    }

    // First arm: double the root output.
    fn doubled(_ctx: &mut Ctx, seed: &u64) -> u64 {
        *seed * 2
    }

    // Second arm: add ten to the root output.
    fn plus_ten(_ctx: &mut Ctx, seed: &u64) -> u64 {
        *seed + 10
    }

    // Merge node: store the sum of both arm outputs.
    fn sum_into(_ctx: &mut Ctx, mut slot: ResMut<u64>, left: &u64, right: &u64) {
        *slot = *left + *right;
    }

    let mut builder = WorldBuilder::new();
    builder.register::<u64>(0);
    builder.register(ReactorNotify::new(4, 8));
    builder.register(DeferredRemovals::default());
    let mut world = builder.build();
    let notify_id = world.id::<ReactorNotify>();
    let mut system = ReactorSystem::new(&world);

    let registry = world.registry();

    let graph = CtxDagBuilder::<Ctx, ()>::new()
        .root(load, registry)
        .fork()
        .arm(|branch| branch.then(doubled, registry))
        .arm(|branch| branch.then(plus_ten, registry))
        .merge(sum_into, registry)
        .build();

    let notifier = notify_mut(&world, notify_id);
    let src = notifier.register_source();

    let ctx = Ctx {
        _reactor_id: Token::new(0),
    };
    notifier
        .register_built(PipelineReactor::new(ctx, graph))
        .subscribe(src);

    // Seed 5: the arms yield 10 and 15, and the merge stores their sum.
    *world.resource_mut::<u64>() = 5;
    notify_mut(&world, notify_id).mark(src);
    system.dispatch(&mut world);

    let result = *world.resource::<u64>();
    assert_eq!(result, 25);
}
2456
#[test]
fn multiple_pipeline_reactors_different_bodies() {
    use crate::CtxPipelineBuilder;

    let mut wb = WorldBuilder::new();
    wb.register::<u64>(0);
    wb.register(ReactorNotify::new(4, 8));
    wb.register(DeferredRemovals::default());
    let mut world = wb.build();
    let nid = world.id::<ReactorNotify>();
    let mut system = ReactorSystem::new(&world);

    struct Ctx {
        _reactor_id: Token,
        factor: u64,
    }

    // Reads the current counter and scales it by this reactor's factor.
    fn multiply(ctx: &mut Ctx, val: Res<u64>, _input: ()) -> u64 {
        *val * ctx.factor
    }

    // Adds the scaled value back onto the counter.
    fn accumulate(_ctx: &mut Ctx, mut out: ResMut<u64>, val: u64) {
        *out += val;
    }

    let reg = world.registry();

    // Two separately built pipelines with the same steps; the reactors
    // differ through the `factor` carried in their contexts.
    let pipeline_a = CtxPipelineBuilder::<Ctx, ()>::new()
        .then(multiply, reg)
        .then(accumulate, reg)
        .build();

    let pipeline_b = CtxPipelineBuilder::<Ctx, ()>::new()
        .then(multiply, reg)
        .then(accumulate, reg)
        .build();

    let notify = notify_mut(&world, nid);
    let src = notify.register_source();

    notify
        .register_built(PipelineReactor::new(
            Ctx {
                _reactor_id: Token::new(0),
                factor: 2,
            },
            pipeline_a,
        ))
        .subscribe(src);

    notify
        .register_built(PipelineReactor::new(
            Ctx {
                _reactor_id: Token::new(1),
                factor: 10,
            },
            pipeline_b,
        ))
        .subscribe(src);

    *world.resource_mut::<u64>() = 5;
    notify_mut(&world, nid).mark(src);
    system.dispatch(&mut world);

    // Each reactor maps the counter `v` to `v + v * factor = v * (1 + factor)`.
    // Multiplication commutes, so either dispatch order gives
    // 5 * (1 + 2) * (1 + 10) = 165. If only one reactor had run the result
    // would be 15 or 55, which a bare `val > 5` check would wrongly accept.
    let val = *world.resource::<u64>();
    assert_eq!(val, 165, "both reactors should have run, got {val}");
}
2530
#[test]
fn pipeline_reactor_with_guard() {
    use crate::CtxPipelineBuilder;

    struct Ctx {
        _reactor_id: Token,
        threshold: u64,
    }

    // Stage 1: read the current counter value from the world.
    fn load(_ctx: &mut Ctx, current: Res<u64>, _input: ()) -> u64 {
        *current
    }

    // Guard predicate: true only when the value exceeds the context threshold.
    fn exceeds_threshold(ctx: &mut Ctx, candidate: &u64) -> bool {
        *candidate > ctx.threshold
    }

    // Final stage: overwrite the counter with a sentinel value.
    fn overwrite(_ctx: &mut Ctx, mut slot: ResMut<u64>, _val: u64) {
        *slot = 999;
    }

    let mut builder = WorldBuilder::new();
    builder.register::<u64>(0);
    builder.register(ReactorNotify::new(4, 8));
    builder.register(DeferredRemovals::default());
    let mut world = builder.build();
    let notify_id = world.id::<ReactorNotify>();
    let mut system = ReactorSystem::new(&world);

    let registry = world.registry();

    let guarded = CtxPipelineBuilder::<Ctx, ()>::new()
        .then(load, registry)
        .guard(exceeds_threshold, registry)
        .map(overwrite, registry)
        .build();

    let notifier = notify_mut(&world, notify_id);
    let src = notifier.register_source();

    let ctx = Ctx {
        _reactor_id: Token::new(0),
        threshold: 10,
    };
    notifier
        .register_built(PipelineReactor::new(ctx, guarded))
        .subscribe(src);

    // 5 is not above the threshold of 10: the counter stays untouched.
    *world.resource_mut::<u64>() = 5;
    notify_mut(&world, notify_id).mark(src);
    system.dispatch(&mut world);
    let below = *world.resource::<u64>();
    assert_eq!(below, 5);

    // 20 is above the threshold: the sentinel gets written.
    *world.resource_mut::<u64>() = 20;
    notify_mut(&world, notify_id).mark(src);
    system.dispatch(&mut world);
    let above = *world.resource::<u64>();
    assert_eq!(above, 999);
}
2593
#[test]
fn two_phase_registration_safe_api() {
    struct Ctx {
        reactor_id: Token,
        instrument: &'static str,
    }

    // Adds `token index + 1`, so each reactor's share of the total is
    // distinguishable.
    fn add_slot(ctx: &mut Ctx, mut total: ResMut<u64>) {
        let _ = ctx.instrument;
        *total += ctx.reactor_id.index() as u64 + 1;
    }

    let mut builder = WorldBuilder::new();
    builder.register::<u64>(0);
    builder.register(ReactorNotify::new(4, 8));
    builder.register(DeferredRemovals::default());
    let mut world = builder.build();

    let mut system = ReactorSystem::new(&world);

    let src = world.resource_mut::<ReactorNotify>().register_source();

    // Phase one: reserve a token before the reactor exists.
    let first_token = world.resource_mut::<ReactorNotify>().create_reactor();

    // Phase two: build the reactor with its own token in the context, then
    // place it in the reserved slot.
    let first_reactor = add_slot.into_reactor(
        Ctx {
            reactor_id: first_token,
            instrument: "BTC",
        },
        world.registry(),
    );
    world
        .resource_mut::<ReactorNotify>()
        .insert_reactor(first_token, first_reactor)
        .subscribe(src);

    // Only the first reactor exists, and it contributes 1.
    world.resource_mut::<ReactorNotify>().mark(src);
    system.dispatch(&mut world);
    let after_first = *world.resource::<u64>();
    assert_eq!(after_first, 1);

    // Repeat both phases for a second reactor on the same source.
    let second_token = world.resource_mut::<ReactorNotify>().create_reactor();
    let second_reactor = add_slot.into_reactor(
        Ctx {
            reactor_id: second_token,
            instrument: "ETH",
        },
        world.registry(),
    );
    world
        .resource_mut::<ReactorNotify>()
        .insert_reactor(second_token, second_reactor)
        .subscribe(src);

    // Both reactors fire: 1 (previous) + 1 + 2.
    world.resource_mut::<ReactorNotify>().mark(src);
    system.dispatch(&mut world);
    let after_second = *world.resource::<u64>();
    assert_eq!(after_second, 4);
}
2661
#[test]
fn two_phase_with_pipeline_reactor() {
    use crate::CtxPipelineBuilder;

    struct Ctx {
        _reactor_id: Token,
    }

    // Stage 1: read the current counter; takes no event argument, so it is
    // wrapped with `no_event` when added to the pipeline.
    fn load(ctx: &mut Ctx, current: Res<u64>) -> u64 {
        let _ = ctx;
        *current
    }

    // Stage 2: pure transformation of the value flowing through.
    fn twice(_ctx: &mut Ctx, x: u64) -> u64 {
        x * 2
    }

    // Stage 3: write the result back into the world.
    fn save(_ctx: &mut Ctx, mut slot: ResMut<u64>, x: u64) {
        *slot = x;
    }

    let mut builder = WorldBuilder::new();
    builder.register::<u64>(0);
    builder.register(ReactorNotify::new(4, 8));
    builder.register(DeferredRemovals::default());
    let mut world = builder.build();

    let mut system = ReactorSystem::new(&world);

    // Phase one: reserve the source and the reactor token.
    let src = world.resource_mut::<ReactorNotify>().register_source();
    let token = world.resource_mut::<ReactorNotify>().create_reactor();

    // Phase two: build the pipeline reactor with the reserved token in its
    // context.
    let registry = world.registry();
    let body = CtxPipelineBuilder::<Ctx, ()>::new()
        .then(crate::no_event(load), registry)
        .then(twice, registry)
        .then(save, registry)
        .build();
    let reactor = PipelineReactor::new(Ctx { _reactor_id: token }, body);

    world
        .resource_mut::<ReactorNotify>()
        .insert_reactor(token, reactor)
        .subscribe(src);

    // Seed 10, dispatch once, expect the doubled value written back.
    *world.resource_mut::<u64>() = 10;
    world.resource_mut::<ReactorNotify>().mark(src);
    system.dispatch(&mut world);
    let result = *world.resource::<u64>();
    assert_eq!(result, 20);
}
2715}