rama_http_core/h2/
server.rs

1//! Server implementation of the HTTP/2 protocol.
2//!
3//! # Getting started
4//!
5//! Running an HTTP/2 server requires the caller to manage accepting the
6//! connections as well as getting the connections to a state that is ready to
7//! begin the HTTP/2 handshake. See [here](../index.html#handshake) for more
8//! details.
9//!
10//! This could be as basic as using Tokio's [`TcpListener`] to accept
11//! connections, but usually it means using either ALPN or HTTP/1.1 protocol
12//! upgrades.
13//!
14//! Once a connection is obtained, it is passed to [`handshake`],
15//! which will begin the [HTTP/2 handshake]. This returns a future that
16//! completes once the handshake process is performed and HTTP/2 streams may
17//! be received.
18//!
19//! [`handshake`] uses default configuration values. There are a number of
20//! settings that can be changed by using [`Builder`] instead.
21//!
22//! # Inbound streams
23//!
24//! The [`Connection`] instance is used to accept inbound HTTP/2 streams. It
25//! does this by implementing [`futures::Stream`]. When a new stream is
26//! received, a call to [`Connection::accept`] will return `(request, response)`.
27//! The `request` handle (of type [`http::Request<RecvStream>`]) contains the
28//! HTTP request head as well as provides a way to receive the inbound data
29//! stream and the trailers. The `response` handle (of type [`SendResponse`])
30//! allows responding to the request, stream the response payload, send
31//! trailers, and send push promises.
32//!
33//! The send ([`SendStream`]) and receive ([`RecvStream`]) halves of the stream
34//! can be operated independently.
35//!
36//! # Managing the connection
37//!
38//! The [`Connection`] instance is used to manage connection state. The caller
39//! is required to call either [`Connection::accept`] or
40//! [`Connection::poll_close`] in order to advance the connection state. Simply
41//! operating on [`SendStream`] or [`RecvStream`] will have no effect unless the
42//! connection state is advanced.
43//!
44//! It is not required to call **both** [`Connection::accept`] and
45//! [`Connection::poll_close`]. If the caller is ready to accept a new stream,
46//! then only [`Connection::accept`] should be called. When the caller **does
47//! not** want to accept a new stream, [`Connection::poll_close`] should be
48//! called.
49//!
50//! The [`Connection`] instance should only be dropped once
51//! [`Connection::poll_close`] returns `Ready`. Once [`Connection::accept`]
52//! returns `Ready(None)`, there will no longer be any more inbound streams. At
53//! this point, only [`Connection::poll_close`] should be called.
54//!
55//! # Shutting down the server
56//!
57//! Graceful shutdown of the server is [not yet
58//! implemented](https://github.com/hyperium/h2/issues/69).
59//!
60//! # Example
61//!
62//! A basic HTTP/2 server example that runs over TCP and assumes [prior
63//! knowledge], i.e. both the client and the server assume that the TCP socket
64//! will use the HTTP/2 protocol without prior negotiation.
65//!
66//! ```no_run
67//! use rama_http_core::h2::server;
68//! use rama_http_types::{Response, StatusCode};
69//! use tokio::net::TcpListener;
70//!
71//! #[tokio::main]
72//! pub async fn main() {
73//!     let mut listener = TcpListener::bind("127.0.0.1:5928").await.unwrap();
74//!
75//!     // Accept all incoming TCP connections.
76//!     loop {
77//!         if let Ok((socket, _peer_addr)) = listener.accept().await {
78//!             // Spawn a new task to process each connection.
79//!             tokio::spawn(async {
80//!                 // Start the HTTP/2 connection handshake
81//!                 let mut h2 = server::handshake(socket).await.unwrap();
82//!                 // Accept all inbound HTTP/2 streams sent over the
83//!                 // connection.
84//!                 while let Some(request) = h2.accept().await {
85//!                     let (request, mut respond) = request.unwrap();
86//!                     println!("Received request: {:?}", request);
87//!
88//!                     // Build a response with no body
89//!                     let response = Response::builder()
90//!                         .status(StatusCode::OK)
91//!                         .body(())
92//!                         .unwrap();
93//!
94//!                     // Send the response back to the client
95//!                     respond.send_response(response, true)
96//!                         .unwrap();
97//!                 }
98//!
99//!             });
100//!         }
101//!     }
102//! }
103//! ```
104//!
105//! [prior knowledge]: http://httpwg.org/specs/rfc7540.html#known-http
106//! [`handshake`]: fn.handshake.html
107//! [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
108//! [`Builder`]: struct.Builder.html
109//! [`Connection`]: struct.Connection.html
110//! [`Connection::poll`]: struct.Connection.html#method.poll
111//! [`Connection::poll_close`]: struct.Connection.html#method.poll_close
112//! [`futures::Stream`]: https://docs.rs/futures/0.1/futures/stream/trait.Stream.html
113//! [`http::Request<RecvStream>`]: ../struct.RecvStream.html
114//! [`RecvStream`]: ../struct.RecvStream.html
115//! [`SendStream`]: ../struct.SendStream.html
116//! [`TcpListener`]: https://docs.rs/tokio-core/0.1/tokio_core/net/struct.TcpListener.html
117
118use crate::h2::codec::{Codec, UserError};
119use crate::h2::frame::{self, Pseudo, PushPromiseHeaderError, Reason, Settings, StreamId};
120use crate::h2::proto::{self, Config, Error, Prioritized};
121use crate::h2::{FlowControl, PingPong, RecvStream, SendStream};
122
123use bytes::{Buf, Bytes};
124use rama_http_types::proto::h1::headers::original::OriginalHttp1Headers;
125use rama_http_types::proto::h2::PseudoHeaderOrder;
126use rama_http_types::{HeaderMap, Method, Request, Response};
127use std::pin::Pin;
128use std::task::{Context, Poll};
129use std::time::Duration;
130use std::{fmt, io};
131use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
132use tracing::instrument::{Instrument, Instrumented};
133
134/// In progress HTTP/2 connection handshake future.
135///
136/// This type implements `Future`, yielding a `Connection` instance once the
137/// handshake has completed.
138///
139/// The handshake is completed once the connection preface is fully received
140/// from the client **and** the initial settings frame is sent to the client.
141///
142/// The handshake future does not wait for the initial settings frame from the
143/// client.
144///
145/// See [module] level docs for more details.
146///
147/// [module]: index.html
148#[must_use = "futures do nothing unless polled"]
149pub struct Handshake<T, B: Buf = Bytes> {
150    /// The config to pass to Connection::new after handshake succeeds.
151    builder: Builder,
152    /// The current state of the handshake.
153    state: Handshaking<T, B>,
154    /// Span tracking the handshake
155    span: tracing::Span,
156}
157
158/// Accepts inbound HTTP/2 streams on a connection.
159///
160/// A `Connection` is backed by an I/O resource (usually a TCP socket) and
161/// implements the HTTP/2 server logic for that connection. It is responsible
162/// for receiving inbound streams initiated by the client as well as driving the
163/// internal state forward.
164///
165/// `Connection` values are created by calling [`handshake`]. Once a
166/// `Connection` value is obtained, the caller must call [`poll`] or
167/// [`poll_close`] in order to drive the internal connection state forward.
168///
169/// See [module level] documentation for more details
170///
171/// [module level]: index.html
172/// [`handshake`]: struct.Connection.html#method.handshake
173/// [`poll`]: struct.Connection.html#method.poll
174/// [`poll_close`]: struct.Connection.html#method.poll_close
175///
176/// # Examples
177///
178/// ```
179/// # use tokio::io::{AsyncRead, AsyncWrite};
180/// # use rama_http_core::h2::server;
181/// # use rama_http_core::h2::server::*;
182/// #
183/// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) {
184/// let mut server = server::handshake(my_io).await.unwrap();
185/// while let Some(request) = server.accept().await {
186///     tokio::spawn(async move {
187///         let (request, respond) = request.unwrap();
188///         // Process the request and send the response back to the client
189///         // using `respond`.
190///     });
191/// }
192/// # }
193/// #
194/// # pub fn main() {}
195/// ```
196#[must_use = "streams do nothing unless polled"]
197pub struct Connection<T, B: Buf> {
198    connection: proto::Connection<T, Peer, B>,
199}
200
201/// Builds server connections with custom configuration values.
202///
203/// Methods can be chained in order to set the configuration values.
204///
205/// The server is constructed by calling [`handshake`] and passing the I/O
206/// handle that will back the HTTP/2 server.
207///
208/// New instances of `Builder` are obtained via [`Builder::new`].
209///
210/// See function level documentation for details on the various server
211/// configuration settings.
212///
213/// [`Builder::new`]: struct.Builder.html#method.new
214/// [`handshake`]: struct.Builder.html#method.handshake
215///
216/// # Examples
217///
218/// ```
219/// # use tokio::io::{AsyncRead, AsyncWrite};
220/// # use rama_http_core::h2::server::*;
221/// #
222/// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
223/// # -> Handshake<T>
224/// # {
225/// // `server_fut` is a future representing the completion of the HTTP/2
226/// // handshake.
227/// let server_fut = Builder::new()
228///     .initial_window_size(1_000_000)
229///     .max_concurrent_streams(1000)
230///     .handshake(my_io);
231/// # server_fut
232/// # }
233/// #
234/// # pub fn main() {}
235/// ```
236#[derive(Clone, Debug)]
237pub struct Builder {
238    /// Time to keep locally reset streams around before reaping.
239    reset_stream_duration: Duration,
240
241    /// Maximum number of locally reset streams to keep at a time.
242    reset_stream_max: usize,
243
244    /// Maximum number of remotely reset streams to allow in the pending
245    /// accept queue.
246    pending_accept_reset_stream_max: usize,
247
248    /// Initial `Settings` frame to send as part of the handshake.
249    settings: Settings,
250
251    /// Initial target window size for new connections.
252    initial_target_connection_window_size: Option<u32>,
253
254    /// Maximum amount of bytes to "buffer" for writing per stream.
255    max_send_buffer_size: usize,
256
257    /// Maximum number of locally reset streams due to protocol error across
258    /// the lifetime of the connection.
259    ///
260    /// When this gets exceeded, we issue GOAWAYs.
261    local_max_error_reset_streams: Option<usize>,
262}
263
264/// Send a response back to the client
265///
266/// A `SendResponse` instance is provided when receiving a request and is used
267/// to send the associated response back to the client. It is also used to
268/// explicitly reset the stream with a custom reason.
269///
270/// It will also be used to initiate push promises linked with the associated
271/// stream.
272///
273/// If the `SendResponse` instance is dropped without sending a response, then
274/// the HTTP/2 stream will be reset.
275///
276/// See [module] level docs for more details.
277///
278/// [module]: index.html
279#[derive(Debug)]
280pub struct SendResponse<B: Buf> {
281    inner: proto::StreamRef<B>,
282}
283
284/// Send a response to a promised request
285///
286/// A `SendPushedResponse` instance is provided when promising a request and is used
287/// to send the associated response to the client. It is also used to
288/// explicitly reset the stream with a custom reason.
289///
290/// It can not be used to initiate push promises.
291///
292/// If the `SendPushedResponse` instance is dropped without sending a response, then
293/// the HTTP/2 stream will be reset.
294///
295/// See [module] level docs for more details.
296///
297/// [module]: index.html
298pub struct SendPushedResponse<B: Buf> {
299    inner: SendResponse<B>,
300}
301
302// Manual implementation necessary because of rust-lang/rust#26925
303impl<B: Buf + fmt::Debug> fmt::Debug for SendPushedResponse<B> {
304    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
305        write!(f, "SendPushedResponse {{ {:?} }}", self.inner)
306    }
307}
308
309/// Stages of an in-progress handshake.
310enum Handshaking<T, B: Buf> {
311    /// State 1. Connection is flushing pending SETTINGS frame.
312    Flushing(Instrumented<Flush<T, Prioritized<B>>>),
313    /// State 2. Connection is waiting for the client preface.
314    ReadingPreface(Instrumented<ReadPreface<T, Prioritized<B>>>),
315    /// State 3. Handshake is done, polling again would panic.
316    Done,
317}
318
319/// Flush a Sink
320struct Flush<T, B> {
321    codec: Option<Codec<T, B>>,
322}
323
324/// Read the client connection preface
325struct ReadPreface<T, B> {
326    codec: Option<Codec<T, B>>,
327    pos: usize,
328}
329
330#[derive(Debug)]
331pub(crate) struct Peer;
332
333const PREFACE: [u8; 24] = *b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
334
335/// Creates a new configured HTTP/2 server with default configuration
336/// values backed by `io`.
337///
338/// It is expected that `io` already be in an appropriate state to commence
339/// the [HTTP/2 handshake]. See [Handshake] for more details.
340///
341/// Returns a future which resolves to the [`Connection`] instance once the
342/// HTTP/2 handshake has been completed. The returned [`Connection`]
343/// instance will be using default configuration values. Use [`Builder`] to
344/// customize the configuration values used by a [`Connection`] instance.
345///
346/// [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
347/// [Handshake]: ../index.html#handshake
348/// [`Connection`]: struct.Connection.html
349///
350/// # Examples
351///
352/// ```
353/// # use tokio::io::{AsyncRead, AsyncWrite};
354/// # use rama_http_core::h2::server;
355/// # use rama_http_core::h2::server::*;
356/// #
357/// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
358/// # {
359/// let connection = server::handshake(my_io).await.unwrap();
360/// // The HTTP/2 handshake has completed, now use `connection` to
361/// // accept inbound HTTP/2 streams.
362/// # }
363/// #
364/// # pub fn main() {}
365/// ```
366pub fn handshake<T>(io: T) -> Handshake<T, Bytes>
367where
368    T: AsyncRead + AsyncWrite + Unpin,
369{
370    Builder::new().handshake(io)
371}
372
373// ===== impl Connection =====
374
375impl<T, B> Connection<T, B>
376where
377    T: AsyncRead + AsyncWrite + Unpin,
378    B: Buf,
379{
380    fn handshake2(io: T, builder: Builder) -> Handshake<T, B> {
381        let span = tracing::trace_span!("server_handshake");
382        let entered = span.enter();
383
384        // Create the codec.
385        let mut codec = Codec::new(io);
386
387        if let Some(max) = builder.settings.max_frame_size() {
388            codec.set_max_recv_frame_size(max as usize);
389        }
390
391        if let Some(max) = builder.settings.max_header_list_size() {
392            codec.set_max_recv_header_list_size(max as usize);
393        }
394
395        // Send initial settings frame.
396        codec
397            .buffer(builder.settings.clone().into())
398            .expect("invalid SETTINGS frame");
399
400        // Create the handshake future.
401        let state =
402            Handshaking::Flushing(Flush::new(codec).instrument(tracing::trace_span!("flush")));
403
404        drop(entered);
405
406        Handshake {
407            builder,
408            state,
409            span,
410        }
411    }
412
413    /// Accept the next incoming request on this connection.
414    pub async fn accept(
415        &mut self,
416    ) -> Option<Result<(Request<RecvStream>, SendResponse<B>), crate::h2::Error>> {
417        crate::h2::poll_fn(move |cx| self.poll_accept(cx)).await
418    }
419
420    #[doc(hidden)]
421    pub fn poll_accept(
422        &mut self,
423        cx: &mut Context<'_>,
424    ) -> Poll<Option<Result<(Request<RecvStream>, SendResponse<B>), crate::h2::Error>>> {
425        // Always try to advance the internal state. Getting Pending also is
426        // needed to allow this function to return Pending.
427        if self.poll_closed(cx)?.is_ready() {
428            // If the socket is closed, don't return anything
429            // TODO: drop any pending streams
430            return Poll::Ready(None);
431        }
432
433        if let Some(inner) = self.connection.next_incoming() {
434            tracing::trace!("received incoming");
435            let (head, _) = inner.take_request().into_parts();
436            let body = RecvStream::new(FlowControl::new(inner.clone_to_opaque()));
437
438            let request = Request::from_parts(head, body);
439            let respond = SendResponse { inner };
440
441            return Poll::Ready(Some(Ok((request, respond))));
442        }
443
444        Poll::Pending
445    }
446
447    /// Sets the target window size for the whole connection.
448    ///
449    /// If `size` is greater than the current value, then a `WINDOW_UPDATE`
450    /// frame will be immediately sent to the remote, increasing the connection
451    /// level window by `size - current_value`.
452    ///
453    /// If `size` is less than the current value, nothing will happen
454    /// immediately. However, as window capacity is released by
455    /// [`FlowControl`] instances, no `WINDOW_UPDATE` frames will be sent
456    /// out until the number of "in flight" bytes drops below `size`.
457    ///
458    /// The default value is 65,535.
459    ///
460    /// See [`FlowControl`] documentation for more details.
461    ///
462    /// [`FlowControl`]: ../struct.FlowControl.html
463    /// [library level]: ../index.html#flow-control
464    pub fn set_target_window_size(&mut self, size: u32) {
465        assert!(size <= proto::MAX_WINDOW_SIZE);
466        self.connection.set_target_window_size(size);
467    }
468
469    /// Set a new `INITIAL_WINDOW_SIZE` setting (in octets) for stream-level
470    /// flow control for received data.
471    ///
472    /// The `SETTINGS` will be sent to the remote, and only applied once the
473    /// remote acknowledges the change.
474    ///
475    /// This can be used to increase or decrease the window size for existing
476    /// streams.
477    ///
478    /// # Errors
479    ///
480    /// Returns an error if a previous call is still pending acknowledgement
481    /// from the remote endpoint.
482    pub fn set_initial_window_size(&mut self, size: u32) -> Result<(), crate::h2::Error> {
483        assert!(size <= proto::MAX_WINDOW_SIZE);
484        self.connection.set_initial_window_size(size)?;
485        Ok(())
486    }
487
488    /// Enables the [extended CONNECT protocol].
489    ///
490    /// [extended CONNECT protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
491    ///
492    /// # Errors
493    ///
494    /// Returns an error if a previous call is still pending acknowledgement
495    /// from the remote endpoint.
496    pub fn enable_connect_protocol(&mut self) -> Result<(), crate::h2::Error> {
497        self.connection.set_enable_connect_protocol()?;
498        Ok(())
499    }
500
501    /// Returns `Ready` when the underlying connection has closed.
502    ///
503    /// If any new inbound streams are received during a call to `poll_closed`,
504    /// they will be queued and returned on the next call to [`poll_accept`].
505    ///
506    /// This function will advance the internal connection state, driving
507    /// progress on all the other handles (e.g. [`RecvStream`] and [`SendStream`]).
508    ///
509    /// See [here](index.html#managing-the-connection) for more details.
510    ///
511    /// [`poll_accept`]: struct.Connection.html#method.poll_accept
512    /// [`RecvStream`]: ../struct.RecvStream.html
513    /// [`SendStream`]: ../struct.SendStream.html
514    pub fn poll_closed(&mut self, cx: &mut Context) -> Poll<Result<(), crate::h2::Error>> {
515        self.connection.poll(cx).map_err(Into::into)
516    }
517
518    /// Sets the connection to a GOAWAY state.
519    ///
520    /// Does not terminate the connection. Must continue being polled to close
521    /// connection.
522    ///
523    /// After flushing the GOAWAY frame, the connection is closed. Any
524    /// outstanding streams do not prevent the connection from closing. This
525    /// should usually be reserved for shutting down when something bad
526    /// external to `h2` has happened, and open streams cannot be properly
527    /// handled.
528    ///
529    /// For graceful shutdowns, see [`graceful_shutdown`](Connection::graceful_shutdown).
530    pub fn abrupt_shutdown(&mut self, reason: Reason) {
531        self.connection.go_away_from_user(reason);
532    }
533
534    /// Starts a [graceful shutdown][1] process.
535    ///
536    /// Must continue being polled to close connection.
537    ///
538    /// It's possible to receive more requests after calling this method, since
539    /// they might have been in-flight from the client already. After about
540    /// 1 RTT, no new requests should be accepted. Once all active streams
541    /// have completed, the connection is closed.
542    ///
543    /// [1]: http://httpwg.org/specs/rfc7540.html#GOAWAY
544    pub fn graceful_shutdown(&mut self) {
545        self.connection.go_away_gracefully();
546    }
547
548    /// Takes a `PingPong` instance from the connection.
549    ///
550    /// # Note
551    ///
552    /// This may only be called once. Calling multiple times will return `None`.
553    pub fn ping_pong(&mut self) -> Option<PingPong> {
554        self.connection.take_user_pings().map(PingPong::new)
555    }
556
557    /// Checks if there are any streams
558    pub fn has_streams(&self) -> bool {
559        self.connection.has_streams()
560    }
561
562    /// Returns the maximum number of concurrent streams that may be initiated
563    /// by the server on this connection.
564    ///
565    /// This limit is configured by the client peer by sending the
566    /// [`SETTINGS_MAX_CONCURRENT_STREAMS` parameter][1] in a `SETTINGS` frame.
567    /// This method returns the currently acknowledged value received from the
568    /// remote.
569    ///
570    /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2
571    pub fn max_concurrent_send_streams(&self) -> usize {
572        self.connection.max_send_streams()
573    }
574
575    /// Returns the maximum number of concurrent streams that may be initiated
576    /// by the client on this connection.
577    ///
578    /// This returns the value of the [`SETTINGS_MAX_CONCURRENT_STREAMS`
579    /// parameter][1] sent in a `SETTINGS` frame that has been
580    /// acknowledged by the remote peer. The value to be sent is configured by
581    /// the [`Builder::max_concurrent_streams`][2] method before handshaking
582    /// with the remote peer.
583    ///
584    /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2
585    /// [2]: ../struct.Builder.html#method.max_concurrent_streams
586    pub fn max_concurrent_recv_streams(&self) -> usize {
587        self.connection.max_recv_streams()
588    }
589
590    // Could disappear at anytime.
591    #[doc(hidden)]
592    #[cfg(feature = "unstable")]
593    pub fn num_wired_streams(&self) -> usize {
594        self.connection.num_wired_streams()
595    }
596}
597
598impl<T, B> futures_core::Stream for Connection<T, B>
599where
600    T: AsyncRead + AsyncWrite + Unpin,
601    B: Buf,
602{
603    type Item = Result<(Request<RecvStream>, SendResponse<B>), crate::h2::Error>;
604
605    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
606        self.poll_accept(cx)
607    }
608}
609
610impl<T, B> fmt::Debug for Connection<T, B>
611where
612    T: fmt::Debug,
613    B: fmt::Debug + Buf,
614{
615    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
616        fmt.debug_struct("Connection")
617            .field("connection", &self.connection)
618            .finish()
619    }
620}
621
622// ===== impl Builder =====
623
624impl Builder {
625    /// Returns a new server builder instance initialized with default
626    /// configuration values.
627    ///
628    /// Configuration methods can be chained on the return value.
629    ///
630    /// # Examples
631    ///
632    /// ```
633    /// # use tokio::io::{AsyncRead, AsyncWrite};
634    /// # use rama_http_core::h2::server::*;
635    /// #
636    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
637    /// # -> Handshake<T>
638    /// # {
639    /// // `server_fut` is a future representing the completion of the HTTP/2
640    /// // handshake.
641    /// let server_fut = Builder::new()
642    ///     .initial_window_size(1_000_000)
643    ///     .max_concurrent_streams(1000)
644    ///     .handshake(my_io);
645    /// # server_fut
646    /// # }
647    /// #
648    /// # pub fn main() {}
649    /// ```
650    pub fn new() -> Builder {
651        Builder {
652            reset_stream_duration: Duration::from_secs(proto::DEFAULT_RESET_STREAM_SECS),
653            reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX,
654            pending_accept_reset_stream_max: proto::DEFAULT_REMOTE_RESET_STREAM_MAX,
655            settings: Settings::default(),
656            initial_target_connection_window_size: None,
657            max_send_buffer_size: proto::DEFAULT_MAX_SEND_BUFFER_SIZE,
658
659            local_max_error_reset_streams: Some(proto::DEFAULT_LOCAL_RESET_COUNT_MAX),
660        }
661    }
662
663    /// Indicates the initial window size (in octets) for stream-level
664    /// flow control for received data.
665    ///
666    /// The initial window of a stream is used as part of flow control. For more
667    /// details, see [`FlowControl`].
668    ///
669    /// The default value is 65,535.
670    ///
671    /// [`FlowControl`]: ../struct.FlowControl.html
672    ///
673    /// # Examples
674    ///
675    /// ```
676    /// # use tokio::io::{AsyncRead, AsyncWrite};
677    /// # use rama_http_core::h2::server::*;
678    /// #
679    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
680    /// # -> Handshake<T>
681    /// # {
682    /// // `server_fut` is a future representing the completion of the HTTP/2
683    /// // handshake.
684    /// let server_fut = Builder::new()
685    ///     .initial_window_size(1_000_000)
686    ///     .handshake(my_io);
687    /// # server_fut
688    /// # }
689    /// #
690    /// # pub fn main() {}
691    /// ```
692    pub fn initial_window_size(&mut self, size: u32) -> &mut Self {
693        self.settings.set_initial_window_size(Some(size));
694        self
695    }
696
697    /// Indicates the initial window size (in octets) for connection-level flow control
698    /// for received data.
699    ///
700    /// The initial window of a connection is used as part of flow control. For more details,
701    /// see [`FlowControl`].
702    ///
703    /// The default value is 65,535.
704    ///
705    /// [`FlowControl`]: ../struct.FlowControl.html
706    ///
707    /// # Examples
708    ///
709    /// ```
710    /// # use tokio::io::{AsyncRead, AsyncWrite};
711    /// # use rama_http_core::h2::server::*;
712    /// #
713    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
714    /// # -> Handshake<T>
715    /// # {
716    /// // `server_fut` is a future representing the completion of the HTTP/2
717    /// // handshake.
718    /// let server_fut = Builder::new()
719    ///     .initial_connection_window_size(1_000_000)
720    ///     .handshake(my_io);
721    /// # server_fut
722    /// # }
723    /// #
724    /// # pub fn main() {}
725    /// ```
726    pub fn initial_connection_window_size(&mut self, size: u32) -> &mut Self {
727        self.initial_target_connection_window_size = Some(size);
728        self
729    }
730
731    /// Indicates the size (in octets) of the largest HTTP/2 frame payload that the
732    /// configured server is able to accept.
733    ///
734    /// The sender may send data frames that are **smaller** than this value,
735    /// but any data larger than `max` will be broken up into multiple `DATA`
736    /// frames.
737    ///
738    /// The value **must** be between 16,384 and 16,777,215. The default value is 16,384.
739    ///
740    /// # Examples
741    ///
742    /// ```
743    /// # use tokio::io::{AsyncRead, AsyncWrite};
744    /// # use rama_http_core::h2::server::*;
745    /// #
746    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
747    /// # -> Handshake<T>
748    /// # {
749    /// // `server_fut` is a future representing the completion of the HTTP/2
750    /// // handshake.
751    /// let server_fut = Builder::new()
752    ///     .max_frame_size(1_000_000)
753    ///     .handshake(my_io);
754    /// # server_fut
755    /// # }
756    /// #
757    /// # pub fn main() {}
758    /// ```
759    ///
760    /// # Panics
761    ///
762    /// This function panics if `max` is not within the legal range specified
763    /// above.
764    pub fn max_frame_size(&mut self, max: u32) -> &mut Self {
765        self.settings.set_max_frame_size(Some(max));
766        self
767    }
768
769    /// Sets the max size of received header frames.
770    ///
771    /// This advisory setting informs a peer of the maximum size of header list
772    /// that the sender is prepared to accept, in octets. The value is based on
773    /// the uncompressed size of header fields, including the length of the name
774    /// and value in octets plus an overhead of 32 octets for each header field.
775    ///
776    /// This setting is also used to limit the maximum amount of data that is
777    /// buffered to decode HEADERS frames.
778    ///
779    /// # Examples
780    ///
781    /// ```
782    /// # use tokio::io::{AsyncRead, AsyncWrite};
783    /// # use rama_http_core::h2::server::*;
784    /// #
785    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
786    /// # -> Handshake<T>
787    /// # {
788    /// // `server_fut` is a future representing the completion of the HTTP/2
789    /// // handshake.
790    /// let server_fut = Builder::new()
791    ///     .max_header_list_size(16 * 1024)
792    ///     .handshake(my_io);
793    /// # server_fut
794    /// # }
795    /// #
796    /// # pub fn main() {}
797    /// ```
798    pub fn max_header_list_size(&mut self, max: u32) -> &mut Self {
799        self.settings.set_max_header_list_size(Some(max));
800        self
801    }
802
803    /// Sets the maximum number of concurrent streams.
804    ///
805    /// The maximum concurrent streams setting only controls the maximum number
806    /// of streams that can be initiated by the remote peer. In other words,
807    /// when this setting is set to 100, this does not limit the number of
808    /// concurrent streams that can be created by the caller.
809    ///
810    /// It is recommended that this value be no smaller than 100, so as to not
811    /// unnecessarily limit parallelism. However, any value is legal, including
812    /// 0. If `max` is set to 0, then the remote will not be permitted to
813    /// initiate streams.
814    ///
815    /// Note that streams in the reserved state, i.e., push promises that have
816    /// been reserved but the stream has not started, do not count against this
817    /// setting.
818    ///
819    /// Also note that if the remote *does* exceed the value set here, it is not
820    /// a protocol level error. Instead, the `h2` library will immediately reset
821    /// the stream.
822    ///
823    /// See [Section 5.1.2] in the HTTP/2 spec for more details.
824    ///
825    /// [Section 5.1.2]: https://http2.github.io/http2-spec/#rfc.section.5.1.2
826    ///
827    /// # Examples
828    ///
829    /// ```
830    /// # use tokio::io::{AsyncRead, AsyncWrite};
831    /// # use rama_http_core::h2::server::*;
832    /// #
833    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
834    /// # -> Handshake<T>
835    /// # {
836    /// // `server_fut` is a future representing the completion of the HTTP/2
837    /// // handshake.
838    /// let server_fut = Builder::new()
839    ///     .max_concurrent_streams(1000)
840    ///     .handshake(my_io);
841    /// # server_fut
842    /// # }
843    /// #
844    /// # pub fn main() {}
845    /// ```
846    pub fn max_concurrent_streams(&mut self, max: u32) -> &mut Self {
847        self.settings.set_max_concurrent_streams(Some(max));
848        self
849    }
850
851    /// Sets the maximum number of concurrent locally reset streams.
852    ///
853    /// When a stream is explicitly reset by either calling
854    /// [`SendResponse::send_reset`] or by dropping a [`SendResponse`] instance
855    /// before completing the stream, the HTTP/2 specification requires that
856    /// any further frames received for that stream must be ignored for "some
857    /// time".
858    ///
859    /// In order to satisfy the specification, internal state must be maintained
860    /// to implement the behavior. This state grows linearly with the number of
861    /// streams that are locally reset.
862    ///
863    /// The `max_concurrent_reset_streams` setting configures sets an upper
864    /// bound on the amount of state that is maintained. When this max value is
865    /// reached, the oldest reset stream is purged from memory.
866    ///
867    /// Once the stream has been fully purged from memory, any additional frames
868    /// received for that stream will result in a connection level protocol
869    /// error, forcing the connection to terminate.
870    ///
871    /// The default value is 10.
872    ///
873    /// # Examples
874    ///
875    /// ```
876    /// # use tokio::io::{AsyncRead, AsyncWrite};
877    /// # use rama_http_core::h2::server::*;
878    /// #
879    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
880    /// # -> Handshake<T>
881    /// # {
882    /// // `server_fut` is a future representing the completion of the HTTP/2
883    /// // handshake.
884    /// let server_fut = Builder::new()
885    ///     .max_concurrent_reset_streams(1000)
886    ///     .handshake(my_io);
887    /// # server_fut
888    /// # }
889    /// #
890    /// # pub fn main() {}
891    /// ```
892    pub fn max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self {
893        self.reset_stream_max = max;
894        self
895    }
896
897    /// Sets the maximum number of local resets due to protocol errors made by the remote end.
898    ///
899    /// Invalid frames and many other protocol errors will lead to resets being generated for those streams.
900    /// Too many of these often indicate a malicious client, and there are attacks which can abuse this to DOS servers.
901    /// This limit protects against these DOS attacks by limiting the amount of resets we can be forced to generate.
902    ///
903    /// When the number of local resets exceeds this threshold, the server will issue GOAWAYs with an error code of
904    /// `ENHANCE_YOUR_CALM` to the client.
905    ///
906    /// If you really want to disable this, supply [`Option::None`] here.
907    /// Disabling this is not recommended and may expose you to DOS attacks.
908    ///
909    /// The default value is currently 1024, but could change.
910    pub fn max_local_error_reset_streams(&mut self, max: Option<usize>) -> &mut Self {
911        self.local_max_error_reset_streams = max;
912        self
913    }
914
915    /// Sets the maximum number of pending-accept remotely-reset streams.
916    ///
917    /// Streams that have been received by the peer, but not accepted by the
918    /// user, can also receive a RST_STREAM. This is a legitimate pattern: one
919    /// could send a request and then shortly after, realize it is not needed,
920    /// sending a CANCEL.
921    ///
922    /// However, since those streams are now "closed", they don't count towards
923    /// the max concurrent streams. So, they will sit in the accept queue,
924    /// using memory.
925    ///
926    /// When the number of remotely-reset streams sitting in the pending-accept
927    /// queue reaches this maximum value, a connection error with the code of
928    /// `ENHANCE_YOUR_CALM` will be sent to the peer, and returned by the
929    /// `Future`.
930    ///
931    /// The default value is currently 20, but could change.
932    ///
933    /// # Examples
934    ///
935    ///
936    /// ```
937    /// # use tokio::io::{AsyncRead, AsyncWrite};
938    /// # use rama_http_core::h2::server::*;
939    /// #
940    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
941    /// # -> Handshake<T>
942    /// # {
943    /// // `server_fut` is a future representing the completion of the HTTP/2
944    /// // handshake.
945    /// let server_fut = Builder::new()
946    ///     .max_pending_accept_reset_streams(100)
947    ///     .handshake(my_io);
948    /// # server_fut
949    /// # }
950    /// #
951    /// # pub fn main() {}
952    /// ```
953    pub fn max_pending_accept_reset_streams(&mut self, max: usize) -> &mut Self {
954        self.pending_accept_reset_stream_max = max;
955        self
956    }
957
958    /// Sets the maximum send buffer size per stream.
959    ///
960    /// Once a stream has buffered up to (or over) the maximum, the stream's
961    /// flow control will not "poll" additional capacity. Once bytes for the
962    /// stream have been written to the connection, the send buffer capacity
963    /// will be freed up again.
964    ///
965    /// The default is currently ~400KB, but may change.
966    ///
967    /// # Panics
968    ///
969    /// This function panics if `max` is larger than `u32::MAX`.
970    pub fn max_send_buffer_size(&mut self, max: usize) -> &mut Self {
971        assert!(max <= u32::MAX as usize);
972        self.max_send_buffer_size = max;
973        self
974    }
975
976    /// Sets the maximum number of concurrent locally reset streams.
977    ///
978    /// When a stream is explicitly reset by either calling
979    /// [`SendResponse::send_reset`] or by dropping a [`SendResponse`] instance
980    /// before completing the stream, the HTTP/2 specification requires that
981    /// any further frames received for that stream must be ignored for "some
982    /// time".
983    ///
984    /// In order to satisfy the specification, internal state must be maintained
985    /// to implement the behavior. This state grows linearly with the number of
986    /// streams that are locally reset.
987    ///
988    /// The `reset_stream_duration` setting configures the max amount of time
989    /// this state will be maintained in memory. Once the duration elapses, the
990    /// stream state is purged from memory.
991    ///
992    /// Once the stream has been fully purged from memory, any additional frames
993    /// received for that stream will result in a connection level protocol
994    /// error, forcing the connection to terminate.
995    ///
996    /// The default value is 30 seconds.
997    ///
998    /// # Examples
999    ///
1000    /// ```
1001    /// # use tokio::io::{AsyncRead, AsyncWrite};
1002    /// # use rama_http_core::h2::server::*;
1003    /// # use std::time::Duration;
1004    /// #
1005    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1006    /// # -> Handshake<T>
1007    /// # {
1008    /// // `server_fut` is a future representing the completion of the HTTP/2
1009    /// // handshake.
1010    /// let server_fut = Builder::new()
1011    ///     .reset_stream_duration(Duration::from_secs(10))
1012    ///     .handshake(my_io);
1013    /// # server_fut
1014    /// # }
1015    /// #
1016    /// # pub fn main() {}
1017    /// ```
1018    pub fn reset_stream_duration(&mut self, dur: Duration) -> &mut Self {
1019        self.reset_stream_duration = dur;
1020        self
1021    }
1022
1023    /// Enables the [extended CONNECT protocol].
1024    ///
1025    /// [extended CONNECT protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
1026    pub fn enable_connect_protocol(&mut self) -> &mut Self {
1027        self.settings.set_enable_connect_protocol(Some(1));
1028        self
1029    }
1030
1031    /// Creates a new configured HTTP/2 server backed by `io`.
1032    ///
1033    /// It is expected that `io` already be in an appropriate state to commence
1034    /// the [HTTP/2 handshake]. See [Handshake] for more details.
1035    ///
1036    /// Returns a future which resolves to the [`Connection`] instance once the
1037    /// HTTP/2 handshake has been completed.
1038    ///
1039    /// This function also allows the caller to configure the send payload data
1040    /// type. See [Outbound data type] for more details.
1041    ///
1042    /// [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
1043    /// [Handshake]: ../index.html#handshake
1044    /// [`Connection`]: struct.Connection.html
1045    /// [Outbound data type]: ../index.html#outbound-data-type.
1046    ///
1047    /// # Examples
1048    ///
1049    /// Basic usage:
1050    ///
1051    /// ```
1052    /// # use tokio::io::{AsyncRead, AsyncWrite};
1053    /// # use rama_http_core::h2::server::*;
1054    /// #
1055    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1056    /// # -> Handshake<T>
1057    /// # {
1058    /// // `server_fut` is a future representing the completion of the HTTP/2
1059    /// // handshake.
1060    /// let server_fut = Builder::new()
1061    ///     .handshake(my_io);
1062    /// # server_fut
1063    /// # }
1064    /// #
1065    /// # pub fn main() {}
1066    /// ```
1067    ///
1068    /// Configures the send-payload data type. In this case, the outbound data
1069    /// type will be `&'static [u8]`.
1070    ///
1071    /// ```
1072    /// # use tokio::io::{AsyncRead, AsyncWrite};
1073    /// # use rama_http_core::h2::server::*;
1074    /// #
1075    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1076    /// # -> Handshake<T, &'static [u8]>
1077    /// # {
1078    /// // `server_fut` is a future representing the completion of the HTTP/2
1079    /// // handshake.
1080    /// let server_fut: Handshake<_, &'static [u8]> = Builder::new()
1081    ///     .handshake(my_io);
1082    /// # server_fut
1083    /// # }
1084    /// #
1085    /// # pub fn main() {}
1086    /// ```
1087    pub fn handshake<T, B>(&self, io: T) -> Handshake<T, B>
1088    where
1089        T: AsyncRead + AsyncWrite + Unpin,
1090        B: Buf,
1091    {
1092        Connection::handshake2(io, self.clone())
1093    }
1094}
1095
1096impl Default for Builder {
1097    fn default() -> Builder {
1098        Builder::new()
1099    }
1100}
1101
1102// ===== impl SendResponse =====
1103
1104impl<B: Buf> SendResponse<B> {
1105    /// Send a response to a client request.
1106    ///
1107    /// On success, a [`SendStream`] instance is returned. This instance can be
1108    /// used to stream the response body and send trailers.
1109    ///
1110    /// If a body or trailers will be sent on the returned [`SendStream`]
1111    /// instance, then `end_of_stream` must be set to `false` when calling this
1112    /// function.
1113    ///
1114    /// The [`SendResponse`] instance is already associated with a received
1115    /// request.  This function may only be called once per instance and only if
1116    /// [`send_reset`] has not been previously called.
1117    ///
1118    /// [`SendResponse`]: #
1119    /// [`SendStream`]: ../struct.SendStream.html
1120    /// [`send_reset`]: #method.send_reset
1121    pub fn send_response(
1122        &mut self,
1123        response: Response<()>,
1124        end_of_stream: bool,
1125    ) -> Result<SendStream<B>, crate::h2::Error> {
1126        self.inner
1127            .send_response(response, end_of_stream)
1128            .map(|_| SendStream::new(self.inner.clone()))
1129            .map_err(Into::into)
1130    }
1131
1132    /// Push a request and response to the client
1133    ///
1134    /// On success, a [`SendResponse`] instance is returned.
1135    ///
1136    /// [`SendResponse`]: #
1137    pub fn push_request(
1138        &mut self,
1139        request: Request<()>,
1140    ) -> Result<SendPushedResponse<B>, crate::h2::Error> {
1141        self.inner
1142            .send_push_promise(request)
1143            .map(|inner| SendPushedResponse {
1144                inner: SendResponse { inner },
1145            })
1146            .map_err(Into::into)
1147    }
1148
1149    /// Send a stream reset to the peer.
1150    ///
1151    /// This essentially cancels the stream, including any inbound or outbound
1152    /// data streams.
1153    ///
1154    /// If this function is called before [`send_response`], a call to
1155    /// [`send_response`] will result in an error.
1156    ///
1157    /// If this function is called while a [`SendStream`] instance is active,
1158    /// any further use of the instance will result in an error.
1159    ///
1160    /// This function should only be called once.
1161    ///
1162    /// [`send_response`]: #method.send_response
1163    /// [`SendStream`]: ../struct.SendStream.html
1164    pub fn send_reset(&mut self, reason: Reason) {
1165        self.inner.send_reset(reason)
1166    }
1167
1168    /// Polls to be notified when the client resets this stream.
1169    ///
1170    /// If stream is still open, this returns `Poll::Pending`, and
1171    /// registers the task to be notified if a `RST_STREAM` is received.
1172    ///
1173    /// If a `RST_STREAM` frame is received for this stream, calling this
1174    /// method will yield the `Reason` for the reset.
1175    ///
1176    /// # Error
1177    ///
1178    /// Calling this method after having called `send_response` will return
1179    /// a user error.
1180    pub fn poll_reset(&mut self, cx: &mut Context) -> Poll<Result<Reason, crate::h2::Error>> {
1181        self.inner.poll_reset(cx, proto::PollReset::AwaitingHeaders)
1182    }
1183
1184    /// Returns the stream ID of the response stream.
1185    ///
1186    /// # Panics
1187    ///
1188    /// If the lock on the stream store has been poisoned.
1189    pub fn stream_id(&self) -> crate::h2::StreamId {
1190        crate::h2::StreamId::from_internal(self.inner.stream_id())
1191    }
1192}
1193
1194// ===== impl SendPushedResponse =====
1195
1196impl<B: Buf> SendPushedResponse<B> {
1197    /// Send a response to a promised request.
1198    ///
1199    /// On success, a [`SendStream`] instance is returned. This instance can be
1200    /// used to stream the response body and send trailers.
1201    ///
1202    /// If a body or trailers will be sent on the returned [`SendStream`]
1203    /// instance, then `end_of_stream` must be set to `false` when calling this
1204    /// function.
1205    ///
1206    /// The [`SendPushedResponse`] instance is associated with a promised
1207    /// request.  This function may only be called once per instance and only if
1208    /// [`send_reset`] has not been previously called.
1209    ///
1210    /// [`SendPushedResponse`]: #
1211    /// [`SendStream`]: ../struct.SendStream.html
1212    /// [`send_reset`]: #method.send_reset
1213    pub fn send_response(
1214        &mut self,
1215        response: Response<()>,
1216        end_of_stream: bool,
1217    ) -> Result<SendStream<B>, crate::h2::Error> {
1218        self.inner.send_response(response, end_of_stream)
1219    }
1220
1221    /// Send a stream reset to the peer.
1222    ///
1223    /// This essentially cancels the stream, including any inbound or outbound
1224    /// data streams.
1225    ///
1226    /// If this function is called before [`send_response`], a call to
1227    /// [`send_response`] will result in an error.
1228    ///
1229    /// If this function is called while a [`SendStream`] instance is active,
1230    /// any further use of the instance will result in an error.
1231    ///
1232    /// This function should only be called once.
1233    ///
1234    /// [`send_response`]: #method.send_response
1235    /// [`SendStream`]: ../struct.SendStream.html
1236    pub fn send_reset(&mut self, reason: Reason) {
1237        self.inner.send_reset(reason)
1238    }
1239
1240    /// Polls to be notified when the client resets this stream.
1241    ///
1242    /// If stream is still open, this returns `Poll::Pending`, and
1243    /// registers the task to be notified if a `RST_STREAM` is received.
1244    ///
1245    /// If a `RST_STREAM` frame is received for this stream, calling this
1246    /// method will yield the `Reason` for the reset.
1247    ///
1248    /// # Error
1249    ///
1250    /// Calling this method after having called `send_response` will return
1251    /// a user error.
1252    pub fn poll_reset(&mut self, cx: &mut Context) -> Poll<Result<Reason, crate::h2::Error>> {
1253        self.inner.poll_reset(cx)
1254    }
1255
1256    /// Returns the stream ID of the response stream.
1257    ///
1258    /// # Panics
1259    ///
1260    /// If the lock on the stream store has been poisoned.
1261    pub fn stream_id(&self) -> crate::h2::StreamId {
1262        self.inner.stream_id()
1263    }
1264}
1265
1266// ===== impl Flush =====
1267
1268impl<T, B: Buf> Flush<T, B> {
1269    fn new(codec: Codec<T, B>) -> Self {
1270        Flush { codec: Some(codec) }
1271    }
1272}
1273
1274impl<T, B> Future for Flush<T, B>
1275where
1276    T: AsyncWrite + Unpin,
1277    B: Buf,
1278{
1279    type Output = Result<Codec<T, B>, crate::h2::Error>;
1280
1281    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1282        // Flush the codec
1283        ready!(self.codec.as_mut().unwrap().flush(cx)).map_err(crate::h2::Error::from_io)?;
1284
1285        // Return the codec
1286        Poll::Ready(Ok(self.codec.take().unwrap()))
1287    }
1288}
1289
1290impl<T, B: Buf> ReadPreface<T, B> {
1291    fn new(codec: Codec<T, B>) -> Self {
1292        ReadPreface {
1293            codec: Some(codec),
1294            pos: 0,
1295        }
1296    }
1297
1298    fn inner_mut(&mut self) -> &mut T {
1299        self.codec.as_mut().unwrap().get_mut()
1300    }
1301}
1302
1303impl<T, B> Future for ReadPreface<T, B>
1304where
1305    T: AsyncRead + Unpin,
1306    B: Buf,
1307{
1308    type Output = Result<Codec<T, B>, crate::h2::Error>;
1309
1310    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1311        let mut buf = [0; 24];
1312        let mut rem = PREFACE.len() - self.pos;
1313
1314        while rem > 0 {
1315            let mut buf = ReadBuf::new(&mut buf[..rem]);
1316            ready!(Pin::new(self.inner_mut()).poll_read(cx, &mut buf))
1317                .map_err(crate::h2::Error::from_io)?;
1318            let n = buf.filled().len();
1319            if n == 0 {
1320                return Poll::Ready(Err(crate::h2::Error::from_io(io::Error::new(
1321                    io::ErrorKind::UnexpectedEof,
1322                    "connection closed before reading preface",
1323                ))));
1324            }
1325
1326            if &PREFACE[self.pos..self.pos + n] != buf.filled() {
1327                proto_err!(conn: "read_preface: invalid preface");
1328                // TODO: Should this just write the GO_AWAY frame directly?
1329                return Poll::Ready(Err(Error::library_go_away(Reason::PROTOCOL_ERROR).into()));
1330            }
1331
1332            self.pos += n;
1333            rem -= n; // TODO test
1334        }
1335
1336        Poll::Ready(Ok(self.codec.take().unwrap()))
1337    }
1338}
1339
1340// ===== impl Handshake =====
1341
1342impl<T, B: Buf> Future for Handshake<T, B>
1343where
1344    T: AsyncRead + AsyncWrite + Unpin,
1345    B: Buf,
1346{
1347    type Output = Result<Connection<T, B>, crate::h2::Error>;
1348
1349    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1350        let span = self.span.clone(); // XXX(eliza): T_T
1351        let _e = span.enter();
1352        tracing::trace!(state = ?self.state);
1353
1354        loop {
1355            match &mut self.state {
1356                Handshaking::Flushing(flush) => {
1357                    // We're currently flushing a pending SETTINGS frame. Poll the
1358                    // flush future, and, if it's completed, advance our state to wait
1359                    // for the client preface.
1360                    let codec = match Pin::new(flush).poll(cx)? {
1361                        Poll::Pending => {
1362                            tracing::trace!(flush.poll = %"Pending");
1363                            return Poll::Pending;
1364                        }
1365                        Poll::Ready(flushed) => {
1366                            tracing::trace!(flush.poll = %"Ready");
1367                            flushed
1368                        }
1369                    };
1370                    self.state = Handshaking::ReadingPreface(
1371                        ReadPreface::new(codec).instrument(tracing::trace_span!("read_preface")),
1372                    );
1373                }
1374                Handshaking::ReadingPreface(read) => {
1375                    let codec = ready!(Pin::new(read).poll(cx)?);
1376
1377                    self.state = Handshaking::Done;
1378
1379                    let connection = proto::Connection::new(
1380                        codec,
1381                        Config {
1382                            next_stream_id: 2.into(),
1383                            // Server does not need to locally initiate any streams
1384                            initial_max_send_streams: 0,
1385                            max_send_buffer_size: self.builder.max_send_buffer_size,
1386                            reset_stream_duration: self.builder.reset_stream_duration,
1387                            reset_stream_max: self.builder.reset_stream_max,
1388                            remote_reset_stream_max: self.builder.pending_accept_reset_stream_max,
1389                            local_error_reset_streams_max: self
1390                                .builder
1391                                .local_max_error_reset_streams,
1392                            settings: self.builder.settings.clone(),
1393                            headers_priority: None,
1394                            headers_pseudo_order: None,
1395                            priority: None,
1396                        },
1397                    );
1398
1399                    tracing::trace!("connection established!");
1400                    let mut c = Connection { connection };
1401                    if let Some(sz) = self.builder.initial_target_connection_window_size {
1402                        c.set_target_window_size(sz);
1403                    }
1404
1405                    return Poll::Ready(Ok(c));
1406                }
1407                Handshaking::Done => {
1408                    panic!("Handshaking::poll() called again after handshaking was complete")
1409                }
1410            }
1411        }
1412    }
1413}
1414
1415impl<T, B> fmt::Debug for Handshake<T, B>
1416where
1417    T: AsyncRead + AsyncWrite + fmt::Debug,
1418    B: fmt::Debug + Buf,
1419{
1420    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
1421        write!(fmt, "server::Handshake")
1422    }
1423}
1424
1425impl Peer {
1426    pub(crate) fn convert_send_message(
1427        id: StreamId,
1428        response: Response<()>,
1429        end_of_stream: bool,
1430    ) -> frame::Headers {
1431        use rama_http_types::dep::http::response::Parts;
1432
1433        // Extract the components of the HTTP request
1434        let (
1435            Parts {
1436                status,
1437                headers,
1438                mut extensions,
1439                ..
1440            },
1441            _,
1442        ) = response.into_parts();
1443
1444        // Build the set pseudo header set. All requests will include `method`
1445        // and `path`.
1446        let mut pseudo = Pseudo::response(status);
1447
1448        // reuse order if defined
1449        if let Some(order) = extensions.remove::<PseudoHeaderOrder>() {
1450            if !order.is_empty() {
1451                pseudo.order = order;
1452            }
1453        }
1454
1455        let header_order: OriginalHttp1Headers = extensions.remove().unwrap_or_default();
1456
1457        // Create the HEADERS frame
1458        let mut frame = frame::Headers::new(id, pseudo, headers, header_order, None);
1459
1460        if end_of_stream {
1461            frame.set_end_stream()
1462        }
1463
1464        frame
1465    }
1466
1467    pub(crate) fn convert_push_message(
1468        stream_id: StreamId,
1469        promised_id: StreamId,
1470        request: Request<()>,
1471    ) -> Result<frame::PushPromise, UserError> {
1472        use rama_http_types::dep::http::request::Parts;
1473
1474        if let Err(e) = frame::PushPromise::validate_request(&request) {
1475            match e {
1476                PushPromiseHeaderError::NotSafeAndCacheable => tracing::debug!(
1477                    ?promised_id,
1478                    "convert_push_message: method {} is not safe and cacheable",
1479                    request.method(),
1480                ),
1481                PushPromiseHeaderError::InvalidContentLength(e) => tracing::debug!(
1482                    ?promised_id,
1483                    "convert_push_message; promised request has invalid content-length {:?}",
1484                    e,
1485                ),
1486            }
1487            return Err(UserError::MalformedHeaders);
1488        }
1489
1490        // Extract the components of the HTTP request
1491        let (
1492            Parts {
1493                method,
1494                uri,
1495                headers,
1496                mut extensions,
1497                ..
1498            },
1499            _,
1500        ) = request.into_parts();
1501
1502        let mut pseudo = Pseudo::request(method, uri, None);
1503
1504        // reuse order if defined
1505        if let Some(order) = extensions.remove::<PseudoHeaderOrder>() {
1506            if !order.is_empty() {
1507                pseudo.order = order;
1508            }
1509        }
1510
1511        let header_order: OriginalHttp1Headers = extensions.remove().unwrap_or_default();
1512
1513        Ok(frame::PushPromise::new(
1514            stream_id,
1515            promised_id,
1516            pseudo,
1517            headers,
1518            header_order,
1519        ))
1520    }
1521}
1522
1523impl proto::Peer for Peer {
1524    type Poll = Request<()>;
1525
1526    const NAME: &'static str = "Server";
1527
1528    /*
1529    fn is_server() -> bool {
1530        true
1531    }
1532    */
1533
1534    fn r#dyn() -> proto::DynPeer {
1535        proto::DynPeer::Server
1536    }
1537
1538    fn convert_poll_message(
1539        pseudo: Pseudo,
1540        fields: HeaderMap,
1541        field_order: OriginalHttp1Headers,
1542        stream_id: StreamId,
1543    ) -> Result<Self::Poll, Error> {
1544        use rama_http_types::{Version, dep::http::uri};
1545
1546        let mut b = Request::builder();
1547
1548        macro_rules! malformed {
1549            ($($arg:tt)*) => {{
1550                tracing::debug!($($arg)*);
1551                return Err(Error::library_reset(stream_id, Reason::PROTOCOL_ERROR));
1552            }}
1553        }
1554
1555        b = b.version(Version::HTTP_2);
1556
1557        let is_connect;
1558        if let Some(method) = pseudo.method {
1559            is_connect = method == Method::CONNECT;
1560            b = b.method(method);
1561        } else {
1562            malformed!("malformed headers: missing method");
1563        }
1564
1565        let has_protocol = pseudo.protocol.is_some();
1566        if has_protocol {
1567            if is_connect {
1568                // Assert that we have the right type.
1569                b = b.extension::<crate::h2::ext::Protocol>(pseudo.protocol.unwrap());
1570            } else {
1571                malformed!("malformed headers: :protocol on non-CONNECT request");
1572            }
1573        }
1574
1575        if pseudo.status.is_some() {
1576            malformed!("malformed headers: :status field on request");
1577        }
1578
1579        // Convert the URI
1580        let mut parts = uri::Parts::default();
1581
1582        // A request translated from HTTP/1 must not include the :authority
1583        // header
1584        if let Some(authority) = pseudo.authority {
1585            let maybe_authority = uri::Authority::from_maybe_shared(authority.clone().into_inner());
1586            parts.authority = Some(maybe_authority.or_else(|why| {
1587                malformed!(
1588                    "malformed headers: malformed authority ({:?}): {}",
1589                    authority,
1590                    why,
1591                )
1592            })?);
1593        }
1594
1595        // A :scheme is required, except CONNECT.
1596        if let Some(scheme) = pseudo.scheme {
1597            if is_connect && !has_protocol {
1598                malformed!("malformed headers: :scheme in CONNECT");
1599            }
1600            let maybe_scheme = scheme.parse();
1601            let scheme = maybe_scheme.or_else(|why| {
1602                malformed!(
1603                    "malformed headers: malformed scheme ({:?}): {}",
1604                    scheme,
1605                    why,
1606                )
1607            })?;
1608
1609            // It's not possible to build an `Uri` from a scheme and path. So,
1610            // after validating is was a valid scheme, we just have to drop it
1611            // if there isn't an :authority.
1612            if parts.authority.is_some() {
1613                parts.scheme = Some(scheme);
1614            }
1615        } else if !is_connect || has_protocol {
1616            malformed!("malformed headers: missing scheme");
1617        }
1618
1619        if let Some(path) = pseudo.path {
1620            if is_connect && !has_protocol {
1621                malformed!("malformed headers: :path in CONNECT");
1622            }
1623
1624            // This cannot be empty
1625            if path.is_empty() {
1626                malformed!("malformed headers: missing path");
1627            }
1628
1629            let maybe_path = uri::PathAndQuery::from_maybe_shared(path.clone().into_inner());
1630            parts.path_and_query = Some(maybe_path.or_else(|why| {
1631                malformed!("malformed headers: malformed path ({:?}): {}", path, why,)
1632            })?);
1633        } else if is_connect && has_protocol {
1634            malformed!("malformed headers: missing path in extended CONNECT");
1635        }
1636
1637        b = b.uri(parts);
1638
1639        let mut request = match b.body(()) {
1640            Ok(request) => request,
1641            Err(e) => {
1642                // TODO: Should there be more specialized handling for different
1643                // kinds of errors
1644                proto_err!(stream: "error building request: {}; stream={:?}", e, stream_id);
1645                return Err(Error::library_reset(stream_id, Reason::PROTOCOL_ERROR));
1646            }
1647        };
1648
1649        if !pseudo.order.is_empty() {
1650            request.extensions_mut().insert(pseudo.order);
1651        }
1652
1653        if !field_order.is_empty() {
1654            request.extensions_mut().insert(field_order);
1655        }
1656
1657        *request.headers_mut() = fields;
1658
1659        Ok(request)
1660    }
1661}
1662
1663// ===== impl Handshaking =====
1664
1665impl<T, B> fmt::Debug for Handshaking<T, B>
1666where
1667    B: Buf,
1668{
1669    #[inline]
1670    fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
1671        match *self {
1672            Handshaking::Flushing(_) => f.write_str("Flushing(_)"),
1673            Handshaking::ReadingPreface(_) => f.write_str("ReadingPreface(_)"),
1674            Handshaking::Done => f.write_str("Done"),
1675        }
1676    }
1677}