Skip to main content

fastapi_core/
request.rs

1//! HTTP request types.
2
3use std::any::{Any, TypeId};
4use std::collections::HashMap;
5use std::fmt;
6use std::future::Future;
7use std::pin::Pin;
8use std::sync::Mutex;
9
10// Re-export Method from fastapi-types
11pub use fastapi_types::Method;
12
13use asupersync::stream::Stream;
14
/// Failure reported while a request body is being streamed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RequestBodyStreamError {
    /// More bytes arrived than the configured limit allows.
    TooLarge {
        /// Number of bytes received when the limit was hit.
        received: usize,
        /// Configured maximum body size in bytes.
        max: usize,
    },
    /// The peer closed the connection before the whole body was read.
    ConnectionClosed,
    /// Reading the body failed with an I/O error (message only).
    Io(String),
}

impl fmt::Display for RequestBodyStreamError {
    /// Render a human-readable description of the failure.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            // Constant message: no formatting machinery needed.
            Self::ConnectionClosed => f.write_str("connection closed while reading request body"),
            Self::Io(detail) => write!(f, "I/O error while reading request body: {detail}"),
            Self::TooLarge {
                received: got,
                max: limit,
            } => write!(
                f,
                "request body too large: received {got} bytes (max {limit})"
            ),
        }
    }
}

impl std::error::Error for RequestBodyStreamError {}
40
41/// Streamed request body type (yields chunks or a streaming error).
42pub type RequestBodyStream =
43    Pin<Box<dyn Stream<Item = Result<Vec<u8>, RequestBodyStreamError>> + Send>>;
44
/// Version of the HTTP protocol a request was sent with.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum HttpVersion {
    /// HTTP/1.0
    Http10,
    /// HTTP/1.1
    Http11,
    /// HTTP/2
    Http2,
}

impl HttpVersion {
    /// Parse a version token such as `"HTTP/1.1"`.
    ///
    /// Matching is exact and case-sensitive. `"HTTP/2"` and `"HTTP/2.0"` both
    /// map to [`HttpVersion::Http2`]; anything else yields `None`.
    #[must_use]
    pub fn parse(s: &str) -> Option<Self> {
        // Everything we accept starts with the literal "HTTP/" prefix.
        let number = s.strip_prefix("HTTP/")?;
        let version = match number {
            "1.0" => Self::Http10,
            "1.1" => Self::Http11,
            "2" | "2.0" => Self::Http2,
            _ => return None,
        };
        Some(version)
    }
}
68
/// Facts about the underlying connection, provided by the server or a test
/// harness.
///
/// A request cannot tell on its own whether it travelled over TLS, so servers
/// are expected to attach this value as a request extension when they know.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ConnectionInfo {
    /// `true` when the connection is encrypted with TLS (HTTPS).
    pub is_tls: bool,
}

impl ConnectionInfo {
    /// Unencrypted (plain HTTP) connection.
    #[allow(dead_code)]
    pub const HTTP: Self = Self { is_tls: false };

    /// TLS-protected (HTTPS) connection.
    #[allow(dead_code)]
    pub const HTTPS: Self = Self { is_tls: true };
}
87
/// HTTP headers collection.
///
/// Header names are stored in ASCII lower case, which makes every lookup
/// case-insensitive. Each name maps to a single value: inserting a name that
/// is already present replaces the previous value.
#[derive(Debug, Default)]
pub struct Headers {
    inner: HashMap<String, Vec<u8>>,
}

impl Headers {
    /// Create empty headers.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Produce the map key for `name`.
    ///
    /// Borrows `name` unchanged when it contains no ASCII upper-case bytes, so
    /// the common case of an already-lowercase name does not allocate.
    fn lookup_key(name: &str) -> std::borrow::Cow<'_, str> {
        if name.bytes().any(|b| b.is_ascii_uppercase()) {
            std::borrow::Cow::Owned(name.to_ascii_lowercase())
        } else {
            std::borrow::Cow::Borrowed(name)
        }
    }

    /// Get a header value by name (case-insensitive).
    ///
    /// Returns `None` when the header is absent.
    #[must_use]
    pub fn get(&self, name: &str) -> Option<&[u8]> {
        let key = Self::lookup_key(name);
        self.inner.get(&*key).map(Vec::as_slice)
    }

    /// Returns true if a header is present (case-insensitive).
    #[must_use]
    pub fn contains(&self, name: &str) -> bool {
        let key = Self::lookup_key(name);
        self.inner.contains_key(&*key)
    }

    /// Insert a header, replacing any existing value stored under the same
    /// (case-insensitive) name.
    pub fn insert(&mut self, name: impl Into<String>, value: impl Into<Vec<u8>>) {
        // Lower-case in place: reuses the owned buffer instead of allocating
        // a second string.
        let mut key = name.into();
        key.make_ascii_lowercase();
        self.inner.insert(key, value.into());
    }

    /// Insert a header from borrowed name/value slices.
    ///
    /// This is a convenience for parsers that already have `&str`/`&[u8]` and
    /// do not want to build owned buffers themselves. Like [`Headers::insert`],
    /// it replaces any existing value for the name.
    pub fn insert_from_slice(&mut self, name: &str, value: &[u8]) {
        self.inner.insert(name.to_ascii_lowercase(), value.to_vec());
    }

    /// Remove a header by name (case-insensitive), returning its value if it
    /// was present.
    pub fn remove(&mut self, name: &str) -> Option<Vec<u8>> {
        let key = Self::lookup_key(name);
        self.inner.remove(&*key)
    }

    /// Iterate over all headers as (lower-cased name, value) pairs.
    ///
    /// Iteration order is that of the underlying `HashMap` and therefore
    /// unspecified.
    pub fn iter(&self) -> impl Iterator<Item = (&str, &[u8])> {
        self.inner
            .iter()
            .map(|(name, value)| (name.as_str(), value.as_slice()))
    }

    /// Returns the number of headers.
    #[must_use]
    pub fn len(&self) -> usize {
        self.inner.len()
    }

    /// Returns true if there are no headers.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }
}
153
154/// Request body.
155pub enum Body {
156    /// Empty body.
157    Empty,
158    /// Bytes body.
159    Bytes(Vec<u8>),
160    /// Streaming body, optionally with a known content length.
161    Stream {
162        /// Streamed chunks.
163        stream: Mutex<RequestBodyStream>,
164        /// Known content length, if available.
165        content_length: Option<usize>,
166    },
167}
168
169impl fmt::Debug for Body {
170    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
171        match self {
172            Self::Empty => f.debug_tuple("Empty").finish(),
173            Self::Bytes(b) => f.debug_tuple("Bytes").field(b).finish(),
174            Self::Stream { content_length, .. } => f
175                .debug_struct("Stream")
176                .field("content_length", content_length)
177                .finish(),
178        }
179    }
180}
181
182impl Body {
183    /// Create a streaming body.
184    #[must_use]
185    pub fn streaming<S>(stream: S) -> Self
186    where
187        S: Stream<Item = Result<Vec<u8>, RequestBodyStreamError>> + Send + 'static,
188    {
189        Self::Stream {
190            stream: Mutex::new(Box::pin(stream)),
191            content_length: None,
192        }
193    }
194
195    /// Create a streaming body with a known content length.
196    #[must_use]
197    pub fn streaming_with_size<S>(stream: S, content_length: usize) -> Self
198    where
199        S: Stream<Item = Result<Vec<u8>, RequestBodyStreamError>> + Send + 'static,
200    {
201        Self::Stream {
202            stream: Mutex::new(Box::pin(stream)),
203            content_length: Some(content_length),
204        }
205    }
206
207    /// Get body as bytes, consuming it.
208    ///
209    /// Note: streaming bodies cannot be synchronously collected; this returns
210    /// an empty vector for `Body::Stream`.
211    #[must_use]
212    pub fn into_bytes(self) -> Vec<u8> {
213        match self {
214            Self::Empty => Vec::new(),
215            Self::Bytes(b) => b,
216            Self::Stream { .. } => Vec::new(),
217        }
218    }
219
220    /// Check if body is empty.
221    #[must_use]
222    pub fn is_empty(&self) -> bool {
223        matches!(self, Self::Empty)
224            || matches!(self, Self::Bytes(b) if b.is_empty())
225            || matches!(
226                self,
227                Self::Stream {
228                    content_length: Some(0),
229                    ..
230                }
231            )
232    }
233
234    /// Take ownership of the inner stream, if this is a streaming body.
235    pub fn into_stream(self) -> Option<(RequestBodyStream, Option<usize>)> {
236        match self {
237            Self::Stream {
238                stream,
239                content_length,
240            } => Some((
241                stream.into_inner().unwrap_or_else(|e| e.into_inner()),
242                content_length,
243            )),
244            _ => None,
245        }
246    }
247}
248
/// Storage used by [`BackgroundTasks`]: a mutex-guarded list of boxed, pinned
/// `Send` futures that each resolve to `()`.
pub type BackgroundTasksInner = Mutex<Vec<Pin<Box<dyn Future<Output = ()> + Send>>>>;

/// Request-scoped background tasks to execute after the response is sent.
///
/// This is inspired by FastAPI's `BackgroundTasks`. Handlers can enqueue work
/// that is executed by the server after the main response completes.
///
/// Tasks are only stored here; nothing runs until
/// [`BackgroundTasks::execute_all`] is awaited.
pub struct BackgroundTasks {
    tasks: BackgroundTasksInner,
}

impl fmt::Debug for BackgroundTasks {
    /// Print the type name only; the queued futures are opaque.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("BackgroundTasks").finish_non_exhaustive()
    }
}

impl BackgroundTasks {
    /// Create an empty background task set.
    #[must_use]
    pub fn new() -> Self {
        Self {
            tasks: Mutex::new(Vec::new()),
        }
    }

    /// Add a synchronous task to run after the response is written.
    ///
    /// This matches FastAPI's `BackgroundTasks.add_task(...)` UX: you enqueue work
    /// and the server runs it after the response is sent. The closure is wrapped
    /// in a future and queued through [`BackgroundTasks::add_async`].
    pub fn add<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        self.add_async(async move { f() });
    }

    /// Add an async task (future) to run after the response is written.
    ///
    /// A poisoned lock is ignored so that a panic elsewhere does not prevent
    /// further tasks from being queued.
    pub fn add_async<Fut>(&self, fut: Fut)
    where
        Fut: Future<Output = ()> + Send + 'static,
    {
        let mut guard = self
            .tasks
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        guard.push(Box::pin(fut));
    }

    /// Execute all background tasks sequentially, in the order they were added.
    ///
    /// Consumes the container; each task is awaited to completion before the
    /// next one starts.
    pub async fn execute_all(self) {
        // `into_inner` takes the list out of the mutex, so no lock guard is
        // held across the awaits below.
        let tasks = self
            .tasks
            .into_inner()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        for t in tasks {
            t.await;
        }
    }
}

impl Default for BackgroundTasks {
    /// Equivalent to [`BackgroundTasks::new`].
    fn default() -> Self {
        Self::new()
    }
}
314
315/// HTTP request.
316#[derive(Debug)]
317pub struct Request {
318    method: Method,
319    version: HttpVersion,
320    path: String,
321    query: Option<String>,
322    headers: Headers,
323    body: Body,
324    // Extensions for middleware/extractors
325    #[allow(dead_code)] // Used in future implementation
326    extensions: HashMap<std::any::TypeId, Box<dyn std::any::Any + Send + Sync>>,
327}
328
329impl Request {
330    /// Create a new request.
331    #[must_use]
332    pub fn new(method: Method, path: impl Into<String>) -> Self {
333        Self {
334            method,
335            version: HttpVersion::Http11,
336            path: path.into(),
337            query: None,
338            headers: Headers::new(),
339            body: Body::Empty,
340            extensions: HashMap::new(),
341        }
342    }
343
344    /// Create a new request with an explicit HTTP version.
345    #[must_use]
346    pub fn with_version(method: Method, path: impl Into<String>, version: HttpVersion) -> Self {
347        let mut req = Self::new(method, path);
348        req.version = version;
349        req
350    }
351
352    /// Get the HTTP method.
353    #[must_use]
354    pub fn method(&self) -> Method {
355        self.method
356    }
357
358    /// Get the HTTP version.
359    #[must_use]
360    pub fn version(&self) -> HttpVersion {
361        self.version
362    }
363
364    /// Set the HTTP version.
365    pub fn set_version(&mut self, version: HttpVersion) {
366        self.version = version;
367    }
368
369    /// Get the request path.
370    #[must_use]
371    pub fn path(&self) -> &str {
372        &self.path
373    }
374
375    /// Get the query string.
376    #[must_use]
377    pub fn query(&self) -> Option<&str> {
378        self.query.as_deref()
379    }
380
381    /// Get the headers.
382    #[must_use]
383    pub fn headers(&self) -> &Headers {
384        &self.headers
385    }
386
387    /// Get mutable headers.
388    pub fn headers_mut(&mut self) -> &mut Headers {
389        &mut self.headers
390    }
391
392    /// Get the body.
393    #[must_use]
394    pub fn body(&self) -> &Body {
395        &self.body
396    }
397
398    /// Take the body, replacing with Empty.
399    pub fn take_body(&mut self) -> Body {
400        std::mem::replace(&mut self.body, Body::Empty)
401    }
402
403    /// Set the body.
404    pub fn set_body(&mut self, body: Body) {
405        self.body = body;
406    }
407
408    /// Set the query string.
409    pub fn set_query(&mut self, query: Option<String>) {
410        self.query = query;
411    }
412
413    /// Insert a typed extension value.
414    pub fn insert_extension<T: Any + Send + Sync>(&mut self, value: T) {
415        self.extensions.insert(TypeId::of::<T>(), Box::new(value));
416    }
417
418    /// Get a typed extension value.
419    #[must_use]
420    pub fn get_extension<T: Any + Send + Sync>(&self) -> Option<&T> {
421        self.extensions
422            .get(&TypeId::of::<T>())
423            .and_then(|boxed| boxed.downcast_ref::<T>())
424    }
425
426    /// Get a mutable typed extension value.
427    pub fn get_extension_mut<T: Any + Send + Sync>(&mut self) -> Option<&mut T> {
428        self.extensions
429            .get_mut(&TypeId::of::<T>())
430            .and_then(|boxed| boxed.downcast_mut::<T>())
431    }
432
433    /// Remove and return a typed extension value.
434    pub fn take_extension<T: Any + Send + Sync>(&mut self) -> Option<T> {
435        self.extensions
436            .remove(&TypeId::of::<T>())
437            .and_then(|boxed| boxed.downcast::<T>().ok())
438            .map(|boxed| *boxed)
439    }
440
441    /// Access (and lazily create) the request-scoped background tasks container.
442    pub fn background_tasks(&mut self) -> &BackgroundTasks {
443        if !self
444            .extensions
445            .contains_key(&TypeId::of::<BackgroundTasks>())
446        {
447            self.insert_extension(BackgroundTasks::new());
448        }
449        self.get_extension::<BackgroundTasks>()
450            .expect("BackgroundTasks extension should exist")
451    }
452}