hyper_util_wasm/server/conn/
auto.rs

1//! Http1 or Http2 connection.
2
3use futures_util::ready;
4use hyper::service::HttpService;
5use std::future::Future;
6use std::marker::PhantomPinned;
7use std::mem::MaybeUninit;
8use std::pin::Pin;
9use std::task::{Context, Poll};
10use std::{error::Error as StdError, io, time::Duration};
11
12use bytes::Bytes;
13use http::{Request, Response};
14use http_body::Body;
15use hyper::{
16    body::Incoming,
17    rt::{Read, ReadBuf, Timer, Write},
18    service::Service,
19};
20
21#[cfg(feature = "http1")]
22use hyper::server::conn::http1;
23
24#[cfg(feature = "http2")]
25use hyper::{rt::bounds::Http2ServerConnExec, server::conn::http2};
26
27#[cfg(any(not(feature = "http2"), not(feature = "http1")))]
28use std::marker::PhantomData;
29
30use pin_project_lite::pin_project;
31
32use crate::common::rewind::Rewind;
33
/// Boxed, thread-safe error type produced by the connection futures in this module.
type Error = Box<dyn std::error::Error + Send + Sync>;

/// Shorthand for a `Result` whose error type is this module's boxed [`Error`].
type Result<T> = std::result::Result<T, Error>;

/// The 24-byte HTTP/2 client connection preface.
///
/// The first bytes of a connection are compared against this to tell HTTP/2
/// apart from HTTP/1.
const H2_PREFACE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
39
/// Exactly equivalent to [`Http2ServerConnExec`].
#[cfg(feature = "http2")]
pub trait HttpServerConnExec<A, B: Body>: Http2ServerConnExec<A, B> {}

// Blanket impl: every `Http2ServerConnExec` is automatically an `HttpServerConnExec`.
#[cfg(feature = "http2")]
impl<A, B: Body, T: Http2ServerConnExec<A, B>> HttpServerConnExec<A, B> for T {}

/// Marker trait standing in for `Http2ServerConnExec` when the `http2` feature is disabled.
///
/// It has no requirements and is implemented for every type, so executor bounds written
/// against it are always satisfied in builds without `http2`.
#[cfg(not(feature = "http2"))]
pub trait HttpServerConnExec<A, B: Body> {}

// Blanket impl: without the `http2` feature any type qualifies.
#[cfg(not(feature = "http2"))]
impl<A, B: Body, T> HttpServerConnExec<A, B> for T {}
53
/// Http1 or Http2 connection builder.
#[derive(Clone, Debug)]
pub struct Builder<E> {
    // HTTP/1 settings, forwarded to hyper's `http1::Builder`.
    #[cfg(feature = "http1")]
    http1: http1::Builder,
    // HTTP/2 settings; this builder is constructed from the executor `E`.
    #[cfg(feature = "http2")]
    http2: http2::Builder<E>,
    // `Some` pins `serve_connection` to one protocol; `None` means detect it from the wire.
    #[cfg(any(feature = "http1", feature = "http2"))]
    version: Option<Version>,
    // Without `http2` there is no `http2::Builder` to receive the executor, so it is kept here.
    #[cfg(not(feature = "http2"))]
    _executor: E,
}
66
impl<E> Builder<E> {
    /// Create a new auto connection builder.
    ///
    /// `executor` parameter should be a type that implements
    /// [`Executor`](hyper::rt::Executor) trait.
    ///
    /// No protocol version is pinned initially, so connections are served with
    /// whichever protocol is detected from their first bytes.
    ///
    /// # Example
    ///
    /// ```
    /// use hyper_util::{
    ///     rt::TokioExecutor,
    ///     server::conn::auto,
    /// };
    ///
    /// auto::Builder::new(TokioExecutor::new());
    /// ```
    pub fn new(executor: E) -> Self {
        Self {
            #[cfg(feature = "http1")]
            http1: http1::Builder::new(),
            #[cfg(feature = "http2")]
            http2: http2::Builder::new(executor),
            #[cfg(any(feature = "http1", feature = "http2"))]
            version: None,
            #[cfg(not(feature = "http2"))]
            _executor: executor,
        }
    }

    /// Http1 configuration.
    ///
    /// Returns a view that mutably borrows this builder and exposes the HTTP/1 settings.
    #[cfg(feature = "http1")]
    pub fn http1(&mut self) -> Http1Builder<'_, E> {
        Http1Builder { inner: self }
    }

    /// Http2 configuration.
    ///
    /// Returns a view that mutably borrows this builder and exposes the HTTP/2 settings.
    #[cfg(feature = "http2")]
    pub fn http2(&mut self) -> Http2Builder<'_, E> {
        Http2Builder { inner: self }
    }

    /// Only accepts HTTP/2
    ///
    /// Does not do anything if used with
    /// [`serve_connection_with_upgrades`](Self::serve_connection_with_upgrades)
    ///
    /// # Panics
    ///
    /// Panics if a protocol version has already been pinned on this builder.
    #[cfg(feature = "http2")]
    pub fn http2_only(mut self) -> Self {
        assert!(self.version.is_none());
        self.version = Some(Version::H2);
        self
    }

    /// Only accepts HTTP/1
    ///
    /// Does not do anything if used with
    /// [`serve_connection_with_upgrades`](Self::serve_connection_with_upgrades)
    ///
    /// # Panics
    ///
    /// Panics if a protocol version has already been pinned on this builder.
    #[cfg(feature = "http1")]
    pub fn http1_only(mut self) -> Self {
        assert!(self.version.is_none());
        self.version = Some(Version::H1);
        self
    }

    /// Bind a connection together with a [`Service`].
    ///
    /// When a version was pinned with `http1_only` / `http2_only` (and that protocol is
    /// compiled in), the matching hyper connection is created right away. Otherwise the
    /// returned future first reads the start of `io` to detect the protocol.
    pub fn serve_connection<I, S, B>(&self, io: I, service: S) -> Connection<'_, I, S, E>
    where
        S: Service<Request<Incoming>, Response = Response<B>>,
        S::Future: 'static,
        S::Error: Into<Box<dyn StdError + Send + Sync>>,
        B: Body + 'static,
        B::Error: Into<Box<dyn StdError + Send + Sync>>,
        I: Read + Write + Unpin + 'static,
        E: HttpServerConnExec<S::Future, B>,
    {
        let state = match self.version {
            #[cfg(feature = "http1")]
            Some(Version::H1) => {
                // Nothing was read ahead, so the rewind buffer starts out empty.
                let io = Rewind::new_buffered(io, Bytes::new());
                let conn = self.http1.serve_connection(io, service);
                ConnState::H1 { conn }
            }
            #[cfg(feature = "http2")]
            Some(Version::H2) => {
                // Nothing was read ahead, so the rewind buffer starts out empty.
                let io = Rewind::new_buffered(io, Bytes::new());
                let conn = self.http2.serve_connection(io, service);
                ConnState::H2 { conn }
            }
            // No pinned version (or the pinned protocol is not compiled in): detect it.
            #[cfg(any(feature = "http1", feature = "http2"))]
            _ => ConnState::ReadVersion {
                read_version: read_version(io),
                builder: Cow::Borrowed(self),
                service: Some(service),
            },
        };

        Connection { state }
    }

    /// Bind a connection together with a [`Service`], with the ability to
    /// handle HTTP upgrades. This requires that the IO object implements
    /// `Send`.
    ///
    /// The protocol is always detected from the connection; a version pinned with
    /// `http1_only` / `http2_only` is not consulted here.
    pub fn serve_connection_with_upgrades<I, S, B>(
        &self,
        io: I,
        service: S,
    ) -> UpgradeableConnection<'_, I, S, E>
    where
        S: Service<Request<Incoming>, Response = Response<B>>,
        S::Future: 'static,
        S::Error: Into<Box<dyn StdError + Send + Sync>>,
        B: Body + 'static,
        B::Error: Into<Box<dyn StdError + Send + Sync>>,
        I: Read + Write + Unpin + Send + 'static,
        E: HttpServerConnExec<S::Future, B>,
    {
        UpgradeableConnection {
            state: UpgradeableConnState::ReadVersion {
                read_version: read_version(io),
                builder: Cow::Borrowed(self),
                service: Some(service),
            },
        }
    }
}
189
190#[derive(Copy, Clone, Debug)]
191enum Version {
192    H1,
193    H2,
194}
195
196impl Version {
197    #[must_use]
198    #[cfg(any(not(feature = "http2"), not(feature = "http1")))]
199    pub fn unsupported(self) -> Error {
200        match self {
201            Version::H1 => Error::from("HTTP/1 is not supported"),
202            Version::H2 => Error::from("HTTP/2 is not supported"),
203        }
204    }
205}
206
/// Creates the version-detecting future for `io`.
///
/// The future starts out assuming HTTP/2 with nothing read yet; polling it
/// downgrades the guess to HTTP/1 when the bytes read do not match `H2_PREFACE`.
fn read_version<I>(io: I) -> ReadVersion<I>
where
    I: Read + Unpin,
{
    ReadVersion {
        io: Some(io),
        // 24 bytes: exactly the length of `H2_PREFACE`.
        buf: [MaybeUninit::uninit(); 24],
        filled: 0,
        version: Version::H2,
        cancelled: false,
        _pin: PhantomPinned,
    }
}
220
pin_project! {
    // Future that reads the first bytes of a connection to decide between HTTP/1 and HTTP/2.
    struct ReadVersion<I> {
        // The connection; taken out (leaving `None`) when the future resolves successfully.
        io: Option<I>,
        // Scratch space for the bytes read so far.
        buf: [MaybeUninit<u8>; 24],
        // the amount of `buf` that's been filled
        filled: usize,
        // Current guess for the protocol version.
        version: Version,
        // Set by `cancel`; makes the next `poll` fail with an `Interrupted` error.
        cancelled: bool,
        // Make this future `!Unpin` for compatibility with async trait methods.
        #[pin]
        _pin: PhantomPinned,
    }
}
234
235impl<I> ReadVersion<I> {
236    pub fn cancel(self: Pin<&mut Self>) {
237        *self.project().cancelled = true;
238    }
239}
240
241impl<I> Future for ReadVersion<I>
242where
243    I: Read + Unpin,
244{
245    type Output = io::Result<(Version, Rewind<I>)>;
246
247    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
248        let this = self.project();
249        if *this.cancelled {
250            return Poll::Ready(Err(io::Error::new(io::ErrorKind::Interrupted, "Cancelled")));
251        }
252
253        let mut buf = ReadBuf::uninit(&mut *this.buf);
254        // SAFETY: `this.filled` tracks how many bytes have been read (and thus initialized) and
255        // we're only advancing by that many.
256        unsafe {
257            buf.unfilled().advance(*this.filled);
258        };
259
260        // We start as H2 and switch to H1 as soon as we don't have the preface.
261        while buf.filled().len() < H2_PREFACE.len() {
262            let len = buf.filled().len();
263            ready!(Pin::new(this.io.as_mut().unwrap()).poll_read(cx, buf.unfilled()))?;
264            *this.filled = buf.filled().len();
265
266            // We starts as H2 and switch to H1 when we don't get the preface.
267            if buf.filled().len() == len
268                || buf.filled()[len..] != H2_PREFACE[len..buf.filled().len()]
269            {
270                *this.version = Version::H1;
271                break;
272            }
273        }
274
275        let io = this.io.take().unwrap();
276        let buf = buf.filled().to_vec();
277        Poll::Ready(Ok((
278            *this.version,
279            Rewind::new_buffered(io, Bytes::from(buf)),
280        )))
281    }
282}
283
pin_project! {
    /// Connection future.
    ///
    /// Created by [`Builder::serve_connection`]; resolves when the underlying
    /// HTTP/1 or HTTP/2 connection finishes or fails.
    pub struct Connection<'a, I, S, E>
    where
        S: HttpService<Incoming>,
    {
        // Current phase: still detecting the protocol, or driving an H1/H2 connection.
        #[pin]
        state: ConnState<'a, I, S, E>,
    }
}
294
// A custom COW, since the libstd one has `ToOwned` bounds that are too eager.
enum Cow<'a, T> {
    Borrowed(&'a T),
    Owned(T),
}

impl<'a, T> std::ops::Deref for Cow<'a, T> {
    type Target = T;

    // Hands out a shared reference to the value, whether it is borrowed or owned.
    fn deref(&self) -> &T {
        match *self {
            Cow::Borrowed(borrowed) => borrowed,
            Cow::Owned(ref owned) => owned,
        }
    }
}
310
// hyper's HTTP/1 connection over the rewindable IO; stored in `ConnState::H1`.
#[cfg(feature = "http1")]
type Http1Connection<I, S> = hyper::server::conn::http1::Connection<Rewind<I>, S>;

// Placeholder so `ConnState::H1` still has a type when `http1` is disabled.
#[cfg(not(feature = "http1"))]
type Http1Connection<I, S> = (PhantomData<I>, PhantomData<S>);

// hyper's HTTP/2 connection over the rewindable IO; stored in the `H2` state variants.
#[cfg(feature = "http2")]
type Http2Connection<I, S, E> = hyper::server::conn::http2::Connection<Rewind<I>, S, E>;

// Placeholder so the `H2` state variants still have a type when `http2` is disabled.
#[cfg(not(feature = "http2"))]
type Http2Connection<I, S, E> = (PhantomData<I>, PhantomData<S>, PhantomData<E>);
322
pin_project! {
    // State machine behind `Connection`: detect the protocol, then drive H1 or H2.
    #[project = ConnStateProj]
    enum ConnState<'a, I, S, E>
    where
        S: HttpService<Incoming>,
    {
        // Protocol not known yet; `builder` and `service` are held until it is.
        ReadVersion {
            #[pin]
            read_version: ReadVersion<I>,
            builder: Cow<'a, Builder<E>>,
            // `Some` until the service is moved into the H1/H2 connection.
            service: Option<S>,
        },
        // Serving an HTTP/1 connection.
        H1 {
            #[pin]
            conn: Http1Connection<I, S>,
        },
        // Serving an HTTP/2 connection.
        H2 {
            #[pin]
            conn: Http2Connection<I, S, E>,
        },
    }
}
345
impl<I, S, E, B> Connection<'_, I, S, E>
where
    S: HttpService<Incoming, ResBody = B>,
    S::Error: Into<Box<dyn StdError + Send + Sync>>,
    I: Read + Write + Unpin,
    B: Body + 'static,
    B::Error: Into<Box<dyn StdError + Send + Sync>>,
    E: HttpServerConnExec<S::Future, B>,
{
    /// Start a graceful shutdown process for this connection.
    ///
    /// This `Connection` should continue to be polled until shutdown can finish.
    ///
    /// If the protocol version is still being detected, the detection is cancelled,
    /// which makes the next poll fail with an `Interrupted` I/O error.
    ///
    /// # Note
    ///
    /// This should only be called while the `Connection` future is still pending. If called after
    /// `Connection::poll` has resolved, this does nothing.
    pub fn graceful_shutdown(self: Pin<&mut Self>) {
        match self.project().state.project() {
            ConnStateProj::ReadVersion { read_version, .. } => read_version.cancel(),
            #[cfg(feature = "http1")]
            ConnStateProj::H1 { conn } => conn.graceful_shutdown(),
            #[cfg(feature = "http2")]
            ConnStateProj::H2 { conn } => conn.graceful_shutdown(),
            // A state for a protocol that is not compiled in is never constructed.
            #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
            _ => unreachable!(),
        }
    }

    /// Make this Connection static, instead of borrowing from Builder.
    ///
    /// Only the version-detection state refers to the builder; in that state the
    /// builder is cloned. The H1/H2 states are moved over unchanged.
    pub fn into_owned(self) -> Connection<'static, I, S, E>
    where
        Builder<E>: Clone,
    {
        Connection {
            state: match self.state {
                ConnState::ReadVersion {
                    read_version,
                    builder,
                    service,
                } => ConnState::ReadVersion {
                    read_version,
                    service,
                    // `clone` goes through `Deref`, so this clones the `Builder` itself.
                    builder: Cow::Owned(builder.clone()),
                },
                #[cfg(feature = "http1")]
                ConnState::H1 { conn } => ConnState::H1 { conn },
                #[cfg(feature = "http2")]
                ConnState::H2 { conn } => ConnState::H2 { conn },
                // A state for a protocol that is not compiled in is never constructed.
                #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
                _ => unreachable!(),
            },
        }
    }
}
401
impl<I, S, E, B> Future for Connection<'_, I, S, E>
where
    S: Service<Request<Incoming>, Response = Response<B>>,
    S::Future: 'static,
    S::Error: Into<Box<dyn StdError + Send + Sync>>,
    B: Body + 'static,
    B::Error: Into<Box<dyn StdError + Send + Sync>>,
    I: Read + Write + Unpin + 'static,
    E: HttpServerConnExec<S::Future, B>,
{
    type Output = Result<()>;

    // Drives the state machine: finish version detection, switch to the matching
    // protocol state, then keep polling that connection until it completes.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        loop {
            let mut this = self.as_mut().project();

            match this.state.as_mut().project() {
                ConnStateProj::ReadVersion {
                    read_version,
                    builder,
                    service,
                } => {
                    let (version, io) = ready!(read_version.poll(cx))?;
                    let service = service.take().unwrap();
                    // After switching state, loop around so the new connection is polled
                    // within this same call.
                    match version {
                        #[cfg(feature = "http1")]
                        Version::H1 => {
                            let conn = builder.http1.serve_connection(io, service);
                            this.state.set(ConnState::H1 { conn });
                        }
                        #[cfg(feature = "http2")]
                        Version::H2 => {
                            let conn = builder.http2.serve_connection(io, service);
                            this.state.set(ConnState::H2 { conn });
                        }
                        // The detected protocol is not compiled in.
                        #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
                        _ => return Poll::Ready(Err(version.unsupported())),
                    }
                }
                #[cfg(feature = "http1")]
                ConnStateProj::H1 { conn } => {
                    return conn.poll(cx).map_err(Into::into);
                }
                #[cfg(feature = "http2")]
                ConnStateProj::H2 { conn } => {
                    return conn.poll(cx).map_err(Into::into);
                }
                // A state for a protocol that is not compiled in is never constructed.
                #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
                _ => unreachable!(),
            }
        }
    }
}
455
pin_project! {
    /// Connection future with support for HTTP upgrades.
    ///
    /// Created by [`Builder::serve_connection_with_upgrades`].
    pub struct UpgradeableConnection<'a, I, S, E>
    where
        S: HttpService<Incoming>,
    {
        // Current phase: still detecting the protocol, or driving an H1/H2 connection.
        #[pin]
        state: UpgradeableConnState<'a, I, S, E>,
    }
}
466
// hyper's upgrade-capable HTTP/1 connection; stored in `UpgradeableConnState::H1`.
#[cfg(feature = "http1")]
type Http1UpgradeableConnection<I, S> = hyper::server::conn::http1::UpgradeableConnection<I, S>;

// Placeholder so `UpgradeableConnState::H1` still has a type when `http1` is disabled.
#[cfg(not(feature = "http1"))]
type Http1UpgradeableConnection<I, S> = (PhantomData<I>, PhantomData<S>);
472
pin_project! {
    // State machine behind `UpgradeableConnection`: detect the protocol, then drive H1 or H2.
    #[project = UpgradeableConnStateProj]
    enum UpgradeableConnState<'a, I, S, E>
    where
        S: HttpService<Incoming>,
    {
        // Protocol not known yet; `builder` and `service` are held until it is.
        ReadVersion {
            #[pin]
            read_version: ReadVersion<I>,
            builder: Cow<'a, Builder<E>>,
            // `Some` until the service is moved into the H1/H2 connection.
            service: Option<S>,
        },
        // Serving an HTTP/1 connection with upgrade support.
        H1 {
            #[pin]
            conn: Http1UpgradeableConnection<Rewind<I>, S>,
        },
        // Serving an HTTP/2 connection.
        H2 {
            #[pin]
            conn: Http2Connection<I, S, E>,
        },
    }
}
495
impl<I, S, E, B> UpgradeableConnection<'_, I, S, E>
where
    S: HttpService<Incoming, ResBody = B>,
    S::Error: Into<Box<dyn StdError + Send + Sync>>,
    I: Read + Write + Unpin,
    B: Body + 'static,
    B::Error: Into<Box<dyn StdError + Send + Sync>>,
    E: HttpServerConnExec<S::Future, B>,
{
    /// Start a graceful shutdown process for this connection.
    ///
    /// This `UpgradeableConnection` should continue to be polled until shutdown can finish.
    ///
    /// If the protocol version is still being detected, the detection is cancelled,
    /// which makes the next poll fail with an `Interrupted` I/O error.
    ///
    /// # Note
    ///
    /// This should only be called while the `UpgradeableConnection` future is still pending. If
    /// called after `UpgradeableConnection::poll` has resolved, this does nothing.
    pub fn graceful_shutdown(self: Pin<&mut Self>) {
        match self.project().state.project() {
            UpgradeableConnStateProj::ReadVersion { read_version, .. } => read_version.cancel(),
            #[cfg(feature = "http1")]
            UpgradeableConnStateProj::H1 { conn } => conn.graceful_shutdown(),
            #[cfg(feature = "http2")]
            UpgradeableConnStateProj::H2 { conn } => conn.graceful_shutdown(),
            // A state for a protocol that is not compiled in is never constructed.
            #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
            _ => unreachable!(),
        }
    }

    /// Make this Connection static, instead of borrowing from Builder.
    ///
    /// Only the version-detection state refers to the builder; in that state the
    /// builder is cloned. The H1/H2 states are moved over unchanged.
    pub fn into_owned(self) -> UpgradeableConnection<'static, I, S, E>
    where
        Builder<E>: Clone,
    {
        UpgradeableConnection {
            state: match self.state {
                UpgradeableConnState::ReadVersion {
                    read_version,
                    builder,
                    service,
                } => UpgradeableConnState::ReadVersion {
                    read_version,
                    service,
                    // `clone` goes through `Deref`, so this clones the `Builder` itself.
                    builder: Cow::Owned(builder.clone()),
                },
                #[cfg(feature = "http1")]
                UpgradeableConnState::H1 { conn } => UpgradeableConnState::H1 { conn },
                #[cfg(feature = "http2")]
                UpgradeableConnState::H2 { conn } => UpgradeableConnState::H2 { conn },
                // A state for a protocol that is not compiled in is never constructed.
                #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
                _ => unreachable!(),
            },
        }
    }
}
551
impl<I, S, E, B> Future for UpgradeableConnection<'_, I, S, E>
where
    S: Service<Request<Incoming>, Response = Response<B>>,
    S::Future: 'static,
    S::Error: Into<Box<dyn StdError + Send + Sync>>,
    B: Body + 'static,
    B::Error: Into<Box<dyn StdError + Send + Sync>>,
    I: Read + Write + Unpin + Send + 'static,
    E: HttpServerConnExec<S::Future, B>,
{
    type Output = Result<()>;

    // Drives the state machine: finish version detection, switch to the matching
    // protocol state, then keep polling that connection until it completes.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        loop {
            let mut this = self.as_mut().project();

            match this.state.as_mut().project() {
                UpgradeableConnStateProj::ReadVersion {
                    read_version,
                    builder,
                    service,
                } => {
                    let (version, io) = ready!(read_version.poll(cx))?;
                    let service = service.take().unwrap();
                    // After switching state, loop around so the new connection is polled
                    // within this same call.
                    match version {
                        #[cfg(feature = "http1")]
                        Version::H1 => {
                            // Only the HTTP/1 connection is wrapped with upgrade support.
                            let conn = builder.http1.serve_connection(io, service).with_upgrades();
                            this.state.set(UpgradeableConnState::H1 { conn });
                        }
                        #[cfg(feature = "http2")]
                        Version::H2 => {
                            let conn = builder.http2.serve_connection(io, service);
                            this.state.set(UpgradeableConnState::H2 { conn });
                        }
                        // The detected protocol is not compiled in.
                        #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
                        _ => return Poll::Ready(Err(version.unsupported())),
                    }
                }
                #[cfg(feature = "http1")]
                UpgradeableConnStateProj::H1 { conn } => {
                    return conn.poll(cx).map_err(Into::into);
                }
                #[cfg(feature = "http2")]
                UpgradeableConnStateProj::H2 { conn } => {
                    return conn.poll(cx).map_err(Into::into);
                }
                // A state for a protocol that is not compiled in is never constructed.
                #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
                _ => unreachable!(),
            }
        }
    }
}
605
/// Http1 part of builder.
///
/// A view over a mutably borrowed [`Builder`] that exposes its HTTP/1 settings.
#[cfg(feature = "http1")]
pub struct Http1Builder<'a, E> {
    // The builder whose `http1` settings are being configured.
    inner: &'a mut Builder<E>,
}
611
#[cfg(feature = "http1")]
impl<E> Http1Builder<'_, E> {
    /// Http2 configuration.
    ///
    /// Switches to the HTTP/2 view of the same underlying builder.
    #[cfg(feature = "http2")]
    pub fn http2(&mut self) -> Http2Builder<'_, E> {
        Http2Builder { inner: self.inner }
    }

    /// Set whether HTTP/1 connections should support half-closures.
    ///
    /// Clients can choose to shut down their write-side while waiting
    /// for the server to respond. Setting this to `true` will
    /// prevent closing the connection immediately if `read`
    /// detects an EOF in the middle of a request.
    ///
    /// Default is `false`.
    pub fn half_close(&mut self, val: bool) -> &mut Self {
        self.inner.http1.half_close(val);
        self
    }

    /// Enables or disables HTTP/1 keep-alive.
    ///
    /// Default is true.
    pub fn keep_alive(&mut self, val: bool) -> &mut Self {
        self.inner.http1.keep_alive(val);
        self
    }

    /// Set whether HTTP/1 connections will write header names as title case at
    /// the socket level.
    ///
    /// Note that this setting does not affect HTTP/2.
    ///
    /// Default is false.
    pub fn title_case_headers(&mut self, enabled: bool) -> &mut Self {
        self.inner.http1.title_case_headers(enabled);
        self
    }

    /// Set whether to support preserving original header cases.
    ///
    /// Currently, this will record the original cases received, and store them
    /// in a private extension on the `Request`. It will also look for and use
    /// such an extension in any provided `Response`.
    ///
    /// Since the relevant extension is still private, there is no way to
    /// interact with the original cases. The only effect this can have now is
    /// to forward the cases in a proxy-like fashion.
    ///
    /// Note that this setting does not affect HTTP/2.
    ///
    /// Default is false.
    pub fn preserve_header_case(&mut self, enabled: bool) -> &mut Self {
        self.inner.http1.preserve_header_case(enabled);
        self
    }

    /// Set the maximum number of headers.
    ///
    /// When a request is received, the parser will reserve a buffer to store headers for optimal
    /// performance.
    ///
    /// If the server receives more headers than the buffer size, it responds to the client with
    /// "431 Request Header Fields Too Large".
    ///
    /// Headers are allocated on the stack by default, which has higher performance. After
    /// setting this value, headers will be allocated in heap memory, that is, heap memory
    /// allocation will occur for each request, and there will be a performance drop of about 5%.
    ///
    /// Note that this setting does not affect HTTP/2.
    ///
    /// Default is 100.
    pub fn max_headers(&mut self, val: usize) -> &mut Self {
        self.inner.http1.max_headers(val);
        self
    }

    /// Set a timeout for reading client request headers. If a client does not
    /// transmit the entire header within this time, the connection is closed.
    ///
    /// Default is None.
    pub fn header_read_timeout(&mut self, read_timeout: Duration) -> &mut Self {
        self.inner.http1.header_read_timeout(read_timeout);
        self
    }

    /// Set whether HTTP/1 connections should try to use vectored writes,
    /// or always flatten into a single buffer.
    ///
    /// Note that setting this to false may mean more copies of body data,
    /// but may also improve performance when an IO transport doesn't
    /// support vectored writes well, such as most TLS implementations.
    ///
    /// Setting this to true will force hyper to use the queued strategy,
    /// which may eliminate unnecessary cloning on some TLS backends.
    ///
    /// Default is `auto`. In this mode hyper will try to guess which
    /// mode to use.
    pub fn writev(&mut self, val: bool) -> &mut Self {
        self.inner.http1.writev(val);
        self
    }

    /// Set the maximum buffer size for the connection.
    ///
    /// Default is ~400kb.
    ///
    /// # Panics
    ///
    /// The minimum value allowed is 8192. This method panics if the passed `max` is less than the minimum.
    pub fn max_buf_size(&mut self, max: usize) -> &mut Self {
        self.inner.http1.max_buf_size(max);
        self
    }

    /// Aggregates flushes to better support pipelined responses.
    ///
    /// Experimental, may have bugs.
    ///
    /// Default is false.
    pub fn pipeline_flush(&mut self, enabled: bool) -> &mut Self {
        self.inner.http1.pipeline_flush(enabled);
        self
    }

    /// Set the timer used in background tasks.
    pub fn timer<M>(&mut self, timer: M) -> &mut Self
    where
        M: Timer + Send + Sync + 'static,
    {
        self.inner.http1.timer(timer);
        self
    }

    /// Bind a connection together with a [`Service`].
    ///
    /// Forwards to [`Builder::serve_connection`] on the underlying builder and
    /// awaits the resulting connection to completion.
    // `HttpServerConnExec` is defined with and without the `http2` feature (as a blanket
    // marker in the latter case), so one definition carrying the bound covers both builds.
    pub async fn serve_connection<I, S, B>(&self, io: I, service: S) -> Result<()>
    where
        S: Service<Request<Incoming>, Response = Response<B>>,
        S::Future: 'static,
        S::Error: Into<Box<dyn StdError + Send + Sync>>,
        B: Body + 'static,
        B::Error: Into<Box<dyn StdError + Send + Sync>>,
        I: Read + Write + Unpin + 'static,
        E: HttpServerConnExec<S::Future, B>,
    {
        self.inner.serve_connection(io, service).await
    }

    /// Bind a connection together with a [`Service`], with the ability to
    /// handle HTTP upgrades. This requires that the IO object implements
    /// `Send`.
    ///
    /// Forwards to [`Builder::serve_connection_with_upgrades`] on the underlying builder.
    // Not gated on `http2`: the `Builder` method this forwards to is available in every
    // build, and HTTP upgrades are served on the HTTP/1 connection.
    pub fn serve_connection_with_upgrades<I, S, B>(
        &self,
        io: I,
        service: S,
    ) -> UpgradeableConnection<'_, I, S, E>
    where
        S: Service<Request<Incoming>, Response = Response<B>>,
        S::Future: 'static,
        S::Error: Into<Box<dyn StdError + Send + Sync>>,
        B: Body + 'static,
        B::Error: Into<Box<dyn StdError + Send + Sync>>,
        I: Read + Write + Unpin + Send + 'static,
        E: HttpServerConnExec<S::Future, B>,
    {
        self.inner.serve_connection_with_upgrades(io, service)
    }
}
797
/// Http2 part of builder.
///
/// A view over a mutably borrowed [`Builder`] that exposes its HTTP/2 settings.
#[cfg(feature = "http2")]
pub struct Http2Builder<'a, E> {
    // The builder whose `http2` settings are being configured.
    inner: &'a mut Builder<E>,
}
803
804#[cfg(feature = "http2")]
805impl<E> Http2Builder<'_, E> {
    #[cfg(feature = "http1")]
    /// Http1 configuration.
    ///
    /// Switches to the HTTP/1 view of the same underlying builder.
    pub fn http1(&mut self) -> Http1Builder<'_, E> {
        Http1Builder { inner: self.inner }
    }
811
    /// Configures the maximum number of pending reset streams allowed before a GOAWAY will be sent.
    ///
    /// This will default to the default value set by the [`h2` crate](https://crates.io/crates/h2).
    /// As of v0.4.0, it is 20.
    ///
    /// See <https://github.com/hyperium/hyper/issues/2877> for more information.
    ///
    /// The value is forwarded to the underlying HTTP/2 builder; returns `self` for chaining.
    pub fn max_pending_accept_reset_streams(&mut self, max: impl Into<Option<usize>>) -> &mut Self {
        self.inner.http2.max_pending_accept_reset_streams(max);
        self
    }
822
    /// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2
    /// stream-level flow control.
    ///
    /// Passing `None` will do nothing.
    ///
    /// If not set, hyper will use a default.
    ///
    /// The value is forwarded to the underlying HTTP/2 builder; returns `self` for chaining.
    ///
    /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE
    pub fn initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
        self.inner.http2.initial_stream_window_size(sz);
        self
    }
835
    /// Sets the max connection-level flow control for HTTP2.
    ///
    /// Passing `None` will do nothing.
    ///
    /// If not set, hyper will use a default.
    ///
    /// The value is forwarded to the underlying HTTP/2 builder; returns `self` for chaining.
    pub fn initial_connection_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
        self.inner.http2.initial_connection_window_size(sz);
        self
    }
845
    /// Sets whether to use an adaptive flow control.
    ///
    /// Enabling this will override the limits set in
    /// [`initial_stream_window_size`](Self::initial_stream_window_size) and
    /// [`initial_connection_window_size`](Self::initial_connection_window_size).
    ///
    /// The flag is forwarded to the underlying HTTP/2 builder; returns `self` for chaining.
    pub fn adaptive_window(&mut self, enabled: bool) -> &mut Self {
        self.inner.http2.adaptive_window(enabled);
        self
    }
855
    /// Sets the maximum frame size to use for HTTP2.
    ///
    /// Passing `None` will do nothing.
    ///
    /// If not set, hyper will use a default.
    ///
    /// The value is forwarded to the underlying HTTP/2 builder; returns `self` for chaining.
    pub fn max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
        self.inner.http2.max_frame_size(sz);
        self
    }
865
    /// Sets the [`SETTINGS_MAX_CONCURRENT_STREAMS`][spec] option for HTTP2
    /// connections.
    ///
    /// Default is 200. Passing `None` will remove any limit.
    ///
    /// The value is forwarded to the underlying HTTP/2 builder; returns `self` for chaining.
    ///
    /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_MAX_CONCURRENT_STREAMS
    pub fn max_concurrent_streams(&mut self, max: impl Into<Option<u32>>) -> &mut Self {
        self.inner.http2.max_concurrent_streams(max);
        self
    }
876
    /// Sets the interval at which HTTP2 Ping frames should be sent to keep a
    /// connection alive.
    ///
    /// Pass `None` to disable HTTP2 keep-alive.
    ///
    /// Default is currently disabled.
    ///
    /// The value is forwarded to the underlying HTTP/2 builder; returns `self` for chaining.
    ///
    // NOTE(review): keep-alive presumably also needs a timer configured via `timer` —
    // confirm against hyper's `http2::Builder` documentation.
    pub fn keep_alive_interval(&mut self, interval: impl Into<Option<Duration>>) -> &mut Self {
        self.inner.http2.keep_alive_interval(interval);
        self
    }
890
    /// Sets a timeout for receiving an acknowledgement of the keep-alive ping.
    ///
    /// If the ping is not acknowledged within the timeout, the connection will
    /// be closed. Does nothing if
    /// [`keep_alive_interval`](Self::keep_alive_interval) is disabled.
    ///
    /// Default is 20 seconds.
    ///
    /// The value is forwarded to the underlying HTTP/2 builder; returns `self` for chaining.
    ///
    // NOTE(review): keep-alive presumably also needs a timer configured via `timer` —
    // confirm against hyper's `http2::Builder` documentation.
    pub fn keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self {
        self.inner.http2.keep_alive_timeout(timeout);
        self
    }
904
905    /// Set the maximum write buffer size for each HTTP/2 stream.
906    ///
907    /// Default is currently ~400KB, but may change.
908    ///
909    /// # Panics
910    ///
911    /// The value must be no larger than `u32::MAX`.
912    pub fn max_send_buf_size(&mut self, max: usize) -> &mut Self {
913        self.inner.http2.max_send_buf_size(max);
914        self
915    }
916
917    /// Enables the [extended CONNECT protocol].
918    ///
919    /// [extended CONNECT protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
920    pub fn enable_connect_protocol(&mut self) -> &mut Self {
921        self.inner.http2.enable_connect_protocol();
922        self
923    }
924
925    /// Sets the max size of received header frames.
926    ///
927    /// Default is currently ~16MB, but may change.
928    pub fn max_header_list_size(&mut self, max: u32) -> &mut Self {
929        self.inner.http2.max_header_list_size(max);
930        self
931    }
932
933    /// Set the timer used in background tasks.
934    pub fn timer<M>(&mut self, timer: M) -> &mut Self
935    where
936        M: Timer + Send + Sync + 'static,
937    {
938        self.inner.http2.timer(timer);
939        self
940    }
941
942    /// Bind a connection together with a [`Service`].
943    pub async fn serve_connection<I, S, B>(&self, io: I, service: S) -> Result<()>
944    where
945        S: Service<Request<Incoming>, Response = Response<B>>,
946        S::Future: 'static,
947        S::Error: Into<Box<dyn StdError + Send + Sync>>,
948        B: Body + 'static,
949        B::Error: Into<Box<dyn StdError + Send + Sync>>,
950        I: Read + Write + Unpin + 'static,
951        E: HttpServerConnExec<S::Future, B>,
952    {
953        self.inner.serve_connection(io, service).await
954    }
955
956    /// Bind a connection together with a [`Service`], with the ability to
957    /// handle HTTP upgrades. This requires that the IO object implements
958    /// `Send`.
959    pub fn serve_connection_with_upgrades<I, S, B>(
960        &self,
961        io: I,
962        service: S,
963    ) -> UpgradeableConnection<'_, I, S, E>
964    where
965        S: Service<Request<Incoming>, Response = Response<B>>,
966        S::Future: 'static,
967        S::Error: Into<Box<dyn StdError + Send + Sync>>,
968        B: Body + 'static,
969        B::Error: Into<Box<dyn StdError + Send + Sync>>,
970        I: Read + Write + Unpin + Send + 'static,
971        E: HttpServerConnExec<S::Future, B>,
972    {
973        self.inner.serve_connection_with_upgrades(io, service)
974    }
975}
976
977#[cfg(test)]
978mod tests {
979    use crate::{
980        rt::{TokioExecutor, TokioIo},
981        server::conn::auto,
982    };
983    use http::{Request, Response};
984    use http_body::Body;
985    use http_body_util::{BodyExt, Empty, Full};
986    use hyper::{body, body::Bytes, client, service::service_fn};
987    use std::{convert::Infallible, error::Error as StdError, net::SocketAddr, time::Duration};
988    use tokio::{
989        net::{TcpListener, TcpStream},
990        pin,
991    };
992
993    const BODY: &[u8] = b"Hello, world!";
994
995    #[test]
996    fn configuration() {
997        // One liner.
998        auto::Builder::new(TokioExecutor::new())
999            .http1()
1000            .keep_alive(true)
1001            .http2()
1002            .keep_alive_interval(None);
1003        //  .serve_connection(io, service);
1004
1005        // Using variable.
1006        let mut builder = auto::Builder::new(TokioExecutor::new());
1007
1008        builder.http1().keep_alive(true);
1009        builder.http2().keep_alive_interval(None);
1010        // builder.serve_connection(io, service);
1011    }
1012
1013    #[cfg(not(miri))]
1014    #[tokio::test]
1015    async fn http1() {
1016        let addr = start_server(false, false).await;
1017        let mut sender = connect_h1(addr).await;
1018
1019        let response = sender
1020            .send_request(Request::new(Empty::<Bytes>::new()))
1021            .await
1022            .unwrap();
1023
1024        let body = response.into_body().collect().await.unwrap().to_bytes();
1025
1026        assert_eq!(body, BODY);
1027    }
1028
1029    #[cfg(not(miri))]
1030    #[tokio::test]
1031    async fn http2() {
1032        let addr = start_server(false, false).await;
1033        let mut sender = connect_h2(addr).await;
1034
1035        let response = sender
1036            .send_request(Request::new(Empty::<Bytes>::new()))
1037            .await
1038            .unwrap();
1039
1040        let body = response.into_body().collect().await.unwrap().to_bytes();
1041
1042        assert_eq!(body, BODY);
1043    }
1044
1045    #[cfg(not(miri))]
1046    #[tokio::test]
1047    async fn http2_only() {
1048        let addr = start_server(false, true).await;
1049        let mut sender = connect_h2(addr).await;
1050
1051        let response = sender
1052            .send_request(Request::new(Empty::<Bytes>::new()))
1053            .await
1054            .unwrap();
1055
1056        let body = response.into_body().collect().await.unwrap().to_bytes();
1057
1058        assert_eq!(body, BODY);
1059    }
1060
1061    #[cfg(not(miri))]
1062    #[tokio::test]
1063    async fn http2_only_fail_if_client_is_http1() {
1064        let addr = start_server(false, true).await;
1065        let mut sender = connect_h1(addr).await;
1066
1067        let _ = sender
1068            .send_request(Request::new(Empty::<Bytes>::new()))
1069            .await
1070            .expect_err("should fail");
1071    }
1072
1073    #[cfg(not(miri))]
1074    #[tokio::test]
1075    async fn http1_only() {
1076        let addr = start_server(true, false).await;
1077        let mut sender = connect_h1(addr).await;
1078
1079        let response = sender
1080            .send_request(Request::new(Empty::<Bytes>::new()))
1081            .await
1082            .unwrap();
1083
1084        let body = response.into_body().collect().await.unwrap().to_bytes();
1085
1086        assert_eq!(body, BODY);
1087    }
1088
1089    #[cfg(not(miri))]
1090    #[tokio::test]
1091    async fn http1_only_fail_if_client_is_http2() {
1092        let addr = start_server(true, false).await;
1093        let mut sender = connect_h2(addr).await;
1094
1095        let _ = sender
1096            .send_request(Request::new(Empty::<Bytes>::new()))
1097            .await
1098            .expect_err("should fail");
1099    }
1100
1101    #[cfg(not(miri))]
1102    #[tokio::test]
1103    async fn graceful_shutdown() {
1104        let listener = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0)))
1105            .await
1106            .unwrap();
1107
1108        let listener_addr = listener.local_addr().unwrap();
1109
1110        // Spawn the task in background so that we can connect there
1111        let listen_task = tokio::spawn(async move { listener.accept().await.unwrap() });
1112        // Only connect a stream, do not send headers or anything
1113        let _stream = TcpStream::connect(listener_addr).await.unwrap();
1114
1115        let (stream, _) = listen_task.await.unwrap();
1116        let stream = TokioIo::new(stream);
1117        let builder = auto::Builder::new(TokioExecutor::new());
1118        let connection = builder.serve_connection(stream, service_fn(hello));
1119
1120        pin!(connection);
1121
1122        connection.as_mut().graceful_shutdown();
1123
1124        let connection_error = tokio::time::timeout(Duration::from_millis(200), connection)
1125            .await
1126            .expect("Connection should have finished in a timely manner after graceful shutdown.")
1127            .expect_err("Connection should have been interrupted.");
1128
1129        let connection_error = connection_error
1130            .downcast_ref::<std::io::Error>()
1131            .expect("The error should have been `std::io::Error`.");
1132        assert_eq!(connection_error.kind(), std::io::ErrorKind::Interrupted);
1133    }
1134
1135    async fn connect_h1<B>(addr: SocketAddr) -> client::conn::http1::SendRequest<B>
1136    where
1137        B: Body + Send + 'static,
1138        B::Data: Send,
1139        B::Error: Into<Box<dyn StdError + Send + Sync>>,
1140    {
1141        let stream = TokioIo::new(TcpStream::connect(addr).await.unwrap());
1142        let (sender, connection) = client::conn::http1::handshake(stream).await.unwrap();
1143
1144        tokio::spawn(connection);
1145
1146        sender
1147    }
1148
1149    async fn connect_h2<B>(addr: SocketAddr) -> client::conn::http2::SendRequest<B>
1150    where
1151        B: Body + Unpin + Send + 'static,
1152        B::Data: Send,
1153        B::Error: Into<Box<dyn StdError + Send + Sync>>,
1154    {
1155        let stream = TokioIo::new(TcpStream::connect(addr).await.unwrap());
1156        let (sender, connection) = client::conn::http2::Builder::new(TokioExecutor::new())
1157            .handshake(stream)
1158            .await
1159            .unwrap();
1160
1161        tokio::spawn(connection);
1162
1163        sender
1164    }
1165
1166    async fn start_server(h1_only: bool, h2_only: bool) -> SocketAddr {
1167        let addr: SocketAddr = ([127, 0, 0, 1], 0).into();
1168        let listener = TcpListener::bind(addr).await.unwrap();
1169
1170        let local_addr = listener.local_addr().unwrap();
1171
1172        tokio::spawn(async move {
1173            loop {
1174                let (stream, _) = listener.accept().await.unwrap();
1175                let stream = TokioIo::new(stream);
1176                tokio::task::spawn(async move {
1177                    let mut builder = auto::Builder::new(TokioExecutor::new());
1178                    if h1_only {
1179                        builder = builder.http1_only();
1180                        builder.serve_connection(stream, service_fn(hello)).await
1181                    } else if h2_only {
1182                        builder = builder.http2_only();
1183                        builder.serve_connection(stream, service_fn(hello)).await
1184                    } else {
1185                        builder
1186                            .http2()
1187                            .max_header_list_size(4096)
1188                            .serve_connection_with_upgrades(stream, service_fn(hello))
1189                            .await
1190                    }
1191                    .unwrap();
1192                });
1193            }
1194        });
1195
1196        local_addr
1197    }
1198
1199    async fn hello(_req: Request<body::Incoming>) -> Result<Response<Full<Bytes>>, Infallible> {
1200        Ok(Response::new(Full::new(Bytes::from(BODY))))
1201    }
1202}