rama_http_core/h2/
client.rs

//! Client implementation of the HTTP/2 protocol.
//!
//! # Getting started
//!
//! Running an HTTP/2 client requires the caller to establish the underlying
//! connection as well as get the connection to a state that is ready to begin
//! the HTTP/2 handshake. See [here](../index.html#handshake) for more
//! details.
//!
//! This could be as basic as using Tokio's [`TcpStream`] to connect to a remote
//! host, but usually it means using either ALPN or HTTP/1.1 protocol upgrades.
//!
//! Once a connection is obtained, it is passed to [`handshake`], which will
//! begin the [HTTP/2 handshake]. This returns a future that completes once
//! the handshake process is performed and HTTP/2 streams may be initialized.
//!
//! [`handshake`] uses default configuration values. There are a number of
//! settings that can be changed by using [`Builder`] instead.
//!
//! Once the handshake future completes, the caller is provided with a
//! [`Connection`] instance and a [`SendRequest`] instance. The [`Connection`]
//! instance is used to drive the connection (see [Managing the connection]).
//! The [`SendRequest`] instance is used to initialize new streams (see [Making
//! requests]).
//!
//! # Making requests
//!
//! Requests are made using the [`SendRequest`] handle provided by the handshake
//! future. Once a request is submitted, an HTTP/2 stream is initialized and
//! the request is sent to the server.
//!
//! A request body and request trailers are sent using [`SendRequest`] and the
//! server's response is returned once the [`ResponseFuture`] future completes.
//! Both the [`SendStream`] and [`ResponseFuture`] instances are returned by
//! [`SendRequest::send_request`] and are tied to the HTTP/2 stream
//! initialized by the sent request.
//!
//! The [`SendRequest::poll_ready`] function returns `Ready` when a new HTTP/2
//! stream can be created, i.e. as long as the current number of active streams
//! is below [`MAX_CONCURRENT_STREAMS`]. If a new stream cannot be created, the
//! caller will be notified once an existing stream closes, freeing capacity for
//! the caller. The caller should use [`SendRequest::poll_ready`] to check for
//! capacity before sending a request to the server.
//!
//! [`SendRequest`] enforces the [`MAX_CONCURRENT_STREAMS`] setting. The user
//! must not send a request if `poll_ready` does not return `Ready`. Attempting
//! to do so will result in an [`Error`] being returned.
//!
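//! For example, the readiness check can be driven with the `SendRequest::ready`
//! combinator before each request; a minimal sketch (error handling elided, the
//! target URI is a placeholder):
//!
//! ```rust, no_run
//! # use rama_http_core::h2::client::SendRequest;
//! # use rama_http_types::Request;
//! # async fn doc(mut send_request: SendRequest<&'static [u8]>) {
//! for _ in 0..3 {
//!     // Wait until a new HTTP/2 stream may be opened.
//!     send_request = send_request.ready().await.unwrap();
//!     let request = Request::get("https://www.example.com/").body(()).unwrap();
//!     // `true`: no request body or trailers will follow.
//!     let (_response, _) = send_request.send_request(request, true).unwrap();
//! }
//! # }
//! ```
//!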
//! # Managing the connection
//!
//! The [`Connection`] instance is used to manage connection state. The caller
//! is required to call [`Connection::poll`] in order to advance state.
//! [`SendRequest::send_request`] and other functions have no effect unless
//! [`Connection::poll`] is called.
//!
//! The [`Connection`] instance should only be dropped once [`Connection::poll`]
//! returns `Ready`. At this point, the underlying socket has been closed and no
//! further work needs to be done.
//!
//! The easiest way to ensure that the [`Connection`] instance gets polled is to
//! submit the [`Connection`] instance to an [executor]. The executor will then
//! manage polling the connection until the connection is complete.
//! Alternatively, the caller can call `poll` manually.
//!
//! # Example
//!
//! ```rust, no_run
//!
//! use rama_http_core::h2::client;
//! use rama_http_types::{Request, Method};
//! use std::error::Error;
//! use tokio::net::TcpStream;
//!
//! #[tokio::main]
//! pub async fn main() -> Result<(), Box<dyn Error>> {
//!     // Establish TCP connection to the server.
//!     let tcp = TcpStream::connect("127.0.0.1:5928").await?;
//!     let (h2, connection) = client::handshake(tcp).await?;
//!     tokio::spawn(async move {
//!         connection.await.unwrap();
//!     });
//!
//!     let mut h2 = h2.ready().await?;
//!     // Prepare the HTTP request to send to the server.
//!     let request = Request::builder()
//!                     .method(Method::GET)
//!                     .uri("https://www.example.com/")
//!                     .body(())
//!                     .unwrap();
//!
//!     // Send the request. The second tuple item allows the caller
//!     // to stream a request body.
//!     let (response, _) = h2.send_request(request, true).unwrap();
//!
//!     let (head, mut body) = response.await?.into_parts();
//!
//!     println!("Received response: {:?}", head);
//!
//!     // The `flow_control` handle allows the caller to manage
//!     // flow control.
//!     //
//!     // Whenever data is received, the caller is responsible for
//!     // releasing capacity back to the server once it has freed
//!     // the data from memory.
//!     let mut flow_control = body.flow_control().clone();
//!
//!     while let Some(chunk) = body.data().await {
//!         let chunk = chunk?;
//!         println!("RX: {:?}", chunk);
//!
//!         // Let the server send more data.
//!         let _ = flow_control.release_capacity(chunk.len());
//!     }
//!
//!     Ok(())
//! }
//! ```
//!
//! [`TcpStream`]: https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html
//! [`handshake`]: fn.handshake.html
//! [executor]: https://docs.rs/futures/0.1/futures/future/trait.Executor.html
//! [`SendRequest`]: struct.SendRequest.html
//! [`SendStream`]: ../struct.SendStream.html
//! [Making requests]: #making-requests
//! [Managing the connection]: #managing-the-connection
//! [`Connection`]: struct.Connection.html
//! [`Connection::poll`]: struct.Connection.html#method.poll
//! [`SendRequest::send_request`]: struct.SendRequest.html#method.send_request
//! [`MAX_CONCURRENT_STREAMS`]: http://httpwg.org/specs/rfc7540.html#SettingValues
//! [`ResponseFuture`]: struct.ResponseFuture.html
//! [`SendRequest::poll_ready`]: struct.SendRequest.html#method.poll_ready
//! [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
//! [`Builder`]: struct.Builder.html
//! [`Error`]: ../struct.Error.html

use crate::h2::codec::{Codec, SendError, UserError};
use crate::h2::ext::Protocol;
use crate::h2::frame::{Headers, Pseudo, Reason, Settings, StreamId};
use crate::h2::proto::{self, Error};
use crate::h2::{FlowControl, PingPong, RecvStream, SendStream};

use bytes::{Buf, Bytes};
use rama_http_types::dep::http::{request, uri};
use rama_http_types::proto::h1::headers::original::OriginalHttp1Headers;
use rama_http_types::proto::h2::PseudoHeaderOrder;
use rama_http_types::proto::h2::frame::{SettingOrder, SettingsConfig};
use rama_http_types::{HeaderMap, Method, Request, Response, Version};
use std::borrow::Cow;
use std::fmt;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use tracing::Instrument;

use super::frame::{Priority, StreamDependency};

/// Initializes new HTTP/2 streams on a connection by sending a request.
///
/// This type does no work itself. Instead, it is a handle to the inner
/// connection state held by [`Connection`]. If the associated connection
/// instance is dropped, all `SendRequest` functions will return [`Error`].
///
/// [`SendRequest`] instances can be moved to, and used from, tasks or threads
/// other than the one running their associated [`Connection`] instance.
/// Internally, there is a buffer used to stage requests before they get
/// written to the connection. There is no guarantee that requests get written
/// to the connection in FIFO order as HTTP/2 prioritization logic can play a
/// role.
///
/// [`SendRequest`] implements [`Clone`], enabling the creation of many
/// instances that are backed by a single connection.
///
/// See [module] level documentation for more details.
///
/// [module]: index.html
/// [`Connection`]: struct.Connection.html
/// [`Clone`]: https://doc.rust-lang.org/std/clone/trait.Clone.html
/// [`Error`]: ../struct.Error.html
pub struct SendRequest<B: Buf> {
    inner: proto::Streams<B, Peer>,
    pending: Option<proto::OpaqueStreamRef>,
}

/// Returns a `SendRequest` instance once it is ready to send at least one
/// request.
#[derive(Debug)]
pub struct ReadySendRequest<B: Buf> {
    inner: Option<SendRequest<B>>,
}

/// Manages all state associated with an HTTP/2 client connection.
///
/// A `Connection` is backed by an I/O resource (usually a TCP socket) and
/// implements the HTTP/2 client logic for that connection. It is responsible
/// for driving the internal state forward, performing the work requested of the
/// associated handles ([`SendRequest`], [`ResponseFuture`], [`SendStream`],
/// [`RecvStream`]).
///
/// `Connection` values are created by calling [`handshake`]. Once a
/// `Connection` value is obtained, the caller must repeatedly call [`poll`]
/// until `Ready` is returned. The easiest way to do this is to submit the
/// `Connection` instance to an [executor].
///
/// [module]: index.html
/// [`handshake`]: fn.handshake.html
/// [`SendRequest`]: struct.SendRequest.html
/// [`ResponseFuture`]: struct.ResponseFuture.html
/// [`SendStream`]: ../struct.SendStream.html
/// [`RecvStream`]: ../struct.RecvStream.html
/// [`poll`]: #method.poll
/// [executor]: https://docs.rs/futures/0.1/futures/future/trait.Executor.html
///
/// # Examples
///
/// ```
/// # use tokio::io::{AsyncRead, AsyncWrite};
/// # use rama_http_core::h2::client;
/// # use rama_http_core::h2::client::*;
/// #
/// # async fn doc<T>(my_io: T) -> Result<(), rama_http_core::h2::Error>
/// # where T: AsyncRead + AsyncWrite + Send + Unpin + 'static,
/// # {
///     let (send_request, connection) = client::handshake(my_io).await?;
///     // Submit the connection handle to an executor.
///     tokio::spawn(async { connection.await.expect("connection failed"); });
///
///     // Now, use `send_request` to initialize HTTP/2 streams.
///     // ...
/// # Ok(())
/// # }
/// #
/// # pub fn main() {}
/// ```
#[must_use = "futures do nothing unless polled"]
pub struct Connection<T, B: Buf = Bytes> {
    inner: proto::Connection<T, Peer, B>,
}

/// A future of an HTTP response.
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
pub struct ResponseFuture {
    inner: proto::OpaqueStreamRef,
    push_promise_consumed: bool,
}

/// A future of a pushed HTTP response.
///
/// We have to differentiate between pushed and non-pushed responses because of
/// the spec (<https://httpwg.org/specs/rfc7540.html#PUSH_PROMISE>):
/// > PUSH_PROMISE frames MUST only be sent on a peer-initiated stream
/// > that is in either the "open" or "half-closed (remote)" state.
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
pub struct PushedResponseFuture {
    inner: ResponseFuture,
}

/// A pushed response and corresponding request headers
#[derive(Debug)]
pub struct PushPromise {
    /// The request headers
    request: Request<()>,

    /// The pushed response
    response: PushedResponseFuture,
}

/// A stream of pushed responses and corresponding promised requests
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct PushPromises {
    inner: proto::OpaqueStreamRef,
}

/// Builds client connections with custom configuration values.
///
/// Methods can be chained in order to set the configuration values.
///
/// The client is constructed by calling [`handshake`] and passing the I/O
/// handle that will back the HTTP/2 client.
///
/// New instances of `Builder` are obtained via [`Builder::new`].
///
/// See function level documentation for details on the various client
/// configuration settings.
///
/// [`Builder::new`]: struct.Builder.html#method.new
/// [`handshake`]: struct.Builder.html#method.handshake
///
/// # Examples
///
/// ```
/// # use tokio::io::{AsyncRead, AsyncWrite};
/// # use rama_http_core::h2::client::*;
/// # use bytes::Bytes;
/// #
/// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
/// # -> Result<(SendRequest<Bytes>, Connection<T, Bytes>), rama_http_core::h2::Error>
/// # {
/// // `client_fut` is a future representing the completion of the HTTP/2
/// // handshake.
/// let client_fut = Builder::new()
///     .initial_window_size(1_000_000)
///     .max_concurrent_streams(1000)
///     .handshake(my_io);
/// # client_fut.await
/// # }
/// #
/// # pub fn main() {}
/// ```
#[derive(Clone, Debug)]
pub struct Builder {
    /// Time to keep locally reset streams around before reaping.
    reset_stream_duration: Duration,

    /// Initial maximum number of locally initiated (send) streams.
    /// After receiving a SETTINGS frame from the remote peer,
    /// the connection will overwrite this value with the
    /// MAX_CONCURRENT_STREAMS specified in the frame.
    /// If no value is advertised by the remote peer in the initial SETTINGS
    /// frame, it will be set to usize::MAX.
    initial_max_send_streams: usize,

    /// Initial target window size for new connections.
    initial_target_connection_window_size: Option<u32>,

    /// Maximum amount of bytes to "buffer" for writing per stream.
    max_send_buffer_size: usize,

    /// Maximum number of locally reset streams to keep at a time.
    reset_stream_max: usize,

    /// Maximum number of remotely reset streams to allow in the pending
    /// accept queue.
    pending_accept_reset_stream_max: usize,

    /// Initial `Settings` frame to send as part of the handshake.
    settings: Settings,

    /// The stream ID of the first (lowest) stream. Subsequent streams will use
    /// monotonically increasing stream IDs.
    stream_id: StreamId,

    /// Maximum number of locally reset streams due to protocol error across
    /// the lifetime of the connection.
    ///
    /// When this gets exceeded, we issue GOAWAYs.
    local_max_error_reset_streams: Option<usize>,

    /// The headers frame pseudo-header order
    headers_pseudo_order: Option<PseudoHeaderOrder>,

    /// The headers frame priority
    headers_priority: Option<StreamDependency>,

    /// Priority stream list
    priority: Option<Cow<'static, [Priority]>>,
}

#[derive(Debug)]
pub(crate) struct Peer;

// ===== impl SendRequest =====

impl<B> SendRequest<B>
where
    B: Buf,
{
    /// Returns `Ready` when the connection can initialize a new HTTP/2
    /// stream.
    ///
    /// This function must return `Ready` before `send_request` is called. When
    /// `Poll::Pending` is returned, the task will be notified once the readiness
    /// state changes.
    ///
    /// See [module] level docs for more details.
    ///
    /// [module]: index.html
    pub fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), crate::h2::Error>> {
        ready!(self.inner.poll_pending_open(cx, self.pending.as_ref()))?;
        self.pending = None;
        Poll::Ready(Ok(()))
    }

    /// Consumes `self`, returning a future that returns `self` back once it is
    /// ready to send a request.
    ///
    /// This function should be called before calling `send_request`.
    ///
    /// This is a functional combinator for [`poll_ready`]. The returned future
    /// calls [`poll_ready`] until `Ready`, then returns `self` to the caller.
    ///
    /// # Examples
    ///
    /// ```rust
    /// # use rama_http_core::h2::client::*;
    /// # use rama_http_types::*;
    /// # async fn doc(send_request: SendRequest<&'static [u8]>)
    /// # {
    /// // First, wait until the `send_request` handle is ready to send a new
    /// // request
    /// let mut send_request = send_request.ready().await.unwrap();
    /// // Use `send_request` here.
    /// # }
    /// # pub fn main() {}
    /// ```
    ///
    /// See [module] level docs for more details.
    ///
    /// [`poll_ready`]: #method.poll_ready
    /// [module]: index.html
    pub fn ready(self) -> ReadySendRequest<B> {
        ReadySendRequest { inner: Some(self) }
    }

    /// Sends an HTTP/2 request to the server.
    ///
    /// `send_request` initializes a new HTTP/2 stream on the associated
    /// connection, then sends the given request using this new stream. Only the
    /// request head is sent.
    ///
    /// On success, a [`ResponseFuture`] instance and [`SendStream`] instance
    /// are returned. The [`ResponseFuture`] instance is used to get the
    /// server's response and the [`SendStream`] instance is used to send a
    /// request body or trailers to the server over the same HTTP/2 stream.
    ///
    /// To send a request body or trailers, set `end_of_stream` to `false`.
    /// Then, use the returned [`SendStream`] instance to stream request body
    /// chunks or send trailers. If `end_of_stream` is set to `true`, then
    /// attempting to call [`SendStream::send_data`] or
    /// [`SendStream::send_trailers`] will result in an error.
    ///
    /// If no request body or trailers are to be sent, set `end_of_stream` to
    /// `true` and drop the returned [`SendStream`] instance.
    ///
    /// # A note on HTTP versions
    ///
    /// The provided `Request` will be encoded differently depending on the
    /// value of its version field. If the version is set to 2.0, then the
    /// request is encoded as per the specification recommends.
    ///
    /// If the version is set to a lower value, then the request is encoded to
    /// preserve the characteristics of HTTP 1.1 and lower. Specifically, host
    /// headers are permitted and the `:authority` pseudo header is not
    /// included.
    ///
    /// The caller should always set the request's version field to 2.0 unless
    /// specifically transmitting an HTTP 1.1 request over 2.0.
    ///
    /// # Examples
    ///
    /// Sending a request with no body
    ///
    /// ```rust
    /// # use rama_http_core::h2::client::*;
    /// # use rama_http_types::*;
    /// # async fn doc(send_request: SendRequest<&'static [u8]>)
    /// # {
    /// // First, wait until the `send_request` handle is ready to send a new
    /// // request
    /// let mut send_request = send_request.ready().await.unwrap();
    /// // Prepare the HTTP request to send to the server.
    /// let request = Request::get("https://www.example.com/")
    ///     .body(())
    ///     .unwrap();
    ///
    /// // Send the request to the server. Since we are not sending a
    /// // body or trailers, we can drop the `SendStream` instance.
    /// let (response, _) = send_request.send_request(request, true).unwrap();
    /// let response = response.await.unwrap();
    /// // Process the response
    /// # }
    /// # pub fn main() {}
    /// ```
    ///
    /// Sending a request with a body and trailers
    ///
    /// ```rust
    /// # use rama_http_core::h2::client::*;
    /// # use rama_http_types::*;
    /// # async fn doc(send_request: SendRequest<&'static [u8]>)
    /// # {
    /// // First, wait until the `send_request` handle is ready to send a new
    /// // request
    /// let mut send_request = send_request.ready().await.unwrap();
    ///
    /// // Prepare the HTTP request to send to the server.
    /// let request = Request::get("https://www.example.com/")
    ///     .body(())
    ///     .unwrap();
    ///
    /// // Send the request to the server. Since we are sending a body
    /// // and trailers, we keep the `SendStream` instance.
    /// let (response, mut send_stream) = send_request
    ///     .send_request(request, false).unwrap();
    ///
    /// // At this point, one option would be to wait for send capacity.
    /// // Doing so would allow us to not hold data in memory that
    /// // cannot be sent. However, this is not a requirement, so this
    /// // example will skip that step. See `SendStream` documentation
    /// // for more details.
    /// send_stream.send_data(b"hello", false).unwrap();
    /// send_stream.send_data(b"world", false).unwrap();
    ///
    /// // Send the trailers.
    /// let mut trailers = HeaderMap::new();
    /// trailers.insert(
    ///     header::HeaderName::from_bytes(b"my-trailer").unwrap(),
    ///     header::HeaderValue::from_bytes(b"hello").unwrap());
    ///
    /// send_stream.send_trailers(trailers, Default::default()).unwrap();
    ///
    /// let response = response.await.unwrap();
    /// // Process the response
    /// # }
    /// # pub fn main() {}
    /// ```
    ///
    /// [`ResponseFuture`]: struct.ResponseFuture.html
    /// [`SendStream`]: ../struct.SendStream.html
    /// [`SendStream::send_data`]: ../struct.SendStream.html#method.send_data
    /// [`SendStream::send_trailers`]: ../struct.SendStream.html#method.send_trailers
    pub fn send_request(
        &mut self,
        request: Request<()>,
        end_of_stream: bool,
    ) -> Result<(ResponseFuture, SendStream<B>), crate::h2::Error> {
        self.inner
            .send_request(request, end_of_stream, self.pending.as_ref())
            .map_err(Into::into)
            .map(|(stream, is_full)| {
                if stream.is_pending_open() && is_full {
                    // Only prevent sending another request when the request
                    // queue is full.
                    self.pending = Some(stream.clone_to_opaque());
                }

                let response = ResponseFuture {
                    inner: stream.clone_to_opaque(),
                    push_promise_consumed: false,
                };

                let stream = SendStream::new(stream);

                (response, stream)
            })
    }

    /// Returns whether the [extended CONNECT protocol][1] is enabled or not.
    ///
    /// This setting is configured by the server peer by sending the
    /// [`SETTINGS_ENABLE_CONNECT_PROTOCOL` parameter][2] in a `SETTINGS` frame.
    /// This method returns the currently acknowledged value received from the
    /// remote.
    ///
    /// [1]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
    /// [2]: https://datatracker.ietf.org/doc/html/rfc8441#section-3
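    ///
    /// # Examples
    ///
    /// A sketch of gating extended CONNECT usage on this setting (whether the
    /// server has advertised the setting is an assumption of the example):
    ///
    /// ```rust
    /// # use rama_http_core::h2::client::SendRequest;
    /// # fn doc(send_request: SendRequest<&'static [u8]>) {
    /// if send_request.is_extended_connect_protocol_enabled() {
    ///     // The server supports RFC 8441 extended CONNECT, so CONNECT
    ///     // requests carrying a `:protocol` pseudo-header (e.g. for
    ///     // WebSockets over HTTP/2) may be issued on this connection.
    /// }
    /// # }
    /// # pub fn main() {}
    /// ```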
    pub fn is_extended_connect_protocol_enabled(&self) -> bool {
        self.inner.is_extended_connect_protocol_enabled()
    }

    /// Returns the current maximum number of send streams.
    pub fn current_max_send_streams(&self) -> usize {
        self.inner.current_max_send_streams()
    }

    /// Returns the current maximum number of receive streams.
    pub fn current_max_recv_streams(&self) -> usize {
        self.inner.current_max_recv_streams()
    }
}

impl<B> fmt::Debug for SendRequest<B>
where
    B: Buf,
{
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        fmt.debug_struct("SendRequest").finish()
    }
}

impl<B> Clone for SendRequest<B>
where
    B: Buf,
{
    fn clone(&self) -> Self {
        SendRequest {
            inner: self.inner.clone(),
            pending: None,
        }
    }
}

#[cfg(feature = "unstable")]
impl<B> SendRequest<B>
where
    B: Buf,
{
    /// Returns the number of active streams.
    ///
    /// An active stream is a stream that has not yet transitioned to a closed
    /// state.
    pub fn num_active_streams(&self) -> usize {
        self.inner.num_active_streams()
    }

    /// Returns the number of streams that are held in memory.
    ///
    /// A wired stream is a stream that is either active or is closed but must
    /// stay in memory for some reason. For example, there are still outstanding
    /// userspace handles pointing to the slot.
    pub fn num_wired_streams(&self) -> usize {
        self.inner.num_wired_streams()
    }
}

// ===== impl ReadySendRequest =====

impl<B> Future for ReadySendRequest<B>
where
    B: Buf,
{
    type Output = Result<SendRequest<B>, crate::h2::Error>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match &mut self.inner {
            Some(send_request) => {
                ready!(send_request.poll_ready(cx))?;
            }
            None => panic!("called `poll` after future completed"),
        }

        Poll::Ready(Ok(self.inner.take().unwrap()))
    }
}

// ===== impl Builder =====

impl Builder {
    /// Returns a new client builder instance initialized with default
    /// configuration values.
    ///
    /// Configuration methods can be chained on the return value.
    ///
    /// # Examples
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use rama_http_core::h2::client::*;
    /// # use bytes::Bytes;
    /// #
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
    /// # -> Result<(SendRequest<Bytes>, Connection<T, Bytes>), rama_http_core::h2::Error>
    /// # {
    /// // `client_fut` is a future representing the completion of the HTTP/2
    /// // handshake.
    /// let client_fut = Builder::new()
    ///     .initial_window_size(1_000_000)
    ///     .max_concurrent_streams(1000)
    ///     .handshake(my_io);
    /// # client_fut.await
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```
    pub fn new() -> Builder {
        Builder {
            max_send_buffer_size: proto::DEFAULT_MAX_SEND_BUFFER_SIZE,
            reset_stream_duration: Duration::from_secs(proto::DEFAULT_RESET_STREAM_SECS),
            reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX,
            pending_accept_reset_stream_max: proto::DEFAULT_REMOTE_RESET_STREAM_MAX,
            initial_target_connection_window_size: None,
            initial_max_send_streams: usize::MAX,
            settings: Default::default(),
            stream_id: 1.into(),
            local_max_error_reset_streams: Some(proto::DEFAULT_LOCAL_RESET_COUNT_MAX),
            headers_pseudo_order: None,
            headers_priority: None,
            priority: None,
        }
    }

    /// Set the HTTP/2 headers frame pseudo-header order.
    pub fn headers_pseudo_order(&mut self, order: PseudoHeaderOrder) -> &mut Self {
        self.headers_pseudo_order = Some(order);
        self
    }

    /// Set the HTTP/2 headers frame priority.
    pub fn headers_priority(&mut self, headers_priority: StreamDependency) -> &mut Self {
        self.headers_priority = Some(headers_priority);
        self
    }

    /// Set the order in which settings appear in the initial `SETTINGS` frame.
    pub fn setting_order(&mut self, order: SettingOrder) -> &mut Self {
        self.settings.set_setting_order(Some(order));
        self
    }

    /// Set the priority stream list.
    pub fn priority(&mut self, priority: Cow<'static, [Priority]>) -> &mut Self {
        self.priority = Some(priority);
        self
    }

    /// Set the configuration of the initial `SETTINGS` frame.
    pub fn setting_config(&mut self, config: SettingsConfig) -> &mut Self {
        self.settings.set_config(config);
        self
    }

    /// Indicates the initial window size (in octets) for stream-level
    /// flow control for received data.
    ///
    /// The initial window of a stream is used as part of flow control. For more
    /// details, see [`FlowControl`].
    ///
    /// The default value is 65,535.
    ///
    /// [`FlowControl`]: ../struct.FlowControl.html
    ///
    /// # Examples
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use rama_http_core::h2::client::*;
    /// # use bytes::Bytes;
    /// #
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
    /// # -> Result<(SendRequest<Bytes>, Connection<T, Bytes>), rama_http_core::h2::Error>
    /// # {
    /// // `client_fut` is a future representing the completion of the HTTP/2
    /// // handshake.
    /// let client_fut = Builder::new()
    ///     .initial_window_size(1_000_000)
    ///     .handshake(my_io);
    /// # client_fut.await
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```
    pub fn initial_window_size(&mut self, size: u32) -> &mut Self {
        self.settings.set_initial_window_size(Some(size));
        self
    }

    /// Indicates the initial window size (in octets) for connection-level flow control
    /// for received data.
    ///
    /// The initial window of a connection is used as part of flow control. For more details,
    /// see [`FlowControl`].
    ///
    /// The default value is 65,535.
    ///
    /// [`FlowControl`]: ../struct.FlowControl.html
    ///
    /// # Examples
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use rama_http_core::h2::client::*;
    /// # use bytes::Bytes;
    /// #
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
    /// # -> Result<(SendRequest<Bytes>, Connection<T, Bytes>), rama_http_core::h2::Error>
    /// # {
    /// // `client_fut` is a future representing the completion of the HTTP/2
    /// // handshake.
    /// let client_fut = Builder::new()
    ///     .initial_connection_window_size(1_000_000)
    ///     .handshake(my_io);
    /// # client_fut.await
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```
    pub fn initial_connection_window_size(&mut self, size: u32) -> &mut Self {
        self.initial_target_connection_window_size = Some(size);
        self
    }

    /// Indicates the size (in octets) of the largest HTTP/2 frame payload that the
    /// configured client is able to accept.
    ///
    /// The sender may send data frames that are **smaller** than this value,
    /// but any data larger than `max` will be broken up into multiple `DATA`
    /// frames.
    ///
    /// The value **must** be between 16,384 and 16,777,215. The default value is 16,384.
    ///
    /// # Examples
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use rama_http_core::h2::client::*;
    /// # use bytes::Bytes;
    /// #
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
    /// # -> Result<(SendRequest<Bytes>, Connection<T, Bytes>), rama_http_core::h2::Error>
    /// # {
    /// // `client_fut` is a future representing the completion of the HTTP/2
    /// // handshake.
    /// let client_fut = Builder::new()
    ///     .max_frame_size(1_000_000)
    ///     .handshake(my_io);
    /// # client_fut.await
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```
    ///
    /// # Panics
    ///
    /// This function panics if `max` is not within the legal range specified
    /// above.
    pub fn max_frame_size(&mut self, max: u32) -> &mut Self {
        self.settings.set_max_frame_size(Some(max));
        self
    }

    /// Sets the max size of received header frames.
    ///
    /// This advisory setting informs a peer of the maximum size of header list
    /// that the sender is prepared to accept, in octets. The value is based on
    /// the uncompressed size of header fields, including the length of the name
    /// and value in octets plus an overhead of 32 octets for each header field.
    ///
    /// This setting is also used to limit the maximum amount of data that is
    /// buffered to decode HEADERS frames.
    ///
    /// # Examples
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use rama_http_core::h2::client::*;
    /// # use bytes::Bytes;
    /// #
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
    /// # -> Result<(SendRequest<Bytes>, Connection<T, Bytes>), rama_http_core::h2::Error>
    /// # {
    /// // `client_fut` is a future representing the completion of the HTTP/2
    /// // handshake.
    /// let client_fut = Builder::new()
    ///     .max_header_list_size(16 * 1024)
    ///     .handshake(my_io);
    /// # client_fut.await
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```
    pub fn max_header_list_size(&mut self, max: u32) -> &mut Self {
        self.settings.set_max_header_list_size(Some(max));
        self
    }

    /// Sets the maximum number of concurrent streams.
    ///
    /// The maximum concurrent streams setting only controls the maximum number
    /// of streams that can be initiated by the remote peer. In other words,
    /// when this setting is set to 100, this does not limit the number of
    /// concurrent streams that can be created by the caller.
    ///
    /// It is recommended that this value be no smaller than 100, so as to not
    /// unnecessarily limit parallelism. However, any value is legal, including
    /// 0. If `max` is set to 0, then the remote will not be permitted to
    /// initiate streams.
    ///
    /// Note that streams in the reserved state, i.e., push promises that have
    /// been reserved but the stream has not started, do not count against this
    /// setting.
    ///
    /// Also note that if the remote *does* exceed the value set here, it is not
    /// a protocol level error. Instead, the `h2` library will immediately reset
    /// the stream.
    ///
    /// See [Section 5.1.2] in the HTTP/2 spec for more details.
    ///
    /// [Section 5.1.2]: https://http2.github.io/http2-spec/#rfc.section.5.1.2
    ///
    /// # Examples
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use rama_http_core::h2::client::*;
    /// # use bytes::Bytes;
    /// #
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
    /// # -> Result<(SendRequest<Bytes>, Connection<T, Bytes>), rama_http_core::h2::Error>
    /// # {
    /// // `client_fut` is a future representing the completion of the HTTP/2
    /// // handshake.
    /// let client_fut = Builder::new()
    ///     .max_concurrent_streams(1000)
    ///     .handshake(my_io);
    /// # client_fut.await
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```
    pub fn max_concurrent_streams(&mut self, max: u32) -> &mut Self {
        self.settings.set_max_concurrent_streams(Some(max));
        self
    }

    /// Sets the initial maximum of locally initiated (send) streams.
    ///
    /// The initial settings will be overwritten by the remote peer when
    /// the SETTINGS frame is received. The new value will be set to the
    /// `max_concurrent_streams()` from the frame. If no value is advertised in
    /// the initial SETTINGS frame from the remote peer as part of
    /// [HTTP/2 Connection Preface], `usize::MAX` will be set.
    ///
    /// This setting prevents the caller from exceeding this number of
    /// streams that are counted towards the concurrency limit.
    ///
    /// Sending streams past the limit returned by the peer will be treated
    /// as a stream error of type PROTOCOL_ERROR or REFUSED_STREAM.
    ///
    /// See [Section 5.1.2] in the HTTP/2 spec for more details.
    ///
    /// The default value is `usize::MAX`.
    ///
    /// [HTTP/2 Connection Preface]: https://httpwg.org/specs/rfc9113.html#preface
    /// [Section 5.1.2]: https://httpwg.org/specs/rfc9113.html#rfc.section.5.1.2
    ///
    /// # Examples
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use rama_http_core::h2::client::*;
    /// # use bytes::Bytes;
    /// #
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
    /// # -> Result<(SendRequest<Bytes>, Connection<T, Bytes>), rama_http_core::h2::Error>
    /// # {
    /// // `client_fut` is a future representing the completion of the HTTP/2
    /// // handshake.
    /// let client_fut = Builder::new()
    ///     .initial_max_send_streams(1000)
    ///     .handshake(my_io);
    /// # client_fut.await
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```
    pub fn initial_max_send_streams(&mut self, initial: usize) -> &mut Self {
        self.initial_max_send_streams = initial;
        self
    }

    /// Sets the maximum number of concurrent locally reset streams.
    ///
    /// When a stream is explicitly reset, the HTTP/2 specification requires
    /// that any further frames received for that stream must be ignored for
    /// "some time".
    ///
    /// In order to satisfy the specification, internal state must be maintained
    /// to implement the behavior. This state grows linearly with the number of
    /// streams that are locally reset.
    ///
    /// The `max_concurrent_reset_streams` setting sets an upper bound on the
    /// amount of state that is maintained. When this max value is reached, the
    /// oldest reset stream is purged from memory.
    ///
    /// Once the stream has been fully purged from memory, any additional frames
    /// received for that stream will result in a connection level protocol
    /// error, forcing the connection to terminate.
    ///
    /// The default value is 10.
    ///
    /// # Examples
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use rama_http_core::h2::client::*;
    /// # use bytes::Bytes;
    /// #
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
    /// # -> Result<(SendRequest<Bytes>, Connection<T, Bytes>), rama_http_core::h2::Error>
    /// # {
    /// // `client_fut` is a future representing the completion of the HTTP/2
    /// // handshake.
    /// let client_fut = Builder::new()
    ///     .max_concurrent_reset_streams(1000)
    ///     .handshake(my_io);
    /// # client_fut.await
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```
    pub fn max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self {
        self.reset_stream_max = max;
        self
    }

    /// Sets the duration to remember locally reset streams.
    ///
    /// When a stream is explicitly reset, the HTTP/2 specification requires
    /// that any further frames received for that stream must be ignored for
    /// "some time".
    ///
    /// In order to satisfy the specification, internal state must be maintained
    /// to implement the behavior. This state grows linearly with the number of
    /// streams that are locally reset.
    ///
    /// The `reset_stream_duration` setting configures the max amount of time
    /// this state will be maintained in memory. Once the duration elapses, the
    /// stream state is purged from memory.
    ///
    /// Once the stream has been fully purged from memory, any additional frames
    /// received for that stream will result in a connection level protocol
    /// error, forcing the connection to terminate.
    ///
    /// The default value is 30 seconds.
    ///
    /// # Examples
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use rama_http_core::h2::client::*;
    /// # use std::time::Duration;
    /// # use bytes::Bytes;
    /// #
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
    /// # -> Result<(SendRequest<Bytes>, Connection<T, Bytes>), rama_http_core::h2::Error>
    /// # {
    /// // `client_fut` is a future representing the completion of the HTTP/2
    /// // handshake.
    /// let client_fut = Builder::new()
    ///     .reset_stream_duration(Duration::from_secs(10))
    ///     .handshake(my_io);
    /// # client_fut.await
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```
    pub fn reset_stream_duration(&mut self, dur: Duration) -> &mut Self {
        self.reset_stream_duration = dur;
        self
    }

    /// Sets the maximum number of local resets due to protocol errors made by the remote end.
    ///
    /// Invalid frames and many other protocol errors will lead to resets being generated for those streams.
    /// Too many of these often indicate a malicious client, and there are attacks which can abuse this to DoS servers.
    /// This limit protects against these DoS attacks by limiting the number of resets we can be forced to generate.
    ///
    /// When the number of local resets exceeds this threshold, the client will close the connection.
    ///
    /// If you really want to disable this, supply [`Option::None`] here.
    /// Disabling this is not recommended and may expose you to DoS attacks.
    ///
    /// The default value is currently 1024, but could change.
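    ///
    /// # Examples
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use rama_http_core::h2::client::*;
    /// # use bytes::Bytes;
    /// #
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
    /// # -> Result<(SendRequest<Bytes>, Connection<T, Bytes>), rama_http_core::h2::Error>
    /// # {
    /// // Allow up to 2048 locally generated error resets before
    /// // the connection is closed.
    /// let client_fut = Builder::new()
    ///     .max_local_error_reset_streams(Some(2048))
    ///     .handshake(my_io);
    /// # client_fut.await
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```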
    pub fn max_local_error_reset_streams(&mut self, max: Option<usize>) -> &mut Self {
        self.local_max_error_reset_streams = max;
        self
    }

    /// Sets the maximum number of pending-accept remotely-reset streams.
    ///
    /// Streams that have been received by the peer, but not accepted by the
    /// user, can also receive a RST_STREAM. This is a legitimate pattern: one
    /// could send a request and then shortly after, realize it is not needed,
    /// sending a CANCEL.
    ///
    /// However, since those streams are now "closed", they don't count towards
    /// the max concurrent streams. So, they will sit in the accept queue,
    /// using memory.
    ///
    /// When the number of remotely-reset streams sitting in the pending-accept
    /// queue reaches this maximum value, a connection error with the code of
    /// `ENHANCE_YOUR_CALM` will be sent to the peer, and returned by the
    /// `Future`.
    ///
    /// The default value is currently 20, but could change.
    ///
    /// # Examples
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use rama_http_core::h2::client::*;
    /// # use bytes::Bytes;
    /// #
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
    /// # -> Result<(SendRequest<Bytes>, Connection<T, Bytes>), rama_http_core::h2::Error>
    /// # {
    /// // `client_fut` is a future representing the completion of the HTTP/2
    /// // handshake.
    /// let client_fut = Builder::new()
    ///     .max_pending_accept_reset_streams(100)
    ///     .handshake(my_io);
    /// # client_fut.await
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```
    pub fn max_pending_accept_reset_streams(&mut self, max: usize) -> &mut Self {
        self.pending_accept_reset_stream_max = max;
        self
    }

    /// Sets the maximum send buffer size per stream.
    ///
    /// Once a stream has buffered up to (or over) the maximum, the stream's
    /// flow control will not "poll" additional capacity. Once bytes for the
    /// stream have been written to the connection, the send buffer capacity
    /// will be freed up again.
    ///
    /// The default is currently ~400KB, but may change.
    ///
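    /// # Examples
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use rama_http_core::h2::client::*;
    /// # use bytes::Bytes;
    /// #
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
    /// # -> Result<(SendRequest<Bytes>, Connection<T, Bytes>), rama_http_core::h2::Error>
    /// # {
    /// // Limit each stream's send buffer to 1 MB.
    /// let client_fut = Builder::new()
    ///     .max_send_buffer_size(1024 * 1024)
    ///     .handshake(my_io);
    /// # client_fut.await
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```
    ///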
    /// # Panics
    ///
    /// This function panics if `max` is larger than `u32::MAX`.
    pub fn max_send_buffer_size(&mut self, max: usize) -> &mut Self {
        assert!(max <= u32::MAX as usize);
        self.max_send_buffer_size = max;
        self
    }

    /// Enables or disables server push promises.
    ///
    /// This value is included in the initial SETTINGS handshake.
    /// Setting this value to false in the initial SETTINGS handshake guarantees
    /// that the remote server will never send a push promise.
    ///
    /// This setting can be changed during the life of a single HTTP/2
    /// connection by sending another settings frame updating the value.
    ///
    /// Default value: `true`.
    ///
    /// # Examples
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use rama_http_core::h2::client::*;
    /// # use bytes::Bytes;
    /// #
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
    /// # -> Result<(SendRequest<Bytes>, Connection<T, Bytes>), rama_http_core::h2::Error>
    /// # {
    /// // `client_fut` is a future representing the completion of the HTTP/2
    /// // handshake.
    /// let client_fut = Builder::new()
    ///     .enable_push(false)
    ///     .handshake(my_io);
    /// # client_fut.await
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```
    pub fn enable_push(&mut self, enabled: bool) -> &mut Self {
        self.settings.set_enable_push(enabled);
        self
    }

    /// Sets the `SETTINGS_ENABLE_CONNECT_PROTOCOL` parameter ([RFC 8441]) to
    /// the given value in the initial `SETTINGS` frame.
    ///
    /// [RFC 8441]: https://datatracker.ietf.org/doc/html/rfc8441
    pub fn enable_connect_protocol(&mut self, value: u32) -> &mut Self {
        self.settings.set_enable_connect_protocol(Some(value));
        self
    }

    /// Sets the value of the unknown setting with identifier 9 in the initial
    /// `SETTINGS` frame.
    ///
    /// This setting is not defined by the HTTP/2 specification; it is mainly
    /// useful for mimicking peers that send it.
    pub fn unknown_setting_9(&mut self, value: u32) -> &mut Self {
        self.settings.set_unknown_setting_9(Some(value));
        self
    }

    /// Sets the header table size.
    ///
    /// This setting informs the peer of the maximum size of the header compression
    /// table used to encode header blocks, in octets. The encoder may select any value
    /// equal to or less than the header table size specified by the sender.
    ///
    /// The default value is 4,096.
    ///
    /// # Examples
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use rama_http_core::h2::client::*;
    /// # use bytes::Bytes;
    /// #
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
    /// # -> Result<(SendRequest<Bytes>, Connection<T, Bytes>), rama_http_core::h2::Error>
    /// # {
    /// // `client_fut` is a future representing the completion of the HTTP/2
    /// // handshake.
    /// let client_fut = Builder::new()
    ///     .header_table_size(1_000_000)
    ///     .handshake(my_io);
    /// # client_fut.await
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```
    pub fn header_table_size(&mut self, size: u32) -> &mut Self {
        self.settings.set_header_table_size(Some(size));
        self
    }

    /// Sets the first stream ID to something other than 1.
    ///
    /// # Panics
    ///
    /// This function panics if `stream_id` is even, as client-initiated
    /// stream IDs must be odd.
    pub fn initial_stream_id(&mut self, stream_id: u32) -> &mut Self {
        self.stream_id = stream_id.into();
        assert!(
            self.stream_id.is_client_initiated(),
            "stream id must be odd"
        );
        self
    }

    /// Creates a new configured HTTP/2 client backed by `io`.
    ///
    /// It is expected that `io` already be in an appropriate state to commence
    /// the [HTTP/2 handshake]. The handshake is completed once both the connection
    /// preface and the initial settings frame are sent by the client.
    ///
    /// The handshake future does not wait for the initial settings frame from the
    /// server.
    ///
    /// Returns a future which resolves to the [`Connection`] / [`SendRequest`]
    /// tuple once the HTTP/2 handshake has been completed.
    ///
    /// This function also allows the caller to configure the send payload data
    /// type. See [Outbound data type] for more details.
    ///
    /// [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
    /// [`Connection`]: struct.Connection.html
    /// [`SendRequest`]: struct.SendRequest.html
    /// [Outbound data type]: ../index.html#outbound-data-type
    ///
    /// # Examples
    ///
    /// Basic usage:
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use rama_http_core::h2::client::*;
    /// # use bytes::Bytes;
    /// #
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
    /// # -> Result<(SendRequest<Bytes>, Connection<T, Bytes>), rama_http_core::h2::Error>
    /// # {
    /// // `client_fut` is a future representing the completion of the HTTP/2
    /// // handshake.
    /// let client_fut = Builder::new()
    ///     .handshake(my_io);
    /// # client_fut.await
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```
    ///
    /// Configures the send-payload data type. In this case, the outbound data
    /// type will be `&'static [u8]`.
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use rama_http_core::h2::client::*;
    /// #
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
    /// # -> Result<(SendRequest<&'static [u8]>, Connection<T, &'static [u8]>), rama_http_core::h2::Error>
    /// # {
    /// // `client_fut` is a future representing the completion of the HTTP/2
    /// // handshake.
    /// let client_fut = Builder::new()
    ///     .handshake::<_, &'static [u8]>(my_io);
    /// # client_fut.await
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```
    pub fn handshake<T, B>(
        &self,
        io: T,
    ) -> impl Future<Output = Result<(SendRequest<B>, Connection<T, B>), crate::h2::Error>> + use<T, B>
    where
        T: AsyncRead + AsyncWrite + Unpin,
        B: Buf,
    {
        Connection::handshake2(io, self.clone())
    }
}

impl Default for Builder {
    fn default() -> Builder {
        Builder::new()
    }
}

/// Creates a new configured HTTP/2 client with default configuration
/// values backed by `io`.
///
/// It is expected that `io` already be in an appropriate state to commence
/// the [HTTP/2 handshake]. See [Handshake] for more details.
///
/// Returns a future which resolves to the [`Connection`] / [`SendRequest`]
/// tuple once the HTTP/2 handshake has been completed. The returned
/// [`Connection`] instance will be using default configuration values. Use
/// [`Builder`] to customize the configuration values used by a [`Connection`]
/// instance.
///
/// [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
/// [Handshake]: ../index.html#handshake
/// [`Connection`]: struct.Connection.html
/// [`SendRequest`]: struct.SendRequest.html
///
/// # Examples
///
/// ```
/// # use tokio::io::{AsyncRead, AsyncWrite};
/// # use rama_http_core::h2::client;
/// # use rama_http_core::h2::client::*;
/// #
/// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) -> Result<(), rama_http_core::h2::Error>
/// # {
/// let (send_request, connection) = client::handshake(my_io).await?;
/// // The HTTP/2 handshake has completed, now start polling
/// // `connection` and use `send_request` to send requests to the
/// // server.
/// # Ok(())
/// # }
/// #
/// # pub fn main() {}
/// ```
pub async fn handshake<T>(
    io: T,
) -> Result<(SendRequest<Bytes>, Connection<T, Bytes>), crate::h2::Error>
where
    T: AsyncRead + AsyncWrite + Unpin,
{
    let builder = Builder::new();
    builder
        .handshake(io)
        .instrument(tracing::trace_span!("client_handshake"))
        .await
}

// ===== impl Connection =====

async fn bind_connection<T>(io: &mut T) -> Result<(), crate::h2::Error>
where
    T: AsyncRead + AsyncWrite + Unpin,
{
    tracing::debug!("binding client connection");

    // The client connection preface (RFC 9113, Section 3.4).
    let msg: &'static [u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
    io.write_all(msg).await.map_err(crate::h2::Error::from_io)?;

    tracing::debug!("client connection bound");

    Ok(())
}

impl<T, B> Connection<T, B>
where
    T: AsyncRead + AsyncWrite + Unpin,
    B: Buf,
{
    async fn handshake2(
        mut io: T,
        builder: Builder,
    ) -> Result<(SendRequest<B>, Connection<T, B>), crate::h2::Error> {
        bind_connection(&mut io).await?;

        // Create the codec
        let mut codec = Codec::new(io);

        if let Some(max) = builder.settings.max_frame_size() {
            codec.set_max_recv_frame_size(max as usize);
        }

        if let Some(max) = builder.settings.max_header_list_size() {
            codec.set_max_recv_header_list_size(max as usize);
        }

        // Send initial settings frame
        codec
            .buffer(builder.settings.clone().into())
            .expect("invalid SETTINGS frame");

        let inner = proto::Connection::new(
            codec,
            proto::Config {
                next_stream_id: builder.stream_id,
                initial_max_send_streams: builder.initial_max_send_streams,
                max_send_buffer_size: builder.max_send_buffer_size,
                reset_stream_duration: builder.reset_stream_duration,
                reset_stream_max: builder.reset_stream_max,
                remote_reset_stream_max: builder.pending_accept_reset_stream_max,
                local_error_reset_streams_max: builder.local_max_error_reset_streams,
                settings: builder.settings,
                headers_pseudo_order: builder.headers_pseudo_order,
                headers_priority: builder.headers_priority,
                priority: builder.priority,
            },
        );
        let send_request = SendRequest {
            inner: inner.streams().clone(),
            pending: None,
        };

        let mut connection = Connection { inner };
        if let Some(sz) = builder.initial_target_connection_window_size {
            connection.set_target_window_size(sz);
        }

        Ok((send_request, connection))
    }

    /// Sets the target window size for the whole connection.
    ///
    /// If `size` is greater than the current value, then a `WINDOW_UPDATE`
    /// frame will be immediately sent to the remote, increasing the connection
    /// level window by `size - current_value`.
    ///
    /// If `size` is less than the current value, nothing will happen
    /// immediately. However, as window capacity is released by
    /// [`FlowControl`] instances, no `WINDOW_UPDATE` frames will be sent
    /// out until the number of "in flight" bytes drops below `size`.
    ///
    /// The default value is 65,535.
    ///
    /// See [`FlowControl`] documentation for more details.
    ///
    /// [`FlowControl`]: ../struct.FlowControl.html
    /// [library level]: ../index.html#flow-control
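    ///
    /// # Examples
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use rama_http_core::h2::client::*;
    /// #
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) -> Result<(), rama_http_core::h2::Error>
    /// # {
    /// let (send_request, mut connection) = handshake(my_io).await?;
    /// // Grow the connection-level window to 1 MB.
    /// connection.set_target_window_size(1024 * 1024);
    /// # let _ = send_request;
    /// # Ok(())
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```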
    pub fn set_target_window_size(&mut self, size: u32) {
        assert!(size <= proto::MAX_WINDOW_SIZE);
        self.inner.set_target_window_size(size);
    }

    /// Set a new `INITIAL_WINDOW_SIZE` setting (in octets) for stream-level
    /// flow control for received data.
    ///
    /// The `SETTINGS` will be sent to the remote, and only applied once the
    /// remote acknowledges the change.
    ///
    /// This can be used to increase or decrease the window size for existing
    /// streams.
    ///
    /// # Errors
    ///
    /// Returns an error if a previous call is still pending acknowledgement
    /// from the remote endpoint.
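    ///
    /// # Examples
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use rama_http_core::h2::client::*;
    /// #
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) -> Result<(), rama_http_core::h2::Error>
    /// # {
    /// let (send_request, mut connection) = handshake(my_io).await?;
    /// // Advertise a 1 MB stream-level receive window; the change applies
    /// // once the remote acknowledges the SETTINGS frame.
    /// connection.set_initial_window_size(1024 * 1024)?;
    /// # let _ = send_request;
    /// # Ok(())
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```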
    pub fn set_initial_window_size(&mut self, size: u32) -> Result<(), crate::h2::Error> {
        assert!(size <= proto::MAX_WINDOW_SIZE);
        self.inner.set_initial_window_size(size)?;
        Ok(())
    }

    /// Takes a `PingPong` instance from the connection.
    ///
    /// # Note
    ///
    /// This may only be called once. Calling multiple times will return `None`.
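    ///
    /// # Examples
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use rama_http_core::h2::client::*;
    /// #
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) -> Result<(), rama_http_core::h2::Error>
    /// # {
    /// let (send_request, mut connection) = handshake(my_io).await?;
    /// // The first call returns the `PingPong` handle...
    /// let ping_pong = connection.ping_pong().expect("first call");
    /// // ...and any subsequent call returns `None`.
    /// assert!(connection.ping_pong().is_none());
    /// # let _ = (send_request, ping_pong);
    /// # Ok(())
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```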
    pub fn ping_pong(&mut self) -> Option<PingPong> {
        self.inner.take_user_pings().map(PingPong::new)
    }

    /// Returns the maximum number of concurrent streams that may be initiated
    /// by this client.
    ///
    /// This limit is configured by the server peer by sending the
    /// [`SETTINGS_MAX_CONCURRENT_STREAMS` parameter][1] in a `SETTINGS` frame.
    /// This method returns the currently acknowledged value received from the
    /// remote.
    ///
    /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2
    pub fn max_concurrent_send_streams(&self) -> usize {
        self.inner.max_send_streams()
    }

    /// Returns the maximum number of concurrent streams that may be initiated
    /// by the server on this connection.
    ///
    /// This returns the value of the [`SETTINGS_MAX_CONCURRENT_STREAMS`
    /// parameter][1] sent in a `SETTINGS` frame that has been
    /// acknowledged by the remote peer. The value to be sent is configured by
    /// the [`Builder::max_concurrent_streams`][2] method before handshaking
    /// with the remote peer.
    ///
    /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2
    /// [2]: ../struct.Builder.html#method.max_concurrent_streams
    pub fn max_concurrent_recv_streams(&self) -> usize {
        self.inner.max_recv_streams()
    }
}

impl<T, B> Future for Connection<T, B>
where
    T: AsyncRead + AsyncWrite + Unpin,
    B: Buf,
{
    type Output = Result<(), crate::h2::Error>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.inner.maybe_close_connection_if_no_streams();
        let had_streams_or_refs = self.inner.has_streams_or_other_references();
        let result = self.inner.poll(cx).map_err(Into::into);
        // if we had streams/refs, and don't anymore, wake up one more time to
        // ensure proper shutdown
        if result.is_pending()
            && had_streams_or_refs
            && !self.inner.has_streams_or_other_references()
        {
            tracing::trace!("last stream closed during poll, wake again");
            cx.waker().wake_by_ref();
        }
        result
    }
}

impl<T, B> fmt::Debug for Connection<T, B>
where
    T: AsyncRead + AsyncWrite,
    T: fmt::Debug,
    B: fmt::Debug + Buf,
{
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        fmt::Debug::fmt(&self.inner, fmt)
    }
}

// ===== impl ResponseFuture =====

impl Future for ResponseFuture {
    type Output = Result<Response<RecvStream>, crate::h2::Error>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let (parts, _) = ready!(self.inner.poll_response(cx))?.into_parts();
        let body = RecvStream::new(FlowControl::new(self.inner.clone()));

        Poll::Ready(Ok(Response::from_parts(parts, body)))
    }
}

impl ResponseFuture {
    /// Returns the stream ID of the response stream.
    ///
    /// # Panics
    ///
    /// If the lock on the stream store has been poisoned.
    pub fn stream_id(&self) -> crate::h2::StreamId {
        crate::h2::StreamId::from_internal(self.inner.stream_id())
    }

    /// Returns a stream of PushPromises
    ///
    /// # Panics
    ///
    /// If this method has been called before
    /// or the stream itself was pushed
    pub fn push_promises(&mut self) -> PushPromises {
        if self.push_promise_consumed {
            panic!("Reference to push promises stream taken!");
        }
        self.push_promise_consumed = true;
        PushPromises {
            inner: self.inner.clone(),
        }
    }
}

// ===== impl PushPromises =====

impl PushPromises {
    /// Get the next `PushPromise`.
    pub async fn push_promise(&mut self) -> Option<Result<PushPromise, crate::h2::Error>> {
        crate::h2::poll_fn(move |cx| self.poll_push_promise(cx)).await
    }

    #[doc(hidden)]
    pub fn poll_push_promise(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<PushPromise, crate::h2::Error>>> {
        match self.inner.poll_pushed(cx) {
            Poll::Ready(Some(Ok((request, response)))) => {
                let response = PushedResponseFuture {
                    inner: ResponseFuture {
                        inner: response,
                        push_promise_consumed: false,
                    },
                };
                Poll::Ready(Some(Ok(PushPromise { request, response })))
            }
            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e.into()))),
            Poll::Ready(None) => Poll::Ready(None),
            Poll::Pending => Poll::Pending,
        }
    }
}

impl futures_core::Stream for PushPromises {
    type Item = Result<PushPromise, crate::h2::Error>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.poll_push_promise(cx)
    }
}

// ===== impl PushPromise =====

impl PushPromise {
    /// Returns a reference to the push promise's request headers.
    pub fn request(&self) -> &Request<()> {
        &self.request
    }

    /// Returns a mutable reference to the push promise's request headers.
    pub fn request_mut(&mut self) -> &mut Request<()> {
        &mut self.request
    }

    /// Consumes `self`, returning the push promise's request headers and
    /// response future.
    pub fn into_parts(self) -> (Request<()>, PushedResponseFuture) {
        (self.request, self.response)
    }
}
1622
1623// ===== impl PushedResponseFuture =====
1624
1625impl Future for PushedResponseFuture {
1626    type Output = Result<Response<RecvStream>, crate::h2::Error>;
1627
1628    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1629        Pin::new(&mut self.inner).poll(cx)
1630    }
1631}
1632
1633impl PushedResponseFuture {
1634    /// Returns the stream ID of the response stream.
1635    ///
1636    /// # Panics
1637    ///
1638    /// If the lock on the stream store has been poisoned.
1639    pub fn stream_id(&self) -> crate::h2::StreamId {
1640        self.inner.stream_id()
1641    }
1642}
1643
1644// ===== impl Peer =====
1645
1646impl Peer {
1647    pub(crate) fn convert_send_message(
1648        id: StreamId,
1649        request: Request<()>,
1650        protocol: Option<Protocol>,
1651        end_of_stream: bool,
1652        headers_pseudo_order: Option<PseudoHeaderOrder>,
1653        headers_priority: Option<StreamDependency>,
1654        priority: Option<Cow<'static, [Priority]>>,
1655    ) -> Result<(Option<Cow<'static, [Priority]>>, Headers), SendError> {
1656        use request::Parts;
1657
1658        let (
1659            Parts {
1660                method,
1661                uri,
1662                headers,
1663                version,
1664                mut extensions,
1665                ..
1666            },
1667            _,
1668        ) = request.into_parts();
1669
1670        let is_connect = method == Method::CONNECT;
1671
        // Build the pseudo-header set. All requests will include `method`
        // and `path`.
        let mut pseudo = Pseudo::request(method, uri, protocol);

        // Reuse a caller-provided pseudo-header order, if one is present.
        if let Some(order) = extensions
            .remove::<PseudoHeaderOrder>()
            .or(headers_pseudo_order)
        {
            if !order.is_empty() {
                pseudo.order = order;
            }
        }

        let header_order: OriginalHttp1Headers = extensions.remove().unwrap_or_default();

        if pseudo.scheme.is_none() {
            // If the scheme is not set, then there are two options.
            //
            // 1) Authority is not set. In this case, a request was issued with
            //    a relative URI. This is permitted **only** when forwarding
            //    HTTP 1.x requests. If the HTTP version is set to 2.0, then
            //    this is an error.
            //
            // 2) Authority is set; in this case, the HTTP method *must* be
            //    CONNECT.
            //
            // It is not possible to have a scheme but not an authority set (the
            // `http` crate does not allow it).
            //
            if pseudo.authority.is_none() {
                if version == Version::HTTP_2 {
                    return Err(UserError::MissingUriSchemeAndAuthority.into());
                } else {
                    // This is acceptable as per the above comment. However,
                    // HTTP/2 requires that a scheme is set. Since we are
                    // forwarding an HTTP 1.1 request, the scheme is set to
                    // "http".
                    pseudo.set_scheme(uri::Scheme::HTTP);
                }
            } else if !is_connect {
                // TODO: Error
            }
        }

        // Check the priority list for overflowed stream IDs
        if let Some(ref priority) = priority {
            if let Some(last_priority) = priority.last() {
                // Ensure the next stream ID does not overflow
                if last_priority.stream_id().next_id()? > id {
                    return Err(UserError::OverflowedStreamId.into());
                }
            }
        }

        // Create the HEADERS frame
        let mut frame = Headers::new(id, pseudo, headers, header_order, headers_priority);

        if end_of_stream {
            frame.set_end_stream();
        }

        Ok((priority, frame))
    }
}

impl proto::Peer for Peer {
    type Poll = Response<()>;

    const NAME: &'static str = "Client";

    fn r#dyn() -> proto::DynPeer {
        proto::DynPeer::Client
    }

    /*
    fn is_server() -> bool {
        false
    }
    */

    fn convert_poll_message(
        pseudo: Pseudo,
        fields: HeaderMap,
        field_order: OriginalHttp1Headers,
        stream_id: StreamId,
    ) -> Result<Self::Poll, Error> {
        let mut b = Response::builder();

        b = b.version(Version::HTTP_2);

        if let Some(status) = pseudo.status {
            b = b.status(status);
        }

        let mut response = match b.body(()) {
            Ok(response) => response,
            Err(_) => {
                // TODO: Should there be more specialized handling for different
                // kinds of errors?
                return Err(Error::library_reset(stream_id, Reason::PROTOCOL_ERROR));
            }
        };

        if !pseudo.order.is_empty() {
            response.extensions_mut().insert(pseudo.order);
        }

        if !field_order.is_empty() {
            response.extensions_mut().insert(field_order);
        }

        *response.headers_mut() = fields;

        Ok(response)
    }
}