trillium_http/conn.rs
1use crate::{
2 Body, Buffer, Headers, HttpContext, KnownHeaderName,
3 KnownHeaderName::Host,
4 Method, ProtocolSession, ReceivedBody, Status, Swansong, TypeSet, Version,
5 after_send::{AfterSend, SendStatus},
6 h2::H2Connection,
7 h3::H3Connection,
8 headers::hpack::FieldSection,
9 liveness::{CancelOnDisconnect, LivenessFut},
10 received_body::ReceivedBodyState,
11 util::encoding,
12};
13
14/// Header names whose semantics only apply at the HTTP/1 layer.
15///
16/// HTTP/2 (RFC 9113 §8.2.2) and HTTP/3 (RFC 9114 §4.2) call these
17/// "connection-specific" headers and forbid them in requests and responses on those
18/// transports. Used both for incoming-request validation in `Conn::new_h2` /
19/// `Conn::build_h3` and for response-header sanitation in
20/// `finalize_response_headers_h2` / `finalize_response_headers_h3`.
21pub(super) const H1_ONLY_HEADERS: [KnownHeaderName; 5] = [
22 KnownHeaderName::Connection,
23 KnownHeaderName::KeepAlive,
24 KnownHeaderName::ProxyConnection,
25 KnownHeaderName::TransferEncoding,
26 KnownHeaderName::Upgrade,
27];
28
29/// Validated request pseudo-headers + headers, the common output of
30/// [`validate_h2h3_request`].
31pub(super) struct ValidatedRequest {
32 pub method: Method,
33 pub path: Cow<'static, str>,
34 pub authority: Option<Cow<'static, str>>,
35 pub scheme: Option<Cow<'static, str>>,
36 pub protocol: Option<Cow<'static, str>>,
37 pub request_headers: Headers,
38}
39
40/// Shared HTTP/2 + HTTP/3 request-validation per RFC 9113 §8.1.2 and RFC 9114 §4.3.1.
41///
42/// Both protocols apply the same malformed-message rules to incoming requests:
43/// no `:status` pseudo, required `:method`, non-empty `:path` (or CONNECT default),
44/// `:scheme` required for non-CONNECT, `:authority` required for CONNECT, no
45/// `Host`/`:authority` mismatch, no [`H1_ONLY_HEADERS`], and `TE` restricted to
46/// `trailers`. Returns `None` on any violation; the caller maps to its
47/// protocol-specific error code (e.g. `H2ErrorCode::ProtocolError`,
48/// `H3ErrorCode::MessageError`) via `.ok_or(...)`.
49pub(super) fn validate_h2h3_request(
50 mut field_section: FieldSection<'static>,
51) -> Option<ValidatedRequest> {
52 let pseudo_headers = field_section.pseudo_headers_mut();
53
54 // §8.1.2.1 / §4.3.1: `:status` is response-only; reject it on requests.
55 if pseudo_headers.status().is_some() {
56 return None;
57 }
58
59 let method = pseudo_headers.take_method();
60 let path = pseudo_headers.take_path();
61 let authority = pseudo_headers.take_authority();
62 let scheme = pseudo_headers.take_scheme();
63 let protocol = pseudo_headers.take_protocol();
64 let request_headers = field_section.into_headers().into_owned();
65
66 if let Some(host) = request_headers.get_str(Host)
67 && let Some(authority) = &authority
68 && host != authority.as_ref()
69 {
70 return None;
71 }
72
73 if H1_ONLY_HEADERS
74 .into_iter()
75 .any(|name| request_headers.has_header(name))
76 {
77 return None;
78 }
79
80 let method = method?;
81
82 if method != Method::Connect && scheme.is_none() {
83 return None;
84 }
85
86 let path = match (method, path) {
87 (_, Some(path)) if !path.is_empty() => path,
88 (Method::Connect, _) => Cow::Borrowed("/"),
89 _ => return None,
90 };
91
92 if method == Method::Connect && authority.is_none() {
93 return None;
94 }
95
96 match request_headers.get_str(KnownHeaderName::Te) {
97 None | Some("trailers") => {}
98 _ => return None,
99 }
100
101 Some(ValidatedRequest {
102 method,
103 path,
104 authority,
105 scheme,
106 protocol,
107 request_headers,
108 })
109}
110use encoding_rs::Encoding;
111use futures_lite::{
112 future,
113 io::{AsyncRead, AsyncWrite},
114};
115use std::{
116 borrow::Cow,
117 fmt::{self, Debug, Formatter},
118 future::Future,
119 net::IpAddr,
120 pin::pin,
121 str,
122 sync::Arc,
123 time::Instant,
124};
125mod h1;
126mod h2;
127mod h3;
128
129/// A http connection
130///
131/// Unlike in other rust http implementations, this struct represents both
132/// the request and the response, and holds the transport over which the
133/// response will be sent.
134#[derive(fieldwork::Fieldwork)]
135pub struct Conn<Transport> {
136 #[field(get)]
137 /// the shared [`HttpContext`]
138 pub(crate) context: Arc<HttpContext>,
139
140 /// request [headers](Headers)
141 #[field(get, get_mut)]
142 pub(crate) request_headers: Headers,
143
144 /// response [headers](Headers)
145 #[field(get, get_mut)]
146 pub(crate) response_headers: Headers,
147
148 pub(crate) path: Cow<'static, str>,
149
150 /// the http method for this conn's request
151 ///
152 /// ```
153 /// # use trillium_http::{Conn, Method};
154 /// let mut conn = Conn::new_synthetic(Method::Get, "/some/path?and&a=query", ());
155 /// assert_eq!(conn.method(), Method::Get);
156 /// ```
157 #[field(get, set, copy)]
158 pub(crate) method: Method,
159
160 /// the http status for this conn, if set
161 #[field(get, copy)]
162 pub(crate) status: Option<Status>,
163
164 /// The HTTP protocol version in use on this connection — HTTP/1.x, HTTP/2, or HTTP/3.
165 /// Populated by whichever protocol dispatcher opened the stream; handlers that need to
166 /// branch on version (e.g. to emit protocol-specific response headers, or to avoid
167 /// features that are only meaningful in one version) read it here.
168 ///
169 /// See [`HttpConfig`][crate::HttpConfig] for the full dispatch matrix and per-version
170 /// tuning knobs.
171 ///
172 /// ```
173 /// # use trillium_http::{Conn, Method, Version};
174 /// let conn = Conn::new_synthetic(Method::Get, "/", ());
175 /// // Synthetic conns default to HTTP/1.1; real conns reflect what the peer actually
176 /// // spoke (h2 when ALPN negotiated `h2` or when the prior-knowledge preface matched
177 /// // on either cleartext or TLS-without-ALPN-h2; h3 when the listener is a QUIC endpoint).
178 /// assert_eq!(conn.http_version(), Version::Http1_1);
179 /// ```
180 #[field(get = http_version, copy)]
181 pub(crate) version: Version,
182
183 /// the [state typemap](TypeSet) for this conn
184 #[field(get, get_mut)]
185 pub(crate) state: TypeSet,
186
187 /// the response [body](Body)
188 ///
189 /// ```
190 /// # use trillium_testing::HttpTest;
191 /// HttpTest::new(|conn| async move { conn.with_response_body("hello") })
192 /// .get("/")
193 /// .block()
194 /// .assert_body("hello");
195 ///
196 /// HttpTest::new(|conn| async move { conn.with_response_body(String::from("world")) })
197 /// .get("/")
198 /// .block()
199 /// .assert_body("world");
200 ///
201 /// HttpTest::new(|conn| async move { conn.with_response_body(vec![99, 97, 116]) })
202 /// .get("/")
203 /// .block()
204 /// .assert_body("cat");
205 /// ```
206 #[field(get, set, into, option_set_some, take, with)]
207 pub(crate) response_body: Option<Body>,
208
209 /// the transport
210 ///
211 /// This should only be used to call your own custom methods on the transport that do not read
212 /// or write any data. Calling any method that reads from or writes to the transport will
213 /// disrupt the HTTP protocol. If you're looking to transition from HTTP to another protocol,
214 /// use an HTTP upgrade.
215 #[field(get, get_mut)]
216 pub(crate) transport: Transport,
217
218 pub(crate) buffer: Buffer,
219
220 pub(crate) request_body_state: ReceivedBodyState,
221
222 pub(crate) after_send: AfterSend,
223
224 /// whether the connection is secure
225 ///
226 /// note that this does not necessarily indicate that the transport itself is secure, as it may
227 /// indicate that `trillium_http` is behind a trusted reverse proxy that has terminated tls and
228 /// provided appropriate headers to indicate this.
229 #[field(get, set, rename_predicates)]
230 pub(crate) secure: bool,
231
232 /// The [`Instant`] that the first header bytes for this conn were
233 /// received, before any processing or parsing has been performed.
234 #[field(get, copy)]
235 pub(crate) start_time: Instant,
236
237 /// The IP Address for the connection, if available
238 #[field(set, get, copy, into)]
239 pub(crate) peer_ip: Option<IpAddr>,
240
241 /// the :authority http/3 pseudo-header
242 #[field(set, get, into)]
243 pub(crate) authority: Option<Cow<'static, str>>,
244
245 /// the :scheme http/3 pseudo-header
246 #[field(set, get, into)]
247 pub(crate) scheme: Option<Cow<'static, str>>,
248
249 /// the [`ProtocolSession`] for this conn — the per-protocol session state
250 /// (h2/h3 connection driver and stream id) bundled into a single enum so the
251 /// "set together" invariant is enforced at the type level. `Http1` for
252 /// h1 / synthetic conns.
253 pub(crate) protocol_session: ProtocolSession,
254
255 /// the :protocol http/3 pseudo-header
256 #[field(set, get, into)]
257 pub(crate) protocol: Option<Cow<'static, str>>,
258
259 /// request trailers, populated after the request body has been fully read
260 #[field(get, get_mut)]
261 pub(crate) request_trailers: Option<Headers>,
262}
263
264impl<Transport> Debug for Conn<Transport> {
265 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
266 f.debug_struct("Conn")
267 .field("context", &self.context)
268 .field("request_headers", &self.request_headers)
269 .field("response_headers", &self.response_headers)
270 .field("path", &self.path)
271 .field("method", &self.method)
272 .field("status", &self.status)
273 .field("version", &self.version)
274 .field("state", &self.state)
275 .field("response_body", &self.response_body)
276 .field("transport", &format_args!(".."))
277 .field("buffer", &format_args!(".."))
278 .field("request_body_state", &self.request_body_state)
279 .field("secure", &self.secure)
280 .field("after_send", &format_args!(".."))
281 .field("start_time", &self.start_time)
282 .field("peer_ip", &self.peer_ip)
283 .field("authority", &self.authority)
284 .field("scheme", &self.scheme)
285 .field("protocol", &self.protocol)
286 .field("protocol_session", &self.protocol_session)
287 .field("request_trailers", &self.request_trailers)
288 .finish()
289 }
290}
291
292impl<Transport> Conn<Transport>
293where
294 Transport: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static,
295{
296 /// Returns the shared state on this conn, if set
297 pub fn shared_state(&self) -> &TypeSet {
298 &self.context.shared_state
299 }
300
301 /// sets the http status code from any `TryInto<Status>`.
302 ///
303 /// ```
304 /// # use trillium_http::Status;
305 /// # trillium_testing::HttpTest::new(|mut conn| async move {
306 /// assert!(conn.status().is_none());
307 ///
308 /// conn.set_status(200); // a status can be set as a u16
309 /// assert_eq!(conn.status().unwrap(), Status::Ok);
310 ///
311 /// conn.set_status(Status::ImATeapot); // or as a Status
312 /// assert_eq!(conn.status().unwrap(), Status::ImATeapot);
313 /// conn
314 /// # }).get("/").block().assert_status(Status::ImATeapot);
315 /// ```
316 pub fn set_status(&mut self, status: impl TryInto<Status>) -> &mut Self {
317 self.status = Some(status.try_into().unwrap_or_else(|_| {
318 log::error!("attempted to set an invalid status code");
319 Status::InternalServerError
320 }));
321 self
322 }
323
324 /// sets the http status code from any `TryInto<Status>`, returning Conn
325 #[must_use]
326 pub fn with_status(mut self, status: impl TryInto<Status>) -> Self {
327 self.set_status(status);
328 self
329 }
330
331 /// retrieves the path part of the request url, up to and excluding any query component
332 /// ```
333 /// # use trillium_testing::HttpTest;
334 /// HttpTest::new(|mut conn| async move {
335 /// assert_eq!(conn.path(), "/some/path");
336 /// conn.with_status(200)
337 /// })
338 /// .get("/some/path?and&a=query")
339 /// .block()
340 /// .assert_ok();
341 /// ```
342 pub fn path(&self) -> &str {
343 match self.path.split_once('?') {
344 Some((path, _)) => path,
345 None => &self.path,
346 }
347 }
348
349 /// retrieves the combined path and any query
350 pub fn path_and_query(&self) -> &str {
351 &self.path
352 }
353
354 /// retrieves the query component of the path, or an empty &str
355 ///
356 /// ```
357 /// # use trillium_testing::HttpTest;
358 /// let server = HttpTest::new(|conn| async move {
359 /// let querystring = conn.querystring().to_string();
360 /// conn.with_response_body(querystring).with_status(200)
361 /// });
362 ///
363 /// server
364 /// .get("/some/path?and&a=query")
365 /// .block()
366 /// .assert_body("and&a=query");
367 ///
368 /// server.get("/some/path").block().assert_body("");
369 /// ```
370 pub fn querystring(&self) -> &str {
371 self.path
372 .split_once('?')
373 .map(|(_, query)| query)
374 .unwrap_or_default()
375 }
376
377 /// get the host for this conn, if it exists
378 pub fn host(&self) -> Option<&str> {
379 self.request_headers.get_str(Host)
380 }
381
382 /// set the host for this conn
383 pub fn set_host(&mut self, host: String) -> &mut Self {
384 self.request_headers.insert(Host, host);
385 self
386 }
387
388 /// Cancels and drops the future if reading from the transport results in an error or empty read
389 ///
390 /// The use of this method is not advised if your connected http client employs pipelining
391 /// (rarely seen in the wild), as it will buffer an unbounded number of requests one byte at a
392 /// time
393 ///
394 /// If the client disconnects from the conn's transport, this function will return None. If the
395 /// future completes without disconnection, this future will return Some containing the output
396 /// of the future.
397 ///
398 /// Note that the inner future cannot borrow conn, so you will need to clone or take any
399 /// information needed to execute the future prior to executing this method.
400 ///
401 /// # Example
402 ///
403 /// ```rust
404 /// # use futures_lite::{AsyncRead, AsyncWrite};
405 /// # use trillium_http::{Conn, Method};
406 /// async fn something_slow_and_cancel_safe() -> String {
407 /// String::from("this was not actually slow")
408 /// }
409 /// async fn handler<T>(mut conn: Conn<T>) -> Conn<T>
410 /// where
411 /// T: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
412 /// {
413 /// let Some(returned_body) = conn
414 /// .cancel_on_disconnect(async { something_slow_and_cancel_safe().await })
415 /// .await
416 /// else {
417 /// return conn;
418 /// };
419 /// conn.with_response_body(returned_body).with_status(200)
420 /// }
421 /// ```
422 pub async fn cancel_on_disconnect<'a, Fut>(&'a mut self, fut: Fut) -> Option<Fut::Output>
423 where
424 Fut: Future + Send + 'a,
425 {
426 CancelOnDisconnect(self, pin!(fut)).await
427 }
428
429 /// Check if the transport is connected by attempting to read from the transport
430 ///
431 /// # Example
432 ///
433 /// This is best to use at appropriate points in a long-running handler, like:
434 ///
435 /// ```rust
436 /// # use futures_lite::{AsyncRead, AsyncWrite};
437 /// # use trillium_http::{Conn, Method};
438 /// # async fn something_slow_but_not_cancel_safe() {}
439 /// async fn handler<T>(mut conn: Conn<T>) -> Conn<T>
440 /// where
441 /// T: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
442 /// {
443 /// for _ in 0..100 {
444 /// if conn.is_disconnected().await {
445 /// return conn;
446 /// }
447 /// something_slow_but_not_cancel_safe().await;
448 /// }
449 /// conn.with_status(200)
450 /// }
451 /// ```
452 pub async fn is_disconnected(&mut self) -> bool {
453 future::poll_once(LivenessFut::new(self)).await.is_some()
454 }
455
456 /// returns the [`encoding_rs::Encoding`] for this request, as determined from the mime-type
457 /// charset, if available
458 ///
459 /// ```
460 /// # use trillium_testing::HttpTest;
461 /// HttpTest::new(|mut conn| async move {
462 /// assert_eq!(conn.request_encoding(), encoding_rs::WINDOWS_1252); // the default
463 ///
464 /// conn.request_headers_mut()
465 /// .insert("content-type", "text/plain;charset=utf-16");
466 /// assert_eq!(conn.request_encoding(), encoding_rs::UTF_16LE);
467 ///
468 /// conn.with_status(200)
469 /// })
470 /// .get("/")
471 /// .block()
472 /// .assert_ok();
473 /// ```
474 pub fn request_encoding(&self) -> &'static Encoding {
475 encoding(&self.request_headers)
476 }
477
478 /// returns the [`encoding_rs::Encoding`] for this response, as
479 /// determined from the mime-type charset, if available
480 ///
481 /// ```
482 /// # use trillium_testing::HttpTest;
483 /// HttpTest::new(|mut conn| async move {
484 /// assert_eq!(conn.response_encoding(), encoding_rs::WINDOWS_1252); // the default
485 /// conn.response_headers_mut()
486 /// .insert("content-type", "text/plain;charset=utf-16");
487 ///
488 /// assert_eq!(conn.response_encoding(), encoding_rs::UTF_16LE);
489 ///
490 /// conn.with_status(200)
491 /// })
492 /// .get("/")
493 /// .block()
494 /// .assert_ok();
495 /// ```
496 pub fn response_encoding(&self) -> &'static Encoding {
497 encoding(&self.response_headers)
498 }
499
500 /// returns a [`ReceivedBody`] that references this conn. the conn
501 /// retains all data and holds the singular transport, but the
502 /// `ReceivedBody` provides an interface to read body content.
503 ///
504 /// If the request included an `Expect: 100-continue` header, the 100 Continue response is sent
505 /// lazily on the first read from the returned [`ReceivedBody`].
506 /// ```
507 /// # use trillium_testing::HttpTest;
508 /// let server = HttpTest::new(|mut conn| async move {
509 /// let request_body = conn.request_body();
510 /// assert_eq!(request_body.content_length(), Some(5));
511 /// assert_eq!(request_body.read_string().await.unwrap(), "hello");
512 /// conn.with_status(200)
513 /// });
514 ///
515 /// server.post("/").with_body("hello").block().assert_ok();
516 /// ```
517 pub fn request_body(&mut self) -> ReceivedBody<'_, Transport> {
518 let needs_100_continue = self.needs_100_continue();
519 let body = self.build_request_body();
520 if needs_100_continue {
521 body.with_send_100_continue()
522 } else {
523 body
524 }
525 }
526
527 /// returns a clone of the [`swansong::Swansong`] for this Conn. use
528 /// this to gracefully stop long-running futures and streams
529 /// inside of handler functions
530 pub fn swansong(&self) -> Swansong {
531 self.protocol_session
532 .h3_connection()
533 .map_or_else(|| self.context.swansong.clone(), |h| h.swansong().clone())
534 }
535
536 /// Registers a function to call after the http response has been
537 /// completely transferred.
538 ///
539 /// The callback is guaranteed to fire **exactly once** before the conn is
540 /// dropped. Either the codec's send path invokes it with the real outcome,
541 /// or — if the conn is dropped before send completes (handler panic,
542 /// transport error, mid-write disconnect) — the drop fallback invokes it
543 /// with a `SendStatus` whose `is_success()` returns false. Multiple
544 /// registrations on the same conn chain in registration order.
545 ///
546 /// Because firing is ordered by send-completion rather than handler return,
547 /// this is the right hook for instrumentation that wants to report what the
548 /// peer actually observed (`trillium-logger` and the out-of-tree
549 /// `trillium-opentelemetry` handler both depend on this property).
550 ///
551 /// Please note that this is a sync function and should be computationally
552 /// lightweight. If your _application_ needs additional async processing,
553 /// use your runtime's task spawn within this hook. If your _library_ needs
554 /// additional async processing in an `after_send` hook, please open an
555 /// issue. This hook is currently designed for simple instrumentation and
556 /// logging, and should be thought of as equivalent to a Drop hook.
557 pub fn after_send<F>(&mut self, after_send: F)
558 where
559 F: FnOnce(SendStatus) + Send + Sync + 'static,
560 {
561 self.after_send.append(after_send);
562 }
563
564 /// applies a mapping function from one transport to another. This
565 /// is particularly useful for boxing the transport. unless you're
566 /// sure this is what you're looking for, you probably don't want
567 /// to be using this
568 pub fn map_transport<NewTransport>(
569 self,
570 f: impl Fn(Transport) -> NewTransport,
571 ) -> Conn<NewTransport>
572 where
573 NewTransport: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
574 {
575 // Manual respread: rustc treats `Conn<Transport>` and `Conn<NewTransport>` as
576 // disjoint types and rejects `..self` without the unstable
577 // `type_changing_struct_update` feature. If a new field is added to `Conn`,
578 // update this respread, `Upgrade::map_transport`, and `From<Conn> for Upgrade`
579 // (`upgrade.rs`) — they share this drift hazard.
580 Conn {
581 context: self.context,
582 request_headers: self.request_headers,
583 response_headers: self.response_headers,
584 method: self.method,
585 response_body: self.response_body,
586 path: self.path,
587 status: self.status,
588 version: self.version,
589 state: self.state,
590 transport: f(self.transport),
591 buffer: self.buffer,
592 request_body_state: self.request_body_state,
593 secure: self.secure,
594 after_send: self.after_send,
595 start_time: self.start_time,
596 peer_ip: self.peer_ip,
597 authority: self.authority,
598 scheme: self.scheme,
599 protocol: self.protocol,
600 protocol_session: self.protocol_session,
601 request_trailers: self.request_trailers,
602 }
603 }
604
605 /// whether this conn is suitable for an http upgrade to another protocol
606 pub fn should_upgrade(&self) -> bool {
607 (self.method() == Method::Connect && self.status == Some(Status::Ok))
608 || self.status == Some(Status::SwitchingProtocols)
609 }
610
611 #[doc(hidden)]
612 pub fn finalize_headers(&mut self) {
613 if self.version == Version::Http3 {
614 self.finalize_response_headers_h3();
615 } else {
616 self.finalize_response_headers_1x();
617 }
618 }
619
620 /// the [`H2Connection`] driver for this conn, if this is an HTTP/2 request
621 pub fn h2_connection(&self) -> Option<&Arc<H2Connection>> {
622 self.protocol_session.h2_connection()
623 }
624
625 /// the h2 stream id for this conn, if this is an HTTP/2 request
626 pub fn h2_stream_id(&self) -> Option<u32> {
627 self.protocol_session.h2_stream_id()
628 }
629
630 /// the [`H3Connection`] driver for this conn, if this is an HTTP/3 request
631 pub fn h3_connection(&self) -> Option<&Arc<H3Connection>> {
632 self.protocol_session.h3_connection()
633 }
634
635 /// the h3 stream id for this conn, if this is an HTTP/3 request
636 pub fn h3_stream_id(&self) -> Option<u64> {
637 self.protocol_session.h3_stream_id()
638 }
639}