// nexus_rt/ctx_dag.rs

1// Builder return types are necessarily complex — each combinator returns
2// CtxDagChain<C, In, Out, NodeType<Chain, ...>>. Same pattern as iterator adapters.
3#![allow(clippy::type_complexity)]
4// Handler arity is architecturally required by the Param trait — handlers
5// take N typed parameters and the macro-generated dispatch impls expand
6// per-arity into call_inner functions with N + Input arguments. Module-level
7// allow rather than one inline attribute per arity expansion.
8#![allow(clippy::too_many_arguments)]
9
10//! Context-aware DAG dispatch.
11//!
12//! Mirrors the [`dag`](crate::dag) module but threads `&mut C`
13//! (per-instance context) through every step. Designed for use inside
14//! [`Callback`](crate::Callback) where each handler instance owns private
15//! state that pipeline steps need to read or mutate.
16//!
17//! # Step function convention
18//!
19//! Context first, then Params, then step input last:
20//!
21//! ```ignore
22//! fn update_book(ctx: &mut TradingCtx, book: ResMut<OrderBook>, msg: &Decoded) -> BookResult { .. }
23//! fn update_risk(ctx: &mut TradingCtx, risk: ResMut<RiskState>, msg: &Decoded) -> RiskResult { .. }
24//! fn check_and_submit(ctx: &mut TradingCtx, a: &BookResult, b: &RiskResult) { .. }
25//! ```
26//!
27//! # Integration with Callback
28//!
29//! The built [`CtxDag`] implements [`CtxStepCall`] — it takes
30//! `&mut C`, `&mut World`, and `In`, returning `Out`.
31//!
32//! To use a DAG from a [`Handler`](crate::Handler), create a normal
33//! [`Callback`](crate::Callback) whose handler function owns or accesses
34//! the context `C` and calls the DAG via its `run` method, passing
35//! `&mut C`, `&mut World`, and the handler input.
36//!
37//! # Three-tier step resolution
38//!
39//! Each combinator accepts functions via three tiers, matching the
40//! [`dag`](crate::dag) module:
41//!
42//! 1. **Named function with Params** — `fn(&mut C, Res<T>, &In) -> Out`
43//! 2. **Arity-0 closure** — `FnMut(&mut C, &In) -> Out`
44//! 3. **[`Opaque`](crate::Opaque) closure** — `FnMut(&mut C, &mut World, &In) -> Out`
45//!    (raw World access, no Param resolution)
46//!
47//! # Deferred combinators
48//!
49//! The following combinators from [`dag`](crate::dag) are not yet implemented:
50//! `scan`, `dedup`, `dispatch`, `route`, `tee`, `splat`, `cloned`,
51//! `not`/`and`/`or`/`xor` (bool), `ok_or_else`, `or_else`,
52//! `Result::unwrap_or_else`, `view`, and `BatchDag`. These can be added
53//! when a concrete use case requires them.
54//!
55//! # Examples
56//!
57//! ```ignore
58//! let dag = CtxDagBuilder::<MyCtx, WireMsg>::new()
59//!     .root(decode, &reg)
60//!     .fork()
61//!     .arm(|a| a.then(update_book, &reg))
62//!     .arm(|b| b.then(update_risk, &reg))
63//!     .merge(check_and_submit, &reg)
64//!     .build();
65//!
66//! dag.call(&mut ctx, &mut world, wire_msg);
67//! ```
68
69use std::marker::PhantomData;
70
71use crate::ctx_pipeline::{
72    CtxChainCall, CtxDiscardOptionNode, CtxFilterNode, CtxGuardNode, CtxIdentityNode,
73    CtxInspectErrNode, CtxInspectOptionNode, CtxInspectResultNode, CtxMapErrNode, CtxOkNode,
74    CtxOkOrNode, CtxOnNoneNode, CtxStepCall, CtxTapNode, CtxThenNode, CtxUnwrapOrElseOptionNode,
75    CtxUnwrapOrOptionNode, CtxUnwrapOrResultNode, IntoCtxProducer, IntoCtxRefStep, IntoCtxStep,
76};
77use crate::handler::Param;
78use crate::world::{Registry, World};
79
80// =============================================================================
81// CtxDagThenNode — DAG ref step (takes &Out, produces NewOut)
82// =============================================================================
83
84/// Chain node for DAG `.then()` — takes input by reference.
85#[doc(hidden)]
86pub struct CtxDagThenNode<Prev, S, NewOut> {
87    pub(crate) prev: Prev,
88    pub(crate) step: S,
89    pub(crate) _out: PhantomData<fn() -> NewOut>,
90}
91
92impl<C, In, Prev, S, NewOut: 'static> CtxChainCall<C, In> for CtxDagThenNode<Prev, S, NewOut>
93where
94    Prev: CtxChainCall<C, In>,
95    Prev::Out: 'static,
96    S: for<'a> CtxStepCall<C, &'a Prev::Out, Out = NewOut>,
97{
98    type Out = NewOut;
99    #[inline(always)]
100    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> NewOut {
101        let mid = self.prev.call(ctx, world, input);
102        self.step.call(ctx, world, &mid)
103    }
104}
105
106// =============================================================================
107// DAG Option/Result nodes that take inner by ref (matching dag.rs pattern)
108// =============================================================================
109
110/// Chain node for DAG `.map()` on `Option<T>` — takes `&T`.
111#[doc(hidden)]
112pub struct CtxDagMapOptionNode<Prev, S, U> {
113    pub(crate) prev: Prev,
114    pub(crate) step: S,
115    pub(crate) _out: PhantomData<fn() -> U>,
116}
117
118impl<C, In, T: 'static, Prev, S, U> CtxChainCall<C, In> for CtxDagMapOptionNode<Prev, S, U>
119where
120    Prev: CtxChainCall<C, In, Out = Option<T>>,
121    S: for<'a> CtxStepCall<C, &'a T, Out = U>,
122{
123    type Out = Option<U>;
124    #[inline(always)]
125    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> Option<U> {
126        match self.prev.call(ctx, world, input) {
127            Some(val) => Some(self.step.call(ctx, world, &val)),
128            None => None,
129        }
130    }
131}
132
133/// Chain node for DAG `.and_then()` on `Option<T>` — takes `&T`.
134#[doc(hidden)]
135pub struct CtxDagAndThenOptionNode<Prev, S, U> {
136    pub(crate) prev: Prev,
137    pub(crate) step: S,
138    pub(crate) _out: PhantomData<fn() -> U>,
139}
140
141impl<C, In, T: 'static, Prev, S, U> CtxChainCall<C, In> for CtxDagAndThenOptionNode<Prev, S, U>
142where
143    Prev: CtxChainCall<C, In, Out = Option<T>>,
144    S: for<'a> CtxStepCall<C, &'a T, Out = Option<U>>,
145{
146    type Out = Option<U>;
147    #[inline(always)]
148    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> Option<U> {
149        match self.prev.call(ctx, world, input) {
150            Some(val) => self.step.call(ctx, world, &val),
151            None => None,
152        }
153    }
154}
155
156/// Chain node for DAG `.map()` on `Result<T, E>` — takes `&T`.
157#[doc(hidden)]
158pub struct CtxDagMapResultNode<Prev, S, U> {
159    pub(crate) prev: Prev,
160    pub(crate) step: S,
161    pub(crate) _out: PhantomData<fn() -> U>,
162}
163
164impl<C, In, T: 'static, E, Prev, S, U> CtxChainCall<C, In> for CtxDagMapResultNode<Prev, S, U>
165where
166    Prev: CtxChainCall<C, In, Out = Result<T, E>>,
167    S: for<'a> CtxStepCall<C, &'a T, Out = U>,
168{
169    type Out = Result<U, E>;
170    #[inline(always)]
171    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> Result<U, E> {
172        match self.prev.call(ctx, world, input) {
173            Ok(val) => Ok(self.step.call(ctx, world, &val)),
174            Err(e) => Err(e),
175        }
176    }
177}
178
179/// Chain node for DAG `.and_then()` on `Result<T, E>` — takes `&T`.
180#[doc(hidden)]
181pub struct CtxDagAndThenResultNode<Prev, S, U> {
182    pub(crate) prev: Prev,
183    pub(crate) step: S,
184    pub(crate) _out: PhantomData<fn() -> U>,
185}
186
187impl<C, In, T: 'static, U, E, Prev, S> CtxChainCall<C, In> for CtxDagAndThenResultNode<Prev, S, U>
188where
189    Prev: CtxChainCall<C, In, Out = Result<T, E>>,
190    S: for<'a> CtxStepCall<C, &'a T, Out = Result<U, E>>,
191{
192    type Out = Result<U, E>;
193    #[inline(always)]
194    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> Result<U, E> {
195        match self.prev.call(ctx, world, input) {
196            Ok(val) => self.step.call(ctx, world, &val),
197            Err(e) => Err(e),
198        }
199    }
200}
201
202/// Chain node for DAG `.catch()` on `Result<T, E>` — takes `&E`.
203#[doc(hidden)]
204pub struct CtxDagCatchNode<Prev, S> {
205    pub(crate) prev: Prev,
206    pub(crate) step: S,
207}
208
209impl<C, In, T, E: 'static, Prev, S> CtxChainCall<C, In> for CtxDagCatchNode<Prev, S>
210where
211    Prev: CtxChainCall<C, In, Out = Result<T, E>>,
212    S: for<'a> CtxStepCall<C, &'a E, Out = ()>,
213{
214    type Out = Option<T>;
215    #[inline(always)]
216    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> Option<T> {
217        match self.prev.call(ctx, world, input) {
218            Ok(val) => Some(val),
219            Err(e) => {
220                self.step.call(ctx, world, &e);
221                None
222            }
223        }
224    }
225}
226
227// =============================================================================
228// CtxMergeStepCall / IntoCtxMergeStep — context-aware merge step dispatch
229// =============================================================================
230
231/// Callable trait for resolved context-aware merge steps.
232///
233/// Like [`MergeStepCall`](crate::dag::MergeStepCall) but with `&mut C` context.
234#[doc(hidden)]
235pub trait CtxMergeStepCall<C, Inputs> {
236    /// The output type of this merge step.
237    type Out;
238    /// Call this merge step with context, world, and input references.
239    fn call(&mut self, ctx: &mut C, world: &mut World, inputs: Inputs) -> Self::Out;
240}
241
242/// Converts a named function into a resolved context-aware merge step.
243///
244/// Context first, then Params, then N reference inputs:
245///
246/// ```ignore
247/// fn check(ctx: &mut Ctx, config: Res<Config>, ob: &ObResult, risk: &RiskResult) -> Decision { .. }
248/// ```
249#[doc(hidden)]
250#[diagnostic::on_unimplemented(
251    message = "this function cannot be used as a context-aware merge step",
252    note = "ctx merge steps: `fn(&mut C, Params..., &A, &B, ...) -> Out`",
253    note = "closures with resource parameters are not supported — use a named `fn`"
254)]
255pub trait IntoCtxMergeStep<C, Inputs, Out, Params> {
256    /// The concrete resolved merge step type.
257    type Step: CtxMergeStepCall<C, Inputs, Out = Out>;
258
259    /// Resolve Param state from the registry and produce a merge step.
260    fn into_ctx_merge_step(self, registry: &Registry) -> Self::Step;
261}
262
263/// Internal: pre-resolved context-aware merge step with cached Param state.
264#[doc(hidden)]
265pub struct CtxMergeStep<F, Params: Param> {
266    f: F,
267    state: Params::State,
268    // Retained for future diagnostic/tracing use (step name in error messages).
269    #[allow(dead_code)]
270    name: &'static str,
271}
272
273// -- Merge arity 2, Param arity 0 (closures work) ---------------------------
274
275impl<C, A, B, Out, F> CtxMergeStepCall<C, (&A, &B)> for CtxMergeStep<F, ()>
276where
277    F: FnMut(&mut C, &A, &B) -> Out + 'static,
278{
279    type Out = Out;
280    #[inline(always)]
281    fn call(&mut self, ctx: &mut C, _world: &mut World, inputs: (&A, &B)) -> Out {
282        (self.f)(ctx, inputs.0, inputs.1)
283    }
284}
285
286impl<C, A, B, Out, F> IntoCtxMergeStep<C, (&A, &B), Out, ()> for F
287where
288    F: FnMut(&mut C, &A, &B) -> Out + 'static,
289{
290    type Step = CtxMergeStep<F, ()>;
291
292    fn into_ctx_merge_step(self, registry: &Registry) -> Self::Step {
293        CtxMergeStep {
294            f: self,
295            state: <() as Param>::init(registry),
296            name: std::any::type_name::<F>(),
297        }
298    }
299}
300
301// -- Merge arity 2, Param arities 1-8 ---------------------------------------
302
/// Generates, for one tuple of Param types, the two-input merge step impls:
/// the call path (`CtxMergeStepCall`) and the conversion (`IntoCtxMergeStep`).
macro_rules! impl_ctx_merge2_step {
    ($($P:ident),+) => {
        impl<C, A, B, Out, F: 'static, $($P: Param + 'static),+>
            CtxMergeStepCall<C, (&A, &B)> for CtxMergeStep<F, ($($P,)+)>
        where
            for<'a> &'a mut F:
                FnMut(&mut C, $($P,)+ &A, &B) -> Out +
                FnMut(&mut C, $($P::Item<'a>,)+ &A, &B) -> Out,
        {
            type Out = Out;

            #[inline(always)]
            #[allow(non_snake_case)]
            fn call(&mut self, ctx: &mut C, world: &mut World, inputs: (&A, &B)) -> Out {
                // Helper through which the user function is invoked with the
                // fetched Param items followed by the two inputs.
                fn call_inner<Cx, $($P,)+ IA, IB, Output>(
                    mut func: impl FnMut(&mut Cx, $($P,)+ &IA, &IB) -> Output,
                    cx: &mut Cx,
                    $($P: $P,)+
                    lhs: &IA,
                    rhs: &IB,
                ) -> Output {
                    func(cx, $($P,)+ lhs, rhs)
                }

                let (first, second) = inputs;
                #[cfg(debug_assertions)]
                world.clear_borrows();
                // SAFETY: Resource IDs in self.state were obtained from the same
                // Registry that built this World. Borrows are disjoint — enforced
                // by conflict detection at build time (check_access).
                let ($($P,)+) = unsafe {
                    <($($P,)+) as Param>::fetch(world, &mut self.state)
                };
                call_inner(&mut self.f, ctx, $($P,)+ first, second)
            }
        }

        impl<C, A, B, Out, F: 'static, $($P: Param + 'static),+>
            IntoCtxMergeStep<C, (&A, &B), Out, ($($P,)+)> for F
        where
            for<'a> &'a mut F:
                FnMut(&mut C, $($P,)+ &A, &B) -> Out +
                FnMut(&mut C, $($P::Item<'a>,)+ &A, &B) -> Out,
        {
            type Step = CtxMergeStep<F, ($($P,)+)>;

            fn into_ctx_merge_step(self, registry: &Registry) -> Self::Step {
                let name = std::any::type_name::<F>();
                let state = <($($P,)+) as Param>::init(registry);
                {
                    // Hand every Param's resource id (with its type name) to
                    // the registry's access check before the step is built.
                    #[allow(non_snake_case)]
                    let ($($P,)+) = &state;
                    registry.check_access(&[
                        $((
                            <$P as Param>::resource_id($P),
                            std::any::type_name::<$P>(),
                        ),)+
                    ]);
                }
                CtxMergeStep { f: self, state, name }
            }
        }
    };
}
360
361// -- Merge arity 3, Param arity 0 -------------------------------------------
362
363impl<Ctx, IA, IB, IC, Out, F> CtxMergeStepCall<Ctx, (&IA, &IB, &IC)> for CtxMergeStep<F, ()>
364where
365    F: FnMut(&mut Ctx, &IA, &IB, &IC) -> Out + 'static,
366{
367    type Out = Out;
368    #[inline(always)]
369    fn call(&mut self, ctx: &mut Ctx, _world: &mut World, inputs: (&IA, &IB, &IC)) -> Out {
370        (self.f)(ctx, inputs.0, inputs.1, inputs.2)
371    }
372}
373
374impl<Ctx, IA, IB, IC, Out, F> IntoCtxMergeStep<Ctx, (&IA, &IB, &IC), Out, ()> for F
375where
376    F: FnMut(&mut Ctx, &IA, &IB, &IC) -> Out + 'static,
377{
378    type Step = CtxMergeStep<F, ()>;
379
380    fn into_ctx_merge_step(self, registry: &Registry) -> Self::Step {
381        CtxMergeStep {
382            f: self,
383            state: <() as Param>::init(registry),
384            name: std::any::type_name::<F>(),
385        }
386    }
387}
388
389// -- Merge arity 3, Param arities 1-8 ---------------------------------------
390
/// Generates, for one tuple of Param types, the three-input merge step impls:
/// the call path (`CtxMergeStepCall`) and the conversion (`IntoCtxMergeStep`).
macro_rules! impl_ctx_merge3_step {
    ($($P:ident),+) => {
        impl<Ctx, A, B, C, Out, F: 'static, $($P: Param + 'static),+>
            CtxMergeStepCall<Ctx, (&A, &B, &C)> for CtxMergeStep<F, ($($P,)+)>
        where
            for<'a> &'a mut F:
                FnMut(&mut Ctx, $($P,)+ &A, &B, &C) -> Out +
                FnMut(&mut Ctx, $($P::Item<'a>,)+ &A, &B, &C) -> Out,
        {
            type Out = Out;

            #[inline(always)]
            #[allow(non_snake_case)]
            fn call(&mut self, ctx: &mut Ctx, world: &mut World, inputs: (&A, &B, &C)) -> Out {
                // Helper through which the user function is invoked with the
                // fetched Param items followed by the three inputs.
                fn call_inner<Cx, $($P,)+ IA, IB, IC, Output>(
                    mut func: impl FnMut(&mut Cx, $($P,)+ &IA, &IB, &IC) -> Output,
                    cx: &mut Cx,
                    $($P: $P,)+
                    in0: &IA,
                    in1: &IB,
                    in2: &IC,
                ) -> Output {
                    func(cx, $($P,)+ in0, in1, in2)
                }

                let (first, second, third) = inputs;
                #[cfg(debug_assertions)]
                world.clear_borrows();
                // SAFETY: Resource IDs in self.state were obtained from the same
                // Registry that built this World. Borrows are disjoint — enforced
                // by conflict detection at build time (check_access).
                let ($($P,)+) = unsafe {
                    <($($P,)+) as Param>::fetch(world, &mut self.state)
                };
                call_inner(&mut self.f, ctx, $($P,)+ first, second, third)
            }
        }

        impl<Ctx, A, B, C, Out, F: 'static, $($P: Param + 'static),+>
            IntoCtxMergeStep<Ctx, (&A, &B, &C), Out, ($($P,)+)> for F
        where
            for<'a> &'a mut F:
                FnMut(&mut Ctx, $($P,)+ &A, &B, &C) -> Out +
                FnMut(&mut Ctx, $($P::Item<'a>,)+ &A, &B, &C) -> Out,
        {
            type Step = CtxMergeStep<F, ($($P,)+)>;

            fn into_ctx_merge_step(self, registry: &Registry) -> Self::Step {
                let name = std::any::type_name::<F>();
                let state = <($($P,)+) as Param>::init(registry);
                {
                    // Hand every Param's resource id (with its type name) to
                    // the registry's access check before the step is built.
                    #[allow(non_snake_case)]
                    let ($($P,)+) = &state;
                    registry.check_access(&[
                        $((
                            <$P as Param>::resource_id($P),
                            std::any::type_name::<$P>(),
                        ),)+
                    ]);
                }
                CtxMergeStep { f: self, state, name }
            }
        }
    };
}
448
449// -- Merge arity 4, Param arity 0 -------------------------------------------
450
451impl<Ctx, IA, IB, IC, ID, Out, F> CtxMergeStepCall<Ctx, (&IA, &IB, &IC, &ID)>
452    for CtxMergeStep<F, ()>
453where
454    F: FnMut(&mut Ctx, &IA, &IB, &IC, &ID) -> Out + 'static,
455{
456    type Out = Out;
457    #[inline(always)]
458    fn call(&mut self, ctx: &mut Ctx, _world: &mut World, inputs: (&IA, &IB, &IC, &ID)) -> Out {
459        (self.f)(ctx, inputs.0, inputs.1, inputs.2, inputs.3)
460    }
461}
462
463impl<Ctx, IA, IB, IC, ID, Out, F> IntoCtxMergeStep<Ctx, (&IA, &IB, &IC, &ID), Out, ()> for F
464where
465    F: FnMut(&mut Ctx, &IA, &IB, &IC, &ID) -> Out + 'static,
466{
467    type Step = CtxMergeStep<F, ()>;
468
469    fn into_ctx_merge_step(self, registry: &Registry) -> Self::Step {
470        CtxMergeStep {
471            f: self,
472            state: <() as Param>::init(registry),
473            name: std::any::type_name::<F>(),
474        }
475    }
476}
477
478// -- Merge arity 4, Param arities 1-8 ---------------------------------------
479
/// Generates, for one tuple of Param types, the four-input merge step impls:
/// the call path (`CtxMergeStepCall`) and the conversion (`IntoCtxMergeStep`).
macro_rules! impl_ctx_merge4_step {
    ($($P:ident),+) => {
        #[allow(clippy::many_single_char_names)]
        impl<Ctx, A, B, C, D, Out, F: 'static, $($P: Param + 'static),+>
            CtxMergeStepCall<Ctx, (&A, &B, &C, &D)> for CtxMergeStep<F, ($($P,)+)>
        where
            for<'a> &'a mut F:
                FnMut(&mut Ctx, $($P,)+ &A, &B, &C, &D) -> Out +
                FnMut(&mut Ctx, $($P::Item<'a>,)+ &A, &B, &C, &D) -> Out,
        {
            type Out = Out;

            #[inline(always)]
            #[allow(non_snake_case)]
            fn call(&mut self, ctx: &mut Ctx, world: &mut World, inputs: (&A, &B, &C, &D)) -> Out {
                // Helper through which the user function is invoked with the
                // fetched Param items followed by the four inputs.
                fn call_inner<Cx, $($P,)+ IA, IB, IC, ID, Output>(
                    mut func: impl FnMut(&mut Cx, $($P,)+ &IA, &IB, &IC, &ID) -> Output,
                    cx: &mut Cx,
                    $($P: $P,)+
                    in0: &IA,
                    in1: &IB,
                    in2: &IC,
                    in3: &ID,
                ) -> Output {
                    func(cx, $($P,)+ in0, in1, in2, in3)
                }

                let (first, second, third, fourth) = inputs;
                #[cfg(debug_assertions)]
                world.clear_borrows();
                // SAFETY: Resource IDs in self.state were obtained from the same
                // Registry that built this World. Borrows are disjoint — enforced
                // by conflict detection at build time (check_access).
                let ($($P,)+) = unsafe {
                    <($($P,)+) as Param>::fetch(world, &mut self.state)
                };
                call_inner(&mut self.f, ctx, $($P,)+ first, second, third, fourth)
            }
        }

        #[allow(clippy::many_single_char_names)]
        impl<Ctx, A, B, C, D, Out, F: 'static, $($P: Param + 'static),+>
            IntoCtxMergeStep<Ctx, (&A, &B, &C, &D), Out, ($($P,)+)> for F
        where
            for<'a> &'a mut F:
                FnMut(&mut Ctx, $($P,)+ &A, &B, &C, &D) -> Out +
                FnMut(&mut Ctx, $($P::Item<'a>,)+ &A, &B, &C, &D) -> Out,
        {
            type Step = CtxMergeStep<F, ($($P,)+)>;

            fn into_ctx_merge_step(self, registry: &Registry) -> Self::Step {
                let name = std::any::type_name::<F>();
                let state = <($($P,)+) as Param>::init(registry);
                {
                    // Hand every Param's resource id (with its type name) to
                    // the registry's access check before the step is built.
                    #[allow(non_snake_case)]
                    let ($($P,)+) = &state;
                    registry.check_access(&[
                        $((
                            <$P as Param>::resource_id($P),
                            std::any::type_name::<$P>(),
                        ),)+
                    ]);
                }
                CtxMergeStep { f: self, state, name }
            }
        }
    };
}
536
537all_tuples!(impl_ctx_merge2_step);
538all_tuples!(impl_ctx_merge3_step);
539all_tuples!(impl_ctx_merge4_step);
540
541// =============================================================================
542// CtxDagBuilder — entry point
543// =============================================================================
544
545/// Entry point for building a context-aware DAG pipeline.
546///
547/// Like [`DagBuilder`](crate::DagBuilder) but every step receives `&mut C`
548/// as the first argument.
549///
550/// # Examples
551///
552/// ```ignore
553/// let dag = CtxDagBuilder::<MyCtx, WireMsg>::new()
554///     .root(decode, &reg)
555///     .fork()
556///     .arm(|a| a.then(update_book, &reg))
557///     .arm(|b| b.then(update_risk, &reg))
558///     .merge(check_and_submit, &reg)
559///     .build();
560/// ```
561#[must_use = "a DAG builder does nothing unless you chain steps and call .build()"]
562pub struct CtxDagBuilder<C, E>(PhantomData<fn(&mut C, E)>);
563
564impl<C, E> CtxDagBuilder<C, E> {
565    /// Create a new typed context-aware DAG entry point.
566    pub fn new() -> Self {
567        Self(PhantomData)
568    }
569
570    /// Set the root step. Takes the event `E` by value, produces `Out`.
571    pub fn root<Out, Params, S>(
572        self,
573        f: S,
574        registry: &Registry,
575    ) -> CtxDagChain<C, E, Out, CtxThenNode<CtxIdentityNode, S::Step>>
576    where
577        Out: 'static,
578        S: IntoCtxStep<C, E, Out, Params>,
579    {
580        CtxDagChain {
581            chain: CtxThenNode {
582                prev: CtxIdentityNode,
583                step: f.into_ctx_step(registry),
584            },
585            _marker: PhantomData,
586        }
587    }
588}
589
590impl<C, E> Default for CtxDagBuilder<C, E> {
591    fn default() -> Self {
592        Self::new()
593    }
594}
595
596// =============================================================================
597// CtxDagChain — main chain builder
598// =============================================================================
599
600/// Main chain builder for a context-aware DAG.
601///
602/// `Chain` implements [`CtxChainCall<C, E, Out = Out>`].
603#[must_use = "DAG chain does nothing until .build() is called"]
604pub struct CtxDagChain<C, In, Out, Chain> {
605    chain: Chain,
606    _marker: PhantomData<fn(&mut C, In) -> Out>,
607}
608
609impl<C, In, Out: 'static, Chain> CtxDagChain<C, In, Out, Chain> {
610    /// Enter fork mode. Subsequent `.arm()` calls add parallel branches.
611    pub fn fork(self) -> CtxDagChainFork<C, In, Out, Chain, ()> {
612        CtxDagChainFork {
613            chain: self.chain,
614            arms: (),
615            _marker: PhantomData,
616        }
617    }
618}
619
620impl<C, In, Chain: CtxChainCall<C, In, Out = ()>> CtxDagChain<C, In, (), Chain> {
621    /// Finalize into a [`CtxDag`] that implements [`CtxStepCall`].
622    #[must_use = "building a DAG without storing it does nothing"]
623    pub fn build(self) -> CtxDag<C, In, Chain> {
624        CtxDag {
625            chain: self.chain,
626            _marker: PhantomData,
627        }
628    }
629}
630
631impl<C, In, Chain: CtxChainCall<C, In, Out = Option<()>>> CtxDagChain<C, In, Option<()>, Chain> {
632    /// Finalize into a [`CtxDag`], discarding the `Option<()>`.
633    #[must_use = "building a DAG without storing it does nothing"]
634    pub fn build(self) -> CtxDag<C, In, CtxDiscardOptionNode<Chain>> {
635        CtxDag {
636            chain: CtxDiscardOptionNode { prev: self.chain },
637            _marker: PhantomData,
638        }
639    }
640}
641
642// =============================================================================
643// CtxDagArmSeed — arm builder seed
644// =============================================================================
645
/// Seed from which an arm of a context-aware DAG is built; it is the value
/// handed to `.arm()` closures.
///
/// The seed stores nothing: it only pins the context type `C` and the fork
/// output type `In` for the arm.
pub struct CtxDagArmSeed<C, In>(PhantomData<fn(&mut C, *const In)>);

impl<C, In> CtxDagArmSeed<C, In> {
    /// Construct a fresh seed. Normally `.arm()` does this internally.
    pub fn new() -> Self {
        CtxDagArmSeed(PhantomData)
    }
}

impl<C, In> Default for CtxDagArmSeed<C, In> {
    /// Same as [`CtxDagArmSeed::new`].
    fn default() -> Self {
        CtxDagArmSeed(PhantomData)
    }
}
661
662impl<C, In: 'static> CtxDagArmSeed<C, In> {
663    /// Add the first step in this arm. Takes `&In` by reference.
664    pub fn then<Out, Params, S>(
665        self,
666        f: S,
667        registry: &Registry,
668    ) -> CtxDagArm<C, In, Out, CtxThenNode<CtxIdentityNode, S::Step>>
669    where
670        Out: 'static,
671        S: IntoCtxStep<C, &'static In, Out, Params>,
672        S::Step: for<'a> CtxStepCall<C, &'a In, Out = Out>,
673    {
674        CtxDagArm {
675            chain: CtxThenNode {
676                prev: CtxIdentityNode,
677                step: f.into_ctx_step(registry),
678            },
679            _marker: PhantomData,
680        }
681    }
682}
683
684// =============================================================================
685// CtxDagArm — built arm in a DAG fork
686// =============================================================================
687
688/// Built arm in a context-aware DAG fork.
689pub struct CtxDagArm<C, In, Out, Chain> {
690    chain: Chain,
691    _marker: PhantomData<fn(&mut C, *const In) -> Out>,
692}
693
694impl<C, In: 'static, Out: 'static, Chain> CtxDagArm<C, In, Out, Chain> {
695    /// Enter fork mode within this arm.
696    pub fn fork(self) -> CtxDagArmFork<C, In, Out, Chain, ()> {
697        CtxDagArmFork {
698            chain: self.chain,
699            arms: (),
700            _marker: PhantomData,
701        }
702    }
703}
704
705// =============================================================================
706// CtxDagChainFork / CtxDagArmFork — fork builders
707// =============================================================================
708
/// Fork builder for the main chain; arms are collected in the `Arms` tuple.
pub struct CtxDagChainFork<C, In, ForkOut, Chain, Arms> {
    /// Chain leading up to the fork point (produces `ForkOut`).
    chain: Chain,
    /// Arms added so far; `()` right after `.fork()`.
    arms: Arms,
    _marker: PhantomData<fn(&mut C, In) -> ForkOut>,
}
715
/// Fork builder nested inside an arm; sub-arms are collected in the `Arms`
/// tuple.
pub struct CtxDagArmFork<C, In, ForkOut, Chain, Arms> {
    /// Arm chain leading up to the nested fork point (produces `ForkOut`).
    chain: Chain,
    /// Sub-arms added so far; `()` right after `.fork()`.
    arms: Arms,
    _marker: PhantomData<fn(&mut C, *const In) -> ForkOut>,
}
722
723// =============================================================================
724// CtxDag — built context-aware DAG
725// =============================================================================
726
727/// Built context-aware DAG.
728///
729/// Created by [`CtxDagChain::build`]. Implements [`CtxStepCall`]
730/// for use inside [`Callback`](crate::Callback) dispatch.
731pub struct CtxDag<C, In, Chain> {
732    chain: Chain,
733    _marker: PhantomData<fn(&mut C, In)>,
734}
735
736impl<C, In, Chain: CtxChainCall<C, In, Out = ()>> CtxStepCall<C, In> for CtxDag<C, In, Chain> {
737    type Out = ();
738    #[inline(always)]
739    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) {
740        self.chain.call(ctx, world, input);
741    }
742}
743
744impl<C, In, Chain: CtxChainCall<C, In, Out = ()>> CtxDag<C, In, Chain> {
745    /// Run the DAG with context, world, and input.
746    pub fn run(&mut self, ctx: &mut C, world: &mut World, input: In) {
747        self.chain.call(ctx, world, input);
748    }
749}
750
751// =============================================================================
752// Merge / Join named nodes — context-aware fork terminal nodes
753// =============================================================================
754
755/// Merge two context-aware fork arms into a single output.
756#[doc(hidden)]
757pub struct CtxMergeNode2<Chain, C0, C1, MS, ForkOut, A0, A1, MOut> {
758    chain: Chain,
759    arm0: C0,
760    arm1: C1,
761    merge: MS,
762    _marker: PhantomData<fn(ForkOut) -> (A0, A1, MOut)>,
763}
764
765impl<Ctx, In, Chain, C0, C1, MS, ForkOut, A0, A1, MOut> CtxChainCall<Ctx, In>
766    for CtxMergeNode2<Chain, C0, C1, MS, ForkOut, A0, A1, MOut>
767where
768    ForkOut: 'static,
769    A0: 'static,
770    A1: 'static,
771    Chain: CtxChainCall<Ctx, In, Out = ForkOut>,
772    C0: for<'a> CtxChainCall<Ctx, &'a ForkOut, Out = A0>,
773    C1: for<'a> CtxChainCall<Ctx, &'a ForkOut, Out = A1>,
774    MS: for<'x> CtxMergeStepCall<Ctx, (&'x A0, &'x A1), Out = MOut>,
775{
776    type Out = MOut;
777
778    #[inline(always)]
779    fn call(&mut self, ctx: &mut Ctx, world: &mut World, input: In) -> MOut {
780        let fork_out = self.chain.call(ctx, world, input);
781        let o0 = self.arm0.call(ctx, world, &fork_out);
782        let o1 = self.arm1.call(ctx, world, &fork_out);
783        self.merge.call(ctx, world, (&o0, &o1))
784    }
785}
786
787/// Merge three context-aware fork arms into a single output.
788#[doc(hidden)]
789pub struct CtxMergeNode3<Chain, C0, C1, C2, MS, ForkOut, A0, A1, A2, MOut> {
790    chain: Chain,
791    arm0: C0,
792    arm1: C1,
793    arm2: C2,
794    merge: MS,
795    _marker: PhantomData<fn(ForkOut) -> (A0, A1, A2, MOut)>,
796}
797
798impl<Ctx, In, Chain, C0, C1, C2, MS, ForkOut, A0, A1, A2, MOut> CtxChainCall<Ctx, In>
799    for CtxMergeNode3<Chain, C0, C1, C2, MS, ForkOut, A0, A1, A2, MOut>
800where
801    ForkOut: 'static,
802    A0: 'static,
803    A1: 'static,
804    A2: 'static,
805    Chain: CtxChainCall<Ctx, In, Out = ForkOut>,
806    C0: for<'a> CtxChainCall<Ctx, &'a ForkOut, Out = A0>,
807    C1: for<'a> CtxChainCall<Ctx, &'a ForkOut, Out = A1>,
808    C2: for<'a> CtxChainCall<Ctx, &'a ForkOut, Out = A2>,
809    MS: for<'x> CtxMergeStepCall<Ctx, (&'x A0, &'x A1, &'x A2), Out = MOut>,
810{
811    type Out = MOut;
812
813    #[inline(always)]
814    fn call(&mut self, ctx: &mut Ctx, world: &mut World, input: In) -> MOut {
815        let fork_out = self.chain.call(ctx, world, input);
816        let o0 = self.arm0.call(ctx, world, &fork_out);
817        let o1 = self.arm1.call(ctx, world, &fork_out);
818        let o2 = self.arm2.call(ctx, world, &fork_out);
819        self.merge.call(ctx, world, (&o0, &o1, &o2))
820    }
821}
822
823/// Merge four context-aware fork arms into a single output.
824#[doc(hidden)]
825pub struct CtxMergeNode4<Chain, C0, C1, C2, C3, MS, ForkOut, A0, A1, A2, A3, MOut> {
826    chain: Chain,
827    arm0: C0,
828    arm1: C1,
829    arm2: C2,
830    arm3: C3,
831    merge: MS,
832    _marker: PhantomData<fn(ForkOut) -> (A0, A1, A2, A3, MOut)>,
833}
834
835#[allow(clippy::many_single_char_names)]
836impl<Ctx, In, Chain, C0, C1, C2, C3, MS, ForkOut, A0, A1, A2, A3, MOut> CtxChainCall<Ctx, In>
837    for CtxMergeNode4<Chain, C0, C1, C2, C3, MS, ForkOut, A0, A1, A2, A3, MOut>
838where
839    ForkOut: 'static,
840    A0: 'static,
841    A1: 'static,
842    A2: 'static,
843    A3: 'static,
844    Chain: CtxChainCall<Ctx, In, Out = ForkOut>,
845    C0: for<'a> CtxChainCall<Ctx, &'a ForkOut, Out = A0>,
846    C1: for<'a> CtxChainCall<Ctx, &'a ForkOut, Out = A1>,
847    C2: for<'a> CtxChainCall<Ctx, &'a ForkOut, Out = A2>,
848    C3: for<'a> CtxChainCall<Ctx, &'a ForkOut, Out = A3>,
849    MS: for<'x> CtxMergeStepCall<Ctx, (&'x A0, &'x A1, &'x A2, &'x A3), Out = MOut>,
850{
851    type Out = MOut;
852
853    #[inline(always)]
854    fn call(&mut self, ctx: &mut Ctx, world: &mut World, input: In) -> MOut {
855        let fork_out = self.chain.call(ctx, world, input);
856        let o0 = self.arm0.call(ctx, world, &fork_out);
857        let o1 = self.arm1.call(ctx, world, &fork_out);
858        let o2 = self.arm2.call(ctx, world, &fork_out);
859        let o3 = self.arm3.call(ctx, world, &fork_out);
860        self.merge.call(ctx, world, (&o0, &o1, &o2, &o3))
861    }
862}
863
864/// Join two context-aware fork arms (all producing `()`).
865#[doc(hidden)]
866pub struct CtxJoinNode2<Chain, C0, C1, ForkOut> {
867    chain: Chain,
868    arm0: C0,
869    arm1: C1,
870    _marker: PhantomData<fn() -> ForkOut>,
871}
872
873impl<Ctx, In, Chain, C0, C1, ForkOut> CtxChainCall<Ctx, In> for CtxJoinNode2<Chain, C0, C1, ForkOut>
874where
875    ForkOut: 'static,
876    Chain: CtxChainCall<Ctx, In, Out = ForkOut>,
877    C0: for<'a> CtxChainCall<Ctx, &'a ForkOut, Out = ()>,
878    C1: for<'a> CtxChainCall<Ctx, &'a ForkOut, Out = ()>,
879{
880    type Out = ();
881
882    #[inline(always)]
883    fn call(&mut self, ctx: &mut Ctx, world: &mut World, input: In) {
884        let fork_out = self.chain.call(ctx, world, input);
885        self.arm0.call(ctx, world, &fork_out);
886        self.arm1.call(ctx, world, &fork_out);
887    }
888}
889
890/// Join three context-aware fork arms (all producing `()`).
891#[doc(hidden)]
892pub struct CtxJoinNode3<Chain, C0, C1, C2, ForkOut> {
893    chain: Chain,
894    arm0: C0,
895    arm1: C1,
896    arm2: C2,
897    _marker: PhantomData<fn() -> ForkOut>,
898}
899
900impl<Ctx, In, Chain, C0, C1, C2, ForkOut> CtxChainCall<Ctx, In>
901    for CtxJoinNode3<Chain, C0, C1, C2, ForkOut>
902where
903    ForkOut: 'static,
904    Chain: CtxChainCall<Ctx, In, Out = ForkOut>,
905    C0: for<'a> CtxChainCall<Ctx, &'a ForkOut, Out = ()>,
906    C1: for<'a> CtxChainCall<Ctx, &'a ForkOut, Out = ()>,
907    C2: for<'a> CtxChainCall<Ctx, &'a ForkOut, Out = ()>,
908{
909    type Out = ();
910
911    #[inline(always)]
912    fn call(&mut self, ctx: &mut Ctx, world: &mut World, input: In) {
913        let fork_out = self.chain.call(ctx, world, input);
914        self.arm0.call(ctx, world, &fork_out);
915        self.arm1.call(ctx, world, &fork_out);
916        self.arm2.call(ctx, world, &fork_out);
917    }
918}
919
920/// Join four context-aware fork arms (all producing `()`).
921#[doc(hidden)]
922pub struct CtxJoinNode4<Chain, C0, C1, C2, C3, ForkOut> {
923    chain: Chain,
924    arm0: C0,
925    arm1: C1,
926    arm2: C2,
927    arm3: C3,
928    _marker: PhantomData<fn() -> ForkOut>,
929}
930
931#[allow(clippy::many_single_char_names)]
932impl<Ctx, In, Chain, C0, C1, C2, C3, ForkOut> CtxChainCall<Ctx, In>
933    for CtxJoinNode4<Chain, C0, C1, C2, C3, ForkOut>
934where
935    ForkOut: 'static,
936    Chain: CtxChainCall<Ctx, In, Out = ForkOut>,
937    C0: for<'a> CtxChainCall<Ctx, &'a ForkOut, Out = ()>,
938    C1: for<'a> CtxChainCall<Ctx, &'a ForkOut, Out = ()>,
939    C2: for<'a> CtxChainCall<Ctx, &'a ForkOut, Out = ()>,
940    C3: for<'a> CtxChainCall<Ctx, &'a ForkOut, Out = ()>,
941{
942    type Out = ();
943
944    #[inline(always)]
945    fn call(&mut self, ctx: &mut Ctx, world: &mut World, input: In) {
946        let fork_out = self.chain.call(ctx, world, input);
947        self.arm0.call(ctx, world, &fork_out);
948        self.arm1.call(ctx, world, &fork_out);
949        self.arm2.call(ctx, world, &fork_out);
950        self.arm3.call(ctx, world, &fork_out);
951    }
952}
953
954// =============================================================================
955// Combinator macro — shared between CtxDagChain and CtxDagArm
956// =============================================================================
957
/// Generates the step combinators plus the `Option`/`Result` helpers for a
/// context-aware DAG builder type.
///
/// Arguments:
/// - `builder:` — the builder struct to implement the methods on. It is used
///   with four type parameters (context, upstream, output, chain) and is
///   rebuilt by every method as `Builder { chain: .., _marker: PhantomData }`,
///   so it needs `chain` and `_marker` fields.
/// - `upstream:` — the identifier used for the builder's second type
///   parameter in the generated `impl` headers.
///
/// Three `impl` blocks are emitted: one for any output type, one for
/// `Option<T>` outputs and one for `Result<T, Err>` outputs. Every method
/// consumes the builder and returns a new builder whose chain is a node
/// wrapping the previous chain (`prev: self.chain`). Step-like arguments are
/// converted against `registry` when the combinator is called, not when the
/// DAG runs.
macro_rules! impl_ctx_dag_combinators {
    (builder: $Builder:ident, upstream: $U:ident) => {
        // =============================================================
        // Core — any Out
        // =============================================================

        impl<Ctx, $U, Out: 'static, Chain> $Builder<Ctx, $U, Out, Chain> {
            /// Append a step. The step receives `&Out` by reference.
            ///
            /// The builder's output type becomes `NewOut`; the previous
            /// chain and the converted step are stored in a
            /// `CtxDagThenNode`.
            pub fn then<NewOut, Params, S>(
                self,
                f: S,
                registry: &Registry,
            ) -> $Builder<Ctx, $U, NewOut, CtxDagThenNode<Chain, S::Step, NewOut>>
            where
                NewOut: 'static,
                // NOTE(review): `&'static Out` looks like a placeholder input
                // type used only to select the `IntoCtxStep` impl; the
                // higher-ranked bound below is what requires the step to
                // accept a borrow of any lifetime — confirm against
                // `IntoCtxStep`.
                S: IntoCtxStep<Ctx, &'static Out, NewOut, Params>,
                S::Step: for<'a> CtxStepCall<Ctx, &'a Out, Out = NewOut> + 'static,
            {
                $Builder {
                    chain: CtxDagThenNode {
                        prev: self.chain,
                        step: f.into_ctx_step(registry),
                        _out: PhantomData,
                    },
                    _marker: PhantomData,
                }
            }

            /// Conditionally wrap the output in `Option`.
            ///
            /// `f` is a by-reference step over `Out` that yields `bool`. The
            /// builder's output type becomes `Option<Out>`.
            pub fn guard<Params, S: IntoCtxRefStep<Ctx, Out, bool, Params>>(
                self,
                f: S,
                registry: &Registry,
            ) -> $Builder<Ctx, $U, Option<Out>, CtxGuardNode<Chain, S::Step>> {
                $Builder {
                    chain: CtxGuardNode {
                        prev: self.chain,
                        step: f.into_ctx_ref_step(registry),
                    },
                    _marker: PhantomData,
                }
            }

            /// Observe the current value without consuming or changing it.
            ///
            /// `f` is a by-reference step over `Out` that yields `()`. The
            /// builder's output type stays `Out`.
            pub fn tap<Params, S: IntoCtxRefStep<Ctx, Out, (), Params>>(
                self,
                f: S,
                registry: &Registry,
            ) -> $Builder<Ctx, $U, Out, CtxTapNode<Chain, S::Step>> {
                $Builder {
                    chain: CtxTapNode {
                        prev: self.chain,
                        step: f.into_ctx_ref_step(registry),
                    },
                    _marker: PhantomData,
                }
            }
        }

        // =============================================================
        // Option helpers
        // =============================================================

        impl<Ctx, $U, T: 'static, Chain> $Builder<Ctx, $U, Option<T>, Chain> {
            /// Transform the inner value. Step not called on None.
            ///
            /// The step takes `&T` and returns `U`; the builder's output
            /// type becomes `Option<U>`.
            pub fn map<U, Params, S>(
                self,
                f: S,
                registry: &Registry,
            ) -> $Builder<Ctx, $U, Option<U>, CtxDagMapOptionNode<Chain, S::Step, U>>
            where
                U: 'static,
                S: IntoCtxStep<Ctx, &'static T, U, Params>,
                S::Step: for<'x> CtxStepCall<Ctx, &'x T, Out = U>,
            {
                $Builder {
                    chain: CtxDagMapOptionNode {
                        prev: self.chain,
                        step: f.into_ctx_step(registry),
                        _out: PhantomData,
                    },
                    _marker: PhantomData,
                }
            }

            /// Short-circuits on None. std: `Option::and_then`
            ///
            /// The step takes `&T` and returns `Option<U>`; the builder's
            /// output type becomes `Option<U>`.
            pub fn and_then<U, Params, S>(
                self,
                f: S,
                registry: &Registry,
            ) -> $Builder<Ctx, $U, Option<U>, CtxDagAndThenOptionNode<Chain, S::Step, U>>
            where
                U: 'static,
                S: IntoCtxStep<Ctx, &'static T, Option<U>, Params>,
                S::Step: for<'x> CtxStepCall<Ctx, &'x T, Out = Option<U>>,
            {
                $Builder {
                    chain: CtxDagAndThenOptionNode {
                        prev: self.chain,
                        step: f.into_ctx_step(registry),
                        _out: PhantomData,
                    },
                    _marker: PhantomData,
                }
            }

            /// Side effect on None.
            ///
            /// `f` is a producer of `()` (it takes no step input). The
            /// builder's output type stays `Option<T>`.
            pub fn on_none<Params, S: IntoCtxProducer<Ctx, (), Params>>(
                self,
                f: S,
                registry: &Registry,
            ) -> $Builder<Ctx, $U, Option<T>, CtxOnNoneNode<Chain, S::Step>> {
                $Builder {
                    chain: CtxOnNoneNode {
                        prev: self.chain,
                        producer: f.into_ctx_producer(registry),
                    },
                    _marker: PhantomData,
                }
            }

            /// Keep value if predicate holds. std: `Option::filter`
            ///
            /// `f` is a by-reference step over `T` that yields `bool`. The
            /// builder's output type stays `Option<T>`.
            pub fn filter<Params, S: IntoCtxRefStep<Ctx, T, bool, Params>>(
                self,
                f: S,
                registry: &Registry,
            ) -> $Builder<Ctx, $U, Option<T>, CtxFilterNode<Chain, S::Step>> {
                $Builder {
                    chain: CtxFilterNode {
                        prev: self.chain,
                        step: f.into_ctx_ref_step(registry),
                    },
                    _marker: PhantomData,
                }
            }

            /// Side effect on Some value. std: `Option::inspect`
            ///
            /// `f` is a by-reference step over `T` that yields `()`. The
            /// builder's output type stays `Option<T>`.
            pub fn inspect<Params, S: IntoCtxRefStep<Ctx, T, (), Params>>(
                self,
                f: S,
                registry: &Registry,
            ) -> $Builder<Ctx, $U, Option<T>, CtxInspectOptionNode<Chain, S::Step>> {
                $Builder {
                    chain: CtxInspectOptionNode {
                        prev: self.chain,
                        step: f.into_ctx_ref_step(registry),
                    },
                    _marker: PhantomData,
                }
            }

            /// None becomes Err(err). std: `Option::ok_or`
            ///
            /// `err` is stored in the node; no registry is needed. The
            /// builder's output type becomes `Result<T, E>`.
            /// NOTE(review): the `E: Clone` bound presumably lets the node
            /// hand out a fresh copy of `err` per call — confirm against
            /// `CtxOkOrNode`.
            pub fn ok_or<E: Clone>(
                self,
                err: E,
            ) -> $Builder<Ctx, $U, Result<T, E>, CtxOkOrNode<Chain, E>> {
                $Builder {
                    chain: CtxOkOrNode {
                        prev: self.chain,
                        err,
                    },
                    _marker: PhantomData,
                }
            }

            /// Exit Option — None becomes the default value.
            ///
            /// `default` is stored in the node; the builder's output type
            /// becomes `T`.
            /// NOTE(review): `T: Clone` presumably allows the stored default
            /// to be reused across calls — confirm against
            /// `CtxUnwrapOrOptionNode`.
            pub fn unwrap_or(
                self,
                default: T,
            ) -> $Builder<Ctx, $U, T, CtxUnwrapOrOptionNode<Chain, T>>
            where
                T: Clone,
            {
                $Builder {
                    chain: CtxUnwrapOrOptionNode {
                        prev: self.chain,
                        default,
                    },
                    _marker: PhantomData,
                }
            }

            /// Exit Option — None becomes `f()`.
            ///
            /// `f` is a producer of `T` (it takes no step input). The
            /// builder's output type becomes `T`.
            pub fn unwrap_or_else<Params, S: IntoCtxProducer<Ctx, T, Params>>(
                self,
                f: S,
                registry: &Registry,
            ) -> $Builder<Ctx, $U, T, CtxUnwrapOrElseOptionNode<Chain, S::Step>> {
                $Builder {
                    chain: CtxUnwrapOrElseOptionNode {
                        prev: self.chain,
                        producer: f.into_ctx_producer(registry),
                    },
                    _marker: PhantomData,
                }
            }
        }

        // =============================================================
        // Result helpers
        // =============================================================

        impl<Ctx, $U, T: 'static, Err: 'static, Chain> $Builder<Ctx, $U, Result<T, Err>, Chain> {
            /// Transform the Ok value. Step not called on Err.
            ///
            /// The step takes `&T` and returns `U`; the builder's output
            /// type becomes `Result<U, Err>`.
            pub fn map<U, Params, S>(
                self,
                f: S,
                registry: &Registry,
            ) -> $Builder<Ctx, $U, Result<U, Err>, CtxDagMapResultNode<Chain, S::Step, U>>
            where
                U: 'static,
                S: IntoCtxStep<Ctx, &'static T, U, Params>,
                S::Step: for<'x> CtxStepCall<Ctx, &'x T, Out = U>,
            {
                $Builder {
                    chain: CtxDagMapResultNode {
                        prev: self.chain,
                        step: f.into_ctx_step(registry),
                        _out: PhantomData,
                    },
                    _marker: PhantomData,
                }
            }

            /// Short-circuits on Err. std: `Result::and_then`
            ///
            /// The step takes `&T` and returns `Result<U, Err>` (same error
            /// type); the builder's output type becomes `Result<U, Err>`.
            pub fn and_then<U, Params, S>(
                self,
                f: S,
                registry: &Registry,
            ) -> $Builder<Ctx, $U, Result<U, Err>, CtxDagAndThenResultNode<Chain, S::Step, U>>
            where
                U: 'static,
                S: IntoCtxStep<Ctx, &'static T, Result<U, Err>, Params>,
                S::Step: for<'x> CtxStepCall<Ctx, &'x T, Out = Result<U, Err>>,
            {
                $Builder {
                    chain: CtxDagAndThenResultNode {
                        prev: self.chain,
                        step: f.into_ctx_step(registry),
                        _out: PhantomData,
                    },
                    _marker: PhantomData,
                }
            }

            /// Handle error and transition to Option.
            ///
            /// The step takes `&Err` and returns `()`; the builder's output
            /// type becomes `Option<T>`.
            pub fn catch<Params, S>(
                self,
                f: S,
                registry: &Registry,
            ) -> $Builder<Ctx, $U, Option<T>, CtxDagCatchNode<Chain, S::Step>>
            where
                S: IntoCtxStep<Ctx, &'static Err, (), Params>,
                S::Step: for<'x> CtxStepCall<Ctx, &'x Err, Out = ()>,
            {
                $Builder {
                    chain: CtxDagCatchNode {
                        prev: self.chain,
                        step: f.into_ctx_step(registry),
                    },
                    _marker: PhantomData,
                }
            }

            /// Transform the error. std: `Result::map_err`
            ///
            /// Unlike the by-reference combinators above, the step is
            /// declared over `Err` by value and returns `E2`; the builder's
            /// output type becomes `Result<T, E2>`.
            pub fn map_err<E2, Params, S: IntoCtxStep<Ctx, Err, E2, Params>>(
                self,
                f: S,
                registry: &Registry,
            ) -> $Builder<Ctx, $U, Result<T, E2>, CtxMapErrNode<Chain, S::Step>> {
                $Builder {
                    chain: CtxMapErrNode {
                        prev: self.chain,
                        step: f.into_ctx_step(registry),
                    },
                    _marker: PhantomData,
                }
            }

            /// Side effect on Ok value. std: `Result::inspect`
            ///
            /// `f` is a by-reference step over `T` that yields `()`. The
            /// builder's output type stays `Result<T, Err>`.
            pub fn inspect<Params, S: IntoCtxRefStep<Ctx, T, (), Params>>(
                self,
                f: S,
                registry: &Registry,
            ) -> $Builder<Ctx, $U, Result<T, Err>, CtxInspectResultNode<Chain, S::Step>> {
                $Builder {
                    chain: CtxInspectResultNode {
                        prev: self.chain,
                        step: f.into_ctx_ref_step(registry),
                    },
                    _marker: PhantomData,
                }
            }

            /// Side effect on Err. std: `Result::inspect_err`
            ///
            /// `f` is a by-reference step over `Err` that yields `()`. The
            /// builder's output type stays `Result<T, Err>`.
            pub fn inspect_err<Params, S: IntoCtxRefStep<Ctx, Err, (), Params>>(
                self,
                f: S,
                registry: &Registry,
            ) -> $Builder<Ctx, $U, Result<T, Err>, CtxInspectErrNode<Chain, S::Step>> {
                $Builder {
                    chain: CtxInspectErrNode {
                        prev: self.chain,
                        step: f.into_ctx_ref_step(registry),
                    },
                    _marker: PhantomData,
                }
            }

            /// Discard error, enter Option land. std: `Result::ok`
            ///
            /// Takes no step and no registry; the builder's output type
            /// becomes `Option<T>`.
            pub fn ok(self) -> $Builder<Ctx, $U, Option<T>, CtxOkNode<Chain>> {
                $Builder {
                    chain: CtxOkNode { prev: self.chain },
                    _marker: PhantomData,
                }
            }

            /// Exit Result — Err becomes the default value.
            ///
            /// `default` is stored in the node; the builder's output type
            /// becomes `T`.
            /// NOTE(review): `T: Clone` presumably allows the stored default
            /// to be reused across calls — confirm against
            /// `CtxUnwrapOrResultNode`.
            pub fn unwrap_or(
                self,
                default: T,
            ) -> $Builder<Ctx, $U, T, CtxUnwrapOrResultNode<Chain, T>>
            where
                T: Clone,
            {
                $Builder {
                    chain: CtxUnwrapOrResultNode {
                        prev: self.chain,
                        default,
                    },
                    _marker: PhantomData,
                }
            }
        }
    };
}
1295
// Emit the same combinator set on both builder types. In each case the
// builder's second type parameter is named `In` in the generated impls.
impl_ctx_dag_combinators!(builder: CtxDagChain, upstream: In);
impl_ctx_dag_combinators!(builder: CtxDagArm, upstream: In);
1298
1299// =============================================================================
1300// Fork arity macro — arm accumulation, merge, join
1301// =============================================================================
1302
/// Generates arm accumulation, merge, and join for a context-aware fork type.
///
/// Arguments:
/// - `fork:` — the fork builder type. It is used with five type parameters
///   (context, upstream, fork output, chain, arms tuple) and is rebuilt as
///   `Fork { chain, arms, _marker }`, so it needs those three fields.
/// - `output:` — the builder type returned by `merge` and `join`, rebuilt as
///   `Output { chain, _marker }`.
/// - `upstream:` — the identifier used for the fork's second type parameter
///   in the generated `impl` headers.
///
/// The arms are tracked in the type as a tuple of `CtxDagArm`s that grows
/// from `()` to at most four entries: `arm` is generated for forks holding
/// zero to three arms, while `merge` and `join` are generated only for forks
/// holding two, three or four arms. `join` additionally requires every arm's
/// output type to be `()`.
macro_rules! impl_ctx_dag_fork {
    (
        fork: $Fork:ident,
        output: $Output:ident,
        upstream: $U:ident
    ) => {
        // =============================================================
        // Arm accumulation: 0->1, 1->2, 2->3, 3->4
        // =============================================================

        impl<Ctx, $U, ForkOut, Chain> $Fork<Ctx, $U, ForkOut, Chain, ()> {
            /// Add the first arm to this fork.
            ///
            /// `f` is called immediately with a fresh `CtxDagArmSeed` and
            /// must return the finished `CtxDagArm`; the result becomes the
            /// only entry of the arms tuple.
            pub fn arm<AOut, ACh>(
                self,
                f: impl FnOnce(CtxDagArmSeed<Ctx, ForkOut>) -> CtxDagArm<Ctx, ForkOut, AOut, ACh>,
            ) -> $Fork<Ctx, $U, ForkOut, Chain, (CtxDagArm<Ctx, ForkOut, AOut, ACh>,)> {
                let arm = f(CtxDagArmSeed(PhantomData));
                $Fork {
                    chain: self.chain,
                    arms: (arm,),
                    _marker: PhantomData,
                }
            }
        }

        impl<Ctx, $U, ForkOut, Chain, A0, C0>
            $Fork<Ctx, $U, ForkOut, Chain, (CtxDagArm<Ctx, ForkOut, A0, C0>,)>
        {
            /// Add a second arm to this fork.
            ///
            /// `f` is called immediately with a fresh `CtxDagArmSeed`; the
            /// arm it returns is appended after the existing one.
            pub fn arm<AOut, ACh>(
                self,
                f: impl FnOnce(CtxDagArmSeed<Ctx, ForkOut>) -> CtxDagArm<Ctx, ForkOut, AOut, ACh>,
            ) -> $Fork<
                Ctx,
                $U,
                ForkOut,
                Chain,
                (
                    CtxDagArm<Ctx, ForkOut, A0, C0>,
                    CtxDagArm<Ctx, ForkOut, AOut, ACh>,
                ),
            > {
                let arm = f(CtxDagArmSeed(PhantomData));
                let (a0,) = self.arms;
                $Fork {
                    chain: self.chain,
                    arms: (a0, arm),
                    _marker: PhantomData,
                }
            }
        }

        impl<Ctx, $U, ForkOut, Chain, A0, C0, A1, C1>
            $Fork<
                Ctx,
                $U,
                ForkOut,
                Chain,
                (
                    CtxDagArm<Ctx, ForkOut, A0, C0>,
                    CtxDagArm<Ctx, ForkOut, A1, C1>,
                ),
            >
        {
            /// Add a third arm to this fork.
            ///
            /// `f` is called immediately with a fresh `CtxDagArmSeed`; the
            /// arm it returns is appended after the existing two.
            pub fn arm<AOut, ACh>(
                self,
                f: impl FnOnce(CtxDagArmSeed<Ctx, ForkOut>) -> CtxDagArm<Ctx, ForkOut, AOut, ACh>,
            ) -> $Fork<
                Ctx,
                $U,
                ForkOut,
                Chain,
                (
                    CtxDagArm<Ctx, ForkOut, A0, C0>,
                    CtxDagArm<Ctx, ForkOut, A1, C1>,
                    CtxDagArm<Ctx, ForkOut, AOut, ACh>,
                ),
            > {
                let arm = f(CtxDagArmSeed(PhantomData));
                let (a0, a1) = self.arms;
                $Fork {
                    chain: self.chain,
                    arms: (a0, a1, arm),
                    _marker: PhantomData,
                }
            }
        }

        impl<Ctx, $U, ForkOut, Chain, A0, C0, A1, C1, A2, C2>
            $Fork<
                Ctx,
                $U,
                ForkOut,
                Chain,
                (
                    CtxDagArm<Ctx, ForkOut, A0, C0>,
                    CtxDagArm<Ctx, ForkOut, A1, C1>,
                    CtxDagArm<Ctx, ForkOut, A2, C2>,
                ),
            >
        {
            /// Add a fourth arm to this fork.
            ///
            /// `f` is called immediately with a fresh `CtxDagArmSeed`; the
            /// arm it returns is appended after the existing three. No `arm`
            /// method is generated for a fork that already holds four arms.
            pub fn arm<AOut, ACh>(
                self,
                f: impl FnOnce(CtxDagArmSeed<Ctx, ForkOut>) -> CtxDagArm<Ctx, ForkOut, AOut, ACh>,
            ) -> $Fork<
                Ctx,
                $U,
                ForkOut,
                Chain,
                (
                    CtxDagArm<Ctx, ForkOut, A0, C0>,
                    CtxDagArm<Ctx, ForkOut, A1, C1>,
                    CtxDagArm<Ctx, ForkOut, A2, C2>,
                    CtxDagArm<Ctx, ForkOut, AOut, ACh>,
                ),
            > {
                let arm = f(CtxDagArmSeed(PhantomData));
                let (a0, a1, a2) = self.arms;
                $Fork {
                    chain: self.chain,
                    arms: (a0, a1, a2, arm),
                    _marker: PhantomData,
                }
            }
        }

        // =============================================================
        // Merge arity 2
        // =============================================================

        impl<Ctx, $U, ForkOut: 'static, Chain, A0: 'static, C0, A1: 'static, C1>
            $Fork<
                Ctx,
                $U,
                ForkOut,
                Chain,
                (
                    CtxDagArm<Ctx, ForkOut, A0, C0>,
                    CtxDagArm<Ctx, ForkOut, A1, C1>,
                ),
            >
        {
            /// Merge two arms with a context-aware merge step.
            ///
            /// Consumes the fork: the upstream chain, each arm's chain and
            /// the merge step (converted against `registry` here) are moved
            /// into a `CtxMergeNode2`. The merge step takes `(&A0, &A1)` and
            /// its return type `MOut` becomes the output type of the
            /// returned builder.
            pub fn merge<MOut, Params, S>(
                self,
                f: S,
                registry: &Registry,
            ) -> $Output<
                Ctx,
                $U,
                MOut,
                CtxMergeNode2<Chain, C0, C1, S::Step, ForkOut, A0, A1, MOut>,
            >
            where
                MOut: 'static,
                // NOTE(review): the `&'static` tuple looks like a placeholder
                // input type used only to select the `IntoCtxMergeStep` impl;
                // the higher-ranked bound below is what requires the step to
                // accept borrows of any lifetime — confirm against
                // `IntoCtxMergeStep`.
                S: IntoCtxMergeStep<Ctx, (&'static A0, &'static A1), MOut, Params>,
                S::Step: for<'x> CtxMergeStepCall<Ctx, (&'x A0, &'x A1), Out = MOut>,
            {
                let (a0, a1) = self.arms;
                $Output {
                    chain: CtxMergeNode2 {
                        chain: self.chain,
                        arm0: a0.chain,
                        arm1: a1.chain,
                        merge: f.into_ctx_merge_step(registry),
                        _marker: PhantomData,
                    },
                    _marker: PhantomData,
                }
            }
        }

        impl<Ctx, $U, ForkOut: 'static, Chain, C0, C1>
            $Fork<
                Ctx,
                $U,
                ForkOut,
                Chain,
                (
                    CtxDagArm<Ctx, ForkOut, (), C0>,
                    CtxDagArm<Ctx, ForkOut, (), C1>,
                ),
            >
        {
            /// Join two sink arms (all producing `()`).
            ///
            /// Only generated for forks whose two arms both have output type
            /// `()`. Consumes the fork, moving the upstream chain and each
            /// arm's chain into a `CtxJoinNode2`; the returned builder's
            /// output type is `()`.
            pub fn join(
                self,
            ) -> $Output<Ctx, $U, (), CtxJoinNode2<Chain, C0, C1, ForkOut>> {
                let (a0, a1) = self.arms;
                $Output {
                    chain: CtxJoinNode2 {
                        chain: self.chain,
                        arm0: a0.chain,
                        arm1: a1.chain,
                        _marker: PhantomData,
                    },
                    _marker: PhantomData,
                }
            }
        }

        // =============================================================
        // Merge arity 3
        // =============================================================

        impl<
            Ctx,
            $U,
            ForkOut: 'static,
            Chain,
            A0: 'static,
            C0,
            A1: 'static,
            C1,
            A2: 'static,
            C2,
        >
            $Fork<
                Ctx,
                $U,
                ForkOut,
                Chain,
                (
                    CtxDagArm<Ctx, ForkOut, A0, C0>,
                    CtxDagArm<Ctx, ForkOut, A1, C1>,
                    CtxDagArm<Ctx, ForkOut, A2, C2>,
                ),
            >
        {
            /// Merge three arms with a context-aware merge step.
            ///
            /// Consumes the fork: the upstream chain, each arm's chain and
            /// the merge step (converted against `registry` here) are moved
            /// into a `CtxMergeNode3`. The merge step takes
            /// `(&A0, &A1, &A2)` and its return type `MOut` becomes the
            /// output type of the returned builder.
            pub fn merge<MOut, Params, S>(
                self,
                f: S,
                registry: &Registry,
            ) -> $Output<
                Ctx,
                $U,
                MOut,
                CtxMergeNode3<Chain, C0, C1, C2, S::Step, ForkOut, A0, A1, A2, MOut>,
            >
            where
                MOut: 'static,
                S: IntoCtxMergeStep<
                    Ctx,
                    (&'static A0, &'static A1, &'static A2),
                    MOut,
                    Params,
                >,
                S::Step:
                    for<'x> CtxMergeStepCall<Ctx, (&'x A0, &'x A1, &'x A2), Out = MOut>,
            {
                let (a0, a1, a2) = self.arms;
                $Output {
                    chain: CtxMergeNode3 {
                        chain: self.chain,
                        arm0: a0.chain,
                        arm1: a1.chain,
                        arm2: a2.chain,
                        merge: f.into_ctx_merge_step(registry),
                        _marker: PhantomData,
                    },
                    _marker: PhantomData,
                }
            }
        }

        impl<Ctx, $U, ForkOut: 'static, Chain, C0, C1, C2>
            $Fork<
                Ctx,
                $U,
                ForkOut,
                Chain,
                (
                    CtxDagArm<Ctx, ForkOut, (), C0>,
                    CtxDagArm<Ctx, ForkOut, (), C1>,
                    CtxDagArm<Ctx, ForkOut, (), C2>,
                ),
            >
        {
            /// Join three sink arms (all producing `()`).
            ///
            /// Only generated for forks whose three arms all have output
            /// type `()`. Consumes the fork, moving the upstream chain and
            /// each arm's chain into a `CtxJoinNode3`; the returned
            /// builder's output type is `()`.
            pub fn join(
                self,
            ) -> $Output<Ctx, $U, (), CtxJoinNode3<Chain, C0, C1, C2, ForkOut>> {
                let (a0, a1, a2) = self.arms;
                $Output {
                    chain: CtxJoinNode3 {
                        chain: self.chain,
                        arm0: a0.chain,
                        arm1: a1.chain,
                        arm2: a2.chain,
                        _marker: PhantomData,
                    },
                    _marker: PhantomData,
                }
            }
        }

        // =============================================================
        // Merge arity 4
        // =============================================================

        #[allow(clippy::many_single_char_names)]
        impl<
            Ctx,
            $U,
            ForkOut: 'static,
            Chain,
            A0: 'static,
            C0,
            A1: 'static,
            C1,
            A2: 'static,
            C2,
            A3: 'static,
            C3,
        >
            $Fork<
                Ctx,
                $U,
                ForkOut,
                Chain,
                (
                    CtxDagArm<Ctx, ForkOut, A0, C0>,
                    CtxDagArm<Ctx, ForkOut, A1, C1>,
                    CtxDagArm<Ctx, ForkOut, A2, C2>,
                    CtxDagArm<Ctx, ForkOut, A3, C3>,
                ),
            >
        {
            /// Merge four arms with a context-aware merge step.
            ///
            /// Consumes the fork: the upstream chain, each arm's chain and
            /// the merge step (converted against `registry` here) are moved
            /// into a `CtxMergeNode4`. The merge step takes
            /// `(&A0, &A1, &A2, &A3)` and its return type `MOut` becomes the
            /// output type of the returned builder.
            pub fn merge<MOut, Params, S>(
                self,
                f: S,
                registry: &Registry,
            ) -> $Output<
                Ctx,
                $U,
                MOut,
                CtxMergeNode4<Chain, C0, C1, C2, C3, S::Step, ForkOut, A0, A1, A2, A3, MOut>,
            >
            where
                MOut: 'static,
                S: IntoCtxMergeStep<
                    Ctx,
                    (&'static A0, &'static A1, &'static A2, &'static A3),
                    MOut,
                    Params,
                >,
                S::Step: for<'x> CtxMergeStepCall<
                    Ctx,
                    (&'x A0, &'x A1, &'x A2, &'x A3),
                    Out = MOut,
                >,
            {
                let (a0, a1, a2, a3) = self.arms;
                $Output {
                    chain: CtxMergeNode4 {
                        chain: self.chain,
                        arm0: a0.chain,
                        arm1: a1.chain,
                        arm2: a2.chain,
                        arm3: a3.chain,
                        merge: f.into_ctx_merge_step(registry),
                        _marker: PhantomData,
                    },
                    _marker: PhantomData,
                }
            }
        }

        impl<Ctx, $U, ForkOut: 'static, Chain, C0, C1, C2, C3>
            $Fork<
                Ctx,
                $U,
                ForkOut,
                Chain,
                (
                    CtxDagArm<Ctx, ForkOut, (), C0>,
                    CtxDagArm<Ctx, ForkOut, (), C1>,
                    CtxDagArm<Ctx, ForkOut, (), C2>,
                    CtxDagArm<Ctx, ForkOut, (), C3>,
                ),
            >
        {
            /// Join four sink arms (all producing `()`).
            ///
            /// Only generated for forks whose four arms all have output type
            /// `()`. Consumes the fork, moving the upstream chain and each
            /// arm's chain into a `CtxJoinNode4`; the returned builder's
            /// output type is `()`.
            pub fn join(
                self,
            ) -> $Output<Ctx, $U, (), CtxJoinNode4<Chain, C0, C1, C2, C3, ForkOut>> {
                let (a0, a1, a2, a3) = self.arms;
                $Output {
                    chain: CtxJoinNode4 {
                        chain: self.chain,
                        arm0: a0.chain,
                        arm1: a1.chain,
                        arm2: a2.chain,
                        arm3: a3.chain,
                        _marker: PhantomData,
                    },
                    _marker: PhantomData,
                }
            }
        }
    };
}
1710
// Instantiate the fork impls generated by `impl_ctx_dag_fork!` for both fork
// builder types: closing a `CtxDagChainFork` (via `merge`/`join`) yields a
// `CtxDagChain`, closing a `CtxDagArmFork` yields a `CtxDagArm`. Both use `In`
// as the upstream type parameter.
impl_ctx_dag_fork!(fork: CtxDagChainFork, output: CtxDagChain, upstream: In);
impl_ctx_dag_fork!(fork: CtxDagArmFork, output: CtxDagArm, upstream: In);
1713
1714// =============================================================================
1715// Tests
1716// =============================================================================
1717
1718#[cfg(test)]
1719mod tests {
1720    use super::*;
1721    use crate::{Res, ResMut, WorldBuilder};
1722
    /// Test-only context passed as `&mut` to every step.
    ///
    /// The fields are plain counters: steps mutate them and each test asserts
    /// on the final values to prove which steps ran and how often.
    struct TradingCtx {
        /// Typically bumped by root and first-arm steps; the guard test
        /// instead stores the chain's final value here.
        book_updates: u32,
        /// Typically bumped by second-arm steps.
        risk_checks: u32,
        /// Typically bumped by merge and sink steps (and by the third arm in
        /// the three-arm test).
        submissions: u32,
    }
1728
1729    // -- Fork/merge basic test ------------------------------------------------
1730
1731    #[test]
1732    fn ctx_dag_fork_merge_two_arms() {
1733        let mut wb = WorldBuilder::new();
1734        wb.register::<u64>(0);
1735        let mut world = wb.build();
1736        let reg = world.registry();
1737
1738        fn decode(ctx: &mut TradingCtx, raw: u32) -> u64 {
1739            ctx.book_updates += 1;
1740            raw as u64
1741        }
1742
1743        fn arm_a(ctx: &mut TradingCtx, val: &u64) -> u64 {
1744            ctx.book_updates += 1;
1745            *val * 2
1746        }
1747
1748        fn arm_b(ctx: &mut TradingCtx, val: &u64) -> u64 {
1749            ctx.risk_checks += 1;
1750            *val + 10
1751        }
1752
1753        fn merge_fn(ctx: &mut TradingCtx, a: &u64, b: &u64) {
1754            ctx.submissions += 1;
1755            assert_eq!(*a, 10); // 5 * 2
1756            assert_eq!(*b, 15); // 5 + 10
1757        }
1758
1759        let mut dag = CtxDagBuilder::<TradingCtx, u32>::new()
1760            .root(decode, &reg)
1761            .fork()
1762            .arm(|seed| seed.then(arm_a, &reg))
1763            .arm(|seed| seed.then(arm_b, &reg))
1764            .merge(merge_fn, &reg)
1765            .build();
1766
1767        let mut ctx = TradingCtx {
1768            book_updates: 0,
1769            risk_checks: 0,
1770            submissions: 0,
1771        };
1772
1773        dag.run(&mut ctx, &mut world, 5);
1774
1775        assert_eq!(ctx.book_updates, 2); // decode + arm_a
1776        assert_eq!(ctx.risk_checks, 1);
1777        assert_eq!(ctx.submissions, 1);
1778    }
1779
1780    // -- Linear chain test ----------------------------------------------------
1781
1782    #[test]
1783    fn ctx_dag_linear_then() {
1784        let mut wb = WorldBuilder::new();
1785        wb.register::<u64>(0);
1786        let mut world = wb.build();
1787        let reg = world.registry();
1788
1789        fn root(ctx: &mut TradingCtx, x: u32) -> u64 {
1790            ctx.book_updates += 1;
1791            x as u64 * 2
1792        }
1793
1794        fn store(ctx: &mut TradingCtx, mut out: ResMut<u64>, val: &u64) {
1795            ctx.submissions += 1;
1796            *out = *val;
1797        }
1798
1799        let mut dag = CtxDagBuilder::<TradingCtx, u32>::new()
1800            .root(root, &reg)
1801            .then(store, &reg)
1802            .build();
1803
1804        let mut ctx = TradingCtx {
1805            book_updates: 0,
1806            risk_checks: 0,
1807            submissions: 0,
1808        };
1809
1810        dag.run(&mut ctx, &mut world, 5);
1811
1812        assert_eq!(ctx.book_updates, 1);
1813        assert_eq!(ctx.submissions, 1);
1814        assert_eq!(*world.resource::<u64>(), 10);
1815    }
1816
1817    // -- Join test (all arms produce ()) --------------------------------------
1818
1819    #[test]
1820    fn ctx_dag_fork_join() {
1821        let mut world = WorldBuilder::new().build();
1822        let reg = world.registry();
1823
1824        fn root(_ctx: &mut TradingCtx, x: u32) -> u64 {
1825            x as u64
1826        }
1827
1828        fn side_a(ctx: &mut TradingCtx, _val: &u64) {
1829            ctx.book_updates += 1;
1830        }
1831
1832        fn side_b(ctx: &mut TradingCtx, _val: &u64) {
1833            ctx.risk_checks += 1;
1834        }
1835
1836        let mut dag = CtxDagBuilder::<TradingCtx, u32>::new()
1837            .root(root, &reg)
1838            .fork()
1839            .arm(|seed| seed.then(side_a, &reg))
1840            .arm(|seed| seed.then(side_b, &reg))
1841            .join()
1842            .build();
1843
1844        let mut ctx = TradingCtx {
1845            book_updates: 0,
1846            risk_checks: 0,
1847            submissions: 0,
1848        };
1849
1850        dag.run(&mut ctx, &mut world, 42);
1851
1852        assert_eq!(ctx.book_updates, 1);
1853        assert_eq!(ctx.risk_checks, 1);
1854    }
1855
    // -- Guard + map + unwrap_or ----------------------------------------------
1857
1858    #[test]
1859    fn ctx_dag_guard_before_fork() {
1860        let mut world = WorldBuilder::new().build();
1861        let reg = world.registry();
1862
1863        fn root(_ctx: &mut TradingCtx, x: u32) -> u32 {
1864            x
1865        }
1866
1867        let mut dag = CtxDagBuilder::<TradingCtx, u32>::new()
1868            .root(root, &reg)
1869            .guard(|_ctx: &mut TradingCtx, x: &u32| *x > 10, &reg)
1870            .map(
1871                |ctx: &mut TradingCtx, x: &u32| {
1872                    ctx.submissions += 1;
1873                    *x * 2
1874                },
1875                &reg,
1876            )
1877            .unwrap_or(0u32)
1878            .then(
1879                |ctx: &mut TradingCtx, val: &u32| {
1880                    ctx.book_updates = *val;
1881                },
1882                &reg,
1883            )
1884            .build();
1885
1886        let mut ctx = TradingCtx {
1887            book_updates: 0,
1888            risk_checks: 0,
1889            submissions: 0,
1890        };
1891
1892        // x = 5, guard fails
1893        dag.run(&mut ctx, &mut world, 5);
1894        assert_eq!(ctx.book_updates, 0);
1895        assert_eq!(ctx.submissions, 0);
1896
1897        // x = 20, guard passes
1898        dag.run(&mut ctx, &mut world, 20);
1899        assert_eq!(ctx.book_updates, 40);
1900        assert_eq!(ctx.submissions, 1);
1901    }
1902
1903    // -- 3-arm fork test ------------------------------------------------------
1904
1905    #[test]
1906    fn ctx_dag_three_arm_fork_merge() {
1907        let mut wb = WorldBuilder::new();
1908        wb.register::<u64>(0);
1909        let mut world = wb.build();
1910        let reg = world.registry();
1911
1912        fn decode(ctx: &mut TradingCtx, raw: u32) -> u64 {
1913            ctx.book_updates += 1;
1914            raw as u64
1915        }
1916
1917        fn arm_a(ctx: &mut TradingCtx, val: &u64) -> u64 {
1918            ctx.book_updates += 1;
1919            *val * 2
1920        }
1921
1922        fn arm_b(ctx: &mut TradingCtx, val: &u64) -> u64 {
1923            ctx.risk_checks += 1;
1924            *val + 10
1925        }
1926
1927        fn arm_c(ctx: &mut TradingCtx, val: &u64) -> u64 {
1928            ctx.submissions += 1;
1929            *val * 3
1930        }
1931
1932        fn merge3(ctx: &mut TradingCtx, a: &u64, b: &u64, c: &u64) {
1933            ctx.submissions += 1;
1934            assert_eq!(*a, 10); // 5 * 2
1935            assert_eq!(*b, 15); // 5 + 10
1936            assert_eq!(*c, 15); // 5 * 3
1937        }
1938
1939        let mut dag = CtxDagBuilder::<TradingCtx, u32>::new()
1940            .root(decode, &reg)
1941            .fork()
1942            .arm(|seed| seed.then(arm_a, &reg))
1943            .arm(|seed| seed.then(arm_b, &reg))
1944            .arm(|seed| seed.then(arm_c, &reg))
1945            .merge(merge3, &reg)
1946            .build();
1947
1948        let mut ctx = TradingCtx {
1949            book_updates: 0,
1950            risk_checks: 0,
1951            submissions: 0,
1952        };
1953
1954        dag.run(&mut ctx, &mut world, 5);
1955
1956        assert_eq!(ctx.book_updates, 2); // decode + arm_a
1957        assert_eq!(ctx.risk_checks, 1); // arm_b
1958        assert_eq!(ctx.submissions, 2); // arm_c + merge3
1959    }
1960
1961    // -- Merge with Param resolution ------------------------------------------
1962
1963    #[test]
1964    fn ctx_dag_merge_with_param() {
1965        let mut wb = WorldBuilder::new();
1966        wb.register::<u64>(100);
1967        let mut world = wb.build();
1968        let reg = world.registry();
1969
1970        fn decode(ctx: &mut TradingCtx, raw: u32) -> u64 {
1971            ctx.book_updates += 1;
1972            raw as u64
1973        }
1974
1975        fn arm_a(_ctx: &mut TradingCtx, val: &u64) -> u64 {
1976            *val * 2
1977        }
1978
1979        fn arm_b(_ctx: &mut TradingCtx, val: &u64) -> u64 {
1980            *val + 10
1981        }
1982
1983        // Merge step that uses Res<u64> — exercises the unsafe Param::fetch path
1984        fn merge_with_res(ctx: &mut TradingCtx, scale: Res<u64>, a: &u64, b: &u64) {
1985            ctx.submissions += 1;
1986            // scale=100, a=10 (5*2), b=15 (5+10)
1987            assert_eq!(*scale, 100);
1988            assert_eq!(*a + *b, 25);
1989        }
1990
1991        let mut dag = CtxDagBuilder::<TradingCtx, u32>::new()
1992            .root(decode, &reg)
1993            .fork()
1994            .arm(|seed| seed.then(arm_a, &reg))
1995            .arm(|seed| seed.then(arm_b, &reg))
1996            .merge(merge_with_res, &reg)
1997            .build();
1998
1999        let mut ctx = TradingCtx {
2000            book_updates: 0,
2001            risk_checks: 0,
2002            submissions: 0,
2003        };
2004
2005        dag.run(&mut ctx, &mut world, 5);
2006
2007        assert_eq!(ctx.book_updates, 1);
2008        assert_eq!(ctx.submissions, 1);
2009    }
2010}