http2/
server.rs

1//! Server implementation of the HTTP/2 protocol.
2//!
3//! # Getting started
4//!
5//! Running an HTTP/2 server requires the caller to manage accepting the
6//! connections as well as getting the connections to a state that is ready to
7//! begin the HTTP/2 handshake. See [here](../index.html#handshake) for more
8//! details.
9//!
10//! This could be as basic as using Tokio's [`TcpListener`] to accept
11//! connections, but usually it means using either ALPN or HTTP/1.1 protocol
12//! upgrades.
13//!
14//! Once a connection is obtained, it is passed to [`handshake`],
15//! which will begin the [HTTP/2 handshake]. This returns a future that
16//! completes once the handshake process is performed and HTTP/2 streams may
17//! be received.
18//!
19//! [`handshake`] uses default configuration values. There are a number of
20//! settings that can be changed by using [`Builder`] instead.
21//!
22//! # Inbound streams
23//!
//! The [`Connection`] instance is used to accept inbound HTTP/2 streams. Each
//! call to [`Connection::accept`] waits for the next inbound stream and, once
//! one is received, returns `(request, response)`. When the `stream` feature
//! is enabled, `Connection` additionally implements [`futures::Stream`].
//! The `request` handle (of type [`http::Request<RecvStream>`]) contains the
//! HTTP request head and provides a way to receive the inbound data
//! stream and the trailers. The `response` handle (of type [`SendResponse`])
//! allows responding to the request, streaming the response payload, sending
//! trailers, and sending push promises.
32//!
33//! The send ([`SendStream`]) and receive ([`RecvStream`]) halves of the stream
34//! can be operated independently.
35//!
36//! # Managing the connection
37//!
//! The [`Connection`] instance is used to manage connection state. The caller
//! is required to call either [`Connection::accept`] or
//! [`Connection::poll_closed`] in order to advance the connection state. Simply
//! operating on [`SendStream`] or [`RecvStream`] will have no effect unless the
//! connection state is advanced.
//!
//! It is not required to call **both** [`Connection::accept`] and
//! [`Connection::poll_closed`]. If the caller is ready to accept a new stream,
//! then only [`Connection::accept`] should be called. When the caller **does
//! not** want to accept a new stream, [`Connection::poll_closed`] should be
//! called.
//!
//! The [`Connection`] instance should only be dropped once
//! [`Connection::poll_closed`] returns `Ready`. Once [`Connection::accept`]
//! returns `None`, there will no longer be any more inbound streams. At
//! this point, only [`Connection::poll_closed`] should be called.
54//!
55//! # Shutting down the server
56//!
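//! A graceful shutdown is started with [`Connection::graceful_shutdown`]; the
//! connection must continue to be polled until it closes. To close the
//! connection without waiting for outstanding streams, use
//! [`Connection::abrupt_shutdown`].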
59//!
60//! # Example
61//!
62//! A basic HTTP/2 server example that runs over TCP and assumes [prior
63//! knowledge], i.e. both the client and the server assume that the TCP socket
64//! will use the HTTP/2 protocol without prior negotiation.
65//!
66//! ```no_run
67//! use http2::server;
68//! use http::{Response, StatusCode};
69//! use tokio::net::TcpListener;
70//!
71//! #[tokio::main]
72//! pub async fn main() {
73//!     let mut listener = TcpListener::bind("127.0.0.1:5928").await.unwrap();
74//!
75//!     // Accept all incoming TCP connections.
76//!     loop {
77//!         if let Ok((socket, _peer_addr)) = listener.accept().await {
78//!             // Spawn a new task to process each connection.
79//!             tokio::spawn(async {
80//!                 // Start the HTTP/2 connection handshake
81//!                 let mut http2 = server::handshake(socket).await.unwrap();
82//!                 // Accept all inbound HTTP/2 streams sent over the
83//!                 // connection.
84//!                 while let Some(request) = http2.accept().await {
85//!                     let (request, mut respond) = request.unwrap();
86//!                     println!("Received request: {:?}", request);
87//!
88//!                     // Build a response with no body
89//!                     let response = Response::builder()
90//!                         .status(StatusCode::OK)
91//!                         .body(())
92//!                         .unwrap();
93//!
94//!                     // Send the response back to the client
95//!                     respond.send_response(response, true)
96//!                         .unwrap();
97//!                 }
98//!
99//!             });
100//!         }
101//!     }
102//! }
103//! ```
104//!
105//! [prior knowledge]: http://httpwg.org/specs/rfc7540.html#known-http
106//! [`handshake`]: fn.handshake.html
107//! [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
108//! [`Builder`]: struct.Builder.html
109//! [`Connection`]: struct.Connection.html
110//! [`Connection::poll`]: struct.Connection.html#method.poll
111//! [`Connection::poll_close`]: struct.Connection.html#method.poll_close
//! [`futures::Stream`]: https://docs.rs/futures-core/latest/futures_core/stream/trait.Stream.html
113//! [`http::Request<RecvStream>`]: ../struct.RecvStream.html
114//! [`RecvStream`]: ../struct.RecvStream.html
115//! [`SendStream`]: ../struct.SendStream.html
//! [`TcpListener`]: https://docs.rs/tokio/latest/tokio/net/struct.TcpListener.html
117
118use crate::codec::{Codec, UserError};
119use crate::frame::{self, Pseudo, PushPromiseHeaderError, Reason, Settings, StreamId};
120use crate::proto::{self, Config, Error, Prioritized};
121use crate::{tracing, FlowControl, PingPong, RecvStream, SendStream};
122
123#[cfg(feature = "tracing")]
124use ::tracing::instrument::{Instrument, Instrumented};
125use bytes::{Buf, Bytes};
126use http::{HeaderMap, Method, Request, Response};
127use std::future::Future;
128use std::pin::Pin;
129use std::task::{Context, Poll};
130use std::time::Duration;
131use std::{fmt, io};
132use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
133
134/// In progress HTTP/2 connection handshake future.
135///
136/// This type implements `Future`, yielding a `Connection` instance once the
137/// handshake has completed.
138///
139/// The handshake is completed once the connection preface is fully received
140/// from the client **and** the initial settings frame is sent to the client.
141///
142/// The handshake future does not wait for the initial settings frame from the
143/// client.
144///
145/// See [module] level docs for more details.
146///
147/// [module]: index.html
148#[must_use = "futures do nothing unless polled"]
149pub struct Handshake<T, B: Buf = Bytes> {
150    /// The config to pass to Connection::new after handshake succeeds.
151    builder: Builder,
152    /// The current state of the handshake.
153    state: Handshaking<T, B>,
154    /// Span tracking the handshake
155    #[cfg(feature = "tracing")]
156    span: ::tracing::Span,
157}
158
159/// Accepts inbound HTTP/2 streams on a connection.
160///
161/// A `Connection` is backed by an I/O resource (usually a TCP socket) and
162/// implements the HTTP/2 server logic for that connection. It is responsible
163/// for receiving inbound streams initiated by the client as well as driving the
164/// internal state forward.
165///
166/// `Connection` values are created by calling [`handshake`]. Once a
167/// `Connection` value is obtained, the caller must call [`poll`] or
168/// [`poll_close`] in order to drive the internal connection state forward.
169///
170/// See [module level] documentation for more details
171///
172/// [module level]: index.html
173/// [`handshake`]: struct.Connection.html#method.handshake
174/// [`poll`]: struct.Connection.html#method.poll
175/// [`poll_close`]: struct.Connection.html#method.poll_close
176///
177/// # Examples
178///
179/// ```
180/// # use tokio::io::{AsyncRead, AsyncWrite};
181/// # use http2::server;
182/// # use http2::server::*;
183/// #
184/// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) {
185/// let mut server = server::handshake(my_io).await.unwrap();
186/// while let Some(request) = server.accept().await {
187///     tokio::spawn(async move {
188///         let (request, respond) = request.unwrap();
189///         // Process the request and send the response back to the client
190///         // using `respond`.
191///     });
192/// }
193/// # }
194/// #
195/// # pub fn main() {}
196/// ```
197#[must_use = "streams do nothing unless polled"]
198pub struct Connection<T, B: Buf> {
199    connection: proto::Connection<T, Peer, B>,
200}
201
202/// Builds server connections with custom configuration values.
203///
204/// Methods can be chained in order to set the configuration values.
205///
206/// The server is constructed by calling [`handshake`] and passing the I/O
207/// handle that will back the HTTP/2 server.
208///
209/// New instances of `Builder` are obtained via [`Builder::new`].
210///
211/// See function level documentation for details on the various server
212/// configuration settings.
213///
214/// [`Builder::new`]: struct.Builder.html#method.new
215/// [`handshake`]: struct.Builder.html#method.handshake
216///
217/// # Examples
218///
219/// ```
220/// # use tokio::io::{AsyncRead, AsyncWrite};
221/// # use http2::server::*;
222/// #
223/// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
224/// # -> Handshake<T>
225/// # {
226/// // `server_fut` is a future representing the completion of the HTTP/2
227/// // handshake.
228/// let server_fut = Builder::new()
229///     .initial_window_size(1_000_000)
230///     .max_concurrent_streams(1000)
231///     .handshake(my_io);
232/// # server_fut
233/// # }
234/// #
235/// # pub fn main() {}
236/// ```
237#[derive(Clone, Debug)]
238pub struct Builder {
239    /// Time to keep locally reset streams around before reaping.
240    reset_stream_duration: Duration,
241
242    /// Maximum number of locally reset streams to keep at a time.
243    reset_stream_max: usize,
244
245    /// Maximum number of remotely reset streams to allow in the pending
246    /// accept queue.
247    pending_accept_reset_stream_max: usize,
248
249    /// Initial `Settings` frame to send as part of the handshake.
250    settings: Settings,
251
252    /// Initial target window size for new connections.
253    initial_target_connection_window_size: Option<u32>,
254
255    /// Maximum amount of bytes to "buffer" for writing per stream.
256    max_send_buffer_size: usize,
257
258    /// Maximum number of locally reset streams due to protocol error across
259    /// the lifetime of the connection.
260    ///
261    /// When this gets exceeded, we issue GOAWAYs.
262    local_max_error_reset_streams: Option<usize>,
263}
264
265/// Send a response back to the client
266///
267/// A `SendResponse` instance is provided when receiving a request and is used
268/// to send the associated response back to the client. It is also used to
269/// explicitly reset the stream with a custom reason.
270///
271/// It will also be used to initiate push promises linked with the associated
272/// stream.
273///
274/// If the `SendResponse` instance is dropped without sending a response, then
275/// the HTTP/2 stream will be reset.
276///
277/// See [module] level docs for more details.
278///
279/// [module]: index.html
280#[derive(Debug)]
281pub struct SendResponse<B: Buf> {
282    inner: proto::StreamRef<B>,
283}
284
285/// Send a response to a promised request
286///
287/// A `SendPushedResponse` instance is provided when promising a request and is used
288/// to send the associated response to the client. It is also used to
289/// explicitly reset the stream with a custom reason.
290///
291/// It can not be used to initiate push promises.
292///
293/// If the `SendPushedResponse` instance is dropped without sending a response, then
294/// the HTTP/2 stream will be reset.
295///
296/// See [module] level docs for more details.
297///
298/// [module]: index.html
299pub struct SendPushedResponse<B: Buf> {
300    inner: SendResponse<B>,
301}
302
303// Manual implementation necessary because of rust-lang/rust#26925
304impl<B: Buf + fmt::Debug> fmt::Debug for SendPushedResponse<B> {
305    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
306        write!(f, "SendPushedResponse {{ {:?} }}", self.inner)
307    }
308}
309
310/// Stages of an in-progress handshake.
311enum Handshaking<T, B: Buf> {
312    /// State 1. Connection is flushing pending SETTINGS frame.
313    #[cfg(feature = "tracing")]
314    Flushing(Instrumented<Flush<T, Prioritized<B>>>),
315    #[cfg(not(feature = "tracing"))]
316    Flushing(Flush<T, Prioritized<B>>),
317
318    /// State 2. Connection is waiting for the client preface.
319    #[cfg(feature = "tracing")]
320    ReadingPreface(Instrumented<ReadPreface<T, Prioritized<B>>>),
321    #[cfg(not(feature = "tracing"))]
322    ReadingPreface(ReadPreface<T, Prioritized<B>>),
323
324    /// State 3. Handshake is done, polling again would panic.
325    Done,
326}
327
328/// Flush a Sink
329struct Flush<T, B> {
330    codec: Option<Codec<T, B>>,
331}
332
333/// Read the client connection preface
334struct ReadPreface<T, B> {
335    codec: Option<Codec<T, B>>,
336    pos: usize,
337}
338
/// Marker type used as the peer parameter of `proto::Connection` for
/// server connections.
#[derive(Debug)]
pub(crate) struct Peer;
341
/// The 24-byte client connection preface.
const PREFACE: [u8; 24] = *b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
343
344/// Creates a new configured HTTP/2 server with default configuration
345/// values backed by `io`.
346///
347/// It is expected that `io` already be in an appropriate state to commence
348/// the [HTTP/2 handshake]. See [Handshake] for more details.
349///
350/// Returns a future which resolves to the [`Connection`] instance once the
351/// HTTP/2 handshake has been completed. The returned [`Connection`]
352/// instance will be using default configuration values. Use [`Builder`] to
353/// customize the configuration values used by a [`Connection`] instance.
354///
355/// [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
356/// [Handshake]: ../index.html#handshake
357/// [`Connection`]: struct.Connection.html
358///
359/// # Examples
360///
361/// ```
362/// # use tokio::io::{AsyncRead, AsyncWrite};
363/// # use http2::server;
364/// # use http2::server::*;
365/// #
366/// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
367/// # {
368/// let connection = server::handshake(my_io).await.unwrap();
369/// // The HTTP/2 handshake has completed, now use `connection` to
370/// // accept inbound HTTP/2 streams.
371/// # }
372/// #
373/// # pub fn main() {}
374/// ```
375pub fn handshake<T>(io: T) -> Handshake<T, Bytes>
376where
377    T: AsyncRead + AsyncWrite + Unpin,
378{
379    Builder::new().handshake(io)
380}
381
382// ===== impl Connection =====
383
384impl<T, B> Connection<T, B>
385where
386    T: AsyncRead + AsyncWrite + Unpin,
387    B: Buf,
388{
389    fn handshake2(io: T, builder: Builder) -> Handshake<T, B> {
390        #[cfg(feature = "tracing")]
391        let span = ::tracing::trace_span!("server_handshake");
392        #[cfg(feature = "tracing")]
393        let entered = span.enter();
394
395        // Create the codec.
396        let mut codec = Codec::new(io);
397
398        if let Some(max) = builder.settings.max_frame_size() {
399            codec.set_max_recv_frame_size(max as usize);
400        }
401
402        if let Some(max) = builder.settings.max_header_list_size() {
403            codec.set_max_recv_header_list_size(max as usize);
404        }
405
406        // Send initial settings frame.
407        codec
408            .buffer(builder.settings.clone().into())
409            .expect("invalid SETTINGS frame");
410
411        // Create the handshake future.
412        #[cfg(feature = "tracing")]
413        let state =
414            Handshaking::Flushing(Flush::new(codec).instrument(::tracing::trace_span!("flush")));
415        #[cfg(not(feature = "tracing"))]
416        let state = Handshaking::Flushing(Flush::new(codec));
417
418        #[cfg(feature = "tracing")]
419        drop(entered);
420
421        Handshake {
422            builder,
423            state,
424            #[cfg(feature = "tracing")]
425            span,
426        }
427    }
428
429    /// Accept the next incoming request on this connection.
430    pub async fn accept(
431        &mut self,
432    ) -> Option<Result<(Request<RecvStream>, SendResponse<B>), crate::Error>> {
433        crate::poll_fn(move |cx| self.poll_accept(cx)).await
434    }
435
436    #[doc(hidden)]
437    pub fn poll_accept(
438        &mut self,
439        cx: &mut Context<'_>,
440    ) -> Poll<Option<Result<(Request<RecvStream>, SendResponse<B>), crate::Error>>> {
441        // Always try to advance the internal state. Getting Pending also is
442        // needed to allow this function to return Pending.
443        if self.poll_closed(cx)?.is_ready() {
444            // If the socket is closed, don't return anything
445            // TODO: drop any pending streams
446            return Poll::Ready(None);
447        }
448
449        if let Some(inner) = self.connection.next_incoming() {
450            tracing::trace!("received incoming");
451            let (head, _) = inner.take_request().into_parts();
452            let body = RecvStream::new(FlowControl::new(inner.clone_to_opaque()));
453
454            let request = Request::from_parts(head, body);
455            let respond = SendResponse { inner };
456
457            return Poll::Ready(Some(Ok((request, respond))));
458        }
459
460        Poll::Pending
461    }
462
463    /// Sets the target window size for the whole connection.
464    ///
465    /// If `size` is greater than the current value, then a `WINDOW_UPDATE`
466    /// frame will be immediately sent to the remote, increasing the connection
467    /// level window by `size - current_value`.
468    ///
469    /// If `size` is less than the current value, nothing will happen
470    /// immediately. However, as window capacity is released by
471    /// [`FlowControl`] instances, no `WINDOW_UPDATE` frames will be sent
472    /// out until the number of "in flight" bytes drops below `size`.
473    ///
474    /// The default value is 65,535.
475    ///
476    /// See [`FlowControl`] documentation for more details.
477    ///
478    /// [`FlowControl`]: ../struct.FlowControl.html
479    /// [library level]: ../index.html#flow-control
480    pub fn set_target_window_size(&mut self, size: u32) {
481        assert!(size <= proto::MAX_WINDOW_SIZE);
482        self.connection.set_target_window_size(size);
483    }
484
485    /// Set a new `INITIAL_WINDOW_SIZE` setting (in octets) for stream-level
486    /// flow control for received data.
487    ///
488    /// The `SETTINGS` will be sent to the remote, and only applied once the
489    /// remote acknowledges the change.
490    ///
491    /// This can be used to increase or decrease the window size for existing
492    /// streams.
493    ///
494    /// # Errors
495    ///
496    /// Returns an error if a previous call is still pending acknowledgement
497    /// from the remote endpoint.
498    pub fn set_initial_window_size(&mut self, size: u32) -> Result<(), crate::Error> {
499        assert!(size <= proto::MAX_WINDOW_SIZE);
500        self.connection.set_initial_window_size(size)?;
501        Ok(())
502    }
503
504    /// Enables the [extended CONNECT protocol].
505    ///
506    /// [extended CONNECT protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
507    ///
508    /// # Errors
509    ///
510    /// Returns an error if a previous call is still pending acknowledgement
511    /// from the remote endpoint.
512    pub fn enable_connect_protocol(&mut self) -> Result<(), crate::Error> {
513        self.connection.set_enable_connect_protocol()?;
514        Ok(())
515    }
516
517    /// Returns `Ready` when the underlying connection has closed.
518    ///
519    /// If any new inbound streams are received during a call to `poll_closed`,
520    /// they will be queued and returned on the next call to [`poll_accept`].
521    ///
522    /// This function will advance the internal connection state, driving
523    /// progress on all the other handles (e.g. [`RecvStream`] and [`SendStream`]).
524    ///
525    /// See [here](index.html#managing-the-connection) for more details.
526    ///
527    /// [`poll_accept`]: struct.Connection.html#method.poll_accept
528    /// [`RecvStream`]: ../struct.RecvStream.html
529    /// [`SendStream`]: ../struct.SendStream.html
530    pub fn poll_closed(&mut self, cx: &mut Context) -> Poll<Result<(), crate::Error>> {
531        self.connection.poll(cx).map_err(Into::into)
532    }
533
534    /// Sets the connection to a GOAWAY state.
535    ///
536    /// Does not terminate the connection. Must continue being polled to close
537    /// connection.
538    ///
539    /// After flushing the GOAWAY frame, the connection is closed. Any
540    /// outstanding streams do not prevent the connection from closing. This
541    /// should usually be reserved for shutting down when something bad
542    /// external to `http2` has happened, and open streams cannot be properly
543    /// handled.
544    ///
545    /// For graceful shutdowns, see [`graceful_shutdown`](Connection::graceful_shutdown).
546    pub fn abrupt_shutdown(&mut self, reason: Reason) {
547        self.connection.go_away_from_user(reason);
548    }
549
550    /// Starts a [graceful shutdown][1] process.
551    ///
552    /// Must continue being polled to close connection.
553    ///
554    /// It's possible to receive more requests after calling this method, since
555    /// they might have been in-flight from the client already. After about
556    /// 1 RTT, no new requests should be accepted. Once all active streams
557    /// have completed, the connection is closed.
558    ///
559    /// [1]: http://httpwg.org/specs/rfc7540.html#GOAWAY
560    pub fn graceful_shutdown(&mut self) {
561        self.connection.go_away_gracefully();
562    }
563
564    /// Takes a `PingPong` instance from the connection.
565    ///
566    /// # Note
567    ///
568    /// This may only be called once. Calling multiple times will return `None`.
569    pub fn ping_pong(&mut self) -> Option<PingPong> {
570        self.connection.take_user_pings().map(PingPong::new)
571    }
572
573    /// Checks if there are any streams
574    pub fn has_streams(&self) -> bool {
575        self.connection.has_streams()
576    }
577
578    /// Returns the maximum number of concurrent streams that may be initiated
579    /// by the server on this connection.
580    ///
581    /// This limit is configured by the client peer by sending the
582    /// [`SETTINGS_MAX_CONCURRENT_STREAMS` parameter][1] in a `SETTINGS` frame.
583    /// This method returns the currently acknowledged value received from the
584    /// remote.
585    ///
586    /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2
587    pub fn max_concurrent_send_streams(&self) -> usize {
588        self.connection.max_send_streams()
589    }
590
591    /// Returns the maximum number of concurrent streams that may be initiated
592    /// by the client on this connection.
593    ///
594    /// This returns the value of the [`SETTINGS_MAX_CONCURRENT_STREAMS`
595    /// parameter][1] sent in a `SETTINGS` frame that has been
596    /// acknowledged by the remote peer. The value to be sent is configured by
597    /// the [`Builder::max_concurrent_streams`][2] method before handshaking
598    /// with the remote peer.
599    ///
600    /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2
601    /// [2]: ../struct.Builder.html#method.max_concurrent_streams
602    pub fn max_concurrent_recv_streams(&self) -> usize {
603        self.connection.max_recv_streams()
604    }
605
606    // Could disappear at anytime.
607    #[doc(hidden)]
608    #[cfg(feature = "unstable")]
609    pub fn num_wired_streams(&self) -> usize {
610        self.connection.num_wired_streams()
611    }
612}
613
/// With the `stream` feature enabled, a `Connection` can be consumed as a
/// stream of incoming requests; each item is what `poll_accept` yields.
#[cfg(feature = "stream")]
impl<T, B> futures_core::Stream for Connection<T, B>
where
    T: AsyncRead + AsyncWrite + Unpin,
    B: Buf,
{
    type Item = Result<(Request<RecvStream>, SendResponse<B>), crate::Error>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Unwrap the pin to reach the inherent `&mut self` method.
        self.get_mut().poll_accept(cx)
    }
}
626
627impl<T, B> fmt::Debug for Connection<T, B>
628where
629    T: fmt::Debug,
630    B: fmt::Debug + Buf,
631{
632    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
633        fmt.debug_struct("Connection")
634            .field("connection", &self.connection)
635            .finish()
636    }
637}
638
639// ===== impl Builder =====
640
641impl Builder {
642    /// Returns a new server builder instance initialized with default
643    /// configuration values.
644    ///
645    /// Configuration methods can be chained on the return value.
646    ///
647    /// # Examples
648    ///
649    /// ```
650    /// # use tokio::io::{AsyncRead, AsyncWrite};
651    /// # use http2::server::*;
652    /// #
653    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
654    /// # -> Handshake<T>
655    /// # {
656    /// // `server_fut` is a future representing the completion of the HTTP/2
657    /// // handshake.
658    /// let server_fut = Builder::new()
659    ///     .initial_window_size(1_000_000)
660    ///     .max_concurrent_streams(1000)
661    ///     .handshake(my_io);
662    /// # server_fut
663    /// # }
664    /// #
665    /// # pub fn main() {}
666    /// ```
667    pub fn new() -> Builder {
668        Builder {
669            reset_stream_duration: Duration::from_secs(proto::DEFAULT_RESET_STREAM_SECS),
670            reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX,
671            pending_accept_reset_stream_max: proto::DEFAULT_REMOTE_RESET_STREAM_MAX,
672            settings: Settings::default(),
673            initial_target_connection_window_size: None,
674            max_send_buffer_size: proto::DEFAULT_MAX_SEND_BUFFER_SIZE,
675
676            local_max_error_reset_streams: Some(proto::DEFAULT_LOCAL_RESET_COUNT_MAX),
677        }
678    }
679
680    /// Indicates the initial window size (in octets) for stream-level
681    /// flow control for received data.
682    ///
683    /// The initial window of a stream is used as part of flow control. For more
684    /// details, see [`FlowControl`].
685    ///
686    /// The default value is 65,535.
687    ///
688    /// [`FlowControl`]: ../struct.FlowControl.html
689    ///
690    /// # Examples
691    ///
692    /// ```
693    /// # use tokio::io::{AsyncRead, AsyncWrite};
694    /// # use http2::server::*;
695    /// #
696    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
697    /// # -> Handshake<T>
698    /// # {
699    /// // `server_fut` is a future representing the completion of the HTTP/2
700    /// // handshake.
701    /// let server_fut = Builder::new()
702    ///     .initial_window_size(1_000_000)
703    ///     .handshake(my_io);
704    /// # server_fut
705    /// # }
706    /// #
707    /// # pub fn main() {}
708    /// ```
709    pub fn initial_window_size(&mut self, size: u32) -> &mut Self {
710        self.settings.set_initial_window_size(Some(size));
711        self
712    }
713
714    /// Indicates the initial window size (in octets) for connection-level flow control
715    /// for received data.
716    ///
717    /// The initial window of a connection is used as part of flow control. For more details,
718    /// see [`FlowControl`].
719    ///
720    /// The default value is 65,535.
721    ///
722    /// [`FlowControl`]: ../struct.FlowControl.html
723    ///
724    /// # Examples
725    ///
726    /// ```
727    /// # use tokio::io::{AsyncRead, AsyncWrite};
728    /// # use http2::server::*;
729    /// #
730    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
731    /// # -> Handshake<T>
732    /// # {
733    /// // `server_fut` is a future representing the completion of the HTTP/2
734    /// // handshake.
735    /// let server_fut = Builder::new()
736    ///     .initial_connection_window_size(1_000_000)
737    ///     .handshake(my_io);
738    /// # server_fut
739    /// # }
740    /// #
741    /// # pub fn main() {}
742    /// ```
743    pub fn initial_connection_window_size(&mut self, size: u32) -> &mut Self {
744        self.initial_target_connection_window_size = Some(size);
745        self
746    }
747
748    /// Indicates the size (in octets) of the largest HTTP/2 frame payload that the
749    /// configured server is able to accept.
750    ///
751    /// The sender may send data frames that are **smaller** than this value,
752    /// but any data larger than `max` will be broken up into multiple `DATA`
753    /// frames.
754    ///
755    /// The value **must** be between 16,384 and 16,777,215. The default value is 16,384.
756    ///
757    /// # Examples
758    ///
759    /// ```
760    /// # use tokio::io::{AsyncRead, AsyncWrite};
761    /// # use http2::server::*;
762    /// #
763    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
764    /// # -> Handshake<T>
765    /// # {
766    /// // `server_fut` is a future representing the completion of the HTTP/2
767    /// // handshake.
768    /// let server_fut = Builder::new()
769    ///     .max_frame_size(1_000_000)
770    ///     .handshake(my_io);
771    /// # server_fut
772    /// # }
773    /// #
774    /// # pub fn main() {}
775    /// ```
776    ///
777    /// # Panics
778    ///
779    /// This function panics if `max` is not within the legal range specified
780    /// above.
781    pub fn max_frame_size(&mut self, max: u32) -> &mut Self {
782        self.settings.set_max_frame_size(Some(max));
783        self
784    }
785
786    /// Sets the max size of received header frames.
787    ///
788    /// This advisory setting informs a peer of the maximum size of header list
789    /// that the sender is prepared to accept, in octets. The value is based on
790    /// the uncompressed size of header fields, including the length of the name
791    /// and value in octets plus an overhead of 32 octets for each header field.
792    ///
793    /// This setting is also used to limit the maximum amount of data that is
794    /// buffered to decode HEADERS frames.
795    ///
796    /// # Examples
797    ///
798    /// ```
799    /// # use tokio::io::{AsyncRead, AsyncWrite};
800    /// # use http2::server::*;
801    /// #
802    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
803    /// # -> Handshake<T>
804    /// # {
805    /// // `server_fut` is a future representing the completion of the HTTP/2
806    /// // handshake.
807    /// let server_fut = Builder::new()
808    ///     .max_header_list_size(16 * 1024)
809    ///     .handshake(my_io);
810    /// # server_fut
811    /// # }
812    /// #
813    /// # pub fn main() {}
814    /// ```
815    pub fn max_header_list_size(&mut self, max: u32) -> &mut Self {
816        self.settings.set_max_header_list_size(Some(max));
817        self
818    }
819
820    /// Sets the maximum number of concurrent streams.
821    ///
822    /// The maximum concurrent streams setting only controls the maximum number
823    /// of streams that can be initiated by the remote peer. In other words,
824    /// when this setting is set to 100, this does not limit the number of
825    /// concurrent streams that can be created by the caller.
826    ///
827    /// It is recommended that this value be no smaller than 100, so as to not
828    /// unnecessarily limit parallelism. However, any value is legal, including
829    /// 0. If `max` is set to 0, then the remote will not be permitted to
830    /// initiate streams.
831    ///
832    /// Note that streams in the reserved state, i.e., push promises that have
833    /// been reserved but the stream has not started, do not count against this
834    /// setting.
835    ///
836    /// Also note that if the remote *does* exceed the value set here, it is not
837    /// a protocol level error. Instead, the `http2` library will immediately reset
838    /// the stream.
839    ///
840    /// See [Section 5.1.2] in the HTTP/2 spec for more details.
841    ///
842    /// [Section 5.1.2]: https://http2.github.io/http2-spec/#rfc.section.5.1.2
843    ///
844    /// # Examples
845    ///
846    /// ```
847    /// # use tokio::io::{AsyncRead, AsyncWrite};
848    /// # use http2::server::*;
849    /// #
850    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
851    /// # -> Handshake<T>
852    /// # {
853    /// // `server_fut` is a future representing the completion of the HTTP/2
854    /// // handshake.
855    /// let server_fut = Builder::new()
856    ///     .max_concurrent_streams(1000)
857    ///     .handshake(my_io);
858    /// # server_fut
859    /// # }
860    /// #
861    /// # pub fn main() {}
862    /// ```
863    pub fn max_concurrent_streams(&mut self, max: u32) -> &mut Self {
864        self.settings.set_max_concurrent_streams(Some(max));
865        self
866    }
867
868    /// Sets the maximum number of concurrent locally reset streams.
869    ///
870    /// When a stream is explicitly reset by either calling
871    /// [`SendResponse::send_reset`] or by dropping a [`SendResponse`] instance
872    /// before completing the stream, the HTTP/2 specification requires that
873    /// any further frames received for that stream must be ignored for "some
874    /// time".
875    ///
876    /// In order to satisfy the specification, internal state must be maintained
877    /// to implement the behavior. This state grows linearly with the number of
878    /// streams that are locally reset.
879    ///
880    /// The `max_concurrent_reset_streams` setting configures sets an upper
881    /// bound on the amount of state that is maintained. When this max value is
882    /// reached, the oldest reset stream is purged from memory.
883    ///
884    /// Once the stream has been fully purged from memory, any additional frames
885    /// received for that stream will result in a connection level protocol
886    /// error, forcing the connection to terminate.
887    ///
888    /// The default value is 10.
889    ///
890    /// # Examples
891    ///
892    /// ```
893    /// # use tokio::io::{AsyncRead, AsyncWrite};
894    /// # use http2::server::*;
895    /// #
896    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
897    /// # -> Handshake<T>
898    /// # {
899    /// // `server_fut` is a future representing the completion of the HTTP/2
900    /// // handshake.
901    /// let server_fut = Builder::new()
902    ///     .max_concurrent_reset_streams(1000)
903    ///     .handshake(my_io);
904    /// # server_fut
905    /// # }
906    /// #
907    /// # pub fn main() {}
908    /// ```
909    pub fn max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self {
910        self.reset_stream_max = max;
911        self
912    }
913
914    /// Sets the maximum number of local resets due to protocol errors made by the remote end.
915    ///
916    /// Invalid frames and many other protocol errors will lead to resets being generated for those streams.
917    /// Too many of these often indicate a malicious client, and there are attacks which can abuse this to DOS servers.
918    /// This limit protects against these DOS attacks by limiting the amount of resets we can be forced to generate.
919    ///
920    /// When the number of local resets exceeds this threshold, the server will issue GOAWAYs with an error code of
921    /// `ENHANCE_YOUR_CALM` to the client.
922    ///
923    /// If you really want to disable this, supply [`Option::None`] here.
924    /// Disabling this is not recommended and may expose you to DOS attacks.
925    ///
926    /// The default value is currently 1024, but could change.
927    pub fn max_local_error_reset_streams(&mut self, max: Option<usize>) -> &mut Self {
928        self.local_max_error_reset_streams = max;
929        self
930    }
931
932    /// Sets the maximum number of pending-accept remotely-reset streams.
933    ///
934    /// Streams that have been received by the peer, but not accepted by the
935    /// user, can also receive a RST_STREAM. This is a legitimate pattern: one
936    /// could send a request and then shortly after, realize it is not needed,
937    /// sending a CANCEL.
938    ///
939    /// However, since those streams are now "closed", they don't count towards
940    /// the max concurrent streams. So, they will sit in the accept queue,
941    /// using memory.
942    ///
943    /// When the number of remotely-reset streams sitting in the pending-accept
944    /// queue reaches this maximum value, a connection error with the code of
945    /// `ENHANCE_YOUR_CALM` will be sent to the peer, and returned by the
946    /// `Future`.
947    ///
948    /// The default value is currently 20, but could change.
949    ///
950    /// # Examples
951    ///
952    ///
953    /// ```
954    /// # use tokio::io::{AsyncRead, AsyncWrite};
955    /// # use http2::server::*;
956    /// #
957    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
958    /// # -> Handshake<T>
959    /// # {
960    /// // `server_fut` is a future representing the completion of the HTTP/2
961    /// // handshake.
962    /// let server_fut = Builder::new()
963    ///     .max_pending_accept_reset_streams(100)
964    ///     .handshake(my_io);
965    /// # server_fut
966    /// # }
967    /// #
968    /// # pub fn main() {}
969    /// ```
970    pub fn max_pending_accept_reset_streams(&mut self, max: usize) -> &mut Self {
971        self.pending_accept_reset_stream_max = max;
972        self
973    }
974
975    /// Sets the maximum send buffer size per stream.
976    ///
977    /// Once a stream has buffered up to (or over) the maximum, the stream's
978    /// flow control will not "poll" additional capacity. Once bytes for the
979    /// stream have been written to the connection, the send buffer capacity
980    /// will be freed up again.
981    ///
982    /// The default is currently ~400KB, but may change.
983    ///
984    /// # Panics
985    ///
986    /// This function panics if `max` is larger than `u32::MAX`.
987    pub fn max_send_buffer_size(&mut self, max: usize) -> &mut Self {
988        assert!(max <= u32::MAX as usize);
989        self.max_send_buffer_size = max;
990        self
991    }
992
993    /// Sets the maximum number of concurrent locally reset streams.
994    ///
995    /// When a stream is explicitly reset by either calling
996    /// [`SendResponse::send_reset`] or by dropping a [`SendResponse`] instance
997    /// before completing the stream, the HTTP/2 specification requires that
998    /// any further frames received for that stream must be ignored for "some
999    /// time".
1000    ///
1001    /// In order to satisfy the specification, internal state must be maintained
1002    /// to implement the behavior. This state grows linearly with the number of
1003    /// streams that are locally reset.
1004    ///
1005    /// The `reset_stream_duration` setting configures the max amount of time
1006    /// this state will be maintained in memory. Once the duration elapses, the
1007    /// stream state is purged from memory.
1008    ///
1009    /// Once the stream has been fully purged from memory, any additional frames
1010    /// received for that stream will result in a connection level protocol
1011    /// error, forcing the connection to terminate.
1012    ///
1013    /// The default value is 30 seconds.
1014    ///
1015    /// # Examples
1016    ///
1017    /// ```
1018    /// # use tokio::io::{AsyncRead, AsyncWrite};
1019    /// # use http2::server::*;
1020    /// # use std::time::Duration;
1021    /// #
1022    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1023    /// # -> Handshake<T>
1024    /// # {
1025    /// // `server_fut` is a future representing the completion of the HTTP/2
1026    /// // handshake.
1027    /// let server_fut = Builder::new()
1028    ///     .reset_stream_duration(Duration::from_secs(10))
1029    ///     .handshake(my_io);
1030    /// # server_fut
1031    /// # }
1032    /// #
1033    /// # pub fn main() {}
1034    /// ```
1035    pub fn reset_stream_duration(&mut self, dur: Duration) -> &mut Self {
1036        self.reset_stream_duration = dur;
1037        self
1038    }
1039
1040    /// Enables the [extended CONNECT protocol].
1041    ///
1042    /// [extended CONNECT protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
1043    pub fn enable_connect_protocol(&mut self) -> &mut Self {
1044        self.settings.set_enable_connect_protocol(Some(1));
1045        self
1046    }
1047
1048    /// Creates a new configured HTTP/2 server backed by `io`.
1049    ///
1050    /// It is expected that `io` already be in an appropriate state to commence
1051    /// the [HTTP/2 handshake]. See [Handshake] for more details.
1052    ///
1053    /// Returns a future which resolves to the [`Connection`] instance once the
1054    /// HTTP/2 handshake has been completed.
1055    ///
1056    /// This function also allows the caller to configure the send payload data
1057    /// type. See [Outbound data type] for more details.
1058    ///
1059    /// [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
1060    /// [Handshake]: ../index.html#handshake
1061    /// [`Connection`]: struct.Connection.html
1062    /// [Outbound data type]: ../index.html#outbound-data-type.
1063    ///
1064    /// # Examples
1065    ///
1066    /// Basic usage:
1067    ///
1068    /// ```
1069    /// # use tokio::io::{AsyncRead, AsyncWrite};
1070    /// # use http2::server::*;
1071    /// #
1072    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1073    /// # -> Handshake<T>
1074    /// # {
1075    /// // `server_fut` is a future representing the completion of the HTTP/2
1076    /// // handshake.
1077    /// let server_fut = Builder::new()
1078    ///     .handshake(my_io);
1079    /// # server_fut
1080    /// # }
1081    /// #
1082    /// # pub fn main() {}
1083    /// ```
1084    ///
1085    /// Configures the send-payload data type. In this case, the outbound data
1086    /// type will be `&'static [u8]`.
1087    ///
1088    /// ```
1089    /// # use tokio::io::{AsyncRead, AsyncWrite};
1090    /// # use http2::server::*;
1091    /// #
1092    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1093    /// # -> Handshake<T, &'static [u8]>
1094    /// # {
1095    /// // `server_fut` is a future representing the completion of the HTTP/2
1096    /// // handshake.
1097    /// let server_fut: Handshake<_, &'static [u8]> = Builder::new()
1098    ///     .handshake(my_io);
1099    /// # server_fut
1100    /// # }
1101    /// #
1102    /// # pub fn main() {}
1103    /// ```
1104    pub fn handshake<T, B>(&self, io: T) -> Handshake<T, B>
1105    where
1106        T: AsyncRead + AsyncWrite + Unpin,
1107        B: Buf,
1108    {
1109        Connection::handshake2(io, self.clone())
1110    }
1111}
1112
1113impl Default for Builder {
1114    fn default() -> Builder {
1115        Builder::new()
1116    }
1117}
1118
1119// ===== impl SendResponse =====
1120
1121impl<B: Buf> SendResponse<B> {
1122    /// Send a response to a client request.
1123    ///
1124    /// On success, a [`SendStream`] instance is returned. This instance can be
1125    /// used to stream the response body and send trailers.
1126    ///
1127    /// If a body or trailers will be sent on the returned [`SendStream`]
1128    /// instance, then `end_of_stream` must be set to `false` when calling this
1129    /// function.
1130    ///
1131    /// The [`SendResponse`] instance is already associated with a received
1132    /// request.  This function may only be called once per instance and only if
1133    /// [`send_reset`] has not been previously called.
1134    ///
1135    /// [`SendResponse`]: #
1136    /// [`SendStream`]: ../struct.SendStream.html
1137    /// [`send_reset`]: #method.send_reset
1138    pub fn send_response(
1139        &mut self,
1140        response: Response<()>,
1141        end_of_stream: bool,
1142    ) -> Result<SendStream<B>, crate::Error> {
1143        self.inner
1144            .send_response(response, end_of_stream)
1145            .map(|_| SendStream::new(self.inner.clone()))
1146            .map_err(Into::into)
1147    }
1148
1149    /// Push a request and response to the client
1150    ///
1151    /// On success, a [`SendResponse`] instance is returned.
1152    ///
1153    /// [`SendResponse`]: #
1154    pub fn push_request(
1155        &mut self,
1156        request: Request<()>,
1157    ) -> Result<SendPushedResponse<B>, crate::Error> {
1158        self.inner
1159            .send_push_promise(request)
1160            .map(|inner| SendPushedResponse {
1161                inner: SendResponse { inner },
1162            })
1163            .map_err(Into::into)
1164    }
1165
1166    /// Send a stream reset to the peer.
1167    ///
1168    /// This essentially cancels the stream, including any inbound or outbound
1169    /// data streams.
1170    ///
1171    /// If this function is called before [`send_response`], a call to
1172    /// [`send_response`] will result in an error.
1173    ///
1174    /// If this function is called while a [`SendStream`] instance is active,
1175    /// any further use of the instance will result in an error.
1176    ///
1177    /// This function should only be called once.
1178    ///
1179    /// [`send_response`]: #method.send_response
1180    /// [`SendStream`]: ../struct.SendStream.html
1181    pub fn send_reset(&mut self, reason: Reason) {
1182        self.inner.send_reset(reason)
1183    }
1184
1185    /// Polls to be notified when the client resets this stream.
1186    ///
1187    /// If stream is still open, this returns `Poll::Pending`, and
1188    /// registers the task to be notified if a `RST_STREAM` is received.
1189    ///
1190    /// If a `RST_STREAM` frame is received for this stream, calling this
1191    /// method will yield the `Reason` for the reset.
1192    ///
1193    /// # Error
1194    ///
1195    /// Calling this method after having called `send_response` will return
1196    /// a user error.
1197    pub fn poll_reset(&mut self, cx: &mut Context) -> Poll<Result<Reason, crate::Error>> {
1198        self.inner.poll_reset(cx, proto::PollReset::AwaitingHeaders)
1199    }
1200
1201    /// Returns the stream ID of the response stream.
1202    ///
1203    /// # Panics
1204    ///
1205    /// If the lock on the stream store has been poisoned.
1206    pub fn stream_id(&self) -> crate::StreamId {
1207        crate::StreamId::from_internal(self.inner.stream_id())
1208    }
1209}
1210
1211// ===== impl SendPushedResponse =====
1212
1213impl<B: Buf> SendPushedResponse<B> {
1214    /// Send a response to a promised request.
1215    ///
1216    /// On success, a [`SendStream`] instance is returned. This instance can be
1217    /// used to stream the response body and send trailers.
1218    ///
1219    /// If a body or trailers will be sent on the returned [`SendStream`]
1220    /// instance, then `end_of_stream` must be set to `false` when calling this
1221    /// function.
1222    ///
1223    /// The [`SendPushedResponse`] instance is associated with a promised
1224    /// request.  This function may only be called once per instance and only if
1225    /// [`send_reset`] has not been previously called.
1226    ///
1227    /// [`SendPushedResponse`]: #
1228    /// [`SendStream`]: ../struct.SendStream.html
1229    /// [`send_reset`]: #method.send_reset
1230    pub fn send_response(
1231        &mut self,
1232        response: Response<()>,
1233        end_of_stream: bool,
1234    ) -> Result<SendStream<B>, crate::Error> {
1235        self.inner.send_response(response, end_of_stream)
1236    }
1237
1238    /// Send a stream reset to the peer.
1239    ///
1240    /// This essentially cancels the stream, including any inbound or outbound
1241    /// data streams.
1242    ///
1243    /// If this function is called before [`send_response`], a call to
1244    /// [`send_response`] will result in an error.
1245    ///
1246    /// If this function is called while a [`SendStream`] instance is active,
1247    /// any further use of the instance will result in an error.
1248    ///
1249    /// This function should only be called once.
1250    ///
1251    /// [`send_response`]: #method.send_response
1252    /// [`SendStream`]: ../struct.SendStream.html
1253    pub fn send_reset(&mut self, reason: Reason) {
1254        self.inner.send_reset(reason)
1255    }
1256
1257    /// Polls to be notified when the client resets this stream.
1258    ///
1259    /// If stream is still open, this returns `Poll::Pending`, and
1260    /// registers the task to be notified if a `RST_STREAM` is received.
1261    ///
1262    /// If a `RST_STREAM` frame is received for this stream, calling this
1263    /// method will yield the `Reason` for the reset.
1264    ///
1265    /// # Error
1266    ///
1267    /// Calling this method after having called `send_response` will return
1268    /// a user error.
1269    pub fn poll_reset(&mut self, cx: &mut Context) -> Poll<Result<Reason, crate::Error>> {
1270        self.inner.poll_reset(cx)
1271    }
1272
1273    /// Returns the stream ID of the response stream.
1274    ///
1275    /// # Panics
1276    ///
1277    /// If the lock on the stream store has been poisoned.
1278    pub fn stream_id(&self) -> crate::StreamId {
1279        self.inner.stream_id()
1280    }
1281}
1282
1283// ===== impl Flush =====
1284
1285impl<T, B: Buf> Flush<T, B> {
1286    fn new(codec: Codec<T, B>) -> Self {
1287        Flush { codec: Some(codec) }
1288    }
1289}
1290
1291impl<T, B> Future for Flush<T, B>
1292where
1293    T: AsyncWrite + Unpin,
1294    B: Buf,
1295{
1296    type Output = Result<Codec<T, B>, crate::Error>;
1297
1298    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1299        // Flush the codec
1300        ready!(self.codec.as_mut().unwrap().flush(cx)).map_err(crate::Error::from_io)?;
1301
1302        // Return the codec
1303        Poll::Ready(Ok(self.codec.take().unwrap()))
1304    }
1305}
1306
1307impl<T, B: Buf> ReadPreface<T, B> {
1308    fn new(codec: Codec<T, B>) -> Self {
1309        ReadPreface {
1310            codec: Some(codec),
1311            pos: 0,
1312        }
1313    }
1314
1315    fn inner_mut(&mut self) -> &mut T {
1316        self.codec.as_mut().unwrap().get_mut()
1317    }
1318}
1319
1320impl<T, B> Future for ReadPreface<T, B>
1321where
1322    T: AsyncRead + Unpin,
1323    B: Buf,
1324{
1325    type Output = Result<Codec<T, B>, crate::Error>;
1326
1327    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1328        let mut buf = [0; 24];
1329        let mut rem = PREFACE.len() - self.pos;
1330
1331        while rem > 0 {
1332            let mut buf = ReadBuf::new(&mut buf[..rem]);
1333            ready!(Pin::new(self.inner_mut()).poll_read(cx, &mut buf))
1334                .map_err(crate::Error::from_io)?;
1335            let n = buf.filled().len();
1336            if n == 0 {
1337                return Poll::Ready(Err(crate::Error::from_io(io::Error::new(
1338                    io::ErrorKind::UnexpectedEof,
1339                    "connection closed before reading preface",
1340                ))));
1341            }
1342
1343            if &PREFACE[self.pos..self.pos + n] != buf.filled() {
1344                proto_err!(conn: "read_preface: invalid preface");
1345                // TODO: Should this just write the GO_AWAY frame directly?
1346                return Poll::Ready(Err(Error::library_go_away(Reason::PROTOCOL_ERROR).into()));
1347            }
1348
1349            self.pos += n;
1350            rem -= n; // TODO test
1351        }
1352
1353        Poll::Ready(Ok(self.codec.take().unwrap()))
1354    }
1355}
1356
1357// ===== impl Handshake =====
1358
1359impl<T, B: Buf> Future for Handshake<T, B>
1360where
1361    T: AsyncRead + AsyncWrite + Unpin,
1362    B: Buf,
1363{
1364    type Output = Result<Connection<T, B>, crate::Error>;
1365
1366    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1367        #[cfg(feature = "tracing")]
1368        let _span = self.span.clone().entered(); // XXX(eliza): T_T
1369        tracing::trace!(state = ?self.state);
1370
1371        loop {
1372            match &mut self.state {
1373                Handshaking::Flushing(flush) => {
1374                    // We're currently flushing a pending SETTINGS frame. Poll the
1375                    // flush future, and, if it's completed, advance our state to wait
1376                    // for the client preface.
1377                    let codec = match Pin::new(flush).poll(cx)? {
1378                        Poll::Pending => {
1379                            tracing::trace!(flush.poll = %"Pending");
1380                            return Poll::Pending;
1381                        }
1382                        Poll::Ready(flushed) => {
1383                            tracing::trace!(flush.poll = %"Ready");
1384                            flushed
1385                        }
1386                    };
1387                    self.state = Handshaking::ReadingPreface(
1388                        #[cfg(feature = "tracing")]
1389                        ReadPreface::new(codec).instrument(::tracing::trace_span!("read_preface")),
1390                        #[cfg(not(feature = "tracing"))]
1391                        ReadPreface::new(codec),
1392                    );
1393                }
1394                Handshaking::ReadingPreface(read) => {
1395                    let codec = ready!(Pin::new(read).poll(cx)?);
1396
1397                    self.state = Handshaking::Done;
1398
1399                    let connection = proto::Connection::new(
1400                        codec,
1401                        Config {
1402                            next_stream_id: 2.into(),
1403                            // Server does not need to locally initiate any streams
1404                            initial_max_send_streams: 0,
1405                            max_send_buffer_size: self.builder.max_send_buffer_size,
1406                            reset_stream_duration: self.builder.reset_stream_duration,
1407                            reset_stream_max: self.builder.reset_stream_max,
1408                            remote_reset_stream_max: self.builder.pending_accept_reset_stream_max,
1409                            local_error_reset_streams_max: self
1410                                .builder
1411                                .local_max_error_reset_streams,
1412                            settings: self.builder.settings.clone(),
1413                            headers_stream_dependency: None,
1414                            headers_pseudo_order: None,
1415                            priorities: None,
1416                        },
1417                    );
1418
1419                    tracing::trace!("connection established!");
1420                    let mut c = Connection { connection };
1421                    if let Some(sz) = self.builder.initial_target_connection_window_size {
1422                        c.set_target_window_size(sz);
1423                    }
1424
1425                    return Poll::Ready(Ok(c));
1426                }
1427                Handshaking::Done => {
1428                    panic!("Handshaking::poll() called again after handshaking was complete")
1429                }
1430            }
1431        }
1432    }
1433}
1434
1435impl<T, B> fmt::Debug for Handshake<T, B>
1436where
1437    T: AsyncRead + AsyncWrite + fmt::Debug,
1438    B: fmt::Debug + Buf,
1439{
1440    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
1441        write!(fmt, "server::Handshake")
1442    }
1443}
1444
1445impl Peer {
1446    pub fn convert_send_message(
1447        id: StreamId,
1448        response: Response<()>,
1449        end_of_stream: bool,
1450    ) -> frame::Headers {
1451        use http::response::Parts;
1452
1453        // Extract the components of the HTTP request
1454        let (
1455            Parts {
1456                status, headers, ..
1457            },
1458            _,
1459        ) = response.into_parts();
1460
1461        // Build the set pseudo header set. All requests will include `method`
1462        // and `path`.
1463        let pseudo = Pseudo::response(status);
1464
1465        // Create the HEADERS frame
1466        let mut frame = frame::Headers::new(id, pseudo, headers);
1467
1468        if end_of_stream {
1469            frame.set_end_stream()
1470        }
1471
1472        frame
1473    }
1474
1475    pub fn convert_push_message(
1476        stream_id: StreamId,
1477        promised_id: StreamId,
1478        request: Request<()>,
1479    ) -> Result<frame::PushPromise, UserError> {
1480        use http::request::Parts;
1481
1482        if let Err(e) = frame::PushPromise::validate_request(&request) {
1483            use PushPromiseHeaderError::*;
1484            match e {
1485                NotSafeAndCacheable => tracing::debug!(
1486                    ?promised_id,
1487                    "convert_push_message: method {} is not safe and cacheable",
1488                    request.method(),
1489                ),
1490                InvalidContentLength(_e) => tracing::debug!(
1491                    ?promised_id,
1492                    "convert_push_message; promised request has invalid content-length {:?}",
1493                    _e,
1494                ),
1495            }
1496            return Err(UserError::MalformedHeaders);
1497        }
1498
1499        // Extract the components of the HTTP request
1500        let (
1501            Parts {
1502                method,
1503                uri,
1504                headers,
1505                ..
1506            },
1507            _,
1508        ) = request.into_parts();
1509
1510        let pseudo = Pseudo::request(method, uri, None);
1511
1512        Ok(frame::PushPromise::new(
1513            stream_id,
1514            promised_id,
1515            pseudo,
1516            headers,
1517        ))
1518    }
1519}
1520
1521impl proto::Peer for Peer {
1522    type Poll = Request<()>;
1523
1524    #[cfg(feature = "tracing")]
1525    const NAME: &'static str = "Server";
1526
1527    /*
1528    fn is_server() -> bool {
1529        true
1530    }
1531    */
1532
1533    fn r#dyn() -> proto::DynPeer {
1534        proto::DynPeer::Server
1535    }
1536
1537    fn convert_poll_message(
1538        pseudo: Pseudo,
1539        fields: HeaderMap,
1540        stream_id: StreamId,
1541    ) -> Result<Self::Poll, Error> {
1542        use http::{uri, Version};
1543
1544        let mut b = Request::builder();
1545
1546        macro_rules! malformed {
1547            ($($arg:tt)*) => {{
1548                tracing::debug!($($arg)*);
1549                return Err(Error::library_reset(stream_id, Reason::PROTOCOL_ERROR));
1550            }}
1551        }
1552
1553        b = b.version(Version::HTTP_2);
1554
1555        let is_connect;
1556        if let Some(method) = pseudo.method {
1557            is_connect = method == Method::CONNECT;
1558            b = b.method(method);
1559        } else {
1560            malformed!("malformed headers: missing method");
1561        }
1562
1563        let has_protocol = pseudo.protocol.is_some();
1564        if has_protocol {
1565            if is_connect {
1566                // Assert that we have the right type.
1567                b = b.extension::<crate::ext::Protocol>(pseudo.protocol.unwrap());
1568            } else {
1569                malformed!("malformed headers: :protocol on non-CONNECT request");
1570            }
1571        }
1572
1573        if pseudo.status.is_some() {
1574            malformed!("malformed headers: :status field on request");
1575        }
1576
1577        // Convert the URI
1578        let mut parts = uri::Parts::default();
1579
1580        // A request translated from HTTP/1 must not include the :authority
1581        // header
1582        if let Some(authority) = pseudo.authority {
1583            let maybe_authority = uri::Authority::from_maybe_shared(authority.clone().into_inner());
1584            parts.authority = Some(maybe_authority.or_else(|_why| {
1585                malformed!(
1586                    "malformed headers: malformed authority ({:?}): {}",
1587                    authority,
1588                    _why,
1589                )
1590            })?);
1591        }
1592
1593        // A :scheme is required, except CONNECT.
1594        if let Some(scheme) = pseudo.scheme {
1595            if is_connect && !has_protocol {
1596                malformed!("malformed headers: :scheme in CONNECT");
1597            }
1598            let maybe_scheme = scheme.parse();
1599            let scheme = maybe_scheme.or_else(|_why| {
1600                malformed!(
1601                    "malformed headers: malformed scheme ({:?}): {}",
1602                    scheme,
1603                    _why,
1604                )
1605            })?;
1606
1607            // It's not possible to build an `Uri` from a scheme and path. So,
1608            // after validating is was a valid scheme, we just have to drop it
1609            // if there isn't an :authority.
1610            if parts.authority.is_some() {
1611                parts.scheme = Some(scheme);
1612            }
1613        } else if !is_connect || has_protocol {
1614            malformed!("malformed headers: missing scheme");
1615        }
1616
1617        if let Some(path) = pseudo.path {
1618            if is_connect && !has_protocol {
1619                malformed!("malformed headers: :path in CONNECT");
1620            }
1621
1622            // This cannot be empty
1623            if path.is_empty() {
1624                malformed!("malformed headers: missing path");
1625            }
1626
1627            let maybe_path = uri::PathAndQuery::from_maybe_shared(path.clone().into_inner());
1628            parts.path_and_query = Some(maybe_path.or_else(|_why| {
1629                malformed!("malformed headers: malformed path ({:?}): {}", path, _why,)
1630            })?);
1631        } else if is_connect && has_protocol {
1632            malformed!("malformed headers: missing path in extended CONNECT");
1633        }
1634
1635        b = b.uri(parts);
1636
1637        let mut request = match b.body(()) {
1638            Ok(request) => request,
1639            Err(_e) => {
1640                // TODO: Should there be more specialized handling for different
1641                // kinds of errors
1642                proto_err!(stream: "error building request: {}; stream={:?}", _e, stream_id);
1643                return Err(Error::library_reset(stream_id, Reason::PROTOCOL_ERROR));
1644            }
1645        };
1646
1647        *request.headers_mut() = fields;
1648
1649        Ok(request)
1650    }
1651}
1652
1653// ===== impl Handshaking =====
1654
1655impl<T, B> fmt::Debug for Handshaking<T, B>
1656where
1657    B: Buf,
1658{
1659    #[inline]
1660    fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
1661        match *self {
1662            Handshaking::Flushing(_) => f.write_str("Flushing(_)"),
1663            Handshaking::ReadingPreface(_) => f.write_str("ReadingPreface(_)"),
1664            Handshaking::Done => f.write_str("Done"),
1665        }
1666    }
1667}