Skip to main content

nexus_rt/
reactor.rs

1//! Reactor dispatch system with interest-based notification.
2//!
3//! Reactors are lightweight, per-instance dispatch units — each owns its
4//! own metadata (instrument ID, order ID, algo parameters) and runs a
5//! step function with pre-resolved [`Param`] access when woken.
6//!
7//! # Architecture
8//!
9//! ```text
10//! LocalNotify      (nexus-notify)  — dedup bitset, mark/poll
11//!     ↑
12//! ReactorNotify      (nexus-rt)      — World resource: reactor storage +
13//!                                    data source fan-out + registration
14//! SourceRegistry   (nexus-rt)      — World resource: typed key → DataSource
15//!     ↑
16//! ReactorSystem      (nexus-rt)      — thin dispatch handle (driver level)
17//! ```
18//!
19//! [`SourceRegistry`] maps domain keys (instrument IDs, strategy IDs,
20//! `(Symbol, Venue)` tuples) to [`DataSource`] values for runtime lookup.
21//! Any `Hash + Eq + Send + 'static` type works as a key. All three
22//! resources are auto-registered by [`WorldBuilder::build`](crate::WorldBuilder::build) when the
23//! `reactors` feature is enabled.
24//!
25//! # Use Cases
26//!
27//! ## 1. Market maker with per-instrument quoting reactors
28//!
29//! ```ignore
30//! // Step function — context first, then pre-resolved Params
31//! fn quoting_step(ctx: &mut QuotingCtx, books: Res<OrderBooks>, mut gw: ResMut<Gateway>) {
32//!     let quote = books.compute_quote(ctx.instrument, ctx.layer);
33//!     gw.submit_quote(quote);
34//! }
35//!
36//! // Setup
37//! let notify = world.resource_mut::<ReactorNotify>();
38//! let btc_md = notify.register_source();
39//! let positions = notify.register_source();
40//!
41//! // Map natural keys for runtime lookup
42//! world.resource_mut::<SourceRegistry>().insert(InstrumentId::BTC, btc_md);
43//!
44//! // Register reactor — subscribes to BTC data + positions
45//! notify.register(
46//!     |id| QuotingCtx { reactor_id: id, instrument: InstrumentId::BTC, layer: 1 },
47//!     quoting_step,
48//!     &registry,
49//! )
50//! .subscribe(btc_md)
51//! .subscribe(positions);
52//! ```
53//!
54//! ## 2. TWAP execution algo that self-removes on completion
55//!
56//! ```ignore
57//! fn twap_step(
58//!     ctx: &mut TwapCtx,
59//!     books: Res<OrderBooks>,
60//!     mut gw: ResMut<Gateway>,
61//!     mut removals: ResMut<DeferredRemovals>,
62//! ) {
63//!     gw.submit(ctx.instrument, ctx.slice_size, books.best_ask(ctx.instrument));
64//!     ctx.remaining -= ctx.slice_size;
65//!     if ctx.remaining == 0 {
66//!         removals.deregister(ctx.reactor_id);  // cleaned up after frame
67//!     }
68//! }
69//! ```
70//!
71//! ## 3. Runtime registration from event handlers
72//!
73//! ```ignore
74//! fn on_new_order(
75//!     event: NewOrder,
76//!     mut notify: ResMut<ReactorNotify>,
77//!     sources: Res<SourceRegistry>,
78//! ) {
79//!     let md_source = sources.get(&event.instrument).unwrap();
80//!     notify.register(
81//!         |id| TwapCtx { reactor_id: id, instrument: event.instrument, remaining: event.qty },
82//!         twap_step, &registry,
83//!     ).subscribe(md_source);
84//! }
85//! ```
86//!
87//! ## 4. Order fill routing via wire protocol
88//!
89//! ```ignore
90//! // On submission — embed reactor token in order
91//! fn submit(ctx: &mut Ctx, mut gw: ResMut<Gateway>) {
92//!     gw.submit(Order { client_id: ctx.reactor_id.index(), .. });
93//! }
94//!
95//! // On fill — route back to reactor's data source
96//! fn on_fill(fill: Fill, mut notify: ResMut<ReactorNotify>, sources: Res<SourceRegistry>) {
97//!     if let Some(src) = sources.get(&RoutingKey(fill.client_id)) {
98//!         notify.mark(src);
99//!     }
100//! }
101//! ```
102//!
103//! ## 5. Instrument delisting — cleanup
104//!
105//! ```ignore
106//! fn on_delist(event: Delist, mut notify: ResMut<ReactorNotify>, mut sources: ResMut<SourceRegistry>) {
107//!     if let Some(src) = sources.remove(&event.instrument) {
108//!         notify.remove_source(src);  // frees slab slot for reuse
109//!     }
110//! }
111//! ```
112//!
113//! ## 6. Event handler marking (hot path)
114//!
115//! ```ignore
116//! fn on_btc_tick(event: Tick, mut books: ResMut<OrderBooks>, mut notify: ResMut<ReactorNotify>) {
117//!     books.apply(event);
118//!     notify.mark(btc_md);  // pre-resolved DataSource, O(1), no lookup
119//! }
120//! ```
121
122// Handler arity is architecturally required by the Param trait — handlers
123// take N typed parameters and the macro-generated dispatch impls expand
124// per-arity into call_inner functions with N + Input arguments. Module-level
125// allow rather than one inline attribute per arity expansion.
126#![allow(clippy::too_many_arguments)]
127
128use std::any::{Any, TypeId};
129use std::hash::Hash;
130
131use nexus_notify::local::LocalNotify;
132use nexus_notify::{Events, Token};
133use rustc_hash::FxHashMap;
134
135use crate::ctx_pipeline::CtxStepCall;
136use crate::handler::Param;
137use crate::world::{Registry, Resource, ResourceId, World};
138
139// =============================================================================
140// DataSource — identifies a category of change
141// =============================================================================
142
143/// Identifies a data source (e.g., "BTC market data", "positions").
144///
145/// Registered via [`ReactorNotify::register_source`]. Event handlers
146/// mark data sources via [`ReactorNotify::mark`] to wake subscribed reactors.
147///
148/// # Type-tag intent
149///
150/// `DataSource` is a `usize` newtype with no invariants beyond what the
151/// underlying integer provides. The newtype exists purely to prevent
152/// accidentally passing some other identifier where a `DataSource` is
153/// expected. Don't reduce it to a type alias and don't add invariants
154/// the rest of the crate doesn't enforce.
155#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
156pub struct DataSource(pub usize);
157
158// =============================================================================
159// ReactorNotify — World resource: storage + notification + registration
160// =============================================================================
161
162/// Reactor storage, interest mapping, and notification hub.
163///
164/// Lives in the [`World`] as a resource. Handles:
165/// - Reactor registration and storage (`Box<dyn Reactor>` in a slab)
166/// - Data source registration and interest mapping
167/// - Marking data sources as changed (fan-out + dedup)
168///
169/// Event handlers access this via [`ResMut<ReactorNotify>`](crate::ResMut)
170/// to mark data sources or register new reactors at runtime.
171pub struct ReactorNotify {
172    /// Per-reactor token dedup.
173    notify: LocalNotify,
174
175    /// Data source → reactor tokens subscribed to this source.
176    /// Slab-backed for dynamic add/remove with slot reuse.
177    /// Slab key = `DataSource.0`.
178    interests: slab::Slab<Vec<Token>>,
179
180    /// Reverse index: reactor token → data sources it's subscribed to.
181    /// Enables O(subscriptions) removal instead of O(all_sources × all_subs).
182    reactor_sources: Vec<Vec<DataSource>>,
183
184    /// Reactor storage. Slab key = token index.
185    /// `Option` enables move-out-move-back during dispatch to avoid
186    /// aliasing: the reactor is `take()`n before `run()`, then put back.
187    /// `Option<Box<dyn Reactor>>` is niche-optimized — zero extra bytes.
188    reactors: slab::Slab<Option<Box<dyn Reactor>>>,
189}
190
191// Manual Debug — slab of dyn Reactor can't derive
192impl std::fmt::Debug for ReactorNotify {
193    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
194        f.debug_struct("ReactorNotify")
195            .field("num_sources", &self.interests.len())
196            .field("num_reactors", &self.reactors.len())
197            .field("notify", &self.notify)
198            .finish()
199    }
200}
201
202impl ReactorNotify {
203    /// Create with capacity hints for data sources and reactors.
204    pub fn new(source_capacity: usize, reactor_capacity: usize) -> Self {
205        Self {
206            notify: LocalNotify::with_capacity(reactor_capacity),
207            interests: slab::Slab::with_capacity(source_capacity),
208            reactor_sources: Vec::with_capacity(reactor_capacity),
209            reactors: slab::Slab::with_capacity(reactor_capacity),
210        }
211    }
212
213    // ── Data sources ────────────────────────────────────────────────────
214
215    /// Register a new data source. Returns its identifier.
216    ///
217    /// Slab-backed — removed sources' slots are reused.
218    pub fn register_source(&mut self) -> DataSource {
219        DataSource(self.interests.insert(Vec::new()))
220    }
221
222    /// Remove a data source. Unsubscribes all reactors and frees the
223    /// slab slot for reuse.
224    ///
225    /// Marking a removed `DataSource` is a no-op (stale handle safety).
226    pub fn remove_source(&mut self, source: DataSource) {
227        if self.interests.contains(source.0) {
228            self.interests.remove(source.0);
229        }
230    }
231
232    // ── Reactor registration ──────────────────────────────────────────────
233
234    /// Reserve a slot for a new reactor and return its [`Token`].
235    ///
236    /// Use with [`insert_reactor`](Self::insert_reactor) to complete registration.
237    /// This two-phase pattern avoids borrow conflicts: you can
238    /// drop the `&mut ReactorNotify` borrow, build the reactor with
239    /// `world.registry()`, then call `insert`.
240    ///
241    /// # Example
242    ///
243    /// ```ignore
244    /// // Phase 1: reserve slot
245    /// let token = world.resource_mut::<ReactorNotify>().create_reactor();
246    ///
247    /// // Phase 2: build reactor (borrows &World for registry)
248    /// let reactor = quoting_step.into_reactor(
249    ///     QuotingCtx { reactor_id: token, instrument: BTC },
250    ///     world.registry(),
251    /// );
252    ///
253    /// // Phase 3: fill slot
254    /// world.resource_mut::<ReactorNotify>()
255    ///     .insert_reactor(token, reactor)
256    ///     .subscribe(btc_md);
257    /// ```
258    pub fn create_reactor(&mut self) -> Token {
259        // Reserve the slot with a None placeholder so no other registration
260        // can claim the same key between alloc and insert.
261        let key = self.reactors.insert(None);
262        self.notify.ensure_capacity(key);
263        // Grow reverse index to cover this reactor token.
264        if key >= self.reactor_sources.len() {
265            self.reactor_sources.resize_with(key + 1, Vec::new);
266        }
267        Token::new(key)
268    }
269
270    /// Insert a pre-built reactor at a previously allocated [`Token`].
271    ///
272    /// The token must have been returned by [`create_reactor`](Self::create_reactor)
273    /// and not yet filled. Completes the two-phase registration.
274    ///
275    /// # Panics
276    ///
277    /// Panics if the token was not allocated by [`create_reactor`](Self::create_reactor) or was
278    /// already filled.
279    pub fn insert_reactor(
280        &mut self,
281        token: Token,
282        reactor: impl Reactor + 'static,
283    ) -> ReactorRegistration<'_> {
284        let idx = token.index();
285        assert!(
286            self.reactors.contains(idx),
287            "token {} was not allocated by create_reactor",
288            idx,
289        );
290        assert!(
291            self.reactors[idx].is_none(),
292            "token {} was already filled",
293            idx,
294        );
295        self.reactors[idx] = Some(Box::new(reactor));
296        ReactorRegistration {
297            token,
298            notify: self,
299        }
300    }
301
302    /// Register a reactor from a step function + context factory.
303    ///
304    /// One-shot convenience when you already have `&Registry` (e.g.,
305    /// inside event handlers via [`Param`] resolution, or in tests).
306    /// For the safe `World`-based API, use [`create_reactor`](Self::create_reactor)
307    /// + [`insert_reactor`](Self::insert_reactor).
308    pub fn register<C, Params, F: IntoReactor<C, Params>>(
309        &mut self,
310        ctx_fn: impl FnOnce(Token) -> C,
311        step: F,
312        registry: &Registry,
313    ) -> ReactorRegistration<'_> {
314        let key = self.reactors.vacant_key();
315        let token = Token::new(key);
316        self.notify.ensure_capacity(key);
317        if key >= self.reactor_sources.len() {
318            self.reactor_sources.resize_with(key + 1, Vec::new);
319        }
320        let ctx = ctx_fn(token);
321        let reactor = step.into_reactor(ctx, registry);
322        let inserted = self.reactors.insert(Some(Box::new(reactor)));
323        debug_assert_eq!(inserted, key);
324        ReactorRegistration {
325            token,
326            notify: self,
327        }
328    }
329
330    /// Register a pre-built reactor in one step.
331    ///
332    /// Convenience for reactors that don't need their [`Token`] in
333    /// the context. For reactors that need the token (wire routing,
334    /// self-deregistration), use [`create_reactor`](Self::create_reactor)
335    /// + [`insert_reactor`](Self::insert_reactor).
336    pub fn register_built(&mut self, reactor: impl Reactor + 'static) -> ReactorRegistration<'_> {
337        let key = self.reactors.vacant_key();
338        let token = Token::new(key);
339        self.notify.ensure_capacity(key);
340        if key >= self.reactor_sources.len() {
341            self.reactor_sources.resize_with(key + 1, Vec::new);
342        }
343        let inserted = self.reactors.insert(Some(Box::new(reactor)));
344        debug_assert_eq!(inserted, key);
345        ReactorRegistration {
346            token,
347            notify: self,
348        }
349    }
350
351    // ── Subscription ────────────────────────────────────────────────────
352
353    /// Subscribe a reactor to a data source.
354    ///
355    /// Idempotent — subscribing twice is a no-op.
356    /// No-op if `source` has been removed.
357    pub fn subscribe(&mut self, reactor: Token, source: DataSource) {
358        if let Some(subscribers) = self.interests.get_mut(source.0) {
359            if !subscribers.contains(&reactor) {
360                subscribers.push(reactor);
361                // Maintain reverse index for O(subscriptions) removal.
362                let idx = reactor.index();
363                debug_assert!(
364                    idx < self.reactor_sources.len(),
365                    "reactor_sources missing entry for reactor token {}",
366                    idx,
367                );
368                self.reactor_sources[idx].push(source);
369            }
370        }
371    }
372
373    /// Unsubscribe a reactor from a data source.
374    pub fn unsubscribe(&mut self, reactor: Token, source: DataSource) {
375        if let Some(subscribers) = self.interests.get_mut(source.0) {
376            subscribers.retain(|&t| t != reactor);
377        }
378        if let Some(sources) = self.reactor_sources.get_mut(reactor.index()) {
379            sources.retain(|&s| s != source);
380        }
381    }
382
383    // ── Hot path ────────────────────────────────────────────────────────
384
385    /// Mark a data source as changed this frame.
386    ///
387    /// Fans out to all subscribed reactor tokens in the underlying
388    /// [`LocalNotify`], with per-reactor dedup.
389    #[inline]
390    pub fn mark(&mut self, source: DataSource) {
391        if let Some(subscribers) = self.interests.get(source.0) {
392            for &reactor_token in subscribers {
393                self.notify.mark(reactor_token);
394            }
395        }
396    }
397
398    /// Poll for woken reactor tokens into the events buffer.
399    #[inline]
400    pub(crate) fn poll(&mut self, events: &mut Events) {
401        self.notify.poll(events);
402    }
403
404    /// Take a reactor out of its slot for dispatch.
405    /// Returns None if the slot is empty or doesn't exist.
406    #[inline]
407    pub(crate) fn take_reactor(&mut self, idx: usize) -> Option<Box<dyn Reactor>> {
408        self.reactors.get_mut(idx).and_then(Option::take)
409    }
410
411    /// Put a reactor back into its slot after dispatch.
412    ///
413    /// The caller guarantees `idx` is a valid, occupied slab key
414    /// (it was just returned by `take_reactor`). Skips the redundant
415    /// `contains` check — single bounds-checked write.
416    #[inline]
417    pub(crate) fn put_reactor(&mut self, idx: usize, reactor: Box<dyn Reactor>) {
418        self.reactors[idx] = Some(reactor);
419    }
420
421    /// Remove a reactor and unsubscribe from all data sources.
422    ///
423    /// Uses the reverse index for O(subscriptions) removal instead of
424    /// scanning all data source interest lists.
425    pub fn remove_reactor(&mut self, token: Token) {
426        let idx = token.index();
427        if self.reactors.contains(idx) {
428            self.reactors.remove(idx);
429            // Use reverse index — only touch sources this reactor subscribed to.
430            if let Some(sources) = self.reactor_sources.get_mut(idx) {
431                for &source in sources.iter() {
432                    if let Some(subscribers) = self.interests.get_mut(source.0) {
433                        subscribers.retain(|&t| t != token);
434                    }
435                }
436                sources.clear();
437            }
438        }
439    }
440
441    // ── Introspection ───────────────────────────────────────────────────
442
443    /// Any reactors woken this frame?
444    pub fn has_notified(&self) -> bool {
445        self.notify.has_notified()
446    }
447
448    /// Number of reactors woken this frame.
449    pub fn notified_count(&self) -> usize {
450        self.notify.notified_count()
451    }
452
453    /// Number of registered data sources.
454    pub fn source_count(&self) -> usize {
455        self.interests.len()
456    }
457
458    /// Number of registered reactors.
459    pub fn reactor_count(&self) -> usize {
460        self.reactors.len()
461    }
462}
463
464impl Resource for ReactorNotify {}
465
466// =============================================================================
467// ReactorRegistration — builder for chaining subscriptions
468// =============================================================================
469
470/// Builder returned by [`ReactorNotify::register`] for chaining subscriptions.
471pub struct ReactorRegistration<'a> {
472    token: Token,
473    notify: &'a mut ReactorNotify,
474}
475
476impl ReactorRegistration<'_> {
477    /// Subscribe this reactor to a data source.
478    pub fn subscribe(self, source: DataSource) -> Self {
479        self.notify.subscribe(self.token, source);
480        self
481    }
482
483    /// The assigned token for this reactor.
484    pub fn token(&self) -> Token {
485        self.token
486    }
487}
488
489// =============================================================================
490// Reactor trait
491// =============================================================================
492
493/// A dispatchable unit with per-instance context.
494///
495/// Reactors own lightweight metadata (instrument ID, order ID, routing
496/// keys). Mutable state belongs in World resources, accessed via
497/// pre-resolved [`Res<T>`](crate::Res) / [`ResMut<T>`](crate::ResMut)
498/// in the step function.
499///
500/// Resource access is resolved once at registration time — dispatch
501/// is a single pointer deref per resource, no HashMap lookups.
502pub trait Reactor: Send {
503    /// Run this reactor with full World access.
504    fn run(&mut self, world: &mut World);
505
506    /// Returns the reactor's name for diagnostics.
507    fn name(&self) -> &'static str {
508        "<unnamed>"
509    }
510}
511
512// =============================================================================
513// ReactorFn — concrete dispatch wrapper
514// =============================================================================
515
516/// Concrete reactor wrapper produced by [`IntoReactor`].
517///
518/// Stores the step function, per-reactor context, and pre-resolved
519/// parameter state. Same pattern as [`Callback`](crate::Callback)
520/// but without an event argument.
521pub struct ReactorFn<C, F, Params: Param> {
522    /// Per-reactor owned context (instrument, order ID, config).
523    pub ctx: C,
524    f: F,
525    state: Params::State,
526    name: &'static str,
527}
528
529// =============================================================================
530// PipelineReactor — reactor backed by a CtxPipeline or CtxDag body
531// =============================================================================
532
533/// A reactor whose body is a [`CtxPipeline`](crate::CtxPipeline),
534/// [`CtxDag`](crate::CtxDag), or any [`CtxStepCall`].
535///
536/// The context `C` holds per-reactor metadata. The body is type-erased
537/// via `Box<dyn CtxStepCall>` since pipeline chain types are unnameable.
538///
539/// # Example
540///
541/// ```ignore
542/// let pipeline = CtxPipelineBuilder::<QuotingCtx, ()>::new()
543///     .then(read_books, &reg)
544///     .then(compute_quote, &reg)
545///     .then(submit_quote, &reg)
546///     .build();
547///
548/// let reactor = PipelineReactor::new(
549///     QuotingCtx { reactor_id: token, instrument: BTC },
550///     pipeline,
551/// );
552///
553/// notify.register_built(reactor).subscribe(btc_md);
554/// ```
555pub struct PipelineReactor<C> {
556    /// Per-reactor owned context.
557    pub ctx: C,
558    body: Box<dyn CtxStepCall<C, (), Out = ()> + Send>,
559}
560
561impl<C: Send + 'static> PipelineReactor<C> {
562    /// Create a reactor from a context and a pipeline/DAG body.
563    ///
564    /// The body must implement `CtxStepCall<C, (), Out = ()>`.
565    /// Both [`CtxPipeline`](crate::CtxPipeline) and
566    /// [`CtxDag`](crate::CtxDag) satisfy this when their output is `()`.
567    pub fn new(ctx: C, body: impl CtxStepCall<C, (), Out = ()> + Send + 'static) -> Self {
568        Self {
569            ctx,
570            body: Box::new(body),
571        }
572    }
573}
574
575impl<C: Send + 'static> Reactor for PipelineReactor<C> {
576    fn run(&mut self, world: &mut World) {
577        self.body.call(&mut self.ctx, world, ());
578    }
579
580    fn name(&self) -> &'static str {
581        std::any::type_name::<C>()
582    }
583}
584
585// =============================================================================
586// IntoReactor — conversion trait
587// =============================================================================
588
589/// Converts a step function into a [`Reactor`].
590///
591/// Step function signature: `fn(&mut C, Params...)` — context first,
592/// then resolved resources. No event argument, no return value.
593///
594/// # Example
595///
596/// ```ignore
597/// fn quoting_step(ctx: &mut QuotingCtx, books: Res<OrderBooks>, mut gw: ResMut<Gateway>) {
598///     let quote = books.compute_quote(ctx.instrument, ctx.layer);
599///     gw.submit_quote(quote);
600/// }
601///
602/// let reactor = quoting_step.into_reactor(QuotingCtx { instrument: BTC, layer: 1 }, &registry);
603/// ```
604#[diagnostic::on_unimplemented(
605    message = "this function cannot be used as a reactor step",
606    note = "reactor step signature: `fn(&mut C, Params...)` — context first, then resources",
607    note = "closures with resource parameters are not supported — use a named `fn`"
608)]
609pub trait IntoReactor<C, Params> {
610    /// The concrete reactor type produced.
611    type Reactor: Reactor + 'static;
612
613    /// Convert this function + context into a reactor, resolving
614    /// parameters from the registry.
615    fn into_reactor(self, ctx: C, registry: &Registry) -> Self::Reactor;
616}
617
618// =============================================================================
619// Arity 0: fn(&mut C) — context only, no Param
620// =============================================================================
621
622impl<C: Send + 'static, F: FnMut(&mut C) + Send + 'static> Reactor for ReactorFn<C, F, ()> {
623    fn run(&mut self, _world: &mut World) {
624        (self.f)(&mut self.ctx);
625    }
626
627    fn name(&self) -> &'static str {
628        self.name
629    }
630}
631
632impl<C: Send + 'static, F: FnMut(&mut C) + Send + 'static> IntoReactor<C, ()> for F {
633    type Reactor = ReactorFn<C, F, ()>;
634
635    fn into_reactor(self, ctx: C, registry: &Registry) -> Self::Reactor {
636        ReactorFn {
637            ctx,
638            f: self,
639            state: <() as Param>::init(registry),
640            name: std::any::type_name::<F>(),
641        }
642    }
643}
644
645// =============================================================================
646// Arities 1-8 via macro
647// =============================================================================
648
649macro_rules! impl_into_reactor {
650    ($($P:ident),+) => {
651        impl<C: Send + 'static, F: Send + 'static, $($P: Param + 'static),+>
652            Reactor for ReactorFn<C, F, ($($P,)+)>
653        where
654            for<'a> &'a mut F:
655                FnMut(&mut C, $($P,)+) +
656                FnMut(&mut C, $($P::Item<'a>,)+),
657        {
658            #[allow(non_snake_case)]
659            fn run(&mut self, world: &mut World) {
660                fn call_inner<Ctx, $($P,)+>(
661                    mut f: impl FnMut(&mut Ctx, $($P,)+),
662                    ctx: &mut Ctx,
663                    $($P: $P,)+
664                ) {
665                    f(ctx, $($P,)+);
666                }
667
668                // SAFETY: state was produced by Param::init() on the same
669                // Registry that built this World. Single-threaded sequential
670                // dispatch ensures no mutable aliasing across params.
671                #[cfg(debug_assertions)]
672                world.clear_borrows();
673                let ($($P,)+) = unsafe {
674                    <($($P,)+) as Param>::fetch(world, &mut self.state)
675                };
676                call_inner(&mut self.f, &mut self.ctx, $($P,)+);
677            }
678
679            fn name(&self) -> &'static str {
680                self.name
681            }
682        }
683
684        impl<C: Send + 'static, F: Send + 'static, $($P: Param + 'static),+>
685            IntoReactor<C, ($($P,)+)> for F
686        where
687            for<'a> &'a mut F:
688                FnMut(&mut C, $($P,)+) +
689                FnMut(&mut C, $($P::Item<'a>,)+),
690        {
691            type Reactor = ReactorFn<C, F, ($($P,)+)>;
692
693            fn into_reactor(self, ctx: C, registry: &Registry) -> Self::Reactor {
694                let state = <($($P,)+) as Param>::init(registry);
695                {
696                    #[allow(non_snake_case)]
697                    let ($($P,)+) = &state;
698                    registry.check_access(&[
699                        $(
700                            (<$P as Param>::resource_id($P),
701                             std::any::type_name::<$P>()),
702                        )+
703                    ]);
704                }
705                ReactorFn {
706                    ctx,
707                    f: self,
708                    state,
709                    name: std::any::type_name::<F>(),
710                }
711            }
712        }
713    };
714}
715
716all_tuples!(impl_into_reactor);
717
718// =============================================================================
719// DeferredRemovals — World resource for reactor self-removal
720// =============================================================================
721
722/// Deferred reactor removal queue.
723///
724/// Reactors request removal during dispatch by pushing their token
725/// via [`ResMut<DeferredRemovals>`](crate::ResMut). The [`ReactorSystem`]
726/// drains this after all reactors in the frame have run.
727#[derive(Default)]
728pub struct DeferredRemovals {
729    tokens: Vec<Token>,
730}
731
732impl DeferredRemovals {
733    /// Request deferred removal of a reactor.
734    ///
735    /// Takes effect after the current dispatch frame completes.
736    /// Duplicate calls are harmless — `remove_reactor` is idempotent.
737    pub fn deregister(&mut self, token: Token) {
738        self.tokens.push(token);
739    }
740
741    /// Swap out the inner Vec for zero-alloc processing.
742    /// Returns the Vec (caller owns it). Leaves self with an empty Vec.
743    #[inline]
744    pub(crate) fn take(&mut self) -> Vec<Token> {
745        std::mem::take(&mut self.tokens)
746    }
747
748    /// Put a (drained) Vec back for reuse. Zero allocation.
749    #[inline]
750    pub(crate) fn put(&mut self, tokens: Vec<Token>) {
751        debug_assert!(tokens.is_empty(), "put() expects a drained Vec");
752        self.tokens = tokens;
753    }
754
755    /// Any removals pending?
756    pub fn is_empty(&self) -> bool {
757        self.tokens.is_empty()
758    }
759}
760
761impl Resource for DeferredRemovals {}
762
763// =============================================================================
764// SourceRegistry — typed key → DataSource resolution
765// =============================================================================
766
767/// Maps natural domain keys to [`DataSource`] values.
768///
769/// Single World resource supporting any number of key types via
770/// type erasure. Each key type `K` gets its own internal
771/// `HashMap<K, DataSource>`. The `TypeId` dispatch is one hash
772/// lookup to find the right inner map — cold path only.
773///
774/// Any `Hash + Eq + Send + 'static` type works as a key — no trait
775/// to implement, no macro to invoke. Newtypes, enums, and tuples
776/// all work out of the box.
777///
778/// # Example
779///
780/// ```ignore
781/// // Setup
782/// let src = notify.register_source();
783/// registry.insert(InstrumentId("BTC"), src);
784///
785/// // Runtime lookup (cold path — from event handler)
786/// let src = registry.get(&InstrumentId("BTC")).unwrap();
787/// notify.register(|t| ctx, step, reg).subscribe(src);
788///
789/// // Hot path — DataSource pre-resolved, no registry involvement
790/// notify.mark(src);
791/// ```
792#[derive(Default)]
793pub struct SourceRegistry {
794    maps: FxHashMap<TypeId, Box<dyn Any + Send>>,
795}
796
797impl SourceRegistry {
798    /// Create an empty registry.
799    pub fn new() -> Self {
800        Self::default()
801    }
802
803    /// Map a typed key to a [`DataSource`].
804    ///
805    /// Overwrites any previous mapping for this key.
806    pub fn insert<K: Hash + Eq + Send + 'static>(&mut self, key: K, source: DataSource) {
807        self.get_or_create_map::<K>().insert(key, source);
808    }
809
810    /// Look up a [`DataSource`] by typed key.
811    ///
812    /// Returns `None` if the key is not registered.
813    pub fn get<K: Hash + Eq + Send + 'static>(&self, key: &K) -> Option<DataSource> {
814        self.get_map::<K>().and_then(|map| map.get(key)).copied()
815    }
816
817    /// Remove a key mapping. Returns the [`DataSource`] so the caller
818    /// can also call [`ReactorNotify::remove_source`] to free the slot.
819    pub fn remove<K: Hash + Eq + Send + 'static>(&mut self, key: &K) -> Option<DataSource> {
820        self.get_map_mut::<K>().and_then(|map| map.remove(key))
821    }
822
823    /// Returns `true` if the key is mapped.
824    pub fn contains<K: Hash + Eq + Send + 'static>(&self, key: &K) -> bool {
825        self.get_map::<K>().is_some_and(|map| map.contains_key(key))
826    }
827
828    fn get_map<K: Hash + Eq + Send + 'static>(&self) -> Option<&FxHashMap<K, DataSource>> {
829        self.maps.get(&TypeId::of::<K>()).map(|boxed| {
830            // Invariant: map was inserted with type K, so downcast always succeeds.
831            boxed
832                .downcast_ref::<FxHashMap<K, DataSource>>()
833                .expect("invariant: TypeId matches stored map type")
834        })
835    }
836
837    fn get_map_mut<K: Hash + Eq + Send + 'static>(
838        &mut self,
839    ) -> Option<&mut FxHashMap<K, DataSource>> {
840        self.maps.get_mut(&TypeId::of::<K>()).map(|boxed| {
841            // Invariant: map was inserted with type K, so downcast always succeeds.
842            boxed
843                .downcast_mut::<FxHashMap<K, DataSource>>()
844                .expect("invariant: TypeId matches stored map type")
845        })
846    }
847
848    fn get_or_create_map<K: Hash + Eq + Send + 'static>(
849        &mut self,
850    ) -> &mut FxHashMap<K, DataSource> {
851        self.maps
852            .entry(TypeId::of::<K>())
853            .or_insert_with(|| Box::<FxHashMap<K, DataSource>>::default())
854            // Invariant: entry was just created or retrieved with type K.
855            .downcast_mut::<FxHashMap<K, DataSource>>()
856            .unwrap()
857    }
858}
859
860impl Resource for SourceRegistry {}
861
862// =============================================================================
863// ReactorSystem — thin dispatch handle
864// =============================================================================
865
866/// Lightweight dispatch handle for the reactor system.
867///
868/// Sits at the driver level (same as mio `Poll` or timer poller).
869/// Reads [`ReactorNotify`] via pre-resolved [`ResourceId`] during
870/// dispatch. All reactor storage and registration lives in
871/// [`ReactorNotify`] (World resource).
872pub struct ReactorSystem {
873    /// Pre-allocated events buffer for polling.
874    events: Events,
875
876    /// Pre-resolved resource IDs for reaching into World.
877    notify_id: ResourceId,
878    removals_id: ResourceId,
879}
880
881impl ReactorSystem {
882    /// Create a dispatch handle from a built [`World`].
883    ///
884    /// The World must contain [`ReactorNotify`] and [`DeferredRemovals`].
885    pub fn new(world: &World) -> Self {
886        Self {
887            events: Events::with_capacity(256),
888            notify_id: world.id::<ReactorNotify>(),
889            removals_id: world.id::<DeferredRemovals>(),
890        }
891    }
892
893    /// Dispatch all woken reactors and process deferred removals.
894    ///
895    /// 1. Polls [`ReactorNotify`] for woken reactor tokens (deduped)
896    /// 2. Runs each reactor's step function with pre-resolved Params
897    /// 3. Drains [`DeferredRemovals`] and removes reactors
898    ///
899    /// Returns `true` if any reactor ran (for scheduler propagation).
900    pub fn dispatch(&mut self, world: &mut World) -> bool {
901        // SAFETY: notify_id was resolved from the same WorldBuilder.
902        // ReactorNotify is heap-allocated — pointer stable for World's lifetime.
903        let notify_ptr: *mut ReactorNotify =
904            unsafe { world.get_mut::<ReactorNotify>(self.notify_id) };
905
906        // Poll — scoped &mut, dropped before reactor dispatch.
907        {
908            let notify = unsafe { &mut *notify_ptr };
909            notify.poll(&mut self.events);
910        }
911        let ran = !self.events.is_empty();
912
913        // Dispatch — each reactor is moved out before run(), put back after.
914        // &mut ReactorNotify is scoped tightly to avoid aliasing during run().
915        for token in self.events.iter() {
916            let idx = token.index();
917            // SAFETY: notify_ptr is valid for World's lifetime. Scoped &mut
918            // is dropped before reactor.run() to avoid aliasing.
919            let reactor = {
920                let notify = unsafe { &mut *notify_ptr };
921                notify.take_reactor(idx)
922            };
923            if let Some(mut reactor) = reactor {
924                reactor.run(world);
925                // SAFETY: re-derive &mut after run() completes. No aliasing —
926                // reactor was moved out of the slab during run().
927                let notify = unsafe { &mut *notify_ptr };
928                notify.put_reactor(idx, reactor);
929            }
930        }
931
932        // Deferred removals — swap Vec out to avoid holding two &mut.
933        // Zero allocation: Vec is swapped back and reused next frame.
934        // SAFETY: removals_id from same WorldBuilder. Dispatch complete.
935        let removals = unsafe { world.get_mut::<DeferredRemovals>(self.removals_id) };
936        let mut pending = removals.take();
937        if !pending.is_empty() {
938            // SAFETY: re-derive &mut for cleanup phase. No other references.
939            let notify = unsafe { &mut *notify_ptr };
940            while let Some(token) = pending.pop() {
941                notify.remove_reactor(token);
942            }
943        }
944        // Put the (now empty) Vec back for reuse.
945        let removals = unsafe { world.get_mut::<DeferredRemovals>(self.removals_id) };
946        removals.put(pending);
947
948        ran
949    }
950
951    /// Number of live reactors.
952    pub fn reactor_count(&self, world: &World) -> usize {
953        world.resource::<ReactorNotify>().reactor_count()
954    }
955}
956
957// =============================================================================
958// Tests
959// =============================================================================
960
961#[cfg(test)]
962mod tests {
963    use super::*;
964    use crate::{Res, ResMut, WorldBuilder};
965
966    // -- Reactor trait dispatch --------------------------------------------------
967
968    #[test]
969    fn reactor_fn_arity0() {
970        let wb = WorldBuilder::new();
971        let mut world = wb.build();
972        let reg = world.registry();
973
974        struct Ctx {
975            count: u32,
976        }
977
978        fn step(ctx: &mut Ctx) {
979            ctx.count += 1;
980        }
981
982        let mut reactor = step.into_reactor(Ctx { count: 0 }, reg);
983        reactor.run(&mut world);
984        assert_eq!(reactor.ctx.count, 1);
985    }
986
987    #[test]
988    fn reactor_fn_with_params() {
989        let mut wb = WorldBuilder::new();
990        wb.register::<u64>(10);
991        wb.register::<u32>(0);
992        let mut world = wb.build();
993        let reg = world.registry();
994
995        struct Ctx {
996            multiplier: u64,
997        }
998
999        fn step(ctx: &mut Ctx, val: Res<u64>, mut out: ResMut<u32>) {
1000            *out = (*val * ctx.multiplier) as u32;
1001        }
1002
1003        let mut reactor = step.into_reactor(Ctx { multiplier: 5 }, reg);
1004        reactor.run(&mut world);
1005        assert_eq!(*world.resource::<u32>(), 50);
1006    }
1007
1008    // -- ReactorNotify ----------------------------------------------------------
1009
1010    fn dummy_reactor() -> ReactorFn<(), fn(&mut ()), ()> {
1011        ReactorFn {
1012            ctx: (),
1013            f: (|_: &mut ()| {}) as fn(&mut ()),
1014            state: (),
1015            name: "dummy",
1016        }
1017    }
1018
1019    #[test]
1020    fn reactor_notify_mark_fans_out() {
1021        let mut notify = ReactorNotify::new(4, 8);
1022        let mut events = Events::with_capacity(8);
1023
1024        let src = notify.register_source();
1025        let a1 = notify.register_built(dummy_reactor()).token();
1026        let a2 = notify.register_built(dummy_reactor()).token();
1027        let _a3 = notify.register_built(dummy_reactor()).token();
1028
1029        notify.subscribe(a1, src);
1030        notify.subscribe(a2, src);
1031        // _a3 not subscribed
1032
1033        notify.mark(src);
1034        notify.notify.poll(&mut events);
1035
1036        assert_eq!(events.len(), 2);
1037        assert!(events.as_slice().contains(&a1));
1038        assert!(events.as_slice().contains(&a2));
1039    }
1040
1041    #[test]
1042    fn reactor_notify_dedup_across_sources() {
1043        let mut notify = ReactorNotify::new(4, 8);
1044        let mut events = Events::with_capacity(8);
1045
1046        let src1 = notify.register_source();
1047        let src2 = notify.register_source();
1048        let reactor = notify.register_built(dummy_reactor()).token();
1049
1050        notify.subscribe(reactor, src1);
1051        notify.subscribe(reactor, src2);
1052
1053        notify.mark(src1);
1054        notify.mark(src2);
1055
1056        notify.notify.poll(&mut events);
1057        assert_eq!(events.len(), 1);
1058        assert_eq!(events.as_slice()[0], reactor);
1059    }
1060
1061    #[test]
1062    fn reactor_notify_remove_reactor() {
1063        let mut notify = ReactorNotify::new(4, 8);
1064        let mut events = Events::with_capacity(8);
1065
1066        let src = notify.register_source();
1067
1068        struct Ctx;
1069        let token = notify
1070            .register_built(ReactorFn {
1071                ctx: Ctx,
1072                f: (|_: &mut Ctx| {}) as fn(&mut Ctx),
1073                state: (),
1074                name: "test",
1075            })
1076            .token();
1077        notify.subscribe(token, src);
1078
1079        notify.remove_reactor(token);
1080        notify.mark(src);
1081        notify.notify.poll(&mut events);
1082        assert!(events.is_empty());
1083    }
1084
1085    // -- Full ReactorSystem integration -----------------------------------------
1086
1087    // Helper: registry() borrows &World, resource_mut() borrows &mut World.
1088    // In tests, we use unsafe get_mut via the notify_id to avoid the conflict,
1089    // same pattern as production dispatch code.
1090    fn notify_mut(world: &World, id: ResourceId) -> &mut ReactorNotify {
1091        unsafe { world.get_mut::<ReactorNotify>(id) }
1092    }
1093
1094    #[test]
1095    fn reactor_system_dispatch() {
1096        let mut wb = WorldBuilder::new();
1097        wb.register::<u64>(0);
1098        wb.register(ReactorNotify::new(4, 8));
1099        wb.register(DeferredRemovals::default());
1100        let mut world = wb.build();
1101        let reg = world.registry();
1102        let nid = world.id::<ReactorNotify>();
1103
1104        let mut system = ReactorSystem::new(&world);
1105
1106        struct Ctx {
1107            _reactor_id: Token,
1108            increment: u64,
1109        }
1110
1111        fn step(ctx: &mut Ctx, mut val: ResMut<u64>) {
1112            *val += ctx.increment;
1113        }
1114
1115        let notify = notify_mut(&world, nid);
1116        let src = notify.register_source();
1117        notify
1118            .register(
1119                |t| Ctx {
1120                    _reactor_id: t,
1121                    increment: 10,
1122                },
1123                step,
1124                reg,
1125            )
1126            .subscribe(src);
1127        notify
1128            .register(
1129                |t| Ctx {
1130                    _reactor_id: t,
1131                    increment: 5,
1132                },
1133                step,
1134                reg,
1135            )
1136            .subscribe(src);
1137
1138        // Mark and dispatch
1139        notify_mut(&world, nid).mark(src);
1140        let ran = system.dispatch(&mut world);
1141
1142        assert!(ran);
1143        assert_eq!(*world.resource::<u64>(), 15); // 10 + 5
1144    }
1145
1146    #[test]
1147    fn reactor_system_deferred_removal() {
1148        let mut wb = WorldBuilder::new();
1149        wb.register::<u64>(0);
1150        wb.register(ReactorNotify::new(4, 8));
1151        wb.register(DeferredRemovals::default());
1152        let mut world = wb.build();
1153        let reg = world.registry();
1154        let nid = world.id::<ReactorNotify>();
1155
1156        let mut system = ReactorSystem::new(&world);
1157
1158        struct Ctx {
1159            reactor_id: Token,
1160            runs: u64,
1161        }
1162
1163        fn step(ctx: &mut Ctx, mut val: ResMut<u64>, mut removals: ResMut<DeferredRemovals>) {
1164            *val += 1;
1165            ctx.runs += 1;
1166            if ctx.runs >= 2 {
1167                removals.deregister(ctx.reactor_id);
1168            }
1169        }
1170
1171        let notify = notify_mut(&world, nid);
1172        let src = notify.register_source();
1173        notify
1174            .register(
1175                |t| Ctx {
1176                    reactor_id: t,
1177                    runs: 0,
1178                },
1179                step,
1180                reg,
1181            )
1182            .subscribe(src);
1183
1184        assert_eq!(system.reactor_count(&world), 1);
1185
1186        // Frame 1 — reactor runs, runs=1, no removal
1187        notify_mut(&world, nid).mark(src);
1188        system.dispatch(&mut world);
1189        assert_eq!(*world.resource::<u64>(), 1);
1190        assert_eq!(system.reactor_count(&world), 1);
1191
1192        // Frame 2 — reactor runs, runs=2, deregisters
1193        notify_mut(&world, nid).mark(src);
1194        system.dispatch(&mut world);
1195        assert_eq!(*world.resource::<u64>(), 2);
1196        assert_eq!(system.reactor_count(&world), 0);
1197
1198        // Frame 3 — no reactors, nothing runs
1199        notify_mut(&world, nid).mark(src);
1200        let ran = system.dispatch(&mut world);
1201        assert!(!ran);
1202        assert_eq!(*world.resource::<u64>(), 2);
1203    }
1204
1205    #[test]
1206    fn reactor_system_only_subscribed_wake() {
1207        let mut wb = WorldBuilder::new();
1208        wb.register::<u64>(0);
1209        wb.register(ReactorNotify::new(4, 8));
1210        wb.register(DeferredRemovals::default());
1211        let mut world = wb.build();
1212        let reg = world.registry();
1213        let nid = world.id::<ReactorNotify>();
1214
1215        let mut system = ReactorSystem::new(&world);
1216
1217        struct Ctx {
1218            _reactor_id: Token,
1219            value: u64,
1220        }
1221
1222        fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
1223            *out += ctx.value;
1224        }
1225
1226        let notify = notify_mut(&world, nid);
1227        let btc = notify.register_source();
1228        let eth = notify.register_source();
1229
1230        notify
1231            .register(
1232                |t| Ctx {
1233                    _reactor_id: t,
1234                    value: 10,
1235                },
1236                step,
1237                reg,
1238            )
1239            .subscribe(btc);
1240        notify
1241            .register(
1242                |t| Ctx {
1243                    _reactor_id: t,
1244                    value: 100,
1245                },
1246                step,
1247                reg,
1248            )
1249            .subscribe(eth);
1250
1251        // Only BTC fires — only reactor 1 runs
1252        notify_mut(&world, nid).mark(btc);
1253        system.dispatch(&mut world);
1254        assert_eq!(*world.resource::<u64>(), 10);
1255
1256        // ETH fires — reactor 2 runs
1257        notify_mut(&world, nid).mark(eth);
1258        system.dispatch(&mut world);
1259        assert_eq!(*world.resource::<u64>(), 110);
1260    }
1261
1262    #[test]
1263    fn runtime_registration() {
1264        let mut wb = WorldBuilder::new();
1265        wb.register::<u64>(0);
1266        wb.register(ReactorNotify::new(4, 8));
1267        wb.register(DeferredRemovals::default());
1268        let mut world = wb.build();
1269        let nid = world.id::<ReactorNotify>();
1270
1271        let mut system = ReactorSystem::new(&world);
1272
1273        struct Ctx {
1274            _reactor_id: Token,
1275            value: u64,
1276        }
1277
1278        fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
1279            *out += ctx.value;
1280        }
1281
1282        // Register source and first reactor at setup
1283        let src = {
1284            let reg = world.registry();
1285            let notify = notify_mut(&world, nid);
1286            let src = notify.register_source();
1287            notify
1288                .register(
1289                    |t| Ctx {
1290                        _reactor_id: t,
1291                        value: 10,
1292                    },
1293                    step,
1294                    reg,
1295                )
1296                .subscribe(src);
1297            src
1298        };
1299
1300        // Frame 1 — one reactor
1301        notify_mut(&world, nid).mark(src);
1302        system.dispatch(&mut world);
1303        assert_eq!(*world.resource::<u64>(), 10);
1304
1305        // Runtime registration (simulates admin command handler)
1306        {
1307            let reg = world.registry();
1308            notify_mut(&world, nid)
1309                .register(
1310                    |t| Ctx {
1311                        _reactor_id: t,
1312                        value: 100,
1313                    },
1314                    step,
1315                    reg,
1316                )
1317                .subscribe(src);
1318        }
1319
1320        // Frame 2 — both reactors fire
1321        notify_mut(&world, nid).mark(src);
1322        system.dispatch(&mut world);
1323        assert_eq!(*world.resource::<u64>(), 120); // 10 + 10 + 100
1324    }
1325
1326    #[test]
1327    fn register_after_remove_reuses_key() {
1328        let mut wb = WorldBuilder::new();
1329        wb.register::<u64>(0);
1330        wb.register(ReactorNotify::new(4, 8));
1331        wb.register(DeferredRemovals::default());
1332        let mut world = wb.build();
1333        let nid = world.id::<ReactorNotify>();
1334
1335        let mut system = ReactorSystem::new(&world);
1336
1337        struct Ctx {
1338            reactor_id: Token,
1339            value: u64,
1340        }
1341
1342        fn step(ctx: &mut Ctx, mut out: ResMut<u64>, mut removals: ResMut<DeferredRemovals>) {
1343            *out += ctx.value;
1344            if ctx.value == 10 {
1345                removals.deregister(ctx.reactor_id);
1346            }
1347        }
1348
1349        let src = {
1350            let reg = world.registry();
1351            let notify = notify_mut(&world, nid);
1352            let src = notify.register_source();
1353            notify
1354                .register(
1355                    |t| Ctx {
1356                        reactor_id: t,
1357                        value: 10,
1358                    },
1359                    step,
1360                    reg,
1361                )
1362                .subscribe(src);
1363            src
1364        };
1365
1366        // Frame 1 — reactor runs and deregisters itself
1367        notify_mut(&world, nid).mark(src);
1368        system.dispatch(&mut world);
1369        assert_eq!(*world.resource::<u64>(), 10);
1370        assert_eq!(system.reactor_count(&world), 0);
1371
1372        // Register a NEW reactor — should reuse key 0
1373        {
1374            let reg = world.registry();
1375            let notify = notify_mut(&world, nid);
1376            let token = notify
1377                .register(
1378                    |t| Ctx {
1379                        reactor_id: t,
1380                        value: 100,
1381                    },
1382                    step,
1383                    reg,
1384                )
1385                .token();
1386            notify.subscribe(token, src);
1387            assert_eq!(token.index(), 0); // slab reused key 0
1388        }
1389
1390        // Frame 2 — new reactor runs correctly
1391        notify_mut(&world, nid).mark(src);
1392        system.dispatch(&mut world);
1393        assert_eq!(*world.resource::<u64>(), 110); // 10 + 100
1394        assert_eq!(system.reactor_count(&world), 1); // still alive (value != 10)
1395    }
1396
1397    #[test]
1398    fn reactor_can_access_actor_notify() {
1399        // Verify no aliasing UB: the move-out-move-back pattern
1400        // allows reactors to safely access ReactorNotify via ResMut.
1401        let mut wb = WorldBuilder::new();
1402        wb.register::<u64>(0);
1403        wb.register(ReactorNotify::new(4, 8));
1404        wb.register(DeferredRemovals::default());
1405        let mut world = wb.build();
1406        let nid = world.id::<ReactorNotify>();
1407
1408        let mut system = ReactorSystem::new(&world);
1409
1410        struct Ctx {
1411            _reactor_id: Token,
1412        }
1413
1414        fn step(_ctx: &mut Ctx, notify: ResMut<ReactorNotify>, mut out: ResMut<u64>) {
1415            // Reactor reads ReactorNotify — this would be UB without move-out
1416            *out = notify.reactor_count() as u64;
1417        }
1418
1419        let src = {
1420            let reg = world.registry();
1421            let notify = notify_mut(&world, nid);
1422            let src = notify.register_source();
1423            notify
1424                .register(|t| Ctx { _reactor_id: t }, step, reg)
1425                .subscribe(src);
1426            src
1427        };
1428
1429        notify_mut(&world, nid).mark(src);
1430        system.dispatch(&mut world);
1431        // Reactor count is 1, but during run() the reactor was taken out,
1432        // so reactor_count() sees 0 reactors with Some values... actually
1433        // slab.len() still counts the slot as occupied. The Option is
1434        // None but the slab entry exists. Let's just verify no panic.
1435        // The important thing is no aliasing UB.
1436    }
1437
1438    // -- Realistic data source patterns ---------------------------------------
1439
1440    #[test]
1441    fn multi_instrument_with_shared_source() {
1442        // Pattern: per-instrument market data sources + a shared "positions"
1443        // source. Quoting reactors subscribe to their instrument + positions.
1444        let mut wb = WorldBuilder::new();
1445        wb.register::<u64>(0);
1446        wb.register(ReactorNotify::new(8, 16));
1447        wb.register(DeferredRemovals::default());
1448        let mut world = wb.build();
1449        let nid = world.id::<ReactorNotify>();
1450        let mut system = ReactorSystem::new(&world);
1451
1452        struct Ctx {
1453            _reactor_id: Token,
1454            instrument: &'static str,
1455        }
1456
1457        fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
1458            // Each reactor increments by its instrument's "value"
1459            *out += match ctx.instrument {
1460                "BTC" => 100,
1461                "ETH" => 10,
1462                "SOL" => 1,
1463                _ => 0,
1464            };
1465        }
1466
1467        let (btc_md, eth_md, sol_md, positions) = {
1468            let reg = world.registry();
1469            let notify = notify_mut(&world, nid);
1470
1471            // Per-instrument data sources
1472            let btc_md = notify.register_source();
1473            let eth_md = notify.register_source();
1474            let sol_md = notify.register_source();
1475            // Shared source
1476            let positions = notify.register_source();
1477
1478            // BTC reactor: subscribes to btc_md + positions
1479            notify
1480                .register(
1481                    |t| Ctx {
1482                        _reactor_id: t,
1483                        instrument: "BTC",
1484                    },
1485                    step,
1486                    reg,
1487                )
1488                .subscribe(btc_md)
1489                .subscribe(positions);
1490
1491            // ETH reactor: subscribes to eth_md + positions
1492            notify
1493                .register(
1494                    |t| Ctx {
1495                        _reactor_id: t,
1496                        instrument: "ETH",
1497                    },
1498                    step,
1499                    reg,
1500                )
1501                .subscribe(eth_md)
1502                .subscribe(positions);
1503
1504            // SOL reactor: subscribes to sol_md + positions
1505            notify
1506                .register(
1507                    |t| Ctx {
1508                        _reactor_id: t,
1509                        instrument: "SOL",
1510                    },
1511                    step,
1512                    reg,
1513                )
1514                .subscribe(sol_md)
1515                .subscribe(positions);
1516
1517            (btc_md, eth_md, sol_md, positions)
1518        };
1519
1520        // Only BTC data changes — only BTC reactor wakes
1521        notify_mut(&world, nid).mark(btc_md);
1522        system.dispatch(&mut world);
1523        assert_eq!(*world.resource::<u64>(), 100);
1524
1525        // Position update — ALL reactors wake (shared source), deduped
1526        notify_mut(&world, nid).mark(positions);
1527        system.dispatch(&mut world);
1528        assert_eq!(*world.resource::<u64>(), 211); // 100 + 100 + 10 + 1
1529
1530        // BTC + ETH data change in same frame — both reactors wake once
1531        notify_mut(&world, nid).mark(btc_md);
1532        notify_mut(&world, nid).mark(eth_md);
1533        system.dispatch(&mut world);
1534        assert_eq!(*world.resource::<u64>(), 321); // 211 + 100 + 10
1535
1536        // BTC data + position in same frame — BTC reactor wakes ONCE (dedup)
1537        notify_mut(&world, nid).mark(btc_md);
1538        notify_mut(&world, nid).mark(positions);
1539        system.dispatch(&mut world);
1540        // BTC: 100 (deduped, only once), ETH: 10, SOL: 1
1541        assert_eq!(*world.resource::<u64>(), 432); // 321 + 100 + 10 + 1
1542
1543        // Nothing fires — no reactors wake
1544        let ran = system.dispatch(&mut world);
1545        assert!(!ran);
1546        assert_eq!(*world.resource::<u64>(), 432);
1547
1548        // SOL data only — only SOL reactor
1549        notify_mut(&world, nid).mark(sol_md);
1550        system.dispatch(&mut world);
1551        assert_eq!(*world.resource::<u64>(), 433);
1552    }
1553
1554    #[test]
1555    fn per_reactor_fill_routing() {
1556        // Pattern: each reactor gets its own DataSource for fill routing.
1557        // Wire protocol embeds the token index in the order client_id.
1558        // When a fill arrives, the handler marks the reactor's source directly.
1559        use std::collections::HashMap;
1560
1561        let mut wb = WorldBuilder::new();
1562        wb.register::<u64>(0);
1563        wb.register(ReactorNotify::new(8, 16));
1564        wb.register(DeferredRemovals::default());
1565        let mut world = wb.build();
1566        let nid = world.id::<ReactorNotify>();
1567        let mut system = ReactorSystem::new(&world);
1568
1569        struct Ctx {
1570            reactor_id: Token,
1571        }
1572
1573        fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
1574            *out += ctx.reactor_id.index() as u64 + 1;
1575        }
1576
1577        // Routing table: token index → reactor's fill source
1578        let mut fill_sources: HashMap<usize, DataSource> = HashMap::new();
1579
1580        {
1581            let reg = world.registry();
1582            let notify = notify_mut(&world, nid);
1583
1584            for _ in 0..3 {
1585                // Each reactor gets its own fill source
1586                let fill_src = notify.register_source();
1587                let token = notify
1588                    .register(|t| Ctx { reactor_id: t }, step, reg)
1589                    .subscribe(fill_src)
1590                    .token();
1591
1592                fill_sources.insert(token.index(), fill_src);
1593            }
1594        }
1595
1596        // Simulate fill arriving for reactor 1 — look up its source by wire ID
1597        let wire_client_id: usize = 1;
1598        let fill_source = fill_sources[&wire_client_id];
1599        notify_mut(&world, nid).mark(fill_source);
1600        system.dispatch(&mut world);
1601        // Only reactor 1 ran: token.index()=1, so += 2
1602        assert_eq!(*world.resource::<u64>(), 2);
1603
1604        // Fill for reactor 0
1605        let fill_source = fill_sources[&0];
1606        notify_mut(&world, nid).mark(fill_source);
1607        system.dispatch(&mut world);
1608        // Reactor 0: token.index()=0, += 1
1609        assert_eq!(*world.resource::<u64>(), 3);
1610    }
1611
1612    #[test]
1613    fn dynamic_source_registration() {
1614        // Pattern: data sources added at runtime when new instruments
1615        // come online. Reactors registered and subscribed dynamically.
1616        let mut wb = WorldBuilder::new();
1617        wb.register::<u64>(0);
1618        wb.register(ReactorNotify::new(4, 8));
1619        wb.register(DeferredRemovals::default());
1620        let mut world = wb.build();
1621        let nid = world.id::<ReactorNotify>();
1622        let mut system = ReactorSystem::new(&world);
1623
1624        struct Ctx {
1625            _reactor_id: Token,
1626            value: u64,
1627        }
1628
1629        fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
1630            *out += ctx.value;
1631        }
1632
1633        // Start with just BTC
1634        let btc_md = {
1635            let reg = world.registry();
1636            let notify = notify_mut(&world, nid);
1637            let btc_md = notify.register_source();
1638            notify
1639                .register(
1640                    |t| Ctx {
1641                        _reactor_id: t,
1642                        value: 10,
1643                    },
1644                    step,
1645                    reg,
1646                )
1647                .subscribe(btc_md);
1648            btc_md
1649        };
1650
1651        notify_mut(&world, nid).mark(btc_md);
1652        system.dispatch(&mut world);
1653        assert_eq!(*world.resource::<u64>(), 10);
1654
1655        // Runtime: new instrument comes online — register source + reactor
1656        let eth_md = {
1657            let reg = world.registry();
1658            let notify = notify_mut(&world, nid);
1659            let eth_md = notify.register_source();
1660            notify
1661                .register(
1662                    |t| Ctx {
1663                        _reactor_id: t,
1664                        value: 100,
1665                    },
1666                    step,
1667                    reg,
1668                )
1669                .subscribe(eth_md);
1670            eth_md
1671        };
1672
1673        // Both instruments fire
1674        notify_mut(&world, nid).mark(btc_md);
1675        notify_mut(&world, nid).mark(eth_md);
1676        system.dispatch(&mut world);
1677        assert_eq!(*world.resource::<u64>(), 120); // 10 + 10 + 100
1678    }
1679
1680    // -- Source removal + slab reuse ------------------------------------------
1681
1682    #[test]
1683    fn remove_source_and_reuse_slot() {
1684        let mut wb = WorldBuilder::new();
1685        wb.register::<u64>(0);
1686        wb.register(ReactorNotify::new(4, 8));
1687        wb.register(DeferredRemovals::default());
1688        let mut world = wb.build();
1689        let nid = world.id::<ReactorNotify>();
1690        let mut system = ReactorSystem::new(&world);
1691
1692        struct Ctx {
1693            _reactor_id: Token,
1694            value: u64,
1695        }
1696
1697        fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
1698            *out += ctx.value;
1699        }
1700
1701        // Register two sources
1702        let (src_a, src_b) = {
1703            let reg = world.registry();
1704            let notify = notify_mut(&world, nid);
1705            let src_a = notify.register_source();
1706            let src_b = notify.register_source();
1707            notify
1708                .register(
1709                    |t| Ctx {
1710                        _reactor_id: t,
1711                        value: 10,
1712                    },
1713                    step,
1714                    reg,
1715                )
1716                .subscribe(src_a);
1717            notify
1718                .register(
1719                    |t| Ctx {
1720                        _reactor_id: t,
1721                        value: 100,
1722                    },
1723                    step,
1724                    reg,
1725                )
1726                .subscribe(src_b);
1727            (src_a, src_b)
1728        };
1729
1730        // Remove source A
1731        notify_mut(&world, nid).remove_source(src_a);
1732
1733        // Marking removed source is a no-op
1734        notify_mut(&world, nid).mark(src_a);
1735        let ran = system.dispatch(&mut world);
1736        assert!(!ran);
1737
1738        // Source B still works
1739        notify_mut(&world, nid).mark(src_b);
1740        system.dispatch(&mut world);
1741        assert_eq!(*world.resource::<u64>(), 100);
1742
1743        // Register a new source — should reuse slot 0
1744        let src_c = notify_mut(&world, nid).register_source();
1745        assert_eq!(src_c.0, src_a.0); // slab reused the slot
1746
1747        // Subscribe reactor to new source and verify it works
1748        let reg = world.registry();
1749        let notify = notify_mut(&world, nid);
1750        notify
1751            .register(
1752                |t| Ctx {
1753                    _reactor_id: t,
1754                    value: 1,
1755                },
1756                step,
1757                reg,
1758            )
1759            .subscribe(src_c);
1760
1761        notify_mut(&world, nid).mark(src_c);
1762        system.dispatch(&mut world);
1763        assert_eq!(*world.resource::<u64>(), 101); // 100 + 1
1764    }
1765
1766    // -- SourceRegistry -------------------------------------------------------
1767
1768    #[test]
1769    fn source_registry_basic() {
1770        let mut registry = SourceRegistry::new();
1771
1772        #[derive(Hash, Eq, PartialEq, Debug)]
1773        struct InstrumentId(u32);
1774
1775        let src_a = DataSource(0);
1776        let src_b = DataSource(1);
1777
1778        registry.insert(InstrumentId(1), src_a);
1779        registry.insert(InstrumentId(2), src_b);
1780
1781        assert_eq!(registry.get(&InstrumentId(1)), Some(src_a));
1782        assert_eq!(registry.get(&InstrumentId(2)), Some(src_b));
1783        assert_eq!(registry.get(&InstrumentId(3)), None);
1784        assert!(registry.contains(&InstrumentId(1)));
1785        assert!(!registry.contains(&InstrumentId(3)));
1786    }
1787
1788    #[test]
1789    fn source_registry_multiple_key_types() {
1790        let mut registry = SourceRegistry::new();
1791
1792        #[derive(Hash, Eq, PartialEq)]
1793        struct InstrumentId(u32);
1794
1795        #[derive(Hash, Eq, PartialEq)]
1796        struct StrategyId(u32);
1797
1798        let src_a = DataSource(0);
1799        let src_b = DataSource(1);
1800
1801        // Same registry, different key types
1802        registry.insert(InstrumentId(1), src_a);
1803        registry.insert(StrategyId(1), src_b);
1804
1805        // Different type namespaces — same inner value (1), different results
1806        assert_eq!(registry.get(&InstrumentId(1)), Some(src_a));
1807        assert_eq!(registry.get(&StrategyId(1)), Some(src_b));
1808    }
1809
1810    #[test]
1811    fn source_registry_tuple_keys() {
1812        let mut registry = SourceRegistry::new();
1813
1814        let src = DataSource(42);
1815        registry.insert(("BTC", "Binance"), src);
1816
1817        assert_eq!(registry.get(&("BTC", "Binance")), Some(src));
1818        assert_eq!(registry.get(&("ETH", "Binance")), None);
1819    }
1820
1821    #[test]
1822    fn source_registry_remove() {
1823        let mut registry = SourceRegistry::new();
1824
1825        let src = DataSource(0);
1826        registry.insert(42u64, src);
1827
1828        assert_eq!(registry.remove(&42u64), Some(src));
1829        assert_eq!(registry.get(&42u64), None);
1830        assert_eq!(registry.remove(&42u64), None); // already gone
1831    }
1832
1833    #[test]
1834    fn source_registry_integrated_with_reactor_system() {
1835        let mut wb = WorldBuilder::new();
1836        wb.register::<u64>(0);
1837        wb.register(ReactorNotify::new(4, 8));
1838        wb.register(DeferredRemovals::default());
1839        wb.register(SourceRegistry::new());
1840        let mut world = wb.build();
1841        let nid = world.id::<ReactorNotify>();
1842        let mut system = ReactorSystem::new(&world);
1843
1844        #[derive(Hash, Eq, PartialEq, Clone, Copy)]
1845        struct Instrument(u32);
1846        const BTC: Instrument = Instrument(0);
1847        const ETH: Instrument = Instrument(1);
1848
1849        struct Ctx {
1850            _reactor_id: Token,
1851            value: u64,
1852        }
1853
1854        fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
1855            *out += ctx.value;
1856        }
1857
1858        // Setup: register sources and map to natural keys
1859        let btc_src = notify_mut(&world, nid).register_source();
1860        let eth_src = notify_mut(&world, nid).register_source();
1861
1862        world.resource_mut::<SourceRegistry>().insert(BTC, btc_src);
1863        world.resource_mut::<SourceRegistry>().insert(ETH, eth_src);
1864
1865        // Register reactors using natural key lookup
1866        {
1867            let reg = world.registry();
1868            let btc = world.resource::<SourceRegistry>().get(&BTC).unwrap();
1869            let notify = notify_mut(&world, nid);
1870            notify
1871                .register(
1872                    |t| Ctx {
1873                        _reactor_id: t,
1874                        value: 10,
1875                    },
1876                    step,
1877                    reg,
1878                )
1879                .subscribe(btc);
1880        }
1881
1882        // Mark via pre-resolved DataSource
1883        notify_mut(&world, nid).mark(btc_src);
1884        system.dispatch(&mut world);
1885        assert_eq!(*world.resource::<u64>(), 10);
1886
1887        // Delist BTC — remove from both registry and notify
1888        let removed = world.resource_mut::<SourceRegistry>().remove(&BTC);
1889        assert!(removed.is_some());
1890        notify_mut(&world, nid).remove_source(removed.unwrap());
1891
1892        // Marking BTC is now a no-op
1893        notify_mut(&world, nid).mark(btc_src);
1894        let ran = system.dispatch(&mut world);
1895        assert!(!ran);
1896    }
1897
1898    // -- SourceRegistry edge cases --------------------------------------------
1899
1900    #[test]
1901    fn source_registry_overwrite_key() {
1902        let mut registry = SourceRegistry::new();
1903        let src_a = DataSource(0);
1904        let src_b = DataSource(1);
1905
1906        registry.insert(42u32, src_a);
1907        assert_eq!(registry.get(&42u32), Some(src_a));
1908
1909        // Overwrite same key with different source
1910        registry.insert(42u32, src_b);
1911        assert_eq!(registry.get(&42u32), Some(src_b));
1912    }
1913
1914    #[test]
1915    fn source_registry_empty_get() {
1916        let registry = SourceRegistry::new();
1917        // No key type has ever been registered
1918        assert_eq!(registry.get(&42u32), None);
1919        assert!(!registry.contains(&42u32));
1920    }
1921
1922    #[test]
1923    fn source_registry_enum_keys() {
1924        #[derive(Hash, Eq, PartialEq)]
1925        enum Venue {
1926            Binance,
1927            Coinbase,
1928        }
1929
1930        let mut registry = SourceRegistry::new();
1931        let src = DataSource(0);
1932        registry.insert(Venue::Binance, src);
1933
1934        assert_eq!(registry.get(&Venue::Binance), Some(src));
1935        assert_eq!(registry.get(&Venue::Coinbase), None);
1936    }
1937
1938    #[test]
1939    fn source_registry_composite_key() {
1940        // (Strategy, Instrument, Venue) triple as key
1941        #[derive(Hash, Eq, PartialEq)]
1942        struct StrategyId(u32);
1943        #[derive(Hash, Eq, PartialEq)]
1944        struct InstrumentId(u32);
1945        #[derive(Hash, Eq, PartialEq)]
1946        struct VenueId(u32);
1947
1948        let mut registry = SourceRegistry::new();
1949        let src = DataSource(5);
1950        registry.insert((StrategyId(1), InstrumentId(0), VenueId(2)), src);
1951
1952        assert_eq!(
1953            registry.get(&(StrategyId(1), InstrumentId(0), VenueId(2))),
1954            Some(src)
1955        );
1956        // Different strategy
1957        assert_eq!(
1958            registry.get(&(StrategyId(2), InstrumentId(0), VenueId(2))),
1959            None
1960        );
1961    }
1962
1963    // -- Full lifecycle scenarios ---------------------------------------------
1964
1965    #[test]
1966    fn full_lifecycle_add_trade_remove() {
1967        // Simulates: add instrument → reactors trade → delist → cleanup
1968        let mut wb = WorldBuilder::new();
1969        wb.register::<u64>(0);
1970        wb.register(ReactorNotify::new(4, 8));
1971        wb.register(DeferredRemovals::default());
1972        wb.register(SourceRegistry::new());
1973        let mut world = wb.build();
1974        let nid = world.id::<ReactorNotify>();
1975        let mut system = ReactorSystem::new(&world);
1976
1977        #[derive(Hash, Eq, PartialEq, Clone, Copy)]
1978        struct Instrument(u32);
1979
1980        struct Ctx {
1981            _reactor_id: Token,
1982            value: u64,
1983        }
1984
1985        fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
1986            *out += ctx.value;
1987        }
1988
1989        // Phase 1: Add BTC
1990        let btc_src = notify_mut(&world, nid).register_source();
1991        world
1992            .resource_mut::<SourceRegistry>()
1993            .insert(Instrument(0), btc_src);
1994
1995        {
1996            let reg = world.registry();
1997            let notify = notify_mut(&world, nid);
1998            notify
1999                .register(
2000                    |t| Ctx {
2001                        _reactor_id: t,
2002                        value: 10,
2003                    },
2004                    step,
2005                    reg,
2006                )
2007                .subscribe(btc_src);
2008        }
2009
2010        // Phase 2: Trade
2011        notify_mut(&world, nid).mark(btc_src);
2012        system.dispatch(&mut world);
2013        assert_eq!(*world.resource::<u64>(), 10);
2014
2015        // Phase 3: Add ETH dynamically
2016        let eth_src = notify_mut(&world, nid).register_source();
2017        world
2018            .resource_mut::<SourceRegistry>()
2019            .insert(Instrument(1), eth_src);
2020
2021        {
2022            let reg = world.registry();
2023            let notify = notify_mut(&world, nid);
2024            notify
2025                .register(
2026                    |t| Ctx {
2027                        _reactor_id: t,
2028                        value: 100,
2029                    },
2030                    step,
2031                    reg,
2032                )
2033                .subscribe(eth_src);
2034        }
2035
2036        // Both trade
2037        notify_mut(&world, nid).mark(btc_src);
2038        notify_mut(&world, nid).mark(eth_src);
2039        system.dispatch(&mut world);
2040        assert_eq!(*world.resource::<u64>(), 120);
2041
2042        // Phase 4: Delist BTC
2043        let removed = world
2044            .resource_mut::<SourceRegistry>()
2045            .remove(&Instrument(0));
2046        notify_mut(&world, nid).remove_source(removed.unwrap());
2047
2048        // Only ETH remains
2049        notify_mut(&world, nid).mark(eth_src);
2050        system.dispatch(&mut world);
2051        assert_eq!(*world.resource::<u64>(), 220);
2052
2053        // Phase 5: Add SOL — reuses BTC's old slab slot
2054        let sol_src = notify_mut(&world, nid).register_source();
2055        world
2056            .resource_mut::<SourceRegistry>()
2057            .insert(Instrument(2), sol_src);
2058        assert_eq!(sol_src.0, btc_src.0); // slab reused
2059
2060        {
2061            let reg = world.registry();
2062            let notify = notify_mut(&world, nid);
2063            notify
2064                .register(
2065                    |t| Ctx {
2066                        _reactor_id: t,
2067                        value: 1000,
2068                    },
2069                    step,
2070                    reg,
2071                )
2072                .subscribe(sol_src);
2073        }
2074
2075        // SOL + ETH fire
2076        notify_mut(&world, nid).mark(sol_src);
2077        notify_mut(&world, nid).mark(eth_src);
2078        system.dispatch(&mut world);
2079        assert_eq!(*world.resource::<u64>(), 1320); // 220 + 1000 + 100
2080    }
2081
2082    #[test]
2083    fn multi_strategy_same_instrument() {
2084        // Two strategies on the same instrument with different data sources
2085        let mut wb = WorldBuilder::new();
2086        wb.register::<u64>(0);
2087        wb.register(ReactorNotify::new(8, 16));
2088        wb.register(DeferredRemovals::default());
2089        wb.register(SourceRegistry::new());
2090        let mut world = wb.build();
2091        let nid = world.id::<ReactorNotify>();
2092        let mut system = ReactorSystem::new(&world);
2093
2094        #[derive(Hash, Eq, PartialEq, Clone, Copy)]
2095        struct StrategyInstrument(&'static str, &'static str);
2096
2097        struct Ctx {
2098            _reactor_id: Token,
2099            value: u64,
2100        }
2101
2102        fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
2103            *out += ctx.value;
2104        }
2105
2106        // Per-strategy+instrument sources
2107        let reg = world.registry();
2108        let notify = notify_mut(&world, nid);
2109
2110        let mm_btc = notify.register_source();
2111        let mm_eth = notify.register_source();
2112        let arb_btc = notify.register_source();
2113
2114        // Market maker on BTC and ETH
2115        notify
2116            .register(
2117                |t| Ctx {
2118                    _reactor_id: t,
2119                    value: 1,
2120                },
2121                step,
2122                reg,
2123            )
2124            .subscribe(mm_btc);
2125        notify
2126            .register(
2127                |t| Ctx {
2128                    _reactor_id: t,
2129                    value: 2,
2130                },
2131                step,
2132                reg,
2133            )
2134            .subscribe(mm_eth);
2135
2136        // Arb strategy on BTC
2137        notify
2138            .register(
2139                |t| Ctx {
2140                    _reactor_id: t,
2141                    value: 100,
2142                },
2143                step,
2144                reg,
2145            )
2146            .subscribe(arb_btc);
2147
2148        // Map composite keys
2149        world
2150            .resource_mut::<SourceRegistry>()
2151            .insert(StrategyInstrument("MM", "BTC"), mm_btc);
2152        world
2153            .resource_mut::<SourceRegistry>()
2154            .insert(StrategyInstrument("MM", "ETH"), mm_eth);
2155        world
2156            .resource_mut::<SourceRegistry>()
2157            .insert(StrategyInstrument("ARB", "BTC"), arb_btc);
2158
2159        // BTC data arrives — both MM-BTC and ARB-BTC should fire
2160        // But they're separate sources, so handler marks both:
2161        let mm_btc_src = world
2162            .resource::<SourceRegistry>()
2163            .get(&StrategyInstrument("MM", "BTC"))
2164            .unwrap();
2165        let arb_btc_src = world
2166            .resource::<SourceRegistry>()
2167            .get(&StrategyInstrument("ARB", "BTC"))
2168            .unwrap();
2169
2170        notify_mut(&world, nid).mark(mm_btc_src);
2171        notify_mut(&world, nid).mark(arb_btc_src);
2172        system.dispatch(&mut world);
2173        assert_eq!(*world.resource::<u64>(), 101); // 1 + 100
2174    }
2175
2176    #[test]
2177    fn reactor_self_removal_with_registry_cleanup() {
2178        // Reactor deregisters itself AND the handler cleans up the source
2179        let mut wb = WorldBuilder::new();
2180        wb.register::<u64>(0);
2181        wb.register(ReactorNotify::new(4, 8));
2182        wb.register(DeferredRemovals::default());
2183        wb.register(SourceRegistry::new());
2184        let mut world = wb.build();
2185        let nid = world.id::<ReactorNotify>();
2186        let mut system = ReactorSystem::new(&world);
2187
2188        struct Ctx {
2189            reactor_id: Token,
2190        }
2191
2192        fn one_shot(ctx: &mut Ctx, mut out: ResMut<u64>, mut removals: ResMut<DeferredRemovals>) {
2193            *out += 1;
2194            removals.deregister(ctx.reactor_id);
2195        }
2196
2197        let src = notify_mut(&world, nid).register_source();
2198        world
2199            .resource_mut::<SourceRegistry>()
2200            .insert("one-shot", src);
2201
2202        {
2203            let reg = world.registry();
2204            let notify = notify_mut(&world, nid);
2205            notify
2206                .register(|t| Ctx { reactor_id: t }, one_shot, reg)
2207                .subscribe(src);
2208        }
2209
2210        // Reactor runs once and removes itself
2211        notify_mut(&world, nid).mark(src);
2212        system.dispatch(&mut world);
2213        assert_eq!(*world.resource::<u64>(), 1);
2214        assert_eq!(system.reactor_count(&world), 0);
2215
2216        // Source still exists in registry but no reactors subscribe
2217        assert!(world.resource::<SourceRegistry>().contains(&"one-shot"));
2218
2219        // Mark again — no reactors wake
2220        notify_mut(&world, nid).mark(src);
2221        let ran = system.dispatch(&mut world);
2222        assert!(!ran);
2223    }
2224
2225    #[test]
2226    fn many_reactors_same_source() {
2227        // 50 reactors all subscribed to one source — all wake, deduped
2228        let mut wb = WorldBuilder::new();
2229        wb.register::<u64>(0);
2230        wb.register(ReactorNotify::new(4, 64));
2231        wb.register(DeferredRemovals::default());
2232        let mut world = wb.build();
2233        let nid = world.id::<ReactorNotify>();
2234        let mut system = ReactorSystem::new(&world);
2235
2236        struct Ctx {
2237            _reactor_id: Token,
2238        }
2239
2240        fn step(_ctx: &mut Ctx, mut out: ResMut<u64>) {
2241            *out += 1;
2242        }
2243
2244        let src = notify_mut(&world, nid).register_source();
2245
2246        {
2247            let reg = world.registry();
2248            let notify = notify_mut(&world, nid);
2249            for _ in 0..50 {
2250                notify
2251                    .register(|t| Ctx { _reactor_id: t }, step, reg)
2252                    .subscribe(src);
2253            }
2254        }
2255
2256        assert_eq!(system.reactor_count(&world), 50);
2257
2258        notify_mut(&world, nid).mark(src);
2259        system.dispatch(&mut world);
2260        assert_eq!(*world.resource::<u64>(), 50); // all 50 ran exactly once
2261    }
2262
2263    #[test]
2264    fn reactor_subscribes_to_multiple_sources() {
2265        // One reactor subscribed to 5 different sources.
2266        // All 5 fire in one frame — reactor runs exactly once.
2267        let mut wb = WorldBuilder::new();
2268        wb.register::<u64>(0);
2269        wb.register(ReactorNotify::new(8, 8));
2270        wb.register(DeferredRemovals::default());
2271        let mut world = wb.build();
2272        let nid = world.id::<ReactorNotify>();
2273        let mut system = ReactorSystem::new(&world);
2274
2275        struct Ctx {
2276            _reactor_id: Token,
2277        }
2278
2279        fn step(_ctx: &mut Ctx, mut out: ResMut<u64>) {
2280            *out += 1;
2281        }
2282
2283        let mut sources = Vec::new();
2284        let notify = notify_mut(&world, nid);
2285        for _ in 0..5 {
2286            sources.push(notify.register_source());
2287        }
2288
2289        {
2290            let reg = world.registry();
2291            let notify = notify_mut(&world, nid);
2292            let mut registration = notify.register(|t| Ctx { _reactor_id: t }, step, reg);
2293            for &src in &sources {
2294                registration = registration.subscribe(src);
2295            }
2296        }
2297
2298        // Mark all 5 sources
2299        for &src in &sources {
2300            notify_mut(&world, nid).mark(src);
2301        }
2302
2303        system.dispatch(&mut world);
2304        assert_eq!(*world.resource::<u64>(), 1); // ONE run despite 5 sources
2305    }
2306
2307    #[test]
2308    fn stale_data_source_is_noop() {
2309        // After removing a source, marking it must not panic
2310        let mut wb = WorldBuilder::new();
2311        wb.register(ReactorNotify::new(4, 4));
2312        wb.register(DeferredRemovals::default());
2313        let mut world = wb.build();
2314        let nid = world.id::<ReactorNotify>();
2315        let mut system = ReactorSystem::new(&world);
2316
2317        let src = notify_mut(&world, nid).register_source();
2318        notify_mut(&world, nid).remove_source(src);
2319
2320        // Must not panic
2321        notify_mut(&world, nid).mark(src);
2322        let ran = system.dispatch(&mut world);
2323        assert!(!ran);
2324    }
2325
2326    #[test]
2327    fn double_remove_source_is_noop() {
2328        let mut notify = ReactorNotify::new(4, 4);
2329        let src = notify.register_source();
2330        notify.remove_source(src);
2331        notify.remove_source(src); // must not panic
2332    }
2333
2334    // -- PipelineReactor: reactor body is a CtxPipeline ----------------------------
2335
2336    #[test]
2337    fn pipeline_reactor_dispatch() {
2338        use crate::CtxPipelineBuilder;
2339
2340        let mut wb = WorldBuilder::new();
2341        wb.register::<u64>(0);
2342        wb.register(ReactorNotify::new(4, 8));
2343        wb.register(DeferredRemovals::default());
2344        let mut world = wb.build();
2345        let nid = world.id::<ReactorNotify>();
2346        let mut system = ReactorSystem::new(&world);
2347
2348        struct Ctx {
2349            _reactor_id: Token,
2350            instrument: &'static str,
2351        }
2352
2353        fn read_data(ctx: &mut Ctx, val: Res<u64>, _input: ()) -> u64 {
2354            let _ = ctx.instrument;
2355            *val
2356        }
2357
2358        fn double(_ctx: &mut Ctx, x: u64) -> u64 {
2359            x * 2
2360        }
2361
2362        fn store(_ctx: &mut Ctx, mut out: ResMut<u64>, x: u64) {
2363            *out = x;
2364        }
2365
2366        let reg = world.registry();
2367
2368        let pipeline = CtxPipelineBuilder::<Ctx, ()>::new()
2369            .then(read_data, reg)
2370            .then(double, reg)
2371            .then(store, reg)
2372            .build();
2373
2374        let notify = notify_mut(&world, nid);
2375        let src = notify.register_source();
2376
2377        // Wrap pipeline in PipelineReactor
2378        let reactor = PipelineReactor::new(
2379            Ctx {
2380                _reactor_id: Token::new(0),
2381                instrument: "BTC",
2382            },
2383            pipeline,
2384        );
2385        notify.register_built(reactor).subscribe(src);
2386
2387        // Set initial value and dispatch
2388        *world.resource_mut::<u64>() = 10;
2389        notify_mut(&world, nid).mark(src);
2390        system.dispatch(&mut world);
2391
2392        assert_eq!(*world.resource::<u64>(), 20); // 10 * 2
2393    }
2394
2395    #[test]
2396    fn dag_reactor_dispatch() {
2397        use crate::CtxDagBuilder;
2398
2399        let mut wb = WorldBuilder::new();
2400        wb.register::<u64>(0);
2401        wb.register(ReactorNotify::new(4, 8));
2402        wb.register(DeferredRemovals::default());
2403        let mut world = wb.build();
2404        let nid = world.id::<ReactorNotify>();
2405        let mut system = ReactorSystem::new(&world);
2406
2407        struct Ctx {
2408            _reactor_id: Token,
2409        }
2410
2411        fn root(ctx: &mut Ctx, val: Res<u64>, _input: ()) -> u64 {
2412            let _ = ctx;
2413            *val
2414        }
2415
2416        fn arm_double(_ctx: &mut Ctx, val: &u64) -> u64 {
2417            *val * 2
2418        }
2419
2420        fn arm_add(_ctx: &mut Ctx, val: &u64) -> u64 {
2421            *val + 10
2422        }
2423
2424        fn merge(_ctx: &mut Ctx, mut out: ResMut<u64>, a: &u64, b: &u64) {
2425            *out = *a + *b;
2426        }
2427
2428        let reg = world.registry();
2429
2430        let dag = CtxDagBuilder::<Ctx, ()>::new()
2431            .root(root, reg)
2432            .fork()
2433            .arm(|seed| seed.then(arm_double, reg))
2434            .arm(|seed| seed.then(arm_add, reg))
2435            .merge(merge, reg)
2436            .build();
2437
2438        let notify = notify_mut(&world, nid);
2439        let src = notify.register_source();
2440
2441        let reactor = PipelineReactor::new(
2442            Ctx {
2443                _reactor_id: Token::new(0),
2444            },
2445            dag,
2446        );
2447        notify.register_built(reactor).subscribe(src);
2448
2449        *world.resource_mut::<u64>() = 5;
2450        notify_mut(&world, nid).mark(src);
2451        system.dispatch(&mut world);
2452
2453        // (5 * 2) + (5 + 10) = 10 + 15 = 25
2454        assert_eq!(*world.resource::<u64>(), 25);
2455    }
2456
2457    #[test]
2458    fn multiple_pipeline_reactors_different_bodies() {
2459        use crate::CtxPipelineBuilder;
2460
2461        let mut wb = WorldBuilder::new();
2462        wb.register::<u64>(0);
2463        wb.register(ReactorNotify::new(4, 8));
2464        wb.register(DeferredRemovals::default());
2465        let mut world = wb.build();
2466        let nid = world.id::<ReactorNotify>();
2467        let mut system = ReactorSystem::new(&world);
2468
2469        struct Ctx {
2470            _reactor_id: Token,
2471            factor: u64,
2472        }
2473
2474        fn multiply(ctx: &mut Ctx, val: Res<u64>, _input: ()) -> u64 {
2475            *val * ctx.factor
2476        }
2477
2478        fn accumulate(_ctx: &mut Ctx, mut out: ResMut<u64>, val: u64) {
2479            *out += val;
2480        }
2481
2482        let reg = world.registry();
2483
2484        // Reactor A: multiply by 2
2485        let pipeline_a = CtxPipelineBuilder::<Ctx, ()>::new()
2486            .then(multiply, reg)
2487            .then(accumulate, reg)
2488            .build();
2489
2490        // Reactor B: multiply by 10
2491        let pipeline_b = CtxPipelineBuilder::<Ctx, ()>::new()
2492            .then(multiply, reg)
2493            .then(accumulate, reg)
2494            .build();
2495
2496        let notify = notify_mut(&world, nid);
2497        let src = notify.register_source();
2498
2499        notify
2500            .register_built(PipelineReactor::new(
2501                Ctx {
2502                    _reactor_id: Token::new(0),
2503                    factor: 2,
2504                },
2505                pipeline_a,
2506            ))
2507            .subscribe(src);
2508
2509        notify
2510            .register_built(PipelineReactor::new(
2511                Ctx {
2512                    _reactor_id: Token::new(1),
2513                    factor: 10,
2514                },
2515                pipeline_b,
2516            ))
2517            .subscribe(src);
2518
2519        *world.resource_mut::<u64>() = 5;
2520        notify_mut(&world, nid).mark(src);
2521        system.dispatch(&mut world);
2522
2523        // Reactor A reads 5, adds 5*2=10, so resource=15
2524        // Reactor B reads 15, adds 15*10=150, so resource=165
2525        // (Order depends on dispatch order — both subscribed to same source)
2526        // The value is order-dependent. Just verify both ran:
2527        let val = *world.resource::<u64>();
2528        assert!(val > 5, "both reactors should have run, got {val}");
2529    }
2530
2531    #[test]
2532    fn pipeline_reactor_with_guard() {
2533        use crate::CtxPipelineBuilder;
2534
2535        let mut wb = WorldBuilder::new();
2536        wb.register::<u64>(0);
2537        wb.register(ReactorNotify::new(4, 8));
2538        wb.register(DeferredRemovals::default());
2539        let mut world = wb.build();
2540        let nid = world.id::<ReactorNotify>();
2541        let mut system = ReactorSystem::new(&world);
2542
2543        struct Ctx {
2544            _reactor_id: Token,
2545            threshold: u64,
2546        }
2547
2548        fn read(_ctx: &mut Ctx, val: Res<u64>, _input: ()) -> u64 {
2549            *val
2550        }
2551
2552        fn above_threshold(ctx: &mut Ctx, val: &u64) -> bool {
2553            *val > ctx.threshold
2554        }
2555
2556        fn write(_ctx: &mut Ctx, mut out: ResMut<u64>, _val: u64) {
2557            *out = 999;
2558        }
2559
2560        let reg = world.registry();
2561
2562        let pipeline = CtxPipelineBuilder::<Ctx, ()>::new()
2563            .then(read, reg)
2564            .guard(above_threshold, reg)
2565            .map(write, reg)
2566            .build();
2567
2568        let notify = notify_mut(&world, nid);
2569        let src = notify.register_source();
2570
2571        notify
2572            .register_built(PipelineReactor::new(
2573                Ctx {
2574                    _reactor_id: Token::new(0),
2575                    threshold: 10,
2576                },
2577                pipeline,
2578            ))
2579            .subscribe(src);
2580
2581        // Value below threshold — guard blocks
2582        *world.resource_mut::<u64>() = 5;
2583        notify_mut(&world, nid).mark(src);
2584        system.dispatch(&mut world);
2585        assert_eq!(*world.resource::<u64>(), 5); // unchanged
2586
2587        // Value above threshold — guard passes
2588        *world.resource_mut::<u64>() = 20;
2589        notify_mut(&world, nid).mark(src);
2590        system.dispatch(&mut world);
2591        assert_eq!(*world.resource::<u64>(), 999);
2592    }
2593
2594    // -- Two-phase registration (safe API through World) ----------------------
2595
2596    #[test]
2597    fn two_phase_registration_safe_api() {
2598        // Demonstrates the safe API: create_reactor → into_reactor → insert
2599        // No unsafe, no registry borrow conflicts.
2600        let mut wb = WorldBuilder::new();
2601        wb.register::<u64>(0);
2602        wb.register(ReactorNotify::new(4, 8));
2603        wb.register(DeferredRemovals::default());
2604        let mut world = wb.build();
2605
2606        let mut system = ReactorSystem::new(&world);
2607
2608        struct Ctx {
2609            reactor_id: Token,
2610            instrument: &'static str,
2611        }
2612
2613        fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
2614            let _ = ctx.instrument;
2615            *out += ctx.reactor_id.index() as u64 + 1;
2616        }
2617
2618        // Phase 1: reserve slot
2619        let src = world.resource_mut::<ReactorNotify>().register_source();
2620        let token = world.resource_mut::<ReactorNotify>().create_reactor();
2621
2622        // Phase 2: build reactor with token + registry (no borrow conflict)
2623        let reactor = step.into_reactor(
2624            Ctx {
2625                reactor_id: token,
2626                instrument: "BTC",
2627            },
2628            world.registry(),
2629        );
2630
2631        // Phase 3: insert + subscribe
2632        world
2633            .resource_mut::<ReactorNotify>()
2634            .insert_reactor(token, reactor)
2635            .subscribe(src);
2636
2637        // Verify dispatch
2638        world.resource_mut::<ReactorNotify>().mark(src);
2639        system.dispatch(&mut world);
2640        assert_eq!(*world.resource::<u64>(), 1); // token index 0 + 1
2641
2642        // Second reactor — same pattern
2643        let token2 = world.resource_mut::<ReactorNotify>().create_reactor();
2644        let actor2 = step.into_reactor(
2645            Ctx {
2646                reactor_id: token2,
2647                instrument: "ETH",
2648            },
2649            world.registry(),
2650        );
2651        world
2652            .resource_mut::<ReactorNotify>()
2653            .insert_reactor(token2, actor2)
2654            .subscribe(src);
2655
2656        // Both reactors fire
2657        world.resource_mut::<ReactorNotify>().mark(src);
2658        system.dispatch(&mut world);
2659        assert_eq!(*world.resource::<u64>(), 4); // 1 + (0+1) + (1+1)
2660    }
2661
2662    #[test]
2663    fn two_phase_with_pipeline_reactor() {
2664        use crate::CtxPipelineBuilder;
2665
2666        let mut wb = WorldBuilder::new();
2667        wb.register::<u64>(0);
2668        wb.register(ReactorNotify::new(4, 8));
2669        wb.register(DeferredRemovals::default());
2670        let mut world = wb.build();
2671
2672        let mut system = ReactorSystem::new(&world);
2673
2674        struct Ctx {
2675            _reactor_id: Token,
2676        }
2677
2678        fn read(ctx: &mut Ctx, val: Res<u64>) -> u64 {
2679            let _ = ctx;
2680            *val
2681        }
2682
2683        fn double(_ctx: &mut Ctx, x: u64) -> u64 {
2684            x * 2
2685        }
2686
2687        fn store(_ctx: &mut Ctx, mut out: ResMut<u64>, x: u64) {
2688            *out = x;
2689        }
2690
2691        // Phase 1: reserve + register source
2692        let src = world.resource_mut::<ReactorNotify>().register_source();
2693        let token = world.resource_mut::<ReactorNotify>().create_reactor();
2694
2695        // Phase 2: build pipeline + wrap in PipelineReactor (needs registry)
2696        let reg = world.registry();
2697        let pipeline = CtxPipelineBuilder::<Ctx, ()>::new()
2698            .then(crate::no_event(read), reg)
2699            .then(double, reg)
2700            .then(store, reg)
2701            .build();
2702        let reactor = PipelineReactor::new(Ctx { _reactor_id: token }, pipeline);
2703
2704        // Phase 3: insert
2705        world
2706            .resource_mut::<ReactorNotify>()
2707            .insert_reactor(token, reactor)
2708            .subscribe(src);
2709
2710        *world.resource_mut::<u64>() = 10;
2711        world.resource_mut::<ReactorNotify>().mark(src);
2712        system.dispatch(&mut world);
2713        assert_eq!(*world.resource::<u64>(), 20); // 10 * 2
2714    }
2715}