monoio_http/h2/
client.rs

1//! Client implementation of the HTTP/2 protocol.
2//!
3//! # Getting started
4//!
5//! Running an HTTP/2 client requires the caller to establish the underlying
6//! connection as well as get the connection to a state that is ready to begin
7//! the HTTP/2 handshake. See [here](../index.html#handshake) for more
8//! details.
9//!
10//! This could be as basic as using Monoio's [`TcpStream`] to connect to a remote
11//! host, but usually it means using either ALPN or HTTP/1.1 protocol upgrades.
12//!
13//! Once a connection is obtained, it is passed to [`handshake`], which will
14//! begin the [HTTP/2 handshake]. This returns a future that completes once
15//! the handshake process is performed and HTTP/2 streams may be initialized.
16//!
17//! [`handshake`] uses default configuration values. There are a number of
18//! settings that can be changed by using [`Builder`] instead.
19//!
20//! Once the handshake future completes, the caller is provided with a
21//! [`Connection`] instance and a [`SendRequest`] instance. The [`Connection`]
22//! instance is used to drive the connection (see [Managing the connection]).
23//! The [`SendRequest`] instance is used to initialize new streams (see [Making
24//! requests]).
25//!
26//! # Making requests
27//!
28//! Requests are made using the [`SendRequest`] handle provided by the handshake
29//! future. Once a request is submitted, an HTTP/2 stream is initialized and
30//! the request is sent to the server.
31//!
32//! A request body and request trailers are sent using [`SendRequest`] and the
33//! server's response is returned once the [`ResponseFuture`] future completes.
34//! Both the [`SendStream`] and [`ResponseFuture`] instances are returned by
35//! [`SendRequest::send_request`] and are tied to the HTTP/2 stream
36//! initialized by the sent request.
37//!
38//! The [`SendRequest::poll_ready`] function returns `Ready` when a new HTTP/2
39//! stream can be created, i.e. as long as the current number of active streams
40//! is below [`MAX_CONCURRENT_STREAMS`]. If a new stream cannot be created, the
41//! caller will be notified once an existing stream closes, freeing capacity for
42//! the caller.  The caller should use [`SendRequest::poll_ready`] to check for
43//! capacity before sending a request to the server.
44//!
45//! [`SendRequest`] enforces the [`MAX_CONCURRENT_STREAMS`] setting. The user
46//! must not send a request if `poll_ready` does not return `Ready`. Attempting
47//! to do so will result in an [`Error`] being returned.
48//!
49//! # Managing the connection
50//!
51//! The [`Connection`] instance is used to manage connection state. The caller
52//! is required to call [`Connection::poll`] in order to advance state.
53//! [`SendRequest::send_request`] and other functions have no effect unless
54//! [`Connection::poll`] is called.
55//!
56//! The [`Connection`] instance should only be dropped once [`Connection::poll`]
57//! returns `Ready`. At this point, the underlying socket has been closed and no
58//! further work needs to be done.
59//!
60//! The easiest way to ensure that the [`Connection`] instance gets polled is to
61//! submit the [`Connection`] instance to an [executor]. The executor will then
62//! manage polling the connection until the connection is complete.
63//! Alternatively, the caller can call `poll` manually.
64//!
65//! # Example
66//!
67//! ```rust, no_run
68//! use std::error::Error;
69//!
70//! use h2::client;
71//! use http::{Method, Request};
72//! use monoio::net::TcpStream;
73//!
74//! #[monoio::main]
75//! pub async fn main() -> Result<(), Box<dyn Error>> {
76//!     // Establish TCP connection to the server.
77//!     let tcp = TcpStream::connect("127.0.0.1:5928").await?;
78//!     let (h2, connection) = client::handshake(tcp).await?;
79//!     monoio::spawn(async move {
80//!         connection.await.unwrap();
81//!     });
82//!
83//!     let mut h2 = h2.ready().await?;
84//!     // Prepare the HTTP request to send to the server.
85//!     let request = Request::builder()
86//!         .method(Method::GET)
87//!         .uri("https://www.example.com/")
88//!         .body(())
89//!         .unwrap();
90//!
91//!     // Send the request. The second tuple item allows the caller
92//!     // to stream a request body.
93//!     let (response, _) = h2.send_request(request, true).unwrap();
94//!
95//!     let (head, mut body) = response.await?.into_parts();
96//!
97//!     println!("Received response: {:?}", head);
98//!
99//!     // The `flow_control` handle allows the caller to manage
100//!     // flow control.
101//!     //
102//!     // Whenever data is received, the caller is responsible for
103//!     // releasing capacity back to the server once it has freed
104//!     // the data from memory.
105//!     let mut flow_control = body.flow_control().clone();
106//!
107//!     while let Some(chunk) = body.data().await {
108//!         let chunk = chunk?;
109//!         println!("RX: {:?}", chunk);
110//!
111//!         // Let the server send more data.
112//!         let _ = flow_control.release_capacity(chunk.len());
113//!     }
114//!
115//!     Ok(())
116//! }
117//! ```
118//!
119//! [`TcpStream`]: https://docs.rs/monoio/latest/monoio/net/tcp/struct.TcpStream.html
120//! [`handshake`]: fn.handshake.html
121//! [executor]: https://docs.rs/futures/0.1/futures/future/trait.Executor.html
122//! [`SendRequest`]: struct.SendRequest.html
123//! [`SendStream`]: ../struct.SendStream.html
124//! [Making requests]: #making-requests
125//! [Managing the connection]: #managing-the-connection
126//! [`Connection`]: struct.Connection.html
127//! [`Connection::poll`]: struct.Connection.html#method.poll
128//! [`SendRequest::send_request`]: struct.SendRequest.html#method.send_request
129//! [`MAX_CONCURRENT_STREAMS`]: http://httpwg.org/specs/rfc7540.html#SettingValues
130//! [`SendRequest`]: struct.SendRequest.html
131//! [`ResponseFuture`]: struct.ResponseFuture.html
132//! [`SendRequest::poll_ready`]: struct.SendRequest.html#method.poll_ready
133//! [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
134//! [`Builder`]: struct.Builder.html
135//! [`Error`]: ../struct.Error.html
136
137use std::{
138    fmt,
139    future::Future,
140    pin::Pin,
141    task::{Context, Poll},
142    time::Duration,
143};
144
145use bytes::{Buf, Bytes};
146use http::{uri, HeaderMap, Method, Request, Response, Version};
147use monoio::io::{AsyncReadRent, AsyncWriteRent, AsyncWriteRentExt};
148use tracing::Instrument;
149
150use crate::{
151    common::error::HttpError,
152    h2::{
153        codec::{Codec, SendError, UserError},
154        ext::Protocol,
155        frame::{Headers, Pseudo, Reason, Settings, StreamId},
156        proto::{self, Error},
157        FlowControl, PingPong, RecvStream, SendStream,
158    },
159};
160
161/// Initializes new HTTP/2 streams on a connection by sending a request.
162///
163/// This type does no work itself. Instead, it is a handle to the inner
164/// connection state held by [`Connection`]. If the associated connection
165/// instance is dropped, all `SendRequest` functions will return [`Error`].
166///
167/// [`SendRequest`] instances are able to move to and operate on separate tasks
168/// / threads than their associated [`Connection`] instance. Internally, there
169/// is a buffer used to stage requests before they get written to the
170/// connection. There is no guarantee that requests get written to the
171/// connection in FIFO order as HTTP/2 prioritization logic can play a role.
172///
173/// [`SendRequest`] implements [`Clone`], enabling the creation of many
174/// instances that are backed by a single connection.
175///
176/// See [module] level documentation for more details.
177///
178/// [module]: index.html
179/// [`Connection`]: struct.Connection.html
180/// [`Clone`]: https://doc.rust-lang.org/std/clone/trait.Clone.html
181/// [`Error`]: ../struct.Error.html
182pub struct SendRequest<B: Buf> {
183    inner: proto::Streams<B, Peer>,
184    pending: Option<proto::OpaqueStreamRef>,
185}
186
187/// Returns a `SendRequest` instance once it is ready to send at least one
188/// request.
189#[derive(Debug)]
190pub struct ReadySendRequest<B: Buf> {
191    inner: Option<SendRequest<B>>,
192}
193
194/// Manages all state associated with an HTTP/2 client connection.
195///
196/// A `Connection` is backed by an I/O resource (usually a TCP socket) and
197/// implements the HTTP/2 client logic for that connection. It is responsible
198/// for driving the internal state forward, performing the work requested of the
199/// associated handles ([`SendRequest`], [`ResponseFuture`], [`SendStream`],
200/// [`RecvStream`]).
201///
202/// `Connection` values are created by calling [`handshake`]. Once a
203/// `Connection` value is obtained, the caller must repeatedly call [`poll`]
204/// until `Ready` is returned. The easiest way to do this is to submit the
205/// `Connection` instance to an [executor].
206///
207/// [module]: index.html
208/// [`handshake`]: fn.handshake.html
209/// [`SendRequest`]: struct.SendRequest.html
210/// [`ResponseFuture`]: struct.ResponseFuture.html
211/// [`SendStream`]: ../struct.SendStream.html
212/// [`RecvStream`]: ../struct.RecvStream.html
213/// [`poll`]: #method.poll
214/// [executor]: https://docs.rs/futures/0.1/futures/future/trait.Executor.html
215///
216/// # Examples
217///
218/// ```
219/// # use monoio::io::{AsyncReadRent, AsyncWriteRent};
220/// # use h2::client;
221/// # use h2::client::*;
222/// #
223/// # async fn doc<T>(my_io: T) -> Result<(), h2::Error>
224/// # where T: AsyncReadRent + AsyncWriteRent + Send + Unpin + 'static,
225/// # {
226/// let (send_request, connection) = client::handshake(my_io).await?;
227/// // Submit the connection handle to an executor.
228/// monoio::spawn(async {
229///     connection.await.expect("connection failed");
230/// });
231///
232/// // Now, use `send_request` to initialize HTTP/2 streams.
233/// // ...
234/// # Ok(())
235/// # }
236/// #
237/// # pub fn main() {}
238/// ```
239#[must_use = "futures do nothing unless polled"]
240pub struct Connection<T, B: Buf = Bytes> {
241    inner: proto::Connection<T, Peer, B>,
242}
243
244/// A future of an HTTP response.
245#[derive(Debug)]
246#[must_use = "futures do nothing unless polled"]
247pub struct ResponseFuture {
248    inner: proto::OpaqueStreamRef,
249    push_promise_consumed: bool,
250}
251
252/// A future of a pushed HTTP response.
253///
254/// We have to differentiate between pushed and non pushed because of the spec
255/// <https://httpwg.org/specs/rfc7540.html#PUSH_PROMISE>
256/// > PUSH_PROMISE frames MUST only be sent on a peer-initiated stream
257/// > that is in either the "open" or "half-closed (remote)" state.
258#[derive(Debug)]
259#[must_use = "futures do nothing unless polled"]
260pub struct PushedResponseFuture {
261    inner: ResponseFuture,
262}
263
264/// A pushed response and corresponding request headers
265#[derive(Debug)]
266pub struct PushPromise {
267    /// The request headers
268    request: Request<()>,
269
270    /// The pushed response
271    response: PushedResponseFuture,
272}
273
274/// A stream of pushed responses and corresponding promised requests
275#[derive(Debug)]
276#[must_use = "streams do nothing unless polled"]
277pub struct PushPromises {
278    inner: proto::OpaqueStreamRef,
279}
280
281/// Builds client connections with custom configuration values.
282///
283/// Methods can be chained in order to set the configuration values.
284///
285/// The client is constructed by calling [`handshake`] and passing the I/O
286/// handle that will back the HTTP/2 server.
287///
288/// New instances of `Builder` are obtained via [`Builder::new`].
289///
290/// See function level documentation for details on the various client
291/// configuration settings.
292///
293/// [`Builder::new`]: struct.Builder.html#method.new
294/// [`handshake`]: struct.Builder.html#method.handshake
295///
296/// # Examples
297///
298/// ```
299/// # use monoio::io::{AsyncReadRent, AsyncWriteRent};
300/// # use h2::client::*;
301/// # use bytes::Bytes;
302/// #
303/// # async fn doc<T: AsyncReadRent + AsyncWriteRent + Unpin>(my_io: T)
304///     -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
305/// # {
306/// // `client_fut` is a future representing the completion of the HTTP/2
307/// // handshake.
308/// let client_fut = Builder::new()
309///     .initial_window_size(1_000_000)
310///     .max_concurrent_streams(1000)
311///     .handshake(my_io);
312/// # client_fut.await
313/// # }
314/// #
315/// # pub fn main() {}
316/// ```
317#[derive(Clone, Debug)]
318pub struct Builder {
319    /// Time to keep locally reset streams around before reaping.
320    reset_stream_duration: Duration,
321
322    /// Initial maximum number of locally initiated (send) streams.
323    /// After receiving a Settings frame from the remote peer,
324    /// the connection will overwrite this value with the
325    /// MAX_CONCURRENT_STREAMS specified in the frame.
326    initial_max_send_streams: usize,
327
328    /// Initial target window size for new connections.
329    initial_target_connection_window_size: Option<u32>,
330
331    /// Maximum amount of bytes to "buffer" for writing per stream.
332    max_send_buffer_size: usize,
333
334    /// Maximum number of locally reset streams to keep at a time.
335    reset_stream_max: usize,
336
337    /// Initial `Settings` frame to send as part of the handshake.
338    settings: Settings,
339
340    /// The stream ID of the first (lowest) stream. Subsequent streams will use
341    /// monotonically increasing stream IDs.
342    stream_id: StreamId,
343}
344
345#[derive(Debug)]
346pub(crate) struct Peer;
347
348// ===== impl SendRequest =====
349
350impl<B> SendRequest<B>
351where
352    B: Buf,
353{
354    /// Returns `Ready` when the connection can initialize a new HTTP/2
355    /// stream.
356    ///
357    /// This function must return `Ready` before `send_request` is called. When
358    /// `Poll::Pending` is returned, the task will be notified once the readiness
359    /// state changes.
360    ///
361    /// See [module] level docs for more details.
362    ///
363    /// [module]: index.html
364    pub fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), crate::h2::Error>> {
365        ready!(self.inner.poll_pending_open(cx, self.pending.as_ref()))?;
366        self.pending = None;
367        Poll::Ready(Ok(()))
368    }
369
370    /// Consumes `self`, returning a future that returns `self` back once it is
371    /// ready to send a request.
372    ///
373    /// This function should be called before calling `send_request`.
374    ///
375    /// This is a functional combinator for [`poll_ready`]. The returned future
376    /// will call `SendStream::poll_ready` until `Ready`, then returns `self` to
377    /// the caller.
378    ///
379    /// # Examples
380    ///
381    /// ```rust
382    /// # use h2::client::*;
383    /// # use http::*;
384    /// # async fn doc(send_request: SendRequest<&'static [u8]>)
385    /// # {
386    /// // First, wait until the `send_request` handle is ready to send a new
387    /// // request
388    /// let mut send_request = send_request.ready().await.unwrap();
389    /// // Use `send_request` here.
390    /// # }
391    /// # pub fn main() {}
392    /// ```
393    ///
394    /// See [module] level docs for more details.
395    ///
396    /// [`poll_ready`]: #method.poll_ready
397    /// [module]: index.html
398    pub fn ready(self) -> ReadySendRequest<B> {
399        ReadySendRequest { inner: Some(self) }
400    }
401
402    /// Sends a HTTP/2 request to the server.
403    ///
404    /// `send_request` initializes a new HTTP/2 stream on the associated
405    /// connection, then sends the given request using this new stream. Only the
406    /// request head is sent.
407    ///
408    /// On success, a [`ResponseFuture`] instance and [`SendStream`] instance
409    /// are returned. The [`ResponseFuture`] instance is used to get the
410    /// server's response and the [`SendStream`] instance is used to send a
411    /// request body or trailers to the server over the same HTTP/2 stream.
412    ///
413    /// To send a request body or trailers, set `end_of_stream` to `false`.
414    /// Then, use the returned [`SendStream`] instance to stream request body
415    /// chunks or send trailers. If `end_of_stream` is **not** set to `false`
416    /// then attempting to call [`SendStream::send_data`] or
417    /// [`SendStream::send_trailers`] will result in an error.
418    ///
419    /// If no request body or trailers are to be sent, set `end_of_stream` to
420    /// `true` and drop the returned [`SendStream`] instance.
421    ///
422    /// # A note on HTTP versions
423    ///
424    /// The provided `Request` will be encoded differently depending on the
425    /// value of its version field. If the version is set to 2.0, then the
426    /// request is encoded as per the specification recommends.
427    ///
428    /// If the version is set to a lower value, then the request is encoded to
429    /// preserve the characteristics of HTTP 1.1 and lower. Specifically, host
430    /// headers are permitted and the `:authority` pseudo header is not
431    /// included.
432    ///
433    /// The caller should always set the request's version field to 2.0 unless
434    /// specifically transmitting an HTTP 1.1 request over 2.0.
435    ///
436    /// # Examples
437    ///
438    /// Sending a request with no body
439    ///
440    /// ```rust
441    /// # use h2::client::*;
442    /// # use http::*;
443    /// # async fn doc(send_request: SendRequest<&'static [u8]>)
444    /// # {
445    /// // First, wait until the `send_request` handle is ready to send a new
446    /// // request
447    /// let mut send_request = send_request.ready().await.unwrap();
448    /// // Prepare the HTTP request to send to the server.
449    /// let request = Request::get("https://www.example.com/").body(()).unwrap();
450    ///
451    /// // Send the request to the server. Since we are not sending a
452    /// // body or trailers, we can drop the `SendStream` instance.
453    /// let (response, _) = send_request.send_request(request, true).unwrap();
454    /// let response = response.await.unwrap();
455    /// // Process the response
456    /// # }
457    /// # pub fn main() {}
458    /// ```
459    ///
460    /// Sending a request with a body and trailers
461    ///
462    /// ```rust
463    /// # use h2::client::*;
464    /// # use http::*;
465    /// # async fn doc(send_request: SendRequest<&'static [u8]>)
466    /// # {
467    /// // First, wait until the `send_request` handle is ready to send a new
468    /// // request
469    /// let mut send_request = send_request.ready().await.unwrap();
470    ///
471    /// // Prepare the HTTP request to send to the server.
472    /// let request = Request::get("https://www.example.com/").body(()).unwrap();
473    ///
474    /// // Send the request to the server. If we are not sending a
475    /// // body or trailers, we can drop the `SendStream` instance.
476    /// let (response, mut send_stream) = send_request.send_request(request, false).unwrap();
477    ///
478    /// // At this point, one option would be to wait for send capacity.
479    /// // Doing so would allow us to not hold data in memory that
480    /// // cannot be sent. However, this is not a requirement, so this
481    /// // example will skip that step. See `SendStream` documentation
482    /// // for more details.
483    /// send_stream.send_data(b"hello", false).unwrap();
484    /// send_stream.send_data(b"world", false).unwrap();
485    ///
486    /// // Send the trailers.
487    /// let mut trailers = HeaderMap::new();
488    /// trailers.insert(
489    ///     header::HeaderName::from_bytes(b"my-trailer").unwrap(),
490    ///     header::HeaderValue::from_bytes(b"hello").unwrap(),
491    /// );
492    ///
493    /// send_stream.send_trailers(trailers).unwrap();
494    ///
495    /// let response = response.await.unwrap();
496    /// // Process the response
497    /// # }
498    /// # pub fn main() {}
499    /// ```
500    ///
501    /// [`ResponseFuture`]: struct.ResponseFuture.html
502    /// [`SendStream`]: ../struct.SendStream.html
503    /// [`SendStream::send_data`]: ../struct.SendStream.html#method.send_data
504    /// [`SendStream::send_trailers`]: ../struct.SendStream.html#method.send_trailers
505    pub fn send_request(
506        &mut self,
507        request: Request<()>,
508        end_of_stream: bool,
509    ) -> Result<(ResponseFuture, SendStream<B>), crate::h2::Error> {
510        self.inner
511            .send_request(request, end_of_stream, self.pending.as_ref())
512            .map_err(Into::into)
513            .map(|stream| {
514                if stream.is_pending_open() {
515                    self.pending = Some(stream.clone_to_opaque());
516                }
517
518                let response = ResponseFuture {
519                    inner: stream.clone_to_opaque(),
520                    push_promise_consumed: false,
521                };
522
523                let stream = SendStream::new(stream);
524
525                (response, stream)
526            })
527    }
528
529    /// Returns whether the [extended CONNECT protocol][1] is enabled or not.
530    ///
531    /// This setting is configured by the server peer by sending the
532    /// [`SETTINGS_ENABLE_CONNECT_PROTOCOL` parameter][2] in a `SETTINGS` frame.
533    /// This method returns the currently acknowledged value received from the
534    /// remote.
535    ///
536    /// [1]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
537    /// [2]: https://datatracker.ietf.org/doc/html/rfc8441#section-3
538    pub fn is_extended_connect_protocol_enabled(&self) -> bool {
539        self.inner.is_extended_connect_protocol_enabled()
540    }
541
542    pub fn has_conn_error(&self) -> bool {
543        self.inner.has_conn_error()
544    }
545
546    pub fn conn_error(&self) -> Option<HttpError> {
547        self.inner.conn_error().map(HttpError::from)
548    }
549}
550
551impl<B> fmt::Debug for SendRequest<B>
552where
553    B: Buf,
554{
555    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
556        fmt.debug_struct("SendRequest").finish()
557    }
558}
559
560impl<B> Clone for SendRequest<B>
561where
562    B: Buf,
563{
564    fn clone(&self) -> Self {
565        SendRequest {
566            inner: self.inner.clone(),
567            pending: None,
568        }
569    }
570}
571
572impl<B> SendRequest<B>
573where
574    B: Buf,
575{
576    /// Returns the number of active streams.
577    ///
578    /// An active stream is a stream that has not yet transitioned to a closed
579    /// state.
580    pub fn num_active_streams(&self) -> usize {
581        self.inner.num_active_streams()
582    }
583
584    /// Returns the number of streams that are held in memory.
585    ///
586    /// A wired stream is a stream that is either active or is closed but must
587    /// stay in memory for some reason. For example, there are still outstanding
588    /// userspace handles pointing to the slot.
589    pub fn num_wired_streams(&self) -> usize {
590        self.inner.num_wired_streams()
591    }
592}
593
594// ===== impl ReadySendRequest =====
595
596impl<B> Future for ReadySendRequest<B>
597where
598    B: Buf,
599{
600    type Output = Result<SendRequest<B>, crate::h2::Error>;
601
602    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
603        match &mut self.inner {
604            Some(send_request) => {
605                ready!(send_request.poll_ready(cx))?;
606            }
607            None => panic!("called `poll` after future completed"),
608        }
609
610        Poll::Ready(Ok(self.inner.take().unwrap()))
611    }
612}
613
614// ===== impl Builder =====
615
616impl Builder {
617    /// Returns a new client builder instance initialized with default
618    /// configuration values.
619    ///
620    /// Configuration methods can be chained on the return value.
621    ///
622    /// # Examples
623    ///
624    /// ```
625    /// # use monoio::io::{AsyncReadRent, AsyncWriteRent};
626    /// # use h2::client::*;
627    /// # use bytes::Bytes;
628    /// #
629    /// # async fn doc<T: AsyncReadRent + AsyncWriteRent + Unpin>(my_io: T)
630    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
631    /// # {
632    /// // `client_fut` is a future representing the completion of the HTTP/2
633    /// // handshake.
634    /// let client_fut = Builder::new()
635    ///     .initial_window_size(1_000_000)
636    ///     .max_concurrent_streams(1000)
637    ///     .handshake(my_io);
638    /// # client_fut.await
639    /// # }
640    /// #
641    /// # pub fn main() {}
642    /// ```
643    pub fn new() -> Builder {
644        Builder {
645            max_send_buffer_size: proto::DEFAULT_MAX_SEND_BUFFER_SIZE,
646            reset_stream_duration: Duration::from_secs(proto::DEFAULT_RESET_STREAM_SECS),
647            reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX,
648            initial_target_connection_window_size: None,
649            initial_max_send_streams: usize::MAX,
650            settings: Default::default(),
651            stream_id: 1.into(),
652        }
653    }
654
655    /// Indicates the initial window size (in octets) for stream-level
656    /// flow control for received data.
657    ///
658    /// The initial window of a stream is used as part of flow control. For more
659    /// details, see [`FlowControl`].
660    ///
661    /// The default value is 65,535.
662    ///
663    /// [`FlowControl`]: ../struct.FlowControl.html
664    ///
665    /// # Examples
666    ///
667    /// ```
668    /// # use monoio::io::{AsyncReadRent, AsyncWriteRent};
669    /// # use h2::client::*;
670    /// # use bytes::Bytes;
671    /// #
672    /// # async fn doc<T: AsyncReadRent + AsyncWriteRent + Unpin>(my_io: T)
673    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
674    /// # {
675    /// // `client_fut` is a future representing the completion of the HTTP/2
676    /// // handshake.
677    /// let client_fut = Builder::new()
678    ///     .initial_window_size(1_000_000)
679    ///     .handshake(my_io);
680    /// # client_fut.await
681    /// # }
682    /// #
683    /// # pub fn main() {}
684    /// ```
685    pub fn initial_window_size(&mut self, size: u32) -> &mut Self {
686        self.settings.set_initial_window_size(Some(size));
687        self
688    }
689
690    /// Indicates the initial window size (in octets) for connection-level flow control
691    /// for received data.
692    ///
693    /// The initial window of a connection is used as part of flow control. For more details,
694    /// see [`FlowControl`].
695    ///
696    /// The default value is 65,535.
697    ///
698    /// [`FlowControl`]: ../struct.FlowControl.html
699    ///
700    /// # Examples
701    ///
702    /// ```
703    /// # use monoio::io::{AsyncReadRent, AsyncWriteRent};
704    /// # use h2::client::*;
705    /// # use bytes::Bytes;
706    /// #
707    /// # async fn doc<T: AsyncReadRent + AsyncWriteRent + Unpin>(my_io: T)
708    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
709    /// # {
710    /// // `client_fut` is a future representing the completion of the HTTP/2
711    /// // handshake.
712    /// let client_fut = Builder::new()
713    ///     .initial_connection_window_size(1_000_000)
714    ///     .handshake(my_io);
715    /// # client_fut.await
716    /// # }
717    /// #
718    /// # pub fn main() {}
719    /// ```
720    pub fn initial_connection_window_size(&mut self, size: u32) -> &mut Self {
721        self.initial_target_connection_window_size = Some(size);
722        self
723    }
724
725    /// Indicates the size (in octets) of the largest HTTP/2 frame payload that the
726    /// configured client is able to accept.
727    ///
728    /// The sender may send data frames that are **smaller** than this value,
729    /// but any data larger than `max` will be broken up into multiple `DATA`
730    /// frames.
731    ///
732    /// The value **must** be between 16,384 and 16,777,215. The default value is 16,384.
733    ///
734    /// # Examples
735    ///
736    /// ```
737    /// # use monoio::io::{AsyncReadRent, AsyncWriteRent};
738    /// # use h2::client::*;
739    /// # use bytes::Bytes;
740    /// #
741    /// # async fn doc<T: AsyncReadRent + AsyncWriteRent + Unpin>(my_io: T)
742    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
743    /// # {
744    /// // `client_fut` is a future representing the completion of the HTTP/2
745    /// // handshake.
746    /// let client_fut = Builder::new().max_frame_size(1_000_000).handshake(my_io);
747    /// # client_fut.await
748    /// # }
749    /// #
750    /// # pub fn main() {}
751    /// ```
752    ///
753    /// # Panics
754    ///
755    /// This function panics if `max` is not within the legal range specified
756    /// above.
757    pub fn max_frame_size(&mut self, max: u32) -> &mut Self {
758        self.settings.set_max_frame_size(Some(max));
759        self
760    }
761
762    /// Sets the max size of received header frames.
763    ///
764    /// This advisory setting informs a peer of the maximum size of header list
765    /// that the sender is prepared to accept, in octets. The value is based on
766    /// the uncompressed size of header fields, including the length of the name
767    /// and value in octets plus an overhead of 32 octets for each header field.
768    ///
769    /// This setting is also used to limit the maximum amount of data that is
770    /// buffered to decode HEADERS frames.
771    ///
772    /// # Examples
773    ///
774    /// ```
775    /// # use monoio::io::{AsyncReadRent, AsyncWriteRent};
776    /// # use h2::client::*;
777    /// # use bytes::Bytes;
778    /// #
779    /// # async fn doc<T: AsyncReadRent + AsyncWriteRent + Unpin>(my_io: T)
780    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
781    /// # {
782    /// // `client_fut` is a future representing the completion of the HTTP/2
783    /// // handshake.
784    /// let client_fut = Builder::new()
785    ///     .max_header_list_size(16 * 1024)
786    ///     .handshake(my_io);
787    /// # client_fut.await
788    /// # }
789    /// #
790    /// # pub fn main() {}
791    /// ```
792    pub fn max_header_list_size(&mut self, max: u32) -> &mut Self {
793        self.settings.set_max_header_list_size(Some(max));
794        self
795    }
796
797    /// Sets the maximum number of concurrent streams.
798    ///
799    /// The maximum concurrent streams setting only controls the maximum number
800    /// of streams that can be initiated by the remote peer. In other words,
801    /// when this setting is set to 100, this does not limit the number of
802    /// concurrent streams that can be created by the caller.
803    ///
804    /// It is recommended that this value be no smaller than 100, so as to not
805    /// unnecessarily limit parallelism. However, any value is legal, including
806    /// 0. If `max` is set to 0, then the remote will not be permitted to
807    /// initiate streams.
808    ///
809    /// Note that streams in the reserved state, i.e., push promises that have
810    /// been reserved but the stream has not started, do not count against this
811    /// setting.
812    ///
813    /// Also note that if the remote *does* exceed the value set here, it is not
814    /// a protocol level error. Instead, the `h2` library will immediately reset
815    /// the stream.
816    ///
817    /// See [Section 5.1.2] in the HTTP/2 spec for more details.
818    ///
819    /// [Section 5.1.2]: https://http2.github.io/http2-spec/#rfc.section.5.1.2
820    ///
821    /// # Examples
822    ///
823    /// ```
824    /// # use monoio::io::{AsyncReadRent, AsyncWriteRent};
825    /// # use h2::client::*;
826    /// # use bytes::Bytes;
827    /// #
828    /// # async fn doc<T: AsyncReadRent + AsyncWriteRent + Unpin>(my_io: T)
829    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
830    /// # {
831    /// // `client_fut` is a future representing the completion of the HTTP/2
832    /// // handshake.
833    /// let client_fut = Builder::new().max_concurrent_streams(1000).handshake(my_io);
834    /// # client_fut.await
835    /// # }
836    /// #
837    /// # pub fn main() {}
838    /// ```
839    pub fn max_concurrent_streams(&mut self, max: u32) -> &mut Self {
840        self.settings.set_max_concurrent_streams(Some(max));
841        self
842    }
843
844    /// Sets the initial maximum of locally initiated (send) streams.
845    ///
846    /// The initial settings will be overwritten by the remote peer when
847    /// the Settings frame is received. The new value will be set to the
848    /// `max_concurrent_streams()` from the frame.
849    ///
850    /// This setting prevents the caller from exceeding this number of
851    /// streams that are counted towards the concurrency limit.
852    ///
853    /// Sending streams past the limit returned by the peer will be treated
854    /// as a stream error of type PROTOCOL_ERROR or REFUSED_STREAM.
855    ///
856    /// See [Section 5.1.2] in the HTTP/2 spec for more details.
857    ///
858    /// [Section 5.1.2]: https://http2.github.io/http2-spec/#rfc.section.5.1.2
859    ///
860    /// # Examples
861    ///
862    /// ```
863    /// # use monoio::io::{AsyncReadRent, AsyncWriteRent};
864    /// # use h2::client::*;
865    /// # use bytes::Bytes;
866    /// #
867    /// # async fn doc<T: AsyncReadRent + AsyncWriteRent + Unpin>(my_io: T)
868    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
869    /// # {
870    /// // `client_fut` is a future representing the completion of the HTTP/2
871    /// // handshake.
872    /// let client_fut = Builder::new()
873    ///     .initial_max_send_streams(1000)
874    ///     .handshake(my_io);
875    /// # client_fut.await
876    /// # }
877    /// #
878    /// # pub fn main() {}
879    /// ```
880    pub fn initial_max_send_streams(&mut self, initial: usize) -> &mut Self {
881        self.initial_max_send_streams = initial;
882        self
883    }
884
885    /// Sets the maximum number of concurrent locally reset streams.
886    ///
887    /// When a stream is explicitly reset, the HTTP/2 specification requires
888    /// that any further frames received for that stream must be ignored for
889    /// "some time".
890    ///
891    /// In order to satisfy the specification, internal state must be maintained
892    /// to implement the behavior. This state grows linearly with the number of
893    /// streams that are locally reset.
894    ///
895    /// The `max_concurrent_reset_streams` setting configures sets an upper
896    /// bound on the amount of state that is maintained. When this max value is
897    /// reached, the oldest reset stream is purged from memory.
898    ///
899    /// Once the stream has been fully purged from memory, any additional frames
900    /// received for that stream will result in a connection level protocol
901    /// error, forcing the connection to terminate.
902    ///
903    /// The default value is 10.
904    ///
905    /// # Examples
906    ///
907    /// ```
908    /// # use monoio::io::{AsyncReadRent, AsyncWriteRent};
909    /// # use h2::client::*;
910    /// # use bytes::Bytes;
911    /// #
912    /// # async fn doc<T: AsyncReadRent + AsyncWriteRent + Unpin>(my_io: T)
913    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
914    /// # {
915    /// // `client_fut` is a future representing the completion of the HTTP/2
916    /// // handshake.
917    /// let client_fut = Builder::new()
918    ///     .max_concurrent_reset_streams(1000)
919    ///     .handshake(my_io);
920    /// # client_fut.await
921    /// # }
922    /// #
923    /// # pub fn main() {}
924    /// ```
925    pub fn max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self {
926        self.reset_stream_max = max;
927        self
928    }
929
930    /// Sets the duration to remember locally reset streams.
931    ///
932    /// When a stream is explicitly reset, the HTTP/2 specification requires
933    /// that any further frames received for that stream must be ignored for
934    /// "some time".
935    ///
936    /// In order to satisfy the specification, internal state must be maintained
937    /// to implement the behavior. This state grows linearly with the number of
938    /// streams that are locally reset.
939    ///
940    /// The `reset_stream_duration` setting configures the max amount of time
941    /// this state will be maintained in memory. Once the duration elapses, the
942    /// stream state is purged from memory.
943    ///
944    /// Once the stream has been fully purged from memory, any additional frames
945    /// received for that stream will result in a connection level protocol
946    /// error, forcing the connection to terminate.
947    ///
948    /// The default value is 30 seconds.
949    ///
950    /// # Examples
951    ///
952    /// ```
953    /// # use monoio::io::{AsyncReadRent, AsyncWriteRent};
954    /// # use h2::client::*;
955    /// # use std::time::Duration;
956    /// # use bytes::Bytes;
957    /// #
958    /// # async fn doc<T: AsyncReadRent + AsyncWriteRent + Unpin>(my_io: T)
959    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
960    /// # {
961    /// // `client_fut` is a future representing the completion of the HTTP/2
962    /// // handshake.
963    /// let client_fut = Builder::new()
964    ///     .reset_stream_duration(Duration::from_secs(10))
965    ///     .handshake(my_io);
966    /// # client_fut.await
967    /// # }
968    /// #
969    /// # pub fn main() {}
970    /// ```
971    pub fn reset_stream_duration(&mut self, dur: Duration) -> &mut Self {
972        self.reset_stream_duration = dur;
973        self
974    }
975
976    /// Sets the maximum send buffer size per stream.
977    ///
978    /// Once a stream has buffered up to (or over) the maximum, the stream's
979    /// flow control will not "poll" additional capacity. Once bytes for the
980    /// stream have been written to the connection, the send buffer capacity
981    /// will be freed up again.
982    ///
983    /// The default is currently ~400MB, but may change.
984    ///
985    /// # Panics
986    ///
987    /// This function panics if `max` is larger than `u32::MAX`.
988    pub fn max_send_buffer_size(&mut self, max: usize) -> &mut Self {
989        assert!(max <= u32::MAX as usize);
990        self.max_send_buffer_size = max;
991        self
992    }
993
994    /// Enables or disables server push promises.
995    ///
996    /// This value is included in the initial SETTINGS handshake.
997    /// Setting this value to value to
998    /// false in the initial SETTINGS handshake guarantees that the remote server
999    /// will never send a push promise.
1000    ///
1001    /// This setting can be changed during the life of a single HTTP/2
1002    /// connection by sending another settings frame updating the value.
1003    ///
1004    /// Default value: `true`.
1005    ///
1006    /// # Examples
1007    ///
1008    /// ```
1009    /// # use monoio::io::{AsyncReadRent, AsyncWriteRent};
1010    /// # use h2::client::*;
1011    /// # use std::time::Duration;
1012    /// # use bytes::Bytes;
1013    /// #
1014    /// # async fn doc<T: AsyncReadRent + AsyncWriteRent + Unpin>(my_io: T)
1015    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
1016    /// # {
1017    /// // `client_fut` is a future representing the completion of the HTTP/2
1018    /// // handshake.
1019    /// let client_fut = Builder::new().enable_push(false).handshake(my_io);
1020    /// # client_fut.await
1021    /// # }
1022    /// #
1023    /// # pub fn main() {}
1024    /// ```
1025    pub fn enable_push(&mut self, enabled: bool) -> &mut Self {
1026        self.settings.set_enable_push(enabled);
1027        self
1028    }
1029
1030    /// Sets the first stream ID to something other than 1.
1031    #[cfg(feature = "unstable")]
1032    pub fn initial_stream_id(&mut self, stream_id: u32) -> &mut Self {
1033        self.stream_id = stream_id.into();
1034        assert!(
1035            self.stream_id.is_client_initiated(),
1036            "stream id must be odd"
1037        );
1038        self
1039    }
1040
1041    /// Creates a new configured HTTP/2 client backed by `io`.
1042    ///
1043    /// It is expected that `io` already be in an appropriate state to commence
1044    /// the [HTTP/2 handshake]. The handshake is completed once both the connection
1045    /// preface and the initial settings frame is sent by the client.
1046    ///
1047    /// The handshake future does not wait for the initial settings frame from the
1048    /// server.
1049    ///
1050    /// Returns a future which resolves to the [`Connection`] / [`SendRequest`]
1051    /// tuple once the HTTP/2 handshake has been completed.
1052    ///
1053    /// This function also allows the caller to configure the send payload data
1054    /// type. See [Outbound data type] for more details.
1055    ///
1056    /// [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
1057    /// [`Connection`]: struct.Connection.html
1058    /// [`SendRequest`]: struct.SendRequest.html
1059    /// [Outbound data type]: ../index.html#outbound-data-type.
1060    ///
1061    /// # Examples
1062    ///
1063    /// Basic usage:
1064    ///
1065    /// ```
1066    /// # use monoio::io::{AsyncReadRent, AsyncWriteRent};
1067    /// # use h2::client::*;
1068    /// # use bytes::Bytes;
1069    /// #
1070    /// # async fn doc<T: AsyncReadRent + AsyncWriteRent + Unpin>(my_io: T)
1071    ///     -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
1072    /// # {
1073    /// // `client_fut` is a future representing the completion of the HTTP/2
1074    /// // handshake.
1075    /// let client_fut = Builder::new()
1076    ///     .handshake(my_io);
1077    /// # client_fut.await
1078    /// # }
1079    /// #
1080    /// # pub fn main() {}
1081    /// ```
1082    ///
1083    /// Configures the send-payload data type. In this case, the outbound data
1084    /// type will be `&'static [u8]`.
1085    ///
1086    /// ```
1087    /// # use monoio::io::{AsyncReadRent, AsyncWriteRent};
1088    /// # use h2::client::*;
1089    /// #
1090    /// # async fn doc<T: AsyncReadRent + AsyncWriteRent + Unpin>(my_io: T)
1091    /// # -> Result<((SendRequest<&'static [u8]>, Connection<T, &'static [u8]>)), h2::Error>
1092    /// # {
1093    /// // `client_fut` is a future representing the completion of the HTTP/2
1094    /// // handshake.
1095    /// let client_fut = Builder::new().handshake::<_, &'static [u8]>(my_io);
1096    /// # client_fut.await
1097    /// # }
1098    /// #
1099    /// # pub fn main() {}
1100    /// ```
1101    pub fn handshake<T, B>(
1102        &self,
1103        io: T,
1104    ) -> impl Future<Output = Result<(SendRequest<B>, Connection<T, B>), crate::h2::Error>>
1105    where
1106        T: AsyncReadRent + AsyncWriteRent + Unpin + 'static,
1107        B: Buf + 'static,
1108    {
1109        Connection::handshake2(io, self.clone())
1110    }
1111}
1112
1113impl Default for Builder {
1114    fn default() -> Builder {
1115        Builder::new()
1116    }
1117}
1118
1119/// Creates a new configured HTTP/2 client with default configuration
1120/// values backed by `io`.
1121///
1122/// It is expected that `io` already be in an appropriate state to commence
1123/// the [HTTP/2 handshake]. See [Handshake] for more details.
1124///
1125/// Returns a future which resolves to the [`Connection`] / [`SendRequest`]
1126/// tuple once the HTTP/2 handshake has been completed. The returned
1127/// [`Connection`] instance will be using default configuration values. Use
1128/// [`Builder`] to customize the configuration values used by a [`Connection`]
1129/// instance.
1130///
1131/// [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
1132/// [Handshake]: ../index.html#handshake
1133/// [`Connection`]: struct.Connection.html
1134/// [`SendRequest`]: struct.SendRequest.html
1135///
1136/// # Examples
1137///
1138/// ```
1139/// # use monoio::io::{AsyncReadRent, AsyncWriteRent};
1140/// # use h2::client;
1141/// # use h2::client::*;
1142/// #
1143/// # async fn doc<T: AsyncReadRent + AsyncWriteRent + Unpin>(my_io: T) -> Result<(), h2::Error>
1144/// # {
1145/// let (send_request, connection) = client::handshake(my_io).await?;
1146/// // The HTTP/2 handshake has completed, now start polling
1147/// // `connection` and use `send_request` to send requests to the
1148/// // server.
1149/// # Ok(())
1150/// # }
1151/// #
1152/// # pub fn main() {}
1153/// ```
1154pub async fn handshake<T>(
1155    io: T,
1156) -> Result<(SendRequest<Bytes>, Connection<T, Bytes>), crate::h2::Error>
1157where
1158    T: AsyncReadRent + AsyncWriteRent + Unpin + 'static,
1159{
1160    let builder = Builder::new();
1161    builder
1162        .handshake(io)
1163        .instrument(tracing::trace_span!("client_handshake"))
1164        .await
1165}
1166
1167// ===== impl Connection =====
1168
1169async fn bind_connection<T>(io: &mut T) -> Result<(), crate::h2::Error>
1170where
1171    T: AsyncReadRent + AsyncWriteRent + Unpin,
1172{
1173    tracing::debug!("binding client connection");
1174
1175    let msg: &'static [u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
1176    let (res, _) = io.write_all(msg).await;
1177
1178    res.map_err(crate::h2::Error::from_io)?;
1179
1180    tracing::debug!("client connection bound");
1181
1182    Ok(())
1183}
1184
1185impl<T, B> Connection<T, B>
1186where
1187    T: AsyncReadRent + AsyncWriteRent + Unpin + 'static,
1188    B: Buf + 'static,
1189{
1190    async fn handshake2(
1191        mut io: T,
1192        builder: Builder,
1193    ) -> Result<(SendRequest<B>, Connection<T, B>), crate::h2::Error> {
1194        bind_connection(&mut io).await?;
1195
1196        // Create the codec
1197        let mut codec = Codec::new(io);
1198
1199        if let Some(max) = builder.settings.max_frame_size() {
1200            codec.set_max_recv_frame_size(max as usize);
1201        }
1202
1203        if let Some(max) = builder.settings.max_header_list_size() {
1204            codec.set_max_recv_header_list_size(max as usize);
1205        }
1206
1207        // Send initial settings frame
1208        codec
1209            .buffer(builder.settings.clone().into())
1210            .expect("invalid SETTINGS frame");
1211
1212        let inner = proto::Connection::new(
1213            codec,
1214            proto::Config {
1215                next_stream_id: builder.stream_id,
1216                initial_max_send_streams: builder.initial_max_send_streams,
1217                max_send_buffer_size: builder.max_send_buffer_size,
1218                reset_stream_duration: builder.reset_stream_duration,
1219                reset_stream_max: builder.reset_stream_max,
1220                settings: builder.settings.clone(),
1221            },
1222        );
1223        let send_request = SendRequest {
1224            inner: inner.streams().clone(),
1225            pending: None,
1226        };
1227
1228        let mut connection = Connection { inner };
1229        if let Some(sz) = builder.initial_target_connection_window_size {
1230            connection.set_target_window_size(sz);
1231        }
1232
1233        Ok((send_request, connection))
1234    }
1235
1236    /// Sets the target window size for the whole connection.
1237    ///
1238    /// If `size` is greater than the current value, then a `WINDOW_UPDATE`
1239    /// frame will be immediately sent to the remote, increasing the connection
1240    /// level window by `size - current_value`.
1241    ///
1242    /// If `size` is less than the current value, nothing will happen
1243    /// immediately. However, as window capacity is released by
1244    /// [`FlowControl`] instances, no `WINDOW_UPDATE` frames will be sent
1245    /// out until the number of "in flight" bytes drops below `size`.
1246    ///
1247    /// The default value is 65,535.
1248    ///
1249    /// See [`FlowControl`] documentation for more details.
1250    ///
1251    /// [`FlowControl`]: ../struct.FlowControl.html
1252    /// [library level]: ../index.html#flow-control
1253    pub fn set_target_window_size(&mut self, size: u32) {
1254        assert!(size <= proto::MAX_WINDOW_SIZE);
1255        self.inner.set_target_window_size(size);
1256    }
1257
1258    /// Set a new `INITIAL_WINDOW_SIZE` setting (in octets) for stream-level
1259    /// flow control for received data.
1260    ///
1261    /// The `SETTINGS` will be sent to the remote, and only applied once the
1262    /// remote acknowledges the change.
1263    ///
1264    /// This can be used to increase or decrease the window size for existing
1265    /// streams.
1266    ///
1267    /// # Errors
1268    ///
1269    /// Returns an error if a previous call is still pending acknowledgement
1270    /// from the remote endpoint.
1271    pub fn set_initial_window_size(&mut self, size: u32) -> Result<(), crate::h2::Error> {
1272        assert!(size <= proto::MAX_WINDOW_SIZE);
1273        self.inner.set_initial_window_size(size)?;
1274        Ok(())
1275    }
1276
1277    /// Takes a `PingPong` instance from the connection.
1278    ///
1279    /// # Note
1280    ///
1281    /// This may only be called once. Calling multiple times will return `None`.
1282    pub fn ping_pong(&mut self) -> Option<PingPong> {
1283        self.inner.take_user_pings().map(PingPong::new)
1284    }
1285
1286    /// Returns the maximum number of concurrent streams that may be initiated
1287    /// by this client.
1288    ///
1289    /// This limit is configured by the server peer by sending the
1290    /// [`SETTINGS_MAX_CONCURRENT_STREAMS` parameter][1] in a `SETTINGS` frame.
1291    /// This method returns the currently acknowledged value received from the
1292    /// remote.
1293    ///
1294    /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2
1295    pub fn max_concurrent_send_streams(&self) -> usize {
1296        self.inner.max_send_streams()
1297    }
1298    /// Returns the maximum number of concurrent streams that may be initiated
1299    /// by the server on this connection.
1300    ///
1301    /// This returns the value of the [`SETTINGS_MAX_CONCURRENT_STREAMS`
1302    /// parameter][1] sent in a `SETTINGS` frame that has been
1303    /// acknowledged by the remote peer. The value to be sent is configured by
1304    /// the [`Builder::max_concurrent_streams`][2] method before handshaking
1305    /// with the remote peer.
1306    ///
1307    /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2
1308    /// [2]: ../struct.Builder.html#method.max_concurrent_streams
1309    pub fn max_concurrent_recv_streams(&self) -> usize {
1310        self.inner.max_recv_streams()
1311    }
1312}
1313
1314impl<T, B> Future for Connection<T, B>
1315where
1316    T: AsyncReadRent + AsyncWriteRent + Unpin + 'static,
1317    B: Buf + 'static,
1318{
1319    type Output = Result<(), crate::h2::Error>;
1320
1321    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1322        self.inner.maybe_close_connection_if_no_streams();
1323        self.inner.poll(cx).map_err(Into::into)
1324    }
1325}
1326
1327impl<T, B> fmt::Debug for Connection<T, B>
1328where
1329    T: AsyncReadRent + AsyncWriteRent,
1330    T: fmt::Debug,
1331    B: fmt::Debug + Buf,
1332{
1333    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
1334        fmt::Debug::fmt(&self.inner, fmt)
1335    }
1336}
1337
1338// ===== impl ResponseFuture =====
1339
1340impl Future for ResponseFuture {
1341    type Output = Result<Response<RecvStream>, crate::h2::Error>;
1342
1343    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1344        let (parts, _) = ready!(self.inner.poll_response(cx))?.into_parts();
1345        let body = RecvStream::new(FlowControl::new(self.inner.clone()));
1346
1347        Poll::Ready(Ok(Response::from_parts(parts, body)))
1348    }
1349}
1350
1351impl ResponseFuture {
1352    /// Returns the stream ID of the response stream.
1353    ///
1354    /// # Panics
1355    ///
1356    /// If the lock on the stream store has been poisoned.
1357    pub fn stream_id(&self) -> crate::h2::StreamId {
1358        crate::h2::StreamId::from_internal(self.inner.stream_id())
1359    }
1360    /// Returns a stream of PushPromises
1361    ///
1362    /// # Panics
1363    ///
1364    /// If this method has been called before
1365    /// or the stream was itself was pushed
1366    pub fn push_promises(&mut self) -> PushPromises {
1367        if self.push_promise_consumed {
1368            panic!("Reference to push promises stream taken!");
1369        }
1370        self.push_promise_consumed = true;
1371        PushPromises {
1372            inner: self.inner.clone(),
1373        }
1374    }
1375}
1376
1377// ===== impl PushPromises =====
1378
1379impl PushPromises {
1380    /// Get the next `PushPromise`.
1381    pub async fn push_promise(&mut self) -> Option<Result<PushPromise, crate::h2::Error>> {
1382        std::future::poll_fn(move |cx| self.poll_push_promise(cx)).await
1383    }
1384
1385    #[doc(hidden)]
1386    pub fn poll_push_promise(
1387        &mut self,
1388        cx: &Context<'_>,
1389    ) -> Poll<Option<Result<PushPromise, crate::h2::Error>>> {
1390        match self.inner.poll_pushed(cx) {
1391            Poll::Ready(Some(Ok((request, response)))) => {
1392                let response = PushedResponseFuture {
1393                    inner: ResponseFuture {
1394                        inner: response,
1395                        push_promise_consumed: false,
1396                    },
1397                };
1398                Poll::Ready(Some(Ok(PushPromise { request, response })))
1399            }
1400            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e.into()))),
1401            Poll::Ready(None) => Poll::Ready(None),
1402            Poll::Pending => Poll::Pending,
1403        }
1404    }
1405}
1406
1407#[cfg(feature = "stream")]
1408impl futures_core::Stream for PushPromises {
1409    type Item = Result<PushPromise, crate::h2::Error>;
1410
1411    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1412        self.poll_push_promise(cx)
1413    }
1414}
1415
1416// ===== impl PushPromise =====
1417
1418impl PushPromise {
1419    /// Returns a reference to the push promise's request headers.
1420    pub fn request(&self) -> &Request<()> {
1421        &self.request
1422    }
1423
1424    /// Returns a mutable reference to the push promise's request headers.
1425    pub fn request_mut(&mut self) -> &mut Request<()> {
1426        &mut self.request
1427    }
1428
1429    /// Consumes `self`, returning the push promise's request headers and
1430    /// response future.
1431    pub fn into_parts(self) -> (Request<()>, PushedResponseFuture) {
1432        (self.request, self.response)
1433    }
1434}
1435
1436// ===== impl PushedResponseFuture =====
1437
1438impl Future for PushedResponseFuture {
1439    type Output = Result<Response<RecvStream>, crate::h2::Error>;
1440
1441    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1442        Pin::new(&mut self.inner).poll(cx)
1443    }
1444}
1445
1446impl PushedResponseFuture {
1447    /// Returns the stream ID of the response stream.
1448    ///
1449    /// # Panics
1450    ///
1451    /// If the lock on the stream store has been poisoned.
1452    pub fn stream_id(&self) -> crate::h2::StreamId {
1453        self.inner.stream_id()
1454    }
1455}
1456
1457// ===== impl Peer =====
1458
1459impl Peer {
1460    pub fn convert_send_message(
1461        id: StreamId,
1462        request: Request<()>,
1463        protocol: Option<Protocol>,
1464        end_of_stream: bool,
1465    ) -> Result<Headers, SendError> {
1466        use http::request::Parts;
1467
1468        let (
1469            Parts {
1470                method,
1471                uri,
1472                headers,
1473                version,
1474                ..
1475            },
1476            _,
1477        ) = request.into_parts();
1478
1479        let is_connect = method == Method::CONNECT;
1480
1481        // Build the set pseudo header set. All requests will include `method`
1482        // and `path`.
1483        let mut pseudo = Pseudo::request(method, uri, protocol);
1484
1485        if pseudo.scheme.is_none() {
1486            // If the scheme is not set, then there are a two options.
1487            //
1488            // 1) Authority is not set. In this case, a request was issued with a relative URI. This
1489            //    is permitted **only** when forwarding HTTP 1.x requests. If the HTTP version is
1490            //    set to 2.0, then this is an error.
1491            //
1492            // 2) Authority is set, then the HTTP method *must* be CONNECT.
1493            //
1494            // It is not possible to have a scheme but not an authority set (the
1495            // `http` crate does not allow it).
1496            //
1497            if pseudo.authority.is_none() {
1498                if version == Version::HTTP_2 {
1499                    return Err(UserError::MissingUriSchemeAndAuthority.into());
1500                } else {
1501                    // This is acceptable as per the above comment. However,
1502                    // HTTP/2 requires that a scheme is set. Since we are
1503                    // forwarding an HTTP 1.1 request, the scheme is set to
1504                    // "http".
1505                    pseudo.set_scheme(uri::Scheme::HTTP);
1506                }
1507            } else if !is_connect {
1508                // TODO: Error
1509            }
1510        }
1511
1512        // Create the HEADERS frame
1513        let mut frame = Headers::new(id, pseudo, headers);
1514
1515        if end_of_stream {
1516            frame.set_end_stream()
1517        }
1518
1519        Ok(frame)
1520    }
1521}
1522
1523impl proto::Peer for Peer {
1524    type Poll = Response<()>;
1525
1526    const NAME: &'static str = "Client";
1527
1528    fn r#dyn() -> proto::DynPeer {
1529        proto::DynPeer::Client
1530    }
1531
1532    fn is_server() -> bool {
1533        false
1534    }
1535
1536    fn convert_poll_message(
1537        pseudo: Pseudo,
1538        fields: HeaderMap,
1539        stream_id: StreamId,
1540    ) -> Result<Self::Poll, Error> {
1541        let mut b = Response::builder();
1542
1543        b = b.version(Version::HTTP_2);
1544
1545        if let Some(status) = pseudo.status {
1546            b = b.status(status);
1547        }
1548
1549        let mut response = match b.body(()) {
1550            Ok(response) => response,
1551            Err(_) => {
1552                // TODO: Should there be more specialized handling for different
1553                // kinds of errors
1554                return Err(Error::library_reset(stream_id, Reason::PROTOCOL_ERROR));
1555            }
1556        };
1557
1558        *response.headers_mut() = fields;
1559
1560        Ok(response)
1561    }
1562}