hreq_h2/server.rs
1//! Server implementation of the HTTP/2.0 protocol.
2//!
3//! # Getting started
4//!
5//! Running an HTTP/2.0 server requires the caller to manage accepting the
6//! connections as well as getting the connections to a state that is ready to
7//! begin the HTTP/2.0 handshake. See [here](../index.html#handshake) for more
8//! details.
9//!
10//! This could be as basic as using Tokio's [`TcpListener`] to accept
11//! connections, but usually it means using either ALPN or HTTP/1.1 protocol
12//! upgrades.
13//!
14//! Once a connection is obtained, it is passed to [`handshake`],
15//! which will begin the [HTTP/2.0 handshake]. This returns a future that
16//! completes once the handshake process is performed and HTTP/2.0 streams may
17//! be received.
18//!
19//! [`handshake`] uses default configuration values. There are a number of
20//! settings that can be changed by using [`Builder`] instead.
21//!
22//! # Inbound streams
23//!
24//! The [`Connection`] instance is used to accept inbound HTTP/2.0 streams. It
25//! does this by implementing [`futures::Stream`]. When a new stream is
26//! received, a call to [`Connection::accept`] will return `(request, response)`.
27//! The `request` handle (of type [`http::Request<RecvStream>`]) contains the
28//! HTTP request head as well as provides a way to receive the inbound data
29//! stream and the trailers. The `response` handle (of type [`SendResponse`])
30//! allows responding to the request, stream the response payload, send
31//! trailers, and send push promises.
32//!
33//! The send ([`SendStream`]) and receive ([`RecvStream`]) halves of the stream
34//! can be operated independently.
35//!
36//! # Managing the connection
37//!
38//! The [`Connection`] instance is used to manage connection state. The caller
39//! is required to call either [`Connection::accept`] or
40//! [`Connection::poll_close`] in order to advance the connection state. Simply
41//! operating on [`SendStream`] or [`RecvStream`] will have no effect unless the
42//! connection state is advanced.
43//!
44//! It is not required to call **both** [`Connection::accept`] and
45//! [`Connection::poll_close`]. If the caller is ready to accept a new stream,
46//! then only [`Connection::accept`] should be called. When the caller **does
47//! not** want to accept a new stream, [`Connection::poll_close`] should be
48//! called.
49//!
50//! The [`Connection`] instance should only be dropped once
51//! [`Connection::poll_close`] returns `Ready`. Once [`Connection::accept`]
52//! returns `Ready(None)`, there will no longer be any more inbound streams. At
53//! this point, only [`Connection::poll_close`] should be called.
54//!
55//! # Shutting down the server
56//!
57//! Graceful shutdown of the server is [not yet
58//! implemented](https://github.com/hyperium/h2/issues/69).
59//!
60//! # Example
61//!
62//! A basic HTTP/2.0 server example that runs over TCP and assumes [prior
63//! knowledge], i.e. both the client and the server assume that the TCP socket
64//! will use the HTTP/2.0 protocol without prior negotiation.
65//!
66//! ```no_run
67//! use h2::server;
68//! use http::{Response, StatusCode};
69//! use tokio::net::TcpListener;
70//!
71//! #[tokio::main]
72//! pub async fn main() {
73//! let mut listener = TcpListener::bind("127.0.0.1:5928").await.unwrap();
74//!
75//! // Accept all incoming TCP connections.
76//! loop {
77//! if let Ok((socket, _peer_addr)) = listener.accept().await {
78//! // Spawn a new task to process each connection.
79//! tokio::spawn(async {
80//! // Start the HTTP/2.0 connection handshake
81//! let mut h2 = server::handshake(socket).await.unwrap();
82//! // Accept all inbound HTTP/2.0 streams sent over the
83//! // connection.
84//! while let Some(request) = h2.accept().await {
85//! let (request, mut respond) = request.unwrap();
86//! println!("Received request: {:?}", request);
87//!
88//! // Build a response with no body
89//! let response = Response::builder()
90//! .status(StatusCode::OK)
91//! .body(())
92//! .unwrap();
93//!
94//! // Send the response back to the client
95//! respond.send_response(response, true)
96//! .unwrap();
97//! }
98//!
99//! });
100//! }
101//! }
102//! }
103//! ```
104//!
105//! [prior knowledge]: http://httpwg.org/specs/rfc7540.html#known-http
106//! [`handshake`]: fn.handshake.html
107//! [HTTP/2.0 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
108//! [`Builder`]: struct.Builder.html
109//! [`Connection`]: struct.Connection.html
110//! [`Connection::poll`]: struct.Connection.html#method.poll
111//! [`Connection::poll_close`]: struct.Connection.html#method.poll_close
112//! [`futures::Stream`]: https://docs.rs/futures/0.1/futures/stream/trait.Stream.html
113//! [`http::Request<RecvStream>`]: ../struct.RecvStream.html
114//! [`RecvStream`]: ../struct.RecvStream.html
115//! [`SendStream`]: ../struct.SendStream.html
116//! [`TcpListener`]: https://docs.rs/tokio-core/0.1/tokio_core/net/struct.TcpListener.html
117
118use crate::codec::{Codec, RecvError, UserError};
119use crate::frame::{self, Pseudo, PushPromiseHeaderError, Reason, Settings, StreamId};
120use crate::proto::{self, Config, Prioritized};
121use crate::{FlowControl, PingPong, RecvStream, SendStream};
122
123use bytes::{Buf, Bytes};
124use futures_io::{AsyncRead, AsyncWrite};
125use http::{HeaderMap, Method, Request, Response};
126use std::future::Future;
127use std::pin::Pin;
128use std::task::{Context, Poll};
129use std::time::Duration;
130use std::{convert, fmt, io, mem};
131
132/// In progress HTTP/2.0 connection handshake future.
133///
134/// This type implements `Future`, yielding a `Connection` instance once the
135/// handshake has completed.
136///
137/// The handshake is completed once the connection preface is fully received
138/// from the client **and** the initial settings frame is sent to the client.
139///
140/// The handshake future does not wait for the initial settings frame from the
141/// client.
142///
143/// See [module] level docs for more details.
144///
145/// [module]: index.html
146#[must_use = "futures do nothing unless polled"]
147pub struct Handshake<T, B: Buf = Bytes> {
148 /// The config to pass to Connection::new after handshake succeeds.
149 builder: Builder,
150 /// The current state of the handshake.
151 state: Handshaking<T, B>,
152}
153
154/// Accepts inbound HTTP/2.0 streams on a connection.
155///
156/// A `Connection` is backed by an I/O resource (usually a TCP socket) and
157/// implements the HTTP/2.0 server logic for that connection. It is responsible
158/// for receiving inbound streams initiated by the client as well as driving the
159/// internal state forward.
160///
161/// `Connection` values are created by calling [`handshake`]. Once a
162/// `Connection` value is obtained, the caller must call [`poll`] or
163/// [`poll_close`] in order to drive the internal connection state forward.
164///
165/// See [module level] documentation for more details
166///
167/// [module level]: index.html
168/// [`handshake`]: struct.Connection.html#method.handshake
169/// [`poll`]: struct.Connection.html#method.poll
170/// [`poll_close`]: struct.Connection.html#method.poll_close
171///
172/// # Examples
173///
174/// ```
175/// # use futures_io::{AsyncRead, AsyncWrite};
176/// # use h2::server;
177/// # use h2::server::*;
178/// #
179/// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) {
180/// let mut server = server::handshake(my_io).await.unwrap();
181/// while let Some(request) = server.accept().await {
182/// let (request, respond) = request.unwrap();
183/// // Process the request and send the response back to the client
184/// // using `respond`.
185/// }
186/// # }
187/// #
188/// # pub fn main() {}
189/// ```
190#[must_use = "streams do nothing unless polled"]
191pub struct Connection<T, B: Buf> {
192 connection: proto::Connection<T, Peer, B>,
193}
194
195/// Builds server connections with custom configuration values.
196///
197/// Methods can be chained in order to set the configuration values.
198///
199/// The server is constructed by calling [`handshake`] and passing the I/O
200/// handle that will back the HTTP/2.0 server.
201///
202/// New instances of `Builder` are obtained via [`Builder::new`].
203///
204/// See function level documentation for details on the various server
205/// configuration settings.
206///
207/// [`Builder::new`]: struct.Builder.html#method.new
208/// [`handshake`]: struct.Builder.html#method.handshake
209///
210/// # Examples
211///
212/// ```
213/// # use futures_io::{AsyncRead, AsyncWrite};
214/// # use h2::server::*;
215/// #
216/// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
217/// # -> Handshake<T>
218/// # {
219/// // `server_fut` is a future representing the completion of the HTTP/2.0
220/// // handshake.
221/// let server_fut = Builder::new()
222/// .initial_window_size(1_000_000)
223/// .max_concurrent_streams(1000)
224/// .handshake(my_io);
225/// # server_fut
226/// # }
227/// #
228/// # pub fn main() {}
229/// ```
230#[derive(Clone, Debug)]
231pub struct Builder {
232 /// Time to keep locally reset streams around before reaping.
233 reset_stream_duration: Duration,
234
235 /// Maximum number of locally reset streams to keep at a time.
236 reset_stream_max: usize,
237
238 /// Initial `Settings` frame to send as part of the handshake.
239 settings: Settings,
240
241 /// Initial target window size for new connections.
242 initial_target_connection_window_size: Option<u32>,
243}
244
245/// Send a response back to the client
246///
247/// A `SendResponse` instance is provided when receiving a request and is used
248/// to send the associated response back to the client. It is also used to
249/// explicitly reset the stream with a custom reason.
250///
251/// It will also be used to initiate push promises linked with the associated
252/// stream.
253///
254/// If the `SendResponse` instance is dropped without sending a response, then
255/// the HTTP/2.0 stream will be reset.
256///
257/// See [module] level docs for more details.
258///
259/// [module]: index.html
260#[derive(Debug)]
261pub struct SendResponse<B: Buf> {
262 inner: proto::StreamRef<B>,
263}
264
265/// Send a response to a promised request
266///
267/// A `SendPushedResponse` instance is provided when promising a request and is used
268/// to send the associated response to the client. It is also used to
269/// explicitly reset the stream with a custom reason.
270///
271/// It can not be used to initiate push promises.
272///
273/// If the `SendPushedResponse` instance is dropped without sending a response, then
274/// the HTTP/2.0 stream will be reset.
275///
276/// See [module] level docs for more details.
277///
278/// [module]: index.html
279pub struct SendPushedResponse<B: Buf> {
280 inner: SendResponse<B>,
281}
282
283// Manual implementation necessary because of rust-lang/rust#26925
284impl<B: Buf + fmt::Debug> fmt::Debug for SendPushedResponse<B> {
285 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
286 write!(f, "SendPushedResponse {{ {:?} }}", self.inner)
287 }
288}
289
290/// Stages of an in-progress handshake.
291enum Handshaking<T, B: Buf> {
292 /// State 1. Connection is flushing pending SETTINGS frame.
293 Flushing(Flush<T, Prioritized<B>>),
294 /// State 2. Connection is waiting for the client preface.
295 ReadingPreface(ReadPreface<T, Prioritized<B>>),
296 /// Dummy state for `mem::replace`.
297 Empty,
298}
299
300/// Flush a Sink
301struct Flush<T, B> {
302 codec: Option<Codec<T, B>>,
303}
304
305/// Read the client connection preface
306struct ReadPreface<T, B> {
307 codec: Option<Codec<T, B>>,
308 pos: usize,
309}
310
311#[derive(Debug)]
312pub(crate) struct Peer;
313
314const PREFACE: [u8; 24] = *b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
315
316/// Creates a new configured HTTP/2.0 server with default configuration
317/// values backed by `io`.
318///
319/// It is expected that `io` already be in an appropriate state to commence
320/// the [HTTP/2.0 handshake]. See [Handshake] for more details.
321///
322/// Returns a future which resolves to the [`Connection`] instance once the
323/// HTTP/2.0 handshake has been completed. The returned [`Connection`]
324/// instance will be using default configuration values. Use [`Builder`] to
325/// customize the configuration values used by a [`Connection`] instance.
326///
327/// [HTTP/2.0 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
328/// [Handshake]: ../index.html#handshake
329/// [`Connection`]: struct.Connection.html
330///
331/// # Examples
332///
333/// ```
334/// # use futures_io::{AsyncRead, AsyncWrite};
335/// # use h2::server;
336/// # use h2::server::*;
337/// #
338/// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
339/// # {
340/// let connection = server::handshake(my_io).await.unwrap();
341/// // The HTTP/2.0 handshake has completed, now use `connection` to
342/// // accept inbound HTTP/2.0 streams.
343/// # }
344/// #
345/// # pub fn main() {}
346/// ```
347pub fn handshake<T>(io: T) -> Handshake<T, Bytes>
348where
349 T: AsyncRead + AsyncWrite + Unpin,
350{
351 Builder::new().handshake(io)
352}
353
354// ===== impl Connection =====
355
356impl<T, B> Connection<T, B>
357where
358 T: AsyncRead + AsyncWrite + Unpin,
359 B: Buf + 'static,
360{
361 fn handshake2(io: T, builder: Builder) -> Handshake<T, B> {
362 // Create the codec.
363 let mut codec = Codec::new(io);
364
365 if let Some(max) = builder.settings.max_frame_size() {
366 codec.set_max_recv_frame_size(max as usize);
367 }
368
369 if let Some(max) = builder.settings.max_header_list_size() {
370 codec.set_max_recv_header_list_size(max as usize);
371 }
372
373 // Send initial settings frame.
374 codec
375 .buffer(builder.settings.clone().into())
376 .expect("invalid SETTINGS frame");
377
378 // Create the handshake future.
379 let state = Handshaking::from(codec);
380
381 Handshake { builder, state }
382 }
383
384 /// Accept the next incoming request on this connection.
385 pub async fn accept(
386 &mut self,
387 ) -> Option<Result<(Request<RecvStream>, SendResponse<B>), crate::Error>> {
388 futures_util::future::poll_fn(move |cx| self.poll_accept(cx)).await
389 }
390
391 #[doc(hidden)]
392 pub fn poll_accept(
393 &mut self,
394 cx: &mut Context<'_>,
395 ) -> Poll<Option<Result<(Request<RecvStream>, SendResponse<B>), crate::Error>>> {
396 // Always try to advance the internal state. Getting Pending also is
397 // needed to allow this function to return Pending.
398 if let Poll::Ready(_) = self.poll_closed(cx)? {
399 // If the socket is closed, don't return anything
400 // TODO: drop any pending streams
401 return Poll::Ready(None);
402 }
403
404 if let Some(inner) = self.connection.next_incoming() {
405 log::trace!("received incoming");
406 let (head, _) = inner.take_request().into_parts();
407 let body = RecvStream::new(FlowControl::new(inner.clone_to_opaque()));
408
409 let request = Request::from_parts(head, body);
410 let respond = SendResponse { inner };
411
412 return Poll::Ready(Some(Ok((request, respond))));
413 }
414
415 Poll::Pending
416 }
417
418 /// Sets the target window size for the whole connection.
419 ///
420 /// If `size` is greater than the current value, then a `WINDOW_UPDATE`
421 /// frame will be immediately sent to the remote, increasing the connection
422 /// level window by `size - current_value`.
423 ///
424 /// If `size` is less than the current value, nothing will happen
425 /// immediately. However, as window capacity is released by
426 /// [`FlowControl`] instances, no `WINDOW_UPDATE` frames will be sent
427 /// out until the number of "in flight" bytes drops below `size`.
428 ///
429 /// The default value is 65,535.
430 ///
431 /// See [`FlowControl`] documentation for more details.
432 ///
433 /// [`FlowControl`]: ../struct.FlowControl.html
434 /// [library level]: ../index.html#flow-control
435 pub fn set_target_window_size(&mut self, size: u32) {
436 assert!(size <= proto::MAX_WINDOW_SIZE);
437 self.connection.set_target_window_size(size);
438 }
439
440 /// Set a new `INITIAL_WINDOW_SIZE` setting (in octets) for stream-level
441 /// flow control for received data.
442 ///
443 /// The `SETTINGS` will be sent to the remote, and only applied once the
444 /// remote acknowledges the change.
445 ///
446 /// This can be used to increase or decrease the window size for existing
447 /// streams.
448 ///
449 /// # Errors
450 ///
451 /// Returns an error if a previous call is still pending acknowledgement
452 /// from the remote endpoint.
453 pub fn set_initial_window_size(&mut self, size: u32) -> Result<(), crate::Error> {
454 assert!(size <= proto::MAX_WINDOW_SIZE);
455 self.connection.set_initial_window_size(size)?;
456 Ok(())
457 }
458
459 /// Returns `Ready` when the underlying connection has closed.
460 ///
461 /// If any new inbound streams are received during a call to `poll_closed`,
462 /// they will be queued and returned on the next call to [`poll_accept`].
463 ///
464 /// This function will advance the internal connection state, driving
465 /// progress on all the other handles (e.g. [`RecvStream`] and [`SendStream`]).
466 ///
467 /// See [here](index.html#managing-the-connection) for more details.
468 ///
469 /// [`poll_accept`]: struct.Connection.html#method.poll_accept
470 /// [`RecvStream`]: ../struct.RecvStream.html
471 /// [`SendStream`]: ../struct.SendStream.html
472 pub fn poll_closed(&mut self, cx: &mut Context) -> Poll<Result<(), crate::Error>> {
473 self.connection.poll(cx).map_err(Into::into)
474 }
475
476 #[doc(hidden)]
477 #[deprecated(note = "renamed to poll_closed")]
478 pub fn poll_close(&mut self, cx: &mut Context) -> Poll<Result<(), crate::Error>> {
479 self.poll_closed(cx)
480 }
481
482 /// Sets the connection to a GOAWAY state.
483 ///
484 /// Does not terminate the connection. Must continue being polled to close
485 /// connection.
486 ///
487 /// After flushing the GOAWAY frame, the connection is closed. Any
488 /// outstanding streams do not prevent the connection from closing. This
489 /// should usually be reserved for shutting down when something bad
490 /// external to `h2` has happened, and open streams cannot be properly
491 /// handled.
492 ///
493 /// For graceful shutdowns, see [`graceful_shutdown`](Connection::graceful_shutdown).
494 pub fn abrupt_shutdown(&mut self, reason: Reason) {
495 self.connection.go_away_from_user(reason);
496 }
497
498 /// Starts a [graceful shutdown][1] process.
499 ///
500 /// Must continue being polled to close connection.
501 ///
502 /// It's possible to receive more requests after calling this method, since
503 /// they might have been in-flight from the client already. After about
504 /// 1 RTT, no new requests should be accepted. Once all active streams
505 /// have completed, the connection is closed.
506 ///
507 /// [1]: http://httpwg.org/specs/rfc7540.html#GOAWAY
508 pub fn graceful_shutdown(&mut self) {
509 self.connection.go_away_gracefully();
510 }
511
512 /// Takes a `PingPong` instance from the connection.
513 ///
514 /// # Note
515 ///
516 /// This may only be called once. Calling multiple times will return `None`.
517 pub fn ping_pong(&mut self) -> Option<PingPong> {
518 self.connection.take_user_pings().map(PingPong::new)
519 }
520}
521
522#[cfg(feature = "stream")]
523impl<T, B> futures_core::Stream for Connection<T, B>
524where
525 T: AsyncRead + AsyncWrite + Unpin,
526 B: Buf + 'static,
527{
528 type Item = Result<(Request<RecvStream>, SendResponse<B>), crate::Error>;
529
530 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
531 self.poll_accept(cx)
532 }
533}
534
535impl<T, B> fmt::Debug for Connection<T, B>
536where
537 T: fmt::Debug,
538 B: fmt::Debug + Buf,
539{
540 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
541 fmt.debug_struct("Connection")
542 .field("connection", &self.connection)
543 .finish()
544 }
545}
546
547// ===== impl Builder =====
548
549impl Builder {
550 /// Returns a new server builder instance initialized with default
551 /// configuration values.
552 ///
553 /// Configuration methods can be chained on the return value.
554 ///
555 /// # Examples
556 ///
557 /// ```
558 /// # use futures_io::{AsyncRead, AsyncWrite};
559 /// # use h2::server::*;
560 /// #
561 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
562 /// # -> Handshake<T>
563 /// # {
564 /// // `server_fut` is a future representing the completion of the HTTP/2.0
565 /// // handshake.
566 /// let server_fut = Builder::new()
567 /// .initial_window_size(1_000_000)
568 /// .max_concurrent_streams(1000)
569 /// .handshake(my_io);
570 /// # server_fut
571 /// # }
572 /// #
573 /// # pub fn main() {}
574 /// ```
575 pub fn new() -> Builder {
576 Builder {
577 reset_stream_duration: Duration::from_secs(proto::DEFAULT_RESET_STREAM_SECS),
578 reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX,
579 settings: Settings::default(),
580 initial_target_connection_window_size: None,
581 }
582 }
583
584 /// Indicates the initial window size (in octets) for stream-level
585 /// flow control for received data.
586 ///
587 /// The initial window of a stream is used as part of flow control. For more
588 /// details, see [`FlowControl`].
589 ///
590 /// The default value is 65,535.
591 ///
592 /// [`FlowControl`]: ../struct.FlowControl.html
593 ///
594 /// # Examples
595 ///
596 /// ```
597 /// # use futures_io::{AsyncRead, AsyncWrite};
598 /// # use h2::server::*;
599 /// #
600 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
601 /// # -> Handshake<T>
602 /// # {
603 /// // `server_fut` is a future representing the completion of the HTTP/2.0
604 /// // handshake.
605 /// let server_fut = Builder::new()
606 /// .initial_window_size(1_000_000)
607 /// .handshake(my_io);
608 /// # server_fut
609 /// # }
610 /// #
611 /// # pub fn main() {}
612 /// ```
613 pub fn initial_window_size(&mut self, size: u32) -> &mut Self {
614 self.settings.set_initial_window_size(Some(size));
615 self
616 }
617
618 /// Indicates the initial window size (in octets) for connection-level flow control
619 /// for received data.
620 ///
621 /// The initial window of a connection is used as part of flow control. For more details,
622 /// see [`FlowControl`].
623 ///
624 /// The default value is 65,535.
625 ///
626 /// [`FlowControl`]: ../struct.FlowControl.html
627 ///
628 /// # Examples
629 ///
630 /// ```
631 /// # use futures_io::{AsyncRead, AsyncWrite};
632 /// # use h2::server::*;
633 /// #
634 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
635 /// # -> Handshake<T>
636 /// # {
637 /// // `server_fut` is a future representing the completion of the HTTP/2.0
638 /// // handshake.
639 /// let server_fut = Builder::new()
640 /// .initial_connection_window_size(1_000_000)
641 /// .handshake(my_io);
642 /// # server_fut
643 /// # }
644 /// #
645 /// # pub fn main() {}
646 /// ```
647 pub fn initial_connection_window_size(&mut self, size: u32) -> &mut Self {
648 self.initial_target_connection_window_size = Some(size);
649 self
650 }
651
652 /// Indicates the size (in octets) of the largest HTTP/2.0 frame payload that the
653 /// configured server is able to accept.
654 ///
655 /// The sender may send data frames that are **smaller** than this value,
656 /// but any data larger than `max` will be broken up into multiple `DATA`
657 /// frames.
658 ///
659 /// The value **must** be between 16,384 and 16,777,215. The default value is 16,384.
660 ///
661 /// # Examples
662 ///
663 /// ```
664 /// # use futures_io::{AsyncRead, AsyncWrite};
665 /// # use h2::server::*;
666 /// #
667 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
668 /// # -> Handshake<T>
669 /// # {
670 /// // `server_fut` is a future representing the completion of the HTTP/2.0
671 /// // handshake.
672 /// let server_fut = Builder::new()
673 /// .max_frame_size(1_000_000)
674 /// .handshake(my_io);
675 /// # server_fut
676 /// # }
677 /// #
678 /// # pub fn main() {}
679 /// ```
680 ///
681 /// # Panics
682 ///
683 /// This function panics if `max` is not within the legal range specified
684 /// above.
685 pub fn max_frame_size(&mut self, max: u32) -> &mut Self {
686 self.settings.set_max_frame_size(Some(max));
687 self
688 }
689
690 /// Sets the max size of received header frames.
691 ///
692 /// This advisory setting informs a peer of the maximum size of header list
693 /// that the sender is prepared to accept, in octets. The value is based on
694 /// the uncompressed size of header fields, including the length of the name
695 /// and value in octets plus an overhead of 32 octets for each header field.
696 ///
697 /// This setting is also used to limit the maximum amount of data that is
698 /// buffered to decode HEADERS frames.
699 ///
700 /// # Examples
701 ///
702 /// ```
703 /// # use futures_io::{AsyncRead, AsyncWrite};
704 /// # use h2::server::*;
705 /// #
706 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
707 /// # -> Handshake<T>
708 /// # {
709 /// // `server_fut` is a future representing the completion of the HTTP/2.0
710 /// // handshake.
711 /// let server_fut = Builder::new()
712 /// .max_header_list_size(16 * 1024)
713 /// .handshake(my_io);
714 /// # server_fut
715 /// # }
716 /// #
717 /// # pub fn main() {}
718 /// ```
719 pub fn max_header_list_size(&mut self, max: u32) -> &mut Self {
720 self.settings.set_max_header_list_size(Some(max));
721 self
722 }
723
724 /// Sets the maximum number of concurrent streams.
725 ///
726 /// The maximum concurrent streams setting only controls the maximum number
727 /// of streams that can be initiated by the remote peer. In other words,
728 /// when this setting is set to 100, this does not limit the number of
729 /// concurrent streams that can be created by the caller.
730 ///
731 /// It is recommended that this value be no smaller than 100, so as to not
732 /// unnecessarily limit parallelism. However, any value is legal, including
733 /// 0. If `max` is set to 0, then the remote will not be permitted to
734 /// initiate streams.
735 ///
736 /// Note that streams in the reserved state, i.e., push promises that have
737 /// been reserved but the stream has not started, do not count against this
738 /// setting.
739 ///
740 /// Also note that if the remote *does* exceed the value set here, it is not
741 /// a protocol level error. Instead, the `h2` library will immediately reset
742 /// the stream.
743 ///
744 /// See [Section 5.1.2] in the HTTP/2.0 spec for more details.
745 ///
746 /// [Section 5.1.2]: https://http2.github.io/http2-spec/#rfc.section.5.1.2
747 ///
748 /// # Examples
749 ///
750 /// ```
751 /// # use futures_io::{AsyncRead, AsyncWrite};
752 /// # use h2::server::*;
753 /// #
754 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
755 /// # -> Handshake<T>
756 /// # {
757 /// // `server_fut` is a future representing the completion of the HTTP/2.0
758 /// // handshake.
759 /// let server_fut = Builder::new()
760 /// .max_concurrent_streams(1000)
761 /// .handshake(my_io);
762 /// # server_fut
763 /// # }
764 /// #
765 /// # pub fn main() {}
766 /// ```
767 pub fn max_concurrent_streams(&mut self, max: u32) -> &mut Self {
768 self.settings.set_max_concurrent_streams(Some(max));
769 self
770 }
771
772 /// Sets the maximum number of concurrent locally reset streams.
773 ///
774 /// When a stream is explicitly reset by either calling
775 /// [`SendResponse::send_reset`] or by dropping a [`SendResponse`] instance
776 /// before completing the stream, the HTTP/2.0 specification requires that
777 /// any further frames received for that stream must be ignored for "some
778 /// time".
779 ///
780 /// In order to satisfy the specification, internal state must be maintained
781 /// to implement the behavior. This state grows linearly with the number of
782 /// streams that are locally reset.
783 ///
784 /// The `max_concurrent_reset_streams` setting configures sets an upper
785 /// bound on the amount of state that is maintained. When this max value is
786 /// reached, the oldest reset stream is purged from memory.
787 ///
788 /// Once the stream has been fully purged from memory, any additional frames
789 /// received for that stream will result in a connection level protocol
790 /// error, forcing the connection to terminate.
791 ///
792 /// The default value is 10.
793 ///
794 /// # Examples
795 ///
796 /// ```
797 /// # use futures_io::{AsyncRead, AsyncWrite};
798 /// # use h2::server::*;
799 /// #
800 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
801 /// # -> Handshake<T>
802 /// # {
803 /// // `server_fut` is a future representing the completion of the HTTP/2.0
804 /// // handshake.
805 /// let server_fut = Builder::new()
806 /// .max_concurrent_reset_streams(1000)
807 /// .handshake(my_io);
808 /// # server_fut
809 /// # }
810 /// #
811 /// # pub fn main() {}
812 /// ```
813 pub fn max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self {
814 self.reset_stream_max = max;
815 self
816 }
817
818 /// Sets the maximum number of concurrent locally reset streams.
819 ///
820 /// When a stream is explicitly reset by either calling
821 /// [`SendResponse::send_reset`] or by dropping a [`SendResponse`] instance
822 /// before completing the stream, the HTTP/2.0 specification requires that
823 /// any further frames received for that stream must be ignored for "some
824 /// time".
825 ///
826 /// In order to satisfy the specification, internal state must be maintained
827 /// to implement the behavior. This state grows linearly with the number of
828 /// streams that are locally reset.
829 ///
830 /// The `reset_stream_duration` setting configures the max amount of time
831 /// this state will be maintained in memory. Once the duration elapses, the
832 /// stream state is purged from memory.
833 ///
834 /// Once the stream has been fully purged from memory, any additional frames
835 /// received for that stream will result in a connection level protocol
836 /// error, forcing the connection to terminate.
837 ///
838 /// The default value is 30 seconds.
839 ///
840 /// # Examples
841 ///
842 /// ```
843 /// # use futures_io::{AsyncRead, AsyncWrite};
844 /// # use h2::server::*;
845 /// # use std::time::Duration;
846 /// #
847 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
848 /// # -> Handshake<T>
849 /// # {
850 /// // `server_fut` is a future representing the completion of the HTTP/2.0
851 /// // handshake.
852 /// let server_fut = Builder::new()
853 /// .reset_stream_duration(Duration::from_secs(10))
854 /// .handshake(my_io);
855 /// # server_fut
856 /// # }
857 /// #
858 /// # pub fn main() {}
859 /// ```
860 pub fn reset_stream_duration(&mut self, dur: Duration) -> &mut Self {
861 self.reset_stream_duration = dur;
862 self
863 }
864
865 /// Creates a new configured HTTP/2.0 server backed by `io`.
866 ///
867 /// It is expected that `io` already be in an appropriate state to commence
868 /// the [HTTP/2.0 handshake]. See [Handshake] for more details.
869 ///
870 /// Returns a future which resolves to the [`Connection`] instance once the
871 /// HTTP/2.0 handshake has been completed.
872 ///
873 /// This function also allows the caller to configure the send payload data
874 /// type. See [Outbound data type] for more details.
875 ///
876 /// [HTTP/2.0 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
877 /// [Handshake]: ../index.html#handshake
878 /// [`Connection`]: struct.Connection.html
879 /// [Outbound data type]: ../index.html#outbound-data-type.
880 ///
881 /// # Examples
882 ///
883 /// Basic usage:
884 ///
885 /// ```
886 /// # use futures_io::{AsyncRead, AsyncWrite};
887 /// # use h2::server::*;
888 /// #
889 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
890 /// # -> Handshake<T>
891 /// # {
892 /// // `server_fut` is a future representing the completion of the HTTP/2.0
893 /// // handshake.
894 /// let server_fut = Builder::new()
895 /// .handshake(my_io);
896 /// # server_fut
897 /// # }
898 /// #
899 /// # pub fn main() {}
900 /// ```
901 ///
902 /// Configures the send-payload data type. In this case, the outbound data
903 /// type will be `&'static [u8]`.
904 ///
905 /// ```
906 /// # use futures_io::{AsyncRead, AsyncWrite};
907 /// # use h2::server::*;
908 /// #
909 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
910 /// # -> Handshake<T, &'static [u8]>
911 /// # {
912 /// // `server_fut` is a future representing the completion of the HTTP/2.0
913 /// // handshake.
914 /// let server_fut: Handshake<_, &'static [u8]> = Builder::new()
915 /// .handshake(my_io);
916 /// # server_fut
917 /// # }
918 /// #
919 /// # pub fn main() {}
920 /// ```
921 pub fn handshake<T, B>(&self, io: T) -> Handshake<T, B>
922 where
923 T: AsyncRead + AsyncWrite + Unpin,
924 B: Buf + 'static,
925 {
926 Connection::handshake2(io, self.clone())
927 }
928}
929
930impl Default for Builder {
931 fn default() -> Builder {
932 Builder::new()
933 }
934}
935
936// ===== impl SendResponse =====
937
938impl<B: Buf> SendResponse<B> {
939 /// Send a response to a client request.
940 ///
941 /// On success, a [`SendStream`] instance is returned. This instance can be
942 /// used to stream the response body and send trailers.
943 ///
944 /// If a body or trailers will be sent on the returned [`SendStream`]
945 /// instance, then `end_of_stream` must be set to `false` when calling this
946 /// function.
947 ///
948 /// The [`SendResponse`] instance is already associated with a received
949 /// request. This function may only be called once per instance and only if
950 /// [`send_reset`] has not been previously called.
951 ///
952 /// [`SendResponse`]: #
953 /// [`SendStream`]: ../struct.SendStream.html
954 /// [`send_reset`]: #method.send_reset
955 pub fn send_response(
956 &mut self,
957 response: Response<()>,
958 end_of_stream: bool,
959 ) -> Result<SendStream<B>, crate::Error> {
960 self.inner
961 .send_response(response, end_of_stream)
962 .map(|_| SendStream::new(self.inner.clone()))
963 .map_err(Into::into)
964 }
965
966 /// Push a request and response to the client
967 ///
968 /// On success, a [`SendResponse`] instance is returned.
969 ///
970 /// [`SendResponse`]: #
971 pub fn push_request(
972 &mut self,
973 request: Request<()>,
974 ) -> Result<SendPushedResponse<B>, crate::Error> {
975 self.inner
976 .send_push_promise(request)
977 .map(|inner| SendPushedResponse {
978 inner: SendResponse { inner },
979 })
980 .map_err(Into::into)
981 }
982
983 /// Send a stream reset to the peer.
984 ///
985 /// This essentially cancels the stream, including any inbound or outbound
986 /// data streams.
987 ///
988 /// If this function is called before [`send_response`], a call to
989 /// [`send_response`] will result in an error.
990 ///
991 /// If this function is called while a [`SendStream`] instance is active,
992 /// any further use of the instance will result in an error.
993 ///
994 /// This function should only be called once.
995 ///
996 /// [`send_response`]: #method.send_response
997 /// [`SendStream`]: ../struct.SendStream.html
998 pub fn send_reset(&mut self, reason: Reason) {
999 self.inner.send_reset(reason)
1000 }
1001
1002 /// Polls to be notified when the client resets this stream.
1003 ///
1004 /// If stream is still open, this returns `Poll::Pending`, and
1005 /// registers the task to be notified if a `RST_STREAM` is received.
1006 ///
1007 /// If a `RST_STREAM` frame is received for this stream, calling this
1008 /// method will yield the `Reason` for the reset.
1009 ///
1010 /// # Error
1011 ///
1012 /// Calling this method after having called `send_response` will return
1013 /// a user error.
1014 pub fn poll_reset(&mut self, cx: &mut Context) -> Poll<Result<Reason, crate::Error>> {
1015 self.inner.poll_reset(cx, proto::PollReset::AwaitingHeaders)
1016 }
1017
1018 /// Returns the stream ID of the response stream.
1019 ///
1020 /// # Panics
1021 ///
1022 /// If the lock on the strean store has been poisoned.
1023 pub fn stream_id(&self) -> crate::StreamId {
1024 crate::StreamId::from_internal(self.inner.stream_id())
1025 }
1026}
1027
1028// ===== impl SendPushedResponse =====
1029
1030impl<B: Buf> SendPushedResponse<B> {
1031 /// Send a response to a promised request.
1032 ///
1033 /// On success, a [`SendStream`] instance is returned. This instance can be
1034 /// used to stream the response body and send trailers.
1035 ///
1036 /// If a body or trailers will be sent on the returned [`SendStream`]
1037 /// instance, then `end_of_stream` must be set to `false` when calling this
1038 /// function.
1039 ///
1040 /// The [`SendPushedResponse`] instance is associated with a promised
1041 /// request. This function may only be called once per instance and only if
1042 /// [`send_reset`] has not been previously called.
1043 ///
1044 /// [`SendPushedResponse`]: #
1045 /// [`SendStream`]: ../struct.SendStream.html
1046 /// [`send_reset`]: #method.send_reset
1047 pub fn send_response(
1048 &mut self,
1049 response: Response<()>,
1050 end_of_stream: bool,
1051 ) -> Result<SendStream<B>, crate::Error> {
1052 self.inner.send_response(response, end_of_stream)
1053 }
1054
1055 /// Send a stream reset to the peer.
1056 ///
1057 /// This essentially cancels the stream, including any inbound or outbound
1058 /// data streams.
1059 ///
1060 /// If this function is called before [`send_response`], a call to
1061 /// [`send_response`] will result in an error.
1062 ///
1063 /// If this function is called while a [`SendStream`] instance is active,
1064 /// any further use of the instance will result in an error.
1065 ///
1066 /// This function should only be called once.
1067 ///
1068 /// [`send_response`]: #method.send_response
1069 /// [`SendStream`]: ../struct.SendStream.html
1070 pub fn send_reset(&mut self, reason: Reason) {
1071 self.inner.send_reset(reason)
1072 }
1073
1074 /// Polls to be notified when the client resets this stream.
1075 ///
1076 /// If stream is still open, this returns `Poll::Pending`, and
1077 /// registers the task to be notified if a `RST_STREAM` is received.
1078 ///
1079 /// If a `RST_STREAM` frame is received for this stream, calling this
1080 /// method will yield the `Reason` for the reset.
1081 ///
1082 /// # Error
1083 ///
1084 /// Calling this method after having called `send_response` will return
1085 /// a user error.
1086 pub fn poll_reset(&mut self, cx: &mut Context) -> Poll<Result<Reason, crate::Error>> {
1087 self.inner.poll_reset(cx)
1088 }
1089
1090 /// Returns the stream ID of the response stream.
1091 ///
1092 /// # Panics
1093 ///
1094 /// If the lock on the strean store has been poisoned.
1095 pub fn stream_id(&self) -> crate::StreamId {
1096 self.inner.stream_id()
1097 }
1098}
1099
1100// ===== impl Flush =====
1101
1102impl<T, B: Buf> Flush<T, B> {
1103 fn new(codec: Codec<T, B>) -> Self {
1104 Flush { codec: Some(codec) }
1105 }
1106}
1107
1108impl<T, B> Future for Flush<T, B>
1109where
1110 T: AsyncWrite + Unpin,
1111 B: Buf,
1112{
1113 type Output = Result<Codec<T, B>, crate::Error>;
1114
1115 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1116 // Flush the codec
1117 ready!(self.codec.as_mut().unwrap().flush(cx)).map_err(crate::Error::from_io)?;
1118
1119 // Return the codec
1120 Poll::Ready(Ok(self.codec.take().unwrap()))
1121 }
1122}
1123
1124impl<T, B: Buf> ReadPreface<T, B> {
1125 fn new(codec: Codec<T, B>) -> Self {
1126 ReadPreface {
1127 codec: Some(codec),
1128 pos: 0,
1129 }
1130 }
1131
1132 fn inner_mut(&mut self) -> &mut T {
1133 self.codec.as_mut().unwrap().get_mut()
1134 }
1135}
1136
1137impl<T, B> Future for ReadPreface<T, B>
1138where
1139 T: AsyncRead + Unpin,
1140 B: Buf,
1141{
1142 type Output = Result<Codec<T, B>, crate::Error>;
1143
1144 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1145 let mut buf = [0; 24];
1146 let mut rem = PREFACE.len() - self.pos;
1147
1148 while rem > 0 {
1149 let n = ready!(Pin::new(self.inner_mut()).poll_read(cx, &mut buf[..rem]))
1150 .map_err(crate::Error::from_io)?;
1151 if n == 0 {
1152 return Poll::Ready(Err(crate::Error::from_io(io::Error::new(
1153 io::ErrorKind::UnexpectedEof,
1154 "connection closed before reading preface",
1155 ))));
1156 }
1157
1158 if PREFACE[self.pos..self.pos + n] != buf[..n] {
1159 proto_err!(conn: "read_preface: invalid preface");
1160 // TODO: Should this just write the GO_AWAY frame directly?
1161 return Poll::Ready(Err(Reason::PROTOCOL_ERROR.into()));
1162 }
1163
1164 self.pos += n;
1165 rem -= n; // TODO test
1166 }
1167
1168 Poll::Ready(Ok(self.codec.take().unwrap()))
1169 }
1170}
1171
1172// ===== impl Handshake =====
1173
1174impl<T, B: Buf> Future for Handshake<T, B>
1175where
1176 T: AsyncRead + AsyncWrite + Unpin,
1177 B: Buf + 'static,
1178{
1179 type Output = Result<Connection<T, B>, crate::Error>;
1180
1181 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1182 log::trace!("Handshake::poll(); state={:?};", self.state);
1183 use crate::server::Handshaking::*;
1184
1185 self.state = if let Flushing(ref mut flush) = self.state {
1186 // We're currently flushing a pending SETTINGS frame. Poll the
1187 // flush future, and, if it's completed, advance our state to wait
1188 // for the client preface.
1189 let codec = match Pin::new(flush).poll(cx)? {
1190 Poll::Pending => {
1191 log::trace!("Handshake::poll(); flush.poll()=Pending");
1192 return Poll::Pending;
1193 }
1194 Poll::Ready(flushed) => {
1195 log::trace!("Handshake::poll(); flush.poll()=Ready");
1196 flushed
1197 }
1198 };
1199 Handshaking::from(ReadPreface::new(codec))
1200 } else {
1201 // Otherwise, we haven't actually advanced the state, but we have
1202 // to replace it with itself, because we have to return a value.
1203 // (note that the assignment to `self.state` has to be outside of
1204 // the `if let` block above in order to placate the borrow checker).
1205 mem::replace(&mut self.state, Handshaking::Empty)
1206 };
1207 let poll = if let ReadingPreface(ref mut read) = self.state {
1208 // We're now waiting for the client preface. Poll the `ReadPreface`
1209 // future. If it has completed, we will create a `Connection` handle
1210 // for the connection.
1211 Pin::new(read).poll(cx)
1212 // Actually creating the `Connection` has to occur outside of this
1213 // `if let` block, because we've borrowed `self` mutably in order
1214 // to poll the state and won't be able to borrow the SETTINGS frame
1215 // as well until we release the borrow for `poll()`.
1216 } else {
1217 unreachable!("Handshake::poll() state was not advanced completely!")
1218 };
1219 poll?.map(|codec| {
1220 let connection = proto::Connection::new(
1221 codec,
1222 Config {
1223 next_stream_id: 2.into(),
1224 // Server does not need to locally initiate any streams
1225 initial_max_send_streams: 0,
1226 reset_stream_duration: self.builder.reset_stream_duration,
1227 reset_stream_max: self.builder.reset_stream_max,
1228 settings: self.builder.settings.clone(),
1229 },
1230 );
1231
1232 log::trace!("Handshake::poll(); connection established!");
1233 let mut c = Connection { connection };
1234 if let Some(sz) = self.builder.initial_target_connection_window_size {
1235 c.set_target_window_size(sz);
1236 }
1237 Ok(c)
1238 })
1239 }
1240}
1241
1242impl<T, B> fmt::Debug for Handshake<T, B>
1243where
1244 T: AsyncRead + AsyncWrite + fmt::Debug,
1245 B: fmt::Debug + Buf,
1246{
1247 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
1248 write!(fmt, "server::Handshake")
1249 }
1250}
1251
1252impl Peer {
1253 pub fn convert_send_message(
1254 id: StreamId,
1255 response: Response<()>,
1256 end_of_stream: bool,
1257 ) -> frame::Headers {
1258 use http::response::Parts;
1259
1260 // Extract the components of the HTTP request
1261 let (
1262 Parts {
1263 status, headers, ..
1264 },
1265 _,
1266 ) = response.into_parts();
1267
1268 // Build the set pseudo header set. All requests will include `method`
1269 // and `path`.
1270 let pseudo = Pseudo::response(status);
1271
1272 // Create the HEADERS frame
1273 let mut frame = frame::Headers::new(id, pseudo, headers);
1274
1275 if end_of_stream {
1276 frame.set_end_stream()
1277 }
1278
1279 frame
1280 }
1281
1282 pub fn convert_push_message(
1283 stream_id: StreamId,
1284 promised_id: StreamId,
1285 request: Request<()>,
1286 ) -> Result<frame::PushPromise, UserError> {
1287 use http::request::Parts;
1288
1289 if let Err(e) = frame::PushPromise::validate_request(&request) {
1290 use PushPromiseHeaderError::*;
1291 match e {
1292 NotSafeAndCacheable => log::debug!(
1293 "convert_push_message: method {} is not safe and cacheable",
1294 request.method(),
1295 ),
1296 InvalidContentLength(e) => log::debug!(
1297 "convert_push_message; promised request has invalid content-length {:?}",
1298 e,
1299 ),
1300 }
1301 return Err(UserError::MalformedHeaders);
1302 }
1303
1304 // Extract the components of the HTTP request
1305 let (
1306 Parts {
1307 method,
1308 uri,
1309 headers,
1310 ..
1311 },
1312 _,
1313 ) = request.into_parts();
1314
1315 let pseudo = Pseudo::request(method, uri);
1316
1317 Ok(frame::PushPromise::new(
1318 stream_id,
1319 promised_id,
1320 pseudo,
1321 headers,
1322 ))
1323 }
1324}
1325
1326impl proto::Peer for Peer {
1327 type Poll = Request<()>;
1328
1329 const NAME: &'static str = "Server";
1330
1331 fn is_server() -> bool {
1332 true
1333 }
1334
1335 fn r#dyn() -> proto::DynPeer {
1336 proto::DynPeer::Server
1337 }
1338
1339 fn convert_poll_message(
1340 pseudo: Pseudo,
1341 fields: HeaderMap,
1342 stream_id: StreamId,
1343 ) -> Result<Self::Poll, RecvError> {
1344 use http::{uri, Version};
1345
1346 let mut b = Request::builder();
1347
1348 macro_rules! malformed {
1349 ($($arg:tt)*) => {{
1350 log::debug!($($arg)*);
1351 return Err(RecvError::Stream {
1352 id: stream_id,
1353 reason: Reason::PROTOCOL_ERROR,
1354 });
1355 }}
1356 };
1357
1358 b = b.version(Version::HTTP_2);
1359
1360 let is_connect;
1361 if let Some(method) = pseudo.method {
1362 is_connect = method == Method::CONNECT;
1363 b = b.method(method);
1364 } else {
1365 malformed!("malformed headers: missing method");
1366 }
1367
1368 // Specifying :status for a request is a protocol error
1369 if pseudo.status.is_some() {
1370 log::trace!("malformed headers: :status field on request; PROTOCOL_ERROR");
1371 return Err(RecvError::Connection(Reason::PROTOCOL_ERROR));
1372 }
1373
1374 // Convert the URI
1375 let mut parts = uri::Parts::default();
1376
1377 // A request translated from HTTP/1 must not include the :authority
1378 // header
1379 if let Some(authority) = pseudo.authority {
1380 let maybe_authority = uri::Authority::from_maybe_shared(authority.clone().into_inner());
1381 parts.authority = Some(maybe_authority.or_else(|why| {
1382 malformed!(
1383 "malformed headers: malformed authority ({:?}): {}",
1384 authority,
1385 why,
1386 )
1387 })?);
1388 }
1389
1390 // A :scheme is required, except CONNECT.
1391 if let Some(scheme) = pseudo.scheme {
1392 if is_connect {
1393 malformed!(":scheme in CONNECT");
1394 }
1395 let maybe_scheme = scheme.parse();
1396 let scheme = maybe_scheme.or_else(|why| {
1397 malformed!(
1398 "malformed headers: malformed scheme ({:?}): {}",
1399 scheme,
1400 why,
1401 )
1402 })?;
1403
1404 // It's not possible to build an `Uri` from a scheme and path. So,
1405 // after validating is was a valid scheme, we just have to drop it
1406 // if there isn't an :authority.
1407 if parts.authority.is_some() {
1408 parts.scheme = Some(scheme);
1409 }
1410 } else if !is_connect {
1411 malformed!("malformed headers: missing scheme");
1412 }
1413
1414 if let Some(path) = pseudo.path {
1415 if is_connect {
1416 malformed!(":path in CONNECT");
1417 }
1418
1419 // This cannot be empty
1420 if path.is_empty() {
1421 malformed!("malformed headers: missing path");
1422 }
1423
1424 let maybe_path = uri::PathAndQuery::from_maybe_shared(path.clone().into_inner());
1425 parts.path_and_query = Some(maybe_path.or_else(|why| {
1426 malformed!("malformed headers: malformed path ({:?}): {}", path, why,)
1427 })?);
1428 }
1429
1430 b = b.uri(parts);
1431
1432 let mut request = match b.body(()) {
1433 Ok(request) => request,
1434 Err(e) => {
1435 // TODO: Should there be more specialized handling for different
1436 // kinds of errors
1437 proto_err!(stream: "error building request: {}; stream={:?}", e, stream_id);
1438 return Err(RecvError::Stream {
1439 id: stream_id,
1440 reason: Reason::PROTOCOL_ERROR,
1441 });
1442 }
1443 };
1444
1445 *request.headers_mut() = fields;
1446
1447 Ok(request)
1448 }
1449}
1450
1451// ===== impl Handshaking =====
1452
1453impl<T, B> fmt::Debug for Handshaking<T, B>
1454where
1455 B: Buf,
1456{
1457 #[inline]
1458 fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
1459 match *self {
1460 Handshaking::Flushing(_) => write!(f, "Handshaking::Flushing(_)"),
1461 Handshaking::ReadingPreface(_) => write!(f, "Handshaking::ReadingPreface(_)"),
1462 Handshaking::Empty => write!(f, "Handshaking::Empty"),
1463 }
1464 }
1465}
1466
1467impl<T, B> convert::From<Flush<T, Prioritized<B>>> for Handshaking<T, B>
1468where
1469 T: AsyncRead + AsyncWrite,
1470 B: Buf,
1471{
1472 #[inline]
1473 fn from(flush: Flush<T, Prioritized<B>>) -> Self {
1474 Handshaking::Flushing(flush)
1475 }
1476}
1477
1478impl<T, B> convert::From<ReadPreface<T, Prioritized<B>>> for Handshaking<T, B>
1479where
1480 T: AsyncRead + AsyncWrite,
1481 B: Buf,
1482{
1483 #[inline]
1484 fn from(read: ReadPreface<T, Prioritized<B>>) -> Self {
1485 Handshaking::ReadingPreface(read)
1486 }
1487}
1488
1489impl<T, B> convert::From<Codec<T, Prioritized<B>>> for Handshaking<T, B>
1490where
1491 T: AsyncRead + AsyncWrite,
1492 B: Buf,
1493{
1494 #[inline]
1495 fn from(codec: Codec<T, Prioritized<B>>) -> Self {
1496 Handshaking::from(Flush::new(codec))
1497 }
1498}