h2/client.rs
1//! Client implementation of the HTTP/2 protocol.
2//!
3//! # Getting started
4//!
5//! Running an HTTP/2 client requires the caller to establish the underlying
6//! connection as well as get the connection to a state that is ready to begin
7//! the HTTP/2 handshake. See [here](../index.html#handshake) for more
8//! details.
9//!
10//! This could be as basic as using Tokio's [`TcpStream`] to connect to a remote
11//! host, but usually it means using either ALPN or HTTP/1.1 protocol upgrades.
12//!
13//! Once a connection is obtained, it is passed to [`handshake`], which will
14//! begin the [HTTP/2 handshake]. This returns a future that completes once
15//! the handshake process is performed and HTTP/2 streams may be initialized.
16//!
17//! [`handshake`] uses default configuration values. There are a number of
18//! settings that can be changed by using [`Builder`] instead.
19//!
20//! Once the handshake future completes, the caller is provided with a
21//! [`Connection`] instance and a [`SendRequest`] instance. The [`Connection`]
22//! instance is used to drive the connection (see [Managing the connection]).
23//! The [`SendRequest`] instance is used to initialize new streams (see [Making
24//! requests]).
25//!
26//! # Making requests
27//!
28//! Requests are made using the [`SendRequest`] handle provided by the handshake
29//! future. Once a request is submitted, an HTTP/2 stream is initialized and
30//! the request is sent to the server.
31//!
32//! A request body and request trailers are sent using [`SendRequest`] and the
33//! server's response is returned once the [`ResponseFuture`] future completes.
34//! Both the [`SendStream`] and [`ResponseFuture`] instances are returned by
35//! [`SendRequest::send_request`] and are tied to the HTTP/2 stream
36//! initialized by the sent request.
37//!
38//! The [`SendRequest::poll_ready`] function returns `Ready` when a new HTTP/2
39//! stream can be created, i.e. as long as the current number of active streams
40//! is below [`MAX_CONCURRENT_STREAMS`]. If a new stream cannot be created, the
41//! caller will be notified once an existing stream closes, freeing capacity for
42//! the caller. The caller should use [`SendRequest::poll_ready`] to check for
43//! capacity before sending a request to the server.
44//!
45//! [`SendRequest`] enforces the [`MAX_CONCURRENT_STREAMS`] setting. The user
46//! must not send a request if `poll_ready` does not return `Ready`. Attempting
47//! to do so will result in an [`Error`] being returned.
48//!
49//! # Managing the connection
50//!
51//! The [`Connection`] instance is used to manage connection state. The caller
52//! is required to call [`Connection::poll`] in order to advance state.
53//! [`SendRequest::send_request`] and other functions have no effect unless
54//! [`Connection::poll`] is called.
55//!
56//! The [`Connection`] instance should only be dropped once [`Connection::poll`]
57//! returns `Ready`. At this point, the underlying socket has been closed and no
58//! further work needs to be done.
59//!
60//! The easiest way to ensure that the [`Connection`] instance gets polled is to
61//! submit the [`Connection`] instance to an [executor]. The executor will then
62//! manage polling the connection until the connection is complete.
63//! Alternatively, the caller can call `poll` manually.
64//!
65//! # Example
66//!
67//! ```rust, no_run
68//!
69//! use h2::client;
70//!
71//! use http::{Request, Method};
72//! use std::error::Error;
73//! use tokio::net::TcpStream;
74//!
75//! #[tokio::main]
76//! pub async fn main() -> Result<(), Box<dyn Error>> {
77//! // Establish TCP connection to the server.
78//! let tcp = TcpStream::connect("127.0.0.1:5928").await?;
79//! let (h2, connection) = client::handshake(tcp).await?;
80//! tokio::spawn(async move {
81//! connection.await.unwrap();
82//! });
83//!
84//! let mut h2 = h2.ready().await?;
85//! // Prepare the HTTP request to send to the server.
86//! let request = Request::builder()
87//! .method(Method::GET)
88//! .uri("https://www.example.com/")
89//! .body(())
90//! .unwrap();
91//!
92//! // Send the request. The second tuple item allows the caller
93//! // to stream a request body.
94//! let (response, _) = h2.send_request(request, true).unwrap();
95//!
96//! let (head, mut body) = response.await?.into_parts();
97//!
98//! println!("Received response: {:?}", head);
99//!
100//! // The `flow_control` handle allows the caller to manage
101//! // flow control.
102//! //
103//! // Whenever data is received, the caller is responsible for
104//! // releasing capacity back to the server once it has freed
105//! // the data from memory.
106//! let mut flow_control = body.flow_control().clone();
107//!
108//! while let Some(chunk) = body.data().await {
109//! let chunk = chunk?;
110//! println!("RX: {:?}", chunk);
111//!
112//! // Let the server send more data.
113//! let _ = flow_control.release_capacity(chunk.len());
114//! }
115//!
116//! Ok(())
117//! }
118//! ```
119//!
120//! [`TcpStream`]: https://docs.rs/tokio-core/0.1/tokio_core/net/struct.TcpStream.html
121//! [`handshake`]: fn.handshake.html
122//! [executor]: https://docs.rs/futures/0.1/futures/future/trait.Executor.html
123//! [`SendRequest`]: struct.SendRequest.html
124//! [`SendStream`]: ../struct.SendStream.html
125//! [Making requests]: #making-requests
126//! [Managing the connection]: #managing-the-connection
127//! [`Connection`]: struct.Connection.html
128//! [`Connection::poll`]: struct.Connection.html#method.poll
129//! [`SendRequest::send_request`]: struct.SendRequest.html#method.send_request
130//! [`MAX_CONCURRENT_STREAMS`]: http://httpwg.org/specs/rfc7540.html#SettingValues
131//! [`SendRequest`]: struct.SendRequest.html
132//! [`ResponseFuture`]: struct.ResponseFuture.html
133//! [`SendRequest::poll_ready`]: struct.SendRequest.html#method.poll_ready
134//! [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
135//! [`Builder`]: struct.Builder.html
136//! [`Error`]: ../struct.Error.html
137
138use crate::codec::{Codec, SendError, UserError};
139use crate::ext::Protocol;
140use crate::frame::{Headers, Pseudo, Reason, Settings, StreamId};
141use crate::proto::{self, Error};
142use crate::{FlowControl, PingPong, RecvStream, SendStream};
143
144use bytes::{Buf, Bytes};
145use http::{uri, HeaderMap, Method, Request, Response, Version};
146use std::fmt;
147use std::future::Future;
148use std::pin::Pin;
149use std::task::{Context, Poll};
150use std::time::Duration;
151use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
152use tracing::Instrument;
153
154/// Initializes new HTTP/2 streams on a connection by sending a request.
155///
156/// This type does no work itself. Instead, it is a handle to the inner
157/// connection state held by [`Connection`]. If the associated connection
158/// instance is dropped, all `SendRequest` functions will return [`Error`].
159///
160/// [`SendRequest`] instances are able to move to and operate on separate tasks
161/// / threads than their associated [`Connection`] instance. Internally, there
162/// is a buffer used to stage requests before they get written to the
163/// connection. There is no guarantee that requests get written to the
164/// connection in FIFO order as HTTP/2 prioritization logic can play a role.
165///
166/// [`SendRequest`] implements [`Clone`], enabling the creation of many
167/// instances that are backed by a single connection.
168///
169/// See [module] level documentation for more details.
170///
171/// [module]: index.html
172/// [`Connection`]: struct.Connection.html
173/// [`Clone`]: https://doc.rust-lang.org/std/clone/trait.Clone.html
174/// [`Error`]: ../struct.Error.html
175pub struct SendRequest<B: Buf> {
176 inner: proto::Streams<B, Peer>,
177 pending: Option<proto::OpaqueStreamRef>,
178}
179
180/// Returns a `SendRequest` instance once it is ready to send at least one
181/// request.
182#[derive(Debug)]
183pub struct ReadySendRequest<B: Buf> {
184 inner: Option<SendRequest<B>>,
185}
186
187/// Manages all state associated with an HTTP/2 client connection.
188///
189/// A `Connection` is backed by an I/O resource (usually a TCP socket) and
190/// implements the HTTP/2 client logic for that connection. It is responsible
191/// for driving the internal state forward, performing the work requested of the
192/// associated handles ([`SendRequest`], [`ResponseFuture`], [`SendStream`],
193/// [`RecvStream`]).
194///
195/// `Connection` values are created by calling [`handshake`]. Once a
196/// `Connection` value is obtained, the caller must repeatedly call [`poll`]
197/// until `Ready` is returned. The easiest way to do this is to submit the
198/// `Connection` instance to an [executor].
199///
200/// [module]: index.html
201/// [`handshake`]: fn.handshake.html
202/// [`SendRequest`]: struct.SendRequest.html
203/// [`ResponseFuture`]: struct.ResponseFuture.html
204/// [`SendStream`]: ../struct.SendStream.html
205/// [`RecvStream`]: ../struct.RecvStream.html
206/// [`poll`]: #method.poll
207/// [executor]: https://docs.rs/futures/0.1/futures/future/trait.Executor.html
208///
209/// # Examples
210///
211/// ```
212/// # use tokio::io::{AsyncRead, AsyncWrite};
213/// # use h2::client;
214/// # use h2::client::*;
215/// #
216/// # async fn doc<T>(my_io: T) -> Result<(), h2::Error>
217/// # where T: AsyncRead + AsyncWrite + Send + Unpin + 'static,
218/// # {
219/// let (send_request, connection) = client::handshake(my_io).await?;
220/// // Submit the connection handle to an executor.
221/// tokio::spawn(async { connection.await.expect("connection failed"); });
222///
223/// // Now, use `send_request` to initialize HTTP/2 streams.
224/// // ...
225/// # Ok(())
226/// # }
227/// #
228/// # pub fn main() {}
229/// ```
230#[must_use = "futures do nothing unless polled"]
231pub struct Connection<T, B: Buf = Bytes> {
232 inner: proto::Connection<T, Peer, B>,
233}
234
235/// A future of an HTTP response.
236#[derive(Debug)]
237#[must_use = "futures do nothing unless polled"]
238pub struct ResponseFuture {
239 inner: proto::OpaqueStreamRef,
240 push_promise_consumed: bool,
241}
242
243/// A future of a pushed HTTP response.
244///
245/// We have to differentiate between pushed and non pushed because of the spec
246/// <https://httpwg.org/specs/rfc7540.html#PUSH_PROMISE>
247/// > PUSH_PROMISE frames MUST only be sent on a peer-initiated stream
248/// > that is in either the "open" or "half-closed (remote)" state.
249#[derive(Debug)]
250#[must_use = "futures do nothing unless polled"]
251pub struct PushedResponseFuture {
252 inner: ResponseFuture,
253}
254
255/// A pushed response and corresponding request headers
256#[derive(Debug)]
257pub struct PushPromise {
258 /// The request headers
259 request: Request<()>,
260
261 /// The pushed response
262 response: PushedResponseFuture,
263}
264
265/// A stream of pushed responses and corresponding promised requests
266#[derive(Debug)]
267#[must_use = "streams do nothing unless polled"]
268pub struct PushPromises {
269 inner: proto::OpaqueStreamRef,
270}
271
272/// Builds client connections with custom configuration values.
273///
274/// Methods can be chained in order to set the configuration values.
275///
276/// The client is constructed by calling [`handshake`] and passing the I/O
277/// handle that will back the HTTP/2 server.
278///
279/// New instances of `Builder` are obtained via [`Builder::new`].
280///
281/// See function level documentation for details on the various client
282/// configuration settings.
283///
284/// [`Builder::new`]: struct.Builder.html#method.new
285/// [`handshake`]: struct.Builder.html#method.handshake
286///
287/// # Examples
288///
289/// ```
290/// # use tokio::io::{AsyncRead, AsyncWrite};
291/// # use h2::client::*;
292/// # use bytes::Bytes;
293/// #
294/// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
295/// -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
296/// # {
297/// // `client_fut` is a future representing the completion of the HTTP/2
298/// // handshake.
299/// let client_fut = Builder::new()
300/// .initial_window_size(1_000_000)
301/// .max_concurrent_streams(1000)
302/// .handshake(my_io);
303/// # client_fut.await
304/// # }
305/// #
306/// # pub fn main() {}
307/// ```
308#[derive(Clone, Debug)]
309pub struct Builder {
310 /// Time to keep locally reset streams around before reaping.
311 reset_stream_duration: Duration,
312
313 /// Initial maximum number of locally initiated (send) streams.
314 /// After receiving a SETTINGS frame from the remote peer,
315 /// the connection will overwrite this value with the
316 /// MAX_CONCURRENT_STREAMS specified in the frame.
317 /// If no value is advertised by the remote peer in the initial SETTINGS
318 /// frame, it will be set to usize::MAX.
319 initial_max_send_streams: usize,
320
321 /// Initial target window size for new connections.
322 initial_target_connection_window_size: Option<u32>,
323
324 /// Maximum amount of bytes to "buffer" for writing per stream.
325 max_send_buffer_size: usize,
326
327 /// Maximum number of locally reset streams to keep at a time.
328 reset_stream_max: usize,
329
330 /// Maximum number of remotely reset streams to allow in the pending
331 /// accept queue.
332 pending_accept_reset_stream_max: usize,
333
334 /// Initial `Settings` frame to send as part of the handshake.
335 settings: Settings,
336
337 /// The stream ID of the first (lowest) stream. Subsequent streams will use
338 /// monotonically increasing stream IDs.
339 stream_id: StreamId,
340
341 /// Maximum number of locally reset streams due to protocol error across
342 /// the lifetime of the connection.
343 ///
344 /// When this gets exceeded, we issue GOAWAYs.
345 local_max_error_reset_streams: Option<usize>,
346
347 /// The headers frame pseudo order
348 headers_pseudo_order: Option<crate::frame::PseudoOrder>,
349
350 /// Whether to include PRIORITY flag in HEADERS frames and its parameters.
351 /// None = no PRIORITY, Some((weight, dep, exclusive)) = PRIORITY with those values.
352 headers_priority: Option<(u8, u32, bool)>,
353
354 /// Optional ordering for HTTP/2 regular headers (for browser fingerprinting).
355 /// When set, headers are encoded in this order instead of hash-based order.
356 headers_order: Option<Vec<http::HeaderName>>,
357}
358
359#[derive(Debug)]
360pub(crate) struct Peer;
361
362// ===== impl SendRequest =====
363
364impl<B> SendRequest<B>
365where
366 B: Buf,
367{
368 /// Returns `Ready` when the connection can initialize a new HTTP/2
369 /// stream.
370 ///
371 /// This function must return `Ready` before `send_request` is called. When
372 /// `Poll::Pending` is returned, the task will be notified once the readiness
373 /// state changes.
374 ///
375 /// See [module] level docs for more details.
376 ///
377 /// [module]: index.html
378 pub fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), crate::Error>> {
379 ready!(self.inner.poll_pending_open(cx, self.pending.as_ref()))?;
380 self.pending = None;
381 Poll::Ready(Ok(()))
382 }
383
384 /// Consumes `self`, returning a future that returns `self` back once it is
385 /// ready to send a request.
386 ///
387 /// This function should be called before calling `send_request`.
388 ///
389 /// This is a functional combinator for [`poll_ready`]. The returned future
390 /// will call `SendStream::poll_ready` until `Ready`, then returns `self` to
391 /// the caller.
392 ///
393 /// # Examples
394 ///
395 /// ```rust
396 /// # use h2::client::*;
397 /// # use http::*;
398 /// # async fn doc(send_request: SendRequest<&'static [u8]>)
399 /// # {
400 /// // First, wait until the `send_request` handle is ready to send a new
401 /// // request
402 /// let mut send_request = send_request.ready().await.unwrap();
403 /// // Use `send_request` here.
404 /// # }
405 /// # pub fn main() {}
406 /// ```
407 ///
408 /// See [module] level docs for more details.
409 ///
410 /// [`poll_ready`]: #method.poll_ready
411 /// [module]: index.html
412 pub fn ready(self) -> ReadySendRequest<B> {
413 ReadySendRequest { inner: Some(self) }
414 }
415
416 /// Sends a HTTP/2 request to the server.
417 ///
418 /// `send_request` initializes a new HTTP/2 stream on the associated
419 /// connection, then sends the given request using this new stream. Only the
420 /// request head is sent.
421 ///
422 /// On success, a [`ResponseFuture`] instance and [`SendStream`] instance
423 /// are returned. The [`ResponseFuture`] instance is used to get the
424 /// server's response and the [`SendStream`] instance is used to send a
425 /// request body or trailers to the server over the same HTTP/2 stream.
426 ///
427 /// To send a request body or trailers, set `end_of_stream` to `false`.
428 /// Then, use the returned [`SendStream`] instance to stream request body
429 /// chunks or send trailers. If `end_of_stream` is **not** set to `false`
430 /// then attempting to call [`SendStream::send_data`] or
431 /// [`SendStream::send_trailers`] will result in an error.
432 ///
433 /// If no request body or trailers are to be sent, set `end_of_stream` to
434 /// `true` and drop the returned [`SendStream`] instance.
435 ///
436 /// # A note on HTTP versions
437 ///
438 /// The provided `Request` will be encoded differently depending on the
439 /// value of its version field. If the version is set to 2.0, then the
440 /// request is encoded as per the specification recommends.
441 ///
442 /// If the version is set to a lower value, then the request is encoded to
443 /// preserve the characteristics of HTTP 1.1 and lower. Specifically, host
444 /// headers are permitted and the `:authority` pseudo header is not
445 /// included.
446 ///
447 /// The caller should always set the request's version field to 2.0 unless
448 /// specifically transmitting an HTTP 1.1 request over 2.0.
449 ///
450 /// # Examples
451 ///
452 /// Sending a request with no body
453 ///
454 /// ```rust
455 /// # use h2::client::*;
456 /// # use http::*;
457 /// # async fn doc(send_request: SendRequest<&'static [u8]>)
458 /// # {
459 /// // First, wait until the `send_request` handle is ready to send a new
460 /// // request
461 /// let mut send_request = send_request.ready().await.unwrap();
462 /// // Prepare the HTTP request to send to the server.
463 /// let request = Request::get("https://www.example.com/")
464 /// .body(())
465 /// .unwrap();
466 ///
467 /// // Send the request to the server. Since we are not sending a
468 /// // body or trailers, we can drop the `SendStream` instance.
469 /// let (response, _) = send_request.send_request(request, true).unwrap();
470 /// let response = response.await.unwrap();
471 /// // Process the response
472 /// # }
473 /// # pub fn main() {}
474 /// ```
475 ///
476 /// Sending a request with a body and trailers
477 ///
478 /// ```rust
479 /// # use h2::client::*;
480 /// # use http::*;
481 /// # async fn doc(send_request: SendRequest<&'static [u8]>)
482 /// # {
483 /// // First, wait until the `send_request` handle is ready to send a new
484 /// // request
485 /// let mut send_request = send_request.ready().await.unwrap();
486 ///
487 /// // Prepare the HTTP request to send to the server.
488 /// let request = Request::get("https://www.example.com/")
489 /// .body(())
490 /// .unwrap();
491 ///
492 /// // Send the request to the server. If we are not sending a
493 /// // body or trailers, we can drop the `SendStream` instance.
494 /// let (response, mut send_stream) = send_request
495 /// .send_request(request, false).unwrap();
496 ///
497 /// // At this point, one option would be to wait for send capacity.
498 /// // Doing so would allow us to not hold data in memory that
499 /// // cannot be sent. However, this is not a requirement, so this
500 /// // example will skip that step. See `SendStream` documentation
501 /// // for more details.
502 /// send_stream.send_data(b"hello", false).unwrap();
503 /// send_stream.send_data(b"world", false).unwrap();
504 ///
505 /// // Send the trailers.
506 /// let mut trailers = HeaderMap::new();
507 /// trailers.insert(
508 /// header::HeaderName::from_bytes(b"my-trailer").unwrap(),
509 /// header::HeaderValue::from_bytes(b"hello").unwrap());
510 ///
511 /// send_stream.send_trailers(trailers).unwrap();
512 ///
513 /// let response = response.await.unwrap();
514 /// // Process the response
515 /// # }
516 /// # pub fn main() {}
517 /// ```
518 ///
519 /// [`ResponseFuture`]: struct.ResponseFuture.html
520 /// [`SendStream`]: ../struct.SendStream.html
521 /// [`SendStream::send_data`]: ../struct.SendStream.html#method.send_data
522 /// [`SendStream::send_trailers`]: ../struct.SendStream.html#method.send_trailers
523 pub fn send_request(
524 &mut self,
525 request: Request<()>,
526 end_of_stream: bool,
527 ) -> Result<(ResponseFuture, SendStream<B>), crate::Error> {
528 self.inner
529 .send_request(request, end_of_stream, self.pending.as_ref())
530 .map_err(Into::into)
531 .map(|(stream, is_full)| {
532 if stream.is_pending_open() && is_full {
533 // Only prevent sending another request when the request queue
534 // is not full.
535 self.pending = Some(stream.clone_to_opaque());
536 }
537
538 let response = ResponseFuture {
539 inner: stream.clone_to_opaque(),
540 push_promise_consumed: false,
541 };
542
543 let stream = SendStream::new(stream);
544
545 (response, stream)
546 })
547 }
548
549 /// Returns whether the [extended CONNECT protocol][1] is enabled or not.
550 ///
551 /// This setting is configured by the server peer by sending the
552 /// [`SETTINGS_ENABLE_CONNECT_PROTOCOL` parameter][2] in a `SETTINGS` frame.
553 /// This method returns the currently acknowledged value received from the
554 /// remote.
555 ///
556 /// [1]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
557 /// [2]: https://datatracker.ietf.org/doc/html/rfc8441#section-3
558 pub fn is_extended_connect_protocol_enabled(&self) -> bool {
559 self.inner.is_extended_connect_protocol_enabled()
560 }
561
562 /// Returns the current max send streams
563 pub fn current_max_send_streams(&self) -> usize {
564 self.inner.current_max_send_streams()
565 }
566
567 /// Returns the current max recv streams
568 pub fn current_max_recv_streams(&self) -> usize {
569 self.inner.current_max_recv_streams()
570 }
571}
572
573impl<B> fmt::Debug for SendRequest<B>
574where
575 B: Buf,
576{
577 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
578 fmt.debug_struct("SendRequest").finish()
579 }
580}
581
582impl<B> Clone for SendRequest<B>
583where
584 B: Buf,
585{
586 fn clone(&self) -> Self {
587 SendRequest {
588 inner: self.inner.clone(),
589 pending: None,
590 }
591 }
592}
593
594#[cfg(feature = "unstable")]
595impl<B> SendRequest<B>
596where
597 B: Buf,
598{
599 /// Returns the number of active streams.
600 ///
601 /// An active stream is a stream that has not yet transitioned to a closed
602 /// state.
603 pub fn num_active_streams(&self) -> usize {
604 self.inner.num_active_streams()
605 }
606
607 /// Returns the number of streams that are held in memory.
608 ///
609 /// A wired stream is a stream that is either active or is closed but must
610 /// stay in memory for some reason. For example, there are still outstanding
611 /// userspace handles pointing to the slot.
612 pub fn num_wired_streams(&self) -> usize {
613 self.inner.num_wired_streams()
614 }
615}
616
617// ===== impl ReadySendRequest =====
618
619impl<B> Future for ReadySendRequest<B>
620where
621 B: Buf,
622{
623 type Output = Result<SendRequest<B>, crate::Error>;
624
625 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
626 match &mut self.inner {
627 Some(send_request) => {
628 ready!(send_request.poll_ready(cx))?;
629 }
630 None => panic!("called `poll` after future completed"),
631 }
632
633 Poll::Ready(Ok(self.inner.take().unwrap()))
634 }
635}
636
637// ===== impl Builder =====
638
639impl Builder {
640 /// Returns a new client builder instance initialized with default
641 /// configuration values.
642 ///
643 /// Configuration methods can be chained on the return value.
644 ///
645 /// # Examples
646 ///
647 /// ```
648 /// # use tokio::io::{AsyncRead, AsyncWrite};
649 /// # use h2::client::*;
650 /// # use bytes::Bytes;
651 /// #
652 /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
653 /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
654 /// # {
655 /// // `client_fut` is a future representing the completion of the HTTP/2
656 /// // handshake.
657 /// let client_fut = Builder::new()
658 /// .initial_window_size(1_000_000)
659 /// .max_concurrent_streams(1000)
660 /// .handshake(my_io);
661 /// # client_fut.await
662 /// # }
663 /// #
664 /// # pub fn main() {}
665 /// ```
666 pub fn new() -> Builder {
667 Builder {
668 max_send_buffer_size: proto::DEFAULT_MAX_SEND_BUFFER_SIZE,
669 reset_stream_duration: Duration::from_secs(proto::DEFAULT_RESET_STREAM_SECS),
670 reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX,
671 pending_accept_reset_stream_max: proto::DEFAULT_REMOTE_RESET_STREAM_MAX,
672 initial_target_connection_window_size: None,
673 initial_max_send_streams: usize::MAX,
674 settings: Default::default(),
675 stream_id: 1.into(),
676 local_max_error_reset_streams: Some(proto::DEFAULT_LOCAL_RESET_COUNT_MAX),
677 headers_pseudo_order: None,
678 headers_priority: None,
679 headers_order: None,
680 }
681 }
682
683 /// Indicates the initial window size (in octets) for stream-level
684 /// flow control for received data.
685 ///
686 /// The initial window of a stream is used as part of flow control. For more
687 /// details, see [`FlowControl`].
688 ///
689 /// The default value is 65,535.
690 ///
691 /// [`FlowControl`]: ../struct.FlowControl.html
692 ///
693 /// # Examples
694 ///
695 /// ```
696 /// # use tokio::io::{AsyncRead, AsyncWrite};
697 /// # use h2::client::*;
698 /// # use bytes::Bytes;
699 /// #
700 /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
701 /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
702 /// # {
703 /// // `client_fut` is a future representing the completion of the HTTP/2
704 /// // handshake.
705 /// let client_fut = Builder::new()
706 /// .initial_window_size(1_000_000)
707 /// .handshake(my_io);
708 /// # client_fut.await
709 /// # }
710 /// #
711 /// # pub fn main() {}
712 /// ```
713 pub fn initial_window_size(&mut self, size: u32) -> &mut Self {
714 self.settings.set_initial_window_size(Some(size));
715 self
716 }
717
718 /// Indicates the initial window size (in octets) for connection-level flow control
719 /// for received data.
720 ///
721 /// The initial window of a connection is used as part of flow control. For more details,
722 /// see [`FlowControl`].
723 ///
724 /// The default value is 65,535.
725 ///
726 /// [`FlowControl`]: ../struct.FlowControl.html
727 ///
728 /// # Examples
729 ///
730 /// ```
731 /// # use tokio::io::{AsyncRead, AsyncWrite};
732 /// # use h2::client::*;
733 /// # use bytes::Bytes;
734 /// #
735 /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
736 /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
737 /// # {
738 /// // `client_fut` is a future representing the completion of the HTTP/2
739 /// // handshake.
740 /// let client_fut = Builder::new()
741 /// .initial_connection_window_size(1_000_000)
742 /// .handshake(my_io);
743 /// # client_fut.await
744 /// # }
745 /// #
746 /// # pub fn main() {}
747 /// ```
748 pub fn initial_connection_window_size(&mut self, size: u32) -> &mut Self {
749 self.initial_target_connection_window_size = Some(size);
750 self
751 }
752
753 /// Indicates the size (in octets) of the largest HTTP/2 frame payload that the
754 /// configured client is able to accept.
755 ///
756 /// The sender may send data frames that are **smaller** than this value,
757 /// but any data larger than `max` will be broken up into multiple `DATA`
758 /// frames.
759 ///
760 /// The value **must** be between 16,384 and 16,777,215. The default value is 16,384.
761 ///
762 /// # Examples
763 ///
764 /// ```
765 /// # use tokio::io::{AsyncRead, AsyncWrite};
766 /// # use h2::client::*;
767 /// # use bytes::Bytes;
768 /// #
769 /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
770 /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
771 /// # {
772 /// // `client_fut` is a future representing the completion of the HTTP/2
773 /// // handshake.
774 /// let client_fut = Builder::new()
775 /// .max_frame_size(1_000_000)
776 /// .handshake(my_io);
777 /// # client_fut.await
778 /// # }
779 /// #
780 /// # pub fn main() {}
781 /// ```
782 ///
783 /// # Panics
784 ///
785 /// This function panics if `max` is not within the legal range specified
786 /// above.
787 pub fn max_frame_size(&mut self, max: u32) -> &mut Self {
788 self.settings.set_max_frame_size(Some(max));
789 self
790 }
791
792 /// Sets the max size of received header frames.
793 ///
794 /// This advisory setting informs a peer of the maximum size of header list
795 /// that the sender is prepared to accept, in octets. The value is based on
796 /// the uncompressed size of header fields, including the length of the name
797 /// and value in octets plus an overhead of 32 octets for each header field.
798 ///
799 /// This setting is also used to limit the maximum amount of data that is
800 /// buffered to decode HEADERS frames.
801 ///
802 /// # Examples
803 ///
804 /// ```
805 /// # use tokio::io::{AsyncRead, AsyncWrite};
806 /// # use h2::client::*;
807 /// # use bytes::Bytes;
808 /// #
809 /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
810 /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
811 /// # {
812 /// // `client_fut` is a future representing the completion of the HTTP/2
813 /// // handshake.
814 /// let client_fut = Builder::new()
815 /// .max_header_list_size(16 * 1024)
816 /// .handshake(my_io);
817 /// # client_fut.await
818 /// # }
819 /// #
820 /// # pub fn main() {}
821 /// ```
822 pub fn max_header_list_size(&mut self, max: u32) -> &mut Self {
823 self.settings.set_max_header_list_size(Some(max));
824 self
825 }
826
827 /// Sets the maximum number of concurrent streams.
828 ///
829 /// The maximum concurrent streams setting only controls the maximum number
830 /// of streams that can be initiated by the remote peer. In other words,
831 /// when this setting is set to 100, this does not limit the number of
832 /// concurrent streams that can be created by the caller.
833 ///
834 /// It is recommended that this value be no smaller than 100, so as to not
835 /// unnecessarily limit parallelism. However, any value is legal, including
836 /// 0. If `max` is set to 0, then the remote will not be permitted to
837 /// initiate streams.
838 ///
839 /// Note that streams in the reserved state, i.e., push promises that have
840 /// been reserved but the stream has not started, do not count against this
841 /// setting.
842 ///
843 /// Also note that if the remote *does* exceed the value set here, it is not
844 /// a protocol level error. Instead, the `h2` library will immediately reset
845 /// the stream.
846 ///
847 /// See [Section 5.1.2] in the HTTP/2 spec for more details.
848 ///
849 /// [Section 5.1.2]: https://http2.github.io/http2-spec/#rfc.section.5.1.2
850 ///
851 /// # Examples
852 ///
853 /// ```
854 /// # use tokio::io::{AsyncRead, AsyncWrite};
855 /// # use h2::client::*;
856 /// # use bytes::Bytes;
857 /// #
858 /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
859 /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
860 /// # {
861 /// // `client_fut` is a future representing the completion of the HTTP/2
862 /// // handshake.
863 /// let client_fut = Builder::new()
864 /// .max_concurrent_streams(1000)
865 /// .handshake(my_io);
866 /// # client_fut.await
867 /// # }
868 /// #
869 /// # pub fn main() {}
870 /// ```
871 pub fn max_concurrent_streams(&mut self, max: u32) -> &mut Self {
872 self.settings.set_max_concurrent_streams(Some(max));
873 self
874 }
875
876 /// Sets the initial maximum of locally initiated (send) streams.
877 ///
878 /// The initial settings will be overwritten by the remote peer when
879 /// the SETTINGS frame is received. The new value will be set to the
880 /// `max_concurrent_streams()` from the frame. If no value is advertised in
881 /// the initial SETTINGS frame from the remote peer as part of
882 /// [HTTP/2 Connection Preface], `usize::MAX` will be set.
883 ///
884 /// This setting prevents the caller from exceeding this number of
885 /// streams that are counted towards the concurrency limit.
886 ///
887 /// Sending streams past the limit returned by the peer will be treated
888 /// as a stream error of type PROTOCOL_ERROR or REFUSED_STREAM.
889 ///
890 /// See [Section 5.1.2] in the HTTP/2 spec for more details.
891 ///
892 /// The default value is `usize::MAX`.
893 ///
894 /// [HTTP/2 Connection Preface]: https://httpwg.org/specs/rfc9113.html#preface
895 /// [Section 5.1.2]: https://httpwg.org/specs/rfc9113.html#rfc.section.5.1.2
896 ///
897 /// # Examples
898 ///
899 /// ```
900 /// # use tokio::io::{AsyncRead, AsyncWrite};
901 /// # use h2::client::*;
902 /// # use bytes::Bytes;
903 /// #
904 /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
905 /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
906 /// # {
907 /// // `client_fut` is a future representing the completion of the HTTP/2
908 /// // handshake.
909 /// let client_fut = Builder::new()
910 /// .initial_max_send_streams(1000)
911 /// .handshake(my_io);
912 /// # client_fut.await
913 /// # }
914 /// #
915 /// # pub fn main() {}
916 /// ```
917 pub fn initial_max_send_streams(&mut self, initial: usize) -> &mut Self {
918 self.initial_max_send_streams = initial;
919 self
920 }
921
922 /// Sets the maximum number of concurrent locally reset streams.
923 ///
924 /// When a stream is explicitly reset, the HTTP/2 specification requires
925 /// that any further frames received for that stream must be ignored for
926 /// "some time".
927 ///
928 /// In order to satisfy the specification, internal state must be maintained
929 /// to implement the behavior. This state grows linearly with the number of
930 /// streams that are locally reset.
931 ///
932 /// The `max_concurrent_reset_streams` setting configures sets an upper
933 /// bound on the amount of state that is maintained. When this max value is
934 /// reached, the oldest reset stream is purged from memory.
935 ///
936 /// Once the stream has been fully purged from memory, any additional frames
937 /// received for that stream will result in a connection level protocol
938 /// error, forcing the connection to terminate.
939 ///
940 /// The default value is currently 50.
941 ///
942 /// # Examples
943 ///
944 /// ```
945 /// # use tokio::io::{AsyncRead, AsyncWrite};
946 /// # use h2::client::*;
947 /// # use bytes::Bytes;
948 /// #
949 /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
950 /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
951 /// # {
952 /// // `client_fut` is a future representing the completion of the HTTP/2
953 /// // handshake.
954 /// let client_fut = Builder::new()
955 /// .max_concurrent_reset_streams(1000)
956 /// .handshake(my_io);
957 /// # client_fut.await
958 /// # }
959 /// #
960 /// # pub fn main() {}
961 /// ```
962 pub fn max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self {
963 self.reset_stream_max = max;
964 self
965 }
966
967 /// Sets the duration to remember locally reset streams.
968 ///
969 /// When a stream is explicitly reset, the HTTP/2 specification requires
970 /// that any further frames received for that stream must be ignored for
971 /// "some time".
972 ///
973 /// In order to satisfy the specification, internal state must be maintained
974 /// to implement the behavior. This state grows linearly with the number of
975 /// streams that are locally reset.
976 ///
977 /// The `reset_stream_duration` setting configures the max amount of time
978 /// this state will be maintained in memory. Once the duration elapses, the
979 /// stream state is purged from memory.
980 ///
981 /// Once the stream has been fully purged from memory, any additional frames
982 /// received for that stream will result in a connection level protocol
983 /// error, forcing the connection to terminate.
984 ///
985 /// The default value is currently 1 second.
986 ///
987 /// # Examples
988 ///
989 /// ```
990 /// # use tokio::io::{AsyncRead, AsyncWrite};
991 /// # use h2::client::*;
992 /// # use std::time::Duration;
993 /// # use bytes::Bytes;
994 /// #
995 /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
996 /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
997 /// # {
998 /// // `client_fut` is a future representing the completion of the HTTP/2
999 /// // handshake.
1000 /// let client_fut = Builder::new()
1001 /// .reset_stream_duration(Duration::from_secs(10))
1002 /// .handshake(my_io);
1003 /// # client_fut.await
1004 /// # }
1005 /// #
1006 /// # pub fn main() {}
1007 /// ```
1008 pub fn reset_stream_duration(&mut self, dur: Duration) -> &mut Self {
1009 self.reset_stream_duration = dur;
1010 self
1011 }
1012
1013 /// Sets the maximum number of local resets due to protocol errors made by the remote end.
1014 ///
1015 /// Invalid frames and many other protocol errors will lead to resets being generated for those streams.
1016 /// Too many of these often indicate a malicious client, and there are attacks which can abuse this to DOS servers.
1017 /// This limit protects against these DOS attacks by limiting the amount of resets we can be forced to generate.
1018 ///
1019 /// When the number of local resets exceeds this threshold, the client will close the connection.
1020 ///
1021 /// If you really want to disable this, supply [`Option::None`] here.
1022 /// Disabling this is not recommended and may expose you to DOS attacks.
1023 ///
1024 /// The default value is currently 1024, but could change.
1025 pub fn max_local_error_reset_streams(&mut self, max: Option<usize>) -> &mut Self {
1026 self.local_max_error_reset_streams = max;
1027 self
1028 }
1029
1030 /// Sets the maximum number of pending-accept remotely-reset streams.
1031 ///
1032 /// Streams that have been received by the peer, but not accepted by the
1033 /// user, can also receive a RST_STREAM. This is a legitimate pattern: one
1034 /// could send a request and then shortly after, realize it is not needed,
1035 /// sending a CANCEL.
1036 ///
1037 /// However, since those streams are now "closed", they don't count towards
1038 /// the max concurrent streams. So, they will sit in the accept queue,
1039 /// using memory.
1040 ///
1041 /// When the number of remotely-reset streams sitting in the pending-accept
1042 /// queue reaches this maximum value, a connection error with the code of
1043 /// `ENHANCE_YOUR_CALM` will be sent to the peer, and returned by the
1044 /// `Future`.
1045 ///
1046 /// The default value is currently 20, but could change.
1047 ///
1048 /// # Examples
1049 ///
1050 /// ```
1051 /// # use tokio::io::{AsyncRead, AsyncWrite};
1052 /// # use h2::client::*;
1053 /// # use bytes::Bytes;
1054 /// #
1055 /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1056 /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
1057 /// # {
1058 /// // `client_fut` is a future representing the completion of the HTTP/2
1059 /// // handshake.
1060 /// let client_fut = Builder::new()
1061 /// .max_pending_accept_reset_streams(100)
1062 /// .handshake(my_io);
1063 /// # client_fut.await
1064 /// # }
1065 /// #
1066 /// # pub fn main() {}
1067 /// ```
1068 pub fn max_pending_accept_reset_streams(&mut self, max: usize) -> &mut Self {
1069 self.pending_accept_reset_stream_max = max;
1070 self
1071 }
1072
1073 /// Sets the maximum send buffer size per stream.
1074 ///
1075 /// Once a stream has buffered up to (or over) the maximum, the stream's
1076 /// flow control will not "poll" additional capacity. Once bytes for the
1077 /// stream have been written to the connection, the send buffer capacity
1078 /// will be freed up again.
1079 ///
1080 /// The default is currently ~400KB, but may change.
1081 ///
1082 /// # Panics
1083 ///
1084 /// This function panics if `max` is larger than `u32::MAX`.
1085 pub fn max_send_buffer_size(&mut self, max: usize) -> &mut Self {
1086 assert!(max <= u32::MAX as usize);
1087 self.max_send_buffer_size = max;
1088 self
1089 }
1090
1091 /// Enables or disables server push promises.
1092 ///
1093 /// This value is included in the initial SETTINGS handshake.
1094 /// Setting this value to value to
1095 /// false in the initial SETTINGS handshake guarantees that the remote server
1096 /// will never send a push promise.
1097 ///
1098 /// This setting can be changed during the life of a single HTTP/2
1099 /// connection by sending another settings frame updating the value.
1100 ///
1101 /// Default value: `true`.
1102 ///
1103 /// # Examples
1104 ///
1105 /// ```
1106 /// # use tokio::io::{AsyncRead, AsyncWrite};
1107 /// # use h2::client::*;
1108 /// # use std::time::Duration;
1109 /// # use bytes::Bytes;
1110 /// #
1111 /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1112 /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
1113 /// # {
1114 /// // `client_fut` is a future representing the completion of the HTTP/2
1115 /// // handshake.
1116 /// let client_fut = Builder::new()
1117 /// .enable_push(false)
1118 /// .handshake(my_io);
1119 /// # client_fut.await
1120 /// # }
1121 /// #
1122 /// # pub fn main() {}
1123 /// ```
1124 pub fn enable_push(&mut self, enabled: bool) -> &mut Self {
1125 self.settings.set_enable_push(enabled);
1126 self
1127 }
1128
1129 /// Sets the header table size.
1130 ///
1131 /// This setting informs the peer of the maximum size of the header compression
1132 /// table used to encode header blocks, in octets. The encoder may select any value
1133 /// equal to or less than the header table size specified by the sender.
1134 ///
1135 /// The default value is 4,096.
1136 ///
1137 /// # Examples
1138 ///
1139 /// ```
1140 /// # use tokio::io::{AsyncRead, AsyncWrite};
1141 /// # use h2::client::*;
1142 /// # use bytes::Bytes;
1143 /// #
1144 /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1145 /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
1146 /// # {
1147 /// // `client_fut` is a future representing the completion of the HTTP/2
1148 /// // handshake.
1149 /// let client_fut = Builder::new()
1150 /// .header_table_size(1_000_000)
1151 /// .handshake(my_io);
1152 /// # client_fut.await
1153 /// # }
1154 /// #
1155 /// # pub fn main() {}
1156 /// ```
1157 pub fn header_table_size(&mut self, size: u32) -> &mut Self {
1158 self.settings.set_header_table_size(Some(size));
1159 self
1160 }
1161
1162 /// Sets the first stream ID to something other than 1.
1163 #[cfg(feature = "unstable")]
1164 pub fn initial_stream_id(&mut self, stream_id: u32) -> &mut Self {
1165 self.stream_id = stream_id.into();
1166 assert!(
1167 self.stream_id.is_client_initiated(),
1168 "stream id must be odd"
1169 );
1170 self
1171 }
1172
1173 /// Sets the order of settings parameters in the initial SETTINGS frame.
1174 ///
1175 /// This determines the order in which settings are sent during the HTTP/2 handshake.
1176 /// Customizing the order may be useful for testing or protocol compliance.
1177 pub fn settings_order(&mut self, order: crate::frame::SettingsOrder) -> &mut Self {
1178 self.settings.set_settings_order(order);
1179 self
1180 }
1181
1182 /// Sets the NO_RFC7540_PRIORITIES setting.
1183 ///
1184 /// This setting indicates that the sender does not support RFC 7540 priorities.
1185 /// When set to 1, the sender is indicating it does not use the priority
1186 /// mechanism described in RFC 7540.
1187 pub fn no_rfc7540_priorities(&mut self, enabled: u32) -> &mut Self {
1188 self.settings.set_no_rfc7540_priorities(Some(enabled));
1189 self
1190 }
1191
1192 /// Sets the ENABLE_CONNECT_PROTOCOL setting.
1193 ///
1194 /// This setting enables the use of the extended CONNECT method
1195 /// (RFC 8441). When set to 1, the sender indicates support for
1196 /// extended CONNECT.
1197 pub fn enable_connect_protocol(&mut self, val: u32) -> &mut Self {
1198 self.settings.set_enable_connect_protocol(Some(val));
1199 self
1200 }
1201
1202 /// Sets the HTTP/2 pseudo-header field order for outgoing HEADERS frames.
1203 ///
1204 /// This determines the order in which pseudo-header fields (such as `:method`, `:scheme`, etc.)
1205 /// are encoded in the HEADERS frame. Customizing the order may be useful for interoperability
1206 /// or testing purposes.
1207 pub fn headers_pseudo_order(&mut self, order: crate::frame::PseudoOrder) -> &mut Self {
1208 self.headers_pseudo_order = Some(order);
1209 self
1210 }
1211
1212 /// Sets PRIORITY flag parameters for HEADERS frames.
1213 ///
1214 /// When set, HEADERS frames will include priority data.
1215 /// None = no PRIORITY, Some((weight, dep, exclusive)) = PRIORITY with those values.
1216 pub fn headers_priority(&mut self, data: Option<(u8, u32, bool)>) -> &mut Self {
1217 self.headers_priority = data;
1218 self
1219 }
1220
1221 /// Sets the HTTP/2 regular header ordering for browser fingerprinting.
1222 ///
1223 /// When set, headers are encoded in the specified order instead of hash-based order.
1224 pub fn headers_order(&mut self, order: Vec<http::HeaderName>) -> &mut Self {
1225 self.headers_order = Some(order);
1226 self
1227 }
1228
1229 /// Creates a new configured HTTP/2 client backed by `io`.
1230 ///
1231 /// It is expected that `io` already be in an appropriate state to commence
1232 /// the [HTTP/2 handshake]. The handshake is completed once both the connection
1233 /// preface and the initial settings frame is sent by the client.
1234 ///
1235 /// The handshake future does not wait for the initial settings frame from the
1236 /// server.
1237 ///
1238 /// Returns a future which resolves to the [`Connection`] / [`SendRequest`]
1239 /// tuple once the HTTP/2 handshake has been completed.
1240 ///
1241 /// This function also allows the caller to configure the send payload data
1242 /// type. See [Outbound data type] for more details.
1243 ///
1244 /// [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
1245 /// [`Connection`]: struct.Connection.html
1246 /// [`SendRequest`]: struct.SendRequest.html
1247 /// [Outbound data type]: ../index.html#outbound-data-type.
1248 ///
1249 /// # Examples
1250 ///
1251 /// Basic usage:
1252 ///
1253 /// ```
1254 /// # use tokio::io::{AsyncRead, AsyncWrite};
1255 /// # use h2::client::*;
1256 /// # use bytes::Bytes;
1257 /// #
1258 /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1259 /// -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
1260 /// # {
1261 /// // `client_fut` is a future representing the completion of the HTTP/2
1262 /// // handshake.
1263 /// let client_fut = Builder::new()
1264 /// .handshake(my_io);
1265 /// # client_fut.await
1266 /// # }
1267 /// #
1268 /// # pub fn main() {}
1269 /// ```
1270 ///
1271 /// Configures the send-payload data type. In this case, the outbound data
1272 /// type will be `&'static [u8]`.
1273 ///
1274 /// ```
1275 /// # use tokio::io::{AsyncRead, AsyncWrite};
1276 /// # use h2::client::*;
1277 /// #
1278 /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1279 /// # -> Result<((SendRequest<&'static [u8]>, Connection<T, &'static [u8]>)), h2::Error>
1280 /// # {
1281 /// // `client_fut` is a future representing the completion of the HTTP/2
1282 /// // handshake.
1283 /// let client_fut = Builder::new()
1284 /// .handshake::<_, &'static [u8]>(my_io);
1285 /// # client_fut.await
1286 /// # }
1287 /// #
1288 /// # pub fn main() {}
1289 /// ```
1290 pub fn handshake<T, B>(
1291 &self,
1292 io: T,
1293 ) -> impl Future<Output = Result<(SendRequest<B>, Connection<T, B>), crate::Error>>
1294 where
1295 T: AsyncRead + AsyncWrite + Unpin,
1296 B: Buf,
1297 {
1298 Connection::handshake2(io, self.clone())
1299 }
1300}
1301
1302impl Default for Builder {
1303 fn default() -> Builder {
1304 Builder::new()
1305 }
1306}
1307
1308/// Creates a new configured HTTP/2 client with default configuration
1309/// values backed by `io`.
1310///
1311/// It is expected that `io` already be in an appropriate state to commence
1312/// the [HTTP/2 handshake]. See [Handshake] for more details.
1313///
1314/// Returns a future which resolves to the [`Connection`] / [`SendRequest`]
1315/// tuple once the HTTP/2 handshake has been completed. The returned
1316/// [`Connection`] instance will be using default configuration values. Use
1317/// [`Builder`] to customize the configuration values used by a [`Connection`]
1318/// instance.
1319///
1320/// [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
1321/// [Handshake]: ../index.html#handshake
1322/// [`Connection`]: struct.Connection.html
1323/// [`SendRequest`]: struct.SendRequest.html
1324///
1325/// # Examples
1326///
1327/// ```
1328/// # use tokio::io::{AsyncRead, AsyncWrite};
1329/// # use h2::client;
1330/// # use h2::client::*;
1331/// #
1332/// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) -> Result<(), h2::Error>
1333/// # {
1334/// let (send_request, connection) = client::handshake(my_io).await?;
1335/// // The HTTP/2 handshake has completed, now start polling
1336/// // `connection` and use `send_request` to send requests to the
1337/// // server.
1338/// # Ok(())
1339/// # }
1340/// #
1341/// # pub fn main() {}
1342/// ```
1343pub async fn handshake<T>(io: T) -> Result<(SendRequest<Bytes>, Connection<T, Bytes>), crate::Error>
1344where
1345 T: AsyncRead + AsyncWrite + Unpin,
1346{
1347 let builder = Builder::new();
1348 builder
1349 .handshake(io)
1350 .instrument(tracing::trace_span!("client_handshake"))
1351 .await
1352}
1353
1354// ===== impl Connection =====
1355
1356async fn bind_connection<T>(io: &mut T) -> Result<(), crate::Error>
1357where
1358 T: AsyncRead + AsyncWrite + Unpin,
1359{
1360 tracing::debug!("binding client connection");
1361
1362 let msg: &'static [u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
1363 io.write_all(msg).await.map_err(crate::Error::from_io)?;
1364
1365 tracing::debug!("client connection bound");
1366
1367 Ok(())
1368}
1369
1370impl<T, B> Connection<T, B>
1371where
1372 T: AsyncRead + AsyncWrite + Unpin,
1373 B: Buf,
1374{
1375 async fn handshake2(
1376 mut io: T,
1377 builder: Builder,
1378 ) -> Result<(SendRequest<B>, Connection<T, B>), crate::Error> {
1379 bind_connection(&mut io).await?;
1380
1381 // Create the codec
1382 let mut codec = Codec::new(io);
1383
1384 tracing::debug!(
1385 "handshake2: settings header_table_size={:?}, enable_push={:?}, initial_window_size={:?}, max_frame_size={:?}, max_header_list_size={:?}",
1386 builder.settings.header_table_size(),
1387 builder.settings.is_push_enabled(),
1388 builder.settings.initial_window_size(),
1389 builder.settings.max_frame_size(),
1390 builder.settings.max_header_list_size(),
1391 );
1392
1393 if let Some(max) = builder.settings.max_frame_size() {
1394 codec.set_max_recv_frame_size(max as usize);
1395 }
1396
1397 if let Some(max) = builder.settings.max_header_list_size() {
1398 codec.set_max_recv_header_list_size(max as usize);
1399 }
1400
1401 // Send initial settings frame
1402 tracing::debug!("handshake2: sending initial settings frame");
1403 codec
1404 .buffer(builder.settings.clone().into())
1405 .expect("invalid SETTINGS frame");
1406
1407 let inner = proto::Connection::new(
1408 codec,
1409 proto::Config {
1410 next_stream_id: builder.stream_id,
1411 initial_max_send_streams: builder.initial_max_send_streams,
1412 max_send_buffer_size: builder.max_send_buffer_size,
1413 reset_stream_duration: builder.reset_stream_duration,
1414 reset_stream_max: builder.reset_stream_max,
1415 remote_reset_stream_max: builder.pending_accept_reset_stream_max,
1416 local_error_reset_streams_max: builder.local_max_error_reset_streams,
1417 settings: builder.settings.clone(),
1418 headers_pseudo_order: builder.headers_pseudo_order.clone(),
1419 headers_priority: builder.headers_priority,
1420 headers_order: builder.headers_order.clone(),
1421 },
1422 );
1423 let send_request = SendRequest {
1424 inner: inner.streams().clone(),
1425 pending: None,
1426 };
1427
1428 let mut connection = Connection { inner };
1429 if let Some(sz) = builder.initial_target_connection_window_size {
1430 connection.set_target_window_size(sz);
1431 }
1432
1433 Ok((send_request, connection))
1434 }
1435
1436 /// Sets the target window size for the whole connection.
1437 ///
1438 /// If `size` is greater than the current value, then a `WINDOW_UPDATE`
1439 /// frame will be immediately sent to the remote, increasing the connection
1440 /// level window by `size - current_value`.
1441 ///
1442 /// If `size` is less than the current value, nothing will happen
1443 /// immediately. However, as window capacity is released by
1444 /// [`FlowControl`] instances, no `WINDOW_UPDATE` frames will be sent
1445 /// out until the number of "in flight" bytes drops below `size`.
1446 ///
1447 /// The default value is 65,535.
1448 ///
1449 /// See [`FlowControl`] documentation for more details.
1450 ///
1451 /// [`FlowControl`]: ../struct.FlowControl.html
1452 /// [library level]: ../index.html#flow-control
1453 pub fn set_target_window_size(&mut self, size: u32) {
1454 assert!(size <= proto::MAX_WINDOW_SIZE);
1455 self.inner.set_target_window_size(size);
1456 }
1457
1458 /// Set a new `INITIAL_WINDOW_SIZE` setting (in octets) for stream-level
1459 /// flow control for received data.
1460 ///
1461 /// The `SETTINGS` will be sent to the remote, and only applied once the
1462 /// remote acknowledges the change.
1463 ///
1464 /// This can be used to increase or decrease the window size for existing
1465 /// streams.
1466 ///
1467 /// # Errors
1468 ///
1469 /// Returns an error if a previous call is still pending acknowledgement
1470 /// from the remote endpoint.
1471 pub fn set_initial_window_size(&mut self, size: u32) -> Result<(), crate::Error> {
1472 assert!(size <= proto::MAX_WINDOW_SIZE);
1473 self.inner.set_initial_window_size(size)?;
1474 Ok(())
1475 }
1476
1477 /// Takes a `PingPong` instance from the connection.
1478 ///
1479 /// # Note
1480 ///
1481 /// This may only be called once. Calling multiple times will return `None`.
1482 pub fn ping_pong(&mut self) -> Option<PingPong> {
1483 self.inner.take_user_pings().map(PingPong::new)
1484 }
1485
1486 /// Returns the maximum number of concurrent streams that may be initiated
1487 /// by this client.
1488 ///
1489 /// This limit is configured by the server peer by sending the
1490 /// [`SETTINGS_MAX_CONCURRENT_STREAMS` parameter][1] in a `SETTINGS` frame.
1491 /// This method returns the currently acknowledged value received from the
1492 /// remote.
1493 ///
1494 /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2
1495 pub fn max_concurrent_send_streams(&self) -> usize {
1496 self.inner.max_send_streams()
1497 }
1498 /// Returns the maximum number of concurrent streams that may be initiated
1499 /// by the server on this connection.
1500 ///
1501 /// This returns the value of the [`SETTINGS_MAX_CONCURRENT_STREAMS`
1502 /// parameter][1] sent in a `SETTINGS` frame that has been
1503 /// acknowledged by the remote peer. The value to be sent is configured by
1504 /// the [`Builder::max_concurrent_streams`][2] method before handshaking
1505 /// with the remote peer.
1506 ///
1507 /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2
1508 /// [2]: ../struct.Builder.html#method.max_concurrent_streams
1509 pub fn max_concurrent_recv_streams(&self) -> usize {
1510 self.inner.max_recv_streams()
1511 }
1512}
1513
1514impl<T, B> Future for Connection<T, B>
1515where
1516 T: AsyncRead + AsyncWrite + Unpin,
1517 B: Buf,
1518{
1519 type Output = Result<(), crate::Error>;
1520
1521 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1522 self.inner.maybe_close_connection_if_no_streams();
1523 let had_streams_or_refs = self.inner.has_streams_or_other_references();
1524 let result = self.inner.poll(cx).map_err(Into::into);
1525 // if we had streams/refs, and don't anymore, wake up one more time to
1526 // ensure proper shutdown
1527 if result.is_pending()
1528 && had_streams_or_refs
1529 && !self.inner.has_streams_or_other_references()
1530 {
1531 tracing::trace!("last stream closed during poll, wake again");
1532 cx.waker().wake_by_ref();
1533 }
1534 result
1535 }
1536}
1537
1538impl<T, B> fmt::Debug for Connection<T, B>
1539where
1540 T: AsyncRead + AsyncWrite,
1541 T: fmt::Debug,
1542 B: fmt::Debug + Buf,
1543{
1544 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
1545 fmt::Debug::fmt(&self.inner, fmt)
1546 }
1547}
1548
1549// ===== impl ResponseFuture =====
1550
1551impl Future for ResponseFuture {
1552 type Output = Result<Response<RecvStream>, crate::Error>;
1553
1554 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1555 let (parts, _) = ready!(self.inner.poll_response(cx))?.into_parts();
1556 let body = RecvStream::new(FlowControl::new(self.inner.clone()));
1557
1558 Poll::Ready(Ok(Response::from_parts(parts, body)))
1559 }
1560}
1561
1562impl ResponseFuture {
1563 /// Returns the stream ID of the response stream.
1564 ///
1565 /// # Panics
1566 ///
1567 /// If the lock on the stream store has been poisoned.
1568 pub fn stream_id(&self) -> crate::StreamId {
1569 crate::StreamId::from_internal(self.inner.stream_id())
1570 }
1571
1572 /// Polls for informational responses (1xx status codes).
1573 ///
1574 /// This method should be called before polling the main response future
1575 /// to check for any informational responses that have been received.
1576 ///
1577 /// Returns `Poll::Ready(Some(response))` if an informational response is available,
1578 /// `Poll::Ready(None)` if no more informational responses are expected,
1579 /// or `Poll::Pending` if no informational response is currently available.
1580 pub fn poll_informational(
1581 &mut self,
1582 cx: &mut Context<'_>,
1583 ) -> Poll<Option<Result<Response<()>, crate::Error>>> {
1584 self.inner.poll_informational(cx).map_err(Into::into)
1585 }
1586
1587 /// Returns a stream of PushPromises
1588 ///
1589 /// # Panics
1590 ///
1591 /// If this method has been called before
1592 /// or the stream was itself was pushed
1593 pub fn push_promises(&mut self) -> PushPromises {
1594 if self.push_promise_consumed {
1595 panic!("Reference to push promises stream taken!");
1596 }
1597 self.push_promise_consumed = true;
1598 PushPromises {
1599 inner: self.inner.clone(),
1600 }
1601 }
1602}
1603
1604// ===== impl PushPromises =====
1605
1606impl PushPromises {
1607 /// Get the next `PushPromise`.
1608 pub async fn push_promise(&mut self) -> Option<Result<PushPromise, crate::Error>> {
1609 crate::poll_fn(move |cx| self.poll_push_promise(cx)).await
1610 }
1611
1612 #[doc(hidden)]
1613 pub fn poll_push_promise(
1614 &mut self,
1615 cx: &mut Context<'_>,
1616 ) -> Poll<Option<Result<PushPromise, crate::Error>>> {
1617 match self.inner.poll_pushed(cx) {
1618 Poll::Ready(Some(Ok((request, response)))) => {
1619 let response = PushedResponseFuture {
1620 inner: ResponseFuture {
1621 inner: response,
1622 push_promise_consumed: false,
1623 },
1624 };
1625 Poll::Ready(Some(Ok(PushPromise { request, response })))
1626 }
1627 Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e.into()))),
1628 Poll::Ready(None) => Poll::Ready(None),
1629 Poll::Pending => Poll::Pending,
1630 }
1631 }
1632}
1633
1634#[cfg(feature = "stream")]
1635impl futures_core::Stream for PushPromises {
1636 type Item = Result<PushPromise, crate::Error>;
1637
1638 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1639 self.poll_push_promise(cx)
1640 }
1641}
1642
1643// ===== impl PushPromise =====
1644
1645impl PushPromise {
1646 /// Returns a reference to the push promise's request headers.
1647 pub fn request(&self) -> &Request<()> {
1648 &self.request
1649 }
1650
1651 /// Returns a mutable reference to the push promise's request headers.
1652 pub fn request_mut(&mut self) -> &mut Request<()> {
1653 &mut self.request
1654 }
1655
1656 /// Consumes `self`, returning the push promise's request headers and
1657 /// response future.
1658 pub fn into_parts(self) -> (Request<()>, PushedResponseFuture) {
1659 (self.request, self.response)
1660 }
1661}
1662
1663// ===== impl PushedResponseFuture =====
1664
1665impl Future for PushedResponseFuture {
1666 type Output = Result<Response<RecvStream>, crate::Error>;
1667
1668 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1669 Pin::new(&mut self.inner).poll(cx)
1670 }
1671}
1672
1673impl PushedResponseFuture {
1674 /// Returns the stream ID of the response stream.
1675 ///
1676 /// # Panics
1677 ///
1678 /// If the lock on the stream store has been poisoned.
1679 pub fn stream_id(&self) -> crate::StreamId {
1680 self.inner.stream_id()
1681 }
1682}
1683
1684// ===== impl Peer =====
1685
1686impl Peer {
1687 pub fn convert_send_message(
1688 id: StreamId,
1689 request: Request<()>,
1690 protocol: Option<Protocol>,
1691 end_of_stream: bool,
1692 pseudo_order: Option<crate::frame::PseudoOrder>,
1693 ) -> Result<Headers, SendError> {
1694 use http::request::Parts;
1695
1696 let (
1697 Parts {
1698 method,
1699 uri,
1700 headers,
1701 version,
1702 ..
1703 },
1704 _,
1705 ) = request.into_parts();
1706
1707 let is_connect = method == Method::CONNECT;
1708
1709 // Build the set pseudo header set. All requests will include `method`
1710 // and `path`.
1711 let mut pseudo = Pseudo::request(method, uri, protocol, pseudo_order);
1712
1713 if pseudo.scheme.is_none() {
1714 // If the scheme is not set, then there are a two options.
1715 //
1716 // 1) Authority is not set. In this case, a request was issued with
1717 // a relative URI. This is permitted **only** when forwarding
1718 // HTTP 1.x requests. If the HTTP version is set to 2.0, then
1719 // this is an error.
1720 //
1721 // 2) Authority is set, then the HTTP method *must* be CONNECT.
1722 //
1723 // It is not possible to have a scheme but not an authority set (the
1724 // `http` crate does not allow it).
1725 //
1726 if pseudo.authority.is_none() {
1727 if version == Version::HTTP_2 {
1728 return Err(UserError::MissingUriSchemeAndAuthority.into());
1729 } else {
1730 // This is acceptable as per the above comment. However,
1731 // HTTP/2 requires that a scheme is set. Since we are
1732 // forwarding an HTTP 1.1 request, the scheme is set to
1733 // "http".
1734 pseudo.set_scheme(uri::Scheme::HTTP);
1735 }
1736 } else if !is_connect {
1737 // TODO: Error
1738 }
1739 }
1740
1741 // Create the HEADERS frame
1742 let mut frame = Headers::new(id, pseudo, headers);
1743
1744 if end_of_stream {
1745 frame.set_end_stream()
1746 }
1747
1748 Ok(frame)
1749 }
1750}
1751
1752impl proto::Peer for Peer {
1753 type Poll = Response<()>;
1754
1755 const NAME: &'static str = "Client";
1756
1757 fn r#dyn() -> proto::DynPeer {
1758 proto::DynPeer::Client
1759 }
1760
1761 /*
1762 fn is_server() -> bool {
1763 false
1764 }
1765 */
1766
1767 fn convert_poll_message(
1768 pseudo: Pseudo,
1769 fields: HeaderMap,
1770 stream_id: StreamId,
1771 ) -> Result<Self::Poll, Error> {
1772 let mut b = Response::builder();
1773
1774 b = b.version(Version::HTTP_2);
1775
1776 if let Some(status) = pseudo.status {
1777 b = b.status(status);
1778 }
1779
1780 let mut response = match b.body(()) {
1781 Ok(response) => response,
1782 Err(_) => {
1783 // TODO: Should there be more specialized handling for different
1784 // kinds of errors
1785 return Err(Error::library_reset(stream_id, Reason::PROTOCOL_ERROR));
1786 }
1787 };
1788
1789 *response.headers_mut() = fields;
1790
1791 Ok(response)
1792 }
1793}