monoio_http/h2/server.rs
1//! Server implementation of the HTTP/2 protocol.
2//!
3//! # Getting started
4//!
5//! Running an HTTP/2 server requires the caller to manage accepting the
6//! connections as well as getting the connections to a state that is ready to
7//! begin the HTTP/2 handshake. See [here](../index.html#handshake) for more
8//! details.
9//!
10//! This could be as basic as using Monoio's [`TcpListener`] to accept
11//! connections, but usually it means using either ALPN or HTTP/1.1 protocol
12//! upgrades.
13//!
14//! Once a connection is obtained, it is passed to [`handshake`],
15//! which will begin the [HTTP/2 handshake]. This returns a future that
16//! completes once the handshake process is performed and HTTP/2 streams may
17//! be received.
18//!
19//! [`handshake`] uses default configuration values. There are a number of
20//! settings that can be changed by using [`Builder`] instead.
21//!
22//! # Inbound streams
23//!
24//! The [`Connection`] instance is used to accept inbound HTTP/2 streams. It
25//! does this by implementing [`futures::Stream`]. When a new stream is
26//! received, a call to [`Connection::accept`] will return `(request, response)`.
27//! The `request` handle (of type [`http::Request<RecvStream>`]) contains the
28//! HTTP request head as well as provides a way to receive the inbound data
29//! stream and the trailers. The `response` handle (of type [`SendResponse`])
30//! allows responding to the request, stream the response payload, send
31//! trailers, and send push promises.
32//!
33//! The send ([`SendStream`]) and receive ([`RecvStream`]) halves of the stream
34//! can be operated independently.
35//!
36//! # Managing the connection
37//!
38//! The [`Connection`] instance is used to manage connection state. The caller
39//! is required to call either [`Connection::accept`] or
40//! [`Connection::poll_close`] in order to advance the connection state. Simply
41//! operating on [`SendStream`] or [`RecvStream`] will have no effect unless the
42//! connection state is advanced.
43//!
44//! It is not required to call **both** [`Connection::accept`] and
45//! [`Connection::poll_close`]. If the caller is ready to accept a new stream,
46//! then only [`Connection::accept`] should be called. When the caller **does
47//! not** want to accept a new stream, [`Connection::poll_close`] should be
48//! called.
49//!
50//! The [`Connection`] instance should only be dropped once
51//! [`Connection::poll_close`] returns `Ready`. Once [`Connection::accept`]
52//! returns `Ready(None)`, there will no longer be any more inbound streams. At
53//! this point, only [`Connection::poll_close`] should be called.
54//!
55//! # Shutting down the server
56//!
57//! Graceful shutdown of the server is [not yet
58//! implemented](https://github.com/hyperium/h2/issues/69).
59//!
60//! # Example
61//!
62//! A basic HTTP/2 server example that runs over TCP and assumes [prior
63//! knowledge], i.e. both the client and the server assume that the TCP socket
64//! will use the HTTP/2 protocol without prior negotiation.
65//!
66//! ```no_run
67//! use h2::server;
68//! use http::{Response, StatusCode};
69//! use monoio::net::TcpListener;
70//!
71//! #[monoio::main]
72//! pub async fn main() {
73//! let mut listener = TcpListener::bind("127.0.0.1:5928").await.unwrap();
74//!
75//! // Accept all incoming TCP connections.
76//! loop {
77//! if let Ok((socket, _peer_addr)) = listener.accept().await {
78//! // Spawn a new task to process each connection.
79//! monoio::spawn(async {
80//! // Start the HTTP/2 connection handshake
81//! let mut h2 = server::handshake(socket).await.unwrap();
82//! // Accept all inbound HTTP/2 streams sent over the
83//! // connection.
84//! while let Some(request) = h2.accept().await {
85//! let (request, mut respond) = request.unwrap();
86//! println!("Received request: {:?}", request);
87//!
88//! // Build a response with no body
89//! let response = Response::builder().status(StatusCode::OK).body(()).unwrap();
90//!
91//! // Send the response back to the client
92//! respond.send_response(response, true).unwrap();
93//! }
94//! });
95//! }
96//! }
97//! }
98//! ```
99//!
100//! [prior knowledge]: http://httpwg.org/specs/rfc7540.html#known-http
101//! [`handshake`]: fn.handshake.html
102//! [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
103//! [`Builder`]: struct.Builder.html
104//! [`Connection`]: struct.Connection.html
105//! [`Connection::poll`]: struct.Connection.html#method.poll
106//! [`Connection::poll_close`]: struct.Connection.html#method.poll_close
107//! [`futures::Stream`]: https://docs.rs/futures/0.1/futures/stream/trait.Stream.html
108//! [`http::Request<RecvStream>`]: ../struct.RecvStream.html
109//! [`RecvStream`]: ../struct.RecvStream.html
110//! [`SendStream`]: ../struct.SendStream.html
111//! [`TcpListener`]: https://docs.rs/monoio/latest/monoio/net/tcp/struct.TcpListener.html
112
113use std::{
114 cell::UnsafeCell,
115 fmt,
116 future::Future,
117 io,
118 pin::Pin,
119 task::{Context, Poll},
120 time::Duration,
121};
122
123use bytes::{Buf, Bytes, BytesMut};
124use http::{HeaderMap, Method, Request, Response};
125use monoio::{
126 io::{AsyncReadRent, AsyncReadRentExt, AsyncWriteRent},
127 BufResult,
128};
129use monoio_compat::box_future::MaybeArmedBoxFuture;
130// use monoio::io::{AsyncReadRent, AsyncWriteRent, AsyncReadRentExt};
131use tracing::instrument::{Instrument, Instrumented};
132
133use crate::h2::{
134 codec::{Codec, UserError},
135 frame::{self, Pseudo, PushPromiseHeaderError, Reason, Settings, StreamId},
136 proto::{self, Config, Error, Prioritized},
137 FlowControl, PingPong, RecvStream, SendStream,
138};
139
140/// In progress HTTP/2 connection handshake future.
141///
142/// This type implements `Future`, yielding a `Connection` instance once the
143/// handshake has completed.
144///
145/// The handshake is completed once the connection preface is fully received
146/// from the client **and** the initial settings frame is sent to the client.
147///
148/// The handshake future does not wait for the initial settings frame from the
149/// client.
150///
151/// See [module] level docs for more details.
152///
153/// [module]: index.html
154#[must_use = "futures do nothing unless polled"]
155pub struct Handshake<T, B: Buf = Bytes> {
156 /// The config to pass to Connection::new after handshake succeeds.
157 builder: Builder,
158 /// The current state of the handshake.
159 state: Handshaking<T, B>,
160 /// Span tracking the handshake
161 span: tracing::Span,
162}
163
164/// Accepts inbound HTTP/2 streams on a connection.
165///
166/// A `Connection` is backed by an I/O resource (usually a TCP socket) and
167/// implements the HTTP/2 server logic for that connection. It is responsible
168/// for receiving inbound streams initiated by the client as well as driving the
169/// internal state forward.
170///
171/// `Connection` values are created by calling [`handshake`]. Once a
172/// `Connection` value is obtained, the caller must call [`poll`] or
173/// [`poll_close`] in order to drive the internal connection state forward.
174///
175/// See [module level] documentation for more details
176///
177/// [module level]: index.html
178/// [`handshake`]: struct.Connection.html#method.handshake
179/// [`poll`]: struct.Connection.html#method.poll
180/// [`poll_close`]: struct.Connection.html#method.poll_close
181///
182/// # Examples
183///
184/// ```
185/// # use monoio::io::{AsyncReadRent, AsyncWriteRent};
186/// # use h2::server;
187/// # use h2::server::*;
188/// #
189/// # async fn doc<T: AsyncReadRent + AsyncWriteRent + Unpin>(my_io: T) {
190/// let mut server = server::handshake(my_io).await.unwrap();
191/// while let Some(request) = server.accept().await {
192/// monoio::spawn(async move {
193/// let (request, respond) = request.unwrap();
194/// // Process the request and send the response back to the client
195/// // using `respond`.
196/// });
197/// }
198/// # }
199/// #
200/// # pub fn main() {}
201/// ```
202#[must_use = "streams do nothing unless polled"]
203pub struct Connection<T, B: Buf> {
204 connection: proto::Connection<T, Peer, B>,
205}
206
207/// Builds server connections with custom configuration values.
208///
209/// Methods can be chained in order to set the configuration values.
210///
211/// The server is constructed by calling [`handshake`] and passing the I/O
212/// handle that will back the HTTP/2 server.
213///
214/// New instances of `Builder` are obtained via [`Builder::new`].
215///
216/// See function level documentation for details on the various server
217/// configuration settings.
218///
219/// [`Builder::new`]: struct.Builder.html#method.new
220/// [`handshake`]: struct.Builder.html#method.handshake
221///
222/// # Examples
223///
224/// ```
225/// # use monoio::io::{AsyncReadRent, AsyncWriteRent};
226/// # use h2::server::*;
227/// #
228/// # fn doc<T: AsyncReadRent + AsyncWriteRent + Unpin>(my_io: T)
229/// # -> Handshake<T>
230/// # {
231/// // `server_fut` is a future representing the completion of the HTTP/2
232/// // handshake.
233/// let server_fut = Builder::new()
234/// .initial_window_size(1_000_000)
235/// .max_concurrent_streams(1000)
236/// .handshake(my_io);
237/// # server_fut
238/// # }
239/// #
240/// # pub fn main() {}
241/// ```
242#[derive(Clone, Debug)]
243pub struct Builder {
244 /// Time to keep locally reset streams around before reaping.
245 reset_stream_duration: Duration,
246
247 /// Maximum number of locally reset streams to keep at a time.
248 reset_stream_max: usize,
249
250 /// Initial `Settings` frame to send as part of the handshake.
251 settings: Settings,
252
253 /// Initial target window size for new connections.
254 initial_target_connection_window_size: Option<u32>,
255
256 /// Maximum amount of bytes to "buffer" for writing per stream.
257 max_send_buffer_size: usize,
258}
259
260/// Send a response back to the client
261///
262/// A `SendResponse` instance is provided when receiving a request and is used
263/// to send the associated response back to the client. It is also used to
264/// explicitly reset the stream with a custom reason.
265///
266/// It will also be used to initiate push promises linked with the associated
267/// stream.
268///
269/// If the `SendResponse` instance is dropped without sending a response, then
270/// the HTTP/2 stream will be reset.
271///
272/// See [module] level docs for more details.
273///
274/// [module]: index.html
275#[derive(Debug)]
276pub struct SendResponse<B: Buf> {
277 inner: proto::StreamRef<B>,
278}
279
280/// Send a response to a promised request
281///
282/// A `SendPushedResponse` instance is provided when promising a request and is used
283/// to send the associated response to the client. It is also used to
284/// explicitly reset the stream with a custom reason.
285///
286/// It can not be used to initiate push promises.
287///
288/// If the `SendPushedResponse` instance is dropped without sending a response, then
289/// the HTTP/2 stream will be reset.
290///
291/// See [module] level docs for more details.
292///
293/// [module]: index.html
294pub struct SendPushedResponse<B: Buf> {
295 inner: SendResponse<B>,
296}
297
298// Manual implementation necessary because of rust-lang/rust#26925
299impl<B: Buf + fmt::Debug> fmt::Debug for SendPushedResponse<B> {
300 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
301 write!(f, "SendPushedResponse {{ {:?} }}", self.inner)
302 }
303}
304
305/// Stages of an in-progress handshake.
306enum Handshaking<T, B: Buf> {
307 /// State 1. Connection is flushing pending SETTINGS frame.
308 Flushing(Instrumented<Flush<T, Prioritized<B>>>),
309 /// State 2. Connection is waiting for the client preface.
310 ReadingPreface(Instrumented<ReadPreface<T, Prioritized<B>>>),
311 /// State 3. Handshake is done, polling again would panic.
312 Done,
313}
314
315/// Flush a Sink
316struct Flush<T, B> {
317 codec: Option<Codec<T, B>>,
318}
319
320/// Read the client connection preface
321struct ReadPreface<T, B> {
322 read_fut: MaybeArmedBoxFuture<BufResult<usize, BytesMut>>, // pos: usize,
323 // Put it at the last to make sure futures depending on it drop first.
324 codec: Option<UnsafeCell<Codec<T, B>>>,
325}
326
327#[derive(Debug)]
328pub(crate) struct Peer;
329
330const PREFACE: [u8; 24] = *b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
331
332/// Creates a new configured HTTP/2 server with default configuration
333/// values backed by `io`.
334///
335/// It is expected that `io` already be in an appropriate state to commence
336/// the [HTTP/2 handshake]. See [Handshake] for more details.
337///
338/// Returns a future which resolves to the [`Connection`] instance once the
339/// HTTP/2 handshake has been completed. The returned [`Connection`]
340/// instance will be using default configuration values. Use [`Builder`] to
341/// customize the configuration values used by a [`Connection`] instance.
342///
343/// [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
344/// [Handshake]: ../index.html#handshake
345/// [`Connection`]: struct.Connection.html
346///
347/// # Examples
348///
349/// ```
350/// # use monoio::io::{AsyncReadRent, AsyncWriteRent};
351/// # use h2::server;
352/// # use h2::server::*;
353/// #
354/// # async fn doc<T: AsyncReadRent + AsyncWriteRent + Unpin>(my_io: T)
355/// # {
356/// let connection = server::handshake(my_io).await.unwrap();
357/// // The HTTP/2 handshake has completed, now use `connection` to
358/// // accept inbound HTTP/2 streams.
359/// # }
360/// #
361/// # pub fn main() {}
362/// ```
363pub fn handshake<T>(io: T) -> Handshake<T, Bytes>
364where
365 T: AsyncReadRent + AsyncWriteRent + Unpin + 'static,
366{
367 Builder::new().handshake(io)
368}
369
370// ===== impl Connection =====
371
372impl<T, B> Connection<T, B>
373where
374 T: AsyncReadRent + AsyncWriteRent + Unpin + 'static,
375 B: Buf + 'static,
376{
377 fn handshake2(io: T, builder: Builder) -> Handshake<T, B> {
378 let span = tracing::trace_span!("server_handshake");
379 let entered = span.enter();
380
381 // Create the codec.
382 let mut codec = Codec::new(io);
383
384 if let Some(max) = builder.settings.max_frame_size() {
385 codec.set_max_recv_frame_size(max as usize);
386 }
387
388 if let Some(max) = builder.settings.max_header_list_size() {
389 codec.set_max_recv_header_list_size(max as usize);
390 }
391
392 // Send initial settings frame.
393 codec
394 .buffer(builder.settings.clone().into())
395 .expect("invalid SETTINGS frame");
396
397 // Create the handshake future.
398 let state =
399 Handshaking::Flushing(Flush::new(codec).instrument(tracing::trace_span!("flush")));
400
401 drop(entered);
402
403 Handshake {
404 builder,
405 state,
406 span,
407 }
408 }
409
410 /// Accept the next incoming request on this connection.
411 pub async fn accept(
412 &mut self,
413 ) -> Option<Result<(Request<RecvStream>, SendResponse<B>), crate::h2::Error>> {
414 std::future::poll_fn(move |cx| self.poll_accept(cx)).await
415 }
416
417 #[doc(hidden)]
418 pub fn poll_accept(
419 &mut self,
420 cx: &mut Context<'_>,
421 ) -> Poll<Option<Result<(Request<RecvStream>, SendResponse<B>), crate::h2::Error>>> {
422 // Always try to advance the internal state. Getting Pending also is
423 // needed to allow this function to return Pending.
424 if self.poll_closed(cx)?.is_ready() {
425 // If the socket is closed, don't return anything
426 // TODO: drop any pending streams
427 return Poll::Ready(None);
428 }
429
430 if let Some(inner) = self.connection.next_incoming() {
431 tracing::trace!("received incoming");
432 let (head, _) = inner.take_request().into_parts();
433 let body = RecvStream::new(FlowControl::new(inner.clone_to_opaque()));
434
435 let request = Request::from_parts(head, body);
436 let respond = SendResponse { inner };
437
438 return Poll::Ready(Some(Ok((request, respond))));
439 }
440
441 Poll::Pending
442 }
443
444 /// Sets the target window size for the whole connection.
445 ///
446 /// If `size` is greater than the current value, then a `WINDOW_UPDATE`
447 /// frame will be immediately sent to the remote, increasing the connection
448 /// level window by `size - current_value`.
449 ///
450 /// If `size` is less than the current value, nothing will happen
451 /// immediately. However, as window capacity is released by
452 /// [`FlowControl`] instances, no `WINDOW_UPDATE` frames will be sent
453 /// out until the number of "in flight" bytes drops below `size`.
454 ///
455 /// The default value is 65,535.
456 ///
457 /// See [`FlowControl`] documentation for more details.
458 ///
459 /// [`FlowControl`]: ../struct.FlowControl.html
460 /// [library level]: ../index.html#flow-control
461 pub fn set_target_window_size(&mut self, size: u32) {
462 assert!(size <= proto::MAX_WINDOW_SIZE);
463 self.connection.set_target_window_size(size);
464 }
465
466 /// Set a new `INITIAL_WINDOW_SIZE` setting (in octets) for stream-level
467 /// flow control for received data.
468 ///
469 /// The `SETTINGS` will be sent to the remote, and only applied once the
470 /// remote acknowledges the change.
471 ///
472 /// This can be used to increase or decrease the window size for existing
473 /// streams.
474 ///
475 /// # Errors
476 ///
477 /// Returns an error if a previous call is still pending acknowledgement
478 /// from the remote endpoint.
479 pub fn set_initial_window_size(&mut self, size: u32) -> Result<(), crate::h2::Error> {
480 assert!(size <= proto::MAX_WINDOW_SIZE);
481 self.connection.set_initial_window_size(size)?;
482 Ok(())
483 }
484
485 /// Enables the [extended CONNECT protocol].
486 ///
487 /// [extended CONNECT protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
488 ///
489 /// # Errors
490 ///
491 /// Returns an error if a previous call is still pending acknowledgement
492 /// from the remote endpoint.
493 pub fn enable_connect_protocol(&mut self) -> Result<(), crate::h2::Error> {
494 self.connection.set_enable_connect_protocol()?;
495 Ok(())
496 }
497
498 /// Returns `Ready` when the underlying connection has closed.
499 ///
500 /// If any new inbound streams are received during a call to `poll_closed`,
501 /// they will be queued and returned on the next call to [`poll_accept`].
502 ///
503 /// This function will advance the internal connection state, driving
504 /// progress on all the other handles (e.g. [`RecvStream`] and [`SendStream`]).
505 ///
506 /// See [here](index.html#managing-the-connection) for more details.
507 ///
508 /// [`poll_accept`]: struct.Connection.html#method.poll_accept
509 /// [`RecvStream`]: ../struct.RecvStream.html
510 /// [`SendStream`]: ../struct.SendStream.html
511 pub fn poll_closed(&mut self, cx: &mut Context) -> Poll<Result<(), crate::h2::Error>> {
512 self.connection.poll(cx).map_err(Into::into)
513 }
514
515 #[doc(hidden)]
516 #[deprecated(note = "renamed to poll_closed")]
517 pub fn poll_close(&mut self, cx: &mut Context) -> Poll<Result<(), crate::h2::Error>> {
518 self.poll_closed(cx)
519 }
520
521 /// Sets the connection to a GOAWAY state.
522 ///
523 /// Does not terminate the connection. Must continue being polled to close
524 /// connection.
525 ///
526 /// After flushing the GOAWAY frame, the connection is closed. Any
527 /// outstanding streams do not prevent the connection from closing. This
528 /// should usually be reserved for shutting down when something bad
529 /// external to `h2` has happened, and open streams cannot be properly
530 /// handled.
531 ///
532 /// For graceful shutdowns, see [`graceful_shutdown`](Connection::graceful_shutdown).
533 pub fn abrupt_shutdown(&mut self, reason: Reason) {
534 self.connection.go_away_from_user(reason);
535 }
536
537 /// Starts a [graceful shutdown][1] process.
538 ///
539 /// Must continue being polled to close connection.
540 ///
541 /// It's possible to receive more requests after calling this method, since
542 /// they might have been in-flight from the client already. After about
543 /// 1 RTT, no new requests should be accepted. Once all active streams
544 /// have completed, the connection is closed.
545 ///
546 /// [1]: http://httpwg.org/specs/rfc7540.html#GOAWAY
547 pub fn graceful_shutdown(&mut self) {
548 self.connection.go_away_gracefully();
549 }
550
551 /// Takes a `PingPong` instance from the connection.
552 ///
553 /// # Note
554 ///
555 /// This may only be called once. Calling multiple times will return `None`.
556 pub fn ping_pong(&mut self) -> Option<PingPong> {
557 self.connection.take_user_pings().map(PingPong::new)
558 }
559
560 /// Returns the maximum number of concurrent streams that may be initiated
561 /// by the server on this connection.
562 ///
563 /// This limit is configured by the client peer by sending the
564 /// [`SETTINGS_MAX_CONCURRENT_STREAMS` parameter][1] in a `SETTINGS` frame.
565 /// This method returns the currently acknowledged value received from the
566 /// remote.
567 ///
568 /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2
569 pub fn max_concurrent_send_streams(&self) -> usize {
570 self.connection.max_send_streams()
571 }
572
573 /// Returns the maximum number of concurrent streams that may be initiated
574 /// by the client on this connection.
575 ///
576 /// This returns the value of the [`SETTINGS_MAX_CONCURRENT_STREAMS`
577 /// parameter][1] sent in a `SETTINGS` frame that has been
578 /// acknowledged by the remote peer. The value to be sent is configured by
579 /// the [`Builder::max_concurrent_streams`][2] method before handshaking
580 /// with the remote peer.
581 ///
582 /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2
583 /// [2]: ../struct.Builder.html#method.max_concurrent_streams
584 pub fn max_concurrent_recv_streams(&self) -> usize {
585 self.connection.max_recv_streams()
586 }
587}
588
589#[cfg(feature = "stream")]
590impl<T, B> futures_core::Stream for Connection<T, B>
591where
592 T: AsyncReadRent + AsyncWriteRent + Unpin + 'static,
593 B: Buf + 'static,
594{
595 type Item = Result<(Request<RecvStream>, SendResponse<B>), crate::h2::Error>;
596
597 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
598 self.poll_accept(cx)
599 }
600}
601
602impl<T, B> fmt::Debug for Connection<T, B>
603where
604 T: fmt::Debug,
605 B: fmt::Debug + Buf,
606{
607 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
608 fmt.debug_struct("Connection")
609 .field("connection", &self.connection)
610 .finish()
611 }
612}
613
614// ===== impl Builder =====
615
616impl Builder {
617 /// Returns a new server builder instance initialized with default
618 /// configuration values.
619 ///
620 /// Configuration methods can be chained on the return value.
621 ///
622 /// # Examples
623 ///
624 /// ```
625 /// # use monoio::io::{AsyncReadRent, AsyncWriteRent};
626 /// # use h2::server::*;
627 /// #
628 /// # fn doc<T: AsyncReadRent + AsyncWriteRent + Unpin>(my_io: T)
629 /// # -> Handshake<T>
630 /// # {
631 /// // `server_fut` is a future representing the completion of the HTTP/2
632 /// // handshake.
633 /// let server_fut = Builder::new()
634 /// .initial_window_size(1_000_000)
635 /// .max_concurrent_streams(1000)
636 /// .handshake(my_io);
637 /// # server_fut
638 /// # }
639 /// #
640 /// # pub fn main() {}
641 /// ```
642 pub fn new() -> Builder {
643 Builder {
644 reset_stream_duration: Duration::from_secs(proto::DEFAULT_RESET_STREAM_SECS),
645 reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX,
646 settings: Settings::default(),
647 initial_target_connection_window_size: None,
648 max_send_buffer_size: proto::DEFAULT_MAX_SEND_BUFFER_SIZE,
649 }
650 }
651
652 /// Indicates the initial window size (in octets) for stream-level
653 /// flow control for received data.
654 ///
655 /// The initial window of a stream is used as part of flow control. For more
656 /// details, see [`FlowControl`].
657 ///
658 /// The default value is 65,535.
659 ///
660 /// [`FlowControl`]: ../struct.FlowControl.html
661 ///
662 /// # Examples
663 ///
664 /// ```
665 /// # use monoio::io::{AsyncReadRent, AsyncWriteRent};
666 /// # use h2::server::*;
667 /// #
668 /// # fn doc<T: AsyncReadRent + AsyncWriteRent + Unpin>(my_io: T)
669 /// # -> Handshake<T>
670 /// # {
671 /// // `server_fut` is a future representing the completion of the HTTP/2
672 /// // handshake.
673 /// let server_fut = Builder::new()
674 /// .initial_window_size(1_000_000)
675 /// .handshake(my_io);
676 /// # server_fut
677 /// # }
678 /// #
679 /// # pub fn main() {}
680 /// ```
681 pub fn initial_window_size(&mut self, size: u32) -> &mut Self {
682 self.settings.set_initial_window_size(Some(size));
683 self
684 }
685
686 /// Indicates the initial window size (in octets) for connection-level flow control
687 /// for received data.
688 ///
689 /// The initial window of a connection is used as part of flow control. For more details,
690 /// see [`FlowControl`].
691 ///
692 /// The default value is 65,535.
693 ///
694 /// [`FlowControl`]: ../struct.FlowControl.html
695 ///
696 /// # Examples
697 ///
698 /// ```
699 /// # use monoio::io::{AsyncReadRent, AsyncWriteRent};
700 /// # use h2::server::*;
701 /// #
702 /// # fn doc<T: AsyncReadRent + AsyncWriteRent + Unpin>(my_io: T)
703 /// # -> Handshake<T>
704 /// # {
705 /// // `server_fut` is a future representing the completion of the HTTP/2
706 /// // handshake.
707 /// let server_fut = Builder::new()
708 /// .initial_connection_window_size(1_000_000)
709 /// .handshake(my_io);
710 /// # server_fut
711 /// # }
712 /// #
713 /// # pub fn main() {}
714 /// ```
715 pub fn initial_connection_window_size(&mut self, size: u32) -> &mut Self {
716 self.initial_target_connection_window_size = Some(size);
717 self
718 }
719
720 /// Indicates the size (in octets) of the largest HTTP/2 frame payload that the
721 /// configured server is able to accept.
722 ///
723 /// The sender may send data frames that are **smaller** than this value,
724 /// but any data larger than `max` will be broken up into multiple `DATA`
725 /// frames.
726 ///
727 /// The value **must** be between 16,384 and 16,777,215. The default value is 16,384.
728 ///
729 /// # Examples
730 ///
731 /// ```
732 /// # use monoio::io::{AsyncReadRent, AsyncWriteRent};
733 /// # use h2::server::*;
734 /// #
735 /// # fn doc<T: AsyncReadRent + AsyncWriteRent + Unpin>(my_io: T)
736 /// # -> Handshake<T>
737 /// # {
738 /// // `server_fut` is a future representing the completion of the HTTP/2
739 /// // handshake.
740 /// let server_fut = Builder::new().max_frame_size(1_000_000).handshake(my_io);
741 /// # server_fut
742 /// # }
743 /// #
744 /// # pub fn main() {}
745 /// ```
746 ///
747 /// # Panics
748 ///
749 /// This function panics if `max` is not within the legal range specified
750 /// above.
751 pub fn max_frame_size(&mut self, max: u32) -> &mut Self {
752 self.settings.set_max_frame_size(Some(max));
753 self
754 }
755
756 /// Sets the max size of received header frames.
757 ///
758 /// This advisory setting informs a peer of the maximum size of header list
759 /// that the sender is prepared to accept, in octets. The value is based on
760 /// the uncompressed size of header fields, including the length of the name
761 /// and value in octets plus an overhead of 32 octets for each header field.
762 ///
763 /// This setting is also used to limit the maximum amount of data that is
764 /// buffered to decode HEADERS frames.
765 ///
766 /// # Examples
767 ///
768 /// ```
769 /// # use monoio::io::{AsyncReadRent, AsyncWriteRent};
770 /// # use h2::server::*;
771 /// #
772 /// # fn doc<T: AsyncReadRent + AsyncWriteRent + Unpin>(my_io: T)
773 /// # -> Handshake<T>
774 /// # {
775 /// // `server_fut` is a future representing the completion of the HTTP/2
776 /// // handshake.
777 /// let server_fut = Builder::new()
778 /// .max_header_list_size(16 * 1024)
779 /// .handshake(my_io);
780 /// # server_fut
781 /// # }
782 /// #
783 /// # pub fn main() {}
784 /// ```
785 pub fn max_header_list_size(&mut self, max: u32) -> &mut Self {
786 self.settings.set_max_header_list_size(Some(max));
787 self
788 }
789
790 /// Sets the maximum number of concurrent streams.
791 ///
792 /// The maximum concurrent streams setting only controls the maximum number
793 /// of streams that can be initiated by the remote peer. In other words,
794 /// when this setting is set to 100, this does not limit the number of
795 /// concurrent streams that can be created by the caller.
796 ///
797 /// It is recommended that this value be no smaller than 100, so as to not
798 /// unnecessarily limit parallelism. However, any value is legal, including
799 /// 0. If `max` is set to 0, then the remote will not be permitted to
800 /// initiate streams.
801 ///
802 /// Note that streams in the reserved state, i.e., push promises that have
803 /// been reserved but the stream has not started, do not count against this
804 /// setting.
805 ///
806 /// Also note that if the remote *does* exceed the value set here, it is not
807 /// a protocol level error. Instead, the `h2` library will immediately reset
808 /// the stream.
809 ///
810 /// See [Section 5.1.2] in the HTTP/2 spec for more details.
811 ///
812 /// [Section 5.1.2]: https://http2.github.io/http2-spec/#rfc.section.5.1.2
813 ///
814 /// # Examples
815 ///
816 /// ```
817 /// # use monoio::io::{AsyncReadRent, AsyncWriteRent};
818 /// # use h2::server::*;
819 /// #
820 /// # fn doc<T: AsyncReadRent + AsyncWriteRent + Unpin>(my_io: T)
821 /// # -> Handshake<T>
822 /// # {
823 /// // `server_fut` is a future representing the completion of the HTTP/2
824 /// // handshake.
825 /// let server_fut = Builder::new().max_concurrent_streams(1000).handshake(my_io);
826 /// # server_fut
827 /// # }
828 /// #
829 /// # pub fn main() {}
830 /// ```
831 pub fn max_concurrent_streams(&mut self, max: u32) -> &mut Self {
832 self.settings.set_max_concurrent_streams(Some(max));
833 self
834 }
835
836 /// Sets the maximum number of concurrent locally reset streams.
837 ///
838 /// When a stream is explicitly reset by either calling
839 /// [`SendResponse::send_reset`] or by dropping a [`SendResponse`] instance
840 /// before completing the stream, the HTTP/2 specification requires that
841 /// any further frames received for that stream must be ignored for "some
842 /// time".
843 ///
844 /// In order to satisfy the specification, internal state must be maintained
845 /// to implement the behavior. This state grows linearly with the number of
846 /// streams that are locally reset.
847 ///
848 /// The `max_concurrent_reset_streams` setting configures sets an upper
849 /// bound on the amount of state that is maintained. When this max value is
850 /// reached, the oldest reset stream is purged from memory.
851 ///
852 /// Once the stream has been fully purged from memory, any additional frames
853 /// received for that stream will result in a connection level protocol
854 /// error, forcing the connection to terminate.
855 ///
856 /// The default value is 10.
857 ///
858 /// # Examples
859 ///
860 /// ```
861 /// # use monoio::io::{AsyncReadRent, AsyncWriteRent};
862 /// # use h2::server::*;
863 /// #
864 /// # fn doc<T: AsyncReadRent + AsyncWriteRent + Unpin>(my_io: T)
865 /// # -> Handshake<T>
866 /// # {
867 /// // `server_fut` is a future representing the completion of the HTTP/2
868 /// // handshake.
869 /// let server_fut = Builder::new()
870 /// .max_concurrent_reset_streams(1000)
871 /// .handshake(my_io);
872 /// # server_fut
873 /// # }
874 /// #
875 /// # pub fn main() {}
876 /// ```
877 pub fn max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self {
878 self.reset_stream_max = max;
879 self
880 }
881
882 /// Sets the maximum send buffer size per stream.
883 ///
884 /// Once a stream has buffered up to (or over) the maximum, the stream's
885 /// flow control will not "poll" additional capacity. Once bytes for the
886 /// stream have been written to the connection, the send buffer capacity
887 /// will be freed up again.
888 ///
889 /// The default is currently ~400MB, but may change.
890 ///
891 /// # Panics
892 ///
893 /// This function panics if `max` is larger than `u32::MAX`.
894 pub fn max_send_buffer_size(&mut self, max: usize) -> &mut Self {
895 assert!(max <= u32::MAX as usize);
896 self.max_send_buffer_size = max;
897 self
898 }
899
900 /// Sets the maximum number of concurrent locally reset streams.
901 ///
902 /// When a stream is explicitly reset by either calling
903 /// [`SendResponse::send_reset`] or by dropping a [`SendResponse`] instance
904 /// before completing the stream, the HTTP/2 specification requires that
905 /// any further frames received for that stream must be ignored for "some
906 /// time".
907 ///
908 /// In order to satisfy the specification, internal state must be maintained
909 /// to implement the behavior. This state grows linearly with the number of
910 /// streams that are locally reset.
911 ///
912 /// The `reset_stream_duration` setting configures the max amount of time
913 /// this state will be maintained in memory. Once the duration elapses, the
914 /// stream state is purged from memory.
915 ///
916 /// Once the stream has been fully purged from memory, any additional frames
917 /// received for that stream will result in a connection level protocol
918 /// error, forcing the connection to terminate.
919 ///
920 /// The default value is 30 seconds.
921 ///
922 /// # Examples
923 ///
924 /// ```
925 /// # use monoio::io::{AsyncReadRent, AsyncWriteRent};
926 /// # use h2::server::*;
927 /// # use std::time::Duration;
928 /// #
929 /// # fn doc<T: AsyncReadRent + AsyncWriteRent + Unpin>(my_io: T)
930 /// # -> Handshake<T>
931 /// # {
932 /// // `server_fut` is a future representing the completion of the HTTP/2
933 /// // handshake.
934 /// let server_fut = Builder::new()
935 /// .reset_stream_duration(Duration::from_secs(10))
936 /// .handshake(my_io);
937 /// # server_fut
938 /// # }
939 /// #
940 /// # pub fn main() {}
941 /// ```
942 pub fn reset_stream_duration(&mut self, dur: Duration) -> &mut Self {
943 self.reset_stream_duration = dur;
944 self
945 }
946
947 /// Enables the [extended CONNECT protocol].
948 ///
949 /// [extended CONNECT protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
950 pub fn enable_connect_protocol(&mut self) -> &mut Self {
951 self.settings.set_enable_connect_protocol(Some(1));
952 self
953 }
954
955 /// Creates a new configured HTTP/2 server backed by `io`.
956 ///
957 /// It is expected that `io` already be in an appropriate state to commence
958 /// the [HTTP/2 handshake]. See [Handshake] for more details.
959 ///
960 /// Returns a future which resolves to the [`Connection`] instance once the
961 /// HTTP/2 handshake has been completed.
962 ///
963 /// This function also allows the caller to configure the send payload data
964 /// type. See [Outbound data type] for more details.
965 ///
966 /// [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
967 /// [Handshake]: ../index.html#handshake
968 /// [`Connection`]: struct.Connection.html
969 /// [Outbound data type]: ../index.html#outbound-data-type.
970 ///
971 /// # Examples
972 ///
973 /// Basic usage:
974 ///
975 /// ```
976 /// # use monoio::io::{AsyncReadRent, AsyncWriteRent};
977 /// # use h2::server::*;
978 /// #
979 /// # fn doc<T: AsyncReadRent + AsyncWriteRent + Unpin>(my_io: T)
980 /// # -> Handshake<T>
981 /// # {
982 /// // `server_fut` is a future representing the completion of the HTTP/2
983 /// // handshake.
984 /// let server_fut = Builder::new().handshake(my_io);
985 /// # server_fut
986 /// # }
987 /// #
988 /// # pub fn main() {}
989 /// ```
990 ///
991 /// Configures the send-payload data type. In this case, the outbound data
992 /// type will be `&'static [u8]`.
993 ///
994 /// ```
995 /// # use monoio::io::{AsyncReadRent, AsyncWriteRent};
996 /// # use h2::server::*;
997 /// #
998 /// # fn doc<T: AsyncReadRent + AsyncWriteRent + Unpin>(my_io: T)
999 /// # -> Handshake<T, &'static [u8]>
1000 /// # {
1001 /// // `server_fut` is a future representing the completion of the HTTP/2
1002 /// // handshake.
1003 /// let server_fut: Handshake<_, &'static [u8]> = Builder::new().handshake(my_io);
1004 /// # server_fut
1005 /// # }
1006 /// #
1007 /// # pub fn main() {}
1008 /// ```
1009 pub fn handshake<T, B>(&self, io: T) -> Handshake<T, B>
1010 where
1011 T: AsyncReadRent + AsyncWriteRent + Unpin + 'static,
1012 B: Buf + 'static,
1013 {
1014 Connection::handshake2(io, self.clone())
1015 }
1016}
1017
1018impl Default for Builder {
1019 fn default() -> Builder {
1020 Builder::new()
1021 }
1022}
1023
1024// ===== impl SendResponse =====
1025
1026impl<B: Buf> SendResponse<B> {
1027 /// Send a response to a client request.
1028 ///
1029 /// On success, a [`SendStream`] instance is returned. This instance can be
1030 /// used to stream the response body and send trailers.
1031 ///
1032 /// If a body or trailers will be sent on the returned [`SendStream`]
1033 /// instance, then `end_of_stream` must be set to `false` when calling this
1034 /// function.
1035 ///
1036 /// The [`SendResponse`] instance is already associated with a received
1037 /// request. This function may only be called once per instance and only if
1038 /// [`send_reset`] has not been previously called.
1039 ///
1040 /// [`SendResponse`]: #
1041 /// [`SendStream`]: ../struct.SendStream.html
1042 /// [`send_reset`]: #method.send_reset
1043 pub fn send_response(
1044 &mut self,
1045 response: Response<()>,
1046 end_of_stream: bool,
1047 ) -> Result<SendStream<B>, crate::h2::Error> {
1048 self.inner
1049 .send_response(response, end_of_stream)
1050 .map(|_| SendStream::new(self.inner.clone()))
1051 .map_err(Into::into)
1052 }
1053
1054 /// Push a request and response to the client
1055 ///
1056 /// On success, a [`SendResponse`] instance is returned.
1057 ///
1058 /// [`SendResponse`]: #
1059 pub fn push_request(
1060 &mut self,
1061 request: Request<()>,
1062 ) -> Result<SendPushedResponse<B>, crate::h2::Error> {
1063 self.inner
1064 .send_push_promise(request)
1065 .map(|inner| SendPushedResponse {
1066 inner: SendResponse { inner },
1067 })
1068 .map_err(Into::into)
1069 }
1070
1071 /// Send a stream reset to the peer.
1072 ///
1073 /// This essentially cancels the stream, including any inbound or outbound
1074 /// data streams.
1075 ///
1076 /// If this function is called before [`send_response`], a call to
1077 /// [`send_response`] will result in an error.
1078 ///
1079 /// If this function is called while a [`SendStream`] instance is active,
1080 /// any further use of the instance will result in an error.
1081 ///
1082 /// This function should only be called once.
1083 ///
1084 /// [`send_response`]: #method.send_response
1085 /// [`SendStream`]: ../struct.SendStream.html
1086 pub fn send_reset(&mut self, reason: Reason) {
1087 self.inner.send_reset(reason)
1088 }
1089
1090 /// Polls to be notified when the client resets this stream.
1091 ///
1092 /// If stream is still open, this returns `Poll::Pending`, and
1093 /// registers the task to be notified if a `RST_STREAM` is received.
1094 ///
1095 /// If a `RST_STREAM` frame is received for this stream, calling this
1096 /// method will yield the `Reason` for the reset.
1097 ///
1098 /// # Error
1099 ///
1100 /// Calling this method after having called `send_response` will return
1101 /// a user error.
1102 pub fn poll_reset(&mut self, cx: &mut Context) -> Poll<Result<Reason, crate::h2::Error>> {
1103 self.inner.poll_reset(cx, proto::PollReset::AwaitingHeaders)
1104 }
1105
1106 /// Returns the stream ID of the response stream.
1107 ///
1108 /// # Panics
1109 ///
1110 /// If the lock on the stream store has been poisoned.
1111 pub fn stream_id(&self) -> crate::h2::StreamId {
1112 crate::h2::StreamId::from_internal(self.inner.stream_id())
1113 }
1114}
1115
1116// ===== impl SendPushedResponse =====
1117
1118impl<B: Buf> SendPushedResponse<B> {
1119 /// Send a response to a promised request.
1120 ///
1121 /// On success, a [`SendStream`] instance is returned. This instance can be
1122 /// used to stream the response body and send trailers.
1123 ///
1124 /// If a body or trailers will be sent on the returned [`SendStream`]
1125 /// instance, then `end_of_stream` must be set to `false` when calling this
1126 /// function.
1127 ///
1128 /// The [`SendPushedResponse`] instance is associated with a promised
1129 /// request. This function may only be called once per instance and only if
1130 /// [`send_reset`] has not been previously called.
1131 ///
1132 /// [`SendPushedResponse`]: #
1133 /// [`SendStream`]: ../struct.SendStream.html
1134 /// [`send_reset`]: #method.send_reset
1135 pub fn send_response(
1136 &mut self,
1137 response: Response<()>,
1138 end_of_stream: bool,
1139 ) -> Result<SendStream<B>, crate::h2::Error> {
1140 self.inner.send_response(response, end_of_stream)
1141 }
1142
1143 /// Send a stream reset to the peer.
1144 ///
1145 /// This essentially cancels the stream, including any inbound or outbound
1146 /// data streams.
1147 ///
1148 /// If this function is called before [`send_response`], a call to
1149 /// [`send_response`] will result in an error.
1150 ///
1151 /// If this function is called while a [`SendStream`] instance is active,
1152 /// any further use of the instance will result in an error.
1153 ///
1154 /// This function should only be called once.
1155 ///
1156 /// [`send_response`]: #method.send_response
1157 /// [`SendStream`]: ../struct.SendStream.html
1158 pub fn send_reset(&mut self, reason: Reason) {
1159 self.inner.send_reset(reason)
1160 }
1161
1162 /// Polls to be notified when the client resets this stream.
1163 ///
1164 /// If stream is still open, this returns `Poll::Pending`, and
1165 /// registers the task to be notified if a `RST_STREAM` is received.
1166 ///
1167 /// If a `RST_STREAM` frame is received for this stream, calling this
1168 /// method will yield the `Reason` for the reset.
1169 ///
1170 /// # Error
1171 ///
1172 /// Calling this method after having called `send_response` will return
1173 /// a user error.
1174 pub fn poll_reset(&mut self, cx: &mut Context) -> Poll<Result<Reason, crate::h2::Error>> {
1175 self.inner.poll_reset(cx)
1176 }
1177
1178 /// Returns the stream ID of the response stream.
1179 ///
1180 /// # Panics
1181 ///
1182 /// If the lock on the stream store has been poisoned.
1183 pub fn stream_id(&self) -> crate::h2::StreamId {
1184 self.inner.stream_id()
1185 }
1186}
1187
1188// ===== impl Flush =====
1189
1190impl<T, B: Buf> Flush<T, B> {
1191 fn new(codec: Codec<T, B>) -> Self {
1192 Flush { codec: Some(codec) }
1193 }
1194}
1195
1196impl<T, B> Future for Flush<T, B>
1197where
1198 T: AsyncWriteRent + Unpin + 'static,
1199 B: Buf,
1200{
1201 type Output = Result<Codec<T, B>, crate::h2::Error>;
1202
1203 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1204 // Flush the codec
1205 ready!(self.codec.as_mut().unwrap().flush(cx)).map_err(crate::h2::Error::from_io)?;
1206
1207 // Return the codec
1208 Poll::Ready(Ok(self.codec.take().unwrap()))
1209 }
1210}
1211
1212impl<T, B: Buf> ReadPreface<T, B> {
1213 fn new(codec: Codec<T, B>) -> Self {
1214 ReadPreface {
1215 codec: Some(UnsafeCell::new(codec)),
1216 read_fut: Default::default(),
1217 }
1218 }
1219}
1220
1221impl<T, B> Future for ReadPreface<T, B>
1222where
1223 T: AsyncReadRent + Unpin + 'static,
1224 B: Buf + 'static,
1225{
1226 type Output = Result<Codec<T, B>, crate::h2::Error>;
1227
1228 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1229 let buf: [u8; PREFACE.len()] = [0; PREFACE.len()];
1230
1231 let owned_buf = BytesMut::from(&buf[..]);
1232
1233 if !self.read_fut.armed() {
1234 let io = unsafe { &mut *(self.codec.as_mut().unwrap().get()) };
1235 let io = io.get_mut();
1236 self.read_fut.arm_future(io.read_exact(owned_buf));
1237 }
1238
1239 {
1240 let (result, ret_buf) = ready!(self.read_fut.poll(cx));
1241 let n = result.map_err(crate::h2::Error::from_io)?;
1242
1243 if n != PREFACE.len() {
1244 println!("!!!!!!!!!!! ReadPreface failed");
1245 return Poll::Ready(Err(crate::h2::Error::from_io(io::Error::new(
1246 io::ErrorKind::UnexpectedEof,
1247 "connection closed before reading preface",
1248 ))));
1249 }
1250
1251 if PREFACE[..] != ret_buf[..] {
1252 println!("!!!!!!!!!!! ReadPreface failed");
1253 proto_err!(conn: "read_preface: invalid preface");
1254 // TODO: Should this just write the GO_AWAY frame directly?
1255 return Poll::Ready(Err(Error::library_go_away(Reason::PROTOCOL_ERROR).into()));
1256 }
1257 }
1258
1259 let codec = self.codec.take().unwrap();
1260
1261 Poll::Ready(Ok(codec.into_inner()))
1262 }
1263}
1264
1265// ===== impl Handshake =====
1266
1267impl<T, B: Buf> Future for Handshake<T, B>
1268where
1269 T: AsyncReadRent + AsyncWriteRent + Unpin + 'static,
1270 B: Buf + 'static,
1271{
1272 type Output = Result<Connection<T, B>, crate::h2::Error>;
1273
1274 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1275 let span = self.span.clone(); // XXX(eliza): T_T
1276 let _e = span.enter();
1277 tracing::trace!(state = ?self.state);
1278 tracing::trace!("Poll Handshake");
1279
1280 loop {
1281 match &mut self.state {
1282 Handshaking::Flushing(flush) => {
1283 // We're currently flushing a pending SETTINGS frame. Poll the
1284 // flush future, and, if it's completed, advance our state to wait
1285 // for the client preface.
1286
1287 let codec = match Pin::new(flush).poll(cx)? {
1288 Poll::Pending => {
1289 tracing::trace!(flush.poll = %"Pending");
1290 return Poll::Pending;
1291 }
1292 Poll::Ready(flushed) => {
1293 tracing::trace!(flush.poll = %"Ready");
1294 flushed
1295 }
1296 };
1297 self.state = Handshaking::ReadingPreface(
1298 ReadPreface::new(codec).instrument(tracing::trace_span!("read_preface")),
1299 );
1300 }
1301 Handshaking::ReadingPreface(read) => {
1302 let codec = ready!(Pin::new(read).poll(cx)?);
1303
1304 self.state = Handshaking::Done;
1305
1306 let connection = proto::Connection::new(
1307 codec,
1308 Config {
1309 next_stream_id: 2.into(),
1310 // Server does not need to locally initiate any streams
1311 initial_max_send_streams: 0,
1312 max_send_buffer_size: self.builder.max_send_buffer_size,
1313 reset_stream_duration: self.builder.reset_stream_duration,
1314 reset_stream_max: self.builder.reset_stream_max,
1315 settings: self.builder.settings.clone(),
1316 },
1317 );
1318
1319 tracing::trace!("connection established!");
1320 let mut c = Connection { connection };
1321 if let Some(sz) = self.builder.initial_target_connection_window_size {
1322 c.set_target_window_size(sz);
1323 }
1324
1325 return Poll::Ready(Ok(c));
1326 }
1327 Handshaking::Done => {
1328 panic!("Handshaking::poll() called again after handshaking was complete")
1329 }
1330 }
1331 }
1332 }
1333}
1334
1335impl<T, B> fmt::Debug for Handshake<T, B>
1336where
1337 T: AsyncReadRent + AsyncWriteRent + fmt::Debug,
1338 B: fmt::Debug + Buf,
1339{
1340 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
1341 write!(fmt, "server::Handshake")
1342 }
1343}
1344
1345impl Peer {
1346 pub fn convert_send_message(
1347 id: StreamId,
1348 response: Response<()>,
1349 end_of_stream: bool,
1350 ) -> frame::Headers {
1351 use http::response::Parts;
1352
1353 // Extract the components of the HTTP request
1354 let (
1355 Parts {
1356 status, headers, ..
1357 },
1358 _,
1359 ) = response.into_parts();
1360
1361 // Build the set pseudo header set. All requests will include `method`
1362 // and `path`.
1363 let pseudo = Pseudo::response(status);
1364
1365 // Create the HEADERS frame
1366 let mut frame = frame::Headers::new(id, pseudo, headers);
1367
1368 if end_of_stream {
1369 frame.set_end_stream()
1370 }
1371
1372 frame
1373 }
1374
1375 pub fn convert_push_message(
1376 stream_id: StreamId,
1377 promised_id: StreamId,
1378 request: Request<()>,
1379 ) -> Result<frame::PushPromise, UserError> {
1380 use http::request::Parts;
1381
1382 if let Err(e) = frame::PushPromise::validate_request(&request) {
1383 use PushPromiseHeaderError::*;
1384 match e {
1385 NotSafeAndCacheable => tracing::debug!(
1386 ?promised_id,
1387 "convert_push_message: method {} is not safe and cacheable",
1388 request.method(),
1389 ),
1390 InvalidContentLength(e) => tracing::debug!(
1391 ?promised_id,
1392 "convert_push_message; promised request has invalid content-length {:?}",
1393 e,
1394 ),
1395 }
1396 return Err(UserError::MalformedHeaders);
1397 }
1398
1399 // Extract the components of the HTTP request
1400 let (
1401 Parts {
1402 method,
1403 uri,
1404 headers,
1405 ..
1406 },
1407 _,
1408 ) = request.into_parts();
1409
1410 let pseudo = Pseudo::request(method, uri, None);
1411
1412 Ok(frame::PushPromise::new(
1413 stream_id,
1414 promised_id,
1415 pseudo,
1416 headers,
1417 ))
1418 }
1419}
1420
1421impl proto::Peer for Peer {
1422 type Poll = Request<()>;
1423
1424 const NAME: &'static str = "Server";
1425
1426 fn is_server() -> bool {
1427 true
1428 }
1429
1430 fn r#dyn() -> proto::DynPeer {
1431 proto::DynPeer::Server
1432 }
1433
1434 fn convert_poll_message(
1435 pseudo: Pseudo,
1436 fields: HeaderMap,
1437 stream_id: StreamId,
1438 ) -> Result<Self::Poll, Error> {
1439 use http::{uri, Version};
1440
1441 let mut b = Request::builder();
1442
1443 macro_rules! malformed {
1444 ($($arg:tt)*) => {{
1445 tracing::debug!($($arg)*);
1446 return Err(Error::library_reset(stream_id, Reason::PROTOCOL_ERROR));
1447 }}
1448 }
1449
1450 b = b.version(Version::HTTP_2);
1451
1452 let is_connect;
1453 if let Some(method) = pseudo.method {
1454 is_connect = method == Method::CONNECT;
1455 b = b.method(method);
1456 } else {
1457 malformed!("malformed headers: missing method");
1458 }
1459
1460 let has_protocol = pseudo.protocol.is_some();
1461 if has_protocol {
1462 if is_connect {
1463 // Assert that we have the right type.
1464 b = b.extension::<crate::h2::ext::Protocol>(pseudo.protocol.unwrap());
1465 } else {
1466 malformed!("malformed headers: :protocol on non-CONNECT request");
1467 }
1468 }
1469
1470 if pseudo.status.is_some() {
1471 malformed!("malformed headers: :status field on request");
1472 }
1473
1474 // Convert the URI
1475 let mut parts = uri::Parts::default();
1476
1477 // A request translated from HTTP/1 must not include the :authority
1478 // header
1479 if let Some(authority) = pseudo.authority {
1480 let maybe_authority = uri::Authority::from_maybe_shared(authority.clone().into_inner());
1481 parts.authority = Some(maybe_authority.or_else(|why| {
1482 malformed!(
1483 "malformed headers: malformed authority ({:?}): {}",
1484 authority,
1485 why,
1486 )
1487 })?);
1488 }
1489
1490 // A :scheme is required, except CONNECT.
1491 if let Some(scheme) = pseudo.scheme {
1492 if is_connect && !has_protocol {
1493 malformed!("malformed headers: :scheme in CONNECT");
1494 }
1495 let maybe_scheme = scheme.parse();
1496 let scheme = maybe_scheme.or_else(|why| {
1497 malformed!(
1498 "malformed headers: malformed scheme ({:?}): {}",
1499 scheme,
1500 why,
1501 )
1502 })?;
1503
1504 // It's not possible to build an `Uri` from a scheme and path. So,
1505 // after validating is was a valid scheme, we just have to drop it
1506 // if there isn't an :authority.
1507 if parts.authority.is_some() {
1508 parts.scheme = Some(scheme);
1509 }
1510 } else if !is_connect || has_protocol {
1511 malformed!("malformed headers: missing scheme");
1512 }
1513
1514 if let Some(path) = pseudo.path {
1515 if is_connect && !has_protocol {
1516 malformed!("malformed headers: :path in CONNECT");
1517 }
1518
1519 // This cannot be empty
1520 if path.is_empty() {
1521 malformed!("malformed headers: missing path");
1522 }
1523
1524 let maybe_path = uri::PathAndQuery::from_maybe_shared(path.clone().into_inner());
1525 parts.path_and_query = Some(maybe_path.or_else(|why| {
1526 malformed!("malformed headers: malformed path ({:?}): {}", path, why,)
1527 })?);
1528 } else if is_connect && has_protocol {
1529 malformed!("malformed headers: missing path in extended CONNECT");
1530 }
1531
1532 b = b.uri(parts);
1533
1534 let mut request = match b.body(()) {
1535 Ok(request) => request,
1536 Err(e) => {
1537 // TODO: Should there be more specialized handling for different
1538 // kinds of errors
1539 proto_err!(stream: "error building request: {}; stream={:?}", e, stream_id);
1540 return Err(Error::library_reset(stream_id, Reason::PROTOCOL_ERROR));
1541 }
1542 };
1543
1544 *request.headers_mut() = fields;
1545
1546 Ok(request)
1547 }
1548}
1549
1550// ===== impl Handshaking =====
1551
1552impl<T, B> fmt::Debug for Handshaking<T, B>
1553where
1554 B: Buf,
1555{
1556 #[inline]
1557 fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
1558 match *self {
1559 Handshaking::Flushing(_) => f.write_str("Flushing(_)"),
1560 Handshaking::ReadingPreface(_) => f.write_str("ReadingPreface(_)"),
1561 Handshaking::Done => f.write_str("Done"),
1562 }
1563 }
1564}