monoio_http/h2/
server.rs

1//! Server implementation of the HTTP/2 protocol.
2//!
3//! # Getting started
4//!
5//! Running an HTTP/2 server requires the caller to manage accepting the
6//! connections as well as getting the connections to a state that is ready to
7//! begin the HTTP/2 handshake. See [here](../index.html#handshake) for more
8//! details.
9//!
10//! This could be as basic as using Monoio's [`TcpListener`] to accept
11//! connections, but usually it means using either ALPN or HTTP/1.1 protocol
12//! upgrades.
13//!
14//! Once a connection is obtained, it is passed to [`handshake`],
15//! which will begin the [HTTP/2 handshake]. This returns a future that
16//! completes once the handshake process is performed and HTTP/2 streams may
17//! be received.
18//!
19//! [`handshake`] uses default configuration values. There are a number of
20//! settings that can be changed by using [`Builder`] instead.
21//!
22//! # Inbound streams
23//!
24//! The [`Connection`] instance is used to accept inbound HTTP/2 streams. It
25//! does this by implementing [`futures::Stream`]. When a new stream is
26//! received, a call to [`Connection::accept`] will return `(request, response)`.
27//! The `request` handle (of type [`http::Request<RecvStream>`]) contains the
28//! HTTP request head as well as provides a way to receive the inbound data
29//! stream and the trailers. The `response` handle (of type [`SendResponse`])
30//! allows responding to the request, stream the response payload, send
31//! trailers, and send push promises.
32//!
33//! The send ([`SendStream`]) and receive ([`RecvStream`]) halves of the stream
34//! can be operated independently.
35//!
36//! # Managing the connection
37//!
38//! The [`Connection`] instance is used to manage connection state. The caller
39//! is required to call either [`Connection::accept`] or
40//! [`Connection::poll_close`] in order to advance the connection state. Simply
41//! operating on [`SendStream`] or [`RecvStream`] will have no effect unless the
42//! connection state is advanced.
43//!
44//! It is not required to call **both** [`Connection::accept`] and
45//! [`Connection::poll_close`]. If the caller is ready to accept a new stream,
46//! then only [`Connection::accept`] should be called. When the caller **does
47//! not** want to accept a new stream, [`Connection::poll_close`] should be
48//! called.
49//!
50//! The [`Connection`] instance should only be dropped once
51//! [`Connection::poll_close`] returns `Ready`. Once [`Connection::accept`]
52//! returns `Ready(None)`, there will no longer be any more inbound streams. At
53//! this point, only [`Connection::poll_close`] should be called.
54//!
55//! # Shutting down the server
56//!
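//! A graceful shutdown of a single connection is started with
//! [`Connection::graceful_shutdown`]; the connection must then continue to be
//! polled until it closes. For background, see
//! [hyperium/h2#69](https://github.com/hyperium/h2/issues/69).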
59//!
60//! # Example
61//!
62//! A basic HTTP/2 server example that runs over TCP and assumes [prior
63//! knowledge], i.e. both the client and the server assume that the TCP socket
64//! will use the HTTP/2 protocol without prior negotiation.
65//!
66//! ```no_run
67//! use h2::server;
68//! use http::{Response, StatusCode};
69//! use monoio::net::TcpListener;
70//!
71//! #[monoio::main]
72//! pub async fn main() {
73//!     let mut listener = TcpListener::bind("127.0.0.1:5928").await.unwrap();
74//!
75//!     // Accept all incoming TCP connections.
76//!     loop {
77//!         if let Ok((socket, _peer_addr)) = listener.accept().await {
78//!             // Spawn a new task to process each connection.
79//!             monoio::spawn(async {
80//!                 // Start the HTTP/2 connection handshake
81//!                 let mut h2 = server::handshake(socket).await.unwrap();
82//!                 // Accept all inbound HTTP/2 streams sent over the
83//!                 // connection.
84//!                 while let Some(request) = h2.accept().await {
85//!                     let (request, mut respond) = request.unwrap();
86//!                     println!("Received request: {:?}", request);
87//!
88//!                     // Build a response with no body
89//!                     let response = Response::builder().status(StatusCode::OK).body(()).unwrap();
90//!
91//!                     // Send the response back to the client
92//!                     respond.send_response(response, true).unwrap();
93//!                 }
94//!             });
95//!         }
96//!     }
97//! }
98//! ```
99//!
100//! [prior knowledge]: http://httpwg.org/specs/rfc7540.html#known-http
101//! [`handshake`]: fn.handshake.html
102//! [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
103//! [`Builder`]: struct.Builder.html
104//! [`Connection`]: struct.Connection.html
//! [`Connection::accept`]: struct.Connection.html#method.accept
106//! [`Connection::poll_close`]: struct.Connection.html#method.poll_close
107//! [`futures::Stream`]: https://docs.rs/futures/0.1/futures/stream/trait.Stream.html
108//! [`http::Request<RecvStream>`]: ../struct.RecvStream.html
109//! [`RecvStream`]: ../struct.RecvStream.html
110//! [`SendStream`]: ../struct.SendStream.html
111//! [`TcpListener`]: https://docs.rs/monoio/latest/monoio/net/tcp/struct.TcpListener.html
112
113use std::{
114    cell::UnsafeCell,
115    fmt,
116    future::Future,
117    io,
118    pin::Pin,
119    task::{Context, Poll},
120    time::Duration,
121};
122
123use bytes::{Buf, Bytes, BytesMut};
124use http::{HeaderMap, Method, Request, Response};
125use monoio::{
126    io::{AsyncReadRent, AsyncReadRentExt, AsyncWriteRent},
127    BufResult,
128};
129use monoio_compat::box_future::MaybeArmedBoxFuture;
130// use monoio::io::{AsyncReadRent, AsyncWriteRent, AsyncReadRentExt};
131use tracing::instrument::{Instrument, Instrumented};
132
133use crate::h2::{
134    codec::{Codec, UserError},
135    frame::{self, Pseudo, PushPromiseHeaderError, Reason, Settings, StreamId},
136    proto::{self, Config, Error, Prioritized},
137    FlowControl, PingPong, RecvStream, SendStream,
138};
139
/// In progress HTTP/2 connection handshake future.
///
/// This type implements `Future`, yielding a `Connection` instance once the
/// handshake has completed.
///
/// The handshake is completed once the connection preface is fully received
/// from the client **and** the initial settings frame is sent to the client.
///
/// The handshake future does not wait for the initial settings frame from the
/// client.
///
/// See [module] level docs for more details.
///
/// [module]: index.html
#[must_use = "futures do nothing unless polled"]
pub struct Handshake<T, B: Buf = Bytes> {
    /// The config to pass to Connection::new after handshake succeeds.
    /// This is the `Builder` that was handed to `Connection::handshake2`.
    builder: Builder,
    /// The current state of the handshake. `Connection::handshake2` creates
    /// the handshake in the `Handshaking::Flushing` state.
    state: Handshaking<T, B>,
    /// Span tracking the handshake (the `server_handshake` trace span created
    /// in `Connection::handshake2`).
    span: tracing::Span,
}
163
/// Accepts inbound HTTP/2 streams on a connection.
///
/// A `Connection` is backed by an I/O resource (usually a TCP socket) and
/// implements the HTTP/2 server logic for that connection. It is responsible
/// for receiving inbound streams initiated by the client as well as driving the
/// internal state forward.
///
/// `Connection` values are created by calling [`handshake`]. Once a
/// `Connection` value is obtained, the caller must call [`accept`] or
/// [`poll_closed`] in order to drive the internal connection state forward.
///
/// See [module level] documentation for more details
///
/// [module level]: index.html
/// [`handshake`]: fn.handshake.html
/// [`accept`]: struct.Connection.html#method.accept
/// [`poll_closed`]: struct.Connection.html#method.poll_closed
///
/// # Examples
///
/// ```
/// # use monoio::io::{AsyncReadRent, AsyncWriteRent};
/// # use h2::server;
/// # use h2::server::*;
/// #
/// # async fn doc<T: AsyncReadRent + AsyncWriteRent + Unpin>(my_io: T) {
/// let mut server = server::handshake(my_io).await.unwrap();
/// while let Some(request) = server.accept().await {
///     monoio::spawn(async move {
///         let (request, respond) = request.unwrap();
///         // Process the request and send the response back to the client
///         // using `respond`.
///     });
/// }
/// # }
/// #
/// # pub fn main() {}
/// ```
#[must_use = "streams do nothing unless polled"]
pub struct Connection<T, B: Buf> {
    /// The protocol-level connection, specialised with the server [`Peer`]
    /// marker. Every public method of `Connection` forwards to it.
    connection: proto::Connection<T, Peer, B>,
}
206
207/// Builds server connections with custom configuration values.
208///
209/// Methods can be chained in order to set the configuration values.
210///
211/// The server is constructed by calling [`handshake`] and passing the I/O
212/// handle that will back the HTTP/2 server.
213///
214/// New instances of `Builder` are obtained via [`Builder::new`].
215///
216/// See function level documentation for details on the various server
217/// configuration settings.
218///
219/// [`Builder::new`]: struct.Builder.html#method.new
220/// [`handshake`]: struct.Builder.html#method.handshake
221///
222/// # Examples
223///
224/// ```
225/// # use monoio::io::{AsyncReadRent, AsyncWriteRent};
226/// # use h2::server::*;
227/// #
228/// # fn doc<T: AsyncReadRent + AsyncWriteRent + Unpin>(my_io: T)
229/// # -> Handshake<T>
230/// # {
231/// // `server_fut` is a future representing the completion of the HTTP/2
232/// // handshake.
233/// let server_fut = Builder::new()
234///     .initial_window_size(1_000_000)
235///     .max_concurrent_streams(1000)
236///     .handshake(my_io);
237/// # server_fut
238/// # }
239/// #
240/// # pub fn main() {}
241/// ```
#[derive(Clone, Debug)]
pub struct Builder {
    /// Time to keep locally reset streams around before reaping.
    /// `Builder::new` initialises it from `proto::DEFAULT_RESET_STREAM_SECS`.
    reset_stream_duration: Duration,

    /// Maximum number of locally reset streams to keep at a time.
    /// Defaults to `proto::DEFAULT_RESET_STREAM_MAX`; changed through
    /// `max_concurrent_reset_streams`.
    reset_stream_max: usize,

    /// Initial `Settings` frame to send as part of the handshake.
    /// The window-size, frame-size, header-list-size and concurrent-stream
    /// setters all write into this value.
    settings: Settings,

    /// Initial target window size for new connections.
    /// `None` until `initial_connection_window_size` is called.
    initial_target_connection_window_size: Option<u32>,

    /// Maximum amount of bytes to "buffer" for writing per stream.
    /// Defaults to `proto::DEFAULT_MAX_SEND_BUFFER_SIZE`.
    max_send_buffer_size: usize,
}
259
260/// Send a response back to the client
261///
262/// A `SendResponse` instance is provided when receiving a request and is used
263/// to send the associated response back to the client. It is also used to
264/// explicitly reset the stream with a custom reason.
265///
266/// It will also be used to initiate push promises linked with the associated
267/// stream.
268///
269/// If the `SendResponse` instance is dropped without sending a response, then
270/// the HTTP/2 stream will be reset.
271///
272/// See [module] level docs for more details.
273///
274/// [module]: index.html
#[derive(Debug)]
pub struct SendResponse<B: Buf> {
    /// Reference to the stream this response belongs to. `poll_accept` fills
    /// it with the handle returned by the connection's `next_incoming()`.
    inner: proto::StreamRef<B>,
}
279
280/// Send a response to a promised request
281///
282/// A `SendPushedResponse` instance is provided when promising a request and is used
283/// to send the associated response to the client. It is also used to
284/// explicitly reset the stream with a custom reason.
285///
286/// It can not be used to initiate push promises.
287///
288/// If the `SendPushedResponse` instance is dropped without sending a response, then
289/// the HTTP/2 stream will be reset.
290///
291/// See [module] level docs for more details.
292///
293/// [module]: index.html
pub struct SendPushedResponse<B: Buf> {
    /// The wrapped `SendResponse`; the hand-written `Debug` impl for this
    /// type prints it through its own `Debug` implementation.
    inner: SendResponse<B>,
}
297
298// Manual implementation necessary because of rust-lang/rust#26925
299impl<B: Buf + fmt::Debug> fmt::Debug for SendPushedResponse<B> {
300    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
301        write!(f, "SendPushedResponse {{ {:?} }}", self.inner)
302    }
303}
304
/// Stages of an in-progress handshake.
///
/// A handshake is created in the `Flushing` stage (see
/// `Connection::handshake2`). Both active stages wrap their future in
/// `Instrumented` so that it carries its own tracing span.
enum Handshaking<T, B: Buf> {
    /// State 1. Connection is flushing pending SETTINGS frame.
    Flushing(Instrumented<Flush<T, Prioritized<B>>>),
    /// State 2. Connection is waiting for the client preface.
    ReadingPreface(Instrumented<ReadPreface<T, Prioritized<B>>>),
    /// State 3. Handshake is done, polling again would panic.
    Done,
}
314
/// Flush a Sink
///
/// Used as the first handshake stage: `Connection::handshake2` wraps the codec
/// that holds the buffered initial SETTINGS frame in a `Flush`.
struct Flush<T, B> {
    // NOTE(review): presumably an `Option` so the codec can be moved out once
    // flushing has finished — confirm against the `Future` impl.
    codec: Option<Codec<T, B>>,
}
319
/// Read the client connection preface
struct ReadPreface<T, B> {
    // Future resolving to a `BufResult<usize, BytesMut>`; presumably the
    // pending read of the preface bytes — confirm against the `Future` impl.
    read_fut: MaybeArmedBoxFuture<BufResult<usize, BytesMut>>,
    // Put it at the last to make sure futures depending on it drop first.
    // Struct fields are dropped in declaration order, so `read_fut` above is
    // dropped before `codec`; do not reorder these fields.
    codec: Option<UnsafeCell<Codec<T, B>>>,
}
326
/// Marker type for the server side of a connection. It is the peer type
/// parameter of the `proto::Connection` stored inside [`Connection`].
#[derive(Debug)]
pub(crate) struct Peer;
329
/// The 24-octet client connection preface defined by RFC 7540, section 3.5.
// NOTE(review): presumably what `ReadPreface` compares the received bytes
// against — the use site is outside this part of the file.
const PREFACE: [u8; 24] = *b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
331
332/// Creates a new configured HTTP/2 server with default configuration
333/// values backed by `io`.
334///
335/// It is expected that `io` already be in an appropriate state to commence
336/// the [HTTP/2 handshake]. See [Handshake] for more details.
337///
338/// Returns a future which resolves to the [`Connection`] instance once the
339/// HTTP/2 handshake has been completed. The returned [`Connection`]
340/// instance will be using default configuration values. Use [`Builder`] to
341/// customize the configuration values used by a [`Connection`] instance.
342///
343/// [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
344/// [Handshake]: ../index.html#handshake
345/// [`Connection`]: struct.Connection.html
346///
347/// # Examples
348///
349/// ```
350/// # use monoio::io::{AsyncReadRent, AsyncWriteRent};
351/// # use h2::server;
352/// # use h2::server::*;
353/// #
354/// # async fn doc<T: AsyncReadRent + AsyncWriteRent + Unpin>(my_io: T)
355/// # {
356/// let connection = server::handshake(my_io).await.unwrap();
357/// // The HTTP/2 handshake has completed, now use `connection` to
358/// // accept inbound HTTP/2 streams.
359/// # }
360/// #
361/// # pub fn main() {}
362/// ```
pub fn handshake<T>(io: T) -> Handshake<T, Bytes>
where
    T: AsyncReadRent + AsyncWriteRent + Unpin + 'static,
{
    // Default configuration: delegate to a freshly created `Builder`. The
    // send buffer type is fixed to `Bytes` by the return type.
    Builder::new().handshake(io)
}
369
370// ===== impl Connection =====
371
372impl<T, B> Connection<T, B>
373where
374    T: AsyncReadRent + AsyncWriteRent + Unpin + 'static,
375    B: Buf + 'static,
376{
377    fn handshake2(io: T, builder: Builder) -> Handshake<T, B> {
378        let span = tracing::trace_span!("server_handshake");
379        let entered = span.enter();
380
381        // Create the codec.
382        let mut codec = Codec::new(io);
383
384        if let Some(max) = builder.settings.max_frame_size() {
385            codec.set_max_recv_frame_size(max as usize);
386        }
387
388        if let Some(max) = builder.settings.max_header_list_size() {
389            codec.set_max_recv_header_list_size(max as usize);
390        }
391
392        // Send initial settings frame.
393        codec
394            .buffer(builder.settings.clone().into())
395            .expect("invalid SETTINGS frame");
396
397        // Create the handshake future.
398        let state =
399            Handshaking::Flushing(Flush::new(codec).instrument(tracing::trace_span!("flush")));
400
401        drop(entered);
402
403        Handshake {
404            builder,
405            state,
406            span,
407        }
408    }
409
410    /// Accept the next incoming request on this connection.
411    pub async fn accept(
412        &mut self,
413    ) -> Option<Result<(Request<RecvStream>, SendResponse<B>), crate::h2::Error>> {
414        std::future::poll_fn(move |cx| self.poll_accept(cx)).await
415    }
416
417    #[doc(hidden)]
418    pub fn poll_accept(
419        &mut self,
420        cx: &mut Context<'_>,
421    ) -> Poll<Option<Result<(Request<RecvStream>, SendResponse<B>), crate::h2::Error>>> {
422        // Always try to advance the internal state. Getting Pending also is
423        // needed to allow this function to return Pending.
424        if self.poll_closed(cx)?.is_ready() {
425            // If the socket is closed, don't return anything
426            // TODO: drop any pending streams
427            return Poll::Ready(None);
428        }
429
430        if let Some(inner) = self.connection.next_incoming() {
431            tracing::trace!("received incoming");
432            let (head, _) = inner.take_request().into_parts();
433            let body = RecvStream::new(FlowControl::new(inner.clone_to_opaque()));
434
435            let request = Request::from_parts(head, body);
436            let respond = SendResponse { inner };
437
438            return Poll::Ready(Some(Ok((request, respond))));
439        }
440
441        Poll::Pending
442    }
443
    /// Sets the target window size for the whole connection.
    ///
    /// If `size` is greater than the current value, then a `WINDOW_UPDATE`
    /// frame will be immediately sent to the remote, increasing the connection
    /// level window by `size - current_value`.
    ///
    /// If `size` is less than the current value, nothing will happen
    /// immediately. However, as window capacity is released by
    /// [`FlowControl`] instances, no `WINDOW_UPDATE` frames will be sent
    /// out until the number of "in flight" bytes drops below `size`.
    ///
    /// The default value is 65,535.
    ///
    /// See [`FlowControl`] documentation for more details.
    ///
    /// # Panics
    ///
    /// Panics if `size` is greater than `proto::MAX_WINDOW_SIZE`.
    ///
    /// [`FlowControl`]: ../struct.FlowControl.html
    /// [library level]: ../index.html#flow-control
    pub fn set_target_window_size(&mut self, size: u32) {
        // Reject sizes above the protocol maximum before touching any state.
        assert!(size <= proto::MAX_WINDOW_SIZE);
        self.connection.set_target_window_size(size);
    }
465
    /// Set a new `INITIAL_WINDOW_SIZE` setting (in octets) for stream-level
    /// flow control for received data.
    ///
    /// The `SETTINGS` will be sent to the remote, and only applied once the
    /// remote acknowledges the change.
    ///
    /// This can be used to increase or decrease the window size for existing
    /// streams.
    ///
    /// # Errors
    ///
    /// Returns an error if a previous call is still pending acknowledgement
    /// from the remote endpoint.
    ///
    /// # Panics
    ///
    /// Panics if `size` is greater than `proto::MAX_WINDOW_SIZE`.
    pub fn set_initial_window_size(&mut self, size: u32) -> Result<(), crate::h2::Error> {
        // Reject sizes above the protocol maximum before touching any state.
        assert!(size <= proto::MAX_WINDOW_SIZE);
        // `?` converts the connection's error into `crate::h2::Error`.
        self.connection.set_initial_window_size(size)?;
        Ok(())
    }
484
485    /// Enables the [extended CONNECT protocol].
486    ///
487    /// [extended CONNECT protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
488    ///
489    /// # Errors
490    ///
491    /// Returns an error if a previous call is still pending acknowledgement
492    /// from the remote endpoint.
    pub fn enable_connect_protocol(&mut self) -> Result<(), crate::h2::Error> {
        // `?` converts the connection's error into `crate::h2::Error`.
        self.connection.set_enable_connect_protocol()?;
        Ok(())
    }
497
498    /// Returns `Ready` when the underlying connection has closed.
499    ///
500    /// If any new inbound streams are received during a call to `poll_closed`,
501    /// they will be queued and returned on the next call to [`poll_accept`].
502    ///
503    /// This function will advance the internal connection state, driving
504    /// progress on all the other handles (e.g. [`RecvStream`] and [`SendStream`]).
505    ///
506    /// See [here](index.html#managing-the-connection) for more details.
507    ///
508    /// [`poll_accept`]: struct.Connection.html#method.poll_accept
509    /// [`RecvStream`]: ../struct.RecvStream.html
510    /// [`SendStream`]: ../struct.SendStream.html
    pub fn poll_closed(&mut self, cx: &mut Context) -> Poll<Result<(), crate::h2::Error>> {
        // Polling the inner connection is what advances its state; its error
        // is converted into `crate::h2::Error` on the way out.
        self.connection.poll(cx).map_err(Into::into)
    }
514
515    #[doc(hidden)]
516    #[deprecated(note = "renamed to poll_closed")]
    pub fn poll_close(&mut self, cx: &mut Context) -> Poll<Result<(), crate::h2::Error>> {
        // Old name kept for existing callers; forwards to `poll_closed`.
        self.poll_closed(cx)
    }
520
521    /// Sets the connection to a GOAWAY state.
522    ///
523    /// Does not terminate the connection. Must continue being polled to close
524    /// connection.
525    ///
526    /// After flushing the GOAWAY frame, the connection is closed. Any
527    /// outstanding streams do not prevent the connection from closing. This
528    /// should usually be reserved for shutting down when something bad
529    /// external to `h2` has happened, and open streams cannot be properly
530    /// handled.
531    ///
532    /// For graceful shutdowns, see [`graceful_shutdown`](Connection::graceful_shutdown).
    pub fn abrupt_shutdown(&mut self, reason: Reason) {
        // Only records the GOAWAY on the inner connection; nothing is polled
        // here, so the caller has to keep polling the connection.
        self.connection.go_away_from_user(reason);
    }
536
537    /// Starts a [graceful shutdown][1] process.
538    ///
539    /// Must continue being polled to close connection.
540    ///
541    /// It's possible to receive more requests after calling this method, since
542    /// they might have been in-flight from the client already. After about
543    /// 1 RTT, no new requests should be accepted. Once all active streams
544    /// have completed, the connection is closed.
545    ///
546    /// [1]: http://httpwg.org/specs/rfc7540.html#GOAWAY
    pub fn graceful_shutdown(&mut self) {
        // Only asks the inner connection to start shutting down; nothing is
        // polled here, so the caller has to keep polling the connection.
        self.connection.go_away_gracefully();
    }
550
551    /// Takes a `PingPong` instance from the connection.
552    ///
553    /// # Note
554    ///
555    /// This may only be called once. Calling multiple times will return `None`.
    pub fn ping_pong(&mut self) -> Option<PingPong> {
        // `take_user_pings` yields an `Option`; when it is `Some`, the value
        // is wrapped in the public `PingPong` type.
        self.connection.take_user_pings().map(PingPong::new)
    }
559
560    /// Returns the maximum number of concurrent streams that may be initiated
561    /// by the server on this connection.
562    ///
563    /// This limit is configured by the client peer by sending the
564    /// [`SETTINGS_MAX_CONCURRENT_STREAMS` parameter][1] in a `SETTINGS` frame.
565    /// This method returns the currently acknowledged value received from the
566    /// remote.
567    ///
568    /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2
    pub fn max_concurrent_send_streams(&self) -> usize {
        // Read-only query answered by the inner protocol connection.
        self.connection.max_send_streams()
    }
572
573    /// Returns the maximum number of concurrent streams that may be initiated
574    /// by the client on this connection.
575    ///
576    /// This returns the value of the [`SETTINGS_MAX_CONCURRENT_STREAMS`
577    /// parameter][1] sent in a `SETTINGS` frame that has been
578    /// acknowledged by the remote peer. The value to be sent is configured by
579    /// the [`Builder::max_concurrent_streams`][2] method before handshaking
580    /// with the remote peer.
581    ///
582    /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2
583    /// [2]: ../struct.Builder.html#method.max_concurrent_streams
    pub fn max_concurrent_recv_streams(&self) -> usize {
        // Read-only query answered by the inner protocol connection.
        self.connection.max_recv_streams()
    }
587}
588
#[cfg(feature = "stream")]
impl<T, B> futures_core::Stream for Connection<T, B>
where
    T: AsyncReadRent + AsyncWriteRent + Unpin + 'static,
    B: Buf + 'static,
{
    /// The same item that `Connection::poll_accept` yields.
    type Item = Result<(Request<RecvStream>, SendResponse<B>), crate::h2::Error>;

    /// Yields inbound requests by delegating to `Connection::poll_accept`.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        this.poll_accept(cx)
    }
}
601
602impl<T, B> fmt::Debug for Connection<T, B>
603where
604    T: fmt::Debug,
605    B: fmt::Debug + Buf,
606{
607    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
608        fmt.debug_struct("Connection")
609            .field("connection", &self.connection)
610            .finish()
611    }
612}
613
614// ===== impl Builder =====
615
616impl Builder {
617    /// Returns a new server builder instance initialized with default
618    /// configuration values.
619    ///
620    /// Configuration methods can be chained on the return value.
621    ///
622    /// # Examples
623    ///
624    /// ```
625    /// # use monoio::io::{AsyncReadRent, AsyncWriteRent};
626    /// # use h2::server::*;
627    /// #
628    /// # fn doc<T: AsyncReadRent + AsyncWriteRent + Unpin>(my_io: T)
629    /// # -> Handshake<T>
630    /// # {
631    /// // `server_fut` is a future representing the completion of the HTTP/2
632    /// // handshake.
633    /// let server_fut = Builder::new()
634    ///     .initial_window_size(1_000_000)
635    ///     .max_concurrent_streams(1000)
636    ///     .handshake(my_io);
637    /// # server_fut
638    /// # }
639    /// #
640    /// # pub fn main() {}
641    /// ```
642    pub fn new() -> Builder {
643        Builder {
644            reset_stream_duration: Duration::from_secs(proto::DEFAULT_RESET_STREAM_SECS),
645            reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX,
646            settings: Settings::default(),
647            initial_target_connection_window_size: None,
648            max_send_buffer_size: proto::DEFAULT_MAX_SEND_BUFFER_SIZE,
649        }
650    }
651
652    /// Indicates the initial window size (in octets) for stream-level
653    /// flow control for received data.
654    ///
655    /// The initial window of a stream is used as part of flow control. For more
656    /// details, see [`FlowControl`].
657    ///
658    /// The default value is 65,535.
659    ///
660    /// [`FlowControl`]: ../struct.FlowControl.html
661    ///
662    /// # Examples
663    ///
664    /// ```
665    /// # use monoio::io::{AsyncReadRent, AsyncWriteRent};
666    /// # use h2::server::*;
667    /// #
668    /// # fn doc<T: AsyncReadRent + AsyncWriteRent + Unpin>(my_io: T)
669    /// # -> Handshake<T>
670    /// # {
671    /// // `server_fut` is a future representing the completion of the HTTP/2
672    /// // handshake.
673    /// let server_fut = Builder::new()
674    ///     .initial_window_size(1_000_000)
675    ///     .handshake(my_io);
676    /// # server_fut
677    /// # }
678    /// #
679    /// # pub fn main() {}
680    /// ```
    pub fn initial_window_size(&mut self, size: u32) -> &mut Self {
        // Stored in the `Settings` value that is sent during the handshake.
        self.settings.set_initial_window_size(Some(size));
        // Return the builder so that calls can be chained.
        self
    }
685
686    /// Indicates the initial window size (in octets) for connection-level flow control
687    /// for received data.
688    ///
689    /// The initial window of a connection is used as part of flow control. For more details,
690    /// see [`FlowControl`].
691    ///
692    /// The default value is 65,535.
693    ///
694    /// [`FlowControl`]: ../struct.FlowControl.html
695    ///
696    /// # Examples
697    ///
698    /// ```
699    /// # use monoio::io::{AsyncReadRent, AsyncWriteRent};
700    /// # use h2::server::*;
701    /// #
702    /// # fn doc<T: AsyncReadRent + AsyncWriteRent + Unpin>(my_io: T)
703    /// # -> Handshake<T>
704    /// # {
705    /// // `server_fut` is a future representing the completion of the HTTP/2
706    /// // handshake.
707    /// let server_fut = Builder::new()
708    ///     .initial_connection_window_size(1_000_000)
709    ///     .handshake(my_io);
710    /// # server_fut
711    /// # }
712    /// #
713    /// # pub fn main() {}
714    /// ```
    pub fn initial_connection_window_size(&mut self, size: u32) -> &mut Self {
        // Kept on the builder itself rather than in `settings`.
        self.initial_target_connection_window_size = Some(size);
        // Return the builder so that calls can be chained.
        self
    }
719
720    /// Indicates the size (in octets) of the largest HTTP/2 frame payload that the
721    /// configured server is able to accept.
722    ///
723    /// The sender may send data frames that are **smaller** than this value,
724    /// but any data larger than `max` will be broken up into multiple `DATA`
725    /// frames.
726    ///
727    /// The value **must** be between 16,384 and 16,777,215. The default value is 16,384.
728    ///
729    /// # Examples
730    ///
731    /// ```
732    /// # use monoio::io::{AsyncReadRent, AsyncWriteRent};
733    /// # use h2::server::*;
734    /// #
735    /// # fn doc<T: AsyncReadRent + AsyncWriteRent + Unpin>(my_io: T)
736    /// # -> Handshake<T>
737    /// # {
738    /// // `server_fut` is a future representing the completion of the HTTP/2
739    /// // handshake.
740    /// let server_fut = Builder::new().max_frame_size(1_000_000).handshake(my_io);
741    /// # server_fut
742    /// # }
743    /// #
744    /// # pub fn main() {}
745    /// ```
746    ///
747    /// # Panics
748    ///
749    /// This function panics if `max` is not within the legal range specified
750    /// above.
    pub fn max_frame_size(&mut self, max: u32) -> &mut Self {
        // Stored in the initial `Settings`; `Connection::handshake2` also
        // applies it as the codec's maximum receive frame size.
        // NOTE(review): no range check happens here; presumably
        // `Settings::set_max_frame_size` raises the documented panic — confirm.
        self.settings.set_max_frame_size(Some(max));
        // Return the builder so that calls can be chained.
        self
    }
755
756    /// Sets the max size of received header frames.
757    ///
758    /// This advisory setting informs a peer of the maximum size of header list
759    /// that the sender is prepared to accept, in octets. The value is based on
760    /// the uncompressed size of header fields, including the length of the name
761    /// and value in octets plus an overhead of 32 octets for each header field.
762    ///
763    /// This setting is also used to limit the maximum amount of data that is
764    /// buffered to decode HEADERS frames.
765    ///
766    /// # Examples
767    ///
768    /// ```
769    /// # use monoio::io::{AsyncReadRent, AsyncWriteRent};
770    /// # use h2::server::*;
771    /// #
772    /// # fn doc<T: AsyncReadRent + AsyncWriteRent + Unpin>(my_io: T)
773    /// # -> Handshake<T>
774    /// # {
775    /// // `server_fut` is a future representing the completion of the HTTP/2
776    /// // handshake.
777    /// let server_fut = Builder::new()
778    ///     .max_header_list_size(16 * 1024)
779    ///     .handshake(my_io);
780    /// # server_fut
781    /// # }
782    /// #
783    /// # pub fn main() {}
784    /// ```
    pub fn max_header_list_size(&mut self, max: u32) -> &mut Self {
        // Stored in the initial `Settings`; `Connection::handshake2` also
        // applies it as the codec's maximum receive header list size.
        self.settings.set_max_header_list_size(Some(max));
        // Return the builder so that calls can be chained.
        self
    }
789
790    /// Sets the maximum number of concurrent streams.
791    ///
792    /// The maximum concurrent streams setting only controls the maximum number
793    /// of streams that can be initiated by the remote peer. In other words,
794    /// when this setting is set to 100, this does not limit the number of
795    /// concurrent streams that can be created by the caller.
796    ///
797    /// It is recommended that this value be no smaller than 100, so as to not
798    /// unnecessarily limit parallelism. However, any value is legal, including
799    /// 0. If `max` is set to 0, then the remote will not be permitted to
800    /// initiate streams.
801    ///
802    /// Note that streams in the reserved state, i.e., push promises that have
803    /// been reserved but the stream has not started, do not count against this
804    /// setting.
805    ///
806    /// Also note that if the remote *does* exceed the value set here, it is not
807    /// a protocol level error. Instead, the `h2` library will immediately reset
808    /// the stream.
809    ///
810    /// See [Section 5.1.2] in the HTTP/2 spec for more details.
811    ///
812    /// [Section 5.1.2]: https://http2.github.io/http2-spec/#rfc.section.5.1.2
813    ///
814    /// # Examples
815    ///
816    /// ```
817    /// # use monoio::io::{AsyncReadRent, AsyncWriteRent};
818    /// # use h2::server::*;
819    /// #
820    /// # fn doc<T: AsyncReadRent + AsyncWriteRent + Unpin>(my_io: T)
821    /// # -> Handshake<T>
822    /// # {
823    /// // `server_fut` is a future representing the completion of the HTTP/2
824    /// // handshake.
825    /// let server_fut = Builder::new().max_concurrent_streams(1000).handshake(my_io);
826    /// # server_fut
827    /// # }
828    /// #
829    /// # pub fn main() {}
830    /// ```
    pub fn max_concurrent_streams(&mut self, max: u32) -> &mut Self {
        // Stored in the `Settings` value that is sent during the handshake.
        self.settings.set_max_concurrent_streams(Some(max));
        // Return the builder so that calls can be chained.
        self
    }
835
836    /// Sets the maximum number of concurrent locally reset streams.
837    ///
838    /// When a stream is explicitly reset by either calling
839    /// [`SendResponse::send_reset`] or by dropping a [`SendResponse`] instance
840    /// before completing the stream, the HTTP/2 specification requires that
841    /// any further frames received for that stream must be ignored for "some
842    /// time".
843    ///
844    /// In order to satisfy the specification, internal state must be maintained
845    /// to implement the behavior. This state grows linearly with the number of
846    /// streams that are locally reset.
847    ///
848    /// The `max_concurrent_reset_streams` setting configures sets an upper
849    /// bound on the amount of state that is maintained. When this max value is
850    /// reached, the oldest reset stream is purged from memory.
851    ///
852    /// Once the stream has been fully purged from memory, any additional frames
853    /// received for that stream will result in a connection level protocol
854    /// error, forcing the connection to terminate.
855    ///
856    /// The default value is 10.
857    ///
858    /// # Examples
859    ///
860    /// ```
861    /// # use monoio::io::{AsyncReadRent, AsyncWriteRent};
862    /// # use h2::server::*;
863    /// #
864    /// # fn doc<T: AsyncReadRent + AsyncWriteRent + Unpin>(my_io: T)
865    /// # -> Handshake<T>
866    /// # {
867    /// // `server_fut` is a future representing the completion of the HTTP/2
868    /// // handshake.
869    /// let server_fut = Builder::new()
870    ///     .max_concurrent_reset_streams(1000)
871    ///     .handshake(my_io);
872    /// # server_fut
873    /// # }
874    /// #
875    /// # pub fn main() {}
876    /// ```
877    pub fn max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self {
878        self.reset_stream_max = max;
879        self
880    }
881
882    /// Sets the maximum send buffer size per stream.
883    ///
884    /// Once a stream has buffered up to (or over) the maximum, the stream's
885    /// flow control will not "poll" additional capacity. Once bytes for the
886    /// stream have been written to the connection, the send buffer capacity
887    /// will be freed up again.
888    ///
889    /// The default is currently ~400MB, but may change.
890    ///
891    /// # Panics
892    ///
893    /// This function panics if `max` is larger than `u32::MAX`.
894    pub fn max_send_buffer_size(&mut self, max: usize) -> &mut Self {
895        assert!(max <= u32::MAX as usize);
896        self.max_send_buffer_size = max;
897        self
898    }
899
900    /// Sets the maximum number of concurrent locally reset streams.
901    ///
902    /// When a stream is explicitly reset by either calling
903    /// [`SendResponse::send_reset`] or by dropping a [`SendResponse`] instance
904    /// before completing the stream, the HTTP/2 specification requires that
905    /// any further frames received for that stream must be ignored for "some
906    /// time".
907    ///
908    /// In order to satisfy the specification, internal state must be maintained
909    /// to implement the behavior. This state grows linearly with the number of
910    /// streams that are locally reset.
911    ///
912    /// The `reset_stream_duration` setting configures the max amount of time
913    /// this state will be maintained in memory. Once the duration elapses, the
914    /// stream state is purged from memory.
915    ///
916    /// Once the stream has been fully purged from memory, any additional frames
917    /// received for that stream will result in a connection level protocol
918    /// error, forcing the connection to terminate.
919    ///
920    /// The default value is 30 seconds.
921    ///
922    /// # Examples
923    ///
924    /// ```
925    /// # use monoio::io::{AsyncReadRent, AsyncWriteRent};
926    /// # use h2::server::*;
927    /// # use std::time::Duration;
928    /// #
929    /// # fn doc<T: AsyncReadRent + AsyncWriteRent + Unpin>(my_io: T)
930    /// # -> Handshake<T>
931    /// # {
932    /// // `server_fut` is a future representing the completion of the HTTP/2
933    /// // handshake.
934    /// let server_fut = Builder::new()
935    ///     .reset_stream_duration(Duration::from_secs(10))
936    ///     .handshake(my_io);
937    /// # server_fut
938    /// # }
939    /// #
940    /// # pub fn main() {}
941    /// ```
942    pub fn reset_stream_duration(&mut self, dur: Duration) -> &mut Self {
943        self.reset_stream_duration = dur;
944        self
945    }
946
947    /// Enables the [extended CONNECT protocol].
948    ///
949    /// [extended CONNECT protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
950    pub fn enable_connect_protocol(&mut self) -> &mut Self {
951        self.settings.set_enable_connect_protocol(Some(1));
952        self
953    }
954
955    /// Creates a new configured HTTP/2 server backed by `io`.
956    ///
957    /// It is expected that `io` already be in an appropriate state to commence
958    /// the [HTTP/2 handshake]. See [Handshake] for more details.
959    ///
960    /// Returns a future which resolves to the [`Connection`] instance once the
961    /// HTTP/2 handshake has been completed.
962    ///
963    /// This function also allows the caller to configure the send payload data
964    /// type. See [Outbound data type] for more details.
965    ///
966    /// [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
967    /// [Handshake]: ../index.html#handshake
968    /// [`Connection`]: struct.Connection.html
969    /// [Outbound data type]: ../index.html#outbound-data-type.
970    ///
971    /// # Examples
972    ///
973    /// Basic usage:
974    ///
975    /// ```
976    /// # use monoio::io::{AsyncReadRent, AsyncWriteRent};
977    /// # use h2::server::*;
978    /// #
979    /// # fn doc<T: AsyncReadRent + AsyncWriteRent + Unpin>(my_io: T)
980    /// # -> Handshake<T>
981    /// # {
982    /// // `server_fut` is a future representing the completion of the HTTP/2
983    /// // handshake.
984    /// let server_fut = Builder::new().handshake(my_io);
985    /// # server_fut
986    /// # }
987    /// #
988    /// # pub fn main() {}
989    /// ```
990    ///
991    /// Configures the send-payload data type. In this case, the outbound data
992    /// type will be `&'static [u8]`.
993    ///
994    /// ```
995    /// # use monoio::io::{AsyncReadRent, AsyncWriteRent};
996    /// # use h2::server::*;
997    /// #
998    /// # fn doc<T: AsyncReadRent + AsyncWriteRent + Unpin>(my_io: T)
999    /// # -> Handshake<T, &'static [u8]>
1000    /// # {
1001    /// // `server_fut` is a future representing the completion of the HTTP/2
1002    /// // handshake.
1003    /// let server_fut: Handshake<_, &'static [u8]> = Builder::new().handshake(my_io);
1004    /// # server_fut
1005    /// # }
1006    /// #
1007    /// # pub fn main() {}
1008    /// ```
1009    pub fn handshake<T, B>(&self, io: T) -> Handshake<T, B>
1010    where
1011        T: AsyncReadRent + AsyncWriteRent + Unpin + 'static,
1012        B: Buf + 'static,
1013    {
1014        Connection::handshake2(io, self.clone())
1015    }
1016}
1017
1018impl Default for Builder {
1019    fn default() -> Builder {
1020        Builder::new()
1021    }
1022}
1023
1024// ===== impl SendResponse =====
1025
1026impl<B: Buf> SendResponse<B> {
1027    /// Send a response to a client request.
1028    ///
1029    /// On success, a [`SendStream`] instance is returned. This instance can be
1030    /// used to stream the response body and send trailers.
1031    ///
1032    /// If a body or trailers will be sent on the returned [`SendStream`]
1033    /// instance, then `end_of_stream` must be set to `false` when calling this
1034    /// function.
1035    ///
1036    /// The [`SendResponse`] instance is already associated with a received
1037    /// request.  This function may only be called once per instance and only if
1038    /// [`send_reset`] has not been previously called.
1039    ///
1040    /// [`SendResponse`]: #
1041    /// [`SendStream`]: ../struct.SendStream.html
1042    /// [`send_reset`]: #method.send_reset
1043    pub fn send_response(
1044        &mut self,
1045        response: Response<()>,
1046        end_of_stream: bool,
1047    ) -> Result<SendStream<B>, crate::h2::Error> {
1048        self.inner
1049            .send_response(response, end_of_stream)
1050            .map(|_| SendStream::new(self.inner.clone()))
1051            .map_err(Into::into)
1052    }
1053
1054    /// Push a request and response to the client
1055    ///
1056    /// On success, a [`SendResponse`] instance is returned.
1057    ///
1058    /// [`SendResponse`]: #
1059    pub fn push_request(
1060        &mut self,
1061        request: Request<()>,
1062    ) -> Result<SendPushedResponse<B>, crate::h2::Error> {
1063        self.inner
1064            .send_push_promise(request)
1065            .map(|inner| SendPushedResponse {
1066                inner: SendResponse { inner },
1067            })
1068            .map_err(Into::into)
1069    }
1070
1071    /// Send a stream reset to the peer.
1072    ///
1073    /// This essentially cancels the stream, including any inbound or outbound
1074    /// data streams.
1075    ///
1076    /// If this function is called before [`send_response`], a call to
1077    /// [`send_response`] will result in an error.
1078    ///
1079    /// If this function is called while a [`SendStream`] instance is active,
1080    /// any further use of the instance will result in an error.
1081    ///
1082    /// This function should only be called once.
1083    ///
1084    /// [`send_response`]: #method.send_response
1085    /// [`SendStream`]: ../struct.SendStream.html
1086    pub fn send_reset(&mut self, reason: Reason) {
1087        self.inner.send_reset(reason)
1088    }
1089
1090    /// Polls to be notified when the client resets this stream.
1091    ///
1092    /// If stream is still open, this returns `Poll::Pending`, and
1093    /// registers the task to be notified if a `RST_STREAM` is received.
1094    ///
1095    /// If a `RST_STREAM` frame is received for this stream, calling this
1096    /// method will yield the `Reason` for the reset.
1097    ///
1098    /// # Error
1099    ///
1100    /// Calling this method after having called `send_response` will return
1101    /// a user error.
1102    pub fn poll_reset(&mut self, cx: &mut Context) -> Poll<Result<Reason, crate::h2::Error>> {
1103        self.inner.poll_reset(cx, proto::PollReset::AwaitingHeaders)
1104    }
1105
1106    /// Returns the stream ID of the response stream.
1107    ///
1108    /// # Panics
1109    ///
1110    /// If the lock on the stream store has been poisoned.
1111    pub fn stream_id(&self) -> crate::h2::StreamId {
1112        crate::h2::StreamId::from_internal(self.inner.stream_id())
1113    }
1114}
1115
1116// ===== impl SendPushedResponse =====
1117
1118impl<B: Buf> SendPushedResponse<B> {
1119    /// Send a response to a promised request.
1120    ///
1121    /// On success, a [`SendStream`] instance is returned. This instance can be
1122    /// used to stream the response body and send trailers.
1123    ///
1124    /// If a body or trailers will be sent on the returned [`SendStream`]
1125    /// instance, then `end_of_stream` must be set to `false` when calling this
1126    /// function.
1127    ///
1128    /// The [`SendPushedResponse`] instance is associated with a promised
1129    /// request.  This function may only be called once per instance and only if
1130    /// [`send_reset`] has not been previously called.
1131    ///
1132    /// [`SendPushedResponse`]: #
1133    /// [`SendStream`]: ../struct.SendStream.html
1134    /// [`send_reset`]: #method.send_reset
1135    pub fn send_response(
1136        &mut self,
1137        response: Response<()>,
1138        end_of_stream: bool,
1139    ) -> Result<SendStream<B>, crate::h2::Error> {
1140        self.inner.send_response(response, end_of_stream)
1141    }
1142
1143    /// Send a stream reset to the peer.
1144    ///
1145    /// This essentially cancels the stream, including any inbound or outbound
1146    /// data streams.
1147    ///
1148    /// If this function is called before [`send_response`], a call to
1149    /// [`send_response`] will result in an error.
1150    ///
1151    /// If this function is called while a [`SendStream`] instance is active,
1152    /// any further use of the instance will result in an error.
1153    ///
1154    /// This function should only be called once.
1155    ///
1156    /// [`send_response`]: #method.send_response
1157    /// [`SendStream`]: ../struct.SendStream.html
1158    pub fn send_reset(&mut self, reason: Reason) {
1159        self.inner.send_reset(reason)
1160    }
1161
1162    /// Polls to be notified when the client resets this stream.
1163    ///
1164    /// If stream is still open, this returns `Poll::Pending`, and
1165    /// registers the task to be notified if a `RST_STREAM` is received.
1166    ///
1167    /// If a `RST_STREAM` frame is received for this stream, calling this
1168    /// method will yield the `Reason` for the reset.
1169    ///
1170    /// # Error
1171    ///
1172    /// Calling this method after having called `send_response` will return
1173    /// a user error.
1174    pub fn poll_reset(&mut self, cx: &mut Context) -> Poll<Result<Reason, crate::h2::Error>> {
1175        self.inner.poll_reset(cx)
1176    }
1177
1178    /// Returns the stream ID of the response stream.
1179    ///
1180    /// # Panics
1181    ///
1182    /// If the lock on the stream store has been poisoned.
1183    pub fn stream_id(&self) -> crate::h2::StreamId {
1184        self.inner.stream_id()
1185    }
1186}
1187
1188// ===== impl Flush =====
1189
1190impl<T, B: Buf> Flush<T, B> {
1191    fn new(codec: Codec<T, B>) -> Self {
1192        Flush { codec: Some(codec) }
1193    }
1194}
1195
1196impl<T, B> Future for Flush<T, B>
1197where
1198    T: AsyncWriteRent + Unpin + 'static,
1199    B: Buf,
1200{
1201    type Output = Result<Codec<T, B>, crate::h2::Error>;
1202
1203    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1204        // Flush the codec
1205        ready!(self.codec.as_mut().unwrap().flush(cx)).map_err(crate::h2::Error::from_io)?;
1206
1207        // Return the codec
1208        Poll::Ready(Ok(self.codec.take().unwrap()))
1209    }
1210}
1211
1212impl<T, B: Buf> ReadPreface<T, B> {
1213    fn new(codec: Codec<T, B>) -> Self {
1214        ReadPreface {
1215            codec: Some(UnsafeCell::new(codec)),
1216            read_fut: Default::default(),
1217        }
1218    }
1219}
1220
1221impl<T, B> Future for ReadPreface<T, B>
1222where
1223    T: AsyncReadRent + Unpin + 'static,
1224    B: Buf + 'static,
1225{
1226    type Output = Result<Codec<T, B>, crate::h2::Error>;
1227
1228    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1229        let buf: [u8; PREFACE.len()] = [0; PREFACE.len()];
1230
1231        let owned_buf = BytesMut::from(&buf[..]);
1232
1233        if !self.read_fut.armed() {
1234            let io = unsafe { &mut *(self.codec.as_mut().unwrap().get()) };
1235            let io = io.get_mut();
1236            self.read_fut.arm_future(io.read_exact(owned_buf));
1237        }
1238
1239        {
1240            let (result, ret_buf) = ready!(self.read_fut.poll(cx));
1241            let n = result.map_err(crate::h2::Error::from_io)?;
1242
1243            if n != PREFACE.len() {
1244                println!("!!!!!!!!!!! ReadPreface failed");
1245                return Poll::Ready(Err(crate::h2::Error::from_io(io::Error::new(
1246                    io::ErrorKind::UnexpectedEof,
1247                    "connection closed before reading preface",
1248                ))));
1249            }
1250
1251            if PREFACE[..] != ret_buf[..] {
1252                println!("!!!!!!!!!!! ReadPreface failed");
1253                proto_err!(conn: "read_preface: invalid preface");
1254                // TODO: Should this just write the GO_AWAY frame directly?
1255                return Poll::Ready(Err(Error::library_go_away(Reason::PROTOCOL_ERROR).into()));
1256            }
1257        }
1258
1259        let codec = self.codec.take().unwrap();
1260
1261        Poll::Ready(Ok(codec.into_inner()))
1262    }
1263}
1264
1265// ===== impl Handshake =====
1266
1267impl<T, B: Buf> Future for Handshake<T, B>
1268where
1269    T: AsyncReadRent + AsyncWriteRent + Unpin + 'static,
1270    B: Buf + 'static,
1271{
1272    type Output = Result<Connection<T, B>, crate::h2::Error>;
1273
1274    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1275        let span = self.span.clone(); // XXX(eliza): T_T
1276        let _e = span.enter();
1277        tracing::trace!(state = ?self.state);
1278        tracing::trace!("Poll Handshake");
1279
1280        loop {
1281            match &mut self.state {
1282                Handshaking::Flushing(flush) => {
1283                    // We're currently flushing a pending SETTINGS frame. Poll the
1284                    // flush future, and, if it's completed, advance our state to wait
1285                    // for the client preface.
1286
1287                    let codec = match Pin::new(flush).poll(cx)? {
1288                        Poll::Pending => {
1289                            tracing::trace!(flush.poll = %"Pending");
1290                            return Poll::Pending;
1291                        }
1292                        Poll::Ready(flushed) => {
1293                            tracing::trace!(flush.poll = %"Ready");
1294                            flushed
1295                        }
1296                    };
1297                    self.state = Handshaking::ReadingPreface(
1298                        ReadPreface::new(codec).instrument(tracing::trace_span!("read_preface")),
1299                    );
1300                }
1301                Handshaking::ReadingPreface(read) => {
1302                    let codec = ready!(Pin::new(read).poll(cx)?);
1303
1304                    self.state = Handshaking::Done;
1305
1306                    let connection = proto::Connection::new(
1307                        codec,
1308                        Config {
1309                            next_stream_id: 2.into(),
1310                            // Server does not need to locally initiate any streams
1311                            initial_max_send_streams: 0,
1312                            max_send_buffer_size: self.builder.max_send_buffer_size,
1313                            reset_stream_duration: self.builder.reset_stream_duration,
1314                            reset_stream_max: self.builder.reset_stream_max,
1315                            settings: self.builder.settings.clone(),
1316                        },
1317                    );
1318
1319                    tracing::trace!("connection established!");
1320                    let mut c = Connection { connection };
1321                    if let Some(sz) = self.builder.initial_target_connection_window_size {
1322                        c.set_target_window_size(sz);
1323                    }
1324
1325                    return Poll::Ready(Ok(c));
1326                }
1327                Handshaking::Done => {
1328                    panic!("Handshaking::poll() called again after handshaking was complete")
1329                }
1330            }
1331        }
1332    }
1333}
1334
1335impl<T, B> fmt::Debug for Handshake<T, B>
1336where
1337    T: AsyncReadRent + AsyncWriteRent + fmt::Debug,
1338    B: fmt::Debug + Buf,
1339{
1340    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
1341        write!(fmt, "server::Handshake")
1342    }
1343}
1344
1345impl Peer {
1346    pub fn convert_send_message(
1347        id: StreamId,
1348        response: Response<()>,
1349        end_of_stream: bool,
1350    ) -> frame::Headers {
1351        use http::response::Parts;
1352
1353        // Extract the components of the HTTP request
1354        let (
1355            Parts {
1356                status, headers, ..
1357            },
1358            _,
1359        ) = response.into_parts();
1360
1361        // Build the set pseudo header set. All requests will include `method`
1362        // and `path`.
1363        let pseudo = Pseudo::response(status);
1364
1365        // Create the HEADERS frame
1366        let mut frame = frame::Headers::new(id, pseudo, headers);
1367
1368        if end_of_stream {
1369            frame.set_end_stream()
1370        }
1371
1372        frame
1373    }
1374
1375    pub fn convert_push_message(
1376        stream_id: StreamId,
1377        promised_id: StreamId,
1378        request: Request<()>,
1379    ) -> Result<frame::PushPromise, UserError> {
1380        use http::request::Parts;
1381
1382        if let Err(e) = frame::PushPromise::validate_request(&request) {
1383            use PushPromiseHeaderError::*;
1384            match e {
1385                NotSafeAndCacheable => tracing::debug!(
1386                    ?promised_id,
1387                    "convert_push_message: method {} is not safe and cacheable",
1388                    request.method(),
1389                ),
1390                InvalidContentLength(e) => tracing::debug!(
1391                    ?promised_id,
1392                    "convert_push_message; promised request has invalid content-length {:?}",
1393                    e,
1394                ),
1395            }
1396            return Err(UserError::MalformedHeaders);
1397        }
1398
1399        // Extract the components of the HTTP request
1400        let (
1401            Parts {
1402                method,
1403                uri,
1404                headers,
1405                ..
1406            },
1407            _,
1408        ) = request.into_parts();
1409
1410        let pseudo = Pseudo::request(method, uri, None);
1411
1412        Ok(frame::PushPromise::new(
1413            stream_id,
1414            promised_id,
1415            pseudo,
1416            headers,
1417        ))
1418    }
1419}
1420
1421impl proto::Peer for Peer {
1422    type Poll = Request<()>;
1423
1424    const NAME: &'static str = "Server";
1425
1426    fn is_server() -> bool {
1427        true
1428    }
1429
1430    fn r#dyn() -> proto::DynPeer {
1431        proto::DynPeer::Server
1432    }
1433
1434    fn convert_poll_message(
1435        pseudo: Pseudo,
1436        fields: HeaderMap,
1437        stream_id: StreamId,
1438    ) -> Result<Self::Poll, Error> {
1439        use http::{uri, Version};
1440
1441        let mut b = Request::builder();
1442
1443        macro_rules! malformed {
1444            ($($arg:tt)*) => {{
1445                tracing::debug!($($arg)*);
1446                return Err(Error::library_reset(stream_id, Reason::PROTOCOL_ERROR));
1447            }}
1448        }
1449
1450        b = b.version(Version::HTTP_2);
1451
1452        let is_connect;
1453        if let Some(method) = pseudo.method {
1454            is_connect = method == Method::CONNECT;
1455            b = b.method(method);
1456        } else {
1457            malformed!("malformed headers: missing method");
1458        }
1459
1460        let has_protocol = pseudo.protocol.is_some();
1461        if has_protocol {
1462            if is_connect {
1463                // Assert that we have the right type.
1464                b = b.extension::<crate::h2::ext::Protocol>(pseudo.protocol.unwrap());
1465            } else {
1466                malformed!("malformed headers: :protocol on non-CONNECT request");
1467            }
1468        }
1469
1470        if pseudo.status.is_some() {
1471            malformed!("malformed headers: :status field on request");
1472        }
1473
1474        // Convert the URI
1475        let mut parts = uri::Parts::default();
1476
1477        // A request translated from HTTP/1 must not include the :authority
1478        // header
1479        if let Some(authority) = pseudo.authority {
1480            let maybe_authority = uri::Authority::from_maybe_shared(authority.clone().into_inner());
1481            parts.authority = Some(maybe_authority.or_else(|why| {
1482                malformed!(
1483                    "malformed headers: malformed authority ({:?}): {}",
1484                    authority,
1485                    why,
1486                )
1487            })?);
1488        }
1489
1490        // A :scheme is required, except CONNECT.
1491        if let Some(scheme) = pseudo.scheme {
1492            if is_connect && !has_protocol {
1493                malformed!("malformed headers: :scheme in CONNECT");
1494            }
1495            let maybe_scheme = scheme.parse();
1496            let scheme = maybe_scheme.or_else(|why| {
1497                malformed!(
1498                    "malformed headers: malformed scheme ({:?}): {}",
1499                    scheme,
1500                    why,
1501                )
1502            })?;
1503
1504            // It's not possible to build an `Uri` from a scheme and path. So,
1505            // after validating is was a valid scheme, we just have to drop it
1506            // if there isn't an :authority.
1507            if parts.authority.is_some() {
1508                parts.scheme = Some(scheme);
1509            }
1510        } else if !is_connect || has_protocol {
1511            malformed!("malformed headers: missing scheme");
1512        }
1513
1514        if let Some(path) = pseudo.path {
1515            if is_connect && !has_protocol {
1516                malformed!("malformed headers: :path in CONNECT");
1517            }
1518
1519            // This cannot be empty
1520            if path.is_empty() {
1521                malformed!("malformed headers: missing path");
1522            }
1523
1524            let maybe_path = uri::PathAndQuery::from_maybe_shared(path.clone().into_inner());
1525            parts.path_and_query = Some(maybe_path.or_else(|why| {
1526                malformed!("malformed headers: malformed path ({:?}): {}", path, why,)
1527            })?);
1528        } else if is_connect && has_protocol {
1529            malformed!("malformed headers: missing path in extended CONNECT");
1530        }
1531
1532        b = b.uri(parts);
1533
1534        let mut request = match b.body(()) {
1535            Ok(request) => request,
1536            Err(e) => {
1537                // TODO: Should there be more specialized handling for different
1538                // kinds of errors
1539                proto_err!(stream: "error building request: {}; stream={:?}", e, stream_id);
1540                return Err(Error::library_reset(stream_id, Reason::PROTOCOL_ERROR));
1541            }
1542        };
1543
1544        *request.headers_mut() = fields;
1545
1546        Ok(request)
1547    }
1548}
1549
1550// ===== impl Handshaking =====
1551
1552impl<T, B> fmt::Debug for Handshaking<T, B>
1553where
1554    B: Buf,
1555{
1556    #[inline]
1557    fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
1558        match *self {
1559            Handshaking::Flushing(_) => f.write_str("Flushing(_)"),
1560            Handshaking::ReadingPreface(_) => f.write_str("ReadingPreface(_)"),
1561            Handshaking::Done => f.write_str("Done"),
1562        }
1563    }
1564}