Skip to main content

trillium_http/
conn.rs

1use crate::{
2    Body, Buffer, Headers, HttpContext,
3    KnownHeaderName::Host,
4    Method, ReceivedBody, Status, Swansong, TypeSet, Version,
5    after_send::{AfterSend, SendStatus},
6    h3::H3Connection,
7    liveness::{CancelOnDisconnect, LivenessFut},
8    received_body::ReceivedBodyState,
9    util::encoding,
10};
11use encoding_rs::Encoding;
12use futures_lite::{
13    future,
14    io::{AsyncRead, AsyncWrite},
15};
16use std::{
17    borrow::Cow,
18    fmt::{self, Debug, Formatter},
19    future::Future,
20    net::IpAddr,
21    pin::pin,
22    str,
23    sync::Arc,
24    time::Instant,
25};
26mod h1;
27mod h3;
28
/// An HTTP connection
///
/// Unlike in other rust http implementations, this struct represents both
/// the request and the response, and holds the transport over which the
/// response will be sent.
#[derive(fieldwork::Fieldwork)]
pub struct Conn<Transport> {
    #[field(get)]
    /// the shared [`HttpContext`]
    pub(crate) context: Arc<HttpContext>,

    /// request [headers](Headers)
    #[field(get, get_mut)]
    pub(crate) request_headers: Headers,

    /// response [headers](Headers)
    #[field(get, get_mut)]
    pub(crate) response_headers: Headers,

    // The request path together with any `?query` suffix. `Conn::path` and `Conn::querystring`
    // split this value on the first `?`, and `Conn::path_and_query` returns it whole. No accessor
    // is generated for this field; those hand-written methods are the public interface.
    pub(crate) path: Cow<'static, str>,

    /// the http method for this conn's request
    ///
    /// ```
    /// # use trillium_http::{Conn, Method};
    /// let mut conn = Conn::new_synthetic(Method::Get, "/some/path?and&a=query", ());
    /// assert_eq!(conn.method(), Method::Get);
    /// ```
    #[field(get, set, copy)]
    pub(crate) method: Method,

    /// the http status for this conn, if set
    #[field(get, copy)]
    pub(crate) status: Option<Status>,

    #[field(get = http_version, copy)]
    /// the http version for this conn
    pub(crate) version: Version,

    /// the [state typemap](TypeSet) for this conn
    #[field(get, get_mut)]
    pub(crate) state: TypeSet,

    /// the response [body](Body)
    ///
    /// ```
    /// # use trillium_testing::HttpTest;
    /// HttpTest::new(|conn| async move { conn.with_response_body("hello") })
    ///     .get("/")
    ///     .block()
    ///     .assert_body("hello");
    ///
    /// HttpTest::new(|conn| async move { conn.with_response_body(String::from("world")) })
    ///     .get("/")
    ///     .block()
    ///     .assert_body("world");
    ///
    /// HttpTest::new(|conn| async move { conn.with_response_body(vec![99, 97, 116]) })
    ///     .get("/")
    ///     .block()
    ///     .assert_body("cat");
    /// ```
    #[field(get, set, into, option_set_some, take, with)]
    pub(crate) response_body: Option<Body>,

    /// the transport
    ///
    /// This should only be used to call your own custom methods on the transport that do not read
    /// or write any data. Calling any method that reads from or writes to the transport will
    /// disrupt the HTTP protocol. If you're looking to transition from HTTP to another protocol,
    /// use an HTTP upgrade.
    #[field(get, get_mut)]
    pub(crate) transport: Transport,

    // Rendered as `..` in the `Debug` output.
    // NOTE(review): presumably holds bytes read from the transport that have not been consumed
    // yet -- confirm against the h1/h3 submodules.
    pub(crate) buffer: Buffer,

    // Included in the `Debug` output.
    // NOTE(review): presumably tracks how far the request body has been read and is handed to
    // `ReceivedBody` -- confirm against `build_request_body`.
    pub(crate) request_body_state: ReceivedBodyState,

    // Hooks registered through `Conn::after_send`; each hook is a `FnOnce(SendStatus)`.
    // Rendered as `..` in the `Debug` output.
    pub(crate) after_send: AfterSend,

    /// whether the connection is secure
    ///
    /// note that this does not necessarily indicate that the transport itself is secure, as it may
    /// indicate that `trillium_http` is behind a trusted reverse proxy that has terminated tls and
    /// provided appropriate headers to indicate this.
    #[field(get, set, rename_predicates)]
    pub(crate) secure: bool,

    /// The [`Instant`] that the first header bytes for this conn were
    /// received, before any processing or parsing has been performed.
    #[field(get, copy)]
    pub(crate) start_time: Instant,

    /// The IP Address for the connection, if available
    #[field(set, get, copy, into)]
    pub(crate) peer_ip: Option<IpAddr>,

    /// the :authority http/3 pseudo-header
    #[field(set, get, into)]
    pub(crate) authority: Option<Cow<'static, str>>,

    /// the :scheme http/3 pseudo-header
    #[field(set, get, into)]
    pub(crate) scheme: Option<Cow<'static, str>>,

    /// the [`H3Connection`] for this conn, if this is an HTTP/3 request
    #[field(get(deref = false))]
    pub(crate) h3_connection: Option<Arc<H3Connection>>,

    /// the :protocol http/3 pseudo-header
    #[field(set, get, into)]
    pub(crate) protocol: Option<Cow<'static, str>>,

    /// request trailers, populated after the request body has been fully read
    #[field(get, get_mut)]
    pub(crate) request_trailers: Option<Headers>,
}
146
147impl<Transport> Debug for Conn<Transport> {
148    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
149        f.debug_struct("Conn")
150            .field("context", &self.context)
151            .field("request_headers", &self.request_headers)
152            .field("response_headers", &self.response_headers)
153            .field("path", &self.path)
154            .field("method", &self.method)
155            .field("status", &self.status)
156            .field("version", &self.version)
157            .field("state", &self.state)
158            .field("response_body", &self.response_body)
159            .field("transport", &format_args!(".."))
160            .field("buffer", &format_args!(".."))
161            .field("request_body_state", &self.request_body_state)
162            .field("secure", &self.secure)
163            .field("after_send", &format_args!(".."))
164            .field("start_time", &self.start_time)
165            .field("peer_ip", &self.peer_ip)
166            .field("authority", &self.authority)
167            .field("scheme", &self.scheme)
168            .field("protocol", &self.protocol)
169            .field("h3_connection", &self.h3_connection)
170            .field("request_trailers", &self.request_trailers)
171            .finish()
172    }
173}
174
175impl<Transport> Conn<Transport>
176where
177    Transport: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static,
178{
179    /// Returns the shared state on this conn, if set
180    pub fn shared_state(&self) -> &TypeSet {
181        &self.context.shared_state
182    }
183
184    /// sets the http status code from any `TryInto<Status>`.
185    ///
186    /// ```
187    /// # use trillium_http::Status;
188    /// # trillium_testing::HttpTest::new(|mut conn| async move {
189    /// assert!(conn.status().is_none());
190    ///
191    /// conn.set_status(200); // a status can be set as a u16
192    /// assert_eq!(conn.status().unwrap(), Status::Ok);
193    ///
194    /// conn.set_status(Status::ImATeapot); // or as a Status
195    /// assert_eq!(conn.status().unwrap(), Status::ImATeapot);
196    /// conn
197    /// # }).get("/").block().assert_status(Status::ImATeapot);
198    /// ```
199    pub fn set_status(&mut self, status: impl TryInto<Status>) -> &mut Self {
200        self.status = Some(status.try_into().unwrap_or_else(|_| {
201            log::error!("attempted to set an invalid status code");
202            Status::InternalServerError
203        }));
204        self
205    }
206
207    /// sets the http status code from any `TryInto<Status>`, returning Conn
208    #[must_use]
209    pub fn with_status(mut self, status: impl TryInto<Status>) -> Self {
210        self.set_status(status);
211        self
212    }
213
214    /// retrieves the path part of the request url, up to and excluding any query component
215    /// ```
216    /// # use trillium_testing::HttpTest;
217    /// HttpTest::new(|mut conn| async move {
218    ///     assert_eq!(conn.path(), "/some/path");
219    ///     conn.with_status(200)
220    /// })
221    /// .get("/some/path?and&a=query")
222    /// .block()
223    /// .assert_ok();
224    /// ```
225    pub fn path(&self) -> &str {
226        match self.path.split_once('?') {
227            Some((path, _)) => path,
228            None => &self.path,
229        }
230    }
231
232    /// retrieves the combined path and any query
233    pub fn path_and_query(&self) -> &str {
234        &self.path
235    }
236
237    /// retrieves the query component of the path, or an empty &str
238    ///
239    /// ```
240    /// # use trillium_testing::HttpTest;
241    /// let server = HttpTest::new(|conn| async move {
242    ///     let querystring = conn.querystring().to_string();
243    ///     conn.with_response_body(querystring).with_status(200)
244    /// });
245    ///
246    /// server
247    ///     .get("/some/path?and&a=query")
248    ///     .block()
249    ///     .assert_body("and&a=query");
250    ///
251    /// server.get("/some/path").block().assert_body("");
252    /// ```
253    pub fn querystring(&self) -> &str {
254        self.path
255            .split_once('?')
256            .map(|(_, query)| query)
257            .unwrap_or_default()
258    }
259
260    /// get the host for this conn, if it exists
261    pub fn host(&self) -> Option<&str> {
262        self.request_headers.get_str(Host)
263    }
264
265    /// set the host for this conn
266    pub fn set_host(&mut self, host: String) -> &mut Self {
267        self.request_headers.insert(Host, host);
268        self
269    }
270
271    /// Cancels and drops the future if reading from the transport results in an error or empty read
272    ///
273    /// The use of this method is not advised if your connected http client employs pipelining
274    /// (rarely seen in the wild), as it will buffer an unbounded number of requests one byte at a
275    /// time
276    ///
277    /// If the client disconnects from the conn's transport, this function will return None. If the
278    /// future completes without disconnection, this future will return Some containing the output
279    /// of the future.
280    ///
281    /// Note that the inner future cannot borrow conn, so you will need to clone or take any
282    /// information needed to execute the future prior to executing this method.
283    ///
284    /// # Example
285    ///
286    /// ```rust
287    /// # use futures_lite::{AsyncRead, AsyncWrite};
288    /// # use trillium_http::{Conn, Method};
289    /// async fn something_slow_and_cancel_safe() -> String {
290    ///     String::from("this was not actually slow")
291    /// }
292    /// async fn handler<T>(mut conn: Conn<T>) -> Conn<T>
293    /// where
294    ///     T: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
295    /// {
296    ///     let Some(returned_body) = conn
297    ///         .cancel_on_disconnect(async { something_slow_and_cancel_safe().await })
298    ///         .await
299    ///     else {
300    ///         return conn;
301    ///     };
302    ///     conn.with_response_body(returned_body).with_status(200)
303    /// }
304    /// ```
305    pub async fn cancel_on_disconnect<'a, Fut>(&'a mut self, fut: Fut) -> Option<Fut::Output>
306    where
307        Fut: Future + Send + 'a,
308    {
309        CancelOnDisconnect(self, pin!(fut)).await
310    }
311
312    /// Check if the transport is connected by attempting to read from the transport
313    ///
314    /// # Example
315    ///
316    /// This is best to use at appropriate points in a long-running handler, like:
317    ///
318    /// ```rust
319    /// # use futures_lite::{AsyncRead, AsyncWrite};
320    /// # use trillium_http::{Conn, Method};
321    /// # async fn something_slow_but_not_cancel_safe() {}
322    /// async fn handler<T>(mut conn: Conn<T>) -> Conn<T>
323    /// where
324    ///     T: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
325    /// {
326    ///     for _ in 0..100 {
327    ///         if conn.is_disconnected().await {
328    ///             return conn;
329    ///         }
330    ///         something_slow_but_not_cancel_safe().await;
331    ///     }
332    ///     conn.with_status(200)
333    /// }
334    /// ```
335    pub async fn is_disconnected(&mut self) -> bool {
336        future::poll_once(LivenessFut::new(self)).await.is_some()
337    }
338
339    /// returns the [`encoding_rs::Encoding`] for this request, as determined from the mime-type
340    /// charset, if available
341    ///
342    /// ```
343    /// # use trillium_testing::HttpTest;
344    /// HttpTest::new(|mut conn| async move {
345    ///     assert_eq!(conn.request_encoding(), encoding_rs::WINDOWS_1252); // the default
346    ///
347    ///     conn.request_headers_mut()
348    ///         .insert("content-type", "text/plain;charset=utf-16");
349    ///     assert_eq!(conn.request_encoding(), encoding_rs::UTF_16LE);
350    ///
351    ///     conn.with_status(200)
352    /// })
353    /// .get("/")
354    /// .block()
355    /// .assert_ok();
356    /// ```
357    pub fn request_encoding(&self) -> &'static Encoding {
358        encoding(&self.request_headers)
359    }
360
361    /// returns the [`encoding_rs::Encoding`] for this response, as
362    /// determined from the mime-type charset, if available
363    ///
364    /// ```
365    /// # use trillium_testing::HttpTest;
366    /// HttpTest::new(|mut conn| async move {
367    ///     assert_eq!(conn.response_encoding(), encoding_rs::WINDOWS_1252); // the default
368    ///     conn.response_headers_mut()
369    ///         .insert("content-type", "text/plain;charset=utf-16");
370    ///
371    ///     assert_eq!(conn.response_encoding(), encoding_rs::UTF_16LE);
372    ///
373    ///     conn.with_status(200)
374    /// })
375    /// .get("/")
376    /// .block()
377    /// .assert_ok();
378    /// ```
379    pub fn response_encoding(&self) -> &'static Encoding {
380        encoding(&self.response_headers)
381    }
382
383    /// returns a [`ReceivedBody`] that references this conn. the conn
384    /// retains all data and holds the singular transport, but the
385    /// `ReceivedBody` provides an interface to read body content.
386    ///
387    /// If the request included an `Expect: 100-continue` header, the 100 Continue response is sent
388    /// lazily on the first read from the returned [`ReceivedBody`].
389    /// ```
390    /// # use trillium_testing::HttpTest;
391    /// let server = HttpTest::new(|mut conn| async move {
392    ///     let request_body = conn.request_body();
393    ///     assert_eq!(request_body.content_length(), Some(5));
394    ///     assert_eq!(request_body.read_string().await.unwrap(), "hello");
395    ///     conn.with_status(200)
396    /// });
397    ///
398    /// server.post("/").with_body("hello").block().assert_ok();
399    /// ```
400    pub fn request_body(&mut self) -> ReceivedBody<'_, Transport> {
401        let needs_100_continue = self.needs_100_continue();
402        let body = self.build_request_body();
403        if needs_100_continue {
404            body.with_send_100_continue()
405        } else {
406            body
407        }
408    }
409
410    /// returns a clone of the [`swansong::Swansong`] for this Conn. use
411    /// this to gracefully stop long-running futures and streams
412    /// inside of handler functions
413    pub fn swansong(&self) -> Swansong {
414        self.h3_connection
415            .as_ref()
416            .map_or_else(|| self.context.swansong.clone(), |h| h.swansong().clone())
417    }
418
419    /// Registers a function to call after the http response has been
420    /// completely transferred. Please note that this is a sync function
421    /// and should be computationally lightweight. If your _application_
422    /// needs additional async processing, use your runtime's task spawn
423    /// within this hook.  If your _library_ needs additional async
424    /// processing in an `after_send` hook, please open an issue. This hook
425    /// is currently designed for simple instrumentation and logging, and
426    /// should be thought of as equivalent to a Drop hook.
427    pub fn after_send<F>(&mut self, after_send: F)
428    where
429        F: FnOnce(SendStatus) + Send + Sync + 'static,
430    {
431        self.after_send.append(after_send);
432    }
433
434    /// applies a mapping function from one transport to another. This
435    /// is particularly useful for boxing the transport. unless you're
436    /// sure this is what you're looking for, you probably don't want
437    /// to be using this
438    pub fn map_transport<NewTransport>(
439        self,
440        f: impl Fn(Transport) -> NewTransport,
441    ) -> Conn<NewTransport>
442    where
443        NewTransport: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
444    {
445        Conn {
446            context: self.context,
447            request_headers: self.request_headers,
448            response_headers: self.response_headers,
449            method: self.method,
450            response_body: self.response_body,
451            path: self.path,
452            status: self.status,
453            version: self.version,
454            state: self.state,
455            transport: f(self.transport),
456            buffer: self.buffer,
457            request_body_state: self.request_body_state,
458            secure: self.secure,
459            after_send: self.after_send,
460            start_time: self.start_time,
461            peer_ip: self.peer_ip,
462            authority: self.authority,
463            scheme: self.scheme,
464            h3_connection: self.h3_connection,
465            protocol: self.protocol,
466            request_trailers: self.request_trailers,
467        }
468    }
469
470    /// whether this conn is suitable for an http upgrade to another protocol
471    pub fn should_upgrade(&self) -> bool {
472        (self.method() == Method::Connect && self.status == Some(Status::Ok))
473            || self.status == Some(Status::SwitchingProtocols)
474    }
475
476    #[doc(hidden)]
477    pub fn finalize_headers(&mut self) {
478        if self.version == Version::Http3 {
479            self.finalize_response_headers_h3();
480        } else {
481            self.finalize_response_headers_1x();
482        }
483    }
484}