http2/client.rs

1//! Client implementation of the HTTP/2 protocol.
2//!
3//! # Getting started
4//!
5//! Running an HTTP/2 client requires the caller to establish the underlying
6//! connection as well as get the connection to a state that is ready to begin
7//! the HTTP/2 handshake. See [here](../index.html#handshake) for more
8//! details.
9//!
10//! This could be as basic as using Tokio's [`TcpStream`] to connect to a remote
11//! host, but usually it means using either ALPN or HTTP/1.1 protocol upgrades.
12//!
13//! Once a connection is obtained, it is passed to [`handshake`], which will
14//! begin the [HTTP/2 handshake]. This returns a future that completes once
15//! the handshake process is performed and HTTP/2 streams may be initialized.
16//!
17//! [`handshake`] uses default configuration values. There are a number of
18//! settings that can be changed by using [`Builder`] instead.
19//!
20//! Once the handshake future completes, the caller is provided with a
21//! [`Connection`] instance and a [`SendRequest`] instance. The [`Connection`]
22//! instance is used to drive the connection (see [Managing the connection]).
23//! The [`SendRequest`] instance is used to initialize new streams (see [Making
24//! requests]).
25//!
26//! # Making requests
27//!
28//! Requests are made using the [`SendRequest`] handle provided by the handshake
29//! future. Once a request is submitted, an HTTP/2 stream is initialized and
30//! the request is sent to the server.
31//!
32//! A request body and request trailers are sent using [`SendStream`] and the
33//! server's response is returned once the [`ResponseFuture`] future completes.
34//! Both the [`SendStream`] and [`ResponseFuture`] instances are returned by
35//! [`SendRequest::send_request`] and are tied to the HTTP/2 stream
36//! initialized by the sent request.
37//!
38//! The [`SendRequest::poll_ready`] function returns `Ready` when a new HTTP/2
39//! stream can be created, i.e. as long as the current number of active streams
40//! is below [`MAX_CONCURRENT_STREAMS`]. If a new stream cannot be created, the
41//! caller will be notified once an existing stream closes, freeing capacity for
42//! the caller.  The caller should use [`SendRequest::poll_ready`] to check for
43//! capacity before sending a request to the server.
44//!
45//! [`SendRequest`] enforces the [`MAX_CONCURRENT_STREAMS`] setting. The user
46//! must not send a request if `poll_ready` does not return `Ready`. Attempting
47//! to do so will result in an [`Error`] being returned.
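//!
//! As a minimal sketch (assuming a `send_request` handle already obtained from
//! [`handshake`]), checking for capacity and sending a request looks like:
//!
//! ```rust, no_run
//! # use http2::client::SendRequest;
//! # use http::Request;
//! # async fn doc(send_request: SendRequest<&'static [u8]>) -> Result<(), http2::Error> {
//! // Wait until a new stream may be initialized.
//! let mut send_request = send_request.ready().await?;
//!
//! let request = Request::get("https://www.example.com/").body(()).unwrap();
//!
//! // `true` indicates that no body or trailers follow, so the returned
//! // `SendStream` can be dropped.
//! let (response, _) = send_request.send_request(request, true)?;
//! let response = response.await?;
//! # drop(response);
//! # Ok(())
//! # }
//! ```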
48//!
49//! # Managing the connection
50//!
51//! The [`Connection`] instance is used to manage connection state. The caller
52//! is required to call [`Connection::poll`] in order to advance state.
53//! [`SendRequest::send_request`] and other functions have no effect unless
54//! [`Connection::poll`] is called.
55//!
56//! The [`Connection`] instance should only be dropped once [`Connection::poll`]
57//! returns `Ready`. At this point, the underlying socket has been closed and no
58//! further work needs to be done.
59//!
60//! The easiest way to ensure that the [`Connection`] instance gets polled is to
61//! submit the [`Connection`] instance to an [executor]. The executor will then
62//! manage polling the connection until the connection is complete.
63//! Alternatively, the caller can call `poll` manually.
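//!
//! A common pattern (a sketch; the full example below does the same) is to
//! spawn the connection onto the runtime and log any error it resolves with:
//!
//! ```rust, no_run
//! # use http2::client;
//! # use tokio::net::TcpStream;
//! # async fn doc() -> Result<(), Box<dyn std::error::Error>> {
//! # let tcp = TcpStream::connect("127.0.0.1:5928").await?;
//! let (send_request, connection) = client::handshake(tcp).await?;
//! tokio::spawn(async move {
//!     if let Err(e) = connection.await {
//!         eprintln!("connection error: {e}");
//!     }
//! });
//! # drop(send_request);
//! # Ok(())
//! # }
//! ```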
64//!
65//! # Example
66//!
67//! ```rust, no_run
68//!
69//! use http2::client;
70//!
71//! use http::{Request, Method};
72//! use std::error::Error;
73//! use tokio::net::TcpStream;
74//!
75//! #[tokio::main]
76//! pub async fn main() -> Result<(), Box<dyn Error>> {
77//!     // Establish TCP connection to the server.
78//!     let tcp = TcpStream::connect("127.0.0.1:5928").await?;
79//!     let (http2, connection) = client::handshake(tcp).await?;
80//!     tokio::spawn(async move {
81//!         connection.await.unwrap();
82//!     });
83//!
84//!     let mut http2 = http2.ready().await?;
85//!     // Prepare the HTTP request to send to the server.
86//!     let request = Request::builder()
87//!                     .method(Method::GET)
88//!                     .uri("https://www.example.com/")
89//!                     .body(())
90//!                     .unwrap();
91//!
92//!     // Send the request. The second tuple item is a `SendStream` that can
93//!     // stream a request body; since `end_of_stream` is `true`, it is dropped here.
94//!     let (response, _) = http2.send_request(request, true).unwrap();
95//!
96//!     let (head, mut body) = response.await?.into_parts();
97//!
98//!     println!("Received response: {:?}", head);
99//!
100//!     // The `flow_control` handle allows the caller to manage
101//!     // flow control.
102//!     //
103//!     // Whenever data is received, the caller is responsible for
104//!     // releasing capacity back to the server once it has freed
105//!     // the data from memory.
106//!     let mut flow_control = body.flow_control().clone();
107//!
108//!     while let Some(chunk) = body.data().await {
109//!         let chunk = chunk?;
110//!         println!("RX: {:?}", chunk);
111//!
112//!         // Let the server send more data.
113//!         let _ = flow_control.release_capacity(chunk.len());
114//!     }
115//!
116//!     Ok(())
117//! }
118//! ```
119//!
120//! [`TcpStream`]: https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html
121//! [`handshake`]: fn.handshake.html
122//! [executor]: https://docs.rs/futures/0.1/futures/future/trait.Executor.html
123//! [`SendRequest`]: struct.SendRequest.html
124//! [`SendStream`]: ../struct.SendStream.html
125//! [Making requests]: #making-requests
126//! [Managing the connection]: #managing-the-connection
127//! [`Connection`]: struct.Connection.html
128//! [`Connection::poll`]: struct.Connection.html#method.poll
129//! [`SendRequest::send_request`]: struct.SendRequest.html#method.send_request
130//! [`MAX_CONCURRENT_STREAMS`]: http://httpwg.org/specs/rfc7540.html#SettingValues
132//! [`ResponseFuture`]: struct.ResponseFuture.html
133//! [`SendRequest::poll_ready`]: struct.SendRequest.html#method.poll_ready
134//! [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
135//! [`Builder`]: struct.Builder.html
136//! [`Error`]: ../struct.Error.html
137
138use crate::codec::{Codec, SendError, UserError};
139use crate::ext::Protocol;
140#[cfg(feature = "unstable")]
141use crate::frame::ExperimentalSettings;
142use crate::frame::{
143    Headers, Priorities, Pseudo, PseudoOrder, Reason, Settings, SettingsOrder, StreamDependency,
144    StreamId,
145};
146use crate::proto::{self, Error};
147use crate::{tracing, FlowControl, PingPong, RecvStream, SendStream};
148
149#[cfg(feature = "tracing")]
150use ::tracing::Instrument;
151use bytes::{Buf, Bytes};
152use http::{uri, HeaderMap, Method, Request, Response, Version};
153use std::fmt;
154use std::future::Future;
155use std::pin::Pin;
156use std::task::{Context, Poll};
157use std::time::Duration;
158use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
159
160/// Initializes new HTTP/2 streams on a connection by sending a request.
161///
162/// This type does no work itself. Instead, it is a handle to the inner
163/// connection state held by [`Connection`]. If the associated connection
164/// instance is dropped, all `SendRequest` functions will return [`Error`].
165///
166/// [`SendRequest`] instances are able to move to and operate on separate tasks
167/// / threads than their associated [`Connection`] instance. Internally, there
168/// is a buffer used to stage requests before they get written to the
169/// connection. There is no guarantee that requests get written to the
170/// connection in FIFO order as HTTP/2 prioritization logic can play a role.
171///
172/// [`SendRequest`] implements [`Clone`], enabling the creation of many
173/// instances that are backed by a single connection.
174///
175/// See [module] level documentation for more details.
176///
177/// [module]: index.html
178/// [`Connection`]: struct.Connection.html
179/// [`Clone`]: https://doc.rust-lang.org/std/clone/trait.Clone.html
180/// [`Error`]: ../struct.Error.html
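///
/// # Examples
///
/// A minimal sketch of sharing a connection between tasks by cloning the
/// handle (the spawned work is illustrative only):
///
/// ```
/// # use http2::client::SendRequest;
/// # use http::Request;
/// # async fn doc(send_request: SendRequest<&'static [u8]>) {
/// // Each clone is backed by the same connection.
/// let cloned = send_request.clone();
/// tokio::spawn(async move {
///     let mut cloned = cloned.ready().await.expect("connection closed");
///     let request = Request::get("https://www.example.com/").body(()).unwrap();
///     let _ = cloned.send_request(request, true);
/// });
/// # drop(send_request);
/// # }
/// # pub fn main() {}
/// ```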
181pub struct SendRequest<B: Buf> {
182    inner: proto::Streams<B, Peer>,
183    pending: Option<proto::OpaqueStreamRef>,
184}
185
186/// Returns a `SendRequest` instance once it is ready to send at least one
187/// request.
188#[derive(Debug)]
189pub struct ReadySendRequest<B: Buf> {
190    inner: Option<SendRequest<B>>,
191}
192
193/// Manages all state associated with an HTTP/2 client connection.
194///
195/// A `Connection` is backed by an I/O resource (usually a TCP socket) and
196/// implements the HTTP/2 client logic for that connection. It is responsible
197/// for driving the internal state forward, performing the work requested of the
198/// associated handles ([`SendRequest`], [`ResponseFuture`], [`SendStream`],
199/// [`RecvStream`]).
200///
201/// `Connection` values are created by calling [`handshake`]. Once a
202/// `Connection` value is obtained, the caller must repeatedly call [`poll`]
203/// until `Ready` is returned. The easiest way to do this is to submit the
204/// `Connection` instance to an [executor].
205///
206/// [module]: index.html
207/// [`handshake`]: fn.handshake.html
208/// [`SendRequest`]: struct.SendRequest.html
209/// [`ResponseFuture`]: struct.ResponseFuture.html
210/// [`SendStream`]: ../struct.SendStream.html
211/// [`RecvStream`]: ../struct.RecvStream.html
212/// [`poll`]: #method.poll
213/// [executor]: https://docs.rs/futures/0.1/futures/future/trait.Executor.html
214///
215/// # Examples
216///
217/// ```
218/// # use tokio::io::{AsyncRead, AsyncWrite};
219/// # use http2::client;
220/// # use http2::client::*;
221/// #
222/// # async fn doc<T>(my_io: T) -> Result<(), http2::Error>
223/// # where T: AsyncRead + AsyncWrite + Send + Unpin + 'static,
224/// # {
225///     let (send_request, connection) = client::handshake(my_io).await?;
226///     // Submit the connection handle to an executor.
227///     tokio::spawn(async { connection.await.expect("connection failed"); });
228///
229///     // Now, use `send_request` to initialize HTTP/2 streams.
230///     // ...
231/// # Ok(())
232/// # }
233/// #
234/// # pub fn main() {}
235/// ```
236#[must_use = "futures do nothing unless polled"]
237pub struct Connection<T, B: Buf = Bytes> {
238    inner: proto::Connection<T, Peer, B>,
239}
240
241/// A future of an HTTP response.
242#[derive(Debug)]
243#[must_use = "futures do nothing unless polled"]
244pub struct ResponseFuture {
245    inner: proto::OpaqueStreamRef,
246    push_promise_consumed: bool,
247}
248
249/// A future of a pushed HTTP response.
250///
251/// We have to differentiate between pushed and non-pushed responses because of the spec
252/// <https://httpwg.org/specs/rfc7540.html#PUSH_PROMISE>
253/// > PUSH_PROMISE frames MUST only be sent on a peer-initiated stream
254/// > that is in either the "open" or "half-closed (remote)" state.
255#[derive(Debug)]
256#[must_use = "futures do nothing unless polled"]
257pub struct PushedResponseFuture {
258    inner: ResponseFuture,
259}
260
261/// A pushed response and corresponding request headers
262#[derive(Debug)]
263pub struct PushPromise {
264    /// The request headers
265    request: Request<()>,
266
267    /// The pushed response
268    response: PushedResponseFuture,
269}
270
271/// A stream of pushed responses and corresponding promised requests
272#[derive(Debug)]
273#[must_use = "streams do nothing unless polled"]
274pub struct PushPromises {
275    inner: proto::OpaqueStreamRef,
276}
277
278/// Builds client connections with custom configuration values.
279///
280/// Methods can be chained in order to set the configuration values.
281///
282/// The client is constructed by calling [`handshake`] and passing the I/O
283/// handle that will back the HTTP/2 connection.
284///
285/// New instances of `Builder` are obtained via [`Builder::new`].
286///
287/// See function level documentation for details on the various client
288/// configuration settings.
289///
290/// [`Builder::new`]: struct.Builder.html#method.new
291/// [`handshake`]: struct.Builder.html#method.handshake
292///
293/// # Examples
294///
295/// ```
296/// # use tokio::io::{AsyncRead, AsyncWrite};
297/// # use http2::client::*;
298/// # use bytes::Bytes;
299/// #
300/// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
301///     -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), http2::Error>
302/// # {
303/// // `client_fut` is a future representing the completion of the HTTP/2
304/// // handshake.
305/// let client_fut = Builder::new()
306///     .initial_window_size(1_000_000)
307///     .max_concurrent_streams(1000)
308///     .handshake(my_io);
309/// # client_fut.await
310/// # }
311/// #
312/// # pub fn main() {}
313/// ```
314#[derive(Clone, Debug)]
315pub struct Builder {
316    /// Time to keep locally reset streams around before reaping.
317    reset_stream_duration: Duration,
318
319    /// Initial maximum number of locally initiated (send) streams.
320    /// After receiving a SETTINGS frame from the remote peer,
321    /// the connection will overwrite this value with the
322    /// MAX_CONCURRENT_STREAMS specified in the frame.
323    /// If no value is advertised by the remote peer in the initial SETTINGS
324    /// frame, it will be set to usize::MAX.
325    initial_max_send_streams: usize,
326
327    /// Initial target window size for new connections.
328    initial_target_connection_window_size: Option<u32>,
329
330    /// Maximum number of bytes to "buffer" for writing per stream.
331    max_send_buffer_size: usize,
332
333    /// Maximum number of locally reset streams to keep at a time.
334    reset_stream_max: usize,
335
336    /// Maximum number of remotely reset streams to allow in the pending
337    /// accept queue.
338    pending_accept_reset_stream_max: usize,
339
340    /// Initial `Settings` frame to send as part of the handshake.
341    settings: Settings,
342
343    /// The stream ID of the first (lowest) stream. Subsequent streams will use
344    /// monotonically increasing stream IDs.
345    stream_id: StreamId,
346
347    /// Maximum number of locally reset streams due to protocol error across
348    /// the lifetime of the connection.
349    ///
350    /// When this gets exceeded, we issue GOAWAYs.
351    local_max_error_reset_streams: Option<usize>,
352
353    /// The headers frame pseudo order
354    headers_pseudo_order: Option<PseudoOrder>,
355
356    /// The headers frame stream dependency
357    headers_stream_dependency: Option<StreamDependency>,
358
359    /// Priority stream list
360    priorities: Option<Priorities>,
361}
362
363#[derive(Debug)]
364pub(crate) struct Peer;
365
366// ===== impl SendRequest =====
367
368impl<B> SendRequest<B>
369where
370    B: Buf,
371{
372    /// Returns `Ready` when the connection can initialize a new HTTP/2
373    /// stream.
374    ///
375    /// This function must return `Ready` before `send_request` is called. When
376    /// `Poll::Pending` is returned, the task will be notified once the readiness
377    /// state changes.
378    ///
379    /// See [module] level docs for more details.
380    ///
381    /// [module]: index.html
382    pub fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), crate::Error>> {
383        ready!(self.inner.poll_pending_open(cx, self.pending.as_ref()))?;
384        self.pending = None;
385        Poll::Ready(Ok(()))
386    }
387
388    /// Consumes `self`, returning a future that returns `self` back once it is
389    /// ready to send a request.
390    ///
391    /// This function should be called before calling `send_request`.
392    ///
393    /// This is a functional combinator for [`poll_ready`]. The returned future
394    /// will call `SendRequest::poll_ready` until it returns `Ready`, then yield `self` to
395    /// the caller.
396    ///
397    /// # Examples
398    ///
399    /// ```rust
400    /// # use http2::client::*;
401    /// # use http::*;
402    /// # async fn doc(send_request: SendRequest<&'static [u8]>)
403    /// # {
404    /// // First, wait until the `send_request` handle is ready to send a new
405    /// // request
406    /// let mut send_request = send_request.ready().await.unwrap();
407    /// // Use `send_request` here.
408    /// # }
409    /// # pub fn main() {}
410    /// ```
411    ///
412    /// See [module] level docs for more details.
413    ///
414    /// [`poll_ready`]: #method.poll_ready
415    /// [module]: index.html
416    pub fn ready(self) -> ReadySendRequest<B> {
417        ReadySendRequest { inner: Some(self) }
418    }
419
420    /// Sends an HTTP/2 request to the server.
421    ///
422    /// `send_request` initializes a new HTTP/2 stream on the associated
423    /// connection, then sends the given request using this new stream. Only the
424    /// request head is sent.
425    ///
426    /// On success, a [`ResponseFuture`] instance and [`SendStream`] instance
427    /// are returned. The [`ResponseFuture`] instance is used to get the
428    /// server's response and the [`SendStream`] instance is used to send a
429    /// request body or trailers to the server over the same HTTP/2 stream.
430    ///
431    /// To send a request body or trailers, set `end_of_stream` to `false`.
432    /// Then, use the returned [`SendStream`] instance to stream request body
433    /// chunks or send trailers. If `end_of_stream` is set to `true`,
434    /// then attempting to call [`SendStream::send_data`] or
435    /// [`SendStream::send_trailers`] will result in an error.
436    ///
437    /// If no request body or trailers are to be sent, set `end_of_stream` to
438    /// `true` and drop the returned [`SendStream`] instance.
439    ///
440    /// # A note on HTTP versions
441    ///
442    /// The provided `Request` will be encoded differently depending on the
443    /// value of its version field. If the version is set to 2.0, then the
444    /// request is encoded as per the specification recommends.
445    ///
446    /// If the version is set to a lower value, then the request is encoded to
447    /// preserve the characteristics of HTTP 1.1 and lower. Specifically, host
448    /// headers are permitted and the `:authority` pseudo header is not
449    /// included.
450    ///
451    /// The caller should always set the request's version field to 2.0 unless
452    /// specifically transmitting an HTTP 1.1 request over 2.0.
453    ///
454    /// # Examples
455    ///
456    /// Sending a request with no body
457    ///
458    /// ```rust
459    /// # use http2::client::*;
460    /// # use http::*;
461    /// # async fn doc(send_request: SendRequest<&'static [u8]>)
462    /// # {
463    /// // First, wait until the `send_request` handle is ready to send a new
464    /// // request
465    /// let mut send_request = send_request.ready().await.unwrap();
466    /// // Prepare the HTTP request to send to the server.
467    /// let request = Request::get("https://www.example.com/")
468    ///     .body(())
469    ///     .unwrap();
470    ///
471    /// // Send the request to the server. Since we are not sending a
472    /// // body or trailers, we can drop the `SendStream` instance.
473    /// let (response, _) = send_request.send_request(request, true).unwrap();
474    /// let response = response.await.unwrap();
475    /// // Process the response
476    /// # }
477    /// # pub fn main() {}
478    /// ```
479    ///
480    /// Sending a request with a body and trailers
481    ///
482    /// ```rust
483    /// # use http2::client::*;
484    /// # use http::*;
485    /// # async fn doc(send_request: SendRequest<&'static [u8]>)
486    /// # {
487    /// // First, wait until the `send_request` handle is ready to send a new
488    /// // request
489    /// let mut send_request = send_request.ready().await.unwrap();
490    ///
491    /// // Prepare the HTTP request to send to the server.
492    /// let request = Request::get("https://www.example.com/")
493    ///     .body(())
494    ///     .unwrap();
495    ///
496    /// // Send the request to the server. Since we will send a body
497    /// // and trailers, we keep the `SendStream` instance.
498    /// let (response, mut send_stream) = send_request
499    ///     .send_request(request, false).unwrap();
500    ///
501    /// // At this point, one option would be to wait for send capacity.
502    /// // Doing so would allow us to not hold data in memory that
503    /// // cannot be sent. However, this is not a requirement, so this
504    /// // example will skip that step. See `SendStream` documentation
505    /// // for more details.
506    /// send_stream.send_data(b"hello", false).unwrap();
507    /// send_stream.send_data(b"world", false).unwrap();
508    ///
509    /// // Send the trailers.
510    /// let mut trailers = HeaderMap::new();
511    /// trailers.insert(
512    ///     header::HeaderName::from_bytes(b"my-trailer").unwrap(),
513    ///     header::HeaderValue::from_bytes(b"hello").unwrap());
514    ///
515    /// send_stream.send_trailers(trailers).unwrap();
516    ///
517    /// let response = response.await.unwrap();
518    /// // Process the response
519    /// # }
520    /// # pub fn main() {}
521    /// ```
522    ///
523    /// [`ResponseFuture`]: struct.ResponseFuture.html
524    /// [`SendStream`]: ../struct.SendStream.html
525    /// [`SendStream::send_data`]: ../struct.SendStream.html#method.send_data
526    /// [`SendStream::send_trailers`]: ../struct.SendStream.html#method.send_trailers
527    pub fn send_request(
528        &mut self,
529        request: Request<()>,
530        end_of_stream: bool,
531    ) -> Result<(ResponseFuture, SendStream<B>), crate::Error> {
532        self.inner
533            .send_request(request, end_of_stream, self.pending.as_ref())
534            .map_err(Into::into)
535            .map(|(stream, is_full)| {
536                if stream.is_pending_open() && is_full {
537                    // Only prevent the caller from sending another request when the
538                    // request queue is full.
539                    self.pending = Some(stream.clone_to_opaque());
540                }
541
542                let response = ResponseFuture {
543                    inner: stream.clone_to_opaque(),
544                    push_promise_consumed: false,
545                };
546
547                let stream = SendStream::new(stream);
548
549                (response, stream)
550            })
551    }
552
553    /// Returns whether the [extended CONNECT protocol][1] is enabled or not.
554    ///
555    /// This setting is configured by the server peer by sending the
556    /// [`SETTINGS_ENABLE_CONNECT_PROTOCOL` parameter][2] in a `SETTINGS` frame.
557    /// This method returns the currently acknowledged value received from the
558    /// remote.
559    ///
560    /// [1]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
561    /// [2]: https://datatracker.ietf.org/doc/html/rfc8441#section-3
562    pub fn is_extended_connect_protocol_enabled(&self) -> bool {
563        self.inner.is_extended_connect_protocol_enabled()
564    }
565
566    /// Returns the current maximum number of concurrent locally initiated (send) streams.
567    pub fn current_max_send_streams(&self) -> usize {
568        self.inner.current_max_send_streams()
569    }
570
571    /// Returns the current maximum number of concurrent remotely initiated (receive) streams.
572    pub fn current_max_recv_streams(&self) -> usize {
573        self.inner.current_max_recv_streams()
574    }
575}
576
577impl<B> fmt::Debug for SendRequest<B>
578where
579    B: Buf,
580{
581    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
582        fmt.debug_struct("SendRequest").finish()
583    }
584}
585
586impl<B> Clone for SendRequest<B>
587where
588    B: Buf,
589{
590    fn clone(&self) -> Self {
591        SendRequest {
592            inner: self.inner.clone(),
593            pending: None,
594        }
595    }
596}
597
598#[cfg(feature = "unstable")]
599impl<B> SendRequest<B>
600where
601    B: Buf,
602{
603    /// Returns the number of active streams.
604    ///
605    /// An active stream is a stream that has not yet transitioned to a closed
606    /// state.
607    pub fn num_active_streams(&self) -> usize {
608        self.inner.num_active_streams()
609    }
610
611    /// Returns the number of streams that are held in memory.
612    ///
613    /// A wired stream is a stream that is either active or is closed but must
614    /// stay in memory for some reason. For example, there are still outstanding
615    /// userspace handles pointing to the slot.
616    pub fn num_wired_streams(&self) -> usize {
617        self.inner.num_wired_streams()
618    }
619}
620
621// ===== impl ReadySendRequest =====
622
623impl<B> Future for ReadySendRequest<B>
624where
625    B: Buf,
626{
627    type Output = Result<SendRequest<B>, crate::Error>;
628
629    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
630        match &mut self.inner {
631            Some(send_request) => {
632                ready!(send_request.poll_ready(cx))?;
633            }
634            None => panic!("called `poll` after future completed"),
635        }
636
637        Poll::Ready(Ok(self.inner.take().unwrap()))
638    }
639}
640
641// ===== impl Builder =====
642
643impl Builder {
644    /// Returns a new client builder instance initialized with default
645    /// configuration values.
646    ///
647    /// Configuration methods can be chained on the return value.
648    ///
649    /// # Examples
650    ///
651    /// ```
652    /// # use tokio::io::{AsyncRead, AsyncWrite};
653    /// # use http2::client::*;
654    /// # use bytes::Bytes;
655    /// #
656    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
657    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), http2::Error>
658    /// # {
659    /// // `client_fut` is a future representing the completion of the HTTP/2
660    /// // handshake.
661    /// let client_fut = Builder::new()
662    ///     .initial_window_size(1_000_000)
663    ///     .max_concurrent_streams(1000)
664    ///     .handshake(my_io);
665    /// # client_fut.await
666    /// # }
667    /// #
668    /// # pub fn main() {}
669    /// ```
670    pub fn new() -> Builder {
671        Builder {
672            max_send_buffer_size: proto::DEFAULT_MAX_SEND_BUFFER_SIZE,
673            reset_stream_duration: Duration::from_secs(proto::DEFAULT_RESET_STREAM_SECS),
674            reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX,
675            pending_accept_reset_stream_max: proto::DEFAULT_REMOTE_RESET_STREAM_MAX,
676            initial_target_connection_window_size: None,
677            initial_max_send_streams: usize::MAX,
678            settings: Default::default(),
679            stream_id: 1.into(),
680            local_max_error_reset_streams: Some(proto::DEFAULT_LOCAL_RESET_COUNT_MAX),
681            headers_pseudo_order: None,
682            headers_stream_dependency: None,
683            priorities: None,
684        }
685    }
686
687    /// Indicates the initial window size (in octets) for stream-level
688    /// flow control for received data.
689    ///
690    /// The initial window of a stream is used as part of flow control. For more
691    /// details, see [`FlowControl`].
692    ///
693    /// The default value is 65,535.
694    ///
695    /// [`FlowControl`]: ../struct.FlowControl.html
696    ///
697    /// # Examples
698    ///
699    /// ```
700    /// # use tokio::io::{AsyncRead, AsyncWrite};
701    /// # use http2::client::*;
702    /// # use bytes::Bytes;
703    /// #
704    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
705    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), http2::Error>
706    /// # {
707    /// // `client_fut` is a future representing the completion of the HTTP/2
708    /// // handshake.
709    /// let client_fut = Builder::new()
710    ///     .initial_window_size(1_000_000)
711    ///     .handshake(my_io);
712    /// # client_fut.await
713    /// # }
714    /// #
715    /// # pub fn main() {}
716    /// ```
717    pub fn initial_window_size(&mut self, size: u32) -> &mut Self {
718        self.settings.set_initial_window_size(Some(size));
719        self
720    }
721
722    /// Indicates the initial window size (in octets) for connection-level flow control
723    /// for received data.
724    ///
725    /// The initial window of a connection is used as part of flow control. For more details,
726    /// see [`FlowControl`].
727    ///
728    /// The default value is 65,535.
729    ///
730    /// [`FlowControl`]: ../struct.FlowControl.html
731    ///
732    /// # Examples
733    ///
734    /// ```
735    /// # use tokio::io::{AsyncRead, AsyncWrite};
736    /// # use http2::client::*;
737    /// # use bytes::Bytes;
738    /// #
739    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
740    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), http2::Error>
741    /// # {
742    /// // `client_fut` is a future representing the completion of the HTTP/2
743    /// // handshake.
744    /// let client_fut = Builder::new()
745    ///     .initial_connection_window_size(1_000_000)
746    ///     .handshake(my_io);
747    /// # client_fut.await
748    /// # }
749    /// #
750    /// # pub fn main() {}
751    /// ```
752    pub fn initial_connection_window_size(&mut self, size: u32) -> &mut Self {
753        self.initial_target_connection_window_size = Some(size);
754        self
755    }
756
757    /// Indicates the size (in octets) of the largest HTTP/2 frame payload that the
758    /// configured client is able to accept.
759    ///
760    /// The sender may send data frames that are **smaller** than this value,
761    /// but any data larger than `max` will be broken up into multiple `DATA`
762    /// frames.
763    ///
764    /// The value **must** be between 16,384 and 16,777,215. The default value is 16,384.
765    ///
766    /// # Examples
767    ///
768    /// ```
769    /// # use tokio::io::{AsyncRead, AsyncWrite};
770    /// # use http2::client::*;
771    /// # use bytes::Bytes;
772    /// #
773    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
774    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), http2::Error>
775    /// # {
776    /// // `client_fut` is a future representing the completion of the HTTP/2
777    /// // handshake.
778    /// let client_fut = Builder::new()
779    ///     .max_frame_size(1_000_000)
780    ///     .handshake(my_io);
781    /// # client_fut.await
782    /// # }
783    /// #
784    /// # pub fn main() {}
785    /// ```
786    ///
787    /// # Panics
788    ///
789    /// This function panics if `max` is not within the legal range specified
790    /// above.
791    pub fn max_frame_size(&mut self, max: u32) -> &mut Self {
792        self.settings.set_max_frame_size(Some(max));
793        self
794    }
795
796    /// Sets the max size of received header frames.
797    ///
798    /// This advisory setting informs a peer of the maximum size of header list
799    /// that the sender is prepared to accept, in octets. The value is based on
800    /// the uncompressed size of header fields, including the length of the name
801    /// and value in octets plus an overhead of 32 octets for each header field.
802    ///
803    /// This setting is also used to limit the maximum amount of data that is
804    /// buffered to decode HEADERS frames.
805    ///
806    /// # Examples
807    ///
808    /// ```
809    /// # use tokio::io::{AsyncRead, AsyncWrite};
810    /// # use http2::client::*;
811    /// # use bytes::Bytes;
812    /// #
813    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
814    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), http2::Error>
815    /// # {
816    /// // `client_fut` is a future representing the completion of the HTTP/2
817    /// // handshake.
818    /// let client_fut = Builder::new()
819    ///     .max_header_list_size(16 * 1024)
820    ///     .handshake(my_io);
821    /// # client_fut.await
822    /// # }
823    /// #
824    /// # pub fn main() {}
825    /// ```
826    pub fn max_header_list_size(&mut self, max: u32) -> &mut Self {
827        self.settings.set_max_header_list_size(Some(max));
828        self
829    }
830
831    /// Sets the maximum number of concurrent streams.
832    ///
833    /// The maximum concurrent streams setting only controls the maximum number
834    /// of streams that can be initiated by the remote peer. In other words,
835    /// when this setting is set to 100, this does not limit the number of
836    /// concurrent streams that can be created by the caller.
837    ///
838    /// It is recommended that this value be no smaller than 100, so as to not
839    /// unnecessarily limit parallelism. However, any value is legal, including
840    /// 0. If `max` is set to 0, then the remote will not be permitted to
841    /// initiate streams.
842    ///
843    /// Note that streams in the reserved state, i.e., push promises that have
844    /// been reserved but the stream has not started, do not count against this
845    /// setting.
846    ///
847    /// Also note that if the remote *does* exceed the value set here, it is not
848    /// a protocol level error. Instead, the `http2` library will immediately reset
849    /// the stream.
850    ///
851    /// See [Section 5.1.2] in the HTTP/2 spec for more details.
852    ///
853    /// [Section 5.1.2]: https://http2.github.io/http2-spec/#rfc.section.5.1.2
854    ///
855    /// # Examples
856    ///
857    /// ```
858    /// # use tokio::io::{AsyncRead, AsyncWrite};
859    /// # use http2::client::*;
860    /// # use bytes::Bytes;
861    /// #
862    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
863    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), http2::Error>
864    /// # {
865    /// // `client_fut` is a future representing the completion of the HTTP/2
866    /// // handshake.
867    /// let client_fut = Builder::new()
868    ///     .max_concurrent_streams(1000)
869    ///     .handshake(my_io);
870    /// # client_fut.await
871    /// # }
872    /// #
873    /// # pub fn main() {}
874    /// ```
875    pub fn max_concurrent_streams(&mut self, max: u32) -> &mut Self {
876        self.settings.set_max_concurrent_streams(Some(max));
877        self
878    }
879
880    /// Sets the initial maximum number of locally initiated (send) streams.
881    ///
882    /// The initial settings will be overwritten by the remote peer when
883    /// the SETTINGS frame is received. The new value will be set to the
884    /// `max_concurrent_streams()` from the frame. If no value is advertised in
885    /// the initial SETTINGS frame from the remote peer as part of
886    /// [HTTP/2 Connection Preface], `usize::MAX` will be set.
887    ///
888    /// This setting prevents the caller from exceeding this number of
889    /// streams that are counted towards the concurrency limit.
890    ///
891    /// Sending streams past the limit returned by the peer will be treated
892    /// as a stream error of type PROTOCOL_ERROR or REFUSED_STREAM.
893    ///
894    /// See [Section 5.1.2] in the HTTP/2 spec for more details.
895    ///
896    /// The default value is `usize::MAX`.
897    ///
898    /// [HTTP/2 Connection Preface]: https://httpwg.org/specs/rfc9113.html#preface
899    /// [Section 5.1.2]: https://httpwg.org/specs/rfc9113.html#rfc.section.5.1.2
900    ///
901    /// # Examples
902    ///
903    /// ```
904    /// # use tokio::io::{AsyncRead, AsyncWrite};
905    /// # use http2::client::*;
906    /// # use bytes::Bytes;
907    /// #
908    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
909    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), http2::Error>
910    /// # {
911    /// // `client_fut` is a future representing the completion of the HTTP/2
912    /// // handshake.
913    /// let client_fut = Builder::new()
914    ///     .initial_max_send_streams(1000)
915    ///     .handshake(my_io);
916    /// # client_fut.await
917    /// # }
918    /// #
919    /// # pub fn main() {}
920    /// ```
921    pub fn initial_max_send_streams(&mut self, initial: usize) -> &mut Self {
922        self.initial_max_send_streams = initial;
923        self
924    }
925
926    /// Sets the maximum number of concurrent locally reset streams.
927    ///
928    /// When a stream is explicitly reset, the HTTP/2 specification requires
929    /// that any further frames received for that stream must be ignored for
930    /// "some time".
931    ///
932    /// In order to satisfy the specification, internal state must be maintained
933    /// to implement the behavior. This state grows linearly with the number of
934    /// streams that are locally reset.
935    ///
936    /// The `max_concurrent_reset_streams` setting sets an upper
937    /// bound on the amount of state that is maintained. When this max value is
938    /// reached, the oldest reset stream is purged from memory.
939    ///
940    /// Once the stream has been fully purged from memory, any additional frames
941    /// received for that stream will result in a connection level protocol
942    /// error, forcing the connection to terminate.
943    ///
944    /// The default value is currently 50.
945    ///
946    /// # Examples
947    ///
948    /// ```
949    /// # use tokio::io::{AsyncRead, AsyncWrite};
950    /// # use http2::client::*;
951    /// # use bytes::Bytes;
952    /// #
953    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
954    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), http2::Error>
955    /// # {
956    /// // `client_fut` is a future representing the completion of the HTTP/2
957    /// // handshake.
958    /// let client_fut = Builder::new()
959    ///     .max_concurrent_reset_streams(1000)
960    ///     .handshake(my_io);
961    /// # client_fut.await
962    /// # }
963    /// #
964    /// # pub fn main() {}
965    /// ```
966    pub fn max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self {
967        self.reset_stream_max = max;
968        self
969    }
970
971    /// Sets the duration to remember locally reset streams.
972    ///
973    /// When a stream is explicitly reset, the HTTP/2 specification requires
974    /// that any further frames received for that stream must be ignored for
975    /// "some time".
976    ///
977    /// In order to satisfy the specification, internal state must be maintained
978    /// to implement the behavior. This state grows linearly with the number of
979    /// streams that are locally reset.
980    ///
981    /// The `reset_stream_duration` setting configures the max amount of time
982    /// this state will be maintained in memory. Once the duration elapses, the
983    /// stream state is purged from memory.
984    ///
985    /// Once the stream has been fully purged from memory, any additional frames
986    /// received for that stream will result in a connection level protocol
987    /// error, forcing the connection to terminate.
988    ///
989    /// The default value is currently 1 second.
990    ///
991    /// # Examples
992    ///
993    /// ```
994    /// # use tokio::io::{AsyncRead, AsyncWrite};
995    /// # use http2::client::*;
996    /// # use std::time::Duration;
997    /// # use bytes::Bytes;
998    /// #
999    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1000    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), http2::Error>
1001    /// # {
1002    /// // `client_fut` is a future representing the completion of the HTTP/2
1003    /// // handshake.
1004    /// let client_fut = Builder::new()
1005    ///     .reset_stream_duration(Duration::from_secs(10))
1006    ///     .handshake(my_io);
1007    /// # client_fut.await
1008    /// # }
1009    /// #
1010    /// # pub fn main() {}
1011    /// ```
1012    pub fn reset_stream_duration(&mut self, dur: Duration) -> &mut Self {
1013        self.reset_stream_duration = dur;
1014        self
1015    }
1016
1017    /// Sets the maximum number of local resets due to protocol errors made by the remote end.
1018    ///
1019    /// Invalid frames and many other protocol errors will lead to resets being generated for those streams.
1020    /// Too many of these often indicate a misbehaving or malicious peer, and there are attacks which can abuse this to mount a denial of service.
1021    /// This limit protects against such DoS attacks by capping the number of resets we can be forced to generate.
1022    ///
1023    /// When the number of local resets exceeds this threshold, the client will close the connection.
1024    ///
1025    /// If you really want to disable this, supply [`Option::None`] here.
1026    /// Disabling this is not recommended and may expose you to DoS attacks.
1027    ///
1028    /// The default value is currently 1024, but could change.
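    ///
    /// # Examples
    ///
    /// A sketch in the same style as the other builder examples (the value
    /// shown matches the current default):
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use http2::client::*;
    /// # use bytes::Bytes;
    /// #
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), http2::Error>
    /// # {
    /// // `client_fut` is a future representing the completion of the HTTP/2
    /// // handshake.
    /// let client_fut = Builder::new()
    ///     .max_local_error_reset_streams(Some(1024))
    ///     .handshake(my_io);
    /// # client_fut.await
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```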
1029    pub fn max_local_error_reset_streams(&mut self, max: Option<usize>) -> &mut Self {
1030        self.local_max_error_reset_streams = max;
1031        self
1032    }
1033
1034    /// Sets the maximum number of pending-accept remotely-reset streams.
1035    ///
1036    /// Streams that have been received by the peer, but not accepted by the
1037    /// Streams that have been received from the peer, but not accepted by the
1038    /// could send a request and then shortly after, realize it is not needed,
1039    /// sending a CANCEL.
1040    ///
1041    /// However, since those streams are now "closed", they don't count towards
1042    /// the max concurrent streams. So, they will sit in the accept queue,
1043    /// using memory.
1044    ///
1045    /// When the number of remotely-reset streams sitting in the pending-accept
1046    /// queue reaches this maximum value, a connection error with the code of
1047    /// `ENHANCE_YOUR_CALM` will be sent to the peer, and returned by the
1048    /// `Future`.
1049    ///
1050    /// The default value is currently 20, but could change.
1051    ///
1052    /// # Examples
1053    ///
1054    /// ```
1055    /// # use tokio::io::{AsyncRead, AsyncWrite};
1056    /// # use http2::client::*;
1057    /// # use bytes::Bytes;
1058    /// #
1059    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1060    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), http2::Error>
1061    /// # {
1062    /// // `client_fut` is a future representing the completion of the HTTP/2
1063    /// // handshake.
1064    /// let client_fut = Builder::new()
1065    ///     .max_pending_accept_reset_streams(100)
1066    ///     .handshake(my_io);
1067    /// # client_fut.await
1068    /// # }
1069    /// #
1070    /// # pub fn main() {}
1071    /// ```
1072    pub fn max_pending_accept_reset_streams(&mut self, max: usize) -> &mut Self {
1073        self.pending_accept_reset_stream_max = max;
1074        self
1075    }
1076
1077    /// Sets the maximum send buffer size per stream.
1078    ///
1079    /// Once a stream has buffered up to (or over) the maximum, the stream's
1080    /// flow control will not "poll" additional capacity. Once bytes for the
1081    /// stream have been written to the connection, the send buffer capacity
1082    /// will be freed up again.
1083    ///
1084    /// The default is currently ~400KB, but may change.
1085    ///
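    /// # Examples
    ///
    /// A sketch following the other builder examples (the 64 KiB value is
    /// illustrative only):
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use http2::client::*;
    /// # use bytes::Bytes;
    /// #
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), http2::Error>
    /// # {
    /// let client_fut = Builder::new()
    ///     .max_send_buffer_size(64 * 1024)
    ///     .handshake(my_io);
    /// # client_fut.await
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```
    ///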
1086    /// # Panics
1087    ///
1088    /// This function panics if `max` is larger than `u32::MAX`.
1089    pub fn max_send_buffer_size(&mut self, max: usize) -> &mut Self {
1090        assert!(max <= u32::MAX as usize);
1091        self.max_send_buffer_size = max;
1092        self
1093    }
1094
1095    /// Enables or disables server push promises.
1096    ///
1097    /// This value is included in the initial SETTINGS handshake.
1098    /// Setting this value to value to
1099    /// Setting this value to `false` in the initial SETTINGS
1100    /// handshake guarantees that the remote server
1101    ///
1102    /// This setting can be changed during the life of a single HTTP/2
1103    /// connection by sending another settings frame updating the value.
1104    ///
1105    /// Default value: `true`.
1106    ///
1107    /// # Examples
1108    ///
1109    /// ```
1110    /// # use tokio::io::{AsyncRead, AsyncWrite};
1111    /// # use http2::client::*;
1112    /// # use std::time::Duration;
1113    /// # use bytes::Bytes;
1114    /// #
1115    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1116    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), http2::Error>
1117    /// # {
1118    /// // `client_fut` is a future representing the completion of the HTTP/2
1119    /// // handshake.
1120    /// let client_fut = Builder::new()
1121    ///     .enable_push(false)
1122    ///     .handshake(my_io);
1123    /// # client_fut.await
1124    /// # }
1125    /// #
1126    /// # pub fn main() {}
1127    /// ```
1128    pub fn enable_push(&mut self, enabled: bool) -> &mut Self {
1129        self.settings.set_enable_push(enabled);
1130        self
1131    }
1132
1133    /// Sets the header table size.
1134    ///
1135    /// This setting informs the peer of the maximum size of the header compression
1136    /// table used to encode header blocks, in octets. The encoder may select any value
1137    /// equal to or less than the header table size specified by the sender.
1138    ///
1139    /// The default value is 4,096.
1140    ///
1141    /// # Examples
1142    ///
1143    /// ```
1144    /// # use tokio::io::{AsyncRead, AsyncWrite};
1145    /// # use http2::client::*;
1146    /// # use bytes::Bytes;
1147    /// #
1148    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1149    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), http2::Error>
1150    /// # {
1151    /// // `client_fut` is a future representing the completion of the HTTP/2
1152    /// // handshake.
1153    /// let client_fut = Builder::new()
1154    ///     .header_table_size(1_000_000)
1155    ///     .handshake(my_io);
1156    /// # client_fut.await
1157    /// # }
1158    /// #
1159    /// # pub fn main() {}
1160    /// ```
1161    pub fn header_table_size(&mut self, size: u32) -> &mut Self {
1162        self.settings.set_header_table_size(Some(size));
1163        self
1164    }
1165
1166    /// Enables or disables the extended CONNECT protocol (RFC 8441).
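    ///
    /// When enabled, the `SETTINGS_ENABLE_CONNECT_PROTOCOL` parameter is
    /// included in the client's initial SETTINGS frame. A sketch in the style
    /// of the other builder examples:
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use http2::client::*;
    /// # use bytes::Bytes;
    /// #
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), http2::Error>
    /// # {
    /// let client_fut = Builder::new()
    ///     .enable_connect_protocol(true)
    ///     .handshake(my_io);
    /// # client_fut.await
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```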
1167    pub fn enable_connect_protocol(&mut self, enabled: bool) -> &mut Self {
1168        self.settings
1169            .set_enable_connect_protocol(Some(enabled as _));
1170        self
1171    }
1172
1173    /// Disables RFC 7540 stream priorities when set to `true`.
1174    /// See [RFC 9218](https://www.rfc-editor.org/rfc/rfc9218.html#section-2.1) for background.
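    ///
    /// A sketch following the other builder examples:
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use http2::client::*;
    /// # use bytes::Bytes;
    /// #
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), http2::Error>
    /// # {
    /// let client_fut = Builder::new()
    ///     .no_rfc7540_priorities(true)
    ///     .handshake(my_io);
    /// # client_fut.await
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```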
1175    pub fn no_rfc7540_priorities(&mut self, enabled: bool) -> &mut Self {
1176        self.settings.set_no_rfc7540_priorities(enabled);
1177        self
1178    }
1179
1180    /// Configures custom experimental HTTP/2 settings.
1181    ///
1182    /// These settings are reserved for future use or experimental purposes.
1183    /// Enabling or disabling them may have no effect unless explicitly supported
1184    /// by the server or client implementation.
1185    //
1186    // - Experimental feature – subject to removal without notice
1187    #[cfg(feature = "unstable")]
1188    pub fn experimental_settings(
1189        &mut self,
1190        experimental_settings: ExperimentalSettings,
1191    ) -> &mut Self {
1192        self.settings
1193            .set_experimental_settings(experimental_settings);
1194        self
1195    }
1196
1197    /// Sets the first stream ID to something other than 1.
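    ///
    /// A sketch (the `unstable` feature must be enabled):
    ///
    /// ```
    /// # use http2::client::Builder;
    /// let mut builder = Builder::new();
    /// // Start client-initiated streams at 3 instead of 1 (must be odd).
    /// builder.initial_stream_id(3);
    /// ```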
1198    #[cfg(feature = "unstable")]
1199    pub fn initial_stream_id(&mut self, stream_id: u32) -> &mut Self {
1200        self.stream_id = stream_id.into();
1201        assert!(
1202            self.stream_id.is_client_initiated(),
1203            "stream id must be odd"
1204        );
1205        self
1206    }
1207
1208    /// Sets the order of settings parameters in the initial SETTINGS frame.
1209    ///
1210    /// This determines the order in which settings are sent during the HTTP/2 handshake.
1211    /// Customizing the order may be useful for testing or protocol compliance.
1212    pub fn settings_order(&mut self, order: SettingsOrder) -> &mut Self {
1213        self.settings.set_settings_order(order);
1214        self
1215    }
1216
1217    /// Sets the HTTP/2 pseudo-header field order for outgoing HEADERS frames.
1218    ///
1219    /// This determines the order in which pseudo-header fields (such as `:method`, `:scheme`, etc.)
1220    /// are encoded in the HEADERS frame. Customizing the order may be useful for interoperability
1221    /// or testing purposes.
1222    pub fn headers_pseudo_order(&mut self, order: PseudoOrder) -> &mut Self {
1223        self.headers_pseudo_order = Some(order.into());
1224        self
1225    }
1226
1227    /// Sets the stream dependency and weight for the outgoing HEADERS frame.
1228    ///
1229    /// This configures the priority of the stream by specifying its dependency and weight,
1230    /// as defined by the HTTP/2 priority mechanism. This can be used to influence how the
1231    /// server allocates resources to this stream relative to others.
1232    pub fn headers_stream_dependency(&mut self, stream_dependency: StreamDependency) -> &mut Self {
1233        self.headers_stream_dependency = Some(stream_dependency);
1234        self
1235    }
1236
1237    /// Sets the list of PRIORITY frames to be sent immediately after the connection is established,
1238    /// but before the first request is sent.
1239    ///
1240    /// This allows you to pre-configure the HTTP/2 stream dependency tree by specifying a set of
1241    /// PRIORITY frames that will be sent as part of the connection preface. This can be useful for
1242    /// optimizing resource allocation or testing custom stream prioritization strategies.
1243    ///
1244    /// Each `Priority` in the list must have a valid (non-zero) stream ID. Any priority with a
1245    /// stream ID of zero will be ignored.
1246    pub fn priorities(&mut self, priorities: Priorities) -> &mut Self {
1247        self.priorities = Some(priorities);
1248        self
1249    }
1250
1251    /// Creates a new configured HTTP/2 client backed by `io`.
1252    ///
1253    /// It is expected that `io` already be in an appropriate state to commence
1254    /// the [HTTP/2 handshake]. The handshake is completed once both the connection
1255    /// preface and the initial SETTINGS frame are sent by the client.
1256    ///
1257    /// The handshake future does not wait for the initial settings frame from the
1258    /// server.
1259    ///
1260    /// Returns a future which resolves to the [`Connection`] / [`SendRequest`]
1261    /// tuple once the HTTP/2 handshake has been completed.
1262    ///
1263    /// This function also allows the caller to configure the send payload data
1264    /// type. See [Outbound data type] for more details.
1265    ///
1266    /// [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
1267    /// [`Connection`]: struct.Connection.html
1268    /// [`SendRequest`]: struct.SendRequest.html
1269    /// [Outbound data type]: ../index.html#outbound-data-type
1270    ///
1271    /// # Examples
1272    ///
1273    /// Basic usage:
1274    ///
1275    /// ```
1276    /// # use tokio::io::{AsyncRead, AsyncWrite};
1277    /// # use http2::client::*;
1278    /// # use bytes::Bytes;
1279    /// #
1280    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1281    ///     -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), http2::Error>
1282    /// # {
1283    /// // `client_fut` is a future representing the completion of the HTTP/2
1284    /// // handshake.
1285    /// let client_fut = Builder::new()
1286    ///     .handshake(my_io);
1287    /// # client_fut.await
1288    /// # }
1289    /// #
1290    /// # pub fn main() {}
1291    /// ```
1292    ///
1293    /// Configures the send-payload data type. In this case, the outbound data
1294    /// type will be `&'static [u8]`.
1295    ///
1296    /// ```
1297    /// # use tokio::io::{AsyncRead, AsyncWrite};
1298    /// # use http2::client::*;
1299    /// #
1300    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1301    /// # -> Result<((SendRequest<&'static [u8]>, Connection<T, &'static [u8]>)), http2::Error>
1302    /// # {
1303    /// // `client_fut` is a future representing the completion of the HTTP/2
1304    /// // handshake.
1305    /// let client_fut = Builder::new()
1306    ///     .handshake::<_, &'static [u8]>(my_io);
1307    /// # client_fut.await
1308    /// # }
1309    /// #
1310    /// # pub fn main() {}
1311    /// ```
1312    pub fn handshake<T, B>(
1313        &self,
1314        io: T,
1315    ) -> impl Future<Output = Result<(SendRequest<B>, Connection<T, B>), crate::Error>>
1316    where
1317        T: AsyncRead + AsyncWrite + Unpin,
1318        B: Buf,
1319    {
1320        Connection::handshake2(io, self.clone())
1321    }
1322}
1323
1324impl Default for Builder {
1325    fn default() -> Builder {
1326        Builder::new()
1327    }
1328}
1329
1330/// Creates a new configured HTTP/2 client with default configuration
1331/// values backed by `io`.
1332///
1333/// It is expected that `io` already be in an appropriate state to commence
1334/// the [HTTP/2 handshake]. See [Handshake] for more details.
1335///
1336/// Returns a future which resolves to the [`Connection`] / [`SendRequest`]
1337/// tuple once the HTTP/2 handshake has been completed. The returned
1338/// [`Connection`] instance will be using default configuration values. Use
1339/// [`Builder`] to customize the configuration values used by a [`Connection`]
1340/// instance.
1341///
1342/// [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
1343/// [Handshake]: ../index.html#handshake
1344/// [`Connection`]: struct.Connection.html
1345/// [`SendRequest`]: struct.SendRequest.html
1346///
1347/// # Examples
1348///
1349/// ```
1350/// # use tokio::io::{AsyncRead, AsyncWrite};
1351/// # use http2::client;
1352/// # use http2::client::*;
1353/// #
1354/// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) -> Result<(), http2::Error>
1355/// # {
1356/// let (send_request, connection) = client::handshake(my_io).await?;
1357/// // The HTTP/2 handshake has completed, now start polling
1358/// // `connection` and use `send_request` to send requests to the
1359/// // server.
1360/// # Ok(())
1361/// # }
1362/// #
1363/// # pub fn main() {}
1364/// ```
1365pub async fn handshake<T>(io: T) -> Result<(SendRequest<Bytes>, Connection<T, Bytes>), crate::Error>
1366where
1367    T: AsyncRead + AsyncWrite + Unpin,
1368{
1369    let builder = Builder::new();
1370
1371    #[cfg(feature = "tracing")]
1372    return builder
1373        .handshake(io)
1374        .instrument(::tracing::trace_span!("client_handshake"))
1375        .await;
1376
1377    #[cfg(not(feature = "tracing"))]
1378    return builder.handshake(io).await;
1379}
1380
1381// ===== impl Connection =====
1382
1383async fn bind_connection<T>(io: &mut T) -> Result<(), crate::Error>
1384where
1385    T: AsyncRead + AsyncWrite + Unpin,
1386{
1387    tracing::debug!("binding client connection");
1388
1389    let msg: &'static [u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
1390    io.write_all(msg).await.map_err(crate::Error::from_io)?;
1391
1392    tracing::debug!("client connection bound");
1393
1394    Ok(())
1395}

impl<T, B> Connection<T, B>
where
    T: AsyncRead + AsyncWrite + Unpin,
    B: Buf,
{
    async fn handshake2(
        mut io: T,
        builder: Builder,
    ) -> Result<(SendRequest<B>, Connection<T, B>), crate::Error> {
        bind_connection(&mut io).await?;

        // Create the codec
        let mut codec = Codec::new(io);

        if let Some(max) = builder.settings.max_frame_size() {
            codec.set_max_recv_frame_size(max as usize);
        }

        if let Some(max) = builder.settings.max_header_list_size() {
            codec.set_max_recv_header_list_size(max as usize);
        }

        // Send initial settings frame
        codec
            .buffer((builder.settings.clone()).into())
            .expect("invalid SETTINGS frame");

        let inner = proto::Connection::new(
            codec,
            proto::Config {
                next_stream_id: builder.stream_id,
                initial_max_send_streams: builder.initial_max_send_streams,
                max_send_buffer_size: builder.max_send_buffer_size,
                reset_stream_duration: builder.reset_stream_duration,
                reset_stream_max: builder.reset_stream_max,
                remote_reset_stream_max: builder.pending_accept_reset_stream_max,
                local_error_reset_streams_max: builder.local_max_error_reset_streams,
                settings: builder.settings.clone(),
                headers_pseudo_order: builder.headers_pseudo_order,
                headers_stream_dependency: builder.headers_stream_dependency,
                priorities: builder.priorities,
            },
        );
        let send_request = SendRequest {
            inner: inner.streams().clone(),
            pending: None,
        };

        let mut connection = Connection { inner };
        if let Some(sz) = builder.initial_target_connection_window_size {
            connection.set_target_window_size(sz);
        }

        Ok((send_request, connection))
    }

    /// Sets the target window size for the whole connection.
    ///
    /// If `size` is greater than the current value, then a `WINDOW_UPDATE`
    /// frame will be immediately sent to the remote, increasing the
    /// connection-level window by `size - current_value`.
    ///
    /// If `size` is less than the current value, nothing will happen
    /// immediately. However, as window capacity is released by
    /// [`FlowControl`] instances, no `WINDOW_UPDATE` frames will be sent
    /// out until the number of "in flight" bytes drops below `size`.
    ///
    /// The default value is 65,535.
    ///
    /// See [`FlowControl`] documentation for more details.
    ///
    /// [`FlowControl`]: ../struct.FlowControl.html
    /// [library level]: ../index.html#flow-control
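    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming `my_io` is an already-established connection
    /// that implements `AsyncRead + AsyncWrite + Unpin`:
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use http2::client;
    /// #
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) -> Result<(), http2::Error>
    /// # {
    /// let (_send_request, mut connection) = client::handshake(my_io).await?;
    /// // Grow the connection-level receive window to 1 MiB.
    /// connection.set_target_window_size(1024 * 1024);
    /// # Ok(())
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```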
    pub fn set_target_window_size(&mut self, size: u32) {
        assert!(size <= proto::MAX_WINDOW_SIZE);
        self.inner.set_target_window_size(size);
    }

    /// Sets a new `INITIAL_WINDOW_SIZE` setting (in octets) for stream-level
    /// flow control for received data.
    ///
    /// The `SETTINGS` will be sent to the remote, and only applied once the
    /// remote acknowledges the change.
    ///
    /// This can be used to increase or decrease the window size for existing
    /// streams.
    ///
    /// # Errors
    ///
    /// Returns an error if a previous call is still pending acknowledgement
    /// from the remote endpoint.
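    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming `my_io` is an already-established connection
    /// that implements `AsyncRead + AsyncWrite + Unpin`:
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use http2::client;
    /// #
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) -> Result<(), http2::Error>
    /// # {
    /// let (_send_request, mut connection) = client::handshake(my_io).await?;
    /// // Ask for a 1 MiB per-stream receive window; the new value only takes
    /// // effect once the server acknowledges the SETTINGS frame.
    /// connection.set_initial_window_size(1024 * 1024)?;
    /// # Ok(())
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```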
    pub fn set_initial_window_size(&mut self, size: u32) -> Result<(), crate::Error> {
        assert!(size <= proto::MAX_WINDOW_SIZE);
        self.inner.set_initial_window_size(size)?;
        Ok(())
    }

    /// Takes a `PingPong` instance from the connection.
    ///
    /// # Note
    ///
    /// This may only be called once. Calling multiple times will return `None`.
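    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming `my_io` is an already-established connection
    /// that implements `AsyncRead + AsyncWrite + Unpin`:
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use http2::client;
    /// #
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) -> Result<(), http2::Error>
    /// # {
    /// let (_send_request, mut connection) = client::handshake(my_io).await?;
    /// // The first call yields the `PingPong` handle...
    /// let _ping_pong = connection.ping_pong().expect("first call returns Some");
    /// // ...and any later call returns `None`.
    /// assert!(connection.ping_pong().is_none());
    /// # Ok(())
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```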
    pub fn ping_pong(&mut self) -> Option<PingPong> {
        self.inner.take_user_pings().map(PingPong::new)
    }

    /// Returns the maximum number of concurrent streams that may be initiated
    /// by this client.
    ///
    /// This limit is configured by the server peer by sending the
    /// [`SETTINGS_MAX_CONCURRENT_STREAMS` parameter][1] in a `SETTINGS` frame.
    /// This method returns the currently acknowledged value received from the
    /// remote.
    ///
    /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2
    pub fn max_concurrent_send_streams(&self) -> usize {
        self.inner.max_send_streams()
    }

    /// Returns the maximum number of concurrent streams that may be initiated
    /// by the server on this connection.
    ///
    /// This returns the value of the [`SETTINGS_MAX_CONCURRENT_STREAMS`
    /// parameter][1] sent in a `SETTINGS` frame that has been
    /// acknowledged by the remote peer. The value to be sent is configured by
    /// the [`Builder::max_concurrent_streams`][2] method before handshaking
    /// with the remote peer.
    ///
    /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2
    /// [2]: ../struct.Builder.html#method.max_concurrent_streams
    pub fn max_concurrent_recv_streams(&self) -> usize {
        self.inner.max_recv_streams()
    }
}

impl<T, B> Future for Connection<T, B>
where
    T: AsyncRead + AsyncWrite + Unpin,
    B: Buf,
{
    type Output = Result<(), crate::Error>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.inner.maybe_close_connection_if_no_streams();
        let had_streams_or_refs = self.inner.has_streams_or_other_references();
        let result = self.inner.poll(cx).map_err(Into::into);
        // if we had streams/refs, and don't anymore, wake up one more time to
        // ensure proper shutdown
        if result.is_pending()
            && had_streams_or_refs
            && !self.inner.has_streams_or_other_references()
        {
            tracing::trace!("last stream closed during poll, wake again");
            cx.waker().wake_by_ref();
        }
        result
    }
}

impl<T, B> fmt::Debug for Connection<T, B>
where
    T: AsyncRead + AsyncWrite,
    T: fmt::Debug,
    B: fmt::Debug + Buf,
{
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        fmt::Debug::fmt(&self.inner, fmt)
    }
}

// ===== impl ResponseFuture =====

impl Future for ResponseFuture {
    type Output = Result<Response<RecvStream>, crate::Error>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let (parts, _) = ready!(self.inner.poll_response(cx))?.into_parts();
        let body = RecvStream::new(FlowControl::new(self.inner.clone()));

        Poll::Ready(Ok(Response::from_parts(parts, body)))
    }
}

impl ResponseFuture {
    /// Returns the stream ID of the response stream.
    ///
    /// # Panics
    ///
    /// If the lock on the stream store has been poisoned.
    pub fn stream_id(&self) -> crate::StreamId {
        crate::StreamId::from_internal(self.inner.stream_id())
    }

    /// Polls for informational responses (1xx status codes).
    ///
    /// This method should be called before polling the main response future
    /// to check for any informational responses that have been received.
    ///
    /// Returns `Poll::Ready(Some(response))` if an informational response is available,
    /// `Poll::Ready(None)` if no more informational responses are expected,
    /// or `Poll::Pending` if no informational response is currently available.
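    ///
    /// # Examples
    ///
    /// A minimal sketch of draining 1xx responses with `std::future::poll_fn`
    /// before awaiting the final response:
    ///
    /// ```
    /// # async fn doc(mut response_future: http2::client::ResponseFuture) -> Result<(), http2::Error>
    /// # {
    /// while let Some(informational) =
    ///     std::future::poll_fn(|cx| response_future.poll_informational(cx)).await
    /// {
    ///     println!("informational status: {}", informational?.status());
    /// }
    ///
    /// // All informational responses have been received; await the final one.
    /// let response = response_future.await?;
    /// println!("final status: {}", response.status());
    /// # Ok(())
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```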
    pub fn poll_informational(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Response<()>, crate::Error>>> {
        self.inner.poll_informational(cx).map_err(Into::into)
    }

    /// Returns a stream of push promises.
    ///
    /// # Panics
    ///
    /// If this method has been called before, or if the stream was itself
    /// pushed.
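    ///
    /// # Examples
    ///
    /// A minimal sketch: take the push promise stream before awaiting the
    /// response itself.
    ///
    /// ```
    /// # async fn doc(mut response_future: http2::client::ResponseFuture) -> Result<(), http2::Error>
    /// # {
    /// // Taking the stream a second time would panic.
    /// let push_promises = response_future.push_promises();
    /// let response = response_future.await?;
    /// # Ok(())
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```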
    pub fn push_promises(&mut self) -> PushPromises {
        if self.push_promise_consumed {
            panic!("Reference to push promises stream taken!");
        }
        self.push_promise_consumed = true;
        PushPromises {
            inner: self.inner.clone(),
        }
    }
}

// ===== impl PushPromises =====

impl PushPromises {
    /// Get the next `PushPromise`.
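    ///
    /// Returns `None` once the stream of push promises has ended.
    ///
    /// # Examples
    ///
    /// A minimal sketch of looping over pushed requests and awaiting their
    /// responses:
    ///
    /// ```
    /// # async fn doc(mut push_promises: http2::client::PushPromises) -> Result<(), http2::Error>
    /// # {
    /// while let Some(push) = push_promises.push_promise().await {
    ///     let (request, response_future) = push?.into_parts();
    ///     println!("pushed request: {}", request.uri());
    ///     let _response = response_future.await?;
    /// }
    /// # Ok(())
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```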
    pub async fn push_promise(&mut self) -> Option<Result<PushPromise, crate::Error>> {
        crate::poll_fn(move |cx| self.poll_push_promise(cx)).await
    }

    #[doc(hidden)]
    pub fn poll_push_promise(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<PushPromise, crate::Error>>> {
        match self.inner.poll_pushed(cx) {
            Poll::Ready(Some(Ok((request, response)))) => {
                let response = PushedResponseFuture {
                    inner: ResponseFuture {
                        inner: response,
                        push_promise_consumed: false,
                    },
                };
                Poll::Ready(Some(Ok(PushPromise { request, response })))
            }
            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e.into()))),
            Poll::Ready(None) => Poll::Ready(None),
            Poll::Pending => Poll::Pending,
        }
    }
}

#[cfg(feature = "stream")]
impl futures_core::Stream for PushPromises {
    type Item = Result<PushPromise, crate::Error>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.poll_push_promise(cx)
    }
}

// ===== impl PushPromise =====

impl PushPromise {
    /// Returns a reference to the push promise's request headers.
    pub fn request(&self) -> &Request<()> {
        &self.request
    }

    /// Returns a mutable reference to the push promise's request headers.
    pub fn request_mut(&mut self) -> &mut Request<()> {
        &mut self.request
    }

    /// Consumes `self`, returning the push promise's request headers and
    /// response future.
    pub fn into_parts(self) -> (Request<()>, PushedResponseFuture) {
        (self.request, self.response)
    }
}

// ===== impl PushedResponseFuture =====

impl Future for PushedResponseFuture {
    type Output = Result<Response<RecvStream>, crate::Error>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        Pin::new(&mut self.inner).poll(cx)
    }
}

impl PushedResponseFuture {
    /// Returns the stream ID of the response stream.
    ///
    /// # Panics
    ///
    /// If the lock on the stream store has been poisoned.
    pub fn stream_id(&self) -> crate::StreamId {
        self.inner.stream_id()
    }
}

// ===== impl Peer =====

impl Peer {
    pub fn convert_send_message(
        id: StreamId,
        request: Request<()>,
        protocol: Option<Protocol>,
        end_of_stream: bool,
        pseudo_order: Option<PseudoOrder>,
        headers_stream_dependency: Option<StreamDependency>,
    ) -> Result<Headers, SendError> {
        use http::request::Parts;

        let (
            Parts {
                method,
                uri,
                headers,
                version,
                ..
            },
            _,
        ) = request.into_parts();

        let is_connect = method == Method::CONNECT;

        // Build the pseudo-header set. All requests will include `method` and
        // `path`.
        let mut pseudo = Pseudo::request(method, uri, protocol);

        // Apply the custom pseudo-header order, if one was configured.
        if let Some(pseudo_order) = pseudo_order {
            pseudo.set_pseudo_order(pseudo_order);
        }

        if pseudo.scheme.is_none() {
            // If the scheme is not set, then there are two options.
            //
            // 1) Authority is not set. In this case, a request was issued with
            //    a relative URI. This is permitted **only** when forwarding
            //    HTTP 1.x requests. If the HTTP version is set to 2.0, then
            //    this is an error.
            //
            // 2) Authority is set, in which case the HTTP method *must* be
            //    CONNECT.
            //
            // It is not possible to have a scheme but not an authority set (the
            // `http` crate does not allow it).
            //
            if pseudo.authority.is_none() {
                if version == Version::HTTP_2 {
                    return Err(UserError::MissingUriSchemeAndAuthority.into());
                } else {
                    // This is acceptable as per the above comment. However,
                    // HTTP/2 requires that a scheme is set. Since we are
                    // forwarding an HTTP 1.1 request, the scheme is set to
                    // "http".
                    pseudo.set_scheme(uri::Scheme::HTTP);
                }
            } else if !is_connect {
                // TODO: Error
            }
        }

        // Create the HEADERS frame
        let mut headers_frame = Headers::new(id, pseudo, headers);
        if let Some(stream_dep) = headers_stream_dependency {
            headers_frame.set_stream_dependency(stream_dep);
        }

        if end_of_stream {
            headers_frame.set_end_stream()
        }

        Ok(headers_frame)
    }
}

impl proto::Peer for Peer {
    type Poll = Response<()>;

    #[cfg(feature = "tracing")]
    const NAME: &'static str = "Client";

    fn r#dyn() -> proto::DynPeer {
        proto::DynPeer::Client
    }

    /*
    fn is_server() -> bool {
        false
    }
    */

    fn convert_poll_message(
        pseudo: Pseudo,
        fields: HeaderMap,
        stream_id: StreamId,
    ) -> Result<Self::Poll, Error> {
        let mut b = Response::builder();

        b = b.version(Version::HTTP_2);

        if let Some(status) = pseudo.status {
            b = b.status(status);
        }

        let mut response = match b.body(()) {
            Ok(response) => response,
            Err(_) => {
                // TODO: Should there be more specialized handling for different
                // kinds of errors
                return Err(Error::library_reset(stream_id, Reason::PROTOCOL_ERROR));
            }
        };

        *response.headers_mut() = fields;

        Ok(response)
    }
}