Skip to main content

nexus_rt/
ctx_pipeline.rs

1// Builder return types are necessarily complex — each combinator returns
2// CtxPipelineChain<C, In, Out, NodeType<Chain, ...>>. Same pattern as iterator adapters.
3#![allow(clippy::type_complexity)]
4
5//! Context-aware pipeline dispatch.
6//!
7//! Mirrors the [`pipeline`](crate::pipeline) module but threads `&mut C`
8//! (per-instance context) through every step. Designed for use inside
9//! [`Callback`](crate::Callback) where each handler instance owns private
10//! state that pipeline steps need to read or mutate.
11//!
12//! # Step function convention
13//!
14//! Context first, then Params, then step input last:
15//!
16//! ```ignore
17//! fn check_retries(ctx: &mut ReconnectCtx, config: Res<Config>, _input: ()) -> Option<()> {
18//!     if ctx.retries < config.max_retries { ctx.retries += 1; Some(()) } else { None }
19//! }
20//! fn attempt_connect(ctx: &mut ReconnectCtx, _input: ()) -> Result<TcpStream, io::Error> {
21//!     TcpStream::connect(&ctx.addr)
22//! }
23//! ```
24//!
25//! # Integration with Callback
26//!
27//! The built [`CtxPipeline`] implements [`CtxStepCall`] — it takes
28//! `&mut C`, `&mut World`, and `In`, returning `Out`.
29//!
30//! To use a pipeline from a [`Handler`](crate::Handler), create a normal
31//! [`Callback`](crate::Callback) whose handler function owns or accesses
32//! the context `C` and calls the pipeline via its `run` method, passing
33//! `&mut C`, `&mut World`, and the handler input. For pipelines that
34//! return a non-unit value, use [`CtxPipelineChain::run`] directly —
35//! `.build()` is only available when `Out = ()` or `Out = Option<()>`.
36//!
37//! # Three-tier step resolution
38//!
39//! Each combinator accepts functions via three tiers, matching the
40//! [`pipeline`](crate::pipeline) module:
41//!
42//! 1. **Named function with Params** — `fn(&mut C, Res<T>, In) -> Out`
43//! 2. **Arity-0 closure** — `FnMut(&mut C, In) -> Out`
44//! 3. **[`Opaque`] closure** — `FnMut(&mut C, &mut World, In) -> Out`
45//!    (raw World access, no Param resolution)
46//!
47//! # Deferred combinators
48//!
49//! The following combinators from [`pipeline`](crate::pipeline) are not yet
50//! implemented: `scan`, `dedup`, `dispatch`, `route`, `tee`, `splat`,
51//! `cloned`, `not`/`and`/`or`/`xor` (bool), `ok_or_else`, `or_else`,
52//! `Result::unwrap_or_else`, and `BatchPipeline`. These can be added when
53//! a concrete use case requires them.
54
55use std::marker::PhantomData;
56
57use crate::handler::{Opaque, Param};
58use crate::world::{Registry, World};
59
60// =============================================================================
61// CtxStep — pre-resolved step with Param state (context-aware)
62// =============================================================================
63
64/// Internal: pre-resolved context-aware step with cached Param state.
65#[doc(hidden)]
66pub struct CtxStep<F, Params: Param> {
67    f: F,
68    state: Params::State,
69    // Retained for future diagnostic/tracing use (step name in error messages).
70    #[allow(dead_code)]
71    name: &'static str,
72}
73
74// =============================================================================
75// CtxStepCall — callable trait for context-aware resolved steps
76// =============================================================================
77
78/// Internal: callable trait for context-aware resolved steps.
79///
80/// Like [`StepCall`](crate::pipeline::StepCall) but with `&mut C` context.
81#[doc(hidden)]
82pub trait CtxStepCall<C, In> {
83    /// The output type of this step.
84    type Out;
85    /// Call this step with context, world, and input.
86    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> Self::Out;
87}
88
89// =============================================================================
90// IntoCtxStep — converts a named function into a context-aware step
91// =============================================================================
92
93/// Converts a named function into a pre-resolved context-aware pipeline step.
94///
95/// Three tiers of resolution (same API for all):
96///
97/// | Params | Function shape | Example |
98/// |--------|---------------|---------|
99/// | Named | `fn(&mut C, Res<T>, In) -> Out` | `fn process(ctx: &mut Ctx, cfg: Res<Config>, x: u32) -> u64` |
100/// | `()` | `FnMut(&mut C, In) -> Out` | `\|ctx: &mut Ctx, x: u32\| x * 2` |
101/// | [`Opaque`] | `FnMut(&mut C, &mut World, In) -> Out` | `\|ctx: &mut Ctx, w: &mut World, x: u32\| { ... }` |
102///
103/// # Examples
104///
105/// ```ignore
106/// // Arity 0 — closure works
107/// let step = (|ctx: &mut Ctx, x: u32| { ctx.count += 1; x * 2 }).into_ctx_step(registry);
108///
109/// // Arity 1 — named function required
110/// fn validate(ctx: &mut Ctx, config: Res<Config>, order: Order) -> Option<ValidOrder> { .. }
111/// let step = validate.into_ctx_step(registry);
112///
113/// // Opaque — raw World access
114/// let step = (|ctx: &mut Ctx, w: &mut World, x: u32| { ... }).into_ctx_step(registry);
115/// ```
116#[diagnostic::on_unimplemented(
117    message = "this function cannot be used as a context-aware pipeline step",
118    note = "ctx step signature: `fn(&mut C, Params..., In) -> Out` — context first, resources, input last",
119    note = "for raw World access: `fn(&mut C, &mut World, In) -> Out`",
120    note = "closures with resource parameters are not supported — use a named `fn`"
121)]
122pub trait IntoCtxStep<C, In, Out, Params> {
123    /// The concrete resolved step type.
124    type Step: CtxStepCall<C, In, Out = Out>;
125
126    /// Resolve Param state from the registry and produce a step.
127    fn into_ctx_step(self, registry: &Registry) -> Self::Step;
128}
129
130// =============================================================================
131// Arity 0 — fn(&mut C, In) -> Out — closures work
132// =============================================================================
133
134impl<C, In, Out, F: FnMut(&mut C, In) -> Out + 'static> CtxStepCall<C, In> for CtxStep<F, ()> {
135    type Out = Out;
136    #[inline(always)]
137    fn call(&mut self, ctx: &mut C, _world: &mut World, input: In) -> Out {
138        (self.f)(ctx, input)
139    }
140}
141
142impl<C, In, Out, F: FnMut(&mut C, In) -> Out + 'static> IntoCtxStep<C, In, Out, ()> for F {
143    type Step = CtxStep<F, ()>;
144
145    fn into_ctx_step(self, registry: &Registry) -> Self::Step {
146        CtxStep {
147            f: self,
148            state: <() as Param>::init(registry),
149            name: std::any::type_name::<F>(),
150        }
151    }
152}
153
154// =============================================================================
155// Arities 1-8 via macro — HRTB with -> Out
156// =============================================================================
157
158macro_rules! impl_into_ctx_step {
159    ($($P:ident),+) => {
160        impl<C, In, Out, F: 'static, $($P: Param + 'static),+>
161            CtxStepCall<C, In> for CtxStep<F, ($($P,)+)>
162        where
163            for<'a> &'a mut F:
164                FnMut(&mut C, $($P,)+ In) -> Out +
165                FnMut(&mut C, $($P::Item<'a>,)+ In) -> Out,
166        {
167            type Out = Out;
168            #[inline(always)]
169            #[allow(non_snake_case)]
170            fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> Out {
171                #[allow(clippy::too_many_arguments)]
172                fn call_inner<Ctx, $($P,)+ Input, Output>(
173                    mut f: impl FnMut(&mut Ctx, $($P,)+ Input) -> Output,
174                    ctx: &mut Ctx,
175                    $($P: $P,)+
176                    input: Input,
177                ) -> Output {
178                    f(ctx, $($P,)+ input)
179                }
180
181                // SAFETY: state was produced by Param::init() on the same Registry
182                // that built this World. Borrows are disjoint — enforced by
183                // conflict detection at build time.
184                // SAFETY: state was produced by Param::init() on the same
185                // Registry that built this World. Single-threaded sequential
186                // dispatch ensures no mutable aliasing across params.
187                #[cfg(debug_assertions)]
188                world.clear_borrows();
189                let ($($P,)+) = unsafe {
190                    <($($P,)+) as Param>::fetch(world, &mut self.state)
191                };
192                call_inner(&mut self.f, ctx, $($P,)+ input)
193            }
194        }
195
196        impl<C, In, Out, F: 'static, $($P: Param + 'static),+>
197            IntoCtxStep<C, In, Out, ($($P,)+)> for F
198        where
199            for<'a> &'a mut F:
200                FnMut(&mut C, $($P,)+ In) -> Out +
201                FnMut(&mut C, $($P::Item<'a>,)+ In) -> Out,
202        {
203            type Step = CtxStep<F, ($($P,)+)>;
204
205            fn into_ctx_step(self, registry: &Registry) -> Self::Step {
206                let state = <($($P,)+) as Param>::init(registry);
207                {
208                    #[allow(non_snake_case)]
209                    let ($($P,)+) = &state;
210                    registry.check_access(&[
211                        $(
212                            (<$P as Param>::resource_id($P),
213                             std::any::type_name::<$P>()),
214                        )+
215                    ]);
216                }
217                CtxStep { f: self, state, name: std::any::type_name::<F>() }
218            }
219        }
220    };
221}
222
223all_tuples!(impl_into_ctx_step);
224
225// -- Opaque: FnMut(&mut C, &mut World, In) -> Out ----------------------------
226
227/// Internal: wrapper for opaque closures used as context-aware steps.
228///
229/// Unlike [`CtxStep<F, P>`] which stores resolved `Param::State`, this
230/// holds only the function — the closure receives `&mut World` directly.
231#[doc(hidden)]
232pub struct CtxOpaqueStep<F> {
233    f: F,
234    // Retained for future diagnostic/tracing use (step name in error messages).
235    #[allow(dead_code)]
236    name: &'static str,
237}
238
239impl<C, In, Out, F: FnMut(&mut C, &mut World, In) -> Out + 'static> CtxStepCall<C, In>
240    for CtxOpaqueStep<F>
241{
242    type Out = Out;
243    #[inline(always)]
244    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> Out {
245        (self.f)(ctx, world, input)
246    }
247}
248
249impl<C, In, Out, F: FnMut(&mut C, &mut World, In) -> Out + 'static> IntoCtxStep<C, In, Out, Opaque>
250    for F
251{
252    type Step = CtxOpaqueStep<F>;
253
254    fn into_ctx_step(self, _registry: &Registry) -> Self::Step {
255        CtxOpaqueStep {
256            f: self,
257            name: std::any::type_name::<F>(),
258        }
259    }
260}
261
262// =============================================================================
263// CtxRefStepCall / IntoCtxRefStep — context-aware step taking &In
264// =============================================================================
265
266/// Internal: callable trait for context-aware steps taking input by reference.
267///
268/// Used by combinators like `tap`, `guard`, `filter` that observe the
269/// value without consuming it.
270#[doc(hidden)]
271pub trait CtxRefStepCall<C, In> {
272    /// The output type of this step.
273    type Out;
274    /// Call this step with context, world, and borrowed input.
275    fn call(&mut self, ctx: &mut C, world: &mut World, input: &In) -> Self::Out;
276}
277
278/// Converts a function into a context-aware step taking input by reference.
279///
280/// Used by combinators like `tap`, `guard`, `filter`, `inspect` that
281/// observe the value without consuming it.
282///
283/// | Params | Function shape | Example |
284/// |--------|---------------|---------|
285/// | Named | `fn(&mut C, Res<T>, &In) -> Out` | `fn check(ctx: &mut Ctx, cfg: Res<Config>, o: &Order) -> bool` |
286/// | `()` | `FnMut(&mut C, &In) -> Out` | `\|ctx: &mut Ctx, o: &Order\| o.qty > 0` |
287/// | [`Opaque`] | `FnMut(&mut C, &mut World, &In) -> Out` | `\|ctx: &mut Ctx, w: &mut World, o: &Order\| { ... }` |
288#[diagnostic::on_unimplemented(
289    message = "this function cannot be used as a context-aware reference step",
290    note = "ctx ref step signature: `fn(&mut C, Params..., &In) -> Out`",
291    note = "for raw World access: `fn(&mut C, &mut World, &In) -> Out`",
292    note = "closures with resource parameters are not supported — use a named `fn`"
293)]
294pub trait IntoCtxRefStep<C, In, Out, Params> {
295    /// The concrete resolved step type.
296    type Step: CtxRefStepCall<C, In, Out = Out>;
297
298    /// Resolve Param state from the registry and produce a step.
299    fn into_ctx_ref_step(self, registry: &Registry) -> Self::Step;
300}
301
302// -- Arity 0: FnMut(&mut C, &In) -> Out — closures work ----------------------
303
304impl<C, In, Out, F: FnMut(&mut C, &In) -> Out + 'static> CtxRefStepCall<C, In> for CtxStep<F, ()> {
305    type Out = Out;
306    #[inline(always)]
307    fn call(&mut self, ctx: &mut C, _world: &mut World, input: &In) -> Out {
308        (self.f)(ctx, input)
309    }
310}
311
312impl<C, In, Out, F: FnMut(&mut C, &In) -> Out + 'static> IntoCtxRefStep<C, In, Out, ()> for F {
313    type Step = CtxStep<F, ()>;
314
315    fn into_ctx_ref_step(self, registry: &Registry) -> Self::Step {
316        CtxStep {
317            f: self,
318            state: <() as Param>::init(registry),
319            name: std::any::type_name::<F>(),
320        }
321    }
322}
323
324// -- Arities 1-8: named functions with Param resolution -----------------------
325
326macro_rules! impl_into_ctx_ref_step {
327    ($($P:ident),+) => {
328        impl<C, In, Out, F: 'static, $($P: Param + 'static),+>
329            CtxRefStepCall<C, In> for CtxStep<F, ($($P,)+)>
330        where
331            for<'a> &'a mut F:
332                FnMut(&mut C, $($P,)+ &In) -> Out +
333                FnMut(&mut C, $($P::Item<'a>,)+ &In) -> Out,
334        {
335            type Out = Out;
336            #[inline(always)]
337            #[allow(non_snake_case)]
338            fn call(&mut self, ctx: &mut C, world: &mut World, input: &In) -> Out {
339                #[allow(clippy::too_many_arguments)]
340                fn call_inner<Ctx, $($P,)+ Input: ?Sized, Output>(
341                    mut f: impl FnMut(&mut Ctx, $($P,)+ &Input) -> Output,
342                    ctx: &mut Ctx,
343                    $($P: $P,)+
344                    input: &Input,
345                ) -> Output {
346                    f(ctx, $($P,)+ input)
347                }
348
349                // SAFETY: state was produced by Param::init() on the same
350                // Registry that built this World. Single-threaded sequential
351                // dispatch ensures no mutable aliasing across params.
352                #[cfg(debug_assertions)]
353                world.clear_borrows();
354                let ($($P,)+) = unsafe {
355                    <($($P,)+) as Param>::fetch(world, &mut self.state)
356                };
357                call_inner(&mut self.f, ctx, $($P,)+ input)
358            }
359        }
360
361        impl<C, In, Out, F: 'static, $($P: Param + 'static),+>
362            IntoCtxRefStep<C, In, Out, ($($P,)+)> for F
363        where
364            for<'a> &'a mut F:
365                FnMut(&mut C, $($P,)+ &In) -> Out +
366                FnMut(&mut C, $($P::Item<'a>,)+ &In) -> Out,
367        {
368            type Step = CtxStep<F, ($($P,)+)>;
369
370            fn into_ctx_ref_step(self, registry: &Registry) -> Self::Step {
371                let state = <($($P,)+) as Param>::init(registry);
372                {
373                    #[allow(non_snake_case)]
374                    let ($($P,)+) = &state;
375                    registry.check_access(&[
376                        $(
377                            (<$P as Param>::resource_id($P),
378                             std::any::type_name::<$P>()),
379                        )+
380                    ]);
381                }
382                CtxStep { f: self, state, name: std::any::type_name::<F>() }
383            }
384        }
385    };
386}
387
388all_tuples!(impl_into_ctx_ref_step);
389
390// -- Opaque: FnMut(&mut C, &mut World, &In) -> Out ---------------------------
391
392/// Internal: wrapper for opaque closures taking input by reference.
393#[doc(hidden)]
394pub struct CtxOpaqueRefStep<F> {
395    f: F,
396    #[allow(dead_code)]
397    name: &'static str,
398}
399
400impl<C, In, Out, F: FnMut(&mut C, &mut World, &In) -> Out + 'static> CtxRefStepCall<C, In>
401    for CtxOpaqueRefStep<F>
402{
403    type Out = Out;
404    #[inline(always)]
405    fn call(&mut self, ctx: &mut C, world: &mut World, input: &In) -> Out {
406        (self.f)(ctx, world, input)
407    }
408}
409
410impl<C, In, Out, F: FnMut(&mut C, &mut World, &In) -> Out + 'static>
411    IntoCtxRefStep<C, In, Out, Opaque> for F
412{
413    type Step = CtxOpaqueRefStep<F>;
414
415    fn into_ctx_ref_step(self, _registry: &Registry) -> Self::Step {
416        CtxOpaqueRefStep {
417            f: self,
418            name: std::any::type_name::<F>(),
419        }
420    }
421}
422
423// =============================================================================
424// CtxProducerCall / IntoCtxProducer — context-aware producer (no pipeline input)
425// =============================================================================
426
427/// Internal: callable trait for context-aware producers.
428///
429/// Used by combinators like `on_none`, `unwrap_or_else`.
430#[doc(hidden)]
431pub trait CtxProducerCall<C> {
432    /// The output type of this producer.
433    type Out;
434    /// Call this producer with context and world.
435    fn call(&mut self, ctx: &mut C, world: &mut World) -> Self::Out;
436}
437
438/// Converts a function into a context-aware producer step.
439///
440/// Used by combinators like `on_none`, `unwrap_or_else`.
441///
442/// | Params | Function shape | Example |
443/// |--------|---------------|---------|
444/// | Named | `fn(&mut C, Res<T>) -> Out` | `fn default_val(ctx: &mut Ctx, cfg: Res<Config>) -> u64` |
445/// | `()` | `FnMut(&mut C) -> Out` | `\|ctx: &mut Ctx\| ctx.fallback` |
446/// | [`Opaque`] | `FnMut(&mut C, &mut World) -> Out` | `\|ctx: &mut Ctx, w: &mut World\| { ... }` |
447#[diagnostic::on_unimplemented(
448    message = "this function cannot be used as a context-aware producer",
449    note = "ctx producer signature: `fn(&mut C, Params...) -> Out`",
450    note = "for raw World access: `fn(&mut C, &mut World) -> Out`",
451    note = "closures with resource parameters are not supported — use a named `fn`"
452)]
453pub trait IntoCtxProducer<C, Out, Params> {
454    /// The concrete resolved producer type.
455    type Step: CtxProducerCall<C, Out = Out>;
456
457    /// Resolve Param state from the registry and produce a step.
458    fn into_ctx_producer(self, registry: &Registry) -> Self::Step;
459}
460
461// -- Arity 0: FnMut(&mut C) -> Out — closures work ----------------------------
462
463impl<C, Out, F: FnMut(&mut C) -> Out + 'static> CtxProducerCall<C> for CtxStep<F, ()> {
464    type Out = Out;
465    #[inline(always)]
466    fn call(&mut self, ctx: &mut C, _world: &mut World) -> Out {
467        (self.f)(ctx)
468    }
469}
470
471impl<C, Out, F: FnMut(&mut C) -> Out + 'static> IntoCtxProducer<C, Out, ()> for F {
472    type Step = CtxStep<F, ()>;
473
474    fn into_ctx_producer(self, registry: &Registry) -> Self::Step {
475        CtxStep {
476            f: self,
477            state: <() as Param>::init(registry),
478            name: std::any::type_name::<F>(),
479        }
480    }
481}
482
483// -- Arities 1-8: named functions with Param resolution -----------------------
484
485macro_rules! impl_into_ctx_producer {
486    ($($P:ident),+) => {
487        impl<C, Out, F: 'static, $($P: Param + 'static),+>
488            CtxProducerCall<C> for CtxStep<F, ($($P,)+)>
489        where
490            for<'a> &'a mut F:
491                FnMut(&mut C, $($P,)+) -> Out +
492                FnMut(&mut C, $($P::Item<'a>,)+) -> Out,
493        {
494            type Out = Out;
495            #[inline(always)]
496            #[allow(non_snake_case)]
497            fn call(&mut self, ctx: &mut C, world: &mut World) -> Out {
498                #[allow(clippy::too_many_arguments)]
499                fn call_inner<Ctx, $($P,)+ Output>(
500                    mut f: impl FnMut(&mut Ctx, $($P,)+) -> Output,
501                    ctx: &mut Ctx,
502                    $($P: $P,)+
503                ) -> Output {
504                    f(ctx, $($P,)+)
505                }
506
507                // SAFETY: state was produced by Param::init() on the same
508                // Registry that built this World. Single-threaded sequential
509                // dispatch ensures no mutable aliasing across params.
510                #[cfg(debug_assertions)]
511                world.clear_borrows();
512                let ($($P,)+) = unsafe {
513                    <($($P,)+) as Param>::fetch(world, &mut self.state)
514                };
515                call_inner(&mut self.f, ctx, $($P,)+)
516            }
517        }
518
519        impl<C, Out, F: 'static, $($P: Param + 'static),+>
520            IntoCtxProducer<C, Out, ($($P,)+)> for F
521        where
522            for<'a> &'a mut F:
523                FnMut(&mut C, $($P,)+) -> Out +
524                FnMut(&mut C, $($P::Item<'a>,)+) -> Out,
525        {
526            type Step = CtxStep<F, ($($P,)+)>;
527
528            fn into_ctx_producer(self, registry: &Registry) -> Self::Step {
529                let state = <($($P,)+) as Param>::init(registry);
530                {
531                    #[allow(non_snake_case)]
532                    let ($($P,)+) = &state;
533                    registry.check_access(&[
534                        $(
535                            (<$P as Param>::resource_id($P),
536                             std::any::type_name::<$P>()),
537                        )+
538                    ]);
539                }
540                CtxStep { f: self, state, name: std::any::type_name::<F>() }
541            }
542        }
543    };
544}
545
546all_tuples!(impl_into_ctx_producer);
547
548// -- Opaque: FnMut(&mut C, &mut World) -> Out --------------------------------
549
550/// Internal: wrapper for opaque closures used as context-aware producers.
551#[doc(hidden)]
552pub struct CtxOpaqueProducer<F> {
553    f: F,
554    #[allow(dead_code)]
555    name: &'static str,
556}
557
558impl<C, Out, F: FnMut(&mut C, &mut World) -> Out + 'static> CtxProducerCall<C>
559    for CtxOpaqueProducer<F>
560{
561    type Out = Out;
562    #[inline(always)]
563    fn call(&mut self, ctx: &mut C, world: &mut World) -> Out {
564        (self.f)(ctx, world)
565    }
566}
567
568impl<C, Out, F: FnMut(&mut C, &mut World) -> Out + 'static> IntoCtxProducer<C, Out, Opaque> for F {
569    type Step = CtxOpaqueProducer<F>;
570
571    fn into_ctx_producer(self, _registry: &Registry) -> Self::Step {
572        CtxOpaqueProducer {
573            f: self,
574            name: std::any::type_name::<F>(),
575        }
576    }
577}
578
579// =============================================================================
580// CtxChainCall — callable trait for context-aware chain nodes
581// =============================================================================
582
583/// Internal: callable trait for context-aware chain nodes.
584///
585/// Like [`ChainCall`](crate::pipeline::ChainCall) but threads `&mut C`.
586#[doc(hidden)]
587pub trait CtxChainCall<C, In> {
588    /// The output type of this chain node.
589    type Out;
590    /// Execute the chain with context, world, and input.
591    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> Self::Out;
592}
593
594// =============================================================================
595// Chain nodes — named types for context-aware pipeline composition
596// =============================================================================
597
598/// Identity passthrough node. Used as the initial chain element.
599#[doc(hidden)]
600pub struct CtxIdentityNode;
601
602impl<C, In> CtxChainCall<C, In> for CtxIdentityNode {
603    type Out = In;
604    #[inline(always)]
605    fn call(&mut self, _ctx: &mut C, _world: &mut World, input: In) -> In {
606        input
607    }
608}
609
610// -- Core (any Out) ----------------------------------------------------------
611
612/// Chain node for `.then()` — transforms output via a context-aware step.
613#[doc(hidden)]
614pub struct CtxThenNode<Prev, S> {
615    pub(crate) prev: Prev,
616    pub(crate) step: S,
617}
618
619impl<C, In, Prev, S> CtxChainCall<C, In> for CtxThenNode<Prev, S>
620where
621    Prev: CtxChainCall<C, In>,
622    S: CtxStepCall<C, Prev::Out>,
623{
624    type Out = S::Out;
625    #[inline(always)]
626    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> S::Out {
627        let mid = self.prev.call(ctx, world, input);
628        self.step.call(ctx, world, mid)
629    }
630}
631
632/// Chain node for `.tap()` — side effect via `&Out`, value passes through.
633#[doc(hidden)]
634pub struct CtxTapNode<Prev, S> {
635    pub(crate) prev: Prev,
636    pub(crate) step: S,
637}
638
639impl<C, In, Prev, S> CtxChainCall<C, In> for CtxTapNode<Prev, S>
640where
641    Prev: CtxChainCall<C, In>,
642    S: CtxRefStepCall<C, Prev::Out, Out = ()>,
643{
644    type Out = Prev::Out;
645    #[inline(always)]
646    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> Prev::Out {
647        let val = self.prev.call(ctx, world, input);
648        self.step.call(ctx, world, &val);
649        val
650    }
651}
652
653/// Chain node for `.guard()` — wraps output in `Option` based on predicate.
654#[doc(hidden)]
655pub struct CtxGuardNode<Prev, S> {
656    pub(crate) prev: Prev,
657    pub(crate) step: S,
658}
659
660impl<C, In, Prev, S> CtxChainCall<C, In> for CtxGuardNode<Prev, S>
661where
662    Prev: CtxChainCall<C, In>,
663    S: CtxRefStepCall<C, Prev::Out, Out = bool>,
664{
665    type Out = Option<Prev::Out>;
666    #[inline(always)]
667    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> Option<Prev::Out> {
668        let val = self.prev.call(ctx, world, input);
669        if self.step.call(ctx, world, &val) {
670            Some(val)
671        } else {
672            None
673        }
674    }
675}
676
677// -- Option nodes ------------------------------------------------------------
678
679/// Chain node for `.map()` on `Option<T>`.
680#[doc(hidden)]
681pub struct CtxMapOptionNode<Prev, S> {
682    pub(crate) prev: Prev,
683    pub(crate) step: S,
684}
685
686impl<C, In, T, Prev, S> CtxChainCall<C, In> for CtxMapOptionNode<Prev, S>
687where
688    Prev: CtxChainCall<C, In, Out = Option<T>>,
689    S: CtxStepCall<C, T>,
690{
691    type Out = Option<S::Out>;
692    #[inline(always)]
693    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> Option<S::Out> {
694        match self.prev.call(ctx, world, input) {
695            Some(val) => Some(self.step.call(ctx, world, val)),
696            None => None,
697        }
698    }
699}
700
701/// Chain node for `.and_then()` on `Option<T>`.
702#[doc(hidden)]
703pub struct CtxAndThenNode<Prev, S> {
704    pub(crate) prev: Prev,
705    pub(crate) step: S,
706}
707
708impl<C, In, T, U, Prev, S> CtxChainCall<C, In> for CtxAndThenNode<Prev, S>
709where
710    Prev: CtxChainCall<C, In, Out = Option<T>>,
711    S: CtxStepCall<C, T, Out = Option<U>>,
712{
713    type Out = Option<U>;
714    #[inline(always)]
715    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> Option<U> {
716        match self.prev.call(ctx, world, input) {
717            Some(val) => self.step.call(ctx, world, val),
718            None => None,
719        }
720    }
721}
722
723/// Chain node for `.filter()` on `Option<T>`.
724#[doc(hidden)]
725pub struct CtxFilterNode<Prev, S> {
726    pub(crate) prev: Prev,
727    pub(crate) step: S,
728}
729
730impl<C, In, T, Prev, S> CtxChainCall<C, In> for CtxFilterNode<Prev, S>
731where
732    Prev: CtxChainCall<C, In, Out = Option<T>>,
733    S: CtxRefStepCall<C, T, Out = bool>,
734{
735    type Out = Option<T>;
736    #[inline(always)]
737    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> Option<T> {
738        match self.prev.call(ctx, world, input) {
739            Some(val) if self.step.call(ctx, world, &val) => Some(val),
740            _ => None,
741        }
742    }
743}
744
745/// Chain node for `.inspect()` on `Option<T>`.
746#[doc(hidden)]
747pub struct CtxInspectOptionNode<Prev, S> {
748    pub(crate) prev: Prev,
749    pub(crate) step: S,
750}
751
752impl<C, In, T, Prev, S> CtxChainCall<C, In> for CtxInspectOptionNode<Prev, S>
753where
754    Prev: CtxChainCall<C, In, Out = Option<T>>,
755    S: CtxRefStepCall<C, T, Out = ()>,
756{
757    type Out = Option<T>;
758    #[inline(always)]
759    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> Option<T> {
760        let opt = self.prev.call(ctx, world, input);
761        if let Some(ref val) = opt {
762            self.step.call(ctx, world, val);
763        }
764        opt
765    }
766}
767
768/// Chain node for `.on_none()` on `Option<T>`.
769#[doc(hidden)]
770pub struct CtxOnNoneNode<Prev, S> {
771    pub(crate) prev: Prev,
772    pub(crate) producer: S,
773}
774
775impl<C, In, T, Prev, S> CtxChainCall<C, In> for CtxOnNoneNode<Prev, S>
776where
777    Prev: CtxChainCall<C, In, Out = Option<T>>,
778    S: CtxProducerCall<C, Out = ()>,
779{
780    type Out = Option<T>;
781    #[inline(always)]
782    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> Option<T> {
783        let opt = self.prev.call(ctx, world, input);
784        if opt.is_none() {
785            self.producer.call(ctx, world);
786        }
787        opt
788    }
789}
790
791/// Chain node for `.unwrap_or_else()` on `Option<T>`.
792#[doc(hidden)]
793pub struct CtxUnwrapOrElseOptionNode<Prev, S> {
794    pub(crate) prev: Prev,
795    pub(crate) producer: S,
796}
797
798impl<C, In, T, Prev, S> CtxChainCall<C, In> for CtxUnwrapOrElseOptionNode<Prev, S>
799where
800    Prev: CtxChainCall<C, In, Out = Option<T>>,
801    S: CtxProducerCall<C, Out = T>,
802{
803    type Out = T;
804    #[inline(always)]
805    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> T {
806        match self.prev.call(ctx, world, input) {
807            Some(val) => val,
808            None => self.producer.call(ctx, world),
809        }
810    }
811}
812
813// -- Result nodes ------------------------------------------------------------
814
815/// Chain node for `.map()` on `Result<T, E>`.
816#[doc(hidden)]
817pub struct CtxMapResultNode<Prev, S> {
818    pub(crate) prev: Prev,
819    pub(crate) step: S,
820}
821
822impl<C, In, T, E, Prev, S> CtxChainCall<C, In> for CtxMapResultNode<Prev, S>
823where
824    Prev: CtxChainCall<C, In, Out = Result<T, E>>,
825    S: CtxStepCall<C, T>,
826{
827    type Out = Result<S::Out, E>;
828    #[inline(always)]
829    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> Result<S::Out, E> {
830        match self.prev.call(ctx, world, input) {
831            Ok(val) => Ok(self.step.call(ctx, world, val)),
832            Err(e) => Err(e),
833        }
834    }
835}
836
837/// Chain node for `.and_then()` on `Result<T, E>`.
838#[doc(hidden)]
839pub struct CtxAndThenResultNode<Prev, S> {
840    pub(crate) prev: Prev,
841    pub(crate) step: S,
842}
843
844impl<C, In, T, U, E, Prev, S> CtxChainCall<C, In> for CtxAndThenResultNode<Prev, S>
845where
846    Prev: CtxChainCall<C, In, Out = Result<T, E>>,
847    S: CtxStepCall<C, T, Out = Result<U, E>>,
848{
849    type Out = Result<U, E>;
850    #[inline(always)]
851    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> Result<U, E> {
852        match self.prev.call(ctx, world, input) {
853            Ok(val) => self.step.call(ctx, world, val),
854            Err(e) => Err(e),
855        }
856    }
857}
858
859/// Chain node for `.catch()` on `Result<T, E>` — handles error, transitions to `Option<T>`.
860#[doc(hidden)]
861pub struct CtxCatchNode<Prev, S> {
862    pub(crate) prev: Prev,
863    pub(crate) step: S,
864}
865
866impl<C, In, T, E, Prev, S> CtxChainCall<C, In> for CtxCatchNode<Prev, S>
867where
868    Prev: CtxChainCall<C, In, Out = Result<T, E>>,
869    S: CtxStepCall<C, E, Out = ()>,
870{
871    type Out = Option<T>;
872    #[inline(always)]
873    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> Option<T> {
874        match self.prev.call(ctx, world, input) {
875            Ok(val) => Some(val),
876            Err(e) => {
877                self.step.call(ctx, world, e);
878                None
879            }
880        }
881    }
882}
883
884/// Chain node for `.map_err()` on `Result<T, E>`.
885#[doc(hidden)]
886pub struct CtxMapErrNode<Prev, S> {
887    pub(crate) prev: Prev,
888    pub(crate) step: S,
889}
890
891impl<C, In, T, E, E2, Prev, S> CtxChainCall<C, In> for CtxMapErrNode<Prev, S>
892where
893    Prev: CtxChainCall<C, In, Out = Result<T, E>>,
894    S: CtxStepCall<C, E, Out = E2>,
895{
896    type Out = Result<T, E2>;
897    #[inline(always)]
898    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> Result<T, E2> {
899        match self.prev.call(ctx, world, input) {
900            Ok(val) => Ok(val),
901            Err(e) => Err(self.step.call(ctx, world, e)),
902        }
903    }
904}
905
906/// Chain node for `.inspect_err()` on `Result<T, E>`.
907#[doc(hidden)]
908pub struct CtxInspectErrNode<Prev, S> {
909    pub(crate) prev: Prev,
910    pub(crate) step: S,
911}
912
913impl<C, In, T, E, Prev, S> CtxChainCall<C, In> for CtxInspectErrNode<Prev, S>
914where
915    Prev: CtxChainCall<C, In, Out = Result<T, E>>,
916    S: CtxRefStepCall<C, E, Out = ()>,
917{
918    type Out = Result<T, E>;
919    #[inline(always)]
920    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> Result<T, E> {
921        let result = self.prev.call(ctx, world, input);
922        if let Err(ref e) = result {
923            self.step.call(ctx, world, e);
924        }
925        result
926    }
927}
928
929/// Chain node for `.inspect()` on `Result<T, E>` — side effect on Ok value.
930#[doc(hidden)]
931pub struct CtxInspectResultNode<Prev, S> {
932    pub(crate) prev: Prev,
933    pub(crate) step: S,
934}
935
936impl<C, In, T, E, Prev, S> CtxChainCall<C, In> for CtxInspectResultNode<Prev, S>
937where
938    Prev: CtxChainCall<C, In, Out = Result<T, E>>,
939    S: CtxRefStepCall<C, T, Out = ()>,
940{
941    type Out = Result<T, E>;
942    #[inline(always)]
943    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> Result<T, E> {
944        let result = self.prev.call(ctx, world, input);
945        if let Ok(ref val) = result {
946            self.step.call(ctx, world, val);
947        }
948        result
949    }
950}
951
952/// Chain node for discarding `Option<()>` to `()`.
953#[doc(hidden)]
954pub struct CtxDiscardOptionNode<Prev> {
955    pub(crate) prev: Prev,
956}
957
958impl<C, In, Prev> CtxChainCall<C, In> for CtxDiscardOptionNode<Prev>
959where
960    Prev: CtxChainCall<C, In, Out = Option<()>>,
961{
962    type Out = ();
963    #[inline(always)]
964    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) {
965        let _ = self.prev.call(ctx, world, input);
966    }
967}
968
969// =============================================================================
970// CtxPipelineBuilder — entry point
971// =============================================================================
972
973/// Entry point for building a context-aware pipeline.
974///
975/// Like [`PipelineBuilder`](crate::PipelineBuilder) but every step
976/// receives `&mut C` as the first argument.
977///
978/// # Examples
979///
980/// ```ignore
981/// let pipeline = CtxPipelineBuilder::<ReconnectCtx, ()>::new()
982///     .then(check_retries, &reg)
983///     .then(attempt_connect, &reg)
984///     .build();
985/// ```
986#[must_use = "pipeline builder does nothing until .then() is called"]
987pub struct CtxPipelineBuilder<C, In>(PhantomData<fn(&mut C, In)>);
988
989impl<C, In> CtxPipelineBuilder<C, In> {
990    /// Create a new context-aware pipeline entry point.
991    pub fn new() -> Self {
992        Self(PhantomData)
993    }
994
995    /// Add the first step. Params resolved from the registry.
996    pub fn then<Out, Params, S: IntoCtxStep<C, In, Out, Params>>(
997        self,
998        f: S,
999        registry: &Registry,
1000    ) -> CtxPipelineChain<C, In, Out, CtxThenNode<CtxIdentityNode, S::Step>> {
1001        CtxPipelineChain {
1002            chain: CtxThenNode {
1003                prev: CtxIdentityNode,
1004                step: f.into_ctx_step(registry),
1005            },
1006            _marker: PhantomData,
1007        }
1008    }
1009}
1010
1011impl<C, In> Default for CtxPipelineBuilder<C, In> {
1012    fn default() -> Self {
1013        Self::new()
1014    }
1015}
1016
1017// =============================================================================
1018// CtxPipelineChain — typestate builder
1019// =============================================================================
1020
1021/// Builder that composes context-aware pipeline steps via named chain nodes.
1022///
1023/// `C` is the context type. `In` is the pipeline's input type (fixed).
1024/// `Out` is the current output. `Chain` is the concrete chain type.
1025#[must_use = "pipeline chain does nothing until .build() is called"]
1026pub struct CtxPipelineChain<C, In, Out, Chain> {
1027    chain: Chain,
1028    _marker: PhantomData<fn(&mut C, In) -> Out>,
1029}
1030
1031// =============================================================================
1032// Core — any Out
1033// =============================================================================
1034
1035impl<C, In, Out, Chain: CtxChainCall<C, In, Out = Out>> CtxPipelineChain<C, In, Out, Chain> {
1036    /// Add a step. Params resolved from the registry.
1037    pub fn then<NewOut, Params, S: IntoCtxStep<C, Out, NewOut, Params>>(
1038        self,
1039        f: S,
1040        registry: &Registry,
1041    ) -> CtxPipelineChain<C, In, NewOut, CtxThenNode<Chain, S::Step>> {
1042        CtxPipelineChain {
1043            chain: CtxThenNode {
1044                prev: self.chain,
1045                step: f.into_ctx_step(registry),
1046            },
1047            _marker: PhantomData,
1048        }
1049    }
1050
1051    /// Run the pipeline directly.
1052    pub fn run(&mut self, ctx: &mut C, world: &mut World, input: In) -> Out {
1053        self.chain.call(ctx, world, input)
1054    }
1055
1056    /// Conditionally wrap the output in `Option`. `Some(val)` if
1057    /// the predicate returns true, `None` otherwise.
1058    pub fn guard<Params, S: IntoCtxRefStep<C, Out, bool, Params>>(
1059        self,
1060        f: S,
1061        registry: &Registry,
1062    ) -> CtxPipelineChain<C, In, Option<Out>, CtxGuardNode<Chain, S::Step>> {
1063        CtxPipelineChain {
1064            chain: CtxGuardNode {
1065                prev: self.chain,
1066                step: f.into_ctx_ref_step(registry),
1067            },
1068            _marker: PhantomData,
1069        }
1070    }
1071
1072    /// Observe the current value without consuming or changing it.
1073    pub fn tap<Params, S: IntoCtxRefStep<C, Out, (), Params>>(
1074        self,
1075        f: S,
1076        registry: &Registry,
1077    ) -> CtxPipelineChain<C, In, Out, CtxTapNode<Chain, S::Step>> {
1078        CtxPipelineChain {
1079            chain: CtxTapNode {
1080                prev: self.chain,
1081                step: f.into_ctx_ref_step(registry),
1082            },
1083            _marker: PhantomData,
1084        }
1085    }
1086}
1087
1088// =============================================================================
1089// Option helpers — CtxPipelineChain<C, In, Option<T>, Chain>
1090// =============================================================================
1091
1092impl<C, In, T, Chain: CtxChainCall<C, In, Out = Option<T>>>
1093    CtxPipelineChain<C, In, Option<T>, Chain>
1094{
1095    /// Transform the inner value. Step not called on None.
1096    pub fn map<U, Params, S: IntoCtxStep<C, T, U, Params>>(
1097        self,
1098        f: S,
1099        registry: &Registry,
1100    ) -> CtxPipelineChain<C, In, Option<U>, CtxMapOptionNode<Chain, S::Step>> {
1101        CtxPipelineChain {
1102            chain: CtxMapOptionNode {
1103                prev: self.chain,
1104                step: f.into_ctx_step(registry),
1105            },
1106            _marker: PhantomData,
1107        }
1108    }
1109
1110    /// Short-circuits on None. std: `Option::and_then`
1111    pub fn and_then<U, Params, S: IntoCtxStep<C, T, Option<U>, Params>>(
1112        self,
1113        f: S,
1114        registry: &Registry,
1115    ) -> CtxPipelineChain<C, In, Option<U>, CtxAndThenNode<Chain, S::Step>> {
1116        CtxPipelineChain {
1117            chain: CtxAndThenNode {
1118                prev: self.chain,
1119                step: f.into_ctx_step(registry),
1120            },
1121            _marker: PhantomData,
1122        }
1123    }
1124
1125    /// Keep value if predicate holds. std: `Option::filter`
1126    pub fn filter<Params, S: IntoCtxRefStep<C, T, bool, Params>>(
1127        self,
1128        f: S,
1129        registry: &Registry,
1130    ) -> CtxPipelineChain<C, In, Option<T>, CtxFilterNode<Chain, S::Step>> {
1131        CtxPipelineChain {
1132            chain: CtxFilterNode {
1133                prev: self.chain,
1134                step: f.into_ctx_ref_step(registry),
1135            },
1136            _marker: PhantomData,
1137        }
1138    }
1139
1140    /// Side effect on Some value. std: `Option::inspect`
1141    pub fn inspect<Params, S: IntoCtxRefStep<C, T, (), Params>>(
1142        self,
1143        f: S,
1144        registry: &Registry,
1145    ) -> CtxPipelineChain<C, In, Option<T>, CtxInspectOptionNode<Chain, S::Step>> {
1146        CtxPipelineChain {
1147            chain: CtxInspectOptionNode {
1148                prev: self.chain,
1149                step: f.into_ctx_ref_step(registry),
1150            },
1151            _marker: PhantomData,
1152        }
1153    }
1154
1155    /// Side effect on None. Complement to [`inspect`](Self::inspect).
1156    pub fn on_none<Params, S: IntoCtxProducer<C, (), Params>>(
1157        self,
1158        f: S,
1159        registry: &Registry,
1160    ) -> CtxPipelineChain<C, In, Option<T>, CtxOnNoneNode<Chain, S::Step>> {
1161        CtxPipelineChain {
1162            chain: CtxOnNoneNode {
1163                prev: self.chain,
1164                producer: f.into_ctx_producer(registry),
1165            },
1166            _marker: PhantomData,
1167        }
1168    }
1169
1170    /// Exit Option — None becomes `f()`.
1171    pub fn unwrap_or_else<Params, S: IntoCtxProducer<C, T, Params>>(
1172        self,
1173        f: S,
1174        registry: &Registry,
1175    ) -> CtxPipelineChain<C, In, T, CtxUnwrapOrElseOptionNode<Chain, S::Step>> {
1176        CtxPipelineChain {
1177            chain: CtxUnwrapOrElseOptionNode {
1178                prev: self.chain,
1179                producer: f.into_ctx_producer(registry),
1180            },
1181            _marker: PhantomData,
1182        }
1183    }
1184
1185    /// None becomes Err(err). std: `Option::ok_or`
1186    pub fn ok_or<E: Clone>(
1187        self,
1188        err: E,
1189    ) -> CtxPipelineChain<C, In, Result<T, E>, CtxOkOrNode<Chain, E>> {
1190        CtxPipelineChain {
1191            chain: CtxOkOrNode {
1192                prev: self.chain,
1193                err,
1194            },
1195            _marker: PhantomData,
1196        }
1197    }
1198
1199    /// Exit Option — None becomes the default value.
1200    pub fn unwrap_or(
1201        self,
1202        default: T,
1203    ) -> CtxPipelineChain<C, In, T, CtxUnwrapOrOptionNode<Chain, T>>
1204    where
1205        T: Clone,
1206    {
1207        CtxPipelineChain {
1208            chain: CtxUnwrapOrOptionNode {
1209                prev: self.chain,
1210                default,
1211            },
1212            _marker: PhantomData,
1213        }
1214    }
1215}
1216
1217/// Chain node for `.ok_or()` on `Option<T>`.
1218#[doc(hidden)]
1219pub struct CtxOkOrNode<Prev, E> {
1220    pub(crate) prev: Prev,
1221    pub(crate) err: E,
1222}
1223
1224impl<C, In, T, E: Clone, Prev> CtxChainCall<C, In> for CtxOkOrNode<Prev, E>
1225where
1226    Prev: CtxChainCall<C, In, Out = Option<T>>,
1227{
1228    type Out = Result<T, E>;
1229    #[inline(always)]
1230    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> Result<T, E> {
1231        match self.prev.call(ctx, world, input) {
1232            Some(val) => Ok(val),
1233            None => Err(self.err.clone()),
1234        }
1235    }
1236}
1237
1238/// Chain node for `.unwrap_or()` on `Option<T>`.
1239#[doc(hidden)]
1240pub struct CtxUnwrapOrOptionNode<Prev, T> {
1241    pub(crate) prev: Prev,
1242    pub(crate) default: T,
1243}
1244
1245impl<C, In, T: Clone, Prev> CtxChainCall<C, In> for CtxUnwrapOrOptionNode<Prev, T>
1246where
1247    Prev: CtxChainCall<C, In, Out = Option<T>>,
1248{
1249    type Out = T;
1250    #[inline(always)]
1251    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> T {
1252        match self.prev.call(ctx, world, input) {
1253            Some(val) => val,
1254            None => self.default.clone(),
1255        }
1256    }
1257}
1258
1259// =============================================================================
1260// Result helpers — CtxPipelineChain<C, In, Result<T, E>, Chain>
1261// =============================================================================
1262
1263impl<C, In, T, E, Chain: CtxChainCall<C, In, Out = Result<T, E>>>
1264    CtxPipelineChain<C, In, Result<T, E>, Chain>
1265{
1266    /// Transform the Ok value. Step not called on Err.
1267    pub fn map<U, Params, S: IntoCtxStep<C, T, U, Params>>(
1268        self,
1269        f: S,
1270        registry: &Registry,
1271    ) -> CtxPipelineChain<C, In, Result<U, E>, CtxMapResultNode<Chain, S::Step>> {
1272        CtxPipelineChain {
1273            chain: CtxMapResultNode {
1274                prev: self.chain,
1275                step: f.into_ctx_step(registry),
1276            },
1277            _marker: PhantomData,
1278        }
1279    }
1280
1281    /// Short-circuits on Err. std: `Result::and_then`
1282    pub fn and_then<U, Params, S: IntoCtxStep<C, T, Result<U, E>, Params>>(
1283        self,
1284        f: S,
1285        registry: &Registry,
1286    ) -> CtxPipelineChain<C, In, Result<U, E>, CtxAndThenResultNode<Chain, S::Step>> {
1287        CtxPipelineChain {
1288            chain: CtxAndThenResultNode {
1289                prev: self.chain,
1290                step: f.into_ctx_step(registry),
1291            },
1292            _marker: PhantomData,
1293        }
1294    }
1295
1296    /// Handle error and transition to Option.
1297    pub fn catch<Params, S: IntoCtxStep<C, E, (), Params>>(
1298        self,
1299        f: S,
1300        registry: &Registry,
1301    ) -> CtxPipelineChain<C, In, Option<T>, CtxCatchNode<Chain, S::Step>> {
1302        CtxPipelineChain {
1303            chain: CtxCatchNode {
1304                prev: self.chain,
1305                step: f.into_ctx_step(registry),
1306            },
1307            _marker: PhantomData,
1308        }
1309    }
1310
1311    /// Transform the error. std: `Result::map_err`
1312    pub fn map_err<E2, Params, S: IntoCtxStep<C, E, E2, Params>>(
1313        self,
1314        f: S,
1315        registry: &Registry,
1316    ) -> CtxPipelineChain<C, In, Result<T, E2>, CtxMapErrNode<Chain, S::Step>> {
1317        CtxPipelineChain {
1318            chain: CtxMapErrNode {
1319                prev: self.chain,
1320                step: f.into_ctx_step(registry),
1321            },
1322            _marker: PhantomData,
1323        }
1324    }
1325
1326    /// Side effect on Ok value. std: `Result::inspect` (nightly)
1327    pub fn inspect<Params, S: IntoCtxRefStep<C, T, (), Params>>(
1328        self,
1329        f: S,
1330        registry: &Registry,
1331    ) -> CtxPipelineChain<C, In, Result<T, E>, CtxInspectResultNode<Chain, S::Step>> {
1332        CtxPipelineChain {
1333            chain: CtxInspectResultNode {
1334                prev: self.chain,
1335                step: f.into_ctx_ref_step(registry),
1336            },
1337            _marker: PhantomData,
1338        }
1339    }
1340
1341    /// Side effect on Err value. std: `Result::inspect_err` (nightly)
1342    pub fn inspect_err<Params, S: IntoCtxRefStep<C, E, (), Params>>(
1343        self,
1344        f: S,
1345        registry: &Registry,
1346    ) -> CtxPipelineChain<C, In, Result<T, E>, CtxInspectErrNode<Chain, S::Step>> {
1347        CtxPipelineChain {
1348            chain: CtxInspectErrNode {
1349                prev: self.chain,
1350                step: f.into_ctx_ref_step(registry),
1351            },
1352            _marker: PhantomData,
1353        }
1354    }
1355
1356    /// Convert Result to Option, discarding the error.
1357    pub fn ok(self) -> CtxPipelineChain<C, In, Option<T>, CtxOkNode<Chain>> {
1358        CtxPipelineChain {
1359            chain: CtxOkNode { prev: self.chain },
1360            _marker: PhantomData,
1361        }
1362    }
1363
1364    /// Exit Result — Err becomes the default value.
1365    pub fn unwrap_or(
1366        self,
1367        default: T,
1368    ) -> CtxPipelineChain<C, In, T, CtxUnwrapOrResultNode<Chain, T>>
1369    where
1370        T: Clone,
1371    {
1372        CtxPipelineChain {
1373            chain: CtxUnwrapOrResultNode {
1374                prev: self.chain,
1375                default,
1376            },
1377            _marker: PhantomData,
1378        }
1379    }
1380}
1381
1382/// Chain node for `.ok()` on `Result<T, E>` — discards error.
1383#[doc(hidden)]
1384pub struct CtxOkNode<Prev> {
1385    pub(crate) prev: Prev,
1386}
1387
1388impl<C, In, T, E, Prev> CtxChainCall<C, In> for CtxOkNode<Prev>
1389where
1390    Prev: CtxChainCall<C, In, Out = Result<T, E>>,
1391{
1392    type Out = Option<T>;
1393    #[inline(always)]
1394    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> Option<T> {
1395        self.prev.call(ctx, world, input).ok()
1396    }
1397}
1398
1399/// Chain node for `.unwrap_or()` on `Result<T, E>`.
1400#[doc(hidden)]
1401pub struct CtxUnwrapOrResultNode<Prev, T> {
1402    pub(crate) prev: Prev,
1403    pub(crate) default: T,
1404}
1405
1406impl<C, In, T: Clone, E, Prev> CtxChainCall<C, In> for CtxUnwrapOrResultNode<Prev, T>
1407where
1408    Prev: CtxChainCall<C, In, Out = Result<T, E>>,
1409{
1410    type Out = T;
1411    #[inline(always)]
1412    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) -> T {
1413        match self.prev.call(ctx, world, input) {
1414            Ok(val) => val,
1415            Err(_) => self.default.clone(),
1416        }
1417    }
1418}
1419
1420// =============================================================================
1421// build — when Out is () or Option<()>
1422// =============================================================================
1423
1424impl<C, In, Chain: CtxChainCall<C, In, Out = ()>> CtxPipelineChain<C, In, (), Chain> {
1425    /// Finalize the pipeline into a [`CtxPipeline`].
1426    #[must_use = "building a pipeline without storing it does nothing"]
1427    pub fn build(self) -> CtxPipeline<C, In, Chain> {
1428        CtxPipeline {
1429            chain: self.chain,
1430            _marker: PhantomData,
1431        }
1432    }
1433}
1434
1435impl<C, In, Chain: CtxChainCall<C, In, Out = Option<()>>>
1436    CtxPipelineChain<C, In, Option<()>, Chain>
1437{
1438    /// Finalize the pipeline, discarding `Option<()>`.
1439    #[must_use = "building a pipeline without storing it does nothing"]
1440    pub fn build(self) -> CtxPipeline<C, In, CtxDiscardOptionNode<Chain>> {
1441        CtxPipeline {
1442            chain: CtxDiscardOptionNode { prev: self.chain },
1443            _marker: PhantomData,
1444        }
1445    }
1446}
1447
1448// =============================================================================
1449// CtxPipeline — built context-aware pipeline
1450// =============================================================================
1451
1452/// Built context-aware pipeline.
1453///
1454/// Created by [`CtxPipelineChain::build`]. Implements [`CtxStepCall`]
1455/// for use inside [`Callback`](crate::Callback) dispatch.
1456pub struct CtxPipeline<C, In, Chain> {
1457    chain: Chain,
1458    _marker: PhantomData<fn(&mut C, In)>,
1459}
1460
1461impl<C, In, Chain: CtxChainCall<C, In, Out = ()>> CtxStepCall<C, In> for CtxPipeline<C, In, Chain> {
1462    type Out = ();
1463    fn call(&mut self, ctx: &mut C, world: &mut World, input: In) {
1464        self.chain.call(ctx, world, input);
1465    }
1466}
1467
1468impl<C, In, Chain: CtxChainCall<C, In, Out = ()>> CtxPipeline<C, In, Chain> {
1469    /// Run the pipeline with context, world, and input.
1470    pub fn run(&mut self, ctx: &mut C, world: &mut World, input: In) {
1471        self.chain.call(ctx, world, input);
1472    }
1473}
1474
1475// =============================================================================
1476// resolve_ctx_step — free function for select! macro dispatch arms
1477// =============================================================================
1478
1479/// Pre-resolve a context-aware step function against a [`Registry`].
1480///
1481/// Returns a closure `FnMut(&mut C, &mut World, In) -> Out` that can be
1482/// called directly without per-call registry lookup. Used by the
1483/// [`select!`](crate::select) macro to build monomorphized dispatch arms.
1484///
1485/// # Example
1486///
1487/// ```ignore
1488/// let mut arm = resolve_ctx_step::<SessionCtx, _, _, _, _>(on_new_order, &reg);
1489/// // Later, in a match arm:
1490/// arm(&mut ctx, &mut world, order);
1491/// ```
1492pub fn resolve_ctx_step<C, In, Out, Params, S>(
1493    f: S,
1494    registry: &Registry,
1495) -> impl FnMut(&mut C, &mut World, In) -> Out + use<C, In, Out, Params, S>
1496where
1497    C: 'static,
1498    In: 'static,
1499    Out: 'static,
1500    S: IntoCtxStep<C, In, Out, Params>,
1501{
1502    let mut resolved = f.into_ctx_step(registry);
1503    move |ctx: &mut C, world: &mut World, input: In| resolved.call(ctx, world, input)
1504}
1505
1506// =============================================================================
1507// Tests
1508// =============================================================================
1509
1510#[cfg(test)]
1511mod tests {
1512    use super::*;
1513    use crate::{Res, ResMut, WorldBuilder};
1514
1515    // -- Helper types ---------------------------------------------------------
1516
1517    struct ReconnectCtx {
1518        retries: u32,
1519        last_result: Option<bool>,
1520    }
1521
1522    // -- Core dispatch --------------------------------------------------------
1523
1524    #[test]
1525    fn ctx_pipeline_three_steps_with_context_mutation() {
1526        let mut wb = WorldBuilder::new();
1527        wb.register::<u64>(10);
1528        let mut world = wb.build();
1529        let reg = world.registry();
1530
1531        // Step 1: check retries (closure, arity 0)
1532        // Step 2: multiply by resource (named fn, arity 1)
1533        // Step 3: record result in context (closure, arity 0)
1534
1535        fn multiply(ctx: &mut ReconnectCtx, factor: Res<u64>, input: u32) -> u64 {
1536            ctx.retries += 1;
1537            *factor * input as u64
1538        }
1539
1540        let mut pipeline = CtxPipelineBuilder::<ReconnectCtx, u32>::new()
1541            .then(
1542                |ctx: &mut ReconnectCtx, x: u32| {
1543                    ctx.retries += 1;
1544                    x
1545                },
1546                &reg,
1547            )
1548            .then(multiply, &reg)
1549            .then(
1550                |ctx: &mut ReconnectCtx, val: u64| {
1551                    ctx.last_result = Some(val > 0);
1552                },
1553                &reg,
1554            )
1555            .build();
1556
1557        let mut ctx = ReconnectCtx {
1558            retries: 0,
1559            last_result: None,
1560        };
1561
1562        pipeline.run(&mut ctx, &mut world, 5);
1563
1564        // Closure incremented once, named fn incremented once
1565        assert_eq!(ctx.retries, 2);
1566        // 10 * 5 = 50, which is > 0
1567        assert_eq!(ctx.last_result, Some(true));
1568    }
1569
1570    #[test]
1571    fn ctx_pipeline_guard_and_map() {
1572        let mut world = WorldBuilder::new().build();
1573        let reg = world.registry();
1574
1575        let mut pipeline = CtxPipelineBuilder::<ReconnectCtx, u32>::new()
1576            .then(|_ctx: &mut ReconnectCtx, x: u32| x, &reg)
1577            .guard(|_ctx: &mut ReconnectCtx, x: &u32| *x > 10, &reg)
1578            .map(
1579                |ctx: &mut ReconnectCtx, x: u32| {
1580                    ctx.retries += 1;
1581                    x * 2
1582                },
1583                &reg,
1584            );
1585
1586        let mut ctx = ReconnectCtx {
1587            retries: 0,
1588            last_result: None,
1589        };
1590
1591        // x = 5, guard fails, map not called
1592        let result = pipeline.run(&mut ctx, &mut world, 5);
1593        assert_eq!(result, None);
1594        assert_eq!(ctx.retries, 0);
1595
1596        // x = 20, guard passes, map called
1597        let result = pipeline.run(&mut ctx, &mut world, 20);
1598        assert_eq!(result, Some(40));
1599        assert_eq!(ctx.retries, 1);
1600    }
1601
1602    #[test]
1603    fn ctx_pipeline_and_then() {
1604        let mut world = WorldBuilder::new().build();
1605        let reg = world.registry();
1606
1607        let mut pipeline = CtxPipelineBuilder::<ReconnectCtx, u32>::new()
1608            .then(|_ctx: &mut ReconnectCtx, x: u32| Some(x), &reg)
1609            .and_then(
1610                |ctx: &mut ReconnectCtx, x: u32| {
1611                    ctx.retries += 1;
1612                    if x > 5 { Some(x * 2) } else { None }
1613                },
1614                &reg,
1615            );
1616
1617        let mut ctx = ReconnectCtx {
1618            retries: 0,
1619            last_result: None,
1620        };
1621
1622        assert_eq!(pipeline.run(&mut ctx, &mut world, 3), None);
1623        assert_eq!(ctx.retries, 1);
1624
1625        assert_eq!(pipeline.run(&mut ctx, &mut world, 10), Some(20));
1626        assert_eq!(ctx.retries, 2);
1627    }
1628
1629    #[test]
1630    fn ctx_pipeline_catch() {
1631        let mut world = WorldBuilder::new().build();
1632        let reg = world.registry();
1633
1634        let mut pipeline = CtxPipelineBuilder::<ReconnectCtx, u32>::new()
1635            .then(
1636                |_ctx: &mut ReconnectCtx, x: u32| -> Result<u32, String> {
1637                    if x > 0 {
1638                        Ok(x)
1639                    } else {
1640                        Err("zero".to_string())
1641                    }
1642                },
1643                &reg,
1644            )
1645            .catch(
1646                |ctx: &mut ReconnectCtx, _err: String| {
1647                    ctx.retries += 1;
1648                },
1649                &reg,
1650            )
1651            .map(
1652                |ctx: &mut ReconnectCtx, val: u32| {
1653                    ctx.last_result = Some(true);
1654                    val
1655                },
1656                &reg,
1657            );
1658
1659        let mut ctx = ReconnectCtx {
1660            retries: 0,
1661            last_result: None,
1662        };
1663
1664        // Error path
1665        let result = pipeline.run(&mut ctx, &mut world, 0);
1666        assert_eq!(result, None);
1667        assert_eq!(ctx.retries, 1);
1668        assert_eq!(ctx.last_result, None);
1669
1670        // Ok path
1671        let result = pipeline.run(&mut ctx, &mut world, 42);
1672        assert_eq!(result, Some(42));
1673        assert_eq!(ctx.retries, 1);
1674        assert_eq!(ctx.last_result, Some(true));
1675    }
1676
1677    #[test]
1678    fn ctx_pipeline_with_res_mut() {
1679        let mut wb = WorldBuilder::new();
1680        wb.register::<u64>(0);
1681        let mut world = wb.build();
1682        let reg = world.registry();
1683
1684        fn accumulate(ctx: &mut ReconnectCtx, mut total: ResMut<u64>, val: u32) {
1685            *total += val as u64;
1686            ctx.retries += 1;
1687        }
1688
1689        let mut pipeline = CtxPipelineBuilder::<ReconnectCtx, u32>::new()
1690            .then(accumulate, &reg)
1691            .build();
1692
1693        let mut ctx = ReconnectCtx {
1694            retries: 0,
1695            last_result: None,
1696        };
1697
1698        pipeline.run(&mut ctx, &mut world, 10);
1699        pipeline.run(&mut ctx, &mut world, 5);
1700
1701        assert_eq!(*world.resource::<u64>(), 15);
1702        assert_eq!(ctx.retries, 2);
1703    }
1704
1705    #[test]
1706    fn ctx_pipeline_build_with_option_unit() {
1707        let mut world = WorldBuilder::new().build();
1708        let reg = world.registry();
1709
1710        // Pipeline that ends with Option<()> — should still build
1711        let mut pipeline = CtxPipelineBuilder::<ReconnectCtx, u32>::new()
1712            .then(
1713                |_ctx: &mut ReconnectCtx, x: u32| -> Option<u32> {
1714                    if x > 0 { Some(x) } else { None }
1715                },
1716                &reg,
1717            )
1718            .map(|_ctx: &mut ReconnectCtx, _x: u32| {}, &reg)
1719            .build();
1720
1721        let mut ctx = ReconnectCtx {
1722            retries: 0,
1723            last_result: None,
1724        };
1725
1726        // Should not panic
1727        pipeline.run(&mut ctx, &mut world, 5);
1728        pipeline.run(&mut ctx, &mut world, 0);
1729    }
1730
1731    #[test]
1732    fn ctx_pipeline_tap() {
1733        let mut world = WorldBuilder::new().build();
1734        let reg = world.registry();
1735
1736        let mut pipeline = CtxPipelineBuilder::<ReconnectCtx, u32>::new()
1737            .then(|_ctx: &mut ReconnectCtx, x: u32| x * 2, &reg)
1738            .tap(
1739                |ctx: &mut ReconnectCtx, val: &u32| {
1740                    ctx.retries = *val;
1741                },
1742                &reg,
1743            )
1744            .then(
1745                |_ctx: &mut ReconnectCtx, x: u32| {
1746                    // Value should pass through tap unchanged
1747                    assert_eq!(x, 10);
1748                },
1749                &reg,
1750            )
1751            .build();
1752
1753        let mut ctx = ReconnectCtx {
1754            retries: 0,
1755            last_result: None,
1756        };
1757
1758        pipeline.run(&mut ctx, &mut world, 5);
1759        assert_eq!(ctx.retries, 10);
1760    }
1761
1762    #[test]
1763    fn ctx_pipeline_result_map_and_map_err() {
1764        let mut world = WorldBuilder::new().build();
1765        let reg = world.registry();
1766
1767        let mut pipeline = CtxPipelineBuilder::<ReconnectCtx, u32>::new()
1768            .then(
1769                |_ctx: &mut ReconnectCtx, x: u32| -> Result<u32, u32> {
1770                    if x > 0 { Ok(x) } else { Err(x) }
1771                },
1772                &reg,
1773            )
1774            .map(|_ctx: &mut ReconnectCtx, x: u32| x * 10, &reg)
1775            .map_err(
1776                |ctx: &mut ReconnectCtx, e: u32| {
1777                    ctx.retries += 1;
1778                    format!("error: {e}")
1779                },
1780                &reg,
1781            );
1782
1783        let mut ctx = ReconnectCtx {
1784            retries: 0,
1785            last_result: None,
1786        };
1787
1788        assert_eq!(pipeline.run(&mut ctx, &mut world, 5), Ok(50));
1789        assert_eq!(
1790            pipeline.run(&mut ctx, &mut world, 0),
1791            Err("error: 0".to_string())
1792        );
1793        assert_eq!(ctx.retries, 1);
1794    }
1795
1796    #[test]
1797    fn ctx_pipeline_inspect_err() {
1798        let mut world = WorldBuilder::new().build();
1799        let reg = world.registry();
1800
1801        let mut pipeline = CtxPipelineBuilder::<ReconnectCtx, u32>::new()
1802            .then(
1803                |_ctx: &mut ReconnectCtx, x: u32| -> Result<u32, String> {
1804                    if x > 0 { Ok(x) } else { Err("zero".into()) }
1805                },
1806                &reg,
1807            )
1808            .inspect_err(
1809                |ctx: &mut ReconnectCtx, _e: &String| {
1810                    ctx.retries += 1;
1811                },
1812                &reg,
1813            );
1814
1815        let mut ctx = ReconnectCtx {
1816            retries: 0,
1817            last_result: None,
1818        };
1819
1820        let _ = pipeline.run(&mut ctx, &mut world, 0);
1821        assert_eq!(ctx.retries, 1);
1822
1823        // Ok path — inspect_err not called
1824        let _ = pipeline.run(&mut ctx, &mut world, 5);
1825        assert_eq!(ctx.retries, 1);
1826    }
1827
1828    #[test]
1829    fn ctx_pipeline_filter() {
1830        let mut world = WorldBuilder::new().build();
1831        let reg = world.registry();
1832
1833        let mut pipeline = CtxPipelineBuilder::<ReconnectCtx, u32>::new()
1834            .then(|_ctx: &mut ReconnectCtx, x: u32| Some(x), &reg)
1835            .filter(|_ctx: &mut ReconnectCtx, x: &u32| *x > 10, &reg);
1836
1837        let mut ctx = ReconnectCtx {
1838            retries: 0,
1839            last_result: None,
1840        };
1841
1842        assert_eq!(pipeline.run(&mut ctx, &mut world, 5), None);
1843        assert_eq!(pipeline.run(&mut ctx, &mut world, 20), Some(20));
1844    }
1845
1846    #[test]
1847    fn ctx_pipeline_ok_or() {
1848        let mut world = WorldBuilder::new().build();
1849        let reg = world.registry();
1850
1851        let mut pipeline = CtxPipelineBuilder::<ReconnectCtx, u32>::new()
1852            .then(
1853                |_ctx: &mut ReconnectCtx, x: u32| {
1854                    if x > 0 { Some(x) } else { None }
1855                },
1856                &reg,
1857            )
1858            .ok_or("was zero");
1859
1860        let mut ctx = ReconnectCtx {
1861            retries: 0,
1862            last_result: None,
1863        };
1864
1865        assert_eq!(pipeline.run(&mut ctx, &mut world, 5), Ok(5));
1866        assert_eq!(pipeline.run(&mut ctx, &mut world, 0), Err("was zero"));
1867    }
1868
1869    #[test]
1870    fn ctx_pipeline_unwrap_or_option() {
1871        let mut world = WorldBuilder::new().build();
1872        let reg = world.registry();
1873
1874        let mut pipeline = CtxPipelineBuilder::<ReconnectCtx, u32>::new()
1875            .then(
1876                |_ctx: &mut ReconnectCtx, x: u32| {
1877                    if x > 0 { Some(x) } else { None }
1878                },
1879                &reg,
1880            )
1881            .unwrap_or(99);
1882
1883        let mut ctx = ReconnectCtx {
1884            retries: 0,
1885            last_result: None,
1886        };
1887
1888        assert_eq!(pipeline.run(&mut ctx, &mut world, 5), 5);
1889        assert_eq!(pipeline.run(&mut ctx, &mut world, 0), 99);
1890    }
1891
1892    #[test]
1893    fn ctx_pipeline_unwrap_or_else_option() {
1894        let mut world = WorldBuilder::new().build();
1895        let reg = world.registry();
1896
1897        let mut pipeline = CtxPipelineBuilder::<ReconnectCtx, u32>::new()
1898            .then(
1899                |_ctx: &mut ReconnectCtx, x: u32| {
1900                    if x > 0 { Some(x) } else { None }
1901                },
1902                &reg,
1903            )
1904            .unwrap_or_else(
1905                |ctx: &mut ReconnectCtx| {
1906                    ctx.retries += 1;
1907                    42
1908                },
1909                &reg,
1910            );
1911
1912        let mut ctx = ReconnectCtx {
1913            retries: 0,
1914            last_result: None,
1915        };
1916
1917        assert_eq!(pipeline.run(&mut ctx, &mut world, 5), 5);
1918        assert_eq!(ctx.retries, 0);
1919        assert_eq!(pipeline.run(&mut ctx, &mut world, 0), 42);
1920        assert_eq!(ctx.retries, 1);
1921    }
1922
1923    #[test]
1924    fn ctx_pipeline_inspect_option() {
1925        let mut world = WorldBuilder::new().build();
1926        let reg = world.registry();
1927
1928        let mut pipeline = CtxPipelineBuilder::<ReconnectCtx, u32>::new()
1929            .then(
1930                |_ctx: &mut ReconnectCtx, x: u32| {
1931                    if x > 0 { Some(x) } else { None }
1932                },
1933                &reg,
1934            )
1935            .inspect(
1936                |ctx: &mut ReconnectCtx, val: &u32| {
1937                    ctx.retries = *val;
1938                },
1939                &reg,
1940            );
1941
1942        let mut ctx = ReconnectCtx {
1943            retries: 0,
1944            last_result: None,
1945        };
1946
1947        // Some path — inspect fires
1948        let _ = pipeline.run(&mut ctx, &mut world, 7);
1949        assert_eq!(ctx.retries, 7);
1950
1951        // None path — inspect skipped
1952        let _ = pipeline.run(&mut ctx, &mut world, 0);
1953        assert_eq!(ctx.retries, 7);
1954    }
1955
1956    #[test]
1957    fn ctx_pipeline_on_none() {
1958        let mut world = WorldBuilder::new().build();
1959        let reg = world.registry();
1960
1961        let mut pipeline = CtxPipelineBuilder::<ReconnectCtx, u32>::new()
1962            .then(
1963                |_ctx: &mut ReconnectCtx, x: u32| {
1964                    if x > 0 { Some(x) } else { None }
1965                },
1966                &reg,
1967            )
1968            .on_none(
1969                |ctx: &mut ReconnectCtx| {
1970                    ctx.retries += 1;
1971                },
1972                &reg,
1973            );
1974
1975        let mut ctx = ReconnectCtx {
1976            retries: 0,
1977            last_result: None,
1978        };
1979
1980        // Some path — on_none not called
1981        let result = pipeline.run(&mut ctx, &mut world, 5);
1982        assert_eq!(result, Some(5));
1983        assert_eq!(ctx.retries, 0);
1984
1985        // None path — on_none called
1986        let result = pipeline.run(&mut ctx, &mut world, 0);
1987        assert_eq!(result, None);
1988        assert_eq!(ctx.retries, 1);
1989    }
1990
1991    #[test]
1992    fn ctx_pipeline_ok_result() {
1993        let mut world = WorldBuilder::new().build();
1994        let reg = world.registry();
1995
1996        let mut pipeline = CtxPipelineBuilder::<ReconnectCtx, u32>::new()
1997            .then(
1998                |_ctx: &mut ReconnectCtx, x: u32| -> Result<u32, String> {
1999                    if x > 0 { Ok(x) } else { Err("zero".into()) }
2000                },
2001                &reg,
2002            )
2003            .ok();
2004
2005        let mut ctx = ReconnectCtx {
2006            retries: 0,
2007            last_result: None,
2008        };
2009
2010        assert_eq!(pipeline.run(&mut ctx, &mut world, 5), Some(5));
2011        assert_eq!(pipeline.run(&mut ctx, &mut world, 0), None);
2012    }
2013
2014    #[test]
2015    fn ctx_pipeline_unwrap_or_result() {
2016        let mut world = WorldBuilder::new().build();
2017        let reg = world.registry();
2018
2019        let mut pipeline = CtxPipelineBuilder::<ReconnectCtx, u32>::new()
2020            .then(
2021                |_ctx: &mut ReconnectCtx, x: u32| -> Result<u32, String> {
2022                    if x > 0 { Ok(x) } else { Err("zero".into()) }
2023                },
2024                &reg,
2025            )
2026            .unwrap_or(99);
2027
2028        let mut ctx = ReconnectCtx {
2029            retries: 0,
2030            last_result: None,
2031        };
2032
2033        assert_eq!(pipeline.run(&mut ctx, &mut world, 5), 5);
2034        assert_eq!(pipeline.run(&mut ctx, &mut world, 0), 99);
2035    }
2036
2037    #[test]
2038    fn ctx_pipeline_inspect_result() {
2039        let mut world = WorldBuilder::new().build();
2040        let reg = world.registry();
2041
2042        let mut pipeline = CtxPipelineBuilder::<ReconnectCtx, u32>::new()
2043            .then(
2044                |_ctx: &mut ReconnectCtx, x: u32| -> Result<u32, String> {
2045                    if x > 0 { Ok(x) } else { Err("zero".into()) }
2046                },
2047                &reg,
2048            )
2049            .inspect(
2050                |ctx: &mut ReconnectCtx, val: &u32| {
2051                    ctx.retries = *val;
2052                },
2053                &reg,
2054            );
2055
2056        let mut ctx = ReconnectCtx {
2057            retries: 0,
2058            last_result: None,
2059        };
2060
2061        // Ok path — inspect fires
2062        let _ = pipeline.run(&mut ctx, &mut world, 7);
2063        assert_eq!(ctx.retries, 7);
2064
2065        // Err path — inspect skipped
2066        let _ = pipeline.run(&mut ctx, &mut world, 0);
2067        assert_eq!(ctx.retries, 7);
2068    }
2069
2070    // -- Opaque escape hatch --------------------------------------------------
2071
2072    #[test]
2073    fn ctx_pipeline_opaque_step() {
2074        let mut wb = WorldBuilder::new();
2075        wb.register::<u64>(100);
2076        let mut world = wb.build();
2077        let reg = world.registry();
2078
2079        // Opaque step: FnMut(&mut C, &mut World, In) -> Out
2080        let mut pipeline = CtxPipelineBuilder::<ReconnectCtx, u32>::new()
2081            .then(
2082                |ctx: &mut ReconnectCtx, w: &mut World, x: u32| {
2083                    ctx.retries += 1;
2084                    let scale = *w.resource::<u64>();
2085                    u64::from(x) * scale
2086                },
2087                &reg,
2088            )
2089            .then(
2090                |ctx: &mut ReconnectCtx, val: u64| {
2091                    ctx.last_result = Some(val > 0);
2092                },
2093                &reg,
2094            )
2095            .build();
2096
2097        let mut ctx = ReconnectCtx {
2098            retries: 0,
2099            last_result: None,
2100        };
2101
2102        pipeline.run(&mut ctx, &mut world, 5);
2103        assert_eq!(ctx.retries, 1);
2104        assert_eq!(ctx.last_result, Some(true));
2105    }
2106
2107    #[test]
2108    fn ctx_pipeline_opaque_guard() {
2109        let mut wb = WorldBuilder::new();
2110        wb.register::<u64>(10);
2111        let mut world = wb.build();
2112        let reg = world.registry();
2113
2114        // Opaque ref step used as guard
2115        let mut pipeline = CtxPipelineBuilder::<ReconnectCtx, u32>::new()
2116            .then(|_ctx: &mut ReconnectCtx, x: u32| x, &reg)
2117            .guard(
2118                |_ctx: &mut ReconnectCtx, w: &mut World, x: &u32| {
2119                    let threshold = *w.resource::<u64>();
2120                    u64::from(*x) > threshold
2121                },
2122                &reg,
2123            );
2124
2125        let mut ctx = ReconnectCtx {
2126            retries: 0,
2127            last_result: None,
2128        };
2129
2130        assert_eq!(pipeline.run(&mut ctx, &mut world, 5), None);
2131        assert_eq!(pipeline.run(&mut ctx, &mut world, 20), Some(20));
2132    }
2133
2134    #[test]
2135    fn ctx_pipeline_opaque_producer() {
2136        let mut wb = WorldBuilder::new();
2137        wb.register::<u64>(42);
2138        let mut world = wb.build();
2139        let reg = world.registry();
2140
2141        // Opaque producer: FnMut(&mut C, &mut World) -> Out
2142        let mut pipeline = CtxPipelineBuilder::<ReconnectCtx, u32>::new()
2143            .then(
2144                |_ctx: &mut ReconnectCtx, x: u32| {
2145                    if x > 0 { Some(x) } else { None }
2146                },
2147                &reg,
2148            )
2149            .unwrap_or_else(
2150                |ctx: &mut ReconnectCtx, w: &mut World| {
2151                    ctx.retries += 1;
2152                    *w.resource::<u64>() as u32
2153                },
2154                &reg,
2155            );
2156
2157        let mut ctx = ReconnectCtx {
2158            retries: 0,
2159            last_result: None,
2160        };
2161
2162        assert_eq!(pipeline.run(&mut ctx, &mut world, 5), 5);
2163        assert_eq!(ctx.retries, 0);
2164        assert_eq!(pipeline.run(&mut ctx, &mut world, 0), 42);
2165        assert_eq!(ctx.retries, 1);
2166    }
2167}