Skip to main content

nexus_rt/
pipeline.rs

1// Builder return types are necessarily complex — each combinator returns
2// PipelineChain<In, Out, NodeType<Chain, ...>>. Same pattern as iterator adapters.
3#![allow(clippy::type_complexity)]
4// Handler arity is architecturally required by the Param trait — handlers
5// take N typed parameters and the macro-generated dispatch impls expand
6// per-arity into call_inner functions with N + Input arguments. Module-level
7// allow rather than one inline attribute per arity expansion.
8#![allow(clippy::too_many_arguments)]
9
10//! Pre-resolved pipeline dispatch using [`Param`] steps.
11//!
12//! [`PipelineBuilder`] begins a typed composition chain where each step
13//! is a named function with [`Param`] dependencies resolved at build
14//! time. The result is a monomorphized chain of named node types where dispatch-time
15//! resource access is a single pointer deref per fetch — zero framework overhead.
16//! [`ResourceId`](crate::ResourceId) is a direct pointer, not a HashMap lookup.
17//!
18//! Two dispatch tiers in nexus-rt:
19//! 1. **Pipeline** — static after build, pre-resolved, the workhorse
20//! 2. **Callback** — dynamic registration with per-instance context
21//!
22//! # Step function convention
23//!
24//! Params first, step input last, returns output:
25//!
26//! ```ignore
27//! fn validate(config: Res<Config>, order: Order) -> Option<ValidOrder> { .. }
28//! fn enrich(cache: Res<MarketData>, order: ValidOrder) -> EnrichedOrder { .. }
29//! fn submit(mut gw: ResMut<Gateway>, order: CheckedOrder) { gw.send(order); }
30//! ```
31//!
32//! # Combinator split
33//!
34//! **IntoStep-based (pre-resolved, hot path):**
35//! `.then()`, `.map()`, `.and_then()`, `.catch()`
36//!
37//! **Trait-based (same API for named functions, arity-0 closures, and [`Opaque`] closures):**
38//! `.guard()`, `.filter()`, `.tap()`, `.inspect()`, `.inspect_err()`,
39//! `.on_none()`, `.ok_or_else()`, `.unwrap_or_else()`, `.map_err()`,
40//! `.or_else()`, `.and()`, `.or()`, `.xor()`, `.route()`
41//!
42//! # Combinator quick reference
43//!
44//! **Bare value `T`:** `.then()`, `.tap()`, `.guard()` (→ `Option<T>`),
45//! `.dispatch()`, `.route()`, `.tee()`, `.scan()`, `.dedup()` (→ `Option<T>`)
46//!
47//! **Tuple `(A, B, ...)` (2-5 elements):** `.splat()` (→ splat builder,
48//! call `.then()` with destructured args)
49//!
50//! **`Option<T>`:** `.map()`, `.filter()`, `.inspect()`, `.and_then()`,
51//! `.on_none()`, `.ok_or()` (→ `Result`), `.unwrap_or()` (→ `T`),
52//! `.cloned()` (→ `Option<T>` from `Option<&T>`)
53//!
54//! **`Result<T, E>`:** `.map()`, `.and_then()`, `.catch()` (→ `Option<T>`),
55//! `.map_err()`, `.inspect_err()`, `.ok()` (→ `Option<T>`),
56//! `.unwrap_or()` (→ `T`), `.or_else()`
57//!
58//! **`bool`:** `.not()`, `.and()`, `.or()`, `.xor()`
59//!
60//! **Terminal:** `.build()` (→ `Pipeline`), `.build_batch(cap)`
61//! (→ `BatchPipeline<In>`)
62//!
63//! # Splat — tuple destructuring
64//!
65//! Pipeline steps follow a single-value-in, single-value-out convention.
66//! When a step returns a tuple like `(OrderId, f64)`, the next step
67//! must accept the whole tuple as one argument. `.splat()` destructures
68//! the tuple so the next step receives individual arguments instead:
69//!
70//! ```ignore
71//! // Without splat — next step takes the whole tuple:
72//! fn process(pair: (OrderId, f64)) -> bool { .. }
73//!
74//! // With splat — next step takes individual args:
75//! fn process(id: OrderId, price: f64) -> bool { .. }
76//!
77//! PipelineBuilder::<Order>::new()
78//!     .then(extract, reg)   // Order → (OrderId, f64)
79//!     .splat()              // destructure
80//!     .then(process, reg)   // (OrderId, f64) → bool
81//!     .build();
82//! ```
83//!
84//! Supported for tuples of 2-5 elements. Beyond 5, define a named
85//! struct — if a combinator stage needs that many arguments, a struct
86//! makes the intent clearer and the code more maintainable.
87//!
88//! # Returning pipelines from functions (Rust 2024)
89//!
90//! When a factory function takes `&Registry` and returns `impl Handler<E>`,
91//! Rust 2024 captures the registry borrow in the return type by default.
92//! Use `+ use<...>` to exclude it:
93//!
94//! ```ignore
95//! fn on_order<C: Config>(
96//!     reg: &Registry,
97//! ) -> impl Handler<Order> + use<C> {
98//!     PipelineBuilder::<Order>::new()
99//!         .then(validate::<C>, reg)
100//!         .dispatch(submit::<C>.into_handler(reg))
101//!         .build()
102//! }
103//! ```
104//!
105//! List every type parameter the pipeline captures; omit the `&Registry`
106//! lifetime — it's consumed during `.build()`. See the
107//! [crate-level docs](crate#returning-impl-handler-from-functions-rust-2024)
108//! for the full explanation.
109
110use std::marker::PhantomData;
111
112use crate::Handler;
113use crate::dag::DagArm;
114use crate::handler::{Opaque, Param};
115use crate::world::{Registry, World};
116
117// =============================================================================
118// Step — pre-resolved step with Param state
119// =============================================================================
120
121/// Internal: pre-resolved step with cached Param state.
122///
123/// Users don't construct this directly — it's produced by [`IntoStep`] and
124/// stored inside named chain node types.
125#[doc(hidden)]
126pub struct Step<F, Params: Param> {
127    f: F,
128    state: Params::State,
129    // Retained for future diagnostic/tracing use (step name in error messages).
130    #[allow(dead_code)]
131    name: &'static str,
132}
133
134// =============================================================================
135// StepCall — callable trait for resolved steps
136// =============================================================================
137
138/// Internal: callable trait for resolved steps.
139///
140/// Used as a bound on [`IntoStep::Step`]. Users don't implement this.
141#[doc(hidden)]
142pub trait StepCall<In> {
143    /// The output type of this step.
144    type Out;
145    /// Call this step with a world reference and input value.
146    fn call(&mut self, world: &mut World, input: In) -> Self::Out;
147}
148
149// =============================================================================
150// IntoStep — converts a named function into a resolved step
151// =============================================================================
152
153/// Converts a named function into a pre-resolved pipeline step.
154///
155/// Params first, step input last, returns output. Arity 0 (no
156/// Params) supports closures. Arities 1+ require named functions
157/// (same HRTB+GAT limitation as [`IntoHandler`](crate::IntoHandler)).
158///
159/// # Examples
160///
161/// ```ignore
162/// // Arity 0 — closure works
163/// let step = (|x: u32| x * 2).into_step(registry);
164///
165/// // Arity 1 — named function required
166/// fn validate(config: Res<Config>, order: Order) -> Option<ValidOrder> { .. }
167/// let step = validate.into_step(registry);
168/// ```
169#[diagnostic::on_unimplemented(
170    message = "this function cannot be used as a pipeline step for this input type",
171    note = "if the pipeline output is `Option<T>`, use `.map()` to operate on the inner `T`",
172    note = "if the pipeline output is `Result<T, E>`, use `.map()` for `Ok` or `.catch()` for `Err`",
173    note = "if using a closure with resource params (Res<T>, ResMut<T>), that isn't supported — use a named `fn`"
174)]
175pub trait IntoStep<In, Out, Params> {
176    /// The concrete resolved step type.
177    type Step: StepCall<In, Out = Out>;
178
179    /// Resolve Param state from the registry and produce a step.
180    fn into_step(self, registry: &Registry) -> Self::Step;
181}
182
183// =============================================================================
184// Arity 0 — fn(In) -> Out — closures work (no HRTB+GAT issues)
185// =============================================================================
186
187impl<In, Out, F: FnMut(In) -> Out + 'static> StepCall<In> for Step<F, ()> {
188    type Out = Out;
189    #[inline(always)]
190    fn call(&mut self, _world: &mut World, input: In) -> Out {
191        (self.f)(input)
192    }
193}
194
195impl<In, Out, F: FnMut(In) -> Out + 'static> IntoStep<In, Out, ()> for F {
196    type Step = Step<F, ()>;
197
198    fn into_step(self, registry: &Registry) -> Self::Step {
199        Step {
200            f: self,
201            state: <() as Param>::init(registry),
202            name: std::any::type_name::<F>(),
203        }
204    }
205}
206
207// =============================================================================
208// Arities 1-8 via macro — HRTB with -> Out
209// =============================================================================
210
211macro_rules! impl_into_step {
212    ($($P:ident),+) => {
213        impl<In, Out, F: 'static, $($P: Param + 'static),+>
214            StepCall<In> for Step<F, ($($P,)+)>
215        where
216            for<'a> &'a mut F:
217                FnMut($($P,)+ In) -> Out +
218                FnMut($($P::Item<'a>,)+ In) -> Out,
219        {
220            type Out = Out;
221            #[inline(always)]
222            #[allow(non_snake_case)]
223            fn call(&mut self, world: &mut World, input: In) -> Out {
224                fn call_inner<$($P,)+ Input, Output>(
225                    mut f: impl FnMut($($P,)+ Input) -> Output,
226                    $($P: $P,)+
227                    input: Input,
228                ) -> Output {
229                    f($($P,)+ input)
230                }
231
232                // SAFETY: state was produced by Param::init() on the same Registry
233                // that built this World. Borrows are disjoint — enforced by
234                // conflict detection at build time.
235                #[cfg(debug_assertions)]
236                world.clear_borrows();
237                let ($($P,)+) = unsafe {
238                    <($($P,)+) as Param>::fetch(world, &mut self.state)
239                };
240                call_inner(&mut self.f, $($P,)+ input)
241            }
242        }
243
244        impl<In, Out, F: 'static, $($P: Param + 'static),+>
245            IntoStep<In, Out, ($($P,)+)> for F
246        where
247            for<'a> &'a mut F:
248                FnMut($($P,)+ In) -> Out +
249                FnMut($($P::Item<'a>,)+ In) -> Out,
250        {
251            type Step = Step<F, ($($P,)+)>;
252
253            fn into_step(self, registry: &Registry) -> Self::Step {
254                let state = <($($P,)+) as Param>::init(registry);
255                {
256                    #[allow(non_snake_case)]
257                    let ($($P,)+) = &state;
258                    registry.check_access(&[
259                        $(
260                            (<$P as Param>::resource_id($P),
261                             std::any::type_name::<$P>()),
262                        )+
263                    ]);
264                }
265                Step { f: self, state, name: std::any::type_name::<F>() }
266            }
267        }
268    };
269}
270
271all_tuples!(impl_into_step);
272
273// =============================================================================
274// No-event Step — IntoStep<(), Out, _> without passing `()` to the function
275// =============================================================================
276
277use crate::handler::NoEvent;
278
279// Arity 0: fn() -> Out
280impl<Out, F: FnMut() -> Out + 'static> StepCall<()> for Step<NoEvent<F>, ()> {
281    type Out = Out;
282    #[inline(always)]
283    fn call(&mut self, _world: &mut World, _input: ()) -> Out {
284        (self.f.0)()
285    }
286}
287
288impl<Out, F: FnMut() -> Out + 'static> IntoStep<(), Out, NoEvent<F>> for F {
289    type Step = Step<NoEvent<F>, ()>;
290
291    fn into_step(self, registry: &Registry) -> Self::Step {
292        Step {
293            f: NoEvent(self),
294            state: <() as Param>::init(registry),
295            name: std::any::type_name::<F>(),
296        }
297    }
298}
299
300macro_rules! impl_into_step_no_event {
301    ($($P:ident),+) => {
302        impl<Out, F: 'static, $($P: Param + 'static),+>
303            StepCall<()> for Step<NoEvent<F>, ($($P,)+)>
304        where
305            for<'a> &'a mut F:
306                FnMut($($P,)+) -> Out +
307                FnMut($($P::Item<'a>,)+) -> Out,
308        {
309            type Out = Out;
310            #[inline(always)]
311            #[allow(non_snake_case)]
312            fn call(&mut self, world: &mut World, _input: ()) -> Out {
313                fn call_inner<$($P,)+ Output>(
314                    mut f: impl FnMut($($P,)+) -> Output,
315                    $($P: $P,)+
316                ) -> Output {
317                    f($($P,)+)
318                }
319
320                #[cfg(debug_assertions)]
321                world.clear_borrows();
322                let ($($P,)+) = unsafe {
323                    <($($P,)+) as Param>::fetch(world, &mut self.state)
324                };
325                call_inner(&mut self.f.0, $($P,)+)
326            }
327        }
328
329        impl<Out, F: 'static, $($P: Param + 'static),+>
330            IntoStep<(), Out, ($($P,)+)> for NoEvent<F>
331        where
332            for<'a> &'a mut F:
333                FnMut($($P,)+) -> Out +
334                FnMut($($P::Item<'a>,)+) -> Out,
335        {
336            type Step = Step<NoEvent<F>, ($($P,)+)>;
337
338            fn into_step(self, registry: &Registry) -> Self::Step {
339                let state = <($($P,)+) as Param>::init(registry);
340                {
341                    #[allow(non_snake_case)]
342                    let ($($P,)+) = &state;
343                    registry.check_access(&[
344                        $(
345                            (<$P as Param>::resource_id($P),
346                             std::any::type_name::<$P>()),
347                        )+
348                    ]);
349                }
350                Step { f: self, state, name: std::any::type_name::<F>() }
351            }
352        }
353    };
354}
355
356all_tuples!(impl_into_step_no_event);
357
358// =============================================================================
359// OpaqueStep — wrapper for opaque closures as steps
360// =============================================================================
361
362/// Internal: wrapper for opaque closures used as pipeline steps.
363///
364/// Unlike [`Step<F, P>`] which stores resolved `Param::State`, this
365/// holds only the function — no state to resolve.
366#[doc(hidden)]
367pub struct OpaqueStep<F> {
368    f: F,
369    // Retained for future diagnostic/tracing use (step name in error messages).
370    #[allow(dead_code)]
371    name: &'static str,
372}
373
374impl<In, Out, F: FnMut(&mut World, In) -> Out + 'static> StepCall<In> for OpaqueStep<F> {
375    type Out = Out;
376    #[inline(always)]
377    fn call(&mut self, world: &mut World, input: In) -> Out {
378        (self.f)(world, input)
379    }
380}
381
382impl<In, Out, F: FnMut(&mut World, In) -> Out + 'static> IntoStep<In, Out, Opaque> for F {
383    type Step = OpaqueStep<F>;
384
385    fn into_step(self, _registry: &Registry) -> Self::Step {
386        OpaqueStep {
387            f: self,
388            name: std::any::type_name::<F>(),
389        }
390    }
391}
392
393// =============================================================================
394// RefStepCall / IntoRefStep — step taking &In, returning Out
395// =============================================================================
396
397/// Internal: callable trait for resolved steps taking input by reference.
398///
399/// Used by combinators like `tap`, `guard`, `filter`, `inspect` that
400/// observe the value without consuming it.
401#[doc(hidden)]
402pub trait RefStepCall<In> {
403    /// The output type of this step.
404    type Out;
405    /// Call this step with a world reference and borrowed input.
406    fn call(&mut self, world: &mut World, input: &In) -> Self::Out;
407}
408
409/// Converts a function into a pre-resolved step taking input by reference.
410///
411/// Same three-tier resolution as [`IntoStep`]:
412///
413/// | Params | Function shape | Example |
414/// |--------|---------------|---------|
415/// | `()` | `FnMut(&In) -> Out` | `\|o: &Order\| o.price > 0.0` |
416/// | `(P0,)...(P0..P7,)` | `fn(Params..., &In) -> Out` | `fn check(c: Res<Config>, o: &Order) -> bool` |
417/// | [`Opaque`] | `FnMut(&mut World, &In) -> Out` | `\|w: &mut World, o: &Order\| { ... }` |
418#[diagnostic::on_unimplemented(
419    message = "this function cannot be used as a reference step for this input type",
420    note = "reference steps (guard, filter, tap, inspect) take `&In`, not `In`",
421    note = "if the pipeline output is `Option<T>`, `.filter()` operates on `&T` inside the Option",
422    note = "closures with resource parameters are not supported — use a named `fn`"
423)]
424pub trait IntoRefStep<In, Out, Params> {
425    /// The concrete resolved step type.
426    type Step: RefStepCall<In, Out = Out>;
427
428    /// Resolve Param state from the registry and produce a step.
429    fn into_ref_step(self, registry: &Registry) -> Self::Step;
430}
431
432// -- Arity 0: FnMut(&In) -> Out — closures work ----------------------------
433
434impl<In, Out, F: FnMut(&In) -> Out + 'static> RefStepCall<In> for Step<F, ()> {
435    type Out = Out;
436    #[inline(always)]
437    fn call(&mut self, _world: &mut World, input: &In) -> Out {
438        (self.f)(input)
439    }
440}
441
442impl<In, Out, F: FnMut(&In) -> Out + 'static> IntoRefStep<In, Out, ()> for F {
443    type Step = Step<F, ()>;
444
445    fn into_ref_step(self, registry: &Registry) -> Self::Step {
446        Step {
447            f: self,
448            state: <() as Param>::init(registry),
449            name: std::any::type_name::<F>(),
450        }
451    }
452}
453
454// -- Arities 1-8: named functions with Param resolution ---------------------
455
456macro_rules! impl_into_ref_step {
457    ($($P:ident),+) => {
458        impl<In, Out, F: 'static, $($P: Param + 'static),+>
459            RefStepCall<In> for Step<F, ($($P,)+)>
460        where
461            for<'a> &'a mut F:
462                FnMut($($P,)+ &In) -> Out +
463                FnMut($($P::Item<'a>,)+ &In) -> Out,
464        {
465            type Out = Out;
466            #[inline(always)]
467            #[allow(non_snake_case)]
468            fn call(&mut self, world: &mut World, input: &In) -> Out {
469                fn call_inner<$($P,)+ Input: ?Sized, Output>(
470                    mut f: impl FnMut($($P,)+ &Input) -> Output,
471                    $($P: $P,)+
472                    input: &Input,
473                ) -> Output {
474                    f($($P,)+ input)
475                }
476
477                // SAFETY: state was produced by Param::init() on the same Registry
478                // that built this World. Borrows are disjoint — enforced by
479                // conflict detection at build time.
480                #[cfg(debug_assertions)]
481                world.clear_borrows();
482                let ($($P,)+) = unsafe {
483                    <($($P,)+) as Param>::fetch(world, &mut self.state)
484                };
485                call_inner(&mut self.f, $($P,)+ input)
486            }
487        }
488
489        impl<In, Out, F: 'static, $($P: Param + 'static),+>
490            IntoRefStep<In, Out, ($($P,)+)> for F
491        where
492            for<'a> &'a mut F:
493                FnMut($($P,)+ &In) -> Out +
494                FnMut($($P::Item<'a>,)+ &In) -> Out,
495        {
496            type Step = Step<F, ($($P,)+)>;
497
498            fn into_ref_step(self, registry: &Registry) -> Self::Step {
499                let state = <($($P,)+) as Param>::init(registry);
500                {
501                    #[allow(non_snake_case)]
502                    let ($($P,)+) = &state;
503                    registry.check_access(&[
504                        $(
505                            (<$P as Param>::resource_id($P),
506                             std::any::type_name::<$P>()),
507                        )+
508                    ]);
509                }
510                Step { f: self, state, name: std::any::type_name::<F>() }
511            }
512        }
513    };
514}
515
516all_tuples!(impl_into_ref_step);
517
518// =============================================================================
519// No-event RefStep — IntoRefStep<(), Out, _> without passing `&()` to function
520// =============================================================================
521
522// Arity 0: fn() -> Out
523impl<Out, F: FnMut() -> Out + 'static> RefStepCall<()> for Step<NoEvent<F>, ()> {
524    type Out = Out;
525    #[inline(always)]
526    fn call(&mut self, _world: &mut World, _input: &()) -> Out {
527        (self.f.0)()
528    }
529}
530
531impl<Out, F: FnMut() -> Out + 'static> IntoRefStep<(), Out, NoEvent<F>> for F {
532    type Step = Step<NoEvent<F>, ()>;
533
534    fn into_ref_step(self, registry: &Registry) -> Self::Step {
535        Step {
536            f: NoEvent(self),
537            state: <() as Param>::init(registry),
538            name: std::any::type_name::<F>(),
539        }
540    }
541}
542
543macro_rules! impl_into_ref_step_no_event {
544    ($($P:ident),+) => {
545        impl<Out, F: 'static, $($P: Param + 'static),+>
546            RefStepCall<()> for Step<NoEvent<F>, ($($P,)+)>
547        where
548            for<'a> &'a mut F:
549                FnMut($($P,)+) -> Out +
550                FnMut($($P::Item<'a>,)+) -> Out,
551        {
552            type Out = Out;
553            #[inline(always)]
554            #[allow(non_snake_case)]
555            fn call(&mut self, world: &mut World, _input: &()) -> Out {
556                fn call_inner<$($P,)+ Output>(
557                    mut f: impl FnMut($($P,)+) -> Output,
558                    $($P: $P,)+
559                ) -> Output {
560                    f($($P,)+)
561                }
562
563                #[cfg(debug_assertions)]
564                world.clear_borrows();
565                let ($($P,)+) = unsafe {
566                    <($($P,)+) as Param>::fetch(world, &mut self.state)
567                };
568                call_inner(&mut self.f.0, $($P,)+)
569            }
570        }
571
572        impl<Out, F: 'static, $($P: Param + 'static),+>
573            IntoRefStep<(), Out, ($($P,)+)> for NoEvent<F>
574        where
575            for<'a> &'a mut F:
576                FnMut($($P,)+) -> Out +
577                FnMut($($P::Item<'a>,)+) -> Out,
578        {
579            type Step = Step<NoEvent<F>, ($($P,)+)>;
580
581            fn into_ref_step(self, registry: &Registry) -> Self::Step {
582                let state = <($($P,)+) as Param>::init(registry);
583                {
584                    #[allow(non_snake_case)]
585                    let ($($P,)+) = &state;
586                    registry.check_access(&[
587                        $(
588                            (<$P as Param>::resource_id($P),
589                             std::any::type_name::<$P>()),
590                        )+
591                    ]);
592                }
593                Step { f: self, state, name: std::any::type_name::<F>() }
594            }
595        }
596    };
597}
598
599all_tuples!(impl_into_ref_step_no_event);
600
601// -- Opaque: FnMut(&mut World, &In) -> Out ---------------------------------
602
603/// Internal: wrapper for opaque closures taking input by reference.
604#[doc(hidden)]
605pub struct OpaqueRefStep<F> {
606    f: F,
607    // Retained for future diagnostic/tracing use (step name in error messages).
608    #[allow(dead_code)]
609    name: &'static str,
610}
611
612impl<In, Out, F: FnMut(&mut World, &In) -> Out + 'static> RefStepCall<In> for OpaqueRefStep<F> {
613    type Out = Out;
614    #[inline(always)]
615    fn call(&mut self, world: &mut World, input: &In) -> Out {
616        (self.f)(world, input)
617    }
618}
619
620impl<In, Out, F: FnMut(&mut World, &In) -> Out + 'static> IntoRefStep<In, Out, Opaque> for F {
621    type Step = OpaqueRefStep<F>;
622
623    fn into_ref_step(self, _registry: &Registry) -> Self::Step {
624        OpaqueRefStep {
625            f: self,
626            name: std::any::type_name::<F>(),
627        }
628    }
629}
630
631// =============================================================================
632// resolve_ref_step — pre-resolve a ref step for manual dispatch
633// =============================================================================
634
635/// Resolve a reference step for manual dispatch.
636///
637/// Returns a closure with pre-resolved [`Param`] state. Reference-input
638/// counterpart of [`resolve_step`].
639pub fn resolve_ref_step<In, Out, Params, S: IntoRefStep<In, Out, Params>>(
640    f: S,
641    registry: &Registry,
642) -> impl FnMut(&mut World, &In) -> Out + use<In, Out, Params, S> {
643    let mut resolved = f.into_ref_step(registry);
644    move |world: &mut World, input: &In| resolved.call(world, input)
645}
646
647// =============================================================================
648// ProducerCall / IntoProducer — step producing a value with no pipeline input
649// =============================================================================
650
651/// Internal: callable trait for resolved steps that produce a value
652/// without receiving pipeline input.
653///
654/// Used by combinators like `and`, `or`, `xor`, `on_none`, `ok_or_else`,
655/// `unwrap_or_else` (Option).
656#[doc(hidden)]
657pub trait ProducerCall {
658    /// The output type of this producer.
659    type Out;
660    /// Call this producer with a world reference.
661    fn call(&mut self, world: &mut World) -> Self::Out;
662}
663
664/// Converts a function into a pre-resolved producer step.
665///
666/// Same three-tier resolution as [`IntoStep`]:
667///
668/// | Params | Function shape | Example |
669/// |--------|---------------|---------|
670/// | `()` | `FnMut() -> Out` | `\|\| true` |
671/// | `(P0,)...(P0..P7,)` | `fn(Params...) -> Out` | `fn is_active(s: Res<State>) -> bool` |
672/// | [`Opaque`] | `FnMut(&mut World) -> Out` | `\|w: &mut World\| { ... }` |
673#[diagnostic::on_unimplemented(
674    message = "this function cannot be used as a producer for this output type",
675    note = "producers take no pipeline input — they produce a value (e.g., default, fallback)",
676    note = "closures with resource parameters are not supported — use a named `fn`"
677)]
678pub trait IntoProducer<Out, Params> {
679    /// The concrete resolved producer type.
680    type Step: ProducerCall<Out = Out>;
681
682    /// Resolve Param state from the registry and produce a step.
683    fn into_producer(self, registry: &Registry) -> Self::Step;
684}
685
686// -- Arity 0: FnMut() -> Out — closures work --------------------------------
687
688impl<Out, F: FnMut() -> Out + 'static> ProducerCall for Step<F, ()> {
689    type Out = Out;
690    #[inline(always)]
691    fn call(&mut self, _world: &mut World) -> Out {
692        (self.f)()
693    }
694}
695
696impl<Out, F: FnMut() -> Out + 'static> IntoProducer<Out, ()> for F {
697    type Step = Step<F, ()>;
698
699    fn into_producer(self, registry: &Registry) -> Self::Step {
700        Step {
701            f: self,
702            state: <() as Param>::init(registry),
703            name: std::any::type_name::<F>(),
704        }
705    }
706}
707
708// -- Arities 1-8: named functions with Param resolution ---------------------
709
710macro_rules! impl_into_producer {
711    ($($P:ident),+) => {
712        impl<Out, F: 'static, $($P: Param + 'static),+>
713            ProducerCall for Step<F, ($($P,)+)>
714        where
715            for<'a> &'a mut F:
716                FnMut($($P,)+) -> Out +
717                FnMut($($P::Item<'a>,)+) -> Out,
718        {
719            type Out = Out;
720            #[inline(always)]
721            #[allow(non_snake_case)]
722            fn call(&mut self, world: &mut World) -> Out {
723                fn call_inner<$($P,)+ Output>(
724                    mut f: impl FnMut($($P,)+) -> Output,
725                    $($P: $P,)+
726                ) -> Output {
727                    f($($P,)+)
728                }
729
730                // SAFETY: state was produced by Param::init() on the same Registry
731                // that built this World. Borrows are disjoint — enforced by
732                // conflict detection at build time.
733                #[cfg(debug_assertions)]
734                world.clear_borrows();
735                let ($($P,)+) = unsafe {
736                    <($($P,)+) as Param>::fetch(world, &mut self.state)
737                };
738                call_inner(&mut self.f, $($P,)+)
739            }
740        }
741
742        impl<Out, F: 'static, $($P: Param + 'static),+>
743            IntoProducer<Out, ($($P,)+)> for F
744        where
745            for<'a> &'a mut F:
746                FnMut($($P,)+) -> Out +
747                FnMut($($P::Item<'a>,)+) -> Out,
748        {
749            type Step = Step<F, ($($P,)+)>;
750
751            fn into_producer(self, registry: &Registry) -> Self::Step {
752                let state = <($($P,)+) as Param>::init(registry);
753                {
754                    #[allow(non_snake_case)]
755                    let ($($P,)+) = &state;
756                    registry.check_access(&[
757                        $(
758                            (<$P as Param>::resource_id($P),
759                             std::any::type_name::<$P>()),
760                        )+
761                    ]);
762                }
763                Step { f: self, state, name: std::any::type_name::<F>() }
764            }
765        }
766    };
767}
768
769all_tuples!(impl_into_producer);
770
771// -- Opaque: FnMut(&mut World) -> Out ---------------------------------------
772
773/// Internal: wrapper for opaque closures used as producers.
774#[doc(hidden)]
775pub struct OpaqueProducer<F> {
776    f: F,
777    // Retained for future diagnostic/tracing use (step name in error messages).
778    #[allow(dead_code)]
779    name: &'static str,
780}
781
782impl<Out, F: FnMut(&mut World) -> Out + 'static> ProducerCall for OpaqueProducer<F> {
783    type Out = Out;
784    #[inline(always)]
785    fn call(&mut self, world: &mut World) -> Out {
786        (self.f)(world)
787    }
788}
789
790impl<Out, F: FnMut(&mut World) -> Out + 'static> IntoProducer<Out, Opaque> for F {
791    type Step = OpaqueProducer<F>;
792
793    fn into_producer(self, _registry: &Registry) -> Self::Step {
794        OpaqueProducer {
795            f: self,
796            name: std::any::type_name::<F>(),
797        }
798    }
799}
800
801// =============================================================================
802// resolve_producer — pre-resolve a producer for manual dispatch
803// =============================================================================
804
805/// Resolve a producer for manual dispatch.
806///
807/// Returns a closure with pre-resolved [`Param`] state. No-input
808/// counterpart of [`resolve_step`].
809pub fn resolve_producer<Out, Params, S: IntoProducer<Out, Params>>(
810    f: S,
811    registry: &Registry,
812) -> impl FnMut(&mut World) -> Out + use<Out, Params, S> {
813    let mut resolved = f.into_producer(registry);
814    move |world: &mut World| resolved.call(world)
815}
816
817// =============================================================================
818// ScanStepCall / IntoScanStep — step with persistent accumulator
819// =============================================================================
820
/// Internal: callable trait for resolved scan steps.
///
/// Like [`StepCall`] but with an additional `&mut Acc` accumulator
/// argument that persists across invocations.
///
/// `Acc` is the accumulator type and `In` is the by-value input type; the
/// accumulator is supplied by the caller on every invocation rather than
/// being stored in the step itself.
#[doc(hidden)]
pub trait ScanStepCall<Acc, In> {
    /// The output type of this scan step.
    type Out;
    /// Call this scan step with a world reference, accumulator, and input value.
    ///
    /// The input is consumed (passed by value); the accumulator is borrowed
    /// mutably for the duration of the call.
    fn call(&mut self, world: &mut World, acc: &mut Acc, input: In) -> Self::Out;
}
832
833/// Converts a function into a pre-resolved scan step with persistent state.
834///
835/// Same three-tier resolution as [`IntoStep`]:
836///
837/// | Params | Function shape | Example |
838/// |--------|---------------|---------|
839/// | `()` | `FnMut(&mut Acc, In) -> Out` | `\|acc, trade\| { *acc += trade.amount; Some(*acc) }` |
840/// | `(P0,)...(P0..P7,)` | `fn(Params..., &mut Acc, In) -> Out` | `fn vwap(c: Res<Config>, acc: &mut State, t: Trade) -> Option<f64>` |
841/// | [`Opaque`] | `FnMut(&mut World, &mut Acc, In) -> Out` | `\|w: &mut World, acc: &mut u64, t: Trade\| { ... }` |
842#[diagnostic::on_unimplemented(
843    message = "this function cannot be used as a scan step",
844    note = "scan steps take `&mut Accumulator` as first param, then resources, then input last",
845    note = "closures with resource parameters are not supported — use a named `fn`"
846)]
847pub trait IntoScanStep<Acc, In, Out, Params> {
848    /// The concrete resolved scan step type.
849    type Step: ScanStepCall<Acc, In, Out = Out>;
850
851    /// Resolve Param state from the registry and produce a scan step.
852    fn into_scan_step(self, registry: &Registry) -> Self::Step;
853}
854
855// -- Arity 0: FnMut(&mut Acc, In) -> Out — closures work --------------------
856
857impl<Acc, In, Out, F: FnMut(&mut Acc, In) -> Out + 'static> ScanStepCall<Acc, In> for Step<F, ()> {
858    type Out = Out;
859    #[inline(always)]
860    fn call(&mut self, _world: &mut World, acc: &mut Acc, input: In) -> Out {
861        (self.f)(acc, input)
862    }
863}
864
865impl<Acc, In, Out, F: FnMut(&mut Acc, In) -> Out + 'static> IntoScanStep<Acc, In, Out, ()> for F {
866    type Step = Step<F, ()>;
867
868    fn into_scan_step(self, registry: &Registry) -> Self::Step {
869        Step {
870            f: self,
871            state: <() as Param>::init(registry),
872            name: std::any::type_name::<F>(),
873        }
874    }
875}
876
877// -- Arities 1-8: named functions with Param resolution ----------------------
878
// Generates, for one tuple of `Param` types `($P, ...)`, the `ScanStepCall`
// and `IntoScanStep` impls for functions shaped
// `fn(Params..., &mut Acc, In) -> Out`.
macro_rules! impl_into_scan_step {
    ($($P:ident),+) => {
        impl<Acc, In, Out, F: 'static, $($P: Param + 'static),+>
            ScanStepCall<Acc, In> for Step<F, ($($P,)+)>
        where
            // `&mut F` must be callable both with the declared param types
            // and with their fetched `Item<'a>` forms, for every `'a`.
            for<'a> &'a mut F:
                FnMut($($P,)+ &mut Acc, In) -> Out +
                FnMut($($P::Item<'a>,)+ &mut Acc, In) -> Out,
        {
            type Out = Out;
            #[inline(always)]
            #[allow(non_snake_case)]
            fn call(&mut self, world: &mut World, acc: &mut Acc, input: In) -> Out {
                // Local helper: takes the callable generically and applies it
                // to the fetched params, the accumulator, and the input.
                fn call_inner<$($P,)+ Accumulator, Input, Output>(
                    mut f: impl FnMut($($P,)+ &mut Accumulator, Input) -> Output,
                    $($P: $P,)+
                    acc: &mut Accumulator,
                    input: Input,
                ) -> Output {
                    f($($P,)+ acc, input)
                }

                // SAFETY: state was produced by Param::init() on the same Registry
                // that built this World. Borrows are disjoint — enforced by
                // conflict detection at build time.
                // Debug builds only: `clear_borrows` runs before the fetch
                // (presumably resets borrow tracking — confirm in `World`).
                #[cfg(debug_assertions)]
                world.clear_borrows();
                let ($($P,)+) = unsafe {
                    <($($P,)+) as Param>::fetch(world, &mut self.state)
                };
                call_inner(&mut self.f, $($P,)+ acc, input)
            }
        }

        impl<Acc, In, Out, F: 'static, $($P: Param + 'static),+>
            IntoScanStep<Acc, In, Out, ($($P,)+)> for F
        where
            for<'a> &'a mut F:
                FnMut($($P,)+ &mut Acc, In) -> Out +
                FnMut($($P::Item<'a>,)+ &mut Acc, In) -> Out,
        {
            type Step = Step<F, ($($P,)+)>;

            fn into_scan_step(self, registry: &Registry) -> Self::Step {
                let state = <($($P,)+) as Param>::init(registry);
                {
                    // Pass each param's resource id, paired with its type
                    // name, to `check_access` (presumably the build-time
                    // conflict detection — confirm in `Registry`).
                    #[allow(non_snake_case)]
                    let ($($P,)+) = &state;
                    registry.check_access(&[
                        $(
                            (<$P as Param>::resource_id($P),
                             std::any::type_name::<$P>()),
                        )+
                    ]);
                }
                Step { f: self, state, name: std::any::type_name::<F>() }
            }
        }
    };
}

// Expand the impls above via `all_tuples!` (defined outside this section).
all_tuples!(impl_into_scan_step);
941
942// =============================================================================
943// No-event ScanStep — IntoScanStep<Acc, (), Out, _> without passing `()` input
944// =============================================================================
945
946// Arity 0: fn(&mut Acc) -> Out
947impl<Acc, Out, F: FnMut(&mut Acc) -> Out + 'static> ScanStepCall<Acc, ()> for Step<NoEvent<F>, ()> {
948    type Out = Out;
949    #[inline(always)]
950    fn call(&mut self, _world: &mut World, acc: &mut Acc, _input: ()) -> Out {
951        (self.f.0)(acc)
952    }
953}
954
955impl<Acc, Out, F: FnMut(&mut Acc) -> Out + 'static> IntoScanStep<Acc, (), Out, NoEvent<F>> for F {
956    type Step = Step<NoEvent<F>, ()>;
957
958    fn into_scan_step(self, registry: &Registry) -> Self::Step {
959        Step {
960            f: NoEvent(self),
961            state: <() as Param>::init(registry),
962            name: std::any::type_name::<F>(),
963        }
964    }
965}
966
// Generates, for one tuple of `Param` types `($P, ...)`, the no-event scan
// impls: `ScanStepCall<Acc, ()>` on `Step<NoEvent<F>, ..>` and
// `IntoScanStep<Acc, (), ..>` on `NoEvent<F>`, for functions shaped
// `fn(Params..., &mut Acc) -> Out` (no input argument).
macro_rules! impl_into_scan_step_no_event {
    ($($P:ident),+) => {
        impl<Acc, Out, F: 'static, $($P: Param + 'static),+>
            ScanStepCall<Acc, ()> for Step<NoEvent<F>, ($($P,)+)>
        where
            // `&mut F` must be callable both with the declared param types
            // and with their fetched `Item<'a>` forms, for every `'a`.
            for<'a> &'a mut F:
                FnMut($($P,)+ &mut Acc) -> Out +
                FnMut($($P::Item<'a>,)+ &mut Acc) -> Out,
        {
            type Out = Out;
            #[inline(always)]
            #[allow(non_snake_case)]
            fn call(&mut self, world: &mut World, acc: &mut Acc, _input: ()) -> Out {
                // Local helper: applies the callable to the fetched params
                // and the accumulator; the unit input is dropped.
                fn call_inner<$($P,)+ Accumulator, Output>(
                    mut f: impl FnMut($($P,)+ &mut Accumulator) -> Output,
                    $($P: $P,)+
                    acc: &mut Accumulator,
                ) -> Output {
                    f($($P,)+ acc)
                }

                // SAFETY: state was produced by Param::init() on the same Registry
                // that built this World. Borrows are disjoint — enforced by
                // conflict detection at build time.
                // Debug builds only: `clear_borrows` runs before the fetch
                // (presumably resets borrow tracking — confirm in `World`).
                #[cfg(debug_assertions)]
                world.clear_borrows();
                let ($($P,)+) = unsafe {
                    <($($P,)+) as Param>::fetch(world, &mut self.state)
                };
                // Unwrap the `NoEvent` newtype to reach the user function.
                call_inner(&mut self.f.0, $($P,)+ acc)
            }
        }

        impl<Acc, Out, F: 'static, $($P: Param + 'static),+>
            IntoScanStep<Acc, (), Out, ($($P,)+)> for NoEvent<F>
        where
            for<'a> &'a mut F:
                FnMut($($P,)+ &mut Acc) -> Out +
                FnMut($($P::Item<'a>,)+ &mut Acc) -> Out,
        {
            type Step = Step<NoEvent<F>, ($($P,)+)>;

            fn into_scan_step(self, registry: &Registry) -> Self::Step {
                let state = <($($P,)+) as Param>::init(registry);
                {
                    // Pass each param's resource id, paired with its type
                    // name, to `check_access` (presumably the build-time
                    // conflict detection — confirm in `Registry`).
                    #[allow(non_snake_case)]
                    let ($($P,)+) = &state;
                    registry.check_access(&[
                        $(
                            (<$P as Param>::resource_id($P),
                             std::any::type_name::<$P>()),
                        )+
                    ]);
                }
                // `self` is already a `NoEvent<F>`; `name` records the inner
                // function's type name.
                Step { f: self, state, name: std::any::type_name::<F>() }
            }
        }
    };
}

// Expand the impls above via `all_tuples!` (defined outside this section).
all_tuples!(impl_into_scan_step_no_event);
1025
1026// -- Opaque: FnMut(&mut World, &mut Acc, In) -> Out --------------------------
1027
1028/// Internal: wrapper for opaque closures used as scan steps.
1029#[doc(hidden)]
1030pub struct OpaqueScanStep<F> {
1031    f: F,
1032    // Retained for future diagnostic/tracing use (step name in error messages).
1033    #[allow(dead_code)]
1034    name: &'static str,
1035}
1036
1037impl<Acc, In, Out, F: FnMut(&mut World, &mut Acc, In) -> Out + 'static> ScanStepCall<Acc, In>
1038    for OpaqueScanStep<F>
1039{
1040    type Out = Out;
1041    #[inline(always)]
1042    fn call(&mut self, world: &mut World, acc: &mut Acc, input: In) -> Out {
1043        (self.f)(world, acc, input)
1044    }
1045}
1046
1047impl<Acc, In, Out, F: FnMut(&mut World, &mut Acc, In) -> Out + 'static>
1048    IntoScanStep<Acc, In, Out, Opaque> for F
1049{
1050    type Step = OpaqueScanStep<F>;
1051
1052    fn into_scan_step(self, _registry: &Registry) -> Self::Step {
1053        OpaqueScanStep {
1054            f: self,
1055            name: std::any::type_name::<F>(),
1056        }
1057    }
1058}
1059
1060// =============================================================================
1061// resolve_scan_step — pre-resolve a scan step for manual dispatch
1062// =============================================================================
1063
1064/// Resolve a scan step for manual dispatch.
1065///
1066/// Returns a closure with pre-resolved [`Param`] state. Scan variant
1067/// of [`resolve_step`] with an additional `&mut Acc` accumulator.
1068pub fn resolve_scan_step<Acc, In, Out, Params, S: IntoScanStep<Acc, In, Out, Params>>(
1069    f: S,
1070    registry: &Registry,
1071) -> impl FnMut(&mut World, &mut Acc, In) -> Out + use<Acc, In, Out, Params, S> {
1072    let mut resolved = f.into_scan_step(registry);
1073    move |world: &mut World, acc: &mut Acc, input: In| resolved.call(world, acc, input)
1074}
1075
1076// =============================================================================
1077// RefScanStepCall / IntoRefScanStep — scan step taking &In
1078// =============================================================================
1079
/// Internal: callable trait for resolved scan steps taking input by reference.
///
/// DAG variant of [`ScanStepCall`] — each step borrows its input.
///
/// `Acc` is the accumulator type and `In` is the input type, which is only
/// ever seen as `&In`; the accumulator is supplied by the caller on every
/// invocation.
#[doc(hidden)]
pub trait RefScanStepCall<Acc, In> {
    /// The output type of this ref-scan step.
    type Out;
    /// Call this scan step with a world reference, accumulator, and borrowed input.
    fn call(&mut self, world: &mut World, acc: &mut Acc, input: &In) -> Self::Out;
}
1090
1091/// Converts a function into a pre-resolved ref-scan step with persistent state.
1092///
1093/// Same three-tier resolution as [`IntoRefStep`]:
1094///
1095/// | Params | Function shape | Example |
1096/// |--------|---------------|---------|
1097/// | `()` | `FnMut(&mut Acc, &In) -> Out` | `\|acc, trade: &Trade\| { ... }` |
1098/// | `(P0,)...(P0..P7,)` | `fn(Params..., &mut Acc, &In) -> Out` | `fn vwap(c: Res<Config>, acc: &mut State, t: &Trade) -> Option<f64>` |
1099/// | [`Opaque`] | `FnMut(&mut World, &mut Acc, &In) -> Out` | `\|w: &mut World, acc: &mut u64, t: &Trade\| { ... }` |
1100#[diagnostic::on_unimplemented(
1101    message = "this function cannot be used as a reference scan step",
1102    note = "reference scan steps take `&mut Accumulator` as first param, then resources, then `&In` last",
1103    note = "closures with resource parameters are not supported — use a named `fn`"
1104)]
1105pub trait IntoRefScanStep<Acc, In, Out, Params> {
1106    /// The concrete resolved ref-scan step type.
1107    type Step: RefScanStepCall<Acc, In, Out = Out>;
1108
1109    /// Resolve Param state from the registry and produce a ref-scan step.
1110    fn into_ref_scan_step(self, registry: &Registry) -> Self::Step;
1111}
1112
1113// -- Arity 0: FnMut(&mut Acc, &In) -> Out — closures work -------------------
1114
1115impl<Acc, In, Out, F: FnMut(&mut Acc, &In) -> Out + 'static> RefScanStepCall<Acc, In>
1116    for Step<F, ()>
1117{
1118    type Out = Out;
1119    #[inline(always)]
1120    fn call(&mut self, _world: &mut World, acc: &mut Acc, input: &In) -> Out {
1121        (self.f)(acc, input)
1122    }
1123}
1124
1125impl<Acc, In, Out, F: FnMut(&mut Acc, &In) -> Out + 'static> IntoRefScanStep<Acc, In, Out, ()>
1126    for F
1127{
1128    type Step = Step<F, ()>;
1129
1130    fn into_ref_scan_step(self, registry: &Registry) -> Self::Step {
1131        Step {
1132            f: self,
1133            state: <() as Param>::init(registry),
1134            name: std::any::type_name::<F>(),
1135        }
1136    }
1137}
1138
1139// -- Arities 1-8: named functions with Param resolution ----------------------
1140
// Generates, for one tuple of `Param` types `($P, ...)`, the
// `RefScanStepCall` and `IntoRefScanStep` impls for functions shaped
// `fn(Params..., &mut Acc, &In) -> Out`.
macro_rules! impl_into_ref_scan_step {
    ($($P:ident),+) => {
        impl<Acc, In, Out, F: 'static, $($P: Param + 'static),+>
            RefScanStepCall<Acc, In> for Step<F, ($($P,)+)>
        where
            // `&mut F` must be callable both with the declared param types
            // and with their fetched `Item<'a>` forms, for every `'a`.
            for<'a> &'a mut F:
                FnMut($($P,)+ &mut Acc, &In) -> Out +
                FnMut($($P::Item<'a>,)+ &mut Acc, &In) -> Out,
        {
            type Out = Out;
            #[inline(always)]
            #[allow(non_snake_case)]
            fn call(&mut self, world: &mut World, acc: &mut Acc, input: &In) -> Out {
                // Local helper: applies the callable to the fetched params,
                // the accumulator, and the borrowed input.
                fn call_inner<$($P,)+ Accumulator, Input: ?Sized, Output>(
                    mut f: impl FnMut($($P,)+ &mut Accumulator, &Input) -> Output,
                    $($P: $P,)+
                    acc: &mut Accumulator,
                    input: &Input,
                ) -> Output {
                    f($($P,)+ acc, input)
                }

                // SAFETY: state was produced by Param::init() on the same Registry
                // that built this World. Borrows are disjoint — enforced by
                // conflict detection at build time.
                // Debug builds only: `clear_borrows` runs before the fetch
                // (presumably resets borrow tracking — confirm in `World`).
                #[cfg(debug_assertions)]
                world.clear_borrows();
                let ($($P,)+) = unsafe {
                    <($($P,)+) as Param>::fetch(world, &mut self.state)
                };
                call_inner(&mut self.f, $($P,)+ acc, input)
            }
        }

        impl<Acc, In, Out, F: 'static, $($P: Param + 'static),+>
            IntoRefScanStep<Acc, In, Out, ($($P,)+)> for F
        where
            for<'a> &'a mut F:
                FnMut($($P,)+ &mut Acc, &In) -> Out +
                FnMut($($P::Item<'a>,)+ &mut Acc, &In) -> Out,
        {
            type Step = Step<F, ($($P,)+)>;

            fn into_ref_scan_step(self, registry: &Registry) -> Self::Step {
                let state = <($($P,)+) as Param>::init(registry);
                {
                    // Pass each param's resource id, paired with its type
                    // name, to `check_access` (presumably the build-time
                    // conflict detection — confirm in `Registry`).
                    #[allow(non_snake_case)]
                    let ($($P,)+) = &state;
                    registry.check_access(&[
                        $(
                            (<$P as Param>::resource_id($P),
                             std::any::type_name::<$P>()),
                        )+
                    ]);
                }
                Step { f: self, state, name: std::any::type_name::<F>() }
            }
        }
    };
}

// Expand the impls above via `all_tuples!` (defined outside this section).
all_tuples!(impl_into_ref_scan_step);
1203
1204// =============================================================================
1205// No-event RefScanStep — IntoRefScanStep<Acc, (), Out, _> without `&()` input
1206// =============================================================================
1207
1208// Arity 0: fn(&mut Acc) -> Out
1209impl<Acc, Out, F: FnMut(&mut Acc) -> Out + 'static> RefScanStepCall<Acc, ()>
1210    for Step<NoEvent<F>, ()>
1211{
1212    type Out = Out;
1213    #[inline(always)]
1214    fn call(&mut self, _world: &mut World, acc: &mut Acc, _input: &()) -> Out {
1215        (self.f.0)(acc)
1216    }
1217}
1218
1219impl<Acc, Out, F: FnMut(&mut Acc) -> Out + 'static> IntoRefScanStep<Acc, (), Out, NoEvent<F>>
1220    for F
1221{
1222    type Step = Step<NoEvent<F>, ()>;
1223
1224    fn into_ref_scan_step(self, registry: &Registry) -> Self::Step {
1225        Step {
1226            f: NoEvent(self),
1227            state: <() as Param>::init(registry),
1228            name: std::any::type_name::<F>(),
1229        }
1230    }
1231}
1232
// Generates, for one tuple of `Param` types `($P, ...)`, the no-event
// ref-scan impls: `RefScanStepCall<Acc, ()>` on `Step<NoEvent<F>, ..>` and
// `IntoRefScanStep<Acc, (), ..>` on `NoEvent<F>`, for functions shaped
// `fn(Params..., &mut Acc) -> Out` (no input argument).
macro_rules! impl_into_ref_scan_step_no_event {
    ($($P:ident),+) => {
        impl<Acc, Out, F: 'static, $($P: Param + 'static),+>
            RefScanStepCall<Acc, ()> for Step<NoEvent<F>, ($($P,)+)>
        where
            // `&mut F` must be callable both with the declared param types
            // and with their fetched `Item<'a>` forms, for every `'a`.
            for<'a> &'a mut F:
                FnMut($($P,)+ &mut Acc) -> Out +
                FnMut($($P::Item<'a>,)+ &mut Acc) -> Out,
        {
            type Out = Out;
            #[inline(always)]
            #[allow(non_snake_case)]
            fn call(&mut self, world: &mut World, acc: &mut Acc, _input: &()) -> Out {
                // Local helper: applies the callable to the fetched params
                // and the accumulator; the `&()` input is dropped.
                fn call_inner<$($P,)+ Accumulator, Output>(
                    mut f: impl FnMut($($P,)+ &mut Accumulator) -> Output,
                    $($P: $P,)+
                    acc: &mut Accumulator,
                ) -> Output {
                    f($($P,)+ acc)
                }

                // SAFETY: state was produced by Param::init() on the same Registry
                // that built this World. Borrows are disjoint — enforced by
                // conflict detection at build time.
                // Debug builds only: `clear_borrows` runs before the fetch
                // (presumably resets borrow tracking — confirm in `World`).
                #[cfg(debug_assertions)]
                world.clear_borrows();
                let ($($P,)+) = unsafe {
                    <($($P,)+) as Param>::fetch(world, &mut self.state)
                };
                // Unwrap the `NoEvent` newtype to reach the user function.
                call_inner(&mut self.f.0, $($P,)+ acc)
            }
        }

        impl<Acc, Out, F: 'static, $($P: Param + 'static),+>
            IntoRefScanStep<Acc, (), Out, ($($P,)+)> for NoEvent<F>
        where
            for<'a> &'a mut F:
                FnMut($($P,)+ &mut Acc) -> Out +
                FnMut($($P::Item<'a>,)+ &mut Acc) -> Out,
        {
            type Step = Step<NoEvent<F>, ($($P,)+)>;

            fn into_ref_scan_step(self, registry: &Registry) -> Self::Step {
                let state = <($($P,)+) as Param>::init(registry);
                {
                    // Pass each param's resource id, paired with its type
                    // name, to `check_access` (presumably the build-time
                    // conflict detection — confirm in `Registry`).
                    #[allow(non_snake_case)]
                    let ($($P,)+) = &state;
                    registry.check_access(&[
                        $(
                            (<$P as Param>::resource_id($P),
                             std::any::type_name::<$P>()),
                        )+
                    ]);
                }
                // `self` is already a `NoEvent<F>`; `name` records the inner
                // function's type name.
                Step { f: self, state, name: std::any::type_name::<F>() }
            }
        }
    };
}

// Expand the impls above via `all_tuples!` (defined outside this section).
all_tuples!(impl_into_ref_scan_step_no_event);
1291
1292// -- Opaque: FnMut(&mut World, &mut Acc, &In) -> Out ------------------------
1293
1294/// Internal: wrapper for opaque closures used as ref-scan steps.
1295#[doc(hidden)]
1296pub struct OpaqueRefScanStep<F> {
1297    f: F,
1298    // Retained for future diagnostic/tracing use (step name in error messages).
1299    #[allow(dead_code)]
1300    name: &'static str,
1301}
1302
1303impl<Acc, In, Out, F: FnMut(&mut World, &mut Acc, &In) -> Out + 'static> RefScanStepCall<Acc, In>
1304    for OpaqueRefScanStep<F>
1305{
1306    type Out = Out;
1307    #[inline(always)]
1308    fn call(&mut self, world: &mut World, acc: &mut Acc, input: &In) -> Out {
1309        (self.f)(world, acc, input)
1310    }
1311}
1312
1313impl<Acc, In, Out, F: FnMut(&mut World, &mut Acc, &In) -> Out + 'static>
1314    IntoRefScanStep<Acc, In, Out, Opaque> for F
1315{
1316    type Step = OpaqueRefScanStep<F>;
1317
1318    fn into_ref_scan_step(self, _registry: &Registry) -> Self::Step {
1319        OpaqueRefScanStep {
1320            f: self,
1321            name: std::any::type_name::<F>(),
1322        }
1323    }
1324}
1325
1326// =============================================================================
1327// resolve_ref_scan_step — pre-resolve a ref-scan step for manual dispatch
1328// =============================================================================
1329
1330/// Resolve a ref-scan step for manual dispatch.
1331///
1332/// Returns a closure with pre-resolved [`Param`] state. Reference-input
1333/// counterpart of [`resolve_scan_step`].
1334pub fn resolve_ref_scan_step<Acc, In, Out, Params, S: IntoRefScanStep<Acc, In, Out, Params>>(
1335    f: S,
1336    registry: &Registry,
1337) -> impl FnMut(&mut World, &mut Acc, &In) -> Out + use<Acc, In, Out, Params, S> {
1338    let mut resolved = f.into_ref_scan_step(registry);
1339    move |world: &mut World, acc: &mut Acc, input: &In| resolved.call(world, acc, input)
1340}
1341
1342// =============================================================================
1343// SplatCall / IntoSplatStep — splat step dispatch (tuple destructuring)
1344// =============================================================================
1345//
1346// Splat traits mirror StepCall/IntoStep but accept multiple owned values
1347// instead of a single input. This lets `.splat()` destructure a tuple
1348// output into individual function arguments for the next step.
1349//
1350// One trait pair per arity (2-5). Past 5, use a named struct.
1351
1352// -- Splat 2 ------------------------------------------------------------------
1353
/// Internal: callable trait for resolved 2-splat steps.
///
/// `A` and `B` are the two owned values passed as separate arguments.
#[doc(hidden)]
pub trait SplatCall2<A, B> {
    /// Output type of this splat step.
    type Out;
    /// Call this splat step with a world reference and the two owned values.
    fn call_splat(&mut self, world: &mut World, a: A, b: B) -> Self::Out;
}
1361
/// Converts a function into a resolved 2-splat step.
///
/// With `Params = ()` any `FnMut(A, B) -> Out` qualifies (closures included);
/// with a `Param` tuple the function takes its params first, then `A, B`.
#[doc(hidden)]
pub trait IntoSplatStep2<A, B, Out, Params> {
    /// The concrete resolved 2-splat step type.
    type Step: SplatCall2<A, B, Out = Out>;
    /// Resolve Param state from the registry and produce a 2-splat step.
    fn into_splat_step(self, registry: &Registry) -> Self::Step;
}
1368
1369impl<A, B, Out, F: FnMut(A, B) -> Out + 'static> SplatCall2<A, B> for Step<F, ()> {
1370    type Out = Out;
1371    #[inline(always)]
1372    fn call_splat(&mut self, _world: &mut World, a: A, b: B) -> Out {
1373        (self.f)(a, b)
1374    }
1375}
1376
1377impl<A, B, Out, F: FnMut(A, B) -> Out + 'static> IntoSplatStep2<A, B, Out, ()> for F {
1378    type Step = Step<F, ()>;
1379    fn into_splat_step(self, registry: &Registry) -> Self::Step {
1380        Step {
1381            f: self,
1382            state: <() as Param>::init(registry),
1383            name: std::any::type_name::<F>(),
1384        }
1385    }
1386}
1387
// Generates, for one tuple of `Param` types `($P, ...)`, the `SplatCall2`
// and `IntoSplatStep2` impls for functions shaped
// `fn(Params..., A, B) -> Out`.
macro_rules! impl_splat2_step {
    ($($P:ident),+) => {
        impl<A, B, Out, F: 'static, $($P: Param + 'static),+>
            SplatCall2<A, B> for Step<F, ($($P,)+)>
        where
            // `&mut F` must be callable both with the declared param types
            // and with their fetched `Item<'a>` forms, for every `'a`.
            for<'a> &'a mut F:
                FnMut($($P,)+ A, B) -> Out +
                FnMut($($P::Item<'a>,)+ A, B) -> Out,
        {
            type Out = Out;
            #[inline(always)]
            #[allow(non_snake_case)]
            fn call_splat(&mut self, world: &mut World, a: A, b: B) -> Out {
                // Local helper: applies the callable to the fetched params
                // followed by the two splatted values.
                fn call_inner<$($P,)+ IA, IB, Output>(
                    mut f: impl FnMut($($P,)+ IA, IB) -> Output,
                    $($P: $P,)+
                    a: IA, b: IB,
                ) -> Output {
                    f($($P,)+ a, b)
                }
                // SAFETY: state was produced by Param::init() on the same Registry
                // that built this World. Borrows are disjoint — enforced by
                // conflict detection at build time.
                // Debug builds only: `clear_borrows` runs before the fetch
                // (presumably resets borrow tracking — confirm in `World`).
                #[cfg(debug_assertions)]
                world.clear_borrows();
                let ($($P,)+) = unsafe {
                    <($($P,)+) as Param>::fetch(world, &mut self.state)
                };
                call_inner(&mut self.f, $($P,)+ a, b)
            }
        }

        impl<A, B, Out, F: 'static, $($P: Param + 'static),+>
            IntoSplatStep2<A, B, Out, ($($P,)+)> for F
        where
            for<'a> &'a mut F:
                FnMut($($P,)+ A, B) -> Out +
                FnMut($($P::Item<'a>,)+ A, B) -> Out,
        {
            type Step = Step<F, ($($P,)+)>;

            fn into_splat_step(self, registry: &Registry) -> Self::Step {
                let state = <($($P,)+) as Param>::init(registry);
                {
                    // Pass each param's resource id, paired with its type
                    // name, to `check_access` (presumably the build-time
                    // conflict detection — confirm in `Registry`).
                    #[allow(non_snake_case)]
                    let ($($P,)+) = &state;
                    registry.check_access(&[
                        $(
                            (<$P as Param>::resource_id($P),
                             std::any::type_name::<$P>()),
                        )+
                    ]);
                }
                Step { f: self, state, name: std::any::type_name::<F>() }
            }
        }
    };
}
1446
1447// -- Splat 3 ------------------------------------------------------------------
1448
/// Internal: callable trait for resolved 3-splat steps.
///
/// `A`, `B`, and `C` are the three owned values passed as separate arguments.
#[doc(hidden)]
pub trait SplatCall3<A, B, C> {
    /// Output type of this splat step.
    type Out;
    /// Call this splat step with a world reference and the three owned values.
    fn call_splat(&mut self, world: &mut World, a: A, b: B, c: C) -> Self::Out;
}
1456
/// Converts a function into a resolved 3-splat step.
///
/// With `Params = ()` any `FnMut(A, B, C) -> Out` qualifies (closures
/// included); with a `Param` tuple the function takes its params first, then
/// `A, B, C`.
#[doc(hidden)]
pub trait IntoSplatStep3<A, B, C, Out, Params> {
    /// The concrete resolved 3-splat step type.
    type Step: SplatCall3<A, B, C, Out = Out>;
    /// Resolve Param state from the registry and produce a 3-splat step.
    fn into_splat_step(self, registry: &Registry) -> Self::Step;
}
1463
1464impl<A, B, C, Out, F: FnMut(A, B, C) -> Out + 'static> SplatCall3<A, B, C> for Step<F, ()> {
1465    type Out = Out;
1466    #[inline(always)]
1467    fn call_splat(&mut self, _world: &mut World, a: A, b: B, c: C) -> Out {
1468        (self.f)(a, b, c)
1469    }
1470}
1471
1472impl<A, B, C, Out, F: FnMut(A, B, C) -> Out + 'static> IntoSplatStep3<A, B, C, Out, ()> for F {
1473    type Step = Step<F, ()>;
1474    fn into_splat_step(self, registry: &Registry) -> Self::Step {
1475        Step {
1476            f: self,
1477            state: <() as Param>::init(registry),
1478            name: std::any::type_name::<F>(),
1479        }
1480    }
1481}
1482
// Generates, for one tuple of `Param` types `($P, ...)`, the `SplatCall3`
// and `IntoSplatStep3` impls for functions shaped
// `fn(Params..., A, B, C) -> Out`.
macro_rules! impl_splat3_step {
    ($($P:ident),+) => {
        impl<A, B, C, Out, F: 'static, $($P: Param + 'static),+>
            SplatCall3<A, B, C> for Step<F, ($($P,)+)>
        where
            // `&mut F` must be callable both with the declared param types
            // and with their fetched `Item<'a>` forms, for every `'a`.
            for<'a> &'a mut F:
                FnMut($($P,)+ A, B, C) -> Out +
                FnMut($($P::Item<'a>,)+ A, B, C) -> Out,
        {
            type Out = Out;
            #[inline(always)]
            #[allow(non_snake_case)]
            fn call_splat(&mut self, world: &mut World, a: A, b: B, c: C) -> Out {
                // Local helper: applies the callable to the fetched params
                // followed by the three splatted values.
                fn call_inner<$($P,)+ IA, IB, IC, Output>(
                    mut f: impl FnMut($($P,)+ IA, IB, IC) -> Output,
                    $($P: $P,)+
                    a: IA, b: IB, c: IC,
                ) -> Output {
                    f($($P,)+ a, b, c)
                }
                // SAFETY: state was produced by Param::init() on the same Registry
                // that built this World. Borrows are disjoint — enforced by
                // conflict detection at build time.
                // Debug builds only: `clear_borrows` runs before the fetch
                // (presumably resets borrow tracking — confirm in `World`).
                #[cfg(debug_assertions)]
                world.clear_borrows();
                let ($($P,)+) = unsafe {
                    <($($P,)+) as Param>::fetch(world, &mut self.state)
                };
                call_inner(&mut self.f, $($P,)+ a, b, c)
            }
        }

        impl<A, B, C, Out, F: 'static, $($P: Param + 'static),+>
            IntoSplatStep3<A, B, C, Out, ($($P,)+)> for F
        where
            for<'a> &'a mut F:
                FnMut($($P,)+ A, B, C) -> Out +
                FnMut($($P::Item<'a>,)+ A, B, C) -> Out,
        {
            type Step = Step<F, ($($P,)+)>;

            fn into_splat_step(self, registry: &Registry) -> Self::Step {
                let state = <($($P,)+) as Param>::init(registry);
                {
                    // Pass each param's resource id, paired with its type
                    // name, to `check_access` (presumably the build-time
                    // conflict detection — confirm in `Registry`).
                    #[allow(non_snake_case)]
                    let ($($P,)+) = &state;
                    registry.check_access(&[
                        $(
                            (<$P as Param>::resource_id($P),
                             std::any::type_name::<$P>()),
                        )+
                    ]);
                }
                Step { f: self, state, name: std::any::type_name::<F>() }
            }
        }
    };
}
1541
1542// -- Splat 4 ------------------------------------------------------------------
1543
/// Internal: callable trait for resolved 4-splat steps.
///
/// `A` through `D` are the four owned values passed as separate arguments.
#[doc(hidden)]
pub trait SplatCall4<A, B, C, D> {
    /// Output type of this splat step.
    type Out;
    /// Call this splat step with a world reference and the four owned values.
    fn call_splat(&mut self, world: &mut World, a: A, b: B, c: C, d: D) -> Self::Out;
}
1551
/// Converts a function into a resolved 4-splat step.
///
/// With `Params = ()` any `FnMut(A, B, C, D) -> Out` qualifies (closures
/// included); with a `Param` tuple the function takes its params first, then
/// `A, B, C, D`.
#[doc(hidden)]
pub trait IntoSplatStep4<A, B, C, D, Out, Params> {
    /// The concrete resolved 4-splat step type.
    type Step: SplatCall4<A, B, C, D, Out = Out>;
    /// Resolve Param state from the registry and produce a 4-splat step.
    fn into_splat_step(self, registry: &Registry) -> Self::Step;
}
1558
1559impl<A, B, C, D, Out, F: FnMut(A, B, C, D) -> Out + 'static> SplatCall4<A, B, C, D>
1560    for Step<F, ()>
1561{
1562    type Out = Out;
1563    #[inline(always)]
1564    fn call_splat(&mut self, _world: &mut World, a: A, b: B, c: C, d: D) -> Out {
1565        (self.f)(a, b, c, d)
1566    }
1567}
1568
1569impl<A, B, C, D, Out, F: FnMut(A, B, C, D) -> Out + 'static> IntoSplatStep4<A, B, C, D, Out, ()>
1570    for F
1571{
1572    type Step = Step<F, ()>;
1573    fn into_splat_step(self, registry: &Registry) -> Self::Step {
1574        Step {
1575            f: self,
1576            state: <() as Param>::init(registry),
1577            name: std::any::type_name::<F>(),
1578        }
1579    }
1580}
1581
// Generates, for one tuple of `Param` types `($P, ...)`, the `SplatCall4`
// and `IntoSplatStep4` impls for functions shaped
// `fn(Params..., A, B, C, D) -> Out`.
macro_rules! impl_splat4_step {
    ($($P:ident),+) => {
        impl<A, B, C, D, Out, F: 'static, $($P: Param + 'static),+>
            SplatCall4<A, B, C, D> for Step<F, ($($P,)+)>
        // `&mut F` must be callable both with the declared param types and
        // with their fetched `Item<'a>` forms, for every `'a`.
        where for<'a> &'a mut F:
            FnMut($($P,)+ A, B, C, D) -> Out +
            FnMut($($P::Item<'a>,)+ A, B, C, D) -> Out,
        {
            type Out = Out;
            #[inline(always)]
            #[allow(non_snake_case)]
            fn call_splat(&mut self, world: &mut World, a: A, b: B, c: C, d: D) -> Out {
                // Local helper: applies the callable to the fetched params
                // followed by the four splatted values.
                fn call_inner<$($P,)+ IA, IB, IC, ID, Output>(
                    mut f: impl FnMut($($P,)+ IA, IB, IC, ID) -> Output,
                    $($P: $P,)+ a: IA, b: IB, c: IC, d: ID,
                ) -> Output { f($($P,)+ a, b, c, d) }
                // SAFETY: state was produced by Param::init() on the same Registry
                // that built this World. Borrows are disjoint — enforced by
                // conflict detection at build time.
                // Debug builds only: `clear_borrows` runs before the fetch
                // (presumably resets borrow tracking — confirm in `World`).
                #[cfg(debug_assertions)]
                world.clear_borrows();
                let ($($P,)+) = unsafe {
                    <($($P,)+) as Param>::fetch(world, &mut self.state)
                };
                call_inner(&mut self.f, $($P,)+ a, b, c, d)
            }
        }
        impl<A, B, C, D, Out, F: 'static, $($P: Param + 'static),+>
            IntoSplatStep4<A, B, C, D, Out, ($($P,)+)> for F
        where for<'a> &'a mut F:
            FnMut($($P,)+ A, B, C, D) -> Out +
            FnMut($($P::Item<'a>,)+ A, B, C, D) -> Out,
        {
            type Step = Step<F, ($($P,)+)>;
            fn into_splat_step(self, registry: &Registry) -> Self::Step {
                let state = <($($P,)+) as Param>::init(registry);
                // Pass each param's resource id, paired with its type name,
                // to `check_access` (presumably the build-time conflict
                // detection — confirm in `Registry`).
                { #[allow(non_snake_case)] let ($($P,)+) = &state;
                  registry.check_access(&[$((<$P as Param>::resource_id($P), std::any::type_name::<$P>()),)+]); }
                Step { f: self, state, name: std::any::type_name::<F>() }
            }
        }
    };
}
1625
1626// -- Splat 5 ------------------------------------------------------------------
1627
/// Internal: callable trait for resolved 5-splat steps.
///
/// A step is invoked with the [`World`] plus five separately passed
/// ("splatted") inputs `a` through `e`.
#[doc(hidden)]
pub trait SplatCall5<A, B, C, D, E> {
    /// Output type of this splat step.
    type Out;
    /// Invokes the step with `world` and the five splatted inputs.
    #[allow(clippy::many_single_char_names)]
    fn call_splat(&mut self, world: &mut World, a: A, b: B, c: C, d: D, e: E) -> Self::Out;
}
1636
/// Converts a named function into a resolved 5-splat step.
///
/// `Params` names the tuple of param types the function takes ahead of its
/// five splatted inputs (`()` when it takes none).
#[doc(hidden)]
pub trait IntoSplatStep5<A, B, C, D, E, Out, Params> {
    /// The resolved step type; calling it yields `Out`.
    type Step: SplatCall5<A, B, C, D, E, Out = Out>;
    /// Consumes `self` and builds the step against `registry`.
    fn into_splat_step(self, registry: &Registry) -> Self::Step;
}
1643
1644impl<A, B, C, D, E, Out, F: FnMut(A, B, C, D, E) -> Out + 'static> SplatCall5<A, B, C, D, E>
1645    for Step<F, ()>
1646{
1647    type Out = Out;
1648    #[inline(always)]
1649    #[allow(clippy::many_single_char_names)]
1650    fn call_splat(&mut self, _world: &mut World, a: A, b: B, c: C, d: D, e: E) -> Out {
1651        (self.f)(a, b, c, d, e)
1652    }
1653}
1654
1655impl<A, B, C, D, E, Out, F: FnMut(A, B, C, D, E) -> Out + 'static>
1656    IntoSplatStep5<A, B, C, D, E, Out, ()> for F
1657{
1658    type Step = Step<F, ()>;
1659    fn into_splat_step(self, registry: &Registry) -> Self::Step {
1660        Step {
1661            f: self,
1662            state: <() as Param>::init(registry),
1663            name: std::any::type_name::<F>(),
1664        }
1665    }
1666}
1667
// Generates, for one tuple of `Param` type identifiers `$P...`, the two impls
// that make a function taking those params followed by five splatted inputs
// usable as a 5-splat step:
//   * `SplatCall5` for `Step<F, (P...,)>` — fetches the params, then calls `F`.
//   * `IntoSplatStep5` for `F` — initialises the param state and builds the `Step`.
macro_rules! impl_splat5_step {
    ($($P:ident),+) => {
        impl<A, B, C, D, E, Out, F: 'static, $($P: Param + 'static),+>
            SplatCall5<A, B, C, D, E> for Step<F, ($($P,)+)>
        // `&mut F` must be callable both with the `Param` types themselves and
        // with their fetched `Item<'a>` forms, for every lifetime `'a`.
        where for<'a> &'a mut F:
            FnMut($($P,)+ A, B, C, D, E) -> Out +
            FnMut($($P::Item<'a>,)+ A, B, C, D, E) -> Out,
        {
            type Out = Out;
            #[inline(always)]
            #[allow(non_snake_case, clippy::many_single_char_names)]
            fn call_splat(&mut self, world: &mut World, a: A, b: B, c: C, d: D, e: E) -> Out {
                // Local helper that only forwards the params and the five
                // inputs to `f`. NOTE(review): presumably exists to pin down
                // the callable's signature for the compiler — confirm.
                fn call_inner<$($P,)+ IA, IB, IC, ID, IE, Output>(
                    mut f: impl FnMut($($P,)+ IA, IB, IC, ID, IE) -> Output,
                    $($P: $P,)+ a: IA, b: IB, c: IC, d: ID, e: IE,
                ) -> Output { f($($P,)+ a, b, c, d, e) }
                // Debug builds only: `clear_borrows` is called before the fetch
                // (presumably resets debug borrow tracking — confirm in `World`).
                #[cfg(debug_assertions)]
                world.clear_borrows();
                // SAFETY: state was produced by Param::init() on the same Registry
                // that built this World. Borrows are disjoint — enforced by
                // conflict detection at build time.
                let ($($P,)+) = unsafe {
                    <($($P,)+) as Param>::fetch(world, &mut self.state)
                };
                call_inner(&mut self.f, $($P,)+ a, b, c, d, e)
            }
        }
        impl<A, B, C, D, E, Out, F: 'static, $($P: Param + 'static),+>
            IntoSplatStep5<A, B, C, D, E, Out, ($($P,)+)> for F
        where for<'a> &'a mut F:
            FnMut($($P,)+ A, B, C, D, E) -> Out +
            FnMut($($P::Item<'a>,)+ A, B, C, D, E) -> Out,
        {
            type Step = Step<F, ($($P,)+)>;
            fn into_splat_step(self, registry: &Registry) -> Self::Step {
                let state = <($($P,)+) as Param>::init(registry);
                // Hand every param's resource id (paired with the param's type
                // name) to `registry.check_access` before the step is built.
                { #[allow(non_snake_case)] let ($($P,)+) = &state;
                  registry.check_access(&[$((<$P as Param>::resource_id($P), std::any::type_name::<$P>()),)+]); }
                Step { f: self, state, name: std::any::type_name::<F>() }
            }
        }
    };
}
1711
// Instantiate the splat-step impl macros for arities 2 through 5 via
// `all_tuples!` (defined elsewhere; presumably invokes each macro once per
// supported `Param` tuple size — confirm against its definition).
all_tuples!(impl_splat2_step);
all_tuples!(impl_splat3_step);
all_tuples!(impl_splat4_step);
all_tuples!(impl_splat5_step);
1716
1717// =============================================================================
1718// ChainCall — named chain dispatch trait
1719// =============================================================================
1720
/// Trait for pipeline chain nodes. Each node transforms input through
/// the chain, producing an output. `In` appears only on the trait impl,
/// not on the implementing struct — this preserves HRTB compatibility
/// so `Pipeline<Chain>` can satisfy `for<'a> Handler<&'a T>`.
///
/// Nodes take `&mut self`, so a node may keep state between calls.
#[doc(hidden)]
pub trait ChainCall<In> {
    /// The output type of this chain node.
    type Out;
    /// Execute the chain on the given input.
    ///
    /// `world` is handed down to every nested node and step; `input` is
    /// consumed by value.
    fn call(&mut self, world: &mut World, input: In) -> Self::Out;
}
1732
1733// =============================================================================
1734// Chain nodes — named types for HRTB-compatible pipeline composition
1735// =============================================================================
1736//
1737// Each combinator gets a named struct following the iterator adapter pattern
1738// (like `Map<I, F>`, `Filter<I, P>`). `In` appears only on the `ChainCall<In>`
1739// trait impl, never on the struct — this is what enables HRTB boxing.
1740
1741/// Identity passthrough node. Used as the initial chain element.
1742#[doc(hidden)]
1743pub struct IdentityNode;
1744
1745impl<In> ChainCall<In> for IdentityNode {
1746    type Out = In;
1747    #[inline(always)]
1748    fn call(&mut self, _world: &mut World, input: In) -> In {
1749        input
1750    }
1751}
1752
1753// -- Core (any Out) ----------------------------------------------------------
1754
1755/// Chain node for `.then()` — transforms output via a resolved step.
1756#[doc(hidden)]
1757pub struct ThenNode<Prev, S> {
1758    pub(crate) prev: Prev,
1759    pub(crate) step: S,
1760}
1761
1762impl<In, Prev, S> ChainCall<In> for ThenNode<Prev, S>
1763where
1764    Prev: ChainCall<In>,
1765    S: StepCall<Prev::Out>,
1766{
1767    type Out = S::Out;
1768    #[inline(always)]
1769    fn call(&mut self, world: &mut World, input: In) -> S::Out {
1770        let mid = self.prev.call(world, input);
1771        self.step.call(world, mid)
1772    }
1773}
1774
1775/// Chain node for `.tap()` — side effect via `&Out`, value passes through.
1776#[doc(hidden)]
1777pub struct TapNode<Prev, S> {
1778    pub(crate) prev: Prev,
1779    pub(crate) step: S,
1780}
1781
1782impl<In, Prev, S> ChainCall<In> for TapNode<Prev, S>
1783where
1784    Prev: ChainCall<In>,
1785    S: RefStepCall<Prev::Out, Out = ()>,
1786{
1787    type Out = Prev::Out;
1788    #[inline(always)]
1789    fn call(&mut self, world: &mut World, input: In) -> Prev::Out {
1790        let val = self.prev.call(world, input);
1791        self.step.call(world, &val);
1792        val
1793    }
1794}
1795
1796/// Chain node for `.guard()` — wraps output in `Option` based on predicate.
1797#[doc(hidden)]
1798pub struct GuardNode<Prev, S> {
1799    pub(crate) prev: Prev,
1800    pub(crate) step: S,
1801}
1802
1803impl<In, Prev, S> ChainCall<In> for GuardNode<Prev, S>
1804where
1805    Prev: ChainCall<In>,
1806    S: RefStepCall<Prev::Out, Out = bool>,
1807{
1808    type Out = Option<Prev::Out>;
1809    #[inline(always)]
1810    fn call(&mut self, world: &mut World, input: In) -> Option<Prev::Out> {
1811        let val = self.prev.call(world, input);
1812        if self.step.call(world, &val) {
1813            Some(val)
1814        } else {
1815            None
1816        }
1817    }
1818}
1819
1820/// Chain node for `.dedup()` — suppresses consecutive unchanged values.
1821#[doc(hidden)]
1822pub struct DedupNode<Prev, T> {
1823    pub(crate) prev: Prev,
1824    pub(crate) last: Option<T>,
1825}
1826
1827impl<In, T: PartialEq + Clone, Prev: ChainCall<In, Out = T>> ChainCall<In> for DedupNode<Prev, T> {
1828    type Out = Option<T>;
1829    #[inline(always)]
1830    fn call(&mut self, world: &mut World, input: In) -> Option<T> {
1831        let val = self.prev.call(world, input);
1832        if self.last.as_ref() == Some(&val) {
1833            None
1834        } else {
1835            self.last = Some(val.clone());
1836            Some(val)
1837        }
1838    }
1839}
1840
1841/// Chain node for `.scan()` — transforms with persistent accumulator.
1842#[doc(hidden)]
1843pub struct ScanNode<Prev, S, Acc> {
1844    pub(crate) prev: Prev,
1845    pub(crate) step: S,
1846    pub(crate) acc: Acc,
1847}
1848
1849impl<In, Prev, S, Acc> ChainCall<In> for ScanNode<Prev, S, Acc>
1850where
1851    Prev: ChainCall<In>,
1852    S: ScanStepCall<Acc, Prev::Out>,
1853{
1854    type Out = S::Out;
1855    #[inline(always)]
1856    fn call(&mut self, world: &mut World, input: In) -> S::Out {
1857        let val = self.prev.call(world, input);
1858        self.step.call(world, &mut self.acc, val)
1859    }
1860}
1861
1862/// Chain node for `.dispatch()` — feeds output to a [`Handler`].
1863#[doc(hidden)]
1864pub struct DispatchNode<Prev, H> {
1865    pub(crate) prev: Prev,
1866    pub(crate) handler: H,
1867}
1868
1869impl<In, Prev, H> ChainCall<In> for DispatchNode<Prev, H>
1870where
1871    Prev: ChainCall<In>,
1872    H: Handler<Prev::Out>,
1873{
1874    type Out = ();
1875    #[inline(always)]
1876    fn call(&mut self, world: &mut World, input: In) {
1877        let out = self.prev.call(world, input);
1878        self.handler.run(world, out);
1879    }
1880}
1881
1882/// Chain node for `.tee()` — runs side-effect chain on `&Out`, passes value through.
1883#[doc(hidden)]
1884pub struct TeeNode<Prev, C> {
1885    pub(crate) prev: Prev,
1886    pub(crate) side: C,
1887}
1888
1889impl<In, Prev, C> ChainCall<In> for TeeNode<Prev, C>
1890where
1891    Prev: ChainCall<In>,
1892    Prev::Out: 'static,
1893    C: for<'a> ChainCall<&'a Prev::Out, Out = ()>,
1894{
1895    type Out = Prev::Out;
1896    #[inline(always)]
1897    fn call(&mut self, world: &mut World, input: In) -> Prev::Out {
1898        let val = self.prev.call(world, input);
1899        self.side.call(world, &val);
1900        val
1901    }
1902}
1903
1904/// Chain node for `.route()` — binary conditional dispatch.
1905#[doc(hidden)]
1906pub struct RouteNode<Prev, P, C0, C1> {
1907    pub(crate) prev: Prev,
1908    pub(crate) pred: P,
1909    pub(crate) on_true: C0,
1910    pub(crate) on_false: C1,
1911}
1912
1913impl<In, Prev, P, C0, C1> ChainCall<In> for RouteNode<Prev, P, C0, C1>
1914where
1915    Prev: ChainCall<In>,
1916    P: RefStepCall<Prev::Out, Out = bool>,
1917    C0: ChainCall<Prev::Out>,
1918    C1: ChainCall<Prev::Out, Out = C0::Out>,
1919{
1920    type Out = C0::Out;
1921    #[inline(always)]
1922    fn call(&mut self, world: &mut World, input: In) -> C0::Out {
1923        let val = self.prev.call(world, input);
1924        if self.pred.call(world, &val) {
1925            self.on_true.call(world, val)
1926        } else {
1927            self.on_false.call(world, val)
1928        }
1929    }
1930}
1931
1932// -- Option<T> nodes ---------------------------------------------------------
1933
1934/// Chain node for `.map()` on `Option<T>`.
1935#[doc(hidden)]
1936pub struct MapOptionNode<Prev, S> {
1937    pub(crate) prev: Prev,
1938    pub(crate) step: S,
1939}
1940
1941impl<In, T, Prev, S> ChainCall<In> for MapOptionNode<Prev, S>
1942where
1943    Prev: ChainCall<In, Out = Option<T>>,
1944    S: StepCall<T>,
1945{
1946    type Out = Option<S::Out>;
1947    #[inline(always)]
1948    fn call(&mut self, world: &mut World, input: In) -> Option<S::Out> {
1949        self.prev
1950            .call(world, input)
1951            .map(|val| self.step.call(world, val))
1952    }
1953}
1954
1955/// Chain node for `.filter()` on `Option<T>`.
1956#[doc(hidden)]
1957pub struct FilterNode<Prev, S> {
1958    pub(crate) prev: Prev,
1959    pub(crate) step: S,
1960}
1961
1962impl<In, T, Prev, S> ChainCall<In> for FilterNode<Prev, S>
1963where
1964    Prev: ChainCall<In, Out = Option<T>>,
1965    S: RefStepCall<T, Out = bool>,
1966{
1967    type Out = Option<T>;
1968    #[inline(always)]
1969    fn call(&mut self, world: &mut World, input: In) -> Option<T> {
1970        self.prev
1971            .call(world, input)
1972            .filter(|val| self.step.call(world, val))
1973    }
1974}
1975
1976/// Chain node for `.inspect()` on `Option<T>`.
1977#[doc(hidden)]
1978pub struct InspectOptionNode<Prev, S> {
1979    pub(crate) prev: Prev,
1980    pub(crate) step: S,
1981}
1982
1983impl<In, T, Prev, S> ChainCall<In> for InspectOptionNode<Prev, S>
1984where
1985    Prev: ChainCall<In, Out = Option<T>>,
1986    S: RefStepCall<T, Out = ()>,
1987{
1988    type Out = Option<T>;
1989    #[inline(always)]
1990    fn call(&mut self, world: &mut World, input: In) -> Option<T> {
1991        self.prev
1992            .call(world, input)
1993            .inspect(|val| self.step.call(world, val))
1994    }
1995}
1996
1997/// Chain node for `.and_then()` on `Option<T>`.
1998#[doc(hidden)]
1999pub struct AndThenNode<Prev, S> {
2000    pub(crate) prev: Prev,
2001    pub(crate) step: S,
2002}
2003
2004impl<In, T, U, Prev, S> ChainCall<In> for AndThenNode<Prev, S>
2005where
2006    Prev: ChainCall<In, Out = Option<T>>,
2007    S: StepCall<T, Out = Option<U>>,
2008{
2009    type Out = Option<U>;
2010    #[inline(always)]
2011    fn call(&mut self, world: &mut World, input: In) -> Option<U> {
2012        self.prev
2013            .call(world, input)
2014            .and_then(|val| self.step.call(world, val))
2015    }
2016}
2017
2018/// Chain node for `.on_none()` — side effect when `None`.
2019#[doc(hidden)]
2020pub struct OnNoneNode<Prev, P> {
2021    pub(crate) prev: Prev,
2022    pub(crate) producer: P,
2023}
2024
2025impl<In, T, Prev, P> ChainCall<In> for OnNoneNode<Prev, P>
2026where
2027    Prev: ChainCall<In, Out = Option<T>>,
2028    P: ProducerCall<Out = ()>,
2029{
2030    type Out = Option<T>;
2031    #[inline(always)]
2032    fn call(&mut self, world: &mut World, input: In) -> Option<T> {
2033        let result = self.prev.call(world, input);
2034        if result.is_none() {
2035            self.producer.call(world);
2036        }
2037        result
2038    }
2039}
2040
2041/// Chain node for `.ok_or()` — `Option<T>` → `Result<T, E>`.
2042#[doc(hidden)]
2043pub struct OkOrNode<Prev, E> {
2044    pub(crate) prev: Prev,
2045    pub(crate) err: E,
2046}
2047
2048impl<In, T, E: Clone, Prev: ChainCall<In, Out = Option<T>>> ChainCall<In> for OkOrNode<Prev, E> {
2049    type Out = Result<T, E>;
2050    #[inline(always)]
2051    fn call(&mut self, world: &mut World, input: In) -> Result<T, E> {
2052        self.prev.call(world, input).ok_or_else(|| self.err.clone())
2053    }
2054}
2055
2056/// Chain node for `.ok_or_else()` — `Option<T>` → `Result<T, E>`.
2057#[doc(hidden)]
2058pub struct OkOrElseNode<Prev, P> {
2059    pub(crate) prev: Prev,
2060    pub(crate) producer: P,
2061}
2062
2063impl<In, T, E, Prev, P> ChainCall<In> for OkOrElseNode<Prev, P>
2064where
2065    Prev: ChainCall<In, Out = Option<T>>,
2066    P: ProducerCall<Out = E>,
2067{
2068    type Out = Result<T, E>;
2069    #[inline(always)]
2070    fn call(&mut self, world: &mut World, input: In) -> Result<T, E> {
2071        self.prev
2072            .call(world, input)
2073            .ok_or_else(|| self.producer.call(world))
2074    }
2075}
2076
2077/// Chain node for `.unwrap_or()` on `Option<T>`.
2078#[doc(hidden)]
2079pub struct UnwrapOrOptionNode<Prev, T> {
2080    pub(crate) prev: Prev,
2081    pub(crate) default: T,
2082}
2083
2084impl<In, T: Clone, Prev: ChainCall<In, Out = Option<T>>> ChainCall<In>
2085    for UnwrapOrOptionNode<Prev, T>
2086{
2087    type Out = T;
2088    #[inline(always)]
2089    fn call(&mut self, world: &mut World, input: In) -> T {
2090        self.prev
2091            .call(world, input)
2092            .unwrap_or_else(|| self.default.clone())
2093    }
2094}
2095
2096/// Chain node for `.unwrap_or_else()` on `Option<T>`.
2097#[doc(hidden)]
2098pub struct UnwrapOrElseOptionNode<Prev, P> {
2099    pub(crate) prev: Prev,
2100    pub(crate) producer: P,
2101}
2102
2103impl<In, T, Prev, P> ChainCall<In> for UnwrapOrElseOptionNode<Prev, P>
2104where
2105    Prev: ChainCall<In, Out = Option<T>>,
2106    P: ProducerCall<Out = T>,
2107{
2108    type Out = T;
2109    #[inline(always)]
2110    fn call(&mut self, world: &mut World, input: In) -> T {
2111        self.prev
2112            .call(world, input)
2113            .unwrap_or_else(|| self.producer.call(world))
2114    }
2115}
2116
2117// -- Result<T, E> nodes ------------------------------------------------------
2118
2119/// Chain node for `.map()` on `Result<T, E>`.
2120#[doc(hidden)]
2121pub struct MapResultNode<Prev, S> {
2122    pub(crate) prev: Prev,
2123    pub(crate) step: S,
2124}
2125
2126impl<In, T, E, Prev, S> ChainCall<In> for MapResultNode<Prev, S>
2127where
2128    Prev: ChainCall<In, Out = Result<T, E>>,
2129    S: StepCall<T>,
2130{
2131    type Out = Result<S::Out, E>;
2132    #[inline(always)]
2133    fn call(&mut self, world: &mut World, input: In) -> Result<S::Out, E> {
2134        self.prev
2135            .call(world, input)
2136            .map(|val| self.step.call(world, val))
2137    }
2138}
2139
2140/// Chain node for `.and_then()` on `Result<T, E>`.
2141#[doc(hidden)]
2142pub struct AndThenResultNode<Prev, S> {
2143    pub(crate) prev: Prev,
2144    pub(crate) step: S,
2145}
2146
2147impl<In, T, U, E, Prev, S> ChainCall<In> for AndThenResultNode<Prev, S>
2148where
2149    Prev: ChainCall<In, Out = Result<T, E>>,
2150    S: StepCall<T, Out = Result<U, E>>,
2151{
2152    type Out = Result<U, E>;
2153    #[inline(always)]
2154    fn call(&mut self, world: &mut World, input: In) -> Result<U, E> {
2155        self.prev
2156            .call(world, input)
2157            .and_then(|val| self.step.call(world, val))
2158    }
2159}
2160
2161/// Chain node for `.catch()` — `Result<T, E>` → `Option<T>`.
2162#[doc(hidden)]
2163pub struct CatchNode<Prev, S> {
2164    pub(crate) prev: Prev,
2165    pub(crate) step: S,
2166}
2167
2168impl<In, T, E, Prev, S> ChainCall<In> for CatchNode<Prev, S>
2169where
2170    Prev: ChainCall<In, Out = Result<T, E>>,
2171    S: StepCall<E, Out = ()>,
2172{
2173    type Out = Option<T>;
2174    #[inline(always)]
2175    fn call(&mut self, world: &mut World, input: In) -> Option<T> {
2176        match self.prev.call(world, input) {
2177            Ok(val) => Some(val),
2178            Err(err) => {
2179                self.step.call(world, err);
2180                None
2181            }
2182        }
2183    }
2184}
2185
2186/// Chain node for `.map_err()`.
2187#[doc(hidden)]
2188pub struct MapErrNode<Prev, S> {
2189    pub(crate) prev: Prev,
2190    pub(crate) step: S,
2191}
2192
2193impl<In, T, E, Prev, S> ChainCall<In> for MapErrNode<Prev, S>
2194where
2195    Prev: ChainCall<In, Out = Result<T, E>>,
2196    S: StepCall<E>,
2197{
2198    type Out = Result<T, S::Out>;
2199    #[inline(always)]
2200    fn call(&mut self, world: &mut World, input: In) -> Result<T, S::Out> {
2201        self.prev
2202            .call(world, input)
2203            .map_err(|err| self.step.call(world, err))
2204    }
2205}
2206
2207/// Chain node for `.or_else()`.
2208#[doc(hidden)]
2209pub struct OrElseNode<Prev, S> {
2210    pub(crate) prev: Prev,
2211    pub(crate) step: S,
2212}
2213
2214impl<In, T, E, E2, Prev, S> ChainCall<In> for OrElseNode<Prev, S>
2215where
2216    Prev: ChainCall<In, Out = Result<T, E>>,
2217    S: StepCall<E, Out = Result<T, E2>>,
2218{
2219    type Out = Result<T, E2>;
2220    #[inline(always)]
2221    fn call(&mut self, world: &mut World, input: In) -> Result<T, E2> {
2222        self.prev
2223            .call(world, input)
2224            .or_else(|err| self.step.call(world, err))
2225    }
2226}
2227
2228/// Chain node for `.inspect()` on `Result<T, E>`.
2229#[doc(hidden)]
2230pub struct InspectResultNode<Prev, S> {
2231    pub(crate) prev: Prev,
2232    pub(crate) step: S,
2233}
2234
2235impl<In, T, E, Prev, S> ChainCall<In> for InspectResultNode<Prev, S>
2236where
2237    Prev: ChainCall<In, Out = Result<T, E>>,
2238    S: RefStepCall<T, Out = ()>,
2239{
2240    type Out = Result<T, E>;
2241    #[inline(always)]
2242    fn call(&mut self, world: &mut World, input: In) -> Result<T, E> {
2243        self.prev
2244            .call(world, input)
2245            .inspect(|val| self.step.call(world, val))
2246    }
2247}
2248
2249/// Chain node for `.inspect_err()`.
2250#[doc(hidden)]
2251pub struct InspectErrNode<Prev, S> {
2252    pub(crate) prev: Prev,
2253    pub(crate) step: S,
2254}
2255
2256impl<In, T, E, Prev, S> ChainCall<In> for InspectErrNode<Prev, S>
2257where
2258    Prev: ChainCall<In, Out = Result<T, E>>,
2259    S: RefStepCall<E, Out = ()>,
2260{
2261    type Out = Result<T, E>;
2262    #[inline(always)]
2263    fn call(&mut self, world: &mut World, input: In) -> Result<T, E> {
2264        self.prev
2265            .call(world, input)
2266            .inspect_err(|err| self.step.call(world, err))
2267    }
2268}
2269
2270/// Chain node for `.ok()` — `Result<T, E>` → `Option<T>`.
2271#[doc(hidden)]
2272pub struct OkResultNode<Prev> {
2273    pub(crate) prev: Prev,
2274}
2275
2276impl<In, T, E, Prev: ChainCall<In, Out = Result<T, E>>> ChainCall<In> for OkResultNode<Prev> {
2277    type Out = Option<T>;
2278    #[inline(always)]
2279    fn call(&mut self, world: &mut World, input: In) -> Option<T> {
2280        self.prev.call(world, input).ok()
2281    }
2282}
2283
2284/// Chain node for `.unwrap_or()` on `Result<T, E>`.
2285#[doc(hidden)]
2286pub struct UnwrapOrResultNode<Prev, T> {
2287    pub(crate) prev: Prev,
2288    pub(crate) default: T,
2289}
2290
2291impl<In, T: Clone, E, Prev: ChainCall<In, Out = Result<T, E>>> ChainCall<In>
2292    for UnwrapOrResultNode<Prev, T>
2293{
2294    type Out = T;
2295    #[inline(always)]
2296    fn call(&mut self, world: &mut World, input: In) -> T {
2297        self.prev
2298            .call(world, input)
2299            .unwrap_or_else(|_| self.default.clone())
2300    }
2301}
2302
2303/// Chain node for `.unwrap_or_else()` on `Result<T, E>`.
2304#[doc(hidden)]
2305pub struct UnwrapOrElseResultNode<Prev, S> {
2306    pub(crate) prev: Prev,
2307    pub(crate) step: S,
2308}
2309
2310impl<In, T, E, Prev, S> ChainCall<In> for UnwrapOrElseResultNode<Prev, S>
2311where
2312    Prev: ChainCall<In, Out = Result<T, E>>,
2313    S: StepCall<E, Out = T>,
2314{
2315    type Out = T;
2316    #[inline(always)]
2317    fn call(&mut self, world: &mut World, input: In) -> T {
2318        match self.prev.call(world, input) {
2319            Ok(val) => val,
2320            Err(err) => self.step.call(world, err),
2321        }
2322    }
2323}
2324
2325// -- Bool nodes --------------------------------------------------------------
2326
2327/// Chain node for `.not()`.
2328#[doc(hidden)]
2329pub struct NotNode<Prev> {
2330    pub(crate) prev: Prev,
2331}
2332
2333impl<In, Prev: ChainCall<In, Out = bool>> ChainCall<In> for NotNode<Prev> {
2334    type Out = bool;
2335    #[inline(always)]
2336    fn call(&mut self, world: &mut World, input: In) -> bool {
2337        !self.prev.call(world, input)
2338    }
2339}
2340
2341/// Chain node for `.and()` on bool.
2342#[doc(hidden)]
2343pub struct AndBoolNode<Prev, P> {
2344    pub(crate) prev: Prev,
2345    pub(crate) producer: P,
2346}
2347
2348impl<In, Prev, P> ChainCall<In> for AndBoolNode<Prev, P>
2349where
2350    Prev: ChainCall<In, Out = bool>,
2351    P: ProducerCall<Out = bool>,
2352{
2353    type Out = bool;
2354    #[inline(always)]
2355    fn call(&mut self, world: &mut World, input: In) -> bool {
2356        self.prev.call(world, input) && self.producer.call(world)
2357    }
2358}
2359
2360/// Chain node for `.or()` on bool.
2361#[doc(hidden)]
2362pub struct OrBoolNode<Prev, P> {
2363    pub(crate) prev: Prev,
2364    pub(crate) producer: P,
2365}
2366
2367impl<In, Prev, P> ChainCall<In> for OrBoolNode<Prev, P>
2368where
2369    Prev: ChainCall<In, Out = bool>,
2370    P: ProducerCall<Out = bool>,
2371{
2372    type Out = bool;
2373    #[inline(always)]
2374    fn call(&mut self, world: &mut World, input: In) -> bool {
2375        self.prev.call(world, input) || self.producer.call(world)
2376    }
2377}
2378
2379/// Chain node for `.xor()` on bool.
2380#[doc(hidden)]
2381pub struct XorBoolNode<Prev, P> {
2382    pub(crate) prev: Prev,
2383    pub(crate) producer: P,
2384}
2385
2386impl<In, Prev, P> ChainCall<In> for XorBoolNode<Prev, P>
2387where
2388    Prev: ChainCall<In, Out = bool>,
2389    P: ProducerCall<Out = bool>,
2390{
2391    type Out = bool;
2392    #[inline(always)]
2393    fn call(&mut self, world: &mut World, input: In) -> bool {
2394        self.prev.call(world, input) ^ self.producer.call(world)
2395    }
2396}
2397
2398// -- Cloned nodes ------------------------------------------------------------
2399
2400/// Chain node for `.cloned()` on `&T`.
2401#[doc(hidden)]
2402pub struct ClonedNode<Prev> {
2403    pub(crate) prev: Prev,
2404}
2405
2406impl<'a, In, T: Clone + 'a, Prev: ChainCall<In, Out = &'a T>> ChainCall<In> for ClonedNode<Prev> {
2407    type Out = T;
2408    #[inline(always)]
2409    fn call(&mut self, world: &mut World, input: In) -> T {
2410        T::clone(self.prev.call(world, input))
2411    }
2412}
2413
2414/// Chain node for `.cloned()` on `Option<&T>`.
2415#[doc(hidden)]
2416pub struct ClonedOptionNode<Prev> {
2417    pub(crate) prev: Prev,
2418}
2419
2420impl<'a, In, T: Clone + 'a, Prev: ChainCall<In, Out = Option<&'a T>>> ChainCall<In>
2421    for ClonedOptionNode<Prev>
2422{
2423    type Out = Option<T>;
2424    #[inline(always)]
2425    fn call(&mut self, world: &mut World, input: In) -> Option<T> {
2426        self.prev.call(world, input).cloned()
2427    }
2428}
2429
2430/// Chain node for `.cloned()` on `Result<&T, E>`.
2431#[doc(hidden)]
2432pub struct ClonedResultNode<Prev> {
2433    pub(crate) prev: Prev,
2434}
2435
2436impl<'a, In, T: Clone + 'a, E, Prev: ChainCall<In, Out = Result<&'a T, E>>> ChainCall<In>
2437    for ClonedResultNode<Prev>
2438{
2439    type Out = Result<T, E>;
2440    #[inline(always)]
2441    fn call(&mut self, world: &mut World, input: In) -> Result<T, E> {
2442        self.prev.call(world, input).cloned()
2443    }
2444}
2445
2446// -- DAG-specific nodes (borrow intermediate) --------------------------------
2447//
2448// DAG steps borrow `&Prev::Out` instead of consuming by value. These nodes
2449// parallel the pipeline nodes above but use `for<'a> StepCall<&'a T>` bounds
2450// and HRTB arm chains (`for<'a> ChainCall<&'a T>`).
2451
2452/// Chain node for DAG `.then()` — borrows intermediate output for next step.
2453///
2454/// Unlike pipeline's [`ThenNode`] which passes by value, this borrows `&Prev::Out`
2455/// for the step. Used for DAG chains where intermediates are owned and borrowed.
2456#[doc(hidden)]
2457pub struct DagThenNode<Prev, S, NewOut> {
2458    pub(crate) prev: Prev,
2459    pub(crate) step: S,
2460    pub(crate) _out: PhantomData<fn() -> NewOut>,
2461}
2462
2463impl<In, Prev, S, NewOut: 'static> ChainCall<In> for DagThenNode<Prev, S, NewOut>
2464where
2465    Prev: ChainCall<In>,
2466    Prev::Out: 'static,
2467    S: for<'a> StepCall<&'a Prev::Out, Out = NewOut>,
2468{
2469    type Out = NewOut;
2470    #[inline(always)]
2471    fn call(&mut self, world: &mut World, input: In) -> NewOut {
2472        let out = self.prev.call(world, input);
2473        self.step.call(world, &out)
2474    }
2475}
2476
2477/// Chain node for DAG `.scan()` — scan with borrowed input.
2478///
2479/// Like [`ScanNode`] but the step receives `&Prev::Out` via [`RefScanStepCall`].
2480#[doc(hidden)]
2481pub struct RefScanNode<Prev, S, Acc> {
2482    pub(crate) prev: Prev,
2483    pub(crate) step: S,
2484    pub(crate) acc: Acc,
2485}
2486
2487impl<In, Prev, S, Acc> ChainCall<In> for RefScanNode<Prev, S, Acc>
2488where
2489    Prev: ChainCall<In>,
2490    S: RefScanStepCall<Acc, Prev::Out>,
2491{
2492    type Out = S::Out;
2493    #[inline(always)]
2494    fn call(&mut self, world: &mut World, input: In) -> S::Out {
2495        let val = self.prev.call(world, input);
2496        self.step.call(world, &mut self.acc, &val)
2497    }
2498}
2499
2500/// Chain node for DAG `.route()` — arms borrow `&Prev::Out` (HRTB).
2501///
2502/// Unlike pipeline's [`RouteNode`] which passes by value, this borrows
2503/// the value for the predicate and arms. Arms satisfy `for<'a> ChainCall<&'a Out>`.
2504#[doc(hidden)]
2505pub struct DagRouteNode<Prev, P, C0, C1, NewOut> {
2506    pub(crate) prev: Prev,
2507    pub(crate) pred: P,
2508    pub(crate) on_true: C0,
2509    pub(crate) on_false: C1,
2510    pub(crate) _out: PhantomData<fn() -> NewOut>,
2511}
2512
2513impl<In, Prev, P, C0, C1, NewOut> ChainCall<In> for DagRouteNode<Prev, P, C0, C1, NewOut>
2514where
2515    Prev: ChainCall<In>,
2516    Prev::Out: 'static,
2517    P: RefStepCall<Prev::Out, Out = bool>,
2518    C0: for<'a> ChainCall<&'a Prev::Out, Out = NewOut>,
2519    C1: for<'a> ChainCall<&'a Prev::Out, Out = NewOut>,
2520{
2521    type Out = NewOut;
2522    #[inline(always)]
2523    fn call(&mut self, world: &mut World, input: In) -> NewOut {
2524        let val = self.prev.call(world, input);
2525        if self.pred.call(world, &val) {
2526            self.on_true.call(world, &val)
2527        } else {
2528            self.on_false.call(world, &val)
2529        }
2530    }
2531}
2532
2533/// Chain node for DAG `.map()` on `Option<T>` — step borrows `&T`.
2534///
2535/// Like [`MapOptionNode`] but the step receives `&T` instead of `T` by value.
2536#[doc(hidden)]
2537pub struct DagMapOptionNode<Prev, S, U> {
2538    pub(crate) prev: Prev,
2539    pub(crate) step: S,
2540    pub(crate) _out: PhantomData<fn() -> U>,
2541}
2542
2543impl<In, T: 'static, U: 'static, Prev, S> ChainCall<In> for DagMapOptionNode<Prev, S, U>
2544where
2545    Prev: ChainCall<In, Out = Option<T>>,
2546    S: for<'a> StepCall<&'a T, Out = U>,
2547{
2548    type Out = Option<U>;
2549    #[inline(always)]
2550    fn call(&mut self, world: &mut World, input: In) -> Option<U> {
2551        self.prev
2552            .call(world, input)
2553            .map(|ref val| self.step.call(world, val))
2554    }
2555}
2556
2557/// Chain node for DAG `.and_then()` on `Option<T>` — step borrows `&T`.
2558#[doc(hidden)]
2559pub struct DagAndThenOptionNode<Prev, S, U> {
2560    pub(crate) prev: Prev,
2561    pub(crate) step: S,
2562    pub(crate) _out: PhantomData<fn() -> U>,
2563}
2564
2565impl<In, T: 'static, U: 'static, Prev, S> ChainCall<In> for DagAndThenOptionNode<Prev, S, U>
2566where
2567    Prev: ChainCall<In, Out = Option<T>>,
2568    S: for<'a> StepCall<&'a T, Out = Option<U>>,
2569{
2570    type Out = Option<U>;
2571    #[inline(always)]
2572    fn call(&mut self, world: &mut World, input: In) -> Option<U> {
2573        self.prev
2574            .call(world, input)
2575            .and_then(|ref val| self.step.call(world, val))
2576    }
2577}
2578
2579/// Chain node for DAG `.map()` on `Result<T, E>` — step borrows `&T`.
2580#[doc(hidden)]
2581pub struct DagMapResultNode<Prev, S, U> {
2582    pub(crate) prev: Prev,
2583    pub(crate) step: S,
2584    pub(crate) _out: PhantomData<fn() -> U>,
2585}
2586
2587impl<In, T: 'static, E, U: 'static, Prev, S> ChainCall<In> for DagMapResultNode<Prev, S, U>
2588where
2589    Prev: ChainCall<In, Out = Result<T, E>>,
2590    S: for<'a> StepCall<&'a T, Out = U>,
2591{
2592    type Out = Result<U, E>;
2593    #[inline(always)]
2594    fn call(&mut self, world: &mut World, input: In) -> Result<U, E> {
2595        self.prev
2596            .call(world, input)
2597            .map(|ref val| self.step.call(world, val))
2598    }
2599}
2600
2601/// Chain node for DAG `.and_then()` on `Result<T, E>` — step borrows `&T`.
2602#[doc(hidden)]
2603pub struct DagAndThenResultNode<Prev, S, U> {
2604    pub(crate) prev: Prev,
2605    pub(crate) step: S,
2606    pub(crate) _out: PhantomData<fn() -> U>,
2607}
2608
2609impl<In, T: 'static, E, U: 'static, Prev, S> ChainCall<In> for DagAndThenResultNode<Prev, S, U>
2610where
2611    Prev: ChainCall<In, Out = Result<T, E>>,
2612    S: for<'a> StepCall<&'a T, Out = Result<U, E>>,
2613{
2614    type Out = Result<U, E>;
2615    #[inline(always)]
2616    fn call(&mut self, world: &mut World, input: In) -> Result<U, E> {
2617        self.prev
2618            .call(world, input)
2619            .and_then(|ref val| self.step.call(world, val))
2620    }
2621}
2622
2623/// Chain node for DAG `.catch()` on `Result<T, E>` — error handler borrows `&E`.
2624///
2625/// Like [`CatchNode`] but the step receives `&E` instead of consuming `E`.
2626#[doc(hidden)]
2627pub struct DagCatchNode<Prev, S> {
2628    pub(crate) prev: Prev,
2629    pub(crate) step: S,
2630}
2631
2632impl<In, T, E: 'static, Prev, S> ChainCall<In> for DagCatchNode<Prev, S>
2633where
2634    Prev: ChainCall<In, Out = Result<T, E>>,
2635    S: for<'a> StepCall<&'a E, Out = ()>,
2636{
2637    type Out = Option<T>;
2638    #[inline(always)]
2639    fn call(&mut self, world: &mut World, input: In) -> Option<T> {
2640        match self.prev.call(world, input) {
2641            Ok(val) => Some(val),
2642            Err(ref err) => {
2643                self.step.call(world, err);
2644                None
2645            }
2646        }
2647    }
2648}
2649
2650// -- Terminal nodes ----------------------------------------------------------
2651
2652/// Chain node for `build()` on `Option<()>` — discards the option wrapper.
2653#[doc(hidden)]
2654pub struct DiscardOptionNode<Prev> {
2655    pub(crate) prev: Prev,
2656}
2657
2658impl<In, Prev: ChainCall<In, Out = Option<()>>> ChainCall<In> for DiscardOptionNode<Prev> {
2659    type Out = ();
2660    #[inline(always)]
2661    fn call(&mut self, world: &mut World, input: In) {
2662        let _ = self.prev.call(world, input);
2663    }
2664}
2665
2666// =============================================================================
2667// PipelineBuilder — entry point
2668// =============================================================================
2669
2670/// Entry point for building a pre-resolved step pipeline.
2671///
2672/// `In` is the pipeline input type. Call [`.then()`](Self::then) to add
2673/// the first step — a named function whose [`Param`] dependencies
2674/// are resolved from the registry at build time.
2675///
2676/// # Examples
2677///
2678/// ```
2679/// use nexus_rt::{WorldBuilder, Res, ResMut, PipelineBuilder, Handler, Resource};
2680///
2681/// #[derive(Resource)]
2682/// struct Factor(u64);
2683/// #[derive(Resource)]
2684/// struct Output(String);
2685///
2686/// let mut wb = WorldBuilder::new();
2687/// wb.register(Factor(10));
2688/// wb.register(Output(String::new()));
2689/// let mut world = wb.build();
2690///
2691/// fn double(factor: Res<Factor>, x: u32) -> u64 {
2692///     factor.0 * x as u64
2693/// }
2694/// fn store(mut out: ResMut<Output>, val: u64) {
2695///     out.0 = val.to_string();
2696/// }
2697///
2698/// let r = world.registry();
2699/// let mut pipeline = PipelineBuilder::<u32>::new()
2700///     .then(double, r)
2701///     .then(store, r)
2702///     .build();
2703///
2704/// pipeline.run(&mut world, 5);
2705/// assert_eq!(world.resource::<Output>().0.as_str(), "50");
2706/// ```
2707#[must_use = "a pipeline builder does nothing unless you chain steps and call .build()"]
2708pub struct PipelineBuilder<In>(PhantomData<fn(In)>);
2709
2710impl<In> PipelineBuilder<In> {
2711    /// Create a new step pipeline entry point.
2712    pub fn new() -> Self {
2713        Self(PhantomData)
2714    }
2715
2716    /// Add the first step. Params resolved from the registry.
2717    pub fn then<Out, Params, S: IntoStep<In, Out, Params>>(
2718        self,
2719        f: S,
2720        registry: &Registry,
2721    ) -> PipelineChain<In, Out, ThenNode<IdentityNode, S::Step>> {
2722        PipelineChain {
2723            chain: ThenNode {
2724                prev: IdentityNode,
2725                step: f.into_step(registry),
2726            },
2727            _marker: PhantomData,
2728        }
2729    }
2730
2731    /// Add the first step as a scan with persistent accumulator.
2732    /// The step receives `&mut Acc` and the input, returning the output.
2733    /// State persists across invocations.
2734    pub fn scan<Acc, Out, Params, S>(
2735        self,
2736        initial: Acc,
2737        f: S,
2738        registry: &Registry,
2739    ) -> PipelineChain<In, Out, ScanNode<IdentityNode, S::Step, Acc>>
2740    where
2741        Acc: 'static,
2742        S: IntoScanStep<Acc, In, Out, Params>,
2743    {
2744        PipelineChain {
2745            chain: ScanNode {
2746                prev: IdentityNode,
2747                step: f.into_scan_step(registry),
2748                acc: initial,
2749            },
2750            _marker: PhantomData,
2751        }
2752    }
2753}
2754
2755impl<In> Default for PipelineBuilder<In> {
2756    fn default() -> Self {
2757        Self::new()
2758    }
2759}
2760
2761// =============================================================================
2762// PipelineChain — typestate builder
2763// =============================================================================
2764
/// Typestate builder that stitches pre-resolved steps together out of
/// named chain nodes.
///
/// Type parameters:
/// - `In` — the pipeline's input type; fixed for the whole chain.
/// - `Out` — the output type produced by the chain built so far.
/// - `Chain` — the concrete chain type: nested named nodes, in the same
///   spirit as iterator adapters.
///
/// Every combinator takes `self` by value, wraps the existing chain in a
/// new named node, and hands back a fresh `PipelineChain`. Because the
/// whole chain is monomorphized there is zero virtual dispatch through
/// steps. Using named types (rather than closures) preserves HRTB:
/// `Pipeline<Chain>` can satisfy `for<'a> Handler<&'a T>`.
///
/// The IntoStep-based methods (`.then()`, `.map()`, `.and_then()`,
/// `.catch()`) accept a `&Registry` so Param state is resolved at build
/// time. Closure-based methods don't need the registry.
#[must_use = "pipeline chain does nothing until .build() is called"]
pub struct PipelineChain<In, Out, Chain> {
    pub(crate) chain: Chain,
    pub(crate) _marker: PhantomData<fn(In) -> Out>,
}
2783
2784// =============================================================================
2785// Core — any Out
2786// =============================================================================
2787
2788impl<In, Out, Chain: ChainCall<In, Out = Out>> PipelineChain<In, Out, Chain> {
2789    /// Add a step. Params resolved from the registry.
2790    pub fn then<NewOut, Params, S: IntoStep<Out, NewOut, Params>>(
2791        self,
2792        f: S,
2793        registry: &Registry,
2794    ) -> PipelineChain<In, NewOut, ThenNode<Chain, S::Step>> {
2795        PipelineChain {
2796            chain: ThenNode {
2797                prev: self.chain,
2798                step: f.into_step(registry),
2799            },
2800            _marker: PhantomData,
2801        }
2802    }
2803
2804    /// Run the pipeline directly. No boxing, no `'static` on `In`.
2805    pub fn run(&mut self, world: &mut World, input: In) -> Out {
2806        self.chain.call(world, input)
2807    }
2808
2809    /// Dispatch pipeline output to a [`Handler<Out>`].
2810    ///
2811    /// Connects a pipeline's output to any handler — [`HandlerFn`](crate::HandlerFn),
2812    /// [`Callback`](crate::Callback), [`Pipeline`], or a combinator like
2813    /// [`fan_out!`](crate::fan_out).
2814    pub fn dispatch<H: Handler<Out>>(
2815        self,
2816        handler: H,
2817    ) -> PipelineChain<In, (), DispatchNode<Chain, H>> {
2818        PipelineChain {
2819            chain: DispatchNode {
2820                prev: self.chain,
2821                handler,
2822            },
2823            _marker: PhantomData,
2824        }
2825    }
2826
2827    /// Conditionally wrap the output in `Option`. `Some(val)` if
2828    /// the predicate returns true, `None` otherwise.
2829    ///
2830    /// Enters Option-combinator land — follow with `.map()`,
2831    /// `.and_then()`, `.filter()`, `.unwrap_or()`, etc.
2832    ///
2833    /// # Common Mistakes
2834    ///
2835    /// Guard takes `&In`, not `In` — the value passes through unchanged.
2836    ///
2837    /// ```compile_fail
2838    /// # use nexus_rt::{PipelineBuilder, WorldBuilder};
2839    /// # let mut wb = WorldBuilder::new();
2840    /// # let world = wb.build();
2841    /// # let reg = world.registry();
2842    /// // ERROR: takes u32 by value, should be &u32
2843    /// PipelineBuilder::<u32>::new()
2844    ///     .then(|x: u32| x, &reg)
2845    ///     .guard(|x: u32| x > 10, &reg);
2846    /// ```
2847    ///
2848    /// Fix: take by reference:
2849    /// ```ignore
2850    /// PipelineBuilder::<u32>::new()
2851    ///     .then(|x: u32| x, &reg)
2852    ///     .guard(|x: &u32| *x > 10, &reg);
2853    /// ```
2854    pub fn guard<Params, S: IntoRefStep<Out, bool, Params>>(
2855        self,
2856        f: S,
2857        registry: &Registry,
2858    ) -> PipelineChain<In, Option<Out>, GuardNode<Chain, S::Step>> {
2859        PipelineChain {
2860            chain: GuardNode {
2861                prev: self.chain,
2862                step: f.into_ref_step(registry),
2863            },
2864            _marker: PhantomData,
2865        }
2866    }
2867
2868    /// Observe the current value without consuming or changing it.
2869    ///
2870    /// The step receives `&Out`. The value passes through unchanged.
2871    /// Useful for logging, metrics, or debugging mid-chain.
2872    ///
2873    /// # Common Mistakes
2874    ///
2875    /// Tap takes `&In`, not `In`:
2876    /// ```compile_fail
2877    /// # use nexus_rt::{PipelineBuilder, WorldBuilder};
2878    /// # let mut wb = WorldBuilder::new();
2879    /// # let world = wb.build();
2880    /// # let reg = world.registry();
2881    /// // ERROR: takes u32 by value
2882    /// PipelineBuilder::<u32>::new()
2883    ///     .then(|x: u32| x, &reg)
2884    ///     .tap(|x: u32| println!("{x}"), &reg);
2885    /// ```
2886    pub fn tap<Params, S: IntoRefStep<Out, (), Params>>(
2887        self,
2888        f: S,
2889        registry: &Registry,
2890    ) -> PipelineChain<In, Out, TapNode<Chain, S::Step>> {
2891        PipelineChain {
2892            chain: TapNode {
2893                prev: self.chain,
2894                step: f.into_ref_step(registry),
2895            },
2896            _marker: PhantomData,
2897        }
2898    }
2899
2900    /// Binary conditional routing. Evaluates the predicate on the
2901    /// current value, then moves it into exactly one of two arms.
2902    ///
2903    /// Both arms must produce the same output type. Build each arm as
2904    /// a sub-pipeline from [`PipelineBuilder`]. For N-ary routing, nest
2905    /// `route` calls in the false arm.
2906    ///
2907    /// ```ignore
2908    /// let large = PipelineBuilder::new().then(large_check, reg).then(submit, reg);
2909    /// let small = PipelineBuilder::new().then(submit, reg);
2910    ///
2911    /// PipelineBuilder::<Order>::new()
2912    ///     .then(validate, reg)
2913    ///     .route(|order: &Order| order.size > 1000, reg, large, small)
2914    ///     .build();
2915    /// ```
2916    pub fn route<NewOut, C0, C1, Params, Pred: IntoRefStep<Out, bool, Params>>(
2917        self,
2918        pred: Pred,
2919        registry: &Registry,
2920        on_true: PipelineChain<Out, NewOut, C0>,
2921        on_false: PipelineChain<Out, NewOut, C1>,
2922    ) -> PipelineChain<In, NewOut, RouteNode<Chain, Pred::Step, C0, C1>>
2923    where
2924        C0: ChainCall<Out, Out = NewOut>,
2925        C1: ChainCall<Out, Out = NewOut>,
2926    {
2927        PipelineChain {
2928            chain: RouteNode {
2929                prev: self.chain,
2930                pred: pred.into_ref_step(registry),
2931                on_true: on_true.chain,
2932                on_false: on_false.chain,
2933            },
2934            _marker: PhantomData,
2935        }
2936    }
2937
2938    /// Fork off a multi-step side-effect chain. The arm borrows
2939    /// `&Out`, runs to completion (producing `()`), and the
2940    /// original value passes through unchanged.
2941    ///
2942    /// Multi-step version of [`tap`](Self::tap) — the arm has the
2943    /// full DAG combinator API with Param resolution. Build with
2944    /// [`DagArmSeed::new()`](crate::dag::DagArmSeed::new).
2945    pub fn tee<C>(self, side: DagArm<Out, (), C>) -> PipelineChain<In, Out, TeeNode<Chain, C>>
2946    where
2947        C: for<'a> ChainCall<&'a Out, Out = ()>,
2948    {
2949        PipelineChain {
2950            chain: TeeNode {
2951                prev: self.chain,
2952                side: side.chain,
2953            },
2954            _marker: PhantomData,
2955        }
2956    }
2957
2958    /// Scan with persistent accumulator. The step receives `&mut Acc`
2959    /// and the current value, returning the new output. State persists
2960    /// across invocations.
2961    ///
2962    /// # Examples
2963    ///
2964    /// ```ignore
2965    /// // Running sum — suppress values below threshold
2966    /// PipelineBuilder::<u64>::new()
2967    ///     .then(identity, reg)
2968    ///     .scan(0u64, |acc: &mut u64, val: u64| {
2969    ///         *acc += val;
2970    ///         if *acc > 100 { Some(*acc) } else { None }
2971    ///     }, reg)
2972    ///     .build();
2973    /// ```
2974    pub fn scan<Acc, NewOut, Params, S>(
2975        self,
2976        initial: Acc,
2977        f: S,
2978        registry: &Registry,
2979    ) -> PipelineChain<In, NewOut, ScanNode<Chain, S::Step, Acc>>
2980    where
2981        Acc: 'static,
2982        S: IntoScanStep<Acc, Out, NewOut, Params>,
2983    {
2984        PipelineChain {
2985            chain: ScanNode {
2986                prev: self.chain,
2987                step: f.into_scan_step(registry),
2988                acc: initial,
2989            },
2990            _marker: PhantomData,
2991        }
2992    }
2993}
2994
2995// =============================================================================
2996// Splat — tuple destructuring into individual function arguments
2997// =============================================================================
2998//
2999// `.splat()` transitions from a tuple output to a builder whose `.then()`
3000// accepts a function taking the tuple elements as individual arguments.
3001// After `.splat().then(f, reg)`, the user is back on PipelineChain.
3002//
3003// Builder types are `#[doc(hidden)]` — users only see `.splat().then()`.
3004
3005// -- Splat builder types ------------------------------------------------------
3006
// Generates, for one tuple arity, the chain node plus the start-position and
// mid-chain splat builders, and wires `.splat()` onto `PipelineBuilder` and
// `PipelineChain` for tuples of that arity.
macro_rules! define_splat_builders {
    (
        $N:literal,
        start: $SplatStart:ident,
        mid: $SplatBuilder:ident,
        node: $SplatThenNode:ident,
        into_trait: $IntoSplatStep:ident,
        call_trait: $SplatCall:ident,
        ($($T:ident),+),
        ($($idx:tt),+)
    ) => {
        /// Chain node behind `.splat().then()` — unpacks the upstream tuple
        /// and passes its elements to the step as separate arguments.
        #[doc(hidden)]
        pub struct $SplatThenNode<Prev, S> {
            prev: Prev,
            step: S,
        }

        impl<In, $($T,)+ Prev, S> ChainCall<In> for $SplatThenNode<Prev, S>
        where
            Prev: ChainCall<In, Out = ($($T,)+)>,
            S: $SplatCall<$($T),+>,
        {
            type Out = S::Out;

            #[inline(always)]
            fn call(&mut self, world: &mut World, input: In) -> S::Out {
                let Self { prev, step } = self;
                let args = prev.call(world, input);
                step.call_splat(world, $(args.$idx),+)
            }
        }

        /// Splat builder for the start of a pipeline.
        #[doc(hidden)]
        pub struct $SplatStart<$($T),+>(PhantomData<fn(($($T,)+))>);

        impl<$($T),+> $SplatStart<$($T),+> {
            /// Attach a step that takes the tuple elements as individual arguments.
            pub fn then<Out, Params, S>(
                self,
                f: S,
                registry: &Registry,
            ) -> PipelineChain<($($T,)+), Out, $SplatThenNode<IdentityNode, S::Step>>
            where
                S: $IntoSplatStep<$($T,)+ Out, Params>,
            {
                let chain = $SplatThenNode {
                    prev: IdentityNode,
                    step: f.into_splat_step(registry),
                };
                PipelineChain {
                    chain,
                    _marker: PhantomData,
                }
            }
        }

        impl<$($T),+> PipelineBuilder<($($T,)+)> {
            /// Unpack the tuple input into individual function arguments.
            pub fn splat(self) -> $SplatStart<$($T),+> {
                $SplatStart(PhantomData)
            }
        }

        /// Splat builder for a mid-chain position.
        #[doc(hidden)]
        pub struct $SplatBuilder<In, $($T,)+ Chain> {
            chain: Chain,
            _marker: PhantomData<fn(In) -> ($($T,)+)>,
        }

        impl<In, $($T,)+ Chain> $SplatBuilder<In, $($T,)+ Chain>
        where
            Chain: ChainCall<In, Out = ($($T,)+)>,
        {
            /// Attach a step that takes the tuple elements as individual arguments.
            pub fn then<Out, Params, S>(
                self,
                f: S,
                registry: &Registry,
            ) -> PipelineChain<In, Out, $SplatThenNode<Chain, S::Step>>
            where
                S: $IntoSplatStep<$($T,)+ Out, Params>,
            {
                let chain = $SplatThenNode {
                    prev: self.chain,
                    step: f.into_splat_step(registry),
                };
                PipelineChain {
                    chain,
                    _marker: PhantomData,
                }
            }
        }

        impl<In, $($T,)+ Chain> PipelineChain<In, ($($T,)+), Chain>
        where
            Chain: ChainCall<In, Out = ($($T,)+)>,
        {
            /// Unpack the tuple output into individual function arguments.
            pub fn splat(self) -> $SplatBuilder<In, $($T,)+ Chain> {
                let Self { chain, .. } = self;
                $SplatBuilder {
                    chain,
                    _marker: PhantomData,
                }
            }
        }
    };
}
3107
3108define_splat_builders!(2,
3109    start: SplatStart2,
3110    mid: SplatBuilder2,
3111    node: SplatThenNode2,
3112    into_trait: IntoSplatStep2,
3113    call_trait: SplatCall2,
3114    (A, B),
3115    (0, 1)
3116);
3117
3118define_splat_builders!(3,
3119    start: SplatStart3,
3120    mid: SplatBuilder3,
3121    node: SplatThenNode3,
3122    into_trait: IntoSplatStep3,
3123    call_trait: SplatCall3,
3124    (A, B, C),
3125    (0, 1, 2)
3126);
3127
3128define_splat_builders!(4,
3129    start: SplatStart4,
3130    mid: SplatBuilder4,
3131    node: SplatThenNode4,
3132    into_trait: IntoSplatStep4,
3133    call_trait: SplatCall4,
3134    (A, B, C, D),
3135    (0, 1, 2, 3)
3136);
3137
3138define_splat_builders!(5,
3139    start: SplatStart5,
3140    mid: SplatBuilder5,
3141    node: SplatThenNode5,
3142    into_trait: IntoSplatStep5,
3143    call_trait: SplatCall5,
3144    (A, B, C, D, E),
3145    (0, 1, 2, 3, 4)
3146);
3147
3148// =============================================================================
3149// Dedup — suppress unchanged values
3150// =============================================================================
3151
3152impl<In, Out: PartialEq + Clone, Chain: ChainCall<In, Out = Out>> PipelineChain<In, Out, Chain> {
3153    /// Suppress consecutive unchanged values. Returns `Some(val)`
3154    /// when the value differs from the previous invocation, `None`
3155    /// when unchanged. First invocation always returns `Some`.
3156    ///
3157    /// Requires `PartialEq + Clone` — the previous value is stored
3158    /// internally for comparison.
3159    pub fn dedup(self) -> PipelineChain<In, Option<Out>, DedupNode<Chain, Out>> {
3160        PipelineChain {
3161            chain: DedupNode {
3162                prev: self.chain,
3163                last: None,
3164            },
3165            _marker: PhantomData,
3166        }
3167    }
3168}
3169
3170// =============================================================================
3171// Bool combinators
3172// =============================================================================
3173
3174impl<In, Chain: ChainCall<In, Out = bool>> PipelineChain<In, bool, Chain> {
3175    /// Invert a boolean value.
3176    #[allow(clippy::should_implement_trait)]
3177    pub fn not(self) -> PipelineChain<In, bool, NotNode<Chain>> {
3178        PipelineChain {
3179            chain: NotNode { prev: self.chain },
3180            _marker: PhantomData,
3181        }
3182    }
3183
3184    /// Short-circuit AND with a second boolean.
3185    ///
3186    /// If the chain produces `false`, the step is not called.
3187    pub fn and<Params, S: IntoProducer<bool, Params>>(
3188        self,
3189        f: S,
3190        registry: &Registry,
3191    ) -> PipelineChain<In, bool, AndBoolNode<Chain, S::Step>> {
3192        PipelineChain {
3193            chain: AndBoolNode {
3194                prev: self.chain,
3195                producer: f.into_producer(registry),
3196            },
3197            _marker: PhantomData,
3198        }
3199    }
3200
3201    /// Short-circuit OR with a second boolean.
3202    ///
3203    /// If the chain produces `true`, the step is not called.
3204    pub fn or<Params, S: IntoProducer<bool, Params>>(
3205        self,
3206        f: S,
3207        registry: &Registry,
3208    ) -> PipelineChain<In, bool, OrBoolNode<Chain, S::Step>> {
3209        PipelineChain {
3210            chain: OrBoolNode {
3211                prev: self.chain,
3212                producer: f.into_producer(registry),
3213            },
3214            _marker: PhantomData,
3215        }
3216    }
3217
3218    /// XOR with a second boolean.
3219    ///
3220    /// Both sides are always evaluated.
3221    pub fn xor<Params, S: IntoProducer<bool, Params>>(
3222        self,
3223        f: S,
3224        registry: &Registry,
3225    ) -> PipelineChain<In, bool, XorBoolNode<Chain, S::Step>> {
3226        PipelineChain {
3227            chain: XorBoolNode {
3228                prev: self.chain,
3229                producer: f.into_producer(registry),
3230            },
3231            _marker: PhantomData,
3232        }
3233    }
3234}
3235
3236// =============================================================================
3237// Clone helpers — &T → T transitions
3238// =============================================================================
3239
3240impl<'a, In, T: Clone, Chain: ChainCall<In, Out = &'a T>> PipelineChain<In, &'a T, Chain> {
3241    /// Clone a borrowed output to produce an owned value.
3242    ///
3243    /// Transitions the pipeline from `&T` to `T`. Uses UFCS
3244    /// (`T::clone(val)`) — `val.clone()` on a `&&T` resolves to
3245    /// `<&T as Clone>::clone` and returns `&T`, not `T`.
3246    pub fn cloned(self) -> PipelineChain<In, T, ClonedNode<Chain>> {
3247        PipelineChain {
3248            chain: ClonedNode { prev: self.chain },
3249            _marker: PhantomData,
3250        }
3251    }
3252}
3253
3254impl<'a, In, T: Clone, Chain: ChainCall<In, Out = Option<&'a T>>>
3255    PipelineChain<In, Option<&'a T>, Chain>
3256{
3257    /// Clone inner borrowed value. `Option<&T>` → `Option<T>`.
3258    pub fn cloned(self) -> PipelineChain<In, Option<T>, ClonedOptionNode<Chain>> {
3259        PipelineChain {
3260            chain: ClonedOptionNode { prev: self.chain },
3261            _marker: PhantomData,
3262        }
3263    }
3264}
3265
3266impl<'a, In, T: Clone, E, Chain: ChainCall<In, Out = Result<&'a T, E>>>
3267    PipelineChain<In, Result<&'a T, E>, Chain>
3268{
3269    /// Clone inner borrowed Ok value. `Result<&T, E>` → `Result<T, E>`.
3270    pub fn cloned(self) -> PipelineChain<In, Result<T, E>, ClonedResultNode<Chain>> {
3271        PipelineChain {
3272            chain: ClonedResultNode { prev: self.chain },
3273            _marker: PhantomData,
3274        }
3275    }
3276}
3277
3278// =============================================================================
3279// Option helpers — PipelineChain<In, Option<T>, Chain>
3280// =============================================================================
3281
3282impl<In, T, Chain: ChainCall<In, Out = Option<T>>> PipelineChain<In, Option<T>, Chain> {
3283    // -- IntoStep-based (hot path) -------------------------------------------
3284
3285    /// Transform the inner value. Step not called on None.
3286    pub fn map<U, Params, S: IntoStep<T, U, Params>>(
3287        self,
3288        f: S,
3289        registry: &Registry,
3290    ) -> PipelineChain<In, Option<U>, MapOptionNode<Chain, S::Step>> {
3291        PipelineChain {
3292            chain: MapOptionNode {
3293                prev: self.chain,
3294                step: f.into_step(registry),
3295            },
3296            _marker: PhantomData,
3297        }
3298    }
3299
3300    /// Short-circuits on None. std: `Option::and_then`
3301    pub fn and_then<U, Params, S: IntoStep<T, Option<U>, Params>>(
3302        self,
3303        f: S,
3304        registry: &Registry,
3305    ) -> PipelineChain<In, Option<U>, AndThenNode<Chain, S::Step>> {
3306        PipelineChain {
3307            chain: AndThenNode {
3308                prev: self.chain,
3309                step: f.into_step(registry),
3310            },
3311            _marker: PhantomData,
3312        }
3313    }
3314
3315    // -- Resolved (cold path, now with Param resolution) -----------------------
3316
3317    /// Side effect on None. Complement to [`inspect`](Self::inspect).
3318    pub fn on_none<Params, S: IntoProducer<(), Params>>(
3319        self,
3320        f: S,
3321        registry: &Registry,
3322    ) -> PipelineChain<In, Option<T>, OnNoneNode<Chain, S::Step>> {
3323        PipelineChain {
3324            chain: OnNoneNode {
3325                prev: self.chain,
3326                producer: f.into_producer(registry),
3327            },
3328            _marker: PhantomData,
3329        }
3330    }
3331
3332    /// Keep value if predicate holds. std: `Option::filter`
3333    ///
3334    /// # Common Mistakes
3335    ///
3336    /// Filter operates on `&T` inside the Option, not `T`:
3337    /// ```compile_fail
3338    /// # use nexus_rt::{PipelineBuilder, WorldBuilder};
3339    /// # let mut wb = WorldBuilder::new();
3340    /// # let world = wb.build();
3341    /// # let reg = world.registry();
3342    /// fn to_opt(x: u32) -> Option<u32> { Some(x) }
3343    /// // ERROR: takes u32, should be &u32
3344    /// PipelineBuilder::<u32>::new()
3345    ///     .then(to_opt, &reg)
3346    ///     .filter(|x: u32| x > 10, &reg);
3347    /// ```
3348    pub fn filter<Params, S: IntoRefStep<T, bool, Params>>(
3349        self,
3350        f: S,
3351        registry: &Registry,
3352    ) -> PipelineChain<In, Option<T>, FilterNode<Chain, S::Step>> {
3353        PipelineChain {
3354            chain: FilterNode {
3355                prev: self.chain,
3356                step: f.into_ref_step(registry),
3357            },
3358            _marker: PhantomData,
3359        }
3360    }
3361
3362    /// Side effect on Some value. std: `Option::inspect`
3363    ///
3364    /// Takes `&T`, not `T` — the value passes through.
3365    pub fn inspect<Params, S: IntoRefStep<T, (), Params>>(
3366        self,
3367        f: S,
3368        registry: &Registry,
3369    ) -> PipelineChain<In, Option<T>, InspectOptionNode<Chain, S::Step>> {
3370        PipelineChain {
3371            chain: InspectOptionNode {
3372                prev: self.chain,
3373                step: f.into_ref_step(registry),
3374            },
3375            _marker: PhantomData,
3376        }
3377    }
3378
3379    /// None becomes Err(err). std: `Option::ok_or`
3380    ///
3381    /// `Clone` required because the pipeline may run many times —
3382    /// the error value is cloned on each `None` invocation.
3383    pub fn ok_or<E: Clone>(self, err: E) -> PipelineChain<In, Result<T, E>, OkOrNode<Chain, E>> {
3384        PipelineChain {
3385            chain: OkOrNode {
3386                prev: self.chain,
3387                err,
3388            },
3389            _marker: PhantomData,
3390        }
3391    }
3392
3393    /// None becomes Err(f()). std: `Option::ok_or_else`
3394    pub fn ok_or_else<E, Params, S: IntoProducer<E, Params>>(
3395        self,
3396        f: S,
3397        registry: &Registry,
3398    ) -> PipelineChain<In, Result<T, E>, OkOrElseNode<Chain, S::Step>> {
3399        PipelineChain {
3400            chain: OkOrElseNode {
3401                prev: self.chain,
3402                producer: f.into_producer(registry),
3403            },
3404            _marker: PhantomData,
3405        }
3406    }
3407
3408    /// Exit Option — None becomes the default value.
3409    ///
3410    /// `Clone` required because the pipeline may run many times —
3411    /// the default is cloned on each `None` invocation (unlike
3412    /// std's `unwrap_or` which consumes the value once).
3413    pub fn unwrap_or(self, default: T) -> PipelineChain<In, T, UnwrapOrOptionNode<Chain, T>>
3414    where
3415        T: Clone,
3416    {
3417        PipelineChain {
3418            chain: UnwrapOrOptionNode {
3419                prev: self.chain,
3420                default,
3421            },
3422            _marker: PhantomData,
3423        }
3424    }
3425
3426    /// Exit Option — None becomes `f()`.
3427    pub fn unwrap_or_else<Params, S: IntoProducer<T, Params>>(
3428        self,
3429        f: S,
3430        registry: &Registry,
3431    ) -> PipelineChain<In, T, UnwrapOrElseOptionNode<Chain, S::Step>> {
3432        PipelineChain {
3433            chain: UnwrapOrElseOptionNode {
3434                prev: self.chain,
3435                producer: f.into_producer(registry),
3436            },
3437            _marker: PhantomData,
3438        }
3439    }
3440}
3441
3442// =============================================================================
3443// Result helpers — PipelineChain<In, Result<T, E>, Chain>
3444// =============================================================================
3445
3446impl<In, T, E, Chain: ChainCall<In, Out = Result<T, E>>> PipelineChain<In, Result<T, E>, Chain> {
3447    // -- IntoStep-based (hot path) -------------------------------------------
3448
3449    /// Transform the Ok value. Step not called on Err.
3450    pub fn map<U, Params, S: IntoStep<T, U, Params>>(
3451        self,
3452        f: S,
3453        registry: &Registry,
3454    ) -> PipelineChain<In, Result<U, E>, MapResultNode<Chain, S::Step>> {
3455        PipelineChain {
3456            chain: MapResultNode {
3457                prev: self.chain,
3458                step: f.into_step(registry),
3459            },
3460            _marker: PhantomData,
3461        }
3462    }
3463
3464    /// Short-circuits on Err. std: `Result::and_then`
3465    pub fn and_then<U, Params, S: IntoStep<T, Result<U, E>, Params>>(
3466        self,
3467        f: S,
3468        registry: &Registry,
3469    ) -> PipelineChain<In, Result<U, E>, AndThenResultNode<Chain, S::Step>> {
3470        PipelineChain {
3471            chain: AndThenResultNode {
3472                prev: self.chain,
3473                step: f.into_step(registry),
3474            },
3475            _marker: PhantomData,
3476        }
3477    }
3478
3479    /// Handle error and transition to Option.
3480    ///
3481    /// `Ok(val)` becomes `Some(val)` — handler not called.
3482    /// `Err(err)` calls the handler, then produces `None`.
3483    pub fn catch<Params, S: IntoStep<E, (), Params>>(
3484        self,
3485        f: S,
3486        registry: &Registry,
3487    ) -> PipelineChain<In, Option<T>, CatchNode<Chain, S::Step>> {
3488        PipelineChain {
3489            chain: CatchNode {
3490                prev: self.chain,
3491                step: f.into_step(registry),
3492            },
3493            _marker: PhantomData,
3494        }
3495    }
3496
3497    // -- Resolved (cold path, now with Param resolution) -----------------------
3498
3499    /// Transform the error. std: `Result::map_err`
3500    pub fn map_err<E2, Params, S: IntoStep<E, E2, Params>>(
3501        self,
3502        f: S,
3503        registry: &Registry,
3504    ) -> PipelineChain<In, Result<T, E2>, MapErrNode<Chain, S::Step>> {
3505        PipelineChain {
3506            chain: MapErrNode {
3507                prev: self.chain,
3508                step: f.into_step(registry),
3509            },
3510            _marker: PhantomData,
3511        }
3512    }
3513
3514    /// Recover from Err. std: `Result::or_else`
3515    pub fn or_else<E2, Params, S: IntoStep<E, Result<T, E2>, Params>>(
3516        self,
3517        f: S,
3518        registry: &Registry,
3519    ) -> PipelineChain<In, Result<T, E2>, OrElseNode<Chain, S::Step>> {
3520        PipelineChain {
3521            chain: OrElseNode {
3522                prev: self.chain,
3523                step: f.into_step(registry),
3524            },
3525            _marker: PhantomData,
3526        }
3527    }
3528
3529    /// Side effect on Ok. std: `Result::inspect`
3530    ///
3531    /// Takes `&T`, not `T` — the value passes through.
3532    pub fn inspect<Params, S: IntoRefStep<T, (), Params>>(
3533        self,
3534        f: S,
3535        registry: &Registry,
3536    ) -> PipelineChain<In, Result<T, E>, InspectResultNode<Chain, S::Step>> {
3537        PipelineChain {
3538            chain: InspectResultNode {
3539                prev: self.chain,
3540                step: f.into_ref_step(registry),
3541            },
3542            _marker: PhantomData,
3543        }
3544    }
3545
3546    /// Side effect on Err. std: `Result::inspect_err`
3547    ///
3548    /// Takes `&E`, not `E` — the error passes through.
3549    pub fn inspect_err<Params, S: IntoRefStep<E, (), Params>>(
3550        self,
3551        f: S,
3552        registry: &Registry,
3553    ) -> PipelineChain<In, Result<T, E>, InspectErrNode<Chain, S::Step>> {
3554        PipelineChain {
3555            chain: InspectErrNode {
3556                prev: self.chain,
3557                step: f.into_ref_step(registry),
3558            },
3559            _marker: PhantomData,
3560        }
3561    }
3562
3563    /// Discard error, enter Option land. std: `Result::ok`
3564    pub fn ok(self) -> PipelineChain<In, Option<T>, OkResultNode<Chain>> {
3565        PipelineChain {
3566            chain: OkResultNode { prev: self.chain },
3567            _marker: PhantomData,
3568        }
3569    }
3570
3571    /// Exit Result — Err becomes the default value.
3572    ///
3573    /// `Clone` required because the pipeline may run many times —
3574    /// the default is cloned on each `Err` invocation (unlike
3575    /// std's `unwrap_or` which consumes the value once).
3576    pub fn unwrap_or(self, default: T) -> PipelineChain<In, T, UnwrapOrResultNode<Chain, T>>
3577    where
3578        T: Clone,
3579    {
3580        PipelineChain {
3581            chain: UnwrapOrResultNode {
3582                prev: self.chain,
3583                default,
3584            },
3585            _marker: PhantomData,
3586        }
3587    }
3588
3589    /// Exit Result — Err becomes `f(err)`.
3590    pub fn unwrap_or_else<Params, S: IntoStep<E, T, Params>>(
3591        self,
3592        f: S,
3593        registry: &Registry,
3594    ) -> PipelineChain<In, T, UnwrapOrElseResultNode<Chain, S::Step>> {
3595        PipelineChain {
3596            chain: UnwrapOrElseResultNode {
3597                prev: self.chain,
3598                step: f.into_step(registry),
3599            },
3600            _marker: PhantomData,
3601        }
3602    }
3603}
3604
3605// =============================================================================
3606// PipelineOutput — marker trait for build()
3607// =============================================================================
3608
// Private supertrait holder: because `Sealed` cannot be named outside this
// file's module, no downstream code can implement `PipelineOutput`.
mod pipeline_output_seal {
    pub trait Sealed {}

    impl Sealed for Option<()> {}
    impl Sealed for () {}
}

/// Marker for the output types a pipeline is allowed to terminate with.
///
/// The trait is sealed, and the only implementors are `()` and
/// `Option<()>`. A chain whose output is anything else must first add a
/// final `.then()` or `.dispatch()` that consumes the value before it
/// can be built.
#[diagnostic::on_unimplemented(
    message = "`build()` requires the pipeline output to be `()` or `Option<()>`",
    label = "this pipeline produces `{Self}`, not `()` or `Option<()>`",
    note = "add a final `.then()` or `.dispatch()` that consumes the output"
)]
pub trait PipelineOutput: pipeline_output_seal::Sealed {}

impl PipelineOutput for Option<()> {}
impl PipelineOutput for () {}
3628
3629// =============================================================================
3630// build — when Out: PipelineOutput (() or Option<()>)
3631// =============================================================================
3632
3633impl<In, Chain: ChainCall<In, Out = ()>> PipelineChain<In, (), Chain> {
3634    /// Finalize the pipeline into a [`Pipeline`].
3635    ///
3636    /// The returned pipeline is a concrete, monomorphized type — no boxing,
3637    /// no virtual dispatch. Call `.run()` directly for zero-cost execution,
3638    /// or wrap in `Box<dyn Handler<In>>` when type erasure is needed.
3639    ///
3640    /// Only available when the pipeline ends with `()` or `Option<()>`.
3641    /// If your chain produces a value, add a final `.then()` that consumes
3642    /// the output.
3643    #[must_use = "building a pipeline without storing it does nothing"]
3644    pub fn build(self) -> Pipeline<Chain> {
3645        Pipeline { chain: self.chain }
3646    }
3647}
3648
3649impl<In, Chain: ChainCall<In, Out = Option<()>>> PipelineChain<In, Option<()>, Chain> {
3650    /// Finalize the pipeline into a [`Pipeline`], discarding the `Option<()>`.
3651    ///
3652    /// Pipelines ending with `Option<()>` (e.g. after `.map()` on an
3653    /// `Option<T>` with a step that returns `()`) produce the same
3654    /// [`Pipeline`] as those ending with `()`.
3655    #[must_use = "building a pipeline without storing it does nothing"]
3656    pub fn build(self) -> Pipeline<DiscardOptionNode<Chain>> {
3657        Pipeline {
3658            chain: DiscardOptionNode { prev: self.chain },
3659        }
3660    }
3661}
3662
3663// =============================================================================
3664// build_batch — when Out: PipelineOutput (() or Option<()>)
3665// =============================================================================
3666
3667impl<In, Out: PipelineOutput, Chain: ChainCall<In, Out = Out>> PipelineChain<In, Out, Chain> {
3668    /// Finalize into a [`BatchPipeline`] with a pre-allocated input buffer.
3669    ///
3670    /// Same pipeline chain as [`build`](PipelineChain::build), but the
3671    /// pipeline owns an input buffer that drivers fill between dispatch
3672    /// cycles. Each call to [`BatchPipeline::run`] drains the buffer,
3673    /// running every item through the chain independently.
3674    ///
3675    /// Available when the pipeline ends with `()` or `Option<()>` (e.g.
3676    /// after `.catch()` or `.filter()`). Pipelines producing values need
3677    /// a final `.then()` that consumes the output.
3678    ///
3679    /// `capacity` is the initial allocation — the buffer can grow if needed,
3680    /// but sizing it for the expected batch size avoids reallocation.
3681    #[must_use = "building a pipeline without storing it does nothing"]
3682    pub fn build_batch(self, capacity: usize) -> BatchPipeline<In, Chain> {
3683        BatchPipeline {
3684            input: Vec::with_capacity(capacity),
3685            chain: self.chain,
3686        }
3687    }
3688}
3689
3690// =============================================================================
3691// Pipeline<F> — built pipeline
3692// =============================================================================
3693
/// Built step pipeline implementing [`Handler<E>`](crate::Handler).
///
/// Created by [`PipelineChain::build`]. The chain is stored by value as
/// the generic parameter `F`, so the entire pipeline is monomorphized at
/// compile time — no boxing, no virtual dispatch.
/// Call `.run()` directly for zero-cost execution, or wrap in
/// `Box<dyn Handler<E>>` when you need type erasure (single box).
///
/// Implements [`Handler<E>`](crate::Handler) for any event type `E`
/// that the chain accepts — including borrowed types like `&'a [u8]`.
/// Supports `for<'a> Handler<&'a T>` for zero-copy event dispatch.
pub struct Pipeline<F> {
    /// The composed node chain; `Handler::run` forwards each event to it.
    chain: F,
}
3707
3708impl<E, F: ChainCall<E, Out = ()> + Send> crate::Handler<E> for Pipeline<F> {
3709    fn run(&mut self, world: &mut World, event: E) {
3710        self.chain.call(world, event);
3711    }
3712}
3713
3714// =============================================================================
3715// BatchPipeline<In, F> — pipeline with owned input buffer
3716// =============================================================================
3717
/// Batch pipeline that owns a pre-allocated input buffer.
///
/// Created by [`PipelineChain::build_batch`]. Each item flows through
/// the full pipeline chain independently — the same per-item `Option`
/// and `Result` flow control as [`Pipeline`]. Errors are handled inline
/// (via `.catch()`, `.unwrap_or()`, etc.) and the batch continues to
/// the next item. No intermediate buffers between steps.
///
/// Typical cycle: push items through [`input_mut`](BatchPipeline::input_mut),
/// then call [`run`](BatchPipeline::run), which drains the buffer.
///
/// # Examples
///
/// ```
/// use nexus_rt::{WorldBuilder, ResMut, PipelineBuilder, Resource};
///
/// #[derive(Resource)]
/// struct Accum(u64);
///
/// let mut wb = WorldBuilder::new();
/// wb.register(Accum(0));
/// let mut world = wb.build();
///
/// fn accumulate(mut sum: ResMut<Accum>, x: u32) {
///     sum.0 += x as u64;
/// }
///
/// let r = world.registry();
/// let mut batch = PipelineBuilder::<u32>::new()
///     .then(accumulate, r)
///     .build_batch(64);
///
/// batch.input_mut().extend_from_slice(&[1, 2, 3, 4, 5]);
/// batch.run(&mut world);
///
/// assert_eq!(world.resource::<Accum>().0, 15);
/// assert!(batch.input().is_empty());
/// ```
pub struct BatchPipeline<In, F> {
    /// Pending items; filled by drivers and drained by `run`.
    input: Vec<In>,
    /// The composed node chain each drained item is passed through.
    chain: F,
}
3757
3758impl<In, Out: PipelineOutput, F: ChainCall<In, Out = Out>> BatchPipeline<In, F> {
3759    /// Mutable access to the input buffer. Drivers fill this between
3760    /// dispatch cycles.
3761    pub fn input_mut(&mut self) -> &mut Vec<In> {
3762        &mut self.input
3763    }
3764
3765    /// Read-only access to the input buffer.
3766    pub fn input(&self) -> &[In] {
3767        &self.input
3768    }
3769
3770    /// Drain the input buffer, running each item through the pipeline.
3771    ///
3772    /// Each item gets independent `Option`/`Result` flow control — an
3773    /// error on one item does not affect subsequent items. After `run()`,
3774    /// the input buffer is empty but retains its allocation.
3775    pub fn run(&mut self, world: &mut World) {
3776        for item in self.input.drain(..) {
3777            let _ = self.chain.call(world, item);
3778        }
3779    }
3780}
3781
3782// =============================================================================
3783// resolve_step — pre-resolve a step for manual dispatch (owned input)
3784// =============================================================================
3785
3786/// Resolve a step for use in manual dispatch (e.g. inside an
3787/// [`Opaque`] closure passed to `.then()`).
3788///
3789/// Returns a closure with pre-resolved [`Param`] state — the same
3790/// build-time resolution that `.then()` performs, but as a standalone
3791/// value the caller can invoke from any context.
3792///
3793/// This is the pipeline (owned-input) counterpart of
3794/// [`dag::resolve_arm`](crate::dag::resolve_arm) (reference-input).
3795///
3796/// # Examples
3797///
3798/// ```ignore
3799/// let mut arm0 = resolve_step(handle_new, &reg);
3800/// let mut arm1 = resolve_step(handle_cancel, &reg);
3801///
3802/// pipeline.then(move |world: &mut World, order: Order| match order.kind {
3803///     OrderKind::New    => arm0(world, order),
3804///     OrderKind::Cancel => arm1(world, order),
3805/// }, &reg)
3806/// ```
3807pub fn resolve_step<In, Out, Params, S>(
3808    f: S,
3809    registry: &Registry,
3810) -> impl FnMut(&mut World, In) -> Out + use<In, Out, Params, S>
3811where
3812    In: 'static,
3813    Out: 'static,
3814    S: IntoStep<In, Out, Params>,
3815{
3816    let mut resolved = f.into_step(registry);
3817    move |world: &mut World, input: In| resolved.call(world, input)
3818}
3819
3820// =============================================================================
3821// Tests
3822// =============================================================================
3823
3824#[cfg(test)]
3825mod tests {
3826    use super::*;
3827    use crate::{Handler, IntoHandler, Local, Res, ResMut, WorldBuilder, fan_out};
3828
3829    // =========================================================================
3830    // Core dispatch
3831    // =========================================================================
3832
3833    #[test]
3834    fn step_pure_transform() {
3835        let mut world = WorldBuilder::new().build();
3836        let r = world.registry_mut();
3837        let mut p = PipelineBuilder::<u32>::new().then(|x: u32| x as u64 * 2, r);
3838        assert_eq!(p.run(&mut world, 5), 10u64);
3839    }
3840
3841    #[test]
3842    fn step_one_res() {
3843        let mut wb = WorldBuilder::new();
3844        wb.register::<u64>(10);
3845        let mut world = wb.build();
3846
3847        fn multiply(factor: Res<u64>, x: u32) -> u64 {
3848            *factor * x as u64
3849        }
3850
3851        let r = world.registry_mut();
3852        let mut p = PipelineBuilder::<u32>::new().then(multiply, r);
3853        assert_eq!(p.run(&mut world, 5), 50);
3854    }
3855
3856    #[test]
3857    fn step_one_res_mut() {
3858        let mut wb = WorldBuilder::new();
3859        wb.register::<u64>(0);
3860        let mut world = wb.build();
3861
3862        fn accumulate(mut total: ResMut<u64>, x: u32) {
3863            *total += x as u64;
3864        }
3865
3866        let r = world.registry_mut();
3867        let mut p = PipelineBuilder::<u32>::new().then(accumulate, r);
3868        p.run(&mut world, 10);
3869        p.run(&mut world, 5);
3870        assert_eq!(*world.resource::<u64>(), 15);
3871    }
3872
3873    #[test]
3874    fn step_two_params() {
3875        let mut wb = WorldBuilder::new();
3876        wb.register::<u64>(10);
3877        wb.register::<bool>(true);
3878        let mut world = wb.build();
3879
3880        fn conditional(factor: Res<u64>, flag: Res<bool>, x: u32) -> u64 {
3881            if *flag { *factor * x as u64 } else { 0 }
3882        }
3883
3884        let r = world.registry_mut();
3885        let mut p = PipelineBuilder::<u32>::new().then(conditional, r);
3886        assert_eq!(p.run(&mut world, 5), 50);
3887    }
3888
3889    #[test]
3890    fn step_chain_two() {
3891        let mut wb = WorldBuilder::new();
3892        wb.register::<u64>(2);
3893        let mut world = wb.build();
3894
3895        fn double(factor: Res<u64>, x: u32) -> u64 {
3896            *factor * x as u64
3897        }
3898
3899        let r = world.registry_mut();
3900        let mut p = PipelineBuilder::<u32>::new()
3901            .then(double, r)
3902            .then(|val: u64| val + 1, r);
3903        assert_eq!(p.run(&mut world, 5), 11); // 2*5 + 1
3904    }
3905
3906    // =========================================================================
3907    // Option combinators
3908    // =========================================================================
3909
3910    #[test]
3911    fn option_map_on_some() {
3912        let mut wb = WorldBuilder::new();
3913        wb.register::<u64>(10);
3914        let mut world = wb.build();
3915
3916        fn add_factor(factor: Res<u64>, x: u32) -> u64 {
3917            *factor + x as u64
3918        }
3919
3920        let r = world.registry_mut();
3921        let mut p = PipelineBuilder::<u32>::new()
3922            .then(|x: u32| -> Option<u32> { Some(x) }, r)
3923            .map(add_factor, r);
3924        assert_eq!(p.run(&mut world, 5), Some(15));
3925    }
3926
3927    #[test]
3928    fn option_map_skips_none() {
3929        let mut wb = WorldBuilder::new();
3930        wb.register::<bool>(false);
3931        let mut world = wb.build();
3932
3933        fn mark(mut flag: ResMut<bool>, _x: u32) -> u32 {
3934            *flag = true;
3935            0
3936        }
3937
3938        let r = world.registry_mut();
3939        let mut p = PipelineBuilder::<u32>::new()
3940            .then(|_x: u32| -> Option<u32> { None }, r)
3941            .map(mark, r);
3942        assert_eq!(p.run(&mut world, 5), None);
3943        assert!(!*world.resource::<bool>());
3944    }
3945
3946    #[test]
3947    fn option_and_then_chains() {
3948        let mut wb = WorldBuilder::new();
3949        wb.register::<u64>(10);
3950        let mut world = wb.build();
3951
3952        fn check(min: Res<u64>, x: u32) -> Option<u64> {
3953            let val = x as u64;
3954            if val > *min { Some(val) } else { None }
3955        }
3956
3957        let r = world.registry_mut();
3958        let mut p = PipelineBuilder::<u32>::new()
3959            .then(|x: u32| Some(x), r)
3960            .and_then(check, r);
3961        assert_eq!(p.run(&mut world, 20), Some(20));
3962    }
3963
3964    #[test]
3965    fn option_and_then_short_circuits() {
3966        let mut wb = WorldBuilder::new();
3967        wb.register::<u64>(10);
3968        let mut world = wb.build();
3969
3970        fn check(min: Res<u64>, x: u32) -> Option<u64> {
3971            let val = x as u64;
3972            if val > *min { Some(val) } else { None }
3973        }
3974
3975        let r = world.registry_mut();
3976        let mut p = PipelineBuilder::<u32>::new()
3977            .then(|x: u32| Some(x), r)
3978            .and_then(check, r);
3979        assert_eq!(p.run(&mut world, 5), None);
3980    }
3981
3982    #[test]
3983    fn option_on_none_fires() {
3984        let mut wb = WorldBuilder::new();
3985        wb.register::<bool>(false);
3986        let mut world = wb.build();
3987
3988        let r = world.registry();
3989        let mut p = PipelineBuilder::<u32>::new()
3990            .then(|_x: u32| -> Option<u32> { None }, r)
3991            .on_none(
3992                |w: &mut World| {
3993                    *w.resource_mut::<bool>() = true;
3994                },
3995                r,
3996            );
3997        p.run(&mut world, 0);
3998        assert!(*world.resource::<bool>());
3999    }
4000
4001    #[test]
4002    fn option_filter_keeps() {
4003        let mut world = WorldBuilder::new().build();
4004        let r = world.registry_mut();
4005        let mut p = PipelineBuilder::<u32>::new()
4006            .then(|x: u32| Some(x), r)
4007            .filter(|x: &u32| *x > 3, r);
4008        assert_eq!(p.run(&mut world, 5), Some(5));
4009    }
4010
4011    #[test]
4012    fn option_filter_drops() {
4013        let mut world = WorldBuilder::new().build();
4014        let r = world.registry_mut();
4015        let mut p = PipelineBuilder::<u32>::new()
4016            .then(|x: u32| Some(x), r)
4017            .filter(|x: &u32| *x > 10, r);
4018        assert_eq!(p.run(&mut world, 5), None);
4019    }
4020
4021    // =========================================================================
4022    // Result combinators
4023    // =========================================================================
4024
4025    #[test]
4026    fn result_map_on_ok() {
4027        let mut wb = WorldBuilder::new();
4028        wb.register::<u64>(10);
4029        let mut world = wb.build();
4030
4031        fn add_factor(factor: Res<u64>, x: u32) -> u64 {
4032            *factor + x as u64
4033        }
4034
4035        let r = world.registry_mut();
4036        let mut p = PipelineBuilder::<u32>::new()
4037            .then(|x: u32| -> Result<u32, String> { Ok(x) }, r)
4038            .map(add_factor, r);
4039        assert_eq!(p.run(&mut world, 5), Ok(15));
4040    }
4041
4042    #[test]
4043    fn result_map_skips_err() {
4044        let mut wb = WorldBuilder::new();
4045        wb.register::<bool>(false);
4046        let mut world = wb.build();
4047
4048        fn mark(mut flag: ResMut<bool>, _x: u32) -> u32 {
4049            *flag = true;
4050            0
4051        }
4052
4053        let r = world.registry_mut();
4054        let mut p = PipelineBuilder::<u32>::new()
4055            .then(|_x: u32| -> Result<u32, String> { Err("fail".into()) }, r)
4056            .map(mark, r);
4057        assert!(p.run(&mut world, 5).is_err());
4058        assert!(!*world.resource::<bool>());
4059    }
4060
4061    #[test]
4062    fn result_catch_handles_error() {
4063        let mut wb = WorldBuilder::new();
4064        wb.register::<String>(String::new());
4065        let mut world = wb.build();
4066
4067        fn log_error(mut log: ResMut<String>, err: String) {
4068            *log = err;
4069        }
4070
4071        let r = world.registry_mut();
4072        let mut p = PipelineBuilder::<u32>::new()
4073            .then(|_x: u32| -> Result<u32, String> { Err("caught".into()) }, r)
4074            .catch(log_error, r);
4075        assert_eq!(p.run(&mut world, 0), None);
4076        assert_eq!(world.resource::<String>().as_str(), "caught");
4077    }
4078
4079    #[test]
4080    fn result_catch_passes_ok() {
4081        let mut wb = WorldBuilder::new();
4082        wb.register::<String>(String::new());
4083        let mut world = wb.build();
4084
4085        fn log_error(mut log: ResMut<String>, err: String) {
4086            *log = err;
4087        }
4088
4089        let r = world.registry_mut();
4090        let mut p = PipelineBuilder::<u32>::new()
4091            .then(|x: u32| -> Result<u32, String> { Ok(x) }, r)
4092            .catch(log_error, r);
4093        assert_eq!(p.run(&mut world, 5), Some(5));
4094        assert!(world.resource::<String>().is_empty());
4095    }
4096
4097    // =========================================================================
4098    // Build + Handler
4099    // =========================================================================
4100
4101    #[test]
4102    fn build_produces_handler() {
4103        let mut wb = WorldBuilder::new();
4104        wb.register::<u64>(0);
4105        let mut world = wb.build();
4106
4107        fn accumulate(mut total: ResMut<u64>, x: u32) {
4108            *total += x as u64;
4109        }
4110
4111        let r = world.registry_mut();
4112        let mut pipeline = PipelineBuilder::<u32>::new().then(accumulate, r).build();
4113
4114        pipeline.run(&mut world, 10);
4115        pipeline.run(&mut world, 5);
4116        assert_eq!(*world.resource::<u64>(), 15);
4117    }
4118
4119    #[test]
4120    fn run_returns_output() {
4121        let mut wb = WorldBuilder::new();
4122        wb.register::<u64>(3);
4123        let mut world = wb.build();
4124
4125        fn multiply(factor: Res<u64>, x: u32) -> u64 {
4126            *factor * x as u64
4127        }
4128
4129        let r = world.registry_mut();
4130        let mut p = PipelineBuilder::<u32>::new().then(multiply, r);
4131        let result: u64 = p.run(&mut world, 7);
4132        assert_eq!(result, 21);
4133    }
4134
4135    // =========================================================================
4136    // Safety
4137    // =========================================================================
4138
4139    #[test]
4140    #[should_panic(expected = "not registered")]
4141    fn panics_on_missing_resource() {
4142        let mut world = WorldBuilder::new().build();
4143
4144        fn needs_u64(_val: Res<u64>, _x: u32) -> u32 {
4145            0
4146        }
4147
4148        let r = world.registry_mut();
4149        let _p = PipelineBuilder::<u32>::new().then(needs_u64, r);
4150    }
4151
4152    // =========================================================================
4153    // Access conflict detection
4154    // =========================================================================
4155
4156    #[test]
4157    #[should_panic(expected = "conflicting access")]
4158    fn step_duplicate_access_panics() {
4159        let mut wb = WorldBuilder::new();
4160        wb.register::<u64>(0);
4161        let mut world = wb.build();
4162
4163        fn bad(a: Res<u64>, b: ResMut<u64>, _x: u32) -> u32 {
4164            let _ = (*a, &*b);
4165            0
4166        }
4167
4168        let r = world.registry_mut();
4169        let _p = PipelineBuilder::<u32>::new().then(bad, r);
4170    }
4171
4172    // =========================================================================
4173    // Integration
4174    // =========================================================================
4175
4176    #[test]
4177    fn local_in_step() {
4178        let mut wb = WorldBuilder::new();
4179        wb.register::<u64>(0);
4180        let mut world = wb.build();
4181
4182        fn count(mut count: Local<u64>, mut total: ResMut<u64>, _x: u32) {
4183            *count += 1;
4184            *total = *count;
4185        }
4186
4187        let r = world.registry_mut();
4188        let mut p = PipelineBuilder::<u32>::new().then(count, r);
4189        p.run(&mut world, 0);
4190        p.run(&mut world, 0);
4191        p.run(&mut world, 0);
4192        assert_eq!(*world.resource::<u64>(), 3);
4193    }
4194
4195    // =========================================================================
4196    // Option combinators (extended)
4197    // =========================================================================
4198
4199    #[test]
4200    fn option_unwrap_or_some() {
4201        let mut world = WorldBuilder::new().build();
4202        let r = world.registry_mut();
4203        let mut p = PipelineBuilder::<u32>::new()
4204            .then(|x: u32| -> Option<u32> { Some(x) }, r)
4205            .unwrap_or(99);
4206        assert_eq!(p.run(&mut world, 5), 5);
4207    }
4208
4209    #[test]
4210    fn option_unwrap_or_none() {
4211        let mut world = WorldBuilder::new().build();
4212        let r = world.registry_mut();
4213        let mut p = PipelineBuilder::<u32>::new()
4214            .then(|_x: u32| -> Option<u32> { None }, r)
4215            .unwrap_or(99);
4216        assert_eq!(p.run(&mut world, 5), 99);
4217    }
4218
4219    #[test]
4220    fn option_unwrap_or_else() {
4221        let mut world = WorldBuilder::new().build();
4222        let r = world.registry_mut();
4223        let mut p = PipelineBuilder::<u32>::new()
4224            .then(|_x: u32| -> Option<u32> { None }, r)
4225            .unwrap_or_else(|| 42, r);
4226        assert_eq!(p.run(&mut world, 0), 42);
4227    }
4228
4229    #[test]
4230    fn option_ok_or() {
4231        let mut world = WorldBuilder::new().build();
4232        let r = world.registry_mut();
4233        let mut p = PipelineBuilder::<u32>::new()
4234            .then(|_x: u32| -> Option<u32> { None }, r)
4235            .ok_or("missing");
4236        assert_eq!(p.run(&mut world, 0), Err("missing"));
4237    }
4238
4239    #[test]
4240    fn option_ok_or_some() {
4241        let mut world = WorldBuilder::new().build();
4242        let r = world.registry_mut();
4243        let mut p = PipelineBuilder::<u32>::new()
4244            .then(|x: u32| -> Option<u32> { Some(x) }, r)
4245            .ok_or("missing");
4246        assert_eq!(p.run(&mut world, 7), Ok(7));
4247    }
4248
4249    #[test]
4250    fn option_ok_or_else() {
4251        let mut world = WorldBuilder::new().build();
4252        let r = world.registry_mut();
4253        let mut p = PipelineBuilder::<u32>::new()
4254            .then(|_x: u32| -> Option<u32> { None }, r)
4255            .ok_or_else(|| "computed", r);
4256        assert_eq!(p.run(&mut world, 0), Err("computed"));
4257    }
4258
4259    #[test]
4260    fn option_inspect_passes_through() {
4261        let mut wb = WorldBuilder::new();
4262        wb.register::<u64>(0);
4263        let mut world = wb.build();
4264        let r = world.registry_mut();
4265        let mut p = PipelineBuilder::<u32>::new()
4266            .then(|x: u32| -> Option<u32> { Some(x) }, r)
4267            .inspect(|_val: &u32| {}, r);
4268        // inspect should pass through the value unchanged.
4269        assert_eq!(p.run(&mut world, 10), Some(10));
4270    }
4271
4272    // =========================================================================
4273    // Result combinators (extended)
4274    // =========================================================================
4275
4276    #[test]
4277    fn result_map_err() {
4278        let mut world = WorldBuilder::new().build();
4279        let r = world.registry_mut();
4280        let mut p = PipelineBuilder::<u32>::new()
4281            .then(|_x: u32| -> Result<u32, i32> { Err(-1) }, r)
4282            .map_err(|e: i32| e.to_string(), r);
4283        assert_eq!(p.run(&mut world, 0), Err("-1".to_string()));
4284    }
4285
4286    #[test]
4287    fn result_map_err_ok_passthrough() {
4288        let mut world = WorldBuilder::new().build();
4289        let r = world.registry_mut();
4290        let mut p = PipelineBuilder::<u32>::new()
4291            .then(|x: u32| -> Result<u32, i32> { Ok(x) }, r)
4292            .map_err(|e: i32| e.to_string(), r);
4293        assert_eq!(p.run(&mut world, 5), Ok(5));
4294    }
4295
4296    #[test]
4297    fn result_or_else() {
4298        let mut world = WorldBuilder::new().build();
4299        let r = world.registry_mut();
4300        let mut p = PipelineBuilder::<u32>::new()
4301            .then(|_x: u32| -> Result<u32, &str> { Err("fail") }, r)
4302            .or_else(|_e: &str| Ok::<u32, &str>(42), r);
4303        assert_eq!(p.run(&mut world, 0), Ok(42));
4304    }
4305
4306    #[test]
4307    fn result_inspect_passes_through() {
4308        let mut world = WorldBuilder::new().build();
4309        let r = world.registry_mut();
4310        let mut p = PipelineBuilder::<u32>::new()
4311            .then(|x: u32| -> Result<u32, &str> { Ok(x) }, r)
4312            .inspect(|_val: &u32| {}, r);
4313        // inspect should pass through Ok unchanged.
4314        assert_eq!(p.run(&mut world, 7), Ok(7));
4315    }
4316
4317    #[test]
4318    fn result_inspect_err_passes_through() {
4319        let mut world = WorldBuilder::new().build();
4320        let r = world.registry_mut();
4321        let mut p = PipelineBuilder::<u32>::new()
4322            .then(|_x: u32| -> Result<u32, &str> { Err("bad") }, r)
4323            .inspect_err(|_e: &&str| {}, r);
4324        // inspect_err should pass through Err unchanged.
4325        assert_eq!(p.run(&mut world, 0), Err("bad"));
4326    }
4327
4328    #[test]
4329    fn result_ok_converts() {
4330        let mut world = WorldBuilder::new().build();
4331        let r = world.registry_mut();
4332        let mut p = PipelineBuilder::<u32>::new()
4333            .then(|x: u32| -> Result<u32, &str> { Ok(x) }, r)
4334            .ok();
4335        assert_eq!(p.run(&mut world, 5), Some(5));
4336    }
4337
4338    #[test]
4339    fn result_ok_drops_err() {
4340        let mut world = WorldBuilder::new().build();
4341        let r = world.registry_mut();
4342        let mut p = PipelineBuilder::<u32>::new()
4343            .then(|_x: u32| -> Result<u32, &str> { Err("gone") }, r)
4344            .ok();
4345        assert_eq!(p.run(&mut world, 0), None);
4346    }
4347
4348    #[test]
4349    fn result_unwrap_or() {
4350        let mut world = WorldBuilder::new().build();
4351        let r = world.registry_mut();
4352        let mut p = PipelineBuilder::<u32>::new()
4353            .then(|_x: u32| -> Result<u32, &str> { Err("x") }, r)
4354            .unwrap_or(99);
4355        assert_eq!(p.run(&mut world, 0), 99);
4356    }
4357
4358    #[test]
4359    fn result_unwrap_or_else() {
4360        let mut world = WorldBuilder::new().build();
4361        let r = world.registry_mut();
4362        let mut p = PipelineBuilder::<u32>::new()
4363            .then(|_x: u32| -> Result<u32, i32> { Err(-5) }, r)
4364            .unwrap_or_else(|e: i32| e.unsigned_abs(), r);
4365        assert_eq!(p.run(&mut world, 0), 5);
4366    }
4367
4368    // =========================================================================
4369    // Batch pipeline
4370    // =========================================================================
4371
4372    #[test]
4373    fn batch_accumulates() {
4374        let mut wb = WorldBuilder::new();
4375        wb.register::<u64>(0);
4376        let mut world = wb.build();
4377
4378        fn accumulate(mut sum: ResMut<u64>, x: u32) {
4379            *sum += x as u64;
4380        }
4381
4382        let r = world.registry_mut();
4383        let mut batch = PipelineBuilder::<u32>::new()
4384            .then(accumulate, r)
4385            .build_batch(16);
4386
4387        batch.input_mut().extend_from_slice(&[1, 2, 3, 4, 5]);
4388        batch.run(&mut world);
4389
4390        assert_eq!(*world.resource::<u64>(), 15);
4391        assert!(batch.input().is_empty());
4392    }
4393
4394    #[test]
4395    fn batch_retains_allocation() {
4396        let mut world = WorldBuilder::new().build();
4397        let r = world.registry_mut();
4398        let mut batch = PipelineBuilder::<u32>::new()
4399            .then(|_x: u32| {}, r)
4400            .build_batch(64);
4401
4402        batch.input_mut().extend_from_slice(&[1, 2, 3]);
4403        batch.run(&mut world);
4404
4405        assert!(batch.input().is_empty());
4406        assert!(batch.input_mut().capacity() >= 64);
4407    }
4408
4409    #[test]
4410    fn batch_empty_is_noop() {
4411        let mut wb = WorldBuilder::new();
4412        wb.register::<u64>(0);
4413        let mut world = wb.build();
4414
4415        fn accumulate(mut sum: ResMut<u64>, x: u32) {
4416            *sum += x as u64;
4417        }
4418
4419        let r = world.registry_mut();
4420        let mut batch = PipelineBuilder::<u32>::new()
4421            .then(accumulate, r)
4422            .build_batch(16);
4423
4424        batch.run(&mut world);
4425        assert_eq!(*world.resource::<u64>(), 0);
4426    }
4427
4428    #[test]
4429    fn batch_catch_continues_on_error() {
4430        let mut wb = WorldBuilder::new();
4431        wb.register::<u64>(0);
4432        wb.register::<u32>(0);
4433        let mut world = wb.build();
4434
4435        fn validate(x: u32) -> Result<u32, &'static str> {
4436            if x > 0 { Ok(x) } else { Err("zero") }
4437        }
4438
4439        fn count_errors(mut errs: ResMut<u32>, _err: &'static str) {
4440            *errs += 1;
4441        }
4442
4443        fn accumulate(mut sum: ResMut<u64>, x: u32) {
4444            *sum += x as u64;
4445        }
4446
4447        let r = world.registry_mut();
4448        let mut batch = PipelineBuilder::<u32>::new()
4449            .then(validate, r)
4450            .catch(count_errors, r)
4451            .map(accumulate, r)
4452            .build_batch(16);
4453
4454        // Items: 1, 0 (error), 2, 0 (error), 3
4455        batch.input_mut().extend_from_slice(&[1, 0, 2, 0, 3]);
4456        batch.run(&mut world);
4457
4458        assert_eq!(*world.resource::<u64>(), 6); // 1 + 2 + 3
4459        assert_eq!(*world.resource::<u32>(), 2); // 2 errors
4460    }
4461
4462    #[test]
4463    fn batch_filter_skips_items() {
4464        let mut wb = WorldBuilder::new();
4465        wb.register::<u64>(0);
4466        let mut world = wb.build();
4467
4468        fn accumulate(mut sum: ResMut<u64>, x: u32) {
4469            *sum += x as u64;
4470        }
4471
4472        let r = world.registry_mut();
4473        let mut batch = PipelineBuilder::<u32>::new()
4474            .then(
4475                |x: u32| -> Option<u32> { if x > 2 { Some(x) } else { None } },
4476                r,
4477            )
4478            .map(accumulate, r)
4479            .build_batch(16);
4480
4481        batch.input_mut().extend_from_slice(&[1, 2, 3, 4, 5]);
4482        batch.run(&mut world);
4483
4484        assert_eq!(*world.resource::<u64>(), 12); // 3 + 4 + 5
4485    }
4486
4487    #[test]
4488    fn batch_multiple_runs_accumulate() {
4489        let mut wb = WorldBuilder::new();
4490        wb.register::<u64>(0);
4491        let mut world = wb.build();
4492
4493        fn accumulate(mut sum: ResMut<u64>, x: u32) {
4494            *sum += x as u64;
4495        }
4496
4497        let r = world.registry_mut();
4498        let mut batch = PipelineBuilder::<u32>::new()
4499            .then(accumulate, r)
4500            .build_batch(16);
4501
4502        batch.input_mut().extend_from_slice(&[1, 2, 3]);
4503        batch.run(&mut world);
4504        assert_eq!(*world.resource::<u64>(), 6);
4505
4506        batch.input_mut().extend_from_slice(&[4, 5]);
4507        batch.run(&mut world);
4508        assert_eq!(*world.resource::<u64>(), 15);
4509    }
4510
4511    #[test]
4512    fn batch_with_world_access() {
4513        let mut wb = WorldBuilder::new();
4514        wb.register::<u64>(10); // multiplier
4515        wb.register::<Vec<u64>>(Vec::new());
4516        let mut world = wb.build();
4517
4518        fn multiply_and_collect(factor: Res<u64>, mut out: ResMut<Vec<u64>>, x: u32) {
4519            out.push(x as u64 * *factor);
4520        }
4521
4522        let r = world.registry_mut();
4523        let mut batch = PipelineBuilder::<u32>::new()
4524            .then(multiply_and_collect, r)
4525            .build_batch(16);
4526
4527        batch.input_mut().extend_from_slice(&[1, 2, 3]);
4528        batch.run(&mut world);
4529
4530        assert_eq!(world.resource::<Vec<u64>>().as_slice(), &[10, 20, 30]);
4531    }
4532
4533    // =========================================================================
4534    // Cloned combinator
4535    // =========================================================================
4536
4537    // Named functions for proper lifetime elision (&'a u32 → &'a u32).
4538    // Closures get two independent lifetimes and fail to compile.
4539    fn ref_identity(x: &u32) -> &u32 {
4540        x
4541    }
4542    #[allow(clippy::unnecessary_wraps)]
4543    fn ref_wrap_some(x: &u32) -> Option<&u32> {
4544        Some(x)
4545    }
4546    fn ref_wrap_none(_x: &u32) -> Option<&u32> {
4547        None
4548    }
4549    #[allow(clippy::unnecessary_wraps)]
4550    fn ref_wrap_ok(x: &u32) -> Result<&u32, String> {
4551        Ok(x)
4552    }
4553    fn ref_wrap_err(_x: &u32) -> Result<&u32, String> {
4554        Err("fail".into())
4555    }
4556
4557    #[test]
4558    fn cloned_bare() {
4559        let mut world = WorldBuilder::new().build();
4560        // val before p — val must outlive the pipeline's In = &u32
4561        let val = 42u32;
4562        let r = world.registry_mut();
4563        let mut p = PipelineBuilder::<&u32>::new()
4564            .then(ref_identity, r)
4565            .cloned();
4566        assert_eq!(p.run(&mut world, &val), 42u32);
4567    }
4568
4569    #[test]
4570    fn cloned_option_some() {
4571        let mut world = WorldBuilder::new().build();
4572        let val = 42u32;
4573        let r = world.registry_mut();
4574        let mut p = PipelineBuilder::<&u32>::new()
4575            .then(ref_wrap_some, r)
4576            .cloned();
4577        assert_eq!(p.run(&mut world, &val), Some(42u32));
4578    }
4579
4580    #[test]
4581    fn cloned_option_none() {
4582        let mut world = WorldBuilder::new().build();
4583        let val = 42u32;
4584        let r = world.registry_mut();
4585        let mut p = PipelineBuilder::<&u32>::new()
4586            .then(ref_wrap_none, r)
4587            .cloned();
4588        assert_eq!(p.run(&mut world, &val), None);
4589    }
4590
4591    #[test]
4592    fn cloned_result_ok() {
4593        let mut world = WorldBuilder::new().build();
4594        let val = 42u32;
4595        let r = world.registry_mut();
4596        let mut p = PipelineBuilder::<&u32>::new().then(ref_wrap_ok, r).cloned();
4597        assert_eq!(p.run(&mut world, &val), Ok(42u32));
4598    }
4599
4600    #[test]
4601    fn cloned_result_err() {
4602        let mut world = WorldBuilder::new().build();
4603        let val = 42u32;
4604        let r = world.registry_mut();
4605        let mut p = PipelineBuilder::<&u32>::new()
4606            .then(ref_wrap_err, r)
4607            .cloned();
4608        assert_eq!(p.run(&mut world, &val), Err("fail".into()));
4609    }
4610
4611    // =========================================================================
4612    // Dispatch combinator
4613    // =========================================================================
4614
4615    #[test]
4616    fn dispatch_to_handler() {
4617        let mut wb = WorldBuilder::new();
4618        wb.register::<u64>(0);
4619        let mut world = wb.build();
4620
4621        fn store(mut out: ResMut<u64>, val: u32) {
4622            *out = val as u64;
4623        }
4624
4625        let r = world.registry_mut();
4626        let handler = PipelineBuilder::<u32>::new().then(store, r).build();
4627
4628        let mut p = PipelineBuilder::<u32>::new()
4629            .then(|x: u32| x * 2, r)
4630            .dispatch(handler)
4631            .build();
4632
4633        p.run(&mut world, 5);
4634        assert_eq!(*world.resource::<u64>(), 10);
4635    }
4636
4637    #[test]
4638    fn dispatch_to_fanout() {
4639        let mut wb = WorldBuilder::new();
4640        wb.register::<u64>(0);
4641        wb.register::<i64>(0);
4642        let mut world = wb.build();
4643
4644        fn write_u64(mut sink: ResMut<u64>, event: &u32) {
4645            *sink += *event as u64;
4646        }
4647        fn write_i64(mut sink: ResMut<i64>, event: &u32) {
4648            *sink += *event as i64;
4649        }
4650
4651        let h1 = write_u64.into_handler(world.registry());
4652        let h2 = write_i64.into_handler(world.registry());
4653        let fan = fan_out!(h1, h2);
4654
4655        let r = world.registry_mut();
4656        let mut p = PipelineBuilder::<u32>::new()
4657            .then(|x: u32| x * 2, r)
4658            .dispatch(fan)
4659            .build();
4660
4661        p.run(&mut world, 5);
4662        assert_eq!(*world.resource::<u64>(), 10);
4663        assert_eq!(*world.resource::<i64>(), 10);
4664    }
4665
4666    #[test]
4667    fn dispatch_to_broadcast() {
4668        let mut wb = WorldBuilder::new();
4669        wb.register::<u64>(0);
4670        let mut world = wb.build();
4671
4672        fn write_u64(mut sink: ResMut<u64>, event: &u32) {
4673            *sink += *event as u64;
4674        }
4675
4676        let mut broadcast = crate::Broadcast::<u32>::new();
4677        broadcast.add(write_u64.into_handler(world.registry()));
4678        broadcast.add(write_u64.into_handler(world.registry()));
4679
4680        let r = world.registry_mut();
4681        let mut p = PipelineBuilder::<u32>::new()
4682            .then(|x: u32| x + 1, r)
4683            .dispatch(broadcast)
4684            .build();
4685
4686        p.run(&mut world, 4);
4687        assert_eq!(*world.resource::<u64>(), 10); // 5 + 5
4688    }
4689
4690    #[test]
4691    fn dispatch_build_produces_handler() {
4692        let mut wb = WorldBuilder::new();
4693        wb.register::<u64>(0);
4694        let mut world = wb.build();
4695
4696        fn store(mut out: ResMut<u64>, val: u32) {
4697            *out = val as u64;
4698        }
4699
4700        let r = world.registry_mut();
4701        let inner = PipelineBuilder::<u32>::new().then(store, r).build();
4702
4703        let mut pipeline: Box<dyn Handler<u32>> = Box::new(
4704            PipelineBuilder::<u32>::new()
4705                .then(|x: u32| x + 1, r)
4706                .dispatch(inner)
4707                .build(),
4708        );
4709
4710        pipeline.run(&mut world, 9);
4711        assert_eq!(*world.resource::<u64>(), 10);
4712    }
4713
4714    // -- Guard combinator --
4715
4716    #[test]
4717    fn pipeline_guard_keeps() {
4718        fn sink(mut out: ResMut<u64>, val: Option<u64>) {
4719            *out = val.unwrap_or(0);
4720        }
4721        let mut wb = WorldBuilder::new();
4722        wb.register::<u64>(0);
4723        let mut world = wb.build();
4724        let reg = world.registry();
4725
4726        let mut p = PipelineBuilder::<u32>::new()
4727            .then(|x: u32| x as u64, reg)
4728            .guard(|v: &u64| *v > 3, reg)
4729            .then(sink, reg);
4730
4731        p.run(&mut world, 5u32);
4732        assert_eq!(*world.resource::<u64>(), 5);
4733    }
4734
4735    #[test]
4736    fn pipeline_guard_drops() {
4737        fn sink(mut out: ResMut<u64>, val: Option<u64>) {
4738            *out = val.unwrap_or(999);
4739        }
4740        let mut wb = WorldBuilder::new();
4741        wb.register::<u64>(0);
4742        let mut world = wb.build();
4743        let reg = world.registry();
4744
4745        let mut p = PipelineBuilder::<u32>::new()
4746            .then(|x: u32| x as u64, reg)
4747            .guard(|v: &u64| *v > 10, reg)
4748            .then(sink, reg);
4749
4750        p.run(&mut world, 5u32);
4751        assert_eq!(*world.resource::<u64>(), 999);
4752    }
4753
4754    // -- Tap combinator --
4755
4756    #[test]
4757    fn pipeline_tap_observes_without_changing() {
4758        fn sink(mut out: ResMut<u64>, val: u64) {
4759            *out = val;
4760        }
4761        let mut wb = WorldBuilder::new();
4762        wb.register::<u64>(0);
4763        wb.register::<bool>(false);
4764        let mut world = wb.build();
4765        let reg = world.registry();
4766
4767        let mut p = PipelineBuilder::<u32>::new()
4768            .then(|x: u32| x as u64 * 2, reg)
4769            .tap(
4770                |w: &mut World, val: &u64| {
4771                    *w.resource_mut::<bool>() = *val == 10;
4772                },
4773                reg,
4774            )
4775            .then(sink, reg);
4776
4777        p.run(&mut world, 5u32);
4778        assert_eq!(*world.resource::<u64>(), 10); // value passed through
4779        assert!(*world.resource::<bool>()); // tap fired
4780    }
4781
4782    // -- Route combinator --
4783
4784    #[test]
4785    fn pipeline_route_true_arm() {
4786        fn sink(mut out: ResMut<u64>, val: u64) {
4787            *out = val;
4788        }
4789        let mut wb = WorldBuilder::new();
4790        wb.register::<u64>(0);
4791        let mut world = wb.build();
4792        let reg = world.registry();
4793
4794        let arm_t = PipelineBuilder::new().then(|x: u64| x * 2, reg);
4795        let arm_f = PipelineBuilder::new().then(|x: u64| x * 3, reg);
4796
4797        let mut p = PipelineBuilder::<u32>::new()
4798            .then(|x: u32| x as u64, reg)
4799            .route(|v: &u64| *v > 3, reg, arm_t, arm_f)
4800            .then(sink, reg);
4801
4802        p.run(&mut world, 5u32); // 5 > 3 → true arm → double → 10
4803        assert_eq!(*world.resource::<u64>(), 10);
4804    }
4805
4806    #[test]
4807    fn pipeline_route_false_arm() {
4808        fn sink(mut out: ResMut<u64>, val: u64) {
4809            *out = val;
4810        }
4811        let mut wb = WorldBuilder::new();
4812        wb.register::<u64>(0);
4813        let mut world = wb.build();
4814        let reg = world.registry();
4815
4816        let arm_t = PipelineBuilder::new().then(|x: u64| x * 2, reg);
4817        let arm_f = PipelineBuilder::new().then(|x: u64| x * 3, reg);
4818
4819        let mut p = PipelineBuilder::<u32>::new()
4820            .then(|x: u32| x as u64, reg)
4821            .route(|v: &u64| *v > 10, reg, arm_t, arm_f)
4822            .then(sink, reg);
4823
4824        p.run(&mut world, 5u32); // 5 <= 10 → false arm → triple → 15
4825        assert_eq!(*world.resource::<u64>(), 15);
4826    }
4827
4828    #[test]
4829    fn pipeline_route_nested() {
4830        fn sink(mut out: ResMut<u64>, val: u64) {
4831            *out = val;
4832        }
4833        let mut wb = WorldBuilder::new();
4834        wb.register::<u64>(0);
4835        let mut world = wb.build();
4836        let reg = world.registry();
4837
4838        // N-ary via nesting: <5 → +100, 5..10 → +200, >=10 → +300
4839        let inner_t = PipelineBuilder::new().then(|x: u64| x + 200, reg);
4840        let inner_f = PipelineBuilder::new().then(|x: u64| x + 300, reg);
4841        let outer_t = PipelineBuilder::new().then(|x: u64| x + 100, reg);
4842        let outer_f = PipelineBuilder::new().then(|x: u64| x, reg).route(
4843            |v: &u64| *v < 10,
4844            reg,
4845            inner_t,
4846            inner_f,
4847        );
4848
4849        let mut p = PipelineBuilder::<u32>::new()
4850            .then(|x: u32| x as u64, reg)
4851            .route(|v: &u64| *v < 5, reg, outer_t, outer_f)
4852            .then(sink, reg);
4853
4854        p.run(&mut world, 3u32); // 3 < 5 → +100 → 103
4855        assert_eq!(*world.resource::<u64>(), 103);
4856
4857        p.run(&mut world, 7u32); // 7 >= 5, 7 < 10 → +200 → 207
4858        assert_eq!(*world.resource::<u64>(), 207);
4859
4860        p.run(&mut world, 15u32); // 15 >= 5, 15 >= 10 → +300 → 315
4861        assert_eq!(*world.resource::<u64>(), 315);
4862    }
4863
4864    // -- Tee combinator --
4865
4866    #[test]
4867    fn pipeline_tee_side_effect_chain() {
4868        use crate::dag::DagArmSeed;
4869
4870        fn log_step(mut counter: ResMut<u32>, _val: &u64) {
4871            *counter += 1;
4872        }
4873        fn sink(mut out: ResMut<u64>, val: u64) {
4874            *out = val;
4875        }
4876        let mut wb = WorldBuilder::new();
4877        wb.register::<u64>(0);
4878        wb.register::<u32>(0);
4879        let mut world = wb.build();
4880        let reg = world.registry();
4881
4882        let side = DagArmSeed::new().then(log_step, reg);
4883
4884        let mut p = PipelineBuilder::<u32>::new()
4885            .then(|x: u32| x as u64 * 2, reg)
4886            .tee(side)
4887            .then(sink, reg);
4888
4889        p.run(&mut world, 5u32);
4890        assert_eq!(*world.resource::<u64>(), 10); // value passed through
4891        assert_eq!(*world.resource::<u32>(), 1); // side-effect fired
4892
4893        p.run(&mut world, 7u32);
4894        assert_eq!(*world.resource::<u64>(), 14);
4895        assert_eq!(*world.resource::<u32>(), 2);
4896    }
4897
4898    // -- Dedup combinator --
4899
4900    #[test]
4901    fn pipeline_dedup_suppresses_unchanged() {
4902        fn sink(mut out: ResMut<u32>, val: Option<u64>) {
4903            if val.is_some() {
4904                *out += 1;
4905            }
4906        }
4907        let mut wb = WorldBuilder::new();
4908        wb.register::<u32>(0);
4909        let mut world = wb.build();
4910        let reg = world.registry();
4911
4912        let mut p = PipelineBuilder::<u32>::new()
4913            .then(|x: u32| x as u64 / 2, reg)
4914            .dedup()
4915            .then(sink, reg);
4916
4917        p.run(&mut world, 4u32); // 2 — first, Some
4918        assert_eq!(*world.resource::<u32>(), 1);
4919
4920        p.run(&mut world, 5u32); // 2 — same, None
4921        assert_eq!(*world.resource::<u32>(), 1);
4922
4923        p.run(&mut world, 6u32); // 3 — changed, Some
4924        assert_eq!(*world.resource::<u32>(), 2);
4925    }
4926
4927    // -- Bool combinators --
4928
4929    #[test]
4930    fn pipeline_not() {
4931        fn sink(mut out: ResMut<bool>, val: bool) {
4932            *out = val;
4933        }
4934        let mut wb = WorldBuilder::new();
4935        wb.register::<bool>(false);
4936        let mut world = wb.build();
4937        let reg = world.registry();
4938
4939        let mut p = PipelineBuilder::<u32>::new()
4940            .then(|x: u32| x > 5, reg)
4941            .not()
4942            .then(sink, reg);
4943
4944        p.run(&mut world, 3u32); // 3 > 5 = false, not = true
4945        assert!(*world.resource::<bool>());
4946
4947        p.run(&mut world, 10u32); // 10 > 5 = true, not = false
4948        assert!(!*world.resource::<bool>());
4949    }
4950
4951    #[test]
4952    fn pipeline_and() {
4953        fn sink(mut out: ResMut<bool>, val: bool) {
4954            *out = val;
4955        }
4956        let mut wb = WorldBuilder::new();
4957        wb.register::<bool>(true);
4958        let mut world = wb.build();
4959        let reg = world.registry();
4960
4961        let mut p = PipelineBuilder::<u32>::new()
4962            .then(|x: u32| x > 5, reg)
4963            .and(|w: &mut World| *w.resource::<bool>(), reg)
4964            .then(sink, reg);
4965
4966        p.run(&mut world, 10u32); // true && true = true
4967        assert!(*world.resource::<bool>());
4968
4969        *world.resource_mut::<bool>() = false;
4970        p.run(&mut world, 10u32); // true && false = false
4971        assert!(!*world.resource::<bool>());
4972    }
4973
4974    #[test]
4975    fn pipeline_or() {
4976        fn sink(mut out: ResMut<bool>, val: bool) {
4977            *out = val;
4978        }
4979        let mut wb = WorldBuilder::new();
4980        wb.register::<bool>(false);
4981        let mut world = wb.build();
4982        let reg = world.registry();
4983
4984        let mut p = PipelineBuilder::<u32>::new()
4985            .then(|x: u32| x > 5, reg)
4986            .or(|w: &mut World| *w.resource::<bool>(), reg)
4987            .then(sink, reg);
4988
4989        p.run(&mut world, 3u32); // false || false = false
4990        assert!(!*world.resource::<bool>());
4991
4992        *world.resource_mut::<bool>() = true;
4993        p.run(&mut world, 3u32); // false || true = true
4994        assert!(*world.resource::<bool>());
4995    }
4996
4997    #[test]
4998    fn pipeline_xor() {
4999        fn sink(mut out: ResMut<bool>, val: bool) {
5000            *out = val;
5001        }
5002        let mut wb = WorldBuilder::new();
5003        wb.register::<bool>(true);
5004        let mut world = wb.build();
5005        let reg = world.registry();
5006
5007        let mut p = PipelineBuilder::<u32>::new()
5008            .then(|x: u32| x > 5, reg)
5009            .xor(|w: &mut World| *w.resource::<bool>(), reg)
5010            .then(sink, reg);
5011
5012        p.run(&mut world, 10u32); // true ^ true = false
5013        assert!(!*world.resource::<bool>());
5014    }
5015
5016    // =========================================================================
5017    // Splat — tuple destructuring
5018    // =========================================================================
5019
5020    #[test]
5021    fn splat2_closure_on_start() {
5022        let mut world = WorldBuilder::new().build();
5023        let r = world.registry_mut();
5024        let mut p = PipelineBuilder::<(u32, u64)>::new()
5025            .splat()
5026            .then(|a: u32, b: u64| a as u64 + b, r);
5027        assert_eq!(p.run(&mut world, (3, 7)), 10);
5028    }
5029
5030    #[test]
5031    fn splat2_named_fn_with_param() {
5032        let mut wb = WorldBuilder::new();
5033        wb.register::<u64>(100);
5034        let mut world = wb.build();
5035
5036        fn process(base: Res<u64>, a: u32, b: u32) -> u64 {
5037            *base + a as u64 + b as u64
5038        }
5039
5040        let r = world.registry_mut();
5041        let mut p = PipelineBuilder::<(u32, u32)>::new()
5042            .splat()
5043            .then(process, r);
5044        assert_eq!(p.run(&mut world, (3, 7)), 110);
5045    }
5046
5047    #[test]
5048    fn splat2_mid_chain() {
5049        let mut world = WorldBuilder::new().build();
5050        let r = world.registry_mut();
5051        let mut p = PipelineBuilder::<u32>::new()
5052            .then(|x: u32| (x, x * 2), r)
5053            .splat()
5054            .then(|a: u32, b: u32| a as u64 + b as u64, r);
5055        assert_eq!(p.run(&mut world, 5), 15); // 5 + 10
5056    }
5057
5058    #[test]
5059    fn splat3_closure_on_start() {
5060        let mut world = WorldBuilder::new().build();
5061        let r = world.registry_mut();
5062        let mut p = PipelineBuilder::<(u32, u32, u32)>::new()
5063            .splat()
5064            .then(|a: u32, b: u32, c: u32| a + b + c, r);
5065        assert_eq!(p.run(&mut world, (1, 2, 3)), 6);
5066    }
5067
5068    #[test]
5069    fn splat3_named_fn_with_param() {
5070        let mut wb = WorldBuilder::new();
5071        wb.register::<u64>(10);
5072        let mut world = wb.build();
5073
5074        fn process(factor: Res<u64>, a: u32, b: u32, c: u32) -> u64 {
5075            *factor * (a + b + c) as u64
5076        }
5077
5078        let r = world.registry_mut();
5079        let mut p = PipelineBuilder::<(u32, u32, u32)>::new()
5080            .splat()
5081            .then(process, r);
5082        assert_eq!(p.run(&mut world, (1, 2, 3)), 60);
5083    }
5084
5085    #[test]
5086    fn splat4_mid_chain() {
5087        let mut world = WorldBuilder::new().build();
5088        let r = world.registry_mut();
5089        let mut p = PipelineBuilder::<u32>::new()
5090            .then(|x: u32| (x, x + 1, x + 2, x + 3), r)
5091            .splat()
5092            .then(|a: u32, b: u32, c: u32, d: u32| (a + b + c + d) as u64, r);
5093        assert_eq!(p.run(&mut world, 10), 46); // 10+11+12+13
5094    }
5095
5096    #[test]
5097    fn splat5_closure_on_start() {
5098        let mut world = WorldBuilder::new().build();
5099        let r = world.registry_mut();
5100        let mut p = PipelineBuilder::<(u8, u8, u8, u8, u8)>::new().splat().then(
5101            |a: u8, b: u8, c: u8, d: u8, e: u8| {
5102                (a as u64) + (b as u64) + (c as u64) + (d as u64) + (e as u64)
5103            },
5104            r,
5105        );
5106        assert_eq!(p.run(&mut world, (1, 2, 3, 4, 5)), 15);
5107    }
5108
5109    #[test]
5110    fn splat_build_into_handler() {
5111        let mut wb = WorldBuilder::new();
5112        wb.register::<u64>(0);
5113        let mut world = wb.build();
5114
5115        fn store(mut out: ResMut<u64>, a: u32, b: u32) {
5116            *out = a as u64 + b as u64;
5117        }
5118
5119        let r = world.registry_mut();
5120        let mut pipeline = PipelineBuilder::<(u32, u32)>::new()
5121            .splat()
5122            .then(store, r)
5123            .build();
5124
5125        pipeline.run(&mut world, (3, 7));
5126        assert_eq!(*world.resource::<u64>(), 10);
5127    }
5128
5129    #[test]
5130    fn splat_build_batch() {
5131        let mut wb = WorldBuilder::new();
5132        wb.register::<u64>(0);
5133        let mut world = wb.build();
5134
5135        fn accumulate(mut sum: ResMut<u64>, a: u32, b: u32) {
5136            *sum += a as u64 + b as u64;
5137        }
5138
5139        let r = world.registry_mut();
5140        let mut batch = PipelineBuilder::<(u32, u32)>::new()
5141            .splat()
5142            .then(accumulate, r)
5143            .build_batch(8);
5144
5145        batch
5146            .input_mut()
5147            .extend_from_slice(&[(1, 2), (3, 4), (5, 6)]);
5148        batch.run(&mut world);
5149        assert_eq!(*world.resource::<u64>(), 21); // 3+7+11
5150    }
5151
5152    #[test]
5153    #[should_panic(expected = "conflicting access")]
5154    fn splat_access_conflict_detected() {
5155        let mut wb = WorldBuilder::new();
5156        wb.register::<u64>(0);
5157        let mut world = wb.build();
5158
5159        fn bad(a: ResMut<u64>, _b: ResMut<u64>, _x: u32, _y: u32) {
5160            let _ = a;
5161        }
5162
5163        let r = world.registry_mut();
5164        // Should panic on duplicate ResMut<u64>
5165        let _ = PipelineBuilder::<(u32, u32)>::new().splat().then(bad, r);
5166    }
5167
5168    // -- Then (previously switch) --
5169
5170    #[test]
5171    fn pipeline_then_branching() {
5172        fn double(x: u32) -> u64 {
5173            x as u64 * 2
5174        }
5175        fn sink(mut out: ResMut<u64>, val: u64) {
5176            *out = val;
5177        }
5178
5179        let mut wb = WorldBuilder::new();
5180        wb.register::<u64>(0);
5181        let mut world = wb.build();
5182        let reg = world.registry();
5183
5184        let mut pipeline = PipelineBuilder::<u32>::new()
5185            .then(double, reg)
5186            .then(|val: u64| if val > 10 { val * 100 } else { val + 1 }, reg)
5187            .then(sink, reg)
5188            .build();
5189
5190        pipeline.run(&mut world, 10u32); // 20 > 10 → 2000
5191        assert_eq!(*world.resource::<u64>(), 2000);
5192
5193        pipeline.run(&mut world, 3u32); // 6 <= 10 → 7
5194        assert_eq!(*world.resource::<u64>(), 7);
5195    }
5196
5197    #[test]
5198    fn pipeline_then_3_way() {
5199        fn sink(mut out: ResMut<u64>, val: u64) {
5200            *out = val;
5201        }
5202
5203        let mut wb = WorldBuilder::new();
5204        wb.register::<u64>(0);
5205        let mut world = wb.build();
5206        let reg = world.registry();
5207
5208        let mut pipeline = PipelineBuilder::<u32>::new()
5209            .then(
5210                |val: u32| match val % 3 {
5211                    0 => val as u64 + 100,
5212                    1 => val as u64 + 200,
5213                    _ => val as u64 + 300,
5214                },
5215                reg,
5216            )
5217            .then(sink, reg)
5218            .build();
5219
5220        pipeline.run(&mut world, 6u32); // 6 % 3 == 0 → 106
5221        assert_eq!(*world.resource::<u64>(), 106);
5222
5223        pipeline.run(&mut world, 7u32); // 7 % 3 == 1 → 207
5224        assert_eq!(*world.resource::<u64>(), 207);
5225
5226        pipeline.run(&mut world, 8u32); // 8 % 3 == 2 → 308
5227        assert_eq!(*world.resource::<u64>(), 308);
5228    }
5229
5230    #[test]
5231    fn pipeline_then_with_resolve_step() {
5232        fn add_offset(offset: Res<i64>, val: u32) -> u64 {
5233            (*offset + val as i64) as u64
5234        }
5235        fn plain_double(val: u32) -> u64 {
5236            val as u64 * 2
5237        }
5238        fn sink(mut out: ResMut<u64>, val: u64) {
5239            *out = val;
5240        }
5241
5242        let mut wb = WorldBuilder::new();
5243        wb.register::<u64>(0);
5244        wb.register::<i64>(100);
5245        let mut world = wb.build();
5246        let reg = world.registry();
5247
5248        let mut arm_offset = resolve_step(add_offset, reg);
5249        let mut arm_double = resolve_step(plain_double, reg);
5250
5251        let mut pipeline = PipelineBuilder::<u32>::new()
5252            .then(
5253                move |world: &mut World, val: u32| {
5254                    if val > 10 {
5255                        arm_offset(world, val)
5256                    } else {
5257                        arm_double(world, val)
5258                    }
5259                },
5260                reg,
5261            )
5262            .then(sink, reg)
5263            .build();
5264
5265        pipeline.run(&mut world, 20u32); // > 10 → offset → 100 + 20 = 120
5266        assert_eq!(*world.resource::<u64>(), 120);
5267
5268        pipeline.run(&mut world, 5u32); // <= 10 → double → 10
5269        assert_eq!(*world.resource::<u64>(), 10);
5270    }
5271
5272    #[test]
5273    fn batch_pipeline_then_branching() {
5274        fn sink(mut out: ResMut<u64>, val: u64) {
5275            *out += val;
5276        }
5277
5278        let mut wb = WorldBuilder::new();
5279        wb.register::<u64>(0);
5280        let mut world = wb.build();
5281        let reg = world.registry();
5282
5283        let mut batch = PipelineBuilder::<u32>::new()
5284            .then(
5285                |val: u32| {
5286                    if val % 2 == 0 {
5287                        val as u64 * 10
5288                    } else {
5289                        val as u64
5290                    }
5291                },
5292                reg,
5293            )
5294            .then(sink, reg)
5295            .build_batch(8);
5296
5297        batch.input_mut().extend([1, 2, 3, 4]);
5298        batch.run(&mut world);
5299
5300        // 1 → 1, 2 → 20, 3 → 3, 4 → 40 = 64
5301        assert_eq!(*world.resource::<u64>(), 64);
5302    }
5303
5304    // -- IntoRefStep with Param: named functions --
5305
5306    #[test]
5307    fn guard_named_fn_with_param() {
5308        fn above_threshold(threshold: Res<u64>, val: &u64) -> bool {
5309            *val > *threshold
5310        }
5311        fn sink(mut out: ResMut<i64>, val: Option<u64>) {
5312            *out = val.map_or(-1, |v| v as i64);
5313        }
5314        let mut wb = WorldBuilder::new();
5315        wb.register::<u64>(5); // threshold
5316        wb.register::<i64>(0);
5317        let mut world = wb.build();
5318        let reg = world.registry();
5319
5320        let mut p = PipelineBuilder::<u32>::new()
5321            .then(|x: u32| x as u64, reg)
5322            .guard(above_threshold, reg)
5323            .then(sink, reg);
5324
5325        p.run(&mut world, 10u32); // 10 > 5 → Some(10)
5326        assert_eq!(*world.resource::<i64>(), 10);
5327
5328        p.run(&mut world, 3u32); // 3 <= 5 → None → -1
5329        assert_eq!(*world.resource::<i64>(), -1);
5330    }
5331
5332    #[test]
5333    fn filter_named_fn_with_param() {
5334        fn is_allowed(allowed: Res<u64>, val: &u64) -> bool {
5335            *val != *allowed
5336        }
5337        fn count(mut ctr: ResMut<i64>, _val: u64) {
5338            *ctr += 1;
5339        }
5340        let mut wb = WorldBuilder::new();
5341        wb.register::<u64>(42); // blocked value
5342        wb.register::<i64>(0);
5343        let mut world = wb.build();
5344        let reg = world.registry();
5345
5346        let mut p = PipelineBuilder::<u32>::new()
5347            .then(|x: u32| -> Option<u64> { Some(x as u64) }, reg)
5348            .filter(is_allowed, reg)
5349            .map(count, reg)
5350            .unwrap_or(());
5351
5352        for v in [1u32, 42, 5, 42, 10] {
5353            p.run(&mut world, v);
5354        }
5355        assert_eq!(*world.resource::<i64>(), 3); // 42 filtered out twice
5356    }
5357
5358    #[test]
5359    fn inspect_named_fn_with_param() {
5360        fn log_value(mut log: ResMut<Vec<u64>>, val: &u64) {
5361            log.push(*val);
5362        }
5363        let mut wb = WorldBuilder::new();
5364        wb.register::<Vec<u64>>(Vec::new());
5365        let mut world = wb.build();
5366        let reg = world.registry();
5367
5368        let mut p = PipelineBuilder::<u32>::new()
5369            .then(|x: u32| -> Option<u64> { Some(x as u64) }, reg)
5370            .inspect(log_value, reg)
5371            .unwrap_or(0);
5372
5373        for v in [1u32, 2, 3] {
5374            p.run(&mut world, v);
5375        }
5376        assert_eq!(world.resource::<Vec<u64>>().as_slice(), &[1, 2, 3]);
5377    }
5378
5379    #[test]
5380    fn tap_named_fn_with_param() {
5381        fn observe(mut log: ResMut<Vec<u64>>, val: &u64) {
5382            log.push(*val);
5383        }
5384        fn sink(mut out: ResMut<u64>, val: u64) {
5385            *out = val;
5386        }
5387        let mut wb = WorldBuilder::new();
5388        wb.register::<u64>(0);
5389        wb.register::<Vec<u64>>(Vec::new());
5390        let mut world = wb.build();
5391        let reg = world.registry();
5392
5393        let mut p = PipelineBuilder::<u32>::new()
5394            .then(|x: u32| x as u64, reg)
5395            .tap(observe, reg)
5396            .then(sink, reg);
5397
5398        p.run(&mut world, 7u32);
5399        assert_eq!(*world.resource::<u64>(), 7);
5400        assert_eq!(world.resource::<Vec<u64>>().as_slice(), &[7]);
5401    }
5402
5403    // -- IntoProducer with Param: named functions --
5404
5405    #[test]
5406    fn and_named_fn_with_param() {
5407        fn check_enabled(flag: Res<bool>) -> bool {
5408            *flag
5409        }
5410        let mut wb = WorldBuilder::new();
5411        wb.register::<bool>(true);
5412        let mut world = wb.build();
5413        let reg = world.registry();
5414
5415        let mut p = PipelineBuilder::<u32>::new()
5416            .then(|_x: u32| true, reg)
5417            .and(check_enabled, reg);
5418
5419        assert!(p.run(&mut world, 0u32));
5420
5421        *world.resource_mut::<bool>() = false;
5422        assert!(!p.run(&mut world, 0u32)); // short-circuit: true AND false
5423    }
5424
5425    #[test]
5426    fn or_named_fn_with_param() {
5427        fn check_enabled(flag: Res<bool>) -> bool {
5428            *flag
5429        }
5430        let mut wb = WorldBuilder::new();
5431        wb.register::<bool>(true);
5432        let mut world = wb.build();
5433        let reg = world.registry();
5434
5435        let mut p = PipelineBuilder::<u32>::new()
5436            .then(|_x: u32| false, reg)
5437            .or(check_enabled, reg);
5438
5439        assert!(p.run(&mut world, 0u32)); // false OR true
5440
5441        *world.resource_mut::<bool>() = false;
5442        assert!(!p.run(&mut world, 0u32)); // false OR false
5443    }
5444
5445    #[test]
5446    fn on_none_named_fn_with_param() {
5447        fn log_miss(mut ctr: ResMut<u64>) {
5448            *ctr += 1;
5449        }
5450        let mut wb = WorldBuilder::new();
5451        wb.register::<u64>(0);
5452        let mut world = wb.build();
5453        let reg = world.registry();
5454
5455        let mut p = PipelineBuilder::<u32>::new()
5456            .then(
5457                |x: u32| -> Option<u32> { if x > 5 { Some(x) } else { None } },
5458                reg,
5459            )
5460            .on_none(log_miss, reg)
5461            .unwrap_or(0);
5462
5463        for v in [1u32, 10, 3, 20] {
5464            p.run(&mut world, v);
5465        }
5466        assert_eq!(*world.resource::<u64>(), 2); // 1 and 3 are None
5467    }
5468
5469    #[test]
5470    fn ok_or_else_named_fn_with_param() {
5471        fn make_error(msg: Res<String>) -> String {
5472            (*msg).clone()
5473        }
5474        let mut wb = WorldBuilder::new();
5475        wb.register::<String>("not found".into());
5476        let mut world = wb.build();
5477        let reg = world.registry();
5478
5479        let mut p = PipelineBuilder::<u32>::new()
5480            .then(
5481                |x: u32| -> Option<u32> { if x > 0 { Some(x) } else { None } },
5482                reg,
5483            )
5484            .ok_or_else(make_error, reg);
5485
5486        let r: Result<u32, String> = p.run(&mut world, 5u32);
5487        assert_eq!(r, Ok(5));
5488
5489        let r: Result<u32, String> = p.run(&mut world, 0u32);
5490        assert_eq!(r, Err("not found".into()));
5491    }
5492
5493    #[test]
5494    fn unwrap_or_else_option_named_fn_with_param() {
5495        fn fallback(default: Res<u64>) -> u64 {
5496            *default
5497        }
5498        let mut wb = WorldBuilder::new();
5499        wb.register::<u64>(42);
5500        let mut world = wb.build();
5501        let reg = world.registry();
5502
5503        let mut p = PipelineBuilder::<u32>::new()
5504            .then(
5505                |x: u32| -> Option<u64> { if x > 0 { Some(x as u64) } else { None } },
5506                reg,
5507            )
5508            .unwrap_or_else(fallback, reg);
5509
5510        assert_eq!(p.run(&mut world, 5u32), 5);
5511        assert_eq!(p.run(&mut world, 0u32), 42);
5512    }
5513
    // -- map_err with Param: named function --
5515
5516    #[test]
5517    fn map_err_named_fn_with_param() {
5518        fn tag_error(prefix: Res<String>, err: String) -> String {
5519            format!("{}: {err}", &*prefix)
5520        }
5521        fn sink(mut out: ResMut<String>, val: Result<u32, String>) {
5522            match val {
5523                Ok(v) => *out = format!("ok:{v}"),
5524                Err(e) => *out = e,
5525            }
5526        }
5527        let mut wb = WorldBuilder::new();
5528        wb.register::<String>("ERR".into());
5529        let mut world = wb.build();
5530        let reg = world.registry();
5531
5532        let mut p = PipelineBuilder::<u32>::new()
5533            .then(
5534                |x: u32| -> Result<u32, String> { if x > 0 { Ok(x) } else { Err("zero".into()) } },
5535                reg,
5536            )
5537            .map_err(tag_error, reg)
5538            .then(sink, reg);
5539
5540        p.run(&mut world, 0u32);
5541        assert_eq!(world.resource::<String>().as_str(), "ERR: zero");
5542
5543        p.run(&mut world, 5u32);
5544        assert_eq!(world.resource::<String>().as_str(), "ok:5");
5545    }
5546
5547    // =========================================================================
5548    // Scan combinator
5549    // =========================================================================
5550
5551    #[test]
5552    fn scan_arity0_closure_running_sum() {
5553        let mut world = WorldBuilder::new().build();
5554        let reg = world.registry();
5555
5556        let mut p = PipelineBuilder::<u64>::new().then(|x: u64| x, reg).scan(
5557            0u64,
5558            |acc: &mut u64, val: u64| {
5559                *acc += val;
5560                Some(*acc)
5561            },
5562            reg,
5563        );
5564
5565        assert_eq!(p.run(&mut world, 10), Some(10));
5566        assert_eq!(p.run(&mut world, 20), Some(30));
5567        assert_eq!(p.run(&mut world, 5), Some(35));
5568    }
5569
5570    #[test]
5571    fn scan_named_fn_with_param() {
5572        let mut wb = WorldBuilder::new();
5573        wb.register::<u64>(100);
5574        let mut world = wb.build();
5575        let reg = world.registry();
5576
5577        fn threshold_scan(limit: Res<u64>, acc: &mut u64, val: u64) -> Option<u64> {
5578            *acc += val;
5579            if *acc > *limit { Some(*acc) } else { None }
5580        }
5581
5582        let mut p =
5583            PipelineBuilder::<u64>::new()
5584                .then(|x: u64| x, reg)
5585                .scan(0u64, threshold_scan, reg);
5586
5587        assert_eq!(p.run(&mut world, 50), None);
5588        assert_eq!(p.run(&mut world, 30), None);
5589        assert_eq!(p.run(&mut world, 25), Some(105));
5590    }
5591
5592    #[test]
5593    fn scan_opaque_closure() {
5594        let mut wb = WorldBuilder::new();
5595        wb.register::<u64>(10);
5596        let mut world = wb.build();
5597        let reg = world.registry();
5598
5599        let mut p = PipelineBuilder::<u64>::new().then(|x: u64| x, reg).scan(
5600            0u64,
5601            |world: &mut World, acc: &mut u64, val: u64| {
5602                let factor = *world.resource::<u64>();
5603                *acc += val * factor;
5604                Some(*acc)
5605            },
5606            reg,
5607        );
5608
5609        assert_eq!(p.run(&mut world, 1), Some(10));
5610        assert_eq!(p.run(&mut world, 2), Some(30));
5611    }
5612
5613    #[test]
5614    fn scan_suppression_returns_none() {
5615        let mut world = WorldBuilder::new().build();
5616        let reg = world.registry();
5617
5618        let mut p = PipelineBuilder::<u64>::new().then(|x: u64| x, reg).scan(
5619            0u64,
5620            |acc: &mut u64, val: u64| -> Option<u64> {
5621                *acc += val;
5622                if *acc > 50 { Some(*acc) } else { None }
5623            },
5624            reg,
5625        );
5626
5627        assert_eq!(p.run(&mut world, 20), None);
5628        assert_eq!(p.run(&mut world, 20), None);
5629        assert_eq!(p.run(&mut world, 20), Some(60));
5630    }
5631
5632    #[test]
5633    fn scan_on_pipeline_start() {
5634        let mut world = WorldBuilder::new().build();
5635        let reg = world.registry();
5636
5637        let mut p = PipelineBuilder::<u64>::new().scan(
5638            0u64,
5639            |acc: &mut u64, val: u64| {
5640                *acc += val;
5641                *acc
5642            },
5643            reg,
5644        );
5645
5646        assert_eq!(p.run(&mut world, 5), 5);
5647        assert_eq!(p.run(&mut world, 3), 8);
5648        assert_eq!(p.run(&mut world, 2), 10);
5649    }
5650
5651    #[test]
5652    fn scan_persistence_across_batch() {
5653        let mut wb = WorldBuilder::new();
5654        wb.register::<u64>(0);
5655        let mut world = wb.build();
5656        let reg = world.registry();
5657
5658        fn store(mut out: ResMut<u64>, val: u64) {
5659            *out = val;
5660        }
5661
5662        let mut p = PipelineBuilder::<u64>::new()
5663            .then(|x: u64| x, reg)
5664            .scan(
5665                0u64,
5666                |acc: &mut u64, val: u64| {
5667                    *acc += val;
5668                    *acc
5669                },
5670                reg,
5671            )
5672            .then(store, reg)
5673            .build_batch(4);
5674
5675        p.input_mut().extend([1, 2, 3]);
5676        p.run(&mut world);
5677
5678        // Accumulator persists: 1, 3, 6
5679        assert_eq!(*world.resource::<u64>(), 6);
5680
5681        p.input_mut().push(4);
5682        p.run(&mut world);
5683        // acc = 6 + 4 = 10
5684        assert_eq!(*world.resource::<u64>(), 10);
5685    }
5686
5687    // =========================================================================
5688    // Build — Option<()> terminal
5689    // =========================================================================
5690
5691    #[test]
5692    fn build_option_unit_terminal() {
5693        let mut wb = WorldBuilder::new();
5694        wb.register::<u64>(0);
5695        let mut world = wb.build();
5696        let r = world.registry_mut();
5697
5698        fn check(x: u32) -> Option<u32> {
5699            if x > 5 { Some(x) } else { None }
5700        }
5701        fn store(mut out: ResMut<u64>, val: u32) {
5702            *out += val as u64;
5703        }
5704
5705        // .map(store) on Option<u32> produces Option<()> — build() must work
5706        let mut p = PipelineBuilder::<u32>::new()
5707            .then(check, r)
5708            .map(store, r)
5709            .build();
5710
5711        p.run(&mut world, 3); // None, skipped
5712        assert_eq!(*world.resource::<u64>(), 0);
5713        p.run(&mut world, 7); // Some, stores
5714        assert_eq!(*world.resource::<u64>(), 7);
5715        p.run(&mut world, 10);
5716        assert_eq!(*world.resource::<u64>(), 17);
5717    }
5718
5719    #[test]
5720    fn build_option_unit_boxes_into_handler() {
5721        let mut wb = WorldBuilder::new();
5722        wb.register::<u64>(0);
5723        let mut world = wb.build();
5724        let r = world.registry_mut();
5725
5726        fn double(x: u32) -> Option<u64> {
5727            if x > 0 { Some(x as u64 * 2) } else { None }
5728        }
5729        fn store(mut out: ResMut<u64>, val: u64) {
5730            *out += val;
5731        }
5732
5733        let mut h: Box<dyn Handler<u32>> = Box::new(
5734            PipelineBuilder::<u32>::new()
5735                .then(double, r)
5736                .map(store, r)
5737                .build(),
5738        );
5739        h.run(&mut world, 0); // None
5740        assert_eq!(*world.resource::<u64>(), 0);
5741        h.run(&mut world, 5); // 10
5742        assert_eq!(*world.resource::<u64>(), 10);
5743    }
5744
5745    // =========================================================================
5746    // Build — borrowed event type
5747    // =========================================================================
5748
5749    #[test]
5750    fn build_borrowed_event_direct() {
5751        let mut wb = WorldBuilder::new();
5752        wb.register::<u64>(0);
5753        let mut world = wb.build();
5754
5755        fn decode(msg: &[u8]) -> u64 {
5756            msg.len() as u64
5757        }
5758        fn store(mut out: ResMut<u64>, val: u64) {
5759            *out = val;
5760        }
5761
5762        // msg declared before p so it outlives the pipeline (drop order).
5763        // Matches real-world usage: pipeline lives long, events come and go.
5764        let msg = vec![1u8, 2, 3];
5765        let r = world.registry_mut();
5766        let mut p = PipelineBuilder::<&[u8]>::new()
5767            .then(decode, r)
5768            .then(store, r)
5769            .build();
5770
5771        p.run(&mut world, &msg);
5772        assert_eq!(*world.resource::<u64>(), 3);
5773    }
5774
5775    #[test]
5776    fn build_borrowed_event_option_unit() {
5777        let mut wb = WorldBuilder::new();
5778        wb.register::<u64>(0);
5779        let mut world = wb.build();
5780
5781        fn decode(msg: &[u8]) -> Option<u64> {
5782            if msg.is_empty() {
5783                None
5784            } else {
5785                Some(msg.len() as u64)
5786            }
5787        }
5788        fn store(mut out: ResMut<u64>, val: u64) {
5789            *out = val;
5790        }
5791
5792        let empty = vec![];
5793        let data = vec![1u8, 2, 3];
5794        let r = world.registry_mut();
5795        let mut p = PipelineBuilder::<&[u8]>::new()
5796            .then(decode, r)
5797            .map(store, r)
5798            .build();
5799
5800        p.run(&mut world, &empty); // None
5801        assert_eq!(*world.resource::<u64>(), 0);
5802        p.run(&mut world, &data); // Some(3)
5803        assert_eq!(*world.resource::<u64>(), 3);
5804    }
5805
5806    // =========================================================================
5807    // NoEvent — pipeline steps with In = () that omit the input parameter
5808    // =========================================================================
5809
5810    #[test]
5811    fn no_event_step_arity_0() {
5812        let mut world = WorldBuilder::new().build();
5813        let r = world.registry_mut();
5814        let mut p = PipelineBuilder::<()>::new().then(|| 42u64, r);
5815        assert_eq!(p.run(&mut world, ()), 42);
5816    }
5817
5818    #[test]
5819    fn no_event_step_arity_1() {
5820        use crate::no_event;
5821
5822        let mut wb = WorldBuilder::new();
5823        wb.register::<u32>(10);
5824        let mut world = wb.build();
5825
5826        fn read_config(config: Res<u32>) -> u64 {
5827            *config as u64
5828        }
5829
5830        let r = world.registry_mut();
5831        let mut p = PipelineBuilder::<()>::new().then(no_event(read_config), r);
5832        assert_eq!(p.run(&mut world, ()), 10);
5833    }
5834
5835    #[test]
5836    fn no_event_step_chained() {
5837        use crate::no_event;
5838
5839        let mut wb = WorldBuilder::new();
5840        wb.register::<u32>(5);
5841        let mut world = wb.build();
5842
5843        fn read_val(val: Res<u32>) -> u64 {
5844            *val as u64
5845        }
5846
5847        let r = world.registry_mut();
5848        let mut p = PipelineBuilder::<()>::new()
5849            .then(no_event(read_val), r)
5850            .then(|x: u64| x * 2, r);
5851        assert_eq!(p.run(&mut world, ()), 10);
5852    }
5853
5854    #[test]
5855    fn no_event_step_as_handler() {
5856        use crate::no_event;
5857
5858        let mut wb = WorldBuilder::new();
5859        wb.register::<u64>(0);
5860        let mut world = wb.build();
5861
5862        fn write_val(mut out: ResMut<u64>) {
5863            *out += 1;
5864        }
5865
5866        let r = world.registry_mut();
5867        let mut p = PipelineBuilder::<()>::new()
5868            .then(no_event(write_val), r)
5869            .build();
5870
5871        p.run(&mut world, ());
5872        p.run(&mut world, ());
5873        assert_eq!(*world.resource::<u64>(), 2);
5874    }
5875}