http2/server.rs
1//! Server implementation of the HTTP/2 protocol.
2//!
3//! # Getting started
4//!
5//! Running an HTTP/2 server requires the caller to manage accepting the
6//! connections as well as getting the connections to a state that is ready to
7//! begin the HTTP/2 handshake. See [here](../index.html#handshake) for more
8//! details.
9//!
10//! This could be as basic as using Tokio's [`TcpListener`] to accept
11//! connections, but usually it means using either ALPN or HTTP/1.1 protocol
12//! upgrades.
13//!
14//! Once a connection is obtained, it is passed to [`handshake`],
15//! which will begin the [HTTP/2 handshake]. This returns a future that
16//! completes once the handshake process is performed and HTTP/2 streams may
17//! be received.
18//!
19//! [`handshake`] uses default configuration values. There are a number of
20//! settings that can be changed by using [`Builder`] instead.
21//!
22//! # Inbound streams
23//!
//! The [`Connection`] instance is used to accept inbound HTTP/2 streams. When
//! a new stream is received, a call to [`Connection::accept`] will return
//! `(request, response)`. (With the `stream` feature enabled, `Connection`
//! also implements [`futures::Stream`] and yields the same pairs.) The
//! `request` handle (of type [`http::Request<RecvStream>`]) contains the HTTP
//! request head and provides a way to receive the inbound data stream and the
//! trailers. The `response` handle (of type [`SendResponse`]) allows
//! responding to the request, streaming the response payload, sending
//! trailers, and sending push promises.
32//!
33//! The send ([`SendStream`]) and receive ([`RecvStream`]) halves of the stream
34//! can be operated independently.
35//!
36//! # Managing the connection
37//!
//! The [`Connection`] instance is used to manage connection state. The caller
//! is required to call either [`Connection::accept`] or
//! [`Connection::poll_closed`] in order to advance the connection state. Simply
//! operating on [`SendStream`] or [`RecvStream`] will have no effect unless the
//! connection state is advanced.
//!
//! It is not required to call **both** [`Connection::accept`] and
//! [`Connection::poll_closed`]. If the caller is ready to accept a new stream,
//! then only [`Connection::accept`] should be called. When the caller **does
//! not** want to accept a new stream, [`Connection::poll_closed`] should be
//! called.
//!
//! The [`Connection`] instance should only be dropped once
//! [`Connection::poll_closed`] returns `Ready`. Once [`Connection::accept`]
//! returns `Ready(None)`, there will be no more inbound streams. At that
//! point, only [`Connection::poll_closed`] should be called.
54//!
//! # Shutting down the server
//!
//! A graceful shutdown is initiated by calling [`Connection::graceful_shutdown`].
//! This sends a GOAWAY frame so that no new streams are accepted; the connection
//! is closed once all active streams have completed. The connection must continue
//! to be polled (via [`Connection::accept`] or [`Connection::poll_closed`]) for
//! the shutdown to make progress.
59//!
60//! # Example
61//!
62//! A basic HTTP/2 server example that runs over TCP and assumes [prior
63//! knowledge], i.e. both the client and the server assume that the TCP socket
64//! will use the HTTP/2 protocol without prior negotiation.
65//!
//! ```no_run
//! use http2::server;
//! use http::{Response, StatusCode};
//! use tokio::net::TcpListener;
//!
//! #[tokio::main]
//! pub async fn main() {
//!     let mut listener = TcpListener::bind("127.0.0.1:5928").await.unwrap();
//!
//!     // Accept all incoming TCP connections.
//!     loop {
//!         if let Ok((socket, _peer_addr)) = listener.accept().await {
//!             // Spawn a new task to process each connection.
//!             tokio::spawn(async {
//!                 // Start the HTTP/2 connection handshake
//!                 let mut http2 = server::handshake(socket).await.unwrap();
//!                 // Accept all inbound HTTP/2 streams sent over the
//!                 // connection.
//!                 while let Some(request) = http2.accept().await {
//!                     let (request, mut respond) = request.unwrap();
//!                     println!("Received request: {:?}", request);
//!
//!                     // Build a response with no body
//!                     let response = Response::builder()
//!                         .status(StatusCode::OK)
//!                         .body(())
//!                         .unwrap();
//!
//!                     // Send the response back to the client
//!                     respond.send_response(response, true)
//!                         .unwrap();
//!                 }
//!
//!             });
//!         }
//!     }
//! }
//! ```
104//!
//! [prior knowledge]: http://httpwg.org/specs/rfc7540.html#known-http
//! [`handshake`]: fn.handshake.html
//! [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
//! [`Builder`]: struct.Builder.html
//! [`Connection`]: struct.Connection.html
//! [`Connection::accept`]: struct.Connection.html#method.accept
//! [`Connection::graceful_shutdown`]: struct.Connection.html#method.graceful_shutdown
//! [`Connection::poll_closed`]: struct.Connection.html#method.poll_closed
//! [`SendResponse`]: struct.SendResponse.html
//! [`futures::Stream`]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
//! [`http::Request<RecvStream>`]: ../struct.RecvStream.html
//! [`RecvStream`]: ../struct.RecvStream.html
//! [`SendStream`]: ../struct.SendStream.html
//! [`TcpListener`]: https://docs.rs/tokio/latest/tokio/net/struct.TcpListener.html
117
118use crate::codec::{Codec, UserError};
119use crate::frame::{self, Pseudo, PushPromiseHeaderError, Reason, Settings, StreamId};
120use crate::proto::{self, Config, Error, Prioritized};
121use crate::{tracing, FlowControl, PingPong, RecvStream, SendStream};
122
123#[cfg(feature = "tracing")]
124use ::tracing::instrument::{Instrument, Instrumented};
125use bytes::{Buf, Bytes};
126use http::{HeaderMap, Method, Request, Response};
127use std::future::Future;
128use std::pin::Pin;
129use std::task::{Context, Poll};
130use std::time::Duration;
131use std::{fmt, io};
132use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
133
134/// In progress HTTP/2 connection handshake future.
135///
136/// This type implements `Future`, yielding a `Connection` instance once the
137/// handshake has completed.
138///
139/// The handshake is completed once the connection preface is fully received
140/// from the client **and** the initial settings frame is sent to the client.
141///
142/// The handshake future does not wait for the initial settings frame from the
143/// client.
144///
145/// See [module] level docs for more details.
146///
147/// [module]: index.html
148#[must_use = "futures do nothing unless polled"]
149pub struct Handshake<T, B: Buf = Bytes> {
150 /// The config to pass to Connection::new after handshake succeeds.
151 builder: Builder,
152 /// The current state of the handshake.
153 state: Handshaking<T, B>,
154 /// Span tracking the handshake
155 #[cfg(feature = "tracing")]
156 span: ::tracing::Span,
157}
158
159/// Accepts inbound HTTP/2 streams on a connection.
160///
161/// A `Connection` is backed by an I/O resource (usually a TCP socket) and
162/// implements the HTTP/2 server logic for that connection. It is responsible
163/// for receiving inbound streams initiated by the client as well as driving the
164/// internal state forward.
165///
/// `Connection` values are created by calling [`handshake`]. Once a
/// `Connection` value is obtained, the caller must call [`accept`] or
/// [`poll_closed`] in order to drive the internal connection state forward.
///
/// See [module level] documentation for more details.
///
/// [module level]: index.html
/// [`handshake`]: fn.handshake.html
/// [`accept`]: struct.Connection.html#method.accept
/// [`poll_closed`]: struct.Connection.html#method.poll_closed
176///
177/// # Examples
178///
179/// ```
180/// # use tokio::io::{AsyncRead, AsyncWrite};
181/// # use http2::server;
182/// # use http2::server::*;
183/// #
184/// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) {
/// let mut server = server::handshake(my_io).await.unwrap();
/// while let Some(request) = server.accept().await {
///     tokio::spawn(async move {
///         let (request, respond) = request.unwrap();
///         // Process the request and send the response back to the client
///         // using `respond`.
///     });
/// }
/// # }
/// #
/// # pub fn main() {}
/// ```
197#[must_use = "streams do nothing unless polled"]
198pub struct Connection<T, B: Buf> {
199 connection: proto::Connection<T, Peer, B>,
200}
201
202/// Builds server connections with custom configuration values.
203///
204/// Methods can be chained in order to set the configuration values.
205///
206/// The server is constructed by calling [`handshake`] and passing the I/O
207/// handle that will back the HTTP/2 server.
208///
209/// New instances of `Builder` are obtained via [`Builder::new`].
210///
211/// See function level documentation for details on the various server
212/// configuration settings.
213///
214/// [`Builder::new`]: struct.Builder.html#method.new
215/// [`handshake`]: struct.Builder.html#method.handshake
216///
217/// # Examples
218///
219/// ```
220/// # use tokio::io::{AsyncRead, AsyncWrite};
221/// # use http2::server::*;
222/// #
223/// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
224/// # -> Handshake<T>
225/// # {
226/// // `server_fut` is a future representing the completion of the HTTP/2
227/// // handshake.
228/// let server_fut = Builder::new()
229/// .initial_window_size(1_000_000)
230/// .max_concurrent_streams(1000)
231/// .handshake(my_io);
232/// # server_fut
233/// # }
234/// #
235/// # pub fn main() {}
236/// ```
237#[derive(Clone, Debug)]
238pub struct Builder {
239 /// Time to keep locally reset streams around before reaping.
240 reset_stream_duration: Duration,
241
242 /// Maximum number of locally reset streams to keep at a time.
243 reset_stream_max: usize,
244
245 /// Maximum number of remotely reset streams to allow in the pending
246 /// accept queue.
247 pending_accept_reset_stream_max: usize,
248
249 /// Initial `Settings` frame to send as part of the handshake.
250 settings: Settings,
251
252 /// Initial target window size for new connections.
253 initial_target_connection_window_size: Option<u32>,
254
255 /// Maximum amount of bytes to "buffer" for writing per stream.
256 max_send_buffer_size: usize,
257
258 /// Maximum number of locally reset streams due to protocol error across
259 /// the lifetime of the connection.
260 ///
261 /// When this gets exceeded, we issue GOAWAYs.
262 local_max_error_reset_streams: Option<usize>,
263}
264
265/// Send a response back to the client
266///
267/// A `SendResponse` instance is provided when receiving a request and is used
268/// to send the associated response back to the client. It is also used to
269/// explicitly reset the stream with a custom reason.
270///
271/// It will also be used to initiate push promises linked with the associated
272/// stream.
273///
274/// If the `SendResponse` instance is dropped without sending a response, then
275/// the HTTP/2 stream will be reset.
276///
277/// See [module] level docs for more details.
278///
279/// [module]: index.html
280#[derive(Debug)]
281pub struct SendResponse<B: Buf> {
282 inner: proto::StreamRef<B>,
283}
284
285/// Send a response to a promised request
286///
287/// A `SendPushedResponse` instance is provided when promising a request and is used
288/// to send the associated response to the client. It is also used to
289/// explicitly reset the stream with a custom reason.
290///
/// It cannot be used to initiate push promises.
292///
293/// If the `SendPushedResponse` instance is dropped without sending a response, then
294/// the HTTP/2 stream will be reset.
295///
296/// See [module] level docs for more details.
297///
298/// [module]: index.html
299pub struct SendPushedResponse<B: Buf> {
300 inner: SendResponse<B>,
301}
302
303// Manual implementation necessary because of rust-lang/rust#26925
304impl<B: Buf + fmt::Debug> fmt::Debug for SendPushedResponse<B> {
305 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
306 write!(f, "SendPushedResponse {{ {:?} }}", self.inner)
307 }
308}
309
310/// Stages of an in-progress handshake.
311enum Handshaking<T, B: Buf> {
312 /// State 1. Connection is flushing pending SETTINGS frame.
313 #[cfg(feature = "tracing")]
314 Flushing(Instrumented<Flush<T, Prioritized<B>>>),
315 #[cfg(not(feature = "tracing"))]
316 Flushing(Flush<T, Prioritized<B>>),
317
318 /// State 2. Connection is waiting for the client preface.
319 #[cfg(feature = "tracing")]
320 ReadingPreface(Instrumented<ReadPreface<T, Prioritized<B>>>),
321 #[cfg(not(feature = "tracing"))]
322 ReadingPreface(ReadPreface<T, Prioritized<B>>),
323
324 /// State 3. Handshake is done, polling again would panic.
325 Done,
326}
327
328/// Flush a Sink
329struct Flush<T, B> {
330 codec: Option<Codec<T, B>>,
331}
332
333/// Read the client connection preface
334struct ReadPreface<T, B> {
335 codec: Option<Codec<T, B>>,
336 pos: usize,
337}
338
339#[derive(Debug)]
340pub(crate) struct Peer;
341
342const PREFACE: [u8; 24] = *b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
343
/// Creates a new HTTP/2 server backed by `io`, using default configuration
/// values.
346///
347/// It is expected that `io` already be in an appropriate state to commence
348/// the [HTTP/2 handshake]. See [Handshake] for more details.
349///
350/// Returns a future which resolves to the [`Connection`] instance once the
351/// HTTP/2 handshake has been completed. The returned [`Connection`]
352/// instance will be using default configuration values. Use [`Builder`] to
353/// customize the configuration values used by a [`Connection`] instance.
354///
355/// [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
356/// [Handshake]: ../index.html#handshake
357/// [`Connection`]: struct.Connection.html
358///
359/// # Examples
360///
361/// ```
362/// # use tokio::io::{AsyncRead, AsyncWrite};
363/// # use http2::server;
364/// # use http2::server::*;
365/// #
366/// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
367/// # {
368/// let connection = server::handshake(my_io).await.unwrap();
369/// // The HTTP/2 handshake has completed, now use `connection` to
370/// // accept inbound HTTP/2 streams.
371/// # }
372/// #
373/// # pub fn main() {}
374/// ```
375pub fn handshake<T>(io: T) -> Handshake<T, Bytes>
376where
377 T: AsyncRead + AsyncWrite + Unpin,
378{
379 Builder::new().handshake(io)
380}
381
382// ===== impl Connection =====
383
384impl<T, B> Connection<T, B>
385where
386 T: AsyncRead + AsyncWrite + Unpin,
387 B: Buf,
388{
389 fn handshake2(io: T, builder: Builder) -> Handshake<T, B> {
390 #[cfg(feature = "tracing")]
391 let span = ::tracing::trace_span!("server_handshake");
392 #[cfg(feature = "tracing")]
393 let entered = span.enter();
394
395 // Create the codec.
396 let mut codec = Codec::new(io);
397
398 if let Some(max) = builder.settings.max_frame_size() {
399 codec.set_max_recv_frame_size(max as usize);
400 }
401
402 if let Some(max) = builder.settings.max_header_list_size() {
403 codec.set_max_recv_header_list_size(max as usize);
404 }
405
406 // Send initial settings frame.
407 codec
408 .buffer(builder.settings.clone().into())
409 .expect("invalid SETTINGS frame");
410
411 // Create the handshake future.
412 #[cfg(feature = "tracing")]
413 let state =
414 Handshaking::Flushing(Flush::new(codec).instrument(::tracing::trace_span!("flush")));
415 #[cfg(not(feature = "tracing"))]
416 let state = Handshaking::Flushing(Flush::new(codec));
417
418 #[cfg(feature = "tracing")]
419 drop(entered);
420
421 Handshake {
422 builder,
423 state,
424 #[cfg(feature = "tracing")]
425 span,
426 }
427 }
428
429 /// Accept the next incoming request on this connection.
430 pub async fn accept(
431 &mut self,
432 ) -> Option<Result<(Request<RecvStream>, SendResponse<B>), crate::Error>> {
433 crate::poll_fn(move |cx| self.poll_accept(cx)).await
434 }
435
436 #[doc(hidden)]
437 pub fn poll_accept(
438 &mut self,
439 cx: &mut Context<'_>,
440 ) -> Poll<Option<Result<(Request<RecvStream>, SendResponse<B>), crate::Error>>> {
        // Always try to advance the internal state; its `Pending` result (and
        // the waker registration that comes with it) is also what allows this
        // function to return `Pending`.
443 if self.poll_closed(cx)?.is_ready() {
444 // If the socket is closed, don't return anything
445 // TODO: drop any pending streams
446 return Poll::Ready(None);
447 }
448
449 if let Some(inner) = self.connection.next_incoming() {
450 tracing::trace!("received incoming");
451 let (head, _) = inner.take_request().into_parts();
452 let body = RecvStream::new(FlowControl::new(inner.clone_to_opaque()));
453
454 let request = Request::from_parts(head, body);
455 let respond = SendResponse { inner };
456
457 return Poll::Ready(Some(Ok((request, respond))));
458 }
459
460 Poll::Pending
461 }
462
463 /// Sets the target window size for the whole connection.
464 ///
465 /// If `size` is greater than the current value, then a `WINDOW_UPDATE`
466 /// frame will be immediately sent to the remote, increasing the connection
467 /// level window by `size - current_value`.
468 ///
469 /// If `size` is less than the current value, nothing will happen
470 /// immediately. However, as window capacity is released by
471 /// [`FlowControl`] instances, no `WINDOW_UPDATE` frames will be sent
472 /// out until the number of "in flight" bytes drops below `size`.
473 ///
474 /// The default value is 65,535.
475 ///
476 /// See [`FlowControl`] documentation for more details.
477 ///
478 /// [`FlowControl`]: ../struct.FlowControl.html
479 /// [library level]: ../index.html#flow-control
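    ///
    /// # Examples
    ///
    /// A minimal sketch; the window size chosen here is purely illustrative.
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use http2::server;
    /// #
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) {
    /// let mut connection = server::handshake(my_io).await.unwrap();
    /// // Allow up to 1 MiB of unacknowledged data on the whole connection.
    /// connection.set_target_window_size(1024 * 1024);
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```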
480 pub fn set_target_window_size(&mut self, size: u32) {
481 assert!(size <= proto::MAX_WINDOW_SIZE);
482 self.connection.set_target_window_size(size);
483 }
484
485 /// Set a new `INITIAL_WINDOW_SIZE` setting (in octets) for stream-level
486 /// flow control for received data.
487 ///
488 /// The `SETTINGS` will be sent to the remote, and only applied once the
489 /// remote acknowledges the change.
490 ///
491 /// This can be used to increase or decrease the window size for existing
492 /// streams.
493 ///
494 /// # Errors
495 ///
496 /// Returns an error if a previous call is still pending acknowledgement
497 /// from the remote endpoint.
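    ///
    /// # Examples
    ///
    /// A minimal sketch; the window size chosen here is purely illustrative.
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use http2::server;
    /// #
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) {
    /// let mut connection = server::handshake(my_io).await.unwrap();
    /// // Ask for 256 KiB per stream; the change applies once the client
    /// // acknowledges the SETTINGS frame.
    /// connection.set_initial_window_size(256 * 1024).unwrap();
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```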
498 pub fn set_initial_window_size(&mut self, size: u32) -> Result<(), crate::Error> {
499 assert!(size <= proto::MAX_WINDOW_SIZE);
500 self.connection.set_initial_window_size(size)?;
501 Ok(())
502 }
503
504 /// Enables the [extended CONNECT protocol].
505 ///
506 /// [extended CONNECT protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
507 ///
508 /// # Errors
509 ///
510 /// Returns an error if a previous call is still pending acknowledgement
511 /// from the remote endpoint.
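    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming the connection was obtained via
    /// [`handshake`](fn.handshake.html):
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use http2::server;
    /// #
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) {
    /// let mut connection = server::handshake(my_io).await.unwrap();
    /// // Advertise RFC 8441 extended CONNECT support to the client.
    /// connection.enable_connect_protocol().unwrap();
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```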
512 pub fn enable_connect_protocol(&mut self) -> Result<(), crate::Error> {
513 self.connection.set_enable_connect_protocol()?;
514 Ok(())
515 }
516
517 /// Returns `Ready` when the underlying connection has closed.
518 ///
519 /// If any new inbound streams are received during a call to `poll_closed`,
520 /// they will be queued and returned on the next call to [`poll_accept`].
521 ///
522 /// This function will advance the internal connection state, driving
523 /// progress on all the other handles (e.g. [`RecvStream`] and [`SendStream`]).
524 ///
525 /// See [here](index.html#managing-the-connection) for more details.
526 ///
527 /// [`poll_accept`]: struct.Connection.html#method.poll_accept
528 /// [`RecvStream`]: ../struct.RecvStream.html
529 /// [`SendStream`]: ../struct.SendStream.html
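    ///
    /// # Examples
    ///
    /// A sketch of driving the connection to completion once the caller no
    /// longer wants to accept new streams. It assumes a toolchain that
    /// provides `std::future::poll_fn`.
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use http2::server;
    /// #
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) {
    /// let mut connection = server::handshake(my_io).await.unwrap();
    /// // ... accept and handle inbound streams ...
    ///
    /// // Drive the connection until the peer closes it or an error occurs.
    /// std::future::poll_fn(|cx| connection.poll_closed(cx)).await.unwrap();
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```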
530 pub fn poll_closed(&mut self, cx: &mut Context) -> Poll<Result<(), crate::Error>> {
531 self.connection.poll(cx).map_err(Into::into)
532 }
533
534 /// Sets the connection to a GOAWAY state.
535 ///
    /// Does not terminate the connection immediately. The connection must
    /// continue to be polled in order to close.
538 ///
539 /// After flushing the GOAWAY frame, the connection is closed. Any
540 /// outstanding streams do not prevent the connection from closing. This
541 /// should usually be reserved for shutting down when something bad
542 /// external to `http2` has happened, and open streams cannot be properly
543 /// handled.
544 ///
545 /// For graceful shutdowns, see [`graceful_shutdown`](Connection::graceful_shutdown).
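    ///
    /// # Examples
    ///
    /// A sketch, assuming `Reason` is re-exported at the crate root (the same
    /// type accepted by `SendResponse::send_reset`):
    ///
    /// ```no_run
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use http2::server;
    /// use http2::Reason;
    ///
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) {
    /// let mut connection = server::handshake(my_io).await.unwrap();
    /// // Something went wrong outside of http2; tell the client to go away.
    /// connection.abrupt_shutdown(Reason::INTERNAL_ERROR);
    /// // The GOAWAY frame is only flushed while the connection is polled.
    /// std::future::poll_fn(|cx| connection.poll_closed(cx)).await.ok();
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```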
546 pub fn abrupt_shutdown(&mut self, reason: Reason) {
547 self.connection.go_away_from_user(reason);
548 }
549
550 /// Starts a [graceful shutdown][1] process.
551 ///
552 /// Must continue being polled to close connection.
553 ///
554 /// It's possible to receive more requests after calling this method, since
555 /// they might have been in-flight from the client already. After about
556 /// 1 RTT, no new requests should be accepted. Once all active streams
557 /// have completed, the connection is closed.
558 ///
559 /// [1]: http://httpwg.org/specs/rfc7540.html#GOAWAY
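    ///
    /// # Examples
    ///
    /// A sketch of a graceful shutdown, assuming the connection was obtained
    /// via [`handshake`](fn.handshake.html):
    ///
    /// ```no_run
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use http2::server;
    /// #
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) {
    /// let mut connection = server::handshake(my_io).await.unwrap();
    /// // Send a GOAWAY: no new streams will be accepted, but streams that are
    /// // already in flight are allowed to complete.
    /// connection.graceful_shutdown();
    ///
    /// // Keep driving the connection until it has fully closed. Dropping a
    /// // `SendResponse` handle resets the corresponding in-flight stream.
    /// while let Some(request) = connection.accept().await {
    ///     drop(request);
    /// }
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```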
560 pub fn graceful_shutdown(&mut self) {
561 self.connection.go_away_gracefully();
562 }
563
564 /// Takes a `PingPong` instance from the connection.
565 ///
566 /// # Note
567 ///
568 /// This may only be called once. Calling multiple times will return `None`.
569 pub fn ping_pong(&mut self) -> Option<PingPong> {
570 self.connection.take_user_pings().map(PingPong::new)
571 }
572
573 /// Checks if there are any streams
574 pub fn has_streams(&self) -> bool {
575 self.connection.has_streams()
576 }
577
578 /// Returns the maximum number of concurrent streams that may be initiated
579 /// by the server on this connection.
580 ///
581 /// This limit is configured by the client peer by sending the
582 /// [`SETTINGS_MAX_CONCURRENT_STREAMS` parameter][1] in a `SETTINGS` frame.
583 /// This method returns the currently acknowledged value received from the
584 /// remote.
585 ///
586 /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2
587 pub fn max_concurrent_send_streams(&self) -> usize {
588 self.connection.max_send_streams()
589 }
590
591 /// Returns the maximum number of concurrent streams that may be initiated
592 /// by the client on this connection.
593 ///
594 /// This returns the value of the [`SETTINGS_MAX_CONCURRENT_STREAMS`
595 /// parameter][1] sent in a `SETTINGS` frame that has been
596 /// acknowledged by the remote peer. The value to be sent is configured by
597 /// the [`Builder::max_concurrent_streams`][2] method before handshaking
598 /// with the remote peer.
599 ///
600 /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2
601 /// [2]: ../struct.Builder.html#method.max_concurrent_streams
602 pub fn max_concurrent_recv_streams(&self) -> usize {
603 self.connection.max_recv_streams()
604 }
605
606 // Could disappear at anytime.
607 #[doc(hidden)]
608 #[cfg(feature = "unstable")]
609 pub fn num_wired_streams(&self) -> usize {
610 self.connection.num_wired_streams()
611 }
612}
613
614#[cfg(feature = "stream")]
615impl<T, B> futures_core::Stream for Connection<T, B>
616where
617 T: AsyncRead + AsyncWrite + Unpin,
618 B: Buf,
619{
620 type Item = Result<(Request<RecvStream>, SendResponse<B>), crate::Error>;
621
622 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
623 self.poll_accept(cx)
624 }
625}
626
627impl<T, B> fmt::Debug for Connection<T, B>
628where
629 T: fmt::Debug,
630 B: fmt::Debug + Buf,
631{
632 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
633 fmt.debug_struct("Connection")
634 .field("connection", &self.connection)
635 .finish()
636 }
637}
638
639// ===== impl Builder =====
640
641impl Builder {
642 /// Returns a new server builder instance initialized with default
643 /// configuration values.
644 ///
645 /// Configuration methods can be chained on the return value.
646 ///
647 /// # Examples
648 ///
649 /// ```
650 /// # use tokio::io::{AsyncRead, AsyncWrite};
651 /// # use http2::server::*;
652 /// #
653 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
654 /// # -> Handshake<T>
655 /// # {
656 /// // `server_fut` is a future representing the completion of the HTTP/2
657 /// // handshake.
658 /// let server_fut = Builder::new()
659 /// .initial_window_size(1_000_000)
660 /// .max_concurrent_streams(1000)
661 /// .handshake(my_io);
662 /// # server_fut
663 /// # }
664 /// #
665 /// # pub fn main() {}
666 /// ```
667 pub fn new() -> Builder {
668 Builder {
669 reset_stream_duration: Duration::from_secs(proto::DEFAULT_RESET_STREAM_SECS),
670 reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX,
671 pending_accept_reset_stream_max: proto::DEFAULT_REMOTE_RESET_STREAM_MAX,
672 settings: Settings::default(),
673 initial_target_connection_window_size: None,
674 max_send_buffer_size: proto::DEFAULT_MAX_SEND_BUFFER_SIZE,
675
676 local_max_error_reset_streams: Some(proto::DEFAULT_LOCAL_RESET_COUNT_MAX),
677 }
678 }
679
680 /// Indicates the initial window size (in octets) for stream-level
681 /// flow control for received data.
682 ///
683 /// The initial window of a stream is used as part of flow control. For more
684 /// details, see [`FlowControl`].
685 ///
686 /// The default value is 65,535.
687 ///
688 /// [`FlowControl`]: ../struct.FlowControl.html
689 ///
690 /// # Examples
691 ///
692 /// ```
693 /// # use tokio::io::{AsyncRead, AsyncWrite};
694 /// # use http2::server::*;
695 /// #
696 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
697 /// # -> Handshake<T>
698 /// # {
699 /// // `server_fut` is a future representing the completion of the HTTP/2
700 /// // handshake.
701 /// let server_fut = Builder::new()
702 /// .initial_window_size(1_000_000)
703 /// .handshake(my_io);
704 /// # server_fut
705 /// # }
706 /// #
707 /// # pub fn main() {}
708 /// ```
709 pub fn initial_window_size(&mut self, size: u32) -> &mut Self {
710 self.settings.set_initial_window_size(Some(size));
711 self
712 }
713
714 /// Indicates the initial window size (in octets) for connection-level flow control
715 /// for received data.
716 ///
717 /// The initial window of a connection is used as part of flow control. For more details,
718 /// see [`FlowControl`].
719 ///
720 /// The default value is 65,535.
721 ///
722 /// [`FlowControl`]: ../struct.FlowControl.html
723 ///
724 /// # Examples
725 ///
726 /// ```
727 /// # use tokio::io::{AsyncRead, AsyncWrite};
728 /// # use http2::server::*;
729 /// #
730 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
731 /// # -> Handshake<T>
732 /// # {
733 /// // `server_fut` is a future representing the completion of the HTTP/2
734 /// // handshake.
735 /// let server_fut = Builder::new()
736 /// .initial_connection_window_size(1_000_000)
737 /// .handshake(my_io);
738 /// # server_fut
739 /// # }
740 /// #
741 /// # pub fn main() {}
742 /// ```
743 pub fn initial_connection_window_size(&mut self, size: u32) -> &mut Self {
744 self.initial_target_connection_window_size = Some(size);
745 self
746 }
747
748 /// Indicates the size (in octets) of the largest HTTP/2 frame payload that the
749 /// configured server is able to accept.
750 ///
751 /// The sender may send data frames that are **smaller** than this value,
752 /// but any data larger than `max` will be broken up into multiple `DATA`
753 /// frames.
754 ///
755 /// The value **must** be between 16,384 and 16,777,215. The default value is 16,384.
756 ///
757 /// # Examples
758 ///
759 /// ```
760 /// # use tokio::io::{AsyncRead, AsyncWrite};
761 /// # use http2::server::*;
762 /// #
763 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
764 /// # -> Handshake<T>
765 /// # {
766 /// // `server_fut` is a future representing the completion of the HTTP/2
767 /// // handshake.
768 /// let server_fut = Builder::new()
769 /// .max_frame_size(1_000_000)
770 /// .handshake(my_io);
771 /// # server_fut
772 /// # }
773 /// #
774 /// # pub fn main() {}
775 /// ```
776 ///
777 /// # Panics
778 ///
779 /// This function panics if `max` is not within the legal range specified
780 /// above.
781 pub fn max_frame_size(&mut self, max: u32) -> &mut Self {
782 self.settings.set_max_frame_size(Some(max));
783 self
784 }
785
786 /// Sets the max size of received header frames.
787 ///
788 /// This advisory setting informs a peer of the maximum size of header list
789 /// that the sender is prepared to accept, in octets. The value is based on
790 /// the uncompressed size of header fields, including the length of the name
791 /// and value in octets plus an overhead of 32 octets for each header field.
792 ///
793 /// This setting is also used to limit the maximum amount of data that is
794 /// buffered to decode HEADERS frames.
795 ///
796 /// # Examples
797 ///
798 /// ```
799 /// # use tokio::io::{AsyncRead, AsyncWrite};
800 /// # use http2::server::*;
801 /// #
802 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
803 /// # -> Handshake<T>
804 /// # {
805 /// // `server_fut` is a future representing the completion of the HTTP/2
806 /// // handshake.
807 /// let server_fut = Builder::new()
808 /// .max_header_list_size(16 * 1024)
809 /// .handshake(my_io);
810 /// # server_fut
811 /// # }
812 /// #
813 /// # pub fn main() {}
814 /// ```
815 pub fn max_header_list_size(&mut self, max: u32) -> &mut Self {
816 self.settings.set_max_header_list_size(Some(max));
817 self
818 }
819
820 /// Sets the maximum number of concurrent streams.
821 ///
822 /// The maximum concurrent streams setting only controls the maximum number
823 /// of streams that can be initiated by the remote peer. In other words,
824 /// when this setting is set to 100, this does not limit the number of
825 /// concurrent streams that can be created by the caller.
826 ///
827 /// It is recommended that this value be no smaller than 100, so as to not
828 /// unnecessarily limit parallelism. However, any value is legal, including
829 /// 0. If `max` is set to 0, then the remote will not be permitted to
830 /// initiate streams.
831 ///
832 /// Note that streams in the reserved state, i.e., push promises that have
833 /// been reserved but the stream has not started, do not count against this
834 /// setting.
835 ///
836 /// Also note that if the remote *does* exceed the value set here, it is not
837 /// a protocol level error. Instead, the `http2` library will immediately reset
838 /// the stream.
839 ///
840 /// See [Section 5.1.2] in the HTTP/2 spec for more details.
841 ///
842 /// [Section 5.1.2]: https://http2.github.io/http2-spec/#rfc.section.5.1.2
843 ///
844 /// # Examples
845 ///
846 /// ```
847 /// # use tokio::io::{AsyncRead, AsyncWrite};
848 /// # use http2::server::*;
849 /// #
850 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
851 /// # -> Handshake<T>
852 /// # {
853 /// // `server_fut` is a future representing the completion of the HTTP/2
854 /// // handshake.
855 /// let server_fut = Builder::new()
856 /// .max_concurrent_streams(1000)
857 /// .handshake(my_io);
858 /// # server_fut
859 /// # }
860 /// #
861 /// # pub fn main() {}
862 /// ```
863 pub fn max_concurrent_streams(&mut self, max: u32) -> &mut Self {
864 self.settings.set_max_concurrent_streams(Some(max));
865 self
866 }
867
868 /// Sets the maximum number of concurrent locally reset streams.
869 ///
870 /// When a stream is explicitly reset by either calling
871 /// [`SendResponse::send_reset`] or by dropping a [`SendResponse`] instance
872 /// before completing the stream, the HTTP/2 specification requires that
873 /// any further frames received for that stream must be ignored for "some
874 /// time".
875 ///
876 /// In order to satisfy the specification, internal state must be maintained
877 /// to implement the behavior. This state grows linearly with the number of
878 /// streams that are locally reset.
879 ///
    /// The `max_concurrent_reset_streams` setting sets an upper bound on the
    /// amount of state that is maintained. When this max value is reached, the
    /// oldest reset stream is purged from memory.
883 ///
884 /// Once the stream has been fully purged from memory, any additional frames
885 /// received for that stream will result in a connection level protocol
886 /// error, forcing the connection to terminate.
887 ///
888 /// The default value is 10.
889 ///
890 /// # Examples
891 ///
892 /// ```
893 /// # use tokio::io::{AsyncRead, AsyncWrite};
894 /// # use http2::server::*;
895 /// #
896 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
897 /// # -> Handshake<T>
898 /// # {
899 /// // `server_fut` is a future representing the completion of the HTTP/2
900 /// // handshake.
901 /// let server_fut = Builder::new()
902 /// .max_concurrent_reset_streams(1000)
903 /// .handshake(my_io);
904 /// # server_fut
905 /// # }
906 /// #
907 /// # pub fn main() {}
908 /// ```
909 pub fn max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self {
910 self.reset_stream_max = max;
911 self
912 }
913
914 /// Sets the maximum number of local resets due to protocol errors made by the remote end.
915 ///
916 /// Invalid frames and many other protocol errors will lead to resets being generated for those streams.
917 /// Too many of these often indicate a malicious client, and there are attacks which can abuse this to DOS servers.
918 /// This limit protects against these DOS attacks by limiting the amount of resets we can be forced to generate.
919 ///
920 /// When the number of local resets exceeds this threshold, the server will issue GOAWAYs with an error code of
921 /// `ENHANCE_YOUR_CALM` to the client.
922 ///
923 /// If you really want to disable this, supply [`Option::None`] here.
924 /// Disabling this is not recommended and may expose you to DOS attacks.
925 ///
926 /// The default value is currently 1024, but could change.
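    ///
    /// # Examples
    ///
    /// A sketch in the same style as the other builder examples; the limit
    /// shown is purely illustrative.
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use http2::server::*;
    /// #
    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
    /// # -> Handshake<T>
    /// # {
    /// // `server_fut` is a future representing the completion of the HTTP/2
    /// // handshake.
    /// let server_fut = Builder::new()
    ///     .max_local_error_reset_streams(Some(1024))
    ///     .handshake(my_io);
    /// # server_fut
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```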
927 pub fn max_local_error_reset_streams(&mut self, max: Option<usize>) -> &mut Self {
928 self.local_max_error_reset_streams = max;
929 self
930 }
931
932 /// Sets the maximum number of pending-accept remotely-reset streams.
933 ///
    /// Streams that have been received from the peer, but not yet accepted by
    /// the user, can also receive a RST_STREAM. This is a legitimate pattern: a
    /// client could send a request and then, shortly after, realize it is not
    /// needed, sending a CANCEL.
938 ///
939 /// However, since those streams are now "closed", they don't count towards
940 /// the max concurrent streams. So, they will sit in the accept queue,
941 /// using memory.
942 ///
943 /// When the number of remotely-reset streams sitting in the pending-accept
944 /// queue reaches this maximum value, a connection error with the code of
945 /// `ENHANCE_YOUR_CALM` will be sent to the peer, and returned by the
946 /// `Future`.
947 ///
948 /// The default value is currently 20, but could change.
949 ///
950 /// # Examples
951 ///
952 ///
953 /// ```
954 /// # use tokio::io::{AsyncRead, AsyncWrite};
955 /// # use http2::server::*;
956 /// #
957 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
958 /// # -> Handshake<T>
959 /// # {
960 /// // `server_fut` is a future representing the completion of the HTTP/2
961 /// // handshake.
962 /// let server_fut = Builder::new()
963 /// .max_pending_accept_reset_streams(100)
964 /// .handshake(my_io);
965 /// # server_fut
966 /// # }
967 /// #
968 /// # pub fn main() {}
969 /// ```
970 pub fn max_pending_accept_reset_streams(&mut self, max: usize) -> &mut Self {
971 self.pending_accept_reset_stream_max = max;
972 self
973 }
974
975 /// Sets the maximum send buffer size per stream.
976 ///
977 /// Once a stream has buffered up to (or over) the maximum, the stream's
978 /// flow control will not "poll" additional capacity. Once bytes for the
979 /// stream have been written to the connection, the send buffer capacity
980 /// will be freed up again.
981 ///
982 /// The default is currently ~400KB, but may change.
983 ///
984 /// # Panics
985 ///
986 /// This function panics if `max` is larger than `u32::MAX`.
987 pub fn max_send_buffer_size(&mut self, max: usize) -> &mut Self {
988 assert!(max <= u32::MAX as usize);
989 self.max_send_buffer_size = max;
990 self
991 }
992
993 /// Sets the maximum number of concurrent locally reset streams.
994 ///
995 /// When a stream is explicitly reset by either calling
996 /// [`SendResponse::send_reset`] or by dropping a [`SendResponse`] instance
997 /// before completing the stream, the HTTP/2 specification requires that
998 /// any further frames received for that stream must be ignored for "some
999 /// time".
1000 ///
1001 /// In order to satisfy the specification, internal state must be maintained
1002 /// to implement the behavior. This state grows linearly with the number of
1003 /// streams that are locally reset.
1004 ///
1005 /// The `reset_stream_duration` setting configures the max amount of time
1006 /// this state will be maintained in memory. Once the duration elapses, the
1007 /// stream state is purged from memory.
1008 ///
1009 /// Once the stream has been fully purged from memory, any additional frames
1010 /// received for that stream will result in a connection level protocol
1011 /// error, forcing the connection to terminate.
1012 ///
1013 /// The default value is 30 seconds.
1014 ///
1015 /// # Examples
1016 ///
1017 /// ```
1018 /// # use tokio::io::{AsyncRead, AsyncWrite};
1019 /// # use http2::server::*;
1020 /// # use std::time::Duration;
1021 /// #
1022 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1023 /// # -> Handshake<T>
1024 /// # {
1025 /// // `server_fut` is a future representing the completion of the HTTP/2
1026 /// // handshake.
1027 /// let server_fut = Builder::new()
1028 /// .reset_stream_duration(Duration::from_secs(10))
1029 /// .handshake(my_io);
1030 /// # server_fut
1031 /// # }
1032 /// #
1033 /// # pub fn main() {}
1034 /// ```
1035 pub fn reset_stream_duration(&mut self, dur: Duration) -> &mut Self {
1036 self.reset_stream_duration = dur;
1037 self
1038 }
1039
1040 /// Enables the [extended CONNECT protocol].
1041 ///
1042 /// [extended CONNECT protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
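    ///
    /// # Examples
    ///
    /// A sketch in the same style as the other builder examples:
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use http2::server::*;
    /// #
    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
    /// # -> Handshake<T>
    /// # {
    /// // Advertise RFC 8441 extended CONNECT support to clients.
    /// let server_fut = Builder::new()
    ///     .enable_connect_protocol()
    ///     .handshake(my_io);
    /// # server_fut
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```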
1043 pub fn enable_connect_protocol(&mut self) -> &mut Self {
1044 self.settings.set_enable_connect_protocol(Some(1));
1045 self
1046 }
1047
1048 /// Creates a new configured HTTP/2 server backed by `io`.
1049 ///
1050 /// It is expected that `io` already be in an appropriate state to commence
1051 /// the [HTTP/2 handshake]. See [Handshake] for more details.
1052 ///
1053 /// Returns a future which resolves to the [`Connection`] instance once the
1054 /// HTTP/2 handshake has been completed.
1055 ///
1056 /// This function also allows the caller to configure the send payload data
1057 /// type. See [Outbound data type] for more details.
1058 ///
1059 /// [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
1060 /// [Handshake]: ../index.html#handshake
1061 /// [`Connection`]: struct.Connection.html
    /// [Outbound data type]: ../index.html#outbound-data-type
1063 ///
1064 /// # Examples
1065 ///
1066 /// Basic usage:
1067 ///
1068 /// ```
1069 /// # use tokio::io::{AsyncRead, AsyncWrite};
1070 /// # use http2::server::*;
1071 /// #
1072 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1073 /// # -> Handshake<T>
1074 /// # {
1075 /// // `server_fut` is a future representing the completion of the HTTP/2
1076 /// // handshake.
1077 /// let server_fut = Builder::new()
1078 /// .handshake(my_io);
1079 /// # server_fut
1080 /// # }
1081 /// #
1082 /// # pub fn main() {}
1083 /// ```
1084 ///
1085 /// Configures the send-payload data type. In this case, the outbound data
1086 /// type will be `&'static [u8]`.
1087 ///
1088 /// ```
1089 /// # use tokio::io::{AsyncRead, AsyncWrite};
1090 /// # use http2::server::*;
1091 /// #
1092 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1093 /// # -> Handshake<T, &'static [u8]>
1094 /// # {
1095 /// // `server_fut` is a future representing the completion of the HTTP/2
1096 /// // handshake.
1097 /// let server_fut: Handshake<_, &'static [u8]> = Builder::new()
1098 /// .handshake(my_io);
1099 /// # server_fut
1100 /// # }
1101 /// #
1102 /// # pub fn main() {}
1103 /// ```
1104 pub fn handshake<T, B>(&self, io: T) -> Handshake<T, B>
1105 where
1106 T: AsyncRead + AsyncWrite + Unpin,
1107 B: Buf,
1108 {
1109 Connection::handshake2(io, self.clone())
1110 }
1111}
1112
1113impl Default for Builder {
1114 fn default() -> Builder {
1115 Builder::new()
1116 }
1117}
1118
1119// ===== impl SendResponse =====
1120
1121impl<B: Buf> SendResponse<B> {
1122 /// Send a response to a client request.
1123 ///
1124 /// On success, a [`SendStream`] instance is returned. This instance can be
1125 /// used to stream the response body and send trailers.
1126 ///
1127 /// If a body or trailers will be sent on the returned [`SendStream`]
1128 /// instance, then `end_of_stream` must be set to `false` when calling this
1129 /// function.
1130 ///
1131 /// The [`SendResponse`] instance is already associated with a received
1132 /// request. This function may only be called once per instance and only if
1133 /// [`send_reset`] has not been previously called.
1134 ///
1135 /// [`SendResponse`]: #
1136 /// [`SendStream`]: ../struct.SendStream.html
1137 /// [`send_reset`]: #method.send_reset
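    ///
    /// # Examples
    ///
    /// A minimal sketch that sends a response with no body; the send-payload
    /// type `&'static [u8]` is chosen only for illustration.
    ///
    /// ```
    /// # use http::{Response, StatusCode};
    /// # use http2::server::SendResponse;
    /// #
    /// # fn doc(mut respond: SendResponse<&'static [u8]>) {
    /// let response = Response::builder()
    ///     .status(StatusCode::OK)
    ///     .body(())
    ///     .unwrap();
    ///
    /// // `end_of_stream` is `true`: no body or trailers will follow.
    /// respond.send_response(response, true).unwrap();
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```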
1138 pub fn send_response(
1139 &mut self,
1140 response: Response<()>,
1141 end_of_stream: bool,
1142 ) -> Result<SendStream<B>, crate::Error> {
1143 self.inner
1144 .send_response(response, end_of_stream)
1145 .map(|_| SendStream::new(self.inner.clone()))
1146 .map_err(Into::into)
1147 }
1148
1149 /// Push a request and response to the client
1150 ///
1151 /// On success, a [`SendResponse`] instance is returned.
1152 ///
1153 /// [`SendResponse`]: #
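    ///
    /// # Examples
    ///
    /// A sketch of promising a request and then fulfilling the promise; the
    /// URI and send-payload type are purely illustrative.
    ///
    /// ```no_run
    /// # use http::{Request, Response, StatusCode};
    /// # use http2::server::SendResponse;
    /// #
    /// # fn doc(mut respond: SendResponse<&'static [u8]>) {
    /// // Promise a GET request for a resource the client is likely to need.
    /// let promised = Request::builder()
    ///     .method("GET")
    ///     .uri("https://example.com/style.css")
    ///     .body(())
    ///     .unwrap();
    /// let mut pushed = respond.push_request(promised).unwrap();
    ///
    /// // Fulfill the promise by sending the pushed response.
    /// let response = Response::builder()
    ///     .status(StatusCode::OK)
    ///     .body(())
    ///     .unwrap();
    /// pushed.send_response(response, true).unwrap();
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```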
1154 pub fn push_request(
1155 &mut self,
1156 request: Request<()>,
1157 ) -> Result<SendPushedResponse<B>, crate::Error> {
1158 self.inner
1159 .send_push_promise(request)
1160 .map(|inner| SendPushedResponse {
1161 inner: SendResponse { inner },
1162 })
1163 .map_err(Into::into)
1164 }
1165
1166 /// Send a stream reset to the peer.
1167 ///
1168 /// This essentially cancels the stream, including any inbound or outbound
1169 /// data streams.
1170 ///
1171 /// If this function is called before [`send_response`], a call to
1172 /// [`send_response`] will result in an error.
1173 ///
1174 /// If this function is called while a [`SendStream`] instance is active,
1175 /// any further use of the instance will result in an error.
1176 ///
1177 /// This function should only be called once.
1178 ///
1179 /// [`send_response`]: #method.send_response
1180 /// [`SendStream`]: ../struct.SendStream.html
1181 pub fn send_reset(&mut self, reason: Reason) {
1182 self.inner.send_reset(reason)
1183 }
1184
1185 /// Polls to be notified when the client resets this stream.
1186 ///
    /// If the stream is still open, this returns `Poll::Pending`, and
1188 /// registers the task to be notified if a `RST_STREAM` is received.
1189 ///
1190 /// If a `RST_STREAM` frame is received for this stream, calling this
1191 /// method will yield the `Reason` for the reset.
1192 ///
1193 /// # Error
1194 ///
1195 /// Calling this method after having called `send_response` will return
1196 /// a user error.
1197 pub fn poll_reset(&mut self, cx: &mut Context) -> Poll<Result<Reason, crate::Error>> {
1198 self.inner.poll_reset(cx, proto::PollReset::AwaitingHeaders)
1199 }
1200
1201 /// Returns the stream ID of the response stream.
1202 ///
1203 /// # Panics
1204 ///
1205 /// If the lock on the stream store has been poisoned.
1206 pub fn stream_id(&self) -> crate::StreamId {
1207 crate::StreamId::from_internal(self.inner.stream_id())
1208 }
1209}
1210
1211// ===== impl SendPushedResponse =====
1212
1213impl<B: Buf> SendPushedResponse<B> {
1214 /// Send a response to a promised request.
1215 ///
1216 /// On success, a [`SendStream`] instance is returned. This instance can be
1217 /// used to stream the response body and send trailers.
1218 ///
1219 /// If a body or trailers will be sent on the returned [`SendStream`]
1220 /// instance, then `end_of_stream` must be set to `false` when calling this
1221 /// function.
1222 ///
1223 /// The [`SendPushedResponse`] instance is associated with a promised
1224 /// request. This function may only be called once per instance and only if
1225 /// [`send_reset`] has not been previously called.
1226 ///
1227 /// [`SendPushedResponse`]: #
1228 /// [`SendStream`]: ../struct.SendStream.html
1229 /// [`send_reset`]: #method.send_reset
1230 pub fn send_response(
1231 &mut self,
1232 response: Response<()>,
1233 end_of_stream: bool,
1234 ) -> Result<SendStream<B>, crate::Error> {
1235 self.inner.send_response(response, end_of_stream)
1236 }
1237
1238 /// Send a stream reset to the peer.
1239 ///
1240 /// This essentially cancels the stream, including any inbound or outbound
1241 /// data streams.
1242 ///
1243 /// If this function is called before [`send_response`], a call to
1244 /// [`send_response`] will result in an error.
1245 ///
1246 /// If this function is called while a [`SendStream`] instance is active,
1247 /// any further use of the instance will result in an error.
1248 ///
1249 /// This function should only be called once.
1250 ///
1251 /// [`send_response`]: #method.send_response
1252 /// [`SendStream`]: ../struct.SendStream.html
1253 pub fn send_reset(&mut self, reason: Reason) {
1254 self.inner.send_reset(reason)
1255 }
1256
1257 /// Polls to be notified when the client resets this stream.
1258 ///
    /// If the stream is still open, this returns `Poll::Pending`, and
1260 /// registers the task to be notified if a `RST_STREAM` is received.
1261 ///
1262 /// If a `RST_STREAM` frame is received for this stream, calling this
1263 /// method will yield the `Reason` for the reset.
1264 ///
1265 /// # Error
1266 ///
1267 /// Calling this method after having called `send_response` will return
1268 /// a user error.
1269 pub fn poll_reset(&mut self, cx: &mut Context) -> Poll<Result<Reason, crate::Error>> {
1270 self.inner.poll_reset(cx)
1271 }
1272
1273 /// Returns the stream ID of the response stream.
1274 ///
1275 /// # Panics
1276 ///
1277 /// If the lock on the stream store has been poisoned.
1278 pub fn stream_id(&self) -> crate::StreamId {
1279 self.inner.stream_id()
1280 }
1281}
1282
1283// ===== impl Flush =====
1284
1285impl<T, B: Buf> Flush<T, B> {
1286 fn new(codec: Codec<T, B>) -> Self {
1287 Flush { codec: Some(codec) }
1288 }
1289}
1290
1291impl<T, B> Future for Flush<T, B>
1292where
1293 T: AsyncWrite + Unpin,
1294 B: Buf,
1295{
1296 type Output = Result<Codec<T, B>, crate::Error>;
1297
1298 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1299 // Flush the codec
1300 ready!(self.codec.as_mut().unwrap().flush(cx)).map_err(crate::Error::from_io)?;
1301
1302 // Return the codec
1303 Poll::Ready(Ok(self.codec.take().unwrap()))
1304 }
1305}
1306
1307impl<T, B: Buf> ReadPreface<T, B> {
1308 fn new(codec: Codec<T, B>) -> Self {
1309 ReadPreface {
1310 codec: Some(codec),
1311 pos: 0,
1312 }
1313 }
1314
1315 fn inner_mut(&mut self) -> &mut T {
1316 self.codec.as_mut().unwrap().get_mut()
1317 }
1318}
1319
1320impl<T, B> Future for ReadPreface<T, B>
1321where
1322 T: AsyncRead + Unpin,
1323 B: Buf,
1324{
1325 type Output = Result<Codec<T, B>, crate::Error>;
1326
1327 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1328 let mut buf = [0; 24];
1329 let mut rem = PREFACE.len() - self.pos;
1330
1331 while rem > 0 {
1332 let mut buf = ReadBuf::new(&mut buf[..rem]);
1333 ready!(Pin::new(self.inner_mut()).poll_read(cx, &mut buf))
1334 .map_err(crate::Error::from_io)?;
1335 let n = buf.filled().len();
1336 if n == 0 {
1337 return Poll::Ready(Err(crate::Error::from_io(io::Error::new(
1338 io::ErrorKind::UnexpectedEof,
1339 "connection closed before reading preface",
1340 ))));
1341 }
1342
1343 if &PREFACE[self.pos..self.pos + n] != buf.filled() {
1344 proto_err!(conn: "read_preface: invalid preface");
1345 // TODO: Should this just write the GO_AWAY frame directly?
1346 return Poll::Ready(Err(Error::library_go_away(Reason::PROTOCOL_ERROR).into()));
1347 }
1348
1349 self.pos += n;
1350 rem -= n; // TODO test
1351 }
1352
1353 Poll::Ready(Ok(self.codec.take().unwrap()))
1354 }
1355}
1356
1357// ===== impl Handshake =====
1358
1359impl<T, B: Buf> Future for Handshake<T, B>
1360where
1361 T: AsyncRead + AsyncWrite + Unpin,
1362 B: Buf,
1363{
1364 type Output = Result<Connection<T, B>, crate::Error>;
1365
1366 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1367 #[cfg(feature = "tracing")]
1368 let _span = self.span.clone().entered(); // XXX(eliza): T_T
1369 tracing::trace!(state = ?self.state);
1370
1371 loop {
1372 match &mut self.state {
1373 Handshaking::Flushing(flush) => {
1374 // We're currently flushing a pending SETTINGS frame. Poll the
1375 // flush future, and, if it's completed, advance our state to wait
1376 // for the client preface.
1377 let codec = match Pin::new(flush).poll(cx)? {
1378 Poll::Pending => {
1379 tracing::trace!(flush.poll = %"Pending");
1380 return Poll::Pending;
1381 }
1382 Poll::Ready(flushed) => {
1383 tracing::trace!(flush.poll = %"Ready");
1384 flushed
1385 }
1386 };
1387 self.state = Handshaking::ReadingPreface(
1388 #[cfg(feature = "tracing")]
1389 ReadPreface::new(codec).instrument(::tracing::trace_span!("read_preface")),
1390 #[cfg(not(feature = "tracing"))]
1391 ReadPreface::new(codec),
1392 );
1393 }
1394 Handshaking::ReadingPreface(read) => {
1395 let codec = ready!(Pin::new(read).poll(cx)?);
1396
1397 self.state = Handshaking::Done;
1398
1399 let connection = proto::Connection::new(
1400 codec,
1401 Config {
1402 next_stream_id: 2.into(),
1403 // Server does not need to locally initiate any streams
1404 initial_max_send_streams: 0,
1405 max_send_buffer_size: self.builder.max_send_buffer_size,
1406 reset_stream_duration: self.builder.reset_stream_duration,
1407 reset_stream_max: self.builder.reset_stream_max,
1408 remote_reset_stream_max: self.builder.pending_accept_reset_stream_max,
1409 local_error_reset_streams_max: self
1410 .builder
1411 .local_max_error_reset_streams,
1412 settings: self.builder.settings.clone(),
1413 headers_stream_dependency: None,
1414 headers_pseudo_order: None,
1415 priorities: None,
1416 },
1417 );
1418
1419 tracing::trace!("connection established!");
1420 let mut c = Connection { connection };
1421 if let Some(sz) = self.builder.initial_target_connection_window_size {
1422 c.set_target_window_size(sz);
1423 }
1424
1425 return Poll::Ready(Ok(c));
1426 }
1427 Handshaking::Done => {
1428 panic!("Handshaking::poll() called again after handshaking was complete")
1429 }
1430 }
1431 }
1432 }
1433}
1434
1435impl<T, B> fmt::Debug for Handshake<T, B>
1436where
1437 T: AsyncRead + AsyncWrite + fmt::Debug,
1438 B: fmt::Debug + Buf,
1439{
1440 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
1441 write!(fmt, "server::Handshake")
1442 }
1443}
1444
1445impl Peer {
1446 pub fn convert_send_message(
1447 id: StreamId,
1448 response: Response<()>,
1449 end_of_stream: bool,
1450 ) -> frame::Headers {
1451 use http::response::Parts;
1452
        // Extract the components of the HTTP response
1454 let (
1455 Parts {
1456 status, headers, ..
1457 },
1458 _,
1459 ) = response.into_parts();
1460
        // Build the pseudo header set. All responses include `:status`.
1463 let pseudo = Pseudo::response(status);
1464
1465 // Create the HEADERS frame
1466 let mut frame = frame::Headers::new(id, pseudo, headers);
1467
1468 if end_of_stream {
1469 frame.set_end_stream()
1470 }
1471
1472 frame
1473 }
1474
1475 pub fn convert_push_message(
1476 stream_id: StreamId,
1477 promised_id: StreamId,
1478 request: Request<()>,
1479 ) -> Result<frame::PushPromise, UserError> {
1480 use http::request::Parts;
1481
1482 if let Err(e) = frame::PushPromise::validate_request(&request) {
1483 use PushPromiseHeaderError::*;
1484 match e {
1485 NotSafeAndCacheable => tracing::debug!(
1486 ?promised_id,
1487 "convert_push_message: method {} is not safe and cacheable",
1488 request.method(),
1489 ),
1490 InvalidContentLength(_e) => tracing::debug!(
1491 ?promised_id,
1492 "convert_push_message; promised request has invalid content-length {:?}",
1493 _e,
1494 ),
1495 }
1496 return Err(UserError::MalformedHeaders);
1497 }
1498
1499 // Extract the components of the HTTP request
1500 let (
1501 Parts {
1502 method,
1503 uri,
1504 headers,
1505 ..
1506 },
1507 _,
1508 ) = request.into_parts();
1509
1510 let pseudo = Pseudo::request(method, uri, None);
1511
1512 Ok(frame::PushPromise::new(
1513 stream_id,
1514 promised_id,
1515 pseudo,
1516 headers,
1517 ))
1518 }
1519}
1520
1521impl proto::Peer for Peer {
1522 type Poll = Request<()>;
1523
1524 #[cfg(feature = "tracing")]
1525 const NAME: &'static str = "Server";
1526
1527 /*
1528 fn is_server() -> bool {
1529 true
1530 }
1531 */
1532
1533 fn r#dyn() -> proto::DynPeer {
1534 proto::DynPeer::Server
1535 }
1536
1537 fn convert_poll_message(
1538 pseudo: Pseudo,
1539 fields: HeaderMap,
1540 stream_id: StreamId,
1541 ) -> Result<Self::Poll, Error> {
1542 use http::{uri, Version};
1543
1544 let mut b = Request::builder();
1545
1546 macro_rules! malformed {
1547 ($($arg:tt)*) => {{
1548 tracing::debug!($($arg)*);
1549 return Err(Error::library_reset(stream_id, Reason::PROTOCOL_ERROR));
1550 }}
1551 }
1552
1553 b = b.version(Version::HTTP_2);
1554
1555 let is_connect;
1556 if let Some(method) = pseudo.method {
1557 is_connect = method == Method::CONNECT;
1558 b = b.method(method);
1559 } else {
1560 malformed!("malformed headers: missing method");
1561 }
1562
1563 let has_protocol = pseudo.protocol.is_some();
1564 if has_protocol {
1565 if is_connect {
1566 // Assert that we have the right type.
1567 b = b.extension::<crate::ext::Protocol>(pseudo.protocol.unwrap());
1568 } else {
1569 malformed!("malformed headers: :protocol on non-CONNECT request");
1570 }
1571 }
1572
1573 if pseudo.status.is_some() {
1574 malformed!("malformed headers: :status field on request");
1575 }
1576
1577 // Convert the URI
1578 let mut parts = uri::Parts::default();
1579
1580 // A request translated from HTTP/1 must not include the :authority
1581 // header
1582 if let Some(authority) = pseudo.authority {
1583 let maybe_authority = uri::Authority::from_maybe_shared(authority.clone().into_inner());
1584 parts.authority = Some(maybe_authority.or_else(|_why| {
1585 malformed!(
1586 "malformed headers: malformed authority ({:?}): {}",
1587 authority,
1588 _why,
1589 )
1590 })?);
1591 }
1592
1593 // A :scheme is required, except CONNECT.
1594 if let Some(scheme) = pseudo.scheme {
1595 if is_connect && !has_protocol {
1596 malformed!("malformed headers: :scheme in CONNECT");
1597 }
1598 let maybe_scheme = scheme.parse();
1599 let scheme = maybe_scheme.or_else(|_why| {
1600 malformed!(
1601 "malformed headers: malformed scheme ({:?}): {}",
1602 scheme,
1603 _why,
1604 )
1605 })?;
1606
            // It's not possible to build a `Uri` from just a scheme and a
            // path. So, after validating that it is a valid scheme, we just
            // have to drop it if there isn't an `:authority`.
1610 if parts.authority.is_some() {
1611 parts.scheme = Some(scheme);
1612 }
1613 } else if !is_connect || has_protocol {
1614 malformed!("malformed headers: missing scheme");
1615 }
1616
1617 if let Some(path) = pseudo.path {
1618 if is_connect && !has_protocol {
1619 malformed!("malformed headers: :path in CONNECT");
1620 }
1621
1622 // This cannot be empty
1623 if path.is_empty() {
1624 malformed!("malformed headers: missing path");
1625 }
1626
1627 let maybe_path = uri::PathAndQuery::from_maybe_shared(path.clone().into_inner());
1628 parts.path_and_query = Some(maybe_path.or_else(|_why| {
1629 malformed!("malformed headers: malformed path ({:?}): {}", path, _why,)
1630 })?);
1631 } else if is_connect && has_protocol {
1632 malformed!("malformed headers: missing path in extended CONNECT");
1633 }
1634
1635 b = b.uri(parts);
1636
1637 let mut request = match b.body(()) {
1638 Ok(request) => request,
1639 Err(_e) => {
1640 // TODO: Should there be more specialized handling for different
1641 // kinds of errors
1642 proto_err!(stream: "error building request: {}; stream={:?}", _e, stream_id);
1643 return Err(Error::library_reset(stream_id, Reason::PROTOCOL_ERROR));
1644 }
1645 };
1646
1647 *request.headers_mut() = fields;
1648
1649 Ok(request)
1650 }
1651}
1652
1653// ===== impl Handshaking =====
1654
1655impl<T, B> fmt::Debug for Handshaking<T, B>
1656where
1657 B: Buf,
1658{
1659 #[inline]
1660 fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
1661 match *self {
1662 Handshaking::Flushing(_) => f.write_str("Flushing(_)"),
1663 Handshaking::ReadingPreface(_) => f.write_str("ReadingPreface(_)"),
1664 Handshaking::Done => f.write_str("Done"),
1665 }
1666 }
1667}