trillium_http/
conn.rs

1use crate::{
2    after_send::{AfterSend, SendStatus},
3    copy,
4    http_config::DEFAULT_CONFIG,
5    liveness::{CancelOnDisconnect, LivenessFut},
6    received_body::ReceivedBodyState,
7    util::encoding,
8    Body, BufWriter, Buffer, ConnectionStatus, Error, HeaderName, HeaderValue, Headers, HttpConfig,
9    KnownHeaderName::{Connection, ContentLength, Date, Expect, Host, Server, TransferEncoding},
10    Method, ReceivedBody, Result, StateSet, Status, Stopper, Upgrade, Version,
11};
12use encoding_rs::Encoding;
13use futures_lite::{
14    future,
15    io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt},
16};
17use httparse::{Request, EMPTY_HEADER};
18use memchr::memmem::Finder;
19use std::{
20    fmt::{self, Debug, Formatter},
21    future::Future,
22    net::IpAddr,
23    pin::pin,
24    str::FromStr,
25    time::{Instant, SystemTime},
26};
27
/// Default value of the `Server` response header: `trillium/` followed by
/// this crate's version, resolved at compile time from `CARGO_PKG_VERSION`.
pub const SERVER: &str = concat!("trillium/", env!("CARGO_PKG_VERSION"));
30
/** A http connection

Unlike in other rust http implementations, this struct represents both
the request and the response, and holds the transport over which the
response will be sent.
*/
pub struct Conn<Transport> {
    /// headers parsed from the request head
    pub(crate) request_headers: Headers,
    /// headers that will be written with the response
    pub(crate) response_headers: Headers,
    /// the raw request target, including any `?query` component
    pub(crate) path: String,
    /// the request method
    pub(crate) method: Method,
    /// the response status, `None` until one has been set
    pub(crate) status: Option<Status>,
    /// the http version parsed from the request line
    pub(crate) version: Version,
    /// typemap of arbitrary state attached to this conn
    pub(crate) state: StateSet,
    /// the response body, if one has been set
    pub(crate) response_body: Option<Body>,
    /// the transport that requests are read from and responses are written to
    pub(crate) transport: Transport,
    /// bytes that have been read from the transport but not yet consumed
    pub(crate) buffer: Buffer,
    /// how far reading of the request body has progressed
    pub(crate) request_body_state: ReceivedBodyState,
    /// whether this conn should be considered secure
    pub(crate) secure: bool,
    /// handle used to signal graceful shutdown
    pub(crate) stopper: Stopper,
    /// callbacks registered to run after the response has been sent
    pub(crate) after_send: AfterSend,
    /// when the first bytes of the request head were available
    pub(crate) start_time: Instant,
    /// the remote ip address, if it has been provided
    pub(crate) peer_ip: Option<IpAddr>,
    /// tunable http parameters (buffer sizes, limits) for this conn
    pub(crate) http_config: HttpConfig,
}
56
57impl<Transport> Debug for Conn<Transport> {
58    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
59        f.debug_struct("Conn")
60            .field("http_config", &self.http_config)
61            .field("request_headers", &self.request_headers)
62            .field("response_headers", &self.response_headers)
63            .field("path", &self.path)
64            .field("method", &self.method)
65            .field("status", &self.status)
66            .field("version", &self.version)
67            .field("state", &self.state)
68            .field("response_body", &self.response_body)
69            .field("transport", &"..")
70            .field("buffer", &"..")
71            .field("request_body_state", &self.request_body_state)
72            .field("secure", &self.secure)
73            .field("stopper", &self.stopper)
74            .field("after_send", &"..")
75            .field("start_time", &self.start_time)
76            .field("peer_ip", &self.peer_ip)
77            .finish()
78    }
79}
80
81impl<Transport> Conn<Transport>
82where
83    Transport: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static,
84{
    /// read any number of new `Conn`s from the transport and call the
    /// provided handler function until either the connection is closed or
    /// an upgrade is requested. A return value of `Ok(None)` indicates a
    /// closed connection, while a return value of `Ok(Some(upgrade))`
    /// represents an upgrade.
    ///
    /// This uses the default [`HttpConfig`]; it delegates to
    /// [`Conn::map_with_config`] with `DEFAULT_CONFIG`.
    ///
    /// See the documentation for [`Conn`] for a full example.
    ///
    /// # Errors
    ///
    /// This will return an error variant if:
    ///
    /// * there is an io error when reading from the underlying transport
    /// * headers are too long
    /// * we are unable to parse some aspect of the request
    /// * the request is an unsupported http version
    /// * we cannot make sense of the headers, such as if there is a
    ///   `content-length` header as well as a `transfer-encoding: chunked`
    ///   header.
    pub async fn map<F, Fut>(
        transport: Transport,
        stopper: Stopper,
        handler: F,
    ) -> Result<Option<Upgrade<Transport>>>
    where
        F: FnMut(Conn<Transport>) -> Fut,
        Fut: Future<Output = Conn<Transport>> + Send,
    {
        Self::map_with_config(DEFAULT_CONFIG, transport, stopper, handler).await
    }
118
119    /// read any number of new `Conn`s from the transport and call the
120    /// provided handler function until either the connection is closed or
121    /// an upgrade is requested. A return value of Ok(None) indicates a
122    /// closed connection, while a return value of Ok(Some(upgrade))
123    /// represents an upgrade.
124    ///
125    /// See the documentation for [`Conn`] for a full example.
126    ///
127    /// # Errors
128    ///
129    /// This will return an error variant if:
130    ///
131    /// * there is an io error when reading from the underlying transport
132    /// * headers are too long
133    /// * we are unable to parse some aspect of the request
134    /// * the request is an unsupported http version
135    /// * we cannot make sense of the headers, such as if there is a
136    /// `content-length` header as well as a `transfer-encoding: chunked`
137    /// header.
138    pub async fn map_with_config<F, Fut>(
139        http_config: HttpConfig,
140        transport: Transport,
141        stopper: Stopper,
142        mut handler: F,
143    ) -> Result<Option<Upgrade<Transport>>>
144    where
145        F: FnMut(Conn<Transport>) -> Fut,
146        Fut: Future<Output = Conn<Transport>> + Send,
147    {
148        let mut conn = Conn::new_with_config(
149            http_config,
150            transport,
151            Vec::with_capacity(http_config.request_buffer_initial_len).into(),
152            stopper,
153        )
154        .await?;
155        loop {
156            conn = match handler(conn).await.send().await? {
157                ConnectionStatus::Upgrade(upgrade) => return Ok(Some(upgrade)),
158                ConnectionStatus::Close => return Ok(None),
159                ConnectionStatus::Conn(next) => next,
160            }
161        }
162    }
163
164    async fn send(mut self) -> Result<ConnectionStatus<Transport>> {
165        let mut output_buffer = Vec::with_capacity(self.http_config.response_buffer_len);
166        self.write_headers(&mut output_buffer)?;
167
168        let mut bufwriter = BufWriter::new_with_buffer(output_buffer, &mut self.transport);
169
170        if self.method != Method::Head
171            && !matches!(self.status, Some(Status::NotModified | Status::NoContent))
172        {
173            if let Some(body) = self.response_body.take() {
174                copy(body, &mut bufwriter, self.http_config.copy_loops_per_yield).await?;
175            }
176        }
177
178        bufwriter.flush().await?;
179        self.after_send.call(true.into());
180        self.finish().await
181    }
182
    /// returns a read-only reference to the [state
    /// typemap](StateSet) for this conn
    ///
    /// stability note: this may well be removed at some point, as
    /// this may end up being more of a trillium concern than a
    /// `trillium_http` concern
    pub fn state(&self) -> &StateSet {
        &self.state
    }
192
    /// returns a mutable reference to the [state
    /// typemap](StateSet) for this conn
    ///
    /// stability note: this may well be removed at some point, as
    /// this may end up being more of a trillium concern than a
    /// `trillium_http` concern
    pub fn state_mut(&mut self) -> &mut StateSet {
        &mut self.state
    }
202
    /// returns a reference to the request [headers](Headers)
    pub fn request_headers(&self) -> &Headers {
        &self.request_headers
    }
207
    /// returns a mutable reference to the request [headers](Headers)
    pub fn request_headers_mut(&mut self) -> &mut Headers {
        &mut self.request_headers
    }
212
    /// returns a mutable reference to the response [headers](Headers),
    /// for adding to or changing what will be sent with the response
    pub fn response_headers_mut(&mut self) -> &mut Headers {
        &mut self.response_headers
    }
217
    /// returns a read-only reference to the response [headers](Headers)
    pub fn response_headers(&self) -> &Headers {
        &self.response_headers
    }
222
223    /** sets the http status code from any `TryInto<Status>`.
224
225    ```
226    # use trillium_http::{Conn, Method, Status};
227    # let mut conn = Conn::new_synthetic(Method::Get, "/", ());
228    assert!(conn.status().is_none());
229
230    conn.set_status(200); // a status can be set as a u16
231    assert_eq!(conn.status().unwrap(), Status::Ok);
232
233    conn.set_status(Status::ImATeapot); // or as a Status
234    assert_eq!(conn.status().unwrap(), Status::ImATeapot);
235    ```
236    */
237    pub fn set_status(&mut self, status: impl TryInto<Status>) {
238        self.status = Some(status.try_into().unwrap_or_else(|_| {
239            log::error!("attempted to set an invalid status code");
240            Status::InternalServerError
241        }));
242    }
243
    /// retrieves the current response status code for this conn, or
    /// `None` if it has not been set yet. See [`Conn::set_status`] for
    /// example usage.
    pub fn status(&self) -> Option<Status> {
        self.status
    }
249
250    /**
251    retrieves the path part of the request url, up to and excluding any query component
252    ```
253    # use trillium_http::{Conn, Method};
254    let mut conn = Conn::new_synthetic(Method::Get, "/some/path?and&a=query", ());
255    assert_eq!(conn.path(), "/some/path");
256    ```
257    */
258    pub fn path(&self) -> &str {
259        match self.path.split_once('?') {
260            Some((path, _)) => path,
261            None => &self.path,
262        }
263    }
264
    /// retrieves the combined path and any query, exactly as stored for
    /// this request (no splitting on `?` is performed)
    pub fn path_and_query(&self) -> &str {
        &self.path
    }
269
270    /**
271    retrieves the query component of the path
272    ```
273    # use trillium_http::{Conn, Method};
274    let mut conn = Conn::new_synthetic(Method::Get, "/some/path?and&a=query", ());
275    assert_eq!(conn.querystring(), "and&a=query");
276
277    let mut conn = Conn::new_synthetic(Method::Get, "/some/path", ());
278    assert_eq!(conn.querystring(), "");
279
280    ```
281    */
282    pub fn querystring(&self) -> &str {
283        match self.path.split_once('?') {
284            Some((_, query)) => query,
285            None => "",
286        }
287    }
288
    /// get the host for this conn, if it exists. This is read from the
    /// request's `Host` header.
    pub fn host(&self) -> Option<&str> {
        self.request_headers.get_str(Host)
    }
293
    /// set the host for this conn by storing it as the request's `Host`
    /// header
    pub fn set_host(&mut self, host: String) {
        self.request_headers.insert(Host, host);
    }
298
299    // pub fn url(&self) -> Result<Url> {
300    //     let path = self.path();
301    //     let host = self.host().unwrap_or_else(|| String::from("_"));
302    //     let method = self.method();
303    //     if path.starts_with("http://") || path.starts_with("https://") {
304    //         Ok(Url::parse(path)?)
305    //     } else if path.starts_with('/') {
306    //         Ok(Url::parse(&format!("http://{}{}", host, path))?)
307    //     } else if method == &Method::Connect {
308    //         Ok(Url::parse(&format!("http://{}/", path))?)
309    //     } else {
310    //         Err(Error::UnexpectedUriFormat)
311    //     }
312    // }
313
    /**
    Sets the response body to anything that is [`impl Into<Body>`][Body],
    replacing any body that was previously set.

    ```
    # use trillium_http::{Conn, Method, Body};
    # let mut conn = Conn::new_synthetic(Method::Get, "/some/path?and&a=query", ());
    conn.set_response_body("hello");
    conn.set_response_body(String::from("hello"));
    conn.set_response_body(vec![99, 97, 116]);
    ```
    */
    pub fn set_response_body(&mut self, body: impl Into<Body>) {
        self.response_body = Some(body.into());
    }
328
    /// returns a reference to the current response body, or `None` if no
    /// body has been set
    pub fn response_body(&self) -> Option<&Body> {
        self.response_body.as_ref()
    }
333
    /**
    remove the response body from this conn and return it, leaving the
    conn without a response body

    ```
    # use trillium_http::{Conn, Method};
    # let mut conn = Conn::new_synthetic(Method::Get, "/some/path?and&a=query", ());
    assert!(conn.response_body().is_none());
    conn.set_response_body("hello");
    assert!(conn.response_body().is_some());
    let body = conn.take_response_body();
    assert!(body.is_some());
    assert!(conn.response_body().is_none());
    ```
    */
    pub fn take_response_body(&mut self) -> Option<Body> {
        self.response_body.take()
    }
351
    /**
    returns the http method for this conn's request.
    ```
    # use trillium_http::{Conn, Method};
    let mut conn = Conn::new_synthetic(Method::Get, "/some/path?and&a=query", ());
    assert_eq!(conn.method(), Method::Get);
    ```
    */
    pub fn method(&self) -> Method {
        self.method
    }
363
    /**
    overrides the http method for this conn. Later calls to
    [`Conn::method`] return the new value.
    */
    pub fn set_method(&mut self, method: Method) {
        self.method = method;
    }
370
    /**
    returns the http version for this conn, as parsed from the request.
    */
    pub fn http_version(&self) -> Version {
        self.version
    }
377
    /// Cancels and drops the future if reading from the transport results in an error or empty read
    ///
    /// If the client disconnects from the conn's transport, this function will return None. If the
    /// future completes without disconnection, this future will return Some containing the output
    /// of the future.
    ///
    /// The use of this method is not advised if your connected http client employs pipelining
    /// (rarely seen in the wild), as it will buffer an unbounded number of requests one byte at a
    /// time.
    ///
    /// Note that the inner future cannot borrow conn, so you will need to clone or take any
    /// information needed to execute the future prior to executing this method.
    ///
    /// # Example
    ///
    /// ```rust
    /// # use futures_lite::{AsyncRead, AsyncWrite};
    /// # use trillium_http::{Conn, Method};
    /// async fn something_slow_and_cancel_safe() -> String { String::from("this was not actually slow") }
    /// async fn handler<T>(mut conn: Conn<T>) -> Conn<T>
    /// where
    ///     T: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static
    /// {
    ///     let Some(returned_body) = conn.cancel_on_disconnect(async {
    ///         something_slow_and_cancel_safe().await
    ///     }).await else { return conn; };
    ///     conn.set_response_body(returned_body);
    ///     conn.set_status(200);
    ///     conn
    /// }
    /// ```
    pub async fn cancel_on_disconnect<'a, Fut>(&'a mut self, fut: Fut) -> Option<Fut::Output>
    where
        Fut: Future + Send + 'a,
    {
        // the provided future is pinned on the stack and driven alongside the liveness check
        CancelOnDisconnect(self, pin!(fut)).await
    }
418
    /// Check if the transport is connected by attempting to read from the transport
    ///
    /// The liveness check is polled exactly once; this returns `true` only if that single poll
    /// completes, and `false` if it is still pending.
    ///
    /// # Example
    ///
    /// This is best to use at appropriate points in a long-running handler, like:
    ///
    /// ```rust
    /// # use futures_lite::{AsyncRead, AsyncWrite};
    /// # use trillium_http::{Conn, Method};
    /// # async fn something_slow_but_not_cancel_safe() {}
    /// async fn handler<T>(mut conn: Conn<T>) -> Conn<T>
    /// where
    ///     T: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static
    /// {
    ///     for _ in 0..100 {
    ///         if conn.is_disconnected().await {
    ///             return conn;
    ///         }
    ///         something_slow_but_not_cancel_safe().await;
    ///     }
    ///    conn.set_status(200);
    ///    conn
    /// }
    /// ```
    pub async fn is_disconnected(&mut self) -> bool {
        future::poll_once(LivenessFut::new(self)).await.is_some()
    }
446
447    fn needs_100_continue(&self) -> bool {
448        self.request_body_state == ReceivedBodyState::Start
449            && self.version != Version::Http1_0
450            && self
451                .request_headers
452                .eq_ignore_ascii_case(Expect, "100-continue")
453    }
454
    /// Builds a [`ReceivedBody`] that borrows this conn's buffer, transport
    /// and request body state, using the request's charset encoding and this
    /// conn's [`HttpConfig`].
    ///
    /// The content length comes from `request_content_length`; an error
    /// from that lookup is discarded and treated the same as an unknown
    /// length.
    // NOTE(review): a malformed content-length is silently dropped here rather
    // than rejected — confirm this is intended.
    #[allow(clippy::needless_borrow, clippy::needless_borrows_for_generic_args)]
    fn build_request_body(&mut self) -> ReceivedBody<'_, Transport> {
        ReceivedBody::new_with_config(
            self.request_content_length().ok().flatten(),
            &mut self.buffer,
            &mut self.transport,
            &mut self.request_body_state,
            None,
            encoding(&self.request_headers),
            &self.http_config,
        )
    }
467
    /**
    returns the [`encoding_rs::Encoding`] for this request, as
    determined from the mime-type charset in the request headers, if
    available

    ```
    # use trillium_http::{Conn, Method};
    let mut conn = Conn::new_synthetic(Method::Get, "/", ());
    assert_eq!(conn.request_encoding(), encoding_rs::WINDOWS_1252); // the default
    conn.request_headers_mut().insert("content-type", "text/plain;charset=utf-16");
    assert_eq!(conn.request_encoding(), encoding_rs::UTF_16LE);
    ```
    */
    pub fn request_encoding(&self) -> &'static Encoding {
        encoding(&self.request_headers)
    }
483
    /**
    returns the [`encoding_rs::Encoding`] for this response, as
    determined from the mime-type charset in the response headers, if
    available

    ```
    # use trillium_http::{Conn, Method};
    let mut conn = Conn::new_synthetic(Method::Get, "/", ());
    assert_eq!(conn.response_encoding(), encoding_rs::WINDOWS_1252); // the default
    conn.response_headers_mut().insert("content-type", "text/plain;charset=utf-16");
    assert_eq!(conn.response_encoding(), encoding_rs::UTF_16LE);
    ```
    */
    pub fn response_encoding(&self) -> &'static Encoding {
        encoding(&self.response_headers)
    }
499
    /**
    returns a [`ReceivedBody`] that references this conn. the conn
    retains all data and holds the singular transport, but the
    `ReceivedBody` provides an interface to read body content.

    If the request asked for `100-continue` and the body has not been
    started yet, a `100 Continue` interim response is written to the
    transport first. A failure to write that interim response is ignored.
    ```
    # async_io::block_on(async {
    # use trillium_http::{Conn, Method};
    let mut conn = Conn::new_synthetic(Method::Get, "/", "hello");
    let request_body = conn.request_body().await;
    assert_eq!(request_body.content_length(), Some(5));
    assert_eq!(request_body.read_string().await.unwrap(), "hello");
    # });
    ```
    */
    pub async fn request_body(&mut self) -> ReceivedBody<'_, Transport> {
        if self.needs_100_continue() {
            // best effort: an error writing the interim response is deliberately discarded
            self.send_100_continue().await.ok();
        }

        self.build_request_body()
    }
521
    /// returns a clone of the [`stopper::Stopper`] for this Conn. use
    /// this to gracefully stop long-running futures and streams
    /// inside of handler functions
    pub fn stopper(&self) -> Stopper {
        self.stopper.clone()
    }
528
529    fn validate_headers(request_headers: &Headers) -> Result<()> {
530        let content_length = request_headers.has_header(ContentLength);
531        let transfer_encoding_chunked =
532            request_headers.eq_ignore_ascii_case(TransferEncoding, "chunked");
533
534        if content_length && transfer_encoding_chunked {
535            Err(Error::UnexpectedHeader("content-length"))
536        } else {
537            Ok(())
538        }
539    }
540
    /// # Create a new `Conn`
    ///
    /// This function creates a new conn from the provided
    /// [`Transport`][crate::transport::Transport], as well as any
    /// bytes that have already been read from the transport, and a
    /// [`Stopper`] instance that will be used to signal graceful
    /// shutdown. The default [`HttpConfig`] is used.
    ///
    /// # Errors
    ///
    /// This will return an error variant if:
    ///
    /// * there is an io error when reading from the underlying transport
    /// * headers are too long
    /// * we are unable to parse some aspect of the request
    /// * the request is an unsupported http version
    /// * we cannot make sense of the headers, such as if there is a
    ///   `content-length` header as well as a `transfer-encoding: chunked`
    ///   header.
    pub async fn new(transport: Transport, bytes: Vec<u8>, stopper: Stopper) -> Result<Self> {
        Self::new_with_config(DEFAULT_CONFIG, transport, bytes.into(), stopper).await
    }
563
564    /// # Create a new `Conn`
565    ///
566    /// This function creates a new conn from the provided
567    /// [`Transport`][crate::transport::Transport], as well as any
568    /// bytes that have already been read from the transport, and a
569    /// [`Stopper`] instance that will be used to signal graceful
570    /// shutdown.
571    ///
572    /// # Errors
573    ///
574    /// This will return an error variant if:
575    ///
576    /// * there is an io error when reading from the underlying transport
577    /// * headers are too long
578    /// * we are unable to parse some aspect of the request
579    /// * the request is an unsupported http version
580    /// * we cannot make sense of the headers, such as if there is a
581    /// `content-length` header as well as a `transfer-encoding: chunked`
582    /// header.
583    async fn new_with_config(
584        http_config: HttpConfig,
585        mut transport: Transport,
586        mut buffer: Buffer,
587        stopper: Stopper,
588    ) -> Result<Self> {
589        let (head_size, start_time) =
590            Self::head(&mut transport, &mut buffer, &stopper, &http_config).await?;
591
592        let mut headers = vec![EMPTY_HEADER; http_config.max_headers];
593        let mut httparse_req = Request::new(&mut headers);
594
595        let status = httparse_req.parse(&buffer[..])?;
596        if status.is_partial() {
597            return Err(Error::PartialHead);
598        }
599
600        let method = match httparse_req.method {
601            Some(method) => match method.parse() {
602                Ok(method) => method,
603                Err(_) => return Err(Error::UnrecognizedMethod(method.to_string())),
604            },
605            None => return Err(Error::MissingMethod),
606        };
607
608        let version = match httparse_req.version {
609            Some(0) => Version::Http1_0,
610            Some(1) => Version::Http1_1,
611            Some(version) => return Err(Error::UnsupportedVersion(version)),
612            None => return Err(Error::MissingVersion),
613        };
614
615        let mut request_headers = Headers::with_capacity(httparse_req.headers.len());
616        for header in httparse_req.headers {
617            let header_name = HeaderName::from_str(header.name)?;
618            let header_value = HeaderValue::from(header.value.to_owned());
619            request_headers.append(header_name, header_value);
620        }
621
622        Self::validate_headers(&request_headers)?;
623
624        let path = httparse_req
625            .path
626            .ok_or(Error::RequestPathMissing)?
627            .to_owned();
628        log::trace!("received:\n{method} {path} {version}\n{request_headers}");
629
630        let mut response_headers =
631            Headers::with_capacity(http_config.response_header_initial_capacity);
632        response_headers.insert(Server, SERVER);
633
634        buffer.ignore_front(head_size);
635
636        Ok(Self {
637            transport,
638            request_headers,
639            method,
640            version,
641            path,
642            buffer,
643            response_headers,
644            status: None,
645            state: StateSet::new(),
646            response_body: None,
647            request_body_state: ReceivedBodyState::Start,
648            secure: false,
649            stopper,
650            after_send: AfterSend::default(),
651            start_time,
652            peer_ip: None,
653            http_config,
654        })
655    }
656
    /// predicate function to indicate whether the connection is
    /// secure. note that this does not necessarily indicate that the
    /// transport itself is secure, as it may indicate that
    /// `trillium_http` is behind a trusted reverse proxy that has
    /// terminated tls and provided appropriate headers to indicate
    /// this. See also [`Conn::set_secure`].
    pub fn is_secure(&self) -> bool {
        self.secure
    }
666
    /// set whether the connection should be considered secure. note
    /// that this does not necessarily indicate that the transport
    /// itself is secure, as it may indicate that `trillium_http` is
    /// behind a trusted reverse proxy that has terminated tls and
    /// provided appropriate headers to indicate this. The value is
    /// reported back by [`Conn::is_secure`].
    pub fn set_secure(&mut self, secure: bool) {
        self.secure = secure;
    }
675
676    /**
677    calculates any auto-generated headers for this conn prior to sending it
678    */
679    pub fn finalize_headers(&mut self) {
680        if self.status == Some(Status::SwitchingProtocols) {
681            return;
682        }
683
684        self.response_headers
685            .try_insert_with(Date, || httpdate::fmt_http_date(SystemTime::now()));
686
687        if !matches!(self.status, Some(Status::NotModified | Status::NoContent)) {
688            if let Some(len) = self.body_len() {
689                self.response_headers
690                    .try_insert(ContentLength, len.to_string());
691            }
692
693            if self.version == Version::Http1_1 && !self.response_headers.has_header(ContentLength)
694            {
695                self.response_headers.insert(TransferEncoding, "chunked");
696            } else {
697                self.response_headers.remove(TransferEncoding);
698            }
699        }
700
701        if self.stopper.is_stopped() {
702            self.response_headers.insert(Connection, "close");
703        }
704    }
705
    /**
    Registers a function to call after the http response has been
    completely transferred. The function is passed a [`SendStatus`].
    Please note that this is a sync function and should be
    computationally lightweight. If your _application_ needs additional
    async processing, use your runtime's task spawn within this hook.
    If your _library_ needs additional async processing in an
    `after_send` hook, please open an issue. This hook is currently
    designed for simple instrumentation and logging, and should be
    thought of as equivalent to a Drop hook.
    */
    pub fn after_send<F>(&mut self, after_send: F)
    where
        F: FnOnce(SendStatus) + Send + Sync + 'static,
    {
        self.after_send.append(after_send);
    }
722
    /// The [`Instant`] that the first header bytes for this conn were
    /// received, before any processing or parsing has been performed.
    /// Useful for measuring how long a request took to handle.
    pub fn start_time(&self) -> Instant {
        self.start_time
    }
728
729    async fn send_100_continue(&mut self) -> Result<()> {
730        log::trace!("sending 100-continue");
731        Ok(self
732            .transport
733            .write_all(b"HTTP/1.1 100 Continue\r\n\r\n")
734            .await?)
735    }
736
737    async fn head(
738        transport: &mut Transport,
739        buf: &mut Buffer,
740        stopper: &Stopper,
741        http_config: &HttpConfig,
742    ) -> Result<(usize, Instant)> {
743        let mut len = 0;
744        let mut start_with_read = buf.is_empty();
745        let mut instant = None;
746        let finder = Finder::new(b"\r\n\r\n");
747        loop {
748            if len >= http_config.head_max_len {
749                return Err(Error::HeadersTooLong);
750            }
751
752            let bytes = if start_with_read {
753                buf.expand();
754                if len == 0 {
755                    stopper
756                        .stop_future(transport.read(buf))
757                        .await
758                        .ok_or(Error::Closed)??
759                } else {
760                    transport.read(&mut buf[len..]).await?
761                }
762            } else {
763                start_with_read = true;
764                buf.len()
765            };
766
767            if instant.is_none() {
768                instant = Some(Instant::now());
769            }
770
771            let search_start = len.max(3) - 3;
772            let search = finder.find(&buf[search_start..]);
773
774            if let Some(index) = search {
775                buf.truncate(len + bytes);
776                return Ok((search_start + index + 4, instant.unwrap()));
777            }
778
779            len += bytes;
780
781            if bytes == 0 {
782                return if len == 0 {
783                    Err(Error::Closed)
784                } else {
785                    Err(Error::PartialHead)
786                };
787            }
788        }
789    }
790
791    async fn next(mut self) -> Result<Self> {
792        if !self.needs_100_continue() || self.request_body_state != ReceivedBodyState::Start {
793            self.build_request_body().drain().await?;
794        }
795        Conn::new_with_config(self.http_config, self.transport, self.buffer, self.stopper).await
796    }
797
798    fn should_close(&self) -> bool {
799        let request_connection = self.request_headers.get_lower(Connection);
800        let response_connection = self.response_headers.get_lower(Connection);
801
802        match (
803            request_connection.as_deref(),
804            response_connection.as_deref(),
805        ) {
806            (Some("keep-alive"), Some("keep-alive")) => false,
807            (Some("close"), _) | (_, Some("close")) => true,
808            _ => self.version == Version::Http1_0,
809        }
810    }
811
812    fn should_upgrade(&self) -> bool {
813        (self.method() == Method::Connect && self.status == Some(Status::Ok))
814            || self.status == Some(Status::SwitchingProtocols)
815    }
816
817    async fn finish(self) -> Result<ConnectionStatus<Transport>> {
818        if self.should_close() {
819            Ok(ConnectionStatus::Close)
820        } else if self.should_upgrade() {
821            Ok(ConnectionStatus::Upgrade(self.into()))
822        } else {
823            match self.next().await {
824                Err(Error::Closed) => {
825                    log::trace!("connection closed by client");
826                    Ok(ConnectionStatus::Close)
827                }
828                Err(e) => Err(e),
829                Ok(conn) => Ok(ConnectionStatus::Conn(conn)),
830            }
831        }
832    }
833
834    fn request_content_length(&self) -> Result<Option<u64>> {
835        if self
836            .request_headers
837            .eq_ignore_ascii_case(TransferEncoding, "chunked")
838        {
839            Ok(None)
840        } else if let Some(cl) = self.request_headers.get_str(ContentLength) {
841            cl.parse()
842                .map(Some)
843                .map_err(|_| Error::MalformedHeader("content-length".into()))
844        } else {
845            Ok(Some(0))
846        }
847    }
848
849    fn body_len(&self) -> Option<u64> {
850        match self.response_body {
851            Some(ref body) => body.len(),
852            None => Some(0),
853        }
854    }
855
856    fn write_headers(&mut self, output_buffer: &mut Vec<u8>) -> Result<()> {
857        use std::io::Write;
858        let status = self.status().unwrap_or(Status::NotFound);
859
860        write!(
861            output_buffer,
862            "{} {} {}\r\n",
863            self.version,
864            status as u16,
865            status.canonical_reason()
866        )?;
867
868        self.finalize_headers();
869
870        log::trace!(
871            "sending:\n{} {}\n{}",
872            self.version,
873            status,
874            &self.response_headers
875        );
876
877        for (name, values) in &self.response_headers {
878            if name.is_valid() {
879                for value in values {
880                    if value.is_valid() {
881                        write!(output_buffer, "{name}: ")?;
882                        output_buffer.extend_from_slice(value.as_ref());
883                        write!(output_buffer, "\r\n")?;
884                    } else {
885                        log::error!("skipping invalid header value {value:?} for header {name}");
886                    }
887                }
888            } else {
889                log::error!("skipping invalid header with name {name:?}");
890            }
891        }
892
893        write!(output_buffer, "\r\n")?;
894        Ok(())
895    }
896
897    /// applies a mapping function from one transport to another. This
898    /// is particularly useful for boxing the transport. unless you're
899    /// sure this is what you're looking for, you probably don't want
900    /// to be using this
901    pub fn map_transport<T: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static>(
902        self,
903        f: impl Fn(Transport) -> T,
904    ) -> Conn<T> {
905        let Conn {
906            request_headers,
907            response_headers,
908            path,
909            status,
910            version,
911            state,
912            transport,
913            buffer,
914            request_body_state,
915            secure,
916            method,
917            response_body,
918            stopper,
919            after_send,
920            start_time,
921            peer_ip,
922            http_config,
923        } = self;
924
925        Conn {
926            request_headers,
927            response_headers,
928            method,
929            response_body,
930            path,
931            status,
932            version,
933            state,
934            transport: f(transport),
935            buffer,
936            request_body_state,
937            secure,
938            stopper,
939            after_send,
940            start_time,
941            peer_ip,
942            http_config,
943        }
944    }
945
    /// Get a shared reference to the transport underlying this conn.
    pub fn transport(&self) -> &Transport {
        &self.transport
    }
950
    /// Get a mutable reference to the transport underlying this conn.
    ///
    /// This should only be used to call your own custom methods on the transport that do not read
    /// or write any data. Calling any method that reads from or writes to the transport will
    /// disrupt the HTTP protocol. If you're looking to transition from HTTP to another protocol,
    /// use an HTTP upgrade.
    pub fn transport_mut(&mut self) -> &mut Transport {
        &mut self.transport
    }
960
    /// sets the remote ip address for this conn, if available. Passing
    /// `None` clears any previously set address.
    pub fn set_peer_ip(&mut self, peer_ip: Option<IpAddr>) {
        self.peer_ip = peer_ip;
    }
965
    /// retrieves the remote ip address for this conn, if available. This
    /// is `None` unless it has been provided through [`Conn::set_peer_ip`].
    pub fn peer_ip(&self) -> Option<IpAddr> {
        self.peer_ip
    }
970}