Skip to main content

h2/
client.rs

1//! Client implementation of the HTTP/2 protocol.
2//!
3//! # Getting started
4//!
5//! Running an HTTP/2 client requires the caller to establish the underlying
6//! connection as well as get the connection to a state that is ready to begin
7//! the HTTP/2 handshake. See [here](../index.html#handshake) for more
8//! details.
9//!
10//! This could be as basic as using Tokio's [`TcpStream`] to connect to a remote
11//! host, but usually it means using either ALPN or HTTP/1.1 protocol upgrades.
12//!
13//! Once a connection is obtained, it is passed to [`handshake`], which will
14//! begin the [HTTP/2 handshake]. This returns a future that completes once
15//! the handshake process is performed and HTTP/2 streams may be initialized.
16//!
17//! [`handshake`] uses default configuration values. There are a number of
18//! settings that can be changed by using [`Builder`] instead.
19//!
20//! Once the handshake future completes, the caller is provided with a
21//! [`Connection`] instance and a [`SendRequest`] instance. The [`Connection`]
22//! instance is used to drive the connection (see [Managing the connection]).
23//! The [`SendRequest`] instance is used to initialize new streams (see [Making
24//! requests]).
25//!
26//! # Making requests
27//!
28//! Requests are made using the [`SendRequest`] handle provided by the handshake
29//! future. Once a request is submitted, an HTTP/2 stream is initialized and
30//! the request is sent to the server.
31//!
32//! A request body and request trailers are sent using [`SendRequest`] and the
33//! server's response is returned once the [`ResponseFuture`] future completes.
34//! Both the [`SendStream`] and [`ResponseFuture`] instances are returned by
35//! [`SendRequest::send_request`] and are tied to the HTTP/2 stream
36//! initialized by the sent request.
37//!
38//! The [`SendRequest::poll_ready`] function returns `Ready` when a new HTTP/2
39//! stream can be created, i.e. as long as the current number of active streams
40//! is below [`MAX_CONCURRENT_STREAMS`]. If a new stream cannot be created, the
41//! caller will be notified once an existing stream closes, freeing capacity for
42//! the caller.  The caller should use [`SendRequest::poll_ready`] to check for
43//! capacity before sending a request to the server.
44//!
45//! [`SendRequest`] enforces the [`MAX_CONCURRENT_STREAMS`] setting. The user
46//! must not send a request if `poll_ready` does not return `Ready`. Attempting
47//! to do so will result in an [`Error`] being returned.
48//!
49//! # Managing the connection
50//!
51//! The [`Connection`] instance is used to manage connection state. The caller
52//! is required to call [`Connection::poll`] in order to advance state.
53//! [`SendRequest::send_request`] and other functions have no effect unless
54//! [`Connection::poll`] is called.
55//!
56//! The [`Connection`] instance should only be dropped once [`Connection::poll`]
57//! returns `Ready`. At this point, the underlying socket has been closed and no
58//! further work needs to be done.
59//!
60//! The easiest way to ensure that the [`Connection`] instance gets polled is to
61//! submit the [`Connection`] instance to an [executor]. The executor will then
62//! manage polling the connection until the connection is complete.
63//! Alternatively, the caller can call `poll` manually.
64//!
65//! # Example
66//!
67//! ```rust, no_run
68//!
69//! use h2::client;
70//!
71//! use http::{Request, Method};
72//! use std::error::Error;
73//! use tokio::net::TcpStream;
74//!
75//! #[tokio::main]
76//! pub async fn main() -> Result<(), Box<dyn Error>> {
77//!     // Establish TCP connection to the server.
78//!     let tcp = TcpStream::connect("127.0.0.1:5928").await?;
79//!     let (h2, connection) = client::handshake(tcp).await?;
80//!     tokio::spawn(async move {
81//!         connection.await.unwrap();
82//!     });
83//!
84//!     let mut h2 = h2.ready().await?;
85//!     // Prepare the HTTP request to send to the server.
86//!     let request = Request::builder()
87//!                     .method(Method::GET)
88//!                     .uri("https://www.example.com/")
89//!                     .body(())
90//!                     .unwrap();
91//!
92//!     // Send the request. The second tuple item allows the caller
93//!     // to stream a request body.
94//!     let (response, _) = h2.send_request(request, true).unwrap();
95//!
96//!     let (head, mut body) = response.await?.into_parts();
97//!
98//!     println!("Received response: {:?}", head);
99//!
100//!     // The `flow_control` handle allows the caller to manage
101//!     // flow control.
102//!     //
103//!     // Whenever data is received, the caller is responsible for
104//!     // releasing capacity back to the server once it has freed
105//!     // the data from memory.
106//!     let mut flow_control = body.flow_control().clone();
107//!
108//!     while let Some(chunk) = body.data().await {
109//!         let chunk = chunk?;
110//!         println!("RX: {:?}", chunk);
111//!
112//!         // Let the server send more data.
113//!         let _ = flow_control.release_capacity(chunk.len());
114//!     }
115//!
116//!     Ok(())
117//! }
118//! ```
119//!
120//! [`TcpStream`]: https://docs.rs/tokio-core/0.1/tokio_core/net/struct.TcpStream.html
121//! [`handshake`]: fn.handshake.html
122//! [executor]: https://docs.rs/futures/0.1/futures/future/trait.Executor.html
123//! [`SendRequest`]: struct.SendRequest.html
124//! [`SendStream`]: ../struct.SendStream.html
125//! [Making requests]: #making-requests
126//! [Managing the connection]: #managing-the-connection
127//! [`Connection`]: struct.Connection.html
128//! [`Connection::poll`]: struct.Connection.html#method.poll
129//! [`SendRequest::send_request`]: struct.SendRequest.html#method.send_request
130//! [`MAX_CONCURRENT_STREAMS`]: http://httpwg.org/specs/rfc7540.html#SettingValues
131//! [`SendRequest`]: struct.SendRequest.html
132//! [`ResponseFuture`]: struct.ResponseFuture.html
133//! [`SendRequest::poll_ready`]: struct.SendRequest.html#method.poll_ready
134//! [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
135//! [`Builder`]: struct.Builder.html
136//! [`Error`]: ../struct.Error.html
137
138use crate::codec::{Codec, SendError, UserError};
139use crate::ext::Protocol;
140use crate::frame::{Headers, Pseudo, Reason, Settings, StreamId, WindowUpdate};
141use crate::proto::{self, Error};
142use crate::{FlowControl, PingPong, RecvStream, SendStream};
143
144use bytes::{Buf, Bytes};
145use http::{uri, HeaderMap, Method, Request, Response, Version};
146use std::fmt;
147use std::future::Future;
148use std::pin::Pin;
149use std::task::{Context, Poll};
150use std::time::Duration;
151use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
152use tracing::Instrument;
153
154/// Initializes new HTTP/2 streams on a connection by sending a request.
155///
156/// This type does no work itself. Instead, it is a handle to the inner
157/// connection state held by [`Connection`]. If the associated connection
158/// instance is dropped, all `SendRequest` functions will return [`Error`].
159///
160/// [`SendRequest`] instances are able to move to and operate on separate tasks
161/// / threads than their associated [`Connection`] instance. Internally, there
162/// is a buffer used to stage requests before they get written to the
163/// connection. There is no guarantee that requests get written to the
164/// connection in FIFO order as HTTP/2 prioritization logic can play a role.
165///
166/// [`SendRequest`] implements [`Clone`], enabling the creation of many
167/// instances that are backed by a single connection.
168///
169/// See [module] level documentation for more details.
170///
171/// [module]: index.html
172/// [`Connection`]: struct.Connection.html
173/// [`Clone`]: https://doc.rust-lang.org/std/clone/trait.Clone.html
174/// [`Error`]: ../struct.Error.html
175pub struct SendRequest<B: Buf> {
176    inner: proto::Streams<B, Peer>,
177    pending: Option<proto::OpaqueStreamRef>,
178}
179
180/// Returns a `SendRequest` instance once it is ready to send at least one
181/// request.
182#[derive(Debug)]
183pub struct ReadySendRequest<B: Buf> {
184    inner: Option<SendRequest<B>>,
185}
186
187/// Manages all state associated with an HTTP/2 client connection.
188///
189/// A `Connection` is backed by an I/O resource (usually a TCP socket) and
190/// implements the HTTP/2 client logic for that connection. It is responsible
191/// for driving the internal state forward, performing the work requested of the
192/// associated handles ([`SendRequest`], [`ResponseFuture`], [`SendStream`],
193/// [`RecvStream`]).
194///
195/// `Connection` values are created by calling [`handshake`]. Once a
196/// `Connection` value is obtained, the caller must repeatedly call [`poll`]
197/// until `Ready` is returned. The easiest way to do this is to submit the
198/// `Connection` instance to an [executor].
199///
200/// [module]: index.html
201/// [`handshake`]: fn.handshake.html
202/// [`SendRequest`]: struct.SendRequest.html
203/// [`ResponseFuture`]: struct.ResponseFuture.html
204/// [`SendStream`]: ../struct.SendStream.html
205/// [`RecvStream`]: ../struct.RecvStream.html
206/// [`poll`]: #method.poll
207/// [executor]: https://docs.rs/futures/0.1/futures/future/trait.Executor.html
208///
209/// # Examples
210///
211/// ```
212/// # use tokio::io::{AsyncRead, AsyncWrite};
213/// # use h2::client;
214/// # use h2::client::*;
215/// #
216/// # async fn doc<T>(my_io: T) -> Result<(), h2::Error>
217/// # where T: AsyncRead + AsyncWrite + Send + Unpin + 'static,
218/// # {
219///     let (send_request, connection) = client::handshake(my_io).await?;
220///     // Submit the connection handle to an executor.
221///     tokio::spawn(async { connection.await.expect("connection failed"); });
222///
223///     // Now, use `send_request` to initialize HTTP/2 streams.
224///     // ...
225/// # Ok(())
226/// # }
227/// #
228/// # pub fn main() {}
229/// ```
230#[must_use = "futures do nothing unless polled"]
231pub struct Connection<T, B: Buf = Bytes> {
232    inner: proto::Connection<T, Peer, B>,
233}
234
235/// A future of an HTTP response.
236#[derive(Debug)]
237#[must_use = "futures do nothing unless polled"]
238pub struct ResponseFuture {
239    inner: proto::OpaqueStreamRef,
240    push_promise_consumed: bool,
241}
242
243/// A future of a pushed HTTP response.
244///
245/// We have to differentiate between pushed and non pushed because of the spec
246/// <https://httpwg.org/specs/rfc7540.html#PUSH_PROMISE>
247/// > PUSH_PROMISE frames MUST only be sent on a peer-initiated stream
248/// > that is in either the "open" or "half-closed (remote)" state.
249#[derive(Debug)]
250#[must_use = "futures do nothing unless polled"]
251pub struct PushedResponseFuture {
252    inner: ResponseFuture,
253}
254
255/// A pushed response and corresponding request headers
256#[derive(Debug)]
257pub struct PushPromise {
258    /// The request headers
259    request: Request<()>,
260
261    /// The pushed response
262    response: PushedResponseFuture,
263}
264
265/// A stream of pushed responses and corresponding promised requests
266#[derive(Debug)]
267#[must_use = "streams do nothing unless polled"]
268pub struct PushPromises {
269    inner: proto::OpaqueStreamRef,
270}
271
272/// Builds client connections with custom configuration values.
273///
274/// Methods can be chained in order to set the configuration values.
275///
276/// The client is constructed by calling [`handshake`] and passing the I/O
277/// handle that will back the HTTP/2 server.
278///
279/// New instances of `Builder` are obtained via [`Builder::new`].
280///
281/// See function level documentation for details on the various client
282/// configuration settings.
283///
284/// [`Builder::new`]: struct.Builder.html#method.new
285/// [`handshake`]: struct.Builder.html#method.handshake
286///
287/// # Examples
288///
289/// ```
290/// # use tokio::io::{AsyncRead, AsyncWrite};
291/// # use h2::client::*;
292/// # use bytes::Bytes;
293/// #
294/// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
295///     -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
296/// # {
297/// // `client_fut` is a future representing the completion of the HTTP/2
298/// // handshake.
299/// let client_fut = Builder::new()
300///     .initial_window_size(1_000_000)
301///     .max_concurrent_streams(1000)
302///     .handshake(my_io);
303/// # client_fut.await
304/// # }
305/// #
306/// # pub fn main() {}
307/// ```
308#[derive(Clone, Debug)]
309pub struct Builder {
310    /// Time to keep locally reset streams around before reaping.
311    reset_stream_duration: Duration,
312
313    /// Initial maximum number of locally initiated (send) streams.
314    /// After receiving a SETTINGS frame from the remote peer,
315    /// the connection will overwrite this value with the
316    /// MAX_CONCURRENT_STREAMS specified in the frame.
317    /// If no value is advertised by the remote peer in the initial SETTINGS
318    /// frame, it will be set to usize::MAX.
319    initial_max_send_streams: usize,
320
321    /// Initial target window size for new connections.
322    initial_target_connection_window_size: Option<u32>,
323
324    /// Extra receive window capacity to add to new streams.
325    ///
326    /// When set, after creating a new locally-initiated stream, a WINDOW_UPDATE
327    /// frame will be sent to increase the stream's receive window by this amount.
328    initial_stream_window_increment: Option<u32>,
329
330    /// Maximum amount of bytes to "buffer" for writing per stream.
331    max_send_buffer_size: usize,
332
333    /// Maximum number of locally reset streams to keep at a time.
334    reset_stream_max: usize,
335
336    /// Maximum number of remotely reset streams to allow in the pending
337    /// accept queue.
338    pending_accept_reset_stream_max: usize,
339
340    /// Initial `Settings` frame to send as part of the handshake.
341    settings: Settings,
342
343    /// The stream ID of the first (lowest) stream. Subsequent streams will use
344    /// monotonically increasing stream IDs.
345    stream_id: StreamId,
346
347    /// Maximum number of locally reset streams due to protocol error across
348    /// the lifetime of the connection.
349    ///
350    /// When this gets exceeded, we issue GOAWAYs.
351    local_max_error_reset_streams: Option<usize>,
352
353    /// The headers frame pseudo order
354    headers_pseudo_order: Option<crate::frame::PseudoOrder>,
355
356    /// Whether to include PRIORITY flag in HEADERS frames and its parameters.
357    /// None = no PRIORITY, Some((weight, dep, exclusive)) = PRIORITY with those values.
358    headers_priority: Option<(u8, u32, bool)>,
359
360    /// Optional ordering for HTTP/2 regular headers (for browser fingerprinting).
361    /// When set, headers are encoded in this order instead of hash-based order.
362    headers_order: Option<Vec<http::HeaderName>>,
363}
364
365#[derive(Debug)]
366pub(crate) struct Peer;
367
368// ===== impl SendRequest =====
369
370impl<B> SendRequest<B>
371where
372    B: Buf,
373{
374    /// Returns `Ready` when the connection can initialize a new HTTP/2
375    /// stream.
376    ///
377    /// This function must return `Ready` before `send_request` is called. When
378    /// `Poll::Pending` is returned, the task will be notified once the readiness
379    /// state changes.
380    ///
381    /// See [module] level docs for more details.
382    ///
383    /// [module]: index.html
384    pub fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), crate::Error>> {
385        ready!(self.inner.poll_pending_open(cx, self.pending.as_ref()))?;
386        self.pending = None;
387        Poll::Ready(Ok(()))
388    }
389
390    /// Consumes `self`, returning a future that returns `self` back once it is
391    /// ready to send a request.
392    ///
393    /// This function should be called before calling `send_request`.
394    ///
395    /// This is a functional combinator for [`poll_ready`]. The returned future
396    /// will call `SendStream::poll_ready` until `Ready`, then returns `self` to
397    /// the caller.
398    ///
399    /// # Examples
400    ///
401    /// ```rust
402    /// # use h2::client::*;
403    /// # use http::*;
404    /// # async fn doc(send_request: SendRequest<&'static [u8]>)
405    /// # {
406    /// // First, wait until the `send_request` handle is ready to send a new
407    /// // request
408    /// let mut send_request = send_request.ready().await.unwrap();
409    /// // Use `send_request` here.
410    /// # }
411    /// # pub fn main() {}
412    /// ```
413    ///
414    /// See [module] level docs for more details.
415    ///
416    /// [`poll_ready`]: #method.poll_ready
417    /// [module]: index.html
418    pub fn ready(self) -> ReadySendRequest<B> {
419        ReadySendRequest { inner: Some(self) }
420    }
421
422    /// Sends a HTTP/2 request to the server.
423    ///
424    /// `send_request` initializes a new HTTP/2 stream on the associated
425    /// connection, then sends the given request using this new stream. Only the
426    /// request head is sent.
427    ///
428    /// On success, a [`ResponseFuture`] instance and [`SendStream`] instance
429    /// are returned. The [`ResponseFuture`] instance is used to get the
430    /// server's response and the [`SendStream`] instance is used to send a
431    /// request body or trailers to the server over the same HTTP/2 stream.
432    ///
433    /// To send a request body or trailers, set `end_of_stream` to `false`.
434    /// Then, use the returned [`SendStream`] instance to stream request body
435    /// chunks or send trailers. If `end_of_stream` is **not** set to `false`
436    /// then attempting to call [`SendStream::send_data`] or
437    /// [`SendStream::send_trailers`] will result in an error.
438    ///
439    /// If no request body or trailers are to be sent, set `end_of_stream` to
440    /// `true` and drop the returned [`SendStream`] instance.
441    ///
442    /// # A note on HTTP versions
443    ///
444    /// The provided `Request` will be encoded differently depending on the
445    /// value of its version field. If the version is set to 2.0, then the
446    /// request is encoded as per the specification recommends.
447    ///
448    /// If the version is set to a lower value, then the request is encoded to
449    /// preserve the characteristics of HTTP 1.1 and lower. Specifically, host
450    /// headers are permitted and the `:authority` pseudo header is not
451    /// included.
452    ///
453    /// The caller should always set the request's version field to 2.0 unless
454    /// specifically transmitting an HTTP 1.1 request over 2.0.
455    ///
456    /// # Examples
457    ///
458    /// Sending a request with no body
459    ///
460    /// ```rust
461    /// # use h2::client::*;
462    /// # use http::*;
463    /// # async fn doc(send_request: SendRequest<&'static [u8]>)
464    /// # {
465    /// // First, wait until the `send_request` handle is ready to send a new
466    /// // request
467    /// let mut send_request = send_request.ready().await.unwrap();
468    /// // Prepare the HTTP request to send to the server.
469    /// let request = Request::get("https://www.example.com/")
470    ///     .body(())
471    ///     .unwrap();
472    ///
473    /// // Send the request to the server. Since we are not sending a
474    /// // body or trailers, we can drop the `SendStream` instance.
475    /// let (response, _) = send_request.send_request(request, true).unwrap();
476    /// let response = response.await.unwrap();
477    /// // Process the response
478    /// # }
479    /// # pub fn main() {}
480    /// ```
481    ///
482    /// Sending a request with a body and trailers
483    ///
484    /// ```rust
485    /// # use h2::client::*;
486    /// # use http::*;
487    /// # async fn doc(send_request: SendRequest<&'static [u8]>)
488    /// # {
489    /// // First, wait until the `send_request` handle is ready to send a new
490    /// // request
491    /// let mut send_request = send_request.ready().await.unwrap();
492    ///
493    /// // Prepare the HTTP request to send to the server.
494    /// let request = Request::get("https://www.example.com/")
495    ///     .body(())
496    ///     .unwrap();
497    ///
498    /// // Send the request to the server. If we are not sending a
499    /// // body or trailers, we can drop the `SendStream` instance.
500    /// let (response, mut send_stream) = send_request
501    ///     .send_request(request, false).unwrap();
502    ///
503    /// // At this point, one option would be to wait for send capacity.
504    /// // Doing so would allow us to not hold data in memory that
505    /// // cannot be sent. However, this is not a requirement, so this
506    /// // example will skip that step. See `SendStream` documentation
507    /// // for more details.
508    /// send_stream.send_data(b"hello", false).unwrap();
509    /// send_stream.send_data(b"world", false).unwrap();
510    ///
511    /// // Send the trailers.
512    /// let mut trailers = HeaderMap::new();
513    /// trailers.insert(
514    ///     header::HeaderName::from_bytes(b"my-trailer").unwrap(),
515    ///     header::HeaderValue::from_bytes(b"hello").unwrap());
516    ///
517    /// send_stream.send_trailers(trailers).unwrap();
518    ///
519    /// let response = response.await.unwrap();
520    /// // Process the response
521    /// # }
522    /// # pub fn main() {}
523    /// ```
524    ///
525    /// [`ResponseFuture`]: struct.ResponseFuture.html
526    /// [`SendStream`]: ../struct.SendStream.html
527    /// [`SendStream::send_data`]: ../struct.SendStream.html#method.send_data
528    /// [`SendStream::send_trailers`]: ../struct.SendStream.html#method.send_trailers
529    pub fn send_request(
530        &mut self,
531        request: Request<()>,
532        end_of_stream: bool,
533    ) -> Result<(ResponseFuture, SendStream<B>), crate::Error> {
534        self.inner
535            .send_request(request, end_of_stream, self.pending.as_ref())
536            .map_err(Into::into)
537            .map(|(stream, is_full)| {
538                if stream.is_pending_open() && is_full {
539                    // Only prevent sending another request when the request queue
540                    // is not full.
541                    self.pending = Some(stream.clone_to_opaque());
542                }
543
544                let response = ResponseFuture {
545                    inner: stream.clone_to_opaque(),
546                    push_promise_consumed: false,
547                };
548
549                let stream = SendStream::new(stream);
550
551                (response, stream)
552            })
553    }
554
555    /// Returns whether the [extended CONNECT protocol][1] is enabled or not.
556    ///
557    /// This setting is configured by the server peer by sending the
558    /// [`SETTINGS_ENABLE_CONNECT_PROTOCOL` parameter][2] in a `SETTINGS` frame.
559    /// This method returns the currently acknowledged value received from the
560    /// remote.
561    ///
562    /// [1]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
563    /// [2]: https://datatracker.ietf.org/doc/html/rfc8441#section-3
564    pub fn is_extended_connect_protocol_enabled(&self) -> bool {
565        self.inner.is_extended_connect_protocol_enabled()
566    }
567
568    /// Returns the current max send streams
569    pub fn current_max_send_streams(&self) -> usize {
570        self.inner.current_max_send_streams()
571    }
572
573    /// Returns the current max recv streams
574    pub fn current_max_recv_streams(&self) -> usize {
575        self.inner.current_max_recv_streams()
576    }
577}
578
579impl<B> fmt::Debug for SendRequest<B>
580where
581    B: Buf,
582{
583    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
584        fmt.debug_struct("SendRequest").finish()
585    }
586}
587
588impl<B> Clone for SendRequest<B>
589where
590    B: Buf,
591{
592    fn clone(&self) -> Self {
593        SendRequest {
594            inner: self.inner.clone(),
595            pending: None,
596        }
597    }
598}
599
600#[cfg(feature = "unstable")]
601impl<B> SendRequest<B>
602where
603    B: Buf,
604{
605    /// Returns the number of active streams.
606    ///
607    /// An active stream is a stream that has not yet transitioned to a closed
608    /// state.
609    pub fn num_active_streams(&self) -> usize {
610        self.inner.num_active_streams()
611    }
612
613    /// Returns the number of streams that are held in memory.
614    ///
615    /// A wired stream is a stream that is either active or is closed but must
616    /// stay in memory for some reason. For example, there are still outstanding
617    /// userspace handles pointing to the slot.
618    pub fn num_wired_streams(&self) -> usize {
619        self.inner.num_wired_streams()
620    }
621}
622
623// ===== impl ReadySendRequest =====
624
625impl<B> Future for ReadySendRequest<B>
626where
627    B: Buf,
628{
629    type Output = Result<SendRequest<B>, crate::Error>;
630
631    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
632        match &mut self.inner {
633            Some(send_request) => {
634                ready!(send_request.poll_ready(cx))?;
635            }
636            None => panic!("called `poll` after future completed"),
637        }
638
639        Poll::Ready(Ok(self.inner.take().unwrap()))
640    }
641}
642
643// ===== impl Builder =====
644
645impl Builder {
646    /// Returns a new client builder instance initialized with default
647    /// configuration values.
648    ///
649    /// Configuration methods can be chained on the return value.
650    ///
651    /// # Examples
652    ///
653    /// ```
654    /// # use tokio::io::{AsyncRead, AsyncWrite};
655    /// # use h2::client::*;
656    /// # use bytes::Bytes;
657    /// #
658    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
659    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
660    /// # {
661    /// // `client_fut` is a future representing the completion of the HTTP/2
662    /// // handshake.
663    /// let client_fut = Builder::new()
664    ///     .initial_window_size(1_000_000)
665    ///     .max_concurrent_streams(1000)
666    ///     .handshake(my_io);
667    /// # client_fut.await
668    /// # }
669    /// #
670    /// # pub fn main() {}
671    /// ```
672    pub fn new() -> Builder {
673        Builder {
674            max_send_buffer_size: proto::DEFAULT_MAX_SEND_BUFFER_SIZE,
675            reset_stream_duration: Duration::from_secs(proto::DEFAULT_RESET_STREAM_SECS),
676            reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX,
677            pending_accept_reset_stream_max: proto::DEFAULT_REMOTE_RESET_STREAM_MAX,
678            initial_target_connection_window_size: None,
679            initial_stream_window_increment: None,
680            initial_max_send_streams: usize::MAX,
681            settings: Default::default(),
682            stream_id: 1.into(),
683            local_max_error_reset_streams: Some(proto::DEFAULT_LOCAL_RESET_COUNT_MAX),
684            headers_pseudo_order: None,
685            headers_priority: None,
686            headers_order: None,
687        }
688    }
689
690    /// Indicates the initial window size (in octets) for stream-level
691    /// flow control for received data.
692    ///
693    /// The initial window of a stream is used as part of flow control. For more
694    /// details, see [`FlowControl`].
695    ///
696    /// The default value is 65,535.
697    ///
698    /// [`FlowControl`]: ../struct.FlowControl.html
699    ///
700    /// # Examples
701    ///
702    /// ```
703    /// # use tokio::io::{AsyncRead, AsyncWrite};
704    /// # use h2::client::*;
705    /// # use bytes::Bytes;
706    /// #
707    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
708    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
709    /// # {
710    /// // `client_fut` is a future representing the completion of the HTTP/2
711    /// // handshake.
712    /// let client_fut = Builder::new()
713    ///     .initial_window_size(1_000_000)
714    ///     .handshake(my_io);
715    /// # client_fut.await
716    /// # }
717    /// #
718    /// # pub fn main() {}
719    /// ```
720    pub fn initial_window_size(&mut self, size: u32) -> &mut Self {
721        self.settings.set_initial_window_size(Some(size));
722        self
723    }
724
725    /// Indicates the initial window size (in octets) for connection-level flow control
726    /// for received data.
727    ///
728    /// The initial window of a connection is used as part of flow control. For more details,
729    /// see [`FlowControl`].
730    ///
731    /// The default value is 65,535.
732    ///
733    /// [`FlowControl`]: ../struct.FlowControl.html
734    ///
735    /// # Examples
736    ///
737    /// ```
738    /// # use tokio::io::{AsyncRead, AsyncWrite};
739    /// # use h2::client::*;
740    /// # use bytes::Bytes;
741    /// #
742    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
743    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
744    /// # {
745    /// // `client_fut` is a future representing the completion of the HTTP/2
746    /// // handshake.
747    /// let client_fut = Builder::new()
748    ///     .initial_connection_window_size(1_000_000)
749    ///     .handshake(my_io);
750    /// # client_fut.await
751    /// # }
752    /// #
753    /// # pub fn main() {}
754    /// ```
755    pub fn initial_connection_window_size(&mut self, size: u32) -> &mut Self {
756        self.initial_target_connection_window_size = Some(size);
757        self
758    }
759
760    /// Sets extra receive window capacity to add to new locally-initiated streams.
761    ///
762    /// When set, after creating a new stream, a `WINDOW_UPDATE` frame will be
763    /// sent to increase the stream's receive window by this amount.
764    ///
765    /// This is used for browser fingerprinting (e.g. Firefox sends a stream-level
766    /// WINDOW_UPDATE of 12451840 on the first stream).
767    ///
768    /// # Default
769    ///
770    /// Default is `None`.
771    pub fn initial_stream_window_size_increment(&mut self, size: u32) -> &mut Self {
772        self.initial_stream_window_increment = Some(size);
773        self
774    }
775
776    /// Indicates the size (in octets) of the largest HTTP/2 frame payload that the
777    /// configured client is able to accept.
778    ///
779    /// The sender may send data frames that are **smaller** than this value,
780    /// but any data larger than `max` will be broken up into multiple `DATA`
781    /// frames.
782    ///
783    /// The value **must** be between 16,384 and 16,777,215. The default value is 16,384.
784    ///
785    /// # Examples
786    ///
787    /// ```
788    /// # use tokio::io::{AsyncRead, AsyncWrite};
789    /// # use h2::client::*;
790    /// # use bytes::Bytes;
791    /// #
792    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
793    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
794    /// # {
795    /// // `client_fut` is a future representing the completion of the HTTP/2
796    /// // handshake.
797    /// let client_fut = Builder::new()
798    ///     .max_frame_size(1_000_000)
799    ///     .handshake(my_io);
800    /// # client_fut.await
801    /// # }
802    /// #
803    /// # pub fn main() {}
804    /// ```
805    ///
806    /// # Panics
807    ///
808    /// This function panics if `max` is not within the legal range specified
809    /// above.
810    pub fn max_frame_size(&mut self, max: u32) -> &mut Self {
811        self.settings.set_max_frame_size(Some(max));
812        self
813    }
814
815    /// Sets the max size of received header frames.
816    ///
817    /// This advisory setting informs a peer of the maximum size of header list
818    /// that the sender is prepared to accept, in octets. The value is based on
819    /// the uncompressed size of header fields, including the length of the name
820    /// and value in octets plus an overhead of 32 octets for each header field.
821    ///
822    /// This setting is also used to limit the maximum amount of data that is
823    /// buffered to decode HEADERS frames.
824    ///
825    /// # Examples
826    ///
827    /// ```
828    /// # use tokio::io::{AsyncRead, AsyncWrite};
829    /// # use h2::client::*;
830    /// # use bytes::Bytes;
831    /// #
832    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
833    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
834    /// # {
835    /// // `client_fut` is a future representing the completion of the HTTP/2
836    /// // handshake.
837    /// let client_fut = Builder::new()
838    ///     .max_header_list_size(16 * 1024)
839    ///     .handshake(my_io);
840    /// # client_fut.await
841    /// # }
842    /// #
843    /// # pub fn main() {}
844    /// ```
845    pub fn max_header_list_size(&mut self, max: u32) -> &mut Self {
846        self.settings.set_max_header_list_size(Some(max));
847        self
848    }
849
850    /// Sets the maximum number of concurrent streams.
851    ///
852    /// The maximum concurrent streams setting only controls the maximum number
853    /// of streams that can be initiated by the remote peer. In other words,
854    /// when this setting is set to 100, this does not limit the number of
855    /// concurrent streams that can be created by the caller.
856    ///
857    /// It is recommended that this value be no smaller than 100, so as to not
858    /// unnecessarily limit parallelism. However, any value is legal, including
859    /// 0. If `max` is set to 0, then the remote will not be permitted to
860    /// initiate streams.
861    ///
862    /// Note that streams in the reserved state, i.e., push promises that have
863    /// been reserved but the stream has not started, do not count against this
864    /// setting.
865    ///
866    /// Also note that if the remote *does* exceed the value set here, it is not
867    /// a protocol level error. Instead, the `h2` library will immediately reset
868    /// the stream.
869    ///
870    /// See [Section 5.1.2] in the HTTP/2 spec for more details.
871    ///
872    /// [Section 5.1.2]: https://http2.github.io/http2-spec/#rfc.section.5.1.2
873    ///
874    /// # Examples
875    ///
876    /// ```
877    /// # use tokio::io::{AsyncRead, AsyncWrite};
878    /// # use h2::client::*;
879    /// # use bytes::Bytes;
880    /// #
881    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
882    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
883    /// # {
884    /// // `client_fut` is a future representing the completion of the HTTP/2
885    /// // handshake.
886    /// let client_fut = Builder::new()
887    ///     .max_concurrent_streams(1000)
888    ///     .handshake(my_io);
889    /// # client_fut.await
890    /// # }
891    /// #
892    /// # pub fn main() {}
893    /// ```
894    pub fn max_concurrent_streams(&mut self, max: u32) -> &mut Self {
895        self.settings.set_max_concurrent_streams(Some(max));
896        self
897    }
898
899    /// Sets the initial maximum of locally initiated (send) streams.
900    ///
901    /// The initial settings will be overwritten by the remote peer when
902    /// the SETTINGS frame is received. The new value will be set to the
903    /// `max_concurrent_streams()` from the frame. If no value is advertised in
904    /// the initial SETTINGS frame from the remote peer as part of
905    /// [HTTP/2 Connection Preface], `usize::MAX` will be set.
906    ///
907    /// This setting prevents the caller from exceeding this number of
908    /// streams that are counted towards the concurrency limit.
909    ///
910    /// Sending streams past the limit returned by the peer will be treated
911    /// as a stream error of type PROTOCOL_ERROR or REFUSED_STREAM.
912    ///
913    /// See [Section 5.1.2] in the HTTP/2 spec for more details.
914    ///
915    /// The default value is `usize::MAX`.
916    ///
917    /// [HTTP/2 Connection Preface]: https://httpwg.org/specs/rfc9113.html#preface
918    /// [Section 5.1.2]: https://httpwg.org/specs/rfc9113.html#rfc.section.5.1.2
919    ///
920    /// # Examples
921    ///
922    /// ```
923    /// # use tokio::io::{AsyncRead, AsyncWrite};
924    /// # use h2::client::*;
925    /// # use bytes::Bytes;
926    /// #
927    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
928    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
929    /// # {
930    /// // `client_fut` is a future representing the completion of the HTTP/2
931    /// // handshake.
932    /// let client_fut = Builder::new()
933    ///     .initial_max_send_streams(1000)
934    ///     .handshake(my_io);
935    /// # client_fut.await
936    /// # }
937    /// #
938    /// # pub fn main() {}
939    /// ```
940    pub fn initial_max_send_streams(&mut self, initial: usize) -> &mut Self {
941        self.initial_max_send_streams = initial;
942        self
943    }
944
945    /// Sets the maximum number of concurrent locally reset streams.
946    ///
947    /// When a stream is explicitly reset, the HTTP/2 specification requires
948    /// that any further frames received for that stream must be ignored for
949    /// "some time".
950    ///
951    /// In order to satisfy the specification, internal state must be maintained
952    /// to implement the behavior. This state grows linearly with the number of
953    /// streams that are locally reset.
954    ///
955    /// The `max_concurrent_reset_streams` setting configures sets an upper
956    /// bound on the amount of state that is maintained. When this max value is
957    /// reached, the oldest reset stream is purged from memory.
958    ///
959    /// Once the stream has been fully purged from memory, any additional frames
960    /// received for that stream will result in a connection level protocol
961    /// error, forcing the connection to terminate.
962    ///
963    /// The default value is currently 50.
964    ///
965    /// # Examples
966    ///
967    /// ```
968    /// # use tokio::io::{AsyncRead, AsyncWrite};
969    /// # use h2::client::*;
970    /// # use bytes::Bytes;
971    /// #
972    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
973    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
974    /// # {
975    /// // `client_fut` is a future representing the completion of the HTTP/2
976    /// // handshake.
977    /// let client_fut = Builder::new()
978    ///     .max_concurrent_reset_streams(1000)
979    ///     .handshake(my_io);
980    /// # client_fut.await
981    /// # }
982    /// #
983    /// # pub fn main() {}
984    /// ```
985    pub fn max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self {
986        self.reset_stream_max = max;
987        self
988    }
989
990    /// Sets the duration to remember locally reset streams.
991    ///
992    /// When a stream is explicitly reset, the HTTP/2 specification requires
993    /// that any further frames received for that stream must be ignored for
994    /// "some time".
995    ///
996    /// In order to satisfy the specification, internal state must be maintained
997    /// to implement the behavior. This state grows linearly with the number of
998    /// streams that are locally reset.
999    ///
1000    /// The `reset_stream_duration` setting configures the max amount of time
1001    /// this state will be maintained in memory. Once the duration elapses, the
1002    /// stream state is purged from memory.
1003    ///
1004    /// Once the stream has been fully purged from memory, any additional frames
1005    /// received for that stream will result in a connection level protocol
1006    /// error, forcing the connection to terminate.
1007    ///
1008    /// The default value is currently 1 second.
1009    ///
1010    /// # Examples
1011    ///
1012    /// ```
1013    /// # use tokio::io::{AsyncRead, AsyncWrite};
1014    /// # use h2::client::*;
1015    /// # use std::time::Duration;
1016    /// # use bytes::Bytes;
1017    /// #
1018    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1019    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
1020    /// # {
1021    /// // `client_fut` is a future representing the completion of the HTTP/2
1022    /// // handshake.
1023    /// let client_fut = Builder::new()
1024    ///     .reset_stream_duration(Duration::from_secs(10))
1025    ///     .handshake(my_io);
1026    /// # client_fut.await
1027    /// # }
1028    /// #
1029    /// # pub fn main() {}
1030    /// ```
1031    pub fn reset_stream_duration(&mut self, dur: Duration) -> &mut Self {
1032        self.reset_stream_duration = dur;
1033        self
1034    }
1035
1036    /// Sets the maximum number of local resets due to protocol errors made by the remote end.
1037    ///
1038    /// Invalid frames and many other protocol errors will lead to resets being generated for those streams.
1039    /// Too many of these often indicate a malicious client, and there are attacks which can abuse this to DOS servers.
1040    /// This limit protects against these DOS attacks by limiting the amount of resets we can be forced to generate.
1041    ///
1042    /// When the number of local resets exceeds this threshold, the client will close the connection.
1043    ///
1044    /// If you really want to disable this, supply [`Option::None`] here.
1045    /// Disabling this is not recommended and may expose you to DOS attacks.
1046    ///
1047    /// The default value is currently 1024, but could change.
1048    pub fn max_local_error_reset_streams(&mut self, max: Option<usize>) -> &mut Self {
1049        self.local_max_error_reset_streams = max;
1050        self
1051    }
1052
1053    /// Sets the maximum number of pending-accept remotely-reset streams.
1054    ///
1055    /// Streams that have been received by the peer, but not accepted by the
1056    /// user, can also receive a RST_STREAM. This is a legitimate pattern: one
1057    /// could send a request and then shortly after, realize it is not needed,
1058    /// sending a CANCEL.
1059    ///
1060    /// However, since those streams are now "closed", they don't count towards
1061    /// the max concurrent streams. So, they will sit in the accept queue,
1062    /// using memory.
1063    ///
1064    /// When the number of remotely-reset streams sitting in the pending-accept
1065    /// queue reaches this maximum value, a connection error with the code of
1066    /// `ENHANCE_YOUR_CALM` will be sent to the peer, and returned by the
1067    /// `Future`.
1068    ///
1069    /// The default value is currently 20, but could change.
1070    ///
1071    /// # Examples
1072    ///
1073    /// ```
1074    /// # use tokio::io::{AsyncRead, AsyncWrite};
1075    /// # use h2::client::*;
1076    /// # use bytes::Bytes;
1077    /// #
1078    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1079    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
1080    /// # {
1081    /// // `client_fut` is a future representing the completion of the HTTP/2
1082    /// // handshake.
1083    /// let client_fut = Builder::new()
1084    ///     .max_pending_accept_reset_streams(100)
1085    ///     .handshake(my_io);
1086    /// # client_fut.await
1087    /// # }
1088    /// #
1089    /// # pub fn main() {}
1090    /// ```
1091    pub fn max_pending_accept_reset_streams(&mut self, max: usize) -> &mut Self {
1092        self.pending_accept_reset_stream_max = max;
1093        self
1094    }
1095
1096    /// Sets the maximum send buffer size per stream.
1097    ///
1098    /// Once a stream has buffered up to (or over) the maximum, the stream's
1099    /// flow control will not "poll" additional capacity. Once bytes for the
1100    /// stream have been written to the connection, the send buffer capacity
1101    /// will be freed up again.
1102    ///
1103    /// The default is currently ~400KB, but may change.
1104    ///
1105    /// # Panics
1106    ///
1107    /// This function panics if `max` is larger than `u32::MAX`.
1108    pub fn max_send_buffer_size(&mut self, max: usize) -> &mut Self {
1109        assert!(max <= u32::MAX as usize);
1110        self.max_send_buffer_size = max;
1111        self
1112    }
1113
1114    /// Enables or disables server push promises.
1115    ///
1116    /// This value is included in the initial SETTINGS handshake.
1117    /// Setting this value to value to
1118    /// false in the initial SETTINGS handshake guarantees that the remote server
1119    /// will never send a push promise.
1120    ///
1121    /// This setting can be changed during the life of a single HTTP/2
1122    /// connection by sending another settings frame updating the value.
1123    ///
1124    /// Default value: `true`.
1125    ///
1126    /// # Examples
1127    ///
1128    /// ```
1129    /// # use tokio::io::{AsyncRead, AsyncWrite};
1130    /// # use h2::client::*;
1131    /// # use std::time::Duration;
1132    /// # use bytes::Bytes;
1133    /// #
1134    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1135    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
1136    /// # {
1137    /// // `client_fut` is a future representing the completion of the HTTP/2
1138    /// // handshake.
1139    /// let client_fut = Builder::new()
1140    ///     .enable_push(false)
1141    ///     .handshake(my_io);
1142    /// # client_fut.await
1143    /// # }
1144    /// #
1145    /// # pub fn main() {}
1146    /// ```
1147    pub fn enable_push(&mut self, enabled: bool) -> &mut Self {
1148        self.settings.set_enable_push(enabled);
1149        self
1150    }
1151
1152    /// Sets the header table size.
1153    ///
1154    /// This setting informs the peer of the maximum size of the header compression
1155    /// table used to encode header blocks, in octets. The encoder may select any value
1156    /// equal to or less than the header table size specified by the sender.
1157    ///
1158    /// The default value is 4,096.
1159    ///
1160    /// # Examples
1161    ///
1162    /// ```
1163    /// # use tokio::io::{AsyncRead, AsyncWrite};
1164    /// # use h2::client::*;
1165    /// # use bytes::Bytes;
1166    /// #
1167    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1168    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
1169    /// # {
1170    /// // `client_fut` is a future representing the completion of the HTTP/2
1171    /// // handshake.
1172    /// let client_fut = Builder::new()
1173    ///     .header_table_size(1_000_000)
1174    ///     .handshake(my_io);
1175    /// # client_fut.await
1176    /// # }
1177    /// #
1178    /// # pub fn main() {}
1179    /// ```
1180    pub fn header_table_size(&mut self, size: u32) -> &mut Self {
1181        self.settings.set_header_table_size(Some(size));
1182        self
1183    }
1184
1185    /// Sets the first stream ID to something other than 1.
1186    #[cfg(feature = "unstable")]
1187    pub fn initial_stream_id(&mut self, stream_id: u32) -> &mut Self {
1188        self.stream_id = stream_id.into();
1189        assert!(
1190            self.stream_id.is_client_initiated(),
1191            "stream id must be odd"
1192        );
1193        self
1194    }
1195
1196    /// Sets the order of settings parameters in the initial SETTINGS frame.
1197    ///
1198    /// This determines the order in which settings are sent during the HTTP/2 handshake.
1199    /// Customizing the order may be useful for testing or protocol compliance.
1200    pub fn settings_order(&mut self, order: crate::frame::SettingsOrder) -> &mut Self {
1201        self.settings.set_settings_order(order);
1202        self
1203    }
1204
1205    /// Sets the NO_RFC7540_PRIORITIES setting.
1206    ///
1207    /// This setting indicates that the sender does not support RFC 7540 priorities.
1208    /// When set to 1, the sender is indicating it does not use the priority
1209    /// mechanism described in RFC 7540.
1210    pub fn no_rfc7540_priorities(&mut self, enabled: u32) -> &mut Self {
1211        self.settings.set_no_rfc7540_priorities(Some(enabled));
1212        self
1213    }
1214
1215    /// Sets the ENABLE_CONNECT_PROTOCOL setting.
1216    ///
1217    /// This setting enables the use of the extended CONNECT method
1218    /// (RFC 8441). When set to 1, the sender indicates support for
1219    /// extended CONNECT.
1220    pub fn enable_connect_protocol(&mut self, val: u32) -> &mut Self {
1221        self.settings.set_enable_connect_protocol(Some(val));
1222        self
1223    }
1224
1225    /// Sets the HTTP/2 pseudo-header field order for outgoing HEADERS frames.
1226    ///
1227    /// This determines the order in which pseudo-header fields (such as `:method`, `:scheme`, etc.)
1228    /// are encoded in the HEADERS frame. Customizing the order may be useful for interoperability
1229    /// or testing purposes.
1230    pub fn headers_pseudo_order(&mut self, order: crate::frame::PseudoOrder) -> &mut Self {
1231        self.headers_pseudo_order = Some(order);
1232        self
1233    }
1234
1235    /// Sets PRIORITY flag parameters for HEADERS frames.
1236    ///
1237    /// When set, HEADERS frames will include priority data.
1238    /// None = no PRIORITY, Some((weight, dep, exclusive)) = PRIORITY with those values.
1239    pub fn headers_priority(&mut self, data: Option<(u8, u32, bool)>) -> &mut Self {
1240        self.headers_priority = data;
1241        self
1242    }
1243
1244    /// Sets the HTTP/2 regular header ordering for browser fingerprinting.
1245    ///
1246    /// When set, headers are encoded in the specified order instead of hash-based order.
1247    pub fn headers_order(&mut self, order: Vec<http::HeaderName>) -> &mut Self {
1248        self.headers_order = Some(order);
1249        self
1250    }
1251
1252    /// Creates a new configured HTTP/2 client backed by `io`.
1253    ///
1254    /// It is expected that `io` already be in an appropriate state to commence
1255    /// the [HTTP/2 handshake]. The handshake is completed once both the connection
1256    /// preface and the initial settings frame is sent by the client.
1257    ///
1258    /// The handshake future does not wait for the initial settings frame from the
1259    /// server.
1260    ///
1261    /// Returns a future which resolves to the [`Connection`] / [`SendRequest`]
1262    /// tuple once the HTTP/2 handshake has been completed.
1263    ///
1264    /// This function also allows the caller to configure the send payload data
1265    /// type. See [Outbound data type] for more details.
1266    ///
1267    /// [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
1268    /// [`Connection`]: struct.Connection.html
1269    /// [`SendRequest`]: struct.SendRequest.html
1270    /// [Outbound data type]: ../index.html#outbound-data-type.
1271    ///
1272    /// # Examples
1273    ///
1274    /// Basic usage:
1275    ///
1276    /// ```
1277    /// # use tokio::io::{AsyncRead, AsyncWrite};
1278    /// # use h2::client::*;
1279    /// # use bytes::Bytes;
1280    /// #
1281    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1282    ///     -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
1283    /// # {
1284    /// // `client_fut` is a future representing the completion of the HTTP/2
1285    /// // handshake.
1286    /// let client_fut = Builder::new()
1287    ///     .handshake(my_io);
1288    /// # client_fut.await
1289    /// # }
1290    /// #
1291    /// # pub fn main() {}
1292    /// ```
1293    ///
1294    /// Configures the send-payload data type. In this case, the outbound data
1295    /// type will be `&'static [u8]`.
1296    ///
1297    /// ```
1298    /// # use tokio::io::{AsyncRead, AsyncWrite};
1299    /// # use h2::client::*;
1300    /// #
1301    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1302    /// # -> Result<((SendRequest<&'static [u8]>, Connection<T, &'static [u8]>)), h2::Error>
1303    /// # {
1304    /// // `client_fut` is a future representing the completion of the HTTP/2
1305    /// // handshake.
1306    /// let client_fut = Builder::new()
1307    ///     .handshake::<_, &'static [u8]>(my_io);
1308    /// # client_fut.await
1309    /// # }
1310    /// #
1311    /// # pub fn main() {}
1312    /// ```
1313    pub fn handshake<T, B>(
1314        &self,
1315        io: T,
1316    ) -> impl Future<Output = Result<(SendRequest<B>, Connection<T, B>), crate::Error>>
1317    where
1318        T: AsyncRead + AsyncWrite + Unpin,
1319        B: Buf,
1320    {
1321        Connection::handshake2(io, self.clone())
1322    }
1323}
1324
1325impl Default for Builder {
1326    fn default() -> Builder {
1327        Builder::new()
1328    }
1329}
1330
1331/// Creates a new configured HTTP/2 client with default configuration
1332/// values backed by `io`.
1333///
1334/// It is expected that `io` already be in an appropriate state to commence
1335/// the [HTTP/2 handshake]. See [Handshake] for more details.
1336///
1337/// Returns a future which resolves to the [`Connection`] / [`SendRequest`]
1338/// tuple once the HTTP/2 handshake has been completed. The returned
1339/// [`Connection`] instance will be using default configuration values. Use
1340/// [`Builder`] to customize the configuration values used by a [`Connection`]
1341/// instance.
1342///
1343/// [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
1344/// [Handshake]: ../index.html#handshake
1345/// [`Connection`]: struct.Connection.html
1346/// [`SendRequest`]: struct.SendRequest.html
1347///
1348/// # Examples
1349///
1350/// ```
1351/// # use tokio::io::{AsyncRead, AsyncWrite};
1352/// # use h2::client;
1353/// # use h2::client::*;
1354/// #
1355/// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) -> Result<(), h2::Error>
1356/// # {
1357/// let (send_request, connection) = client::handshake(my_io).await?;
1358/// // The HTTP/2 handshake has completed, now start polling
1359/// // `connection` and use `send_request` to send requests to the
1360/// // server.
1361/// # Ok(())
1362/// # }
1363/// #
1364/// # pub fn main() {}
1365/// ```
1366pub async fn handshake<T>(io: T) -> Result<(SendRequest<Bytes>, Connection<T, Bytes>), crate::Error>
1367where
1368    T: AsyncRead + AsyncWrite + Unpin,
1369{
1370    let builder = Builder::new();
1371    builder
1372        .handshake(io)
1373        .instrument(tracing::trace_span!("client_handshake"))
1374        .await
1375}
1376
1377// ===== impl Connection =====
1378
1379async fn bind_connection<T>(io: &mut T) -> Result<(), crate::Error>
1380where
1381    T: AsyncRead + AsyncWrite + Unpin,
1382{
1383    tracing::debug!("binding client connection");
1384
1385    let msg: &'static [u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
1386    io.write_all(msg).await.map_err(crate::Error::from_io)?;
1387
1388    tracing::debug!("client connection bound");
1389
1390    Ok(())
1391}
1392
1393impl<T, B> Connection<T, B>
1394where
1395    T: AsyncRead + AsyncWrite + Unpin,
1396    B: Buf,
1397{
1398    async fn handshake2(
1399        mut io: T,
1400        builder: Builder,
1401    ) -> Result<(SendRequest<B>, Connection<T, B>), crate::Error> {
1402        bind_connection(&mut io).await?;
1403
1404        // Create the codec
1405        let mut codec = Codec::new(io);
1406
1407        tracing::debug!(
1408            "handshake2: settings header_table_size={:?}, enable_push={:?}, initial_window_size={:?}, max_frame_size={:?}, max_header_list_size={:?}",
1409            builder.settings.header_table_size(),
1410            builder.settings.is_push_enabled(),
1411            builder.settings.initial_window_size(),
1412            builder.settings.max_frame_size(),
1413            builder.settings.max_header_list_size(),
1414        );
1415
1416        if let Some(max) = builder.settings.max_frame_size() {
1417            codec.set_max_recv_frame_size(max as usize);
1418        }
1419
1420        if let Some(max) = builder.settings.max_header_list_size() {
1421            codec.set_max_recv_header_list_size(max as usize);
1422        }
1423
1424        // Send initial settings frame
1425        tracing::debug!("handshake2: sending initial settings frame");
1426        codec
1427            .buffer(builder.settings.clone().into())
1428            .expect("invalid SETTINGS frame");
1429
1430        let inner = proto::Connection::new(
1431            codec,
1432            proto::Config {
1433                next_stream_id: builder.stream_id,
1434                initial_max_send_streams: builder.initial_max_send_streams,
1435                max_send_buffer_size: builder.max_send_buffer_size,
1436                reset_stream_duration: builder.reset_stream_duration,
1437                reset_stream_max: builder.reset_stream_max,
1438                remote_reset_stream_max: builder.pending_accept_reset_stream_max,
1439                local_error_reset_streams_max: builder.local_max_error_reset_streams,
1440                settings: builder.settings.clone(),
1441                headers_pseudo_order: builder.headers_pseudo_order.clone(),
1442                headers_priority: builder.headers_priority,
1443                headers_order: builder.headers_order.clone(),
1444                initial_stream_window_increment: builder.initial_stream_window_increment,
1445            },
1446        );
1447        let send_request = SendRequest {
1448            inner: inner.streams().clone(),
1449            pending: None,
1450        };
1451
1452        let mut connection = Connection { inner };
1453        if let Some(sz) = builder.initial_target_connection_window_size {
1454            connection.inner.set_target_window_size(sz);
1455            // Buffer connection WU immediately so it's sent before any HEADERS frames
1456            if let Some(incr) = connection.inner.streams().unclaimed_connection_window() {
1457                let frame = WindowUpdate::new(StreamId::zero(), incr);
1458                connection
1459                    .inner
1460                    .codec
1461                    .buffer(frame.into())
1462                    .expect("invalid WINDOW_UPDATE frame");
1463                connection.inner.streams().inc_connection_window(incr);
1464            }
1465        }
1466
1467        Ok((send_request, connection))
1468    }
1469
1470    /// Sets the target window size for the whole connection.
1471    ///
1472    /// If `size` is greater than the current value, then a `WINDOW_UPDATE`
1473    /// frame will be immediately sent to the remote, increasing the connection
1474    /// level window by `size - current_value`.
1475    ///
1476    /// If `size` is less than the current value, nothing will happen
1477    /// immediately. However, as window capacity is released by
1478    /// [`FlowControl`] instances, no `WINDOW_UPDATE` frames will be sent
1479    /// out until the number of "in flight" bytes drops below `size`.
1480    ///
1481    /// The default value is 65,535.
1482    ///
1483    /// See [`FlowControl`] documentation for more details.
1484    ///
1485    /// [`FlowControl`]: ../struct.FlowControl.html
1486    /// [library level]: ../index.html#flow-control
1487    pub fn set_target_window_size(&mut self, size: u32) {
1488        assert!(size <= proto::MAX_WINDOW_SIZE);
1489        self.inner.set_target_window_size(size);
1490    }
1491
1492    /// Set a new `INITIAL_WINDOW_SIZE` setting (in octets) for stream-level
1493    /// flow control for received data.
1494    ///
1495    /// The `SETTINGS` will be sent to the remote, and only applied once the
1496    /// remote acknowledges the change.
1497    ///
1498    /// This can be used to increase or decrease the window size for existing
1499    /// streams.
1500    ///
1501    /// # Errors
1502    ///
1503    /// Returns an error if a previous call is still pending acknowledgement
1504    /// from the remote endpoint.
1505    pub fn set_initial_window_size(&mut self, size: u32) -> Result<(), crate::Error> {
1506        assert!(size <= proto::MAX_WINDOW_SIZE);
1507        self.inner.set_initial_window_size(size)?;
1508        Ok(())
1509    }
1510
1511    /// Takes a `PingPong` instance from the connection.
1512    ///
1513    /// # Note
1514    ///
1515    /// This may only be called once. Calling multiple times will return `None`.
1516    pub fn ping_pong(&mut self) -> Option<PingPong> {
1517        self.inner.take_user_pings().map(PingPong::new)
1518    }
1519
1520    /// Returns the maximum number of concurrent streams that may be initiated
1521    /// by this client.
1522    ///
1523    /// This limit is configured by the server peer by sending the
1524    /// [`SETTINGS_MAX_CONCURRENT_STREAMS` parameter][1] in a `SETTINGS` frame.
1525    /// This method returns the currently acknowledged value received from the
1526    /// remote.
1527    ///
1528    /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2
1529    pub fn max_concurrent_send_streams(&self) -> usize {
1530        self.inner.max_send_streams()
1531    }
1532    /// Returns the maximum number of concurrent streams that may be initiated
1533    /// by the server on this connection.
1534    ///
1535    /// This returns the value of the [`SETTINGS_MAX_CONCURRENT_STREAMS`
1536    /// parameter][1] sent in a `SETTINGS` frame that has been
1537    /// acknowledged by the remote peer. The value to be sent is configured by
1538    /// the [`Builder::max_concurrent_streams`][2] method before handshaking
1539    /// with the remote peer.
1540    ///
1541    /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2
1542    /// [2]: ../struct.Builder.html#method.max_concurrent_streams
1543    pub fn max_concurrent_recv_streams(&self) -> usize {
1544        self.inner.max_recv_streams()
1545    }
1546}
1547
1548impl<T, B> Future for Connection<T, B>
1549where
1550    T: AsyncRead + AsyncWrite + Unpin,
1551    B: Buf,
1552{
1553    type Output = Result<(), crate::Error>;
1554
1555    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1556        self.inner.maybe_close_connection_if_no_streams();
1557        let had_streams_or_refs = self.inner.has_streams_or_other_references();
1558        let result = self.inner.poll(cx).map_err(Into::into);
1559        // if we had streams/refs, and don't anymore, wake up one more time to
1560        // ensure proper shutdown
1561        if result.is_pending()
1562            && had_streams_or_refs
1563            && !self.inner.has_streams_or_other_references()
1564        {
1565            tracing::trace!("last stream closed during poll, wake again");
1566            cx.waker().wake_by_ref();
1567        }
1568        result
1569    }
1570}
1571
1572impl<T, B> fmt::Debug for Connection<T, B>
1573where
1574    T: AsyncRead + AsyncWrite,
1575    T: fmt::Debug,
1576    B: fmt::Debug + Buf,
1577{
1578    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
1579        fmt::Debug::fmt(&self.inner, fmt)
1580    }
1581}
1582
1583// ===== impl ResponseFuture =====
1584
1585impl Future for ResponseFuture {
1586    type Output = Result<Response<RecvStream>, crate::Error>;
1587
1588    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1589        let (parts, _) = ready!(self.inner.poll_response(cx))?.into_parts();
1590        let body = RecvStream::new(FlowControl::new(self.inner.clone()));
1591
1592        Poll::Ready(Ok(Response::from_parts(parts, body)))
1593    }
1594}
1595
1596impl ResponseFuture {
1597    /// Returns the stream ID of the response stream.
1598    ///
1599    /// # Panics
1600    ///
1601    /// If the lock on the stream store has been poisoned.
1602    pub fn stream_id(&self) -> crate::StreamId {
1603        crate::StreamId::from_internal(self.inner.stream_id())
1604    }
1605
1606    /// Polls for informational responses (1xx status codes).
1607    ///
1608    /// This method should be called before polling the main response future
1609    /// to check for any informational responses that have been received.
1610    ///
1611    /// Returns `Poll::Ready(Some(response))` if an informational response is available,
1612    /// `Poll::Ready(None)` if no more informational responses are expected,
1613    /// or `Poll::Pending` if no informational response is currently available.
1614    pub fn poll_informational(
1615        &mut self,
1616        cx: &mut Context<'_>,
1617    ) -> Poll<Option<Result<Response<()>, crate::Error>>> {
1618        self.inner.poll_informational(cx).map_err(Into::into)
1619    }
1620
1621    /// Returns a stream of PushPromises
1622    ///
1623    /// # Panics
1624    ///
1625    /// If this method has been called before
1626    /// or the stream was itself was pushed
1627    pub fn push_promises(&mut self) -> PushPromises {
1628        if self.push_promise_consumed {
1629            panic!("Reference to push promises stream taken!");
1630        }
1631        self.push_promise_consumed = true;
1632        PushPromises {
1633            inner: self.inner.clone(),
1634        }
1635    }
1636}
1637
1638// ===== impl PushPromises =====
1639
1640impl PushPromises {
1641    /// Get the next `PushPromise`.
1642    pub async fn push_promise(&mut self) -> Option<Result<PushPromise, crate::Error>> {
1643        crate::poll_fn(move |cx| self.poll_push_promise(cx)).await
1644    }
1645
1646    #[doc(hidden)]
1647    pub fn poll_push_promise(
1648        &mut self,
1649        cx: &mut Context<'_>,
1650    ) -> Poll<Option<Result<PushPromise, crate::Error>>> {
1651        match self.inner.poll_pushed(cx) {
1652            Poll::Ready(Some(Ok((request, response)))) => {
1653                let response = PushedResponseFuture {
1654                    inner: ResponseFuture {
1655                        inner: response,
1656                        push_promise_consumed: false,
1657                    },
1658                };
1659                Poll::Ready(Some(Ok(PushPromise { request, response })))
1660            }
1661            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e.into()))),
1662            Poll::Ready(None) => Poll::Ready(None),
1663            Poll::Pending => Poll::Pending,
1664        }
1665    }
1666}
1667
1668#[cfg(feature = "stream")]
1669impl futures_core::Stream for PushPromises {
1670    type Item = Result<PushPromise, crate::Error>;
1671
1672    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1673        self.poll_push_promise(cx)
1674    }
1675}
1676
1677// ===== impl PushPromise =====
1678
1679impl PushPromise {
1680    /// Returns a reference to the push promise's request headers.
1681    pub fn request(&self) -> &Request<()> {
1682        &self.request
1683    }
1684
1685    /// Returns a mutable reference to the push promise's request headers.
1686    pub fn request_mut(&mut self) -> &mut Request<()> {
1687        &mut self.request
1688    }
1689
1690    /// Consumes `self`, returning the push promise's request headers and
1691    /// response future.
1692    pub fn into_parts(self) -> (Request<()>, PushedResponseFuture) {
1693        (self.request, self.response)
1694    }
1695}
1696
1697// ===== impl PushedResponseFuture =====
1698
1699impl Future for PushedResponseFuture {
1700    type Output = Result<Response<RecvStream>, crate::Error>;
1701
1702    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1703        Pin::new(&mut self.inner).poll(cx)
1704    }
1705}
1706
1707impl PushedResponseFuture {
1708    /// Returns the stream ID of the response stream.
1709    ///
1710    /// # Panics
1711    ///
1712    /// If the lock on the stream store has been poisoned.
1713    pub fn stream_id(&self) -> crate::StreamId {
1714        self.inner.stream_id()
1715    }
1716}
1717
1718// ===== impl Peer =====
1719
1720impl Peer {
1721    pub fn convert_send_message(
1722        id: StreamId,
1723        request: Request<()>,
1724        protocol: Option<Protocol>,
1725        end_of_stream: bool,
1726        pseudo_order: Option<crate::frame::PseudoOrder>,
1727    ) -> Result<Headers, SendError> {
1728        use http::request::Parts;
1729
1730        let (
1731            Parts {
1732                method,
1733                uri,
1734                headers,
1735                version,
1736                ..
1737            },
1738            _,
1739        ) = request.into_parts();
1740
1741        let is_connect = method == Method::CONNECT;
1742
1743        // Build the set pseudo header set. All requests will include `method`
1744        // and `path`.
1745        let mut pseudo = Pseudo::request(method, uri, protocol, pseudo_order);
1746
1747        if pseudo.scheme.is_none() {
1748            // If the scheme is not set, then there are a two options.
1749            //
1750            // 1) Authority is not set. In this case, a request was issued with
1751            //    a relative URI. This is permitted **only** when forwarding
1752            //    HTTP 1.x requests. If the HTTP version is set to 2.0, then
1753            //    this is an error.
1754            //
1755            // 2) Authority is set, then the HTTP method *must* be CONNECT.
1756            //
1757            // It is not possible to have a scheme but not an authority set (the
1758            // `http` crate does not allow it).
1759            //
1760            if pseudo.authority.is_none() {
1761                if version == Version::HTTP_2 {
1762                    return Err(UserError::MissingUriSchemeAndAuthority.into());
1763                } else {
1764                    // This is acceptable as per the above comment. However,
1765                    // HTTP/2 requires that a scheme is set. Since we are
1766                    // forwarding an HTTP 1.1 request, the scheme is set to
1767                    // "http".
1768                    pseudo.set_scheme(uri::Scheme::HTTP);
1769                }
1770            } else if !is_connect {
1771                // TODO: Error
1772            }
1773        }
1774
1775        // Create the HEADERS frame
1776        let mut frame = Headers::new(id, pseudo, headers);
1777
1778        if end_of_stream {
1779            frame.set_end_stream()
1780        }
1781
1782        Ok(frame)
1783    }
1784}
1785
1786impl proto::Peer for Peer {
1787    type Poll = Response<()>;
1788
1789    const NAME: &'static str = "Client";
1790
1791    fn r#dyn() -> proto::DynPeer {
1792        proto::DynPeer::Client
1793    }
1794
1795    /*
1796    fn is_server() -> bool {
1797        false
1798    }
1799    */
1800
1801    fn convert_poll_message(
1802        pseudo: Pseudo,
1803        fields: HeaderMap,
1804        stream_id: StreamId,
1805    ) -> Result<Self::Poll, Error> {
1806        let mut b = Response::builder();
1807
1808        b = b.version(Version::HTTP_2);
1809
1810        if let Some(status) = pseudo.status {
1811            b = b.status(status);
1812        }
1813
1814        let mut response = match b.body(()) {
1815            Ok(response) => response,
1816            Err(_) => {
1817                // TODO: Should there be more specialized handling for different
1818                // kinds of errors
1819                return Err(Error::library_reset(stream_id, Reason::PROTOCOL_ERROR));
1820            }
1821        };
1822
1823        *response.headers_mut() = fields;
1824
1825        Ok(response)
1826    }
1827}