monoio_http/h2/client.rs
1//! Client implementation of the HTTP/2 protocol.
2//!
3//! # Getting started
4//!
5//! Running an HTTP/2 client requires the caller to establish the underlying
6//! connection as well as get the connection to a state that is ready to begin
7//! the HTTP/2 handshake. See [here](../index.html#handshake) for more
8//! details.
9//!
10//! This could be as basic as using Monoio's [`TcpStream`] to connect to a remote
11//! host, but usually it means using either ALPN or HTTP/1.1 protocol upgrades.
12//!
13//! Once a connection is obtained, it is passed to [`handshake`], which will
14//! begin the [HTTP/2 handshake]. This returns a future that completes once
//! the handshake has finished and HTTP/2 streams may be initialized.
16//!
17//! [`handshake`] uses default configuration values. There are a number of
18//! settings that can be changed by using [`Builder`] instead.
19//!
20//! Once the handshake future completes, the caller is provided with a
21//! [`Connection`] instance and a [`SendRequest`] instance. The [`Connection`]
22//! instance is used to drive the connection (see [Managing the connection]).
23//! The [`SendRequest`] instance is used to initialize new streams (see [Making
24//! requests]).
25//!
26//! # Making requests
27//!
28//! Requests are made using the [`SendRequest`] handle provided by the handshake
29//! future. Once a request is submitted, an HTTP/2 stream is initialized and
30//! the request is sent to the server.
31//!
//! A request body and request trailers are sent using [`SendStream`], and the
33//! server's response is returned once the [`ResponseFuture`] future completes.
34//! Both the [`SendStream`] and [`ResponseFuture`] instances are returned by
35//! [`SendRequest::send_request`] and are tied to the HTTP/2 stream
36//! initialized by the sent request.
37//!
38//! The [`SendRequest::poll_ready`] function returns `Ready` when a new HTTP/2
39//! stream can be created, i.e. as long as the current number of active streams
40//! is below [`MAX_CONCURRENT_STREAMS`]. If a new stream cannot be created, the
41//! caller will be notified once an existing stream closes, freeing capacity for
42//! the caller. The caller should use [`SendRequest::poll_ready`] to check for
43//! capacity before sending a request to the server.
44//!
45//! [`SendRequest`] enforces the [`MAX_CONCURRENT_STREAMS`] setting. The user
46//! must not send a request if `poll_ready` does not return `Ready`. Attempting
47//! to do so will result in an [`Error`] being returned.
48//!
49//! # Managing the connection
50//!
51//! The [`Connection`] instance is used to manage connection state. The caller
52//! is required to call [`Connection::poll`] in order to advance state.
53//! [`SendRequest::send_request`] and other functions have no effect unless
54//! [`Connection::poll`] is called.
55//!
56//! The [`Connection`] instance should only be dropped once [`Connection::poll`]
57//! returns `Ready`. At this point, the underlying socket has been closed and no
58//! further work needs to be done.
59//!
60//! The easiest way to ensure that the [`Connection`] instance gets polled is to
61//! submit the [`Connection`] instance to an [executor]. The executor will then
62//! manage polling the connection until the connection is complete.
63//! Alternatively, the caller can call `poll` manually.
64//!
65//! # Example
66//!
//! ```rust,no_run
//! use std::error::Error;
//!
//! use h2::client;
//! use http::{Method, Request};
//! use monoio::net::TcpStream;
//!
//! #[monoio::main]
//! pub async fn main() -> Result<(), Box<dyn Error>> {
//!     // Establish TCP connection to the server.
//!     let tcp = TcpStream::connect("127.0.0.1:5928").await?;
//!     let (h2, connection) = client::handshake(tcp).await?;
//!     monoio::spawn(async move {
//!         connection.await.unwrap();
//!     });
//!
//!     let mut h2 = h2.ready().await?;
//!     // Prepare the HTTP request to send to the server.
//!     let request = Request::builder()
//!         .method(Method::GET)
//!         .uri("https://www.example.com/")
//!         .body(())
//!         .unwrap();
//!
//!     // Send the request. The second tuple item allows the caller
//!     // to stream a request body.
//!     let (response, _) = h2.send_request(request, true).unwrap();
//!
//!     let (head, mut body) = response.await?.into_parts();
//!
//!     println!("Received response: {:?}", head);
//!
//!     // The `flow_control` handle allows the caller to manage
//!     // flow control.
//!     //
//!     // Whenever data is received, the caller is responsible for
//!     // releasing capacity back to the server once it has freed
//!     // the data from memory.
//!     let mut flow_control = body.flow_control().clone();
//!
//!     while let Some(chunk) = body.data().await {
//!         let chunk = chunk?;
//!         println!("RX: {:?}", chunk);
//!
//!         // Let the server send more data.
//!         let _ = flow_control.release_capacity(chunk.len());
//!     }
//!
//!     Ok(())
//! }
//! ```
118//!
119//! [`TcpStream`]: https://docs.rs/monoio/latest/monoio/net/tcp/struct.TcpStream.html
120//! [`handshake`]: fn.handshake.html
121//! [executor]: https://docs.rs/futures/0.1/futures/future/trait.Executor.html
122//! [`SendRequest`]: struct.SendRequest.html
123//! [`SendStream`]: ../struct.SendStream.html
124//! [Making requests]: #making-requests
125//! [Managing the connection]: #managing-the-connection
126//! [`Connection`]: struct.Connection.html
127//! [`Connection::poll`]: struct.Connection.html#method.poll
128//! [`SendRequest::send_request`]: struct.SendRequest.html#method.send_request
129//! [`MAX_CONCURRENT_STREAMS`]: http://httpwg.org/specs/rfc7540.html#SettingValues
130//! [`SendRequest`]: struct.SendRequest.html
131//! [`ResponseFuture`]: struct.ResponseFuture.html
132//! [`SendRequest::poll_ready`]: struct.SendRequest.html#method.poll_ready
133//! [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
134//! [`Builder`]: struct.Builder.html
135//! [`Error`]: ../struct.Error.html
136
137use std::{
138 fmt,
139 future::Future,
140 pin::Pin,
141 task::{Context, Poll},
142 time::Duration,
143};
144
145use bytes::{Buf, Bytes};
146use http::{uri, HeaderMap, Method, Request, Response, Version};
147use monoio::io::{AsyncReadRent, AsyncWriteRent, AsyncWriteRentExt};
148use tracing::Instrument;
149
150use crate::{
151 common::error::HttpError,
152 h2::{
153 codec::{Codec, SendError, UserError},
154 ext::Protocol,
155 frame::{Headers, Pseudo, Reason, Settings, StreamId},
156 proto::{self, Error},
157 FlowControl, PingPong, RecvStream, SendStream,
158 },
159};
160
161/// Initializes new HTTP/2 streams on a connection by sending a request.
162///
163/// This type does no work itself. Instead, it is a handle to the inner
164/// connection state held by [`Connection`]. If the associated connection
165/// instance is dropped, all `SendRequest` functions will return [`Error`].
166///
167/// [`SendRequest`] instances are able to move to and operate on separate tasks
168/// / threads than their associated [`Connection`] instance. Internally, there
169/// is a buffer used to stage requests before they get written to the
170/// connection. There is no guarantee that requests get written to the
171/// connection in FIFO order as HTTP/2 prioritization logic can play a role.
172///
173/// [`SendRequest`] implements [`Clone`], enabling the creation of many
174/// instances that are backed by a single connection.
175///
176/// See [module] level documentation for more details.
177///
178/// [module]: index.html
179/// [`Connection`]: struct.Connection.html
180/// [`Clone`]: https://doc.rust-lang.org/std/clone/trait.Clone.html
181/// [`Error`]: ../struct.Error.html
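///
/// # Examples
///
/// A minimal sketch of creating an additional handle; `send_request` is
/// assumed to be a handle obtained from the handshake:
///
/// ```
/// # use h2::client::SendRequest;
/// # use bytes::Bytes;
/// #
/// # fn doc(send_request: SendRequest<Bytes>) {
/// // `SendRequest` implements `Clone`, so additional handles backed by
/// // the same connection can be created and moved to other tasks.
/// let another_handle = send_request.clone();
/// # drop(another_handle);
/// # }
/// #
/// # pub fn main() {}
/// ```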
182pub struct SendRequest<B: Buf> {
183 inner: proto::Streams<B, Peer>,
184 pending: Option<proto::OpaqueStreamRef>,
185}
186
187/// Returns a `SendRequest` instance once it is ready to send at least one
188/// request.
189#[derive(Debug)]
190pub struct ReadySendRequest<B: Buf> {
191 inner: Option<SendRequest<B>>,
192}
193
194/// Manages all state associated with an HTTP/2 client connection.
195///
196/// A `Connection` is backed by an I/O resource (usually a TCP socket) and
197/// implements the HTTP/2 client logic for that connection. It is responsible
198/// for driving the internal state forward, performing the work requested of the
199/// associated handles ([`SendRequest`], [`ResponseFuture`], [`SendStream`],
200/// [`RecvStream`]).
201///
202/// `Connection` values are created by calling [`handshake`]. Once a
203/// `Connection` value is obtained, the caller must repeatedly call [`poll`]
204/// until `Ready` is returned. The easiest way to do this is to submit the
205/// `Connection` instance to an [executor].
206///
207/// [module]: index.html
208/// [`handshake`]: fn.handshake.html
209/// [`SendRequest`]: struct.SendRequest.html
210/// [`ResponseFuture`]: struct.ResponseFuture.html
211/// [`SendStream`]: ../struct.SendStream.html
212/// [`RecvStream`]: ../struct.RecvStream.html
213/// [`poll`]: #method.poll
214/// [executor]: https://docs.rs/futures/0.1/futures/future/trait.Executor.html
215///
216/// # Examples
217///
218/// ```
219/// # use monoio::io::{AsyncReadRent, AsyncWriteRent};
220/// # use h2::client;
221/// # use h2::client::*;
222/// #
223/// # async fn doc<T>(my_io: T) -> Result<(), h2::Error>
224/// # where T: AsyncReadRent + AsyncWriteRent + Send + Unpin + 'static,
225/// # {
226/// let (send_request, connection) = client::handshake(my_io).await?;
227/// // Submit the connection handle to an executor.
228/// monoio::spawn(async {
229/// connection.await.expect("connection failed");
230/// });
231///
232/// // Now, use `send_request` to initialize HTTP/2 streams.
233/// // ...
234/// # Ok(())
235/// # }
236/// #
237/// # pub fn main() {}
238/// ```
239#[must_use = "futures do nothing unless polled"]
240pub struct Connection<T, B: Buf = Bytes> {
241 inner: proto::Connection<T, Peer, B>,
242}
243
244/// A future of an HTTP response.
245#[derive(Debug)]
246#[must_use = "futures do nothing unless polled"]
247pub struct ResponseFuture {
248 inner: proto::OpaqueStreamRef,
249 push_promise_consumed: bool,
250}
251
252/// A future of a pushed HTTP response.
253///
254/// We have to differentiate between pushed and non pushed because of the spec
255/// <https://httpwg.org/specs/rfc7540.html#PUSH_PROMISE>
256/// > PUSH_PROMISE frames MUST only be sent on a peer-initiated stream
257/// > that is in either the "open" or "half-closed (remote)" state.
258#[derive(Debug)]
259#[must_use = "futures do nothing unless polled"]
260pub struct PushedResponseFuture {
261 inner: ResponseFuture,
262}
263
264/// A pushed response and corresponding request headers
265#[derive(Debug)]
266pub struct PushPromise {
267 /// The request headers
268 request: Request<()>,
269
270 /// The pushed response
271 response: PushedResponseFuture,
272}
273
274/// A stream of pushed responses and corresponding promised requests
275#[derive(Debug)]
276#[must_use = "streams do nothing unless polled"]
277pub struct PushPromises {
278 inner: proto::OpaqueStreamRef,
279}
280
281/// Builds client connections with custom configuration values.
282///
283/// Methods can be chained in order to set the configuration values.
284///
285/// The client is constructed by calling [`handshake`] and passing the I/O
/// handle that will back the HTTP/2 client connection.
287///
288/// New instances of `Builder` are obtained via [`Builder::new`].
289///
290/// See function level documentation for details on the various client
291/// configuration settings.
292///
293/// [`Builder::new`]: struct.Builder.html#method.new
294/// [`handshake`]: struct.Builder.html#method.handshake
295///
296/// # Examples
297///
298/// ```
299/// # use monoio::io::{AsyncReadRent, AsyncWriteRent};
300/// # use h2::client::*;
301/// # use bytes::Bytes;
302/// #
303/// # async fn doc<T: AsyncReadRent + AsyncWriteRent + Unpin>(my_io: T)
/// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
305/// # {
306/// // `client_fut` is a future representing the completion of the HTTP/2
307/// // handshake.
308/// let client_fut = Builder::new()
309/// .initial_window_size(1_000_000)
310/// .max_concurrent_streams(1000)
311/// .handshake(my_io);
312/// # client_fut.await
313/// # }
314/// #
315/// # pub fn main() {}
316/// ```
317#[derive(Clone, Debug)]
318pub struct Builder {
319 /// Time to keep locally reset streams around before reaping.
320 reset_stream_duration: Duration,
321
322 /// Initial maximum number of locally initiated (send) streams.
323 /// After receiving a Settings frame from the remote peer,
324 /// the connection will overwrite this value with the
325 /// MAX_CONCURRENT_STREAMS specified in the frame.
326 initial_max_send_streams: usize,
327
328 /// Initial target window size for new connections.
329 initial_target_connection_window_size: Option<u32>,
330
331 /// Maximum amount of bytes to "buffer" for writing per stream.
332 max_send_buffer_size: usize,
333
334 /// Maximum number of locally reset streams to keep at a time.
335 reset_stream_max: usize,
336
337 /// Initial `Settings` frame to send as part of the handshake.
338 settings: Settings,
339
340 /// The stream ID of the first (lowest) stream. Subsequent streams will use
341 /// monotonically increasing stream IDs.
342 stream_id: StreamId,
343}
344
345#[derive(Debug)]
346pub(crate) struct Peer;
347
348// ===== impl SendRequest =====
349
350impl<B> SendRequest<B>
351where
352 B: Buf,
353{
354 /// Returns `Ready` when the connection can initialize a new HTTP/2
355 /// stream.
356 ///
357 /// This function must return `Ready` before `send_request` is called. When
358 /// `Poll::Pending` is returned, the task will be notified once the readiness
359 /// state changes.
360 ///
361 /// See [module] level docs for more details.
362 ///
363 /// [module]: index.html
364 pub fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), crate::h2::Error>> {
365 ready!(self.inner.poll_pending_open(cx, self.pending.as_ref()))?;
366 self.pending = None;
367 Poll::Ready(Ok(()))
368 }
369
370 /// Consumes `self`, returning a future that returns `self` back once it is
371 /// ready to send a request.
372 ///
373 /// This function should be called before calling `send_request`.
374 ///
375 /// This is a functional combinator for [`poll_ready`]. The returned future
    /// will call `SendRequest::poll_ready` until `Ready`, then returns `self` to
377 /// the caller.
378 ///
379 /// # Examples
380 ///
381 /// ```rust
382 /// # use h2::client::*;
383 /// # use http::*;
384 /// # async fn doc(send_request: SendRequest<&'static [u8]>)
385 /// # {
386 /// // First, wait until the `send_request` handle is ready to send a new
387 /// // request
388 /// let mut send_request = send_request.ready().await.unwrap();
389 /// // Use `send_request` here.
390 /// # }
391 /// # pub fn main() {}
392 /// ```
393 ///
394 /// See [module] level docs for more details.
395 ///
396 /// [`poll_ready`]: #method.poll_ready
397 /// [module]: index.html
398 pub fn ready(self) -> ReadySendRequest<B> {
399 ReadySendRequest { inner: Some(self) }
400 }
401
    /// Sends an HTTP/2 request to the server.
403 ///
404 /// `send_request` initializes a new HTTP/2 stream on the associated
405 /// connection, then sends the given request using this new stream. Only the
406 /// request head is sent.
407 ///
408 /// On success, a [`ResponseFuture`] instance and [`SendStream`] instance
409 /// are returned. The [`ResponseFuture`] instance is used to get the
410 /// server's response and the [`SendStream`] instance is used to send a
411 /// request body or trailers to the server over the same HTTP/2 stream.
412 ///
413 /// To send a request body or trailers, set `end_of_stream` to `false`.
414 /// Then, use the returned [`SendStream`] instance to stream request body
    /// chunks or send trailers. If `end_of_stream` is set to `true`, then
    /// attempting to call [`SendStream::send_data`] or
    /// [`SendStream::send_trailers`] will result in an error.
418 ///
419 /// If no request body or trailers are to be sent, set `end_of_stream` to
420 /// `true` and drop the returned [`SendStream`] instance.
421 ///
422 /// # A note on HTTP versions
423 ///
424 /// The provided `Request` will be encoded differently depending on the
425 /// value of its version field. If the version is set to 2.0, then the
426 /// request is encoded as per the specification recommends.
427 ///
428 /// If the version is set to a lower value, then the request is encoded to
429 /// preserve the characteristics of HTTP 1.1 and lower. Specifically, host
430 /// headers are permitted and the `:authority` pseudo header is not
431 /// included.
432 ///
433 /// The caller should always set the request's version field to 2.0 unless
434 /// specifically transmitting an HTTP 1.1 request over 2.0.
435 ///
436 /// # Examples
437 ///
438 /// Sending a request with no body
439 ///
440 /// ```rust
441 /// # use h2::client::*;
442 /// # use http::*;
443 /// # async fn doc(send_request: SendRequest<&'static [u8]>)
444 /// # {
445 /// // First, wait until the `send_request` handle is ready to send a new
446 /// // request
447 /// let mut send_request = send_request.ready().await.unwrap();
448 /// // Prepare the HTTP request to send to the server.
449 /// let request = Request::get("https://www.example.com/").body(()).unwrap();
450 ///
451 /// // Send the request to the server. Since we are not sending a
452 /// // body or trailers, we can drop the `SendStream` instance.
453 /// let (response, _) = send_request.send_request(request, true).unwrap();
454 /// let response = response.await.unwrap();
455 /// // Process the response
456 /// # }
457 /// # pub fn main() {}
458 /// ```
459 ///
460 /// Sending a request with a body and trailers
461 ///
462 /// ```rust
463 /// # use h2::client::*;
464 /// # use http::*;
465 /// # async fn doc(send_request: SendRequest<&'static [u8]>)
466 /// # {
467 /// // First, wait until the `send_request` handle is ready to send a new
468 /// // request
469 /// let mut send_request = send_request.ready().await.unwrap();
470 ///
471 /// // Prepare the HTTP request to send to the server.
472 /// let request = Request::get("https://www.example.com/").body(()).unwrap();
473 ///
    /// // Send the request to the server. Since we will be sending a
    /// // body and trailers, we keep the `SendStream` instance.
476 /// let (response, mut send_stream) = send_request.send_request(request, false).unwrap();
477 ///
478 /// // At this point, one option would be to wait for send capacity.
479 /// // Doing so would allow us to not hold data in memory that
480 /// // cannot be sent. However, this is not a requirement, so this
481 /// // example will skip that step. See `SendStream` documentation
482 /// // for more details.
483 /// send_stream.send_data(b"hello", false).unwrap();
484 /// send_stream.send_data(b"world", false).unwrap();
485 ///
486 /// // Send the trailers.
487 /// let mut trailers = HeaderMap::new();
488 /// trailers.insert(
489 /// header::HeaderName::from_bytes(b"my-trailer").unwrap(),
490 /// header::HeaderValue::from_bytes(b"hello").unwrap(),
491 /// );
492 ///
493 /// send_stream.send_trailers(trailers).unwrap();
494 ///
495 /// let response = response.await.unwrap();
496 /// // Process the response
497 /// # }
498 /// # pub fn main() {}
499 /// ```
500 ///
501 /// [`ResponseFuture`]: struct.ResponseFuture.html
502 /// [`SendStream`]: ../struct.SendStream.html
503 /// [`SendStream::send_data`]: ../struct.SendStream.html#method.send_data
504 /// [`SendStream::send_trailers`]: ../struct.SendStream.html#method.send_trailers
505 pub fn send_request(
506 &mut self,
507 request: Request<()>,
508 end_of_stream: bool,
509 ) -> Result<(ResponseFuture, SendStream<B>), crate::h2::Error> {
510 self.inner
511 .send_request(request, end_of_stream, self.pending.as_ref())
512 .map_err(Into::into)
513 .map(|stream| {
514 if stream.is_pending_open() {
515 self.pending = Some(stream.clone_to_opaque());
516 }
517
518 let response = ResponseFuture {
519 inner: stream.clone_to_opaque(),
520 push_promise_consumed: false,
521 };
522
523 let stream = SendStream::new(stream);
524
525 (response, stream)
526 })
527 }
528
529 /// Returns whether the [extended CONNECT protocol][1] is enabled or not.
530 ///
531 /// This setting is configured by the server peer by sending the
532 /// [`SETTINGS_ENABLE_CONNECT_PROTOCOL` parameter][2] in a `SETTINGS` frame.
533 /// This method returns the currently acknowledged value received from the
534 /// remote.
535 ///
536 /// [1]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
537 /// [2]: https://datatracker.ietf.org/doc/html/rfc8441#section-3
538 pub fn is_extended_connect_protocol_enabled(&self) -> bool {
539 self.inner.is_extended_connect_protocol_enabled()
540 }
541
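    /// Returns `true` if the underlying connection has encountered a
    /// connection-level error.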
542 pub fn has_conn_error(&self) -> bool {
543 self.inner.has_conn_error()
544 }
545
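    /// Returns the connection-level error, if any, converted into an
    /// `HttpError`.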
546 pub fn conn_error(&self) -> Option<HttpError> {
547 self.inner.conn_error().map(HttpError::from)
548 }
549}
550
551impl<B> fmt::Debug for SendRequest<B>
552where
553 B: Buf,
554{
555 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
556 fmt.debug_struct("SendRequest").finish()
557 }
558}
559
560impl<B> Clone for SendRequest<B>
561where
562 B: Buf,
563{
564 fn clone(&self) -> Self {
565 SendRequest {
566 inner: self.inner.clone(),
567 pending: None,
568 }
569 }
570}
571
572impl<B> SendRequest<B>
573where
574 B: Buf,
575{
576 /// Returns the number of active streams.
577 ///
578 /// An active stream is a stream that has not yet transitioned to a closed
579 /// state.
580 pub fn num_active_streams(&self) -> usize {
581 self.inner.num_active_streams()
582 }
583
584 /// Returns the number of streams that are held in memory.
585 ///
586 /// A wired stream is a stream that is either active or is closed but must
587 /// stay in memory for some reason. For example, there are still outstanding
588 /// userspace handles pointing to the slot.
589 pub fn num_wired_streams(&self) -> usize {
590 self.inner.num_wired_streams()
591 }
592}
593
594// ===== impl ReadySendRequest =====
595
596impl<B> Future for ReadySendRequest<B>
597where
598 B: Buf,
599{
600 type Output = Result<SendRequest<B>, crate::h2::Error>;
601
602 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
603 match &mut self.inner {
604 Some(send_request) => {
605 ready!(send_request.poll_ready(cx))?;
606 }
607 None => panic!("called `poll` after future completed"),
608 }
609
610 Poll::Ready(Ok(self.inner.take().unwrap()))
611 }
612}
613
614// ===== impl Builder =====
615
616impl Builder {
617 /// Returns a new client builder instance initialized with default
618 /// configuration values.
619 ///
620 /// Configuration methods can be chained on the return value.
621 ///
622 /// # Examples
623 ///
624 /// ```
625 /// # use monoio::io::{AsyncReadRent, AsyncWriteRent};
626 /// # use h2::client::*;
627 /// # use bytes::Bytes;
628 /// #
629 /// # async fn doc<T: AsyncReadRent + AsyncWriteRent + Unpin>(my_io: T)
630 /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
631 /// # {
632 /// // `client_fut` is a future representing the completion of the HTTP/2
633 /// // handshake.
634 /// let client_fut = Builder::new()
635 /// .initial_window_size(1_000_000)
636 /// .max_concurrent_streams(1000)
637 /// .handshake(my_io);
638 /// # client_fut.await
639 /// # }
640 /// #
641 /// # pub fn main() {}
642 /// ```
643 pub fn new() -> Builder {
644 Builder {
645 max_send_buffer_size: proto::DEFAULT_MAX_SEND_BUFFER_SIZE,
646 reset_stream_duration: Duration::from_secs(proto::DEFAULT_RESET_STREAM_SECS),
647 reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX,
648 initial_target_connection_window_size: None,
649 initial_max_send_streams: usize::MAX,
650 settings: Default::default(),
651 stream_id: 1.into(),
652 }
653 }
654
655 /// Indicates the initial window size (in octets) for stream-level
656 /// flow control for received data.
657 ///
658 /// The initial window of a stream is used as part of flow control. For more
659 /// details, see [`FlowControl`].
660 ///
661 /// The default value is 65,535.
662 ///
663 /// [`FlowControl`]: ../struct.FlowControl.html
664 ///
665 /// # Examples
666 ///
667 /// ```
668 /// # use monoio::io::{AsyncReadRent, AsyncWriteRent};
669 /// # use h2::client::*;
670 /// # use bytes::Bytes;
671 /// #
672 /// # async fn doc<T: AsyncReadRent + AsyncWriteRent + Unpin>(my_io: T)
673 /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
674 /// # {
675 /// // `client_fut` is a future representing the completion of the HTTP/2
676 /// // handshake.
677 /// let client_fut = Builder::new()
678 /// .initial_window_size(1_000_000)
679 /// .handshake(my_io);
680 /// # client_fut.await
681 /// # }
682 /// #
683 /// # pub fn main() {}
684 /// ```
685 pub fn initial_window_size(&mut self, size: u32) -> &mut Self {
686 self.settings.set_initial_window_size(Some(size));
687 self
688 }
689
690 /// Indicates the initial window size (in octets) for connection-level flow control
691 /// for received data.
692 ///
693 /// The initial window of a connection is used as part of flow control. For more details,
694 /// see [`FlowControl`].
695 ///
696 /// The default value is 65,535.
697 ///
698 /// [`FlowControl`]: ../struct.FlowControl.html
699 ///
700 /// # Examples
701 ///
702 /// ```
703 /// # use monoio::io::{AsyncReadRent, AsyncWriteRent};
704 /// # use h2::client::*;
705 /// # use bytes::Bytes;
706 /// #
707 /// # async fn doc<T: AsyncReadRent + AsyncWriteRent + Unpin>(my_io: T)
708 /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
709 /// # {
710 /// // `client_fut` is a future representing the completion of the HTTP/2
711 /// // handshake.
712 /// let client_fut = Builder::new()
713 /// .initial_connection_window_size(1_000_000)
714 /// .handshake(my_io);
715 /// # client_fut.await
716 /// # }
717 /// #
718 /// # pub fn main() {}
719 /// ```
720 pub fn initial_connection_window_size(&mut self, size: u32) -> &mut Self {
721 self.initial_target_connection_window_size = Some(size);
722 self
723 }
724
725 /// Indicates the size (in octets) of the largest HTTP/2 frame payload that the
726 /// configured client is able to accept.
727 ///
728 /// The sender may send data frames that are **smaller** than this value,
729 /// but any data larger than `max` will be broken up into multiple `DATA`
730 /// frames.
731 ///
732 /// The value **must** be between 16,384 and 16,777,215. The default value is 16,384.
733 ///
734 /// # Examples
735 ///
736 /// ```
737 /// # use monoio::io::{AsyncReadRent, AsyncWriteRent};
738 /// # use h2::client::*;
739 /// # use bytes::Bytes;
740 /// #
741 /// # async fn doc<T: AsyncReadRent + AsyncWriteRent + Unpin>(my_io: T)
742 /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
743 /// # {
744 /// // `client_fut` is a future representing the completion of the HTTP/2
745 /// // handshake.
746 /// let client_fut = Builder::new().max_frame_size(1_000_000).handshake(my_io);
747 /// # client_fut.await
748 /// # }
749 /// #
750 /// # pub fn main() {}
751 /// ```
752 ///
753 /// # Panics
754 ///
755 /// This function panics if `max` is not within the legal range specified
756 /// above.
757 pub fn max_frame_size(&mut self, max: u32) -> &mut Self {
758 self.settings.set_max_frame_size(Some(max));
759 self
760 }
761
762 /// Sets the max size of received header frames.
763 ///
764 /// This advisory setting informs a peer of the maximum size of header list
765 /// that the sender is prepared to accept, in octets. The value is based on
766 /// the uncompressed size of header fields, including the length of the name
767 /// and value in octets plus an overhead of 32 octets for each header field.
768 ///
769 /// This setting is also used to limit the maximum amount of data that is
770 /// buffered to decode HEADERS frames.
771 ///
772 /// # Examples
773 ///
774 /// ```
775 /// # use monoio::io::{AsyncReadRent, AsyncWriteRent};
776 /// # use h2::client::*;
777 /// # use bytes::Bytes;
778 /// #
779 /// # async fn doc<T: AsyncReadRent + AsyncWriteRent + Unpin>(my_io: T)
780 /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
781 /// # {
782 /// // `client_fut` is a future representing the completion of the HTTP/2
783 /// // handshake.
784 /// let client_fut = Builder::new()
785 /// .max_header_list_size(16 * 1024)
786 /// .handshake(my_io);
787 /// # client_fut.await
788 /// # }
789 /// #
790 /// # pub fn main() {}
791 /// ```
792 pub fn max_header_list_size(&mut self, max: u32) -> &mut Self {
793 self.settings.set_max_header_list_size(Some(max));
794 self
795 }
796
797 /// Sets the maximum number of concurrent streams.
798 ///
799 /// The maximum concurrent streams setting only controls the maximum number
800 /// of streams that can be initiated by the remote peer. In other words,
801 /// when this setting is set to 100, this does not limit the number of
802 /// concurrent streams that can be created by the caller.
803 ///
804 /// It is recommended that this value be no smaller than 100, so as to not
805 /// unnecessarily limit parallelism. However, any value is legal, including
806 /// 0. If `max` is set to 0, then the remote will not be permitted to
807 /// initiate streams.
808 ///
809 /// Note that streams in the reserved state, i.e., push promises that have
810 /// been reserved but the stream has not started, do not count against this
811 /// setting.
812 ///
813 /// Also note that if the remote *does* exceed the value set here, it is not
814 /// a protocol level error. Instead, the `h2` library will immediately reset
815 /// the stream.
816 ///
817 /// See [Section 5.1.2] in the HTTP/2 spec for more details.
818 ///
819 /// [Section 5.1.2]: https://http2.github.io/http2-spec/#rfc.section.5.1.2
820 ///
821 /// # Examples
822 ///
823 /// ```
824 /// # use monoio::io::{AsyncReadRent, AsyncWriteRent};
825 /// # use h2::client::*;
826 /// # use bytes::Bytes;
827 /// #
828 /// # async fn doc<T: AsyncReadRent + AsyncWriteRent + Unpin>(my_io: T)
829 /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
830 /// # {
831 /// // `client_fut` is a future representing the completion of the HTTP/2
832 /// // handshake.
833 /// let client_fut = Builder::new().max_concurrent_streams(1000).handshake(my_io);
834 /// # client_fut.await
835 /// # }
836 /// #
837 /// # pub fn main() {}
838 /// ```
839 pub fn max_concurrent_streams(&mut self, max: u32) -> &mut Self {
840 self.settings.set_max_concurrent_streams(Some(max));
841 self
842 }
843
    /// Sets the initial maximum number of locally initiated (send) streams.
845 ///
846 /// The initial settings will be overwritten by the remote peer when
847 /// the Settings frame is received. The new value will be set to the
848 /// `max_concurrent_streams()` from the frame.
849 ///
850 /// This setting prevents the caller from exceeding this number of
851 /// streams that are counted towards the concurrency limit.
852 ///
853 /// Sending streams past the limit returned by the peer will be treated
854 /// as a stream error of type PROTOCOL_ERROR or REFUSED_STREAM.
855 ///
856 /// See [Section 5.1.2] in the HTTP/2 spec for more details.
857 ///
858 /// [Section 5.1.2]: https://http2.github.io/http2-spec/#rfc.section.5.1.2
859 ///
860 /// # Examples
861 ///
862 /// ```
863 /// # use monoio::io::{AsyncReadRent, AsyncWriteRent};
864 /// # use h2::client::*;
865 /// # use bytes::Bytes;
866 /// #
867 /// # async fn doc<T: AsyncReadRent + AsyncWriteRent + Unpin>(my_io: T)
868 /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
869 /// # {
870 /// // `client_fut` is a future representing the completion of the HTTP/2
871 /// // handshake.
872 /// let client_fut = Builder::new()
873 /// .initial_max_send_streams(1000)
874 /// .handshake(my_io);
875 /// # client_fut.await
876 /// # }
877 /// #
878 /// # pub fn main() {}
879 /// ```
880 pub fn initial_max_send_streams(&mut self, initial: usize) -> &mut Self {
881 self.initial_max_send_streams = initial;
882 self
883 }
884
885 /// Sets the maximum number of concurrent locally reset streams.
886 ///
887 /// When a stream is explicitly reset, the HTTP/2 specification requires
888 /// that any further frames received for that stream must be ignored for
889 /// "some time".
890 ///
891 /// In order to satisfy the specification, internal state must be maintained
892 /// to implement the behavior. This state grows linearly with the number of
893 /// streams that are locally reset.
894 ///
    /// The `max_concurrent_reset_streams` setting configures an upper
896 /// bound on the amount of state that is maintained. When this max value is
897 /// reached, the oldest reset stream is purged from memory.
898 ///
899 /// Once the stream has been fully purged from memory, any additional frames
900 /// received for that stream will result in a connection level protocol
901 /// error, forcing the connection to terminate.
902 ///
903 /// The default value is 10.
904 ///
905 /// # Examples
906 ///
907 /// ```
908 /// # use monoio::io::{AsyncReadRent, AsyncWriteRent};
909 /// # use h2::client::*;
910 /// # use bytes::Bytes;
911 /// #
912 /// # async fn doc<T: AsyncReadRent + AsyncWriteRent + Unpin>(my_io: T)
913 /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
914 /// # {
915 /// // `client_fut` is a future representing the completion of the HTTP/2
916 /// // handshake.
917 /// let client_fut = Builder::new()
918 /// .max_concurrent_reset_streams(1000)
919 /// .handshake(my_io);
920 /// # client_fut.await
921 /// # }
922 /// #
923 /// # pub fn main() {}
924 /// ```
925 pub fn max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self {
926 self.reset_stream_max = max;
927 self
928 }
929
930 /// Sets the duration to remember locally reset streams.
931 ///
932 /// When a stream is explicitly reset, the HTTP/2 specification requires
933 /// that any further frames received for that stream must be ignored for
934 /// "some time".
935 ///
936 /// In order to satisfy the specification, internal state must be maintained
937 /// to implement the behavior. This state grows linearly with the number of
938 /// streams that are locally reset.
939 ///
940 /// The `reset_stream_duration` setting configures the max amount of time
941 /// this state will be maintained in memory. Once the duration elapses, the
942 /// stream state is purged from memory.
943 ///
944 /// Once the stream has been fully purged from memory, any additional frames
945 /// received for that stream will result in a connection level protocol
946 /// error, forcing the connection to terminate.
947 ///
948 /// The default value is 30 seconds.
949 ///
950 /// # Examples
951 ///
952 /// ```
953 /// # use monoio::io::{AsyncReadRent, AsyncWriteRent};
954 /// # use h2::client::*;
955 /// # use std::time::Duration;
956 /// # use bytes::Bytes;
957 /// #
958 /// # async fn doc<T: AsyncReadRent + AsyncWriteRent + Unpin>(my_io: T)
959 /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
960 /// # {
961 /// // `client_fut` is a future representing the completion of the HTTP/2
962 /// // handshake.
963 /// let client_fut = Builder::new()
964 /// .reset_stream_duration(Duration::from_secs(10))
965 /// .handshake(my_io);
966 /// # client_fut.await
967 /// # }
968 /// #
969 /// # pub fn main() {}
970 /// ```
971 pub fn reset_stream_duration(&mut self, dur: Duration) -> &mut Self {
972 self.reset_stream_duration = dur;
973 self
974 }
975
976 /// Sets the maximum send buffer size per stream.
977 ///
978 /// Once a stream has buffered up to (or over) the maximum, the stream's
979 /// flow control will not "poll" additional capacity. Once bytes for the
980 /// stream have been written to the connection, the send buffer capacity
981 /// will be freed up again.
982 ///
    /// The default is currently ~400KB, but may change.
984 ///
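    /// # Examples
    ///
    /// A minimal sketch in the style of the other builder examples here;
    /// `my_io` is assumed to be a handshake-ready I/O handle:
    ///
    /// ```
    /// # use monoio::io::{AsyncReadRent, AsyncWriteRent};
    /// # use h2::client::*;
    /// # use bytes::Bytes;
    /// #
    /// # async fn doc<T: AsyncReadRent + AsyncWriteRent + Unpin>(my_io: T)
    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
    /// # {
    /// // `client_fut` is a future representing the completion of the HTTP/2
    /// // handshake.
    /// let client_fut = Builder::new()
    ///     .max_send_buffer_size(1024 * 1024)
    ///     .handshake(my_io);
    /// # client_fut.await
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```
    ///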
985 /// # Panics
986 ///
987 /// This function panics if `max` is larger than `u32::MAX`.
988 pub fn max_send_buffer_size(&mut self, max: usize) -> &mut Self {
989 assert!(max <= u32::MAX as usize);
990 self.max_send_buffer_size = max;
991 self
992 }
993
994 /// Enables or disables server push promises.
995 ///
    /// This value is included in the initial SETTINGS handshake.
    /// Setting this value to `false` in the initial SETTINGS handshake
    /// guarantees that the remote server will never send a push promise.
1000 ///
1001 /// This setting can be changed during the life of a single HTTP/2
1002 /// connection by sending another settings frame updating the value.
1003 ///
1004 /// Default value: `true`.
1005 ///
1006 /// # Examples
1007 ///
1008 /// ```
1009 /// # use monoio::io::{AsyncReadRent, AsyncWriteRent};
1010 /// # use h2::client::*;
1011 /// # use std::time::Duration;
1012 /// # use bytes::Bytes;
1013 /// #
1014 /// # async fn doc<T: AsyncReadRent + AsyncWriteRent + Unpin>(my_io: T)
1015 /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
1016 /// # {
1017 /// // `client_fut` is a future representing the completion of the HTTP/2
1018 /// // handshake.
1019 /// let client_fut = Builder::new().enable_push(false).handshake(my_io);
1020 /// # client_fut.await
1021 /// # }
1022 /// #
1023 /// # pub fn main() {}
1024 /// ```
1025 pub fn enable_push(&mut self, enabled: bool) -> &mut Self {
1026 self.settings.set_enable_push(enabled);
1027 self
1028 }
1029
1030 /// Sets the first stream ID to something other than 1.
1031 #[cfg(feature = "unstable")]
1032 pub fn initial_stream_id(&mut self, stream_id: u32) -> &mut Self {
1033 self.stream_id = stream_id.into();
1034 assert!(
1035 self.stream_id.is_client_initiated(),
1036 "stream id must be odd"
1037 );
1038 self
1039 }
1040
1041 /// Creates a new configured HTTP/2 client backed by `io`.
1042 ///
1043 /// It is expected that `io` already be in an appropriate state to commence
    /// the [HTTP/2 handshake]. The handshake is completed once both the connection
    /// preface and the initial SETTINGS frame have been sent by the client.
1046 ///
1047 /// The handshake future does not wait for the initial settings frame from the
1048 /// server.
1049 ///
1050 /// Returns a future which resolves to the [`Connection`] / [`SendRequest`]
1051 /// tuple once the HTTP/2 handshake has been completed.
1052 ///
1053 /// This function also allows the caller to configure the send payload data
1054 /// type. See [Outbound data type] for more details.
1055 ///
1056 /// [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
1057 /// [`Connection`]: struct.Connection.html
1058 /// [`SendRequest`]: struct.SendRequest.html
    /// [Outbound data type]: ../index.html#outbound-data-type
1060 ///
1061 /// # Examples
1062 ///
1063 /// Basic usage:
1064 ///
1065 /// ```
1066 /// # use monoio::io::{AsyncReadRent, AsyncWriteRent};
1067 /// # use h2::client::*;
1068 /// # use bytes::Bytes;
1069 /// #
1070 /// # async fn doc<T: AsyncReadRent + AsyncWriteRent + Unpin>(my_io: T)
    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
1072 /// # {
1073 /// // `client_fut` is a future representing the completion of the HTTP/2
1074 /// // handshake.
1075 /// let client_fut = Builder::new()
1076 /// .handshake(my_io);
1077 /// # client_fut.await
1078 /// # }
1079 /// #
1080 /// # pub fn main() {}
1081 /// ```
1082 ///
1083 /// Configures the send-payload data type. In this case, the outbound data
1084 /// type will be `&'static [u8]`.
1085 ///
1086 /// ```
1087 /// # use monoio::io::{AsyncReadRent, AsyncWriteRent};
1088 /// # use h2::client::*;
1089 /// #
1090 /// # async fn doc<T: AsyncReadRent + AsyncWriteRent + Unpin>(my_io: T)
1091 /// # -> Result<((SendRequest<&'static [u8]>, Connection<T, &'static [u8]>)), h2::Error>
1092 /// # {
1093 /// // `client_fut` is a future representing the completion of the HTTP/2
1094 /// // handshake.
1095 /// let client_fut = Builder::new().handshake::<_, &'static [u8]>(my_io);
1096 /// # client_fut.await
1097 /// # }
1098 /// #
1099 /// # pub fn main() {}
1100 /// ```
1101 pub fn handshake<T, B>(
1102 &self,
1103 io: T,
1104 ) -> impl Future<Output = Result<(SendRequest<B>, Connection<T, B>), crate::h2::Error>>
1105 where
1106 T: AsyncReadRent + AsyncWriteRent + Unpin + 'static,
1107 B: Buf + 'static,
1108 {
1109 Connection::handshake2(io, self.clone())
1110 }
1111}
1112
1113impl Default for Builder {
1114 fn default() -> Builder {
1115 Builder::new()
1116 }
1117}
1118
1119/// Creates a new configured HTTP/2 client with default configuration
1120/// values backed by `io`.
1121///
1122/// It is expected that `io` already be in an appropriate state to commence
1123/// the [HTTP/2 handshake]. See [Handshake] for more details.
1124///
1125/// Returns a future which resolves to the [`Connection`] / [`SendRequest`]
1126/// tuple once the HTTP/2 handshake has been completed. The returned
1127/// [`Connection`] instance will be using default configuration values. Use
1128/// [`Builder`] to customize the configuration values used by a [`Connection`]
1129/// instance.
1130///
1131/// [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
1132/// [Handshake]: ../index.html#handshake
1133/// [`Connection`]: struct.Connection.html
1134/// [`SendRequest`]: struct.SendRequest.html
1135///
1136/// # Examples
1137///
1138/// ```
1139/// # use monoio::io::{AsyncReadRent, AsyncWriteRent};
1140/// # use h2::client;
1141/// # use h2::client::*;
1142/// #
1143/// # async fn doc<T: AsyncReadRent + AsyncWriteRent + Unpin>(my_io: T) -> Result<(), h2::Error>
1144/// # {
1145/// let (send_request, connection) = client::handshake(my_io).await?;
1146/// // The HTTP/2 handshake has completed, now start polling
1147/// // `connection` and use `send_request` to send requests to the
1148/// // server.
1149/// # Ok(())
1150/// # }
1151/// #
1152/// # pub fn main() {}
1153/// ```
1154pub async fn handshake<T>(
1155 io: T,
1156) -> Result<(SendRequest<Bytes>, Connection<T, Bytes>), crate::h2::Error>
1157where
1158 T: AsyncReadRent + AsyncWriteRent + Unpin + 'static,
1159{
1160 let builder = Builder::new();
1161 builder
1162 .handshake(io)
1163 .instrument(tracing::trace_span!("client_handshake"))
1164 .await
1165}
1166
1167// ===== impl Connection =====
1168
1169async fn bind_connection<T>(io: &mut T) -> Result<(), crate::h2::Error>
1170where
1171 T: AsyncReadRent + AsyncWriteRent + Unpin,
1172{
1173 tracing::debug!("binding client connection");
1174
1175 let msg: &'static [u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
1176 let (res, _) = io.write_all(msg).await;
1177
1178 res.map_err(crate::h2::Error::from_io)?;
1179
1180 tracing::debug!("client connection bound");
1181
1182 Ok(())
1183}
1184
1185impl<T, B> Connection<T, B>
1186where
1187 T: AsyncReadRent + AsyncWriteRent + Unpin + 'static,
1188 B: Buf + 'static,
1189{
1190 async fn handshake2(
1191 mut io: T,
1192 builder: Builder,
1193 ) -> Result<(SendRequest<B>, Connection<T, B>), crate::h2::Error> {
1194 bind_connection(&mut io).await?;
1195
1196 // Create the codec
1197 let mut codec = Codec::new(io);
1198
1199 if let Some(max) = builder.settings.max_frame_size() {
1200 codec.set_max_recv_frame_size(max as usize);
1201 }
1202
1203 if let Some(max) = builder.settings.max_header_list_size() {
1204 codec.set_max_recv_header_list_size(max as usize);
1205 }
1206
1207 // Send initial settings frame
1208 codec
1209 .buffer(builder.settings.clone().into())
1210 .expect("invalid SETTINGS frame");
1211
1212 let inner = proto::Connection::new(
1213 codec,
1214 proto::Config {
1215 next_stream_id: builder.stream_id,
1216 initial_max_send_streams: builder.initial_max_send_streams,
1217 max_send_buffer_size: builder.max_send_buffer_size,
1218 reset_stream_duration: builder.reset_stream_duration,
1219 reset_stream_max: builder.reset_stream_max,
1220 settings: builder.settings.clone(),
1221 },
1222 );
1223 let send_request = SendRequest {
1224 inner: inner.streams().clone(),
1225 pending: None,
1226 };
1227
1228 let mut connection = Connection { inner };
1229 if let Some(sz) = builder.initial_target_connection_window_size {
1230 connection.set_target_window_size(sz);
1231 }
1232
1233 Ok((send_request, connection))
1234 }
1235
1236 /// Sets the target window size for the whole connection.
1237 ///
1238 /// If `size` is greater than the current value, then a `WINDOW_UPDATE`
1239 /// frame will be immediately sent to the remote, increasing the connection
1240 /// level window by `size - current_value`.
1241 ///
1242 /// If `size` is less than the current value, nothing will happen
1243 /// immediately. However, as window capacity is released by
1244 /// [`FlowControl`] instances, no `WINDOW_UPDATE` frames will be sent
1245 /// out until the number of "in flight" bytes drops below `size`.
1246 ///
1247 /// The default value is 65,535.
1248 ///
1249 /// See [`FlowControl`] documentation for more details.
1250 ///
1251 /// [`FlowControl`]: ../struct.FlowControl.html
1252 /// [library level]: ../index.html#flow-control
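    ///
    /// # Examples
    ///
    /// A minimal sketch; `connection` is assumed to be an established client
    /// `Connection`:
    ///
    /// ```
    /// # use monoio::io::{AsyncReadRent, AsyncWriteRent};
    /// # use h2::client::*;
    /// # use bytes::Bytes;
    /// #
    /// # fn doc<T>(mut connection: Connection<T, Bytes>)
    /// # where
    /// #     T: AsyncReadRent + AsyncWriteRent + Unpin + 'static,
    /// # {
    /// // Grow the connection-level receive window to 1 MiB.
    /// connection.set_target_window_size(1024 * 1024);
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```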
1253 pub fn set_target_window_size(&mut self, size: u32) {
1254 assert!(size <= proto::MAX_WINDOW_SIZE);
1255 self.inner.set_target_window_size(size);
1256 }
1257
1258 /// Set a new `INITIAL_WINDOW_SIZE` setting (in octets) for stream-level
1259 /// flow control for received data.
1260 ///
1261 /// The `SETTINGS` will be sent to the remote, and only applied once the
1262 /// remote acknowledges the change.
1263 ///
1264 /// This can be used to increase or decrease the window size for existing
1265 /// streams.
1266 ///
1267 /// # Errors
1268 ///
1269 /// Returns an error if a previous call is still pending acknowledgement
1270 /// from the remote endpoint.
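    ///
    /// # Examples
    ///
    /// A minimal sketch; `connection` is assumed to be an established client
    /// `Connection`:
    ///
    /// ```
    /// # use monoio::io::{AsyncReadRent, AsyncWriteRent};
    /// # use h2::client::*;
    /// # use bytes::Bytes;
    /// #
    /// # fn doc<T>(mut connection: Connection<T, Bytes>) -> Result<(), h2::Error>
    /// # where
    /// #     T: AsyncReadRent + AsyncWriteRent + Unpin + 'static,
    /// # {
    /// // Ask the peer to use a 1 MiB window for new and existing streams.
    /// connection.set_initial_window_size(1024 * 1024)?;
    /// # Ok(())
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```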
1271 pub fn set_initial_window_size(&mut self, size: u32) -> Result<(), crate::h2::Error> {
1272 assert!(size <= proto::MAX_WINDOW_SIZE);
1273 self.inner.set_initial_window_size(size)?;
1274 Ok(())
1275 }
1276
1277 /// Takes a `PingPong` instance from the connection.
1278 ///
1279 /// # Note
1280 ///
1281 /// This may only be called once. Calling multiple times will return `None`.
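    ///
    /// # Examples
    ///
    /// A minimal sketch; `connection` is assumed to be an established client
    /// `Connection`:
    ///
    /// ```
    /// # use monoio::io::{AsyncReadRent, AsyncWriteRent};
    /// # use h2::client::*;
    /// # use bytes::Bytes;
    /// #
    /// # fn doc<T>(mut connection: Connection<T, Bytes>)
    /// # where
    /// #     T: AsyncReadRent + AsyncWriteRent + Unpin + 'static,
    /// # {
    /// // The first call takes the `PingPong` handle...
    /// let ping_pong = connection.ping_pong();
    /// assert!(ping_pong.is_some());
    /// // ...and any later call returns `None`.
    /// assert!(connection.ping_pong().is_none());
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```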
1282 pub fn ping_pong(&mut self) -> Option<PingPong> {
1283 self.inner.take_user_pings().map(PingPong::new)
1284 }
1285
1286 /// Returns the maximum number of concurrent streams that may be initiated
1287 /// by this client.
1288 ///
1289 /// This limit is configured by the server peer by sending the
1290 /// [`SETTINGS_MAX_CONCURRENT_STREAMS` parameter][1] in a `SETTINGS` frame.
1291 /// This method returns the currently acknowledged value received from the
1292 /// remote.
1293 ///
1294 /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2
1295 pub fn max_concurrent_send_streams(&self) -> usize {
1296 self.inner.max_send_streams()
1297 }
1298 /// Returns the maximum number of concurrent streams that may be initiated
1299 /// by the server on this connection.
1300 ///
1301 /// This returns the value of the [`SETTINGS_MAX_CONCURRENT_STREAMS`
1302 /// parameter][1] sent in a `SETTINGS` frame that has been
1303 /// acknowledged by the remote peer. The value to be sent is configured by
1304 /// the [`Builder::max_concurrent_streams`][2] method before handshaking
1305 /// with the remote peer.
1306 ///
1307 /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2
1308 /// [2]: ../struct.Builder.html#method.max_concurrent_streams
1309 pub fn max_concurrent_recv_streams(&self) -> usize {
1310 self.inner.max_recv_streams()
1311 }
1312}
1313
1314impl<T, B> Future for Connection<T, B>
1315where
1316 T: AsyncReadRent + AsyncWriteRent + Unpin + 'static,
1317 B: Buf + 'static,
1318{
1319 type Output = Result<(), crate::h2::Error>;
1320
1321 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1322 self.inner.maybe_close_connection_if_no_streams();
1323 self.inner.poll(cx).map_err(Into::into)
1324 }
1325}
1326
1327impl<T, B> fmt::Debug for Connection<T, B>
1328where
1329 T: AsyncReadRent + AsyncWriteRent,
1330 T: fmt::Debug,
1331 B: fmt::Debug + Buf,
1332{
1333 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
1334 fmt::Debug::fmt(&self.inner, fmt)
1335 }
1336}
1337
1338// ===== impl ResponseFuture =====
1339
1340impl Future for ResponseFuture {
1341 type Output = Result<Response<RecvStream>, crate::h2::Error>;
1342
1343 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1344 let (parts, _) = ready!(self.inner.poll_response(cx))?.into_parts();
1345 let body = RecvStream::new(FlowControl::new(self.inner.clone()));
1346
1347 Poll::Ready(Ok(Response::from_parts(parts, body)))
1348 }
1349}
1350
1351impl ResponseFuture {
1352 /// Returns the stream ID of the response stream.
1353 ///
1354 /// # Panics
1355 ///
1356 /// If the lock on the stream store has been poisoned.
1357 pub fn stream_id(&self) -> crate::h2::StreamId {
1358 crate::h2::StreamId::from_internal(self.inner.stream_id())
1359 }
1360 /// Returns a stream of PushPromises
1361 ///
1362 /// # Panics
1363 ///
    /// If this method has been called before,
    /// or if the stream itself was pushed.
1366 pub fn push_promises(&mut self) -> PushPromises {
1367 if self.push_promise_consumed {
1368 panic!("Reference to push promises stream taken!");
1369 }
1370 self.push_promise_consumed = true;
1371 PushPromises {
1372 inner: self.inner.clone(),
1373 }
1374 }
1375}
1376
1377// ===== impl PushPromises =====
1378
1379impl PushPromises {
1380 /// Get the next `PushPromise`.
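    ///
    /// # Examples
    ///
    /// A minimal sketch; `push_promises` is assumed to be the stream obtained
    /// from `ResponseFuture::push_promises`:
    ///
    /// ```
    /// # use h2::client::PushPromises;
    /// #
    /// # async fn doc(mut push_promises: PushPromises) -> Result<(), h2::Error> {
    /// while let Some(push) = push_promises.push_promise().await {
    ///     let (request, response) = push?.into_parts();
    ///     println!("Promised request: {:?}", request);
    ///     // `response` can now be awaited to receive the pushed response.
    ///     drop(response);
    /// }
    /// # Ok(())
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```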
1381 pub async fn push_promise(&mut self) -> Option<Result<PushPromise, crate::h2::Error>> {
1382 std::future::poll_fn(move |cx| self.poll_push_promise(cx)).await
1383 }
1384
1385 #[doc(hidden)]
1386 pub fn poll_push_promise(
1387 &mut self,
1388 cx: &Context<'_>,
1389 ) -> Poll<Option<Result<PushPromise, crate::h2::Error>>> {
1390 match self.inner.poll_pushed(cx) {
1391 Poll::Ready(Some(Ok((request, response)))) => {
1392 let response = PushedResponseFuture {
1393 inner: ResponseFuture {
1394 inner: response,
1395 push_promise_consumed: false,
1396 },
1397 };
1398 Poll::Ready(Some(Ok(PushPromise { request, response })))
1399 }
1400 Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e.into()))),
1401 Poll::Ready(None) => Poll::Ready(None),
1402 Poll::Pending => Poll::Pending,
1403 }
1404 }
1405}
1406
1407#[cfg(feature = "stream")]
1408impl futures_core::Stream for PushPromises {
1409 type Item = Result<PushPromise, crate::h2::Error>;
1410
1411 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1412 self.poll_push_promise(cx)
1413 }
1414}
1415
1416// ===== impl PushPromise =====
1417
1418impl PushPromise {
1419 /// Returns a reference to the push promise's request headers.
1420 pub fn request(&self) -> &Request<()> {
1421 &self.request
1422 }
1423
1424 /// Returns a mutable reference to the push promise's request headers.
1425 pub fn request_mut(&mut self) -> &mut Request<()> {
1426 &mut self.request
1427 }
1428
1429 /// Consumes `self`, returning the push promise's request headers and
1430 /// response future.
1431 pub fn into_parts(self) -> (Request<()>, PushedResponseFuture) {
1432 (self.request, self.response)
1433 }
1434}
1435
1436// ===== impl PushedResponseFuture =====
1437
1438impl Future for PushedResponseFuture {
1439 type Output = Result<Response<RecvStream>, crate::h2::Error>;
1440
1441 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1442 Pin::new(&mut self.inner).poll(cx)
1443 }
1444}
1445
1446impl PushedResponseFuture {
1447 /// Returns the stream ID of the response stream.
1448 ///
1449 /// # Panics
1450 ///
1451 /// If the lock on the stream store has been poisoned.
1452 pub fn stream_id(&self) -> crate::h2::StreamId {
1453 self.inner.stream_id()
1454 }
1455}
1456
1457// ===== impl Peer =====
1458
1459impl Peer {
1460 pub fn convert_send_message(
1461 id: StreamId,
1462 request: Request<()>,
1463 protocol: Option<Protocol>,
1464 end_of_stream: bool,
1465 ) -> Result<Headers, SendError> {
1466 use http::request::Parts;
1467
1468 let (
1469 Parts {
1470 method,
1471 uri,
1472 headers,
1473 version,
1474 ..
1475 },
1476 _,
1477 ) = request.into_parts();
1478
1479 let is_connect = method == Method::CONNECT;
1480
        // Build the pseudo header set. All requests will include `method`
1482 // and `path`.
1483 let mut pseudo = Pseudo::request(method, uri, protocol);
1484
1485 if pseudo.scheme.is_none() {
            // If the scheme is not set, then there are two options.
1487 //
1488 // 1) Authority is not set. In this case, a request was issued with a relative URI. This
1489 // is permitted **only** when forwarding HTTP 1.x requests. If the HTTP version is
1490 // set to 2.0, then this is an error.
1491 //
            // 2) Authority is set, in which case the HTTP method *must* be CONNECT.
1493 //
1494 // It is not possible to have a scheme but not an authority set (the
1495 // `http` crate does not allow it).
1496 //
1497 if pseudo.authority.is_none() {
1498 if version == Version::HTTP_2 {
1499 return Err(UserError::MissingUriSchemeAndAuthority.into());
1500 } else {
1501 // This is acceptable as per the above comment. However,
1502 // HTTP/2 requires that a scheme is set. Since we are
1503 // forwarding an HTTP 1.1 request, the scheme is set to
1504 // "http".
1505 pseudo.set_scheme(uri::Scheme::HTTP);
1506 }
1507 } else if !is_connect {
1508 // TODO: Error
1509 }
1510 }
1511
1512 // Create the HEADERS frame
1513 let mut frame = Headers::new(id, pseudo, headers);
1514
1515 if end_of_stream {
1516 frame.set_end_stream()
1517 }
1518
1519 Ok(frame)
1520 }
1521}
1522
1523impl proto::Peer for Peer {
1524 type Poll = Response<()>;
1525
1526 const NAME: &'static str = "Client";
1527
1528 fn r#dyn() -> proto::DynPeer {
1529 proto::DynPeer::Client
1530 }
1531
1532 fn is_server() -> bool {
1533 false
1534 }
1535
1536 fn convert_poll_message(
1537 pseudo: Pseudo,
1538 fields: HeaderMap,
1539 stream_id: StreamId,
1540 ) -> Result<Self::Poll, Error> {
1541 let mut b = Response::builder();
1542
1543 b = b.version(Version::HTTP_2);
1544
1545 if let Some(status) = pseudo.status {
1546 b = b.status(status);
1547 }
1548
1549 let mut response = match b.body(()) {
1550 Ok(response) => response,
1551 Err(_) => {
1552 // TODO: Should there be more specialized handling for different
1553 // kinds of errors
1554 return Err(Error::library_reset(stream_id, Reason::PROTOCOL_ERROR));
1555 }
1556 };
1557
1558 *response.headers_mut() = fields;
1559
1560 Ok(response)
1561 }
1562}