Skip to main content

nexus_rt/
reactor.rs

1//! Reactor dispatch system with interest-based notification.
2//!
3//! Reactors are lightweight, per-instance dispatch units — each owns its
4//! own metadata (instrument ID, order ID, algo parameters) and runs a
5//! step function with pre-resolved [`Param`] access when woken.
6//!
7//! # Architecture
8//!
9//! ```text
10//! LocalNotify      (nexus-notify)  — dedup bitset, mark/poll
11//!     ↑
12//! ReactorNotify      (nexus-rt)      — World resource: reactor storage +
13//!                                    data source fan-out + registration
14//! SourceRegistry   (nexus-rt)      — World resource: typed key → DataSource
15//!     ↑
16//! ReactorSystem      (nexus-rt)      — thin dispatch handle (driver level)
17//! ```
18//!
19//! [`SourceRegistry`] maps domain keys (instrument IDs, strategy IDs,
20//! `(Symbol, Venue)` tuples) to [`DataSource`] values for runtime lookup.
21//! Any `Hash + Eq + Send + 'static` type works as a key. All three
22//! resources are auto-registered by [`WorldBuilder::build`](crate::WorldBuilder::build) when the
23//! `reactors` feature is enabled.
24//!
25//! # Use Cases
26//!
27//! ## 1. Market maker with per-instrument quoting reactors
28//!
29//! ```ignore
30//! // Step function — context first, then pre-resolved Params
31//! fn quoting_step(ctx: &mut QuotingCtx, books: Res<OrderBooks>, mut gw: ResMut<Gateway>) {
32//!     let quote = books.compute_quote(ctx.instrument, ctx.layer);
33//!     gw.submit_quote(quote);
34//! }
35//!
36//! // Setup
37//! let notify = world.resource_mut::<ReactorNotify>();
38//! let btc_md = notify.register_source();
39//! let positions = notify.register_source();
40//!
41//! // Map natural keys for runtime lookup
42//! world.resource_mut::<SourceRegistry>().insert(InstrumentId::BTC, btc_md);
43//!
44//! // Register reactor — subscribes to BTC data + positions
45//! notify.register(
46//!     |id| QuotingCtx { reactor_id: id, instrument: InstrumentId::BTC, layer: 1 },
47//!     quoting_step,
48//!     &registry,
49//! )
50//! .subscribe(btc_md)
51//! .subscribe(positions);
52//! ```
53//!
54//! ## 2. TWAP execution algo that self-removes on completion
55//!
56//! ```ignore
57//! fn twap_step(
58//!     ctx: &mut TwapCtx,
59//!     books: Res<OrderBooks>,
60//!     mut gw: ResMut<Gateway>,
61//!     mut removals: ResMut<DeferredRemovals>,
62//! ) {
63//!     gw.submit(ctx.instrument, ctx.slice_size, books.best_ask(ctx.instrument));
64//!     ctx.remaining -= ctx.slice_size;
65//!     if ctx.remaining == 0 {
66//!         removals.deregister(ctx.reactor_id);  // cleaned up after frame
67//!     }
68//! }
69//! ```
70//!
71//! ## 3. Runtime registration from event handlers
72//!
73//! ```ignore
74//! fn on_new_order(
75//!     event: NewOrder,
76//!     mut notify: ResMut<ReactorNotify>,
77//!     sources: Res<SourceRegistry>,
78//! ) {
79//!     let md_source = sources.get(&event.instrument).unwrap();
80//!     notify.register(
81//!         |id| TwapCtx { reactor_id: id, instrument: event.instrument, remaining: event.qty },
82//!         twap_step, &registry,
83//!     ).subscribe(md_source);
84//! }
85//! ```
86//!
87//! ## 4. Order fill routing via wire protocol
88//!
89//! ```ignore
90//! // On submission — embed reactor token in order
91//! fn submit(ctx: &mut Ctx, mut gw: ResMut<Gateway>) {
92//!     gw.submit(Order { client_id: ctx.reactor_id.index(), .. });
93//! }
94//!
95//! // On fill — route back to reactor's data source
96//! fn on_fill(fill: Fill, mut notify: ResMut<ReactorNotify>, sources: Res<SourceRegistry>) {
97//!     if let Some(src) = sources.get(&RoutingKey(fill.client_id)) {
98//!         notify.mark(src);
99//!     }
100//! }
101//! ```
102//!
103//! ## 5. Instrument delisting — cleanup
104//!
105//! ```ignore
106//! fn on_delist(event: Delist, mut notify: ResMut<ReactorNotify>, mut sources: ResMut<SourceRegistry>) {
107//!     if let Some(src) = sources.remove(&event.instrument) {
108//!         notify.remove_source(src);  // frees slab slot for reuse
109//!     }
110//! }
111//! ```
112//!
113//! ## 6. Event handler marking (hot path)
114//!
115//! ```ignore
116//! fn on_btc_tick(event: Tick, mut books: ResMut<OrderBooks>, mut notify: ResMut<ReactorNotify>) {
117//!     books.apply(event);
118//!     notify.mark(btc_md);  // pre-resolved DataSource, O(1), no lookup
119//! }
120//! ```
121
122use std::any::{Any, TypeId};
123use std::hash::Hash;
124
125use nexus_notify::local::LocalNotify;
126use nexus_notify::{Events, Token};
127use rustc_hash::FxHashMap;
128
129use crate::ctx_pipeline::CtxStepCall;
130use crate::handler::Param;
131use crate::world::{Registry, Resource, ResourceId, World};
132
133// =============================================================================
134// DataSource — identifies a category of change
135// =============================================================================
136
137/// Identifies a data source (e.g., "BTC market data", "positions").
138///
139/// Registered via [`ReactorNotify::register_source`]. Event handlers
140/// mark data sources via [`ReactorNotify::mark`] to wake subscribed reactors.
141#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
142pub struct DataSource(pub usize);
143
144// =============================================================================
145// ReactorNotify — World resource: storage + notification + registration
146// =============================================================================
147
148/// Reactor storage, interest mapping, and notification hub.
149///
150/// Lives in the [`World`] as a resource. Handles:
151/// - Reactor registration and storage (`Box<dyn Reactor>` in a slab)
152/// - Data source registration and interest mapping
153/// - Marking data sources as changed (fan-out + dedup)
154///
155/// Event handlers access this via [`ResMut<ReactorNotify>`](crate::ResMut)
156/// to mark data sources or register new reactors at runtime.
157pub struct ReactorNotify {
158    /// Per-reactor token dedup.
159    notify: LocalNotify,
160
161    /// Data source → reactor tokens subscribed to this source.
162    /// Slab-backed for dynamic add/remove with slot reuse.
163    /// Slab key = `DataSource.0`.
164    interests: slab::Slab<Vec<Token>>,
165
166    /// Reverse index: reactor token → data sources it's subscribed to.
167    /// Enables O(subscriptions) removal instead of O(all_sources × all_subs).
168    reactor_sources: Vec<Vec<DataSource>>,
169
170    /// Reactor storage. Slab key = token index.
171    /// `Option` enables move-out-move-back during dispatch to avoid
172    /// aliasing: the reactor is `take()`n before `run()`, then put back.
173    /// `Option<Box<dyn Reactor>>` is niche-optimized — zero extra bytes.
174    reactors: slab::Slab<Option<Box<dyn Reactor>>>,
175}
176
177// Manual Debug — slab of dyn Reactor can't derive
178impl std::fmt::Debug for ReactorNotify {
179    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
180        f.debug_struct("ReactorNotify")
181            .field("num_sources", &self.interests.len())
182            .field("num_reactors", &self.reactors.len())
183            .field("notify", &self.notify)
184            .finish()
185    }
186}
187
188impl ReactorNotify {
189    /// Create with capacity hints for data sources and reactors.
190    pub fn new(source_capacity: usize, reactor_capacity: usize) -> Self {
191        Self {
192            notify: LocalNotify::with_capacity(reactor_capacity),
193            interests: slab::Slab::with_capacity(source_capacity),
194            reactor_sources: Vec::with_capacity(reactor_capacity),
195            reactors: slab::Slab::with_capacity(reactor_capacity),
196        }
197    }
198
199    // ── Data sources ────────────────────────────────────────────────────
200
201    /// Register a new data source. Returns its identifier.
202    ///
203    /// Slab-backed — removed sources' slots are reused.
204    pub fn register_source(&mut self) -> DataSource {
205        DataSource(self.interests.insert(Vec::new()))
206    }
207
208    /// Remove a data source. Unsubscribes all reactors and frees the
209    /// slab slot for reuse.
210    ///
211    /// Marking a removed `DataSource` is a no-op (stale handle safety).
212    pub fn remove_source(&mut self, source: DataSource) {
213        if self.interests.contains(source.0) {
214            self.interests.remove(source.0);
215        }
216    }
217
218    // ── Reactor registration ──────────────────────────────────────────────
219
220    /// Reserve a slot for a new reactor and return its [`Token`].
221    ///
222    /// Use with [`insert_reactor`](Self::insert_reactor) to complete registration.
223    /// This two-phase pattern avoids borrow conflicts: you can
224    /// drop the `&mut ReactorNotify` borrow, build the reactor with
225    /// `world.registry()`, then call `insert`.
226    ///
227    /// # Example
228    ///
229    /// ```ignore
230    /// // Phase 1: reserve slot
231    /// let token = world.resource_mut::<ReactorNotify>().create_reactor();
232    ///
233    /// // Phase 2: build reactor (borrows &World for registry)
234    /// let reactor = quoting_step.into_reactor(
235    ///     QuotingCtx { reactor_id: token, instrument: BTC },
236    ///     world.registry(),
237    /// );
238    ///
239    /// // Phase 3: fill slot
240    /// world.resource_mut::<ReactorNotify>()
241    ///     .insert_reactor(token, reactor)
242    ///     .subscribe(btc_md);
243    /// ```
244    pub fn create_reactor(&mut self) -> Token {
245        // Reserve the slot with a None placeholder so no other registration
246        // can claim the same key between alloc and insert.
247        let key = self.reactors.insert(None);
248        self.notify.ensure_capacity(key);
249        // Grow reverse index to cover this reactor token.
250        if key >= self.reactor_sources.len() {
251            self.reactor_sources.resize_with(key + 1, Vec::new);
252        }
253        Token::new(key)
254    }
255
256    /// Insert a pre-built reactor at a previously allocated [`Token`].
257    ///
258    /// The token must have been returned by [`create_reactor`](Self::create_reactor)
259    /// and not yet filled. Completes the two-phase registration.
260    ///
261    /// # Panics
262    ///
263    /// Panics if the token was not allocated by [`create_reactor`](Self::create_reactor) or was
264    /// already filled.
265    pub fn insert_reactor(
266        &mut self,
267        token: Token,
268        reactor: impl Reactor + 'static,
269    ) -> ReactorRegistration<'_> {
270        let idx = token.index();
271        assert!(
272            self.reactors.contains(idx),
273            "token {} was not allocated by create_reactor",
274            idx,
275        );
276        assert!(
277            self.reactors[idx].is_none(),
278            "token {} was already filled",
279            idx,
280        );
281        self.reactors[idx] = Some(Box::new(reactor));
282        ReactorRegistration {
283            token,
284            notify: self,
285        }
286    }
287
288    /// Register a reactor from a step function + context factory.
289    ///
290    /// One-shot convenience when you already have `&Registry` (e.g.,
291    /// inside event handlers via [`Param`] resolution, or in tests).
292    /// For the safe `World`-based API, use [`create_reactor`](Self::create_reactor)
293    /// + [`insert_reactor`](Self::insert_reactor).
294    pub fn register<C, Params, F: IntoReactor<C, Params>>(
295        &mut self,
296        ctx_fn: impl FnOnce(Token) -> C,
297        step: F,
298        registry: &Registry,
299    ) -> ReactorRegistration<'_> {
300        let key = self.reactors.vacant_key();
301        let token = Token::new(key);
302        self.notify.ensure_capacity(key);
303        if key >= self.reactor_sources.len() {
304            self.reactor_sources.resize_with(key + 1, Vec::new);
305        }
306        let ctx = ctx_fn(token);
307        let reactor = step.into_reactor(ctx, registry);
308        let inserted = self.reactors.insert(Some(Box::new(reactor)));
309        debug_assert_eq!(inserted, key);
310        ReactorRegistration {
311            token,
312            notify: self,
313        }
314    }
315
316    /// Register a pre-built reactor in one step.
317    ///
318    /// Convenience for reactors that don't need their [`Token`] in
319    /// the context. For reactors that need the token (wire routing,
320    /// self-deregistration), use [`create_reactor`](Self::create_reactor)
321    /// + [`insert_reactor`](Self::insert_reactor).
322    pub fn register_built(&mut self, reactor: impl Reactor + 'static) -> ReactorRegistration<'_> {
323        let key = self.reactors.vacant_key();
324        let token = Token::new(key);
325        self.notify.ensure_capacity(key);
326        if key >= self.reactor_sources.len() {
327            self.reactor_sources.resize_with(key + 1, Vec::new);
328        }
329        let inserted = self.reactors.insert(Some(Box::new(reactor)));
330        debug_assert_eq!(inserted, key);
331        ReactorRegistration {
332            token,
333            notify: self,
334        }
335    }
336
337    // ── Subscription ────────────────────────────────────────────────────
338
339    /// Subscribe a reactor to a data source.
340    ///
341    /// Idempotent — subscribing twice is a no-op.
342    /// No-op if `source` has been removed.
343    pub fn subscribe(&mut self, reactor: Token, source: DataSource) {
344        if let Some(subscribers) = self.interests.get_mut(source.0) {
345            if !subscribers.contains(&reactor) {
346                subscribers.push(reactor);
347                // Maintain reverse index for O(subscriptions) removal.
348                let idx = reactor.index();
349                debug_assert!(
350                    idx < self.reactor_sources.len(),
351                    "reactor_sources missing entry for reactor token {}",
352                    idx,
353                );
354                self.reactor_sources[idx].push(source);
355            }
356        }
357    }
358
359    /// Unsubscribe a reactor from a data source.
360    pub fn unsubscribe(&mut self, reactor: Token, source: DataSource) {
361        if let Some(subscribers) = self.interests.get_mut(source.0) {
362            subscribers.retain(|&t| t != reactor);
363        }
364        if let Some(sources) = self.reactor_sources.get_mut(reactor.index()) {
365            sources.retain(|&s| s != source);
366        }
367    }
368
369    // ── Hot path ────────────────────────────────────────────────────────
370
371    /// Mark a data source as changed this frame.
372    ///
373    /// Fans out to all subscribed reactor tokens in the underlying
374    /// [`LocalNotify`], with per-reactor dedup.
375    #[inline]
376    pub fn mark(&mut self, source: DataSource) {
377        if let Some(subscribers) = self.interests.get(source.0) {
378            for &reactor_token in subscribers {
379                self.notify.mark(reactor_token);
380            }
381        }
382    }
383
384    /// Poll for woken reactor tokens into the events buffer.
385    #[inline]
386    pub(crate) fn poll(&mut self, events: &mut Events) {
387        self.notify.poll(events);
388    }
389
390    /// Take a reactor out of its slot for dispatch.
391    /// Returns None if the slot is empty or doesn't exist.
392    #[inline]
393    pub(crate) fn take_reactor(&mut self, idx: usize) -> Option<Box<dyn Reactor>> {
394        self.reactors.get_mut(idx).and_then(Option::take)
395    }
396
397    /// Put a reactor back into its slot after dispatch.
398    ///
399    /// The caller guarantees `idx` is a valid, occupied slab key
400    /// (it was just returned by `take_reactor`). Skips the redundant
401    /// `contains` check — single bounds-checked write.
402    #[inline]
403    pub(crate) fn put_reactor(&mut self, idx: usize, reactor: Box<dyn Reactor>) {
404        self.reactors[idx] = Some(reactor);
405    }
406
407    /// Remove a reactor and unsubscribe from all data sources.
408    ///
409    /// Uses the reverse index for O(subscriptions) removal instead of
410    /// scanning all data source interest lists.
411    pub fn remove_reactor(&mut self, token: Token) {
412        let idx = token.index();
413        if self.reactors.contains(idx) {
414            self.reactors.remove(idx);
415            // Use reverse index — only touch sources this reactor subscribed to.
416            if let Some(sources) = self.reactor_sources.get_mut(idx) {
417                for &source in sources.iter() {
418                    if let Some(subscribers) = self.interests.get_mut(source.0) {
419                        subscribers.retain(|&t| t != token);
420                    }
421                }
422                sources.clear();
423            }
424        }
425    }
426
427    // ── Introspection ───────────────────────────────────────────────────
428
429    /// Any reactors woken this frame?
430    pub fn has_notified(&self) -> bool {
431        self.notify.has_notified()
432    }
433
434    /// Number of reactors woken this frame.
435    pub fn notified_count(&self) -> usize {
436        self.notify.notified_count()
437    }
438
439    /// Number of registered data sources.
440    pub fn source_count(&self) -> usize {
441        self.interests.len()
442    }
443
444    /// Number of registered reactors.
445    pub fn reactor_count(&self) -> usize {
446        self.reactors.len()
447    }
448}
449
450impl Resource for ReactorNotify {}
451
452// =============================================================================
453// ReactorRegistration — builder for chaining subscriptions
454// =============================================================================
455
456/// Builder returned by [`ReactorNotify::register`] for chaining subscriptions.
457pub struct ReactorRegistration<'a> {
458    token: Token,
459    notify: &'a mut ReactorNotify,
460}
461
462impl ReactorRegistration<'_> {
463    /// Subscribe this reactor to a data source.
464    pub fn subscribe(self, source: DataSource) -> Self {
465        self.notify.subscribe(self.token, source);
466        self
467    }
468
469    /// The assigned token for this reactor.
470    pub fn token(&self) -> Token {
471        self.token
472    }
473}
474
475// =============================================================================
476// Reactor trait
477// =============================================================================
478
479/// A dispatchable unit with per-instance context.
480///
481/// Reactors own lightweight metadata (instrument ID, order ID, routing
482/// keys). Mutable state belongs in World resources, accessed via
483/// pre-resolved [`Res<T>`](crate::Res) / [`ResMut<T>`](crate::ResMut)
484/// in the step function.
485///
486/// Resource access is resolved once at registration time — dispatch
487/// is a single pointer deref per resource, no HashMap lookups.
488pub trait Reactor: Send {
489    /// Run this reactor with full World access.
490    fn run(&mut self, world: &mut World);
491
492    /// Returns the reactor's name for diagnostics.
493    fn name(&self) -> &'static str {
494        "<unnamed>"
495    }
496}
497
498// =============================================================================
499// ReactorFn — concrete dispatch wrapper
500// =============================================================================
501
502/// Concrete reactor wrapper produced by [`IntoReactor`].
503///
504/// Stores the step function, per-reactor context, and pre-resolved
505/// parameter state. Same pattern as [`Callback`](crate::Callback)
506/// but without an event argument.
507pub struct ReactorFn<C, F, Params: Param> {
508    /// Per-reactor owned context (instrument, order ID, config).
509    pub ctx: C,
510    f: F,
511    state: Params::State,
512    name: &'static str,
513}
514
515// =============================================================================
516// PipelineReactor — reactor backed by a CtxPipeline or CtxDag body
517// =============================================================================
518
519/// A reactor whose body is a [`CtxPipeline`](crate::CtxPipeline),
520/// [`CtxDag`](crate::CtxDag), or any [`CtxStepCall`].
521///
522/// The context `C` holds per-reactor metadata. The body is type-erased
523/// via `Box<dyn CtxStepCall>` since pipeline chain types are unnameable.
524///
525/// # Example
526///
527/// ```ignore
528/// let pipeline = CtxPipelineBuilder::<QuotingCtx, ()>::new()
529///     .then(read_books, &reg)
530///     .then(compute_quote, &reg)
531///     .then(submit_quote, &reg)
532///     .build();
533///
534/// let reactor = PipelineReactor::new(
535///     QuotingCtx { reactor_id: token, instrument: BTC },
536///     pipeline,
537/// );
538///
539/// notify.register_built(reactor).subscribe(btc_md);
540/// ```
541pub struct PipelineReactor<C> {
542    /// Per-reactor owned context.
543    pub ctx: C,
544    body: Box<dyn CtxStepCall<C, (), Out = ()> + Send>,
545}
546
547impl<C: Send + 'static> PipelineReactor<C> {
548    /// Create a reactor from a context and a pipeline/DAG body.
549    ///
550    /// The body must implement `CtxStepCall<C, (), Out = ()>`.
551    /// Both [`CtxPipeline`](crate::CtxPipeline) and
552    /// [`CtxDag`](crate::CtxDag) satisfy this when their output is `()`.
553    pub fn new(ctx: C, body: impl CtxStepCall<C, (), Out = ()> + Send + 'static) -> Self {
554        Self {
555            ctx,
556            body: Box::new(body),
557        }
558    }
559}
560
561impl<C: Send + 'static> Reactor for PipelineReactor<C> {
562    fn run(&mut self, world: &mut World) {
563        self.body.call(&mut self.ctx, world, ());
564    }
565
566    fn name(&self) -> &'static str {
567        std::any::type_name::<C>()
568    }
569}
570
571// =============================================================================
572// IntoReactor — conversion trait
573// =============================================================================
574
575/// Converts a step function into a [`Reactor`].
576///
577/// Step function signature: `fn(&mut C, Params...)` — context first,
578/// then resolved resources. No event argument, no return value.
579///
580/// # Example
581///
582/// ```ignore
583/// fn quoting_step(ctx: &mut QuotingCtx, books: Res<OrderBooks>, mut gw: ResMut<Gateway>) {
584///     let quote = books.compute_quote(ctx.instrument, ctx.layer);
585///     gw.submit_quote(quote);
586/// }
587///
588/// let reactor = quoting_step.into_reactor(QuotingCtx { instrument: BTC, layer: 1 }, &registry);
589/// ```
590#[diagnostic::on_unimplemented(
591    message = "this function cannot be used as a reactor step",
592    note = "reactor step signature: `fn(&mut C, Params...)` — context first, then resources",
593    note = "closures with resource parameters are not supported — use a named `fn`"
594)]
595pub trait IntoReactor<C, Params> {
596    /// The concrete reactor type produced.
597    type Reactor: Reactor + 'static;
598
599    /// Convert this function + context into a reactor, resolving
600    /// parameters from the registry.
601    fn into_reactor(self, ctx: C, registry: &Registry) -> Self::Reactor;
602}
603
604// =============================================================================
605// Arity 0: fn(&mut C) — context only, no Param
606// =============================================================================
607
608impl<C: Send + 'static, F: FnMut(&mut C) + Send + 'static> Reactor for ReactorFn<C, F, ()> {
609    fn run(&mut self, _world: &mut World) {
610        (self.f)(&mut self.ctx);
611    }
612
613    fn name(&self) -> &'static str {
614        self.name
615    }
616}
617
618impl<C: Send + 'static, F: FnMut(&mut C) + Send + 'static> IntoReactor<C, ()> for F {
619    type Reactor = ReactorFn<C, F, ()>;
620
621    fn into_reactor(self, ctx: C, registry: &Registry) -> Self::Reactor {
622        ReactorFn {
623            ctx,
624            f: self,
625            state: <() as Param>::init(registry),
626            name: std::any::type_name::<F>(),
627        }
628    }
629}
630
631// =============================================================================
632// Arities 1-8 via macro
633// =============================================================================
634
635macro_rules! impl_into_reactor {
636    ($($P:ident),+) => {
637        impl<C: Send + 'static, F: Send + 'static, $($P: Param + 'static),+>
638            Reactor for ReactorFn<C, F, ($($P,)+)>
639        where
640            for<'a> &'a mut F:
641                FnMut(&mut C, $($P,)+) +
642                FnMut(&mut C, $($P::Item<'a>,)+),
643        {
644            #[allow(non_snake_case)]
645            fn run(&mut self, world: &mut World) {
646                #[allow(clippy::too_many_arguments)]
647                fn call_inner<Ctx, $($P,)+>(
648                    mut f: impl FnMut(&mut Ctx, $($P,)+),
649                    ctx: &mut Ctx,
650                    $($P: $P,)+
651                ) {
652                    f(ctx, $($P,)+);
653                }
654
655                // SAFETY: state was produced by Param::init() on the same
656                // Registry that built this World. Single-threaded sequential
657                // dispatch ensures no mutable aliasing across params.
658                #[cfg(debug_assertions)]
659                world.clear_borrows();
660                let ($($P,)+) = unsafe {
661                    <($($P,)+) as Param>::fetch(world, &mut self.state)
662                };
663                call_inner(&mut self.f, &mut self.ctx, $($P,)+);
664            }
665
666            fn name(&self) -> &'static str {
667                self.name
668            }
669        }
670
671        impl<C: Send + 'static, F: Send + 'static, $($P: Param + 'static),+>
672            IntoReactor<C, ($($P,)+)> for F
673        where
674            for<'a> &'a mut F:
675                FnMut(&mut C, $($P,)+) +
676                FnMut(&mut C, $($P::Item<'a>,)+),
677        {
678            type Reactor = ReactorFn<C, F, ($($P,)+)>;
679
680            fn into_reactor(self, ctx: C, registry: &Registry) -> Self::Reactor {
681                let state = <($($P,)+) as Param>::init(registry);
682                {
683                    #[allow(non_snake_case)]
684                    let ($($P,)+) = &state;
685                    registry.check_access(&[
686                        $(
687                            (<$P as Param>::resource_id($P),
688                             std::any::type_name::<$P>()),
689                        )+
690                    ]);
691                }
692                ReactorFn {
693                    ctx,
694                    f: self,
695                    state,
696                    name: std::any::type_name::<F>(),
697                }
698            }
699        }
700    };
701}
702
703all_tuples!(impl_into_reactor);
704
705// =============================================================================
706// DeferredRemovals — World resource for reactor self-removal
707// =============================================================================
708
709/// Deferred reactor removal queue.
710///
711/// Reactors request removal during dispatch by pushing their token
712/// via [`ResMut<DeferredRemovals>`](crate::ResMut). The [`ReactorSystem`]
713/// drains this after all reactors in the frame have run.
714#[derive(Default)]
715pub struct DeferredRemovals {
716    tokens: Vec<Token>,
717}
718
719impl DeferredRemovals {
720    /// Request deferred removal of a reactor.
721    ///
722    /// Takes effect after the current dispatch frame completes.
723    /// Duplicate calls are harmless — `remove_reactor` is idempotent.
724    pub fn deregister(&mut self, token: Token) {
725        self.tokens.push(token);
726    }
727
728    /// Swap out the inner Vec for zero-alloc processing.
729    /// Returns the Vec (caller owns it). Leaves self with an empty Vec.
730    #[inline]
731    pub(crate) fn take(&mut self) -> Vec<Token> {
732        std::mem::take(&mut self.tokens)
733    }
734
735    /// Put a (drained) Vec back for reuse. Zero allocation.
736    #[inline]
737    pub(crate) fn put(&mut self, tokens: Vec<Token>) {
738        debug_assert!(tokens.is_empty(), "put() expects a drained Vec");
739        self.tokens = tokens;
740    }
741
742    /// Any removals pending?
743    pub fn is_empty(&self) -> bool {
744        self.tokens.is_empty()
745    }
746}
747
748impl Resource for DeferredRemovals {}
749
750// =============================================================================
751// SourceRegistry — typed key → DataSource resolution
752// =============================================================================
753
754/// Maps natural domain keys to [`DataSource`] values.
755///
756/// Single World resource supporting any number of key types via
757/// type erasure. Each key type `K` gets its own internal
758/// `HashMap<K, DataSource>`. The `TypeId` dispatch is one hash
759/// lookup to find the right inner map — cold path only.
760///
761/// Any `Hash + Eq + Send + 'static` type works as a key — no trait
762/// to implement, no macro to invoke. Newtypes, enums, and tuples
763/// all work out of the box.
764///
765/// # Example
766///
767/// ```ignore
768/// // Setup
769/// let src = notify.register_source();
770/// registry.insert(InstrumentId("BTC"), src);
771///
772/// // Runtime lookup (cold path — from event handler)
773/// let src = registry.get(&InstrumentId("BTC")).unwrap();
774/// notify.register(|t| ctx, step, reg).subscribe(src);
775///
776/// // Hot path — DataSource pre-resolved, no registry involvement
777/// notify.mark(src);
778/// ```
779#[derive(Default)]
780pub struct SourceRegistry {
781    maps: FxHashMap<TypeId, Box<dyn Any + Send>>,
782}
783
784impl SourceRegistry {
785    /// Create an empty registry.
786    pub fn new() -> Self {
787        Self::default()
788    }
789
790    /// Map a typed key to a [`DataSource`].
791    ///
792    /// Overwrites any previous mapping for this key.
793    pub fn insert<K: Hash + Eq + Send + 'static>(&mut self, key: K, source: DataSource) {
794        self.get_or_create_map::<K>().insert(key, source);
795    }
796
797    /// Look up a [`DataSource`] by typed key.
798    ///
799    /// Returns `None` if the key is not registered.
800    pub fn get<K: Hash + Eq + Send + 'static>(&self, key: &K) -> Option<DataSource> {
801        self.get_map::<K>().and_then(|map| map.get(key)).copied()
802    }
803
804    /// Remove a key mapping. Returns the [`DataSource`] so the caller
805    /// can also call [`ReactorNotify::remove_source`] to free the slot.
806    pub fn remove<K: Hash + Eq + Send + 'static>(&mut self, key: &K) -> Option<DataSource> {
807        self.get_map_mut::<K>().and_then(|map| map.remove(key))
808    }
809
810    /// Returns `true` if the key is mapped.
811    pub fn contains<K: Hash + Eq + Send + 'static>(&self, key: &K) -> bool {
812        self.get_map::<K>().is_some_and(|map| map.contains_key(key))
813    }
814
815    fn get_map<K: Hash + Eq + Send + 'static>(&self) -> Option<&FxHashMap<K, DataSource>> {
816        self.maps.get(&TypeId::of::<K>()).map(|boxed| {
817            // Invariant: map was inserted with type K, so downcast always succeeds.
818            boxed.downcast_ref::<FxHashMap<K, DataSource>>().unwrap()
819        })
820    }
821
822    fn get_map_mut<K: Hash + Eq + Send + 'static>(
823        &mut self,
824    ) -> Option<&mut FxHashMap<K, DataSource>> {
825        self.maps.get_mut(&TypeId::of::<K>()).map(|boxed| {
826            // Invariant: map was inserted with type K, so downcast always succeeds.
827            boxed.downcast_mut::<FxHashMap<K, DataSource>>().unwrap()
828        })
829    }
830
831    fn get_or_create_map<K: Hash + Eq + Send + 'static>(
832        &mut self,
833    ) -> &mut FxHashMap<K, DataSource> {
834        self.maps
835            .entry(TypeId::of::<K>())
836            .or_insert_with(|| Box::<FxHashMap<K, DataSource>>::default())
837            // Invariant: entry was just created or retrieved with type K.
838            .downcast_mut::<FxHashMap<K, DataSource>>()
839            .unwrap()
840    }
841}
842
843impl Resource for SourceRegistry {}
844
845// =============================================================================
846// ReactorSystem — thin dispatch handle
847// =============================================================================
848
849/// Lightweight dispatch handle for the reactor system.
850///
851/// Sits at the driver level (same as mio `Poll` or timer poller).
852/// Reads [`ReactorNotify`] via pre-resolved [`ResourceId`] during
853/// dispatch. All reactor storage and registration lives in
854/// [`ReactorNotify`] (World resource).
855pub struct ReactorSystem {
856    /// Pre-allocated events buffer for polling.
857    events: Events,
858
859    /// Pre-resolved resource IDs for reaching into World.
860    notify_id: ResourceId,
861    removals_id: ResourceId,
862}
863
864impl ReactorSystem {
865    /// Create a dispatch handle from a built [`World`].
866    ///
867    /// The World must contain [`ReactorNotify`] and [`DeferredRemovals`].
868    pub fn new(world: &World) -> Self {
869        Self {
870            events: Events::with_capacity(256),
871            notify_id: world.id::<ReactorNotify>(),
872            removals_id: world.id::<DeferredRemovals>(),
873        }
874    }
875
876    /// Dispatch all woken reactors and process deferred removals.
877    ///
878    /// 1. Polls [`ReactorNotify`] for woken reactor tokens (deduped)
879    /// 2. Runs each reactor's step function with pre-resolved Params
880    /// 3. Drains [`DeferredRemovals`] and removes reactors
881    ///
882    /// Returns `true` if any reactor ran (for scheduler propagation).
883    pub fn dispatch(&mut self, world: &mut World) -> bool {
884        // SAFETY: notify_id was resolved from the same WorldBuilder.
885        // ReactorNotify is heap-allocated — pointer stable for World's lifetime.
886        let notify_ptr: *mut ReactorNotify =
887            unsafe { world.get_mut::<ReactorNotify>(self.notify_id) };
888
889        // Poll — scoped &mut, dropped before reactor dispatch.
890        {
891            let notify = unsafe { &mut *notify_ptr };
892            notify.poll(&mut self.events);
893        }
894        let ran = !self.events.is_empty();
895
896        // Dispatch — each reactor is moved out before run(), put back after.
897        // &mut ReactorNotify is scoped tightly to avoid aliasing during run().
898        for token in self.events.iter() {
899            let idx = token.index();
900            // SAFETY: notify_ptr is valid for World's lifetime. Scoped &mut
901            // is dropped before reactor.run() to avoid aliasing.
902            let reactor = {
903                let notify = unsafe { &mut *notify_ptr };
904                notify.take_reactor(idx)
905            };
906            if let Some(mut reactor) = reactor {
907                reactor.run(world);
908                // SAFETY: re-derive &mut after run() completes. No aliasing —
909                // reactor was moved out of the slab during run().
910                let notify = unsafe { &mut *notify_ptr };
911                notify.put_reactor(idx, reactor);
912            }
913        }
914
915        // Deferred removals — swap Vec out to avoid holding two &mut.
916        // Zero allocation: Vec is swapped back and reused next frame.
917        // SAFETY: removals_id from same WorldBuilder. Dispatch complete.
918        let removals = unsafe { world.get_mut::<DeferredRemovals>(self.removals_id) };
919        let mut pending = removals.take();
920        if !pending.is_empty() {
921            // SAFETY: re-derive &mut for cleanup phase. No other references.
922            let notify = unsafe { &mut *notify_ptr };
923            while let Some(token) = pending.pop() {
924                notify.remove_reactor(token);
925            }
926        }
927        // Put the (now empty) Vec back for reuse.
928        let removals = unsafe { world.get_mut::<DeferredRemovals>(self.removals_id) };
929        removals.put(pending);
930
931        ran
932    }
933
934    /// Number of live reactors.
935    pub fn reactor_count(&self, world: &World) -> usize {
936        world.resource::<ReactorNotify>().reactor_count()
937    }
938}
939
940// =============================================================================
941// Tests
942// =============================================================================
943
944#[cfg(test)]
945mod tests {
946    use super::*;
947    use crate::{Res, ResMut, WorldBuilder};
948
949    // -- Reactor trait dispatch --------------------------------------------------
950
951    #[test]
952    fn reactor_fn_arity0() {
953        let wb = WorldBuilder::new();
954        let mut world = wb.build();
955        let reg = world.registry();
956
957        struct Ctx {
958            count: u32,
959        }
960
961        fn step(ctx: &mut Ctx) {
962            ctx.count += 1;
963        }
964
965        let mut reactor = step.into_reactor(Ctx { count: 0 }, reg);
966        reactor.run(&mut world);
967        assert_eq!(reactor.ctx.count, 1);
968    }
969
970    #[test]
971    fn reactor_fn_with_params() {
972        let mut wb = WorldBuilder::new();
973        wb.register::<u64>(10);
974        wb.register::<u32>(0);
975        let mut world = wb.build();
976        let reg = world.registry();
977
978        struct Ctx {
979            multiplier: u64,
980        }
981
982        fn step(ctx: &mut Ctx, val: Res<u64>, mut out: ResMut<u32>) {
983            *out = (*val * ctx.multiplier) as u32;
984        }
985
986        let mut reactor = step.into_reactor(Ctx { multiplier: 5 }, reg);
987        reactor.run(&mut world);
988        assert_eq!(*world.resource::<u32>(), 50);
989    }
990
991    // -- ReactorNotify ----------------------------------------------------------
992
993    fn dummy_reactor() -> ReactorFn<(), fn(&mut ()), ()> {
994        ReactorFn {
995            ctx: (),
996            f: (|_: &mut ()| {}) as fn(&mut ()),
997            state: (),
998            name: "dummy",
999        }
1000    }
1001
1002    #[test]
1003    fn reactor_notify_mark_fans_out() {
1004        let mut notify = ReactorNotify::new(4, 8);
1005        let mut events = Events::with_capacity(8);
1006
1007        let src = notify.register_source();
1008        let a1 = notify.register_built(dummy_reactor()).token();
1009        let a2 = notify.register_built(dummy_reactor()).token();
1010        let _a3 = notify.register_built(dummy_reactor()).token();
1011
1012        notify.subscribe(a1, src);
1013        notify.subscribe(a2, src);
1014        // _a3 not subscribed
1015
1016        notify.mark(src);
1017        notify.notify.poll(&mut events);
1018
1019        assert_eq!(events.len(), 2);
1020        assert!(events.as_slice().contains(&a1));
1021        assert!(events.as_slice().contains(&a2));
1022    }
1023
1024    #[test]
1025    fn reactor_notify_dedup_across_sources() {
1026        let mut notify = ReactorNotify::new(4, 8);
1027        let mut events = Events::with_capacity(8);
1028
1029        let src1 = notify.register_source();
1030        let src2 = notify.register_source();
1031        let reactor = notify.register_built(dummy_reactor()).token();
1032
1033        notify.subscribe(reactor, src1);
1034        notify.subscribe(reactor, src2);
1035
1036        notify.mark(src1);
1037        notify.mark(src2);
1038
1039        notify.notify.poll(&mut events);
1040        assert_eq!(events.len(), 1);
1041        assert_eq!(events.as_slice()[0], reactor);
1042    }
1043
1044    #[test]
1045    fn reactor_notify_remove_reactor() {
1046        let mut notify = ReactorNotify::new(4, 8);
1047        let mut events = Events::with_capacity(8);
1048
1049        let src = notify.register_source();
1050
1051        struct Ctx;
1052        let token = notify
1053            .register_built(ReactorFn {
1054                ctx: Ctx,
1055                f: (|_: &mut Ctx| {}) as fn(&mut Ctx),
1056                state: (),
1057                name: "test",
1058            })
1059            .token();
1060        notify.subscribe(token, src);
1061
1062        notify.remove_reactor(token);
1063        notify.mark(src);
1064        notify.notify.poll(&mut events);
1065        assert!(events.is_empty());
1066    }
1067
1068    // -- Full ReactorSystem integration -----------------------------------------
1069
1070    // Helper: registry() borrows &World, resource_mut() borrows &mut World.
1071    // In tests, we use unsafe get_mut via the notify_id to avoid the conflict,
1072    // same pattern as production dispatch code.
1073    fn notify_mut(world: &World, id: ResourceId) -> &mut ReactorNotify {
1074        unsafe { world.get_mut::<ReactorNotify>(id) }
1075    }
1076
1077    #[test]
1078    fn reactor_system_dispatch() {
1079        let mut wb = WorldBuilder::new();
1080        wb.register::<u64>(0);
1081        wb.register(ReactorNotify::new(4, 8));
1082        wb.register(DeferredRemovals::default());
1083        let mut world = wb.build();
1084        let reg = world.registry();
1085        let nid = world.id::<ReactorNotify>();
1086
1087        let mut system = ReactorSystem::new(&world);
1088
1089        struct Ctx {
1090            _reactor_id: Token,
1091            increment: u64,
1092        }
1093
1094        fn step(ctx: &mut Ctx, mut val: ResMut<u64>) {
1095            *val += ctx.increment;
1096        }
1097
1098        let notify = notify_mut(&world, nid);
1099        let src = notify.register_source();
1100        notify
1101            .register(
1102                |t| Ctx {
1103                    _reactor_id: t,
1104                    increment: 10,
1105                },
1106                step,
1107                reg,
1108            )
1109            .subscribe(src);
1110        notify
1111            .register(
1112                |t| Ctx {
1113                    _reactor_id: t,
1114                    increment: 5,
1115                },
1116                step,
1117                reg,
1118            )
1119            .subscribe(src);
1120
1121        // Mark and dispatch
1122        notify_mut(&world, nid).mark(src);
1123        let ran = system.dispatch(&mut world);
1124
1125        assert!(ran);
1126        assert_eq!(*world.resource::<u64>(), 15); // 10 + 5
1127    }
1128
1129    #[test]
1130    fn reactor_system_deferred_removal() {
1131        let mut wb = WorldBuilder::new();
1132        wb.register::<u64>(0);
1133        wb.register(ReactorNotify::new(4, 8));
1134        wb.register(DeferredRemovals::default());
1135        let mut world = wb.build();
1136        let reg = world.registry();
1137        let nid = world.id::<ReactorNotify>();
1138
1139        let mut system = ReactorSystem::new(&world);
1140
1141        struct Ctx {
1142            reactor_id: Token,
1143            runs: u64,
1144        }
1145
1146        fn step(ctx: &mut Ctx, mut val: ResMut<u64>, mut removals: ResMut<DeferredRemovals>) {
1147            *val += 1;
1148            ctx.runs += 1;
1149            if ctx.runs >= 2 {
1150                removals.deregister(ctx.reactor_id);
1151            }
1152        }
1153
1154        let notify = notify_mut(&world, nid);
1155        let src = notify.register_source();
1156        notify
1157            .register(
1158                |t| Ctx {
1159                    reactor_id: t,
1160                    runs: 0,
1161                },
1162                step,
1163                reg,
1164            )
1165            .subscribe(src);
1166
1167        assert_eq!(system.reactor_count(&world), 1);
1168
1169        // Frame 1 — reactor runs, runs=1, no removal
1170        notify_mut(&world, nid).mark(src);
1171        system.dispatch(&mut world);
1172        assert_eq!(*world.resource::<u64>(), 1);
1173        assert_eq!(system.reactor_count(&world), 1);
1174
1175        // Frame 2 — reactor runs, runs=2, deregisters
1176        notify_mut(&world, nid).mark(src);
1177        system.dispatch(&mut world);
1178        assert_eq!(*world.resource::<u64>(), 2);
1179        assert_eq!(system.reactor_count(&world), 0);
1180
1181        // Frame 3 — no reactors, nothing runs
1182        notify_mut(&world, nid).mark(src);
1183        let ran = system.dispatch(&mut world);
1184        assert!(!ran);
1185        assert_eq!(*world.resource::<u64>(), 2);
1186    }
1187
1188    #[test]
1189    fn reactor_system_only_subscribed_wake() {
1190        let mut wb = WorldBuilder::new();
1191        wb.register::<u64>(0);
1192        wb.register(ReactorNotify::new(4, 8));
1193        wb.register(DeferredRemovals::default());
1194        let mut world = wb.build();
1195        let reg = world.registry();
1196        let nid = world.id::<ReactorNotify>();
1197
1198        let mut system = ReactorSystem::new(&world);
1199
1200        struct Ctx {
1201            _reactor_id: Token,
1202            value: u64,
1203        }
1204
1205        fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
1206            *out += ctx.value;
1207        }
1208
1209        let notify = notify_mut(&world, nid);
1210        let btc = notify.register_source();
1211        let eth = notify.register_source();
1212
1213        notify
1214            .register(
1215                |t| Ctx {
1216                    _reactor_id: t,
1217                    value: 10,
1218                },
1219                step,
1220                reg,
1221            )
1222            .subscribe(btc);
1223        notify
1224            .register(
1225                |t| Ctx {
1226                    _reactor_id: t,
1227                    value: 100,
1228                },
1229                step,
1230                reg,
1231            )
1232            .subscribe(eth);
1233
1234        // Only BTC fires — only reactor 1 runs
1235        notify_mut(&world, nid).mark(btc);
1236        system.dispatch(&mut world);
1237        assert_eq!(*world.resource::<u64>(), 10);
1238
1239        // ETH fires — reactor 2 runs
1240        notify_mut(&world, nid).mark(eth);
1241        system.dispatch(&mut world);
1242        assert_eq!(*world.resource::<u64>(), 110);
1243    }
1244
1245    #[test]
1246    fn runtime_registration() {
1247        let mut wb = WorldBuilder::new();
1248        wb.register::<u64>(0);
1249        wb.register(ReactorNotify::new(4, 8));
1250        wb.register(DeferredRemovals::default());
1251        let mut world = wb.build();
1252        let nid = world.id::<ReactorNotify>();
1253
1254        let mut system = ReactorSystem::new(&world);
1255
1256        struct Ctx {
1257            _reactor_id: Token,
1258            value: u64,
1259        }
1260
1261        fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
1262            *out += ctx.value;
1263        }
1264
1265        // Register source and first reactor at setup
1266        let src = {
1267            let reg = world.registry();
1268            let notify = notify_mut(&world, nid);
1269            let src = notify.register_source();
1270            notify
1271                .register(
1272                    |t| Ctx {
1273                        _reactor_id: t,
1274                        value: 10,
1275                    },
1276                    step,
1277                    reg,
1278                )
1279                .subscribe(src);
1280            src
1281        };
1282
1283        // Frame 1 — one reactor
1284        notify_mut(&world, nid).mark(src);
1285        system.dispatch(&mut world);
1286        assert_eq!(*world.resource::<u64>(), 10);
1287
1288        // Runtime registration (simulates admin command handler)
1289        {
1290            let reg = world.registry();
1291            notify_mut(&world, nid)
1292                .register(
1293                    |t| Ctx {
1294                        _reactor_id: t,
1295                        value: 100,
1296                    },
1297                    step,
1298                    reg,
1299                )
1300                .subscribe(src);
1301        }
1302
1303        // Frame 2 — both reactors fire
1304        notify_mut(&world, nid).mark(src);
1305        system.dispatch(&mut world);
1306        assert_eq!(*world.resource::<u64>(), 120); // 10 + 10 + 100
1307    }
1308
1309    #[test]
1310    fn register_after_remove_reuses_key() {
1311        let mut wb = WorldBuilder::new();
1312        wb.register::<u64>(0);
1313        wb.register(ReactorNotify::new(4, 8));
1314        wb.register(DeferredRemovals::default());
1315        let mut world = wb.build();
1316        let nid = world.id::<ReactorNotify>();
1317
1318        let mut system = ReactorSystem::new(&world);
1319
1320        struct Ctx {
1321            reactor_id: Token,
1322            value: u64,
1323        }
1324
1325        fn step(ctx: &mut Ctx, mut out: ResMut<u64>, mut removals: ResMut<DeferredRemovals>) {
1326            *out += ctx.value;
1327            if ctx.value == 10 {
1328                removals.deregister(ctx.reactor_id);
1329            }
1330        }
1331
1332        let src = {
1333            let reg = world.registry();
1334            let notify = notify_mut(&world, nid);
1335            let src = notify.register_source();
1336            notify
1337                .register(
1338                    |t| Ctx {
1339                        reactor_id: t,
1340                        value: 10,
1341                    },
1342                    step,
1343                    reg,
1344                )
1345                .subscribe(src);
1346            src
1347        };
1348
1349        // Frame 1 — reactor runs and deregisters itself
1350        notify_mut(&world, nid).mark(src);
1351        system.dispatch(&mut world);
1352        assert_eq!(*world.resource::<u64>(), 10);
1353        assert_eq!(system.reactor_count(&world), 0);
1354
1355        // Register a NEW reactor — should reuse key 0
1356        {
1357            let reg = world.registry();
1358            let notify = notify_mut(&world, nid);
1359            let token = notify
1360                .register(
1361                    |t| Ctx {
1362                        reactor_id: t,
1363                        value: 100,
1364                    },
1365                    step,
1366                    reg,
1367                )
1368                .token();
1369            notify.subscribe(token, src);
1370            assert_eq!(token.index(), 0); // slab reused key 0
1371        }
1372
1373        // Frame 2 — new reactor runs correctly
1374        notify_mut(&world, nid).mark(src);
1375        system.dispatch(&mut world);
1376        assert_eq!(*world.resource::<u64>(), 110); // 10 + 100
1377        assert_eq!(system.reactor_count(&world), 1); // still alive (value != 10)
1378    }
1379
1380    #[test]
1381    fn reactor_can_access_actor_notify() {
1382        // Verify no aliasing UB: the move-out-move-back pattern
1383        // allows reactors to safely access ReactorNotify via ResMut.
1384        let mut wb = WorldBuilder::new();
1385        wb.register::<u64>(0);
1386        wb.register(ReactorNotify::new(4, 8));
1387        wb.register(DeferredRemovals::default());
1388        let mut world = wb.build();
1389        let nid = world.id::<ReactorNotify>();
1390
1391        let mut system = ReactorSystem::new(&world);
1392
1393        struct Ctx {
1394            _reactor_id: Token,
1395        }
1396
1397        fn step(_ctx: &mut Ctx, notify: ResMut<ReactorNotify>, mut out: ResMut<u64>) {
1398            // Reactor reads ReactorNotify — this would be UB without move-out
1399            *out = notify.reactor_count() as u64;
1400        }
1401
1402        let src = {
1403            let reg = world.registry();
1404            let notify = notify_mut(&world, nid);
1405            let src = notify.register_source();
1406            notify
1407                .register(|t| Ctx { _reactor_id: t }, step, reg)
1408                .subscribe(src);
1409            src
1410        };
1411
1412        notify_mut(&world, nid).mark(src);
1413        system.dispatch(&mut world);
1414        // Reactor count is 1, but during run() the reactor was taken out,
1415        // so reactor_count() sees 0 reactors with Some values... actually
1416        // slab.len() still counts the slot as occupied. The Option is
1417        // None but the slab entry exists. Let's just verify no panic.
1418        // The important thing is no aliasing UB.
1419    }
1420
1421    // -- Realistic data source patterns ---------------------------------------
1422
1423    #[test]
1424    fn multi_instrument_with_shared_source() {
1425        // Pattern: per-instrument market data sources + a shared "positions"
1426        // source. Quoting reactors subscribe to their instrument + positions.
1427        let mut wb = WorldBuilder::new();
1428        wb.register::<u64>(0);
1429        wb.register(ReactorNotify::new(8, 16));
1430        wb.register(DeferredRemovals::default());
1431        let mut world = wb.build();
1432        let nid = world.id::<ReactorNotify>();
1433        let mut system = ReactorSystem::new(&world);
1434
1435        struct Ctx {
1436            _reactor_id: Token,
1437            instrument: &'static str,
1438        }
1439
1440        fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
1441            // Each reactor increments by its instrument's "value"
1442            *out += match ctx.instrument {
1443                "BTC" => 100,
1444                "ETH" => 10,
1445                "SOL" => 1,
1446                _ => 0,
1447            };
1448        }
1449
1450        let (btc_md, eth_md, sol_md, positions) = {
1451            let reg = world.registry();
1452            let notify = notify_mut(&world, nid);
1453
1454            // Per-instrument data sources
1455            let btc_md = notify.register_source();
1456            let eth_md = notify.register_source();
1457            let sol_md = notify.register_source();
1458            // Shared source
1459            let positions = notify.register_source();
1460
1461            // BTC reactor: subscribes to btc_md + positions
1462            notify
1463                .register(
1464                    |t| Ctx {
1465                        _reactor_id: t,
1466                        instrument: "BTC",
1467                    },
1468                    step,
1469                    reg,
1470                )
1471                .subscribe(btc_md)
1472                .subscribe(positions);
1473
1474            // ETH reactor: subscribes to eth_md + positions
1475            notify
1476                .register(
1477                    |t| Ctx {
1478                        _reactor_id: t,
1479                        instrument: "ETH",
1480                    },
1481                    step,
1482                    reg,
1483                )
1484                .subscribe(eth_md)
1485                .subscribe(positions);
1486
1487            // SOL reactor: subscribes to sol_md + positions
1488            notify
1489                .register(
1490                    |t| Ctx {
1491                        _reactor_id: t,
1492                        instrument: "SOL",
1493                    },
1494                    step,
1495                    reg,
1496                )
1497                .subscribe(sol_md)
1498                .subscribe(positions);
1499
1500            (btc_md, eth_md, sol_md, positions)
1501        };
1502
1503        // Only BTC data changes — only BTC reactor wakes
1504        notify_mut(&world, nid).mark(btc_md);
1505        system.dispatch(&mut world);
1506        assert_eq!(*world.resource::<u64>(), 100);
1507
1508        // Position update — ALL reactors wake (shared source), deduped
1509        notify_mut(&world, nid).mark(positions);
1510        system.dispatch(&mut world);
1511        assert_eq!(*world.resource::<u64>(), 211); // 100 + 100 + 10 + 1
1512
1513        // BTC + ETH data change in same frame — both reactors wake once
1514        notify_mut(&world, nid).mark(btc_md);
1515        notify_mut(&world, nid).mark(eth_md);
1516        system.dispatch(&mut world);
1517        assert_eq!(*world.resource::<u64>(), 321); // 211 + 100 + 10
1518
1519        // BTC data + position in same frame — BTC reactor wakes ONCE (dedup)
1520        notify_mut(&world, nid).mark(btc_md);
1521        notify_mut(&world, nid).mark(positions);
1522        system.dispatch(&mut world);
1523        // BTC: 100 (deduped, only once), ETH: 10, SOL: 1
1524        assert_eq!(*world.resource::<u64>(), 432); // 321 + 100 + 10 + 1
1525
1526        // Nothing fires — no reactors wake
1527        let ran = system.dispatch(&mut world);
1528        assert!(!ran);
1529        assert_eq!(*world.resource::<u64>(), 432);
1530
1531        // SOL data only — only SOL reactor
1532        notify_mut(&world, nid).mark(sol_md);
1533        system.dispatch(&mut world);
1534        assert_eq!(*world.resource::<u64>(), 433);
1535    }
1536
1537    #[test]
1538    fn per_reactor_fill_routing() {
1539        // Pattern: each reactor gets its own DataSource for fill routing.
1540        // Wire protocol embeds the token index in the order client_id.
1541        // When a fill arrives, the handler marks the reactor's source directly.
1542        use std::collections::HashMap;
1543
1544        let mut wb = WorldBuilder::new();
1545        wb.register::<u64>(0);
1546        wb.register(ReactorNotify::new(8, 16));
1547        wb.register(DeferredRemovals::default());
1548        let mut world = wb.build();
1549        let nid = world.id::<ReactorNotify>();
1550        let mut system = ReactorSystem::new(&world);
1551
1552        struct Ctx {
1553            reactor_id: Token,
1554        }
1555
1556        fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
1557            *out += ctx.reactor_id.index() as u64 + 1;
1558        }
1559
1560        // Routing table: token index → reactor's fill source
1561        let mut fill_sources: HashMap<usize, DataSource> = HashMap::new();
1562
1563        {
1564            let reg = world.registry();
1565            let notify = notify_mut(&world, nid);
1566
1567            for _ in 0..3 {
1568                // Each reactor gets its own fill source
1569                let fill_src = notify.register_source();
1570                let token = notify
1571                    .register(|t| Ctx { reactor_id: t }, step, reg)
1572                    .subscribe(fill_src)
1573                    .token();
1574
1575                fill_sources.insert(token.index(), fill_src);
1576            }
1577        }
1578
1579        // Simulate fill arriving for reactor 1 — look up its source by wire ID
1580        let wire_client_id: usize = 1;
1581        let fill_source = fill_sources[&wire_client_id];
1582        notify_mut(&world, nid).mark(fill_source);
1583        system.dispatch(&mut world);
1584        // Only reactor 1 ran: token.index()=1, so += 2
1585        assert_eq!(*world.resource::<u64>(), 2);
1586
1587        // Fill for reactor 0
1588        let fill_source = fill_sources[&0];
1589        notify_mut(&world, nid).mark(fill_source);
1590        system.dispatch(&mut world);
1591        // Reactor 0: token.index()=0, += 1
1592        assert_eq!(*world.resource::<u64>(), 3);
1593    }
1594
1595    #[test]
1596    fn dynamic_source_registration() {
1597        // Pattern: data sources added at runtime when new instruments
1598        // come online. Reactors registered and subscribed dynamically.
1599        let mut wb = WorldBuilder::new();
1600        wb.register::<u64>(0);
1601        wb.register(ReactorNotify::new(4, 8));
1602        wb.register(DeferredRemovals::default());
1603        let mut world = wb.build();
1604        let nid = world.id::<ReactorNotify>();
1605        let mut system = ReactorSystem::new(&world);
1606
1607        struct Ctx {
1608            _reactor_id: Token,
1609            value: u64,
1610        }
1611
1612        fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
1613            *out += ctx.value;
1614        }
1615
1616        // Start with just BTC
1617        let btc_md = {
1618            let reg = world.registry();
1619            let notify = notify_mut(&world, nid);
1620            let btc_md = notify.register_source();
1621            notify
1622                .register(
1623                    |t| Ctx {
1624                        _reactor_id: t,
1625                        value: 10,
1626                    },
1627                    step,
1628                    reg,
1629                )
1630                .subscribe(btc_md);
1631            btc_md
1632        };
1633
1634        notify_mut(&world, nid).mark(btc_md);
1635        system.dispatch(&mut world);
1636        assert_eq!(*world.resource::<u64>(), 10);
1637
1638        // Runtime: new instrument comes online — register source + reactor
1639        let eth_md = {
1640            let reg = world.registry();
1641            let notify = notify_mut(&world, nid);
1642            let eth_md = notify.register_source();
1643            notify
1644                .register(
1645                    |t| Ctx {
1646                        _reactor_id: t,
1647                        value: 100,
1648                    },
1649                    step,
1650                    reg,
1651                )
1652                .subscribe(eth_md);
1653            eth_md
1654        };
1655
1656        // Both instruments fire
1657        notify_mut(&world, nid).mark(btc_md);
1658        notify_mut(&world, nid).mark(eth_md);
1659        system.dispatch(&mut world);
1660        assert_eq!(*world.resource::<u64>(), 120); // 10 + 10 + 100
1661    }
1662
1663    // -- Source removal + slab reuse ------------------------------------------
1664
1665    #[test]
1666    fn remove_source_and_reuse_slot() {
1667        let mut wb = WorldBuilder::new();
1668        wb.register::<u64>(0);
1669        wb.register(ReactorNotify::new(4, 8));
1670        wb.register(DeferredRemovals::default());
1671        let mut world = wb.build();
1672        let nid = world.id::<ReactorNotify>();
1673        let mut system = ReactorSystem::new(&world);
1674
1675        struct Ctx {
1676            _reactor_id: Token,
1677            value: u64,
1678        }
1679
1680        fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
1681            *out += ctx.value;
1682        }
1683
1684        // Register two sources
1685        let (src_a, src_b) = {
1686            let reg = world.registry();
1687            let notify = notify_mut(&world, nid);
1688            let src_a = notify.register_source();
1689            let src_b = notify.register_source();
1690            notify
1691                .register(
1692                    |t| Ctx {
1693                        _reactor_id: t,
1694                        value: 10,
1695                    },
1696                    step,
1697                    reg,
1698                )
1699                .subscribe(src_a);
1700            notify
1701                .register(
1702                    |t| Ctx {
1703                        _reactor_id: t,
1704                        value: 100,
1705                    },
1706                    step,
1707                    reg,
1708                )
1709                .subscribe(src_b);
1710            (src_a, src_b)
1711        };
1712
1713        // Remove source A
1714        notify_mut(&world, nid).remove_source(src_a);
1715
1716        // Marking removed source is a no-op
1717        notify_mut(&world, nid).mark(src_a);
1718        let ran = system.dispatch(&mut world);
1719        assert!(!ran);
1720
1721        // Source B still works
1722        notify_mut(&world, nid).mark(src_b);
1723        system.dispatch(&mut world);
1724        assert_eq!(*world.resource::<u64>(), 100);
1725
1726        // Register a new source — should reuse slot 0
1727        let src_c = notify_mut(&world, nid).register_source();
1728        assert_eq!(src_c.0, src_a.0); // slab reused the slot
1729
1730        // Subscribe reactor to new source and verify it works
1731        let reg = world.registry();
1732        let notify = notify_mut(&world, nid);
1733        notify
1734            .register(
1735                |t| Ctx {
1736                    _reactor_id: t,
1737                    value: 1,
1738                },
1739                step,
1740                reg,
1741            )
1742            .subscribe(src_c);
1743
1744        notify_mut(&world, nid).mark(src_c);
1745        system.dispatch(&mut world);
1746        assert_eq!(*world.resource::<u64>(), 101); // 100 + 1
1747    }
1748
1749    // -- SourceRegistry -------------------------------------------------------
1750
1751    #[test]
1752    fn source_registry_basic() {
1753        let mut registry = SourceRegistry::new();
1754
1755        #[derive(Hash, Eq, PartialEq, Debug)]
1756        struct InstrumentId(u32);
1757
1758        let src_a = DataSource(0);
1759        let src_b = DataSource(1);
1760
1761        registry.insert(InstrumentId(1), src_a);
1762        registry.insert(InstrumentId(2), src_b);
1763
1764        assert_eq!(registry.get(&InstrumentId(1)), Some(src_a));
1765        assert_eq!(registry.get(&InstrumentId(2)), Some(src_b));
1766        assert_eq!(registry.get(&InstrumentId(3)), None);
1767        assert!(registry.contains(&InstrumentId(1)));
1768        assert!(!registry.contains(&InstrumentId(3)));
1769    }
1770
1771    #[test]
1772    fn source_registry_multiple_key_types() {
1773        let mut registry = SourceRegistry::new();
1774
1775        #[derive(Hash, Eq, PartialEq)]
1776        struct InstrumentId(u32);
1777
1778        #[derive(Hash, Eq, PartialEq)]
1779        struct StrategyId(u32);
1780
1781        let src_a = DataSource(0);
1782        let src_b = DataSource(1);
1783
1784        // Same registry, different key types
1785        registry.insert(InstrumentId(1), src_a);
1786        registry.insert(StrategyId(1), src_b);
1787
1788        // Different type namespaces — same inner value (1), different results
1789        assert_eq!(registry.get(&InstrumentId(1)), Some(src_a));
1790        assert_eq!(registry.get(&StrategyId(1)), Some(src_b));
1791    }
1792
1793    #[test]
1794    fn source_registry_tuple_keys() {
1795        let mut registry = SourceRegistry::new();
1796
1797        let src = DataSource(42);
1798        registry.insert(("BTC", "Binance"), src);
1799
1800        assert_eq!(registry.get(&("BTC", "Binance")), Some(src));
1801        assert_eq!(registry.get(&("ETH", "Binance")), None);
1802    }
1803
1804    #[test]
1805    fn source_registry_remove() {
1806        let mut registry = SourceRegistry::new();
1807
1808        let src = DataSource(0);
1809        registry.insert(42u64, src);
1810
1811        assert_eq!(registry.remove(&42u64), Some(src));
1812        assert_eq!(registry.get(&42u64), None);
1813        assert_eq!(registry.remove(&42u64), None); // already gone
1814    }
1815
1816    #[test]
1817    fn source_registry_integrated_with_reactor_system() {
1818        let mut wb = WorldBuilder::new();
1819        wb.register::<u64>(0);
1820        wb.register(ReactorNotify::new(4, 8));
1821        wb.register(DeferredRemovals::default());
1822        wb.register(SourceRegistry::new());
1823        let mut world = wb.build();
1824        let nid = world.id::<ReactorNotify>();
1825        let mut system = ReactorSystem::new(&world);
1826
1827        #[derive(Hash, Eq, PartialEq, Clone, Copy)]
1828        struct Instrument(u32);
1829        const BTC: Instrument = Instrument(0);
1830        const ETH: Instrument = Instrument(1);
1831
1832        struct Ctx {
1833            _reactor_id: Token,
1834            value: u64,
1835        }
1836
1837        fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
1838            *out += ctx.value;
1839        }
1840
1841        // Setup: register sources and map to natural keys
1842        let btc_src = notify_mut(&world, nid).register_source();
1843        let eth_src = notify_mut(&world, nid).register_source();
1844
1845        world.resource_mut::<SourceRegistry>().insert(BTC, btc_src);
1846        world.resource_mut::<SourceRegistry>().insert(ETH, eth_src);
1847
1848        // Register reactors using natural key lookup
1849        {
1850            let reg = world.registry();
1851            let btc = world.resource::<SourceRegistry>().get(&BTC).unwrap();
1852            let notify = notify_mut(&world, nid);
1853            notify
1854                .register(
1855                    |t| Ctx {
1856                        _reactor_id: t,
1857                        value: 10,
1858                    },
1859                    step,
1860                    reg,
1861                )
1862                .subscribe(btc);
1863        }
1864
1865        // Mark via pre-resolved DataSource
1866        notify_mut(&world, nid).mark(btc_src);
1867        system.dispatch(&mut world);
1868        assert_eq!(*world.resource::<u64>(), 10);
1869
1870        // Delist BTC — remove from both registry and notify
1871        let removed = world.resource_mut::<SourceRegistry>().remove(&BTC);
1872        assert!(removed.is_some());
1873        notify_mut(&world, nid).remove_source(removed.unwrap());
1874
1875        // Marking BTC is now a no-op
1876        notify_mut(&world, nid).mark(btc_src);
1877        let ran = system.dispatch(&mut world);
1878        assert!(!ran);
1879    }
1880
1881    // -- SourceRegistry edge cases --------------------------------------------
1882
1883    #[test]
1884    fn source_registry_overwrite_key() {
1885        let mut registry = SourceRegistry::new();
1886        let src_a = DataSource(0);
1887        let src_b = DataSource(1);
1888
1889        registry.insert(42u32, src_a);
1890        assert_eq!(registry.get(&42u32), Some(src_a));
1891
1892        // Overwrite same key with different source
1893        registry.insert(42u32, src_b);
1894        assert_eq!(registry.get(&42u32), Some(src_b));
1895    }
1896
1897    #[test]
1898    fn source_registry_empty_get() {
1899        let registry = SourceRegistry::new();
1900        // No key type has ever been registered
1901        assert_eq!(registry.get(&42u32), None);
1902        assert!(!registry.contains(&42u32));
1903    }
1904
1905    #[test]
1906    fn source_registry_enum_keys() {
1907        #[derive(Hash, Eq, PartialEq)]
1908        enum Venue {
1909            Binance,
1910            Coinbase,
1911        }
1912
1913        let mut registry = SourceRegistry::new();
1914        let src = DataSource(0);
1915        registry.insert(Venue::Binance, src);
1916
1917        assert_eq!(registry.get(&Venue::Binance), Some(src));
1918        assert_eq!(registry.get(&Venue::Coinbase), None);
1919    }
1920
1921    #[test]
1922    fn source_registry_composite_key() {
1923        // (Strategy, Instrument, Venue) triple as key
1924        #[derive(Hash, Eq, PartialEq)]
1925        struct StrategyId(u32);
1926        #[derive(Hash, Eq, PartialEq)]
1927        struct InstrumentId(u32);
1928        #[derive(Hash, Eq, PartialEq)]
1929        struct VenueId(u32);
1930
1931        let mut registry = SourceRegistry::new();
1932        let src = DataSource(5);
1933        registry.insert((StrategyId(1), InstrumentId(0), VenueId(2)), src);
1934
1935        assert_eq!(
1936            registry.get(&(StrategyId(1), InstrumentId(0), VenueId(2))),
1937            Some(src)
1938        );
1939        // Different strategy
1940        assert_eq!(
1941            registry.get(&(StrategyId(2), InstrumentId(0), VenueId(2))),
1942            None
1943        );
1944    }
1945
1946    // -- Full lifecycle scenarios ---------------------------------------------
1947
1948    #[test]
1949    fn full_lifecycle_add_trade_remove() {
1950        // Simulates: add instrument → reactors trade → delist → cleanup
1951        let mut wb = WorldBuilder::new();
1952        wb.register::<u64>(0);
1953        wb.register(ReactorNotify::new(4, 8));
1954        wb.register(DeferredRemovals::default());
1955        wb.register(SourceRegistry::new());
1956        let mut world = wb.build();
1957        let nid = world.id::<ReactorNotify>();
1958        let mut system = ReactorSystem::new(&world);
1959
1960        #[derive(Hash, Eq, PartialEq, Clone, Copy)]
1961        struct Instrument(u32);
1962
1963        struct Ctx {
1964            _reactor_id: Token,
1965            value: u64,
1966        }
1967
1968        fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
1969            *out += ctx.value;
1970        }
1971
1972        // Phase 1: Add BTC
1973        let btc_src = notify_mut(&world, nid).register_source();
1974        world
1975            .resource_mut::<SourceRegistry>()
1976            .insert(Instrument(0), btc_src);
1977
1978        {
1979            let reg = world.registry();
1980            let notify = notify_mut(&world, nid);
1981            notify
1982                .register(
1983                    |t| Ctx {
1984                        _reactor_id: t,
1985                        value: 10,
1986                    },
1987                    step,
1988                    reg,
1989                )
1990                .subscribe(btc_src);
1991        }
1992
1993        // Phase 2: Trade
1994        notify_mut(&world, nid).mark(btc_src);
1995        system.dispatch(&mut world);
1996        assert_eq!(*world.resource::<u64>(), 10);
1997
1998        // Phase 3: Add ETH dynamically
1999        let eth_src = notify_mut(&world, nid).register_source();
2000        world
2001            .resource_mut::<SourceRegistry>()
2002            .insert(Instrument(1), eth_src);
2003
2004        {
2005            let reg = world.registry();
2006            let notify = notify_mut(&world, nid);
2007            notify
2008                .register(
2009                    |t| Ctx {
2010                        _reactor_id: t,
2011                        value: 100,
2012                    },
2013                    step,
2014                    reg,
2015                )
2016                .subscribe(eth_src);
2017        }
2018
2019        // Both trade
2020        notify_mut(&world, nid).mark(btc_src);
2021        notify_mut(&world, nid).mark(eth_src);
2022        system.dispatch(&mut world);
2023        assert_eq!(*world.resource::<u64>(), 120);
2024
2025        // Phase 4: Delist BTC
2026        let removed = world
2027            .resource_mut::<SourceRegistry>()
2028            .remove(&Instrument(0));
2029        notify_mut(&world, nid).remove_source(removed.unwrap());
2030
2031        // Only ETH remains
2032        notify_mut(&world, nid).mark(eth_src);
2033        system.dispatch(&mut world);
2034        assert_eq!(*world.resource::<u64>(), 220);
2035
2036        // Phase 5: Add SOL — reuses BTC's old slab slot
2037        let sol_src = notify_mut(&world, nid).register_source();
2038        world
2039            .resource_mut::<SourceRegistry>()
2040            .insert(Instrument(2), sol_src);
2041        assert_eq!(sol_src.0, btc_src.0); // slab reused
2042
2043        {
2044            let reg = world.registry();
2045            let notify = notify_mut(&world, nid);
2046            notify
2047                .register(
2048                    |t| Ctx {
2049                        _reactor_id: t,
2050                        value: 1000,
2051                    },
2052                    step,
2053                    reg,
2054                )
2055                .subscribe(sol_src);
2056        }
2057
2058        // SOL + ETH fire
2059        notify_mut(&world, nid).mark(sol_src);
2060        notify_mut(&world, nid).mark(eth_src);
2061        system.dispatch(&mut world);
2062        assert_eq!(*world.resource::<u64>(), 1320); // 220 + 1000 + 100
2063    }
2064
2065    #[test]
2066    fn multi_strategy_same_instrument() {
2067        // Two strategies on the same instrument with different data sources
2068        let mut wb = WorldBuilder::new();
2069        wb.register::<u64>(0);
2070        wb.register(ReactorNotify::new(8, 16));
2071        wb.register(DeferredRemovals::default());
2072        wb.register(SourceRegistry::new());
2073        let mut world = wb.build();
2074        let nid = world.id::<ReactorNotify>();
2075        let mut system = ReactorSystem::new(&world);
2076
2077        #[derive(Hash, Eq, PartialEq, Clone, Copy)]
2078        struct StrategyInstrument(&'static str, &'static str);
2079
2080        struct Ctx {
2081            _reactor_id: Token,
2082            value: u64,
2083        }
2084
2085        fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
2086            *out += ctx.value;
2087        }
2088
2089        // Per-strategy+instrument sources
2090        let reg = world.registry();
2091        let notify = notify_mut(&world, nid);
2092
2093        let mm_btc = notify.register_source();
2094        let mm_eth = notify.register_source();
2095        let arb_btc = notify.register_source();
2096
2097        // Market maker on BTC and ETH
2098        notify
2099            .register(
2100                |t| Ctx {
2101                    _reactor_id: t,
2102                    value: 1,
2103                },
2104                step,
2105                reg,
2106            )
2107            .subscribe(mm_btc);
2108        notify
2109            .register(
2110                |t| Ctx {
2111                    _reactor_id: t,
2112                    value: 2,
2113                },
2114                step,
2115                reg,
2116            )
2117            .subscribe(mm_eth);
2118
2119        // Arb strategy on BTC
2120        notify
2121            .register(
2122                |t| Ctx {
2123                    _reactor_id: t,
2124                    value: 100,
2125                },
2126                step,
2127                reg,
2128            )
2129            .subscribe(arb_btc);
2130
2131        // Map composite keys
2132        world
2133            .resource_mut::<SourceRegistry>()
2134            .insert(StrategyInstrument("MM", "BTC"), mm_btc);
2135        world
2136            .resource_mut::<SourceRegistry>()
2137            .insert(StrategyInstrument("MM", "ETH"), mm_eth);
2138        world
2139            .resource_mut::<SourceRegistry>()
2140            .insert(StrategyInstrument("ARB", "BTC"), arb_btc);
2141
2142        // BTC data arrives — both MM-BTC and ARB-BTC should fire
2143        // But they're separate sources, so handler marks both:
2144        let mm_btc_src = world
2145            .resource::<SourceRegistry>()
2146            .get(&StrategyInstrument("MM", "BTC"))
2147            .unwrap();
2148        let arb_btc_src = world
2149            .resource::<SourceRegistry>()
2150            .get(&StrategyInstrument("ARB", "BTC"))
2151            .unwrap();
2152
2153        notify_mut(&world, nid).mark(mm_btc_src);
2154        notify_mut(&world, nid).mark(arb_btc_src);
2155        system.dispatch(&mut world);
2156        assert_eq!(*world.resource::<u64>(), 101); // 1 + 100
2157    }
2158
2159    #[test]
2160    fn reactor_self_removal_with_registry_cleanup() {
2161        // Reactor deregisters itself AND the handler cleans up the source
2162        let mut wb = WorldBuilder::new();
2163        wb.register::<u64>(0);
2164        wb.register(ReactorNotify::new(4, 8));
2165        wb.register(DeferredRemovals::default());
2166        wb.register(SourceRegistry::new());
2167        let mut world = wb.build();
2168        let nid = world.id::<ReactorNotify>();
2169        let mut system = ReactorSystem::new(&world);
2170
2171        struct Ctx {
2172            reactor_id: Token,
2173        }
2174
2175        fn one_shot(ctx: &mut Ctx, mut out: ResMut<u64>, mut removals: ResMut<DeferredRemovals>) {
2176            *out += 1;
2177            removals.deregister(ctx.reactor_id);
2178        }
2179
2180        let src = notify_mut(&world, nid).register_source();
2181        world
2182            .resource_mut::<SourceRegistry>()
2183            .insert("one-shot", src);
2184
2185        {
2186            let reg = world.registry();
2187            let notify = notify_mut(&world, nid);
2188            notify
2189                .register(|t| Ctx { reactor_id: t }, one_shot, reg)
2190                .subscribe(src);
2191        }
2192
2193        // Reactor runs once and removes itself
2194        notify_mut(&world, nid).mark(src);
2195        system.dispatch(&mut world);
2196        assert_eq!(*world.resource::<u64>(), 1);
2197        assert_eq!(system.reactor_count(&world), 0);
2198
2199        // Source still exists in registry but no reactors subscribe
2200        assert!(world.resource::<SourceRegistry>().contains(&"one-shot"));
2201
2202        // Mark again — no reactors wake
2203        notify_mut(&world, nid).mark(src);
2204        let ran = system.dispatch(&mut world);
2205        assert!(!ran);
2206    }
2207
2208    #[test]
2209    fn many_reactors_same_source() {
2210        // 50 reactors all subscribed to one source — all wake, deduped
2211        let mut wb = WorldBuilder::new();
2212        wb.register::<u64>(0);
2213        wb.register(ReactorNotify::new(4, 64));
2214        wb.register(DeferredRemovals::default());
2215        let mut world = wb.build();
2216        let nid = world.id::<ReactorNotify>();
2217        let mut system = ReactorSystem::new(&world);
2218
2219        struct Ctx {
2220            _reactor_id: Token,
2221        }
2222
2223        fn step(_ctx: &mut Ctx, mut out: ResMut<u64>) {
2224            *out += 1;
2225        }
2226
2227        let src = notify_mut(&world, nid).register_source();
2228
2229        {
2230            let reg = world.registry();
2231            let notify = notify_mut(&world, nid);
2232            for _ in 0..50 {
2233                notify
2234                    .register(|t| Ctx { _reactor_id: t }, step, reg)
2235                    .subscribe(src);
2236            }
2237        }
2238
2239        assert_eq!(system.reactor_count(&world), 50);
2240
2241        notify_mut(&world, nid).mark(src);
2242        system.dispatch(&mut world);
2243        assert_eq!(*world.resource::<u64>(), 50); // all 50 ran exactly once
2244    }
2245
2246    #[test]
2247    fn reactor_subscribes_to_multiple_sources() {
2248        // One reactor subscribed to 5 different sources.
2249        // All 5 fire in one frame — reactor runs exactly once.
2250        let mut wb = WorldBuilder::new();
2251        wb.register::<u64>(0);
2252        wb.register(ReactorNotify::new(8, 8));
2253        wb.register(DeferredRemovals::default());
2254        let mut world = wb.build();
2255        let nid = world.id::<ReactorNotify>();
2256        let mut system = ReactorSystem::new(&world);
2257
2258        struct Ctx {
2259            _reactor_id: Token,
2260        }
2261
2262        fn step(_ctx: &mut Ctx, mut out: ResMut<u64>) {
2263            *out += 1;
2264        }
2265
2266        let mut sources = Vec::new();
2267        let notify = notify_mut(&world, nid);
2268        for _ in 0..5 {
2269            sources.push(notify.register_source());
2270        }
2271
2272        {
2273            let reg = world.registry();
2274            let notify = notify_mut(&world, nid);
2275            let mut registration = notify.register(|t| Ctx { _reactor_id: t }, step, reg);
2276            for &src in &sources {
2277                registration = registration.subscribe(src);
2278            }
2279        }
2280
2281        // Mark all 5 sources
2282        for &src in &sources {
2283            notify_mut(&world, nid).mark(src);
2284        }
2285
2286        system.dispatch(&mut world);
2287        assert_eq!(*world.resource::<u64>(), 1); // ONE run despite 5 sources
2288    }
2289
2290    #[test]
2291    fn stale_data_source_is_noop() {
2292        // After removing a source, marking it must not panic
2293        let mut wb = WorldBuilder::new();
2294        wb.register(ReactorNotify::new(4, 4));
2295        wb.register(DeferredRemovals::default());
2296        let mut world = wb.build();
2297        let nid = world.id::<ReactorNotify>();
2298        let mut system = ReactorSystem::new(&world);
2299
2300        let src = notify_mut(&world, nid).register_source();
2301        notify_mut(&world, nid).remove_source(src);
2302
2303        // Must not panic
2304        notify_mut(&world, nid).mark(src);
2305        let ran = system.dispatch(&mut world);
2306        assert!(!ran);
2307    }
2308
2309    #[test]
2310    fn double_remove_source_is_noop() {
2311        let mut notify = ReactorNotify::new(4, 4);
2312        let src = notify.register_source();
2313        notify.remove_source(src);
2314        notify.remove_source(src); // must not panic
2315    }
2316
2317    // -- PipelineReactor: reactor body is a CtxPipeline ----------------------------
2318
2319    #[test]
2320    fn pipeline_reactor_dispatch() {
2321        use crate::CtxPipelineBuilder;
2322
2323        let mut wb = WorldBuilder::new();
2324        wb.register::<u64>(0);
2325        wb.register(ReactorNotify::new(4, 8));
2326        wb.register(DeferredRemovals::default());
2327        let mut world = wb.build();
2328        let nid = world.id::<ReactorNotify>();
2329        let mut system = ReactorSystem::new(&world);
2330
2331        struct Ctx {
2332            _reactor_id: Token,
2333            instrument: &'static str,
2334        }
2335
2336        fn read_data(ctx: &mut Ctx, val: Res<u64>, _input: ()) -> u64 {
2337            let _ = ctx.instrument;
2338            *val
2339        }
2340
2341        fn double(_ctx: &mut Ctx, x: u64) -> u64 {
2342            x * 2
2343        }
2344
2345        fn store(_ctx: &mut Ctx, mut out: ResMut<u64>, x: u64) {
2346            *out = x;
2347        }
2348
2349        let reg = world.registry();
2350
2351        let pipeline = CtxPipelineBuilder::<Ctx, ()>::new()
2352            .then(read_data, reg)
2353            .then(double, reg)
2354            .then(store, reg)
2355            .build();
2356
2357        let notify = notify_mut(&world, nid);
2358        let src = notify.register_source();
2359
2360        // Wrap pipeline in PipelineReactor
2361        let reactor = PipelineReactor::new(
2362            Ctx {
2363                _reactor_id: Token::new(0),
2364                instrument: "BTC",
2365            },
2366            pipeline,
2367        );
2368        notify.register_built(reactor).subscribe(src);
2369
2370        // Set initial value and dispatch
2371        *world.resource_mut::<u64>() = 10;
2372        notify_mut(&world, nid).mark(src);
2373        system.dispatch(&mut world);
2374
2375        assert_eq!(*world.resource::<u64>(), 20); // 10 * 2
2376    }
2377
2378    #[test]
2379    fn dag_reactor_dispatch() {
2380        use crate::CtxDagBuilder;
2381
2382        let mut wb = WorldBuilder::new();
2383        wb.register::<u64>(0);
2384        wb.register(ReactorNotify::new(4, 8));
2385        wb.register(DeferredRemovals::default());
2386        let mut world = wb.build();
2387        let nid = world.id::<ReactorNotify>();
2388        let mut system = ReactorSystem::new(&world);
2389
2390        struct Ctx {
2391            _reactor_id: Token,
2392        }
2393
2394        fn root(ctx: &mut Ctx, val: Res<u64>, _input: ()) -> u64 {
2395            let _ = ctx;
2396            *val
2397        }
2398
2399        fn arm_double(_ctx: &mut Ctx, val: &u64) -> u64 {
2400            *val * 2
2401        }
2402
2403        fn arm_add(_ctx: &mut Ctx, val: &u64) -> u64 {
2404            *val + 10
2405        }
2406
2407        fn merge(_ctx: &mut Ctx, mut out: ResMut<u64>, a: &u64, b: &u64) {
2408            *out = *a + *b;
2409        }
2410
2411        let reg = world.registry();
2412
2413        let dag = CtxDagBuilder::<Ctx, ()>::new()
2414            .root(root, reg)
2415            .fork()
2416            .arm(|seed| seed.then(arm_double, reg))
2417            .arm(|seed| seed.then(arm_add, reg))
2418            .merge(merge, reg)
2419            .build();
2420
2421        let notify = notify_mut(&world, nid);
2422        let src = notify.register_source();
2423
2424        let reactor = PipelineReactor::new(
2425            Ctx {
2426                _reactor_id: Token::new(0),
2427            },
2428            dag,
2429        );
2430        notify.register_built(reactor).subscribe(src);
2431
2432        *world.resource_mut::<u64>() = 5;
2433        notify_mut(&world, nid).mark(src);
2434        system.dispatch(&mut world);
2435
2436        // (5 * 2) + (5 + 10) = 10 + 15 = 25
2437        assert_eq!(*world.resource::<u64>(), 25);
2438    }
2439
2440    #[test]
2441    fn multiple_pipeline_reactors_different_bodies() {
2442        use crate::CtxPipelineBuilder;
2443
2444        let mut wb = WorldBuilder::new();
2445        wb.register::<u64>(0);
2446        wb.register(ReactorNotify::new(4, 8));
2447        wb.register(DeferredRemovals::default());
2448        let mut world = wb.build();
2449        let nid = world.id::<ReactorNotify>();
2450        let mut system = ReactorSystem::new(&world);
2451
2452        struct Ctx {
2453            _reactor_id: Token,
2454            factor: u64,
2455        }
2456
2457        fn multiply(ctx: &mut Ctx, val: Res<u64>, _input: ()) -> u64 {
2458            *val * ctx.factor
2459        }
2460
2461        fn accumulate(_ctx: &mut Ctx, mut out: ResMut<u64>, val: u64) {
2462            *out += val;
2463        }
2464
2465        let reg = world.registry();
2466
2467        // Reactor A: multiply by 2
2468        let pipeline_a = CtxPipelineBuilder::<Ctx, ()>::new()
2469            .then(multiply, reg)
2470            .then(accumulate, reg)
2471            .build();
2472
2473        // Reactor B: multiply by 10
2474        let pipeline_b = CtxPipelineBuilder::<Ctx, ()>::new()
2475            .then(multiply, reg)
2476            .then(accumulate, reg)
2477            .build();
2478
2479        let notify = notify_mut(&world, nid);
2480        let src = notify.register_source();
2481
2482        notify
2483            .register_built(PipelineReactor::new(
2484                Ctx {
2485                    _reactor_id: Token::new(0),
2486                    factor: 2,
2487                },
2488                pipeline_a,
2489            ))
2490            .subscribe(src);
2491
2492        notify
2493            .register_built(PipelineReactor::new(
2494                Ctx {
2495                    _reactor_id: Token::new(1),
2496                    factor: 10,
2497                },
2498                pipeline_b,
2499            ))
2500            .subscribe(src);
2501
2502        *world.resource_mut::<u64>() = 5;
2503        notify_mut(&world, nid).mark(src);
2504        system.dispatch(&mut world);
2505
2506        // Reactor A reads 5, adds 5*2=10, so resource=15
2507        // Reactor B reads 15, adds 15*10=150, so resource=165
2508        // (Order depends on dispatch order — both subscribed to same source)
2509        // The value is order-dependent. Just verify both ran:
2510        let val = *world.resource::<u64>();
2511        assert!(val > 5, "both reactors should have run, got {val}");
2512    }
2513
2514    #[test]
2515    fn pipeline_reactor_with_guard() {
2516        use crate::CtxPipelineBuilder;
2517
2518        let mut wb = WorldBuilder::new();
2519        wb.register::<u64>(0);
2520        wb.register(ReactorNotify::new(4, 8));
2521        wb.register(DeferredRemovals::default());
2522        let mut world = wb.build();
2523        let nid = world.id::<ReactorNotify>();
2524        let mut system = ReactorSystem::new(&world);
2525
2526        struct Ctx {
2527            _reactor_id: Token,
2528            threshold: u64,
2529        }
2530
2531        fn read(_ctx: &mut Ctx, val: Res<u64>, _input: ()) -> u64 {
2532            *val
2533        }
2534
2535        fn above_threshold(ctx: &mut Ctx, val: &u64) -> bool {
2536            *val > ctx.threshold
2537        }
2538
2539        fn write(_ctx: &mut Ctx, mut out: ResMut<u64>, _val: u64) {
2540            *out = 999;
2541        }
2542
2543        let reg = world.registry();
2544
2545        let pipeline = CtxPipelineBuilder::<Ctx, ()>::new()
2546            .then(read, reg)
2547            .guard(above_threshold, reg)
2548            .map(write, reg)
2549            .build();
2550
2551        let notify = notify_mut(&world, nid);
2552        let src = notify.register_source();
2553
2554        notify
2555            .register_built(PipelineReactor::new(
2556                Ctx {
2557                    _reactor_id: Token::new(0),
2558                    threshold: 10,
2559                },
2560                pipeline,
2561            ))
2562            .subscribe(src);
2563
2564        // Value below threshold — guard blocks
2565        *world.resource_mut::<u64>() = 5;
2566        notify_mut(&world, nid).mark(src);
2567        system.dispatch(&mut world);
2568        assert_eq!(*world.resource::<u64>(), 5); // unchanged
2569
2570        // Value above threshold — guard passes
2571        *world.resource_mut::<u64>() = 20;
2572        notify_mut(&world, nid).mark(src);
2573        system.dispatch(&mut world);
2574        assert_eq!(*world.resource::<u64>(), 999);
2575    }
2576
2577    // -- Two-phase registration (safe API through World) ----------------------
2578
2579    #[test]
2580    fn two_phase_registration_safe_api() {
2581        // Demonstrates the safe API: create_reactor → into_reactor → insert
2582        // No unsafe, no registry borrow conflicts.
2583        let mut wb = WorldBuilder::new();
2584        wb.register::<u64>(0);
2585        wb.register(ReactorNotify::new(4, 8));
2586        wb.register(DeferredRemovals::default());
2587        let mut world = wb.build();
2588
2589        let mut system = ReactorSystem::new(&world);
2590
2591        struct Ctx {
2592            reactor_id: Token,
2593            instrument: &'static str,
2594        }
2595
2596        fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
2597            let _ = ctx.instrument;
2598            *out += ctx.reactor_id.index() as u64 + 1;
2599        }
2600
2601        // Phase 1: reserve slot
2602        let src = world.resource_mut::<ReactorNotify>().register_source();
2603        let token = world.resource_mut::<ReactorNotify>().create_reactor();
2604
2605        // Phase 2: build reactor with token + registry (no borrow conflict)
2606        let reactor = step.into_reactor(
2607            Ctx {
2608                reactor_id: token,
2609                instrument: "BTC",
2610            },
2611            world.registry(),
2612        );
2613
2614        // Phase 3: insert + subscribe
2615        world
2616            .resource_mut::<ReactorNotify>()
2617            .insert_reactor(token, reactor)
2618            .subscribe(src);
2619
2620        // Verify dispatch
2621        world.resource_mut::<ReactorNotify>().mark(src);
2622        system.dispatch(&mut world);
2623        assert_eq!(*world.resource::<u64>(), 1); // token index 0 + 1
2624
2625        // Second reactor — same pattern
2626        let token2 = world.resource_mut::<ReactorNotify>().create_reactor();
2627        let actor2 = step.into_reactor(
2628            Ctx {
2629                reactor_id: token2,
2630                instrument: "ETH",
2631            },
2632            world.registry(),
2633        );
2634        world
2635            .resource_mut::<ReactorNotify>()
2636            .insert_reactor(token2, actor2)
2637            .subscribe(src);
2638
2639        // Both reactors fire
2640        world.resource_mut::<ReactorNotify>().mark(src);
2641        system.dispatch(&mut world);
2642        assert_eq!(*world.resource::<u64>(), 4); // 1 + (0+1) + (1+1)
2643    }
2644
2645    #[test]
2646    fn two_phase_with_pipeline_reactor() {
2647        use crate::CtxPipelineBuilder;
2648
2649        let mut wb = WorldBuilder::new();
2650        wb.register::<u64>(0);
2651        wb.register(ReactorNotify::new(4, 8));
2652        wb.register(DeferredRemovals::default());
2653        let mut world = wb.build();
2654
2655        let mut system = ReactorSystem::new(&world);
2656
2657        struct Ctx {
2658            _reactor_id: Token,
2659        }
2660
2661        fn read(ctx: &mut Ctx, val: Res<u64>, _: ()) -> u64 {
2662            let _ = ctx;
2663            *val
2664        }
2665
2666        fn double(_ctx: &mut Ctx, x: u64) -> u64 {
2667            x * 2
2668        }
2669
2670        fn store(_ctx: &mut Ctx, mut out: ResMut<u64>, x: u64) {
2671            *out = x;
2672        }
2673
2674        // Phase 1: reserve + register source
2675        let src = world.resource_mut::<ReactorNotify>().register_source();
2676        let token = world.resource_mut::<ReactorNotify>().create_reactor();
2677
2678        // Phase 2: build pipeline + wrap in PipelineReactor (needs registry)
2679        let reg = world.registry();
2680        let pipeline = CtxPipelineBuilder::<Ctx, ()>::new()
2681            .then(read, reg)
2682            .then(double, reg)
2683            .then(store, reg)
2684            .build();
2685        let reactor = PipelineReactor::new(Ctx { _reactor_id: token }, pipeline);
2686
2687        // Phase 3: insert
2688        world
2689            .resource_mut::<ReactorNotify>()
2690            .insert_reactor(token, reactor)
2691            .subscribe(src);
2692
2693        *world.resource_mut::<u64>() = 10;
2694        world.resource_mut::<ReactorNotify>().mark(src);
2695        system.dispatch(&mut world);
2696        assert_eq!(*world.resource::<u64>(), 20); // 10 * 2
2697    }
2698}