Skip to main content

nexus_rt/
reactor.rs

1//! Reactor dispatch system with interest-based notification.
2//!
3//! Reactors are lightweight, per-instance dispatch units — each owns its
4//! own metadata (instrument ID, order ID, algo parameters) and runs a
5//! step function with pre-resolved [`Param`] access when woken.
6//!
7//! # Architecture
8//!
9//! ```text
//! LocalNotify      (nexus-notify)  — dedup bitset, mark/poll
//!     ↑
//! ReactorNotify    (nexus-rt)      — World resource: reactor storage +
//!                                    data source fan-out + registration
//! SourceRegistry   (nexus-rt)      — World resource: typed key → DataSource
//!     ↑
//! ReactorSystem    (nexus-rt)      — thin dispatch handle (driver level)
17//! ```
18//!
19//! [`SourceRegistry`] maps domain keys (instrument IDs, strategy IDs,
20//! `(Symbol, Venue)` tuples) to [`DataSource`] values for runtime lookup.
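//! Any `Hash + Eq + Send + 'static` type works as a key.
//!
//! The three World resources defined in this module — [`ReactorNotify`],
//! [`DeferredRemovals`], and [`SourceRegistry`] — are auto-registered by
//! [`WorldBuilder::build`](crate::WorldBuilder::build) when the `reactors`
//! feature is enabled.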
24//!
25//! # Use Cases
26//!
27//! ## 1. Market maker with per-instrument quoting reactors
28//!
29//! ```ignore
30//! // Step function — context first, then pre-resolved Params
31//! fn quoting_step(ctx: &mut QuotingCtx, books: Res<OrderBooks>, mut gw: ResMut<Gateway>) {
32//!     let quote = books.compute_quote(ctx.instrument, ctx.layer);
33//!     gw.submit_quote(quote);
34//! }
35//!
36//! // Setup
37//! let notify = world.resource_mut::<ReactorNotify>();
38//! let btc_md = notify.register_source();
39//! let positions = notify.register_source();
40//!
41//! // Map natural keys for runtime lookup
42//! world.resource_mut::<SourceRegistry>().insert(InstrumentId::BTC, btc_md);
43//!
44//! // Register reactor — subscribes to BTC data + positions
45//! notify.register(
46//!     |id| QuotingCtx { reactor_id: id, instrument: InstrumentId::BTC, layer: 1 },
47//!     quoting_step,
48//!     &registry,
49//! )
50//! .subscribe(btc_md)
51//! .subscribe(positions);
52//! ```
53//!
54//! ## 2. TWAP execution algo that self-removes on completion
55//!
56//! ```ignore
57//! fn twap_step(
58//!     ctx: &mut TwapCtx,
59//!     books: Res<OrderBooks>,
60//!     mut gw: ResMut<Gateway>,
61//!     mut removals: ResMut<DeferredRemovals>,
62//! ) {
63//!     gw.submit(ctx.instrument, ctx.slice_size, books.best_ask(ctx.instrument));
64//!     ctx.remaining -= ctx.slice_size;
65//!     if ctx.remaining == 0 {
66//!         removals.deregister(ctx.reactor_id);  // cleaned up after frame
67//!     }
68//! }
69//! ```
70//!
71//! ## 3. Runtime registration from event handlers
72//!
73//! ```ignore
74//! fn on_new_order(
75//!     event: NewOrder,
76//!     mut notify: ResMut<ReactorNotify>,
77//!     sources: Res<SourceRegistry>,
78//! ) {
79//!     let md_source = sources.get(&event.instrument).unwrap();
80//!     notify.register(
81//!         |id| TwapCtx { reactor_id: id, instrument: event.instrument, remaining: event.qty },
82//!         twap_step, &registry,
83//!     ).subscribe(md_source);
84//! }
85//! ```
86//!
87//! ## 4. Order fill routing via wire protocol
88//!
89//! ```ignore
90//! // On submission — embed reactor token in order
91//! fn submit(ctx: &mut Ctx, mut gw: ResMut<Gateway>) {
92//!     gw.submit(Order { client_id: ctx.reactor_id.index(), .. });
93//! }
94//!
95//! // On fill — route back to reactor's data source
96//! fn on_fill(fill: Fill, mut notify: ResMut<ReactorNotify>, sources: Res<SourceRegistry>) {
97//!     if let Some(src) = sources.get(&RoutingKey(fill.client_id)) {
98//!         notify.mark(src);
99//!     }
100//! }
101//! ```
102//!
103//! ## 5. Instrument delisting — cleanup
104//!
105//! ```ignore
106//! fn on_delist(event: Delist, mut notify: ResMut<ReactorNotify>, mut sources: ResMut<SourceRegistry>) {
107//!     if let Some(src) = sources.remove(&event.instrument) {
108//!         notify.remove_source(src);  // frees slab slot for reuse
109//!     }
110//! }
111//! ```
112//!
113//! ## 6. Event handler marking (hot path)
114//!
115//! ```ignore
116//! fn on_btc_tick(event: Tick, mut books: ResMut<OrderBooks>, mut notify: ResMut<ReactorNotify>) {
117//!     books.apply(event);
118//!     notify.mark(btc_md);  // pre-resolved DataSource, O(1), no lookup
119//! }
120//! ```
121
122use std::any::{Any, TypeId};
123use std::hash::Hash;
124
125use nexus_notify::local::LocalNotify;
126use nexus_notify::{Events, Token};
127use rustc_hash::FxHashMap;
128
129use crate::ctx_pipeline::CtxStepCall;
130use crate::handler::Param;
131use crate::world::{Registry, Resource, ResourceId, World};
132
133// =============================================================================
134// DataSource — identifies a category of change
135// =============================================================================
136
/// Handle naming one registered data source (e.g., "BTC market data",
/// "positions").
///
/// Handed out by [`ReactorNotify::register_source`]; the wrapped `usize`
/// is the slab key of that source's subscriber list. Event handlers pass
/// the handle to [`ReactorNotify::mark`] to wake every subscribed reactor.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct DataSource(pub usize);
143
144// =============================================================================
145// ReactorNotify — World resource: storage + notification + registration
146// =============================================================================
147
/// Reactor storage, interest mapping, and notification hub.
///
/// Lives in the [`World`] as a resource. Handles:
/// - Reactor registration and storage (`Box<dyn Reactor>` in a slab)
/// - Data source registration and interest mapping
/// - Marking data sources as changed (fan-out + dedup)
///
/// Event handlers access this via [`ResMut<ReactorNotify>`](crate::ResMut)
/// to mark data sources or register new reactors at runtime.
///
/// Two indices are maintained side by side: `interests` (source →
/// subscribed reactors) drives the fan-out in `mark`, while
/// `reactor_sources` (reactor → subscribed sources) lets `remove_reactor`
/// unsubscribe without scanning every source.
pub struct ReactorNotify {
    /// Per-reactor token dedup.
    notify: LocalNotify,

    /// Data source → reactor tokens subscribed to this source.
    /// Slab-backed for dynamic add/remove with slot reuse.
    /// Slab key = `DataSource.0`.
    interests: slab::Slab<Vec<Token>>,

    /// Reverse index: reactor token → data sources it's subscribed to.
    /// Indexed by `Token::index()` and grown on demand whenever a reactor
    /// slot is reserved.
    /// Enables O(subscriptions) removal instead of O(all_sources × all_subs).
    reactor_sources: Vec<Vec<DataSource>>,

    /// Reactor storage. Slab key = token index.
    /// `Option` enables move-out-move-back during dispatch to avoid
    /// aliasing: the reactor is `take()`n before `run()`, then put back.
    /// A `None` entry is also the placeholder `create_reactor` leaves in
    /// the slot until `insert_reactor` fills it.
    /// `Option<Box<dyn Reactor>>` is niche-optimized — zero extra bytes.
    reactors: slab::Slab<Option<Box<dyn Reactor>>>,
}
176
177// Manual Debug — slab of dyn Reactor can't derive
178impl std::fmt::Debug for ReactorNotify {
179    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
180        f.debug_struct("ReactorNotify")
181            .field("num_sources", &self.interests.len())
182            .field("num_reactors", &self.reactors.len())
183            .field("notify", &self.notify)
184            .finish()
185    }
186}
187
188impl ReactorNotify {
189    /// Create with capacity hints for data sources and reactors.
190    pub fn new(source_capacity: usize, reactor_capacity: usize) -> Self {
191        Self {
192            notify: LocalNotify::with_capacity(reactor_capacity),
193            interests: slab::Slab::with_capacity(source_capacity),
194            reactor_sources: Vec::with_capacity(reactor_capacity),
195            reactors: slab::Slab::with_capacity(reactor_capacity),
196        }
197    }
198
199    // ── Data sources ────────────────────────────────────────────────────
200
201    /// Register a new data source. Returns its identifier.
202    ///
203    /// Slab-backed — removed sources' slots are reused.
204    pub fn register_source(&mut self) -> DataSource {
205        DataSource(self.interests.insert(Vec::new()))
206    }
207
208    /// Remove a data source. Unsubscribes all reactors and frees the
209    /// slab slot for reuse.
210    ///
211    /// Marking a removed `DataSource` is a no-op (stale handle safety).
212    pub fn remove_source(&mut self, source: DataSource) {
213        if self.interests.contains(source.0) {
214            self.interests.remove(source.0);
215        }
216    }
217
218    // ── Reactor registration ──────────────────────────────────────────────
219
220    /// Reserve a slot for a new reactor and return its [`Token`].
221    ///
222    /// Use with [`insert_reactor`](Self::insert_reactor) to complete registration.
223    /// This two-phase pattern avoids borrow conflicts: you can
224    /// drop the `&mut ReactorNotify` borrow, build the reactor with
225    /// `world.registry()`, then call `insert`.
226    ///
227    /// # Example
228    ///
229    /// ```ignore
230    /// // Phase 1: reserve slot
231    /// let token = world.resource_mut::<ReactorNotify>().create_reactor();
232    ///
233    /// // Phase 2: build reactor (borrows &World for registry)
234    /// let reactor = quoting_step.into_reactor(
235    ///     QuotingCtx { reactor_id: token, instrument: BTC },
236    ///     world.registry(),
237    /// );
238    ///
239    /// // Phase 3: fill slot
240    /// world.resource_mut::<ReactorNotify>()
241    ///     .insert_reactor(token, reactor)
242    ///     .subscribe(btc_md);
243    /// ```
244    pub fn create_reactor(&mut self) -> Token {
245        // Reserve the slot with a None placeholder so no other registration
246        // can claim the same key between alloc and insert.
247        let key = self.reactors.insert(None);
248        self.notify.ensure_capacity(key);
249        // Grow reverse index to cover this reactor token.
250        if key >= self.reactor_sources.len() {
251            self.reactor_sources.resize_with(key + 1, Vec::new);
252        }
253        Token::new(key)
254    }
255
256    /// Insert a pre-built reactor at a previously allocated [`Token`].
257    ///
258    /// The token must have been returned by [`create_reactor`](Self::create_reactor)
259    /// and not yet filled. Completes the two-phase registration.
260    ///
261    /// # Panics
262    ///
263    /// Panics if the token was not allocated by [`create_reactor`](Self::create_reactor) or was
264    /// already filled.
265    pub fn insert_reactor(
266        &mut self,
267        token: Token,
268        reactor: impl Reactor + 'static,
269    ) -> ReactorRegistration<'_> {
270        let idx = token.index();
271        assert!(
272            self.reactors.contains(idx),
273            "token {} was not allocated by create_reactor",
274            idx,
275        );
276        assert!(
277            self.reactors[idx].is_none(),
278            "token {} was already filled",
279            idx,
280        );
281        self.reactors[idx] = Some(Box::new(reactor));
282        ReactorRegistration {
283            token,
284            notify: self,
285        }
286    }
287
288    /// Register a reactor from a step function + context factory.
289    ///
290    /// One-shot convenience when you already have `&Registry` (e.g.,
291    /// inside event handlers via [`Param`] resolution, or in tests).
292    /// For the safe `World`-based API, use [`create_reactor`](Self::create_reactor)
293    /// + [`insert_reactor`](Self::insert_reactor).
294    pub fn register<C, Params, F: IntoReactor<C, Params>>(
295        &mut self,
296        ctx_fn: impl FnOnce(Token) -> C,
297        step: F,
298        registry: &Registry,
299    ) -> ReactorRegistration<'_> {
300        let key = self.reactors.vacant_key();
301        let token = Token::new(key);
302        self.notify.ensure_capacity(key);
303        if key >= self.reactor_sources.len() {
304            self.reactor_sources.resize_with(key + 1, Vec::new);
305        }
306        let ctx = ctx_fn(token);
307        let reactor = step.into_reactor(ctx, registry);
308        let inserted = self.reactors.insert(Some(Box::new(reactor)));
309        debug_assert_eq!(inserted, key);
310        ReactorRegistration {
311            token,
312            notify: self,
313        }
314    }
315
316    /// Register a pre-built reactor in one step.
317    ///
318    /// Convenience for reactors that don't need their [`Token`] in
319    /// the context. For reactors that need the token (wire routing,
320    /// self-deregistration), use [`create_reactor`](Self::create_reactor)
321    /// + [`insert_reactor`](Self::insert_reactor).
322    pub fn register_built(&mut self, reactor: impl Reactor + 'static) -> ReactorRegistration<'_> {
323        let key = self.reactors.vacant_key();
324        let token = Token::new(key);
325        self.notify.ensure_capacity(key);
326        if key >= self.reactor_sources.len() {
327            self.reactor_sources.resize_with(key + 1, Vec::new);
328        }
329        let inserted = self.reactors.insert(Some(Box::new(reactor)));
330        debug_assert_eq!(inserted, key);
331        ReactorRegistration {
332            token,
333            notify: self,
334        }
335    }
336
337    // ── Subscription ────────────────────────────────────────────────────
338
339    /// Subscribe a reactor to a data source.
340    ///
341    /// Idempotent — subscribing twice is a no-op.
342    /// No-op if `source` has been removed.
343    pub fn subscribe(&mut self, reactor: Token, source: DataSource) {
344        if let Some(subscribers) = self.interests.get_mut(source.0) {
345            if !subscribers.contains(&reactor) {
346                subscribers.push(reactor);
347                // Maintain reverse index for O(subscriptions) removal.
348                let idx = reactor.index();
349                debug_assert!(
350                    idx < self.reactor_sources.len(),
351                    "reactor_sources missing entry for reactor token {}",
352                    idx,
353                );
354                self.reactor_sources[idx].push(source);
355            }
356        }
357    }
358
359    /// Unsubscribe a reactor from a data source.
360    pub fn unsubscribe(&mut self, reactor: Token, source: DataSource) {
361        if let Some(subscribers) = self.interests.get_mut(source.0) {
362            subscribers.retain(|&t| t != reactor);
363        }
364        if let Some(sources) = self.reactor_sources.get_mut(reactor.index()) {
365            sources.retain(|&s| s != source);
366        }
367    }
368
369    // ── Hot path ────────────────────────────────────────────────────────
370
371    /// Mark a data source as changed this frame.
372    ///
373    /// Fans out to all subscribed reactor tokens in the underlying
374    /// [`LocalNotify`], with per-reactor dedup.
375    #[inline]
376    pub fn mark(&mut self, source: DataSource) {
377        if let Some(subscribers) = self.interests.get(source.0) {
378            for &reactor_token in subscribers {
379                self.notify.mark(reactor_token);
380            }
381        }
382    }
383
384    /// Poll for woken reactor tokens into the events buffer.
385    #[inline]
386    pub(crate) fn poll(&mut self, events: &mut Events) {
387        self.notify.poll(events);
388    }
389
390    /// Take a reactor out of its slot for dispatch.
391    /// Returns None if the slot is empty or doesn't exist.
392    #[inline]
393    pub(crate) fn take_reactor(&mut self, idx: usize) -> Option<Box<dyn Reactor>> {
394        self.reactors.get_mut(idx).and_then(Option::take)
395    }
396
397    /// Put a reactor back into its slot after dispatch.
398    ///
399    /// The caller guarantees `idx` is a valid, occupied slab key
400    /// (it was just returned by `take_reactor`). Skips the redundant
401    /// `contains` check — single bounds-checked write.
402    #[inline]
403    pub(crate) fn put_reactor(&mut self, idx: usize, reactor: Box<dyn Reactor>) {
404        self.reactors[idx] = Some(reactor);
405    }
406
407    /// Remove a reactor and unsubscribe from all data sources.
408    ///
409    /// Uses the reverse index for O(subscriptions) removal instead of
410    /// scanning all data source interest lists.
411    pub fn remove_reactor(&mut self, token: Token) {
412        let idx = token.index();
413        if self.reactors.contains(idx) {
414            self.reactors.remove(idx);
415            // Use reverse index — only touch sources this reactor subscribed to.
416            if let Some(sources) = self.reactor_sources.get_mut(idx) {
417                for &source in sources.iter() {
418                    if let Some(subscribers) = self.interests.get_mut(source.0) {
419                        subscribers.retain(|&t| t != token);
420                    }
421                }
422                sources.clear();
423            }
424        }
425    }
426
427    // ── Introspection ───────────────────────────────────────────────────
428
429    /// Any reactors woken this frame?
430    pub fn has_notified(&self) -> bool {
431        self.notify.has_notified()
432    }
433
434    /// Number of reactors woken this frame.
435    pub fn notified_count(&self) -> usize {
436        self.notify.notified_count()
437    }
438
439    /// Number of registered data sources.
440    pub fn source_count(&self) -> usize {
441        self.interests.len()
442    }
443
444    /// Number of registered reactors.
445    pub fn reactor_count(&self) -> usize {
446        self.reactors.len()
447    }
448}
449
// Empty marker impl: opts `ReactorNotify` into the `Resource` trait so it
// can live in the `World`.
impl Resource for ReactorNotify {}
451
452// =============================================================================
453// ReactorRegistration — builder for chaining subscriptions
454// =============================================================================
455
/// Builder for chaining subscriptions onto a freshly registered reactor.
///
/// Returned by [`ReactorNotify::register`], [`ReactorNotify::register_built`],
/// and [`ReactorNotify::insert_reactor`]. Holds the `&mut ReactorNotify`
/// borrow for as long as it is alive.
pub struct ReactorRegistration<'a> {
    /// Token of the reactor that was just stored.
    token: Token,
    /// The hub the reactor was stored in; subscriptions are forwarded to it.
    notify: &'a mut ReactorNotify,
}
461
462impl ReactorRegistration<'_> {
463    /// Subscribe this reactor to a data source.
464    pub fn subscribe(self, source: DataSource) -> Self {
465        self.notify.subscribe(self.token, source);
466        self
467    }
468
469    /// The assigned token for this reactor.
470    pub fn token(&self) -> Token {
471        self.token
472    }
473}
474
475// =============================================================================
476// Reactor trait
477// =============================================================================
478
/// A dispatchable unit with per-instance context.
///
/// Reactors own lightweight metadata (instrument ID, order ID, routing
/// keys). Mutable state belongs in World resources, accessed via
/// pre-resolved [`Res<T>`](crate::Res) / [`ResMut<T>`](crate::ResMut)
/// in the step function.
///
/// Resource access is resolved once at registration time — dispatch
/// is a single pointer deref per resource, no HashMap lookups.
///
/// The `Send` supertrait is required of every implementor.
pub trait Reactor: Send {
    /// Run this reactor with full World access.
    fn run(&mut self, world: &mut World);

    /// Returns the reactor's name for diagnostics.
    ///
    /// Defaults to `"<unnamed>"`; implementors may override it with
    /// something more descriptive.
    fn name(&self) -> &'static str {
        "<unnamed>"
    }
}
497
498// =============================================================================
499// ReactorFn — concrete dispatch wrapper
500// =============================================================================
501
/// Concrete reactor wrapper produced by [`IntoReactor`].
///
/// Stores the step function, per-reactor context, and pre-resolved
/// parameter state. Same pattern as [`Callback`](crate::Callback)
/// but without an event argument.
pub struct ReactorFn<C, F, Params: Param> {
    /// Per-reactor owned context (instrument, order ID, config).
    pub ctx: C,
    /// The step function invoked on every run.
    f: F,
    /// Parameter state produced by `Param::init` when the reactor was built.
    state: Params::State,
    /// Diagnostic name: the type name of the step function `F`.
    name: &'static str,
}
514
515// =============================================================================
516// PipelineReactor — reactor backed by a CtxPipeline or CtxDag body
517// =============================================================================
518
/// A reactor whose body is a [`CtxPipeline`](crate::CtxPipeline),
/// [`CtxDag`](crate::CtxDag), or any [`CtxStepCall`].
///
/// The context `C` holds per-reactor metadata. The body is type-erased
/// via `Box<dyn CtxStepCall>` since pipeline chain types are unnameable.
///
/// # Example
///
/// ```ignore
/// let pipeline = CtxPipelineBuilder::<QuotingCtx, ()>::new()
///     .then(read_books, &reg)
///     .then(compute_quote, &reg)
///     .then(submit_quote, &reg)
///     .build();
///
/// let reactor = PipelineReactor::new(
///     QuotingCtx { reactor_id: token, instrument: BTC },
///     pipeline,
/// );
///
/// notify.register_built(reactor).subscribe(btc_md);
/// ```
pub struct PipelineReactor<C> {
    /// Per-reactor owned context.
    pub ctx: C,
    /// Type-erased body: takes `()` as input and produces `()`; called
    /// once per run with the context and the `World`.
    body: Box<dyn CtxStepCall<C, (), Out = ()> + Send>,
}
546
547impl<C: Send + 'static> PipelineReactor<C> {
548    /// Create a reactor from a context and a pipeline/DAG body.
549    ///
550    /// The body must implement `CtxStepCall<C, (), Out = ()>`.
551    /// Both [`CtxPipeline`](crate::CtxPipeline) and
552    /// [`CtxDag`](crate::CtxDag) satisfy this when their output is `()`.
553    pub fn new(ctx: C, body: impl CtxStepCall<C, (), Out = ()> + Send + 'static) -> Self {
554        Self {
555            ctx,
556            body: Box::new(body),
557        }
558    }
559}
560
561impl<C: Send + 'static> Reactor for PipelineReactor<C> {
562    fn run(&mut self, world: &mut World) {
563        self.body.call(&mut self.ctx, world, ());
564    }
565
566    fn name(&self) -> &'static str {
567        std::any::type_name::<C>()
568    }
569}
570
571// =============================================================================
572// IntoReactor — conversion trait
573// =============================================================================
574
/// Converts a step function into a [`Reactor`].
///
/// Step function signature: `fn(&mut C, Params...)` — context first,
/// then resolved resources. No event argument, no return value.
///
/// `C` is the per-reactor context type and `Params` is the tuple of
/// parameter types taken after the context (`()` for a context-only step).
///
/// # Example
///
/// ```ignore
/// fn quoting_step(ctx: &mut QuotingCtx, books: Res<OrderBooks>, mut gw: ResMut<Gateway>) {
///     let quote = books.compute_quote(ctx.instrument, ctx.layer);
///     gw.submit_quote(quote);
/// }
///
/// let reactor = quoting_step.into_reactor(QuotingCtx { instrument: BTC, layer: 1 }, &registry);
/// ```
#[diagnostic::on_unimplemented(
    message = "this function cannot be used as a reactor step",
    note = "reactor step signature: `fn(&mut C, Params...)` — context first, then resources",
    note = "closures with resource parameters are not supported — use a named `fn`"
)]
pub trait IntoReactor<C, Params> {
    /// The concrete reactor type produced.
    type Reactor: Reactor + 'static;

    /// Convert this function + context into a reactor, resolving
    /// parameters from the registry.
    ///
    /// Consumes both the step function and the context; the returned
    /// reactor owns them.
    fn into_reactor(self, ctx: C, registry: &Registry) -> Self::Reactor;
}
603
604// =============================================================================
605// Arity 0: fn(&mut C) — context only, no Param
606// =============================================================================
607
608impl<C: Send + 'static, F: FnMut(&mut C) + Send + 'static> Reactor for ReactorFn<C, F, ()> {
609    fn run(&mut self, _world: &mut World) {
610        (self.f)(&mut self.ctx);
611    }
612
613    fn name(&self) -> &'static str {
614        self.name
615    }
616}
617
618impl<C: Send + 'static, F: FnMut(&mut C) + Send + 'static> IntoReactor<C, ()> for F {
619    type Reactor = ReactorFn<C, F, ()>;
620
621    fn into_reactor(self, ctx: C, registry: &Registry) -> Self::Reactor {
622        ReactorFn {
623            ctx,
624            f: self,
625            state: <() as Param>::init(registry),
626            name: std::any::type_name::<F>(),
627        }
628    }
629}
630
631// =============================================================================
632// Arities 1-8 via macro
633// =============================================================================
634
// Generates, for one parameter list `P0, P1, …`, both the `Reactor` impl
// on `ReactorFn<C, F, (P0, P1, …)>` and the matching `IntoReactor` impl on
// the step function type `F`. The parameter idents double as local
// variable names inside the generated bodies, hence the
// `#[allow(non_snake_case)]` attributes.
macro_rules! impl_into_reactor {
    ($($P:ident),+) => {
        impl<C: Send + 'static, F: Send + 'static, $($P: Param + 'static),+>
            Reactor for ReactorFn<C, F, ($($P,)+)>
        where
            // NOTE(review): the step function is bounded twice — once with
            // the declared parameter types and once with their fetched
            // `Item<'a>` forms. Presumably the first bound drives inference
            // and the second permits the call with fetched values; confirm
            // before changing either.
            for<'a> &'a mut F:
                FnMut(&mut C, $($P,)+) +
                FnMut(&mut C, $($P::Item<'a>,)+),
        {
            #[allow(non_snake_case)]
            fn run(&mut self, world: &mut World) {
                // Local shim through which the step function is invoked
                // with the fetched values.
                #[allow(clippy::too_many_arguments)]
                fn call_inner<Ctx, $($P,)+>(
                    mut f: impl FnMut(&mut Ctx, $($P,)+),
                    ctx: &mut Ctx,
                    $($P: $P,)+
                ) {
                    f(ctx, $($P,)+);
                }

                // SAFETY: state was produced by Param::init() on the same
                // Registry that built this World. Single-threaded sequential
                // dispatch ensures no mutable aliasing across params.
                // (This justification applies to the `unsafe` fetch below;
                // the debug-only borrow reset sits in between.)
                #[cfg(debug_assertions)]
                world.clear_borrows();
                let ($($P,)+) = unsafe {
                    <($($P,)+) as Param>::fetch(world, &mut self.state)
                };
                call_inner(&mut self.f, &mut self.ctx, $($P,)+);
            }

            fn name(&self) -> &'static str {
                self.name
            }
        }

        impl<C: Send + 'static, F: Send + 'static, $($P: Param + 'static),+>
            IntoReactor<C, ($($P,)+)> for F
        where
            for<'a> &'a mut F:
                FnMut(&mut C, $($P,)+) +
                FnMut(&mut C, $($P::Item<'a>,)+),
        {
            type Reactor = ReactorFn<C, F, ($($P,)+)>;

            fn into_reactor(self, ctx: C, registry: &Registry) -> Self::Reactor {
                // Resolve every parameter once, up front.
                let state = <($($P,)+) as Param>::init(registry);
                {
                    // Hand each parameter's resource id and type name to the
                    // registry's access check before the reactor is built.
                    #[allow(non_snake_case)]
                    let ($($P,)+) = &state;
                    registry.check_access(&[
                        $(
                            (<$P as Param>::resource_id($P),
                             std::any::type_name::<$P>()),
                        )+
                    ]);
                }
                ReactorFn {
                    ctx,
                    f: self,
                    state,
                    name: std::any::type_name::<F>(),
                }
            }
        }
    };
}

// Instantiate the impls for every supported arity (1-8 per the section
// header above; `all_tuples!` is defined elsewhere in the crate).
all_tuples!(impl_into_reactor);
704
705// =============================================================================
706// DeferredRemovals — World resource for reactor self-removal
707// =============================================================================
708
/// Deferred reactor removal queue.
///
/// Reactors request removal during dispatch by pushing their token
/// via [`ResMut<DeferredRemovals>`](crate::ResMut). The [`ReactorSystem`]
/// drains this after all reactors in the frame have run.
#[derive(Default)]
pub struct DeferredRemovals {
    /// Tokens queued for removal, in request order; may contain duplicates.
    tokens: Vec<Token>,
}
718
719impl DeferredRemovals {
720    /// Request deferred removal of a reactor.
721    ///
722    /// Takes effect after the current dispatch frame completes.
723    /// Duplicate calls are harmless — `remove_reactor` is idempotent.
724    pub fn deregister(&mut self, token: Token) {
725        self.tokens.push(token);
726    }
727
728    /// Swap out the inner Vec for zero-alloc processing.
729    /// Returns the Vec (caller owns it). Leaves self with an empty Vec.
730    #[inline]
731    pub(crate) fn take(&mut self) -> Vec<Token> {
732        std::mem::take(&mut self.tokens)
733    }
734
735    /// Put a (drained) Vec back for reuse. Zero allocation.
736    #[inline]
737    pub(crate) fn put(&mut self, tokens: Vec<Token>) {
738        debug_assert!(tokens.is_empty(), "put() expects a drained Vec");
739        self.tokens = tokens;
740    }
741
742    /// Any removals pending?
743    pub fn is_empty(&self) -> bool {
744        self.tokens.is_empty()
745    }
746}
747
// Empty marker impl: opts `DeferredRemovals` into the `Resource` trait so
// step functions can reach it through `ResMut<DeferredRemovals>`.
impl Resource for DeferredRemovals {}
749
750// =============================================================================
751// SourceRegistry — typed key → DataSource resolution
752// =============================================================================
753
/// Maps natural domain keys to [`DataSource`] values.
///
/// Single World resource supporting any number of key types via
/// type erasure. Each key type `K` gets its own internal
/// `HashMap<K, DataSource>`. The `TypeId` dispatch is one hash
/// lookup to find the right inner map — cold path only.
///
/// Any `Hash + Eq + Send + 'static` type works as a key — no trait
/// to implement, no macro to invoke. Newtypes, enums, and tuples
/// all work out of the box.
///
/// # Example
///
/// ```ignore
/// // Setup
/// let src = notify.register_source();
/// registry.insert(InstrumentId("BTC"), src);
///
/// // Runtime lookup (cold path — from event handler)
/// let src = registry.get(&InstrumentId("BTC")).unwrap();
/// notify.register(|t| ctx, step, reg).subscribe(src);
///
/// // Hot path — DataSource pre-resolved, no registry involvement
/// notify.mark(src);
/// ```
#[derive(Default)]
pub struct SourceRegistry {
    /// `TypeId` of a key type `K` → boxed `FxHashMap<K, DataSource>`.
    /// Each box holds the map for the `TypeId` it is stored under; the
    /// accessors rely on this when downcasting.
    maps: FxHashMap<TypeId, Box<dyn Any + Send>>,
}
783
784impl SourceRegistry {
785    /// Create an empty registry.
786    pub fn new() -> Self {
787        Self::default()
788    }
789
790    /// Map a typed key to a [`DataSource`].
791    ///
792    /// Overwrites any previous mapping for this key.
793    pub fn insert<K: Hash + Eq + Send + 'static>(&mut self, key: K, source: DataSource) {
794        self.get_or_create_map::<K>().insert(key, source);
795    }
796
797    /// Look up a [`DataSource`] by typed key.
798    ///
799    /// Returns `None` if the key is not registered.
800    pub fn get<K: Hash + Eq + Send + 'static>(&self, key: &K) -> Option<DataSource> {
801        self.get_map::<K>().and_then(|map| map.get(key)).copied()
802    }
803
804    /// Remove a key mapping. Returns the [`DataSource`] so the caller
805    /// can also call [`ReactorNotify::remove_source`] to free the slot.
806    pub fn remove<K: Hash + Eq + Send + 'static>(&mut self, key: &K) -> Option<DataSource> {
807        self.get_map_mut::<K>().and_then(|map| map.remove(key))
808    }
809
810    /// Returns `true` if the key is mapped.
811    pub fn contains<K: Hash + Eq + Send + 'static>(&self, key: &K) -> bool {
812        self.get_map::<K>().is_some_and(|map| map.contains_key(key))
813    }
814
815    fn get_map<K: Hash + Eq + Send + 'static>(&self) -> Option<&FxHashMap<K, DataSource>> {
816        self.maps.get(&TypeId::of::<K>()).map(|boxed| {
817            // Invariant: map was inserted with type K, so downcast always succeeds.
818            boxed
819                .downcast_ref::<FxHashMap<K, DataSource>>()
820                .expect("invariant: TypeId matches stored map type")
821        })
822    }
823
824    fn get_map_mut<K: Hash + Eq + Send + 'static>(
825        &mut self,
826    ) -> Option<&mut FxHashMap<K, DataSource>> {
827        self.maps.get_mut(&TypeId::of::<K>()).map(|boxed| {
828            // Invariant: map was inserted with type K, so downcast always succeeds.
829            boxed
830                .downcast_mut::<FxHashMap<K, DataSource>>()
831                .expect("invariant: TypeId matches stored map type")
832        })
833    }
834
835    fn get_or_create_map<K: Hash + Eq + Send + 'static>(
836        &mut self,
837    ) -> &mut FxHashMap<K, DataSource> {
838        self.maps
839            .entry(TypeId::of::<K>())
840            .or_insert_with(|| Box::<FxHashMap<K, DataSource>>::default())
841            // Invariant: entry was just created or retrieved with type K.
842            .downcast_mut::<FxHashMap<K, DataSource>>()
843            .unwrap()
844    }
845}
846
// Marker impl with an empty body: opts `SourceRegistry` into the `Resource`
// trait so it can be used as a World resource (see the module-level docs).
impl Resource for SourceRegistry {}
848
849// =============================================================================
850// ReactorSystem — thin dispatch handle
851// =============================================================================
852
853/// Lightweight dispatch handle for the reactor system.
854///
855/// Sits at the driver level (same as mio `Poll` or timer poller).
856/// Reads [`ReactorNotify`] via pre-resolved [`ResourceId`] during
857/// dispatch. All reactor storage and registration lives in
858/// [`ReactorNotify`] (World resource).
859pub struct ReactorSystem {
860    /// Pre-allocated events buffer for polling.
861    events: Events,
862
863    /// Pre-resolved resource IDs for reaching into World.
864    notify_id: ResourceId,
865    removals_id: ResourceId,
866}
867
868impl ReactorSystem {
869    /// Create a dispatch handle from a built [`World`].
870    ///
871    /// The World must contain [`ReactorNotify`] and [`DeferredRemovals`].
872    pub fn new(world: &World) -> Self {
873        Self {
874            events: Events::with_capacity(256),
875            notify_id: world.id::<ReactorNotify>(),
876            removals_id: world.id::<DeferredRemovals>(),
877        }
878    }
879
880    /// Dispatch all woken reactors and process deferred removals.
881    ///
882    /// 1. Polls [`ReactorNotify`] for woken reactor tokens (deduped)
883    /// 2. Runs each reactor's step function with pre-resolved Params
884    /// 3. Drains [`DeferredRemovals`] and removes reactors
885    ///
886    /// Returns `true` if any reactor ran (for scheduler propagation).
887    pub fn dispatch(&mut self, world: &mut World) -> bool {
888        // SAFETY: notify_id was resolved from the same WorldBuilder.
889        // ReactorNotify is heap-allocated — pointer stable for World's lifetime.
890        let notify_ptr: *mut ReactorNotify =
891            unsafe { world.get_mut::<ReactorNotify>(self.notify_id) };
892
893        // Poll — scoped &mut, dropped before reactor dispatch.
894        {
895            let notify = unsafe { &mut *notify_ptr };
896            notify.poll(&mut self.events);
897        }
898        let ran = !self.events.is_empty();
899
900        // Dispatch — each reactor is moved out before run(), put back after.
901        // &mut ReactorNotify is scoped tightly to avoid aliasing during run().
902        for token in self.events.iter() {
903            let idx = token.index();
904            // SAFETY: notify_ptr is valid for World's lifetime. Scoped &mut
905            // is dropped before reactor.run() to avoid aliasing.
906            let reactor = {
907                let notify = unsafe { &mut *notify_ptr };
908                notify.take_reactor(idx)
909            };
910            if let Some(mut reactor) = reactor {
911                reactor.run(world);
912                // SAFETY: re-derive &mut after run() completes. No aliasing —
913                // reactor was moved out of the slab during run().
914                let notify = unsafe { &mut *notify_ptr };
915                notify.put_reactor(idx, reactor);
916            }
917        }
918
919        // Deferred removals — swap Vec out to avoid holding two &mut.
920        // Zero allocation: Vec is swapped back and reused next frame.
921        // SAFETY: removals_id from same WorldBuilder. Dispatch complete.
922        let removals = unsafe { world.get_mut::<DeferredRemovals>(self.removals_id) };
923        let mut pending = removals.take();
924        if !pending.is_empty() {
925            // SAFETY: re-derive &mut for cleanup phase. No other references.
926            let notify = unsafe { &mut *notify_ptr };
927            while let Some(token) = pending.pop() {
928                notify.remove_reactor(token);
929            }
930        }
931        // Put the (now empty) Vec back for reuse.
932        let removals = unsafe { world.get_mut::<DeferredRemovals>(self.removals_id) };
933        removals.put(pending);
934
935        ran
936    }
937
938    /// Number of live reactors.
939    pub fn reactor_count(&self, world: &World) -> usize {
940        world.resource::<ReactorNotify>().reactor_count()
941    }
942}
943
944// =============================================================================
945// Tests
946// =============================================================================
947
948#[cfg(test)]
949mod tests {
950    use super::*;
951    use crate::{Res, ResMut, WorldBuilder};
952
953    // -- Reactor trait dispatch --------------------------------------------------
954
955    #[test]
956    fn reactor_fn_arity0() {
957        let wb = WorldBuilder::new();
958        let mut world = wb.build();
959        let reg = world.registry();
960
961        struct Ctx {
962            count: u32,
963        }
964
965        fn step(ctx: &mut Ctx) {
966            ctx.count += 1;
967        }
968
969        let mut reactor = step.into_reactor(Ctx { count: 0 }, reg);
970        reactor.run(&mut world);
971        assert_eq!(reactor.ctx.count, 1);
972    }
973
974    #[test]
975    fn reactor_fn_with_params() {
976        let mut wb = WorldBuilder::new();
977        wb.register::<u64>(10);
978        wb.register::<u32>(0);
979        let mut world = wb.build();
980        let reg = world.registry();
981
982        struct Ctx {
983            multiplier: u64,
984        }
985
986        fn step(ctx: &mut Ctx, val: Res<u64>, mut out: ResMut<u32>) {
987            *out = (*val * ctx.multiplier) as u32;
988        }
989
990        let mut reactor = step.into_reactor(Ctx { multiplier: 5 }, reg);
991        reactor.run(&mut world);
992        assert_eq!(*world.resource::<u32>(), 50);
993    }
994
995    // -- ReactorNotify ----------------------------------------------------------
996
997    fn dummy_reactor() -> ReactorFn<(), fn(&mut ()), ()> {
998        ReactorFn {
999            ctx: (),
1000            f: (|_: &mut ()| {}) as fn(&mut ()),
1001            state: (),
1002            name: "dummy",
1003        }
1004    }
1005
1006    #[test]
1007    fn reactor_notify_mark_fans_out() {
1008        let mut notify = ReactorNotify::new(4, 8);
1009        let mut events = Events::with_capacity(8);
1010
1011        let src = notify.register_source();
1012        let a1 = notify.register_built(dummy_reactor()).token();
1013        let a2 = notify.register_built(dummy_reactor()).token();
1014        let _a3 = notify.register_built(dummy_reactor()).token();
1015
1016        notify.subscribe(a1, src);
1017        notify.subscribe(a2, src);
1018        // _a3 not subscribed
1019
1020        notify.mark(src);
1021        notify.notify.poll(&mut events);
1022
1023        assert_eq!(events.len(), 2);
1024        assert!(events.as_slice().contains(&a1));
1025        assert!(events.as_slice().contains(&a2));
1026    }
1027
1028    #[test]
1029    fn reactor_notify_dedup_across_sources() {
1030        let mut notify = ReactorNotify::new(4, 8);
1031        let mut events = Events::with_capacity(8);
1032
1033        let src1 = notify.register_source();
1034        let src2 = notify.register_source();
1035        let reactor = notify.register_built(dummy_reactor()).token();
1036
1037        notify.subscribe(reactor, src1);
1038        notify.subscribe(reactor, src2);
1039
1040        notify.mark(src1);
1041        notify.mark(src2);
1042
1043        notify.notify.poll(&mut events);
1044        assert_eq!(events.len(), 1);
1045        assert_eq!(events.as_slice()[0], reactor);
1046    }
1047
1048    #[test]
1049    fn reactor_notify_remove_reactor() {
1050        let mut notify = ReactorNotify::new(4, 8);
1051        let mut events = Events::with_capacity(8);
1052
1053        let src = notify.register_source();
1054
1055        struct Ctx;
1056        let token = notify
1057            .register_built(ReactorFn {
1058                ctx: Ctx,
1059                f: (|_: &mut Ctx| {}) as fn(&mut Ctx),
1060                state: (),
1061                name: "test",
1062            })
1063            .token();
1064        notify.subscribe(token, src);
1065
1066        notify.remove_reactor(token);
1067        notify.mark(src);
1068        notify.notify.poll(&mut events);
1069        assert!(events.is_empty());
1070    }
1071
1072    // -- Full ReactorSystem integration -----------------------------------------
1073
1074    // Helper: registry() borrows &World, resource_mut() borrows &mut World.
1075    // In tests, we use unsafe get_mut via the notify_id to avoid the conflict,
1076    // same pattern as production dispatch code.
1077    fn notify_mut(world: &World, id: ResourceId) -> &mut ReactorNotify {
1078        unsafe { world.get_mut::<ReactorNotify>(id) }
1079    }
1080
1081    #[test]
1082    fn reactor_system_dispatch() {
1083        let mut wb = WorldBuilder::new();
1084        wb.register::<u64>(0);
1085        wb.register(ReactorNotify::new(4, 8));
1086        wb.register(DeferredRemovals::default());
1087        let mut world = wb.build();
1088        let reg = world.registry();
1089        let nid = world.id::<ReactorNotify>();
1090
1091        let mut system = ReactorSystem::new(&world);
1092
1093        struct Ctx {
1094            _reactor_id: Token,
1095            increment: u64,
1096        }
1097
1098        fn step(ctx: &mut Ctx, mut val: ResMut<u64>) {
1099            *val += ctx.increment;
1100        }
1101
1102        let notify = notify_mut(&world, nid);
1103        let src = notify.register_source();
1104        notify
1105            .register(
1106                |t| Ctx {
1107                    _reactor_id: t,
1108                    increment: 10,
1109                },
1110                step,
1111                reg,
1112            )
1113            .subscribe(src);
1114        notify
1115            .register(
1116                |t| Ctx {
1117                    _reactor_id: t,
1118                    increment: 5,
1119                },
1120                step,
1121                reg,
1122            )
1123            .subscribe(src);
1124
1125        // Mark and dispatch
1126        notify_mut(&world, nid).mark(src);
1127        let ran = system.dispatch(&mut world);
1128
1129        assert!(ran);
1130        assert_eq!(*world.resource::<u64>(), 15); // 10 + 5
1131    }
1132
1133    #[test]
1134    fn reactor_system_deferred_removal() {
1135        let mut wb = WorldBuilder::new();
1136        wb.register::<u64>(0);
1137        wb.register(ReactorNotify::new(4, 8));
1138        wb.register(DeferredRemovals::default());
1139        let mut world = wb.build();
1140        let reg = world.registry();
1141        let nid = world.id::<ReactorNotify>();
1142
1143        let mut system = ReactorSystem::new(&world);
1144
1145        struct Ctx {
1146            reactor_id: Token,
1147            runs: u64,
1148        }
1149
1150        fn step(ctx: &mut Ctx, mut val: ResMut<u64>, mut removals: ResMut<DeferredRemovals>) {
1151            *val += 1;
1152            ctx.runs += 1;
1153            if ctx.runs >= 2 {
1154                removals.deregister(ctx.reactor_id);
1155            }
1156        }
1157
1158        let notify = notify_mut(&world, nid);
1159        let src = notify.register_source();
1160        notify
1161            .register(
1162                |t| Ctx {
1163                    reactor_id: t,
1164                    runs: 0,
1165                },
1166                step,
1167                reg,
1168            )
1169            .subscribe(src);
1170
1171        assert_eq!(system.reactor_count(&world), 1);
1172
1173        // Frame 1 — reactor runs, runs=1, no removal
1174        notify_mut(&world, nid).mark(src);
1175        system.dispatch(&mut world);
1176        assert_eq!(*world.resource::<u64>(), 1);
1177        assert_eq!(system.reactor_count(&world), 1);
1178
1179        // Frame 2 — reactor runs, runs=2, deregisters
1180        notify_mut(&world, nid).mark(src);
1181        system.dispatch(&mut world);
1182        assert_eq!(*world.resource::<u64>(), 2);
1183        assert_eq!(system.reactor_count(&world), 0);
1184
1185        // Frame 3 — no reactors, nothing runs
1186        notify_mut(&world, nid).mark(src);
1187        let ran = system.dispatch(&mut world);
1188        assert!(!ran);
1189        assert_eq!(*world.resource::<u64>(), 2);
1190    }
1191
1192    #[test]
1193    fn reactor_system_only_subscribed_wake() {
1194        let mut wb = WorldBuilder::new();
1195        wb.register::<u64>(0);
1196        wb.register(ReactorNotify::new(4, 8));
1197        wb.register(DeferredRemovals::default());
1198        let mut world = wb.build();
1199        let reg = world.registry();
1200        let nid = world.id::<ReactorNotify>();
1201
1202        let mut system = ReactorSystem::new(&world);
1203
1204        struct Ctx {
1205            _reactor_id: Token,
1206            value: u64,
1207        }
1208
1209        fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
1210            *out += ctx.value;
1211        }
1212
1213        let notify = notify_mut(&world, nid);
1214        let btc = notify.register_source();
1215        let eth = notify.register_source();
1216
1217        notify
1218            .register(
1219                |t| Ctx {
1220                    _reactor_id: t,
1221                    value: 10,
1222                },
1223                step,
1224                reg,
1225            )
1226            .subscribe(btc);
1227        notify
1228            .register(
1229                |t| Ctx {
1230                    _reactor_id: t,
1231                    value: 100,
1232                },
1233                step,
1234                reg,
1235            )
1236            .subscribe(eth);
1237
1238        // Only BTC fires — only reactor 1 runs
1239        notify_mut(&world, nid).mark(btc);
1240        system.dispatch(&mut world);
1241        assert_eq!(*world.resource::<u64>(), 10);
1242
1243        // ETH fires — reactor 2 runs
1244        notify_mut(&world, nid).mark(eth);
1245        system.dispatch(&mut world);
1246        assert_eq!(*world.resource::<u64>(), 110);
1247    }
1248
1249    #[test]
1250    fn runtime_registration() {
1251        let mut wb = WorldBuilder::new();
1252        wb.register::<u64>(0);
1253        wb.register(ReactorNotify::new(4, 8));
1254        wb.register(DeferredRemovals::default());
1255        let mut world = wb.build();
1256        let nid = world.id::<ReactorNotify>();
1257
1258        let mut system = ReactorSystem::new(&world);
1259
1260        struct Ctx {
1261            _reactor_id: Token,
1262            value: u64,
1263        }
1264
1265        fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
1266            *out += ctx.value;
1267        }
1268
1269        // Register source and first reactor at setup
1270        let src = {
1271            let reg = world.registry();
1272            let notify = notify_mut(&world, nid);
1273            let src = notify.register_source();
1274            notify
1275                .register(
1276                    |t| Ctx {
1277                        _reactor_id: t,
1278                        value: 10,
1279                    },
1280                    step,
1281                    reg,
1282                )
1283                .subscribe(src);
1284            src
1285        };
1286
1287        // Frame 1 — one reactor
1288        notify_mut(&world, nid).mark(src);
1289        system.dispatch(&mut world);
1290        assert_eq!(*world.resource::<u64>(), 10);
1291
1292        // Runtime registration (simulates admin command handler)
1293        {
1294            let reg = world.registry();
1295            notify_mut(&world, nid)
1296                .register(
1297                    |t| Ctx {
1298                        _reactor_id: t,
1299                        value: 100,
1300                    },
1301                    step,
1302                    reg,
1303                )
1304                .subscribe(src);
1305        }
1306
1307        // Frame 2 — both reactors fire
1308        notify_mut(&world, nid).mark(src);
1309        system.dispatch(&mut world);
1310        assert_eq!(*world.resource::<u64>(), 120); // 10 + 10 + 100
1311    }
1312
1313    #[test]
1314    fn register_after_remove_reuses_key() {
1315        let mut wb = WorldBuilder::new();
1316        wb.register::<u64>(0);
1317        wb.register(ReactorNotify::new(4, 8));
1318        wb.register(DeferredRemovals::default());
1319        let mut world = wb.build();
1320        let nid = world.id::<ReactorNotify>();
1321
1322        let mut system = ReactorSystem::new(&world);
1323
1324        struct Ctx {
1325            reactor_id: Token,
1326            value: u64,
1327        }
1328
1329        fn step(ctx: &mut Ctx, mut out: ResMut<u64>, mut removals: ResMut<DeferredRemovals>) {
1330            *out += ctx.value;
1331            if ctx.value == 10 {
1332                removals.deregister(ctx.reactor_id);
1333            }
1334        }
1335
1336        let src = {
1337            let reg = world.registry();
1338            let notify = notify_mut(&world, nid);
1339            let src = notify.register_source();
1340            notify
1341                .register(
1342                    |t| Ctx {
1343                        reactor_id: t,
1344                        value: 10,
1345                    },
1346                    step,
1347                    reg,
1348                )
1349                .subscribe(src);
1350            src
1351        };
1352
1353        // Frame 1 — reactor runs and deregisters itself
1354        notify_mut(&world, nid).mark(src);
1355        system.dispatch(&mut world);
1356        assert_eq!(*world.resource::<u64>(), 10);
1357        assert_eq!(system.reactor_count(&world), 0);
1358
1359        // Register a NEW reactor — should reuse key 0
1360        {
1361            let reg = world.registry();
1362            let notify = notify_mut(&world, nid);
1363            let token = notify
1364                .register(
1365                    |t| Ctx {
1366                        reactor_id: t,
1367                        value: 100,
1368                    },
1369                    step,
1370                    reg,
1371                )
1372                .token();
1373            notify.subscribe(token, src);
1374            assert_eq!(token.index(), 0); // slab reused key 0
1375        }
1376
1377        // Frame 2 — new reactor runs correctly
1378        notify_mut(&world, nid).mark(src);
1379        system.dispatch(&mut world);
1380        assert_eq!(*world.resource::<u64>(), 110); // 10 + 100
1381        assert_eq!(system.reactor_count(&world), 1); // still alive (value != 10)
1382    }
1383
1384    #[test]
1385    fn reactor_can_access_actor_notify() {
1386        // Verify no aliasing UB: the move-out-move-back pattern
1387        // allows reactors to safely access ReactorNotify via ResMut.
1388        let mut wb = WorldBuilder::new();
1389        wb.register::<u64>(0);
1390        wb.register(ReactorNotify::new(4, 8));
1391        wb.register(DeferredRemovals::default());
1392        let mut world = wb.build();
1393        let nid = world.id::<ReactorNotify>();
1394
1395        let mut system = ReactorSystem::new(&world);
1396
1397        struct Ctx {
1398            _reactor_id: Token,
1399        }
1400
1401        fn step(_ctx: &mut Ctx, notify: ResMut<ReactorNotify>, mut out: ResMut<u64>) {
1402            // Reactor reads ReactorNotify — this would be UB without move-out
1403            *out = notify.reactor_count() as u64;
1404        }
1405
1406        let src = {
1407            let reg = world.registry();
1408            let notify = notify_mut(&world, nid);
1409            let src = notify.register_source();
1410            notify
1411                .register(|t| Ctx { _reactor_id: t }, step, reg)
1412                .subscribe(src);
1413            src
1414        };
1415
1416        notify_mut(&world, nid).mark(src);
1417        system.dispatch(&mut world);
1418        // Reactor count is 1, but during run() the reactor was taken out,
1419        // so reactor_count() sees 0 reactors with Some values... actually
1420        // slab.len() still counts the slot as occupied. The Option is
1421        // None but the slab entry exists. Let's just verify no panic.
1422        // The important thing is no aliasing UB.
1423    }
1424
1425    // -- Realistic data source patterns ---------------------------------------
1426
1427    #[test]
1428    fn multi_instrument_with_shared_source() {
1429        // Pattern: per-instrument market data sources + a shared "positions"
1430        // source. Quoting reactors subscribe to their instrument + positions.
1431        let mut wb = WorldBuilder::new();
1432        wb.register::<u64>(0);
1433        wb.register(ReactorNotify::new(8, 16));
1434        wb.register(DeferredRemovals::default());
1435        let mut world = wb.build();
1436        let nid = world.id::<ReactorNotify>();
1437        let mut system = ReactorSystem::new(&world);
1438
1439        struct Ctx {
1440            _reactor_id: Token,
1441            instrument: &'static str,
1442        }
1443
1444        fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
1445            // Each reactor increments by its instrument's "value"
1446            *out += match ctx.instrument {
1447                "BTC" => 100,
1448                "ETH" => 10,
1449                "SOL" => 1,
1450                _ => 0,
1451            };
1452        }
1453
1454        let (btc_md, eth_md, sol_md, positions) = {
1455            let reg = world.registry();
1456            let notify = notify_mut(&world, nid);
1457
1458            // Per-instrument data sources
1459            let btc_md = notify.register_source();
1460            let eth_md = notify.register_source();
1461            let sol_md = notify.register_source();
1462            // Shared source
1463            let positions = notify.register_source();
1464
1465            // BTC reactor: subscribes to btc_md + positions
1466            notify
1467                .register(
1468                    |t| Ctx {
1469                        _reactor_id: t,
1470                        instrument: "BTC",
1471                    },
1472                    step,
1473                    reg,
1474                )
1475                .subscribe(btc_md)
1476                .subscribe(positions);
1477
1478            // ETH reactor: subscribes to eth_md + positions
1479            notify
1480                .register(
1481                    |t| Ctx {
1482                        _reactor_id: t,
1483                        instrument: "ETH",
1484                    },
1485                    step,
1486                    reg,
1487                )
1488                .subscribe(eth_md)
1489                .subscribe(positions);
1490
1491            // SOL reactor: subscribes to sol_md + positions
1492            notify
1493                .register(
1494                    |t| Ctx {
1495                        _reactor_id: t,
1496                        instrument: "SOL",
1497                    },
1498                    step,
1499                    reg,
1500                )
1501                .subscribe(sol_md)
1502                .subscribe(positions);
1503
1504            (btc_md, eth_md, sol_md, positions)
1505        };
1506
1507        // Only BTC data changes — only BTC reactor wakes
1508        notify_mut(&world, nid).mark(btc_md);
1509        system.dispatch(&mut world);
1510        assert_eq!(*world.resource::<u64>(), 100);
1511
1512        // Position update — ALL reactors wake (shared source), deduped
1513        notify_mut(&world, nid).mark(positions);
1514        system.dispatch(&mut world);
1515        assert_eq!(*world.resource::<u64>(), 211); // 100 + 100 + 10 + 1
1516
1517        // BTC + ETH data change in same frame — both reactors wake once
1518        notify_mut(&world, nid).mark(btc_md);
1519        notify_mut(&world, nid).mark(eth_md);
1520        system.dispatch(&mut world);
1521        assert_eq!(*world.resource::<u64>(), 321); // 211 + 100 + 10
1522
1523        // BTC data + position in same frame — BTC reactor wakes ONCE (dedup)
1524        notify_mut(&world, nid).mark(btc_md);
1525        notify_mut(&world, nid).mark(positions);
1526        system.dispatch(&mut world);
1527        // BTC: 100 (deduped, only once), ETH: 10, SOL: 1
1528        assert_eq!(*world.resource::<u64>(), 432); // 321 + 100 + 10 + 1
1529
1530        // Nothing fires — no reactors wake
1531        let ran = system.dispatch(&mut world);
1532        assert!(!ran);
1533        assert_eq!(*world.resource::<u64>(), 432);
1534
1535        // SOL data only — only SOL reactor
1536        notify_mut(&world, nid).mark(sol_md);
1537        system.dispatch(&mut world);
1538        assert_eq!(*world.resource::<u64>(), 433);
1539    }
1540
1541    #[test]
1542    fn per_reactor_fill_routing() {
1543        // Pattern: each reactor gets its own DataSource for fill routing.
1544        // Wire protocol embeds the token index in the order client_id.
1545        // When a fill arrives, the handler marks the reactor's source directly.
1546        use std::collections::HashMap;
1547
1548        let mut wb = WorldBuilder::new();
1549        wb.register::<u64>(0);
1550        wb.register(ReactorNotify::new(8, 16));
1551        wb.register(DeferredRemovals::default());
1552        let mut world = wb.build();
1553        let nid = world.id::<ReactorNotify>();
1554        let mut system = ReactorSystem::new(&world);
1555
1556        struct Ctx {
1557            reactor_id: Token,
1558        }
1559
1560        fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
1561            *out += ctx.reactor_id.index() as u64 + 1;
1562        }
1563
1564        // Routing table: token index → reactor's fill source
1565        let mut fill_sources: HashMap<usize, DataSource> = HashMap::new();
1566
1567        {
1568            let reg = world.registry();
1569            let notify = notify_mut(&world, nid);
1570
1571            for _ in 0..3 {
1572                // Each reactor gets its own fill source
1573                let fill_src = notify.register_source();
1574                let token = notify
1575                    .register(|t| Ctx { reactor_id: t }, step, reg)
1576                    .subscribe(fill_src)
1577                    .token();
1578
1579                fill_sources.insert(token.index(), fill_src);
1580            }
1581        }
1582
1583        // Simulate fill arriving for reactor 1 — look up its source by wire ID
1584        let wire_client_id: usize = 1;
1585        let fill_source = fill_sources[&wire_client_id];
1586        notify_mut(&world, nid).mark(fill_source);
1587        system.dispatch(&mut world);
1588        // Only reactor 1 ran: token.index()=1, so += 2
1589        assert_eq!(*world.resource::<u64>(), 2);
1590
1591        // Fill for reactor 0
1592        let fill_source = fill_sources[&0];
1593        notify_mut(&world, nid).mark(fill_source);
1594        system.dispatch(&mut world);
1595        // Reactor 0: token.index()=0, += 1
1596        assert_eq!(*world.resource::<u64>(), 3);
1597    }
1598
1599    #[test]
1600    fn dynamic_source_registration() {
1601        // Pattern: data sources added at runtime when new instruments
1602        // come online. Reactors registered and subscribed dynamically.
1603        let mut wb = WorldBuilder::new();
1604        wb.register::<u64>(0);
1605        wb.register(ReactorNotify::new(4, 8));
1606        wb.register(DeferredRemovals::default());
1607        let mut world = wb.build();
1608        let nid = world.id::<ReactorNotify>();
1609        let mut system = ReactorSystem::new(&world);
1610
1611        struct Ctx {
1612            _reactor_id: Token,
1613            value: u64,
1614        }
1615
1616        fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
1617            *out += ctx.value;
1618        }
1619
1620        // Start with just BTC
1621        let btc_md = {
1622            let reg = world.registry();
1623            let notify = notify_mut(&world, nid);
1624            let btc_md = notify.register_source();
1625            notify
1626                .register(
1627                    |t| Ctx {
1628                        _reactor_id: t,
1629                        value: 10,
1630                    },
1631                    step,
1632                    reg,
1633                )
1634                .subscribe(btc_md);
1635            btc_md
1636        };
1637
1638        notify_mut(&world, nid).mark(btc_md);
1639        system.dispatch(&mut world);
1640        assert_eq!(*world.resource::<u64>(), 10);
1641
1642        // Runtime: new instrument comes online — register source + reactor
1643        let eth_md = {
1644            let reg = world.registry();
1645            let notify = notify_mut(&world, nid);
1646            let eth_md = notify.register_source();
1647            notify
1648                .register(
1649                    |t| Ctx {
1650                        _reactor_id: t,
1651                        value: 100,
1652                    },
1653                    step,
1654                    reg,
1655                )
1656                .subscribe(eth_md);
1657            eth_md
1658        };
1659
1660        // Both instruments fire
1661        notify_mut(&world, nid).mark(btc_md);
1662        notify_mut(&world, nid).mark(eth_md);
1663        system.dispatch(&mut world);
1664        assert_eq!(*world.resource::<u64>(), 120); // 10 + 10 + 100
1665    }
1666
1667    // -- Source removal + slab reuse ------------------------------------------
1668
1669    #[test]
1670    fn remove_source_and_reuse_slot() {
1671        let mut wb = WorldBuilder::new();
1672        wb.register::<u64>(0);
1673        wb.register(ReactorNotify::new(4, 8));
1674        wb.register(DeferredRemovals::default());
1675        let mut world = wb.build();
1676        let nid = world.id::<ReactorNotify>();
1677        let mut system = ReactorSystem::new(&world);
1678
1679        struct Ctx {
1680            _reactor_id: Token,
1681            value: u64,
1682        }
1683
1684        fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
1685            *out += ctx.value;
1686        }
1687
1688        // Register two sources
1689        let (src_a, src_b) = {
1690            let reg = world.registry();
1691            let notify = notify_mut(&world, nid);
1692            let src_a = notify.register_source();
1693            let src_b = notify.register_source();
1694            notify
1695                .register(
1696                    |t| Ctx {
1697                        _reactor_id: t,
1698                        value: 10,
1699                    },
1700                    step,
1701                    reg,
1702                )
1703                .subscribe(src_a);
1704            notify
1705                .register(
1706                    |t| Ctx {
1707                        _reactor_id: t,
1708                        value: 100,
1709                    },
1710                    step,
1711                    reg,
1712                )
1713                .subscribe(src_b);
1714            (src_a, src_b)
1715        };
1716
1717        // Remove source A
1718        notify_mut(&world, nid).remove_source(src_a);
1719
1720        // Marking removed source is a no-op
1721        notify_mut(&world, nid).mark(src_a);
1722        let ran = system.dispatch(&mut world);
1723        assert!(!ran);
1724
1725        // Source B still works
1726        notify_mut(&world, nid).mark(src_b);
1727        system.dispatch(&mut world);
1728        assert_eq!(*world.resource::<u64>(), 100);
1729
1730        // Register a new source — should reuse slot 0
1731        let src_c = notify_mut(&world, nid).register_source();
1732        assert_eq!(src_c.0, src_a.0); // slab reused the slot
1733
1734        // Subscribe reactor to new source and verify it works
1735        let reg = world.registry();
1736        let notify = notify_mut(&world, nid);
1737        notify
1738            .register(
1739                |t| Ctx {
1740                    _reactor_id: t,
1741                    value: 1,
1742                },
1743                step,
1744                reg,
1745            )
1746            .subscribe(src_c);
1747
1748        notify_mut(&world, nid).mark(src_c);
1749        system.dispatch(&mut world);
1750        assert_eq!(*world.resource::<u64>(), 101); // 100 + 1
1751    }
1752
1753    // -- SourceRegistry -------------------------------------------------------
1754
1755    #[test]
1756    fn source_registry_basic() {
1757        let mut registry = SourceRegistry::new();
1758
1759        #[derive(Hash, Eq, PartialEq, Debug)]
1760        struct InstrumentId(u32);
1761
1762        let src_a = DataSource(0);
1763        let src_b = DataSource(1);
1764
1765        registry.insert(InstrumentId(1), src_a);
1766        registry.insert(InstrumentId(2), src_b);
1767
1768        assert_eq!(registry.get(&InstrumentId(1)), Some(src_a));
1769        assert_eq!(registry.get(&InstrumentId(2)), Some(src_b));
1770        assert_eq!(registry.get(&InstrumentId(3)), None);
1771        assert!(registry.contains(&InstrumentId(1)));
1772        assert!(!registry.contains(&InstrumentId(3)));
1773    }
1774
1775    #[test]
1776    fn source_registry_multiple_key_types() {
1777        let mut registry = SourceRegistry::new();
1778
1779        #[derive(Hash, Eq, PartialEq)]
1780        struct InstrumentId(u32);
1781
1782        #[derive(Hash, Eq, PartialEq)]
1783        struct StrategyId(u32);
1784
1785        let src_a = DataSource(0);
1786        let src_b = DataSource(1);
1787
1788        // Same registry, different key types
1789        registry.insert(InstrumentId(1), src_a);
1790        registry.insert(StrategyId(1), src_b);
1791
1792        // Different type namespaces — same inner value (1), different results
1793        assert_eq!(registry.get(&InstrumentId(1)), Some(src_a));
1794        assert_eq!(registry.get(&StrategyId(1)), Some(src_b));
1795    }
1796
1797    #[test]
1798    fn source_registry_tuple_keys() {
1799        let mut registry = SourceRegistry::new();
1800
1801        let src = DataSource(42);
1802        registry.insert(("BTC", "Binance"), src);
1803
1804        assert_eq!(registry.get(&("BTC", "Binance")), Some(src));
1805        assert_eq!(registry.get(&("ETH", "Binance")), None);
1806    }
1807
1808    #[test]
1809    fn source_registry_remove() {
1810        let mut registry = SourceRegistry::new();
1811
1812        let src = DataSource(0);
1813        registry.insert(42u64, src);
1814
1815        assert_eq!(registry.remove(&42u64), Some(src));
1816        assert_eq!(registry.get(&42u64), None);
1817        assert_eq!(registry.remove(&42u64), None); // already gone
1818    }
1819
1820    #[test]
1821    fn source_registry_integrated_with_reactor_system() {
1822        let mut wb = WorldBuilder::new();
1823        wb.register::<u64>(0);
1824        wb.register(ReactorNotify::new(4, 8));
1825        wb.register(DeferredRemovals::default());
1826        wb.register(SourceRegistry::new());
1827        let mut world = wb.build();
1828        let nid = world.id::<ReactorNotify>();
1829        let mut system = ReactorSystem::new(&world);
1830
1831        #[derive(Hash, Eq, PartialEq, Clone, Copy)]
1832        struct Instrument(u32);
1833        const BTC: Instrument = Instrument(0);
1834        const ETH: Instrument = Instrument(1);
1835
1836        struct Ctx {
1837            _reactor_id: Token,
1838            value: u64,
1839        }
1840
1841        fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
1842            *out += ctx.value;
1843        }
1844
1845        // Setup: register sources and map to natural keys
1846        let btc_src = notify_mut(&world, nid).register_source();
1847        let eth_src = notify_mut(&world, nid).register_source();
1848
1849        world.resource_mut::<SourceRegistry>().insert(BTC, btc_src);
1850        world.resource_mut::<SourceRegistry>().insert(ETH, eth_src);
1851
1852        // Register reactors using natural key lookup
1853        {
1854            let reg = world.registry();
1855            let btc = world.resource::<SourceRegistry>().get(&BTC).unwrap();
1856            let notify = notify_mut(&world, nid);
1857            notify
1858                .register(
1859                    |t| Ctx {
1860                        _reactor_id: t,
1861                        value: 10,
1862                    },
1863                    step,
1864                    reg,
1865                )
1866                .subscribe(btc);
1867        }
1868
1869        // Mark via pre-resolved DataSource
1870        notify_mut(&world, nid).mark(btc_src);
1871        system.dispatch(&mut world);
1872        assert_eq!(*world.resource::<u64>(), 10);
1873
1874        // Delist BTC — remove from both registry and notify
1875        let removed = world.resource_mut::<SourceRegistry>().remove(&BTC);
1876        assert!(removed.is_some());
1877        notify_mut(&world, nid).remove_source(removed.unwrap());
1878
1879        // Marking BTC is now a no-op
1880        notify_mut(&world, nid).mark(btc_src);
1881        let ran = system.dispatch(&mut world);
1882        assert!(!ran);
1883    }
1884
1885    // -- SourceRegistry edge cases --------------------------------------------
1886
1887    #[test]
1888    fn source_registry_overwrite_key() {
1889        let mut registry = SourceRegistry::new();
1890        let src_a = DataSource(0);
1891        let src_b = DataSource(1);
1892
1893        registry.insert(42u32, src_a);
1894        assert_eq!(registry.get(&42u32), Some(src_a));
1895
1896        // Overwrite same key with different source
1897        registry.insert(42u32, src_b);
1898        assert_eq!(registry.get(&42u32), Some(src_b));
1899    }
1900
1901    #[test]
1902    fn source_registry_empty_get() {
1903        let registry = SourceRegistry::new();
1904        // No key type has ever been registered
1905        assert_eq!(registry.get(&42u32), None);
1906        assert!(!registry.contains(&42u32));
1907    }
1908
1909    #[test]
1910    fn source_registry_enum_keys() {
1911        #[derive(Hash, Eq, PartialEq)]
1912        enum Venue {
1913            Binance,
1914            Coinbase,
1915        }
1916
1917        let mut registry = SourceRegistry::new();
1918        let src = DataSource(0);
1919        registry.insert(Venue::Binance, src);
1920
1921        assert_eq!(registry.get(&Venue::Binance), Some(src));
1922        assert_eq!(registry.get(&Venue::Coinbase), None);
1923    }
1924
1925    #[test]
1926    fn source_registry_composite_key() {
1927        // (Strategy, Instrument, Venue) triple as key
1928        #[derive(Hash, Eq, PartialEq)]
1929        struct StrategyId(u32);
1930        #[derive(Hash, Eq, PartialEq)]
1931        struct InstrumentId(u32);
1932        #[derive(Hash, Eq, PartialEq)]
1933        struct VenueId(u32);
1934
1935        let mut registry = SourceRegistry::new();
1936        let src = DataSource(5);
1937        registry.insert((StrategyId(1), InstrumentId(0), VenueId(2)), src);
1938
1939        assert_eq!(
1940            registry.get(&(StrategyId(1), InstrumentId(0), VenueId(2))),
1941            Some(src)
1942        );
1943        // Different strategy
1944        assert_eq!(
1945            registry.get(&(StrategyId(2), InstrumentId(0), VenueId(2))),
1946            None
1947        );
1948    }
1949
1950    // -- Full lifecycle scenarios ---------------------------------------------
1951
1952    #[test]
1953    fn full_lifecycle_add_trade_remove() {
1954        // Simulates: add instrument → reactors trade → delist → cleanup
1955        let mut wb = WorldBuilder::new();
1956        wb.register::<u64>(0);
1957        wb.register(ReactorNotify::new(4, 8));
1958        wb.register(DeferredRemovals::default());
1959        wb.register(SourceRegistry::new());
1960        let mut world = wb.build();
1961        let nid = world.id::<ReactorNotify>();
1962        let mut system = ReactorSystem::new(&world);
1963
1964        #[derive(Hash, Eq, PartialEq, Clone, Copy)]
1965        struct Instrument(u32);
1966
1967        struct Ctx {
1968            _reactor_id: Token,
1969            value: u64,
1970        }
1971
1972        fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
1973            *out += ctx.value;
1974        }
1975
1976        // Phase 1: Add BTC
1977        let btc_src = notify_mut(&world, nid).register_source();
1978        world
1979            .resource_mut::<SourceRegistry>()
1980            .insert(Instrument(0), btc_src);
1981
1982        {
1983            let reg = world.registry();
1984            let notify = notify_mut(&world, nid);
1985            notify
1986                .register(
1987                    |t| Ctx {
1988                        _reactor_id: t,
1989                        value: 10,
1990                    },
1991                    step,
1992                    reg,
1993                )
1994                .subscribe(btc_src);
1995        }
1996
1997        // Phase 2: Trade
1998        notify_mut(&world, nid).mark(btc_src);
1999        system.dispatch(&mut world);
2000        assert_eq!(*world.resource::<u64>(), 10);
2001
2002        // Phase 3: Add ETH dynamically
2003        let eth_src = notify_mut(&world, nid).register_source();
2004        world
2005            .resource_mut::<SourceRegistry>()
2006            .insert(Instrument(1), eth_src);
2007
2008        {
2009            let reg = world.registry();
2010            let notify = notify_mut(&world, nid);
2011            notify
2012                .register(
2013                    |t| Ctx {
2014                        _reactor_id: t,
2015                        value: 100,
2016                    },
2017                    step,
2018                    reg,
2019                )
2020                .subscribe(eth_src);
2021        }
2022
2023        // Both trade
2024        notify_mut(&world, nid).mark(btc_src);
2025        notify_mut(&world, nid).mark(eth_src);
2026        system.dispatch(&mut world);
2027        assert_eq!(*world.resource::<u64>(), 120);
2028
2029        // Phase 4: Delist BTC
2030        let removed = world
2031            .resource_mut::<SourceRegistry>()
2032            .remove(&Instrument(0));
2033        notify_mut(&world, nid).remove_source(removed.unwrap());
2034
2035        // Only ETH remains
2036        notify_mut(&world, nid).mark(eth_src);
2037        system.dispatch(&mut world);
2038        assert_eq!(*world.resource::<u64>(), 220);
2039
2040        // Phase 5: Add SOL — reuses BTC's old slab slot
2041        let sol_src = notify_mut(&world, nid).register_source();
2042        world
2043            .resource_mut::<SourceRegistry>()
2044            .insert(Instrument(2), sol_src);
2045        assert_eq!(sol_src.0, btc_src.0); // slab reused
2046
2047        {
2048            let reg = world.registry();
2049            let notify = notify_mut(&world, nid);
2050            notify
2051                .register(
2052                    |t| Ctx {
2053                        _reactor_id: t,
2054                        value: 1000,
2055                    },
2056                    step,
2057                    reg,
2058                )
2059                .subscribe(sol_src);
2060        }
2061
2062        // SOL + ETH fire
2063        notify_mut(&world, nid).mark(sol_src);
2064        notify_mut(&world, nid).mark(eth_src);
2065        system.dispatch(&mut world);
2066        assert_eq!(*world.resource::<u64>(), 1320); // 220 + 1000 + 100
2067    }
2068
2069    #[test]
2070    fn multi_strategy_same_instrument() {
2071        // Two strategies on the same instrument with different data sources
2072        let mut wb = WorldBuilder::new();
2073        wb.register::<u64>(0);
2074        wb.register(ReactorNotify::new(8, 16));
2075        wb.register(DeferredRemovals::default());
2076        wb.register(SourceRegistry::new());
2077        let mut world = wb.build();
2078        let nid = world.id::<ReactorNotify>();
2079        let mut system = ReactorSystem::new(&world);
2080
2081        #[derive(Hash, Eq, PartialEq, Clone, Copy)]
2082        struct StrategyInstrument(&'static str, &'static str);
2083
2084        struct Ctx {
2085            _reactor_id: Token,
2086            value: u64,
2087        }
2088
2089        fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
2090            *out += ctx.value;
2091        }
2092
2093        // Per-strategy+instrument sources
2094        let reg = world.registry();
2095        let notify = notify_mut(&world, nid);
2096
2097        let mm_btc = notify.register_source();
2098        let mm_eth = notify.register_source();
2099        let arb_btc = notify.register_source();
2100
2101        // Market maker on BTC and ETH
2102        notify
2103            .register(
2104                |t| Ctx {
2105                    _reactor_id: t,
2106                    value: 1,
2107                },
2108                step,
2109                reg,
2110            )
2111            .subscribe(mm_btc);
2112        notify
2113            .register(
2114                |t| Ctx {
2115                    _reactor_id: t,
2116                    value: 2,
2117                },
2118                step,
2119                reg,
2120            )
2121            .subscribe(mm_eth);
2122
2123        // Arb strategy on BTC
2124        notify
2125            .register(
2126                |t| Ctx {
2127                    _reactor_id: t,
2128                    value: 100,
2129                },
2130                step,
2131                reg,
2132            )
2133            .subscribe(arb_btc);
2134
2135        // Map composite keys
2136        world
2137            .resource_mut::<SourceRegistry>()
2138            .insert(StrategyInstrument("MM", "BTC"), mm_btc);
2139        world
2140            .resource_mut::<SourceRegistry>()
2141            .insert(StrategyInstrument("MM", "ETH"), mm_eth);
2142        world
2143            .resource_mut::<SourceRegistry>()
2144            .insert(StrategyInstrument("ARB", "BTC"), arb_btc);
2145
2146        // BTC data arrives — both MM-BTC and ARB-BTC should fire
2147        // But they're separate sources, so handler marks both:
2148        let mm_btc_src = world
2149            .resource::<SourceRegistry>()
2150            .get(&StrategyInstrument("MM", "BTC"))
2151            .unwrap();
2152        let arb_btc_src = world
2153            .resource::<SourceRegistry>()
2154            .get(&StrategyInstrument("ARB", "BTC"))
2155            .unwrap();
2156
2157        notify_mut(&world, nid).mark(mm_btc_src);
2158        notify_mut(&world, nid).mark(arb_btc_src);
2159        system.dispatch(&mut world);
2160        assert_eq!(*world.resource::<u64>(), 101); // 1 + 100
2161    }
2162
2163    #[test]
2164    fn reactor_self_removal_with_registry_cleanup() {
2165        // Reactor deregisters itself AND the handler cleans up the source
2166        let mut wb = WorldBuilder::new();
2167        wb.register::<u64>(0);
2168        wb.register(ReactorNotify::new(4, 8));
2169        wb.register(DeferredRemovals::default());
2170        wb.register(SourceRegistry::new());
2171        let mut world = wb.build();
2172        let nid = world.id::<ReactorNotify>();
2173        let mut system = ReactorSystem::new(&world);
2174
2175        struct Ctx {
2176            reactor_id: Token,
2177        }
2178
2179        fn one_shot(ctx: &mut Ctx, mut out: ResMut<u64>, mut removals: ResMut<DeferredRemovals>) {
2180            *out += 1;
2181            removals.deregister(ctx.reactor_id);
2182        }
2183
2184        let src = notify_mut(&world, nid).register_source();
2185        world
2186            .resource_mut::<SourceRegistry>()
2187            .insert("one-shot", src);
2188
2189        {
2190            let reg = world.registry();
2191            let notify = notify_mut(&world, nid);
2192            notify
2193                .register(|t| Ctx { reactor_id: t }, one_shot, reg)
2194                .subscribe(src);
2195        }
2196
2197        // Reactor runs once and removes itself
2198        notify_mut(&world, nid).mark(src);
2199        system.dispatch(&mut world);
2200        assert_eq!(*world.resource::<u64>(), 1);
2201        assert_eq!(system.reactor_count(&world), 0);
2202
2203        // Source still exists in registry but no reactors subscribe
2204        assert!(world.resource::<SourceRegistry>().contains(&"one-shot"));
2205
2206        // Mark again — no reactors wake
2207        notify_mut(&world, nid).mark(src);
2208        let ran = system.dispatch(&mut world);
2209        assert!(!ran);
2210    }
2211
2212    #[test]
2213    fn many_reactors_same_source() {
2214        // 50 reactors all subscribed to one source — all wake, deduped
2215        let mut wb = WorldBuilder::new();
2216        wb.register::<u64>(0);
2217        wb.register(ReactorNotify::new(4, 64));
2218        wb.register(DeferredRemovals::default());
2219        let mut world = wb.build();
2220        let nid = world.id::<ReactorNotify>();
2221        let mut system = ReactorSystem::new(&world);
2222
2223        struct Ctx {
2224            _reactor_id: Token,
2225        }
2226
2227        fn step(_ctx: &mut Ctx, mut out: ResMut<u64>) {
2228            *out += 1;
2229        }
2230
2231        let src = notify_mut(&world, nid).register_source();
2232
2233        {
2234            let reg = world.registry();
2235            let notify = notify_mut(&world, nid);
2236            for _ in 0..50 {
2237                notify
2238                    .register(|t| Ctx { _reactor_id: t }, step, reg)
2239                    .subscribe(src);
2240            }
2241        }
2242
2243        assert_eq!(system.reactor_count(&world), 50);
2244
2245        notify_mut(&world, nid).mark(src);
2246        system.dispatch(&mut world);
2247        assert_eq!(*world.resource::<u64>(), 50); // all 50 ran exactly once
2248    }
2249
2250    #[test]
2251    fn reactor_subscribes_to_multiple_sources() {
2252        // One reactor subscribed to 5 different sources.
2253        // All 5 fire in one frame — reactor runs exactly once.
2254        let mut wb = WorldBuilder::new();
2255        wb.register::<u64>(0);
2256        wb.register(ReactorNotify::new(8, 8));
2257        wb.register(DeferredRemovals::default());
2258        let mut world = wb.build();
2259        let nid = world.id::<ReactorNotify>();
2260        let mut system = ReactorSystem::new(&world);
2261
2262        struct Ctx {
2263            _reactor_id: Token,
2264        }
2265
2266        fn step(_ctx: &mut Ctx, mut out: ResMut<u64>) {
2267            *out += 1;
2268        }
2269
2270        let mut sources = Vec::new();
2271        let notify = notify_mut(&world, nid);
2272        for _ in 0..5 {
2273            sources.push(notify.register_source());
2274        }
2275
2276        {
2277            let reg = world.registry();
2278            let notify = notify_mut(&world, nid);
2279            let mut registration = notify.register(|t| Ctx { _reactor_id: t }, step, reg);
2280            for &src in &sources {
2281                registration = registration.subscribe(src);
2282            }
2283        }
2284
2285        // Mark all 5 sources
2286        for &src in &sources {
2287            notify_mut(&world, nid).mark(src);
2288        }
2289
2290        system.dispatch(&mut world);
2291        assert_eq!(*world.resource::<u64>(), 1); // ONE run despite 5 sources
2292    }
2293
2294    #[test]
2295    fn stale_data_source_is_noop() {
2296        // After removing a source, marking it must not panic
2297        let mut wb = WorldBuilder::new();
2298        wb.register(ReactorNotify::new(4, 4));
2299        wb.register(DeferredRemovals::default());
2300        let mut world = wb.build();
2301        let nid = world.id::<ReactorNotify>();
2302        let mut system = ReactorSystem::new(&world);
2303
2304        let src = notify_mut(&world, nid).register_source();
2305        notify_mut(&world, nid).remove_source(src);
2306
2307        // Must not panic
2308        notify_mut(&world, nid).mark(src);
2309        let ran = system.dispatch(&mut world);
2310        assert!(!ran);
2311    }
2312
2313    #[test]
2314    fn double_remove_source_is_noop() {
2315        let mut notify = ReactorNotify::new(4, 4);
2316        let src = notify.register_source();
2317        notify.remove_source(src);
2318        notify.remove_source(src); // must not panic
2319    }
2320
2321    // -- PipelineReactor: reactor body is a CtxPipeline ----------------------------
2322
2323    #[test]
2324    fn pipeline_reactor_dispatch() {
2325        use crate::CtxPipelineBuilder;
2326
2327        let mut wb = WorldBuilder::new();
2328        wb.register::<u64>(0);
2329        wb.register(ReactorNotify::new(4, 8));
2330        wb.register(DeferredRemovals::default());
2331        let mut world = wb.build();
2332        let nid = world.id::<ReactorNotify>();
2333        let mut system = ReactorSystem::new(&world);
2334
2335        struct Ctx {
2336            _reactor_id: Token,
2337            instrument: &'static str,
2338        }
2339
2340        fn read_data(ctx: &mut Ctx, val: Res<u64>, _input: ()) -> u64 {
2341            let _ = ctx.instrument;
2342            *val
2343        }
2344
2345        fn double(_ctx: &mut Ctx, x: u64) -> u64 {
2346            x * 2
2347        }
2348
2349        fn store(_ctx: &mut Ctx, mut out: ResMut<u64>, x: u64) {
2350            *out = x;
2351        }
2352
2353        let reg = world.registry();
2354
2355        let pipeline = CtxPipelineBuilder::<Ctx, ()>::new()
2356            .then(read_data, reg)
2357            .then(double, reg)
2358            .then(store, reg)
2359            .build();
2360
2361        let notify = notify_mut(&world, nid);
2362        let src = notify.register_source();
2363
2364        // Wrap pipeline in PipelineReactor
2365        let reactor = PipelineReactor::new(
2366            Ctx {
2367                _reactor_id: Token::new(0),
2368                instrument: "BTC",
2369            },
2370            pipeline,
2371        );
2372        notify.register_built(reactor).subscribe(src);
2373
2374        // Set initial value and dispatch
2375        *world.resource_mut::<u64>() = 10;
2376        notify_mut(&world, nid).mark(src);
2377        system.dispatch(&mut world);
2378
2379        assert_eq!(*world.resource::<u64>(), 20); // 10 * 2
2380    }
2381
2382    #[test]
2383    fn dag_reactor_dispatch() {
2384        use crate::CtxDagBuilder;
2385
2386        let mut wb = WorldBuilder::new();
2387        wb.register::<u64>(0);
2388        wb.register(ReactorNotify::new(4, 8));
2389        wb.register(DeferredRemovals::default());
2390        let mut world = wb.build();
2391        let nid = world.id::<ReactorNotify>();
2392        let mut system = ReactorSystem::new(&world);
2393
2394        struct Ctx {
2395            _reactor_id: Token,
2396        }
2397
2398        fn root(ctx: &mut Ctx, val: Res<u64>, _input: ()) -> u64 {
2399            let _ = ctx;
2400            *val
2401        }
2402
2403        fn arm_double(_ctx: &mut Ctx, val: &u64) -> u64 {
2404            *val * 2
2405        }
2406
2407        fn arm_add(_ctx: &mut Ctx, val: &u64) -> u64 {
2408            *val + 10
2409        }
2410
2411        fn merge(_ctx: &mut Ctx, mut out: ResMut<u64>, a: &u64, b: &u64) {
2412            *out = *a + *b;
2413        }
2414
2415        let reg = world.registry();
2416
2417        let dag = CtxDagBuilder::<Ctx, ()>::new()
2418            .root(root, reg)
2419            .fork()
2420            .arm(|seed| seed.then(arm_double, reg))
2421            .arm(|seed| seed.then(arm_add, reg))
2422            .merge(merge, reg)
2423            .build();
2424
2425        let notify = notify_mut(&world, nid);
2426        let src = notify.register_source();
2427
2428        let reactor = PipelineReactor::new(
2429            Ctx {
2430                _reactor_id: Token::new(0),
2431            },
2432            dag,
2433        );
2434        notify.register_built(reactor).subscribe(src);
2435
2436        *world.resource_mut::<u64>() = 5;
2437        notify_mut(&world, nid).mark(src);
2438        system.dispatch(&mut world);
2439
2440        // (5 * 2) + (5 + 10) = 10 + 15 = 25
2441        assert_eq!(*world.resource::<u64>(), 25);
2442    }
2443
2444    #[test]
2445    fn multiple_pipeline_reactors_different_bodies() {
2446        use crate::CtxPipelineBuilder;
2447
2448        let mut wb = WorldBuilder::new();
2449        wb.register::<u64>(0);
2450        wb.register(ReactorNotify::new(4, 8));
2451        wb.register(DeferredRemovals::default());
2452        let mut world = wb.build();
2453        let nid = world.id::<ReactorNotify>();
2454        let mut system = ReactorSystem::new(&world);
2455
2456        struct Ctx {
2457            _reactor_id: Token,
2458            factor: u64,
2459        }
2460
2461        fn multiply(ctx: &mut Ctx, val: Res<u64>, _input: ()) -> u64 {
2462            *val * ctx.factor
2463        }
2464
2465        fn accumulate(_ctx: &mut Ctx, mut out: ResMut<u64>, val: u64) {
2466            *out += val;
2467        }
2468
2469        let reg = world.registry();
2470
2471        // Reactor A: multiply by 2
2472        let pipeline_a = CtxPipelineBuilder::<Ctx, ()>::new()
2473            .then(multiply, reg)
2474            .then(accumulate, reg)
2475            .build();
2476
2477        // Reactor B: multiply by 10
2478        let pipeline_b = CtxPipelineBuilder::<Ctx, ()>::new()
2479            .then(multiply, reg)
2480            .then(accumulate, reg)
2481            .build();
2482
2483        let notify = notify_mut(&world, nid);
2484        let src = notify.register_source();
2485
2486        notify
2487            .register_built(PipelineReactor::new(
2488                Ctx {
2489                    _reactor_id: Token::new(0),
2490                    factor: 2,
2491                },
2492                pipeline_a,
2493            ))
2494            .subscribe(src);
2495
2496        notify
2497            .register_built(PipelineReactor::new(
2498                Ctx {
2499                    _reactor_id: Token::new(1),
2500                    factor: 10,
2501                },
2502                pipeline_b,
2503            ))
2504            .subscribe(src);
2505
2506        *world.resource_mut::<u64>() = 5;
2507        notify_mut(&world, nid).mark(src);
2508        system.dispatch(&mut world);
2509
2510        // Reactor A reads 5, adds 5*2=10, so resource=15
2511        // Reactor B reads 15, adds 15*10=150, so resource=165
2512        // (Order depends on dispatch order — both subscribed to same source)
2513        // The value is order-dependent. Just verify both ran:
2514        let val = *world.resource::<u64>();
2515        assert!(val > 5, "both reactors should have run, got {val}");
2516    }
2517
2518    #[test]
2519    fn pipeline_reactor_with_guard() {
2520        use crate::CtxPipelineBuilder;
2521
2522        let mut wb = WorldBuilder::new();
2523        wb.register::<u64>(0);
2524        wb.register(ReactorNotify::new(4, 8));
2525        wb.register(DeferredRemovals::default());
2526        let mut world = wb.build();
2527        let nid = world.id::<ReactorNotify>();
2528        let mut system = ReactorSystem::new(&world);
2529
2530        struct Ctx {
2531            _reactor_id: Token,
2532            threshold: u64,
2533        }
2534
2535        fn read(_ctx: &mut Ctx, val: Res<u64>, _input: ()) -> u64 {
2536            *val
2537        }
2538
2539        fn above_threshold(ctx: &mut Ctx, val: &u64) -> bool {
2540            *val > ctx.threshold
2541        }
2542
2543        fn write(_ctx: &mut Ctx, mut out: ResMut<u64>, _val: u64) {
2544            *out = 999;
2545        }
2546
2547        let reg = world.registry();
2548
2549        let pipeline = CtxPipelineBuilder::<Ctx, ()>::new()
2550            .then(read, reg)
2551            .guard(above_threshold, reg)
2552            .map(write, reg)
2553            .build();
2554
2555        let notify = notify_mut(&world, nid);
2556        let src = notify.register_source();
2557
2558        notify
2559            .register_built(PipelineReactor::new(
2560                Ctx {
2561                    _reactor_id: Token::new(0),
2562                    threshold: 10,
2563                },
2564                pipeline,
2565            ))
2566            .subscribe(src);
2567
2568        // Value below threshold — guard blocks
2569        *world.resource_mut::<u64>() = 5;
2570        notify_mut(&world, nid).mark(src);
2571        system.dispatch(&mut world);
2572        assert_eq!(*world.resource::<u64>(), 5); // unchanged
2573
2574        // Value above threshold — guard passes
2575        *world.resource_mut::<u64>() = 20;
2576        notify_mut(&world, nid).mark(src);
2577        system.dispatch(&mut world);
2578        assert_eq!(*world.resource::<u64>(), 999);
2579    }
2580
2581    // -- Two-phase registration (safe API through World) ----------------------
2582
2583    #[test]
2584    fn two_phase_registration_safe_api() {
2585        // Demonstrates the safe API: create_reactor → into_reactor → insert
2586        // No unsafe, no registry borrow conflicts.
2587        let mut wb = WorldBuilder::new();
2588        wb.register::<u64>(0);
2589        wb.register(ReactorNotify::new(4, 8));
2590        wb.register(DeferredRemovals::default());
2591        let mut world = wb.build();
2592
2593        let mut system = ReactorSystem::new(&world);
2594
2595        struct Ctx {
2596            reactor_id: Token,
2597            instrument: &'static str,
2598        }
2599
2600        fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
2601            let _ = ctx.instrument;
2602            *out += ctx.reactor_id.index() as u64 + 1;
2603        }
2604
2605        // Phase 1: reserve slot
2606        let src = world.resource_mut::<ReactorNotify>().register_source();
2607        let token = world.resource_mut::<ReactorNotify>().create_reactor();
2608
2609        // Phase 2: build reactor with token + registry (no borrow conflict)
2610        let reactor = step.into_reactor(
2611            Ctx {
2612                reactor_id: token,
2613                instrument: "BTC",
2614            },
2615            world.registry(),
2616        );
2617
2618        // Phase 3: insert + subscribe
2619        world
2620            .resource_mut::<ReactorNotify>()
2621            .insert_reactor(token, reactor)
2622            .subscribe(src);
2623
2624        // Verify dispatch
2625        world.resource_mut::<ReactorNotify>().mark(src);
2626        system.dispatch(&mut world);
2627        assert_eq!(*world.resource::<u64>(), 1); // token index 0 + 1
2628
2629        // Second reactor — same pattern
2630        let token2 = world.resource_mut::<ReactorNotify>().create_reactor();
2631        let actor2 = step.into_reactor(
2632            Ctx {
2633                reactor_id: token2,
2634                instrument: "ETH",
2635            },
2636            world.registry(),
2637        );
2638        world
2639            .resource_mut::<ReactorNotify>()
2640            .insert_reactor(token2, actor2)
2641            .subscribe(src);
2642
2643        // Both reactors fire
2644        world.resource_mut::<ReactorNotify>().mark(src);
2645        system.dispatch(&mut world);
2646        assert_eq!(*world.resource::<u64>(), 4); // 1 + (0+1) + (1+1)
2647    }
2648
2649    #[test]
2650    fn two_phase_with_pipeline_reactor() {
2651        use crate::CtxPipelineBuilder;
2652
2653        let mut wb = WorldBuilder::new();
2654        wb.register::<u64>(0);
2655        wb.register(ReactorNotify::new(4, 8));
2656        wb.register(DeferredRemovals::default());
2657        let mut world = wb.build();
2658
2659        let mut system = ReactorSystem::new(&world);
2660
2661        struct Ctx {
2662            _reactor_id: Token,
2663        }
2664
2665        fn read(ctx: &mut Ctx, val: Res<u64>, _: ()) -> u64 {
2666            let _ = ctx;
2667            *val
2668        }
2669
2670        fn double(_ctx: &mut Ctx, x: u64) -> u64 {
2671            x * 2
2672        }
2673
2674        fn store(_ctx: &mut Ctx, mut out: ResMut<u64>, x: u64) {
2675            *out = x;
2676        }
2677
2678        // Phase 1: reserve + register source
2679        let src = world.resource_mut::<ReactorNotify>().register_source();
2680        let token = world.resource_mut::<ReactorNotify>().create_reactor();
2681
2682        // Phase 2: build pipeline + wrap in PipelineReactor (needs registry)
2683        let reg = world.registry();
2684        let pipeline = CtxPipelineBuilder::<Ctx, ()>::new()
2685            .then(read, reg)
2686            .then(double, reg)
2687            .then(store, reg)
2688            .build();
2689        let reactor = PipelineReactor::new(Ctx { _reactor_id: token }, pipeline);
2690
2691        // Phase 3: insert
2692        world
2693            .resource_mut::<ReactorNotify>()
2694            .insert_reactor(token, reactor)
2695            .subscribe(src);
2696
2697        *world.resource_mut::<u64>() = 10;
2698        world.resource_mut::<ReactorNotify>().mark(src);
2699        system.dispatch(&mut world);
2700        assert_eq!(*world.resource::<u64>(), 20); // 10 * 2
2701    }
2702}