yeti_types/plugins/request.rs
1//! HTTP-shaped per-request work and the in-pipeline `Context` service contract.
2//!
3//! Two request types coexist here, intentionally:
4//!
5//! * [`YetiRequest`] / [`YetiResponse`] — the **protocol-edge** shape
6//! (`http::Request<Bytes>` / `http::Response<Bytes>`). Every transport
7//! (HTTP, MQTT, MCP) translates its inbound payload into this, runs
8//! edge-level Services (e.g. `tower-http` CORS / compression / tracing),
9//! and translates the response back. Plugins targeting raw HTTP
10//! edge concerns implement `Service<YetiRequest>`.
11//!
12//! * [`ContextService`] — the **internal pipeline** shape. Once the
13//! router has resolved app/resource/path-id and constructed a
14//! [`Context`], every downstream layer (auth resolution, rate
15//! limiting, table dispatch) is a
16//! `Service<Context, Response = Context, Error = YetiError>`.
17//!
18//! `Context` is `Clone`: every field is `Bytes` / `Arc<str>` /
19//! `Arc<dyn Trait>` / small map, so cloning is cheap. With `Clone`,
20//! `Context` flows through `tower::Service<YetiRequest>` chains via
21//! `http::Request::extensions` (the standard tower-http pattern).
22
23use bytes::Bytes;
24use http::{Request, Response};
25use tower::util::BoxCloneSyncService;
26
27use crate::error::{Result, YetiError};
28use crate::resource::Context;
29
30/// The canonical Yeti request — `http::Request<Bytes>` — for protocol-edge
31/// services. The router peels this apart into a [`Context`] before invoking
32/// the middleware pipeline.
33pub type YetiRequest = Request<Bytes>;
34
35/// The canonical Yeti response — `http::Response<Bytes>`.
36pub type YetiResponse = Response<Bytes>;
37
38/// The middleware-pipeline service type. Each registered service consumes
39/// a [`Context`] and returns a (possibly mutated) [`Context`], or fails
40/// with [`YetiError`].
41///
42/// This is the Tower-native replacement for the legacy
43/// `RequestMiddleware` trait. Build one with `tower::service_fn`:
44///
45/// ```ignore
46/// use yeti_types::plugins::{ContextService, service_fn};
47/// use tower::util::BoxCloneSyncService;
48///
49/// let svc: ContextService = BoxCloneSyncService::new(service_fn(|mut ctx: Context| async move {
50/// ctx.set_access(/* ... */);
51/// Ok::<_, YetiError>(ctx)
52/// }));
53/// ```
54///
55/// Uses `BoxCloneSyncService` (rather than plain `BoxCloneService`)
56/// so the type is `Send + Sync` — the router stores its frozen
57/// snapshot inside `Arc<[ContextService]>` and the surrounding
58/// `AutoRouter` is shared across threads.
59pub type ContextService = BoxCloneSyncService<Context, Context, YetiError>;
60
61/// Sequential runner for a chain of [`ContextService`]s. The pipeline
62/// owns the services and threads a `Context` through them one after
63/// another, short-circuiting on the first error.
64///
65/// This replaces the legacy `MiddlewareRegistry::process` loop.
66pub struct RequestPipeline {
67 services: Vec<ContextService>,
68}
69
70impl Default for RequestPipeline {
71 fn default() -> Self {
72 Self::new()
73 }
74}
75
76impl std::fmt::Debug for RequestPipeline {
77 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
78 f.debug_struct("RequestPipeline")
79 .field("len", &self.services.len())
80 .finish_non_exhaustive()
81 }
82}
83
84impl RequestPipeline {
85 /// Create an empty pipeline.
86 #[must_use]
87 pub const fn new() -> Self {
88 Self {
89 services: Vec::new(),
90 }
91 }
92
93 /// Build a pipeline from a vec of services.
94 #[must_use]
95 pub const fn from_services(services: Vec<ContextService>) -> Self {
96 Self { services }
97 }
98
99 /// Push a service onto the end of the pipeline.
100 pub fn push(&mut self, svc: ContextService) {
101 self.services.push(svc);
102 }
103
104 /// Number of registered services.
105 #[must_use]
106 pub const fn len(&self) -> usize {
107 self.services.len()
108 }
109
110 /// Whether the pipeline has no services registered.
111 #[must_use]
112 pub const fn is_empty(&self) -> bool {
113 self.services.is_empty()
114 }
115
116 /// Run every service in registration order, threading the `Context`
117 /// through. Returns the final `Context` once the chain completes,
118 /// or the first `YetiError`.
119 ///
120 /// # Errors
121 ///
122 /// Surfaces the first `Err` returned by any registered service.
123 /// Subsequent services are skipped.
124 pub async fn run(&mut self, mut ctx: Context) -> Result<Context> {
125 use tower::{Service, ServiceExt};
126 for svc in &mut self.services {
127 ctx = svc.ready().await?.call(ctx).await?;
128 }
129 Ok(ctx)
130 }
131
132 /// Borrow the underlying service vector (for snapshots / cloning).
133 #[must_use]
134 pub fn services(&self) -> &[ContextService] {
135 &self.services
136 }
137
138 /// Compose every registered service into a single `ContextService`.
139 /// Per-request the host clones the composed service (one `Arc`
140 /// bump) and calls it once — versus the unfrozen path which clones
141 /// every service per request and allocates a `Vec`. Used by the
142 /// router's `freeze_middleware` to build the hot-path snapshot.
143 #[must_use]
144 pub fn into_composed(self) -> ContextService {
145 compose(self.services)
146 }
147}
148
149/// Compose a slice of `ContextService`s into a single `ContextService`
150/// that runs them in registration order. The composed service is
151/// `BoxCloneSyncService`, so cloning it is one `Arc` bump regardless
152/// of chain length. Each underlying service still requires a per-call
153/// `clone` (to satisfy `&mut self`), but the outer container is shared
154/// — so the per-request work scales with chain length only inside the
155/// `Future`, not at the boundary.
156#[must_use]
157pub fn compose(services: Vec<ContextService>) -> ContextService {
158 let services: std::sync::Arc<[ContextService]> = services.into();
159 let services_for_fn = std::sync::Arc::clone(&services);
160 BoxCloneSyncService::new(tower::service_fn(move |ctx: Context| {
161 let services = std::sync::Arc::clone(&services_for_fn);
162 async move {
163 use tower::{Service, ServiceExt};
164 let mut current = ctx;
165 for svc in services.iter() {
166 let mut svc = svc.clone();
167 current = svc.ready().await?.call(current).await?;
168 }
169 Ok::<Context, YetiError>(current)
170 }
171 }))
172}