Skip to main content

nexus_rt/
dag.rs

1// Builder return types use complex generics for compile-time edge validation.
2#![allow(clippy::type_complexity)]
3
4//! DAG pipeline — monomorphized data-flow graphs with fan-out and merge.
5//!
6//! [`DagBuilder`] begins a typed DAG that encodes topology in the type system.
7//! After monomorphization, the entire DAG is a single flat function with
8//! all values as stack locals — no arena, no vtable dispatch. The only
9//! `unsafe` is in the shared [`Param::fetch`](crate::Param) path
10//! (resource access by pre-resolved index).
11//!
12//! Nodes receive their input **by reference** — fan-out is free (multiple
13//! arms borrow the same stack local). Nodes produce owned output values
14//! passed to the next step.
15//!
16//! # When to use
17//!
18//! Use DAG pipelines when data needs to fan out to multiple arms and
19//! merge back. For linear chains, prefer [`PipelineBuilder`](crate::PipelineBuilder).
20//! For dynamic fan-out by reference, use [`FanOut`](crate::FanOut) or
21//! [`Broadcast`](crate::Broadcast).
22//!
23//! # Flow control
24//!
25//! Option and Result combinators (`.guard()`, `.map()`, `.and_then()`,
26//! `.filter()`, `.catch()`, etc.) work on both the main chain and
27//! within arms.
28//!
29//! **Within an arm**, `None` / `Err` short-circuits the remaining steps
30//! in **that arm only**. Sibling arms execute unconditionally. The merge
31//! step receives whatever each arm produced (including `None`).
32//!
33//! `.tap()` observes the value mid-chain without consuming or changing it.
34//!
35//! `.route()` is binary conditional routing — evaluates a predicate and
36//! executes exactly one of two arms. Both arms must produce the same
37//! output type. For N-ary routing, nest `route` calls.
38//!
39//! To skip an entire fork, resolve Option/Result **before** `.fork()`:
40//!
41//! ```ignore
42//! DagBuilder::<RawMsg>::new()
43//!     .root(decode, reg)
44//!     .guard(|msg: &RawMsg| !msg.is_empty(), reg)  // None skips everything below
45//!     .unwrap_or(default)                           // → T, enter fork with concrete type
46//!     .fork()
47//!     // arms work with &T, not &Option<T>
48//! ```
49//!
50//! # Combinator quick reference
51//!
52//! **Topology:** `.root()`, `.then()`, `.fork()`, `.arm()`, `.merge()`,
53//! `.join()`, `.build()`
54//!
55//! **Flow control:** `.guard()`, `.tap()`, `.route()`, `.tee()`, `.scan()`,
56//! `.dedup()`
57//!
58//! **Tuple `(A, B, ...)` (2-5 elements):** `.splat()` (→ splat builder,
59//! call `.then()` with destructured `&T` args)
60//!
61//! **Option:** `.map()`, `.filter()`, `.inspect()`, `.and_then()`,
62//! `.on_none()`, `.ok_or()`, `.unwrap_or()`
63//!
64//! **Result:** `.map()`, `.and_then()`, `.catch()`, `.map_err()`,
65//! `.ok()`, `.unwrap_or()`
66//!
67//! **Bool:** `.not()`, `.and()`, `.or()`, `.xor()`
68//!
69//! **Terminal:** `.dispatch()`, `.cloned()`, `.build()`, `.build_batch(cap)`
70//!
71//! All combinators accepting functions resolve `Param` dependencies at build
72//! time via `IntoStep`, `IntoRefStep`, or `IntoProducer` — named functions
73//! get direct-pointer access. Arity-0 closures work everywhere. Raw
74//! `&mut World` closures are available as an escape hatch via `Opaque`.
75//!
76//! # Splat — tuple destructuring
77//!
78//! When a step returns a tuple, the next step normally receives the
79//! whole tuple as `&(A, B)`. `.splat()` destructures it into individual
80//! reference arguments (`&A, &B`), reusing the existing merge step
81//! infrastructure:
82//!
83//! ```ignore
84//! fn split(t: Tick) -> (f64, u64) { (t.price, t.size) }
85//! fn weighted(price: &f64, size: &u64) -> f64 { *price * *size as f64 }
86//!
87//! DagBuilder::<Tick>::new()
88//!     .root(split, reg)       // Tick → (f64, u64)
89//!     .splat()                // destructure
90//!     .then(weighted, reg)    // (&f64, &u64) → f64
91//!     .build();
92//! ```
93//!
94//! Supported for tuples of 2-5 elements. Beyond 5, define a named
95//! struct — if a combinator stage needs that many arguments, a struct
96//! makes the intent clearer and the code more maintainable.
97//!
98//! # Node signatures
99//!
100//! The root node takes the event by value. All other nodes take their
101//! input by reference:
102//!
103//! ```ignore
104//! // Root: event by value
105//! fn decode(raw: RawMsg) -> DecodedMsg { .. }
106//!
107//! // Regular: input by reference
108//! fn update_ob(msg: &DecodedMsg) { .. }
109//! fn check_risk(config: Res<Config>, msg: &DecodedMsg) -> RiskResult { .. }
110//! ```
111//!
112//! # Examples
113//!
114//! ```
115//! use nexus_rt::{WorldBuilder, ResMut, Handler, Resource};
116//! use nexus_rt::dag::DagBuilder;
117//!
118//! #[derive(Resource)]
119//! struct Accum(u64);
120//!
121//! let mut wb = WorldBuilder::new();
122//! wb.register(Accum(0));
123//! let mut world = wb.build();
124//! let reg = world.registry();
125//!
126//! fn double(x: u32) -> u64 { x as u64 * 2 }
127//! fn store(mut out: ResMut<Accum>, val: &u64) { out.0 = *val; }
128//!
129//! let mut dag = DagBuilder::<u32>::new()
130//!     .root(double, reg)
131//!     .then(store, reg)
132//!     .build();
133//!
134//! dag.run(&mut world, 5u32);
135//! assert_eq!(world.resource::<Accum>().0, 10);
136//! ```
137//!
138//! # Returning DAGs from functions (Rust 2024)
139//!
140//! When a factory function takes `&Registry` and returns `impl Handler<E>`,
141//! Rust 2024 captures the registry borrow in the return type by default.
142//! Use `+ use<...>` to exclude it:
143//!
144//! ```ignore
145//! fn on_tick<C: Config>(
146//!     reg: &Registry,
147//! ) -> impl Handler<Tick> + use<C> {
148//!     DagBuilder::<Tick>::new()
149//!         .root(split::<C>, reg)
150//!         .fork()
151//!         // ...
152//!         .build()
153//! }
154//! ```
155//!
156//! List every type parameter the DAG captures; omit the `&Registry`
157//! lifetime — it's consumed during `.build()`. See the
158//! [crate-level docs](crate#returning-impl-handler-from-functions-rust-2024)
159//! for the full explanation.
160
161use std::marker::PhantomData;
162
163use crate::Handler;
164use crate::pipeline::{
165    AndBoolNode, ChainCall, ClonedNode, ClonedOptionNode, ClonedResultNode, DagAndThenOptionNode,
166    DagAndThenResultNode, DagCatchNode, DagMapOptionNode, DagMapResultNode, DagRouteNode,
167    DagThenNode, DedupNode, DiscardOptionNode, DispatchNode, FilterNode, GuardNode, IdentityNode,
168    InspectErrNode, InspectOptionNode, InspectResultNode, IntoProducer, IntoRefScanStep,
169    IntoRefStep, IntoStep, MapErrNode, NotNode, OkOrElseNode, OkOrNode, OkResultNode, OnNoneNode,
170    OrBoolNode, OrElseNode, RefScanNode, StepCall, TapNode, TeeNode, ThenNode,
171    UnwrapOrElseOptionNode, UnwrapOrElseResultNode, UnwrapOrOptionNode, UnwrapOrResultNode,
172    XorBoolNode,
173};
174use crate::world::{Registry, World};
175
176// =============================================================================
177// MergeStepCall / IntoMergeStep — merge step dispatch
178// =============================================================================
179
180/// Callable trait for resolved merge steps.
181///
182/// Like [`StepCall`] but for merge steps with multiple reference inputs
183/// bundled as `Inputs` (e.g. `(&'a A, &'a B)`).
184#[doc(hidden)]
185pub trait MergeStepCall<Inputs, Out> {
186    /// Call this merge step with a world reference and input references.
187    fn call(&mut self, world: &mut World, inputs: Inputs) -> Out;
188}
189
190/// Converts a named function into a resolved merge step.
191///
192/// Params first, then N reference inputs, returns output:
193///
194/// ```ignore
195/// fn check(config: Res<Config>, ob: &ObResult, risk: &RiskResult) -> Decision { .. }
196/// ```
197#[doc(hidden)]
198#[diagnostic::on_unimplemented(
199    message = "this function cannot be used as a merge step",
200    note = "merge steps take reference tuple inputs from the fork arms",
201    note = "closures with resource parameters are not supported — use a named `fn`"
202)]
203pub trait IntoMergeStep<Inputs, Out, Params> {
204    /// The concrete resolved merge step type.
205    type Step: MergeStepCall<Inputs, Out>;
206
207    /// Resolve Param state from the registry and produce a merge step.
208    fn into_merge_step(self, registry: &Registry) -> Self::Step;
209}
210
211/// Internal: pre-resolved merge step with cached Param state.
212#[doc(hidden)]
213pub struct MergeStep<F, Params: crate::handler::Param> {
214    f: F,
215    state: Params::State,
216    #[allow(dead_code)]
217    name: &'static str,
218}
219
220// -- Merge arity 2 -----------------------------------------------------------
221
222// Param arity 0: closures work.
223impl<A, B, Out, F> MergeStepCall<(&A, &B), Out> for MergeStep<F, ()>
224where
225    F: FnMut(&A, &B) -> Out + 'static,
226{
227    #[inline(always)]
228    fn call(&mut self, _world: &mut World, inputs: (&A, &B)) -> Out {
229        (self.f)(inputs.0, inputs.1)
230    }
231}
232
233impl<A, B, Out, F> IntoMergeStep<(&A, &B), Out, ()> for F
234where
235    F: FnMut(&A, &B) -> Out + 'static,
236{
237    type Step = MergeStep<F, ()>;
238
239    fn into_merge_step(self, registry: &Registry) -> Self::Step {
240        MergeStep {
241            f: self,
242            state: <() as crate::handler::Param>::init(registry),
243            name: std::any::type_name::<F>(),
244        }
245    }
246}
247
248// Param arities 1-8 for merge arity 2.
249macro_rules! impl_merge2_step {
250    ($($P:ident),+) => {
251        impl<A, B, Out, F: 'static, $($P: crate::handler::Param + 'static),+>
252            MergeStepCall<(&A, &B), Out> for MergeStep<F, ($($P,)+)>
253        where
254            for<'a> &'a mut F:
255                FnMut($($P,)+ &A, &B) -> Out +
256                FnMut($($P::Item<'a>,)+ &A, &B) -> Out,
257        {
258            #[inline(always)]
259            #[allow(non_snake_case)]
260            fn call(&mut self, world: &mut World, inputs: (&A, &B)) -> Out {
261                #[allow(clippy::too_many_arguments)]
262                fn call_inner<$($P,)+ IA, IB, Output>(
263                    mut f: impl FnMut($($P,)+ &IA, &IB) -> Output,
264                    $($P: $P,)+
265                    a: &IA, b: &IB,
266                ) -> Output {
267                    f($($P,)+ a, b)
268                }
269                // SAFETY: state was produced by Param::init() on the same Registry
270                // that built this World. Borrows are disjoint — enforced by
271                // conflict detection at build time.
272                #[cfg(debug_assertions)]
273                world.clear_borrows();
274                let ($($P,)+) = unsafe {
275                    <($($P,)+) as crate::handler::Param>::fetch(world, &mut self.state)
276                };
277                call_inner(&mut self.f, $($P,)+ inputs.0, inputs.1)
278            }
279        }
280
281        impl<A, B, Out, F: 'static, $($P: crate::handler::Param + 'static),+>
282            IntoMergeStep<(&A, &B), Out, ($($P,)+)> for F
283        where
284            for<'a> &'a mut F:
285                FnMut($($P,)+ &A, &B) -> Out +
286                FnMut($($P::Item<'a>,)+ &A, &B) -> Out,
287        {
288            type Step = MergeStep<F, ($($P,)+)>;
289
290            fn into_merge_step(self, registry: &Registry) -> Self::Step {
291                let state = <($($P,)+) as crate::handler::Param>::init(registry);
292                {
293                    #[allow(non_snake_case)]
294                    let ($($P,)+) = &state;
295                    registry.check_access(&[
296                        $((<$P as crate::handler::Param>::resource_id($P),
297                           std::any::type_name::<$P>()),)+
298                    ]);
299                }
300                MergeStep { f: self, state, name: std::any::type_name::<F>() }
301            }
302        }
303    };
304}
305
306// -- Merge arity 3 -----------------------------------------------------------
307
308impl<A, B, C, Out, F> MergeStepCall<(&A, &B, &C), Out> for MergeStep<F, ()>
309where
310    F: FnMut(&A, &B, &C) -> Out + 'static,
311{
312    #[inline(always)]
313    fn call(&mut self, _world: &mut World, inputs: (&A, &B, &C)) -> Out {
314        (self.f)(inputs.0, inputs.1, inputs.2)
315    }
316}
317
318impl<A, B, C, Out, F> IntoMergeStep<(&A, &B, &C), Out, ()> for F
319where
320    F: FnMut(&A, &B, &C) -> Out + 'static,
321{
322    type Step = MergeStep<F, ()>;
323
324    fn into_merge_step(self, registry: &Registry) -> Self::Step {
325        MergeStep {
326            f: self,
327            state: <() as crate::handler::Param>::init(registry),
328            name: std::any::type_name::<F>(),
329        }
330    }
331}
332
333macro_rules! impl_merge3_step {
334    ($($P:ident),+) => {
335        impl<A, B, C, Out, F: 'static, $($P: crate::handler::Param + 'static),+>
336            MergeStepCall<(&A, &B, &C), Out> for MergeStep<F, ($($P,)+)>
337        where
338            for<'a> &'a mut F:
339                FnMut($($P,)+ &A, &B, &C) -> Out +
340                FnMut($($P::Item<'a>,)+ &A, &B, &C) -> Out,
341        {
342            #[inline(always)]
343            #[allow(non_snake_case)]
344            fn call(&mut self, world: &mut World, inputs: (&A, &B, &C)) -> Out {
345                #[allow(clippy::too_many_arguments)]
346                fn call_inner<$($P,)+ IA, IB, IC, Output>(
347                    mut f: impl FnMut($($P,)+ &IA, &IB, &IC) -> Output,
348                    $($P: $P,)+
349                    a: &IA, b: &IB, c: &IC,
350                ) -> Output {
351                    f($($P,)+ a, b, c)
352                }
353                // SAFETY: state was produced by Param::init() on the same Registry
354                // that built this World. Borrows are disjoint — enforced by
355                // conflict detection at build time.
356                #[cfg(debug_assertions)]
357                world.clear_borrows();
358                let ($($P,)+) = unsafe {
359                    <($($P,)+) as crate::handler::Param>::fetch(world, &mut self.state)
360                };
361                call_inner(&mut self.f, $($P,)+ inputs.0, inputs.1, inputs.2)
362            }
363        }
364
365        impl<A, B, C, Out, F: 'static, $($P: crate::handler::Param + 'static),+>
366            IntoMergeStep<(&A, &B, &C), Out, ($($P,)+)> for F
367        where
368            for<'a> &'a mut F:
369                FnMut($($P,)+ &A, &B, &C) -> Out +
370                FnMut($($P::Item<'a>,)+ &A, &B, &C) -> Out,
371        {
372            type Step = MergeStep<F, ($($P,)+)>;
373
374            fn into_merge_step(self, registry: &Registry) -> Self::Step {
375                let state = <($($P,)+) as crate::handler::Param>::init(registry);
376                {
377                    #[allow(non_snake_case)]
378                    let ($($P,)+) = &state;
379                    registry.check_access(&[
380                        $((<$P as crate::handler::Param>::resource_id($P),
381                           std::any::type_name::<$P>()),)+
382                    ]);
383                }
384                MergeStep { f: self, state, name: std::any::type_name::<F>() }
385            }
386        }
387    };
388}
389
390// -- Merge arity 4 -----------------------------------------------------------
391
392impl<A, B, C, D, Out, F> MergeStepCall<(&A, &B, &C, &D), Out> for MergeStep<F, ()>
393where
394    F: FnMut(&A, &B, &C, &D) -> Out + 'static,
395{
396    #[inline(always)]
397    fn call(&mut self, _world: &mut World, i: (&A, &B, &C, &D)) -> Out {
398        (self.f)(i.0, i.1, i.2, i.3)
399    }
400}
401
402impl<A, B, C, D, Out, F> IntoMergeStep<(&A, &B, &C, &D), Out, ()> for F
403where
404    F: FnMut(&A, &B, &C, &D) -> Out + 'static,
405{
406    type Step = MergeStep<F, ()>;
407    fn into_merge_step(self, registry: &Registry) -> Self::Step {
408        MergeStep {
409            f: self,
410            state: <() as crate::handler::Param>::init(registry),
411            name: std::any::type_name::<F>(),
412        }
413    }
414}
415
416macro_rules! impl_merge4_step {
417    ($($P:ident),+) => {
418        impl<A, B, C, D, Out, F: 'static, $($P: crate::handler::Param + 'static),+>
419            MergeStepCall<(&A, &B, &C, &D), Out> for MergeStep<F, ($($P,)+)>
420        where for<'a> &'a mut F:
421            FnMut($($P,)+ &A, &B, &C, &D) -> Out +
422            FnMut($($P::Item<'a>,)+ &A, &B, &C, &D) -> Out,
423        {
424            #[inline(always)]
425            #[allow(non_snake_case)]
426            fn call(&mut self, world: &mut World, i: (&A, &B, &C, &D)) -> Out {
427                #[allow(clippy::too_many_arguments)]
428                fn call_inner<$($P,)+ IA, IB, IC, ID, Output>(
429                    mut f: impl FnMut($($P,)+ &IA, &IB, &IC, &ID) -> Output,
430                    $($P: $P,)+ a: &IA, b: &IB, c: &IC, d: &ID,
431                ) -> Output { f($($P,)+ a, b, c, d) }
432                // SAFETY: state was produced by Param::init() on the same Registry
433                // that built this World. Borrows are disjoint — enforced by
434                // conflict detection at build time.
435                #[cfg(debug_assertions)]
436                world.clear_borrows();
437                let ($($P,)+) = unsafe {
438                    <($($P,)+) as crate::handler::Param>::fetch(world, &mut self.state)
439                };
440                call_inner(&mut self.f, $($P,)+ i.0, i.1, i.2, i.3)
441            }
442        }
443        impl<A, B, C, D, Out, F: 'static, $($P: crate::handler::Param + 'static),+>
444            IntoMergeStep<(&A, &B, &C, &D), Out, ($($P,)+)> for F
445        where for<'a> &'a mut F:
446            FnMut($($P,)+ &A, &B, &C, &D) -> Out +
447            FnMut($($P::Item<'a>,)+ &A, &B, &C, &D) -> Out,
448        {
449            type Step = MergeStep<F, ($($P,)+)>;
450            fn into_merge_step(self, registry: &Registry) -> Self::Step {
451                let state = <($($P,)+) as crate::handler::Param>::init(registry);
452                { #[allow(non_snake_case)] let ($($P,)+) = &state;
453                  registry.check_access(&[$((<$P as crate::handler::Param>::resource_id($P), std::any::type_name::<$P>()),)+]); }
454                MergeStep { f: self, state, name: std::any::type_name::<F>() }
455            }
456        }
457    };
458}
459
460// -- Merge arity 5 -----------------------------------------------------------
461
462impl<A, B, C, D, E, Out, F> MergeStepCall<(&A, &B, &C, &D, &E), Out> for MergeStep<F, ()>
463where
464    F: FnMut(&A, &B, &C, &D, &E) -> Out + 'static,
465{
466    #[inline(always)]
467    fn call(&mut self, _world: &mut World, i: (&A, &B, &C, &D, &E)) -> Out {
468        (self.f)(i.0, i.1, i.2, i.3, i.4)
469    }
470}
471
472impl<A, B, C, D, E, Out, F> IntoMergeStep<(&A, &B, &C, &D, &E), Out, ()> for F
473where
474    F: FnMut(&A, &B, &C, &D, &E) -> Out + 'static,
475{
476    type Step = MergeStep<F, ()>;
477    fn into_merge_step(self, registry: &Registry) -> Self::Step {
478        MergeStep {
479            f: self,
480            state: <() as crate::handler::Param>::init(registry),
481            name: std::any::type_name::<F>(),
482        }
483    }
484}
485
486macro_rules! impl_merge5_step {
487    ($($P:ident),+) => {
488        impl<A, B, C, D, E, Out, F: 'static, $($P: crate::handler::Param + 'static),+>
489            MergeStepCall<(&A, &B, &C, &D, &E), Out> for MergeStep<F, ($($P,)+)>
490        where for<'a> &'a mut F:
491            FnMut($($P,)+ &A, &B, &C, &D, &E) -> Out +
492            FnMut($($P::Item<'a>,)+ &A, &B, &C, &D, &E) -> Out,
493        {
494            #[inline(always)]
495            #[allow(non_snake_case)]
496            fn call(&mut self, world: &mut World, i: (&A, &B, &C, &D, &E)) -> Out {
497                #[allow(clippy::too_many_arguments)]
498                fn call_inner<$($P,)+ IA, IB, IC, ID, IE, Output>(
499                    mut f: impl FnMut($($P,)+ &IA, &IB, &IC, &ID, &IE) -> Output,
500                    $($P: $P,)+ a: &IA, b: &IB, c: &IC, d: &ID, e: &IE,
501                ) -> Output { f($($P,)+ a, b, c, d, e) }
502                // SAFETY: state was produced by Param::init() on the same Registry
503                // that built this World. Borrows are disjoint — enforced by
504                // conflict detection at build time.
505                #[cfg(debug_assertions)]
506                world.clear_borrows();
507                let ($($P,)+) = unsafe {
508                    <($($P,)+) as crate::handler::Param>::fetch(world, &mut self.state)
509                };
510                call_inner(&mut self.f, $($P,)+ i.0, i.1, i.2, i.3, i.4)
511            }
512        }
513        impl<A, B, C, D, E, Out, F: 'static, $($P: crate::handler::Param + 'static),+>
514            IntoMergeStep<(&A, &B, &C, &D, &E), Out, ($($P,)+)> for F
515        where for<'a> &'a mut F:
516            FnMut($($P,)+ &A, &B, &C, &D, &E) -> Out +
517            FnMut($($P::Item<'a>,)+ &A, &B, &C, &D, &E) -> Out,
518        {
519            type Step = MergeStep<F, ($($P,)+)>;
520            fn into_merge_step(self, registry: &Registry) -> Self::Step {
521                let state = <($($P,)+) as crate::handler::Param>::init(registry);
522                { #[allow(non_snake_case)] let ($($P,)+) = &state;
523                  registry.check_access(&[$((<$P as crate::handler::Param>::resource_id($P), std::any::type_name::<$P>()),)+]); }
524                MergeStep { f: self, state, name: std::any::type_name::<F>() }
525            }
526        }
527    };
528}
529
530all_tuples!(impl_merge2_step);
531all_tuples!(impl_merge3_step);
532all_tuples!(impl_merge4_step);
533all_tuples!(impl_merge5_step);
534
535// =============================================================================
536// DAG — monomorphized, zero vtable dispatch
537// =============================================================================
538//
539// Encodes DAG topology in the type system at compile time. After
540// monomorphization, the entire DAG is a single flat function with all
541// values as stack locals. No arena, no bitmap. Only unsafe is
542// in the shared Param::fetch path (resource access by pre-resolved index).
543//
544// Fan-out: multiple nodes borrow the same stack local (no Clone).
545// Merge: merge step borrows all arm outputs.
546// Panic safety: stack unwinding drops all locals automatically.
547
548/// Entry point for building a DAG pipeline.
549///
550/// The DAG encodes topology in the type system at compile time,
551/// producing a single monomorphized chain of named node types. All values live as
552/// stack locals in the `run()` body — no arena, no vtable dispatch.
553/// The only `unsafe` is in the shared [`Param::fetch`](crate::Param)
554/// path (resource access by pre-resolved index).
555///
556/// # Examples
557///
558/// ```
559/// use nexus_rt::{WorldBuilder, ResMut, Handler, Resource};
560/// use nexus_rt::dag::DagBuilder;
561///
562/// #[derive(Resource)]
563/// struct Accum(u64);
564///
565/// let mut wb = WorldBuilder::new();
566/// wb.register(Accum(0));
567/// let mut world = wb.build();
568/// let reg = world.registry();
569///
570/// fn double(x: u32) -> u64 { x as u64 * 2 }
571/// fn store(mut out: ResMut<Accum>, val: &u64) { out.0 = *val; }
572///
573/// let mut dag = DagBuilder::<u32>::new()
574///     .root(double, reg)
575///     .then(store, reg)
576///     .build();
577///
578/// dag.run(&mut world, 5u32);
579/// assert_eq!(world.resource::<Accum>().0, 10);
580/// ```
581#[must_use = "a DAG builder does nothing unless you chain steps and call .build()"]
582pub struct DagBuilder<E>(PhantomData<fn(E)>);
583
584impl<E> DagBuilder<E> {
585    /// Create a new typed DAG entry point.
586    pub fn new() -> Self {
587        Self(PhantomData)
588    }
589
590    /// Set the root step. Takes the event `E` by value, produces `Out`.
591    pub fn root<Out, Params, S>(
592        self,
593        f: S,
594        registry: &Registry,
595    ) -> DagChain<E, Out, ThenNode<IdentityNode, S::Step>>
596    where
597        Out: 'static,
598        S: IntoStep<E, Out, Params>,
599    {
600        DagChain {
601            chain: ThenNode {
602                prev: IdentityNode,
603                step: f.into_step(registry),
604            },
605            _marker: PhantomData,
606        }
607    }
608}
609
610impl<E> Default for DagBuilder<E> {
611    fn default() -> Self {
612        Self::new()
613    }
614}
615
616/// Main chain builder for a typed DAG.
617///
618/// `Chain` implements [`ChainCall<E, Out = Out>`] — a named node type
619/// representing all steps composed so far. No closures, no `use<>`.
620#[must_use = "DAG chain does nothing until .build() is called"]
621pub struct DagChain<E, Out, Chain> {
622    pub(crate) chain: Chain,
623    pub(crate) _marker: PhantomData<fn(E) -> Out>,
624}
625
626impl<E, Out: 'static, Chain> DagChain<E, Out, Chain> {
627    /// Enter fork mode. Subsequent `.arm()` calls add parallel branches.
628    pub fn fork(self) -> DagChainFork<E, Out, Chain, ()> {
629        DagChainFork {
630            chain: self.chain,
631            arms: (),
632            _marker: PhantomData,
633        }
634    }
635}
636
637impl<E, Chain> DagChain<E, (), Chain>
638where
639    Chain: ChainCall<E, Out = ()> + Send,
640{
641    /// Finalize into a [`Dag`] that implements [`Handler<E>`].
642    ///
643    /// Only available when the chain ends with `()` or `Option<()>`.
644    /// If your DAG produces a value, add a final `.then()` that consumes
645    /// the output.
646    #[must_use = "building a DAG without storing it does nothing"]
647    pub fn build(self) -> Dag<Chain> {
648        Dag { chain: self.chain }
649    }
650}
651
652impl<E, Chain> DagChain<E, Option<()>, Chain>
653where
654    Chain: ChainCall<E, Out = Option<()>> + Send,
655{
656    /// Finalize into a [`Dag`], discarding the `Option<()>`.
657    ///
658    /// DAGs ending with `Option<()>` produce the same [`Dag`] as those
659    /// ending with `()`.
660    #[must_use = "building a DAG without storing it does nothing"]
661    pub fn build(self) -> Dag<DiscardOptionNode<Chain>> {
662        Dag {
663            chain: DiscardOptionNode { prev: self.chain },
664        }
665    }
666}
667
668/// Arm builder seed. Used in `.arm()` closures and to build arms for
669/// [`.route()`](DagChain::route).
670///
671/// Call `.then()` to add the first step in this arm.
672pub struct DagArmSeed<In>(PhantomData<fn(*const In)>);
673
674impl<In: 'static> DagArmSeed<In> {
675    /// Create a new arm builder seed.
676    ///
677    /// Used to build arms passed to [`DagChain::route`] or
678    /// [`DagArm::route`]:
679    ///
680    /// ```ignore
681    /// let fast = DagArmSeed::new().then(fast_path, &reg);
682    /// let slow = DagArmSeed::new().then(slow_path, &reg);
683    /// dag.route(predicate, &reg, fast, slow)
684    /// ```
685    pub fn new() -> Self {
686        Self(PhantomData)
687    }
688}
689
690impl<In: 'static> Default for DagArmSeed<In> {
691    fn default() -> Self {
692        Self::new()
693    }
694}
695
696impl<In: 'static> DagArmSeed<In> {
697    /// Add the first step in this arm. Takes `&In` by reference.
698    pub fn then<Out, Params, S>(
699        self,
700        f: S,
701        registry: &Registry,
702    ) -> DagArm<In, Out, ThenNode<IdentityNode, S::Step>>
703    where
704        Out: 'static,
705        S: IntoStep<&'static In, Out, Params>,
706        S::Step: for<'a> StepCall<&'a In, Out = Out>,
707    {
708        DagArm {
709            chain: ThenNode {
710                prev: IdentityNode,
711                step: f.into_step(registry),
712            },
713            _marker: PhantomData,
714        }
715    }
716}
717
718/// Built arm in a typed DAG fork.
719///
720/// `Chain` implements [`ChainCall<&In, Out = Out>`] — a named node type
721/// for this arm's steps.
722pub struct DagArm<In, Out, Chain> {
723    pub(crate) chain: Chain,
724    pub(crate) _marker: PhantomData<fn(*const In) -> Out>,
725}
726
727impl<In: 'static, Out: 'static, Chain> DagArm<In, Out, Chain> {
728    /// Enter fork mode within this arm.
729    pub fn fork(self) -> DagArmFork<In, Out, Chain, ()> {
730        DagArmFork {
731            chain: self.chain,
732            arms: (),
733            _marker: PhantomData,
734        }
735    }
736}
737
738/// Fork builder on the main chain. Accumulates arms as a tuple.
739pub struct DagChainFork<E, ForkOut, Chain, Arms> {
740    chain: Chain,
741    arms: Arms,
742    _marker: PhantomData<fn(E) -> ForkOut>,
743}
744
745/// Fork builder within an arm. Accumulates sub-arms as a tuple.
746pub struct DagArmFork<In, ForkOut, Chain, Arms> {
747    chain: Chain,
748    arms: Arms,
749    _marker: PhantomData<fn(*const In) -> ForkOut>,
750}
751
752/// Final built DAG. Implements [`Handler<E>`].
753///
754/// Created by [`DagChain::build`]. The entire DAG is monomorphized
755/// at compile time — no boxing, no virtual dispatch, no arena.
756/// Supports `for<'a> Handler<&'a T>` for zero-copy event dispatch.
757/// For batch processing, see [`BatchDag`].
758pub struct Dag<Chain> {
759    chain: Chain,
760}
761
762impl<E, Chain> Handler<E> for Dag<Chain>
763where
764    Chain: ChainCall<E, Out = ()> + Send,
765{
766    fn run(&mut self, world: &mut World, event: E) {
767        self.chain.call(world, event);
768    }
769
770    fn name(&self) -> &'static str {
771        "dag::Dag"
772    }
773}
774
775// =============================================================================
776// Fork arity macro — arm accumulation, merge, join
777// =============================================================================
778
779// =============================================================================
780// Combinator macro — shared between DagChain and DagArm
781// =============================================================================
782
783/// Generates step combinators, Option/Result helpers, and clone helpers.
784///
785/// DagChain and DagArm use the same named node types — `In` appears only
786/// on the `ChainCall<In>` trait impl, not on the struct. No closures, no
787/// `use<>` captures.
788macro_rules! impl_dag_combinators {
789    (builder: $Builder:ident, upstream: $U:ident) => {
790        // =============================================================
791        // Core — any Out
792        // =============================================================
793
794        impl<$U, Out: 'static, Chain> $Builder<$U, Out, Chain> {
795            /// Append a step. The step receives `&Out` by reference.
796            pub fn then<NewOut, Params, S>(
797                self,
798                f: S,
799                registry: &Registry,
800            ) -> $Builder<$U, NewOut, DagThenNode<Chain, S::Step, NewOut>>
801            where
802                NewOut: 'static,
803                S: IntoStep<&'static Out, NewOut, Params>,
804                S::Step: for<'a> StepCall<&'a Out, Out = NewOut>,
805            {
806                $Builder {
807                    chain: DagThenNode {
808                        prev: self.chain,
809                        step: f.into_step(registry),
810                        _out: PhantomData,
811                    },
812                    _marker: PhantomData,
813                }
814            }
815
816            /// Dispatch output to a [`Handler<Out>`].
817            pub fn dispatch<H: Handler<Out>>(
818                self,
819                handler: H,
820            ) -> $Builder<$U, (), DispatchNode<Chain, H>> {
821                $Builder {
822                    chain: DispatchNode {
823                        prev: self.chain,
824                        handler,
825                    },
826                    _marker: PhantomData,
827                }
828            }
829
830            /// Conditionally wrap the output in `Option`.
831            pub fn guard<Params, S: IntoRefStep<Out, bool, Params>>(
832                self,
833                f: S,
834                registry: &Registry,
835            ) -> $Builder<$U, Option<Out>, GuardNode<Chain, S::Step>> {
836                $Builder {
837                    chain: GuardNode {
838                        prev: self.chain,
839                        step: f.into_ref_step(registry),
840                    },
841                    _marker: PhantomData,
842                }
843            }
844
845            /// Open a view scope. Steps inside operate on a read-only
846            /// view constructed from the event. Close with `.end_view()`.
847            pub fn view<V: crate::view::View<Out>>(
848                self,
849            ) -> crate::view::ViewScope<$U, Out, V, Chain, ()> {
850                crate::view::ViewScope::new(self.chain)
851            }
852
853            /// Observe the current value without consuming or changing it.
854            pub fn tap<Params, S: IntoRefStep<Out, (), Params>>(
855                self,
856                f: S,
857                registry: &Registry,
858            ) -> $Builder<$U, Out, TapNode<Chain, S::Step>> {
859                $Builder {
860                    chain: TapNode {
861                        prev: self.chain,
862                        step: f.into_ref_step(registry),
863                    },
864                    _marker: PhantomData,
865                }
866            }
867
868            /// Binary conditional routing. Both arms borrow `&Out`.
869            pub fn route<NewOut, C0, C1, Params, Pred: IntoRefStep<Out, bool, Params>>(
870                self,
871                pred: Pred,
872                registry: &Registry,
873                on_true: DagArm<Out, NewOut, C0>,
874                on_false: DagArm<Out, NewOut, C1>,
875            ) -> $Builder<$U, NewOut, DagRouteNode<Chain, Pred::Step, C0, C1, NewOut>>
876            where
877                C0: for<'a> ChainCall<&'a Out, Out = NewOut>,
878                C1: for<'a> ChainCall<&'a Out, Out = NewOut>,
879            {
880                $Builder {
881                    chain: DagRouteNode {
882                        prev: self.chain,
883                        pred: pred.into_ref_step(registry),
884                        on_true: on_true.chain,
885                        on_false: on_false.chain,
886                        _out: PhantomData,
887                    },
888                    _marker: PhantomData,
889                }
890            }
891
892            /// Fork off a multi-step side-effect chain.
893            pub fn tee<C>(self, side: DagArm<Out, (), C>) -> $Builder<$U, Out, TeeNode<Chain, C>>
894            where
895                C: for<'a> ChainCall<&'a Out, Out = ()>,
896            {
897                $Builder {
898                    chain: TeeNode {
899                        prev: self.chain,
900                        side: side.chain,
901                    },
902                    _marker: PhantomData,
903                }
904            }
905
906            /// Scan with persistent accumulator. The step receives
907            /// `&mut Acc` and `&Out` by reference.
908            pub fn scan<Acc, NewOut, Params, S>(
909                self,
910                initial: Acc,
911                f: S,
912                registry: &Registry,
913            ) -> $Builder<$U, NewOut, RefScanNode<Chain, S::Step, Acc>>
914            where
915                Acc: 'static,
916                NewOut: 'static,
917                S: IntoRefScanStep<Acc, Out, NewOut, Params>,
918            {
919                $Builder {
920                    chain: RefScanNode {
921                        prev: self.chain,
922                        step: f.into_ref_scan_step(registry),
923                        acc: initial,
924                    },
925                    _marker: PhantomData,
926                }
927            }
928        }
929
930        // =============================================================
931        // Dedup — suppress unchanged values
932        // =============================================================
933
934        impl<$U, Out: PartialEq + Clone + 'static, Chain> $Builder<$U, Out, Chain> {
935            /// Suppress consecutive unchanged values.
936            pub fn dedup(self) -> $Builder<$U, Option<Out>, DedupNode<Chain, Out>> {
937                $Builder {
938                    chain: DedupNode {
939                        prev: self.chain,
940                        last: None,
941                    },
942                    _marker: PhantomData,
943                }
944            }
945        }
946
947        // =============================================================
948        // Bool combinators
949        // =============================================================
950
951        impl<$U, Chain> $Builder<$U, bool, Chain> {
952            /// Invert a boolean value.
953            #[allow(clippy::should_implement_trait)]
954            pub fn not(self) -> $Builder<$U, bool, NotNode<Chain>> {
955                $Builder {
956                    chain: NotNode { prev: self.chain },
957                    _marker: PhantomData,
958                }
959            }
960
961            /// Short-circuit AND with a second boolean.
962            pub fn and<Params, S: IntoProducer<bool, Params>>(
963                self,
964                f: S,
965                registry: &Registry,
966            ) -> $Builder<$U, bool, AndBoolNode<Chain, S::Step>> {
967                $Builder {
968                    chain: AndBoolNode {
969                        prev: self.chain,
970                        producer: f.into_producer(registry),
971                    },
972                    _marker: PhantomData,
973                }
974            }
975
976            /// Short-circuit OR with a second boolean.
977            pub fn or<Params, S: IntoProducer<bool, Params>>(
978                self,
979                f: S,
980                registry: &Registry,
981            ) -> $Builder<$U, bool, OrBoolNode<Chain, S::Step>> {
982                $Builder {
983                    chain: OrBoolNode {
984                        prev: self.chain,
985                        producer: f.into_producer(registry),
986                    },
987                    _marker: PhantomData,
988                }
989            }
990
991            /// XOR with a second boolean.
992            pub fn xor<Params, S: IntoProducer<bool, Params>>(
993                self,
994                f: S,
995                registry: &Registry,
996            ) -> $Builder<$U, bool, XorBoolNode<Chain, S::Step>> {
997                $Builder {
998                    chain: XorBoolNode {
999                        prev: self.chain,
1000                        producer: f.into_producer(registry),
1001                    },
1002                    _marker: PhantomData,
1003                }
1004            }
1005        }
1006
1007        // =============================================================
1008        // Clone helpers — &T → T transitions
1009        // =============================================================
1010
1011        impl<'a, $U, T: Clone, Chain> $Builder<$U, &'a T, Chain> {
1012            /// Clone a borrowed output to produce an owned value.
1013            pub fn cloned(self) -> $Builder<$U, T, ClonedNode<Chain>> {
1014                $Builder {
1015                    chain: ClonedNode { prev: self.chain },
1016                    _marker: PhantomData,
1017                }
1018            }
1019        }
1020
1021        impl<'a, $U, T: Clone, Chain> $Builder<$U, Option<&'a T>, Chain> {
1022            /// Clone inner borrowed value. `Option<&T>` → `Option<T>`.
1023            pub fn cloned(self) -> $Builder<$U, Option<T>, ClonedOptionNode<Chain>> {
1024                $Builder {
1025                    chain: ClonedOptionNode { prev: self.chain },
1026                    _marker: PhantomData,
1027                }
1028            }
1029        }
1030
1031        impl<'a, $U, T: Clone, Err, Chain> $Builder<$U, Result<&'a T, Err>, Chain> {
1032            /// Clone inner borrowed Ok value.
1033            pub fn cloned(self) -> $Builder<$U, Result<T, Err>, ClonedResultNode<Chain>> {
1034                $Builder {
1035                    chain: ClonedResultNode { prev: self.chain },
1036                    _marker: PhantomData,
1037                }
1038            }
1039        }
1040
1041        // =============================================================
1042        // Option helpers
1043        // =============================================================
1044
1045        impl<$U, T: 'static, Chain> $Builder<$U, Option<T>, Chain> {
1046            /// Transform the inner value. Step not called on None.
1047            pub fn map<U, Params, S: IntoStep<&'static T, U, Params>>(
1048                self,
1049                f: S,
1050                registry: &Registry,
1051            ) -> $Builder<$U, Option<U>, DagMapOptionNode<Chain, S::Step, U>>
1052            where
1053                U: 'static,
1054                S::Step: for<'x> StepCall<&'x T, Out = U>,
1055            {
1056                $Builder {
1057                    chain: DagMapOptionNode {
1058                        prev: self.chain,
1059                        step: f.into_step(registry),
1060                        _out: PhantomData,
1061                    },
1062                    _marker: PhantomData,
1063                }
1064            }
1065
1066            /// Short-circuits on None. std: `Option::and_then`
1067            pub fn and_then<U, Params, S: IntoStep<&'static T, Option<U>, Params>>(
1068                self,
1069                f: S,
1070                registry: &Registry,
1071            ) -> $Builder<$U, Option<U>, DagAndThenOptionNode<Chain, S::Step, U>>
1072            where
1073                U: 'static,
1074                S::Step: for<'x> StepCall<&'x T, Out = Option<U>>,
1075            {
1076                $Builder {
1077                    chain: DagAndThenOptionNode {
1078                        prev: self.chain,
1079                        step: f.into_step(registry),
1080                        _out: PhantomData,
1081                    },
1082                    _marker: PhantomData,
1083                }
1084            }
1085
1086            /// Side effect on None.
1087            pub fn on_none<Params, S: IntoProducer<(), Params>>(
1088                self,
1089                f: S,
1090                registry: &Registry,
1091            ) -> $Builder<$U, Option<T>, OnNoneNode<Chain, S::Step>> {
1092                $Builder {
1093                    chain: OnNoneNode {
1094                        prev: self.chain,
1095                        producer: f.into_producer(registry),
1096                    },
1097                    _marker: PhantomData,
1098                }
1099            }
1100
1101            /// Keep value if predicate holds. std: `Option::filter`
1102            pub fn filter<Params, S: IntoRefStep<T, bool, Params>>(
1103                self,
1104                f: S,
1105                registry: &Registry,
1106            ) -> $Builder<$U, Option<T>, FilterNode<Chain, S::Step>> {
1107                $Builder {
1108                    chain: FilterNode {
1109                        prev: self.chain,
1110                        step: f.into_ref_step(registry),
1111                    },
1112                    _marker: PhantomData,
1113                }
1114            }
1115
1116            /// Side effect on Some value. std: `Option::inspect`
1117            pub fn inspect<Params, S: IntoRefStep<T, (), Params>>(
1118                self,
1119                f: S,
1120                registry: &Registry,
1121            ) -> $Builder<$U, Option<T>, InspectOptionNode<Chain, S::Step>> {
1122                $Builder {
1123                    chain: InspectOptionNode {
1124                        prev: self.chain,
1125                        step: f.into_ref_step(registry),
1126                    },
1127                    _marker: PhantomData,
1128                }
1129            }
1130
1131            /// None becomes Err(err). std: `Option::ok_or`
1132            pub fn ok_or<Err: Clone>(
1133                self,
1134                err: Err,
1135            ) -> $Builder<$U, Result<T, Err>, OkOrNode<Chain, Err>> {
1136                $Builder {
1137                    chain: OkOrNode {
1138                        prev: self.chain,
1139                        err,
1140                    },
1141                    _marker: PhantomData,
1142                }
1143            }
1144
1145            /// None becomes Err(f()). std: `Option::ok_or_else`
1146            pub fn ok_or_else<Err, Params, S: IntoProducer<Err, Params>>(
1147                self,
1148                f: S,
1149                registry: &Registry,
1150            ) -> $Builder<$U, Result<T, Err>, OkOrElseNode<Chain, S::Step>> {
1151                $Builder {
1152                    chain: OkOrElseNode {
1153                        prev: self.chain,
1154                        producer: f.into_producer(registry),
1155                    },
1156                    _marker: PhantomData,
1157                }
1158            }
1159
1160            /// Exit Option — None becomes the default value.
1161            pub fn unwrap_or(self, default: T) -> $Builder<$U, T, UnwrapOrOptionNode<Chain, T>>
1162            where
1163                T: Clone,
1164            {
1165                $Builder {
1166                    chain: UnwrapOrOptionNode {
1167                        prev: self.chain,
1168                        default,
1169                    },
1170                    _marker: PhantomData,
1171                }
1172            }
1173
1174            /// Exit Option — None becomes `f()`.
1175            pub fn unwrap_or_else<Params, S: IntoProducer<T, Params>>(
1176                self,
1177                f: S,
1178                registry: &Registry,
1179            ) -> $Builder<$U, T, UnwrapOrElseOptionNode<Chain, S::Step>> {
1180                $Builder {
1181                    chain: UnwrapOrElseOptionNode {
1182                        prev: self.chain,
1183                        producer: f.into_producer(registry),
1184                    },
1185                    _marker: PhantomData,
1186                }
1187            }
1188        }
1189
1190        // =============================================================
1191        // Result helpers
1192        // =============================================================
1193
1194        impl<$U, T: 'static, Err: 'static, Chain> $Builder<$U, Result<T, Err>, Chain> {
1195            /// Transform the Ok value. Step not called on Err.
1196            pub fn map<U, Params, S: IntoStep<&'static T, U, Params>>(
1197                self,
1198                f: S,
1199                registry: &Registry,
1200            ) -> $Builder<$U, Result<U, Err>, DagMapResultNode<Chain, S::Step, U>>
1201            where
1202                U: 'static,
1203                S::Step: for<'x> StepCall<&'x T, Out = U>,
1204            {
1205                $Builder {
1206                    chain: DagMapResultNode {
1207                        prev: self.chain,
1208                        step: f.into_step(registry),
1209                        _out: PhantomData,
1210                    },
1211                    _marker: PhantomData,
1212                }
1213            }
1214
1215            /// Short-circuits on Err. std: `Result::and_then`
1216            pub fn and_then<U, Params, S: IntoStep<&'static T, Result<U, Err>, Params>>(
1217                self,
1218                f: S,
1219                registry: &Registry,
1220            ) -> $Builder<$U, Result<U, Err>, DagAndThenResultNode<Chain, S::Step, U>>
1221            where
1222                U: 'static,
1223                S::Step: for<'x> StepCall<&'x T, Out = Result<U, Err>>,
1224            {
1225                $Builder {
1226                    chain: DagAndThenResultNode {
1227                        prev: self.chain,
1228                        step: f.into_step(registry),
1229                        _out: PhantomData,
1230                    },
1231                    _marker: PhantomData,
1232                }
1233            }
1234
1235            /// Handle error and transition to Option.
1236            pub fn catch<Params, S: IntoStep<&'static Err, (), Params>>(
1237                self,
1238                f: S,
1239                registry: &Registry,
1240            ) -> $Builder<$U, Option<T>, DagCatchNode<Chain, S::Step>>
1241            where
1242                S::Step: for<'x> StepCall<&'x Err, Out = ()>,
1243            {
1244                $Builder {
1245                    chain: DagCatchNode {
1246                        prev: self.chain,
1247                        step: f.into_step(registry),
1248                    },
1249                    _marker: PhantomData,
1250                }
1251            }
1252
1253            /// Transform the error. std: `Result::map_err`
1254            pub fn map_err<Err2, Params, S: IntoStep<Err, Err2, Params>>(
1255                self,
1256                f: S,
1257                registry: &Registry,
1258            ) -> $Builder<$U, Result<T, Err2>, MapErrNode<Chain, S::Step>> {
1259                $Builder {
1260                    chain: MapErrNode {
1261                        prev: self.chain,
1262                        step: f.into_step(registry),
1263                    },
1264                    _marker: PhantomData,
1265                }
1266            }
1267
1268            /// Recover from Err. std: `Result::or_else`
1269            pub fn or_else<Err2, Params, S: IntoStep<Err, Result<T, Err2>, Params>>(
1270                self,
1271                f: S,
1272                registry: &Registry,
1273            ) -> $Builder<$U, Result<T, Err2>, OrElseNode<Chain, S::Step>> {
1274                $Builder {
1275                    chain: OrElseNode {
1276                        prev: self.chain,
1277                        step: f.into_step(registry),
1278                    },
1279                    _marker: PhantomData,
1280                }
1281            }
1282
1283            /// Side effect on Ok. std: `Result::inspect`
1284            pub fn inspect<Params, S: IntoRefStep<T, (), Params>>(
1285                self,
1286                f: S,
1287                registry: &Registry,
1288            ) -> $Builder<$U, Result<T, Err>, InspectResultNode<Chain, S::Step>> {
1289                $Builder {
1290                    chain: InspectResultNode {
1291                        prev: self.chain,
1292                        step: f.into_ref_step(registry),
1293                    },
1294                    _marker: PhantomData,
1295                }
1296            }
1297
1298            /// Side effect on Err. std: `Result::inspect_err`
1299            pub fn inspect_err<Params, S: IntoRefStep<Err, (), Params>>(
1300                self,
1301                f: S,
1302                registry: &Registry,
1303            ) -> $Builder<$U, Result<T, Err>, InspectErrNode<Chain, S::Step>> {
1304                $Builder {
1305                    chain: InspectErrNode {
1306                        prev: self.chain,
1307                        step: f.into_ref_step(registry),
1308                    },
1309                    _marker: PhantomData,
1310                }
1311            }
1312
1313            /// Discard error, enter Option land. std: `Result::ok`
1314            pub fn ok(self) -> $Builder<$U, Option<T>, OkResultNode<Chain>> {
1315                $Builder {
1316                    chain: OkResultNode { prev: self.chain },
1317                    _marker: PhantomData,
1318                }
1319            }
1320
1321            /// Exit Result — Err becomes the default value.
1322            pub fn unwrap_or(self, default: T) -> $Builder<$U, T, UnwrapOrResultNode<Chain, T>>
1323            where
1324                T: Clone,
1325            {
1326                $Builder {
1327                    chain: UnwrapOrResultNode {
1328                        prev: self.chain,
1329                        default,
1330                    },
1331                    _marker: PhantomData,
1332                }
1333            }
1334
1335            /// Exit Result — Err becomes `f(err)`.
1336            pub fn unwrap_or_else<Params, S: IntoStep<Err, T, Params>>(
1337                self,
1338                f: S,
1339                registry: &Registry,
1340            ) -> $Builder<$U, T, UnwrapOrElseResultNode<Chain, S::Step>> {
1341                $Builder {
1342                    chain: UnwrapOrElseResultNode {
1343                        prev: self.chain,
1344                        step: f.into_step(registry),
1345                    },
1346                    _marker: PhantomData,
1347                }
1348            }
1349        }
1350    };
1351}
1352
1353impl_dag_combinators!(builder: DagChain, upstream: E);
1354impl_dag_combinators!(builder: DagArm, upstream: In);
1355
1356// =============================================================================
1357// Merge / Join named nodes — fork terminal nodes
1358// =============================================================================
1359
1360/// Merge two fork arms into a single output via [`MergeStepCall`].
1361#[doc(hidden)]
1362pub struct MergeNode2<Chain, C0, C1, MS, ForkOut, A0, A1, MOut> {
1363    pub(crate) chain: Chain,
1364    pub(crate) arm0: C0,
1365    pub(crate) arm1: C1,
1366    pub(crate) merge: MS,
1367    pub(crate) _marker: PhantomData<fn(ForkOut) -> (A0, A1, MOut)>,
1368}
1369
1370impl<In, Chain, C0, C1, MS, ForkOut, A0, A1, MOut> ChainCall<In>
1371    for MergeNode2<Chain, C0, C1, MS, ForkOut, A0, A1, MOut>
1372where
1373    ForkOut: 'static,
1374    A0: 'static,
1375    A1: 'static,
1376    Chain: ChainCall<In, Out = ForkOut>,
1377    C0: for<'a> ChainCall<&'a ForkOut, Out = A0>,
1378    C1: for<'a> ChainCall<&'a ForkOut, Out = A1>,
1379    MS: for<'x> MergeStepCall<(&'x A0, &'x A1), MOut>,
1380{
1381    type Out = MOut;
1382
1383    fn call(&mut self, world: &mut World, input: In) -> MOut {
1384        let fork_out = self.chain.call(world, input);
1385        let o0 = self.arm0.call(world, &fork_out);
1386        let o1 = self.arm1.call(world, &fork_out);
1387        self.merge.call(world, (&o0, &o1))
1388    }
1389}
1390
1391/// Merge three fork arms into a single output via [`MergeStepCall`].
1392#[doc(hidden)]
1393pub struct MergeNode3<Chain, C0, C1, C2, MS, ForkOut, A0, A1, A2, MOut> {
1394    pub(crate) chain: Chain,
1395    pub(crate) arm0: C0,
1396    pub(crate) arm1: C1,
1397    pub(crate) arm2: C2,
1398    pub(crate) merge: MS,
1399    pub(crate) _marker: PhantomData<fn(ForkOut) -> (A0, A1, A2, MOut)>,
1400}
1401
1402impl<In, Chain, C0, C1, C2, MS, ForkOut, A0, A1, A2, MOut> ChainCall<In>
1403    for MergeNode3<Chain, C0, C1, C2, MS, ForkOut, A0, A1, A2, MOut>
1404where
1405    ForkOut: 'static,
1406    A0: 'static,
1407    A1: 'static,
1408    A2: 'static,
1409    Chain: ChainCall<In, Out = ForkOut>,
1410    C0: for<'a> ChainCall<&'a ForkOut, Out = A0>,
1411    C1: for<'a> ChainCall<&'a ForkOut, Out = A1>,
1412    C2: for<'a> ChainCall<&'a ForkOut, Out = A2>,
1413    MS: for<'x> MergeStepCall<(&'x A0, &'x A1, &'x A2), MOut>,
1414{
1415    type Out = MOut;
1416
1417    fn call(&mut self, world: &mut World, input: In) -> MOut {
1418        let fork_out = self.chain.call(world, input);
1419        let o0 = self.arm0.call(world, &fork_out);
1420        let o1 = self.arm1.call(world, &fork_out);
1421        let o2 = self.arm2.call(world, &fork_out);
1422        self.merge.call(world, (&o0, &o1, &o2))
1423    }
1424}
1425
1426/// Merge four fork arms into a single output via [`MergeStepCall`].
1427#[doc(hidden)]
1428pub struct MergeNode4<Chain, C0, C1, C2, C3, MS, ForkOut, A0, A1, A2, A3, MOut> {
1429    pub(crate) chain: Chain,
1430    pub(crate) arm0: C0,
1431    pub(crate) arm1: C1,
1432    pub(crate) arm2: C2,
1433    pub(crate) arm3: C3,
1434    pub(crate) merge: MS,
1435    pub(crate) _marker: PhantomData<fn(ForkOut) -> (A0, A1, A2, A3, MOut)>,
1436}
1437
1438#[allow(clippy::many_single_char_names)]
1439impl<In, Chain, C0, C1, C2, C3, MS, ForkOut, A0, A1, A2, A3, MOut> ChainCall<In>
1440    for MergeNode4<Chain, C0, C1, C2, C3, MS, ForkOut, A0, A1, A2, A3, MOut>
1441where
1442    ForkOut: 'static,
1443    A0: 'static,
1444    A1: 'static,
1445    A2: 'static,
1446    A3: 'static,
1447    Chain: ChainCall<In, Out = ForkOut>,
1448    C0: for<'a> ChainCall<&'a ForkOut, Out = A0>,
1449    C1: for<'a> ChainCall<&'a ForkOut, Out = A1>,
1450    C2: for<'a> ChainCall<&'a ForkOut, Out = A2>,
1451    C3: for<'a> ChainCall<&'a ForkOut, Out = A3>,
1452    MS: for<'x> MergeStepCall<(&'x A0, &'x A1, &'x A2, &'x A3), MOut>,
1453{
1454    type Out = MOut;
1455
1456    fn call(&mut self, world: &mut World, input: In) -> MOut {
1457        let fork_out = self.chain.call(world, input);
1458        let o0 = self.arm0.call(world, &fork_out);
1459        let o1 = self.arm1.call(world, &fork_out);
1460        let o2 = self.arm2.call(world, &fork_out);
1461        let o3 = self.arm3.call(world, &fork_out);
1462        self.merge.call(world, (&o0, &o1, &o2, &o3))
1463    }
1464}
1465
1466/// Join two fork arms (all producing `()`).
1467#[doc(hidden)]
1468pub struct JoinNode2<Chain, C0, C1, ForkOut> {
1469    pub(crate) chain: Chain,
1470    pub(crate) arm0: C0,
1471    pub(crate) arm1: C1,
1472    pub(crate) _marker: PhantomData<fn() -> ForkOut>,
1473}
1474
1475impl<In, Chain, C0, C1, ForkOut> ChainCall<In> for JoinNode2<Chain, C0, C1, ForkOut>
1476where
1477    ForkOut: 'static,
1478    Chain: ChainCall<In, Out = ForkOut>,
1479    C0: for<'a> ChainCall<&'a ForkOut, Out = ()>,
1480    C1: for<'a> ChainCall<&'a ForkOut, Out = ()>,
1481{
1482    type Out = ();
1483
1484    fn call(&mut self, world: &mut World, input: In) {
1485        let fork_out = self.chain.call(world, input);
1486        self.arm0.call(world, &fork_out);
1487        self.arm1.call(world, &fork_out);
1488    }
1489}
1490
1491/// Join three fork arms (all producing `()`).
1492#[doc(hidden)]
1493pub struct JoinNode3<Chain, C0, C1, C2, ForkOut> {
1494    pub(crate) chain: Chain,
1495    pub(crate) arm0: C0,
1496    pub(crate) arm1: C1,
1497    pub(crate) arm2: C2,
1498    pub(crate) _marker: PhantomData<fn() -> ForkOut>,
1499}
1500
1501impl<In, Chain, C0, C1, C2, ForkOut> ChainCall<In> for JoinNode3<Chain, C0, C1, C2, ForkOut>
1502where
1503    ForkOut: 'static,
1504    Chain: ChainCall<In, Out = ForkOut>,
1505    C0: for<'a> ChainCall<&'a ForkOut, Out = ()>,
1506    C1: for<'a> ChainCall<&'a ForkOut, Out = ()>,
1507    C2: for<'a> ChainCall<&'a ForkOut, Out = ()>,
1508{
1509    type Out = ();
1510
1511    fn call(&mut self, world: &mut World, input: In) {
1512        let fork_out = self.chain.call(world, input);
1513        self.arm0.call(world, &fork_out);
1514        self.arm1.call(world, &fork_out);
1515        self.arm2.call(world, &fork_out);
1516    }
1517}
1518
1519/// Join four fork arms (all producing `()`).
1520#[doc(hidden)]
1521pub struct JoinNode4<Chain, C0, C1, C2, C3, ForkOut> {
1522    pub(crate) chain: Chain,
1523    pub(crate) arm0: C0,
1524    pub(crate) arm1: C1,
1525    pub(crate) arm2: C2,
1526    pub(crate) arm3: C3,
1527    pub(crate) _marker: PhantomData<fn() -> ForkOut>,
1528}
1529
1530#[allow(clippy::many_single_char_names)]
1531impl<In, Chain, C0, C1, C2, C3, ForkOut> ChainCall<In> for JoinNode4<Chain, C0, C1, C2, C3, ForkOut>
1532where
1533    ForkOut: 'static,
1534    Chain: ChainCall<In, Out = ForkOut>,
1535    C0: for<'a> ChainCall<&'a ForkOut, Out = ()>,
1536    C1: for<'a> ChainCall<&'a ForkOut, Out = ()>,
1537    C2: for<'a> ChainCall<&'a ForkOut, Out = ()>,
1538    C3: for<'a> ChainCall<&'a ForkOut, Out = ()>,
1539{
1540    type Out = ();
1541
1542    fn call(&mut self, world: &mut World, input: In) {
1543        let fork_out = self.chain.call(world, input);
1544        self.arm0.call(world, &fork_out);
1545        self.arm1.call(world, &fork_out);
1546        self.arm2.call(world, &fork_out);
1547        self.arm3.call(world, &fork_out);
1548    }
1549}
1550
1551// =============================================================================
1552// Splat — tuple destructuring into individual reference arguments (DAG)
1553// =============================================================================
1554//
1555// DAG splat reuses IntoMergeStep/MergeStepCall since DAG steps take inputs
1556// by reference — the function signature is the same as a merge step:
1557// `fn(Params..., &A, &B) -> Out`.
1558//
1559// Builder types are `#[doc(hidden)]` — users only see `.splat().then()`.
1560
1561macro_rules! define_dag_splat_builders {
1562    (
1563        $N:literal,
1564        chain: $SplatChain:ident,
1565        arm: $SplatArm:ident,
1566        arm_start: $SplatArmStart:ident,
1567        splat_then: $SplatThenNode:ident,
1568        splat_arm_start: $SplatArmStartNode:ident,
1569        ($($T:ident),+),
1570        ($($idx:tt),+)
1571    ) => {
1572        // -- Named node: splat + step on upstream chain --
1573
1574        #[doc(hidden)]
1575        pub struct $SplatThenNode<Chain, MS, $($T,)+ NewOut> {
1576            pub(crate) chain: Chain,
1577            pub(crate) merge: MS,
1578            pub(crate) _marker: PhantomData<fn() -> ($($T,)+ NewOut)>,
1579        }
1580
1581        impl<In, Chain, MS, $($T: 'static,)+ NewOut> ChainCall<In>
1582            for $SplatThenNode<Chain, MS, $($T,)+ NewOut>
1583        where
1584            Chain: ChainCall<In, Out = ($($T,)+)>,
1585            MS: for<'x> MergeStepCall<($(&'x $T,)+), NewOut>,
1586        {
1587            type Out = NewOut;
1588
1589            fn call(&mut self, world: &mut World, input: In) -> NewOut {
1590                let tuple = self.chain.call(world, input);
1591                self.merge.call(world, ($(&tuple.$idx,)+))
1592            }
1593        }
1594
1595        // -- Named node: splat at arm start (no upstream chain) --
1596
1597        #[doc(hidden)]
1598        pub struct $SplatArmStartNode<MS, $($T,)+ Out> {
1599            pub(crate) merge: MS,
1600            pub(crate) _marker: PhantomData<fn() -> ($($T,)+ Out)>,
1601        }
1602
1603        impl<'inp, $($T: 'static,)+ MS, Out> ChainCall<&'inp ($($T,)+)>
1604            for $SplatArmStartNode<MS, $($T,)+ Out>
1605        where
1606            MS: for<'x> MergeStepCall<($(&'x $T,)+), Out>,
1607        {
1608            type Out = Out;
1609
1610            fn call(&mut self, world: &mut World, input: &($($T,)+)) -> Out {
1611                self.merge.call(world, ($(&input.$idx,)+))
1612            }
1613        }
1614
1615        // -- Splat builder on main chain --
1616
1617        /// DAG splat builder on the main chain.
1618        #[doc(hidden)]
1619        pub struct $SplatChain<E, $($T,)+ Chain> {
1620            chain: Chain,
1621            _marker: PhantomData<fn(E) -> ($($T,)+)>,
1622        }
1623
1624        impl<E, $($T: 'static,)+ Chain> $SplatChain<E, $($T,)+ Chain> {
1625            /// Add a step that receives the tuple elements as individual `&T` arguments.
1626            pub fn then<NewOut, Params, S>(
1627                self,
1628                f: S,
1629                registry: &Registry,
1630            ) -> DagChain<E, NewOut, $SplatThenNode<Chain, S::Step, $($T,)+ NewOut>>
1631            where
1632                NewOut: 'static,
1633                S: IntoMergeStep<($(&'static $T,)+), NewOut, Params>,
1634                S::Step: for<'x> MergeStepCall<($(&'x $T,)+), NewOut>,
1635            {
1636                DagChain {
1637                    chain: $SplatThenNode {
1638                        chain: self.chain,
1639                        merge: f.into_merge_step(registry),
1640                        _marker: PhantomData,
1641                    },
1642                    _marker: PhantomData,
1643                }
1644            }
1645        }
1646
1647        impl<E, $($T: 'static,)+ Chain> DagChain<E, ($($T,)+), Chain> {
1648            /// Destructure the tuple output into individual `&T` arguments.
1649            pub fn splat(self) -> $SplatChain<E, $($T,)+ Chain> {
1650                $SplatChain {
1651                    chain: self.chain,
1652                    _marker: PhantomData,
1653                }
1654            }
1655        }
1656
1657        // -- Splat builder within an arm --
1658
1659        /// DAG splat builder within an arm.
1660        #[doc(hidden)]
1661        pub struct $SplatArm<In, $($T,)+ Chain> {
1662            chain: Chain,
1663            _marker: PhantomData<fn(*const In) -> ($($T,)+)>,
1664        }
1665
1666        impl<In: 'static, $($T: 'static,)+ Chain> $SplatArm<In, $($T,)+ Chain> {
1667            /// Add a step that receives the tuple elements as individual `&T` arguments.
1668            pub fn then<NewOut, Params, S>(
1669                self,
1670                f: S,
1671                registry: &Registry,
1672            ) -> DagArm<In, NewOut, $SplatThenNode<Chain, S::Step, $($T,)+ NewOut>>
1673            where
1674                NewOut: 'static,
1675                S: IntoMergeStep<($(&'static $T,)+), NewOut, Params>,
1676                S::Step: for<'x> MergeStepCall<($(&'x $T,)+), NewOut>,
1677            {
1678                DagArm {
1679                    chain: $SplatThenNode {
1680                        chain: self.chain,
1681                        merge: f.into_merge_step(registry),
1682                        _marker: PhantomData,
1683                    },
1684                    _marker: PhantomData,
1685                }
1686            }
1687        }
1688
1689        impl<In: 'static, $($T: 'static,)+ Chain> DagArm<In, ($($T,)+), Chain> {
1690            /// Destructure the tuple output into individual `&T` arguments.
1691            pub fn splat(self) -> $SplatArm<In, $($T,)+ Chain> {
1692                $SplatArm {
1693                    chain: self.chain,
1694                    _marker: PhantomData,
1695                }
1696            }
1697        }
1698
1699        // -- Splat at arm start position --
1700
1701        /// DAG splat builder at arm start position.
1702        #[doc(hidden)]
1703        pub struct $SplatArmStart<$($T),+>(PhantomData<fn(($($T,)+))>);
1704
1705        impl<$($T: 'static),+> $SplatArmStart<$($T),+> {
1706            /// Add a step that receives the tuple elements as individual `&T` arguments.
1707            pub fn then<Out, Params, S>(
1708                self,
1709                f: S,
1710                registry: &Registry,
1711            ) -> DagArm<($($T,)+), Out, $SplatArmStartNode<S::Step, $($T,)+ Out>>
1712            where
1713                Out: 'static,
1714                S: IntoMergeStep<($(&'static $T,)+), Out, Params>,
1715                S::Step: for<'x> MergeStepCall<($(&'x $T,)+), Out>,
1716            {
1717                DagArm {
1718                    chain: $SplatArmStartNode {
1719                        merge: f.into_merge_step(registry),
1720                        _marker: PhantomData,
1721                    },
1722                    _marker: PhantomData,
1723                }
1724            }
1725        }
1726
1727        impl<$($T: 'static),+> DagArmSeed<($($T,)+)> {
1728            /// Destructure the tuple input into individual `&T` arguments.
1729            pub fn splat(self) -> $SplatArmStart<$($T),+> {
1730                $SplatArmStart(PhantomData)
1731            }
1732        }
1733    };
1734}
1735
1736define_dag_splat_builders!(2,
1737    chain: DagSplatChain2,
1738    arm: DagSplatArm2,
1739    arm_start: DagSplatArmStart2,
1740    splat_then: SplatThenNode2,
1741    splat_arm_start: SplatArmStartNode2,
1742    (T0, T1),
1743    (0, 1)
1744);
1745
1746define_dag_splat_builders!(3,
1747    chain: DagSplatChain3,
1748    arm: DagSplatArm3,
1749    arm_start: DagSplatArmStart3,
1750    splat_then: SplatThenNode3,
1751    splat_arm_start: SplatArmStartNode3,
1752    (T0, T1, T2),
1753    (0, 1, 2)
1754);
1755
1756define_dag_splat_builders!(4,
1757    chain: DagSplatChain4,
1758    arm: DagSplatArm4,
1759    arm_start: DagSplatArmStart4,
1760    splat_then: SplatThenNode4,
1761    splat_arm_start: SplatArmStartNode4,
1762    (T0, T1, T2, T3),
1763    (0, 1, 2, 3)
1764);
1765
1766define_dag_splat_builders!(5,
1767    chain: DagSplatChain5,
1768    arm: DagSplatArm5,
1769    arm_start: DagSplatArmStart5,
1770    splat_then: SplatThenNode5,
1771    splat_arm_start: SplatArmStartNode5,
1772    (T0, T1, T2, T3, T4),
1773    (0, 1, 2, 3, 4)
1774);
1775
1776// =============================================================================
1777// Fork arity macro — arm accumulation, merge, join
1778// =============================================================================
1779
1780/// Generates arm accumulation, merge, and join for a fork type.
1781///
1782/// ChainFork and ArmFork differ only in what output builder they
1783/// produce (DagChain vs DagArm). All dispatch logic lives in the
1784/// named MergeNode/JoinNode types — the macro just wires construction.
1785macro_rules! impl_dag_fork {
1786    (
1787        fork: $Fork:ident,
1788        output: $Output:ident,
1789        upstream: $U:ident
1790    ) => {
1791        // =============================================================
1792        // Arm accumulation: 0→1, 1→2, 2→3, 3→4
1793        // =============================================================
1794
1795        impl<$U, ForkOut, Chain> $Fork<$U, ForkOut, Chain, ()> {
1796            /// Add the first arm to this fork.
1797            pub fn arm<AOut, ACh>(
1798                self,
1799                f: impl FnOnce(DagArmSeed<ForkOut>) -> DagArm<ForkOut, AOut, ACh>,
1800            ) -> $Fork<$U, ForkOut, Chain, (DagArm<ForkOut, AOut, ACh>,)> {
1801                let arm = f(DagArmSeed(PhantomData));
1802                $Fork {
1803                    chain: self.chain,
1804                    arms: (arm,),
1805                    _marker: PhantomData,
1806                }
1807            }
1808        }
1809
1810        impl<$U, ForkOut, Chain, A0, C0> $Fork<$U, ForkOut, Chain, (DagArm<ForkOut, A0, C0>,)> {
1811            /// Add a second arm to this fork.
1812            pub fn arm<AOut, ACh>(
1813                self,
1814                f: impl FnOnce(DagArmSeed<ForkOut>) -> DagArm<ForkOut, AOut, ACh>,
1815            ) -> $Fork<$U, ForkOut, Chain, (DagArm<ForkOut, A0, C0>, DagArm<ForkOut, AOut, ACh>)>
1816            {
1817                let arm = f(DagArmSeed(PhantomData));
1818                let (a0,) = self.arms;
1819                $Fork {
1820                    chain: self.chain,
1821                    arms: (a0, arm),
1822                    _marker: PhantomData,
1823                }
1824            }
1825        }
1826
1827        impl<$U, ForkOut, Chain, A0, C0, A1, C1>
1828            $Fork<$U, ForkOut, Chain, (DagArm<ForkOut, A0, C0>, DagArm<ForkOut, A1, C1>)>
1829        {
1830            /// Add a third arm to this fork.
1831            pub fn arm<AOut, ACh>(
1832                self,
1833                f: impl FnOnce(DagArmSeed<ForkOut>) -> DagArm<ForkOut, AOut, ACh>,
1834            ) -> $Fork<
1835                $U,
1836                ForkOut,
1837                Chain,
1838                (
1839                    DagArm<ForkOut, A0, C0>,
1840                    DagArm<ForkOut, A1, C1>,
1841                    DagArm<ForkOut, AOut, ACh>,
1842                ),
1843            > {
1844                let arm = f(DagArmSeed(PhantomData));
1845                let (a0, a1) = self.arms;
1846                $Fork {
1847                    chain: self.chain,
1848                    arms: (a0, a1, arm),
1849                    _marker: PhantomData,
1850                }
1851            }
1852        }
1853
1854        impl<$U, ForkOut, Chain, A0, C0, A1, C1, A2, C2>
1855            $Fork<
1856                $U,
1857                ForkOut,
1858                Chain,
1859                (
1860                    DagArm<ForkOut, A0, C0>,
1861                    DagArm<ForkOut, A1, C1>,
1862                    DagArm<ForkOut, A2, C2>,
1863                ),
1864            >
1865        {
1866            /// Add a fourth arm to this fork.
1867            pub fn arm<AOut, ACh>(
1868                self,
1869                f: impl FnOnce(DagArmSeed<ForkOut>) -> DagArm<ForkOut, AOut, ACh>,
1870            ) -> $Fork<
1871                $U,
1872                ForkOut,
1873                Chain,
1874                (
1875                    DagArm<ForkOut, A0, C0>,
1876                    DagArm<ForkOut, A1, C1>,
1877                    DagArm<ForkOut, A2, C2>,
1878                    DagArm<ForkOut, AOut, ACh>,
1879                ),
1880            > {
1881                let arm = f(DagArmSeed(PhantomData));
1882                let (a0, a1, a2) = self.arms;
1883                $Fork {
1884                    chain: self.chain,
1885                    arms: (a0, a1, a2, arm),
1886                    _marker: PhantomData,
1887                }
1888            }
1889        }
1890
1891        // =============================================================
1892        // Merge arity 2
1893        // =============================================================
1894
1895        impl<$U, ForkOut: 'static, Chain, A0: 'static, C0, A1: 'static, C1>
1896            $Fork<$U, ForkOut, Chain, (DagArm<ForkOut, A0, C0>, DagArm<ForkOut, A1, C1>)>
1897        {
1898            /// Merge two arms with a merge step.
1899            pub fn merge<MOut, Params, S>(
1900                self,
1901                f: S,
1902                registry: &Registry,
1903            ) -> $Output<
1904                $U,
1905                MOut,
1906                MergeNode2<Chain, C0, C1, S::Step, ForkOut, A0, A1, MOut>,
1907            >
1908            where
1909                MOut: 'static,
1910                S: IntoMergeStep<(&'static A0, &'static A1), MOut, Params>,
1911                S::Step: for<'x> MergeStepCall<(&'x A0, &'x A1), MOut>,
1912            {
1913                let (a0, a1) = self.arms;
1914                $Output {
1915                    chain: MergeNode2 {
1916                        chain: self.chain,
1917                        arm0: a0.chain,
1918                        arm1: a1.chain,
1919                        merge: f.into_merge_step(registry),
1920                        _marker: PhantomData,
1921                    },
1922                    _marker: PhantomData,
1923                }
1924            }
1925        }
1926
1927        impl<$U, ForkOut: 'static, Chain, C0, C1>
1928            $Fork<$U, ForkOut, Chain, (DagArm<ForkOut, (), C0>, DagArm<ForkOut, (), C1>)>
1929        {
1930            /// Join two sink arms (all producing `()`).
1931            pub fn join(
1932                self,
1933            ) -> $Output<$U, (), JoinNode2<Chain, C0, C1, ForkOut>> {
1934                let (a0, a1) = self.arms;
1935                $Output {
1936                    chain: JoinNode2 {
1937                        chain: self.chain,
1938                        arm0: a0.chain,
1939                        arm1: a1.chain,
1940                        _marker: PhantomData,
1941                    },
1942                    _marker: PhantomData,
1943                }
1944            }
1945        }
1946
1947        // =============================================================
1948        // Merge arity 3
1949        // =============================================================
1950
1951        impl<$U, ForkOut: 'static, Chain, A0: 'static, C0, A1: 'static, C1, A2: 'static, C2>
1952            $Fork<
1953                $U,
1954                ForkOut,
1955                Chain,
1956                (
1957                    DagArm<ForkOut, A0, C0>,
1958                    DagArm<ForkOut, A1, C1>,
1959                    DagArm<ForkOut, A2, C2>,
1960                ),
1961            >
1962        {
1963            /// Merge three arms with a merge step.
1964            pub fn merge<MOut, Params, S>(
1965                self,
1966                f: S,
1967                registry: &Registry,
1968            ) -> $Output<
1969                $U,
1970                MOut,
1971                MergeNode3<Chain, C0, C1, C2, S::Step, ForkOut, A0, A1, A2, MOut>,
1972            >
1973            where
1974                MOut: 'static,
1975                S: IntoMergeStep<(&'static A0, &'static A1, &'static A2), MOut, Params>,
1976                S::Step: for<'x> MergeStepCall<(&'x A0, &'x A1, &'x A2), MOut>,
1977            {
1978                let (a0, a1, a2) = self.arms;
1979                $Output {
1980                    chain: MergeNode3 {
1981                        chain: self.chain,
1982                        arm0: a0.chain,
1983                        arm1: a1.chain,
1984                        arm2: a2.chain,
1985                        merge: f.into_merge_step(registry),
1986                        _marker: PhantomData,
1987                    },
1988                    _marker: PhantomData,
1989                }
1990            }
1991        }
1992
1993        impl<$U, ForkOut: 'static, Chain, C0, C1, C2>
1994            $Fork<
1995                $U,
1996                ForkOut,
1997                Chain,
1998                (
1999                    DagArm<ForkOut, (), C0>,
2000                    DagArm<ForkOut, (), C1>,
2001                    DagArm<ForkOut, (), C2>,
2002                ),
2003            >
2004        {
2005            /// Join three sink arms (all producing `()`).
2006            pub fn join(
2007                self,
2008            ) -> $Output<$U, (), JoinNode3<Chain, C0, C1, C2, ForkOut>> {
2009                let (a0, a1, a2) = self.arms;
2010                $Output {
2011                    chain: JoinNode3 {
2012                        chain: self.chain,
2013                        arm0: a0.chain,
2014                        arm1: a1.chain,
2015                        arm2: a2.chain,
2016                        _marker: PhantomData,
2017                    },
2018                    _marker: PhantomData,
2019                }
2020            }
2021        }
2022
2023        // =============================================================
2024        // Merge arity 4
2025        // =============================================================
2026
2027        #[allow(clippy::many_single_char_names)]
2028        impl<
2029            $U,
2030            ForkOut: 'static,
2031            Chain,
2032            A0: 'static,
2033            C0,
2034            A1: 'static,
2035            C1,
2036            A2: 'static,
2037            C2,
2038            A3: 'static,
2039            C3,
2040        >
2041            $Fork<
2042                $U,
2043                ForkOut,
2044                Chain,
2045                (
2046                    DagArm<ForkOut, A0, C0>,
2047                    DagArm<ForkOut, A1, C1>,
2048                    DagArm<ForkOut, A2, C2>,
2049                    DagArm<ForkOut, A3, C3>,
2050                ),
2051            >
2052        {
2053            /// Merge four arms with a merge step.
2054            pub fn merge<MOut, Params, S>(
2055                self,
2056                f: S,
2057                registry: &Registry,
2058            ) -> $Output<
2059                $U,
2060                MOut,
2061                MergeNode4<Chain, C0, C1, C2, C3, S::Step, ForkOut, A0, A1, A2, A3, MOut>,
2062            >
2063            where
2064                MOut: 'static,
2065                S: IntoMergeStep<
2066                    (&'static A0, &'static A1, &'static A2, &'static A3),
2067                    MOut,
2068                    Params,
2069                >,
2070                S::Step: for<'x> MergeStepCall<(&'x A0, &'x A1, &'x A2, &'x A3), MOut>,
2071            {
2072                let (a0, a1, a2, a3) = self.arms;
2073                $Output {
2074                    chain: MergeNode4 {
2075                        chain: self.chain,
2076                        arm0: a0.chain,
2077                        arm1: a1.chain,
2078                        arm2: a2.chain,
2079                        arm3: a3.chain,
2080                        merge: f.into_merge_step(registry),
2081                        _marker: PhantomData,
2082                    },
2083                    _marker: PhantomData,
2084                }
2085            }
2086        }
2087
2088        impl<$U, ForkOut: 'static, Chain, C0, C1, C2, C3>
2089            $Fork<
2090                $U,
2091                ForkOut,
2092                Chain,
2093                (
2094                    DagArm<ForkOut, (), C0>,
2095                    DagArm<ForkOut, (), C1>,
2096                    DagArm<ForkOut, (), C2>,
2097                    DagArm<ForkOut, (), C3>,
2098                ),
2099            >
2100        {
2101            /// Join four sink arms (all producing `()`).
2102            pub fn join(
2103                self,
2104            ) -> $Output<$U, (), JoinNode4<Chain, C0, C1, C2, C3, ForkOut>> {
2105                let (a0, a1, a2, a3) = self.arms;
2106                $Output {
2107                    chain: JoinNode4 {
2108                        chain: self.chain,
2109                        arm0: a0.chain,
2110                        arm1: a1.chain,
2111                        arm2: a2.chain,
2112                        arm3: a3.chain,
2113                        _marker: PhantomData,
2114                    },
2115                    _marker: PhantomData,
2116                }
2117            }
2118        }
2119    };
2120}
2121
2122impl_dag_fork!(fork: DagChainFork, output: DagChain, upstream: E);
2123impl_dag_fork!(fork: DagArmFork, output: DagArm, upstream: In);
2124
2125// =============================================================================
2126// build_batch — when Out: PipelineOutput (() or Option<()>)
2127// =============================================================================
2128
2129impl<E, Out: crate::PipelineOutput, Chain: ChainCall<E, Out = Out>> DagChain<E, Out, Chain> {
2130    /// Finalize into a [`BatchDag`] with a pre-allocated input buffer.
2131    ///
2132    /// Same DAG chain as [`build`](DagChain::build), but the DAG owns an
2133    /// input buffer that drivers fill between dispatch cycles. Each call
2134    /// to [`BatchDag::run`] drains the buffer, running every item through
2135    /// the chain independently.
2136    ///
2137    /// Available when the DAG ends with `()` or `Option<()>` (e.g.
2138    /// after `.guard()` or `.filter()` followed by `.unwrap_or(())`).
2139    ///
2140    /// `capacity` is the initial allocation — the buffer can grow if needed,
2141    /// but sizing it for the expected batch size avoids reallocation.
2142    #[must_use = "building a DAG without storing it does nothing"]
2143    pub fn build_batch(self, capacity: usize) -> BatchDag<E, Chain> {
2144        BatchDag {
2145            input: Vec::with_capacity(capacity),
2146            chain: self.chain,
2147        }
2148    }
2149}
2150
2151// =============================================================================
2152// BatchDag<E, F> — DAG with owned input buffer
2153// =============================================================================
2154
2155/// Batch DAG that owns a pre-allocated input buffer.
2156///
2157/// Created by [`DagChain::build_batch`]. Each item flows through the
2158/// full DAG chain independently — the same per-item `Option` and
2159/// `Result` flow control as [`Dag`]. Errors are handled inline (via
2160/// `.catch()`, `.unwrap_or()`, etc.) and the batch continues to the
2161/// next item.
2162///
2163/// Unlike [`Dag`], `BatchDag` does not implement [`Handler`] — it is
2164/// driven directly by the owner via [`run()`](BatchDag::run).
2165///
2166/// # Examples
2167///
2168/// ```
2169/// use nexus_rt::{WorldBuilder, ResMut, Resource};
2170/// use nexus_rt::dag::DagBuilder;
2171///
2172/// #[derive(Resource)]
2173/// struct Accum(u64);
2174///
2175/// let mut wb = WorldBuilder::new();
2176/// wb.register(Accum(0));
2177/// let mut world = wb.build();
2178/// let reg = world.registry();
2179///
2180/// fn double(x: u32) -> u64 { x as u64 * 2 }
2181/// fn store(mut out: ResMut<Accum>, val: &u64) { out.0 += *val; }
2182///
2183/// let mut batch = DagBuilder::<u32>::new()
2184///     .root(double, reg)
2185///     .then(store, reg)
2186///     .build_batch(8);
2187///
2188/// batch.input_mut().extend([1, 2, 3]);
2189/// batch.run(&mut world);
2190///
2191/// assert_eq!(world.resource::<Accum>().0, 12); // 2 + 4 + 6
2192/// assert!(batch.input().is_empty());
2193/// ```
2194pub struct BatchDag<E, F> {
2195    input: Vec<E>,
2196    chain: F,
2197}
2198
2199impl<E, Out: crate::PipelineOutput, F: ChainCall<E, Out = Out>> BatchDag<E, F> {
2200    /// Mutable access to the input buffer. Drivers fill this between
2201    /// dispatch cycles.
2202    pub fn input_mut(&mut self) -> &mut Vec<E> {
2203        &mut self.input
2204    }
2205
2206    /// Read-only access to the input buffer.
2207    pub fn input(&self) -> &[E] {
2208        &self.input
2209    }
2210
2211    /// Drain the input buffer, running each item through the DAG.
2212    ///
2213    /// Each item gets independent `Option`/`Result` flow control — an
2214    /// error on one item does not affect subsequent items. After `run()`,
2215    /// the input buffer is empty but retains its allocation.
2216    pub fn run(&mut self, world: &mut World) {
2217        for item in self.input.drain(..) {
2218            let _ = self.chain.call(world, item);
2219        }
2220    }
2221}
2222
2223// =============================================================================
2224// resolve_arm — pre-resolve a step for manual dispatch
2225// =============================================================================
2226
2227/// Resolve a step for use in manual dispatch (e.g. inside an
2228/// opaque `.then()` closure).
2229///
2230/// Returns a closure with pre-resolved [`Param`](crate::Param) state —
2231/// the same build-time resolution that `.then()` performs, but as a
2232/// standalone value the caller can invoke from any context.
2233///
2234/// # Examples
2235///
2236/// ```ignore
2237/// let mut arm0 = resolve_arm(handle_new, reg);
2238/// let mut arm1 = resolve_arm(handle_cancel, reg);
2239///
2240/// dag.then(move |world: &mut World, msg: &Decoded| match msg.kind {
2241///     MsgKind::NewOrder => arm0(world, msg),
2242///     MsgKind::Cancel   => arm1(world, msg),
2243/// }, reg)
2244/// ```
2245pub fn resolve_arm<In, Out, Params, S>(
2246    f: S,
2247    registry: &Registry,
2248) -> impl FnMut(&mut World, &In) -> Out + use<In, Out, Params, S>
2249where
2250    In: 'static,
2251    Out: 'static,
2252    S: IntoStep<&'static In, Out, Params>,
2253    S::Step: for<'a> StepCall<&'a In, Out = Out>,
2254{
2255    let mut resolved = f.into_step(registry);
2256    move |world: &mut World, input: &In| resolved.call(world, input)
2257}
2258
2259// =============================================================================
2260// Tests
2261// =============================================================================
2262
2263#[cfg(test)]
2264#[allow(
2265    clippy::ref_option,
2266    clippy::unnecessary_wraps,
2267    clippy::needless_pass_by_value,
2268    clippy::trivially_copy_pass_by_ref,
2269    clippy::ptr_arg
2270)]
2271mod tests {
2272    use super::*;
2273    use crate::{IntoHandler, Res, ResMut, Virtual, WorldBuilder};
2274
2275    // -- Linear chains --
2276
2277    #[test]
2278    fn dag_linear_2() {
2279        let mut wb = WorldBuilder::new();
2280        wb.register::<u64>(0);
2281        let mut world = wb.build();
2282        let reg = world.registry();
2283
2284        fn root_mul2(x: u32) -> u64 {
2285            x as u64 * 2
2286        }
2287        fn store(mut out: ResMut<u64>, val: &u64) {
2288            *out = *val;
2289        }
2290
2291        let mut dag = DagBuilder::<u32>::new()
2292            .root(root_mul2, reg)
2293            .then(store, reg)
2294            .build();
2295
2296        dag.run(&mut world, 5u32);
2297        assert_eq!(*world.resource::<u64>(), 10);
2298    }
2299
2300    #[test]
2301    fn dag_linear_3() {
2302        let mut wb = WorldBuilder::new();
2303        wb.register::<u64>(0);
2304        let mut world = wb.build();
2305        let reg = world.registry();
2306
2307        fn root_mul2(x: u32) -> u64 {
2308            x as u64 * 2
2309        }
2310        fn add_one(val: &u64) -> u64 {
2311            *val + 1
2312        }
2313        fn store(mut out: ResMut<u64>, val: &u64) {
2314            *out = *val;
2315        }
2316
2317        let mut dag = DagBuilder::<u32>::new()
2318            .root(root_mul2, reg)
2319            .then(add_one, reg)
2320            .then(store, reg)
2321            .build();
2322
2323        dag.run(&mut world, 5u32);
2324        assert_eq!(*world.resource::<u64>(), 11); // (5*2)+1
2325    }
2326
2327    #[test]
2328    fn dag_linear_5() {
2329        let mut wb = WorldBuilder::new();
2330        wb.register::<u64>(0);
2331        let mut world = wb.build();
2332        let reg = world.registry();
2333
2334        fn root_id(x: u32) -> u64 {
2335            x as u64
2336        }
2337        fn add_one(val: &u64) -> u64 {
2338            *val + 1
2339        }
2340        fn store(mut out: ResMut<u64>, val: &u64) {
2341            *out = *val;
2342        }
2343
2344        let mut dag = DagBuilder::<u32>::new()
2345            .root(root_id, reg)
2346            .then(add_one, reg)
2347            .then(add_one, reg)
2348            .then(add_one, reg)
2349            .then(store, reg)
2350            .build();
2351
2352        dag.run(&mut world, 0u32);
2353        assert_eq!(*world.resource::<u64>(), 3); // 0+1+1+1
2354    }
2355
2356    // -- Diamond: root → [a, b] → merge → sink --
2357
2358    #[test]
2359    fn dag_diamond() {
2360        let mut wb = WorldBuilder::new();
2361        wb.register::<u64>(0);
2362        let mut world = wb.build();
2363        let reg = world.registry();
2364
2365        fn root_mul2(x: u32) -> u32 {
2366            x.wrapping_mul(2)
2367        }
2368        fn add_one(val: &u32) -> u32 {
2369            val.wrapping_add(1)
2370        }
2371        fn mul3(val: &u32) -> u32 {
2372            val.wrapping_mul(3)
2373        }
2374        fn merge_add(a: &u32, b: &u32) -> u32 {
2375            a.wrapping_add(*b)
2376        }
2377        fn store(mut out: ResMut<u64>, val: &u32) {
2378            *out = *val as u64;
2379        }
2380
2381        let mut dag = DagBuilder::<u32>::new()
2382            .root(root_mul2, reg)
2383            .fork()
2384            .arm(|a| a.then(add_one, reg))
2385            .arm(|b| b.then(mul3, reg))
2386            .merge(merge_add, reg)
2387            .then(store, reg)
2388            .build();
2389
2390        dag.run(&mut world, 5u32);
2391        // root: 10, arm_a: 11, arm_b: 30, merge: 41
2392        assert_eq!(*world.resource::<u64>(), 41);
2393    }
2394
2395    // -- Fan-out to sinks (.join()) --
2396
2397    #[test]
2398    fn dag_fan_out_join() {
2399        let mut wb = WorldBuilder::new();
2400        wb.register::<u64>(0);
2401        wb.register::<i64>(0);
2402        let mut world = wb.build();
2403        let reg = world.registry();
2404
2405        fn root_id(x: u32) -> u64 {
2406            x as u64
2407        }
2408        fn sink_u64(mut out: ResMut<u64>, val: &u64) {
2409            *out = *val * 2;
2410        }
2411        fn sink_i64(mut out: ResMut<i64>, val: &u64) {
2412            *out = *val as i64 * 3;
2413        }
2414
2415        let mut dag = DagBuilder::<u32>::new()
2416            .root(root_id, reg)
2417            .fork()
2418            .arm(|a| a.then(sink_u64, reg))
2419            .arm(|b| b.then(sink_i64, reg))
2420            .join()
2421            .build();
2422
2423        dag.run(&mut world, 5u32);
2424        assert_eq!(*world.resource::<u64>(), 10);
2425        assert_eq!(*world.resource::<i64>(), 15);
2426    }
2427
2428    // -- Nested fork within an arm --
2429
2430    #[test]
2431    fn dag_nested_fork() {
2432        let mut wb = WorldBuilder::new();
2433        wb.register::<u64>(0);
2434        let mut world = wb.build();
2435        let reg = world.registry();
2436
2437        fn root_id(x: u32) -> u32 {
2438            x
2439        }
2440        fn add_10(val: &u32) -> u32 {
2441            val.wrapping_add(10)
2442        }
2443        fn mul2(val: &u32) -> u32 {
2444            val.wrapping_mul(2)
2445        }
2446        fn mul3(val: &u32) -> u32 {
2447            val.wrapping_mul(3)
2448        }
2449        fn inner_merge(a: &u32, b: &u32) -> u32 {
2450            a.wrapping_add(*b)
2451        }
2452        fn outer_merge(a: &u32, b: &u32) -> u32 {
2453            a.wrapping_add(*b)
2454        }
2455        fn store(mut out: ResMut<u64>, val: &u32) {
2456            *out = *val as u64;
2457        }
2458
2459        // root(5)=5 → fork
2460        //   arm_a: add_10(5)=15 → fork
2461        //     sub_c: mul2(15)=30
2462        //     sub_d: mul3(15)=45
2463        //     inner_merge(30,45)=75
2464        //   arm_b: mul3(5)=15
2465        // outer_merge(75,15)=90
2466        let mut dag = DagBuilder::<u32>::new()
2467            .root(root_id, reg)
2468            .fork()
2469            .arm(|a| {
2470                a.then(add_10, reg)
2471                    .fork()
2472                    .arm(|c| c.then(mul2, reg))
2473                    .arm(|d| d.then(mul3, reg))
2474                    .merge(inner_merge, reg)
2475            })
2476            .arm(|b| b.then(mul3, reg))
2477            .merge(outer_merge, reg)
2478            .then(store, reg)
2479            .build();
2480
2481        dag.run(&mut world, 5u32);
2482        assert_eq!(*world.resource::<u64>(), 90);
2483    }
2484
2485    // -- Complex topology: asymmetric arm lengths --
2486
2487    #[test]
2488    fn dag_complex_topology() {
2489        let mut wb = WorldBuilder::new();
2490        wb.register::<u64>(0);
2491        let mut world = wb.build();
2492        let reg = world.registry();
2493
2494        fn root_mul2(x: u32) -> u32 {
2495            x.wrapping_mul(2)
2496        }
2497        fn add_one(val: &u32) -> u32 {
2498            val.wrapping_add(1)
2499        }
2500        fn add_then_mul2(val: &u32) -> u32 {
2501            val.wrapping_add(1).wrapping_mul(2)
2502        }
2503        fn mul3(val: &u32) -> u32 {
2504            val.wrapping_mul(3)
2505        }
2506        fn merge_add(a: &u32, b: &u32) -> u32 {
2507            a.wrapping_add(*b)
2508        }
2509        fn store(mut out: ResMut<u64>, val: &u32) {
2510            *out = *val as u64;
2511        }
2512
2513        // root(5)=10 → fork
2514        //   a: add_one(10)=11 → add_then_mul2(11)=24
2515        //   b: mul3(10)=30
2516        // merge(24, 30) = 54
2517        let mut dag = DagBuilder::<u32>::new()
2518            .root(root_mul2, reg)
2519            .fork()
2520            .arm(|a| a.then(add_one, reg).then(add_then_mul2, reg))
2521            .arm(|b| b.then(mul3, reg))
2522            .merge(merge_add, reg)
2523            .then(store, reg)
2524            .build();
2525
2526        dag.run(&mut world, 5u32);
2527        assert_eq!(*world.resource::<u64>(), 54);
2528    }
2529
2530    // -- Boxable into Box<dyn Handler<E>> --
2531
2532    #[test]
2533    fn dag_boxable() {
2534        let mut wb = WorldBuilder::new();
2535        wb.register::<u64>(0);
2536        let mut world = wb.build();
2537        let reg = world.registry();
2538
2539        fn root_id(x: u32) -> u64 {
2540            x as u64
2541        }
2542        fn store(mut out: ResMut<u64>, val: &u64) {
2543            *out = *val;
2544        }
2545
2546        let mut boxed: Virtual<u32> = Box::new(
2547            DagBuilder::<u32>::new()
2548                .root(root_id, reg)
2549                .then(store, reg)
2550                .build(),
2551        );
2552        boxed.run(&mut world, 77u32);
2553        assert_eq!(*world.resource::<u64>(), 77);
2554    }
2555
2556    // -- World access (Res<T>, ResMut<T>) in nodes --
2557
2558    #[test]
2559    fn dag_world_access() {
2560        let mut wb = WorldBuilder::new();
2561        wb.register::<u64>(10); // factor
2562        wb.register::<String>(String::new());
2563        let mut world = wb.build();
2564        let reg = world.registry();
2565
2566        fn scale(factor: Res<u64>, val: &u32) -> u64 {
2567            *factor * (*val as u64)
2568        }
2569        fn store(mut out: ResMut<String>, val: &u64) {
2570            *out = val.to_string();
2571        }
2572
2573        let mut dag = DagBuilder::<u32>::new()
2574            .root(|x: u32| x, reg)
2575            .then(scale, reg)
2576            .then(store, reg)
2577            .build();
2578
2579        dag.run(&mut world, 7u32);
2580        assert_eq!(world.resource::<String>().as_str(), "70");
2581    }
2582
2583    // -- Root-only (terminal root outputting ()) --
2584
2585    #[test]
2586    fn dag_root_only() {
2587        let mut wb = WorldBuilder::new();
2588        wb.register::<u64>(0);
2589        let mut world = wb.build();
2590        let reg = world.registry();
2591
2592        let mut dag = DagBuilder::<u32>::new()
2593            .root(
2594                |mut out: ResMut<u64>, x: u32| {
2595                    *out = x as u64;
2596                },
2597                reg,
2598            )
2599            .build();
2600
2601        dag.run(&mut world, 42u32);
2602        assert_eq!(*world.resource::<u64>(), 42);
2603    }
2604
2605    // -- Multiple dispatches reuse state --
2606
2607    #[test]
2608    fn dag_multiple_dispatches() {
2609        let mut wb = WorldBuilder::new();
2610        wb.register::<u64>(0);
2611        let mut world = wb.build();
2612        let reg = world.registry();
2613
2614        fn root_id(x: u32) -> u64 {
2615            x as u64
2616        }
2617        fn store(mut out: ResMut<u64>, val: &u64) {
2618            *out = *val;
2619        }
2620
2621        let mut dag = DagBuilder::<u32>::new()
2622            .root(root_id, reg)
2623            .then(store, reg)
2624            .build();
2625
2626        dag.run(&mut world, 1u32);
2627        assert_eq!(*world.resource::<u64>(), 1);
2628        dag.run(&mut world, 2u32);
2629        assert_eq!(*world.resource::<u64>(), 2);
2630        dag.run(&mut world, 3u32);
2631        assert_eq!(*world.resource::<u64>(), 3);
2632    }
2633
2634    // -- 3-way merge --
2635
2636    #[test]
2637    fn dag_3way_merge() {
2638        let mut wb = WorldBuilder::new();
2639        wb.register::<String>(String::new());
2640        let mut world = wb.build();
2641        let reg = world.registry();
2642
2643        fn root_id(x: u32) -> u64 {
2644            x as u64
2645        }
2646        fn mul1(val: &u64) -> u64 {
2647            *val
2648        }
2649        fn mul2(val: &u64) -> u64 {
2650            *val * 2
2651        }
2652        fn mul3(val: &u64) -> u64 {
2653            *val * 3
2654        }
2655        fn merge3_fmt(mut out: ResMut<String>, a: &u64, b: &u64, c: &u64) {
2656            *out = format!("{},{},{}", a, b, c);
2657        }
2658
2659        let mut dag = DagBuilder::<u32>::new()
2660            .root(root_id, reg)
2661            .fork()
2662            .arm(|a| a.then(mul1, reg))
2663            .arm(|b| b.then(mul2, reg))
2664            .arm(|c| c.then(mul3, reg))
2665            .merge(merge3_fmt, reg)
2666            .build();
2667
2668        dag.run(&mut world, 10u32);
2669        assert_eq!(world.resource::<String>().as_str(), "10,20,30");
2670    }
2671
2672    // -- DAG combinators --
2673
2674    #[test]
2675    fn dag_dispatch() {
2676        fn root(x: u32) -> u64 {
2677            x as u64 + 42
2678        }
2679        fn sink(mut out: ResMut<u64>, event: u64) {
2680            *out = event;
2681        }
2682        let mut wb = WorldBuilder::new();
2683        wb.register::<u64>(0);
2684        let mut world = wb.build();
2685        let reg = world.registry();
2686
2687        let mut dag = DagBuilder::<u32>::new()
2688            .root(root, reg)
2689            .dispatch(sink.into_handler(reg))
2690            .build();
2691
2692        dag.run(&mut world, 0u32);
2693        assert_eq!(*world.resource::<u64>(), 42);
2694    }
2695
2696    #[test]
2697    fn dag_option_map() {
2698        fn root(_x: u32) -> Option<u64> {
2699            Some(10)
2700        }
2701        fn double(val: &u64) -> u64 {
2702            *val * 2
2703        }
2704        fn sink(mut out: ResMut<u64>, val: &Option<u64>) {
2705            *out = val.unwrap_or(0);
2706        }
2707        let mut wb = WorldBuilder::new();
2708        wb.register::<u64>(0);
2709        let mut world = wb.build();
2710        let reg = world.registry();
2711
2712        let mut dag = DagBuilder::<u32>::new()
2713            .root(root, reg)
2714            .map(double, reg)
2715            .then(sink, reg)
2716            .build();
2717
2718        dag.run(&mut world, 0u32);
2719        assert_eq!(*world.resource::<u64>(), 20);
2720    }
2721
2722    #[test]
2723    fn dag_option_map_none() {
2724        fn root(_x: u32) -> Option<u64> {
2725            None
2726        }
2727        fn double(val: &u64) -> u64 {
2728            *val * 2
2729        }
2730        fn sink(mut out: ResMut<u64>, val: &Option<u64>) {
2731            *out = val.unwrap_or(999);
2732        }
2733        let mut wb = WorldBuilder::new();
2734        wb.register::<u64>(0);
2735        let mut world = wb.build();
2736        let reg = world.registry();
2737
2738        let mut dag = DagBuilder::<u32>::new()
2739            .root(root, reg)
2740            .map(double, reg)
2741            .then(sink, reg)
2742            .build();
2743
2744        dag.run(&mut world, 0u32);
2745        assert_eq!(*world.resource::<u64>(), 999);
2746    }
2747
2748    #[test]
2749    fn dag_option_and_then() {
2750        fn root(_x: u32) -> Option<u64> {
2751            Some(5)
2752        }
2753        fn check(val: &u64) -> Option<u64> {
2754            if *val > 3 { Some(*val * 10) } else { None }
2755        }
2756        fn sink(mut out: ResMut<u64>, val: &Option<u64>) {
2757            *out = val.unwrap_or(0);
2758        }
2759        let mut wb = WorldBuilder::new();
2760        wb.register::<u64>(0);
2761        let mut world = wb.build();
2762        let reg = world.registry();
2763
2764        let mut dag = DagBuilder::<u32>::new()
2765            .root(root, reg)
2766            .and_then(check, reg)
2767            .then(sink, reg)
2768            .build();
2769
2770        dag.run(&mut world, 0u32);
2771        assert_eq!(*world.resource::<u64>(), 50);
2772    }
2773
2774    #[test]
2775    fn dag_option_filter_keeps() {
2776        fn root(_x: u32) -> Option<u64> {
2777            Some(5)
2778        }
2779        fn sink(mut out: ResMut<u64>, val: &Option<u64>) {
2780            *out = val.unwrap_or(0);
2781        }
2782        let mut wb = WorldBuilder::new();
2783        wb.register::<u64>(0);
2784        let mut world = wb.build();
2785
2786        let mut dag = DagBuilder::<u32>::new()
2787            .root(root, world.registry())
2788            .filter(|v: &u64| *v > 3, world.registry())
2789            .then(sink, world.registry())
2790            .build();
2791
2792        dag.run(&mut world, 0u32);
2793        assert_eq!(*world.resource::<u64>(), 5);
2794    }
2795
2796    #[test]
2797    fn dag_option_filter_drops() {
2798        fn root(_x: u32) -> Option<u64> {
2799            Some(5)
2800        }
2801        fn sink(mut out: ResMut<u64>, val: &Option<u64>) {
2802            *out = val.unwrap_or(0);
2803        }
2804        let mut wb = WorldBuilder::new();
2805        wb.register::<u64>(0);
2806        let mut world = wb.build();
2807
2808        let mut dag = DagBuilder::<u32>::new()
2809            .root(root, world.registry())
2810            .filter(|v: &u64| *v > 10, world.registry())
2811            .then(sink, world.registry())
2812            .build();
2813
2814        dag.run(&mut world, 0u32);
2815        assert_eq!(*world.resource::<u64>(), 0);
2816    }
2817
2818    #[test]
2819    fn dag_option_on_none() {
2820        fn root(_x: u32) -> Option<u64> {
2821            None
2822        }
2823        fn sink(_val: &Option<u64>) {}
2824        let mut wb = WorldBuilder::new();
2825        wb.register::<bool>(false);
2826        let mut world = wb.build();
2827        let reg = world.registry();
2828
2829        let mut dag = DagBuilder::<u32>::new()
2830            .root(root, reg)
2831            .on_none(
2832                |w: &mut World| {
2833                    *w.resource_mut::<bool>() = true;
2834                },
2835                reg,
2836            )
2837            .then(sink, reg)
2838            .build();
2839
2840        dag.run(&mut world, 0u32);
2841        assert!(*world.resource::<bool>());
2842    }
2843
2844    #[test]
2845    fn dag_option_unwrap_or() {
2846        fn root(_x: u32) -> Option<u64> {
2847            None
2848        }
2849        fn sink(mut out: ResMut<u64>, val: &u64) {
2850            *out = *val;
2851        }
2852        let mut wb = WorldBuilder::new();
2853        wb.register::<u64>(0);
2854        let mut world = wb.build();
2855        let reg = world.registry();
2856
2857        let mut dag = DagBuilder::<u32>::new()
2858            .root(root, reg)
2859            .unwrap_or(42u64)
2860            .then(sink, reg)
2861            .build();
2862
2863        dag.run(&mut world, 0u32);
2864        assert_eq!(*world.resource::<u64>(), 42);
2865    }
2866
2867    #[test]
2868    fn dag_option_ok_or() {
2869        fn root(_x: u32) -> Option<u64> {
2870            None
2871        }
2872        fn sink(mut out: ResMut<u64>, val: &Result<u64, &str>) {
2873            *out = val.as_ref().map_or(999, |v| *v);
2874        }
2875        let mut wb = WorldBuilder::new();
2876        wb.register::<u64>(0);
2877        let mut world = wb.build();
2878        let reg = world.registry();
2879
2880        let mut dag = DagBuilder::<u32>::new()
2881            .root(root, reg)
2882            .ok_or("missing")
2883            .then(sink, reg)
2884            .build();
2885
2886        dag.run(&mut world, 0u32);
2887        assert_eq!(*world.resource::<u64>(), 999);
2888    }
2889
2890    #[test]
2891    fn dag_result_map() {
2892        fn root(_x: u32) -> Result<u64, &'static str> {
2893            Ok(10)
2894        }
2895        fn double(val: &u64) -> u64 {
2896            *val * 2
2897        }
2898        fn sink(mut out: ResMut<u64>, val: &Result<u64, &str>) {
2899            *out = val.as_ref().copied().unwrap_or(0);
2900        }
2901        let mut wb = WorldBuilder::new();
2902        wb.register::<u64>(0);
2903        let mut world = wb.build();
2904        let reg = world.registry();
2905
2906        let mut dag = DagBuilder::<u32>::new()
2907            .root(root, reg)
2908            .map(double, reg)
2909            .then(sink, reg)
2910            .build();
2911
2912        dag.run(&mut world, 0u32);
2913        assert_eq!(*world.resource::<u64>(), 20);
2914    }
2915
2916    #[test]
2917    fn dag_result_and_then() {
2918        fn root(_x: u32) -> Result<u64, &'static str> {
2919            Ok(5)
2920        }
2921        fn check(val: &u64) -> Result<u64, &'static str> {
2922            if *val > 3 {
2923                Ok(*val * 10)
2924            } else {
2925                Err("too small")
2926            }
2927        }
2928        fn sink(mut out: ResMut<u64>, val: &Result<u64, &str>) {
2929            *out = val.as_ref().copied().unwrap_or(0);
2930        }
2931        let mut wb = WorldBuilder::new();
2932        wb.register::<u64>(0);
2933        let mut world = wb.build();
2934        let reg = world.registry();
2935
2936        let mut dag = DagBuilder::<u32>::new()
2937            .root(root, reg)
2938            .and_then(check, reg)
2939            .then(sink, reg)
2940            .build();
2941
2942        dag.run(&mut world, 0u32);
2943        assert_eq!(*world.resource::<u64>(), 50);
2944    }
2945
2946    #[test]
2947    fn dag_result_catch() {
2948        fn root(_x: u32) -> Result<u64, String> {
2949            Err("oops".into())
2950        }
2951        fn handle_err(mut log: ResMut<String>, err: &String) {
2952            *log = err.clone();
2953        }
2954        fn sink(mut out: ResMut<u64>, val: &Option<u64>) {
2955            *out = val.unwrap_or(0);
2956        }
2957        let mut wb = WorldBuilder::new();
2958        wb.register::<u64>(0);
2959        wb.register::<String>(String::new());
2960        let mut world = wb.build();
2961        let reg = world.registry();
2962
2963        let mut dag = DagBuilder::<u32>::new()
2964            .root(root, reg)
2965            .catch(handle_err, reg)
2966            .then(sink, reg)
2967            .build();
2968
2969        dag.run(&mut world, 0u32);
2970        assert_eq!(*world.resource::<u64>(), 0);
2971        assert_eq!(world.resource::<String>().as_str(), "oops");
2972    }
2973
2974    #[test]
2975    fn dag_result_ok() {
2976        fn root(_x: u32) -> Result<u64, &'static str> {
2977            Err("fail")
2978        }
2979        fn sink(mut out: ResMut<u64>, val: &Option<u64>) {
2980            *out = val.unwrap_or(0);
2981        }
2982        let mut wb = WorldBuilder::new();
2983        wb.register::<u64>(0);
2984        let mut world = wb.build();
2985        let reg = world.registry();
2986
2987        let mut dag = DagBuilder::<u32>::new()
2988            .root(root, reg)
2989            .ok()
2990            .then(sink, reg)
2991            .build();
2992
2993        dag.run(&mut world, 0u32);
2994        assert_eq!(*world.resource::<u64>(), 0);
2995    }
2996
2997    #[test]
2998    fn dag_result_unwrap_or_else() {
2999        fn root(_x: u32) -> Result<u64, &'static str> {
3000            Err("fail")
3001        }
3002        fn sink(mut out: ResMut<u64>, val: &u64) {
3003            *out = *val;
3004        }
3005        let mut wb = WorldBuilder::new();
3006        wb.register::<u64>(0);
3007        let mut world = wb.build();
3008        let reg = world.registry();
3009
3010        let mut dag = DagBuilder::<u32>::new()
3011            .root(root, reg)
3012            .unwrap_or_else(|_err: &str| 42u64, reg)
3013            .then(sink, reg)
3014            .build();
3015
3016        dag.run(&mut world, 0u32);
3017        assert_eq!(*world.resource::<u64>(), 42);
3018    }
3019
3020    #[test]
3021    fn dag_result_map_err() {
3022        fn root(_x: u32) -> Result<u64, u32> {
3023            Err(5)
3024        }
3025        fn sink(mut out: ResMut<u64>, val: &Result<u64, String>) {
3026            *out = match val {
3027                Ok(v) => *v,
3028                Err(e) => e.len() as u64,
3029            };
3030        }
3031        let mut wb = WorldBuilder::new();
3032        wb.register::<u64>(0);
3033        let mut world = wb.build();
3034        let reg = world.registry();
3035
3036        let mut dag = DagBuilder::<u32>::new()
3037            .root(root, reg)
3038            .map_err(|e: u32| format!("err:{e}"), reg)
3039            .then(sink, reg)
3040            .build();
3041
3042        dag.run(&mut world, 0u32);
3043        // "err:5".len() == 5
3044        assert_eq!(*world.resource::<u64>(), 5);
3045    }
3046
3047    #[test]
3048    fn dag_arm_combinators() {
3049        fn root(x: u32) -> u64 {
3050            x as u64 + 10
3051        }
3052        fn arm_step(val: &u64) -> Option<u64> {
3053            if *val > 5 { Some(*val * 3) } else { None }
3054        }
3055        fn double(val: &u64) -> u64 {
3056            *val * 2
3057        }
3058        fn merge_fn(a: &u64, b: &u64) -> String {
3059            format!("{a},{b}")
3060        }
3061        fn sink(mut out: ResMut<String>, val: &String) {
3062            *out = val.clone();
3063        }
3064        let mut wb = WorldBuilder::new();
3065        wb.register::<String>(String::new());
3066        let mut world = wb.build();
3067        let reg = world.registry();
3068
3069        // Arm 0: root → arm_step (Option) → unwrap_or(0)
3070        // Arm 1: root → double
3071        let mut dag = DagBuilder::<u32>::new()
3072            .root(root, reg)
3073            .fork()
3074            .arm(|a| a.then(arm_step, reg).unwrap_or(0u64))
3075            .arm(|b| b.then(double, reg))
3076            .merge(merge_fn, reg)
3077            .then(sink, reg)
3078            .build();
3079
3080        dag.run(&mut world, 0u32);
3081        // root(0) = 10
3082        // arm0: 10 > 5 → Some(30) → unwrap → 30
3083        // arm1: 10 * 2 = 20
3084        assert_eq!(world.resource::<String>().as_str(), "30,20");
3085    }
3086
3087    #[test]
3088    fn dag_option_inspect() {
3089        fn root(_x: u32) -> Option<u64> {
3090            Some(42)
3091        }
3092        fn sink(mut out: ResMut<u64>, val: &Option<u64>) {
3093            *out = val.unwrap_or(0);
3094        }
3095        let mut wb = WorldBuilder::new();
3096        wb.register::<u64>(0);
3097        wb.register::<bool>(false);
3098        let mut world = wb.build();
3099        let reg = world.registry();
3100
3101        let mut dag = DagBuilder::<u32>::new()
3102            .root(root, reg)
3103            .inspect(
3104                |w: &mut World, _val: &u64| {
3105                    *w.resource_mut::<bool>() = true;
3106                },
3107                reg,
3108            )
3109            .then(sink, reg)
3110            .build();
3111
3112        dag.run(&mut world, 0u32);
3113        assert_eq!(*world.resource::<u64>(), 42);
3114        assert!(*world.resource::<bool>());
3115    }
3116
3117    // -- Guard combinator --
3118
3119    #[test]
3120    fn dag_guard_keeps() {
3121        fn root(x: u32) -> u64 {
3122            x as u64
3123        }
3124        fn sink(mut out: ResMut<u64>, val: &Option<u64>) {
3125            *out = val.unwrap_or(0);
3126        }
3127        let mut wb = WorldBuilder::new();
3128        wb.register::<u64>(0);
3129        let mut world = wb.build();
3130        let reg = world.registry();
3131
3132        let mut dag = DagBuilder::<u32>::new()
3133            .root(root, reg)
3134            .guard(|v: &u64| *v > 3, reg)
3135            .then(sink, reg)
3136            .build();
3137
3138        dag.run(&mut world, 5u32);
3139        assert_eq!(*world.resource::<u64>(), 5);
3140    }
3141
3142    #[test]
3143    fn dag_guard_drops() {
3144        fn root(x: u32) -> u64 {
3145            x as u64
3146        }
3147        fn sink(mut out: ResMut<u64>, val: &Option<u64>) {
3148            *out = val.unwrap_or(999);
3149        }
3150        let mut wb = WorldBuilder::new();
3151        wb.register::<u64>(0);
3152        let mut world = wb.build();
3153        let reg = world.registry();
3154
3155        let mut dag = DagBuilder::<u32>::new()
3156            .root(root, reg)
3157            .guard(|v: &u64| *v > 10, reg)
3158            .then(sink, reg)
3159            .build();
3160
3161        dag.run(&mut world, 5u32);
3162        assert_eq!(*world.resource::<u64>(), 999);
3163    }
3164
3165    #[test]
3166    fn dag_arm_guard() {
3167        fn root(x: u32) -> u64 {
3168            x as u64
3169        }
3170        fn double(val: &u64) -> u64 {
3171            *val * 2
3172        }
3173        fn merge_fn(a: &Option<u64>, b: &u64) -> String {
3174            format!("{:?},{}", a, b)
3175        }
3176        fn sink(mut out: ResMut<String>, val: &String) {
3177            *out = val.clone();
3178        }
3179        let mut wb = WorldBuilder::new();
3180        wb.register::<String>(String::new());
3181        let mut world = wb.build();
3182        let reg = world.registry();
3183
3184        // arm_a: guard drops (5 < 10), arm_b: runs normally
3185        let mut dag = DagBuilder::<u32>::new()
3186            .root(root, reg)
3187            .fork()
3188            .arm(|a| a.then(double, reg).guard(|v: &u64| *v > 100, reg))
3189            .arm(|b| b.then(double, reg))
3190            .merge(merge_fn, reg)
3191            .then(sink, reg)
3192            .build();
3193
3194        dag.run(&mut world, 5u32);
3195        // arm_a: 10, guard fails → None. arm_b: 10.
3196        assert_eq!(world.resource::<String>().as_str(), "None,10");
3197    }
3198
3199    // -- Tap combinator --
3200
3201    #[test]
3202    fn dag_tap_observes_without_changing() {
3203        fn root(x: u32) -> u64 {
3204            x as u64 * 2
3205        }
3206        fn sink(mut out: ResMut<u64>, val: &u64) {
3207            *out = *val;
3208        }
3209        let mut wb = WorldBuilder::new();
3210        wb.register::<u64>(0);
3211        wb.register::<bool>(false);
3212        let mut world = wb.build();
3213        let reg = world.registry();
3214
3215        let mut dag = DagBuilder::<u32>::new()
3216            .root(root, reg)
3217            .tap(
3218                |w: &mut World, val: &u64| {
3219                    // Side-effect: record that we observed the value.
3220                    *w.resource_mut::<bool>() = *val == 10;
3221                },
3222                reg,
3223            )
3224            .then(sink, reg)
3225            .build();
3226
3227        dag.run(&mut world, 5u32);
3228        assert_eq!(*world.resource::<u64>(), 10); // value passed through
3229        assert!(*world.resource::<bool>()); // tap fired
3230    }
3231
3232    #[test]
3233    fn dag_arm_tap() {
3234        fn root(x: u32) -> u64 {
3235            x as u64
3236        }
3237        fn double(val: &u64) -> u64 {
3238            *val * 2
3239        }
3240        fn merge_add(a: &u64, b: &u64) -> u64 {
3241            *a + *b
3242        }
3243        fn sink(mut out: ResMut<u64>, val: &u64) {
3244            *out = *val;
3245        }
3246        let mut wb = WorldBuilder::new();
3247        wb.register::<u64>(0);
3248        wb.register::<bool>(false);
3249        let mut world = wb.build();
3250        let reg = world.registry();
3251
3252        let mut dag = DagBuilder::<u32>::new()
3253            .root(root, reg)
3254            .fork()
3255            .arm(|a| {
3256                a.then(double, reg).tap(
3257                    |w: &mut World, _v: &u64| {
3258                        *w.resource_mut::<bool>() = true;
3259                    },
3260                    reg,
3261                )
3262            })
3263            .arm(|b| b.then(double, reg))
3264            .merge(merge_add, reg)
3265            .then(sink, reg)
3266            .build();
3267
3268        dag.run(&mut world, 5u32);
3269        // arm_a: 10, arm_b: 10, merge: 20
3270        assert_eq!(*world.resource::<u64>(), 20);
3271        assert!(*world.resource::<bool>()); // tap in arm_a fired
3272    }
3273
3274    // -- Route combinator --
3275
3276    #[test]
3277    fn dag_route_true_arm() {
3278        fn root(x: u32) -> u64 {
3279            x as u64
3280        }
3281        fn double(val: &u64) -> u64 {
3282            *val * 2
3283        }
3284        fn triple(val: &u64) -> u64 {
3285            *val * 3
3286        }
3287        fn sink(mut out: ResMut<u64>, val: &u64) {
3288            *out = *val;
3289        }
3290        let mut wb = WorldBuilder::new();
3291        wb.register::<u64>(0);
3292        let mut world = wb.build();
3293        let reg = world.registry();
3294
3295        let arm_t = DagArmSeed::new().then(double, reg);
3296        let arm_f = DagArmSeed::new().then(triple, reg);
3297
3298        let mut dag = DagBuilder::<u32>::new()
3299            .root(root, reg)
3300            .route(|v: &u64| *v > 3, reg, arm_t, arm_f)
3301            .then(sink, reg)
3302            .build();
3303
3304        dag.run(&mut world, 5u32); // 5 > 3 → true arm → double → 10
3305        assert_eq!(*world.resource::<u64>(), 10);
3306    }
3307
3308    #[test]
3309    fn dag_route_false_arm() {
3310        fn root(x: u32) -> u64 {
3311            x as u64
3312        }
3313        fn double(val: &u64) -> u64 {
3314            *val * 2
3315        }
3316        fn triple(val: &u64) -> u64 {
3317            *val * 3
3318        }
3319        fn sink(mut out: ResMut<u64>, val: &u64) {
3320            *out = *val;
3321        }
3322        let mut wb = WorldBuilder::new();
3323        wb.register::<u64>(0);
3324        let mut world = wb.build();
3325        let reg = world.registry();
3326
3327        let arm_t = DagArmSeed::new().then(double, reg);
3328        let arm_f = DagArmSeed::new().then(triple, reg);
3329
3330        let mut dag = DagBuilder::<u32>::new()
3331            .root(root, reg)
3332            .route(|v: &u64| *v > 10, reg, arm_t, arm_f)
3333            .then(sink, reg)
3334            .build();
3335
3336        dag.run(&mut world, 5u32); // 5 <= 10 → false arm → triple → 15
3337        assert_eq!(*world.resource::<u64>(), 15);
3338    }
3339
3340    #[test]
3341    fn dag_route_nested() {
3342        fn root(x: u32) -> u64 {
3343            x as u64
3344        }
3345        fn pass(val: &u64) -> u64 {
3346            *val
3347        }
3348        fn add_100(val: &u64) -> u64 {
3349            *val + 100
3350        }
3351        fn add_200(val: &u64) -> u64 {
3352            *val + 200
3353        }
3354        fn add_300(val: &u64) -> u64 {
3355            *val + 300
3356        }
3357        fn sink(mut out: ResMut<u64>, val: &u64) {
3358            *out = *val;
3359        }
3360        let mut wb = WorldBuilder::new();
3361        wb.register::<u64>(0);
3362        let mut world = wb.build();
3363        let reg = world.registry();
3364
3365        // N-ary via nesting: <5 → +100, 5..10 → +200, >=10 → +300
3366        let inner_t = DagArmSeed::new().then(add_200, reg);
3367        let inner_f = DagArmSeed::new().then(add_300, reg);
3368        let outer_t = DagArmSeed::new().then(add_100, reg);
3369        let outer_f =
3370            DagArmSeed::new()
3371                .then(pass, reg)
3372                .route(|v: &u64| *v < 10, reg, inner_t, inner_f);
3373
3374        let mut dag = DagBuilder::<u32>::new()
3375            .root(root, reg)
3376            .route(|v: &u64| *v < 5, reg, outer_t, outer_f)
3377            .then(sink, reg)
3378            .build();
3379
3380        dag.run(&mut world, 3u32); // 3 < 5 → +100 → 103
3381        assert_eq!(*world.resource::<u64>(), 103);
3382
3383        dag.run(&mut world, 7u32); // 7 >= 5, 7 < 10 → +200 → 207
3384        assert_eq!(*world.resource::<u64>(), 207);
3385
3386        dag.run(&mut world, 15u32); // 15 >= 5, 15 >= 10 → +300 → 315
3387        assert_eq!(*world.resource::<u64>(), 315);
3388    }
3389
3390    // -- Tee combinator --
3391
3392    #[test]
3393    fn dag_tee_side_effect_chain() {
3394        fn root(x: u32) -> u64 {
3395            x as u64 * 2
3396        }
3397        fn log_step(mut counter: ResMut<u32>, _val: &u64) {
3398            *counter += 1;
3399        }
3400        fn sink(mut out: ResMut<u64>, val: &u64) {
3401            *out = *val;
3402        }
3403        let mut wb = WorldBuilder::new();
3404        wb.register::<u64>(0);
3405        wb.register::<u32>(0);
3406        let mut world = wb.build();
3407        let reg = world.registry();
3408
3409        let side = DagArmSeed::new().then(log_step, reg);
3410
3411        let mut dag = DagBuilder::<u32>::new()
3412            .root(root, reg)
3413            .tee(side)
3414            .then(sink, reg)
3415            .build();
3416
3417        dag.run(&mut world, 5u32);
3418        assert_eq!(*world.resource::<u64>(), 10); // value passed through
3419        assert_eq!(*world.resource::<u32>(), 1); // side-effect fired
3420
3421        dag.run(&mut world, 7u32);
3422        assert_eq!(*world.resource::<u64>(), 14);
3423        assert_eq!(*world.resource::<u32>(), 2); // fired again
3424    }
3425
3426    // -- Dedup combinator --
3427
3428    #[test]
3429    fn dag_dedup_suppresses_unchanged() {
3430        fn root(x: u32) -> u64 {
3431            x as u64 / 2 // intentional integer division: 4→2, 5→2
3432        }
3433        fn sink(mut out: ResMut<u32>, val: &Option<u64>) {
3434            if val.is_some() {
3435                *out += 1;
3436            }
3437        }
3438        let mut wb = WorldBuilder::new();
3439        wb.register::<u32>(0);
3440        let mut world = wb.build();
3441        let reg = world.registry();
3442
3443        let mut dag = DagBuilder::<u32>::new()
3444            .root(root, reg)
3445            .dedup()
3446            .then(sink, reg)
3447            .build();
3448
3449        dag.run(&mut world, 4u32); // 2 — first, Some
3450        assert_eq!(*world.resource::<u32>(), 1);
3451
3452        dag.run(&mut world, 5u32); // 2 — same, None
3453        assert_eq!(*world.resource::<u32>(), 1);
3454
3455        dag.run(&mut world, 6u32); // 3 — changed, Some
3456        assert_eq!(*world.resource::<u32>(), 2);
3457    }
3458
3459    // -- Bool combinators --
3460
3461    #[test]
3462    fn dag_not() {
3463        fn root(x: u32) -> bool {
3464            x > 5
3465        }
3466        fn sink(mut out: ResMut<bool>, val: &bool) {
3467            *out = *val;
3468        }
3469        let mut wb = WorldBuilder::new();
3470        wb.register::<bool>(false);
3471        let mut world = wb.build();
3472        let reg = world.registry();
3473
3474        let mut dag = DagBuilder::<u32>::new()
3475            .root(root, reg)
3476            .not()
3477            .then(sink, reg)
3478            .build();
3479
3480        dag.run(&mut world, 3u32); // 3 > 5 = false, not = true
3481        assert!(*world.resource::<bool>());
3482
3483        dag.run(&mut world, 10u32); // 10 > 5 = true, not = false
3484        assert!(!*world.resource::<bool>());
3485    }
3486
3487    #[test]
3488    fn dag_and() {
3489        fn root(x: u32) -> bool {
3490            x > 5
3491        }
3492        fn sink(mut out: ResMut<bool>, val: &bool) {
3493            *out = *val;
3494        }
3495        let mut wb = WorldBuilder::new();
3496        wb.register::<bool>(true); // "market open" flag
3497        let mut world = wb.build();
3498        let reg = world.registry();
3499
3500        let mut dag = DagBuilder::<u32>::new()
3501            .root(root, reg)
3502            .and(|w: &mut World| *w.resource::<bool>(), reg)
3503            .then(sink, reg)
3504            .build();
3505
3506        dag.run(&mut world, 10u32); // true && true = true
3507        assert!(*world.resource::<bool>());
3508
3509        *world.resource_mut::<bool>() = false; // close market
3510        dag.run(&mut world, 10u32); // true && false = false
3511        assert!(!*world.resource::<bool>());
3512    }
3513
3514    #[test]
3515    fn dag_or() {
3516        fn root(x: u32) -> bool {
3517            x > 5
3518        }
3519        fn sink(mut out: ResMut<bool>, val: &bool) {
3520            *out = *val;
3521        }
3522        let mut wb = WorldBuilder::new();
3523        wb.register::<bool>(false);
3524        let mut world = wb.build();
3525        let reg = world.registry();
3526
3527        let mut dag = DagBuilder::<u32>::new()
3528            .root(root, reg)
3529            .or(|w: &mut World| *w.resource::<bool>(), reg)
3530            .then(sink, reg)
3531            .build();
3532
3533        dag.run(&mut world, 3u32); // false || false = false
3534        assert!(!*world.resource::<bool>());
3535
3536        *world.resource_mut::<bool>() = true;
3537        dag.run(&mut world, 3u32); // false || true = true
3538        assert!(*world.resource::<bool>());
3539    }
3540
3541    #[test]
3542    fn dag_xor() {
3543        fn root(x: u32) -> bool {
3544            x > 5
3545        }
3546        fn sink(mut out: ResMut<bool>, val: &bool) {
3547            *out = *val;
3548        }
3549        let mut wb = WorldBuilder::new();
3550        wb.register::<bool>(true);
3551        let mut world = wb.build();
3552        let reg = world.registry();
3553
3554        let mut dag = DagBuilder::<u32>::new()
3555            .root(root, reg)
3556            .xor(|w: &mut World| *w.resource::<bool>(), reg)
3557            .then(sink, reg)
3558            .build();
3559
3560        dag.run(&mut world, 10u32); // true ^ true = false
3561        assert!(!*world.resource::<bool>());
3562    }
3563
3564    // =========================================================================
3565    // Splat — tuple destructuring
3566    // =========================================================================
3567
3568    #[test]
3569    fn dag_splat2_on_chain() {
3570        let mut wb = WorldBuilder::new();
3571        wb.register::<u64>(0);
3572        let mut world = wb.build();
3573        let reg = world.registry();
3574
3575        fn split(x: u32) -> (u32, u32) {
3576            (x, x * 2)
3577        }
3578        fn store(mut out: ResMut<u64>, a: &u32, b: &u32) {
3579            *out = *a as u64 + *b as u64;
3580        }
3581
3582        let mut dag = DagBuilder::<u32>::new()
3583            .root(split, reg)
3584            .splat()
3585            .then(store, reg)
3586            .build();
3587
3588        dag.run(&mut world, 5u32);
3589        assert_eq!(*world.resource::<u64>(), 15); // 5 + 10
3590    }
3591
3592    #[test]
3593    fn dag_splat3_on_chain() {
3594        let mut wb = WorldBuilder::new();
3595        wb.register::<u64>(0);
3596        let mut world = wb.build();
3597        let reg = world.registry();
3598
3599        fn split3(x: u32) -> (u32, u32, u32) {
3600            (x, x + 1, x + 2)
3601        }
3602        fn sum3(a: &u32, b: &u32, c: &u32) -> u64 {
3603            *a as u64 + *b as u64 + *c as u64
3604        }
3605        fn store(mut out: ResMut<u64>, val: &u64) {
3606            *out = *val;
3607        }
3608
3609        let mut dag = DagBuilder::<u32>::new()
3610            .root(split3, reg)
3611            .splat()
3612            .then(sum3, reg)
3613            .then(store, reg)
3614            .build();
3615
3616        dag.run(&mut world, 10u32);
3617        assert_eq!(*world.resource::<u64>(), 33); // 10+11+12
3618    }
3619
3620    #[test]
3621    fn dag_splat2_with_param() {
3622        let mut wb = WorldBuilder::new();
3623        wb.register::<u64>(100);
3624        let mut world = wb.build();
3625        let reg = world.registry();
3626
3627        fn split(x: u32) -> (u32, u32) {
3628            (x, x * 3)
3629        }
3630        fn add_base(base: Res<u64>, a: &u32, b: &u32) -> u64 {
3631            *base + *a as u64 + *b as u64
3632        }
3633        fn store(mut out: ResMut<u64>, val: &u64) {
3634            *out = *val;
3635        }
3636
3637        let mut dag = DagBuilder::<u32>::new()
3638            .root(split, reg)
3639            .splat()
3640            .then(add_base, reg)
3641            .then(store, reg)
3642            .build();
3643
3644        dag.run(&mut world, 5u32);
3645        assert_eq!(*world.resource::<u64>(), 120); // 100 + 5 + 15
3646    }
3647
3648    #[test]
3649    fn dag_splat_on_arm_start() {
3650        let mut wb = WorldBuilder::new();
3651        wb.register::<u64>(0);
3652        let mut world = wb.build();
3653        let reg = world.registry();
3654
3655        fn split(x: u32) -> (u32, u32) {
3656            (x, x + 10)
3657        }
3658        fn sum2(a: &u32, b: &u32) -> u64 {
3659            *a as u64 + *b as u64
3660        }
3661        fn identity(x: &(u32, u32)) -> u64 {
3662            x.0 as u64 * x.1 as u64
3663        }
3664        fn merge_add(a: &u64, b: &u64) -> u64 {
3665            *a + *b
3666        }
3667        fn store(mut out: ResMut<u64>, val: &u64) {
3668            *out = *val;
3669        }
3670
3671        let mut dag = DagBuilder::<u32>::new()
3672            .root(split, reg)
3673            .fork()
3674            .arm(|a| a.splat().then(sum2, reg))
3675            .arm(|b| b.then(identity, reg))
3676            .merge(merge_add, reg)
3677            .then(store, reg)
3678            .build();
3679
3680        dag.run(&mut world, 5u32);
3681        // arm_a: splat (5, 15) → sum2 = 20
3682        // arm_b: identity (5, 15) → 75
3683        // merge: 20 + 75 = 95
3684        assert_eq!(*world.resource::<u64>(), 95);
3685    }
3686
3687    #[test]
3688    fn dag_splat_on_arm() {
3689        let mut wb = WorldBuilder::new();
3690        wb.register::<u64>(0);
3691        let mut world = wb.build();
3692        let reg = world.registry();
3693
3694        fn root_id(x: u32) -> u32 {
3695            x
3696        }
3697        fn make_pair(val: &u32) -> (u32, u32) {
3698            (*val, *val + 100)
3699        }
3700        fn sum2(a: &u32, b: &u32) -> u64 {
3701            *a as u64 + *b as u64
3702        }
3703        fn double(val: &u32) -> u64 {
3704            *val as u64 * 2
3705        }
3706        fn merge_add(a: &u64, b: &u64) -> u64 {
3707            *a + *b
3708        }
3709        fn store(mut out: ResMut<u64>, val: &u64) {
3710            *out = *val;
3711        }
3712
3713        let mut dag = DagBuilder::<u32>::new()
3714            .root(root_id, reg)
3715            .fork()
3716            .arm(|a| a.then(make_pair, reg).splat().then(sum2, reg))
3717            .arm(|b| b.then(double, reg))
3718            .merge(merge_add, reg)
3719            .then(store, reg)
3720            .build();
3721
3722        dag.run(&mut world, 7u32);
3723        // arm_a: make_pair(7) = (7, 107), splat → sum2 = 114
3724        // arm_b: double(7) = 14
3725        // merge: 114 + 14 = 128
3726        assert_eq!(*world.resource::<u64>(), 128);
3727    }
3728
3729    #[test]
3730    fn dag_splat4_on_chain() {
3731        let mut wb = WorldBuilder::new();
3732        wb.register::<u64>(0);
3733        let mut world = wb.build();
3734        let reg = world.registry();
3735
3736        fn split4(x: u32) -> (u32, u32, u32, u32) {
3737            (x, x + 1, x + 2, x + 3)
3738        }
3739        fn sum4(a: &u32, b: &u32, c: &u32, d: &u32) -> u64 {
3740            (*a + *b + *c + *d) as u64
3741        }
3742        fn store(mut out: ResMut<u64>, val: &u64) {
3743            *out = *val;
3744        }
3745
3746        let mut dag = DagBuilder::<u32>::new()
3747            .root(split4, reg)
3748            .splat()
3749            .then(sum4, reg)
3750            .then(store, reg)
3751            .build();
3752
3753        dag.run(&mut world, 10u32);
3754        assert_eq!(*world.resource::<u64>(), 46); // 10+11+12+13
3755    }
3756
3757    #[test]
3758    fn dag_splat5_on_chain() {
3759        let mut wb = WorldBuilder::new();
3760        wb.register::<u64>(0);
3761        let mut world = wb.build();
3762        let reg = world.registry();
3763
3764        fn split5(x: u32) -> (u8, u8, u8, u8, u8) {
3765            let x = x as u8;
3766            (x, x + 1, x + 2, x + 3, x + 4)
3767        }
3768        #[allow(clippy::many_single_char_names)]
3769        fn sum5(a: &u8, b: &u8, c: &u8, d: &u8, e: &u8) -> u64 {
3770            (*a as u64) + (*b as u64) + (*c as u64) + (*d as u64) + (*e as u64)
3771        }
3772        fn store(mut out: ResMut<u64>, val: &u64) {
3773            *out = *val;
3774        }
3775
3776        let mut dag = DagBuilder::<u32>::new()
3777            .root(split5, reg)
3778            .splat()
3779            .then(sum5, reg)
3780            .then(store, reg)
3781            .build();
3782
3783        dag.run(&mut world, 1u32);
3784        assert_eq!(*world.resource::<u64>(), 15); // 1+2+3+4+5
3785    }
3786
3787    #[test]
3788    fn dag_splat_boxable() {
3789        let mut wb = WorldBuilder::new();
3790        wb.register::<u64>(0);
3791        let mut world = wb.build();
3792        let reg = world.registry();
3793
3794        fn split(x: u32) -> (u32, u32) {
3795            (x, x * 2)
3796        }
3797        fn store(mut out: ResMut<u64>, a: &u32, b: &u32) {
3798            *out = *a as u64 + *b as u64;
3799        }
3800
3801        let dag = DagBuilder::<u32>::new()
3802            .root(split, reg)
3803            .splat()
3804            .then(store, reg)
3805            .build();
3806
3807        let mut boxed: Virtual<u32> = Box::new(dag);
3808        boxed.run(&mut world, 5u32);
3809        assert_eq!(*world.resource::<u64>(), 15);
3810    }
3811
3812    // -- Batch DAG --
3813
3814    #[test]
3815    fn batch_dag_basic() {
3816        let mut wb = WorldBuilder::new();
3817        wb.register::<u64>(0);
3818        let mut world = wb.build();
3819        let reg = world.registry();
3820
3821        fn double(x: u32) -> u64 {
3822            x as u64 * 2
3823        }
3824        fn store(mut out: ResMut<u64>, val: &u64) {
3825            *out += *val;
3826        }
3827
3828        let mut batch = DagBuilder::<u32>::new()
3829            .root(double, reg)
3830            .then(store, reg)
3831            .build_batch(8);
3832
3833        batch.input_mut().extend([1, 2, 3]);
3834        batch.run(&mut world);
3835
3836        assert_eq!(*world.resource::<u64>(), 12); // 2 + 4 + 6
3837        assert!(batch.input().is_empty());
3838    }
3839
3840    #[test]
3841    fn batch_dag_option_terminal() {
3842        let mut wb = WorldBuilder::new();
3843        wb.register::<u64>(0);
3844        let mut world = wb.build();
3845        let reg = world.registry();
3846
3847        fn double(x: u32) -> u64 {
3848            x as u64 * 2
3849        }
3850        fn store(mut out: ResMut<u64>, val: &u64) {
3851            *out += *val;
3852        }
3853
3854        let mut batch = DagBuilder::<u32>::new()
3855            .root(double, reg)
3856            .guard(|val: &u64| *val > 5, reg)
3857            .map(store, reg)
3858            .unwrap_or(())
3859            .build_batch(8);
3860
3861        batch.input_mut().extend([1, 2, 3, 4, 5]);
3862        batch.run(&mut world);
3863
3864        // double: 2, 4, 6, 8, 10
3865        // guard keeps > 5: 6, 8, 10
3866        assert_eq!(*world.resource::<u64>(), 24); // 6 + 8 + 10
3867    }
3868
3869    #[test]
3870    fn batch_dag_buffer_reuse() {
3871        let mut wb = WorldBuilder::new();
3872        wb.register::<u64>(0);
3873        let mut world = wb.build();
3874        let reg = world.registry();
3875
3876        fn double(x: u32) -> u64 {
3877            x as u64 * 2
3878        }
3879        fn store(mut out: ResMut<u64>, val: &u64) {
3880            *out += *val;
3881        }
3882
3883        let mut batch = DagBuilder::<u32>::new()
3884            .root(double, reg)
3885            .then(store, reg)
3886            .build_batch(8);
3887
3888        batch.input_mut().extend([1, 2]);
3889        batch.run(&mut world);
3890        assert_eq!(*world.resource::<u64>(), 6); // 2 + 4
3891        assert!(batch.input().is_empty());
3892
3893        batch.input_mut().extend([10, 20]);
3894        batch.run(&mut world);
3895        assert_eq!(*world.resource::<u64>(), 66); // 6 + 20 + 40
3896    }
3897
3898    #[test]
3899    fn batch_dag_retains_allocation() {
3900        let mut world = WorldBuilder::new().build();
3901        let reg = world.registry();
3902
3903        fn noop(_x: u32) {}
3904
3905        let mut batch = DagBuilder::<u32>::new().root(noop, reg).build_batch(64);
3906
3907        batch.input_mut().extend([1, 2, 3]);
3908        batch.run(&mut world);
3909
3910        assert!(batch.input().is_empty());
3911        assert!(batch.input_mut().capacity() >= 64);
3912    }
3913
3914    #[test]
3915    fn batch_dag_empty_is_noop() {
3916        let mut wb = WorldBuilder::new();
3917        wb.register::<u64>(0);
3918        let mut world = wb.build();
3919        let reg = world.registry();
3920
3921        fn double(x: u32) -> u64 {
3922            x as u64 * 2
3923        }
3924        fn store(mut out: ResMut<u64>, val: &u64) {
3925            *out += *val;
3926        }
3927
3928        let mut batch = DagBuilder::<u32>::new()
3929            .root(double, reg)
3930            .then(store, reg)
3931            .build_batch(8);
3932
3933        batch.run(&mut world);
3934        assert_eq!(*world.resource::<u64>(), 0);
3935    }
3936
3937    #[test]
3938    fn batch_dag_with_splat() {
3939        let mut wb = WorldBuilder::new();
3940        wb.register::<u64>(0);
3941        let mut world = wb.build();
3942        let reg = world.registry();
3943
3944        fn split(x: u32) -> (u64, u64) {
3945            (x as u64, x as u64 * 10)
3946        }
3947        fn combine(a: &u64, b: &u64) -> u64 {
3948            *a + *b
3949        }
3950        fn store(mut out: ResMut<u64>, val: &u64) {
3951            *out += *val;
3952        }
3953
3954        let mut batch = DagBuilder::<u32>::new()
3955            .root(split, reg)
3956            .splat()
3957            .then(combine, reg)
3958            .then(store, reg)
3959            .build_batch(4);
3960
3961        batch.input_mut().extend([1, 2]);
3962        batch.run(&mut world);
3963
3964        // 1 → (1, 10) → 11, 2 → (2, 20) → 22
3965        assert_eq!(*world.resource::<u64>(), 33); // 11 + 22
3966    }
3967
3968    // -- Conditional then (formerly switch) --
3969
3970    #[test]
3971    fn dag_then_conditional_basic() {
3972        fn root(x: u32) -> u64 {
3973            x as u64
3974        }
3975        fn sink(mut out: ResMut<u64>, val: &u64) {
3976            *out = *val;
3977        }
3978
3979        let mut wb = WorldBuilder::new();
3980        wb.register::<u64>(0);
3981        let mut world = wb.build();
3982        let reg = world.registry();
3983
3984        let mut dag = DagBuilder::<u32>::new()
3985            .root(root, reg)
3986            .then(|val: &u64| if *val > 5 { *val * 10 } else { *val + 1 }, reg)
3987            .then(sink, reg)
3988            .build();
3989
3990        dag.run(&mut world, 10u32); // 10 > 5 → 100
3991        assert_eq!(*world.resource::<u64>(), 100);
3992
3993        dag.run(&mut world, 3u32); // 3 <= 5 → 4
3994        assert_eq!(*world.resource::<u64>(), 4);
3995    }
3996
3997    #[test]
3998    fn dag_then_conditional_3_way() {
3999        fn root(x: u32) -> u32 {
4000            x
4001        }
4002        fn sink(mut out: ResMut<u64>, val: &u64) {
4003            *out = *val;
4004        }
4005
4006        let mut wb = WorldBuilder::new();
4007        wb.register::<u64>(0);
4008        let mut world = wb.build();
4009        let reg = world.registry();
4010
4011        let mut dag = DagBuilder::<u32>::new()
4012            .root(root, reg)
4013            .then(
4014                |val: &u32| match *val % 3 {
4015                    0 => *val as u64 + 100,
4016                    1 => *val as u64 + 200,
4017                    _ => *val as u64 + 300,
4018                },
4019                reg,
4020            )
4021            .then(sink, reg)
4022            .build();
4023
4024        dag.run(&mut world, 6u32); // 6 % 3 == 0 → 106
4025        assert_eq!(*world.resource::<u64>(), 106);
4026
4027        dag.run(&mut world, 7u32); // 7 % 3 == 1 → 207
4028        assert_eq!(*world.resource::<u64>(), 207);
4029
4030        dag.run(&mut world, 8u32); // 8 % 3 == 2 → 308
4031        assert_eq!(*world.resource::<u64>(), 308);
4032    }
4033
4034    #[test]
4035    fn dag_then_with_resolve_arm() {
4036        fn root(x: u32) -> u32 {
4037            x
4038        }
4039        fn double(val: &u32) -> u64 {
4040            *val as u64 * 2
4041        }
4042        fn triple(val: &u32) -> u64 {
4043            *val as u64 * 3
4044        }
4045        fn sink(mut out: ResMut<u64>, val: &u64) {
4046            *out = *val;
4047        }
4048
4049        let mut wb = WorldBuilder::new();
4050        wb.register::<u64>(0);
4051        let mut world = wb.build();
4052        let reg = world.registry();
4053
4054        let mut arm_even = resolve_arm(double, reg);
4055        let mut arm_odd = resolve_arm(triple, reg);
4056
4057        let mut dag = DagBuilder::<u32>::new()
4058            .root(root, reg)
4059            .then(
4060                move |world: &mut World, val: &u32| {
4061                    if *val % 2 == 0 {
4062                        arm_even(world, val)
4063                    } else {
4064                        arm_odd(world, val)
4065                    }
4066                },
4067                reg,
4068            )
4069            .then(sink, reg)
4070            .build();
4071
4072        dag.run(&mut world, 4u32); // even → double → 8
4073        assert_eq!(*world.resource::<u64>(), 8);
4074
4075        dag.run(&mut world, 5u32); // odd → triple → 15
4076        assert_eq!(*world.resource::<u64>(), 15);
4077    }
4078
4079    #[test]
4080    fn dag_resolve_arm_with_params() {
4081        fn root(x: u32) -> u32 {
4082            x
4083        }
4084        fn add_offset(offset: Res<i64>, val: &u32) -> u64 {
4085            (*offset + *val as i64) as u64
4086        }
4087        fn plain_double(val: &u32) -> u64 {
4088            *val as u64 * 2
4089        }
4090        fn sink(mut out: ResMut<u64>, val: &u64) {
4091            *out = *val;
4092        }
4093
4094        let mut wb = WorldBuilder::new();
4095        wb.register::<u64>(0);
4096        wb.register::<i64>(100);
4097        let mut world = wb.build();
4098        let reg = world.registry();
4099
4100        // Each arm resolves different params
4101        let mut arm_offset = resolve_arm(add_offset, reg);
4102        let mut arm_double = resolve_arm(plain_double, reg);
4103
4104        let mut dag = DagBuilder::<u32>::new()
4105            .root(root, reg)
4106            .then(
4107                move |world: &mut World, val: &u32| {
4108                    if *val > 10 {
4109                        arm_offset(world, val)
4110                    } else {
4111                        arm_double(world, val)
4112                    }
4113                },
4114                reg,
4115            )
4116            .then(sink, reg)
4117            .build();
4118
4119        dag.run(&mut world, 20u32); // > 10 → add_offset → 100 + 20 = 120
4120        assert_eq!(*world.resource::<u64>(), 120);
4121
4122        dag.run(&mut world, 5u32); // <= 10 → double → 10
4123        assert_eq!(*world.resource::<u64>(), 10);
4124    }
4125
4126    #[test]
4127    fn dag_then_conditional_in_fork_arm() {
4128        fn root(x: u32) -> u32 {
4129            x
4130        }
4131        fn pass(val: &u32) -> u32 {
4132            *val
4133        }
4134        fn sink_u64(mut out: ResMut<u64>, val: &u64) {
4135            *out = *val;
4136        }
4137        fn sink_i64(mut out: ResMut<i64>, val: &u32) {
4138            *out = -(*val as i64);
4139        }
4140
4141        let mut wb = WorldBuilder::new();
4142        wb.register::<u64>(0);
4143        wb.register::<i64>(0);
4144        let mut world = wb.build();
4145        let reg = world.registry();
4146
4147        let mut dag = DagBuilder::<u32>::new()
4148            .root(root, reg)
4149            .fork()
4150            .arm(|a| {
4151                a.then(pass, reg)
4152                    .then(
4153                        |val: &u32| {
4154                            if *val > 5 {
4155                                *val as u64 * 10
4156                            } else {
4157                                *val as u64
4158                            }
4159                        },
4160                        reg,
4161                    )
4162                    .then(sink_u64, reg)
4163            })
4164            .arm(|a| a.then(sink_i64, reg))
4165            .join()
4166            .build();
4167
4168        dag.run(&mut world, 10u32); // arm0: 10 > 5 → 100, arm1: -10
4169        assert_eq!(*world.resource::<u64>(), 100);
4170        assert_eq!(*world.resource::<i64>(), -10);
4171
4172        dag.run(&mut world, 3u32); // arm0: 3 <= 5 → 3, arm1: -3
4173        assert_eq!(*world.resource::<u64>(), 3);
4174        assert_eq!(*world.resource::<i64>(), -3);
4175    }
4176
4177    #[test]
4178    fn batch_dag_then_conditional() {
4179        fn root(x: u32) -> u32 {
4180            x
4181        }
4182        fn sink(mut out: ResMut<u64>, val: &u64) {
4183            *out += *val;
4184        }
4185
4186        let mut wb = WorldBuilder::new();
4187        wb.register::<u64>(0);
4188        let mut world = wb.build();
4189        let reg = world.registry();
4190
4191        let mut batch = DagBuilder::<u32>::new()
4192            .root(root, reg)
4193            .then(
4194                |val: &u32| {
4195                    if *val % 2 == 0 {
4196                        *val as u64 * 10
4197                    } else {
4198                        *val as u64
4199                    }
4200                },
4201                reg,
4202            )
4203            .then(sink, reg)
4204            .build_batch(8);
4205
4206        batch.input_mut().extend([1, 2, 3, 4]);
4207        batch.run(&mut world);
4208
4209        // 1 → 1, 2 → 20, 3 → 3, 4 → 40 = 64
4210        assert_eq!(*world.resource::<u64>(), 64);
4211    }
4212
4213    // =========================================================================
4214    // Scan combinator (DAG)
4215    // =========================================================================
4216
4217    #[test]
4218    fn dag_scan_arity0_closure() {
4219        let mut wb = WorldBuilder::new();
4220        wb.register::<u64>(0);
4221        let mut world = wb.build();
4222        let reg = world.registry();
4223
4224        fn store(mut out: ResMut<u64>, val: &u64) {
4225            *out = *val;
4226        }
4227
4228        let mut dag = DagBuilder::<u64>::new()
4229            .root(|x: u64| x, reg)
4230            .scan(
4231                0u64,
4232                |acc: &mut u64, val: &u64| {
4233                    *acc += val;
4234                    *acc
4235                },
4236                reg,
4237            )
4238            .then(store, reg)
4239            .build();
4240
4241        dag.run(&mut world, 10);
4242        assert_eq!(*world.resource::<u64>(), 10);
4243        dag.run(&mut world, 20);
4244        assert_eq!(*world.resource::<u64>(), 30);
4245        dag.run(&mut world, 5);
4246        assert_eq!(*world.resource::<u64>(), 35);
4247    }
4248
4249    #[test]
4250    fn dag_scan_named_fn_with_param() {
4251        let mut wb = WorldBuilder::new();
4252        wb.register::<u64>(100);
4253        wb.register::<String>(String::new());
4254        let mut world = wb.build();
4255        let reg = world.registry();
4256
4257        fn threshold(limit: Res<u64>, acc: &mut u64, val: &u64) -> Option<u64> {
4258            *acc += val;
4259            if *acc > *limit { Some(*acc) } else { None }
4260        }
4261        fn store_opt(mut out: ResMut<String>, val: &Option<u64>) {
4262            *out = val
4263                .as_ref()
4264                .map_or_else(|| "below".into(), |v| format!("hit:{v}"));
4265        }
4266
4267        let mut dag = DagBuilder::<u64>::new()
4268            .root(|x: u64| x, reg)
4269            .scan(0u64, threshold, reg)
4270            .then(store_opt, reg)
4271            .build();
4272
4273        dag.run(&mut world, 50);
4274        assert_eq!(world.resource::<String>().as_str(), "below");
4275        dag.run(&mut world, 60);
4276        assert_eq!(world.resource::<String>().as_str(), "hit:110");
4277    }
4278
4279    #[test]
4280    fn dag_arm_scan() {
4281        let mut wb = WorldBuilder::new();
4282        wb.register::<u64>(0);
4283        let mut world = wb.build();
4284        let reg = world.registry();
4285
4286        fn store(mut out: ResMut<u64>, val: &u64) {
4287            *out = *val;
4288        }
4289
4290        let scan_arm = DagArmSeed::<u64>::new()
4291            .then(|v: &u64| *v, reg)
4292            .scan(
4293                0u64,
4294                |acc: &mut u64, val: &u64| {
4295                    *acc += val;
4296                    *acc
4297                },
4298                reg,
4299            )
4300            .then(store, reg);
4301
4302        let pass_arm = DagArmSeed::<u64>::new().then(|_: &u64| {}, reg);
4303
4304        let mut dag = DagBuilder::<u64>::new()
4305            .root(|x: u64| x, reg)
4306            .fork()
4307            .arm(|_| scan_arm)
4308            .arm(|_| pass_arm)
4309            .merge(|(): &(), (): &()| {}, reg)
4310            .build();
4311
4312        dag.run(&mut world, 10);
4313        assert_eq!(*world.resource::<u64>(), 10);
4314        dag.run(&mut world, 20);
4315        assert_eq!(*world.resource::<u64>(), 30);
4316    }
4317
4318    // =========================================================================
4319    // Build — Option<()> terminal
4320    // =========================================================================
4321
4322    #[test]
4323    fn build_option_unit_terminal() {
4324        let mut wb = WorldBuilder::new();
4325        wb.register::<u64>(0);
4326        let mut world = wb.build();
4327        let reg = world.registry();
4328
4329        // root takes by value (IntoStep), then .guard() produces Option
4330        fn check(x: u32) -> u64 {
4331            x as u64
4332        }
4333        fn store(mut out: ResMut<u64>, val: &u64) {
4334            *out += *val;
4335        }
4336
4337        // guard → Option<u64>, map(store) → Option<()>, build() should work
4338        let mut dag = DagBuilder::<u32>::new()
4339            .root(check, reg)
4340            .guard(|val: &u64| *val > 5, reg)
4341            .map(store, reg)
4342            .build();
4343
4344        dag.run(&mut world, 3); // guard filters → None
4345        assert_eq!(*world.resource::<u64>(), 0);
4346        dag.run(&mut world, 7); // passes guard → stores 7
4347        assert_eq!(*world.resource::<u64>(), 7);
4348    }
4349
4350    // =========================================================================
4351    // Build — borrowed event type
4352    // =========================================================================
4353
4354    #[test]
4355    fn build_borrowed_event_direct() {
4356        let mut wb = WorldBuilder::new();
4357        wb.register::<u64>(0);
4358        let mut world = wb.build();
4359
4360        fn decode(msg: &[u8]) -> u64 {
4361            msg.len() as u64
4362        }
4363        fn store(mut out: ResMut<u64>, val: &u64) {
4364            *out = *val;
4365        }
4366
4367        // msg declared before dag so it outlives the DAG (drop order).
4368        let msg = vec![1u8, 2, 3];
4369        let reg = world.registry();
4370        let mut dag = DagBuilder::<&[u8]>::new()
4371            .root(decode, reg)
4372            .then(store, reg)
4373            .build();
4374
4375        dag.run(&mut world, &msg);
4376        assert_eq!(*world.resource::<u64>(), 3);
4377    }
4378}