Skip to main content

nexus_rt/
pipeline.rs

1// Builder return types are necessarily complex — each combinator returns
2// PipelineBuilder<In, Out, impl FnMut(...)>. Same pattern as iterator adapters.
3#![allow(clippy::type_complexity)]
4
5//! Pre-resolved pipeline dispatch using [`Param`] steps.
6//!
7//! [`PipelineStart`] begins a typed composition chain where each step
8//! is a named function with [`Param`] dependencies resolved at build
9//! time. The result is a monomorphized closure chain where dispatch-time
10//! resource access is ~3 cycles per fetch (pre-resolved [`ResourceId`](crate::ResourceId)),
11//! not a HashMap lookup.
12//!
13//! Two dispatch tiers in nexus-rt:
14//! 1. **Pipeline** — static after build, pre-resolved, the workhorse
15//! 2. **Callback** — dynamic registration with per-instance context
16//!
17//! # Step function convention
18//!
19//! Params first, step input last, returns output:
20//!
21//! ```ignore
22//! fn validate(config: Res<Config>, order: Order) -> Option<ValidOrder> { .. }
23//! fn enrich(cache: Res<MarketData>, order: ValidOrder) -> EnrichedOrder { .. }
24//! fn submit(mut gw: ResMut<Gateway>, order: CheckedOrder) { gw.send(order); }
25//! ```
26//!
27//! # Combinator split
28//!
29//! **IntoStep-based (pre-resolved, hot path):**
30//! `.then()`, `.map()`, `.and_then()`, `.catch()`
31//!
32//! **Closure-based (cold path, `&mut World`):**
33//! `.on_none()`, `.inspect()`, `.inspect_err()`, `.filter()`, `.ok()`,
34//! `.unwrap_or()`, `.unwrap_or_else()`, `.map_err()`, `.or_else()`,
35//! `.cloned()`, `.dispatch()`, `.tap()`, `.guard()`, `.route()`,
36//! `.switch()`
37//!
38//! # Combinator quick reference
39//!
40//! **Bare value `T`:** `.then()`, `.tap()`, `.guard()` (→ `Option<T>`),
41//! `.dispatch()`, `.route()`, `.switch()`, `.tee()`, `.dedup()` (→ `Option<T>`)
42//!
43//! **Tuple `(A, B, ...)` (2-5 elements):** `.splat()` (→ splat builder,
44//! call `.then()` with destructured args)
45//!
46//! **`Option<T>`:** `.map()`, `.filter()`, `.inspect()`, `.and_then()`,
47//! `.on_none()`, `.ok_or()` (→ `Result`), `.unwrap_or()` (→ `T`),
48//! `.cloned()` (→ `Option<T>` from `Option<&T>`)
49//!
50//! **`Result<T, E>`:** `.map()`, `.and_then()`, `.catch()` (→ `Option<T>`),
51//! `.map_err()`, `.inspect_err()`, `.ok()` (→ `Option<T>`),
52//! `.unwrap_or()` (→ `T`), `.or_else()`
53//!
54//! **`bool`:** `.not()`, `.and()`, `.or()`, `.xor()`
55//!
56//! **Terminal:** `.build()` (→ `Pipeline<In>`), `.build_batch(cap)`
57//! (→ `BatchPipeline<In>`)
58//!
59//! # Splat — tuple destructuring
60//!
61//! Pipeline steps follow a single-value-in, single-value-out convention.
62//! When a step returns a tuple like `(OrderId, f64)`, the next step
63//! must accept the whole tuple as one argument. `.splat()` destructures
64//! the tuple so the next step receives individual arguments instead:
65//!
66//! ```ignore
67//! // Without splat — next step takes the whole tuple:
68//! fn process(pair: (OrderId, f64)) -> bool { .. }
69//!
70//! // With splat — next step takes individual args:
71//! fn process(id: OrderId, price: f64) -> bool { .. }
72//!
73//! PipelineStart::<Order>::new()
74//!     .then(extract, reg)   // Order → (OrderId, f64)
75//!     .splat()              // destructure
76//!     .then(process, reg)   // (OrderId, f64) → bool
77//!     .build();
78//! ```
79//!
80//! Supported for tuples of 2-5 elements. Beyond 5, define a named
81//! struct — if a combinator stage needs that many arguments, a struct
82//! makes the intent clearer and the code more maintainable.
83
84use std::marker::PhantomData;
85
86use crate::Handler;
87use crate::dag::DagArm;
88use crate::handler::Param;
89use crate::world::{Registry, World};
90
91// =============================================================================
92// Step — pre-resolved step with Param state
93// =============================================================================
94
95/// Internal: pre-resolved step with cached Param state.
96///
97/// Users don't construct this directly — it's produced by [`IntoStep`] and
98/// captured inside pipeline chain closures.
99#[doc(hidden)]
100pub struct Step<F, Params: Param> {
101    f: F,
102    state: Params::State,
103    #[allow(dead_code)]
104    name: &'static str,
105}
106
107// =============================================================================
108// StepCall — callable trait for resolved steps
109// =============================================================================
110
111/// Internal: callable trait for resolved steps.
112///
113/// Used as a bound on [`IntoStep::Step`]. Users don't implement this.
114#[doc(hidden)]
115pub trait StepCall<In, Out> {
116    /// Call this step with a world reference and input value.
117    fn call(&mut self, world: &mut World, input: In) -> Out;
118}
119
120// =============================================================================
121// IntoStep — converts a named function into a resolved step
122// =============================================================================
123
124/// Converts a named function into a pre-resolved pipeline step.
125///
126/// Params first, step input last, returns output. Arity 0 (no
127/// Params) supports closures. Arities 1+ require named functions
128/// (same HRTB+GAT limitation as [`IntoHandler`](crate::IntoHandler)).
129///
130/// # Examples
131///
132/// ```ignore
133/// // Arity 0 — closure works
134/// let step = (|x: u32| x * 2).into_step(registry);
135///
136/// // Arity 1 — named function required
137/// fn validate(config: Res<Config>, order: Order) -> Option<ValidOrder> { .. }
138/// let step = validate.into_step(registry);
139/// ```
140pub trait IntoStep<In, Out, Params> {
141    /// The concrete resolved step type.
142    type Step: StepCall<In, Out>;
143
144    /// Resolve Param state from the registry and produce a step.
145    fn into_step(self, registry: &Registry) -> Self::Step;
146}
147
148// =============================================================================
149// Arity 0 — fn(In) -> Out — closures work (no HRTB+GAT issues)
150// =============================================================================
151
152impl<In, Out, F: FnMut(In) -> Out + 'static> StepCall<In, Out> for Step<F, ()> {
153    #[inline(always)]
154    fn call(&mut self, _world: &mut World, input: In) -> Out {
155        (self.f)(input)
156    }
157}
158
159impl<In, Out, F: FnMut(In) -> Out + 'static> IntoStep<In, Out, ()> for F {
160    type Step = Step<F, ()>;
161
162    fn into_step(self, registry: &Registry) -> Self::Step {
163        Step {
164            f: self,
165            state: <() as Param>::init(registry),
166            name: std::any::type_name::<F>(),
167        }
168    }
169}
170
171// =============================================================================
172// Arities 1-8 via macro — HRTB with -> Out
173// =============================================================================
174
175macro_rules! impl_into_step {
176    ($($P:ident),+) => {
177        impl<In, Out, F: 'static, $($P: Param + 'static),+>
178            StepCall<In, Out> for Step<F, ($($P,)+)>
179        where
180            for<'a> &'a mut F:
181                FnMut($($P,)+ In) -> Out +
182                FnMut($($P::Item<'a>,)+ In) -> Out,
183        {
184            #[inline(always)]
185            #[allow(non_snake_case)]
186            fn call(&mut self, world: &mut World, input: In) -> Out {
187                #[allow(clippy::too_many_arguments)]
188                fn call_inner<$($P,)+ Input, Output>(
189                    mut f: impl FnMut($($P,)+ Input) -> Output,
190                    $($P: $P,)+
191                    input: Input,
192                ) -> Output {
193                    f($($P,)+ input)
194                }
195
196                // SAFETY: state was produced by init() on the same registry
197                // that built this world. Single-threaded sequential dispatch
198                // ensures no mutable aliasing across params.
199                #[cfg(debug_assertions)]
200                world.clear_borrows();
201                let ($($P,)+) = unsafe {
202                    <($($P,)+) as Param>::fetch(world, &mut self.state)
203                };
204                call_inner(&mut self.f, $($P,)+ input)
205            }
206        }
207
208        impl<In, Out, F: 'static, $($P: Param + 'static),+>
209            IntoStep<In, Out, ($($P,)+)> for F
210        where
211            for<'a> &'a mut F:
212                FnMut($($P,)+ In) -> Out +
213                FnMut($($P::Item<'a>,)+ In) -> Out,
214        {
215            type Step = Step<F, ($($P,)+)>;
216
217            fn into_step(self, registry: &Registry) -> Self::Step {
218                let state = <($($P,)+) as Param>::init(registry);
219                {
220                    #[allow(non_snake_case)]
221                    let ($($P,)+) = &state;
222                    registry.check_access(&[
223                        $(
224                            (<$P as Param>::resource_id($P),
225                             std::any::type_name::<$P>()),
226                        )+
227                    ]);
228                }
229                Step { f: self, state, name: std::any::type_name::<F>() }
230            }
231        }
232    };
233}
234
235macro_rules! all_tuples {
236    ($m:ident) => {
237        $m!(P0);
238        $m!(P0, P1);
239        $m!(P0, P1, P2);
240        $m!(P0, P1, P2, P3);
241        $m!(P0, P1, P2, P3, P4);
242        $m!(P0, P1, P2, P3, P4, P5);
243        $m!(P0, P1, P2, P3, P4, P5, P6);
244        $m!(P0, P1, P2, P3, P4, P5, P6, P7);
245    };
246}
247
248all_tuples!(impl_into_step);
249
250// =============================================================================
251// SplatCall / IntoSplatStep — splat step dispatch (tuple destructuring)
252// =============================================================================
253//
254// Splat traits mirror StepCall/IntoStep but accept multiple owned values
255// instead of a single input. This lets `.splat()` destructure a tuple
256// output into individual function arguments for the next step.
257//
258// One trait pair per arity (2-5). Past 5, use a named struct.
259
260// -- Splat 2 ------------------------------------------------------------------
261
262/// Internal: callable trait for resolved 2-splat steps.
263#[doc(hidden)]
264pub trait SplatCall2<A, B, Out> {
265    fn call_splat(&mut self, world: &mut World, a: A, b: B) -> Out;
266}
267
268/// Converts a named function into a resolved 2-splat step.
269#[doc(hidden)]
270pub trait IntoSplatStep2<A, B, Out, Params> {
271    type Step: SplatCall2<A, B, Out>;
272    fn into_splat_step(self, registry: &Registry) -> Self::Step;
273}
274
275impl<A, B, Out, F: FnMut(A, B) -> Out + 'static> SplatCall2<A, B, Out> for Step<F, ()> {
276    #[inline(always)]
277    fn call_splat(&mut self, _world: &mut World, a: A, b: B) -> Out {
278        (self.f)(a, b)
279    }
280}
281
282impl<A, B, Out, F: FnMut(A, B) -> Out + 'static> IntoSplatStep2<A, B, Out, ()> for F {
283    type Step = Step<F, ()>;
284    fn into_splat_step(self, registry: &Registry) -> Self::Step {
285        Step {
286            f: self,
287            state: <() as Param>::init(registry),
288            name: std::any::type_name::<F>(),
289        }
290    }
291}
292
293macro_rules! impl_splat2_step {
294    ($($P:ident),+) => {
295        impl<A, B, Out, F: 'static, $($P: Param + 'static),+>
296            SplatCall2<A, B, Out> for Step<F, ($($P,)+)>
297        where
298            for<'a> &'a mut F:
299                FnMut($($P,)+ A, B) -> Out +
300                FnMut($($P::Item<'a>,)+ A, B) -> Out,
301        {
302            #[inline(always)]
303            #[allow(non_snake_case)]
304            fn call_splat(&mut self, world: &mut World, a: A, b: B) -> Out {
305                #[allow(clippy::too_many_arguments)]
306                fn call_inner<$($P,)+ IA, IB, Output>(
307                    mut f: impl FnMut($($P,)+ IA, IB) -> Output,
308                    $($P: $P,)+
309                    a: IA, b: IB,
310                ) -> Output {
311                    f($($P,)+ a, b)
312                }
313                #[cfg(debug_assertions)]
314                world.clear_borrows();
315                let ($($P,)+) = unsafe {
316                    <($($P,)+) as Param>::fetch(world, &mut self.state)
317                };
318                call_inner(&mut self.f, $($P,)+ a, b)
319            }
320        }
321
322        impl<A, B, Out, F: 'static, $($P: Param + 'static),+>
323            IntoSplatStep2<A, B, Out, ($($P,)+)> for F
324        where
325            for<'a> &'a mut F:
326                FnMut($($P,)+ A, B) -> Out +
327                FnMut($($P::Item<'a>,)+ A, B) -> Out,
328        {
329            type Step = Step<F, ($($P,)+)>;
330
331            fn into_splat_step(self, registry: &Registry) -> Self::Step {
332                let state = <($($P,)+) as Param>::init(registry);
333                {
334                    #[allow(non_snake_case)]
335                    let ($($P,)+) = &state;
336                    registry.check_access(&[
337                        $(
338                            (<$P as Param>::resource_id($P),
339                             std::any::type_name::<$P>()),
340                        )+
341                    ]);
342                }
343                Step { f: self, state, name: std::any::type_name::<F>() }
344            }
345        }
346    };
347}
348
349// -- Splat 3 ------------------------------------------------------------------
350
351/// Internal: callable trait for resolved 3-splat steps.
352#[doc(hidden)]
353pub trait SplatCall3<A, B, C, Out> {
354    fn call_splat(&mut self, world: &mut World, a: A, b: B, c: C) -> Out;
355}
356
357/// Converts a named function into a resolved 3-splat step.
358#[doc(hidden)]
359pub trait IntoSplatStep3<A, B, C, Out, Params> {
360    type Step: SplatCall3<A, B, C, Out>;
361    fn into_splat_step(self, registry: &Registry) -> Self::Step;
362}
363
364impl<A, B, C, Out, F: FnMut(A, B, C) -> Out + 'static> SplatCall3<A, B, C, Out> for Step<F, ()> {
365    #[inline(always)]
366    fn call_splat(&mut self, _world: &mut World, a: A, b: B, c: C) -> Out {
367        (self.f)(a, b, c)
368    }
369}
370
371impl<A, B, C, Out, F: FnMut(A, B, C) -> Out + 'static> IntoSplatStep3<A, B, C, Out, ()> for F {
372    type Step = Step<F, ()>;
373    fn into_splat_step(self, registry: &Registry) -> Self::Step {
374        Step {
375            f: self,
376            state: <() as Param>::init(registry),
377            name: std::any::type_name::<F>(),
378        }
379    }
380}
381
382macro_rules! impl_splat3_step {
383    ($($P:ident),+) => {
384        impl<A, B, C, Out, F: 'static, $($P: Param + 'static),+>
385            SplatCall3<A, B, C, Out> for Step<F, ($($P,)+)>
386        where
387            for<'a> &'a mut F:
388                FnMut($($P,)+ A, B, C) -> Out +
389                FnMut($($P::Item<'a>,)+ A, B, C) -> Out,
390        {
391            #[inline(always)]
392            #[allow(non_snake_case)]
393            fn call_splat(&mut self, world: &mut World, a: A, b: B, c: C) -> Out {
394                #[allow(clippy::too_many_arguments)]
395                fn call_inner<$($P,)+ IA, IB, IC, Output>(
396                    mut f: impl FnMut($($P,)+ IA, IB, IC) -> Output,
397                    $($P: $P,)+
398                    a: IA, b: IB, c: IC,
399                ) -> Output {
400                    f($($P,)+ a, b, c)
401                }
402                #[cfg(debug_assertions)]
403                world.clear_borrows();
404                let ($($P,)+) = unsafe {
405                    <($($P,)+) as Param>::fetch(world, &mut self.state)
406                };
407                call_inner(&mut self.f, $($P,)+ a, b, c)
408            }
409        }
410
411        impl<A, B, C, Out, F: 'static, $($P: Param + 'static),+>
412            IntoSplatStep3<A, B, C, Out, ($($P,)+)> for F
413        where
414            for<'a> &'a mut F:
415                FnMut($($P,)+ A, B, C) -> Out +
416                FnMut($($P::Item<'a>,)+ A, B, C) -> Out,
417        {
418            type Step = Step<F, ($($P,)+)>;
419
420            fn into_splat_step(self, registry: &Registry) -> Self::Step {
421                let state = <($($P,)+) as Param>::init(registry);
422                {
423                    #[allow(non_snake_case)]
424                    let ($($P,)+) = &state;
425                    registry.check_access(&[
426                        $(
427                            (<$P as Param>::resource_id($P),
428                             std::any::type_name::<$P>()),
429                        )+
430                    ]);
431                }
432                Step { f: self, state, name: std::any::type_name::<F>() }
433            }
434        }
435    };
436}
437
438// -- Splat 4 ------------------------------------------------------------------
439
440/// Internal: callable trait for resolved 4-splat steps.
441#[doc(hidden)]
442pub trait SplatCall4<A, B, C, D, Out> {
443    fn call_splat(&mut self, world: &mut World, a: A, b: B, c: C, d: D) -> Out;
444}
445
446/// Converts a named function into a resolved 4-splat step.
447#[doc(hidden)]
448pub trait IntoSplatStep4<A, B, C, D, Out, Params> {
449    type Step: SplatCall4<A, B, C, D, Out>;
450    fn into_splat_step(self, registry: &Registry) -> Self::Step;
451}
452
453impl<A, B, C, D, Out, F: FnMut(A, B, C, D) -> Out + 'static> SplatCall4<A, B, C, D, Out>
454    for Step<F, ()>
455{
456    #[inline(always)]
457    fn call_splat(&mut self, _world: &mut World, a: A, b: B, c: C, d: D) -> Out {
458        (self.f)(a, b, c, d)
459    }
460}
461
462impl<A, B, C, D, Out, F: FnMut(A, B, C, D) -> Out + 'static> IntoSplatStep4<A, B, C, D, Out, ()>
463    for F
464{
465    type Step = Step<F, ()>;
466    fn into_splat_step(self, registry: &Registry) -> Self::Step {
467        Step {
468            f: self,
469            state: <() as Param>::init(registry),
470            name: std::any::type_name::<F>(),
471        }
472    }
473}
474
475macro_rules! impl_splat4_step {
476    ($($P:ident),+) => {
477        impl<A, B, C, D, Out, F: 'static, $($P: Param + 'static),+>
478            SplatCall4<A, B, C, D, Out> for Step<F, ($($P,)+)>
479        where for<'a> &'a mut F:
480            FnMut($($P,)+ A, B, C, D) -> Out +
481            FnMut($($P::Item<'a>,)+ A, B, C, D) -> Out,
482        {
483            #[inline(always)]
484            #[allow(non_snake_case)]
485            fn call_splat(&mut self, world: &mut World, a: A, b: B, c: C, d: D) -> Out {
486                #[allow(clippy::too_many_arguments)]
487                fn call_inner<$($P,)+ IA, IB, IC, ID, Output>(
488                    mut f: impl FnMut($($P,)+ IA, IB, IC, ID) -> Output,
489                    $($P: $P,)+ a: IA, b: IB, c: IC, d: ID,
490                ) -> Output { f($($P,)+ a, b, c, d) }
491                #[cfg(debug_assertions)]
492                world.clear_borrows();
493                let ($($P,)+) = unsafe {
494                    <($($P,)+) as Param>::fetch(world, &mut self.state)
495                };
496                call_inner(&mut self.f, $($P,)+ a, b, c, d)
497            }
498        }
499        impl<A, B, C, D, Out, F: 'static, $($P: Param + 'static),+>
500            IntoSplatStep4<A, B, C, D, Out, ($($P,)+)> for F
501        where for<'a> &'a mut F:
502            FnMut($($P,)+ A, B, C, D) -> Out +
503            FnMut($($P::Item<'a>,)+ A, B, C, D) -> Out,
504        {
505            type Step = Step<F, ($($P,)+)>;
506            fn into_splat_step(self, registry: &Registry) -> Self::Step {
507                let state = <($($P,)+) as Param>::init(registry);
508                { #[allow(non_snake_case)] let ($($P,)+) = &state;
509                  registry.check_access(&[$((<$P as Param>::resource_id($P), std::any::type_name::<$P>()),)+]); }
510                Step { f: self, state, name: std::any::type_name::<F>() }
511            }
512        }
513    };
514}
515
516// -- Splat 5 ------------------------------------------------------------------
517
518/// Internal: callable trait for resolved 5-splat steps.
519#[doc(hidden)]
520pub trait SplatCall5<A, B, C, D, E, Out> {
521    #[allow(clippy::many_single_char_names)]
522    fn call_splat(&mut self, world: &mut World, a: A, b: B, c: C, d: D, e: E) -> Out;
523}
524
525/// Converts a named function into a resolved 5-splat step.
526#[doc(hidden)]
527pub trait IntoSplatStep5<A, B, C, D, E, Out, Params> {
528    type Step: SplatCall5<A, B, C, D, E, Out>;
529    fn into_splat_step(self, registry: &Registry) -> Self::Step;
530}
531
532impl<A, B, C, D, E, Out, F: FnMut(A, B, C, D, E) -> Out + 'static> SplatCall5<A, B, C, D, E, Out>
533    for Step<F, ()>
534{
535    #[inline(always)]
536    #[allow(clippy::many_single_char_names)]
537    fn call_splat(&mut self, _world: &mut World, a: A, b: B, c: C, d: D, e: E) -> Out {
538        (self.f)(a, b, c, d, e)
539    }
540}
541
542impl<A, B, C, D, E, Out, F: FnMut(A, B, C, D, E) -> Out + 'static>
543    IntoSplatStep5<A, B, C, D, E, Out, ()> for F
544{
545    type Step = Step<F, ()>;
546    fn into_splat_step(self, registry: &Registry) -> Self::Step {
547        Step {
548            f: self,
549            state: <() as Param>::init(registry),
550            name: std::any::type_name::<F>(),
551        }
552    }
553}
554
555macro_rules! impl_splat5_step {
556    ($($P:ident),+) => {
557        impl<A, B, C, D, E, Out, F: 'static, $($P: Param + 'static),+>
558            SplatCall5<A, B, C, D, E, Out> for Step<F, ($($P,)+)>
559        where for<'a> &'a mut F:
560            FnMut($($P,)+ A, B, C, D, E) -> Out +
561            FnMut($($P::Item<'a>,)+ A, B, C, D, E) -> Out,
562        {
563            #[inline(always)]
564            #[allow(non_snake_case, clippy::many_single_char_names)]
565            fn call_splat(&mut self, world: &mut World, a: A, b: B, c: C, d: D, e: E) -> Out {
566                #[allow(clippy::too_many_arguments)]
567                fn call_inner<$($P,)+ IA, IB, IC, ID, IE, Output>(
568                    mut f: impl FnMut($($P,)+ IA, IB, IC, ID, IE) -> Output,
569                    $($P: $P,)+ a: IA, b: IB, c: IC, d: ID, e: IE,
570                ) -> Output { f($($P,)+ a, b, c, d, e) }
571                #[cfg(debug_assertions)]
572                world.clear_borrows();
573                let ($($P,)+) = unsafe {
574                    <($($P,)+) as Param>::fetch(world, &mut self.state)
575                };
576                call_inner(&mut self.f, $($P,)+ a, b, c, d, e)
577            }
578        }
579        impl<A, B, C, D, E, Out, F: 'static, $($P: Param + 'static),+>
580            IntoSplatStep5<A, B, C, D, E, Out, ($($P,)+)> for F
581        where for<'a> &'a mut F:
582            FnMut($($P,)+ A, B, C, D, E) -> Out +
583            FnMut($($P::Item<'a>,)+ A, B, C, D, E) -> Out,
584        {
585            type Step = Step<F, ($($P,)+)>;
586            fn into_splat_step(self, registry: &Registry) -> Self::Step {
587                let state = <($($P,)+) as Param>::init(registry);
588                { #[allow(non_snake_case)] let ($($P,)+) = &state;
589                  registry.check_access(&[$((<$P as Param>::resource_id($P), std::any::type_name::<$P>()),)+]); }
590                Step { f: self, state, name: std::any::type_name::<F>() }
591            }
592        }
593    };
594}
595
596all_tuples!(impl_splat2_step);
597all_tuples!(impl_splat3_step);
598all_tuples!(impl_splat4_step);
599all_tuples!(impl_splat5_step);
600
601// =============================================================================
602// PipelineStart — entry point
603// =============================================================================
604
605/// Entry point for building a pre-resolved step pipeline.
606///
607/// `In` is the pipeline input type. Call [`.then()`](Self::then) to add
608/// the first step — a named function whose [`Param`] dependencies
609/// are resolved from the registry at build time.
610///
611/// # Examples
612///
613/// ```
614/// use nexus_rt::{WorldBuilder, Res, ResMut, PipelineStart, Handler};
615///
616/// let mut wb = WorldBuilder::new();
617/// wb.register::<u64>(10);
618/// wb.register::<String>(String::new());
619/// let mut world = wb.build();
620///
621/// fn double(factor: Res<u64>, x: u32) -> u64 {
622///     (*factor) * x as u64
623/// }
624/// fn store(mut out: ResMut<String>, val: u64) {
625///     *out = val.to_string();
626/// }
627///
628/// let r = world.registry_mut();
629/// let mut pipeline = PipelineStart::<u32>::new()
630///     .then(double, r)
631///     .then(store, r)
632///     .build();
633///
634/// pipeline.run(&mut world, 5);
635/// assert_eq!(world.resource::<String>().as_str(), "50");
636/// ```
637pub struct PipelineStart<In>(PhantomData<fn(In)>);
638
639impl<In> PipelineStart<In> {
640    /// Create a new step pipeline entry point.
641    pub fn new() -> Self {
642        Self(PhantomData)
643    }
644
645    /// Add the first step. Params resolved from the registry.
646    pub fn then<Out, Params, S: IntoStep<In, Out, Params>>(
647        self,
648        f: S,
649        registry: &Registry,
650    ) -> PipelineBuilder<In, Out, impl FnMut(&mut World, In) -> Out + use<In, Out, Params, S>> {
651        let mut resolved = f.into_step(registry);
652        PipelineBuilder {
653            chain: move |world: &mut World, input: In| resolved.call(world, input),
654            _marker: PhantomData,
655        }
656    }
657
658    /// Closure-based first step for N-ary conditional routing.
659    ///
660    /// Same as [`PipelineBuilder::switch`] but usable as the first step.
661    /// See that method for full documentation.
662    pub fn switch<Out>(
663        self,
664        mut f: impl FnMut(&mut World, In) -> Out,
665    ) -> PipelineBuilder<In, Out, impl FnMut(&mut World, In) -> Out> {
666        PipelineBuilder {
667            chain: move |world: &mut World, input: In| f(world, input),
668            _marker: PhantomData,
669        }
670    }
671}
672
673impl<In> Default for PipelineStart<In> {
674    fn default() -> Self {
675        Self::new()
676    }
677}
678
679// =============================================================================
680// PipelineBuilder — typestate builder
681// =============================================================================
682
683/// Builder that composes pre-resolved pipeline steps via closure nesting.
684///
685/// `In` is the pipeline's input type (fixed). `Out` is the current output.
686/// `Chain` is the concrete composed closure type (opaque, never named by users).
687///
688/// Each combinator consumes `self`, captures the previous chain in a new
689/// closure, and returns a new `PipelineBuilder`. The compiler
690/// monomorphizes the entire chain — zero virtual dispatch through steps.
691///
692/// IntoStep-based methods (`.then()`, `.map()`, `.and_then()`, `.catch()`)
693/// take `&Registry` to resolve Param state at build time. Closure-based
694/// methods don't need the registry.
695pub struct PipelineBuilder<In, Out, Chain> {
696    chain: Chain,
697    _marker: PhantomData<fn(In) -> Out>,
698}
699
700// =============================================================================
701// Core — any Out
702// =============================================================================
703
704impl<In, Out, Chain> PipelineBuilder<In, Out, Chain>
705where
706    Chain: FnMut(&mut World, In) -> Out,
707{
708    /// Add a step. Params resolved from the registry.
709    pub fn then<NewOut, Params, S: IntoStep<Out, NewOut, Params>>(
710        self,
711        f: S,
712        registry: &Registry,
713    ) -> PipelineBuilder<
714        In,
715        NewOut,
716        impl FnMut(&mut World, In) -> NewOut + use<In, Out, NewOut, Params, Chain, S>,
717    > {
718        let mut chain = self.chain;
719        let mut resolved = f.into_step(registry);
720        PipelineBuilder {
721            chain: move |world: &mut World, input: In| {
722                let out = chain(world, input);
723                resolved.call(world, out)
724            },
725            _marker: PhantomData,
726        }
727    }
728
729    /// Run the pipeline directly. No boxing, no `'static` on `In`.
730    pub fn run(&mut self, world: &mut World, input: In) -> Out {
731        (self.chain)(world, input)
732    }
733
734    /// Dispatch pipeline output to a [`Handler<Out>`].
735    ///
736    /// Connects a pipeline's output to any handler — [`HandlerFn`](crate::HandlerFn),
737    /// [`Callback`](crate::Callback), [`Pipeline`], or a combinator like
738    /// [`fan_out!`](crate::fan_out).
739    pub fn dispatch<H: Handler<Out>>(
740        self,
741        mut handler: H,
742    ) -> PipelineBuilder<In, (), impl FnMut(&mut World, In) + use<In, Out, Chain, H>> {
743        let mut chain = self.chain;
744        PipelineBuilder {
745            chain: move |world: &mut World, input: In| {
746                let out = chain(world, input);
747                handler.run(world, out);
748            },
749            _marker: PhantomData,
750        }
751    }
752
753    /// Conditionally wrap the output in `Option`. `Some(val)` if
754    /// the predicate returns true, `None` otherwise.
755    ///
756    /// Enters Option-combinator land — follow with `.map()`,
757    /// `.and_then()`, `.filter()`, `.unwrap_or()`, etc.
758    pub fn guard(
759        self,
760        mut f: impl FnMut(&mut World, &Out) -> bool,
761    ) -> PipelineBuilder<In, Option<Out>, impl FnMut(&mut World, In) -> Option<Out>> {
762        let mut chain = self.chain;
763        PipelineBuilder {
764            chain: move |world: &mut World, input: In| {
765                let val = chain(world, input);
766                if f(world, &val) { Some(val) } else { None }
767            },
768            _marker: PhantomData,
769        }
770    }
771
772    /// Observe the current value without consuming or changing it.
773    ///
774    /// The closure receives `&mut World` and `&Out`. The value passes
775    /// through unchanged. Useful for logging, metrics, or debugging
776    /// mid-chain.
777    pub fn tap(
778        self,
779        mut f: impl FnMut(&mut World, &Out),
780    ) -> PipelineBuilder<In, Out, impl FnMut(&mut World, In) -> Out> {
781        let mut chain = self.chain;
782        PipelineBuilder {
783            chain: move |world: &mut World, input: In| {
784                let val = chain(world, input);
785                f(world, &val);
786                val
787            },
788            _marker: PhantomData,
789        }
790    }
791
792    /// Binary conditional routing. Evaluates the predicate on the
793    /// current value, then moves it into exactly one of two arms.
794    ///
795    /// Both arms must produce the same output type. Build each arm as
796    /// a sub-pipeline from [`PipelineStart`]. For N-ary routing, nest
797    /// `route` calls in the false arm.
798    ///
799    /// ```ignore
800    /// let large = PipelineStart::new().then(large_check, reg).then(submit, reg);
801    /// let small = PipelineStart::new().then(submit, reg);
802    ///
803    /// PipelineStart::<Order>::new()
804    ///     .then(validate, reg)
805    ///     .route(|_, order| order.size > 1000, large, small)
806    ///     .build();
807    /// ```
808    pub fn route<NewOut, C0, C1, P>(
809        self,
810        mut pred: P,
811        on_true: PipelineBuilder<Out, NewOut, C0>,
812        on_false: PipelineBuilder<Out, NewOut, C1>,
813    ) -> PipelineBuilder<
814        In,
815        NewOut,
816        impl FnMut(&mut World, In) -> NewOut + use<In, Out, NewOut, Chain, C0, C1, P>,
817    >
818    where
819        P: FnMut(&mut World, &Out) -> bool,
820        C0: FnMut(&mut World, Out) -> NewOut,
821        C1: FnMut(&mut World, Out) -> NewOut,
822    {
823        let mut chain = self.chain;
824        let mut c0 = on_true.chain;
825        let mut c1 = on_false.chain;
826        PipelineBuilder {
827            chain: move |world: &mut World, input: In| {
828                let val = chain(world, input);
829                if pred(world, &val) {
830                    c0(world, val)
831                } else {
832                    c1(world, val)
833                }
834            },
835            _marker: PhantomData,
836        }
837    }
838
839    /// Fork off a multi-step side-effect chain. The arm borrows
840    /// `&Out`, runs to completion (producing `()`), and the
841    /// original value passes through unchanged.
842    ///
843    /// Multi-step version of [`tap`](Self::tap) — the arm has the
844    /// full DAG combinator API with Param resolution. Build with
845    /// [`DagArmStart::new()`](crate::dag::DagArmStart::new).
846    pub fn tee<C>(
847        self,
848        side: DagArm<Out, (), C>,
849    ) -> PipelineBuilder<In, Out, impl FnMut(&mut World, In) -> Out>
850    where
851        C: FnMut(&mut World, &Out),
852    {
853        let mut chain = self.chain;
854        let mut side_chain = side.chain;
855        PipelineBuilder {
856            chain: move |world: &mut World, input: In| {
857                let val = chain(world, input);
858                side_chain(world, &val);
859                val
860            },
861            _marker: PhantomData,
862        }
863    }
864
865    /// Closure-based step for N-ary conditional routing.
866    ///
867    /// The closure receives `&mut World` and the current value `Out`
868    /// **by value** (pipeline ownership semantics), returning a new
869    /// value of type `NewOut`. Use with [`resolve_step`] to pre-resolve
870    /// per-arm steps with independent [`Param`] sets:
871    ///
872    /// ```ignore
873    /// let mut arm_new    = resolve_step(handle_new, reg);
874    /// let mut arm_cancel = resolve_step(handle_cancel, reg);
875    ///
876    /// pipeline.switch(move |world, order: Order| match order.kind {
877    ///     OrderKind::New    => arm_new(world, order),
878    ///     OrderKind::Cancel => arm_cancel(world, order),
879    /// })
880    /// ```
881    ///
882    /// For simple cases where all arms share the same params, a named
883    /// function with a `match` body passed to [`.then()`](Self::then)
884    /// is sufficient.
885    pub fn switch<NewOut>(
886        self,
887        mut f: impl FnMut(&mut World, Out) -> NewOut,
888    ) -> PipelineBuilder<In, NewOut, impl FnMut(&mut World, In) -> NewOut> {
889        let mut chain = self.chain;
890        PipelineBuilder {
891            chain: move |world: &mut World, input: In| {
892                let val = chain(world, input);
893                f(world, val)
894            },
895            _marker: PhantomData,
896        }
897    }
898}
899
900// =============================================================================
901// Splat — tuple destructuring into individual function arguments
902// =============================================================================
903//
904// `.splat()` transitions from a tuple output to a builder whose `.then()`
905// accepts a function taking the tuple elements as individual arguments.
906// After `.splat().then(f, reg)`, the user is back on PipelineBuilder.
907//
908// Builder types are `#[doc(hidden)]` — users only see `.splat().then()`.
909
910// -- Splat builder types ------------------------------------------------------
911
912macro_rules! define_splat_builders {
913    (
914        $N:literal,
915        start: $SplatStart:ident,
916        mid: $SplatBuilder:ident,
917        into_trait: $IntoSplatStep:ident,
918        call_trait: $SplatCall:ident,
919        ($($T:ident),+),
920        ($($idx:tt),+)
921    ) => {
922        /// Splat builder at pipeline start position.
923        #[doc(hidden)]
924        pub struct $SplatStart<$($T),+>(PhantomData<fn(($($T,)+))>);
925
926        impl<$($T),+> $SplatStart<$($T),+> {
927            /// Add a step that receives the tuple elements as individual arguments.
928            pub fn then<Out, Params, S>(
929                self,
930                f: S,
931                registry: &Registry,
932            ) -> PipelineBuilder<
933                ($($T,)+),
934                Out,
935                impl FnMut(&mut World, ($($T,)+)) -> Out
936                    + use<$($T,)+ Out, Params, S>,
937            >
938            where
939                S: $IntoSplatStep<$($T,)+ Out, Params>,
940            {
941                let mut resolved = f.into_splat_step(registry);
942                PipelineBuilder {
943                    chain: move |world: &mut World, input: ($($T,)+)| {
944                        resolved.call_splat(world, $(input.$idx),+)
945                    },
946                    _marker: PhantomData,
947                }
948            }
949        }
950
951        impl<$($T),+> PipelineStart<($($T,)+)> {
952            /// Destructure the tuple input into individual function arguments.
953            pub fn splat(self) -> $SplatStart<$($T),+> {
954                $SplatStart(PhantomData)
955            }
956        }
957
958        /// Splat builder at mid-chain position.
959        #[doc(hidden)]
960        pub struct $SplatBuilder<In, $($T,)+ Chain> {
961            chain: Chain,
962            _marker: PhantomData<fn(In) -> ($($T,)+)>,
963        }
964
965        impl<In, $($T,)+ Chain> $SplatBuilder<In, $($T,)+ Chain>
966        where
967            Chain: FnMut(&mut World, In) -> ($($T,)+),
968        {
969            /// Add a step that receives the tuple elements as individual arguments.
970            pub fn then<Out, Params, S>(
971                self,
972                f: S,
973                registry: &Registry,
974            ) -> PipelineBuilder<
975                In,
976                Out,
977                impl FnMut(&mut World, In) -> Out
978                    + use<In, $($T,)+ Out, Params, Chain, S>,
979            >
980            where
981                S: $IntoSplatStep<$($T,)+ Out, Params>,
982            {
983                let mut chain = self.chain;
984                let mut resolved = f.into_splat_step(registry);
985                PipelineBuilder {
986                    chain: move |world: &mut World, input: In| {
987                        let tuple = chain(world, input);
988                        resolved.call_splat(world, $(tuple.$idx),+)
989                    },
990                    _marker: PhantomData,
991                }
992            }
993        }
994
995        impl<In, $($T,)+ Chain> PipelineBuilder<In, ($($T,)+), Chain>
996        where
997            Chain: FnMut(&mut World, In) -> ($($T,)+),
998        {
999            /// Destructure the tuple output into individual function arguments.
1000            pub fn splat(self) -> $SplatBuilder<In, $($T,)+ Chain> {
1001                $SplatBuilder {
1002                    chain: self.chain,
1003                    _marker: PhantomData,
1004                }
1005            }
1006        }
1007    };
1008}
1009
1010define_splat_builders!(2,
1011    start: SplatStart2,
1012    mid: SplatBuilder2,
1013    into_trait: IntoSplatStep2,
1014    call_trait: SplatCall2,
1015    (A, B),
1016    (0, 1)
1017);
1018
1019define_splat_builders!(3,
1020    start: SplatStart3,
1021    mid: SplatBuilder3,
1022    into_trait: IntoSplatStep3,
1023    call_trait: SplatCall3,
1024    (A, B, C),
1025    (0, 1, 2)
1026);
1027
1028define_splat_builders!(4,
1029    start: SplatStart4,
1030    mid: SplatBuilder4,
1031    into_trait: IntoSplatStep4,
1032    call_trait: SplatCall4,
1033    (A, B, C, D),
1034    (0, 1, 2, 3)
1035);
1036
1037define_splat_builders!(5,
1038    start: SplatStart5,
1039    mid: SplatBuilder5,
1040    into_trait: IntoSplatStep5,
1041    call_trait: SplatCall5,
1042    (A, B, C, D, E),
1043    (0, 1, 2, 3, 4)
1044);
1045
1046// =============================================================================
1047// Dedup — suppress unchanged values
1048// =============================================================================
1049
1050impl<In, Out: PartialEq + Clone, Chain> PipelineBuilder<In, Out, Chain>
1051where
1052    Chain: FnMut(&mut World, In) -> Out,
1053{
1054    /// Suppress consecutive unchanged values. Returns `Some(val)`
1055    /// when the value differs from the previous invocation, `None`
1056    /// when unchanged. First invocation always returns `Some`.
1057    ///
1058    /// Requires `PartialEq + Clone` — the previous value is stored
1059    /// internally for comparison.
1060    pub fn dedup(
1061        self,
1062    ) -> PipelineBuilder<In, Option<Out>, impl FnMut(&mut World, In) -> Option<Out>> {
1063        let mut chain = self.chain;
1064        let mut prev: Option<Out> = None;
1065        PipelineBuilder {
1066            chain: move |world: &mut World, input: In| {
1067                let val = chain(world, input);
1068                if prev.as_ref() == Some(&val) {
1069                    None
1070                } else {
1071                    prev = Some(val.clone());
1072                    Some(val)
1073                }
1074            },
1075            _marker: PhantomData,
1076        }
1077    }
1078}
1079
1080// =============================================================================
1081// Bool combinators
1082// =============================================================================
1083
1084impl<In, Chain> PipelineBuilder<In, bool, Chain>
1085where
1086    Chain: FnMut(&mut World, In) -> bool,
1087{
1088    /// Invert a boolean value.
1089    #[allow(clippy::should_implement_trait)]
1090    pub fn not(self) -> PipelineBuilder<In, bool, impl FnMut(&mut World, In) -> bool> {
1091        let mut chain = self.chain;
1092        PipelineBuilder {
1093            chain: move |world: &mut World, input: In| !chain(world, input),
1094            _marker: PhantomData,
1095        }
1096    }
1097
1098    /// Short-circuit AND with a second boolean from World state.
1099    ///
1100    /// If the chain produces `false`, the closure is not called.
1101    pub fn and(
1102        self,
1103        mut f: impl FnMut(&mut World) -> bool,
1104    ) -> PipelineBuilder<In, bool, impl FnMut(&mut World, In) -> bool> {
1105        let mut chain = self.chain;
1106        PipelineBuilder {
1107            chain: move |world: &mut World, input: In| chain(world, input) && f(world),
1108            _marker: PhantomData,
1109        }
1110    }
1111
1112    /// Short-circuit OR with a second boolean from World state.
1113    ///
1114    /// If the chain produces `true`, the closure is not called.
1115    pub fn or(
1116        self,
1117        mut f: impl FnMut(&mut World) -> bool,
1118    ) -> PipelineBuilder<In, bool, impl FnMut(&mut World, In) -> bool> {
1119        let mut chain = self.chain;
1120        PipelineBuilder {
1121            chain: move |world: &mut World, input: In| chain(world, input) || f(world),
1122            _marker: PhantomData,
1123        }
1124    }
1125
1126    /// XOR with a second boolean from World state.
1127    ///
1128    /// Both sides are always evaluated.
1129    pub fn xor(
1130        self,
1131        mut f: impl FnMut(&mut World) -> bool,
1132    ) -> PipelineBuilder<In, bool, impl FnMut(&mut World, In) -> bool> {
1133        let mut chain = self.chain;
1134        PipelineBuilder {
1135            chain: move |world: &mut World, input: In| chain(world, input) ^ f(world),
1136            _marker: PhantomData,
1137        }
1138    }
1139}
1140
1141// =============================================================================
1142// Clone helpers — &T → T transitions
1143// =============================================================================
1144
1145impl<'a, In, T: Clone, Chain> PipelineBuilder<In, &'a T, Chain>
1146where
1147    Chain: FnMut(&mut World, In) -> &'a T,
1148{
1149    /// Clone a borrowed output to produce an owned value.
1150    ///
1151    /// Transitions the pipeline from `&T` to `T`. Uses UFCS
1152    /// (`T::clone(val)`) — `val.clone()` on a `&&T` resolves to
1153    /// `<&T as Clone>::clone` and returns `&T`, not `T`.
1154    pub fn cloned(self) -> PipelineBuilder<In, T, impl FnMut(&mut World, In) -> T> {
1155        let mut chain = self.chain;
1156        PipelineBuilder {
1157            chain: move |world: &mut World, input: In| T::clone(chain(world, input)),
1158            _marker: PhantomData,
1159        }
1160    }
1161}
1162
1163impl<'a, In, T: Clone, Chain> PipelineBuilder<In, Option<&'a T>, Chain>
1164where
1165    Chain: FnMut(&mut World, In) -> Option<&'a T>,
1166{
1167    /// Clone inner borrowed value. `Option<&T>` → `Option<T>`.
1168    pub fn cloned(self) -> PipelineBuilder<In, Option<T>, impl FnMut(&mut World, In) -> Option<T>> {
1169        let mut chain = self.chain;
1170        PipelineBuilder {
1171            chain: move |world: &mut World, input: In| chain(world, input).cloned(),
1172            _marker: PhantomData,
1173        }
1174    }
1175}
1176
1177impl<'a, In, T: Clone, E, Chain> PipelineBuilder<In, Result<&'a T, E>, Chain>
1178where
1179    Chain: FnMut(&mut World, In) -> Result<&'a T, E>,
1180{
1181    /// Clone inner borrowed Ok value. `Result<&T, E>` → `Result<T, E>`.
1182    pub fn cloned(
1183        self,
1184    ) -> PipelineBuilder<In, Result<T, E>, impl FnMut(&mut World, In) -> Result<T, E>> {
1185        let mut chain = self.chain;
1186        PipelineBuilder {
1187            chain: move |world: &mut World, input: In| chain(world, input).cloned(),
1188            _marker: PhantomData,
1189        }
1190    }
1191}
1192
1193// =============================================================================
1194// Option helpers — PipelineBuilder<In, Option<T>, Chain>
1195// =============================================================================
1196
1197impl<In, T, Chain> PipelineBuilder<In, Option<T>, Chain>
1198where
1199    Chain: FnMut(&mut World, In) -> Option<T>,
1200{
1201    // -- IntoStep-based (hot path) -------------------------------------------
1202
1203    /// Transform the inner value. Step not called on None.
1204    pub fn map<U, Params, S: IntoStep<T, U, Params>>(
1205        self,
1206        f: S,
1207        registry: &Registry,
1208    ) -> PipelineBuilder<
1209        In,
1210        Option<U>,
1211        impl FnMut(&mut World, In) -> Option<U> + use<In, T, U, Params, Chain, S>,
1212    > {
1213        let mut chain = self.chain;
1214        let mut resolved = f.into_step(registry);
1215        PipelineBuilder {
1216            chain: move |world: &mut World, input: In| {
1217                chain(world, input).map(|val| resolved.call(world, val))
1218            },
1219            _marker: PhantomData,
1220        }
1221    }
1222
1223    /// Short-circuits on None. std: `Option::and_then`
1224    pub fn and_then<U, Params, S: IntoStep<T, Option<U>, Params>>(
1225        self,
1226        f: S,
1227        registry: &Registry,
1228    ) -> PipelineBuilder<
1229        In,
1230        Option<U>,
1231        impl FnMut(&mut World, In) -> Option<U> + use<In, T, U, Params, Chain, S>,
1232    > {
1233        let mut chain = self.chain;
1234        let mut resolved = f.into_step(registry);
1235        PipelineBuilder {
1236            chain: move |world: &mut World, input: In| {
1237                chain(world, input).and_then(|val| resolved.call(world, val))
1238            },
1239            _marker: PhantomData,
1240        }
1241    }
1242
1243    // -- Closure-based (cold path, &mut World) --------------------------------
1244
1245    /// Side effect on None. Complement to [`inspect`](Self::inspect).
1246    pub fn on_none(
1247        self,
1248        mut f: impl FnMut(&mut World),
1249    ) -> PipelineBuilder<In, Option<T>, impl FnMut(&mut World, In) -> Option<T>> {
1250        let mut chain = self.chain;
1251        PipelineBuilder {
1252            chain: move |world: &mut World, input: In| {
1253                let result = chain(world, input);
1254                if result.is_none() {
1255                    f(world);
1256                }
1257                result
1258            },
1259            _marker: PhantomData,
1260        }
1261    }
1262
1263    /// Keep value if predicate holds. std: `Option::filter`
1264    pub fn filter(
1265        self,
1266        mut f: impl FnMut(&mut World, &T) -> bool,
1267    ) -> PipelineBuilder<In, Option<T>, impl FnMut(&mut World, In) -> Option<T>> {
1268        let mut chain = self.chain;
1269        PipelineBuilder {
1270            chain: move |world: &mut World, input: In| {
1271                chain(world, input).filter(|val| f(world, val))
1272            },
1273            _marker: PhantomData,
1274        }
1275    }
1276
1277    /// Side effect on Some value. std: `Option::inspect`
1278    pub fn inspect(
1279        self,
1280        mut f: impl FnMut(&mut World, &T),
1281    ) -> PipelineBuilder<In, Option<T>, impl FnMut(&mut World, In) -> Option<T>> {
1282        let mut chain = self.chain;
1283        PipelineBuilder {
1284            chain: move |world: &mut World, input: In| {
1285                chain(world, input).inspect(|val| f(world, val))
1286            },
1287            _marker: PhantomData,
1288        }
1289    }
1290
1291    /// None becomes Err(err). std: `Option::ok_or`
1292    ///
1293    /// `Clone` required because the pipeline may run many times —
1294    /// the error value is cloned on each `None` invocation.
1295    pub fn ok_or<E: Clone>(
1296        self,
1297        err: E,
1298    ) -> PipelineBuilder<In, Result<T, E>, impl FnMut(&mut World, In) -> Result<T, E>> {
1299        let mut chain = self.chain;
1300        PipelineBuilder {
1301            chain: move |world: &mut World, input: In| {
1302                chain(world, input).ok_or_else(|| err.clone())
1303            },
1304            _marker: PhantomData,
1305        }
1306    }
1307
1308    /// None becomes Err(f()). std: `Option::ok_or_else`
1309    pub fn ok_or_else<E>(
1310        self,
1311        mut f: impl FnMut(&mut World) -> E,
1312    ) -> PipelineBuilder<In, Result<T, E>, impl FnMut(&mut World, In) -> Result<T, E>> {
1313        let mut chain = self.chain;
1314        PipelineBuilder {
1315            chain: move |world: &mut World, input: In| chain(world, input).ok_or_else(|| f(world)),
1316            _marker: PhantomData,
1317        }
1318    }
1319
1320    /// Exit Option — None becomes the default value.
1321    ///
1322    /// `Clone` required because the pipeline may run many times —
1323    /// the default is cloned on each `None` invocation (unlike
1324    /// std's `unwrap_or` which consumes the value once).
1325    pub fn unwrap_or(self, default: T) -> PipelineBuilder<In, T, impl FnMut(&mut World, In) -> T>
1326    where
1327        T: Clone,
1328    {
1329        let mut chain = self.chain;
1330        PipelineBuilder {
1331            chain: move |world: &mut World, input: In| {
1332                chain(world, input).unwrap_or_else(|| default.clone())
1333            },
1334            _marker: PhantomData,
1335        }
1336    }
1337
1338    /// Exit Option — None becomes `f()`.
1339    pub fn unwrap_or_else(
1340        self,
1341        mut f: impl FnMut(&mut World) -> T,
1342    ) -> PipelineBuilder<In, T, impl FnMut(&mut World, In) -> T> {
1343        let mut chain = self.chain;
1344        PipelineBuilder {
1345            chain: move |world: &mut World, input: In| {
1346                chain(world, input).unwrap_or_else(|| f(world))
1347            },
1348            _marker: PhantomData,
1349        }
1350    }
1351}
1352
1353// =============================================================================
1354// Result helpers — PipelineBuilder<In, Result<T, E>, Chain>
1355// =============================================================================
1356
1357impl<In, T, E, Chain> PipelineBuilder<In, Result<T, E>, Chain>
1358where
1359    Chain: FnMut(&mut World, In) -> Result<T, E>,
1360{
1361    // -- IntoStep-based (hot path) -------------------------------------------
1362
1363    /// Transform the Ok value. Step not called on Err.
1364    pub fn map<U, Params, S: IntoStep<T, U, Params>>(
1365        self,
1366        f: S,
1367        registry: &Registry,
1368    ) -> PipelineBuilder<
1369        In,
1370        Result<U, E>,
1371        impl FnMut(&mut World, In) -> Result<U, E> + use<In, T, E, U, Params, Chain, S>,
1372    > {
1373        let mut chain = self.chain;
1374        let mut resolved = f.into_step(registry);
1375        PipelineBuilder {
1376            chain: move |world: &mut World, input: In| {
1377                chain(world, input).map(|val| resolved.call(world, val))
1378            },
1379            _marker: PhantomData,
1380        }
1381    }
1382
1383    /// Short-circuits on Err. std: `Result::and_then`
1384    pub fn and_then<U, Params, S: IntoStep<T, Result<U, E>, Params>>(
1385        self,
1386        f: S,
1387        registry: &Registry,
1388    ) -> PipelineBuilder<
1389        In,
1390        Result<U, E>,
1391        impl FnMut(&mut World, In) -> Result<U, E> + use<In, T, E, U, Params, Chain, S>,
1392    > {
1393        let mut chain = self.chain;
1394        let mut resolved = f.into_step(registry);
1395        PipelineBuilder {
1396            chain: move |world: &mut World, input: In| {
1397                chain(world, input).and_then(|val| resolved.call(world, val))
1398            },
1399            _marker: PhantomData,
1400        }
1401    }
1402
1403    /// Handle error and transition to Option.
1404    ///
1405    /// `Ok(val)` becomes `Some(val)` — handler not called.
1406    /// `Err(err)` calls the handler, then produces `None`.
1407    pub fn catch<Params, S: IntoStep<E, (), Params>>(
1408        self,
1409        f: S,
1410        registry: &Registry,
1411    ) -> PipelineBuilder<
1412        In,
1413        Option<T>,
1414        impl FnMut(&mut World, In) -> Option<T> + use<In, T, E, Params, Chain, S>,
1415    > {
1416        let mut chain = self.chain;
1417        let mut resolved = f.into_step(registry);
1418        PipelineBuilder {
1419            chain: move |world: &mut World, input: In| match chain(world, input) {
1420                Ok(val) => Some(val),
1421                Err(err) => {
1422                    resolved.call(world, err);
1423                    None
1424                }
1425            },
1426            _marker: PhantomData,
1427        }
1428    }
1429
1430    // -- Closure-based (cold path, &mut World) --------------------------------
1431
1432    /// Transform the error. std: `Result::map_err`
1433    pub fn map_err<E2>(
1434        self,
1435        mut f: impl FnMut(&mut World, E) -> E2,
1436    ) -> PipelineBuilder<In, Result<T, E2>, impl FnMut(&mut World, In) -> Result<T, E2>> {
1437        let mut chain = self.chain;
1438        PipelineBuilder {
1439            chain: move |world: &mut World, input: In| {
1440                chain(world, input).map_err(|err| f(world, err))
1441            },
1442            _marker: PhantomData,
1443        }
1444    }
1445
1446    /// Recover from Err. std: `Result::or_else`
1447    pub fn or_else<E2>(
1448        self,
1449        mut f: impl FnMut(&mut World, E) -> Result<T, E2>,
1450    ) -> PipelineBuilder<In, Result<T, E2>, impl FnMut(&mut World, In) -> Result<T, E2>> {
1451        let mut chain = self.chain;
1452        PipelineBuilder {
1453            chain: move |world: &mut World, input: In| {
1454                chain(world, input).or_else(|err| f(world, err))
1455            },
1456            _marker: PhantomData,
1457        }
1458    }
1459
1460    /// Side effect on Ok. std: `Result::inspect`
1461    pub fn inspect(
1462        self,
1463        mut f: impl FnMut(&mut World, &T),
1464    ) -> PipelineBuilder<In, Result<T, E>, impl FnMut(&mut World, In) -> Result<T, E>> {
1465        let mut chain = self.chain;
1466        PipelineBuilder {
1467            chain: move |world: &mut World, input: In| {
1468                chain(world, input).inspect(|val| f(world, val))
1469            },
1470            _marker: PhantomData,
1471        }
1472    }
1473
1474    /// Side effect on Err. std: `Result::inspect_err`
1475    pub fn inspect_err(
1476        self,
1477        mut f: impl FnMut(&mut World, &E),
1478    ) -> PipelineBuilder<In, Result<T, E>, impl FnMut(&mut World, In) -> Result<T, E>> {
1479        let mut chain = self.chain;
1480        PipelineBuilder {
1481            chain: move |world: &mut World, input: In| {
1482                chain(world, input).inspect_err(|err| f(world, err))
1483            },
1484            _marker: PhantomData,
1485        }
1486    }
1487
1488    /// Discard error, enter Option land. std: `Result::ok`
1489    pub fn ok(self) -> PipelineBuilder<In, Option<T>, impl FnMut(&mut World, In) -> Option<T>> {
1490        let mut chain = self.chain;
1491        PipelineBuilder {
1492            chain: move |world: &mut World, input: In| chain(world, input).ok(),
1493            _marker: PhantomData,
1494        }
1495    }
1496
1497    /// Exit Result — Err becomes the default value.
1498    ///
1499    /// `Clone` required because the pipeline may run many times —
1500    /// the default is cloned on each `Err` invocation (unlike
1501    /// std's `unwrap_or` which consumes the value once).
1502    pub fn unwrap_or(self, default: T) -> PipelineBuilder<In, T, impl FnMut(&mut World, In) -> T>
1503    where
1504        T: Clone,
1505    {
1506        let mut chain = self.chain;
1507        PipelineBuilder {
1508            chain: move |world: &mut World, input: In| {
1509                chain(world, input).unwrap_or_else(|_| default.clone())
1510            },
1511            _marker: PhantomData,
1512        }
1513    }
1514
1515    /// Exit Result — Err becomes `f(err)`.
1516    pub fn unwrap_or_else(
1517        self,
1518        mut f: impl FnMut(&mut World, E) -> T,
1519    ) -> PipelineBuilder<In, T, impl FnMut(&mut World, In) -> T> {
1520        let mut chain = self.chain;
1521        PipelineBuilder {
1522            chain: move |world: &mut World, input: In| match chain(world, input) {
1523                Ok(val) => val,
1524                Err(err) => f(world, err),
1525            },
1526            _marker: PhantomData,
1527        }
1528    }
1529}
1530
1531// =============================================================================
1532// PipelineOutput — marker trait for build()
1533// =============================================================================
1534
1535/// Marker trait restricting [`PipelineBuilder::build`] to pipelines
1536/// that produce `()`.
1537///
1538/// If your pipeline produces a value, add a final `.then()` that
1539/// writes it somewhere (e.g. `ResMut<T>`).
1540#[diagnostic::on_unimplemented(
1541    message = "`build()` requires the step pipeline output to be `()`",
1542    label = "this pipeline produces `{Self}`, not `()`",
1543    note = "add a final `.then()` that consumes the output"
1544)]
1545pub trait PipelineOutput {}
1546impl PipelineOutput for () {}
1547impl PipelineOutput for Option<()> {}
1548
1549// =============================================================================
1550// build — when Out = ()
1551// =============================================================================
1552
1553impl<In: 'static, Chain> PipelineBuilder<In, (), Chain>
1554where
1555    Chain: FnMut(&mut World, In) + 'static,
1556{
1557    /// Finalize the pipeline into a [`Pipeline`].
1558    ///
1559    /// The returned pipeline is a concrete, monomorphized type — no boxing,
1560    /// no virtual dispatch. Call `.run()` directly for zero-cost execution,
1561    /// or wrap in `Box<dyn Handler<In>>` when type erasure is needed.
1562    ///
1563    /// Only available when the pipeline ends with `()`. If your chain
1564    /// produces a value, add a final `.then()` that consumes the output.
1565    pub fn build(self) -> Pipeline<In, Chain> {
1566        Pipeline {
1567            chain: self.chain,
1568            _marker: PhantomData,
1569        }
1570    }
1571}
1572
1573// =============================================================================
1574// build_batch — when Out: PipelineOutput (() or Option<()>)
1575// =============================================================================
1576
1577impl<In: 'static, Out: PipelineOutput, Chain> PipelineBuilder<In, Out, Chain>
1578where
1579    Chain: FnMut(&mut World, In) -> Out + 'static,
1580{
1581    /// Finalize into a [`BatchPipeline`] with a pre-allocated input buffer.
1582    ///
1583    /// Same pipeline chain as [`build`](PipelineBuilder::build), but the
1584    /// pipeline owns an input buffer that drivers fill between dispatch
1585    /// cycles. Each call to [`BatchPipeline::run`] drains the buffer,
1586    /// running every item through the chain independently.
1587    ///
1588    /// Available when the pipeline ends with `()` or `Option<()>` (e.g.
1589    /// after `.catch()` or `.filter()`). Pipelines producing values need
1590    /// a final `.then()` that consumes the output.
1591    ///
1592    /// `capacity` is the initial allocation — the buffer can grow if needed,
1593    /// but sizing it for the expected batch size avoids reallocation.
1594    pub fn build_batch(self, capacity: usize) -> BatchPipeline<In, Chain> {
1595        BatchPipeline {
1596            input: Vec::with_capacity(capacity),
1597            chain: self.chain,
1598        }
1599    }
1600}
1601
1602// =============================================================================
1603// Pipeline<In, F> — built pipeline
1604// =============================================================================
1605
1606/// Built step pipeline implementing [`Handler<In>`](crate::Handler).
1607///
1608/// Created by [`PipelineBuilder::build`]. The entire pipeline chain is
1609/// monomorphized at compile time — no boxing, no virtual dispatch.
1610/// Call `.run()` directly for zero-cost execution, or wrap in
1611/// `Box<dyn Handler<In>>` when you need type erasure (single box).
1612///
1613/// Implements [`Handler<In>`](crate::Handler), so it can be stored in
1614/// driver handler collections alongside [`Callback`](crate::Callback)
1615/// and [`HandlerFn`](crate::HandlerFn). For batch processing, see
1616/// [`BatchPipeline`].
1617pub struct Pipeline<In, F> {
1618    chain: F,
1619    _marker: PhantomData<fn(In)>,
1620}
1621
1622impl<In: 'static, F: FnMut(&mut World, In) + Send + 'static> crate::Handler<In>
1623    for Pipeline<In, F>
1624{
1625    fn run(&mut self, world: &mut World, event: In) {
1626        (self.chain)(world, event);
1627    }
1628}
1629
1630// =============================================================================
1631// BatchPipeline<In, F> — pipeline with owned input buffer
1632// =============================================================================
1633
1634/// Batch pipeline that owns a pre-allocated input buffer.
1635///
1636/// Created by [`PipelineBuilder::build_batch`]. Each item flows through
1637/// the full pipeline chain independently — the same per-item `Option`
1638/// and `Result` flow control as [`Pipeline`]. Errors are handled inline
1639/// (via `.catch()`, `.unwrap_or()`, etc.) and the batch continues to
1640/// the next item. No intermediate buffers between steps.
1641///
1642/// # Examples
1643///
1644/// ```
1645/// use nexus_rt::{WorldBuilder, Res, ResMut, PipelineStart};
1646///
1647/// let mut wb = WorldBuilder::new();
1648/// wb.register::<u64>(0);
1649/// let mut world = wb.build();
1650///
1651/// fn accumulate(mut sum: ResMut<u64>, x: u32) {
1652///     *sum += x as u64;
1653/// }
1654///
1655/// let r = world.registry_mut();
1656/// let mut batch = PipelineStart::<u32>::new()
1657///     .then(accumulate, r)
1658///     .build_batch(64);
1659///
1660/// batch.input_mut().extend_from_slice(&[1, 2, 3, 4, 5]);
1661/// batch.run(&mut world);
1662///
1663/// assert_eq!(*world.resource::<u64>(), 15);
1664/// assert!(batch.input().is_empty());
1665/// ```
1666pub struct BatchPipeline<In, F> {
1667    input: Vec<In>,
1668    chain: F,
1669}
1670
1671impl<In, Out: PipelineOutput, F: FnMut(&mut World, In) -> Out> BatchPipeline<In, F> {
1672    /// Mutable access to the input buffer. Drivers fill this between
1673    /// dispatch cycles.
1674    pub fn input_mut(&mut self) -> &mut Vec<In> {
1675        &mut self.input
1676    }
1677
1678    /// Read-only access to the input buffer.
1679    pub fn input(&self) -> &[In] {
1680        &self.input
1681    }
1682
1683    /// Drain the input buffer, running each item through the pipeline.
1684    ///
1685    /// Each item gets independent `Option`/`Result` flow control — an
1686    /// error on one item does not affect subsequent items. After `run()`,
1687    /// the input buffer is empty but retains its allocation.
1688    pub fn run(&mut self, world: &mut World) {
1689        for item in self.input.drain(..) {
1690            let _ = (self.chain)(world, item);
1691        }
1692    }
1693}
1694
1695// =============================================================================
1696// resolve_step — pre-resolve a step for manual dispatch (owned input)
1697// =============================================================================
1698
1699/// Resolve a step for use in manual dispatch (e.g. inside a
1700/// [`.switch()`](PipelineBuilder::switch) closure).
1701///
1702/// Returns a closure with pre-resolved [`Param`] state — the same
1703/// build-time resolution that `.then()` performs, but as a standalone
1704/// value the caller can invoke from any context.
1705///
1706/// This is the pipeline (owned-input) counterpart of
1707/// [`dag::resolve_arm`](crate::dag::resolve_arm) (reference-input).
1708///
1709/// # Examples
1710///
1711/// ```ignore
1712/// let mut arm0 = resolve_step(handle_new, reg);
1713/// let mut arm1 = resolve_step(handle_cancel, reg);
1714///
1715/// pipeline.switch(move |world, order: Order| match order.kind {
1716///     OrderKind::New    => arm0(world, order),
1717///     OrderKind::Cancel => arm1(world, order),
1718/// })
1719/// ```
1720pub fn resolve_step<In, Out, Params, S>(
1721    f: S,
1722    registry: &Registry,
1723) -> impl FnMut(&mut World, In) -> Out + use<In, Out, Params, S>
1724where
1725    In: 'static,
1726    Out: 'static,
1727    S: IntoStep<In, Out, Params>,
1728{
1729    let mut resolved = f.into_step(registry);
1730    move |world: &mut World, input: In| resolved.call(world, input)
1731}
1732
1733// =============================================================================
1734// Tests
1735// =============================================================================
1736
1737#[cfg(test)]
1738mod tests {
1739    use super::*;
1740    use crate::{Handler, IntoHandler, Local, Res, ResMut, WorldBuilder, fan_out};
1741
1742    // =========================================================================
1743    // Core dispatch
1744    // =========================================================================
1745
1746    #[test]
1747    fn step_pure_transform() {
1748        let mut world = WorldBuilder::new().build();
1749        let r = world.registry_mut();
1750        let mut p = PipelineStart::<u32>::new().then(|x: u32| x as u64 * 2, r);
1751        assert_eq!(p.run(&mut world, 5), 10u64);
1752    }
1753
1754    #[test]
1755    fn step_one_res() {
1756        let mut wb = WorldBuilder::new();
1757        wb.register::<u64>(10);
1758        let mut world = wb.build();
1759
1760        fn multiply(factor: Res<u64>, x: u32) -> u64 {
1761            *factor * x as u64
1762        }
1763
1764        let r = world.registry_mut();
1765        let mut p = PipelineStart::<u32>::new().then(multiply, r);
1766        assert_eq!(p.run(&mut world, 5), 50);
1767    }
1768
1769    #[test]
1770    fn step_one_res_mut() {
1771        let mut wb = WorldBuilder::new();
1772        wb.register::<u64>(0);
1773        let mut world = wb.build();
1774
1775        fn accumulate(mut total: ResMut<u64>, x: u32) {
1776            *total += x as u64;
1777        }
1778
1779        let r = world.registry_mut();
1780        let mut p = PipelineStart::<u32>::new().then(accumulate, r);
1781        p.run(&mut world, 10);
1782        p.run(&mut world, 5);
1783        assert_eq!(*world.resource::<u64>(), 15);
1784    }
1785
1786    #[test]
1787    fn step_two_params() {
1788        let mut wb = WorldBuilder::new();
1789        wb.register::<u64>(10);
1790        wb.register::<bool>(true);
1791        let mut world = wb.build();
1792
1793        fn conditional(factor: Res<u64>, flag: Res<bool>, x: u32) -> u64 {
1794            if *flag { *factor * x as u64 } else { 0 }
1795        }
1796
1797        let r = world.registry_mut();
1798        let mut p = PipelineStart::<u32>::new().then(conditional, r);
1799        assert_eq!(p.run(&mut world, 5), 50);
1800    }
1801
1802    #[test]
1803    fn step_chain_two() {
1804        let mut wb = WorldBuilder::new();
1805        wb.register::<u64>(2);
1806        let mut world = wb.build();
1807
1808        fn double(factor: Res<u64>, x: u32) -> u64 {
1809            *factor * x as u64
1810        }
1811
1812        let r = world.registry_mut();
1813        let mut p = PipelineStart::<u32>::new()
1814            .then(double, r)
1815            .then(|val: u64| val + 1, r);
1816        assert_eq!(p.run(&mut world, 5), 11); // 2*5 + 1
1817    }
1818
1819    // =========================================================================
1820    // Option combinators
1821    // =========================================================================
1822
1823    #[test]
1824    fn option_map_on_some() {
1825        let mut wb = WorldBuilder::new();
1826        wb.register::<u64>(10);
1827        let mut world = wb.build();
1828
1829        fn add_factor(factor: Res<u64>, x: u32) -> u64 {
1830            *factor + x as u64
1831        }
1832
1833        let r = world.registry_mut();
1834        let mut p = PipelineStart::<u32>::new()
1835            .then(|x: u32| -> Option<u32> { Some(x) }, r)
1836            .map(add_factor, r);
1837        assert_eq!(p.run(&mut world, 5), Some(15));
1838    }
1839
1840    #[test]
1841    fn option_map_skips_none() {
1842        let mut wb = WorldBuilder::new();
1843        wb.register::<bool>(false);
1844        let mut world = wb.build();
1845
1846        fn mark(mut flag: ResMut<bool>, _x: u32) -> u32 {
1847            *flag = true;
1848            0
1849        }
1850
1851        let r = world.registry_mut();
1852        let mut p = PipelineStart::<u32>::new()
1853            .then(|_x: u32| -> Option<u32> { None }, r)
1854            .map(mark, r);
1855        assert_eq!(p.run(&mut world, 5), None);
1856        assert!(!*world.resource::<bool>());
1857    }
1858
1859    #[test]
1860    fn option_and_then_chains() {
1861        let mut wb = WorldBuilder::new();
1862        wb.register::<u64>(10);
1863        let mut world = wb.build();
1864
1865        fn check(min: Res<u64>, x: u32) -> Option<u64> {
1866            let val = x as u64;
1867            if val > *min { Some(val) } else { None }
1868        }
1869
1870        let r = world.registry_mut();
1871        let mut p = PipelineStart::<u32>::new()
1872            .then(|x: u32| Some(x), r)
1873            .and_then(check, r);
1874        assert_eq!(p.run(&mut world, 20), Some(20));
1875    }
1876
1877    #[test]
1878    fn option_and_then_short_circuits() {
1879        let mut wb = WorldBuilder::new();
1880        wb.register::<u64>(10);
1881        let mut world = wb.build();
1882
1883        fn check(min: Res<u64>, x: u32) -> Option<u64> {
1884            let val = x as u64;
1885            if val > *min { Some(val) } else { None }
1886        }
1887
1888        let r = world.registry_mut();
1889        let mut p = PipelineStart::<u32>::new()
1890            .then(|x: u32| Some(x), r)
1891            .and_then(check, r);
1892        assert_eq!(p.run(&mut world, 5), None);
1893    }
1894
1895    #[test]
1896    fn option_on_none_fires() {
1897        let mut wb = WorldBuilder::new();
1898        wb.register::<bool>(false);
1899        let mut world = wb.build();
1900
1901        let r = world.registry_mut();
1902        let mut p = PipelineStart::<u32>::new()
1903            .then(|_x: u32| -> Option<u32> { None }, r)
1904            .on_none(|w| {
1905                *w.resource_mut::<bool>() = true;
1906            });
1907        p.run(&mut world, 0);
1908        assert!(*world.resource::<bool>());
1909    }
1910
1911    #[test]
1912    fn option_filter_keeps() {
1913        let mut world = WorldBuilder::new().build();
1914        let r = world.registry_mut();
1915        let mut p = PipelineStart::<u32>::new()
1916            .then(|x: u32| Some(x), r)
1917            .filter(|_w, x| *x > 3);
1918        assert_eq!(p.run(&mut world, 5), Some(5));
1919    }
1920
1921    #[test]
1922    fn option_filter_drops() {
1923        let mut world = WorldBuilder::new().build();
1924        let r = world.registry_mut();
1925        let mut p = PipelineStart::<u32>::new()
1926            .then(|x: u32| Some(x), r)
1927            .filter(|_w, x| *x > 10);
1928        assert_eq!(p.run(&mut world, 5), None);
1929    }
1930
1931    // =========================================================================
1932    // Result combinators
1933    // =========================================================================
1934
1935    #[test]
1936    fn result_map_on_ok() {
1937        let mut wb = WorldBuilder::new();
1938        wb.register::<u64>(10);
1939        let mut world = wb.build();
1940
1941        fn add_factor(factor: Res<u64>, x: u32) -> u64 {
1942            *factor + x as u64
1943        }
1944
1945        let r = world.registry_mut();
1946        let mut p = PipelineStart::<u32>::new()
1947            .then(|x: u32| -> Result<u32, String> { Ok(x) }, r)
1948            .map(add_factor, r);
1949        assert_eq!(p.run(&mut world, 5), Ok(15));
1950    }
1951
1952    #[test]
1953    fn result_map_skips_err() {
1954        let mut wb = WorldBuilder::new();
1955        wb.register::<bool>(false);
1956        let mut world = wb.build();
1957
1958        fn mark(mut flag: ResMut<bool>, _x: u32) -> u32 {
1959            *flag = true;
1960            0
1961        }
1962
1963        let r = world.registry_mut();
1964        let mut p = PipelineStart::<u32>::new()
1965            .then(|_x: u32| -> Result<u32, String> { Err("fail".into()) }, r)
1966            .map(mark, r);
1967        assert!(p.run(&mut world, 5).is_err());
1968        assert!(!*world.resource::<bool>());
1969    }
1970
1971    #[test]
1972    fn result_catch_handles_error() {
1973        let mut wb = WorldBuilder::new();
1974        wb.register::<String>(String::new());
1975        let mut world = wb.build();
1976
1977        fn log_error(mut log: ResMut<String>, err: String) {
1978            *log = err;
1979        }
1980
1981        let r = world.registry_mut();
1982        let mut p = PipelineStart::<u32>::new()
1983            .then(|_x: u32| -> Result<u32, String> { Err("caught".into()) }, r)
1984            .catch(log_error, r);
1985        assert_eq!(p.run(&mut world, 0), None);
1986        assert_eq!(world.resource::<String>().as_str(), "caught");
1987    }
1988
1989    #[test]
1990    fn result_catch_passes_ok() {
1991        let mut wb = WorldBuilder::new();
1992        wb.register::<String>(String::new());
1993        let mut world = wb.build();
1994
1995        fn log_error(mut log: ResMut<String>, err: String) {
1996            *log = err;
1997        }
1998
1999        let r = world.registry_mut();
2000        let mut p = PipelineStart::<u32>::new()
2001            .then(|x: u32| -> Result<u32, String> { Ok(x) }, r)
2002            .catch(log_error, r);
2003        assert_eq!(p.run(&mut world, 5), Some(5));
2004        assert!(world.resource::<String>().is_empty());
2005    }
2006
2007    // =========================================================================
2008    // Build + Handler
2009    // =========================================================================
2010
2011    #[test]
2012    fn build_produces_handler() {
2013        let mut wb = WorldBuilder::new();
2014        wb.register::<u64>(0);
2015        let mut world = wb.build();
2016
2017        fn accumulate(mut total: ResMut<u64>, x: u32) {
2018            *total += x as u64;
2019        }
2020
2021        let r = world.registry_mut();
2022        let mut pipeline = PipelineStart::<u32>::new().then(accumulate, r).build();
2023
2024        pipeline.run(&mut world, 10);
2025        pipeline.run(&mut world, 5);
2026        assert_eq!(*world.resource::<u64>(), 15);
2027    }
2028
2029    #[test]
2030    fn run_returns_output() {
2031        let mut wb = WorldBuilder::new();
2032        wb.register::<u64>(3);
2033        let mut world = wb.build();
2034
2035        fn multiply(factor: Res<u64>, x: u32) -> u64 {
2036            *factor * x as u64
2037        }
2038
2039        let r = world.registry_mut();
2040        let mut p = PipelineStart::<u32>::new().then(multiply, r);
2041        let result: u64 = p.run(&mut world, 7);
2042        assert_eq!(result, 21);
2043    }
2044
2045    // =========================================================================
2046    // Safety
2047    // =========================================================================
2048
2049    #[test]
2050    #[should_panic(expected = "not registered")]
2051    fn panics_on_missing_resource() {
2052        let mut world = WorldBuilder::new().build();
2053
2054        fn needs_u64(_val: Res<u64>, _x: u32) -> u32 {
2055            0
2056        }
2057
2058        let r = world.registry_mut();
2059        let _p = PipelineStart::<u32>::new().then(needs_u64, r);
2060    }
2061
2062    // =========================================================================
2063    // Access conflict detection
2064    // =========================================================================
2065
2066    #[test]
2067    #[should_panic(expected = "conflicting access")]
2068    fn step_duplicate_access_panics() {
2069        let mut wb = WorldBuilder::new();
2070        wb.register::<u64>(0);
2071        let mut world = wb.build();
2072
2073        fn bad(a: Res<u64>, b: ResMut<u64>, _x: u32) -> u32 {
2074            let _ = (*a, &*b);
2075            0
2076        }
2077
2078        let r = world.registry_mut();
2079        let _p = PipelineStart::<u32>::new().then(bad, r);
2080    }
2081
2082    // =========================================================================
2083    // Integration
2084    // =========================================================================
2085
2086    #[test]
2087    fn local_in_step() {
2088        let mut wb = WorldBuilder::new();
2089        wb.register::<u64>(0);
2090        let mut world = wb.build();
2091
2092        fn count(mut count: Local<u64>, mut total: ResMut<u64>, _x: u32) {
2093            *count += 1;
2094            *total = *count;
2095        }
2096
2097        let r = world.registry_mut();
2098        let mut p = PipelineStart::<u32>::new().then(count, r);
2099        p.run(&mut world, 0);
2100        p.run(&mut world, 0);
2101        p.run(&mut world, 0);
2102        assert_eq!(*world.resource::<u64>(), 3);
2103    }
2104
2105    // =========================================================================
2106    // Option combinators (extended)
2107    // =========================================================================
2108
2109    #[test]
2110    fn option_unwrap_or_some() {
2111        let mut world = WorldBuilder::new().build();
2112        let r = world.registry_mut();
2113        let mut p = PipelineStart::<u32>::new()
2114            .then(|x: u32| -> Option<u32> { Some(x) }, r)
2115            .unwrap_or(99);
2116        assert_eq!(p.run(&mut world, 5), 5);
2117    }
2118
2119    #[test]
2120    fn option_unwrap_or_none() {
2121        let mut world = WorldBuilder::new().build();
2122        let r = world.registry_mut();
2123        let mut p = PipelineStart::<u32>::new()
2124            .then(|_x: u32| -> Option<u32> { None }, r)
2125            .unwrap_or(99);
2126        assert_eq!(p.run(&mut world, 5), 99);
2127    }
2128
2129    #[test]
2130    fn option_unwrap_or_else() {
2131        let mut world = WorldBuilder::new().build();
2132        let r = world.registry_mut();
2133        let mut p = PipelineStart::<u32>::new()
2134            .then(|_x: u32| -> Option<u32> { None }, r)
2135            .unwrap_or_else(|_w| 42);
2136        assert_eq!(p.run(&mut world, 0), 42);
2137    }
2138
2139    #[test]
2140    fn option_ok_or() {
2141        let mut world = WorldBuilder::new().build();
2142        let r = world.registry_mut();
2143        let mut p = PipelineStart::<u32>::new()
2144            .then(|_x: u32| -> Option<u32> { None }, r)
2145            .ok_or("missing");
2146        assert_eq!(p.run(&mut world, 0), Err("missing"));
2147    }
2148
2149    #[test]
2150    fn option_ok_or_some() {
2151        let mut world = WorldBuilder::new().build();
2152        let r = world.registry_mut();
2153        let mut p = PipelineStart::<u32>::new()
2154            .then(|x: u32| -> Option<u32> { Some(x) }, r)
2155            .ok_or("missing");
2156        assert_eq!(p.run(&mut world, 7), Ok(7));
2157    }
2158
2159    #[test]
2160    fn option_ok_or_else() {
2161        let mut world = WorldBuilder::new().build();
2162        let r = world.registry_mut();
2163        let mut p = PipelineStart::<u32>::new()
2164            .then(|_x: u32| -> Option<u32> { None }, r)
2165            .ok_or_else(|_w| "computed");
2166        assert_eq!(p.run(&mut world, 0), Err("computed"));
2167    }
2168
2169    #[test]
2170    fn option_inspect_passes_through() {
2171        let mut wb = WorldBuilder::new();
2172        wb.register::<u64>(0);
2173        let mut world = wb.build();
2174        let r = world.registry_mut();
2175        let mut p = PipelineStart::<u32>::new()
2176            .then(|x: u32| -> Option<u32> { Some(x) }, r)
2177            .inspect(|_w, _val| {});
2178        // inspect should pass through the value unchanged.
2179        assert_eq!(p.run(&mut world, 10), Some(10));
2180    }
2181
2182    // =========================================================================
2183    // Result combinators (extended)
2184    // =========================================================================
2185
2186    #[test]
2187    fn result_map_err() {
2188        let mut world = WorldBuilder::new().build();
2189        let r = world.registry_mut();
2190        let mut p = PipelineStart::<u32>::new()
2191            .then(|_x: u32| -> Result<u32, i32> { Err(-1) }, r)
2192            .map_err(|_w, e| e.to_string());
2193        assert_eq!(p.run(&mut world, 0), Err("-1".to_string()));
2194    }
2195
2196    #[test]
2197    fn result_map_err_ok_passthrough() {
2198        let mut world = WorldBuilder::new().build();
2199        let r = world.registry_mut();
2200        let mut p = PipelineStart::<u32>::new()
2201            .then(|x: u32| -> Result<u32, i32> { Ok(x) }, r)
2202            .map_err(|_w, e| e.to_string());
2203        assert_eq!(p.run(&mut world, 5), Ok(5));
2204    }
2205
2206    #[test]
2207    fn result_or_else() {
2208        let mut world = WorldBuilder::new().build();
2209        let r = world.registry_mut();
2210        let mut p = PipelineStart::<u32>::new()
2211            .then(|_x: u32| -> Result<u32, &str> { Err("fail") }, r)
2212            .or_else(|_w, _e| Ok::<u32, &str>(42));
2213        assert_eq!(p.run(&mut world, 0), Ok(42));
2214    }
2215
2216    #[test]
2217    fn result_inspect_passes_through() {
2218        let mut world = WorldBuilder::new().build();
2219        let r = world.registry_mut();
2220        let mut p = PipelineStart::<u32>::new()
2221            .then(|x: u32| -> Result<u32, &str> { Ok(x) }, r)
2222            .inspect(|_w, _val| {});
2223        // inspect should pass through Ok unchanged.
2224        assert_eq!(p.run(&mut world, 7), Ok(7));
2225    }
2226
2227    #[test]
2228    fn result_inspect_err_passes_through() {
2229        let mut world = WorldBuilder::new().build();
2230        let r = world.registry_mut();
2231        let mut p = PipelineStart::<u32>::new()
2232            .then(|_x: u32| -> Result<u32, &str> { Err("bad") }, r)
2233            .inspect_err(|_w, _e| {});
2234        // inspect_err should pass through Err unchanged.
2235        assert_eq!(p.run(&mut world, 0), Err("bad"));
2236    }
2237
2238    #[test]
2239    fn result_ok_converts() {
2240        let mut world = WorldBuilder::new().build();
2241        let r = world.registry_mut();
2242        let mut p = PipelineStart::<u32>::new()
2243            .then(|x: u32| -> Result<u32, &str> { Ok(x) }, r)
2244            .ok();
2245        assert_eq!(p.run(&mut world, 5), Some(5));
2246    }
2247
2248    #[test]
2249    fn result_ok_drops_err() {
2250        let mut world = WorldBuilder::new().build();
2251        let r = world.registry_mut();
2252        let mut p = PipelineStart::<u32>::new()
2253            .then(|_x: u32| -> Result<u32, &str> { Err("gone") }, r)
2254            .ok();
2255        assert_eq!(p.run(&mut world, 0), None);
2256    }
2257
2258    #[test]
2259    fn result_unwrap_or() {
2260        let mut world = WorldBuilder::new().build();
2261        let r = world.registry_mut();
2262        let mut p = PipelineStart::<u32>::new()
2263            .then(|_x: u32| -> Result<u32, &str> { Err("x") }, r)
2264            .unwrap_or(99);
2265        assert_eq!(p.run(&mut world, 0), 99);
2266    }
2267
2268    #[test]
2269    fn result_unwrap_or_else() {
2270        let mut world = WorldBuilder::new().build();
2271        let r = world.registry_mut();
2272        let mut p = PipelineStart::<u32>::new()
2273            .then(|_x: u32| -> Result<u32, i32> { Err(-5) }, r)
2274            .unwrap_or_else(|_w, e| e.unsigned_abs());
2275        assert_eq!(p.run(&mut world, 0), 5);
2276    }
2277
2278    // =========================================================================
2279    // Batch pipeline
2280    // =========================================================================
2281
2282    #[test]
2283    fn batch_accumulates() {
2284        let mut wb = WorldBuilder::new();
2285        wb.register::<u64>(0);
2286        let mut world = wb.build();
2287
2288        fn accumulate(mut sum: ResMut<u64>, x: u32) {
2289            *sum += x as u64;
2290        }
2291
2292        let r = world.registry_mut();
2293        let mut batch = PipelineStart::<u32>::new()
2294            .then(accumulate, r)
2295            .build_batch(16);
2296
2297        batch.input_mut().extend_from_slice(&[1, 2, 3, 4, 5]);
2298        batch.run(&mut world);
2299
2300        assert_eq!(*world.resource::<u64>(), 15);
2301        assert!(batch.input().is_empty());
2302    }
2303
2304    #[test]
2305    fn batch_retains_allocation() {
2306        let mut world = WorldBuilder::new().build();
2307        let r = world.registry_mut();
2308        let mut batch = PipelineStart::<u32>::new()
2309            .then(|_x: u32| {}, r)
2310            .build_batch(64);
2311
2312        batch.input_mut().extend_from_slice(&[1, 2, 3]);
2313        batch.run(&mut world);
2314
2315        assert!(batch.input().is_empty());
2316        assert!(batch.input_mut().capacity() >= 64);
2317    }
2318
2319    #[test]
2320    fn batch_empty_is_noop() {
2321        let mut wb = WorldBuilder::new();
2322        wb.register::<u64>(0);
2323        let mut world = wb.build();
2324
2325        fn accumulate(mut sum: ResMut<u64>, x: u32) {
2326            *sum += x as u64;
2327        }
2328
2329        let r = world.registry_mut();
2330        let mut batch = PipelineStart::<u32>::new()
2331            .then(accumulate, r)
2332            .build_batch(16);
2333
2334        batch.run(&mut world);
2335        assert_eq!(*world.resource::<u64>(), 0);
2336    }
2337
2338    #[test]
2339    fn batch_catch_continues_on_error() {
2340        let mut wb = WorldBuilder::new();
2341        wb.register::<u64>(0);
2342        wb.register::<u32>(0);
2343        let mut world = wb.build();
2344
2345        fn validate(x: u32) -> Result<u32, &'static str> {
2346            if x > 0 { Ok(x) } else { Err("zero") }
2347        }
2348
2349        fn count_errors(mut errs: ResMut<u32>, _err: &'static str) {
2350            *errs += 1;
2351        }
2352
2353        fn accumulate(mut sum: ResMut<u64>, x: u32) {
2354            *sum += x as u64;
2355        }
2356
2357        let r = world.registry_mut();
2358        let mut batch = PipelineStart::<u32>::new()
2359            .then(validate, r)
2360            .catch(count_errors, r)
2361            .map(accumulate, r)
2362            .build_batch(16);
2363
2364        // Items: 1, 0 (error), 2, 0 (error), 3
2365        batch.input_mut().extend_from_slice(&[1, 0, 2, 0, 3]);
2366        batch.run(&mut world);
2367
2368        assert_eq!(*world.resource::<u64>(), 6); // 1 + 2 + 3
2369        assert_eq!(*world.resource::<u32>(), 2); // 2 errors
2370    }
2371
2372    #[test]
2373    fn batch_filter_skips_items() {
2374        let mut wb = WorldBuilder::new();
2375        wb.register::<u64>(0);
2376        let mut world = wb.build();
2377
2378        fn accumulate(mut sum: ResMut<u64>, x: u32) {
2379            *sum += x as u64;
2380        }
2381
2382        let r = world.registry_mut();
2383        let mut batch = PipelineStart::<u32>::new()
2384            .then(
2385                |x: u32| -> Option<u32> { if x > 2 { Some(x) } else { None } },
2386                r,
2387            )
2388            .map(accumulate, r)
2389            .build_batch(16);
2390
2391        batch.input_mut().extend_from_slice(&[1, 2, 3, 4, 5]);
2392        batch.run(&mut world);
2393
2394        assert_eq!(*world.resource::<u64>(), 12); // 3 + 4 + 5
2395    }
2396
2397    #[test]
2398    fn batch_multiple_runs_accumulate() {
2399        let mut wb = WorldBuilder::new();
2400        wb.register::<u64>(0);
2401        let mut world = wb.build();
2402
2403        fn accumulate(mut sum: ResMut<u64>, x: u32) {
2404            *sum += x as u64;
2405        }
2406
2407        let r = world.registry_mut();
2408        let mut batch = PipelineStart::<u32>::new()
2409            .then(accumulate, r)
2410            .build_batch(16);
2411
2412        batch.input_mut().extend_from_slice(&[1, 2, 3]);
2413        batch.run(&mut world);
2414        assert_eq!(*world.resource::<u64>(), 6);
2415
2416        batch.input_mut().extend_from_slice(&[4, 5]);
2417        batch.run(&mut world);
2418        assert_eq!(*world.resource::<u64>(), 15);
2419    }
2420
2421    #[test]
2422    fn batch_with_world_access() {
2423        let mut wb = WorldBuilder::new();
2424        wb.register::<u64>(10); // multiplier
2425        wb.register::<Vec<u64>>(Vec::new());
2426        let mut world = wb.build();
2427
2428        fn multiply_and_collect(factor: Res<u64>, mut out: ResMut<Vec<u64>>, x: u32) {
2429            out.push(x as u64 * *factor);
2430        }
2431
2432        let r = world.registry_mut();
2433        let mut batch = PipelineStart::<u32>::new()
2434            .then(multiply_and_collect, r)
2435            .build_batch(16);
2436
2437        batch.input_mut().extend_from_slice(&[1, 2, 3]);
2438        batch.run(&mut world);
2439
2440        assert_eq!(world.resource::<Vec<u64>>().as_slice(), &[10, 20, 30]);
2441    }
2442
2443    // =========================================================================
2444    // Cloned combinator
2445    // =========================================================================
2446
2447    // Named functions for proper lifetime elision (&'a u32 → &'a u32).
2448    // Closures get two independent lifetimes and fail to compile.
2449    fn ref_identity(x: &u32) -> &u32 {
2450        x
2451    }
2452    fn ref_wrap_some(x: &u32) -> Option<&u32> {
2453        Some(x)
2454    }
2455    fn ref_wrap_none(_x: &u32) -> Option<&u32> {
2456        None
2457    }
2458    fn ref_wrap_ok(x: &u32) -> Result<&u32, String> {
2459        Ok(x)
2460    }
2461    fn ref_wrap_err(_x: &u32) -> Result<&u32, String> {
2462        Err("fail".into())
2463    }
2464
2465    #[test]
2466    fn cloned_bare() {
2467        let mut world = WorldBuilder::new().build();
2468        // val before p — val must outlive the pipeline's In = &u32
2469        let val = 42u32;
2470        let r = world.registry_mut();
2471        let mut p = PipelineStart::<&u32>::new().then(ref_identity, r).cloned();
2472        assert_eq!(p.run(&mut world, &val), 42u32);
2473    }
2474
2475    #[test]
2476    fn cloned_option_some() {
2477        let mut world = WorldBuilder::new().build();
2478        let val = 42u32;
2479        let r = world.registry_mut();
2480        let mut p = PipelineStart::<&u32>::new().then(ref_wrap_some, r).cloned();
2481        assert_eq!(p.run(&mut world, &val), Some(42u32));
2482    }
2483
2484    #[test]
2485    fn cloned_option_none() {
2486        let mut world = WorldBuilder::new().build();
2487        let val = 42u32;
2488        let r = world.registry_mut();
2489        let mut p = PipelineStart::<&u32>::new().then(ref_wrap_none, r).cloned();
2490        assert_eq!(p.run(&mut world, &val), None);
2491    }
2492
2493    #[test]
2494    fn cloned_result_ok() {
2495        let mut world = WorldBuilder::new().build();
2496        let val = 42u32;
2497        let r = world.registry_mut();
2498        let mut p = PipelineStart::<&u32>::new().then(ref_wrap_ok, r).cloned();
2499        assert_eq!(p.run(&mut world, &val), Ok(42u32));
2500    }
2501
2502    #[test]
2503    fn cloned_result_err() {
2504        let mut world = WorldBuilder::new().build();
2505        let val = 42u32;
2506        let r = world.registry_mut();
2507        let mut p = PipelineStart::<&u32>::new().then(ref_wrap_err, r).cloned();
2508        assert_eq!(p.run(&mut world, &val), Err("fail".into()));
2509    }
2510
2511    // =========================================================================
2512    // Dispatch combinator
2513    // =========================================================================
2514
2515    #[test]
2516    fn dispatch_to_handler() {
2517        let mut wb = WorldBuilder::new();
2518        wb.register::<u64>(0);
2519        let mut world = wb.build();
2520
2521        fn store(mut out: ResMut<u64>, val: u32) {
2522            *out = val as u64;
2523        }
2524
2525        let r = world.registry_mut();
2526        let handler = PipelineStart::<u32>::new().then(store, r).build();
2527
2528        let mut p = PipelineStart::<u32>::new()
2529            .then(|x: u32| x * 2, r)
2530            .dispatch(handler)
2531            .build();
2532
2533        p.run(&mut world, 5);
2534        assert_eq!(*world.resource::<u64>(), 10);
2535    }
2536
2537    #[test]
2538    fn dispatch_to_fanout() {
2539        let mut wb = WorldBuilder::new();
2540        wb.register::<u64>(0);
2541        wb.register::<i64>(0);
2542        let mut world = wb.build();
2543
2544        fn write_u64(mut sink: ResMut<u64>, event: &u32) {
2545            *sink += *event as u64;
2546        }
2547        fn write_i64(mut sink: ResMut<i64>, event: &u32) {
2548            *sink += *event as i64;
2549        }
2550
2551        let h1 = write_u64.into_handler(world.registry());
2552        let h2 = write_i64.into_handler(world.registry());
2553        let fan = fan_out!(h1, h2);
2554
2555        let r = world.registry_mut();
2556        let mut p = PipelineStart::<u32>::new()
2557            .then(|x: u32| x * 2, r)
2558            .dispatch(fan)
2559            .build();
2560
2561        p.run(&mut world, 5);
2562        assert_eq!(*world.resource::<u64>(), 10);
2563        assert_eq!(*world.resource::<i64>(), 10);
2564    }
2565
2566    #[test]
2567    fn dispatch_to_broadcast() {
2568        let mut wb = WorldBuilder::new();
2569        wb.register::<u64>(0);
2570        let mut world = wb.build();
2571
2572        fn write_u64(mut sink: ResMut<u64>, event: &u32) {
2573            *sink += *event as u64;
2574        }
2575
2576        let mut broadcast = crate::Broadcast::<u32>::new();
2577        broadcast.add(write_u64.into_handler(world.registry()));
2578        broadcast.add(write_u64.into_handler(world.registry()));
2579
2580        let r = world.registry_mut();
2581        let mut p = PipelineStart::<u32>::new()
2582            .then(|x: u32| x + 1, r)
2583            .dispatch(broadcast)
2584            .build();
2585
2586        p.run(&mut world, 4);
2587        assert_eq!(*world.resource::<u64>(), 10); // 5 + 5
2588    }
2589
2590    #[test]
2591    fn dispatch_build_produces_handler() {
2592        let mut wb = WorldBuilder::new();
2593        wb.register::<u64>(0);
2594        let mut world = wb.build();
2595
2596        fn store(mut out: ResMut<u64>, val: u32) {
2597            *out = val as u64;
2598        }
2599
2600        let r = world.registry_mut();
2601        let inner = PipelineStart::<u32>::new().then(store, r).build();
2602
2603        let mut pipeline: Box<dyn Handler<u32>> = Box::new(
2604            PipelineStart::<u32>::new()
2605                .then(|x: u32| x + 1, r)
2606                .dispatch(inner)
2607                .build(),
2608        );
2609
2610        pipeline.run(&mut world, 9);
2611        assert_eq!(*world.resource::<u64>(), 10);
2612    }
2613
2614    // -- Guard combinator --
2615
2616    #[test]
2617    fn pipeline_guard_keeps() {
2618        fn sink(mut out: ResMut<u64>, val: Option<u64>) {
2619            *out = val.unwrap_or(0);
2620        }
2621        let mut wb = WorldBuilder::new();
2622        wb.register::<u64>(0);
2623        let mut world = wb.build();
2624        let reg = world.registry();
2625
2626        let mut p = PipelineStart::<u32>::new()
2627            .then(|x: u32| x as u64, reg)
2628            .guard(|_w, v| *v > 3)
2629            .then(sink, reg);
2630
2631        p.run(&mut world, 5u32);
2632        assert_eq!(*world.resource::<u64>(), 5);
2633    }
2634
2635    #[test]
2636    fn pipeline_guard_drops() {
2637        fn sink(mut out: ResMut<u64>, val: Option<u64>) {
2638            *out = val.unwrap_or(999);
2639        }
2640        let mut wb = WorldBuilder::new();
2641        wb.register::<u64>(0);
2642        let mut world = wb.build();
2643        let reg = world.registry();
2644
2645        let mut p = PipelineStart::<u32>::new()
2646            .then(|x: u32| x as u64, reg)
2647            .guard(|_w, v| *v > 10)
2648            .then(sink, reg);
2649
2650        p.run(&mut world, 5u32);
2651        assert_eq!(*world.resource::<u64>(), 999);
2652    }
2653
2654    // -- Tap combinator --
2655
2656    #[test]
2657    fn pipeline_tap_observes_without_changing() {
2658        fn sink(mut out: ResMut<u64>, val: u64) {
2659            *out = val;
2660        }
2661        let mut wb = WorldBuilder::new();
2662        wb.register::<u64>(0);
2663        wb.register::<bool>(false);
2664        let mut world = wb.build();
2665        let reg = world.registry();
2666
2667        let mut p = PipelineStart::<u32>::new()
2668            .then(|x: u32| x as u64 * 2, reg)
2669            .tap(|w, val| {
2670                *w.resource_mut::<bool>() = *val == 10;
2671            })
2672            .then(sink, reg);
2673
2674        p.run(&mut world, 5u32);
2675        assert_eq!(*world.resource::<u64>(), 10); // value passed through
2676        assert!(*world.resource::<bool>()); // tap fired
2677    }
2678
2679    // -- Route combinator --
2680
2681    #[test]
2682    fn pipeline_route_true_arm() {
2683        fn sink(mut out: ResMut<u64>, val: u64) {
2684            *out = val;
2685        }
2686        let mut wb = WorldBuilder::new();
2687        wb.register::<u64>(0);
2688        let mut world = wb.build();
2689        let reg = world.registry();
2690
2691        let arm_t = PipelineStart::new().then(|x: u64| x * 2, reg);
2692        let arm_f = PipelineStart::new().then(|x: u64| x * 3, reg);
2693
2694        let mut p = PipelineStart::<u32>::new()
2695            .then(|x: u32| x as u64, reg)
2696            .route(|_w, v| *v > 3, arm_t, arm_f)
2697            .then(sink, reg);
2698
2699        p.run(&mut world, 5u32); // 5 > 3 → true arm → double → 10
2700        assert_eq!(*world.resource::<u64>(), 10);
2701    }
2702
2703    #[test]
2704    fn pipeline_route_false_arm() {
2705        fn sink(mut out: ResMut<u64>, val: u64) {
2706            *out = val;
2707        }
2708        let mut wb = WorldBuilder::new();
2709        wb.register::<u64>(0);
2710        let mut world = wb.build();
2711        let reg = world.registry();
2712
2713        let arm_t = PipelineStart::new().then(|x: u64| x * 2, reg);
2714        let arm_f = PipelineStart::new().then(|x: u64| x * 3, reg);
2715
2716        let mut p = PipelineStart::<u32>::new()
2717            .then(|x: u32| x as u64, reg)
2718            .route(|_w, v| *v > 10, arm_t, arm_f)
2719            .then(sink, reg);
2720
2721        p.run(&mut world, 5u32); // 5 <= 10 → false arm → triple → 15
2722        assert_eq!(*world.resource::<u64>(), 15);
2723    }
2724
2725    #[test]
2726    fn pipeline_route_nested() {
2727        fn sink(mut out: ResMut<u64>, val: u64) {
2728            *out = val;
2729        }
2730        let mut wb = WorldBuilder::new();
2731        wb.register::<u64>(0);
2732        let mut world = wb.build();
2733        let reg = world.registry();
2734
2735        // N-ary via nesting: <5 → +100, 5..10 → +200, >=10 → +300
2736        let inner_t = PipelineStart::new().then(|x: u64| x + 200, reg);
2737        let inner_f = PipelineStart::new().then(|x: u64| x + 300, reg);
2738        let outer_t = PipelineStart::new().then(|x: u64| x + 100, reg);
2739        let outer_f =
2740            PipelineStart::new()
2741                .then(|x: u64| x, reg)
2742                .route(|_w, v| *v < 10, inner_t, inner_f);
2743
2744        let mut p = PipelineStart::<u32>::new()
2745            .then(|x: u32| x as u64, reg)
2746            .route(|_w, v| *v < 5, outer_t, outer_f)
2747            .then(sink, reg);
2748
2749        p.run(&mut world, 3u32); // 3 < 5 → +100 → 103
2750        assert_eq!(*world.resource::<u64>(), 103);
2751
2752        p.run(&mut world, 7u32); // 7 >= 5, 7 < 10 → +200 → 207
2753        assert_eq!(*world.resource::<u64>(), 207);
2754
2755        p.run(&mut world, 15u32); // 15 >= 5, 15 >= 10 → +300 → 315
2756        assert_eq!(*world.resource::<u64>(), 315);
2757    }
2758
2759    // -- Tee combinator --
2760
2761    #[test]
2762    fn pipeline_tee_side_effect_chain() {
2763        use crate::dag::DagArmStart;
2764
2765        fn log_step(mut counter: ResMut<u32>, _val: &u64) {
2766            *counter += 1;
2767        }
2768        fn sink(mut out: ResMut<u64>, val: u64) {
2769            *out = val;
2770        }
2771        let mut wb = WorldBuilder::new();
2772        wb.register::<u64>(0);
2773        wb.register::<u32>(0);
2774        let mut world = wb.build();
2775        let reg = world.registry();
2776
2777        let side = DagArmStart::new().then(log_step, reg);
2778
2779        let mut p = PipelineStart::<u32>::new()
2780            .then(|x: u32| x as u64 * 2, reg)
2781            .tee(side)
2782            .then(sink, reg);
2783
2784        p.run(&mut world, 5u32);
2785        assert_eq!(*world.resource::<u64>(), 10); // value passed through
2786        assert_eq!(*world.resource::<u32>(), 1); // side-effect fired
2787
2788        p.run(&mut world, 7u32);
2789        assert_eq!(*world.resource::<u64>(), 14);
2790        assert_eq!(*world.resource::<u32>(), 2);
2791    }
2792
2793    // -- Dedup combinator --
2794
2795    #[test]
2796    fn pipeline_dedup_suppresses_unchanged() {
2797        fn sink(mut out: ResMut<u32>, val: Option<u64>) {
2798            if val.is_some() {
2799                *out += 1;
2800            }
2801        }
2802        let mut wb = WorldBuilder::new();
2803        wb.register::<u32>(0);
2804        let mut world = wb.build();
2805        let reg = world.registry();
2806
2807        let mut p = PipelineStart::<u32>::new()
2808            .then(|x: u32| x as u64 / 2, reg)
2809            .dedup()
2810            .then(sink, reg);
2811
2812        p.run(&mut world, 4u32); // 2 — first, Some
2813        assert_eq!(*world.resource::<u32>(), 1);
2814
2815        p.run(&mut world, 5u32); // 2 — same, None
2816        assert_eq!(*world.resource::<u32>(), 1);
2817
2818        p.run(&mut world, 6u32); // 3 — changed, Some
2819        assert_eq!(*world.resource::<u32>(), 2);
2820    }
2821
2822    // -- Bool combinators --
2823
2824    #[test]
2825    fn pipeline_not() {
2826        fn sink(mut out: ResMut<bool>, val: bool) {
2827            *out = val;
2828        }
2829        let mut wb = WorldBuilder::new();
2830        wb.register::<bool>(false);
2831        let mut world = wb.build();
2832        let reg = world.registry();
2833
2834        let mut p = PipelineStart::<u32>::new()
2835            .then(|x: u32| x > 5, reg)
2836            .not()
2837            .then(sink, reg);
2838
2839        p.run(&mut world, 3u32); // 3 > 5 = false, not = true
2840        assert!(*world.resource::<bool>());
2841
2842        p.run(&mut world, 10u32); // 10 > 5 = true, not = false
2843        assert!(!*world.resource::<bool>());
2844    }
2845
2846    #[test]
2847    fn pipeline_and() {
2848        fn sink(mut out: ResMut<bool>, val: bool) {
2849            *out = val;
2850        }
2851        let mut wb = WorldBuilder::new();
2852        wb.register::<bool>(true);
2853        let mut world = wb.build();
2854        let reg = world.registry();
2855
2856        let mut p = PipelineStart::<u32>::new()
2857            .then(|x: u32| x > 5, reg)
2858            .and(|w| *w.resource::<bool>())
2859            .then(sink, reg);
2860
2861        p.run(&mut world, 10u32); // true && true = true
2862        assert!(*world.resource::<bool>());
2863
2864        *world.resource_mut::<bool>() = false;
2865        p.run(&mut world, 10u32); // true && false = false
2866        assert!(!*world.resource::<bool>());
2867    }
2868
2869    #[test]
2870    fn pipeline_or() {
2871        fn sink(mut out: ResMut<bool>, val: bool) {
2872            *out = val;
2873        }
2874        let mut wb = WorldBuilder::new();
2875        wb.register::<bool>(false);
2876        let mut world = wb.build();
2877        let reg = world.registry();
2878
2879        let mut p = PipelineStart::<u32>::new()
2880            .then(|x: u32| x > 5, reg)
2881            .or(|w| *w.resource::<bool>())
2882            .then(sink, reg);
2883
2884        p.run(&mut world, 3u32); // false || false = false
2885        assert!(!*world.resource::<bool>());
2886
2887        *world.resource_mut::<bool>() = true;
2888        p.run(&mut world, 3u32); // false || true = true
2889        assert!(*world.resource::<bool>());
2890    }
2891
2892    #[test]
2893    fn pipeline_xor() {
2894        fn sink(mut out: ResMut<bool>, val: bool) {
2895            *out = val;
2896        }
2897        let mut wb = WorldBuilder::new();
2898        wb.register::<bool>(true);
2899        let mut world = wb.build();
2900        let reg = world.registry();
2901
2902        let mut p = PipelineStart::<u32>::new()
2903            .then(|x: u32| x > 5, reg)
2904            .xor(|w| *w.resource::<bool>())
2905            .then(sink, reg);
2906
2907        p.run(&mut world, 10u32); // true ^ true = false
2908        assert!(!*world.resource::<bool>());
2909    }
2910
2911    // =========================================================================
2912    // Splat — tuple destructuring
2913    // =========================================================================
2914
2915    #[test]
2916    fn splat2_closure_on_start() {
2917        let mut world = WorldBuilder::new().build();
2918        let r = world.registry_mut();
2919        let mut p = PipelineStart::<(u32, u64)>::new()
2920            .splat()
2921            .then(|a: u32, b: u64| a as u64 + b, r);
2922        assert_eq!(p.run(&mut world, (3, 7)), 10);
2923    }
2924
2925    #[test]
2926    fn splat2_named_fn_with_param() {
2927        let mut wb = WorldBuilder::new();
2928        wb.register::<u64>(100);
2929        let mut world = wb.build();
2930
2931        fn process(base: Res<u64>, a: u32, b: u32) -> u64 {
2932            *base + a as u64 + b as u64
2933        }
2934
2935        let r = world.registry_mut();
2936        let mut p = PipelineStart::<(u32, u32)>::new().splat().then(process, r);
2937        assert_eq!(p.run(&mut world, (3, 7)), 110);
2938    }
2939
2940    #[test]
2941    fn splat2_mid_chain() {
2942        let mut world = WorldBuilder::new().build();
2943        let r = world.registry_mut();
2944        let mut p = PipelineStart::<u32>::new()
2945            .then(|x: u32| (x, x * 2), r)
2946            .splat()
2947            .then(|a: u32, b: u32| a as u64 + b as u64, r);
2948        assert_eq!(p.run(&mut world, 5), 15); // 5 + 10
2949    }
2950
2951    #[test]
2952    fn splat3_closure_on_start() {
2953        let mut world = WorldBuilder::new().build();
2954        let r = world.registry_mut();
2955        let mut p = PipelineStart::<(u32, u32, u32)>::new()
2956            .splat()
2957            .then(|a: u32, b: u32, c: u32| a + b + c, r);
2958        assert_eq!(p.run(&mut world, (1, 2, 3)), 6);
2959    }
2960
2961    #[test]
2962    fn splat3_named_fn_with_param() {
2963        let mut wb = WorldBuilder::new();
2964        wb.register::<u64>(10);
2965        let mut world = wb.build();
2966
2967        fn process(factor: Res<u64>, a: u32, b: u32, c: u32) -> u64 {
2968            *factor * (a + b + c) as u64
2969        }
2970
2971        let r = world.registry_mut();
2972        let mut p = PipelineStart::<(u32, u32, u32)>::new()
2973            .splat()
2974            .then(process, r);
2975        assert_eq!(p.run(&mut world, (1, 2, 3)), 60);
2976    }
2977
2978    #[test]
2979    fn splat4_mid_chain() {
2980        let mut world = WorldBuilder::new().build();
2981        let r = world.registry_mut();
2982        let mut p = PipelineStart::<u32>::new()
2983            .then(|x: u32| (x, x + 1, x + 2, x + 3), r)
2984            .splat()
2985            .then(|a: u32, b: u32, c: u32, d: u32| (a + b + c + d) as u64, r);
2986        assert_eq!(p.run(&mut world, 10), 46); // 10+11+12+13
2987    }
2988
2989    #[test]
2990    fn splat5_closure_on_start() {
2991        let mut world = WorldBuilder::new().build();
2992        let r = world.registry_mut();
2993        let mut p = PipelineStart::<(u8, u8, u8, u8, u8)>::new().splat().then(
2994            |a: u8, b: u8, c: u8, d: u8, e: u8| {
2995                (a as u64) + (b as u64) + (c as u64) + (d as u64) + (e as u64)
2996            },
2997            r,
2998        );
2999        assert_eq!(p.run(&mut world, (1, 2, 3, 4, 5)), 15);
3000    }
3001
3002    #[test]
3003    fn splat_build_into_handler() {
3004        let mut wb = WorldBuilder::new();
3005        wb.register::<u64>(0);
3006        let mut world = wb.build();
3007
3008        fn store(mut out: ResMut<u64>, a: u32, b: u32) {
3009            *out = a as u64 + b as u64;
3010        }
3011
3012        let r = world.registry_mut();
3013        let mut pipeline = PipelineStart::<(u32, u32)>::new()
3014            .splat()
3015            .then(store, r)
3016            .build();
3017
3018        pipeline.run(&mut world, (3, 7));
3019        assert_eq!(*world.resource::<u64>(), 10);
3020    }
3021
3022    #[test]
3023    fn splat_build_batch() {
3024        let mut wb = WorldBuilder::new();
3025        wb.register::<u64>(0);
3026        let mut world = wb.build();
3027
3028        fn accumulate(mut sum: ResMut<u64>, a: u32, b: u32) {
3029            *sum += a as u64 + b as u64;
3030        }
3031
3032        let r = world.registry_mut();
3033        let mut batch = PipelineStart::<(u32, u32)>::new()
3034            .splat()
3035            .then(accumulate, r)
3036            .build_batch(8);
3037
3038        batch
3039            .input_mut()
3040            .extend_from_slice(&[(1, 2), (3, 4), (5, 6)]);
3041        batch.run(&mut world);
3042        assert_eq!(*world.resource::<u64>(), 21); // 3+7+11
3043    }
3044
3045    #[test]
3046    #[should_panic]
3047    fn splat_access_conflict_detected() {
3048        let mut wb = WorldBuilder::new();
3049        wb.register::<u64>(0);
3050        let mut world = wb.build();
3051
3052        fn bad(a: ResMut<u64>, _b: ResMut<u64>, _x: u32, _y: u32) {
3053            let _ = a;
3054        }
3055
3056        let r = world.registry_mut();
3057        // Should panic on duplicate ResMut<u64>
3058        let _ = PipelineStart::<(u32, u32)>::new().splat().then(bad, r);
3059    }
3060
3061    // -- Switch combinator --
3062
3063    #[test]
3064    fn pipeline_switch_basic() {
3065        fn double(x: u32) -> u64 {
3066            x as u64 * 2
3067        }
3068        fn sink(mut out: ResMut<u64>, val: u64) {
3069            *out = val;
3070        }
3071
3072        let mut wb = WorldBuilder::new();
3073        wb.register::<u64>(0);
3074        let mut world = wb.build();
3075        let reg = world.registry();
3076
3077        let mut pipeline = PipelineStart::<u32>::new()
3078            .then(double, reg)
3079            .switch(|_world, val| if val > 10 { val * 100 } else { val + 1 })
3080            .then(sink, reg)
3081            .build();
3082
3083        pipeline.run(&mut world, 10u32); // 20 > 10 → 2000
3084        assert_eq!(*world.resource::<u64>(), 2000);
3085
3086        pipeline.run(&mut world, 3u32); // 6 <= 10 → 7
3087        assert_eq!(*world.resource::<u64>(), 7);
3088    }
3089
3090    #[test]
3091    fn pipeline_switch_3_way() {
3092        fn sink(mut out: ResMut<u64>, val: u64) {
3093            *out = val;
3094        }
3095
3096        let mut wb = WorldBuilder::new();
3097        wb.register::<u64>(0);
3098        let mut world = wb.build();
3099        let reg = world.registry();
3100
3101        let mut pipeline = PipelineStart::<u32>::new()
3102            .switch(|_world, val| match val % 3 {
3103                0 => val as u64 + 100,
3104                1 => val as u64 + 200,
3105                _ => val as u64 + 300,
3106            })
3107            .then(sink, reg)
3108            .build();
3109
3110        pipeline.run(&mut world, 6u32); // 6 % 3 == 0 → 106
3111        assert_eq!(*world.resource::<u64>(), 106);
3112
3113        pipeline.run(&mut world, 7u32); // 7 % 3 == 1 → 207
3114        assert_eq!(*world.resource::<u64>(), 207);
3115
3116        pipeline.run(&mut world, 8u32); // 8 % 3 == 2 → 308
3117        assert_eq!(*world.resource::<u64>(), 308);
3118    }
3119
3120    #[test]
3121    fn pipeline_switch_with_resolve_step() {
3122        fn add_offset(offset: Res<i64>, val: u32) -> u64 {
3123            (*offset + val as i64) as u64
3124        }
3125        fn plain_double(val: u32) -> u64 {
3126            val as u64 * 2
3127        }
3128        fn sink(mut out: ResMut<u64>, val: u64) {
3129            *out = val;
3130        }
3131
3132        let mut wb = WorldBuilder::new();
3133        wb.register::<u64>(0);
3134        wb.register::<i64>(100);
3135        let mut world = wb.build();
3136        let reg = world.registry();
3137
3138        let mut arm_offset = resolve_step(add_offset, reg);
3139        let mut arm_double = resolve_step(plain_double, reg);
3140
3141        let mut pipeline = PipelineStart::<u32>::new()
3142            .switch(move |world, val| {
3143                if val > 10 {
3144                    arm_offset(world, val)
3145                } else {
3146                    arm_double(world, val)
3147                }
3148            })
3149            .then(sink, reg)
3150            .build();
3151
3152        pipeline.run(&mut world, 20u32); // > 10 → offset → 100 + 20 = 120
3153        assert_eq!(*world.resource::<u64>(), 120);
3154
3155        pipeline.run(&mut world, 5u32); // <= 10 → double → 10
3156        assert_eq!(*world.resource::<u64>(), 10);
3157    }
3158
3159    #[test]
3160    fn batch_pipeline_switch() {
3161        fn sink(mut out: ResMut<u64>, val: u64) {
3162            *out += val;
3163        }
3164
3165        let mut wb = WorldBuilder::new();
3166        wb.register::<u64>(0);
3167        let mut world = wb.build();
3168        let reg = world.registry();
3169
3170        let mut batch = PipelineStart::<u32>::new()
3171            .switch(|_world, val| {
3172                if val % 2 == 0 {
3173                    val as u64 * 10
3174                } else {
3175                    val as u64
3176                }
3177            })
3178            .then(sink, reg)
3179            .build_batch(8);
3180
3181        batch.input_mut().extend([1, 2, 3, 4]);
3182        batch.run(&mut world);
3183
3184        // 1 → 1, 2 → 20, 3 → 3, 4 → 40 = 64
3185        assert_eq!(*world.resource::<u64>(), 64);
3186    }
3187}