Skip to main content

nexus_rt/
pipeline.rs

1// Builder return types are necessarily complex — each combinator returns
2// PipelineChain<In, Out, NodeType<Chain, ...>>. Same pattern as iterator adapters.
3#![allow(clippy::type_complexity)]
4
5//! Pre-resolved pipeline dispatch using [`Param`] steps.
6//!
7//! [`PipelineBuilder`] begins a typed composition chain where each step
8//! is a named function with [`Param`] dependencies resolved at build
9//! time. The result is a monomorphized chain of named node types where dispatch-time
10//! resource access is a single pointer deref per fetch — zero framework overhead.
11//! [`ResourceId`](crate::ResourceId) is a direct pointer, not a HashMap lookup.
12//!
13//! Two dispatch tiers in nexus-rt:
14//! 1. **Pipeline** — static after build, pre-resolved, the workhorse
15//! 2. **Callback** — dynamic registration with per-instance context
16//!
17//! # Step function convention
18//!
19//! Params first, step input last, returns output:
20//!
21//! ```ignore
22//! fn validate(config: Res<Config>, order: Order) -> Option<ValidOrder> { .. }
23//! fn enrich(cache: Res<MarketData>, order: ValidOrder) -> EnrichedOrder { .. }
24//! fn submit(mut gw: ResMut<Gateway>, order: CheckedOrder) { gw.send(order); }
25//! ```
26//!
27//! # Combinator split
28//!
29//! **IntoStep-based (pre-resolved, hot path):**
30//! `.then()`, `.map()`, `.and_then()`, `.catch()`
31//!
32//! **Trait-based (same API for named functions, arity-0 closures, and [`Opaque`] closures):**
33//! `.guard()`, `.filter()`, `.tap()`, `.inspect()`, `.inspect_err()`,
34//! `.on_none()`, `.ok_or_else()`, `.unwrap_or_else()`, `.map_err()`,
35//! `.or_else()`, `.and()`, `.or()`, `.xor()`, `.route()`
36//!
37//! # Combinator quick reference
38//!
39//! **Bare value `T`:** `.then()`, `.tap()`, `.guard()` (→ `Option<T>`),
40//! `.dispatch()`, `.route()`, `.tee()`, `.scan()`, `.dedup()` (→ `Option<T>`)
41//!
42//! **Tuple `(A, B, ...)` (2-5 elements):** `.splat()` (→ splat builder,
43//! call `.then()` with destructured args)
44//!
45//! **`Option<T>`:** `.map()`, `.filter()`, `.inspect()`, `.and_then()`,
46//! `.on_none()`, `.ok_or()` (→ `Result`), `.unwrap_or()` (→ `T`),
47//! `.cloned()` (→ `Option<T>` from `Option<&T>`)
48//!
49//! **`Result<T, E>`:** `.map()`, `.and_then()`, `.catch()` (→ `Option<T>`),
50//! `.map_err()`, `.inspect_err()`, `.ok()` (→ `Option<T>`),
51//! `.unwrap_or()` (→ `T`), `.or_else()`
52//!
53//! **`bool`:** `.not()`, `.and()`, `.or()`, `.xor()`
54//!
55//! **Terminal:** `.build()` (→ `Pipeline`), `.build_batch(cap)`
56//! (→ `BatchPipeline<In>`)
57//!
58//! # Splat — tuple destructuring
59//!
60//! Pipeline steps follow a single-value-in, single-value-out convention.
61//! When a step returns a tuple like `(OrderId, f64)`, the next step
62//! must accept the whole tuple as one argument. `.splat()` destructures
63//! the tuple so the next step receives individual arguments instead:
64//!
65//! ```ignore
66//! // Without splat — next step takes the whole tuple:
67//! fn process(pair: (OrderId, f64)) -> bool { .. }
68//!
69//! // With splat — next step takes individual args:
70//! fn process(id: OrderId, price: f64) -> bool { .. }
71//!
72//! PipelineBuilder::<Order>::new()
73//!     .then(extract, reg)   // Order → (OrderId, f64)
74//!     .splat()              // destructure
75//!     .then(process, reg)   // (OrderId, f64) → bool
76//!     .build();
77//! ```
78//!
79//! Supported for tuples of 2-5 elements. Beyond 5, define a named
80//! struct — if a combinator stage needs that many arguments, a struct
81//! makes the intent clearer and the code more maintainable.
82//!
83//! # Returning pipelines from functions (Rust 2024)
84//!
85//! When a factory function takes `&Registry` and returns `impl Handler<E>`,
86//! Rust 2024 captures the registry borrow in the return type by default.
87//! Use `+ use<...>` to exclude it:
88//!
89//! ```ignore
90//! fn on_order<C: Config>(
91//!     reg: &Registry,
92//! ) -> impl Handler<Order> + use<C> {
93//!     PipelineBuilder::<Order>::new()
94//!         .then(validate::<C>, reg)
95//!         .dispatch(submit::<C>.into_handler(reg))
96//!         .build()
97//! }
98//! ```
99//!
//! List every type parameter the pipeline captures; omit the `&Registry`
//! lifetime — the registry is only borrowed while the steps are being
//! resolved, and the built pipeline does not hold on to it. See the
//! [crate-level docs](crate#returning-impl-handler-from-functions-rust-2024)
//! for the full explanation.
104
105use std::marker::PhantomData;
106
107use crate::Handler;
108use crate::dag::DagArm;
109use crate::handler::{Opaque, Param};
110use crate::world::{Registry, World};
111
112// =============================================================================
113// Step — pre-resolved step with Param state
114// =============================================================================
115
116/// Internal: pre-resolved step with cached Param state.
117///
118/// Users don't construct this directly — it's produced by [`IntoStep`] and
119/// stored inside named chain node types.
120#[doc(hidden)]
121pub struct Step<F, Params: Param> {
122    f: F,
123    state: Params::State,
124    #[allow(dead_code)]
125    name: &'static str,
126}
127
128// =============================================================================
129// StepCall — callable trait for resolved steps
130// =============================================================================
131
/// Internal: call interface of a resolved step that takes its input by value.
///
/// Used as a bound on [`IntoStep::Step`]. Users don't implement this.
#[doc(hidden)]
pub trait StepCall<In> {
    /// The value produced by one invocation of this step.
    type Out;
    /// Run the step once.
    ///
    /// `world` is handed to the step so it can fetch its resources;
    /// `input` is the pipeline value and is consumed by the call.
    fn call(&mut self, world: &mut World, input: In) -> Self::Out;
}
142
143// =============================================================================
144// IntoStep — converts a named function into a resolved step
145// =============================================================================
146
/// Converts a function into a pre-resolved pipeline step taking input by value.
///
/// Params first, step input last, returns output. The `Params` type
/// parameter selects one of three shapes:
///
/// - `()` — `FnMut(In) -> Out`. No Params, so closures are supported.
/// - a tuple of [`Param`]s — named functions only (same HRTB+GAT
///   limitation as [`IntoHandler`](crate::IntoHandler)).
/// - [`Opaque`] — `FnMut(&mut World, In) -> Out`; nothing is resolved up
///   front, the function receives the whole [`World`].
///
/// # Examples
///
/// ```ignore
/// // Arity 0 — closure works
/// let step = (|x: u32| x * 2).into_step(registry);
///
/// // Arity 1 — named function required
/// fn validate(config: Res<Config>, order: Order) -> Option<ValidOrder> { .. }
/// let step = validate.into_step(registry);
/// ```
#[diagnostic::on_unimplemented(
    message = "this function cannot be used as a pipeline step for this input type",
    note = "if the pipeline output is `Option<T>`, use `.map()` to operate on the inner `T`",
    note = "if the pipeline output is `Result<T, E>`, use `.map()` for `Ok` or `.catch()` for `Err`",
    note = "if using a closure with resource params (Res<T>, ResMut<T>), that isn't supported — use a named `fn`"
)]
pub trait IntoStep<In, Out, Params> {
    /// The concrete resolved step type; callable through [`StepCall`].
    type Step: StepCall<In, Out = Out>;

    /// Resolve Param state from the registry and produce a step.
    ///
    /// Consumes `self`; the registry is only borrowed for the duration
    /// of the call.
    fn into_step(self, registry: &Registry) -> Self::Step;
}
176
177// =============================================================================
178// Arity 0 — fn(In) -> Out — closures work (no HRTB+GAT issues)
179// =============================================================================
180
181impl<In, Out, F: FnMut(In) -> Out + 'static> StepCall<In> for Step<F, ()> {
182    type Out = Out;
183    #[inline(always)]
184    fn call(&mut self, _world: &mut World, input: In) -> Out {
185        (self.f)(input)
186    }
187}
188
189impl<In, Out, F: FnMut(In) -> Out + 'static> IntoStep<In, Out, ()> for F {
190    type Step = Step<F, ()>;
191
192    fn into_step(self, registry: &Registry) -> Self::Step {
193        Step {
194            f: self,
195            state: <() as Param>::init(registry),
196            name: std::any::type_name::<F>(),
197        }
198    }
199}
200
201// =============================================================================
202// Arities 1-8 via macro — HRTB with -> Out
203// =============================================================================
204
205macro_rules! impl_into_step {
206    ($($P:ident),+) => {
207        impl<In, Out, F: 'static, $($P: Param + 'static),+>
208            StepCall<In> for Step<F, ($($P,)+)>
209        where
210            for<'a> &'a mut F:
211                FnMut($($P,)+ In) -> Out +
212                FnMut($($P::Item<'a>,)+ In) -> Out,
213        {
214            type Out = Out;
215            #[inline(always)]
216            #[allow(non_snake_case)]
217            fn call(&mut self, world: &mut World, input: In) -> Out {
218                #[allow(clippy::too_many_arguments)]
219                fn call_inner<$($P,)+ Input, Output>(
220                    mut f: impl FnMut($($P,)+ Input) -> Output,
221                    $($P: $P,)+
222                    input: Input,
223                ) -> Output {
224                    f($($P,)+ input)
225                }
226
227                // SAFETY: state was produced by Param::init() on the same Registry
228                // that built this World. Borrows are disjoint — enforced by
229                // conflict detection at build time.
230                #[cfg(debug_assertions)]
231                world.clear_borrows();
232                let ($($P,)+) = unsafe {
233                    <($($P,)+) as Param>::fetch(world, &mut self.state)
234                };
235                call_inner(&mut self.f, $($P,)+ input)
236            }
237        }
238
239        impl<In, Out, F: 'static, $($P: Param + 'static),+>
240            IntoStep<In, Out, ($($P,)+)> for F
241        where
242            for<'a> &'a mut F:
243                FnMut($($P,)+ In) -> Out +
244                FnMut($($P::Item<'a>,)+ In) -> Out,
245        {
246            type Step = Step<F, ($($P,)+)>;
247
248            fn into_step(self, registry: &Registry) -> Self::Step {
249                let state = <($($P,)+) as Param>::init(registry);
250                {
251                    #[allow(non_snake_case)]
252                    let ($($P,)+) = &state;
253                    registry.check_access(&[
254                        $(
255                            (<$P as Param>::resource_id($P),
256                             std::any::type_name::<$P>()),
257                        )+
258                    ]);
259                }
260                Step { f: self, state, name: std::any::type_name::<F>() }
261            }
262        }
263    };
264}
265
266all_tuples!(impl_into_step);
267
268// =============================================================================
269// OpaqueStep — wrapper for opaque closures as steps
270// =============================================================================
271
272/// Internal: wrapper for opaque closures used as pipeline steps.
273///
274/// Unlike [`Step<F, P>`] which stores resolved `Param::State`, this
275/// holds only the function — no state to resolve.
276#[doc(hidden)]
277pub struct OpaqueStep<F> {
278    f: F,
279    #[allow(dead_code)]
280    name: &'static str,
281}
282
283impl<In, Out, F: FnMut(&mut World, In) -> Out + 'static> StepCall<In> for OpaqueStep<F> {
284    type Out = Out;
285    #[inline(always)]
286    fn call(&mut self, world: &mut World, input: In) -> Out {
287        (self.f)(world, input)
288    }
289}
290
291impl<In, Out, F: FnMut(&mut World, In) -> Out + 'static> IntoStep<In, Out, Opaque> for F {
292    type Step = OpaqueStep<F>;
293
294    fn into_step(self, _registry: &Registry) -> Self::Step {
295        OpaqueStep {
296            f: self,
297            name: std::any::type_name::<F>(),
298        }
299    }
300}
301
302// =============================================================================
303// RefStepCall / IntoRefStep — step taking &In, returning Out
304// =============================================================================
305
/// Internal: call interface of a resolved step that takes its input by reference.
///
/// Used by combinators like `tap`, `guard`, `filter`, `inspect` that
/// observe the value without consuming it.
#[doc(hidden)]
pub trait RefStepCall<In> {
    /// The value produced by one invocation of this step.
    type Out;
    /// Run the step once.
    ///
    /// `world` is handed to the step so it can fetch its resources;
    /// `input` is only borrowed, so the caller keeps the value.
    fn call(&mut self, world: &mut World, input: &In) -> Self::Out;
}
317
/// Converts a function into a pre-resolved step taking input by reference.
///
/// Same three-tier resolution as [`IntoStep`], selected by the `Params`
/// type parameter:
///
/// | Params | Function shape | Example |
/// |--------|---------------|---------|
/// | `()` | `FnMut(&In) -> Out` | `\|o: &Order\| o.price > 0.0` |
/// | `(P0,)...(P0..P7,)` | `fn(Params..., &In) -> Out` | `fn check(c: Res<Config>, o: &Order) -> bool` |
/// | [`Opaque`] | `FnMut(&mut World, &In) -> Out` | `\|w: &mut World, o: &Order\| { ... }` |
#[diagnostic::on_unimplemented(
    message = "this function cannot be used as a reference step for this input type",
    note = "reference steps (guard, filter, tap, inspect) take `&In`, not `In`",
    note = "if the pipeline output is `Option<T>`, `.filter()` operates on `&T` inside the Option",
    note = "closures with resource parameters are not supported — use a named `fn`"
)]
pub trait IntoRefStep<In, Out, Params> {
    /// The concrete resolved step type; callable through [`RefStepCall`].
    type Step: RefStepCall<In, Out = Out>;

    /// Resolve Param state from the registry and produce a step.
    ///
    /// Consumes `self`; the registry is only borrowed for the duration
    /// of the call.
    fn into_ref_step(self, registry: &Registry) -> Self::Step;
}
340
341// -- Arity 0: FnMut(&In) -> Out — closures work ----------------------------
342
343impl<In, Out, F: FnMut(&In) -> Out + 'static> RefStepCall<In> for Step<F, ()> {
344    type Out = Out;
345    #[inline(always)]
346    fn call(&mut self, _world: &mut World, input: &In) -> Out {
347        (self.f)(input)
348    }
349}
350
351impl<In, Out, F: FnMut(&In) -> Out + 'static> IntoRefStep<In, Out, ()> for F {
352    type Step = Step<F, ()>;
353
354    fn into_ref_step(self, registry: &Registry) -> Self::Step {
355        Step {
356            f: self,
357            state: <() as Param>::init(registry),
358            name: std::any::type_name::<F>(),
359        }
360    }
361}
362
363// -- Arities 1-8: named functions with Param resolution ---------------------
364
365macro_rules! impl_into_ref_step {
366    ($($P:ident),+) => {
367        impl<In, Out, F: 'static, $($P: Param + 'static),+>
368            RefStepCall<In> for Step<F, ($($P,)+)>
369        where
370            for<'a> &'a mut F:
371                FnMut($($P,)+ &In) -> Out +
372                FnMut($($P::Item<'a>,)+ &In) -> Out,
373        {
374            type Out = Out;
375            #[inline(always)]
376            #[allow(non_snake_case)]
377            fn call(&mut self, world: &mut World, input: &In) -> Out {
378                #[allow(clippy::too_many_arguments)]
379                fn call_inner<$($P,)+ Input: ?Sized, Output>(
380                    mut f: impl FnMut($($P,)+ &Input) -> Output,
381                    $($P: $P,)+
382                    input: &Input,
383                ) -> Output {
384                    f($($P,)+ input)
385                }
386
387                // SAFETY: state was produced by Param::init() on the same Registry
388                // that built this World. Borrows are disjoint — enforced by
389                // conflict detection at build time.
390                #[cfg(debug_assertions)]
391                world.clear_borrows();
392                let ($($P,)+) = unsafe {
393                    <($($P,)+) as Param>::fetch(world, &mut self.state)
394                };
395                call_inner(&mut self.f, $($P,)+ input)
396            }
397        }
398
399        impl<In, Out, F: 'static, $($P: Param + 'static),+>
400            IntoRefStep<In, Out, ($($P,)+)> for F
401        where
402            for<'a> &'a mut F:
403                FnMut($($P,)+ &In) -> Out +
404                FnMut($($P::Item<'a>,)+ &In) -> Out,
405        {
406            type Step = Step<F, ($($P,)+)>;
407
408            fn into_ref_step(self, registry: &Registry) -> Self::Step {
409                let state = <($($P,)+) as Param>::init(registry);
410                {
411                    #[allow(non_snake_case)]
412                    let ($($P,)+) = &state;
413                    registry.check_access(&[
414                        $(
415                            (<$P as Param>::resource_id($P),
416                             std::any::type_name::<$P>()),
417                        )+
418                    ]);
419                }
420                Step { f: self, state, name: std::any::type_name::<F>() }
421            }
422        }
423    };
424}
425
426all_tuples!(impl_into_ref_step);
427
428// -- Opaque: FnMut(&mut World, &In) -> Out ---------------------------------
429
430/// Internal: wrapper for opaque closures taking input by reference.
431#[doc(hidden)]
432pub struct OpaqueRefStep<F> {
433    f: F,
434    #[allow(dead_code)]
435    name: &'static str,
436}
437
438impl<In, Out, F: FnMut(&mut World, &In) -> Out + 'static> RefStepCall<In> for OpaqueRefStep<F> {
439    type Out = Out;
440    #[inline(always)]
441    fn call(&mut self, world: &mut World, input: &In) -> Out {
442        (self.f)(world, input)
443    }
444}
445
446impl<In, Out, F: FnMut(&mut World, &In) -> Out + 'static> IntoRefStep<In, Out, Opaque> for F {
447    type Step = OpaqueRefStep<F>;
448
449    fn into_ref_step(self, _registry: &Registry) -> Self::Step {
450        OpaqueRefStep {
451            f: self,
452            name: std::any::type_name::<F>(),
453        }
454    }
455}
456
457// =============================================================================
458// resolve_ref_step — pre-resolve a ref step for manual dispatch
459// =============================================================================
460
461/// Resolve a reference step for manual dispatch.
462///
463/// Returns a closure with pre-resolved [`Param`] state. Reference-input
464/// counterpart of [`resolve_step`].
465pub fn resolve_ref_step<In, Out, Params, S: IntoRefStep<In, Out, Params>>(
466    f: S,
467    registry: &Registry,
468) -> impl FnMut(&mut World, &In) -> Out + use<In, Out, Params, S> {
469    let mut resolved = f.into_ref_step(registry);
470    move |world: &mut World, input: &In| resolved.call(world, input)
471}
472
473// =============================================================================
474// ProducerCall / IntoProducer — step producing a value with no pipeline input
475// =============================================================================
476
/// Internal: call interface of a resolved step that produces a value
/// without receiving pipeline input.
///
/// Used by combinators like `and`, `or`, `xor`, `on_none`, `ok_or_else`,
/// `unwrap_or_else` (Option).
#[doc(hidden)]
pub trait ProducerCall {
    /// The value produced by one invocation of this producer.
    type Out;
    /// Run the producer once.
    ///
    /// `world` is handed to the producer so it can fetch its resources;
    /// there is no input argument.
    fn call(&mut self, world: &mut World) -> Self::Out;
}
489
/// Converts a function into a pre-resolved producer step.
///
/// Same three-tier resolution as [`IntoStep`], selected by the `Params`
/// type parameter:
///
/// | Params | Function shape | Example |
/// |--------|---------------|---------|
/// | `()` | `FnMut() -> Out` | `\|\| true` |
/// | `(P0,)...(P0..P7,)` | `fn(Params...) -> Out` | `fn is_active(s: Res<State>) -> bool` |
/// | [`Opaque`] | `FnMut(&mut World) -> Out` | `\|w: &mut World\| { ... }` |
#[diagnostic::on_unimplemented(
    message = "this function cannot be used as a producer for this output type",
    note = "producers take no pipeline input — they produce a value (e.g., default, fallback)",
    note = "closures with resource parameters are not supported — use a named `fn`"
)]
pub trait IntoProducer<Out, Params> {
    /// The concrete resolved producer type; callable through [`ProducerCall`].
    type Step: ProducerCall<Out = Out>;

    /// Resolve Param state from the registry and produce a step.
    ///
    /// Consumes `self`; the registry is only borrowed for the duration
    /// of the call.
    fn into_producer(self, registry: &Registry) -> Self::Step;
}
511
512// -- Arity 0: FnMut() -> Out — closures work --------------------------------
513
514impl<Out, F: FnMut() -> Out + 'static> ProducerCall for Step<F, ()> {
515    type Out = Out;
516    #[inline(always)]
517    fn call(&mut self, _world: &mut World) -> Out {
518        (self.f)()
519    }
520}
521
522impl<Out, F: FnMut() -> Out + 'static> IntoProducer<Out, ()> for F {
523    type Step = Step<F, ()>;
524
525    fn into_producer(self, registry: &Registry) -> Self::Step {
526        Step {
527            f: self,
528            state: <() as Param>::init(registry),
529            name: std::any::type_name::<F>(),
530        }
531    }
532}
533
534// -- Arities 1-8: named functions with Param resolution ---------------------
535
536macro_rules! impl_into_producer {
537    ($($P:ident),+) => {
538        impl<Out, F: 'static, $($P: Param + 'static),+>
539            ProducerCall for Step<F, ($($P,)+)>
540        where
541            for<'a> &'a mut F:
542                FnMut($($P,)+) -> Out +
543                FnMut($($P::Item<'a>,)+) -> Out,
544        {
545            type Out = Out;
546            #[inline(always)]
547            #[allow(non_snake_case)]
548            fn call(&mut self, world: &mut World) -> Out {
549                #[allow(clippy::too_many_arguments)]
550                fn call_inner<$($P,)+ Output>(
551                    mut f: impl FnMut($($P,)+) -> Output,
552                    $($P: $P,)+
553                ) -> Output {
554                    f($($P,)+)
555                }
556
557                // SAFETY: state was produced by Param::init() on the same Registry
558                // that built this World. Borrows are disjoint — enforced by
559                // conflict detection at build time.
560                #[cfg(debug_assertions)]
561                world.clear_borrows();
562                let ($($P,)+) = unsafe {
563                    <($($P,)+) as Param>::fetch(world, &mut self.state)
564                };
565                call_inner(&mut self.f, $($P,)+)
566            }
567        }
568
569        impl<Out, F: 'static, $($P: Param + 'static),+>
570            IntoProducer<Out, ($($P,)+)> for F
571        where
572            for<'a> &'a mut F:
573                FnMut($($P,)+) -> Out +
574                FnMut($($P::Item<'a>,)+) -> Out,
575        {
576            type Step = Step<F, ($($P,)+)>;
577
578            fn into_producer(self, registry: &Registry) -> Self::Step {
579                let state = <($($P,)+) as Param>::init(registry);
580                {
581                    #[allow(non_snake_case)]
582                    let ($($P,)+) = &state;
583                    registry.check_access(&[
584                        $(
585                            (<$P as Param>::resource_id($P),
586                             std::any::type_name::<$P>()),
587                        )+
588                    ]);
589                }
590                Step { f: self, state, name: std::any::type_name::<F>() }
591            }
592        }
593    };
594}
595
596all_tuples!(impl_into_producer);
597
598// -- Opaque: FnMut(&mut World) -> Out ---------------------------------------
599
600/// Internal: wrapper for opaque closures used as producers.
601#[doc(hidden)]
602pub struct OpaqueProducer<F> {
603    f: F,
604    #[allow(dead_code)]
605    name: &'static str,
606}
607
608impl<Out, F: FnMut(&mut World) -> Out + 'static> ProducerCall for OpaqueProducer<F> {
609    type Out = Out;
610    #[inline(always)]
611    fn call(&mut self, world: &mut World) -> Out {
612        (self.f)(world)
613    }
614}
615
616impl<Out, F: FnMut(&mut World) -> Out + 'static> IntoProducer<Out, Opaque> for F {
617    type Step = OpaqueProducer<F>;
618
619    fn into_producer(self, _registry: &Registry) -> Self::Step {
620        OpaqueProducer {
621            f: self,
622            name: std::any::type_name::<F>(),
623        }
624    }
625}
626
627// =============================================================================
628// resolve_producer — pre-resolve a producer for manual dispatch
629// =============================================================================
630
631/// Resolve a producer for manual dispatch.
632///
633/// Returns a closure with pre-resolved [`Param`] state. No-input
634/// counterpart of [`resolve_step`].
635pub fn resolve_producer<Out, Params, S: IntoProducer<Out, Params>>(
636    f: S,
637    registry: &Registry,
638) -> impl FnMut(&mut World) -> Out + use<Out, Params, S> {
639    let mut resolved = f.into_producer(registry);
640    move |world: &mut World| resolved.call(world)
641}
642
643// =============================================================================
644// ScanStepCall / IntoScanStep — step with persistent accumulator
645// =============================================================================
646
/// Internal: call interface of a resolved scan step.
///
/// Like [`StepCall`] but with an additional `&mut Acc` accumulator
/// argument that persists across invocations.
#[doc(hidden)]
pub trait ScanStepCall<Acc, In> {
    /// The value produced by one invocation of this scan step.
    type Out;
    /// Run the scan step once.
    ///
    /// `world` is handed to the step so it can fetch its resources, `acc`
    /// is the caller-owned accumulator the step may update, and `input`
    /// is the pipeline value, consumed by the call.
    fn call(&mut self, world: &mut World, acc: &mut Acc, input: In) -> Self::Out;
}
658
659/// Converts a function into a pre-resolved scan step with persistent state.
660///
661/// Same three-tier resolution as [`IntoStep`]:
662///
663/// | Params | Function shape | Example |
664/// |--------|---------------|---------|
665/// | `()` | `FnMut(&mut Acc, In) -> Out` | `\|acc, trade\| { *acc += trade.amount; Some(*acc) }` |
666/// | `(P0,)...(P0..P7,)` | `fn(Params..., &mut Acc, In) -> Out` | `fn vwap(c: Res<Config>, acc: &mut State, t: Trade) -> Option<f64>` |
667/// | [`Opaque`] | `FnMut(&mut World, &mut Acc, In) -> Out` | `\|w: &mut World, acc: &mut u64, t: Trade\| { ... }` |
668#[diagnostic::on_unimplemented(
669    message = "this function cannot be used as a scan step",
670    note = "scan steps take `&mut Accumulator` as first param, then resources, then input last",
671    note = "closures with resource parameters are not supported — use a named `fn`"
672)]
673pub trait IntoScanStep<Acc, In, Out, Params> {
674    /// The concrete resolved scan step type.
675    type Step: ScanStepCall<Acc, In, Out = Out>;
676
677    /// Resolve Param state from the registry and produce a scan step.
678    fn into_scan_step(self, registry: &Registry) -> Self::Step;
679}
680
681// -- Arity 0: FnMut(&mut Acc, In) -> Out — closures work --------------------
682
683impl<Acc, In, Out, F: FnMut(&mut Acc, In) -> Out + 'static> ScanStepCall<Acc, In> for Step<F, ()> {
684    type Out = Out;
685    #[inline(always)]
686    fn call(&mut self, _world: &mut World, acc: &mut Acc, input: In) -> Out {
687        (self.f)(acc, input)
688    }
689}
690
691impl<Acc, In, Out, F: FnMut(&mut Acc, In) -> Out + 'static> IntoScanStep<Acc, In, Out, ()> for F {
692    type Step = Step<F, ()>;
693
694    fn into_scan_step(self, registry: &Registry) -> Self::Step {
695        Step {
696            f: self,
697            state: <() as Param>::init(registry),
698            name: std::any::type_name::<F>(),
699        }
700    }
701}
702
703// -- Arities 1-8: named functions with Param resolution ----------------------
704
705macro_rules! impl_into_scan_step {
706    ($($P:ident),+) => {
707        impl<Acc, In, Out, F: 'static, $($P: Param + 'static),+>
708            ScanStepCall<Acc, In> for Step<F, ($($P,)+)>
709        where
710            for<'a> &'a mut F:
711                FnMut($($P,)+ &mut Acc, In) -> Out +
712                FnMut($($P::Item<'a>,)+ &mut Acc, In) -> Out,
713        {
714            type Out = Out;
715            #[inline(always)]
716            #[allow(non_snake_case)]
717            fn call(&mut self, world: &mut World, acc: &mut Acc, input: In) -> Out {
718                #[allow(clippy::too_many_arguments)]
719                fn call_inner<$($P,)+ Accumulator, Input, Output>(
720                    mut f: impl FnMut($($P,)+ &mut Accumulator, Input) -> Output,
721                    $($P: $P,)+
722                    acc: &mut Accumulator,
723                    input: Input,
724                ) -> Output {
725                    f($($P,)+ acc, input)
726                }
727
728                // SAFETY: state was produced by Param::init() on the same Registry
729                // that built this World. Borrows are disjoint — enforced by
730                // conflict detection at build time.
731                #[cfg(debug_assertions)]
732                world.clear_borrows();
733                let ($($P,)+) = unsafe {
734                    <($($P,)+) as Param>::fetch(world, &mut self.state)
735                };
736                call_inner(&mut self.f, $($P,)+ acc, input)
737            }
738        }
739
740        impl<Acc, In, Out, F: 'static, $($P: Param + 'static),+>
741            IntoScanStep<Acc, In, Out, ($($P,)+)> for F
742        where
743            for<'a> &'a mut F:
744                FnMut($($P,)+ &mut Acc, In) -> Out +
745                FnMut($($P::Item<'a>,)+ &mut Acc, In) -> Out,
746        {
747            type Step = Step<F, ($($P,)+)>;
748
749            fn into_scan_step(self, registry: &Registry) -> Self::Step {
750                let state = <($($P,)+) as Param>::init(registry);
751                {
752                    #[allow(non_snake_case)]
753                    let ($($P,)+) = &state;
754                    registry.check_access(&[
755                        $(
756                            (<$P as Param>::resource_id($P),
757                             std::any::type_name::<$P>()),
758                        )+
759                    ]);
760                }
761                Step { f: self, state, name: std::any::type_name::<F>() }
762            }
763        }
764    };
765}
766
767all_tuples!(impl_into_scan_step);
768
769// -- Opaque: FnMut(&mut World, &mut Acc, In) -> Out --------------------------
770
771/// Internal: wrapper for opaque closures used as scan steps.
772#[doc(hidden)]
773pub struct OpaqueScanStep<F> {
774    f: F,
775    #[allow(dead_code)]
776    name: &'static str,
777}
778
779impl<Acc, In, Out, F: FnMut(&mut World, &mut Acc, In) -> Out + 'static> ScanStepCall<Acc, In>
780    for OpaqueScanStep<F>
781{
782    type Out = Out;
783    #[inline(always)]
784    fn call(&mut self, world: &mut World, acc: &mut Acc, input: In) -> Out {
785        (self.f)(world, acc, input)
786    }
787}
788
789impl<Acc, In, Out, F: FnMut(&mut World, &mut Acc, In) -> Out + 'static>
790    IntoScanStep<Acc, In, Out, Opaque> for F
791{
792    type Step = OpaqueScanStep<F>;
793
794    fn into_scan_step(self, _registry: &Registry) -> Self::Step {
795        OpaqueScanStep {
796            f: self,
797            name: std::any::type_name::<F>(),
798        }
799    }
800}
801
802// =============================================================================
803// resolve_scan_step — pre-resolve a scan step for manual dispatch
804// =============================================================================
805
806/// Resolve a scan step for manual dispatch.
807///
808/// Returns a closure with pre-resolved [`Param`] state. Scan variant
809/// of [`resolve_step`] with an additional `&mut Acc` accumulator.
810pub fn resolve_scan_step<Acc, In, Out, Params, S: IntoScanStep<Acc, In, Out, Params>>(
811    f: S,
812    registry: &Registry,
813) -> impl FnMut(&mut World, &mut Acc, In) -> Out + use<Acc, In, Out, Params, S> {
814    let mut resolved = f.into_scan_step(registry);
815    move |world: &mut World, acc: &mut Acc, input: In| resolved.call(world, acc, input)
816}
817
818// =============================================================================
819// RefScanStepCall / IntoRefScanStep — scan step taking &In
820// =============================================================================
821
/// Internal: callable trait for resolved scan steps taking input by reference.
///
/// DAG variant of [`ScanStepCall`] — each step borrows its input.
/// The only signature difference is the last argument of `call`: `&In`
/// here, an owned `In` on [`ScanStepCall`].
#[doc(hidden)]
pub trait RefScanStepCall<Acc, In> {
    /// The output type of this ref-scan step.
    type Out;
    /// Call this scan step with a world reference, accumulator, and borrowed input.
    ///
    /// `acc` is passed mutably so the step can update it; `input` is only
    /// borrowed and remains owned by the caller.
    fn call(&mut self, world: &mut World, acc: &mut Acc, input: &In) -> Self::Out;
}
832
833/// Converts a function into a pre-resolved ref-scan step with persistent state.
834///
835/// Same three-tier resolution as [`IntoRefStep`]:
836///
837/// | Params | Function shape | Example |
838/// |--------|---------------|---------|
839/// | `()` | `FnMut(&mut Acc, &In) -> Out` | `\|acc, trade: &Trade\| { ... }` |
840/// | `(P0,)...(P0..P7,)` | `fn(Params..., &mut Acc, &In) -> Out` | `fn vwap(c: Res<Config>, acc: &mut State, t: &Trade) -> Option<f64>` |
841/// | [`Opaque`] | `FnMut(&mut World, &mut Acc, &In) -> Out` | `\|w: &mut World, acc: &mut u64, t: &Trade\| { ... }` |
842#[diagnostic::on_unimplemented(
843    message = "this function cannot be used as a reference scan step",
844    note = "reference scan steps take `&mut Accumulator` as first param, then resources, then `&In` last",
845    note = "closures with resource parameters are not supported — use a named `fn`"
846)]
847pub trait IntoRefScanStep<Acc, In, Out, Params> {
848    /// The concrete resolved ref-scan step type.
849    type Step: RefScanStepCall<Acc, In, Out = Out>;
850
851    /// Resolve Param state from the registry and produce a ref-scan step.
852    fn into_ref_scan_step(self, registry: &Registry) -> Self::Step;
853}
854
855// -- Arity 0: FnMut(&mut Acc, &In) -> Out — closures work -------------------
856
857impl<Acc, In, Out, F: FnMut(&mut Acc, &In) -> Out + 'static> RefScanStepCall<Acc, In>
858    for Step<F, ()>
859{
860    type Out = Out;
861    #[inline(always)]
862    fn call(&mut self, _world: &mut World, acc: &mut Acc, input: &In) -> Out {
863        (self.f)(acc, input)
864    }
865}
866
867impl<Acc, In, Out, F: FnMut(&mut Acc, &In) -> Out + 'static> IntoRefScanStep<Acc, In, Out, ()>
868    for F
869{
870    type Step = Step<F, ()>;
871
872    fn into_ref_scan_step(self, registry: &Registry) -> Self::Step {
873        Step {
874            f: self,
875            state: <() as Param>::init(registry),
876            name: std::any::type_name::<F>(),
877        }
878    }
879}
880
881// -- Arities 1-8: named functions with Param resolution ----------------------
882
// Generates, for one tuple of `Param` idents, both halves of the ref-scan step
// machinery for functions shaped `fn(Params..., &mut Acc, &In) -> Out`:
//   * `RefScanStepCall` for `Step<F, (P..,)>` — fetch the params, then call `F`;
//   * `IntoRefScanStep` for `F` — resolve param state and build the `Step`.
// Expanded via the `all_tuples!` invocation that follows this macro.
macro_rules! impl_into_ref_scan_step {
    ($($P:ident),+) => {
        impl<Acc, In, Out, F: 'static, $($P: Param + 'static),+>
            RefScanStepCall<Acc, In> for Step<F, ($($P,)+)>
        where
            // Two call signatures are required of `&mut F`: one written with
            // the `Param` types themselves and one with their `Item<'a>` forms.
            // NOTE(review): presumably the first lets `P` be inferred from the
            // fn signature and the second matches what `fetch` yields — confirm.
            for<'a> &'a mut F:
                FnMut($($P,)+ &mut Acc, &In) -> Out +
                FnMut($($P::Item<'a>,)+ &mut Acc, &In) -> Out,
        {
            type Out = Out;
            #[inline(always)]
            // `$P` idents double as local variable names below.
            #[allow(non_snake_case)]
            fn call(&mut self, world: &mut World, acc: &mut Acc, input: &In) -> Out {
                // Trampoline: applies the callable to the fetched params,
                // the accumulator and the borrowed input, in that order.
                // NOTE(review): likely exists to pin the call signature for
                // type inference — confirm.
                #[allow(clippy::too_many_arguments)]
                fn call_inner<$($P,)+ Accumulator, Input: ?Sized, Output>(
                    mut f: impl FnMut($($P,)+ &mut Accumulator, &Input) -> Output,
                    $($P: $P,)+
                    acc: &mut Accumulator,
                    input: &Input,
                ) -> Output {
                    f($($P,)+ acc, input)
                }

                // SAFETY: state was produced by Param::init() on the same Registry
                // that built this World. Borrows are disjoint — enforced by
                // conflict detection at build time.
                // (The debug-only `clear_borrows` call sits between this comment
                // and the `unsafe` block it justifies. NOTE(review): presumably
                // it resets World's borrow tracking before the fetch — confirm.)
                #[cfg(debug_assertions)]
                world.clear_borrows();
                let ($($P,)+) = unsafe {
                    <($($P,)+) as Param>::fetch(world, &mut self.state)
                };
                call_inner(&mut self.f, $($P,)+ acc, input)
            }
        }

        impl<Acc, In, Out, F: 'static, $($P: Param + 'static),+>
            IntoRefScanStep<Acc, In, Out, ($($P,)+)> for F
        where
            // Same dual bound as on the `RefScanStepCall` impl above.
            for<'a> &'a mut F:
                FnMut($($P,)+ &mut Acc, &In) -> Out +
                FnMut($($P::Item<'a>,)+ &mut Acc, &In) -> Out,
        {
            type Step = Step<F, ($($P,)+)>;

            fn into_ref_scan_step(self, registry: &Registry) -> Self::Step {
                // Resolve state for the whole param tuple in one `init` call.
                let state = <($($P,)+) as Param>::init(registry);
                {
                    // Borrow each param's state to report its resource id and
                    // type name to the registry.
                    // NOTE(review): presumably `check_access` is the build-time
                    // conflict detection the SAFETY comment relies on — confirm.
                    #[allow(non_snake_case)]
                    let ($($P,)+) = &state;
                    registry.check_access(&[
                        $(
                            (<$P as Param>::resource_id($P),
                             std::any::type_name::<$P>()),
                        )+
                    ]);
                }
                Step { f: self, state, name: std::any::type_name::<F>() }
            }
        }
    };
}
944
945all_tuples!(impl_into_ref_scan_step);
946
947// -- Opaque: FnMut(&mut World, &mut Acc, &In) -> Out ------------------------
948
949/// Internal: wrapper for opaque closures used as ref-scan steps.
950#[doc(hidden)]
951pub struct OpaqueRefScanStep<F> {
952    f: F,
953    #[allow(dead_code)]
954    name: &'static str,
955}
956
957impl<Acc, In, Out, F: FnMut(&mut World, &mut Acc, &In) -> Out + 'static> RefScanStepCall<Acc, In>
958    for OpaqueRefScanStep<F>
959{
960    type Out = Out;
961    #[inline(always)]
962    fn call(&mut self, world: &mut World, acc: &mut Acc, input: &In) -> Out {
963        (self.f)(world, acc, input)
964    }
965}
966
967impl<Acc, In, Out, F: FnMut(&mut World, &mut Acc, &In) -> Out + 'static>
968    IntoRefScanStep<Acc, In, Out, Opaque> for F
969{
970    type Step = OpaqueRefScanStep<F>;
971
972    fn into_ref_scan_step(self, _registry: &Registry) -> Self::Step {
973        OpaqueRefScanStep {
974            f: self,
975            name: std::any::type_name::<F>(),
976        }
977    }
978}
979
980// =============================================================================
981// resolve_ref_scan_step — pre-resolve a ref-scan step for manual dispatch
982// =============================================================================
983
984/// Resolve a ref-scan step for manual dispatch.
985///
986/// Returns a closure with pre-resolved [`Param`] state. Reference-input
987/// counterpart of [`resolve_scan_step`].
988pub fn resolve_ref_scan_step<Acc, In, Out, Params, S: IntoRefScanStep<Acc, In, Out, Params>>(
989    f: S,
990    registry: &Registry,
991) -> impl FnMut(&mut World, &mut Acc, &In) -> Out + use<Acc, In, Out, Params, S> {
992    let mut resolved = f.into_ref_scan_step(registry);
993    move |world: &mut World, acc: &mut Acc, input: &In| resolved.call(world, acc, input)
994}
995
996// =============================================================================
997// SplatCall / IntoSplatStep — splat step dispatch (tuple destructuring)
998// =============================================================================
999//
1000// Splat traits mirror StepCall/IntoStep but accept multiple owned values
1001// instead of a single input. This lets `.splat()` destructure a tuple
1002// output into individual function arguments for the next step.
1003//
1004// One trait pair per arity (2-5). Past 5, use a named struct.
1005
1006// -- Splat 2 ------------------------------------------------------------------
1007
/// Internal: callable trait for resolved 2-splat steps.
///
/// Unlike a single-input step, the two values arrive as separate owned
/// arguments rather than as one tuple.
#[doc(hidden)]
pub trait SplatCall2<A, B> {
    /// Output type of this splat step.
    type Out;
    /// Run the step with the world and the two already-destructured values.
    fn call_splat(&mut self, world: &mut World, a: A, b: B) -> Self::Out;
}
1015
/// Converts a function into a resolved 2-splat step.
///
/// Implemented in this file for any `FnMut(A, B) -> Out` (with `Params = ()`)
/// and, through `impl_splat2_step!`, for functions whose leading arguments
/// are `Param`s (with `Params` being the tuple of those param types).
#[doc(hidden)]
pub trait IntoSplatStep2<A, B, Out, Params> {
    /// The concrete resolved step type.
    type Step: SplatCall2<A, B, Out = Out>;
    /// Resolve any `Param` state against `registry` and build the step.
    fn into_splat_step(self, registry: &Registry) -> Self::Step;
}
1022
1023impl<A, B, Out, F: FnMut(A, B) -> Out + 'static> SplatCall2<A, B> for Step<F, ()> {
1024    type Out = Out;
1025    #[inline(always)]
1026    fn call_splat(&mut self, _world: &mut World, a: A, b: B) -> Out {
1027        (self.f)(a, b)
1028    }
1029}
1030
1031impl<A, B, Out, F: FnMut(A, B) -> Out + 'static> IntoSplatStep2<A, B, Out, ()> for F {
1032    type Step = Step<F, ()>;
1033    fn into_splat_step(self, registry: &Registry) -> Self::Step {
1034        Step {
1035            f: self,
1036            state: <() as Param>::init(registry),
1037            name: std::any::type_name::<F>(),
1038        }
1039    }
1040}
1041
// Generates, for one tuple of `Param` idents, the 2-splat step impls for
// functions shaped `fn(Params..., A, B) -> Out`:
//   * `SplatCall2` for `Step<F, (P..,)>` — fetch the params, then call `F`;
//   * `IntoSplatStep2` for `F` — resolve param state and build the `Step`.
// Expanded through `all_tuples!` after the splat-5 section.
macro_rules! impl_splat2_step {
    ($($P:ident),+) => {
        impl<A, B, Out, F: 'static, $($P: Param + 'static),+>
            SplatCall2<A, B> for Step<F, ($($P,)+)>
        where
            // Two call signatures are required of `&mut F`: one written with
            // the `Param` types themselves and one with their `Item<'a>` forms.
            // NOTE(review): presumably the first lets `P` be inferred from the
            // fn signature and the second matches what `fetch` yields — confirm.
            for<'a> &'a mut F:
                FnMut($($P,)+ A, B) -> Out +
                FnMut($($P::Item<'a>,)+ A, B) -> Out,
        {
            type Out = Out;
            #[inline(always)]
            // `$P` idents double as local variable names below.
            #[allow(non_snake_case)]
            fn call_splat(&mut self, world: &mut World, a: A, b: B) -> Out {
                // Trampoline: applies the callable to the fetched params
                // followed by the two splatted values.
                #[allow(clippy::too_many_arguments)]
                fn call_inner<$($P,)+ IA, IB, Output>(
                    mut f: impl FnMut($($P,)+ IA, IB) -> Output,
                    $($P: $P,)+
                    a: IA, b: IB,
                ) -> Output {
                    f($($P,)+ a, b)
                }
                // SAFETY: state was produced by Param::init() on the same Registry
                // that built this World. Borrows are disjoint — enforced by
                // conflict detection at build time.
                // (The debug-only `clear_borrows` call sits between this comment
                // and the `unsafe` block it justifies. NOTE(review): presumably
                // it resets World's borrow tracking before the fetch — confirm.)
                #[cfg(debug_assertions)]
                world.clear_borrows();
                let ($($P,)+) = unsafe {
                    <($($P,)+) as Param>::fetch(world, &mut self.state)
                };
                call_inner(&mut self.f, $($P,)+ a, b)
            }
        }

        impl<A, B, Out, F: 'static, $($P: Param + 'static),+>
            IntoSplatStep2<A, B, Out, ($($P,)+)> for F
        where
            // Same dual bound as on the `SplatCall2` impl above.
            for<'a> &'a mut F:
                FnMut($($P,)+ A, B) -> Out +
                FnMut($($P::Item<'a>,)+ A, B) -> Out,
        {
            type Step = Step<F, ($($P,)+)>;

            fn into_splat_step(self, registry: &Registry) -> Self::Step {
                // Resolve state for the whole param tuple in one `init` call.
                let state = <($($P,)+) as Param>::init(registry);
                {
                    // Borrow each param's state to report its resource id and
                    // type name to the registry.
                    // NOTE(review): presumably `check_access` is the build-time
                    // conflict detection the SAFETY comment relies on — confirm.
                    #[allow(non_snake_case)]
                    let ($($P,)+) = &state;
                    registry.check_access(&[
                        $(
                            (<$P as Param>::resource_id($P),
                             std::any::type_name::<$P>()),
                        )+
                    ]);
                }
                Step { f: self, state, name: std::any::type_name::<F>() }
            }
        }
    };
}
1101
1102// -- Splat 3 ------------------------------------------------------------------
1103
/// Internal: callable trait for resolved 3-splat steps.
///
/// The three values arrive as separate owned arguments rather than as one
/// tuple.
#[doc(hidden)]
pub trait SplatCall3<A, B, C> {
    /// Output type of this splat step.
    type Out;
    /// Run the step with the world and the three already-destructured values.
    fn call_splat(&mut self, world: &mut World, a: A, b: B, c: C) -> Self::Out;
}
1111
/// Converts a function into a resolved 3-splat step.
///
/// Implemented in this file for any `FnMut(A, B, C) -> Out` (with
/// `Params = ()`) and, through `impl_splat3_step!`, for functions whose
/// leading arguments are `Param`s (with `Params` being the tuple of those
/// param types).
#[doc(hidden)]
pub trait IntoSplatStep3<A, B, C, Out, Params> {
    /// The concrete resolved step type.
    type Step: SplatCall3<A, B, C, Out = Out>;
    /// Resolve any `Param` state against `registry` and build the step.
    fn into_splat_step(self, registry: &Registry) -> Self::Step;
}
1118
1119impl<A, B, C, Out, F: FnMut(A, B, C) -> Out + 'static> SplatCall3<A, B, C> for Step<F, ()> {
1120    type Out = Out;
1121    #[inline(always)]
1122    fn call_splat(&mut self, _world: &mut World, a: A, b: B, c: C) -> Out {
1123        (self.f)(a, b, c)
1124    }
1125}
1126
1127impl<A, B, C, Out, F: FnMut(A, B, C) -> Out + 'static> IntoSplatStep3<A, B, C, Out, ()> for F {
1128    type Step = Step<F, ()>;
1129    fn into_splat_step(self, registry: &Registry) -> Self::Step {
1130        Step {
1131            f: self,
1132            state: <() as Param>::init(registry),
1133            name: std::any::type_name::<F>(),
1134        }
1135    }
1136}
1137
// Generates, for one tuple of `Param` idents, the 3-splat step impls for
// functions shaped `fn(Params..., A, B, C) -> Out`:
//   * `SplatCall3` for `Step<F, (P..,)>` — fetch the params, then call `F`;
//   * `IntoSplatStep3` for `F` — resolve param state and build the `Step`.
// Expanded through `all_tuples!` after the splat-5 section.
macro_rules! impl_splat3_step {
    ($($P:ident),+) => {
        impl<A, B, C, Out, F: 'static, $($P: Param + 'static),+>
            SplatCall3<A, B, C> for Step<F, ($($P,)+)>
        where
            // Two call signatures are required of `&mut F`: one written with
            // the `Param` types themselves and one with their `Item<'a>` forms.
            // NOTE(review): presumably the first lets `P` be inferred from the
            // fn signature and the second matches what `fetch` yields — confirm.
            for<'a> &'a mut F:
                FnMut($($P,)+ A, B, C) -> Out +
                FnMut($($P::Item<'a>,)+ A, B, C) -> Out,
        {
            type Out = Out;
            #[inline(always)]
            // `$P` idents double as local variable names below.
            #[allow(non_snake_case)]
            fn call_splat(&mut self, world: &mut World, a: A, b: B, c: C) -> Out {
                // Trampoline: applies the callable to the fetched params
                // followed by the three splatted values.
                #[allow(clippy::too_many_arguments)]
                fn call_inner<$($P,)+ IA, IB, IC, Output>(
                    mut f: impl FnMut($($P,)+ IA, IB, IC) -> Output,
                    $($P: $P,)+
                    a: IA, b: IB, c: IC,
                ) -> Output {
                    f($($P,)+ a, b, c)
                }
                // SAFETY: state was produced by Param::init() on the same Registry
                // that built this World. Borrows are disjoint — enforced by
                // conflict detection at build time.
                // (The debug-only `clear_borrows` call sits between this comment
                // and the `unsafe` block it justifies. NOTE(review): presumably
                // it resets World's borrow tracking before the fetch — confirm.)
                #[cfg(debug_assertions)]
                world.clear_borrows();
                let ($($P,)+) = unsafe {
                    <($($P,)+) as Param>::fetch(world, &mut self.state)
                };
                call_inner(&mut self.f, $($P,)+ a, b, c)
            }
        }

        impl<A, B, C, Out, F: 'static, $($P: Param + 'static),+>
            IntoSplatStep3<A, B, C, Out, ($($P,)+)> for F
        where
            // Same dual bound as on the `SplatCall3` impl above.
            for<'a> &'a mut F:
                FnMut($($P,)+ A, B, C) -> Out +
                FnMut($($P::Item<'a>,)+ A, B, C) -> Out,
        {
            type Step = Step<F, ($($P,)+)>;

            fn into_splat_step(self, registry: &Registry) -> Self::Step {
                // Resolve state for the whole param tuple in one `init` call.
                let state = <($($P,)+) as Param>::init(registry);
                {
                    // Borrow each param's state to report its resource id and
                    // type name to the registry.
                    // NOTE(review): presumably `check_access` is the build-time
                    // conflict detection the SAFETY comment relies on — confirm.
                    #[allow(non_snake_case)]
                    let ($($P,)+) = &state;
                    registry.check_access(&[
                        $(
                            (<$P as Param>::resource_id($P),
                             std::any::type_name::<$P>()),
                        )+
                    ]);
                }
                Step { f: self, state, name: std::any::type_name::<F>() }
            }
        }
    };
}
1197
1198// -- Splat 4 ------------------------------------------------------------------
1199
/// Internal: callable trait for resolved 4-splat steps.
///
/// The four values arrive as separate owned arguments rather than as one
/// tuple.
#[doc(hidden)]
pub trait SplatCall4<A, B, C, D> {
    /// Output type of this splat step.
    type Out;
    /// Run the step with the world and the four already-destructured values.
    fn call_splat(&mut self, world: &mut World, a: A, b: B, c: C, d: D) -> Self::Out;
}
1207
/// Converts a function into a resolved 4-splat step.
///
/// Implemented in this file for any `FnMut(A, B, C, D) -> Out` (with
/// `Params = ()`) and, through `impl_splat4_step!`, for functions whose
/// leading arguments are `Param`s (with `Params` being the tuple of those
/// param types).
#[doc(hidden)]
pub trait IntoSplatStep4<A, B, C, D, Out, Params> {
    /// The concrete resolved step type.
    type Step: SplatCall4<A, B, C, D, Out = Out>;
    /// Resolve any `Param` state against `registry` and build the step.
    fn into_splat_step(self, registry: &Registry) -> Self::Step;
}
1214
1215impl<A, B, C, D, Out, F: FnMut(A, B, C, D) -> Out + 'static> SplatCall4<A, B, C, D>
1216    for Step<F, ()>
1217{
1218    type Out = Out;
1219    #[inline(always)]
1220    fn call_splat(&mut self, _world: &mut World, a: A, b: B, c: C, d: D) -> Out {
1221        (self.f)(a, b, c, d)
1222    }
1223}
1224
1225impl<A, B, C, D, Out, F: FnMut(A, B, C, D) -> Out + 'static> IntoSplatStep4<A, B, C, D, Out, ()>
1226    for F
1227{
1228    type Step = Step<F, ()>;
1229    fn into_splat_step(self, registry: &Registry) -> Self::Step {
1230        Step {
1231            f: self,
1232            state: <() as Param>::init(registry),
1233            name: std::any::type_name::<F>(),
1234        }
1235    }
1236}
1237
// Generates, for one tuple of `Param` idents, the 4-splat step impls for
// functions shaped `fn(Params..., A, B, C, D) -> Out`:
//   * `SplatCall4` for `Step<F, (P..,)>` — fetch the params, then call `F`;
//   * `IntoSplatStep4` for `F` — resolve param state and build the `Step`.
macro_rules! impl_splat4_step {
    ($($P:ident),+) => {
        impl<A, B, C, D, Out, F: 'static, $($P: Param + 'static),+>
            SplatCall4<A, B, C, D> for Step<F, ($($P,)+)>
        where
            for<'a> &'a mut F:
                FnMut($($P,)+ A, B, C, D) -> Out +
                FnMut($($P::Item<'a>,)+ A, B, C, D) -> Out,
        {
            type Out = Out;
            #[inline(always)]
            #[allow(non_snake_case)]
            fn call_splat(&mut self, world: &mut World, a: A, b: B, c: C, d: D) -> Out {
                // Trampoline: applies the callable to the fetched params
                // followed by the four splatted values.
                #[allow(clippy::too_many_arguments)]
                fn call_inner<$($P,)+ IA, IB, IC, ID, Output>(
                    mut f: impl FnMut($($P,)+ IA, IB, IC, ID) -> Output,
                    $($P: $P,)+
                    a: IA, b: IB, c: IC, d: ID,
                ) -> Output {
                    f($($P,)+ a, b, c, d)
                }
                // SAFETY: state was produced by Param::init() on the same Registry
                // that built this World. Borrows are disjoint — enforced by
                // conflict detection at build time.
                #[cfg(debug_assertions)]
                world.clear_borrows();
                let ($($P,)+) = unsafe {
                    <($($P,)+) as Param>::fetch(world, &mut self.state)
                };
                call_inner(&mut self.f, $($P,)+ a, b, c, d)
            }
        }

        impl<A, B, C, D, Out, F: 'static, $($P: Param + 'static),+>
            IntoSplatStep4<A, B, C, D, Out, ($($P,)+)> for F
        where
            for<'a> &'a mut F:
                FnMut($($P,)+ A, B, C, D) -> Out +
                FnMut($($P::Item<'a>,)+ A, B, C, D) -> Out,
        {
            type Step = Step<F, ($($P,)+)>;

            fn into_splat_step(self, registry: &Registry) -> Self::Step {
                let state = <($($P,)+) as Param>::init(registry);
                {
                    // Report each param's resource id and type name to the
                    // registry before the step is handed out.
                    #[allow(non_snake_case)]
                    let ($($P,)+) = &state;
                    registry.check_access(&[
                        $(
                            (<$P as Param>::resource_id($P),
                             std::any::type_name::<$P>()),
                        )+
                    ]);
                }
                Step { f: self, state, name: std::any::type_name::<F>() }
            }
        }
    };
}
1282
1283// -- Splat 5 ------------------------------------------------------------------
1284
/// Internal: callable trait for resolved 5-splat steps.
///
/// The five values arrive as separate owned arguments rather than as one
/// tuple.
#[doc(hidden)]
pub trait SplatCall5<A, B, C, D, E> {
    /// Output type of this splat step.
    type Out;
    /// Run the step with the world and the five already-destructured values.
    // Five single-letter argument names trip the clippy lint silenced here.
    #[allow(clippy::many_single_char_names)]
    fn call_splat(&mut self, world: &mut World, a: A, b: B, c: C, d: D, e: E) -> Self::Out;
}
1293
/// Converts a function into a resolved 5-splat step.
///
/// Implemented in this file for any `FnMut(A, B, C, D, E) -> Out` (with
/// `Params = ()`) and, through `impl_splat5_step!`, for functions whose
/// leading arguments are `Param`s (with `Params` being the tuple of those
/// param types).
#[doc(hidden)]
pub trait IntoSplatStep5<A, B, C, D, E, Out, Params> {
    /// The concrete resolved step type.
    type Step: SplatCall5<A, B, C, D, E, Out = Out>;
    /// Resolve any `Param` state against `registry` and build the step.
    fn into_splat_step(self, registry: &Registry) -> Self::Step;
}
1300
1301impl<A, B, C, D, E, Out, F: FnMut(A, B, C, D, E) -> Out + 'static> SplatCall5<A, B, C, D, E>
1302    for Step<F, ()>
1303{
1304    type Out = Out;
1305    #[inline(always)]
1306    #[allow(clippy::many_single_char_names)]
1307    fn call_splat(&mut self, _world: &mut World, a: A, b: B, c: C, d: D, e: E) -> Out {
1308        (self.f)(a, b, c, d, e)
1309    }
1310}
1311
1312impl<A, B, C, D, E, Out, F: FnMut(A, B, C, D, E) -> Out + 'static>
1313    IntoSplatStep5<A, B, C, D, E, Out, ()> for F
1314{
1315    type Step = Step<F, ()>;
1316    fn into_splat_step(self, registry: &Registry) -> Self::Step {
1317        Step {
1318            f: self,
1319            state: <() as Param>::init(registry),
1320            name: std::any::type_name::<F>(),
1321        }
1322    }
1323}
1324
// Generates, for one tuple of `Param` idents, the 5-splat step impls for
// functions shaped `fn(Params..., A, B, C, D, E) -> Out`:
//   * `SplatCall5` for `Step<F, (P..,)>` — fetch the params, then call `F`;
//   * `IntoSplatStep5` for `F` — resolve param state and build the `Step`.
macro_rules! impl_splat5_step {
    ($($P:ident),+) => {
        impl<A, B, C, D, E, Out, F: 'static, $($P: Param + 'static),+>
            SplatCall5<A, B, C, D, E> for Step<F, ($($P,)+)>
        where
            for<'a> &'a mut F:
                FnMut($($P,)+ A, B, C, D, E) -> Out +
                FnMut($($P::Item<'a>,)+ A, B, C, D, E) -> Out,
        {
            type Out = Out;
            #[inline(always)]
            #[allow(non_snake_case, clippy::many_single_char_names)]
            fn call_splat(&mut self, world: &mut World, a: A, b: B, c: C, d: D, e: E) -> Out {
                // Trampoline: applies the callable to the fetched params
                // followed by the five splatted values.
                #[allow(clippy::too_many_arguments)]
                fn call_inner<$($P,)+ IA, IB, IC, ID, IE, Output>(
                    mut f: impl FnMut($($P,)+ IA, IB, IC, ID, IE) -> Output,
                    $($P: $P,)+
                    a: IA, b: IB, c: IC, d: ID, e: IE,
                ) -> Output {
                    f($($P,)+ a, b, c, d, e)
                }
                // SAFETY: state was produced by Param::init() on the same Registry
                // that built this World. Borrows are disjoint — enforced by
                // conflict detection at build time.
                #[cfg(debug_assertions)]
                world.clear_borrows();
                let ($($P,)+) = unsafe {
                    <($($P,)+) as Param>::fetch(world, &mut self.state)
                };
                call_inner(&mut self.f, $($P,)+ a, b, c, d, e)
            }
        }

        impl<A, B, C, D, E, Out, F: 'static, $($P: Param + 'static),+>
            IntoSplatStep5<A, B, C, D, E, Out, ($($P,)+)> for F
        where
            for<'a> &'a mut F:
                FnMut($($P,)+ A, B, C, D, E) -> Out +
                FnMut($($P::Item<'a>,)+ A, B, C, D, E) -> Out,
        {
            type Step = Step<F, ($($P,)+)>;

            fn into_splat_step(self, registry: &Registry) -> Self::Step {
                let state = <($($P,)+) as Param>::init(registry);
                {
                    // Report each param's resource id and type name to the
                    // registry before the step is handed out.
                    #[allow(non_snake_case)]
                    let ($($P,)+) = &state;
                    registry.check_access(&[
                        $(
                            (<$P as Param>::resource_id($P),
                             std::any::type_name::<$P>()),
                        )+
                    ]);
                }
                Step { f: self, state, name: std::any::type_name::<F>() }
            }
        }
    };
}
1369
1370all_tuples!(impl_splat2_step);
1371all_tuples!(impl_splat3_step);
1372all_tuples!(impl_splat4_step);
1373all_tuples!(impl_splat5_step);
1374
1375// =============================================================================
1376// ChainCall — named chain dispatch trait
1377// =============================================================================
1378
/// Trait for pipeline chain nodes. Each node transforms input through
/// the chain, producing an output. `In` appears only on the trait impl,
/// not on the implementing struct — this preserves HRTB compatibility
/// so `Pipeline<Chain>` can satisfy `for<'a> Handler<&'a T>`.
///
/// Nodes in this file wrap a `prev` node: their `call` first runs `prev`
/// on the input and then applies their own step to the result.
#[doc(hidden)]
pub trait ChainCall<In> {
    /// The output type of this chain node.
    type Out;
    /// Execute the chain on the given input.
    ///
    /// Takes `&mut self` because nodes may carry mutable state between
    /// calls (e.g. an accumulator or the last seen value).
    fn call(&mut self, world: &mut World, input: In) -> Self::Out;
}
1390
1391// =============================================================================
1392// Chain nodes — named types for HRTB-compatible pipeline composition
1393// =============================================================================
1394//
1395// Each combinator gets a named struct following the iterator adapter pattern
1396// (like `Map<I, F>`, `Filter<I, P>`). `In` appears only on the `ChainCall<In>`
1397// trait impl, never on the struct — this is what enables HRTB boxing.
1398
1399/// Identity passthrough node. Used as the initial chain element.
1400#[doc(hidden)]
1401pub struct IdentityNode;
1402
1403impl<In> ChainCall<In> for IdentityNode {
1404    type Out = In;
1405    #[inline(always)]
1406    fn call(&mut self, _world: &mut World, input: In) -> In {
1407        input
1408    }
1409}
1410
1411// -- Core (any Out) ----------------------------------------------------------
1412
1413/// Chain node for `.then()` — transforms output via a resolved step.
1414#[doc(hidden)]
1415pub struct ThenNode<Prev, S> {
1416    pub(crate) prev: Prev,
1417    pub(crate) step: S,
1418}
1419
1420impl<In, Prev, S> ChainCall<In> for ThenNode<Prev, S>
1421where
1422    Prev: ChainCall<In>,
1423    S: StepCall<Prev::Out>,
1424{
1425    type Out = S::Out;
1426    fn call(&mut self, world: &mut World, input: In) -> S::Out {
1427        let mid = self.prev.call(world, input);
1428        self.step.call(world, mid)
1429    }
1430}
1431
1432/// Chain node for `.tap()` — side effect via `&Out`, value passes through.
1433#[doc(hidden)]
1434pub struct TapNode<Prev, S> {
1435    pub(crate) prev: Prev,
1436    pub(crate) step: S,
1437}
1438
1439impl<In, Prev, S> ChainCall<In> for TapNode<Prev, S>
1440where
1441    Prev: ChainCall<In>,
1442    S: RefStepCall<Prev::Out, Out = ()>,
1443{
1444    type Out = Prev::Out;
1445    fn call(&mut self, world: &mut World, input: In) -> Prev::Out {
1446        let val = self.prev.call(world, input);
1447        self.step.call(world, &val);
1448        val
1449    }
1450}
1451
1452/// Chain node for `.guard()` — wraps output in `Option` based on predicate.
1453#[doc(hidden)]
1454pub struct GuardNode<Prev, S> {
1455    pub(crate) prev: Prev,
1456    pub(crate) step: S,
1457}
1458
1459impl<In, Prev, S> ChainCall<In> for GuardNode<Prev, S>
1460where
1461    Prev: ChainCall<In>,
1462    S: RefStepCall<Prev::Out, Out = bool>,
1463{
1464    type Out = Option<Prev::Out>;
1465    fn call(&mut self, world: &mut World, input: In) -> Option<Prev::Out> {
1466        let val = self.prev.call(world, input);
1467        if self.step.call(world, &val) {
1468            Some(val)
1469        } else {
1470            None
1471        }
1472    }
1473}
1474
1475/// Chain node for `.dedup()` — suppresses consecutive unchanged values.
1476#[doc(hidden)]
1477pub struct DedupNode<Prev, T> {
1478    pub(crate) prev: Prev,
1479    pub(crate) last: Option<T>,
1480}
1481
1482impl<In, T: PartialEq + Clone, Prev: ChainCall<In, Out = T>> ChainCall<In> for DedupNode<Prev, T> {
1483    type Out = Option<T>;
1484    fn call(&mut self, world: &mut World, input: In) -> Option<T> {
1485        let val = self.prev.call(world, input);
1486        if self.last.as_ref() == Some(&val) {
1487            None
1488        } else {
1489            self.last = Some(val.clone());
1490            Some(val)
1491        }
1492    }
1493}
1494
1495/// Chain node for `.scan()` — transforms with persistent accumulator.
1496#[doc(hidden)]
1497pub struct ScanNode<Prev, S, Acc> {
1498    pub(crate) prev: Prev,
1499    pub(crate) step: S,
1500    pub(crate) acc: Acc,
1501}
1502
1503impl<In, Prev, S, Acc> ChainCall<In> for ScanNode<Prev, S, Acc>
1504where
1505    Prev: ChainCall<In>,
1506    S: ScanStepCall<Acc, Prev::Out>,
1507{
1508    type Out = S::Out;
1509    fn call(&mut self, world: &mut World, input: In) -> S::Out {
1510        let val = self.prev.call(world, input);
1511        self.step.call(world, &mut self.acc, val)
1512    }
1513}
1514
1515/// Chain node for `.dispatch()` — feeds output to a [`Handler`].
1516#[doc(hidden)]
1517pub struct DispatchNode<Prev, H> {
1518    pub(crate) prev: Prev,
1519    pub(crate) handler: H,
1520}
1521
1522impl<In, Prev, H> ChainCall<In> for DispatchNode<Prev, H>
1523where
1524    Prev: ChainCall<In>,
1525    H: Handler<Prev::Out>,
1526{
1527    type Out = ();
1528    fn call(&mut self, world: &mut World, input: In) {
1529        let out = self.prev.call(world, input);
1530        self.handler.run(world, out);
1531    }
1532}
1533
1534/// Chain node for `.tee()` — runs side-effect chain on `&Out`, passes value through.
1535#[doc(hidden)]
1536pub struct TeeNode<Prev, C> {
1537    pub(crate) prev: Prev,
1538    pub(crate) side: C,
1539}
1540
1541impl<In, Prev, C> ChainCall<In> for TeeNode<Prev, C>
1542where
1543    Prev: ChainCall<In>,
1544    Prev::Out: 'static,
1545    C: for<'a> ChainCall<&'a Prev::Out, Out = ()>,
1546{
1547    type Out = Prev::Out;
1548    fn call(&mut self, world: &mut World, input: In) -> Prev::Out {
1549        let val = self.prev.call(world, input);
1550        self.side.call(world, &val);
1551        val
1552    }
1553}
1554
1555/// Chain node for `.route()` — binary conditional dispatch.
1556#[doc(hidden)]
1557pub struct RouteNode<Prev, P, C0, C1> {
1558    pub(crate) prev: Prev,
1559    pub(crate) pred: P,
1560    pub(crate) on_true: C0,
1561    pub(crate) on_false: C1,
1562}
1563
1564impl<In, Prev, P, C0, C1> ChainCall<In> for RouteNode<Prev, P, C0, C1>
1565where
1566    Prev: ChainCall<In>,
1567    P: RefStepCall<Prev::Out, Out = bool>,
1568    C0: ChainCall<Prev::Out>,
1569    C1: ChainCall<Prev::Out, Out = C0::Out>,
1570{
1571    type Out = C0::Out;
1572    fn call(&mut self, world: &mut World, input: In) -> C0::Out {
1573        let val = self.prev.call(world, input);
1574        if self.pred.call(world, &val) {
1575            self.on_true.call(world, val)
1576        } else {
1577            self.on_false.call(world, val)
1578        }
1579    }
1580}
1581
1582// -- Option<T> nodes ---------------------------------------------------------
1583
1584/// Chain node for `.map()` on `Option<T>`.
1585#[doc(hidden)]
1586pub struct MapOptionNode<Prev, S> {
1587    pub(crate) prev: Prev,
1588    pub(crate) step: S,
1589}
1590
1591impl<In, T, Prev, S> ChainCall<In> for MapOptionNode<Prev, S>
1592where
1593    Prev: ChainCall<In, Out = Option<T>>,
1594    S: StepCall<T>,
1595{
1596    type Out = Option<S::Out>;
1597    fn call(&mut self, world: &mut World, input: In) -> Option<S::Out> {
1598        self.prev
1599            .call(world, input)
1600            .map(|val| self.step.call(world, val))
1601    }
1602}
1603
1604/// Chain node for `.filter()` on `Option<T>`.
1605#[doc(hidden)]
1606pub struct FilterNode<Prev, S> {
1607    pub(crate) prev: Prev,
1608    pub(crate) step: S,
1609}
1610
1611impl<In, T, Prev, S> ChainCall<In> for FilterNode<Prev, S>
1612where
1613    Prev: ChainCall<In, Out = Option<T>>,
1614    S: RefStepCall<T, Out = bool>,
1615{
1616    type Out = Option<T>;
1617    fn call(&mut self, world: &mut World, input: In) -> Option<T> {
1618        self.prev
1619            .call(world, input)
1620            .filter(|val| self.step.call(world, val))
1621    }
1622}
1623
1624/// Chain node for `.inspect()` on `Option<T>`.
1625#[doc(hidden)]
1626pub struct InspectOptionNode<Prev, S> {
1627    pub(crate) prev: Prev,
1628    pub(crate) step: S,
1629}
1630
1631impl<In, T, Prev, S> ChainCall<In> for InspectOptionNode<Prev, S>
1632where
1633    Prev: ChainCall<In, Out = Option<T>>,
1634    S: RefStepCall<T, Out = ()>,
1635{
1636    type Out = Option<T>;
1637    fn call(&mut self, world: &mut World, input: In) -> Option<T> {
1638        self.prev
1639            .call(world, input)
1640            .inspect(|val| self.step.call(world, val))
1641    }
1642}
1643
1644/// Chain node for `.and_then()` on `Option<T>`.
1645#[doc(hidden)]
1646pub struct AndThenNode<Prev, S> {
1647    pub(crate) prev: Prev,
1648    pub(crate) step: S,
1649}
1650
1651impl<In, T, U, Prev, S> ChainCall<In> for AndThenNode<Prev, S>
1652where
1653    Prev: ChainCall<In, Out = Option<T>>,
1654    S: StepCall<T, Out = Option<U>>,
1655{
1656    type Out = Option<U>;
1657    fn call(&mut self, world: &mut World, input: In) -> Option<U> {
1658        self.prev
1659            .call(world, input)
1660            .and_then(|val| self.step.call(world, val))
1661    }
1662}
1663
1664/// Chain node for `.on_none()` — side effect when `None`.
1665#[doc(hidden)]
1666pub struct OnNoneNode<Prev, P> {
1667    pub(crate) prev: Prev,
1668    pub(crate) producer: P,
1669}
1670
1671impl<In, T, Prev, P> ChainCall<In> for OnNoneNode<Prev, P>
1672where
1673    Prev: ChainCall<In, Out = Option<T>>,
1674    P: ProducerCall<Out = ()>,
1675{
1676    type Out = Option<T>;
1677    fn call(&mut self, world: &mut World, input: In) -> Option<T> {
1678        let result = self.prev.call(world, input);
1679        if result.is_none() {
1680            self.producer.call(world);
1681        }
1682        result
1683    }
1684}
1685
1686/// Chain node for `.ok_or()` — `Option<T>` → `Result<T, E>`.
1687#[doc(hidden)]
1688pub struct OkOrNode<Prev, E> {
1689    pub(crate) prev: Prev,
1690    pub(crate) err: E,
1691}
1692
1693impl<In, T, E: Clone, Prev: ChainCall<In, Out = Option<T>>> ChainCall<In> for OkOrNode<Prev, E> {
1694    type Out = Result<T, E>;
1695    fn call(&mut self, world: &mut World, input: In) -> Result<T, E> {
1696        self.prev.call(world, input).ok_or_else(|| self.err.clone())
1697    }
1698}
1699
1700/// Chain node for `.ok_or_else()` — `Option<T>` → `Result<T, E>`.
1701#[doc(hidden)]
1702pub struct OkOrElseNode<Prev, P> {
1703    pub(crate) prev: Prev,
1704    pub(crate) producer: P,
1705}
1706
1707impl<In, T, E, Prev, P> ChainCall<In> for OkOrElseNode<Prev, P>
1708where
1709    Prev: ChainCall<In, Out = Option<T>>,
1710    P: ProducerCall<Out = E>,
1711{
1712    type Out = Result<T, E>;
1713    fn call(&mut self, world: &mut World, input: In) -> Result<T, E> {
1714        self.prev
1715            .call(world, input)
1716            .ok_or_else(|| self.producer.call(world))
1717    }
1718}
1719
1720/// Chain node for `.unwrap_or()` on `Option<T>`.
1721#[doc(hidden)]
1722pub struct UnwrapOrOptionNode<Prev, T> {
1723    pub(crate) prev: Prev,
1724    pub(crate) default: T,
1725}
1726
1727impl<In, T: Clone, Prev: ChainCall<In, Out = Option<T>>> ChainCall<In>
1728    for UnwrapOrOptionNode<Prev, T>
1729{
1730    type Out = T;
1731    fn call(&mut self, world: &mut World, input: In) -> T {
1732        self.prev
1733            .call(world, input)
1734            .unwrap_or_else(|| self.default.clone())
1735    }
1736}
1737
1738/// Chain node for `.unwrap_or_else()` on `Option<T>`.
1739#[doc(hidden)]
1740pub struct UnwrapOrElseOptionNode<Prev, P> {
1741    pub(crate) prev: Prev,
1742    pub(crate) producer: P,
1743}
1744
1745impl<In, T, Prev, P> ChainCall<In> for UnwrapOrElseOptionNode<Prev, P>
1746where
1747    Prev: ChainCall<In, Out = Option<T>>,
1748    P: ProducerCall<Out = T>,
1749{
1750    type Out = T;
1751    fn call(&mut self, world: &mut World, input: In) -> T {
1752        self.prev
1753            .call(world, input)
1754            .unwrap_or_else(|| self.producer.call(world))
1755    }
1756}
1757
1758// -- Result<T, E> nodes ------------------------------------------------------
1759
1760/// Chain node for `.map()` on `Result<T, E>`.
1761#[doc(hidden)]
1762pub struct MapResultNode<Prev, S> {
1763    pub(crate) prev: Prev,
1764    pub(crate) step: S,
1765}
1766
1767impl<In, T, E, Prev, S> ChainCall<In> for MapResultNode<Prev, S>
1768where
1769    Prev: ChainCall<In, Out = Result<T, E>>,
1770    S: StepCall<T>,
1771{
1772    type Out = Result<S::Out, E>;
1773    fn call(&mut self, world: &mut World, input: In) -> Result<S::Out, E> {
1774        self.prev
1775            .call(world, input)
1776            .map(|val| self.step.call(world, val))
1777    }
1778}
1779
1780/// Chain node for `.and_then()` on `Result<T, E>`.
1781#[doc(hidden)]
1782pub struct AndThenResultNode<Prev, S> {
1783    pub(crate) prev: Prev,
1784    pub(crate) step: S,
1785}
1786
1787impl<In, T, U, E, Prev, S> ChainCall<In> for AndThenResultNode<Prev, S>
1788where
1789    Prev: ChainCall<In, Out = Result<T, E>>,
1790    S: StepCall<T, Out = Result<U, E>>,
1791{
1792    type Out = Result<U, E>;
1793    fn call(&mut self, world: &mut World, input: In) -> Result<U, E> {
1794        self.prev
1795            .call(world, input)
1796            .and_then(|val| self.step.call(world, val))
1797    }
1798}
1799
1800/// Chain node for `.catch()` — `Result<T, E>` → `Option<T>`.
1801#[doc(hidden)]
1802pub struct CatchNode<Prev, S> {
1803    pub(crate) prev: Prev,
1804    pub(crate) step: S,
1805}
1806
1807impl<In, T, E, Prev, S> ChainCall<In> for CatchNode<Prev, S>
1808where
1809    Prev: ChainCall<In, Out = Result<T, E>>,
1810    S: StepCall<E, Out = ()>,
1811{
1812    type Out = Option<T>;
1813    fn call(&mut self, world: &mut World, input: In) -> Option<T> {
1814        match self.prev.call(world, input) {
1815            Ok(val) => Some(val),
1816            Err(err) => {
1817                self.step.call(world, err);
1818                None
1819            }
1820        }
1821    }
1822}
1823
1824/// Chain node for `.map_err()`.
1825#[doc(hidden)]
1826pub struct MapErrNode<Prev, S> {
1827    pub(crate) prev: Prev,
1828    pub(crate) step: S,
1829}
1830
1831impl<In, T, E, Prev, S> ChainCall<In> for MapErrNode<Prev, S>
1832where
1833    Prev: ChainCall<In, Out = Result<T, E>>,
1834    S: StepCall<E>,
1835{
1836    type Out = Result<T, S::Out>;
1837    fn call(&mut self, world: &mut World, input: In) -> Result<T, S::Out> {
1838        self.prev
1839            .call(world, input)
1840            .map_err(|err| self.step.call(world, err))
1841    }
1842}
1843
1844/// Chain node for `.or_else()`.
1845#[doc(hidden)]
1846pub struct OrElseNode<Prev, S> {
1847    pub(crate) prev: Prev,
1848    pub(crate) step: S,
1849}
1850
1851impl<In, T, E, E2, Prev, S> ChainCall<In> for OrElseNode<Prev, S>
1852where
1853    Prev: ChainCall<In, Out = Result<T, E>>,
1854    S: StepCall<E, Out = Result<T, E2>>,
1855{
1856    type Out = Result<T, E2>;
1857    fn call(&mut self, world: &mut World, input: In) -> Result<T, E2> {
1858        self.prev
1859            .call(world, input)
1860            .or_else(|err| self.step.call(world, err))
1861    }
1862}
1863
1864/// Chain node for `.inspect()` on `Result<T, E>`.
1865#[doc(hidden)]
1866pub struct InspectResultNode<Prev, S> {
1867    pub(crate) prev: Prev,
1868    pub(crate) step: S,
1869}
1870
1871impl<In, T, E, Prev, S> ChainCall<In> for InspectResultNode<Prev, S>
1872where
1873    Prev: ChainCall<In, Out = Result<T, E>>,
1874    S: RefStepCall<T, Out = ()>,
1875{
1876    type Out = Result<T, E>;
1877    fn call(&mut self, world: &mut World, input: In) -> Result<T, E> {
1878        self.prev
1879            .call(world, input)
1880            .inspect(|val| self.step.call(world, val))
1881    }
1882}
1883
1884/// Chain node for `.inspect_err()`.
1885#[doc(hidden)]
1886pub struct InspectErrNode<Prev, S> {
1887    pub(crate) prev: Prev,
1888    pub(crate) step: S,
1889}
1890
1891impl<In, T, E, Prev, S> ChainCall<In> for InspectErrNode<Prev, S>
1892where
1893    Prev: ChainCall<In, Out = Result<T, E>>,
1894    S: RefStepCall<E, Out = ()>,
1895{
1896    type Out = Result<T, E>;
1897    fn call(&mut self, world: &mut World, input: In) -> Result<T, E> {
1898        self.prev
1899            .call(world, input)
1900            .inspect_err(|err| self.step.call(world, err))
1901    }
1902}
1903
1904/// Chain node for `.ok()` — `Result<T, E>` → `Option<T>`.
1905#[doc(hidden)]
1906pub struct OkResultNode<Prev> {
1907    pub(crate) prev: Prev,
1908}
1909
1910impl<In, T, E, Prev: ChainCall<In, Out = Result<T, E>>> ChainCall<In> for OkResultNode<Prev> {
1911    type Out = Option<T>;
1912    fn call(&mut self, world: &mut World, input: In) -> Option<T> {
1913        self.prev.call(world, input).ok()
1914    }
1915}
1916
1917/// Chain node for `.unwrap_or()` on `Result<T, E>`.
1918#[doc(hidden)]
1919pub struct UnwrapOrResultNode<Prev, T> {
1920    pub(crate) prev: Prev,
1921    pub(crate) default: T,
1922}
1923
1924impl<In, T: Clone, E, Prev: ChainCall<In, Out = Result<T, E>>> ChainCall<In>
1925    for UnwrapOrResultNode<Prev, T>
1926{
1927    type Out = T;
1928    fn call(&mut self, world: &mut World, input: In) -> T {
1929        self.prev
1930            .call(world, input)
1931            .unwrap_or_else(|_| self.default.clone())
1932    }
1933}
1934
1935/// Chain node for `.unwrap_or_else()` on `Result<T, E>`.
1936#[doc(hidden)]
1937pub struct UnwrapOrElseResultNode<Prev, S> {
1938    pub(crate) prev: Prev,
1939    pub(crate) step: S,
1940}
1941
1942impl<In, T, E, Prev, S> ChainCall<In> for UnwrapOrElseResultNode<Prev, S>
1943where
1944    Prev: ChainCall<In, Out = Result<T, E>>,
1945    S: StepCall<E, Out = T>,
1946{
1947    type Out = T;
1948    fn call(&mut self, world: &mut World, input: In) -> T {
1949        match self.prev.call(world, input) {
1950            Ok(val) => val,
1951            Err(err) => self.step.call(world, err),
1952        }
1953    }
1954}
1955
1956// -- Bool nodes --------------------------------------------------------------
1957
1958/// Chain node for `.not()`.
1959#[doc(hidden)]
1960pub struct NotNode<Prev> {
1961    pub(crate) prev: Prev,
1962}
1963
1964impl<In, Prev: ChainCall<In, Out = bool>> ChainCall<In> for NotNode<Prev> {
1965    type Out = bool;
1966    fn call(&mut self, world: &mut World, input: In) -> bool {
1967        !self.prev.call(world, input)
1968    }
1969}
1970
1971/// Chain node for `.and()` on bool.
1972#[doc(hidden)]
1973pub struct AndBoolNode<Prev, P> {
1974    pub(crate) prev: Prev,
1975    pub(crate) producer: P,
1976}
1977
1978impl<In, Prev, P> ChainCall<In> for AndBoolNode<Prev, P>
1979where
1980    Prev: ChainCall<In, Out = bool>,
1981    P: ProducerCall<Out = bool>,
1982{
1983    type Out = bool;
1984    fn call(&mut self, world: &mut World, input: In) -> bool {
1985        self.prev.call(world, input) && self.producer.call(world)
1986    }
1987}
1988
1989/// Chain node for `.or()` on bool.
1990#[doc(hidden)]
1991pub struct OrBoolNode<Prev, P> {
1992    pub(crate) prev: Prev,
1993    pub(crate) producer: P,
1994}
1995
1996impl<In, Prev, P> ChainCall<In> for OrBoolNode<Prev, P>
1997where
1998    Prev: ChainCall<In, Out = bool>,
1999    P: ProducerCall<Out = bool>,
2000{
2001    type Out = bool;
2002    fn call(&mut self, world: &mut World, input: In) -> bool {
2003        self.prev.call(world, input) || self.producer.call(world)
2004    }
2005}
2006
2007/// Chain node for `.xor()` on bool.
2008#[doc(hidden)]
2009pub struct XorBoolNode<Prev, P> {
2010    pub(crate) prev: Prev,
2011    pub(crate) producer: P,
2012}
2013
2014impl<In, Prev, P> ChainCall<In> for XorBoolNode<Prev, P>
2015where
2016    Prev: ChainCall<In, Out = bool>,
2017    P: ProducerCall<Out = bool>,
2018{
2019    type Out = bool;
2020    fn call(&mut self, world: &mut World, input: In) -> bool {
2021        self.prev.call(world, input) ^ self.producer.call(world)
2022    }
2023}
2024
2025// -- Cloned nodes ------------------------------------------------------------
2026
2027/// Chain node for `.cloned()` on `&T`.
2028#[doc(hidden)]
2029pub struct ClonedNode<Prev> {
2030    pub(crate) prev: Prev,
2031}
2032
2033impl<'a, In, T: Clone + 'a, Prev: ChainCall<In, Out = &'a T>> ChainCall<In> for ClonedNode<Prev> {
2034    type Out = T;
2035    fn call(&mut self, world: &mut World, input: In) -> T {
2036        T::clone(self.prev.call(world, input))
2037    }
2038}
2039
2040/// Chain node for `.cloned()` on `Option<&T>`.
2041#[doc(hidden)]
2042pub struct ClonedOptionNode<Prev> {
2043    pub(crate) prev: Prev,
2044}
2045
2046impl<'a, In, T: Clone + 'a, Prev: ChainCall<In, Out = Option<&'a T>>> ChainCall<In>
2047    for ClonedOptionNode<Prev>
2048{
2049    type Out = Option<T>;
2050    fn call(&mut self, world: &mut World, input: In) -> Option<T> {
2051        self.prev.call(world, input).cloned()
2052    }
2053}
2054
2055/// Chain node for `.cloned()` on `Result<&T, E>`.
2056#[doc(hidden)]
2057pub struct ClonedResultNode<Prev> {
2058    pub(crate) prev: Prev,
2059}
2060
2061impl<'a, In, T: Clone + 'a, E, Prev: ChainCall<In, Out = Result<&'a T, E>>> ChainCall<In>
2062    for ClonedResultNode<Prev>
2063{
2064    type Out = Result<T, E>;
2065    fn call(&mut self, world: &mut World, input: In) -> Result<T, E> {
2066        self.prev.call(world, input).cloned()
2067    }
2068}
2069
2070// -- DAG-specific nodes (borrow intermediate) --------------------------------
2071//
2072// DAG steps borrow `&Prev::Out` instead of consuming by value. These nodes
2073// parallel the pipeline nodes above but use `for<'a> StepCall<&'a T>` bounds
2074// and HRTB arm chains (`for<'a> ChainCall<&'a T>`).
2075
2076/// Chain node for DAG `.then()` — borrows intermediate output for next step.
2077///
2078/// Unlike pipeline's [`ThenNode`] which passes by value, this borrows `&Prev::Out`
2079/// for the step. Used for DAG chains where intermediates are owned and borrowed.
2080#[doc(hidden)]
2081pub struct DagThenNode<Prev, S, NewOut> {
2082    pub(crate) prev: Prev,
2083    pub(crate) step: S,
2084    pub(crate) _out: PhantomData<fn() -> NewOut>,
2085}
2086
2087impl<In, Prev, S, NewOut: 'static> ChainCall<In> for DagThenNode<Prev, S, NewOut>
2088where
2089    Prev: ChainCall<In>,
2090    Prev::Out: 'static,
2091    S: for<'a> StepCall<&'a Prev::Out, Out = NewOut>,
2092{
2093    type Out = NewOut;
2094    fn call(&mut self, world: &mut World, input: In) -> NewOut {
2095        let out = self.prev.call(world, input);
2096        self.step.call(world, &out)
2097    }
2098}
2099
2100/// Chain node for DAG `.scan()` — scan with borrowed input.
2101///
2102/// Like [`ScanNode`] but the step receives `&Prev::Out` via [`RefScanStepCall`].
2103#[doc(hidden)]
2104pub struct RefScanNode<Prev, S, Acc> {
2105    pub(crate) prev: Prev,
2106    pub(crate) step: S,
2107    pub(crate) acc: Acc,
2108}
2109
2110impl<In, Prev, S, Acc> ChainCall<In> for RefScanNode<Prev, S, Acc>
2111where
2112    Prev: ChainCall<In>,
2113    S: RefScanStepCall<Acc, Prev::Out>,
2114{
2115    type Out = S::Out;
2116    fn call(&mut self, world: &mut World, input: In) -> S::Out {
2117        let val = self.prev.call(world, input);
2118        self.step.call(world, &mut self.acc, &val)
2119    }
2120}
2121
2122/// Chain node for DAG `.route()` — arms borrow `&Prev::Out` (HRTB).
2123///
2124/// Unlike pipeline's [`RouteNode`] which passes by value, this borrows
2125/// the value for the predicate and arms. Arms satisfy `for<'a> ChainCall<&'a Out>`.
2126#[doc(hidden)]
2127pub struct DagRouteNode<Prev, P, C0, C1, NewOut> {
2128    pub(crate) prev: Prev,
2129    pub(crate) pred: P,
2130    pub(crate) on_true: C0,
2131    pub(crate) on_false: C1,
2132    pub(crate) _out: PhantomData<fn() -> NewOut>,
2133}
2134
2135impl<In, Prev, P, C0, C1, NewOut> ChainCall<In> for DagRouteNode<Prev, P, C0, C1, NewOut>
2136where
2137    Prev: ChainCall<In>,
2138    Prev::Out: 'static,
2139    P: RefStepCall<Prev::Out, Out = bool>,
2140    C0: for<'a> ChainCall<&'a Prev::Out, Out = NewOut>,
2141    C1: for<'a> ChainCall<&'a Prev::Out, Out = NewOut>,
2142{
2143    type Out = NewOut;
2144    fn call(&mut self, world: &mut World, input: In) -> NewOut {
2145        let val = self.prev.call(world, input);
2146        if self.pred.call(world, &val) {
2147            self.on_true.call(world, &val)
2148        } else {
2149            self.on_false.call(world, &val)
2150        }
2151    }
2152}
2153
2154/// Chain node for DAG `.map()` on `Option<T>` — step borrows `&T`.
2155///
2156/// Like [`MapOptionNode`] but the step receives `&T` instead of `T` by value.
2157#[doc(hidden)]
2158pub struct DagMapOptionNode<Prev, S, U> {
2159    pub(crate) prev: Prev,
2160    pub(crate) step: S,
2161    pub(crate) _out: PhantomData<fn() -> U>,
2162}
2163
2164impl<In, T: 'static, U: 'static, Prev, S> ChainCall<In> for DagMapOptionNode<Prev, S, U>
2165where
2166    Prev: ChainCall<In, Out = Option<T>>,
2167    S: for<'a> StepCall<&'a T, Out = U>,
2168{
2169    type Out = Option<U>;
2170    fn call(&mut self, world: &mut World, input: In) -> Option<U> {
2171        self.prev
2172            .call(world, input)
2173            .map(|ref val| self.step.call(world, val))
2174    }
2175}
2176
2177/// Chain node for DAG `.and_then()` on `Option<T>` — step borrows `&T`.
2178#[doc(hidden)]
2179pub struct DagAndThenOptionNode<Prev, S, U> {
2180    pub(crate) prev: Prev,
2181    pub(crate) step: S,
2182    pub(crate) _out: PhantomData<fn() -> U>,
2183}
2184
2185impl<In, T: 'static, U: 'static, Prev, S> ChainCall<In> for DagAndThenOptionNode<Prev, S, U>
2186where
2187    Prev: ChainCall<In, Out = Option<T>>,
2188    S: for<'a> StepCall<&'a T, Out = Option<U>>,
2189{
2190    type Out = Option<U>;
2191    fn call(&mut self, world: &mut World, input: In) -> Option<U> {
2192        self.prev
2193            .call(world, input)
2194            .and_then(|ref val| self.step.call(world, val))
2195    }
2196}
2197
2198/// Chain node for DAG `.map()` on `Result<T, E>` — step borrows `&T`.
2199#[doc(hidden)]
2200pub struct DagMapResultNode<Prev, S, U> {
2201    pub(crate) prev: Prev,
2202    pub(crate) step: S,
2203    pub(crate) _out: PhantomData<fn() -> U>,
2204}
2205
2206impl<In, T: 'static, E, U: 'static, Prev, S> ChainCall<In> for DagMapResultNode<Prev, S, U>
2207where
2208    Prev: ChainCall<In, Out = Result<T, E>>,
2209    S: for<'a> StepCall<&'a T, Out = U>,
2210{
2211    type Out = Result<U, E>;
2212    fn call(&mut self, world: &mut World, input: In) -> Result<U, E> {
2213        self.prev
2214            .call(world, input)
2215            .map(|ref val| self.step.call(world, val))
2216    }
2217}
2218
2219/// Chain node for DAG `.and_then()` on `Result<T, E>` — step borrows `&T`.
2220#[doc(hidden)]
2221pub struct DagAndThenResultNode<Prev, S, U> {
2222    pub(crate) prev: Prev,
2223    pub(crate) step: S,
2224    pub(crate) _out: PhantomData<fn() -> U>,
2225}
2226
2227impl<In, T: 'static, E, U: 'static, Prev, S> ChainCall<In> for DagAndThenResultNode<Prev, S, U>
2228where
2229    Prev: ChainCall<In, Out = Result<T, E>>,
2230    S: for<'a> StepCall<&'a T, Out = Result<U, E>>,
2231{
2232    type Out = Result<U, E>;
2233    fn call(&mut self, world: &mut World, input: In) -> Result<U, E> {
2234        self.prev
2235            .call(world, input)
2236            .and_then(|ref val| self.step.call(world, val))
2237    }
2238}
2239
2240/// Chain node for DAG `.catch()` on `Result<T, E>` — error handler borrows `&E`.
2241///
2242/// Like [`CatchNode`] but the step receives `&E` instead of consuming `E`.
2243#[doc(hidden)]
2244pub struct DagCatchNode<Prev, S> {
2245    pub(crate) prev: Prev,
2246    pub(crate) step: S,
2247}
2248
2249impl<In, T, E: 'static, Prev, S> ChainCall<In> for DagCatchNode<Prev, S>
2250where
2251    Prev: ChainCall<In, Out = Result<T, E>>,
2252    S: for<'a> StepCall<&'a E, Out = ()>,
2253{
2254    type Out = Option<T>;
2255    fn call(&mut self, world: &mut World, input: In) -> Option<T> {
2256        match self.prev.call(world, input) {
2257            Ok(val) => Some(val),
2258            Err(ref err) => {
2259                self.step.call(world, err);
2260                None
2261            }
2262        }
2263    }
2264}
2265
2266// -- Terminal nodes ----------------------------------------------------------
2267
2268/// Chain node for `build()` on `Option<()>` — discards the option wrapper.
2269#[doc(hidden)]
2270pub struct DiscardOptionNode<Prev> {
2271    pub(crate) prev: Prev,
2272}
2273
2274impl<In, Prev: ChainCall<In, Out = Option<()>>> ChainCall<In> for DiscardOptionNode<Prev> {
2275    type Out = ();
2276    fn call(&mut self, world: &mut World, input: In) {
2277        let _ = self.prev.call(world, input);
2278    }
2279}
2280
2281// =============================================================================
2282// PipelineBuilder — entry point
2283// =============================================================================
2284
/// Entry point for building a pre-resolved step pipeline.
///
/// `In` is the pipeline input type. Call [`.then()`](Self::then) to add
/// the first step — a named function whose [`Param`] dependencies
/// are resolved from the registry at build time. Use
/// [`.scan()`](Self::scan) instead when the first step needs a persistent
/// accumulator.
///
/// The builder itself stores no data: its only field is a
/// `PhantomData<fn(In)>` that records the input type.
///
/// # Examples
///
/// ```
/// use nexus_rt::{WorldBuilder, Res, ResMut, PipelineBuilder, Handler, Resource};
///
/// #[derive(Resource)]
/// struct Factor(u64);
/// #[derive(Resource)]
/// struct Output(String);
///
/// let mut wb = WorldBuilder::new();
/// wb.register(Factor(10));
/// wb.register(Output(String::new()));
/// let mut world = wb.build();
///
/// fn double(factor: Res<Factor>, x: u32) -> u64 {
///     factor.0 * x as u64
/// }
/// fn store(mut out: ResMut<Output>, val: u64) {
///     out.0 = val.to_string();
/// }
///
/// let r = world.registry();
/// let mut pipeline = PipelineBuilder::<u32>::new()
///     .then(double, r)
///     .then(store, r)
///     .build();
///
/// pipeline.run(&mut world, 5);
/// assert_eq!(world.resource::<Output>().0.as_str(), "50");
/// ```
#[must_use = "a pipeline builder does nothing unless you chain steps and call .build()"]
pub struct PipelineBuilder<In>(PhantomData<fn(In)>);
2324
2325impl<In> PipelineBuilder<In> {
2326    /// Create a new step pipeline entry point.
2327    pub fn new() -> Self {
2328        Self(PhantomData)
2329    }
2330
2331    /// Add the first step. Params resolved from the registry.
2332    pub fn then<Out, Params, S: IntoStep<In, Out, Params>>(
2333        self,
2334        f: S,
2335        registry: &Registry,
2336    ) -> PipelineChain<In, Out, ThenNode<IdentityNode, S::Step>> {
2337        PipelineChain {
2338            chain: ThenNode {
2339                prev: IdentityNode,
2340                step: f.into_step(registry),
2341            },
2342            _marker: PhantomData,
2343        }
2344    }
2345
2346    /// Add the first step as a scan with persistent accumulator.
2347    /// The step receives `&mut Acc` and the input, returning the output.
2348    /// State persists across invocations.
2349    pub fn scan<Acc, Out, Params, S>(
2350        self,
2351        initial: Acc,
2352        f: S,
2353        registry: &Registry,
2354    ) -> PipelineChain<In, Out, ScanNode<IdentityNode, S::Step, Acc>>
2355    where
2356        Acc: 'static,
2357        S: IntoScanStep<Acc, In, Out, Params>,
2358    {
2359        PipelineChain {
2360            chain: ScanNode {
2361                prev: IdentityNode,
2362                step: f.into_scan_step(registry),
2363                acc: initial,
2364            },
2365            _marker: PhantomData,
2366        }
2367    }
2368}
2369
2370impl<In> Default for PipelineBuilder<In> {
2371    fn default() -> Self {
2372        Self::new()
2373    }
2374}
2375
2376// =============================================================================
2377// PipelineChain — typestate builder
2378// =============================================================================
2379
/// Builder that composes pre-resolved pipeline steps via named chain nodes.
///
/// `In` is the pipeline's input type (fixed). `Out` is the current output.
/// `Chain` is the concrete chain type (nested named nodes, like iterator adapters).
///
/// Each combinator consumes `self`, wraps the previous chain in a new named
/// node, and returns a new `PipelineChain`. The compiler monomorphizes the
/// entire chain — zero virtual dispatch through steps. Named types (not
/// closures) preserve HRTB: `Pipeline<Chain>` can satisfy `for<'a> Handler<&'a T>`.
///
/// IntoStep-based methods (`.then()`, `.map()`, `.and_then()`, `.catch()`)
/// take `&Registry` to resolve Param state at build time. Closure-based
/// methods don't need the registry.
#[must_use = "pipeline chain does nothing until .build() is called"]
pub struct PipelineChain<In, Out, Chain> {
    // The composed chain of nodes built so far; each combinator wraps it in
    // a new outer node.
    pub(crate) chain: Chain,
    // Zero-sized marker recording the `In -> Out` signature; no `In` or
    // `Out` value is stored.
    pub(crate) _marker: PhantomData<fn(In) -> Out>,
}
2398
2399// =============================================================================
2400// Core — any Out
2401// =============================================================================
2402
2403impl<In, Out, Chain: ChainCall<In, Out = Out>> PipelineChain<In, Out, Chain> {
2404    /// Add a step. Params resolved from the registry.
2405    pub fn then<NewOut, Params, S: IntoStep<Out, NewOut, Params>>(
2406        self,
2407        f: S,
2408        registry: &Registry,
2409    ) -> PipelineChain<In, NewOut, ThenNode<Chain, S::Step>> {
2410        PipelineChain {
2411            chain: ThenNode {
2412                prev: self.chain,
2413                step: f.into_step(registry),
2414            },
2415            _marker: PhantomData,
2416        }
2417    }
2418
2419    /// Run the pipeline directly. No boxing, no `'static` on `In`.
2420    pub fn run(&mut self, world: &mut World, input: In) -> Out {
2421        self.chain.call(world, input)
2422    }
2423
2424    /// Dispatch pipeline output to a [`Handler<Out>`].
2425    ///
2426    /// Connects a pipeline's output to any handler — [`HandlerFn`](crate::HandlerFn),
2427    /// [`Callback`](crate::Callback), [`Pipeline`], or a combinator like
2428    /// [`fan_out!`](crate::fan_out).
2429    pub fn dispatch<H: Handler<Out>>(
2430        self,
2431        handler: H,
2432    ) -> PipelineChain<In, (), DispatchNode<Chain, H>> {
2433        PipelineChain {
2434            chain: DispatchNode {
2435                prev: self.chain,
2436                handler,
2437            },
2438            _marker: PhantomData,
2439        }
2440    }
2441
2442    /// Conditionally wrap the output in `Option`. `Some(val)` if
2443    /// the predicate returns true, `None` otherwise.
2444    ///
2445    /// Enters Option-combinator land — follow with `.map()`,
2446    /// `.and_then()`, `.filter()`, `.unwrap_or()`, etc.
2447    ///
2448    /// # Common Mistakes
2449    ///
2450    /// Guard takes `&In`, not `In` — the value passes through unchanged.
2451    ///
2452    /// ```compile_fail
2453    /// # use nexus_rt::{PipelineBuilder, WorldBuilder};
2454    /// # let mut wb = WorldBuilder::new();
2455    /// # let world = wb.build();
2456    /// # let reg = world.registry();
2457    /// // ERROR: takes u32 by value, should be &u32
2458    /// PipelineBuilder::<u32>::new()
2459    ///     .then(|x: u32| x, &reg)
2460    ///     .guard(|x: u32| x > 10, &reg);
2461    /// ```
2462    ///
2463    /// Fix: take by reference:
2464    /// ```ignore
2465    /// PipelineBuilder::<u32>::new()
2466    ///     .then(|x: u32| x, &reg)
2467    ///     .guard(|x: &u32| *x > 10, &reg);
2468    /// ```
2469    pub fn guard<Params, S: IntoRefStep<Out, bool, Params>>(
2470        self,
2471        f: S,
2472        registry: &Registry,
2473    ) -> PipelineChain<In, Option<Out>, GuardNode<Chain, S::Step>> {
2474        PipelineChain {
2475            chain: GuardNode {
2476                prev: self.chain,
2477                step: f.into_ref_step(registry),
2478            },
2479            _marker: PhantomData,
2480        }
2481    }
2482
2483    /// Observe the current value without consuming or changing it.
2484    ///
2485    /// The step receives `&Out`. The value passes through unchanged.
2486    /// Useful for logging, metrics, or debugging mid-chain.
2487    ///
2488    /// # Common Mistakes
2489    ///
2490    /// Tap takes `&In`, not `In`:
2491    /// ```compile_fail
2492    /// # use nexus_rt::{PipelineBuilder, WorldBuilder};
2493    /// # let mut wb = WorldBuilder::new();
2494    /// # let world = wb.build();
2495    /// # let reg = world.registry();
2496    /// // ERROR: takes u32 by value
2497    /// PipelineBuilder::<u32>::new()
2498    ///     .then(|x: u32| x, &reg)
2499    ///     .tap(|x: u32| println!("{x}"), &reg);
2500    /// ```
2501    pub fn tap<Params, S: IntoRefStep<Out, (), Params>>(
2502        self,
2503        f: S,
2504        registry: &Registry,
2505    ) -> PipelineChain<In, Out, TapNode<Chain, S::Step>> {
2506        PipelineChain {
2507            chain: TapNode {
2508                prev: self.chain,
2509                step: f.into_ref_step(registry),
2510            },
2511            _marker: PhantomData,
2512        }
2513    }
2514
2515    /// Binary conditional routing. Evaluates the predicate on the
2516    /// current value, then moves it into exactly one of two arms.
2517    ///
2518    /// Both arms must produce the same output type. Build each arm as
2519    /// a sub-pipeline from [`PipelineBuilder`]. For N-ary routing, nest
2520    /// `route` calls in the false arm.
2521    ///
2522    /// ```ignore
2523    /// let large = PipelineBuilder::new().then(large_check, reg).then(submit, reg);
2524    /// let small = PipelineBuilder::new().then(submit, reg);
2525    ///
2526    /// PipelineBuilder::<Order>::new()
2527    ///     .then(validate, reg)
2528    ///     .route(|order: &Order| order.size > 1000, reg, large, small)
2529    ///     .build();
2530    /// ```
2531    pub fn route<NewOut, C0, C1, Params, Pred: IntoRefStep<Out, bool, Params>>(
2532        self,
2533        pred: Pred,
2534        registry: &Registry,
2535        on_true: PipelineChain<Out, NewOut, C0>,
2536        on_false: PipelineChain<Out, NewOut, C1>,
2537    ) -> PipelineChain<In, NewOut, RouteNode<Chain, Pred::Step, C0, C1>>
2538    where
2539        C0: ChainCall<Out, Out = NewOut>,
2540        C1: ChainCall<Out, Out = NewOut>,
2541    {
2542        PipelineChain {
2543            chain: RouteNode {
2544                prev: self.chain,
2545                pred: pred.into_ref_step(registry),
2546                on_true: on_true.chain,
2547                on_false: on_false.chain,
2548            },
2549            _marker: PhantomData,
2550        }
2551    }
2552
2553    /// Fork off a multi-step side-effect chain. The arm borrows
2554    /// `&Out`, runs to completion (producing `()`), and the
2555    /// original value passes through unchanged.
2556    ///
2557    /// Multi-step version of [`tap`](Self::tap) — the arm has the
2558    /// full DAG combinator API with Param resolution. Build with
2559    /// [`DagArmSeed::new()`](crate::dag::DagArmSeed::new).
2560    pub fn tee<C>(self, side: DagArm<Out, (), C>) -> PipelineChain<In, Out, TeeNode<Chain, C>>
2561    where
2562        C: for<'a> ChainCall<&'a Out, Out = ()>,
2563    {
2564        PipelineChain {
2565            chain: TeeNode {
2566                prev: self.chain,
2567                side: side.chain,
2568            },
2569            _marker: PhantomData,
2570        }
2571    }
2572
2573    /// Scan with persistent accumulator. The step receives `&mut Acc`
2574    /// and the current value, returning the new output. State persists
2575    /// across invocations.
2576    ///
2577    /// # Examples
2578    ///
2579    /// ```ignore
2580    /// // Running sum — suppress values below threshold
2581    /// PipelineBuilder::<u64>::new()
2582    ///     .then(identity, reg)
2583    ///     .scan(0u64, |acc: &mut u64, val: u64| {
2584    ///         *acc += val;
2585    ///         if *acc > 100 { Some(*acc) } else { None }
2586    ///     }, reg)
2587    ///     .build();
2588    /// ```
2589    pub fn scan<Acc, NewOut, Params, S>(
2590        self,
2591        initial: Acc,
2592        f: S,
2593        registry: &Registry,
2594    ) -> PipelineChain<In, NewOut, ScanNode<Chain, S::Step, Acc>>
2595    where
2596        Acc: 'static,
2597        S: IntoScanStep<Acc, Out, NewOut, Params>,
2598    {
2599        PipelineChain {
2600            chain: ScanNode {
2601                prev: self.chain,
2602                step: f.into_scan_step(registry),
2603                acc: initial,
2604            },
2605            _marker: PhantomData,
2606        }
2607    }
2608}
2609
2610// =============================================================================
2611// Splat — tuple destructuring into individual function arguments
2612// =============================================================================
2613//
2614// `.splat()` transitions from a tuple output to a builder whose `.then()`
2615// accepts a function taking the tuple elements as individual arguments.
2616// After `.splat().then(f, reg)`, the user is back on PipelineChain.
2617//
2618// Builder types are `#[doc(hidden)]` — users only see `.splat().then()`.
2619
2620// -- Splat builder types ------------------------------------------------------
2621
// Generates, for one tuple arity:
//   * the chain node that unpacks the tuple and calls the step,
//   * the start-position builder plus `PipelineBuilder::splat`,
//   * the mid-chain builder plus `PipelineChain::splat`.
//
// `$T` lists the tuple element type parameters and `$idx` the matching
// tuple field indices; both lists must have the same length.
macro_rules! define_splat_builders {
    (
        $N:literal,
        start: $Start:ident,
        mid: $Mid:ident,
        node: $Node:ident,
        into_trait: $IntoSplat:ident,
        call_trait: $CallSplat:ident,
        ($($T:ident),+),
        ($($idx:tt),+)
    ) => {
        // -- Chain node ------------------------------------------------------

        /// Node behind `.splat().then()`: unpacks the upstream tuple and
        /// passes each element to the step as a separate argument.
        #[doc(hidden)]
        pub struct $Node<Prev, S> {
            prev: Prev,
            step: S,
        }

        impl<In, $($T,)+ Prev, S> ChainCall<In> for $Node<Prev, S>
        where
            Prev: ChainCall<In, Out = ($($T,)+)>,
            S: $CallSplat<$($T),+>,
        {
            type Out = S::Out;

            fn call(&mut self, world: &mut World, input: In) -> S::Out {
                let parts = self.prev.call(world, input);
                self.step.call_splat(world, $(parts.$idx),+)
            }
        }

        // -- Start position --------------------------------------------------

        /// Splat builder used when the tuple is the pipeline input itself.
        #[doc(hidden)]
        pub struct $Start<$($T),+>(PhantomData<fn(($($T,)+))>);

        impl<$($T),+> PipelineBuilder<($($T,)+)> {
            /// Spread the tuple input across individual function arguments.
            pub fn splat(self) -> $Start<$($T),+> {
                $Start(PhantomData)
            }
        }

        impl<$($T),+> $Start<$($T),+> {
            /// Append a step whose arguments are the individual tuple elements.
            pub fn then<Out, Params, S>(
                self,
                f: S,
                registry: &Registry,
            ) -> PipelineChain<($($T,)+), Out, $Node<IdentityNode, S::Step>>
            where
                S: $IntoSplat<$($T,)+ Out, Params>,
            {
                let node = $Node {
                    prev: IdentityNode,
                    step: f.into_splat_step(registry),
                };
                PipelineChain {
                    chain: node,
                    _marker: PhantomData,
                }
            }
        }

        // -- Mid-chain position ----------------------------------------------

        /// Splat builder used when an earlier step produced the tuple.
        #[doc(hidden)]
        pub struct $Mid<In, $($T,)+ Chain> {
            chain: Chain,
            _marker: PhantomData<fn(In) -> ($($T,)+)>,
        }

        impl<In, $($T,)+ Chain> PipelineChain<In, ($($T,)+), Chain>
        where
            Chain: ChainCall<In, Out = ($($T,)+)>,
        {
            /// Spread the tuple output across individual function arguments.
            pub fn splat(self) -> $Mid<In, $($T,)+ Chain> {
                $Mid {
                    chain: self.chain,
                    _marker: PhantomData,
                }
            }
        }

        impl<In, $($T,)+ Chain> $Mid<In, $($T,)+ Chain>
        where
            Chain: ChainCall<In, Out = ($($T,)+)>,
        {
            /// Append a step whose arguments are the individual tuple elements.
            pub fn then<Out, Params, S>(
                self,
                f: S,
                registry: &Registry,
            ) -> PipelineChain<In, Out, $Node<Chain, S::Step>>
            where
                S: $IntoSplat<$($T,)+ Out, Params>,
            {
                let node = $Node {
                    prev: self.chain,
                    step: f.into_splat_step(registry),
                };
                PipelineChain {
                    chain: node,
                    _marker: PhantomData,
                }
            }
        }
    };
}
2721
2722define_splat_builders!(2,
2723    start: SplatStart2,
2724    mid: SplatBuilder2,
2725    node: SplatThenNode2,
2726    into_trait: IntoSplatStep2,
2727    call_trait: SplatCall2,
2728    (A, B),
2729    (0, 1)
2730);
2731
2732define_splat_builders!(3,
2733    start: SplatStart3,
2734    mid: SplatBuilder3,
2735    node: SplatThenNode3,
2736    into_trait: IntoSplatStep3,
2737    call_trait: SplatCall3,
2738    (A, B, C),
2739    (0, 1, 2)
2740);
2741
2742define_splat_builders!(4,
2743    start: SplatStart4,
2744    mid: SplatBuilder4,
2745    node: SplatThenNode4,
2746    into_trait: IntoSplatStep4,
2747    call_trait: SplatCall4,
2748    (A, B, C, D),
2749    (0, 1, 2, 3)
2750);
2751
2752define_splat_builders!(5,
2753    start: SplatStart5,
2754    mid: SplatBuilder5,
2755    node: SplatThenNode5,
2756    into_trait: IntoSplatStep5,
2757    call_trait: SplatCall5,
2758    (A, B, C, D, E),
2759    (0, 1, 2, 3, 4)
2760);
2761
2762// =============================================================================
2763// Dedup — suppress unchanged values
2764// =============================================================================
2765
2766impl<In, Out: PartialEq + Clone, Chain: ChainCall<In, Out = Out>> PipelineChain<In, Out, Chain> {
2767    /// Suppress consecutive unchanged values. Returns `Some(val)`
2768    /// when the value differs from the previous invocation, `None`
2769    /// when unchanged. First invocation always returns `Some`.
2770    ///
2771    /// Requires `PartialEq + Clone` — the previous value is stored
2772    /// internally for comparison.
2773    pub fn dedup(self) -> PipelineChain<In, Option<Out>, DedupNode<Chain, Out>> {
2774        PipelineChain {
2775            chain: DedupNode {
2776                prev: self.chain,
2777                last: None,
2778            },
2779            _marker: PhantomData,
2780        }
2781    }
2782}
2783
2784// =============================================================================
2785// Bool combinators
2786// =============================================================================
2787
2788impl<In, Chain: ChainCall<In, Out = bool>> PipelineChain<In, bool, Chain> {
2789    /// Invert a boolean value.
2790    #[allow(clippy::should_implement_trait)]
2791    pub fn not(self) -> PipelineChain<In, bool, NotNode<Chain>> {
2792        PipelineChain {
2793            chain: NotNode { prev: self.chain },
2794            _marker: PhantomData,
2795        }
2796    }
2797
2798    /// Short-circuit AND with a second boolean.
2799    ///
2800    /// If the chain produces `false`, the step is not called.
2801    pub fn and<Params, S: IntoProducer<bool, Params>>(
2802        self,
2803        f: S,
2804        registry: &Registry,
2805    ) -> PipelineChain<In, bool, AndBoolNode<Chain, S::Step>> {
2806        PipelineChain {
2807            chain: AndBoolNode {
2808                prev: self.chain,
2809                producer: f.into_producer(registry),
2810            },
2811            _marker: PhantomData,
2812        }
2813    }
2814
2815    /// Short-circuit OR with a second boolean.
2816    ///
2817    /// If the chain produces `true`, the step is not called.
2818    pub fn or<Params, S: IntoProducer<bool, Params>>(
2819        self,
2820        f: S,
2821        registry: &Registry,
2822    ) -> PipelineChain<In, bool, OrBoolNode<Chain, S::Step>> {
2823        PipelineChain {
2824            chain: OrBoolNode {
2825                prev: self.chain,
2826                producer: f.into_producer(registry),
2827            },
2828            _marker: PhantomData,
2829        }
2830    }
2831
2832    /// XOR with a second boolean.
2833    ///
2834    /// Both sides are always evaluated.
2835    pub fn xor<Params, S: IntoProducer<bool, Params>>(
2836        self,
2837        f: S,
2838        registry: &Registry,
2839    ) -> PipelineChain<In, bool, XorBoolNode<Chain, S::Step>> {
2840        PipelineChain {
2841            chain: XorBoolNode {
2842                prev: self.chain,
2843                producer: f.into_producer(registry),
2844            },
2845            _marker: PhantomData,
2846        }
2847    }
2848}
2849
2850// =============================================================================
2851// Clone helpers — &T → T transitions
2852// =============================================================================
2853
2854impl<'a, In, T: Clone, Chain: ChainCall<In, Out = &'a T>> PipelineChain<In, &'a T, Chain> {
2855    /// Clone a borrowed output to produce an owned value.
2856    ///
2857    /// Transitions the pipeline from `&T` to `T`. Uses UFCS
2858    /// (`T::clone(val)`) — `val.clone()` on a `&&T` resolves to
2859    /// `<&T as Clone>::clone` and returns `&T`, not `T`.
2860    pub fn cloned(self) -> PipelineChain<In, T, ClonedNode<Chain>> {
2861        PipelineChain {
2862            chain: ClonedNode { prev: self.chain },
2863            _marker: PhantomData,
2864        }
2865    }
2866}
2867
2868impl<'a, In, T: Clone, Chain: ChainCall<In, Out = Option<&'a T>>>
2869    PipelineChain<In, Option<&'a T>, Chain>
2870{
2871    /// Clone inner borrowed value. `Option<&T>` → `Option<T>`.
2872    pub fn cloned(self) -> PipelineChain<In, Option<T>, ClonedOptionNode<Chain>> {
2873        PipelineChain {
2874            chain: ClonedOptionNode { prev: self.chain },
2875            _marker: PhantomData,
2876        }
2877    }
2878}
2879
2880impl<'a, In, T: Clone, E, Chain: ChainCall<In, Out = Result<&'a T, E>>>
2881    PipelineChain<In, Result<&'a T, E>, Chain>
2882{
2883    /// Clone inner borrowed Ok value. `Result<&T, E>` → `Result<T, E>`.
2884    pub fn cloned(self) -> PipelineChain<In, Result<T, E>, ClonedResultNode<Chain>> {
2885        PipelineChain {
2886            chain: ClonedResultNode { prev: self.chain },
2887            _marker: PhantomData,
2888        }
2889    }
2890}
2891
2892// =============================================================================
2893// Option helpers — PipelineChain<In, Option<T>, Chain>
2894// =============================================================================
2895
2896impl<In, T, Chain: ChainCall<In, Out = Option<T>>> PipelineChain<In, Option<T>, Chain> {
2897    // -- IntoStep-based (hot path) -------------------------------------------
2898
2899    /// Transform the inner value. Step not called on None.
2900    pub fn map<U, Params, S: IntoStep<T, U, Params>>(
2901        self,
2902        f: S,
2903        registry: &Registry,
2904    ) -> PipelineChain<In, Option<U>, MapOptionNode<Chain, S::Step>> {
2905        PipelineChain {
2906            chain: MapOptionNode {
2907                prev: self.chain,
2908                step: f.into_step(registry),
2909            },
2910            _marker: PhantomData,
2911        }
2912    }
2913
2914    /// Short-circuits on None. std: `Option::and_then`
2915    pub fn and_then<U, Params, S: IntoStep<T, Option<U>, Params>>(
2916        self,
2917        f: S,
2918        registry: &Registry,
2919    ) -> PipelineChain<In, Option<U>, AndThenNode<Chain, S::Step>> {
2920        PipelineChain {
2921            chain: AndThenNode {
2922                prev: self.chain,
2923                step: f.into_step(registry),
2924            },
2925            _marker: PhantomData,
2926        }
2927    }
2928
2929    // -- Resolved (cold path, now with Param resolution) -----------------------
2930
2931    /// Side effect on None. Complement to [`inspect`](Self::inspect).
2932    pub fn on_none<Params, S: IntoProducer<(), Params>>(
2933        self,
2934        f: S,
2935        registry: &Registry,
2936    ) -> PipelineChain<In, Option<T>, OnNoneNode<Chain, S::Step>> {
2937        PipelineChain {
2938            chain: OnNoneNode {
2939                prev: self.chain,
2940                producer: f.into_producer(registry),
2941            },
2942            _marker: PhantomData,
2943        }
2944    }
2945
2946    /// Keep value if predicate holds. std: `Option::filter`
2947    ///
2948    /// # Common Mistakes
2949    ///
2950    /// Filter operates on `&T` inside the Option, not `T`:
2951    /// ```compile_fail
2952    /// # use nexus_rt::{PipelineBuilder, WorldBuilder};
2953    /// # let mut wb = WorldBuilder::new();
2954    /// # let world = wb.build();
2955    /// # let reg = world.registry();
2956    /// fn to_opt(x: u32) -> Option<u32> { Some(x) }
2957    /// // ERROR: takes u32, should be &u32
2958    /// PipelineBuilder::<u32>::new()
2959    ///     .then(to_opt, &reg)
2960    ///     .filter(|x: u32| x > 10, &reg);
2961    /// ```
2962    pub fn filter<Params, S: IntoRefStep<T, bool, Params>>(
2963        self,
2964        f: S,
2965        registry: &Registry,
2966    ) -> PipelineChain<In, Option<T>, FilterNode<Chain, S::Step>> {
2967        PipelineChain {
2968            chain: FilterNode {
2969                prev: self.chain,
2970                step: f.into_ref_step(registry),
2971            },
2972            _marker: PhantomData,
2973        }
2974    }
2975
2976    /// Side effect on Some value. std: `Option::inspect`
2977    ///
2978    /// Takes `&T`, not `T` — the value passes through.
2979    pub fn inspect<Params, S: IntoRefStep<T, (), Params>>(
2980        self,
2981        f: S,
2982        registry: &Registry,
2983    ) -> PipelineChain<In, Option<T>, InspectOptionNode<Chain, S::Step>> {
2984        PipelineChain {
2985            chain: InspectOptionNode {
2986                prev: self.chain,
2987                step: f.into_ref_step(registry),
2988            },
2989            _marker: PhantomData,
2990        }
2991    }
2992
2993    /// None becomes Err(err). std: `Option::ok_or`
2994    ///
2995    /// `Clone` required because the pipeline may run many times —
2996    /// the error value is cloned on each `None` invocation.
2997    pub fn ok_or<E: Clone>(self, err: E) -> PipelineChain<In, Result<T, E>, OkOrNode<Chain, E>> {
2998        PipelineChain {
2999            chain: OkOrNode {
3000                prev: self.chain,
3001                err,
3002            },
3003            _marker: PhantomData,
3004        }
3005    }
3006
3007    /// None becomes Err(f()). std: `Option::ok_or_else`
3008    pub fn ok_or_else<E, Params, S: IntoProducer<E, Params>>(
3009        self,
3010        f: S,
3011        registry: &Registry,
3012    ) -> PipelineChain<In, Result<T, E>, OkOrElseNode<Chain, S::Step>> {
3013        PipelineChain {
3014            chain: OkOrElseNode {
3015                prev: self.chain,
3016                producer: f.into_producer(registry),
3017            },
3018            _marker: PhantomData,
3019        }
3020    }
3021
3022    /// Exit Option — None becomes the default value.
3023    ///
3024    /// `Clone` required because the pipeline may run many times —
3025    /// the default is cloned on each `None` invocation (unlike
3026    /// std's `unwrap_or` which consumes the value once).
3027    pub fn unwrap_or(self, default: T) -> PipelineChain<In, T, UnwrapOrOptionNode<Chain, T>>
3028    where
3029        T: Clone,
3030    {
3031        PipelineChain {
3032            chain: UnwrapOrOptionNode {
3033                prev: self.chain,
3034                default,
3035            },
3036            _marker: PhantomData,
3037        }
3038    }
3039
3040    /// Exit Option — None becomes `f()`.
3041    pub fn unwrap_or_else<Params, S: IntoProducer<T, Params>>(
3042        self,
3043        f: S,
3044        registry: &Registry,
3045    ) -> PipelineChain<In, T, UnwrapOrElseOptionNode<Chain, S::Step>> {
3046        PipelineChain {
3047            chain: UnwrapOrElseOptionNode {
3048                prev: self.chain,
3049                producer: f.into_producer(registry),
3050            },
3051            _marker: PhantomData,
3052        }
3053    }
3054}
3055
3056// =============================================================================
3057// Result helpers — PipelineChain<In, Result<T, E>, Chain>
3058// =============================================================================
3059
3060impl<In, T, E, Chain: ChainCall<In, Out = Result<T, E>>> PipelineChain<In, Result<T, E>, Chain> {
3061    // -- IntoStep-based (hot path) -------------------------------------------
3062
3063    /// Transform the Ok value. Step not called on Err.
3064    pub fn map<U, Params, S: IntoStep<T, U, Params>>(
3065        self,
3066        f: S,
3067        registry: &Registry,
3068    ) -> PipelineChain<In, Result<U, E>, MapResultNode<Chain, S::Step>> {
3069        PipelineChain {
3070            chain: MapResultNode {
3071                prev: self.chain,
3072                step: f.into_step(registry),
3073            },
3074            _marker: PhantomData,
3075        }
3076    }
3077
3078    /// Short-circuits on Err. std: `Result::and_then`
3079    pub fn and_then<U, Params, S: IntoStep<T, Result<U, E>, Params>>(
3080        self,
3081        f: S,
3082        registry: &Registry,
3083    ) -> PipelineChain<In, Result<U, E>, AndThenResultNode<Chain, S::Step>> {
3084        PipelineChain {
3085            chain: AndThenResultNode {
3086                prev: self.chain,
3087                step: f.into_step(registry),
3088            },
3089            _marker: PhantomData,
3090        }
3091    }
3092
3093    /// Handle error and transition to Option.
3094    ///
3095    /// `Ok(val)` becomes `Some(val)` — handler not called.
3096    /// `Err(err)` calls the handler, then produces `None`.
3097    pub fn catch<Params, S: IntoStep<E, (), Params>>(
3098        self,
3099        f: S,
3100        registry: &Registry,
3101    ) -> PipelineChain<In, Option<T>, CatchNode<Chain, S::Step>> {
3102        PipelineChain {
3103            chain: CatchNode {
3104                prev: self.chain,
3105                step: f.into_step(registry),
3106            },
3107            _marker: PhantomData,
3108        }
3109    }
3110
3111    // -- Resolved (cold path, now with Param resolution) -----------------------
3112
3113    /// Transform the error. std: `Result::map_err`
3114    pub fn map_err<E2, Params, S: IntoStep<E, E2, Params>>(
3115        self,
3116        f: S,
3117        registry: &Registry,
3118    ) -> PipelineChain<In, Result<T, E2>, MapErrNode<Chain, S::Step>> {
3119        PipelineChain {
3120            chain: MapErrNode {
3121                prev: self.chain,
3122                step: f.into_step(registry),
3123            },
3124            _marker: PhantomData,
3125        }
3126    }
3127
3128    /// Recover from Err. std: `Result::or_else`
3129    pub fn or_else<E2, Params, S: IntoStep<E, Result<T, E2>, Params>>(
3130        self,
3131        f: S,
3132        registry: &Registry,
3133    ) -> PipelineChain<In, Result<T, E2>, OrElseNode<Chain, S::Step>> {
3134        PipelineChain {
3135            chain: OrElseNode {
3136                prev: self.chain,
3137                step: f.into_step(registry),
3138            },
3139            _marker: PhantomData,
3140        }
3141    }
3142
3143    /// Side effect on Ok. std: `Result::inspect`
3144    ///
3145    /// Takes `&T`, not `T` — the value passes through.
3146    pub fn inspect<Params, S: IntoRefStep<T, (), Params>>(
3147        self,
3148        f: S,
3149        registry: &Registry,
3150    ) -> PipelineChain<In, Result<T, E>, InspectResultNode<Chain, S::Step>> {
3151        PipelineChain {
3152            chain: InspectResultNode {
3153                prev: self.chain,
3154                step: f.into_ref_step(registry),
3155            },
3156            _marker: PhantomData,
3157        }
3158    }
3159
3160    /// Side effect on Err. std: `Result::inspect_err`
3161    ///
3162    /// Takes `&E`, not `E` — the error passes through.
3163    pub fn inspect_err<Params, S: IntoRefStep<E, (), Params>>(
3164        self,
3165        f: S,
3166        registry: &Registry,
3167    ) -> PipelineChain<In, Result<T, E>, InspectErrNode<Chain, S::Step>> {
3168        PipelineChain {
3169            chain: InspectErrNode {
3170                prev: self.chain,
3171                step: f.into_ref_step(registry),
3172            },
3173            _marker: PhantomData,
3174        }
3175    }
3176
3177    /// Discard error, enter Option land. std: `Result::ok`
3178    pub fn ok(self) -> PipelineChain<In, Option<T>, OkResultNode<Chain>> {
3179        PipelineChain {
3180            chain: OkResultNode { prev: self.chain },
3181            _marker: PhantomData,
3182        }
3183    }
3184
3185    /// Exit Result — Err becomes the default value.
3186    ///
3187    /// `Clone` required because the pipeline may run many times —
3188    /// the default is cloned on each `Err` invocation (unlike
3189    /// std's `unwrap_or` which consumes the value once).
3190    pub fn unwrap_or(self, default: T) -> PipelineChain<In, T, UnwrapOrResultNode<Chain, T>>
3191    where
3192        T: Clone,
3193    {
3194        PipelineChain {
3195            chain: UnwrapOrResultNode {
3196                prev: self.chain,
3197                default,
3198            },
3199            _marker: PhantomData,
3200        }
3201    }
3202
3203    /// Exit Result — Err becomes `f(err)`.
3204    pub fn unwrap_or_else<Params, S: IntoStep<E, T, Params>>(
3205        self,
3206        f: S,
3207        registry: &Registry,
3208    ) -> PipelineChain<In, T, UnwrapOrElseResultNode<Chain, S::Step>> {
3209        PipelineChain {
3210            chain: UnwrapOrElseResultNode {
3211                prev: self.chain,
3212                step: f.into_step(registry),
3213            },
3214            _marker: PhantomData,
3215        }
3216    }
3217}
3218
3219// =============================================================================
3220// PipelineOutput — marker trait for build()
3221// =============================================================================
3222
// Private module holding the sealing supertrait. The module itself is not
// `pub`, so code outside this file cannot name `Sealed` and therefore cannot
// add implementations of any trait that requires it.
mod pipeline_output_seal {
    /// Supertrait used to seal the set of terminal output types.
    pub trait Sealed {}

    impl Sealed for Option<()> {}
    impl Sealed for () {}
}
3228
3229/// Sealed marker trait for valid pipeline terminal types.
3230///
3231/// Only `()` and `Option<()>` satisfy this. A pipeline can only
3232/// `.build()` when its output is one of these types — add a final
3233/// `.then()` or `.dispatch()` that consumes the output.
3234#[diagnostic::on_unimplemented(
3235    message = "`build()` requires the pipeline output to be `()` or `Option<()>`",
3236    label = "this pipeline produces `{Self}`, not `()` or `Option<()>`",
3237    note = "add a final `.then()` or `.dispatch()` that consumes the output"
3238)]
3239pub trait PipelineOutput: pipeline_output_seal::Sealed {}
3240impl PipelineOutput for () {}
3241impl PipelineOutput for Option<()> {}
3242
3243// =============================================================================
3244// build — when Out: PipelineOutput (() or Option<()>)
3245// =============================================================================
3246
3247impl<In, Chain: ChainCall<In, Out = ()>> PipelineChain<In, (), Chain> {
3248    /// Finalize the pipeline into a [`Pipeline`].
3249    ///
3250    /// The returned pipeline is a concrete, monomorphized type — no boxing,
3251    /// no virtual dispatch. Call `.run()` directly for zero-cost execution,
3252    /// or wrap in `Box<dyn Handler<In>>` when type erasure is needed.
3253    ///
3254    /// Only available when the pipeline ends with `()` or `Option<()>`.
3255    /// If your chain produces a value, add a final `.then()` that consumes
3256    /// the output.
3257    #[must_use = "building a pipeline without storing it does nothing"]
3258    pub fn build(self) -> Pipeline<Chain> {
3259        Pipeline { chain: self.chain }
3260    }
3261}
3262
3263impl<In, Chain: ChainCall<In, Out = Option<()>>> PipelineChain<In, Option<()>, Chain> {
3264    /// Finalize the pipeline into a [`Pipeline`], discarding the `Option<()>`.
3265    ///
3266    /// Pipelines ending with `Option<()>` (e.g. after `.map()` on an
3267    /// `Option<T>` with a step that returns `()`) produce the same
3268    /// [`Pipeline`] as those ending with `()`.
3269    #[must_use = "building a pipeline without storing it does nothing"]
3270    pub fn build(self) -> Pipeline<DiscardOptionNode<Chain>> {
3271        Pipeline {
3272            chain: DiscardOptionNode { prev: self.chain },
3273        }
3274    }
3275}
3276
3277// =============================================================================
3278// build_batch — when Out: PipelineOutput (() or Option<()>)
3279// =============================================================================
3280
3281impl<In, Out: PipelineOutput, Chain: ChainCall<In, Out = Out>> PipelineChain<In, Out, Chain> {
3282    /// Finalize into a [`BatchPipeline`] with a pre-allocated input buffer.
3283    ///
3284    /// Same pipeline chain as [`build`](PipelineChain::build), but the
3285    /// pipeline owns an input buffer that drivers fill between dispatch
3286    /// cycles. Each call to [`BatchPipeline::run`] drains the buffer,
3287    /// running every item through the chain independently.
3288    ///
3289    /// Available when the pipeline ends with `()` or `Option<()>` (e.g.
3290    /// after `.catch()` or `.filter()`). Pipelines producing values need
3291    /// a final `.then()` that consumes the output.
3292    ///
3293    /// `capacity` is the initial allocation — the buffer can grow if needed,
3294    /// but sizing it for the expected batch size avoids reallocation.
3295    #[must_use = "building a pipeline without storing it does nothing"]
3296    pub fn build_batch(self, capacity: usize) -> BatchPipeline<In, Chain> {
3297        BatchPipeline {
3298            input: Vec::with_capacity(capacity),
3299            chain: self.chain,
3300        }
3301    }
3302}
3303
3304// =============================================================================
3305// Pipeline<F> — built pipeline
3306// =============================================================================
3307
/// A finished step pipeline, usable as a [`Handler<E>`](crate::Handler).
///
/// Produced by [`PipelineChain::build`]. The whole chain is monomorphized
/// at compile time, with no boxing and no virtual dispatch. Invoke
/// `.run()` directly for zero-cost execution, or store it as
/// `Box<dyn Handler<E>>` when type erasure is required (a single box).
///
/// [`Handler<E>`](crate::Handler) is implemented for every event type `E`
/// the chain accepts, borrowed types such as `&'a [u8]` included. This
/// allows `for<'a> Handler<&'a T>` for zero-copy event dispatch.
pub struct Pipeline<F> {
    // Composed chain of step nodes.
    chain: F,
}
3321
3322impl<E, F: ChainCall<E, Out = ()> + Send> crate::Handler<E> for Pipeline<F> {
3323    fn run(&mut self, world: &mut World, event: E) {
3324        self.chain.call(world, event);
3325    }
3326}
3327
3328// =============================================================================
3329// BatchPipeline<In, F> — pipeline with owned input buffer
3330// =============================================================================
3331
/// A pipeline bundled with its own pre-allocated input buffer.
///
/// Produced by [`PipelineChain::build_batch`]. Every buffered item goes
/// through the complete chain on its own, with the same per-item `Option`
/// and `Result` flow control as [`Pipeline`]. Errors are dealt with
/// inline (`.catch()`, `.unwrap_or()`, ...) and processing moves on to
/// the next item. There are no intermediate buffers between steps.
///
/// # Examples
///
/// ```
/// use nexus_rt::{WorldBuilder, ResMut, PipelineBuilder, Resource};
///
/// #[derive(Resource)]
/// struct Accum(u64);
///
/// let mut wb = WorldBuilder::new();
/// wb.register(Accum(0));
/// let mut world = wb.build();
///
/// fn accumulate(mut sum: ResMut<Accum>, x: u32) {
///     sum.0 += x as u64;
/// }
///
/// let r = world.registry();
/// let mut batch = PipelineBuilder::<u32>::new()
///     .then(accumulate, r)
///     .build_batch(64);
///
/// batch.input_mut().extend_from_slice(&[1, 2, 3, 4, 5]);
/// batch.run(&mut world);
///
/// assert_eq!(world.resource::<Accum>().0, 15);
/// assert!(batch.input().is_empty());
/// ```
pub struct BatchPipeline<In, F> {
    // Pending items, drained by `run`.
    input: Vec<In>,
    // Composed chain of step nodes applied to each item.
    chain: F,
}
3371
3372impl<In, Out: PipelineOutput, F: ChainCall<In, Out = Out>> BatchPipeline<In, F> {
3373    /// Mutable access to the input buffer. Drivers fill this between
3374    /// dispatch cycles.
3375    pub fn input_mut(&mut self) -> &mut Vec<In> {
3376        &mut self.input
3377    }
3378
3379    /// Read-only access to the input buffer.
3380    pub fn input(&self) -> &[In] {
3381        &self.input
3382    }
3383
3384    /// Drain the input buffer, running each item through the pipeline.
3385    ///
3386    /// Each item gets independent `Option`/`Result` flow control — an
3387    /// error on one item does not affect subsequent items. After `run()`,
3388    /// the input buffer is empty but retains its allocation.
3389    pub fn run(&mut self, world: &mut World) {
3390        for item in self.input.drain(..) {
3391            let _ = self.chain.call(world, item);
3392        }
3393    }
3394}
3395
3396// =============================================================================
3397// resolve_step — pre-resolve a step for manual dispatch (owned input)
3398// =============================================================================
3399
3400/// Resolve a step for use in manual dispatch (e.g. inside an
3401/// [`Opaque`] closure passed to `.then()`).
3402///
3403/// Returns a closure with pre-resolved [`Param`] state — the same
3404/// build-time resolution that `.then()` performs, but as a standalone
3405/// value the caller can invoke from any context.
3406///
3407/// This is the pipeline (owned-input) counterpart of
3408/// [`dag::resolve_arm`](crate::dag::resolve_arm) (reference-input).
3409///
3410/// # Examples
3411///
3412/// ```ignore
3413/// let mut arm0 = resolve_step(handle_new, &reg);
3414/// let mut arm1 = resolve_step(handle_cancel, &reg);
3415///
3416/// pipeline.then(move |world: &mut World, order: Order| match order.kind {
3417///     OrderKind::New    => arm0(world, order),
3418///     OrderKind::Cancel => arm1(world, order),
3419/// }, &reg)
3420/// ```
3421pub fn resolve_step<In, Out, Params, S>(
3422    f: S,
3423    registry: &Registry,
3424) -> impl FnMut(&mut World, In) -> Out + use<In, Out, Params, S>
3425where
3426    In: 'static,
3427    Out: 'static,
3428    S: IntoStep<In, Out, Params>,
3429{
3430    let mut resolved = f.into_step(registry);
3431    move |world: &mut World, input: In| resolved.call(world, input)
3432}
3433
3434// =============================================================================
3435// Tests
3436// =============================================================================
3437
3438#[cfg(test)]
3439mod tests {
3440    use super::*;
3441    use crate::{Handler, IntoHandler, Local, Res, ResMut, WorldBuilder, fan_out};
3442
3443    // =========================================================================
3444    // Core dispatch
3445    // =========================================================================
3446
3447    #[test]
3448    fn step_pure_transform() {
3449        let mut world = WorldBuilder::new().build();
3450        let r = world.registry_mut();
3451        let mut p = PipelineBuilder::<u32>::new().then(|x: u32| x as u64 * 2, r);
3452        assert_eq!(p.run(&mut world, 5), 10u64);
3453    }
3454
3455    #[test]
3456    fn step_one_res() {
3457        let mut wb = WorldBuilder::new();
3458        wb.register::<u64>(10);
3459        let mut world = wb.build();
3460
3461        fn multiply(factor: Res<u64>, x: u32) -> u64 {
3462            *factor * x as u64
3463        }
3464
3465        let r = world.registry_mut();
3466        let mut p = PipelineBuilder::<u32>::new().then(multiply, r);
3467        assert_eq!(p.run(&mut world, 5), 50);
3468    }
3469
3470    #[test]
3471    fn step_one_res_mut() {
3472        let mut wb = WorldBuilder::new();
3473        wb.register::<u64>(0);
3474        let mut world = wb.build();
3475
3476        fn accumulate(mut total: ResMut<u64>, x: u32) {
3477            *total += x as u64;
3478        }
3479
3480        let r = world.registry_mut();
3481        let mut p = PipelineBuilder::<u32>::new().then(accumulate, r);
3482        p.run(&mut world, 10);
3483        p.run(&mut world, 5);
3484        assert_eq!(*world.resource::<u64>(), 15);
3485    }
3486
3487    #[test]
3488    fn step_two_params() {
3489        let mut wb = WorldBuilder::new();
3490        wb.register::<u64>(10);
3491        wb.register::<bool>(true);
3492        let mut world = wb.build();
3493
3494        fn conditional(factor: Res<u64>, flag: Res<bool>, x: u32) -> u64 {
3495            if *flag { *factor * x as u64 } else { 0 }
3496        }
3497
3498        let r = world.registry_mut();
3499        let mut p = PipelineBuilder::<u32>::new().then(conditional, r);
3500        assert_eq!(p.run(&mut world, 5), 50);
3501    }
3502
3503    #[test]
3504    fn step_chain_two() {
3505        let mut wb = WorldBuilder::new();
3506        wb.register::<u64>(2);
3507        let mut world = wb.build();
3508
3509        fn double(factor: Res<u64>, x: u32) -> u64 {
3510            *factor * x as u64
3511        }
3512
3513        let r = world.registry_mut();
3514        let mut p = PipelineBuilder::<u32>::new()
3515            .then(double, r)
3516            .then(|val: u64| val + 1, r);
3517        assert_eq!(p.run(&mut world, 5), 11); // 2*5 + 1
3518    }
3519
3520    // =========================================================================
3521    // Option combinators
3522    // =========================================================================
3523
3524    #[test]
3525    fn option_map_on_some() {
3526        let mut wb = WorldBuilder::new();
3527        wb.register::<u64>(10);
3528        let mut world = wb.build();
3529
3530        fn add_factor(factor: Res<u64>, x: u32) -> u64 {
3531            *factor + x as u64
3532        }
3533
3534        let r = world.registry_mut();
3535        let mut p = PipelineBuilder::<u32>::new()
3536            .then(|x: u32| -> Option<u32> { Some(x) }, r)
3537            .map(add_factor, r);
3538        assert_eq!(p.run(&mut world, 5), Some(15));
3539    }
3540
3541    #[test]
3542    fn option_map_skips_none() {
3543        let mut wb = WorldBuilder::new();
3544        wb.register::<bool>(false);
3545        let mut world = wb.build();
3546
3547        fn mark(mut flag: ResMut<bool>, _x: u32) -> u32 {
3548            *flag = true;
3549            0
3550        }
3551
3552        let r = world.registry_mut();
3553        let mut p = PipelineBuilder::<u32>::new()
3554            .then(|_x: u32| -> Option<u32> { None }, r)
3555            .map(mark, r);
3556        assert_eq!(p.run(&mut world, 5), None);
3557        assert!(!*world.resource::<bool>());
3558    }
3559
3560    #[test]
3561    fn option_and_then_chains() {
3562        let mut wb = WorldBuilder::new();
3563        wb.register::<u64>(10);
3564        let mut world = wb.build();
3565
3566        fn check(min: Res<u64>, x: u32) -> Option<u64> {
3567            let val = x as u64;
3568            if val > *min { Some(val) } else { None }
3569        }
3570
3571        let r = world.registry_mut();
3572        let mut p = PipelineBuilder::<u32>::new()
3573            .then(|x: u32| Some(x), r)
3574            .and_then(check, r);
3575        assert_eq!(p.run(&mut world, 20), Some(20));
3576    }
3577
3578    #[test]
3579    fn option_and_then_short_circuits() {
3580        let mut wb = WorldBuilder::new();
3581        wb.register::<u64>(10);
3582        let mut world = wb.build();
3583
3584        fn check(min: Res<u64>, x: u32) -> Option<u64> {
3585            let val = x as u64;
3586            if val > *min { Some(val) } else { None }
3587        }
3588
3589        let r = world.registry_mut();
3590        let mut p = PipelineBuilder::<u32>::new()
3591            .then(|x: u32| Some(x), r)
3592            .and_then(check, r);
3593        assert_eq!(p.run(&mut world, 5), None);
3594    }
3595
3596    #[test]
3597    fn option_on_none_fires() {
3598        let mut wb = WorldBuilder::new();
3599        wb.register::<bool>(false);
3600        let mut world = wb.build();
3601
3602        let r = world.registry();
3603        let mut p = PipelineBuilder::<u32>::new()
3604            .then(|_x: u32| -> Option<u32> { None }, r)
3605            .on_none(
3606                |w: &mut World| {
3607                    *w.resource_mut::<bool>() = true;
3608                },
3609                r,
3610            );
3611        p.run(&mut world, 0);
3612        assert!(*world.resource::<bool>());
3613    }
3614
3615    #[test]
3616    fn option_filter_keeps() {
3617        let mut world = WorldBuilder::new().build();
3618        let r = world.registry_mut();
3619        let mut p = PipelineBuilder::<u32>::new()
3620            .then(|x: u32| Some(x), r)
3621            .filter(|x: &u32| *x > 3, r);
3622        assert_eq!(p.run(&mut world, 5), Some(5));
3623    }
3624
3625    #[test]
3626    fn option_filter_drops() {
3627        let mut world = WorldBuilder::new().build();
3628        let r = world.registry_mut();
3629        let mut p = PipelineBuilder::<u32>::new()
3630            .then(|x: u32| Some(x), r)
3631            .filter(|x: &u32| *x > 10, r);
3632        assert_eq!(p.run(&mut world, 5), None);
3633    }
3634
3635    // =========================================================================
3636    // Result combinators
3637    // =========================================================================
3638
3639    #[test]
3640    fn result_map_on_ok() {
3641        let mut wb = WorldBuilder::new();
3642        wb.register::<u64>(10);
3643        let mut world = wb.build();
3644
3645        fn add_factor(factor: Res<u64>, x: u32) -> u64 {
3646            *factor + x as u64
3647        }
3648
3649        let r = world.registry_mut();
3650        let mut p = PipelineBuilder::<u32>::new()
3651            .then(|x: u32| -> Result<u32, String> { Ok(x) }, r)
3652            .map(add_factor, r);
3653        assert_eq!(p.run(&mut world, 5), Ok(15));
3654    }
3655
3656    #[test]
3657    fn result_map_skips_err() {
3658        let mut wb = WorldBuilder::new();
3659        wb.register::<bool>(false);
3660        let mut world = wb.build();
3661
3662        fn mark(mut flag: ResMut<bool>, _x: u32) -> u32 {
3663            *flag = true;
3664            0
3665        }
3666
3667        let r = world.registry_mut();
3668        let mut p = PipelineBuilder::<u32>::new()
3669            .then(|_x: u32| -> Result<u32, String> { Err("fail".into()) }, r)
3670            .map(mark, r);
3671        assert!(p.run(&mut world, 5).is_err());
3672        assert!(!*world.resource::<bool>());
3673    }
3674
3675    #[test]
3676    fn result_catch_handles_error() {
3677        let mut wb = WorldBuilder::new();
3678        wb.register::<String>(String::new());
3679        let mut world = wb.build();
3680
3681        fn log_error(mut log: ResMut<String>, err: String) {
3682            *log = err;
3683        }
3684
3685        let r = world.registry_mut();
3686        let mut p = PipelineBuilder::<u32>::new()
3687            .then(|_x: u32| -> Result<u32, String> { Err("caught".into()) }, r)
3688            .catch(log_error, r);
3689        assert_eq!(p.run(&mut world, 0), None);
3690        assert_eq!(world.resource::<String>().as_str(), "caught");
3691    }
3692
3693    #[test]
3694    fn result_catch_passes_ok() {
3695        let mut wb = WorldBuilder::new();
3696        wb.register::<String>(String::new());
3697        let mut world = wb.build();
3698
3699        fn log_error(mut log: ResMut<String>, err: String) {
3700            *log = err;
3701        }
3702
3703        let r = world.registry_mut();
3704        let mut p = PipelineBuilder::<u32>::new()
3705            .then(|x: u32| -> Result<u32, String> { Ok(x) }, r)
3706            .catch(log_error, r);
3707        assert_eq!(p.run(&mut world, 5), Some(5));
3708        assert!(world.resource::<String>().is_empty());
3709    }
3710
3711    // =========================================================================
3712    // Build + Handler
3713    // =========================================================================
3714
3715    #[test]
3716    fn build_produces_handler() {
3717        let mut wb = WorldBuilder::new();
3718        wb.register::<u64>(0);
3719        let mut world = wb.build();
3720
3721        fn accumulate(mut total: ResMut<u64>, x: u32) {
3722            *total += x as u64;
3723        }
3724
3725        let r = world.registry_mut();
3726        let mut pipeline = PipelineBuilder::<u32>::new().then(accumulate, r).build();
3727
3728        pipeline.run(&mut world, 10);
3729        pipeline.run(&mut world, 5);
3730        assert_eq!(*world.resource::<u64>(), 15);
3731    }
3732
3733    #[test]
3734    fn run_returns_output() {
3735        let mut wb = WorldBuilder::new();
3736        wb.register::<u64>(3);
3737        let mut world = wb.build();
3738
3739        fn multiply(factor: Res<u64>, x: u32) -> u64 {
3740            *factor * x as u64
3741        }
3742
3743        let r = world.registry_mut();
3744        let mut p = PipelineBuilder::<u32>::new().then(multiply, r);
3745        let result: u64 = p.run(&mut world, 7);
3746        assert_eq!(result, 21);
3747    }
3748
3749    // =========================================================================
3750    // Safety
3751    // =========================================================================
3752
3753    #[test]
3754    #[should_panic(expected = "not registered")]
3755    fn panics_on_missing_resource() {
3756        let mut world = WorldBuilder::new().build();
3757
3758        fn needs_u64(_val: Res<u64>, _x: u32) -> u32 {
3759            0
3760        }
3761
3762        let r = world.registry_mut();
3763        let _p = PipelineBuilder::<u32>::new().then(needs_u64, r);
3764    }
3765
3766    // =========================================================================
3767    // Access conflict detection
3768    // =========================================================================
3769
3770    #[test]
3771    #[should_panic(expected = "conflicting access")]
3772    fn step_duplicate_access_panics() {
3773        let mut wb = WorldBuilder::new();
3774        wb.register::<u64>(0);
3775        let mut world = wb.build();
3776
3777        fn bad(a: Res<u64>, b: ResMut<u64>, _x: u32) -> u32 {
3778            let _ = (*a, &*b);
3779            0
3780        }
3781
3782        let r = world.registry_mut();
3783        let _p = PipelineBuilder::<u32>::new().then(bad, r);
3784    }
3785
3786    // =========================================================================
3787    // Integration
3788    // =========================================================================
3789
3790    #[test]
3791    fn local_in_step() {
3792        let mut wb = WorldBuilder::new();
3793        wb.register::<u64>(0);
3794        let mut world = wb.build();
3795
3796        fn count(mut count: Local<u64>, mut total: ResMut<u64>, _x: u32) {
3797            *count += 1;
3798            *total = *count;
3799        }
3800
3801        let r = world.registry_mut();
3802        let mut p = PipelineBuilder::<u32>::new().then(count, r);
3803        p.run(&mut world, 0);
3804        p.run(&mut world, 0);
3805        p.run(&mut world, 0);
3806        assert_eq!(*world.resource::<u64>(), 3);
3807    }
3808
3809    // =========================================================================
3810    // Option combinators (extended)
3811    // =========================================================================
3812
3813    #[test]
3814    fn option_unwrap_or_some() {
3815        let mut world = WorldBuilder::new().build();
3816        let r = world.registry_mut();
3817        let mut p = PipelineBuilder::<u32>::new()
3818            .then(|x: u32| -> Option<u32> { Some(x) }, r)
3819            .unwrap_or(99);
3820        assert_eq!(p.run(&mut world, 5), 5);
3821    }
3822
3823    #[test]
3824    fn option_unwrap_or_none() {
3825        let mut world = WorldBuilder::new().build();
3826        let r = world.registry_mut();
3827        let mut p = PipelineBuilder::<u32>::new()
3828            .then(|_x: u32| -> Option<u32> { None }, r)
3829            .unwrap_or(99);
3830        assert_eq!(p.run(&mut world, 5), 99);
3831    }
3832
3833    #[test]
3834    fn option_unwrap_or_else() {
3835        let mut world = WorldBuilder::new().build();
3836        let r = world.registry_mut();
3837        let mut p = PipelineBuilder::<u32>::new()
3838            .then(|_x: u32| -> Option<u32> { None }, r)
3839            .unwrap_or_else(|| 42, r);
3840        assert_eq!(p.run(&mut world, 0), 42);
3841    }
3842
3843    #[test]
3844    fn option_ok_or() {
3845        let mut world = WorldBuilder::new().build();
3846        let r = world.registry_mut();
3847        let mut p = PipelineBuilder::<u32>::new()
3848            .then(|_x: u32| -> Option<u32> { None }, r)
3849            .ok_or("missing");
3850        assert_eq!(p.run(&mut world, 0), Err("missing"));
3851    }
3852
3853    #[test]
3854    fn option_ok_or_some() {
3855        let mut world = WorldBuilder::new().build();
3856        let r = world.registry_mut();
3857        let mut p = PipelineBuilder::<u32>::new()
3858            .then(|x: u32| -> Option<u32> { Some(x) }, r)
3859            .ok_or("missing");
3860        assert_eq!(p.run(&mut world, 7), Ok(7));
3861    }
3862
3863    #[test]
3864    fn option_ok_or_else() {
3865        let mut world = WorldBuilder::new().build();
3866        let r = world.registry_mut();
3867        let mut p = PipelineBuilder::<u32>::new()
3868            .then(|_x: u32| -> Option<u32> { None }, r)
3869            .ok_or_else(|| "computed", r);
3870        assert_eq!(p.run(&mut world, 0), Err("computed"));
3871    }
3872
3873    #[test]
3874    fn option_inspect_passes_through() {
3875        let mut wb = WorldBuilder::new();
3876        wb.register::<u64>(0);
3877        let mut world = wb.build();
3878        let r = world.registry_mut();
3879        let mut p = PipelineBuilder::<u32>::new()
3880            .then(|x: u32| -> Option<u32> { Some(x) }, r)
3881            .inspect(|_val: &u32| {}, r);
3882        // inspect should pass through the value unchanged.
3883        assert_eq!(p.run(&mut world, 10), Some(10));
3884    }
3885
3886    // =========================================================================
3887    // Result combinators (extended)
3888    // =========================================================================
3889
3890    #[test]
3891    fn result_map_err() {
3892        let mut world = WorldBuilder::new().build();
3893        let r = world.registry_mut();
3894        let mut p = PipelineBuilder::<u32>::new()
3895            .then(|_x: u32| -> Result<u32, i32> { Err(-1) }, r)
3896            .map_err(|e: i32| e.to_string(), r);
3897        assert_eq!(p.run(&mut world, 0), Err("-1".to_string()));
3898    }
3899
3900    #[test]
3901    fn result_map_err_ok_passthrough() {
3902        let mut world = WorldBuilder::new().build();
3903        let r = world.registry_mut();
3904        let mut p = PipelineBuilder::<u32>::new()
3905            .then(|x: u32| -> Result<u32, i32> { Ok(x) }, r)
3906            .map_err(|e: i32| e.to_string(), r);
3907        assert_eq!(p.run(&mut world, 5), Ok(5));
3908    }
3909
3910    #[test]
3911    fn result_or_else() {
3912        let mut world = WorldBuilder::new().build();
3913        let r = world.registry_mut();
3914        let mut p = PipelineBuilder::<u32>::new()
3915            .then(|_x: u32| -> Result<u32, &str> { Err("fail") }, r)
3916            .or_else(|_e: &str| Ok::<u32, &str>(42), r);
3917        assert_eq!(p.run(&mut world, 0), Ok(42));
3918    }
3919
3920    #[test]
3921    fn result_inspect_passes_through() {
3922        let mut world = WorldBuilder::new().build();
3923        let r = world.registry_mut();
3924        let mut p = PipelineBuilder::<u32>::new()
3925            .then(|x: u32| -> Result<u32, &str> { Ok(x) }, r)
3926            .inspect(|_val: &u32| {}, r);
3927        // inspect should pass through Ok unchanged.
3928        assert_eq!(p.run(&mut world, 7), Ok(7));
3929    }
3930
3931    #[test]
3932    fn result_inspect_err_passes_through() {
3933        let mut world = WorldBuilder::new().build();
3934        let r = world.registry_mut();
3935        let mut p = PipelineBuilder::<u32>::new()
3936            .then(|_x: u32| -> Result<u32, &str> { Err("bad") }, r)
3937            .inspect_err(|_e: &&str| {}, r);
3938        // inspect_err should pass through Err unchanged.
3939        assert_eq!(p.run(&mut world, 0), Err("bad"));
3940    }
3941
3942    #[test]
3943    fn result_ok_converts() {
3944        let mut world = WorldBuilder::new().build();
3945        let r = world.registry_mut();
3946        let mut p = PipelineBuilder::<u32>::new()
3947            .then(|x: u32| -> Result<u32, &str> { Ok(x) }, r)
3948            .ok();
3949        assert_eq!(p.run(&mut world, 5), Some(5));
3950    }
3951
3952    #[test]
3953    fn result_ok_drops_err() {
3954        let mut world = WorldBuilder::new().build();
3955        let r = world.registry_mut();
3956        let mut p = PipelineBuilder::<u32>::new()
3957            .then(|_x: u32| -> Result<u32, &str> { Err("gone") }, r)
3958            .ok();
3959        assert_eq!(p.run(&mut world, 0), None);
3960    }
3961
3962    #[test]
3963    fn result_unwrap_or() {
3964        let mut world = WorldBuilder::new().build();
3965        let r = world.registry_mut();
3966        let mut p = PipelineBuilder::<u32>::new()
3967            .then(|_x: u32| -> Result<u32, &str> { Err("x") }, r)
3968            .unwrap_or(99);
3969        assert_eq!(p.run(&mut world, 0), 99);
3970    }
3971
3972    #[test]
3973    fn result_unwrap_or_else() {
3974        let mut world = WorldBuilder::new().build();
3975        let r = world.registry_mut();
3976        let mut p = PipelineBuilder::<u32>::new()
3977            .then(|_x: u32| -> Result<u32, i32> { Err(-5) }, r)
3978            .unwrap_or_else(|e: i32| e.unsigned_abs(), r);
3979        assert_eq!(p.run(&mut world, 0), 5);
3980    }
3981
3982    // =========================================================================
3983    // Batch pipeline
3984    // =========================================================================
3985
3986    #[test]
3987    fn batch_accumulates() {
3988        let mut wb = WorldBuilder::new();
3989        wb.register::<u64>(0);
3990        let mut world = wb.build();
3991
3992        fn accumulate(mut sum: ResMut<u64>, x: u32) {
3993            *sum += x as u64;
3994        }
3995
3996        let r = world.registry_mut();
3997        let mut batch = PipelineBuilder::<u32>::new()
3998            .then(accumulate, r)
3999            .build_batch(16);
4000
4001        batch.input_mut().extend_from_slice(&[1, 2, 3, 4, 5]);
4002        batch.run(&mut world);
4003
4004        assert_eq!(*world.resource::<u64>(), 15);
4005        assert!(batch.input().is_empty());
4006    }
4007
4008    #[test]
4009    fn batch_retains_allocation() {
4010        let mut world = WorldBuilder::new().build();
4011        let r = world.registry_mut();
4012        let mut batch = PipelineBuilder::<u32>::new()
4013            .then(|_x: u32| {}, r)
4014            .build_batch(64);
4015
4016        batch.input_mut().extend_from_slice(&[1, 2, 3]);
4017        batch.run(&mut world);
4018
4019        assert!(batch.input().is_empty());
4020        assert!(batch.input_mut().capacity() >= 64);
4021    }
4022
4023    #[test]
4024    fn batch_empty_is_noop() {
4025        let mut wb = WorldBuilder::new();
4026        wb.register::<u64>(0);
4027        let mut world = wb.build();
4028
4029        fn accumulate(mut sum: ResMut<u64>, x: u32) {
4030            *sum += x as u64;
4031        }
4032
4033        let r = world.registry_mut();
4034        let mut batch = PipelineBuilder::<u32>::new()
4035            .then(accumulate, r)
4036            .build_batch(16);
4037
4038        batch.run(&mut world);
4039        assert_eq!(*world.resource::<u64>(), 0);
4040    }
4041
4042    #[test]
4043    fn batch_catch_continues_on_error() {
4044        let mut wb = WorldBuilder::new();
4045        wb.register::<u64>(0);
4046        wb.register::<u32>(0);
4047        let mut world = wb.build();
4048
4049        fn validate(x: u32) -> Result<u32, &'static str> {
4050            if x > 0 { Ok(x) } else { Err("zero") }
4051        }
4052
4053        fn count_errors(mut errs: ResMut<u32>, _err: &'static str) {
4054            *errs += 1;
4055        }
4056
4057        fn accumulate(mut sum: ResMut<u64>, x: u32) {
4058            *sum += x as u64;
4059        }
4060
4061        let r = world.registry_mut();
4062        let mut batch = PipelineBuilder::<u32>::new()
4063            .then(validate, r)
4064            .catch(count_errors, r)
4065            .map(accumulate, r)
4066            .build_batch(16);
4067
4068        // Items: 1, 0 (error), 2, 0 (error), 3
4069        batch.input_mut().extend_from_slice(&[1, 0, 2, 0, 3]);
4070        batch.run(&mut world);
4071
4072        assert_eq!(*world.resource::<u64>(), 6); // 1 + 2 + 3
4073        assert_eq!(*world.resource::<u32>(), 2); // 2 errors
4074    }
4075
4076    #[test]
4077    fn batch_filter_skips_items() {
4078        let mut wb = WorldBuilder::new();
4079        wb.register::<u64>(0);
4080        let mut world = wb.build();
4081
4082        fn accumulate(mut sum: ResMut<u64>, x: u32) {
4083            *sum += x as u64;
4084        }
4085
4086        let r = world.registry_mut();
4087        let mut batch = PipelineBuilder::<u32>::new()
4088            .then(
4089                |x: u32| -> Option<u32> { if x > 2 { Some(x) } else { None } },
4090                r,
4091            )
4092            .map(accumulate, r)
4093            .build_batch(16);
4094
4095        batch.input_mut().extend_from_slice(&[1, 2, 3, 4, 5]);
4096        batch.run(&mut world);
4097
4098        assert_eq!(*world.resource::<u64>(), 12); // 3 + 4 + 5
4099    }
4100
4101    #[test]
4102    fn batch_multiple_runs_accumulate() {
4103        let mut wb = WorldBuilder::new();
4104        wb.register::<u64>(0);
4105        let mut world = wb.build();
4106
4107        fn accumulate(mut sum: ResMut<u64>, x: u32) {
4108            *sum += x as u64;
4109        }
4110
4111        let r = world.registry_mut();
4112        let mut batch = PipelineBuilder::<u32>::new()
4113            .then(accumulate, r)
4114            .build_batch(16);
4115
4116        batch.input_mut().extend_from_slice(&[1, 2, 3]);
4117        batch.run(&mut world);
4118        assert_eq!(*world.resource::<u64>(), 6);
4119
4120        batch.input_mut().extend_from_slice(&[4, 5]);
4121        batch.run(&mut world);
4122        assert_eq!(*world.resource::<u64>(), 15);
4123    }
4124
4125    #[test]
4126    fn batch_with_world_access() {
4127        let mut wb = WorldBuilder::new();
4128        wb.register::<u64>(10); // multiplier
4129        wb.register::<Vec<u64>>(Vec::new());
4130        let mut world = wb.build();
4131
4132        fn multiply_and_collect(factor: Res<u64>, mut out: ResMut<Vec<u64>>, x: u32) {
4133            out.push(x as u64 * *factor);
4134        }
4135
4136        let r = world.registry_mut();
4137        let mut batch = PipelineBuilder::<u32>::new()
4138            .then(multiply_and_collect, r)
4139            .build_batch(16);
4140
4141        batch.input_mut().extend_from_slice(&[1, 2, 3]);
4142        batch.run(&mut world);
4143
4144        assert_eq!(world.resource::<Vec<u64>>().as_slice(), &[10, 20, 30]);
4145    }
4146
4147    // =========================================================================
4148    // Cloned combinator
4149    // =========================================================================
4150
4151    // Named functions for proper lifetime elision (&'a u32 → &'a u32).
4152    // Closures get two independent lifetimes and fail to compile.
4153    fn ref_identity(x: &u32) -> &u32 {
4154        x
4155    }
4156    #[allow(clippy::unnecessary_wraps)]
4157    fn ref_wrap_some(x: &u32) -> Option<&u32> {
4158        Some(x)
4159    }
4160    fn ref_wrap_none(_x: &u32) -> Option<&u32> {
4161        None
4162    }
4163    #[allow(clippy::unnecessary_wraps)]
4164    fn ref_wrap_ok(x: &u32) -> Result<&u32, String> {
4165        Ok(x)
4166    }
4167    fn ref_wrap_err(_x: &u32) -> Result<&u32, String> {
4168        Err("fail".into())
4169    }
4170
4171    #[test]
4172    fn cloned_bare() {
4173        let mut world = WorldBuilder::new().build();
4174        // val before p — val must outlive the pipeline's In = &u32
4175        let val = 42u32;
4176        let r = world.registry_mut();
4177        let mut p = PipelineBuilder::<&u32>::new()
4178            .then(ref_identity, r)
4179            .cloned();
4180        assert_eq!(p.run(&mut world, &val), 42u32);
4181    }
4182
4183    #[test]
4184    fn cloned_option_some() {
4185        let mut world = WorldBuilder::new().build();
4186        let val = 42u32;
4187        let r = world.registry_mut();
4188        let mut p = PipelineBuilder::<&u32>::new()
4189            .then(ref_wrap_some, r)
4190            .cloned();
4191        assert_eq!(p.run(&mut world, &val), Some(42u32));
4192    }
4193
4194    #[test]
4195    fn cloned_option_none() {
4196        let mut world = WorldBuilder::new().build();
4197        let val = 42u32;
4198        let r = world.registry_mut();
4199        let mut p = PipelineBuilder::<&u32>::new()
4200            .then(ref_wrap_none, r)
4201            .cloned();
4202        assert_eq!(p.run(&mut world, &val), None);
4203    }
4204
4205    #[test]
4206    fn cloned_result_ok() {
4207        let mut world = WorldBuilder::new().build();
4208        let val = 42u32;
4209        let r = world.registry_mut();
4210        let mut p = PipelineBuilder::<&u32>::new().then(ref_wrap_ok, r).cloned();
4211        assert_eq!(p.run(&mut world, &val), Ok(42u32));
4212    }
4213
4214    #[test]
4215    fn cloned_result_err() {
4216        let mut world = WorldBuilder::new().build();
4217        let val = 42u32;
4218        let r = world.registry_mut();
4219        let mut p = PipelineBuilder::<&u32>::new()
4220            .then(ref_wrap_err, r)
4221            .cloned();
4222        assert_eq!(p.run(&mut world, &val), Err("fail".into()));
4223    }
4224
4225    // =========================================================================
4226    // Dispatch combinator
4227    // =========================================================================
4228
4229    #[test]
4230    fn dispatch_to_handler() {
4231        let mut wb = WorldBuilder::new();
4232        wb.register::<u64>(0);
4233        let mut world = wb.build();
4234
4235        fn store(mut out: ResMut<u64>, val: u32) {
4236            *out = val as u64;
4237        }
4238
4239        let r = world.registry_mut();
4240        let handler = PipelineBuilder::<u32>::new().then(store, r).build();
4241
4242        let mut p = PipelineBuilder::<u32>::new()
4243            .then(|x: u32| x * 2, r)
4244            .dispatch(handler)
4245            .build();
4246
4247        p.run(&mut world, 5);
4248        assert_eq!(*world.resource::<u64>(), 10);
4249    }
4250
4251    #[test]
4252    fn dispatch_to_fanout() {
4253        let mut wb = WorldBuilder::new();
4254        wb.register::<u64>(0);
4255        wb.register::<i64>(0);
4256        let mut world = wb.build();
4257
4258        fn write_u64(mut sink: ResMut<u64>, event: &u32) {
4259            *sink += *event as u64;
4260        }
4261        fn write_i64(mut sink: ResMut<i64>, event: &u32) {
4262            *sink += *event as i64;
4263        }
4264
4265        let h1 = write_u64.into_handler(world.registry());
4266        let h2 = write_i64.into_handler(world.registry());
4267        let fan = fan_out!(h1, h2);
4268
4269        let r = world.registry_mut();
4270        let mut p = PipelineBuilder::<u32>::new()
4271            .then(|x: u32| x * 2, r)
4272            .dispatch(fan)
4273            .build();
4274
4275        p.run(&mut world, 5);
4276        assert_eq!(*world.resource::<u64>(), 10);
4277        assert_eq!(*world.resource::<i64>(), 10);
4278    }
4279
4280    #[test]
4281    fn dispatch_to_broadcast() {
4282        let mut wb = WorldBuilder::new();
4283        wb.register::<u64>(0);
4284        let mut world = wb.build();
4285
4286        fn write_u64(mut sink: ResMut<u64>, event: &u32) {
4287            *sink += *event as u64;
4288        }
4289
4290        let mut broadcast = crate::Broadcast::<u32>::new();
4291        broadcast.add(write_u64.into_handler(world.registry()));
4292        broadcast.add(write_u64.into_handler(world.registry()));
4293
4294        let r = world.registry_mut();
4295        let mut p = PipelineBuilder::<u32>::new()
4296            .then(|x: u32| x + 1, r)
4297            .dispatch(broadcast)
4298            .build();
4299
4300        p.run(&mut world, 4);
4301        assert_eq!(*world.resource::<u64>(), 10); // 5 + 5
4302    }
4303
4304    #[test]
4305    fn dispatch_build_produces_handler() {
4306        let mut wb = WorldBuilder::new();
4307        wb.register::<u64>(0);
4308        let mut world = wb.build();
4309
4310        fn store(mut out: ResMut<u64>, val: u32) {
4311            *out = val as u64;
4312        }
4313
4314        let r = world.registry_mut();
4315        let inner = PipelineBuilder::<u32>::new().then(store, r).build();
4316
4317        let mut pipeline: Box<dyn Handler<u32>> = Box::new(
4318            PipelineBuilder::<u32>::new()
4319                .then(|x: u32| x + 1, r)
4320                .dispatch(inner)
4321                .build(),
4322        );
4323
4324        pipeline.run(&mut world, 9);
4325        assert_eq!(*world.resource::<u64>(), 10);
4326    }
4327
4328    // -- Guard combinator --
4329
4330    #[test]
4331    fn pipeline_guard_keeps() {
4332        fn sink(mut out: ResMut<u64>, val: Option<u64>) {
4333            *out = val.unwrap_or(0);
4334        }
4335        let mut wb = WorldBuilder::new();
4336        wb.register::<u64>(0);
4337        let mut world = wb.build();
4338        let reg = world.registry();
4339
4340        let mut p = PipelineBuilder::<u32>::new()
4341            .then(|x: u32| x as u64, reg)
4342            .guard(|v: &u64| *v > 3, reg)
4343            .then(sink, reg);
4344
4345        p.run(&mut world, 5u32);
4346        assert_eq!(*world.resource::<u64>(), 5);
4347    }
4348
4349    #[test]
4350    fn pipeline_guard_drops() {
4351        fn sink(mut out: ResMut<u64>, val: Option<u64>) {
4352            *out = val.unwrap_or(999);
4353        }
4354        let mut wb = WorldBuilder::new();
4355        wb.register::<u64>(0);
4356        let mut world = wb.build();
4357        let reg = world.registry();
4358
4359        let mut p = PipelineBuilder::<u32>::new()
4360            .then(|x: u32| x as u64, reg)
4361            .guard(|v: &u64| *v > 10, reg)
4362            .then(sink, reg);
4363
4364        p.run(&mut world, 5u32);
4365        assert_eq!(*world.resource::<u64>(), 999);
4366    }
4367
4368    // -- Tap combinator --
4369
4370    #[test]
4371    fn pipeline_tap_observes_without_changing() {
4372        fn sink(mut out: ResMut<u64>, val: u64) {
4373            *out = val;
4374        }
4375        let mut wb = WorldBuilder::new();
4376        wb.register::<u64>(0);
4377        wb.register::<bool>(false);
4378        let mut world = wb.build();
4379        let reg = world.registry();
4380
4381        let mut p = PipelineBuilder::<u32>::new()
4382            .then(|x: u32| x as u64 * 2, reg)
4383            .tap(
4384                |w: &mut World, val: &u64| {
4385                    *w.resource_mut::<bool>() = *val == 10;
4386                },
4387                reg,
4388            )
4389            .then(sink, reg);
4390
4391        p.run(&mut world, 5u32);
4392        assert_eq!(*world.resource::<u64>(), 10); // value passed through
4393        assert!(*world.resource::<bool>()); // tap fired
4394    }
4395
4396    // -- Route combinator --
4397
4398    #[test]
4399    fn pipeline_route_true_arm() {
4400        fn sink(mut out: ResMut<u64>, val: u64) {
4401            *out = val;
4402        }
4403        let mut wb = WorldBuilder::new();
4404        wb.register::<u64>(0);
4405        let mut world = wb.build();
4406        let reg = world.registry();
4407
4408        let arm_t = PipelineBuilder::new().then(|x: u64| x * 2, reg);
4409        let arm_f = PipelineBuilder::new().then(|x: u64| x * 3, reg);
4410
4411        let mut p = PipelineBuilder::<u32>::new()
4412            .then(|x: u32| x as u64, reg)
4413            .route(|v: &u64| *v > 3, reg, arm_t, arm_f)
4414            .then(sink, reg);
4415
4416        p.run(&mut world, 5u32); // 5 > 3 → true arm → double → 10
4417        assert_eq!(*world.resource::<u64>(), 10);
4418    }
4419
4420    #[test]
4421    fn pipeline_route_false_arm() {
4422        fn sink(mut out: ResMut<u64>, val: u64) {
4423            *out = val;
4424        }
4425        let mut wb = WorldBuilder::new();
4426        wb.register::<u64>(0);
4427        let mut world = wb.build();
4428        let reg = world.registry();
4429
4430        let arm_t = PipelineBuilder::new().then(|x: u64| x * 2, reg);
4431        let arm_f = PipelineBuilder::new().then(|x: u64| x * 3, reg);
4432
4433        let mut p = PipelineBuilder::<u32>::new()
4434            .then(|x: u32| x as u64, reg)
4435            .route(|v: &u64| *v > 10, reg, arm_t, arm_f)
4436            .then(sink, reg);
4437
4438        p.run(&mut world, 5u32); // 5 <= 10 → false arm → triple → 15
4439        assert_eq!(*world.resource::<u64>(), 15);
4440    }
4441
4442    #[test]
4443    fn pipeline_route_nested() {
4444        fn sink(mut out: ResMut<u64>, val: u64) {
4445            *out = val;
4446        }
4447        let mut wb = WorldBuilder::new();
4448        wb.register::<u64>(0);
4449        let mut world = wb.build();
4450        let reg = world.registry();
4451
4452        // N-ary via nesting: <5 → +100, 5..10 → +200, >=10 → +300
4453        let inner_t = PipelineBuilder::new().then(|x: u64| x + 200, reg);
4454        let inner_f = PipelineBuilder::new().then(|x: u64| x + 300, reg);
4455        let outer_t = PipelineBuilder::new().then(|x: u64| x + 100, reg);
4456        let outer_f = PipelineBuilder::new().then(|x: u64| x, reg).route(
4457            |v: &u64| *v < 10,
4458            reg,
4459            inner_t,
4460            inner_f,
4461        );
4462
4463        let mut p = PipelineBuilder::<u32>::new()
4464            .then(|x: u32| x as u64, reg)
4465            .route(|v: &u64| *v < 5, reg, outer_t, outer_f)
4466            .then(sink, reg);
4467
4468        p.run(&mut world, 3u32); // 3 < 5 → +100 → 103
4469        assert_eq!(*world.resource::<u64>(), 103);
4470
4471        p.run(&mut world, 7u32); // 7 >= 5, 7 < 10 → +200 → 207
4472        assert_eq!(*world.resource::<u64>(), 207);
4473
4474        p.run(&mut world, 15u32); // 15 >= 5, 15 >= 10 → +300 → 315
4475        assert_eq!(*world.resource::<u64>(), 315);
4476    }
4477
4478    // -- Tee combinator --
4479
4480    #[test]
4481    fn pipeline_tee_side_effect_chain() {
4482        use crate::dag::DagArmSeed;
4483
4484        fn log_step(mut counter: ResMut<u32>, _val: &u64) {
4485            *counter += 1;
4486        }
4487        fn sink(mut out: ResMut<u64>, val: u64) {
4488            *out = val;
4489        }
4490        let mut wb = WorldBuilder::new();
4491        wb.register::<u64>(0);
4492        wb.register::<u32>(0);
4493        let mut world = wb.build();
4494        let reg = world.registry();
4495
4496        let side = DagArmSeed::new().then(log_step, reg);
4497
4498        let mut p = PipelineBuilder::<u32>::new()
4499            .then(|x: u32| x as u64 * 2, reg)
4500            .tee(side)
4501            .then(sink, reg);
4502
4503        p.run(&mut world, 5u32);
4504        assert_eq!(*world.resource::<u64>(), 10); // value passed through
4505        assert_eq!(*world.resource::<u32>(), 1); // side-effect fired
4506
4507        p.run(&mut world, 7u32);
4508        assert_eq!(*world.resource::<u64>(), 14);
4509        assert_eq!(*world.resource::<u32>(), 2);
4510    }
4511
4512    // -- Dedup combinator --
4513
4514    #[test]
4515    fn pipeline_dedup_suppresses_unchanged() {
4516        fn sink(mut out: ResMut<u32>, val: Option<u64>) {
4517            if val.is_some() {
4518                *out += 1;
4519            }
4520        }
4521        let mut wb = WorldBuilder::new();
4522        wb.register::<u32>(0);
4523        let mut world = wb.build();
4524        let reg = world.registry();
4525
4526        let mut p = PipelineBuilder::<u32>::new()
4527            .then(|x: u32| x as u64 / 2, reg)
4528            .dedup()
4529            .then(sink, reg);
4530
4531        p.run(&mut world, 4u32); // 2 — first, Some
4532        assert_eq!(*world.resource::<u32>(), 1);
4533
4534        p.run(&mut world, 5u32); // 2 — same, None
4535        assert_eq!(*world.resource::<u32>(), 1);
4536
4537        p.run(&mut world, 6u32); // 3 — changed, Some
4538        assert_eq!(*world.resource::<u32>(), 2);
4539    }
4540
4541    // -- Bool combinators --
4542
4543    #[test]
4544    fn pipeline_not() {
4545        fn sink(mut out: ResMut<bool>, val: bool) {
4546            *out = val;
4547        }
4548        let mut wb = WorldBuilder::new();
4549        wb.register::<bool>(false);
4550        let mut world = wb.build();
4551        let reg = world.registry();
4552
4553        let mut p = PipelineBuilder::<u32>::new()
4554            .then(|x: u32| x > 5, reg)
4555            .not()
4556            .then(sink, reg);
4557
4558        p.run(&mut world, 3u32); // 3 > 5 = false, not = true
4559        assert!(*world.resource::<bool>());
4560
4561        p.run(&mut world, 10u32); // 10 > 5 = true, not = false
4562        assert!(!*world.resource::<bool>());
4563    }
4564
4565    #[test]
4566    fn pipeline_and() {
4567        fn sink(mut out: ResMut<bool>, val: bool) {
4568            *out = val;
4569        }
4570        let mut wb = WorldBuilder::new();
4571        wb.register::<bool>(true);
4572        let mut world = wb.build();
4573        let reg = world.registry();
4574
4575        let mut p = PipelineBuilder::<u32>::new()
4576            .then(|x: u32| x > 5, reg)
4577            .and(|w: &mut World| *w.resource::<bool>(), reg)
4578            .then(sink, reg);
4579
4580        p.run(&mut world, 10u32); // true && true = true
4581        assert!(*world.resource::<bool>());
4582
4583        *world.resource_mut::<bool>() = false;
4584        p.run(&mut world, 10u32); // true && false = false
4585        assert!(!*world.resource::<bool>());
4586    }
4587
4588    #[test]
4589    fn pipeline_or() {
4590        fn sink(mut out: ResMut<bool>, val: bool) {
4591            *out = val;
4592        }
4593        let mut wb = WorldBuilder::new();
4594        wb.register::<bool>(false);
4595        let mut world = wb.build();
4596        let reg = world.registry();
4597
4598        let mut p = PipelineBuilder::<u32>::new()
4599            .then(|x: u32| x > 5, reg)
4600            .or(|w: &mut World| *w.resource::<bool>(), reg)
4601            .then(sink, reg);
4602
4603        p.run(&mut world, 3u32); // false || false = false
4604        assert!(!*world.resource::<bool>());
4605
4606        *world.resource_mut::<bool>() = true;
4607        p.run(&mut world, 3u32); // false || true = true
4608        assert!(*world.resource::<bool>());
4609    }
4610
4611    #[test]
4612    fn pipeline_xor() {
4613        fn sink(mut out: ResMut<bool>, val: bool) {
4614            *out = val;
4615        }
4616        let mut wb = WorldBuilder::new();
4617        wb.register::<bool>(true);
4618        let mut world = wb.build();
4619        let reg = world.registry();
4620
4621        let mut p = PipelineBuilder::<u32>::new()
4622            .then(|x: u32| x > 5, reg)
4623            .xor(|w: &mut World| *w.resource::<bool>(), reg)
4624            .then(sink, reg);
4625
4626        p.run(&mut world, 10u32); // true ^ true = false
4627        assert!(!*world.resource::<bool>());
4628    }
4629
4630    // =========================================================================
4631    // Splat — tuple destructuring
4632    // =========================================================================
4633
4634    #[test]
4635    fn splat2_closure_on_start() {
4636        let mut world = WorldBuilder::new().build();
4637        let r = world.registry_mut();
4638        let mut p = PipelineBuilder::<(u32, u64)>::new()
4639            .splat()
4640            .then(|a: u32, b: u64| a as u64 + b, r);
4641        assert_eq!(p.run(&mut world, (3, 7)), 10);
4642    }
4643
4644    #[test]
4645    fn splat2_named_fn_with_param() {
4646        let mut wb = WorldBuilder::new();
4647        wb.register::<u64>(100);
4648        let mut world = wb.build();
4649
4650        fn process(base: Res<u64>, a: u32, b: u32) -> u64 {
4651            *base + a as u64 + b as u64
4652        }
4653
4654        let r = world.registry_mut();
4655        let mut p = PipelineBuilder::<(u32, u32)>::new()
4656            .splat()
4657            .then(process, r);
4658        assert_eq!(p.run(&mut world, (3, 7)), 110);
4659    }
4660
4661    #[test]
4662    fn splat2_mid_chain() {
4663        let mut world = WorldBuilder::new().build();
4664        let r = world.registry_mut();
4665        let mut p = PipelineBuilder::<u32>::new()
4666            .then(|x: u32| (x, x * 2), r)
4667            .splat()
4668            .then(|a: u32, b: u32| a as u64 + b as u64, r);
4669        assert_eq!(p.run(&mut world, 5), 15); // 5 + 10
4670    }
4671
4672    #[test]
4673    fn splat3_closure_on_start() {
4674        let mut world = WorldBuilder::new().build();
4675        let r = world.registry_mut();
4676        let mut p = PipelineBuilder::<(u32, u32, u32)>::new()
4677            .splat()
4678            .then(|a: u32, b: u32, c: u32| a + b + c, r);
4679        assert_eq!(p.run(&mut world, (1, 2, 3)), 6);
4680    }
4681
4682    #[test]
4683    fn splat3_named_fn_with_param() {
4684        let mut wb = WorldBuilder::new();
4685        wb.register::<u64>(10);
4686        let mut world = wb.build();
4687
4688        fn process(factor: Res<u64>, a: u32, b: u32, c: u32) -> u64 {
4689            *factor * (a + b + c) as u64
4690        }
4691
4692        let r = world.registry_mut();
4693        let mut p = PipelineBuilder::<(u32, u32, u32)>::new()
4694            .splat()
4695            .then(process, r);
4696        assert_eq!(p.run(&mut world, (1, 2, 3)), 60);
4697    }
4698
4699    #[test]
4700    fn splat4_mid_chain() {
4701        let mut world = WorldBuilder::new().build();
4702        let r = world.registry_mut();
4703        let mut p = PipelineBuilder::<u32>::new()
4704            .then(|x: u32| (x, x + 1, x + 2, x + 3), r)
4705            .splat()
4706            .then(|a: u32, b: u32, c: u32, d: u32| (a + b + c + d) as u64, r);
4707        assert_eq!(p.run(&mut world, 10), 46); // 10+11+12+13
4708    }
4709
4710    #[test]
4711    fn splat5_closure_on_start() {
4712        let mut world = WorldBuilder::new().build();
4713        let r = world.registry_mut();
4714        let mut p = PipelineBuilder::<(u8, u8, u8, u8, u8)>::new().splat().then(
4715            |a: u8, b: u8, c: u8, d: u8, e: u8| {
4716                (a as u64) + (b as u64) + (c as u64) + (d as u64) + (e as u64)
4717            },
4718            r,
4719        );
4720        assert_eq!(p.run(&mut world, (1, 2, 3, 4, 5)), 15);
4721    }
4722
4723    #[test]
4724    fn splat_build_into_handler() {
4725        let mut wb = WorldBuilder::new();
4726        wb.register::<u64>(0);
4727        let mut world = wb.build();
4728
4729        fn store(mut out: ResMut<u64>, a: u32, b: u32) {
4730            *out = a as u64 + b as u64;
4731        }
4732
4733        let r = world.registry_mut();
4734        let mut pipeline = PipelineBuilder::<(u32, u32)>::new()
4735            .splat()
4736            .then(store, r)
4737            .build();
4738
4739        pipeline.run(&mut world, (3, 7));
4740        assert_eq!(*world.resource::<u64>(), 10);
4741    }
4742
4743    #[test]
4744    fn splat_build_batch() {
4745        let mut wb = WorldBuilder::new();
4746        wb.register::<u64>(0);
4747        let mut world = wb.build();
4748
4749        fn accumulate(mut sum: ResMut<u64>, a: u32, b: u32) {
4750            *sum += a as u64 + b as u64;
4751        }
4752
4753        let r = world.registry_mut();
4754        let mut batch = PipelineBuilder::<(u32, u32)>::new()
4755            .splat()
4756            .then(accumulate, r)
4757            .build_batch(8);
4758
4759        batch
4760            .input_mut()
4761            .extend_from_slice(&[(1, 2), (3, 4), (5, 6)]);
4762        batch.run(&mut world);
4763        assert_eq!(*world.resource::<u64>(), 21); // 3+7+11
4764    }
4765
4766    #[test]
4767    #[should_panic(expected = "conflicting access")]
4768    fn splat_access_conflict_detected() {
4769        let mut wb = WorldBuilder::new();
4770        wb.register::<u64>(0);
4771        let mut world = wb.build();
4772
4773        fn bad(a: ResMut<u64>, _b: ResMut<u64>, _x: u32, _y: u32) {
4774            let _ = a;
4775        }
4776
4777        let r = world.registry_mut();
4778        // Should panic on duplicate ResMut<u64>
4779        let _ = PipelineBuilder::<(u32, u32)>::new().splat().then(bad, r);
4780    }
4781
4782    // -- Then (previously switch) --
4783
4784    #[test]
4785    fn pipeline_then_branching() {
4786        fn double(x: u32) -> u64 {
4787            x as u64 * 2
4788        }
4789        fn sink(mut out: ResMut<u64>, val: u64) {
4790            *out = val;
4791        }
4792
4793        let mut wb = WorldBuilder::new();
4794        wb.register::<u64>(0);
4795        let mut world = wb.build();
4796        let reg = world.registry();
4797
4798        let mut pipeline = PipelineBuilder::<u32>::new()
4799            .then(double, reg)
4800            .then(|val: u64| if val > 10 { val * 100 } else { val + 1 }, reg)
4801            .then(sink, reg)
4802            .build();
4803
4804        pipeline.run(&mut world, 10u32); // 20 > 10 → 2000
4805        assert_eq!(*world.resource::<u64>(), 2000);
4806
4807        pipeline.run(&mut world, 3u32); // 6 <= 10 → 7
4808        assert_eq!(*world.resource::<u64>(), 7);
4809    }
4810
4811    #[test]
4812    fn pipeline_then_3_way() {
4813        fn sink(mut out: ResMut<u64>, val: u64) {
4814            *out = val;
4815        }
4816
4817        let mut wb = WorldBuilder::new();
4818        wb.register::<u64>(0);
4819        let mut world = wb.build();
4820        let reg = world.registry();
4821
4822        let mut pipeline = PipelineBuilder::<u32>::new()
4823            .then(
4824                |val: u32| match val % 3 {
4825                    0 => val as u64 + 100,
4826                    1 => val as u64 + 200,
4827                    _ => val as u64 + 300,
4828                },
4829                reg,
4830            )
4831            .then(sink, reg)
4832            .build();
4833
4834        pipeline.run(&mut world, 6u32); // 6 % 3 == 0 → 106
4835        assert_eq!(*world.resource::<u64>(), 106);
4836
4837        pipeline.run(&mut world, 7u32); // 7 % 3 == 1 → 207
4838        assert_eq!(*world.resource::<u64>(), 207);
4839
4840        pipeline.run(&mut world, 8u32); // 8 % 3 == 2 → 308
4841        assert_eq!(*world.resource::<u64>(), 308);
4842    }
4843
4844    #[test]
4845    fn pipeline_then_with_resolve_step() {
4846        fn add_offset(offset: Res<i64>, val: u32) -> u64 {
4847            (*offset + val as i64) as u64
4848        }
4849        fn plain_double(val: u32) -> u64 {
4850            val as u64 * 2
4851        }
4852        fn sink(mut out: ResMut<u64>, val: u64) {
4853            *out = val;
4854        }
4855
4856        let mut wb = WorldBuilder::new();
4857        wb.register::<u64>(0);
4858        wb.register::<i64>(100);
4859        let mut world = wb.build();
4860        let reg = world.registry();
4861
4862        let mut arm_offset = resolve_step(add_offset, reg);
4863        let mut arm_double = resolve_step(plain_double, reg);
4864
4865        let mut pipeline = PipelineBuilder::<u32>::new()
4866            .then(
4867                move |world: &mut World, val: u32| {
4868                    if val > 10 {
4869                        arm_offset(world, val)
4870                    } else {
4871                        arm_double(world, val)
4872                    }
4873                },
4874                reg,
4875            )
4876            .then(sink, reg)
4877            .build();
4878
4879        pipeline.run(&mut world, 20u32); // > 10 → offset → 100 + 20 = 120
4880        assert_eq!(*world.resource::<u64>(), 120);
4881
4882        pipeline.run(&mut world, 5u32); // <= 10 → double → 10
4883        assert_eq!(*world.resource::<u64>(), 10);
4884    }
4885
4886    #[test]
4887    fn batch_pipeline_then_branching() {
4888        fn sink(mut out: ResMut<u64>, val: u64) {
4889            *out += val;
4890        }
4891
4892        let mut wb = WorldBuilder::new();
4893        wb.register::<u64>(0);
4894        let mut world = wb.build();
4895        let reg = world.registry();
4896
4897        let mut batch = PipelineBuilder::<u32>::new()
4898            .then(
4899                |val: u32| {
4900                    if val % 2 == 0 {
4901                        val as u64 * 10
4902                    } else {
4903                        val as u64
4904                    }
4905                },
4906                reg,
4907            )
4908            .then(sink, reg)
4909            .build_batch(8);
4910
4911        batch.input_mut().extend([1, 2, 3, 4]);
4912        batch.run(&mut world);
4913
4914        // 1 → 1, 2 → 20, 3 → 3, 4 → 40 = 64
4915        assert_eq!(*world.resource::<u64>(), 64);
4916    }
4917
4918    // -- IntoRefStep with Param: named functions --
4919
4920    #[test]
4921    fn guard_named_fn_with_param() {
4922        fn above_threshold(threshold: Res<u64>, val: &u64) -> bool {
4923            *val > *threshold
4924        }
4925        fn sink(mut out: ResMut<i64>, val: Option<u64>) {
4926            *out = val.map_or(-1, |v| v as i64);
4927        }
4928        let mut wb = WorldBuilder::new();
4929        wb.register::<u64>(5); // threshold
4930        wb.register::<i64>(0);
4931        let mut world = wb.build();
4932        let reg = world.registry();
4933
4934        let mut p = PipelineBuilder::<u32>::new()
4935            .then(|x: u32| x as u64, reg)
4936            .guard(above_threshold, reg)
4937            .then(sink, reg);
4938
4939        p.run(&mut world, 10u32); // 10 > 5 → Some(10)
4940        assert_eq!(*world.resource::<i64>(), 10);
4941
4942        p.run(&mut world, 3u32); // 3 <= 5 → None → -1
4943        assert_eq!(*world.resource::<i64>(), -1);
4944    }
4945
4946    #[test]
4947    fn filter_named_fn_with_param() {
4948        fn is_allowed(allowed: Res<u64>, val: &u64) -> bool {
4949            *val != *allowed
4950        }
4951        fn count(mut ctr: ResMut<i64>, _val: u64) {
4952            *ctr += 1;
4953        }
4954        let mut wb = WorldBuilder::new();
4955        wb.register::<u64>(42); // blocked value
4956        wb.register::<i64>(0);
4957        let mut world = wb.build();
4958        let reg = world.registry();
4959
4960        let mut p = PipelineBuilder::<u32>::new()
4961            .then(|x: u32| -> Option<u64> { Some(x as u64) }, reg)
4962            .filter(is_allowed, reg)
4963            .map(count, reg)
4964            .unwrap_or(());
4965
4966        for v in [1u32, 42, 5, 42, 10] {
4967            p.run(&mut world, v);
4968        }
4969        assert_eq!(*world.resource::<i64>(), 3); // 42 filtered out twice
4970    }
4971
4972    #[test]
4973    fn inspect_named_fn_with_param() {
4974        fn log_value(mut log: ResMut<Vec<u64>>, val: &u64) {
4975            log.push(*val);
4976        }
4977        let mut wb = WorldBuilder::new();
4978        wb.register::<Vec<u64>>(Vec::new());
4979        let mut world = wb.build();
4980        let reg = world.registry();
4981
4982        let mut p = PipelineBuilder::<u32>::new()
4983            .then(|x: u32| -> Option<u64> { Some(x as u64) }, reg)
4984            .inspect(log_value, reg)
4985            .unwrap_or(0);
4986
4987        for v in [1u32, 2, 3] {
4988            p.run(&mut world, v);
4989        }
4990        assert_eq!(world.resource::<Vec<u64>>().as_slice(), &[1, 2, 3]);
4991    }
4992
4993    #[test]
4994    fn tap_named_fn_with_param() {
4995        fn observe(mut log: ResMut<Vec<u64>>, val: &u64) {
4996            log.push(*val);
4997        }
4998        fn sink(mut out: ResMut<u64>, val: u64) {
4999            *out = val;
5000        }
5001        let mut wb = WorldBuilder::new();
5002        wb.register::<u64>(0);
5003        wb.register::<Vec<u64>>(Vec::new());
5004        let mut world = wb.build();
5005        let reg = world.registry();
5006
5007        let mut p = PipelineBuilder::<u32>::new()
5008            .then(|x: u32| x as u64, reg)
5009            .tap(observe, reg)
5010            .then(sink, reg);
5011
5012        p.run(&mut world, 7u32);
5013        assert_eq!(*world.resource::<u64>(), 7);
5014        assert_eq!(world.resource::<Vec<u64>>().as_slice(), &[7]);
5015    }
5016
5017    // -- IntoProducer with Param: named functions --
5018
5019    #[test]
5020    fn and_named_fn_with_param() {
5021        fn check_enabled(flag: Res<bool>) -> bool {
5022            *flag
5023        }
5024        let mut wb = WorldBuilder::new();
5025        wb.register::<bool>(true);
5026        let mut world = wb.build();
5027        let reg = world.registry();
5028
5029        let mut p = PipelineBuilder::<u32>::new()
5030            .then(|_x: u32| true, reg)
5031            .and(check_enabled, reg);
5032
5033        assert!(p.run(&mut world, 0u32));
5034
5035        *world.resource_mut::<bool>() = false;
5036        assert!(!p.run(&mut world, 0u32)); // short-circuit: true AND false
5037    }
5038
5039    #[test]
5040    fn or_named_fn_with_param() {
5041        fn check_enabled(flag: Res<bool>) -> bool {
5042            *flag
5043        }
5044        let mut wb = WorldBuilder::new();
5045        wb.register::<bool>(true);
5046        let mut world = wb.build();
5047        let reg = world.registry();
5048
5049        let mut p = PipelineBuilder::<u32>::new()
5050            .then(|_x: u32| false, reg)
5051            .or(check_enabled, reg);
5052
5053        assert!(p.run(&mut world, 0u32)); // false OR true
5054
5055        *world.resource_mut::<bool>() = false;
5056        assert!(!p.run(&mut world, 0u32)); // false OR false
5057    }
5058
5059    #[test]
5060    fn on_none_named_fn_with_param() {
5061        fn log_miss(mut ctr: ResMut<u64>) {
5062            *ctr += 1;
5063        }
5064        let mut wb = WorldBuilder::new();
5065        wb.register::<u64>(0);
5066        let mut world = wb.build();
5067        let reg = world.registry();
5068
5069        let mut p = PipelineBuilder::<u32>::new()
5070            .then(
5071                |x: u32| -> Option<u32> { if x > 5 { Some(x) } else { None } },
5072                reg,
5073            )
5074            .on_none(log_miss, reg)
5075            .unwrap_or(0);
5076
5077        for v in [1u32, 10, 3, 20] {
5078            p.run(&mut world, v);
5079        }
5080        assert_eq!(*world.resource::<u64>(), 2); // 1 and 3 are None
5081    }
5082
5083    #[test]
5084    fn ok_or_else_named_fn_with_param() {
5085        fn make_error(msg: Res<String>) -> String {
5086            msg.clone()
5087        }
5088        let mut wb = WorldBuilder::new();
5089        wb.register::<String>("not found".into());
5090        let mut world = wb.build();
5091        let reg = world.registry();
5092
5093        let mut p = PipelineBuilder::<u32>::new()
5094            .then(
5095                |x: u32| -> Option<u32> { if x > 0 { Some(x) } else { None } },
5096                reg,
5097            )
5098            .ok_or_else(make_error, reg);
5099
5100        let r: Result<u32, String> = p.run(&mut world, 5u32);
5101        assert_eq!(r, Ok(5));
5102
5103        let r: Result<u32, String> = p.run(&mut world, 0u32);
5104        assert_eq!(r, Err("not found".into()));
5105    }
5106
5107    #[test]
5108    fn unwrap_or_else_option_named_fn_with_param() {
5109        fn fallback(default: Res<u64>) -> u64 {
5110            *default
5111        }
5112        let mut wb = WorldBuilder::new();
5113        wb.register::<u64>(42);
5114        let mut world = wb.build();
5115        let reg = world.registry();
5116
5117        let mut p = PipelineBuilder::<u32>::new()
5118            .then(
5119                |x: u32| -> Option<u64> { if x > 0 { Some(x as u64) } else { None } },
5120                reg,
5121            )
5122            .unwrap_or_else(fallback, reg);
5123
5124        assert_eq!(p.run(&mut world, 5u32), 5);
5125        assert_eq!(p.run(&mut world, 0u32), 42);
5126    }
5127
    // -- map_err with Param: named function --
5129
5130    #[test]
5131    fn map_err_named_fn_with_param() {
5132        fn tag_error(prefix: Res<String>, err: String) -> String {
5133            format!("{}: {err}", &*prefix)
5134        }
5135        fn sink(mut out: ResMut<String>, val: Result<u32, String>) {
5136            match val {
5137                Ok(v) => *out = format!("ok:{v}"),
5138                Err(e) => *out = e,
5139            }
5140        }
5141        let mut wb = WorldBuilder::new();
5142        wb.register::<String>("ERR".into());
5143        let mut world = wb.build();
5144        let reg = world.registry();
5145
5146        let mut p = PipelineBuilder::<u32>::new()
5147            .then(
5148                |x: u32| -> Result<u32, String> { if x > 0 { Ok(x) } else { Err("zero".into()) } },
5149                reg,
5150            )
5151            .map_err(tag_error, reg)
5152            .then(sink, reg);
5153
5154        p.run(&mut world, 0u32);
5155        assert_eq!(world.resource::<String>().as_str(), "ERR: zero");
5156
5157        p.run(&mut world, 5u32);
5158        assert_eq!(world.resource::<String>().as_str(), "ok:5");
5159    }
5160
5161    // =========================================================================
5162    // Scan combinator
5163    // =========================================================================
5164
5165    #[test]
5166    fn scan_arity0_closure_running_sum() {
5167        let mut world = WorldBuilder::new().build();
5168        let reg = world.registry();
5169
5170        let mut p = PipelineBuilder::<u64>::new().then(|x: u64| x, reg).scan(
5171            0u64,
5172            |acc: &mut u64, val: u64| {
5173                *acc += val;
5174                Some(*acc)
5175            },
5176            reg,
5177        );
5178
5179        assert_eq!(p.run(&mut world, 10), Some(10));
5180        assert_eq!(p.run(&mut world, 20), Some(30));
5181        assert_eq!(p.run(&mut world, 5), Some(35));
5182    }
5183
5184    #[test]
5185    fn scan_named_fn_with_param() {
5186        let mut wb = WorldBuilder::new();
5187        wb.register::<u64>(100);
5188        let mut world = wb.build();
5189        let reg = world.registry();
5190
5191        fn threshold_scan(limit: Res<u64>, acc: &mut u64, val: u64) -> Option<u64> {
5192            *acc += val;
5193            if *acc > *limit { Some(*acc) } else { None }
5194        }
5195
5196        let mut p =
5197            PipelineBuilder::<u64>::new()
5198                .then(|x: u64| x, reg)
5199                .scan(0u64, threshold_scan, reg);
5200
5201        assert_eq!(p.run(&mut world, 50), None);
5202        assert_eq!(p.run(&mut world, 30), None);
5203        assert_eq!(p.run(&mut world, 25), Some(105));
5204    }
5205
5206    #[test]
5207    fn scan_opaque_closure() {
5208        let mut wb = WorldBuilder::new();
5209        wb.register::<u64>(10);
5210        let mut world = wb.build();
5211        let reg = world.registry();
5212
5213        let mut p = PipelineBuilder::<u64>::new().then(|x: u64| x, reg).scan(
5214            0u64,
5215            |world: &mut World, acc: &mut u64, val: u64| {
5216                let factor = *world.resource::<u64>();
5217                *acc += val * factor;
5218                Some(*acc)
5219            },
5220            reg,
5221        );
5222
5223        assert_eq!(p.run(&mut world, 1), Some(10));
5224        assert_eq!(p.run(&mut world, 2), Some(30));
5225    }
5226
5227    #[test]
5228    fn scan_suppression_returns_none() {
5229        let mut world = WorldBuilder::new().build();
5230        let reg = world.registry();
5231
5232        let mut p = PipelineBuilder::<u64>::new().then(|x: u64| x, reg).scan(
5233            0u64,
5234            |acc: &mut u64, val: u64| -> Option<u64> {
5235                *acc += val;
5236                if *acc > 50 { Some(*acc) } else { None }
5237            },
5238            reg,
5239        );
5240
5241        assert_eq!(p.run(&mut world, 20), None);
5242        assert_eq!(p.run(&mut world, 20), None);
5243        assert_eq!(p.run(&mut world, 20), Some(60));
5244    }
5245
5246    #[test]
5247    fn scan_on_pipeline_start() {
5248        let mut world = WorldBuilder::new().build();
5249        let reg = world.registry();
5250
5251        let mut p = PipelineBuilder::<u64>::new().scan(
5252            0u64,
5253            |acc: &mut u64, val: u64| {
5254                *acc += val;
5255                *acc
5256            },
5257            reg,
5258        );
5259
5260        assert_eq!(p.run(&mut world, 5), 5);
5261        assert_eq!(p.run(&mut world, 3), 8);
5262        assert_eq!(p.run(&mut world, 2), 10);
5263    }
5264
5265    #[test]
5266    fn scan_persistence_across_batch() {
5267        let mut wb = WorldBuilder::new();
5268        wb.register::<u64>(0);
5269        let mut world = wb.build();
5270        let reg = world.registry();
5271
5272        fn store(mut out: ResMut<u64>, val: u64) {
5273            *out = val;
5274        }
5275
5276        let mut p = PipelineBuilder::<u64>::new()
5277            .then(|x: u64| x, reg)
5278            .scan(
5279                0u64,
5280                |acc: &mut u64, val: u64| {
5281                    *acc += val;
5282                    *acc
5283                },
5284                reg,
5285            )
5286            .then(store, reg)
5287            .build_batch(4);
5288
5289        p.input_mut().extend([1, 2, 3]);
5290        p.run(&mut world);
5291
5292        // Accumulator persists: 1, 3, 6
5293        assert_eq!(*world.resource::<u64>(), 6);
5294
5295        p.input_mut().push(4);
5296        p.run(&mut world);
5297        // acc = 6 + 4 = 10
5298        assert_eq!(*world.resource::<u64>(), 10);
5299    }
5300
5301    // =========================================================================
5302    // Build — Option<()> terminal
5303    // =========================================================================
5304
5305    #[test]
5306    fn build_option_unit_terminal() {
5307        let mut wb = WorldBuilder::new();
5308        wb.register::<u64>(0);
5309        let mut world = wb.build();
5310        let r = world.registry_mut();
5311
5312        fn check(x: u32) -> Option<u32> {
5313            if x > 5 { Some(x) } else { None }
5314        }
5315        fn store(mut out: ResMut<u64>, val: u32) {
5316            *out += val as u64;
5317        }
5318
5319        // .map(store) on Option<u32> produces Option<()> — build() must work
5320        let mut p = PipelineBuilder::<u32>::new()
5321            .then(check, r)
5322            .map(store, r)
5323            .build();
5324
5325        p.run(&mut world, 3); // None, skipped
5326        assert_eq!(*world.resource::<u64>(), 0);
5327        p.run(&mut world, 7); // Some, stores
5328        assert_eq!(*world.resource::<u64>(), 7);
5329        p.run(&mut world, 10);
5330        assert_eq!(*world.resource::<u64>(), 17);
5331    }
5332
5333    #[test]
5334    fn build_option_unit_boxes_into_handler() {
5335        let mut wb = WorldBuilder::new();
5336        wb.register::<u64>(0);
5337        let mut world = wb.build();
5338        let r = world.registry_mut();
5339
5340        fn double(x: u32) -> Option<u64> {
5341            if x > 0 { Some(x as u64 * 2) } else { None }
5342        }
5343        fn store(mut out: ResMut<u64>, val: u64) {
5344            *out += val;
5345        }
5346
5347        let mut h: Box<dyn Handler<u32>> = Box::new(
5348            PipelineBuilder::<u32>::new()
5349                .then(double, r)
5350                .map(store, r)
5351                .build(),
5352        );
5353        h.run(&mut world, 0); // None
5354        assert_eq!(*world.resource::<u64>(), 0);
5355        h.run(&mut world, 5); // 10
5356        assert_eq!(*world.resource::<u64>(), 10);
5357    }
5358
5359    // =========================================================================
5360    // Build — borrowed event type
5361    // =========================================================================
5362
5363    #[test]
5364    fn build_borrowed_event_direct() {
5365        let mut wb = WorldBuilder::new();
5366        wb.register::<u64>(0);
5367        let mut world = wb.build();
5368
5369        fn decode(msg: &[u8]) -> u64 {
5370            msg.len() as u64
5371        }
5372        fn store(mut out: ResMut<u64>, val: u64) {
5373            *out = val;
5374        }
5375
5376        // msg declared before p so it outlives the pipeline (drop order).
5377        // Matches real-world usage: pipeline lives long, events come and go.
5378        let msg = vec![1u8, 2, 3];
5379        let r = world.registry_mut();
5380        let mut p = PipelineBuilder::<&[u8]>::new()
5381            .then(decode, r)
5382            .then(store, r)
5383            .build();
5384
5385        p.run(&mut world, &msg);
5386        assert_eq!(*world.resource::<u64>(), 3);
5387    }
5388
5389    #[test]
5390    fn build_borrowed_event_option_unit() {
5391        let mut wb = WorldBuilder::new();
5392        wb.register::<u64>(0);
5393        let mut world = wb.build();
5394
5395        fn decode(msg: &[u8]) -> Option<u64> {
5396            if msg.is_empty() {
5397                None
5398            } else {
5399                Some(msg.len() as u64)
5400            }
5401        }
5402        fn store(mut out: ResMut<u64>, val: u64) {
5403            *out = val;
5404        }
5405
5406        let empty = vec![];
5407        let data = vec![1u8, 2, 3];
5408        let r = world.registry_mut();
5409        let mut p = PipelineBuilder::<&[u8]>::new()
5410            .then(decode, r)
5411            .map(store, r)
5412            .build();
5413
5414        p.run(&mut world, &empty); // None
5415        assert_eq!(*world.resource::<u64>(), 0);
5416        p.run(&mut world, &data); // Some(3)
5417        assert_eq!(*world.resource::<u64>(), 3);
5418    }
5419}