Skip to main content

trillium_http/
conn.rs

1use crate::{
2    Body, Buffer, Headers, HttpContext, KnownHeaderName,
3    KnownHeaderName::Host,
4    Method, ProtocolSession, ReceivedBody, Status, Swansong, TypeSet, Version,
5    after_send::{AfterSend, SendStatus},
6    h2::H2Connection,
7    h3::H3Connection,
8    headers::hpack::FieldSection,
9    liveness::{CancelOnDisconnect, LivenessFut},
10    received_body::ReceivedBodyState,
11    util::encoding,
12};
13
14/// Header names whose semantics only apply at the HTTP/1 layer.
15///
16/// HTTP/2 (RFC 9113 §8.2.2) and HTTP/3 (RFC 9114 §4.2) call these
17/// "connection-specific" headers and forbid them in requests and responses on those
18/// transports. Used both for incoming-request validation in `Conn::new_h2` /
19/// `Conn::build_h3` and for response-header sanitation in
20/// `finalize_response_headers_h2` / `finalize_response_headers_h3`.
21pub(super) const H1_ONLY_HEADERS: [KnownHeaderName; 5] = [
22    KnownHeaderName::Connection,
23    KnownHeaderName::KeepAlive,
24    KnownHeaderName::ProxyConnection,
25    KnownHeaderName::TransferEncoding,
26    KnownHeaderName::Upgrade,
27];
28
29/// Validated request pseudo-headers + headers, the common output of
30/// [`validate_h2h3_request`].
31pub(super) struct ValidatedRequest {
32    pub method: Method,
33    pub path: Cow<'static, str>,
34    pub authority: Option<Cow<'static, str>>,
35    pub scheme: Option<Cow<'static, str>>,
36    pub protocol: Option<Cow<'static, str>>,
37    pub request_headers: Headers,
38}
39
40/// Shared HTTP/2 + HTTP/3 request-validation per RFC 9113 §8.1.2 and RFC 9114 §4.3.1.
41///
42/// Both protocols apply the same malformed-message rules to incoming requests:
43/// no `:status` pseudo, required `:method`, non-empty `:path` (or CONNECT default),
44/// `:scheme` required for non-CONNECT, `:authority` required for CONNECT, `:authority`
45/// or `Host` required when `:scheme` is `http`/`https`, no `Host`/`:authority`
46/// mismatch, no [`H1_ONLY_HEADERS`], and `TE` restricted to `trailers`. Returns `None`
47/// on any violation; the caller maps to its protocol-specific error code (e.g.
48/// `H2ErrorCode::ProtocolError`, `H3ErrorCode::MessageError`) via `.ok_or(...)`.
49pub(super) fn validate_h2h3_request(
50    mut field_section: FieldSection<'static>,
51) -> Option<ValidatedRequest> {
52    let pseudo_headers = field_section.pseudo_headers_mut();
53
54    // §8.1.2.1 / §4.3.1: `:status` is response-only; reject it on requests.
55    if pseudo_headers.status().is_some() {
56        return None;
57    }
58
59    let method = pseudo_headers.take_method();
60    let path = pseudo_headers.take_path();
61    let authority = pseudo_headers.take_authority();
62    let scheme = pseudo_headers.take_scheme();
63    let protocol = pseudo_headers.take_protocol();
64    let request_headers = field_section.into_headers().into_owned();
65
66    if let Some(host) = request_headers.get_str(Host)
67        && let Some(authority) = &authority
68        && host != authority.as_ref()
69    {
70        return None;
71    }
72
73    if H1_ONLY_HEADERS
74        .into_iter()
75        .any(|name| request_headers.has_header(name))
76    {
77        return None;
78    }
79
80    let method = method?;
81
82    if method != Method::Connect && scheme.is_none() {
83        return None;
84    }
85
86    let path = match (method, path) {
87        (_, Some(path)) if !path.is_empty() => path,
88        (Method::Connect, _) => Cow::Borrowed("/"),
89        _ => return None,
90    };
91
92    if method == Method::Connect && authority.is_none() {
93        return None;
94    }
95
96    // RFC 9114 §4.3.1 / RFC 9113 §8.3.1: when :scheme names a scheme with a mandatory
97    // authority component, the request MUST carry either :authority or a Host header.
98    // The spec gives "http" and "https" as the canonical examples; we also include "ws"
99    // and "wss" (RFC 6455 §3, same hierarchical-with-mandatory-authority shape) so the
100    // rule applies consistently if a non-standard sender uses those. Exotic schemes
101    // without mandatory authority (file, data, mailto, urn) are exempt; CONNECT is
102    // handled above.
103    if method != Method::Connect
104        && matches!(scheme.as_deref(), Some("http" | "https" | "ws" | "wss"))
105        && authority.is_none()
106        && request_headers.get_str(Host).is_none()
107    {
108        return None;
109    }
110
111    match request_headers.get_str(KnownHeaderName::Te) {
112        None | Some("trailers") => {}
113        _ => return None,
114    }
115
116    Some(ValidatedRequest {
117        method,
118        path,
119        authority,
120        scheme,
121        protocol,
122        request_headers,
123    })
124}
125use encoding_rs::Encoding;
126use futures_lite::{
127    future,
128    io::{AsyncRead, AsyncWrite},
129};
130use std::{
131    borrow::Cow,
132    fmt::{self, Debug, Formatter},
133    future::Future,
134    net::IpAddr,
135    pin::pin,
136    str,
137    sync::Arc,
138    time::Instant,
139};
140mod h1;
141mod h2;
142mod h3;
143pub(crate) use h3::H3FirstFrame;
144
145/// A http connection
146///
147/// Unlike in other rust http implementations, this struct represents both
148/// the request and the response, and holds the transport over which the
149/// response will be sent.
150#[derive(fieldwork::Fieldwork)]
151pub struct Conn<Transport> {
152    #[field(get)]
153    /// the shared [`HttpContext`]
154    pub(crate) context: Arc<HttpContext>,
155
156    /// request [headers](Headers)
157    #[field(get, get_mut)]
158    pub(crate) request_headers: Headers,
159
160    /// response [headers](Headers)
161    #[field(get, get_mut)]
162    pub(crate) response_headers: Headers,
163
164    pub(crate) path: Cow<'static, str>,
165
166    /// the http method for this conn's request
167    ///
168    /// ```
169    /// # use trillium_http::{Conn, Method};
170    /// let mut conn = Conn::new_synthetic(Method::Get, "/some/path?and&a=query", ());
171    /// assert_eq!(conn.method(), Method::Get);
172    /// ```
173    #[field(get, set, copy)]
174    pub(crate) method: Method,
175
176    /// the http status for this conn, if set
177    #[field(get, copy)]
178    pub(crate) status: Option<Status>,
179
180    /// The HTTP protocol version in use on this connection — HTTP/1.x, HTTP/2, or HTTP/3.
181    /// Populated by whichever protocol dispatcher opened the stream; handlers that need to
182    /// branch on version (e.g. to emit protocol-specific response headers, or to avoid
183    /// features that are only meaningful in one version) read it here.
184    ///
185    /// See [`HttpConfig`][crate::HttpConfig] for the full dispatch matrix and per-version
186    /// tuning knobs.
187    ///
188    /// ```
189    /// # use trillium_http::{Conn, Method, Version};
190    /// let conn = Conn::new_synthetic(Method::Get, "/", ());
191    /// // Synthetic conns default to HTTP/1.1; real conns reflect what the peer actually
192    /// // spoke (h2 when ALPN negotiated `h2` or when the prior-knowledge preface matched
193    /// // on either cleartext or TLS-without-ALPN-h2; h3 when the listener is a QUIC endpoint).
194    /// assert_eq!(conn.http_version(), Version::Http1_1);
195    /// ```
196    #[field(get = http_version, copy)]
197    pub(crate) version: Version,
198
199    /// the [state typemap](TypeSet) for this conn
200    #[field(get, get_mut)]
201    pub(crate) state: TypeSet,
202
203    /// the response [body](Body)
204    ///
205    /// ```
206    /// # use trillium_testing::HttpTest;
207    /// HttpTest::new(|conn| async move { conn.with_response_body("hello") })
208    ///     .get("/")
209    ///     .block()
210    ///     .assert_body("hello");
211    ///
212    /// HttpTest::new(|conn| async move { conn.with_response_body(String::from("world")) })
213    ///     .get("/")
214    ///     .block()
215    ///     .assert_body("world");
216    ///
217    /// HttpTest::new(|conn| async move { conn.with_response_body(vec![99, 97, 116]) })
218    ///     .get("/")
219    ///     .block()
220    ///     .assert_body("cat");
221    /// ```
222    #[field(get, set, into, option_set_some, take, with)]
223    pub(crate) response_body: Option<Body>,
224
225    /// the transport
226    ///
227    /// This should only be used to call your own custom methods on the transport that do not read
228    /// or write any data. Calling any method that reads from or writes to the transport will
229    /// disrupt the HTTP protocol. If you're looking to transition from HTTP to another protocol,
230    /// use an HTTP upgrade.
231    #[field(get, get_mut)]
232    pub(crate) transport: Transport,
233
234    pub(crate) buffer: Buffer,
235
236    pub(crate) request_body_state: ReceivedBodyState,
237
238    pub(crate) after_send: AfterSend,
239
240    /// whether the connection is secure
241    ///
242    /// note that this does not necessarily indicate that the transport itself is secure, as it may
243    /// indicate that `trillium_http` is behind a trusted reverse proxy that has terminated tls and
244    /// provided appropriate headers to indicate this.
245    #[field(get, set, rename_predicates)]
246    pub(crate) secure: bool,
247
248    /// The [`Instant`] that the first header bytes for this conn were
249    /// received, before any processing or parsing has been performed.
250    #[field(get, copy)]
251    pub(crate) start_time: Instant,
252
253    /// The IP Address for the connection, if available
254    #[field(set, get, copy, into)]
255    pub(crate) peer_ip: Option<IpAddr>,
256
257    /// the :authority http/3 pseudo-header
258    #[field(set, get, into)]
259    pub(crate) authority: Option<Cow<'static, str>>,
260
261    /// the :scheme http/3 pseudo-header
262    #[field(set, get, into)]
263    pub(crate) scheme: Option<Cow<'static, str>>,
264
265    /// the [`ProtocolSession`] for this conn — the per-protocol session state
266    /// (h2/h3 connection driver and stream id) bundled into a single enum so the
267    /// "set together" invariant is enforced at the type level. `Http1` for
268    /// h1 / synthetic conns.
269    pub(crate) protocol_session: ProtocolSession,
270
271    /// the :protocol http/3 pseudo-header
272    #[field(set, get, into)]
273    pub(crate) protocol: Option<Cow<'static, str>>,
274
275    /// request trailers, populated after the request body has been fully read
276    #[field(get, get_mut)]
277    pub(crate) request_trailers: Option<Headers>,
278}
279
280impl<Transport> Debug for Conn<Transport> {
281    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
282        f.debug_struct("Conn")
283            .field("context", &self.context)
284            .field("request_headers", &self.request_headers)
285            .field("response_headers", &self.response_headers)
286            .field("path", &self.path)
287            .field("method", &self.method)
288            .field("status", &self.status)
289            .field("version", &self.version)
290            .field("state", &self.state)
291            .field("response_body", &self.response_body)
292            .field("transport", &format_args!(".."))
293            .field("buffer", &format_args!(".."))
294            .field("request_body_state", &self.request_body_state)
295            .field("secure", &self.secure)
296            .field("after_send", &format_args!(".."))
297            .field("start_time", &self.start_time)
298            .field("peer_ip", &self.peer_ip)
299            .field("authority", &self.authority)
300            .field("scheme", &self.scheme)
301            .field("protocol", &self.protocol)
302            .field("protocol_session", &self.protocol_session)
303            .field("request_trailers", &self.request_trailers)
304            .finish()
305    }
306}
307
308impl<Transport> Conn<Transport>
309where
310    Transport: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static,
311{
312    /// Returns the shared state on this conn, if set
313    pub fn shared_state(&self) -> &TypeSet {
314        &self.context.shared_state
315    }
316
317    /// sets the http status code from any `TryInto<Status>`.
318    ///
319    /// ```
320    /// # use trillium_http::Status;
321    /// # trillium_testing::HttpTest::new(|mut conn| async move {
322    /// assert!(conn.status().is_none());
323    ///
324    /// conn.set_status(200); // a status can be set as a u16
325    /// assert_eq!(conn.status().unwrap(), Status::Ok);
326    ///
327    /// conn.set_status(Status::ImATeapot); // or as a Status
328    /// assert_eq!(conn.status().unwrap(), Status::ImATeapot);
329    /// conn
330    /// # }).get("/").block().assert_status(Status::ImATeapot);
331    /// ```
332    pub fn set_status(&mut self, status: impl TryInto<Status>) -> &mut Self {
333        self.status = Some(status.try_into().unwrap_or_else(|_| {
334            log::error!("attempted to set an invalid status code");
335            Status::InternalServerError
336        }));
337        self
338    }
339
340    /// sets the http status code from any `TryInto<Status>`, returning Conn
341    #[must_use]
342    pub fn with_status(mut self, status: impl TryInto<Status>) -> Self {
343        self.set_status(status);
344        self
345    }
346
347    /// retrieves the path part of the request url, up to and excluding any query component
348    /// ```
349    /// # use trillium_testing::HttpTest;
350    /// HttpTest::new(|mut conn| async move {
351    ///     assert_eq!(conn.path(), "/some/path");
352    ///     conn.with_status(200)
353    /// })
354    /// .get("/some/path?and&a=query")
355    /// .block()
356    /// .assert_ok();
357    /// ```
358    pub fn path(&self) -> &str {
359        match self.path.split_once('?') {
360            Some((path, _)) => path,
361            None => &self.path,
362        }
363    }
364
365    /// retrieves the combined path and any query
366    pub fn path_and_query(&self) -> &str {
367        &self.path
368    }
369
370    /// retrieves the query component of the path, or an empty &str
371    ///
372    /// ```
373    /// # use trillium_testing::HttpTest;
374    /// let server = HttpTest::new(|conn| async move {
375    ///     let querystring = conn.querystring().to_string();
376    ///     conn.with_response_body(querystring).with_status(200)
377    /// });
378    ///
379    /// server
380    ///     .get("/some/path?and&a=query")
381    ///     .block()
382    ///     .assert_body("and&a=query");
383    ///
384    /// server.get("/some/path").block().assert_body("");
385    /// ```
386    pub fn querystring(&self) -> &str {
387        self.path
388            .split_once('?')
389            .map(|(_, query)| query)
390            .unwrap_or_default()
391    }
392
393    /// get the host for this conn, if it exists
394    pub fn host(&self) -> Option<&str> {
395        self.request_headers.get_str(Host)
396    }
397
398    /// set the host for this conn
399    pub fn set_host(&mut self, host: String) -> &mut Self {
400        self.request_headers.insert(Host, host);
401        self
402    }
403
404    /// Cancels and drops the future if reading from the transport results in an error or empty read
405    ///
406    /// The use of this method is not advised if your connected http client employs pipelining
407    /// (rarely seen in the wild), as it will buffer an unbounded number of requests one byte at a
408    /// time
409    ///
410    /// If the client disconnects from the conn's transport, this function will return None. If the
411    /// future completes without disconnection, this future will return Some containing the output
412    /// of the future.
413    ///
414    /// Note that the inner future cannot borrow conn, so you will need to clone or take any
415    /// information needed to execute the future prior to executing this method.
416    ///
417    /// # Example
418    ///
419    /// ```rust
420    /// # use futures_lite::{AsyncRead, AsyncWrite};
421    /// # use trillium_http::{Conn, Method};
422    /// async fn something_slow_and_cancel_safe() -> String {
423    ///     String::from("this was not actually slow")
424    /// }
425    /// async fn handler<T>(mut conn: Conn<T>) -> Conn<T>
426    /// where
427    ///     T: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
428    /// {
429    ///     let Some(returned_body) = conn
430    ///         .cancel_on_disconnect(async { something_slow_and_cancel_safe().await })
431    ///         .await
432    ///     else {
433    ///         return conn;
434    ///     };
435    ///     conn.with_response_body(returned_body).with_status(200)
436    /// }
437    /// ```
438    pub async fn cancel_on_disconnect<'a, Fut>(&'a mut self, fut: Fut) -> Option<Fut::Output>
439    where
440        Fut: Future + Send + 'a,
441    {
442        CancelOnDisconnect(self, pin!(fut)).await
443    }
444
445    /// Check if the transport is connected by attempting to read from the transport
446    ///
447    /// # Example
448    ///
449    /// This is best to use at appropriate points in a long-running handler, like:
450    ///
451    /// ```rust
452    /// # use futures_lite::{AsyncRead, AsyncWrite};
453    /// # use trillium_http::{Conn, Method};
454    /// # async fn something_slow_but_not_cancel_safe() {}
455    /// async fn handler<T>(mut conn: Conn<T>) -> Conn<T>
456    /// where
457    ///     T: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
458    /// {
459    ///     for _ in 0..100 {
460    ///         if conn.is_disconnected().await {
461    ///             return conn;
462    ///         }
463    ///         something_slow_but_not_cancel_safe().await;
464    ///     }
465    ///     conn.with_status(200)
466    /// }
467    /// ```
468    pub async fn is_disconnected(&mut self) -> bool {
469        future::poll_once(LivenessFut::new(self)).await.is_some()
470    }
471
472    /// returns the [`encoding_rs::Encoding`] for this request, as determined from the mime-type
473    /// charset, if available
474    ///
475    /// ```
476    /// # use trillium_testing::HttpTest;
477    /// HttpTest::new(|mut conn| async move {
478    ///     assert_eq!(conn.request_encoding(), encoding_rs::WINDOWS_1252); // the default
479    ///
480    ///     conn.request_headers_mut()
481    ///         .insert("content-type", "text/plain;charset=utf-16");
482    ///     assert_eq!(conn.request_encoding(), encoding_rs::UTF_16LE);
483    ///
484    ///     conn.with_status(200)
485    /// })
486    /// .get("/")
487    /// .block()
488    /// .assert_ok();
489    /// ```
490    pub fn request_encoding(&self) -> &'static Encoding {
491        encoding(&self.request_headers)
492    }
493
494    /// returns the [`encoding_rs::Encoding`] for this response, as
495    /// determined from the mime-type charset, if available
496    ///
497    /// ```
498    /// # use trillium_testing::HttpTest;
499    /// HttpTest::new(|mut conn| async move {
500    ///     assert_eq!(conn.response_encoding(), encoding_rs::WINDOWS_1252); // the default
501    ///     conn.response_headers_mut()
502    ///         .insert("content-type", "text/plain;charset=utf-16");
503    ///
504    ///     assert_eq!(conn.response_encoding(), encoding_rs::UTF_16LE);
505    ///
506    ///     conn.with_status(200)
507    /// })
508    /// .get("/")
509    /// .block()
510    /// .assert_ok();
511    /// ```
512    pub fn response_encoding(&self) -> &'static Encoding {
513        encoding(&self.response_headers)
514    }
515
516    /// returns a [`ReceivedBody`] that references this conn. the conn
517    /// retains all data and holds the singular transport, but the
518    /// `ReceivedBody` provides an interface to read body content.
519    ///
520    /// If the request included an `Expect: 100-continue` header, the 100 Continue response is sent
521    /// lazily on the first read from the returned [`ReceivedBody`].
522    /// ```
523    /// # use trillium_testing::HttpTest;
524    /// let server = HttpTest::new(|mut conn| async move {
525    ///     let request_body = conn.request_body();
526    ///     assert_eq!(request_body.content_length(), Some(5));
527    ///     assert_eq!(request_body.read_string().await.unwrap(), "hello");
528    ///     conn.with_status(200)
529    /// });
530    ///
531    /// server.post("/").with_body("hello").block().assert_ok();
532    /// ```
533    pub fn request_body(&mut self) -> ReceivedBody<'_, Transport> {
534        let needs_100_continue = self.needs_100_continue();
535        let body = self.build_request_body();
536        if needs_100_continue {
537            body.with_send_100_continue()
538        } else {
539            body
540        }
541    }
542
543    /// returns a clone of the [`swansong::Swansong`] for this Conn. use
544    /// this to gracefully stop long-running futures and streams
545    /// inside of handler functions
546    pub fn swansong(&self) -> Swansong {
547        self.protocol_session
548            .h3_connection()
549            .map_or_else(|| self.context.swansong.clone(), |h| h.swansong().clone())
550    }
551
552    /// Registers a function to call after the http response has been
553    /// completely transferred.
554    ///
555    /// The callback is guaranteed to fire **exactly once** before the conn is
556    /// dropped. Either the codec's send path invokes it with the real outcome,
557    /// or — if the conn is dropped before send completes (handler panic,
558    /// transport error, mid-write disconnect) — the drop fallback invokes it
559    /// with a `SendStatus` whose `is_success()` returns false. Multiple
560    /// registrations on the same conn chain in registration order.
561    ///
562    /// Because firing is ordered by send-completion rather than handler return,
563    /// this is the right hook for instrumentation that wants to report what the
564    /// peer actually observed (`trillium-logger` and the out-of-tree
565    /// `trillium-opentelemetry` handler both depend on this property).
566    ///
567    /// Please note that this is a sync function and should be computationally
568    /// lightweight. If your _application_ needs additional async processing,
569    /// use your runtime's task spawn within this hook. If your _library_ needs
570    /// additional async processing in an `after_send` hook, please open an
571    /// issue. This hook is currently designed for simple instrumentation and
572    /// logging, and should be thought of as equivalent to a Drop hook.
573    pub fn after_send<F>(&mut self, after_send: F)
574    where
575        F: FnOnce(SendStatus) + Send + Sync + 'static,
576    {
577        self.after_send.append(after_send);
578    }
579
580    /// applies a mapping function from one transport to another. This
581    /// is particularly useful for boxing the transport. unless you're
582    /// sure this is what you're looking for, you probably don't want
583    /// to be using this
584    pub fn map_transport<NewTransport>(
585        self,
586        f: impl Fn(Transport) -> NewTransport,
587    ) -> Conn<NewTransport>
588    where
589        NewTransport: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
590    {
591        // Manual respread: rustc treats `Conn<Transport>` and `Conn<NewTransport>` as
592        // disjoint types and rejects `..self` without the unstable
593        // `type_changing_struct_update` feature. If a new field is added to `Conn`,
594        // update this respread, `Upgrade::map_transport`, and `From<Conn> for Upgrade`
595        // (`upgrade.rs`) — they share this drift hazard.
596        Conn {
597            context: self.context,
598            request_headers: self.request_headers,
599            response_headers: self.response_headers,
600            method: self.method,
601            response_body: self.response_body,
602            path: self.path,
603            status: self.status,
604            version: self.version,
605            state: self.state,
606            transport: f(self.transport),
607            buffer: self.buffer,
608            request_body_state: self.request_body_state,
609            secure: self.secure,
610            after_send: self.after_send,
611            start_time: self.start_time,
612            peer_ip: self.peer_ip,
613            authority: self.authority,
614            scheme: self.scheme,
615            protocol: self.protocol,
616            protocol_session: self.protocol_session,
617            request_trailers: self.request_trailers,
618        }
619    }
620
621    /// whether this conn is suitable for an http upgrade to another protocol
622    pub fn should_upgrade(&self) -> bool {
623        (self.method() == Method::Connect && self.status == Some(Status::Ok))
624            || self.status == Some(Status::SwitchingProtocols)
625    }
626
627    #[doc(hidden)]
628    pub fn finalize_headers(&mut self) {
629        if self.version == Version::Http3 {
630            self.finalize_response_headers_h3();
631        } else {
632            self.finalize_response_headers_1x();
633        }
634    }
635
636    /// the [`H2Connection`] driver for this conn, if this is an HTTP/2 request
637    pub fn h2_connection(&self) -> Option<&Arc<H2Connection>> {
638        self.protocol_session.h2_connection()
639    }
640
641    /// the h2 stream id for this conn, if this is an HTTP/2 request
642    pub fn h2_stream_id(&self) -> Option<u32> {
643        self.protocol_session.h2_stream_id()
644    }
645
646    /// the [`H3Connection`] driver for this conn, if this is an HTTP/3 request
647    pub fn h3_connection(&self) -> Option<&Arc<H3Connection>> {
648        self.protocol_session.h3_connection()
649    }
650
651    /// the h3 stream id for this conn, if this is an HTTP/3 request
652    pub fn h3_stream_id(&self) -> Option<u64> {
653        self.protocol_session.h3_stream_id()
654    }
655}