Skip to main content

trillium_http/
conn.rs

1use crate::{
2    Body, Buffer, Headers, HttpContext,
3    KnownHeaderName::Host,
4    Method, ProtocolSession, ReceivedBody, Status, Swansong, TypeSet, Version,
5    after_send::{AfterSend, SendStatus},
6    h2::H2Connection,
7    h3::H3Connection,
8    liveness::{CancelOnDisconnect, LivenessFut},
9    received_body::ReceivedBodyState,
10    util::encoding,
11};
12use encoding_rs::Encoding;
13use futures_lite::{
14    future,
15    io::{AsyncRead, AsyncWrite},
16};
17use std::{
18    borrow::Cow,
19    fmt::{self, Debug, Formatter},
20    future::Future,
21    net::IpAddr,
22    pin::pin,
23    str,
24    sync::Arc,
25    time::Instant,
26};
27mod h1;
28mod h2;
29mod h3;
30mod shared;
31pub(crate) use h1::{HeadError, write_headers_or_trailers};
32pub(crate) use h3::{H3FirstFrame, encode_field_section_h3};
33
/// An HTTP connection.
///
/// This struct represents both the request and the response, and holds the
/// transport over which the response will be sent.
// The `#[field(..)]` attributes below are consumed by the `fieldwork` derive; fields without
// such an attribute are crate-internal state.
#[derive(fieldwork::Fieldwork)]
pub struct Conn<Transport> {
    #[field(get)]
    /// the shared [`HttpContext`]
    pub(crate) context: Arc<HttpContext>,

    /// request [headers](Headers)
    #[field(get, get_mut)]
    pub(crate) request_headers: Headers,

    /// response [headers](Headers)
    #[field(get, get_mut)]
    pub(crate) response_headers: Headers,

    /// the request target as stored: the path followed by any `?query` component. The `path()`
    /// and `querystring()` methods split this value on the first `?`, and `path_and_query()`
    /// returns it whole.
    pub(crate) path: Cow<'static, str>,

    /// the http method for this conn's request
    ///
    /// ```
    /// # use trillium_http::{Conn, Method};
    /// let mut conn = Conn::new_synthetic(Method::Get, "/some/path?and&a=query", ());
    /// assert_eq!(conn.method(), Method::Get);
    /// ```
    #[field(get, set, copy)]
    pub(crate) method: Method,

    /// the http status for this conn, if set
    #[field(get, copy)]
    pub(crate) status: Option<Status>,

    /// The HTTP protocol version in use on this connection.
    ///
    /// ```
    /// # use trillium_http::{Conn, Method, Version};
    /// let conn = Conn::new_synthetic(Method::Get, "/", ());
    /// assert_eq!(conn.http_version(), Version::Http1_1);
    /// ```
    #[field(get = http_version, copy)]
    pub(crate) version: Version,

    /// the [state typemap](TypeSet) for this conn
    #[field(get, get_mut)]
    pub(crate) state: TypeSet,

    /// the response [body](Body)
    ///
    /// ```
    /// # use trillium_testing::HttpTest;
    /// HttpTest::new(|conn| async move { conn.with_response_body("hello") })
    ///     .get("/")
    ///     .block()
    ///     .assert_body("hello");
    ///
    /// HttpTest::new(|conn| async move { conn.with_response_body(String::from("world")) })
    ///     .get("/")
    ///     .block()
    ///     .assert_body("world");
    ///
    /// HttpTest::new(|conn| async move { conn.with_response_body(vec![99, 97, 116]) })
    ///     .get("/")
    ///     .block()
    ///     .assert_body("cat");
    /// ```
    #[field(get, set, into, option_set_some, take, with)]
    pub(crate) response_body: Option<Body>,

    /// the transport
    ///
    /// This should only be used to call your own custom methods on the transport that do not read
    /// or write any data. Calling any method that reads from or writes to the transport will
    /// disrupt the HTTP protocol. If you're looking to transition from HTTP to another protocol,
    /// use an HTTP upgrade.
    #[field(get, get_mut)]
    pub(crate) transport: Transport,

    /// internal [`Buffer`] for this conn
    // NOTE(review): presumably holds bytes read from the transport that have not been consumed
    // yet — confirm against the protocol codecs in `h1` / `h2` / `h3`.
    pub(crate) buffer: Buffer,

    /// the [`ReceivedBodyState`] for this conn's request body
    // NOTE(review): presumably tracks how far the request body has been read — confirm in
    // `received_body`.
    pub(crate) request_body_state: ReceivedBodyState,

    /// the callbacks registered through the `after_send` method
    pub(crate) after_send: AfterSend,

    /// whether the connection is secure
    ///
    /// note that this does not necessarily indicate that the transport itself is secure, as it may
    /// indicate that `trillium_http` is behind a trusted reverse proxy that has terminated tls and
    /// provided appropriate headers to indicate this.
    #[field(get, set, rename_predicates)]
    pub(crate) secure: bool,

    /// The [`Instant`] that the first header bytes for this conn were
    /// received, before any processing or parsing has been performed.
    #[field(get, copy)]
    pub(crate) start_time: Instant,

    /// The IP Address for the connection, if available
    #[field(set, get, copy, into)]
    pub(crate) peer_ip: Option<IpAddr>,

    /// the `:authority` pseudo-header
    #[field(set, get, into)]
    pub(crate) authority: Option<Cow<'static, str>>,

    /// the `:scheme` pseudo-header
    #[field(set, get, into)]
    pub(crate) scheme: Option<Cow<'static, str>>,

    /// the [`ProtocolSession`] for this conn — the per-protocol session state
    /// (h2/h3 connection driver and stream id) bundled into a single enum so the
    /// "set together" invariant is enforced at the type level. `Http1` for
    /// h1 / synthetic conns.
    pub(crate) protocol_session: ProtocolSession,

    /// the `:protocol` pseudo-header (extended CONNECT)
    #[field(set, get, into)]
    pub(crate) protocol: Option<Cow<'static, str>>,

    /// request trailers, populated after the request body has been fully read
    #[field(get, get_mut)]
    pub(crate) request_trailers: Option<Headers>,

    /// Marker set via [`Conn::upgrade`].
    // Read by `should_upgrade`, which returns true whenever this flag is set.
    pub(crate) upgrade: bool,
}
161
162impl<Transport> Debug for Conn<Transport> {
163    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
164        f.debug_struct("Conn")
165            .field("context", &self.context)
166            .field("request_headers", &self.request_headers)
167            .field("response_headers", &self.response_headers)
168            .field("path", &self.path)
169            .field("method", &self.method)
170            .field("status", &self.status)
171            .field("version", &self.version)
172            .field("state", &self.state)
173            .field("response_body", &self.response_body)
174            .field("transport", &format_args!(".."))
175            .field("buffer", &format_args!(".."))
176            .field("request_body_state", &self.request_body_state)
177            .field("secure", &self.secure)
178            .field("after_send", &format_args!(".."))
179            .field("start_time", &self.start_time)
180            .field("peer_ip", &self.peer_ip)
181            .field("authority", &self.authority)
182            .field("scheme", &self.scheme)
183            .field("protocol", &self.protocol)
184            .field("protocol_session", &self.protocol_session)
185            .field("request_trailers", &self.request_trailers)
186            .field("upgrade", &self.upgrade)
187            .finish()
188    }
189}
190
191impl<Transport> Conn<Transport>
192where
193    Transport: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static,
194{
195    /// Returns the shared state typemap for this conn.
196    pub fn shared_state(&self) -> &TypeSet {
197        &self.context.shared_state
198    }
199
200    /// sets the http status code from any `TryInto<Status>`.
201    ///
202    /// ```
203    /// # use trillium_http::Status;
204    /// # trillium_testing::HttpTest::new(|mut conn| async move {
205    /// assert!(conn.status().is_none());
206    ///
207    /// conn.set_status(200); // a status can be set as a u16
208    /// assert_eq!(conn.status().unwrap(), Status::Ok);
209    ///
210    /// conn.set_status(Status::ImATeapot); // or as a Status
211    /// assert_eq!(conn.status().unwrap(), Status::ImATeapot);
212    /// conn
213    /// # }).get("/").block().assert_status(Status::ImATeapot);
214    /// ```
215    pub fn set_status(&mut self, status: impl TryInto<Status>) -> &mut Self {
216        self.status = Some(status.try_into().unwrap_or_else(|_| {
217            log::error!("attempted to set an invalid status code");
218            Status::InternalServerError
219        }));
220        self
221    }
222
223    /// sets the http status code from any `TryInto<Status>`, returning Conn
224    #[must_use]
225    pub fn with_status(mut self, status: impl TryInto<Status>) -> Self {
226        self.set_status(status);
227        self
228    }
229
230    /// The status to send on the wire: the explicitly-set status, or a
231    /// method-appropriate default when a handler left it unset. Unhandled
232    /// requests default to `404 Not Found`, except CONNECT, which defaults to
233    /// `501 Not Implemented`: an origin server implements no tunnel, and 404's
234    /// resource model does not apply to CONNECT's authority-form target.
235    pub(crate) fn response_status(&self) -> Status {
236        self.status.unwrap_or(match self.method {
237            Method::Connect => Status::NotImplemented,
238            _ => Status::NotFound,
239        })
240    }
241
242    /// retrieves the path part of the request url, up to and excluding any query component
243    /// ```
244    /// # use trillium_testing::HttpTest;
245    /// HttpTest::new(|mut conn| async move {
246    ///     assert_eq!(conn.path(), "/some/path");
247    ///     conn.with_status(200)
248    /// })
249    /// .get("/some/path?and&a=query")
250    /// .block()
251    /// .assert_ok();
252    /// ```
253    pub fn path(&self) -> &str {
254        match self.path.split_once('?') {
255            Some((path, _)) => path,
256            None => &self.path,
257        }
258    }
259
260    /// retrieves the combined path and any query
261    pub fn path_and_query(&self) -> &str {
262        &self.path
263    }
264
265    /// retrieves the query component of the path, or an empty &str
266    ///
267    /// ```
268    /// # use trillium_testing::HttpTest;
269    /// let server = HttpTest::new(|conn| async move {
270    ///     let querystring = conn.querystring().to_string();
271    ///     conn.with_response_body(querystring).with_status(200)
272    /// });
273    ///
274    /// server
275    ///     .get("/some/path?and&a=query")
276    ///     .block()
277    ///     .assert_body("and&a=query");
278    ///
279    /// server.get("/some/path").block().assert_body("");
280    /// ```
281    pub fn querystring(&self) -> &str {
282        self.path
283            .split_once('?')
284            .map(|(_, query)| query)
285            .unwrap_or_default()
286    }
287
288    /// get the host for this conn, if it exists
289    pub fn host(&self) -> Option<&str> {
290        self.request_headers.get_str(Host)
291    }
292
293    /// set the host for this conn
294    pub fn set_host(&mut self, host: String) -> &mut Self {
295        self.request_headers.insert(Host, host);
296        self
297    }
298
299    /// Cancels and drops the future if reading from the transport results in an error or empty read
300    ///
301    /// The use of this method is not advised if your connected http client employs pipelining
302    /// (rarely seen in the wild), as it will buffer an unbounded number of requests one byte at a
303    /// time
304    ///
305    /// If the client disconnects from the conn's transport, this function will return None. If the
306    /// future completes without disconnection, this future will return Some containing the output
307    /// of the future.
308    ///
309    /// Note that the inner future cannot borrow conn, so you will need to clone or take any
310    /// information needed to execute the future prior to executing this method.
311    ///
312    /// # Example
313    ///
314    /// ```rust
315    /// # use futures_lite::{AsyncRead, AsyncWrite};
316    /// # use trillium_http::{Conn, Method};
317    /// async fn something_slow_and_cancel_safe() -> String {
318    ///     String::from("this was not actually slow")
319    /// }
320    /// async fn handler<T>(mut conn: Conn<T>) -> Conn<T>
321    /// where
322    ///     T: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
323    /// {
324    ///     let Some(returned_body) = conn
325    ///         .cancel_on_disconnect(async { something_slow_and_cancel_safe().await })
326    ///         .await
327    ///     else {
328    ///         return conn;
329    ///     };
330    ///     conn.with_response_body(returned_body).with_status(200)
331    /// }
332    /// ```
333    pub async fn cancel_on_disconnect<'a, Fut>(&'a mut self, fut: Fut) -> Option<Fut::Output>
334    where
335        Fut: Future + Send + 'a,
336    {
337        CancelOnDisconnect(self, pin!(fut)).await
338    }
339
340    /// Check if the transport is connected by attempting to read from the transport
341    ///
342    /// # Example
343    ///
344    /// This is best to use at appropriate points in a long-running handler, like:
345    ///
346    /// ```rust
347    /// # use futures_lite::{AsyncRead, AsyncWrite};
348    /// # use trillium_http::{Conn, Method};
349    /// # async fn something_slow_but_not_cancel_safe() {}
350    /// async fn handler<T>(mut conn: Conn<T>) -> Conn<T>
351    /// where
352    ///     T: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
353    /// {
354    ///     for _ in 0..100 {
355    ///         if conn.is_disconnected().await {
356    ///             return conn;
357    ///         }
358    ///         something_slow_but_not_cancel_safe().await;
359    ///     }
360    ///     conn.with_status(200)
361    /// }
362    /// ```
363    pub async fn is_disconnected(&mut self) -> bool {
364        future::poll_once(LivenessFut::new(self)).await.is_some()
365    }
366
367    /// returns the [`encoding_rs::Encoding`] for this request, as determined from the mime-type
368    /// charset, if available
369    ///
370    /// ```
371    /// # use trillium_testing::HttpTest;
372    /// HttpTest::new(|mut conn| async move {
373    ///     assert_eq!(conn.request_encoding(), encoding_rs::WINDOWS_1252); // the default
374    ///
375    ///     conn.request_headers_mut()
376    ///         .insert("content-type", "text/plain;charset=utf-16");
377    ///     assert_eq!(conn.request_encoding(), encoding_rs::UTF_16LE);
378    ///
379    ///     conn.with_status(200)
380    /// })
381    /// .get("/")
382    /// .block()
383    /// .assert_ok();
384    /// ```
385    pub fn request_encoding(&self) -> &'static Encoding {
386        encoding(&self.request_headers)
387    }
388
389    /// returns the [`encoding_rs::Encoding`] for this response, as
390    /// determined from the mime-type charset, if available
391    ///
392    /// ```
393    /// # use trillium_testing::HttpTest;
394    /// HttpTest::new(|mut conn| async move {
395    ///     assert_eq!(conn.response_encoding(), encoding_rs::WINDOWS_1252); // the default
396    ///     conn.response_headers_mut()
397    ///         .insert("content-type", "text/plain;charset=utf-16");
398    ///
399    ///     assert_eq!(conn.response_encoding(), encoding_rs::UTF_16LE);
400    ///
401    ///     conn.with_status(200)
402    /// })
403    /// .get("/")
404    /// .block()
405    /// .assert_ok();
406    /// ```
407    pub fn response_encoding(&self) -> &'static Encoding {
408        encoding(&self.response_headers)
409    }
410
411    /// returns a [`ReceivedBody`] that references this conn. the conn
412    /// retains all data and holds the singular transport, but the
413    /// `ReceivedBody` provides an interface to read body content.
414    ///
415    /// If the request included an `Expect: 100-continue` header, the 100 Continue response is sent
416    /// lazily on the first read from the returned [`ReceivedBody`].
417    /// ```
418    /// # use trillium_testing::HttpTest;
419    /// let server = HttpTest::new(|mut conn| async move {
420    ///     let request_body = conn.request_body();
421    ///     assert_eq!(request_body.content_length(), Some(5));
422    ///     assert_eq!(request_body.read_string().await.unwrap(), "hello");
423    ///     conn.with_status(200)
424    /// });
425    ///
426    /// server.post("/").with_body("hello").block().assert_ok();
427    /// ```
428    pub fn request_body(&mut self) -> ReceivedBody<'_, Transport> {
429        let needs_100_continue = self.needs_100_continue();
430        let body = self.build_request_body();
431        if needs_100_continue {
432            body.with_send_100_continue()
433        } else {
434            body
435        }
436    }
437
438    /// returns a clone of the [`swansong::Swansong`] for this Conn. use
439    /// this to gracefully stop long-running futures and streams
440    /// inside of handler functions
441    pub fn swansong(&self) -> Swansong {
442        self.protocol_session
443            .h3_connection()
444            .map_or_else(|| self.context.swansong.clone(), |h| h.swansong().clone())
445    }
446
447    /// Registers a function to call after the http response has been
448    /// completely transferred.
449    ///
450    /// The callback is guaranteed to fire **exactly once** before the conn is
451    /// dropped. Either the codec's send path invokes it with the real outcome,
452    /// or — if the conn is dropped before send completes (handler panic,
453    /// transport error, mid-write disconnect) — the drop fallback invokes it
454    /// with a `SendStatus` whose `is_success()` returns false. Multiple
455    /// registrations on the same conn chain in registration order.
456    ///
457    /// Because firing is ordered by send-completion rather than handler return,
458    /// this is the right hook for instrumentation that wants to report what the
459    /// peer actually observed.
460    ///
461    /// This is a sync function and should be computationally lightweight. If
462    /// your _application_ needs additional async processing, use your runtime's
463    /// task spawn within this hook. If your _library_ needs additional async
464    /// processing in an `after_send` hook, please open an issue.
465    pub fn after_send<F>(&mut self, after_send: F)
466    where
467        F: FnOnce(SendStatus) + Send + Sync + 'static,
468    {
469        self.after_send.append(after_send);
470    }
471
472    /// applies a mapping function from one transport to another. This
473    /// is particularly useful for boxing the transport. unless you're
474    /// sure this is what you're looking for, you probably don't want
475    /// to be using this
476    pub fn map_transport<NewTransport>(
477        self,
478        f: impl Fn(Transport) -> NewTransport,
479    ) -> Conn<NewTransport>
480    where
481        NewTransport: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
482    {
483        // Manual respread: rustc treats `Conn<Transport>` and `Conn<NewTransport>` as
484        // disjoint types and rejects `..self` without the unstable
485        // `type_changing_struct_update` feature. If a new field is added to `Conn`,
486        // update this respread, `Upgrade::map_transport`, and `From<Conn> for Upgrade`
487        // (`upgrade.rs`) — they share this drift hazard.
488        Conn {
489            context: self.context,
490            request_headers: self.request_headers,
491            response_headers: self.response_headers,
492            method: self.method,
493            response_body: self.response_body,
494            path: self.path,
495            status: self.status,
496            version: self.version,
497            state: self.state,
498            transport: f(self.transport),
499            buffer: self.buffer,
500            request_body_state: self.request_body_state,
501            secure: self.secure,
502            after_send: self.after_send,
503            start_time: self.start_time,
504            peer_ip: self.peer_ip,
505            authority: self.authority,
506            scheme: self.scheme,
507            protocol: self.protocol,
508            protocol_session: self.protocol_session,
509            request_trailers: self.request_trailers,
510            upgrade: self.upgrade,
511        }
512    }
513
514    /// whether this conn is suitable for an http upgrade to another protocol
515    pub fn should_upgrade(&self) -> bool {
516        self.upgrade
517            || (self.method() == Method::Connect && self.status == Some(Status::Ok))
518            || self.status == Some(Status::SwitchingProtocols)
519    }
520
521    /// Mark this conn to be handed off as an upgrade once the response headers are sent.
522    /// Set the response status (typically `200`) and any headers describing the upgraded
523    /// byte stream before calling; the handler's `upgrade` method receives an [`Upgrade`]
524    /// with per-protocol framing applied on its `AsyncRead`/`AsyncWrite`.
525    #[doc(hidden)]
526    #[must_use]
527    pub fn upgrade(mut self) -> Self {
528        self.upgrade = true;
529        self
530    }
531
532    #[doc(hidden)]
533    pub fn finalize_headers(&mut self) {
534        if self.version == Version::Http3 {
535            self.finalize_response_headers_h3();
536        } else {
537            self.finalize_response_headers_1x();
538        }
539    }
540
541    /// the [`H2Connection`] driver for this conn, if this is an HTTP/2 request
542    pub fn h2_connection(&self) -> Option<&Arc<H2Connection>> {
543        self.protocol_session.h2_connection()
544    }
545
546    /// the h2 stream id for this conn, if this is an HTTP/2 request
547    pub fn h2_stream_id(&self) -> Option<u32> {
548        self.protocol_session.h2_stream_id()
549    }
550
551    /// the [`H3Connection`] driver for this conn, if this is an HTTP/3 request
552    pub fn h3_connection(&self) -> Option<&Arc<H3Connection>> {
553        self.protocol_session.h3_connection()
554    }
555
556    /// the h3 stream id for this conn, if this is an HTTP/3 request
557    pub fn h3_stream_id(&self) -> Option<u64> {
558        self.protocol_session.h3_stream_id()
559    }
560}