Skip to main content

apigate_core/
pipeline.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4
5use axum::body::Body;
6use http::Extensions;
7
8use crate::PartsCtx;
9use crate::error::ApigateError;
10
11// ---------------------------------------------------------------------------
12// RequestScope
13// ---------------------------------------------------------------------------
14
15/// Owns the request body and extracted data for a single pipeline invocation.
16///
17/// App-level state lives in a shared `Arc<Extensions>` (zero-copy per request).
18/// Per-request data (path params, hook insertions) goes into a local `Extensions`
19/// that starts empty and allocates only on first `insert`.
20pub struct RequestScope {
21    shared: Arc<Extensions>,
22    local: Extensions,
23    body: Option<Body>,
24    body_limit: usize,
25}
26
27impl RequestScope {
28    pub fn new(body: Body, body_limit: usize) -> Self {
29        Self {
30            shared: Arc::new(Extensions::new()),
31            local: Extensions::new(),
32            body: Some(body),
33            body_limit,
34        }
35    }
36
37    pub(crate) fn with_shared(shared: Arc<Extensions>, body: Body, body_limit: usize) -> Self {
38        Self {
39            shared,
40            local: Extensions::new(),
41            body: Some(body),
42            body_limit,
43        }
44    }
45
46    pub fn take_body(&mut self) -> Option<Body> {
47        self.body.take()
48    }
49
50    pub fn body_limit(&self) -> usize {
51        self.body_limit
52    }
53
54    /// Returns a shared reference to `T`.
55    /// Checks local (per-request) extensions first, then shared (app) state.
56    pub fn get<T: Send + Sync + 'static>(&self) -> Option<&T> {
57        self.local.get::<T>().or_else(|| self.shared.get::<T>())
58    }
59
60    /// Returns a mutable reference to `T` from local (per-request) extensions only.
61    pub fn get_mut<T: Send + Sync + 'static>(&mut self) -> Option<&mut T> {
62        self.local.get_mut::<T>()
63    }
64
65    /// Inserts a value into per-request (local) extensions.
66    pub fn insert<T: Clone + Send + Sync + 'static>(&mut self, val: T) {
67        self.local.insert(val);
68    }
69
70    /// Takes a value from local extensions first; if absent, clones from shared state.
71    pub fn take<T: Clone + Send + Sync + 'static>(&mut self) -> Option<T> {
72        self.local
73            .remove::<T>()
74            .or_else(|| self.shared.get::<T>().cloned())
75    }
76}
77
78// ---------------------------------------------------------------------------
79// Pipeline types
80// ---------------------------------------------------------------------------
81
82/// Single function that orchestrates all request processing:
83/// parse path params → before hooks → validate/parse body → map → return body.
84pub type PipelineFn = for<'a> fn(PartsCtx<'a>, RequestScope) -> PipelineFuture<'a>;
85pub type PipelineFuture<'a> = Pin<Box<dyn Future<Output = PipelineResult> + Send + 'a>>;
86pub type PipelineResult = Result<Body, ApigateError>;
87
88pub type HookResult = Result<(), ApigateError>;
89pub type MapResult<T> = Result<T, ApigateError>;