Skip to main content

hyper_util/server/conn/auto/
mod.rs

1//! Http1 or Http2 connection.
2
3pub mod upgrade;
4
5use hyper::service::HttpService;
6use std::future::Future;
7use std::marker::PhantomPinned;
8use std::mem::MaybeUninit;
9use std::pin::Pin;
10use std::task::{ready, Context, Poll};
11use std::{error::Error as StdError, io, time::Duration};
12
13use bytes::Bytes;
14use http::{Request, Response};
15use http_body::Body;
16use hyper::{
17    body::Incoming,
18    rt::{Read, ReadBuf, Timer, Write},
19    service::Service,
20};
21
22#[cfg(feature = "http1")]
23use hyper::server::conn::http1;
24
25#[cfg(feature = "http2")]
26use hyper::{rt::bounds::Http2ServerConnExec, server::conn::http2};
27
28#[cfg(any(not(feature = "http2"), not(feature = "http1")))]
29use std::marker::PhantomData;
30
31use pin_project_lite::pin_project;
32
33use crate::common::rewind::Rewind;
34
// Boxed, thread-safe error returned by the connection futures in this module.
type Error = Box<dyn StdError + Send + Sync>;

// Module-local `Result` whose error type is always the boxed `Error` above.
type Result<T> = core::result::Result<T, Error>;

// The 24-byte client connection preface; `ReadVersion` compares the first bytes
// of every sniffed connection against it to tell HTTP/2 from HTTP/1.
const H2_PREFACE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
40
41/// Exactly equivalent to [`Http2ServerConnExec`].
42#[cfg(feature = "http2")]
43pub trait HttpServerConnExec<A, B: Body>: Http2ServerConnExec<A, B> {}
44
45#[cfg(feature = "http2")]
46impl<A, B: Body, T: Http2ServerConnExec<A, B>> HttpServerConnExec<A, B> for T {}
47
48/// Exactly equivalent to [`Http2ServerConnExec`].
49#[cfg(not(feature = "http2"))]
50pub trait HttpServerConnExec<A, B: Body> {}
51
52#[cfg(not(feature = "http2"))]
53impl<A, B: Body, T> HttpServerConnExec<A, B> for T {}
54
/// Http1 or Http2 connection builder.
#[derive(Debug, Clone)]
pub struct Builder<E> {
    // Settings handed to hyper when a connection is served as HTTP/1.
    #[cfg(feature = "http1")]
    http1: http1::Builder,
    // Settings (and the executor) handed to hyper when a connection is served as HTTP/2.
    #[cfg(feature = "http2")]
    http2: http2::Builder<E>,
    // `Some(_)` pins the protocol (set by `http1_only` / `http2_only`);
    // `None` means `serve_connection` sniffs the protocol from the first bytes.
    #[cfg(any(feature = "http1", feature = "http2"))]
    version: Option<Version>,
    // Without HTTP/2 nothing consumes the executor, so it is simply stored here.
    #[cfg(not(feature = "http2"))]
    _executor: E,
}
67
68impl<E: Default> Default for Builder<E> {
69    fn default() -> Self {
70        Self::new(E::default())
71    }
72}
73
74impl<E> Builder<E> {
75    /// Create a new auto connection builder.
76    ///
77    /// `executor` parameter should be a type that implements
78    /// [`Executor`](hyper::rt::Executor) trait.
79    ///
80    /// # Example
81    ///
82    /// ```
83    /// use hyper_util::{
84    ///     rt::TokioExecutor,
85    ///     server::conn::auto,
86    /// };
87    ///
88    /// auto::Builder::new(TokioExecutor::new());
89    /// ```
90    pub fn new(executor: E) -> Self {
91        Self {
92            #[cfg(feature = "http1")]
93            http1: http1::Builder::new(),
94            #[cfg(feature = "http2")]
95            http2: http2::Builder::new(executor),
96            #[cfg(any(feature = "http1", feature = "http2"))]
97            version: None,
98            #[cfg(not(feature = "http2"))]
99            _executor: executor,
100        }
101    }
102
103    /// Http1 configuration.
104    #[cfg(feature = "http1")]
105    pub fn http1(&mut self) -> Http1Builder<'_, E> {
106        Http1Builder { inner: self }
107    }
108
109    /// Http2 configuration.
110    #[cfg(feature = "http2")]
111    pub fn http2(&mut self) -> Http2Builder<'_, E> {
112        Http2Builder { inner: self }
113    }
114
115    /// Only accepts HTTP/2
116    ///
117    /// Does not do anything if used with [`serve_connection_with_upgrades`]
118    ///
119    /// [`serve_connection_with_upgrades`]: Builder::serve_connection_with_upgrades
120    #[cfg(feature = "http2")]
121    pub fn http2_only(mut self) -> Self {
122        assert!(self.version.is_none());
123        self.version = Some(Version::H2);
124        self
125    }
126
127    /// Only accepts HTTP/1
128    ///
129    /// Does not do anything if used with [`serve_connection_with_upgrades`]
130    ///
131    /// [`serve_connection_with_upgrades`]: Builder::serve_connection_with_upgrades
132    #[cfg(feature = "http1")]
133    pub fn http1_only(mut self) -> Self {
134        assert!(self.version.is_none());
135        self.version = Some(Version::H1);
136        self
137    }
138
139    /// Returns `true` if this builder can serve an HTTP/1.1-based connection.
140    pub fn is_http1_available(&self) -> bool {
141        match self.version {
142            #[cfg(feature = "http1")]
143            Some(Version::H1) => true,
144            #[cfg(feature = "http2")]
145            Some(Version::H2) => false,
146            #[cfg(any(feature = "http1", feature = "http2"))]
147            _ => true,
148        }
149    }
150
151    /// Returns `true` if this builder can serve an HTTP/2-based connection.
152    pub fn is_http2_available(&self) -> bool {
153        match self.version {
154            #[cfg(feature = "http1")]
155            Some(Version::H1) => false,
156            #[cfg(feature = "http2")]
157            Some(Version::H2) => true,
158            #[cfg(any(feature = "http1", feature = "http2"))]
159            _ => true,
160        }
161    }
162
163    /// Set whether HTTP/1 connections will write header names as title case at
164    /// the socket level.
165    ///
166    /// This setting only affects HTTP/1 connections. HTTP/2 connections are
167    /// not affected by this setting.
168    ///
169    /// Default is false.
170    ///
171    /// # Example
172    ///
173    /// ```
174    /// use hyper_util::{
175    ///     rt::TokioExecutor,
176    ///     server::conn::auto,
177    /// };
178    ///
179    /// auto::Builder::new(TokioExecutor::new())
180    ///     .title_case_headers(true);
181    /// ```
182    #[cfg(feature = "http1")]
183    pub fn title_case_headers(mut self, enabled: bool) -> Self {
184        self.http1.title_case_headers(enabled);
185        self
186    }
187
188    /// Set whether HTTP/1 connections will preserve the original case of header names.
189    ///
190    /// This setting only affects HTTP/1 connections. HTTP/2 connections are
191    /// not affected by this setting.
192    ///
193    /// Default is false.
194    ///
195    /// # Example
196    ///
197    /// ```
198    /// use hyper_util::{
199    ///     rt::TokioExecutor,
200    ///     server::conn::auto,
201    /// };
202    ///
203    /// auto::Builder::new(TokioExecutor::new())
204    ///     .preserve_header_case(true);
205    /// ```
206    #[cfg(feature = "http1")]
207    pub fn preserve_header_case(mut self, enabled: bool) -> Self {
208        self.http1.preserve_header_case(enabled);
209        self
210    }
211
212    /// Bind a connection together with a [`Service`].
213    pub fn serve_connection<I, S, B>(&self, io: I, service: S) -> Connection<'_, I, S, E>
214    where
215        S: Service<Request<Incoming>, Response = Response<B>>,
216        S::Future: 'static,
217        S::Error: Into<Box<dyn StdError + Send + Sync>>,
218        B: Body + 'static,
219        B::Error: Into<Box<dyn StdError + Send + Sync>>,
220        I: Read + Write + Unpin + 'static,
221        E: HttpServerConnExec<S::Future, B>,
222    {
223        let state = match self.version {
224            #[cfg(feature = "http1")]
225            Some(Version::H1) => {
226                let io = Rewind::new_buffered(io, Bytes::new());
227                let conn = self.http1.serve_connection(io, service);
228                ConnState::H1 { conn }
229            }
230            #[cfg(feature = "http2")]
231            Some(Version::H2) => {
232                let io = Rewind::new_buffered(io, Bytes::new());
233                let conn = self.http2.serve_connection(io, service);
234                ConnState::H2 { conn }
235            }
236            #[cfg(any(feature = "http1", feature = "http2"))]
237            _ => ConnState::ReadVersion {
238                read_version: read_version(io),
239                builder: Cow::Borrowed(self),
240                service: Some(service),
241            },
242        };
243
244        Connection { state }
245    }
246
247    /// Bind a connection together with a [`Service`], with the ability to
248    /// handle HTTP upgrades. This requires that the IO object implements
249    /// `Send`.
250    ///
251    /// Note that if you ever want to use [`hyper::upgrade::Upgraded::downcast`]
252    /// with this crate, you'll need to use [`hyper_util::server::conn::auto::upgrade::downcast`]
253    /// instead. See the documentation of the latter to understand why.
254    ///
255    /// [`hyper_util::server::conn::auto::upgrade::downcast`]: crate::server::conn::auto::upgrade::downcast
256    pub fn serve_connection_with_upgrades<I, S, B>(
257        &self,
258        io: I,
259        service: S,
260    ) -> UpgradeableConnection<'_, I, S, E>
261    where
262        S: Service<Request<Incoming>, Response = Response<B>>,
263        S::Future: 'static,
264        S::Error: Into<Box<dyn StdError + Send + Sync>>,
265        B: Body + 'static,
266        B::Error: Into<Box<dyn StdError + Send + Sync>>,
267        I: Read + Write + Unpin + Send + 'static,
268        E: HttpServerConnExec<S::Future, B>,
269    {
270        UpgradeableConnection {
271            state: UpgradeableConnState::ReadVersion {
272                read_version: read_version(io),
273                builder: Cow::Borrowed(self),
274                service: Some(service),
275            },
276        }
277    }
278}
279
// Protocol family chosen for a connection, either pinned on the builder or
// detected by `ReadVersion`.
#[derive(Clone, Copy, Debug)]
enum Version {
    // HTTP/1.x
    H1,
    // HTTP/2
    H2,
}
285
286impl Version {
287    #[must_use]
288    #[cfg(any(not(feature = "http2"), not(feature = "http1")))]
289    pub fn unsupported(self) -> Error {
290        match self {
291            Version::H1 => Error::from("HTTP/1 is not supported"),
292            Version::H2 => Error::from("HTTP/2 is not supported"),
293        }
294    }
295}
296
297fn read_version<I>(io: I) -> ReadVersion<I>
298where
299    I: Read + Unpin,
300{
301    ReadVersion {
302        io: Some(io),
303        buf: [MaybeUninit::uninit(); 24],
304        filled: 0,
305        version: Version::H2,
306        cancelled: false,
307        _pin: PhantomPinned,
308    }
309}
310
311pin_project! {
312    struct ReadVersion<I> {
313        io: Option<I>,
314        buf: [MaybeUninit<u8>; 24],
315        // the amount of `buf` thats been filled
316        filled: usize,
317        version: Version,
318        cancelled: bool,
319        // Make this future `!Unpin` for compatibility with async trait methods.
320        #[pin]
321        _pin: PhantomPinned,
322    }
323}
324
325impl<I> ReadVersion<I> {
326    pub fn cancel(self: Pin<&mut Self>) {
327        *self.project().cancelled = true;
328    }
329}
330
331impl<I> Future for ReadVersion<I>
332where
333    I: Read + Unpin,
334{
335    type Output = io::Result<(Version, Rewind<I>)>;
336
337    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
338        let this = self.project();
339        if *this.cancelled {
340            return Poll::Ready(Err(io::Error::new(io::ErrorKind::Interrupted, "Cancelled")));
341        }
342
343        let mut buf = ReadBuf::uninit(&mut *this.buf);
344        // SAFETY: `this.filled` tracks how many bytes have been read (and thus initialized) and
345        // we're only advancing by that many.
346        unsafe {
347            buf.unfilled().advance(*this.filled);
348        };
349
350        // We start as H2 and switch to H1 as soon as we don't have the preface.
351        while buf.filled().len() < H2_PREFACE.len() {
352            let len = buf.filled().len();
353            ready!(Pin::new(this.io.as_mut().unwrap()).poll_read(cx, buf.unfilled()))?;
354            *this.filled = buf.filled().len();
355
356            // We starts as H2 and switch to H1 when we don't get the preface.
357            if buf.filled().len() == len
358                || buf.filled()[len..] != H2_PREFACE[len..buf.filled().len()]
359            {
360                *this.version = Version::H1;
361                break;
362            }
363        }
364
365        let io = this.io.take().unwrap();
366        let buf = buf.filled().to_vec();
367        Poll::Ready(Ok((
368            *this.version,
369            Rewind::new_buffered(io, Bytes::from(buf)),
370        )))
371    }
372}
373
374pin_project! {
375    /// A [`Future`](core::future::Future) representing an HTTP/1 connection, returned from
376    /// [`Builder::serve_connection`](struct.Builder.html#method.serve_connection).
377    ///
378    /// To drive HTTP on this connection this future **must be polled**, typically with
379    /// `.await`. If it isn't polled, no progress will be made on this connection.
380    #[must_use = "futures do nothing unless polled"]
381    pub struct Connection<'a, I, S, E>
382    where
383        S: HttpService<Incoming>,
384    {
385        #[pin]
386        state: ConnState<'a, I, S, E>,
387    }
388}
389
// A custom copy-on-write holder: the one in libstd requires `ToOwned` bounds
// that are too eager for our purposes.
enum Cow<'a, T> {
    Borrowed(&'a T),
    Owned(T),
}

impl<T> std::ops::Deref for Cow<'_, T> {
    type Target = T;

    // Hands out a shared reference to the value regardless of who owns it.
    fn deref(&self) -> &T {
        match *self {
            Cow::Borrowed(shared) => shared,
            Cow::Owned(ref owned) => owned,
        }
    }
}
405
// hyper's HTTP/1 connection over a rewindable transport.
#[cfg(feature = "http1")]
type Http1Connection<I, S> = http1::Connection<Rewind<I>, S>;

// Placeholder so the `H1` variant still type-checks when `http1` is disabled.
#[cfg(not(feature = "http1"))]
type Http1Connection<I, S> = (PhantomData<I>, PhantomData<S>);

// hyper's HTTP/2 connection over a rewindable transport.
#[cfg(feature = "http2")]
type Http2Connection<I, S, E> = http2::Connection<Rewind<I>, S, E>;

// Placeholder so the `H2` variant still type-checks when `http2` is disabled.
#[cfg(not(feature = "http2"))]
type Http2Connection<I, S, E> = (PhantomData<I>, PhantomData<S>, PhantomData<E>);
417
418pin_project! {
419    #[project = ConnStateProj]
420    enum ConnState<'a, I, S, E>
421    where
422        S: HttpService<Incoming>,
423    {
424        ReadVersion {
425            #[pin]
426            read_version: ReadVersion<I>,
427            builder: Cow<'a, Builder<E>>,
428            service: Option<S>,
429        },
430        H1 {
431            #[pin]
432            conn: Http1Connection<I, S>,
433        },
434        H2 {
435            #[pin]
436            conn: Http2Connection<I, S, E>,
437        },
438    }
439}
440
441impl<I, S, E, B> Connection<'_, I, S, E>
442where
443    S: HttpService<Incoming, ResBody = B>,
444    S::Error: Into<Box<dyn StdError + Send + Sync>>,
445    I: Read + Write + Unpin,
446    B: Body + 'static,
447    B::Error: Into<Box<dyn StdError + Send + Sync>>,
448    E: HttpServerConnExec<S::Future, B>,
449{
450    /// Start a graceful shutdown process for this connection.
451    ///
452    /// This `Connection` should continue to be polled until shutdown can finish.
453    ///
454    /// # Note
455    ///
456    /// This should only be called while the `Connection` future is still pending. If called after
457    /// `Connection::poll` has resolved, this does nothing.
458    pub fn graceful_shutdown(self: Pin<&mut Self>) {
459        match self.project().state.project() {
460            ConnStateProj::ReadVersion { read_version, .. } => read_version.cancel(),
461            #[cfg(feature = "http1")]
462            ConnStateProj::H1 { conn } => conn.graceful_shutdown(),
463            #[cfg(feature = "http2")]
464            ConnStateProj::H2 { conn } => conn.graceful_shutdown(),
465            #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
466            _ => unreachable!(),
467        }
468    }
469
470    /// Make this Connection static, instead of borrowing from Builder.
471    pub fn into_owned(self) -> Connection<'static, I, S, E>
472    where
473        Builder<E>: Clone,
474    {
475        Connection {
476            state: match self.state {
477                ConnState::ReadVersion {
478                    read_version,
479                    builder,
480                    service,
481                } => ConnState::ReadVersion {
482                    read_version,
483                    service,
484                    builder: Cow::Owned(builder.clone()),
485                },
486                #[cfg(feature = "http1")]
487                ConnState::H1 { conn } => ConnState::H1 { conn },
488                #[cfg(feature = "http2")]
489                ConnState::H2 { conn } => ConnState::H2 { conn },
490                #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
491                _ => unreachable!(),
492            },
493        }
494    }
495}
496
497impl<I, S, E, B> Future for Connection<'_, I, S, E>
498where
499    S: Service<Request<Incoming>, Response = Response<B>>,
500    S::Future: 'static,
501    S::Error: Into<Box<dyn StdError + Send + Sync>>,
502    B: Body + 'static,
503    B::Error: Into<Box<dyn StdError + Send + Sync>>,
504    I: Read + Write + Unpin + 'static,
505    E: HttpServerConnExec<S::Future, B>,
506{
507    type Output = Result<()>;
508
509    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
510        loop {
511            let mut this = self.as_mut().project();
512
513            match this.state.as_mut().project() {
514                ConnStateProj::ReadVersion {
515                    read_version,
516                    builder,
517                    service,
518                } => {
519                    let (version, io) = ready!(read_version.poll(cx))?;
520                    let service = service.take().unwrap();
521                    match version {
522                        #[cfg(feature = "http1")]
523                        Version::H1 => {
524                            let conn = builder.http1.serve_connection(io, service);
525                            this.state.set(ConnState::H1 { conn });
526                        }
527                        #[cfg(feature = "http2")]
528                        Version::H2 => {
529                            let conn = builder.http2.serve_connection(io, service);
530                            this.state.set(ConnState::H2 { conn });
531                        }
532                        #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
533                        _ => return Poll::Ready(Err(version.unsupported())),
534                    }
535                }
536                #[cfg(feature = "http1")]
537                ConnStateProj::H1 { conn } => {
538                    return conn.poll(cx).map_err(Into::into);
539                }
540                #[cfg(feature = "http2")]
541                ConnStateProj::H2 { conn } => {
542                    return conn.poll(cx).map_err(Into::into);
543                }
544                #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
545                _ => unreachable!(),
546            }
547        }
548    }
549}
550
551pin_project! {
552    /// An upgradable [`Connection`], returned by
553    /// [`Builder::serve_upgradable_connection`](struct.Builder.html#method.serve_connection_with_upgrades).
554    ///
555    /// To drive HTTP on this connection this future **must be polled**, typically with
556    /// `.await`. If it isn't polled, no progress will be made on this connection.
557    #[must_use = "futures do nothing unless polled"]
558    pub struct UpgradeableConnection<'a, I, S, E>
559    where
560        S: HttpService<Incoming>,
561    {
562        #[pin]
563        state: UpgradeableConnState<'a, I, S, E>,
564    }
565}
566
// hyper's upgrade-capable HTTP/1 connection.
#[cfg(feature = "http1")]
type Http1UpgradeableConnection<I, S> = http1::UpgradeableConnection<I, S>;

// Placeholder so the `H1` variant still type-checks when `http1` is disabled.
#[cfg(not(feature = "http1"))]
type Http1UpgradeableConnection<I, S> = (PhantomData<I>, PhantomData<S>);
572
573pin_project! {
574    #[project = UpgradeableConnStateProj]
575    enum UpgradeableConnState<'a, I, S, E>
576    where
577        S: HttpService<Incoming>,
578    {
579        ReadVersion {
580            #[pin]
581            read_version: ReadVersion<I>,
582            builder: Cow<'a, Builder<E>>,
583            service: Option<S>,
584        },
585        H1 {
586            #[pin]
587            conn: Http1UpgradeableConnection<Rewind<I>, S>,
588        },
589        H2 {
590            #[pin]
591            conn: Http2Connection<I, S, E>,
592        },
593    }
594}
595
596impl<I, S, E, B> UpgradeableConnection<'_, I, S, E>
597where
598    S: HttpService<Incoming, ResBody = B>,
599    S::Error: Into<Box<dyn StdError + Send + Sync>>,
600    I: Read + Write + Unpin,
601    B: Body + 'static,
602    B::Error: Into<Box<dyn StdError + Send + Sync>>,
603    E: HttpServerConnExec<S::Future, B>,
604{
605    /// Start a graceful shutdown process for this connection.
606    ///
607    /// This `UpgradeableConnection` should continue to be polled until shutdown can finish.
608    ///
609    /// # Note
610    ///
611    /// This should only be called while the `Connection` future is still nothing. pending. If
612    /// called after `UpgradeableConnection::poll` has resolved, this does nothing.
613    pub fn graceful_shutdown(self: Pin<&mut Self>) {
614        match self.project().state.project() {
615            UpgradeableConnStateProj::ReadVersion { read_version, .. } => read_version.cancel(),
616            #[cfg(feature = "http1")]
617            UpgradeableConnStateProj::H1 { conn } => conn.graceful_shutdown(),
618            #[cfg(feature = "http2")]
619            UpgradeableConnStateProj::H2 { conn } => conn.graceful_shutdown(),
620            #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
621            _ => unreachable!(),
622        }
623    }
624
625    /// Make this Connection static, instead of borrowing from Builder.
626    pub fn into_owned(self) -> UpgradeableConnection<'static, I, S, E>
627    where
628        Builder<E>: Clone,
629    {
630        UpgradeableConnection {
631            state: match self.state {
632                UpgradeableConnState::ReadVersion {
633                    read_version,
634                    builder,
635                    service,
636                } => UpgradeableConnState::ReadVersion {
637                    read_version,
638                    service,
639                    builder: Cow::Owned(builder.clone()),
640                },
641                #[cfg(feature = "http1")]
642                UpgradeableConnState::H1 { conn } => UpgradeableConnState::H1 { conn },
643                #[cfg(feature = "http2")]
644                UpgradeableConnState::H2 { conn } => UpgradeableConnState::H2 { conn },
645                #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
646                _ => unreachable!(),
647            },
648        }
649    }
650}
651
652impl<I, S, E, B> Future for UpgradeableConnection<'_, I, S, E>
653where
654    S: Service<Request<Incoming>, Response = Response<B>>,
655    S::Future: 'static,
656    S::Error: Into<Box<dyn StdError + Send + Sync>>,
657    B: Body + 'static,
658    B::Error: Into<Box<dyn StdError + Send + Sync>>,
659    I: Read + Write + Unpin + Send + 'static,
660    E: HttpServerConnExec<S::Future, B>,
661{
662    type Output = Result<()>;
663
664    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
665        loop {
666            let mut this = self.as_mut().project();
667
668            match this.state.as_mut().project() {
669                UpgradeableConnStateProj::ReadVersion {
670                    read_version,
671                    builder,
672                    service,
673                } => {
674                    let (version, io) = ready!(read_version.poll(cx))?;
675                    let service = service.take().unwrap();
676                    match version {
677                        #[cfg(feature = "http1")]
678                        Version::H1 => {
679                            let conn = builder.http1.serve_connection(io, service).with_upgrades();
680                            this.state.set(UpgradeableConnState::H1 { conn });
681                        }
682                        #[cfg(feature = "http2")]
683                        Version::H2 => {
684                            let conn = builder.http2.serve_connection(io, service);
685                            this.state.set(UpgradeableConnState::H2 { conn });
686                        }
687                        #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
688                        _ => return Poll::Ready(Err(version.unsupported())),
689                    }
690                }
691                #[cfg(feature = "http1")]
692                UpgradeableConnStateProj::H1 { conn } => {
693                    return conn.poll(cx).map_err(Into::into);
694                }
695                #[cfg(feature = "http2")]
696                UpgradeableConnStateProj::H2 { conn } => {
697                    return conn.poll(cx).map_err(Into::into);
698                }
699                #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
700                _ => unreachable!(),
701            }
702        }
703    }
704}
705
/// Http1 part of builder.
#[cfg(feature = "http1")]
pub struct Http1Builder<'a, E> {
    // The builder being configured; every setter writes through to its `http1` settings.
    inner: &'a mut Builder<E>,
}
711
#[cfg(feature = "http1")]
impl<E> Http1Builder<'_, E> {
    /// Http2 configuration.
    #[cfg(feature = "http2")]
    pub fn http2(&mut self) -> Http2Builder<'_, E> {
        Http2Builder { inner: self.inner }
    }

    /// Set whether the `date` header should be included in HTTP responses.
    ///
    /// Note that including the `date` header is recommended by RFC 7231.
    ///
    /// Default is true.
    pub fn auto_date_header(&mut self, enabled: bool) -> &mut Self {
        self.inner.http1.auto_date_header(enabled);
        self
    }

    /// Set whether HTTP/1 connections should support half-closures.
    ///
    /// Clients can choose to shut down their write-side while waiting
    /// for the server to respond. Setting this to `true` will
    /// prevent closing the connection immediately if `read`
    /// detects an EOF in the middle of a request.
    ///
    /// Default is `false`.
    pub fn half_close(&mut self, val: bool) -> &mut Self {
        self.inner.http1.half_close(val);
        self
    }

    /// Enables or disables HTTP/1 keep-alive.
    ///
    /// Default is true.
    pub fn keep_alive(&mut self, val: bool) -> &mut Self {
        self.inner.http1.keep_alive(val);
        self
    }

    /// Set whether HTTP/1 connections will write header names as title case at
    /// the socket level.
    ///
    /// Note that this setting does not affect HTTP/2.
    ///
    /// Default is false.
    pub fn title_case_headers(&mut self, enabled: bool) -> &mut Self {
        self.inner.http1.title_case_headers(enabled);
        self
    }

    /// Set whether HTTP/1 connections will silently ignore malformed header lines.
    ///
    /// If this is enabled and a header line does not start with a valid header
    /// name, or does not include a colon at all, the line will be silently ignored
    /// and no error will be reported.
    ///
    /// Default is false.
    pub fn ignore_invalid_headers(&mut self, enabled: bool) -> &mut Self {
        self.inner.http1.ignore_invalid_headers(enabled);
        self
    }

    /// Set whether to support preserving original header cases.
    ///
    /// Currently, this will record the original cases received, and store them
    /// in a private extension on the `Request`. It will also look for and use
    /// such an extension in any provided `Response`.
    ///
    /// Since the relevant extension is still private, there is no way to
    /// interact with the original cases. The only effect this can have now is
    /// to forward the cases in a proxy-like fashion.
    ///
    /// Note that this setting does not affect HTTP/2.
    ///
    /// Default is false.
    pub fn preserve_header_case(&mut self, enabled: bool) -> &mut Self {
        self.inner.http1.preserve_header_case(enabled);
        self
    }

    /// Set the maximum number of headers.
    ///
    /// When a request is received, the parser will reserve a buffer to store headers for optimal
    /// performance.
    ///
    /// If the server receives more headers than the buffer size, it responds to the client with
    /// "431 Request Header Fields Too Large".
    ///
    /// The headers are allocated on the stack by default, which has higher performance. After
    /// setting this value, headers will be allocated in heap memory, that is, heap memory
    /// allocation will occur for each request, and there will be a performance drop of about 5%.
    ///
    /// Note that this setting does not affect HTTP/2.
    ///
    /// Default is 100.
    pub fn max_headers(&mut self, val: usize) -> &mut Self {
        self.inner.http1.max_headers(val);
        self
    }

    /// Set a timeout for reading client request headers. If a client does not
    /// transmit the entire header within this time, the connection is closed.
    ///
    /// Requires a [`Timer`] set by [`Http1Builder::timer`] to take effect. Panics if
    /// `header_read_timeout` is configured without a [`Timer`].
    ///
    /// Pass `None` to disable.
    ///
    /// Default is currently 30 seconds, but do not depend on that.
    pub fn header_read_timeout(&mut self, read_timeout: impl Into<Option<Duration>>) -> &mut Self {
        self.inner.http1.header_read_timeout(read_timeout);
        self
    }

    /// Set whether HTTP/1 connections should try to use vectored writes,
    /// or always flatten into a single buffer.
    ///
    /// Note that setting this to false may mean more copies of body data,
    /// but may also improve performance when an IO transport doesn't
    /// support vectored writes well, such as most TLS implementations.
    ///
    /// Setting this to true will force hyper to use the queued strategy,
    /// which may eliminate unnecessary cloning on some TLS backends.
    ///
    /// Default is `auto`. In this mode hyper will try to guess which
    /// mode to use.
    pub fn writev(&mut self, val: bool) -> &mut Self {
        self.inner.http1.writev(val);
        self
    }

    /// Set the maximum buffer size for the connection.
    ///
    /// Default is ~400kb.
    ///
    /// # Panics
    ///
    /// The minimum value allowed is 8192. This method panics if the passed `max` is less than the minimum.
    pub fn max_buf_size(&mut self, max: usize) -> &mut Self {
        self.inner.http1.max_buf_size(max);
        self
    }

    /// Aggregates flushes to better support pipelined responses.
    ///
    /// Experimental, may have bugs.
    ///
    /// Default is false.
    pub fn pipeline_flush(&mut self, enabled: bool) -> &mut Self {
        self.inner.http1.pipeline_flush(enabled);
        self
    }

    /// Set the timer used in background tasks.
    pub fn timer<M>(&mut self, timer: M) -> &mut Self
    where
        M: Timer + Send + Sync + 'static,
    {
        self.inner.http1.timer(timer);
        self
    }

    /// Bind a connection together with a [`Service`].
    ///
    /// Delegates to [`Builder::serve_connection`] on the underlying builder and
    /// awaits the resulting connection to completion.
    #[cfg(feature = "http2")]
    pub async fn serve_connection<I, S, B>(&self, io: I, service: S) -> Result<()>
    where
        S: Service<Request<Incoming>, Response = Response<B>>,
        S::Future: 'static,
        S::Error: Into<Box<dyn StdError + Send + Sync>>,
        B: Body + 'static,
        B::Error: Into<Box<dyn StdError + Send + Sync>>,
        I: Read + Write + Unpin + 'static,
        E: HttpServerConnExec<S::Future, B>,
    {
        self.inner.serve_connection(io, service).await
    }

    /// Bind a connection together with a [`Service`].
    ///
    /// Delegates to [`Builder::serve_connection`] on the underlying builder and
    /// awaits the resulting connection to completion. This variant is compiled
    /// when the `http2` feature is disabled and places no bound on the executor.
    #[cfg(not(feature = "http2"))]
    pub async fn serve_connection<I, S, B>(&self, io: I, service: S) -> Result<()>
    where
        S: Service<Request<Incoming>, Response = Response<B>>,
        S::Future: 'static,
        S::Error: Into<Box<dyn StdError + Send + Sync>>,
        B: Body + 'static,
        B::Error: Into<Box<dyn StdError + Send + Sync>>,
        I: Read + Write + Unpin + 'static,
    {
        self.inner.serve_connection(io, service).await
    }

    /// Bind a connection together with a [`Service`], with the ability to
    /// handle HTTP upgrades. This requires that the IO object implements
    /// `Send`.
    #[cfg(feature = "http2")]
    pub fn serve_connection_with_upgrades<I, S, B>(
        &self,
        io: I,
        service: S,
    ) -> UpgradeableConnection<'_, I, S, E>
    where
        S: Service<Request<Incoming>, Response = Response<B>>,
        S::Future: 'static,
        S::Error: Into<Box<dyn StdError + Send + Sync>>,
        B: Body + 'static,
        B::Error: Into<Box<dyn StdError + Send + Sync>>,
        I: Read + Write + Unpin + Send + 'static,
        E: HttpServerConnExec<S::Future, B>,
    {
        self.inner.serve_connection_with_upgrades(io, service)
    }
}
924
/// Configuration handle for the HTTP/2 half of a [`Builder`].
///
/// It holds a mutable borrow of the parent builder, so each setting applied
/// through this type is written directly into that builder.
#[cfg(feature = "http2")]
pub struct Http2Builder<'a, E> {
    inner: &'a mut Builder<E>,
}
930
931#[cfg(feature = "http2")]
932impl<E> Http2Builder<'_, E> {
933    #[cfg(feature = "http1")]
934    /// Http1 configuration.
935    pub fn http1(&mut self) -> Http1Builder<'_, E> {
936        Http1Builder { inner: self.inner }
937    }
938
939    /// Configures the maximum number of pending reset streams allowed before a GOAWAY will be sent.
940    ///
941    /// This will default to the default value set by the [`h2` crate](https://crates.io/crates/h2).
942    /// As of v0.4.0, it is 20.
943    ///
944    /// See <https://github.com/hyperium/hyper/issues/2877> for more information.
945    pub fn max_pending_accept_reset_streams(&mut self, max: impl Into<Option<usize>>) -> &mut Self {
946        self.inner.http2.max_pending_accept_reset_streams(max);
947        self
948    }
949
950    /// Configures the maximum number of local reset streams allowed before a GOAWAY will be sent.
951    ///
952    /// If not set, hyper will use a default, currently of 1024.
953    ///
954    /// If `None` is supplied, hyper will not apply any limit.
955    /// This is not advised, as it can potentially expose servers to DOS vulnerabilities.
956    ///
957    /// See <https://rustsec.org/advisories/RUSTSEC-2024-0003.html> for more information.
958    pub fn max_local_error_reset_streams(&mut self, max: impl Into<Option<usize>>) -> &mut Self {
959        self.inner.http2.max_local_error_reset_streams(max);
960        self
961    }
962
963    /// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2
964    /// stream-level flow control.
965    ///
966    /// Passing `None` will do nothing.
967    ///
968    /// If not set, hyper will use a default.
969    ///
970    /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE
971    pub fn initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
972        self.inner.http2.initial_stream_window_size(sz);
973        self
974    }
975
976    /// Sets the max connection-level flow control for HTTP2.
977    ///
978    /// Passing `None` will do nothing.
979    ///
980    /// If not set, hyper will use a default.
981    pub fn initial_connection_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
982        self.inner.http2.initial_connection_window_size(sz);
983        self
984    }
985
986    /// Sets whether to use an adaptive flow control.
987    ///
988    /// Enabling this will override the limits set in
989    /// `http2_initial_stream_window_size` and
990    /// `http2_initial_connection_window_size`.
991    pub fn adaptive_window(&mut self, enabled: bool) -> &mut Self {
992        self.inner.http2.adaptive_window(enabled);
993        self
994    }
995
996    /// Sets the maximum frame size to use for HTTP2.
997    ///
998    /// Passing `None` will do nothing.
999    ///
1000    /// If not set, hyper will use a default.
1001    pub fn max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
1002        self.inner.http2.max_frame_size(sz);
1003        self
1004    }
1005
1006    /// Sets the [`SETTINGS_MAX_CONCURRENT_STREAMS`][spec] option for HTTP2
1007    /// connections.
1008    ///
1009    /// Default is 200. Passing `None` will remove any limit.
1010    ///
1011    /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_MAX_CONCURRENT_STREAMS
1012    pub fn max_concurrent_streams(&mut self, max: impl Into<Option<u32>>) -> &mut Self {
1013        self.inner.http2.max_concurrent_streams(max);
1014        self
1015    }
1016
1017    /// Sets an interval for HTTP2 Ping frames should be sent to keep a
1018    /// connection alive.
1019    ///
1020    /// Pass `None` to disable HTTP2 keep-alive.
1021    ///
1022    /// Default is currently disabled.
1023    ///
1024    /// # Cargo Feature
1025    ///
1026    pub fn keep_alive_interval(&mut self, interval: impl Into<Option<Duration>>) -> &mut Self {
1027        self.inner.http2.keep_alive_interval(interval);
1028        self
1029    }
1030
1031    /// Sets a timeout for receiving an acknowledgement of the keep-alive ping.
1032    ///
1033    /// If the ping is not acknowledged within the timeout, the connection will
1034    /// be closed. Does nothing if `http2_keep_alive_interval` is disabled.
1035    ///
1036    /// Default is 20 seconds.
1037    ///
1038    /// # Cargo Feature
1039    ///
1040    pub fn keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self {
1041        self.inner.http2.keep_alive_timeout(timeout);
1042        self
1043    }
1044
1045    /// Set the maximum write buffer size for each HTTP/2 stream.
1046    ///
1047    /// Default is currently ~400KB, but may change.
1048    ///
1049    /// # Panics
1050    ///
1051    /// The value must be no larger than `u32::MAX`.
1052    pub fn max_send_buf_size(&mut self, max: usize) -> &mut Self {
1053        self.inner.http2.max_send_buf_size(max);
1054        self
1055    }
1056
1057    /// Enables the [extended CONNECT protocol].
1058    ///
1059    /// [extended CONNECT protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
1060    pub fn enable_connect_protocol(&mut self) -> &mut Self {
1061        self.inner.http2.enable_connect_protocol();
1062        self
1063    }
1064
1065    /// Sets the max size of received header frames.
1066    ///
1067    /// Default is currently ~16MB, but may change.
1068    pub fn max_header_list_size(&mut self, max: u32) -> &mut Self {
1069        self.inner.http2.max_header_list_size(max);
1070        self
1071    }
1072
1073    /// Set the timer used in background tasks.
1074    pub fn timer<M>(&mut self, timer: M) -> &mut Self
1075    where
1076        M: Timer + Send + Sync + 'static,
1077    {
1078        self.inner.http2.timer(timer);
1079        self
1080    }
1081
1082    /// Set whether the `date` header should be included in HTTP responses.
1083    ///
1084    /// Note that including the `date` header is recommended by RFC 7231.
1085    ///
1086    /// Default is true.
1087    pub fn auto_date_header(&mut self, enabled: bool) -> &mut Self {
1088        self.inner.http2.auto_date_header(enabled);
1089        self
1090    }
1091
1092    /// Bind a connection together with a [`Service`].
1093    pub async fn serve_connection<I, S, B>(&self, io: I, service: S) -> Result<()>
1094    where
1095        S: Service<Request<Incoming>, Response = Response<B>>,
1096        S::Future: 'static,
1097        S::Error: Into<Box<dyn StdError + Send + Sync>>,
1098        B: Body + 'static,
1099        B::Error: Into<Box<dyn StdError + Send + Sync>>,
1100        I: Read + Write + Unpin + 'static,
1101        E: HttpServerConnExec<S::Future, B>,
1102    {
1103        self.inner.serve_connection(io, service).await
1104    }
1105
1106    /// Bind a connection together with a [`Service`], with the ability to
1107    /// handle HTTP upgrades. This requires that the IO object implements
1108    /// `Send`.
1109    pub fn serve_connection_with_upgrades<I, S, B>(
1110        &self,
1111        io: I,
1112        service: S,
1113    ) -> UpgradeableConnection<'_, I, S, E>
1114    where
1115        S: Service<Request<Incoming>, Response = Response<B>>,
1116        S::Future: 'static,
1117        S::Error: Into<Box<dyn StdError + Send + Sync>>,
1118        B: Body + 'static,
1119        B::Error: Into<Box<dyn StdError + Send + Sync>>,
1120        I: Read + Write + Unpin + Send + 'static,
1121        E: HttpServerConnExec<S::Future, B>,
1122    {
1123        self.inner.serve_connection_with_upgrades(io, service)
1124    }
1125}
1126
1127#[cfg(test)]
1128mod tests {
1129    use crate::{
1130        rt::{TokioExecutor, TokioIo},
1131        server::conn::auto,
1132    };
1133    use http::{Request, Response};
1134    use http_body::Body;
1135    use http_body_util::{BodyExt, Empty, Full};
1136    use hyper::{body, body::Bytes, client, service::service_fn};
1137    use std::{convert::Infallible, error::Error as StdError, net::SocketAddr, time::Duration};
1138    use tokio::{
1139        net::{TcpListener, TcpStream},
1140        pin,
1141    };
1142
1143    const BODY: &[u8] = b"Hello, world!";
1144
1145    #[test]
1146    fn configuration() {
1147        // One liner.
1148        auto::Builder::new(TokioExecutor::new())
1149            .http1()
1150            .keep_alive(true)
1151            .http2()
1152            .keep_alive_interval(None);
1153        //  .serve_connection(io, service);
1154
1155        // Using variable.
1156        let mut builder = auto::Builder::new(TokioExecutor::new());
1157
1158        builder.http1().keep_alive(true);
1159        builder.http2().keep_alive_interval(None);
1160        // builder.serve_connection(io, service);
1161    }
1162
1163    #[test]
1164    #[cfg(feature = "http1")]
1165    fn title_case_headers_configuration() {
1166        // Test title_case_headers can be set on the main builder
1167        auto::Builder::new(TokioExecutor::new()).title_case_headers(true);
1168
1169        // Can be combined with other configuration
1170        auto::Builder::new(TokioExecutor::new())
1171            .title_case_headers(true)
1172            .http1_only();
1173    }
1174
1175    #[test]
1176    #[cfg(feature = "http1")]
1177    fn preserve_header_case_configuration() {
1178        // Test preserve_header_case can be set on the main builder
1179        auto::Builder::new(TokioExecutor::new()).preserve_header_case(true);
1180
1181        // Can be combined with other configuration
1182        auto::Builder::new(TokioExecutor::new())
1183            .preserve_header_case(true)
1184            .http1_only();
1185    }
1186
1187    #[cfg(not(miri))]
1188    #[tokio::test]
1189    async fn http1() {
1190        let addr = start_server(false, false).await;
1191        let mut sender = connect_h1(addr).await;
1192
1193        let response = sender
1194            .send_request(Request::new(Empty::<Bytes>::new()))
1195            .await
1196            .unwrap();
1197
1198        let body = response.into_body().collect().await.unwrap().to_bytes();
1199
1200        assert_eq!(body, BODY);
1201    }
1202
1203    #[cfg(not(miri))]
1204    #[tokio::test]
1205    async fn http2() {
1206        let addr = start_server(false, false).await;
1207        let mut sender = connect_h2(addr).await;
1208
1209        let response = sender
1210            .send_request(Request::new(Empty::<Bytes>::new()))
1211            .await
1212            .unwrap();
1213
1214        let body = response.into_body().collect().await.unwrap().to_bytes();
1215
1216        assert_eq!(body, BODY);
1217    }
1218
1219    #[cfg(not(miri))]
1220    #[tokio::test]
1221    async fn http2_only() {
1222        let addr = start_server(false, true).await;
1223        let mut sender = connect_h2(addr).await;
1224
1225        let response = sender
1226            .send_request(Request::new(Empty::<Bytes>::new()))
1227            .await
1228            .unwrap();
1229
1230        let body = response.into_body().collect().await.unwrap().to_bytes();
1231
1232        assert_eq!(body, BODY);
1233    }
1234
1235    #[cfg(not(miri))]
1236    #[tokio::test]
1237    async fn http2_only_fail_if_client_is_http1() {
1238        let addr = start_server(false, true).await;
1239        let mut sender = connect_h1(addr).await;
1240
1241        let _ = sender
1242            .send_request(Request::new(Empty::<Bytes>::new()))
1243            .await
1244            .expect_err("should fail");
1245    }
1246
1247    #[cfg(not(miri))]
1248    #[tokio::test]
1249    async fn http1_only() {
1250        let addr = start_server(true, false).await;
1251        let mut sender = connect_h1(addr).await;
1252
1253        let response = sender
1254            .send_request(Request::new(Empty::<Bytes>::new()))
1255            .await
1256            .unwrap();
1257
1258        let body = response.into_body().collect().await.unwrap().to_bytes();
1259
1260        assert_eq!(body, BODY);
1261    }
1262
1263    #[cfg(not(miri))]
1264    #[tokio::test]
1265    async fn http1_only_fail_if_client_is_http2() {
1266        let addr = start_server(true, false).await;
1267        let mut sender = connect_h2(addr).await;
1268
1269        let _ = sender
1270            .send_request(Request::new(Empty::<Bytes>::new()))
1271            .await
1272            .expect_err("should fail");
1273    }
1274
1275    #[cfg(not(miri))]
1276    #[tokio::test]
1277    async fn graceful_shutdown() {
1278        let listener = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0)))
1279            .await
1280            .unwrap();
1281
1282        let listener_addr = listener.local_addr().unwrap();
1283
1284        // Spawn the task in background so that we can connect there
1285        let listen_task = tokio::spawn(async move { listener.accept().await.unwrap() });
1286        // Only connect a stream, do not send headers or anything
1287        let _stream = TcpStream::connect(listener_addr).await.unwrap();
1288
1289        let (stream, _) = listen_task.await.unwrap();
1290        let stream = TokioIo::new(stream);
1291        let builder = auto::Builder::new(TokioExecutor::new());
1292        let connection = builder.serve_connection(stream, service_fn(hello));
1293
1294        pin!(connection);
1295
1296        connection.as_mut().graceful_shutdown();
1297
1298        let connection_error = tokio::time::timeout(Duration::from_millis(200), connection)
1299            .await
1300            .expect("Connection should have finished in a timely manner after graceful shutdown.")
1301            .expect_err("Connection should have been interrupted.");
1302
1303        let connection_error = connection_error
1304            .downcast_ref::<std::io::Error>()
1305            .expect("The error should have been `std::io::Error`.");
1306        assert_eq!(connection_error.kind(), std::io::ErrorKind::Interrupted);
1307    }
1308
1309    async fn connect_h1<B>(addr: SocketAddr) -> client::conn::http1::SendRequest<B>
1310    where
1311        B: Body + Send + 'static,
1312        B::Data: Send,
1313        B::Error: Into<Box<dyn StdError + Send + Sync>>,
1314    {
1315        let stream = TokioIo::new(TcpStream::connect(addr).await.unwrap());
1316        let (sender, connection) = client::conn::http1::handshake(stream).await.unwrap();
1317
1318        tokio::spawn(connection);
1319
1320        sender
1321    }
1322
1323    async fn connect_h2<B>(addr: SocketAddr) -> client::conn::http2::SendRequest<B>
1324    where
1325        B: Body + Unpin + Send + 'static,
1326        B::Data: Send,
1327        B::Error: Into<Box<dyn StdError + Send + Sync>>,
1328    {
1329        let stream = TokioIo::new(TcpStream::connect(addr).await.unwrap());
1330        let (sender, connection) = client::conn::http2::Builder::new(TokioExecutor::new())
1331            .handshake(stream)
1332            .await
1333            .unwrap();
1334
1335        tokio::spawn(connection);
1336
1337        sender
1338    }
1339
1340    async fn start_server(h1_only: bool, h2_only: bool) -> SocketAddr {
1341        let addr: SocketAddr = ([127, 0, 0, 1], 0).into();
1342        let listener = TcpListener::bind(addr).await.unwrap();
1343
1344        let local_addr = listener.local_addr().unwrap();
1345
1346        tokio::spawn(async move {
1347            loop {
1348                let (stream, _) = listener.accept().await.unwrap();
1349                let stream = TokioIo::new(stream);
1350                tokio::task::spawn(async move {
1351                    let mut builder = auto::Builder::new(TokioExecutor::new());
1352                    if h1_only {
1353                        builder = builder.http1_only();
1354                        builder.serve_connection(stream, service_fn(hello)).await
1355                    } else if h2_only {
1356                        builder = builder.http2_only();
1357                        builder.serve_connection(stream, service_fn(hello)).await
1358                    } else {
1359                        builder
1360                            .http2()
1361                            .max_header_list_size(4096)
1362                            .serve_connection_with_upgrades(stream, service_fn(hello))
1363                            .await
1364                    }
1365                    .unwrap();
1366                });
1367            }
1368        });
1369
1370        local_addr
1371    }
1372
1373    async fn hello(_req: Request<body::Incoming>) -> Result<Response<Full<Bytes>>, Infallible> {
1374        Ok(Response::new(Full::new(Bytes::from(BODY))))
1375    }
1376}