rama_http_core/client/conn/
http1.rs

1//! HTTP/1 client connections
2
3use std::fmt;
4use std::pin::Pin;
5use std::task::{Context, Poll, ready};
6
7use bytes::Bytes;
8use httparse::ParserConfig;
9use rama_core::error::BoxError;
10use rama_http_types::{Request, Response};
11use tokio::io::{AsyncRead, AsyncWrite};
12use tracing::{debug, trace};
13
14use super::super::dispatch::{self, TrySendError};
15use crate::body::{Body, Incoming as IncomingBody};
16use crate::proto;
17
// HTTP/1 client flavour of the protocol dispatcher used by `Connection`:
// `T` is the IO object and `B` is the request body type.
type Dispatcher<T, B> =
    proto::dispatch::Dispatcher<proto::dispatch::Client<B>, B, T, proto::h1::ClientTransaction>;
20
/// The sender side of an established connection.
///
/// Requests handed to this type are passed over a dispatch channel whose
/// receiving half is owned by the matching [`Connection`].
pub struct SendRequest<B> {
    // Sending half of the dispatch channel: takes a `Request<B>` and hands
    // back the `Response<IncomingBody>` (or an error) for it.
    dispatch: dispatch::Sender<Request<B>, Response<IncomingBody>>,
}
25
/// Deconstructed parts of a `Connection`.
///
/// This allows taking apart a `Connection` at a later time, in order to
/// reclaim the IO object, and additional related pieces.
///
/// Values of this type are produced by `Connection::into_parts` and
/// `Connection::without_shutdown`.
#[derive(Debug)]
#[non_exhaustive]
pub struct Parts<T> {
    /// The original IO object used in the handshake.
    pub io: T,
    /// A buffer of bytes that have been read but not processed as HTTP.
    ///
    /// For instance, if the `Connection` is used for an HTTP upgrade request,
    /// it is possible the server sent back the first bytes of the new protocol
    /// along with the response upgrade.
    ///
    /// You will want to check for any existing bytes if you plan to continue
    /// communicating on the IO object.
    pub read_buf: Bytes,
}
45
/// A future that processes all HTTP state for the IO object.
///
/// In most cases, this should just be spawned into an executor, so that it
/// can process incoming and outgoing messages, notice hangups, and the like.
///
/// Instances of this type are typically created via the [`handshake`] function.
#[must_use = "futures do nothing unless polled"]
pub struct Connection<T, B>
where
    T: AsyncRead + AsyncWrite,
    B: Body<Data: Send + 'static, Error: Into<BoxError>> + Send + 'static + Unpin,
{
    // The HTTP/1 client dispatcher that is polled to drive this connection.
    inner: Dispatcher<T, B>,
}
60
61impl<T, B> Connection<T, B>
62where
63    T: AsyncRead + AsyncWrite + Unpin,
64    B: Body<Data: Send + 'static, Error: Into<BoxError>> + Send + 'static + Unpin,
65{
66    /// Return the inner IO object, and additional information.
67    ///
68    /// Only works for HTTP/1 connections. HTTP/2 connections will panic.
69    pub fn into_parts(self) -> Parts<T> {
70        let (io, read_buf, _) = self.inner.into_inner();
71        Parts { io, read_buf }
72    }
73
74    /// Poll the connection for completion, but without calling `shutdown`
75    /// on the underlying IO.
76    ///
77    /// This is useful to allow running a connection while doing an HTTP
78    /// upgrade. Once the upgrade is completed, the connection would be "done",
79    /// but it is not desired to actually shutdown the IO object. Instead you
80    /// would take it back using `into_parts`.
81    ///
82    /// Use [`poll_fn`](https://docs.rs/futures/0.1.25/futures/future/fn.poll_fn.html)
83    /// and [`try_ready!`](https://docs.rs/futures/0.1.25/futures/macro.try_ready.html)
84    /// to work with this function; or use the `without_shutdown` wrapper.
85    pub fn poll_without_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
86        self.inner.poll_without_shutdown(cx)
87    }
88
89    /// Prevent shutdown of the underlying IO object at the end of service the request,
90    /// instead run `into_parts`. This is a convenience wrapper over `poll_without_shutdown`.
91    pub async fn without_shutdown(self) -> crate::Result<Parts<T>> {
92        let mut conn = Some(self);
93        futures_util::future::poll_fn(move |cx| -> Poll<crate::Result<Parts<T>>> {
94            ready!(conn.as_mut().unwrap().poll_without_shutdown(cx))?;
95            Poll::Ready(Ok(conn.take().unwrap().into_parts()))
96        })
97        .await
98    }
99}
100
/// A builder to configure an HTTP connection.
///
/// After setting options, the builder is used to create a handshake future.
///
/// **Note**: The default values of options are *not considered stable*. They
/// are subject to change at any time.
#[derive(Clone, Debug)]
pub struct Builder {
    // Whether HTTP/0.9 responses are tolerated (see `http09_responses`).
    h09_responses: bool,
    // Response parser leniency switches, forwarded to the connection as-is.
    h1_parser_config: ParserConfig,
    // `Some(true)` selects the queue write strategy, `Some(false)` the flatten
    // strategy; `None` leaves the connection's own choice untouched.
    h1_writev: Option<bool>,
    // Whether header names are written in title case.
    h1_title_case_headers: bool,
    // Maximum number of headers, if explicitly configured.
    h1_max_headers: Option<usize>,

    // Exact read buffer size; setting it clears `h1_max_buf_size`.
    h1_read_buf_exact_size: Option<usize>,
    // Maximum buffer size; setting it clears `h1_read_buf_exact_size`.
    h1_max_buf_size: Option<usize>,
}
118
119/// Returns a handshake future over some IO.
120///
121/// This is a shortcut for `Builder::new().handshake(io)`.
122/// See [`client::conn`](crate::client::conn) for more.
123pub async fn handshake<T, B>(io: T) -> crate::Result<(SendRequest<B>, Connection<T, B>)>
124where
125    T: AsyncRead + AsyncWrite + Unpin,
126    B: Body<Data: Send + 'static, Error: Into<BoxError>> + Send + 'static + Unpin,
127{
128    Builder::new().handshake(io).await
129}
130
131// ===== impl SendRequest
132
133impl<B> SendRequest<B> {
134    /// Polls to determine whether this sender can be used yet for a request.
135    ///
136    /// If the associated connection is closed, this returns an Error.
137    pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
138        self.dispatch.poll_ready(cx)
139    }
140
141    /// Waits until the dispatcher is ready
142    ///
143    /// If the associated connection is closed, this returns an Error.
144    pub async fn ready(&mut self) -> crate::Result<()> {
145        futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await
146    }
147
148    /// Checks if the connection is currently ready to send a request.
149    ///
150    /// # Note
151    ///
152    /// This is mostly a hint. Due to inherent latency of networks, it is
153    /// possible that even after checking this is ready, sending a request
154    /// may still fail because the connection was closed in the meantime.
155    pub fn is_ready(&self) -> bool {
156        self.dispatch.is_ready()
157    }
158
159    /// Checks if the connection side has been closed.
160    pub fn is_closed(&self) -> bool {
161        self.dispatch.is_closed()
162    }
163}
164
165impl<B> SendRequest<B>
166where
167    B: Body<Data: Send + 'static, Error: Into<BoxError>> + Send + 'static + Unpin,
168{
169    /// Sends a `Request` on the associated connection.
170    ///
171    /// Returns a future that if successful, yields the `Response`.
172    ///
173    /// `req` must have a `Host` header.
174    ///
175    /// # Uri
176    ///
177    /// The `Uri` of the request is serialized as-is.
178    ///
179    /// - Usually you want origin-form (`/path?query`).
180    /// - For sending to an HTTP proxy, you want to send in absolute-form.
181    ///
182    /// This is however not enforced or validated and it is up to the user
183    /// of this method to ensure the `Uri` is correct for their intended purpose.
184    pub fn send_request(
185        &mut self,
186        req: Request<B>,
187    ) -> impl Future<Output = crate::Result<Response<IncomingBody>>> + use<B> {
188        let sent = self.dispatch.send(req);
189
190        async move {
191            match sent {
192                Ok(rx) => match rx.await {
193                    Ok(Ok(resp)) => Ok(resp),
194                    Ok(Err(err)) => Err(err),
195                    // this is definite bug if it happens, but it shouldn't happen!
196                    Err(_canceled) => panic!("dispatch dropped without returning error"),
197                },
198                Err(_req) => {
199                    debug!("connection was not ready");
200                    Err(crate::Error::new_canceled().with("connection was not ready"))
201                }
202            }
203        }
204    }
205
206    /// Sends a `Request` on the associated connection.
207    ///
208    /// Returns a future that if successful, yields the `Response`.
209    ///
210    /// # Error
211    ///
212    /// If there was an error before trying to serialize the request to the
213    /// connection, the message will be returned as part of this error.
214    pub fn try_send_request(
215        &mut self,
216        req: Request<B>,
217    ) -> impl Future<Output = Result<Response<IncomingBody>, TrySendError<Request<B>>>> {
218        let sent = self.dispatch.try_send(req);
219        async move {
220            match sent {
221                Ok(rx) => match rx.await {
222                    Ok(Ok(res)) => Ok(res),
223                    Ok(Err(err)) => Err(err),
224                    // this is definite bug if it happens, but it shouldn't happen!
225                    Err(_) => panic!("dispatch dropped without returning error"),
226                },
227                Err(req) => {
228                    debug!("connection was not ready");
229                    let error = crate::Error::new_canceled().with("connection was not ready");
230                    Err(TrySendError {
231                        error,
232                        message: Some(req),
233                    })
234                }
235            }
236        }
237    }
238}
239
240impl<B> fmt::Debug for SendRequest<B> {
241    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
242        f.debug_struct("SendRequest").finish()
243    }
244}
245
246// ===== impl Connection
247
248impl<T, B> Connection<T, B>
249where
250    T: AsyncRead + AsyncWrite + Unpin + Send,
251    B: Body<Data: Send + 'static, Error: Into<BoxError>> + Send + 'static + Unpin,
252{
253    /// Enable this connection to support higher-level HTTP upgrades.
254    ///
255    /// See [the `upgrade` module](crate::upgrade) for more.
256    pub fn with_upgrades(self) -> upgrades::UpgradeableConnection<T, B> {
257        upgrades::UpgradeableConnection { inner: Some(self) }
258    }
259}
260
261impl<T, B> fmt::Debug for Connection<T, B>
262where
263    T: AsyncRead + AsyncWrite + fmt::Debug,
264    B: Body<Data: Send + 'static, Error: Into<BoxError>> + Send + 'static + Unpin,
265{
266    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
267        f.debug_struct("Connection").finish()
268    }
269}
270
271impl<T, B> Future for Connection<T, B>
272where
273    T: AsyncRead + AsyncWrite + Unpin,
274    B: Body<Data: Send + 'static, Error: Into<BoxError>> + Send + 'static + Unpin,
275{
276    type Output = crate::Result<()>;
277
278    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
279        match ready!(Pin::new(&mut self.inner).poll(cx))? {
280            proto::Dispatched::Shutdown => Poll::Ready(Ok(())),
281            proto::Dispatched::Upgrade(pending) => {
282                // With no `Send` bound on `I`, we can't try to do
283                // upgrades here. In case a user was trying to use
284                // `upgrade` with this API, send a special
285                // error letting them know about that.
286                pending.manual();
287                Poll::Ready(Ok(()))
288            }
289        }
290    }
291}
292
293// ===== impl Builder
294
295impl Default for Builder {
296    fn default() -> Self {
297        Self::new()
298    }
299}
300
301impl Builder {
302    /// Creates a new connection builder.
303    #[inline]
304    pub fn new() -> Builder {
305        Builder {
306            h09_responses: false,
307            h1_writev: None,
308            h1_read_buf_exact_size: None,
309            h1_parser_config: Default::default(),
310            h1_title_case_headers: false,
311            h1_max_headers: None,
312            h1_max_buf_size: None,
313        }
314    }
315
316    /// Set whether HTTP/0.9 responses should be tolerated.
317    ///
318    /// Default is false.
319    pub fn http09_responses(&mut self, enabled: bool) -> &mut Builder {
320        self.h09_responses = enabled;
321        self
322    }
323
324    /// Set whether HTTP/1 connections will accept spaces between header names
325    /// and the colon that follow them in responses.
326    ///
327    /// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has
328    /// to say about it:
329    ///
330    /// > No whitespace is allowed between the header field-name and colon. In
331    /// > the past, differences in the handling of such whitespace have led to
332    /// > security vulnerabilities in request routing and response handling. A
333    /// > server MUST reject any received request message that contains
334    /// > whitespace between a header field-name and colon with a response code
335    /// > of 400 (Bad Request). A proxy MUST remove any such whitespace from a
336    /// > response message before forwarding the message downstream.
337    ///
338    /// Default is false.
339    ///
340    /// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4
341    pub fn allow_spaces_after_header_name_in_responses(&mut self, enabled: bool) -> &mut Builder {
342        self.h1_parser_config
343            .allow_spaces_after_header_name_in_responses(enabled);
344        self
345    }
346
347    /// Set whether HTTP/1 connections will accept obsolete line folding for
348    /// header values.
349    ///
350    /// Newline codepoints (`\r` and `\n`) will be transformed to spaces when
351    /// parsing.
352    ///
353    /// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has
354    /// to say about it:
355    ///
356    /// > A server that receives an obs-fold in a request message that is not
357    /// > within a message/http container MUST either reject the message by
358    /// > sending a 400 (Bad Request), preferably with a representation
359    /// > explaining that obsolete line folding is unacceptable, or replace
360    /// > each received obs-fold with one or more SP octets prior to
361    /// > interpreting the field value or forwarding the message downstream.
362    ///
363    /// > A proxy or gateway that receives an obs-fold in a response message
364    /// > that is not within a message/http container MUST either discard the
365    /// > message and replace it with a 502 (Bad Gateway) response, preferably
366    /// > with a representation explaining that unacceptable line folding was
367    /// > received, or replace each received obs-fold with one or more SP
368    /// > octets prior to interpreting the field value or forwarding the
369    /// > message downstream.
370    ///
371    /// > A user agent that receives an obs-fold in a response message that is
372    /// > not within a message/http container MUST replace each received
373    /// > obs-fold with one or more SP octets prior to interpreting the field
374    /// > value.
375    ///
376    /// Default is false.
377    ///
378    /// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4
379    pub fn allow_obsolete_multiline_headers_in_responses(&mut self, enabled: bool) -> &mut Builder {
380        self.h1_parser_config
381            .allow_obsolete_multiline_headers_in_responses(enabled);
382        self
383    }
384
385    /// Set whether HTTP/1 connections will silently ignored malformed header lines.
386    ///
387    /// If this is enabled and a header line does not start with a valid header
388    /// name, or does not include a colon at all, the line will be silently ignored
389    /// and no error will be reported.
390    ///
391    /// Default is false.
392    pub fn ignore_invalid_headers(&mut self, enabled: bool) -> &mut Builder {
393        self.h1_parser_config
394            .ignore_invalid_headers_in_responses(enabled);
395        self
396    }
397
398    /// Set whether HTTP/1 connections should try to use vectored writes,
399    /// or always flatten into a single buffer.
400    ///
401    /// Note that setting this to false may mean more copies of body data,
402    /// but may also improve performance when an IO transport doesn't
403    /// support vectored writes well, such as most TLS implementations.
404    ///
405    /// Setting this to true will force rama_http_core to use queued strategy
406    /// which may eliminate unnecessary cloning on some TLS backends
407    ///
408    /// Default is `auto`. In this mode rama_http_core will try to guess which
409    /// mode to use
410    pub fn writev(&mut self, enabled: bool) -> &mut Builder {
411        self.h1_writev = Some(enabled);
412        self
413    }
414
415    /// Set whether HTTP/1 connections will write header names as title case at
416    /// the socket level.
417    ///
418    /// Default is false.
419    pub fn title_case_headers(&mut self, enabled: bool) -> &mut Builder {
420        self.h1_title_case_headers = enabled;
421        self
422    }
423
424    /// Set the maximum number of headers.
425    ///
426    /// When a response is received, the parser will reserve a buffer to store headers for optimal
427    /// performance.
428    ///
429    /// If client receives more headers than the buffer size, the error "message header too large"
430    /// is returned.
431    ///
432    /// Note that headers is allocated on the stack by default, which has higher performance. After
433    /// setting this value, headers will be allocated in heap memory, that is, heap memory
434    /// allocation will occur for each response, and there will be a performance drop of about 5%.
435    ///
436    /// Default is 100.
437    pub fn max_headers(&mut self, val: usize) -> &mut Self {
438        self.h1_max_headers = Some(val);
439        self
440    }
441
442    /// Sets the exact size of the read buffer to *always* use.
443    ///
444    /// Note that setting this option unsets the `max_buf_size` option.
445    ///
446    /// Default is an adaptive read buffer.
447    pub fn read_buf_exact_size(&mut self, sz: Option<usize>) -> &mut Builder {
448        self.h1_read_buf_exact_size = sz;
449        self.h1_max_buf_size = None;
450        self
451    }
452
453    /// Set the maximum buffer size for the connection.
454    ///
455    /// Default is ~400kb.
456    ///
457    /// Note that setting this option unsets the `read_exact_buf_size` option.
458    ///
459    /// # Panics
460    ///
461    /// The minimum value allowed is 8192. This method panics if the passed `max` is less than the minimum.
462    pub fn max_buf_size(&mut self, max: usize) -> &mut Self {
463        assert!(
464            max >= proto::h1::MINIMUM_MAX_BUFFER_SIZE,
465            "the max_buf_size cannot be smaller than the minimum that h1 specifies."
466        );
467
468        self.h1_max_buf_size = Some(max);
469        self.h1_read_buf_exact_size = None;
470        self
471    }
472
473    /// Constructs a connection with the configured options and IO.
474    /// See [`client::conn`](crate::client::conn) for more.
475    ///
476    /// Note, if [`Connection`] is not `await`-ed, [`SendRequest`] will
477    /// do nothing.
478    pub fn handshake<T, B>(
479        &self,
480        io: T,
481    ) -> impl Future<Output = crate::Result<(SendRequest<B>, Connection<T, B>)>>
482    where
483        T: AsyncRead + AsyncWrite + Unpin,
484        B: Body<Data: Send + 'static, Error: Into<BoxError>> + Send + 'static + Unpin,
485    {
486        let opts = self.clone();
487
488        async move {
489            trace!("client handshake HTTP/1");
490
491            let (tx, rx) = dispatch::channel();
492            let mut conn = proto::Conn::new(io);
493            conn.set_h1_parser_config(opts.h1_parser_config);
494            if let Some(writev) = opts.h1_writev {
495                if writev {
496                    conn.set_write_strategy_queue();
497                } else {
498                    conn.set_write_strategy_flatten();
499                }
500            }
501            if opts.h1_title_case_headers {
502                conn.set_title_case_headers();
503            }
504            if let Some(max_headers) = opts.h1_max_headers {
505                conn.set_http1_max_headers(max_headers);
506            }
507
508            if opts.h09_responses {
509                conn.set_h09_responses();
510            }
511
512            if let Some(sz) = opts.h1_read_buf_exact_size {
513                conn.set_read_buf_exact_size(sz);
514            }
515            if let Some(max) = opts.h1_max_buf_size {
516                conn.set_max_buf_size(max);
517            }
518            let cd = proto::h1::dispatch::Client::new(rx);
519            let proto = proto::h1::Dispatcher::new(cd, conn);
520
521            Ok((SendRequest { dispatch: tx }, Connection { inner: proto }))
522        }
523    }
524}
525
526mod upgrades {
527    use crate::upgrade::Upgraded;
528
529    use super::*;
530
531    // A future binding a connection with a Service with Upgrade support.
532    //
533    // This type is unnameable outside the crate.
534    #[must_use = "futures do nothing unless polled"]
535    #[allow(missing_debug_implementations)]
536    pub struct UpgradeableConnection<T, B>
537    where
538        T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
539        B: Body<Data: Send + 'static, Error: Into<BoxError>> + Send + 'static + Unpin,
540    {
541        pub(super) inner: Option<Connection<T, B>>,
542    }
543
544    impl<I, B> Future for UpgradeableConnection<I, B>
545    where
546        I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
547        B: Body<Data: Send + 'static, Error: Into<BoxError>> + Send + 'static + Unpin,
548    {
549        type Output = crate::Result<()>;
550
551        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
552            match ready!(Pin::new(&mut self.inner.as_mut().unwrap().inner).poll(cx)) {
553                Ok(proto::Dispatched::Shutdown) => Poll::Ready(Ok(())),
554                Ok(proto::Dispatched::Upgrade(pending)) => {
555                    let Parts { io, read_buf } = self.inner.take().unwrap().into_parts();
556                    pending.fulfill(Upgraded::new(io, read_buf));
557                    Poll::Ready(Ok(()))
558                }
559                Err(e) => Poll::Ready(Err(e)),
560            }
561        }
562    }
563}