trillium_http/http_context.rs
1use crate::{
2 Buffer, Conn, ConnectionStatus, HttpConfig, Result, TypeSet, Upgrade,
3 headers::header_observer::HeaderObserver,
4};
5use fieldwork::Fieldwork;
6use futures_lite::{AsyncRead, AsyncWrite};
7use std::{future::Future, sync::Arc};
8use swansong::{ShutdownCompletion, Swansong};
9/// This struct represents the shared configuration and context for a http server.
10///
11/// This currently contains tunable parameters in a [`HttpConfig`], the [`Swansong`] graceful
12/// shutdown control interface, and a shared [`TypeSet`] that contains application-specific
13/// information about the running server
14#[derive(Default, Debug, Fieldwork)]
15#[fieldwork(get, set, get_mut, with)]
16pub struct HttpContext {
17 /// [`HttpConfig`] performance and security parameters
18 pub(crate) config: HttpConfig,
19
20 /// [`Swansong`] graceful shutdown interface
21 pub(crate) swansong: Swansong,
22
23 /// [`TypeSet`] shared state
24 pub(crate) shared_state: TypeSet,
25
26 /// Per-listener QPACK header-frequency observer. Shared by `Arc` across all connections
27 /// a given listener accepts; runtime adapters isolate it per hop-and-direction via
28 /// [`__isolate_qpack_observer`](Self::__isolate_qpack_observer).
29 #[cfg_attr(not(feature = "unstable"), field = false)]
30 pub(crate) observer: Arc<HeaderObserver>,
31}
32impl AsRef<TypeSet> for HttpContext {
33 fn as_ref(&self) -> &TypeSet {
34 &self.shared_state
35 }
36}
37
38impl AsMut<TypeSet> for HttpContext {
39 fn as_mut(&mut self) -> &mut TypeSet {
40 &mut self.shared_state
41 }
42}
43
44impl AsRef<Swansong> for HttpContext {
45 fn as_ref(&self) -> &Swansong {
46 &self.swansong
47 }
48}
49
50impl AsRef<HttpConfig> for HttpContext {
51 fn as_ref(&self) -> &HttpConfig {
52 &self.config
53 }
54}
55
56impl HttpContext {
57 /// Construct a new `HttpContext`
58 pub fn new() -> Self {
59 Self::default()
60 }
61
62 /// Perform HTTP on the provided transport, applying the provided `async Conn -> Conn` handler
63 /// function for every distinct http request-response.
64 ///
65 /// For any given invocation of `HttpContext::run`, the handler function may run any number of
66 /// times, depending on whether the connection is reused by the client.
67 ///
68 /// This can only be called on an `Arc<HttpContext>` because an arc clone is moved into the
69 /// Conn.
70 ///
71 /// # Errors
72 ///
73 /// This function will return an [`Error`](crate::Error) if any of the http requests is
74 /// irrecoverably malformed or otherwise noncompliant.
75 pub async fn run<Transport, Handler, Fut>(
76 self: Arc<Self>,
77 transport: Transport,
78 handler: Handler,
79 ) -> Result<Option<Upgrade<Transport>>>
80 where
81 Transport: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static,
82 Handler: FnMut(Conn<Transport>) -> Fut,
83 Fut: Future<Output = Conn<Transport>>,
84 {
85 let initial_bytes = Vec::with_capacity(self.config.request_buffer_initial_len);
86 run_with_initial_bytes(self, transport, initial_bytes, handler).await
87 }
88
89 /// Attempt graceful shutdown of this server.
90 ///
91 /// The returned [`ShutdownCompletion`] type can
92 /// either be awaited in an async context or blocked on with [`ShutdownCompletion::block`] in a
93 /// blocking context
94 pub fn shut_down(&self) -> ShutdownCompletion {
95 self.swansong.shut_down()
96 }
97
98 /// Replace this context's QPACK header observer with a fresh, empty one.
99 ///
100 /// Runtime adapters (trillium-server-common, trillium-client) call this during listener
101 /// setup so that each hop-and-direction pair in a deployment gets its own observer. A
102 /// reverse proxy's inbound server observer is distinct from its outbound client observer
103 /// by construction, so header values one hop forwards (e.g. `authorization`, `cookie`)
104 /// cannot reach the QPACK state of unrelated clients on the other hop.
105 ///
106 /// Not part of the stable public API; exposed only so adapter crates can call it.
107 #[doc(hidden)]
108 pub fn __isolate_qpack_observer(&mut self) -> &mut Self {
109 self.observer = Arc::new(HeaderObserver::default());
110 log::trace!(
111 target: "qpack_metrics",
112 "isolated fresh QPACK observer for this context (ptr={:p})",
113 Arc::as_ptr(&self.observer),
114 );
115 self
116 }
117}
118
119/// Like [`HttpContext::run`], but starts with the supplied bytes pre-filled into the request
120/// buffer.
121///
122/// Used by runtime adapters that need to peek the first few bytes off a cleartext TCP stream
123/// to decide between HTTP/1.1 and HTTP/2 prior-knowledge dispatch, and then hand those bytes
124/// into the HTTP/1 parser without re-reading them.
125///
126/// The mechanism is the same one the keep-alive / pipelining path already uses when a follow-up
127/// request arrives on the tail of the previous response: any bytes already in the buffer are
128/// consumed by the parser before the next transport read.
129///
130/// # Errors
131///
132/// Same as [`HttpContext::run`] — any irrecoverably malformed or noncompliant HTTP/1 request
133/// surfaces as an [`Error`](crate::Error).
134pub async fn run_with_initial_bytes<Transport, Handler, Fut>(
135 context: Arc<HttpContext>,
136 transport: Transport,
137 initial_bytes: Vec<u8>,
138 mut handler: Handler,
139) -> Result<Option<Upgrade<Transport>>>
140where
141 Transport: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static,
142 Handler: FnMut(Conn<Transport>) -> Fut,
143 Fut: Future<Output = Conn<Transport>>,
144{
145 let _guard = context.swansong.guard();
146 let buffer: Buffer = initial_bytes.into();
147 let mut conn = Conn::new_internal(context, transport, buffer).await?;
148
149 loop {
150 conn = match handler(conn).await.send().await? {
151 ConnectionStatus::Upgrade(upgrade) => return Ok(Some(upgrade)),
152 ConnectionStatus::Close => return Ok(None),
153 ConnectionStatus::Conn(next) => next,
154 }
155 }
156}