hreq_h2/
server.rs

1//! Server implementation of the HTTP/2.0 protocol.
2//!
3//! # Getting started
4//!
5//! Running an HTTP/2.0 server requires the caller to manage accepting the
6//! connections as well as getting the connections to a state that is ready to
7//! begin the HTTP/2.0 handshake. See [here](../index.html#handshake) for more
8//! details.
9//!
10//! This could be as basic as using Tokio's [`TcpListener`] to accept
11//! connections, but usually it means using either ALPN or HTTP/1.1 protocol
12//! upgrades.
13//!
14//! Once a connection is obtained, it is passed to [`handshake`],
15//! which will begin the [HTTP/2.0 handshake]. This returns a future that
16//! completes once the handshake process is performed and HTTP/2.0 streams may
17//! be received.
18//!
19//! [`handshake`] uses default configuration values. There are a number of
20//! settings that can be changed by using [`Builder`] instead.
21//!
22//! # Inbound streams
23//!
//! The [`Connection`] instance is used to accept inbound HTTP/2.0 streams via
//! [`Connection::accept`]. When the `stream` feature is enabled it also
//! implements [`futures::Stream`]. When a new stream is received, a call to
//! [`Connection::accept`] will return `(request, response)`.
27//! The `request` handle (of type [`http::Request<RecvStream>`]) contains the
28//! HTTP request head as well as provides a way to receive the inbound data
29//! stream and the trailers. The `response` handle (of type [`SendResponse`])
30//! allows responding to the request, stream the response payload, send
31//! trailers, and send push promises.
32//!
33//! The send ([`SendStream`]) and receive ([`RecvStream`]) halves of the stream
34//! can be operated independently.
35//!
36//! # Managing the connection
37//!
//! The [`Connection`] instance is used to manage connection state. The caller
//! is required to call either [`Connection::accept`] or
//! [`Connection::poll_closed`] in order to advance the connection state. Simply
//! operating on [`SendStream`] or [`RecvStream`] will have no effect unless the
//! connection state is advanced.
43//!
//! It is not required to call **both** [`Connection::accept`] and
//! [`Connection::poll_closed`]. If the caller is ready to accept a new stream,
//! then only [`Connection::accept`] should be called. When the caller **does
//! not** want to accept a new stream, [`Connection::poll_closed`] should be
//! called.
49//!
//! The [`Connection`] instance should only be dropped once
//! [`Connection::poll_closed`] returns `Ready`. Once [`Connection::accept`]
//! returns `Ready(None)`, there will no longer be any more inbound streams. At
//! this point, only [`Connection::poll_closed`] should be called.
54//!
55//! # Shutting down the server
56//!
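//! A connection can be shut down gracefully with
//! [`Connection::graceful_shutdown`](struct.Connection.html#method.graceful_shutdown)
//! or immediately with
//! [`Connection::abrupt_shutdown`](struct.Connection.html#method.abrupt_shutdown).
//! In both cases the connection must continue to be polled until it closes.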
59//!
60//! # Example
61//!
62//! A basic HTTP/2.0 server example that runs over TCP and assumes [prior
63//! knowledge], i.e. both the client and the server assume that the TCP socket
64//! will use the HTTP/2.0 protocol without prior negotiation.
65//!
66//! ```no_run
67//! use h2::server;
68//! use http::{Response, StatusCode};
69//! use tokio::net::TcpListener;
70//!
71//! #[tokio::main]
72//! pub async fn main() {
73//!     let mut listener = TcpListener::bind("127.0.0.1:5928").await.unwrap();
74//!
75//!     // Accept all incoming TCP connections.
76//!     loop {
77//!         if let Ok((socket, _peer_addr)) = listener.accept().await {
78//!             // Spawn a new task to process each connection.
79//!             tokio::spawn(async {
80//!                 // Start the HTTP/2.0 connection handshake
81//!                 let mut h2 = server::handshake(socket).await.unwrap();
82//!                 // Accept all inbound HTTP/2.0 streams sent over the
83//!                 // connection.
84//!                 while let Some(request) = h2.accept().await {
85//!                     let (request, mut respond) = request.unwrap();
86//!                     println!("Received request: {:?}", request);
87//!
88//!                     // Build a response with no body
89//!                     let response = Response::builder()
90//!                         .status(StatusCode::OK)
91//!                         .body(())
92//!                         .unwrap();
93//!
94//!                     // Send the response back to the client
95//!                     respond.send_response(response, true)
96//!                         .unwrap();
97//!                 }
98//!
99//!             });
100//!         }
101//!     }
102//! }
103//! ```
104//!
105//! [prior knowledge]: http://httpwg.org/specs/rfc7540.html#known-http
106//! [`handshake`]: fn.handshake.html
107//! [HTTP/2.0 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
108//! [`Builder`]: struct.Builder.html
109//! [`Connection`]: struct.Connection.html
110//! [`Connection::poll`]: struct.Connection.html#method.poll
111//! [`Connection::poll_close`]: struct.Connection.html#method.poll_close
112//! [`futures::Stream`]: https://docs.rs/futures/0.1/futures/stream/trait.Stream.html
113//! [`http::Request<RecvStream>`]: ../struct.RecvStream.html
114//! [`RecvStream`]: ../struct.RecvStream.html
115//! [`SendStream`]: ../struct.SendStream.html
116//! [`TcpListener`]: https://docs.rs/tokio-core/0.1/tokio_core/net/struct.TcpListener.html
117
118use crate::codec::{Codec, RecvError, UserError};
119use crate::frame::{self, Pseudo, PushPromiseHeaderError, Reason, Settings, StreamId};
120use crate::proto::{self, Config, Prioritized};
121use crate::{FlowControl, PingPong, RecvStream, SendStream};
122
123use bytes::{Buf, Bytes};
124use futures_io::{AsyncRead, AsyncWrite};
125use http::{HeaderMap, Method, Request, Response};
126use std::future::Future;
127use std::pin::Pin;
128use std::task::{Context, Poll};
129use std::time::Duration;
130use std::{convert, fmt, io, mem};
131
132/// In progress HTTP/2.0 connection handshake future.
133///
134/// This type implements `Future`, yielding a `Connection` instance once the
135/// handshake has completed.
136///
137/// The handshake is completed once the connection preface is fully received
138/// from the client **and** the initial settings frame is sent to the client.
139///
140/// The handshake future does not wait for the initial settings frame from the
141/// client.
142///
143/// See [module] level docs for more details.
144///
145/// [module]: index.html
146#[must_use = "futures do nothing unless polled"]
147pub struct Handshake<T, B: Buf = Bytes> {
148    /// The config to pass to Connection::new after handshake succeeds.
149    builder: Builder,
150    /// The current state of the handshake.
151    state: Handshaking<T, B>,
152}
153
154/// Accepts inbound HTTP/2.0 streams on a connection.
155///
156/// A `Connection` is backed by an I/O resource (usually a TCP socket) and
157/// implements the HTTP/2.0 server logic for that connection. It is responsible
158/// for receiving inbound streams initiated by the client as well as driving the
159/// internal state forward.
160///
161/// `Connection` values are created by calling [`handshake`]. Once a
162/// `Connection` value is obtained, the caller must call [`poll`] or
163/// [`poll_close`] in order to drive the internal connection state forward.
164///
165/// See [module level] documentation for more details
166///
167/// [module level]: index.html
168/// [`handshake`]: struct.Connection.html#method.handshake
169/// [`poll`]: struct.Connection.html#method.poll
170/// [`poll_close`]: struct.Connection.html#method.poll_close
171///
172/// # Examples
173///
174/// ```
175/// # use futures_io::{AsyncRead, AsyncWrite};
176/// # use h2::server;
177/// # use h2::server::*;
178/// #
179/// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) {
180/// let mut server = server::handshake(my_io).await.unwrap();
181/// while let Some(request) = server.accept().await {
182///     let (request, respond) = request.unwrap();
183///     // Process the request and send the response back to the client
184///     // using `respond`.
185/// }
186/// # }
187/// #
188/// # pub fn main() {}
189/// ```
190#[must_use = "streams do nothing unless polled"]
191pub struct Connection<T, B: Buf> {
192    connection: proto::Connection<T, Peer, B>,
193}
194
/// Builds server connections with custom configuration values.
///
/// Configuration methods return `&mut Self`, so they can be chained.
///
/// The server is constructed by calling [`handshake`] and passing the I/O
/// handle that will back the HTTP/2.0 server.
///
/// New instances of `Builder` are obtained via [`Builder::new`].
///
/// See function level documentation for details on the various server
/// configuration settings.
///
/// [`Builder::new`]: struct.Builder.html#method.new
/// [`handshake`]: struct.Builder.html#method.handshake
///
/// # Examples
///
/// ```
/// # use futures_io::{AsyncRead, AsyncWrite};
/// # use h2::server::*;
/// #
/// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
/// # -> Handshake<T>
/// # {
/// // `server_fut` is a future representing the completion of the HTTP/2.0
/// // handshake.
/// let server_fut = Builder::new()
///     .initial_window_size(1_000_000)
///     .max_concurrent_streams(1000)
///     .handshake(my_io);
/// # server_fut
/// # }
/// #
/// # pub fn main() {}
/// ```
#[derive(Clone, Debug)]
pub struct Builder {
    /// Time to keep locally reset streams around before reaping.
    /// Set through `Builder::reset_stream_duration`.
    reset_stream_duration: Duration,

    /// Maximum number of locally reset streams to keep at a time.
    /// Set through `Builder::max_concurrent_reset_streams`.
    reset_stream_max: usize,

    /// Initial `Settings` frame to send as part of the handshake.
    /// The window-size, frame-size, header-list-size and concurrent-stream
    /// setters all write into this value.
    settings: Settings,

    /// Initial target window size for new connections.
    /// `None` until `Builder::initial_connection_window_size` is called.
    initial_target_connection_window_size: Option<u32>,
}
244
245/// Send a response back to the client
246///
247/// A `SendResponse` instance is provided when receiving a request and is used
248/// to send the associated response back to the client. It is also used to
249/// explicitly reset the stream with a custom reason.
250///
251/// It will also be used to initiate push promises linked with the associated
252/// stream.
253///
254/// If the `SendResponse` instance is dropped without sending a response, then
255/// the HTTP/2.0 stream will be reset.
256///
257/// See [module] level docs for more details.
258///
259/// [module]: index.html
260#[derive(Debug)]
261pub struct SendResponse<B: Buf> {
262    inner: proto::StreamRef<B>,
263}
264
265/// Send a response to a promised request
266///
267/// A `SendPushedResponse` instance is provided when promising a request and is used
268/// to send the associated response to the client. It is also used to
269/// explicitly reset the stream with a custom reason.
270///
271/// It can not be used to initiate push promises.
272///
273/// If the `SendPushedResponse` instance is dropped without sending a response, then
274/// the HTTP/2.0 stream will be reset.
275///
276/// See [module] level docs for more details.
277///
278/// [module]: index.html
279pub struct SendPushedResponse<B: Buf> {
280    inner: SendResponse<B>,
281}
282
283// Manual implementation necessary because of rust-lang/rust#26925
284impl<B: Buf + fmt::Debug> fmt::Debug for SendPushedResponse<B> {
285    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
286        write!(f, "SendPushedResponse {{ {:?} }}", self.inner)
287    }
288}
289
290/// Stages of an in-progress handshake.
291enum Handshaking<T, B: Buf> {
292    /// State 1. Connection is flushing pending SETTINGS frame.
293    Flushing(Flush<T, Prioritized<B>>),
294    /// State 2. Connection is waiting for the client preface.
295    ReadingPreface(ReadPreface<T, Prioritized<B>>),
296    /// Dummy state for `mem::replace`.
297    Empty,
298}
299
/// Flush a Sink
///
/// Payload of `Handshaking::Flushing`, the stage in which the pending
/// SETTINGS frame is flushed.
struct Flush<T, B> {
    // NOTE(review): presumably `None` once the codec has been handed on to the
    // next handshake stage; confirm against the `Future` impl.
    codec: Option<Codec<T, B>>,
}
304
/// Read the client connection preface
///
/// Payload of `Handshaking::ReadingPreface`, the stage in which the
/// connection waits for the client preface.
struct ReadPreface<T, B> {
    // NOTE(review): presumably `None` once the codec has been handed on after
    // the preface was read; confirm against the `Future` impl.
    codec: Option<Codec<T, B>>,
    // NOTE(review): looks like the number of `PREFACE` bytes consumed so far;
    // confirm against the `Future` impl.
    pos: usize,
}
310
/// Unit marker type used as the peer parameter of `proto::Connection` in this
/// module's `Connection`.
#[derive(Debug)]
pub(crate) struct Peer;
313
/// The 24-octet client connection preface (RFC 7540, section 3.5).
// NOTE(review): presumably the sequence `ReadPreface` compares incoming bytes
// against; the comparison itself is outside this view.
const PREFACE: [u8; 24] = *b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
315
316/// Creates a new configured HTTP/2.0 server with default configuration
317/// values backed by `io`.
318///
319/// It is expected that `io` already be in an appropriate state to commence
320/// the [HTTP/2.0 handshake]. See [Handshake] for more details.
321///
322/// Returns a future which resolves to the [`Connection`] instance once the
323/// HTTP/2.0 handshake has been completed. The returned [`Connection`]
324/// instance will be using default configuration values. Use [`Builder`] to
325/// customize the configuration values used by a [`Connection`] instance.
326///
327/// [HTTP/2.0 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
328/// [Handshake]: ../index.html#handshake
329/// [`Connection`]: struct.Connection.html
330///
331/// # Examples
332///
333/// ```
334/// # use futures_io::{AsyncRead, AsyncWrite};
335/// # use h2::server;
336/// # use h2::server::*;
337/// #
338/// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
339/// # {
340/// let connection = server::handshake(my_io).await.unwrap();
341/// // The HTTP/2.0 handshake has completed, now use `connection` to
342/// // accept inbound HTTP/2.0 streams.
343/// # }
344/// #
345/// # pub fn main() {}
346/// ```
347pub fn handshake<T>(io: T) -> Handshake<T, Bytes>
348where
349    T: AsyncRead + AsyncWrite + Unpin,
350{
351    Builder::new().handshake(io)
352}
353
354// ===== impl Connection =====
355
356impl<T, B> Connection<T, B>
357where
358    T: AsyncRead + AsyncWrite + Unpin,
359    B: Buf + 'static,
360{
361    fn handshake2(io: T, builder: Builder) -> Handshake<T, B> {
362        // Create the codec.
363        let mut codec = Codec::new(io);
364
365        if let Some(max) = builder.settings.max_frame_size() {
366            codec.set_max_recv_frame_size(max as usize);
367        }
368
369        if let Some(max) = builder.settings.max_header_list_size() {
370            codec.set_max_recv_header_list_size(max as usize);
371        }
372
373        // Send initial settings frame.
374        codec
375            .buffer(builder.settings.clone().into())
376            .expect("invalid SETTINGS frame");
377
378        // Create the handshake future.
379        let state = Handshaking::from(codec);
380
381        Handshake { builder, state }
382    }
383
384    /// Accept the next incoming request on this connection.
385    pub async fn accept(
386        &mut self,
387    ) -> Option<Result<(Request<RecvStream>, SendResponse<B>), crate::Error>> {
388        futures_util::future::poll_fn(move |cx| self.poll_accept(cx)).await
389    }
390
391    #[doc(hidden)]
392    pub fn poll_accept(
393        &mut self,
394        cx: &mut Context<'_>,
395    ) -> Poll<Option<Result<(Request<RecvStream>, SendResponse<B>), crate::Error>>> {
396        // Always try to advance the internal state. Getting Pending also is
397        // needed to allow this function to return Pending.
398        if let Poll::Ready(_) = self.poll_closed(cx)? {
399            // If the socket is closed, don't return anything
400            // TODO: drop any pending streams
401            return Poll::Ready(None);
402        }
403
404        if let Some(inner) = self.connection.next_incoming() {
405            log::trace!("received incoming");
406            let (head, _) = inner.take_request().into_parts();
407            let body = RecvStream::new(FlowControl::new(inner.clone_to_opaque()));
408
409            let request = Request::from_parts(head, body);
410            let respond = SendResponse { inner };
411
412            return Poll::Ready(Some(Ok((request, respond))));
413        }
414
415        Poll::Pending
416    }
417
418    /// Sets the target window size for the whole connection.
419    ///
420    /// If `size` is greater than the current value, then a `WINDOW_UPDATE`
421    /// frame will be immediately sent to the remote, increasing the connection
422    /// level window by `size - current_value`.
423    ///
424    /// If `size` is less than the current value, nothing will happen
425    /// immediately. However, as window capacity is released by
426    /// [`FlowControl`] instances, no `WINDOW_UPDATE` frames will be sent
427    /// out until the number of "in flight" bytes drops below `size`.
428    ///
429    /// The default value is 65,535.
430    ///
431    /// See [`FlowControl`] documentation for more details.
432    ///
433    /// [`FlowControl`]: ../struct.FlowControl.html
434    /// [library level]: ../index.html#flow-control
435    pub fn set_target_window_size(&mut self, size: u32) {
436        assert!(size <= proto::MAX_WINDOW_SIZE);
437        self.connection.set_target_window_size(size);
438    }
439
440    /// Set a new `INITIAL_WINDOW_SIZE` setting (in octets) for stream-level
441    /// flow control for received data.
442    ///
443    /// The `SETTINGS` will be sent to the remote, and only applied once the
444    /// remote acknowledges the change.
445    ///
446    /// This can be used to increase or decrease the window size for existing
447    /// streams.
448    ///
449    /// # Errors
450    ///
451    /// Returns an error if a previous call is still pending acknowledgement
452    /// from the remote endpoint.
453    pub fn set_initial_window_size(&mut self, size: u32) -> Result<(), crate::Error> {
454        assert!(size <= proto::MAX_WINDOW_SIZE);
455        self.connection.set_initial_window_size(size)?;
456        Ok(())
457    }
458
459    /// Returns `Ready` when the underlying connection has closed.
460    ///
461    /// If any new inbound streams are received during a call to `poll_closed`,
462    /// they will be queued and returned on the next call to [`poll_accept`].
463    ///
464    /// This function will advance the internal connection state, driving
465    /// progress on all the other handles (e.g. [`RecvStream`] and [`SendStream`]).
466    ///
467    /// See [here](index.html#managing-the-connection) for more details.
468    ///
469    /// [`poll_accept`]: struct.Connection.html#method.poll_accept
470    /// [`RecvStream`]: ../struct.RecvStream.html
471    /// [`SendStream`]: ../struct.SendStream.html
472    pub fn poll_closed(&mut self, cx: &mut Context) -> Poll<Result<(), crate::Error>> {
473        self.connection.poll(cx).map_err(Into::into)
474    }
475
476    #[doc(hidden)]
477    #[deprecated(note = "renamed to poll_closed")]
478    pub fn poll_close(&mut self, cx: &mut Context) -> Poll<Result<(), crate::Error>> {
479        self.poll_closed(cx)
480    }
481
482    /// Sets the connection to a GOAWAY state.
483    ///
484    /// Does not terminate the connection. Must continue being polled to close
485    /// connection.
486    ///
487    /// After flushing the GOAWAY frame, the connection is closed. Any
488    /// outstanding streams do not prevent the connection from closing. This
489    /// should usually be reserved for shutting down when something bad
490    /// external to `h2` has happened, and open streams cannot be properly
491    /// handled.
492    ///
493    /// For graceful shutdowns, see [`graceful_shutdown`](Connection::graceful_shutdown).
494    pub fn abrupt_shutdown(&mut self, reason: Reason) {
495        self.connection.go_away_from_user(reason);
496    }
497
498    /// Starts a [graceful shutdown][1] process.
499    ///
500    /// Must continue being polled to close connection.
501    ///
502    /// It's possible to receive more requests after calling this method, since
503    /// they might have been in-flight from the client already. After about
504    /// 1 RTT, no new requests should be accepted. Once all active streams
505    /// have completed, the connection is closed.
506    ///
507    /// [1]: http://httpwg.org/specs/rfc7540.html#GOAWAY
508    pub fn graceful_shutdown(&mut self) {
509        self.connection.go_away_gracefully();
510    }
511
512    /// Takes a `PingPong` instance from the connection.
513    ///
514    /// # Note
515    ///
516    /// This may only be called once. Calling multiple times will return `None`.
517    pub fn ping_pong(&mut self) -> Option<PingPong> {
518        self.connection.take_user_pings().map(PingPong::new)
519    }
520}
521
/// `Stream` view of a connection, available with the `stream` feature: each
/// item is what `poll_accept` yields.
#[cfg(feature = "stream")]
impl<T, B> futures_core::Stream for Connection<T, B>
where
    T: AsyncRead + AsyncWrite + Unpin,
    B: Buf + 'static,
{
    type Item = Result<(Request<RecvStream>, SendResponse<B>), crate::Error>;

    /// Forwards to `poll_accept` on the pinned connection.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let connection = self.get_mut();
        connection.poll_accept(cx)
    }
}
534
535impl<T, B> fmt::Debug for Connection<T, B>
536where
537    T: fmt::Debug,
538    B: fmt::Debug + Buf,
539{
540    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
541        fmt.debug_struct("Connection")
542            .field("connection", &self.connection)
543            .finish()
544    }
545}
546
547// ===== impl Builder =====
548
549impl Builder {
550    /// Returns a new server builder instance initialized with default
551    /// configuration values.
552    ///
553    /// Configuration methods can be chained on the return value.
554    ///
555    /// # Examples
556    ///
557    /// ```
558    /// # use futures_io::{AsyncRead, AsyncWrite};
559    /// # use h2::server::*;
560    /// #
561    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
562    /// # -> Handshake<T>
563    /// # {
564    /// // `server_fut` is a future representing the completion of the HTTP/2.0
565    /// // handshake.
566    /// let server_fut = Builder::new()
567    ///     .initial_window_size(1_000_000)
568    ///     .max_concurrent_streams(1000)
569    ///     .handshake(my_io);
570    /// # server_fut
571    /// # }
572    /// #
573    /// # pub fn main() {}
574    /// ```
575    pub fn new() -> Builder {
576        Builder {
577            reset_stream_duration: Duration::from_secs(proto::DEFAULT_RESET_STREAM_SECS),
578            reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX,
579            settings: Settings::default(),
580            initial_target_connection_window_size: None,
581        }
582    }
583
584    /// Indicates the initial window size (in octets) for stream-level
585    /// flow control for received data.
586    ///
587    /// The initial window of a stream is used as part of flow control. For more
588    /// details, see [`FlowControl`].
589    ///
590    /// The default value is 65,535.
591    ///
592    /// [`FlowControl`]: ../struct.FlowControl.html
593    ///
594    /// # Examples
595    ///
596    /// ```
597    /// # use futures_io::{AsyncRead, AsyncWrite};
598    /// # use h2::server::*;
599    /// #
600    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
601    /// # -> Handshake<T>
602    /// # {
603    /// // `server_fut` is a future representing the completion of the HTTP/2.0
604    /// // handshake.
605    /// let server_fut = Builder::new()
606    ///     .initial_window_size(1_000_000)
607    ///     .handshake(my_io);
608    /// # server_fut
609    /// # }
610    /// #
611    /// # pub fn main() {}
612    /// ```
613    pub fn initial_window_size(&mut self, size: u32) -> &mut Self {
614        self.settings.set_initial_window_size(Some(size));
615        self
616    }
617
618    /// Indicates the initial window size (in octets) for connection-level flow control
619    /// for received data.
620    ///
621    /// The initial window of a connection is used as part of flow control. For more details,
622    /// see [`FlowControl`].
623    ///
624    /// The default value is 65,535.
625    ///
626    /// [`FlowControl`]: ../struct.FlowControl.html
627    ///
628    /// # Examples
629    ///
630    /// ```
631    /// # use futures_io::{AsyncRead, AsyncWrite};
632    /// # use h2::server::*;
633    /// #
634    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
635    /// # -> Handshake<T>
636    /// # {
637    /// // `server_fut` is a future representing the completion of the HTTP/2.0
638    /// // handshake.
639    /// let server_fut = Builder::new()
640    ///     .initial_connection_window_size(1_000_000)
641    ///     .handshake(my_io);
642    /// # server_fut
643    /// # }
644    /// #
645    /// # pub fn main() {}
646    /// ```
647    pub fn initial_connection_window_size(&mut self, size: u32) -> &mut Self {
648        self.initial_target_connection_window_size = Some(size);
649        self
650    }
651
652    /// Indicates the size (in octets) of the largest HTTP/2.0 frame payload that the
653    /// configured server is able to accept.
654    ///
655    /// The sender may send data frames that are **smaller** than this value,
656    /// but any data larger than `max` will be broken up into multiple `DATA`
657    /// frames.
658    ///
659    /// The value **must** be between 16,384 and 16,777,215. The default value is 16,384.
660    ///
661    /// # Examples
662    ///
663    /// ```
664    /// # use futures_io::{AsyncRead, AsyncWrite};
665    /// # use h2::server::*;
666    /// #
667    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
668    /// # -> Handshake<T>
669    /// # {
670    /// // `server_fut` is a future representing the completion of the HTTP/2.0
671    /// // handshake.
672    /// let server_fut = Builder::new()
673    ///     .max_frame_size(1_000_000)
674    ///     .handshake(my_io);
675    /// # server_fut
676    /// # }
677    /// #
678    /// # pub fn main() {}
679    /// ```
680    ///
681    /// # Panics
682    ///
683    /// This function panics if `max` is not within the legal range specified
684    /// above.
685    pub fn max_frame_size(&mut self, max: u32) -> &mut Self {
686        self.settings.set_max_frame_size(Some(max));
687        self
688    }
689
690    /// Sets the max size of received header frames.
691    ///
692    /// This advisory setting informs a peer of the maximum size of header list
693    /// that the sender is prepared to accept, in octets. The value is based on
694    /// the uncompressed size of header fields, including the length of the name
695    /// and value in octets plus an overhead of 32 octets for each header field.
696    ///
697    /// This setting is also used to limit the maximum amount of data that is
698    /// buffered to decode HEADERS frames.
699    ///
700    /// # Examples
701    ///
702    /// ```
703    /// # use futures_io::{AsyncRead, AsyncWrite};
704    /// # use h2::server::*;
705    /// #
706    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
707    /// # -> Handshake<T>
708    /// # {
709    /// // `server_fut` is a future representing the completion of the HTTP/2.0
710    /// // handshake.
711    /// let server_fut = Builder::new()
712    ///     .max_header_list_size(16 * 1024)
713    ///     .handshake(my_io);
714    /// # server_fut
715    /// # }
716    /// #
717    /// # pub fn main() {}
718    /// ```
719    pub fn max_header_list_size(&mut self, max: u32) -> &mut Self {
720        self.settings.set_max_header_list_size(Some(max));
721        self
722    }
723
724    /// Sets the maximum number of concurrent streams.
725    ///
726    /// The maximum concurrent streams setting only controls the maximum number
727    /// of streams that can be initiated by the remote peer. In other words,
728    /// when this setting is set to 100, this does not limit the number of
729    /// concurrent streams that can be created by the caller.
730    ///
731    /// It is recommended that this value be no smaller than 100, so as to not
732    /// unnecessarily limit parallelism. However, any value is legal, including
733    /// 0. If `max` is set to 0, then the remote will not be permitted to
734    /// initiate streams.
735    ///
736    /// Note that streams in the reserved state, i.e., push promises that have
737    /// been reserved but the stream has not started, do not count against this
738    /// setting.
739    ///
740    /// Also note that if the remote *does* exceed the value set here, it is not
741    /// a protocol level error. Instead, the `h2` library will immediately reset
742    /// the stream.
743    ///
744    /// See [Section 5.1.2] in the HTTP/2.0 spec for more details.
745    ///
746    /// [Section 5.1.2]: https://http2.github.io/http2-spec/#rfc.section.5.1.2
747    ///
748    /// # Examples
749    ///
750    /// ```
751    /// # use futures_io::{AsyncRead, AsyncWrite};
752    /// # use h2::server::*;
753    /// #
754    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
755    /// # -> Handshake<T>
756    /// # {
757    /// // `server_fut` is a future representing the completion of the HTTP/2.0
758    /// // handshake.
759    /// let server_fut = Builder::new()
760    ///     .max_concurrent_streams(1000)
761    ///     .handshake(my_io);
762    /// # server_fut
763    /// # }
764    /// #
765    /// # pub fn main() {}
766    /// ```
767    pub fn max_concurrent_streams(&mut self, max: u32) -> &mut Self {
768        self.settings.set_max_concurrent_streams(Some(max));
769        self
770    }
771
772    /// Sets the maximum number of concurrent locally reset streams.
773    ///
774    /// When a stream is explicitly reset by either calling
775    /// [`SendResponse::send_reset`] or by dropping a [`SendResponse`] instance
776    /// before completing the stream, the HTTP/2.0 specification requires that
777    /// any further frames received for that stream must be ignored for "some
778    /// time".
779    ///
780    /// In order to satisfy the specification, internal state must be maintained
781    /// to implement the behavior. This state grows linearly with the number of
782    /// streams that are locally reset.
783    ///
784    /// The `max_concurrent_reset_streams` setting configures sets an upper
785    /// bound on the amount of state that is maintained. When this max value is
786    /// reached, the oldest reset stream is purged from memory.
787    ///
788    /// Once the stream has been fully purged from memory, any additional frames
789    /// received for that stream will result in a connection level protocol
790    /// error, forcing the connection to terminate.
791    ///
792    /// The default value is 10.
793    ///
794    /// # Examples
795    ///
796    /// ```
797    /// # use futures_io::{AsyncRead, AsyncWrite};
798    /// # use h2::server::*;
799    /// #
800    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
801    /// # -> Handshake<T>
802    /// # {
803    /// // `server_fut` is a future representing the completion of the HTTP/2.0
804    /// // handshake.
805    /// let server_fut = Builder::new()
806    ///     .max_concurrent_reset_streams(1000)
807    ///     .handshake(my_io);
808    /// # server_fut
809    /// # }
810    /// #
811    /// # pub fn main() {}
812    /// ```
813    pub fn max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self {
814        self.reset_stream_max = max;
815        self
816    }
817
    /// Sets the duration to remember locally reset streams.
    ///
    /// When a stream is explicitly reset by either calling
    /// [`SendResponse::send_reset`] or by dropping a [`SendResponse`] instance
    /// before completing the stream, the HTTP/2.0 specification requires that
    /// any further frames received for that stream must be ignored for "some
    /// time".
    ///
    /// In order to satisfy the specification, internal state must be maintained
    /// to implement the behavior. This state grows linearly with the number of
    /// streams that are locally reset.
    ///
    /// The `reset_stream_duration` setting configures the max amount of time
    /// this state will be maintained in memory. Once the duration elapses, the
    /// stream state is purged from memory.
    ///
    /// Once the stream has been fully purged from memory, any additional frames
    /// received for that stream will result in a connection level protocol
    /// error, forcing the connection to terminate.
    ///
    /// The default value is 30 seconds.
    ///
    /// # Examples
    ///
    /// ```
    /// # use futures_io::{AsyncRead, AsyncWrite};
    /// # use h2::server::*;
    /// # use std::time::Duration;
    /// #
    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
    /// # -> Handshake<T>
    /// # {
    /// // `server_fut` is a future representing the completion of the HTTP/2.0
    /// // handshake.
    /// let server_fut = Builder::new()
    ///     .reset_stream_duration(Duration::from_secs(10))
    ///     .handshake(my_io);
    /// # server_fut
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```
    pub fn reset_stream_duration(&mut self, dur: Duration) -> &mut Self {
        // Only recorded here; `Handshake::poll` copies the value into the
        // connection `Config` once the handshake completes.
        self.reset_stream_duration = dur;
        self
    }
864
865    /// Creates a new configured HTTP/2.0 server backed by `io`.
866    ///
867    /// It is expected that `io` already be in an appropriate state to commence
868    /// the [HTTP/2.0 handshake]. See [Handshake] for more details.
869    ///
870    /// Returns a future which resolves to the [`Connection`] instance once the
871    /// HTTP/2.0 handshake has been completed.
872    ///
873    /// This function also allows the caller to configure the send payload data
874    /// type. See [Outbound data type] for more details.
875    ///
876    /// [HTTP/2.0 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
877    /// [Handshake]: ../index.html#handshake
878    /// [`Connection`]: struct.Connection.html
879    /// [Outbound data type]: ../index.html#outbound-data-type.
880    ///
881    /// # Examples
882    ///
883    /// Basic usage:
884    ///
885    /// ```
886    /// # use futures_io::{AsyncRead, AsyncWrite};
887    /// # use h2::server::*;
888    /// #
889    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
890    /// # -> Handshake<T>
891    /// # {
892    /// // `server_fut` is a future representing the completion of the HTTP/2.0
893    /// // handshake.
894    /// let server_fut = Builder::new()
895    ///     .handshake(my_io);
896    /// # server_fut
897    /// # }
898    /// #
899    /// # pub fn main() {}
900    /// ```
901    ///
902    /// Configures the send-payload data type. In this case, the outbound data
903    /// type will be `&'static [u8]`.
904    ///
905    /// ```
906    /// # use futures_io::{AsyncRead, AsyncWrite};
907    /// # use h2::server::*;
908    /// #
909    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
910    /// # -> Handshake<T, &'static [u8]>
911    /// # {
912    /// // `server_fut` is a future representing the completion of the HTTP/2.0
913    /// // handshake.
914    /// let server_fut: Handshake<_, &'static [u8]> = Builder::new()
915    ///     .handshake(my_io);
916    /// # server_fut
917    /// # }
918    /// #
919    /// # pub fn main() {}
920    /// ```
921    pub fn handshake<T, B>(&self, io: T) -> Handshake<T, B>
922    where
923        T: AsyncRead + AsyncWrite + Unpin,
924        B: Buf + 'static,
925    {
926        Connection::handshake2(io, self.clone())
927    }
928}
929
930impl Default for Builder {
931    fn default() -> Builder {
932        Builder::new()
933    }
934}
935
936// ===== impl SendResponse =====
937
938impl<B: Buf> SendResponse<B> {
939    /// Send a response to a client request.
940    ///
941    /// On success, a [`SendStream`] instance is returned. This instance can be
942    /// used to stream the response body and send trailers.
943    ///
944    /// If a body or trailers will be sent on the returned [`SendStream`]
945    /// instance, then `end_of_stream` must be set to `false` when calling this
946    /// function.
947    ///
948    /// The [`SendResponse`] instance is already associated with a received
949    /// request.  This function may only be called once per instance and only if
950    /// [`send_reset`] has not been previously called.
951    ///
952    /// [`SendResponse`]: #
953    /// [`SendStream`]: ../struct.SendStream.html
954    /// [`send_reset`]: #method.send_reset
955    pub fn send_response(
956        &mut self,
957        response: Response<()>,
958        end_of_stream: bool,
959    ) -> Result<SendStream<B>, crate::Error> {
960        self.inner
961            .send_response(response, end_of_stream)
962            .map(|_| SendStream::new(self.inner.clone()))
963            .map_err(Into::into)
964    }
965
966    /// Push a request and response to the client
967    ///
968    /// On success, a [`SendResponse`] instance is returned.
969    ///
970    /// [`SendResponse`]: #
971    pub fn push_request(
972        &mut self,
973        request: Request<()>,
974    ) -> Result<SendPushedResponse<B>, crate::Error> {
975        self.inner
976            .send_push_promise(request)
977            .map(|inner| SendPushedResponse {
978                inner: SendResponse { inner },
979            })
980            .map_err(Into::into)
981    }
982
983    /// Send a stream reset to the peer.
984    ///
985    /// This essentially cancels the stream, including any inbound or outbound
986    /// data streams.
987    ///
988    /// If this function is called before [`send_response`], a call to
989    /// [`send_response`] will result in an error.
990    ///
991    /// If this function is called while a [`SendStream`] instance is active,
992    /// any further use of the instance will result in an error.
993    ///
994    /// This function should only be called once.
995    ///
996    /// [`send_response`]: #method.send_response
997    /// [`SendStream`]: ../struct.SendStream.html
998    pub fn send_reset(&mut self, reason: Reason) {
999        self.inner.send_reset(reason)
1000    }
1001
1002    /// Polls to be notified when the client resets this stream.
1003    ///
1004    /// If stream is still open, this returns `Poll::Pending`, and
1005    /// registers the task to be notified if a `RST_STREAM` is received.
1006    ///
1007    /// If a `RST_STREAM` frame is received for this stream, calling this
1008    /// method will yield the `Reason` for the reset.
1009    ///
1010    /// # Error
1011    ///
1012    /// Calling this method after having called `send_response` will return
1013    /// a user error.
1014    pub fn poll_reset(&mut self, cx: &mut Context) -> Poll<Result<Reason, crate::Error>> {
1015        self.inner.poll_reset(cx, proto::PollReset::AwaitingHeaders)
1016    }
1017
1018    /// Returns the stream ID of the response stream.
1019    ///
1020    /// # Panics
1021    ///
1022    /// If the lock on the strean store has been poisoned.
1023    pub fn stream_id(&self) -> crate::StreamId {
1024        crate::StreamId::from_internal(self.inner.stream_id())
1025    }
1026}
1027
1028// ===== impl SendPushedResponse =====
1029
1030impl<B: Buf> SendPushedResponse<B> {
1031    /// Send a response to a promised request.
1032    ///
1033    /// On success, a [`SendStream`] instance is returned. This instance can be
1034    /// used to stream the response body and send trailers.
1035    ///
1036    /// If a body or trailers will be sent on the returned [`SendStream`]
1037    /// instance, then `end_of_stream` must be set to `false` when calling this
1038    /// function.
1039    ///
1040    /// The [`SendPushedResponse`] instance is associated with a promised
1041    /// request.  This function may only be called once per instance and only if
1042    /// [`send_reset`] has not been previously called.
1043    ///
1044    /// [`SendPushedResponse`]: #
1045    /// [`SendStream`]: ../struct.SendStream.html
1046    /// [`send_reset`]: #method.send_reset
1047    pub fn send_response(
1048        &mut self,
1049        response: Response<()>,
1050        end_of_stream: bool,
1051    ) -> Result<SendStream<B>, crate::Error> {
1052        self.inner.send_response(response, end_of_stream)
1053    }
1054
1055    /// Send a stream reset to the peer.
1056    ///
1057    /// This essentially cancels the stream, including any inbound or outbound
1058    /// data streams.
1059    ///
1060    /// If this function is called before [`send_response`], a call to
1061    /// [`send_response`] will result in an error.
1062    ///
1063    /// If this function is called while a [`SendStream`] instance is active,
1064    /// any further use of the instance will result in an error.
1065    ///
1066    /// This function should only be called once.
1067    ///
1068    /// [`send_response`]: #method.send_response
1069    /// [`SendStream`]: ../struct.SendStream.html
1070    pub fn send_reset(&mut self, reason: Reason) {
1071        self.inner.send_reset(reason)
1072    }
1073
1074    /// Polls to be notified when the client resets this stream.
1075    ///
1076    /// If stream is still open, this returns `Poll::Pending`, and
1077    /// registers the task to be notified if a `RST_STREAM` is received.
1078    ///
1079    /// If a `RST_STREAM` frame is received for this stream, calling this
1080    /// method will yield the `Reason` for the reset.
1081    ///
1082    /// # Error
1083    ///
1084    /// Calling this method after having called `send_response` will return
1085    /// a user error.
1086    pub fn poll_reset(&mut self, cx: &mut Context) -> Poll<Result<Reason, crate::Error>> {
1087        self.inner.poll_reset(cx)
1088    }
1089
1090    /// Returns the stream ID of the response stream.
1091    ///
1092    /// # Panics
1093    ///
1094    /// If the lock on the strean store has been poisoned.
1095    pub fn stream_id(&self) -> crate::StreamId {
1096        self.inner.stream_id()
1097    }
1098}
1099
1100// ===== impl Flush =====
1101
1102impl<T, B: Buf> Flush<T, B> {
1103    fn new(codec: Codec<T, B>) -> Self {
1104        Flush { codec: Some(codec) }
1105    }
1106}
1107
1108impl<T, B> Future for Flush<T, B>
1109where
1110    T: AsyncWrite + Unpin,
1111    B: Buf,
1112{
1113    type Output = Result<Codec<T, B>, crate::Error>;
1114
1115    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1116        // Flush the codec
1117        ready!(self.codec.as_mut().unwrap().flush(cx)).map_err(crate::Error::from_io)?;
1118
1119        // Return the codec
1120        Poll::Ready(Ok(self.codec.take().unwrap()))
1121    }
1122}
1123
1124impl<T, B: Buf> ReadPreface<T, B> {
1125    fn new(codec: Codec<T, B>) -> Self {
1126        ReadPreface {
1127            codec: Some(codec),
1128            pos: 0,
1129        }
1130    }
1131
1132    fn inner_mut(&mut self) -> &mut T {
1133        self.codec.as_mut().unwrap().get_mut()
1134    }
1135}
1136
1137impl<T, B> Future for ReadPreface<T, B>
1138where
1139    T: AsyncRead + Unpin,
1140    B: Buf,
1141{
1142    type Output = Result<Codec<T, B>, crate::Error>;
1143
1144    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1145        let mut buf = [0; 24];
1146        let mut rem = PREFACE.len() - self.pos;
1147
1148        while rem > 0 {
1149            let n = ready!(Pin::new(self.inner_mut()).poll_read(cx, &mut buf[..rem]))
1150                .map_err(crate::Error::from_io)?;
1151            if n == 0 {
1152                return Poll::Ready(Err(crate::Error::from_io(io::Error::new(
1153                    io::ErrorKind::UnexpectedEof,
1154                    "connection closed before reading preface",
1155                ))));
1156            }
1157
1158            if PREFACE[self.pos..self.pos + n] != buf[..n] {
1159                proto_err!(conn: "read_preface: invalid preface");
1160                // TODO: Should this just write the GO_AWAY frame directly?
1161                return Poll::Ready(Err(Reason::PROTOCOL_ERROR.into()));
1162            }
1163
1164            self.pos += n;
1165            rem -= n; // TODO test
1166        }
1167
1168        Poll::Ready(Ok(self.codec.take().unwrap()))
1169    }
1170}
1171
1172// ===== impl Handshake =====
1173
1174impl<T, B: Buf> Future for Handshake<T, B>
1175where
1176    T: AsyncRead + AsyncWrite + Unpin,
1177    B: Buf + 'static,
1178{
1179    type Output = Result<Connection<T, B>, crate::Error>;
1180
1181    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1182        log::trace!("Handshake::poll(); state={:?};", self.state);
1183        use crate::server::Handshaking::*;
1184
1185        self.state = if let Flushing(ref mut flush) = self.state {
1186            // We're currently flushing a pending SETTINGS frame. Poll the
1187            // flush future, and, if it's completed, advance our state to wait
1188            // for the client preface.
1189            let codec = match Pin::new(flush).poll(cx)? {
1190                Poll::Pending => {
1191                    log::trace!("Handshake::poll(); flush.poll()=Pending");
1192                    return Poll::Pending;
1193                }
1194                Poll::Ready(flushed) => {
1195                    log::trace!("Handshake::poll(); flush.poll()=Ready");
1196                    flushed
1197                }
1198            };
1199            Handshaking::from(ReadPreface::new(codec))
1200        } else {
1201            // Otherwise, we haven't actually advanced the state, but we have
1202            // to replace it with itself, because we have to return a value.
1203            // (note that the assignment to `self.state` has to be outside of
1204            // the `if let` block above in order to placate the borrow checker).
1205            mem::replace(&mut self.state, Handshaking::Empty)
1206        };
1207        let poll = if let ReadingPreface(ref mut read) = self.state {
1208            // We're now waiting for the client preface. Poll the `ReadPreface`
1209            // future. If it has completed, we will create a `Connection` handle
1210            // for the connection.
1211            Pin::new(read).poll(cx)
1212        // Actually creating the `Connection` has to occur outside of this
1213        // `if let` block, because we've borrowed `self` mutably in order
1214        // to poll the state and won't be able to borrow the SETTINGS frame
1215        // as well until we release the borrow for `poll()`.
1216        } else {
1217            unreachable!("Handshake::poll() state was not advanced completely!")
1218        };
1219        poll?.map(|codec| {
1220            let connection = proto::Connection::new(
1221                codec,
1222                Config {
1223                    next_stream_id: 2.into(),
1224                    // Server does not need to locally initiate any streams
1225                    initial_max_send_streams: 0,
1226                    reset_stream_duration: self.builder.reset_stream_duration,
1227                    reset_stream_max: self.builder.reset_stream_max,
1228                    settings: self.builder.settings.clone(),
1229                },
1230            );
1231
1232            log::trace!("Handshake::poll(); connection established!");
1233            let mut c = Connection { connection };
1234            if let Some(sz) = self.builder.initial_target_connection_window_size {
1235                c.set_target_window_size(sz);
1236            }
1237            Ok(c)
1238        })
1239    }
1240}
1241
1242impl<T, B> fmt::Debug for Handshake<T, B>
1243where
1244    T: AsyncRead + AsyncWrite + fmt::Debug,
1245    B: fmt::Debug + Buf,
1246{
1247    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
1248        write!(fmt, "server::Handshake")
1249    }
1250}
1251
1252impl Peer {
1253    pub fn convert_send_message(
1254        id: StreamId,
1255        response: Response<()>,
1256        end_of_stream: bool,
1257    ) -> frame::Headers {
1258        use http::response::Parts;
1259
1260        // Extract the components of the HTTP request
1261        let (
1262            Parts {
1263                status, headers, ..
1264            },
1265            _,
1266        ) = response.into_parts();
1267
1268        // Build the set pseudo header set. All requests will include `method`
1269        // and `path`.
1270        let pseudo = Pseudo::response(status);
1271
1272        // Create the HEADERS frame
1273        let mut frame = frame::Headers::new(id, pseudo, headers);
1274
1275        if end_of_stream {
1276            frame.set_end_stream()
1277        }
1278
1279        frame
1280    }
1281
1282    pub fn convert_push_message(
1283        stream_id: StreamId,
1284        promised_id: StreamId,
1285        request: Request<()>,
1286    ) -> Result<frame::PushPromise, UserError> {
1287        use http::request::Parts;
1288
1289        if let Err(e) = frame::PushPromise::validate_request(&request) {
1290            use PushPromiseHeaderError::*;
1291            match e {
1292                NotSafeAndCacheable => log::debug!(
1293                    "convert_push_message: method {} is not safe and cacheable",
1294                    request.method(),
1295                ),
1296                InvalidContentLength(e) => log::debug!(
1297                    "convert_push_message; promised request has invalid content-length {:?}",
1298                    e,
1299                ),
1300            }
1301            return Err(UserError::MalformedHeaders);
1302        }
1303
1304        // Extract the components of the HTTP request
1305        let (
1306            Parts {
1307                method,
1308                uri,
1309                headers,
1310                ..
1311            },
1312            _,
1313        ) = request.into_parts();
1314
1315        let pseudo = Pseudo::request(method, uri);
1316
1317        Ok(frame::PushPromise::new(
1318            stream_id,
1319            promised_id,
1320            pseudo,
1321            headers,
1322        ))
1323    }
1324}
1325
1326impl proto::Peer for Peer {
1327    type Poll = Request<()>;
1328
1329    const NAME: &'static str = "Server";
1330
1331    fn is_server() -> bool {
1332        true
1333    }
1334
1335    fn r#dyn() -> proto::DynPeer {
1336        proto::DynPeer::Server
1337    }
1338
1339    fn convert_poll_message(
1340        pseudo: Pseudo,
1341        fields: HeaderMap,
1342        stream_id: StreamId,
1343    ) -> Result<Self::Poll, RecvError> {
1344        use http::{uri, Version};
1345
1346        let mut b = Request::builder();
1347
1348        macro_rules! malformed {
1349            ($($arg:tt)*) => {{
1350                log::debug!($($arg)*);
1351                return Err(RecvError::Stream {
1352                    id: stream_id,
1353                    reason: Reason::PROTOCOL_ERROR,
1354                });
1355            }}
1356        };
1357
1358        b = b.version(Version::HTTP_2);
1359
1360        let is_connect;
1361        if let Some(method) = pseudo.method {
1362            is_connect = method == Method::CONNECT;
1363            b = b.method(method);
1364        } else {
1365            malformed!("malformed headers: missing method");
1366        }
1367
1368        // Specifying :status for a request is a protocol error
1369        if pseudo.status.is_some() {
1370            log::trace!("malformed headers: :status field on request; PROTOCOL_ERROR");
1371            return Err(RecvError::Connection(Reason::PROTOCOL_ERROR));
1372        }
1373
1374        // Convert the URI
1375        let mut parts = uri::Parts::default();
1376
1377        // A request translated from HTTP/1 must not include the :authority
1378        // header
1379        if let Some(authority) = pseudo.authority {
1380            let maybe_authority = uri::Authority::from_maybe_shared(authority.clone().into_inner());
1381            parts.authority = Some(maybe_authority.or_else(|why| {
1382                malformed!(
1383                    "malformed headers: malformed authority ({:?}): {}",
1384                    authority,
1385                    why,
1386                )
1387            })?);
1388        }
1389
1390        // A :scheme is required, except CONNECT.
1391        if let Some(scheme) = pseudo.scheme {
1392            if is_connect {
1393                malformed!(":scheme in CONNECT");
1394            }
1395            let maybe_scheme = scheme.parse();
1396            let scheme = maybe_scheme.or_else(|why| {
1397                malformed!(
1398                    "malformed headers: malformed scheme ({:?}): {}",
1399                    scheme,
1400                    why,
1401                )
1402            })?;
1403
1404            // It's not possible to build an `Uri` from a scheme and path. So,
1405            // after validating is was a valid scheme, we just have to drop it
1406            // if there isn't an :authority.
1407            if parts.authority.is_some() {
1408                parts.scheme = Some(scheme);
1409            }
1410        } else if !is_connect {
1411            malformed!("malformed headers: missing scheme");
1412        }
1413
1414        if let Some(path) = pseudo.path {
1415            if is_connect {
1416                malformed!(":path in CONNECT");
1417            }
1418
1419            // This cannot be empty
1420            if path.is_empty() {
1421                malformed!("malformed headers: missing path");
1422            }
1423
1424            let maybe_path = uri::PathAndQuery::from_maybe_shared(path.clone().into_inner());
1425            parts.path_and_query = Some(maybe_path.or_else(|why| {
1426                malformed!("malformed headers: malformed path ({:?}): {}", path, why,)
1427            })?);
1428        }
1429
1430        b = b.uri(parts);
1431
1432        let mut request = match b.body(()) {
1433            Ok(request) => request,
1434            Err(e) => {
1435                // TODO: Should there be more specialized handling for different
1436                // kinds of errors
1437                proto_err!(stream: "error building request: {}; stream={:?}", e, stream_id);
1438                return Err(RecvError::Stream {
1439                    id: stream_id,
1440                    reason: Reason::PROTOCOL_ERROR,
1441                });
1442            }
1443        };
1444
1445        *request.headers_mut() = fields;
1446
1447        Ok(request)
1448    }
1449}
1450
1451// ===== impl Handshaking =====
1452
1453impl<T, B> fmt::Debug for Handshaking<T, B>
1454where
1455    B: Buf,
1456{
1457    #[inline]
1458    fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
1459        match *self {
1460            Handshaking::Flushing(_) => write!(f, "Handshaking::Flushing(_)"),
1461            Handshaking::ReadingPreface(_) => write!(f, "Handshaking::ReadingPreface(_)"),
1462            Handshaking::Empty => write!(f, "Handshaking::Empty"),
1463        }
1464    }
1465}
1466
1467impl<T, B> convert::From<Flush<T, Prioritized<B>>> for Handshaking<T, B>
1468where
1469    T: AsyncRead + AsyncWrite,
1470    B: Buf,
1471{
1472    #[inline]
1473    fn from(flush: Flush<T, Prioritized<B>>) -> Self {
1474        Handshaking::Flushing(flush)
1475    }
1476}
1477
1478impl<T, B> convert::From<ReadPreface<T, Prioritized<B>>> for Handshaking<T, B>
1479where
1480    T: AsyncRead + AsyncWrite,
1481    B: Buf,
1482{
1483    #[inline]
1484    fn from(read: ReadPreface<T, Prioritized<B>>) -> Self {
1485        Handshaking::ReadingPreface(read)
1486    }
1487}
1488
1489impl<T, B> convert::From<Codec<T, Prioritized<B>>> for Handshaking<T, B>
1490where
1491    T: AsyncRead + AsyncWrite,
1492    B: Buf,
1493{
1494    #[inline]
1495    fn from(codec: Codec<T, Prioritized<B>>) -> Self {
1496        Handshaking::from(Flush::new(codec))
1497    }
1498}