Skip to main content

yeti_types/plugins/
request.rs

1//! HTTP-shaped per-request work and the in-pipeline `Context` service contract.
2//!
3//! Two request types coexist here, intentionally:
4//!
5//!  * [`YetiRequest`] / [`YetiResponse`] — the **protocol-edge** shape
6//!    (`http::Request<Bytes>` / `http::Response<Bytes>`). Every transport
7//!    (HTTP, MQTT, MCP) translates its inbound payload into this, runs
8//!    edge-level Services (e.g. `tower-http` CORS / compression / tracing),
9//!    and translates the response back. Plugins targeting raw HTTP
10//!    edge concerns implement `Service<YetiRequest>`.
11//!
12//!  * [`ContextService`] — the **internal pipeline** shape. Once the
13//!    router has resolved app/resource/path-id and constructed a
14//!    [`Context`], every downstream layer (auth resolution, rate
15//!    limiting, table dispatch) is a
16//!    `Service<Context, Response = Context, Error = YetiError>`.
17//!
18//! `Context` is `Clone`: every field is `Bytes` / `Arc<str>` /
19//! `Arc<dyn Trait>` / small map, so cloning is cheap. With `Clone`,
20//! `Context` flows through `tower::Service<YetiRequest>` chains via
21//! `http::Request::extensions` (the standard tower-http pattern).
22
23use bytes::Bytes;
24use http::{Request, Response};
25use tower::util::BoxCloneSyncService;
26
27use crate::error::{Result, YetiError};
28use crate::resource::Context;
29
30/// The canonical Yeti request — `http::Request<Bytes>` — for protocol-edge
31/// services. The router peels this apart into a [`Context`] before invoking
32/// the middleware pipeline.
33pub type YetiRequest = Request<Bytes>;
34
35/// The canonical Yeti response — `http::Response<Bytes>`.
36pub type YetiResponse = Response<Bytes>;
37
38/// The middleware-pipeline service type. Each registered service consumes
39/// a [`Context`] and returns a (possibly mutated) [`Context`], or fails
40/// with [`YetiError`].
41///
42/// This is the Tower-native replacement for the legacy
43/// `RequestMiddleware` trait. Build one with `tower::service_fn`:
44///
45/// ```ignore
46/// use yeti_types::plugins::{ContextService, service_fn};
47/// use tower::util::BoxCloneSyncService;
48///
49/// let svc: ContextService = BoxCloneSyncService::new(service_fn(|mut ctx: Context| async move {
50///     ctx.set_access(/* ... */);
51///     Ok::<_, YetiError>(ctx)
52/// }));
53/// ```
54///
55/// Uses `BoxCloneSyncService` (rather than plain `BoxCloneService`)
56/// so the type is `Send + Sync` — the router stores its frozen
57/// snapshot inside `Arc<[ContextService]>` and the surrounding
58/// `AutoRouter` is shared across threads.
59pub type ContextService = BoxCloneSyncService<Context, Context, YetiError>;
60
61/// Sequential runner for a chain of [`ContextService`]s. The pipeline
62/// owns the services and threads a `Context` through them one after
63/// another, short-circuiting on the first error.
64///
65/// This replaces the legacy `MiddlewareRegistry::process` loop.
66pub struct RequestPipeline {
67    services: Vec<ContextService>,
68}
69
70impl Default for RequestPipeline {
71    fn default() -> Self {
72        Self::new()
73    }
74}
75
76impl std::fmt::Debug for RequestPipeline {
77    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
78        f.debug_struct("RequestPipeline")
79            .field("len", &self.services.len())
80            .finish_non_exhaustive()
81    }
82}
83
84impl RequestPipeline {
85    /// Create an empty pipeline.
86    #[must_use]
87    pub const fn new() -> Self {
88        Self {
89            services: Vec::new(),
90        }
91    }
92
93    /// Build a pipeline from a vec of services.
94    #[must_use]
95    pub const fn from_services(services: Vec<ContextService>) -> Self {
96        Self { services }
97    }
98
99    /// Push a service onto the end of the pipeline.
100    pub fn push(&mut self, svc: ContextService) {
101        self.services.push(svc);
102    }
103
104    /// Number of registered services.
105    #[must_use]
106    pub const fn len(&self) -> usize {
107        self.services.len()
108    }
109
110    /// Whether the pipeline has no services registered.
111    #[must_use]
112    pub const fn is_empty(&self) -> bool {
113        self.services.is_empty()
114    }
115
116    /// Run every service in registration order, threading the `Context`
117    /// through. Returns the final `Context` once the chain completes,
118    /// or the first `YetiError`.
119    ///
120    /// # Errors
121    ///
122    /// Surfaces the first `Err` returned by any registered service.
123    /// Subsequent services are skipped.
124    pub async fn run(&mut self, mut ctx: Context) -> Result<Context> {
125        use tower::{Service, ServiceExt};
126        for svc in &mut self.services {
127            ctx = svc.ready().await?.call(ctx).await?;
128        }
129        Ok(ctx)
130    }
131
132    /// Borrow the underlying service vector (for snapshots / cloning).
133    #[must_use]
134    pub fn services(&self) -> &[ContextService] {
135        &self.services
136    }
137
138    /// Compose every registered service into a single `ContextService`.
139    /// Per-request the host clones the composed service (one `Arc`
140    /// bump) and calls it once — versus the unfrozen path which clones
141    /// every service per request and allocates a `Vec`. Used by the
142    /// router's `freeze_middleware` to build the hot-path snapshot.
143    #[must_use]
144    pub fn into_composed(self) -> ContextService {
145        compose(self.services)
146    }
147}
148
149/// Compose a slice of `ContextService`s into a single `ContextService`
150/// that runs them in registration order. The composed service is
151/// `BoxCloneSyncService`, so cloning it is one `Arc` bump regardless
152/// of chain length. Each underlying service still requires a per-call
153/// `clone` (to satisfy `&mut self`), but the outer container is shared
154/// — so the per-request work scales with chain length only inside the
155/// `Future`, not at the boundary.
156#[must_use]
157pub fn compose(services: Vec<ContextService>) -> ContextService {
158    let services: std::sync::Arc<[ContextService]> = services.into();
159    let services_for_fn = std::sync::Arc::clone(&services);
160    BoxCloneSyncService::new(tower::service_fn(move |ctx: Context| {
161        let services = std::sync::Arc::clone(&services_for_fn);
162        async move {
163            use tower::{Service, ServiceExt};
164            let mut current = ctx;
165            for svc in services.iter() {
166                let mut svc = svc.clone();
167                current = svc.ready().await?.call(current).await?;
168            }
169            Ok::<Context, YetiError>(current)
170        }
171    }))
172}