rama_hyper/client/conn/
http1.rs

1//! HTTP/1 client connections
2
3use std::error::Error as StdError;
4use std::fmt;
5use std::future::Future;
6use std::pin::Pin;
7use std::task::{Context, Poll};
8
9use crate::rt::{Read, Write};
10use bytes::Bytes;
11use http::{Request, Response};
12use httparse::ParserConfig;
13
14use super::super::dispatch;
15use crate::body::{Body, Incoming as IncomingBody};
16use crate::proto;
17
18type Dispatcher<T, B> =
19    proto::dispatch::Dispatcher<proto::dispatch::Client<B>, B, T, proto::h1::ClientTransaction>;
20
21/// The sender side of an established connection.
22pub struct SendRequest<B> {
23    dispatch: dispatch::Sender<Request<B>, Response<IncomingBody>>,
24}
25
26/// Deconstructed parts of a `Connection`.
27///
28/// This allows taking apart a `Connection` at a later time, in order to
29/// reclaim the IO object, and additional related pieces.
30#[derive(Debug)]
31pub struct Parts<T> {
32    /// The original IO object used in the handshake.
33    pub io: T,
34    /// A buffer of bytes that have been read but not processed as HTTP.
35    ///
36    /// For instance, if the `Connection` is used for an HTTP upgrade request,
37    /// it is possible the server sent back the first bytes of the new protocol
38    /// along with the response upgrade.
39    ///
40    /// You will want to check for any existing bytes if you plan to continue
41    /// communicating on the IO object.
42    pub read_buf: Bytes,
43    _inner: (),
44}
45
46/// A future that processes all HTTP state for the IO object.
47///
48/// In most cases, this should just be spawned into an executor, so that it
49/// can process incoming and outgoing messages, notice hangups, and the like.
50#[must_use = "futures do nothing unless polled"]
51pub struct Connection<T, B>
52where
53    T: Read + Write + 'static,
54    B: Body + 'static,
55{
56    inner: Dispatcher<T, B>,
57}
58
59impl<T, B> Connection<T, B>
60where
61    T: Read + Write + Unpin + 'static,
62    B: Body + 'static,
63    B::Error: Into<Box<dyn StdError + Send + Sync>>,
64{
65    /// Return the inner IO object, and additional information.
66    ///
67    /// Only works for HTTP/1 connections. HTTP/2 connections will panic.
68    pub fn into_parts(self) -> Parts<T> {
69        let (io, read_buf, _) = self.inner.into_inner();
70        Parts {
71            io,
72            read_buf,
73            _inner: (),
74        }
75    }
76
77    /// Poll the connection for completion, but without calling `shutdown`
78    /// on the underlying IO.
79    ///
80    /// This is useful to allow running a connection while doing an HTTP
81    /// upgrade. Once the upgrade is completed, the connection would be "done",
82    /// but it is not desired to actually shutdown the IO object. Instead you
83    /// would take it back using `into_parts`.
84    ///
85    /// Use [`poll_fn`](https://docs.rs/futures/0.1.25/futures/future/fn.poll_fn.html)
86    /// and [`try_ready!`](https://docs.rs/futures/0.1.25/futures/macro.try_ready.html)
87    /// to work with this function; or use the `without_shutdown` wrapper.
88    pub fn poll_without_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
89        self.inner.poll_without_shutdown(cx)
90    }
91
92    /// Prevent shutdown of the underlying IO object at the end of service the request,
93    /// instead run `into_parts`. This is a convenience wrapper over `poll_without_shutdown`.
94    pub async fn without_shutdown(self) -> crate::Result<Parts<T>> {
95        let mut conn = Some(self);
96        futures_util::future::poll_fn(move |cx| -> Poll<crate::Result<Parts<T>>> {
97            ready!(conn.as_mut().unwrap().poll_without_shutdown(cx))?;
98            Poll::Ready(Ok(conn.take().unwrap().into_parts()))
99        })
100        .await
101    }
102}
103
104/// A builder to configure an HTTP connection.
105///
106/// After setting options, the builder is used to create a handshake future.
107///
108/// **Note**: The default values of options are *not considered stable*. They
109/// are subject to change at any time.
110#[derive(Clone, Debug)]
111pub struct Builder {
112    h09_responses: bool,
113    h1_parser_config: ParserConfig,
114    h1_writev: Option<bool>,
115    h1_title_case_headers: bool,
116    h1_preserve_header_case: bool,
117    #[cfg(feature = "ffi")]
118    h1_preserve_header_order: bool,
119    h1_read_buf_exact_size: Option<usize>,
120    h1_max_buf_size: Option<usize>,
121}
122
123/// Returns a handshake future over some IO.
124///
125/// This is a shortcut for `Builder::new().handshake(io)`.
126/// See [`client::conn`](crate::client::conn) for more.
127pub async fn handshake<T, B>(io: T) -> crate::Result<(SendRequest<B>, Connection<T, B>)>
128where
129    T: Read + Write + Unpin + 'static,
130    B: Body + 'static,
131    B::Data: Send,
132    B::Error: Into<Box<dyn StdError + Send + Sync>>,
133{
134    Builder::new().handshake(io).await
135}
136
137// ===== impl SendRequest
138
139impl<B> SendRequest<B> {
140    /// Polls to determine whether this sender can be used yet for a request.
141    ///
142    /// If the associated connection is closed, this returns an Error.
143    pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
144        self.dispatch.poll_ready(cx)
145    }
146
147    /// Waits until the dispatcher is ready
148    ///
149    /// If the associated connection is closed, this returns an Error.
150    pub async fn ready(&mut self) -> crate::Result<()> {
151        futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await
152    }
153
154    /// Checks if the connection is currently ready to send a request.
155    ///
156    /// # Note
157    ///
158    /// This is mostly a hint. Due to inherent latency of networks, it is
159    /// possible that even after checking this is ready, sending a request
160    /// may still fail because the connection was closed in the meantime.
161    pub fn is_ready(&self) -> bool {
162        self.dispatch.is_ready()
163    }
164
165    /// Checks if the connection side has been closed.
166    pub fn is_closed(&self) -> bool {
167        self.dispatch.is_closed()
168    }
169}
170
171impl<B> SendRequest<B>
172where
173    B: Body + 'static,
174{
175    /// Sends a `Request` on the associated connection.
176    ///
177    /// Returns a future that if successful, yields the `Response`.
178    ///
179    /// # Note
180    ///
181    /// There are some key differences in what automatic things the `Client`
182    /// does for you that will not be done here:
183    ///
184    /// - `Client` requires absolute-form `Uri`s, since the scheme and
185    ///   authority are needed to connect. They aren't required here.
186    /// - Since the `Client` requires absolute-form `Uri`s, it can add
187    ///   the `Host` header based on it. You must add a `Host` header yourself
188    ///   before calling this method.
189    /// - Since absolute-form `Uri`s are not required, if received, they will
190    ///   be serialized as-is.
191    pub fn send_request(
192        &mut self,
193        req: Request<B>,
194    ) -> impl Future<Output = crate::Result<Response<IncomingBody>>> {
195        let sent = self.dispatch.send(req);
196
197        async move {
198            match sent {
199                Ok(rx) => match rx.await {
200                    Ok(Ok(resp)) => Ok(resp),
201                    Ok(Err(err)) => Err(err),
202                    // this is definite bug if it happens, but it shouldn't happen!
203                    Err(_canceled) => panic!("dispatch dropped without returning error"),
204                },
205                Err(_req) => {
206                    debug!("connection was not ready");
207                    Err(crate::Error::new_canceled().with("connection was not ready"))
208                }
209            }
210        }
211    }
212
213    /*
214    pub(super) fn send_request_retryable(
215        &mut self,
216        req: Request<B>,
217    ) -> impl Future<Output = Result<Response<Body>, (crate::Error, Option<Request<B>>)>> + Unpin
218    where
219        B: Send,
220    {
221        match self.dispatch.try_send(req) {
222            Ok(rx) => {
223                Either::Left(rx.then(move |res| {
224                    match res {
225                        Ok(Ok(res)) => future::ok(res),
226                        Ok(Err(err)) => future::err(err),
227                        // this is definite bug if it happens, but it shouldn't happen!
228                        Err(_) => panic!("dispatch dropped without returning error"),
229                    }
230                }))
231            }
232            Err(req) => {
233                debug!("connection was not ready");
234                let err = crate::Error::new_canceled().with("connection was not ready");
235                Either::Right(future::err((err, Some(req))))
236            }
237        }
238    }
239    */
240}
241
242impl<B> fmt::Debug for SendRequest<B> {
243    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
244        f.debug_struct("SendRequest").finish()
245    }
246}
247
248// ===== impl Connection
249
250impl<T, B> Connection<T, B>
251where
252    T: Read + Write + Unpin + Send + 'static,
253    B: Body + 'static,
254    B::Error: Into<Box<dyn StdError + Send + Sync>>,
255{
256    /// Enable this connection to support higher-level HTTP upgrades.
257    ///
258    /// See [the `upgrade` module](crate::upgrade) for more.
259    pub fn with_upgrades(self) -> upgrades::UpgradeableConnection<T, B> {
260        upgrades::UpgradeableConnection { inner: Some(self) }
261    }
262}
263
264impl<T, B> fmt::Debug for Connection<T, B>
265where
266    T: Read + Write + fmt::Debug + 'static,
267    B: Body + 'static,
268{
269    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
270        f.debug_struct("Connection").finish()
271    }
272}
273
274impl<T, B> Future for Connection<T, B>
275where
276    T: Read + Write + Unpin + 'static,
277    B: Body + 'static,
278    B::Data: Send,
279    B::Error: Into<Box<dyn StdError + Send + Sync>>,
280{
281    type Output = crate::Result<()>;
282
283    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
284        match ready!(Pin::new(&mut self.inner).poll(cx))? {
285            proto::Dispatched::Shutdown => Poll::Ready(Ok(())),
286            proto::Dispatched::Upgrade(pending) => {
287                // With no `Send` bound on `I`, we can't try to do
288                // upgrades here. In case a user was trying to use
289                // `upgrade` with this API, send a special
290                // error letting them know about that.
291                pending.manual();
292                Poll::Ready(Ok(()))
293            }
294        }
295    }
296}
297
298// ===== impl Builder
299
300impl Builder {
301    /// Creates a new connection builder.
302    #[inline]
303    pub fn new() -> Builder {
304        Builder {
305            h09_responses: false,
306            h1_writev: None,
307            h1_read_buf_exact_size: None,
308            h1_parser_config: Default::default(),
309            h1_title_case_headers: false,
310            h1_preserve_header_case: false,
311            #[cfg(feature = "ffi")]
312            h1_preserve_header_order: false,
313            h1_max_buf_size: None,
314        }
315    }
316
317    /// Set whether HTTP/0.9 responses should be tolerated.
318    ///
319    /// Default is false.
320    pub fn http09_responses(&mut self, enabled: bool) -> &mut Builder {
321        self.h09_responses = enabled;
322        self
323    }
324
325    /// Set whether HTTP/1 connections will accept spaces between header names
326    /// and the colon that follow them in responses.
327    ///
328    /// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has
329    /// to say about it:
330    ///
331    /// > No whitespace is allowed between the header field-name and colon. In
332    /// > the past, differences in the handling of such whitespace have led to
333    /// > security vulnerabilities in request routing and response handling. A
334    /// > server MUST reject any received request message that contains
335    /// > whitespace between a header field-name and colon with a response code
336    /// > of 400 (Bad Request). A proxy MUST remove any such whitespace from a
337    /// > response message before forwarding the message downstream.
338    ///
339    /// Default is false.
340    ///
341    /// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4
342    pub fn allow_spaces_after_header_name_in_responses(&mut self, enabled: bool) -> &mut Builder {
343        self.h1_parser_config
344            .allow_spaces_after_header_name_in_responses(enabled);
345        self
346    }
347
348    /// Set whether HTTP/1 connections will accept obsolete line folding for
349    /// header values.
350    ///
351    /// Newline codepoints (`\r` and `\n`) will be transformed to spaces when
352    /// parsing.
353    ///
354    /// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has
355    /// to say about it:
356    ///
357    /// > A server that receives an obs-fold in a request message that is not
358    /// > within a message/http container MUST either reject the message by
359    /// > sending a 400 (Bad Request), preferably with a representation
360    /// > explaining that obsolete line folding is unacceptable, or replace
361    /// > each received obs-fold with one or more SP octets prior to
362    /// > interpreting the field value or forwarding the message downstream.
363    ///
364    /// > A proxy or gateway that receives an obs-fold in a response message
365    /// > that is not within a message/http container MUST either discard the
366    /// > message and replace it with a 502 (Bad Gateway) response, preferably
367    /// > with a representation explaining that unacceptable line folding was
368    /// > received, or replace each received obs-fold with one or more SP
369    /// > octets prior to interpreting the field value or forwarding the
370    /// > message downstream.
371    ///
372    /// > A user agent that receives an obs-fold in a response message that is
373    /// > not within a message/http container MUST replace each received
374    /// > obs-fold with one or more SP octets prior to interpreting the field
375    /// > value.
376    ///
377    /// Default is false.
378    ///
379    /// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4
380    pub fn allow_obsolete_multiline_headers_in_responses(&mut self, enabled: bool) -> &mut Builder {
381        self.h1_parser_config
382            .allow_obsolete_multiline_headers_in_responses(enabled);
383        self
384    }
385
386    /// Set whether HTTP/1 connections will silently ignored malformed header lines.
387    ///
388    /// If this is enabled and and a header line does not start with a valid header
389    /// name, or does not include a colon at all, the line will be silently ignored
390    /// and no error will be reported.
391    ///
392    /// Default is false.
393    pub fn ignore_invalid_headers_in_responses(&mut self, enabled: bool) -> &mut Builder {
394        self.h1_parser_config
395            .ignore_invalid_headers_in_responses(enabled);
396        self
397    }
398
399    /// Set whether HTTP/1 connections should try to use vectored writes,
400    /// or always flatten into a single buffer.
401    ///
402    /// Note that setting this to false may mean more copies of body data,
403    /// but may also improve performance when an IO transport doesn't
404    /// support vectored writes well, such as most TLS implementations.
405    ///
406    /// Setting this to true will force hyper to use queued strategy
407    /// which may eliminate unnecessary cloning on some TLS backends
408    ///
409    /// Default is `auto`. In this mode hyper will try to guess which
410    /// mode to use
411    pub fn writev(&mut self, enabled: bool) -> &mut Builder {
412        self.h1_writev = Some(enabled);
413        self
414    }
415
416    /// Set whether HTTP/1 connections will write header names as title case at
417    /// the socket level.
418    ///
419    /// Default is false.
420    pub fn title_case_headers(&mut self, enabled: bool) -> &mut Builder {
421        self.h1_title_case_headers = enabled;
422        self
423    }
424
425    /// Set whether to support preserving original header cases.
426    ///
427    /// Currently, this will record the original cases received, and store them
428    /// in a private extension on the `Response`. It will also look for and use
429    /// such an extension in any provided `Request`.
430    ///
431    /// Since the relevant extension is still private, there is no way to
432    /// interact with the original cases. The only effect this can have now is
433    /// to forward the cases in a proxy-like fashion.
434    ///
435    /// Default is false.
436    pub fn preserve_header_case(&mut self, enabled: bool) -> &mut Builder {
437        self.h1_preserve_header_case = enabled;
438        self
439    }
440
441    /// Set whether to support preserving original header order.
442    ///
443    /// Currently, this will record the order in which headers are received, and store this
444    /// ordering in a private extension on the `Response`. It will also look for and use
445    /// such an extension in any provided `Request`.
446    ///
447    /// Default is false.
448    #[cfg(feature = "ffi")]
449    pub fn preserve_header_order(&mut self, enabled: bool) -> &mut Builder {
450        self.h1_preserve_header_order = enabled;
451        self
452    }
453
454    /// Sets the exact size of the read buffer to *always* use.
455    ///
456    /// Note that setting this option unsets the `max_buf_size` option.
457    ///
458    /// Default is an adaptive read buffer.
459    pub fn read_buf_exact_size(&mut self, sz: Option<usize>) -> &mut Builder {
460        self.h1_read_buf_exact_size = sz;
461        self.h1_max_buf_size = None;
462        self
463    }
464
465    /// Set the maximum buffer size for the connection.
466    ///
467    /// Default is ~400kb.
468    ///
469    /// Note that setting this option unsets the `read_exact_buf_size` option.
470    ///
471    /// # Panics
472    ///
473    /// The minimum value allowed is 8192. This method panics if the passed `max` is less than the minimum.
474    pub fn max_buf_size(&mut self, max: usize) -> &mut Self {
475        assert!(
476            max >= proto::h1::MINIMUM_MAX_BUFFER_SIZE,
477            "the max_buf_size cannot be smaller than the minimum that h1 specifies."
478        );
479
480        self.h1_max_buf_size = Some(max);
481        self.h1_read_buf_exact_size = None;
482        self
483    }
484
485    /// Constructs a connection with the configured options and IO.
486    /// See [`client::conn`](crate::client::conn) for more.
487    ///
488    /// Note, if [`Connection`] is not `await`-ed, [`SendRequest`] will
489    /// do nothing.
490    pub fn handshake<T, B>(
491        &self,
492        io: T,
493    ) -> impl Future<Output = crate::Result<(SendRequest<B>, Connection<T, B>)>>
494    where
495        T: Read + Write + Unpin + 'static,
496        B: Body + 'static,
497        B::Data: Send,
498        B::Error: Into<Box<dyn StdError + Send + Sync>>,
499    {
500        let opts = self.clone();
501
502        async move {
503            trace!("client handshake HTTP/1");
504
505            let (tx, rx) = dispatch::channel();
506            let mut conn = proto::Conn::new(io);
507            conn.set_h1_parser_config(opts.h1_parser_config);
508            if let Some(writev) = opts.h1_writev {
509                if writev {
510                    conn.set_write_strategy_queue();
511                } else {
512                    conn.set_write_strategy_flatten();
513                }
514            }
515            if opts.h1_title_case_headers {
516                conn.set_title_case_headers();
517            }
518            if opts.h1_preserve_header_case {
519                conn.set_preserve_header_case();
520            }
521            #[cfg(feature = "ffi")]
522            if opts.h1_preserve_header_order {
523                conn.set_preserve_header_order();
524            }
525
526            if opts.h09_responses {
527                conn.set_h09_responses();
528            }
529
530            if let Some(sz) = opts.h1_read_buf_exact_size {
531                conn.set_read_buf_exact_size(sz);
532            }
533            if let Some(max) = opts.h1_max_buf_size {
534                conn.set_max_buf_size(max);
535            }
536            let cd = proto::h1::dispatch::Client::new(rx);
537            let proto = proto::h1::Dispatcher::new(cd, conn);
538
539            Ok((SendRequest { dispatch: tx }, Connection { inner: proto }))
540        }
541    }
542}
543
544mod upgrades {
545    use crate::upgrade::Upgraded;
546
547    use super::*;
548
549    // A future binding a connection with a Service with Upgrade support.
550    //
551    // This type is unnameable outside the crate.
552    #[must_use = "futures do nothing unless polled"]
553    #[allow(missing_debug_implementations)]
554    pub struct UpgradeableConnection<T, B>
555    where
556        T: Read + Write + Unpin + Send + 'static,
557        B: Body + 'static,
558        B::Error: Into<Box<dyn StdError + Send + Sync>>,
559    {
560        pub(super) inner: Option<Connection<T, B>>,
561    }
562
563    impl<I, B> Future for UpgradeableConnection<I, B>
564    where
565        I: Read + Write + Unpin + Send + 'static,
566        B: Body + 'static,
567        B::Data: Send,
568        B::Error: Into<Box<dyn StdError + Send + Sync>>,
569    {
570        type Output = crate::Result<()>;
571
572        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
573            match ready!(Pin::new(&mut self.inner.as_mut().unwrap().inner).poll(cx)) {
574                Ok(proto::Dispatched::Shutdown) => Poll::Ready(Ok(())),
575                Ok(proto::Dispatched::Upgrade(pending)) => {
576                    let Parts {
577                        io,
578                        read_buf,
579                        _inner,
580                    } = self.inner.take().unwrap().into_parts();
581                    pending.fulfill(Upgraded::new(io, read_buf));
582                    Poll::Ready(Ok(()))
583                }
584                Err(e) => Poll::Ready(Err(e)),
585            }
586        }
587    }
588}