Skip to main content

wreq_proto/conn/
http1.rs

1//! HTTP/1 client connections
2
3use std::{
4    future::Future,
5    pin::Pin,
6    task::{ready, Context, Poll},
7};
8
9use bytes::Bytes;
10use http::{Request, Response};
11use http_body::Body;
12use httparse::ParserConfig;
13use tokio::io::{AsyncRead, AsyncWrite};
14
15use crate::{
16    body::Incoming,
17    dispatch::{self, TrySendError},
18    error::BoxError,
19    proto::{
20        self,
21        http1::{self, conn::Conn, role::Client, Http1Options},
22    },
23    Error, Result,
24};
25
26/// The sender side of an established connection.
27pub struct SendRequest<B> {
28    dispatch: dispatch::Sender<Request<B>, Response<Incoming>>,
29}
30
31/// Deconstructed parts of a `Connection`.
32///
33/// This allows taking apart a `Connection` at a later time, in order to
34/// reclaim the IO object, and additional related pieces.
35#[derive(Debug)]
36#[non_exhaustive]
37pub struct Parts<T> {
38    /// The original IO object used in the handshake.
39    pub io: T,
40    /// A buffer of bytes that have been read but not processed as HTTP.
41    ///
42    /// For instance, if the `Connection` is used for an HTTP upgrade request,
43    /// it is possible the server sent back the first bytes of the new protocol
44    /// along with the response upgrade.
45    ///
46    /// You will want to check for any existing bytes if you plan to continue
47    /// communicating on the IO object.
48    pub read_buf: Bytes,
49}
50
51/// A future that processes all HTTP state for the IO object.
52///
53/// In most cases, this should just be spawned into an executor, so that it
54/// can process incoming and outgoing messages, notice hangups, and the like.
55#[must_use = "futures do nothing unless polled"]
56pub struct Connection<T, B>
57where
58    T: AsyncRead + AsyncWrite,
59    B: Body + 'static,
60{
61    inner: http1::dispatch::Dispatcher<http1::dispatch::Client<B>, B, T, Client>,
62}
63
64impl<T, B> Connection<T, B>
65where
66    T: AsyncRead + AsyncWrite + Unpin,
67    B: Body + 'static,
68    B::Error: Into<BoxError>,
69{
70    /// Return the inner IO object, and additional information.
71    ///
72    /// Only works for HTTP/1 connections. HTTP/2 connections will panic.
73    #[inline]
74    pub fn into_parts(self) -> Parts<T> {
75        let (io, read_buf, _) = self.inner.into_inner();
76        Parts { io, read_buf }
77    }
78}
79
80/// A builder to configure an HTTP connection.
81///
82/// After setting options, the builder is used to create a handshake future.
83///
84/// **Note**: The default values of options are *not considered stable*. They
85/// are subject to change at any time.
86#[derive(Debug, Default, Clone)]
87pub struct Builder {
88    opts: Http1Options,
89}
90
91// ===== impl SendRequest
92
93impl<B> SendRequest<B> {
94    /// Polls to determine whether this sender can be used yet for a request.
95    ///
96    /// If the associated connection is closed, this returns an Error.
97    #[inline]
98    pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
99        self.dispatch.poll_ready(cx)
100    }
101
102    /// Waits until the dispatcher is ready
103    ///
104    /// If the associated connection is closed, this returns an Error.
105    #[inline]
106    pub async fn ready(&mut self) -> Result<()> {
107        std::future::poll_fn(|cx| self.poll_ready(cx)).await
108    }
109
110    /// Checks if the connection is currently ready to send a request.
111    ///
112    /// # Note
113    ///
114    /// This is mostly a hint. Due to inherent latency of networks, it is
115    /// possible that even after checking this is ready, sending a request
116    /// may still fail because the connection was closed in the meantime.
117    #[inline]
118    pub fn is_ready(&self) -> bool {
119        self.dispatch.is_ready()
120    }
121}
122
123impl<B> SendRequest<B>
124where
125    B: Body + 'static,
126{
127    /// Sends a `Request` on the associated connection.
128    ///
129    /// Returns a future that if successful, yields the `Response`.
130    ///
131    /// # Error
132    ///
133    /// If there was an error before trying to serialize the request to the
134    /// connection, the message will be returned as part of this error.
135    pub fn try_send_request(
136        &mut self,
137        req: Request<B>,
138    ) -> impl Future<Output = Result<Response<Incoming>, TrySendError<Request<B>>>> {
139        let sent = self.dispatch.try_send(req);
140        async move {
141            match sent {
142                Ok(rx) => match rx.await {
143                    Ok(res) => res,
144                    // this is definite bug if it happens, but it shouldn't happen!
145                    Err(_) => panic!("dispatch dropped without returning error"),
146                },
147                Err(req) => {
148                    debug!("connection was not ready");
149                    Err(TrySendError {
150                        error: Error::new_canceled().with("connection was not ready"),
151                        message: Some(req),
152                    })
153                }
154            }
155        }
156    }
157}
158
159// ===== impl Connection
160
161impl<T, B> Connection<T, B>
162where
163    T: AsyncRead + AsyncWrite + Unpin + Send,
164    B: Body + 'static,
165    B::Error: Into<BoxError>,
166{
167    /// Enable this connection to support higher-level HTTP upgrades.
168    #[inline]
169    pub fn with_upgrades(self) -> upgrades::UpgradeableConnection<T, B> {
170        upgrades::UpgradeableConnection { inner: Some(self) }
171    }
172
173    /// Poll the connection for completion, but without calling `shutdown`
174    /// on the underlying IO.
175    ///
176    /// This is useful to allow running a connection while doing an HTTP
177    /// upgrade. Once the upgrade is completed, the connection would be "done",
178    /// but it is not desired to actually shutdown the IO object. Instead you
179    /// would take it back using `into_parts`.
180    ///
181    /// Use [`poll_fn`](https://docs.rs/futures/0.1.25/futures/future/fn.poll_fn.html)
182    /// and [`try_ready!`](https://docs.rs/futures/0.1.25/futures/macro.try_ready.html)
183    /// to work with this function; or use the `without_shutdown` wrapper.
184    pub fn poll_without_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
185        self.inner.poll_without_shutdown(cx)
186    }
187
188    /// Prevent shutdown of the underlying IO object at the end of service the request,
189    /// instead run `into_parts`. This is a convenience wrapper over `poll_without_shutdown`.
190    pub async fn without_shutdown(self) -> crate::Result<Parts<T>> {
191        let mut conn = Some(self);
192        std::future::poll_fn(move |cx| -> Poll<crate::Result<Parts<T>>> {
193            ready!(conn.as_mut().unwrap().poll_without_shutdown(cx))?;
194            Poll::Ready(Ok(conn.take().unwrap().into_parts()))
195        })
196        .await
197    }
198}
199
200impl<T, B> Future for Connection<T, B>
201where
202    T: AsyncRead + AsyncWrite + Unpin,
203    B: Body + 'static,
204    B::Data: Send,
205    B::Error: Into<BoxError>,
206{
207    type Output = Result<()>;
208
209    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
210        match ready!(Pin::new(&mut self.inner).poll(cx))? {
211            proto::Dispatched::Shutdown => Poll::Ready(Ok(())),
212            proto::Dispatched::Upgrade(pending) => {
213                // With no `Send` bound on `I`, we can't try to do
214                // upgrades here. In case a user was trying to use
215                // `upgrade` with this API, send a special
216                // error letting them know about that.
217                pending.manual();
218                Poll::Ready(Ok(()))
219            }
220        }
221    }
222}
223
224// ===== impl Builder
225
226impl Builder {
227    /// Provide a options configuration for the HTTP/1 connection.
228    #[inline]
229    pub fn options(mut self, opts: Http1Options) -> Self {
230        self.opts = opts;
231        self
232    }
233
234    /// Constructs a connection with the configured options and IO.
235    ///
236    /// Note, if [`Connection`] is not `await`-ed, [`SendRequest`] will
237    /// do nothing.
238    pub async fn handshake<T, B>(self, io: T) -> Result<(SendRequest<B>, Connection<T, B>)>
239    where
240        T: AsyncRead + AsyncWrite + Unpin,
241        B: Body + 'static,
242        B::Data: Send,
243        B::Error: Into<BoxError>,
244    {
245        trace!("client handshake HTTP/1");
246
247        let (tx, rx) = dispatch::channel();
248        let mut conn = Conn::new(io);
249
250        // Set the HTTP/1 parser configuration
251        let h1_parser_config = {
252            let mut h1_parser_config = ParserConfig::default();
253            h1_parser_config
254                .ignore_invalid_headers_in_responses(self.opts.ignore_invalid_headers_in_responses)
255                .allow_spaces_after_header_name_in_responses(
256                    self.opts.allow_spaces_after_header_name_in_responses,
257                )
258                .allow_obsolete_multiline_headers_in_responses(
259                    self.opts.allow_obsolete_multiline_headers_in_responses,
260                );
261            h1_parser_config
262        };
263        conn.set_h1_parser_config(h1_parser_config);
264
265        // Set the h1 write strategy
266        if let Some(writev) = self.opts.h1_writev {
267            if writev {
268                conn.set_write_strategy_queue();
269            } else {
270                conn.set_write_strategy_flatten();
271            }
272        }
273
274        // Set the maximum size of the request headers
275        if let Some(max_headers) = self.opts.h1_max_headers {
276            conn.set_http1_max_headers(max_headers);
277        }
278
279        // Enable HTTP/0.9 responses if requested
280        if self.opts.h09_responses {
281            conn.set_h09_responses();
282        }
283
284        // Set the read buffer size if specified
285        if let Some(sz) = self.opts.h1_read_buf_exact_size {
286            conn.set_read_buf_exact_size(sz);
287        }
288
289        // Set the maximum buffer size for HTTP/1 connections
290        if let Some(max) = self.opts.h1_max_buf_size {
291            conn.set_max_buf_size(max);
292        }
293
294        let cd = http1::dispatch::Client::new(rx);
295        let proto = http1::dispatch::Dispatcher::new(cd, conn);
296
297        Ok((SendRequest { dispatch: tx }, Connection { inner: proto }))
298    }
299}
300
301mod upgrades {
302    use super::*;
303    use crate::upgrade::Upgraded;
304
305    // A future binding a connection with a Service with Upgrade support.
306    //
307    // This type is unnameable outside the crate.
308    #[must_use = "futures do nothing unless polled"]
309    pub struct UpgradeableConnection<T, B>
310    where
311        T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
312        B: Body + 'static,
313        B::Error: Into<BoxError>,
314    {
315        pub(super) inner: Option<Connection<T, B>>,
316    }
317
318    impl<I, B> Future for UpgradeableConnection<I, B>
319    where
320        I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
321        B: Body + 'static,
322        B::Data: Send,
323        B::Error: Into<BoxError>,
324    {
325        type Output = Result<()>;
326
327        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
328            match ready!(Pin::new(&mut self.inner.as_mut().unwrap().inner).poll(cx)) {
329                Ok(proto::Dispatched::Shutdown) => Poll::Ready(Ok(())),
330                Ok(proto::Dispatched::Upgrade(pending)) => {
331                    let Parts { io, read_buf } = self.inner.take().unwrap().into_parts();
332                    pending.fulfill(Upgraded::new(io, read_buf));
333                    Poll::Ready(Ok(()))
334                }
335                Err(e) => Poll::Ready(Err(e)),
336            }
337        }
338    }
339}