Skip to main content

nexus_rt/
pipeline.rs

1// Builder return types are necessarily complex — each combinator returns
2// PipelineBuilder<In, Out, impl FnMut(...)>. Same pattern as iterator adapters.
3#![allow(clippy::type_complexity)]
4
5//! Pre-resolved pipeline dispatch using [`SystemParam`] stages.
6//!
7//! [`PipelineStart`] begins a typed composition chain where each stage
8//! is a named function with [`SystemParam`] dependencies resolved at build
9//! time. The result is a monomorphized closure chain where dispatch-time
10//! resource access is ~3 cycles per fetch (pre-resolved [`ResourceId`](crate::ResourceId)),
11//! not a HashMap lookup.
12//!
13//! Two dispatch tiers in nexus-rt:
14//! 1. **Pipeline** — static after build, pre-resolved, the workhorse
15//! 2. **Callback** — dynamic registration with per-instance context
16//!
17//! # Stage function convention
18//!
19//! SystemParams first, stage input last, returns output:
20//!
21//! ```ignore
22//! fn validate(config: Res<Config>, order: Order) -> Option<ValidOrder> { .. }
23//! fn enrich(cache: Res<MarketData>, order: ValidOrder) -> EnrichedOrder { .. }
24//! fn submit(mut gw: ResMut<Gateway>, order: CheckedOrder) { gw.send(order); }
25//! ```
26//!
27//! # Combinator split
28//!
29//! **IntoStage-based (pre-resolved, hot path):**
30//! `.stage()`, `.map()`, `.and_then()`, `.catch()`
31//!
32//! **Closure-based (cold path, `&mut World`):**
//! `.on_none()`, `.inspect()`, `.inspect_err()`, `.filter()`, `.ok()`, `.ok_or()`,
//! `.ok_or_else()`, `.unwrap_or()`, `.unwrap_or_else()`, `.map_err()`, `.or_else()`
35
36use std::marker::PhantomData;
37
38use crate::system::SystemParam;
39use crate::world::{Registry, World};
40
41// =============================================================================
42// Stage — pre-resolved stage with SystemParam state
43// =============================================================================
44
/// Internal: pre-resolved stage with cached SystemParam state.
///
/// Users don't construct this directly — it's produced by [`IntoStage`] and
/// captured inside pipeline chain closures.
#[doc(hidden)]
pub struct Stage<F, Params: SystemParam> {
    /// The user-supplied stage function (or closure, for arity 0).
    f: F,
    /// Parameter state produced by `SystemParam::init` when the stage is
    /// built; handed to `SystemParam::fetch` on every call.
    state: Params::State,
    /// Populated with `std::any::type_name::<F>()` at build time. Currently
    /// never read (hence `allow(dead_code)`); presumably kept for diagnostics.
    #[allow(dead_code)]
    name: &'static str,
}
56
57// =============================================================================
58// StageCall — callable trait for resolved stages
59// =============================================================================
60
/// Internal: callable trait for resolved stages.
///
/// Used as a bound on [`IntoStage::Stage`]. Users don't implement this.
///
/// `In` is the value handed to the stage; `Out` is what the stage returns.
#[doc(hidden)]
pub trait StageCall<In, Out> {
    /// Call this stage with a world reference and input value.
    ///
    /// Takes `&mut self` because a stage owns an `FnMut` and its cached
    /// parameter state, both of which may be mutated by a call.
    fn call(&mut self, world: &mut World, input: In) -> Out;
}
69
70// =============================================================================
71// IntoStage — converts a named function into a resolved stage
72// =============================================================================
73
/// Converts a named function into a pre-resolved pipeline stage.
///
/// SystemParams first, stage input last, returns output. Arity 0 (no
/// SystemParams) supports closures. Arities 1+ require named functions
/// (same HRTB+GAT limitation as [`IntoHandler`](crate::IntoHandler)).
///
/// `In` is the stage input, `Out` is the stage output, and `Params` is the
/// tuple of [`SystemParam`] types the function takes (`()` for arity 0).
///
/// # Examples
///
/// ```ignore
/// // Arity 0 — closure works
/// let stage = (|x: u32| x * 2).into_stage(registry);
///
/// // Arity 1 — named function required
/// fn validate(config: Res<Config>, order: Order) -> Option<ValidOrder> { .. }
/// let stage = validate.into_stage(registry);
/// ```
pub trait IntoStage<In, Out, Params> {
    /// The concrete resolved stage type.
    type Stage: StageCall<In, Out>;

    /// Resolve SystemParam state from the registry and produce a stage.
    ///
    /// Consumes `self`: the function is stored inside the returned stage.
    fn into_stage(self, registry: &mut Registry) -> Self::Stage;
}
97
98// =============================================================================
99// Arity 0 — fn(In) -> Out — closures work (no HRTB+GAT issues)
100// =============================================================================
101
102impl<In, Out, F: FnMut(In) -> Out + 'static> StageCall<In, Out> for Stage<F, ()> {
103    #[inline(always)]
104    fn call(&mut self, _world: &mut World, input: In) -> Out {
105        (self.f)(input)
106    }
107}
108
109impl<In, Out, F: FnMut(In) -> Out + 'static> IntoStage<In, Out, ()> for F {
110    type Stage = Stage<F, ()>;
111
112    fn into_stage(self, registry: &mut Registry) -> Self::Stage {
113        Stage {
114            f: self,
115            state: <() as SystemParam>::init(registry),
116            name: std::any::type_name::<F>(),
117        }
118    }
119}
120
121// =============================================================================
122// Arities 1-8 via macro — HRTB with -> Out
123// =============================================================================
124
// Generates, for one arity, the `StageCall` impl on `Stage<F, (P0, ..)>` and
// the matching `IntoStage` impl on `F`. `$P` is used both as the type
// parameter name and as the local variable name holding the fetched value,
// which is why `non_snake_case` is allowed below.
macro_rules! impl_into_stage {
    ($($P:ident),+) => {
        impl<In, Out, F: 'static, $($P: SystemParam + 'static),+>
            StageCall<In, Out> for Stage<F, ($($P,)+)>
        where
            // `F` must be callable both with the declared param types and
            // with their borrowed `Item<'a>` forms for every lifetime `'a`.
            for<'a> &'a mut F:
                FnMut($($P,)+ In) -> Out +
                FnMut($($P::Item<'a>,)+ In) -> Out,
        {
            #[inline(always)]
            #[allow(non_snake_case)]
            fn call(&mut self, world: &mut World, input: In) -> Out {
                // Indirection through a plain generic function; presumably
                // the usual workaround so the call resolves against the
                // `Item<'a>` signature rather than the declared one.
                #[allow(clippy::too_many_arguments)]
                fn call_inner<$($P,)+ Input, Output>(
                    mut f: impl FnMut($($P,)+ Input) -> Output,
                    $($P: $P,)+
                    input: Input,
                ) -> Output {
                    f($($P,)+ input)
                }

                // SAFETY: state was produced by init() on the same registry
                // that built this world. Single-threaded sequential dispatch
                // ensures no mutable aliasing across params.
                let ($($P,)+) = unsafe {
                    <($($P,)+) as SystemParam>::fetch(world, &mut self.state)
                };
                call_inner(&mut self.f, $($P,)+ input)
            }
        }

        impl<In, Out, F: 'static, $($P: SystemParam + 'static),+>
            IntoStage<In, Out, ($($P,)+)> for F
        where
            for<'a> &'a mut F:
                FnMut($($P,)+ In) -> Out +
                FnMut($($P::Item<'a>,)+ In) -> Out,
        {
            type Stage = Stage<F, ($($P,)+)>;

            fn into_stage(self, registry: &mut Registry) -> Self::Stage {
                // Resolve every parameter's state once, at build time.
                let state = <($($P,)+) as SystemParam>::init(registry);
                {
                    // Hand each parameter's resource id and type name to the
                    // registry's access check before the stage is created.
                    // NOTE(review): what `check_access` does on a conflict is
                    // not visible here — confirm against `Registry`.
                    #[allow(non_snake_case)]
                    let ($($P,)+) = &state;
                    registry.check_access(&[
                        $(
                            (<$P as SystemParam>::resource_id($P),
                             std::any::type_name::<$P>()),
                        )+
                    ]);
                }
                Stage { f: self, state, name: std::any::type_name::<F>() }
            }
        }
    };
}
182
// Invokes the given macro once per supported arity, passing the identifier
// lists `P0` through `P0..P7` (one to eight SystemParams).
macro_rules! all_tuples {
    ($m:ident) => {
        $m!(P0);
        $m!(P0, P1);
        $m!(P0, P1, P2);
        $m!(P0, P1, P2, P3);
        $m!(P0, P1, P2, P3, P4);
        $m!(P0, P1, P2, P3, P4, P5);
        $m!(P0, P1, P2, P3, P4, P5, P6);
        $m!(P0, P1, P2, P3, P4, P5, P6, P7);
    };
}

// Emit the `StageCall` / `IntoStage` impls for arities 1 through 8.
all_tuples!(impl_into_stage);
197
198// =============================================================================
199// PipelineStart — entry point
200// =============================================================================
201
/// Entry point for building a pre-resolved stage pipeline.
///
/// `In` is the pipeline input type. Call [`.stage()`](Self::stage) to add
/// the first stage — a named function whose [`SystemParam`] dependencies
/// are resolved from the registry at build time.
///
/// The struct itself is a zero-sized marker: it only records `In` through
/// `PhantomData<fn(In)>` and never holds a value of that type.
///
/// # Examples
///
/// ```
/// // `Handler` must be in scope: `Pipeline::run` comes from that trait.
/// use nexus_rt::{WorldBuilder, Res, ResMut, PipelineStart, Handler};
///
/// let mut wb = WorldBuilder::new();
/// wb.register::<u64>(10);
/// wb.register::<String>(String::new());
/// let mut world = wb.build();
///
/// fn double(factor: Res<u64>, x: u32) -> u64 {
///     (*factor) * x as u64
/// }
/// fn store(mut out: ResMut<String>, val: u64) {
///     *out = val.to_string();
/// }
///
/// let r = world.registry_mut();
/// let mut pipeline = PipelineStart::<u32>::new()
///     .stage(double, r)
///     .stage(store, r)
///     .build();
///
/// pipeline.run(&mut world, 5);
/// assert_eq!(world.resource::<String>().as_str(), "50");
/// ```
pub struct PipelineStart<In>(PhantomData<fn(In)>);
235
236impl<In> PipelineStart<In> {
237    /// Create a new stage pipeline entry point.
238    pub fn new() -> Self {
239        Self(PhantomData)
240    }
241
242    /// Add the first stage. SystemParams resolved from the registry.
243    pub fn stage<Out, Params, S: IntoStage<In, Out, Params>>(
244        self,
245        f: S,
246        registry: &mut Registry,
247    ) -> PipelineBuilder<In, Out, impl FnMut(&mut World, In) -> Out + use<In, Out, Params, S>> {
248        let mut resolved = f.into_stage(registry);
249        PipelineBuilder {
250            chain: move |world: &mut World, input: In| resolved.call(world, input),
251            _marker: PhantomData,
252        }
253    }
254}
255
256impl<In> Default for PipelineStart<In> {
257    fn default() -> Self {
258        Self::new()
259    }
260}
261
262// =============================================================================
263// PipelineBuilder — typestate builder
264// =============================================================================
265
/// Builder that composes pre-resolved pipeline stages via closure nesting.
///
/// `In` is the pipeline's input type (fixed). `Out` is the current output.
/// `Chain` is the concrete composed closure type (opaque, never named by users).
///
/// Each combinator consumes `self`, captures the previous chain in a new
/// closure, and returns a new `PipelineBuilder`. The compiler
/// monomorphizes the entire chain — zero virtual dispatch through stages.
///
/// IntoStage-based methods (`.stage()`, `.map()`, `.and_then()`, `.catch()`)
/// take `&mut Registry` to resolve SystemParam state at build time.
/// Closure-based methods don't need the registry.
pub struct PipelineBuilder<In, Out, Chain> {
    /// The composed closure built so far: `FnMut(&mut World, In) -> Out`.
    chain: Chain,
    /// Zero-sized marker tying the builder to `In` and `Out`.
    _marker: PhantomData<fn(In) -> Out>,
}
282
283// =============================================================================
284// Core — any Out
285// =============================================================================
286
287impl<In, Out, Chain> PipelineBuilder<In, Out, Chain>
288where
289    Chain: FnMut(&mut World, In) -> Out,
290{
291    /// Add a stage. SystemParams resolved from the registry.
292    pub fn stage<NewOut, Params, S: IntoStage<Out, NewOut, Params>>(
293        self,
294        f: S,
295        registry: &mut Registry,
296    ) -> PipelineBuilder<
297        In,
298        NewOut,
299        impl FnMut(&mut World, In) -> NewOut + use<In, Out, NewOut, Params, Chain, S>,
300    > {
301        let mut chain = self.chain;
302        let mut resolved = f.into_stage(registry);
303        PipelineBuilder {
304            chain: move |world: &mut World, input: In| {
305                let out = chain(world, input);
306                resolved.call(world, out)
307            },
308            _marker: PhantomData,
309        }
310    }
311
312    /// Run the pipeline directly. No boxing, no `'static` on `In`.
313    pub fn run(&mut self, world: &mut World, input: In) -> Out {
314        (self.chain)(world, input)
315    }
316}
317
318// =============================================================================
319// Option helpers — PipelineBuilder<In, Option<T>, Chain>
320// =============================================================================
321
322impl<In, T, Chain> PipelineBuilder<In, Option<T>, Chain>
323where
324    Chain: FnMut(&mut World, In) -> Option<T>,
325{
326    // -- IntoStage-based (hot path) -------------------------------------------
327
328    /// Transform the inner value. Stage not called on None.
329    pub fn map<U, Params, S: IntoStage<T, U, Params>>(
330        self,
331        f: S,
332        registry: &mut Registry,
333    ) -> PipelineBuilder<
334        In,
335        Option<U>,
336        impl FnMut(&mut World, In) -> Option<U> + use<In, T, U, Params, Chain, S>,
337    > {
338        let mut chain = self.chain;
339        let mut resolved = f.into_stage(registry);
340        PipelineBuilder {
341            chain: move |world: &mut World, input: In| {
342                chain(world, input).map(|val| resolved.call(world, val))
343            },
344            _marker: PhantomData,
345        }
346    }
347
348    /// Short-circuits on None. std: `Option::and_then`
349    pub fn and_then<U, Params, S: IntoStage<T, Option<U>, Params>>(
350        self,
351        f: S,
352        registry: &mut Registry,
353    ) -> PipelineBuilder<
354        In,
355        Option<U>,
356        impl FnMut(&mut World, In) -> Option<U> + use<In, T, U, Params, Chain, S>,
357    > {
358        let mut chain = self.chain;
359        let mut resolved = f.into_stage(registry);
360        PipelineBuilder {
361            chain: move |world: &mut World, input: In| {
362                chain(world, input).and_then(|val| resolved.call(world, val))
363            },
364            _marker: PhantomData,
365        }
366    }
367
368    // -- Closure-based (cold path, &mut World) --------------------------------
369
370    /// Side effect on None. Complement to [`inspect`](Self::inspect).
371    pub fn on_none(
372        self,
373        mut f: impl FnMut(&mut World) + 'static,
374    ) -> PipelineBuilder<In, Option<T>, impl FnMut(&mut World, In) -> Option<T>> {
375        let mut chain = self.chain;
376        PipelineBuilder {
377            chain: move |world: &mut World, input: In| {
378                let result = chain(world, input);
379                if result.is_none() {
380                    f(world);
381                }
382                result
383            },
384            _marker: PhantomData,
385        }
386    }
387
388    /// Keep value if predicate holds. std: `Option::filter`
389    pub fn filter(
390        self,
391        mut f: impl FnMut(&mut World, &T) -> bool + 'static,
392    ) -> PipelineBuilder<In, Option<T>, impl FnMut(&mut World, In) -> Option<T>> {
393        let mut chain = self.chain;
394        PipelineBuilder {
395            chain: move |world: &mut World, input: In| {
396                chain(world, input).filter(|val| f(world, val))
397            },
398            _marker: PhantomData,
399        }
400    }
401
402    /// Side effect on Some value. std: `Option::inspect`
403    pub fn inspect(
404        self,
405        mut f: impl FnMut(&mut World, &T) + 'static,
406    ) -> PipelineBuilder<In, Option<T>, impl FnMut(&mut World, In) -> Option<T>> {
407        let mut chain = self.chain;
408        PipelineBuilder {
409            chain: move |world: &mut World, input: In| {
410                chain(world, input).inspect(|val| f(world, val))
411            },
412            _marker: PhantomData,
413        }
414    }
415
416    /// None becomes Err(err). std: `Option::ok_or`
417    pub fn ok_or<E: Clone + 'static>(
418        self,
419        err: E,
420    ) -> PipelineBuilder<In, Result<T, E>, impl FnMut(&mut World, In) -> Result<T, E>> {
421        let mut chain = self.chain;
422        PipelineBuilder {
423            chain: move |world: &mut World, input: In| {
424                chain(world, input).ok_or_else(|| err.clone())
425            },
426            _marker: PhantomData,
427        }
428    }
429
430    /// None becomes Err(f()). std: `Option::ok_or_else`
431    pub fn ok_or_else<E>(
432        self,
433        mut f: impl FnMut(&mut World) -> E + 'static,
434    ) -> PipelineBuilder<In, Result<T, E>, impl FnMut(&mut World, In) -> Result<T, E>> {
435        let mut chain = self.chain;
436        PipelineBuilder {
437            chain: move |world: &mut World, input: In| chain(world, input).ok_or_else(|| f(world)),
438            _marker: PhantomData,
439        }
440    }
441
442    /// Exit Option — None becomes the default value.
443    pub fn unwrap_or(self, default: T) -> PipelineBuilder<In, T, impl FnMut(&mut World, In) -> T>
444    where
445        T: Clone + 'static,
446    {
447        let mut chain = self.chain;
448        PipelineBuilder {
449            chain: move |world: &mut World, input: In| {
450                chain(world, input).unwrap_or_else(|| default.clone())
451            },
452            _marker: PhantomData,
453        }
454    }
455
456    /// Exit Option — None becomes `f()`.
457    pub fn unwrap_or_else(
458        self,
459        mut f: impl FnMut(&mut World) -> T + 'static,
460    ) -> PipelineBuilder<In, T, impl FnMut(&mut World, In) -> T> {
461        let mut chain = self.chain;
462        PipelineBuilder {
463            chain: move |world: &mut World, input: In| {
464                chain(world, input).unwrap_or_else(|| f(world))
465            },
466            _marker: PhantomData,
467        }
468    }
469}
470
471// =============================================================================
472// Result helpers — PipelineBuilder<In, Result<T, E>, Chain>
473// =============================================================================
474
475impl<In, T, E, Chain> PipelineBuilder<In, Result<T, E>, Chain>
476where
477    Chain: FnMut(&mut World, In) -> Result<T, E>,
478{
479    // -- IntoStage-based (hot path) -------------------------------------------
480
481    /// Transform the Ok value. Stage not called on Err.
482    pub fn map<U, Params, S: IntoStage<T, U, Params>>(
483        self,
484        f: S,
485        registry: &mut Registry,
486    ) -> PipelineBuilder<
487        In,
488        Result<U, E>,
489        impl FnMut(&mut World, In) -> Result<U, E> + use<In, T, E, U, Params, Chain, S>,
490    > {
491        let mut chain = self.chain;
492        let mut resolved = f.into_stage(registry);
493        PipelineBuilder {
494            chain: move |world: &mut World, input: In| {
495                chain(world, input).map(|val| resolved.call(world, val))
496            },
497            _marker: PhantomData,
498        }
499    }
500
501    /// Short-circuits on Err. std: `Result::and_then`
502    pub fn and_then<U, Params, S: IntoStage<T, Result<U, E>, Params>>(
503        self,
504        f: S,
505        registry: &mut Registry,
506    ) -> PipelineBuilder<
507        In,
508        Result<U, E>,
509        impl FnMut(&mut World, In) -> Result<U, E> + use<In, T, E, U, Params, Chain, S>,
510    > {
511        let mut chain = self.chain;
512        let mut resolved = f.into_stage(registry);
513        PipelineBuilder {
514            chain: move |world: &mut World, input: In| {
515                chain(world, input).and_then(|val| resolved.call(world, val))
516            },
517            _marker: PhantomData,
518        }
519    }
520
521    /// Handle error and transition to Option.
522    ///
523    /// `Ok(val)` becomes `Some(val)` — handler not called.
524    /// `Err(err)` calls the handler, then produces `None`.
525    pub fn catch<Params, S: IntoStage<E, (), Params>>(
526        self,
527        f: S,
528        registry: &mut Registry,
529    ) -> PipelineBuilder<
530        In,
531        Option<T>,
532        impl FnMut(&mut World, In) -> Option<T> + use<In, T, E, Params, Chain, S>,
533    > {
534        let mut chain = self.chain;
535        let mut resolved = f.into_stage(registry);
536        PipelineBuilder {
537            chain: move |world: &mut World, input: In| match chain(world, input) {
538                Ok(val) => Some(val),
539                Err(err) => {
540                    resolved.call(world, err);
541                    None
542                }
543            },
544            _marker: PhantomData,
545        }
546    }
547
548    // -- Closure-based (cold path, &mut World) --------------------------------
549
550    /// Transform the error. std: `Result::map_err`
551    pub fn map_err<E2>(
552        self,
553        mut f: impl FnMut(&mut World, E) -> E2 + 'static,
554    ) -> PipelineBuilder<In, Result<T, E2>, impl FnMut(&mut World, In) -> Result<T, E2>> {
555        let mut chain = self.chain;
556        PipelineBuilder {
557            chain: move |world: &mut World, input: In| {
558                chain(world, input).map_err(|err| f(world, err))
559            },
560            _marker: PhantomData,
561        }
562    }
563
564    /// Recover from Err. std: `Result::or_else`
565    pub fn or_else<E2>(
566        self,
567        mut f: impl FnMut(&mut World, E) -> Result<T, E2> + 'static,
568    ) -> PipelineBuilder<In, Result<T, E2>, impl FnMut(&mut World, In) -> Result<T, E2>> {
569        let mut chain = self.chain;
570        PipelineBuilder {
571            chain: move |world: &mut World, input: In| {
572                chain(world, input).or_else(|err| f(world, err))
573            },
574            _marker: PhantomData,
575        }
576    }
577
578    /// Side effect on Ok. std: `Result::inspect`
579    pub fn inspect(
580        self,
581        mut f: impl FnMut(&mut World, &T) + 'static,
582    ) -> PipelineBuilder<In, Result<T, E>, impl FnMut(&mut World, In) -> Result<T, E>> {
583        let mut chain = self.chain;
584        PipelineBuilder {
585            chain: move |world: &mut World, input: In| {
586                chain(world, input).inspect(|val| f(world, val))
587            },
588            _marker: PhantomData,
589        }
590    }
591
592    /// Side effect on Err. std: `Result::inspect_err`
593    pub fn inspect_err(
594        self,
595        mut f: impl FnMut(&mut World, &E) + 'static,
596    ) -> PipelineBuilder<In, Result<T, E>, impl FnMut(&mut World, In) -> Result<T, E>> {
597        let mut chain = self.chain;
598        PipelineBuilder {
599            chain: move |world: &mut World, input: In| {
600                chain(world, input).inspect_err(|err| f(world, err))
601            },
602            _marker: PhantomData,
603        }
604    }
605
606    /// Discard error, enter Option land. std: `Result::ok`
607    pub fn ok(self) -> PipelineBuilder<In, Option<T>, impl FnMut(&mut World, In) -> Option<T>> {
608        let mut chain = self.chain;
609        PipelineBuilder {
610            chain: move |world: &mut World, input: In| chain(world, input).ok(),
611            _marker: PhantomData,
612        }
613    }
614
615    /// Exit Result — Err becomes the default value.
616    pub fn unwrap_or(self, default: T) -> PipelineBuilder<In, T, impl FnMut(&mut World, In) -> T>
617    where
618        T: Clone + 'static,
619    {
620        let mut chain = self.chain;
621        PipelineBuilder {
622            chain: move |world: &mut World, input: In| {
623                chain(world, input).unwrap_or_else(|_| default.clone())
624            },
625            _marker: PhantomData,
626        }
627    }
628
629    /// Exit Result — Err becomes `f(err)`.
630    pub fn unwrap_or_else(
631        self,
632        mut f: impl FnMut(&mut World, E) -> T + 'static,
633    ) -> PipelineBuilder<In, T, impl FnMut(&mut World, In) -> T> {
634        let mut chain = self.chain;
635        PipelineBuilder {
636            chain: move |world: &mut World, input: In| match chain(world, input) {
637                Ok(val) => val,
638                Err(err) => f(world, err),
639            },
640            _marker: PhantomData,
641        }
642    }
643}
644
645// =============================================================================
646// PipelineOutput — marker trait for build()
647// =============================================================================
648
/// Marker trait for pipeline outputs that can be discarded after each item.
///
/// Implemented for `()` and `Option<()>`. It bounds
/// [`PipelineBuilder::build_batch`] and [`BatchPipeline`], whose `run`
/// drops every per-item result. [`PipelineBuilder::build`] does not use
/// this trait: it is only defined for pipelines that produce exactly `()`.
///
/// If your pipeline produces a value, add a final `.stage()` that
/// writes it somewhere (e.g. `ResMut<T>`).
#[diagnostic::on_unimplemented(
    message = "`build_batch()` requires the stage pipeline output to be `()` or `Option<()>`",
    label = "this pipeline produces `{Self}`, not `()` or `Option<()>`",
    note = "add a final `.stage()` that consumes the output"
)]
pub trait PipelineOutput {}
impl PipelineOutput for () {}
impl PipelineOutput for Option<()> {}
662
663// =============================================================================
664// build — when Out = ()
665// =============================================================================
666
667impl<In: 'static, Chain> PipelineBuilder<In, (), Chain>
668where
669    Chain: FnMut(&mut World, In) + 'static,
670{
671    /// Finalize the pipeline into a [`Pipeline`].
672    ///
673    /// The returned pipeline is a concrete, monomorphized type — no boxing,
674    /// no virtual dispatch. Call `.run()` directly for zero-cost execution,
675    /// or wrap in `Box<dyn Handler<In>>` when type erasure is needed.
676    ///
677    /// Only available when the pipeline ends with `()`. If your chain
678    /// produces a value, add a final `.stage()` that consumes the output.
679    pub fn build(self) -> Pipeline<In, Chain> {
680        Pipeline {
681            chain: self.chain,
682            _marker: PhantomData,
683        }
684    }
685}
686
687// =============================================================================
688// build_batch — when Out: PipelineOutput (() or Option<()>)
689// =============================================================================
690
691impl<In: 'static, Out: PipelineOutput, Chain> PipelineBuilder<In, Out, Chain>
692where
693    Chain: FnMut(&mut World, In) -> Out + 'static,
694{
695    /// Finalize into a [`BatchPipeline`] with a pre-allocated input buffer.
696    ///
697    /// Same pipeline chain as [`build`](PipelineBuilder::build), but the
698    /// pipeline owns an input buffer that drivers fill between dispatch
699    /// cycles. Each call to [`BatchPipeline::run`] drains the buffer,
700    /// running every item through the chain independently.
701    ///
702    /// Available when the pipeline ends with `()` or `Option<()>` (e.g.
703    /// after `.catch()` or `.filter()`). Pipelines producing values need
704    /// a final `.stage()` that consumes the output.
705    ///
706    /// `capacity` is the initial allocation — the buffer can grow if needed,
707    /// but sizing it for the expected batch size avoids reallocation.
708    pub fn build_batch(self, capacity: usize) -> BatchPipeline<In, Chain> {
709        BatchPipeline {
710            input: Vec::with_capacity(capacity),
711            chain: self.chain,
712        }
713    }
714}
715
716// =============================================================================
717// Pipeline<In, F> — built pipeline
718// =============================================================================
719
/// Built stage pipeline implementing [`Handler<In>`](crate::Handler).
///
/// Created by [`PipelineBuilder::build`]. The entire pipeline chain is
/// monomorphized at compile time — no boxing, no virtual dispatch.
/// Call `.run()` directly for zero-cost execution, or wrap in
/// `Box<dyn Handler<In>>` when you need type erasure (single box).
///
/// `.run()` is provided by the `Handler` impl, so that trait must be in
/// scope at the call site.
pub struct Pipeline<In, F> {
    /// The fully composed chain closure.
    chain: F,
    /// Zero-sized marker recording the input type.
    _marker: PhantomData<fn(In)>,
}
730
731impl<In: 'static, F: FnMut(&mut World, In) + 'static> crate::Handler<In> for Pipeline<In, F> {
732    fn run(&mut self, world: &mut World, event: In) {
733        (self.chain)(world, event);
734    }
735}
736
737// =============================================================================
738// BatchPipeline<In, F> — pipeline with owned input buffer
739// =============================================================================
740
/// Batch pipeline that owns a pre-allocated input buffer.
///
/// Created by [`PipelineBuilder::build_batch`]. Each item flows through
/// the full pipeline chain independently — the same per-item `Option`
/// and `Result` flow control as [`Pipeline`]. Errors are handled inline
/// (via `.catch()`, `.unwrap_or()`, etc.) and the batch continues to
/// the next item. No intermediate buffers between stages.
///
/// # Examples
///
/// ```
/// use nexus_rt::{WorldBuilder, Res, ResMut, PipelineStart};
///
/// let mut wb = WorldBuilder::new();
/// wb.register::<u64>(0);
/// let mut world = wb.build();
///
/// fn accumulate(mut sum: ResMut<u64>, x: u32) {
///     *sum += x as u64;
/// }
///
/// let r = world.registry_mut();
/// let mut batch = PipelineStart::<u32>::new()
///     .stage(accumulate, r)
///     .build_batch(64);
///
/// batch.input_mut().extend_from_slice(&[1, 2, 3, 4, 5]);
/// batch.run(&mut world);
///
/// assert_eq!(*world.resource::<u64>(), 15);
/// assert!(batch.input().is_empty());
/// ```
pub struct BatchPipeline<In, F> {
    /// Pending items; drained by every `run` call.
    input: Vec<In>,
    /// The fully composed chain closure applied to each item.
    chain: F,
}
777
778impl<In, Out: PipelineOutput, F: FnMut(&mut World, In) -> Out> BatchPipeline<In, F> {
779    /// Mutable access to the input buffer. Drivers fill this between
780    /// dispatch cycles.
781    pub fn input_mut(&mut self) -> &mut Vec<In> {
782        &mut self.input
783    }
784
785    /// Read-only access to the input buffer.
786    pub fn input(&self) -> &[In] {
787        &self.input
788    }
789
790    /// Drain the input buffer, running each item through the pipeline.
791    ///
792    /// Each item gets independent `Option`/`Result` flow control — an
793    /// error on one item does not affect subsequent items. After `run()`,
794    /// the input buffer is empty but retains its allocation.
795    pub fn run(&mut self, world: &mut World) {
796        for item in self.input.drain(..) {
797            let _ = (self.chain)(world, item);
798        }
799    }
800}
801
802// =============================================================================
803// Tests
804// =============================================================================
805
806#[cfg(test)]
807mod tests {
808    use super::*;
809    use crate::{Handler, Local, Res, ResMut, WorldBuilder};
810
811    // =========================================================================
812    // Core dispatch
813    // =========================================================================
814
815    #[test]
816    fn stage_pure_transform() {
817        let mut world = WorldBuilder::new().build();
818        let r = world.registry_mut();
819        let mut p = PipelineStart::<u32>::new().stage(|x: u32| x as u64 * 2, r);
820        assert_eq!(p.run(&mut world, 5), 10u64);
821    }
822
823    #[test]
824    fn stage_one_res() {
825        let mut wb = WorldBuilder::new();
826        wb.register::<u64>(10);
827        let mut world = wb.build();
828
829        fn multiply(factor: Res<u64>, x: u32) -> u64 {
830            *factor * x as u64
831        }
832
833        let r = world.registry_mut();
834        let mut p = PipelineStart::<u32>::new().stage(multiply, r);
835        assert_eq!(p.run(&mut world, 5), 50);
836    }
837
/// A stage can mutate a resource through `ResMut<T>`; writes persist across runs.
#[test]
fn stage_one_res_mut() {
    fn add_to_total(mut total: ResMut<u64>, x: u32) {
        *total += u64::from(x);
    }

    let mut builder = WorldBuilder::new();
    builder.register::<u64>(0);
    let mut world = builder.build();

    let registry = world.registry_mut();
    let mut pipeline = PipelineStart::<u32>::new().stage(add_to_total, registry);

    // Two dispatches: 10, then 5.
    for input in [10, 5] {
        pipeline.run(&mut world, input);
    }

    let total = *world.resource::<u64>();
    assert_eq!(total, 15);
}
854
/// A stage can take two `Res` parameters ahead of its input.
#[test]
fn stage_two_params() {
    fn gated_scale(factor: Res<u64>, flag: Res<bool>, x: u32) -> u64 {
        if !*flag {
            return 0;
        }
        *factor * u64::from(x)
    }

    let mut builder = WorldBuilder::new();
    builder.register::<u64>(10);
    builder.register::<bool>(true);
    let mut world = builder.build();

    let registry = world.registry_mut();
    let mut pipeline = PipelineStart::<u32>::new().stage(gated_scale, registry);

    let out = pipeline.run(&mut world, 5);
    assert_eq!(out, 50);
}
870
/// Two stages compose: the output of the first is the input of the second.
#[test]
fn stage_chain_two() {
    fn scale(factor: Res<u64>, x: u32) -> u64 {
        u64::from(x) * *factor
    }

    let mut builder = WorldBuilder::new();
    builder.register::<u64>(2);
    let mut world = builder.build();

    let registry = world.registry_mut();
    let mut pipeline = PipelineStart::<u32>::new()
        .stage(scale, registry)
        .stage(|scaled: u64| scaled + 1, registry);

    // 2 * 5 from the first stage, then + 1 from the second.
    let out = pipeline.run(&mut world, 5);
    assert_eq!(out, 11);
}
887
888    // =========================================================================
889    // Option combinators
890    // =========================================================================
891
/// `.map()` on an `Option` pipeline applies the stage to the `Some` payload.
#[test]
fn option_map_on_some() {
    fn offset(factor: Res<u64>, x: u32) -> u64 {
        *factor + u64::from(x)
    }

    let mut builder = WorldBuilder::new();
    builder.register::<u64>(10);
    let mut world = builder.build();

    let registry = world.registry_mut();
    let mut pipeline = PipelineStart::<u32>::new()
        .stage(|x: u32| Some(x), registry)
        .map(offset, registry);

    let out = pipeline.run(&mut world, 5);
    assert_eq!(out, Some(15));
}
908
/// `.map()` must not run its stage when the upstream value is `None`.
#[test]
fn option_map_skips_none() {
    // Flips the flag if it is ever invoked; the test expects it never is.
    fn raise_flag(mut flag: ResMut<bool>, _x: u32) -> u32 {
        *flag = true;
        0
    }

    let mut builder = WorldBuilder::new();
    builder.register::<bool>(false);
    let mut world = builder.build();

    let registry = world.registry_mut();
    let mut pipeline = PipelineStart::<u32>::new()
        .stage(|_: u32| None::<u32>, registry)
        .map(raise_flag, registry);

    let out = pipeline.run(&mut world, 5);
    assert_eq!(out, None);

    let flag_raised = *world.resource::<bool>();
    assert!(!flag_raised);
}
927
/// `.and_then()` forwards the value when the chained stage returns `Some`.
#[test]
fn option_and_then_chains() {
    fn above_min(min: Res<u64>, x: u32) -> Option<u64> {
        let val = u64::from(x);
        if val <= *min {
            return None;
        }
        Some(val)
    }

    let mut builder = WorldBuilder::new();
    builder.register::<u64>(10);
    let mut world = builder.build();

    let registry = world.registry_mut();
    let mut pipeline = PipelineStart::<u32>::new()
        .stage(|x: u32| Some(x), registry)
        .and_then(above_min, registry);

    // 20 is above the registered minimum of 10.
    let out = pipeline.run(&mut world, 20);
    assert_eq!(out, Some(20));
}
945
/// `.and_then()` yields `None` when the chained stage rejects the value.
#[test]
fn option_and_then_short_circuits() {
    fn above_min(min: Res<u64>, x: u32) -> Option<u64> {
        let val = u64::from(x);
        if val <= *min {
            return None;
        }
        Some(val)
    }

    let mut builder = WorldBuilder::new();
    builder.register::<u64>(10);
    let mut world = builder.build();

    let registry = world.registry_mut();
    let mut pipeline = PipelineStart::<u32>::new()
        .stage(|x: u32| Some(x), registry)
        .and_then(above_min, registry);

    // 5 is not above the registered minimum of 10.
    let out = pipeline.run(&mut world, 5);
    assert_eq!(out, None);
}
963
/// `.on_none()` runs its closure, with world access, when the value is `None`.
#[test]
fn option_on_none_fires() {
    let mut builder = WorldBuilder::new();
    builder.register::<bool>(false);
    let mut world = builder.build();

    let registry = world.registry_mut();
    let mut pipeline = PipelineStart::<u32>::new()
        .stage(|_: u32| None::<u32>, registry)
        .on_none(|w| *w.resource_mut::<bool>() = true);

    pipeline.run(&mut world, 0);

    let fired = *world.resource::<bool>();
    assert!(fired);
}
979
/// `.filter()` keeps a `Some` value that satisfies the predicate.
#[test]
fn option_filter_keeps() {
    let mut world = WorldBuilder::new().build();
    let registry = world.registry_mut();
    let mut pipeline = PipelineStart::<u32>::new()
        .stage(|x: u32| Some(x), registry)
        .filter(|_w, x| *x > 3);

    let out = pipeline.run(&mut world, 5);
    assert_eq!(out, Some(5));
}
989
/// `.filter()` turns a `Some` value that fails the predicate into `None`.
#[test]
fn option_filter_drops() {
    let mut world = WorldBuilder::new().build();
    let registry = world.registry_mut();
    let mut pipeline = PipelineStart::<u32>::new()
        .stage(|x: u32| Some(x), registry)
        .filter(|_w, x| *x > 10);

    let out = pipeline.run(&mut world, 5);
    assert_eq!(out, None);
}
999
1000    // =========================================================================
1001    // Result combinators
1002    // =========================================================================
1003
/// `.map()` on a `Result` pipeline applies the stage to the `Ok` payload.
#[test]
fn result_map_on_ok() {
    fn offset(factor: Res<u64>, x: u32) -> u64 {
        *factor + u64::from(x)
    }

    let mut builder = WorldBuilder::new();
    builder.register::<u64>(10);
    let mut world = builder.build();

    let registry = world.registry_mut();
    let mut pipeline = PipelineStart::<u32>::new()
        .stage(|x: u32| Ok::<u32, String>(x), registry)
        .map(offset, registry);

    let out = pipeline.run(&mut world, 5);
    assert_eq!(out, Ok(15));
}
1020
/// `.map()` must not run its stage when the upstream value is `Err`.
#[test]
fn result_map_skips_err() {
    // Flips the flag if it is ever invoked; the test expects it never is.
    fn raise_flag(mut flag: ResMut<bool>, _x: u32) -> u32 {
        *flag = true;
        0
    }

    let mut builder = WorldBuilder::new();
    builder.register::<bool>(false);
    let mut world = builder.build();

    let registry = world.registry_mut();
    let mut pipeline = PipelineStart::<u32>::new()
        .stage(|_: u32| Err::<u32, String>(String::from("fail")), registry)
        .map(raise_flag, registry);

    let outcome = pipeline.run(&mut world, 5);
    assert!(outcome.is_err());

    let flag_raised = *world.resource::<bool>();
    assert!(!flag_raised);
}
1039
/// `.catch()` hands the error to its stage and the pipeline yields `None`.
#[test]
fn result_catch_handles_error() {
    fn record_error(mut log: ResMut<String>, err: String) {
        *log = err;
    }

    let mut builder = WorldBuilder::new();
    builder.register::<String>(String::new());
    let mut world = builder.build();

    let registry = world.registry_mut();
    let mut pipeline = PipelineStart::<u32>::new()
        .stage(|_: u32| Err::<u32, String>(String::from("caught")), registry)
        .catch(record_error, registry);

    let out = pipeline.run(&mut world, 0);
    assert_eq!(out, None);

    // The handler stored the error text in the `String` resource.
    let logged = world.resource::<String>();
    assert_eq!(logged.as_str(), "caught");
}
1057
/// `.catch()` leaves `Ok` values alone: they come out as `Some`, handler untouched.
#[test]
fn result_catch_passes_ok() {
    fn record_error(mut log: ResMut<String>, err: String) {
        *log = err;
    }

    let mut builder = WorldBuilder::new();
    builder.register::<String>(String::new());
    let mut world = builder.build();

    let registry = world.registry_mut();
    let mut pipeline = PipelineStart::<u32>::new()
        .stage(|x: u32| Ok::<u32, String>(x), registry)
        .catch(record_error, registry);

    let out = pipeline.run(&mut world, 5);
    assert_eq!(out, Some(5));

    // Nothing was logged, so the handler never ran.
    let logged = world.resource::<String>();
    assert!(logged.is_empty());
}
1075
1076    // =========================================================================
1077    // Build + Handler
1078    // =========================================================================
1079
/// `.build()` yields a runnable pipeline whose side effects accumulate across runs.
#[test]
fn build_produces_system() {
    fn add_to_total(mut total: ResMut<u64>, x: u32) {
        *total += u64::from(x);
    }

    let mut builder = WorldBuilder::new();
    builder.register::<u64>(0);
    let mut world = builder.build();

    let registry = world.registry_mut();
    let mut built = PipelineStart::<u32>::new()
        .stage(add_to_total, registry)
        .build();

    built.run(&mut world, 10);
    built.run(&mut world, 5);

    let total = *world.resource::<u64>();
    assert_eq!(total, 15);
}
1097
/// `run()` on the builder returns the final stage's output value.
#[test]
fn run_returns_output() {
    fn scale(factor: Res<u64>, x: u32) -> u64 {
        u64::from(x) * *factor
    }

    let mut builder = WorldBuilder::new();
    builder.register::<u64>(3);
    let mut world = builder.build();

    let registry = world.registry_mut();
    let mut pipeline = PipelineStart::<u32>::new().stage(scale, registry);

    // The explicit type pins the return type of `run` to the stage output.
    let product: u64 = pipeline.run(&mut world, 7);
    assert_eq!(product, 21);
}
1113
1114    // =========================================================================
1115    // Safety
1116    // =========================================================================
1117
/// Adding a stage whose `Res<T>` was never registered is expected to panic.
#[test]
#[should_panic(expected = "not registered")]
fn panics_on_missing_resource() {
    fn reads_u64(_val: Res<u64>, _x: u32) -> u32 {
        0
    }

    // Deliberately empty world: no `u64` resource exists.
    let mut world = WorldBuilder::new().build();
    let registry = world.registry_mut();
    let _pipeline = PipelineStart::<u32>::new().stage(reads_u64, registry);
}
1130
1131    // =========================================================================
1132    // Access conflict detection
1133    // =========================================================================
1134
/// A stage that asks for `Res<T>` and `ResMut<T>` of the same `T` is expected to panic.
#[test]
#[should_panic(expected = "conflicting access")]
fn stage_duplicate_access_panics() {
    // Requests shared and exclusive access to the same `u64` resource.
    fn aliasing(a: Res<u64>, b: ResMut<u64>, _x: u32) -> u32 {
        let _shared = *a;
        let _exclusive = &*b;
        0
    }

    let mut builder = WorldBuilder::new();
    builder.register::<u64>(0);
    let mut world = builder.build();

    let registry = world.registry_mut();
    let _pipeline = PipelineStart::<u32>::new().stage(aliasing, registry);
}
1150
1151    // =========================================================================
1152    // Integration
1153    // =========================================================================
1154
/// A `Local<T>` parameter keeps its value between runs of the same stage.
#[test]
fn local_in_stage() {
    // Counts invocations in the local and mirrors the count into the resource.
    fn tick(mut calls: Local<u64>, mut latest: ResMut<u64>, _x: u32) {
        *calls += 1;
        *latest = *calls;
    }

    let mut builder = WorldBuilder::new();
    builder.register::<u64>(0);
    let mut world = builder.build();

    let registry = world.registry_mut();
    let mut pipeline = PipelineStart::<u32>::new().stage(tick, registry);

    for _ in 0..3 {
        pipeline.run(&mut world, 0);
    }

    let latest = *world.resource::<u64>();
    assert_eq!(latest, 3);
}
1173
1174    // =========================================================================
1175    // Option combinators (extended)
1176    // =========================================================================
1177
/// `.unwrap_or()` returns the payload when the value is `Some`.
#[test]
fn option_unwrap_or_some() {
    let mut world = WorldBuilder::new().build();
    let registry = world.registry_mut();
    let mut pipeline = PipelineStart::<u32>::new()
        .stage(|x: u32| Some(x), registry)
        .unwrap_or(99);

    let out = pipeline.run(&mut world, 5);
    assert_eq!(out, 5);
}
1187
/// `.unwrap_or()` returns the fallback when the value is `None`.
#[test]
fn option_unwrap_or_none() {
    let mut world = WorldBuilder::new().build();
    let registry = world.registry_mut();
    let mut pipeline = PipelineStart::<u32>::new()
        .stage(|_: u32| None::<u32>, registry)
        .unwrap_or(99);

    let out = pipeline.run(&mut world, 5);
    assert_eq!(out, 99);
}
1197
/// `.unwrap_or_else()` computes the fallback from its closure on `None`.
#[test]
fn option_unwrap_or_else() {
    let mut world = WorldBuilder::new().build();
    let registry = world.registry_mut();
    let mut pipeline = PipelineStart::<u32>::new()
        .stage(|_: u32| None::<u32>, registry)
        .unwrap_or_else(|_w| 42);

    let out = pipeline.run(&mut world, 0);
    assert_eq!(out, 42);
}
1207
/// `.ok_or()` converts `None` into `Err` carrying the supplied error.
#[test]
fn option_ok_or() {
    let mut world = WorldBuilder::new().build();
    let registry = world.registry_mut();
    let mut pipeline = PipelineStart::<u32>::new()
        .stage(|_: u32| None::<u32>, registry)
        .ok_or("missing");

    let out = pipeline.run(&mut world, 0);
    assert_eq!(out, Err("missing"));
}
1217
/// `.ok_or()` converts `Some(v)` into `Ok(v)`.
#[test]
fn option_ok_or_some() {
    let mut world = WorldBuilder::new().build();
    let registry = world.registry_mut();
    let mut pipeline = PipelineStart::<u32>::new()
        .stage(|x: u32| Some(x), registry)
        .ok_or("missing");

    let out = pipeline.run(&mut world, 7);
    assert_eq!(out, Ok(7));
}
1227
/// `.ok_or_else()` builds the error from its closure when the value is `None`.
#[test]
fn option_ok_or_else() {
    let mut world = WorldBuilder::new().build();
    let registry = world.registry_mut();
    let mut pipeline = PipelineStart::<u32>::new()
        .stage(|_: u32| None::<u32>, registry)
        .ok_or_else(|_w| "computed");

    let out = pipeline.run(&mut world, 0);
    assert_eq!(out, Err("computed"));
}
1237
/// `.inspect()` on an `Option` pipeline runs its closure on `Some` and
/// forwards the value unchanged.
///
/// The closure copies the value it observes into the registered `u64`
/// resource, so the test fails if the closure is never invoked instead of
/// only checking the pass-through.
// NOTE(review): assumes the `inspect` closure receives `(&mut World, &u32)`
// here, mirroring `.filter()` and `.on_none()` in this module; confirm
// against the combinator's signature.
#[test]
fn option_inspect_passes_through() {
    let mut wb = WorldBuilder::new();
    wb.register::<u64>(0);
    let mut world = wb.build();

    let r = world.registry_mut();
    let mut p = PipelineStart::<u32>::new()
        .stage(|x: u32| -> Option<u32> { Some(x) }, r)
        .inspect(|w, val| {
            *w.resource_mut::<u64>() = u64::from(*val);
        });

    // inspect should pass through the value unchanged...
    assert_eq!(p.run(&mut world, 10), Some(10));
    // ...and its closure must have observed that value.
    assert_eq!(*world.resource::<u64>(), 10);
}
1250
1251    // =========================================================================
1252    // Result combinators (extended)
1253    // =========================================================================
1254
/// `.map_err()` transforms the error payload of an `Err`.
#[test]
fn result_map_err() {
    let mut world = WorldBuilder::new().build();
    let registry = world.registry_mut();
    let mut pipeline = PipelineStart::<u32>::new()
        .stage(|_: u32| Err::<u32, i32>(-1), registry)
        .map_err(|_w, e| e.to_string());

    let out = pipeline.run(&mut world, 0);
    assert_eq!(out, Err(String::from("-1")));
}
1264
/// `.map_err()` leaves an `Ok` value as it is.
#[test]
fn result_map_err_ok_passthrough() {
    let mut world = WorldBuilder::new().build();
    let registry = world.registry_mut();
    let mut pipeline = PipelineStart::<u32>::new()
        .stage(|x: u32| Ok::<u32, i32>(x), registry)
        .map_err(|_w, e| e.to_string());

    let out = pipeline.run(&mut world, 5);
    assert_eq!(out, Ok(5));
}
1274
/// `.or_else()` can recover from an `Err` by producing an `Ok`.
#[test]
fn result_or_else() {
    let mut world = WorldBuilder::new().build();
    let registry = world.registry_mut();
    let mut pipeline = PipelineStart::<u32>::new()
        .stage(|_x: u32| -> Result<u32, &str> { Err("fail") }, registry)
        .or_else(|_w, _e| Ok::<u32, &str>(42));

    let recovered = pipeline.run(&mut world, 0);
    assert_eq!(recovered, Ok(42));
}
1284
/// `.inspect()` on a `Result` pipeline forwards an `Ok` value unchanged.
#[test]
fn result_inspect_passes_through() {
    let mut world = WorldBuilder::new().build();
    let registry = world.registry_mut();
    let mut pipeline = PipelineStart::<u32>::new()
        .stage(|x: u32| -> Result<u32, &str> { Ok(x) }, registry)
        .inspect(|_w, _val| {});

    // The no-op observer must not alter the value.
    let out = pipeline.run(&mut world, 7);
    assert_eq!(out, Ok(7));
}
1295
/// `.inspect_err()` forwards an `Err` value unchanged.
#[test]
fn result_inspect_err_passes_through() {
    let mut world = WorldBuilder::new().build();
    let registry = world.registry_mut();
    let mut pipeline = PipelineStart::<u32>::new()
        .stage(|_x: u32| -> Result<u32, &str> { Err("bad") }, registry)
        .inspect_err(|_w, _e| {});

    // The no-op observer must not alter the error.
    let out = pipeline.run(&mut world, 0);
    assert_eq!(out, Err("bad"));
}
1306
/// `.ok()` converts `Ok(v)` into `Some(v)`.
#[test]
fn result_ok_converts() {
    let mut world = WorldBuilder::new().build();
    let registry = world.registry_mut();
    let mut pipeline = PipelineStart::<u32>::new()
        .stage(|x: u32| -> Result<u32, &str> { Ok(x) }, registry)
        .ok();

    let out = pipeline.run(&mut world, 5);
    assert_eq!(out, Some(5));
}
1316
/// `.ok()` converts an `Err` into `None`, discarding the error.
#[test]
fn result_ok_drops_err() {
    let mut world = WorldBuilder::new().build();
    let registry = world.registry_mut();
    let mut pipeline = PipelineStart::<u32>::new()
        .stage(|_x: u32| -> Result<u32, &str> { Err("gone") }, registry)
        .ok();

    let out = pipeline.run(&mut world, 0);
    assert_eq!(out, None);
}
1326
/// `.unwrap_or()` on a `Result` pipeline returns the fallback on `Err`.
#[test]
fn result_unwrap_or() {
    let mut world = WorldBuilder::new().build();
    let registry = world.registry_mut();
    let mut pipeline = PipelineStart::<u32>::new()
        .stage(|_x: u32| -> Result<u32, &str> { Err("x") }, registry)
        .unwrap_or(99);

    let out = pipeline.run(&mut world, 0);
    assert_eq!(out, 99);
}
1336
/// `.unwrap_or_else()` on a `Result` pipeline derives the fallback from the error.
#[test]
fn result_unwrap_or_else() {
    let mut world = WorldBuilder::new().build();
    let registry = world.registry_mut();
    let mut pipeline = PipelineStart::<u32>::new()
        .stage(|_: u32| Err::<u32, i32>(-5), registry)
        .unwrap_or_else(|_w, e| e.unsigned_abs());

    // |-5| == 5
    let out = pipeline.run(&mut world, 0);
    assert_eq!(out, 5);
}
1346
1347    // =========================================================================
1348    // Batch pipeline
1349    // =========================================================================
1350
/// A batch run processes every buffered item and leaves the buffer empty.
#[test]
fn batch_accumulates() {
    fn add_to_sum(mut sum: ResMut<u64>, x: u32) {
        *sum += u64::from(x);
    }

    let mut builder = WorldBuilder::new();
    builder.register::<u64>(0);
    let mut world = builder.build();

    let registry = world.registry_mut();
    let mut batch = PipelineStart::<u32>::new()
        .stage(add_to_sum, registry)
        .build_batch(16);

    let items = [1, 2, 3, 4, 5];
    batch.input_mut().extend_from_slice(&items);
    batch.run(&mut world);

    let sum = *world.resource::<u64>();
    assert_eq!(sum, 15);
    assert!(batch.input().is_empty());
}
1372
/// After a batch run the input buffer is empty but still has its capacity.
#[test]
fn batch_retains_allocation() {
    let mut world = WorldBuilder::new().build();
    let registry = world.registry_mut();
    let mut batch = PipelineStart::<u32>::new()
        .stage(|_x: u32| {}, registry)
        .build_batch(64);

    let items = [1, 2, 3];
    batch.input_mut().extend_from_slice(&items);
    batch.run(&mut world);

    assert!(batch.input().is_empty());

    // Capacity must not have dropped below the size requested at build time.
    let capacity = batch.input_mut().capacity();
    assert!(capacity >= 64);
}
1387
/// Running a batch with nothing buffered leaves the world untouched.
#[test]
fn batch_empty_is_noop() {
    fn add_to_sum(mut sum: ResMut<u64>, x: u32) {
        *sum += u64::from(x);
    }

    let mut builder = WorldBuilder::new();
    builder.register::<u64>(0);
    let mut world = builder.build();

    let registry = world.registry_mut();
    let mut batch = PipelineStart::<u32>::new()
        .stage(add_to_sum, registry)
        .build_batch(16);

    // No input was pushed before running.
    batch.run(&mut world);

    let sum = *world.resource::<u64>();
    assert_eq!(sum, 0);
}
1406
/// An error on one batch item is caught and the remaining items still run.
#[test]
fn batch_catch_continues_on_error() {
    // Rejects zero, accepts everything else.
    fn validate(x: u32) -> Result<u32, &'static str> {
        match x {
            0 => Err("zero"),
            n => Ok(n),
        }
    }

    fn tally_error(mut errs: ResMut<u32>, _err: &'static str) {
        *errs += 1;
    }

    fn add_to_sum(mut sum: ResMut<u64>, x: u32) {
        *sum += u64::from(x);
    }

    let mut builder = WorldBuilder::new();
    builder.register::<u64>(0);
    builder.register::<u32>(0);
    let mut world = builder.build();

    let registry = world.registry_mut();
    let mut batch = PipelineStart::<u32>::new()
        .stage(validate, registry)
        .catch(tally_error, registry)
        .map(add_to_sum, registry)
        .build_batch(16);

    // Valid items 1, 2, 3 interleaved with two zeros that fail validation.
    let items = [1, 0, 2, 0, 3];
    batch.input_mut().extend_from_slice(&items);
    batch.run(&mut world);

    let sum = *world.resource::<u64>();
    let error_count = *world.resource::<u32>();
    assert_eq!(sum, 6); // 1 + 2 + 3
    assert_eq!(error_count, 2); // both zeros
}
1440
/// Items mapped to `None` by an earlier stage are skipped by `.map()` in a batch.
#[test]
fn batch_filter_skips_items() {
    fn add_to_sum(mut sum: ResMut<u64>, x: u32) {
        *sum += u64::from(x);
    }

    let mut builder = WorldBuilder::new();
    builder.register::<u64>(0);
    let mut world = builder.build();

    let registry = world.registry_mut();
    let mut batch = PipelineStart::<u32>::new()
        // Only values greater than 2 survive as `Some`.
        .stage(|x: u32| Some(x).filter(|v| *v > 2), registry)
        .map(add_to_sum, registry)
        .build_batch(16);

    let items = [1, 2, 3, 4, 5];
    batch.input_mut().extend_from_slice(&items);
    batch.run(&mut world);

    let sum = *world.resource::<u64>();
    assert_eq!(sum, 12); // 3 + 4 + 5
}
1465
/// A batch pipeline can be refilled and run again; effects add up across runs.
#[test]
fn batch_multiple_runs_accumulate() {
    fn add_to_sum(mut sum: ResMut<u64>, x: u32) {
        *sum += u64::from(x);
    }

    let mut builder = WorldBuilder::new();
    builder.register::<u64>(0);
    let mut world = builder.build();

    let registry = world.registry_mut();
    let mut batch = PipelineStart::<u32>::new()
        .stage(add_to_sum, registry)
        .build_batch(16);

    // First fill: 1 + 2 + 3.
    let first = [1, 2, 3];
    batch.input_mut().extend_from_slice(&first);
    batch.run(&mut world);
    let after_first = *world.resource::<u64>();
    assert_eq!(after_first, 6);

    // Second fill adds 4 + 5 on top of the earlier total.
    let second = [4, 5];
    batch.input_mut().extend_from_slice(&second);
    batch.run(&mut world);
    let after_second = *world.resource::<u64>();
    assert_eq!(after_second, 15);
}
1489
/// A batch stage can read one resource and write another; outputs keep input order.
#[test]
fn batch_with_world_access() {
    fn scale_and_collect(factor: Res<u64>, mut out: ResMut<Vec<u64>>, x: u32) {
        let scaled = u64::from(x) * *factor;
        out.push(scaled);
    }

    let mut builder = WorldBuilder::new();
    builder.register::<u64>(10); // multiplier
    builder.register::<Vec<u64>>(Vec::new());
    let mut world = builder.build();

    let registry = world.registry_mut();
    let mut batch = PipelineStart::<u32>::new()
        .stage(scale_and_collect, registry)
        .build_batch(16);

    let items = [1, 2, 3];
    batch.input_mut().extend_from_slice(&items);
    batch.run(&mut world);

    let collected = world.resource::<Vec<u64>>();
    assert_eq!(collected.as_slice(), &[10, 20, 30]);
}
1511}