Skip to main content

nexus_rt/
dag.rs

1// Builder return types use complex generics for compile-time edge validation.
2#![allow(clippy::type_complexity)]
3// Handler arity is architecturally required by the Param trait — handlers
4// take N typed parameters and the macro-generated dispatch impls expand
5// per-arity into call_inner functions with N + Input arguments. Module-level
6// allow rather than one inline attribute per arity expansion.
7#![allow(clippy::too_many_arguments)]
8
9//! DAG pipeline — monomorphized data-flow graphs with fan-out and merge.
10//!
11//! [`DagBuilder`] begins a typed DAG that encodes topology in the type system.
12//! After monomorphization, the entire DAG is a single flat function with
13//! all values as stack locals — no arena, no vtable dispatch. The only
14//! `unsafe` is in the shared [`Param::fetch`](crate::Param) path
15//! (resource access by pre-resolved index).
16//!
17//! Nodes receive their input **by reference** — fan-out is free (multiple
18//! arms borrow the same stack local). Nodes produce owned output values
19//! passed to the next step.
20//!
21//! # When to use
22//!
23//! Use DAG pipelines when data needs to fan out to multiple arms and
24//! merge back. For linear chains, prefer [`PipelineBuilder`](crate::PipelineBuilder).
25//! For dynamic fan-out by reference, use [`FanOut`](crate::FanOut) or
26//! [`Broadcast`](crate::Broadcast).
27//!
28//! # Flow control
29//!
30//! Option and Result combinators (`.guard()`, `.map()`, `.and_then()`,
31//! `.filter()`, `.catch()`, etc.) work on both the main chain and
32//! within arms.
33//!
34//! **Within an arm**, `None` / `Err` short-circuits the remaining steps
35//! in **that arm only**. Sibling arms execute unconditionally. The merge
36//! step receives whatever each arm produced (including `None`).
37//!
38//! `.tap()` observes the value mid-chain without consuming or changing it.
39//!
40//! `.route()` is binary conditional routing — evaluates a predicate and
41//! executes exactly one of two arms. Both arms must produce the same
42//! output type. For N-ary routing, nest `route` calls.
43//!
44//! To skip an entire fork, resolve Option/Result **before** `.fork()`:
45//!
46//! ```ignore
47//! DagBuilder::<RawMsg>::new()
48//!     .root(decode, reg)
//!     .guard(|msg: &DecodedMsg| !msg.is_empty(), reg)  // None skips everything below
//!     .unwrap_or(default)                               // → T, enter fork with concrete type
51//!     .fork()
52//!     // arms work with &T, not &Option<T>
53//! ```
54//!
55//! # Combinator quick reference
56//!
57//! **Topology:** `.root()`, `.then()`, `.fork()`, `.arm()`, `.merge()`,
58//! `.join()`, `.build()`
59//!
60//! **Flow control:** `.guard()`, `.tap()`, `.route()`, `.tee()`, `.scan()`,
61//! `.dedup()`
62//!
63//! **Tuple `(A, B, ...)` (2-5 elements):** `.splat()` (→ splat builder,
64//! call `.then()` with destructured `&T` args)
65//!
66//! **Option:** `.map()`, `.filter()`, `.inspect()`, `.and_then()`,
67//! `.on_none()`, `.ok_or()`, `.unwrap_or()`
68//!
69//! **Result:** `.map()`, `.and_then()`, `.catch()`, `.map_err()`,
70//! `.ok()`, `.unwrap_or()`
71//!
72//! **Bool:** `.not()`, `.and()`, `.or()`, `.xor()`
73//!
74//! **Terminal:** `.dispatch()`, `.cloned()`, `.build()`, `.build_batch(cap)`
75//!
76//! All combinators accepting functions resolve `Param` dependencies at build
77//! time via `IntoStep`, `IntoRefStep`, or `IntoProducer` — named functions
78//! get direct-pointer access. Arity-0 closures work everywhere. Raw
79//! `&mut World` closures are available as an escape hatch via `Opaque`.
80//!
81//! # Splat — tuple destructuring
82//!
83//! When a step returns a tuple, the next step normally receives the
84//! whole tuple as `&(A, B)`. `.splat()` destructures it into individual
85//! reference arguments (`&A, &B`), reusing the existing merge step
86//! infrastructure:
87//!
88//! ```ignore
89//! fn split(t: Tick) -> (f64, u64) { (t.price, t.size) }
90//! fn weighted(price: &f64, size: &u64) -> f64 { *price * *size as f64 }
91//!
92//! DagBuilder::<Tick>::new()
93//!     .root(split, reg)       // Tick → (f64, u64)
94//!     .splat()                // destructure
95//!     .then(weighted, reg)    // (&f64, &u64) → f64
96//!     .build();
97//! ```
98//!
99//! Supported for tuples of 2-5 elements. Beyond 5, define a named
100//! struct — if a combinator stage needs that many arguments, a struct
101//! makes the intent clearer and the code more maintainable.
102//!
103//! # Node signatures
104//!
105//! The root node takes the event by value. All other nodes take their
106//! input by reference:
107//!
108//! ```ignore
109//! // Root: event by value
110//! fn decode(raw: RawMsg) -> DecodedMsg { .. }
111//!
112//! // Regular: input by reference
113//! fn update_ob(msg: &DecodedMsg) { .. }
114//! fn check_risk(config: Res<Config>, msg: &DecodedMsg) -> RiskResult { .. }
115//! ```
116//!
117//! # Examples
118//!
119//! ```
120//! use nexus_rt::{WorldBuilder, ResMut, Handler, Resource};
121//! use nexus_rt::dag::DagBuilder;
122//!
123//! #[derive(Resource)]
124//! struct Accum(u64);
125//!
126//! let mut wb = WorldBuilder::new();
127//! wb.register(Accum(0));
128//! let mut world = wb.build();
129//! let reg = world.registry();
130//!
131//! fn double(x: u32) -> u64 { x as u64 * 2 }
132//! fn store(mut out: ResMut<Accum>, val: &u64) { out.0 = *val; }
133//!
134//! let mut dag = DagBuilder::<u32>::new()
135//!     .root(double, reg)
136//!     .then(store, reg)
137//!     .build();
138//!
139//! dag.run(&mut world, 5u32);
140//! assert_eq!(world.resource::<Accum>().0, 10);
141//! ```
142//!
143//! # Returning DAGs from functions (Rust 2024)
144//!
145//! When a factory function takes `&Registry` and returns `impl Handler<E>`,
146//! Rust 2024 captures the registry borrow in the return type by default.
147//! Use `+ use<...>` to exclude it:
148//!
149//! ```ignore
150//! fn on_tick<C: Config>(
151//!     reg: &Registry,
152//! ) -> impl Handler<Tick> + use<C> {
153//!     DagBuilder::<Tick>::new()
154//!         .root(split::<C>, reg)
155//!         .fork()
156//!         // ...
157//!         .build()
158//! }
159//! ```
160//!
//! List every type parameter the DAG captures; omit the `&Registry`
//! lifetime — the registry is only borrowed while each step is resolved
//! during construction and is not stored in the built handler. See the
163//! [crate-level docs](crate#returning-impl-handler-from-functions-rust-2024)
164//! for the full explanation.
165
166use std::marker::PhantomData;
167
168use crate::Handler;
169use crate::pipeline::{
170    AndBoolNode, ChainCall, ClonedNode, ClonedOptionNode, ClonedResultNode, DagAndThenOptionNode,
171    DagAndThenResultNode, DagCatchNode, DagMapOptionNode, DagMapResultNode, DagRouteNode,
172    DagThenNode, DedupNode, DiscardOptionNode, DispatchNode, FilterNode, GuardNode, IdentityNode,
173    InspectErrNode, InspectOptionNode, InspectResultNode, IntoProducer, IntoRefScanStep,
174    IntoRefStep, IntoStep, MapErrNode, NotNode, OkOrElseNode, OkOrNode, OkResultNode, OnNoneNode,
175    OrBoolNode, OrElseNode, RefScanNode, StepCall, TapNode, TeeNode, ThenNode,
176    UnwrapOrElseOptionNode, UnwrapOrElseResultNode, UnwrapOrOptionNode, UnwrapOrResultNode,
177    XorBoolNode,
178};
179use crate::world::{Registry, World};
180
181// =============================================================================
182// MergeStepCall / IntoMergeStep — merge step dispatch
183// =============================================================================
184
/// Callable trait for resolved merge steps.
///
/// Like [`StepCall`] but for merge steps with multiple reference inputs
/// bundled as `Inputs` (e.g. `(&'a A, &'a B)`).
///
/// `Inputs` is the tuple of references handed to the step and `Out` is the
/// value the step returns. Implemented in this module for [`MergeStep`]
/// at input arities 2 through 5.
#[doc(hidden)]
pub trait MergeStepCall<Inputs, Out> {
    /// Call this merge step with a world reference and input references.
    ///
    /// `world` is made available so implementations can fetch any resource
    /// parameters; `inputs` carries the reference tuple described above.
    fn call(&mut self, world: &mut World, inputs: Inputs) -> Out;
}
194
/// Converts a named function into a resolved merge step.
///
/// Params first, then N reference inputs, returns output:
///
/// ```ignore
/// fn check(config: Res<Config>, ob: &ObResult, risk: &RiskResult) -> Decision { .. }
/// ```
///
/// Type parameters:
///
/// - `Inputs` — the tuple of input references (e.g. `(&A, &B)`).
/// - `Out` — the value produced by the merge step.
/// - `Params` — the tuple of resource parameter types; `()` for callables
///   that take only the reference inputs (this is the form closures use).
#[doc(hidden)]
#[diagnostic::on_unimplemented(
    message = "this function cannot be used as a merge step",
    note = "merge steps take reference tuple inputs from the fork arms",
    note = "closures with resource parameters are not supported — use a named `fn`"
)]
pub trait IntoMergeStep<Inputs, Out, Params> {
    /// The concrete resolved merge step type.
    type Step: MergeStepCall<Inputs, Out>;

    /// Resolve Param state from the registry and produce a merge step.
    ///
    /// The registry is only borrowed for the duration of this call; the
    /// returned `Step` type carries no lifetime tied to it.
    fn into_merge_step(self, registry: &Registry) -> Self::Step;
}
215
/// Internal: pre-resolved merge step with cached Param state.
///
/// Pairs the user-supplied callable with the state produced by
/// `Param::init`, so that `call` can fetch parameters without going back
/// to the registry.
#[doc(hidden)]
pub struct MergeStep<F, Params: crate::handler::Param> {
    // The wrapped function or closure.
    f: F,
    // State returned by `Params::init(registry)`; passed to `Params::fetch`
    // on every call.
    state: Params::State,
    // Retained for future diagnostic/tracing use (step name in error messages).
    // Populated from `std::any::type_name::<F>()` by the `IntoMergeStep` impls.
    #[allow(dead_code)]
    name: &'static str,
}
225
226// -- Merge arity 2 -----------------------------------------------------------
227
228// Param arity 0: closures work.
229impl<A, B, Out, F> MergeStepCall<(&A, &B), Out> for MergeStep<F, ()>
230where
231    F: FnMut(&A, &B) -> Out + 'static,
232{
233    #[inline(always)]
234    fn call(&mut self, _world: &mut World, inputs: (&A, &B)) -> Out {
235        (self.f)(inputs.0, inputs.1)
236    }
237}
238
239impl<A, B, Out, F> IntoMergeStep<(&A, &B), Out, ()> for F
240where
241    F: FnMut(&A, &B) -> Out + 'static,
242{
243    type Step = MergeStep<F, ()>;
244
245    fn into_merge_step(self, registry: &Registry) -> Self::Step {
246        MergeStep {
247            f: self,
248            state: <() as crate::handler::Param>::init(registry),
249            name: std::any::type_name::<F>(),
250        }
251    }
252}
253
// Param arities 1-8 for merge arity 2.
//
// For a Param tuple `($P, ...)` this macro emits two impls:
//   * `MergeStepCall<(&A, &B), Out>` for `MergeStep<F, ($P, ...)>`, which
//     fetches the params from the `World` and calls `f(params..., &A, &B)`.
//   * `IntoMergeStep<(&A, &B), Out, ($P, ...)>` for `F`, which initialises
//     the Param state from the `Registry` and wraps `f` in a `MergeStep`.
//
// Both impls bound `&'a mut F` by two `FnMut` signatures: one spelled with
// the Param types themselves and one spelled with their `Item<'a>` types.
// NOTE(review): presumably the first pins `$P` from the function's written
// signature and the second allows calling with fetched items — confirm
// against the `Param` trait docs.
macro_rules! impl_merge2_step {
    ($($P:ident),+) => {
        impl<A, B, Out, F: 'static, $($P: crate::handler::Param + 'static),+>
            MergeStepCall<(&A, &B), Out> for MergeStep<F, ($($P,)+)>
        where
            for<'a> &'a mut F:
                FnMut($($P,)+ &A, &B) -> Out +
                FnMut($($P::Item<'a>,)+ &A, &B) -> Out,
        {
            #[inline(always)]
            // `$P` idents are reused as local variable names below.
            #[allow(non_snake_case)]
            fn call(&mut self, world: &mut World, inputs: (&A, &B)) -> Out {
                // Generic shim with its own type parameters; the user
                // callable is invoked through it rather than directly.
                // NOTE(review): presumably this helps the compiler pick
                // between the two `FnMut` bounds — confirm.
                fn call_inner<$($P,)+ IA, IB, Output>(
                    mut f: impl FnMut($($P,)+ &IA, &IB) -> Output,
                    $($P: $P,)+
                    a: &IA, b: &IB,
                ) -> Output {
                    f($($P,)+ a, b)
                }
                // Debug builds only: `clear_borrows` runs before the fetch.
                // NOTE(review): presumably resets debug borrow tracking on
                // the world — confirm against `World::clear_borrows`.
                // SAFETY: state was produced by Param::init() on the same Registry
                // that built this World. Borrows are disjoint — enforced by
                // conflict detection at build time.
                #[cfg(debug_assertions)]
                world.clear_borrows();
                let ($($P,)+) = unsafe {
                    <($($P,)+) as crate::handler::Param>::fetch(world, &mut self.state)
                };
                call_inner(&mut self.f, $($P,)+ inputs.0, inputs.1)
            }
        }

        impl<A, B, Out, F: 'static, $($P: crate::handler::Param + 'static),+>
            IntoMergeStep<(&A, &B), Out, ($($P,)+)> for F
        where
            for<'a> &'a mut F:
                FnMut($($P,)+ &A, &B) -> Out +
                FnMut($($P::Item<'a>,)+ &A, &B) -> Out,
        {
            type Step = MergeStep<F, ($($P,)+)>;

            fn into_merge_step(self, registry: &Registry) -> Self::Step {
                let state = <($($P,)+) as crate::handler::Param>::init(registry);
                {
                    // Borrow each per-Param state and hand the registry one
                    // (resource id, type name) pair per parameter.
                    // NOTE(review): `check_access` is presumably the
                    // build-time conflict detection the SAFETY note in
                    // `call` relies on — confirm.
                    #[allow(non_snake_case)]
                    let ($($P,)+) = &state;
                    registry.check_access(&[
                        $((<$P as crate::handler::Param>::resource_id($P),
                           std::any::type_name::<$P>()),)+
                    ]);
                }
                MergeStep { f: self, state, name: std::any::type_name::<F>() }
            }
        }
    };
}
310
311// -- Merge arity 3 -----------------------------------------------------------
312
313impl<A, B, C, Out, F> MergeStepCall<(&A, &B, &C), Out> for MergeStep<F, ()>
314where
315    F: FnMut(&A, &B, &C) -> Out + 'static,
316{
317    #[inline(always)]
318    fn call(&mut self, _world: &mut World, inputs: (&A, &B, &C)) -> Out {
319        (self.f)(inputs.0, inputs.1, inputs.2)
320    }
321}
322
323impl<A, B, C, Out, F> IntoMergeStep<(&A, &B, &C), Out, ()> for F
324where
325    F: FnMut(&A, &B, &C) -> Out + 'static,
326{
327    type Step = MergeStep<F, ()>;
328
329    fn into_merge_step(self, registry: &Registry) -> Self::Step {
330        MergeStep {
331            f: self,
332            state: <() as crate::handler::Param>::init(registry),
333            name: std::any::type_name::<F>(),
334        }
335    }
336}
337
// Merge arity 3 with one or more Params.
//
// For a Param tuple `($P, ...)` this macro emits:
//   * `MergeStepCall<(&A, &B, &C), Out>` for `MergeStep<F, ($P, ...)>`, which
//     fetches the params from the `World` and calls `f(params..., &A, &B, &C)`.
//   * `IntoMergeStep<(&A, &B, &C), Out, ($P, ...)>` for `F`, which initialises
//     the Param state from the `Registry` and wraps `f` in a `MergeStep`.
macro_rules! impl_merge3_step {
    ($($P:ident),+) => {
        impl<A, B, C, Out, F: 'static, $($P: crate::handler::Param + 'static),+>
            MergeStepCall<(&A, &B, &C), Out> for MergeStep<F, ($($P,)+)>
        where
            for<'a> &'a mut F:
                FnMut($($P,)+ &A, &B, &C) -> Out +
                FnMut($($P::Item<'a>,)+ &A, &B, &C) -> Out,
        {
            #[inline(always)]
            // `$P` idents are reused as local variable names below.
            #[allow(non_snake_case)]
            fn call(&mut self, world: &mut World, inputs: (&A, &B, &C)) -> Out {
                // Generic shim with its own type parameters; the user
                // callable is invoked through it rather than directly.
                fn call_inner<$($P,)+ IA, IB, IC, Output>(
                    mut f: impl FnMut($($P,)+ &IA, &IB, &IC) -> Output,
                    $($P: $P,)+
                    a: &IA, b: &IB, c: &IC,
                ) -> Output {
                    f($($P,)+ a, b, c)
                }
                // Debug builds only: `clear_borrows` runs before the fetch.
                // SAFETY: state was produced by Param::init() on the same Registry
                // that built this World. Borrows are disjoint — enforced by
                // conflict detection at build time.
                #[cfg(debug_assertions)]
                world.clear_borrows();
                let ($($P,)+) = unsafe {
                    <($($P,)+) as crate::handler::Param>::fetch(world, &mut self.state)
                };
                call_inner(&mut self.f, $($P,)+ inputs.0, inputs.1, inputs.2)
            }
        }

        impl<A, B, C, Out, F: 'static, $($P: crate::handler::Param + 'static),+>
            IntoMergeStep<(&A, &B, &C), Out, ($($P,)+)> for F
        where
            for<'a> &'a mut F:
                FnMut($($P,)+ &A, &B, &C) -> Out +
                FnMut($($P::Item<'a>,)+ &A, &B, &C) -> Out,
        {
            type Step = MergeStep<F, ($($P,)+)>;

            fn into_merge_step(self, registry: &Registry) -> Self::Step {
                let state = <($($P,)+) as crate::handler::Param>::init(registry);
                {
                    // Hand the registry one (resource id, type name) pair
                    // per parameter before the step is constructed.
                    #[allow(non_snake_case)]
                    let ($($P,)+) = &state;
                    registry.check_access(&[
                        $((<$P as crate::handler::Param>::resource_id($P),
                           std::any::type_name::<$P>()),)+
                    ]);
                }
                MergeStep { f: self, state, name: std::any::type_name::<F>() }
            }
        }
    };
}
393
394// -- Merge arity 4 -----------------------------------------------------------
395
396impl<A, B, C, D, Out, F> MergeStepCall<(&A, &B, &C, &D), Out> for MergeStep<F, ()>
397where
398    F: FnMut(&A, &B, &C, &D) -> Out + 'static,
399{
400    #[inline(always)]
401    fn call(&mut self, _world: &mut World, i: (&A, &B, &C, &D)) -> Out {
402        (self.f)(i.0, i.1, i.2, i.3)
403    }
404}
405
406impl<A, B, C, D, Out, F> IntoMergeStep<(&A, &B, &C, &D), Out, ()> for F
407where
408    F: FnMut(&A, &B, &C, &D) -> Out + 'static,
409{
410    type Step = MergeStep<F, ()>;
411    fn into_merge_step(self, registry: &Registry) -> Self::Step {
412        MergeStep {
413            f: self,
414            state: <() as crate::handler::Param>::init(registry),
415            name: std::any::type_name::<F>(),
416        }
417    }
418}
419
// Merge arity 4 with one or more Params.
//
// For a Param tuple `($P, ...)` this macro emits:
//   * `MergeStepCall<(&A, &B, &C, &D), Out>` for `MergeStep<F, ($P, ...)>`,
//     which fetches the params from the `World` and calls
//     `f(params..., &A, &B, &C, &D)`.
//   * `IntoMergeStep<(&A, &B, &C, &D), Out, ($P, ...)>` for `F`, which
//     initialises the Param state from the `Registry` and wraps `f` in a
//     `MergeStep`.
macro_rules! impl_merge4_step {
    ($($P:ident),+) => {
        impl<A, B, C, D, Out, F: 'static, $($P: crate::handler::Param + 'static),+>
            MergeStepCall<(&A, &B, &C, &D), Out> for MergeStep<F, ($($P,)+)>
        where for<'a> &'a mut F:
            FnMut($($P,)+ &A, &B, &C, &D) -> Out +
            FnMut($($P::Item<'a>,)+ &A, &B, &C, &D) -> Out,
        {
            #[inline(always)]
            // `$P` idents are reused as local variable names below.
            #[allow(non_snake_case)]
            fn call(&mut self, world: &mut World, i: (&A, &B, &C, &D)) -> Out {
                // Generic shim with its own type parameters; the user
                // callable is invoked through it rather than directly.
                fn call_inner<$($P,)+ IA, IB, IC, ID, Output>(
                    mut f: impl FnMut($($P,)+ &IA, &IB, &IC, &ID) -> Output,
                    $($P: $P,)+ a: &IA, b: &IB, c: &IC, d: &ID,
                ) -> Output { f($($P,)+ a, b, c, d) }
                // Debug builds only: `clear_borrows` runs before the fetch.
                // SAFETY: state was produced by Param::init() on the same Registry
                // that built this World. Borrows are disjoint — enforced by
                // conflict detection at build time.
                #[cfg(debug_assertions)]
                world.clear_borrows();
                let ($($P,)+) = unsafe {
                    <($($P,)+) as crate::handler::Param>::fetch(world, &mut self.state)
                };
                call_inner(&mut self.f, $($P,)+ i.0, i.1, i.2, i.3)
            }
        }
        impl<A, B, C, D, Out, F: 'static, $($P: crate::handler::Param + 'static),+>
            IntoMergeStep<(&A, &B, &C, &D), Out, ($($P,)+)> for F
        where for<'a> &'a mut F:
            FnMut($($P,)+ &A, &B, &C, &D) -> Out +
            FnMut($($P::Item<'a>,)+ &A, &B, &C, &D) -> Out,
        {
            type Step = MergeStep<F, ($($P,)+)>;
            fn into_merge_step(self, registry: &Registry) -> Self::Step {
                let state = <($($P,)+) as crate::handler::Param>::init(registry);
                // Hand the registry one (resource id, type name) pair per
                // parameter before the step is constructed.
                { #[allow(non_snake_case)] let ($($P,)+) = &state;
                  registry.check_access(&[$((<$P as crate::handler::Param>::resource_id($P), std::any::type_name::<$P>()),)+]); }
                MergeStep { f: self, state, name: std::any::type_name::<F>() }
            }
        }
    };
}
462
463// -- Merge arity 5 -----------------------------------------------------------
464
465impl<A, B, C, D, E, Out, F> MergeStepCall<(&A, &B, &C, &D, &E), Out> for MergeStep<F, ()>
466where
467    F: FnMut(&A, &B, &C, &D, &E) -> Out + 'static,
468{
469    #[inline(always)]
470    fn call(&mut self, _world: &mut World, i: (&A, &B, &C, &D, &E)) -> Out {
471        (self.f)(i.0, i.1, i.2, i.3, i.4)
472    }
473}
474
475impl<A, B, C, D, E, Out, F> IntoMergeStep<(&A, &B, &C, &D, &E), Out, ()> for F
476where
477    F: FnMut(&A, &B, &C, &D, &E) -> Out + 'static,
478{
479    type Step = MergeStep<F, ()>;
480    fn into_merge_step(self, registry: &Registry) -> Self::Step {
481        MergeStep {
482            f: self,
483            state: <() as crate::handler::Param>::init(registry),
484            name: std::any::type_name::<F>(),
485        }
486    }
487}
488
// Merge arity 5 with one or more Params.
//
// For a Param tuple `($P, ...)` this macro emits:
//   * `MergeStepCall<(&A, &B, &C, &D, &E), Out>` for `MergeStep<F, ($P, ...)>`,
//     which fetches the params from the `World` and calls
//     `f(params..., &A, &B, &C, &D, &E)`.
//   * `IntoMergeStep<(&A, &B, &C, &D, &E), Out, ($P, ...)>` for `F`, which
//     initialises the Param state from the `Registry` and wraps `f` in a
//     `MergeStep`.
macro_rules! impl_merge5_step {
    ($($P:ident),+) => {
        impl<A, B, C, D, E, Out, F: 'static, $($P: crate::handler::Param + 'static),+>
            MergeStepCall<(&A, &B, &C, &D, &E), Out> for MergeStep<F, ($($P,)+)>
        where for<'a> &'a mut F:
            FnMut($($P,)+ &A, &B, &C, &D, &E) -> Out +
            FnMut($($P::Item<'a>,)+ &A, &B, &C, &D, &E) -> Out,
        {
            #[inline(always)]
            // `$P` idents are reused as local variable names below.
            #[allow(non_snake_case)]
            fn call(&mut self, world: &mut World, i: (&A, &B, &C, &D, &E)) -> Out {
                // Generic shim with its own type parameters; the user
                // callable is invoked through it rather than directly.
                fn call_inner<$($P,)+ IA, IB, IC, ID, IE, Output>(
                    mut f: impl FnMut($($P,)+ &IA, &IB, &IC, &ID, &IE) -> Output,
                    $($P: $P,)+ a: &IA, b: &IB, c: &IC, d: &ID, e: &IE,
                ) -> Output { f($($P,)+ a, b, c, d, e) }
                // Debug builds only: `clear_borrows` runs before the fetch.
                // SAFETY: state was produced by Param::init() on the same Registry
                // that built this World. Borrows are disjoint — enforced by
                // conflict detection at build time.
                #[cfg(debug_assertions)]
                world.clear_borrows();
                let ($($P,)+) = unsafe {
                    <($($P,)+) as crate::handler::Param>::fetch(world, &mut self.state)
                };
                call_inner(&mut self.f, $($P,)+ i.0, i.1, i.2, i.3, i.4)
            }
        }
        impl<A, B, C, D, E, Out, F: 'static, $($P: crate::handler::Param + 'static),+>
            IntoMergeStep<(&A, &B, &C, &D, &E), Out, ($($P,)+)> for F
        where for<'a> &'a mut F:
            FnMut($($P,)+ &A, &B, &C, &D, &E) -> Out +
            FnMut($($P::Item<'a>,)+ &A, &B, &C, &D, &E) -> Out,
        {
            type Step = MergeStep<F, ($($P,)+)>;
            fn into_merge_step(self, registry: &Registry) -> Self::Step {
                let state = <($($P,)+) as crate::handler::Param>::init(registry);
                // Hand the registry one (resource id, type name) pair per
                // parameter before the step is constructed.
                { #[allow(non_snake_case)] let ($($P,)+) = &state;
                  registry.check_access(&[$((<$P as crate::handler::Param>::resource_id($P), std::any::type_name::<$P>()),)+]); }
                MergeStep { f: self, state, name: std::any::type_name::<F>() }
            }
        }
    };
}
531
// Instantiate the Param-taking merge-step impls for merge arities 2-5.
// NOTE(review): `all_tuples!` is defined outside this section; going by the
// "Param arities 1-8" comment on `impl_merge2_step`, it presumably invokes
// each macro once per Param tuple length — confirm against its definition.
all_tuples!(impl_merge2_step);
all_tuples!(impl_merge3_step);
all_tuples!(impl_merge4_step);
all_tuples!(impl_merge5_step);
536
537// =============================================================================
538// DAG — monomorphized, zero vtable dispatch
539// =============================================================================
540//
541// Encodes DAG topology in the type system at compile time. After
542// monomorphization, the entire DAG is a single flat function with all
543// values as stack locals. No arena, no bitmap. Only unsafe is
544// in the shared Param::fetch path (resource access by pre-resolved index).
545//
546// Fan-out: multiple nodes borrow the same stack local (no Clone).
547// Merge: merge step borrows all arm outputs.
548// Panic safety: stack unwinding drops all locals automatically.
549
/// Entry point for building a DAG pipeline.
///
/// The DAG encodes topology in the type system at compile time,
/// producing a single monomorphized chain of named node types. All values live as
/// stack locals in the `run()` body — no arena, no vtable dispatch.
/// The only `unsafe` is in the shared [`Param::fetch`](crate::Param)
/// path (resource access by pre-resolved index).
///
/// The builder itself holds no data: `E` (the event type consumed by the
/// root step) is recorded only through a `PhantomData` marker.
///
/// # Examples
///
/// ```
/// use nexus_rt::{WorldBuilder, ResMut, Handler, Resource};
/// use nexus_rt::dag::DagBuilder;
///
/// #[derive(Resource)]
/// struct Accum(u64);
///
/// let mut wb = WorldBuilder::new();
/// wb.register(Accum(0));
/// let mut world = wb.build();
/// let reg = world.registry();
///
/// fn double(x: u32) -> u64 { x as u64 * 2 }
/// fn store(mut out: ResMut<Accum>, val: &u64) { out.0 = *val; }
///
/// let mut dag = DagBuilder::<u32>::new()
///     .root(double, reg)
///     .then(store, reg)
///     .build();
///
/// dag.run(&mut world, 5u32);
/// assert_eq!(world.resource::<Accum>().0, 10);
/// ```
// `PhantomData<fn(E)>` names `E` as a function argument type, so no value of
// type `E` is stored or owned by the builder.
#[must_use = "a DAG builder does nothing unless you chain steps and call .build()"]
pub struct DagBuilder<E>(PhantomData<fn(E)>);
585
586impl<E> DagBuilder<E> {
587    /// Create a new typed DAG entry point.
588    pub fn new() -> Self {
589        Self(PhantomData)
590    }
591
592    /// Set the root step. Takes the event `E` by value, produces `Out`.
593    pub fn root<Out, Params, S>(
594        self,
595        f: S,
596        registry: &Registry,
597    ) -> DagChain<E, Out, ThenNode<IdentityNode, S::Step>>
598    where
599        Out: 'static,
600        S: IntoStep<E, Out, Params>,
601    {
602        DagChain {
603            chain: ThenNode {
604                prev: IdentityNode,
605                step: f.into_step(registry),
606            },
607            _marker: PhantomData,
608        }
609    }
610}
611
612impl<E> Default for DagBuilder<E> {
613    fn default() -> Self {
614        Self::new()
615    }
616}
617
/// Main chain builder for a typed DAG.
///
/// `Chain` implements [`ChainCall<E, Out = Out>`] — a named node type
/// representing all steps composed so far. No closures, no `use<>`.
///
/// `E` is the event type fed to the root and `Out` is the output type of
/// the most recently added step.
#[must_use = "DAG chain does nothing until .build() is called"]
pub struct DagChain<E, Out, Chain> {
    /// The node chain composed so far.
    pub(crate) chain: Chain,
    /// Records `E` and `Out` at the type level; stores no values.
    pub(crate) _marker: PhantomData<fn(E) -> Out>,
}
627
628impl<E, Out: 'static, Chain> DagChain<E, Out, Chain> {
629    /// Enter fork mode. Subsequent `.arm()` calls add parallel branches.
630    pub fn fork(self) -> DagChainFork<E, Out, Chain, ()> {
631        DagChainFork {
632            chain: self.chain,
633            arms: (),
634            _marker: PhantomData,
635        }
636    }
637}
638
639impl<E, Chain> DagChain<E, (), Chain>
640where
641    Chain: ChainCall<E, Out = ()> + Send,
642{
643    /// Finalize into a [`Dag`] that implements [`Handler<E>`].
644    ///
645    /// Only available when the chain ends with `()` or `Option<()>`.
646    /// If your DAG produces a value, add a final `.then()` that consumes
647    /// the output.
648    #[must_use = "building a DAG without storing it does nothing"]
649    pub fn build(self) -> Dag<Chain> {
650        Dag { chain: self.chain }
651    }
652}
653
654impl<E, Chain> DagChain<E, Option<()>, Chain>
655where
656    Chain: ChainCall<E, Out = Option<()>> + Send,
657{
658    /// Finalize into a [`Dag`], discarding the `Option<()>`.
659    ///
660    /// DAGs ending with `Option<()>` produce the same [`Dag`] as those
661    /// ending with `()`.
662    #[must_use = "building a DAG without storing it does nothing"]
663    pub fn build(self) -> Dag<DiscardOptionNode<Chain>> {
664        Dag {
665            chain: DiscardOptionNode { prev: self.chain },
666        }
667    }
668}
669
/// Arm builder seed. Used in `.arm()` closures and to build arms for
/// [`.route()`](DagChain::route).
///
/// Call `.then()` to add the first step in this arm.
///
/// The seed stores nothing; `In` (the type the arm borrows as `&In`) is
/// tracked only through the `PhantomData` marker.
// NOTE(review): the marker uses `*const In` rather than `&In`, presumably to
// avoid naming a reference lifetime in the seed's type — confirm intent.
pub struct DagArmSeed<In>(PhantomData<fn(*const In)>);
675
676impl<In: 'static> DagArmSeed<In> {
677    /// Create a new arm builder seed.
678    ///
679    /// Used to build arms passed to [`DagChain::route`] or
680    /// [`DagArm::route`]:
681    ///
682    /// ```ignore
683    /// let fast = DagArmSeed::new().then(fast_path, &reg);
684    /// let slow = DagArmSeed::new().then(slow_path, &reg);
685    /// dag.route(predicate, &reg, fast, slow)
686    /// ```
687    pub fn new() -> Self {
688        Self(PhantomData)
689    }
690}
691
692impl<In: 'static> Default for DagArmSeed<In> {
693    fn default() -> Self {
694        Self::new()
695    }
696}
697
698impl<In: 'static> DagArmSeed<In> {
699    /// Add the first step in this arm. Takes `&In` by reference.
700    pub fn then<Out, Params, S>(
701        self,
702        f: S,
703        registry: &Registry,
704    ) -> DagArm<In, Out, ThenNode<IdentityNode, S::Step>>
705    where
706        Out: 'static,
707        S: IntoStep<&'static In, Out, Params>,
708        S::Step: for<'a> StepCall<&'a In, Out = Out>,
709    {
710        DagArm {
711            chain: ThenNode {
712                prev: IdentityNode,
713                step: f.into_step(registry),
714            },
715            _marker: PhantomData,
716        }
717    }
718}
719
/// Built arm in a typed DAG fork.
///
/// `Chain` implements [`ChainCall<&In, Out = Out>`] — a named node type
/// for this arm's steps.
///
/// `In` is the type the arm borrows as input and `Out` is the output type
/// of the arm's most recently added step.
pub struct DagArm<In, Out, Chain> {
    /// The node chain composed so far for this arm.
    pub(crate) chain: Chain,
    /// Records `In` and `Out` at the type level; stores no values.
    pub(crate) _marker: PhantomData<fn(*const In) -> Out>,
}
728
729impl<In: 'static, Out: 'static, Chain> DagArm<In, Out, Chain> {
730    /// Enter fork mode within this arm.
731    pub fn fork(self) -> DagArmFork<In, Out, Chain, ()> {
732        DagArmFork {
733            chain: self.chain,
734            arms: (),
735            _marker: PhantomData,
736        }
737    }
738}
739
/// Fork builder on the main chain. Accumulates arms as a tuple.
///
/// Created by [`DagChain::fork`] with `Arms = ()`. `ForkOut` is the output
/// type of the chain at the point where the fork was opened.
pub struct DagChainFork<E, ForkOut, Chain, Arms> {
    // The main-chain nodes built before the fork.
    chain: Chain,
    // Arms collected so far; starts as `()`.
    arms: Arms,
    // Records `E` and `ForkOut` at the type level; stores no values.
    _marker: PhantomData<fn(E) -> ForkOut>,
}
746
/// Fork builder within an arm. Accumulates sub-arms as a tuple.
///
/// Created by [`DagArm::fork`] with `Arms = ()`. `ForkOut` is the output
/// type of the arm at the point where the nested fork was opened.
pub struct DagArmFork<In, ForkOut, Chain, Arms> {
    // The arm's nodes built before the nested fork.
    chain: Chain,
    // Sub-arms collected so far; starts as `()`.
    arms: Arms,
    // Records `In` and `ForkOut` at the type level; stores no values.
    _marker: PhantomData<fn(*const In) -> ForkOut>,
}
753
/// Final built DAG. Implements [`Handler<E>`].
///
/// Created by [`DagChain::build`]. The entire DAG is monomorphized
/// at compile time — no boxing, no virtual dispatch, no arena.
/// Supports `for<'a> Handler<&'a T>` for zero-copy event dispatch.
/// For batch processing, see [`BatchDag`].
///
/// The `Handler<E>` impl is available for any `E` for which `Chain`
/// implements `ChainCall<E, Out = ()>` and is `Send`.
pub struct Dag<Chain> {
    // The fully composed node chain; `run` forwards each event to it.
    chain: Chain,
}
763
764impl<E, Chain> Handler<E> for Dag<Chain>
765where
766    Chain: ChainCall<E, Out = ()> + Send,
767{
768    fn run(&mut self, world: &mut World, event: E) {
769        self.chain.call(world, event);
770    }
771
772    fn name(&self) -> &'static str {
773        "dag::Dag"
774    }
775}
776
777// =============================================================================
778// Fork arity macro — arm accumulation, merge, join
779// =============================================================================
780
781// =============================================================================
782// Combinator macro — shared between DagChain and DagArm
783// =============================================================================
784
785/// Generates step combinators, Option/Result helpers, and clone helpers.
786///
787/// DagChain and DagArm use the same named node types — `In` appears only
788/// on the `ChainCall<In>` trait impl, not on the struct. No closures, no
789/// `use<>` captures.
790macro_rules! impl_dag_combinators {
791    (builder: $Builder:ident, upstream: $U:ident) => {
792        // =============================================================
793        // Core — any Out
794        // =============================================================
795
796        impl<$U, Out: 'static, Chain> $Builder<$U, Out, Chain> {
797            /// Append a step. The step receives `&Out` by reference.
798            pub fn then<NewOut, Params, S>(
799                self,
800                f: S,
801                registry: &Registry,
802            ) -> $Builder<$U, NewOut, DagThenNode<Chain, S::Step, NewOut>>
803            where
804                NewOut: 'static,
805                S: IntoStep<&'static Out, NewOut, Params>,
806                S::Step: for<'a> StepCall<&'a Out, Out = NewOut>,
807            {
808                $Builder {
809                    chain: DagThenNode {
810                        prev: self.chain,
811                        step: f.into_step(registry),
812                        _out: PhantomData,
813                    },
814                    _marker: PhantomData,
815                }
816            }
817
818            /// Dispatch output to a [`Handler<Out>`].
819            pub fn dispatch<H: Handler<Out>>(
820                self,
821                handler: H,
822            ) -> $Builder<$U, (), DispatchNode<Chain, H>> {
823                $Builder {
824                    chain: DispatchNode {
825                        prev: self.chain,
826                        handler,
827                    },
828                    _marker: PhantomData,
829                }
830            }
831
832            /// Conditionally wrap the output in `Option`.
833            pub fn guard<Params, S: IntoRefStep<Out, bool, Params>>(
834                self,
835                f: S,
836                registry: &Registry,
837            ) -> $Builder<$U, Option<Out>, GuardNode<Chain, S::Step>> {
838                $Builder {
839                    chain: GuardNode {
840                        prev: self.chain,
841                        step: f.into_ref_step(registry),
842                    },
843                    _marker: PhantomData,
844                }
845            }
846
847            /// Open a view scope. Steps inside operate on a read-only
848            /// view constructed from the event. Close with `.end_view()`.
849            pub fn view<V: crate::view::View<Out>>(
850                self,
851            ) -> crate::view::ViewScope<$U, Out, V, Chain, ()> {
852                crate::view::ViewScope::new(self.chain)
853            }
854
855            /// Observe the current value without consuming or changing it.
856            pub fn tap<Params, S: IntoRefStep<Out, (), Params>>(
857                self,
858                f: S,
859                registry: &Registry,
860            ) -> $Builder<$U, Out, TapNode<Chain, S::Step>> {
861                $Builder {
862                    chain: TapNode {
863                        prev: self.chain,
864                        step: f.into_ref_step(registry),
865                    },
866                    _marker: PhantomData,
867                }
868            }
869
870            /// Binary conditional routing. Both arms borrow `&Out`.
871            pub fn route<NewOut, C0, C1, Params, Pred: IntoRefStep<Out, bool, Params>>(
872                self,
873                pred: Pred,
874                registry: &Registry,
875                on_true: DagArm<Out, NewOut, C0>,
876                on_false: DagArm<Out, NewOut, C1>,
877            ) -> $Builder<$U, NewOut, DagRouteNode<Chain, Pred::Step, C0, C1, NewOut>>
878            where
879                C0: for<'a> ChainCall<&'a Out, Out = NewOut>,
880                C1: for<'a> ChainCall<&'a Out, Out = NewOut>,
881            {
882                $Builder {
883                    chain: DagRouteNode {
884                        prev: self.chain,
885                        pred: pred.into_ref_step(registry),
886                        on_true: on_true.chain,
887                        on_false: on_false.chain,
888                        _out: PhantomData,
889                    },
890                    _marker: PhantomData,
891                }
892            }
893
894            /// Fork off a multi-step side-effect chain.
895            pub fn tee<C>(self, side: DagArm<Out, (), C>) -> $Builder<$U, Out, TeeNode<Chain, C>>
896            where
897                C: for<'a> ChainCall<&'a Out, Out = ()>,
898            {
899                $Builder {
900                    chain: TeeNode {
901                        prev: self.chain,
902                        side: side.chain,
903                    },
904                    _marker: PhantomData,
905                }
906            }
907
908            /// Scan with persistent accumulator. The step receives
909            /// `&mut Acc` and `&Out` by reference.
910            pub fn scan<Acc, NewOut, Params, S>(
911                self,
912                initial: Acc,
913                f: S,
914                registry: &Registry,
915            ) -> $Builder<$U, NewOut, RefScanNode<Chain, S::Step, Acc>>
916            where
917                Acc: 'static,
918                NewOut: 'static,
919                S: IntoRefScanStep<Acc, Out, NewOut, Params>,
920            {
921                $Builder {
922                    chain: RefScanNode {
923                        prev: self.chain,
924                        step: f.into_ref_scan_step(registry),
925                        acc: initial,
926                    },
927                    _marker: PhantomData,
928                }
929            }
930        }
931
932        // =============================================================
933        // Dedup — suppress unchanged values
934        // =============================================================
935
936        impl<$U, Out: PartialEq + Clone + 'static, Chain> $Builder<$U, Out, Chain> {
937            /// Suppress consecutive unchanged values.
938            pub fn dedup(self) -> $Builder<$U, Option<Out>, DedupNode<Chain, Out>> {
939                $Builder {
940                    chain: DedupNode {
941                        prev: self.chain,
942                        last: None,
943                    },
944                    _marker: PhantomData,
945                }
946            }
947        }
948
949        // =============================================================
950        // Bool combinators
951        // =============================================================
952
953        impl<$U, Chain> $Builder<$U, bool, Chain> {
954            /// Invert a boolean value.
955            #[allow(clippy::should_implement_trait)]
956            pub fn not(self) -> $Builder<$U, bool, NotNode<Chain>> {
957                $Builder {
958                    chain: NotNode { prev: self.chain },
959                    _marker: PhantomData,
960                }
961            }
962
963            /// Short-circuit AND with a second boolean.
964            pub fn and<Params, S: IntoProducer<bool, Params>>(
965                self,
966                f: S,
967                registry: &Registry,
968            ) -> $Builder<$U, bool, AndBoolNode<Chain, S::Step>> {
969                $Builder {
970                    chain: AndBoolNode {
971                        prev: self.chain,
972                        producer: f.into_producer(registry),
973                    },
974                    _marker: PhantomData,
975                }
976            }
977
978            /// Short-circuit OR with a second boolean.
979            pub fn or<Params, S: IntoProducer<bool, Params>>(
980                self,
981                f: S,
982                registry: &Registry,
983            ) -> $Builder<$U, bool, OrBoolNode<Chain, S::Step>> {
984                $Builder {
985                    chain: OrBoolNode {
986                        prev: self.chain,
987                        producer: f.into_producer(registry),
988                    },
989                    _marker: PhantomData,
990                }
991            }
992
993            /// XOR with a second boolean.
994            pub fn xor<Params, S: IntoProducer<bool, Params>>(
995                self,
996                f: S,
997                registry: &Registry,
998            ) -> $Builder<$U, bool, XorBoolNode<Chain, S::Step>> {
999                $Builder {
1000                    chain: XorBoolNode {
1001                        prev: self.chain,
1002                        producer: f.into_producer(registry),
1003                    },
1004                    _marker: PhantomData,
1005                }
1006            }
1007        }
1008
1009        // =============================================================
1010        // Clone helpers — &T → T transitions
1011        // =============================================================
1012
1013        impl<'a, $U, T: Clone, Chain> $Builder<$U, &'a T, Chain> {
1014            /// Clone a borrowed output to produce an owned value.
1015            pub fn cloned(self) -> $Builder<$U, T, ClonedNode<Chain>> {
1016                $Builder {
1017                    chain: ClonedNode { prev: self.chain },
1018                    _marker: PhantomData,
1019                }
1020            }
1021        }
1022
1023        impl<'a, $U, T: Clone, Chain> $Builder<$U, Option<&'a T>, Chain> {
1024            /// Clone inner borrowed value. `Option<&T>` → `Option<T>`.
1025            pub fn cloned(self) -> $Builder<$U, Option<T>, ClonedOptionNode<Chain>> {
1026                $Builder {
1027                    chain: ClonedOptionNode { prev: self.chain },
1028                    _marker: PhantomData,
1029                }
1030            }
1031        }
1032
1033        impl<'a, $U, T: Clone, Err, Chain> $Builder<$U, Result<&'a T, Err>, Chain> {
1034            /// Clone inner borrowed Ok value.
1035            pub fn cloned(self) -> $Builder<$U, Result<T, Err>, ClonedResultNode<Chain>> {
1036                $Builder {
1037                    chain: ClonedResultNode { prev: self.chain },
1038                    _marker: PhantomData,
1039                }
1040            }
1041        }
1042
1043        // =============================================================
1044        // Option helpers
1045        // =============================================================
1046
1047        impl<$U, T: 'static, Chain> $Builder<$U, Option<T>, Chain> {
1048            /// Transform the inner value. Step not called on None.
1049            pub fn map<U, Params, S: IntoStep<&'static T, U, Params>>(
1050                self,
1051                f: S,
1052                registry: &Registry,
1053            ) -> $Builder<$U, Option<U>, DagMapOptionNode<Chain, S::Step, U>>
1054            where
1055                U: 'static,
1056                S::Step: for<'x> StepCall<&'x T, Out = U>,
1057            {
1058                $Builder {
1059                    chain: DagMapOptionNode {
1060                        prev: self.chain,
1061                        step: f.into_step(registry),
1062                        _out: PhantomData,
1063                    },
1064                    _marker: PhantomData,
1065                }
1066            }
1067
1068            /// Short-circuits on None. std: `Option::and_then`
1069            pub fn and_then<U, Params, S: IntoStep<&'static T, Option<U>, Params>>(
1070                self,
1071                f: S,
1072                registry: &Registry,
1073            ) -> $Builder<$U, Option<U>, DagAndThenOptionNode<Chain, S::Step, U>>
1074            where
1075                U: 'static,
1076                S::Step: for<'x> StepCall<&'x T, Out = Option<U>>,
1077            {
1078                $Builder {
1079                    chain: DagAndThenOptionNode {
1080                        prev: self.chain,
1081                        step: f.into_step(registry),
1082                        _out: PhantomData,
1083                    },
1084                    _marker: PhantomData,
1085                }
1086            }
1087
1088            /// Side effect on None.
1089            pub fn on_none<Params, S: IntoProducer<(), Params>>(
1090                self,
1091                f: S,
1092                registry: &Registry,
1093            ) -> $Builder<$U, Option<T>, OnNoneNode<Chain, S::Step>> {
1094                $Builder {
1095                    chain: OnNoneNode {
1096                        prev: self.chain,
1097                        producer: f.into_producer(registry),
1098                    },
1099                    _marker: PhantomData,
1100                }
1101            }
1102
1103            /// Keep value if predicate holds. std: `Option::filter`
1104            pub fn filter<Params, S: IntoRefStep<T, bool, Params>>(
1105                self,
1106                f: S,
1107                registry: &Registry,
1108            ) -> $Builder<$U, Option<T>, FilterNode<Chain, S::Step>> {
1109                $Builder {
1110                    chain: FilterNode {
1111                        prev: self.chain,
1112                        step: f.into_ref_step(registry),
1113                    },
1114                    _marker: PhantomData,
1115                }
1116            }
1117
1118            /// Side effect on Some value. std: `Option::inspect`
1119            pub fn inspect<Params, S: IntoRefStep<T, (), Params>>(
1120                self,
1121                f: S,
1122                registry: &Registry,
1123            ) -> $Builder<$U, Option<T>, InspectOptionNode<Chain, S::Step>> {
1124                $Builder {
1125                    chain: InspectOptionNode {
1126                        prev: self.chain,
1127                        step: f.into_ref_step(registry),
1128                    },
1129                    _marker: PhantomData,
1130                }
1131            }
1132
1133            /// None becomes Err(err). std: `Option::ok_or`
1134            pub fn ok_or<Err: Clone>(
1135                self,
1136                err: Err,
1137            ) -> $Builder<$U, Result<T, Err>, OkOrNode<Chain, Err>> {
1138                $Builder {
1139                    chain: OkOrNode {
1140                        prev: self.chain,
1141                        err,
1142                    },
1143                    _marker: PhantomData,
1144                }
1145            }
1146
1147            /// None becomes Err(f()). std: `Option::ok_or_else`
1148            pub fn ok_or_else<Err, Params, S: IntoProducer<Err, Params>>(
1149                self,
1150                f: S,
1151                registry: &Registry,
1152            ) -> $Builder<$U, Result<T, Err>, OkOrElseNode<Chain, S::Step>> {
1153                $Builder {
1154                    chain: OkOrElseNode {
1155                        prev: self.chain,
1156                        producer: f.into_producer(registry),
1157                    },
1158                    _marker: PhantomData,
1159                }
1160            }
1161
1162            /// Exit Option — None becomes the default value.
1163            pub fn unwrap_or(self, default: T) -> $Builder<$U, T, UnwrapOrOptionNode<Chain, T>>
1164            where
1165                T: Clone,
1166            {
1167                $Builder {
1168                    chain: UnwrapOrOptionNode {
1169                        prev: self.chain,
1170                        default,
1171                    },
1172                    _marker: PhantomData,
1173                }
1174            }
1175
1176            /// Exit Option — None becomes `f()`.
1177            pub fn unwrap_or_else<Params, S: IntoProducer<T, Params>>(
1178                self,
1179                f: S,
1180                registry: &Registry,
1181            ) -> $Builder<$U, T, UnwrapOrElseOptionNode<Chain, S::Step>> {
1182                $Builder {
1183                    chain: UnwrapOrElseOptionNode {
1184                        prev: self.chain,
1185                        producer: f.into_producer(registry),
1186                    },
1187                    _marker: PhantomData,
1188                }
1189            }
1190        }
1191
1192        // =============================================================
1193        // Result helpers
1194        // =============================================================
1195
1196        impl<$U, T: 'static, Err: 'static, Chain> $Builder<$U, Result<T, Err>, Chain> {
1197            /// Transform the Ok value. Step not called on Err.
1198            pub fn map<U, Params, S: IntoStep<&'static T, U, Params>>(
1199                self,
1200                f: S,
1201                registry: &Registry,
1202            ) -> $Builder<$U, Result<U, Err>, DagMapResultNode<Chain, S::Step, U>>
1203            where
1204                U: 'static,
1205                S::Step: for<'x> StepCall<&'x T, Out = U>,
1206            {
1207                $Builder {
1208                    chain: DagMapResultNode {
1209                        prev: self.chain,
1210                        step: f.into_step(registry),
1211                        _out: PhantomData,
1212                    },
1213                    _marker: PhantomData,
1214                }
1215            }
1216
1217            /// Short-circuits on Err. std: `Result::and_then`
1218            pub fn and_then<U, Params, S: IntoStep<&'static T, Result<U, Err>, Params>>(
1219                self,
1220                f: S,
1221                registry: &Registry,
1222            ) -> $Builder<$U, Result<U, Err>, DagAndThenResultNode<Chain, S::Step, U>>
1223            where
1224                U: 'static,
1225                S::Step: for<'x> StepCall<&'x T, Out = Result<U, Err>>,
1226            {
1227                $Builder {
1228                    chain: DagAndThenResultNode {
1229                        prev: self.chain,
1230                        step: f.into_step(registry),
1231                        _out: PhantomData,
1232                    },
1233                    _marker: PhantomData,
1234                }
1235            }
1236
1237            /// Handle error and transition to Option.
1238            pub fn catch<Params, S: IntoStep<&'static Err, (), Params>>(
1239                self,
1240                f: S,
1241                registry: &Registry,
1242            ) -> $Builder<$U, Option<T>, DagCatchNode<Chain, S::Step>>
1243            where
1244                S::Step: for<'x> StepCall<&'x Err, Out = ()>,
1245            {
1246                $Builder {
1247                    chain: DagCatchNode {
1248                        prev: self.chain,
1249                        step: f.into_step(registry),
1250                    },
1251                    _marker: PhantomData,
1252                }
1253            }
1254
1255            /// Transform the error. std: `Result::map_err`
1256            pub fn map_err<Err2, Params, S: IntoStep<Err, Err2, Params>>(
1257                self,
1258                f: S,
1259                registry: &Registry,
1260            ) -> $Builder<$U, Result<T, Err2>, MapErrNode<Chain, S::Step>> {
1261                $Builder {
1262                    chain: MapErrNode {
1263                        prev: self.chain,
1264                        step: f.into_step(registry),
1265                    },
1266                    _marker: PhantomData,
1267                }
1268            }
1269
1270            /// Recover from Err. std: `Result::or_else`
1271            pub fn or_else<Err2, Params, S: IntoStep<Err, Result<T, Err2>, Params>>(
1272                self,
1273                f: S,
1274                registry: &Registry,
1275            ) -> $Builder<$U, Result<T, Err2>, OrElseNode<Chain, S::Step>> {
1276                $Builder {
1277                    chain: OrElseNode {
1278                        prev: self.chain,
1279                        step: f.into_step(registry),
1280                    },
1281                    _marker: PhantomData,
1282                }
1283            }
1284
1285            /// Side effect on Ok. std: `Result::inspect`
1286            pub fn inspect<Params, S: IntoRefStep<T, (), Params>>(
1287                self,
1288                f: S,
1289                registry: &Registry,
1290            ) -> $Builder<$U, Result<T, Err>, InspectResultNode<Chain, S::Step>> {
1291                $Builder {
1292                    chain: InspectResultNode {
1293                        prev: self.chain,
1294                        step: f.into_ref_step(registry),
1295                    },
1296                    _marker: PhantomData,
1297                }
1298            }
1299
1300            /// Side effect on Err. std: `Result::inspect_err`
1301            pub fn inspect_err<Params, S: IntoRefStep<Err, (), Params>>(
1302                self,
1303                f: S,
1304                registry: &Registry,
1305            ) -> $Builder<$U, Result<T, Err>, InspectErrNode<Chain, S::Step>> {
1306                $Builder {
1307                    chain: InspectErrNode {
1308                        prev: self.chain,
1309                        step: f.into_ref_step(registry),
1310                    },
1311                    _marker: PhantomData,
1312                }
1313            }
1314
1315            /// Discard error, enter Option land. std: `Result::ok`
1316            pub fn ok(self) -> $Builder<$U, Option<T>, OkResultNode<Chain>> {
1317                $Builder {
1318                    chain: OkResultNode { prev: self.chain },
1319                    _marker: PhantomData,
1320                }
1321            }
1322
1323            /// Exit Result — Err becomes the default value.
1324            pub fn unwrap_or(self, default: T) -> $Builder<$U, T, UnwrapOrResultNode<Chain, T>>
1325            where
1326                T: Clone,
1327            {
1328                $Builder {
1329                    chain: UnwrapOrResultNode {
1330                        prev: self.chain,
1331                        default,
1332                    },
1333                    _marker: PhantomData,
1334                }
1335            }
1336
1337            /// Exit Result — Err becomes `f(err)`.
1338            pub fn unwrap_or_else<Params, S: IntoStep<Err, T, Params>>(
1339                self,
1340                f: S,
1341                registry: &Registry,
1342            ) -> $Builder<$U, T, UnwrapOrElseResultNode<Chain, S::Step>> {
1343                $Builder {
1344                    chain: UnwrapOrElseResultNode {
1345                        prev: self.chain,
1346                        step: f.into_step(registry),
1347                    },
1348                    _marker: PhantomData,
1349                }
1350            }
1351        }
1352    };
1353}
1354
// Instantiate the shared combinators for the main-chain builder; its
// upstream type parameter is named `E` in the generated impls.
impl_dag_combinators!(builder: DagChain, upstream: E);
// Instantiate the same combinators for the arm builder; its upstream type
// parameter is named `In` in the generated impls.
impl_dag_combinators!(builder: DagArm, upstream: In);
1357
1358// =============================================================================
1359// Merge / Join named nodes — fork terminal nodes
1360// =============================================================================
1361
1362/// Merge two fork arms into a single output via [`MergeStepCall`].
1363#[doc(hidden)]
1364pub struct MergeNode2<Chain, C0, C1, MS, ForkOut, A0, A1, MOut> {
1365    pub(crate) chain: Chain,
1366    pub(crate) arm0: C0,
1367    pub(crate) arm1: C1,
1368    pub(crate) merge: MS,
1369    pub(crate) _marker: PhantomData<fn(ForkOut) -> (A0, A1, MOut)>,
1370}
1371
1372impl<In, Chain, C0, C1, MS, ForkOut, A0, A1, MOut> ChainCall<In>
1373    for MergeNode2<Chain, C0, C1, MS, ForkOut, A0, A1, MOut>
1374where
1375    ForkOut: 'static,
1376    A0: 'static,
1377    A1: 'static,
1378    Chain: ChainCall<In, Out = ForkOut>,
1379    C0: for<'a> ChainCall<&'a ForkOut, Out = A0>,
1380    C1: for<'a> ChainCall<&'a ForkOut, Out = A1>,
1381    MS: for<'x> MergeStepCall<(&'x A0, &'x A1), MOut>,
1382{
1383    type Out = MOut;
1384
1385    #[inline(always)]
1386    fn call(&mut self, world: &mut World, input: In) -> MOut {
1387        let fork_out = self.chain.call(world, input);
1388        let o0 = self.arm0.call(world, &fork_out);
1389        let o1 = self.arm1.call(world, &fork_out);
1390        self.merge.call(world, (&o0, &o1))
1391    }
1392}
1393
1394/// Merge three fork arms into a single output via [`MergeStepCall`].
1395#[doc(hidden)]
1396pub struct MergeNode3<Chain, C0, C1, C2, MS, ForkOut, A0, A1, A2, MOut> {
1397    pub(crate) chain: Chain,
1398    pub(crate) arm0: C0,
1399    pub(crate) arm1: C1,
1400    pub(crate) arm2: C2,
1401    pub(crate) merge: MS,
1402    pub(crate) _marker: PhantomData<fn(ForkOut) -> (A0, A1, A2, MOut)>,
1403}
1404
1405impl<In, Chain, C0, C1, C2, MS, ForkOut, A0, A1, A2, MOut> ChainCall<In>
1406    for MergeNode3<Chain, C0, C1, C2, MS, ForkOut, A0, A1, A2, MOut>
1407where
1408    ForkOut: 'static,
1409    A0: 'static,
1410    A1: 'static,
1411    A2: 'static,
1412    Chain: ChainCall<In, Out = ForkOut>,
1413    C0: for<'a> ChainCall<&'a ForkOut, Out = A0>,
1414    C1: for<'a> ChainCall<&'a ForkOut, Out = A1>,
1415    C2: for<'a> ChainCall<&'a ForkOut, Out = A2>,
1416    MS: for<'x> MergeStepCall<(&'x A0, &'x A1, &'x A2), MOut>,
1417{
1418    type Out = MOut;
1419
1420    #[inline(always)]
1421    fn call(&mut self, world: &mut World, input: In) -> MOut {
1422        let fork_out = self.chain.call(world, input);
1423        let o0 = self.arm0.call(world, &fork_out);
1424        let o1 = self.arm1.call(world, &fork_out);
1425        let o2 = self.arm2.call(world, &fork_out);
1426        self.merge.call(world, (&o0, &o1, &o2))
1427    }
1428}
1429
1430/// Merge four fork arms into a single output via [`MergeStepCall`].
1431#[doc(hidden)]
1432pub struct MergeNode4<Chain, C0, C1, C2, C3, MS, ForkOut, A0, A1, A2, A3, MOut> {
1433    pub(crate) chain: Chain,
1434    pub(crate) arm0: C0,
1435    pub(crate) arm1: C1,
1436    pub(crate) arm2: C2,
1437    pub(crate) arm3: C3,
1438    pub(crate) merge: MS,
1439    pub(crate) _marker: PhantomData<fn(ForkOut) -> (A0, A1, A2, A3, MOut)>,
1440}
1441
1442#[allow(clippy::many_single_char_names)]
1443impl<In, Chain, C0, C1, C2, C3, MS, ForkOut, A0, A1, A2, A3, MOut> ChainCall<In>
1444    for MergeNode4<Chain, C0, C1, C2, C3, MS, ForkOut, A0, A1, A2, A3, MOut>
1445where
1446    ForkOut: 'static,
1447    A0: 'static,
1448    A1: 'static,
1449    A2: 'static,
1450    A3: 'static,
1451    Chain: ChainCall<In, Out = ForkOut>,
1452    C0: for<'a> ChainCall<&'a ForkOut, Out = A0>,
1453    C1: for<'a> ChainCall<&'a ForkOut, Out = A1>,
1454    C2: for<'a> ChainCall<&'a ForkOut, Out = A2>,
1455    C3: for<'a> ChainCall<&'a ForkOut, Out = A3>,
1456    MS: for<'x> MergeStepCall<(&'x A0, &'x A1, &'x A2, &'x A3), MOut>,
1457{
1458    type Out = MOut;
1459
1460    #[inline(always)]
1461    fn call(&mut self, world: &mut World, input: In) -> MOut {
1462        let fork_out = self.chain.call(world, input);
1463        let o0 = self.arm0.call(world, &fork_out);
1464        let o1 = self.arm1.call(world, &fork_out);
1465        let o2 = self.arm2.call(world, &fork_out);
1466        let o3 = self.arm3.call(world, &fork_out);
1467        self.merge.call(world, (&o0, &o1, &o2, &o3))
1468    }
1469}
1470
1471/// Join two fork arms (all producing `()`).
1472#[doc(hidden)]
1473pub struct JoinNode2<Chain, C0, C1, ForkOut> {
1474    pub(crate) chain: Chain,
1475    pub(crate) arm0: C0,
1476    pub(crate) arm1: C1,
1477    pub(crate) _marker: PhantomData<fn() -> ForkOut>,
1478}
1479
1480impl<In, Chain, C0, C1, ForkOut> ChainCall<In> for JoinNode2<Chain, C0, C1, ForkOut>
1481where
1482    ForkOut: 'static,
1483    Chain: ChainCall<In, Out = ForkOut>,
1484    C0: for<'a> ChainCall<&'a ForkOut, Out = ()>,
1485    C1: for<'a> ChainCall<&'a ForkOut, Out = ()>,
1486{
1487    type Out = ();
1488
1489    #[inline(always)]
1490    fn call(&mut self, world: &mut World, input: In) {
1491        let fork_out = self.chain.call(world, input);
1492        self.arm0.call(world, &fork_out);
1493        self.arm1.call(world, &fork_out);
1494    }
1495}
1496
1497/// Join three fork arms (all producing `()`).
1498#[doc(hidden)]
1499pub struct JoinNode3<Chain, C0, C1, C2, ForkOut> {
1500    pub(crate) chain: Chain,
1501    pub(crate) arm0: C0,
1502    pub(crate) arm1: C1,
1503    pub(crate) arm2: C2,
1504    pub(crate) _marker: PhantomData<fn() -> ForkOut>,
1505}
1506
1507impl<In, Chain, C0, C1, C2, ForkOut> ChainCall<In> for JoinNode3<Chain, C0, C1, C2, ForkOut>
1508where
1509    ForkOut: 'static,
1510    Chain: ChainCall<In, Out = ForkOut>,
1511    C0: for<'a> ChainCall<&'a ForkOut, Out = ()>,
1512    C1: for<'a> ChainCall<&'a ForkOut, Out = ()>,
1513    C2: for<'a> ChainCall<&'a ForkOut, Out = ()>,
1514{
1515    type Out = ();
1516
1517    #[inline(always)]
1518    fn call(&mut self, world: &mut World, input: In) {
1519        let fork_out = self.chain.call(world, input);
1520        self.arm0.call(world, &fork_out);
1521        self.arm1.call(world, &fork_out);
1522        self.arm2.call(world, &fork_out);
1523    }
1524}
1525
1526/// Join four fork arms (all producing `()`).
1527#[doc(hidden)]
1528pub struct JoinNode4<Chain, C0, C1, C2, C3, ForkOut> {
1529    pub(crate) chain: Chain,
1530    pub(crate) arm0: C0,
1531    pub(crate) arm1: C1,
1532    pub(crate) arm2: C2,
1533    pub(crate) arm3: C3,
1534    pub(crate) _marker: PhantomData<fn() -> ForkOut>,
1535}
1536
1537#[allow(clippy::many_single_char_names)]
1538impl<In, Chain, C0, C1, C2, C3, ForkOut> ChainCall<In> for JoinNode4<Chain, C0, C1, C2, C3, ForkOut>
1539where
1540    ForkOut: 'static,
1541    Chain: ChainCall<In, Out = ForkOut>,
1542    C0: for<'a> ChainCall<&'a ForkOut, Out = ()>,
1543    C1: for<'a> ChainCall<&'a ForkOut, Out = ()>,
1544    C2: for<'a> ChainCall<&'a ForkOut, Out = ()>,
1545    C3: for<'a> ChainCall<&'a ForkOut, Out = ()>,
1546{
1547    type Out = ();
1548
1549    #[inline(always)]
1550    fn call(&mut self, world: &mut World, input: In) {
1551        let fork_out = self.chain.call(world, input);
1552        self.arm0.call(world, &fork_out);
1553        self.arm1.call(world, &fork_out);
1554        self.arm2.call(world, &fork_out);
1555        self.arm3.call(world, &fork_out);
1556    }
1557}
1558
1559// =============================================================================
1560// Splat — tuple destructuring into individual reference arguments (DAG)
1561// =============================================================================
1562//
1563// DAG splat reuses IntoMergeStep/MergeStepCall since DAG steps take inputs
1564// by reference — the function signature is the same as a merge step:
1565// `fn(Params..., &A, &B) -> Out`.
1566//
1567// Builder types are `#[doc(hidden)]` — users only see `.splat().then()`.
1568
1569macro_rules! define_dag_splat_builders {
1570    (
1571        $N:literal,
1572        chain: $SplatChain:ident,
1573        arm: $SplatArm:ident,
1574        arm_start: $SplatArmStart:ident,
1575        splat_then: $SplatThenNode:ident,
1576        splat_arm_start: $SplatArmStartNode:ident,
1577        ($($T:ident),+),
1578        ($($idx:tt),+)
1579    ) => {
1580        // -- Named node: splat + step on upstream chain --
1581
1582        #[doc(hidden)]
1583        pub struct $SplatThenNode<Chain, MS, $($T,)+ NewOut> {
1584            pub(crate) chain: Chain,
1585            pub(crate) merge: MS,
1586            pub(crate) _marker: PhantomData<fn() -> ($($T,)+ NewOut)>,
1587        }
1588
1589        impl<In, Chain, MS, $($T: 'static,)+ NewOut> ChainCall<In>
1590            for $SplatThenNode<Chain, MS, $($T,)+ NewOut>
1591        where
1592            Chain: ChainCall<In, Out = ($($T,)+)>,
1593            MS: for<'x> MergeStepCall<($(&'x $T,)+), NewOut>,
1594        {
1595            type Out = NewOut;
1596
1597            #[inline(always)]
1598            fn call(&mut self, world: &mut World, input: In) -> NewOut {
1599                let tuple = self.chain.call(world, input);
1600                self.merge.call(world, ($(&tuple.$idx,)+))
1601            }
1602        }
1603
1604        // -- Named node: splat at arm start (no upstream chain) --
1605
1606        #[doc(hidden)]
1607        pub struct $SplatArmStartNode<MS, $($T,)+ Out> {
1608            pub(crate) merge: MS,
1609            pub(crate) _marker: PhantomData<fn() -> ($($T,)+ Out)>,
1610        }
1611
1612        impl<'inp, $($T: 'static,)+ MS, Out> ChainCall<&'inp ($($T,)+)>
1613            for $SplatArmStartNode<MS, $($T,)+ Out>
1614        where
1615            MS: for<'x> MergeStepCall<($(&'x $T,)+), Out>,
1616        {
1617            type Out = Out;
1618
1619            #[inline(always)]
1620            fn call(&mut self, world: &mut World, input: &($($T,)+)) -> Out {
1621                self.merge.call(world, ($(&input.$idx,)+))
1622            }
1623        }
1624
1625        // -- Splat builder on main chain --
1626
1627        /// DAG splat builder on the main chain.
1628        #[doc(hidden)]
1629        pub struct $SplatChain<E, $($T,)+ Chain> {
1630            chain: Chain,
1631            _marker: PhantomData<fn(E) -> ($($T,)+)>,
1632        }
1633
1634        impl<E, $($T: 'static,)+ Chain> $SplatChain<E, $($T,)+ Chain> {
1635            /// Add a step that receives the tuple elements as individual `&T` arguments.
1636            pub fn then<NewOut, Params, S>(
1637                self,
1638                f: S,
1639                registry: &Registry,
1640            ) -> DagChain<E, NewOut, $SplatThenNode<Chain, S::Step, $($T,)+ NewOut>>
1641            where
1642                NewOut: 'static,
1643                S: IntoMergeStep<($(&'static $T,)+), NewOut, Params>,
1644                S::Step: for<'x> MergeStepCall<($(&'x $T,)+), NewOut>,
1645            {
1646                DagChain {
1647                    chain: $SplatThenNode {
1648                        chain: self.chain,
1649                        merge: f.into_merge_step(registry),
1650                        _marker: PhantomData,
1651                    },
1652                    _marker: PhantomData,
1653                }
1654            }
1655        }
1656
1657        impl<E, $($T: 'static,)+ Chain> DagChain<E, ($($T,)+), Chain> {
1658            /// Destructure the tuple output into individual `&T` arguments.
1659            pub fn splat(self) -> $SplatChain<E, $($T,)+ Chain> {
1660                $SplatChain {
1661                    chain: self.chain,
1662                    _marker: PhantomData,
1663                }
1664            }
1665        }
1666
1667        // -- Splat builder within an arm --
1668
        /// DAG splat builder within an arm.
        ///
        /// Intermediate builder returned by `DagArm::splat` when the arm's
        /// current output is a tuple. Its only method, `then`, attaches a step
        /// that receives the tuple elements as separate `&T` arguments and
        /// yields a `DagArm` again.
        #[doc(hidden)]
        pub struct $SplatArm<In, $($T,)+ Chain> {
            // Upstream arm chain whose output tuple will be splatted.
            chain: Chain,
            // Zero-sized marker recording the arm input type and tuple element types.
            _marker: PhantomData<fn(*const In) -> ($($T,)+)>,
        }
1675
1676        impl<In: 'static, $($T: 'static,)+ Chain> $SplatArm<In, $($T,)+ Chain> {
1677            /// Add a step that receives the tuple elements as individual `&T` arguments.
1678            pub fn then<NewOut, Params, S>(
1679                self,
1680                f: S,
1681                registry: &Registry,
1682            ) -> DagArm<In, NewOut, $SplatThenNode<Chain, S::Step, $($T,)+ NewOut>>
1683            where
1684                NewOut: 'static,
1685                S: IntoMergeStep<($(&'static $T,)+), NewOut, Params>,
1686                S::Step: for<'x> MergeStepCall<($(&'x $T,)+), NewOut>,
1687            {
1688                DagArm {
1689                    chain: $SplatThenNode {
1690                        chain: self.chain,
1691                        merge: f.into_merge_step(registry),
1692                        _marker: PhantomData,
1693                    },
1694                    _marker: PhantomData,
1695                }
1696            }
1697        }
1698
1699        impl<In: 'static, $($T: 'static,)+ Chain> DagArm<In, ($($T,)+), Chain> {
1700            /// Destructure the tuple output into individual `&T` arguments.
1701            pub fn splat(self) -> $SplatArm<In, $($T,)+ Chain> {
1702                $SplatArm {
1703                    chain: self.chain,
1704                    _marker: PhantomData,
1705                }
1706            }
1707        }
1708
1709        // -- Splat at arm start position --
1710
        /// DAG splat builder at arm start position.
        ///
        /// Returned by `DagArmSeed::splat` when the arm's input is a tuple.
        /// Holds no data — only a marker for the tuple element types — because
        /// no step has been added to the arm yet.
        #[doc(hidden)]
        pub struct $SplatArmStart<$($T),+>(PhantomData<fn(($($T,)+))>);
1714
1715        impl<$($T: 'static),+> $SplatArmStart<$($T),+> {
1716            /// Add a step that receives the tuple elements as individual `&T` arguments.
1717            pub fn then<Out, Params, S>(
1718                self,
1719                f: S,
1720                registry: &Registry,
1721            ) -> DagArm<($($T,)+), Out, $SplatArmStartNode<S::Step, $($T,)+ Out>>
1722            where
1723                Out: 'static,
1724                S: IntoMergeStep<($(&'static $T,)+), Out, Params>,
1725                S::Step: for<'x> MergeStepCall<($(&'x $T,)+), Out>,
1726            {
1727                DagArm {
1728                    chain: $SplatArmStartNode {
1729                        merge: f.into_merge_step(registry),
1730                        _marker: PhantomData,
1731                    },
1732                    _marker: PhantomData,
1733                }
1734            }
1735        }
1736
1737        impl<$($T: 'static),+> DagArmSeed<($($T,)+)> {
1738            /// Destructure the tuple input into individual `&T` arguments.
1739            pub fn splat(self) -> $SplatArmStart<$($T),+> {
1740                $SplatArmStart(PhantomData)
1741            }
1742        }
1743    };
1744}
1745
// Instantiate the splat builders and nodes for tuple arities 2 through 5.
// Each invocation supplies the names of the generated types, followed by the
// tuple's type parameters and the matching tuple field indices.

// Arity 2: `(T0, T1)`.
define_dag_splat_builders!(2,
    chain: DagSplatChain2,
    arm: DagSplatArm2,
    arm_start: DagSplatArmStart2,
    splat_then: SplatThenNode2,
    splat_arm_start: SplatArmStartNode2,
    (T0, T1),
    (0, 1)
);

// Arity 3: `(T0, T1, T2)`.
define_dag_splat_builders!(3,
    chain: DagSplatChain3,
    arm: DagSplatArm3,
    arm_start: DagSplatArmStart3,
    splat_then: SplatThenNode3,
    splat_arm_start: SplatArmStartNode3,
    (T0, T1, T2),
    (0, 1, 2)
);

// Arity 4: `(T0, T1, T2, T3)`.
define_dag_splat_builders!(4,
    chain: DagSplatChain4,
    arm: DagSplatArm4,
    arm_start: DagSplatArmStart4,
    splat_then: SplatThenNode4,
    splat_arm_start: SplatArmStartNode4,
    (T0, T1, T2, T3),
    (0, 1, 2, 3)
);

// Arity 5: `(T0, T1, T2, T3, T4)`.
define_dag_splat_builders!(5,
    chain: DagSplatChain5,
    arm: DagSplatArm5,
    arm_start: DagSplatArmStart5,
    splat_then: SplatThenNode5,
    splat_arm_start: SplatArmStartNode5,
    (T0, T1, T2, T3, T4),
    (0, 1, 2, 3, 4)
);
1785
1786// =============================================================================
1787// Fork arity macro — arm accumulation, merge, join
1788// =============================================================================
1789
1790/// Generates arm accumulation, merge, and join for a fork type.
1791///
1792/// ChainFork and ArmFork differ only in what output builder they
1793/// produce (DagChain vs DagArm). All dispatch logic lives in the
1794/// named MergeNode/JoinNode types — the macro just wires construction.
1795macro_rules! impl_dag_fork {
1796    (
1797        fork: $Fork:ident,
1798        output: $Output:ident,
1799        upstream: $U:ident
1800    ) => {
1801        // =============================================================
1802        // Arm accumulation: 0→1, 1→2, 2→3, 3→4
1803        // =============================================================
1804
1805        impl<$U, ForkOut, Chain> $Fork<$U, ForkOut, Chain, ()> {
1806            /// Add the first arm to this fork.
1807            pub fn arm<AOut, ACh>(
1808                self,
1809                f: impl FnOnce(DagArmSeed<ForkOut>) -> DagArm<ForkOut, AOut, ACh>,
1810            ) -> $Fork<$U, ForkOut, Chain, (DagArm<ForkOut, AOut, ACh>,)> {
1811                let arm = f(DagArmSeed(PhantomData));
1812                $Fork {
1813                    chain: self.chain,
1814                    arms: (arm,),
1815                    _marker: PhantomData,
1816                }
1817            }
1818        }
1819
1820        impl<$U, ForkOut, Chain, A0, C0> $Fork<$U, ForkOut, Chain, (DagArm<ForkOut, A0, C0>,)> {
1821            /// Add a second arm to this fork.
1822            pub fn arm<AOut, ACh>(
1823                self,
1824                f: impl FnOnce(DagArmSeed<ForkOut>) -> DagArm<ForkOut, AOut, ACh>,
1825            ) -> $Fork<$U, ForkOut, Chain, (DagArm<ForkOut, A0, C0>, DagArm<ForkOut, AOut, ACh>)>
1826            {
1827                let arm = f(DagArmSeed(PhantomData));
1828                let (a0,) = self.arms;
1829                $Fork {
1830                    chain: self.chain,
1831                    arms: (a0, arm),
1832                    _marker: PhantomData,
1833                }
1834            }
1835        }
1836
1837        impl<$U, ForkOut, Chain, A0, C0, A1, C1>
1838            $Fork<$U, ForkOut, Chain, (DagArm<ForkOut, A0, C0>, DagArm<ForkOut, A1, C1>)>
1839        {
1840            /// Add a third arm to this fork.
1841            pub fn arm<AOut, ACh>(
1842                self,
1843                f: impl FnOnce(DagArmSeed<ForkOut>) -> DagArm<ForkOut, AOut, ACh>,
1844            ) -> $Fork<
1845                $U,
1846                ForkOut,
1847                Chain,
1848                (
1849                    DagArm<ForkOut, A0, C0>,
1850                    DagArm<ForkOut, A1, C1>,
1851                    DagArm<ForkOut, AOut, ACh>,
1852                ),
1853            > {
1854                let arm = f(DagArmSeed(PhantomData));
1855                let (a0, a1) = self.arms;
1856                $Fork {
1857                    chain: self.chain,
1858                    arms: (a0, a1, arm),
1859                    _marker: PhantomData,
1860                }
1861            }
1862        }
1863
1864        impl<$U, ForkOut, Chain, A0, C0, A1, C1, A2, C2>
1865            $Fork<
1866                $U,
1867                ForkOut,
1868                Chain,
1869                (
1870                    DagArm<ForkOut, A0, C0>,
1871                    DagArm<ForkOut, A1, C1>,
1872                    DagArm<ForkOut, A2, C2>,
1873                ),
1874            >
1875        {
1876            /// Add a fourth arm to this fork.
1877            pub fn arm<AOut, ACh>(
1878                self,
1879                f: impl FnOnce(DagArmSeed<ForkOut>) -> DagArm<ForkOut, AOut, ACh>,
1880            ) -> $Fork<
1881                $U,
1882                ForkOut,
1883                Chain,
1884                (
1885                    DagArm<ForkOut, A0, C0>,
1886                    DagArm<ForkOut, A1, C1>,
1887                    DagArm<ForkOut, A2, C2>,
1888                    DagArm<ForkOut, AOut, ACh>,
1889                ),
1890            > {
1891                let arm = f(DagArmSeed(PhantomData));
1892                let (a0, a1, a2) = self.arms;
1893                $Fork {
1894                    chain: self.chain,
1895                    arms: (a0, a1, a2, arm),
1896                    _marker: PhantomData,
1897                }
1898            }
1899        }
1900
1901        // =============================================================
1902        // Merge arity 2
1903        // =============================================================
1904
1905        impl<$U, ForkOut: 'static, Chain, A0: 'static, C0, A1: 'static, C1>
1906            $Fork<$U, ForkOut, Chain, (DagArm<ForkOut, A0, C0>, DagArm<ForkOut, A1, C1>)>
1907        {
1908            /// Merge two arms with a merge step.
1909            pub fn merge<MOut, Params, S>(
1910                self,
1911                f: S,
1912                registry: &Registry,
1913            ) -> $Output<
1914                $U,
1915                MOut,
1916                MergeNode2<Chain, C0, C1, S::Step, ForkOut, A0, A1, MOut>,
1917            >
1918            where
1919                MOut: 'static,
1920                S: IntoMergeStep<(&'static A0, &'static A1), MOut, Params>,
1921                S::Step: for<'x> MergeStepCall<(&'x A0, &'x A1), MOut>,
1922            {
1923                let (a0, a1) = self.arms;
1924                $Output {
1925                    chain: MergeNode2 {
1926                        chain: self.chain,
1927                        arm0: a0.chain,
1928                        arm1: a1.chain,
1929                        merge: f.into_merge_step(registry),
1930                        _marker: PhantomData,
1931                    },
1932                    _marker: PhantomData,
1933                }
1934            }
1935        }
1936
1937        impl<$U, ForkOut: 'static, Chain, C0, C1>
1938            $Fork<$U, ForkOut, Chain, (DagArm<ForkOut, (), C0>, DagArm<ForkOut, (), C1>)>
1939        {
1940            /// Join two sink arms (all producing `()`).
1941            pub fn join(
1942                self,
1943            ) -> $Output<$U, (), JoinNode2<Chain, C0, C1, ForkOut>> {
1944                let (a0, a1) = self.arms;
1945                $Output {
1946                    chain: JoinNode2 {
1947                        chain: self.chain,
1948                        arm0: a0.chain,
1949                        arm1: a1.chain,
1950                        _marker: PhantomData,
1951                    },
1952                    _marker: PhantomData,
1953                }
1954            }
1955        }
1956
1957        // =============================================================
1958        // Merge arity 3
1959        // =============================================================
1960
1961        impl<$U, ForkOut: 'static, Chain, A0: 'static, C0, A1: 'static, C1, A2: 'static, C2>
1962            $Fork<
1963                $U,
1964                ForkOut,
1965                Chain,
1966                (
1967                    DagArm<ForkOut, A0, C0>,
1968                    DagArm<ForkOut, A1, C1>,
1969                    DagArm<ForkOut, A2, C2>,
1970                ),
1971            >
1972        {
1973            /// Merge three arms with a merge step.
1974            pub fn merge<MOut, Params, S>(
1975                self,
1976                f: S,
1977                registry: &Registry,
1978            ) -> $Output<
1979                $U,
1980                MOut,
1981                MergeNode3<Chain, C0, C1, C2, S::Step, ForkOut, A0, A1, A2, MOut>,
1982            >
1983            where
1984                MOut: 'static,
1985                S: IntoMergeStep<(&'static A0, &'static A1, &'static A2), MOut, Params>,
1986                S::Step: for<'x> MergeStepCall<(&'x A0, &'x A1, &'x A2), MOut>,
1987            {
1988                let (a0, a1, a2) = self.arms;
1989                $Output {
1990                    chain: MergeNode3 {
1991                        chain: self.chain,
1992                        arm0: a0.chain,
1993                        arm1: a1.chain,
1994                        arm2: a2.chain,
1995                        merge: f.into_merge_step(registry),
1996                        _marker: PhantomData,
1997                    },
1998                    _marker: PhantomData,
1999                }
2000            }
2001        }
2002
2003        impl<$U, ForkOut: 'static, Chain, C0, C1, C2>
2004            $Fork<
2005                $U,
2006                ForkOut,
2007                Chain,
2008                (
2009                    DagArm<ForkOut, (), C0>,
2010                    DagArm<ForkOut, (), C1>,
2011                    DagArm<ForkOut, (), C2>,
2012                ),
2013            >
2014        {
2015            /// Join three sink arms (all producing `()`).
2016            pub fn join(
2017                self,
2018            ) -> $Output<$U, (), JoinNode3<Chain, C0, C1, C2, ForkOut>> {
2019                let (a0, a1, a2) = self.arms;
2020                $Output {
2021                    chain: JoinNode3 {
2022                        chain: self.chain,
2023                        arm0: a0.chain,
2024                        arm1: a1.chain,
2025                        arm2: a2.chain,
2026                        _marker: PhantomData,
2027                    },
2028                    _marker: PhantomData,
2029                }
2030            }
2031        }
2032
2033        // =============================================================
2034        // Merge arity 4
2035        // =============================================================
2036
2037        #[allow(clippy::many_single_char_names)]
2038        impl<
2039            $U,
2040            ForkOut: 'static,
2041            Chain,
2042            A0: 'static,
2043            C0,
2044            A1: 'static,
2045            C1,
2046            A2: 'static,
2047            C2,
2048            A3: 'static,
2049            C3,
2050        >
2051            $Fork<
2052                $U,
2053                ForkOut,
2054                Chain,
2055                (
2056                    DagArm<ForkOut, A0, C0>,
2057                    DagArm<ForkOut, A1, C1>,
2058                    DagArm<ForkOut, A2, C2>,
2059                    DagArm<ForkOut, A3, C3>,
2060                ),
2061            >
2062        {
2063            /// Merge four arms with a merge step.
2064            pub fn merge<MOut, Params, S>(
2065                self,
2066                f: S,
2067                registry: &Registry,
2068            ) -> $Output<
2069                $U,
2070                MOut,
2071                MergeNode4<Chain, C0, C1, C2, C3, S::Step, ForkOut, A0, A1, A2, A3, MOut>,
2072            >
2073            where
2074                MOut: 'static,
2075                S: IntoMergeStep<
2076                    (&'static A0, &'static A1, &'static A2, &'static A3),
2077                    MOut,
2078                    Params,
2079                >,
2080                S::Step: for<'x> MergeStepCall<(&'x A0, &'x A1, &'x A2, &'x A3), MOut>,
2081            {
2082                let (a0, a1, a2, a3) = self.arms;
2083                $Output {
2084                    chain: MergeNode4 {
2085                        chain: self.chain,
2086                        arm0: a0.chain,
2087                        arm1: a1.chain,
2088                        arm2: a2.chain,
2089                        arm3: a3.chain,
2090                        merge: f.into_merge_step(registry),
2091                        _marker: PhantomData,
2092                    },
2093                    _marker: PhantomData,
2094                }
2095            }
2096        }
2097
2098        impl<$U, ForkOut: 'static, Chain, C0, C1, C2, C3>
2099            $Fork<
2100                $U,
2101                ForkOut,
2102                Chain,
2103                (
2104                    DagArm<ForkOut, (), C0>,
2105                    DagArm<ForkOut, (), C1>,
2106                    DagArm<ForkOut, (), C2>,
2107                    DagArm<ForkOut, (), C3>,
2108                ),
2109            >
2110        {
2111            /// Join four sink arms (all producing `()`).
2112            pub fn join(
2113                self,
2114            ) -> $Output<$U, (), JoinNode4<Chain, C0, C1, C2, C3, ForkOut>> {
2115                let (a0, a1, a2, a3) = self.arms;
2116                $Output {
2117                    chain: JoinNode4 {
2118                        chain: self.chain,
2119                        arm0: a0.chain,
2120                        arm1: a1.chain,
2121                        arm2: a2.chain,
2122                        arm3: a3.chain,
2123                        _marker: PhantomData,
2124                    },
2125                    _marker: PhantomData,
2126                }
2127            }
2128        }
2129    };
2130}
2131
2132impl_dag_fork!(fork: DagChainFork, output: DagChain, upstream: E);
2133impl_dag_fork!(fork: DagArmFork, output: DagArm, upstream: In);
2134
2135// =============================================================================
2136// build_batch — when Out: PipelineOutput (() or Option<()>)
2137// =============================================================================
2138
2139impl<E, Out: crate::PipelineOutput, Chain: ChainCall<E, Out = Out>> DagChain<E, Out, Chain> {
2140    /// Finalize into a [`BatchDag`] with a pre-allocated input buffer.
2141    ///
2142    /// Same DAG chain as [`build`](DagChain::build), but the DAG owns an
2143    /// input buffer that drivers fill between dispatch cycles. Each call
2144    /// to [`BatchDag::run`] drains the buffer, running every item through
2145    /// the chain independently.
2146    ///
2147    /// Available when the DAG ends with `()` or `Option<()>` (e.g.
2148    /// after `.guard()` or `.filter()` followed by `.unwrap_or(())`).
2149    ///
2150    /// `capacity` is the initial allocation — the buffer can grow if needed,
2151    /// but sizing it for the expected batch size avoids reallocation.
2152    #[must_use = "building a DAG without storing it does nothing"]
2153    pub fn build_batch(self, capacity: usize) -> BatchDag<E, Chain> {
2154        BatchDag {
2155            input: Vec::with_capacity(capacity),
2156            chain: self.chain,
2157        }
2158    }
2159}
2160
2161// =============================================================================
2162// BatchDag<E, F> — DAG with owned input buffer
2163// =============================================================================
2164
/// Batch DAG that owns a pre-allocated input buffer.
///
/// Created by [`DagChain::build_batch`]. Each item flows through the
/// full DAG chain independently — the same per-item `Option` and
/// `Result` flow control as [`Dag`]. Errors are handled inline (via
/// `.catch()`, `.unwrap_or()`, etc.) and the batch continues to the
/// next item.
///
/// Unlike [`Dag`], `BatchDag` does not implement [`Handler`] — it is
/// driven directly by the owner via [`run()`](BatchDag::run).
///
/// # Examples
///
/// ```
/// use nexus_rt::{WorldBuilder, ResMut, Resource};
/// use nexus_rt::dag::DagBuilder;
///
/// #[derive(Resource)]
/// struct Accum(u64);
///
/// let mut wb = WorldBuilder::new();
/// wb.register(Accum(0));
/// let mut world = wb.build();
/// let reg = world.registry();
///
/// fn double(x: u32) -> u64 { x as u64 * 2 }
/// fn store(mut out: ResMut<Accum>, val: &u64) { out.0 += *val; }
///
/// let mut batch = DagBuilder::<u32>::new()
///     .root(double, reg)
///     .then(store, reg)
///     .build_batch(8);
///
/// batch.input_mut().extend([1, 2, 3]);
/// batch.run(&mut world);
///
/// assert_eq!(world.resource::<Accum>().0, 12); // 2 + 4 + 6
/// assert!(batch.input().is_empty());
/// ```
pub struct BatchDag<E, F> {
    /// Pending items; filled through `input_mut()` and drained by `run()`.
    input: Vec<E>,
    /// The finalized DAG chain every drained item is passed through.
    chain: F,
}
2208
2209impl<E, Out: crate::PipelineOutput, F: ChainCall<E, Out = Out>> BatchDag<E, F> {
2210    /// Mutable access to the input buffer. Drivers fill this between
2211    /// dispatch cycles.
2212    pub fn input_mut(&mut self) -> &mut Vec<E> {
2213        &mut self.input
2214    }
2215
2216    /// Read-only access to the input buffer.
2217    pub fn input(&self) -> &[E] {
2218        &self.input
2219    }
2220
2221    /// Drain the input buffer, running each item through the DAG.
2222    ///
2223    /// Each item gets independent `Option`/`Result` flow control — an
2224    /// error on one item does not affect subsequent items. After `run()`,
2225    /// the input buffer is empty but retains its allocation.
2226    pub fn run(&mut self, world: &mut World) {
2227        for item in self.input.drain(..) {
2228            let _ = self.chain.call(world, item);
2229        }
2230    }
2231}
2232
2233// =============================================================================
2234// resolve_arm — pre-resolve a step for manual dispatch
2235// =============================================================================
2236
2237/// Resolve a step for use in manual dispatch (e.g. inside an
2238/// opaque `.then()` closure).
2239///
2240/// Returns a closure with pre-resolved [`Param`](crate::Param) state —
2241/// the same build-time resolution that `.then()` performs, but as a
2242/// standalone value the caller can invoke from any context.
2243///
2244/// # Examples
2245///
2246/// ```ignore
2247/// let mut arm0 = resolve_arm(handle_new, reg);
2248/// let mut arm1 = resolve_arm(handle_cancel, reg);
2249///
2250/// dag.then(move |world: &mut World, msg: &Decoded| match msg.kind {
2251///     MsgKind::NewOrder => arm0(world, msg),
2252///     MsgKind::Cancel   => arm1(world, msg),
2253/// }, reg)
2254/// ```
2255pub fn resolve_arm<In, Out, Params, S>(
2256    f: S,
2257    registry: &Registry,
2258) -> impl FnMut(&mut World, &In) -> Out + use<In, Out, Params, S>
2259where
2260    In: 'static,
2261    Out: 'static,
2262    S: IntoStep<&'static In, Out, Params>,
2263    S::Step: for<'a> StepCall<&'a In, Out = Out>,
2264{
2265    let mut resolved = f.into_step(registry);
2266    move |world: &mut World, input: &In| resolved.call(world, input)
2267}
2268
2269// =============================================================================
2270// Tests
2271// =============================================================================
2272
2273#[cfg(test)]
2274#[allow(
2275    clippy::ref_option,
2276    clippy::unnecessary_wraps,
2277    clippy::needless_pass_by_value,
2278    clippy::trivially_copy_pass_by_ref,
2279    clippy::ptr_arg
2280)]
2281mod tests {
2282    use super::*;
2283    use crate::{IntoHandler, Res, ResMut, Virtual, WorldBuilder};
2284
2285    // -- Linear chains --
2286
2287    #[test]
2288    fn dag_linear_2() {
2289        let mut wb = WorldBuilder::new();
2290        wb.register::<u64>(0);
2291        let mut world = wb.build();
2292        let reg = world.registry();
2293
2294        fn root_mul2(x: u32) -> u64 {
2295            x as u64 * 2
2296        }
2297        fn store(mut out: ResMut<u64>, val: &u64) {
2298            *out = *val;
2299        }
2300
2301        let mut dag = DagBuilder::<u32>::new()
2302            .root(root_mul2, reg)
2303            .then(store, reg)
2304            .build();
2305
2306        dag.run(&mut world, 5u32);
2307        assert_eq!(*world.resource::<u64>(), 10);
2308    }
2309
2310    #[test]
2311    fn dag_linear_3() {
2312        let mut wb = WorldBuilder::new();
2313        wb.register::<u64>(0);
2314        let mut world = wb.build();
2315        let reg = world.registry();
2316
2317        fn root_mul2(x: u32) -> u64 {
2318            x as u64 * 2
2319        }
2320        fn add_one(val: &u64) -> u64 {
2321            *val + 1
2322        }
2323        fn store(mut out: ResMut<u64>, val: &u64) {
2324            *out = *val;
2325        }
2326
2327        let mut dag = DagBuilder::<u32>::new()
2328            .root(root_mul2, reg)
2329            .then(add_one, reg)
2330            .then(store, reg)
2331            .build();
2332
2333        dag.run(&mut world, 5u32);
2334        assert_eq!(*world.resource::<u64>(), 11); // (5*2)+1
2335    }
2336
2337    #[test]
2338    fn dag_linear_5() {
2339        let mut wb = WorldBuilder::new();
2340        wb.register::<u64>(0);
2341        let mut world = wb.build();
2342        let reg = world.registry();
2343
2344        fn root_id(x: u32) -> u64 {
2345            x as u64
2346        }
2347        fn add_one(val: &u64) -> u64 {
2348            *val + 1
2349        }
2350        fn store(mut out: ResMut<u64>, val: &u64) {
2351            *out = *val;
2352        }
2353
2354        let mut dag = DagBuilder::<u32>::new()
2355            .root(root_id, reg)
2356            .then(add_one, reg)
2357            .then(add_one, reg)
2358            .then(add_one, reg)
2359            .then(store, reg)
2360            .build();
2361
2362        dag.run(&mut world, 0u32);
2363        assert_eq!(*world.resource::<u64>(), 3); // 0+1+1+1
2364    }
2365
2366    // -- Diamond: root → [a, b] → merge → sink --
2367
2368    #[test]
2369    fn dag_diamond() {
2370        let mut wb = WorldBuilder::new();
2371        wb.register::<u64>(0);
2372        let mut world = wb.build();
2373        let reg = world.registry();
2374
2375        fn root_mul2(x: u32) -> u32 {
2376            x.wrapping_mul(2)
2377        }
2378        fn add_one(val: &u32) -> u32 {
2379            val.wrapping_add(1)
2380        }
2381        fn mul3(val: &u32) -> u32 {
2382            val.wrapping_mul(3)
2383        }
2384        fn merge_add(a: &u32, b: &u32) -> u32 {
2385            a.wrapping_add(*b)
2386        }
2387        fn store(mut out: ResMut<u64>, val: &u32) {
2388            *out = *val as u64;
2389        }
2390
2391        let mut dag = DagBuilder::<u32>::new()
2392            .root(root_mul2, reg)
2393            .fork()
2394            .arm(|a| a.then(add_one, reg))
2395            .arm(|b| b.then(mul3, reg))
2396            .merge(merge_add, reg)
2397            .then(store, reg)
2398            .build();
2399
2400        dag.run(&mut world, 5u32);
2401        // root: 10, arm_a: 11, arm_b: 30, merge: 41
2402        assert_eq!(*world.resource::<u64>(), 41);
2403    }
2404
2405    // -- Fan-out to sinks (.join()) --
2406
2407    #[test]
2408    fn dag_fan_out_join() {
2409        let mut wb = WorldBuilder::new();
2410        wb.register::<u64>(0);
2411        wb.register::<i64>(0);
2412        let mut world = wb.build();
2413        let reg = world.registry();
2414
2415        fn root_id(x: u32) -> u64 {
2416            x as u64
2417        }
2418        fn sink_u64(mut out: ResMut<u64>, val: &u64) {
2419            *out = *val * 2;
2420        }
2421        fn sink_i64(mut out: ResMut<i64>, val: &u64) {
2422            *out = *val as i64 * 3;
2423        }
2424
2425        let mut dag = DagBuilder::<u32>::new()
2426            .root(root_id, reg)
2427            .fork()
2428            .arm(|a| a.then(sink_u64, reg))
2429            .arm(|b| b.then(sink_i64, reg))
2430            .join()
2431            .build();
2432
2433        dag.run(&mut world, 5u32);
2434        assert_eq!(*world.resource::<u64>(), 10);
2435        assert_eq!(*world.resource::<i64>(), 15);
2436    }
2437
2438    // -- Nested fork within an arm --
2439
2440    #[test]
2441    fn dag_nested_fork() {
2442        let mut wb = WorldBuilder::new();
2443        wb.register::<u64>(0);
2444        let mut world = wb.build();
2445        let reg = world.registry();
2446
2447        fn root_id(x: u32) -> u32 {
2448            x
2449        }
2450        fn add_10(val: &u32) -> u32 {
2451            val.wrapping_add(10)
2452        }
2453        fn mul2(val: &u32) -> u32 {
2454            val.wrapping_mul(2)
2455        }
2456        fn mul3(val: &u32) -> u32 {
2457            val.wrapping_mul(3)
2458        }
2459        fn inner_merge(a: &u32, b: &u32) -> u32 {
2460            a.wrapping_add(*b)
2461        }
2462        fn outer_merge(a: &u32, b: &u32) -> u32 {
2463            a.wrapping_add(*b)
2464        }
2465        fn store(mut out: ResMut<u64>, val: &u32) {
2466            *out = *val as u64;
2467        }
2468
2469        // root(5)=5 → fork
2470        //   arm_a: add_10(5)=15 → fork
2471        //     sub_c: mul2(15)=30
2472        //     sub_d: mul3(15)=45
2473        //     inner_merge(30,45)=75
2474        //   arm_b: mul3(5)=15
2475        // outer_merge(75,15)=90
2476        let mut dag = DagBuilder::<u32>::new()
2477            .root(root_id, reg)
2478            .fork()
2479            .arm(|a| {
2480                a.then(add_10, reg)
2481                    .fork()
2482                    .arm(|c| c.then(mul2, reg))
2483                    .arm(|d| d.then(mul3, reg))
2484                    .merge(inner_merge, reg)
2485            })
2486            .arm(|b| b.then(mul3, reg))
2487            .merge(outer_merge, reg)
2488            .then(store, reg)
2489            .build();
2490
2491        dag.run(&mut world, 5u32);
2492        assert_eq!(*world.resource::<u64>(), 90);
2493    }
2494
2495    // -- Complex topology: asymmetric arm lengths --
2496
2497    #[test]
2498    fn dag_complex_topology() {
2499        let mut wb = WorldBuilder::new();
2500        wb.register::<u64>(0);
2501        let mut world = wb.build();
2502        let reg = world.registry();
2503
2504        fn root_mul2(x: u32) -> u32 {
2505            x.wrapping_mul(2)
2506        }
2507        fn add_one(val: &u32) -> u32 {
2508            val.wrapping_add(1)
2509        }
2510        fn add_then_mul2(val: &u32) -> u32 {
2511            val.wrapping_add(1).wrapping_mul(2)
2512        }
2513        fn mul3(val: &u32) -> u32 {
2514            val.wrapping_mul(3)
2515        }
2516        fn merge_add(a: &u32, b: &u32) -> u32 {
2517            a.wrapping_add(*b)
2518        }
2519        fn store(mut out: ResMut<u64>, val: &u32) {
2520            *out = *val as u64;
2521        }
2522
2523        // root(5)=10 → fork
2524        //   a: add_one(10)=11 → add_then_mul2(11)=24
2525        //   b: mul3(10)=30
2526        // merge(24, 30) = 54
2527        let mut dag = DagBuilder::<u32>::new()
2528            .root(root_mul2, reg)
2529            .fork()
2530            .arm(|a| a.then(add_one, reg).then(add_then_mul2, reg))
2531            .arm(|b| b.then(mul3, reg))
2532            .merge(merge_add, reg)
2533            .then(store, reg)
2534            .build();
2535
2536        dag.run(&mut world, 5u32);
2537        assert_eq!(*world.resource::<u64>(), 54);
2538    }
2539
2540    // -- Boxable into Box<dyn Handler<E>> --
2541
2542    #[test]
2543    fn dag_boxable() {
2544        let mut wb = WorldBuilder::new();
2545        wb.register::<u64>(0);
2546        let mut world = wb.build();
2547        let reg = world.registry();
2548
2549        fn root_id(x: u32) -> u64 {
2550            x as u64
2551        }
2552        fn store(mut out: ResMut<u64>, val: &u64) {
2553            *out = *val;
2554        }
2555
2556        let mut boxed: Virtual<u32> = Box::new(
2557            DagBuilder::<u32>::new()
2558                .root(root_id, reg)
2559                .then(store, reg)
2560                .build(),
2561        );
2562        boxed.run(&mut world, 77u32);
2563        assert_eq!(*world.resource::<u64>(), 77);
2564    }
2565
2566    // -- World access (Res<T>, ResMut<T>) in nodes --
2567
2568    #[test]
2569    fn dag_world_access() {
2570        let mut wb = WorldBuilder::new();
2571        wb.register::<u64>(10); // factor
2572        wb.register::<String>(String::new());
2573        let mut world = wb.build();
2574        let reg = world.registry();
2575
2576        fn scale(factor: Res<u64>, val: &u32) -> u64 {
2577            *factor * (*val as u64)
2578        }
2579        fn store(mut out: ResMut<String>, val: &u64) {
2580            *out = val.to_string();
2581        }
2582
2583        let mut dag = DagBuilder::<u32>::new()
2584            .root(|x: u32| x, reg)
2585            .then(scale, reg)
2586            .then(store, reg)
2587            .build();
2588
2589        dag.run(&mut world, 7u32);
2590        assert_eq!(world.resource::<String>().as_str(), "70");
2591    }
2592
2593    // -- Root-only (terminal root outputting ()) --
2594
2595    #[test]
2596    fn dag_root_only() {
2597        let mut wb = WorldBuilder::new();
2598        wb.register::<u64>(0);
2599        let mut world = wb.build();
2600        let reg = world.registry();
2601
2602        let mut dag = DagBuilder::<u32>::new()
2603            .root(
2604                |mut out: ResMut<u64>, x: u32| {
2605                    *out = x as u64;
2606                },
2607                reg,
2608            )
2609            .build();
2610
2611        dag.run(&mut world, 42u32);
2612        assert_eq!(*world.resource::<u64>(), 42);
2613    }
2614
2615    // -- Multiple dispatches reuse state --
2616
2617    #[test]
2618    fn dag_multiple_dispatches() {
2619        let mut wb = WorldBuilder::new();
2620        wb.register::<u64>(0);
2621        let mut world = wb.build();
2622        let reg = world.registry();
2623
2624        fn root_id(x: u32) -> u64 {
2625            x as u64
2626        }
2627        fn store(mut out: ResMut<u64>, val: &u64) {
2628            *out = *val;
2629        }
2630
2631        let mut dag = DagBuilder::<u32>::new()
2632            .root(root_id, reg)
2633            .then(store, reg)
2634            .build();
2635
2636        dag.run(&mut world, 1u32);
2637        assert_eq!(*world.resource::<u64>(), 1);
2638        dag.run(&mut world, 2u32);
2639        assert_eq!(*world.resource::<u64>(), 2);
2640        dag.run(&mut world, 3u32);
2641        assert_eq!(*world.resource::<u64>(), 3);
2642    }
2643
2644    // -- 3-way merge --
2645
2646    #[test]
2647    fn dag_3way_merge() {
2648        let mut wb = WorldBuilder::new();
2649        wb.register::<String>(String::new());
2650        let mut world = wb.build();
2651        let reg = world.registry();
2652
2653        fn root_id(x: u32) -> u64 {
2654            x as u64
2655        }
2656        fn mul1(val: &u64) -> u64 {
2657            *val
2658        }
2659        fn mul2(val: &u64) -> u64 {
2660            *val * 2
2661        }
2662        fn mul3(val: &u64) -> u64 {
2663            *val * 3
2664        }
2665        fn merge3_fmt(mut out: ResMut<String>, a: &u64, b: &u64, c: &u64) {
2666            *out = format!("{},{},{}", a, b, c);
2667        }
2668
2669        let mut dag = DagBuilder::<u32>::new()
2670            .root(root_id, reg)
2671            .fork()
2672            .arm(|a| a.then(mul1, reg))
2673            .arm(|b| b.then(mul2, reg))
2674            .arm(|c| c.then(mul3, reg))
2675            .merge(merge3_fmt, reg)
2676            .build();
2677
2678        dag.run(&mut world, 10u32);
2679        assert_eq!(world.resource::<String>().as_str(), "10,20,30");
2680    }
2681
2682    // -- DAG combinators --
2683
2684    #[test]
2685    fn dag_dispatch() {
2686        fn root(x: u32) -> u64 {
2687            x as u64 + 42
2688        }
2689        fn sink(mut out: ResMut<u64>, event: u64) {
2690            *out = event;
2691        }
2692        let mut wb = WorldBuilder::new();
2693        wb.register::<u64>(0);
2694        let mut world = wb.build();
2695        let reg = world.registry();
2696
2697        let mut dag = DagBuilder::<u32>::new()
2698            .root(root, reg)
2699            .dispatch(sink.into_handler(reg))
2700            .build();
2701
2702        dag.run(&mut world, 0u32);
2703        assert_eq!(*world.resource::<u64>(), 42);
2704    }
2705
2706    #[test]
2707    fn dag_option_map() {
2708        fn root(_x: u32) -> Option<u64> {
2709            Some(10)
2710        }
2711        fn double(val: &u64) -> u64 {
2712            *val * 2
2713        }
2714        fn sink(mut out: ResMut<u64>, val: &Option<u64>) {
2715            *out = val.unwrap_or(0);
2716        }
2717        let mut wb = WorldBuilder::new();
2718        wb.register::<u64>(0);
2719        let mut world = wb.build();
2720        let reg = world.registry();
2721
2722        let mut dag = DagBuilder::<u32>::new()
2723            .root(root, reg)
2724            .map(double, reg)
2725            .then(sink, reg)
2726            .build();
2727
2728        dag.run(&mut world, 0u32);
2729        assert_eq!(*world.resource::<u64>(), 20);
2730    }
2731
2732    #[test]
2733    fn dag_option_map_none() {
2734        fn root(_x: u32) -> Option<u64> {
2735            None
2736        }
2737        fn double(val: &u64) -> u64 {
2738            *val * 2
2739        }
2740        fn sink(mut out: ResMut<u64>, val: &Option<u64>) {
2741            *out = val.unwrap_or(999);
2742        }
2743        let mut wb = WorldBuilder::new();
2744        wb.register::<u64>(0);
2745        let mut world = wb.build();
2746        let reg = world.registry();
2747
2748        let mut dag = DagBuilder::<u32>::new()
2749            .root(root, reg)
2750            .map(double, reg)
2751            .then(sink, reg)
2752            .build();
2753
2754        dag.run(&mut world, 0u32);
2755        assert_eq!(*world.resource::<u64>(), 999);
2756    }
2757
2758    #[test]
2759    fn dag_option_and_then() {
2760        fn root(_x: u32) -> Option<u64> {
2761            Some(5)
2762        }
2763        fn check(val: &u64) -> Option<u64> {
2764            if *val > 3 { Some(*val * 10) } else { None }
2765        }
2766        fn sink(mut out: ResMut<u64>, val: &Option<u64>) {
2767            *out = val.unwrap_or(0);
2768        }
2769        let mut wb = WorldBuilder::new();
2770        wb.register::<u64>(0);
2771        let mut world = wb.build();
2772        let reg = world.registry();
2773
2774        let mut dag = DagBuilder::<u32>::new()
2775            .root(root, reg)
2776            .and_then(check, reg)
2777            .then(sink, reg)
2778            .build();
2779
2780        dag.run(&mut world, 0u32);
2781        assert_eq!(*world.resource::<u64>(), 50);
2782    }
2783
2784    #[test]
2785    fn dag_option_filter_keeps() {
2786        fn root(_x: u32) -> Option<u64> {
2787            Some(5)
2788        }
2789        fn sink(mut out: ResMut<u64>, val: &Option<u64>) {
2790            *out = val.unwrap_or(0);
2791        }
2792        let mut wb = WorldBuilder::new();
2793        wb.register::<u64>(0);
2794        let mut world = wb.build();
2795
2796        let mut dag = DagBuilder::<u32>::new()
2797            .root(root, world.registry())
2798            .filter(|v: &u64| *v > 3, world.registry())
2799            .then(sink, world.registry())
2800            .build();
2801
2802        dag.run(&mut world, 0u32);
2803        assert_eq!(*world.resource::<u64>(), 5);
2804    }
2805
2806    #[test]
2807    fn dag_option_filter_drops() {
2808        fn root(_x: u32) -> Option<u64> {
2809            Some(5)
2810        }
2811        fn sink(mut out: ResMut<u64>, val: &Option<u64>) {
2812            *out = val.unwrap_or(0);
2813        }
2814        let mut wb = WorldBuilder::new();
2815        wb.register::<u64>(0);
2816        let mut world = wb.build();
2817
2818        let mut dag = DagBuilder::<u32>::new()
2819            .root(root, world.registry())
2820            .filter(|v: &u64| *v > 10, world.registry())
2821            .then(sink, world.registry())
2822            .build();
2823
2824        dag.run(&mut world, 0u32);
2825        assert_eq!(*world.resource::<u64>(), 0);
2826    }
2827
2828    #[test]
2829    fn dag_option_on_none() {
2830        fn root(_x: u32) -> Option<u64> {
2831            None
2832        }
2833        fn sink(_val: &Option<u64>) {}
2834        let mut wb = WorldBuilder::new();
2835        wb.register::<bool>(false);
2836        let mut world = wb.build();
2837        let reg = world.registry();
2838
2839        let mut dag = DagBuilder::<u32>::new()
2840            .root(root, reg)
2841            .on_none(
2842                |w: &mut World| {
2843                    *w.resource_mut::<bool>() = true;
2844                },
2845                reg,
2846            )
2847            .then(sink, reg)
2848            .build();
2849
2850        dag.run(&mut world, 0u32);
2851        assert!(*world.resource::<bool>());
2852    }
2853
2854    #[test]
2855    fn dag_option_unwrap_or() {
2856        fn root(_x: u32) -> Option<u64> {
2857            None
2858        }
2859        fn sink(mut out: ResMut<u64>, val: &u64) {
2860            *out = *val;
2861        }
2862        let mut wb = WorldBuilder::new();
2863        wb.register::<u64>(0);
2864        let mut world = wb.build();
2865        let reg = world.registry();
2866
2867        let mut dag = DagBuilder::<u32>::new()
2868            .root(root, reg)
2869            .unwrap_or(42u64)
2870            .then(sink, reg)
2871            .build();
2872
2873        dag.run(&mut world, 0u32);
2874        assert_eq!(*world.resource::<u64>(), 42);
2875    }
2876
2877    #[test]
2878    fn dag_option_ok_or() {
2879        fn root(_x: u32) -> Option<u64> {
2880            None
2881        }
2882        fn sink(mut out: ResMut<u64>, val: &Result<u64, &str>) {
2883            *out = val.as_ref().map_or(999, |v| *v);
2884        }
2885        let mut wb = WorldBuilder::new();
2886        wb.register::<u64>(0);
2887        let mut world = wb.build();
2888        let reg = world.registry();
2889
2890        let mut dag = DagBuilder::<u32>::new()
2891            .root(root, reg)
2892            .ok_or("missing")
2893            .then(sink, reg)
2894            .build();
2895
2896        dag.run(&mut world, 0u32);
2897        assert_eq!(*world.resource::<u64>(), 999);
2898    }
2899
2900    #[test]
2901    fn dag_result_map() {
2902        fn root(_x: u32) -> Result<u64, &'static str> {
2903            Ok(10)
2904        }
2905        fn double(val: &u64) -> u64 {
2906            *val * 2
2907        }
2908        fn sink(mut out: ResMut<u64>, val: &Result<u64, &str>) {
2909            *out = val.as_ref().copied().unwrap_or(0);
2910        }
2911        let mut wb = WorldBuilder::new();
2912        wb.register::<u64>(0);
2913        let mut world = wb.build();
2914        let reg = world.registry();
2915
2916        let mut dag = DagBuilder::<u32>::new()
2917            .root(root, reg)
2918            .map(double, reg)
2919            .then(sink, reg)
2920            .build();
2921
2922        dag.run(&mut world, 0u32);
2923        assert_eq!(*world.resource::<u64>(), 20);
2924    }
2925
2926    #[test]
2927    fn dag_result_and_then() {
2928        fn root(_x: u32) -> Result<u64, &'static str> {
2929            Ok(5)
2930        }
2931        fn check(val: &u64) -> Result<u64, &'static str> {
2932            if *val > 3 {
2933                Ok(*val * 10)
2934            } else {
2935                Err("too small")
2936            }
2937        }
2938        fn sink(mut out: ResMut<u64>, val: &Result<u64, &str>) {
2939            *out = val.as_ref().copied().unwrap_or(0);
2940        }
2941        let mut wb = WorldBuilder::new();
2942        wb.register::<u64>(0);
2943        let mut world = wb.build();
2944        let reg = world.registry();
2945
2946        let mut dag = DagBuilder::<u32>::new()
2947            .root(root, reg)
2948            .and_then(check, reg)
2949            .then(sink, reg)
2950            .build();
2951
2952        dag.run(&mut world, 0u32);
2953        assert_eq!(*world.resource::<u64>(), 50);
2954    }
2955
2956    #[test]
2957    fn dag_result_catch() {
2958        fn root(_x: u32) -> Result<u64, String> {
2959            Err("oops".into())
2960        }
2961        fn handle_err(mut log: ResMut<String>, err: &String) {
2962            *log = err.clone();
2963        }
2964        fn sink(mut out: ResMut<u64>, val: &Option<u64>) {
2965            *out = val.unwrap_or(0);
2966        }
2967        let mut wb = WorldBuilder::new();
2968        wb.register::<u64>(0);
2969        wb.register::<String>(String::new());
2970        let mut world = wb.build();
2971        let reg = world.registry();
2972
2973        let mut dag = DagBuilder::<u32>::new()
2974            .root(root, reg)
2975            .catch(handle_err, reg)
2976            .then(sink, reg)
2977            .build();
2978
2979        dag.run(&mut world, 0u32);
2980        assert_eq!(*world.resource::<u64>(), 0);
2981        assert_eq!(world.resource::<String>().as_str(), "oops");
2982    }
2983
2984    #[test]
2985    fn dag_result_ok() {
2986        fn root(_x: u32) -> Result<u64, &'static str> {
2987            Err("fail")
2988        }
2989        fn sink(mut out: ResMut<u64>, val: &Option<u64>) {
2990            *out = val.unwrap_or(0);
2991        }
2992        let mut wb = WorldBuilder::new();
2993        wb.register::<u64>(0);
2994        let mut world = wb.build();
2995        let reg = world.registry();
2996
2997        let mut dag = DagBuilder::<u32>::new()
2998            .root(root, reg)
2999            .ok()
3000            .then(sink, reg)
3001            .build();
3002
3003        dag.run(&mut world, 0u32);
3004        assert_eq!(*world.resource::<u64>(), 0);
3005    }
3006
3007    #[test]
3008    fn dag_result_unwrap_or_else() {
3009        fn root(_x: u32) -> Result<u64, &'static str> {
3010            Err("fail")
3011        }
3012        fn sink(mut out: ResMut<u64>, val: &u64) {
3013            *out = *val;
3014        }
3015        let mut wb = WorldBuilder::new();
3016        wb.register::<u64>(0);
3017        let mut world = wb.build();
3018        let reg = world.registry();
3019
3020        let mut dag = DagBuilder::<u32>::new()
3021            .root(root, reg)
3022            .unwrap_or_else(|_err: &str| 42u64, reg)
3023            .then(sink, reg)
3024            .build();
3025
3026        dag.run(&mut world, 0u32);
3027        assert_eq!(*world.resource::<u64>(), 42);
3028    }
3029
3030    #[test]
3031    fn dag_result_map_err() {
3032        fn root(_x: u32) -> Result<u64, u32> {
3033            Err(5)
3034        }
3035        fn sink(mut out: ResMut<u64>, val: &Result<u64, String>) {
3036            *out = match val {
3037                Ok(v) => *v,
3038                Err(e) => e.len() as u64,
3039            };
3040        }
3041        let mut wb = WorldBuilder::new();
3042        wb.register::<u64>(0);
3043        let mut world = wb.build();
3044        let reg = world.registry();
3045
3046        let mut dag = DagBuilder::<u32>::new()
3047            .root(root, reg)
3048            .map_err(|e: u32| format!("err:{e}"), reg)
3049            .then(sink, reg)
3050            .build();
3051
3052        dag.run(&mut world, 0u32);
3053        // "err:5".len() == 5
3054        assert_eq!(*world.resource::<u64>(), 5);
3055    }
3056
3057    #[test]
3058    fn dag_arm_combinators() {
3059        fn root(x: u32) -> u64 {
3060            x as u64 + 10
3061        }
3062        fn arm_step(val: &u64) -> Option<u64> {
3063            if *val > 5 { Some(*val * 3) } else { None }
3064        }
3065        fn double(val: &u64) -> u64 {
3066            *val * 2
3067        }
3068        fn merge_fn(a: &u64, b: &u64) -> String {
3069            format!("{a},{b}")
3070        }
3071        fn sink(mut out: ResMut<String>, val: &String) {
3072            *out = val.clone();
3073        }
3074        let mut wb = WorldBuilder::new();
3075        wb.register::<String>(String::new());
3076        let mut world = wb.build();
3077        let reg = world.registry();
3078
3079        // Arm 0: root → arm_step (Option) → unwrap_or(0)
3080        // Arm 1: root → double
3081        let mut dag = DagBuilder::<u32>::new()
3082            .root(root, reg)
3083            .fork()
3084            .arm(|a| a.then(arm_step, reg).unwrap_or(0u64))
3085            .arm(|b| b.then(double, reg))
3086            .merge(merge_fn, reg)
3087            .then(sink, reg)
3088            .build();
3089
3090        dag.run(&mut world, 0u32);
3091        // root(0) = 10
3092        // arm0: 10 > 5 → Some(30) → unwrap → 30
3093        // arm1: 10 * 2 = 20
3094        assert_eq!(world.resource::<String>().as_str(), "30,20");
3095    }
3096
3097    #[test]
3098    fn dag_option_inspect() {
3099        fn root(_x: u32) -> Option<u64> {
3100            Some(42)
3101        }
3102        fn sink(mut out: ResMut<u64>, val: &Option<u64>) {
3103            *out = val.unwrap_or(0);
3104        }
3105        let mut wb = WorldBuilder::new();
3106        wb.register::<u64>(0);
3107        wb.register::<bool>(false);
3108        let mut world = wb.build();
3109        let reg = world.registry();
3110
3111        let mut dag = DagBuilder::<u32>::new()
3112            .root(root, reg)
3113            .inspect(
3114                |w: &mut World, _val: &u64| {
3115                    *w.resource_mut::<bool>() = true;
3116                },
3117                reg,
3118            )
3119            .then(sink, reg)
3120            .build();
3121
3122        dag.run(&mut world, 0u32);
3123        assert_eq!(*world.resource::<u64>(), 42);
3124        assert!(*world.resource::<bool>());
3125    }
3126
3127    // -- Guard combinator --
3128
3129    #[test]
3130    fn dag_guard_keeps() {
3131        fn root(x: u32) -> u64 {
3132            x as u64
3133        }
3134        fn sink(mut out: ResMut<u64>, val: &Option<u64>) {
3135            *out = val.unwrap_or(0);
3136        }
3137        let mut wb = WorldBuilder::new();
3138        wb.register::<u64>(0);
3139        let mut world = wb.build();
3140        let reg = world.registry();
3141
3142        let mut dag = DagBuilder::<u32>::new()
3143            .root(root, reg)
3144            .guard(|v: &u64| *v > 3, reg)
3145            .then(sink, reg)
3146            .build();
3147
3148        dag.run(&mut world, 5u32);
3149        assert_eq!(*world.resource::<u64>(), 5);
3150    }
3151
3152    #[test]
3153    fn dag_guard_drops() {
3154        fn root(x: u32) -> u64 {
3155            x as u64
3156        }
3157        fn sink(mut out: ResMut<u64>, val: &Option<u64>) {
3158            *out = val.unwrap_or(999);
3159        }
3160        let mut wb = WorldBuilder::new();
3161        wb.register::<u64>(0);
3162        let mut world = wb.build();
3163        let reg = world.registry();
3164
3165        let mut dag = DagBuilder::<u32>::new()
3166            .root(root, reg)
3167            .guard(|v: &u64| *v > 10, reg)
3168            .then(sink, reg)
3169            .build();
3170
3171        dag.run(&mut world, 5u32);
3172        assert_eq!(*world.resource::<u64>(), 999);
3173    }
3174
3175    #[test]
3176    fn dag_arm_guard() {
3177        fn root(x: u32) -> u64 {
3178            x as u64
3179        }
3180        fn double(val: &u64) -> u64 {
3181            *val * 2
3182        }
3183        fn merge_fn(a: &Option<u64>, b: &u64) -> String {
3184            format!("{:?},{}", a, b)
3185        }
3186        fn sink(mut out: ResMut<String>, val: &String) {
3187            *out = val.clone();
3188        }
3189        let mut wb = WorldBuilder::new();
3190        wb.register::<String>(String::new());
3191        let mut world = wb.build();
3192        let reg = world.registry();
3193
3194        // arm_a: guard drops (5 < 10), arm_b: runs normally
3195        let mut dag = DagBuilder::<u32>::new()
3196            .root(root, reg)
3197            .fork()
3198            .arm(|a| a.then(double, reg).guard(|v: &u64| *v > 100, reg))
3199            .arm(|b| b.then(double, reg))
3200            .merge(merge_fn, reg)
3201            .then(sink, reg)
3202            .build();
3203
3204        dag.run(&mut world, 5u32);
3205        // arm_a: 10, guard fails → None. arm_b: 10.
3206        assert_eq!(world.resource::<String>().as_str(), "None,10");
3207    }
3208
3209    // -- Tap combinator --
3210
3211    #[test]
3212    fn dag_tap_observes_without_changing() {
3213        fn root(x: u32) -> u64 {
3214            x as u64 * 2
3215        }
3216        fn sink(mut out: ResMut<u64>, val: &u64) {
3217            *out = *val;
3218        }
3219        let mut wb = WorldBuilder::new();
3220        wb.register::<u64>(0);
3221        wb.register::<bool>(false);
3222        let mut world = wb.build();
3223        let reg = world.registry();
3224
3225        let mut dag = DagBuilder::<u32>::new()
3226            .root(root, reg)
3227            .tap(
3228                |w: &mut World, val: &u64| {
3229                    // Side-effect: record that we observed the value.
3230                    *w.resource_mut::<bool>() = *val == 10;
3231                },
3232                reg,
3233            )
3234            .then(sink, reg)
3235            .build();
3236
3237        dag.run(&mut world, 5u32);
3238        assert_eq!(*world.resource::<u64>(), 10); // value passed through
3239        assert!(*world.resource::<bool>()); // tap fired
3240    }
3241
3242    #[test]
3243    fn dag_arm_tap() {
3244        fn root(x: u32) -> u64 {
3245            x as u64
3246        }
3247        fn double(val: &u64) -> u64 {
3248            *val * 2
3249        }
3250        fn merge_add(a: &u64, b: &u64) -> u64 {
3251            *a + *b
3252        }
3253        fn sink(mut out: ResMut<u64>, val: &u64) {
3254            *out = *val;
3255        }
3256        let mut wb = WorldBuilder::new();
3257        wb.register::<u64>(0);
3258        wb.register::<bool>(false);
3259        let mut world = wb.build();
3260        let reg = world.registry();
3261
3262        let mut dag = DagBuilder::<u32>::new()
3263            .root(root, reg)
3264            .fork()
3265            .arm(|a| {
3266                a.then(double, reg).tap(
3267                    |w: &mut World, _v: &u64| {
3268                        *w.resource_mut::<bool>() = true;
3269                    },
3270                    reg,
3271                )
3272            })
3273            .arm(|b| b.then(double, reg))
3274            .merge(merge_add, reg)
3275            .then(sink, reg)
3276            .build();
3277
3278        dag.run(&mut world, 5u32);
3279        // arm_a: 10, arm_b: 10, merge: 20
3280        assert_eq!(*world.resource::<u64>(), 20);
3281        assert!(*world.resource::<bool>()); // tap in arm_a fired
3282    }
3283
3284    // -- Route combinator --
3285
3286    #[test]
3287    fn dag_route_true_arm() {
3288        fn root(x: u32) -> u64 {
3289            x as u64
3290        }
3291        fn double(val: &u64) -> u64 {
3292            *val * 2
3293        }
3294        fn triple(val: &u64) -> u64 {
3295            *val * 3
3296        }
3297        fn sink(mut out: ResMut<u64>, val: &u64) {
3298            *out = *val;
3299        }
3300        let mut wb = WorldBuilder::new();
3301        wb.register::<u64>(0);
3302        let mut world = wb.build();
3303        let reg = world.registry();
3304
3305        let arm_t = DagArmSeed::new().then(double, reg);
3306        let arm_f = DagArmSeed::new().then(triple, reg);
3307
3308        let mut dag = DagBuilder::<u32>::new()
3309            .root(root, reg)
3310            .route(|v: &u64| *v > 3, reg, arm_t, arm_f)
3311            .then(sink, reg)
3312            .build();
3313
3314        dag.run(&mut world, 5u32); // 5 > 3 → true arm → double → 10
3315        assert_eq!(*world.resource::<u64>(), 10);
3316    }
3317
3318    #[test]
3319    fn dag_route_false_arm() {
3320        fn root(x: u32) -> u64 {
3321            x as u64
3322        }
3323        fn double(val: &u64) -> u64 {
3324            *val * 2
3325        }
3326        fn triple(val: &u64) -> u64 {
3327            *val * 3
3328        }
3329        fn sink(mut out: ResMut<u64>, val: &u64) {
3330            *out = *val;
3331        }
3332        let mut wb = WorldBuilder::new();
3333        wb.register::<u64>(0);
3334        let mut world = wb.build();
3335        let reg = world.registry();
3336
3337        let arm_t = DagArmSeed::new().then(double, reg);
3338        let arm_f = DagArmSeed::new().then(triple, reg);
3339
3340        let mut dag = DagBuilder::<u32>::new()
3341            .root(root, reg)
3342            .route(|v: &u64| *v > 10, reg, arm_t, arm_f)
3343            .then(sink, reg)
3344            .build();
3345
3346        dag.run(&mut world, 5u32); // 5 <= 10 → false arm → triple → 15
3347        assert_eq!(*world.resource::<u64>(), 15);
3348    }
3349
3350    #[test]
3351    fn dag_route_nested() {
3352        fn root(x: u32) -> u64 {
3353            x as u64
3354        }
3355        fn pass(val: &u64) -> u64 {
3356            *val
3357        }
3358        fn add_100(val: &u64) -> u64 {
3359            *val + 100
3360        }
3361        fn add_200(val: &u64) -> u64 {
3362            *val + 200
3363        }
3364        fn add_300(val: &u64) -> u64 {
3365            *val + 300
3366        }
3367        fn sink(mut out: ResMut<u64>, val: &u64) {
3368            *out = *val;
3369        }
3370        let mut wb = WorldBuilder::new();
3371        wb.register::<u64>(0);
3372        let mut world = wb.build();
3373        let reg = world.registry();
3374
3375        // N-ary via nesting: <5 → +100, 5..10 → +200, >=10 → +300
3376        let inner_t = DagArmSeed::new().then(add_200, reg);
3377        let inner_f = DagArmSeed::new().then(add_300, reg);
3378        let outer_t = DagArmSeed::new().then(add_100, reg);
3379        let outer_f =
3380            DagArmSeed::new()
3381                .then(pass, reg)
3382                .route(|v: &u64| *v < 10, reg, inner_t, inner_f);
3383
3384        let mut dag = DagBuilder::<u32>::new()
3385            .root(root, reg)
3386            .route(|v: &u64| *v < 5, reg, outer_t, outer_f)
3387            .then(sink, reg)
3388            .build();
3389
3390        dag.run(&mut world, 3u32); // 3 < 5 → +100 → 103
3391        assert_eq!(*world.resource::<u64>(), 103);
3392
3393        dag.run(&mut world, 7u32); // 7 >= 5, 7 < 10 → +200 → 207
3394        assert_eq!(*world.resource::<u64>(), 207);
3395
3396        dag.run(&mut world, 15u32); // 15 >= 5, 15 >= 10 → +300 → 315
3397        assert_eq!(*world.resource::<u64>(), 315);
3398    }
3399
3400    // -- Tee combinator --
3401
3402    #[test]
3403    fn dag_tee_side_effect_chain() {
3404        fn root(x: u32) -> u64 {
3405            x as u64 * 2
3406        }
3407        fn log_step(mut counter: ResMut<u32>, _val: &u64) {
3408            *counter += 1;
3409        }
3410        fn sink(mut out: ResMut<u64>, val: &u64) {
3411            *out = *val;
3412        }
3413        let mut wb = WorldBuilder::new();
3414        wb.register::<u64>(0);
3415        wb.register::<u32>(0);
3416        let mut world = wb.build();
3417        let reg = world.registry();
3418
3419        let side = DagArmSeed::new().then(log_step, reg);
3420
3421        let mut dag = DagBuilder::<u32>::new()
3422            .root(root, reg)
3423            .tee(side)
3424            .then(sink, reg)
3425            .build();
3426
3427        dag.run(&mut world, 5u32);
3428        assert_eq!(*world.resource::<u64>(), 10); // value passed through
3429        assert_eq!(*world.resource::<u32>(), 1); // side-effect fired
3430
3431        dag.run(&mut world, 7u32);
3432        assert_eq!(*world.resource::<u64>(), 14);
3433        assert_eq!(*world.resource::<u32>(), 2); // fired again
3434    }
3435
3436    // -- Dedup combinator --
3437
3438    #[test]
3439    fn dag_dedup_suppresses_unchanged() {
3440        fn root(x: u32) -> u64 {
3441            x as u64 / 2 // intentional integer division: 4→2, 5→2
3442        }
3443        fn sink(mut out: ResMut<u32>, val: &Option<u64>) {
3444            if val.is_some() {
3445                *out += 1;
3446            }
3447        }
3448        let mut wb = WorldBuilder::new();
3449        wb.register::<u32>(0);
3450        let mut world = wb.build();
3451        let reg = world.registry();
3452
3453        let mut dag = DagBuilder::<u32>::new()
3454            .root(root, reg)
3455            .dedup()
3456            .then(sink, reg)
3457            .build();
3458
3459        dag.run(&mut world, 4u32); // 2 — first, Some
3460        assert_eq!(*world.resource::<u32>(), 1);
3461
3462        dag.run(&mut world, 5u32); // 2 — same, None
3463        assert_eq!(*world.resource::<u32>(), 1);
3464
3465        dag.run(&mut world, 6u32); // 3 — changed, Some
3466        assert_eq!(*world.resource::<u32>(), 2);
3467    }
3468
3469    // -- Bool combinators --
3470
3471    #[test]
3472    fn dag_not() {
3473        fn root(x: u32) -> bool {
3474            x > 5
3475        }
3476        fn sink(mut out: ResMut<bool>, val: &bool) {
3477            *out = *val;
3478        }
3479        let mut wb = WorldBuilder::new();
3480        wb.register::<bool>(false);
3481        let mut world = wb.build();
3482        let reg = world.registry();
3483
3484        let mut dag = DagBuilder::<u32>::new()
3485            .root(root, reg)
3486            .not()
3487            .then(sink, reg)
3488            .build();
3489
3490        dag.run(&mut world, 3u32); // 3 > 5 = false, not = true
3491        assert!(*world.resource::<bool>());
3492
3493        dag.run(&mut world, 10u32); // 10 > 5 = true, not = false
3494        assert!(!*world.resource::<bool>());
3495    }
3496
3497    #[test]
3498    fn dag_and() {
3499        fn root(x: u32) -> bool {
3500            x > 5
3501        }
3502        fn sink(mut out: ResMut<bool>, val: &bool) {
3503            *out = *val;
3504        }
3505        let mut wb = WorldBuilder::new();
3506        wb.register::<bool>(true); // "market open" flag
3507        let mut world = wb.build();
3508        let reg = world.registry();
3509
3510        let mut dag = DagBuilder::<u32>::new()
3511            .root(root, reg)
3512            .and(|w: &mut World| *w.resource::<bool>(), reg)
3513            .then(sink, reg)
3514            .build();
3515
3516        dag.run(&mut world, 10u32); // true && true = true
3517        assert!(*world.resource::<bool>());
3518
3519        *world.resource_mut::<bool>() = false; // close market
3520        dag.run(&mut world, 10u32); // true && false = false
3521        assert!(!*world.resource::<bool>());
3522    }
3523
3524    #[test]
3525    fn dag_or() {
3526        fn root(x: u32) -> bool {
3527            x > 5
3528        }
3529        fn sink(mut out: ResMut<bool>, val: &bool) {
3530            *out = *val;
3531        }
3532        let mut wb = WorldBuilder::new();
3533        wb.register::<bool>(false);
3534        let mut world = wb.build();
3535        let reg = world.registry();
3536
3537        let mut dag = DagBuilder::<u32>::new()
3538            .root(root, reg)
3539            .or(|w: &mut World| *w.resource::<bool>(), reg)
3540            .then(sink, reg)
3541            .build();
3542
3543        dag.run(&mut world, 3u32); // false || false = false
3544        assert!(!*world.resource::<bool>());
3545
3546        *world.resource_mut::<bool>() = true;
3547        dag.run(&mut world, 3u32); // false || true = true
3548        assert!(*world.resource::<bool>());
3549    }
3550
3551    #[test]
3552    fn dag_xor() {
3553        fn root(x: u32) -> bool {
3554            x > 5
3555        }
3556        fn sink(mut out: ResMut<bool>, val: &bool) {
3557            *out = *val;
3558        }
3559        let mut wb = WorldBuilder::new();
3560        wb.register::<bool>(true);
3561        let mut world = wb.build();
3562        let reg = world.registry();
3563
3564        let mut dag = DagBuilder::<u32>::new()
3565            .root(root, reg)
3566            .xor(|w: &mut World| *w.resource::<bool>(), reg)
3567            .then(sink, reg)
3568            .build();
3569
3570        dag.run(&mut world, 10u32); // true ^ true = false
3571        assert!(!*world.resource::<bool>());
3572    }
3573
3574    // =========================================================================
3575    // Splat — tuple destructuring
3576    // =========================================================================
3577
3578    #[test]
3579    fn dag_splat2_on_chain() {
3580        let mut wb = WorldBuilder::new();
3581        wb.register::<u64>(0);
3582        let mut world = wb.build();
3583        let reg = world.registry();
3584
3585        fn split(x: u32) -> (u32, u32) {
3586            (x, x * 2)
3587        }
3588        fn store(mut out: ResMut<u64>, a: &u32, b: &u32) {
3589            *out = *a as u64 + *b as u64;
3590        }
3591
3592        let mut dag = DagBuilder::<u32>::new()
3593            .root(split, reg)
3594            .splat()
3595            .then(store, reg)
3596            .build();
3597
3598        dag.run(&mut world, 5u32);
3599        assert_eq!(*world.resource::<u64>(), 15); // 5 + 10
3600    }
3601
3602    #[test]
3603    fn dag_splat3_on_chain() {
3604        let mut wb = WorldBuilder::new();
3605        wb.register::<u64>(0);
3606        let mut world = wb.build();
3607        let reg = world.registry();
3608
3609        fn split3(x: u32) -> (u32, u32, u32) {
3610            (x, x + 1, x + 2)
3611        }
3612        fn sum3(a: &u32, b: &u32, c: &u32) -> u64 {
3613            *a as u64 + *b as u64 + *c as u64
3614        }
3615        fn store(mut out: ResMut<u64>, val: &u64) {
3616            *out = *val;
3617        }
3618
3619        let mut dag = DagBuilder::<u32>::new()
3620            .root(split3, reg)
3621            .splat()
3622            .then(sum3, reg)
3623            .then(store, reg)
3624            .build();
3625
3626        dag.run(&mut world, 10u32);
3627        assert_eq!(*world.resource::<u64>(), 33); // 10+11+12
3628    }
3629
3630    #[test]
3631    fn dag_splat2_with_param() {
3632        let mut wb = WorldBuilder::new();
3633        wb.register::<u64>(100);
3634        let mut world = wb.build();
3635        let reg = world.registry();
3636
3637        fn split(x: u32) -> (u32, u32) {
3638            (x, x * 3)
3639        }
3640        fn add_base(base: Res<u64>, a: &u32, b: &u32) -> u64 {
3641            *base + *a as u64 + *b as u64
3642        }
3643        fn store(mut out: ResMut<u64>, val: &u64) {
3644            *out = *val;
3645        }
3646
3647        let mut dag = DagBuilder::<u32>::new()
3648            .root(split, reg)
3649            .splat()
3650            .then(add_base, reg)
3651            .then(store, reg)
3652            .build();
3653
3654        dag.run(&mut world, 5u32);
3655        assert_eq!(*world.resource::<u64>(), 120); // 100 + 5 + 15
3656    }
3657
3658    #[test]
3659    fn dag_splat_on_arm_start() {
3660        let mut wb = WorldBuilder::new();
3661        wb.register::<u64>(0);
3662        let mut world = wb.build();
3663        let reg = world.registry();
3664
3665        fn split(x: u32) -> (u32, u32) {
3666            (x, x + 10)
3667        }
3668        fn sum2(a: &u32, b: &u32) -> u64 {
3669            *a as u64 + *b as u64
3670        }
3671        fn identity(x: &(u32, u32)) -> u64 {
3672            x.0 as u64 * x.1 as u64
3673        }
3674        fn merge_add(a: &u64, b: &u64) -> u64 {
3675            *a + *b
3676        }
3677        fn store(mut out: ResMut<u64>, val: &u64) {
3678            *out = *val;
3679        }
3680
3681        let mut dag = DagBuilder::<u32>::new()
3682            .root(split, reg)
3683            .fork()
3684            .arm(|a| a.splat().then(sum2, reg))
3685            .arm(|b| b.then(identity, reg))
3686            .merge(merge_add, reg)
3687            .then(store, reg)
3688            .build();
3689
3690        dag.run(&mut world, 5u32);
3691        // arm_a: splat (5, 15) → sum2 = 20
3692        // arm_b: identity (5, 15) → 75
3693        // merge: 20 + 75 = 95
3694        assert_eq!(*world.resource::<u64>(), 95);
3695    }
3696
3697    #[test]
3698    fn dag_splat_on_arm() {
3699        let mut wb = WorldBuilder::new();
3700        wb.register::<u64>(0);
3701        let mut world = wb.build();
3702        let reg = world.registry();
3703
3704        fn root_id(x: u32) -> u32 {
3705            x
3706        }
3707        fn make_pair(val: &u32) -> (u32, u32) {
3708            (*val, *val + 100)
3709        }
3710        fn sum2(a: &u32, b: &u32) -> u64 {
3711            *a as u64 + *b as u64
3712        }
3713        fn double(val: &u32) -> u64 {
3714            *val as u64 * 2
3715        }
3716        fn merge_add(a: &u64, b: &u64) -> u64 {
3717            *a + *b
3718        }
3719        fn store(mut out: ResMut<u64>, val: &u64) {
3720            *out = *val;
3721        }
3722
3723        let mut dag = DagBuilder::<u32>::new()
3724            .root(root_id, reg)
3725            .fork()
3726            .arm(|a| a.then(make_pair, reg).splat().then(sum2, reg))
3727            .arm(|b| b.then(double, reg))
3728            .merge(merge_add, reg)
3729            .then(store, reg)
3730            .build();
3731
3732        dag.run(&mut world, 7u32);
3733        // arm_a: make_pair(7) = (7, 107), splat → sum2 = 114
3734        // arm_b: double(7) = 14
3735        // merge: 114 + 14 = 128
3736        assert_eq!(*world.resource::<u64>(), 128);
3737    }
3738
3739    #[test]
3740    fn dag_splat4_on_chain() {
3741        let mut wb = WorldBuilder::new();
3742        wb.register::<u64>(0);
3743        let mut world = wb.build();
3744        let reg = world.registry();
3745
3746        fn split4(x: u32) -> (u32, u32, u32, u32) {
3747            (x, x + 1, x + 2, x + 3)
3748        }
3749        fn sum4(a: &u32, b: &u32, c: &u32, d: &u32) -> u64 {
3750            (*a + *b + *c + *d) as u64
3751        }
3752        fn store(mut out: ResMut<u64>, val: &u64) {
3753            *out = *val;
3754        }
3755
3756        let mut dag = DagBuilder::<u32>::new()
3757            .root(split4, reg)
3758            .splat()
3759            .then(sum4, reg)
3760            .then(store, reg)
3761            .build();
3762
3763        dag.run(&mut world, 10u32);
3764        assert_eq!(*world.resource::<u64>(), 46); // 10+11+12+13
3765    }
3766
3767    #[test]
3768    fn dag_splat5_on_chain() {
3769        let mut wb = WorldBuilder::new();
3770        wb.register::<u64>(0);
3771        let mut world = wb.build();
3772        let reg = world.registry();
3773
3774        fn split5(x: u32) -> (u8, u8, u8, u8, u8) {
3775            let x = x as u8;
3776            (x, x + 1, x + 2, x + 3, x + 4)
3777        }
3778        #[allow(clippy::many_single_char_names)]
3779        fn sum5(a: &u8, b: &u8, c: &u8, d: &u8, e: &u8) -> u64 {
3780            (*a as u64) + (*b as u64) + (*c as u64) + (*d as u64) + (*e as u64)
3781        }
3782        fn store(mut out: ResMut<u64>, val: &u64) {
3783            *out = *val;
3784        }
3785
3786        let mut dag = DagBuilder::<u32>::new()
3787            .root(split5, reg)
3788            .splat()
3789            .then(sum5, reg)
3790            .then(store, reg)
3791            .build();
3792
3793        dag.run(&mut world, 1u32);
3794        assert_eq!(*world.resource::<u64>(), 15); // 1+2+3+4+5
3795    }
3796
3797    #[test]
3798    fn dag_splat_boxable() {
3799        let mut wb = WorldBuilder::new();
3800        wb.register::<u64>(0);
3801        let mut world = wb.build();
3802        let reg = world.registry();
3803
3804        fn split(x: u32) -> (u32, u32) {
3805            (x, x * 2)
3806        }
3807        fn store(mut out: ResMut<u64>, a: &u32, b: &u32) {
3808            *out = *a as u64 + *b as u64;
3809        }
3810
3811        let dag = DagBuilder::<u32>::new()
3812            .root(split, reg)
3813            .splat()
3814            .then(store, reg)
3815            .build();
3816
3817        let mut boxed: Virtual<u32> = Box::new(dag);
3818        boxed.run(&mut world, 5u32);
3819        assert_eq!(*world.resource::<u64>(), 15);
3820    }
3821
3822    // -- Batch DAG --
3823
3824    #[test]
3825    fn batch_dag_basic() {
3826        let mut wb = WorldBuilder::new();
3827        wb.register::<u64>(0);
3828        let mut world = wb.build();
3829        let reg = world.registry();
3830
3831        fn double(x: u32) -> u64 {
3832            x as u64 * 2
3833        }
3834        fn store(mut out: ResMut<u64>, val: &u64) {
3835            *out += *val;
3836        }
3837
3838        let mut batch = DagBuilder::<u32>::new()
3839            .root(double, reg)
3840            .then(store, reg)
3841            .build_batch(8);
3842
3843        batch.input_mut().extend([1, 2, 3]);
3844        batch.run(&mut world);
3845
3846        assert_eq!(*world.resource::<u64>(), 12); // 2 + 4 + 6
3847        assert!(batch.input().is_empty());
3848    }
3849
3850    #[test]
3851    fn batch_dag_option_terminal() {
3852        let mut wb = WorldBuilder::new();
3853        wb.register::<u64>(0);
3854        let mut world = wb.build();
3855        let reg = world.registry();
3856
3857        fn double(x: u32) -> u64 {
3858            x as u64 * 2
3859        }
3860        fn store(mut out: ResMut<u64>, val: &u64) {
3861            *out += *val;
3862        }
3863
3864        let mut batch = DagBuilder::<u32>::new()
3865            .root(double, reg)
3866            .guard(|val: &u64| *val > 5, reg)
3867            .map(store, reg)
3868            .unwrap_or(())
3869            .build_batch(8);
3870
3871        batch.input_mut().extend([1, 2, 3, 4, 5]);
3872        batch.run(&mut world);
3873
3874        // double: 2, 4, 6, 8, 10
3875        // guard keeps > 5: 6, 8, 10
3876        assert_eq!(*world.resource::<u64>(), 24); // 6 + 8 + 10
3877    }
3878
3879    #[test]
3880    fn batch_dag_buffer_reuse() {
3881        let mut wb = WorldBuilder::new();
3882        wb.register::<u64>(0);
3883        let mut world = wb.build();
3884        let reg = world.registry();
3885
3886        fn double(x: u32) -> u64 {
3887            x as u64 * 2
3888        }
3889        fn store(mut out: ResMut<u64>, val: &u64) {
3890            *out += *val;
3891        }
3892
3893        let mut batch = DagBuilder::<u32>::new()
3894            .root(double, reg)
3895            .then(store, reg)
3896            .build_batch(8);
3897
3898        batch.input_mut().extend([1, 2]);
3899        batch.run(&mut world);
3900        assert_eq!(*world.resource::<u64>(), 6); // 2 + 4
3901        assert!(batch.input().is_empty());
3902
3903        batch.input_mut().extend([10, 20]);
3904        batch.run(&mut world);
3905        assert_eq!(*world.resource::<u64>(), 66); // 6 + 20 + 40
3906    }
3907
3908    #[test]
3909    fn batch_dag_retains_allocation() {
3910        let mut world = WorldBuilder::new().build();
3911        let reg = world.registry();
3912
3913        fn noop(_x: u32) {}
3914
3915        let mut batch = DagBuilder::<u32>::new().root(noop, reg).build_batch(64);
3916
3917        batch.input_mut().extend([1, 2, 3]);
3918        batch.run(&mut world);
3919
3920        assert!(batch.input().is_empty());
3921        assert!(batch.input_mut().capacity() >= 64);
3922    }
3923
3924    #[test]
3925    fn batch_dag_empty_is_noop() {
3926        let mut wb = WorldBuilder::new();
3927        wb.register::<u64>(0);
3928        let mut world = wb.build();
3929        let reg = world.registry();
3930
3931        fn double(x: u32) -> u64 {
3932            x as u64 * 2
3933        }
3934        fn store(mut out: ResMut<u64>, val: &u64) {
3935            *out += *val;
3936        }
3937
3938        let mut batch = DagBuilder::<u32>::new()
3939            .root(double, reg)
3940            .then(store, reg)
3941            .build_batch(8);
3942
3943        batch.run(&mut world);
3944        assert_eq!(*world.resource::<u64>(), 0);
3945    }
3946
3947    #[test]
3948    fn batch_dag_with_splat() {
3949        let mut wb = WorldBuilder::new();
3950        wb.register::<u64>(0);
3951        let mut world = wb.build();
3952        let reg = world.registry();
3953
3954        fn split(x: u32) -> (u64, u64) {
3955            (x as u64, x as u64 * 10)
3956        }
3957        fn combine(a: &u64, b: &u64) -> u64 {
3958            *a + *b
3959        }
3960        fn store(mut out: ResMut<u64>, val: &u64) {
3961            *out += *val;
3962        }
3963
3964        let mut batch = DagBuilder::<u32>::new()
3965            .root(split, reg)
3966            .splat()
3967            .then(combine, reg)
3968            .then(store, reg)
3969            .build_batch(4);
3970
3971        batch.input_mut().extend([1, 2]);
3972        batch.run(&mut world);
3973
3974        // 1 → (1, 10) → 11, 2 → (2, 20) → 22
3975        assert_eq!(*world.resource::<u64>(), 33); // 11 + 22
3976    }
3977
3978    // -- Conditional then (formerly switch) --
3979
3980    #[test]
3981    fn dag_then_conditional_basic() {
3982        fn root(x: u32) -> u64 {
3983            x as u64
3984        }
3985        fn sink(mut out: ResMut<u64>, val: &u64) {
3986            *out = *val;
3987        }
3988
3989        let mut wb = WorldBuilder::new();
3990        wb.register::<u64>(0);
3991        let mut world = wb.build();
3992        let reg = world.registry();
3993
3994        let mut dag = DagBuilder::<u32>::new()
3995            .root(root, reg)
3996            .then(|val: &u64| if *val > 5 { *val * 10 } else { *val + 1 }, reg)
3997            .then(sink, reg)
3998            .build();
3999
4000        dag.run(&mut world, 10u32); // 10 > 5 → 100
4001        assert_eq!(*world.resource::<u64>(), 100);
4002
4003        dag.run(&mut world, 3u32); // 3 <= 5 → 4
4004        assert_eq!(*world.resource::<u64>(), 4);
4005    }
4006
4007    #[test]
4008    fn dag_then_conditional_3_way() {
4009        fn root(x: u32) -> u32 {
4010            x
4011        }
4012        fn sink(mut out: ResMut<u64>, val: &u64) {
4013            *out = *val;
4014        }
4015
4016        let mut wb = WorldBuilder::new();
4017        wb.register::<u64>(0);
4018        let mut world = wb.build();
4019        let reg = world.registry();
4020
4021        let mut dag = DagBuilder::<u32>::new()
4022            .root(root, reg)
4023            .then(
4024                |val: &u32| match *val % 3 {
4025                    0 => *val as u64 + 100,
4026                    1 => *val as u64 + 200,
4027                    _ => *val as u64 + 300,
4028                },
4029                reg,
4030            )
4031            .then(sink, reg)
4032            .build();
4033
4034        dag.run(&mut world, 6u32); // 6 % 3 == 0 → 106
4035        assert_eq!(*world.resource::<u64>(), 106);
4036
4037        dag.run(&mut world, 7u32); // 7 % 3 == 1 → 207
4038        assert_eq!(*world.resource::<u64>(), 207);
4039
4040        dag.run(&mut world, 8u32); // 8 % 3 == 2 → 308
4041        assert_eq!(*world.resource::<u64>(), 308);
4042    }
4043
4044    #[test]
4045    fn dag_then_with_resolve_arm() {
4046        fn root(x: u32) -> u32 {
4047            x
4048        }
4049        fn double(val: &u32) -> u64 {
4050            *val as u64 * 2
4051        }
4052        fn triple(val: &u32) -> u64 {
4053            *val as u64 * 3
4054        }
4055        fn sink(mut out: ResMut<u64>, val: &u64) {
4056            *out = *val;
4057        }
4058
4059        let mut wb = WorldBuilder::new();
4060        wb.register::<u64>(0);
4061        let mut world = wb.build();
4062        let reg = world.registry();
4063
4064        let mut arm_even = resolve_arm(double, reg);
4065        let mut arm_odd = resolve_arm(triple, reg);
4066
4067        let mut dag = DagBuilder::<u32>::new()
4068            .root(root, reg)
4069            .then(
4070                move |world: &mut World, val: &u32| {
4071                    if *val % 2 == 0 {
4072                        arm_even(world, val)
4073                    } else {
4074                        arm_odd(world, val)
4075                    }
4076                },
4077                reg,
4078            )
4079            .then(sink, reg)
4080            .build();
4081
4082        dag.run(&mut world, 4u32); // even → double → 8
4083        assert_eq!(*world.resource::<u64>(), 8);
4084
4085        dag.run(&mut world, 5u32); // odd → triple → 15
4086        assert_eq!(*world.resource::<u64>(), 15);
4087    }
4088
4089    #[test]
4090    fn dag_resolve_arm_with_params() {
4091        fn root(x: u32) -> u32 {
4092            x
4093        }
4094        fn add_offset(offset: Res<i64>, val: &u32) -> u64 {
4095            (*offset + *val as i64) as u64
4096        }
4097        fn plain_double(val: &u32) -> u64 {
4098            *val as u64 * 2
4099        }
4100        fn sink(mut out: ResMut<u64>, val: &u64) {
4101            *out = *val;
4102        }
4103
4104        let mut wb = WorldBuilder::new();
4105        wb.register::<u64>(0);
4106        wb.register::<i64>(100);
4107        let mut world = wb.build();
4108        let reg = world.registry();
4109
4110        // Each arm resolves different params
4111        let mut arm_offset = resolve_arm(add_offset, reg);
4112        let mut arm_double = resolve_arm(plain_double, reg);
4113
4114        let mut dag = DagBuilder::<u32>::new()
4115            .root(root, reg)
4116            .then(
4117                move |world: &mut World, val: &u32| {
4118                    if *val > 10 {
4119                        arm_offset(world, val)
4120                    } else {
4121                        arm_double(world, val)
4122                    }
4123                },
4124                reg,
4125            )
4126            .then(sink, reg)
4127            .build();
4128
4129        dag.run(&mut world, 20u32); // > 10 → add_offset → 100 + 20 = 120
4130        assert_eq!(*world.resource::<u64>(), 120);
4131
4132        dag.run(&mut world, 5u32); // <= 10 → double → 10
4133        assert_eq!(*world.resource::<u64>(), 10);
4134    }
4135
4136    #[test]
4137    fn dag_then_conditional_in_fork_arm() {
4138        fn root(x: u32) -> u32 {
4139            x
4140        }
4141        fn pass(val: &u32) -> u32 {
4142            *val
4143        }
4144        fn sink_u64(mut out: ResMut<u64>, val: &u64) {
4145            *out = *val;
4146        }
4147        fn sink_i64(mut out: ResMut<i64>, val: &u32) {
4148            *out = -(*val as i64);
4149        }
4150
4151        let mut wb = WorldBuilder::new();
4152        wb.register::<u64>(0);
4153        wb.register::<i64>(0);
4154        let mut world = wb.build();
4155        let reg = world.registry();
4156
4157        let mut dag = DagBuilder::<u32>::new()
4158            .root(root, reg)
4159            .fork()
4160            .arm(|a| {
4161                a.then(pass, reg)
4162                    .then(
4163                        |val: &u32| {
4164                            if *val > 5 {
4165                                *val as u64 * 10
4166                            } else {
4167                                *val as u64
4168                            }
4169                        },
4170                        reg,
4171                    )
4172                    .then(sink_u64, reg)
4173            })
4174            .arm(|a| a.then(sink_i64, reg))
4175            .join()
4176            .build();
4177
4178        dag.run(&mut world, 10u32); // arm0: 10 > 5 → 100, arm1: -10
4179        assert_eq!(*world.resource::<u64>(), 100);
4180        assert_eq!(*world.resource::<i64>(), -10);
4181
4182        dag.run(&mut world, 3u32); // arm0: 3 <= 5 → 3, arm1: -3
4183        assert_eq!(*world.resource::<u64>(), 3);
4184        assert_eq!(*world.resource::<i64>(), -3);
4185    }
4186
4187    #[test]
4188    fn batch_dag_then_conditional() {
4189        fn root(x: u32) -> u32 {
4190            x
4191        }
4192        fn sink(mut out: ResMut<u64>, val: &u64) {
4193            *out += *val;
4194        }
4195
4196        let mut wb = WorldBuilder::new();
4197        wb.register::<u64>(0);
4198        let mut world = wb.build();
4199        let reg = world.registry();
4200
4201        let mut batch = DagBuilder::<u32>::new()
4202            .root(root, reg)
4203            .then(
4204                |val: &u32| {
4205                    if *val % 2 == 0 {
4206                        *val as u64 * 10
4207                    } else {
4208                        *val as u64
4209                    }
4210                },
4211                reg,
4212            )
4213            .then(sink, reg)
4214            .build_batch(8);
4215
4216        batch.input_mut().extend([1, 2, 3, 4]);
4217        batch.run(&mut world);
4218
4219        // 1 → 1, 2 → 20, 3 → 3, 4 → 40 = 64
4220        assert_eq!(*world.resource::<u64>(), 64);
4221    }
4222
4223    // =========================================================================
4224    // Scan combinator (DAG)
4225    // =========================================================================
4226
4227    #[test]
4228    fn dag_scan_arity0_closure() {
4229        let mut wb = WorldBuilder::new();
4230        wb.register::<u64>(0);
4231        let mut world = wb.build();
4232        let reg = world.registry();
4233
4234        fn store(mut out: ResMut<u64>, val: &u64) {
4235            *out = *val;
4236        }
4237
4238        let mut dag = DagBuilder::<u64>::new()
4239            .root(|x: u64| x, reg)
4240            .scan(
4241                0u64,
4242                |acc: &mut u64, val: &u64| {
4243                    *acc += val;
4244                    *acc
4245                },
4246                reg,
4247            )
4248            .then(store, reg)
4249            .build();
4250
4251        dag.run(&mut world, 10);
4252        assert_eq!(*world.resource::<u64>(), 10);
4253        dag.run(&mut world, 20);
4254        assert_eq!(*world.resource::<u64>(), 30);
4255        dag.run(&mut world, 5);
4256        assert_eq!(*world.resource::<u64>(), 35);
4257    }
4258
4259    #[test]
4260    fn dag_scan_named_fn_with_param() {
4261        let mut wb = WorldBuilder::new();
4262        wb.register::<u64>(100);
4263        wb.register::<String>(String::new());
4264        let mut world = wb.build();
4265        let reg = world.registry();
4266
4267        fn threshold(limit: Res<u64>, acc: &mut u64, val: &u64) -> Option<u64> {
4268            *acc += val;
4269            if *acc > *limit { Some(*acc) } else { None }
4270        }
4271        fn store_opt(mut out: ResMut<String>, val: &Option<u64>) {
4272            *out = val
4273                .as_ref()
4274                .map_or_else(|| "below".into(), |v| format!("hit:{v}"));
4275        }
4276
4277        let mut dag = DagBuilder::<u64>::new()
4278            .root(|x: u64| x, reg)
4279            .scan(0u64, threshold, reg)
4280            .then(store_opt, reg)
4281            .build();
4282
4283        dag.run(&mut world, 50);
4284        assert_eq!(world.resource::<String>().as_str(), "below");
4285        dag.run(&mut world, 60);
4286        assert_eq!(world.resource::<String>().as_str(), "hit:110");
4287    }
4288
4289    #[test]
4290    fn dag_arm_scan() {
4291        let mut wb = WorldBuilder::new();
4292        wb.register::<u64>(0);
4293        let mut world = wb.build();
4294        let reg = world.registry();
4295
4296        fn store(mut out: ResMut<u64>, val: &u64) {
4297            *out = *val;
4298        }
4299
4300        let scan_arm = DagArmSeed::<u64>::new()
4301            .then(|v: &u64| *v, reg)
4302            .scan(
4303                0u64,
4304                |acc: &mut u64, val: &u64| {
4305                    *acc += val;
4306                    *acc
4307                },
4308                reg,
4309            )
4310            .then(store, reg);
4311
4312        let pass_arm = DagArmSeed::<u64>::new().then(|_: &u64| {}, reg);
4313
4314        let mut dag = DagBuilder::<u64>::new()
4315            .root(|x: u64| x, reg)
4316            .fork()
4317            .arm(|_| scan_arm)
4318            .arm(|_| pass_arm)
4319            .merge(|(): &(), (): &()| {}, reg)
4320            .build();
4321
4322        dag.run(&mut world, 10);
4323        assert_eq!(*world.resource::<u64>(), 10);
4324        dag.run(&mut world, 20);
4325        assert_eq!(*world.resource::<u64>(), 30);
4326    }
4327
4328    // =========================================================================
4329    // Build — Option<()> terminal
4330    // =========================================================================
4331
4332    #[test]
4333    fn build_option_unit_terminal() {
4334        let mut wb = WorldBuilder::new();
4335        wb.register::<u64>(0);
4336        let mut world = wb.build();
4337        let reg = world.registry();
4338
4339        // root takes by value (IntoStep), then .guard() produces Option
4340        fn check(x: u32) -> u64 {
4341            x as u64
4342        }
4343        fn store(mut out: ResMut<u64>, val: &u64) {
4344            *out += *val;
4345        }
4346
4347        // guard → Option<u64>, map(store) → Option<()>, build() should work
4348        let mut dag = DagBuilder::<u32>::new()
4349            .root(check, reg)
4350            .guard(|val: &u64| *val > 5, reg)
4351            .map(store, reg)
4352            .build();
4353
4354        dag.run(&mut world, 3); // guard filters → None
4355        assert_eq!(*world.resource::<u64>(), 0);
4356        dag.run(&mut world, 7); // passes guard → stores 7
4357        assert_eq!(*world.resource::<u64>(), 7);
4358    }
4359
4360    // =========================================================================
4361    // Build — borrowed event type
4362    // =========================================================================
4363
4364    #[test]
4365    fn build_borrowed_event_direct() {
4366        let mut wb = WorldBuilder::new();
4367        wb.register::<u64>(0);
4368        let mut world = wb.build();
4369
4370        fn decode(msg: &[u8]) -> u64 {
4371            msg.len() as u64
4372        }
4373        fn store(mut out: ResMut<u64>, val: &u64) {
4374            *out = *val;
4375        }
4376
4377        // msg declared before dag so it outlives the DAG (drop order).
4378        let msg = vec![1u8, 2, 3];
4379        let reg = world.registry();
4380        let mut dag = DagBuilder::<&[u8]>::new()
4381            .root(decode, reg)
4382            .then(store, reg)
4383            .build();
4384
4385        dag.run(&mut world, &msg);
4386        assert_eq!(*world.resource::<u64>(), 3);
4387    }
4388}