Skip to main content

rust_api/
pipeline.rs

1//! Monadic router pipeline for composable, error-propagating route
2//! registration.
3//!
4//! [`RouterPipeline`] wraps `Result<Router<()>>` and provides a fluent builder
5//! API where every step is `Result::and_then` (`>>=`). A failed step
6//! short-circuits all subsequent steps. The error surfaces at `.build()`.
7//!
8//! # The Kleisli Model
9//!
10//! Each `mount::<C>(state)` call creates a Kleisli arrow
11//! `Router<()> -> Result<Router<()>>` from the controller's `mount` fn and
12//! threads it through the pipeline via `and_then`. The pipeline IS the
13//! Kleisli compositor — controllers are pure arrows, they don't compose
14//! themselves.
15//!
16//! # Algebraic Operations
17//!
18//! | Method | Concept | Description |
19//! |---|---|---|
20//! | `map(f)` | Functor (`fmap`) | Infallible `Router -> Router` transform |
21//! | `and_then(f)` | Monad bind (`>>=`) | Fallible `Router -> Result<Router>` |
22//! | `mount::<C>(state)` | Kleisli bind | Thread router through a `Controller` arrow |
23//! | `mount_if::<C>(bool, state)` | Conditional bind | Mount only when condition is `true` |
24//! | `mount_guarded::<C>(state, g)` | Guarded bind | Mount only when guard `g()` succeeds |
25//! | `fold(steps)` | Catamorphism | Apply a dynamic list of fallible steps |
26//! | `layer_all(transforms)` | `fold` over transforms | Apply a list of `Router -> Router` fns |
27//! | `group(prefix, f)` | Scoped functor | Sub-pipeline with path prefix applied |
28//! | `route(info, handler)` | Route registration | Stateless route via route info tuple |
29//! | `build()` | Interpreter / run | Consume pipeline, surface `Result<Router<()>>` |
30//!
31//! # Example
32//!
33//! ```ignore
34//! let health_svc = Arc::new(HealthService::new());
35//! let echo_svc   = Arc::new(EchoService::new());
36//!
37//! let app = RouterPipeline::new()
38//!     .mount::<HealthController>(health_svc)
39//!     .mount_if::<EchoController>(config.enable_echo, echo_svc)
40//!     .route(__root_route, root_handler)
41//!     .map(|r| r.layer(TraceLayer::new_for_http()))
42//!     .map(|r| r.layer(CorsLayer::permissive()))
43//!     .build()?;
44//! ```
45
46use std::sync::Arc;
47
48use crate::{
49    controller::Controller,
50    error::Result,
51    router::{ApiRoute, Router},
52};
53
54/// A boxed, infallible router transformation.
55///
56/// Used with [`RouterPipeline::layer_all`] to apply a dynamic collection of
57/// transforms (e.g., middleware layers) to the pipeline.
58pub type RouterTransform = Box<dyn FnOnce(Router<()>) -> Router<()>>;
59
60/// Monadic router builder that propagates errors through the pipeline via
61/// Kleisli composition.
62///
63/// Wraps `Result<Router<()>>`. Each step is `Result::and_then` — any error
64/// short-circuits the rest of the chain. Call [`build`](RouterPipeline::build)
65/// at the end to surface the final `Result<Router<()>>`.
66///
67/// See [module-level docs](self) for the full operation table.
68pub struct RouterPipeline(Result<Router<()>>);
69
70impl RouterPipeline {
71    /// Start a new pipeline with an empty `Router<()>`.
72    pub fn new() -> Self {
73        Self(Ok(crate::router::build()))
74    }
75
76    // -----------------------------------------------------------------------
77    // Core operations
78    // -----------------------------------------------------------------------
79
80    /// Kleisli bind: thread the router through a [`Controller`]'s Kleisli
81    /// arrow.
82    ///
83    /// Calls `C::mount(state)` to obtain the arrow, then threads it via
84    /// `and_then`. The controller's routes are merged into the pipeline's
85    /// router. Short-circuits if any previous step failed.
86    ///
87    /// The controller has **no knowledge of routing infrastructure** — it only
88    /// provides the Kleisli arrow. The pipeline is the sole compositor.
89    pub fn mount<C: Controller>(self, state: Arc<C::State>) -> Self {
90        Self(self.0.and_then(C::mount(state)))
91    }
92
93    /// Functor map (`fmap`): apply an infallible `Router -> Router` transform.
94    ///
95    /// The most common use is adding a middleware layer:
96    /// ```ignore
97    /// pipeline.map(|r| r.layer(TraceLayer::new_for_http()))
98    /// ```
99    pub fn map<F>(self, f: F) -> Self
100    where
101        F: FnOnce(Router<()>) -> Router<()>,
102    {
103        Self(self.0.map(f))
104    }
105
106    /// Monad bind (`>>=`): apply a fallible `Router -> Result<Router>`
107    /// transform.
108    ///
109    /// Short-circuits on any previous error. Use for transforms that can fail.
110    pub fn and_then<F>(self, f: F) -> Self
111    where
112        F: FnOnce(Router<()>) -> Result<Router<()>>,
113    {
114        Self(self.0.and_then(f))
115    }
116
117    /// Register a stateless route (no service state) using a route info tuple.
118    ///
119    /// The `route_info` tuple is produced by a route macro annotation:
120    /// `__root_route` is `("/", "GET")` when annotated `#[get("/")]`.
121    /// The HTTP verb is enforced by [`ApiRoute::api_route`].
122    pub fn route<H, T>(self, route_info: (&'static str, &'static str), handler: H) -> Self
123    where
124        H: axum::handler::Handler<T, ()>,
125        T: 'static,
126    {
127        self.map(|r| r.api_route(route_info, handler))
128    }
129
130    /// Terminate the pipeline and return the built `Router<()>`.
131    ///
132    /// Use `?` at the call site to propagate any error that occurred during
133    /// pipeline construction:
134    /// ```ignore
135    /// let app = RouterPipeline::new()
136    ///     .mount::<HealthController>(Arc::new(HealthService::new()))
137    ///     .build()?;
138    /// ```
139    pub fn build(self) -> Result<Router<()>> {
140        self.0
141    }
142
143    // -----------------------------------------------------------------------
144    // Conditional and guarded mounting
145    // -----------------------------------------------------------------------
146
147    /// Conditional mount: mount a [`Controller`] only when `condition` is
148    /// `true`.
149    ///
150    /// When `false`, the pipeline passes through unchanged — no error produced.
151    /// The `state` value is moved into the mount call when `condition` is
152    /// `true`, or dropped when `false`.
153    ///
154    /// ```ignore
155    /// RouterPipeline::new()
156    ///     .mount::<HealthController>(health_svc)
157    ///     .mount_if::<MetricsController>(config.enable_metrics, metrics_svc)
158    ///     .mount_if::<AdminController>(env.is_dev(), admin_svc)
159    ///     .build()?
160    /// ```
161    pub fn mount_if<C: Controller>(self, condition: bool, state: Arc<C::State>) -> Self {
162        if condition {
163            self.mount::<C>(state)
164        } else {
165            // identity — drop state, pass the pipeline through unchanged
166            self
167        }
168    }
169
170    /// Guarded mount: mount a [`Controller`] only when `guard()` returns
171    /// `Ok(())`.
172    ///
173    /// The guard is a fallible predicate evaluated before the controller's
174    /// Kleisli arrow runs. A guard error short-circuits the pipeline the same
175    /// way as a failed `mount`.
176    ///
177    /// Use this for runtime checks (required config, capability flags, etc.):
178    ///
179    /// ```ignore
180    /// RouterPipeline::new()
181    ///     .mount_guarded::<AdminController>(admin_svc, || {
182    ///         if config.admin_secret.is_empty() {
183    ///             Err(Error::other("admin_secret must be set"))
184    ///         } else {
185    ///             Ok(())
186    ///         }
187    ///     })
188    ///     .build()?
189    /// ```
190    pub fn mount_guarded<C: Controller, G>(self, state: Arc<C::State>, guard: G) -> Self
191    where
192        G: FnOnce() -> Result<()>,
193    {
194        Self(self.0.and_then(|router| {
195            guard()?;
196            C::mount(state)(router)
197        }))
198    }
199
200    // -----------------------------------------------------------------------
201    // Collection operations
202    // -----------------------------------------------------------------------
203
204    /// Catamorphism (fold): apply a dynamic, ordered collection of fallible
205    /// `Router -> Result<Router>` steps, left-to-right.
206    ///
207    /// Short-circuits on the first error. Replaces imperative `for` loops
208    /// when the set of pipeline steps is known only at runtime.
209    ///
210    /// ```ignore
211    /// let steps: Vec<Box<dyn FnOnce(Router<()>) -> Result<Router<()>>>> = vec![
212    ///     Box::new(HealthController::mount(health_svc)),
213    ///     Box::new(EchoController::mount(echo_svc)),
214    /// ];
215    ///
216    /// RouterPipeline::new().fold(steps).build()?
217    /// ```
218    pub fn fold<I, F>(self, steps: I) -> Self
219    where
220        I: IntoIterator<Item = F>,
221        F: FnOnce(Router<()>) -> Result<Router<()>>,
222    {
223        steps.into_iter().fold(self, |p, step| p.and_then(step))
224    }
225
226    /// Apply a dynamic collection of infallible `Router -> Router` transforms,
227    /// left-to-right (fold over `map`).
228    ///
229    /// Each item is a [`RouterTransform`] (`Box<dyn FnOnce(Router<()>) ->
230    /// Router<()>>`) so heterogeneous transforms (different layer types)
231    /// can coexist in one collection. For a small, static set of layers,
232    /// chaining `.map()` is cleaner.
233    ///
234    /// ```ignore
235    /// let transforms: Vec<RouterTransform> = vec![
236    ///     Box::new(|r| r.layer(TraceLayer::new_for_http())),
237    ///     Box::new(|r| r.layer(CorsLayer::permissive())),
238    /// ];
239    ///
240    /// RouterPipeline::new()
241    ///     .mount::<HealthController>(svc)
242    ///     .layer_all(transforms)
243    ///     .build()?
244    /// ```
245    pub fn layer_all(self, transforms: impl IntoIterator<Item = RouterTransform>) -> Self {
246        transforms.into_iter().fold(self, |p, f| p.map(f))
247    }
248
249    /// Run a sub-pipeline and nest all of its routes under `prefix`.
250    ///
251    /// All controllers and routes registered inside the closure `f` will have
252    /// `prefix` prepended to their paths before being merged into the outer
253    /// router. This is the scoped functor: mapping a prefix transformation
254    /// over an enclosed group of routes.
255    ///
256    /// ```ignore
257    /// RouterPipeline::new()
258    ///     .group("/api/v1", |g| g
259    ///         .mount::<HealthController>(health_svc)
260    ///         .mount::<EchoController>(echo_svc)
261    ///     )
262    ///     .group("/internal", |g| g
263    ///         .mount_if::<MetricsController>(config.enable_metrics, metrics_svc)
264    ///     )
265    ///     .build()?
266    /// ```
267    pub fn group<F>(self, prefix: &str, f: F) -> Self
268    where
269        F: FnOnce(RouterPipeline) -> RouterPipeline,
270    {
271        let prefix = prefix.to_owned();
272        self.and_then(move |outer| {
273            let inner = f(RouterPipeline::new()).build()?;
274            Ok(outer.merge(Router::new().nest(&prefix, inner)))
275        })
276    }
277}
278
279impl Default for RouterPipeline {
280    fn default() -> Self {
281        Self::new()
282    }
283}
284
285// ---------------------------------------------------------------------------
286// Tests
287// ---------------------------------------------------------------------------
288
289#[cfg(test)]
290mod tests {
291    use axum::{body::Body, http::Request, routing::get};
292    use tower::ServiceExt;
293
294    use super::*;
295    use crate::{controller::Controller, error::Result, router::Router};
296
297    // -----------------------------------------------------------------------
298    // Minimal test controller — state is `()`, handler returns a static string.
299    // Manually implements `Controller` so the test module has no external deps.
300    // -----------------------------------------------------------------------
301
302    struct PingController;
303
304    impl Controller for PingController {
305        type State = ();
306        fn mount(state: Arc<Self::State>) -> impl FnOnce(Router<()>) -> Result<Router<()>> {
307            move |router| {
308                let scoped: Router<Arc<()>> =
309                    Router::new().route("/ping", get(|| async { "pong" }));
310                Ok(router.merge(scoped.with_state(state)))
311            }
312        }
313    }
314
315    fn ping_state() -> Arc<()> {
316        Arc::new(())
317    }
318
319    async fn status(app: Router<()>, uri: &str) -> u16 {
320        app.oneshot(Request::builder().uri(uri).body(Body::empty()).unwrap())
321            .await
322            .unwrap()
323            .status()
324            .as_u16()
325    }
326
327    // -----------------------------------------------------------------------
328    // mount_guarded
329    // -----------------------------------------------------------------------
330
331    #[test]
332    fn mount_guarded_short_circuits_on_err_guard() {
333        let result = RouterPipeline::new()
334            .mount_guarded::<PingController, _>(ping_state(), || {
335                Err(crate::error::Error::other("guard failed"))
336            })
337            .build();
338
339        assert!(
340            result.is_err(),
341            "build() should return Err when guard fails"
342        );
343    }
344
345    #[tokio::test]
346    async fn mount_guarded_registers_route_on_ok_guard() {
347        let app = RouterPipeline::new()
348            .mount_guarded::<PingController, _>(ping_state(), || Ok(()))
349            .build()
350            .expect("build should succeed when guard passes");
351
352        assert_eq!(status(app, "/ping").await, 200);
353    }
354
355    // -----------------------------------------------------------------------
356    // mount_if
357    // -----------------------------------------------------------------------
358
359    #[tokio::test]
360    async fn mount_if_false_route_returns_404() {
361        let app = RouterPipeline::new()
362            .mount_if::<PingController>(false, ping_state())
363            .build()
364            .expect("build should succeed even when mount_if is false");
365
366        assert_eq!(status(app, "/ping").await, 404);
367    }
368
369    #[tokio::test]
370    async fn mount_if_true_route_returns_200() {
371        let app = RouterPipeline::new()
372            .mount_if::<PingController>(true, ping_state())
373            .build()
374            .expect("build should succeed when mount_if is true");
375
376        assert_eq!(status(app, "/ping").await, 200);
377    }
378
379    // -----------------------------------------------------------------------
380    // group prefix
381    // -----------------------------------------------------------------------
382
383    #[tokio::test]
384    async fn group_prefix_is_applied_to_routes() {
385        let app = RouterPipeline::new()
386            .group("/v1", |g| g.mount::<PingController>(ping_state()))
387            .build()
388            .expect("build should succeed");
389
390        assert_eq!(
391            status(app.clone(), "/v1/ping").await,
392            200,
393            "/v1/ping should be 200"
394        );
395        assert_eq!(
396            status(app, "/ping").await,
397            404,
398            "/ping without prefix should be 404"
399        );
400    }
401
402    // -----------------------------------------------------------------------
403    // Error propagation
404    // -----------------------------------------------------------------------
405
406    #[test]
407    fn error_from_and_then_propagates_through_remaining_steps() {
408        let result = RouterPipeline::new()
409            .and_then(|_| Err(crate::error::Error::other("intentional failure")))
410            .mount::<PingController>(ping_state()) // should never run
411            .build();
412
413        assert!(
414            result.is_err(),
415            "error should propagate through the rest of the pipeline"
416        );
417    }
418}