trillium_http/conn.rs
1use crate::{
2 Body, Buffer, Headers, HttpContext,
3 KnownHeaderName::Host,
4 Method, ReceivedBody, Status, Swansong, TypeSet, Version,
5 after_send::{AfterSend, SendStatus},
6 h3::H3Connection,
7 liveness::{CancelOnDisconnect, LivenessFut},
8 received_body::ReceivedBodyState,
9 util::encoding,
10};
11use encoding_rs::Encoding;
12use futures_lite::{
13 future,
14 io::{AsyncRead, AsyncWrite},
15};
16use std::{
17 borrow::Cow,
18 fmt::{self, Debug, Formatter},
19 future::Future,
20 net::IpAddr,
21 pin::pin,
22 str,
23 sync::Arc,
24 time::Instant,
25};
26mod h1;
27mod h3;
28
29/// A http connection
30///
31/// Unlike in other rust http implementations, this struct represents both
32/// the request and the response, and holds the transport over which the
33/// response will be sent.
34#[derive(fieldwork::Fieldwork)]
35pub struct Conn<Transport> {
36 #[field(get)]
37 /// the shared [`HttpContext`]
38 pub(crate) context: Arc<HttpContext>,
39
40 /// request [headers](Headers)
41 #[field(get, get_mut)]
42 pub(crate) request_headers: Headers,
43
44 /// response [headers](Headers)
45 #[field(get, get_mut)]
46 pub(crate) response_headers: Headers,
47
48 pub(crate) path: Cow<'static, str>,
49
50 /// the http method for this conn's request
51 ///
52 /// ```
53 /// # use trillium_http::{Conn, Method};
54 /// let mut conn = Conn::new_synthetic(Method::Get, "/some/path?and&a=query", ());
55 /// assert_eq!(conn.method(), Method::Get);
56 /// ```
57 #[field(get, set, copy)]
58 pub(crate) method: Method,
59
60 /// the http status for this conn, if set
61 #[field(get, copy)]
62 pub(crate) status: Option<Status>,
63
64 #[field(get = http_version, copy)]
65 /// the http version for this conn
66 pub(crate) version: Version,
67
68 /// the [state typemap](TypeSet) for this conn
69 #[field(get, get_mut)]
70 pub(crate) state: TypeSet,
71
72 /// the response [body](Body)
73 ///
74 /// ```
75 /// # use trillium_testing::HttpTest;
76 /// HttpTest::new(|conn| async move { conn.with_response_body("hello") })
77 /// .get("/")
78 /// .block()
79 /// .assert_body("hello");
80 ///
81 /// HttpTest::new(|conn| async move { conn.with_response_body(String::from("world")) })
82 /// .get("/")
83 /// .block()
84 /// .assert_body("world");
85 ///
86 /// HttpTest::new(|conn| async move { conn.with_response_body(vec![99, 97, 116]) })
87 /// .get("/")
88 /// .block()
89 /// .assert_body("cat");
90 /// ```
91 #[field(get, set, into, option_set_some, take, with)]
92 pub(crate) response_body: Option<Body>,
93
94 /// the transport
95 ///
96 /// This should only be used to call your own custom methods on the transport that do not read
97 /// or write any data. Calling any method that reads from or writes to the transport will
98 /// disrupt the HTTP protocol. If you're looking to transition from HTTP to another protocol,
99 /// use an HTTP upgrade.
100 #[field(get, get_mut)]
101 pub(crate) transport: Transport,
102
103 pub(crate) buffer: Buffer,
104
105 pub(crate) request_body_state: ReceivedBodyState,
106
107 pub(crate) after_send: AfterSend,
108
109 /// whether the connection is secure
110 ///
111 /// note that this does not necessarily indicate that the transport itself is secure, as it may
112 /// indicate that `trillium_http` is behind a trusted reverse proxy that has terminated tls and
113 /// provided appropriate headers to indicate this.
114 #[field(get, set, rename_predicates)]
115 pub(crate) secure: bool,
116
117 /// The [`Instant`] that the first header bytes for this conn were
118 /// received, before any processing or parsing has been performed.
119 #[field(get, copy)]
120 pub(crate) start_time: Instant,
121
122 /// The IP Address for the connection, if available
123 #[field(set, get, copy, into)]
124 pub(crate) peer_ip: Option<IpAddr>,
125
126 /// the :authority http/3 pseudo-header
127 #[field(set, get, into)]
128 pub(crate) authority: Option<Cow<'static, str>>,
129
130 /// the :scheme http/3 pseudo-header
131 #[field(set, get, into)]
132 pub(crate) scheme: Option<Cow<'static, str>>,
133
134 /// the [`H3Connection`] for this conn, if this is an HTTP/3 request
135 #[field(get(deref = false))]
136 pub(crate) h3_connection: Option<Arc<H3Connection>>,
137
138 /// the :protocol http/3 pseudo-header
139 #[field(set, get, into)]
140 pub(crate) protocol: Option<Cow<'static, str>>,
141
142 /// request trailers, populated after the request body has been fully read
143 #[field(get, get_mut)]
144 pub(crate) request_trailers: Option<Headers>,
145}
146
147impl<Transport> Debug for Conn<Transport> {
148 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
149 f.debug_struct("Conn")
150 .field("context", &self.context)
151 .field("request_headers", &self.request_headers)
152 .field("response_headers", &self.response_headers)
153 .field("path", &self.path)
154 .field("method", &self.method)
155 .field("status", &self.status)
156 .field("version", &self.version)
157 .field("state", &self.state)
158 .field("response_body", &self.response_body)
159 .field("transport", &format_args!(".."))
160 .field("buffer", &format_args!(".."))
161 .field("request_body_state", &self.request_body_state)
162 .field("secure", &self.secure)
163 .field("after_send", &format_args!(".."))
164 .field("start_time", &self.start_time)
165 .field("peer_ip", &self.peer_ip)
166 .field("authority", &self.authority)
167 .field("scheme", &self.scheme)
168 .field("protocol", &self.protocol)
169 .field("h3_connection", &self.h3_connection)
170 .field("request_trailers", &self.request_trailers)
171 .finish()
172 }
173}
174
175impl<Transport> Conn<Transport>
176where
177 Transport: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static,
178{
179 /// Returns the shared state on this conn, if set
180 pub fn shared_state(&self) -> &TypeSet {
181 &self.context.shared_state
182 }
183
184 /// sets the http status code from any `TryInto<Status>`.
185 ///
186 /// ```
187 /// # use trillium_http::Status;
188 /// # trillium_testing::HttpTest::new(|mut conn| async move {
189 /// assert!(conn.status().is_none());
190 ///
191 /// conn.set_status(200); // a status can be set as a u16
192 /// assert_eq!(conn.status().unwrap(), Status::Ok);
193 ///
194 /// conn.set_status(Status::ImATeapot); // or as a Status
195 /// assert_eq!(conn.status().unwrap(), Status::ImATeapot);
196 /// conn
197 /// # }).get("/").block().assert_status(Status::ImATeapot);
198 /// ```
199 pub fn set_status(&mut self, status: impl TryInto<Status>) -> &mut Self {
200 self.status = Some(status.try_into().unwrap_or_else(|_| {
201 log::error!("attempted to set an invalid status code");
202 Status::InternalServerError
203 }));
204 self
205 }
206
207 /// sets the http status code from any `TryInto<Status>`, returning Conn
208 #[must_use]
209 pub fn with_status(mut self, status: impl TryInto<Status>) -> Self {
210 self.set_status(status);
211 self
212 }
213
214 /// retrieves the path part of the request url, up to and excluding any query component
215 /// ```
216 /// # use trillium_testing::HttpTest;
217 /// HttpTest::new(|mut conn| async move {
218 /// assert_eq!(conn.path(), "/some/path");
219 /// conn.with_status(200)
220 /// })
221 /// .get("/some/path?and&a=query")
222 /// .block()
223 /// .assert_ok();
224 /// ```
225 pub fn path(&self) -> &str {
226 match self.path.split_once('?') {
227 Some((path, _)) => path,
228 None => &self.path,
229 }
230 }
231
232 /// retrieves the combined path and any query
233 pub fn path_and_query(&self) -> &str {
234 &self.path
235 }
236
237 /// retrieves the query component of the path, or an empty &str
238 ///
239 /// ```
240 /// # use trillium_testing::HttpTest;
241 /// let server = HttpTest::new(|conn| async move {
242 /// let querystring = conn.querystring().to_string();
243 /// conn.with_response_body(querystring).with_status(200)
244 /// });
245 ///
246 /// server
247 /// .get("/some/path?and&a=query")
248 /// .block()
249 /// .assert_body("and&a=query");
250 ///
251 /// server.get("/some/path").block().assert_body("");
252 /// ```
253 pub fn querystring(&self) -> &str {
254 self.path
255 .split_once('?')
256 .map(|(_, query)| query)
257 .unwrap_or_default()
258 }
259
260 /// get the host for this conn, if it exists
261 pub fn host(&self) -> Option<&str> {
262 self.request_headers.get_str(Host)
263 }
264
265 /// set the host for this conn
266 pub fn set_host(&mut self, host: String) -> &mut Self {
267 self.request_headers.insert(Host, host);
268 self
269 }
270
271 /// Cancels and drops the future if reading from the transport results in an error or empty read
272 ///
273 /// The use of this method is not advised if your connected http client employs pipelining
274 /// (rarely seen in the wild), as it will buffer an unbounded number of requests one byte at a
275 /// time
276 ///
277 /// If the client disconnects from the conn's transport, this function will return None. If the
278 /// future completes without disconnection, this future will return Some containing the output
279 /// of the future.
280 ///
281 /// Note that the inner future cannot borrow conn, so you will need to clone or take any
282 /// information needed to execute the future prior to executing this method.
283 ///
284 /// # Example
285 ///
286 /// ```rust
287 /// # use futures_lite::{AsyncRead, AsyncWrite};
288 /// # use trillium_http::{Conn, Method};
289 /// async fn something_slow_and_cancel_safe() -> String {
290 /// String::from("this was not actually slow")
291 /// }
292 /// async fn handler<T>(mut conn: Conn<T>) -> Conn<T>
293 /// where
294 /// T: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
295 /// {
296 /// let Some(returned_body) = conn
297 /// .cancel_on_disconnect(async { something_slow_and_cancel_safe().await })
298 /// .await
299 /// else {
300 /// return conn;
301 /// };
302 /// conn.with_response_body(returned_body).with_status(200)
303 /// }
304 /// ```
305 pub async fn cancel_on_disconnect<'a, Fut>(&'a mut self, fut: Fut) -> Option<Fut::Output>
306 where
307 Fut: Future + Send + 'a,
308 {
309 CancelOnDisconnect(self, pin!(fut)).await
310 }
311
312 /// Check if the transport is connected by attempting to read from the transport
313 ///
314 /// # Example
315 ///
316 /// This is best to use at appropriate points in a long-running handler, like:
317 ///
318 /// ```rust
319 /// # use futures_lite::{AsyncRead, AsyncWrite};
320 /// # use trillium_http::{Conn, Method};
321 /// # async fn something_slow_but_not_cancel_safe() {}
322 /// async fn handler<T>(mut conn: Conn<T>) -> Conn<T>
323 /// where
324 /// T: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
325 /// {
326 /// for _ in 0..100 {
327 /// if conn.is_disconnected().await {
328 /// return conn;
329 /// }
330 /// something_slow_but_not_cancel_safe().await;
331 /// }
332 /// conn.with_status(200)
333 /// }
334 /// ```
335 pub async fn is_disconnected(&mut self) -> bool {
336 future::poll_once(LivenessFut::new(self)).await.is_some()
337 }
338
339 /// returns the [`encoding_rs::Encoding`] for this request, as determined from the mime-type
340 /// charset, if available
341 ///
342 /// ```
343 /// # use trillium_testing::HttpTest;
344 /// HttpTest::new(|mut conn| async move {
345 /// assert_eq!(conn.request_encoding(), encoding_rs::WINDOWS_1252); // the default
346 ///
347 /// conn.request_headers_mut()
348 /// .insert("content-type", "text/plain;charset=utf-16");
349 /// assert_eq!(conn.request_encoding(), encoding_rs::UTF_16LE);
350 ///
351 /// conn.with_status(200)
352 /// })
353 /// .get("/")
354 /// .block()
355 /// .assert_ok();
356 /// ```
357 pub fn request_encoding(&self) -> &'static Encoding {
358 encoding(&self.request_headers)
359 }
360
361 /// returns the [`encoding_rs::Encoding`] for this response, as
362 /// determined from the mime-type charset, if available
363 ///
364 /// ```
365 /// # use trillium_testing::HttpTest;
366 /// HttpTest::new(|mut conn| async move {
367 /// assert_eq!(conn.response_encoding(), encoding_rs::WINDOWS_1252); // the default
368 /// conn.response_headers_mut()
369 /// .insert("content-type", "text/plain;charset=utf-16");
370 ///
371 /// assert_eq!(conn.response_encoding(), encoding_rs::UTF_16LE);
372 ///
373 /// conn.with_status(200)
374 /// })
375 /// .get("/")
376 /// .block()
377 /// .assert_ok();
378 /// ```
379 pub fn response_encoding(&self) -> &'static Encoding {
380 encoding(&self.response_headers)
381 }
382
383 /// returns a [`ReceivedBody`] that references this conn. the conn
384 /// retains all data and holds the singular transport, but the
385 /// `ReceivedBody` provides an interface to read body content.
386 ///
387 /// If the request included an `Expect: 100-continue` header, the 100 Continue response is sent
388 /// lazily on the first read from the returned [`ReceivedBody`].
389 /// ```
390 /// # use trillium_testing::HttpTest;
391 /// let server = HttpTest::new(|mut conn| async move {
392 /// let request_body = conn.request_body();
393 /// assert_eq!(request_body.content_length(), Some(5));
394 /// assert_eq!(request_body.read_string().await.unwrap(), "hello");
395 /// conn.with_status(200)
396 /// });
397 ///
398 /// server.post("/").with_body("hello").block().assert_ok();
399 /// ```
400 pub fn request_body(&mut self) -> ReceivedBody<'_, Transport> {
401 let needs_100_continue = self.needs_100_continue();
402 let body = self.build_request_body();
403 if needs_100_continue {
404 body.with_send_100_continue()
405 } else {
406 body
407 }
408 }
409
410 /// returns a clone of the [`swansong::Swansong`] for this Conn. use
411 /// this to gracefully stop long-running futures and streams
412 /// inside of handler functions
413 pub fn swansong(&self) -> Swansong {
414 self.h3_connection
415 .as_ref()
416 .map_or_else(|| self.context.swansong.clone(), |h| h.swansong().clone())
417 }
418
419 /// Registers a function to call after the http response has been
420 /// completely transferred. Please note that this is a sync function
421 /// and should be computationally lightweight. If your _application_
422 /// needs additional async processing, use your runtime's task spawn
423 /// within this hook. If your _library_ needs additional async
424 /// processing in an `after_send` hook, please open an issue. This hook
425 /// is currently designed for simple instrumentation and logging, and
426 /// should be thought of as equivalent to a Drop hook.
427 pub fn after_send<F>(&mut self, after_send: F)
428 where
429 F: FnOnce(SendStatus) + Send + Sync + 'static,
430 {
431 self.after_send.append(after_send);
432 }
433
434 /// applies a mapping function from one transport to another. This
435 /// is particularly useful for boxing the transport. unless you're
436 /// sure this is what you're looking for, you probably don't want
437 /// to be using this
438 pub fn map_transport<NewTransport>(
439 self,
440 f: impl Fn(Transport) -> NewTransport,
441 ) -> Conn<NewTransport>
442 where
443 NewTransport: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
444 {
445 Conn {
446 context: self.context,
447 request_headers: self.request_headers,
448 response_headers: self.response_headers,
449 method: self.method,
450 response_body: self.response_body,
451 path: self.path,
452 status: self.status,
453 version: self.version,
454 state: self.state,
455 transport: f(self.transport),
456 buffer: self.buffer,
457 request_body_state: self.request_body_state,
458 secure: self.secure,
459 after_send: self.after_send,
460 start_time: self.start_time,
461 peer_ip: self.peer_ip,
462 authority: self.authority,
463 scheme: self.scheme,
464 h3_connection: self.h3_connection,
465 protocol: self.protocol,
466 request_trailers: self.request_trailers,
467 }
468 }
469
470 /// whether this conn is suitable for an http upgrade to another protocol
471 pub fn should_upgrade(&self) -> bool {
472 (self.method() == Method::Connect && self.status == Some(Status::Ok))
473 || self.status == Some(Status::SwitchingProtocols)
474 }
475
476 #[doc(hidden)]
477 pub fn finalize_headers(&mut self) {
478 if self.version == Version::Http3 {
479 self.finalize_response_headers_h3();
480 } else {
481 self.finalize_response_headers_1x();
482 }
483 }
484}