Skip to main content

nexus_rt/
ctx_pipeline.rs

1// Builder return types are necessarily complex — each combinator returns
2// CtxPipelineChain<C, In, Out, NodeType<Chain, ...>>. Same pattern as iterator adapters.
3#![allow(clippy::type_complexity)]
4
5//! Context-aware pipeline dispatch.
6//!
7//! Mirrors the [`pipeline`](crate::pipeline) module but threads `&mut C`
8//! (per-instance context) through every step. Designed for use inside
9//! [`Callback`](crate::Callback) where each handler instance owns private
10//! state that pipeline steps need to read or mutate.
11//!
12//! # Step function convention
13//!
14//! Context first, then Params, then step input last:
15//!
16//! ```ignore
17//! fn check_retries(ctx: &mut ReconnectCtx, config: Res<Config>, _input: ()) -> Option<()> {
18//!     if ctx.retries < config.max_retries { ctx.retries += 1; Some(()) } else { None }
19//! }
20//! fn attempt_connect(ctx: &mut ReconnectCtx, _input: ()) -> Result<TcpStream, io::Error> {
21//!     TcpStream::connect(&ctx.addr)
22//! }
23//! ```
24//!
25//! # Integration with Callback
26//!
27//! The built [`CtxPipeline`] implements [`CtxStepCall`] — it takes
28//! `&mut C`, `&mut World`, and `In`, returning `Out`.
29//!
30//! To use a pipeline from a [`Handler`](crate::Handler), create a normal
31//! [`Callback`](crate::Callback) whose handler function owns or accesses
32//! the context `C` and calls the pipeline via its `run` method, passing
33//! `&mut C`, `&mut World`, and the handler input. For pipelines that
34//! return a non-unit value, use [`CtxPipelineChain::run`] directly —
35//! `.build()` is only available when `Out = ()` or `Out = Option<()>`.
36//!
37//! # Three-tier step resolution
38//!
39//! Each combinator accepts functions via three tiers, matching the
40//! [`pipeline`](crate::pipeline) module:
41//!
42//! 1. **Named function with Params** — `fn(&mut C, Res<T>, In) -> Out`
43//! 2. **Arity-0 closure** — `FnMut(&mut C, In) -> Out`
44//! 3. **[`Opaque`] closure** — `FnMut(&mut C, &mut World, In) -> Out`
45//!    (raw World access, no Param resolution)
46//!
47//! # Deferred combinators
48//!
49//! The following combinators from [`pipeline`](crate::pipeline) are not yet
50//! implemented: `scan`, `dedup`, `dispatch`, `route`, `tee`, `splat`,
51//! `cloned`, `not`/`and`/`or`/`xor` (bool), `ok_or_else`, `or_else`,
52//! `Result::unwrap_or_else`, and `BatchPipeline`. These can be added when
53//! a concrete use case requires them.
54
55use std::marker::PhantomData;
56
57use crate::handler::{Opaque, Param};
58use crate::world::{Registry, World};
59
60// =============================================================================
61// CtxStep — pre-resolved step with Param state (context-aware)
62// =============================================================================
63
64/// Internal: pre-resolved context-aware step with cached Param state.
65#[doc(hidden)]
66pub struct CtxStep<F, Params: Param> {
67    f: F,
68    state: Params::State,
69    // Retained for future diagnostic/tracing use (step name in error messages).
70    #[allow(dead_code)]
71    name: &'static str,
72}
73
74// =============================================================================
75// CtxStepCall — callable trait for context-aware resolved steps
76// =============================================================================
77
78/// Internal: callable trait for context-aware resolved steps.
79///
80/// Like [`StepCall`](crate::pipeline::StepCall) but with `&mut C` context.
81#[doc(hidden)]
82pub trait CtxStepCall<C, In> {
83    /// The output type of this step.
84    type Out;
85    /// Call this step with context, world, and input.
86    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> Self::Out;
87}
88
89// =============================================================================
90// IntoCtxStep — converts a named function into a context-aware step
91// =============================================================================
92
93/// Converts a named function into a pre-resolved context-aware pipeline step.
94///
95/// Three tiers of resolution (same API for all):
96///
97/// | Params | Function shape | Example |
98/// |--------|---------------|---------|
99/// | Named | `fn(&mut C, Res<T>, In) -> Out` | `fn process(ctx: &mut Ctx, cfg: Res<Config>, x: u32) -> u64` |
100/// | `()` | `FnMut(&mut C, In) -> Out` | `\|ctx: &mut Ctx, x: u32\| x * 2` |
101/// | [`Opaque`] | `FnMut(&mut C, &mut World, In) -> Out` | `\|ctx: &mut Ctx, w: &mut World, x: u32\| { ... }` |
102///
103/// # Examples
104///
105/// ```ignore
106/// // Arity 0 — closure works
107/// let step = (|ctx: &mut Ctx, x: u32| { ctx.count += 1; x * 2 }).into_ctx_step(registry);
108///
109/// // Arity 1 — named function required
110/// fn validate(ctx: &mut Ctx, config: Res<Config>, order: Order) -> Option<ValidOrder> { .. }
111/// let step = validate.into_ctx_step(registry);
112///
113/// // Opaque — raw World access
114/// let step = (|ctx: &mut Ctx, w: &mut World, x: u32| { ... }).into_ctx_step(registry);
115/// ```
116#[diagnostic::on_unimplemented(
117    message = "this function cannot be used as a context-aware pipeline step",
118    note = "ctx step signature: `fn(&mut C, Params..., In) -> Out` — context first, resources, input last",
119    note = "for raw World access: `fn(&mut C, &mut World, In) -> Out`",
120    note = "closures with resource parameters are not supported — use a named `fn`"
121)]
122pub trait IntoCtxStep<C, In, Out, Params> {
123    /// The concrete resolved step type.
124    type Step: CtxStepCall<C, In, Out = Out>;
125
126    /// Resolve Param state from the registry and produce a step.
127    fn into_ctx_step(self, registry: &Registry) -> Self::Step;
128}
129
130// =============================================================================
131// Arity 0 — fn(&mut C, In) -> Out — closures work
132// =============================================================================
133
134impl<C, In, Out, F: FnMut(&mut C, In) -> Out + 'static> CtxStepCall<C, In> for CtxStep<F, ()> {
135    type Out = Out;
136    #[inline(always)]
137    fn call(&mut self, ctx: &mut C, _world: &mut World, input: In) -> Out {
138        (self.f)(ctx, input)
139    }
140}
141
142impl<C, In, Out, F: FnMut(&mut C, In) -> Out + 'static> IntoCtxStep<C, In, Out, ()> for F {
143    type Step = CtxStep<F, ()>;
144
145    fn into_ctx_step(self, registry: &Registry) -> Self::Step {
146        CtxStep {
147            f: self,
148            state: <() as Param>::init(registry),
149            name: std::any::type_name::<F>(),
150        }
151    }
152}
153
154// =============================================================================
155// Arities 1-8 via macro — HRTB with -> Out
156// =============================================================================
157
// For one tuple of `Param` types, generates:
//   * `CtxStepCall` for `CtxStep<F, (P, ..)>` — fetches the params from the
//     world and calls `F` with context first, params next, input last;
//   * `IntoCtxStep` for `F` — initialises the param state from the registry.
// Invoked once through `all_tuples!` below (the section header says this
// covers arities 1-8).
macro_rules! impl_into_ctx_step {
    ($($P:ident),+) => {
        impl<C, In, Out, F: 'static, $($P: Param + 'static),+>
            CtxStepCall<C, In> for CtxStep<F, ($($P,)+)>
        where
            // The callable must satisfy both signatures: the one written with
            // the declared param types and the one written with their
            // `Item<'a>` forms.
            for<'a> &'a mut F:
                FnMut(&mut C, $($P,)+ In) -> Out +
                FnMut(&mut C, $($P::Item<'a>,)+ In) -> Out,
        {
            type Out = Out;
            #[inline(always)]
            #[allow(non_snake_case)]
            fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> Out {
                // Plain generic trampoline: takes the callable plus every
                // argument and forwards them unchanged.
                // NOTE(review): presumably exists to steer closure-signature
                // inference toward the `Item<'a>` bound — confirm.
                #[allow(clippy::too_many_arguments)]
                fn call_inner<Ctx, $($P,)+ Input, Output>(
                    mut f: impl FnMut(&mut Ctx, $($P,)+ Input) -> Output,
                    ctx: &mut Ctx,
                    $($P: $P,)+
                    input: Input,
                ) -> Output {
                    f(ctx, $($P,)+ input)
                }

                // Debug builds only; presumably resets the world's borrow
                // tracking before this step fetches — TODO confirm in `World`.
                #[cfg(debug_assertions)]
                world.clear_borrows();
                // SAFETY: state was produced by Param::init() on the same
                // Registry that built this World. Borrows are disjoint —
                // enforced by conflict detection at build time — and
                // single-threaded sequential dispatch ensures no mutable
                // aliasing across params.
                let ($($P,)+) = unsafe {
                    <($($P,)+) as Param>::fetch(world, &mut self.state)
                };
                call_inner(&mut self.f, ctx, $($P,)+ input)
            }
        }

        impl<C, In, Out, F: 'static, $($P: Param + 'static),+>
            IntoCtxStep<C, In, Out, ($($P,)+)> for F
        where
            for<'a> &'a mut F:
                FnMut(&mut C, $($P,)+ In) -> Out +
                FnMut(&mut C, $($P::Item<'a>,)+ In) -> Out,
        {
            type Step = CtxStep<F, ($($P,)+)>;

            fn into_ctx_step(self, registry: &Registry) -> Self::Step {
                let state = <($($P,)+) as Param>::init(registry);
                {
                    // Bind each element of the state tuple to an identifier
                    // named after its param type so the repetition below can
                    // refer to it.
                    #[allow(non_snake_case)]
                    let ($($P,)+) = &state;
                    // Hands (resource id, type name) pairs to the registry.
                    // NOTE(review): presumably rejects conflicting access
                    // within one step — confirm in `Registry::check_access`.
                    registry.check_access(&[
                        $(
                            (<$P as Param>::resource_id($P),
                             std::any::type_name::<$P>()),
                        )+
                    ]);
                }
                CtxStep { f: self, state, name: std::any::type_name::<F>() }
            }
        }
    };
}

all_tuples!(impl_into_ctx_step);
224
225// -- Opaque: FnMut(&mut C, &mut World, In) -> Out ----------------------------
226
/// Internal: holder for an opaque closure used as a context-aware step.
///
/// In contrast to [`CtxStep<F, P>`], no `Param::State` is stored here: the
/// closure is handed `&mut World` as-is on every call.
#[doc(hidden)]
pub struct CtxOpaqueStep<F> {
    /// The wrapped closure.
    f: F,
    // Kept for future diagnostic/tracing use (step name in error messages).
    #[allow(dead_code)]
    name: &'static str,
}
238
239impl<C, In, Out, F: FnMut(&mut C, &mut World, In) -> Out + 'static> CtxStepCall<C, In>
240    for CtxOpaqueStep<F>
241{
242    type Out = Out;
243    #[inline(always)]
244    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> Out {
245        (self.f)(ctx, world, input)
246    }
247}
248
249impl<C, In, Out, F: FnMut(&mut C, &mut World, In) -> Out + 'static> IntoCtxStep<C, In, Out, Opaque>
250    for F
251{
252    type Step = CtxOpaqueStep<F>;
253
254    fn into_ctx_step(self, _registry: &Registry) -> Self::Step {
255        CtxOpaqueStep {
256            f: self,
257            name: std::any::type_name::<F>(),
258        }
259    }
260}
261
262// =============================================================================
263// CtxRefStepCall / IntoCtxRefStep — context-aware step taking &In
264// =============================================================================
265
266/// Internal: callable trait for context-aware steps taking input by reference.
267///
268/// Used by combinators like `tap`, `guard`, `filter` that observe the
269/// value without consuming it.
270#[doc(hidden)]
271pub trait CtxRefStepCall<C, In> {
272    /// The output type of this step.
273    type Out;
274    /// Call this step with context, world, and borrowed input.
275    fn call(&mut self, ctx: &mut C, world: &mut World, input: &In) -> Self::Out;
276}
277
278/// Converts a function into a context-aware step taking input by reference.
279///
280/// Used by combinators like `tap`, `guard`, `filter`, `inspect` that
281/// observe the value without consuming it.
282///
283/// | Params | Function shape | Example |
284/// |--------|---------------|---------|
285/// | Named | `fn(&mut C, Res<T>, &In) -> Out` | `fn check(ctx: &mut Ctx, cfg: Res<Config>, o: &Order) -> bool` |
286/// | `()` | `FnMut(&mut C, &In) -> Out` | `\|ctx: &mut Ctx, o: &Order\| o.qty > 0` |
287/// | [`Opaque`] | `FnMut(&mut C, &mut World, &In) -> Out` | `\|ctx: &mut Ctx, w: &mut World, o: &Order\| { ... }` |
288#[diagnostic::on_unimplemented(
289    message = "this function cannot be used as a context-aware reference step",
290    note = "ctx ref step signature: `fn(&mut C, Params..., &In) -> Out`",
291    note = "for raw World access: `fn(&mut C, &mut World, &In) -> Out`",
292    note = "closures with resource parameters are not supported — use a named `fn`"
293)]
294pub trait IntoCtxRefStep<C, In, Out, Params> {
295    /// The concrete resolved step type.
296    type Step: CtxRefStepCall<C, In, Out = Out>;
297
298    /// Resolve Param state from the registry and produce a step.
299    fn into_ctx_ref_step(self, registry: &Registry) -> Self::Step;
300}
301
302// -- Arity 0: FnMut(&mut C, &In) -> Out — closures work ----------------------
303
304impl<C, In, Out, F: FnMut(&mut C, &In) -> Out + 'static> CtxRefStepCall<C, In> for CtxStep<F, ()> {
305    type Out = Out;
306    #[inline(always)]
307    fn call(&mut self, ctx: &mut C, _world: &mut World, input: &In) -> Out {
308        (self.f)(ctx, input)
309    }
310}
311
312impl<C, In, Out, F: FnMut(&mut C, &In) -> Out + 'static> IntoCtxRefStep<C, In, Out, ()> for F {
313    type Step = CtxStep<F, ()>;
314
315    fn into_ctx_ref_step(self, registry: &Registry) -> Self::Step {
316        CtxStep {
317            f: self,
318            state: <() as Param>::init(registry),
319            name: std::any::type_name::<F>(),
320        }
321    }
322}
323
324// -- Arities 1-8: named functions with Param resolution -----------------------
325
// By-reference twin of the owned-input step macro. For one tuple of
// `Param` types, generates:
//   * `CtxRefStepCall` for `CtxStep<F, (P, ..)>` — fetches the params from
//     the world and calls `F` with context, params, then `&In`;
//   * `IntoCtxRefStep` for `F` — initialises the param state from the registry.
// Invoked once through `all_tuples!` below.
macro_rules! impl_into_ctx_ref_step {
    ($($P:ident),+) => {
        impl<C, In, Out, F: 'static, $($P: Param + 'static),+>
            CtxRefStepCall<C, In> for CtxStep<F, ($($P,)+)>
        where
            // The callable must satisfy both signatures: the one written with
            // the declared param types and the one written with their
            // `Item<'a>` forms.
            for<'a> &'a mut F:
                FnMut(&mut C, $($P,)+ &In) -> Out +
                FnMut(&mut C, $($P::Item<'a>,)+ &In) -> Out,
        {
            type Out = Out;
            #[inline(always)]
            #[allow(non_snake_case)]
            fn call(&mut self, ctx: &mut C, world: &mut World, input: &In) -> Out {
                // Plain generic trampoline: takes the callable plus every
                // argument and forwards them unchanged.
                // NOTE(review): presumably exists to steer closure-signature
                // inference toward the `Item<'a>` bound — confirm.
                #[allow(clippy::too_many_arguments)]
                fn call_inner<Ctx, $($P,)+ Input: ?Sized, Output>(
                    mut f: impl FnMut(&mut Ctx, $($P,)+ &Input) -> Output,
                    ctx: &mut Ctx,
                    $($P: $P,)+
                    input: &Input,
                ) -> Output {
                    f(ctx, $($P,)+ input)
                }

                // Debug builds only; presumably resets the world's borrow
                // tracking before this step fetches — TODO confirm in `World`.
                #[cfg(debug_assertions)]
                world.clear_borrows();
                // SAFETY: state was produced by Param::init() on the same
                // Registry that built this World. Single-threaded sequential
                // dispatch ensures no mutable aliasing across params.
                let ($($P,)+) = unsafe {
                    <($($P,)+) as Param>::fetch(world, &mut self.state)
                };
                call_inner(&mut self.f, ctx, $($P,)+ input)
            }
        }

        impl<C, In, Out, F: 'static, $($P: Param + 'static),+>
            IntoCtxRefStep<C, In, Out, ($($P,)+)> for F
        where
            for<'a> &'a mut F:
                FnMut(&mut C, $($P,)+ &In) -> Out +
                FnMut(&mut C, $($P::Item<'a>,)+ &In) -> Out,
        {
            type Step = CtxStep<F, ($($P,)+)>;

            fn into_ctx_ref_step(self, registry: &Registry) -> Self::Step {
                let state = <($($P,)+) as Param>::init(registry);
                {
                    // Bind each element of the state tuple to an identifier
                    // named after its param type so the repetition below can
                    // refer to it.
                    #[allow(non_snake_case)]
                    let ($($P,)+) = &state;
                    // Hands (resource id, type name) pairs to the registry.
                    // NOTE(review): presumably rejects conflicting access
                    // within one step — confirm in `Registry::check_access`.
                    registry.check_access(&[
                        $(
                            (<$P as Param>::resource_id($P),
                             std::any::type_name::<$P>()),
                        )+
                    ]);
                }
                CtxStep { f: self, state, name: std::any::type_name::<F>() }
            }
        }
    };
}

all_tuples!(impl_into_ctx_ref_step);
389
390// -- Opaque: FnMut(&mut C, &mut World, &In) -> Out ---------------------------
391
/// Internal: holder for an opaque closure that receives its input by reference.
#[doc(hidden)]
pub struct CtxOpaqueRefStep<F> {
    /// The wrapped closure.
    f: F,
    // Type name of the closure, recorded at construction; currently unread.
    #[allow(dead_code)]
    name: &'static str,
}
399
400impl<C, In, Out, F: FnMut(&mut C, &mut World, &In) -> Out + 'static> CtxRefStepCall<C, In>
401    for CtxOpaqueRefStep<F>
402{
403    type Out = Out;
404    #[inline(always)]
405    fn call(&mut self, ctx: &mut C, world: &mut World, input: &In) -> Out {
406        (self.f)(ctx, world, input)
407    }
408}
409
410impl<C, In, Out, F: FnMut(&mut C, &mut World, &In) -> Out + 'static>
411    IntoCtxRefStep<C, In, Out, Opaque> for F
412{
413    type Step = CtxOpaqueRefStep<F>;
414
415    fn into_ctx_ref_step(self, _registry: &Registry) -> Self::Step {
416        CtxOpaqueRefStep {
417            f: self,
418            name: std::any::type_name::<F>(),
419        }
420    }
421}
422
423// =============================================================================
424// CtxProducerCall / IntoCtxProducer — context-aware producer (no pipeline input)
425// =============================================================================
426
427/// Internal: callable trait for context-aware producers.
428///
429/// Used by combinators like `on_none`, `unwrap_or_else`.
430#[doc(hidden)]
431pub trait CtxProducerCall<C> {
432    /// The output type of this producer.
433    type Out;
434    /// Call this producer with context and world.
435    fn call(&mut self, ctx: &mut C, world: &mut World) -> Self::Out;
436}
437
438/// Converts a function into a context-aware producer step.
439///
440/// Used by combinators like `on_none`, `unwrap_or_else`.
441///
442/// | Params | Function shape | Example |
443/// |--------|---------------|---------|
444/// | Named | `fn(&mut C, Res<T>) -> Out` | `fn default_val(ctx: &mut Ctx, cfg: Res<Config>) -> u64` |
445/// | `()` | `FnMut(&mut C) -> Out` | `\|ctx: &mut Ctx\| ctx.fallback` |
446/// | [`Opaque`] | `FnMut(&mut C, &mut World) -> Out` | `\|ctx: &mut Ctx, w: &mut World\| { ... }` |
447#[diagnostic::on_unimplemented(
448    message = "this function cannot be used as a context-aware producer",
449    note = "ctx producer signature: `fn(&mut C, Params...) -> Out`",
450    note = "for raw World access: `fn(&mut C, &mut World) -> Out`",
451    note = "closures with resource parameters are not supported — use a named `fn`"
452)]
453pub trait IntoCtxProducer<C, Out, Params> {
454    /// The concrete resolved producer type.
455    type Step: CtxProducerCall<C, Out = Out>;
456
457    /// Resolve Param state from the registry and produce a step.
458    fn into_ctx_producer(self, registry: &Registry) -> Self::Step;
459}
460
461// -- Arity 0: FnMut(&mut C) -> Out — closures work ----------------------------
462
463impl<C, Out, F: FnMut(&mut C) -> Out + 'static> CtxProducerCall<C> for CtxStep<F, ()> {
464    type Out = Out;
465    #[inline(always)]
466    fn call(&mut self, ctx: &mut C, _world: &mut World) -> Out {
467        (self.f)(ctx)
468    }
469}
470
471impl<C, Out, F: FnMut(&mut C) -> Out + 'static> IntoCtxProducer<C, Out, ()> for F {
472    type Step = CtxStep<F, ()>;
473
474    fn into_ctx_producer(self, registry: &Registry) -> Self::Step {
475        CtxStep {
476            f: self,
477            state: <() as Param>::init(registry),
478            name: std::any::type_name::<F>(),
479        }
480    }
481}
482
483// -- Arities 1-8: named functions with Param resolution -----------------------
484
// Producer twin of the step macros (no pipeline input). For one tuple of
// `Param` types, generates:
//   * `CtxProducerCall` for `CtxStep<F, (P, ..)>` — fetches the params from
//     the world and calls `F` with context first, then the params;
//   * `IntoCtxProducer` for `F` — initialises the param state from the registry.
// Invoked once through `all_tuples!` below.
macro_rules! impl_into_ctx_producer {
    ($($P:ident),+) => {
        impl<C, Out, F: 'static, $($P: Param + 'static),+>
            CtxProducerCall<C> for CtxStep<F, ($($P,)+)>
        where
            // The callable must satisfy both signatures: the one written with
            // the declared param types and the one written with their
            // `Item<'a>` forms.
            for<'a> &'a mut F:
                FnMut(&mut C, $($P,)+) -> Out +
                FnMut(&mut C, $($P::Item<'a>,)+) -> Out,
        {
            type Out = Out;
            #[inline(always)]
            #[allow(non_snake_case)]
            fn call(&mut self, ctx: &mut C, world: &mut World) -> Out {
                // Plain generic trampoline: takes the callable plus every
                // argument and forwards them unchanged.
                // NOTE(review): presumably exists to steer closure-signature
                // inference toward the `Item<'a>` bound — confirm.
                #[allow(clippy::too_many_arguments)]
                fn call_inner<Ctx, $($P,)+ Output>(
                    mut f: impl FnMut(&mut Ctx, $($P,)+) -> Output,
                    ctx: &mut Ctx,
                    $($P: $P,)+
                ) -> Output {
                    f(ctx, $($P,)+)
                }

                // Debug builds only; presumably resets the world's borrow
                // tracking before this producer fetches — TODO confirm in `World`.
                #[cfg(debug_assertions)]
                world.clear_borrows();
                // SAFETY: state was produced by Param::init() on the same
                // Registry that built this World. Single-threaded sequential
                // dispatch ensures no mutable aliasing across params.
                let ($($P,)+) = unsafe {
                    <($($P,)+) as Param>::fetch(world, &mut self.state)
                };
                call_inner(&mut self.f, ctx, $($P,)+)
            }
        }

        impl<C, Out, F: 'static, $($P: Param + 'static),+>
            IntoCtxProducer<C, Out, ($($P,)+)> for F
        where
            for<'a> &'a mut F:
                FnMut(&mut C, $($P,)+) -> Out +
                FnMut(&mut C, $($P::Item<'a>,)+) -> Out,
        {
            type Step = CtxStep<F, ($($P,)+)>;

            fn into_ctx_producer(self, registry: &Registry) -> Self::Step {
                let state = <($($P,)+) as Param>::init(registry);
                {
                    // Bind each element of the state tuple to an identifier
                    // named after its param type so the repetition below can
                    // refer to it.
                    #[allow(non_snake_case)]
                    let ($($P,)+) = &state;
                    // Hands (resource id, type name) pairs to the registry.
                    // NOTE(review): presumably rejects conflicting access
                    // within one producer — confirm in `Registry::check_access`.
                    registry.check_access(&[
                        $(
                            (<$P as Param>::resource_id($P),
                             std::any::type_name::<$P>()),
                        )+
                    ]);
                }
                CtxStep { f: self, state, name: std::any::type_name::<F>() }
            }
        }
    };
}

all_tuples!(impl_into_ctx_producer);
547
548// -- Opaque: FnMut(&mut C, &mut World) -> Out --------------------------------
549
/// Internal: holder for an opaque closure used as a context-aware producer.
#[doc(hidden)]
pub struct CtxOpaqueProducer<F> {
    /// The wrapped closure.
    f: F,
    // Type name of the closure, recorded at construction; currently unread.
    #[allow(dead_code)]
    name: &'static str,
}
557
558impl<C, Out, F: FnMut(&mut C, &mut World) -> Out + 'static> CtxProducerCall<C>
559    for CtxOpaqueProducer<F>
560{
561    type Out = Out;
562    #[inline(always)]
563    fn call(&mut self, ctx: &mut C, world: &mut World) -> Out {
564        (self.f)(ctx, world)
565    }
566}
567
568impl<C, Out, F: FnMut(&mut C, &mut World) -> Out + 'static> IntoCtxProducer<C, Out, Opaque> for F {
569    type Step = CtxOpaqueProducer<F>;
570
571    fn into_ctx_producer(self, _registry: &Registry) -> Self::Step {
572        CtxOpaqueProducer {
573            f: self,
574            name: std::any::type_name::<F>(),
575        }
576    }
577}
578
579// =============================================================================
580// CtxChainCall — callable trait for context-aware chain nodes
581// =============================================================================
582
583/// Internal: callable trait for context-aware chain nodes.
584///
585/// Like [`ChainCall`](crate::pipeline::ChainCall) but threads `&mut C`.
586#[doc(hidden)]
587pub trait CtxChainCall<C, In> {
588    /// The output type of this chain node.
589    type Out;
590    /// Execute the chain with context, world, and input.
591    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> Self::Out;
592}
593
594// =============================================================================
595// Chain nodes — named types for context-aware pipeline composition
596// =============================================================================
597
598/// Identity passthrough node. Used as the initial chain element.
599#[doc(hidden)]
600pub struct CtxIdentityNode;
601
602impl<C, In> CtxChainCall<C, In> for CtxIdentityNode {
603    type Out = In;
604    #[inline(always)]
605    fn call(&mut self, _ctx: &mut C, _world: &mut World, input: In) -> In {
606        input
607    }
608}
609
610// -- Core (any Out) ----------------------------------------------------------
611
612/// Chain node for `.then()` — transforms output via a context-aware step.
613#[doc(hidden)]
614pub struct CtxThenNode<Prev, S> {
615    pub(crate) prev: Prev,
616    pub(crate) step: S,
617}
618
619impl<C, In, Prev, S> CtxChainCall<C, In> for CtxThenNode<Prev, S>
620where
621    Prev: CtxChainCall<C, In>,
622    S: CtxStepCall<C, Prev::Out>,
623{
624    type Out = S::Out;
625    #[inline(always)]
626    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> S::Out {
627        let mid = self.prev.call(ctx, world, input);
628        self.step.call(ctx, world, mid)
629    }
630}
631
632/// Chain node for `.tap()` — side effect via `&Out`, value passes through.
633#[doc(hidden)]
634pub struct CtxTapNode<Prev, S> {
635    pub(crate) prev: Prev,
636    pub(crate) step: S,
637}
638
639impl<C, In, Prev, S> CtxChainCall<C, In> for CtxTapNode<Prev, S>
640where
641    Prev: CtxChainCall<C, In>,
642    S: CtxRefStepCall<C, Prev::Out, Out = ()>,
643{
644    type Out = Prev::Out;
645    #[inline(always)]
646    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> Prev::Out {
647        let val = self.prev.call(ctx, world, input);
648        self.step.call(ctx, world, &val);
649        val
650    }
651}
652
653/// Chain node for `.guard()` — wraps output in `Option` based on predicate.
654#[doc(hidden)]
655pub struct CtxGuardNode<Prev, S> {
656    pub(crate) prev: Prev,
657    pub(crate) step: S,
658}
659
660impl<C, In, Prev, S> CtxChainCall<C, In> for CtxGuardNode<Prev, S>
661where
662    Prev: CtxChainCall<C, In>,
663    S: CtxRefStepCall<C, Prev::Out, Out = bool>,
664{
665    type Out = Option<Prev::Out>;
666    #[inline(always)]
667    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> Option<Prev::Out> {
668        let val = self.prev.call(ctx, world, input);
669        if self.step.call(ctx, world, &val) {
670            Some(val)
671        } else {
672            None
673        }
674    }
675}
676
677// -- Option nodes ------------------------------------------------------------
678
679/// Chain node for `.map()` on `Option<T>`.
680#[doc(hidden)]
681pub struct CtxMapOptionNode<Prev, S> {
682    pub(crate) prev: Prev,
683    pub(crate) step: S,
684}
685
686impl<C, In, T, Prev, S> CtxChainCall<C, In> for CtxMapOptionNode<Prev, S>
687where
688    Prev: CtxChainCall<C, In, Out = Option<T>>,
689    S: CtxStepCall<C, T>,
690{
691    type Out = Option<S::Out>;
692    #[inline(always)]
693    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> Option<S::Out> {
694        match self.prev.call(ctx, world, input) {
695            Some(val) => Some(self.step.call(ctx, world, val)),
696            None => None,
697        }
698    }
699}
700
701/// Chain node for `.and_then()` on `Option<T>`.
702#[doc(hidden)]
703pub struct CtxAndThenNode<Prev, S> {
704    pub(crate) prev: Prev,
705    pub(crate) step: S,
706}
707
708impl<C, In, T, U, Prev, S> CtxChainCall<C, In> for CtxAndThenNode<Prev, S>
709where
710    Prev: CtxChainCall<C, In, Out = Option<T>>,
711    S: CtxStepCall<C, T, Out = Option<U>>,
712{
713    type Out = Option<U>;
714    #[inline(always)]
715    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> Option<U> {
716        match self.prev.call(ctx, world, input) {
717            Some(val) => self.step.call(ctx, world, val),
718            None => None,
719        }
720    }
721}
722
723/// Chain node for `.filter()` on `Option<T>`.
724#[doc(hidden)]
725pub struct CtxFilterNode<Prev, S> {
726    pub(crate) prev: Prev,
727    pub(crate) step: S,
728}
729
730impl<C, In, T, Prev, S> CtxChainCall<C, In> for CtxFilterNode<Prev, S>
731where
732    Prev: CtxChainCall<C, In, Out = Option<T>>,
733    S: CtxRefStepCall<C, T, Out = bool>,
734{
735    type Out = Option<T>;
736    #[inline(always)]
737    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> Option<T> {
738        match self.prev.call(ctx, world, input) {
739            Some(val) if self.step.call(ctx, world, &val) => Some(val),
740            _ => None,
741        }
742    }
743}
744
745/// Chain node for `.inspect()` on `Option<T>`.
746#[doc(hidden)]
747pub struct CtxInspectOptionNode<Prev, S> {
748    pub(crate) prev: Prev,
749    pub(crate) step: S,
750}
751
752impl<C, In, T, Prev, S> CtxChainCall<C, In> for CtxInspectOptionNode<Prev, S>
753where
754    Prev: CtxChainCall<C, In, Out = Option<T>>,
755    S: CtxRefStepCall<C, T, Out = ()>,
756{
757    type Out = Option<T>;
758    #[inline(always)]
759    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> Option<T> {
760        let opt = self.prev.call(ctx, world, input);
761        if let Some(ref val) = opt {
762            self.step.call(ctx, world, val);
763        }
764        opt
765    }
766}
767
768/// Chain node for `.on_none()` on `Option<T>`.
769#[doc(hidden)]
770pub struct CtxOnNoneNode<Prev, S> {
771    pub(crate) prev: Prev,
772    pub(crate) producer: S,
773}
774
775impl<C, In, T, Prev, S> CtxChainCall<C, In> for CtxOnNoneNode<Prev, S>
776where
777    Prev: CtxChainCall<C, In, Out = Option<T>>,
778    S: CtxProducerCall<C, Out = ()>,
779{
780    type Out = Option<T>;
781    #[inline(always)]
782    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> Option<T> {
783        let opt = self.prev.call(ctx, world, input);
784        if opt.is_none() {
785            self.producer.call(ctx, world);
786        }
787        opt
788    }
789}
790
791/// Chain node for `.unwrap_or_else()` on `Option<T>`.
792#[doc(hidden)]
793pub struct CtxUnwrapOrElseOptionNode<Prev, S> {
794    pub(crate) prev: Prev,
795    pub(crate) producer: S,
796}
797
798impl<C, In, T, Prev, S> CtxChainCall<C, In> for CtxUnwrapOrElseOptionNode<Prev, S>
799where
800    Prev: CtxChainCall<C, In, Out = Option<T>>,
801    S: CtxProducerCall<C, Out = T>,
802{
803    type Out = T;
804    #[inline(always)]
805    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> T {
806        match self.prev.call(ctx, world, input) {
807            Some(val) => val,
808            None => self.producer.call(ctx, world),
809        }
810    }
811}
812
813// -- Result nodes ------------------------------------------------------------
814
815/// Chain node for `.map()` on `Result<T, E>`.
816#[doc(hidden)]
817pub struct CtxMapResultNode<Prev, S> {
818    pub(crate) prev: Prev,
819    pub(crate) step: S,
820}
821
822impl<C, In, T, E, Prev, S> CtxChainCall<C, In> for CtxMapResultNode<Prev, S>
823where
824    Prev: CtxChainCall<C, In, Out = Result<T, E>>,
825    S: CtxStepCall<C, T>,
826{
827    type Out = Result<S::Out, E>;
828    #[inline(always)]
829    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> Result<S::Out, E> {
830        match self.prev.call(ctx, world, input) {
831            Ok(val) => Ok(self.step.call(ctx, world, val)),
832            Err(e) => Err(e),
833        }
834    }
835}
836
837/// Chain node for `.and_then()` on `Result<T, E>`.
838#[doc(hidden)]
839pub struct CtxAndThenResultNode<Prev, S> {
840    pub(crate) prev: Prev,
841    pub(crate) step: S,
842}
843
844impl<C, In, T, U, E, Prev, S> CtxChainCall<C, In> for CtxAndThenResultNode<Prev, S>
845where
846    Prev: CtxChainCall<C, In, Out = Result<T, E>>,
847    S: CtxStepCall<C, T, Out = Result<U, E>>,
848{
849    type Out = Result<U, E>;
850    #[inline(always)]
851    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> Result<U, E> {
852        match self.prev.call(ctx, world, input) {
853            Ok(val) => self.step.call(ctx, world, val),
854            Err(e) => Err(e),
855        }
856    }
857}
858
859/// Chain node for `.catch()` on `Result<T, E>` — handles error, transitions to `Option<T>`.
860#[doc(hidden)]
861pub struct CtxCatchNode<Prev, S> {
862    pub(crate) prev: Prev,
863    pub(crate) step: S,
864}
865
866impl<C, In, T, E, Prev, S> CtxChainCall<C, In> for CtxCatchNode<Prev, S>
867where
868    Prev: CtxChainCall<C, In, Out = Result<T, E>>,
869    S: CtxStepCall<C, E, Out = ()>,
870{
871    type Out = Option<T>;
872    #[inline(always)]
873    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> Option<T> {
874        match self.prev.call(ctx, world, input) {
875            Ok(val) => Some(val),
876            Err(e) => {
877                self.step.call(ctx, world, e);
878                None
879            }
880        }
881    }
882}
883
884/// Chain node for `.map_err()` on `Result<T, E>`.
885#[doc(hidden)]
886pub struct CtxMapErrNode<Prev, S> {
887    pub(crate) prev: Prev,
888    pub(crate) step: S,
889}
890
891impl<C, In, T, E, E2, Prev, S> CtxChainCall<C, In> for CtxMapErrNode<Prev, S>
892where
893    Prev: CtxChainCall<C, In, Out = Result<T, E>>,
894    S: CtxStepCall<C, E, Out = E2>,
895{
896    type Out = Result<T, E2>;
897    #[inline(always)]
898    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> Result<T, E2> {
899        match self.prev.call(ctx, world, input) {
900            Ok(val) => Ok(val),
901            Err(e) => Err(self.step.call(ctx, world, e)),
902        }
903    }
904}
905
906/// Chain node for `.inspect_err()` on `Result<T, E>`.
907#[doc(hidden)]
908pub struct CtxInspectErrNode<Prev, S> {
909    pub(crate) prev: Prev,
910    pub(crate) step: S,
911}
912
913impl<C, In, T, E, Prev, S> CtxChainCall<C, In> for CtxInspectErrNode<Prev, S>
914where
915    Prev: CtxChainCall<C, In, Out = Result<T, E>>,
916    S: CtxRefStepCall<C, E, Out = ()>,
917{
918    type Out = Result<T, E>;
919    #[inline(always)]
920    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> Result<T, E> {
921        let result = self.prev.call(ctx, world, input);
922        if let Err(ref e) = result {
923            self.step.call(ctx, world, e);
924        }
925        result
926    }
927}
928
929/// Chain node for `.inspect()` on `Result<T, E>` — side effect on Ok value.
930#[doc(hidden)]
931pub struct CtxInspectResultNode<Prev, S> {
932    pub(crate) prev: Prev,
933    pub(crate) step: S,
934}
935
936impl<C, In, T, E, Prev, S> CtxChainCall<C, In> for CtxInspectResultNode<Prev, S>
937where
938    Prev: CtxChainCall<C, In, Out = Result<T, E>>,
939    S: CtxRefStepCall<C, T, Out = ()>,
940{
941    type Out = Result<T, E>;
942    #[inline(always)]
943    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> Result<T, E> {
944        let result = self.prev.call(ctx, world, input);
945        if let Ok(ref val) = result {
946            self.step.call(ctx, world, val);
947        }
948        result
949    }
950}
951
952/// Chain node for discarding `Option<()>` to `()`.
953#[doc(hidden)]
954pub struct CtxDiscardOptionNode<Prev> {
955    pub(crate) prev: Prev,
956}
957
958impl<C, In, Prev> CtxChainCall<C, In> for CtxDiscardOptionNode<Prev>
959where
960    Prev: CtxChainCall<C, In, Out = Option<()>>,
961{
962    type Out = ();
963    #[inline(always)]
964    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) {
965        let _ = self.prev.call(ctx, world, input);
966    }
967}
968
969// =============================================================================
970// CtxPipelineBuilder — entry point
971// =============================================================================
972
973/// Entry point for building a context-aware pipeline.
974///
975/// Like [`PipelineBuilder`](crate::PipelineBuilder) but every step
976/// receives `&mut C` as the first argument.
977///
978/// # Examples
979///
980/// ```ignore
981/// let pipeline = CtxPipelineBuilder::<ReconnectCtx, ()>::new()
982///     .then(check_retries, &reg)
983///     .then(attempt_connect, &reg)
984///     .build();
985/// ```
986#[must_use = "pipeline builder does nothing until .then() is called"]
987pub struct CtxPipelineBuilder<C, In>(PhantomData<fn(&mut C, In)>);
988
989impl<C, In> CtxPipelineBuilder<C, In> {
990    /// Create a new context-aware pipeline entry point.
991    pub fn new() -> Self {
992        Self(PhantomData)
993    }
994
995    /// Add the first step. Params resolved from the registry.
996    pub fn then<Out, Params, S: IntoCtxStep<C, In, Out, Params>>(
997        self,
998        f: S,
999        registry: &Registry,
1000    ) -> CtxPipelineChain<C, In, Out, CtxThenNode<CtxIdentityNode, S::Step>> {
1001        CtxPipelineChain {
1002            chain: CtxThenNode {
1003                prev: CtxIdentityNode,
1004                step: f.into_ctx_step(registry),
1005            },
1006            _marker: PhantomData,
1007        }
1008    }
1009}
1010
1011impl<C, In> Default for CtxPipelineBuilder<C, In> {
1012    fn default() -> Self {
1013        Self::new()
1014    }
1015}
1016
1017// =============================================================================
1018// CtxPipelineChain — typestate builder
1019// =============================================================================
1020
1021/// Builder that composes context-aware pipeline steps via named chain nodes.
1022///
1023/// `C` is the context type. `In` is the pipeline's input type (fixed).
1024/// `Out` is the current output. `Chain` is the concrete chain type.
1025#[must_use = "pipeline chain does nothing until .build() is called"]
1026pub struct CtxPipelineChain<C, In, Out, Chain> {
1027    chain: Chain,
1028    _marker: PhantomData<fn(&mut C, In) -> Out>,
1029}
1030
1031// =============================================================================
1032// Core — any Out
1033// =============================================================================
1034
1035impl<C, In, Out, Chain: CtxChainCall<C, In, Out = Out>> CtxPipelineChain<C, In, Out, Chain> {
1036    /// Add a step. Params resolved from the registry.
1037    pub fn then<NewOut, Params, S: IntoCtxStep<C, Out, NewOut, Params>>(
1038        self,
1039        f: S,
1040        registry: &Registry,
1041    ) -> CtxPipelineChain<C, In, NewOut, CtxThenNode<Chain, S::Step>> {
1042        CtxPipelineChain {
1043            chain: CtxThenNode {
1044                prev: self.chain,
1045                step: f.into_ctx_step(registry),
1046            },
1047            _marker: PhantomData,
1048        }
1049    }
1050
1051    /// Run the pipeline directly.
1052    pub fn run(&mut self, ctx: &mut C, world: &mut World, input: In) -> Out {
1053        self.chain.call(ctx, world, input)
1054    }
1055
1056    /// Conditionally wrap the output in `Option`. `Some(val)` if
1057    /// the predicate returns true, `None` otherwise.
1058    pub fn guard<Params, S: IntoCtxRefStep<C, Out, bool, Params>>(
1059        self,
1060        f: S,
1061        registry: &Registry,
1062    ) -> CtxPipelineChain<C, In, Option<Out>, CtxGuardNode<Chain, S::Step>> {
1063        CtxPipelineChain {
1064            chain: CtxGuardNode {
1065                prev: self.chain,
1066                step: f.into_ctx_ref_step(registry),
1067            },
1068            _marker: PhantomData,
1069        }
1070    }
1071
1072    /// Observe the current value without consuming or changing it.
1073    pub fn tap<Params, S: IntoCtxRefStep<C, Out, (), Params>>(
1074        self,
1075        f: S,
1076        registry: &Registry,
1077    ) -> CtxPipelineChain<C, In, Out, CtxTapNode<Chain, S::Step>> {
1078        CtxPipelineChain {
1079            chain: CtxTapNode {
1080                prev: self.chain,
1081                step: f.into_ctx_ref_step(registry),
1082            },
1083            _marker: PhantomData,
1084        }
1085    }
1086}
1087
1088// =============================================================================
1089// Option helpers — CtxPipelineChain<C, In, Option<T>, Chain>
1090// =============================================================================
1091
1092impl<C, In, T, Chain: CtxChainCall<C, In, Out = Option<T>>>
1093    CtxPipelineChain<C, In, Option<T>, Chain>
1094{
1095    /// Transform the inner value. Step not called on None.
1096    pub fn map<U, Params, S: IntoCtxStep<C, T, U, Params>>(
1097        self,
1098        f: S,
1099        registry: &Registry,
1100    ) -> CtxPipelineChain<C, In, Option<U>, CtxMapOptionNode<Chain, S::Step>> {
1101        CtxPipelineChain {
1102            chain: CtxMapOptionNode {
1103                prev: self.chain,
1104                step: f.into_ctx_step(registry),
1105            },
1106            _marker: PhantomData,
1107        }
1108    }
1109
1110    /// Short-circuits on None. std: `Option::and_then`
1111    pub fn and_then<U, Params, S: IntoCtxStep<C, T, Option<U>, Params>>(
1112        self,
1113        f: S,
1114        registry: &Registry,
1115    ) -> CtxPipelineChain<C, In, Option<U>, CtxAndThenNode<Chain, S::Step>> {
1116        CtxPipelineChain {
1117            chain: CtxAndThenNode {
1118                prev: self.chain,
1119                step: f.into_ctx_step(registry),
1120            },
1121            _marker: PhantomData,
1122        }
1123    }
1124
1125    /// Keep value if predicate holds. std: `Option::filter`
1126    pub fn filter<Params, S: IntoCtxRefStep<C, T, bool, Params>>(
1127        self,
1128        f: S,
1129        registry: &Registry,
1130    ) -> CtxPipelineChain<C, In, Option<T>, CtxFilterNode<Chain, S::Step>> {
1131        CtxPipelineChain {
1132            chain: CtxFilterNode {
1133                prev: self.chain,
1134                step: f.into_ctx_ref_step(registry),
1135            },
1136            _marker: PhantomData,
1137        }
1138    }
1139
1140    /// Side effect on Some value. std: `Option::inspect`
1141    pub fn inspect<Params, S: IntoCtxRefStep<C, T, (), Params>>(
1142        self,
1143        f: S,
1144        registry: &Registry,
1145    ) -> CtxPipelineChain<C, In, Option<T>, CtxInspectOptionNode<Chain, S::Step>> {
1146        CtxPipelineChain {
1147            chain: CtxInspectOptionNode {
1148                prev: self.chain,
1149                step: f.into_ctx_ref_step(registry),
1150            },
1151            _marker: PhantomData,
1152        }
1153    }
1154
1155    /// Side effect on None. Complement to [`inspect`](Self::inspect).
1156    pub fn on_none<Params, S: IntoCtxProducer<C, (), Params>>(
1157        self,
1158        f: S,
1159        registry: &Registry,
1160    ) -> CtxPipelineChain<C, In, Option<T>, CtxOnNoneNode<Chain, S::Step>> {
1161        CtxPipelineChain {
1162            chain: CtxOnNoneNode {
1163                prev: self.chain,
1164                producer: f.into_ctx_producer(registry),
1165            },
1166            _marker: PhantomData,
1167        }
1168    }
1169
1170    /// Exit Option — None becomes `f()`.
1171    pub fn unwrap_or_else<Params, S: IntoCtxProducer<C, T, Params>>(
1172        self,
1173        f: S,
1174        registry: &Registry,
1175    ) -> CtxPipelineChain<C, In, T, CtxUnwrapOrElseOptionNode<Chain, S::Step>> {
1176        CtxPipelineChain {
1177            chain: CtxUnwrapOrElseOptionNode {
1178                prev: self.chain,
1179                producer: f.into_ctx_producer(registry),
1180            },
1181            _marker: PhantomData,
1182        }
1183    }
1184
1185    /// None becomes Err(err). std: `Option::ok_or`
1186    pub fn ok_or<E: Clone>(
1187        self,
1188        err: E,
1189    ) -> CtxPipelineChain<C, In, Result<T, E>, CtxOkOrNode<Chain, E>> {
1190        CtxPipelineChain {
1191            chain: CtxOkOrNode {
1192                prev: self.chain,
1193                err,
1194            },
1195            _marker: PhantomData,
1196        }
1197    }
1198
1199    /// Exit Option — None becomes the default value.
1200    pub fn unwrap_or(
1201        self,
1202        default: T,
1203    ) -> CtxPipelineChain<C, In, T, CtxUnwrapOrOptionNode<Chain, T>>
1204    where
1205        T: Clone,
1206    {
1207        CtxPipelineChain {
1208            chain: CtxUnwrapOrOptionNode {
1209                prev: self.chain,
1210                default,
1211            },
1212            _marker: PhantomData,
1213        }
1214    }
1215}
1216
1217/// Chain node for `.ok_or()` on `Option<T>`.
1218#[doc(hidden)]
1219pub struct CtxOkOrNode<Prev, E> {
1220    pub(crate) prev: Prev,
1221    pub(crate) err: E,
1222}
1223
1224impl<C, In, T, E: Clone, Prev> CtxChainCall<C, In> for CtxOkOrNode<Prev, E>
1225where
1226    Prev: CtxChainCall<C, In, Out = Option<T>>,
1227{
1228    type Out = Result<T, E>;
1229    #[inline(always)]
1230    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> Result<T, E> {
1231        match self.prev.call(ctx, world, input) {
1232            Some(val) => Ok(val),
1233            None => Err(self.err.clone()),
1234        }
1235    }
1236}
1237
1238/// Chain node for `.unwrap_or()` on `Option<T>`.
1239#[doc(hidden)]
1240pub struct CtxUnwrapOrOptionNode<Prev, T> {
1241    pub(crate) prev: Prev,
1242    pub(crate) default: T,
1243}
1244
1245impl<C, In, T: Clone, Prev> CtxChainCall<C, In> for CtxUnwrapOrOptionNode<Prev, T>
1246where
1247    Prev: CtxChainCall<C, In, Out = Option<T>>,
1248{
1249    type Out = T;
1250    #[inline(always)]
1251    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> T {
1252        match self.prev.call(ctx, world, input) {
1253            Some(val) => val,
1254            None => self.default.clone(),
1255        }
1256    }
1257}
1258
1259// =============================================================================
1260// Result helpers — CtxPipelineChain<C, In, Result<T, E>, Chain>
1261// =============================================================================
1262
1263impl<C, In, T, E, Chain: CtxChainCall<C, In, Out = Result<T, E>>>
1264    CtxPipelineChain<C, In, Result<T, E>, Chain>
1265{
1266    /// Transform the Ok value. Step not called on Err.
1267    pub fn map<U, Params, S: IntoCtxStep<C, T, U, Params>>(
1268        self,
1269        f: S,
1270        registry: &Registry,
1271    ) -> CtxPipelineChain<C, In, Result<U, E>, CtxMapResultNode<Chain, S::Step>> {
1272        CtxPipelineChain {
1273            chain: CtxMapResultNode {
1274                prev: self.chain,
1275                step: f.into_ctx_step(registry),
1276            },
1277            _marker: PhantomData,
1278        }
1279    }
1280
1281    /// Short-circuits on Err. std: `Result::and_then`
1282    pub fn and_then<U, Params, S: IntoCtxStep<C, T, Result<U, E>, Params>>(
1283        self,
1284        f: S,
1285        registry: &Registry,
1286    ) -> CtxPipelineChain<C, In, Result<U, E>, CtxAndThenResultNode<Chain, S::Step>> {
1287        CtxPipelineChain {
1288            chain: CtxAndThenResultNode {
1289                prev: self.chain,
1290                step: f.into_ctx_step(registry),
1291            },
1292            _marker: PhantomData,
1293        }
1294    }
1295
1296    /// Handle error and transition to Option.
1297    pub fn catch<Params, S: IntoCtxStep<C, E, (), Params>>(
1298        self,
1299        f: S,
1300        registry: &Registry,
1301    ) -> CtxPipelineChain<C, In, Option<T>, CtxCatchNode<Chain, S::Step>> {
1302        CtxPipelineChain {
1303            chain: CtxCatchNode {
1304                prev: self.chain,
1305                step: f.into_ctx_step(registry),
1306            },
1307            _marker: PhantomData,
1308        }
1309    }
1310
1311    /// Transform the error. std: `Result::map_err`
1312    pub fn map_err<E2, Params, S: IntoCtxStep<C, E, E2, Params>>(
1313        self,
1314        f: S,
1315        registry: &Registry,
1316    ) -> CtxPipelineChain<C, In, Result<T, E2>, CtxMapErrNode<Chain, S::Step>> {
1317        CtxPipelineChain {
1318            chain: CtxMapErrNode {
1319                prev: self.chain,
1320                step: f.into_ctx_step(registry),
1321            },
1322            _marker: PhantomData,
1323        }
1324    }
1325
1326    /// Side effect on Ok value. std: `Result::inspect` (nightly)
1327    pub fn inspect<Params, S: IntoCtxRefStep<C, T, (), Params>>(
1328        self,
1329        f: S,
1330        registry: &Registry,
1331    ) -> CtxPipelineChain<C, In, Result<T, E>, CtxInspectResultNode<Chain, S::Step>> {
1332        CtxPipelineChain {
1333            chain: CtxInspectResultNode {
1334                prev: self.chain,
1335                step: f.into_ctx_ref_step(registry),
1336            },
1337            _marker: PhantomData,
1338        }
1339    }
1340
1341    /// Side effect on Err value. std: `Result::inspect_err` (nightly)
1342    pub fn inspect_err<Params, S: IntoCtxRefStep<C, E, (), Params>>(
1343        self,
1344        f: S,
1345        registry: &Registry,
1346    ) -> CtxPipelineChain<C, In, Result<T, E>, CtxInspectErrNode<Chain, S::Step>> {
1347        CtxPipelineChain {
1348            chain: CtxInspectErrNode {
1349                prev: self.chain,
1350                step: f.into_ctx_ref_step(registry),
1351            },
1352            _marker: PhantomData,
1353        }
1354    }
1355
1356    /// Convert Result to Option, discarding the error.
1357    pub fn ok(self) -> CtxPipelineChain<C, In, Option<T>, CtxOkNode<Chain>> {
1358        CtxPipelineChain {
1359            chain: CtxOkNode { prev: self.chain },
1360            _marker: PhantomData,
1361        }
1362    }
1363
1364    /// Exit Result — Err becomes the default value.
1365    pub fn unwrap_or(
1366        self,
1367        default: T,
1368    ) -> CtxPipelineChain<C, In, T, CtxUnwrapOrResultNode<Chain, T>>
1369    where
1370        T: Clone,
1371    {
1372        CtxPipelineChain {
1373            chain: CtxUnwrapOrResultNode {
1374                prev: self.chain,
1375                default,
1376            },
1377            _marker: PhantomData,
1378        }
1379    }
1380}
1381
1382/// Chain node for `.ok()` on `Result<T, E>` — discards error.
1383#[doc(hidden)]
1384pub struct CtxOkNode<Prev> {
1385    pub(crate) prev: Prev,
1386}
1387
1388impl<C, In, T, E, Prev> CtxChainCall<C, In> for CtxOkNode<Prev>
1389where
1390    Prev: CtxChainCall<C, In, Out = Result<T, E>>,
1391{
1392    type Out = Option<T>;
1393    #[inline(always)]
1394    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> Option<T> {
1395        self.prev.call(ctx, world, input).ok()
1396    }
1397}
1398
1399/// Chain node for `.unwrap_or()` on `Result<T, E>`.
1400#[doc(hidden)]
1401pub struct CtxUnwrapOrResultNode<Prev, T> {
1402    pub(crate) prev: Prev,
1403    pub(crate) default: T,
1404}
1405
1406impl<C, In, T: Clone, E, Prev> CtxChainCall<C, In> for CtxUnwrapOrResultNode<Prev, T>
1407where
1408    Prev: CtxChainCall<C, In, Out = Result<T, E>>,
1409{
1410    type Out = T;
1411    #[inline(always)]
1412    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> T {
1413        match self.prev.call(ctx, world, input) {
1414            Ok(val) => val,
1415            Err(_) => self.default.clone(),
1416        }
1417    }
1418}
1419
1420// =============================================================================
1421// build — when Out is () or Option<()>
1422// =============================================================================
1423
1424impl<C, In, Chain: CtxChainCall<C, In, Out = ()>> CtxPipelineChain<C, In, (), Chain> {
1425    /// Finalize the pipeline into a [`CtxPipeline`].
1426    #[must_use = "building a pipeline without storing it does nothing"]
1427    pub fn build(self) -> CtxPipeline<C, In, Chain> {
1428        CtxPipeline {
1429            chain: self.chain,
1430            _marker: PhantomData,
1431        }
1432    }
1433}
1434
1435impl<C, In, Chain: CtxChainCall<C, In, Out = Option<()>>>
1436    CtxPipelineChain<C, In, Option<()>, Chain>
1437{
1438    /// Finalize the pipeline, discarding `Option<()>`.
1439    #[must_use = "building a pipeline without storing it does nothing"]
1440    pub fn build(self) -> CtxPipeline<C, In, CtxDiscardOptionNode<Chain>> {
1441        CtxPipeline {
1442            chain: CtxDiscardOptionNode { prev: self.chain },
1443            _marker: PhantomData,
1444        }
1445    }
1446}
1447
1448// =============================================================================
1449// CtxPipeline — built context-aware pipeline
1450// =============================================================================
1451
1452/// Built context-aware pipeline.
1453///
1454/// Created by [`CtxPipelineChain::build`]. Implements [`CtxStepCall`]
1455/// for use inside [`Callback`](crate::Callback) dispatch.
1456pub struct CtxPipeline<C, In, Chain> {
1457    chain: Chain,
1458    _marker: PhantomData<fn(&mut C, In)>,
1459}
1460
1461impl<C, In, Chain: CtxChainCall<C, In, Out = ()>> CtxStepCall<C, In> for CtxPipeline<C, In, Chain> {
1462    type Out = ();
1463    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) {
1464        self.chain.call(ctx, world, input);
1465    }
1466}
1467
1468impl<C, In, Chain: CtxChainCall<C, In, Out = ()>> CtxPipeline<C, In, Chain> {
1469    /// Run the pipeline with context, world, and input.
1470    pub fn run(&mut self, ctx: &mut C, world: &mut World, input: In) {
1471        self.chain.call(ctx, world, input);
1472    }
1473}
1474
1475// =============================================================================
1476// Tests
1477// =============================================================================
1478
1479#[cfg(test)]
1480mod tests {
1481    use super::*;
1482    use crate::{Res, ResMut, WorldBuilder};
1483
1484    // -- Helper types ---------------------------------------------------------
1485
1486    struct ReconnectCtx {
1487        retries: u32,
1488        last_result: Option<bool>,
1489    }
1490
1491    // -- Core dispatch --------------------------------------------------------
1492
1493    #[test]
1494    fn ctx_pipeline_three_steps_with_context_mutation() {
1495        let mut wb = WorldBuilder::new();
1496        wb.register::<u64>(10);
1497        let mut world = wb.build();
1498        let reg = world.registry();
1499
1500        // Step 1: check retries (closure, arity 0)
1501        // Step 2: multiply by resource (named fn, arity 1)
1502        // Step 3: record result in context (closure, arity 0)
1503
1504        fn multiply(ctx: &mut ReconnectCtx, factor: Res<u64>, input: u32) -> u64 {
1505            ctx.retries += 1;
1506            *factor * input as u64
1507        }
1508
1509        let mut pipeline = CtxPipelineBuilder::<ReconnectCtx, u32>::new()
1510            .then(
1511                |ctx: &mut ReconnectCtx, x: u32| {
1512                    ctx.retries += 1;
1513                    x
1514                },
1515                &reg,
1516            )
1517            .then(multiply, &reg)
1518            .then(
1519                |ctx: &mut ReconnectCtx, val: u64| {
1520                    ctx.last_result = Some(val > 0);
1521                },
1522                &reg,
1523            )
1524            .build();
1525
1526        let mut ctx = ReconnectCtx {
1527            retries: 0,
1528            last_result: None,
1529        };
1530
1531        pipeline.run(&mut ctx, &mut world, 5);
1532
1533        // Closure incremented once, named fn incremented once
1534        assert_eq!(ctx.retries, 2);
1535        // 10 * 5 = 50, which is > 0
1536        assert_eq!(ctx.last_result, Some(true));
1537    }
1538
1539    #[test]
1540    fn ctx_pipeline_guard_and_map() {
1541        let mut world = WorldBuilder::new().build();
1542        let reg = world.registry();
1543
1544        let mut pipeline = CtxPipelineBuilder::<ReconnectCtx, u32>::new()
1545            .then(|_ctx: &mut ReconnectCtx, x: u32| x, &reg)
1546            .guard(|_ctx: &mut ReconnectCtx, x: &u32| *x > 10, &reg)
1547            .map(
1548                |ctx: &mut ReconnectCtx, x: u32| {
1549                    ctx.retries += 1;
1550                    x * 2
1551                },
1552                &reg,
1553            );
1554
1555        let mut ctx = ReconnectCtx {
1556            retries: 0,
1557            last_result: None,
1558        };
1559
1560        // x = 5, guard fails, map not called
1561        let result = pipeline.run(&mut ctx, &mut world, 5);
1562        assert_eq!(result, None);
1563        assert_eq!(ctx.retries, 0);
1564
1565        // x = 20, guard passes, map called
1566        let result = pipeline.run(&mut ctx, &mut world, 20);
1567        assert_eq!(result, Some(40));
1568        assert_eq!(ctx.retries, 1);
1569    }
1570
1571    #[test]
1572    fn ctx_pipeline_and_then() {
1573        let mut world = WorldBuilder::new().build();
1574        let reg = world.registry();
1575
1576        let mut pipeline = CtxPipelineBuilder::<ReconnectCtx, u32>::new()
1577            .then(|_ctx: &mut ReconnectCtx, x: u32| Some(x), &reg)
1578            .and_then(
1579                |ctx: &mut ReconnectCtx, x: u32| {
1580                    ctx.retries += 1;
1581                    if x > 5 { Some(x * 2) } else { None }
1582                },
1583                &reg,
1584            );
1585
1586        let mut ctx = ReconnectCtx {
1587            retries: 0,
1588            last_result: None,
1589        };
1590
1591        assert_eq!(pipeline.run(&mut ctx, &mut world, 3), None);
1592        assert_eq!(ctx.retries, 1);
1593
1594        assert_eq!(pipeline.run(&mut ctx, &mut world, 10), Some(20));
1595        assert_eq!(ctx.retries, 2);
1596    }
1597
1598    #[test]
1599    fn ctx_pipeline_catch() {
1600        let mut world = WorldBuilder::new().build();
1601        let reg = world.registry();
1602
1603        let mut pipeline = CtxPipelineBuilder::<ReconnectCtx, u32>::new()
1604            .then(
1605                |_ctx: &mut ReconnectCtx, x: u32| -> Result<u32, String> {
1606                    if x > 0 {
1607                        Ok(x)
1608                    } else {
1609                        Err("zero".to_string())
1610                    }
1611                },
1612                &reg,
1613            )
1614            .catch(
1615                |ctx: &mut ReconnectCtx, _err: String| {
1616                    ctx.retries += 1;
1617                },
1618                &reg,
1619            )
1620            .map(
1621                |ctx: &mut ReconnectCtx, val: u32| {
1622                    ctx.last_result = Some(true);
1623                    val
1624                },
1625                &reg,
1626            );
1627
1628        let mut ctx = ReconnectCtx {
1629            retries: 0,
1630            last_result: None,
1631        };
1632
1633        // Error path
1634        let result = pipeline.run(&mut ctx, &mut world, 0);
1635        assert_eq!(result, None);
1636        assert_eq!(ctx.retries, 1);
1637        assert_eq!(ctx.last_result, None);
1638
1639        // Ok path
1640        let result = pipeline.run(&mut ctx, &mut world, 42);
1641        assert_eq!(result, Some(42));
1642        assert_eq!(ctx.retries, 1);
1643        assert_eq!(ctx.last_result, Some(true));
1644    }
1645
1646    #[test]
1647    fn ctx_pipeline_with_res_mut() {
1648        let mut wb = WorldBuilder::new();
1649        wb.register::<u64>(0);
1650        let mut world = wb.build();
1651        let reg = world.registry();
1652
1653        fn accumulate(ctx: &mut ReconnectCtx, mut total: ResMut<u64>, val: u32) {
1654            *total += val as u64;
1655            ctx.retries += 1;
1656        }
1657
1658        let mut pipeline = CtxPipelineBuilder::<ReconnectCtx, u32>::new()
1659            .then(accumulate, &reg)
1660            .build();
1661
1662        let mut ctx = ReconnectCtx {
1663            retries: 0,
1664            last_result: None,
1665        };
1666
1667        pipeline.run(&mut ctx, &mut world, 10);
1668        pipeline.run(&mut ctx, &mut world, 5);
1669
1670        assert_eq!(*world.resource::<u64>(), 15);
1671        assert_eq!(ctx.retries, 2);
1672    }
1673
1674    #[test]
1675    fn ctx_pipeline_build_with_option_unit() {
1676        let mut world = WorldBuilder::new().build();
1677        let reg = world.registry();
1678
1679        // Pipeline that ends with Option<()> — should still build
1680        let mut pipeline = CtxPipelineBuilder::<ReconnectCtx, u32>::new()
1681            .then(
1682                |_ctx: &mut ReconnectCtx, x: u32| -> Option<u32> {
1683                    if x > 0 { Some(x) } else { None }
1684                },
1685                &reg,
1686            )
1687            .map(|_ctx: &mut ReconnectCtx, _x: u32| {}, &reg)
1688            .build();
1689
1690        let mut ctx = ReconnectCtx {
1691            retries: 0,
1692            last_result: None,
1693        };
1694
1695        // Should not panic
1696        pipeline.run(&mut ctx, &mut world, 5);
1697        pipeline.run(&mut ctx, &mut world, 0);
1698    }
1699
1700    #[test]
1701    fn ctx_pipeline_tap() {
1702        let mut world = WorldBuilder::new().build();
1703        let reg = world.registry();
1704
1705        let mut pipeline = CtxPipelineBuilder::<ReconnectCtx, u32>::new()
1706            .then(|_ctx: &mut ReconnectCtx, x: u32| x * 2, &reg)
1707            .tap(
1708                |ctx: &mut ReconnectCtx, val: &u32| {
1709                    ctx.retries = *val;
1710                },
1711                &reg,
1712            )
1713            .then(
1714                |_ctx: &mut ReconnectCtx, x: u32| {
1715                    // Value should pass through tap unchanged
1716                    assert_eq!(x, 10);
1717                },
1718                &reg,
1719            )
1720            .build();
1721
1722        let mut ctx = ReconnectCtx {
1723            retries: 0,
1724            last_result: None,
1725        };
1726
1727        pipeline.run(&mut ctx, &mut world, 5);
1728        assert_eq!(ctx.retries, 10);
1729    }
1730
1731    #[test]
1732    fn ctx_pipeline_result_map_and_map_err() {
1733        let mut world = WorldBuilder::new().build();
1734        let reg = world.registry();
1735
1736        let mut pipeline = CtxPipelineBuilder::<ReconnectCtx, u32>::new()
1737            .then(
1738                |_ctx: &mut ReconnectCtx, x: u32| -> Result<u32, u32> {
1739                    if x > 0 { Ok(x) } else { Err(x) }
1740                },
1741                &reg,
1742            )
1743            .map(|_ctx: &mut ReconnectCtx, x: u32| x * 10, &reg)
1744            .map_err(
1745                |ctx: &mut ReconnectCtx, e: u32| {
1746                    ctx.retries += 1;
1747                    format!("error: {e}")
1748                },
1749                &reg,
1750            );
1751
1752        let mut ctx = ReconnectCtx {
1753            retries: 0,
1754            last_result: None,
1755        };
1756
1757        assert_eq!(pipeline.run(&mut ctx, &mut world, 5), Ok(50));
1758        assert_eq!(
1759            pipeline.run(&mut ctx, &mut world, 0),
1760            Err("error: 0".to_string())
1761        );
1762        assert_eq!(ctx.retries, 1);
1763    }
1764
1765    #[test]
1766    fn ctx_pipeline_inspect_err() {
1767        let mut world = WorldBuilder::new().build();
1768        let reg = world.registry();
1769
1770        let mut pipeline = CtxPipelineBuilder::<ReconnectCtx, u32>::new()
1771            .then(
1772                |_ctx: &mut ReconnectCtx, x: u32| -> Result<u32, String> {
1773                    if x > 0 { Ok(x) } else { Err("zero".into()) }
1774                },
1775                &reg,
1776            )
1777            .inspect_err(
1778                |ctx: &mut ReconnectCtx, _e: &String| {
1779                    ctx.retries += 1;
1780                },
1781                &reg,
1782            );
1783
1784        let mut ctx = ReconnectCtx {
1785            retries: 0,
1786            last_result: None,
1787        };
1788
1789        let _ = pipeline.run(&mut ctx, &mut world, 0);
1790        assert_eq!(ctx.retries, 1);
1791
1792        // Ok path — inspect_err not called
1793        let _ = pipeline.run(&mut ctx, &mut world, 5);
1794        assert_eq!(ctx.retries, 1);
1795    }
1796
1797    #[test]
1798    fn ctx_pipeline_filter() {
1799        let mut world = WorldBuilder::new().build();
1800        let reg = world.registry();
1801
1802        let mut pipeline = CtxPipelineBuilder::<ReconnectCtx, u32>::new()
1803            .then(|_ctx: &mut ReconnectCtx, x: u32| Some(x), &reg)
1804            .filter(|_ctx: &mut ReconnectCtx, x: &u32| *x > 10, &reg);
1805
1806        let mut ctx = ReconnectCtx {
1807            retries: 0,
1808            last_result: None,
1809        };
1810
1811        assert_eq!(pipeline.run(&mut ctx, &mut world, 5), None);
1812        assert_eq!(pipeline.run(&mut ctx, &mut world, 20), Some(20));
1813    }
1814
1815    #[test]
1816    fn ctx_pipeline_ok_or() {
1817        let mut world = WorldBuilder::new().build();
1818        let reg = world.registry();
1819
1820        let mut pipeline = CtxPipelineBuilder::<ReconnectCtx, u32>::new()
1821            .then(
1822                |_ctx: &mut ReconnectCtx, x: u32| {
1823                    if x > 0 { Some(x) } else { None }
1824                },
1825                &reg,
1826            )
1827            .ok_or("was zero");
1828
1829        let mut ctx = ReconnectCtx {
1830            retries: 0,
1831            last_result: None,
1832        };
1833
1834        assert_eq!(pipeline.run(&mut ctx, &mut world, 5), Ok(5));
1835        assert_eq!(pipeline.run(&mut ctx, &mut world, 0), Err("was zero"));
1836    }
1837
1838    #[test]
1839    fn ctx_pipeline_unwrap_or_option() {
1840        let mut world = WorldBuilder::new().build();
1841        let reg = world.registry();
1842
1843        let mut pipeline = CtxPipelineBuilder::<ReconnectCtx, u32>::new()
1844            .then(
1845                |_ctx: &mut ReconnectCtx, x: u32| {
1846                    if x > 0 { Some(x) } else { None }
1847                },
1848                &reg,
1849            )
1850            .unwrap_or(99);
1851
1852        let mut ctx = ReconnectCtx {
1853            retries: 0,
1854            last_result: None,
1855        };
1856
1857        assert_eq!(pipeline.run(&mut ctx, &mut world, 5), 5);
1858        assert_eq!(pipeline.run(&mut ctx, &mut world, 0), 99);
1859    }
1860
1861    #[test]
1862    fn ctx_pipeline_unwrap_or_else_option() {
1863        let mut world = WorldBuilder::new().build();
1864        let reg = world.registry();
1865
1866        let mut pipeline = CtxPipelineBuilder::<ReconnectCtx, u32>::new()
1867            .then(
1868                |_ctx: &mut ReconnectCtx, x: u32| {
1869                    if x > 0 { Some(x) } else { None }
1870                },
1871                &reg,
1872            )
1873            .unwrap_or_else(
1874                |ctx: &mut ReconnectCtx| {
1875                    ctx.retries += 1;
1876                    42
1877                },
1878                &reg,
1879            );
1880
1881        let mut ctx = ReconnectCtx {
1882            retries: 0,
1883            last_result: None,
1884        };
1885
1886        assert_eq!(pipeline.run(&mut ctx, &mut world, 5), 5);
1887        assert_eq!(ctx.retries, 0);
1888        assert_eq!(pipeline.run(&mut ctx, &mut world, 0), 42);
1889        assert_eq!(ctx.retries, 1);
1890    }
1891
1892    #[test]
1893    fn ctx_pipeline_inspect_option() {
1894        let mut world = WorldBuilder::new().build();
1895        let reg = world.registry();
1896
1897        let mut pipeline = CtxPipelineBuilder::<ReconnectCtx, u32>::new()
1898            .then(
1899                |_ctx: &mut ReconnectCtx, x: u32| {
1900                    if x > 0 { Some(x) } else { None }
1901                },
1902                &reg,
1903            )
1904            .inspect(
1905                |ctx: &mut ReconnectCtx, val: &u32| {
1906                    ctx.retries = *val;
1907                },
1908                &reg,
1909            );
1910
1911        let mut ctx = ReconnectCtx {
1912            retries: 0,
1913            last_result: None,
1914        };
1915
1916        // Some path — inspect fires
1917        let _ = pipeline.run(&mut ctx, &mut world, 7);
1918        assert_eq!(ctx.retries, 7);
1919
1920        // None path — inspect skipped
1921        let _ = pipeline.run(&mut ctx, &mut world, 0);
1922        assert_eq!(ctx.retries, 7);
1923    }
1924
1925    #[test]
1926    fn ctx_pipeline_on_none() {
1927        let mut world = WorldBuilder::new().build();
1928        let reg = world.registry();
1929
1930        let mut pipeline = CtxPipelineBuilder::<ReconnectCtx, u32>::new()
1931            .then(
1932                |_ctx: &mut ReconnectCtx, x: u32| {
1933                    if x > 0 { Some(x) } else { None }
1934                },
1935                &reg,
1936            )
1937            .on_none(
1938                |ctx: &mut ReconnectCtx| {
1939                    ctx.retries += 1;
1940                },
1941                &reg,
1942            );
1943
1944        let mut ctx = ReconnectCtx {
1945            retries: 0,
1946            last_result: None,
1947        };
1948
1949        // Some path — on_none not called
1950        let result = pipeline.run(&mut ctx, &mut world, 5);
1951        assert_eq!(result, Some(5));
1952        assert_eq!(ctx.retries, 0);
1953
1954        // None path — on_none called
1955        let result = pipeline.run(&mut ctx, &mut world, 0);
1956        assert_eq!(result, None);
1957        assert_eq!(ctx.retries, 1);
1958    }
1959
1960    #[test]
1961    fn ctx_pipeline_ok_result() {
1962        let mut world = WorldBuilder::new().build();
1963        let reg = world.registry();
1964
1965        let mut pipeline = CtxPipelineBuilder::<ReconnectCtx, u32>::new()
1966            .then(
1967                |_ctx: &mut ReconnectCtx, x: u32| -> Result<u32, String> {
1968                    if x > 0 { Ok(x) } else { Err("zero".into()) }
1969                },
1970                &reg,
1971            )
1972            .ok();
1973
1974        let mut ctx = ReconnectCtx {
1975            retries: 0,
1976            last_result: None,
1977        };
1978
1979        assert_eq!(pipeline.run(&mut ctx, &mut world, 5), Some(5));
1980        assert_eq!(pipeline.run(&mut ctx, &mut world, 0), None);
1981    }
1982
1983    #[test]
1984    fn ctx_pipeline_unwrap_or_result() {
1985        let mut world = WorldBuilder::new().build();
1986        let reg = world.registry();
1987
1988        let mut pipeline = CtxPipelineBuilder::<ReconnectCtx, u32>::new()
1989            .then(
1990                |_ctx: &mut ReconnectCtx, x: u32| -> Result<u32, String> {
1991                    if x > 0 { Ok(x) } else { Err("zero".into()) }
1992                },
1993                &reg,
1994            )
1995            .unwrap_or(99);
1996
1997        let mut ctx = ReconnectCtx {
1998            retries: 0,
1999            last_result: None,
2000        };
2001
2002        assert_eq!(pipeline.run(&mut ctx, &mut world, 5), 5);
2003        assert_eq!(pipeline.run(&mut ctx, &mut world, 0), 99);
2004    }
2005
2006    #[test]
2007    fn ctx_pipeline_inspect_result() {
2008        let mut world = WorldBuilder::new().build();
2009        let reg = world.registry();
2010
2011        let mut pipeline = CtxPipelineBuilder::<ReconnectCtx, u32>::new()
2012            .then(
2013                |_ctx: &mut ReconnectCtx, x: u32| -> Result<u32, String> {
2014                    if x > 0 { Ok(x) } else { Err("zero".into()) }
2015                },
2016                &reg,
2017            )
2018            .inspect(
2019                |ctx: &mut ReconnectCtx, val: &u32| {
2020                    ctx.retries = *val;
2021                },
2022                &reg,
2023            );
2024
2025        let mut ctx = ReconnectCtx {
2026            retries: 0,
2027            last_result: None,
2028        };
2029
2030        // Ok path — inspect fires
2031        let _ = pipeline.run(&mut ctx, &mut world, 7);
2032        assert_eq!(ctx.retries, 7);
2033
2034        // Err path — inspect skipped
2035        let _ = pipeline.run(&mut ctx, &mut world, 0);
2036        assert_eq!(ctx.retries, 7);
2037    }
2038
2039    // -- Opaque escape hatch --------------------------------------------------
2040
2041    #[test]
2042    fn ctx_pipeline_opaque_step() {
2043        let mut wb = WorldBuilder::new();
2044        wb.register::<u64>(100);
2045        let mut world = wb.build();
2046        let reg = world.registry();
2047
2048        // Opaque step: FnMut(&mut C, &mut World, In) -> Out
2049        let mut pipeline = CtxPipelineBuilder::<ReconnectCtx, u32>::new()
2050            .then(
2051                |ctx: &mut ReconnectCtx, w: &mut World, x: u32| {
2052                    ctx.retries += 1;
2053                    let scale = *w.resource::<u64>();
2054                    u64::from(x) * scale
2055                },
2056                &reg,
2057            )
2058            .then(
2059                |ctx: &mut ReconnectCtx, val: u64| {
2060                    ctx.last_result = Some(val > 0);
2061                },
2062                &reg,
2063            )
2064            .build();
2065
2066        let mut ctx = ReconnectCtx {
2067            retries: 0,
2068            last_result: None,
2069        };
2070
2071        pipeline.run(&mut ctx, &mut world, 5);
2072        assert_eq!(ctx.retries, 1);
2073        assert_eq!(ctx.last_result, Some(true));
2074    }
2075
2076    #[test]
2077    fn ctx_pipeline_opaque_guard() {
2078        let mut wb = WorldBuilder::new();
2079        wb.register::<u64>(10);
2080        let mut world = wb.build();
2081        let reg = world.registry();
2082
2083        // Opaque ref step used as guard
2084        let mut pipeline = CtxPipelineBuilder::<ReconnectCtx, u32>::new()
2085            .then(|_ctx: &mut ReconnectCtx, x: u32| x, &reg)
2086            .guard(
2087                |_ctx: &mut ReconnectCtx, w: &mut World, x: &u32| {
2088                    let threshold = *w.resource::<u64>();
2089                    u64::from(*x) > threshold
2090                },
2091                &reg,
2092            );
2093
2094        let mut ctx = ReconnectCtx {
2095            retries: 0,
2096            last_result: None,
2097        };
2098
2099        assert_eq!(pipeline.run(&mut ctx, &mut world, 5), None);
2100        assert_eq!(pipeline.run(&mut ctx, &mut world, 20), Some(20));
2101    }
2102
2103    #[test]
2104    fn ctx_pipeline_opaque_producer() {
2105        let mut wb = WorldBuilder::new();
2106        wb.register::<u64>(42);
2107        let mut world = wb.build();
2108        let reg = world.registry();
2109
2110        // Opaque producer: FnMut(&mut C, &mut World) -> Out
2111        let mut pipeline = CtxPipelineBuilder::<ReconnectCtx, u32>::new()
2112            .then(
2113                |_ctx: &mut ReconnectCtx, x: u32| {
2114                    if x > 0 { Some(x) } else { None }
2115                },
2116                &reg,
2117            )
2118            .unwrap_or_else(
2119                |ctx: &mut ReconnectCtx, w: &mut World| {
2120                    ctx.retries += 1;
2121                    *w.resource::<u64>() as u32
2122                },
2123                &reg,
2124            );
2125
2126        let mut ctx = ReconnectCtx {
2127            retries: 0,
2128            last_result: None,
2129        };
2130
2131        assert_eq!(pipeline.run(&mut ctx, &mut world, 5), 5);
2132        assert_eq!(ctx.retries, 0);
2133        assert_eq!(pipeline.run(&mut ctx, &mut world, 0), 42);
2134        assert_eq!(ctx.retries, 1);
2135    }
2136}