Skip to main content

nexus_rt/
ctx_pipeline.rs

1// Builder return types are necessarily complex — each combinator returns
2// CtxPipelineChain<C, In, Out, NodeType<Chain, ...>>. Same pattern as iterator adapters.
3#![allow(clippy::type_complexity)]
4// Handler arity is architecturally required by the Param trait — handlers
5// take N typed parameters and the macro-generated dispatch impls expand
6// per-arity into call_inner functions with N + Input arguments. Module-level
7// allow rather than one inline attribute per arity expansion.
8#![allow(clippy::too_many_arguments)]
9
10//! Context-aware pipeline dispatch.
11//!
12//! Mirrors the [`pipeline`](crate::pipeline) module but threads `&mut C`
13//! (per-instance context) through every step. Designed for use inside
14//! [`Callback`](crate::Callback) where each handler instance owns private
15//! state that pipeline steps need to read or mutate.
16//!
17//! # Step function convention
18//!
19//! Context first, then Params, then step input last:
20//!
21//! ```ignore
22//! fn check_retries(ctx: &mut ReconnectCtx, config: Res<Config>, _input: ()) -> Option<()> {
23//!     if ctx.retries < config.max_retries { ctx.retries += 1; Some(()) } else { None }
24//! }
25//! fn attempt_connect(ctx: &mut ReconnectCtx, _input: ()) -> Result<TcpStream, io::Error> {
26//!     TcpStream::connect(&ctx.addr)
27//! }
28//! ```
29//!
30//! # Integration with Callback
31//!
32//! The built [`CtxPipeline`] implements [`CtxStepCall`] — it takes
33//! `&mut C`, `&mut World`, and `In`, returning `Out`.
34//!
35//! To use a pipeline from a [`Handler`](crate::Handler), create a normal
36//! [`Callback`](crate::Callback) whose handler function owns or accesses
37//! the context `C` and calls the pipeline via its `run` method, passing
38//! `&mut C`, `&mut World`, and the handler input. For pipelines that
39//! return a non-unit value, use [`CtxPipelineChain::run`] directly —
40//! `.build()` is only available when `Out = ()` or `Out = Option<()>`.
41//!
42//! # Three-tier step resolution
43//!
44//! Each combinator accepts functions via three tiers, matching the
45//! [`pipeline`](crate::pipeline) module:
46//!
47//! 1. **Named function with Params** — `fn(&mut C, Res<T>, In) -> Out`
48//! 2. **Arity-0 closure** — `FnMut(&mut C, In) -> Out`
49//! 3. **[`Opaque`] closure** — `FnMut(&mut C, &mut World, In) -> Out`
50//!    (raw World access, no Param resolution)
51//!
52//! # Deferred combinators
53//!
54//! The following combinators from [`pipeline`](crate::pipeline) are not yet
55//! implemented: `scan`, `dedup`, `dispatch`, `route`, `tee`, `splat`,
56//! `cloned`, `not`/`and`/`or`/`xor` (bool), `ok_or_else`, `or_else`,
57//! `Result::unwrap_or_else`, and `BatchPipeline`. These can be added when
58//! a concrete use case requires them.
59
60use std::marker::PhantomData;
61
62use crate::handler::{Opaque, Param};
63use crate::world::{Registry, World};
64
65// =============================================================================
66// CtxStep — pre-resolved step with Param state (context-aware)
67// =============================================================================
68
69/// Internal: pre-resolved context-aware step with cached Param state.
70#[doc(hidden)]
71pub struct CtxStep<F, Params: Param> {
72    f: F,
73    state: Params::State,
74    // Retained for future diagnostic/tracing use (step name in error messages).
75    #[allow(dead_code)]
76    name: &'static str,
77}
78
79// =============================================================================
80// CtxStepCall — callable trait for context-aware resolved steps
81// =============================================================================
82
83/// Internal: callable trait for context-aware resolved steps.
84///
85/// Like [`StepCall`](crate::pipeline::StepCall) but with `&mut C` context.
86#[doc(hidden)]
87pub trait CtxStepCall<C, In> {
88    /// The output type of this step.
89    type Out;
90    /// Call this step with context, world, and input.
91    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> Self::Out;
92}
93
94// =============================================================================
95// IntoCtxStep — converts a named function into a context-aware step
96// =============================================================================
97
98/// Converts a named function into a pre-resolved context-aware pipeline step.
99///
100/// Three tiers of resolution (same API for all):
101///
102/// | Params | Function shape | Example |
103/// |--------|---------------|---------|
104/// | Named | `fn(&mut C, Res<T>, In) -> Out` | `fn process(ctx: &mut Ctx, cfg: Res<Config>, x: u32) -> u64` |
105/// | `()` | `FnMut(&mut C, In) -> Out` | `\|ctx: &mut Ctx, x: u32\| x * 2` |
106/// | [`Opaque`] | `FnMut(&mut C, &mut World, In) -> Out` | `\|ctx: &mut Ctx, w: &mut World, x: u32\| { ... }` |
107///
108/// # Examples
109///
110/// ```ignore
111/// // Arity 0 — closure works
112/// let step = (|ctx: &mut Ctx, x: u32| { ctx.count += 1; x * 2 }).into_ctx_step(registry);
113///
114/// // Arity 1 — named function required
115/// fn validate(ctx: &mut Ctx, config: Res<Config>, order: Order) -> Option<ValidOrder> { .. }
116/// let step = validate.into_ctx_step(registry);
117///
118/// // Opaque — raw World access
119/// let step = (|ctx: &mut Ctx, w: &mut World, x: u32| { ... }).into_ctx_step(registry);
120/// ```
121#[diagnostic::on_unimplemented(
122    message = "this function cannot be used as a context-aware pipeline step",
123    note = "ctx step signature: `fn(&mut C, Params..., In) -> Out` — context first, resources, input last",
124    note = "for raw World access: `fn(&mut C, &mut World, In) -> Out`",
125    note = "closures with resource parameters are not supported — use a named `fn`"
126)]
127pub trait IntoCtxStep<C, In, Out, Params> {
128    /// The concrete resolved step type.
129    type Step: CtxStepCall<C, In, Out = Out>;
130
131    /// Resolve Param state from the registry and produce a step.
132    fn into_ctx_step(self, registry: &Registry) -> Self::Step;
133}
134
135// =============================================================================
136// Arity 0 — fn(&mut C, In) -> Out — closures work
137// =============================================================================
138
139impl<C, In, Out, F: FnMut(&mut C, In) -> Out + 'static> CtxStepCall<C, In> for CtxStep<F, ()> {
140    type Out = Out;
141    #[inline(always)]
142    fn call(&mut self, ctx: &mut C, _world: &mut World, input: In) -> Out {
143        (self.f)(ctx, input)
144    }
145}
146
147impl<C, In, Out, F: FnMut(&mut C, In) -> Out + 'static> IntoCtxStep<C, In, Out, ()> for F {
148    type Step = CtxStep<F, ()>;
149
150    fn into_ctx_step(self, registry: &Registry) -> Self::Step {
151        CtxStep {
152            f: self,
153            state: <() as Param>::init(registry),
154            name: std::any::type_name::<F>(),
155        }
156    }
157}
158
159// =============================================================================
160// Arities 1-8 via macro — HRTB with -> Out
161// =============================================================================
162
163macro_rules! impl_into_ctx_step {
164    ($($P:ident),+) => {
165        impl<C, In, Out, F: 'static, $($P: Param + 'static),+>
166            CtxStepCall<C, In> for CtxStep<F, ($($P,)+)>
167        where
168            for<'a> &'a mut F:
169                FnMut(&mut C, $($P,)+ In) -> Out +
170                FnMut(&mut C, $($P::Item<'a>,)+ In) -> Out,
171        {
172            type Out = Out;
173            #[inline(always)]
174            #[allow(non_snake_case)]
175            fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> Out {
176                fn call_inner<Ctx, $($P,)+ Input, Output>(
177                    mut f: impl FnMut(&mut Ctx, $($P,)+ Input) -> Output,
178                    ctx: &mut Ctx,
179                    $($P: $P,)+
180                    input: Input,
181                ) -> Output {
182                    f(ctx, $($P,)+ input)
183                }
184
185                // SAFETY: state was produced by Param::init() on the same Registry
186                // that built this World. Borrows are disjoint — enforced by
187                // conflict detection at build time.
188                // SAFETY: state was produced by Param::init() on the same
189                // Registry that built this World. Single-threaded sequential
190                // dispatch ensures no mutable aliasing across params.
191                #[cfg(debug_assertions)]
192                world.clear_borrows();
193                let ($($P,)+) = unsafe {
194                    <($($P,)+) as Param>::fetch(world, &mut self.state)
195                };
196                call_inner(&mut self.f, ctx, $($P,)+ input)
197            }
198        }
199
200        impl<C, In, Out, F: 'static, $($P: Param + 'static),+>
201            IntoCtxStep<C, In, Out, ($($P,)+)> for F
202        where
203            for<'a> &'a mut F:
204                FnMut(&mut C, $($P,)+ In) -> Out +
205                FnMut(&mut C, $($P::Item<'a>,)+ In) -> Out,
206        {
207            type Step = CtxStep<F, ($($P,)+)>;
208
209            fn into_ctx_step(self, registry: &Registry) -> Self::Step {
210                let state = <($($P,)+) as Param>::init(registry);
211                {
212                    #[allow(non_snake_case)]
213                    let ($($P,)+) = &state;
214                    registry.check_access(&[
215                        $(
216                            (<$P as Param>::resource_id($P),
217                             std::any::type_name::<$P>()),
218                        )+
219                    ]);
220                }
221                CtxStep { f: self, state, name: std::any::type_name::<F>() }
222            }
223        }
224    };
225}
226
227all_tuples!(impl_into_ctx_step);
228
229// =============================================================================
230// No-input impls — IntoCtxStep with In = (), no trailing input parameter
231// =============================================================================
232
233use crate::handler::NoEvent;
234
235// Arity 0: fn(&mut C) -> Out — no params, no input
236impl<C, Out, F: FnMut(&mut C) -> Out + 'static> CtxStepCall<C, ()> for CtxStep<NoEvent<F>, ()> {
237    type Out = Out;
238    #[inline(always)]
239    fn call(&mut self, ctx: &mut C, _world: &mut World, _input: ()) -> Out {
240        (self.f.0)(ctx)
241    }
242}
243
244impl<C, Out, F: FnMut(&mut C) -> Out + 'static> IntoCtxStep<C, (), Out, NoEvent<F>> for F {
245    type Step = CtxStep<NoEvent<F>, ()>;
246
247    fn into_ctx_step(self, registry: &Registry) -> Self::Step {
248        CtxStep {
249            f: NoEvent(self),
250            state: <() as Param>::init(registry),
251            name: std::any::type_name::<F>(),
252        }
253    }
254}
255
256// Arities 1-8: fn(&mut C, Params...) -> Out — no trailing input
257macro_rules! impl_into_ctx_step_no_event {
258    ($($P:ident),+) => {
259        impl<C, Out, F: 'static, $($P: Param + 'static),+>
260            CtxStepCall<C, ()> for CtxStep<NoEvent<F>, ($($P,)+)>
261        where
262            for<'a> &'a mut F:
263                FnMut(&mut C, $($P,)+) -> Out +
264                FnMut(&mut C, $($P::Item<'a>,)+) -> Out,
265        {
266            type Out = Out;
267            #[inline(always)]
268            #[allow(non_snake_case)]
269            fn call(&mut self, ctx: &mut C, world: &mut World, _input: ()) -> Out {
270                fn call_inner<Ctx, $($P,)+ Output>(
271                    mut f: impl FnMut(&mut Ctx, $($P,)+) -> Output,
272                    ctx: &mut Ctx,
273                    $($P: $P,)+
274                ) -> Output {
275                    f(ctx, $($P,)+)
276                }
277
278                #[cfg(debug_assertions)]
279                world.clear_borrows();
280                let ($($P,)+) = unsafe {
281                    <($($P,)+) as Param>::fetch(world, &mut self.state)
282                };
283                call_inner(&mut self.f.0, ctx, $($P,)+)
284            }
285        }
286
287        impl<C, Out, F: 'static, $($P: Param + 'static),+>
288            IntoCtxStep<C, (), Out, ($($P,)+)> for NoEvent<F>
289        where
290            for<'a> &'a mut F:
291                FnMut(&mut C, $($P,)+) -> Out +
292                FnMut(&mut C, $($P::Item<'a>,)+) -> Out,
293        {
294            type Step = CtxStep<NoEvent<F>, ($($P,)+)>;
295
296            fn into_ctx_step(self, registry: &Registry) -> Self::Step {
297                let state = <($($P,)+) as Param>::init(registry);
298                {
299                    #[allow(non_snake_case)]
300                    let ($($P,)+) = &state;
301                    registry.check_access(&[
302                        $(
303                            (<$P as Param>::resource_id($P),
304                             std::any::type_name::<$P>()),
305                        )+
306                    ]);
307                }
308                CtxStep { f: self, state, name: std::any::type_name::<F>() }
309            }
310        }
311    };
312}
313
314all_tuples!(impl_into_ctx_step_no_event);
315
316// -- Opaque: FnMut(&mut C, &mut World, In) -> Out ----------------------------
317
318/// Internal: wrapper for opaque closures used as context-aware steps.
319///
320/// Unlike [`CtxStep<F, P>`] which stores resolved `Param::State`, this
321/// holds only the function — the closure receives `&mut World` directly.
322#[doc(hidden)]
323pub struct CtxOpaqueStep<F> {
324    f: F,
325    // Retained for future diagnostic/tracing use (step name in error messages).
326    #[allow(dead_code)]
327    name: &'static str,
328}
329
330impl<C, In, Out, F: FnMut(&mut C, &mut World, In) -> Out + 'static> CtxStepCall<C, In>
331    for CtxOpaqueStep<F>
332{
333    type Out = Out;
334    #[inline(always)]
335    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> Out {
336        (self.f)(ctx, world, input)
337    }
338}
339
340impl<C, In, Out, F: FnMut(&mut C, &mut World, In) -> Out + 'static> IntoCtxStep<C, In, Out, Opaque>
341    for F
342{
343    type Step = CtxOpaqueStep<F>;
344
345    fn into_ctx_step(self, _registry: &Registry) -> Self::Step {
346        CtxOpaqueStep {
347            f: self,
348            name: std::any::type_name::<F>(),
349        }
350    }
351}
352
353// =============================================================================
354// CtxRefStepCall / IntoCtxRefStep — context-aware step taking &In
355// =============================================================================
356
357/// Internal: callable trait for context-aware steps taking input by reference.
358///
359/// Used by combinators like `tap`, `guard`, `filter` that observe the
360/// value without consuming it.
361#[doc(hidden)]
362pub trait CtxRefStepCall<C, In> {
363    /// The output type of this step.
364    type Out;
365    /// Call this step with context, world, and borrowed input.
366    fn call(&mut self, ctx: &mut C, world: &mut World, input: &In) -> Self::Out;
367}
368
369/// Converts a function into a context-aware step taking input by reference.
370///
371/// Used by combinators like `tap`, `guard`, `filter`, `inspect` that
372/// observe the value without consuming it.
373///
374/// | Params | Function shape | Example |
375/// |--------|---------------|---------|
376/// | Named | `fn(&mut C, Res<T>, &In) -> Out` | `fn check(ctx: &mut Ctx, cfg: Res<Config>, o: &Order) -> bool` |
377/// | `()` | `FnMut(&mut C, &In) -> Out` | `\|ctx: &mut Ctx, o: &Order\| o.qty > 0` |
378/// | [`Opaque`] | `FnMut(&mut C, &mut World, &In) -> Out` | `\|ctx: &mut Ctx, w: &mut World, o: &Order\| { ... }` |
379#[diagnostic::on_unimplemented(
380    message = "this function cannot be used as a context-aware reference step",
381    note = "ctx ref step signature: `fn(&mut C, Params..., &In) -> Out`",
382    note = "for raw World access: `fn(&mut C, &mut World, &In) -> Out`",
383    note = "closures with resource parameters are not supported — use a named `fn`"
384)]
385pub trait IntoCtxRefStep<C, In, Out, Params> {
386    /// The concrete resolved step type.
387    type Step: CtxRefStepCall<C, In, Out = Out>;
388
389    /// Resolve Param state from the registry and produce a step.
390    fn into_ctx_ref_step(self, registry: &Registry) -> Self::Step;
391}
392
393// -- Arity 0: FnMut(&mut C, &In) -> Out — closures work ----------------------
394
395impl<C, In, Out, F: FnMut(&mut C, &In) -> Out + 'static> CtxRefStepCall<C, In> for CtxStep<F, ()> {
396    type Out = Out;
397    #[inline(always)]
398    fn call(&mut self, ctx: &mut C, _world: &mut World, input: &In) -> Out {
399        (self.f)(ctx, input)
400    }
401}
402
403impl<C, In, Out, F: FnMut(&mut C, &In) -> Out + 'static> IntoCtxRefStep<C, In, Out, ()> for F {
404    type Step = CtxStep<F, ()>;
405
406    fn into_ctx_ref_step(self, registry: &Registry) -> Self::Step {
407        CtxStep {
408            f: self,
409            state: <() as Param>::init(registry),
410            name: std::any::type_name::<F>(),
411        }
412    }
413}
414
415// -- Arities 1-8: named functions with Param resolution -----------------------
416
417macro_rules! impl_into_ctx_ref_step {
418    ($($P:ident),+) => {
419        impl<C, In, Out, F: 'static, $($P: Param + 'static),+>
420            CtxRefStepCall<C, In> for CtxStep<F, ($($P,)+)>
421        where
422            for<'a> &'a mut F:
423                FnMut(&mut C, $($P,)+ &In) -> Out +
424                FnMut(&mut C, $($P::Item<'a>,)+ &In) -> Out,
425        {
426            type Out = Out;
427            #[inline(always)]
428            #[allow(non_snake_case)]
429            fn call(&mut self, ctx: &mut C, world: &mut World, input: &In) -> Out {
430                fn call_inner<Ctx, $($P,)+ Input: ?Sized, Output>(
431                    mut f: impl FnMut(&mut Ctx, $($P,)+ &Input) -> Output,
432                    ctx: &mut Ctx,
433                    $($P: $P,)+
434                    input: &Input,
435                ) -> Output {
436                    f(ctx, $($P,)+ input)
437                }
438
439                // SAFETY: state was produced by Param::init() on the same
440                // Registry that built this World. Single-threaded sequential
441                // dispatch ensures no mutable aliasing across params.
442                #[cfg(debug_assertions)]
443                world.clear_borrows();
444                let ($($P,)+) = unsafe {
445                    <($($P,)+) as Param>::fetch(world, &mut self.state)
446                };
447                call_inner(&mut self.f, ctx, $($P,)+ input)
448            }
449        }
450
451        impl<C, In, Out, F: 'static, $($P: Param + 'static),+>
452            IntoCtxRefStep<C, In, Out, ($($P,)+)> for F
453        where
454            for<'a> &'a mut F:
455                FnMut(&mut C, $($P,)+ &In) -> Out +
456                FnMut(&mut C, $($P::Item<'a>,)+ &In) -> Out,
457        {
458            type Step = CtxStep<F, ($($P,)+)>;
459
460            fn into_ctx_ref_step(self, registry: &Registry) -> Self::Step {
461                let state = <($($P,)+) as Param>::init(registry);
462                {
463                    #[allow(non_snake_case)]
464                    let ($($P,)+) = &state;
465                    registry.check_access(&[
466                        $(
467                            (<$P as Param>::resource_id($P),
468                             std::any::type_name::<$P>()),
469                        )+
470                    ]);
471                }
472                CtxStep { f: self, state, name: std::any::type_name::<F>() }
473            }
474        }
475    };
476}
477
478all_tuples!(impl_into_ctx_ref_step);
479
480// -- Opaque: FnMut(&mut C, &mut World, &In) -> Out ---------------------------
481
482/// Internal: wrapper for opaque closures taking input by reference.
483#[doc(hidden)]
484pub struct CtxOpaqueRefStep<F> {
485    f: F,
486    // Retained for future diagnostic/tracing use (step name in error messages).
487    #[allow(dead_code)]
488    name: &'static str,
489}
490
491impl<C, In, Out, F: FnMut(&mut C, &mut World, &In) -> Out + 'static> CtxRefStepCall<C, In>
492    for CtxOpaqueRefStep<F>
493{
494    type Out = Out;
495    #[inline(always)]
496    fn call(&mut self, ctx: &mut C, world: &mut World, input: &In) -> Out {
497        (self.f)(ctx, world, input)
498    }
499}
500
501impl<C, In, Out, F: FnMut(&mut C, &mut World, &In) -> Out + 'static>
502    IntoCtxRefStep<C, In, Out, Opaque> for F
503{
504    type Step = CtxOpaqueRefStep<F>;
505
506    fn into_ctx_ref_step(self, _registry: &Registry) -> Self::Step {
507        CtxOpaqueRefStep {
508            f: self,
509            name: std::any::type_name::<F>(),
510        }
511    }
512}
513
514// =============================================================================
515// CtxProducerCall / IntoCtxProducer — context-aware producer (no pipeline input)
516// =============================================================================
517
518/// Internal: callable trait for context-aware producers.
519///
520/// Used by combinators like `on_none`, `unwrap_or_else`.
521#[doc(hidden)]
522pub trait CtxProducerCall<C> {
523    /// The output type of this producer.
524    type Out;
525    /// Call this producer with context and world.
526    fn call(&mut self, ctx: &mut C, world: &mut World) -> Self::Out;
527}
528
529/// Converts a function into a context-aware producer step.
530///
531/// Used by combinators like `on_none`, `unwrap_or_else`.
532///
533/// | Params | Function shape | Example |
534/// |--------|---------------|---------|
535/// | Named | `fn(&mut C, Res<T>) -> Out` | `fn default_val(ctx: &mut Ctx, cfg: Res<Config>) -> u64` |
536/// | `()` | `FnMut(&mut C) -> Out` | `\|ctx: &mut Ctx\| ctx.fallback` |
537/// | [`Opaque`] | `FnMut(&mut C, &mut World) -> Out` | `\|ctx: &mut Ctx, w: &mut World\| { ... }` |
538#[diagnostic::on_unimplemented(
539    message = "this function cannot be used as a context-aware producer",
540    note = "ctx producer signature: `fn(&mut C, Params...) -> Out`",
541    note = "for raw World access: `fn(&mut C, &mut World) -> Out`",
542    note = "closures with resource parameters are not supported — use a named `fn`"
543)]
544pub trait IntoCtxProducer<C, Out, Params> {
545    /// The concrete resolved producer type.
546    type Step: CtxProducerCall<C, Out = Out>;
547
548    /// Resolve Param state from the registry and produce a step.
549    fn into_ctx_producer(self, registry: &Registry) -> Self::Step;
550}
551
552// -- Arity 0: FnMut(&mut C) -> Out — closures work ----------------------------
553
554impl<C, Out, F: FnMut(&mut C) -> Out + 'static> CtxProducerCall<C> for CtxStep<F, ()> {
555    type Out = Out;
556    #[inline(always)]
557    fn call(&mut self, ctx: &mut C, _world: &mut World) -> Out {
558        (self.f)(ctx)
559    }
560}
561
562impl<C, Out, F: FnMut(&mut C) -> Out + 'static> IntoCtxProducer<C, Out, ()> for F {
563    type Step = CtxStep<F, ()>;
564
565    fn into_ctx_producer(self, registry: &Registry) -> Self::Step {
566        CtxStep {
567            f: self,
568            state: <() as Param>::init(registry),
569            name: std::any::type_name::<F>(),
570        }
571    }
572}
573
574// -- Arities 1-8: named functions with Param resolution -----------------------
575
576macro_rules! impl_into_ctx_producer {
577    ($($P:ident),+) => {
578        impl<C, Out, F: 'static, $($P: Param + 'static),+>
579            CtxProducerCall<C> for CtxStep<F, ($($P,)+)>
580        where
581            for<'a> &'a mut F:
582                FnMut(&mut C, $($P,)+) -> Out +
583                FnMut(&mut C, $($P::Item<'a>,)+) -> Out,
584        {
585            type Out = Out;
586            #[inline(always)]
587            #[allow(non_snake_case)]
588            fn call(&mut self, ctx: &mut C, world: &mut World) -> Out {
589                fn call_inner<Ctx, $($P,)+ Output>(
590                    mut f: impl FnMut(&mut Ctx, $($P,)+) -> Output,
591                    ctx: &mut Ctx,
592                    $($P: $P,)+
593                ) -> Output {
594                    f(ctx, $($P,)+)
595                }
596
597                // SAFETY: state was produced by Param::init() on the same
598                // Registry that built this World. Single-threaded sequential
599                // dispatch ensures no mutable aliasing across params.
600                #[cfg(debug_assertions)]
601                world.clear_borrows();
602                let ($($P,)+) = unsafe {
603                    <($($P,)+) as Param>::fetch(world, &mut self.state)
604                };
605                call_inner(&mut self.f, ctx, $($P,)+)
606            }
607        }
608
609        impl<C, Out, F: 'static, $($P: Param + 'static),+>
610            IntoCtxProducer<C, Out, ($($P,)+)> for F
611        where
612            for<'a> &'a mut F:
613                FnMut(&mut C, $($P,)+) -> Out +
614                FnMut(&mut C, $($P::Item<'a>,)+) -> Out,
615        {
616            type Step = CtxStep<F, ($($P,)+)>;
617
618            fn into_ctx_producer(self, registry: &Registry) -> Self::Step {
619                let state = <($($P,)+) as Param>::init(registry);
620                {
621                    #[allow(non_snake_case)]
622                    let ($($P,)+) = &state;
623                    registry.check_access(&[
624                        $(
625                            (<$P as Param>::resource_id($P),
626                             std::any::type_name::<$P>()),
627                        )+
628                    ]);
629                }
630                CtxStep { f: self, state, name: std::any::type_name::<F>() }
631            }
632        }
633    };
634}
635
636all_tuples!(impl_into_ctx_producer);
637
638// -- Opaque: FnMut(&mut C, &mut World) -> Out --------------------------------
639
640/// Internal: wrapper for opaque closures used as context-aware producers.
641#[doc(hidden)]
642pub struct CtxOpaqueProducer<F> {
643    f: F,
644    // Retained for future diagnostic/tracing use (step name in error messages).
645    #[allow(dead_code)]
646    name: &'static str,
647}
648
649impl<C, Out, F: FnMut(&mut C, &mut World) -> Out + 'static> CtxProducerCall<C>
650    for CtxOpaqueProducer<F>
651{
652    type Out = Out;
653    #[inline(always)]
654    fn call(&mut self, ctx: &mut C, world: &mut World) -> Out {
655        (self.f)(ctx, world)
656    }
657}
658
659impl<C, Out, F: FnMut(&mut C, &mut World) -> Out + 'static> IntoCtxProducer<C, Out, Opaque> for F {
660    type Step = CtxOpaqueProducer<F>;
661
662    fn into_ctx_producer(self, _registry: &Registry) -> Self::Step {
663        CtxOpaqueProducer {
664            f: self,
665            name: std::any::type_name::<F>(),
666        }
667    }
668}
669
670// =============================================================================
671// CtxChainCall — callable trait for context-aware chain nodes
672// =============================================================================
673
674/// Internal: callable trait for context-aware chain nodes.
675///
676/// Like [`ChainCall`](crate::pipeline::ChainCall) but threads `&mut C`.
677#[doc(hidden)]
678pub trait CtxChainCall<C, In> {
679    /// The output type of this chain node.
680    type Out;
681    /// Execute the chain with context, world, and input.
682    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> Self::Out;
683}
684
685// =============================================================================
686// Chain nodes — named types for context-aware pipeline composition
687// =============================================================================
688
689/// Identity passthrough node. Used as the initial chain element.
690#[doc(hidden)]
691pub struct CtxIdentityNode;
692
693impl<C, In> CtxChainCall<C, In> for CtxIdentityNode {
694    type Out = In;
695    #[inline(always)]
696    fn call(&mut self, _ctx: &mut C, _world: &mut World, input: In) -> In {
697        input
698    }
699}
700
701// -- Core (any Out) ----------------------------------------------------------
702
703/// Chain node for `.then()` — transforms output via a context-aware step.
704#[doc(hidden)]
705pub struct CtxThenNode<Prev, S> {
706    pub(crate) prev: Prev,
707    pub(crate) step: S,
708}
709
710impl<C, In, Prev, S> CtxChainCall<C, In> for CtxThenNode<Prev, S>
711where
712    Prev: CtxChainCall<C, In>,
713    S: CtxStepCall<C, Prev::Out>,
714{
715    type Out = S::Out;
716    #[inline(always)]
717    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> S::Out {
718        let mid = self.prev.call(ctx, world, input);
719        self.step.call(ctx, world, mid)
720    }
721}
722
723/// Chain node for `.tap()` — side effect via `&Out`, value passes through.
724#[doc(hidden)]
725pub struct CtxTapNode<Prev, S> {
726    pub(crate) prev: Prev,
727    pub(crate) step: S,
728}
729
730impl<C, In, Prev, S> CtxChainCall<C, In> for CtxTapNode<Prev, S>
731where
732    Prev: CtxChainCall<C, In>,
733    S: CtxRefStepCall<C, Prev::Out, Out = ()>,
734{
735    type Out = Prev::Out;
736    #[inline(always)]
737    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> Prev::Out {
738        let val = self.prev.call(ctx, world, input);
739        self.step.call(ctx, world, &val);
740        val
741    }
742}
743
744/// Chain node for `.guard()` — wraps output in `Option` based on predicate.
745#[doc(hidden)]
746pub struct CtxGuardNode<Prev, S> {
747    pub(crate) prev: Prev,
748    pub(crate) step: S,
749}
750
751impl<C, In, Prev, S> CtxChainCall<C, In> for CtxGuardNode<Prev, S>
752where
753    Prev: CtxChainCall<C, In>,
754    S: CtxRefStepCall<C, Prev::Out, Out = bool>,
755{
756    type Out = Option<Prev::Out>;
757    #[inline(always)]
758    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> Option<Prev::Out> {
759        let val = self.prev.call(ctx, world, input);
760        if self.step.call(ctx, world, &val) {
761            Some(val)
762        } else {
763            None
764        }
765    }
766}
767
768// -- Option nodes ------------------------------------------------------------
769
770/// Chain node for `.map()` on `Option<T>`.
771#[doc(hidden)]
772pub struct CtxMapOptionNode<Prev, S> {
773    pub(crate) prev: Prev,
774    pub(crate) step: S,
775}
776
777impl<C, In, T, Prev, S> CtxChainCall<C, In> for CtxMapOptionNode<Prev, S>
778where
779    Prev: CtxChainCall<C, In, Out = Option<T>>,
780    S: CtxStepCall<C, T>,
781{
782    type Out = Option<S::Out>;
783    #[inline(always)]
784    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> Option<S::Out> {
785        match self.prev.call(ctx, world, input) {
786            Some(val) => Some(self.step.call(ctx, world, val)),
787            None => None,
788        }
789    }
790}
791
792/// Chain node for `.and_then()` on `Option<T>`.
793#[doc(hidden)]
794pub struct CtxAndThenNode<Prev, S> {
795    pub(crate) prev: Prev,
796    pub(crate) step: S,
797}
798
799impl<C, In, T, U, Prev, S> CtxChainCall<C, In> for CtxAndThenNode<Prev, S>
800where
801    Prev: CtxChainCall<C, In, Out = Option<T>>,
802    S: CtxStepCall<C, T, Out = Option<U>>,
803{
804    type Out = Option<U>;
805    #[inline(always)]
806    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> Option<U> {
807        match self.prev.call(ctx, world, input) {
808            Some(val) => self.step.call(ctx, world, val),
809            None => None,
810        }
811    }
812}
813
814/// Chain node for `.filter()` on `Option<T>`.
815#[doc(hidden)]
816pub struct CtxFilterNode<Prev, S> {
817    pub(crate) prev: Prev,
818    pub(crate) step: S,
819}
820
821impl<C, In, T, Prev, S> CtxChainCall<C, In> for CtxFilterNode<Prev, S>
822where
823    Prev: CtxChainCall<C, In, Out = Option<T>>,
824    S: CtxRefStepCall<C, T, Out = bool>,
825{
826    type Out = Option<T>;
827    #[inline(always)]
828    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> Option<T> {
829        match self.prev.call(ctx, world, input) {
830            Some(val) if self.step.call(ctx, world, &val) => Some(val),
831            _ => None,
832        }
833    }
834}
835
836/// Chain node for `.inspect()` on `Option<T>`.
837#[doc(hidden)]
838pub struct CtxInspectOptionNode<Prev, S> {
839    pub(crate) prev: Prev,
840    pub(crate) step: S,
841}
842
843impl<C, In, T, Prev, S> CtxChainCall<C, In> for CtxInspectOptionNode<Prev, S>
844where
845    Prev: CtxChainCall<C, In, Out = Option<T>>,
846    S: CtxRefStepCall<C, T, Out = ()>,
847{
848    type Out = Option<T>;
849    #[inline(always)]
850    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> Option<T> {
851        let opt = self.prev.call(ctx, world, input);
852        if let Some(ref val) = opt {
853            self.step.call(ctx, world, val);
854        }
855        opt
856    }
857}
858
859/// Chain node for `.on_none()` on `Option<T>`.
860#[doc(hidden)]
861pub struct CtxOnNoneNode<Prev, S> {
862    pub(crate) prev: Prev,
863    pub(crate) producer: S,
864}
865
866impl<C, In, T, Prev, S> CtxChainCall<C, In> for CtxOnNoneNode<Prev, S>
867where
868    Prev: CtxChainCall<C, In, Out = Option<T>>,
869    S: CtxProducerCall<C, Out = ()>,
870{
871    type Out = Option<T>;
872    #[inline(always)]
873    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> Option<T> {
874        let opt = self.prev.call(ctx, world, input);
875        if opt.is_none() {
876            self.producer.call(ctx, world);
877        }
878        opt
879    }
880}
881
882/// Chain node for `.unwrap_or_else()` on `Option<T>`.
883#[doc(hidden)]
884pub struct CtxUnwrapOrElseOptionNode<Prev, S> {
885    pub(crate) prev: Prev,
886    pub(crate) producer: S,
887}
888
889impl<C, In, T, Prev, S> CtxChainCall<C, In> for CtxUnwrapOrElseOptionNode<Prev, S>
890where
891    Prev: CtxChainCall<C, In, Out = Option<T>>,
892    S: CtxProducerCall<C, Out = T>,
893{
894    type Out = T;
895    #[inline(always)]
896    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> T {
897        match self.prev.call(ctx, world, input) {
898            Some(val) => val,
899            None => self.producer.call(ctx, world),
900        }
901    }
902}
903
904// -- Result nodes ------------------------------------------------------------
905
906/// Chain node for `.map()` on `Result<T, E>`.
907#[doc(hidden)]
908pub struct CtxMapResultNode<Prev, S> {
909    pub(crate) prev: Prev,
910    pub(crate) step: S,
911}
912
913impl<C, In, T, E, Prev, S> CtxChainCall<C, In> for CtxMapResultNode<Prev, S>
914where
915    Prev: CtxChainCall<C, In, Out = Result<T, E>>,
916    S: CtxStepCall<C, T>,
917{
918    type Out = Result<S::Out, E>;
919    #[inline(always)]
920    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> Result<S::Out, E> {
921        match self.prev.call(ctx, world, input) {
922            Ok(val) => Ok(self.step.call(ctx, world, val)),
923            Err(e) => Err(e),
924        }
925    }
926}
927
928/// Chain node for `.and_then()` on `Result<T, E>`.
929#[doc(hidden)]
930pub struct CtxAndThenResultNode<Prev, S> {
931    pub(crate) prev: Prev,
932    pub(crate) step: S,
933}
934
935impl<C, In, T, U, E, Prev, S> CtxChainCall<C, In> for CtxAndThenResultNode<Prev, S>
936where
937    Prev: CtxChainCall<C, In, Out = Result<T, E>>,
938    S: CtxStepCall<C, T, Out = Result<U, E>>,
939{
940    type Out = Result<U, E>;
941    #[inline(always)]
942    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> Result<U, E> {
943        match self.prev.call(ctx, world, input) {
944            Ok(val) => self.step.call(ctx, world, val),
945            Err(e) => Err(e),
946        }
947    }
948}
949
950/// Chain node for `.catch()` on `Result<T, E>` — handles error, transitions to `Option<T>`.
951#[doc(hidden)]
952pub struct CtxCatchNode<Prev, S> {
953    pub(crate) prev: Prev,
954    pub(crate) step: S,
955}
956
957impl<C, In, T, E, Prev, S> CtxChainCall<C, In> for CtxCatchNode<Prev, S>
958where
959    Prev: CtxChainCall<C, In, Out = Result<T, E>>,
960    S: CtxStepCall<C, E, Out = ()>,
961{
962    type Out = Option<T>;
963    #[inline(always)]
964    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> Option<T> {
965        match self.prev.call(ctx, world, input) {
966            Ok(val) => Some(val),
967            Err(e) => {
968                self.step.call(ctx, world, e);
969                None
970            }
971        }
972    }
973}
974
975/// Chain node for `.map_err()` on `Result<T, E>`.
976#[doc(hidden)]
977pub struct CtxMapErrNode<Prev, S> {
978    pub(crate) prev: Prev,
979    pub(crate) step: S,
980}
981
982impl<C, In, T, E, E2, Prev, S> CtxChainCall<C, In> for CtxMapErrNode<Prev, S>
983where
984    Prev: CtxChainCall<C, In, Out = Result<T, E>>,
985    S: CtxStepCall<C, E, Out = E2>,
986{
987    type Out = Result<T, E2>;
988    #[inline(always)]
989    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> Result<T, E2> {
990        match self.prev.call(ctx, world, input) {
991            Ok(val) => Ok(val),
992            Err(e) => Err(self.step.call(ctx, world, e)),
993        }
994    }
995}
996
997/// Chain node for `.inspect_err()` on `Result<T, E>`.
998#[doc(hidden)]
999pub struct CtxInspectErrNode<Prev, S> {
1000    pub(crate) prev: Prev,
1001    pub(crate) step: S,
1002}
1003
1004impl<C, In, T, E, Prev, S> CtxChainCall<C, In> for CtxInspectErrNode<Prev, S>
1005where
1006    Prev: CtxChainCall<C, In, Out = Result<T, E>>,
1007    S: CtxRefStepCall<C, E, Out = ()>,
1008{
1009    type Out = Result<T, E>;
1010    #[inline(always)]
1011    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> Result<T, E> {
1012        let result = self.prev.call(ctx, world, input);
1013        if let Err(ref e) = result {
1014            self.step.call(ctx, world, e);
1015        }
1016        result
1017    }
1018}
1019
1020/// Chain node for `.inspect()` on `Result<T, E>` — side effect on Ok value.
1021#[doc(hidden)]
1022pub struct CtxInspectResultNode<Prev, S> {
1023    pub(crate) prev: Prev,
1024    pub(crate) step: S,
1025}
1026
1027impl<C, In, T, E, Prev, S> CtxChainCall<C, In> for CtxInspectResultNode<Prev, S>
1028where
1029    Prev: CtxChainCall<C, In, Out = Result<T, E>>,
1030    S: CtxRefStepCall<C, T, Out = ()>,
1031{
1032    type Out = Result<T, E>;
1033    #[inline(always)]
1034    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> Result<T, E> {
1035        let result = self.prev.call(ctx, world, input);
1036        if let Ok(ref val) = result {
1037            self.step.call(ctx, world, val);
1038        }
1039        result
1040    }
1041}
1042
1043/// Chain node for discarding `Option<()>` to `()`.
1044#[doc(hidden)]
1045pub struct CtxDiscardOptionNode<Prev> {
1046    pub(crate) prev: Prev,
1047}
1048
1049impl<C, In, Prev> CtxChainCall<C, In> for CtxDiscardOptionNode<Prev>
1050where
1051    Prev: CtxChainCall<C, In, Out = Option<()>>,
1052{
1053    type Out = ();
1054    #[inline(always)]
1055    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) {
1056        let _ = self.prev.call(ctx, world, input);
1057    }
1058}
1059
1060// =============================================================================
1061// CtxPipelineBuilder — entry point
1062// =============================================================================
1063
1064/// Entry point for building a context-aware pipeline.
1065///
1066/// Like [`PipelineBuilder`](crate::PipelineBuilder) but every step
1067/// receives `&mut C` as the first argument.
1068///
1069/// # Examples
1070///
1071/// ```ignore
1072/// let pipeline = CtxPipelineBuilder::<ReconnectCtx, ()>::new()
1073///     .then(check_retries, &reg)
1074///     .then(attempt_connect, &reg)
1075///     .build();
1076/// ```
1077#[must_use = "pipeline builder does nothing until .then() is called"]
1078pub struct CtxPipelineBuilder<C, In>(PhantomData<fn(&mut C, In)>);
1079
1080impl<C, In> CtxPipelineBuilder<C, In> {
1081    /// Create a new context-aware pipeline entry point.
1082    pub fn new() -> Self {
1083        Self(PhantomData)
1084    }
1085
1086    /// Add the first step. Params resolved from the registry.
1087    pub fn then<Out, Params, S: IntoCtxStep<C, In, Out, Params>>(
1088        self,
1089        f: S,
1090        registry: &Registry,
1091    ) -> CtxPipelineChain<C, In, Out, CtxThenNode<CtxIdentityNode, S::Step>> {
1092        CtxPipelineChain {
1093            chain: CtxThenNode {
1094                prev: CtxIdentityNode,
1095                step: f.into_ctx_step(registry),
1096            },
1097            _marker: PhantomData,
1098        }
1099    }
1100}
1101
1102impl<C, In> Default for CtxPipelineBuilder<C, In> {
1103    fn default() -> Self {
1104        Self::new()
1105    }
1106}
1107
1108// =============================================================================
1109// CtxPipelineChain — typestate builder
1110// =============================================================================
1111
1112/// Builder that composes context-aware pipeline steps via named chain nodes.
1113///
1114/// `C` is the context type. `In` is the pipeline's input type (fixed).
1115/// `Out` is the current output. `Chain` is the concrete chain type.
1116#[must_use = "pipeline chain does nothing until .build() is called"]
1117pub struct CtxPipelineChain<C, In, Out, Chain> {
1118    chain: Chain,
1119    _marker: PhantomData<fn(&mut C, In) -> Out>,
1120}
1121
1122// =============================================================================
1123// Core — any Out
1124// =============================================================================
1125
1126impl<C, In, Out, Chain: CtxChainCall<C, In, Out = Out>> CtxPipelineChain<C, In, Out, Chain> {
1127    /// Add a step. Params resolved from the registry.
1128    pub fn then<NewOut, Params, S: IntoCtxStep<C, Out, NewOut, Params>>(
1129        self,
1130        f: S,
1131        registry: &Registry,
1132    ) -> CtxPipelineChain<C, In, NewOut, CtxThenNode<Chain, S::Step>> {
1133        CtxPipelineChain {
1134            chain: CtxThenNode {
1135                prev: self.chain,
1136                step: f.into_ctx_step(registry),
1137            },
1138            _marker: PhantomData,
1139        }
1140    }
1141
1142    /// Run the pipeline directly.
1143    pub fn run(&mut self, ctx: &mut C, world: &mut World, input: In) -> Out {
1144        self.chain.call(ctx, world, input)
1145    }
1146
1147    /// Conditionally wrap the output in `Option`. `Some(val)` if
1148    /// the predicate returns true, `None` otherwise.
1149    pub fn guard<Params, S: IntoCtxRefStep<C, Out, bool, Params>>(
1150        self,
1151        f: S,
1152        registry: &Registry,
1153    ) -> CtxPipelineChain<C, In, Option<Out>, CtxGuardNode<Chain, S::Step>> {
1154        CtxPipelineChain {
1155            chain: CtxGuardNode {
1156                prev: self.chain,
1157                step: f.into_ctx_ref_step(registry),
1158            },
1159            _marker: PhantomData,
1160        }
1161    }
1162
1163    /// Observe the current value without consuming or changing it.
1164    pub fn tap<Params, S: IntoCtxRefStep<C, Out, (), Params>>(
1165        self,
1166        f: S,
1167        registry: &Registry,
1168    ) -> CtxPipelineChain<C, In, Out, CtxTapNode<Chain, S::Step>> {
1169        CtxPipelineChain {
1170            chain: CtxTapNode {
1171                prev: self.chain,
1172                step: f.into_ctx_ref_step(registry),
1173            },
1174            _marker: PhantomData,
1175        }
1176    }
1177}
1178
1179// =============================================================================
1180// Option helpers — CtxPipelineChain<C, In, Option<T>, Chain>
1181// =============================================================================
1182
1183impl<C, In, T, Chain: CtxChainCall<C, In, Out = Option<T>>>
1184    CtxPipelineChain<C, In, Option<T>, Chain>
1185{
1186    /// Transform the inner value. Step not called on None.
1187    pub fn map<U, Params, S: IntoCtxStep<C, T, U, Params>>(
1188        self,
1189        f: S,
1190        registry: &Registry,
1191    ) -> CtxPipelineChain<C, In, Option<U>, CtxMapOptionNode<Chain, S::Step>> {
1192        CtxPipelineChain {
1193            chain: CtxMapOptionNode {
1194                prev: self.chain,
1195                step: f.into_ctx_step(registry),
1196            },
1197            _marker: PhantomData,
1198        }
1199    }
1200
1201    /// Short-circuits on None. std: `Option::and_then`
1202    pub fn and_then<U, Params, S: IntoCtxStep<C, T, Option<U>, Params>>(
1203        self,
1204        f: S,
1205        registry: &Registry,
1206    ) -> CtxPipelineChain<C, In, Option<U>, CtxAndThenNode<Chain, S::Step>> {
1207        CtxPipelineChain {
1208            chain: CtxAndThenNode {
1209                prev: self.chain,
1210                step: f.into_ctx_step(registry),
1211            },
1212            _marker: PhantomData,
1213        }
1214    }
1215
1216    /// Keep value if predicate holds. std: `Option::filter`
1217    pub fn filter<Params, S: IntoCtxRefStep<C, T, bool, Params>>(
1218        self,
1219        f: S,
1220        registry: &Registry,
1221    ) -> CtxPipelineChain<C, In, Option<T>, CtxFilterNode<Chain, S::Step>> {
1222        CtxPipelineChain {
1223            chain: CtxFilterNode {
1224                prev: self.chain,
1225                step: f.into_ctx_ref_step(registry),
1226            },
1227            _marker: PhantomData,
1228        }
1229    }
1230
1231    /// Side effect on Some value. std: `Option::inspect`
1232    pub fn inspect<Params, S: IntoCtxRefStep<C, T, (), Params>>(
1233        self,
1234        f: S,
1235        registry: &Registry,
1236    ) -> CtxPipelineChain<C, In, Option<T>, CtxInspectOptionNode<Chain, S::Step>> {
1237        CtxPipelineChain {
1238            chain: CtxInspectOptionNode {
1239                prev: self.chain,
1240                step: f.into_ctx_ref_step(registry),
1241            },
1242            _marker: PhantomData,
1243        }
1244    }
1245
1246    /// Side effect on None. Complement to [`inspect`](Self::inspect).
1247    pub fn on_none<Params, S: IntoCtxProducer<C, (), Params>>(
1248        self,
1249        f: S,
1250        registry: &Registry,
1251    ) -> CtxPipelineChain<C, In, Option<T>, CtxOnNoneNode<Chain, S::Step>> {
1252        CtxPipelineChain {
1253            chain: CtxOnNoneNode {
1254                prev: self.chain,
1255                producer: f.into_ctx_producer(registry),
1256            },
1257            _marker: PhantomData,
1258        }
1259    }
1260
1261    /// Exit Option — None becomes `f()`.
1262    pub fn unwrap_or_else<Params, S: IntoCtxProducer<C, T, Params>>(
1263        self,
1264        f: S,
1265        registry: &Registry,
1266    ) -> CtxPipelineChain<C, In, T, CtxUnwrapOrElseOptionNode<Chain, S::Step>> {
1267        CtxPipelineChain {
1268            chain: CtxUnwrapOrElseOptionNode {
1269                prev: self.chain,
1270                producer: f.into_ctx_producer(registry),
1271            },
1272            _marker: PhantomData,
1273        }
1274    }
1275
1276    /// None becomes Err(err). std: `Option::ok_or`
1277    pub fn ok_or<E: Clone>(
1278        self,
1279        err: E,
1280    ) -> CtxPipelineChain<C, In, Result<T, E>, CtxOkOrNode<Chain, E>> {
1281        CtxPipelineChain {
1282            chain: CtxOkOrNode {
1283                prev: self.chain,
1284                err,
1285            },
1286            _marker: PhantomData,
1287        }
1288    }
1289
1290    /// Exit Option — None becomes the default value.
1291    pub fn unwrap_or(
1292        self,
1293        default: T,
1294    ) -> CtxPipelineChain<C, In, T, CtxUnwrapOrOptionNode<Chain, T>>
1295    where
1296        T: Clone,
1297    {
1298        CtxPipelineChain {
1299            chain: CtxUnwrapOrOptionNode {
1300                prev: self.chain,
1301                default,
1302            },
1303            _marker: PhantomData,
1304        }
1305    }
1306}
1307
1308/// Chain node for `.ok_or()` on `Option<T>`.
1309#[doc(hidden)]
1310pub struct CtxOkOrNode<Prev, E> {
1311    pub(crate) prev: Prev,
1312    pub(crate) err: E,
1313}
1314
1315impl<C, In, T, E: Clone, Prev> CtxChainCall<C, In> for CtxOkOrNode<Prev, E>
1316where
1317    Prev: CtxChainCall<C, In, Out = Option<T>>,
1318{
1319    type Out = Result<T, E>;
1320    #[inline(always)]
1321    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> Result<T, E> {
1322        match self.prev.call(ctx, world, input) {
1323            Some(val) => Ok(val),
1324            None => Err(self.err.clone()),
1325        }
1326    }
1327}
1328
1329/// Chain node for `.unwrap_or()` on `Option<T>`.
1330#[doc(hidden)]
1331pub struct CtxUnwrapOrOptionNode<Prev, T> {
1332    pub(crate) prev: Prev,
1333    pub(crate) default: T,
1334}
1335
1336impl<C, In, T: Clone, Prev> CtxChainCall<C, In> for CtxUnwrapOrOptionNode<Prev, T>
1337where
1338    Prev: CtxChainCall<C, In, Out = Option<T>>,
1339{
1340    type Out = T;
1341    #[inline(always)]
1342    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> T {
1343        match self.prev.call(ctx, world, input) {
1344            Some(val) => val,
1345            None => self.default.clone(),
1346        }
1347    }
1348}
1349
1350// =============================================================================
1351// Result helpers — CtxPipelineChain<C, In, Result<T, E>, Chain>
1352// =============================================================================
1353
1354impl<C, In, T, E, Chain: CtxChainCall<C, In, Out = Result<T, E>>>
1355    CtxPipelineChain<C, In, Result<T, E>, Chain>
1356{
1357    /// Transform the Ok value. Step not called on Err.
1358    pub fn map<U, Params, S: IntoCtxStep<C, T, U, Params>>(
1359        self,
1360        f: S,
1361        registry: &Registry,
1362    ) -> CtxPipelineChain<C, In, Result<U, E>, CtxMapResultNode<Chain, S::Step>> {
1363        CtxPipelineChain {
1364            chain: CtxMapResultNode {
1365                prev: self.chain,
1366                step: f.into_ctx_step(registry),
1367            },
1368            _marker: PhantomData,
1369        }
1370    }
1371
1372    /// Short-circuits on Err. std: `Result::and_then`
1373    pub fn and_then<U, Params, S: IntoCtxStep<C, T, Result<U, E>, Params>>(
1374        self,
1375        f: S,
1376        registry: &Registry,
1377    ) -> CtxPipelineChain<C, In, Result<U, E>, CtxAndThenResultNode<Chain, S::Step>> {
1378        CtxPipelineChain {
1379            chain: CtxAndThenResultNode {
1380                prev: self.chain,
1381                step: f.into_ctx_step(registry),
1382            },
1383            _marker: PhantomData,
1384        }
1385    }
1386
1387    /// Handle error and transition to Option.
1388    pub fn catch<Params, S: IntoCtxStep<C, E, (), Params>>(
1389        self,
1390        f: S,
1391        registry: &Registry,
1392    ) -> CtxPipelineChain<C, In, Option<T>, CtxCatchNode<Chain, S::Step>> {
1393        CtxPipelineChain {
1394            chain: CtxCatchNode {
1395                prev: self.chain,
1396                step: f.into_ctx_step(registry),
1397            },
1398            _marker: PhantomData,
1399        }
1400    }
1401
1402    /// Transform the error. std: `Result::map_err`
1403    pub fn map_err<E2, Params, S: IntoCtxStep<C, E, E2, Params>>(
1404        self,
1405        f: S,
1406        registry: &Registry,
1407    ) -> CtxPipelineChain<C, In, Result<T, E2>, CtxMapErrNode<Chain, S::Step>> {
1408        CtxPipelineChain {
1409            chain: CtxMapErrNode {
1410                prev: self.chain,
1411                step: f.into_ctx_step(registry),
1412            },
1413            _marker: PhantomData,
1414        }
1415    }
1416
1417    /// Side effect on Ok value. std: `Result::inspect` (nightly)
1418    pub fn inspect<Params, S: IntoCtxRefStep<C, T, (), Params>>(
1419        self,
1420        f: S,
1421        registry: &Registry,
1422    ) -> CtxPipelineChain<C, In, Result<T, E>, CtxInspectResultNode<Chain, S::Step>> {
1423        CtxPipelineChain {
1424            chain: CtxInspectResultNode {
1425                prev: self.chain,
1426                step: f.into_ctx_ref_step(registry),
1427            },
1428            _marker: PhantomData,
1429        }
1430    }
1431
1432    /// Side effect on Err value. std: `Result::inspect_err` (nightly)
1433    pub fn inspect_err<Params, S: IntoCtxRefStep<C, E, (), Params>>(
1434        self,
1435        f: S,
1436        registry: &Registry,
1437    ) -> CtxPipelineChain<C, In, Result<T, E>, CtxInspectErrNode<Chain, S::Step>> {
1438        CtxPipelineChain {
1439            chain: CtxInspectErrNode {
1440                prev: self.chain,
1441                step: f.into_ctx_ref_step(registry),
1442            },
1443            _marker: PhantomData,
1444        }
1445    }
1446
1447    /// Convert Result to Option, discarding the error.
1448    pub fn ok(self) -> CtxPipelineChain<C, In, Option<T>, CtxOkNode<Chain>> {
1449        CtxPipelineChain {
1450            chain: CtxOkNode { prev: self.chain },
1451            _marker: PhantomData,
1452        }
1453    }
1454
1455    /// Exit Result — Err becomes the default value.
1456    pub fn unwrap_or(
1457        self,
1458        default: T,
1459    ) -> CtxPipelineChain<C, In, T, CtxUnwrapOrResultNode<Chain, T>>
1460    where
1461        T: Clone,
1462    {
1463        CtxPipelineChain {
1464            chain: CtxUnwrapOrResultNode {
1465                prev: self.chain,
1466                default,
1467            },
1468            _marker: PhantomData,
1469        }
1470    }
1471}
1472
1473/// Chain node for `.ok()` on `Result<T, E>` — discards error.
1474#[doc(hidden)]
1475pub struct CtxOkNode<Prev> {
1476    pub(crate) prev: Prev,
1477}
1478
1479impl<C, In, T, E, Prev> CtxChainCall<C, In> for CtxOkNode<Prev>
1480where
1481    Prev: CtxChainCall<C, In, Out = Result<T, E>>,
1482{
1483    type Out = Option<T>;
1484    #[inline(always)]
1485    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> Option<T> {
1486        self.prev.call(ctx, world, input).ok()
1487    }
1488}
1489
1490/// Chain node for `.unwrap_or()` on `Result<T, E>`.
1491#[doc(hidden)]
1492pub struct CtxUnwrapOrResultNode<Prev, T> {
1493    pub(crate) prev: Prev,
1494    pub(crate) default: T,
1495}
1496
1497impl<C, In, T: Clone, E, Prev> CtxChainCall<C, In> for CtxUnwrapOrResultNode<Prev, T>
1498where
1499    Prev: CtxChainCall<C, In, Out = Result<T, E>>,
1500{
1501    type Out = T;
1502    #[inline(always)]
1503    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> T {
1504        match self.prev.call(ctx, world, input) {
1505            Ok(val) => val,
1506            Err(_) => self.default.clone(),
1507        }
1508    }
1509}
1510
1511// =============================================================================
1512// build — when Out is () or Option<()>
1513// =============================================================================
1514
1515impl<C, In, Chain: CtxChainCall<C, In, Out = ()>> CtxPipelineChain<C, In, (), Chain> {
1516    /// Finalize the pipeline into a [`CtxPipeline`].
1517    #[must_use = "building a pipeline without storing it does nothing"]
1518    pub fn build(self) -> CtxPipeline<C, In, Chain> {
1519        CtxPipeline {
1520            chain: self.chain,
1521            _marker: PhantomData,
1522        }
1523    }
1524}
1525
1526impl<C, In, Chain: CtxChainCall<C, In, Out = Option<()>>>
1527    CtxPipelineChain<C, In, Option<()>, Chain>
1528{
1529    /// Finalize the pipeline, discarding `Option<()>`.
1530    #[must_use = "building a pipeline without storing it does nothing"]
1531    pub fn build(self) -> CtxPipeline<C, In, CtxDiscardOptionNode<Chain>> {
1532        CtxPipeline {
1533            chain: CtxDiscardOptionNode { prev: self.chain },
1534            _marker: PhantomData,
1535        }
1536    }
1537}
1538
1539// =============================================================================
1540// CtxPipeline — built context-aware pipeline
1541// =============================================================================
1542
1543/// Built context-aware pipeline.
1544///
1545/// Created by [`CtxPipelineChain::build`]. Implements [`CtxStepCall`]
1546/// for use inside [`Callback`](crate::Callback) dispatch.
1547pub struct CtxPipeline<C, In, Chain> {
1548    chain: Chain,
1549    _marker: PhantomData<fn(&mut C, In)>,
1550}
1551
1552impl<C, In, Chain: CtxChainCall<C, In, Out = ()>> CtxStepCall<C, In> for CtxPipeline<C, In, Chain> {
1553    type Out = ();
1554    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) {
1555        self.chain.call(ctx, world, input);
1556    }
1557}
1558
1559impl<C, In, Chain: CtxChainCall<C, In, Out = ()>> CtxPipeline<C, In, Chain> {
1560    /// Run the pipeline with context, world, and input.
1561    pub fn run(&mut self, ctx: &mut C, world: &mut World, input: In) {
1562        self.chain.call(ctx, world, input);
1563    }
1564}
1565
1566// =============================================================================
1567// resolve_ctx_step — free function for select! macro dispatch arms
1568// =============================================================================
1569
1570/// Pre-resolve a context-aware step function against a [`Registry`].
1571///
1572/// Returns a closure `FnMut(&mut C, &mut World, In) -> Out` that can be
1573/// called directly without per-call registry lookup. Used by the
1574/// [`select!`](crate::select) macro to build monomorphized dispatch arms.
1575///
1576/// # Example
1577///
1578/// ```ignore
1579/// let mut arm = resolve_ctx_step::<SessionCtx, _, _, _, _>(on_new_order, &reg);
1580/// // Later, in a match arm:
1581/// arm(&mut ctx, &mut world, order);
1582/// ```
1583pub fn resolve_ctx_step<C, In, Out, Params, S>(
1584    f: S,
1585    registry: &Registry,
1586) -> impl FnMut(&mut C, &mut World, In) -> Out + use<C, In, Out, Params, S>
1587where
1588    C: 'static,
1589    In: 'static,
1590    Out: 'static,
1591    S: IntoCtxStep<C, In, Out, Params>,
1592{
1593    let mut resolved = f.into_ctx_step(registry);
1594    move |ctx: &mut C, world: &mut World, input: In| resolved.call(ctx, world, input)
1595}
1596
1597// =============================================================================
1598// Tests
1599// =============================================================================
1600
1601#[cfg(test)]
1602mod tests {
1603    use super::*;
1604    use crate::{Res, ResMut, WorldBuilder};
1605
1606    // -- Helper types ---------------------------------------------------------
1607
1608    struct ReconnectCtx {
1609        retries: u32,
1610        last_result: Option<bool>,
1611    }
1612
1613    // -- Core dispatch --------------------------------------------------------
1614
1615    #[test]
1616    fn ctx_pipeline_three_steps_with_context_mutation() {
1617        let mut wb = WorldBuilder::new();
1618        wb.register::<u64>(10);
1619        let mut world = wb.build();
1620        let reg = world.registry();
1621
1622        // Step 1: check retries (closure, arity 0)
1623        // Step 2: multiply by resource (named fn, arity 1)
1624        // Step 3: record result in context (closure, arity 0)
1625
1626        fn multiply(ctx: &mut ReconnectCtx, factor: Res<u64>, input: u32) -> u64 {
1627            ctx.retries += 1;
1628            *factor * input as u64
1629        }
1630
1631        let mut pipeline = CtxPipelineBuilder::<ReconnectCtx, u32>::new()
1632            .then(
1633                |ctx: &mut ReconnectCtx, x: u32| {
1634                    ctx.retries += 1;
1635                    x
1636                },
1637                &reg,
1638            )
1639            .then(multiply, &reg)
1640            .then(
1641                |ctx: &mut ReconnectCtx, val: u64| {
1642                    ctx.last_result = Some(val > 0);
1643                },
1644                &reg,
1645            )
1646            .build();
1647
1648        let mut ctx = ReconnectCtx {
1649            retries: 0,
1650            last_result: None,
1651        };
1652
1653        pipeline.run(&mut ctx, &mut world, 5);
1654
1655        // Closure incremented once, named fn incremented once
1656        assert_eq!(ctx.retries, 2);
1657        // 10 * 5 = 50, which is > 0
1658        assert_eq!(ctx.last_result, Some(true));
1659    }
1660
1661    #[test]
1662    fn ctx_pipeline_guard_and_map() {
1663        let mut world = WorldBuilder::new().build();
1664        let reg = world.registry();
1665
1666        let mut pipeline = CtxPipelineBuilder::<ReconnectCtx, u32>::new()
1667            .then(|_ctx: &mut ReconnectCtx, x: u32| x, &reg)
1668            .guard(|_ctx: &mut ReconnectCtx, x: &u32| *x > 10, &reg)
1669            .map(
1670                |ctx: &mut ReconnectCtx, x: u32| {
1671                    ctx.retries += 1;
1672                    x * 2
1673                },
1674                &reg,
1675            );
1676
1677        let mut ctx = ReconnectCtx {
1678            retries: 0,
1679            last_result: None,
1680        };
1681
1682        // x = 5, guard fails, map not called
1683        let result = pipeline.run(&mut ctx, &mut world, 5);
1684        assert_eq!(result, None);
1685        assert_eq!(ctx.retries, 0);
1686
1687        // x = 20, guard passes, map called
1688        let result = pipeline.run(&mut ctx, &mut world, 20);
1689        assert_eq!(result, Some(40));
1690        assert_eq!(ctx.retries, 1);
1691    }
1692
1693    #[test]
1694    fn ctx_pipeline_and_then() {
1695        let mut world = WorldBuilder::new().build();
1696        let reg = world.registry();
1697
1698        let mut pipeline = CtxPipelineBuilder::<ReconnectCtx, u32>::new()
1699            .then(|_ctx: &mut ReconnectCtx, x: u32| Some(x), &reg)
1700            .and_then(
1701                |ctx: &mut ReconnectCtx, x: u32| {
1702                    ctx.retries += 1;
1703                    if x > 5 { Some(x * 2) } else { None }
1704                },
1705                &reg,
1706            );
1707
1708        let mut ctx = ReconnectCtx {
1709            retries: 0,
1710            last_result: None,
1711        };
1712
1713        assert_eq!(pipeline.run(&mut ctx, &mut world, 3), None);
1714        assert_eq!(ctx.retries, 1);
1715
1716        assert_eq!(pipeline.run(&mut ctx, &mut world, 10), Some(20));
1717        assert_eq!(ctx.retries, 2);
1718    }
1719
1720    #[test]
1721    fn ctx_pipeline_catch() {
1722        let mut world = WorldBuilder::new().build();
1723        let reg = world.registry();
1724
1725        let mut pipeline = CtxPipelineBuilder::<ReconnectCtx, u32>::new()
1726            .then(
1727                |_ctx: &mut ReconnectCtx, x: u32| -> Result<u32, String> {
1728                    if x > 0 {
1729                        Ok(x)
1730                    } else {
1731                        Err("zero".to_string())
1732                    }
1733                },
1734                &reg,
1735            )
1736            .catch(
1737                |ctx: &mut ReconnectCtx, _err: String| {
1738                    ctx.retries += 1;
1739                },
1740                &reg,
1741            )
1742            .map(
1743                |ctx: &mut ReconnectCtx, val: u32| {
1744                    ctx.last_result = Some(true);
1745                    val
1746                },
1747                &reg,
1748            );
1749
1750        let mut ctx = ReconnectCtx {
1751            retries: 0,
1752            last_result: None,
1753        };
1754
1755        // Error path
1756        let result = pipeline.run(&mut ctx, &mut world, 0);
1757        assert_eq!(result, None);
1758        assert_eq!(ctx.retries, 1);
1759        assert_eq!(ctx.last_result, None);
1760
1761        // Ok path
1762        let result = pipeline.run(&mut ctx, &mut world, 42);
1763        assert_eq!(result, Some(42));
1764        assert_eq!(ctx.retries, 1);
1765        assert_eq!(ctx.last_result, Some(true));
1766    }
1767
1768    #[test]
1769    fn ctx_pipeline_with_res_mut() {
1770        let mut wb = WorldBuilder::new();
1771        wb.register::<u64>(0);
1772        let mut world = wb.build();
1773        let reg = world.registry();
1774
1775        fn accumulate(ctx: &mut ReconnectCtx, mut total: ResMut<u64>, val: u32) {
1776            *total += val as u64;
1777            ctx.retries += 1;
1778        }
1779
1780        let mut pipeline = CtxPipelineBuilder::<ReconnectCtx, u32>::new()
1781            .then(accumulate, &reg)
1782            .build();
1783
1784        let mut ctx = ReconnectCtx {
1785            retries: 0,
1786            last_result: None,
1787        };
1788
1789        pipeline.run(&mut ctx, &mut world, 10);
1790        pipeline.run(&mut ctx, &mut world, 5);
1791
1792        assert_eq!(*world.resource::<u64>(), 15);
1793        assert_eq!(ctx.retries, 2);
1794    }
1795
1796    #[test]
1797    fn ctx_pipeline_build_with_option_unit() {
1798        let mut world = WorldBuilder::new().build();
1799        let reg = world.registry();
1800
1801        // Pipeline that ends with Option<()> — should still build
1802        let mut pipeline = CtxPipelineBuilder::<ReconnectCtx, u32>::new()
1803            .then(
1804                |_ctx: &mut ReconnectCtx, x: u32| -> Option<u32> {
1805                    if x > 0 { Some(x) } else { None }
1806                },
1807                &reg,
1808            )
1809            .map(|_ctx: &mut ReconnectCtx, _x: u32| {}, &reg)
1810            .build();
1811
1812        let mut ctx = ReconnectCtx {
1813            retries: 0,
1814            last_result: None,
1815        };
1816
1817        // Should not panic
1818        pipeline.run(&mut ctx, &mut world, 5);
1819        pipeline.run(&mut ctx, &mut world, 0);
1820    }
1821
1822    #[test]
1823    fn ctx_pipeline_tap() {
1824        let mut world = WorldBuilder::new().build();
1825        let reg = world.registry();
1826
1827        let mut pipeline = CtxPipelineBuilder::<ReconnectCtx, u32>::new()
1828            .then(|_ctx: &mut ReconnectCtx, x: u32| x * 2, &reg)
1829            .tap(
1830                |ctx: &mut ReconnectCtx, val: &u32| {
1831                    ctx.retries = *val;
1832                },
1833                &reg,
1834            )
1835            .then(
1836                |_ctx: &mut ReconnectCtx, x: u32| {
1837                    // Value should pass through tap unchanged
1838                    assert_eq!(x, 10);
1839                },
1840                &reg,
1841            )
1842            .build();
1843
1844        let mut ctx = ReconnectCtx {
1845            retries: 0,
1846            last_result: None,
1847        };
1848
1849        pipeline.run(&mut ctx, &mut world, 5);
1850        assert_eq!(ctx.retries, 10);
1851    }
1852
1853    #[test]
1854    fn ctx_pipeline_result_map_and_map_err() {
1855        let mut world = WorldBuilder::new().build();
1856        let reg = world.registry();
1857
1858        let mut pipeline = CtxPipelineBuilder::<ReconnectCtx, u32>::new()
1859            .then(
1860                |_ctx: &mut ReconnectCtx, x: u32| -> Result<u32, u32> {
1861                    if x > 0 { Ok(x) } else { Err(x) }
1862                },
1863                &reg,
1864            )
1865            .map(|_ctx: &mut ReconnectCtx, x: u32| x * 10, &reg)
1866            .map_err(
1867                |ctx: &mut ReconnectCtx, e: u32| {
1868                    ctx.retries += 1;
1869                    format!("error: {e}")
1870                },
1871                &reg,
1872            );
1873
1874        let mut ctx = ReconnectCtx {
1875            retries: 0,
1876            last_result: None,
1877        };
1878
1879        assert_eq!(pipeline.run(&mut ctx, &mut world, 5), Ok(50));
1880        assert_eq!(
1881            pipeline.run(&mut ctx, &mut world, 0),
1882            Err("error: 0".to_string())
1883        );
1884        assert_eq!(ctx.retries, 1);
1885    }
1886
1887    #[test]
1888    fn ctx_pipeline_inspect_err() {
1889        let mut world = WorldBuilder::new().build();
1890        let reg = world.registry();
1891
1892        let mut pipeline = CtxPipelineBuilder::<ReconnectCtx, u32>::new()
1893            .then(
1894                |_ctx: &mut ReconnectCtx, x: u32| -> Result<u32, String> {
1895                    if x > 0 { Ok(x) } else { Err("zero".into()) }
1896                },
1897                &reg,
1898            )
1899            .inspect_err(
1900                |ctx: &mut ReconnectCtx, _e: &String| {
1901                    ctx.retries += 1;
1902                },
1903                &reg,
1904            );
1905
1906        let mut ctx = ReconnectCtx {
1907            retries: 0,
1908            last_result: None,
1909        };
1910
1911        let _ = pipeline.run(&mut ctx, &mut world, 0);
1912        assert_eq!(ctx.retries, 1);
1913
1914        // Ok path — inspect_err not called
1915        let _ = pipeline.run(&mut ctx, &mut world, 5);
1916        assert_eq!(ctx.retries, 1);
1917    }
1918
1919    #[test]
1920    fn ctx_pipeline_filter() {
1921        let mut world = WorldBuilder::new().build();
1922        let reg = world.registry();
1923
1924        let mut pipeline = CtxPipelineBuilder::<ReconnectCtx, u32>::new()
1925            .then(|_ctx: &mut ReconnectCtx, x: u32| Some(x), &reg)
1926            .filter(|_ctx: &mut ReconnectCtx, x: &u32| *x > 10, &reg);
1927
1928        let mut ctx = ReconnectCtx {
1929            retries: 0,
1930            last_result: None,
1931        };
1932
1933        assert_eq!(pipeline.run(&mut ctx, &mut world, 5), None);
1934        assert_eq!(pipeline.run(&mut ctx, &mut world, 20), Some(20));
1935    }
1936
1937    #[test]
1938    fn ctx_pipeline_ok_or() {
1939        let mut world = WorldBuilder::new().build();
1940        let reg = world.registry();
1941
1942        let mut pipeline = CtxPipelineBuilder::<ReconnectCtx, u32>::new()
1943            .then(
1944                |_ctx: &mut ReconnectCtx, x: u32| {
1945                    if x > 0 { Some(x) } else { None }
1946                },
1947                &reg,
1948            )
1949            .ok_or("was zero");
1950
1951        let mut ctx = ReconnectCtx {
1952            retries: 0,
1953            last_result: None,
1954        };
1955
1956        assert_eq!(pipeline.run(&mut ctx, &mut world, 5), Ok(5));
1957        assert_eq!(pipeline.run(&mut ctx, &mut world, 0), Err("was zero"));
1958    }
1959
1960    #[test]
1961    fn ctx_pipeline_unwrap_or_option() {
1962        let mut world = WorldBuilder::new().build();
1963        let reg = world.registry();
1964
1965        let mut pipeline = CtxPipelineBuilder::<ReconnectCtx, u32>::new()
1966            .then(
1967                |_ctx: &mut ReconnectCtx, x: u32| {
1968                    if x > 0 { Some(x) } else { None }
1969                },
1970                &reg,
1971            )
1972            .unwrap_or(99);
1973
1974        let mut ctx = ReconnectCtx {
1975            retries: 0,
1976            last_result: None,
1977        };
1978
1979        assert_eq!(pipeline.run(&mut ctx, &mut world, 5), 5);
1980        assert_eq!(pipeline.run(&mut ctx, &mut world, 0), 99);
1981    }
1982
1983    #[test]
1984    fn ctx_pipeline_unwrap_or_else_option() {
1985        let mut world = WorldBuilder::new().build();
1986        let reg = world.registry();
1987
1988        let mut pipeline = CtxPipelineBuilder::<ReconnectCtx, u32>::new()
1989            .then(
1990                |_ctx: &mut ReconnectCtx, x: u32| {
1991                    if x > 0 { Some(x) } else { None }
1992                },
1993                &reg,
1994            )
1995            .unwrap_or_else(
1996                |ctx: &mut ReconnectCtx| {
1997                    ctx.retries += 1;
1998                    42
1999                },
2000                &reg,
2001            );
2002
2003        let mut ctx = ReconnectCtx {
2004            retries: 0,
2005            last_result: None,
2006        };
2007
2008        assert_eq!(pipeline.run(&mut ctx, &mut world, 5), 5);
2009        assert_eq!(ctx.retries, 0);
2010        assert_eq!(pipeline.run(&mut ctx, &mut world, 0), 42);
2011        assert_eq!(ctx.retries, 1);
2012    }
2013
2014    #[test]
2015    fn ctx_pipeline_inspect_option() {
2016        let mut world = WorldBuilder::new().build();
2017        let reg = world.registry();
2018
2019        let mut pipeline = CtxPipelineBuilder::<ReconnectCtx, u32>::new()
2020            .then(
2021                |_ctx: &mut ReconnectCtx, x: u32| {
2022                    if x > 0 { Some(x) } else { None }
2023                },
2024                &reg,
2025            )
2026            .inspect(
2027                |ctx: &mut ReconnectCtx, val: &u32| {
2028                    ctx.retries = *val;
2029                },
2030                &reg,
2031            );
2032
2033        let mut ctx = ReconnectCtx {
2034            retries: 0,
2035            last_result: None,
2036        };
2037
2038        // Some path — inspect fires
2039        let _ = pipeline.run(&mut ctx, &mut world, 7);
2040        assert_eq!(ctx.retries, 7);
2041
2042        // None path — inspect skipped
2043        let _ = pipeline.run(&mut ctx, &mut world, 0);
2044        assert_eq!(ctx.retries, 7);
2045    }
2046
2047    #[test]
2048    fn ctx_pipeline_on_none() {
2049        let mut world = WorldBuilder::new().build();
2050        let reg = world.registry();
2051
2052        let mut pipeline = CtxPipelineBuilder::<ReconnectCtx, u32>::new()
2053            .then(
2054                |_ctx: &mut ReconnectCtx, x: u32| {
2055                    if x > 0 { Some(x) } else { None }
2056                },
2057                &reg,
2058            )
2059            .on_none(
2060                |ctx: &mut ReconnectCtx| {
2061                    ctx.retries += 1;
2062                },
2063                &reg,
2064            );
2065
2066        let mut ctx = ReconnectCtx {
2067            retries: 0,
2068            last_result: None,
2069        };
2070
2071        // Some path — on_none not called
2072        let result = pipeline.run(&mut ctx, &mut world, 5);
2073        assert_eq!(result, Some(5));
2074        assert_eq!(ctx.retries, 0);
2075
2076        // None path — on_none called
2077        let result = pipeline.run(&mut ctx, &mut world, 0);
2078        assert_eq!(result, None);
2079        assert_eq!(ctx.retries, 1);
2080    }
2081
2082    #[test]
2083    fn ctx_pipeline_ok_result() {
2084        let mut world = WorldBuilder::new().build();
2085        let reg = world.registry();
2086
2087        let mut pipeline = CtxPipelineBuilder::<ReconnectCtx, u32>::new()
2088            .then(
2089                |_ctx: &mut ReconnectCtx, x: u32| -> Result<u32, String> {
2090                    if x > 0 { Ok(x) } else { Err("zero".into()) }
2091                },
2092                &reg,
2093            )
2094            .ok();
2095
2096        let mut ctx = ReconnectCtx {
2097            retries: 0,
2098            last_result: None,
2099        };
2100
2101        assert_eq!(pipeline.run(&mut ctx, &mut world, 5), Some(5));
2102        assert_eq!(pipeline.run(&mut ctx, &mut world, 0), None);
2103    }
2104
2105    #[test]
2106    fn ctx_pipeline_unwrap_or_result() {
2107        let mut world = WorldBuilder::new().build();
2108        let reg = world.registry();
2109
2110        let mut pipeline = CtxPipelineBuilder::<ReconnectCtx, u32>::new()
2111            .then(
2112                |_ctx: &mut ReconnectCtx, x: u32| -> Result<u32, String> {
2113                    if x > 0 { Ok(x) } else { Err("zero".into()) }
2114                },
2115                &reg,
2116            )
2117            .unwrap_or(99);
2118
2119        let mut ctx = ReconnectCtx {
2120            retries: 0,
2121            last_result: None,
2122        };
2123
2124        assert_eq!(pipeline.run(&mut ctx, &mut world, 5), 5);
2125        assert_eq!(pipeline.run(&mut ctx, &mut world, 0), 99);
2126    }
2127
2128    #[test]
2129    fn ctx_pipeline_inspect_result() {
2130        let mut world = WorldBuilder::new().build();
2131        let reg = world.registry();
2132
2133        let mut pipeline = CtxPipelineBuilder::<ReconnectCtx, u32>::new()
2134            .then(
2135                |_ctx: &mut ReconnectCtx, x: u32| -> Result<u32, String> {
2136                    if x > 0 { Ok(x) } else { Err("zero".into()) }
2137                },
2138                &reg,
2139            )
2140            .inspect(
2141                |ctx: &mut ReconnectCtx, val: &u32| {
2142                    ctx.retries = *val;
2143                },
2144                &reg,
2145            );
2146
2147        let mut ctx = ReconnectCtx {
2148            retries: 0,
2149            last_result: None,
2150        };
2151
2152        // Ok path — inspect fires
2153        let _ = pipeline.run(&mut ctx, &mut world, 7);
2154        assert_eq!(ctx.retries, 7);
2155
2156        // Err path — inspect skipped
2157        let _ = pipeline.run(&mut ctx, &mut world, 0);
2158        assert_eq!(ctx.retries, 7);
2159    }
2160
2161    // -- Opaque escape hatch --------------------------------------------------
2162
2163    #[test]
2164    fn ctx_pipeline_opaque_step() {
2165        let mut wb = WorldBuilder::new();
2166        wb.register::<u64>(100);
2167        let mut world = wb.build();
2168        let reg = world.registry();
2169
2170        // Opaque step: FnMut(&mut C, &mut World, In) -> Out
2171        let mut pipeline = CtxPipelineBuilder::<ReconnectCtx, u32>::new()
2172            .then(
2173                |ctx: &mut ReconnectCtx, w: &mut World, x: u32| {
2174                    ctx.retries += 1;
2175                    let scale = *w.resource::<u64>();
2176                    u64::from(x) * scale
2177                },
2178                &reg,
2179            )
2180            .then(
2181                |ctx: &mut ReconnectCtx, val: u64| {
2182                    ctx.last_result = Some(val > 0);
2183                },
2184                &reg,
2185            )
2186            .build();
2187
2188        let mut ctx = ReconnectCtx {
2189            retries: 0,
2190            last_result: None,
2191        };
2192
2193        pipeline.run(&mut ctx, &mut world, 5);
2194        assert_eq!(ctx.retries, 1);
2195        assert_eq!(ctx.last_result, Some(true));
2196    }
2197
2198    #[test]
2199    fn ctx_pipeline_opaque_guard() {
2200        let mut wb = WorldBuilder::new();
2201        wb.register::<u64>(10);
2202        let mut world = wb.build();
2203        let reg = world.registry();
2204
2205        // Opaque ref step used as guard
2206        let mut pipeline = CtxPipelineBuilder::<ReconnectCtx, u32>::new()
2207            .then(|_ctx: &mut ReconnectCtx, x: u32| x, &reg)
2208            .guard(
2209                |_ctx: &mut ReconnectCtx, w: &mut World, x: &u32| {
2210                    let threshold = *w.resource::<u64>();
2211                    u64::from(*x) > threshold
2212                },
2213                &reg,
2214            );
2215
2216        let mut ctx = ReconnectCtx {
2217            retries: 0,
2218            last_result: None,
2219        };
2220
2221        assert_eq!(pipeline.run(&mut ctx, &mut world, 5), None);
2222        assert_eq!(pipeline.run(&mut ctx, &mut world, 20), Some(20));
2223    }
2224
2225    #[test]
2226    fn ctx_pipeline_opaque_producer() {
2227        let mut wb = WorldBuilder::new();
2228        wb.register::<u64>(42);
2229        let mut world = wb.build();
2230        let reg = world.registry();
2231
2232        // Opaque producer: FnMut(&mut C, &mut World) -> Out
2233        let mut pipeline = CtxPipelineBuilder::<ReconnectCtx, u32>::new()
2234            .then(
2235                |_ctx: &mut ReconnectCtx, x: u32| {
2236                    if x > 0 { Some(x) } else { None }
2237                },
2238                &reg,
2239            )
2240            .unwrap_or_else(
2241                |ctx: &mut ReconnectCtx, w: &mut World| {
2242                    ctx.retries += 1;
2243                    *w.resource::<u64>() as u32
2244                },
2245                &reg,
2246            );
2247
2248        let mut ctx = ReconnectCtx {
2249            retries: 0,
2250            last_result: None,
2251        };
2252
2253        assert_eq!(pipeline.run(&mut ctx, &mut world, 5), 5);
2254        assert_eq!(ctx.retries, 0);
2255        assert_eq!(pipeline.run(&mut ctx, &mut world, 0), 42);
2256        assert_eq!(ctx.retries, 1);
2257    }
2258}