Skip to main content

nexus_rt/
ctx_dag.rs

1// Builder return types are necessarily complex — each combinator returns
2// CtxDagChain<C, In, Out, NodeType<Chain, ...>>. Same pattern as iterator adapters.
3#![allow(clippy::type_complexity)]
4
5//! Context-aware DAG dispatch.
6//!
7//! Mirrors the [`dag`](crate::dag) module but threads `&mut C`
8//! (per-instance context) through every step. Designed for use inside
9//! [`Callback`](crate::Callback) where each handler instance owns private
10//! state that pipeline steps need to read or mutate.
11//!
12//! # Step function convention
13//!
14//! Context first, then Params, then step input last:
15//!
16//! ```ignore
17//! fn update_book(ctx: &mut TradingCtx, book: ResMut<OrderBook>, msg: &Decoded) -> BookResult { .. }
18//! fn update_risk(ctx: &mut TradingCtx, risk: ResMut<RiskState>, msg: &Decoded) -> RiskResult { .. }
19//! fn check_and_submit(ctx: &mut TradingCtx, a: &BookResult, b: &RiskResult) { .. }
20//! ```
21//!
22//! # Integration with Callback
23//!
24//! The built [`CtxDag`] implements [`CtxStepCall`] — it takes
25//! `&mut C`, `&mut World`, and `In`, returning `Out`.
26//!
27//! To use a DAG from a [`Handler`](crate::Handler), create a normal
28//! [`Callback`](crate::Callback) whose handler function owns or accesses
29//! the context `C` and calls the DAG via its `run` method, passing
30//! `&mut C`, `&mut World`, and the handler input.
31//!
32//! # Three-tier step resolution
33//!
34//! Each combinator accepts functions via three tiers, matching the
35//! [`dag`](crate::dag) module:
36//!
37//! 1. **Named function with Params** — `fn(&mut C, Res<T>, &In) -> Out`
38//! 2. **Arity-0 closure** — `FnMut(&mut C, &In) -> Out`
39//! 3. **[`Opaque`](crate::Opaque) closure** — `FnMut(&mut C, &mut World, &In) -> Out`
40//!    (raw World access, no Param resolution)
41//!
42//! # Deferred combinators
43//!
44//! The following combinators from [`dag`](crate::dag) are not yet implemented:
45//! `scan`, `dedup`, `dispatch`, `route`, `tee`, `splat`, `cloned`,
46//! `not`/`and`/`or`/`xor` (bool), `ok_or_else`, `or_else`,
47//! `Result::unwrap_or_else`, `view`, and `BatchDag`. These can be added
48//! when a concrete use case requires them.
49//!
50//! # Examples
51//!
52//! ```ignore
53//! let dag = CtxDagBuilder::<MyCtx, WireMsg>::new()
54//!     .root(decode, &reg)
55//!     .fork()
56//!     .arm(|a| a.then(update_book, &reg))
57//!     .arm(|b| b.then(update_risk, &reg))
58//!     .merge(check_and_submit, &reg)
59//!     .build();
60//!
61//! dag.call(&mut ctx, &mut world, wire_msg);
62//! ```
63
64use std::marker::PhantomData;
65
66use crate::ctx_pipeline::{
67    CtxChainCall, CtxDiscardOptionNode, CtxFilterNode, CtxGuardNode, CtxIdentityNode,
68    CtxInspectErrNode, CtxInspectOptionNode, CtxInspectResultNode, CtxMapErrNode, CtxOkNode,
69    CtxOkOrNode, CtxOnNoneNode, CtxStepCall, CtxTapNode, CtxThenNode, CtxUnwrapOrElseOptionNode,
70    CtxUnwrapOrOptionNode, CtxUnwrapOrResultNode, IntoCtxProducer, IntoCtxRefStep, IntoCtxStep,
71};
72use crate::handler::Param;
73use crate::world::{Registry, World};
74
75// =============================================================================
76// CtxDagThenNode — DAG ref step (takes &Out, produces NewOut)
77// =============================================================================
78
79/// Chain node for DAG `.then()` — takes input by reference.
80#[doc(hidden)]
81pub struct CtxDagThenNode<Prev, S, NewOut> {
82    pub(crate) prev: Prev,
83    pub(crate) step: S,
84    pub(crate) _out: PhantomData<fn() -> NewOut>,
85}
86
87impl<C, In, Prev, S, NewOut: 'static> CtxChainCall<C, In> for CtxDagThenNode<Prev, S, NewOut>
88where
89    Prev: CtxChainCall<C, In>,
90    Prev::Out: 'static,
91    S: for<'a> CtxStepCall<C, &'a Prev::Out, Out = NewOut>,
92{
93    type Out = NewOut;
94    #[inline(always)]
95    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> NewOut {
96        let mid = self.prev.call(ctx, world, input);
97        self.step.call(ctx, world, &mid)
98    }
99}
100
101// =============================================================================
102// DAG Option/Result nodes that take inner by ref (matching dag.rs pattern)
103// =============================================================================
104
105/// Chain node for DAG `.map()` on `Option<T>` — takes `&T`.
106#[doc(hidden)]
107pub struct CtxDagMapOptionNode<Prev, S, U> {
108    pub(crate) prev: Prev,
109    pub(crate) step: S,
110    pub(crate) _out: PhantomData<fn() -> U>,
111}
112
113impl<C, In, T: 'static, Prev, S, U> CtxChainCall<C, In> for CtxDagMapOptionNode<Prev, S, U>
114where
115    Prev: CtxChainCall<C, In, Out = Option<T>>,
116    S: for<'a> CtxStepCall<C, &'a T, Out = U>,
117{
118    type Out = Option<U>;
119    #[inline(always)]
120    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> Option<U> {
121        match self.prev.call(ctx, world, input) {
122            Some(val) => Some(self.step.call(ctx, world, &val)),
123            None => None,
124        }
125    }
126}
127
128/// Chain node for DAG `.and_then()` on `Option<T>` — takes `&T`.
129#[doc(hidden)]
130pub struct CtxDagAndThenOptionNode<Prev, S, U> {
131    pub(crate) prev: Prev,
132    pub(crate) step: S,
133    pub(crate) _out: PhantomData<fn() -> U>,
134}
135
136impl<C, In, T: 'static, Prev, S, U> CtxChainCall<C, In> for CtxDagAndThenOptionNode<Prev, S, U>
137where
138    Prev: CtxChainCall<C, In, Out = Option<T>>,
139    S: for<'a> CtxStepCall<C, &'a T, Out = Option<U>>,
140{
141    type Out = Option<U>;
142    #[inline(always)]
143    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> Option<U> {
144        match self.prev.call(ctx, world, input) {
145            Some(val) => self.step.call(ctx, world, &val),
146            None => None,
147        }
148    }
149}
150
151/// Chain node for DAG `.map()` on `Result<T, E>` — takes `&T`.
152#[doc(hidden)]
153pub struct CtxDagMapResultNode<Prev, S, U> {
154    pub(crate) prev: Prev,
155    pub(crate) step: S,
156    pub(crate) _out: PhantomData<fn() -> U>,
157}
158
159impl<C, In, T: 'static, E, Prev, S, U> CtxChainCall<C, In> for CtxDagMapResultNode<Prev, S, U>
160where
161    Prev: CtxChainCall<C, In, Out = Result<T, E>>,
162    S: for<'a> CtxStepCall<C, &'a T, Out = U>,
163{
164    type Out = Result<U, E>;
165    #[inline(always)]
166    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> Result<U, E> {
167        match self.prev.call(ctx, world, input) {
168            Ok(val) => Ok(self.step.call(ctx, world, &val)),
169            Err(e) => Err(e),
170        }
171    }
172}
173
174/// Chain node for DAG `.and_then()` on `Result<T, E>` — takes `&T`.
175#[doc(hidden)]
176pub struct CtxDagAndThenResultNode<Prev, S, U> {
177    pub(crate) prev: Prev,
178    pub(crate) step: S,
179    pub(crate) _out: PhantomData<fn() -> U>,
180}
181
182impl<C, In, T: 'static, U, E, Prev, S> CtxChainCall<C, In> for CtxDagAndThenResultNode<Prev, S, U>
183where
184    Prev: CtxChainCall<C, In, Out = Result<T, E>>,
185    S: for<'a> CtxStepCall<C, &'a T, Out = Result<U, E>>,
186{
187    type Out = Result<U, E>;
188    #[inline(always)]
189    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> Result<U, E> {
190        match self.prev.call(ctx, world, input) {
191            Ok(val) => self.step.call(ctx, world, &val),
192            Err(e) => Err(e),
193        }
194    }
195}
196
197/// Chain node for DAG `.catch()` on `Result<T, E>` — takes `&E`.
198#[doc(hidden)]
199pub struct CtxDagCatchNode<Prev, S> {
200    pub(crate) prev: Prev,
201    pub(crate) step: S,
202}
203
204impl<C, In, T, E: 'static, Prev, S> CtxChainCall<C, In> for CtxDagCatchNode<Prev, S>
205where
206    Prev: CtxChainCall<C, In, Out = Result<T, E>>,
207    S: for<'a> CtxStepCall<C, &'a E, Out = ()>,
208{
209    type Out = Option<T>;
210    #[inline(always)]
211    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> Option<T> {
212        match self.prev.call(ctx, world, input) {
213            Ok(val) => Some(val),
214            Err(e) => {
215                self.step.call(ctx, world, &e);
216                None
217            }
218        }
219    }
220}
221
222// =============================================================================
223// CtxMergeStepCall / IntoCtxMergeStep — context-aware merge step dispatch
224// =============================================================================
225
226/// Callable trait for resolved context-aware merge steps.
227///
228/// Like [`MergeStepCall`](crate::dag::MergeStepCall) but with `&mut C` context.
229#[doc(hidden)]
230pub trait CtxMergeStepCall<C, Inputs> {
231    /// The output type of this merge step.
232    type Out;
233    /// Call this merge step with context, world, and input references.
234    fn call(&mut self, ctx: &mut C, world: &mut World, inputs: Inputs) -> Self::Out;
235}
236
237/// Converts a named function into a resolved context-aware merge step.
238///
239/// Context first, then Params, then N reference inputs:
240///
241/// ```ignore
242/// fn check(ctx: &mut Ctx, config: Res<Config>, ob: &ObResult, risk: &RiskResult) -> Decision { .. }
243/// ```
244#[doc(hidden)]
245#[diagnostic::on_unimplemented(
246    message = "this function cannot be used as a context-aware merge step",
247    note = "ctx merge steps: `fn(&mut C, Params..., &A, &B, ...) -> Out`",
248    note = "closures with resource parameters are not supported — use a named `fn`"
249)]
250pub trait IntoCtxMergeStep<C, Inputs, Out, Params> {
251    /// The concrete resolved merge step type.
252    type Step: CtxMergeStepCall<C, Inputs, Out = Out>;
253
254    /// Resolve Param state from the registry and produce a merge step.
255    fn into_ctx_merge_step(self, registry: &Registry) -> Self::Step;
256}
257
258/// Internal: pre-resolved context-aware merge step with cached Param state.
259#[doc(hidden)]
260pub struct CtxMergeStep<F, Params: Param> {
261    f: F,
262    state: Params::State,
263    // Retained for future diagnostic/tracing use (step name in error messages).
264    #[allow(dead_code)]
265    name: &'static str,
266}
267
268// -- Merge arity 2, Param arity 0 (closures work) ---------------------------
269
270impl<C, A, B, Out, F> CtxMergeStepCall<C, (&A, &B)> for CtxMergeStep<F, ()>
271where
272    F: FnMut(&mut C, &A, &B) -> Out + 'static,
273{
274    type Out = Out;
275    #[inline(always)]
276    fn call(&mut self, ctx: &mut C, _world: &mut World, inputs: (&A, &B)) -> Out {
277        (self.f)(ctx, inputs.0, inputs.1)
278    }
279}
280
281impl<C, A, B, Out, F> IntoCtxMergeStep<C, (&A, &B), Out, ()> for F
282where
283    F: FnMut(&mut C, &A, &B) -> Out + 'static,
284{
285    type Step = CtxMergeStep<F, ()>;
286
287    fn into_ctx_merge_step(self, registry: &Registry) -> Self::Step {
288        CtxMergeStep {
289            f: self,
290            state: <() as Param>::init(registry),
291            name: std::any::type_name::<F>(),
292        }
293    }
294}
295
296// -- Merge arity 2, Param arities 1-8 ---------------------------------------
297
298macro_rules! impl_ctx_merge2_step {
299    ($($P:ident),+) => {
300        impl<C, A, B, Out, F: 'static, $($P: Param + 'static),+>
301            CtxMergeStepCall<C, (&A, &B)> for CtxMergeStep<F, ($($P,)+)>
302        where
303            for<'a> &'a mut F:
304                FnMut(&mut C, $($P,)+ &A, &B) -> Out +
305                FnMut(&mut C, $($P::Item<'a>,)+ &A, &B) -> Out,
306        {
307            type Out = Out;
308            #[inline(always)]
309            #[allow(non_snake_case)]
310            fn call(&mut self, ctx: &mut C, world: &mut World, inputs: (&A, &B)) -> Out {
311                #[allow(clippy::too_many_arguments)]
312                fn call_inner<Ctx, $($P,)+ IA, IB, Output>(
313                    mut f: impl FnMut(&mut Ctx, $($P,)+ &IA, &IB) -> Output,
314                    ctx: &mut Ctx,
315                    $($P: $P,)+
316                    a: &IA, b: &IB,
317                ) -> Output {
318                    f(ctx, $($P,)+ a, b)
319                }
320                #[cfg(debug_assertions)]
321                world.clear_borrows();
322                // SAFETY: Resource IDs in self.state were obtained from the same
323                // Registry that built this World. Borrows are disjoint — enforced
324                // by conflict detection at build time (check_access).
325                let ($($P,)+) = unsafe {
326                    <($($P,)+) as Param>::fetch(world, &mut self.state)
327                };
328                call_inner(&mut self.f, ctx, $($P,)+ inputs.0, inputs.1)
329            }
330        }
331
332        impl<C, A, B, Out, F: 'static, $($P: Param + 'static),+>
333            IntoCtxMergeStep<C, (&A, &B), Out, ($($P,)+)> for F
334        where
335            for<'a> &'a mut F:
336                FnMut(&mut C, $($P,)+ &A, &B) -> Out +
337                FnMut(&mut C, $($P::Item<'a>,)+ &A, &B) -> Out,
338        {
339            type Step = CtxMergeStep<F, ($($P,)+)>;
340
341            fn into_ctx_merge_step(self, registry: &Registry) -> Self::Step {
342                let state = <($($P,)+) as Param>::init(registry);
343                {
344                    #[allow(non_snake_case)]
345                    let ($($P,)+) = &state;
346                    registry.check_access(&[
347                        $((<$P as Param>::resource_id($P),
348                           std::any::type_name::<$P>()),)+
349                    ]);
350                }
351                CtxMergeStep { f: self, state, name: std::any::type_name::<F>() }
352            }
353        }
354    };
355}
356
357// -- Merge arity 3, Param arity 0 -------------------------------------------
358
359impl<Ctx, IA, IB, IC, Out, F> CtxMergeStepCall<Ctx, (&IA, &IB, &IC)> for CtxMergeStep<F, ()>
360where
361    F: FnMut(&mut Ctx, &IA, &IB, &IC) -> Out + 'static,
362{
363    type Out = Out;
364    #[inline(always)]
365    fn call(&mut self, ctx: &mut Ctx, _world: &mut World, inputs: (&IA, &IB, &IC)) -> Out {
366        (self.f)(ctx, inputs.0, inputs.1, inputs.2)
367    }
368}
369
370impl<Ctx, IA, IB, IC, Out, F> IntoCtxMergeStep<Ctx, (&IA, &IB, &IC), Out, ()> for F
371where
372    F: FnMut(&mut Ctx, &IA, &IB, &IC) -> Out + 'static,
373{
374    type Step = CtxMergeStep<F, ()>;
375
376    fn into_ctx_merge_step(self, registry: &Registry) -> Self::Step {
377        CtxMergeStep {
378            f: self,
379            state: <() as Param>::init(registry),
380            name: std::any::type_name::<F>(),
381        }
382    }
383}
384
385// -- Merge arity 3, Param arities 1-8 ---------------------------------------
386
387macro_rules! impl_ctx_merge3_step {
388    ($($P:ident),+) => {
389        impl<Ctx, A, B, C, Out, F: 'static, $($P: Param + 'static),+>
390            CtxMergeStepCall<Ctx, (&A, &B, &C)> for CtxMergeStep<F, ($($P,)+)>
391        where
392            for<'a> &'a mut F:
393                FnMut(&mut Ctx, $($P,)+ &A, &B, &C) -> Out +
394                FnMut(&mut Ctx, $($P::Item<'a>,)+ &A, &B, &C) -> Out,
395        {
396            type Out = Out;
397            #[inline(always)]
398            #[allow(non_snake_case)]
399            fn call(&mut self, ctx: &mut Ctx, world: &mut World, inputs: (&A, &B, &C)) -> Out {
400                #[allow(clippy::too_many_arguments)]
401                fn call_inner<Cx, $($P,)+ IA, IB, IC, Output>(
402                    mut f: impl FnMut(&mut Cx, $($P,)+ &IA, &IB, &IC) -> Output,
403                    ctx: &mut Cx,
404                    $($P: $P,)+
405                    a: &IA, b: &IB, c: &IC,
406                ) -> Output {
407                    f(ctx, $($P,)+ a, b, c)
408                }
409                #[cfg(debug_assertions)]
410                world.clear_borrows();
411                // SAFETY: Resource IDs in self.state were obtained from the same
412                // Registry that built this World. Borrows are disjoint — enforced
413                // by conflict detection at build time (check_access).
414                let ($($P,)+) = unsafe {
415                    <($($P,)+) as Param>::fetch(world, &mut self.state)
416                };
417                call_inner(&mut self.f, ctx, $($P,)+ inputs.0, inputs.1, inputs.2)
418            }
419        }
420
421        impl<Ctx, A, B, C, Out, F: 'static, $($P: Param + 'static),+>
422            IntoCtxMergeStep<Ctx, (&A, &B, &C), Out, ($($P,)+)> for F
423        where
424            for<'a> &'a mut F:
425                FnMut(&mut Ctx, $($P,)+ &A, &B, &C) -> Out +
426                FnMut(&mut Ctx, $($P::Item<'a>,)+ &A, &B, &C) -> Out,
427        {
428            type Step = CtxMergeStep<F, ($($P,)+)>;
429
430            fn into_ctx_merge_step(self, registry: &Registry) -> Self::Step {
431                let state = <($($P,)+) as Param>::init(registry);
432                {
433                    #[allow(non_snake_case)]
434                    let ($($P,)+) = &state;
435                    registry.check_access(&[
436                        $((<$P as Param>::resource_id($P),
437                           std::any::type_name::<$P>()),)+
438                    ]);
439                }
440                CtxMergeStep { f: self, state, name: std::any::type_name::<F>() }
441            }
442        }
443    };
444}
445
446// -- Merge arity 4, Param arity 0 -------------------------------------------
447
448impl<Ctx, IA, IB, IC, ID, Out, F> CtxMergeStepCall<Ctx, (&IA, &IB, &IC, &ID)>
449    for CtxMergeStep<F, ()>
450where
451    F: FnMut(&mut Ctx, &IA, &IB, &IC, &ID) -> Out + 'static,
452{
453    type Out = Out;
454    #[inline(always)]
455    fn call(&mut self, ctx: &mut Ctx, _world: &mut World, inputs: (&IA, &IB, &IC, &ID)) -> Out {
456        (self.f)(ctx, inputs.0, inputs.1, inputs.2, inputs.3)
457    }
458}
459
460impl<Ctx, IA, IB, IC, ID, Out, F> IntoCtxMergeStep<Ctx, (&IA, &IB, &IC, &ID), Out, ()> for F
461where
462    F: FnMut(&mut Ctx, &IA, &IB, &IC, &ID) -> Out + 'static,
463{
464    type Step = CtxMergeStep<F, ()>;
465
466    fn into_ctx_merge_step(self, registry: &Registry) -> Self::Step {
467        CtxMergeStep {
468            f: self,
469            state: <() as Param>::init(registry),
470            name: std::any::type_name::<F>(),
471        }
472    }
473}
474
475// -- Merge arity 4, Param arities 1-8 ---------------------------------------
476
477macro_rules! impl_ctx_merge4_step {
478    ($($P:ident),+) => {
479        #[allow(clippy::many_single_char_names)]
480        impl<Ctx, A, B, C, D, Out, F: 'static, $($P: Param + 'static),+>
481            CtxMergeStepCall<Ctx, (&A, &B, &C, &D)> for CtxMergeStep<F, ($($P,)+)>
482        where
483            for<'a> &'a mut F:
484                FnMut(&mut Ctx, $($P,)+ &A, &B, &C, &D) -> Out +
485                FnMut(&mut Ctx, $($P::Item<'a>,)+ &A, &B, &C, &D) -> Out,
486        {
487            type Out = Out;
488            #[inline(always)]
489            #[allow(non_snake_case)]
490            fn call(&mut self, ctx: &mut Ctx, world: &mut World, inputs: (&A, &B, &C, &D)) -> Out {
491                #[allow(clippy::too_many_arguments)]
492                fn call_inner<Cx, $($P,)+ IA, IB, IC, ID, Output>(
493                    mut f: impl FnMut(&mut Cx, $($P,)+ &IA, &IB, &IC, &ID) -> Output,
494                    ctx: &mut Cx,
495                    $($P: $P,)+ a: &IA, b: &IB, c: &IC, d: &ID,
496                ) -> Output { f(ctx, $($P,)+ a, b, c, d) }
497                #[cfg(debug_assertions)]
498                world.clear_borrows();
499                // SAFETY: Resource IDs in self.state were obtained from the same
500                // Registry that built this World. Borrows are disjoint — enforced
501                // by conflict detection at build time (check_access).
502                let ($($P,)+) = unsafe {
503                    <($($P,)+) as Param>::fetch(world, &mut self.state)
504                };
505                call_inner(&mut self.f, ctx, $($P,)+ inputs.0, inputs.1, inputs.2, inputs.3)
506            }
507        }
508
509        #[allow(clippy::many_single_char_names)]
510        impl<Ctx, A, B, C, D, Out, F: 'static, $($P: Param + 'static),+>
511            IntoCtxMergeStep<Ctx, (&A, &B, &C, &D), Out, ($($P,)+)> for F
512        where
513            for<'a> &'a mut F:
514                FnMut(&mut Ctx, $($P,)+ &A, &B, &C, &D) -> Out +
515                FnMut(&mut Ctx, $($P::Item<'a>,)+ &A, &B, &C, &D) -> Out,
516        {
517            type Step = CtxMergeStep<F, ($($P,)+)>;
518
519            fn into_ctx_merge_step(self, registry: &Registry) -> Self::Step {
520                let state = <($($P,)+) as Param>::init(registry);
521                {
522                    #[allow(non_snake_case)]
523                    let ($($P,)+) = &state;
524                    registry.check_access(&[
525                        $((<$P as Param>::resource_id($P),
526                           std::any::type_name::<$P>()),)+
527                    ]);
528                }
529                CtxMergeStep { f: self, state, name: std::any::type_name::<F>() }
530            }
531        }
532    };
533}
534
535all_tuples!(impl_ctx_merge2_step);
536all_tuples!(impl_ctx_merge3_step);
537all_tuples!(impl_ctx_merge4_step);
538
539// =============================================================================
540// CtxDagBuilder — entry point
541// =============================================================================
542
543/// Entry point for building a context-aware DAG pipeline.
544///
545/// Like [`DagBuilder`](crate::DagBuilder) but every step receives `&mut C`
546/// as the first argument.
547///
548/// # Examples
549///
550/// ```ignore
551/// let dag = CtxDagBuilder::<MyCtx, WireMsg>::new()
552///     .root(decode, &reg)
553///     .fork()
554///     .arm(|a| a.then(update_book, &reg))
555///     .arm(|b| b.then(update_risk, &reg))
556///     .merge(check_and_submit, &reg)
557///     .build();
558/// ```
559#[must_use = "a DAG builder does nothing unless you chain steps and call .build()"]
560pub struct CtxDagBuilder<C, E>(PhantomData<fn(&mut C, E)>);
561
562impl<C, E> CtxDagBuilder<C, E> {
563    /// Create a new typed context-aware DAG entry point.
564    pub fn new() -> Self {
565        Self(PhantomData)
566    }
567
568    /// Set the root step. Takes the event `E` by value, produces `Out`.
569    pub fn root<Out, Params, S>(
570        self,
571        f: S,
572        registry: &Registry,
573    ) -> CtxDagChain<C, E, Out, CtxThenNode<CtxIdentityNode, S::Step>>
574    where
575        Out: 'static,
576        S: IntoCtxStep<C, E, Out, Params>,
577    {
578        CtxDagChain {
579            chain: CtxThenNode {
580                prev: CtxIdentityNode,
581                step: f.into_ctx_step(registry),
582            },
583            _marker: PhantomData,
584        }
585    }
586}
587
588impl<C, E> Default for CtxDagBuilder<C, E> {
589    fn default() -> Self {
590        Self::new()
591    }
592}
593
594// =============================================================================
595// CtxDagChain — main chain builder
596// =============================================================================
597
598/// Main chain builder for a context-aware DAG.
599///
600/// `Chain` implements [`CtxChainCall<C, E, Out = Out>`].
601#[must_use = "DAG chain does nothing until .build() is called"]
602pub struct CtxDagChain<C, In, Out, Chain> {
603    chain: Chain,
604    _marker: PhantomData<fn(&mut C, In) -> Out>,
605}
606
607impl<C, In, Out: 'static, Chain> CtxDagChain<C, In, Out, Chain> {
608    /// Enter fork mode. Subsequent `.arm()` calls add parallel branches.
609    pub fn fork(self) -> CtxDagChainFork<C, In, Out, Chain, ()> {
610        CtxDagChainFork {
611            chain: self.chain,
612            arms: (),
613            _marker: PhantomData,
614        }
615    }
616}
617
618impl<C, In, Chain: CtxChainCall<C, In, Out = ()>> CtxDagChain<C, In, (), Chain> {
619    /// Finalize into a [`CtxDag`] that implements [`CtxStepCall`].
620    #[must_use = "building a DAG without storing it does nothing"]
621    pub fn build(self) -> CtxDag<C, In, Chain> {
622        CtxDag {
623            chain: self.chain,
624            _marker: PhantomData,
625        }
626    }
627}
628
629impl<C, In, Chain: CtxChainCall<C, In, Out = Option<()>>> CtxDagChain<C, In, Option<()>, Chain> {
630    /// Finalize into a [`CtxDag`], discarding the `Option<()>`.
631    #[must_use = "building a DAG without storing it does nothing"]
632    pub fn build(self) -> CtxDag<C, In, CtxDiscardOptionNode<Chain>> {
633        CtxDag {
634            chain: CtxDiscardOptionNode { prev: self.chain },
635            _marker: PhantomData,
636        }
637    }
638}
639
640// =============================================================================
641// CtxDagArmSeed — arm builder seed
642// =============================================================================
643
644/// Arm builder seed for context-aware DAG. Used in `.arm()` closures.
645pub struct CtxDagArmSeed<C, In>(PhantomData<fn(&mut C, *const In)>);
646
647impl<C, In> CtxDagArmSeed<C, In> {
648    /// Create a new arm seed. Typically constructed internally by `.arm()`.
649    pub fn new() -> Self {
650        Self(PhantomData)
651    }
652}
653
654impl<C, In> Default for CtxDagArmSeed<C, In> {
655    fn default() -> Self {
656        Self::new()
657    }
658}
659
660impl<C, In: 'static> CtxDagArmSeed<C, In> {
661    /// Add the first step in this arm. Takes `&In` by reference.
662    pub fn then<Out, Params, S>(
663        self,
664        f: S,
665        registry: &Registry,
666    ) -> CtxDagArm<C, In, Out, CtxThenNode<CtxIdentityNode, S::Step>>
667    where
668        Out: 'static,
669        S: IntoCtxStep<C, &'static In, Out, Params>,
670        S::Step: for<'a> CtxStepCall<C, &'a In, Out = Out>,
671    {
672        CtxDagArm {
673            chain: CtxThenNode {
674                prev: CtxIdentityNode,
675                step: f.into_ctx_step(registry),
676            },
677            _marker: PhantomData,
678        }
679    }
680}
681
682// =============================================================================
683// CtxDagArm — built arm in a DAG fork
684// =============================================================================
685
686/// Built arm in a context-aware DAG fork.
687pub struct CtxDagArm<C, In, Out, Chain> {
688    chain: Chain,
689    _marker: PhantomData<fn(&mut C, *const In) -> Out>,
690}
691
692impl<C, In: 'static, Out: 'static, Chain> CtxDagArm<C, In, Out, Chain> {
693    /// Enter fork mode within this arm.
694    pub fn fork(self) -> CtxDagArmFork<C, In, Out, Chain, ()> {
695        CtxDagArmFork {
696            chain: self.chain,
697            arms: (),
698            _marker: PhantomData,
699        }
700    }
701}
702
703// =============================================================================
704// CtxDagChainFork / CtxDagArmFork — fork builders
705// =============================================================================
706
707/// Fork builder on the main chain. Accumulates arms as a tuple.
708pub struct CtxDagChainFork<C, In, ForkOut, Chain, Arms> {
709    chain: Chain,
710    arms: Arms,
711    _marker: PhantomData<fn(&mut C, In) -> ForkOut>,
712}
713
714/// Fork builder within an arm. Accumulates sub-arms as a tuple.
715pub struct CtxDagArmFork<C, In, ForkOut, Chain, Arms> {
716    chain: Chain,
717    arms: Arms,
718    _marker: PhantomData<fn(&mut C, *const In) -> ForkOut>,
719}
720
721// =============================================================================
722// CtxDag — built context-aware DAG
723// =============================================================================
724
725/// Built context-aware DAG.
726///
727/// Created by [`CtxDagChain::build`]. Implements [`CtxStepCall`]
728/// for use inside [`Callback`](crate::Callback) dispatch.
729pub struct CtxDag<C, In, Chain> {
730    chain: Chain,
731    _marker: PhantomData<fn(&mut C, In)>,
732}
733
734impl<C, In, Chain: CtxChainCall<C, In, Out = ()>> CtxStepCall<C, In> for CtxDag<C, In, Chain> {
735    type Out = ();
736    #[inline(always)]
737    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) {
738        self.chain.call(ctx, world, input);
739    }
740}
741
742impl<C, In, Chain: CtxChainCall<C, In, Out = ()>> CtxDag<C, In, Chain> {
743    /// Run the DAG with context, world, and input.
744    pub fn run(&mut self, ctx: &mut C, world: &mut World, input: In) {
745        self.chain.call(ctx, world, input);
746    }
747}
748
749// =============================================================================
750// Merge / Join named nodes — context-aware fork terminal nodes
751// =============================================================================
752
753/// Merge two context-aware fork arms into a single output.
754#[doc(hidden)]
755pub struct CtxMergeNode2<Chain, C0, C1, MS, ForkOut, A0, A1, MOut> {
756    chain: Chain,
757    arm0: C0,
758    arm1: C1,
759    merge: MS,
760    _marker: PhantomData<fn(ForkOut) -> (A0, A1, MOut)>,
761}
762
763impl<Ctx, In, Chain, C0, C1, MS, ForkOut, A0, A1, MOut> CtxChainCall<Ctx, In>
764    for CtxMergeNode2<Chain, C0, C1, MS, ForkOut, A0, A1, MOut>
765where
766    ForkOut: 'static,
767    A0: 'static,
768    A1: 'static,
769    Chain: CtxChainCall<Ctx, In, Out = ForkOut>,
770    C0: for<'a> CtxChainCall<Ctx, &'a ForkOut, Out = A0>,
771    C1: for<'a> CtxChainCall<Ctx, &'a ForkOut, Out = A1>,
772    MS: for<'x> CtxMergeStepCall<Ctx, (&'x A0, &'x A1), Out = MOut>,
773{
774    type Out = MOut;
775
776    #[inline(always)]
777    fn call(&mut self, ctx: &mut Ctx, world: &mut World, input: In) -> MOut {
778        let fork_out = self.chain.call(ctx, world, input);
779        let o0 = self.arm0.call(ctx, world, &fork_out);
780        let o1 = self.arm1.call(ctx, world, &fork_out);
781        self.merge.call(ctx, world, (&o0, &o1))
782    }
783}
784
785/// Merge three context-aware fork arms into a single output.
786#[doc(hidden)]
787pub struct CtxMergeNode3<Chain, C0, C1, C2, MS, ForkOut, A0, A1, A2, MOut> {
788    chain: Chain,
789    arm0: C0,
790    arm1: C1,
791    arm2: C2,
792    merge: MS,
793    _marker: PhantomData<fn(ForkOut) -> (A0, A1, A2, MOut)>,
794}
795
796impl<Ctx, In, Chain, C0, C1, C2, MS, ForkOut, A0, A1, A2, MOut> CtxChainCall<Ctx, In>
797    for CtxMergeNode3<Chain, C0, C1, C2, MS, ForkOut, A0, A1, A2, MOut>
798where
799    ForkOut: 'static,
800    A0: 'static,
801    A1: 'static,
802    A2: 'static,
803    Chain: CtxChainCall<Ctx, In, Out = ForkOut>,
804    C0: for<'a> CtxChainCall<Ctx, &'a ForkOut, Out = A0>,
805    C1: for<'a> CtxChainCall<Ctx, &'a ForkOut, Out = A1>,
806    C2: for<'a> CtxChainCall<Ctx, &'a ForkOut, Out = A2>,
807    MS: for<'x> CtxMergeStepCall<Ctx, (&'x A0, &'x A1, &'x A2), Out = MOut>,
808{
809    type Out = MOut;
810
811    #[inline(always)]
812    fn call(&mut self, ctx: &mut Ctx, world: &mut World, input: In) -> MOut {
813        let fork_out = self.chain.call(ctx, world, input);
814        let o0 = self.arm0.call(ctx, world, &fork_out);
815        let o1 = self.arm1.call(ctx, world, &fork_out);
816        let o2 = self.arm2.call(ctx, world, &fork_out);
817        self.merge.call(ctx, world, (&o0, &o1, &o2))
818    }
819}
820
821/// Merge four context-aware fork arms into a single output.
822#[doc(hidden)]
823pub struct CtxMergeNode4<Chain, C0, C1, C2, C3, MS, ForkOut, A0, A1, A2, A3, MOut> {
824    chain: Chain,
825    arm0: C0,
826    arm1: C1,
827    arm2: C2,
828    arm3: C3,
829    merge: MS,
830    _marker: PhantomData<fn(ForkOut) -> (A0, A1, A2, A3, MOut)>,
831}
832
833#[allow(clippy::many_single_char_names)]
834impl<Ctx, In, Chain, C0, C1, C2, C3, MS, ForkOut, A0, A1, A2, A3, MOut> CtxChainCall<Ctx, In>
835    for CtxMergeNode4<Chain, C0, C1, C2, C3, MS, ForkOut, A0, A1, A2, A3, MOut>
836where
837    ForkOut: 'static,
838    A0: 'static,
839    A1: 'static,
840    A2: 'static,
841    A3: 'static,
842    Chain: CtxChainCall<Ctx, In, Out = ForkOut>,
843    C0: for<'a> CtxChainCall<Ctx, &'a ForkOut, Out = A0>,
844    C1: for<'a> CtxChainCall<Ctx, &'a ForkOut, Out = A1>,
845    C2: for<'a> CtxChainCall<Ctx, &'a ForkOut, Out = A2>,
846    C3: for<'a> CtxChainCall<Ctx, &'a ForkOut, Out = A3>,
847    MS: for<'x> CtxMergeStepCall<Ctx, (&'x A0, &'x A1, &'x A2, &'x A3), Out = MOut>,
848{
849    type Out = MOut;
850
851    #[inline(always)]
852    fn call(&mut self, ctx: &mut Ctx, world: &mut World, input: In) -> MOut {
853        let fork_out = self.chain.call(ctx, world, input);
854        let o0 = self.arm0.call(ctx, world, &fork_out);
855        let o1 = self.arm1.call(ctx, world, &fork_out);
856        let o2 = self.arm2.call(ctx, world, &fork_out);
857        let o3 = self.arm3.call(ctx, world, &fork_out);
858        self.merge.call(ctx, world, (&o0, &o1, &o2, &o3))
859    }
860}
861
862/// Join two context-aware fork arms (all producing `()`).
863#[doc(hidden)]
864pub struct CtxJoinNode2<Chain, C0, C1, ForkOut> {
865    chain: Chain,
866    arm0: C0,
867    arm1: C1,
868    _marker: PhantomData<fn() -> ForkOut>,
869}
870
871impl<Ctx, In, Chain, C0, C1, ForkOut> CtxChainCall<Ctx, In> for CtxJoinNode2<Chain, C0, C1, ForkOut>
872where
873    ForkOut: 'static,
874    Chain: CtxChainCall<Ctx, In, Out = ForkOut>,
875    C0: for<'a> CtxChainCall<Ctx, &'a ForkOut, Out = ()>,
876    C1: for<'a> CtxChainCall<Ctx, &'a ForkOut, Out = ()>,
877{
878    type Out = ();
879
880    #[inline(always)]
881    fn call(&mut self, ctx: &mut Ctx, world: &mut World, input: In) {
882        let fork_out = self.chain.call(ctx, world, input);
883        self.arm0.call(ctx, world, &fork_out);
884        self.arm1.call(ctx, world, &fork_out);
885    }
886}
887
888/// Join three context-aware fork arms (all producing `()`).
889#[doc(hidden)]
890pub struct CtxJoinNode3<Chain, C0, C1, C2, ForkOut> {
891    chain: Chain,
892    arm0: C0,
893    arm1: C1,
894    arm2: C2,
895    _marker: PhantomData<fn() -> ForkOut>,
896}
897
898impl<Ctx, In, Chain, C0, C1, C2, ForkOut> CtxChainCall<Ctx, In>
899    for CtxJoinNode3<Chain, C0, C1, C2, ForkOut>
900where
901    ForkOut: 'static,
902    Chain: CtxChainCall<Ctx, In, Out = ForkOut>,
903    C0: for<'a> CtxChainCall<Ctx, &'a ForkOut, Out = ()>,
904    C1: for<'a> CtxChainCall<Ctx, &'a ForkOut, Out = ()>,
905    C2: for<'a> CtxChainCall<Ctx, &'a ForkOut, Out = ()>,
906{
907    type Out = ();
908
909    #[inline(always)]
910    fn call(&mut self, ctx: &mut Ctx, world: &mut World, input: In) {
911        let fork_out = self.chain.call(ctx, world, input);
912        self.arm0.call(ctx, world, &fork_out);
913        self.arm1.call(ctx, world, &fork_out);
914        self.arm2.call(ctx, world, &fork_out);
915    }
916}
917
918/// Join four context-aware fork arms (all producing `()`).
919#[doc(hidden)]
920pub struct CtxJoinNode4<Chain, C0, C1, C2, C3, ForkOut> {
921    chain: Chain,
922    arm0: C0,
923    arm1: C1,
924    arm2: C2,
925    arm3: C3,
926    _marker: PhantomData<fn() -> ForkOut>,
927}
928
929#[allow(clippy::many_single_char_names)]
930impl<Ctx, In, Chain, C0, C1, C2, C3, ForkOut> CtxChainCall<Ctx, In>
931    for CtxJoinNode4<Chain, C0, C1, C2, C3, ForkOut>
932where
933    ForkOut: 'static,
934    Chain: CtxChainCall<Ctx, In, Out = ForkOut>,
935    C0: for<'a> CtxChainCall<Ctx, &'a ForkOut, Out = ()>,
936    C1: for<'a> CtxChainCall<Ctx, &'a ForkOut, Out = ()>,
937    C2: for<'a> CtxChainCall<Ctx, &'a ForkOut, Out = ()>,
938    C3: for<'a> CtxChainCall<Ctx, &'a ForkOut, Out = ()>,
939{
940    type Out = ();
941
942    #[inline(always)]
943    fn call(&mut self, ctx: &mut Ctx, world: &mut World, input: In) {
944        let fork_out = self.chain.call(ctx, world, input);
945        self.arm0.call(ctx, world, &fork_out);
946        self.arm1.call(ctx, world, &fork_out);
947        self.arm2.call(ctx, world, &fork_out);
948        self.arm3.call(ctx, world, &fork_out);
949    }
950}
951
952// =============================================================================
953// Combinator macro — shared between CtxDagChain and CtxDagArm
954// =============================================================================
955
956/// Generates step combinators, Option/Result helpers for context-aware DAG.
957macro_rules! impl_ctx_dag_combinators {
958    (builder: $Builder:ident, upstream: $U:ident) => {
959        // =============================================================
960        // Core — any Out
961        // =============================================================
962
963        impl<Ctx, $U, Out: 'static, Chain> $Builder<Ctx, $U, Out, Chain> {
964            /// Append a step. The step receives `&Out` by reference.
965            pub fn then<NewOut, Params, S>(
966                self,
967                f: S,
968                registry: &Registry,
969            ) -> $Builder<Ctx, $U, NewOut, CtxDagThenNode<Chain, S::Step, NewOut>>
970            where
971                NewOut: 'static,
972                S: IntoCtxStep<Ctx, &'static Out, NewOut, Params>,
973                S::Step: for<'a> CtxStepCall<Ctx, &'a Out, Out = NewOut> + 'static,
974            {
975                $Builder {
976                    chain: CtxDagThenNode {
977                        prev: self.chain,
978                        step: f.into_ctx_step(registry),
979                        _out: PhantomData,
980                    },
981                    _marker: PhantomData,
982                }
983            }
984
985            /// Conditionally wrap the output in `Option`.
986            pub fn guard<Params, S: IntoCtxRefStep<Ctx, Out, bool, Params>>(
987                self,
988                f: S,
989                registry: &Registry,
990            ) -> $Builder<Ctx, $U, Option<Out>, CtxGuardNode<Chain, S::Step>> {
991                $Builder {
992                    chain: CtxGuardNode {
993                        prev: self.chain,
994                        step: f.into_ctx_ref_step(registry),
995                    },
996                    _marker: PhantomData,
997                }
998            }
999
1000            /// Observe the current value without consuming or changing it.
1001            pub fn tap<Params, S: IntoCtxRefStep<Ctx, Out, (), Params>>(
1002                self,
1003                f: S,
1004                registry: &Registry,
1005            ) -> $Builder<Ctx, $U, Out, CtxTapNode<Chain, S::Step>> {
1006                $Builder {
1007                    chain: CtxTapNode {
1008                        prev: self.chain,
1009                        step: f.into_ctx_ref_step(registry),
1010                    },
1011                    _marker: PhantomData,
1012                }
1013            }
1014        }
1015
1016        // =============================================================
1017        // Option helpers
1018        // =============================================================
1019
1020        impl<Ctx, $U, T: 'static, Chain> $Builder<Ctx, $U, Option<T>, Chain> {
1021            /// Transform the inner value. Step not called on None.
1022            pub fn map<U, Params, S>(
1023                self,
1024                f: S,
1025                registry: &Registry,
1026            ) -> $Builder<Ctx, $U, Option<U>, CtxDagMapOptionNode<Chain, S::Step, U>>
1027            where
1028                U: 'static,
1029                S: IntoCtxStep<Ctx, &'static T, U, Params>,
1030                S::Step: for<'x> CtxStepCall<Ctx, &'x T, Out = U>,
1031            {
1032                $Builder {
1033                    chain: CtxDagMapOptionNode {
1034                        prev: self.chain,
1035                        step: f.into_ctx_step(registry),
1036                        _out: PhantomData,
1037                    },
1038                    _marker: PhantomData,
1039                }
1040            }
1041
1042            /// Short-circuits on None. std: `Option::and_then`
1043            pub fn and_then<U, Params, S>(
1044                self,
1045                f: S,
1046                registry: &Registry,
1047            ) -> $Builder<Ctx, $U, Option<U>, CtxDagAndThenOptionNode<Chain, S::Step, U>>
1048            where
1049                U: 'static,
1050                S: IntoCtxStep<Ctx, &'static T, Option<U>, Params>,
1051                S::Step: for<'x> CtxStepCall<Ctx, &'x T, Out = Option<U>>,
1052            {
1053                $Builder {
1054                    chain: CtxDagAndThenOptionNode {
1055                        prev: self.chain,
1056                        step: f.into_ctx_step(registry),
1057                        _out: PhantomData,
1058                    },
1059                    _marker: PhantomData,
1060                }
1061            }
1062
1063            /// Side effect on None.
1064            pub fn on_none<Params, S: IntoCtxProducer<Ctx, (), Params>>(
1065                self,
1066                f: S,
1067                registry: &Registry,
1068            ) -> $Builder<Ctx, $U, Option<T>, CtxOnNoneNode<Chain, S::Step>> {
1069                $Builder {
1070                    chain: CtxOnNoneNode {
1071                        prev: self.chain,
1072                        producer: f.into_ctx_producer(registry),
1073                    },
1074                    _marker: PhantomData,
1075                }
1076            }
1077
1078            /// Keep value if predicate holds. std: `Option::filter`
1079            pub fn filter<Params, S: IntoCtxRefStep<Ctx, T, bool, Params>>(
1080                self,
1081                f: S,
1082                registry: &Registry,
1083            ) -> $Builder<Ctx, $U, Option<T>, CtxFilterNode<Chain, S::Step>> {
1084                $Builder {
1085                    chain: CtxFilterNode {
1086                        prev: self.chain,
1087                        step: f.into_ctx_ref_step(registry),
1088                    },
1089                    _marker: PhantomData,
1090                }
1091            }
1092
1093            /// Side effect on Some value. std: `Option::inspect`
1094            pub fn inspect<Params, S: IntoCtxRefStep<Ctx, T, (), Params>>(
1095                self,
1096                f: S,
1097                registry: &Registry,
1098            ) -> $Builder<Ctx, $U, Option<T>, CtxInspectOptionNode<Chain, S::Step>> {
1099                $Builder {
1100                    chain: CtxInspectOptionNode {
1101                        prev: self.chain,
1102                        step: f.into_ctx_ref_step(registry),
1103                    },
1104                    _marker: PhantomData,
1105                }
1106            }
1107
1108            /// None becomes Err(err). std: `Option::ok_or`
1109            pub fn ok_or<E: Clone>(
1110                self,
1111                err: E,
1112            ) -> $Builder<Ctx, $U, Result<T, E>, CtxOkOrNode<Chain, E>> {
1113                $Builder {
1114                    chain: CtxOkOrNode {
1115                        prev: self.chain,
1116                        err,
1117                    },
1118                    _marker: PhantomData,
1119                }
1120            }
1121
1122            /// Exit Option — None becomes the default value.
1123            pub fn unwrap_or(
1124                self,
1125                default: T,
1126            ) -> $Builder<Ctx, $U, T, CtxUnwrapOrOptionNode<Chain, T>>
1127            where
1128                T: Clone,
1129            {
1130                $Builder {
1131                    chain: CtxUnwrapOrOptionNode {
1132                        prev: self.chain,
1133                        default,
1134                    },
1135                    _marker: PhantomData,
1136                }
1137            }
1138
1139            /// Exit Option — None becomes `f()`.
1140            pub fn unwrap_or_else<Params, S: IntoCtxProducer<Ctx, T, Params>>(
1141                self,
1142                f: S,
1143                registry: &Registry,
1144            ) -> $Builder<Ctx, $U, T, CtxUnwrapOrElseOptionNode<Chain, S::Step>> {
1145                $Builder {
1146                    chain: CtxUnwrapOrElseOptionNode {
1147                        prev: self.chain,
1148                        producer: f.into_ctx_producer(registry),
1149                    },
1150                    _marker: PhantomData,
1151                }
1152            }
1153        }
1154
1155        // =============================================================
1156        // Result helpers
1157        // =============================================================
1158
1159        impl<Ctx, $U, T: 'static, Err: 'static, Chain> $Builder<Ctx, $U, Result<T, Err>, Chain> {
1160            /// Transform the Ok value. Step not called on Err.
1161            pub fn map<U, Params, S>(
1162                self,
1163                f: S,
1164                registry: &Registry,
1165            ) -> $Builder<Ctx, $U, Result<U, Err>, CtxDagMapResultNode<Chain, S::Step, U>>
1166            where
1167                U: 'static,
1168                S: IntoCtxStep<Ctx, &'static T, U, Params>,
1169                S::Step: for<'x> CtxStepCall<Ctx, &'x T, Out = U>,
1170            {
1171                $Builder {
1172                    chain: CtxDagMapResultNode {
1173                        prev: self.chain,
1174                        step: f.into_ctx_step(registry),
1175                        _out: PhantomData,
1176                    },
1177                    _marker: PhantomData,
1178                }
1179            }
1180
1181            /// Short-circuits on Err. std: `Result::and_then`
1182            pub fn and_then<U, Params, S>(
1183                self,
1184                f: S,
1185                registry: &Registry,
1186            ) -> $Builder<Ctx, $U, Result<U, Err>, CtxDagAndThenResultNode<Chain, S::Step, U>>
1187            where
1188                U: 'static,
1189                S: IntoCtxStep<Ctx, &'static T, Result<U, Err>, Params>,
1190                S::Step: for<'x> CtxStepCall<Ctx, &'x T, Out = Result<U, Err>>,
1191            {
1192                $Builder {
1193                    chain: CtxDagAndThenResultNode {
1194                        prev: self.chain,
1195                        step: f.into_ctx_step(registry),
1196                        _out: PhantomData,
1197                    },
1198                    _marker: PhantomData,
1199                }
1200            }
1201
1202            /// Handle error and transition to Option.
1203            pub fn catch<Params, S>(
1204                self,
1205                f: S,
1206                registry: &Registry,
1207            ) -> $Builder<Ctx, $U, Option<T>, CtxDagCatchNode<Chain, S::Step>>
1208            where
1209                S: IntoCtxStep<Ctx, &'static Err, (), Params>,
1210                S::Step: for<'x> CtxStepCall<Ctx, &'x Err, Out = ()>,
1211            {
1212                $Builder {
1213                    chain: CtxDagCatchNode {
1214                        prev: self.chain,
1215                        step: f.into_ctx_step(registry),
1216                    },
1217                    _marker: PhantomData,
1218                }
1219            }
1220
1221            /// Transform the error. std: `Result::map_err`
1222            pub fn map_err<E2, Params, S: IntoCtxStep<Ctx, Err, E2, Params>>(
1223                self,
1224                f: S,
1225                registry: &Registry,
1226            ) -> $Builder<Ctx, $U, Result<T, E2>, CtxMapErrNode<Chain, S::Step>> {
1227                $Builder {
1228                    chain: CtxMapErrNode {
1229                        prev: self.chain,
1230                        step: f.into_ctx_step(registry),
1231                    },
1232                    _marker: PhantomData,
1233                }
1234            }
1235
1236            /// Side effect on Ok value. std: `Result::inspect`
1237            pub fn inspect<Params, S: IntoCtxRefStep<Ctx, T, (), Params>>(
1238                self,
1239                f: S,
1240                registry: &Registry,
1241            ) -> $Builder<Ctx, $U, Result<T, Err>, CtxInspectResultNode<Chain, S::Step>> {
1242                $Builder {
1243                    chain: CtxInspectResultNode {
1244                        prev: self.chain,
1245                        step: f.into_ctx_ref_step(registry),
1246                    },
1247                    _marker: PhantomData,
1248                }
1249            }
1250
1251            /// Side effect on Err. std: `Result::inspect_err`
1252            pub fn inspect_err<Params, S: IntoCtxRefStep<Ctx, Err, (), Params>>(
1253                self,
1254                f: S,
1255                registry: &Registry,
1256            ) -> $Builder<Ctx, $U, Result<T, Err>, CtxInspectErrNode<Chain, S::Step>> {
1257                $Builder {
1258                    chain: CtxInspectErrNode {
1259                        prev: self.chain,
1260                        step: f.into_ctx_ref_step(registry),
1261                    },
1262                    _marker: PhantomData,
1263                }
1264            }
1265
1266            /// Discard error, enter Option land. std: `Result::ok`
1267            pub fn ok(self) -> $Builder<Ctx, $U, Option<T>, CtxOkNode<Chain>> {
1268                $Builder {
1269                    chain: CtxOkNode { prev: self.chain },
1270                    _marker: PhantomData,
1271                }
1272            }
1273
1274            /// Exit Result — Err becomes the default value.
1275            pub fn unwrap_or(
1276                self,
1277                default: T,
1278            ) -> $Builder<Ctx, $U, T, CtxUnwrapOrResultNode<Chain, T>>
1279            where
1280                T: Clone,
1281            {
1282                $Builder {
1283                    chain: CtxUnwrapOrResultNode {
1284                        prev: self.chain,
1285                        default,
1286                    },
1287                    _marker: PhantomData,
1288                }
1289            }
1290        }
1291    };
1292}
1293
1294impl_ctx_dag_combinators!(builder: CtxDagChain, upstream: In);
1295impl_ctx_dag_combinators!(builder: CtxDagArm, upstream: In);
1296
1297// =============================================================================
1298// Fork arity macro — arm accumulation, merge, join
1299// =============================================================================
1300
1301/// Generates arm accumulation, merge, and join for a context-aware fork type.
1302macro_rules! impl_ctx_dag_fork {
1303    (
1304        fork: $Fork:ident,
1305        output: $Output:ident,
1306        upstream: $U:ident
1307    ) => {
1308        // =============================================================
1309        // Arm accumulation: 0->1, 1->2, 2->3, 3->4
1310        // =============================================================
1311
1312        impl<Ctx, $U, ForkOut, Chain> $Fork<Ctx, $U, ForkOut, Chain, ()> {
1313            /// Add the first arm to this fork.
1314            pub fn arm<AOut, ACh>(
1315                self,
1316                f: impl FnOnce(CtxDagArmSeed<Ctx, ForkOut>) -> CtxDagArm<Ctx, ForkOut, AOut, ACh>,
1317            ) -> $Fork<Ctx, $U, ForkOut, Chain, (CtxDagArm<Ctx, ForkOut, AOut, ACh>,)> {
1318                let arm = f(CtxDagArmSeed(PhantomData));
1319                $Fork {
1320                    chain: self.chain,
1321                    arms: (arm,),
1322                    _marker: PhantomData,
1323                }
1324            }
1325        }
1326
1327        impl<Ctx, $U, ForkOut, Chain, A0, C0>
1328            $Fork<Ctx, $U, ForkOut, Chain, (CtxDagArm<Ctx, ForkOut, A0, C0>,)>
1329        {
1330            /// Add a second arm to this fork.
1331            pub fn arm<AOut, ACh>(
1332                self,
1333                f: impl FnOnce(CtxDagArmSeed<Ctx, ForkOut>) -> CtxDagArm<Ctx, ForkOut, AOut, ACh>,
1334            ) -> $Fork<
1335                Ctx,
1336                $U,
1337                ForkOut,
1338                Chain,
1339                (
1340                    CtxDagArm<Ctx, ForkOut, A0, C0>,
1341                    CtxDagArm<Ctx, ForkOut, AOut, ACh>,
1342                ),
1343            > {
1344                let arm = f(CtxDagArmSeed(PhantomData));
1345                let (a0,) = self.arms;
1346                $Fork {
1347                    chain: self.chain,
1348                    arms: (a0, arm),
1349                    _marker: PhantomData,
1350                }
1351            }
1352        }
1353
1354        impl<Ctx, $U, ForkOut, Chain, A0, C0, A1, C1>
1355            $Fork<
1356                Ctx,
1357                $U,
1358                ForkOut,
1359                Chain,
1360                (
1361                    CtxDagArm<Ctx, ForkOut, A0, C0>,
1362                    CtxDagArm<Ctx, ForkOut, A1, C1>,
1363                ),
1364            >
1365        {
1366            /// Add a third arm to this fork.
1367            pub fn arm<AOut, ACh>(
1368                self,
1369                f: impl FnOnce(CtxDagArmSeed<Ctx, ForkOut>) -> CtxDagArm<Ctx, ForkOut, AOut, ACh>,
1370            ) -> $Fork<
1371                Ctx,
1372                $U,
1373                ForkOut,
1374                Chain,
1375                (
1376                    CtxDagArm<Ctx, ForkOut, A0, C0>,
1377                    CtxDagArm<Ctx, ForkOut, A1, C1>,
1378                    CtxDagArm<Ctx, ForkOut, AOut, ACh>,
1379                ),
1380            > {
1381                let arm = f(CtxDagArmSeed(PhantomData));
1382                let (a0, a1) = self.arms;
1383                $Fork {
1384                    chain: self.chain,
1385                    arms: (a0, a1, arm),
1386                    _marker: PhantomData,
1387                }
1388            }
1389        }
1390
1391        impl<Ctx, $U, ForkOut, Chain, A0, C0, A1, C1, A2, C2>
1392            $Fork<
1393                Ctx,
1394                $U,
1395                ForkOut,
1396                Chain,
1397                (
1398                    CtxDagArm<Ctx, ForkOut, A0, C0>,
1399                    CtxDagArm<Ctx, ForkOut, A1, C1>,
1400                    CtxDagArm<Ctx, ForkOut, A2, C2>,
1401                ),
1402            >
1403        {
1404            /// Add a fourth arm to this fork.
1405            pub fn arm<AOut, ACh>(
1406                self,
1407                f: impl FnOnce(CtxDagArmSeed<Ctx, ForkOut>) -> CtxDagArm<Ctx, ForkOut, AOut, ACh>,
1408            ) -> $Fork<
1409                Ctx,
1410                $U,
1411                ForkOut,
1412                Chain,
1413                (
1414                    CtxDagArm<Ctx, ForkOut, A0, C0>,
1415                    CtxDagArm<Ctx, ForkOut, A1, C1>,
1416                    CtxDagArm<Ctx, ForkOut, A2, C2>,
1417                    CtxDagArm<Ctx, ForkOut, AOut, ACh>,
1418                ),
1419            > {
1420                let arm = f(CtxDagArmSeed(PhantomData));
1421                let (a0, a1, a2) = self.arms;
1422                $Fork {
1423                    chain: self.chain,
1424                    arms: (a0, a1, a2, arm),
1425                    _marker: PhantomData,
1426                }
1427            }
1428        }
1429
1430        // =============================================================
1431        // Merge arity 2
1432        // =============================================================
1433
1434        impl<Ctx, $U, ForkOut: 'static, Chain, A0: 'static, C0, A1: 'static, C1>
1435            $Fork<
1436                Ctx,
1437                $U,
1438                ForkOut,
1439                Chain,
1440                (
1441                    CtxDagArm<Ctx, ForkOut, A0, C0>,
1442                    CtxDagArm<Ctx, ForkOut, A1, C1>,
1443                ),
1444            >
1445        {
1446            /// Merge two arms with a context-aware merge step.
1447            pub fn merge<MOut, Params, S>(
1448                self,
1449                f: S,
1450                registry: &Registry,
1451            ) -> $Output<
1452                Ctx,
1453                $U,
1454                MOut,
1455                CtxMergeNode2<Chain, C0, C1, S::Step, ForkOut, A0, A1, MOut>,
1456            >
1457            where
1458                MOut: 'static,
1459                S: IntoCtxMergeStep<Ctx, (&'static A0, &'static A1), MOut, Params>,
1460                S::Step: for<'x> CtxMergeStepCall<Ctx, (&'x A0, &'x A1), Out = MOut>,
1461            {
1462                let (a0, a1) = self.arms;
1463                $Output {
1464                    chain: CtxMergeNode2 {
1465                        chain: self.chain,
1466                        arm0: a0.chain,
1467                        arm1: a1.chain,
1468                        merge: f.into_ctx_merge_step(registry),
1469                        _marker: PhantomData,
1470                    },
1471                    _marker: PhantomData,
1472                }
1473            }
1474        }
1475
1476        impl<Ctx, $U, ForkOut: 'static, Chain, C0, C1>
1477            $Fork<
1478                Ctx,
1479                $U,
1480                ForkOut,
1481                Chain,
1482                (
1483                    CtxDagArm<Ctx, ForkOut, (), C0>,
1484                    CtxDagArm<Ctx, ForkOut, (), C1>,
1485                ),
1486            >
1487        {
1488            /// Join two sink arms (all producing `()`).
1489            pub fn join(
1490                self,
1491            ) -> $Output<Ctx, $U, (), CtxJoinNode2<Chain, C0, C1, ForkOut>> {
1492                let (a0, a1) = self.arms;
1493                $Output {
1494                    chain: CtxJoinNode2 {
1495                        chain: self.chain,
1496                        arm0: a0.chain,
1497                        arm1: a1.chain,
1498                        _marker: PhantomData,
1499                    },
1500                    _marker: PhantomData,
1501                }
1502            }
1503        }
1504
1505        // =============================================================
1506        // Merge arity 3
1507        // =============================================================
1508
1509        impl<
1510            Ctx,
1511            $U,
1512            ForkOut: 'static,
1513            Chain,
1514            A0: 'static,
1515            C0,
1516            A1: 'static,
1517            C1,
1518            A2: 'static,
1519            C2,
1520        >
1521            $Fork<
1522                Ctx,
1523                $U,
1524                ForkOut,
1525                Chain,
1526                (
1527                    CtxDagArm<Ctx, ForkOut, A0, C0>,
1528                    CtxDagArm<Ctx, ForkOut, A1, C1>,
1529                    CtxDagArm<Ctx, ForkOut, A2, C2>,
1530                ),
1531            >
1532        {
1533            /// Merge three arms with a context-aware merge step.
1534            pub fn merge<MOut, Params, S>(
1535                self,
1536                f: S,
1537                registry: &Registry,
1538            ) -> $Output<
1539                Ctx,
1540                $U,
1541                MOut,
1542                CtxMergeNode3<Chain, C0, C1, C2, S::Step, ForkOut, A0, A1, A2, MOut>,
1543            >
1544            where
1545                MOut: 'static,
1546                S: IntoCtxMergeStep<
1547                    Ctx,
1548                    (&'static A0, &'static A1, &'static A2),
1549                    MOut,
1550                    Params,
1551                >,
1552                S::Step:
1553                    for<'x> CtxMergeStepCall<Ctx, (&'x A0, &'x A1, &'x A2), Out = MOut>,
1554            {
1555                let (a0, a1, a2) = self.arms;
1556                $Output {
1557                    chain: CtxMergeNode3 {
1558                        chain: self.chain,
1559                        arm0: a0.chain,
1560                        arm1: a1.chain,
1561                        arm2: a2.chain,
1562                        merge: f.into_ctx_merge_step(registry),
1563                        _marker: PhantomData,
1564                    },
1565                    _marker: PhantomData,
1566                }
1567            }
1568        }
1569
1570        impl<Ctx, $U, ForkOut: 'static, Chain, C0, C1, C2>
1571            $Fork<
1572                Ctx,
1573                $U,
1574                ForkOut,
1575                Chain,
1576                (
1577                    CtxDagArm<Ctx, ForkOut, (), C0>,
1578                    CtxDagArm<Ctx, ForkOut, (), C1>,
1579                    CtxDagArm<Ctx, ForkOut, (), C2>,
1580                ),
1581            >
1582        {
1583            /// Join three sink arms (all producing `()`).
1584            pub fn join(
1585                self,
1586            ) -> $Output<Ctx, $U, (), CtxJoinNode3<Chain, C0, C1, C2, ForkOut>> {
1587                let (a0, a1, a2) = self.arms;
1588                $Output {
1589                    chain: CtxJoinNode3 {
1590                        chain: self.chain,
1591                        arm0: a0.chain,
1592                        arm1: a1.chain,
1593                        arm2: a2.chain,
1594                        _marker: PhantomData,
1595                    },
1596                    _marker: PhantomData,
1597                }
1598            }
1599        }
1600
1601        // =============================================================
1602        // Merge arity 4
1603        // =============================================================
1604
1605        #[allow(clippy::many_single_char_names)]
1606        impl<
1607            Ctx,
1608            $U,
1609            ForkOut: 'static,
1610            Chain,
1611            A0: 'static,
1612            C0,
1613            A1: 'static,
1614            C1,
1615            A2: 'static,
1616            C2,
1617            A3: 'static,
1618            C3,
1619        >
1620            $Fork<
1621                Ctx,
1622                $U,
1623                ForkOut,
1624                Chain,
1625                (
1626                    CtxDagArm<Ctx, ForkOut, A0, C0>,
1627                    CtxDagArm<Ctx, ForkOut, A1, C1>,
1628                    CtxDagArm<Ctx, ForkOut, A2, C2>,
1629                    CtxDagArm<Ctx, ForkOut, A3, C3>,
1630                ),
1631            >
1632        {
1633            /// Merge four arms with a context-aware merge step.
1634            pub fn merge<MOut, Params, S>(
1635                self,
1636                f: S,
1637                registry: &Registry,
1638            ) -> $Output<
1639                Ctx,
1640                $U,
1641                MOut,
1642                CtxMergeNode4<Chain, C0, C1, C2, C3, S::Step, ForkOut, A0, A1, A2, A3, MOut>,
1643            >
1644            where
1645                MOut: 'static,
1646                S: IntoCtxMergeStep<
1647                    Ctx,
1648                    (&'static A0, &'static A1, &'static A2, &'static A3),
1649                    MOut,
1650                    Params,
1651                >,
1652                S::Step: for<'x> CtxMergeStepCall<
1653                    Ctx,
1654                    (&'x A0, &'x A1, &'x A2, &'x A3),
1655                    Out = MOut,
1656                >,
1657            {
1658                let (a0, a1, a2, a3) = self.arms;
1659                $Output {
1660                    chain: CtxMergeNode4 {
1661                        chain: self.chain,
1662                        arm0: a0.chain,
1663                        arm1: a1.chain,
1664                        arm2: a2.chain,
1665                        arm3: a3.chain,
1666                        merge: f.into_ctx_merge_step(registry),
1667                        _marker: PhantomData,
1668                    },
1669                    _marker: PhantomData,
1670                }
1671            }
1672        }
1673
1674        impl<Ctx, $U, ForkOut: 'static, Chain, C0, C1, C2, C3>
1675            $Fork<
1676                Ctx,
1677                $U,
1678                ForkOut,
1679                Chain,
1680                (
1681                    CtxDagArm<Ctx, ForkOut, (), C0>,
1682                    CtxDagArm<Ctx, ForkOut, (), C1>,
1683                    CtxDagArm<Ctx, ForkOut, (), C2>,
1684                    CtxDagArm<Ctx, ForkOut, (), C3>,
1685                ),
1686            >
1687        {
1688            /// Join four sink arms (all producing `()`).
1689            pub fn join(
1690                self,
1691            ) -> $Output<Ctx, $U, (), CtxJoinNode4<Chain, C0, C1, C2, C3, ForkOut>> {
1692                let (a0, a1, a2, a3) = self.arms;
1693                $Output {
1694                    chain: CtxJoinNode4 {
1695                        chain: self.chain,
1696                        arm0: a0.chain,
1697                        arm1: a1.chain,
1698                        arm2: a2.chain,
1699                        arm3: a3.chain,
1700                        _marker: PhantomData,
1701                    },
1702                    _marker: PhantomData,
1703                }
1704            }
1705        }
1706    };
1707}
1708
1709impl_ctx_dag_fork!(fork: CtxDagChainFork, output: CtxDagChain, upstream: In);
1710impl_ctx_dag_fork!(fork: CtxDagArmFork, output: CtxDagArm, upstream: In);
1711
1712// =============================================================================
1713// Tests
1714// =============================================================================
1715
1716#[cfg(test)]
1717mod tests {
1718    use super::*;
1719    use crate::{Res, ResMut, WorldBuilder};
1720
1721    struct TradingCtx {
1722        book_updates: u32,
1723        risk_checks: u32,
1724        submissions: u32,
1725    }
1726
1727    // -- Fork/merge basic test ------------------------------------------------
1728
1729    #[test]
1730    fn ctx_dag_fork_merge_two_arms() {
1731        let mut wb = WorldBuilder::new();
1732        wb.register::<u64>(0);
1733        let mut world = wb.build();
1734        let reg = world.registry();
1735
1736        fn decode(ctx: &mut TradingCtx, raw: u32) -> u64 {
1737            ctx.book_updates += 1;
1738            raw as u64
1739        }
1740
1741        fn arm_a(ctx: &mut TradingCtx, val: &u64) -> u64 {
1742            ctx.book_updates += 1;
1743            *val * 2
1744        }
1745
1746        fn arm_b(ctx: &mut TradingCtx, val: &u64) -> u64 {
1747            ctx.risk_checks += 1;
1748            *val + 10
1749        }
1750
1751        fn merge_fn(ctx: &mut TradingCtx, a: &u64, b: &u64) {
1752            ctx.submissions += 1;
1753            assert_eq!(*a, 10); // 5 * 2
1754            assert_eq!(*b, 15); // 5 + 10
1755        }
1756
1757        let mut dag = CtxDagBuilder::<TradingCtx, u32>::new()
1758            .root(decode, &reg)
1759            .fork()
1760            .arm(|seed| seed.then(arm_a, &reg))
1761            .arm(|seed| seed.then(arm_b, &reg))
1762            .merge(merge_fn, &reg)
1763            .build();
1764
1765        let mut ctx = TradingCtx {
1766            book_updates: 0,
1767            risk_checks: 0,
1768            submissions: 0,
1769        };
1770
1771        dag.run(&mut ctx, &mut world, 5);
1772
1773        assert_eq!(ctx.book_updates, 2); // decode + arm_a
1774        assert_eq!(ctx.risk_checks, 1);
1775        assert_eq!(ctx.submissions, 1);
1776    }
1777
1778    // -- Linear chain test ----------------------------------------------------
1779
1780    #[test]
1781    fn ctx_dag_linear_then() {
1782        let mut wb = WorldBuilder::new();
1783        wb.register::<u64>(0);
1784        let mut world = wb.build();
1785        let reg = world.registry();
1786
1787        fn root(ctx: &mut TradingCtx, x: u32) -> u64 {
1788            ctx.book_updates += 1;
1789            x as u64 * 2
1790        }
1791
1792        fn store(ctx: &mut TradingCtx, mut out: ResMut<u64>, val: &u64) {
1793            ctx.submissions += 1;
1794            *out = *val;
1795        }
1796
1797        let mut dag = CtxDagBuilder::<TradingCtx, u32>::new()
1798            .root(root, &reg)
1799            .then(store, &reg)
1800            .build();
1801
1802        let mut ctx = TradingCtx {
1803            book_updates: 0,
1804            risk_checks: 0,
1805            submissions: 0,
1806        };
1807
1808        dag.run(&mut ctx, &mut world, 5);
1809
1810        assert_eq!(ctx.book_updates, 1);
1811        assert_eq!(ctx.submissions, 1);
1812        assert_eq!(*world.resource::<u64>(), 10);
1813    }
1814
1815    // -- Join test (all arms produce ()) --------------------------------------
1816
1817    #[test]
1818    fn ctx_dag_fork_join() {
1819        let mut world = WorldBuilder::new().build();
1820        let reg = world.registry();
1821
1822        fn root(_ctx: &mut TradingCtx, x: u32) -> u64 {
1823            x as u64
1824        }
1825
1826        fn side_a(ctx: &mut TradingCtx, _val: &u64) {
1827            ctx.book_updates += 1;
1828        }
1829
1830        fn side_b(ctx: &mut TradingCtx, _val: &u64) {
1831            ctx.risk_checks += 1;
1832        }
1833
1834        let mut dag = CtxDagBuilder::<TradingCtx, u32>::new()
1835            .root(root, &reg)
1836            .fork()
1837            .arm(|seed| seed.then(side_a, &reg))
1838            .arm(|seed| seed.then(side_b, &reg))
1839            .join()
1840            .build();
1841
1842        let mut ctx = TradingCtx {
1843            book_updates: 0,
1844            risk_checks: 0,
1845            submissions: 0,
1846        };
1847
1848        dag.run(&mut ctx, &mut world, 42);
1849
1850        assert_eq!(ctx.book_updates, 1);
1851        assert_eq!(ctx.risk_checks, 1);
1852    }
1853
1854    // -- Guard + merge --------------------------------------------------------
1855
1856    #[test]
1857    fn ctx_dag_guard_before_fork() {
1858        let mut world = WorldBuilder::new().build();
1859        let reg = world.registry();
1860
1861        fn root(_ctx: &mut TradingCtx, x: u32) -> u32 {
1862            x
1863        }
1864
1865        let mut dag = CtxDagBuilder::<TradingCtx, u32>::new()
1866            .root(root, &reg)
1867            .guard(|_ctx: &mut TradingCtx, x: &u32| *x > 10, &reg)
1868            .map(
1869                |ctx: &mut TradingCtx, x: &u32| {
1870                    ctx.submissions += 1;
1871                    *x * 2
1872                },
1873                &reg,
1874            )
1875            .unwrap_or(0u32)
1876            .then(
1877                |ctx: &mut TradingCtx, val: &u32| {
1878                    ctx.book_updates = *val;
1879                },
1880                &reg,
1881            )
1882            .build();
1883
1884        let mut ctx = TradingCtx {
1885            book_updates: 0,
1886            risk_checks: 0,
1887            submissions: 0,
1888        };
1889
1890        // x = 5, guard fails
1891        dag.run(&mut ctx, &mut world, 5);
1892        assert_eq!(ctx.book_updates, 0);
1893        assert_eq!(ctx.submissions, 0);
1894
1895        // x = 20, guard passes
1896        dag.run(&mut ctx, &mut world, 20);
1897        assert_eq!(ctx.book_updates, 40);
1898        assert_eq!(ctx.submissions, 1);
1899    }
1900
1901    // -- 3-arm fork test ------------------------------------------------------
1902
1903    #[test]
1904    fn ctx_dag_three_arm_fork_merge() {
1905        let mut wb = WorldBuilder::new();
1906        wb.register::<u64>(0);
1907        let mut world = wb.build();
1908        let reg = world.registry();
1909
1910        fn decode(ctx: &mut TradingCtx, raw: u32) -> u64 {
1911            ctx.book_updates += 1;
1912            raw as u64
1913        }
1914
1915        fn arm_a(ctx: &mut TradingCtx, val: &u64) -> u64 {
1916            ctx.book_updates += 1;
1917            *val * 2
1918        }
1919
1920        fn arm_b(ctx: &mut TradingCtx, val: &u64) -> u64 {
1921            ctx.risk_checks += 1;
1922            *val + 10
1923        }
1924
1925        fn arm_c(ctx: &mut TradingCtx, val: &u64) -> u64 {
1926            ctx.submissions += 1;
1927            *val * 3
1928        }
1929
1930        fn merge3(ctx: &mut TradingCtx, a: &u64, b: &u64, c: &u64) {
1931            ctx.submissions += 1;
1932            assert_eq!(*a, 10); // 5 * 2
1933            assert_eq!(*b, 15); // 5 + 10
1934            assert_eq!(*c, 15); // 5 * 3
1935        }
1936
1937        let mut dag = CtxDagBuilder::<TradingCtx, u32>::new()
1938            .root(decode, &reg)
1939            .fork()
1940            .arm(|seed| seed.then(arm_a, &reg))
1941            .arm(|seed| seed.then(arm_b, &reg))
1942            .arm(|seed| seed.then(arm_c, &reg))
1943            .merge(merge3, &reg)
1944            .build();
1945
1946        let mut ctx = TradingCtx {
1947            book_updates: 0,
1948            risk_checks: 0,
1949            submissions: 0,
1950        };
1951
1952        dag.run(&mut ctx, &mut world, 5);
1953
1954        assert_eq!(ctx.book_updates, 2); // decode + arm_a
1955        assert_eq!(ctx.risk_checks, 1); // arm_b
1956        assert_eq!(ctx.submissions, 2); // arm_c + merge3
1957    }
1958
1959    // -- Merge with Param resolution ------------------------------------------
1960
1961    #[test]
1962    fn ctx_dag_merge_with_param() {
1963        let mut wb = WorldBuilder::new();
1964        wb.register::<u64>(100);
1965        let mut world = wb.build();
1966        let reg = world.registry();
1967
1968        fn decode(ctx: &mut TradingCtx, raw: u32) -> u64 {
1969            ctx.book_updates += 1;
1970            raw as u64
1971        }
1972
1973        fn arm_a(_ctx: &mut TradingCtx, val: &u64) -> u64 {
1974            *val * 2
1975        }
1976
1977        fn arm_b(_ctx: &mut TradingCtx, val: &u64) -> u64 {
1978            *val + 10
1979        }
1980
1981        // Merge step that uses Res<u64> — exercises the unsafe Param::fetch path
1982        fn merge_with_res(ctx: &mut TradingCtx, scale: Res<u64>, a: &u64, b: &u64) {
1983            ctx.submissions += 1;
1984            // scale=100, a=10 (5*2), b=15 (5+10)
1985            assert_eq!(*scale, 100);
1986            assert_eq!(*a + *b, 25);
1987        }
1988
1989        let mut dag = CtxDagBuilder::<TradingCtx, u32>::new()
1990            .root(decode, &reg)
1991            .fork()
1992            .arm(|seed| seed.then(arm_a, &reg))
1993            .arm(|seed| seed.then(arm_b, &reg))
1994            .merge(merge_with_res, &reg)
1995            .build();
1996
1997        let mut ctx = TradingCtx {
1998            book_updates: 0,
1999            risk_checks: 0,
2000            submissions: 0,
2001        };
2002
2003        dag.run(&mut ctx, &mut world, 5);
2004
2005        assert_eq!(ctx.book_updates, 1);
2006        assert_eq!(ctx.submissions, 1);
2007    }
2008}