Skip to main content

nexus_rt/
dag.rs

1// Builder return types use complex generics for compile-time edge validation.
2#![allow(clippy::type_complexity)]
3
4//! DAG pipeline — monomorphized data-flow graphs with fan-out and merge.
5//!
6//! [`DagStart`] begins a typed DAG that encodes topology in the type system.
7//! After monomorphization, the entire DAG is a single flat function with
8//! all values as stack locals — no arena, no vtable dispatch. The only
9//! `unsafe` is in the shared [`Param::fetch`](crate::Param) path
10//! (resource access by pre-resolved index).
11//!
12//! Nodes receive their input **by reference** — fan-out is free (multiple
13//! arms borrow the same stack local). Nodes produce owned output values
14//! passed to the next step.
15//!
16//! # When to use
17//!
18//! Use DAG pipelines when data needs to fan out to multiple arms and
19//! merge back. For linear chains, prefer [`PipelineStart`](crate::PipelineStart).
20//! For dynamic fan-out by reference, use [`FanOut`](crate::FanOut) or
21//! [`Broadcast`](crate::Broadcast).
22//!
23//! # Flow control
24//!
25//! Option and Result combinators (`.guard()`, `.map()`, `.and_then()`,
26//! `.filter()`, `.catch()`, etc.) work on both the main chain and
27//! within arms.
28//!
29//! **Within an arm**, `None` / `Err` short-circuits the remaining steps
30//! in **that arm only**. Sibling arms execute unconditionally. The merge
31//! step receives whatever each arm produced (including `None`).
32//!
33//! `.tap()` observes the value mid-chain without consuming or changing it.
34//!
35//! `.route()` is binary conditional routing — evaluates a predicate and
36//! executes exactly one of two arms. Both arms must produce the same
37//! output type. For N-ary routing, nest `route` calls.
38//!
39//! To skip an entire fork, resolve Option/Result **before** `.fork()`:
40//!
41//! ```ignore
42//! DagStart::<RawMsg>::new()
43//!     .root(decode, reg)
44//!     .guard(|_w, msg| msg.len() > 0)   // None skips everything below
45//!     .unwrap_or(default)                // → T, enter fork with concrete type
46//!     .fork()
47//!     // arms work with &T, not &Option<T>
48//! ```
49//!
50//! # Combinator quick reference
51//!
52//! **Topology:** `.root()`, `.then()`, `.fork()`, `.arm()`, `.merge()`,
53//! `.join()`, `.build()`
54//!
55//! **Flow control:** `.guard()`, `.tap()`, `.route()`, `.switch()`, `.tee()`,
56//! `.dedup()`
57//!
58//! **Tuple `(A, B, ...)` (2-5 elements):** `.splat()` (→ splat builder,
59//! call `.then()` with destructured `&T` args)
60//!
61//! **Option:** `.map()`, `.filter()`, `.inspect()`, `.and_then()`,
62//! `.on_none()`, `.ok_or()`, `.unwrap_or()`
63//!
64//! **Result:** `.map()`, `.and_then()`, `.catch()`, `.map_err()`,
65//! `.ok()`, `.unwrap_or()`
66//!
67//! **Bool:** `.not()`, `.and()`, `.or()`, `.xor()`
68//!
69//! **Terminal:** `.dispatch()`, `.cloned()`, `.build()`, `.build_batch(cap)`
70//!
71//! Pre-resolved (hot path): `.then()`, `.map()`, `.and_then()`, `.catch()`
72//!
73//! Closure-based (cold path): `.filter()`, `.inspect()`, `.tap()`,
74//! `.guard()`, `.route()`, `.switch()`, `.tee()`, `.dedup()`, `.on_none()`,
75//! `.map_err()`, `.ok()`, `.unwrap_or()`, `.dispatch()`, `.cloned()`
76//!
77//! # Splat — tuple destructuring
78//!
79//! When a step returns a tuple, the next step normally receives the
80//! whole tuple as `&(A, B)`. `.splat()` destructures it into individual
81//! reference arguments (`&A, &B`), reusing the existing merge step
82//! infrastructure:
83//!
84//! ```ignore
85//! fn split(t: Tick) -> (f64, u64) { (t.price, t.size) }
86//! fn weighted(price: &f64, size: &u64) -> f64 { *price * *size as f64 }
87//!
88//! DagStart::<Tick>::new()
89//!     .root(split, reg)       // Tick → (f64, u64)
90//!     .splat()                // destructure
91//!     .then(weighted, reg)    // (&f64, &u64) → f64
92//!     .build();
93//! ```
94//!
95//! Supported for tuples of 2-5 elements. Beyond 5, define a named
96//! struct — if a combinator stage needs that many arguments, a struct
97//! makes the intent clearer and the code more maintainable.
98//!
99//! # Node signatures
100//!
101//! The root node takes the event by value. All other nodes take their
102//! input by reference:
103//!
104//! ```ignore
105//! // Root: event by value
106//! fn decode(raw: RawMsg) -> DecodedMsg { .. }
107//!
108//! // Regular: input by reference
109//! fn update_ob(msg: &DecodedMsg) { .. }
110//! fn check_risk(config: Res<Config>, msg: &DecodedMsg) -> RiskResult { .. }
111//! ```
112//!
113//! # Examples
114//!
115//! ```
116//! use nexus_rt::{WorldBuilder, ResMut, Handler};
117//! use nexus_rt::dag::DagStart;
118//!
119//! let mut wb = WorldBuilder::new();
120//! wb.register::<u64>(0);
121//! let mut world = wb.build();
122//! let reg = world.registry();
123//!
124//! fn double(x: u32) -> u64 { x as u64 * 2 }
125//! fn store(mut out: ResMut<u64>, val: &u64) { *out = *val; }
126//!
127//! let mut dag = DagStart::<u32>::new()
128//!     .root(double, reg)
129//!     .then(store, reg)
130//!     .build();
131//!
132//! dag.run(&mut world, 5u32);
133//! assert_eq!(*world.resource::<u64>(), 10);
134//! ```
135
136use std::marker::PhantomData;
137
138use crate::Handler;
139use crate::pipeline::{IntoStep, StepCall};
140use crate::world::{Registry, World};
141
142// =============================================================================
143// MergeStepCall / IntoMergeStep — merge step dispatch
144// =============================================================================
145
/// Callable trait for resolved merge steps.
///
/// Like [`StepCall`] but for merge steps with multiple reference inputs
/// bundled as `Inputs` (e.g. `(&'a A, &'a B)`).
///
/// Implemented in this module for [`MergeStep`] at merge arities 2-5.
#[doc(hidden)]
pub trait MergeStepCall<Inputs, Out> {
    /// Call this merge step with a world reference and input references.
    ///
    /// `inputs` is the tuple of references being merged; the return value
    /// is the owned output produced by the merge function.
    fn call(&mut self, world: &mut World, inputs: Inputs) -> Out;
}
155
/// Converts a named function into a resolved merge step.
///
/// Params first, then N reference inputs, returns output:
///
/// ```ignore
/// fn check(config: Res<Config>, ob: &ObResult, risk: &RiskResult) -> Decision { .. }
/// ```
///
/// `Params` is the tuple of leading Param types (`()` when the function
/// takes only the reference inputs); it selects which impl applies.
#[doc(hidden)]
pub trait IntoMergeStep<Inputs, Out, Params> {
    /// The concrete resolved merge step type.
    type Step: MergeStepCall<Inputs, Out>;

    /// Resolve Param state from the registry and produce a merge step.
    ///
    /// Consumes `self`; the returned step owns the function together with
    /// its resolved state.
    fn into_merge_step(self, registry: &Registry) -> Self::Step;
}
171
/// Internal: pre-resolved merge step with cached Param state.
///
/// Built by [`IntoMergeStep::into_merge_step`] and invoked through
/// [`MergeStepCall::call`].
#[doc(hidden)]
pub struct MergeStep<F, Params: crate::handler::Param> {
    /// The user-supplied merge function or closure.
    f: F,
    /// Param state resolved once via `Param::init` and handed to
    /// `Param::fetch` on each call.
    state: Params::State,
    /// `std::any::type_name::<F>()`, captured at construction.
    // NOTE(review): nothing in the visible part of this file reads this
    // field, hence the `dead_code` allowance — confirm it is kept for
    // diagnostics.
    #[allow(dead_code)]
    name: &'static str,
}
180
181// -- Merge arity 2 -----------------------------------------------------------
182
183// Param arity 0: closures work.
184impl<A, B, Out, F> MergeStepCall<(&A, &B), Out> for MergeStep<F, ()>
185where
186    F: FnMut(&A, &B) -> Out + 'static,
187{
188    #[inline(always)]
189    fn call(&mut self, _world: &mut World, inputs: (&A, &B)) -> Out {
190        (self.f)(inputs.0, inputs.1)
191    }
192}
193
194impl<A, B, Out, F> IntoMergeStep<(&A, &B), Out, ()> for F
195where
196    F: FnMut(&A, &B) -> Out + 'static,
197{
198    type Step = MergeStep<F, ()>;
199
200    fn into_merge_step(self, registry: &Registry) -> Self::Step {
201        MergeStep {
202            f: self,
203            state: <() as crate::handler::Param>::init(registry),
204            name: std::any::type_name::<F>(),
205        }
206    }
207}
208
// Merge arity 2 with 1-8 leading Params (instantiated through `all_tuples!`).
//
// For one Param tuple `(P0, ..)` this emits:
//   * `MergeStepCall<(&A, &B), Out>` for `MergeStep<F, (P0, ..)>`
//   * `IntoMergeStep<(&A, &B), Out, (P0, ..)>` for `F`
macro_rules! impl_merge2_step {
    ($($P:ident),+) => {
        impl<A, B, Out, F: 'static, $($P: crate::handler::Param + 'static),+>
            MergeStepCall<(&A, &B), Out> for MergeStep<F, ($($P,)+)>
        where
            for<'a> &'a mut F:
                FnMut($($P,)+ &A, &B) -> Out +
                FnMut($($P::Item<'a>,)+ &A, &B) -> Out,
        {
            #[inline(always)]
            #[allow(non_snake_case)]
            fn call(&mut self, world: &mut World, inputs: (&A, &B)) -> Out {
                // Free-function shim: lets `&mut F` be passed as an
                // `impl FnMut` taking the fetched Param items by value.
                #[allow(clippy::too_many_arguments)]
                fn invoke<$($P,)+ In0, In1, Ret>(
                    mut func: impl FnMut($($P,)+ &In0, &In1) -> Ret,
                    $($P: $P,)+
                    in0: &In0,
                    in1: &In1,
                ) -> Ret {
                    func($($P,)+ in0, in1)
                }

                let (in0, in1) = inputs;

                #[cfg(debug_assertions)]
                world.clear_borrows();

                // SAFETY: NOTE(review) — confirm against the `Param::fetch`
                // contract. `self.state` comes from `Param::init` in
                // `into_merge_step`, which also ran `check_access` on the
                // resolved resource ids.
                let ($($P,)+) = unsafe {
                    <($($P,)+) as crate::handler::Param>::fetch(world, &mut self.state)
                };
                invoke(&mut self.f, $($P,)+ in0, in1)
            }
        }

        impl<A, B, Out, F: 'static, $($P: crate::handler::Param + 'static),+>
            IntoMergeStep<(&A, &B), Out, ($($P,)+)> for F
        where
            for<'a> &'a mut F:
                FnMut($($P,)+ &A, &B) -> Out +
                FnMut($($P::Item<'a>,)+ &A, &B) -> Out,
        {
            type Step = MergeStep<F, ($($P,)+)>;

            fn into_merge_step(self, registry: &Registry) -> Self::Step {
                let state = <($($P,)+) as crate::handler::Param>::init(registry);
                {
                    // Borrow each Param's state to report its resource id
                    // (paired with the Param type name) to the registry.
                    #[allow(non_snake_case)]
                    let ($($P,)+) = &state;
                    registry.check_access(&[
                        $((
                            <$P as crate::handler::Param>::resource_id($P),
                            std::any::type_name::<$P>(),
                        ),)+
                    ]);
                }
                let name = std::any::type_name::<F>();
                MergeStep { f: self, state, name }
            }
        }
    };
}
263
264// -- Merge arity 3 -----------------------------------------------------------
265
266impl<A, B, C, Out, F> MergeStepCall<(&A, &B, &C), Out> for MergeStep<F, ()>
267where
268    F: FnMut(&A, &B, &C) -> Out + 'static,
269{
270    #[inline(always)]
271    fn call(&mut self, _world: &mut World, inputs: (&A, &B, &C)) -> Out {
272        (self.f)(inputs.0, inputs.1, inputs.2)
273    }
274}
275
276impl<A, B, C, Out, F> IntoMergeStep<(&A, &B, &C), Out, ()> for F
277where
278    F: FnMut(&A, &B, &C) -> Out + 'static,
279{
280    type Step = MergeStep<F, ()>;
281
282    fn into_merge_step(self, registry: &Registry) -> Self::Step {
283        MergeStep {
284            f: self,
285            state: <() as crate::handler::Param>::init(registry),
286            name: std::any::type_name::<F>(),
287        }
288    }
289}
290
// Merge arity 3 with 1-8 leading Params (instantiated through `all_tuples!`).
macro_rules! impl_merge3_step {
    ($($P:ident),+) => {
        impl<A, B, C, Out, F: 'static, $($P: crate::handler::Param + 'static),+>
            MergeStepCall<(&A, &B, &C), Out> for MergeStep<F, ($($P,)+)>
        where
            for<'a> &'a mut F:
                FnMut($($P,)+ &A, &B, &C) -> Out +
                FnMut($($P::Item<'a>,)+ &A, &B, &C) -> Out,
        {
            #[inline(always)]
            #[allow(non_snake_case)]
            fn call(&mut self, world: &mut World, inputs: (&A, &B, &C)) -> Out {
                // Free-function shim: lets `&mut F` be passed as an
                // `impl FnMut` taking the fetched Param items by value.
                #[allow(clippy::too_many_arguments)]
                fn invoke<$($P,)+ In0, In1, In2, Ret>(
                    mut func: impl FnMut($($P,)+ &In0, &In1, &In2) -> Ret,
                    $($P: $P,)+
                    in0: &In0,
                    in1: &In1,
                    in2: &In2,
                ) -> Ret {
                    func($($P,)+ in0, in1, in2)
                }

                let (in0, in1, in2) = inputs;

                #[cfg(debug_assertions)]
                world.clear_borrows();

                // SAFETY: NOTE(review) — confirm against the `Param::fetch`
                // contract. `self.state` comes from `Param::init` in
                // `into_merge_step`, which also ran `check_access` on the
                // resolved resource ids.
                let ($($P,)+) = unsafe {
                    <($($P,)+) as crate::handler::Param>::fetch(world, &mut self.state)
                };
                invoke(&mut self.f, $($P,)+ in0, in1, in2)
            }
        }

        impl<A, B, C, Out, F: 'static, $($P: crate::handler::Param + 'static),+>
            IntoMergeStep<(&A, &B, &C), Out, ($($P,)+)> for F
        where
            for<'a> &'a mut F:
                FnMut($($P,)+ &A, &B, &C) -> Out +
                FnMut($($P::Item<'a>,)+ &A, &B, &C) -> Out,
        {
            type Step = MergeStep<F, ($($P,)+)>;

            fn into_merge_step(self, registry: &Registry) -> Self::Step {
                let state = <($($P,)+) as crate::handler::Param>::init(registry);
                {
                    // Borrow each Param's state to report its resource id
                    // (paired with the Param type name) to the registry.
                    #[allow(non_snake_case)]
                    let ($($P,)+) = &state;
                    registry.check_access(&[
                        $((
                            <$P as crate::handler::Param>::resource_id($P),
                            std::any::type_name::<$P>(),
                        ),)+
                    ]);
                }
                let name = std::any::type_name::<F>();
                MergeStep { f: self, state, name }
            }
        }
    };
}
344
345// -- Merge arity 4 -----------------------------------------------------------
346
347impl<A, B, C, D, Out, F> MergeStepCall<(&A, &B, &C, &D), Out> for MergeStep<F, ()>
348where
349    F: FnMut(&A, &B, &C, &D) -> Out + 'static,
350{
351    #[inline(always)]
352    fn call(&mut self, _world: &mut World, i: (&A, &B, &C, &D)) -> Out {
353        (self.f)(i.0, i.1, i.2, i.3)
354    }
355}
356
357impl<A, B, C, D, Out, F> IntoMergeStep<(&A, &B, &C, &D), Out, ()> for F
358where
359    F: FnMut(&A, &B, &C, &D) -> Out + 'static,
360{
361    type Step = MergeStep<F, ()>;
362    fn into_merge_step(self, registry: &Registry) -> Self::Step {
363        MergeStep {
364            f: self,
365            state: <() as crate::handler::Param>::init(registry),
366            name: std::any::type_name::<F>(),
367        }
368    }
369}
370
// Merge arity 4 with 1-8 leading Params (instantiated through `all_tuples!`).
macro_rules! impl_merge4_step {
    ($($P:ident),+) => {
        impl<A, B, C, D, Out, F: 'static, $($P: crate::handler::Param + 'static),+>
            MergeStepCall<(&A, &B, &C, &D), Out> for MergeStep<F, ($($P,)+)>
        where
            for<'a> &'a mut F:
                FnMut($($P,)+ &A, &B, &C, &D) -> Out +
                FnMut($($P::Item<'a>,)+ &A, &B, &C, &D) -> Out,
        {
            #[inline(always)]
            #[allow(non_snake_case)]
            fn call(&mut self, world: &mut World, inputs: (&A, &B, &C, &D)) -> Out {
                // Free-function shim: lets `&mut F` be passed as an
                // `impl FnMut` taking the fetched Param items by value.
                #[allow(clippy::too_many_arguments)]
                fn invoke<$($P,)+ In0, In1, In2, In3, Ret>(
                    mut func: impl FnMut($($P,)+ &In0, &In1, &In2, &In3) -> Ret,
                    $($P: $P,)+
                    in0: &In0,
                    in1: &In1,
                    in2: &In2,
                    in3: &In3,
                ) -> Ret {
                    func($($P,)+ in0, in1, in2, in3)
                }

                let (in0, in1, in2, in3) = inputs;

                #[cfg(debug_assertions)]
                world.clear_borrows();

                // SAFETY: NOTE(review) — confirm against the `Param::fetch`
                // contract. `self.state` comes from `Param::init` in
                // `into_merge_step`, which also ran `check_access` on the
                // resolved resource ids.
                let ($($P,)+) = unsafe {
                    <($($P,)+) as crate::handler::Param>::fetch(world, &mut self.state)
                };
                invoke(&mut self.f, $($P,)+ in0, in1, in2, in3)
            }
        }

        impl<A, B, C, D, Out, F: 'static, $($P: crate::handler::Param + 'static),+>
            IntoMergeStep<(&A, &B, &C, &D), Out, ($($P,)+)> for F
        where
            for<'a> &'a mut F:
                FnMut($($P,)+ &A, &B, &C, &D) -> Out +
                FnMut($($P::Item<'a>,)+ &A, &B, &C, &D) -> Out,
        {
            type Step = MergeStep<F, ($($P,)+)>;

            fn into_merge_step(self, registry: &Registry) -> Self::Step {
                let state = <($($P,)+) as crate::handler::Param>::init(registry);
                {
                    // Borrow each Param's state to report its resource id
                    // (paired with the Param type name) to the registry.
                    #[allow(non_snake_case)]
                    let ($($P,)+) = &state;
                    registry.check_access(&[
                        $((
                            <$P as crate::handler::Param>::resource_id($P),
                            std::any::type_name::<$P>(),
                        ),)+
                    ]);
                }
                let name = std::any::type_name::<F>();
                MergeStep { f: self, state, name }
            }
        }
    };
}
411
412// -- all_tuples! for param arities -------------------------------------------
413
// Invokes the callback macro `$m` once per Param arity, in increasing order:
// `$m!(P0);`, `$m!(P0, P1);`, ... up to `$m!(P0, .., P7);`.
//
// The `@grow` arms are internal: they move one identifier at a time from the
// pending list into the accumulated prefix and emit `$m!` for each prefix.
macro_rules! all_tuples {
    ($m:ident) => {
        all_tuples!(@grow $m [] P0 P1 P2 P3 P4 P5 P6 P7);
    };
    // Nothing pending: every arity has been emitted.
    (@grow $m:ident [$($done:ident)*]) => {};
    // Emit the next prefix, then recurse on what is left.
    (@grow $m:ident [$($done:ident)*] $next:ident $($rest:ident)*) => {
        $m!($($done,)* $next);
        all_tuples!(@grow $m [$($done)* $next] $($rest)*);
    };
}
426
427// -- Merge arity 5 -----------------------------------------------------------
428
429impl<A, B, C, D, E, Out, F> MergeStepCall<(&A, &B, &C, &D, &E), Out> for MergeStep<F, ()>
430where
431    F: FnMut(&A, &B, &C, &D, &E) -> Out + 'static,
432{
433    #[inline(always)]
434    fn call(&mut self, _world: &mut World, i: (&A, &B, &C, &D, &E)) -> Out {
435        (self.f)(i.0, i.1, i.2, i.3, i.4)
436    }
437}
438
439impl<A, B, C, D, E, Out, F> IntoMergeStep<(&A, &B, &C, &D, &E), Out, ()> for F
440where
441    F: FnMut(&A, &B, &C, &D, &E) -> Out + 'static,
442{
443    type Step = MergeStep<F, ()>;
444    fn into_merge_step(self, registry: &Registry) -> Self::Step {
445        MergeStep {
446            f: self,
447            state: <() as crate::handler::Param>::init(registry),
448            name: std::any::type_name::<F>(),
449        }
450    }
451}
452
// Merge arity 5 with 1-8 leading Params (instantiated through `all_tuples!`).
macro_rules! impl_merge5_step {
    ($($P:ident),+) => {
        impl<A, B, C, D, E, Out, F: 'static, $($P: crate::handler::Param + 'static),+>
            MergeStepCall<(&A, &B, &C, &D, &E), Out> for MergeStep<F, ($($P,)+)>
        where
            for<'a> &'a mut F:
                FnMut($($P,)+ &A, &B, &C, &D, &E) -> Out +
                FnMut($($P::Item<'a>,)+ &A, &B, &C, &D, &E) -> Out,
        {
            #[inline(always)]
            #[allow(non_snake_case)]
            fn call(&mut self, world: &mut World, inputs: (&A, &B, &C, &D, &E)) -> Out {
                // Free-function shim: lets `&mut F` be passed as an
                // `impl FnMut` taking the fetched Param items by value.
                #[allow(clippy::too_many_arguments)]
                fn invoke<$($P,)+ In0, In1, In2, In3, In4, Ret>(
                    mut func: impl FnMut($($P,)+ &In0, &In1, &In2, &In3, &In4) -> Ret,
                    $($P: $P,)+
                    in0: &In0,
                    in1: &In1,
                    in2: &In2,
                    in3: &In3,
                    in4: &In4,
                ) -> Ret {
                    func($($P,)+ in0, in1, in2, in3, in4)
                }

                let (in0, in1, in2, in3, in4) = inputs;

                #[cfg(debug_assertions)]
                world.clear_borrows();

                // SAFETY: NOTE(review) — confirm against the `Param::fetch`
                // contract. `self.state` comes from `Param::init` in
                // `into_merge_step`, which also ran `check_access` on the
                // resolved resource ids.
                let ($($P,)+) = unsafe {
                    <($($P,)+) as crate::handler::Param>::fetch(world, &mut self.state)
                };
                invoke(&mut self.f, $($P,)+ in0, in1, in2, in3, in4)
            }
        }

        impl<A, B, C, D, E, Out, F: 'static, $($P: crate::handler::Param + 'static),+>
            IntoMergeStep<(&A, &B, &C, &D, &E), Out, ($($P,)+)> for F
        where
            for<'a> &'a mut F:
                FnMut($($P,)+ &A, &B, &C, &D, &E) -> Out +
                FnMut($($P::Item<'a>,)+ &A, &B, &C, &D, &E) -> Out,
        {
            type Step = MergeStep<F, ($($P,)+)>;

            fn into_merge_step(self, registry: &Registry) -> Self::Step {
                let state = <($($P,)+) as crate::handler::Param>::init(registry);
                {
                    // Borrow each Param's state to report its resource id
                    // (paired with the Param type name) to the registry.
                    #[allow(non_snake_case)]
                    let ($($P,)+) = &state;
                    registry.check_access(&[
                        $((
                            <$P as crate::handler::Param>::resource_id($P),
                            std::any::type_name::<$P>(),
                        ),)+
                    ]);
                }
                let name = std::any::type_name::<F>();
                MergeStep { f: self, state, name }
            }
        }
    };
}
493
// Instantiate the Param-arity 1-8 impls for each merge arity (2 through 5).
// The Param-arity 0 impls are written out by hand alongside each macro.
all_tuples!(impl_merge2_step);
all_tuples!(impl_merge3_step);
all_tuples!(impl_merge4_step);
all_tuples!(impl_merge5_step);
498
499// =============================================================================
500// DAG — monomorphized, zero vtable dispatch
501// =============================================================================
502//
503// Encodes DAG topology in the type system at compile time. After
504// monomorphization, the entire DAG is a single flat function with all
// values as stack locals. No arena, no bitmap. The only `unsafe` is
// in the shared Param::fetch path (resource access by pre-resolved index).
507//
508// Fan-out: multiple nodes borrow the same stack local (no Clone).
509// Merge: merge step borrows all arm outputs.
510// Panic safety: stack unwinding drops all locals automatically.
511
/// Entry point for building a DAG pipeline.
///
/// The DAG encodes topology in the type system at compile time,
/// producing a single monomorphized closure chain. All values live as
/// stack locals in the `run()` body — no arena, no vtable dispatch.
/// The only `unsafe` is in the shared [`Param::fetch`](crate::Param)
/// path (resource access by pre-resolved index).
///
/// `DagStart` itself is a zero-sized marker for the event type `E`; call
/// [`root`](DagStart::root) to attach the first step.
///
/// # Examples
///
/// ```
/// use nexus_rt::{WorldBuilder, ResMut, Handler};
/// use nexus_rt::dag::DagStart;
///
/// let mut wb = WorldBuilder::new();
/// wb.register::<u64>(0);
/// let mut world = wb.build();
/// let reg = world.registry();
///
/// fn double(x: u32) -> u64 { x as u64 * 2 }
/// fn store(mut out: ResMut<u64>, val: &u64) { *out = *val; }
///
/// let mut dag = DagStart::<u32>::new()
///     .root(double, reg)
///     .then(store, reg)
///     .build();
///
/// dag.run(&mut world, 5u32);
/// assert_eq!(*world.resource::<u64>(), 10);
/// ```
// `PhantomData<fn(E)>` records `E` without storing a value of that type.
pub struct DagStart<E>(PhantomData<fn(E)>);
543
544impl<E: 'static> DagStart<E> {
545    /// Create a new typed DAG entry point.
546    pub fn new() -> Self {
547        Self(PhantomData)
548    }
549
550    /// Set the root step. Takes the event `E` by value, produces `Out`.
551    pub fn root<Out, Params, S>(
552        self,
553        f: S,
554        registry: &Registry,
555    ) -> DagChain<E, Out, impl FnMut(&mut World, E) -> Out + use<E, Out, Params, S>>
556    where
557        Out: 'static,
558        S: IntoStep<E, Out, Params>,
559    {
560        let mut resolved = f.into_step(registry);
561        DagChain {
562            chain: move |world: &mut World, event: E| resolved.call(world, event),
563            _marker: PhantomData,
564        }
565    }
566}
567
568impl<E: 'static> Default for DagStart<E> {
569    fn default() -> Self {
570        Self::new()
571    }
572}
573
/// Main chain builder for a typed DAG.
///
/// `Chain` is `FnMut(&mut World, E) -> Out` — the monomorphized closure
/// representing all steps composed so far.
pub struct DagChain<E, Out, Chain> {
    /// Closure composed from every step added so far.
    chain: Chain,
    /// Zero-sized marker recording the event and output types without
    /// storing a value of either.
    _marker: PhantomData<fn(E) -> Out>,
}
582
583impl<E: 'static, Out: 'static, Chain> DagChain<E, Out, Chain>
584where
585    Chain: FnMut(&mut World, E) -> Out,
586{
587    /// Enter fork mode. Subsequent `.arm()` calls add parallel branches.
588    pub fn fork(self) -> DagChainFork<E, Out, Chain, ()> {
589        DagChainFork {
590            chain: self.chain,
591            arms: (),
592            _marker: PhantomData,
593        }
594    }
595}
596
597impl<E: 'static, Chain> DagChain<E, (), Chain>
598where
599    Chain: FnMut(&mut World, E) + Send + 'static,
600{
601    /// Finalize into a [`Dag`](crate::Dag) that implements [`Handler<E>`].
602    ///
603    /// Only available when the chain ends with `()`. If your DAG
604    /// produces a value, add a final `.then()` that consumes the output.
605    pub fn build(self) -> Dag<E, Chain> {
606        Dag {
607            chain: self.chain,
608            _marker: PhantomData,
609        }
610    }
611}
612
/// Arm builder seed. Used in `.arm()` closures and to build arms for
/// [`.route()`](DagChain::route).
///
/// Call `.then()` to add the first step in this arm.
///
/// Zero-sized: holds only a marker for the arm's input type `In`.
pub struct DagArmStart<In>(PhantomData<fn(*const In)>);
618
619impl<In: 'static> DagArmStart<In> {
620    /// Create a new arm builder seed.
621    ///
622    /// Used to build arms passed to [`DagChain::route`] or
623    /// [`DagArm::route`]:
624    ///
625    /// ```ignore
626    /// let fast = DagArmStart::new().then(fast_path, reg);
627    /// let slow = DagArmStart::new().then(slow_path, reg);
628    /// dag.route(predicate, fast, slow)
629    /// ```
630    pub fn new() -> Self {
631        Self(PhantomData)
632    }
633}
634
635impl<In: 'static> Default for DagArmStart<In> {
636    fn default() -> Self {
637        Self::new()
638    }
639}
640
641impl<In: 'static> DagArmStart<In> {
642    /// Add the first step in this arm. Takes `&In` by reference.
643    pub fn then<Out, Params, S>(
644        self,
645        f: S,
646        registry: &Registry,
647    ) -> DagArm<In, Out, impl FnMut(&mut World, &In) -> Out + use<In, Out, Params, S>>
648    where
649        Out: 'static,
650        S: IntoStep<&'static In, Out, Params>,
651        S::Step: for<'a> StepCall<&'a In, Out>,
652    {
653        let mut resolved = f.into_step(registry);
654        DagArm {
655            chain: move |world: &mut World, input: &In| resolved.call(world, input),
656            _marker: PhantomData,
657        }
658    }
659}
660
/// Built arm in a typed DAG fork.
///
/// `Chain` is `FnMut(&mut World, &In) -> Out` — the monomorphized
/// closure for this arm's steps.
pub struct DagArm<In, Out, Chain> {
    /// Closure for this arm's steps. Crate-visible; within this file the
    /// `route` combinator moves it out of the arms it is given.
    pub(crate) chain: Chain,
    /// Zero-sized marker recording the input and output types without
    /// storing a value of either.
    _marker: PhantomData<fn(*const In) -> Out>,
}
669
670impl<In: 'static, Out: 'static, Chain> DagArm<In, Out, Chain>
671where
672    Chain: FnMut(&mut World, &In) -> Out,
673{
674    /// Enter fork mode within this arm.
675    pub fn fork(self) -> DagArmFork<In, Out, Chain, ()> {
676        DagArmFork {
677            chain: self.chain,
678            arms: (),
679            _marker: PhantomData,
680        }
681    }
682}
683
/// Fork builder on the main chain. Accumulates arms as a tuple.
pub struct DagChainFork<E, ForkOut, Chain, Arms> {
    /// Upstream chain whose output (`ForkOut`) the arms will borrow.
    chain: Chain,
    /// Arms collected so far; `()` right after `DagChain::fork`.
    arms: Arms,
    /// Zero-sized marker recording the event and fork-point output types.
    _marker: PhantomData<fn(E) -> ForkOut>,
}
690
/// Fork builder within an arm. Accumulates sub-arms as a tuple.
pub struct DagArmFork<In, ForkOut, Chain, Arms> {
    /// The enclosing arm's chain, whose output (`ForkOut`) the sub-arms
    /// will borrow.
    chain: Chain,
    /// Sub-arms collected so far; `()` right after `DagArm::fork`.
    arms: Arms,
    /// Zero-sized marker recording the arm input and fork-point output types.
    _marker: PhantomData<fn(*const In) -> ForkOut>,
}
697
/// Final built DAG. Implements [`Handler<E>`].
///
/// Created by [`DagChain::build`]. The entire DAG is monomorphized
/// at compile time — no boxing, no virtual dispatch, no arena.
/// For batch processing, see [`BatchDag`].
pub struct Dag<E, Chain> {
    /// The fully composed chain; invoked once per event by `Handler::run`.
    chain: Chain,
    /// Zero-sized marker recording the event type without storing one.
    _marker: PhantomData<fn(E)>,
}
707
708impl<E: 'static, Chain> Handler<E> for Dag<E, Chain>
709where
710    Chain: FnMut(&mut World, E) + Send + 'static,
711{
712    fn run(&mut self, world: &mut World, event: E) {
713        (self.chain)(world, event);
714    }
715
716    fn name(&self) -> &'static str {
717        "dag::Dag"
718    }
719}
720
721// =============================================================================
722// Fork arity macro — arm accumulation, merge, join
723// =============================================================================
724
725// =============================================================================
726// Combinator macro — shared between DagChain and DagArm
727// =============================================================================
728
729/// Generates step combinators, Option/Result helpers, and clone helpers.
730///
731/// DagChain and DagArm differ only in how the upstream chain is
732/// called (by value vs by reference). This macro generates identical
733/// combinator sets for both.
734///
735/// All `IntoStep`-based methods resolve steps with `&T` input (DAG
736/// semantics — every step borrows its input, never consumes it).
737macro_rules! impl_dag_combinators {
738    (
739        builder: $Builder:ident,
740        upstream: $U:ident,
741        chain_input: $chain_input:ty,
742        param: $pname:ident : $pty:ty
743    ) => {
744        // =============================================================
745        // Core — any Out
746        // =============================================================
747
748        impl<$U: 'static, Out: 'static, Chain> $Builder<$U, Out, Chain>
749        where
750            Chain: FnMut(&mut World, $chain_input) -> Out,
751        {
752            /// Append a step. The step receives `&Out` by reference.
753            pub fn then<NewOut, Params, S>(
754                self,
755                f: S,
756                registry: &Registry,
757            ) -> $Builder<
758                $U,
759                NewOut,
760                impl FnMut(&mut World, $pty) -> NewOut
761                    + use<$U, Out, NewOut, Params, Chain, S>,
762            >
763            where
764                NewOut: 'static,
765                S: IntoStep<&'static Out, NewOut, Params>,
766                S::Step: for<'a> StepCall<&'a Out, NewOut>,
767            {
768                let mut chain = self.chain;
769                let mut resolved = f.into_step(registry);
770                $Builder {
771                    chain: move |world: &mut World, $pname: $pty| {
772                        let out = chain(world, $pname);
773                        resolved.call(world, &out)
774                    },
775                    _marker: PhantomData,
776                }
777            }
778
779            /// Dispatch output to a [`Handler<Out>`].
780            ///
781            /// Feeds the chain's output into any handler —
782            /// [`HandlerFn`](crate::HandlerFn), [`Callback`](crate::Callback),
783            /// [`Pipeline`](crate::Pipeline), etc.
784            pub fn dispatch<H: Handler<Out>>(
785                self,
786                mut handler: H,
787            ) -> $Builder<$U, (), impl FnMut(&mut World, $pty) + use<$U, Out, Chain, H>>
788            {
789                let mut chain = self.chain;
790                $Builder {
791                    chain: move |world: &mut World, $pname: $pty| {
792                        let out = chain(world, $pname);
793                        handler.run(world, out);
794                    },
795                    _marker: PhantomData,
796                }
797            }
798
799            /// Conditionally wrap the output in `Option`. `Some(val)` if
800            /// the predicate returns true, `None` otherwise.
801            ///
802            /// Enters Option-combinator land — follow with `.map()`,
803            /// `.and_then()`, `.filter()`, `.unwrap_or()`, etc.
804            ///
805            /// Within a DAG arm, `None` short-circuits the remaining arm
806            /// steps — sibling arms and the merge step still execute.
807            pub fn guard(
808                self,
809                mut f: impl FnMut(&mut World, &Out) -> bool,
810            ) -> $Builder<
811                $U,
812                Option<Out>,
813                impl FnMut(&mut World, $pty) -> Option<Out>,
814            > {
815                let mut chain = self.chain;
816                $Builder {
817                    chain: move |world: &mut World, $pname: $pty| {
818                        let val = chain(world, $pname);
819                        if f(world, &val) { Some(val) } else { None }
820                    },
821                    _marker: PhantomData,
822                }
823            }
824
825            /// Observe the current value without consuming or changing it.
826            ///
827            /// The closure receives `&mut World` and `&Out`. The value passes
828            /// through unchanged. Useful for logging, metrics, or debugging
829            /// mid-chain.
830            pub fn tap(
831                self,
832                mut f: impl FnMut(&mut World, &Out),
833            ) -> $Builder<$U, Out, impl FnMut(&mut World, $pty) -> Out> {
834                let mut chain = self.chain;
835                $Builder {
836                    chain: move |world: &mut World, $pname: $pty| {
837                        let val = chain(world, $pname);
838                        f(world, &val);
839                        val
840                    },
841                    _marker: PhantomData,
842                }
843            }
844
845            /// Binary conditional routing. Evaluates the predicate on the
846            /// current value, then executes exactly one of two arms.
847            ///
848            /// Both arms receive the value by reference (same as fork arms)
849            /// and must produce the same output type. Build arms with
850            /// [`DagArmStart::new()`] and pass them in. For N-ary routing,
851            /// nest `route` calls in the false arm.
852            ///
853            /// ```ignore
854            /// let fast = DagArmStart::new().then(fast_path, reg).then(store, reg);
855            /// let slow = DagArmStart::new().then(slow_path, reg).then(store, reg);
856            ///
857            /// DagStart::<Order>::new()
858            ///     .root(decode, reg)
859            ///     .route(|_, order| order.priority > 5, fast, slow)
860            ///     .build();
861            /// ```
862            pub fn route<NewOut, C0, C1, P>(
863                self,
864                mut pred: P,
865                on_true: DagArm<Out, NewOut, C0>,
866                on_false: DagArm<Out, NewOut, C1>,
867            ) -> $Builder<
868                $U,
869                NewOut,
870                impl FnMut(&mut World, $pty) -> NewOut + use<$U, Out, NewOut, Chain, C0, C1, P>,
871            >
872            where
873                P: FnMut(&mut World, &Out) -> bool,
874                C0: FnMut(&mut World, &Out) -> NewOut,
875                C1: FnMut(&mut World, &Out) -> NewOut,
876            {
877                let mut chain = self.chain;
878                let mut c0 = on_true.chain;
879                let mut c1 = on_false.chain;
880                $Builder {
881                    chain: move |world: &mut World, $pname: $pty| {
882                        let val = chain(world, $pname);
883                        if pred(world, &val) {
884                            c0(world, &val)
885                        } else {
886                            c1(world, &val)
887                        }
888                    },
889                    _marker: PhantomData,
890                }
891            }
892
893            /// Fork off a multi-step side-effect chain. The arm borrows
894            /// `&Out`, runs to completion (producing `()`), and the
895            /// original value passes through unchanged.
896            ///
897            /// Multi-step version of [`tap`](Self::tap) — the arm has the
898            /// full combinator API with Param resolution. Build with
899            /// [`DagArmStart::new()`].
900            pub fn tee<C>(
901                self,
902                side: DagArm<Out, (), C>,
903            ) -> $Builder<$U, Out, impl FnMut(&mut World, $pty) -> Out>
904            where
905                C: FnMut(&mut World, &Out),
906            {
907                let mut chain = self.chain;
908                let mut side_chain = side.chain;
909                $Builder {
910                    chain: move |world: &mut World, $pname: $pty| {
911                        let val = chain(world, $pname);
912                        side_chain(world, &val);
913                        val
914                    },
915                    _marker: PhantomData,
916                }
917            }
918
919            /// Closure-based step for N-ary conditional routing.
920            ///
921            /// The closure receives `&mut World` and `&Out`, returning a new
922            /// value of type `NewOut`. Use with [`resolve_arm`] to pre-resolve
923            /// per-arm steps with independent [`Param`](crate::Param) sets:
924            ///
925            /// ```ignore
926            /// let mut arm_new    = resolve_arm(handle_new, reg);
927            /// let mut arm_cancel = resolve_arm(handle_cancel, reg);
928            ///
929            /// dag.switch(move |world, msg: &Decoded| match msg.kind {
930            ///     MsgKind::NewOrder => arm_new(world, msg),
931            ///     MsgKind::Cancel   => arm_cancel(world, msg),
932            /// })
933            /// ```
934            ///
935            /// For simple cases where all arms share the same params, a named
936            /// function with a `match` body passed to [`.then()`](Self::then)
937            /// is sufficient.
938            pub fn switch<NewOut>(
939                self,
940                mut f: impl FnMut(&mut World, &Out) -> NewOut,
941            ) -> $Builder<$U, NewOut, impl FnMut(&mut World, $pty) -> NewOut> {
942                let mut chain = self.chain;
943                $Builder {
944                    chain: move |world: &mut World, $pname: $pty| {
945                        let val = chain(world, $pname);
946                        f(world, &val)
947                    },
948                    _marker: PhantomData,
949                }
950            }
951        }
952
953        // =============================================================
954        // Dedup — suppress unchanged values
955        // =============================================================
956
957        impl<$U: 'static, Out: PartialEq + Clone, Chain> $Builder<$U, Out, Chain>
958        where
959            Chain: FnMut(&mut World, $chain_input) -> Out,
960        {
961            /// Suppress consecutive unchanged values. Returns `Some(val)`
962            /// when the value differs from the previous invocation, `None`
963            /// when unchanged. First invocation always returns `Some`.
964            ///
965            /// Requires `PartialEq + Clone` — the previous value is stored
966            /// internally for comparison.
967            pub fn dedup(
968                self,
969            ) -> $Builder<$U, Option<Out>, impl FnMut(&mut World, $pty) -> Option<Out>> {
970                let mut chain = self.chain;
971                let mut prev: Option<Out> = None;
972                $Builder {
973                    chain: move |world: &mut World, $pname: $pty| {
974                        let val = chain(world, $pname);
975                        if prev.as_ref() == Some(&val) {
976                            None
977                        } else {
978                            prev = Some(val.clone());
979                            Some(val)
980                        }
981                    },
982                    _marker: PhantomData,
983                }
984            }
985        }
986
987        // =============================================================
988        // Bool combinators
989        // =============================================================
990
991        impl<$U: 'static, Chain> $Builder<$U, bool, Chain>
992        where
993            Chain: FnMut(&mut World, $chain_input) -> bool,
994        {
995            /// Invert a boolean value.
996            #[allow(clippy::should_implement_trait)]
997            pub fn not(
998                self,
999            ) -> $Builder<$U, bool, impl FnMut(&mut World, $pty) -> bool> {
1000                let mut chain = self.chain;
1001                $Builder {
1002                    chain: move |world: &mut World, $pname: $pty| {
1003                        !chain(world, $pname)
1004                    },
1005                    _marker: PhantomData,
1006                }
1007            }
1008
1009            /// Short-circuit AND with a second boolean from World state.
1010            ///
1011            /// If the chain produces `false`, the closure is not called.
1012            pub fn and(
1013                self,
1014                mut f: impl FnMut(&mut World) -> bool,
1015            ) -> $Builder<$U, bool, impl FnMut(&mut World, $pty) -> bool> {
1016                let mut chain = self.chain;
1017                $Builder {
1018                    chain: move |world: &mut World, $pname: $pty| {
1019                        chain(world, $pname) && f(world)
1020                    },
1021                    _marker: PhantomData,
1022                }
1023            }
1024
1025            /// Short-circuit OR with a second boolean from World state.
1026            ///
1027            /// If the chain produces `true`, the closure is not called.
1028            pub fn or(
1029                self,
1030                mut f: impl FnMut(&mut World) -> bool,
1031            ) -> $Builder<$U, bool, impl FnMut(&mut World, $pty) -> bool> {
1032                let mut chain = self.chain;
1033                $Builder {
1034                    chain: move |world: &mut World, $pname: $pty| {
1035                        chain(world, $pname) || f(world)
1036                    },
1037                    _marker: PhantomData,
1038                }
1039            }
1040
1041            /// XOR with a second boolean from World state.
1042            ///
1043            /// Both sides are always evaluated.
1044            pub fn xor(
1045                self,
1046                mut f: impl FnMut(&mut World) -> bool,
1047            ) -> $Builder<$U, bool, impl FnMut(&mut World, $pty) -> bool> {
1048                let mut chain = self.chain;
1049                $Builder {
1050                    chain: move |world: &mut World, $pname: $pty| {
1051                        chain(world, $pname) ^ f(world)
1052                    },
1053                    _marker: PhantomData,
1054                }
1055            }
1056        }
1057
1058        // =============================================================
1059        // Clone helpers — &T → T transitions
1060        // =============================================================
1061
1062        impl<'a, $U: 'static, T: Clone, Chain> $Builder<$U, &'a T, Chain>
1063        where
1064            Chain: FnMut(&mut World, $chain_input) -> &'a T,
1065        {
1066            /// Clone a borrowed output to produce an owned value.
1067            ///
1068            /// Uses UFCS (`T::clone(val)`) — `val.clone()` on `&&T`
1069            /// resolves to `<&T as Clone>::clone`, returning `&T` not `T`.
1070            pub fn cloned(
1071                self,
1072            ) -> $Builder<$U, T, impl FnMut(&mut World, $pty) -> T> {
1073                let mut chain = self.chain;
1074                $Builder {
1075                    chain: move |world: &mut World, $pname: $pty| {
1076                        T::clone(chain(world, $pname))
1077                    },
1078                    _marker: PhantomData,
1079                }
1080            }
1081        }
1082
1083        impl<'a, $U: 'static, T: Clone, Chain> $Builder<$U, Option<&'a T>, Chain>
1084        where
1085            Chain: FnMut(&mut World, $chain_input) -> Option<&'a T>,
1086        {
1087            /// Clone inner borrowed value. `Option<&T>` → `Option<T>`.
1088            pub fn cloned(
1089                self,
1090            ) -> $Builder<$U, Option<T>, impl FnMut(&mut World, $pty) -> Option<T>>
1091            {
1092                let mut chain = self.chain;
1093                $Builder {
1094                    chain: move |world: &mut World, $pname: $pty| {
1095                        chain(world, $pname).cloned()
1096                    },
1097                    _marker: PhantomData,
1098                }
1099            }
1100        }
1101
1102        impl<'a, $U: 'static, T: Clone, Err, Chain>
1103            $Builder<$U, Result<&'a T, Err>, Chain>
1104        where
1105            Chain: FnMut(&mut World, $chain_input) -> Result<&'a T, Err>,
1106        {
1107            /// Clone inner borrowed Ok value.
1108            /// `Result<&T, Err>` → `Result<T, Err>`.
1109            pub fn cloned(
1110                self,
1111            ) -> $Builder<
1112                $U,
1113                Result<T, Err>,
1114                impl FnMut(&mut World, $pty) -> Result<T, Err>,
1115            > {
1116                let mut chain = self.chain;
1117                $Builder {
1118                    chain: move |world: &mut World, $pname: $pty| {
1119                        chain(world, $pname).cloned()
1120                    },
1121                    _marker: PhantomData,
1122                }
1123            }
1124        }
1125
1126        // =============================================================
1127        // Option helpers — $Builder<$U, Option<T>, Chain>
1128        // =============================================================
1129
1130        impl<$U: 'static, T: 'static, Chain> $Builder<$U, Option<T>, Chain>
1131        where
1132            Chain: FnMut(&mut World, $chain_input) -> Option<T>,
1133        {
1134            // -- IntoStep-based (hot path) --------------------------------
1135
1136            /// Transform the inner value. Step not called on None.
1137            pub fn map<U, Params, S: IntoStep<&'static T, U, Params>>(
1138                self,
1139                f: S,
1140                registry: &Registry,
1141            ) -> $Builder<
1142                $U,
1143                Option<U>,
1144                impl FnMut(&mut World, $pty) -> Option<U>
1145                    + use<$U, T, U, Params, Chain, S>,
1146            >
1147            where
1148                U: 'static,
1149                S::Step: for<'x> StepCall<&'x T, U>,
1150            {
1151                let mut chain = self.chain;
1152                let mut resolved = f.into_step(registry);
1153                $Builder {
1154                    chain: move |world: &mut World, $pname: $pty| {
1155                        chain(world, $pname)
1156                            .map(|ref val| resolved.call(world, val))
1157                    },
1158                    _marker: PhantomData,
1159                }
1160            }
1161
1162            /// Short-circuits on None. std: `Option::and_then`
1163            pub fn and_then<
1164                U,
1165                Params,
1166                S: IntoStep<&'static T, Option<U>, Params>,
1167            >(
1168                self,
1169                f: S,
1170                registry: &Registry,
1171            ) -> $Builder<
1172                $U,
1173                Option<U>,
1174                impl FnMut(&mut World, $pty) -> Option<U>
1175                    + use<$U, T, U, Params, Chain, S>,
1176            >
1177            where
1178                U: 'static,
1179                S::Step: for<'x> StepCall<&'x T, Option<U>>,
1180            {
1181                let mut chain = self.chain;
1182                let mut resolved = f.into_step(registry);
1183                $Builder {
1184                    chain: move |world: &mut World, $pname: $pty| {
1185                        chain(world, $pname)
1186                            .and_then(|ref val| resolved.call(world, val))
1187                    },
1188                    _marker: PhantomData,
1189                }
1190            }
1191
1192            // -- Closure-based (cold path) --------------------------------
1193
1194            /// Side effect on None.
1195            pub fn on_none(
1196                self,
1197                mut f: impl FnMut(&mut World),
1198            ) -> $Builder<
1199                $U,
1200                Option<T>,
1201                impl FnMut(&mut World, $pty) -> Option<T>,
1202            > {
1203                let mut chain = self.chain;
1204                $Builder {
1205                    chain: move |world: &mut World, $pname: $pty| {
1206                        let result = chain(world, $pname);
1207                        if result.is_none() {
1208                            f(world);
1209                        }
1210                        result
1211                    },
1212                    _marker: PhantomData,
1213                }
1214            }
1215
1216            /// Keep value if predicate holds. std: `Option::filter`
1217            pub fn filter(
1218                self,
1219                mut f: impl FnMut(&mut World, &T) -> bool,
1220            ) -> $Builder<
1221                $U,
1222                Option<T>,
1223                impl FnMut(&mut World, $pty) -> Option<T>,
1224            > {
1225                let mut chain = self.chain;
1226                $Builder {
1227                    chain: move |world: &mut World, $pname: $pty| {
1228                        chain(world, $pname).filter(|val| f(world, val))
1229                    },
1230                    _marker: PhantomData,
1231                }
1232            }
1233
1234            /// Side effect on Some value. std: `Option::inspect`
1235            pub fn inspect(
1236                self,
1237                mut f: impl FnMut(&mut World, &T),
1238            ) -> $Builder<
1239                $U,
1240                Option<T>,
1241                impl FnMut(&mut World, $pty) -> Option<T>,
1242            > {
1243                let mut chain = self.chain;
1244                $Builder {
1245                    chain: move |world: &mut World, $pname: $pty| {
1246                        chain(world, $pname).inspect(|val| f(world, val))
1247                    },
1248                    _marker: PhantomData,
1249                }
1250            }
1251
1252            /// None becomes Err(err). std: `Option::ok_or`
1253            ///
1254            /// `Clone` required because the chain may run many times —
1255            /// the error value is cloned on each `None` invocation.
1256            pub fn ok_or<Err: Clone>(
1257                self,
1258                err: Err,
1259            ) -> $Builder<
1260                $U,
1261                Result<T, Err>,
1262                impl FnMut(&mut World, $pty) -> Result<T, Err>,
1263            > {
1264                let mut chain = self.chain;
1265                $Builder {
1266                    chain: move |world: &mut World, $pname: $pty| {
1267                        chain(world, $pname).ok_or_else(|| err.clone())
1268                    },
1269                    _marker: PhantomData,
1270                }
1271            }
1272
1273            /// None becomes Err(f()). std: `Option::ok_or_else`
1274            pub fn ok_or_else<Err>(
1275                self,
1276                mut f: impl FnMut(&mut World) -> Err,
1277            ) -> $Builder<
1278                $U,
1279                Result<T, Err>,
1280                impl FnMut(&mut World, $pty) -> Result<T, Err>,
1281            > {
1282                let mut chain = self.chain;
1283                $Builder {
1284                    chain: move |world: &mut World, $pname: $pty| {
1285                        chain(world, $pname).ok_or_else(|| f(world))
1286                    },
1287                    _marker: PhantomData,
1288                }
1289            }
1290
1291            /// Exit Option — None becomes the default value.
1292            ///
1293            /// `Clone` required because the chain may run many times —
1294            /// the default is cloned on each `None` invocation (unlike
1295            /// std's `unwrap_or` which consumes the value once).
1296            pub fn unwrap_or(
1297                self,
1298                default: T,
1299            ) -> $Builder<$U, T, impl FnMut(&mut World, $pty) -> T>
1300            where
1301                T: Clone,
1302            {
1303                let mut chain = self.chain;
1304                $Builder {
1305                    chain: move |world: &mut World, $pname: $pty| {
1306                        chain(world, $pname)
1307                            .unwrap_or_else(|| default.clone())
1308                    },
1309                    _marker: PhantomData,
1310                }
1311            }
1312
1313            /// Exit Option — None becomes `f()`.
1314            pub fn unwrap_or_else(
1315                self,
1316                mut f: impl FnMut(&mut World) -> T,
1317            ) -> $Builder<$U, T, impl FnMut(&mut World, $pty) -> T>
1318            {
1319                let mut chain = self.chain;
1320                $Builder {
1321                    chain: move |world: &mut World, $pname: $pty| {
1322                        chain(world, $pname).unwrap_or_else(|| f(world))
1323                    },
1324                    _marker: PhantomData,
1325                }
1326            }
1327        }
1328
1329        // =============================================================
1330        // Result helpers — $Builder<$U, Result<T, Err>, Chain>
1331        // =============================================================
1332
1333        impl<$U: 'static, T: 'static, Err: 'static, Chain>
1334            $Builder<$U, Result<T, Err>, Chain>
1335        where
1336            Chain: FnMut(&mut World, $chain_input) -> Result<T, Err>,
1337        {
1338            // -- IntoStep-based (hot path) --------------------------------
1339
1340            /// Transform the Ok value. Step not called on Err.
1341            pub fn map<U, Params, S: IntoStep<&'static T, U, Params>>(
1342                self,
1343                f: S,
1344                registry: &Registry,
1345            ) -> $Builder<
1346                $U,
1347                Result<U, Err>,
1348                impl FnMut(&mut World, $pty) -> Result<U, Err>
1349                    + use<$U, T, Err, U, Params, Chain, S>,
1350            >
1351            where
1352                U: 'static,
1353                S::Step: for<'x> StepCall<&'x T, U>,
1354            {
1355                let mut chain = self.chain;
1356                let mut resolved = f.into_step(registry);
1357                $Builder {
1358                    chain: move |world: &mut World, $pname: $pty| {
1359                        chain(world, $pname)
1360                            .map(|ref val| resolved.call(world, val))
1361                    },
1362                    _marker: PhantomData,
1363                }
1364            }
1365
1366            /// Short-circuits on Err. std: `Result::and_then`
1367            pub fn and_then<
1368                U,
1369                Params,
1370                S: IntoStep<&'static T, Result<U, Err>, Params>,
1371            >(
1372                self,
1373                f: S,
1374                registry: &Registry,
1375            ) -> $Builder<
1376                $U,
1377                Result<U, Err>,
1378                impl FnMut(&mut World, $pty) -> Result<U, Err>
1379                    + use<$U, T, Err, U, Params, Chain, S>,
1380            >
1381            where
1382                U: 'static,
1383                S::Step: for<'x> StepCall<&'x T, Result<U, Err>>,
1384            {
1385                let mut chain = self.chain;
1386                let mut resolved = f.into_step(registry);
1387                $Builder {
1388                    chain: move |world: &mut World, $pname: $pty| {
1389                        chain(world, $pname)
1390                            .and_then(|ref val| resolved.call(world, val))
1391                    },
1392                    _marker: PhantomData,
1393                }
1394            }
1395
1396            /// Handle error and transition to Option.
1397            ///
1398            /// `Ok(val)` becomes `Some(val)` — handler not called.
1399            /// `Err(err)` calls the handler, then produces `None`.
1400            pub fn catch<Params, S: IntoStep<&'static Err, (), Params>>(
1401                self,
1402                f: S,
1403                registry: &Registry,
1404            ) -> $Builder<
1405                $U,
1406                Option<T>,
1407                impl FnMut(&mut World, $pty) -> Option<T>
1408                    + use<$U, T, Err, Params, Chain, S>,
1409            >
1410            where
1411                S::Step: for<'x> StepCall<&'x Err, ()>,
1412            {
1413                let mut chain = self.chain;
1414                let mut resolved = f.into_step(registry);
1415                $Builder {
1416                    chain: move |world: &mut World, $pname: $pty| {
1417                        match chain(world, $pname) {
1418                            Ok(val) => Some(val),
1419                            Err(ref err) => {
1420                                resolved.call(world, err);
1421                                None
1422                            }
1423                        }
1424                    },
1425                    _marker: PhantomData,
1426                }
1427            }
1428
1429            // -- Closure-based (cold path) --------------------------------
1430
1431            /// Transform the error. std: `Result::map_err`
1432            pub fn map_err<Err2>(
1433                self,
1434                mut f: impl FnMut(&mut World, Err) -> Err2,
1435            ) -> $Builder<
1436                $U,
1437                Result<T, Err2>,
1438                impl FnMut(&mut World, $pty) -> Result<T, Err2>,
1439            > {
1440                let mut chain = self.chain;
1441                $Builder {
1442                    chain: move |world: &mut World, $pname: $pty| {
1443                        chain(world, $pname)
1444                            .map_err(|err| f(world, err))
1445                    },
1446                    _marker: PhantomData,
1447                }
1448            }
1449
1450            /// Recover from Err. std: `Result::or_else`
1451            pub fn or_else<Err2>(
1452                self,
1453                mut f: impl FnMut(&mut World, Err) -> Result<T, Err2>,
1454            ) -> $Builder<
1455                $U,
1456                Result<T, Err2>,
1457                impl FnMut(&mut World, $pty) -> Result<T, Err2>,
1458            > {
1459                let mut chain = self.chain;
1460                $Builder {
1461                    chain: move |world: &mut World, $pname: $pty| {
1462                        chain(world, $pname)
1463                            .or_else(|err| f(world, err))
1464                    },
1465                    _marker: PhantomData,
1466                }
1467            }
1468
1469            /// Side effect on Ok. std: `Result::inspect`
1470            pub fn inspect(
1471                self,
1472                mut f: impl FnMut(&mut World, &T),
1473            ) -> $Builder<
1474                $U,
1475                Result<T, Err>,
1476                impl FnMut(&mut World, $pty) -> Result<T, Err>,
1477            > {
1478                let mut chain = self.chain;
1479                $Builder {
1480                    chain: move |world: &mut World, $pname: $pty| {
1481                        chain(world, $pname)
1482                            .inspect(|val| f(world, val))
1483                    },
1484                    _marker: PhantomData,
1485                }
1486            }
1487
1488            /// Side effect on Err. std: `Result::inspect_err`
1489            pub fn inspect_err(
1490                self,
1491                mut f: impl FnMut(&mut World, &Err),
1492            ) -> $Builder<
1493                $U,
1494                Result<T, Err>,
1495                impl FnMut(&mut World, $pty) -> Result<T, Err>,
1496            > {
1497                let mut chain = self.chain;
1498                $Builder {
1499                    chain: move |world: &mut World, $pname: $pty| {
1500                        chain(world, $pname)
1501                            .inspect_err(|err| f(world, err))
1502                    },
1503                    _marker: PhantomData,
1504                }
1505            }
1506
1507            /// Discard error, enter Option land. std: `Result::ok`
1508            pub fn ok(
1509                self,
1510            ) -> $Builder<
1511                $U,
1512                Option<T>,
1513                impl FnMut(&mut World, $pty) -> Option<T>,
1514            > {
1515                let mut chain = self.chain;
1516                $Builder {
1517                    chain: move |world: &mut World, $pname: $pty| {
1518                        chain(world, $pname).ok()
1519                    },
1520                    _marker: PhantomData,
1521                }
1522            }
1523
1524            /// Exit Result — Err becomes the default value.
1525            ///
1526            /// `Clone` required because the chain may run many times —
1527            /// the default is cloned on each `Err` invocation (unlike
1528            /// std's `unwrap_or` which consumes the value once).
1529            pub fn unwrap_or(
1530                self,
1531                default: T,
1532            ) -> $Builder<$U, T, impl FnMut(&mut World, $pty) -> T>
1533            where
1534                T: Clone,
1535            {
1536                let mut chain = self.chain;
1537                $Builder {
1538                    chain: move |world: &mut World, $pname: $pty| {
1539                        chain(world, $pname)
1540                            .unwrap_or_else(|_| default.clone())
1541                    },
1542                    _marker: PhantomData,
1543                }
1544            }
1545
1546            /// Exit Result — Err becomes `f(err)`.
1547            pub fn unwrap_or_else(
1548                self,
1549                mut f: impl FnMut(&mut World, Err) -> T,
1550            ) -> $Builder<$U, T, impl FnMut(&mut World, $pty) -> T>
1551            {
1552                let mut chain = self.chain;
1553                $Builder {
1554                    chain: move |world: &mut World, $pname: $pty| {
1555                        match chain(world, $pname) {
1556                            Ok(val) => val,
1557                            Err(err) => f(world, err),
1558                        }
1559                    },
1560                    _marker: PhantomData,
1561                }
1562            }
1563        }
1564    };
1565}
1566
// Generate the combinator methods for the main chain builder (`DagChain`).
// Its chain closures take the event `E` by value, bound to the name `event`.
1567impl_dag_combinators!(
1568    builder: DagChain,
1569    upstream: E,
1570    chain_input: E,
1571    param: event: E
1572);
1573
// Generate the same combinator methods for the arm builder (`DagArm`).
// Its chain closures take the arm input by reference (`&In`), bound to the
// name `input`.
1574impl_dag_combinators!(
1575    builder: DagArm,
1576    upstream: In,
1577    chain_input: &In,
1578    param: input: &In
1579);
1580
// =============================================================================
// Splat — tuple destructuring into individual reference arguments (DAG)
// =============================================================================
//
// DAG steps take their inputs by reference, so a splat step has exactly the
// shape of a merge step: `fn(Params..., &A, &B) -> Out`. DAG splat therefore
// reuses `IntoMergeStep` / `MergeStepCall` rather than defining its own traits.
//
// Builder types are `#[doc(hidden)]` — users only see `.splat().then()`.

// Generates the splat builders for one tuple arity.
//
// Parameters:
// - `$N`            — the arity, written at the call site for readability;
//                     the expansion itself does not use it.
// - `$SplatChain`   — builder returned by `DagChain::splat` (main chain).
// - `$SplatArm`     — builder returned by `DagArm::splat` (inside an arm).
// - `$SplatArmStart`— builder returned by `DagArmStart::splat` (arm start).
// - `($T, ...)`     — one type parameter name per tuple element.
// - `($idx, ...)`   — the tuple field indices, in the same order as `$T`.
//
// Every builder exposes a single `.then()` that resolves the step against the
// registry once, then calls it with a reference to each tuple element.
macro_rules! define_dag_splat_builders {
    (
        $N:literal,
        chain: $SplatChain:ident,
        arm: $SplatArm:ident,
        arm_start: $SplatArmStart:ident,
        ($($T:ident),+),
        ($($idx:tt),+)
    ) => {
        /// DAG splat builder on the main chain.
        #[doc(hidden)]
        pub struct $SplatChain<E, $($T,)+ Chain> {
            chain: Chain,
            _marker: PhantomData<fn(E) -> ($($T,)+)>,
        }

        impl<E: 'static, $($T: 'static,)+ Chain> $SplatChain<E, $($T,)+ Chain>
        where
            Chain: FnMut(&mut World, E) -> ($($T,)+),
        {
            /// Add a step that receives the tuple elements as individual `&T` arguments.
            ///
            /// The step is resolved against `registry` once, here; the returned
            /// chain runs the upstream chain and then the resolved step.
            pub fn then<NewOut, Params, S>(
                self,
                f: S,
                registry: &Registry,
            ) -> DagChain<
                E,
                NewOut,
                impl FnMut(&mut World, E) -> NewOut
                    + use<E, $($T,)+ NewOut, Params, Chain, S>,
            >
            where
                NewOut: 'static,
                S: IntoMergeStep<($(&'static $T,)+), NewOut, Params>,
                S::Step: for<'x> MergeStepCall<($(&'x $T,)+), NewOut>,
            {
                let mut upstream = self.chain;
                let mut step = f.into_merge_step(registry);
                DagChain {
                    chain: move |world: &mut World, event: E| {
                        // The tuple lives on the stack for the duration of the call;
                        // the step only borrows its elements.
                        let parts = upstream(world, event);
                        step.call(world, ($(&parts.$idx,)+))
                    },
                    _marker: PhantomData,
                }
            }
        }

        impl<E: 'static, $($T: 'static,)+ Chain> DagChain<E, ($($T,)+), Chain>
        where
            Chain: FnMut(&mut World, E) -> ($($T,)+),
        {
            /// Destructure the tuple output into individual `&T` arguments.
            pub fn splat(self) -> $SplatChain<E, $($T,)+ Chain> {
                $SplatChain {
                    chain: self.chain,
                    _marker: PhantomData,
                }
            }
        }

        /// DAG splat builder within an arm.
        #[doc(hidden)]
        pub struct $SplatArm<In, $($T,)+ Chain> {
            chain: Chain,
            _marker: PhantomData<fn(*const In) -> ($($T,)+)>,
        }

        impl<In: 'static, $($T: 'static,)+ Chain> $SplatArm<In, $($T,)+ Chain>
        where
            Chain: FnMut(&mut World, &In) -> ($($T,)+),
        {
            /// Add a step that receives the tuple elements as individual `&T` arguments.
            ///
            /// The step is resolved against `registry` once, here; the returned
            /// arm runs the upstream arm chain and then the resolved step.
            pub fn then<NewOut, Params, S>(
                self,
                f: S,
                registry: &Registry,
            ) -> DagArm<
                In,
                NewOut,
                impl FnMut(&mut World, &In) -> NewOut
                    + use<In, $($T,)+ NewOut, Params, Chain, S>,
            >
            where
                NewOut: 'static,
                S: IntoMergeStep<($(&'static $T,)+), NewOut, Params>,
                S::Step: for<'x> MergeStepCall<($(&'x $T,)+), NewOut>,
            {
                let mut upstream = self.chain;
                let mut step = f.into_merge_step(registry);
                DagArm {
                    chain: move |world: &mut World, input: &In| {
                        let parts = upstream(world, input);
                        step.call(world, ($(&parts.$idx,)+))
                    },
                    _marker: PhantomData,
                }
            }
        }

        impl<In: 'static, $($T: 'static,)+ Chain> DagArm<In, ($($T,)+), Chain>
        where
            Chain: FnMut(&mut World, &In) -> ($($T,)+),
        {
            /// Destructure the tuple output into individual `&T` arguments.
            pub fn splat(self) -> $SplatArm<In, $($T,)+ Chain> {
                $SplatArm {
                    chain: self.chain,
                    _marker: PhantomData,
                }
            }
        }

        /// DAG splat builder at arm start position.
        #[doc(hidden)]
        pub struct $SplatArmStart<$($T),+>(PhantomData<fn(($($T,)+))>);

        impl<$($T: 'static),+> $SplatArmStart<$($T),+> {
            /// Add a step that receives the tuple elements as individual `&T` arguments.
            ///
            /// There is no upstream chain at arm start: the arm's input tuple is
            /// borrowed element by element and handed straight to the step.
            pub fn then<Out, Params, S>(
                self,
                f: S,
                registry: &Registry,
            ) -> DagArm<
                ($($T,)+),
                Out,
                impl FnMut(&mut World, &($($T,)+)) -> Out
                    + use<$($T,)+ Out, Params, S>,
            >
            where
                Out: 'static,
                S: IntoMergeStep<($(&'static $T,)+), Out, Params>,
                S::Step: for<'x> MergeStepCall<($(&'x $T,)+), Out>,
            {
                let mut step = f.into_merge_step(registry);
                DagArm {
                    chain: move |world: &mut World, input: &($($T,)+)| {
                        step.call(world, ($(&input.$idx,)+))
                    },
                    _marker: PhantomData,
                }
            }
        }

        impl<$($T: 'static),+> DagArmStart<($($T,)+)> {
            /// Destructure the tuple input into individual `&T` arguments.
            pub fn splat(self) -> $SplatArmStart<$($T),+> {
                $SplatArmStart(PhantomData)
            }
        }
    };
}
1743
1744define_dag_splat_builders!(2,
1745    chain: DagSplatChain2,
1746    arm: DagSplatArm2,
1747    arm_start: DagSplatArmStart2,
1748    (T0, T1),
1749    (0, 1)
1750);
1751
1752define_dag_splat_builders!(3,
1753    chain: DagSplatChain3,
1754    arm: DagSplatArm3,
1755    arm_start: DagSplatArmStart3,
1756    (T0, T1, T2),
1757    (0, 1, 2)
1758);
1759
1760define_dag_splat_builders!(4,
1761    chain: DagSplatChain4,
1762    arm: DagSplatArm4,
1763    arm_start: DagSplatArmStart4,
1764    (T0, T1, T2, T3),
1765    (0, 1, 2, 3)
1766);
1767
1768define_dag_splat_builders!(5,
1769    chain: DagSplatChain5,
1770    arm: DagSplatArm5,
1771    arm_start: DagSplatArmStart5,
1772    (T0, T1, T2, T3, T4),
1773    (0, 1, 2, 3, 4)
1774);
1775
// =============================================================================
// Fork arity macro — arm accumulation, merge, join
// =============================================================================

/// Generates arm accumulation, merge, and join for a fork type.
///
/// ChainFork and ArmFork differ only in:
/// - How the upstream chain is called (by value vs by reference)
/// - What output type is produced (DagChain vs DagArm)
///
/// Parameters:
/// - `fork` — the fork builder type to implement on.
/// - `output` — the builder type produced by `.merge()` / `.join()`.
/// - `upstream` — name of the builder's input type parameter.
/// - `chain_input` — the argument type of the upstream chain closure.
/// - `param` — name and type of the generated closure's input parameter.
///
/// The generated closures run the upstream chain once, then run every arm in
/// the order it was added, each borrowing the same fork output. `.merge()`
/// passes a reference to each arm's output to the merge step; `.join()` is for
/// arms that all produce `()`.
macro_rules! impl_dag_fork {
    (
        fork: $Fork:ident,
        output: $Output:ident,
        upstream: $U:ident,
        chain_input: $chain_input:ty,
        param: $pname:ident : $pty:ty
    ) => {
        // =============================================================
        // Arm accumulation: 0→1, 1→2, 2→3, 3→4
        // =============================================================

        impl<$U, ForkOut, Chain> $Fork<$U, ForkOut, Chain, ()> {
            /// Add the first arm to this fork.
            pub fn arm<AOut, ACh>(
                self,
                f: impl FnOnce(DagArmStart<ForkOut>) -> DagArm<ForkOut, AOut, ACh>,
            ) -> $Fork<$U, ForkOut, Chain, (DagArm<ForkOut, AOut, ACh>,)> {
                let added = f(DagArmStart(PhantomData));
                $Fork {
                    chain: self.chain,
                    arms: (added,),
                    _marker: PhantomData,
                }
            }
        }

        impl<$U, ForkOut, Chain, A0, C0> $Fork<$U, ForkOut, Chain, (DagArm<ForkOut, A0, C0>,)> {
            /// Add a second arm to this fork.
            pub fn arm<AOut, ACh>(
                self,
                f: impl FnOnce(DagArmStart<ForkOut>) -> DagArm<ForkOut, AOut, ACh>,
            ) -> $Fork<$U, ForkOut, Chain, (DagArm<ForkOut, A0, C0>, DagArm<ForkOut, AOut, ACh>)>
            {
                let added = f(DagArmStart(PhantomData));
                let (arm0,) = self.arms;
                $Fork {
                    chain: self.chain,
                    arms: (arm0, added),
                    _marker: PhantomData,
                }
            }
        }

        impl<$U, ForkOut, Chain, A0, C0, A1, C1>
            $Fork<$U, ForkOut, Chain, (DagArm<ForkOut, A0, C0>, DagArm<ForkOut, A1, C1>)>
        {
            /// Add a third arm to this fork.
            pub fn arm<AOut, ACh>(
                self,
                f: impl FnOnce(DagArmStart<ForkOut>) -> DagArm<ForkOut, AOut, ACh>,
            ) -> $Fork<
                $U,
                ForkOut,
                Chain,
                (
                    DagArm<ForkOut, A0, C0>,
                    DagArm<ForkOut, A1, C1>,
                    DagArm<ForkOut, AOut, ACh>,
                ),
            > {
                let added = f(DagArmStart(PhantomData));
                let (arm0, arm1) = self.arms;
                $Fork {
                    chain: self.chain,
                    arms: (arm0, arm1, added),
                    _marker: PhantomData,
                }
            }
        }

        impl<$U, ForkOut, Chain, A0, C0, A1, C1, A2, C2>
            $Fork<
                $U,
                ForkOut,
                Chain,
                (
                    DagArm<ForkOut, A0, C0>,
                    DagArm<ForkOut, A1, C1>,
                    DagArm<ForkOut, A2, C2>,
                ),
            >
        {
            /// Add a fourth arm to this fork.
            pub fn arm<AOut, ACh>(
                self,
                f: impl FnOnce(DagArmStart<ForkOut>) -> DagArm<ForkOut, AOut, ACh>,
            ) -> $Fork<
                $U,
                ForkOut,
                Chain,
                (
                    DagArm<ForkOut, A0, C0>,
                    DagArm<ForkOut, A1, C1>,
                    DagArm<ForkOut, A2, C2>,
                    DagArm<ForkOut, AOut, ACh>,
                ),
            > {
                let added = f(DagArmStart(PhantomData));
                let (arm0, arm1, arm2) = self.arms;
                $Fork {
                    chain: self.chain,
                    arms: (arm0, arm1, arm2, added),
                    _marker: PhantomData,
                }
            }
        }

        // =============================================================
        // Merge arity 2
        // =============================================================

        impl<$U: 'static, ForkOut: 'static, Chain, A0: 'static, C0, A1: 'static, C1>
            $Fork<$U, ForkOut, Chain, (DagArm<ForkOut, A0, C0>, DagArm<ForkOut, A1, C1>)>
        where
            Chain: FnMut(&mut World, $chain_input) -> ForkOut,
            C0: FnMut(&mut World, &ForkOut) -> A0,
            C1: FnMut(&mut World, &ForkOut) -> A1,
        {
            /// Merge two arms with a merge step.
            ///
            /// The merge step is resolved against `registry` once, here, and
            /// receives a reference to each arm's output.
            pub fn merge<MOut, Params, S>(
                self,
                f: S,
                registry: &Registry,
            ) -> $Output<
                $U,
                MOut,
                impl FnMut(&mut World, $pty) -> MOut
                + use<$U, ForkOut, MOut, Params, Chain, S, A0, C0, A1, C1>,
            >
            where
                MOut: 'static,
                S: IntoMergeStep<(&'static A0, &'static A1), MOut, Params>,
                S::Step: for<'x> MergeStepCall<(&'x A0, &'x A1), MOut>,
            {
                let mut upstream = self.chain;
                let (arm0, arm1) = self.arms;
                let mut run0 = arm0.chain;
                let mut run1 = arm1.chain;
                let mut merge_step = f.into_merge_step(registry);
                $Output {
                    chain: move |world: &mut World, $pname: $pty| {
                        // Every arm borrows the same fork output.
                        let shared = upstream(world, $pname);
                        let out0 = run0(world, &shared);
                        let out1 = run1(world, &shared);
                        merge_step.call(world, (&out0, &out1))
                    },
                    _marker: PhantomData,
                }
            }
        }

        impl<$U: 'static, ForkOut: 'static, Chain, C0, C1>
            $Fork<$U, ForkOut, Chain, (DagArm<ForkOut, (), C0>, DagArm<ForkOut, (), C1>)>
        where
            Chain: FnMut(&mut World, $chain_input) -> ForkOut,
            C0: FnMut(&mut World, &ForkOut),
            C1: FnMut(&mut World, &ForkOut),
        {
            /// Join two sink arms (all producing `()`).
            pub fn join(
                self,
            ) -> $Output<$U, (), impl FnMut(&mut World, $pty) + use<$U, ForkOut, Chain, C0, C1>>
            {
                let mut upstream = self.chain;
                let (arm0, arm1) = self.arms;
                let mut run0 = arm0.chain;
                let mut run1 = arm1.chain;
                $Output {
                    chain: move |world: &mut World, $pname: $pty| {
                        let shared = upstream(world, $pname);
                        run0(world, &shared);
                        run1(world, &shared);
                    },
                    _marker: PhantomData,
                }
            }
        }

        // =============================================================
        // Merge arity 3
        // =============================================================

        impl<
            $U: 'static,
            ForkOut: 'static,
            Chain,
            A0: 'static,
            C0,
            A1: 'static,
            C1,
            A2: 'static,
            C2,
        >
            $Fork<
                $U,
                ForkOut,
                Chain,
                (
                    DagArm<ForkOut, A0, C0>,
                    DagArm<ForkOut, A1, C1>,
                    DagArm<ForkOut, A2, C2>,
                ),
            >
        where
            Chain: FnMut(&mut World, $chain_input) -> ForkOut,
            C0: FnMut(&mut World, &ForkOut) -> A0,
            C1: FnMut(&mut World, &ForkOut) -> A1,
            C2: FnMut(&mut World, &ForkOut) -> A2,
        {
            /// Merge three arms with a merge step.
            ///
            /// The merge step is resolved against `registry` once, here, and
            /// receives a reference to each arm's output.
            pub fn merge<MOut, Params, S>(
                self,
                f: S,
                registry: &Registry,
            ) -> $Output<
                $U,
                MOut,
                impl FnMut(&mut World, $pty) -> MOut
                + use<$U, ForkOut, MOut, Params, Chain, S, A0, C0, A1, C1, A2, C2>,
            >
            where
                MOut: 'static,
                S: IntoMergeStep<(&'static A0, &'static A1, &'static A2), MOut, Params>,
                S::Step: for<'x> MergeStepCall<(&'x A0, &'x A1, &'x A2), MOut>,
            {
                let mut upstream = self.chain;
                let (arm0, arm1, arm2) = self.arms;
                let mut run0 = arm0.chain;
                let mut run1 = arm1.chain;
                let mut run2 = arm2.chain;
                let mut merge_step = f.into_merge_step(registry);
                $Output {
                    chain: move |world: &mut World, $pname: $pty| {
                        let shared = upstream(world, $pname);
                        let out0 = run0(world, &shared);
                        let out1 = run1(world, &shared);
                        let out2 = run2(world, &shared);
                        merge_step.call(world, (&out0, &out1, &out2))
                    },
                    _marker: PhantomData,
                }
            }
        }

        impl<$U: 'static, ForkOut: 'static, Chain, C0, C1, C2>
            $Fork<
                $U,
                ForkOut,
                Chain,
                (
                    DagArm<ForkOut, (), C0>,
                    DagArm<ForkOut, (), C1>,
                    DagArm<ForkOut, (), C2>,
                ),
            >
        where
            Chain: FnMut(&mut World, $chain_input) -> ForkOut,
            C0: FnMut(&mut World, &ForkOut),
            C1: FnMut(&mut World, &ForkOut),
            C2: FnMut(&mut World, &ForkOut),
        {
            /// Join three sink arms (all producing `()`).
            pub fn join(
                self,
            ) -> $Output<$U, (), impl FnMut(&mut World, $pty) + use<$U, ForkOut, Chain, C0, C1, C2>>
            {
                let mut upstream = self.chain;
                let (arm0, arm1, arm2) = self.arms;
                let mut run0 = arm0.chain;
                let mut run1 = arm1.chain;
                let mut run2 = arm2.chain;
                $Output {
                    chain: move |world: &mut World, $pname: $pty| {
                        let shared = upstream(world, $pname);
                        run0(world, &shared);
                        run1(world, &shared);
                        run2(world, &shared);
                    },
                    _marker: PhantomData,
                }
            }
        }

        // =============================================================
        // Merge arity 4
        // =============================================================

        // NOTE(review): lint allowance carried over from the original —
        // confirm it is still needed.
        #[allow(clippy::many_single_char_names)]
        impl<
            $U: 'static,
            ForkOut: 'static,
            Chain,
            A0: 'static,
            C0,
            A1: 'static,
            C1,
            A2: 'static,
            C2,
            A3: 'static,
            C3,
        >
            $Fork<
                $U,
                ForkOut,
                Chain,
                (
                    DagArm<ForkOut, A0, C0>,
                    DagArm<ForkOut, A1, C1>,
                    DagArm<ForkOut, A2, C2>,
                    DagArm<ForkOut, A3, C3>,
                ),
            >
        where
            Chain: FnMut(&mut World, $chain_input) -> ForkOut,
            C0: FnMut(&mut World, &ForkOut) -> A0,
            C1: FnMut(&mut World, &ForkOut) -> A1,
            C2: FnMut(&mut World, &ForkOut) -> A2,
            C3: FnMut(&mut World, &ForkOut) -> A3,
        {
            /// Merge four arms with a merge step.
            ///
            /// The merge step is resolved against `registry` once, here, and
            /// receives a reference to each arm's output.
            pub fn merge<MOut, Params, S>(
                self,
                f: S,
                registry: &Registry,
            ) -> $Output<
                $U,
                MOut,
                impl FnMut(&mut World, $pty) -> MOut
                + use<$U, ForkOut, MOut, Params, Chain, S, A0, C0, A1, C1, A2, C2, A3, C3>,
            >
            where
                MOut: 'static,
                S: IntoMergeStep<
                        (&'static A0, &'static A1, &'static A2, &'static A3),
                        MOut,
                        Params,
                    >,
                S::Step: for<'x> MergeStepCall<(&'x A0, &'x A1, &'x A2, &'x A3), MOut>,
            {
                let mut upstream = self.chain;
                let (arm0, arm1, arm2, arm3) = self.arms;
                let mut run0 = arm0.chain;
                let mut run1 = arm1.chain;
                let mut run2 = arm2.chain;
                let mut run3 = arm3.chain;
                let mut merge_step = f.into_merge_step(registry);
                $Output {
                    chain: move |world: &mut World, $pname: $pty| {
                        let shared = upstream(world, $pname);
                        let out0 = run0(world, &shared);
                        let out1 = run1(world, &shared);
                        let out2 = run2(world, &shared);
                        let out3 = run3(world, &shared);
                        merge_step.call(world, (&out0, &out1, &out2, &out3))
                    },
                    _marker: PhantomData,
                }
            }
        }

        impl<$U: 'static, ForkOut: 'static, Chain, C0, C1, C2, C3>
            $Fork<
                $U,
                ForkOut,
                Chain,
                (
                    DagArm<ForkOut, (), C0>,
                    DagArm<ForkOut, (), C1>,
                    DagArm<ForkOut, (), C2>,
                    DagArm<ForkOut, (), C3>,
                ),
            >
        where
            Chain: FnMut(&mut World, $chain_input) -> ForkOut,
            C0: FnMut(&mut World, &ForkOut),
            C1: FnMut(&mut World, &ForkOut),
            C2: FnMut(&mut World, &ForkOut),
            C3: FnMut(&mut World, &ForkOut),
        {
            /// Join four sink arms (all producing `()`).
            pub fn join(
                self,
            ) -> $Output<
                $U,
                (),
                impl FnMut(&mut World, $pty) + use<$U, ForkOut, Chain, C0, C1, C2, C3>,
            > {
                let mut upstream = self.chain;
                let (arm0, arm1, arm2, arm3) = self.arms;
                let mut run0 = arm0.chain;
                let mut run1 = arm1.chain;
                let mut run2 = arm2.chain;
                let mut run3 = arm3.chain;
                $Output {
                    chain: move |world: &mut World, $pname: $pty| {
                        let shared = upstream(world, $pname);
                        run0(world, &shared);
                        run1(world, &shared);
                        run2(world, &shared);
                        run3(world, &shared);
                    },
                    _marker: PhantomData,
                }
            }
        }
    };
}
2193
2194impl_dag_fork!(
2195    fork: DagChainFork,
2196    output: DagChain,
2197    upstream: E,
2198    chain_input: E,
2199    param: event: E
2200);
2201
2202impl_dag_fork!(
2203    fork: DagArmFork,
2204    output: DagArm,
2205    upstream: In,
2206    chain_input: &In,
2207    param: input: &In
2208);
2209
2210// =============================================================================
2211// build_batch — when Out: PipelineOutput (() or Option<()>)
2212// =============================================================================
2213
2214impl<E: 'static, Out: crate::PipelineOutput, Chain> DagChain<E, Out, Chain>
2215where
2216    Chain: FnMut(&mut World, E) -> Out + 'static,
2217{
2218    /// Finalize into a [`BatchDag`] with a pre-allocated input buffer.
2219    ///
2220    /// Same DAG chain as [`build`](DagChain::build), but the DAG owns an
2221    /// input buffer that drivers fill between dispatch cycles. Each call
2222    /// to [`BatchDag::run`] drains the buffer, running every item through
2223    /// the chain independently.
2224    ///
2225    /// Available when the DAG ends with `()` or `Option<()>` (e.g.
2226    /// after `.guard()` or `.filter()` followed by `.unwrap_or(())`).
2227    ///
2228    /// `capacity` is the initial allocation — the buffer can grow if needed,
2229    /// but sizing it for the expected batch size avoids reallocation.
2230    pub fn build_batch(self, capacity: usize) -> BatchDag<E, Chain> {
2231        BatchDag {
2232            input: Vec::with_capacity(capacity),
2233            chain: self.chain,
2234        }
2235    }
2236}
2237
2238// =============================================================================
2239// BatchDag<E, F> — DAG with owned input buffer
2240// =============================================================================
2241
2242/// Batch DAG that owns a pre-allocated input buffer.
2243///
2244/// Created by [`DagChain::build_batch`]. Each item flows through the
2245/// full DAG chain independently — the same per-item `Option` and
2246/// `Result` flow control as [`Dag`]. Errors are handled inline (via
2247/// `.catch()`, `.unwrap_or()`, etc.) and the batch continues to the
2248/// next item.
2249///
2250/// Unlike [`Dag`], `BatchDag` does not implement [`Handler`] — it is
2251/// driven directly by the owner via [`run()`](BatchDag::run).
2252///
2253/// # Examples
2254///
2255/// ```
2256/// use nexus_rt::{WorldBuilder, ResMut};
2257/// use nexus_rt::dag::DagStart;
2258///
2259/// let mut wb = WorldBuilder::new();
2260/// wb.register::<u64>(0);
2261/// let mut world = wb.build();
2262/// let reg = world.registry();
2263///
2264/// fn double(x: u32) -> u64 { x as u64 * 2 }
2265/// fn store(mut out: ResMut<u64>, val: &u64) { *out += *val; }
2266///
2267/// let mut batch = DagStart::<u32>::new()
2268///     .root(double, reg)
2269///     .then(store, reg)
2270///     .build_batch(8);
2271///
2272/// batch.input_mut().extend([1, 2, 3]);
2273/// batch.run(&mut world);
2274///
2275/// assert_eq!(*world.resource::<u64>(), 12); // 2 + 4 + 6
2276/// assert!(batch.input().is_empty());
2277/// ```
2278pub struct BatchDag<E, F> {
2279    input: Vec<E>,
2280    chain: F,
2281}
2282
2283impl<E, Out: crate::PipelineOutput, F: FnMut(&mut World, E) -> Out> BatchDag<E, F> {
2284    /// Mutable access to the input buffer. Drivers fill this between
2285    /// dispatch cycles.
2286    pub fn input_mut(&mut self) -> &mut Vec<E> {
2287        &mut self.input
2288    }
2289
2290    /// Read-only access to the input buffer.
2291    pub fn input(&self) -> &[E] {
2292        &self.input
2293    }
2294
2295    /// Drain the input buffer, running each item through the DAG.
2296    ///
2297    /// Each item gets independent `Option`/`Result` flow control — an
2298    /// error on one item does not affect subsequent items. After `run()`,
2299    /// the input buffer is empty but retains its allocation.
2300    pub fn run(&mut self, world: &mut World) {
2301        for item in self.input.drain(..) {
2302            let _ = (self.chain)(world, item);
2303        }
2304    }
2305}
2306
2307// =============================================================================
2308// resolve_arm — pre-resolve a step for manual dispatch
2309// =============================================================================
2310
2311/// Resolve a step for use in manual dispatch (e.g. inside a
2312/// [`.switch()`](DagChain::switch) closure).
2313///
2314/// Returns a closure with pre-resolved [`Param`](crate::Param) state —
2315/// the same build-time resolution that `.then()` performs, but as a
2316/// standalone value the caller can invoke from any context.
2317///
2318/// # Examples
2319///
2320/// ```ignore
2321/// let mut arm0 = resolve_arm(handle_new, reg);
2322/// let mut arm1 = resolve_arm(handle_cancel, reg);
2323///
2324/// dag.switch(move |world, msg: &Decoded| match msg.kind {
2325///     MsgKind::NewOrder => arm0(world, msg),
2326///     MsgKind::Cancel   => arm1(world, msg),
2327/// })
2328/// ```
2329pub fn resolve_arm<In, Out, Params, S>(
2330    f: S,
2331    registry: &Registry,
2332) -> impl FnMut(&mut World, &In) -> Out + use<In, Out, Params, S>
2333where
2334    In: 'static,
2335    Out: 'static,
2336    S: IntoStep<&'static In, Out, Params>,
2337    S::Step: for<'a> StepCall<&'a In, Out>,
2338{
2339    let mut resolved = f.into_step(registry);
2340    move |world: &mut World, input: &In| resolved.call(world, input)
2341}
2342
2343// =============================================================================
2344// Tests
2345// =============================================================================
2346
2347#[cfg(test)]
2348mod tests {
2349    use super::*;
2350    use crate::{IntoHandler, Res, ResMut, Virtual, WorldBuilder};
2351
2352    // -- Linear chains --
2353
2354    #[test]
2355    fn dag_linear_2() {
2356        let mut wb = WorldBuilder::new();
2357        wb.register::<u64>(0);
2358        let mut world = wb.build();
2359        let reg = world.registry();
2360
2361        fn root_mul2(x: u32) -> u64 {
2362            x as u64 * 2
2363        }
2364        fn store(mut out: ResMut<u64>, val: &u64) {
2365            *out = *val;
2366        }
2367
2368        let mut dag = DagStart::<u32>::new()
2369            .root(root_mul2, reg)
2370            .then(store, reg)
2371            .build();
2372
2373        dag.run(&mut world, 5u32);
2374        assert_eq!(*world.resource::<u64>(), 10);
2375    }
2376
2377    #[test]
2378    fn dag_linear_3() {
2379        let mut wb = WorldBuilder::new();
2380        wb.register::<u64>(0);
2381        let mut world = wb.build();
2382        let reg = world.registry();
2383
2384        fn root_mul2(x: u32) -> u64 {
2385            x as u64 * 2
2386        }
2387        fn add_one(val: &u64) -> u64 {
2388            *val + 1
2389        }
2390        fn store(mut out: ResMut<u64>, val: &u64) {
2391            *out = *val;
2392        }
2393
2394        let mut dag = DagStart::<u32>::new()
2395            .root(root_mul2, reg)
2396            .then(add_one, reg)
2397            .then(store, reg)
2398            .build();
2399
2400        dag.run(&mut world, 5u32);
2401        assert_eq!(*world.resource::<u64>(), 11); // (5*2)+1
2402    }
2403
2404    #[test]
2405    fn dag_linear_5() {
2406        let mut wb = WorldBuilder::new();
2407        wb.register::<u64>(0);
2408        let mut world = wb.build();
2409        let reg = world.registry();
2410
2411        fn root_id(x: u32) -> u64 {
2412            x as u64
2413        }
2414        fn add_one(val: &u64) -> u64 {
2415            *val + 1
2416        }
2417        fn store(mut out: ResMut<u64>, val: &u64) {
2418            *out = *val;
2419        }
2420
2421        let mut dag = DagStart::<u32>::new()
2422            .root(root_id, reg)
2423            .then(add_one, reg)
2424            .then(add_one, reg)
2425            .then(add_one, reg)
2426            .then(store, reg)
2427            .build();
2428
2429        dag.run(&mut world, 0u32);
2430        assert_eq!(*world.resource::<u64>(), 3); // 0+1+1+1
2431    }
2432
2433    // -- Diamond: root → [a, b] → merge → sink --
2434
2435    #[test]
2436    fn dag_diamond() {
2437        let mut wb = WorldBuilder::new();
2438        wb.register::<u64>(0);
2439        let mut world = wb.build();
2440        let reg = world.registry();
2441
2442        fn root_mul2(x: u32) -> u32 {
2443            x.wrapping_mul(2)
2444        }
2445        fn add_one(val: &u32) -> u32 {
2446            val.wrapping_add(1)
2447        }
2448        fn mul3(val: &u32) -> u32 {
2449            val.wrapping_mul(3)
2450        }
2451        fn merge_add(a: &u32, b: &u32) -> u32 {
2452            a.wrapping_add(*b)
2453        }
2454        fn store(mut out: ResMut<u64>, val: &u32) {
2455            *out = *val as u64;
2456        }
2457
2458        let mut dag = DagStart::<u32>::new()
2459            .root(root_mul2, reg)
2460            .fork()
2461            .arm(|a| a.then(add_one, reg))
2462            .arm(|b| b.then(mul3, reg))
2463            .merge(merge_add, reg)
2464            .then(store, reg)
2465            .build();
2466
2467        dag.run(&mut world, 5u32);
2468        // root: 10, arm_a: 11, arm_b: 30, merge: 41
2469        assert_eq!(*world.resource::<u64>(), 41);
2470    }
2471
2472    // -- Fan-out to sinks (.join()) --
2473
2474    #[test]
2475    fn dag_fan_out_join() {
2476        let mut wb = WorldBuilder::new();
2477        wb.register::<u64>(0);
2478        wb.register::<i64>(0);
2479        let mut world = wb.build();
2480        let reg = world.registry();
2481
2482        fn root_id(x: u32) -> u64 {
2483            x as u64
2484        }
2485        fn sink_u64(mut out: ResMut<u64>, val: &u64) {
2486            *out = *val * 2;
2487        }
2488        fn sink_i64(mut out: ResMut<i64>, val: &u64) {
2489            *out = *val as i64 * 3;
2490        }
2491
2492        let mut dag = DagStart::<u32>::new()
2493            .root(root_id, reg)
2494            .fork()
2495            .arm(|a| a.then(sink_u64, reg))
2496            .arm(|b| b.then(sink_i64, reg))
2497            .join()
2498            .build();
2499
2500        dag.run(&mut world, 5u32);
2501        assert_eq!(*world.resource::<u64>(), 10);
2502        assert_eq!(*world.resource::<i64>(), 15);
2503    }
2504
2505    // -- Nested fork within an arm --
2506
2507    #[test]
2508    fn dag_nested_fork() {
2509        let mut wb = WorldBuilder::new();
2510        wb.register::<u64>(0);
2511        let mut world = wb.build();
2512        let reg = world.registry();
2513
2514        fn root_id(x: u32) -> u32 {
2515            x
2516        }
2517        fn add_10(val: &u32) -> u32 {
2518            val.wrapping_add(10)
2519        }
2520        fn mul2(val: &u32) -> u32 {
2521            val.wrapping_mul(2)
2522        }
2523        fn mul3(val: &u32) -> u32 {
2524            val.wrapping_mul(3)
2525        }
2526        fn inner_merge(a: &u32, b: &u32) -> u32 {
2527            a.wrapping_add(*b)
2528        }
2529        fn outer_merge(a: &u32, b: &u32) -> u32 {
2530            a.wrapping_add(*b)
2531        }
2532        fn store(mut out: ResMut<u64>, val: &u32) {
2533            *out = *val as u64;
2534        }
2535
2536        // root(5)=5 → fork
2537        //   arm_a: add_10(5)=15 → fork
2538        //     sub_c: mul2(15)=30
2539        //     sub_d: mul3(15)=45
2540        //     inner_merge(30,45)=75
2541        //   arm_b: mul3(5)=15
2542        // outer_merge(75,15)=90
2543        let mut dag = DagStart::<u32>::new()
2544            .root(root_id, reg)
2545            .fork()
2546            .arm(|a| {
2547                a.then(add_10, reg)
2548                    .fork()
2549                    .arm(|c| c.then(mul2, reg))
2550                    .arm(|d| d.then(mul3, reg))
2551                    .merge(inner_merge, reg)
2552            })
2553            .arm(|b| b.then(mul3, reg))
2554            .merge(outer_merge, reg)
2555            .then(store, reg)
2556            .build();
2557
2558        dag.run(&mut world, 5u32);
2559        assert_eq!(*world.resource::<u64>(), 90);
2560    }
2561
2562    // -- Complex topology: asymmetric arm lengths --
2563
2564    #[test]
2565    fn dag_complex_topology() {
2566        let mut wb = WorldBuilder::new();
2567        wb.register::<u64>(0);
2568        let mut world = wb.build();
2569        let reg = world.registry();
2570
2571        fn root_mul2(x: u32) -> u32 {
2572            x.wrapping_mul(2)
2573        }
2574        fn add_one(val: &u32) -> u32 {
2575            val.wrapping_add(1)
2576        }
2577        fn add_then_mul2(val: &u32) -> u32 {
2578            val.wrapping_add(1).wrapping_mul(2)
2579        }
2580        fn mul3(val: &u32) -> u32 {
2581            val.wrapping_mul(3)
2582        }
2583        fn merge_add(a: &u32, b: &u32) -> u32 {
2584            a.wrapping_add(*b)
2585        }
2586        fn store(mut out: ResMut<u64>, val: &u32) {
2587            *out = *val as u64;
2588        }
2589
2590        // root(5)=10 → fork
2591        //   a: add_one(10)=11 → add_then_mul2(11)=24
2592        //   b: mul3(10)=30
2593        // merge(24, 30) = 54
2594        let mut dag = DagStart::<u32>::new()
2595            .root(root_mul2, reg)
2596            .fork()
2597            .arm(|a| a.then(add_one, reg).then(add_then_mul2, reg))
2598            .arm(|b| b.then(mul3, reg))
2599            .merge(merge_add, reg)
2600            .then(store, reg)
2601            .build();
2602
2603        dag.run(&mut world, 5u32);
2604        assert_eq!(*world.resource::<u64>(), 54);
2605    }
2606
2607    // -- Boxable into Box<dyn Handler<E>> --
2608
2609    #[test]
2610    fn dag_boxable() {
2611        let mut wb = WorldBuilder::new();
2612        wb.register::<u64>(0);
2613        let mut world = wb.build();
2614        let reg = world.registry();
2615
2616        fn root_id(x: u32) -> u64 {
2617            x as u64
2618        }
2619        fn store(mut out: ResMut<u64>, val: &u64) {
2620            *out = *val;
2621        }
2622
2623        let mut boxed: Virtual<u32> = Box::new(
2624            DagStart::<u32>::new()
2625                .root(root_id, reg)
2626                .then(store, reg)
2627                .build(),
2628        );
2629        boxed.run(&mut world, 77u32);
2630        assert_eq!(*world.resource::<u64>(), 77);
2631    }
2632
2633    // -- World access (Res<T>, ResMut<T>) in nodes --
2634
2635    #[test]
2636    fn dag_world_access() {
2637        let mut wb = WorldBuilder::new();
2638        wb.register::<u64>(10); // factor
2639        wb.register::<String>(String::new());
2640        let mut world = wb.build();
2641        let reg = world.registry();
2642
2643        fn scale(factor: Res<u64>, val: &u32) -> u64 {
2644            *factor * (*val as u64)
2645        }
2646        fn store(mut out: ResMut<String>, val: &u64) {
2647            *out = val.to_string();
2648        }
2649
2650        let mut dag = DagStart::<u32>::new()
2651            .root(|x: u32| x, reg)
2652            .then(scale, reg)
2653            .then(store, reg)
2654            .build();
2655
2656        dag.run(&mut world, 7u32);
2657        assert_eq!(world.resource::<String>().as_str(), "70");
2658    }
2659
2660    // -- Root-only (terminal root outputting ()) --
2661
2662    #[test]
2663    fn dag_root_only() {
2664        let mut wb = WorldBuilder::new();
2665        wb.register::<u64>(0);
2666        let mut world = wb.build();
2667        let reg = world.registry();
2668
2669        let mut dag = DagStart::<u32>::new()
2670            .root(
2671                |mut out: ResMut<u64>, x: u32| {
2672                    *out = x as u64;
2673                },
2674                reg,
2675            )
2676            .build();
2677
2678        dag.run(&mut world, 42u32);
2679        assert_eq!(*world.resource::<u64>(), 42);
2680    }
2681
2682    // -- Multiple dispatches reuse state --
2683
2684    #[test]
2685    fn dag_multiple_dispatches() {
2686        let mut wb = WorldBuilder::new();
2687        wb.register::<u64>(0);
2688        let mut world = wb.build();
2689        let reg = world.registry();
2690
2691        fn root_id(x: u32) -> u64 {
2692            x as u64
2693        }
2694        fn store(mut out: ResMut<u64>, val: &u64) {
2695            *out = *val;
2696        }
2697
2698        let mut dag = DagStart::<u32>::new()
2699            .root(root_id, reg)
2700            .then(store, reg)
2701            .build();
2702
2703        dag.run(&mut world, 1u32);
2704        assert_eq!(*world.resource::<u64>(), 1);
2705        dag.run(&mut world, 2u32);
2706        assert_eq!(*world.resource::<u64>(), 2);
2707        dag.run(&mut world, 3u32);
2708        assert_eq!(*world.resource::<u64>(), 3);
2709    }
2710
2711    // -- 3-way merge --
2712
2713    #[test]
2714    fn dag_3way_merge() {
2715        let mut wb = WorldBuilder::new();
2716        wb.register::<String>(String::new());
2717        let mut world = wb.build();
2718        let reg = world.registry();
2719
2720        fn root_id(x: u32) -> u64 {
2721            x as u64
2722        }
2723        fn mul1(val: &u64) -> u64 {
2724            *val
2725        }
2726        fn mul2(val: &u64) -> u64 {
2727            *val * 2
2728        }
2729        fn mul3(val: &u64) -> u64 {
2730            *val * 3
2731        }
2732        fn merge3_fmt(mut out: ResMut<String>, a: &u64, b: &u64, c: &u64) {
2733            *out = format!("{},{},{}", a, b, c);
2734        }
2735
2736        let mut dag = DagStart::<u32>::new()
2737            .root(root_id, reg)
2738            .fork()
2739            .arm(|a| a.then(mul1, reg))
2740            .arm(|b| b.then(mul2, reg))
2741            .arm(|c| c.then(mul3, reg))
2742            .merge(merge3_fmt, reg)
2743            .build();
2744
2745        dag.run(&mut world, 10u32);
2746        assert_eq!(world.resource::<String>().as_str(), "10,20,30");
2747    }
2748
2749    // -- DAG combinators --
2750
2751    #[test]
2752    fn dag_dispatch() {
2753        fn root(x: u32) -> u64 {
2754            x as u64 + 42
2755        }
2756        fn sink(mut out: ResMut<u64>, event: u64) {
2757            *out = event;
2758        }
2759        let mut wb = WorldBuilder::new();
2760        wb.register::<u64>(0);
2761        let mut world = wb.build();
2762        let reg = world.registry();
2763
2764        let mut dag = DagStart::<u32>::new()
2765            .root(root, reg)
2766            .dispatch(sink.into_handler(reg))
2767            .build();
2768
2769        dag.run(&mut world, 0u32);
2770        assert_eq!(*world.resource::<u64>(), 42);
2771    }
2772
2773    #[test]
2774    fn dag_option_map() {
2775        fn root(_x: u32) -> Option<u64> {
2776            Some(10)
2777        }
2778        fn double(val: &u64) -> u64 {
2779            *val * 2
2780        }
2781        fn sink(mut out: ResMut<u64>, val: &Option<u64>) {
2782            *out = val.unwrap_or(0);
2783        }
2784        let mut wb = WorldBuilder::new();
2785        wb.register::<u64>(0);
2786        let mut world = wb.build();
2787        let reg = world.registry();
2788
2789        let mut dag = DagStart::<u32>::new()
2790            .root(root, reg)
2791            .map(double, reg)
2792            .then(sink, reg)
2793            .build();
2794
2795        dag.run(&mut world, 0u32);
2796        assert_eq!(*world.resource::<u64>(), 20);
2797    }
2798
2799    #[test]
2800    fn dag_option_map_none() {
2801        fn root(_x: u32) -> Option<u64> {
2802            None
2803        }
2804        fn double(val: &u64) -> u64 {
2805            *val * 2
2806        }
2807        fn sink(mut out: ResMut<u64>, val: &Option<u64>) {
2808            *out = val.unwrap_or(999);
2809        }
2810        let mut wb = WorldBuilder::new();
2811        wb.register::<u64>(0);
2812        let mut world = wb.build();
2813        let reg = world.registry();
2814
2815        let mut dag = DagStart::<u32>::new()
2816            .root(root, reg)
2817            .map(double, reg)
2818            .then(sink, reg)
2819            .build();
2820
2821        dag.run(&mut world, 0u32);
2822        assert_eq!(*world.resource::<u64>(), 999);
2823    }
2824
2825    #[test]
2826    fn dag_option_and_then() {
2827        fn root(_x: u32) -> Option<u64> {
2828            Some(5)
2829        }
2830        fn check(val: &u64) -> Option<u64> {
2831            if *val > 3 { Some(*val * 10) } else { None }
2832        }
2833        fn sink(mut out: ResMut<u64>, val: &Option<u64>) {
2834            *out = val.unwrap_or(0);
2835        }
2836        let mut wb = WorldBuilder::new();
2837        wb.register::<u64>(0);
2838        let mut world = wb.build();
2839        let reg = world.registry();
2840
2841        let mut dag = DagStart::<u32>::new()
2842            .root(root, reg)
2843            .and_then(check, reg)
2844            .then(sink, reg)
2845            .build();
2846
2847        dag.run(&mut world, 0u32);
2848        assert_eq!(*world.resource::<u64>(), 50);
2849    }
2850
2851    #[test]
2852    fn dag_option_filter_keeps() {
2853        fn root(_x: u32) -> Option<u64> {
2854            Some(5)
2855        }
2856        fn sink(mut out: ResMut<u64>, val: &Option<u64>) {
2857            *out = val.unwrap_or(0);
2858        }
2859        let mut wb = WorldBuilder::new();
2860        wb.register::<u64>(0);
2861        let mut world = wb.build();
2862
2863        let mut dag = DagStart::<u32>::new()
2864            .root(root, world.registry())
2865            .filter(|_w, v: &u64| *v > 3)
2866            .then(sink, world.registry())
2867            .build();
2868
2869        dag.run(&mut world, 0u32);
2870        assert_eq!(*world.resource::<u64>(), 5);
2871    }
2872
2873    #[test]
2874    fn dag_option_filter_drops() {
2875        fn root(_x: u32) -> Option<u64> {
2876            Some(5)
2877        }
2878        fn sink(mut out: ResMut<u64>, val: &Option<u64>) {
2879            *out = val.unwrap_or(0);
2880        }
2881        let mut wb = WorldBuilder::new();
2882        wb.register::<u64>(0);
2883        let mut world = wb.build();
2884
2885        let mut dag = DagStart::<u32>::new()
2886            .root(root, world.registry())
2887            .filter(|_w, v: &u64| *v > 10)
2888            .then(sink, world.registry())
2889            .build();
2890
2891        dag.run(&mut world, 0u32);
2892        assert_eq!(*world.resource::<u64>(), 0);
2893    }
2894
2895    #[test]
2896    fn dag_option_on_none() {
2897        fn root(_x: u32) -> Option<u64> {
2898            None
2899        }
2900        fn sink(_val: &Option<u64>) {}
2901        let mut wb = WorldBuilder::new();
2902        wb.register::<bool>(false);
2903        let mut world = wb.build();
2904        let reg = world.registry();
2905
2906        let mut dag = DagStart::<u32>::new()
2907            .root(root, reg)
2908            .on_none(|w: &mut World| {
2909                *w.resource_mut::<bool>() = true;
2910            })
2911            .then(sink, reg)
2912            .build();
2913
2914        dag.run(&mut world, 0u32);
2915        assert!(*world.resource::<bool>());
2916    }
2917
2918    #[test]
2919    fn dag_option_unwrap_or() {
2920        fn root(_x: u32) -> Option<u64> {
2921            None
2922        }
2923        fn sink(mut out: ResMut<u64>, val: &u64) {
2924            *out = *val;
2925        }
2926        let mut wb = WorldBuilder::new();
2927        wb.register::<u64>(0);
2928        let mut world = wb.build();
2929        let reg = world.registry();
2930
2931        let mut dag = DagStart::<u32>::new()
2932            .root(root, reg)
2933            .unwrap_or(42u64)
2934            .then(sink, reg)
2935            .build();
2936
2937        dag.run(&mut world, 0u32);
2938        assert_eq!(*world.resource::<u64>(), 42);
2939    }
2940
2941    #[test]
2942    fn dag_option_ok_or() {
2943        fn root(_x: u32) -> Option<u64> {
2944            None
2945        }
2946        fn sink(mut out: ResMut<u64>, val: &Result<u64, &str>) {
2947            *out = match val {
2948                Ok(v) => *v,
2949                Err(_) => 999,
2950            };
2951        }
2952        let mut wb = WorldBuilder::new();
2953        wb.register::<u64>(0);
2954        let mut world = wb.build();
2955        let reg = world.registry();
2956
2957        let mut dag = DagStart::<u32>::new()
2958            .root(root, reg)
2959            .ok_or("missing")
2960            .then(sink, reg)
2961            .build();
2962
2963        dag.run(&mut world, 0u32);
2964        assert_eq!(*world.resource::<u64>(), 999);
2965    }
2966
2967    #[test]
2968    fn dag_result_map() {
2969        fn root(_x: u32) -> Result<u64, &'static str> {
2970            Ok(10)
2971        }
2972        fn double(val: &u64) -> u64 {
2973            *val * 2
2974        }
2975        fn sink(mut out: ResMut<u64>, val: &Result<u64, &str>) {
2976            *out = val.as_ref().copied().unwrap_or(0);
2977        }
2978        let mut wb = WorldBuilder::new();
2979        wb.register::<u64>(0);
2980        let mut world = wb.build();
2981        let reg = world.registry();
2982
2983        let mut dag = DagStart::<u32>::new()
2984            .root(root, reg)
2985            .map(double, reg)
2986            .then(sink, reg)
2987            .build();
2988
2989        dag.run(&mut world, 0u32);
2990        assert_eq!(*world.resource::<u64>(), 20);
2991    }
2992
2993    #[test]
2994    fn dag_result_and_then() {
2995        fn root(_x: u32) -> Result<u64, &'static str> {
2996            Ok(5)
2997        }
2998        fn check(val: &u64) -> Result<u64, &'static str> {
2999            if *val > 3 {
3000                Ok(*val * 10)
3001            } else {
3002                Err("too small")
3003            }
3004        }
3005        fn sink(mut out: ResMut<u64>, val: &Result<u64, &str>) {
3006            *out = val.as_ref().copied().unwrap_or(0);
3007        }
3008        let mut wb = WorldBuilder::new();
3009        wb.register::<u64>(0);
3010        let mut world = wb.build();
3011        let reg = world.registry();
3012
3013        let mut dag = DagStart::<u32>::new()
3014            .root(root, reg)
3015            .and_then(check, reg)
3016            .then(sink, reg)
3017            .build();
3018
3019        dag.run(&mut world, 0u32);
3020        assert_eq!(*world.resource::<u64>(), 50);
3021    }
3022
3023    #[test]
3024    fn dag_result_catch() {
3025        fn root(_x: u32) -> Result<u64, String> {
3026            Err("oops".into())
3027        }
3028        fn handle_err(mut log: ResMut<String>, err: &String) {
3029            *log = err.clone();
3030        }
3031        fn sink(mut out: ResMut<u64>, val: &Option<u64>) {
3032            *out = val.unwrap_or(0);
3033        }
3034        let mut wb = WorldBuilder::new();
3035        wb.register::<u64>(0);
3036        wb.register::<String>(String::new());
3037        let mut world = wb.build();
3038        let reg = world.registry();
3039
3040        let mut dag = DagStart::<u32>::new()
3041            .root(root, reg)
3042            .catch(handle_err, reg)
3043            .then(sink, reg)
3044            .build();
3045
3046        dag.run(&mut world, 0u32);
3047        assert_eq!(*world.resource::<u64>(), 0);
3048        assert_eq!(world.resource::<String>().as_str(), "oops");
3049    }
3050
3051    #[test]
3052    fn dag_result_ok() {
3053        fn root(_x: u32) -> Result<u64, &'static str> {
3054            Err("fail")
3055        }
3056        fn sink(mut out: ResMut<u64>, val: &Option<u64>) {
3057            *out = val.unwrap_or(0);
3058        }
3059        let mut wb = WorldBuilder::new();
3060        wb.register::<u64>(0);
3061        let mut world = wb.build();
3062        let reg = world.registry();
3063
3064        let mut dag = DagStart::<u32>::new()
3065            .root(root, reg)
3066            .ok()
3067            .then(sink, reg)
3068            .build();
3069
3070        dag.run(&mut world, 0u32);
3071        assert_eq!(*world.resource::<u64>(), 0);
3072    }
3073
3074    #[test]
3075    fn dag_result_unwrap_or_else() {
3076        fn root(_x: u32) -> Result<u64, &'static str> {
3077            Err("fail")
3078        }
3079        fn sink(mut out: ResMut<u64>, val: &u64) {
3080            *out = *val;
3081        }
3082        let mut wb = WorldBuilder::new();
3083        wb.register::<u64>(0);
3084        let mut world = wb.build();
3085        let reg = world.registry();
3086
3087        let mut dag = DagStart::<u32>::new()
3088            .root(root, reg)
3089            .unwrap_or_else(|_w, _err| 42u64)
3090            .then(sink, reg)
3091            .build();
3092
3093        dag.run(&mut world, 0u32);
3094        assert_eq!(*world.resource::<u64>(), 42);
3095    }
3096
3097    #[test]
3098    fn dag_result_map_err() {
3099        fn root(_x: u32) -> Result<u64, u32> {
3100            Err(5)
3101        }
3102        fn sink(mut out: ResMut<u64>, val: &Result<u64, String>) {
3103            *out = match val {
3104                Ok(v) => *v,
3105                Err(e) => e.len() as u64,
3106            };
3107        }
3108        let mut wb = WorldBuilder::new();
3109        wb.register::<u64>(0);
3110        let mut world = wb.build();
3111        let reg = world.registry();
3112
3113        let mut dag = DagStart::<u32>::new()
3114            .root(root, reg)
3115            .map_err(|_w, e: u32| format!("err:{e}"))
3116            .then(sink, reg)
3117            .build();
3118
3119        dag.run(&mut world, 0u32);
3120        // "err:5".len() == 5
3121        assert_eq!(*world.resource::<u64>(), 5);
3122    }
3123
3124    #[test]
3125    fn dag_arm_combinators() {
3126        fn root(x: u32) -> u64 {
3127            x as u64 + 10
3128        }
3129        fn arm_step(val: &u64) -> Option<u64> {
3130            if *val > 5 { Some(*val * 3) } else { None }
3131        }
3132        fn double(val: &u64) -> u64 {
3133            *val * 2
3134        }
3135        fn merge_fn(a: &u64, b: &u64) -> String {
3136            format!("{a},{b}")
3137        }
3138        fn sink(mut out: ResMut<String>, val: &String) {
3139            *out = val.clone();
3140        }
3141        let mut wb = WorldBuilder::new();
3142        wb.register::<String>(String::new());
3143        let mut world = wb.build();
3144        let reg = world.registry();
3145
3146        // Arm 0: root → arm_step (Option) → unwrap_or(0)
3147        // Arm 1: root → double
3148        let mut dag = DagStart::<u32>::new()
3149            .root(root, reg)
3150            .fork()
3151            .arm(|a| a.then(arm_step, reg).unwrap_or(0u64))
3152            .arm(|b| b.then(double, reg))
3153            .merge(merge_fn, reg)
3154            .then(sink, reg)
3155            .build();
3156
3157        dag.run(&mut world, 0u32);
3158        // root(0) = 10
3159        // arm0: 10 > 5 → Some(30) → unwrap → 30
3160        // arm1: 10 * 2 = 20
3161        assert_eq!(world.resource::<String>().as_str(), "30,20");
3162    }
3163
3164    #[test]
3165    fn dag_option_inspect() {
3166        fn root(_x: u32) -> Option<u64> {
3167            Some(42)
3168        }
3169        fn sink(mut out: ResMut<u64>, val: &Option<u64>) {
3170            *out = val.unwrap_or(0);
3171        }
3172        let mut wb = WorldBuilder::new();
3173        wb.register::<u64>(0);
3174        wb.register::<bool>(false);
3175        let mut world = wb.build();
3176        let reg = world.registry();
3177
3178        let mut dag = DagStart::<u32>::new()
3179            .root(root, reg)
3180            .inspect(|w: &mut World, _val: &u64| {
3181                *w.resource_mut::<bool>() = true;
3182            })
3183            .then(sink, reg)
3184            .build();
3185
3186        dag.run(&mut world, 0u32);
3187        assert_eq!(*world.resource::<u64>(), 42);
3188        assert!(*world.resource::<bool>());
3189    }
3190
3191    // -- Guard combinator --
3192
3193    #[test]
3194    fn dag_guard_keeps() {
3195        fn root(x: u32) -> u64 {
3196            x as u64
3197        }
3198        fn sink(mut out: ResMut<u64>, val: &Option<u64>) {
3199            *out = val.unwrap_or(0);
3200        }
3201        let mut wb = WorldBuilder::new();
3202        wb.register::<u64>(0);
3203        let mut world = wb.build();
3204        let reg = world.registry();
3205
3206        let mut dag = DagStart::<u32>::new()
3207            .root(root, reg)
3208            .guard(|_w, v| *v > 3)
3209            .then(sink, reg)
3210            .build();
3211
3212        dag.run(&mut world, 5u32);
3213        assert_eq!(*world.resource::<u64>(), 5);
3214    }
3215
3216    #[test]
3217    fn dag_guard_drops() {
3218        fn root(x: u32) -> u64 {
3219            x as u64
3220        }
3221        fn sink(mut out: ResMut<u64>, val: &Option<u64>) {
3222            *out = val.unwrap_or(999);
3223        }
3224        let mut wb = WorldBuilder::new();
3225        wb.register::<u64>(0);
3226        let mut world = wb.build();
3227        let reg = world.registry();
3228
3229        let mut dag = DagStart::<u32>::new()
3230            .root(root, reg)
3231            .guard(|_w, v| *v > 10)
3232            .then(sink, reg)
3233            .build();
3234
3235        dag.run(&mut world, 5u32);
3236        assert_eq!(*world.resource::<u64>(), 999);
3237    }
3238
3239    #[test]
3240    fn dag_arm_guard() {
3241        fn root(x: u32) -> u64 {
3242            x as u64
3243        }
3244        fn double(val: &u64) -> u64 {
3245            *val * 2
3246        }
3247        fn merge_fn(a: &Option<u64>, b: &u64) -> String {
3248            format!("{:?},{}", a, b)
3249        }
3250        fn sink(mut out: ResMut<String>, val: &String) {
3251            *out = val.clone();
3252        }
3253        let mut wb = WorldBuilder::new();
3254        wb.register::<String>(String::new());
3255        let mut world = wb.build();
3256        let reg = world.registry();
3257
3258        // arm_a: guard drops (5 < 10), arm_b: runs normally
3259        let mut dag = DagStart::<u32>::new()
3260            .root(root, reg)
3261            .fork()
3262            .arm(|a| a.then(double, reg).guard(|_w, v| *v > 100))
3263            .arm(|b| b.then(double, reg))
3264            .merge(merge_fn, reg)
3265            .then(sink, reg)
3266            .build();
3267
3268        dag.run(&mut world, 5u32);
3269        // arm_a: 10, guard fails → None. arm_b: 10.
3270        assert_eq!(world.resource::<String>().as_str(), "None,10");
3271    }
3272
3273    // -- Tap combinator --
3274
3275    #[test]
3276    fn dag_tap_observes_without_changing() {
3277        fn root(x: u32) -> u64 {
3278            x as u64 * 2
3279        }
3280        fn sink(mut out: ResMut<u64>, val: &u64) {
3281            *out = *val;
3282        }
3283        let mut wb = WorldBuilder::new();
3284        wb.register::<u64>(0);
3285        wb.register::<bool>(false);
3286        let mut world = wb.build();
3287        let reg = world.registry();
3288
3289        let mut dag = DagStart::<u32>::new()
3290            .root(root, reg)
3291            .tap(|w, val| {
3292                // Side-effect: record that we observed the value.
3293                *w.resource_mut::<bool>() = *val == 10;
3294            })
3295            .then(sink, reg)
3296            .build();
3297
3298        dag.run(&mut world, 5u32);
3299        assert_eq!(*world.resource::<u64>(), 10); // value passed through
3300        assert!(*world.resource::<bool>()); // tap fired
3301    }
3302
3303    #[test]
3304    fn dag_arm_tap() {
3305        fn root(x: u32) -> u64 {
3306            x as u64
3307        }
3308        fn double(val: &u64) -> u64 {
3309            *val * 2
3310        }
3311        fn merge_add(a: &u64, b: &u64) -> u64 {
3312            *a + *b
3313        }
3314        fn sink(mut out: ResMut<u64>, val: &u64) {
3315            *out = *val;
3316        }
3317        let mut wb = WorldBuilder::new();
3318        wb.register::<u64>(0);
3319        wb.register::<bool>(false);
3320        let mut world = wb.build();
3321        let reg = world.registry();
3322
3323        let mut dag = DagStart::<u32>::new()
3324            .root(root, reg)
3325            .fork()
3326            .arm(|a| {
3327                a.then(double, reg).tap(|w, _v| {
3328                    *w.resource_mut::<bool>() = true;
3329                })
3330            })
3331            .arm(|b| b.then(double, reg))
3332            .merge(merge_add, reg)
3333            .then(sink, reg)
3334            .build();
3335
3336        dag.run(&mut world, 5u32);
3337        // arm_a: 10, arm_b: 10, merge: 20
3338        assert_eq!(*world.resource::<u64>(), 20);
3339        assert!(*world.resource::<bool>()); // tap in arm_a fired
3340    }
3341
3342    // -- Route combinator --
3343
3344    #[test]
3345    fn dag_route_true_arm() {
3346        fn root(x: u32) -> u64 {
3347            x as u64
3348        }
3349        fn double(val: &u64) -> u64 {
3350            *val * 2
3351        }
3352        fn triple(val: &u64) -> u64 {
3353            *val * 3
3354        }
3355        fn sink(mut out: ResMut<u64>, val: &u64) {
3356            *out = *val;
3357        }
3358        let mut wb = WorldBuilder::new();
3359        wb.register::<u64>(0);
3360        let mut world = wb.build();
3361        let reg = world.registry();
3362
3363        let arm_t = DagArmStart::new().then(double, reg);
3364        let arm_f = DagArmStart::new().then(triple, reg);
3365
3366        let mut dag = DagStart::<u32>::new()
3367            .root(root, reg)
3368            .route(|_w, v| *v > 3, arm_t, arm_f)
3369            .then(sink, reg)
3370            .build();
3371
3372        dag.run(&mut world, 5u32); // 5 > 3 → true arm → double → 10
3373        assert_eq!(*world.resource::<u64>(), 10);
3374    }
3375
3376    #[test]
3377    fn dag_route_false_arm() {
3378        fn root(x: u32) -> u64 {
3379            x as u64
3380        }
3381        fn double(val: &u64) -> u64 {
3382            *val * 2
3383        }
3384        fn triple(val: &u64) -> u64 {
3385            *val * 3
3386        }
3387        fn sink(mut out: ResMut<u64>, val: &u64) {
3388            *out = *val;
3389        }
3390        let mut wb = WorldBuilder::new();
3391        wb.register::<u64>(0);
3392        let mut world = wb.build();
3393        let reg = world.registry();
3394
3395        let arm_t = DagArmStart::new().then(double, reg);
3396        let arm_f = DagArmStart::new().then(triple, reg);
3397
3398        let mut dag = DagStart::<u32>::new()
3399            .root(root, reg)
3400            .route(|_w, v| *v > 10, arm_t, arm_f)
3401            .then(sink, reg)
3402            .build();
3403
3404        dag.run(&mut world, 5u32); // 5 <= 10 → false arm → triple → 15
3405        assert_eq!(*world.resource::<u64>(), 15);
3406    }
3407
3408    #[test]
3409    fn dag_route_nested() {
3410        fn root(x: u32) -> u64 {
3411            x as u64
3412        }
3413        fn pass(val: &u64) -> u64 {
3414            *val
3415        }
3416        fn add_100(val: &u64) -> u64 {
3417            *val + 100
3418        }
3419        fn add_200(val: &u64) -> u64 {
3420            *val + 200
3421        }
3422        fn add_300(val: &u64) -> u64 {
3423            *val + 300
3424        }
3425        fn sink(mut out: ResMut<u64>, val: &u64) {
3426            *out = *val;
3427        }
3428        let mut wb = WorldBuilder::new();
3429        wb.register::<u64>(0);
3430        let mut world = wb.build();
3431        let reg = world.registry();
3432
3433        // N-ary via nesting: <5 → +100, 5..10 → +200, >=10 → +300
3434        let inner_t = DagArmStart::new().then(add_200, reg);
3435        let inner_f = DagArmStart::new().then(add_300, reg);
3436        let outer_t = DagArmStart::new().then(add_100, reg);
3437        let outer_f = DagArmStart::new()
3438            .then(pass, reg)
3439            .route(|_w, v| *v < 10, inner_t, inner_f);
3440
3441        let mut dag = DagStart::<u32>::new()
3442            .root(root, reg)
3443            .route(|_w, v| *v < 5, outer_t, outer_f)
3444            .then(sink, reg)
3445            .build();
3446
3447        dag.run(&mut world, 3u32); // 3 < 5 → +100 → 103
3448        assert_eq!(*world.resource::<u64>(), 103);
3449
3450        dag.run(&mut world, 7u32); // 7 >= 5, 7 < 10 → +200 → 207
3451        assert_eq!(*world.resource::<u64>(), 207);
3452
3453        dag.run(&mut world, 15u32); // 15 >= 5, 15 >= 10 → +300 → 315
3454        assert_eq!(*world.resource::<u64>(), 315);
3455    }
3456
3457    // -- Tee combinator --
3458
// `.tee()` runs a side-effect arm on the current value; the main chain then
// continues with that same value.
#[test]
fn dag_tee_side_effect_chain() {
    let mut builder = WorldBuilder::new();
    builder.register::<u64>(0); // written by `store`
    builder.register::<u32>(0); // bumped by the tee arm
    let mut world = builder.build();
    let reg = world.registry();

    fn doubled(input: u32) -> u64 {
        u64::from(input) * 2
    }
    fn count_visit(mut visits: ResMut<u32>, _seen: &u64) {
        *visits += 1;
    }
    fn store(mut slot: ResMut<u64>, value: &u64) {
        *slot = *value;
    }

    let observer = DagArmStart::new().then(count_visit, reg);
    let mut dag = DagStart::<u32>::new()
        .root(doubled, reg)
        .tee(observer)
        .then(store, reg)
        .build();

    // (input, value seen after the tee, cumulative side-effect count)
    for (input, passed_through, visits) in [(5u32, 10u64, 1u32), (7, 14, 2)] {
        dag.run(&mut world, input);
        assert_eq!(*world.resource::<u64>(), passed_through);
        assert_eq!(*world.resource::<u32>(), visits);
    }
}
3492
3493    // -- Dedup combinator --
3494
// `.dedup()` hands the next step `Some(value)` when the value changed since
// the previous run and `None` when it repeats.
#[test]
fn dag_dedup_suppresses_unchanged() {
    let mut builder = WorldBuilder::new();
    builder.register::<u32>(0); // counts the `Some` values seen by the sink
    let mut world = builder.build();
    let reg = world.registry();

    // Integer division on purpose: 4 and 5 both halve to 2.
    fn halve(input: u32) -> u64 {
        u64::from(input) / 2
    }
    fn count_emitted(mut emitted: ResMut<u32>, value: &Option<u64>) {
        *emitted += u32::from(value.is_some());
    }

    let mut dag = DagStart::<u32>::new()
        .root(halve, reg)
        .dedup()
        .then(count_emitted, reg)
        .build();

    // (input, emissions counted so far)
    let cases = [
        (4u32, 1u32), // 2 — first value, Some
        (5, 1),       // 2 — unchanged, None
        (6, 2),       // 3 — changed, Some
    ];
    for (input, emitted) in cases {
        dag.run(&mut world, input);
        assert_eq!(*world.resource::<u32>(), emitted);
    }
}
3525
3526    // -- Bool combinators --
3527
// `.not()` inverts the bool flowing through the chain.
#[test]
fn dag_not() {
    let mut builder = WorldBuilder::new();
    builder.register::<bool>(false);
    let mut world = builder.build();
    let reg = world.registry();

    fn above_five(input: u32) -> bool {
        input > 5
    }
    fn record(mut slot: ResMut<bool>, flag: &bool) {
        *slot = *flag;
    }

    let mut dag = DagStart::<u32>::new()
        .root(above_five, reg)
        .not()
        .then(record, reg)
        .build();

    // (input, expected `!(input > 5)`)
    for (input, negated) in [(3u32, true), (10, false)] {
        dag.run(&mut world, input);
        assert_eq!(*world.resource::<bool>(), negated);
    }
}
3553
// `.and()` combines the chain's bool with a flag read from the world.
#[test]
fn dag_and() {
    // A single `bool` resource serves both as the "market open" flag read by
    // the `.and()` closure and as the slot the sink writes the result into.
    let mut builder = WorldBuilder::new();
    builder.register::<bool>(true);
    let mut world = builder.build();
    let reg = world.registry();

    fn above_five(input: u32) -> bool {
        input > 5
    }
    fn record(mut slot: ResMut<bool>, flag: &bool) {
        *slot = *flag;
    }

    let mut dag = DagStart::<u32>::new()
        .root(above_five, reg)
        .and(|w| *w.resource::<bool>())
        .then(record, reg)
        .build();

    // Market open: true && true = true.
    dag.run(&mut world, 10u32);
    assert!(*world.resource::<bool>());

    // Market closed: true && false = false.
    *world.resource_mut::<bool>() = false;
    dag.run(&mut world, 10u32);
    assert!(!*world.resource::<bool>());
}
3580
// `.or()` combines the chain's bool with a flag read from the world.
#[test]
fn dag_or() {
    // The same `bool` resource is both the flag read by the `.or()` closure
    // and the slot the sink writes the result into.
    let mut builder = WorldBuilder::new();
    builder.register::<bool>(false);
    let mut world = builder.build();
    let reg = world.registry();

    fn above_five(input: u32) -> bool {
        input > 5
    }
    fn record(mut slot: ResMut<bool>, flag: &bool) {
        *slot = *flag;
    }

    let mut dag = DagStart::<u32>::new()
        .root(above_five, reg)
        .or(|w| *w.resource::<bool>())
        .then(record, reg)
        .build();

    // Flag clear: false || false = false.
    dag.run(&mut world, 3u32);
    assert!(!*world.resource::<bool>());

    // Flag set: false || true = true.
    *world.resource_mut::<bool>() = true;
    dag.run(&mut world, 3u32);
    assert!(*world.resource::<bool>());
}
3607
// `.xor()` combines the chain's bool with a flag read from the world.
//
// All four rows of the truth table are exercised. The flag is set explicitly
// before every run because the sink overwrites the same `bool` resource that
// the `.xor()` closure reads.
#[test]
fn dag_xor() {
    fn root(x: u32) -> bool {
        x > 5
    }
    fn sink(mut out: ResMut<bool>, val: &bool) {
        *out = *val;
    }
    let mut wb = WorldBuilder::new();
    wb.register::<bool>(true);
    let mut world = wb.build();
    let reg = world.registry();

    let mut dag = DagStart::<u32>::new()
        .root(root, reg)
        .xor(|w| *w.resource::<bool>())
        .then(sink, reg)
        .build();

    // (flag, input, expected) — the chain value is `input > 5`.
    let cases = [
        (true, 10u32, false), // true ^ true = false
        (false, 10, true),    // true ^ false = true
        (true, 3, true),      // false ^ true = true
        (false, 3, false),    // false ^ false = false
    ];
    for (flag, input, expected) in cases {
        *world.resource_mut::<bool>() = flag;
        dag.run(&mut world, input);
        assert_eq!(
            *world.resource::<bool>(),
            expected,
            "flag={}, input={}",
            flag,
            input
        );
    }
}
3630
3631    // =========================================================================
3632    // Splat — tuple destructuring
3633    // =========================================================================
3634
// `.splat()` on a 2-tuple: the following step takes each element as its own
// `&T` argument.
#[test]
fn dag_splat2_on_chain() {
    fn pair_with_double(input: u32) -> (u32, u32) {
        (input, input * 2)
    }
    fn store_sum(mut slot: ResMut<u64>, first: &u32, second: &u32) {
        *slot = u64::from(*first) + u64::from(*second);
    }

    let mut builder = WorldBuilder::new();
    builder.register::<u64>(0);
    let mut world = builder.build();
    let reg = world.registry();

    let mut dag = DagStart::<u32>::new()
        .root(pair_with_double, reg)
        .splat()
        .then(store_sum, reg)
        .build();

    dag.run(&mut world, 5u32);
    // (5, 10) → 5 + 10
    assert_eq!(*world.resource::<u64>(), 15);
}
3658
// `.splat()` on a 3-tuple feeding a three-argument step.
#[test]
fn dag_splat3_on_chain() {
    fn consecutive3(start: u32) -> (u32, u32, u32) {
        (start, start + 1, start + 2)
    }
    fn total3(x: &u32, y: &u32, z: &u32) -> u64 {
        u64::from(*x) + u64::from(*y) + u64::from(*z)
    }
    fn record(mut slot: ResMut<u64>, total: &u64) {
        *slot = *total;
    }

    let mut builder = WorldBuilder::new();
    builder.register::<u64>(0);
    let mut world = builder.build();
    let reg = world.registry();

    let mut dag = DagStart::<u32>::new()
        .root(consecutive3, reg)
        .splat()
        .then(total3, reg)
        .then(record, reg)
        .build();

    dag.run(&mut world, 10u32);
    // 10 + 11 + 12
    assert_eq!(*world.resource::<u64>(), 33);
}
3686
// A step after `.splat()` may take a world resource parameter ahead of the
// splatted tuple elements.
#[test]
fn dag_splat2_with_param() {
    fn pair_with_triple(input: u32) -> (u32, u32) {
        (input, input * 3)
    }
    fn offset_sum(base: Res<u64>, first: &u32, second: &u32) -> u64 {
        *base + u64::from(*first) + u64::from(*second)
    }
    fn record(mut slot: ResMut<u64>, total: &u64) {
        *slot = *total;
    }

    let mut builder = WorldBuilder::new();
    builder.register::<u64>(100); // base value read by `offset_sum`
    let mut world = builder.build();
    let reg = world.registry();

    let mut dag = DagStart::<u32>::new()
        .root(pair_with_triple, reg)
        .splat()
        .then(offset_sum, reg)
        .then(record, reg)
        .build();

    dag.run(&mut world, 5u32);
    // 100 + 5 + 15
    assert_eq!(*world.resource::<u64>(), 120);
}
3714
// `.splat()` as the first step of a fork arm, while a sibling arm consumes
// the same tuple whole.
#[test]
fn dag_splat_on_arm_start() {
    fn pair_plus_ten(input: u32) -> (u32, u32) {
        (input, input + 10)
    }
    fn sum_pair(first: &u32, second: &u32) -> u64 {
        u64::from(*first) + u64::from(*second)
    }
    fn product(pair: &(u32, u32)) -> u64 {
        u64::from(pair.0) * u64::from(pair.1)
    }
    fn add_arms(left: &u64, right: &u64) -> u64 {
        *left + *right
    }
    fn record(mut slot: ResMut<u64>, total: &u64) {
        *slot = *total;
    }

    let mut builder = WorldBuilder::new();
    builder.register::<u64>(0);
    let mut world = builder.build();
    let reg = world.registry();

    let mut dag = DagStart::<u32>::new()
        .root(pair_plus_ten, reg)
        .fork()
        .arm(|splatted| splatted.splat().then(sum_pair, reg))
        .arm(|whole| whole.then(product, reg))
        .merge(add_arms, reg)
        .then(record, reg)
        .build();

    dag.run(&mut world, 5u32);
    // root:       (5, 15)
    // first arm:  splat → 5 + 15 = 20
    // second arm: 5 * 15 = 75
    // merge:      20 + 75 = 95
    assert_eq!(*world.resource::<u64>(), 95);
}
3753
// `.splat()` in the middle of a fork arm, after a step that produces a tuple.
#[test]
fn dag_splat_on_arm() {
    fn pass_through(input: u32) -> u32 {
        input
    }
    fn pair_plus_hundred(value: &u32) -> (u32, u32) {
        (*value, *value + 100)
    }
    fn sum_pair(first: &u32, second: &u32) -> u64 {
        u64::from(*first) + u64::from(*second)
    }
    fn twice(value: &u32) -> u64 {
        u64::from(*value) * 2
    }
    fn add_arms(left: &u64, right: &u64) -> u64 {
        *left + *right
    }
    fn record(mut slot: ResMut<u64>, total: &u64) {
        *slot = *total;
    }

    let mut builder = WorldBuilder::new();
    builder.register::<u64>(0);
    let mut world = builder.build();
    let reg = world.registry();

    let mut dag = DagStart::<u32>::new()
        .root(pass_through, reg)
        .fork()
        .arm(|paired| paired.then(pair_plus_hundred, reg).splat().then(sum_pair, reg))
        .arm(|plain| plain.then(twice, reg))
        .merge(add_arms, reg)
        .then(record, reg)
        .build();

    dag.run(&mut world, 7u32);
    // first arm:  (7, 107) → splat → 7 + 107 = 114
    // second arm: 7 * 2 = 14
    // merge:      114 + 14 = 128
    assert_eq!(*world.resource::<u64>(), 128);
}
3795
// `.splat()` on a 4-tuple feeding a four-argument step.
#[test]
fn dag_splat4_on_chain() {
    let mut wb = WorldBuilder::new();
    wb.register::<u64>(0);
    let mut world = wb.build();
    let reg = world.registry();

    fn split4(x: u32) -> (u32, u32, u32, u32) {
        (x, x + 1, x + 2, x + 3)
    }
    fn sum4(a: &u32, b: &u32, c: &u32, d: &u32) -> u64 {
        // Widen each term before adding, as the 3- and 5-tuple tests do, so
        // the sum cannot overflow `u32`.
        u64::from(*a) + u64::from(*b) + u64::from(*c) + u64::from(*d)
    }
    fn store(mut out: ResMut<u64>, val: &u64) {
        *out = *val;
    }

    let mut dag = DagStart::<u32>::new()
        .root(split4, reg)
        .splat()
        .then(sum4, reg)
        .then(store, reg)
        .build();

    dag.run(&mut world, 10u32);
    assert_eq!(*world.resource::<u64>(), 46); // 10+11+12+13
}
3823
// `.splat()` on a 5-tuple feeding a five-argument step.
#[test]
fn dag_splat5_on_chain() {
    fn consecutive5(input: u32) -> (u8, u8, u8, u8, u8) {
        // Narrowing cast kept as-is: the elements are `u8`.
        let first = input as u8;
        (first, first + 1, first + 2, first + 3, first + 4)
    }
    fn total5(a: &u8, b: &u8, c: &u8, d: &u8, e: &u8) -> u64 {
        [*a, *b, *c, *d, *e].iter().map(|&term| u64::from(term)).sum()
    }
    fn record(mut slot: ResMut<u64>, total: &u64) {
        *slot = *total;
    }

    let mut builder = WorldBuilder::new();
    builder.register::<u64>(0);
    let mut world = builder.build();
    let reg = world.registry();

    let mut dag = DagStart::<u32>::new()
        .root(consecutive5, reg)
        .splat()
        .then(total5, reg)
        .then(record, reg)
        .build();

    dag.run(&mut world, 1u32);
    // 1 + 2 + 3 + 4 + 5
    assert_eq!(*world.resource::<u64>(), 15);
}
3852
// A DAG containing a splat step can be boxed as `Virtual<u32>` and run
// through the box.
#[test]
fn dag_splat_boxable() {
    fn pair_with_double(input: u32) -> (u32, u32) {
        (input, input * 2)
    }
    fn store_sum(mut slot: ResMut<u64>, first: &u32, second: &u32) {
        *slot = u64::from(*first) + u64::from(*second);
    }

    let mut builder = WorldBuilder::new();
    builder.register::<u64>(0);
    let mut world = builder.build();
    let reg = world.registry();

    let concrete = DagStart::<u32>::new()
        .root(pair_with_double, reg)
        .splat()
        .then(store_sum, reg)
        .build();
    let mut boxed: Virtual<u32> = Box::new(concrete);

    boxed.run(&mut world, 5u32);
    // (5, 10) → 5 + 10
    assert_eq!(*world.resource::<u64>(), 15);
}
3877
3878    // -- Batch DAG --
3879
// A batch DAG runs the chain once per queued input and leaves the input
// buffer empty afterwards.
#[test]
fn batch_dag_basic() {
    fn twice(input: u32) -> u64 {
        u64::from(input) * 2
    }
    fn accumulate(mut total: ResMut<u64>, value: &u64) {
        *total += *value;
    }

    let mut builder = WorldBuilder::new();
    builder.register::<u64>(0);
    let mut world = builder.build();
    let reg = world.registry();

    let mut batch = DagStart::<u32>::new()
        .root(twice, reg)
        .then(accumulate, reg)
        .build_batch(8);

    batch.input_mut().extend([1, 2, 3]);
    batch.run(&mut world);

    // 2 + 4 + 6
    assert_eq!(*world.resource::<u64>(), 12);
    assert!(batch.input().is_empty());
}
3905
// A batch DAG whose chain ends in an Option that is resolved with
// `.unwrap_or(())`.
#[test]
fn batch_dag_option_terminal() {
    fn twice(input: u32) -> u64 {
        u64::from(input) * 2
    }
    fn accumulate(mut total: ResMut<u64>, value: &u64) {
        *total += *value;
    }

    let mut builder = WorldBuilder::new();
    builder.register::<u64>(0);
    let mut world = builder.build();
    let reg = world.registry();

    let mut batch = DagStart::<u32>::new()
        .root(twice, reg)
        .guard(|_w, doubled| *doubled > 5)
        .map(accumulate, reg)
        .unwrap_or(())
        .build_batch(8);

    batch.input_mut().extend([1, 2, 3, 4, 5]);
    batch.run(&mut world);

    // doubled inputs:   2, 4, 6, 8, 10
    // passing the guard: 6, 8, 10
    // accumulated:       6 + 8 + 10 = 24
    assert_eq!(*world.resource::<u64>(), 24);
}
3934
// The same batch DAG can be refilled and run again.
#[test]
fn batch_dag_buffer_reuse() {
    fn twice(input: u32) -> u64 {
        u64::from(input) * 2
    }
    fn accumulate(mut total: ResMut<u64>, value: &u64) {
        *total += *value;
    }

    let mut builder = WorldBuilder::new();
    builder.register::<u64>(0);
    let mut world = builder.build();
    let reg = world.registry();

    let mut batch = DagStart::<u32>::new()
        .root(twice, reg)
        .then(accumulate, reg)
        .build_batch(8);

    // First fill: 2 + 4 = 6, and the buffer is empty afterwards.
    batch.input_mut().extend([1, 2]);
    batch.run(&mut world);
    assert_eq!(*world.resource::<u64>(), 6);
    assert!(batch.input().is_empty());

    // Second fill adds to the running total: 6 + 20 + 40 = 66.
    batch.input_mut().extend([10, 20]);
    batch.run(&mut world);
    assert_eq!(*world.resource::<u64>(), 66);
}
3963
// After a run the input buffer is empty, yet its capacity is still at least
// the 64 passed to `build_batch`.
#[test]
fn batch_dag_retains_allocation() {
    fn discard(_input: u32) {}

    let mut world = WorldBuilder::new().build();
    let reg = world.registry();

    let mut batch = DagStart::<u32>::new()
        .root(discard, reg)
        .build_batch(64);

    batch.input_mut().extend([1, 2, 3]);
    batch.run(&mut world);

    assert!(batch.input().is_empty());
    assert!(batch.input_mut().capacity() >= 64);
}
3979
// Running a batch DAG with nothing queued leaves the world untouched.
#[test]
fn batch_dag_empty_is_noop() {
    fn twice(input: u32) -> u64 {
        u64::from(input) * 2
    }
    fn accumulate(mut total: ResMut<u64>, value: &u64) {
        *total += *value;
    }

    let mut builder = WorldBuilder::new();
    builder.register::<u64>(0);
    let mut world = builder.build();
    let reg = world.registry();

    let mut batch = DagStart::<u32>::new()
        .root(twice, reg)
        .then(accumulate, reg)
        .build_batch(8);

    // No inputs were queued, so the accumulator keeps its initial value.
    batch.run(&mut world);
    assert_eq!(*world.resource::<u64>(), 0);
}
4002
// `.splat()` inside a batch DAG.
#[test]
fn batch_dag_with_splat() {
    fn pair_with_tenfold(input: u32) -> (u64, u64) {
        let wide = u64::from(input);
        (wide, wide * 10)
    }
    fn add_pair(first: &u64, second: &u64) -> u64 {
        *first + *second
    }
    fn accumulate(mut total: ResMut<u64>, value: &u64) {
        *total += *value;
    }

    let mut builder = WorldBuilder::new();
    builder.register::<u64>(0);
    let mut world = builder.build();
    let reg = world.registry();

    let mut batch = DagStart::<u32>::new()
        .root(pair_with_tenfold, reg)
        .splat()
        .then(add_pair, reg)
        .then(accumulate, reg)
        .build_batch(4);

    batch.input_mut().extend([1, 2]);
    batch.run(&mut world);

    // 1 → (1, 10) → 11
    // 2 → (2, 20) → 22
    // accumulated: 11 + 22 = 33
    assert_eq!(*world.resource::<u64>(), 33);
}
4033
4034    // -- Switch combinator --
4035
// `.switch()` with a two-way closure: the closure's return value is what the
// next step receives.
#[test]
fn dag_switch_basic() {
    let mut builder = WorldBuilder::new();
    builder.register::<u64>(0);
    let mut world = builder.build();
    let reg = world.registry();

    fn widen(input: u32) -> u64 {
        u64::from(input)
    }
    fn record(mut slot: ResMut<u64>, value: &u64) {
        *slot = *value;
    }

    let mut dag = DagStart::<u32>::new()
        .root(widen, reg)
        .switch(|_world, val| if *val > 5 { *val * 10 } else { *val + 1 })
        .then(record, reg)
        .build();

    // (input, expected): above 5 is multiplied by 10, otherwise incremented.
    for (input, expected) in [(10u32, 100u64), (3, 4)] {
        dag.run(&mut world, input);
        assert_eq!(*world.resource::<u64>(), expected);
    }
}
4062
// `.switch()` with a three-way `match` on the remainder modulo 3.
#[test]
fn dag_switch_3_way() {
    let mut builder = WorldBuilder::new();
    builder.register::<u64>(0);
    let mut world = builder.build();
    let reg = world.registry();

    fn pass_through(input: u32) -> u32 {
        input
    }
    fn record(mut slot: ResMut<u64>, value: &u64) {
        *slot = *value;
    }

    let mut dag = DagStart::<u32>::new()
        .root(pass_through, reg)
        .switch(|_world, val| {
            let wide = u64::from(*val);
            match *val % 3 {
                0 => wide + 100,
                1 => wide + 200,
                _ => wide + 300,
            }
        })
        .then(record, reg)
        .build();

    // (input, expected)
    let cases = [
        (6u32, 106u64), // 6 % 3 == 0 → +100
        (7, 207),       // 7 % 3 == 1 → +200
        (8, 308),       // 8 % 3 == 2 → +300
    ];
    for (input, expected) in cases {
        dag.run(&mut world, input);
        assert_eq!(*world.resource::<u64>(), expected);
    }
}
4096
// `.switch()` dispatching to arms built ahead of time with `resolve_arm`.
#[test]
fn dag_switch_with_resolve_arm() {
    let mut builder = WorldBuilder::new();
    builder.register::<u64>(0);
    let mut world = builder.build();
    let reg = world.registry();

    fn pass_through(input: u32) -> u32 {
        input
    }
    fn twice(value: &u32) -> u64 {
        u64::from(*value) * 2
    }
    fn thrice(value: &u32) -> u64 {
        u64::from(*value) * 3
    }
    fn record(mut slot: ResMut<u64>, value: &u64) {
        *slot = *value;
    }

    let mut on_even = resolve_arm(twice, reg);
    let mut on_odd = resolve_arm(thrice, reg);

    let mut dag = DagStart::<u32>::new()
        .root(pass_through, reg)
        .switch(move |w, val| {
            if *val % 2 != 0 {
                on_odd(w, val)
            } else {
                on_even(w, val)
            }
        })
        .then(record, reg)
        .build();

    // (input, expected): even inputs are doubled, odd inputs tripled.
    for (input, expected) in [(4u32, 8u64), (5, 15)] {
        dag.run(&mut world, input);
        assert_eq!(*world.resource::<u64>(), expected);
    }
}
4138
// Arms built with `resolve_arm` may differ in their resource parameters: one
// reads `Res<i64>`, the other takes only the value.
#[test]
fn dag_resolve_arm_with_params() {
    let mut builder = WorldBuilder::new();
    builder.register::<u64>(0); // written by `record`
    builder.register::<i64>(100); // offset read by `with_offset`
    let mut world = builder.build();
    let reg = world.registry();

    fn pass_through(input: u32) -> u32 {
        input
    }
    fn with_offset(offset: Res<i64>, value: &u32) -> u64 {
        (*offset + i64::from(*value)) as u64
    }
    fn twice(value: &u32) -> u64 {
        u64::from(*value) * 2
    }
    fn record(mut slot: ResMut<u64>, value: &u64) {
        *slot = *value;
    }

    let mut on_large = resolve_arm(with_offset, reg);
    let mut on_small = resolve_arm(twice, reg);

    let mut dag = DagStart::<u32>::new()
        .root(pass_through, reg)
        .switch(move |w, val| {
            if *val <= 10 {
                on_small(w, val)
            } else {
                on_large(w, val)
            }
        })
        .then(record, reg)
        .build();

    // (input, expected)
    let cases = [
        (20u32, 120u64), // > 10 → offset arm: 100 + 20
        (5, 10),         // <= 10 → doubling arm: 5 * 2
    ];
    for (input, expected) in cases {
        dag.run(&mut world, input);
        assert_eq!(*world.resource::<u64>(), expected);
    }
}
4182
// `.switch()` used inside one fork arm while a sibling arm writes to a
// different resource; the arms are combined with `.join()`.
#[test]
fn dag_switch_in_fork_arm() {
    let mut builder = WorldBuilder::new();
    builder.register::<u64>(0); // written by the switching arm
    builder.register::<i64>(0); // written by the negating arm
    let mut world = builder.build();
    let reg = world.registry();

    fn pass_through(input: u32) -> u32 {
        input
    }
    fn copy_value(value: &u32) -> u32 {
        *value
    }
    fn record_u64(mut slot: ResMut<u64>, value: &u64) {
        *slot = *value;
    }
    fn record_negated(mut slot: ResMut<i64>, value: &u32) {
        *slot = -i64::from(*value);
    }

    let mut dag = DagStart::<u32>::new()
        .root(pass_through, reg)
        .fork()
        .arm(|switching| {
            switching
                .then(copy_value, reg)
                .switch(|_w, val| {
                    let wide = u64::from(*val);
                    if *val > 5 {
                        wide * 10
                    } else {
                        wide
                    }
                })
                .then(record_u64, reg)
        })
        .arm(|negating| negating.then(record_negated, reg))
        .join()
        .build();

    // (input, switching arm result, negating arm result)
    let cases = [
        (10u32, 100u64, -10i64), // 10 > 5 → 10 * 10
        (3, 3, -3),              // 3 <= 5 → unchanged
    ];
    for (input, switched, negated) in cases {
        dag.run(&mut world, input);
        assert_eq!(*world.resource::<u64>(), switched);
        assert_eq!(*world.resource::<i64>(), negated);
    }
}
4230
// `.switch()` inside a batch DAG.
#[test]
fn batch_dag_switch() {
    let mut builder = WorldBuilder::new();
    builder.register::<u64>(0);
    let mut world = builder.build();
    let reg = world.registry();

    fn pass_through(input: u32) -> u32 {
        input
    }
    fn accumulate(mut total: ResMut<u64>, value: &u64) {
        *total += *value;
    }

    let mut batch = DagStart::<u32>::new()
        .root(pass_through, reg)
        .switch(|_w, val| {
            let wide = u64::from(*val);
            if *val % 2 == 0 {
                wide * 10
            } else {
                wide
            }
        })
        .then(accumulate, reg)
        .build_batch(8);

    batch.input_mut().extend([1, 2, 3, 4]);
    batch.run(&mut world);

    // odd inputs pass unchanged, even inputs are multiplied by 10:
    // 1 + 20 + 3 + 40 = 64
    assert_eq!(*world.resource::<u64>(), 64);
}
4263}