Skip to main content

nexus_rt/
pipeline.rs

1// Builder return types are necessarily complex — each combinator returns
2// PipelineChain<In, Out, NodeType<Chain, ...>>. Same pattern as iterator adapters.
3#![allow(clippy::type_complexity)]
4
5//! Pre-resolved pipeline dispatch using [`Param`] steps.
6//!
7//! [`PipelineBuilder`] begins a typed composition chain where each step
8//! is a named function with [`Param`] dependencies resolved at build
9//! time. The result is a monomorphized chain of named node types where dispatch-time
10//! resource access is a single pointer deref per fetch — zero framework overhead.
11//! [`ResourceId`](crate::ResourceId) is a direct pointer, not a HashMap lookup.
12//!
13//! Two dispatch tiers in nexus-rt:
14//! 1. **Pipeline** — static after build, pre-resolved, the workhorse
15//! 2. **Callback** — dynamic registration with per-instance context
16//!
17//! # Step function convention
18//!
19//! Params first, step input last, returns output:
20//!
21//! ```ignore
22//! fn validate(config: Res<Config>, order: Order) -> Option<ValidOrder> { .. }
23//! fn enrich(cache: Res<MarketData>, order: ValidOrder) -> EnrichedOrder { .. }
24//! fn submit(mut gw: ResMut<Gateway>, order: CheckedOrder) { gw.send(order); }
25//! ```
26//!
27//! # Combinator split
28//!
29//! **IntoStep-based (pre-resolved, hot path):**
30//! `.then()`, `.map()`, `.and_then()`, `.catch()`
31//!
32//! **Trait-based (same API for named functions, arity-0 closures, and [`Opaque`] closures):**
33//! `.guard()`, `.filter()`, `.tap()`, `.inspect()`, `.inspect_err()`,
34//! `.on_none()`, `.ok_or_else()`, `.unwrap_or_else()`, `.map_err()`,
35//! `.or_else()`, `.and()`, `.or()`, `.xor()`, `.route()`
36//!
37//! # Combinator quick reference
38//!
39//! **Bare value `T`:** `.then()`, `.tap()`, `.guard()` (→ `Option<T>`),
40//! `.dispatch()`, `.route()`, `.tee()`, `.scan()`, `.dedup()` (→ `Option<T>`)
41//!
42//! **Tuple `(A, B, ...)` (2-5 elements):** `.splat()` (→ splat builder,
43//! call `.then()` with destructured args)
44//!
45//! **`Option<T>`:** `.map()`, `.filter()`, `.inspect()`, `.and_then()`,
46//! `.on_none()`, `.ok_or()` (→ `Result`), `.unwrap_or()` (→ `T`),
47//! `.cloned()` (→ `Option<T>` from `Option<&T>`)
48//!
49//! **`Result<T, E>`:** `.map()`, `.and_then()`, `.catch()` (→ `Option<T>`),
50//! `.map_err()`, `.inspect_err()`, `.ok()` (→ `Option<T>`),
51//! `.unwrap_or()` (→ `T`), `.or_else()`
52//!
53//! **`bool`:** `.not()`, `.and()`, `.or()`, `.xor()`
54//!
55//! **Terminal:** `.build()` (→ `Pipeline`), `.build_batch(cap)`
56//! (→ `BatchPipeline<In>`)
57//!
58//! # Splat — tuple destructuring
59//!
60//! Pipeline steps follow a single-value-in, single-value-out convention.
61//! When a step returns a tuple like `(OrderId, f64)`, the next step
62//! must accept the whole tuple as one argument. `.splat()` destructures
63//! the tuple so the next step receives individual arguments instead:
64//!
65//! ```ignore
66//! // Without splat — next step takes the whole tuple:
67//! fn process(pair: (OrderId, f64)) -> bool { .. }
68//!
69//! // With splat — next step takes individual args:
70//! fn process(id: OrderId, price: f64) -> bool { .. }
71//!
72//! PipelineBuilder::<Order>::new()
73//!     .then(extract, reg)   // Order → (OrderId, f64)
74//!     .splat()              // destructure
75//!     .then(process, reg)   // (OrderId, f64) → bool
76//!     .build();
77//! ```
78//!
79//! Supported for tuples of 2-5 elements. Beyond 5, define a named
80//! struct — if a combinator stage needs that many arguments, a struct
81//! makes the intent clearer and the code more maintainable.
82//!
83//! # Returning pipelines from functions (Rust 2024)
84//!
85//! When a factory function takes `&Registry` and returns `impl Handler<E>`,
86//! Rust 2024 captures the registry borrow in the return type by default.
87//! Use `+ use<...>` to exclude it:
88//!
89//! ```ignore
90//! fn on_order<C: Config>(
91//!     reg: &Registry,
92//! ) -> impl Handler<Order> + use<C> {
93//!     PipelineBuilder::<Order>::new()
94//!         .then(validate::<C>, reg)
95//!         .dispatch(submit::<C>.into_handler(reg))
96//!         .build()
97//! }
98//! ```
99//!
100//! List every type parameter the pipeline captures; omit the `&Registry`
101//! lifetime — it's consumed during `.build()`. See the
102//! [crate-level docs](crate#returning-impl-handler-from-functions-rust-2024)
103//! for the full explanation.
104
105use std::marker::PhantomData;
106
107use crate::Handler;
108use crate::dag::DagArm;
109use crate::handler::{Opaque, Param};
110use crate::world::{Registry, World};
111
112// =============================================================================
113// Step — pre-resolved step with Param state
114// =============================================================================
115
116/// Internal: pre-resolved step with cached Param state.
117///
118/// Users don't construct this directly — it's produced by [`IntoStep`] and
119/// stored inside named chain node types.
120#[doc(hidden)]
121pub struct Step<F, Params: Param> {
122    f: F,
123    state: Params::State,
124    #[allow(dead_code)]
125    name: &'static str,
126}
127
128// =============================================================================
129// StepCall — callable trait for resolved steps
130// =============================================================================
131
132/// Internal: callable trait for resolved steps.
133///
134/// Used as a bound on [`IntoStep::Step`]. Users don't implement this.
135#[doc(hidden)]
136pub trait StepCall<In> {
137    /// The output type of this step.
138    type Out;
139    /// Call this step with a world reference and input value.
140    fn call(&mut self, world: &mut World, input: In) -> Self::Out;
141}
142
143// =============================================================================
144// IntoStep — converts a named function into a resolved step
145// =============================================================================
146
147/// Converts a named function into a pre-resolved pipeline step.
148///
149/// Params first, step input last, returns output. Arity 0 (no
150/// Params) supports closures. Arities 1+ require named functions
151/// (same HRTB+GAT limitation as [`IntoHandler`](crate::IntoHandler)).
152///
153/// # Examples
154///
155/// ```ignore
156/// // Arity 0 — closure works
157/// let step = (|x: u32| x * 2).into_step(registry);
158///
159/// // Arity 1 — named function required
160/// fn validate(config: Res<Config>, order: Order) -> Option<ValidOrder> { .. }
161/// let step = validate.into_step(registry);
162/// ```
163#[diagnostic::on_unimplemented(
164    message = "this function cannot be used as a pipeline step for this input type",
165    note = "if the pipeline output is `Option<T>`, use `.map()` to operate on the inner `T`",
166    note = "if the pipeline output is `Result<T, E>`, use `.map()` for `Ok` or `.catch()` for `Err`",
167    note = "if using a closure with resource params (Res<T>, ResMut<T>), that isn't supported — use a named `fn`"
168)]
169pub trait IntoStep<In, Out, Params> {
170    /// The concrete resolved step type.
171    type Step: StepCall<In, Out = Out>;
172
173    /// Resolve Param state from the registry and produce a step.
174    fn into_step(self, registry: &Registry) -> Self::Step;
175}
176
177// =============================================================================
178// Arity 0 — fn(In) -> Out — closures work (no HRTB+GAT issues)
179// =============================================================================
180
181impl<In, Out, F: FnMut(In) -> Out + 'static> StepCall<In> for Step<F, ()> {
182    type Out = Out;
183    #[inline(always)]
184    fn call(&mut self, _world: &mut World, input: In) -> Out {
185        (self.f)(input)
186    }
187}
188
189impl<In, Out, F: FnMut(In) -> Out + 'static> IntoStep<In, Out, ()> for F {
190    type Step = Step<F, ()>;
191
192    fn into_step(self, registry: &Registry) -> Self::Step {
193        Step {
194            f: self,
195            state: <() as Param>::init(registry),
196            name: std::any::type_name::<F>(),
197        }
198    }
199}
200
201// =============================================================================
202// Arities 1-8 via macro — HRTB with -> Out
203// =============================================================================
204
205macro_rules! impl_into_step {
206    ($($P:ident),+) => {
207        impl<In, Out, F: 'static, $($P: Param + 'static),+>
208            StepCall<In> for Step<F, ($($P,)+)>
209        where
210            for<'a> &'a mut F:
211                FnMut($($P,)+ In) -> Out +
212                FnMut($($P::Item<'a>,)+ In) -> Out,
213        {
214            type Out = Out;
215            #[inline(always)]
216            #[allow(non_snake_case)]
217            fn call(&mut self, world: &mut World, input: In) -> Out {
218                #[allow(clippy::too_many_arguments)]
219                fn call_inner<$($P,)+ Input, Output>(
220                    mut f: impl FnMut($($P,)+ Input) -> Output,
221                    $($P: $P,)+
222                    input: Input,
223                ) -> Output {
224                    f($($P,)+ input)
225                }
226
227                // SAFETY: state was produced by Param::init() on the same Registry
228                // that built this World. Borrows are disjoint — enforced by
229                // conflict detection at build time.
230                #[cfg(debug_assertions)]
231                world.clear_borrows();
232                let ($($P,)+) = unsafe {
233                    <($($P,)+) as Param>::fetch(world, &mut self.state)
234                };
235                call_inner(&mut self.f, $($P,)+ input)
236            }
237        }
238
239        impl<In, Out, F: 'static, $($P: Param + 'static),+>
240            IntoStep<In, Out, ($($P,)+)> for F
241        where
242            for<'a> &'a mut F:
243                FnMut($($P,)+ In) -> Out +
244                FnMut($($P::Item<'a>,)+ In) -> Out,
245        {
246            type Step = Step<F, ($($P,)+)>;
247
248            fn into_step(self, registry: &Registry) -> Self::Step {
249                let state = <($($P,)+) as Param>::init(registry);
250                {
251                    #[allow(non_snake_case)]
252                    let ($($P,)+) = &state;
253                    registry.check_access(&[
254                        $(
255                            (<$P as Param>::resource_id($P),
256                             std::any::type_name::<$P>()),
257                        )+
258                    ]);
259                }
260                Step { f: self, state, name: std::any::type_name::<F>() }
261            }
262        }
263    };
264}
265
266all_tuples!(impl_into_step);
267
268// =============================================================================
269// OpaqueStep — wrapper for opaque closures as steps
270// =============================================================================
271
272/// Internal: wrapper for opaque closures used as pipeline steps.
273///
274/// Unlike [`Step<F, P>`] which stores resolved `Param::State`, this
275/// holds only the function — no state to resolve.
276#[doc(hidden)]
277pub struct OpaqueStep<F> {
278    f: F,
279    #[allow(dead_code)]
280    name: &'static str,
281}
282
283impl<In, Out, F: FnMut(&mut World, In) -> Out + 'static> StepCall<In> for OpaqueStep<F> {
284    type Out = Out;
285    #[inline(always)]
286    fn call(&mut self, world: &mut World, input: In) -> Out {
287        (self.f)(world, input)
288    }
289}
290
291impl<In, Out, F: FnMut(&mut World, In) -> Out + 'static> IntoStep<In, Out, Opaque> for F {
292    type Step = OpaqueStep<F>;
293
294    fn into_step(self, _registry: &Registry) -> Self::Step {
295        OpaqueStep {
296            f: self,
297            name: std::any::type_name::<F>(),
298        }
299    }
300}
301
302// =============================================================================
303// RefStepCall / IntoRefStep — step taking &In, returning Out
304// =============================================================================
305
306/// Internal: callable trait for resolved steps taking input by reference.
307///
308/// Used by combinators like `tap`, `guard`, `filter`, `inspect` that
309/// observe the value without consuming it.
310#[doc(hidden)]
311pub trait RefStepCall<In> {
312    /// The output type of this step.
313    type Out;
314    /// Call this step with a world reference and borrowed input.
315    fn call(&mut self, world: &mut World, input: &In) -> Self::Out;
316}
317
318/// Converts a function into a pre-resolved step taking input by reference.
319///
320/// Same three-tier resolution as [`IntoStep`]:
321///
322/// | Params | Function shape | Example |
323/// |--------|---------------|---------|
324/// | `()` | `FnMut(&In) -> Out` | `\|o: &Order\| o.price > 0.0` |
325/// | `(P0,)...(P0..P7,)` | `fn(Params..., &In) -> Out` | `fn check(c: Res<Config>, o: &Order) -> bool` |
326/// | [`Opaque`] | `FnMut(&mut World, &In) -> Out` | `\|w: &mut World, o: &Order\| { ... }` |
327#[diagnostic::on_unimplemented(
328    message = "this function cannot be used as a reference step for this input type",
329    note = "reference steps (guard, filter, tap, inspect) take `&In`, not `In`",
330    note = "if the pipeline output is `Option<T>`, `.filter()` operates on `&T` inside the Option",
331    note = "closures with resource parameters are not supported — use a named `fn`"
332)]
333pub trait IntoRefStep<In, Out, Params> {
334    /// The concrete resolved step type.
335    type Step: RefStepCall<In, Out = Out>;
336
337    /// Resolve Param state from the registry and produce a step.
338    fn into_ref_step(self, registry: &Registry) -> Self::Step;
339}
340
341// -- Arity 0: FnMut(&In) -> Out — closures work ----------------------------
342
343impl<In, Out, F: FnMut(&In) -> Out + 'static> RefStepCall<In> for Step<F, ()> {
344    type Out = Out;
345    #[inline(always)]
346    fn call(&mut self, _world: &mut World, input: &In) -> Out {
347        (self.f)(input)
348    }
349}
350
351impl<In, Out, F: FnMut(&In) -> Out + 'static> IntoRefStep<In, Out, ()> for F {
352    type Step = Step<F, ()>;
353
354    fn into_ref_step(self, registry: &Registry) -> Self::Step {
355        Step {
356            f: self,
357            state: <() as Param>::init(registry),
358            name: std::any::type_name::<F>(),
359        }
360    }
361}
362
363// -- Arities 1-8: named functions with Param resolution ---------------------
364
365macro_rules! impl_into_ref_step {
366    ($($P:ident),+) => {
367        impl<In, Out, F: 'static, $($P: Param + 'static),+>
368            RefStepCall<In> for Step<F, ($($P,)+)>
369        where
370            for<'a> &'a mut F:
371                FnMut($($P,)+ &In) -> Out +
372                FnMut($($P::Item<'a>,)+ &In) -> Out,
373        {
374            type Out = Out;
375            #[inline(always)]
376            #[allow(non_snake_case)]
377            fn call(&mut self, world: &mut World, input: &In) -> Out {
378                #[allow(clippy::too_many_arguments)]
379                fn call_inner<$($P,)+ Input: ?Sized, Output>(
380                    mut f: impl FnMut($($P,)+ &Input) -> Output,
381                    $($P: $P,)+
382                    input: &Input,
383                ) -> Output {
384                    f($($P,)+ input)
385                }
386
387                // SAFETY: state was produced by Param::init() on the same Registry
388                // that built this World. Borrows are disjoint — enforced by
389                // conflict detection at build time.
390                #[cfg(debug_assertions)]
391                world.clear_borrows();
392                let ($($P,)+) = unsafe {
393                    <($($P,)+) as Param>::fetch(world, &mut self.state)
394                };
395                call_inner(&mut self.f, $($P,)+ input)
396            }
397        }
398
399        impl<In, Out, F: 'static, $($P: Param + 'static),+>
400            IntoRefStep<In, Out, ($($P,)+)> for F
401        where
402            for<'a> &'a mut F:
403                FnMut($($P,)+ &In) -> Out +
404                FnMut($($P::Item<'a>,)+ &In) -> Out,
405        {
406            type Step = Step<F, ($($P,)+)>;
407
408            fn into_ref_step(self, registry: &Registry) -> Self::Step {
409                let state = <($($P,)+) as Param>::init(registry);
410                {
411                    #[allow(non_snake_case)]
412                    let ($($P,)+) = &state;
413                    registry.check_access(&[
414                        $(
415                            (<$P as Param>::resource_id($P),
416                             std::any::type_name::<$P>()),
417                        )+
418                    ]);
419                }
420                Step { f: self, state, name: std::any::type_name::<F>() }
421            }
422        }
423    };
424}
425
426all_tuples!(impl_into_ref_step);
427
428// -- Opaque: FnMut(&mut World, &In) -> Out ---------------------------------
429
430/// Internal: wrapper for opaque closures taking input by reference.
431#[doc(hidden)]
432pub struct OpaqueRefStep<F> {
433    f: F,
434    #[allow(dead_code)]
435    name: &'static str,
436}
437
438impl<In, Out, F: FnMut(&mut World, &In) -> Out + 'static> RefStepCall<In> for OpaqueRefStep<F> {
439    type Out = Out;
440    #[inline(always)]
441    fn call(&mut self, world: &mut World, input: &In) -> Out {
442        (self.f)(world, input)
443    }
444}
445
446impl<In, Out, F: FnMut(&mut World, &In) -> Out + 'static> IntoRefStep<In, Out, Opaque> for F {
447    type Step = OpaqueRefStep<F>;
448
449    fn into_ref_step(self, _registry: &Registry) -> Self::Step {
450        OpaqueRefStep {
451            f: self,
452            name: std::any::type_name::<F>(),
453        }
454    }
455}
456
457// =============================================================================
458// resolve_ref_step — pre-resolve a ref step for manual dispatch
459// =============================================================================
460
461/// Resolve a reference step for manual dispatch.
462///
463/// Returns a closure with pre-resolved [`Param`] state. Reference-input
464/// counterpart of [`resolve_step`].
465pub fn resolve_ref_step<In, Out, Params, S: IntoRefStep<In, Out, Params>>(
466    f: S,
467    registry: &Registry,
468) -> impl FnMut(&mut World, &In) -> Out + use<In, Out, Params, S> {
469    let mut resolved = f.into_ref_step(registry);
470    move |world: &mut World, input: &In| resolved.call(world, input)
471}
472
473// =============================================================================
474// ProducerCall / IntoProducer — step producing a value with no pipeline input
475// =============================================================================
476
477/// Internal: callable trait for resolved steps that produce a value
478/// without receiving pipeline input.
479///
480/// Used by combinators like `and`, `or`, `xor`, `on_none`, `ok_or_else`,
481/// `unwrap_or_else` (Option).
482#[doc(hidden)]
483pub trait ProducerCall {
484    /// The output type of this producer.
485    type Out;
486    /// Call this producer with a world reference.
487    fn call(&mut self, world: &mut World) -> Self::Out;
488}
489
490/// Converts a function into a pre-resolved producer step.
491///
492/// Same three-tier resolution as [`IntoStep`]:
493///
494/// | Params | Function shape | Example |
495/// |--------|---------------|---------|
496/// | `()` | `FnMut() -> Out` | `\|\| true` |
497/// | `(P0,)...(P0..P7,)` | `fn(Params...) -> Out` | `fn is_active(s: Res<State>) -> bool` |
498/// | [`Opaque`] | `FnMut(&mut World) -> Out` | `\|w: &mut World\| { ... }` |
499#[diagnostic::on_unimplemented(
500    message = "this function cannot be used as a producer for this output type",
501    note = "producers take no pipeline input — they produce a value (e.g., default, fallback)",
502    note = "closures with resource parameters are not supported — use a named `fn`"
503)]
504pub trait IntoProducer<Out, Params> {
505    /// The concrete resolved producer type.
506    type Step: ProducerCall<Out = Out>;
507
508    /// Resolve Param state from the registry and produce a step.
509    fn into_producer(self, registry: &Registry) -> Self::Step;
510}
511
512// -- Arity 0: FnMut() -> Out — closures work --------------------------------
513
514impl<Out, F: FnMut() -> Out + 'static> ProducerCall for Step<F, ()> {
515    type Out = Out;
516    #[inline(always)]
517    fn call(&mut self, _world: &mut World) -> Out {
518        (self.f)()
519    }
520}
521
522impl<Out, F: FnMut() -> Out + 'static> IntoProducer<Out, ()> for F {
523    type Step = Step<F, ()>;
524
525    fn into_producer(self, registry: &Registry) -> Self::Step {
526        Step {
527            f: self,
528            state: <() as Param>::init(registry),
529            name: std::any::type_name::<F>(),
530        }
531    }
532}
533
534// -- Arities 1-8: named functions with Param resolution ---------------------
535
536macro_rules! impl_into_producer {
537    ($($P:ident),+) => {
538        impl<Out, F: 'static, $($P: Param + 'static),+>
539            ProducerCall for Step<F, ($($P,)+)>
540        where
541            for<'a> &'a mut F:
542                FnMut($($P,)+) -> Out +
543                FnMut($($P::Item<'a>,)+) -> Out,
544        {
545            type Out = Out;
546            #[inline(always)]
547            #[allow(non_snake_case)]
548            fn call(&mut self, world: &mut World) -> Out {
549                #[allow(clippy::too_many_arguments)]
550                fn call_inner<$($P,)+ Output>(
551                    mut f: impl FnMut($($P,)+) -> Output,
552                    $($P: $P,)+
553                ) -> Output {
554                    f($($P,)+)
555                }
556
557                // SAFETY: state was produced by Param::init() on the same Registry
558                // that built this World. Borrows are disjoint — enforced by
559                // conflict detection at build time.
560                #[cfg(debug_assertions)]
561                world.clear_borrows();
562                let ($($P,)+) = unsafe {
563                    <($($P,)+) as Param>::fetch(world, &mut self.state)
564                };
565                call_inner(&mut self.f, $($P,)+)
566            }
567        }
568
569        impl<Out, F: 'static, $($P: Param + 'static),+>
570            IntoProducer<Out, ($($P,)+)> for F
571        where
572            for<'a> &'a mut F:
573                FnMut($($P,)+) -> Out +
574                FnMut($($P::Item<'a>,)+) -> Out,
575        {
576            type Step = Step<F, ($($P,)+)>;
577
578            fn into_producer(self, registry: &Registry) -> Self::Step {
579                let state = <($($P,)+) as Param>::init(registry);
580                {
581                    #[allow(non_snake_case)]
582                    let ($($P,)+) = &state;
583                    registry.check_access(&[
584                        $(
585                            (<$P as Param>::resource_id($P),
586                             std::any::type_name::<$P>()),
587                        )+
588                    ]);
589                }
590                Step { f: self, state, name: std::any::type_name::<F>() }
591            }
592        }
593    };
594}
595
596all_tuples!(impl_into_producer);
597
598// -- Opaque: FnMut(&mut World) -> Out ---------------------------------------
599
600/// Internal: wrapper for opaque closures used as producers.
601#[doc(hidden)]
602pub struct OpaqueProducer<F> {
603    f: F,
604    #[allow(dead_code)]
605    name: &'static str,
606}
607
608impl<Out, F: FnMut(&mut World) -> Out + 'static> ProducerCall for OpaqueProducer<F> {
609    type Out = Out;
610    #[inline(always)]
611    fn call(&mut self, world: &mut World) -> Out {
612        (self.f)(world)
613    }
614}
615
616impl<Out, F: FnMut(&mut World) -> Out + 'static> IntoProducer<Out, Opaque> for F {
617    type Step = OpaqueProducer<F>;
618
619    fn into_producer(self, _registry: &Registry) -> Self::Step {
620        OpaqueProducer {
621            f: self,
622            name: std::any::type_name::<F>(),
623        }
624    }
625}
626
627// =============================================================================
628// resolve_producer — pre-resolve a producer for manual dispatch
629// =============================================================================
630
631/// Resolve a producer for manual dispatch.
632///
633/// Returns a closure with pre-resolved [`Param`] state. No-input
634/// counterpart of [`resolve_step`].
635pub fn resolve_producer<Out, Params, S: IntoProducer<Out, Params>>(
636    f: S,
637    registry: &Registry,
638) -> impl FnMut(&mut World) -> Out + use<Out, Params, S> {
639    let mut resolved = f.into_producer(registry);
640    move |world: &mut World| resolved.call(world)
641}
642
643// =============================================================================
644// ScanStepCall / IntoScanStep — step with persistent accumulator
645// =============================================================================
646
647/// Internal: callable trait for resolved scan steps.
648///
649/// Like [`StepCall`] but with an additional `&mut Acc` accumulator
650/// argument that persists across invocations.
651#[doc(hidden)]
652pub trait ScanStepCall<Acc, In> {
653    /// The output type of this scan step.
654    type Out;
655    /// Call this scan step with a world reference, accumulator, and input value.
656    fn call(&mut self, world: &mut World, acc: &mut Acc, input: In) -> Self::Out;
657}
658
659/// Converts a function into a pre-resolved scan step with persistent state.
660///
661/// Same three-tier resolution as [`IntoStep`]:
662///
663/// | Params | Function shape | Example |
664/// |--------|---------------|---------|
665/// | `()` | `FnMut(&mut Acc, In) -> Out` | `\|acc, trade\| { *acc += trade.amount; Some(*acc) }` |
666/// | `(P0,)...(P0..P7,)` | `fn(Params..., &mut Acc, In) -> Out` | `fn vwap(c: Res<Config>, acc: &mut State, t: Trade) -> Option<f64>` |
667/// | [`Opaque`] | `FnMut(&mut World, &mut Acc, In) -> Out` | `\|w: &mut World, acc: &mut u64, t: Trade\| { ... }` |
668#[diagnostic::on_unimplemented(
669    message = "this function cannot be used as a scan step",
670    note = "scan steps take `&mut Accumulator` as first param, then resources, then input last",
671    note = "closures with resource parameters are not supported — use a named `fn`"
672)]
673pub trait IntoScanStep<Acc, In, Out, Params> {
674    /// The concrete resolved scan step type.
675    type Step: ScanStepCall<Acc, In, Out = Out>;
676
677    /// Resolve Param state from the registry and produce a scan step.
678    fn into_scan_step(self, registry: &Registry) -> Self::Step;
679}
680
681// -- Arity 0: FnMut(&mut Acc, In) -> Out — closures work --------------------
682
683impl<Acc, In, Out, F: FnMut(&mut Acc, In) -> Out + 'static> ScanStepCall<Acc, In> for Step<F, ()> {
684    type Out = Out;
685    #[inline(always)]
686    fn call(&mut self, _world: &mut World, acc: &mut Acc, input: In) -> Out {
687        (self.f)(acc, input)
688    }
689}
690
691impl<Acc, In, Out, F: FnMut(&mut Acc, In) -> Out + 'static> IntoScanStep<Acc, In, Out, ()> for F {
692    type Step = Step<F, ()>;
693
694    fn into_scan_step(self, registry: &Registry) -> Self::Step {
695        Step {
696            f: self,
697            state: <() as Param>::init(registry),
698            name: std::any::type_name::<F>(),
699        }
700    }
701}
702
703// -- Arities 1-8: named functions with Param resolution ----------------------
704
705macro_rules! impl_into_scan_step {
706    ($($P:ident),+) => {
707        impl<Acc, In, Out, F: 'static, $($P: Param + 'static),+>
708            ScanStepCall<Acc, In> for Step<F, ($($P,)+)>
709        where
710            for<'a> &'a mut F:
711                FnMut($($P,)+ &mut Acc, In) -> Out +
712                FnMut($($P::Item<'a>,)+ &mut Acc, In) -> Out,
713        {
714            type Out = Out;
715            #[inline(always)]
716            #[allow(non_snake_case)]
717            fn call(&mut self, world: &mut World, acc: &mut Acc, input: In) -> Out {
718                #[allow(clippy::too_many_arguments)]
719                fn call_inner<$($P,)+ Accumulator, Input, Output>(
720                    mut f: impl FnMut($($P,)+ &mut Accumulator, Input) -> Output,
721                    $($P: $P,)+
722                    acc: &mut Accumulator,
723                    input: Input,
724                ) -> Output {
725                    f($($P,)+ acc, input)
726                }
727
728                // SAFETY: state was produced by Param::init() on the same Registry
729                // that built this World. Borrows are disjoint — enforced by
730                // conflict detection at build time.
731                #[cfg(debug_assertions)]
732                world.clear_borrows();
733                let ($($P,)+) = unsafe {
734                    <($($P,)+) as Param>::fetch(world, &mut self.state)
735                };
736                call_inner(&mut self.f, $($P,)+ acc, input)
737            }
738        }
739
740        impl<Acc, In, Out, F: 'static, $($P: Param + 'static),+>
741            IntoScanStep<Acc, In, Out, ($($P,)+)> for F
742        where
743            for<'a> &'a mut F:
744                FnMut($($P,)+ &mut Acc, In) -> Out +
745                FnMut($($P::Item<'a>,)+ &mut Acc, In) -> Out,
746        {
747            type Step = Step<F, ($($P,)+)>;
748
749            fn into_scan_step(self, registry: &Registry) -> Self::Step {
750                let state = <($($P,)+) as Param>::init(registry);
751                {
752                    #[allow(non_snake_case)]
753                    let ($($P,)+) = &state;
754                    registry.check_access(&[
755                        $(
756                            (<$P as Param>::resource_id($P),
757                             std::any::type_name::<$P>()),
758                        )+
759                    ]);
760                }
761                Step { f: self, state, name: std::any::type_name::<F>() }
762            }
763        }
764    };
765}
766
767all_tuples!(impl_into_scan_step);
768
769// -- Opaque: FnMut(&mut World, &mut Acc, In) -> Out --------------------------
770
771/// Internal: wrapper for opaque closures used as scan steps.
772#[doc(hidden)]
773pub struct OpaqueScanStep<F> {
774    f: F,
775    #[allow(dead_code)]
776    name: &'static str,
777}
778
779impl<Acc, In, Out, F: FnMut(&mut World, &mut Acc, In) -> Out + 'static> ScanStepCall<Acc, In>
780    for OpaqueScanStep<F>
781{
782    type Out = Out;
783    #[inline(always)]
784    fn call(&mut self, world: &mut World, acc: &mut Acc, input: In) -> Out {
785        (self.f)(world, acc, input)
786    }
787}
788
789impl<Acc, In, Out, F: FnMut(&mut World, &mut Acc, In) -> Out + 'static>
790    IntoScanStep<Acc, In, Out, Opaque> for F
791{
792    type Step = OpaqueScanStep<F>;
793
794    fn into_scan_step(self, _registry: &Registry) -> Self::Step {
795        OpaqueScanStep {
796            f: self,
797            name: std::any::type_name::<F>(),
798        }
799    }
800}
801
802// =============================================================================
803// resolve_scan_step — pre-resolve a scan step for manual dispatch
804// =============================================================================
805
806/// Resolve a scan step for manual dispatch.
807///
808/// Returns a closure with pre-resolved [`Param`] state. Scan variant
809/// of [`resolve_step`] with an additional `&mut Acc` accumulator.
810pub fn resolve_scan_step<Acc, In, Out, Params, S: IntoScanStep<Acc, In, Out, Params>>(
811    f: S,
812    registry: &Registry,
813) -> impl FnMut(&mut World, &mut Acc, In) -> Out + use<Acc, In, Out, Params, S> {
814    let mut resolved = f.into_scan_step(registry);
815    move |world: &mut World, acc: &mut Acc, input: In| resolved.call(world, acc, input)
816}
817
818// =============================================================================
819// RefScanStepCall / IntoRefScanStep — scan step taking &In
820// =============================================================================
821
822/// Internal: callable trait for resolved scan steps taking input by reference.
823///
824/// DAG variant of [`ScanStepCall`] — each step borrows its input.
825#[doc(hidden)]
826pub trait RefScanStepCall<Acc, In> {
827    /// The output type of this ref-scan step.
828    type Out;
829    /// Call this scan step with a world reference, accumulator, and borrowed input.
830    fn call(&mut self, world: &mut World, acc: &mut Acc, input: &In) -> Self::Out;
831}
832
833/// Converts a function into a pre-resolved ref-scan step with persistent state.
834///
835/// Same three-tier resolution as [`IntoRefStep`]:
836///
837/// | Params | Function shape | Example |
838/// |--------|---------------|---------|
839/// | `()` | `FnMut(&mut Acc, &In) -> Out` | `\|acc, trade: &Trade\| { ... }` |
840/// | `(P0,)...(P0..P7,)` | `fn(Params..., &mut Acc, &In) -> Out` | `fn vwap(c: Res<Config>, acc: &mut State, t: &Trade) -> Option<f64>` |
841/// | [`Opaque`] | `FnMut(&mut World, &mut Acc, &In) -> Out` | `\|w: &mut World, acc: &mut u64, t: &Trade\| { ... }` |
842#[diagnostic::on_unimplemented(
843    message = "this function cannot be used as a reference scan step",
844    note = "reference scan steps take `&mut Accumulator` as first param, then resources, then `&In` last",
845    note = "closures with resource parameters are not supported — use a named `fn`"
846)]
847pub trait IntoRefScanStep<Acc, In, Out, Params> {
848    /// The concrete resolved ref-scan step type.
849    type Step: RefScanStepCall<Acc, In, Out = Out>;
850
851    /// Resolve Param state from the registry and produce a ref-scan step.
852    fn into_ref_scan_step(self, registry: &Registry) -> Self::Step;
853}
854
855// -- Arity 0: FnMut(&mut Acc, &In) -> Out — closures work -------------------
856
857impl<Acc, In, Out, F: FnMut(&mut Acc, &In) -> Out + 'static> RefScanStepCall<Acc, In>
858    for Step<F, ()>
859{
860    type Out = Out;
861    #[inline(always)]
862    fn call(&mut self, _world: &mut World, acc: &mut Acc, input: &In) -> Out {
863        (self.f)(acc, input)
864    }
865}
866
867impl<Acc, In, Out, F: FnMut(&mut Acc, &In) -> Out + 'static> IntoRefScanStep<Acc, In, Out, ()>
868    for F
869{
870    type Step = Step<F, ()>;
871
872    fn into_ref_scan_step(self, registry: &Registry) -> Self::Step {
873        Step {
874            f: self,
875            state: <() as Param>::init(registry),
876            name: std::any::type_name::<F>(),
877        }
878    }
879}
880
881// -- Arities 1-8: named functions with Param resolution ----------------------
882
883macro_rules! impl_into_ref_scan_step {
884    ($($P:ident),+) => {
885        impl<Acc, In, Out, F: 'static, $($P: Param + 'static),+>
886            RefScanStepCall<Acc, In> for Step<F, ($($P,)+)>
887        where
888            for<'a> &'a mut F:
889                FnMut($($P,)+ &mut Acc, &In) -> Out +
890                FnMut($($P::Item<'a>,)+ &mut Acc, &In) -> Out,
891        {
892            type Out = Out;
893            #[inline(always)]
894            #[allow(non_snake_case)]
895            fn call(&mut self, world: &mut World, acc: &mut Acc, input: &In) -> Out {
896                #[allow(clippy::too_many_arguments)]
897                fn call_inner<$($P,)+ Accumulator, Input: ?Sized, Output>(
898                    mut f: impl FnMut($($P,)+ &mut Accumulator, &Input) -> Output,
899                    $($P: $P,)+
900                    acc: &mut Accumulator,
901                    input: &Input,
902                ) -> Output {
903                    f($($P,)+ acc, input)
904                }
905
906                // SAFETY: state was produced by Param::init() on the same Registry
907                // that built this World. Borrows are disjoint — enforced by
908                // conflict detection at build time.
909                #[cfg(debug_assertions)]
910                world.clear_borrows();
911                let ($($P,)+) = unsafe {
912                    <($($P,)+) as Param>::fetch(world, &mut self.state)
913                };
914                call_inner(&mut self.f, $($P,)+ acc, input)
915            }
916        }
917
918        impl<Acc, In, Out, F: 'static, $($P: Param + 'static),+>
919            IntoRefScanStep<Acc, In, Out, ($($P,)+)> for F
920        where
921            for<'a> &'a mut F:
922                FnMut($($P,)+ &mut Acc, &In) -> Out +
923                FnMut($($P::Item<'a>,)+ &mut Acc, &In) -> Out,
924        {
925            type Step = Step<F, ($($P,)+)>;
926
927            fn into_ref_scan_step(self, registry: &Registry) -> Self::Step {
928                let state = <($($P,)+) as Param>::init(registry);
929                {
930                    #[allow(non_snake_case)]
931                    let ($($P,)+) = &state;
932                    registry.check_access(&[
933                        $(
934                            (<$P as Param>::resource_id($P),
935                             std::any::type_name::<$P>()),
936                        )+
937                    ]);
938                }
939                Step { f: self, state, name: std::any::type_name::<F>() }
940            }
941        }
942    };
943}
944
945all_tuples!(impl_into_ref_scan_step);
946
947// -- Opaque: FnMut(&mut World, &mut Acc, &In) -> Out ------------------------
948
949/// Internal: wrapper for opaque closures used as ref-scan steps.
950#[doc(hidden)]
951pub struct OpaqueRefScanStep<F> {
952    f: F,
953    #[allow(dead_code)]
954    name: &'static str,
955}
956
957impl<Acc, In, Out, F: FnMut(&mut World, &mut Acc, &In) -> Out + 'static> RefScanStepCall<Acc, In>
958    for OpaqueRefScanStep<F>
959{
960    type Out = Out;
961    #[inline(always)]
962    fn call(&mut self, world: &mut World, acc: &mut Acc, input: &In) -> Out {
963        (self.f)(world, acc, input)
964    }
965}
966
967impl<Acc, In, Out, F: FnMut(&mut World, &mut Acc, &In) -> Out + 'static>
968    IntoRefScanStep<Acc, In, Out, Opaque> for F
969{
970    type Step = OpaqueRefScanStep<F>;
971
972    fn into_ref_scan_step(self, _registry: &Registry) -> Self::Step {
973        OpaqueRefScanStep {
974            f: self,
975            name: std::any::type_name::<F>(),
976        }
977    }
978}
979
980// =============================================================================
981// resolve_ref_scan_step — pre-resolve a ref-scan step for manual dispatch
982// =============================================================================
983
984/// Resolve a ref-scan step for manual dispatch.
985///
986/// Returns a closure with pre-resolved [`Param`] state. Reference-input
987/// counterpart of [`resolve_scan_step`].
988pub fn resolve_ref_scan_step<Acc, In, Out, Params, S: IntoRefScanStep<Acc, In, Out, Params>>(
989    f: S,
990    registry: &Registry,
991) -> impl FnMut(&mut World, &mut Acc, &In) -> Out + use<Acc, In, Out, Params, S> {
992    let mut resolved = f.into_ref_scan_step(registry);
993    move |world: &mut World, acc: &mut Acc, input: &In| resolved.call(world, acc, input)
994}
995
996// =============================================================================
997// SplatCall / IntoSplatStep — splat step dispatch (tuple destructuring)
998// =============================================================================
999//
1000// Splat traits mirror StepCall/IntoStep but accept multiple owned values
1001// instead of a single input. This lets `.splat()` destructure a tuple
1002// output into individual function arguments for the next step.
1003//
1004// One trait pair per arity (2-5). Past 5, use a named struct.
1005
1006// -- Splat 2 ------------------------------------------------------------------
1007
1008/// Internal: callable trait for resolved 2-splat steps.
1009#[doc(hidden)]
1010pub trait SplatCall2<A, B> {
1011    /// Output type of this splat step.
1012    type Out;
1013    fn call_splat(&mut self, world: &mut World, a: A, b: B) -> Self::Out;
1014}
1015
1016/// Converts a named function into a resolved 2-splat step.
1017#[doc(hidden)]
1018pub trait IntoSplatStep2<A, B, Out, Params> {
1019    type Step: SplatCall2<A, B, Out = Out>;
1020    fn into_splat_step(self, registry: &Registry) -> Self::Step;
1021}
1022
1023impl<A, B, Out, F: FnMut(A, B) -> Out + 'static> SplatCall2<A, B> for Step<F, ()> {
1024    type Out = Out;
1025    #[inline(always)]
1026    fn call_splat(&mut self, _world: &mut World, a: A, b: B) -> Out {
1027        (self.f)(a, b)
1028    }
1029}
1030
1031impl<A, B, Out, F: FnMut(A, B) -> Out + 'static> IntoSplatStep2<A, B, Out, ()> for F {
1032    type Step = Step<F, ()>;
1033    fn into_splat_step(self, registry: &Registry) -> Self::Step {
1034        Step {
1035            f: self,
1036            state: <() as Param>::init(registry),
1037            name: std::any::type_name::<F>(),
1038        }
1039    }
1040}
1041
1042macro_rules! impl_splat2_step {
1043    ($($P:ident),+) => {
1044        impl<A, B, Out, F: 'static, $($P: Param + 'static),+>
1045            SplatCall2<A, B> for Step<F, ($($P,)+)>
1046        where
1047            for<'a> &'a mut F:
1048                FnMut($($P,)+ A, B) -> Out +
1049                FnMut($($P::Item<'a>,)+ A, B) -> Out,
1050        {
1051            type Out = Out;
1052            #[inline(always)]
1053            #[allow(non_snake_case)]
1054            fn call_splat(&mut self, world: &mut World, a: A, b: B) -> Out {
1055                #[allow(clippy::too_many_arguments)]
1056                fn call_inner<$($P,)+ IA, IB, Output>(
1057                    mut f: impl FnMut($($P,)+ IA, IB) -> Output,
1058                    $($P: $P,)+
1059                    a: IA, b: IB,
1060                ) -> Output {
1061                    f($($P,)+ a, b)
1062                }
1063                // SAFETY: state was produced by Param::init() on the same Registry
1064                // that built this World. Borrows are disjoint — enforced by
1065                // conflict detection at build time.
1066                #[cfg(debug_assertions)]
1067                world.clear_borrows();
1068                let ($($P,)+) = unsafe {
1069                    <($($P,)+) as Param>::fetch(world, &mut self.state)
1070                };
1071                call_inner(&mut self.f, $($P,)+ a, b)
1072            }
1073        }
1074
1075        impl<A, B, Out, F: 'static, $($P: Param + 'static),+>
1076            IntoSplatStep2<A, B, Out, ($($P,)+)> for F
1077        where
1078            for<'a> &'a mut F:
1079                FnMut($($P,)+ A, B) -> Out +
1080                FnMut($($P::Item<'a>,)+ A, B) -> Out,
1081        {
1082            type Step = Step<F, ($($P,)+)>;
1083
1084            fn into_splat_step(self, registry: &Registry) -> Self::Step {
1085                let state = <($($P,)+) as Param>::init(registry);
1086                {
1087                    #[allow(non_snake_case)]
1088                    let ($($P,)+) = &state;
1089                    registry.check_access(&[
1090                        $(
1091                            (<$P as Param>::resource_id($P),
1092                             std::any::type_name::<$P>()),
1093                        )+
1094                    ]);
1095                }
1096                Step { f: self, state, name: std::any::type_name::<F>() }
1097            }
1098        }
1099    };
1100}
1101
1102// -- Splat 3 ------------------------------------------------------------------
1103
1104/// Internal: callable trait for resolved 3-splat steps.
1105#[doc(hidden)]
1106pub trait SplatCall3<A, B, C> {
1107    /// Output type of this splat step.
1108    type Out;
1109    fn call_splat(&mut self, world: &mut World, a: A, b: B, c: C) -> Self::Out;
1110}
1111
1112/// Converts a named function into a resolved 3-splat step.
1113#[doc(hidden)]
1114pub trait IntoSplatStep3<A, B, C, Out, Params> {
1115    type Step: SplatCall3<A, B, C, Out = Out>;
1116    fn into_splat_step(self, registry: &Registry) -> Self::Step;
1117}
1118
1119impl<A, B, C, Out, F: FnMut(A, B, C) -> Out + 'static> SplatCall3<A, B, C> for Step<F, ()> {
1120    type Out = Out;
1121    #[inline(always)]
1122    fn call_splat(&mut self, _world: &mut World, a: A, b: B, c: C) -> Out {
1123        (self.f)(a, b, c)
1124    }
1125}
1126
1127impl<A, B, C, Out, F: FnMut(A, B, C) -> Out + 'static> IntoSplatStep3<A, B, C, Out, ()> for F {
1128    type Step = Step<F, ()>;
1129    fn into_splat_step(self, registry: &Registry) -> Self::Step {
1130        Step {
1131            f: self,
1132            state: <() as Param>::init(registry),
1133            name: std::any::type_name::<F>(),
1134        }
1135    }
1136}
1137
1138macro_rules! impl_splat3_step {
1139    ($($P:ident),+) => {
1140        impl<A, B, C, Out, F: 'static, $($P: Param + 'static),+>
1141            SplatCall3<A, B, C> for Step<F, ($($P,)+)>
1142        where
1143            for<'a> &'a mut F:
1144                FnMut($($P,)+ A, B, C) -> Out +
1145                FnMut($($P::Item<'a>,)+ A, B, C) -> Out,
1146        {
1147            type Out = Out;
1148            #[inline(always)]
1149            #[allow(non_snake_case)]
1150            fn call_splat(&mut self, world: &mut World, a: A, b: B, c: C) -> Out {
1151                #[allow(clippy::too_many_arguments)]
1152                fn call_inner<$($P,)+ IA, IB, IC, Output>(
1153                    mut f: impl FnMut($($P,)+ IA, IB, IC) -> Output,
1154                    $($P: $P,)+
1155                    a: IA, b: IB, c: IC,
1156                ) -> Output {
1157                    f($($P,)+ a, b, c)
1158                }
1159                // SAFETY: state was produced by Param::init() on the same Registry
1160                // that built this World. Borrows are disjoint — enforced by
1161                // conflict detection at build time.
1162                #[cfg(debug_assertions)]
1163                world.clear_borrows();
1164                let ($($P,)+) = unsafe {
1165                    <($($P,)+) as Param>::fetch(world, &mut self.state)
1166                };
1167                call_inner(&mut self.f, $($P,)+ a, b, c)
1168            }
1169        }
1170
1171        impl<A, B, C, Out, F: 'static, $($P: Param + 'static),+>
1172            IntoSplatStep3<A, B, C, Out, ($($P,)+)> for F
1173        where
1174            for<'a> &'a mut F:
1175                FnMut($($P,)+ A, B, C) -> Out +
1176                FnMut($($P::Item<'a>,)+ A, B, C) -> Out,
1177        {
1178            type Step = Step<F, ($($P,)+)>;
1179
1180            fn into_splat_step(self, registry: &Registry) -> Self::Step {
1181                let state = <($($P,)+) as Param>::init(registry);
1182                {
1183                    #[allow(non_snake_case)]
1184                    let ($($P,)+) = &state;
1185                    registry.check_access(&[
1186                        $(
1187                            (<$P as Param>::resource_id($P),
1188                             std::any::type_name::<$P>()),
1189                        )+
1190                    ]);
1191                }
1192                Step { f: self, state, name: std::any::type_name::<F>() }
1193            }
1194        }
1195    };
1196}
1197
1198// -- Splat 4 ------------------------------------------------------------------
1199
1200/// Internal: callable trait for resolved 4-splat steps.
1201#[doc(hidden)]
1202pub trait SplatCall4<A, B, C, D> {
1203    /// Output type of this splat step.
1204    type Out;
1205    fn call_splat(&mut self, world: &mut World, a: A, b: B, c: C, d: D) -> Self::Out;
1206}
1207
1208/// Converts a named function into a resolved 4-splat step.
1209#[doc(hidden)]
1210pub trait IntoSplatStep4<A, B, C, D, Out, Params> {
1211    type Step: SplatCall4<A, B, C, D, Out = Out>;
1212    fn into_splat_step(self, registry: &Registry) -> Self::Step;
1213}
1214
1215impl<A, B, C, D, Out, F: FnMut(A, B, C, D) -> Out + 'static> SplatCall4<A, B, C, D>
1216    for Step<F, ()>
1217{
1218    type Out = Out;
1219    #[inline(always)]
1220    fn call_splat(&mut self, _world: &mut World, a: A, b: B, c: C, d: D) -> Out {
1221        (self.f)(a, b, c, d)
1222    }
1223}
1224
1225impl<A, B, C, D, Out, F: FnMut(A, B, C, D) -> Out + 'static> IntoSplatStep4<A, B, C, D, Out, ()>
1226    for F
1227{
1228    type Step = Step<F, ()>;
1229    fn into_splat_step(self, registry: &Registry) -> Self::Step {
1230        Step {
1231            f: self,
1232            state: <() as Param>::init(registry),
1233            name: std::any::type_name::<F>(),
1234        }
1235    }
1236}
1237
1238macro_rules! impl_splat4_step {
1239    ($($P:ident),+) => {
1240        impl<A, B, C, D, Out, F: 'static, $($P: Param + 'static),+>
1241            SplatCall4<A, B, C, D> for Step<F, ($($P,)+)>
1242        where for<'a> &'a mut F:
1243            FnMut($($P,)+ A, B, C, D) -> Out +
1244            FnMut($($P::Item<'a>,)+ A, B, C, D) -> Out,
1245        {
1246            type Out = Out;
1247            #[inline(always)]
1248            #[allow(non_snake_case)]
1249            fn call_splat(&mut self, world: &mut World, a: A, b: B, c: C, d: D) -> Out {
1250                #[allow(clippy::too_many_arguments)]
1251                fn call_inner<$($P,)+ IA, IB, IC, ID, Output>(
1252                    mut f: impl FnMut($($P,)+ IA, IB, IC, ID) -> Output,
1253                    $($P: $P,)+ a: IA, b: IB, c: IC, d: ID,
1254                ) -> Output { f($($P,)+ a, b, c, d) }
1255                // SAFETY: state was produced by Param::init() on the same Registry
1256                // that built this World. Borrows are disjoint — enforced by
1257                // conflict detection at build time.
1258                #[cfg(debug_assertions)]
1259                world.clear_borrows();
1260                let ($($P,)+) = unsafe {
1261                    <($($P,)+) as Param>::fetch(world, &mut self.state)
1262                };
1263                call_inner(&mut self.f, $($P,)+ a, b, c, d)
1264            }
1265        }
1266        impl<A, B, C, D, Out, F: 'static, $($P: Param + 'static),+>
1267            IntoSplatStep4<A, B, C, D, Out, ($($P,)+)> for F
1268        where for<'a> &'a mut F:
1269            FnMut($($P,)+ A, B, C, D) -> Out +
1270            FnMut($($P::Item<'a>,)+ A, B, C, D) -> Out,
1271        {
1272            type Step = Step<F, ($($P,)+)>;
1273            fn into_splat_step(self, registry: &Registry) -> Self::Step {
1274                let state = <($($P,)+) as Param>::init(registry);
1275                { #[allow(non_snake_case)] let ($($P,)+) = &state;
1276                  registry.check_access(&[$((<$P as Param>::resource_id($P), std::any::type_name::<$P>()),)+]); }
1277                Step { f: self, state, name: std::any::type_name::<F>() }
1278            }
1279        }
1280    };
1281}
1282
1283// -- Splat 5 ------------------------------------------------------------------
1284
1285/// Internal: callable trait for resolved 5-splat steps.
1286#[doc(hidden)]
1287pub trait SplatCall5<A, B, C, D, E> {
1288    /// Output type of this splat step.
1289    type Out;
1290    #[allow(clippy::many_single_char_names)]
1291    fn call_splat(&mut self, world: &mut World, a: A, b: B, c: C, d: D, e: E) -> Self::Out;
1292}
1293
1294/// Converts a named function into a resolved 5-splat step.
1295#[doc(hidden)]
1296pub trait IntoSplatStep5<A, B, C, D, E, Out, Params> {
1297    type Step: SplatCall5<A, B, C, D, E, Out = Out>;
1298    fn into_splat_step(self, registry: &Registry) -> Self::Step;
1299}
1300
1301impl<A, B, C, D, E, Out, F: FnMut(A, B, C, D, E) -> Out + 'static> SplatCall5<A, B, C, D, E>
1302    for Step<F, ()>
1303{
1304    type Out = Out;
1305    #[inline(always)]
1306    #[allow(clippy::many_single_char_names)]
1307    fn call_splat(&mut self, _world: &mut World, a: A, b: B, c: C, d: D, e: E) -> Out {
1308        (self.f)(a, b, c, d, e)
1309    }
1310}
1311
1312impl<A, B, C, D, E, Out, F: FnMut(A, B, C, D, E) -> Out + 'static>
1313    IntoSplatStep5<A, B, C, D, E, Out, ()> for F
1314{
1315    type Step = Step<F, ()>;
1316    fn into_splat_step(self, registry: &Registry) -> Self::Step {
1317        Step {
1318            f: self,
1319            state: <() as Param>::init(registry),
1320            name: std::any::type_name::<F>(),
1321        }
1322    }
1323}
1324
1325macro_rules! impl_splat5_step {
1326    ($($P:ident),+) => {
1327        impl<A, B, C, D, E, Out, F: 'static, $($P: Param + 'static),+>
1328            SplatCall5<A, B, C, D, E> for Step<F, ($($P,)+)>
1329        where for<'a> &'a mut F:
1330            FnMut($($P,)+ A, B, C, D, E) -> Out +
1331            FnMut($($P::Item<'a>,)+ A, B, C, D, E) -> Out,
1332        {
1333            type Out = Out;
1334            #[inline(always)]
1335            #[allow(non_snake_case, clippy::many_single_char_names)]
1336            fn call_splat(&mut self, world: &mut World, a: A, b: B, c: C, d: D, e: E) -> Out {
1337                #[allow(clippy::too_many_arguments)]
1338                fn call_inner<$($P,)+ IA, IB, IC, ID, IE, Output>(
1339                    mut f: impl FnMut($($P,)+ IA, IB, IC, ID, IE) -> Output,
1340                    $($P: $P,)+ a: IA, b: IB, c: IC, d: ID, e: IE,
1341                ) -> Output { f($($P,)+ a, b, c, d, e) }
1342                // SAFETY: state was produced by Param::init() on the same Registry
1343                // that built this World. Borrows are disjoint — enforced by
1344                // conflict detection at build time.
1345                #[cfg(debug_assertions)]
1346                world.clear_borrows();
1347                let ($($P,)+) = unsafe {
1348                    <($($P,)+) as Param>::fetch(world, &mut self.state)
1349                };
1350                call_inner(&mut self.f, $($P,)+ a, b, c, d, e)
1351            }
1352        }
1353        impl<A, B, C, D, E, Out, F: 'static, $($P: Param + 'static),+>
1354            IntoSplatStep5<A, B, C, D, E, Out, ($($P,)+)> for F
1355        where for<'a> &'a mut F:
1356            FnMut($($P,)+ A, B, C, D, E) -> Out +
1357            FnMut($($P::Item<'a>,)+ A, B, C, D, E) -> Out,
1358        {
1359            type Step = Step<F, ($($P,)+)>;
1360            fn into_splat_step(self, registry: &Registry) -> Self::Step {
1361                let state = <($($P,)+) as Param>::init(registry);
1362                { #[allow(non_snake_case)] let ($($P,)+) = &state;
1363                  registry.check_access(&[$((<$P as Param>::resource_id($P), std::any::type_name::<$P>()),)+]); }
1364                Step { f: self, state, name: std::any::type_name::<F>() }
1365            }
1366        }
1367    };
1368}
1369
1370all_tuples!(impl_splat2_step);
1371all_tuples!(impl_splat3_step);
1372all_tuples!(impl_splat4_step);
1373all_tuples!(impl_splat5_step);
1374
1375// =============================================================================
1376// ChainCall — named chain dispatch trait
1377// =============================================================================
1378
1379/// Trait for pipeline chain nodes. Each node transforms input through
1380/// the chain, producing an output. `In` appears only on the trait impl,
1381/// not on the implementing struct — this preserves HRTB compatibility
1382/// so `Pipeline<Chain>` can satisfy `for<'a> Handler<&'a T>`.
1383#[doc(hidden)]
1384pub trait ChainCall<In> {
1385    /// The output type of this chain node.
1386    type Out;
1387    /// Execute the chain on the given input.
1388    fn call(&mut self, world: &mut World, input: In) -> Self::Out;
1389}
1390
1391// =============================================================================
1392// Chain nodes — named types for HRTB-compatible pipeline composition
1393// =============================================================================
1394//
1395// Each combinator gets a named struct following the iterator adapter pattern
1396// (like `Map<I, F>`, `Filter<I, P>`). `In` appears only on the `ChainCall<In>`
1397// trait impl, never on the struct — this is what enables HRTB boxing.
1398
1399/// Identity passthrough node. Used as the initial chain element.
1400#[doc(hidden)]
1401pub struct IdentityNode;
1402
1403impl<In> ChainCall<In> for IdentityNode {
1404    type Out = In;
1405    #[inline(always)]
1406    fn call(&mut self, _world: &mut World, input: In) -> In {
1407        input
1408    }
1409}
1410
1411// -- Core (any Out) ----------------------------------------------------------
1412
1413/// Chain node for `.then()` — transforms output via a resolved step.
1414#[doc(hidden)]
1415pub struct ThenNode<Prev, S> {
1416    pub(crate) prev: Prev,
1417    pub(crate) step: S,
1418}
1419
1420impl<In, Prev, S> ChainCall<In> for ThenNode<Prev, S>
1421where
1422    Prev: ChainCall<In>,
1423    S: StepCall<Prev::Out>,
1424{
1425    type Out = S::Out;
1426    #[inline(always)]
1427    fn call(&mut self, world: &mut World, input: In) -> S::Out {
1428        let mid = self.prev.call(world, input);
1429        self.step.call(world, mid)
1430    }
1431}
1432
1433/// Chain node for `.tap()` — side effect via `&Out`, value passes through.
1434#[doc(hidden)]
1435pub struct TapNode<Prev, S> {
1436    pub(crate) prev: Prev,
1437    pub(crate) step: S,
1438}
1439
1440impl<In, Prev, S> ChainCall<In> for TapNode<Prev, S>
1441where
1442    Prev: ChainCall<In>,
1443    S: RefStepCall<Prev::Out, Out = ()>,
1444{
1445    type Out = Prev::Out;
1446    #[inline(always)]
1447    fn call(&mut self, world: &mut World, input: In) -> Prev::Out {
1448        let val = self.prev.call(world, input);
1449        self.step.call(world, &val);
1450        val
1451    }
1452}
1453
1454/// Chain node for `.guard()` — wraps output in `Option` based on predicate.
1455#[doc(hidden)]
1456pub struct GuardNode<Prev, S> {
1457    pub(crate) prev: Prev,
1458    pub(crate) step: S,
1459}
1460
1461impl<In, Prev, S> ChainCall<In> for GuardNode<Prev, S>
1462where
1463    Prev: ChainCall<In>,
1464    S: RefStepCall<Prev::Out, Out = bool>,
1465{
1466    type Out = Option<Prev::Out>;
1467    #[inline(always)]
1468    fn call(&mut self, world: &mut World, input: In) -> Option<Prev::Out> {
1469        let val = self.prev.call(world, input);
1470        if self.step.call(world, &val) {
1471            Some(val)
1472        } else {
1473            None
1474        }
1475    }
1476}
1477
1478/// Chain node for `.dedup()` — suppresses consecutive unchanged values.
1479#[doc(hidden)]
1480pub struct DedupNode<Prev, T> {
1481    pub(crate) prev: Prev,
1482    pub(crate) last: Option<T>,
1483}
1484
1485impl<In, T: PartialEq + Clone, Prev: ChainCall<In, Out = T>> ChainCall<In> for DedupNode<Prev, T> {
1486    type Out = Option<T>;
1487    #[inline(always)]
1488    fn call(&mut self, world: &mut World, input: In) -> Option<T> {
1489        let val = self.prev.call(world, input);
1490        if self.last.as_ref() == Some(&val) {
1491            None
1492        } else {
1493            self.last = Some(val.clone());
1494            Some(val)
1495        }
1496    }
1497}
1498
1499/// Chain node for `.scan()` — transforms with persistent accumulator.
1500#[doc(hidden)]
1501pub struct ScanNode<Prev, S, Acc> {
1502    pub(crate) prev: Prev,
1503    pub(crate) step: S,
1504    pub(crate) acc: Acc,
1505}
1506
1507impl<In, Prev, S, Acc> ChainCall<In> for ScanNode<Prev, S, Acc>
1508where
1509    Prev: ChainCall<In>,
1510    S: ScanStepCall<Acc, Prev::Out>,
1511{
1512    type Out = S::Out;
1513    #[inline(always)]
1514    fn call(&mut self, world: &mut World, input: In) -> S::Out {
1515        let val = self.prev.call(world, input);
1516        self.step.call(world, &mut self.acc, val)
1517    }
1518}
1519
1520/// Chain node for `.dispatch()` — feeds output to a [`Handler`].
1521#[doc(hidden)]
1522pub struct DispatchNode<Prev, H> {
1523    pub(crate) prev: Prev,
1524    pub(crate) handler: H,
1525}
1526
1527impl<In, Prev, H> ChainCall<In> for DispatchNode<Prev, H>
1528where
1529    Prev: ChainCall<In>,
1530    H: Handler<Prev::Out>,
1531{
1532    type Out = ();
1533    #[inline(always)]
1534    fn call(&mut self, world: &mut World, input: In) {
1535        let out = self.prev.call(world, input);
1536        self.handler.run(world, out);
1537    }
1538}
1539
1540/// Chain node for `.tee()` — runs side-effect chain on `&Out`, passes value through.
1541#[doc(hidden)]
1542pub struct TeeNode<Prev, C> {
1543    pub(crate) prev: Prev,
1544    pub(crate) side: C,
1545}
1546
1547impl<In, Prev, C> ChainCall<In> for TeeNode<Prev, C>
1548where
1549    Prev: ChainCall<In>,
1550    Prev::Out: 'static,
1551    C: for<'a> ChainCall<&'a Prev::Out, Out = ()>,
1552{
1553    type Out = Prev::Out;
1554    #[inline(always)]
1555    fn call(&mut self, world: &mut World, input: In) -> Prev::Out {
1556        let val = self.prev.call(world, input);
1557        self.side.call(world, &val);
1558        val
1559    }
1560}
1561
1562/// Chain node for `.route()` — binary conditional dispatch.
1563#[doc(hidden)]
1564pub struct RouteNode<Prev, P, C0, C1> {
1565    pub(crate) prev: Prev,
1566    pub(crate) pred: P,
1567    pub(crate) on_true: C0,
1568    pub(crate) on_false: C1,
1569}
1570
1571impl<In, Prev, P, C0, C1> ChainCall<In> for RouteNode<Prev, P, C0, C1>
1572where
1573    Prev: ChainCall<In>,
1574    P: RefStepCall<Prev::Out, Out = bool>,
1575    C0: ChainCall<Prev::Out>,
1576    C1: ChainCall<Prev::Out, Out = C0::Out>,
1577{
1578    type Out = C0::Out;
1579    #[inline(always)]
1580    fn call(&mut self, world: &mut World, input: In) -> C0::Out {
1581        let val = self.prev.call(world, input);
1582        if self.pred.call(world, &val) {
1583            self.on_true.call(world, val)
1584        } else {
1585            self.on_false.call(world, val)
1586        }
1587    }
1588}
1589
1590// -- Option<T> nodes ---------------------------------------------------------
1591
1592/// Chain node for `.map()` on `Option<T>`.
1593#[doc(hidden)]
1594pub struct MapOptionNode<Prev, S> {
1595    pub(crate) prev: Prev,
1596    pub(crate) step: S,
1597}
1598
1599impl<In, T, Prev, S> ChainCall<In> for MapOptionNode<Prev, S>
1600where
1601    Prev: ChainCall<In, Out = Option<T>>,
1602    S: StepCall<T>,
1603{
1604    type Out = Option<S::Out>;
1605    #[inline(always)]
1606    fn call(&mut self, world: &mut World, input: In) -> Option<S::Out> {
1607        self.prev
1608            .call(world, input)
1609            .map(|val| self.step.call(world, val))
1610    }
1611}
1612
1613/// Chain node for `.filter()` on `Option<T>`.
1614#[doc(hidden)]
1615pub struct FilterNode<Prev, S> {
1616    pub(crate) prev: Prev,
1617    pub(crate) step: S,
1618}
1619
1620impl<In, T, Prev, S> ChainCall<In> for FilterNode<Prev, S>
1621where
1622    Prev: ChainCall<In, Out = Option<T>>,
1623    S: RefStepCall<T, Out = bool>,
1624{
1625    type Out = Option<T>;
1626    #[inline(always)]
1627    fn call(&mut self, world: &mut World, input: In) -> Option<T> {
1628        self.prev
1629            .call(world, input)
1630            .filter(|val| self.step.call(world, val))
1631    }
1632}
1633
1634/// Chain node for `.inspect()` on `Option<T>`.
1635#[doc(hidden)]
1636pub struct InspectOptionNode<Prev, S> {
1637    pub(crate) prev: Prev,
1638    pub(crate) step: S,
1639}
1640
1641impl<In, T, Prev, S> ChainCall<In> for InspectOptionNode<Prev, S>
1642where
1643    Prev: ChainCall<In, Out = Option<T>>,
1644    S: RefStepCall<T, Out = ()>,
1645{
1646    type Out = Option<T>;
1647    #[inline(always)]
1648    fn call(&mut self, world: &mut World, input: In) -> Option<T> {
1649        self.prev
1650            .call(world, input)
1651            .inspect(|val| self.step.call(world, val))
1652    }
1653}
1654
1655/// Chain node for `.and_then()` on `Option<T>`.
1656#[doc(hidden)]
1657pub struct AndThenNode<Prev, S> {
1658    pub(crate) prev: Prev,
1659    pub(crate) step: S,
1660}
1661
1662impl<In, T, U, Prev, S> ChainCall<In> for AndThenNode<Prev, S>
1663where
1664    Prev: ChainCall<In, Out = Option<T>>,
1665    S: StepCall<T, Out = Option<U>>,
1666{
1667    type Out = Option<U>;
1668    #[inline(always)]
1669    fn call(&mut self, world: &mut World, input: In) -> Option<U> {
1670        self.prev
1671            .call(world, input)
1672            .and_then(|val| self.step.call(world, val))
1673    }
1674}
1675
1676/// Chain node for `.on_none()` — side effect when `None`.
1677#[doc(hidden)]
1678pub struct OnNoneNode<Prev, P> {
1679    pub(crate) prev: Prev,
1680    pub(crate) producer: P,
1681}
1682
1683impl<In, T, Prev, P> ChainCall<In> for OnNoneNode<Prev, P>
1684where
1685    Prev: ChainCall<In, Out = Option<T>>,
1686    P: ProducerCall<Out = ()>,
1687{
1688    type Out = Option<T>;
1689    #[inline(always)]
1690    fn call(&mut self, world: &mut World, input: In) -> Option<T> {
1691        let result = self.prev.call(world, input);
1692        if result.is_none() {
1693            self.producer.call(world);
1694        }
1695        result
1696    }
1697}
1698
1699/// Chain node for `.ok_or()` — `Option<T>` → `Result<T, E>`.
1700#[doc(hidden)]
1701pub struct OkOrNode<Prev, E> {
1702    pub(crate) prev: Prev,
1703    pub(crate) err: E,
1704}
1705
1706impl<In, T, E: Clone, Prev: ChainCall<In, Out = Option<T>>> ChainCall<In> for OkOrNode<Prev, E> {
1707    type Out = Result<T, E>;
1708    #[inline(always)]
1709    fn call(&mut self, world: &mut World, input: In) -> Result<T, E> {
1710        self.prev.call(world, input).ok_or_else(|| self.err.clone())
1711    }
1712}
1713
1714/// Chain node for `.ok_or_else()` — `Option<T>` → `Result<T, E>`.
1715#[doc(hidden)]
1716pub struct OkOrElseNode<Prev, P> {
1717    pub(crate) prev: Prev,
1718    pub(crate) producer: P,
1719}
1720
1721impl<In, T, E, Prev, P> ChainCall<In> for OkOrElseNode<Prev, P>
1722where
1723    Prev: ChainCall<In, Out = Option<T>>,
1724    P: ProducerCall<Out = E>,
1725{
1726    type Out = Result<T, E>;
1727    #[inline(always)]
1728    fn call(&mut self, world: &mut World, input: In) -> Result<T, E> {
1729        self.prev
1730            .call(world, input)
1731            .ok_or_else(|| self.producer.call(world))
1732    }
1733}
1734
1735/// Chain node for `.unwrap_or()` on `Option<T>`.
1736#[doc(hidden)]
1737pub struct UnwrapOrOptionNode<Prev, T> {
1738    pub(crate) prev: Prev,
1739    pub(crate) default: T,
1740}
1741
1742impl<In, T: Clone, Prev: ChainCall<In, Out = Option<T>>> ChainCall<In>
1743    for UnwrapOrOptionNode<Prev, T>
1744{
1745    type Out = T;
1746    #[inline(always)]
1747    fn call(&mut self, world: &mut World, input: In) -> T {
1748        self.prev
1749            .call(world, input)
1750            .unwrap_or_else(|| self.default.clone())
1751    }
1752}
1753
1754/// Chain node for `.unwrap_or_else()` on `Option<T>`.
1755#[doc(hidden)]
1756pub struct UnwrapOrElseOptionNode<Prev, P> {
1757    pub(crate) prev: Prev,
1758    pub(crate) producer: P,
1759}
1760
1761impl<In, T, Prev, P> ChainCall<In> for UnwrapOrElseOptionNode<Prev, P>
1762where
1763    Prev: ChainCall<In, Out = Option<T>>,
1764    P: ProducerCall<Out = T>,
1765{
1766    type Out = T;
1767    #[inline(always)]
1768    fn call(&mut self, world: &mut World, input: In) -> T {
1769        self.prev
1770            .call(world, input)
1771            .unwrap_or_else(|| self.producer.call(world))
1772    }
1773}
1774
1775// -- Result<T, E> nodes ------------------------------------------------------
1776
1777/// Chain node for `.map()` on `Result<T, E>`.
1778#[doc(hidden)]
1779pub struct MapResultNode<Prev, S> {
1780    pub(crate) prev: Prev,
1781    pub(crate) step: S,
1782}
1783
1784impl<In, T, E, Prev, S> ChainCall<In> for MapResultNode<Prev, S>
1785where
1786    Prev: ChainCall<In, Out = Result<T, E>>,
1787    S: StepCall<T>,
1788{
1789    type Out = Result<S::Out, E>;
1790    #[inline(always)]
1791    fn call(&mut self, world: &mut World, input: In) -> Result<S::Out, E> {
1792        self.prev
1793            .call(world, input)
1794            .map(|val| self.step.call(world, val))
1795    }
1796}
1797
1798/// Chain node for `.and_then()` on `Result<T, E>`.
1799#[doc(hidden)]
1800pub struct AndThenResultNode<Prev, S> {
1801    pub(crate) prev: Prev,
1802    pub(crate) step: S,
1803}
1804
1805impl<In, T, U, E, Prev, S> ChainCall<In> for AndThenResultNode<Prev, S>
1806where
1807    Prev: ChainCall<In, Out = Result<T, E>>,
1808    S: StepCall<T, Out = Result<U, E>>,
1809{
1810    type Out = Result<U, E>;
1811    #[inline(always)]
1812    fn call(&mut self, world: &mut World, input: In) -> Result<U, E> {
1813        self.prev
1814            .call(world, input)
1815            .and_then(|val| self.step.call(world, val))
1816    }
1817}
1818
1819/// Chain node for `.catch()` — `Result<T, E>` → `Option<T>`.
1820#[doc(hidden)]
1821pub struct CatchNode<Prev, S> {
1822    pub(crate) prev: Prev,
1823    pub(crate) step: S,
1824}
1825
1826impl<In, T, E, Prev, S> ChainCall<In> for CatchNode<Prev, S>
1827where
1828    Prev: ChainCall<In, Out = Result<T, E>>,
1829    S: StepCall<E, Out = ()>,
1830{
1831    type Out = Option<T>;
1832    #[inline(always)]
1833    fn call(&mut self, world: &mut World, input: In) -> Option<T> {
1834        match self.prev.call(world, input) {
1835            Ok(val) => Some(val),
1836            Err(err) => {
1837                self.step.call(world, err);
1838                None
1839            }
1840        }
1841    }
1842}
1843
1844/// Chain node for `.map_err()`.
1845#[doc(hidden)]
1846pub struct MapErrNode<Prev, S> {
1847    pub(crate) prev: Prev,
1848    pub(crate) step: S,
1849}
1850
1851impl<In, T, E, Prev, S> ChainCall<In> for MapErrNode<Prev, S>
1852where
1853    Prev: ChainCall<In, Out = Result<T, E>>,
1854    S: StepCall<E>,
1855{
1856    type Out = Result<T, S::Out>;
1857    #[inline(always)]
1858    fn call(&mut self, world: &mut World, input: In) -> Result<T, S::Out> {
1859        self.prev
1860            .call(world, input)
1861            .map_err(|err| self.step.call(world, err))
1862    }
1863}
1864
1865/// Chain node for `.or_else()`.
1866#[doc(hidden)]
1867pub struct OrElseNode<Prev, S> {
1868    pub(crate) prev: Prev,
1869    pub(crate) step: S,
1870}
1871
1872impl<In, T, E, E2, Prev, S> ChainCall<In> for OrElseNode<Prev, S>
1873where
1874    Prev: ChainCall<In, Out = Result<T, E>>,
1875    S: StepCall<E, Out = Result<T, E2>>,
1876{
1877    type Out = Result<T, E2>;
1878    #[inline(always)]
1879    fn call(&mut self, world: &mut World, input: In) -> Result<T, E2> {
1880        self.prev
1881            .call(world, input)
1882            .or_else(|err| self.step.call(world, err))
1883    }
1884}
1885
1886/// Chain node for `.inspect()` on `Result<T, E>`.
1887#[doc(hidden)]
1888pub struct InspectResultNode<Prev, S> {
1889    pub(crate) prev: Prev,
1890    pub(crate) step: S,
1891}
1892
1893impl<In, T, E, Prev, S> ChainCall<In> for InspectResultNode<Prev, S>
1894where
1895    Prev: ChainCall<In, Out = Result<T, E>>,
1896    S: RefStepCall<T, Out = ()>,
1897{
1898    type Out = Result<T, E>;
1899    #[inline(always)]
1900    fn call(&mut self, world: &mut World, input: In) -> Result<T, E> {
1901        self.prev
1902            .call(world, input)
1903            .inspect(|val| self.step.call(world, val))
1904    }
1905}
1906
1907/// Chain node for `.inspect_err()`.
1908#[doc(hidden)]
1909pub struct InspectErrNode<Prev, S> {
1910    pub(crate) prev: Prev,
1911    pub(crate) step: S,
1912}
1913
1914impl<In, T, E, Prev, S> ChainCall<In> for InspectErrNode<Prev, S>
1915where
1916    Prev: ChainCall<In, Out = Result<T, E>>,
1917    S: RefStepCall<E, Out = ()>,
1918{
1919    type Out = Result<T, E>;
1920    #[inline(always)]
1921    fn call(&mut self, world: &mut World, input: In) -> Result<T, E> {
1922        self.prev
1923            .call(world, input)
1924            .inspect_err(|err| self.step.call(world, err))
1925    }
1926}
1927
1928/// Chain node for `.ok()` — `Result<T, E>` → `Option<T>`.
1929#[doc(hidden)]
1930pub struct OkResultNode<Prev> {
1931    pub(crate) prev: Prev,
1932}
1933
1934impl<In, T, E, Prev: ChainCall<In, Out = Result<T, E>>> ChainCall<In> for OkResultNode<Prev> {
1935    type Out = Option<T>;
1936    #[inline(always)]
1937    fn call(&mut self, world: &mut World, input: In) -> Option<T> {
1938        self.prev.call(world, input).ok()
1939    }
1940}
1941
1942/// Chain node for `.unwrap_or()` on `Result<T, E>`.
1943#[doc(hidden)]
1944pub struct UnwrapOrResultNode<Prev, T> {
1945    pub(crate) prev: Prev,
1946    pub(crate) default: T,
1947}
1948
1949impl<In, T: Clone, E, Prev: ChainCall<In, Out = Result<T, E>>> ChainCall<In>
1950    for UnwrapOrResultNode<Prev, T>
1951{
1952    type Out = T;
1953    #[inline(always)]
1954    fn call(&mut self, world: &mut World, input: In) -> T {
1955        self.prev
1956            .call(world, input)
1957            .unwrap_or_else(|_| self.default.clone())
1958    }
1959}
1960
1961/// Chain node for `.unwrap_or_else()` on `Result<T, E>`.
1962#[doc(hidden)]
1963pub struct UnwrapOrElseResultNode<Prev, S> {
1964    pub(crate) prev: Prev,
1965    pub(crate) step: S,
1966}
1967
1968impl<In, T, E, Prev, S> ChainCall<In> for UnwrapOrElseResultNode<Prev, S>
1969where
1970    Prev: ChainCall<In, Out = Result<T, E>>,
1971    S: StepCall<E, Out = T>,
1972{
1973    type Out = T;
1974    #[inline(always)]
1975    fn call(&mut self, world: &mut World, input: In) -> T {
1976        match self.prev.call(world, input) {
1977            Ok(val) => val,
1978            Err(err) => self.step.call(world, err),
1979        }
1980    }
1981}
1982
1983// -- Bool nodes --------------------------------------------------------------
1984
1985/// Chain node for `.not()`.
1986#[doc(hidden)]
1987pub struct NotNode<Prev> {
1988    pub(crate) prev: Prev,
1989}
1990
1991impl<In, Prev: ChainCall<In, Out = bool>> ChainCall<In> for NotNode<Prev> {
1992    type Out = bool;
1993    #[inline(always)]
1994    fn call(&mut self, world: &mut World, input: In) -> bool {
1995        !self.prev.call(world, input)
1996    }
1997}
1998
1999/// Chain node for `.and()` on bool.
2000#[doc(hidden)]
2001pub struct AndBoolNode<Prev, P> {
2002    pub(crate) prev: Prev,
2003    pub(crate) producer: P,
2004}
2005
2006impl<In, Prev, P> ChainCall<In> for AndBoolNode<Prev, P>
2007where
2008    Prev: ChainCall<In, Out = bool>,
2009    P: ProducerCall<Out = bool>,
2010{
2011    type Out = bool;
2012    #[inline(always)]
2013    fn call(&mut self, world: &mut World, input: In) -> bool {
2014        self.prev.call(world, input) && self.producer.call(world)
2015    }
2016}
2017
2018/// Chain node for `.or()` on bool.
2019#[doc(hidden)]
2020pub struct OrBoolNode<Prev, P> {
2021    pub(crate) prev: Prev,
2022    pub(crate) producer: P,
2023}
2024
2025impl<In, Prev, P> ChainCall<In> for OrBoolNode<Prev, P>
2026where
2027    Prev: ChainCall<In, Out = bool>,
2028    P: ProducerCall<Out = bool>,
2029{
2030    type Out = bool;
2031    #[inline(always)]
2032    fn call(&mut self, world: &mut World, input: In) -> bool {
2033        self.prev.call(world, input) || self.producer.call(world)
2034    }
2035}
2036
2037/// Chain node for `.xor()` on bool.
2038#[doc(hidden)]
2039pub struct XorBoolNode<Prev, P> {
2040    pub(crate) prev: Prev,
2041    pub(crate) producer: P,
2042}
2043
2044impl<In, Prev, P> ChainCall<In> for XorBoolNode<Prev, P>
2045where
2046    Prev: ChainCall<In, Out = bool>,
2047    P: ProducerCall<Out = bool>,
2048{
2049    type Out = bool;
2050    #[inline(always)]
2051    fn call(&mut self, world: &mut World, input: In) -> bool {
2052        self.prev.call(world, input) ^ self.producer.call(world)
2053    }
2054}
2055
2056// -- Cloned nodes ------------------------------------------------------------
2057
2058/// Chain node for `.cloned()` on `&T`.
2059#[doc(hidden)]
2060pub struct ClonedNode<Prev> {
2061    pub(crate) prev: Prev,
2062}
2063
2064impl<'a, In, T: Clone + 'a, Prev: ChainCall<In, Out = &'a T>> ChainCall<In> for ClonedNode<Prev> {
2065    type Out = T;
2066    #[inline(always)]
2067    fn call(&mut self, world: &mut World, input: In) -> T {
2068        T::clone(self.prev.call(world, input))
2069    }
2070}
2071
2072/// Chain node for `.cloned()` on `Option<&T>`.
2073#[doc(hidden)]
2074pub struct ClonedOptionNode<Prev> {
2075    pub(crate) prev: Prev,
2076}
2077
2078impl<'a, In, T: Clone + 'a, Prev: ChainCall<In, Out = Option<&'a T>>> ChainCall<In>
2079    for ClonedOptionNode<Prev>
2080{
2081    type Out = Option<T>;
2082    #[inline(always)]
2083    fn call(&mut self, world: &mut World, input: In) -> Option<T> {
2084        self.prev.call(world, input).cloned()
2085    }
2086}
2087
2088/// Chain node for `.cloned()` on `Result<&T, E>`.
2089#[doc(hidden)]
2090pub struct ClonedResultNode<Prev> {
2091    pub(crate) prev: Prev,
2092}
2093
2094impl<'a, In, T: Clone + 'a, E, Prev: ChainCall<In, Out = Result<&'a T, E>>> ChainCall<In>
2095    for ClonedResultNode<Prev>
2096{
2097    type Out = Result<T, E>;
2098    #[inline(always)]
2099    fn call(&mut self, world: &mut World, input: In) -> Result<T, E> {
2100        self.prev.call(world, input).cloned()
2101    }
2102}
2103
2104// -- DAG-specific nodes (borrow intermediate) --------------------------------
2105//
2106// DAG steps borrow `&Prev::Out` instead of consuming by value. These nodes
2107// parallel the pipeline nodes above but use `for<'a> StepCall<&'a T>` bounds
2108// and HRTB arm chains (`for<'a> ChainCall<&'a T>`).
2109
2110/// Chain node for DAG `.then()` — borrows intermediate output for next step.
2111///
2112/// Unlike pipeline's [`ThenNode`] which passes by value, this borrows `&Prev::Out`
2113/// for the step. Used for DAG chains where intermediates are owned and borrowed.
2114#[doc(hidden)]
2115pub struct DagThenNode<Prev, S, NewOut> {
2116    pub(crate) prev: Prev,
2117    pub(crate) step: S,
2118    pub(crate) _out: PhantomData<fn() -> NewOut>,
2119}
2120
2121impl<In, Prev, S, NewOut: 'static> ChainCall<In> for DagThenNode<Prev, S, NewOut>
2122where
2123    Prev: ChainCall<In>,
2124    Prev::Out: 'static,
2125    S: for<'a> StepCall<&'a Prev::Out, Out = NewOut>,
2126{
2127    type Out = NewOut;
2128    #[inline(always)]
2129    fn call(&mut self, world: &mut World, input: In) -> NewOut {
2130        let out = self.prev.call(world, input);
2131        self.step.call(world, &out)
2132    }
2133}
2134
2135/// Chain node for DAG `.scan()` — scan with borrowed input.
2136///
2137/// Like [`ScanNode`] but the step receives `&Prev::Out` via [`RefScanStepCall`].
2138#[doc(hidden)]
2139pub struct RefScanNode<Prev, S, Acc> {
2140    pub(crate) prev: Prev,
2141    pub(crate) step: S,
2142    pub(crate) acc: Acc,
2143}
2144
2145impl<In, Prev, S, Acc> ChainCall<In> for RefScanNode<Prev, S, Acc>
2146where
2147    Prev: ChainCall<In>,
2148    S: RefScanStepCall<Acc, Prev::Out>,
2149{
2150    type Out = S::Out;
2151    #[inline(always)]
2152    fn call(&mut self, world: &mut World, input: In) -> S::Out {
2153        let val = self.prev.call(world, input);
2154        self.step.call(world, &mut self.acc, &val)
2155    }
2156}
2157
2158/// Chain node for DAG `.route()` — arms borrow `&Prev::Out` (HRTB).
2159///
2160/// Unlike pipeline's [`RouteNode`] which passes by value, this borrows
2161/// the value for the predicate and arms. Arms satisfy `for<'a> ChainCall<&'a Out>`.
2162#[doc(hidden)]
2163pub struct DagRouteNode<Prev, P, C0, C1, NewOut> {
2164    pub(crate) prev: Prev,
2165    pub(crate) pred: P,
2166    pub(crate) on_true: C0,
2167    pub(crate) on_false: C1,
2168    pub(crate) _out: PhantomData<fn() -> NewOut>,
2169}
2170
2171impl<In, Prev, P, C0, C1, NewOut> ChainCall<In> for DagRouteNode<Prev, P, C0, C1, NewOut>
2172where
2173    Prev: ChainCall<In>,
2174    Prev::Out: 'static,
2175    P: RefStepCall<Prev::Out, Out = bool>,
2176    C0: for<'a> ChainCall<&'a Prev::Out, Out = NewOut>,
2177    C1: for<'a> ChainCall<&'a Prev::Out, Out = NewOut>,
2178{
2179    type Out = NewOut;
2180    #[inline(always)]
2181    fn call(&mut self, world: &mut World, input: In) -> NewOut {
2182        let val = self.prev.call(world, input);
2183        if self.pred.call(world, &val) {
2184            self.on_true.call(world, &val)
2185        } else {
2186            self.on_false.call(world, &val)
2187        }
2188    }
2189}
2190
2191/// Chain node for DAG `.map()` on `Option<T>` — step borrows `&T`.
2192///
2193/// Like [`MapOptionNode`] but the step receives `&T` instead of `T` by value.
2194#[doc(hidden)]
2195pub struct DagMapOptionNode<Prev, S, U> {
2196    pub(crate) prev: Prev,
2197    pub(crate) step: S,
2198    pub(crate) _out: PhantomData<fn() -> U>,
2199}
2200
2201impl<In, T: 'static, U: 'static, Prev, S> ChainCall<In> for DagMapOptionNode<Prev, S, U>
2202where
2203    Prev: ChainCall<In, Out = Option<T>>,
2204    S: for<'a> StepCall<&'a T, Out = U>,
2205{
2206    type Out = Option<U>;
2207    #[inline(always)]
2208    fn call(&mut self, world: &mut World, input: In) -> Option<U> {
2209        self.prev
2210            .call(world, input)
2211            .map(|ref val| self.step.call(world, val))
2212    }
2213}
2214
2215/// Chain node for DAG `.and_then()` on `Option<T>` — step borrows `&T`.
2216#[doc(hidden)]
2217pub struct DagAndThenOptionNode<Prev, S, U> {
2218    pub(crate) prev: Prev,
2219    pub(crate) step: S,
2220    pub(crate) _out: PhantomData<fn() -> U>,
2221}
2222
2223impl<In, T: 'static, U: 'static, Prev, S> ChainCall<In> for DagAndThenOptionNode<Prev, S, U>
2224where
2225    Prev: ChainCall<In, Out = Option<T>>,
2226    S: for<'a> StepCall<&'a T, Out = Option<U>>,
2227{
2228    type Out = Option<U>;
2229    #[inline(always)]
2230    fn call(&mut self, world: &mut World, input: In) -> Option<U> {
2231        self.prev
2232            .call(world, input)
2233            .and_then(|ref val| self.step.call(world, val))
2234    }
2235}
2236
2237/// Chain node for DAG `.map()` on `Result<T, E>` — step borrows `&T`.
2238#[doc(hidden)]
2239pub struct DagMapResultNode<Prev, S, U> {
2240    pub(crate) prev: Prev,
2241    pub(crate) step: S,
2242    pub(crate) _out: PhantomData<fn() -> U>,
2243}
2244
2245impl<In, T: 'static, E, U: 'static, Prev, S> ChainCall<In> for DagMapResultNode<Prev, S, U>
2246where
2247    Prev: ChainCall<In, Out = Result<T, E>>,
2248    S: for<'a> StepCall<&'a T, Out = U>,
2249{
2250    type Out = Result<U, E>;
2251    #[inline(always)]
2252    fn call(&mut self, world: &mut World, input: In) -> Result<U, E> {
2253        self.prev
2254            .call(world, input)
2255            .map(|ref val| self.step.call(world, val))
2256    }
2257}
2258
2259/// Chain node for DAG `.and_then()` on `Result<T, E>` — step borrows `&T`.
2260#[doc(hidden)]
2261pub struct DagAndThenResultNode<Prev, S, U> {
2262    pub(crate) prev: Prev,
2263    pub(crate) step: S,
2264    pub(crate) _out: PhantomData<fn() -> U>,
2265}
2266
2267impl<In, T: 'static, E, U: 'static, Prev, S> ChainCall<In> for DagAndThenResultNode<Prev, S, U>
2268where
2269    Prev: ChainCall<In, Out = Result<T, E>>,
2270    S: for<'a> StepCall<&'a T, Out = Result<U, E>>,
2271{
2272    type Out = Result<U, E>;
2273    #[inline(always)]
2274    fn call(&mut self, world: &mut World, input: In) -> Result<U, E> {
2275        self.prev
2276            .call(world, input)
2277            .and_then(|ref val| self.step.call(world, val))
2278    }
2279}
2280
2281/// Chain node for DAG `.catch()` on `Result<T, E>` — error handler borrows `&E`.
2282///
2283/// Like [`CatchNode`] but the step receives `&E` instead of consuming `E`.
2284#[doc(hidden)]
2285pub struct DagCatchNode<Prev, S> {
2286    pub(crate) prev: Prev,
2287    pub(crate) step: S,
2288}
2289
2290impl<In, T, E: 'static, Prev, S> ChainCall<In> for DagCatchNode<Prev, S>
2291where
2292    Prev: ChainCall<In, Out = Result<T, E>>,
2293    S: for<'a> StepCall<&'a E, Out = ()>,
2294{
2295    type Out = Option<T>;
2296    #[inline(always)]
2297    fn call(&mut self, world: &mut World, input: In) -> Option<T> {
2298        match self.prev.call(world, input) {
2299            Ok(val) => Some(val),
2300            Err(ref err) => {
2301                self.step.call(world, err);
2302                None
2303            }
2304        }
2305    }
2306}
2307
2308// -- Terminal nodes ----------------------------------------------------------
2309
2310/// Chain node for `build()` on `Option<()>` — discards the option wrapper.
2311#[doc(hidden)]
2312pub struct DiscardOptionNode<Prev> {
2313    pub(crate) prev: Prev,
2314}
2315
2316impl<In, Prev: ChainCall<In, Out = Option<()>>> ChainCall<In> for DiscardOptionNode<Prev> {
2317    type Out = ();
2318    #[inline(always)]
2319    fn call(&mut self, world: &mut World, input: In) {
2320        let _ = self.prev.call(world, input);
2321    }
2322}
2323
2324// =============================================================================
2325// PipelineBuilder — entry point
2326// =============================================================================
2327
2328/// Entry point for building a pre-resolved step pipeline.
2329///
2330/// `In` is the pipeline input type. Call [`.then()`](Self::then) to add
2331/// the first step — a named function whose [`Param`] dependencies
2332/// are resolved from the registry at build time.
2333///
2334/// # Examples
2335///
2336/// ```
2337/// use nexus_rt::{WorldBuilder, Res, ResMut, PipelineBuilder, Handler, Resource};
2338///
2339/// #[derive(Resource)]
2340/// struct Factor(u64);
2341/// #[derive(Resource)]
2342/// struct Output(String);
2343///
2344/// let mut wb = WorldBuilder::new();
2345/// wb.register(Factor(10));
2346/// wb.register(Output(String::new()));
2347/// let mut world = wb.build();
2348///
2349/// fn double(factor: Res<Factor>, x: u32) -> u64 {
2350///     factor.0 * x as u64
2351/// }
2352/// fn store(mut out: ResMut<Output>, val: u64) {
2353///     out.0 = val.to_string();
2354/// }
2355///
2356/// let r = world.registry();
2357/// let mut pipeline = PipelineBuilder::<u32>::new()
2358///     .then(double, r)
2359///     .then(store, r)
2360///     .build();
2361///
2362/// pipeline.run(&mut world, 5);
2363/// assert_eq!(world.resource::<Output>().0.as_str(), "50");
2364/// ```
2365#[must_use = "a pipeline builder does nothing unless you chain steps and call .build()"]
2366pub struct PipelineBuilder<In>(PhantomData<fn(In)>);
2367
2368impl<In> PipelineBuilder<In> {
2369    /// Create a new step pipeline entry point.
2370    pub fn new() -> Self {
2371        Self(PhantomData)
2372    }
2373
2374    /// Add the first step. Params resolved from the registry.
2375    pub fn then<Out, Params, S: IntoStep<In, Out, Params>>(
2376        self,
2377        f: S,
2378        registry: &Registry,
2379    ) -> PipelineChain<In, Out, ThenNode<IdentityNode, S::Step>> {
2380        PipelineChain {
2381            chain: ThenNode {
2382                prev: IdentityNode,
2383                step: f.into_step(registry),
2384            },
2385            _marker: PhantomData,
2386        }
2387    }
2388
2389    /// Add the first step as a scan with persistent accumulator.
2390    /// The step receives `&mut Acc` and the input, returning the output.
2391    /// State persists across invocations.
2392    pub fn scan<Acc, Out, Params, S>(
2393        self,
2394        initial: Acc,
2395        f: S,
2396        registry: &Registry,
2397    ) -> PipelineChain<In, Out, ScanNode<IdentityNode, S::Step, Acc>>
2398    where
2399        Acc: 'static,
2400        S: IntoScanStep<Acc, In, Out, Params>,
2401    {
2402        PipelineChain {
2403            chain: ScanNode {
2404                prev: IdentityNode,
2405                step: f.into_scan_step(registry),
2406                acc: initial,
2407            },
2408            _marker: PhantomData,
2409        }
2410    }
2411}
2412
2413impl<In> Default for PipelineBuilder<In> {
2414    fn default() -> Self {
2415        Self::new()
2416    }
2417}
2418
2419// =============================================================================
2420// PipelineChain — typestate builder
2421// =============================================================================
2422
2423/// Builder that composes pre-resolved pipeline steps via named chain nodes.
2424///
2425/// `In` is the pipeline's input type (fixed). `Out` is the current output.
2426/// `Chain` is the concrete chain type (nested named nodes, like iterator adapters).
2427///
2428/// Each combinator consumes `self`, wraps the previous chain in a new named
2429/// node, and returns a new `PipelineChain`. The compiler monomorphizes the
2430/// entire chain — zero virtual dispatch through steps. Named types (not
2431/// closures) preserve HRTB: `Pipeline<Chain>` can satisfy `for<'a> Handler<&'a T>`.
2432///
2433/// IntoStep-based methods (`.then()`, `.map()`, `.and_then()`, `.catch()`)
2434/// take `&Registry` to resolve Param state at build time. Closure-based
2435/// methods don't need the registry.
2436#[must_use = "pipeline chain does nothing until .build() is called"]
2437pub struct PipelineChain<In, Out, Chain> {
2438    pub(crate) chain: Chain,
2439    pub(crate) _marker: PhantomData<fn(In) -> Out>,
2440}
2441
2442// =============================================================================
2443// Core — any Out
2444// =============================================================================
2445
2446impl<In, Out, Chain: ChainCall<In, Out = Out>> PipelineChain<In, Out, Chain> {
2447    /// Add a step. Params resolved from the registry.
2448    pub fn then<NewOut, Params, S: IntoStep<Out, NewOut, Params>>(
2449        self,
2450        f: S,
2451        registry: &Registry,
2452    ) -> PipelineChain<In, NewOut, ThenNode<Chain, S::Step>> {
2453        PipelineChain {
2454            chain: ThenNode {
2455                prev: self.chain,
2456                step: f.into_step(registry),
2457            },
2458            _marker: PhantomData,
2459        }
2460    }
2461
2462    /// Run the pipeline directly. No boxing, no `'static` on `In`.
2463    pub fn run(&mut self, world: &mut World, input: In) -> Out {
2464        self.chain.call(world, input)
2465    }
2466
2467    /// Dispatch pipeline output to a [`Handler<Out>`].
2468    ///
2469    /// Connects a pipeline's output to any handler — [`HandlerFn`](crate::HandlerFn),
2470    /// [`Callback`](crate::Callback), [`Pipeline`], or a combinator like
2471    /// [`fan_out!`](crate::fan_out).
2472    pub fn dispatch<H: Handler<Out>>(
2473        self,
2474        handler: H,
2475    ) -> PipelineChain<In, (), DispatchNode<Chain, H>> {
2476        PipelineChain {
2477            chain: DispatchNode {
2478                prev: self.chain,
2479                handler,
2480            },
2481            _marker: PhantomData,
2482        }
2483    }
2484
2485    /// Conditionally wrap the output in `Option`. `Some(val)` if
2486    /// the predicate returns true, `None` otherwise.
2487    ///
2488    /// Enters Option-combinator land — follow with `.map()`,
2489    /// `.and_then()`, `.filter()`, `.unwrap_or()`, etc.
2490    ///
2491    /// # Common Mistakes
2492    ///
2493    /// Guard takes `&In`, not `In` — the value passes through unchanged.
2494    ///
2495    /// ```compile_fail
2496    /// # use nexus_rt::{PipelineBuilder, WorldBuilder};
2497    /// # let mut wb = WorldBuilder::new();
2498    /// # let world = wb.build();
2499    /// # let reg = world.registry();
2500    /// // ERROR: takes u32 by value, should be &u32
2501    /// PipelineBuilder::<u32>::new()
2502    ///     .then(|x: u32| x, &reg)
2503    ///     .guard(|x: u32| x > 10, &reg);
2504    /// ```
2505    ///
2506    /// Fix: take by reference:
2507    /// ```ignore
2508    /// PipelineBuilder::<u32>::new()
2509    ///     .then(|x: u32| x, &reg)
2510    ///     .guard(|x: &u32| *x > 10, &reg);
2511    /// ```
2512    pub fn guard<Params, S: IntoRefStep<Out, bool, Params>>(
2513        self,
2514        f: S,
2515        registry: &Registry,
2516    ) -> PipelineChain<In, Option<Out>, GuardNode<Chain, S::Step>> {
2517        PipelineChain {
2518            chain: GuardNode {
2519                prev: self.chain,
2520                step: f.into_ref_step(registry),
2521            },
2522            _marker: PhantomData,
2523        }
2524    }
2525
2526    /// Observe the current value without consuming or changing it.
2527    ///
2528    /// The step receives `&Out`. The value passes through unchanged.
2529    /// Useful for logging, metrics, or debugging mid-chain.
2530    ///
2531    /// # Common Mistakes
2532    ///
2533    /// Tap takes `&In`, not `In`:
2534    /// ```compile_fail
2535    /// # use nexus_rt::{PipelineBuilder, WorldBuilder};
2536    /// # let mut wb = WorldBuilder::new();
2537    /// # let world = wb.build();
2538    /// # let reg = world.registry();
2539    /// // ERROR: takes u32 by value
2540    /// PipelineBuilder::<u32>::new()
2541    ///     .then(|x: u32| x, &reg)
2542    ///     .tap(|x: u32| println!("{x}"), &reg);
2543    /// ```
2544    pub fn tap<Params, S: IntoRefStep<Out, (), Params>>(
2545        self,
2546        f: S,
2547        registry: &Registry,
2548    ) -> PipelineChain<In, Out, TapNode<Chain, S::Step>> {
2549        PipelineChain {
2550            chain: TapNode {
2551                prev: self.chain,
2552                step: f.into_ref_step(registry),
2553            },
2554            _marker: PhantomData,
2555        }
2556    }
2557
2558    /// Binary conditional routing. Evaluates the predicate on the
2559    /// current value, then moves it into exactly one of two arms.
2560    ///
2561    /// Both arms must produce the same output type. Build each arm as
2562    /// a sub-pipeline from [`PipelineBuilder`]. For N-ary routing, nest
2563    /// `route` calls in the false arm.
2564    ///
2565    /// ```ignore
2566    /// let large = PipelineBuilder::new().then(large_check, reg).then(submit, reg);
2567    /// let small = PipelineBuilder::new().then(submit, reg);
2568    ///
2569    /// PipelineBuilder::<Order>::new()
2570    ///     .then(validate, reg)
2571    ///     .route(|order: &Order| order.size > 1000, reg, large, small)
2572    ///     .build();
2573    /// ```
2574    pub fn route<NewOut, C0, C1, Params, Pred: IntoRefStep<Out, bool, Params>>(
2575        self,
2576        pred: Pred,
2577        registry: &Registry,
2578        on_true: PipelineChain<Out, NewOut, C0>,
2579        on_false: PipelineChain<Out, NewOut, C1>,
2580    ) -> PipelineChain<In, NewOut, RouteNode<Chain, Pred::Step, C0, C1>>
2581    where
2582        C0: ChainCall<Out, Out = NewOut>,
2583        C1: ChainCall<Out, Out = NewOut>,
2584    {
2585        PipelineChain {
2586            chain: RouteNode {
2587                prev: self.chain,
2588                pred: pred.into_ref_step(registry),
2589                on_true: on_true.chain,
2590                on_false: on_false.chain,
2591            },
2592            _marker: PhantomData,
2593        }
2594    }
2595
2596    /// Fork off a multi-step side-effect chain. The arm borrows
2597    /// `&Out`, runs to completion (producing `()`), and the
2598    /// original value passes through unchanged.
2599    ///
2600    /// Multi-step version of [`tap`](Self::tap) — the arm has the
2601    /// full DAG combinator API with Param resolution. Build with
2602    /// [`DagArmSeed::new()`](crate::dag::DagArmSeed::new).
2603    pub fn tee<C>(self, side: DagArm<Out, (), C>) -> PipelineChain<In, Out, TeeNode<Chain, C>>
2604    where
2605        C: for<'a> ChainCall<&'a Out, Out = ()>,
2606    {
2607        PipelineChain {
2608            chain: TeeNode {
2609                prev: self.chain,
2610                side: side.chain,
2611            },
2612            _marker: PhantomData,
2613        }
2614    }
2615
2616    /// Scan with persistent accumulator. The step receives `&mut Acc`
2617    /// and the current value, returning the new output. State persists
2618    /// across invocations.
2619    ///
2620    /// # Examples
2621    ///
2622    /// ```ignore
2623    /// // Running sum — suppress values below threshold
2624    /// PipelineBuilder::<u64>::new()
2625    ///     .then(identity, reg)
2626    ///     .scan(0u64, |acc: &mut u64, val: u64| {
2627    ///         *acc += val;
2628    ///         if *acc > 100 { Some(*acc) } else { None }
2629    ///     }, reg)
2630    ///     .build();
2631    /// ```
2632    pub fn scan<Acc, NewOut, Params, S>(
2633        self,
2634        initial: Acc,
2635        f: S,
2636        registry: &Registry,
2637    ) -> PipelineChain<In, NewOut, ScanNode<Chain, S::Step, Acc>>
2638    where
2639        Acc: 'static,
2640        S: IntoScanStep<Acc, Out, NewOut, Params>,
2641    {
2642        PipelineChain {
2643            chain: ScanNode {
2644                prev: self.chain,
2645                step: f.into_scan_step(registry),
2646                acc: initial,
2647            },
2648            _marker: PhantomData,
2649        }
2650    }
2651}
2652
2653// =============================================================================
2654// Splat — tuple destructuring into individual function arguments
2655// =============================================================================
2656//
2657// `.splat()` transitions from a tuple output to a builder whose `.then()`
2658// accepts a function taking the tuple elements as individual arguments.
2659// After `.splat().then(f, reg)`, the user is back on PipelineChain.
2660//
2661// Builder types are `#[doc(hidden)]` — users only see `.splat().then()`.
2662
2663// -- Splat builder types ------------------------------------------------------
2664
2665macro_rules! define_splat_builders {
2666    (
2667        $N:literal,
2668        start: $SplatStart:ident,
2669        mid: $SplatBuilder:ident,
2670        node: $SplatThenNode:ident,
2671        into_trait: $IntoSplatStep:ident,
2672        call_trait: $SplatCall:ident,
2673        ($($T:ident),+),
2674        ($($idx:tt),+)
2675    ) => {
2676        /// Chain node for `.splat().then()` — destructures tuple into individual arguments.
2677        #[doc(hidden)]
2678        pub struct $SplatThenNode<Prev, S> {
2679            prev: Prev,
2680            step: S,
2681        }
2682
2683        impl<In, $($T,)+ Prev, S> ChainCall<In> for $SplatThenNode<Prev, S>
2684        where
2685            Prev: ChainCall<In, Out = ($($T,)+)>,
2686            S: $SplatCall<$($T),+>,
2687        {
2688            type Out = S::Out;
2689            #[inline(always)]
2690            fn call(&mut self, world: &mut World, input: In) -> S::Out {
2691                let tuple = self.prev.call(world, input);
2692                self.step.call_splat(world, $(tuple.$idx),+)
2693            }
2694        }
2695
2696        /// Splat builder at pipeline start position.
2697        #[doc(hidden)]
2698        pub struct $SplatStart<$($T),+>(PhantomData<fn(($($T,)+))>);
2699
2700        impl<$($T),+> $SplatStart<$($T),+> {
2701            /// Add a step that receives the tuple elements as individual arguments.
2702            pub fn then<Out, Params, S>(
2703                self,
2704                f: S,
2705                registry: &Registry,
2706            ) -> PipelineChain<($($T,)+), Out, $SplatThenNode<IdentityNode, S::Step>>
2707            where
2708                S: $IntoSplatStep<$($T,)+ Out, Params>,
2709            {
2710                PipelineChain {
2711                    chain: $SplatThenNode {
2712                        prev: IdentityNode,
2713                        step: f.into_splat_step(registry),
2714                    },
2715                    _marker: PhantomData,
2716                }
2717            }
2718        }
2719
2720        impl<$($T),+> PipelineBuilder<($($T,)+)> {
2721            /// Destructure the tuple input into individual function arguments.
2722            pub fn splat(self) -> $SplatStart<$($T),+> {
2723                $SplatStart(PhantomData)
2724            }
2725        }
2726
2727        /// Splat builder at mid-chain position.
2728        #[doc(hidden)]
2729        pub struct $SplatBuilder<In, $($T,)+ Chain> {
2730            chain: Chain,
2731            _marker: PhantomData<fn(In) -> ($($T,)+)>,
2732        }
2733
2734        impl<In, $($T,)+ Chain: ChainCall<In, Out = ($($T,)+)>> $SplatBuilder<In, $($T,)+ Chain> {
2735            /// Add a step that receives the tuple elements as individual arguments.
2736            pub fn then<Out, Params, S>(
2737                self,
2738                f: S,
2739                registry: &Registry,
2740            ) -> PipelineChain<In, Out, $SplatThenNode<Chain, S::Step>>
2741            where
2742                S: $IntoSplatStep<$($T,)+ Out, Params>,
2743            {
2744                PipelineChain {
2745                    chain: $SplatThenNode {
2746                        prev: self.chain,
2747                        step: f.into_splat_step(registry),
2748                    },
2749                    _marker: PhantomData,
2750                }
2751            }
2752        }
2753
2754        impl<In, $($T,)+ Chain: ChainCall<In, Out = ($($T,)+)>> PipelineChain<In, ($($T,)+), Chain> {
2755            /// Destructure the tuple output into individual function arguments.
2756            pub fn splat(self) -> $SplatBuilder<In, $($T,)+ Chain> {
2757                $SplatBuilder {
2758                    chain: self.chain,
2759                    _marker: PhantomData,
2760                }
2761            }
2762        }
2763    };
2764}
2765
2766define_splat_builders!(2,
2767    start: SplatStart2,
2768    mid: SplatBuilder2,
2769    node: SplatThenNode2,
2770    into_trait: IntoSplatStep2,
2771    call_trait: SplatCall2,
2772    (A, B),
2773    (0, 1)
2774);
2775
2776define_splat_builders!(3,
2777    start: SplatStart3,
2778    mid: SplatBuilder3,
2779    node: SplatThenNode3,
2780    into_trait: IntoSplatStep3,
2781    call_trait: SplatCall3,
2782    (A, B, C),
2783    (0, 1, 2)
2784);
2785
2786define_splat_builders!(4,
2787    start: SplatStart4,
2788    mid: SplatBuilder4,
2789    node: SplatThenNode4,
2790    into_trait: IntoSplatStep4,
2791    call_trait: SplatCall4,
2792    (A, B, C, D),
2793    (0, 1, 2, 3)
2794);
2795
2796define_splat_builders!(5,
2797    start: SplatStart5,
2798    mid: SplatBuilder5,
2799    node: SplatThenNode5,
2800    into_trait: IntoSplatStep5,
2801    call_trait: SplatCall5,
2802    (A, B, C, D, E),
2803    (0, 1, 2, 3, 4)
2804);
2805
2806// =============================================================================
2807// Dedup — suppress unchanged values
2808// =============================================================================
2809
2810impl<In, Out: PartialEq + Clone, Chain: ChainCall<In, Out = Out>> PipelineChain<In, Out, Chain> {
2811    /// Suppress consecutive unchanged values. Returns `Some(val)`
2812    /// when the value differs from the previous invocation, `None`
2813    /// when unchanged. First invocation always returns `Some`.
2814    ///
2815    /// Requires `PartialEq + Clone` — the previous value is stored
2816    /// internally for comparison.
2817    pub fn dedup(self) -> PipelineChain<In, Option<Out>, DedupNode<Chain, Out>> {
2818        PipelineChain {
2819            chain: DedupNode {
2820                prev: self.chain,
2821                last: None,
2822            },
2823            _marker: PhantomData,
2824        }
2825    }
2826}
2827
2828// =============================================================================
2829// Bool combinators
2830// =============================================================================
2831
2832impl<In, Chain: ChainCall<In, Out = bool>> PipelineChain<In, bool, Chain> {
2833    /// Invert a boolean value.
2834    #[allow(clippy::should_implement_trait)]
2835    pub fn not(self) -> PipelineChain<In, bool, NotNode<Chain>> {
2836        PipelineChain {
2837            chain: NotNode { prev: self.chain },
2838            _marker: PhantomData,
2839        }
2840    }
2841
2842    /// Short-circuit AND with a second boolean.
2843    ///
2844    /// If the chain produces `false`, the step is not called.
2845    pub fn and<Params, S: IntoProducer<bool, Params>>(
2846        self,
2847        f: S,
2848        registry: &Registry,
2849    ) -> PipelineChain<In, bool, AndBoolNode<Chain, S::Step>> {
2850        PipelineChain {
2851            chain: AndBoolNode {
2852                prev: self.chain,
2853                producer: f.into_producer(registry),
2854            },
2855            _marker: PhantomData,
2856        }
2857    }
2858
2859    /// Short-circuit OR with a second boolean.
2860    ///
2861    /// If the chain produces `true`, the step is not called.
2862    pub fn or<Params, S: IntoProducer<bool, Params>>(
2863        self,
2864        f: S,
2865        registry: &Registry,
2866    ) -> PipelineChain<In, bool, OrBoolNode<Chain, S::Step>> {
2867        PipelineChain {
2868            chain: OrBoolNode {
2869                prev: self.chain,
2870                producer: f.into_producer(registry),
2871            },
2872            _marker: PhantomData,
2873        }
2874    }
2875
2876    /// XOR with a second boolean.
2877    ///
2878    /// Both sides are always evaluated.
2879    pub fn xor<Params, S: IntoProducer<bool, Params>>(
2880        self,
2881        f: S,
2882        registry: &Registry,
2883    ) -> PipelineChain<In, bool, XorBoolNode<Chain, S::Step>> {
2884        PipelineChain {
2885            chain: XorBoolNode {
2886                prev: self.chain,
2887                producer: f.into_producer(registry),
2888            },
2889            _marker: PhantomData,
2890        }
2891    }
2892}
2893
2894// =============================================================================
2895// Clone helpers — &T → T transitions
2896// =============================================================================
2897
2898impl<'a, In, T: Clone, Chain: ChainCall<In, Out = &'a T>> PipelineChain<In, &'a T, Chain> {
2899    /// Clone a borrowed output to produce an owned value.
2900    ///
2901    /// Transitions the pipeline from `&T` to `T`. Uses UFCS
2902    /// (`T::clone(val)`) — `val.clone()` on a `&&T` resolves to
2903    /// `<&T as Clone>::clone` and returns `&T`, not `T`.
2904    pub fn cloned(self) -> PipelineChain<In, T, ClonedNode<Chain>> {
2905        PipelineChain {
2906            chain: ClonedNode { prev: self.chain },
2907            _marker: PhantomData,
2908        }
2909    }
2910}
2911
2912impl<'a, In, T: Clone, Chain: ChainCall<In, Out = Option<&'a T>>>
2913    PipelineChain<In, Option<&'a T>, Chain>
2914{
2915    /// Clone inner borrowed value. `Option<&T>` → `Option<T>`.
2916    pub fn cloned(self) -> PipelineChain<In, Option<T>, ClonedOptionNode<Chain>> {
2917        PipelineChain {
2918            chain: ClonedOptionNode { prev: self.chain },
2919            _marker: PhantomData,
2920        }
2921    }
2922}
2923
2924impl<'a, In, T: Clone, E, Chain: ChainCall<In, Out = Result<&'a T, E>>>
2925    PipelineChain<In, Result<&'a T, E>, Chain>
2926{
2927    /// Clone inner borrowed Ok value. `Result<&T, E>` → `Result<T, E>`.
2928    pub fn cloned(self) -> PipelineChain<In, Result<T, E>, ClonedResultNode<Chain>> {
2929        PipelineChain {
2930            chain: ClonedResultNode { prev: self.chain },
2931            _marker: PhantomData,
2932        }
2933    }
2934}
2935
2936// =============================================================================
2937// Option helpers — PipelineChain<In, Option<T>, Chain>
2938// =============================================================================
2939
2940impl<In, T, Chain: ChainCall<In, Out = Option<T>>> PipelineChain<In, Option<T>, Chain> {
2941    // -- IntoStep-based (hot path) -------------------------------------------
2942
2943    /// Transform the inner value. Step not called on None.
2944    pub fn map<U, Params, S: IntoStep<T, U, Params>>(
2945        self,
2946        f: S,
2947        registry: &Registry,
2948    ) -> PipelineChain<In, Option<U>, MapOptionNode<Chain, S::Step>> {
2949        PipelineChain {
2950            chain: MapOptionNode {
2951                prev: self.chain,
2952                step: f.into_step(registry),
2953            },
2954            _marker: PhantomData,
2955        }
2956    }
2957
2958    /// Short-circuits on None. std: `Option::and_then`
2959    pub fn and_then<U, Params, S: IntoStep<T, Option<U>, Params>>(
2960        self,
2961        f: S,
2962        registry: &Registry,
2963    ) -> PipelineChain<In, Option<U>, AndThenNode<Chain, S::Step>> {
2964        PipelineChain {
2965            chain: AndThenNode {
2966                prev: self.chain,
2967                step: f.into_step(registry),
2968            },
2969            _marker: PhantomData,
2970        }
2971    }
2972
2973    // -- Resolved (cold path, now with Param resolution) -----------------------
2974
2975    /// Side effect on None. Complement to [`inspect`](Self::inspect).
2976    pub fn on_none<Params, S: IntoProducer<(), Params>>(
2977        self,
2978        f: S,
2979        registry: &Registry,
2980    ) -> PipelineChain<In, Option<T>, OnNoneNode<Chain, S::Step>> {
2981        PipelineChain {
2982            chain: OnNoneNode {
2983                prev: self.chain,
2984                producer: f.into_producer(registry),
2985            },
2986            _marker: PhantomData,
2987        }
2988    }
2989
2990    /// Keep value if predicate holds. std: `Option::filter`
2991    ///
2992    /// # Common Mistakes
2993    ///
2994    /// Filter operates on `&T` inside the Option, not `T`:
2995    /// ```compile_fail
2996    /// # use nexus_rt::{PipelineBuilder, WorldBuilder};
2997    /// # let mut wb = WorldBuilder::new();
2998    /// # let world = wb.build();
2999    /// # let reg = world.registry();
3000    /// fn to_opt(x: u32) -> Option<u32> { Some(x) }
3001    /// // ERROR: takes u32, should be &u32
3002    /// PipelineBuilder::<u32>::new()
3003    ///     .then(to_opt, &reg)
3004    ///     .filter(|x: u32| x > 10, &reg);
3005    /// ```
3006    pub fn filter<Params, S: IntoRefStep<T, bool, Params>>(
3007        self,
3008        f: S,
3009        registry: &Registry,
3010    ) -> PipelineChain<In, Option<T>, FilterNode<Chain, S::Step>> {
3011        PipelineChain {
3012            chain: FilterNode {
3013                prev: self.chain,
3014                step: f.into_ref_step(registry),
3015            },
3016            _marker: PhantomData,
3017        }
3018    }
3019
3020    /// Side effect on Some value. std: `Option::inspect`
3021    ///
3022    /// Takes `&T`, not `T` — the value passes through.
3023    pub fn inspect<Params, S: IntoRefStep<T, (), Params>>(
3024        self,
3025        f: S,
3026        registry: &Registry,
3027    ) -> PipelineChain<In, Option<T>, InspectOptionNode<Chain, S::Step>> {
3028        PipelineChain {
3029            chain: InspectOptionNode {
3030                prev: self.chain,
3031                step: f.into_ref_step(registry),
3032            },
3033            _marker: PhantomData,
3034        }
3035    }
3036
3037    /// None becomes Err(err). std: `Option::ok_or`
3038    ///
3039    /// `Clone` required because the pipeline may run many times —
3040    /// the error value is cloned on each `None` invocation.
3041    pub fn ok_or<E: Clone>(self, err: E) -> PipelineChain<In, Result<T, E>, OkOrNode<Chain, E>> {
3042        PipelineChain {
3043            chain: OkOrNode {
3044                prev: self.chain,
3045                err,
3046            },
3047            _marker: PhantomData,
3048        }
3049    }
3050
3051    /// None becomes Err(f()). std: `Option::ok_or_else`
3052    pub fn ok_or_else<E, Params, S: IntoProducer<E, Params>>(
3053        self,
3054        f: S,
3055        registry: &Registry,
3056    ) -> PipelineChain<In, Result<T, E>, OkOrElseNode<Chain, S::Step>> {
3057        PipelineChain {
3058            chain: OkOrElseNode {
3059                prev: self.chain,
3060                producer: f.into_producer(registry),
3061            },
3062            _marker: PhantomData,
3063        }
3064    }
3065
3066    /// Exit Option — None becomes the default value.
3067    ///
3068    /// `Clone` required because the pipeline may run many times —
3069    /// the default is cloned on each `None` invocation (unlike
3070    /// std's `unwrap_or` which consumes the value once).
3071    pub fn unwrap_or(self, default: T) -> PipelineChain<In, T, UnwrapOrOptionNode<Chain, T>>
3072    where
3073        T: Clone,
3074    {
3075        PipelineChain {
3076            chain: UnwrapOrOptionNode {
3077                prev: self.chain,
3078                default,
3079            },
3080            _marker: PhantomData,
3081        }
3082    }
3083
3084    /// Exit Option — None becomes `f()`.
3085    pub fn unwrap_or_else<Params, S: IntoProducer<T, Params>>(
3086        self,
3087        f: S,
3088        registry: &Registry,
3089    ) -> PipelineChain<In, T, UnwrapOrElseOptionNode<Chain, S::Step>> {
3090        PipelineChain {
3091            chain: UnwrapOrElseOptionNode {
3092                prev: self.chain,
3093                producer: f.into_producer(registry),
3094            },
3095            _marker: PhantomData,
3096        }
3097    }
3098}
3099
3100// =============================================================================
3101// Result helpers — PipelineChain<In, Result<T, E>, Chain>
3102// =============================================================================
3103
3104impl<In, T, E, Chain: ChainCall<In, Out = Result<T, E>>> PipelineChain<In, Result<T, E>, Chain> {
3105    // -- IntoStep-based (hot path) -------------------------------------------
3106
3107    /// Transform the Ok value. Step not called on Err.
3108    pub fn map<U, Params, S: IntoStep<T, U, Params>>(
3109        self,
3110        f: S,
3111        registry: &Registry,
3112    ) -> PipelineChain<In, Result<U, E>, MapResultNode<Chain, S::Step>> {
3113        PipelineChain {
3114            chain: MapResultNode {
3115                prev: self.chain,
3116                step: f.into_step(registry),
3117            },
3118            _marker: PhantomData,
3119        }
3120    }
3121
3122    /// Short-circuits on Err. std: `Result::and_then`
3123    pub fn and_then<U, Params, S: IntoStep<T, Result<U, E>, Params>>(
3124        self,
3125        f: S,
3126        registry: &Registry,
3127    ) -> PipelineChain<In, Result<U, E>, AndThenResultNode<Chain, S::Step>> {
3128        PipelineChain {
3129            chain: AndThenResultNode {
3130                prev: self.chain,
3131                step: f.into_step(registry),
3132            },
3133            _marker: PhantomData,
3134        }
3135    }
3136
3137    /// Handle error and transition to Option.
3138    ///
3139    /// `Ok(val)` becomes `Some(val)` — handler not called.
3140    /// `Err(err)` calls the handler, then produces `None`.
3141    pub fn catch<Params, S: IntoStep<E, (), Params>>(
3142        self,
3143        f: S,
3144        registry: &Registry,
3145    ) -> PipelineChain<In, Option<T>, CatchNode<Chain, S::Step>> {
3146        PipelineChain {
3147            chain: CatchNode {
3148                prev: self.chain,
3149                step: f.into_step(registry),
3150            },
3151            _marker: PhantomData,
3152        }
3153    }
3154
3155    // -- Resolved (cold path, now with Param resolution) -----------------------
3156
3157    /// Transform the error. std: `Result::map_err`
3158    pub fn map_err<E2, Params, S: IntoStep<E, E2, Params>>(
3159        self,
3160        f: S,
3161        registry: &Registry,
3162    ) -> PipelineChain<In, Result<T, E2>, MapErrNode<Chain, S::Step>> {
3163        PipelineChain {
3164            chain: MapErrNode {
3165                prev: self.chain,
3166                step: f.into_step(registry),
3167            },
3168            _marker: PhantomData,
3169        }
3170    }
3171
3172    /// Recover from Err. std: `Result::or_else`
3173    pub fn or_else<E2, Params, S: IntoStep<E, Result<T, E2>, Params>>(
3174        self,
3175        f: S,
3176        registry: &Registry,
3177    ) -> PipelineChain<In, Result<T, E2>, OrElseNode<Chain, S::Step>> {
3178        PipelineChain {
3179            chain: OrElseNode {
3180                prev: self.chain,
3181                step: f.into_step(registry),
3182            },
3183            _marker: PhantomData,
3184        }
3185    }
3186
3187    /// Side effect on Ok. std: `Result::inspect`
3188    ///
3189    /// Takes `&T`, not `T` — the value passes through.
3190    pub fn inspect<Params, S: IntoRefStep<T, (), Params>>(
3191        self,
3192        f: S,
3193        registry: &Registry,
3194    ) -> PipelineChain<In, Result<T, E>, InspectResultNode<Chain, S::Step>> {
3195        PipelineChain {
3196            chain: InspectResultNode {
3197                prev: self.chain,
3198                step: f.into_ref_step(registry),
3199            },
3200            _marker: PhantomData,
3201        }
3202    }
3203
3204    /// Side effect on Err. std: `Result::inspect_err`
3205    ///
3206    /// Takes `&E`, not `E` — the error passes through.
3207    pub fn inspect_err<Params, S: IntoRefStep<E, (), Params>>(
3208        self,
3209        f: S,
3210        registry: &Registry,
3211    ) -> PipelineChain<In, Result<T, E>, InspectErrNode<Chain, S::Step>> {
3212        PipelineChain {
3213            chain: InspectErrNode {
3214                prev: self.chain,
3215                step: f.into_ref_step(registry),
3216            },
3217            _marker: PhantomData,
3218        }
3219    }
3220
3221    /// Discard error, enter Option land. std: `Result::ok`
3222    pub fn ok(self) -> PipelineChain<In, Option<T>, OkResultNode<Chain>> {
3223        PipelineChain {
3224            chain: OkResultNode { prev: self.chain },
3225            _marker: PhantomData,
3226        }
3227    }
3228
3229    /// Exit Result — Err becomes the default value.
3230    ///
3231    /// `Clone` required because the pipeline may run many times —
3232    /// the default is cloned on each `Err` invocation (unlike
3233    /// std's `unwrap_or` which consumes the value once).
3234    pub fn unwrap_or(self, default: T) -> PipelineChain<In, T, UnwrapOrResultNode<Chain, T>>
3235    where
3236        T: Clone,
3237    {
3238        PipelineChain {
3239            chain: UnwrapOrResultNode {
3240                prev: self.chain,
3241                default,
3242            },
3243            _marker: PhantomData,
3244        }
3245    }
3246
3247    /// Exit Result — Err becomes `f(err)`.
3248    pub fn unwrap_or_else<Params, S: IntoStep<E, T, Params>>(
3249        self,
3250        f: S,
3251        registry: &Registry,
3252    ) -> PipelineChain<In, T, UnwrapOrElseResultNode<Chain, S::Step>> {
3253        PipelineChain {
3254            chain: UnwrapOrElseResultNode {
3255                prev: self.chain,
3256                step: f.into_step(registry),
3257            },
3258            _marker: PhantomData,
3259        }
3260    }
3261}
3262
3263// =============================================================================
3264// PipelineOutput — marker trait for build()
3265// =============================================================================
3266
3267mod pipeline_output_seal {
3268    pub trait Sealed {}
3269    impl Sealed for () {}
3270    impl Sealed for Option<()> {}
3271}
3272
3273/// Sealed marker trait for valid pipeline terminal types.
3274///
3275/// Only `()` and `Option<()>` satisfy this. A pipeline can only
3276/// `.build()` when its output is one of these types — add a final
3277/// `.then()` or `.dispatch()` that consumes the output.
3278#[diagnostic::on_unimplemented(
3279    message = "`build()` requires the pipeline output to be `()` or `Option<()>`",
3280    label = "this pipeline produces `{Self}`, not `()` or `Option<()>`",
3281    note = "add a final `.then()` or `.dispatch()` that consumes the output"
3282)]
3283pub trait PipelineOutput: pipeline_output_seal::Sealed {}
3284impl PipelineOutput for () {}
3285impl PipelineOutput for Option<()> {}
3286
3287// =============================================================================
3288// build — when Out: PipelineOutput (() or Option<()>)
3289// =============================================================================
3290
3291impl<In, Chain: ChainCall<In, Out = ()>> PipelineChain<In, (), Chain> {
3292    /// Finalize the pipeline into a [`Pipeline`].
3293    ///
3294    /// The returned pipeline is a concrete, monomorphized type — no boxing,
3295    /// no virtual dispatch. Call `.run()` directly for zero-cost execution,
3296    /// or wrap in `Box<dyn Handler<In>>` when type erasure is needed.
3297    ///
3298    /// Only available when the pipeline ends with `()` or `Option<()>`.
3299    /// If your chain produces a value, add a final `.then()` that consumes
3300    /// the output.
3301    #[must_use = "building a pipeline without storing it does nothing"]
3302    pub fn build(self) -> Pipeline<Chain> {
3303        Pipeline { chain: self.chain }
3304    }
3305}
3306
3307impl<In, Chain: ChainCall<In, Out = Option<()>>> PipelineChain<In, Option<()>, Chain> {
3308    /// Finalize the pipeline into a [`Pipeline`], discarding the `Option<()>`.
3309    ///
3310    /// Pipelines ending with `Option<()>` (e.g. after `.map()` on an
3311    /// `Option<T>` with a step that returns `()`) produce the same
3312    /// [`Pipeline`] as those ending with `()`.
3313    #[must_use = "building a pipeline without storing it does nothing"]
3314    pub fn build(self) -> Pipeline<DiscardOptionNode<Chain>> {
3315        Pipeline {
3316            chain: DiscardOptionNode { prev: self.chain },
3317        }
3318    }
3319}
3320
3321// =============================================================================
3322// build_batch — when Out: PipelineOutput (() or Option<()>)
3323// =============================================================================
3324
3325impl<In, Out: PipelineOutput, Chain: ChainCall<In, Out = Out>> PipelineChain<In, Out, Chain> {
3326    /// Finalize into a [`BatchPipeline`] with a pre-allocated input buffer.
3327    ///
3328    /// Same pipeline chain as [`build`](PipelineChain::build), but the
3329    /// pipeline owns an input buffer that drivers fill between dispatch
3330    /// cycles. Each call to [`BatchPipeline::run`] drains the buffer,
3331    /// running every item through the chain independently.
3332    ///
3333    /// Available when the pipeline ends with `()` or `Option<()>` (e.g.
3334    /// after `.catch()` or `.filter()`). Pipelines producing values need
3335    /// a final `.then()` that consumes the output.
3336    ///
3337    /// `capacity` is the initial allocation — the buffer can grow if needed,
3338    /// but sizing it for the expected batch size avoids reallocation.
3339    #[must_use = "building a pipeline without storing it does nothing"]
3340    pub fn build_batch(self, capacity: usize) -> BatchPipeline<In, Chain> {
3341        BatchPipeline {
3342            input: Vec::with_capacity(capacity),
3343            chain: self.chain,
3344        }
3345    }
3346}
3347
3348// =============================================================================
3349// Pipeline<F> — built pipeline
3350// =============================================================================
3351
3352/// Built step pipeline implementing [`Handler<E>`](crate::Handler).
3353///
3354/// Created by [`PipelineChain::build`]. The entire pipeline chain is
3355/// monomorphized at compile time — no boxing, no virtual dispatch.
3356/// Call `.run()` directly for zero-cost execution, or wrap in
3357/// `Box<dyn Handler<E>>` when you need type erasure (single box).
3358///
3359/// Implements [`Handler<E>`](crate::Handler) for any event type `E`
3360/// that the chain accepts — including borrowed types like `&'a [u8]`.
3361/// Supports `for<'a> Handler<&'a T>` for zero-copy event dispatch.
3362pub struct Pipeline<F> {
3363    chain: F,
3364}
3365
3366impl<E, F: ChainCall<E, Out = ()> + Send> crate::Handler<E> for Pipeline<F> {
3367    fn run(&mut self, world: &mut World, event: E) {
3368        self.chain.call(world, event);
3369    }
3370}
3371
3372// =============================================================================
3373// BatchPipeline<In, F> — pipeline with owned input buffer
3374// =============================================================================
3375
3376/// Batch pipeline that owns a pre-allocated input buffer.
3377///
3378/// Created by [`PipelineChain::build_batch`]. Each item flows through
3379/// the full pipeline chain independently — the same per-item `Option`
3380/// and `Result` flow control as [`Pipeline`]. Errors are handled inline
3381/// (via `.catch()`, `.unwrap_or()`, etc.) and the batch continues to
3382/// the next item. No intermediate buffers between steps.
3383///
3384/// # Examples
3385///
3386/// ```
3387/// use nexus_rt::{WorldBuilder, ResMut, PipelineBuilder, Resource};
3388///
3389/// #[derive(Resource)]
3390/// struct Accum(u64);
3391///
3392/// let mut wb = WorldBuilder::new();
3393/// wb.register(Accum(0));
3394/// let mut world = wb.build();
3395///
3396/// fn accumulate(mut sum: ResMut<Accum>, x: u32) {
3397///     sum.0 += x as u64;
3398/// }
3399///
3400/// let r = world.registry();
3401/// let mut batch = PipelineBuilder::<u32>::new()
3402///     .then(accumulate, r)
3403///     .build_batch(64);
3404///
3405/// batch.input_mut().extend_from_slice(&[1, 2, 3, 4, 5]);
3406/// batch.run(&mut world);
3407///
3408/// assert_eq!(world.resource::<Accum>().0, 15);
3409/// assert!(batch.input().is_empty());
3410/// ```
3411pub struct BatchPipeline<In, F> {
3412    input: Vec<In>,
3413    chain: F,
3414}
3415
3416impl<In, Out: PipelineOutput, F: ChainCall<In, Out = Out>> BatchPipeline<In, F> {
3417    /// Mutable access to the input buffer. Drivers fill this between
3418    /// dispatch cycles.
3419    pub fn input_mut(&mut self) -> &mut Vec<In> {
3420        &mut self.input
3421    }
3422
3423    /// Read-only access to the input buffer.
3424    pub fn input(&self) -> &[In] {
3425        &self.input
3426    }
3427
3428    /// Drain the input buffer, running each item through the pipeline.
3429    ///
3430    /// Each item gets independent `Option`/`Result` flow control — an
3431    /// error on one item does not affect subsequent items. After `run()`,
3432    /// the input buffer is empty but retains its allocation.
3433    pub fn run(&mut self, world: &mut World) {
3434        for item in self.input.drain(..) {
3435            let _ = self.chain.call(world, item);
3436        }
3437    }
3438}
3439
3440// =============================================================================
3441// resolve_step — pre-resolve a step for manual dispatch (owned input)
3442// =============================================================================
3443
3444/// Resolve a step for use in manual dispatch (e.g. inside an
3445/// [`Opaque`] closure passed to `.then()`).
3446///
3447/// Returns a closure with pre-resolved [`Param`] state — the same
3448/// build-time resolution that `.then()` performs, but as a standalone
3449/// value the caller can invoke from any context.
3450///
3451/// This is the pipeline (owned-input) counterpart of
3452/// [`dag::resolve_arm`](crate::dag::resolve_arm) (reference-input).
3453///
3454/// # Examples
3455///
3456/// ```ignore
3457/// let mut arm0 = resolve_step(handle_new, &reg);
3458/// let mut arm1 = resolve_step(handle_cancel, &reg);
3459///
3460/// pipeline.then(move |world: &mut World, order: Order| match order.kind {
3461///     OrderKind::New    => arm0(world, order),
3462///     OrderKind::Cancel => arm1(world, order),
3463/// }, &reg)
3464/// ```
3465pub fn resolve_step<In, Out, Params, S>(
3466    f: S,
3467    registry: &Registry,
3468) -> impl FnMut(&mut World, In) -> Out + use<In, Out, Params, S>
3469where
3470    In: 'static,
3471    Out: 'static,
3472    S: IntoStep<In, Out, Params>,
3473{
3474    let mut resolved = f.into_step(registry);
3475    move |world: &mut World, input: In| resolved.call(world, input)
3476}
3477
3478// =============================================================================
3479// Tests
3480// =============================================================================
3481
3482#[cfg(test)]
3483mod tests {
3484    use super::*;
3485    use crate::{Handler, IntoHandler, Local, Res, ResMut, WorldBuilder, fan_out};
3486
3487    // =========================================================================
3488    // Core dispatch
3489    // =========================================================================
3490
3491    #[test]
3492    fn step_pure_transform() {
3493        let mut world = WorldBuilder::new().build();
3494        let r = world.registry_mut();
3495        let mut p = PipelineBuilder::<u32>::new().then(|x: u32| x as u64 * 2, r);
3496        assert_eq!(p.run(&mut world, 5), 10u64);
3497    }
3498
3499    #[test]
3500    fn step_one_res() {
3501        let mut wb = WorldBuilder::new();
3502        wb.register::<u64>(10);
3503        let mut world = wb.build();
3504
3505        fn multiply(factor: Res<u64>, x: u32) -> u64 {
3506            *factor * x as u64
3507        }
3508
3509        let r = world.registry_mut();
3510        let mut p = PipelineBuilder::<u32>::new().then(multiply, r);
3511        assert_eq!(p.run(&mut world, 5), 50);
3512    }
3513
3514    #[test]
3515    fn step_one_res_mut() {
3516        let mut wb = WorldBuilder::new();
3517        wb.register::<u64>(0);
3518        let mut world = wb.build();
3519
3520        fn accumulate(mut total: ResMut<u64>, x: u32) {
3521            *total += x as u64;
3522        }
3523
3524        let r = world.registry_mut();
3525        let mut p = PipelineBuilder::<u32>::new().then(accumulate, r);
3526        p.run(&mut world, 10);
3527        p.run(&mut world, 5);
3528        assert_eq!(*world.resource::<u64>(), 15);
3529    }
3530
3531    #[test]
3532    fn step_two_params() {
3533        let mut wb = WorldBuilder::new();
3534        wb.register::<u64>(10);
3535        wb.register::<bool>(true);
3536        let mut world = wb.build();
3537
3538        fn conditional(factor: Res<u64>, flag: Res<bool>, x: u32) -> u64 {
3539            if *flag { *factor * x as u64 } else { 0 }
3540        }
3541
3542        let r = world.registry_mut();
3543        let mut p = PipelineBuilder::<u32>::new().then(conditional, r);
3544        assert_eq!(p.run(&mut world, 5), 50);
3545    }
3546
3547    #[test]
3548    fn step_chain_two() {
3549        let mut wb = WorldBuilder::new();
3550        wb.register::<u64>(2);
3551        let mut world = wb.build();
3552
3553        fn double(factor: Res<u64>, x: u32) -> u64 {
3554            *factor * x as u64
3555        }
3556
3557        let r = world.registry_mut();
3558        let mut p = PipelineBuilder::<u32>::new()
3559            .then(double, r)
3560            .then(|val: u64| val + 1, r);
3561        assert_eq!(p.run(&mut world, 5), 11); // 2*5 + 1
3562    }
3563
3564    // =========================================================================
3565    // Option combinators
3566    // =========================================================================
3567
3568    #[test]
3569    fn option_map_on_some() {
3570        let mut wb = WorldBuilder::new();
3571        wb.register::<u64>(10);
3572        let mut world = wb.build();
3573
3574        fn add_factor(factor: Res<u64>, x: u32) -> u64 {
3575            *factor + x as u64
3576        }
3577
3578        let r = world.registry_mut();
3579        let mut p = PipelineBuilder::<u32>::new()
3580            .then(|x: u32| -> Option<u32> { Some(x) }, r)
3581            .map(add_factor, r);
3582        assert_eq!(p.run(&mut world, 5), Some(15));
3583    }
3584
3585    #[test]
3586    fn option_map_skips_none() {
3587        let mut wb = WorldBuilder::new();
3588        wb.register::<bool>(false);
3589        let mut world = wb.build();
3590
3591        fn mark(mut flag: ResMut<bool>, _x: u32) -> u32 {
3592            *flag = true;
3593            0
3594        }
3595
3596        let r = world.registry_mut();
3597        let mut p = PipelineBuilder::<u32>::new()
3598            .then(|_x: u32| -> Option<u32> { None }, r)
3599            .map(mark, r);
3600        assert_eq!(p.run(&mut world, 5), None);
3601        assert!(!*world.resource::<bool>());
3602    }
3603
3604    #[test]
3605    fn option_and_then_chains() {
3606        let mut wb = WorldBuilder::new();
3607        wb.register::<u64>(10);
3608        let mut world = wb.build();
3609
3610        fn check(min: Res<u64>, x: u32) -> Option<u64> {
3611            let val = x as u64;
3612            if val > *min { Some(val) } else { None }
3613        }
3614
3615        let r = world.registry_mut();
3616        let mut p = PipelineBuilder::<u32>::new()
3617            .then(|x: u32| Some(x), r)
3618            .and_then(check, r);
3619        assert_eq!(p.run(&mut world, 20), Some(20));
3620    }
3621
3622    #[test]
3623    fn option_and_then_short_circuits() {
3624        let mut wb = WorldBuilder::new();
3625        wb.register::<u64>(10);
3626        let mut world = wb.build();
3627
3628        fn check(min: Res<u64>, x: u32) -> Option<u64> {
3629            let val = x as u64;
3630            if val > *min { Some(val) } else { None }
3631        }
3632
3633        let r = world.registry_mut();
3634        let mut p = PipelineBuilder::<u32>::new()
3635            .then(|x: u32| Some(x), r)
3636            .and_then(check, r);
3637        assert_eq!(p.run(&mut world, 5), None);
3638    }
3639
3640    #[test]
3641    fn option_on_none_fires() {
3642        let mut wb = WorldBuilder::new();
3643        wb.register::<bool>(false);
3644        let mut world = wb.build();
3645
3646        let r = world.registry();
3647        let mut p = PipelineBuilder::<u32>::new()
3648            .then(|_x: u32| -> Option<u32> { None }, r)
3649            .on_none(
3650                |w: &mut World| {
3651                    *w.resource_mut::<bool>() = true;
3652                },
3653                r,
3654            );
3655        p.run(&mut world, 0);
3656        assert!(*world.resource::<bool>());
3657    }
3658
3659    #[test]
3660    fn option_filter_keeps() {
3661        let mut world = WorldBuilder::new().build();
3662        let r = world.registry_mut();
3663        let mut p = PipelineBuilder::<u32>::new()
3664            .then(|x: u32| Some(x), r)
3665            .filter(|x: &u32| *x > 3, r);
3666        assert_eq!(p.run(&mut world, 5), Some(5));
3667    }
3668
3669    #[test]
3670    fn option_filter_drops() {
3671        let mut world = WorldBuilder::new().build();
3672        let r = world.registry_mut();
3673        let mut p = PipelineBuilder::<u32>::new()
3674            .then(|x: u32| Some(x), r)
3675            .filter(|x: &u32| *x > 10, r);
3676        assert_eq!(p.run(&mut world, 5), None);
3677    }
3678
3679    // =========================================================================
3680    // Result combinators
3681    // =========================================================================
3682
3683    #[test]
3684    fn result_map_on_ok() {
3685        let mut wb = WorldBuilder::new();
3686        wb.register::<u64>(10);
3687        let mut world = wb.build();
3688
3689        fn add_factor(factor: Res<u64>, x: u32) -> u64 {
3690            *factor + x as u64
3691        }
3692
3693        let r = world.registry_mut();
3694        let mut p = PipelineBuilder::<u32>::new()
3695            .then(|x: u32| -> Result<u32, String> { Ok(x) }, r)
3696            .map(add_factor, r);
3697        assert_eq!(p.run(&mut world, 5), Ok(15));
3698    }
3699
3700    #[test]
3701    fn result_map_skips_err() {
3702        let mut wb = WorldBuilder::new();
3703        wb.register::<bool>(false);
3704        let mut world = wb.build();
3705
3706        fn mark(mut flag: ResMut<bool>, _x: u32) -> u32 {
3707            *flag = true;
3708            0
3709        }
3710
3711        let r = world.registry_mut();
3712        let mut p = PipelineBuilder::<u32>::new()
3713            .then(|_x: u32| -> Result<u32, String> { Err("fail".into()) }, r)
3714            .map(mark, r);
3715        assert!(p.run(&mut world, 5).is_err());
3716        assert!(!*world.resource::<bool>());
3717    }
3718
3719    #[test]
3720    fn result_catch_handles_error() {
3721        let mut wb = WorldBuilder::new();
3722        wb.register::<String>(String::new());
3723        let mut world = wb.build();
3724
3725        fn log_error(mut log: ResMut<String>, err: String) {
3726            *log = err;
3727        }
3728
3729        let r = world.registry_mut();
3730        let mut p = PipelineBuilder::<u32>::new()
3731            .then(|_x: u32| -> Result<u32, String> { Err("caught".into()) }, r)
3732            .catch(log_error, r);
3733        assert_eq!(p.run(&mut world, 0), None);
3734        assert_eq!(world.resource::<String>().as_str(), "caught");
3735    }
3736
3737    #[test]
3738    fn result_catch_passes_ok() {
3739        let mut wb = WorldBuilder::new();
3740        wb.register::<String>(String::new());
3741        let mut world = wb.build();
3742
3743        fn log_error(mut log: ResMut<String>, err: String) {
3744            *log = err;
3745        }
3746
3747        let r = world.registry_mut();
3748        let mut p = PipelineBuilder::<u32>::new()
3749            .then(|x: u32| -> Result<u32, String> { Ok(x) }, r)
3750            .catch(log_error, r);
3751        assert_eq!(p.run(&mut world, 5), Some(5));
3752        assert!(world.resource::<String>().is_empty());
3753    }
3754
3755    // =========================================================================
3756    // Build + Handler
3757    // =========================================================================
3758
3759    #[test]
3760    fn build_produces_handler() {
3761        let mut wb = WorldBuilder::new();
3762        wb.register::<u64>(0);
3763        let mut world = wb.build();
3764
3765        fn accumulate(mut total: ResMut<u64>, x: u32) {
3766            *total += x as u64;
3767        }
3768
3769        let r = world.registry_mut();
3770        let mut pipeline = PipelineBuilder::<u32>::new().then(accumulate, r).build();
3771
3772        pipeline.run(&mut world, 10);
3773        pipeline.run(&mut world, 5);
3774        assert_eq!(*world.resource::<u64>(), 15);
3775    }
3776
3777    #[test]
3778    fn run_returns_output() {
3779        let mut wb = WorldBuilder::new();
3780        wb.register::<u64>(3);
3781        let mut world = wb.build();
3782
3783        fn multiply(factor: Res<u64>, x: u32) -> u64 {
3784            *factor * x as u64
3785        }
3786
3787        let r = world.registry_mut();
3788        let mut p = PipelineBuilder::<u32>::new().then(multiply, r);
3789        let result: u64 = p.run(&mut world, 7);
3790        assert_eq!(result, 21);
3791    }
3792
3793    // =========================================================================
3794    // Safety
3795    // =========================================================================
3796
3797    #[test]
3798    #[should_panic(expected = "not registered")]
3799    fn panics_on_missing_resource() {
3800        let mut world = WorldBuilder::new().build();
3801
3802        fn needs_u64(_val: Res<u64>, _x: u32) -> u32 {
3803            0
3804        }
3805
3806        let r = world.registry_mut();
3807        let _p = PipelineBuilder::<u32>::new().then(needs_u64, r);
3808    }
3809
3810    // =========================================================================
3811    // Access conflict detection
3812    // =========================================================================
3813
3814    #[test]
3815    #[should_panic(expected = "conflicting access")]
3816    fn step_duplicate_access_panics() {
3817        let mut wb = WorldBuilder::new();
3818        wb.register::<u64>(0);
3819        let mut world = wb.build();
3820
3821        fn bad(a: Res<u64>, b: ResMut<u64>, _x: u32) -> u32 {
3822            let _ = (*a, &*b);
3823            0
3824        }
3825
3826        let r = world.registry_mut();
3827        let _p = PipelineBuilder::<u32>::new().then(bad, r);
3828    }
3829
3830    // =========================================================================
3831    // Integration
3832    // =========================================================================
3833
3834    #[test]
3835    fn local_in_step() {
3836        let mut wb = WorldBuilder::new();
3837        wb.register::<u64>(0);
3838        let mut world = wb.build();
3839
3840        fn count(mut count: Local<u64>, mut total: ResMut<u64>, _x: u32) {
3841            *count += 1;
3842            *total = *count;
3843        }
3844
3845        let r = world.registry_mut();
3846        let mut p = PipelineBuilder::<u32>::new().then(count, r);
3847        p.run(&mut world, 0);
3848        p.run(&mut world, 0);
3849        p.run(&mut world, 0);
3850        assert_eq!(*world.resource::<u64>(), 3);
3851    }
3852
3853    // =========================================================================
3854    // Option combinators (extended)
3855    // =========================================================================
3856
3857    #[test]
3858    fn option_unwrap_or_some() {
3859        let mut world = WorldBuilder::new().build();
3860        let r = world.registry_mut();
3861        let mut p = PipelineBuilder::<u32>::new()
3862            .then(|x: u32| -> Option<u32> { Some(x) }, r)
3863            .unwrap_or(99);
3864        assert_eq!(p.run(&mut world, 5), 5);
3865    }
3866
3867    #[test]
3868    fn option_unwrap_or_none() {
3869        let mut world = WorldBuilder::new().build();
3870        let r = world.registry_mut();
3871        let mut p = PipelineBuilder::<u32>::new()
3872            .then(|_x: u32| -> Option<u32> { None }, r)
3873            .unwrap_or(99);
3874        assert_eq!(p.run(&mut world, 5), 99);
3875    }
3876
3877    #[test]
3878    fn option_unwrap_or_else() {
3879        let mut world = WorldBuilder::new().build();
3880        let r = world.registry_mut();
3881        let mut p = PipelineBuilder::<u32>::new()
3882            .then(|_x: u32| -> Option<u32> { None }, r)
3883            .unwrap_or_else(|| 42, r);
3884        assert_eq!(p.run(&mut world, 0), 42);
3885    }
3886
3887    #[test]
3888    fn option_ok_or() {
3889        let mut world = WorldBuilder::new().build();
3890        let r = world.registry_mut();
3891        let mut p = PipelineBuilder::<u32>::new()
3892            .then(|_x: u32| -> Option<u32> { None }, r)
3893            .ok_or("missing");
3894        assert_eq!(p.run(&mut world, 0), Err("missing"));
3895    }
3896
3897    #[test]
3898    fn option_ok_or_some() {
3899        let mut world = WorldBuilder::new().build();
3900        let r = world.registry_mut();
3901        let mut p = PipelineBuilder::<u32>::new()
3902            .then(|x: u32| -> Option<u32> { Some(x) }, r)
3903            .ok_or("missing");
3904        assert_eq!(p.run(&mut world, 7), Ok(7));
3905    }
3906
3907    #[test]
3908    fn option_ok_or_else() {
3909        let mut world = WorldBuilder::new().build();
3910        let r = world.registry_mut();
3911        let mut p = PipelineBuilder::<u32>::new()
3912            .then(|_x: u32| -> Option<u32> { None }, r)
3913            .ok_or_else(|| "computed", r);
3914        assert_eq!(p.run(&mut world, 0), Err("computed"));
3915    }
3916
3917    #[test]
3918    fn option_inspect_passes_through() {
3919        let mut wb = WorldBuilder::new();
3920        wb.register::<u64>(0);
3921        let mut world = wb.build();
3922        let r = world.registry_mut();
3923        let mut p = PipelineBuilder::<u32>::new()
3924            .then(|x: u32| -> Option<u32> { Some(x) }, r)
3925            .inspect(|_val: &u32| {}, r);
3926        // inspect should pass through the value unchanged.
3927        assert_eq!(p.run(&mut world, 10), Some(10));
3928    }
3929
3930    // =========================================================================
3931    // Result combinators (extended)
3932    // =========================================================================
3933
3934    #[test]
3935    fn result_map_err() {
3936        let mut world = WorldBuilder::new().build();
3937        let r = world.registry_mut();
3938        let mut p = PipelineBuilder::<u32>::new()
3939            .then(|_x: u32| -> Result<u32, i32> { Err(-1) }, r)
3940            .map_err(|e: i32| e.to_string(), r);
3941        assert_eq!(p.run(&mut world, 0), Err("-1".to_string()));
3942    }
3943
3944    #[test]
3945    fn result_map_err_ok_passthrough() {
3946        let mut world = WorldBuilder::new().build();
3947        let r = world.registry_mut();
3948        let mut p = PipelineBuilder::<u32>::new()
3949            .then(|x: u32| -> Result<u32, i32> { Ok(x) }, r)
3950            .map_err(|e: i32| e.to_string(), r);
3951        assert_eq!(p.run(&mut world, 5), Ok(5));
3952    }
3953
3954    #[test]
3955    fn result_or_else() {
3956        let mut world = WorldBuilder::new().build();
3957        let r = world.registry_mut();
3958        let mut p = PipelineBuilder::<u32>::new()
3959            .then(|_x: u32| -> Result<u32, &str> { Err("fail") }, r)
3960            .or_else(|_e: &str| Ok::<u32, &str>(42), r);
3961        assert_eq!(p.run(&mut world, 0), Ok(42));
3962    }
3963
3964    #[test]
3965    fn result_inspect_passes_through() {
3966        let mut world = WorldBuilder::new().build();
3967        let r = world.registry_mut();
3968        let mut p = PipelineBuilder::<u32>::new()
3969            .then(|x: u32| -> Result<u32, &str> { Ok(x) }, r)
3970            .inspect(|_val: &u32| {}, r);
3971        // inspect should pass through Ok unchanged.
3972        assert_eq!(p.run(&mut world, 7), Ok(7));
3973    }
3974
3975    #[test]
3976    fn result_inspect_err_passes_through() {
3977        let mut world = WorldBuilder::new().build();
3978        let r = world.registry_mut();
3979        let mut p = PipelineBuilder::<u32>::new()
3980            .then(|_x: u32| -> Result<u32, &str> { Err("bad") }, r)
3981            .inspect_err(|_e: &&str| {}, r);
3982        // inspect_err should pass through Err unchanged.
3983        assert_eq!(p.run(&mut world, 0), Err("bad"));
3984    }
3985
3986    #[test]
3987    fn result_ok_converts() {
3988        let mut world = WorldBuilder::new().build();
3989        let r = world.registry_mut();
3990        let mut p = PipelineBuilder::<u32>::new()
3991            .then(|x: u32| -> Result<u32, &str> { Ok(x) }, r)
3992            .ok();
3993        assert_eq!(p.run(&mut world, 5), Some(5));
3994    }
3995
3996    #[test]
3997    fn result_ok_drops_err() {
3998        let mut world = WorldBuilder::new().build();
3999        let r = world.registry_mut();
4000        let mut p = PipelineBuilder::<u32>::new()
4001            .then(|_x: u32| -> Result<u32, &str> { Err("gone") }, r)
4002            .ok();
4003        assert_eq!(p.run(&mut world, 0), None);
4004    }
4005
4006    #[test]
4007    fn result_unwrap_or() {
4008        let mut world = WorldBuilder::new().build();
4009        let r = world.registry_mut();
4010        let mut p = PipelineBuilder::<u32>::new()
4011            .then(|_x: u32| -> Result<u32, &str> { Err("x") }, r)
4012            .unwrap_or(99);
4013        assert_eq!(p.run(&mut world, 0), 99);
4014    }
4015
4016    #[test]
4017    fn result_unwrap_or_else() {
4018        let mut world = WorldBuilder::new().build();
4019        let r = world.registry_mut();
4020        let mut p = PipelineBuilder::<u32>::new()
4021            .then(|_x: u32| -> Result<u32, i32> { Err(-5) }, r)
4022            .unwrap_or_else(|e: i32| e.unsigned_abs(), r);
4023        assert_eq!(p.run(&mut world, 0), 5);
4024    }
4025
4026    // =========================================================================
4027    // Batch pipeline
4028    // =========================================================================
4029
4030    #[test]
4031    fn batch_accumulates() {
4032        let mut wb = WorldBuilder::new();
4033        wb.register::<u64>(0);
4034        let mut world = wb.build();
4035
4036        fn accumulate(mut sum: ResMut<u64>, x: u32) {
4037            *sum += x as u64;
4038        }
4039
4040        let r = world.registry_mut();
4041        let mut batch = PipelineBuilder::<u32>::new()
4042            .then(accumulate, r)
4043            .build_batch(16);
4044
4045        batch.input_mut().extend_from_slice(&[1, 2, 3, 4, 5]);
4046        batch.run(&mut world);
4047
4048        assert_eq!(*world.resource::<u64>(), 15);
4049        assert!(batch.input().is_empty());
4050    }
4051
4052    #[test]
4053    fn batch_retains_allocation() {
4054        let mut world = WorldBuilder::new().build();
4055        let r = world.registry_mut();
4056        let mut batch = PipelineBuilder::<u32>::new()
4057            .then(|_x: u32| {}, r)
4058            .build_batch(64);
4059
4060        batch.input_mut().extend_from_slice(&[1, 2, 3]);
4061        batch.run(&mut world);
4062
4063        assert!(batch.input().is_empty());
4064        assert!(batch.input_mut().capacity() >= 64);
4065    }
4066
4067    #[test]
4068    fn batch_empty_is_noop() {
4069        let mut wb = WorldBuilder::new();
4070        wb.register::<u64>(0);
4071        let mut world = wb.build();
4072
4073        fn accumulate(mut sum: ResMut<u64>, x: u32) {
4074            *sum += x as u64;
4075        }
4076
4077        let r = world.registry_mut();
4078        let mut batch = PipelineBuilder::<u32>::new()
4079            .then(accumulate, r)
4080            .build_batch(16);
4081
4082        batch.run(&mut world);
4083        assert_eq!(*world.resource::<u64>(), 0);
4084    }
4085
4086    #[test]
4087    fn batch_catch_continues_on_error() {
4088        let mut wb = WorldBuilder::new();
4089        wb.register::<u64>(0);
4090        wb.register::<u32>(0);
4091        let mut world = wb.build();
4092
4093        fn validate(x: u32) -> Result<u32, &'static str> {
4094            if x > 0 { Ok(x) } else { Err("zero") }
4095        }
4096
4097        fn count_errors(mut errs: ResMut<u32>, _err: &'static str) {
4098            *errs += 1;
4099        }
4100
4101        fn accumulate(mut sum: ResMut<u64>, x: u32) {
4102            *sum += x as u64;
4103        }
4104
4105        let r = world.registry_mut();
4106        let mut batch = PipelineBuilder::<u32>::new()
4107            .then(validate, r)
4108            .catch(count_errors, r)
4109            .map(accumulate, r)
4110            .build_batch(16);
4111
4112        // Items: 1, 0 (error), 2, 0 (error), 3
4113        batch.input_mut().extend_from_slice(&[1, 0, 2, 0, 3]);
4114        batch.run(&mut world);
4115
4116        assert_eq!(*world.resource::<u64>(), 6); // 1 + 2 + 3
4117        assert_eq!(*world.resource::<u32>(), 2); // 2 errors
4118    }
4119
4120    #[test]
4121    fn batch_filter_skips_items() {
4122        let mut wb = WorldBuilder::new();
4123        wb.register::<u64>(0);
4124        let mut world = wb.build();
4125
4126        fn accumulate(mut sum: ResMut<u64>, x: u32) {
4127            *sum += x as u64;
4128        }
4129
4130        let r = world.registry_mut();
4131        let mut batch = PipelineBuilder::<u32>::new()
4132            .then(
4133                |x: u32| -> Option<u32> { if x > 2 { Some(x) } else { None } },
4134                r,
4135            )
4136            .map(accumulate, r)
4137            .build_batch(16);
4138
4139        batch.input_mut().extend_from_slice(&[1, 2, 3, 4, 5]);
4140        batch.run(&mut world);
4141
4142        assert_eq!(*world.resource::<u64>(), 12); // 3 + 4 + 5
4143    }
4144
4145    #[test]
4146    fn batch_multiple_runs_accumulate() {
4147        let mut wb = WorldBuilder::new();
4148        wb.register::<u64>(0);
4149        let mut world = wb.build();
4150
4151        fn accumulate(mut sum: ResMut<u64>, x: u32) {
4152            *sum += x as u64;
4153        }
4154
4155        let r = world.registry_mut();
4156        let mut batch = PipelineBuilder::<u32>::new()
4157            .then(accumulate, r)
4158            .build_batch(16);
4159
4160        batch.input_mut().extend_from_slice(&[1, 2, 3]);
4161        batch.run(&mut world);
4162        assert_eq!(*world.resource::<u64>(), 6);
4163
4164        batch.input_mut().extend_from_slice(&[4, 5]);
4165        batch.run(&mut world);
4166        assert_eq!(*world.resource::<u64>(), 15);
4167    }
4168
4169    #[test]
4170    fn batch_with_world_access() {
4171        let mut wb = WorldBuilder::new();
4172        wb.register::<u64>(10); // multiplier
4173        wb.register::<Vec<u64>>(Vec::new());
4174        let mut world = wb.build();
4175
4176        fn multiply_and_collect(factor: Res<u64>, mut out: ResMut<Vec<u64>>, x: u32) {
4177            out.push(x as u64 * *factor);
4178        }
4179
4180        let r = world.registry_mut();
4181        let mut batch = PipelineBuilder::<u32>::new()
4182            .then(multiply_and_collect, r)
4183            .build_batch(16);
4184
4185        batch.input_mut().extend_from_slice(&[1, 2, 3]);
4186        batch.run(&mut world);
4187
4188        assert_eq!(world.resource::<Vec<u64>>().as_slice(), &[10, 20, 30]);
4189    }
4190
4191    // =========================================================================
4192    // Cloned combinator
4193    // =========================================================================
4194
4195    // Named functions for proper lifetime elision (&'a u32 → &'a u32).
4196    // Closures get two independent lifetimes and fail to compile.
4197    fn ref_identity(x: &u32) -> &u32 {
4198        x
4199    }
4200    #[allow(clippy::unnecessary_wraps)]
4201    fn ref_wrap_some(x: &u32) -> Option<&u32> {
4202        Some(x)
4203    }
4204    fn ref_wrap_none(_x: &u32) -> Option<&u32> {
4205        None
4206    }
4207    #[allow(clippy::unnecessary_wraps)]
4208    fn ref_wrap_ok(x: &u32) -> Result<&u32, String> {
4209        Ok(x)
4210    }
4211    fn ref_wrap_err(_x: &u32) -> Result<&u32, String> {
4212        Err("fail".into())
4213    }
4214
4215    #[test]
4216    fn cloned_bare() {
4217        let mut world = WorldBuilder::new().build();
4218        // val before p — val must outlive the pipeline's In = &u32
4219        let val = 42u32;
4220        let r = world.registry_mut();
4221        let mut p = PipelineBuilder::<&u32>::new()
4222            .then(ref_identity, r)
4223            .cloned();
4224        assert_eq!(p.run(&mut world, &val), 42u32);
4225    }
4226
4227    #[test]
4228    fn cloned_option_some() {
4229        let mut world = WorldBuilder::new().build();
4230        let val = 42u32;
4231        let r = world.registry_mut();
4232        let mut p = PipelineBuilder::<&u32>::new()
4233            .then(ref_wrap_some, r)
4234            .cloned();
4235        assert_eq!(p.run(&mut world, &val), Some(42u32));
4236    }
4237
4238    #[test]
4239    fn cloned_option_none() {
4240        let mut world = WorldBuilder::new().build();
4241        let val = 42u32;
4242        let r = world.registry_mut();
4243        let mut p = PipelineBuilder::<&u32>::new()
4244            .then(ref_wrap_none, r)
4245            .cloned();
4246        assert_eq!(p.run(&mut world, &val), None);
4247    }
4248
4249    #[test]
4250    fn cloned_result_ok() {
4251        let mut world = WorldBuilder::new().build();
4252        let val = 42u32;
4253        let r = world.registry_mut();
4254        let mut p = PipelineBuilder::<&u32>::new().then(ref_wrap_ok, r).cloned();
4255        assert_eq!(p.run(&mut world, &val), Ok(42u32));
4256    }
4257
4258    #[test]
4259    fn cloned_result_err() {
4260        let mut world = WorldBuilder::new().build();
4261        let val = 42u32;
4262        let r = world.registry_mut();
4263        let mut p = PipelineBuilder::<&u32>::new()
4264            .then(ref_wrap_err, r)
4265            .cloned();
4266        assert_eq!(p.run(&mut world, &val), Err("fail".into()));
4267    }
4268
4269    // =========================================================================
4270    // Dispatch combinator
4271    // =========================================================================
4272
4273    #[test]
4274    fn dispatch_to_handler() {
4275        let mut wb = WorldBuilder::new();
4276        wb.register::<u64>(0);
4277        let mut world = wb.build();
4278
4279        fn store(mut out: ResMut<u64>, val: u32) {
4280            *out = val as u64;
4281        }
4282
4283        let r = world.registry_mut();
4284        let handler = PipelineBuilder::<u32>::new().then(store, r).build();
4285
4286        let mut p = PipelineBuilder::<u32>::new()
4287            .then(|x: u32| x * 2, r)
4288            .dispatch(handler)
4289            .build();
4290
4291        p.run(&mut world, 5);
4292        assert_eq!(*world.resource::<u64>(), 10);
4293    }
4294
4295    #[test]
4296    fn dispatch_to_fanout() {
4297        let mut wb = WorldBuilder::new();
4298        wb.register::<u64>(0);
4299        wb.register::<i64>(0);
4300        let mut world = wb.build();
4301
4302        fn write_u64(mut sink: ResMut<u64>, event: &u32) {
4303            *sink += *event as u64;
4304        }
4305        fn write_i64(mut sink: ResMut<i64>, event: &u32) {
4306            *sink += *event as i64;
4307        }
4308
4309        let h1 = write_u64.into_handler(world.registry());
4310        let h2 = write_i64.into_handler(world.registry());
4311        let fan = fan_out!(h1, h2);
4312
4313        let r = world.registry_mut();
4314        let mut p = PipelineBuilder::<u32>::new()
4315            .then(|x: u32| x * 2, r)
4316            .dispatch(fan)
4317            .build();
4318
4319        p.run(&mut world, 5);
4320        assert_eq!(*world.resource::<u64>(), 10);
4321        assert_eq!(*world.resource::<i64>(), 10);
4322    }
4323
4324    #[test]
4325    fn dispatch_to_broadcast() {
4326        let mut wb = WorldBuilder::new();
4327        wb.register::<u64>(0);
4328        let mut world = wb.build();
4329
4330        fn write_u64(mut sink: ResMut<u64>, event: &u32) {
4331            *sink += *event as u64;
4332        }
4333
4334        let mut broadcast = crate::Broadcast::<u32>::new();
4335        broadcast.add(write_u64.into_handler(world.registry()));
4336        broadcast.add(write_u64.into_handler(world.registry()));
4337
4338        let r = world.registry_mut();
4339        let mut p = PipelineBuilder::<u32>::new()
4340            .then(|x: u32| x + 1, r)
4341            .dispatch(broadcast)
4342            .build();
4343
4344        p.run(&mut world, 4);
4345        assert_eq!(*world.resource::<u64>(), 10); // 5 + 5
4346    }
4347
4348    #[test]
4349    fn dispatch_build_produces_handler() {
4350        let mut wb = WorldBuilder::new();
4351        wb.register::<u64>(0);
4352        let mut world = wb.build();
4353
4354        fn store(mut out: ResMut<u64>, val: u32) {
4355            *out = val as u64;
4356        }
4357
4358        let r = world.registry_mut();
4359        let inner = PipelineBuilder::<u32>::new().then(store, r).build();
4360
4361        let mut pipeline: Box<dyn Handler<u32>> = Box::new(
4362            PipelineBuilder::<u32>::new()
4363                .then(|x: u32| x + 1, r)
4364                .dispatch(inner)
4365                .build(),
4366        );
4367
4368        pipeline.run(&mut world, 9);
4369        assert_eq!(*world.resource::<u64>(), 10);
4370    }
4371
4372    // -- Guard combinator --
4373
4374    #[test]
4375    fn pipeline_guard_keeps() {
4376        fn sink(mut out: ResMut<u64>, val: Option<u64>) {
4377            *out = val.unwrap_or(0);
4378        }
4379        let mut wb = WorldBuilder::new();
4380        wb.register::<u64>(0);
4381        let mut world = wb.build();
4382        let reg = world.registry();
4383
4384        let mut p = PipelineBuilder::<u32>::new()
4385            .then(|x: u32| x as u64, reg)
4386            .guard(|v: &u64| *v > 3, reg)
4387            .then(sink, reg);
4388
4389        p.run(&mut world, 5u32);
4390        assert_eq!(*world.resource::<u64>(), 5);
4391    }
4392
4393    #[test]
4394    fn pipeline_guard_drops() {
4395        fn sink(mut out: ResMut<u64>, val: Option<u64>) {
4396            *out = val.unwrap_or(999);
4397        }
4398        let mut wb = WorldBuilder::new();
4399        wb.register::<u64>(0);
4400        let mut world = wb.build();
4401        let reg = world.registry();
4402
4403        let mut p = PipelineBuilder::<u32>::new()
4404            .then(|x: u32| x as u64, reg)
4405            .guard(|v: &u64| *v > 10, reg)
4406            .then(sink, reg);
4407
4408        p.run(&mut world, 5u32);
4409        assert_eq!(*world.resource::<u64>(), 999);
4410    }
4411
4412    // -- Tap combinator --
4413
4414    #[test]
4415    fn pipeline_tap_observes_without_changing() {
4416        fn sink(mut out: ResMut<u64>, val: u64) {
4417            *out = val;
4418        }
4419        let mut wb = WorldBuilder::new();
4420        wb.register::<u64>(0);
4421        wb.register::<bool>(false);
4422        let mut world = wb.build();
4423        let reg = world.registry();
4424
4425        let mut p = PipelineBuilder::<u32>::new()
4426            .then(|x: u32| x as u64 * 2, reg)
4427            .tap(
4428                |w: &mut World, val: &u64| {
4429                    *w.resource_mut::<bool>() = *val == 10;
4430                },
4431                reg,
4432            )
4433            .then(sink, reg);
4434
4435        p.run(&mut world, 5u32);
4436        assert_eq!(*world.resource::<u64>(), 10); // value passed through
4437        assert!(*world.resource::<bool>()); // tap fired
4438    }
4439
4440    // -- Route combinator --
4441
4442    #[test]
4443    fn pipeline_route_true_arm() {
4444        fn sink(mut out: ResMut<u64>, val: u64) {
4445            *out = val;
4446        }
4447        let mut wb = WorldBuilder::new();
4448        wb.register::<u64>(0);
4449        let mut world = wb.build();
4450        let reg = world.registry();
4451
4452        let arm_t = PipelineBuilder::new().then(|x: u64| x * 2, reg);
4453        let arm_f = PipelineBuilder::new().then(|x: u64| x * 3, reg);
4454
4455        let mut p = PipelineBuilder::<u32>::new()
4456            .then(|x: u32| x as u64, reg)
4457            .route(|v: &u64| *v > 3, reg, arm_t, arm_f)
4458            .then(sink, reg);
4459
4460        p.run(&mut world, 5u32); // 5 > 3 → true arm → double → 10
4461        assert_eq!(*world.resource::<u64>(), 10);
4462    }
4463
4464    #[test]
4465    fn pipeline_route_false_arm() {
4466        fn sink(mut out: ResMut<u64>, val: u64) {
4467            *out = val;
4468        }
4469        let mut wb = WorldBuilder::new();
4470        wb.register::<u64>(0);
4471        let mut world = wb.build();
4472        let reg = world.registry();
4473
4474        let arm_t = PipelineBuilder::new().then(|x: u64| x * 2, reg);
4475        let arm_f = PipelineBuilder::new().then(|x: u64| x * 3, reg);
4476
4477        let mut p = PipelineBuilder::<u32>::new()
4478            .then(|x: u32| x as u64, reg)
4479            .route(|v: &u64| *v > 10, reg, arm_t, arm_f)
4480            .then(sink, reg);
4481
4482        p.run(&mut world, 5u32); // 5 <= 10 → false arm → triple → 15
4483        assert_eq!(*world.resource::<u64>(), 15);
4484    }
4485
4486    #[test]
4487    fn pipeline_route_nested() {
4488        fn sink(mut out: ResMut<u64>, val: u64) {
4489            *out = val;
4490        }
4491        let mut wb = WorldBuilder::new();
4492        wb.register::<u64>(0);
4493        let mut world = wb.build();
4494        let reg = world.registry();
4495
4496        // N-ary via nesting: <5 → +100, 5..10 → +200, >=10 → +300
4497        let inner_t = PipelineBuilder::new().then(|x: u64| x + 200, reg);
4498        let inner_f = PipelineBuilder::new().then(|x: u64| x + 300, reg);
4499        let outer_t = PipelineBuilder::new().then(|x: u64| x + 100, reg);
4500        let outer_f = PipelineBuilder::new().then(|x: u64| x, reg).route(
4501            |v: &u64| *v < 10,
4502            reg,
4503            inner_t,
4504            inner_f,
4505        );
4506
4507        let mut p = PipelineBuilder::<u32>::new()
4508            .then(|x: u32| x as u64, reg)
4509            .route(|v: &u64| *v < 5, reg, outer_t, outer_f)
4510            .then(sink, reg);
4511
4512        p.run(&mut world, 3u32); // 3 < 5 → +100 → 103
4513        assert_eq!(*world.resource::<u64>(), 103);
4514
4515        p.run(&mut world, 7u32); // 7 >= 5, 7 < 10 → +200 → 207
4516        assert_eq!(*world.resource::<u64>(), 207);
4517
4518        p.run(&mut world, 15u32); // 15 >= 5, 15 >= 10 → +300 → 315
4519        assert_eq!(*world.resource::<u64>(), 315);
4520    }
4521
4522    // -- Tee combinator --
4523
4524    #[test]
4525    fn pipeline_tee_side_effect_chain() {
4526        use crate::dag::DagArmSeed;
4527
4528        fn log_step(mut counter: ResMut<u32>, _val: &u64) {
4529            *counter += 1;
4530        }
4531        fn sink(mut out: ResMut<u64>, val: u64) {
4532            *out = val;
4533        }
4534        let mut wb = WorldBuilder::new();
4535        wb.register::<u64>(0);
4536        wb.register::<u32>(0);
4537        let mut world = wb.build();
4538        let reg = world.registry();
4539
4540        let side = DagArmSeed::new().then(log_step, reg);
4541
4542        let mut p = PipelineBuilder::<u32>::new()
4543            .then(|x: u32| x as u64 * 2, reg)
4544            .tee(side)
4545            .then(sink, reg);
4546
4547        p.run(&mut world, 5u32);
4548        assert_eq!(*world.resource::<u64>(), 10); // value passed through
4549        assert_eq!(*world.resource::<u32>(), 1); // side-effect fired
4550
4551        p.run(&mut world, 7u32);
4552        assert_eq!(*world.resource::<u64>(), 14);
4553        assert_eq!(*world.resource::<u32>(), 2);
4554    }
4555
4556    // -- Dedup combinator --
4557
4558    #[test]
4559    fn pipeline_dedup_suppresses_unchanged() {
4560        fn sink(mut out: ResMut<u32>, val: Option<u64>) {
4561            if val.is_some() {
4562                *out += 1;
4563            }
4564        }
4565        let mut wb = WorldBuilder::new();
4566        wb.register::<u32>(0);
4567        let mut world = wb.build();
4568        let reg = world.registry();
4569
4570        let mut p = PipelineBuilder::<u32>::new()
4571            .then(|x: u32| x as u64 / 2, reg)
4572            .dedup()
4573            .then(sink, reg);
4574
4575        p.run(&mut world, 4u32); // 2 — first, Some
4576        assert_eq!(*world.resource::<u32>(), 1);
4577
4578        p.run(&mut world, 5u32); // 2 — same, None
4579        assert_eq!(*world.resource::<u32>(), 1);
4580
4581        p.run(&mut world, 6u32); // 3 — changed, Some
4582        assert_eq!(*world.resource::<u32>(), 2);
4583    }
4584
4585    // -- Bool combinators --
4586
4587    #[test]
4588    fn pipeline_not() {
4589        fn sink(mut out: ResMut<bool>, val: bool) {
4590            *out = val;
4591        }
4592        let mut wb = WorldBuilder::new();
4593        wb.register::<bool>(false);
4594        let mut world = wb.build();
4595        let reg = world.registry();
4596
4597        let mut p = PipelineBuilder::<u32>::new()
4598            .then(|x: u32| x > 5, reg)
4599            .not()
4600            .then(sink, reg);
4601
4602        p.run(&mut world, 3u32); // 3 > 5 = false, not = true
4603        assert!(*world.resource::<bool>());
4604
4605        p.run(&mut world, 10u32); // 10 > 5 = true, not = false
4606        assert!(!*world.resource::<bool>());
4607    }
4608
4609    #[test]
4610    fn pipeline_and() {
4611        fn sink(mut out: ResMut<bool>, val: bool) {
4612            *out = val;
4613        }
4614        let mut wb = WorldBuilder::new();
4615        wb.register::<bool>(true);
4616        let mut world = wb.build();
4617        let reg = world.registry();
4618
4619        let mut p = PipelineBuilder::<u32>::new()
4620            .then(|x: u32| x > 5, reg)
4621            .and(|w: &mut World| *w.resource::<bool>(), reg)
4622            .then(sink, reg);
4623
4624        p.run(&mut world, 10u32); // true && true = true
4625        assert!(*world.resource::<bool>());
4626
4627        *world.resource_mut::<bool>() = false;
4628        p.run(&mut world, 10u32); // true && false = false
4629        assert!(!*world.resource::<bool>());
4630    }
4631
4632    #[test]
4633    fn pipeline_or() {
4634        fn sink(mut out: ResMut<bool>, val: bool) {
4635            *out = val;
4636        }
4637        let mut wb = WorldBuilder::new();
4638        wb.register::<bool>(false);
4639        let mut world = wb.build();
4640        let reg = world.registry();
4641
4642        let mut p = PipelineBuilder::<u32>::new()
4643            .then(|x: u32| x > 5, reg)
4644            .or(|w: &mut World| *w.resource::<bool>(), reg)
4645            .then(sink, reg);
4646
4647        p.run(&mut world, 3u32); // false || false = false
4648        assert!(!*world.resource::<bool>());
4649
4650        *world.resource_mut::<bool>() = true;
4651        p.run(&mut world, 3u32); // false || true = true
4652        assert!(*world.resource::<bool>());
4653    }
4654
4655    #[test]
4656    fn pipeline_xor() {
4657        fn sink(mut out: ResMut<bool>, val: bool) {
4658            *out = val;
4659        }
4660        let mut wb = WorldBuilder::new();
4661        wb.register::<bool>(true);
4662        let mut world = wb.build();
4663        let reg = world.registry();
4664
4665        let mut p = PipelineBuilder::<u32>::new()
4666            .then(|x: u32| x > 5, reg)
4667            .xor(|w: &mut World| *w.resource::<bool>(), reg)
4668            .then(sink, reg);
4669
4670        p.run(&mut world, 10u32); // true ^ true = false
4671        assert!(!*world.resource::<bool>());
4672    }
4673
4674    // =========================================================================
4675    // Splat — tuple destructuring
4676    // =========================================================================
4677
4678    #[test]
4679    fn splat2_closure_on_start() {
4680        let mut world = WorldBuilder::new().build();
4681        let r = world.registry_mut();
4682        let mut p = PipelineBuilder::<(u32, u64)>::new()
4683            .splat()
4684            .then(|a: u32, b: u64| a as u64 + b, r);
4685        assert_eq!(p.run(&mut world, (3, 7)), 10);
4686    }
4687
4688    #[test]
4689    fn splat2_named_fn_with_param() {
4690        let mut wb = WorldBuilder::new();
4691        wb.register::<u64>(100);
4692        let mut world = wb.build();
4693
4694        fn process(base: Res<u64>, a: u32, b: u32) -> u64 {
4695            *base + a as u64 + b as u64
4696        }
4697
4698        let r = world.registry_mut();
4699        let mut p = PipelineBuilder::<(u32, u32)>::new()
4700            .splat()
4701            .then(process, r);
4702        assert_eq!(p.run(&mut world, (3, 7)), 110);
4703    }
4704
4705    #[test]
4706    fn splat2_mid_chain() {
4707        let mut world = WorldBuilder::new().build();
4708        let r = world.registry_mut();
4709        let mut p = PipelineBuilder::<u32>::new()
4710            .then(|x: u32| (x, x * 2), r)
4711            .splat()
4712            .then(|a: u32, b: u32| a as u64 + b as u64, r);
4713        assert_eq!(p.run(&mut world, 5), 15); // 5 + 10
4714    }
4715
4716    #[test]
4717    fn splat3_closure_on_start() {
4718        let mut world = WorldBuilder::new().build();
4719        let r = world.registry_mut();
4720        let mut p = PipelineBuilder::<(u32, u32, u32)>::new()
4721            .splat()
4722            .then(|a: u32, b: u32, c: u32| a + b + c, r);
4723        assert_eq!(p.run(&mut world, (1, 2, 3)), 6);
4724    }
4725
4726    #[test]
4727    fn splat3_named_fn_with_param() {
4728        let mut wb = WorldBuilder::new();
4729        wb.register::<u64>(10);
4730        let mut world = wb.build();
4731
4732        fn process(factor: Res<u64>, a: u32, b: u32, c: u32) -> u64 {
4733            *factor * (a + b + c) as u64
4734        }
4735
4736        let r = world.registry_mut();
4737        let mut p = PipelineBuilder::<(u32, u32, u32)>::new()
4738            .splat()
4739            .then(process, r);
4740        assert_eq!(p.run(&mut world, (1, 2, 3)), 60);
4741    }
4742
4743    #[test]
4744    fn splat4_mid_chain() {
4745        let mut world = WorldBuilder::new().build();
4746        let r = world.registry_mut();
4747        let mut p = PipelineBuilder::<u32>::new()
4748            .then(|x: u32| (x, x + 1, x + 2, x + 3), r)
4749            .splat()
4750            .then(|a: u32, b: u32, c: u32, d: u32| (a + b + c + d) as u64, r);
4751        assert_eq!(p.run(&mut world, 10), 46); // 10+11+12+13
4752    }
4753
4754    #[test]
4755    fn splat5_closure_on_start() {
4756        let mut world = WorldBuilder::new().build();
4757        let r = world.registry_mut();
4758        let mut p = PipelineBuilder::<(u8, u8, u8, u8, u8)>::new().splat().then(
4759            |a: u8, b: u8, c: u8, d: u8, e: u8| {
4760                (a as u64) + (b as u64) + (c as u64) + (d as u64) + (e as u64)
4761            },
4762            r,
4763        );
4764        assert_eq!(p.run(&mut world, (1, 2, 3, 4, 5)), 15);
4765    }
4766
4767    #[test]
4768    fn splat_build_into_handler() {
4769        let mut wb = WorldBuilder::new();
4770        wb.register::<u64>(0);
4771        let mut world = wb.build();
4772
4773        fn store(mut out: ResMut<u64>, a: u32, b: u32) {
4774            *out = a as u64 + b as u64;
4775        }
4776
4777        let r = world.registry_mut();
4778        let mut pipeline = PipelineBuilder::<(u32, u32)>::new()
4779            .splat()
4780            .then(store, r)
4781            .build();
4782
4783        pipeline.run(&mut world, (3, 7));
4784        assert_eq!(*world.resource::<u64>(), 10);
4785    }
4786
4787    #[test]
4788    fn splat_build_batch() {
4789        let mut wb = WorldBuilder::new();
4790        wb.register::<u64>(0);
4791        let mut world = wb.build();
4792
4793        fn accumulate(mut sum: ResMut<u64>, a: u32, b: u32) {
4794            *sum += a as u64 + b as u64;
4795        }
4796
4797        let r = world.registry_mut();
4798        let mut batch = PipelineBuilder::<(u32, u32)>::new()
4799            .splat()
4800            .then(accumulate, r)
4801            .build_batch(8);
4802
4803        batch
4804            .input_mut()
4805            .extend_from_slice(&[(1, 2), (3, 4), (5, 6)]);
4806        batch.run(&mut world);
4807        assert_eq!(*world.resource::<u64>(), 21); // 3+7+11
4808    }
4809
4810    #[test]
4811    #[should_panic(expected = "conflicting access")]
4812    fn splat_access_conflict_detected() {
4813        let mut wb = WorldBuilder::new();
4814        wb.register::<u64>(0);
4815        let mut world = wb.build();
4816
4817        fn bad(a: ResMut<u64>, _b: ResMut<u64>, _x: u32, _y: u32) {
4818            let _ = a;
4819        }
4820
4821        let r = world.registry_mut();
4822        // Should panic on duplicate ResMut<u64>
4823        let _ = PipelineBuilder::<(u32, u32)>::new().splat().then(bad, r);
4824    }
4825
4826    // -- Then (previously switch) --
4827
4828    #[test]
4829    fn pipeline_then_branching() {
4830        fn double(x: u32) -> u64 {
4831            x as u64 * 2
4832        }
4833        fn sink(mut out: ResMut<u64>, val: u64) {
4834            *out = val;
4835        }
4836
4837        let mut wb = WorldBuilder::new();
4838        wb.register::<u64>(0);
4839        let mut world = wb.build();
4840        let reg = world.registry();
4841
4842        let mut pipeline = PipelineBuilder::<u32>::new()
4843            .then(double, reg)
4844            .then(|val: u64| if val > 10 { val * 100 } else { val + 1 }, reg)
4845            .then(sink, reg)
4846            .build();
4847
4848        pipeline.run(&mut world, 10u32); // 20 > 10 → 2000
4849        assert_eq!(*world.resource::<u64>(), 2000);
4850
4851        pipeline.run(&mut world, 3u32); // 6 <= 10 → 7
4852        assert_eq!(*world.resource::<u64>(), 7);
4853    }
4854
4855    #[test]
4856    fn pipeline_then_3_way() {
4857        fn sink(mut out: ResMut<u64>, val: u64) {
4858            *out = val;
4859        }
4860
4861        let mut wb = WorldBuilder::new();
4862        wb.register::<u64>(0);
4863        let mut world = wb.build();
4864        let reg = world.registry();
4865
4866        let mut pipeline = PipelineBuilder::<u32>::new()
4867            .then(
4868                |val: u32| match val % 3 {
4869                    0 => val as u64 + 100,
4870                    1 => val as u64 + 200,
4871                    _ => val as u64 + 300,
4872                },
4873                reg,
4874            )
4875            .then(sink, reg)
4876            .build();
4877
4878        pipeline.run(&mut world, 6u32); // 6 % 3 == 0 → 106
4879        assert_eq!(*world.resource::<u64>(), 106);
4880
4881        pipeline.run(&mut world, 7u32); // 7 % 3 == 1 → 207
4882        assert_eq!(*world.resource::<u64>(), 207);
4883
4884        pipeline.run(&mut world, 8u32); // 8 % 3 == 2 → 308
4885        assert_eq!(*world.resource::<u64>(), 308);
4886    }
4887
4888    #[test]
4889    fn pipeline_then_with_resolve_step() {
4890        fn add_offset(offset: Res<i64>, val: u32) -> u64 {
4891            (*offset + val as i64) as u64
4892        }
4893        fn plain_double(val: u32) -> u64 {
4894            val as u64 * 2
4895        }
4896        fn sink(mut out: ResMut<u64>, val: u64) {
4897            *out = val;
4898        }
4899
4900        let mut wb = WorldBuilder::new();
4901        wb.register::<u64>(0);
4902        wb.register::<i64>(100);
4903        let mut world = wb.build();
4904        let reg = world.registry();
4905
4906        let mut arm_offset = resolve_step(add_offset, reg);
4907        let mut arm_double = resolve_step(plain_double, reg);
4908
4909        let mut pipeline = PipelineBuilder::<u32>::new()
4910            .then(
4911                move |world: &mut World, val: u32| {
4912                    if val > 10 {
4913                        arm_offset(world, val)
4914                    } else {
4915                        arm_double(world, val)
4916                    }
4917                },
4918                reg,
4919            )
4920            .then(sink, reg)
4921            .build();
4922
4923        pipeline.run(&mut world, 20u32); // > 10 → offset → 100 + 20 = 120
4924        assert_eq!(*world.resource::<u64>(), 120);
4925
4926        pipeline.run(&mut world, 5u32); // <= 10 → double → 10
4927        assert_eq!(*world.resource::<u64>(), 10);
4928    }
4929
4930    #[test]
4931    fn batch_pipeline_then_branching() {
4932        fn sink(mut out: ResMut<u64>, val: u64) {
4933            *out += val;
4934        }
4935
4936        let mut wb = WorldBuilder::new();
4937        wb.register::<u64>(0);
4938        let mut world = wb.build();
4939        let reg = world.registry();
4940
4941        let mut batch = PipelineBuilder::<u32>::new()
4942            .then(
4943                |val: u32| {
4944                    if val % 2 == 0 {
4945                        val as u64 * 10
4946                    } else {
4947                        val as u64
4948                    }
4949                },
4950                reg,
4951            )
4952            .then(sink, reg)
4953            .build_batch(8);
4954
4955        batch.input_mut().extend([1, 2, 3, 4]);
4956        batch.run(&mut world);
4957
4958        // 1 → 1, 2 → 20, 3 → 3, 4 → 40 = 64
4959        assert_eq!(*world.resource::<u64>(), 64);
4960    }
4961
4962    // -- IntoRefStep with Param: named functions --
4963
4964    #[test]
4965    fn guard_named_fn_with_param() {
4966        fn above_threshold(threshold: Res<u64>, val: &u64) -> bool {
4967            *val > *threshold
4968        }
4969        fn sink(mut out: ResMut<i64>, val: Option<u64>) {
4970            *out = val.map_or(-1, |v| v as i64);
4971        }
4972        let mut wb = WorldBuilder::new();
4973        wb.register::<u64>(5); // threshold
4974        wb.register::<i64>(0);
4975        let mut world = wb.build();
4976        let reg = world.registry();
4977
4978        let mut p = PipelineBuilder::<u32>::new()
4979            .then(|x: u32| x as u64, reg)
4980            .guard(above_threshold, reg)
4981            .then(sink, reg);
4982
4983        p.run(&mut world, 10u32); // 10 > 5 → Some(10)
4984        assert_eq!(*world.resource::<i64>(), 10);
4985
4986        p.run(&mut world, 3u32); // 3 <= 5 → None → -1
4987        assert_eq!(*world.resource::<i64>(), -1);
4988    }
4989
4990    #[test]
4991    fn filter_named_fn_with_param() {
4992        fn is_allowed(allowed: Res<u64>, val: &u64) -> bool {
4993            *val != *allowed
4994        }
4995        fn count(mut ctr: ResMut<i64>, _val: u64) {
4996            *ctr += 1;
4997        }
4998        let mut wb = WorldBuilder::new();
4999        wb.register::<u64>(42); // blocked value
5000        wb.register::<i64>(0);
5001        let mut world = wb.build();
5002        let reg = world.registry();
5003
5004        let mut p = PipelineBuilder::<u32>::new()
5005            .then(|x: u32| -> Option<u64> { Some(x as u64) }, reg)
5006            .filter(is_allowed, reg)
5007            .map(count, reg)
5008            .unwrap_or(());
5009
5010        for v in [1u32, 42, 5, 42, 10] {
5011            p.run(&mut world, v);
5012        }
5013        assert_eq!(*world.resource::<i64>(), 3); // 42 filtered out twice
5014    }
5015
5016    #[test]
5017    fn inspect_named_fn_with_param() {
5018        fn log_value(mut log: ResMut<Vec<u64>>, val: &u64) {
5019            log.push(*val);
5020        }
5021        let mut wb = WorldBuilder::new();
5022        wb.register::<Vec<u64>>(Vec::new());
5023        let mut world = wb.build();
5024        let reg = world.registry();
5025
5026        let mut p = PipelineBuilder::<u32>::new()
5027            .then(|x: u32| -> Option<u64> { Some(x as u64) }, reg)
5028            .inspect(log_value, reg)
5029            .unwrap_or(0);
5030
5031        for v in [1u32, 2, 3] {
5032            p.run(&mut world, v);
5033        }
5034        assert_eq!(world.resource::<Vec<u64>>().as_slice(), &[1, 2, 3]);
5035    }
5036
5037    #[test]
5038    fn tap_named_fn_with_param() {
5039        fn observe(mut log: ResMut<Vec<u64>>, val: &u64) {
5040            log.push(*val);
5041        }
5042        fn sink(mut out: ResMut<u64>, val: u64) {
5043            *out = val;
5044        }
5045        let mut wb = WorldBuilder::new();
5046        wb.register::<u64>(0);
5047        wb.register::<Vec<u64>>(Vec::new());
5048        let mut world = wb.build();
5049        let reg = world.registry();
5050
5051        let mut p = PipelineBuilder::<u32>::new()
5052            .then(|x: u32| x as u64, reg)
5053            .tap(observe, reg)
5054            .then(sink, reg);
5055
5056        p.run(&mut world, 7u32);
5057        assert_eq!(*world.resource::<u64>(), 7);
5058        assert_eq!(world.resource::<Vec<u64>>().as_slice(), &[7]);
5059    }
5060
5061    // -- IntoProducer with Param: named functions --
5062
5063    #[test]
5064    fn and_named_fn_with_param() {
5065        fn check_enabled(flag: Res<bool>) -> bool {
5066            *flag
5067        }
5068        let mut wb = WorldBuilder::new();
5069        wb.register::<bool>(true);
5070        let mut world = wb.build();
5071        let reg = world.registry();
5072
5073        let mut p = PipelineBuilder::<u32>::new()
5074            .then(|_x: u32| true, reg)
5075            .and(check_enabled, reg);
5076
5077        assert!(p.run(&mut world, 0u32));
5078
5079        *world.resource_mut::<bool>() = false;
5080        assert!(!p.run(&mut world, 0u32)); // short-circuit: true AND false
5081    }
5082
5083    #[test]
5084    fn or_named_fn_with_param() {
5085        fn check_enabled(flag: Res<bool>) -> bool {
5086            *flag
5087        }
5088        let mut wb = WorldBuilder::new();
5089        wb.register::<bool>(true);
5090        let mut world = wb.build();
5091        let reg = world.registry();
5092
5093        let mut p = PipelineBuilder::<u32>::new()
5094            .then(|_x: u32| false, reg)
5095            .or(check_enabled, reg);
5096
5097        assert!(p.run(&mut world, 0u32)); // false OR true
5098
5099        *world.resource_mut::<bool>() = false;
5100        assert!(!p.run(&mut world, 0u32)); // false OR false
5101    }
5102
5103    #[test]
5104    fn on_none_named_fn_with_param() {
5105        fn log_miss(mut ctr: ResMut<u64>) {
5106            *ctr += 1;
5107        }
5108        let mut wb = WorldBuilder::new();
5109        wb.register::<u64>(0);
5110        let mut world = wb.build();
5111        let reg = world.registry();
5112
5113        let mut p = PipelineBuilder::<u32>::new()
5114            .then(
5115                |x: u32| -> Option<u32> { if x > 5 { Some(x) } else { None } },
5116                reg,
5117            )
5118            .on_none(log_miss, reg)
5119            .unwrap_or(0);
5120
5121        for v in [1u32, 10, 3, 20] {
5122            p.run(&mut world, v);
5123        }
5124        assert_eq!(*world.resource::<u64>(), 2); // 1 and 3 are None
5125    }
5126
5127    #[test]
5128    fn ok_or_else_named_fn_with_param() {
5129        fn make_error(msg: Res<String>) -> String {
5130            msg.clone()
5131        }
5132        let mut wb = WorldBuilder::new();
5133        wb.register::<String>("not found".into());
5134        let mut world = wb.build();
5135        let reg = world.registry();
5136
5137        let mut p = PipelineBuilder::<u32>::new()
5138            .then(
5139                |x: u32| -> Option<u32> { if x > 0 { Some(x) } else { None } },
5140                reg,
5141            )
5142            .ok_or_else(make_error, reg);
5143
5144        let r: Result<u32, String> = p.run(&mut world, 5u32);
5145        assert_eq!(r, Ok(5));
5146
5147        let r: Result<u32, String> = p.run(&mut world, 0u32);
5148        assert_eq!(r, Err("not found".into()));
5149    }
5150
5151    #[test]
5152    fn unwrap_or_else_option_named_fn_with_param() {
5153        fn fallback(default: Res<u64>) -> u64 {
5154            *default
5155        }
5156        let mut wb = WorldBuilder::new();
5157        wb.register::<u64>(42);
5158        let mut world = wb.build();
5159        let reg = world.registry();
5160
5161        let mut p = PipelineBuilder::<u32>::new()
5162            .then(
5163                |x: u32| -> Option<u64> { if x > 0 { Some(x as u64) } else { None } },
5164                reg,
5165            )
5166            .unwrap_or_else(fallback, reg);
5167
5168        assert_eq!(p.run(&mut world, 5u32), 5);
5169        assert_eq!(p.run(&mut world, 0u32), 42);
5170    }
5171
5172    // -- IntoStep with Opaque: &mut World closures --
5173
5174    #[test]
5175    fn map_err_named_fn_with_param() {
5176        fn tag_error(prefix: Res<String>, err: String) -> String {
5177            format!("{}: {err}", &*prefix)
5178        }
5179        fn sink(mut out: ResMut<String>, val: Result<u32, String>) {
5180            match val {
5181                Ok(v) => *out = format!("ok:{v}"),
5182                Err(e) => *out = e,
5183            }
5184        }
5185        let mut wb = WorldBuilder::new();
5186        wb.register::<String>("ERR".into());
5187        let mut world = wb.build();
5188        let reg = world.registry();
5189
5190        let mut p = PipelineBuilder::<u32>::new()
5191            .then(
5192                |x: u32| -> Result<u32, String> { if x > 0 { Ok(x) } else { Err("zero".into()) } },
5193                reg,
5194            )
5195            .map_err(tag_error, reg)
5196            .then(sink, reg);
5197
5198        p.run(&mut world, 0u32);
5199        assert_eq!(world.resource::<String>().as_str(), "ERR: zero");
5200
5201        p.run(&mut world, 5u32);
5202        assert_eq!(world.resource::<String>().as_str(), "ok:5");
5203    }
5204
5205    // =========================================================================
5206    // Scan combinator
5207    // =========================================================================
5208
5209    #[test]
5210    fn scan_arity0_closure_running_sum() {
5211        let mut world = WorldBuilder::new().build();
5212        let reg = world.registry();
5213
5214        let mut p = PipelineBuilder::<u64>::new().then(|x: u64| x, reg).scan(
5215            0u64,
5216            |acc: &mut u64, val: u64| {
5217                *acc += val;
5218                Some(*acc)
5219            },
5220            reg,
5221        );
5222
5223        assert_eq!(p.run(&mut world, 10), Some(10));
5224        assert_eq!(p.run(&mut world, 20), Some(30));
5225        assert_eq!(p.run(&mut world, 5), Some(35));
5226    }
5227
5228    #[test]
5229    fn scan_named_fn_with_param() {
5230        let mut wb = WorldBuilder::new();
5231        wb.register::<u64>(100);
5232        let mut world = wb.build();
5233        let reg = world.registry();
5234
5235        fn threshold_scan(limit: Res<u64>, acc: &mut u64, val: u64) -> Option<u64> {
5236            *acc += val;
5237            if *acc > *limit { Some(*acc) } else { None }
5238        }
5239
5240        let mut p =
5241            PipelineBuilder::<u64>::new()
5242                .then(|x: u64| x, reg)
5243                .scan(0u64, threshold_scan, reg);
5244
5245        assert_eq!(p.run(&mut world, 50), None);
5246        assert_eq!(p.run(&mut world, 30), None);
5247        assert_eq!(p.run(&mut world, 25), Some(105));
5248    }
5249
5250    #[test]
5251    fn scan_opaque_closure() {
5252        let mut wb = WorldBuilder::new();
5253        wb.register::<u64>(10);
5254        let mut world = wb.build();
5255        let reg = world.registry();
5256
5257        let mut p = PipelineBuilder::<u64>::new().then(|x: u64| x, reg).scan(
5258            0u64,
5259            |world: &mut World, acc: &mut u64, val: u64| {
5260                let factor = *world.resource::<u64>();
5261                *acc += val * factor;
5262                Some(*acc)
5263            },
5264            reg,
5265        );
5266
5267        assert_eq!(p.run(&mut world, 1), Some(10));
5268        assert_eq!(p.run(&mut world, 2), Some(30));
5269    }
5270
5271    #[test]
5272    fn scan_suppression_returns_none() {
5273        let mut world = WorldBuilder::new().build();
5274        let reg = world.registry();
5275
5276        let mut p = PipelineBuilder::<u64>::new().then(|x: u64| x, reg).scan(
5277            0u64,
5278            |acc: &mut u64, val: u64| -> Option<u64> {
5279                *acc += val;
5280                if *acc > 50 { Some(*acc) } else { None }
5281            },
5282            reg,
5283        );
5284
5285        assert_eq!(p.run(&mut world, 20), None);
5286        assert_eq!(p.run(&mut world, 20), None);
5287        assert_eq!(p.run(&mut world, 20), Some(60));
5288    }
5289
5290    #[test]
5291    fn scan_on_pipeline_start() {
5292        let mut world = WorldBuilder::new().build();
5293        let reg = world.registry();
5294
5295        let mut p = PipelineBuilder::<u64>::new().scan(
5296            0u64,
5297            |acc: &mut u64, val: u64| {
5298                *acc += val;
5299                *acc
5300            },
5301            reg,
5302        );
5303
5304        assert_eq!(p.run(&mut world, 5), 5);
5305        assert_eq!(p.run(&mut world, 3), 8);
5306        assert_eq!(p.run(&mut world, 2), 10);
5307    }
5308
5309    #[test]
5310    fn scan_persistence_across_batch() {
5311        let mut wb = WorldBuilder::new();
5312        wb.register::<u64>(0);
5313        let mut world = wb.build();
5314        let reg = world.registry();
5315
5316        fn store(mut out: ResMut<u64>, val: u64) {
5317            *out = val;
5318        }
5319
5320        let mut p = PipelineBuilder::<u64>::new()
5321            .then(|x: u64| x, reg)
5322            .scan(
5323                0u64,
5324                |acc: &mut u64, val: u64| {
5325                    *acc += val;
5326                    *acc
5327                },
5328                reg,
5329            )
5330            .then(store, reg)
5331            .build_batch(4);
5332
5333        p.input_mut().extend([1, 2, 3]);
5334        p.run(&mut world);
5335
5336        // Accumulator persists: 1, 3, 6
5337        assert_eq!(*world.resource::<u64>(), 6);
5338
5339        p.input_mut().push(4);
5340        p.run(&mut world);
5341        // acc = 6 + 4 = 10
5342        assert_eq!(*world.resource::<u64>(), 10);
5343    }
5344
5345    // =========================================================================
5346    // Build — Option<()> terminal
5347    // =========================================================================
5348
5349    #[test]
5350    fn build_option_unit_terminal() {
5351        let mut wb = WorldBuilder::new();
5352        wb.register::<u64>(0);
5353        let mut world = wb.build();
5354        let r = world.registry_mut();
5355
5356        fn check(x: u32) -> Option<u32> {
5357            if x > 5 { Some(x) } else { None }
5358        }
5359        fn store(mut out: ResMut<u64>, val: u32) {
5360            *out += val as u64;
5361        }
5362
5363        // .map(store) on Option<u32> produces Option<()> — build() must work
5364        let mut p = PipelineBuilder::<u32>::new()
5365            .then(check, r)
5366            .map(store, r)
5367            .build();
5368
5369        p.run(&mut world, 3); // None, skipped
5370        assert_eq!(*world.resource::<u64>(), 0);
5371        p.run(&mut world, 7); // Some, stores
5372        assert_eq!(*world.resource::<u64>(), 7);
5373        p.run(&mut world, 10);
5374        assert_eq!(*world.resource::<u64>(), 17);
5375    }
5376
5377    #[test]
5378    fn build_option_unit_boxes_into_handler() {
5379        let mut wb = WorldBuilder::new();
5380        wb.register::<u64>(0);
5381        let mut world = wb.build();
5382        let r = world.registry_mut();
5383
5384        fn double(x: u32) -> Option<u64> {
5385            if x > 0 { Some(x as u64 * 2) } else { None }
5386        }
5387        fn store(mut out: ResMut<u64>, val: u64) {
5388            *out += val;
5389        }
5390
5391        let mut h: Box<dyn Handler<u32>> = Box::new(
5392            PipelineBuilder::<u32>::new()
5393                .then(double, r)
5394                .map(store, r)
5395                .build(),
5396        );
5397        h.run(&mut world, 0); // None
5398        assert_eq!(*world.resource::<u64>(), 0);
5399        h.run(&mut world, 5); // 10
5400        assert_eq!(*world.resource::<u64>(), 10);
5401    }
5402
5403    // =========================================================================
5404    // Build — borrowed event type
5405    // =========================================================================
5406
5407    #[test]
5408    fn build_borrowed_event_direct() {
5409        let mut wb = WorldBuilder::new();
5410        wb.register::<u64>(0);
5411        let mut world = wb.build();
5412
5413        fn decode(msg: &[u8]) -> u64 {
5414            msg.len() as u64
5415        }
5416        fn store(mut out: ResMut<u64>, val: u64) {
5417            *out = val;
5418        }
5419
5420        // msg declared before p so it outlives the pipeline (drop order).
5421        // Matches real-world usage: pipeline lives long, events come and go.
5422        let msg = vec![1u8, 2, 3];
5423        let r = world.registry_mut();
5424        let mut p = PipelineBuilder::<&[u8]>::new()
5425            .then(decode, r)
5426            .then(store, r)
5427            .build();
5428
5429        p.run(&mut world, &msg);
5430        assert_eq!(*world.resource::<u64>(), 3);
5431    }
5432
5433    #[test]
5434    fn build_borrowed_event_option_unit() {
5435        let mut wb = WorldBuilder::new();
5436        wb.register::<u64>(0);
5437        let mut world = wb.build();
5438
5439        fn decode(msg: &[u8]) -> Option<u64> {
5440            if msg.is_empty() {
5441                None
5442            } else {
5443                Some(msg.len() as u64)
5444            }
5445        }
5446        fn store(mut out: ResMut<u64>, val: u64) {
5447            *out = val;
5448        }
5449
5450        let empty = vec![];
5451        let data = vec![1u8, 2, 3];
5452        let r = world.registry_mut();
5453        let mut p = PipelineBuilder::<&[u8]>::new()
5454            .then(decode, r)
5455            .map(store, r)
5456            .build();
5457
5458        p.run(&mut world, &empty); // None
5459        assert_eq!(*world.resource::<u64>(), 0);
5460        p.run(&mut world, &data); // Some(3)
5461        assert_eq!(*world.resource::<u64>(), 3);
5462    }
5463}