trillium_http/conn.rs
1use crate::{
2 Body, Buffer, Headers, HttpContext,
3 KnownHeaderName::Host,
4 Method, ProtocolSession, ReceivedBody, Status, Swansong, TypeSet, Version,
5 after_send::{AfterSend, SendStatus},
6 h2::H2Connection,
7 h3::H3Connection,
8 liveness::{CancelOnDisconnect, LivenessFut},
9 received_body::ReceivedBodyState,
10 util::encoding,
11};
12use encoding_rs::Encoding;
13use futures_lite::{
14 future,
15 io::{AsyncRead, AsyncWrite},
16};
17use std::{
18 borrow::Cow,
19 fmt::{self, Debug, Formatter},
20 future::Future,
21 net::IpAddr,
22 pin::pin,
23 str,
24 sync::Arc,
25 time::Instant,
26};
27mod h1;
28mod h2;
29mod h3;
30mod shared;
31pub(crate) use h1::{HeadError, write_headers_or_trailers};
32pub(crate) use h3::{H3FirstFrame, encode_field_section_h3};
33
/// An HTTP connection.
///
/// This struct represents both the request and the response, and holds the
/// transport over which the response will be sent.
#[derive(fieldwork::Fieldwork)]
pub struct Conn<Transport> {
    #[field(get)]
    /// the shared [`HttpContext`]
    pub(crate) context: Arc<HttpContext>,

    /// request [headers](Headers)
    #[field(get, get_mut)]
    pub(crate) request_headers: Headers,

    /// response [headers](Headers)
    #[field(get, get_mut)]
    pub(crate) response_headers: Headers,

    /// the request target: the path together with any `?query` suffix.
    ///
    /// [`Conn::path`], [`Conn::path_and_query`], and [`Conn::querystring`] are all
    /// derived from this single string.
    pub(crate) path: Cow<'static, str>,

    /// the http method for this conn's request
    ///
    /// ```
    /// # use trillium_http::{Conn, Method};
    /// let mut conn = Conn::new_synthetic(Method::Get, "/some/path?and&a=query", ());
    /// assert_eq!(conn.method(), Method::Get);
    /// ```
    #[field(get, set, copy)]
    pub(crate) method: Method,

    /// the http status for this conn, if set
    #[field(get, copy)]
    pub(crate) status: Option<Status>,

    /// The HTTP protocol version in use on this connection.
    ///
    /// ```
    /// # use trillium_http::{Conn, Method, Version};
    /// let conn = Conn::new_synthetic(Method::Get, "/", ());
    /// assert_eq!(conn.http_version(), Version::Http1_1);
    /// ```
    #[field(get = http_version, copy)]
    pub(crate) version: Version,

    /// the [state typemap](TypeSet) for this conn
    #[field(get, get_mut)]
    pub(crate) state: TypeSet,

    /// the response [body](Body)
    ///
    /// ```
    /// # use trillium_testing::HttpTest;
    /// HttpTest::new(|conn| async move { conn.with_response_body("hello") })
    ///     .get("/")
    ///     .block()
    ///     .assert_body("hello");
    ///
    /// HttpTest::new(|conn| async move { conn.with_response_body(String::from("world")) })
    ///     .get("/")
    ///     .block()
    ///     .assert_body("world");
    ///
    /// HttpTest::new(|conn| async move { conn.with_response_body(vec![99, 97, 116]) })
    ///     .get("/")
    ///     .block()
    ///     .assert_body("cat");
    /// ```
    #[field(get, set, into, option_set_some, take, with)]
    pub(crate) response_body: Option<Body>,

    /// the transport
    ///
    /// This should only be used to call your own custom methods on the transport that do not read
    /// or write any data. Calling any method that reads from or writes to the transport will
    /// disrupt the HTTP protocol. If you're looking to transition from HTTP to another protocol,
    /// use an HTTP upgrade.
    #[field(get, get_mut)]
    pub(crate) transport: Transport,

    /// the [`Buffer`] carried alongside the transport.
    ///
    /// NOTE(review): presumably holds bytes already read from the transport but not yet
    /// consumed; its use is not visible in this file, so confirm against the codec modules.
    pub(crate) buffer: Buffer,

    /// the [`ReceivedBodyState`] for this conn's request body.
    ///
    /// NOTE(review): presumably tracks how far request-body reading has progressed; inferred
    /// from the type name only, confirm against `received_body`.
    pub(crate) request_body_state: ReceivedBodyState,

    /// the callbacks registered through [`Conn::after_send`], which appends each new
    /// callback to this value
    pub(crate) after_send: AfterSend,

    /// whether the connection is secure
    ///
    /// note that this does not necessarily indicate that the transport itself is secure, as it may
    /// indicate that `trillium_http` is behind a trusted reverse proxy that has terminated tls and
    /// provided appropriate headers to indicate this.
    #[field(get, set, rename_predicates)]
    pub(crate) secure: bool,

    /// The [`Instant`] that the first header bytes for this conn were
    /// received, before any processing or parsing has been performed.
    #[field(get, copy)]
    pub(crate) start_time: Instant,

    /// The IP Address for the connection, if available
    #[field(set, get, copy, into)]
    pub(crate) peer_ip: Option<IpAddr>,

    /// the `:authority` pseudo-header
    #[field(set, get, into)]
    pub(crate) authority: Option<Cow<'static, str>>,

    /// the `:scheme` pseudo-header
    #[field(set, get, into)]
    pub(crate) scheme: Option<Cow<'static, str>>,

    /// the [`ProtocolSession`] for this conn — the per-protocol session state
    /// (h2/h3 connection driver and stream id) bundled into a single enum so the
    /// "set together" invariant is enforced at the type level. `Http1` for
    /// h1 / synthetic conns.
    pub(crate) protocol_session: ProtocolSession,

    /// the `:protocol` pseudo-header (extended CONNECT)
    #[field(set, get, into)]
    pub(crate) protocol: Option<Cow<'static, str>>,

    /// request trailers, populated after the request body has been fully read
    #[field(get, get_mut)]
    pub(crate) request_trailers: Option<Headers>,

    /// Marker set via [`Conn::upgrade`]. When `true`, [`Conn::should_upgrade`] reports the
    /// conn as upgradable regardless of method or status.
    pub(crate) upgrade: bool,
}
161
162impl<Transport> Debug for Conn<Transport> {
163 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
164 f.debug_struct("Conn")
165 .field("context", &self.context)
166 .field("request_headers", &self.request_headers)
167 .field("response_headers", &self.response_headers)
168 .field("path", &self.path)
169 .field("method", &self.method)
170 .field("status", &self.status)
171 .field("version", &self.version)
172 .field("state", &self.state)
173 .field("response_body", &self.response_body)
174 .field("transport", &format_args!(".."))
175 .field("buffer", &format_args!(".."))
176 .field("request_body_state", &self.request_body_state)
177 .field("secure", &self.secure)
178 .field("after_send", &format_args!(".."))
179 .field("start_time", &self.start_time)
180 .field("peer_ip", &self.peer_ip)
181 .field("authority", &self.authority)
182 .field("scheme", &self.scheme)
183 .field("protocol", &self.protocol)
184 .field("protocol_session", &self.protocol_session)
185 .field("request_trailers", &self.request_trailers)
186 .field("upgrade", &self.upgrade)
187 .finish()
188 }
189}
190
191impl<Transport> Conn<Transport>
192where
193 Transport: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static,
194{
195 /// Returns the shared state typemap for this conn.
196 pub fn shared_state(&self) -> &TypeSet {
197 &self.context.shared_state
198 }
199
200 /// sets the http status code from any `TryInto<Status>`.
201 ///
202 /// ```
203 /// # use trillium_http::Status;
204 /// # trillium_testing::HttpTest::new(|mut conn| async move {
205 /// assert!(conn.status().is_none());
206 ///
207 /// conn.set_status(200); // a status can be set as a u16
208 /// assert_eq!(conn.status().unwrap(), Status::Ok);
209 ///
210 /// conn.set_status(Status::ImATeapot); // or as a Status
211 /// assert_eq!(conn.status().unwrap(), Status::ImATeapot);
212 /// conn
213 /// # }).get("/").block().assert_status(Status::ImATeapot);
214 /// ```
215 pub fn set_status(&mut self, status: impl TryInto<Status>) -> &mut Self {
216 self.status = Some(status.try_into().unwrap_or_else(|_| {
217 log::error!("attempted to set an invalid status code");
218 Status::InternalServerError
219 }));
220 self
221 }
222
223 /// sets the http status code from any `TryInto<Status>`, returning Conn
224 #[must_use]
225 pub fn with_status(mut self, status: impl TryInto<Status>) -> Self {
226 self.set_status(status);
227 self
228 }
229
230 /// The status to send on the wire: the explicitly-set status, or a
231 /// method-appropriate default when a handler left it unset. Unhandled
232 /// requests default to `404 Not Found`, except CONNECT, which defaults to
233 /// `501 Not Implemented`: an origin server implements no tunnel, and 404's
234 /// resource model does not apply to CONNECT's authority-form target.
235 pub(crate) fn response_status(&self) -> Status {
236 self.status.unwrap_or(match self.method {
237 Method::Connect => Status::NotImplemented,
238 _ => Status::NotFound,
239 })
240 }
241
242 /// retrieves the path part of the request url, up to and excluding any query component
243 /// ```
244 /// # use trillium_testing::HttpTest;
245 /// HttpTest::new(|mut conn| async move {
246 /// assert_eq!(conn.path(), "/some/path");
247 /// conn.with_status(200)
248 /// })
249 /// .get("/some/path?and&a=query")
250 /// .block()
251 /// .assert_ok();
252 /// ```
253 pub fn path(&self) -> &str {
254 match self.path.split_once('?') {
255 Some((path, _)) => path,
256 None => &self.path,
257 }
258 }
259
260 /// retrieves the combined path and any query
261 pub fn path_and_query(&self) -> &str {
262 &self.path
263 }
264
265 /// retrieves the query component of the path, or an empty &str
266 ///
267 /// ```
268 /// # use trillium_testing::HttpTest;
269 /// let server = HttpTest::new(|conn| async move {
270 /// let querystring = conn.querystring().to_string();
271 /// conn.with_response_body(querystring).with_status(200)
272 /// });
273 ///
274 /// server
275 /// .get("/some/path?and&a=query")
276 /// .block()
277 /// .assert_body("and&a=query");
278 ///
279 /// server.get("/some/path").block().assert_body("");
280 /// ```
281 pub fn querystring(&self) -> &str {
282 self.path
283 .split_once('?')
284 .map(|(_, query)| query)
285 .unwrap_or_default()
286 }
287
288 /// get the host for this conn, if it exists
289 pub fn host(&self) -> Option<&str> {
290 self.request_headers.get_str(Host)
291 }
292
293 /// set the host for this conn
294 pub fn set_host(&mut self, host: String) -> &mut Self {
295 self.request_headers.insert(Host, host);
296 self
297 }
298
299 /// Cancels and drops the future if reading from the transport results in an error or empty read
300 ///
301 /// The use of this method is not advised if your connected http client employs pipelining
302 /// (rarely seen in the wild), as it will buffer an unbounded number of requests one byte at a
303 /// time
304 ///
305 /// If the client disconnects from the conn's transport, this function will return None. If the
306 /// future completes without disconnection, this future will return Some containing the output
307 /// of the future.
308 ///
309 /// Note that the inner future cannot borrow conn, so you will need to clone or take any
310 /// information needed to execute the future prior to executing this method.
311 ///
312 /// # Example
313 ///
314 /// ```rust
315 /// # use futures_lite::{AsyncRead, AsyncWrite};
316 /// # use trillium_http::{Conn, Method};
317 /// async fn something_slow_and_cancel_safe() -> String {
318 /// String::from("this was not actually slow")
319 /// }
320 /// async fn handler<T>(mut conn: Conn<T>) -> Conn<T>
321 /// where
322 /// T: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
323 /// {
324 /// let Some(returned_body) = conn
325 /// .cancel_on_disconnect(async { something_slow_and_cancel_safe().await })
326 /// .await
327 /// else {
328 /// return conn;
329 /// };
330 /// conn.with_response_body(returned_body).with_status(200)
331 /// }
332 /// ```
333 pub async fn cancel_on_disconnect<'a, Fut>(&'a mut self, fut: Fut) -> Option<Fut::Output>
334 where
335 Fut: Future + Send + 'a,
336 {
337 CancelOnDisconnect(self, pin!(fut)).await
338 }
339
340 /// Check if the transport is connected by attempting to read from the transport
341 ///
342 /// # Example
343 ///
344 /// This is best to use at appropriate points in a long-running handler, like:
345 ///
346 /// ```rust
347 /// # use futures_lite::{AsyncRead, AsyncWrite};
348 /// # use trillium_http::{Conn, Method};
349 /// # async fn something_slow_but_not_cancel_safe() {}
350 /// async fn handler<T>(mut conn: Conn<T>) -> Conn<T>
351 /// where
352 /// T: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
353 /// {
354 /// for _ in 0..100 {
355 /// if conn.is_disconnected().await {
356 /// return conn;
357 /// }
358 /// something_slow_but_not_cancel_safe().await;
359 /// }
360 /// conn.with_status(200)
361 /// }
362 /// ```
363 pub async fn is_disconnected(&mut self) -> bool {
364 future::poll_once(LivenessFut::new(self)).await.is_some()
365 }
366
367 /// returns the [`encoding_rs::Encoding`] for this request, as determined from the mime-type
368 /// charset, if available
369 ///
370 /// ```
371 /// # use trillium_testing::HttpTest;
372 /// HttpTest::new(|mut conn| async move {
373 /// assert_eq!(conn.request_encoding(), encoding_rs::WINDOWS_1252); // the default
374 ///
375 /// conn.request_headers_mut()
376 /// .insert("content-type", "text/plain;charset=utf-16");
377 /// assert_eq!(conn.request_encoding(), encoding_rs::UTF_16LE);
378 ///
379 /// conn.with_status(200)
380 /// })
381 /// .get("/")
382 /// .block()
383 /// .assert_ok();
384 /// ```
385 pub fn request_encoding(&self) -> &'static Encoding {
386 encoding(&self.request_headers)
387 }
388
389 /// returns the [`encoding_rs::Encoding`] for this response, as
390 /// determined from the mime-type charset, if available
391 ///
392 /// ```
393 /// # use trillium_testing::HttpTest;
394 /// HttpTest::new(|mut conn| async move {
395 /// assert_eq!(conn.response_encoding(), encoding_rs::WINDOWS_1252); // the default
396 /// conn.response_headers_mut()
397 /// .insert("content-type", "text/plain;charset=utf-16");
398 ///
399 /// assert_eq!(conn.response_encoding(), encoding_rs::UTF_16LE);
400 ///
401 /// conn.with_status(200)
402 /// })
403 /// .get("/")
404 /// .block()
405 /// .assert_ok();
406 /// ```
407 pub fn response_encoding(&self) -> &'static Encoding {
408 encoding(&self.response_headers)
409 }
410
411 /// returns a [`ReceivedBody`] that references this conn. the conn
412 /// retains all data and holds the singular transport, but the
413 /// `ReceivedBody` provides an interface to read body content.
414 ///
415 /// If the request included an `Expect: 100-continue` header, the 100 Continue response is sent
416 /// lazily on the first read from the returned [`ReceivedBody`].
417 /// ```
418 /// # use trillium_testing::HttpTest;
419 /// let server = HttpTest::new(|mut conn| async move {
420 /// let request_body = conn.request_body();
421 /// assert_eq!(request_body.content_length(), Some(5));
422 /// assert_eq!(request_body.read_string().await.unwrap(), "hello");
423 /// conn.with_status(200)
424 /// });
425 ///
426 /// server.post("/").with_body("hello").block().assert_ok();
427 /// ```
428 pub fn request_body(&mut self) -> ReceivedBody<'_, Transport> {
429 let needs_100_continue = self.needs_100_continue();
430 let body = self.build_request_body();
431 if needs_100_continue {
432 body.with_send_100_continue()
433 } else {
434 body
435 }
436 }
437
438 /// returns a clone of the [`swansong::Swansong`] for this Conn. use
439 /// this to gracefully stop long-running futures and streams
440 /// inside of handler functions
441 pub fn swansong(&self) -> Swansong {
442 self.protocol_session
443 .h3_connection()
444 .map_or_else(|| self.context.swansong.clone(), |h| h.swansong().clone())
445 }
446
447 /// Registers a function to call after the http response has been
448 /// completely transferred.
449 ///
450 /// The callback is guaranteed to fire **exactly once** before the conn is
451 /// dropped. Either the codec's send path invokes it with the real outcome,
452 /// or — if the conn is dropped before send completes (handler panic,
453 /// transport error, mid-write disconnect) — the drop fallback invokes it
454 /// with a `SendStatus` whose `is_success()` returns false. Multiple
455 /// registrations on the same conn chain in registration order.
456 ///
457 /// Because firing is ordered by send-completion rather than handler return,
458 /// this is the right hook for instrumentation that wants to report what the
459 /// peer actually observed.
460 ///
461 /// This is a sync function and should be computationally lightweight. If
462 /// your _application_ needs additional async processing, use your runtime's
463 /// task spawn within this hook. If your _library_ needs additional async
464 /// processing in an `after_send` hook, please open an issue.
465 pub fn after_send<F>(&mut self, after_send: F)
466 where
467 F: FnOnce(SendStatus) + Send + Sync + 'static,
468 {
469 self.after_send.append(after_send);
470 }
471
472 /// applies a mapping function from one transport to another. This
473 /// is particularly useful for boxing the transport. unless you're
474 /// sure this is what you're looking for, you probably don't want
475 /// to be using this
476 pub fn map_transport<NewTransport>(
477 self,
478 f: impl Fn(Transport) -> NewTransport,
479 ) -> Conn<NewTransport>
480 where
481 NewTransport: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
482 {
483 // Manual respread: rustc treats `Conn<Transport>` and `Conn<NewTransport>` as
484 // disjoint types and rejects `..self` without the unstable
485 // `type_changing_struct_update` feature. If a new field is added to `Conn`,
486 // update this respread, `Upgrade::map_transport`, and `From<Conn> for Upgrade`
487 // (`upgrade.rs`) — they share this drift hazard.
488 Conn {
489 context: self.context,
490 request_headers: self.request_headers,
491 response_headers: self.response_headers,
492 method: self.method,
493 response_body: self.response_body,
494 path: self.path,
495 status: self.status,
496 version: self.version,
497 state: self.state,
498 transport: f(self.transport),
499 buffer: self.buffer,
500 request_body_state: self.request_body_state,
501 secure: self.secure,
502 after_send: self.after_send,
503 start_time: self.start_time,
504 peer_ip: self.peer_ip,
505 authority: self.authority,
506 scheme: self.scheme,
507 protocol: self.protocol,
508 protocol_session: self.protocol_session,
509 request_trailers: self.request_trailers,
510 upgrade: self.upgrade,
511 }
512 }
513
514 /// whether this conn is suitable for an http upgrade to another protocol
515 pub fn should_upgrade(&self) -> bool {
516 self.upgrade
517 || (self.method() == Method::Connect && self.status == Some(Status::Ok))
518 || self.status == Some(Status::SwitchingProtocols)
519 }
520
521 /// Mark this conn to be handed off as an upgrade once the response headers are sent.
522 /// Set the response status (typically `200`) and any headers describing the upgraded
523 /// byte stream before calling; the handler's `upgrade` method receives an [`Upgrade`]
524 /// with per-protocol framing applied on its `AsyncRead`/`AsyncWrite`.
525 #[doc(hidden)]
526 #[must_use]
527 pub fn upgrade(mut self) -> Self {
528 self.upgrade = true;
529 self
530 }
531
532 #[doc(hidden)]
533 pub fn finalize_headers(&mut self) {
534 if self.version == Version::Http3 {
535 self.finalize_response_headers_h3();
536 } else {
537 self.finalize_response_headers_1x();
538 }
539 }
540
541 /// the [`H2Connection`] driver for this conn, if this is an HTTP/2 request
542 pub fn h2_connection(&self) -> Option<&Arc<H2Connection>> {
543 self.protocol_session.h2_connection()
544 }
545
546 /// the h2 stream id for this conn, if this is an HTTP/2 request
547 pub fn h2_stream_id(&self) -> Option<u32> {
548 self.protocol_session.h2_stream_id()
549 }
550
551 /// the [`H3Connection`] driver for this conn, if this is an HTTP/3 request
552 pub fn h3_connection(&self) -> Option<&Arc<H3Connection>> {
553 self.protocol_session.h3_connection()
554 }
555
556 /// the h3 stream id for this conn, if this is an HTTP/3 request
557 pub fn h3_stream_id(&self) -> Option<u64> {
558 self.protocol_session.h3_stream_id()
559 }
560}