trillium_http/http_context.rs
1use crate::{
2 Conn, ConnectionStatus, HttpConfig, Result, TypeSet, Upgrade,
3 conn::{ConnParts, HeadError},
4 headers::header_observer::HeaderObserver,
5};
6use fieldwork::Fieldwork;
7use futures_lite::{AsyncRead, AsyncWrite};
8use std::{future::Future, sync::Arc};
9use swansong::{ShutdownCompletion, Swansong};
/// Shared configuration and context for an http server.
///
/// Contains tunable parameters in a [`HttpConfig`], the [`Swansong`] graceful shutdown control
/// interface, and a shared [`TypeSet`] that contains application-specific information about the
/// running server.
// Accessor methods for the fields below come from the `Fieldwork` derive, configured by the
// `#[fieldwork(get, set, get_mut, with)]` attribute.
#[derive(Default, Debug, Fieldwork)]
#[fieldwork(get, set, get_mut, with)]
pub struct HttpContext {
    /// [`HttpConfig`] performance and security parameters
    pub(crate) config: HttpConfig,

    /// [`Swansong`] graceful shutdown interface
    pub(crate) swansong: Swansong,

    /// [`TypeSet`] shared state
    pub(crate) shared_state: TypeSet,

    /// Per-listener QPACK header-frequency observer. Shared by `Arc` across all connections
    /// a given listener accepts; runtime adapters isolate it per hop-and-direction via
    /// [`__isolate_header_observer`](Self::__isolate_header_observer).
    // NOTE(review): `field = false` presumably suppresses the generated accessors for this field
    // unless the `unstable` feature is enabled — confirm against the fieldwork documentation.
    #[cfg_attr(not(feature = "unstable"), field = false)]
    pub(crate) observer: Arc<HeaderObserver>,
}
33impl AsRef<TypeSet> for HttpContext {
34 fn as_ref(&self) -> &TypeSet {
35 &self.shared_state
36 }
37}
38
39impl AsMut<TypeSet> for HttpContext {
40 fn as_mut(&mut self) -> &mut TypeSet {
41 &mut self.shared_state
42 }
43}
44
45impl AsRef<Swansong> for HttpContext {
46 fn as_ref(&self) -> &Swansong {
47 &self.swansong
48 }
49}
50
51impl AsRef<HttpConfig> for HttpContext {
52 fn as_ref(&self) -> &HttpConfig {
53 &self.config
54 }
55}
56
57impl HttpContext {
58 /// Construct a new `HttpContext`
59 pub fn new() -> Self {
60 Self::default()
61 }
62
63 /// Perform HTTP on the provided transport, applying the provided `async Conn -> Conn` handler
64 /// function for every distinct http request-response.
65 ///
66 /// For any given invocation of `HttpContext::run`, the handler function may run any number of
67 /// times, depending on whether the connection is reused by the client.
68 ///
69 /// # Errors
70 ///
71 /// This function will return an [`Error`](crate::Error) if any of the http requests is
72 /// irrecoverably malformed or otherwise noncompliant.
73 pub async fn run<Transport, Handler, Fut>(
74 self: Arc<Self>,
75 transport: Transport,
76 handler: Handler,
77 ) -> Result<Option<Upgrade<Transport>>>
78 where
79 Transport: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static,
80 Handler: FnMut(Conn<Transport>) -> Fut,
81 Fut: Future<Output = Conn<Transport>>,
82 {
83 let initial_bytes = Vec::with_capacity(self.config.request_buffer_initial_len);
84 run_with_initial_bytes(self, transport, initial_bytes, handler).await
85 }
86
87 /// Attempt graceful shutdown of this server.
88 ///
89 /// The returned [`ShutdownCompletion`] type can
90 /// either be awaited in an async context or blocked on with [`ShutdownCompletion::block`] in a
91 /// blocking context
92 pub fn shut_down(&self) -> ShutdownCompletion {
93 self.swansong.shut_down()
94 }
95
96 /// Replace this context's QPACK header observer with a fresh, empty one.
97 ///
98 /// Adapter crates call this during listener setup so each hop-and-direction pair in a
99 /// deployment gets its own observer. A reverse proxy's inbound server observer is distinct
100 /// from its outbound client observer by construction, so header values one hop forwards
101 /// (e.g. `authorization`, `cookie`) cannot reach the QPACK state of unrelated clients on
102 /// the other hop.
103 ///
104 /// Not part of the stable public API; exposed only for adapter crates.
105 #[doc(hidden)]
106 pub fn __isolate_header_observer(&mut self) -> &mut Self {
107 self.observer = Arc::new(HeaderObserver::default());
108 self
109 }
110}
111
112/// Like [`HttpContext::run`], but starts with the supplied bytes pre-filled into the request
113/// buffer.
114///
115/// For adapters that peek the first few bytes off a cleartext TCP stream to decide between
116/// HTTP/1.1 and HTTP/2 prior-knowledge dispatch, then need to hand those bytes into the HTTP/1
117/// parser without re-reading. Bytes already in the buffer are consumed by the parser before
118/// any transport read happens.
119///
120/// # Errors
121///
122/// Same as [`HttpContext::run`] — any irrecoverably malformed or noncompliant HTTP/1 request
123/// surfaces as an [`Error`](crate::Error).
124pub async fn run_with_initial_bytes<Transport, Handler, Fut>(
125 context: Arc<HttpContext>,
126 transport: Transport,
127 initial_bytes: Vec<u8>,
128 mut handler: Handler,
129) -> Result<Option<Upgrade<Transport>>>
130where
131 Transport: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static,
132 Handler: FnMut(Conn<Transport>) -> Fut,
133 Fut: Future<Output = Conn<Transport>>,
134{
135 let _guard = context.swansong.guard();
136
137 let parts = ConnParts::new(context, transport, initial_bytes);
138
139 let mut conn = match parts.parse_head().await {
140 Ok(conn) => conn,
141 Err(HeadError::BadRequest(bad)) => {
142 bad.send().await?;
143 return Ok(None);
144 }
145 Err(HeadError::Fatal(e)) => return Err(e),
146 };
147
148 loop {
149 conn = match handler(conn).await.send().await? {
150 ConnectionStatus::Upgrade(upgrade) => return Ok(Some(upgrade)),
151 ConnectionStatus::Close => return Ok(None),
152 ConnectionStatus::Conn(next) => next,
153 }
154 }
155}
156
/// Read and parse exactly one HTTP/1.x request head from `transport`, returning the resulting
/// [`Conn`] without entering the handler/keepalive loop.
///
/// The request buffer is seeded at `request_buffer_initial_len` exactly as [`HttpContext::run`]
/// seeds it, so this exercises the real head-read buffer-growth path. A complete-but-malformed
/// head still yields a `Conn` (carrying the error status); only an incomplete head or transport
/// IO error returns `Err`.
///
/// Exposed for microbenchmarking the parse path; not part of the stable public API.
#[cfg(feature = "unstable")]
#[doc(hidden)]
pub async fn parse_head_for_bench<Transport>(
    context: Arc<HttpContext>,
    transport: Transport,
) -> Result<Conn<Transport>>
where
    Transport: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static,
{
    // Read the capacity before `context` is moved into `ConnParts`.
    let capacity = context.config.request_buffer_initial_len;
    let parts = ConnParts::new(context, transport, Vec::with_capacity(capacity));

    parts.parse_head().await.or_else(|head_error| match head_error {
        // A bad-request head is returned as a `Conn` rather than as an error.
        HeadError::BadRequest(bad) => Ok(*bad),
        HeadError::Fatal(e) => Err(e),
    })
}