Skip to main content

wreq_proto/conn/
http2.rs

1//! HTTP/2 client connections
2
3use std::{
4    future::Future,
5    marker::PhantomData,
6    pin::Pin,
7    sync::Arc,
8    task::{ready, Context, Poll},
9};
10
11use http::{Request, Response};
12use http_body::Body;
13use tokio::io::{AsyncRead, AsyncWrite};
14
15use crate::{
16    body::Incoming,
17    dispatch::{self, TrySendError},
18    error::{BoxError, Error},
19    proto::{
20        self,
21        http2::{ping, Http2Options},
22    },
23    rt::{bounds::Http2ClientConnExec, Time, Timer},
24    Result,
25};
26
27/// The sender side of an established connection.
28pub struct SendRequest<B> {
29    dispatch: dispatch::UnboundedSender<Request<B>, Response<Incoming>>,
30}
31
32impl<B> Clone for SendRequest<B> {
33    #[inline]
34    fn clone(&self) -> SendRequest<B> {
35        SendRequest {
36            dispatch: self.dispatch.clone(),
37        }
38    }
39}
40
41/// A future that processes all HTTP state for the IO object.
42///
43/// In most cases, this should just be spawned into an executor, so that it
44/// can process incoming and outgoing messages, notice hangups, and the like.
45#[must_use = "futures do nothing unless polled"]
46pub struct Connection<T, B, E>
47where
48    T: AsyncRead + AsyncWrite + Unpin,
49    B: Body + 'static,
50    E: Http2ClientConnExec<B, T> + Unpin,
51    B::Error: Into<BoxError>,
52{
53    inner: (PhantomData<T>, proto::http2::client::ClientTask<B, E, T>),
54}
55
56/// A builder to configure an HTTP connection.
57///
58/// After setting options, the builder is used to create a handshake future.
59///
60/// **Note**: The default values of options are *not considered stable*. They
61/// are subject to change at any time.
62#[derive(Clone)]
63pub struct Builder<Ex> {
64    exec: Ex,
65    timer: Time,
66    opts: Http2Options,
67}
68
69// ===== impl SendRequest
70
71impl<B> SendRequest<B> {
72    /// Polls to determine whether this sender can be used yet for a request.
73    ///
74    /// If the associated connection is closed, this returns an Error.
75    #[inline]
76    pub fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<()>> {
77        if self.is_closed() {
78            Poll::Ready(Err(Error::new_closed()))
79        } else {
80            Poll::Ready(Ok(()))
81        }
82    }
83
84    /// Waits until the dispatcher is ready
85    ///
86    /// If the associated connection is closed, this returns an Error.
87    #[inline]
88    pub async fn ready(&mut self) -> Result<()> {
89        std::future::poll_fn(|cx| self.poll_ready(cx)).await
90    }
91
92    /// Checks if the connection is currently ready to send a request.
93    ///
94    /// # Note
95    ///
96    /// This is mostly a hint. Due to inherent latency of networks, it is
97    /// possible that even after checking this is ready, sending a request
98    /// may still fail because the connection was closed in the meantime.
99    #[inline]
100    pub fn is_ready(&self) -> bool {
101        self.dispatch.is_ready()
102    }
103
104    /// Checks if the connection side has been closed.
105    #[inline]
106    pub fn is_closed(&self) -> bool {
107        self.dispatch.is_closed()
108    }
109}
110
111impl<B> SendRequest<B>
112where
113    B: Body + 'static,
114{
115    /// Sends a `Request` on the associated connection.
116    ///
117    /// Returns a future that if successful, yields the `Response`.
118    ///
119    /// # Error
120    ///
121    /// If there was an error before trying to serialize the request to the
122    /// connection, the message will be returned as part of this error.
123    pub fn try_send_request(
124        &mut self,
125        req: Request<B>,
126    ) -> impl Future<Output = Result<Response<Incoming>, TrySendError<Request<B>>>> {
127        let sent = self.dispatch.try_send(req);
128        async move {
129            match sent {
130                Ok(rx) => match rx.await {
131                    Ok(Ok(res)) => Ok(res),
132                    Ok(Err(err)) => Err(err),
133                    // this is definite bug if it happens, but it shouldn't happen!
134                    Err(_) => panic!("dispatch dropped without returning error"),
135                },
136                Err(req) => {
137                    debug!("connection was not ready");
138                    let error = Error::new_canceled().with("connection was not ready");
139                    Err(TrySendError {
140                        error,
141                        message: Some(req),
142                    })
143                }
144            }
145        }
146    }
147}
148
149// ===== impl Connection
150
151impl<T, B, E> Future for Connection<T, B, E>
152where
153    T: AsyncRead + AsyncWrite + Unpin + 'static,
154    B: Body + 'static + Unpin,
155    B::Data: Send,
156    E: Unpin,
157    B::Error: Into<BoxError>,
158    E: Http2ClientConnExec<B, T> + Unpin,
159{
160    type Output = Result<()>;
161
162    #[inline]
163    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
164        match ready!(Pin::new(&mut self.inner.1).poll(cx))? {
165            proto::Dispatched::Shutdown => Poll::Ready(Ok(())),
166            proto::Dispatched::Upgrade(_pending) => unreachable!("http2 cannot upgrade"),
167        }
168    }
169}
170
171// ===== impl Builder
172
173impl<Ex> Builder<Ex>
174where
175    Ex: Clone,
176{
177    /// Creates a new connection builder.
178    #[inline]
179    pub fn new(exec: Ex) -> Builder<Ex> {
180        Builder {
181            exec,
182            timer: Time::Empty,
183            opts: Default::default(),
184        }
185    }
186
187    /// Provide a timer to execute background HTTP2 tasks.
188    #[inline]
189    pub fn timer<M>(mut self, timer: M) -> Self
190    where
191        M: Timer + Send + Sync + 'static,
192    {
193        self.timer = Time::Timer(Arc::new(timer));
194        self
195    }
196
197    /// Provide a options configuration for the HTTP/2 connection.
198    #[inline]
199    pub fn options(mut self, opts: Http2Options) -> Self {
200        self.opts = opts;
201        self
202    }
203
204    /// Constructs a connection with the configured options and IO.
205    ///
206    /// Note, if [`Connection`] is not `await`-ed, [`SendRequest`] will
207    /// do nothing.
208    pub async fn handshake<T, B>(self, io: T) -> Result<(SendRequest<B>, Connection<T, B, Ex>)>
209    where
210        T: AsyncRead + AsyncWrite + Unpin,
211        B: Body + 'static,
212        B::Data: Send,
213        B::Error: Into<BoxError>,
214        Ex: Http2ClientConnExec<B, T> + Unpin,
215    {
216        trace!("client handshake HTTP/2");
217
218        // Crate the HTTP/2 client with the provided options.
219        let mut builder = http2::client::Builder::default();
220        builder
221            .initial_max_send_streams(self.opts.initial_max_send_streams)
222            .initial_window_size(self.opts.initial_window_size)
223            .initial_connection_window_size(self.opts.initial_conn_window_size)
224            .max_send_buffer_size(self.opts.max_send_buffer_size);
225        if let Some(id) = self.opts.initial_stream_id {
226            builder.initial_stream_id(id);
227        }
228        if let Some(max) = self.opts.max_pending_accept_reset_streams {
229            builder.max_pending_accept_reset_streams(max);
230        }
231        if let Some(max) = self.opts.max_concurrent_reset_streams {
232            builder.max_concurrent_reset_streams(max);
233        }
234        if let Some(max) = self.opts.max_concurrent_streams {
235            builder.max_concurrent_streams(max);
236        }
237        if let Some(max) = self.opts.max_header_list_size {
238            builder.max_header_list_size(max);
239        }
240        if let Some(opt) = self.opts.enable_push {
241            builder.enable_push(opt);
242        }
243        if let Some(max) = self.opts.max_frame_size {
244            builder.max_frame_size(max);
245        }
246        if let Some(max) = self.opts.header_table_size {
247            builder.header_table_size(max);
248        }
249        if let Some(v) = self.opts.enable_connect_protocol {
250            builder.enable_connect_protocol(v);
251        }
252        if let Some(v) = self.opts.no_rfc7540_priorities {
253            builder.no_rfc7540_priorities(v);
254        }
255        if let Some(order) = self.opts.settings_order {
256            builder.settings_order(order);
257        }
258        if let Some(stream_dependency) = self.opts.headers_stream_dependency {
259            builder.headers_stream_dependency(stream_dependency);
260        }
261        if let Some(order) = self.opts.headers_pseudo_order {
262            builder.headers_pseudo_order(order);
263        }
264        if let Some(priority) = self.opts.priorities {
265            builder.priorities(priority);
266        }
267
268        // Create the ping configuration for the connection.
269        let ping_config = ping::Config::new(
270            self.opts.adaptive_window,
271            self.opts.initial_window_size,
272            self.opts.keep_alive_interval,
273            self.opts.keep_alive_timeout,
274            self.opts.keep_alive_while_idle,
275        );
276
277        let (tx, rx) = dispatch::channel();
278        let h2 =
279            proto::http2::client::handshake(io, rx, builder, ping_config, self.exec, self.timer)
280                .await?;
281        Ok((
282            SendRequest {
283                dispatch: tx.unbound(),
284            },
285            Connection {
286                inner: (PhantomData, h2),
287            },
288        ))
289    }
290}