trillium_http/conn.rs
1use crate::{
2 after_send::{AfterSend, SendStatus},
3 copy,
4 http_config::DEFAULT_CONFIG,
5 liveness::{CancelOnDisconnect, LivenessFut},
6 received_body::ReceivedBodyState,
7 util::encoding,
8 Body, BufWriter, Buffer, ConnectionStatus, Error, HeaderName, HeaderValue, Headers, HttpConfig,
9 KnownHeaderName::{Connection, ContentLength, Date, Expect, Host, Server, TransferEncoding},
10 Method, ReceivedBody, Result, StateSet, Status, Stopper, Upgrade, Version,
11};
12use encoding_rs::Encoding;
13use futures_lite::{
14 future,
15 io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt},
16};
17use httparse::{Request, EMPTY_HEADER};
18use memchr::memmem::Finder;
19use std::{
20 fmt::{self, Debug, Formatter},
21 future::Future,
22 net::IpAddr,
23 pin::pin,
24 str::FromStr,
25 time::{Instant, SystemTime},
26};
27
28/// Default Server header
29pub const SERVER: &str = concat!("trillium/", env!("CARGO_PKG_VERSION"));
30
31/** A http connection
32
33Unlike in other rust http implementations, this struct represents both
34the request and the response, and holds the transport over which the
35response will be sent.
36*/
37pub struct Conn<Transport> {
38 pub(crate) request_headers: Headers,
39 pub(crate) response_headers: Headers,
40 pub(crate) path: String,
41 pub(crate) method: Method,
42 pub(crate) status: Option<Status>,
43 pub(crate) version: Version,
44 pub(crate) state: StateSet,
45 pub(crate) response_body: Option<Body>,
46 pub(crate) transport: Transport,
47 pub(crate) buffer: Buffer,
48 pub(crate) request_body_state: ReceivedBodyState,
49 pub(crate) secure: bool,
50 pub(crate) stopper: Stopper,
51 pub(crate) after_send: AfterSend,
52 pub(crate) start_time: Instant,
53 pub(crate) peer_ip: Option<IpAddr>,
54 pub(crate) http_config: HttpConfig,
55}
56
57impl<Transport> Debug for Conn<Transport> {
58 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
59 f.debug_struct("Conn")
60 .field("http_config", &self.http_config)
61 .field("request_headers", &self.request_headers)
62 .field("response_headers", &self.response_headers)
63 .field("path", &self.path)
64 .field("method", &self.method)
65 .field("status", &self.status)
66 .field("version", &self.version)
67 .field("state", &self.state)
68 .field("response_body", &self.response_body)
69 .field("transport", &"..")
70 .field("buffer", &"..")
71 .field("request_body_state", &self.request_body_state)
72 .field("secure", &self.secure)
73 .field("stopper", &self.stopper)
74 .field("after_send", &"..")
75 .field("start_time", &self.start_time)
76 .field("peer_ip", &self.peer_ip)
77 .finish()
78 }
79}
80
81impl<Transport> Conn<Transport>
82where
83 Transport: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static,
84{
85 /// read any number of new `Conn`s from the transport and call the
86 /// provided handler function until either the connection is closed or
87 /// an upgrade is requested. A return value of Ok(None) indicates a
88 /// closed connection, while a return value of Ok(Some(upgrade))
89 /// represents an upgrade.
90 ///
91 /// Provides a default [`HttpConfig`]
92 ///
93 /// See the documentation for [`Conn`] for a full example.
94 ///
95 /// # Errors
96 ///
97 /// This will return an error variant if:
98 ///
99 /// * there is an io error when reading from the underlying transport
100 /// * headers are too long
101 /// * we are unable to parse some aspect of the request
102 /// * the request is an unsupported http version
103 /// * we cannot make sense of the headers, such as if there is a
104 /// `content-length` header as well as a `transfer-encoding: chunked`
105 /// header.
106
107 pub async fn map<F, Fut>(
108 transport: Transport,
109 stopper: Stopper,
110 handler: F,
111 ) -> Result<Option<Upgrade<Transport>>>
112 where
113 F: FnMut(Conn<Transport>) -> Fut,
114 Fut: Future<Output = Conn<Transport>> + Send,
115 {
116 Self::map_with_config(DEFAULT_CONFIG, transport, stopper, handler).await
117 }
118
119 /// read any number of new `Conn`s from the transport and call the
120 /// provided handler function until either the connection is closed or
121 /// an upgrade is requested. A return value of Ok(None) indicates a
122 /// closed connection, while a return value of Ok(Some(upgrade))
123 /// represents an upgrade.
124 ///
125 /// See the documentation for [`Conn`] for a full example.
126 ///
127 /// # Errors
128 ///
129 /// This will return an error variant if:
130 ///
131 /// * there is an io error when reading from the underlying transport
132 /// * headers are too long
133 /// * we are unable to parse some aspect of the request
134 /// * the request is an unsupported http version
135 /// * we cannot make sense of the headers, such as if there is a
136 /// `content-length` header as well as a `transfer-encoding: chunked`
137 /// header.
138 pub async fn map_with_config<F, Fut>(
139 http_config: HttpConfig,
140 transport: Transport,
141 stopper: Stopper,
142 mut handler: F,
143 ) -> Result<Option<Upgrade<Transport>>>
144 where
145 F: FnMut(Conn<Transport>) -> Fut,
146 Fut: Future<Output = Conn<Transport>> + Send,
147 {
148 let mut conn = Conn::new_with_config(
149 http_config,
150 transport,
151 Vec::with_capacity(http_config.request_buffer_initial_len).into(),
152 stopper,
153 )
154 .await?;
155 loop {
156 conn = match handler(conn).await.send().await? {
157 ConnectionStatus::Upgrade(upgrade) => return Ok(Some(upgrade)),
158 ConnectionStatus::Close => return Ok(None),
159 ConnectionStatus::Conn(next) => next,
160 }
161 }
162 }
163
164 async fn send(mut self) -> Result<ConnectionStatus<Transport>> {
165 let mut output_buffer = Vec::with_capacity(self.http_config.response_buffer_len);
166 self.write_headers(&mut output_buffer)?;
167
168 let mut bufwriter = BufWriter::new_with_buffer(output_buffer, &mut self.transport);
169
170 if self.method != Method::Head
171 && !matches!(self.status, Some(Status::NotModified | Status::NoContent))
172 {
173 if let Some(body) = self.response_body.take() {
174 copy(body, &mut bufwriter, self.http_config.copy_loops_per_yield).await?;
175 }
176 }
177
178 bufwriter.flush().await?;
179 self.after_send.call(true.into());
180 self.finish().await
181 }
182
183 /// returns a read-only reference to the [state
184 /// typemap](StateSet) for this conn
185 ///
186 /// stability note: this is not unlikely to be removed at some
187 /// point, as this may end up being more of a trillium concern
188 /// than a `trillium_http` concern
189 pub fn state(&self) -> &StateSet {
190 &self.state
191 }
192
193 /// returns a mutable reference to the [state
194 /// typemap](StateSet) for this conn
195 ///
196 /// stability note: this is not unlikely to be removed at some
197 /// point, as this may end up being more of a trillium concern
198 /// than a `trillium_http` concern
199 pub fn state_mut(&mut self) -> &mut StateSet {
200 &mut self.state
201 }
202
203 /// returns a reference to the request headers
204 pub fn request_headers(&self) -> &Headers {
205 &self.request_headers
206 }
207
208 /// returns a mutable reference to the response [headers](Headers)
209 pub fn request_headers_mut(&mut self) -> &mut Headers {
210 &mut self.request_headers
211 }
212
213 /// returns a mutable reference to the response [headers](Headers)
214 pub fn response_headers_mut(&mut self) -> &mut Headers {
215 &mut self.response_headers
216 }
217
218 /// returns a reference to the response [headers](Headers)
219 pub fn response_headers(&self) -> &Headers {
220 &self.response_headers
221 }
222
223 /** sets the http status code from any `TryInto<Status>`.
224
225 ```
226 # use trillium_http::{Conn, Method, Status};
227 # let mut conn = Conn::new_synthetic(Method::Get, "/", ());
228 assert!(conn.status().is_none());
229
230 conn.set_status(200); // a status can be set as a u16
231 assert_eq!(conn.status().unwrap(), Status::Ok);
232
233 conn.set_status(Status::ImATeapot); // or as a Status
234 assert_eq!(conn.status().unwrap(), Status::ImATeapot);
235 ```
236 */
237 pub fn set_status(&mut self, status: impl TryInto<Status>) {
238 self.status = Some(status.try_into().unwrap_or_else(|_| {
239 log::error!("attempted to set an invalid status code");
240 Status::InternalServerError
241 }));
242 }
243
244 /// retrieves the current response status code for this conn, if
245 /// it has been set. See [`Conn::set_status`] for example usage.
246 pub fn status(&self) -> Option<Status> {
247 self.status
248 }
249
250 /**
251 retrieves the path part of the request url, up to and excluding any query component
252 ```
253 # use trillium_http::{Conn, Method};
254 let mut conn = Conn::new_synthetic(Method::Get, "/some/path?and&a=query", ());
255 assert_eq!(conn.path(), "/some/path");
256 ```
257 */
258 pub fn path(&self) -> &str {
259 match self.path.split_once('?') {
260 Some((path, _)) => path,
261 None => &self.path,
262 }
263 }
264
265 /// retrieves the combined path and any query
266 pub fn path_and_query(&self) -> &str {
267 &self.path
268 }
269
270 /**
271 retrieves the query component of the path
272 ```
273 # use trillium_http::{Conn, Method};
274 let mut conn = Conn::new_synthetic(Method::Get, "/some/path?and&a=query", ());
275 assert_eq!(conn.querystring(), "and&a=query");
276
277 let mut conn = Conn::new_synthetic(Method::Get, "/some/path", ());
278 assert_eq!(conn.querystring(), "");
279
280 ```
281 */
282 pub fn querystring(&self) -> &str {
283 match self.path.split_once('?') {
284 Some((_, query)) => query,
285 None => "",
286 }
287 }
288
289 /// get the host for this conn, if it exists
290 pub fn host(&self) -> Option<&str> {
291 self.request_headers.get_str(Host)
292 }
293
294 /// set the host for this conn
295 pub fn set_host(&mut self, host: String) {
296 self.request_headers.insert(Host, host);
297 }
298
299 // pub fn url(&self) -> Result<Url> {
300 // let path = self.path();
301 // let host = self.host().unwrap_or_else(|| String::from("_"));
302 // let method = self.method();
303 // if path.starts_with("http://") || path.starts_with("https://") {
304 // Ok(Url::parse(path)?)
305 // } else if path.starts_with('/') {
306 // Ok(Url::parse(&format!("http://{}{}", host, path))?)
307 // } else if method == &Method::Connect {
308 // Ok(Url::parse(&format!("http://{}/", path))?)
309 // } else {
310 // Err(Error::UnexpectedUriFormat)
311 // }
312 // }
313
314 /**
315 Sets the response body to anything that is [`impl Into<Body>`][Body].
316
317 ```
318 # use trillium_http::{Conn, Method, Body};
319 # let mut conn = Conn::new_synthetic(Method::Get, "/some/path?and&a=query", ());
320 conn.set_response_body("hello");
321 conn.set_response_body(String::from("hello"));
322 conn.set_response_body(vec![99, 97, 116]);
323 ```
324 */
325 pub fn set_response_body(&mut self, body: impl Into<Body>) {
326 self.response_body = Some(body.into());
327 }
328
329 /// returns a reference to the current response body, if it has been set
330 pub fn response_body(&self) -> Option<&Body> {
331 self.response_body.as_ref()
332 }
333
334 /**
335 remove the response body from this conn and return it
336
337 ```
338 # use trillium_http::{Conn, Method};
339 # let mut conn = Conn::new_synthetic(Method::Get, "/some/path?and&a=query", ());
340 assert!(conn.response_body().is_none());
341 conn.set_response_body("hello");
342 assert!(conn.response_body().is_some());
343 let body = conn.take_response_body();
344 assert!(body.is_some());
345 assert!(conn.response_body().is_none());
346 ```
347 */
348 pub fn take_response_body(&mut self) -> Option<Body> {
349 self.response_body.take()
350 }
351
352 /**
353 returns the http method for this conn's request.
354 ```
355 # use trillium_http::{Conn, Method};
356 let mut conn = Conn::new_synthetic(Method::Get, "/some/path?and&a=query", ());
357 assert_eq!(conn.method(), Method::Get);
358 ```
359 */
360 pub fn method(&self) -> Method {
361 self.method
362 }
363
364 /**
365 overrides the http method for this conn
366 */
367 pub fn set_method(&mut self, method: Method) {
368 self.method = method;
369 }
370
371 /**
372 returns the http version for this conn.
373 */
374 pub fn http_version(&self) -> Version {
375 self.version
376 }
377
378 /// Cancels and drops the future if reading from the transport results in an error or empty read
379 ///
380 /// The use of this method is not advised if your connected http client employs pipelining
381 /// (rarely seen in the wild), as it will buffer an unbounded number of requests one byte at a
382 /// time
383 ///
384 /// If the client disconnects from the conn's transport, this function will return None. If the
385 /// future completes without disconnection, this future will return Some containing the output
386 /// of the future.
387 ///
388 /// The use of this method is not advised if your connected http client employs pipelining
389 /// (rarely seen in the wild), as it will buffer an unbounded number of requests
390 ///
391 /// Note that the inner future cannot borrow conn, so you will need to clone or take any
392 /// information needed to execute the future prior to executing this method.
393 ///
394 /// # Example
395 ///
396 /// ```rust
397 /// # use futures_lite::{AsyncRead, AsyncWrite};
398 /// # use trillium_http::{Conn, Method};
399 /// async fn something_slow_and_cancel_safe() -> String { String::from("this was not actually slow") }
400 /// async fn handler<T>(mut conn: Conn<T>) -> Conn<T>
401 /// where
402 /// T: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static
403 /// {
404 /// let Some(returned_body) = conn.cancel_on_disconnect(async {
405 /// something_slow_and_cancel_safe().await
406 /// }).await else { return conn; };
407 /// conn.set_response_body(returned_body);
408 /// conn.set_status(200);
409 /// conn
410 /// }
411 /// ```
412 pub async fn cancel_on_disconnect<'a, Fut>(&'a mut self, fut: Fut) -> Option<Fut::Output>
413 where
414 Fut: Future + Send + 'a,
415 {
416 CancelOnDisconnect(self, pin!(fut)).await
417 }
418
419 /// Check if the transport is connected by attempting to read from the transport
420 ///
421 /// # Example
422 ///
423 /// This is best to use at appropriate points in a long-running handler, like:
424 ///
425 /// ```rust
426 /// # use futures_lite::{AsyncRead, AsyncWrite};
427 /// # use trillium_http::{Conn, Method};
428 /// # async fn something_slow_but_not_cancel_safe() {}
429 /// async fn handler<T>(mut conn: Conn<T>) -> Conn<T>
430 /// where
431 /// T: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static
432 /// {
433 /// for _ in 0..100 {
434 /// if conn.is_disconnected().await {
435 /// return conn;
436 /// }
437 /// something_slow_but_not_cancel_safe().await;
438 /// }
439 /// conn.set_status(200);
440 /// conn
441 /// }
442 /// ```
443 pub async fn is_disconnected(&mut self) -> bool {
444 future::poll_once(LivenessFut::new(self)).await.is_some()
445 }
446
447 fn needs_100_continue(&self) -> bool {
448 self.request_body_state == ReceivedBodyState::Start
449 && self.version != Version::Http1_0
450 && self
451 .request_headers
452 .eq_ignore_ascii_case(Expect, "100-continue")
453 }
454
455 #[allow(clippy::needless_borrow, clippy::needless_borrows_for_generic_args)]
456 fn build_request_body(&mut self) -> ReceivedBody<'_, Transport> {
457 ReceivedBody::new_with_config(
458 self.request_content_length().ok().flatten(),
459 &mut self.buffer,
460 &mut self.transport,
461 &mut self.request_body_state,
462 None,
463 encoding(&self.request_headers),
464 &self.http_config,
465 )
466 }
467
468 /**
469 returns the [`encoding_rs::Encoding`] for this request, as
470 determined from the mime-type charset, if available
471
472 ```
473 # use trillium_http::{Conn, Method};
474 let mut conn = Conn::new_synthetic(Method::Get, "/", ());
475 assert_eq!(conn.request_encoding(), encoding_rs::WINDOWS_1252); // the default
476 conn.request_headers_mut().insert("content-type", "text/plain;charset=utf-16");
477 assert_eq!(conn.request_encoding(), encoding_rs::UTF_16LE);
478 ```
479 */
480 pub fn request_encoding(&self) -> &'static Encoding {
481 encoding(&self.request_headers)
482 }
483
484 /**
485 returns the [`encoding_rs::Encoding`] for this response, as
486 determined from the mime-type charset, if available
487
488 ```
489 # use trillium_http::{Conn, Method};
490 let mut conn = Conn::new_synthetic(Method::Get, "/", ());
491 assert_eq!(conn.response_encoding(), encoding_rs::WINDOWS_1252); // the default
492 conn.response_headers_mut().insert("content-type", "text/plain;charset=utf-16");
493 assert_eq!(conn.response_encoding(), encoding_rs::UTF_16LE);
494 ```
495 */
496 pub fn response_encoding(&self) -> &'static Encoding {
497 encoding(&self.response_headers)
498 }
499
500 /**
501 returns a [`ReceivedBody`] that references this conn. the conn
502 retains all data and holds the singular transport, but the
503 `ReceivedBody` provides an interface to read body content
504 ```
505 # async_io::block_on(async {
506 # use trillium_http::{Conn, Method};
507 let mut conn = Conn::new_synthetic(Method::Get, "/", "hello");
508 let request_body = conn.request_body().await;
509 assert_eq!(request_body.content_length(), Some(5));
510 assert_eq!(request_body.read_string().await.unwrap(), "hello");
511 # });
512 ```
513 */
514 pub async fn request_body(&mut self) -> ReceivedBody<'_, Transport> {
515 if self.needs_100_continue() {
516 self.send_100_continue().await.ok();
517 }
518
519 self.build_request_body()
520 }
521
522 /// returns a clone of the [`stopper::Stopper`] for this Conn. use
523 /// this to gracefully stop long-running futures and streams
524 /// inside of handler functions
525 pub fn stopper(&self) -> Stopper {
526 self.stopper.clone()
527 }
528
529 fn validate_headers(request_headers: &Headers) -> Result<()> {
530 let content_length = request_headers.has_header(ContentLength);
531 let transfer_encoding_chunked =
532 request_headers.eq_ignore_ascii_case(TransferEncoding, "chunked");
533
534 if content_length && transfer_encoding_chunked {
535 Err(Error::UnexpectedHeader("content-length"))
536 } else {
537 Ok(())
538 }
539 }
540
541 /// # Create a new `Conn`
542 ///
543 /// This function creates a new conn from the provided
544 /// [`Transport`][crate::transport::Transport], as well as any
545 /// bytes that have already been read from the transport, and a
546 /// [`Stopper`] instance that will be used to signal graceful
547 /// shutdown.
548 ///
549 /// # Errors
550 ///
551 /// This will return an error variant if:
552 ///
553 /// * there is an io error when reading from the underlying transport
554 /// * headers are too long
555 /// * we are unable to parse some aspect of the request
556 /// * the request is an unsupported http version
557 /// * we cannot make sense of the headers, such as if there is a
558 /// `content-length` header as well as a `transfer-encoding: chunked`
559 /// header.
560 pub async fn new(transport: Transport, bytes: Vec<u8>, stopper: Stopper) -> Result<Self> {
561 Self::new_with_config(DEFAULT_CONFIG, transport, bytes.into(), stopper).await
562 }
563
564 /// # Create a new `Conn`
565 ///
566 /// This function creates a new conn from the provided
567 /// [`Transport`][crate::transport::Transport], as well as any
568 /// bytes that have already been read from the transport, and a
569 /// [`Stopper`] instance that will be used to signal graceful
570 /// shutdown.
571 ///
572 /// # Errors
573 ///
574 /// This will return an error variant if:
575 ///
576 /// * there is an io error when reading from the underlying transport
577 /// * headers are too long
578 /// * we are unable to parse some aspect of the request
579 /// * the request is an unsupported http version
580 /// * we cannot make sense of the headers, such as if there is a
581 /// `content-length` header as well as a `transfer-encoding: chunked`
582 /// header.
583 async fn new_with_config(
584 http_config: HttpConfig,
585 mut transport: Transport,
586 mut buffer: Buffer,
587 stopper: Stopper,
588 ) -> Result<Self> {
589 let (head_size, start_time) =
590 Self::head(&mut transport, &mut buffer, &stopper, &http_config).await?;
591
592 let mut headers = vec![EMPTY_HEADER; http_config.max_headers];
593 let mut httparse_req = Request::new(&mut headers);
594
595 let status = httparse_req.parse(&buffer[..])?;
596 if status.is_partial() {
597 return Err(Error::PartialHead);
598 }
599
600 let method = match httparse_req.method {
601 Some(method) => match method.parse() {
602 Ok(method) => method,
603 Err(_) => return Err(Error::UnrecognizedMethod(method.to_string())),
604 },
605 None => return Err(Error::MissingMethod),
606 };
607
608 let version = match httparse_req.version {
609 Some(0) => Version::Http1_0,
610 Some(1) => Version::Http1_1,
611 Some(version) => return Err(Error::UnsupportedVersion(version)),
612 None => return Err(Error::MissingVersion),
613 };
614
615 let mut request_headers = Headers::with_capacity(httparse_req.headers.len());
616 for header in httparse_req.headers {
617 let header_name = HeaderName::from_str(header.name)?;
618 let header_value = HeaderValue::from(header.value.to_owned());
619 request_headers.append(header_name, header_value);
620 }
621
622 Self::validate_headers(&request_headers)?;
623
624 let path = httparse_req
625 .path
626 .ok_or(Error::RequestPathMissing)?
627 .to_owned();
628 log::trace!("received:\n{method} {path} {version}\n{request_headers}");
629
630 let mut response_headers =
631 Headers::with_capacity(http_config.response_header_initial_capacity);
632 response_headers.insert(Server, SERVER);
633
634 buffer.ignore_front(head_size);
635
636 Ok(Self {
637 transport,
638 request_headers,
639 method,
640 version,
641 path,
642 buffer,
643 response_headers,
644 status: None,
645 state: StateSet::new(),
646 response_body: None,
647 request_body_state: ReceivedBodyState::Start,
648 secure: false,
649 stopper,
650 after_send: AfterSend::default(),
651 start_time,
652 peer_ip: None,
653 http_config,
654 })
655 }
656
657 /// predicate function to indicate whether the connection is
658 /// secure. note that this does not necessarily indicate that the
659 /// transport itself is secure, as it may indicate that
660 /// `trillium_http` is behind a trusted reverse proxy that has
661 /// terminated tls and provided appropriate headers to indicate
662 /// this.
663 pub fn is_secure(&self) -> bool {
664 self.secure
665 }
666
667 /// set whether the connection should be considered secure. note
668 /// that this does not necessarily indicate that the transport
669 /// itself is secure, as it may indicate that `trillium_http` is
670 /// behind a trusted reverse proxy that has terminated tls and
671 /// provided appropriate headers to indicate this.
672 pub fn set_secure(&mut self, secure: bool) {
673 self.secure = secure;
674 }
675
676 /**
677 calculates any auto-generated headers for this conn prior to sending it
678 */
679 pub fn finalize_headers(&mut self) {
680 if self.status == Some(Status::SwitchingProtocols) {
681 return;
682 }
683
684 self.response_headers
685 .try_insert_with(Date, || httpdate::fmt_http_date(SystemTime::now()));
686
687 if !matches!(self.status, Some(Status::NotModified | Status::NoContent)) {
688 if let Some(len) = self.body_len() {
689 self.response_headers
690 .try_insert(ContentLength, len.to_string());
691 }
692
693 if self.version == Version::Http1_1 && !self.response_headers.has_header(ContentLength)
694 {
695 self.response_headers.insert(TransferEncoding, "chunked");
696 } else {
697 self.response_headers.remove(TransferEncoding);
698 }
699 }
700
701 if self.stopper.is_stopped() {
702 self.response_headers.insert(Connection, "close");
703 }
704 }
705
706 /**
707 Registers a function to call after the http response has been
708 completely transferred. Please note that this is a sync function
709 and should be computationally lightweight. If your _application_
710 needs additional async processing, use your runtime's task spawn
711 within this hook. If your _library_ needs additional async
712 processing in an `after_send` hook, please open an issue. This hook
713 is currently designed for simple instrumentation and logging, and
714 should be thought of as equivalent to a Drop hook.
715 */
716 pub fn after_send<F>(&mut self, after_send: F)
717 where
718 F: FnOnce(SendStatus) + Send + Sync + 'static,
719 {
720 self.after_send.append(after_send);
721 }
722
723 /// The [`Instant`] that the first header bytes for this conn were
724 /// received, before any processing or parsing has been performed.
725 pub fn start_time(&self) -> Instant {
726 self.start_time
727 }
728
729 async fn send_100_continue(&mut self) -> Result<()> {
730 log::trace!("sending 100-continue");
731 Ok(self
732 .transport
733 .write_all(b"HTTP/1.1 100 Continue\r\n\r\n")
734 .await?)
735 }
736
737 async fn head(
738 transport: &mut Transport,
739 buf: &mut Buffer,
740 stopper: &Stopper,
741 http_config: &HttpConfig,
742 ) -> Result<(usize, Instant)> {
743 let mut len = 0;
744 let mut start_with_read = buf.is_empty();
745 let mut instant = None;
746 let finder = Finder::new(b"\r\n\r\n");
747 loop {
748 if len >= http_config.head_max_len {
749 return Err(Error::HeadersTooLong);
750 }
751
752 let bytes = if start_with_read {
753 buf.expand();
754 if len == 0 {
755 stopper
756 .stop_future(transport.read(buf))
757 .await
758 .ok_or(Error::Closed)??
759 } else {
760 transport.read(&mut buf[len..]).await?
761 }
762 } else {
763 start_with_read = true;
764 buf.len()
765 };
766
767 if instant.is_none() {
768 instant = Some(Instant::now());
769 }
770
771 let search_start = len.max(3) - 3;
772 let search = finder.find(&buf[search_start..]);
773
774 if let Some(index) = search {
775 buf.truncate(len + bytes);
776 return Ok((search_start + index + 4, instant.unwrap()));
777 }
778
779 len += bytes;
780
781 if bytes == 0 {
782 return if len == 0 {
783 Err(Error::Closed)
784 } else {
785 Err(Error::PartialHead)
786 };
787 }
788 }
789 }
790
791 async fn next(mut self) -> Result<Self> {
792 if !self.needs_100_continue() || self.request_body_state != ReceivedBodyState::Start {
793 self.build_request_body().drain().await?;
794 }
795 Conn::new_with_config(self.http_config, self.transport, self.buffer, self.stopper).await
796 }
797
798 fn should_close(&self) -> bool {
799 let request_connection = self.request_headers.get_lower(Connection);
800 let response_connection = self.response_headers.get_lower(Connection);
801
802 match (
803 request_connection.as_deref(),
804 response_connection.as_deref(),
805 ) {
806 (Some("keep-alive"), Some("keep-alive")) => false,
807 (Some("close"), _) | (_, Some("close")) => true,
808 _ => self.version == Version::Http1_0,
809 }
810 }
811
812 fn should_upgrade(&self) -> bool {
813 (self.method() == Method::Connect && self.status == Some(Status::Ok))
814 || self.status == Some(Status::SwitchingProtocols)
815 }
816
817 async fn finish(self) -> Result<ConnectionStatus<Transport>> {
818 if self.should_close() {
819 Ok(ConnectionStatus::Close)
820 } else if self.should_upgrade() {
821 Ok(ConnectionStatus::Upgrade(self.into()))
822 } else {
823 match self.next().await {
824 Err(Error::Closed) => {
825 log::trace!("connection closed by client");
826 Ok(ConnectionStatus::Close)
827 }
828 Err(e) => Err(e),
829 Ok(conn) => Ok(ConnectionStatus::Conn(conn)),
830 }
831 }
832 }
833
834 fn request_content_length(&self) -> Result<Option<u64>> {
835 if self
836 .request_headers
837 .eq_ignore_ascii_case(TransferEncoding, "chunked")
838 {
839 Ok(None)
840 } else if let Some(cl) = self.request_headers.get_str(ContentLength) {
841 cl.parse()
842 .map(Some)
843 .map_err(|_| Error::MalformedHeader("content-length".into()))
844 } else {
845 Ok(Some(0))
846 }
847 }
848
849 fn body_len(&self) -> Option<u64> {
850 match self.response_body {
851 Some(ref body) => body.len(),
852 None => Some(0),
853 }
854 }
855
856 fn write_headers(&mut self, output_buffer: &mut Vec<u8>) -> Result<()> {
857 use std::io::Write;
858 let status = self.status().unwrap_or(Status::NotFound);
859
860 write!(
861 output_buffer,
862 "{} {} {}\r\n",
863 self.version,
864 status as u16,
865 status.canonical_reason()
866 )?;
867
868 self.finalize_headers();
869
870 log::trace!(
871 "sending:\n{} {}\n{}",
872 self.version,
873 status,
874 &self.response_headers
875 );
876
877 for (name, values) in &self.response_headers {
878 if name.is_valid() {
879 for value in values {
880 if value.is_valid() {
881 write!(output_buffer, "{name}: ")?;
882 output_buffer.extend_from_slice(value.as_ref());
883 write!(output_buffer, "\r\n")?;
884 } else {
885 log::error!("skipping invalid header value {value:?} for header {name}");
886 }
887 }
888 } else {
889 log::error!("skipping invalid header with name {name:?}");
890 }
891 }
892
893 write!(output_buffer, "\r\n")?;
894 Ok(())
895 }
896
897 /// applies a mapping function from one transport to another. This
898 /// is particularly useful for boxing the transport. unless you're
899 /// sure this is what you're looking for, you probably don't want
900 /// to be using this
901 pub fn map_transport<T: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static>(
902 self,
903 f: impl Fn(Transport) -> T,
904 ) -> Conn<T> {
905 let Conn {
906 request_headers,
907 response_headers,
908 path,
909 status,
910 version,
911 state,
912 transport,
913 buffer,
914 request_body_state,
915 secure,
916 method,
917 response_body,
918 stopper,
919 after_send,
920 start_time,
921 peer_ip,
922 http_config,
923 } = self;
924
925 Conn {
926 request_headers,
927 response_headers,
928 method,
929 response_body,
930 path,
931 status,
932 version,
933 state,
934 transport: f(transport),
935 buffer,
936 request_body_state,
937 secure,
938 stopper,
939 after_send,
940 start_time,
941 peer_ip,
942 http_config,
943 }
944 }
945
946 /// Get a reference to the transport.
947 pub fn transport(&self) -> &Transport {
948 &self.transport
949 }
950
951 /// Get a mutable reference to the transport.
952 ///
953 /// This should only be used to call your own custom methods on the transport that do not read
954 /// or write any data. Calling any method that reads from or writes to the transport will
955 /// disrupt the HTTP protocol. If you're looking to transition from HTTP to another protocol,
956 /// use an HTTP upgrade.
957 pub fn transport_mut(&mut self) -> &mut Transport {
958 &mut self.transport
959 }
960
961 /// sets the remote ip address for this conn, if available.
962 pub fn set_peer_ip(&mut self, peer_ip: Option<IpAddr>) {
963 self.peer_ip = peer_ip;
964 }
965
966 /// retrieves the remote ip address for this conn, if available.
967 pub fn peer_ip(&self) -> Option<IpAddr> {
968 self.peer_ip
969 }
970}