Skip to main content

hyper/client/conn/
http1.rs

//! HTTP/1 client connections.
2
3use std::error::Error as StdError;
4use std::fmt;
5use std::future::Future;
6use std::pin::Pin;
7use std::task::{Context, Poll};
8
9use crate::rt::{Read, Write};
10use bytes::Bytes;
11use futures_core::ready;
12use http::{Request, Response};
13use httparse::ParserConfig;
14
15use super::super::dispatch::{self, TrySendError};
16use crate::body::{Body, Incoming as IncomingBody};
17use crate::proto;
18
19type Dispatcher<T, B> =
20    proto::dispatch::Dispatcher<proto::dispatch::Client<B>, B, T, proto::h1::ClientTransaction>;
21
22/// The sender side of an established connection.
23pub struct SendRequest<B> {
24    dispatch: dispatch::Sender<Request<B>, Response<IncomingBody>>,
25}
26
27/// Deconstructed parts of a `Connection`.
28///
29/// This allows taking apart a `Connection` at a later time, in order to
30/// reclaim the IO object, and additional related pieces.
31#[derive(Debug)]
32#[non_exhaustive]
33pub struct Parts<T> {
34    /// The original IO object used in the handshake.
35    pub io: T,
36    /// A buffer of bytes that have been read but not processed as HTTP.
37    ///
38    /// For instance, if the `Connection` is used for an HTTP upgrade request,
39    /// it is possible the server sent back the first bytes of the new protocol
40    /// along with the response upgrade.
41    ///
42    /// You will want to check for any existing bytes if you plan to continue
43    /// communicating on the IO object.
44    pub read_buf: Bytes,
45}
46
47/// A future that processes all HTTP state for the IO object.
48///
49/// In most cases, this should just be spawned into an executor, so that it
50/// can process incoming and outgoing messages, notice hangups, and the like.
51///
52/// Instances of this type are typically created via the [`handshake`] function
53#[must_use = "futures do nothing unless polled"]
54pub struct Connection<T, B>
55where
56    T: Read + Write,
57    B: Body + 'static,
58{
59    inner: Dispatcher<T, B>,
60}
61
62impl<T, B> Connection<T, B>
63where
64    T: Read + Write + Unpin,
65    B: Body + 'static,
66    B::Error: Into<Box<dyn StdError + Send + Sync>>,
67{
68    /// Return the inner IO object, and additional information.
69    ///
70    /// Only works for HTTP/1 connections. HTTP/2 connections will panic.
71    pub fn into_parts(self) -> Parts<T> {
72        let (io, read_buf, _) = self.inner.into_inner();
73        Parts { io, read_buf }
74    }
75
76    /// Poll the connection for completion, but without calling `shutdown`
77    /// on the underlying IO.
78    ///
79    /// This is useful to allow running a connection while doing an HTTP
80    /// upgrade. Once the upgrade is completed, the connection would be "done",
81    /// but it is not desired to actually shutdown the IO object. Instead you
82    /// would take it back using `into_parts`.
83    ///
84    /// Use [`poll_fn`](https://docs.rs/futures/0.1.25/futures/future/fn.poll_fn.html)
85    /// and [`try_ready!`](https://docs.rs/futures/0.1.25/futures/macro.try_ready.html)
86    /// to work with this function; or use the `without_shutdown` wrapper.
87    pub fn poll_without_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
88        self.inner.poll_without_shutdown(cx)
89    }
90
91    /// Prevent shutdown of the underlying IO object at the end of service the request,
92    /// instead run `into_parts`. This is a convenience wrapper over `poll_without_shutdown`.
93    pub async fn without_shutdown(self) -> crate::Result<Parts<T>> {
94        let mut conn = Some(self);
95        crate::common::future::poll_fn(move |cx| -> Poll<crate::Result<Parts<T>>> {
96            ready!(conn.as_mut().unwrap().poll_without_shutdown(cx))?;
97            Poll::Ready(Ok(conn.take().unwrap().into_parts()))
98        })
99        .await
100    }
101}
102
103/// A builder to configure an HTTP connection.
104///
105/// After setting options, the builder is used to create a handshake future.
106///
107/// **Note**: The default values of options are *not considered stable*. They
108/// are subject to change at any time.
109#[derive(Clone, Debug)]
110pub struct Builder {
111    h09_responses: bool,
112    h1_parser_config: ParserConfig,
113    h1_writev: Option<bool>,
114    h1_title_case_headers: bool,
115    h1_preserve_header_case: bool,
116    h1_max_headers: Option<usize>,
117    #[cfg(feature = "ffi")]
118    h1_preserve_header_order: bool,
119    h1_read_buf_exact_size: Option<usize>,
120    h1_max_buf_size: Option<usize>,
121}
122
123/// Returns a handshake future over some IO.
124///
125/// This is a shortcut for `Builder::new().handshake(io)`.
126/// See [`client::conn`](crate::client::conn) for more.
127pub async fn handshake<T, B>(io: T) -> crate::Result<(SendRequest<B>, Connection<T, B>)>
128where
129    T: Read + Write + Unpin,
130    B: Body + 'static,
131    B::Data: Send,
132    B::Error: Into<Box<dyn StdError + Send + Sync>>,
133{
134    Builder::new().handshake(io).await
135}
136
137// ===== impl SendRequest
138
139impl<B> SendRequest<B> {
140    /// Polls to determine whether this sender can be used yet for a request.
141    ///
142    /// If the associated connection is closed, this returns an Error.
143    pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
144        self.dispatch.poll_ready(cx)
145    }
146
147    /// Waits until the dispatcher is ready
148    ///
149    /// If the associated connection is closed, this returns an Error.
150    pub async fn ready(&mut self) -> crate::Result<()> {
151        crate::common::future::poll_fn(|cx| self.poll_ready(cx)).await
152    }
153
154    /// Checks if the connection is currently ready to send a request.
155    ///
156    /// # Note
157    ///
158    /// This is mostly a hint. Due to inherent latency of networks, it is
159    /// possible that even after checking this is ready, sending a request
160    /// may still fail because the connection was closed in the meantime.
161    pub fn is_ready(&self) -> bool {
162        self.dispatch.is_ready()
163    }
164
165    /// Checks if the connection side has been closed.
166    pub fn is_closed(&self) -> bool {
167        self.dispatch.is_closed()
168    }
169}
170
171impl<B> SendRequest<B>
172where
173    B: Body + 'static,
174{
175    /// Sends a `Request` on the associated connection.
176    ///
177    /// Returns a future that if successful, yields the `Response`.
178    ///
179    /// `req` must have a `Host` header.
180    ///
181    /// # Uri
182    ///
183    /// The `Uri` of the request is serialized as-is.
184    ///
185    /// - Usually you want origin-form (`/path?query`).
186    /// - For sending to an HTTP proxy, you want to send in absolute-form
187    ///   (`https://hyper.rs/guides`).
188    ///
189    /// This is however not enforced or validated and it is up to the user
190    /// of this method to ensure the `Uri` is correct for their intended purpose.
191    pub fn send_request(
192        &mut self,
193        req: Request<B>,
194    ) -> impl Future<Output = crate::Result<Response<IncomingBody>>> {
195        let sent = self.dispatch.send(req);
196
197        async move {
198            match sent {
199                Ok(rx) => match rx.await {
200                    Ok(Ok(resp)) => Ok(resp),
201                    Ok(Err(err)) => Err(err),
202                    // this is definite bug if it happens, but it shouldn't happen!
203                    Err(_canceled) => panic!("dispatch dropped without returning error"),
204                },
205                Err(_req) => {
206                    debug!("connection was not ready");
207                    Err(crate::Error::new_canceled().with("connection was not ready"))
208                }
209            }
210        }
211    }
212
213    /// Sends a `Request` on the associated connection.
214    ///
215    /// Returns a future that if successful, yields the `Response`.
216    ///
217    /// # Error
218    ///
219    /// If there was an error before trying to serialize the request to the
220    /// connection, the message will be returned as part of this error.
221    pub fn try_send_request(
222        &mut self,
223        req: Request<B>,
224    ) -> impl Future<Output = Result<Response<IncomingBody>, TrySendError<Request<B>>>> {
225        let sent = self.dispatch.try_send(req);
226        async move {
227            match sent {
228                Ok(rx) => match rx.await {
229                    Ok(Ok(res)) => Ok(res),
230                    Ok(Err(err)) => Err(err),
231                    // this is definite bug if it happens, but it shouldn't happen!
232                    Err(_) => panic!("dispatch dropped without returning error"),
233                },
234                Err(req) => {
235                    debug!("connection was not ready");
236                    let error = crate::Error::new_canceled().with("connection was not ready");
237                    Err(TrySendError {
238                        error,
239                        message: Some(req),
240                    })
241                }
242            }
243        }
244    }
245}
246
247impl<B> fmt::Debug for SendRequest<B> {
248    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
249        f.debug_struct("SendRequest").finish()
250    }
251}
252
253// ===== impl Connection
254
255impl<T, B> Connection<T, B>
256where
257    T: Read + Write + Unpin + Send,
258    B: Body + 'static,
259    B::Error: Into<Box<dyn StdError + Send + Sync>>,
260{
261    /// Enable this connection to support higher-level HTTP upgrades.
262    ///
263    /// See [the `upgrade` module](crate::upgrade) for more.
264    pub fn with_upgrades(self) -> upgrades::UpgradeableConnection<T, B> {
265        upgrades::UpgradeableConnection { inner: Some(self) }
266    }
267}
268
269impl<T, B> fmt::Debug for Connection<T, B>
270where
271    T: Read + Write + fmt::Debug,
272    B: Body + 'static,
273{
274    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
275        f.debug_struct("Connection").finish()
276    }
277}
278
279impl<T, B> Future for Connection<T, B>
280where
281    T: Read + Write + Unpin,
282    B: Body + 'static,
283    B::Data: Send,
284    B::Error: Into<Box<dyn StdError + Send + Sync>>,
285{
286    type Output = crate::Result<()>;
287
288    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
289        match ready!(Pin::new(&mut self.inner).poll(cx))? {
290            proto::Dispatched::Shutdown => Poll::Ready(Ok(())),
291            proto::Dispatched::Upgrade(pending) => {
292                // With no `Send` bound on `I`, we can't try to do
293                // upgrades here. In case a user was trying to use
294                // `upgrade` with this API, send a special
295                // error letting them know about that.
296                pending.manual();
297                Poll::Ready(Ok(()))
298            }
299        }
300    }
301}
302
303// ===== impl Builder
304
305impl Default for Builder {
306    fn default() -> Self {
307        Self::new()
308    }
309}
310
311impl Builder {
312    /// Creates a new connection builder.
313    #[inline]
314    pub fn new() -> Builder {
315        Builder {
316            h09_responses: false,
317            h1_writev: None,
318            h1_read_buf_exact_size: None,
319            h1_parser_config: Default::default(),
320            h1_title_case_headers: false,
321            h1_preserve_header_case: false,
322            h1_max_headers: None,
323            #[cfg(feature = "ffi")]
324            h1_preserve_header_order: false,
325            h1_max_buf_size: None,
326        }
327    }
328
329    /// Set whether HTTP/0.9 responses should be tolerated.
330    ///
331    /// Default is false.
332    pub fn http09_responses(&mut self, enabled: bool) -> &mut Builder {
333        self.h09_responses = enabled;
334        self
335    }
336
337    /// Set whether HTTP/1 connections will accept spaces between header names
338    /// and the colon that follow them in responses.
339    ///
340    /// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has
341    /// to say about it:
342    ///
343    /// > No whitespace is allowed between the header field-name and colon. In
344    /// > the past, differences in the handling of such whitespace have led to
345    /// > security vulnerabilities in request routing and response handling. A
346    /// > server MUST reject any received request message that contains
347    /// > whitespace between a header field-name and colon with a response code
348    /// > of 400 (Bad Request). A proxy MUST remove any such whitespace from a
349    /// > response message before forwarding the message downstream.
350    ///
351    /// Default is false.
352    ///
353    /// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4
354    pub fn allow_spaces_after_header_name_in_responses(&mut self, enabled: bool) -> &mut Builder {
355        self.h1_parser_config
356            .allow_spaces_after_header_name_in_responses(enabled);
357        self
358    }
359
360    /// Set whether HTTP/1 connections will accept obsolete line folding for
361    /// header values.
362    ///
363    /// Newline codepoints (`\r` and `\n`) will be transformed to spaces when
364    /// parsing.
365    ///
366    /// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has
367    /// to say about it:
368    ///
369    /// > A server that receives an obs-fold in a request message that is not
370    /// > within a message/http container MUST either reject the message by
371    /// > sending a 400 (Bad Request), preferably with a representation
372    /// > explaining that obsolete line folding is unacceptable, or replace
373    /// > each received obs-fold with one or more SP octets prior to
374    /// > interpreting the field value or forwarding the message downstream.
375    ///
376    /// > A proxy or gateway that receives an obs-fold in a response message
377    /// > that is not within a message/http container MUST either discard the
378    /// > message and replace it with a 502 (Bad Gateway) response, preferably
379    /// > with a representation explaining that unacceptable line folding was
380    /// > received, or replace each received obs-fold with one or more SP
381    /// > octets prior to interpreting the field value or forwarding the
382    /// > message downstream.
383    ///
384    /// > A user agent that receives an obs-fold in a response message that is
385    /// > not within a message/http container MUST replace each received
386    /// > obs-fold with one or more SP octets prior to interpreting the field
387    /// > value.
388    ///
389    /// Default is false.
390    ///
391    /// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4
392    pub fn allow_obsolete_multiline_headers_in_responses(&mut self, enabled: bool) -> &mut Builder {
393        self.h1_parser_config
394            .allow_obsolete_multiline_headers_in_responses(enabled);
395        self
396    }
397
398    /// Set whether HTTP/1 connections will silently ignored malformed header lines.
399    ///
400    /// If this is enabled and a header line does not start with a valid header
401    /// name, or does not include a colon at all, the line will be silently ignored
402    /// and no error will be reported.
403    ///
404    /// Default is false.
405    pub fn ignore_invalid_headers_in_responses(&mut self, enabled: bool) -> &mut Builder {
406        self.h1_parser_config
407            .ignore_invalid_headers_in_responses(enabled);
408        self
409    }
410
411    /// Set whether HTTP/1 connections should try to use vectored writes,
412    /// or always flatten into a single buffer.
413    ///
414    /// Note that setting this to false may mean more copies of body data,
415    /// but may also improve performance when an IO transport doesn't
416    /// support vectored writes well, such as most TLS implementations.
417    ///
418    /// Setting this to true will force hyper to use queued strategy
419    /// which may eliminate unnecessary cloning on some TLS backends
420    ///
421    /// Default is `auto`. In this mode hyper will try to guess which
422    /// mode to use
423    pub fn writev(&mut self, enabled: bool) -> &mut Builder {
424        self.h1_writev = Some(enabled);
425        self
426    }
427
428    /// Set whether HTTP/1 connections will write header names as title case at
429    /// the socket level.
430    ///
431    /// Default is false.
432    pub fn title_case_headers(&mut self, enabled: bool) -> &mut Builder {
433        self.h1_title_case_headers = enabled;
434        self
435    }
436
437    /// Set whether to support preserving original header cases.
438    ///
439    /// Currently, this will record the original cases received, and store them
440    /// in a private extension on the `Response`. It will also look for and use
441    /// such an extension in any provided `Request`.
442    ///
443    /// Since the relevant extension is still private, there is no way to
444    /// interact with the original cases. The only effect this can have now is
445    /// to forward the cases in a proxy-like fashion.
446    ///
447    /// Default is false.
448    pub fn preserve_header_case(&mut self, enabled: bool) -> &mut Builder {
449        self.h1_preserve_header_case = enabled;
450        self
451    }
452
453    /// Set the maximum number of headers.
454    ///
455    /// When a response is received, the parser will reserve a buffer to store headers for optimal
456    /// performance.
457    ///
458    /// If client receives more headers than the buffer size, the error "message header too large"
459    /// is returned.
460    ///
461    /// Note that headers is allocated on the stack by default, which has higher performance. After
462    /// setting this value, headers will be allocated in heap memory, that is, heap memory
463    /// allocation will occur for each response, and there will be a performance drop of about 5%.
464    ///
465    /// Default is 100.
466    pub fn max_headers(&mut self, val: usize) -> &mut Self {
467        self.h1_max_headers = Some(val);
468        self
469    }
470
471    /// Set whether to support preserving original header order.
472    ///
473    /// Currently, this will record the order in which headers are received, and store this
474    /// ordering in a private extension on the `Response`. It will also look for and use
475    /// such an extension in any provided `Request`.
476    ///
477    /// Default is false.
478    #[cfg(feature = "ffi")]
479    pub fn preserve_header_order(&mut self, enabled: bool) -> &mut Builder {
480        self.h1_preserve_header_order = enabled;
481        self
482    }
483
484    /// Sets the exact size of the read buffer to *always* use.
485    ///
486    /// Note that setting this option unsets the `max_buf_size` option.
487    ///
488    /// Default is an adaptive read buffer.
489    pub fn read_buf_exact_size(&mut self, sz: Option<usize>) -> &mut Builder {
490        self.h1_read_buf_exact_size = sz;
491        self.h1_max_buf_size = None;
492        self
493    }
494
495    /// Set the maximum buffer size for the connection.
496    ///
497    /// Default is ~400kb.
498    ///
499    /// Note that setting this option unsets the `read_exact_buf_size` option.
500    ///
501    /// # Panics
502    ///
503    /// The minimum value allowed is 8192. This method panics if the passed `max` is less than the minimum.
504    pub fn max_buf_size(&mut self, max: usize) -> &mut Self {
505        assert!(
506            max >= proto::h1::MINIMUM_MAX_BUFFER_SIZE,
507            "the max_buf_size cannot be smaller than the minimum that h1 specifies."
508        );
509
510        self.h1_max_buf_size = Some(max);
511        self.h1_read_buf_exact_size = None;
512        self
513    }
514
515    /// Constructs a connection with the configured options and IO.
516    /// See [`client::conn`](crate::client::conn) for more.
517    ///
518    /// Note, if [`Connection`] is not `await`-ed, [`SendRequest`] will
519    /// do nothing.
520    pub fn handshake<T, B>(
521        &self,
522        io: T,
523    ) -> impl Future<Output = crate::Result<(SendRequest<B>, Connection<T, B>)>>
524    where
525        T: Read + Write + Unpin,
526        B: Body + 'static,
527        B::Data: Send,
528        B::Error: Into<Box<dyn StdError + Send + Sync>>,
529    {
530        let opts = self.clone();
531
532        async move {
533            trace!("client handshake HTTP/1");
534
535            let (tx, rx) = dispatch::channel();
536            let mut conn = proto::Conn::new(io);
537            conn.set_h1_parser_config(opts.h1_parser_config);
538            if let Some(writev) = opts.h1_writev {
539                if writev {
540                    conn.set_write_strategy_queue();
541                } else {
542                    conn.set_write_strategy_flatten();
543                }
544            }
545            if opts.h1_title_case_headers {
546                conn.set_title_case_headers();
547            }
548            if opts.h1_preserve_header_case {
549                conn.set_preserve_header_case();
550            }
551            if let Some(max_headers) = opts.h1_max_headers {
552                conn.set_http1_max_headers(max_headers);
553            }
554            #[cfg(feature = "ffi")]
555            if opts.h1_preserve_header_order {
556                conn.set_preserve_header_order();
557            }
558
559            if opts.h09_responses {
560                conn.set_h09_responses();
561            }
562
563            if let Some(sz) = opts.h1_read_buf_exact_size {
564                conn.set_read_buf_exact_size(sz);
565            }
566            if let Some(max) = opts.h1_max_buf_size {
567                conn.set_max_buf_size(max);
568            }
569            let cd = proto::h1::dispatch::Client::new(rx);
570            let proto = proto::h1::Dispatcher::new(cd, conn);
571
572            Ok((SendRequest { dispatch: tx }, Connection { inner: proto }))
573        }
574    }
575}
576
577mod upgrades {
578    use crate::upgrade::Upgraded;
579
580    use super::*;
581
582    // A future binding a connection with a Service with Upgrade support.
583    //
584    // This type is unnameable outside the crate.
585    #[must_use = "futures do nothing unless polled"]
586    #[allow(missing_debug_implementations)]
587    pub struct UpgradeableConnection<T, B>
588    where
589        T: Read + Write + Unpin + Send + 'static,
590        B: Body + 'static,
591        B::Error: Into<Box<dyn StdError + Send + Sync>>,
592    {
593        pub(super) inner: Option<Connection<T, B>>,
594    }
595
596    impl<I, B> Future for UpgradeableConnection<I, B>
597    where
598        I: Read + Write + Unpin + Send + 'static,
599        B: Body + 'static,
600        B::Data: Send,
601        B::Error: Into<Box<dyn StdError + Send + Sync>>,
602    {
603        type Output = crate::Result<()>;
604
605        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
606            match ready!(Pin::new(&mut self.inner.as_mut().unwrap().inner).poll(cx)) {
607                Ok(proto::Dispatched::Shutdown) => Poll::Ready(Ok(())),
608                Ok(proto::Dispatched::Upgrade(pending)) => {
609                    let Parts { io, read_buf } = self.inner.take().unwrap().into_parts();
610                    pending.fulfill(Upgraded::new(io, read_buf));
611                    Poll::Ready(Ok(()))
612                }
613                Err(e) => Poll::Ready(Err(e)),
614            }
615        }
616    }
617}