1use std::any::{Any, TypeId};
4use std::collections::HashMap;
5use std::fmt;
6use std::future::Future;
7use std::pin::Pin;
8use std::sync::Mutex;
9
10pub use fastapi_types::Method;
12
13use asupersync::stream::Stream;
14
/// Errors that can occur while reading a streamed request body.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RequestBodyStreamError {
    /// The body was larger than permitted.
    TooLarge {
        /// Byte count reported as received.
        received: usize,
        /// Byte count reported as the maximum.
        max: usize,
    },
    /// The connection was closed while the body was being read.
    ConnectionClosed,
    /// An I/O error occurred; the payload is its textual description.
    Io(String),
}

impl fmt::Display for RequestBodyStreamError {
    /// Writes the human-readable message for each variant.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            // Constant message: no formatting arguments needed.
            Self::ConnectionClosed => {
                f.write_str("connection closed while reading request body")
            }
            Self::Io(detail) => {
                write!(f, "I/O error while reading request body: {detail}")
            }
            Self::TooLarge { received, max } => {
                write!(
                    f,
                    "request body too large: received {received} bytes (max {max})"
                )
            }
        }
    }
}

impl std::error::Error for RequestBodyStreamError {}
40
/// A pinned, boxed, `Send` stream whose items are either a chunk of body bytes
/// (`Vec<u8>`) or a [`RequestBodyStreamError`].
pub type RequestBodyStream =
    Pin<Box<dyn Stream<Item = Result<Vec<u8>, RequestBodyStreamError>> + Send>>;
44
/// HTTP protocol version.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum HttpVersion {
    /// `HTTP/1.0`
    Http10,
    /// `HTTP/1.1`
    Http11,
    /// `HTTP/2` (also spelled `HTTP/2.0`)
    Http2,
}

impl HttpVersion {
    /// Parses a version token such as `"HTTP/1.1"`.
    ///
    /// Matching is exact and case-sensitive. Returns `None` for any string that
    /// is not one of `HTTP/1.0`, `HTTP/1.1`, `HTTP/2` or `HTTP/2.0`.
    #[must_use]
    pub fn parse(s: &str) -> Option<Self> {
        // Every accepted spelling paired with the version it denotes.
        const SPELLINGS: [(&str, HttpVersion); 4] = [
            ("HTTP/1.0", HttpVersion::Http10),
            ("HTTP/1.1", HttpVersion::Http11),
            ("HTTP/2", HttpVersion::Http2),
            ("HTTP/2.0", HttpVersion::Http2),
        ];

        SPELLINGS
            .iter()
            .find(|(token, _)| *token == s)
            .map(|&(_, version)| version)
    }
}
68
/// Connection-level information attached to a request.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ConnectionInfo {
    /// Whether the connection uses TLS.
    pub is_tls: bool,
}

impl ConnectionInfo {
    /// Connection info with `is_tls` set to `false`.
    // NOTE(review): the reason for silencing `dead_code` here is not visible in this file.
    #[allow(dead_code)]
    pub const HTTP: Self = ConnectionInfo { is_tls: false };

    /// Connection info with `is_tls` set to `true`.
    // NOTE(review): the reason for silencing `dead_code` here is not visible in this file.
    #[allow(dead_code)]
    pub const HTTPS: Self = ConnectionInfo { is_tls: true };
}
87
/// A header map with case-insensitive names.
///
/// Names are stored in ASCII lowercase; values are raw bytes. Each name maps to
/// a single value, so inserting an existing name replaces its value.
#[derive(Debug, Default)]
pub struct Headers {
    inner: HashMap<String, Vec<u8>>,
}

impl Headers {
    /// Creates an empty header map.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns the lookup key for `name`: the name in ASCII lowercase.
    ///
    /// Borrows `name` unchanged when it has no ASCII uppercase bytes, so lookups
    /// with already-lowercase names do not allocate.
    fn normalize(name: &str) -> std::borrow::Cow<'_, str> {
        if name.bytes().any(|b| b.is_ascii_uppercase()) {
            std::borrow::Cow::Owned(name.to_ascii_lowercase())
        } else {
            std::borrow::Cow::Borrowed(name)
        }
    }

    /// Returns the value stored for `name` (compared case-insensitively), if any.
    #[must_use]
    pub fn get(&self, name: &str) -> Option<&[u8]> {
        let key = Self::normalize(name);
        self.inner.get(&*key).map(Vec::as_slice)
    }

    /// Returns `true` if a value is stored for `name` (compared case-insensitively).
    #[must_use]
    pub fn contains(&self, name: &str) -> bool {
        let key = Self::normalize(name);
        self.inner.contains_key(&*key)
    }

    /// Stores `value` under `name`, replacing any existing value for that name.
    pub fn insert(&mut self, name: impl Into<String>, value: impl Into<Vec<u8>>) {
        // Lowercase the owned name in place instead of allocating a second string.
        let mut key = name.into();
        key.make_ascii_lowercase();
        self.inner.insert(key, value.into());
    }

    /// Stores a copy of `value` under `name`, replacing any existing value for that name.
    pub fn insert_from_slice(&mut self, name: &str, value: &[u8]) {
        self.inner.insert(name.to_ascii_lowercase(), value.to_vec());
    }

    /// Removes the entry for `name` (compared case-insensitively) and returns its value.
    pub fn remove(&mut self, name: &str) -> Option<Vec<u8>> {
        let key = Self::normalize(name);
        self.inner.remove(&*key)
    }

    /// Iterates over `(name, value)` pairs; names are the stored lowercase form.
    ///
    /// The order is that of the underlying `HashMap`, i.e. unspecified.
    pub fn iter(&self) -> impl Iterator<Item = (&str, &[u8])> {
        self.inner
            .iter()
            .map(|(name, value)| (name.as_str(), value.as_slice()))
    }

    /// Returns the number of stored headers.
    #[must_use]
    pub fn len(&self) -> usize {
        self.inner.len()
    }

    /// Returns `true` if no headers are stored.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }
}
153
154pub enum Body {
156 Empty,
158 Bytes(Vec<u8>),
160 Stream {
162 stream: Mutex<RequestBodyStream>,
164 content_length: Option<usize>,
166 },
167}
168
169impl fmt::Debug for Body {
170 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
171 match self {
172 Self::Empty => f.debug_tuple("Empty").finish(),
173 Self::Bytes(b) => f.debug_tuple("Bytes").field(b).finish(),
174 Self::Stream { content_length, .. } => f
175 .debug_struct("Stream")
176 .field("content_length", content_length)
177 .finish(),
178 }
179 }
180}
181
182impl Body {
183 #[must_use]
185 pub fn streaming<S>(stream: S) -> Self
186 where
187 S: Stream<Item = Result<Vec<u8>, RequestBodyStreamError>> + Send + 'static,
188 {
189 Self::Stream {
190 stream: Mutex::new(Box::pin(stream)),
191 content_length: None,
192 }
193 }
194
195 #[must_use]
197 pub fn streaming_with_size<S>(stream: S, content_length: usize) -> Self
198 where
199 S: Stream<Item = Result<Vec<u8>, RequestBodyStreamError>> + Send + 'static,
200 {
201 Self::Stream {
202 stream: Mutex::new(Box::pin(stream)),
203 content_length: Some(content_length),
204 }
205 }
206
207 #[must_use]
212 pub fn into_bytes(self) -> Vec<u8> {
213 match self {
214 Self::Empty => Vec::new(),
215 Self::Bytes(b) => b,
216 Self::Stream { .. } => Vec::new(),
217 }
218 }
219
220 #[must_use]
222 pub fn is_empty(&self) -> bool {
223 matches!(self, Self::Empty)
224 || matches!(self, Self::Bytes(b) if b.is_empty())
225 || matches!(
226 self,
227 Self::Stream {
228 content_length: Some(0),
229 ..
230 }
231 )
232 }
233
234 pub fn into_stream(self) -> Option<(RequestBodyStream, Option<usize>)> {
236 match self {
237 Self::Stream {
238 stream,
239 content_length,
240 } => Some((
241 stream.into_inner().unwrap_or_else(|e| e.into_inner()),
242 content_length,
243 )),
244 _ => None,
245 }
246 }
247}
248
/// Storage for queued background tasks: a mutex-guarded list of boxed futures.
pub type BackgroundTasksInner = Mutex<Vec<Pin<Box<dyn Future<Output = ()> + Send>>>>;

/// A queue of tasks that are collected now and run later via [`BackgroundTasks::execute_all`].
pub struct BackgroundTasks {
    tasks: BackgroundTasksInner,
}

impl fmt::Debug for BackgroundTasks {
    /// Formats as an opaque struct; the queued futures are not shown.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("BackgroundTasks");
        out.finish_non_exhaustive()
    }
}

impl BackgroundTasks {
    /// Creates an empty task queue.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Queues a synchronous closure; it is invoked when the queue is executed.
    pub fn add<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let deferred = async move { f() };
        self.add_async(deferred);
    }

    /// Queues a future; it is awaited when the queue is executed.
    ///
    /// A poisoned mutex is tolerated: the task is pushed onto the recovered queue.
    pub fn add_async<Fut>(&self, fut: Fut)
    where
        Fut: Future<Output = ()> + Send + 'static,
    {
        let boxed: Pin<Box<dyn Future<Output = ()> + Send>> = Box::pin(fut);
        match self.tasks.lock() {
            Ok(mut queue) => queue.push(boxed),
            Err(poisoned) => poisoned.into_inner().push(boxed),
        }
    }

    /// Consumes the queue and awaits every task, one after another, in insertion order.
    pub async fn execute_all(self) {
        let queued = match self.tasks.into_inner() {
            Ok(queue) => queue,
            Err(poisoned) => poisoned.into_inner(),
        };
        for task in queued {
            task.await;
        }
    }
}

impl Default for BackgroundTasks {
    /// Equivalent to [`BackgroundTasks::new`]: an empty queue.
    fn default() -> Self {
        Self {
            tasks: Mutex::new(Vec::new()),
        }
    }
}
314
315#[derive(Debug)]
317pub struct Request {
318 method: Method,
319 version: HttpVersion,
320 path: String,
321 query: Option<String>,
322 headers: Headers,
323 body: Body,
324 #[allow(dead_code)] extensions: HashMap<std::any::TypeId, Box<dyn std::any::Any + Send + Sync>>,
327}
328
329impl Request {
330 #[must_use]
332 pub fn new(method: Method, path: impl Into<String>) -> Self {
333 Self {
334 method,
335 version: HttpVersion::Http11,
336 path: path.into(),
337 query: None,
338 headers: Headers::new(),
339 body: Body::Empty,
340 extensions: HashMap::new(),
341 }
342 }
343
344 #[must_use]
346 pub fn with_version(method: Method, path: impl Into<String>, version: HttpVersion) -> Self {
347 let mut req = Self::new(method, path);
348 req.version = version;
349 req
350 }
351
352 #[must_use]
354 pub fn method(&self) -> Method {
355 self.method
356 }
357
358 #[must_use]
360 pub fn version(&self) -> HttpVersion {
361 self.version
362 }
363
364 pub fn set_version(&mut self, version: HttpVersion) {
366 self.version = version;
367 }
368
369 #[must_use]
371 pub fn path(&self) -> &str {
372 &self.path
373 }
374
375 #[must_use]
377 pub fn query(&self) -> Option<&str> {
378 self.query.as_deref()
379 }
380
381 #[must_use]
383 pub fn headers(&self) -> &Headers {
384 &self.headers
385 }
386
387 pub fn headers_mut(&mut self) -> &mut Headers {
389 &mut self.headers
390 }
391
392 #[must_use]
394 pub fn body(&self) -> &Body {
395 &self.body
396 }
397
398 pub fn take_body(&mut self) -> Body {
400 std::mem::replace(&mut self.body, Body::Empty)
401 }
402
403 pub fn set_body(&mut self, body: Body) {
405 self.body = body;
406 }
407
408 pub fn set_query(&mut self, query: Option<String>) {
410 self.query = query;
411 }
412
413 pub fn insert_extension<T: Any + Send + Sync>(&mut self, value: T) {
415 self.extensions.insert(TypeId::of::<T>(), Box::new(value));
416 }
417
418 #[must_use]
420 pub fn get_extension<T: Any + Send + Sync>(&self) -> Option<&T> {
421 self.extensions
422 .get(&TypeId::of::<T>())
423 .and_then(|boxed| boxed.downcast_ref::<T>())
424 }
425
426 pub fn get_extension_mut<T: Any + Send + Sync>(&mut self) -> Option<&mut T> {
428 self.extensions
429 .get_mut(&TypeId::of::<T>())
430 .and_then(|boxed| boxed.downcast_mut::<T>())
431 }
432
433 pub fn take_extension<T: Any + Send + Sync>(&mut self) -> Option<T> {
435 self.extensions
436 .remove(&TypeId::of::<T>())
437 .and_then(|boxed| boxed.downcast::<T>().ok())
438 .map(|boxed| *boxed)
439 }
440
441 pub fn background_tasks(&mut self) -> &BackgroundTasks {
443 if !self
444 .extensions
445 .contains_key(&TypeId::of::<BackgroundTasks>())
446 {
447 self.insert_extension(BackgroundTasks::new());
448 }
449 self.get_extension::<BackgroundTasks>()
450 .expect("BackgroundTasks extension should exist")
451 }
452}