Skip to main content

nexus_rt/
dag.rs

1// Builder return types use complex generics for compile-time edge validation.
2#![allow(clippy::type_complexity)]
3
4//! DAG pipeline — monomorphized data-flow graphs with fan-out and merge.
5//!
6//! [`DagBuilder`] begins a typed DAG that encodes topology in the type system.
7//! After monomorphization, the entire DAG is a single flat function with
8//! all values as stack locals — no arena, no vtable dispatch. The only
9//! `unsafe` is in the shared [`Param::fetch`](crate::Param) path
10//! (resource access by pre-resolved index).
11//!
12//! Nodes receive their input **by reference** — fan-out is free (multiple
13//! arms borrow the same stack local). Nodes produce owned output values
14//! passed to the next step.
15//!
16//! # When to use
17//!
18//! Use DAG pipelines when data needs to fan out to multiple arms and
19//! merge back. For linear chains, prefer [`PipelineBuilder`](crate::PipelineBuilder).
20//! For dynamic fan-out by reference, use [`FanOut`](crate::FanOut) or
21//! [`Broadcast`](crate::Broadcast).
22//!
23//! # Flow control
24//!
25//! Option and Result combinators (`.guard()`, `.map()`, `.and_then()`,
26//! `.filter()`, `.catch()`, etc.) work on both the main chain and
27//! within arms.
28//!
29//! **Within an arm**, `None` / `Err` short-circuits the remaining steps
30//! in **that arm only**. Sibling arms execute unconditionally. The merge
31//! step receives whatever each arm produced (including `None`).
32//!
33//! `.tap()` observes the value mid-chain without consuming or changing it.
34//!
35//! `.route()` is binary conditional routing — evaluates a predicate and
36//! executes exactly one of two arms. Both arms must produce the same
37//! output type. For N-ary routing, nest `route` calls.
38//!
39//! To skip an entire fork, resolve Option/Result **before** `.fork()`:
40//!
41//! ```ignore
42//! DagBuilder::<RawMsg>::new()
43//!     .root(decode, reg)
44//!     .guard(|msg: &RawMsg| !msg.is_empty(), reg)  // None skips everything below
45//!     .unwrap_or(default)                           // → T, enter fork with concrete type
46//!     .fork()
47//!     // arms work with &T, not &Option<T>
48//! ```
49//!
50//! # Combinator quick reference
51//!
52//! **Topology:** `.root()`, `.then()`, `.fork()`, `.arm()`, `.merge()`,
53//! `.join()`, `.build()`
54//!
55//! **Flow control:** `.guard()`, `.tap()`, `.route()`, `.tee()`, `.scan()`,
56//! `.dedup()`
57//!
58//! **Tuple `(A, B, ...)` (2-5 elements):** `.splat()` (→ splat builder,
59//! call `.then()` with destructured `&T` args)
60//!
61//! **Option:** `.map()`, `.filter()`, `.inspect()`, `.and_then()`,
62//! `.on_none()`, `.ok_or()`, `.unwrap_or()`
63//!
64//! **Result:** `.map()`, `.and_then()`, `.catch()`, `.map_err()`,
65//! `.ok()`, `.unwrap_or()`
66//!
67//! **Bool:** `.not()`, `.and()`, `.or()`, `.xor()`
68//!
69//! **Terminal:** `.dispatch()`, `.cloned()`, `.build()`, `.build_batch(cap)`
70//!
71//! All combinators accepting functions resolve `Param` dependencies at build
72//! time via `IntoStep`, `IntoRefStep`, or `IntoProducer` — named functions
73//! get direct-pointer access. Arity-0 closures work everywhere. Raw
74//! `&mut World` closures are available as an escape hatch via `Opaque`.
75//!
76//! # Splat — tuple destructuring
77//!
78//! When a step returns a tuple, the next step normally receives the
79//! whole tuple as `&(A, B)`. `.splat()` destructures it into individual
80//! reference arguments (`&A, &B`), reusing the existing merge step
81//! infrastructure:
82//!
83//! ```ignore
84//! fn split(t: Tick) -> (f64, u64) { (t.price, t.size) }
85//! fn weighted(price: &f64, size: &u64) -> f64 { *price * *size as f64 }
86//!
87//! DagBuilder::<Tick>::new()
88//!     .root(split, reg)       // Tick → (f64, u64)
89//!     .splat()                // destructure
90//!     .then(weighted, reg)    // (&f64, &u64) → f64
91//!     .build();
92//! ```
93//!
94//! Supported for tuples of 2-5 elements. Beyond 5, define a named
95//! struct — if a combinator stage needs that many arguments, a struct
96//! makes the intent clearer and the code more maintainable.
97//!
98//! # Node signatures
99//!
100//! The root node takes the event by value. All other nodes take their
101//! input by reference:
102//!
103//! ```ignore
104//! // Root: event by value
105//! fn decode(raw: RawMsg) -> DecodedMsg { .. }
106//!
107//! // Regular: input by reference
108//! fn update_ob(msg: &DecodedMsg) { .. }
109//! fn check_risk(config: Res<Config>, msg: &DecodedMsg) -> RiskResult { .. }
110//! ```
111//!
112//! # Examples
113//!
114//! ```
115//! use nexus_rt::{WorldBuilder, ResMut, Handler, Resource};
116//! use nexus_rt::dag::DagBuilder;
117//!
118//! #[derive(Resource)]
119//! struct Accum(u64);
120//!
121//! let mut wb = WorldBuilder::new();
122//! wb.register(Accum(0));
123//! let mut world = wb.build();
124//! let reg = world.registry();
125//!
126//! fn double(x: u32) -> u64 { x as u64 * 2 }
127//! fn store(mut out: ResMut<Accum>, val: &u64) { out.0 = *val; }
128//!
129//! let mut dag = DagBuilder::<u32>::new()
130//!     .root(double, reg)
131//!     .then(store, reg)
132//!     .build();
133//!
134//! dag.run(&mut world, 5u32);
135//! assert_eq!(world.resource::<Accum>().0, 10);
136//! ```
137//!
138//! # Returning DAGs from functions (Rust 2024)
139//!
140//! When a factory function takes `&Registry` and returns `impl Handler<E>`,
141//! Rust 2024 captures the registry borrow in the return type by default.
142//! Use `+ use<...>` to exclude it:
143//!
144//! ```ignore
145//! fn on_tick<C: Config>(
146//!     reg: &Registry,
147//! ) -> impl Handler<Tick> + use<C> {
148//!     DagBuilder::<Tick>::new()
149//!         .root(split::<C>, reg)
150//!         .fork()
151//!         // ...
152//!         .build()
153//! }
154//! ```
155//!
156//! List every type parameter the DAG captures; omit the `&Registry`
157//! lifetime — it's consumed during `.build()`. See the
158//! [crate-level docs](crate#returning-impl-handler-from-functions-rust-2024)
159//! for the full explanation.
160
161use std::marker::PhantomData;
162
163use crate::Handler;
164use crate::pipeline::{
165    AndBoolNode, ChainCall, ClonedNode, ClonedOptionNode, ClonedResultNode, DagAndThenOptionNode,
166    DagAndThenResultNode, DagCatchNode, DagMapOptionNode, DagMapResultNode, DagRouteNode,
167    DagThenNode, DedupNode, DiscardOptionNode, DispatchNode, FilterNode, GuardNode, IdentityNode,
168    InspectErrNode, InspectOptionNode, InspectResultNode, IntoProducer, IntoRefScanStep,
169    IntoRefStep, IntoStep, MapErrNode, NotNode, OkOrElseNode, OkOrNode, OkResultNode, OnNoneNode,
170    OrBoolNode, OrElseNode, RefScanNode, StepCall, TapNode, TeeNode, ThenNode,
171    UnwrapOrElseOptionNode, UnwrapOrElseResultNode, UnwrapOrOptionNode, UnwrapOrResultNode,
172    XorBoolNode,
173};
174use crate::world::{Registry, World};
175
176// =============================================================================
177// MergeStepCall / IntoMergeStep — merge step dispatch
178// =============================================================================
179
/// Callable trait for resolved merge steps.
///
/// Like [`StepCall`] but for merge steps with multiple reference inputs
/// bundled as `Inputs` (e.g. `(&'a A, &'a B)`).
///
/// `Inputs` is the tuple of references handed to the step and `Out` is the
/// value the step produces.
#[doc(hidden)]
pub trait MergeStepCall<Inputs, Out> {
    /// Call this merge step with a world reference and input references.
    ///
    /// `world` is borrowed mutably for the duration of the call; `inputs`
    /// is passed by value (it is itself a bundle of references).
    fn call(&mut self, world: &mut World, inputs: Inputs) -> Out;
}
189
/// Converts a named function into a resolved merge step.
///
/// Params first, then N reference inputs, returns output:
///
/// ```ignore
/// fn check(config: Res<Config>, ob: &ObResult, risk: &RiskResult) -> Decision { .. }
/// ```
///
/// Callables that take no `Param` arguments (`Params = ()`) are also
/// accepted, which is what lets plain closures act as merge steps.
///
/// Type parameters:
/// - `Inputs`: the tuple of input references, e.g. `(&A, &B)`.
/// - `Out`: the value produced by the step.
/// - `Params`: the tuple of `Param` types listed before the inputs.
#[doc(hidden)]
#[diagnostic::on_unimplemented(
    message = "this function cannot be used as a merge step",
    note = "merge steps take reference tuple inputs from the fork arms",
    note = "closures with resource parameters are not supported — use a named `fn`"
)]
pub trait IntoMergeStep<Inputs, Out, Params> {
    /// The concrete resolved merge step type.
    type Step: MergeStepCall<Inputs, Out>;

    /// Resolve Param state from the registry and produce a merge step.
    ///
    /// Consumes `self`; the returned step owns the callable.
    fn into_merge_step(self, registry: &Registry) -> Self::Step;
}
210
/// Internal: pre-resolved merge step with cached Param state.
///
/// Built by the [`IntoMergeStep`] impls in this module and driven through
/// [`MergeStepCall::call`].
#[doc(hidden)]
pub struct MergeStep<F, Params: crate::handler::Param> {
    /// The wrapped user callable.
    f: F,
    /// Param state produced by `Param::init` when the step was built.
    state: Params::State,
    // Filled with `std::any::type_name::<F>()` at construction; nothing in
    // this module reads it back, hence the lint allowance.
    // NOTE(review): presumably kept for diagnostics — confirm intended use.
    #[allow(dead_code)]
    name: &'static str,
}
219
220// -- Merge arity 2 -----------------------------------------------------------
221
222// Param arity 0: closures work.
223impl<A, B, Out, F> MergeStepCall<(&A, &B), Out> for MergeStep<F, ()>
224where
225    F: FnMut(&A, &B) -> Out + 'static,
226{
227    #[inline(always)]
228    fn call(&mut self, _world: &mut World, inputs: (&A, &B)) -> Out {
229        (self.f)(inputs.0, inputs.1)
230    }
231}
232
233impl<A, B, Out, F> IntoMergeStep<(&A, &B), Out, ()> for F
234where
235    F: FnMut(&A, &B) -> Out + 'static,
236{
237    type Step = MergeStep<F, ()>;
238
239    fn into_merge_step(self, registry: &Registry) -> Self::Step {
240        MergeStep {
241            f: self,
242            state: <() as crate::handler::Param>::init(registry),
243            name: std::any::type_name::<F>(),
244        }
245    }
246}
247
// Param arities 1-8 for merge arity 2.
//
// For a non-empty list of Param type idents `$P`, generates:
//   * `MergeStepCall<(&A, &B), Out>` for `MergeStep<F, ($P,)+>` — fetches the
//     Params from the World and calls `F` with them followed by both inputs;
//   * `IntoMergeStep<(&A, &B), Out, ($P,)+>` for `F` — resolves Param state
//     from the Registry and wraps `F` in a `MergeStep`.
macro_rules! impl_merge2_step {
    ($($P:ident),+) => {
        impl<A, B, Out, F: 'static, $($P: crate::handler::Param + 'static),+>
            MergeStepCall<(&A, &B), Out> for MergeStep<F, ($($P,)+)>
        where
            // `&mut F` must be callable both with the declared Param types
            // and with their `Item<'a>` forms (presumably what `Param::fetch`
            // yields — the values `call` passes below).
            for<'a> &'a mut F:
                FnMut($($P,)+ &A, &B) -> Out +
                FnMut($($P::Item<'a>,)+ &A, &B) -> Out,
        {
            #[inline(always)]
            // The `$P` idents double as local variable names below.
            #[allow(non_snake_case)]
            fn call(&mut self, world: &mut World, inputs: (&A, &B)) -> Out {
                // Local generic helper: re-declares the call shape with fresh
                // type parameters so `&mut self.f` is invoked through a plain
                // `impl FnMut` bound.
                // NOTE(review): presumably needed for inference against the
                // higher-ranked bound above — confirm.
                #[allow(clippy::too_many_arguments)]
                fn call_inner<$($P,)+ IA, IB, Output>(
                    mut f: impl FnMut($($P,)+ &IA, &IB) -> Output,
                    $($P: $P,)+
                    a: &IA, b: &IB,
                ) -> Output {
                    f($($P,)+ a, b)
                }
                // Debug builds only: `World::clear_borrows` runs before the fetch.
                // NOTE(review): presumably resets debug borrow tracking — confirm.
                #[cfg(debug_assertions)]
                world.clear_borrows();
                // SAFETY: state was produced by Param::init() on the same Registry
                // that built this World. Borrows are disjoint — enforced by
                // conflict detection at build time.
                let ($($P,)+) = unsafe {
                    <($($P,)+) as crate::handler::Param>::fetch(world, &mut self.state)
                };
                call_inner(&mut self.f, $($P,)+ inputs.0, inputs.1)
            }
        }

        impl<A, B, Out, F: 'static, $($P: crate::handler::Param + 'static),+>
            IntoMergeStep<(&A, &B), Out, ($($P,)+)> for F
        where
            for<'a> &'a mut F:
                FnMut($($P,)+ &A, &B) -> Out +
                FnMut($($P::Item<'a>,)+ &A, &B) -> Out,
        {
            type Step = MergeStep<F, ($($P,)+)>;

            fn into_merge_step(self, registry: &Registry) -> Self::Step {
                let state = <($($P,)+) as crate::handler::Param>::init(registry);
                {
                    // Hand each Param's resource id and type name to the
                    // registry's access check (presumably the build-time
                    // conflict detection the SAFETY note in `call` relies on).
                    #[allow(non_snake_case)]
                    let ($($P,)+) = &state;
                    registry.check_access(&[
                        $((<$P as crate::handler::Param>::resource_id($P),
                           std::any::type_name::<$P>()),)+
                    ]);
                }
                MergeStep { f: self, state, name: std::any::type_name::<F>() }
            }
        }
    };
}
305
306// -- Merge arity 3 -----------------------------------------------------------
307
308impl<A, B, C, Out, F> MergeStepCall<(&A, &B, &C), Out> for MergeStep<F, ()>
309where
310    F: FnMut(&A, &B, &C) -> Out + 'static,
311{
312    #[inline(always)]
313    fn call(&mut self, _world: &mut World, inputs: (&A, &B, &C)) -> Out {
314        (self.f)(inputs.0, inputs.1, inputs.2)
315    }
316}
317
318impl<A, B, C, Out, F> IntoMergeStep<(&A, &B, &C), Out, ()> for F
319where
320    F: FnMut(&A, &B, &C) -> Out + 'static,
321{
322    type Step = MergeStep<F, ()>;
323
324    fn into_merge_step(self, registry: &Registry) -> Self::Step {
325        MergeStep {
326            f: self,
327            state: <() as crate::handler::Param>::init(registry),
328            name: std::any::type_name::<F>(),
329        }
330    }
331}
332
// Merge arity 3 with one or more Params.
//
// For a non-empty list of Param type idents `$P`, generates:
//   * `MergeStepCall<(&A, &B, &C), Out>` for `MergeStep<F, ($P,)+>` — fetches
//     the Params from the World and calls `F` with them followed by the inputs;
//   * `IntoMergeStep<(&A, &B, &C), Out, ($P,)+>` for `F` — resolves Param
//     state from the Registry and wraps `F` in a `MergeStep`.
macro_rules! impl_merge3_step {
    ($($P:ident),+) => {
        impl<A, B, C, Out, F: 'static, $($P: crate::handler::Param + 'static),+>
            MergeStepCall<(&A, &B, &C), Out> for MergeStep<F, ($($P,)+)>
        where
            // `&mut F` must be callable both with the declared Param types
            // and with their `Item<'a>` forms (presumably what `Param::fetch`
            // yields — the values `call` passes below).
            for<'a> &'a mut F:
                FnMut($($P,)+ &A, &B, &C) -> Out +
                FnMut($($P::Item<'a>,)+ &A, &B, &C) -> Out,
        {
            #[inline(always)]
            // The `$P` idents double as local variable names below.
            #[allow(non_snake_case)]
            fn call(&mut self, world: &mut World, inputs: (&A, &B, &C)) -> Out {
                // Local generic helper: re-declares the call shape with fresh
                // type parameters so `&mut self.f` is invoked through a plain
                // `impl FnMut` bound.
                // NOTE(review): presumably needed for inference against the
                // higher-ranked bound above — confirm.
                #[allow(clippy::too_many_arguments)]
                fn call_inner<$($P,)+ IA, IB, IC, Output>(
                    mut f: impl FnMut($($P,)+ &IA, &IB, &IC) -> Output,
                    $($P: $P,)+
                    a: &IA, b: &IB, c: &IC,
                ) -> Output {
                    f($($P,)+ a, b, c)
                }
                // Debug builds only: `World::clear_borrows` runs before the fetch.
                // NOTE(review): presumably resets debug borrow tracking — confirm.
                #[cfg(debug_assertions)]
                world.clear_borrows();
                // SAFETY: state was produced by Param::init() on the same Registry
                // that built this World. Borrows are disjoint — enforced by
                // conflict detection at build time.
                let ($($P,)+) = unsafe {
                    <($($P,)+) as crate::handler::Param>::fetch(world, &mut self.state)
                };
                call_inner(&mut self.f, $($P,)+ inputs.0, inputs.1, inputs.2)
            }
        }

        impl<A, B, C, Out, F: 'static, $($P: crate::handler::Param + 'static),+>
            IntoMergeStep<(&A, &B, &C), Out, ($($P,)+)> for F
        where
            for<'a> &'a mut F:
                FnMut($($P,)+ &A, &B, &C) -> Out +
                FnMut($($P::Item<'a>,)+ &A, &B, &C) -> Out,
        {
            type Step = MergeStep<F, ($($P,)+)>;

            fn into_merge_step(self, registry: &Registry) -> Self::Step {
                let state = <($($P,)+) as crate::handler::Param>::init(registry);
                {
                    // Hand each Param's resource id and type name to the
                    // registry's access check (presumably the build-time
                    // conflict detection the SAFETY note in `call` relies on).
                    #[allow(non_snake_case)]
                    let ($($P,)+) = &state;
                    registry.check_access(&[
                        $((<$P as crate::handler::Param>::resource_id($P),
                           std::any::type_name::<$P>()),)+
                    ]);
                }
                MergeStep { f: self, state, name: std::any::type_name::<F>() }
            }
        }
    };
}
389
390// -- Merge arity 4 -----------------------------------------------------------
391
392impl<A, B, C, D, Out, F> MergeStepCall<(&A, &B, &C, &D), Out> for MergeStep<F, ()>
393where
394    F: FnMut(&A, &B, &C, &D) -> Out + 'static,
395{
396    #[inline(always)]
397    fn call(&mut self, _world: &mut World, i: (&A, &B, &C, &D)) -> Out {
398        (self.f)(i.0, i.1, i.2, i.3)
399    }
400}
401
402impl<A, B, C, D, Out, F> IntoMergeStep<(&A, &B, &C, &D), Out, ()> for F
403where
404    F: FnMut(&A, &B, &C, &D) -> Out + 'static,
405{
406    type Step = MergeStep<F, ()>;
407    fn into_merge_step(self, registry: &Registry) -> Self::Step {
408        MergeStep {
409            f: self,
410            state: <() as crate::handler::Param>::init(registry),
411            name: std::any::type_name::<F>(),
412        }
413    }
414}
415
// Merge arity 4 with one or more Params.
//
// For a non-empty list of Param type idents `$P`, generates:
//   * `MergeStepCall<(&A, &B, &C, &D), Out>` for `MergeStep<F, ($P,)+>` —
//     fetches the Params from the World and calls `F` with them followed by
//     the inputs;
//   * `IntoMergeStep<(&A, &B, &C, &D), Out, ($P,)+>` for `F` — resolves Param
//     state from the Registry and wraps `F` in a `MergeStep`.
macro_rules! impl_merge4_step {
    ($($P:ident),+) => {
        impl<A, B, C, D, Out, F: 'static, $($P: crate::handler::Param + 'static),+>
            MergeStepCall<(&A, &B, &C, &D), Out> for MergeStep<F, ($($P,)+)>
        // `&mut F` must be callable both with the declared Param types and
        // with their `Item<'a>` forms (presumably what `Param::fetch` yields).
        where for<'a> &'a mut F:
            FnMut($($P,)+ &A, &B, &C, &D) -> Out +
            FnMut($($P::Item<'a>,)+ &A, &B, &C, &D) -> Out,
        {
            #[inline(always)]
            // The `$P` idents double as local variable names below.
            #[allow(non_snake_case)]
            fn call(&mut self, world: &mut World, i: (&A, &B, &C, &D)) -> Out {
                // Local generic helper: re-declares the call shape with fresh
                // type parameters so `&mut self.f` is invoked through a plain
                // `impl FnMut` bound.
                // NOTE(review): presumably needed for inference against the
                // higher-ranked bound above — confirm.
                #[allow(clippy::too_many_arguments)]
                fn call_inner<$($P,)+ IA, IB, IC, ID, Output>(
                    mut f: impl FnMut($($P,)+ &IA, &IB, &IC, &ID) -> Output,
                    $($P: $P,)+ a: &IA, b: &IB, c: &IC, d: &ID,
                ) -> Output { f($($P,)+ a, b, c, d) }
                // Debug builds only: `World::clear_borrows` runs before the fetch.
                // NOTE(review): presumably resets debug borrow tracking — confirm.
                #[cfg(debug_assertions)]
                world.clear_borrows();
                // SAFETY: state was produced by Param::init() on the same Registry
                // that built this World. Borrows are disjoint — enforced by
                // conflict detection at build time.
                let ($($P,)+) = unsafe {
                    <($($P,)+) as crate::handler::Param>::fetch(world, &mut self.state)
                };
                call_inner(&mut self.f, $($P,)+ i.0, i.1, i.2, i.3)
            }
        }
        impl<A, B, C, D, Out, F: 'static, $($P: crate::handler::Param + 'static),+>
            IntoMergeStep<(&A, &B, &C, &D), Out, ($($P,)+)> for F
        where for<'a> &'a mut F:
            FnMut($($P,)+ &A, &B, &C, &D) -> Out +
            FnMut($($P::Item<'a>,)+ &A, &B, &C, &D) -> Out,
        {
            type Step = MergeStep<F, ($($P,)+)>;
            fn into_merge_step(self, registry: &Registry) -> Self::Step {
                let state = <($($P,)+) as crate::handler::Param>::init(registry);
                // Hand each Param's resource id and type name to the registry's
                // access check (presumably the build-time conflict detection
                // the SAFETY note in `call` relies on).
                { #[allow(non_snake_case)] let ($($P,)+) = &state;
                  registry.check_access(&[$((<$P as crate::handler::Param>::resource_id($P), std::any::type_name::<$P>()),)+]); }
                MergeStep { f: self, state, name: std::any::type_name::<F>() }
            }
        }
    };
}
459
460// -- Merge arity 5 -----------------------------------------------------------
461
462impl<A, B, C, D, E, Out, F> MergeStepCall<(&A, &B, &C, &D, &E), Out> for MergeStep<F, ()>
463where
464    F: FnMut(&A, &B, &C, &D, &E) -> Out + 'static,
465{
466    #[inline(always)]
467    fn call(&mut self, _world: &mut World, i: (&A, &B, &C, &D, &E)) -> Out {
468        (self.f)(i.0, i.1, i.2, i.3, i.4)
469    }
470}
471
472impl<A, B, C, D, E, Out, F> IntoMergeStep<(&A, &B, &C, &D, &E), Out, ()> for F
473where
474    F: FnMut(&A, &B, &C, &D, &E) -> Out + 'static,
475{
476    type Step = MergeStep<F, ()>;
477    fn into_merge_step(self, registry: &Registry) -> Self::Step {
478        MergeStep {
479            f: self,
480            state: <() as crate::handler::Param>::init(registry),
481            name: std::any::type_name::<F>(),
482        }
483    }
484}
485
// Merge arity 5 with one or more Params.
//
// For a non-empty list of Param type idents `$P`, generates:
//   * `MergeStepCall<(&A, &B, &C, &D, &E), Out>` for `MergeStep<F, ($P,)+>` —
//     fetches the Params from the World and calls `F` with them followed by
//     the inputs;
//   * `IntoMergeStep<(&A, &B, &C, &D, &E), Out, ($P,)+>` for `F` — resolves
//     Param state from the Registry and wraps `F` in a `MergeStep`.
macro_rules! impl_merge5_step {
    ($($P:ident),+) => {
        impl<A, B, C, D, E, Out, F: 'static, $($P: crate::handler::Param + 'static),+>
            MergeStepCall<(&A, &B, &C, &D, &E), Out> for MergeStep<F, ($($P,)+)>
        // `&mut F` must be callable both with the declared Param types and
        // with their `Item<'a>` forms (presumably what `Param::fetch` yields).
        where for<'a> &'a mut F:
            FnMut($($P,)+ &A, &B, &C, &D, &E) -> Out +
            FnMut($($P::Item<'a>,)+ &A, &B, &C, &D, &E) -> Out,
        {
            #[inline(always)]
            // The `$P` idents double as local variable names below.
            #[allow(non_snake_case)]
            fn call(&mut self, world: &mut World, i: (&A, &B, &C, &D, &E)) -> Out {
                // Local generic helper: re-declares the call shape with fresh
                // type parameters so `&mut self.f` is invoked through a plain
                // `impl FnMut` bound.
                // NOTE(review): presumably needed for inference against the
                // higher-ranked bound above — confirm.
                #[allow(clippy::too_many_arguments)]
                fn call_inner<$($P,)+ IA, IB, IC, ID, IE, Output>(
                    mut f: impl FnMut($($P,)+ &IA, &IB, &IC, &ID, &IE) -> Output,
                    $($P: $P,)+ a: &IA, b: &IB, c: &IC, d: &ID, e: &IE,
                ) -> Output { f($($P,)+ a, b, c, d, e) }
                // Debug builds only: `World::clear_borrows` runs before the fetch.
                // NOTE(review): presumably resets debug borrow tracking — confirm.
                #[cfg(debug_assertions)]
                world.clear_borrows();
                // SAFETY: state was produced by Param::init() on the same Registry
                // that built this World. Borrows are disjoint — enforced by
                // conflict detection at build time.
                let ($($P,)+) = unsafe {
                    <($($P,)+) as crate::handler::Param>::fetch(world, &mut self.state)
                };
                call_inner(&mut self.f, $($P,)+ i.0, i.1, i.2, i.3, i.4)
            }
        }
        impl<A, B, C, D, E, Out, F: 'static, $($P: crate::handler::Param + 'static),+>
            IntoMergeStep<(&A, &B, &C, &D, &E), Out, ($($P,)+)> for F
        where for<'a> &'a mut F:
            FnMut($($P,)+ &A, &B, &C, &D, &E) -> Out +
            FnMut($($P::Item<'a>,)+ &A, &B, &C, &D, &E) -> Out,
        {
            type Step = MergeStep<F, ($($P,)+)>;
            fn into_merge_step(self, registry: &Registry) -> Self::Step {
                let state = <($($P,)+) as crate::handler::Param>::init(registry);
                // Hand each Param's resource id and type name to the registry's
                // access check (presumably the build-time conflict detection
                // the SAFETY note in `call` relies on).
                { #[allow(non_snake_case)] let ($($P,)+) = &state;
                  registry.check_access(&[$((<$P as crate::handler::Param>::resource_id($P), std::any::type_name::<$P>()),)+]); }
                MergeStep { f: self, state, name: std::any::type_name::<F>() }
            }
        }
    };
}
529
// Instantiate the Param-carrying merge-step impls for merge arities 2-5.
// `all_tuples!` is defined elsewhere in the crate; per the note on
// `impl_merge2_step` it is expected to expand each macro for Param
// arities 1-8 — NOTE(review): confirm against the `all_tuples!` definition.
all_tuples!(impl_merge2_step);
all_tuples!(impl_merge3_step);
all_tuples!(impl_merge4_step);
all_tuples!(impl_merge5_step);
534
535// =============================================================================
536// DAG — monomorphized, zero vtable dispatch
537// =============================================================================
538//
539// Encodes DAG topology in the type system at compile time. After
540// monomorphization, the entire DAG is a single flat function with all
// values as stack locals. No arena, no bitmap. The only `unsafe` is
// in the shared Param::fetch path (resource access by pre-resolved index).
543//
544// Fan-out: multiple nodes borrow the same stack local (no Clone).
545// Merge: merge step borrows all arm outputs.
546// Panic safety: stack unwinding drops all locals automatically.
547
/// Entry point for building a DAG pipeline.
///
/// The DAG encodes topology in the type system at compile time,
/// producing a single monomorphized chain of named node types. All values live as
/// stack locals in the `run()` body — no arena, no vtable dispatch.
/// The only `unsafe` is in the shared [`Param::fetch`](crate::Param)
/// path (resource access by pre-resolved index).
///
/// The builder itself holds no data: `E` (the event type the root step
/// consumes) is recorded only at the type level.
///
/// # Examples
///
/// ```
/// use nexus_rt::{WorldBuilder, ResMut, Handler, Resource};
/// use nexus_rt::dag::DagBuilder;
///
/// #[derive(Resource)]
/// struct Accum(u64);
///
/// let mut wb = WorldBuilder::new();
/// wb.register(Accum(0));
/// let mut world = wb.build();
/// let reg = world.registry();
///
/// fn double(x: u32) -> u64 { x as u64 * 2 }
/// fn store(mut out: ResMut<Accum>, val: &u64) { out.0 = *val; }
///
/// let mut dag = DagBuilder::<u32>::new()
///     .root(double, reg)
///     .then(store, reg)
///     .build();
///
/// dag.run(&mut world, 5u32);
/// assert_eq!(world.resource::<Accum>().0, 10);
/// ```
#[must_use = "a DAG builder does nothing unless you chain steps and call .build()"]
// `PhantomData<fn(E)>` names `E` without storing or owning a value of it.
pub struct DagBuilder<E>(PhantomData<fn(E)>);
583
584impl<E> DagBuilder<E> {
585    /// Create a new typed DAG entry point.
586    pub fn new() -> Self {
587        Self(PhantomData)
588    }
589
590    /// Set the root step. Takes the event `E` by value, produces `Out`.
591    pub fn root<Out, Params, S>(
592        self,
593        f: S,
594        registry: &Registry,
595    ) -> DagChain<E, Out, ThenNode<IdentityNode, S::Step>>
596    where
597        Out: 'static,
598        S: IntoStep<E, Out, Params>,
599    {
600        DagChain {
601            chain: ThenNode {
602                prev: IdentityNode,
603                step: f.into_step(registry),
604            },
605            _marker: PhantomData,
606        }
607    }
608}
609
610impl<E> Default for DagBuilder<E> {
611    fn default() -> Self {
612        Self::new()
613    }
614}
615
/// Main chain builder for a typed DAG.
///
/// `Chain` implements [`ChainCall<E, Out = Out>`] — a named node type
/// representing all steps composed so far. No closures, no `use<>`.
///
/// `E` is the event type entering the DAG and `Out` is the output type of
/// the most recently added step.
#[must_use = "DAG chain does nothing until .build() is called"]
pub struct DagChain<E, Out, Chain> {
    /// The node chain composed so far.
    pub(crate) chain: Chain,
    /// Records `E` and `Out` at the type level; holds no data.
    pub(crate) _marker: PhantomData<fn(E) -> Out>,
}
625
626impl<E, Out: 'static, Chain> DagChain<E, Out, Chain> {
627    /// Enter fork mode. Subsequent `.arm()` calls add parallel branches.
628    pub fn fork(self) -> DagChainFork<E, Out, Chain, ()> {
629        DagChainFork {
630            chain: self.chain,
631            arms: (),
632            _marker: PhantomData,
633        }
634    }
635}
636
637impl<E, Chain> DagChain<E, (), Chain>
638where
639    Chain: ChainCall<E, Out = ()> + Send,
640{
641    /// Finalize into a [`Dag`] that implements [`Handler<E>`].
642    ///
643    /// Only available when the chain ends with `()` or `Option<()>`.
644    /// If your DAG produces a value, add a final `.then()` that consumes
645    /// the output.
646    #[must_use = "building a DAG without storing it does nothing"]
647    pub fn build(self) -> Dag<Chain> {
648        Dag { chain: self.chain }
649    }
650}
651
652impl<E, Chain> DagChain<E, Option<()>, Chain>
653where
654    Chain: ChainCall<E, Out = Option<()>> + Send,
655{
656    /// Finalize into a [`Dag`], discarding the `Option<()>`.
657    ///
658    /// DAGs ending with `Option<()>` produce the same [`Dag`] as those
659    /// ending with `()`.
660    #[must_use = "building a DAG without storing it does nothing"]
661    pub fn build(self) -> Dag<DiscardOptionNode<Chain>> {
662        Dag {
663            chain: DiscardOptionNode { prev: self.chain },
664        }
665    }
666}
667
/// Arm builder seed. Used in `.arm()` closures and to build arms for
/// [`.route()`](DagChain::route).
///
/// Call `.then()` to add the first step in this arm.
///
/// The seed holds no data: `In` (the type the arm's first step borrows) is
/// recorded only at the type level.
// `PhantomData<fn(*const In)>` names `In` without storing or borrowing one.
pub struct DagArmSeed<In>(PhantomData<fn(*const In)>);
673
674impl<In: 'static> DagArmSeed<In> {
675    /// Create a new arm builder seed.
676    ///
677    /// Used to build arms passed to [`DagChain::route`] or
678    /// [`DagArm::route`]:
679    ///
680    /// ```ignore
681    /// let fast = DagArmSeed::new().then(fast_path, &reg);
682    /// let slow = DagArmSeed::new().then(slow_path, &reg);
683    /// dag.route(predicate, &reg, fast, slow)
684    /// ```
685    pub fn new() -> Self {
686        Self(PhantomData)
687    }
688}
689
690impl<In: 'static> Default for DagArmSeed<In> {
691    fn default() -> Self {
692        Self::new()
693    }
694}
695
696impl<In: 'static> DagArmSeed<In> {
697    /// Add the first step in this arm. Takes `&In` by reference.
698    pub fn then<Out, Params, S>(
699        self,
700        f: S,
701        registry: &Registry,
702    ) -> DagArm<In, Out, ThenNode<IdentityNode, S::Step>>
703    where
704        Out: 'static,
705        S: IntoStep<&'static In, Out, Params>,
706        S::Step: for<'a> StepCall<&'a In, Out = Out>,
707    {
708        DagArm {
709            chain: ThenNode {
710                prev: IdentityNode,
711                step: f.into_step(registry),
712            },
713            _marker: PhantomData,
714        }
715    }
716}
717
/// Built arm in a typed DAG fork.
///
/// `Chain` implements [`ChainCall<&In, Out = Out>`] — a named node type
/// for this arm's steps.
///
/// `In` is the type the arm borrows as input and `Out` is the output type
/// of the arm's most recently added step.
pub struct DagArm<In, Out, Chain> {
    /// The node chain composed so far for this arm.
    pub(crate) chain: Chain,
    /// Records `In` and `Out` at the type level; holds no data.
    pub(crate) _marker: PhantomData<fn(*const In) -> Out>,
}
726
727impl<In: 'static, Out: 'static, Chain> DagArm<In, Out, Chain> {
728    /// Enter fork mode within this arm.
729    pub fn fork(self) -> DagArmFork<In, Out, Chain, ()> {
730        DagArmFork {
731            chain: self.chain,
732            arms: (),
733            _marker: PhantomData,
734        }
735    }
736}
737
/// Fork builder on the main chain. Accumulates arms as a tuple.
///
/// Created by [`DagChain::fork`] with `Arms = ()`.
pub struct DagChainFork<E, ForkOut, Chain, Arms> {
    /// The main chain as it stood when `.fork()` was called.
    chain: Chain,
    /// The arms added so far (starts as `()`).
    arms: Arms,
    /// Records the event type `E` and the forked value type `ForkOut`;
    /// holds no data.
    _marker: PhantomData<fn(E) -> ForkOut>,
}
744
/// Fork builder within an arm. Accumulates sub-arms as a tuple.
///
/// Created by [`DagArm::fork`] with `Arms = ()`.
pub struct DagArmFork<In, ForkOut, Chain, Arms> {
    /// The enclosing arm's chain as it stood when `.fork()` was called.
    chain: Chain,
    /// The sub-arms added so far (starts as `()`).
    arms: Arms,
    /// Records the arm input type `In` and the forked value type `ForkOut`;
    /// holds no data.
    _marker: PhantomData<fn(*const In) -> ForkOut>,
}
751
/// Final built DAG. Implements [`Handler<E>`].
///
/// Created by [`DagChain::build`]. The entire DAG is monomorphized
/// at compile time — no boxing, no virtual dispatch, no arena.
/// Supports `for<'a> Handler<&'a T>` for zero-copy event dispatch.
/// For batch processing, see [`BatchDag`].
pub struct Dag<Chain> {
    /// The fully composed node chain that `run` drives for each event.
    chain: Chain,
}
761
/// Runs the composed chain once per event.
///
/// Implemented for every event type `E` the chain accepts with unit output.
impl<E, Chain> Handler<E> for Dag<Chain>
where
    Chain: ChainCall<E, Out = ()> + Send,
{
    /// Feed `event` through the whole chain against `world`.
    fn run(&mut self, world: &mut World, event: E) {
        self.chain.call(world, event);
    }

    /// Fixed handler name; it does not vary with the chain's type.
    fn name(&self) -> &'static str {
        "dag::Dag"
    }
}
774
775// =============================================================================
776// Fork arity macro — arm accumulation, merge, join
777// =============================================================================
778
779// =============================================================================
780// Combinator macro — shared between DagChain and DagArm
781// =============================================================================
782
783/// Generates step combinators, Option/Result helpers, and clone helpers.
784///
785/// DagChain and DagArm use the same named node types — `In` appears only
786/// on the `ChainCall<In>` trait impl, not on the struct. No closures, no
787/// `use<>` captures.
788macro_rules! impl_dag_combinators {
789    (builder: $Builder:ident, upstream: $U:ident) => {
790        // =============================================================
791        // Core — any Out
792        // =============================================================
793
794        impl<$U, Out: 'static, Chain> $Builder<$U, Out, Chain> {
795            /// Append a step. The step receives `&Out` by reference.
796            pub fn then<NewOut, Params, S>(
797                self,
798                f: S,
799                registry: &Registry,
800            ) -> $Builder<$U, NewOut, DagThenNode<Chain, S::Step, NewOut>>
801            where
802                NewOut: 'static,
803                S: IntoStep<&'static Out, NewOut, Params>,
804                S::Step: for<'a> StepCall<&'a Out, Out = NewOut>,
805            {
806                $Builder {
807                    chain: DagThenNode {
808                        prev: self.chain,
809                        step: f.into_step(registry),
810                        _out: PhantomData,
811                    },
812                    _marker: PhantomData,
813                }
814            }
815
816            /// Dispatch output to a [`Handler<Out>`].
817            pub fn dispatch<H: Handler<Out>>(
818                self,
819                handler: H,
820            ) -> $Builder<$U, (), DispatchNode<Chain, H>> {
821                $Builder {
822                    chain: DispatchNode {
823                        prev: self.chain,
824                        handler,
825                    },
826                    _marker: PhantomData,
827                }
828            }
829
830            /// Conditionally wrap the output in `Option`.
831            pub fn guard<Params, S: IntoRefStep<Out, bool, Params>>(
832                self,
833                f: S,
834                registry: &Registry,
835            ) -> $Builder<$U, Option<Out>, GuardNode<Chain, S::Step>> {
836                $Builder {
837                    chain: GuardNode {
838                        prev: self.chain,
839                        step: f.into_ref_step(registry),
840                    },
841                    _marker: PhantomData,
842                }
843            }
844
845            /// Open a view scope. Steps inside operate on a read-only
846            /// view constructed from the event. Close with `.end_view()`.
847            pub fn view<V: crate::view::View<Out>>(
848                self,
849            ) -> crate::view::ViewScope<$U, Out, V, Chain, ()> {
850                crate::view::ViewScope::new(self.chain)
851            }
852
853            /// Observe the current value without consuming or changing it.
854            pub fn tap<Params, S: IntoRefStep<Out, (), Params>>(
855                self,
856                f: S,
857                registry: &Registry,
858            ) -> $Builder<$U, Out, TapNode<Chain, S::Step>> {
859                $Builder {
860                    chain: TapNode {
861                        prev: self.chain,
862                        step: f.into_ref_step(registry),
863                    },
864                    _marker: PhantomData,
865                }
866            }
867
868            /// Binary conditional routing. Both arms borrow `&Out`.
869            pub fn route<NewOut, C0, C1, Params, Pred: IntoRefStep<Out, bool, Params>>(
870                self,
871                pred: Pred,
872                registry: &Registry,
873                on_true: DagArm<Out, NewOut, C0>,
874                on_false: DagArm<Out, NewOut, C1>,
875            ) -> $Builder<$U, NewOut, DagRouteNode<Chain, Pred::Step, C0, C1, NewOut>>
876            where
877                C0: for<'a> ChainCall<&'a Out, Out = NewOut>,
878                C1: for<'a> ChainCall<&'a Out, Out = NewOut>,
879            {
880                $Builder {
881                    chain: DagRouteNode {
882                        prev: self.chain,
883                        pred: pred.into_ref_step(registry),
884                        on_true: on_true.chain,
885                        on_false: on_false.chain,
886                        _out: PhantomData,
887                    },
888                    _marker: PhantomData,
889                }
890            }
891
892            /// Fork off a multi-step side-effect chain.
893            pub fn tee<C>(self, side: DagArm<Out, (), C>) -> $Builder<$U, Out, TeeNode<Chain, C>>
894            where
895                C: for<'a> ChainCall<&'a Out, Out = ()>,
896            {
897                $Builder {
898                    chain: TeeNode {
899                        prev: self.chain,
900                        side: side.chain,
901                    },
902                    _marker: PhantomData,
903                }
904            }
905
906            /// Scan with persistent accumulator. The step receives
907            /// `&mut Acc` and `&Out` by reference.
908            pub fn scan<Acc, NewOut, Params, S>(
909                self,
910                initial: Acc,
911                f: S,
912                registry: &Registry,
913            ) -> $Builder<$U, NewOut, RefScanNode<Chain, S::Step, Acc>>
914            where
915                Acc: 'static,
916                NewOut: 'static,
917                S: IntoRefScanStep<Acc, Out, NewOut, Params>,
918            {
919                $Builder {
920                    chain: RefScanNode {
921                        prev: self.chain,
922                        step: f.into_ref_scan_step(registry),
923                        acc: initial,
924                    },
925                    _marker: PhantomData,
926                }
927            }
928        }
929
930        // =============================================================
931        // Dedup — suppress unchanged values
932        // =============================================================
933
934        impl<$U, Out: PartialEq + Clone + 'static, Chain> $Builder<$U, Out, Chain> {
935            /// Suppress consecutive unchanged values.
936            pub fn dedup(self) -> $Builder<$U, Option<Out>, DedupNode<Chain, Out>> {
937                $Builder {
938                    chain: DedupNode {
939                        prev: self.chain,
940                        last: None,
941                    },
942                    _marker: PhantomData,
943                }
944            }
945        }
946
947        // =============================================================
948        // Bool combinators
949        // =============================================================
950
951        impl<$U, Chain> $Builder<$U, bool, Chain> {
952            /// Invert a boolean value.
953            #[allow(clippy::should_implement_trait)]
954            pub fn not(self) -> $Builder<$U, bool, NotNode<Chain>> {
955                $Builder {
956                    chain: NotNode { prev: self.chain },
957                    _marker: PhantomData,
958                }
959            }
960
961            /// Short-circuit AND with a second boolean.
962            pub fn and<Params, S: IntoProducer<bool, Params>>(
963                self,
964                f: S,
965                registry: &Registry,
966            ) -> $Builder<$U, bool, AndBoolNode<Chain, S::Step>> {
967                $Builder {
968                    chain: AndBoolNode {
969                        prev: self.chain,
970                        producer: f.into_producer(registry),
971                    },
972                    _marker: PhantomData,
973                }
974            }
975
976            /// Short-circuit OR with a second boolean.
977            pub fn or<Params, S: IntoProducer<bool, Params>>(
978                self,
979                f: S,
980                registry: &Registry,
981            ) -> $Builder<$U, bool, OrBoolNode<Chain, S::Step>> {
982                $Builder {
983                    chain: OrBoolNode {
984                        prev: self.chain,
985                        producer: f.into_producer(registry),
986                    },
987                    _marker: PhantomData,
988                }
989            }
990
991            /// XOR with a second boolean.
992            pub fn xor<Params, S: IntoProducer<bool, Params>>(
993                self,
994                f: S,
995                registry: &Registry,
996            ) -> $Builder<$U, bool, XorBoolNode<Chain, S::Step>> {
997                $Builder {
998                    chain: XorBoolNode {
999                        prev: self.chain,
1000                        producer: f.into_producer(registry),
1001                    },
1002                    _marker: PhantomData,
1003                }
1004            }
1005        }
1006
1007        // =============================================================
1008        // Clone helpers — &T → T transitions
1009        // =============================================================
1010
1011        impl<'a, $U, T: Clone, Chain> $Builder<$U, &'a T, Chain> {
1012            /// Clone a borrowed output to produce an owned value.
1013            pub fn cloned(self) -> $Builder<$U, T, ClonedNode<Chain>> {
1014                $Builder {
1015                    chain: ClonedNode { prev: self.chain },
1016                    _marker: PhantomData,
1017                }
1018            }
1019        }
1020
1021        impl<'a, $U, T: Clone, Chain> $Builder<$U, Option<&'a T>, Chain> {
1022            /// Clone inner borrowed value. `Option<&T>` → `Option<T>`.
1023            pub fn cloned(self) -> $Builder<$U, Option<T>, ClonedOptionNode<Chain>> {
1024                $Builder {
1025                    chain: ClonedOptionNode { prev: self.chain },
1026                    _marker: PhantomData,
1027                }
1028            }
1029        }
1030
1031        impl<'a, $U, T: Clone, Err, Chain> $Builder<$U, Result<&'a T, Err>, Chain> {
1032            /// Clone inner borrowed Ok value.
1033            pub fn cloned(self) -> $Builder<$U, Result<T, Err>, ClonedResultNode<Chain>> {
1034                $Builder {
1035                    chain: ClonedResultNode { prev: self.chain },
1036                    _marker: PhantomData,
1037                }
1038            }
1039        }
1040
1041        // =============================================================
1042        // Option helpers
1043        // =============================================================
1044
1045        impl<$U, T: 'static, Chain> $Builder<$U, Option<T>, Chain> {
1046            /// Transform the inner value. Step not called on None.
1047            pub fn map<U, Params, S: IntoStep<&'static T, U, Params>>(
1048                self,
1049                f: S,
1050                registry: &Registry,
1051            ) -> $Builder<$U, Option<U>, DagMapOptionNode<Chain, S::Step, U>>
1052            where
1053                U: 'static,
1054                S::Step: for<'x> StepCall<&'x T, Out = U>,
1055            {
1056                $Builder {
1057                    chain: DagMapOptionNode {
1058                        prev: self.chain,
1059                        step: f.into_step(registry),
1060                        _out: PhantomData,
1061                    },
1062                    _marker: PhantomData,
1063                }
1064            }
1065
1066            /// Short-circuits on None. std: `Option::and_then`
1067            pub fn and_then<U, Params, S: IntoStep<&'static T, Option<U>, Params>>(
1068                self,
1069                f: S,
1070                registry: &Registry,
1071            ) -> $Builder<$U, Option<U>, DagAndThenOptionNode<Chain, S::Step, U>>
1072            where
1073                U: 'static,
1074                S::Step: for<'x> StepCall<&'x T, Out = Option<U>>,
1075            {
1076                $Builder {
1077                    chain: DagAndThenOptionNode {
1078                        prev: self.chain,
1079                        step: f.into_step(registry),
1080                        _out: PhantomData,
1081                    },
1082                    _marker: PhantomData,
1083                }
1084            }
1085
1086            /// Side effect on None.
1087            pub fn on_none<Params, S: IntoProducer<(), Params>>(
1088                self,
1089                f: S,
1090                registry: &Registry,
1091            ) -> $Builder<$U, Option<T>, OnNoneNode<Chain, S::Step>> {
1092                $Builder {
1093                    chain: OnNoneNode {
1094                        prev: self.chain,
1095                        producer: f.into_producer(registry),
1096                    },
1097                    _marker: PhantomData,
1098                }
1099            }
1100
1101            /// Keep value if predicate holds. std: `Option::filter`
1102            pub fn filter<Params, S: IntoRefStep<T, bool, Params>>(
1103                self,
1104                f: S,
1105                registry: &Registry,
1106            ) -> $Builder<$U, Option<T>, FilterNode<Chain, S::Step>> {
1107                $Builder {
1108                    chain: FilterNode {
1109                        prev: self.chain,
1110                        step: f.into_ref_step(registry),
1111                    },
1112                    _marker: PhantomData,
1113                }
1114            }
1115
1116            /// Side effect on Some value. std: `Option::inspect`
1117            pub fn inspect<Params, S: IntoRefStep<T, (), Params>>(
1118                self,
1119                f: S,
1120                registry: &Registry,
1121            ) -> $Builder<$U, Option<T>, InspectOptionNode<Chain, S::Step>> {
1122                $Builder {
1123                    chain: InspectOptionNode {
1124                        prev: self.chain,
1125                        step: f.into_ref_step(registry),
1126                    },
1127                    _marker: PhantomData,
1128                }
1129            }
1130
1131            /// None becomes Err(err). std: `Option::ok_or`
1132            pub fn ok_or<Err: Clone>(
1133                self,
1134                err: Err,
1135            ) -> $Builder<$U, Result<T, Err>, OkOrNode<Chain, Err>> {
1136                $Builder {
1137                    chain: OkOrNode {
1138                        prev: self.chain,
1139                        err,
1140                    },
1141                    _marker: PhantomData,
1142                }
1143            }
1144
1145            /// None becomes Err(f()). std: `Option::ok_or_else`
1146            pub fn ok_or_else<Err, Params, S: IntoProducer<Err, Params>>(
1147                self,
1148                f: S,
1149                registry: &Registry,
1150            ) -> $Builder<$U, Result<T, Err>, OkOrElseNode<Chain, S::Step>> {
1151                $Builder {
1152                    chain: OkOrElseNode {
1153                        prev: self.chain,
1154                        producer: f.into_producer(registry),
1155                    },
1156                    _marker: PhantomData,
1157                }
1158            }
1159
1160            /// Exit Option — None becomes the default value.
1161            pub fn unwrap_or(self, default: T) -> $Builder<$U, T, UnwrapOrOptionNode<Chain, T>>
1162            where
1163                T: Clone,
1164            {
1165                $Builder {
1166                    chain: UnwrapOrOptionNode {
1167                        prev: self.chain,
1168                        default,
1169                    },
1170                    _marker: PhantomData,
1171                }
1172            }
1173
1174            /// Exit Option — None becomes `f()`.
1175            pub fn unwrap_or_else<Params, S: IntoProducer<T, Params>>(
1176                self,
1177                f: S,
1178                registry: &Registry,
1179            ) -> $Builder<$U, T, UnwrapOrElseOptionNode<Chain, S::Step>> {
1180                $Builder {
1181                    chain: UnwrapOrElseOptionNode {
1182                        prev: self.chain,
1183                        producer: f.into_producer(registry),
1184                    },
1185                    _marker: PhantomData,
1186                }
1187            }
1188        }
1189
1190        // =============================================================
1191        // Result helpers
1192        // =============================================================
1193
1194        impl<$U, T: 'static, Err: 'static, Chain> $Builder<$U, Result<T, Err>, Chain> {
1195            /// Transform the Ok value. Step not called on Err.
1196            pub fn map<U, Params, S: IntoStep<&'static T, U, Params>>(
1197                self,
1198                f: S,
1199                registry: &Registry,
1200            ) -> $Builder<$U, Result<U, Err>, DagMapResultNode<Chain, S::Step, U>>
1201            where
1202                U: 'static,
1203                S::Step: for<'x> StepCall<&'x T, Out = U>,
1204            {
1205                $Builder {
1206                    chain: DagMapResultNode {
1207                        prev: self.chain,
1208                        step: f.into_step(registry),
1209                        _out: PhantomData,
1210                    },
1211                    _marker: PhantomData,
1212                }
1213            }
1214
1215            /// Short-circuits on Err. std: `Result::and_then`
1216            pub fn and_then<U, Params, S: IntoStep<&'static T, Result<U, Err>, Params>>(
1217                self,
1218                f: S,
1219                registry: &Registry,
1220            ) -> $Builder<$U, Result<U, Err>, DagAndThenResultNode<Chain, S::Step, U>>
1221            where
1222                U: 'static,
1223                S::Step: for<'x> StepCall<&'x T, Out = Result<U, Err>>,
1224            {
1225                $Builder {
1226                    chain: DagAndThenResultNode {
1227                        prev: self.chain,
1228                        step: f.into_step(registry),
1229                        _out: PhantomData,
1230                    },
1231                    _marker: PhantomData,
1232                }
1233            }
1234
1235            /// Handle error and transition to Option.
1236            pub fn catch<Params, S: IntoStep<&'static Err, (), Params>>(
1237                self,
1238                f: S,
1239                registry: &Registry,
1240            ) -> $Builder<$U, Option<T>, DagCatchNode<Chain, S::Step>>
1241            where
1242                S::Step: for<'x> StepCall<&'x Err, Out = ()>,
1243            {
1244                $Builder {
1245                    chain: DagCatchNode {
1246                        prev: self.chain,
1247                        step: f.into_step(registry),
1248                    },
1249                    _marker: PhantomData,
1250                }
1251            }
1252
1253            /// Transform the error. std: `Result::map_err`
1254            pub fn map_err<Err2, Params, S: IntoStep<Err, Err2, Params>>(
1255                self,
1256                f: S,
1257                registry: &Registry,
1258            ) -> $Builder<$U, Result<T, Err2>, MapErrNode<Chain, S::Step>> {
1259                $Builder {
1260                    chain: MapErrNode {
1261                        prev: self.chain,
1262                        step: f.into_step(registry),
1263                    },
1264                    _marker: PhantomData,
1265                }
1266            }
1267
1268            /// Recover from Err. std: `Result::or_else`
1269            pub fn or_else<Err2, Params, S: IntoStep<Err, Result<T, Err2>, Params>>(
1270                self,
1271                f: S,
1272                registry: &Registry,
1273            ) -> $Builder<$U, Result<T, Err2>, OrElseNode<Chain, S::Step>> {
1274                $Builder {
1275                    chain: OrElseNode {
1276                        prev: self.chain,
1277                        step: f.into_step(registry),
1278                    },
1279                    _marker: PhantomData,
1280                }
1281            }
1282
1283            /// Side effect on Ok. std: `Result::inspect`
1284            pub fn inspect<Params, S: IntoRefStep<T, (), Params>>(
1285                self,
1286                f: S,
1287                registry: &Registry,
1288            ) -> $Builder<$U, Result<T, Err>, InspectResultNode<Chain, S::Step>> {
1289                $Builder {
1290                    chain: InspectResultNode {
1291                        prev: self.chain,
1292                        step: f.into_ref_step(registry),
1293                    },
1294                    _marker: PhantomData,
1295                }
1296            }
1297
1298            /// Side effect on Err. std: `Result::inspect_err`
1299            pub fn inspect_err<Params, S: IntoRefStep<Err, (), Params>>(
1300                self,
1301                f: S,
1302                registry: &Registry,
1303            ) -> $Builder<$U, Result<T, Err>, InspectErrNode<Chain, S::Step>> {
1304                $Builder {
1305                    chain: InspectErrNode {
1306                        prev: self.chain,
1307                        step: f.into_ref_step(registry),
1308                    },
1309                    _marker: PhantomData,
1310                }
1311            }
1312
1313            /// Discard error, enter Option land. std: `Result::ok`
1314            pub fn ok(self) -> $Builder<$U, Option<T>, OkResultNode<Chain>> {
1315                $Builder {
1316                    chain: OkResultNode { prev: self.chain },
1317                    _marker: PhantomData,
1318                }
1319            }
1320
1321            /// Exit Result — Err becomes the default value.
1322            pub fn unwrap_or(self, default: T) -> $Builder<$U, T, UnwrapOrResultNode<Chain, T>>
1323            where
1324                T: Clone,
1325            {
1326                $Builder {
1327                    chain: UnwrapOrResultNode {
1328                        prev: self.chain,
1329                        default,
1330                    },
1331                    _marker: PhantomData,
1332                }
1333            }
1334
1335            /// Exit Result — Err becomes `f(err)`.
1336            pub fn unwrap_or_else<Params, S: IntoStep<Err, T, Params>>(
1337                self,
1338                f: S,
1339                registry: &Registry,
1340            ) -> $Builder<$U, T, UnwrapOrElseResultNode<Chain, S::Step>> {
1341                $Builder {
1342                    chain: UnwrapOrElseResultNode {
1343                        prev: self.chain,
1344                        step: f.into_step(registry),
1345                    },
1346                    _marker: PhantomData,
1347                }
1348            }
1349        }
1350    };
1351}
1352
1353impl_dag_combinators!(builder: DagChain, upstream: E);
1354impl_dag_combinators!(builder: DagArm, upstream: In);
1355
1356// =============================================================================
1357// Merge / Join named nodes — fork terminal nodes
1358// =============================================================================
1359
1360/// Merge two fork arms into a single output via [`MergeStepCall`].
1361#[doc(hidden)]
1362pub struct MergeNode2<Chain, C0, C1, MS, ForkOut, A0, A1, MOut> {
1363    pub(crate) chain: Chain,
1364    pub(crate) arm0: C0,
1365    pub(crate) arm1: C1,
1366    pub(crate) merge: MS,
1367    pub(crate) _marker: PhantomData<fn(ForkOut) -> (A0, A1, MOut)>,
1368}
1369
1370impl<In, Chain, C0, C1, MS, ForkOut, A0, A1, MOut> ChainCall<In>
1371    for MergeNode2<Chain, C0, C1, MS, ForkOut, A0, A1, MOut>
1372where
1373    ForkOut: 'static,
1374    A0: 'static,
1375    A1: 'static,
1376    Chain: ChainCall<In, Out = ForkOut>,
1377    C0: for<'a> ChainCall<&'a ForkOut, Out = A0>,
1378    C1: for<'a> ChainCall<&'a ForkOut, Out = A1>,
1379    MS: for<'x> MergeStepCall<(&'x A0, &'x A1), MOut>,
1380{
1381    type Out = MOut;
1382
1383    #[inline(always)]
1384    fn call(&mut self, world: &mut World, input: In) -> MOut {
1385        let fork_out = self.chain.call(world, input);
1386        let o0 = self.arm0.call(world, &fork_out);
1387        let o1 = self.arm1.call(world, &fork_out);
1388        self.merge.call(world, (&o0, &o1))
1389    }
1390}
1391
1392/// Merge three fork arms into a single output via [`MergeStepCall`].
1393#[doc(hidden)]
1394pub struct MergeNode3<Chain, C0, C1, C2, MS, ForkOut, A0, A1, A2, MOut> {
1395    pub(crate) chain: Chain,
1396    pub(crate) arm0: C0,
1397    pub(crate) arm1: C1,
1398    pub(crate) arm2: C2,
1399    pub(crate) merge: MS,
1400    pub(crate) _marker: PhantomData<fn(ForkOut) -> (A0, A1, A2, MOut)>,
1401}
1402
1403impl<In, Chain, C0, C1, C2, MS, ForkOut, A0, A1, A2, MOut> ChainCall<In>
1404    for MergeNode3<Chain, C0, C1, C2, MS, ForkOut, A0, A1, A2, MOut>
1405where
1406    ForkOut: 'static,
1407    A0: 'static,
1408    A1: 'static,
1409    A2: 'static,
1410    Chain: ChainCall<In, Out = ForkOut>,
1411    C0: for<'a> ChainCall<&'a ForkOut, Out = A0>,
1412    C1: for<'a> ChainCall<&'a ForkOut, Out = A1>,
1413    C2: for<'a> ChainCall<&'a ForkOut, Out = A2>,
1414    MS: for<'x> MergeStepCall<(&'x A0, &'x A1, &'x A2), MOut>,
1415{
1416    type Out = MOut;
1417
1418    #[inline(always)]
1419    fn call(&mut self, world: &mut World, input: In) -> MOut {
1420        let fork_out = self.chain.call(world, input);
1421        let o0 = self.arm0.call(world, &fork_out);
1422        let o1 = self.arm1.call(world, &fork_out);
1423        let o2 = self.arm2.call(world, &fork_out);
1424        self.merge.call(world, (&o0, &o1, &o2))
1425    }
1426}
1427
1428/// Merge four fork arms into a single output via [`MergeStepCall`].
1429#[doc(hidden)]
1430pub struct MergeNode4<Chain, C0, C1, C2, C3, MS, ForkOut, A0, A1, A2, A3, MOut> {
1431    pub(crate) chain: Chain,
1432    pub(crate) arm0: C0,
1433    pub(crate) arm1: C1,
1434    pub(crate) arm2: C2,
1435    pub(crate) arm3: C3,
1436    pub(crate) merge: MS,
1437    pub(crate) _marker: PhantomData<fn(ForkOut) -> (A0, A1, A2, A3, MOut)>,
1438}
1439
1440#[allow(clippy::many_single_char_names)]
1441impl<In, Chain, C0, C1, C2, C3, MS, ForkOut, A0, A1, A2, A3, MOut> ChainCall<In>
1442    for MergeNode4<Chain, C0, C1, C2, C3, MS, ForkOut, A0, A1, A2, A3, MOut>
1443where
1444    ForkOut: 'static,
1445    A0: 'static,
1446    A1: 'static,
1447    A2: 'static,
1448    A3: 'static,
1449    Chain: ChainCall<In, Out = ForkOut>,
1450    C0: for<'a> ChainCall<&'a ForkOut, Out = A0>,
1451    C1: for<'a> ChainCall<&'a ForkOut, Out = A1>,
1452    C2: for<'a> ChainCall<&'a ForkOut, Out = A2>,
1453    C3: for<'a> ChainCall<&'a ForkOut, Out = A3>,
1454    MS: for<'x> MergeStepCall<(&'x A0, &'x A1, &'x A2, &'x A3), MOut>,
1455{
1456    type Out = MOut;
1457
1458    #[inline(always)]
1459    fn call(&mut self, world: &mut World, input: In) -> MOut {
1460        let fork_out = self.chain.call(world, input);
1461        let o0 = self.arm0.call(world, &fork_out);
1462        let o1 = self.arm1.call(world, &fork_out);
1463        let o2 = self.arm2.call(world, &fork_out);
1464        let o3 = self.arm3.call(world, &fork_out);
1465        self.merge.call(world, (&o0, &o1, &o2, &o3))
1466    }
1467}
1468
1469/// Join two fork arms (all producing `()`).
1470#[doc(hidden)]
1471pub struct JoinNode2<Chain, C0, C1, ForkOut> {
1472    pub(crate) chain: Chain,
1473    pub(crate) arm0: C0,
1474    pub(crate) arm1: C1,
1475    pub(crate) _marker: PhantomData<fn() -> ForkOut>,
1476}
1477
1478impl<In, Chain, C0, C1, ForkOut> ChainCall<In> for JoinNode2<Chain, C0, C1, ForkOut>
1479where
1480    ForkOut: 'static,
1481    Chain: ChainCall<In, Out = ForkOut>,
1482    C0: for<'a> ChainCall<&'a ForkOut, Out = ()>,
1483    C1: for<'a> ChainCall<&'a ForkOut, Out = ()>,
1484{
1485    type Out = ();
1486
1487    #[inline(always)]
1488    fn call(&mut self, world: &mut World, input: In) {
1489        let fork_out = self.chain.call(world, input);
1490        self.arm0.call(world, &fork_out);
1491        self.arm1.call(world, &fork_out);
1492    }
1493}
1494
1495/// Join three fork arms (all producing `()`).
1496#[doc(hidden)]
1497pub struct JoinNode3<Chain, C0, C1, C2, ForkOut> {
1498    pub(crate) chain: Chain,
1499    pub(crate) arm0: C0,
1500    pub(crate) arm1: C1,
1501    pub(crate) arm2: C2,
1502    pub(crate) _marker: PhantomData<fn() -> ForkOut>,
1503}
1504
1505impl<In, Chain, C0, C1, C2, ForkOut> ChainCall<In> for JoinNode3<Chain, C0, C1, C2, ForkOut>
1506where
1507    ForkOut: 'static,
1508    Chain: ChainCall<In, Out = ForkOut>,
1509    C0: for<'a> ChainCall<&'a ForkOut, Out = ()>,
1510    C1: for<'a> ChainCall<&'a ForkOut, Out = ()>,
1511    C2: for<'a> ChainCall<&'a ForkOut, Out = ()>,
1512{
1513    type Out = ();
1514
1515    #[inline(always)]
1516    fn call(&mut self, world: &mut World, input: In) {
1517        let fork_out = self.chain.call(world, input);
1518        self.arm0.call(world, &fork_out);
1519        self.arm1.call(world, &fork_out);
1520        self.arm2.call(world, &fork_out);
1521    }
1522}
1523
1524/// Join four fork arms (all producing `()`).
1525#[doc(hidden)]
1526pub struct JoinNode4<Chain, C0, C1, C2, C3, ForkOut> {
1527    pub(crate) chain: Chain,
1528    pub(crate) arm0: C0,
1529    pub(crate) arm1: C1,
1530    pub(crate) arm2: C2,
1531    pub(crate) arm3: C3,
1532    pub(crate) _marker: PhantomData<fn() -> ForkOut>,
1533}
1534
1535#[allow(clippy::many_single_char_names)]
1536impl<In, Chain, C0, C1, C2, C3, ForkOut> ChainCall<In> for JoinNode4<Chain, C0, C1, C2, C3, ForkOut>
1537where
1538    ForkOut: 'static,
1539    Chain: ChainCall<In, Out = ForkOut>,
1540    C0: for<'a> ChainCall<&'a ForkOut, Out = ()>,
1541    C1: for<'a> ChainCall<&'a ForkOut, Out = ()>,
1542    C2: for<'a> ChainCall<&'a ForkOut, Out = ()>,
1543    C3: for<'a> ChainCall<&'a ForkOut, Out = ()>,
1544{
1545    type Out = ();
1546
1547    #[inline(always)]
1548    fn call(&mut self, world: &mut World, input: In) {
1549        let fork_out = self.chain.call(world, input);
1550        self.arm0.call(world, &fork_out);
1551        self.arm1.call(world, &fork_out);
1552        self.arm2.call(world, &fork_out);
1553        self.arm3.call(world, &fork_out);
1554    }
1555}
1556
1557// =============================================================================
1558// Splat — tuple destructuring into individual reference arguments (DAG)
1559// =============================================================================
1560//
1561// DAG splat reuses IntoMergeStep/MergeStepCall since DAG steps take inputs
1562// by reference — the function signature is the same as a merge step:
1563// `fn(Params..., &A, &B) -> Out`.
1564//
1565// Builder types are `#[doc(hidden)]` — users only see `.splat().then()`.
1566
1567macro_rules! define_dag_splat_builders {
1568    (
1569        $N:literal,
1570        chain: $SplatChain:ident,
1571        arm: $SplatArm:ident,
1572        arm_start: $SplatArmStart:ident,
1573        splat_then: $SplatThenNode:ident,
1574        splat_arm_start: $SplatArmStartNode:ident,
1575        ($($T:ident),+),
1576        ($($idx:tt),+)
1577    ) => {
1578        // -- Named node: splat + step on upstream chain --
1579
1580        #[doc(hidden)]
1581        pub struct $SplatThenNode<Chain, MS, $($T,)+ NewOut> {
1582            pub(crate) chain: Chain,
1583            pub(crate) merge: MS,
1584            pub(crate) _marker: PhantomData<fn() -> ($($T,)+ NewOut)>,
1585        }
1586
1587        impl<In, Chain, MS, $($T: 'static,)+ NewOut> ChainCall<In>
1588            for $SplatThenNode<Chain, MS, $($T,)+ NewOut>
1589        where
1590            Chain: ChainCall<In, Out = ($($T,)+)>,
1591            MS: for<'x> MergeStepCall<($(&'x $T,)+), NewOut>,
1592        {
1593            type Out = NewOut;
1594
1595            #[inline(always)]
1596            fn call(&mut self, world: &mut World, input: In) -> NewOut {
1597                let tuple = self.chain.call(world, input);
1598                self.merge.call(world, ($(&tuple.$idx,)+))
1599            }
1600        }
1601
1602        // -- Named node: splat at arm start (no upstream chain) --
1603
1604        #[doc(hidden)]
1605        pub struct $SplatArmStartNode<MS, $($T,)+ Out> {
1606            pub(crate) merge: MS,
1607            pub(crate) _marker: PhantomData<fn() -> ($($T,)+ Out)>,
1608        }
1609
1610        impl<'inp, $($T: 'static,)+ MS, Out> ChainCall<&'inp ($($T,)+)>
1611            for $SplatArmStartNode<MS, $($T,)+ Out>
1612        where
1613            MS: for<'x> MergeStepCall<($(&'x $T,)+), Out>,
1614        {
1615            type Out = Out;
1616
1617            #[inline(always)]
1618            fn call(&mut self, world: &mut World, input: &($($T,)+)) -> Out {
1619                self.merge.call(world, ($(&input.$idx,)+))
1620            }
1621        }
1622
1623        // -- Splat builder on main chain --
1624
1625        /// DAG splat builder on the main chain.
1626        #[doc(hidden)]
1627        pub struct $SplatChain<E, $($T,)+ Chain> {
1628            chain: Chain,
1629            _marker: PhantomData<fn(E) -> ($($T,)+)>,
1630        }
1631
1632        impl<E, $($T: 'static,)+ Chain> $SplatChain<E, $($T,)+ Chain> {
1633            /// Add a step that receives the tuple elements as individual `&T` arguments.
1634            pub fn then<NewOut, Params, S>(
1635                self,
1636                f: S,
1637                registry: &Registry,
1638            ) -> DagChain<E, NewOut, $SplatThenNode<Chain, S::Step, $($T,)+ NewOut>>
1639            where
1640                NewOut: 'static,
1641                S: IntoMergeStep<($(&'static $T,)+), NewOut, Params>,
1642                S::Step: for<'x> MergeStepCall<($(&'x $T,)+), NewOut>,
1643            {
1644                DagChain {
1645                    chain: $SplatThenNode {
1646                        chain: self.chain,
1647                        merge: f.into_merge_step(registry),
1648                        _marker: PhantomData,
1649                    },
1650                    _marker: PhantomData,
1651                }
1652            }
1653        }
1654
1655        impl<E, $($T: 'static,)+ Chain> DagChain<E, ($($T,)+), Chain> {
1656            /// Destructure the tuple output into individual `&T` arguments.
1657            pub fn splat(self) -> $SplatChain<E, $($T,)+ Chain> {
1658                $SplatChain {
1659                    chain: self.chain,
1660                    _marker: PhantomData,
1661                }
1662            }
1663        }
1664
1665        // -- Splat builder within an arm --
1666
1667        /// DAG splat builder within an arm.
1668        #[doc(hidden)]
1669        pub struct $SplatArm<In, $($T,)+ Chain> {
1670            chain: Chain,
1671            _marker: PhantomData<fn(*const In) -> ($($T,)+)>,
1672        }
1673
1674        impl<In: 'static, $($T: 'static,)+ Chain> $SplatArm<In, $($T,)+ Chain> {
1675            /// Add a step that receives the tuple elements as individual `&T` arguments.
1676            pub fn then<NewOut, Params, S>(
1677                self,
1678                f: S,
1679                registry: &Registry,
1680            ) -> DagArm<In, NewOut, $SplatThenNode<Chain, S::Step, $($T,)+ NewOut>>
1681            where
1682                NewOut: 'static,
1683                S: IntoMergeStep<($(&'static $T,)+), NewOut, Params>,
1684                S::Step: for<'x> MergeStepCall<($(&'x $T,)+), NewOut>,
1685            {
1686                DagArm {
1687                    chain: $SplatThenNode {
1688                        chain: self.chain,
1689                        merge: f.into_merge_step(registry),
1690                        _marker: PhantomData,
1691                    },
1692                    _marker: PhantomData,
1693                }
1694            }
1695        }
1696
1697        impl<In: 'static, $($T: 'static,)+ Chain> DagArm<In, ($($T,)+), Chain> {
1698            /// Destructure the tuple output into individual `&T` arguments.
1699            pub fn splat(self) -> $SplatArm<In, $($T,)+ Chain> {
1700                $SplatArm {
1701                    chain: self.chain,
1702                    _marker: PhantomData,
1703                }
1704            }
1705        }
1706
1707        // -- Splat at arm start position --
1708
1709        /// DAG splat builder at arm start position.
1710        #[doc(hidden)]
1711        pub struct $SplatArmStart<$($T),+>(PhantomData<fn(($($T,)+))>);
1712
1713        impl<$($T: 'static),+> $SplatArmStart<$($T),+> {
1714            /// Add a step that receives the tuple elements as individual `&T` arguments.
1715            pub fn then<Out, Params, S>(
1716                self,
1717                f: S,
1718                registry: &Registry,
1719            ) -> DagArm<($($T,)+), Out, $SplatArmStartNode<S::Step, $($T,)+ Out>>
1720            where
1721                Out: 'static,
1722                S: IntoMergeStep<($(&'static $T,)+), Out, Params>,
1723                S::Step: for<'x> MergeStepCall<($(&'x $T,)+), Out>,
1724            {
1725                DagArm {
1726                    chain: $SplatArmStartNode {
1727                        merge: f.into_merge_step(registry),
1728                        _marker: PhantomData,
1729                    },
1730                    _marker: PhantomData,
1731                }
1732            }
1733        }
1734
1735        impl<$($T: 'static),+> DagArmSeed<($($T,)+)> {
1736            /// Destructure the tuple input into individual `&T` arguments.
1737            pub fn splat(self) -> $SplatArmStart<$($T),+> {
1738                $SplatArmStart(PhantomData)
1739            }
1740        }
1741    };
1742}
1743
// Instantiate the splat builders once per supported tuple arity (2 through 5).
// Each invocation supplies the names of the generated builder and node types,
// the tuple's element type parameters, and the tuple field indices used to
// borrow each element (`&tuple.0`, `&tuple.1`, ...).
// NOTE(review): the mapping from the `chain:` / `arm:` / ... labels to the
// generated types is defined by the macro header — confirm there.

// Arity 2: `(T0, T1)`.
define_dag_splat_builders!(2,
    chain: DagSplatChain2,
    arm: DagSplatArm2,
    arm_start: DagSplatArmStart2,
    splat_then: SplatThenNode2,
    splat_arm_start: SplatArmStartNode2,
    (T0, T1),
    (0, 1)
);

// Arity 3: `(T0, T1, T2)`.
define_dag_splat_builders!(3,
    chain: DagSplatChain3,
    arm: DagSplatArm3,
    arm_start: DagSplatArmStart3,
    splat_then: SplatThenNode3,
    splat_arm_start: SplatArmStartNode3,
    (T0, T1, T2),
    (0, 1, 2)
);

// Arity 4: `(T0, T1, T2, T3)`.
define_dag_splat_builders!(4,
    chain: DagSplatChain4,
    arm: DagSplatArm4,
    arm_start: DagSplatArmStart4,
    splat_then: SplatThenNode4,
    splat_arm_start: SplatArmStartNode4,
    (T0, T1, T2, T3),
    (0, 1, 2, 3)
);

// Arity 5: `(T0, T1, T2, T3, T4)`.
define_dag_splat_builders!(5,
    chain: DagSplatChain5,
    arm: DagSplatArm5,
    arm_start: DagSplatArmStart5,
    splat_then: SplatThenNode5,
    splat_arm_start: SplatArmStartNode5,
    (T0, T1, T2, T3, T4),
    (0, 1, 2, 3, 4)
);
1783
1784// =============================================================================
1785// Fork arity macro — arm accumulation, merge, join
1786// =============================================================================
1787
/// Wires up arm accumulation, `merge`, and `join` for one fork builder type.
///
/// The chain-level and arm-level fork builders share all of this logic;
/// the only difference is which builder is handed back afterwards
/// (`DagChain` vs `DagArm`). Dispatch itself lives in the named
/// `MergeNode*` / `JoinNode*` types — this macro merely constructs them.
///
/// Arguments:
/// - `fork`: the fork builder type to implement the methods on.
/// - `output`: the builder type returned by `merge` / `join`.
/// - `upstream`: identifier used for the fork's first type parameter.
macro_rules! impl_dag_fork {
    (
        fork: $Fork:ident,
        output: $Output:ident,
        upstream: $U:ident
    ) => {
        // -------------------------------------------------------------
        // Arm accumulation — every `.arm()` call grows the arm tuple by
        // one element: 0→1, 1→2, 2→3, 3→4.
        // -------------------------------------------------------------

        impl<$U, ForkOut, Chain> $Fork<$U, ForkOut, Chain, ()> {
            /// Attach the first arm to this fork.
            ///
            /// `f` receives a fresh arm seed and returns the finished arm.
            pub fn arm<ArmOut, ArmChain>(
                self,
                f: impl FnOnce(DagArmSeed<ForkOut>) -> DagArm<ForkOut, ArmOut, ArmChain>,
            ) -> $Fork<$U, ForkOut, Chain, (DagArm<ForkOut, ArmOut, ArmChain>,)> {
                // Build the arm before taking the fork apart.
                let added = f(DagArmSeed(PhantomData));
                let $Fork { chain, .. } = self;
                $Fork {
                    chain,
                    arms: (added,),
                    _marker: PhantomData,
                }
            }
        }

        impl<$U, ForkOut, Chain, A0, C0> $Fork<$U, ForkOut, Chain, (DagArm<ForkOut, A0, C0>,)> {
            /// Attach a second arm to this fork.
            pub fn arm<ArmOut, ArmChain>(
                self,
                f: impl FnOnce(DagArmSeed<ForkOut>) -> DagArm<ForkOut, ArmOut, ArmChain>,
            ) -> $Fork<
                $U,
                ForkOut,
                Chain,
                (DagArm<ForkOut, A0, C0>, DagArm<ForkOut, ArmOut, ArmChain>),
            > {
                let added = f(DagArmSeed(PhantomData));
                let $Fork {
                    chain,
                    arms: (first,),
                    ..
                } = self;
                $Fork {
                    chain,
                    arms: (first, added),
                    _marker: PhantomData,
                }
            }
        }

        impl<$U, ForkOut, Chain, A0, C0, A1, C1>
            $Fork<$U, ForkOut, Chain, (DagArm<ForkOut, A0, C0>, DagArm<ForkOut, A1, C1>)>
        {
            /// Attach a third arm to this fork.
            pub fn arm<ArmOut, ArmChain>(
                self,
                f: impl FnOnce(DagArmSeed<ForkOut>) -> DagArm<ForkOut, ArmOut, ArmChain>,
            ) -> $Fork<
                $U,
                ForkOut,
                Chain,
                (
                    DagArm<ForkOut, A0, C0>,
                    DagArm<ForkOut, A1, C1>,
                    DagArm<ForkOut, ArmOut, ArmChain>,
                ),
            > {
                let added = f(DagArmSeed(PhantomData));
                let $Fork {
                    chain,
                    arms: (first, second),
                    ..
                } = self;
                $Fork {
                    chain,
                    arms: (first, second, added),
                    _marker: PhantomData,
                }
            }
        }

        impl<$U, ForkOut, Chain, A0, C0, A1, C1, A2, C2>
            $Fork<
                $U,
                ForkOut,
                Chain,
                (
                    DagArm<ForkOut, A0, C0>,
                    DagArm<ForkOut, A1, C1>,
                    DagArm<ForkOut, A2, C2>,
                ),
            >
        {
            /// Attach a fourth arm to this fork.
            pub fn arm<ArmOut, ArmChain>(
                self,
                f: impl FnOnce(DagArmSeed<ForkOut>) -> DagArm<ForkOut, ArmOut, ArmChain>,
            ) -> $Fork<
                $U,
                ForkOut,
                Chain,
                (
                    DagArm<ForkOut, A0, C0>,
                    DagArm<ForkOut, A1, C1>,
                    DagArm<ForkOut, A2, C2>,
                    DagArm<ForkOut, ArmOut, ArmChain>,
                ),
            > {
                let added = f(DagArmSeed(PhantomData));
                let $Fork {
                    chain,
                    arms: (first, second, third),
                    ..
                } = self;
                $Fork {
                    chain,
                    arms: (first, second, third, added),
                    _marker: PhantomData,
                }
            }
        }

        // -------------------------------------------------------------
        // Two arms: merge / join
        // -------------------------------------------------------------

        impl<$U, ForkOut, Chain, A0, C0, A1, C1>
            $Fork<$U, ForkOut, Chain, (DagArm<ForkOut, A0, C0>, DagArm<ForkOut, A1, C1>)>
        where
            ForkOut: 'static,
            A0: 'static,
            A1: 'static,
        {
            /// Combine the outputs of two arms with a merge step.
            ///
            /// The step is resolved against `registry` here, at build time.
            pub fn merge<Merged, Params, S>(
                self,
                f: S,
                registry: &Registry,
            ) -> $Output<
                $U,
                Merged,
                MergeNode2<Chain, C0, C1, S::Step, ForkOut, A0, A1, Merged>,
            >
            where
                Merged: 'static,
                S: IntoMergeStep<(&'static A0, &'static A1), Merged, Params>,
                S::Step: for<'x> MergeStepCall<(&'x A0, &'x A1), Merged>,
            {
                let $Fork {
                    chain,
                    arms: (first, second),
                    ..
                } = self;
                let merge = f.into_merge_step(registry);
                $Output {
                    chain: MergeNode2 {
                        chain,
                        arm0: first.chain,
                        arm1: second.chain,
                        merge,
                        _marker: PhantomData,
                    },
                    _marker: PhantomData,
                }
            }
        }

        impl<$U, ForkOut, Chain, C0, C1>
            $Fork<$U, ForkOut, Chain, (DagArm<ForkOut, (), C0>, DagArm<ForkOut, (), C1>)>
        where
            ForkOut: 'static,
        {
            /// Close a fork whose two arms are sinks (each yields `()`).
            pub fn join(self) -> $Output<$U, (), JoinNode2<Chain, C0, C1, ForkOut>> {
                let $Fork {
                    chain,
                    arms: (first, second),
                    ..
                } = self;
                $Output {
                    chain: JoinNode2 {
                        chain,
                        arm0: first.chain,
                        arm1: second.chain,
                        _marker: PhantomData,
                    },
                    _marker: PhantomData,
                }
            }
        }

        // -------------------------------------------------------------
        // Three arms: merge / join
        // -------------------------------------------------------------

        impl<$U, ForkOut, Chain, A0, C0, A1, C1, A2, C2>
            $Fork<
                $U,
                ForkOut,
                Chain,
                (
                    DagArm<ForkOut, A0, C0>,
                    DagArm<ForkOut, A1, C1>,
                    DagArm<ForkOut, A2, C2>,
                ),
            >
        where
            ForkOut: 'static,
            A0: 'static,
            A1: 'static,
            A2: 'static,
        {
            /// Combine the outputs of three arms with a merge step.
            pub fn merge<Merged, Params, S>(
                self,
                f: S,
                registry: &Registry,
            ) -> $Output<
                $U,
                Merged,
                MergeNode3<Chain, C0, C1, C2, S::Step, ForkOut, A0, A1, A2, Merged>,
            >
            where
                Merged: 'static,
                S: IntoMergeStep<(&'static A0, &'static A1, &'static A2), Merged, Params>,
                S::Step: for<'x> MergeStepCall<(&'x A0, &'x A1, &'x A2), Merged>,
            {
                let $Fork {
                    chain,
                    arms: (first, second, third),
                    ..
                } = self;
                let merge = f.into_merge_step(registry);
                $Output {
                    chain: MergeNode3 {
                        chain,
                        arm0: first.chain,
                        arm1: second.chain,
                        arm2: third.chain,
                        merge,
                        _marker: PhantomData,
                    },
                    _marker: PhantomData,
                }
            }
        }

        impl<$U, ForkOut, Chain, C0, C1, C2>
            $Fork<
                $U,
                ForkOut,
                Chain,
                (
                    DagArm<ForkOut, (), C0>,
                    DagArm<ForkOut, (), C1>,
                    DagArm<ForkOut, (), C2>,
                ),
            >
        where
            ForkOut: 'static,
        {
            /// Close a fork whose three arms are sinks (each yields `()`).
            pub fn join(self) -> $Output<$U, (), JoinNode3<Chain, C0, C1, C2, ForkOut>> {
                let $Fork {
                    chain,
                    arms: (first, second, third),
                    ..
                } = self;
                $Output {
                    chain: JoinNode3 {
                        chain,
                        arm0: first.chain,
                        arm1: second.chain,
                        arm2: third.chain,
                        _marker: PhantomData,
                    },
                    _marker: PhantomData,
                }
            }
        }

        // -------------------------------------------------------------
        // Four arms: merge / join
        // -------------------------------------------------------------

        #[allow(clippy::many_single_char_names)]
        impl<$U, ForkOut, Chain, A0, C0, A1, C1, A2, C2, A3, C3>
            $Fork<
                $U,
                ForkOut,
                Chain,
                (
                    DagArm<ForkOut, A0, C0>,
                    DagArm<ForkOut, A1, C1>,
                    DagArm<ForkOut, A2, C2>,
                    DagArm<ForkOut, A3, C3>,
                ),
            >
        where
            ForkOut: 'static,
            A0: 'static,
            A1: 'static,
            A2: 'static,
            A3: 'static,
        {
            /// Combine the outputs of four arms with a merge step.
            pub fn merge<Merged, Params, S>(
                self,
                f: S,
                registry: &Registry,
            ) -> $Output<
                $U,
                Merged,
                MergeNode4<Chain, C0, C1, C2, C3, S::Step, ForkOut, A0, A1, A2, A3, Merged>,
            >
            where
                Merged: 'static,
                S: IntoMergeStep<
                    (&'static A0, &'static A1, &'static A2, &'static A3),
                    Merged,
                    Params,
                >,
                S::Step: for<'x> MergeStepCall<(&'x A0, &'x A1, &'x A2, &'x A3), Merged>,
            {
                let $Fork {
                    chain,
                    arms: (first, second, third, fourth),
                    ..
                } = self;
                let merge = f.into_merge_step(registry);
                $Output {
                    chain: MergeNode4 {
                        chain,
                        arm0: first.chain,
                        arm1: second.chain,
                        arm2: third.chain,
                        arm3: fourth.chain,
                        merge,
                        _marker: PhantomData,
                    },
                    _marker: PhantomData,
                }
            }
        }

        impl<$U, ForkOut, Chain, C0, C1, C2, C3>
            $Fork<
                $U,
                ForkOut,
                Chain,
                (
                    DagArm<ForkOut, (), C0>,
                    DagArm<ForkOut, (), C1>,
                    DagArm<ForkOut, (), C2>,
                    DagArm<ForkOut, (), C3>,
                ),
            >
        where
            ForkOut: 'static,
        {
            /// Close a fork whose four arms are sinks (each yields `()`).
            pub fn join(self) -> $Output<$U, (), JoinNode4<Chain, C0, C1, C2, C3, ForkOut>> {
                let $Fork {
                    chain,
                    arms: (first, second, third, fourth),
                    ..
                } = self;
                $Output {
                    chain: JoinNode4 {
                        chain,
                        arm0: first.chain,
                        arm1: second.chain,
                        arm2: third.chain,
                        arm3: fourth.chain,
                        _marker: PhantomData,
                    },
                    _marker: PhantomData,
                }
            }
        }
    };
}
2129
// Fork on the main chain: `merge` / `join` hand back a `DagChain`, and the
// upstream type parameter is named `E`.
impl_dag_fork!(fork: DagChainFork, output: DagChain, upstream: E);
// Fork inside an arm: `merge` / `join` hand back a `DagArm`, and the
// upstream type parameter is named `In`.
impl_dag_fork!(fork: DagArmFork, output: DagArm, upstream: In);
2132
2133// =============================================================================
2134// build_batch — when Out: PipelineOutput (() or Option<()>)
2135// =============================================================================
2136
2137impl<E, Out: crate::PipelineOutput, Chain: ChainCall<E, Out = Out>> DagChain<E, Out, Chain> {
2138    /// Finalize into a [`BatchDag`] with a pre-allocated input buffer.
2139    ///
2140    /// Same DAG chain as [`build`](DagChain::build), but the DAG owns an
2141    /// input buffer that drivers fill between dispatch cycles. Each call
2142    /// to [`BatchDag::run`] drains the buffer, running every item through
2143    /// the chain independently.
2144    ///
2145    /// Available when the DAG ends with `()` or `Option<()>` (e.g.
2146    /// after `.guard()` or `.filter()` followed by `.unwrap_or(())`).
2147    ///
2148    /// `capacity` is the initial allocation — the buffer can grow if needed,
2149    /// but sizing it for the expected batch size avoids reallocation.
2150    #[must_use = "building a DAG without storing it does nothing"]
2151    pub fn build_batch(self, capacity: usize) -> BatchDag<E, Chain> {
2152        BatchDag {
2153            input: Vec::with_capacity(capacity),
2154            chain: self.chain,
2155        }
2156    }
2157}
2158
2159// =============================================================================
2160// BatchDag<E, F> — DAG with owned input buffer
2161// =============================================================================
2162
/// Batch DAG that owns a pre-allocated input buffer.
///
/// Created by [`DagChain::build_batch`]. Each item flows through the
/// full DAG chain independently — the same per-item `Option` and
/// `Result` flow control as [`Dag`]. Errors are handled inline (via
/// `.catch()`, `.unwrap_or()`, etc.) and the batch continues to the
/// next item.
///
/// Unlike [`Dag`], `BatchDag` does not implement [`Handler`] — it is
/// driven directly by the owner via [`run()`](BatchDag::run).
///
/// Type parameters: `E` is the item type held in the input buffer and
/// `F` is the composed chain every item is passed to.
///
/// # Examples
///
/// ```
/// use nexus_rt::{WorldBuilder, ResMut, Resource};
/// use nexus_rt::dag::DagBuilder;
///
/// #[derive(Resource)]
/// struct Accum(u64);
///
/// let mut wb = WorldBuilder::new();
/// wb.register(Accum(0));
/// let mut world = wb.build();
/// let reg = world.registry();
///
/// fn double(x: u32) -> u64 { x as u64 * 2 }
/// fn store(mut out: ResMut<Accum>, val: &u64) { out.0 += *val; }
///
/// let mut batch = DagBuilder::<u32>::new()
///     .root(double, reg)
///     .then(store, reg)
///     .build_batch(8);
///
/// batch.input_mut().extend([1, 2, 3]);
/// batch.run(&mut world);
///
/// assert_eq!(world.resource::<Accum>().0, 12); // 2 + 4 + 6
/// assert!(batch.input().is_empty());
/// ```
pub struct BatchDag<E, F> {
    // Pending items; filled by the driver and drained on every `run()`.
    input: Vec<E>,
    // The composed DAG chain invoked once per drained item.
    chain: F,
}
2206
2207impl<E, Out: crate::PipelineOutput, F: ChainCall<E, Out = Out>> BatchDag<E, F> {
2208    /// Mutable access to the input buffer. Drivers fill this between
2209    /// dispatch cycles.
2210    pub fn input_mut(&mut self) -> &mut Vec<E> {
2211        &mut self.input
2212    }
2213
2214    /// Read-only access to the input buffer.
2215    pub fn input(&self) -> &[E] {
2216        &self.input
2217    }
2218
2219    /// Drain the input buffer, running each item through the DAG.
2220    ///
2221    /// Each item gets independent `Option`/`Result` flow control — an
2222    /// error on one item does not affect subsequent items. After `run()`,
2223    /// the input buffer is empty but retains its allocation.
2224    pub fn run(&mut self, world: &mut World) {
2225        for item in self.input.drain(..) {
2226            let _ = self.chain.call(world, item);
2227        }
2228    }
2229}
2230
2231// =============================================================================
2232// resolve_arm — pre-resolve a step for manual dispatch
2233// =============================================================================
2234
2235/// Resolve a step for use in manual dispatch (e.g. inside an
2236/// opaque `.then()` closure).
2237///
2238/// Returns a closure with pre-resolved [`Param`](crate::Param) state —
2239/// the same build-time resolution that `.then()` performs, but as a
2240/// standalone value the caller can invoke from any context.
2241///
2242/// # Examples
2243///
2244/// ```ignore
2245/// let mut arm0 = resolve_arm(handle_new, reg);
2246/// let mut arm1 = resolve_arm(handle_cancel, reg);
2247///
2248/// dag.then(move |world: &mut World, msg: &Decoded| match msg.kind {
2249///     MsgKind::NewOrder => arm0(world, msg),
2250///     MsgKind::Cancel   => arm1(world, msg),
2251/// }, reg)
2252/// ```
2253pub fn resolve_arm<In, Out, Params, S>(
2254    f: S,
2255    registry: &Registry,
2256) -> impl FnMut(&mut World, &In) -> Out + use<In, Out, Params, S>
2257where
2258    In: 'static,
2259    Out: 'static,
2260    S: IntoStep<&'static In, Out, Params>,
2261    S::Step: for<'a> StepCall<&'a In, Out = Out>,
2262{
2263    let mut resolved = f.into_step(registry);
2264    move |world: &mut World, input: &In| resolved.call(world, input)
2265}
2266
2267// =============================================================================
2268// Tests
2269// =============================================================================
2270
2271#[cfg(test)]
2272#[allow(
2273    clippy::ref_option,
2274    clippy::unnecessary_wraps,
2275    clippy::needless_pass_by_value,
2276    clippy::trivially_copy_pass_by_ref,
2277    clippy::ptr_arg
2278)]
2279mod tests {
2280    use super::*;
2281    use crate::{IntoHandler, Res, ResMut, Virtual, WorldBuilder};
2282
2283    // -- Linear chains --
2284
2285    #[test]
2286    fn dag_linear_2() {
2287        let mut wb = WorldBuilder::new();
2288        wb.register::<u64>(0);
2289        let mut world = wb.build();
2290        let reg = world.registry();
2291
2292        fn root_mul2(x: u32) -> u64 {
2293            x as u64 * 2
2294        }
2295        fn store(mut out: ResMut<u64>, val: &u64) {
2296            *out = *val;
2297        }
2298
2299        let mut dag = DagBuilder::<u32>::new()
2300            .root(root_mul2, reg)
2301            .then(store, reg)
2302            .build();
2303
2304        dag.run(&mut world, 5u32);
2305        assert_eq!(*world.resource::<u64>(), 10);
2306    }
2307
2308    #[test]
2309    fn dag_linear_3() {
2310        let mut wb = WorldBuilder::new();
2311        wb.register::<u64>(0);
2312        let mut world = wb.build();
2313        let reg = world.registry();
2314
2315        fn root_mul2(x: u32) -> u64 {
2316            x as u64 * 2
2317        }
2318        fn add_one(val: &u64) -> u64 {
2319            *val + 1
2320        }
2321        fn store(mut out: ResMut<u64>, val: &u64) {
2322            *out = *val;
2323        }
2324
2325        let mut dag = DagBuilder::<u32>::new()
2326            .root(root_mul2, reg)
2327            .then(add_one, reg)
2328            .then(store, reg)
2329            .build();
2330
2331        dag.run(&mut world, 5u32);
2332        assert_eq!(*world.resource::<u64>(), 11); // (5*2)+1
2333    }
2334
2335    #[test]
2336    fn dag_linear_5() {
2337        let mut wb = WorldBuilder::new();
2338        wb.register::<u64>(0);
2339        let mut world = wb.build();
2340        let reg = world.registry();
2341
2342        fn root_id(x: u32) -> u64 {
2343            x as u64
2344        }
2345        fn add_one(val: &u64) -> u64 {
2346            *val + 1
2347        }
2348        fn store(mut out: ResMut<u64>, val: &u64) {
2349            *out = *val;
2350        }
2351
2352        let mut dag = DagBuilder::<u32>::new()
2353            .root(root_id, reg)
2354            .then(add_one, reg)
2355            .then(add_one, reg)
2356            .then(add_one, reg)
2357            .then(store, reg)
2358            .build();
2359
2360        dag.run(&mut world, 0u32);
2361        assert_eq!(*world.resource::<u64>(), 3); // 0+1+1+1
2362    }
2363
2364    // -- Diamond: root → [a, b] → merge → sink --
2365
2366    #[test]
2367    fn dag_diamond() {
2368        let mut wb = WorldBuilder::new();
2369        wb.register::<u64>(0);
2370        let mut world = wb.build();
2371        let reg = world.registry();
2372
2373        fn root_mul2(x: u32) -> u32 {
2374            x.wrapping_mul(2)
2375        }
2376        fn add_one(val: &u32) -> u32 {
2377            val.wrapping_add(1)
2378        }
2379        fn mul3(val: &u32) -> u32 {
2380            val.wrapping_mul(3)
2381        }
2382        fn merge_add(a: &u32, b: &u32) -> u32 {
2383            a.wrapping_add(*b)
2384        }
2385        fn store(mut out: ResMut<u64>, val: &u32) {
2386            *out = *val as u64;
2387        }
2388
2389        let mut dag = DagBuilder::<u32>::new()
2390            .root(root_mul2, reg)
2391            .fork()
2392            .arm(|a| a.then(add_one, reg))
2393            .arm(|b| b.then(mul3, reg))
2394            .merge(merge_add, reg)
2395            .then(store, reg)
2396            .build();
2397
2398        dag.run(&mut world, 5u32);
2399        // root: 10, arm_a: 11, arm_b: 30, merge: 41
2400        assert_eq!(*world.resource::<u64>(), 41);
2401    }
2402
2403    // -- Fan-out to sinks (.join()) --
2404
2405    #[test]
2406    fn dag_fan_out_join() {
2407        let mut wb = WorldBuilder::new();
2408        wb.register::<u64>(0);
2409        wb.register::<i64>(0);
2410        let mut world = wb.build();
2411        let reg = world.registry();
2412
2413        fn root_id(x: u32) -> u64 {
2414            x as u64
2415        }
2416        fn sink_u64(mut out: ResMut<u64>, val: &u64) {
2417            *out = *val * 2;
2418        }
2419        fn sink_i64(mut out: ResMut<i64>, val: &u64) {
2420            *out = *val as i64 * 3;
2421        }
2422
2423        let mut dag = DagBuilder::<u32>::new()
2424            .root(root_id, reg)
2425            .fork()
2426            .arm(|a| a.then(sink_u64, reg))
2427            .arm(|b| b.then(sink_i64, reg))
2428            .join()
2429            .build();
2430
2431        dag.run(&mut world, 5u32);
2432        assert_eq!(*world.resource::<u64>(), 10);
2433        assert_eq!(*world.resource::<i64>(), 15);
2434    }
2435
2436    // -- Nested fork within an arm --
2437
2438    #[test]
2439    fn dag_nested_fork() {
2440        let mut wb = WorldBuilder::new();
2441        wb.register::<u64>(0);
2442        let mut world = wb.build();
2443        let reg = world.registry();
2444
2445        fn root_id(x: u32) -> u32 {
2446            x
2447        }
2448        fn add_10(val: &u32) -> u32 {
2449            val.wrapping_add(10)
2450        }
2451        fn mul2(val: &u32) -> u32 {
2452            val.wrapping_mul(2)
2453        }
2454        fn mul3(val: &u32) -> u32 {
2455            val.wrapping_mul(3)
2456        }
2457        fn inner_merge(a: &u32, b: &u32) -> u32 {
2458            a.wrapping_add(*b)
2459        }
2460        fn outer_merge(a: &u32, b: &u32) -> u32 {
2461            a.wrapping_add(*b)
2462        }
2463        fn store(mut out: ResMut<u64>, val: &u32) {
2464            *out = *val as u64;
2465        }
2466
2467        // root(5)=5 → fork
2468        //   arm_a: add_10(5)=15 → fork
2469        //     sub_c: mul2(15)=30
2470        //     sub_d: mul3(15)=45
2471        //     inner_merge(30,45)=75
2472        //   arm_b: mul3(5)=15
2473        // outer_merge(75,15)=90
2474        let mut dag = DagBuilder::<u32>::new()
2475            .root(root_id, reg)
2476            .fork()
2477            .arm(|a| {
2478                a.then(add_10, reg)
2479                    .fork()
2480                    .arm(|c| c.then(mul2, reg))
2481                    .arm(|d| d.then(mul3, reg))
2482                    .merge(inner_merge, reg)
2483            })
2484            .arm(|b| b.then(mul3, reg))
2485            .merge(outer_merge, reg)
2486            .then(store, reg)
2487            .build();
2488
2489        dag.run(&mut world, 5u32);
2490        assert_eq!(*world.resource::<u64>(), 90);
2491    }
2492
2493    // -- Complex topology: asymmetric arm lengths --
2494
2495    #[test]
2496    fn dag_complex_topology() {
2497        let mut wb = WorldBuilder::new();
2498        wb.register::<u64>(0);
2499        let mut world = wb.build();
2500        let reg = world.registry();
2501
2502        fn root_mul2(x: u32) -> u32 {
2503            x.wrapping_mul(2)
2504        }
2505        fn add_one(val: &u32) -> u32 {
2506            val.wrapping_add(1)
2507        }
2508        fn add_then_mul2(val: &u32) -> u32 {
2509            val.wrapping_add(1).wrapping_mul(2)
2510        }
2511        fn mul3(val: &u32) -> u32 {
2512            val.wrapping_mul(3)
2513        }
2514        fn merge_add(a: &u32, b: &u32) -> u32 {
2515            a.wrapping_add(*b)
2516        }
2517        fn store(mut out: ResMut<u64>, val: &u32) {
2518            *out = *val as u64;
2519        }
2520
2521        // root(5)=10 → fork
2522        //   a: add_one(10)=11 → add_then_mul2(11)=24
2523        //   b: mul3(10)=30
2524        // merge(24, 30) = 54
2525        let mut dag = DagBuilder::<u32>::new()
2526            .root(root_mul2, reg)
2527            .fork()
2528            .arm(|a| a.then(add_one, reg).then(add_then_mul2, reg))
2529            .arm(|b| b.then(mul3, reg))
2530            .merge(merge_add, reg)
2531            .then(store, reg)
2532            .build();
2533
2534        dag.run(&mut world, 5u32);
2535        assert_eq!(*world.resource::<u64>(), 54);
2536    }
2537
2538    // -- Boxable into Box<dyn Handler<E>> --
2539
2540    #[test]
2541    fn dag_boxable() {
2542        let mut wb = WorldBuilder::new();
2543        wb.register::<u64>(0);
2544        let mut world = wb.build();
2545        let reg = world.registry();
2546
2547        fn root_id(x: u32) -> u64 {
2548            x as u64
2549        }
2550        fn store(mut out: ResMut<u64>, val: &u64) {
2551            *out = *val;
2552        }
2553
2554        let mut boxed: Virtual<u32> = Box::new(
2555            DagBuilder::<u32>::new()
2556                .root(root_id, reg)
2557                .then(store, reg)
2558                .build(),
2559        );
2560        boxed.run(&mut world, 77u32);
2561        assert_eq!(*world.resource::<u64>(), 77);
2562    }
2563
2564    // -- World access (Res<T>, ResMut<T>) in nodes --
2565
2566    #[test]
2567    fn dag_world_access() {
2568        let mut wb = WorldBuilder::new();
2569        wb.register::<u64>(10); // factor
2570        wb.register::<String>(String::new());
2571        let mut world = wb.build();
2572        let reg = world.registry();
2573
2574        fn scale(factor: Res<u64>, val: &u32) -> u64 {
2575            *factor * (*val as u64)
2576        }
2577        fn store(mut out: ResMut<String>, val: &u64) {
2578            *out = val.to_string();
2579        }
2580
2581        let mut dag = DagBuilder::<u32>::new()
2582            .root(|x: u32| x, reg)
2583            .then(scale, reg)
2584            .then(store, reg)
2585            .build();
2586
2587        dag.run(&mut world, 7u32);
2588        assert_eq!(world.resource::<String>().as_str(), "70");
2589    }
2590
2591    // -- Root-only (terminal root outputting ()) --
2592
2593    #[test]
2594    fn dag_root_only() {
2595        let mut wb = WorldBuilder::new();
2596        wb.register::<u64>(0);
2597        let mut world = wb.build();
2598        let reg = world.registry();
2599
2600        let mut dag = DagBuilder::<u32>::new()
2601            .root(
2602                |mut out: ResMut<u64>, x: u32| {
2603                    *out = x as u64;
2604                },
2605                reg,
2606            )
2607            .build();
2608
2609        dag.run(&mut world, 42u32);
2610        assert_eq!(*world.resource::<u64>(), 42);
2611    }
2612
2613    // -- Multiple dispatches reuse state --
2614
2615    #[test]
2616    fn dag_multiple_dispatches() {
2617        let mut wb = WorldBuilder::new();
2618        wb.register::<u64>(0);
2619        let mut world = wb.build();
2620        let reg = world.registry();
2621
2622        fn root_id(x: u32) -> u64 {
2623            x as u64
2624        }
2625        fn store(mut out: ResMut<u64>, val: &u64) {
2626            *out = *val;
2627        }
2628
2629        let mut dag = DagBuilder::<u32>::new()
2630            .root(root_id, reg)
2631            .then(store, reg)
2632            .build();
2633
2634        dag.run(&mut world, 1u32);
2635        assert_eq!(*world.resource::<u64>(), 1);
2636        dag.run(&mut world, 2u32);
2637        assert_eq!(*world.resource::<u64>(), 2);
2638        dag.run(&mut world, 3u32);
2639        assert_eq!(*world.resource::<u64>(), 3);
2640    }
2641
2642    // -- 3-way merge --
2643
2644    #[test]
2645    fn dag_3way_merge() {
2646        let mut wb = WorldBuilder::new();
2647        wb.register::<String>(String::new());
2648        let mut world = wb.build();
2649        let reg = world.registry();
2650
2651        fn root_id(x: u32) -> u64 {
2652            x as u64
2653        }
2654        fn mul1(val: &u64) -> u64 {
2655            *val
2656        }
2657        fn mul2(val: &u64) -> u64 {
2658            *val * 2
2659        }
2660        fn mul3(val: &u64) -> u64 {
2661            *val * 3
2662        }
2663        fn merge3_fmt(mut out: ResMut<String>, a: &u64, b: &u64, c: &u64) {
2664            *out = format!("{},{},{}", a, b, c);
2665        }
2666
2667        let mut dag = DagBuilder::<u32>::new()
2668            .root(root_id, reg)
2669            .fork()
2670            .arm(|a| a.then(mul1, reg))
2671            .arm(|b| b.then(mul2, reg))
2672            .arm(|c| c.then(mul3, reg))
2673            .merge(merge3_fmt, reg)
2674            .build();
2675
2676        dag.run(&mut world, 10u32);
2677        assert_eq!(world.resource::<String>().as_str(), "10,20,30");
2678    }
2679
2680    // -- DAG combinators --
2681
2682    #[test]
2683    fn dag_dispatch() {
2684        fn root(x: u32) -> u64 {
2685            x as u64 + 42
2686        }
2687        fn sink(mut out: ResMut<u64>, event: u64) {
2688            *out = event;
2689        }
2690        let mut wb = WorldBuilder::new();
2691        wb.register::<u64>(0);
2692        let mut world = wb.build();
2693        let reg = world.registry();
2694
2695        let mut dag = DagBuilder::<u32>::new()
2696            .root(root, reg)
2697            .dispatch(sink.into_handler(reg))
2698            .build();
2699
2700        dag.run(&mut world, 0u32);
2701        assert_eq!(*world.resource::<u64>(), 42);
2702    }
2703
2704    #[test]
2705    fn dag_option_map() {
2706        fn root(_x: u32) -> Option<u64> {
2707            Some(10)
2708        }
2709        fn double(val: &u64) -> u64 {
2710            *val * 2
2711        }
2712        fn sink(mut out: ResMut<u64>, val: &Option<u64>) {
2713            *out = val.unwrap_or(0);
2714        }
2715        let mut wb = WorldBuilder::new();
2716        wb.register::<u64>(0);
2717        let mut world = wb.build();
2718        let reg = world.registry();
2719
2720        let mut dag = DagBuilder::<u32>::new()
2721            .root(root, reg)
2722            .map(double, reg)
2723            .then(sink, reg)
2724            .build();
2725
2726        dag.run(&mut world, 0u32);
2727        assert_eq!(*world.resource::<u64>(), 20);
2728    }
2729
2730    #[test]
2731    fn dag_option_map_none() {
2732        fn root(_x: u32) -> Option<u64> {
2733            None
2734        }
2735        fn double(val: &u64) -> u64 {
2736            *val * 2
2737        }
2738        fn sink(mut out: ResMut<u64>, val: &Option<u64>) {
2739            *out = val.unwrap_or(999);
2740        }
2741        let mut wb = WorldBuilder::new();
2742        wb.register::<u64>(0);
2743        let mut world = wb.build();
2744        let reg = world.registry();
2745
2746        let mut dag = DagBuilder::<u32>::new()
2747            .root(root, reg)
2748            .map(double, reg)
2749            .then(sink, reg)
2750            .build();
2751
2752        dag.run(&mut world, 0u32);
2753        assert_eq!(*world.resource::<u64>(), 999);
2754    }
2755
2756    #[test]
2757    fn dag_option_and_then() {
2758        fn root(_x: u32) -> Option<u64> {
2759            Some(5)
2760        }
2761        fn check(val: &u64) -> Option<u64> {
2762            if *val > 3 { Some(*val * 10) } else { None }
2763        }
2764        fn sink(mut out: ResMut<u64>, val: &Option<u64>) {
2765            *out = val.unwrap_or(0);
2766        }
2767        let mut wb = WorldBuilder::new();
2768        wb.register::<u64>(0);
2769        let mut world = wb.build();
2770        let reg = world.registry();
2771
2772        let mut dag = DagBuilder::<u32>::new()
2773            .root(root, reg)
2774            .and_then(check, reg)
2775            .then(sink, reg)
2776            .build();
2777
2778        dag.run(&mut world, 0u32);
2779        assert_eq!(*world.resource::<u64>(), 50);
2780    }
2781
2782    #[test]
2783    fn dag_option_filter_keeps() {
2784        fn root(_x: u32) -> Option<u64> {
2785            Some(5)
2786        }
2787        fn sink(mut out: ResMut<u64>, val: &Option<u64>) {
2788            *out = val.unwrap_or(0);
2789        }
2790        let mut wb = WorldBuilder::new();
2791        wb.register::<u64>(0);
2792        let mut world = wb.build();
2793
2794        let mut dag = DagBuilder::<u32>::new()
2795            .root(root, world.registry())
2796            .filter(|v: &u64| *v > 3, world.registry())
2797            .then(sink, world.registry())
2798            .build();
2799
2800        dag.run(&mut world, 0u32);
2801        assert_eq!(*world.resource::<u64>(), 5);
2802    }
2803
2804    #[test]
2805    fn dag_option_filter_drops() {
2806        fn root(_x: u32) -> Option<u64> {
2807            Some(5)
2808        }
2809        fn sink(mut out: ResMut<u64>, val: &Option<u64>) {
2810            *out = val.unwrap_or(0);
2811        }
2812        let mut wb = WorldBuilder::new();
2813        wb.register::<u64>(0);
2814        let mut world = wb.build();
2815
2816        let mut dag = DagBuilder::<u32>::new()
2817            .root(root, world.registry())
2818            .filter(|v: &u64| *v > 10, world.registry())
2819            .then(sink, world.registry())
2820            .build();
2821
2822        dag.run(&mut world, 0u32);
2823        assert_eq!(*world.resource::<u64>(), 0);
2824    }
2825
2826    #[test]
2827    fn dag_option_on_none() {
2828        fn root(_x: u32) -> Option<u64> {
2829            None
2830        }
2831        fn sink(_val: &Option<u64>) {}
2832        let mut wb = WorldBuilder::new();
2833        wb.register::<bool>(false);
2834        let mut world = wb.build();
2835        let reg = world.registry();
2836
2837        let mut dag = DagBuilder::<u32>::new()
2838            .root(root, reg)
2839            .on_none(
2840                |w: &mut World| {
2841                    *w.resource_mut::<bool>() = true;
2842                },
2843                reg,
2844            )
2845            .then(sink, reg)
2846            .build();
2847
2848        dag.run(&mut world, 0u32);
2849        assert!(*world.resource::<bool>());
2850    }
2851
2852    #[test]
2853    fn dag_option_unwrap_or() {
2854        fn root(_x: u32) -> Option<u64> {
2855            None
2856        }
2857        fn sink(mut out: ResMut<u64>, val: &u64) {
2858            *out = *val;
2859        }
2860        let mut wb = WorldBuilder::new();
2861        wb.register::<u64>(0);
2862        let mut world = wb.build();
2863        let reg = world.registry();
2864
2865        let mut dag = DagBuilder::<u32>::new()
2866            .root(root, reg)
2867            .unwrap_or(42u64)
2868            .then(sink, reg)
2869            .build();
2870
2871        dag.run(&mut world, 0u32);
2872        assert_eq!(*world.resource::<u64>(), 42);
2873    }
2874
2875    #[test]
2876    fn dag_option_ok_or() {
2877        fn root(_x: u32) -> Option<u64> {
2878            None
2879        }
2880        fn sink(mut out: ResMut<u64>, val: &Result<u64, &str>) {
2881            *out = val.as_ref().map_or(999, |v| *v);
2882        }
2883        let mut wb = WorldBuilder::new();
2884        wb.register::<u64>(0);
2885        let mut world = wb.build();
2886        let reg = world.registry();
2887
2888        let mut dag = DagBuilder::<u32>::new()
2889            .root(root, reg)
2890            .ok_or("missing")
2891            .then(sink, reg)
2892            .build();
2893
2894        dag.run(&mut world, 0u32);
2895        assert_eq!(*world.resource::<u64>(), 999);
2896    }
2897
2898    #[test]
2899    fn dag_result_map() {
2900        fn root(_x: u32) -> Result<u64, &'static str> {
2901            Ok(10)
2902        }
2903        fn double(val: &u64) -> u64 {
2904            *val * 2
2905        }
2906        fn sink(mut out: ResMut<u64>, val: &Result<u64, &str>) {
2907            *out = val.as_ref().copied().unwrap_or(0);
2908        }
2909        let mut wb = WorldBuilder::new();
2910        wb.register::<u64>(0);
2911        let mut world = wb.build();
2912        let reg = world.registry();
2913
2914        let mut dag = DagBuilder::<u32>::new()
2915            .root(root, reg)
2916            .map(double, reg)
2917            .then(sink, reg)
2918            .build();
2919
2920        dag.run(&mut world, 0u32);
2921        assert_eq!(*world.resource::<u64>(), 20);
2922    }
2923
2924    #[test]
2925    fn dag_result_and_then() {
2926        fn root(_x: u32) -> Result<u64, &'static str> {
2927            Ok(5)
2928        }
2929        fn check(val: &u64) -> Result<u64, &'static str> {
2930            if *val > 3 {
2931                Ok(*val * 10)
2932            } else {
2933                Err("too small")
2934            }
2935        }
2936        fn sink(mut out: ResMut<u64>, val: &Result<u64, &str>) {
2937            *out = val.as_ref().copied().unwrap_or(0);
2938        }
2939        let mut wb = WorldBuilder::new();
2940        wb.register::<u64>(0);
2941        let mut world = wb.build();
2942        let reg = world.registry();
2943
2944        let mut dag = DagBuilder::<u32>::new()
2945            .root(root, reg)
2946            .and_then(check, reg)
2947            .then(sink, reg)
2948            .build();
2949
2950        dag.run(&mut world, 0u32);
2951        assert_eq!(*world.resource::<u64>(), 50);
2952    }
2953
2954    #[test]
2955    fn dag_result_catch() {
2956        fn root(_x: u32) -> Result<u64, String> {
2957            Err("oops".into())
2958        }
2959        fn handle_err(mut log: ResMut<String>, err: &String) {
2960            *log = err.clone();
2961        }
2962        fn sink(mut out: ResMut<u64>, val: &Option<u64>) {
2963            *out = val.unwrap_or(0);
2964        }
2965        let mut wb = WorldBuilder::new();
2966        wb.register::<u64>(0);
2967        wb.register::<String>(String::new());
2968        let mut world = wb.build();
2969        let reg = world.registry();
2970
2971        let mut dag = DagBuilder::<u32>::new()
2972            .root(root, reg)
2973            .catch(handle_err, reg)
2974            .then(sink, reg)
2975            .build();
2976
2977        dag.run(&mut world, 0u32);
2978        assert_eq!(*world.resource::<u64>(), 0);
2979        assert_eq!(world.resource::<String>().as_str(), "oops");
2980    }
2981
2982    #[test]
2983    fn dag_result_ok() {
2984        fn root(_x: u32) -> Result<u64, &'static str> {
2985            Err("fail")
2986        }
2987        fn sink(mut out: ResMut<u64>, val: &Option<u64>) {
2988            *out = val.unwrap_or(0);
2989        }
2990        let mut wb = WorldBuilder::new();
2991        wb.register::<u64>(0);
2992        let mut world = wb.build();
2993        let reg = world.registry();
2994
2995        let mut dag = DagBuilder::<u32>::new()
2996            .root(root, reg)
2997            .ok()
2998            .then(sink, reg)
2999            .build();
3000
3001        dag.run(&mut world, 0u32);
3002        assert_eq!(*world.resource::<u64>(), 0);
3003    }
3004
3005    #[test]
3006    fn dag_result_unwrap_or_else() {
3007        fn root(_x: u32) -> Result<u64, &'static str> {
3008            Err("fail")
3009        }
3010        fn sink(mut out: ResMut<u64>, val: &u64) {
3011            *out = *val;
3012        }
3013        let mut wb = WorldBuilder::new();
3014        wb.register::<u64>(0);
3015        let mut world = wb.build();
3016        let reg = world.registry();
3017
3018        let mut dag = DagBuilder::<u32>::new()
3019            .root(root, reg)
3020            .unwrap_or_else(|_err: &str| 42u64, reg)
3021            .then(sink, reg)
3022            .build();
3023
3024        dag.run(&mut world, 0u32);
3025        assert_eq!(*world.resource::<u64>(), 42);
3026    }
3027
3028    #[test]
3029    fn dag_result_map_err() {
3030        fn root(_x: u32) -> Result<u64, u32> {
3031            Err(5)
3032        }
3033        fn sink(mut out: ResMut<u64>, val: &Result<u64, String>) {
3034            *out = match val {
3035                Ok(v) => *v,
3036                Err(e) => e.len() as u64,
3037            };
3038        }
3039        let mut wb = WorldBuilder::new();
3040        wb.register::<u64>(0);
3041        let mut world = wb.build();
3042        let reg = world.registry();
3043
3044        let mut dag = DagBuilder::<u32>::new()
3045            .root(root, reg)
3046            .map_err(|e: u32| format!("err:{e}"), reg)
3047            .then(sink, reg)
3048            .build();
3049
3050        dag.run(&mut world, 0u32);
3051        // "err:5".len() == 5
3052        assert_eq!(*world.resource::<u64>(), 5);
3053    }
3054
3055    #[test]
3056    fn dag_arm_combinators() {
3057        fn root(x: u32) -> u64 {
3058            x as u64 + 10
3059        }
3060        fn arm_step(val: &u64) -> Option<u64> {
3061            if *val > 5 { Some(*val * 3) } else { None }
3062        }
3063        fn double(val: &u64) -> u64 {
3064            *val * 2
3065        }
3066        fn merge_fn(a: &u64, b: &u64) -> String {
3067            format!("{a},{b}")
3068        }
3069        fn sink(mut out: ResMut<String>, val: &String) {
3070            *out = val.clone();
3071        }
3072        let mut wb = WorldBuilder::new();
3073        wb.register::<String>(String::new());
3074        let mut world = wb.build();
3075        let reg = world.registry();
3076
3077        // Arm 0: root → arm_step (Option) → unwrap_or(0)
3078        // Arm 1: root → double
3079        let mut dag = DagBuilder::<u32>::new()
3080            .root(root, reg)
3081            .fork()
3082            .arm(|a| a.then(arm_step, reg).unwrap_or(0u64))
3083            .arm(|b| b.then(double, reg))
3084            .merge(merge_fn, reg)
3085            .then(sink, reg)
3086            .build();
3087
3088        dag.run(&mut world, 0u32);
3089        // root(0) = 10
3090        // arm0: 10 > 5 → Some(30) → unwrap → 30
3091        // arm1: 10 * 2 = 20
3092        assert_eq!(world.resource::<String>().as_str(), "30,20");
3093    }
3094
3095    #[test]
3096    fn dag_option_inspect() {
3097        fn root(_x: u32) -> Option<u64> {
3098            Some(42)
3099        }
3100        fn sink(mut out: ResMut<u64>, val: &Option<u64>) {
3101            *out = val.unwrap_or(0);
3102        }
3103        let mut wb = WorldBuilder::new();
3104        wb.register::<u64>(0);
3105        wb.register::<bool>(false);
3106        let mut world = wb.build();
3107        let reg = world.registry();
3108
3109        let mut dag = DagBuilder::<u32>::new()
3110            .root(root, reg)
3111            .inspect(
3112                |w: &mut World, _val: &u64| {
3113                    *w.resource_mut::<bool>() = true;
3114                },
3115                reg,
3116            )
3117            .then(sink, reg)
3118            .build();
3119
3120        dag.run(&mut world, 0u32);
3121        assert_eq!(*world.resource::<u64>(), 42);
3122        assert!(*world.resource::<bool>());
3123    }
3124
3125    // -- Guard combinator --
3126
3127    #[test]
3128    fn dag_guard_keeps() {
3129        fn root(x: u32) -> u64 {
3130            x as u64
3131        }
3132        fn sink(mut out: ResMut<u64>, val: &Option<u64>) {
3133            *out = val.unwrap_or(0);
3134        }
3135        let mut wb = WorldBuilder::new();
3136        wb.register::<u64>(0);
3137        let mut world = wb.build();
3138        let reg = world.registry();
3139
3140        let mut dag = DagBuilder::<u32>::new()
3141            .root(root, reg)
3142            .guard(|v: &u64| *v > 3, reg)
3143            .then(sink, reg)
3144            .build();
3145
3146        dag.run(&mut world, 5u32);
3147        assert_eq!(*world.resource::<u64>(), 5);
3148    }
3149
3150    #[test]
3151    fn dag_guard_drops() {
3152        fn root(x: u32) -> u64 {
3153            x as u64
3154        }
3155        fn sink(mut out: ResMut<u64>, val: &Option<u64>) {
3156            *out = val.unwrap_or(999);
3157        }
3158        let mut wb = WorldBuilder::new();
3159        wb.register::<u64>(0);
3160        let mut world = wb.build();
3161        let reg = world.registry();
3162
3163        let mut dag = DagBuilder::<u32>::new()
3164            .root(root, reg)
3165            .guard(|v: &u64| *v > 10, reg)
3166            .then(sink, reg)
3167            .build();
3168
3169        dag.run(&mut world, 5u32);
3170        assert_eq!(*world.resource::<u64>(), 999);
3171    }
3172
3173    #[test]
3174    fn dag_arm_guard() {
3175        fn root(x: u32) -> u64 {
3176            x as u64
3177        }
3178        fn double(val: &u64) -> u64 {
3179            *val * 2
3180        }
3181        fn merge_fn(a: &Option<u64>, b: &u64) -> String {
3182            format!("{:?},{}", a, b)
3183        }
3184        fn sink(mut out: ResMut<String>, val: &String) {
3185            *out = val.clone();
3186        }
3187        let mut wb = WorldBuilder::new();
3188        wb.register::<String>(String::new());
3189        let mut world = wb.build();
3190        let reg = world.registry();
3191
3192        // arm_a: guard drops (5 < 10), arm_b: runs normally
3193        let mut dag = DagBuilder::<u32>::new()
3194            .root(root, reg)
3195            .fork()
3196            .arm(|a| a.then(double, reg).guard(|v: &u64| *v > 100, reg))
3197            .arm(|b| b.then(double, reg))
3198            .merge(merge_fn, reg)
3199            .then(sink, reg)
3200            .build();
3201
3202        dag.run(&mut world, 5u32);
3203        // arm_a: 10, guard fails → None. arm_b: 10.
3204        assert_eq!(world.resource::<String>().as_str(), "None,10");
3205    }
3206
3207    // -- Tap combinator --
3208
3209    #[test]
3210    fn dag_tap_observes_without_changing() {
3211        fn root(x: u32) -> u64 {
3212            x as u64 * 2
3213        }
3214        fn sink(mut out: ResMut<u64>, val: &u64) {
3215            *out = *val;
3216        }
3217        let mut wb = WorldBuilder::new();
3218        wb.register::<u64>(0);
3219        wb.register::<bool>(false);
3220        let mut world = wb.build();
3221        let reg = world.registry();
3222
3223        let mut dag = DagBuilder::<u32>::new()
3224            .root(root, reg)
3225            .tap(
3226                |w: &mut World, val: &u64| {
3227                    // Side-effect: record that we observed the value.
3228                    *w.resource_mut::<bool>() = *val == 10;
3229                },
3230                reg,
3231            )
3232            .then(sink, reg)
3233            .build();
3234
3235        dag.run(&mut world, 5u32);
3236        assert_eq!(*world.resource::<u64>(), 10); // value passed through
3237        assert!(*world.resource::<bool>()); // tap fired
3238    }
3239
3240    #[test]
3241    fn dag_arm_tap() {
3242        fn root(x: u32) -> u64 {
3243            x as u64
3244        }
3245        fn double(val: &u64) -> u64 {
3246            *val * 2
3247        }
3248        fn merge_add(a: &u64, b: &u64) -> u64 {
3249            *a + *b
3250        }
3251        fn sink(mut out: ResMut<u64>, val: &u64) {
3252            *out = *val;
3253        }
3254        let mut wb = WorldBuilder::new();
3255        wb.register::<u64>(0);
3256        wb.register::<bool>(false);
3257        let mut world = wb.build();
3258        let reg = world.registry();
3259
3260        let mut dag = DagBuilder::<u32>::new()
3261            .root(root, reg)
3262            .fork()
3263            .arm(|a| {
3264                a.then(double, reg).tap(
3265                    |w: &mut World, _v: &u64| {
3266                        *w.resource_mut::<bool>() = true;
3267                    },
3268                    reg,
3269                )
3270            })
3271            .arm(|b| b.then(double, reg))
3272            .merge(merge_add, reg)
3273            .then(sink, reg)
3274            .build();
3275
3276        dag.run(&mut world, 5u32);
3277        // arm_a: 10, arm_b: 10, merge: 20
3278        assert_eq!(*world.resource::<u64>(), 20);
3279        assert!(*world.resource::<bool>()); // tap in arm_a fired
3280    }
3281
3282    // -- Route combinator --
3283
3284    #[test]
3285    fn dag_route_true_arm() {
3286        fn root(x: u32) -> u64 {
3287            x as u64
3288        }
3289        fn double(val: &u64) -> u64 {
3290            *val * 2
3291        }
3292        fn triple(val: &u64) -> u64 {
3293            *val * 3
3294        }
3295        fn sink(mut out: ResMut<u64>, val: &u64) {
3296            *out = *val;
3297        }
3298        let mut wb = WorldBuilder::new();
3299        wb.register::<u64>(0);
3300        let mut world = wb.build();
3301        let reg = world.registry();
3302
3303        let arm_t = DagArmSeed::new().then(double, reg);
3304        let arm_f = DagArmSeed::new().then(triple, reg);
3305
3306        let mut dag = DagBuilder::<u32>::new()
3307            .root(root, reg)
3308            .route(|v: &u64| *v > 3, reg, arm_t, arm_f)
3309            .then(sink, reg)
3310            .build();
3311
3312        dag.run(&mut world, 5u32); // 5 > 3 → true arm → double → 10
3313        assert_eq!(*world.resource::<u64>(), 10);
3314    }
3315
3316    #[test]
3317    fn dag_route_false_arm() {
3318        fn root(x: u32) -> u64 {
3319            x as u64
3320        }
3321        fn double(val: &u64) -> u64 {
3322            *val * 2
3323        }
3324        fn triple(val: &u64) -> u64 {
3325            *val * 3
3326        }
3327        fn sink(mut out: ResMut<u64>, val: &u64) {
3328            *out = *val;
3329        }
3330        let mut wb = WorldBuilder::new();
3331        wb.register::<u64>(0);
3332        let mut world = wb.build();
3333        let reg = world.registry();
3334
3335        let arm_t = DagArmSeed::new().then(double, reg);
3336        let arm_f = DagArmSeed::new().then(triple, reg);
3337
3338        let mut dag = DagBuilder::<u32>::new()
3339            .root(root, reg)
3340            .route(|v: &u64| *v > 10, reg, arm_t, arm_f)
3341            .then(sink, reg)
3342            .build();
3343
3344        dag.run(&mut world, 5u32); // 5 <= 10 → false arm → triple → 15
3345        assert_eq!(*world.resource::<u64>(), 15);
3346    }
3347
3348    #[test]
3349    fn dag_route_nested() {
3350        fn root(x: u32) -> u64 {
3351            x as u64
3352        }
3353        fn pass(val: &u64) -> u64 {
3354            *val
3355        }
3356        fn add_100(val: &u64) -> u64 {
3357            *val + 100
3358        }
3359        fn add_200(val: &u64) -> u64 {
3360            *val + 200
3361        }
3362        fn add_300(val: &u64) -> u64 {
3363            *val + 300
3364        }
3365        fn sink(mut out: ResMut<u64>, val: &u64) {
3366            *out = *val;
3367        }
3368        let mut wb = WorldBuilder::new();
3369        wb.register::<u64>(0);
3370        let mut world = wb.build();
3371        let reg = world.registry();
3372
3373        // N-ary via nesting: <5 → +100, 5..10 → +200, >=10 → +300
3374        let inner_t = DagArmSeed::new().then(add_200, reg);
3375        let inner_f = DagArmSeed::new().then(add_300, reg);
3376        let outer_t = DagArmSeed::new().then(add_100, reg);
3377        let outer_f =
3378            DagArmSeed::new()
3379                .then(pass, reg)
3380                .route(|v: &u64| *v < 10, reg, inner_t, inner_f);
3381
3382        let mut dag = DagBuilder::<u32>::new()
3383            .root(root, reg)
3384            .route(|v: &u64| *v < 5, reg, outer_t, outer_f)
3385            .then(sink, reg)
3386            .build();
3387
3388        dag.run(&mut world, 3u32); // 3 < 5 → +100 → 103
3389        assert_eq!(*world.resource::<u64>(), 103);
3390
3391        dag.run(&mut world, 7u32); // 7 >= 5, 7 < 10 → +200 → 207
3392        assert_eq!(*world.resource::<u64>(), 207);
3393
3394        dag.run(&mut world, 15u32); // 15 >= 5, 15 >= 10 → +300 → 315
3395        assert_eq!(*world.resource::<u64>(), 315);
3396    }
3397
3398    // -- Tee combinator --
3399
3400    #[test]
3401    fn dag_tee_side_effect_chain() {
3402        fn root(x: u32) -> u64 {
3403            x as u64 * 2
3404        }
3405        fn log_step(mut counter: ResMut<u32>, _val: &u64) {
3406            *counter += 1;
3407        }
3408        fn sink(mut out: ResMut<u64>, val: &u64) {
3409            *out = *val;
3410        }
3411        let mut wb = WorldBuilder::new();
3412        wb.register::<u64>(0);
3413        wb.register::<u32>(0);
3414        let mut world = wb.build();
3415        let reg = world.registry();
3416
3417        let side = DagArmSeed::new().then(log_step, reg);
3418
3419        let mut dag = DagBuilder::<u32>::new()
3420            .root(root, reg)
3421            .tee(side)
3422            .then(sink, reg)
3423            .build();
3424
3425        dag.run(&mut world, 5u32);
3426        assert_eq!(*world.resource::<u64>(), 10); // value passed through
3427        assert_eq!(*world.resource::<u32>(), 1); // side-effect fired
3428
3429        dag.run(&mut world, 7u32);
3430        assert_eq!(*world.resource::<u64>(), 14);
3431        assert_eq!(*world.resource::<u32>(), 2); // fired again
3432    }
3433
3434    // -- Dedup combinator --
3435
3436    #[test]
3437    fn dag_dedup_suppresses_unchanged() {
3438        fn root(x: u32) -> u64 {
3439            x as u64 / 2 // intentional integer division: 4→2, 5→2
3440        }
3441        fn sink(mut out: ResMut<u32>, val: &Option<u64>) {
3442            if val.is_some() {
3443                *out += 1;
3444            }
3445        }
3446        let mut wb = WorldBuilder::new();
3447        wb.register::<u32>(0);
3448        let mut world = wb.build();
3449        let reg = world.registry();
3450
3451        let mut dag = DagBuilder::<u32>::new()
3452            .root(root, reg)
3453            .dedup()
3454            .then(sink, reg)
3455            .build();
3456
3457        dag.run(&mut world, 4u32); // 2 — first, Some
3458        assert_eq!(*world.resource::<u32>(), 1);
3459
3460        dag.run(&mut world, 5u32); // 2 — same, None
3461        assert_eq!(*world.resource::<u32>(), 1);
3462
3463        dag.run(&mut world, 6u32); // 3 — changed, Some
3464        assert_eq!(*world.resource::<u32>(), 2);
3465    }
3466
3467    // -- Bool combinators --
3468
3469    #[test]
3470    fn dag_not() {
3471        fn root(x: u32) -> bool {
3472            x > 5
3473        }
3474        fn sink(mut out: ResMut<bool>, val: &bool) {
3475            *out = *val;
3476        }
3477        let mut wb = WorldBuilder::new();
3478        wb.register::<bool>(false);
3479        let mut world = wb.build();
3480        let reg = world.registry();
3481
3482        let mut dag = DagBuilder::<u32>::new()
3483            .root(root, reg)
3484            .not()
3485            .then(sink, reg)
3486            .build();
3487
3488        dag.run(&mut world, 3u32); // 3 > 5 = false, not = true
3489        assert!(*world.resource::<bool>());
3490
3491        dag.run(&mut world, 10u32); // 10 > 5 = true, not = false
3492        assert!(!*world.resource::<bool>());
3493    }
3494
3495    #[test]
3496    fn dag_and() {
3497        fn root(x: u32) -> bool {
3498            x > 5
3499        }
3500        fn sink(mut out: ResMut<bool>, val: &bool) {
3501            *out = *val;
3502        }
3503        let mut wb = WorldBuilder::new();
3504        wb.register::<bool>(true); // "market open" flag
3505        let mut world = wb.build();
3506        let reg = world.registry();
3507
3508        let mut dag = DagBuilder::<u32>::new()
3509            .root(root, reg)
3510            .and(|w: &mut World| *w.resource::<bool>(), reg)
3511            .then(sink, reg)
3512            .build();
3513
3514        dag.run(&mut world, 10u32); // true && true = true
3515        assert!(*world.resource::<bool>());
3516
3517        *world.resource_mut::<bool>() = false; // close market
3518        dag.run(&mut world, 10u32); // true && false = false
3519        assert!(!*world.resource::<bool>());
3520    }
3521
3522    #[test]
3523    fn dag_or() {
3524        fn root(x: u32) -> bool {
3525            x > 5
3526        }
3527        fn sink(mut out: ResMut<bool>, val: &bool) {
3528            *out = *val;
3529        }
3530        let mut wb = WorldBuilder::new();
3531        wb.register::<bool>(false);
3532        let mut world = wb.build();
3533        let reg = world.registry();
3534
3535        let mut dag = DagBuilder::<u32>::new()
3536            .root(root, reg)
3537            .or(|w: &mut World| *w.resource::<bool>(), reg)
3538            .then(sink, reg)
3539            .build();
3540
3541        dag.run(&mut world, 3u32); // false || false = false
3542        assert!(!*world.resource::<bool>());
3543
3544        *world.resource_mut::<bool>() = true;
3545        dag.run(&mut world, 3u32); // false || true = true
3546        assert!(*world.resource::<bool>());
3547    }
3548
3549    #[test]
3550    fn dag_xor() {
3551        fn root(x: u32) -> bool {
3552            x > 5
3553        }
3554        fn sink(mut out: ResMut<bool>, val: &bool) {
3555            *out = *val;
3556        }
3557        let mut wb = WorldBuilder::new();
3558        wb.register::<bool>(true);
3559        let mut world = wb.build();
3560        let reg = world.registry();
3561
3562        let mut dag = DagBuilder::<u32>::new()
3563            .root(root, reg)
3564            .xor(|w: &mut World| *w.resource::<bool>(), reg)
3565            .then(sink, reg)
3566            .build();
3567
3568        dag.run(&mut world, 10u32); // true ^ true = false
3569        assert!(!*world.resource::<bool>());
3570    }
3571
3572    // =========================================================================
3573    // Splat — tuple destructuring
3574    // =========================================================================
3575
3576    #[test]
3577    fn dag_splat2_on_chain() {
3578        let mut wb = WorldBuilder::new();
3579        wb.register::<u64>(0);
3580        let mut world = wb.build();
3581        let reg = world.registry();
3582
3583        fn split(x: u32) -> (u32, u32) {
3584            (x, x * 2)
3585        }
3586        fn store(mut out: ResMut<u64>, a: &u32, b: &u32) {
3587            *out = *a as u64 + *b as u64;
3588        }
3589
3590        let mut dag = DagBuilder::<u32>::new()
3591            .root(split, reg)
3592            .splat()
3593            .then(store, reg)
3594            .build();
3595
3596        dag.run(&mut world, 5u32);
3597        assert_eq!(*world.resource::<u64>(), 15); // 5 + 10
3598    }
3599
3600    #[test]
3601    fn dag_splat3_on_chain() {
3602        let mut wb = WorldBuilder::new();
3603        wb.register::<u64>(0);
3604        let mut world = wb.build();
3605        let reg = world.registry();
3606
3607        fn split3(x: u32) -> (u32, u32, u32) {
3608            (x, x + 1, x + 2)
3609        }
3610        fn sum3(a: &u32, b: &u32, c: &u32) -> u64 {
3611            *a as u64 + *b as u64 + *c as u64
3612        }
3613        fn store(mut out: ResMut<u64>, val: &u64) {
3614            *out = *val;
3615        }
3616
3617        let mut dag = DagBuilder::<u32>::new()
3618            .root(split3, reg)
3619            .splat()
3620            .then(sum3, reg)
3621            .then(store, reg)
3622            .build();
3623
3624        dag.run(&mut world, 10u32);
3625        assert_eq!(*world.resource::<u64>(), 33); // 10+11+12
3626    }
3627
3628    #[test]
3629    fn dag_splat2_with_param() {
3630        let mut wb = WorldBuilder::new();
3631        wb.register::<u64>(100);
3632        let mut world = wb.build();
3633        let reg = world.registry();
3634
3635        fn split(x: u32) -> (u32, u32) {
3636            (x, x * 3)
3637        }
3638        fn add_base(base: Res<u64>, a: &u32, b: &u32) -> u64 {
3639            *base + *a as u64 + *b as u64
3640        }
3641        fn store(mut out: ResMut<u64>, val: &u64) {
3642            *out = *val;
3643        }
3644
3645        let mut dag = DagBuilder::<u32>::new()
3646            .root(split, reg)
3647            .splat()
3648            .then(add_base, reg)
3649            .then(store, reg)
3650            .build();
3651
3652        dag.run(&mut world, 5u32);
3653        assert_eq!(*world.resource::<u64>(), 120); // 100 + 5 + 15
3654    }
3655
3656    #[test]
3657    fn dag_splat_on_arm_start() {
3658        let mut wb = WorldBuilder::new();
3659        wb.register::<u64>(0);
3660        let mut world = wb.build();
3661        let reg = world.registry();
3662
3663        fn split(x: u32) -> (u32, u32) {
3664            (x, x + 10)
3665        }
3666        fn sum2(a: &u32, b: &u32) -> u64 {
3667            *a as u64 + *b as u64
3668        }
3669        fn identity(x: &(u32, u32)) -> u64 {
3670            x.0 as u64 * x.1 as u64
3671        }
3672        fn merge_add(a: &u64, b: &u64) -> u64 {
3673            *a + *b
3674        }
3675        fn store(mut out: ResMut<u64>, val: &u64) {
3676            *out = *val;
3677        }
3678
3679        let mut dag = DagBuilder::<u32>::new()
3680            .root(split, reg)
3681            .fork()
3682            .arm(|a| a.splat().then(sum2, reg))
3683            .arm(|b| b.then(identity, reg))
3684            .merge(merge_add, reg)
3685            .then(store, reg)
3686            .build();
3687
3688        dag.run(&mut world, 5u32);
3689        // arm_a: splat (5, 15) → sum2 = 20
3690        // arm_b: identity (5, 15) → 75
3691        // merge: 20 + 75 = 95
3692        assert_eq!(*world.resource::<u64>(), 95);
3693    }
3694
3695    #[test]
3696    fn dag_splat_on_arm() {
3697        let mut wb = WorldBuilder::new();
3698        wb.register::<u64>(0);
3699        let mut world = wb.build();
3700        let reg = world.registry();
3701
3702        fn root_id(x: u32) -> u32 {
3703            x
3704        }
3705        fn make_pair(val: &u32) -> (u32, u32) {
3706            (*val, *val + 100)
3707        }
3708        fn sum2(a: &u32, b: &u32) -> u64 {
3709            *a as u64 + *b as u64
3710        }
3711        fn double(val: &u32) -> u64 {
3712            *val as u64 * 2
3713        }
3714        fn merge_add(a: &u64, b: &u64) -> u64 {
3715            *a + *b
3716        }
3717        fn store(mut out: ResMut<u64>, val: &u64) {
3718            *out = *val;
3719        }
3720
3721        let mut dag = DagBuilder::<u32>::new()
3722            .root(root_id, reg)
3723            .fork()
3724            .arm(|a| a.then(make_pair, reg).splat().then(sum2, reg))
3725            .arm(|b| b.then(double, reg))
3726            .merge(merge_add, reg)
3727            .then(store, reg)
3728            .build();
3729
3730        dag.run(&mut world, 7u32);
3731        // arm_a: make_pair(7) = (7, 107), splat → sum2 = 114
3732        // arm_b: double(7) = 14
3733        // merge: 114 + 14 = 128
3734        assert_eq!(*world.resource::<u64>(), 128);
3735    }
3736
3737    #[test]
3738    fn dag_splat4_on_chain() {
3739        let mut wb = WorldBuilder::new();
3740        wb.register::<u64>(0);
3741        let mut world = wb.build();
3742        let reg = world.registry();
3743
3744        fn split4(x: u32) -> (u32, u32, u32, u32) {
3745            (x, x + 1, x + 2, x + 3)
3746        }
3747        fn sum4(a: &u32, b: &u32, c: &u32, d: &u32) -> u64 {
3748            (*a + *b + *c + *d) as u64
3749        }
3750        fn store(mut out: ResMut<u64>, val: &u64) {
3751            *out = *val;
3752        }
3753
3754        let mut dag = DagBuilder::<u32>::new()
3755            .root(split4, reg)
3756            .splat()
3757            .then(sum4, reg)
3758            .then(store, reg)
3759            .build();
3760
3761        dag.run(&mut world, 10u32);
3762        assert_eq!(*world.resource::<u64>(), 46); // 10+11+12+13
3763    }
3764
3765    #[test]
3766    fn dag_splat5_on_chain() {
3767        let mut wb = WorldBuilder::new();
3768        wb.register::<u64>(0);
3769        let mut world = wb.build();
3770        let reg = world.registry();
3771
3772        fn split5(x: u32) -> (u8, u8, u8, u8, u8) {
3773            let x = x as u8;
3774            (x, x + 1, x + 2, x + 3, x + 4)
3775        }
3776        #[allow(clippy::many_single_char_names)]
3777        fn sum5(a: &u8, b: &u8, c: &u8, d: &u8, e: &u8) -> u64 {
3778            (*a as u64) + (*b as u64) + (*c as u64) + (*d as u64) + (*e as u64)
3779        }
3780        fn store(mut out: ResMut<u64>, val: &u64) {
3781            *out = *val;
3782        }
3783
3784        let mut dag = DagBuilder::<u32>::new()
3785            .root(split5, reg)
3786            .splat()
3787            .then(sum5, reg)
3788            .then(store, reg)
3789            .build();
3790
3791        dag.run(&mut world, 1u32);
3792        assert_eq!(*world.resource::<u64>(), 15); // 1+2+3+4+5
3793    }
3794
3795    #[test]
3796    fn dag_splat_boxable() {
3797        let mut wb = WorldBuilder::new();
3798        wb.register::<u64>(0);
3799        let mut world = wb.build();
3800        let reg = world.registry();
3801
3802        fn split(x: u32) -> (u32, u32) {
3803            (x, x * 2)
3804        }
3805        fn store(mut out: ResMut<u64>, a: &u32, b: &u32) {
3806            *out = *a as u64 + *b as u64;
3807        }
3808
3809        let dag = DagBuilder::<u32>::new()
3810            .root(split, reg)
3811            .splat()
3812            .then(store, reg)
3813            .build();
3814
3815        let mut boxed: Virtual<u32> = Box::new(dag);
3816        boxed.run(&mut world, 5u32);
3817        assert_eq!(*world.resource::<u64>(), 15);
3818    }
3819
3820    // -- Batch DAG --
3821
3822    #[test]
3823    fn batch_dag_basic() {
3824        let mut wb = WorldBuilder::new();
3825        wb.register::<u64>(0);
3826        let mut world = wb.build();
3827        let reg = world.registry();
3828
3829        fn double(x: u32) -> u64 {
3830            x as u64 * 2
3831        }
3832        fn store(mut out: ResMut<u64>, val: &u64) {
3833            *out += *val;
3834        }
3835
3836        let mut batch = DagBuilder::<u32>::new()
3837            .root(double, reg)
3838            .then(store, reg)
3839            .build_batch(8);
3840
3841        batch.input_mut().extend([1, 2, 3]);
3842        batch.run(&mut world);
3843
3844        assert_eq!(*world.resource::<u64>(), 12); // 2 + 4 + 6
3845        assert!(batch.input().is_empty());
3846    }
3847
3848    #[test]
3849    fn batch_dag_option_terminal() {
3850        let mut wb = WorldBuilder::new();
3851        wb.register::<u64>(0);
3852        let mut world = wb.build();
3853        let reg = world.registry();
3854
3855        fn double(x: u32) -> u64 {
3856            x as u64 * 2
3857        }
3858        fn store(mut out: ResMut<u64>, val: &u64) {
3859            *out += *val;
3860        }
3861
3862        let mut batch = DagBuilder::<u32>::new()
3863            .root(double, reg)
3864            .guard(|val: &u64| *val > 5, reg)
3865            .map(store, reg)
3866            .unwrap_or(())
3867            .build_batch(8);
3868
3869        batch.input_mut().extend([1, 2, 3, 4, 5]);
3870        batch.run(&mut world);
3871
3872        // double: 2, 4, 6, 8, 10
3873        // guard keeps > 5: 6, 8, 10
3874        assert_eq!(*world.resource::<u64>(), 24); // 6 + 8 + 10
3875    }
3876
3877    #[test]
3878    fn batch_dag_buffer_reuse() {
3879        let mut wb = WorldBuilder::new();
3880        wb.register::<u64>(0);
3881        let mut world = wb.build();
3882        let reg = world.registry();
3883
3884        fn double(x: u32) -> u64 {
3885            x as u64 * 2
3886        }
3887        fn store(mut out: ResMut<u64>, val: &u64) {
3888            *out += *val;
3889        }
3890
3891        let mut batch = DagBuilder::<u32>::new()
3892            .root(double, reg)
3893            .then(store, reg)
3894            .build_batch(8);
3895
3896        batch.input_mut().extend([1, 2]);
3897        batch.run(&mut world);
3898        assert_eq!(*world.resource::<u64>(), 6); // 2 + 4
3899        assert!(batch.input().is_empty());
3900
3901        batch.input_mut().extend([10, 20]);
3902        batch.run(&mut world);
3903        assert_eq!(*world.resource::<u64>(), 66); // 6 + 20 + 40
3904    }
3905
3906    #[test]
3907    fn batch_dag_retains_allocation() {
3908        let mut world = WorldBuilder::new().build();
3909        let reg = world.registry();
3910
3911        fn noop(_x: u32) {}
3912
3913        let mut batch = DagBuilder::<u32>::new().root(noop, reg).build_batch(64);
3914
3915        batch.input_mut().extend([1, 2, 3]);
3916        batch.run(&mut world);
3917
3918        assert!(batch.input().is_empty());
3919        assert!(batch.input_mut().capacity() >= 64);
3920    }
3921
3922    #[test]
3923    fn batch_dag_empty_is_noop() {
3924        let mut wb = WorldBuilder::new();
3925        wb.register::<u64>(0);
3926        let mut world = wb.build();
3927        let reg = world.registry();
3928
3929        fn double(x: u32) -> u64 {
3930            x as u64 * 2
3931        }
3932        fn store(mut out: ResMut<u64>, val: &u64) {
3933            *out += *val;
3934        }
3935
3936        let mut batch = DagBuilder::<u32>::new()
3937            .root(double, reg)
3938            .then(store, reg)
3939            .build_batch(8);
3940
3941        batch.run(&mut world);
3942        assert_eq!(*world.resource::<u64>(), 0);
3943    }
3944
3945    #[test]
3946    fn batch_dag_with_splat() {
3947        let mut wb = WorldBuilder::new();
3948        wb.register::<u64>(0);
3949        let mut world = wb.build();
3950        let reg = world.registry();
3951
3952        fn split(x: u32) -> (u64, u64) {
3953            (x as u64, x as u64 * 10)
3954        }
3955        fn combine(a: &u64, b: &u64) -> u64 {
3956            *a + *b
3957        }
3958        fn store(mut out: ResMut<u64>, val: &u64) {
3959            *out += *val;
3960        }
3961
3962        let mut batch = DagBuilder::<u32>::new()
3963            .root(split, reg)
3964            .splat()
3965            .then(combine, reg)
3966            .then(store, reg)
3967            .build_batch(4);
3968
3969        batch.input_mut().extend([1, 2]);
3970        batch.run(&mut world);
3971
3972        // 1 → (1, 10) → 11, 2 → (2, 20) → 22
3973        assert_eq!(*world.resource::<u64>(), 33); // 11 + 22
3974    }
3975
3976    // -- Conditional then (formerly switch) --
3977
3978    #[test]
3979    fn dag_then_conditional_basic() {
3980        fn root(x: u32) -> u64 {
3981            x as u64
3982        }
3983        fn sink(mut out: ResMut<u64>, val: &u64) {
3984            *out = *val;
3985        }
3986
3987        let mut wb = WorldBuilder::new();
3988        wb.register::<u64>(0);
3989        let mut world = wb.build();
3990        let reg = world.registry();
3991
3992        let mut dag = DagBuilder::<u32>::new()
3993            .root(root, reg)
3994            .then(|val: &u64| if *val > 5 { *val * 10 } else { *val + 1 }, reg)
3995            .then(sink, reg)
3996            .build();
3997
3998        dag.run(&mut world, 10u32); // 10 > 5 → 100
3999        assert_eq!(*world.resource::<u64>(), 100);
4000
4001        dag.run(&mut world, 3u32); // 3 <= 5 → 4
4002        assert_eq!(*world.resource::<u64>(), 4);
4003    }
4004
4005    #[test]
4006    fn dag_then_conditional_3_way() {
4007        fn root(x: u32) -> u32 {
4008            x
4009        }
4010        fn sink(mut out: ResMut<u64>, val: &u64) {
4011            *out = *val;
4012        }
4013
4014        let mut wb = WorldBuilder::new();
4015        wb.register::<u64>(0);
4016        let mut world = wb.build();
4017        let reg = world.registry();
4018
4019        let mut dag = DagBuilder::<u32>::new()
4020            .root(root, reg)
4021            .then(
4022                |val: &u32| match *val % 3 {
4023                    0 => *val as u64 + 100,
4024                    1 => *val as u64 + 200,
4025                    _ => *val as u64 + 300,
4026                },
4027                reg,
4028            )
4029            .then(sink, reg)
4030            .build();
4031
4032        dag.run(&mut world, 6u32); // 6 % 3 == 0 → 106
4033        assert_eq!(*world.resource::<u64>(), 106);
4034
4035        dag.run(&mut world, 7u32); // 7 % 3 == 1 → 207
4036        assert_eq!(*world.resource::<u64>(), 207);
4037
4038        dag.run(&mut world, 8u32); // 8 % 3 == 2 → 308
4039        assert_eq!(*world.resource::<u64>(), 308);
4040    }
4041
4042    #[test]
4043    fn dag_then_with_resolve_arm() {
4044        fn root(x: u32) -> u32 {
4045            x
4046        }
4047        fn double(val: &u32) -> u64 {
4048            *val as u64 * 2
4049        }
4050        fn triple(val: &u32) -> u64 {
4051            *val as u64 * 3
4052        }
4053        fn sink(mut out: ResMut<u64>, val: &u64) {
4054            *out = *val;
4055        }
4056
4057        let mut wb = WorldBuilder::new();
4058        wb.register::<u64>(0);
4059        let mut world = wb.build();
4060        let reg = world.registry();
4061
4062        let mut arm_even = resolve_arm(double, reg);
4063        let mut arm_odd = resolve_arm(triple, reg);
4064
4065        let mut dag = DagBuilder::<u32>::new()
4066            .root(root, reg)
4067            .then(
4068                move |world: &mut World, val: &u32| {
4069                    if *val % 2 == 0 {
4070                        arm_even(world, val)
4071                    } else {
4072                        arm_odd(world, val)
4073                    }
4074                },
4075                reg,
4076            )
4077            .then(sink, reg)
4078            .build();
4079
4080        dag.run(&mut world, 4u32); // even → double → 8
4081        assert_eq!(*world.resource::<u64>(), 8);
4082
4083        dag.run(&mut world, 5u32); // odd → triple → 15
4084        assert_eq!(*world.resource::<u64>(), 15);
4085    }
4086
4087    #[test]
4088    fn dag_resolve_arm_with_params() {
4089        fn root(x: u32) -> u32 {
4090            x
4091        }
4092        fn add_offset(offset: Res<i64>, val: &u32) -> u64 {
4093            (*offset + *val as i64) as u64
4094        }
4095        fn plain_double(val: &u32) -> u64 {
4096            *val as u64 * 2
4097        }
4098        fn sink(mut out: ResMut<u64>, val: &u64) {
4099            *out = *val;
4100        }
4101
4102        let mut wb = WorldBuilder::new();
4103        wb.register::<u64>(0);
4104        wb.register::<i64>(100);
4105        let mut world = wb.build();
4106        let reg = world.registry();
4107
4108        // Each arm resolves different params
4109        let mut arm_offset = resolve_arm(add_offset, reg);
4110        let mut arm_double = resolve_arm(plain_double, reg);
4111
4112        let mut dag = DagBuilder::<u32>::new()
4113            .root(root, reg)
4114            .then(
4115                move |world: &mut World, val: &u32| {
4116                    if *val > 10 {
4117                        arm_offset(world, val)
4118                    } else {
4119                        arm_double(world, val)
4120                    }
4121                },
4122                reg,
4123            )
4124            .then(sink, reg)
4125            .build();
4126
4127        dag.run(&mut world, 20u32); // > 10 → add_offset → 100 + 20 = 120
4128        assert_eq!(*world.resource::<u64>(), 120);
4129
4130        dag.run(&mut world, 5u32); // <= 10 → double → 10
4131        assert_eq!(*world.resource::<u64>(), 10);
4132    }
4133
4134    #[test]
4135    fn dag_then_conditional_in_fork_arm() {
4136        fn root(x: u32) -> u32 {
4137            x
4138        }
4139        fn pass(val: &u32) -> u32 {
4140            *val
4141        }
4142        fn sink_u64(mut out: ResMut<u64>, val: &u64) {
4143            *out = *val;
4144        }
4145        fn sink_i64(mut out: ResMut<i64>, val: &u32) {
4146            *out = -(*val as i64);
4147        }
4148
4149        let mut wb = WorldBuilder::new();
4150        wb.register::<u64>(0);
4151        wb.register::<i64>(0);
4152        let mut world = wb.build();
4153        let reg = world.registry();
4154
4155        let mut dag = DagBuilder::<u32>::new()
4156            .root(root, reg)
4157            .fork()
4158            .arm(|a| {
4159                a.then(pass, reg)
4160                    .then(
4161                        |val: &u32| {
4162                            if *val > 5 {
4163                                *val as u64 * 10
4164                            } else {
4165                                *val as u64
4166                            }
4167                        },
4168                        reg,
4169                    )
4170                    .then(sink_u64, reg)
4171            })
4172            .arm(|a| a.then(sink_i64, reg))
4173            .join()
4174            .build();
4175
4176        dag.run(&mut world, 10u32); // arm0: 10 > 5 → 100, arm1: -10
4177        assert_eq!(*world.resource::<u64>(), 100);
4178        assert_eq!(*world.resource::<i64>(), -10);
4179
4180        dag.run(&mut world, 3u32); // arm0: 3 <= 5 → 3, arm1: -3
4181        assert_eq!(*world.resource::<u64>(), 3);
4182        assert_eq!(*world.resource::<i64>(), -3);
4183    }
4184
4185    #[test]
4186    fn batch_dag_then_conditional() {
4187        fn root(x: u32) -> u32 {
4188            x
4189        }
4190        fn sink(mut out: ResMut<u64>, val: &u64) {
4191            *out += *val;
4192        }
4193
4194        let mut wb = WorldBuilder::new();
4195        wb.register::<u64>(0);
4196        let mut world = wb.build();
4197        let reg = world.registry();
4198
4199        let mut batch = DagBuilder::<u32>::new()
4200            .root(root, reg)
4201            .then(
4202                |val: &u32| {
4203                    if *val % 2 == 0 {
4204                        *val as u64 * 10
4205                    } else {
4206                        *val as u64
4207                    }
4208                },
4209                reg,
4210            )
4211            .then(sink, reg)
4212            .build_batch(8);
4213
4214        batch.input_mut().extend([1, 2, 3, 4]);
4215        batch.run(&mut world);
4216
4217        // 1 → 1, 2 → 20, 3 → 3, 4 → 40 = 64
4218        assert_eq!(*world.resource::<u64>(), 64);
4219    }
4220
4221    // =========================================================================
4222    // Scan combinator (DAG)
4223    // =========================================================================
4224
4225    #[test]
4226    fn dag_scan_arity0_closure() {
4227        let mut wb = WorldBuilder::new();
4228        wb.register::<u64>(0);
4229        let mut world = wb.build();
4230        let reg = world.registry();
4231
4232        fn store(mut out: ResMut<u64>, val: &u64) {
4233            *out = *val;
4234        }
4235
4236        let mut dag = DagBuilder::<u64>::new()
4237            .root(|x: u64| x, reg)
4238            .scan(
4239                0u64,
4240                |acc: &mut u64, val: &u64| {
4241                    *acc += val;
4242                    *acc
4243                },
4244                reg,
4245            )
4246            .then(store, reg)
4247            .build();
4248
4249        dag.run(&mut world, 10);
4250        assert_eq!(*world.resource::<u64>(), 10);
4251        dag.run(&mut world, 20);
4252        assert_eq!(*world.resource::<u64>(), 30);
4253        dag.run(&mut world, 5);
4254        assert_eq!(*world.resource::<u64>(), 35);
4255    }
4256
4257    #[test]
4258    fn dag_scan_named_fn_with_param() {
4259        let mut wb = WorldBuilder::new();
4260        wb.register::<u64>(100);
4261        wb.register::<String>(String::new());
4262        let mut world = wb.build();
4263        let reg = world.registry();
4264
4265        fn threshold(limit: Res<u64>, acc: &mut u64, val: &u64) -> Option<u64> {
4266            *acc += val;
4267            if *acc > *limit { Some(*acc) } else { None }
4268        }
4269        fn store_opt(mut out: ResMut<String>, val: &Option<u64>) {
4270            *out = val
4271                .as_ref()
4272                .map_or_else(|| "below".into(), |v| format!("hit:{v}"));
4273        }
4274
4275        let mut dag = DagBuilder::<u64>::new()
4276            .root(|x: u64| x, reg)
4277            .scan(0u64, threshold, reg)
4278            .then(store_opt, reg)
4279            .build();
4280
4281        dag.run(&mut world, 50);
4282        assert_eq!(world.resource::<String>().as_str(), "below");
4283        dag.run(&mut world, 60);
4284        assert_eq!(world.resource::<String>().as_str(), "hit:110");
4285    }
4286
4287    #[test]
4288    fn dag_arm_scan() {
4289        let mut wb = WorldBuilder::new();
4290        wb.register::<u64>(0);
4291        let mut world = wb.build();
4292        let reg = world.registry();
4293
4294        fn store(mut out: ResMut<u64>, val: &u64) {
4295            *out = *val;
4296        }
4297
4298        let scan_arm = DagArmSeed::<u64>::new()
4299            .then(|v: &u64| *v, reg)
4300            .scan(
4301                0u64,
4302                |acc: &mut u64, val: &u64| {
4303                    *acc += val;
4304                    *acc
4305                },
4306                reg,
4307            )
4308            .then(store, reg);
4309
4310        let pass_arm = DagArmSeed::<u64>::new().then(|_: &u64| {}, reg);
4311
4312        let mut dag = DagBuilder::<u64>::new()
4313            .root(|x: u64| x, reg)
4314            .fork()
4315            .arm(|_| scan_arm)
4316            .arm(|_| pass_arm)
4317            .merge(|(): &(), (): &()| {}, reg)
4318            .build();
4319
4320        dag.run(&mut world, 10);
4321        assert_eq!(*world.resource::<u64>(), 10);
4322        dag.run(&mut world, 20);
4323        assert_eq!(*world.resource::<u64>(), 30);
4324    }
4325
4326    // =========================================================================
4327    // Build — Option<()> terminal
4328    // =========================================================================
4329
4330    #[test]
4331    fn build_option_unit_terminal() {
4332        let mut wb = WorldBuilder::new();
4333        wb.register::<u64>(0);
4334        let mut world = wb.build();
4335        let reg = world.registry();
4336
4337        // root takes by value (IntoStep), then .guard() produces Option
4338        fn check(x: u32) -> u64 {
4339            x as u64
4340        }
4341        fn store(mut out: ResMut<u64>, val: &u64) {
4342            *out += *val;
4343        }
4344
4345        // guard → Option<u64>, map(store) → Option<()>, build() should work
4346        let mut dag = DagBuilder::<u32>::new()
4347            .root(check, reg)
4348            .guard(|val: &u64| *val > 5, reg)
4349            .map(store, reg)
4350            .build();
4351
4352        dag.run(&mut world, 3); // guard filters → None
4353        assert_eq!(*world.resource::<u64>(), 0);
4354        dag.run(&mut world, 7); // passes guard → stores 7
4355        assert_eq!(*world.resource::<u64>(), 7);
4356    }
4357
4358    // =========================================================================
4359    // Build — borrowed event type
4360    // =========================================================================
4361
4362    #[test]
4363    fn build_borrowed_event_direct() {
4364        let mut wb = WorldBuilder::new();
4365        wb.register::<u64>(0);
4366        let mut world = wb.build();
4367
4368        fn decode(msg: &[u8]) -> u64 {
4369            msg.len() as u64
4370        }
4371        fn store(mut out: ResMut<u64>, val: &u64) {
4372            *out = *val;
4373        }
4374
4375        // msg declared before dag so it outlives the DAG (drop order).
4376        let msg = vec![1u8, 2, 3];
4377        let reg = world.registry();
4378        let mut dag = DagBuilder::<&[u8]>::new()
4379            .root(decode, reg)
4380            .then(store, reg)
4381            .build();
4382
4383        dag.run(&mut world, &msg);
4384        assert_eq!(*world.resource::<u64>(), 3);
4385    }
4386}