1pub mod upgrade;
4
5use hyper::service::HttpService;
6use std::future::Future;
7use std::marker::PhantomPinned;
8use std::mem::MaybeUninit;
9use std::pin::Pin;
10use std::task::{Context, Poll};
11use std::{error::Error as StdError, io, time::Duration};
12
13use bytes::Bytes;
14use futures_core::ready;
15use http::{Request, Response};
16use http_body::Body;
17use hyper::{
18    body::Incoming,
19    rt::{Read, ReadBuf, Timer, Write},
20    service::Service,
21};
22
23#[cfg(feature = "http1")]
24use hyper::server::conn::http1;
25
26#[cfg(feature = "http2")]
27use hyper::{rt::bounds::Http2ServerConnExec, server::conn::http2};
28
29#[cfg(any(not(feature = "http2"), not(feature = "http1")))]
30use std::marker::PhantomData;
31
32use pin_project_lite::pin_project;
33
34use crate::common::rewind::Rewind;
35
36type Error = Box<dyn std::error::Error + Send + Sync>;
37
38type Result<T> = std::result::Result<T, Error>;
39
40const H2_PREFACE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
41
42#[cfg(feature = "http2")]
44pub trait HttpServerConnExec<A, B: Body>: Http2ServerConnExec<A, B> {}
45
46#[cfg(feature = "http2")]
47impl<A, B: Body, T: Http2ServerConnExec<A, B>> HttpServerConnExec<A, B> for T {}
48
49#[cfg(not(feature = "http2"))]
51pub trait HttpServerConnExec<A, B: Body> {}
52
53#[cfg(not(feature = "http2"))]
54impl<A, B: Body, T> HttpServerConnExec<A, B> for T {}
55
56#[derive(Clone, Debug)]
58pub struct Builder<E> {
59    #[cfg(feature = "http1")]
60    http1: http1::Builder,
61    #[cfg(feature = "http2")]
62    http2: http2::Builder<E>,
63    #[cfg(any(feature = "http1", feature = "http2"))]
64    version: Option<Version>,
65    #[cfg(not(feature = "http2"))]
66    _executor: E,
67}
68
69impl<E: Default> Default for Builder<E> {
70    fn default() -> Self {
71        Self::new(E::default())
72    }
73}
74
75impl<E> Builder<E> {
76    pub fn new(executor: E) -> Self {
92        Self {
93            #[cfg(feature = "http1")]
94            http1: http1::Builder::new(),
95            #[cfg(feature = "http2")]
96            http2: http2::Builder::new(executor),
97            #[cfg(any(feature = "http1", feature = "http2"))]
98            version: None,
99            #[cfg(not(feature = "http2"))]
100            _executor: executor,
101        }
102    }
103
104    #[cfg(feature = "http1")]
106    pub fn http1(&mut self) -> Http1Builder<'_, E> {
107        Http1Builder { inner: self }
108    }
109
110    #[cfg(feature = "http2")]
112    pub fn http2(&mut self) -> Http2Builder<'_, E> {
113        Http2Builder { inner: self }
114    }
115
116    #[cfg(feature = "http2")]
122    pub fn http2_only(mut self) -> Self {
123        assert!(self.version.is_none());
124        self.version = Some(Version::H2);
125        self
126    }
127
128    #[cfg(feature = "http1")]
134    pub fn http1_only(mut self) -> Self {
135        assert!(self.version.is_none());
136        self.version = Some(Version::H1);
137        self
138    }
139
140    pub fn is_http1_available(&self) -> bool {
142        match self.version {
143            #[cfg(feature = "http1")]
144            Some(Version::H1) => true,
145            #[cfg(feature = "http2")]
146            Some(Version::H2) => false,
147            #[cfg(any(feature = "http1", feature = "http2"))]
148            _ => true,
149        }
150    }
151
152    pub fn is_http2_available(&self) -> bool {
154        match self.version {
155            #[cfg(feature = "http1")]
156            Some(Version::H1) => false,
157            #[cfg(feature = "http2")]
158            Some(Version::H2) => true,
159            #[cfg(any(feature = "http1", feature = "http2"))]
160            _ => true,
161        }
162    }
163
164    pub fn serve_connection<I, S, B>(&self, io: I, service: S) -> Connection<'_, I, S, E>
166    where
167        S: Service<Request<Incoming>, Response = Response<B>>,
168        S::Future: 'static,
169        S::Error: Into<Box<dyn StdError + Send + Sync>>,
170        B: Body + 'static,
171        B::Error: Into<Box<dyn StdError + Send + Sync>>,
172        I: Read + Write + Unpin + 'static,
173        E: HttpServerConnExec<S::Future, B>,
174    {
175        let state = match self.version {
176            #[cfg(feature = "http1")]
177            Some(Version::H1) => {
178                let io = Rewind::new_buffered(io, Bytes::new());
179                let conn = self.http1.serve_connection(io, service);
180                ConnState::H1 { conn }
181            }
182            #[cfg(feature = "http2")]
183            Some(Version::H2) => {
184                let io = Rewind::new_buffered(io, Bytes::new());
185                let conn = self.http2.serve_connection(io, service);
186                ConnState::H2 { conn }
187            }
188            #[cfg(any(feature = "http1", feature = "http2"))]
189            _ => ConnState::ReadVersion {
190                read_version: read_version(io),
191                builder: Cow::Borrowed(self),
192                service: Some(service),
193            },
194        };
195
196        Connection { state }
197    }
198
199    pub fn serve_connection_with_upgrades<I, S, B>(
209        &self,
210        io: I,
211        service: S,
212    ) -> UpgradeableConnection<'_, I, S, E>
213    where
214        S: Service<Request<Incoming>, Response = Response<B>>,
215        S::Future: 'static,
216        S::Error: Into<Box<dyn StdError + Send + Sync>>,
217        B: Body + 'static,
218        B::Error: Into<Box<dyn StdError + Send + Sync>>,
219        I: Read + Write + Unpin + Send + 'static,
220        E: HttpServerConnExec<S::Future, B>,
221    {
222        UpgradeableConnection {
223            state: UpgradeableConnState::ReadVersion {
224                read_version: read_version(io),
225                builder: Cow::Borrowed(self),
226                service: Some(service),
227            },
228        }
229    }
230}
231
232#[derive(Copy, Clone, Debug)]
233enum Version {
234    H1,
235    H2,
236}
237
238impl Version {
239    #[must_use]
240    #[cfg(any(not(feature = "http2"), not(feature = "http1")))]
241    pub fn unsupported(self) -> Error {
242        match self {
243            Version::H1 => Error::from("HTTP/1 is not supported"),
244            Version::H2 => Error::from("HTTP/2 is not supported"),
245        }
246    }
247}
248
249fn read_version<I>(io: I) -> ReadVersion<I>
250where
251    I: Read + Unpin,
252{
253    ReadVersion {
254        io: Some(io),
255        buf: [MaybeUninit::uninit(); 24],
256        filled: 0,
257        version: Version::H2,
258        cancelled: false,
259        _pin: PhantomPinned,
260    }
261}
262
263pin_project! {
264    struct ReadVersion<I> {
265        io: Option<I>,
266        buf: [MaybeUninit<u8>; 24],
267        filled: usize,
269        version: Version,
270        cancelled: bool,
271        #[pin]
273        _pin: PhantomPinned,
274    }
275}
276
277impl<I> ReadVersion<I> {
278    pub fn cancel(self: Pin<&mut Self>) {
279        *self.project().cancelled = true;
280    }
281}
282
283impl<I> Future for ReadVersion<I>
284where
285    I: Read + Unpin,
286{
287    type Output = io::Result<(Version, Rewind<I>)>;
288
289    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
290        let this = self.project();
291        if *this.cancelled {
292            return Poll::Ready(Err(io::Error::new(io::ErrorKind::Interrupted, "Cancelled")));
293        }
294
295        let mut buf = ReadBuf::uninit(&mut *this.buf);
296        unsafe {
299            buf.unfilled().advance(*this.filled);
300        };
301
302        while buf.filled().len() < H2_PREFACE.len() {
304            let len = buf.filled().len();
305            ready!(Pin::new(this.io.as_mut().unwrap()).poll_read(cx, buf.unfilled()))?;
306            *this.filled = buf.filled().len();
307
308            if buf.filled().len() == len
310                || buf.filled()[len..] != H2_PREFACE[len..buf.filled().len()]
311            {
312                *this.version = Version::H1;
313                break;
314            }
315        }
316
317        let io = this.io.take().unwrap();
318        let buf = buf.filled().to_vec();
319        Poll::Ready(Ok((
320            *this.version,
321            Rewind::new_buffered(io, Bytes::from(buf)),
322        )))
323    }
324}
325
326pin_project! {
327    #[must_use = "futures do nothing unless polled"]
333    pub struct Connection<'a, I, S, E>
334    where
335        S: HttpService<Incoming>,
336    {
337        #[pin]
338        state: ConnState<'a, I, S, E>,
339    }
340}
341
342enum Cow<'a, T> {
344    Borrowed(&'a T),
345    Owned(T),
346}
347
348impl<T> std::ops::Deref for Cow<'_, T> {
349    type Target = T;
350    fn deref(&self) -> &T {
351        match self {
352            Cow::Borrowed(t) => &*t,
353            Cow::Owned(ref t) => t,
354        }
355    }
356}
357
358#[cfg(feature = "http1")]
359type Http1Connection<I, S> = hyper::server::conn::http1::Connection<Rewind<I>, S>;
360
361#[cfg(not(feature = "http1"))]
362type Http1Connection<I, S> = (PhantomData<I>, PhantomData<S>);
363
364#[cfg(feature = "http2")]
365type Http2Connection<I, S, E> = hyper::server::conn::http2::Connection<Rewind<I>, S, E>;
366
367#[cfg(not(feature = "http2"))]
368type Http2Connection<I, S, E> = (PhantomData<I>, PhantomData<S>, PhantomData<E>);
369
370pin_project! {
371    #[project = ConnStateProj]
372    enum ConnState<'a, I, S, E>
373    where
374        S: HttpService<Incoming>,
375    {
376        ReadVersion {
377            #[pin]
378            read_version: ReadVersion<I>,
379            builder: Cow<'a, Builder<E>>,
380            service: Option<S>,
381        },
382        H1 {
383            #[pin]
384            conn: Http1Connection<I, S>,
385        },
386        H2 {
387            #[pin]
388            conn: Http2Connection<I, S, E>,
389        },
390    }
391}
392
393impl<I, S, E, B> Connection<'_, I, S, E>
394where
395    S: HttpService<Incoming, ResBody = B>,
396    S::Error: Into<Box<dyn StdError + Send + Sync>>,
397    I: Read + Write + Unpin,
398    B: Body + 'static,
399    B::Error: Into<Box<dyn StdError + Send + Sync>>,
400    E: HttpServerConnExec<S::Future, B>,
401{
402    pub fn graceful_shutdown(self: Pin<&mut Self>) {
411        match self.project().state.project() {
412            ConnStateProj::ReadVersion { read_version, .. } => read_version.cancel(),
413            #[cfg(feature = "http1")]
414            ConnStateProj::H1 { conn } => conn.graceful_shutdown(),
415            #[cfg(feature = "http2")]
416            ConnStateProj::H2 { conn } => conn.graceful_shutdown(),
417            #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
418            _ => unreachable!(),
419        }
420    }
421
422    pub fn into_owned(self) -> Connection<'static, I, S, E>
424    where
425        Builder<E>: Clone,
426    {
427        Connection {
428            state: match self.state {
429                ConnState::ReadVersion {
430                    read_version,
431                    builder,
432                    service,
433                } => ConnState::ReadVersion {
434                    read_version,
435                    service,
436                    builder: Cow::Owned(builder.clone()),
437                },
438                #[cfg(feature = "http1")]
439                ConnState::H1 { conn } => ConnState::H1 { conn },
440                #[cfg(feature = "http2")]
441                ConnState::H2 { conn } => ConnState::H2 { conn },
442                #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
443                _ => unreachable!(),
444            },
445        }
446    }
447}
448
449impl<I, S, E, B> Future for Connection<'_, I, S, E>
450where
451    S: Service<Request<Incoming>, Response = Response<B>>,
452    S::Future: 'static,
453    S::Error: Into<Box<dyn StdError + Send + Sync>>,
454    B: Body + 'static,
455    B::Error: Into<Box<dyn StdError + Send + Sync>>,
456    I: Read + Write + Unpin + 'static,
457    E: HttpServerConnExec<S::Future, B>,
458{
459    type Output = Result<()>;
460
461    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
462        loop {
463            let mut this = self.as_mut().project();
464
465            match this.state.as_mut().project() {
466                ConnStateProj::ReadVersion {
467                    read_version,
468                    builder,
469                    service,
470                } => {
471                    let (version, io) = ready!(read_version.poll(cx))?;
472                    let service = service.take().unwrap();
473                    match version {
474                        #[cfg(feature = "http1")]
475                        Version::H1 => {
476                            let conn = builder.http1.serve_connection(io, service);
477                            this.state.set(ConnState::H1 { conn });
478                        }
479                        #[cfg(feature = "http2")]
480                        Version::H2 => {
481                            let conn = builder.http2.serve_connection(io, service);
482                            this.state.set(ConnState::H2 { conn });
483                        }
484                        #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
485                        _ => return Poll::Ready(Err(version.unsupported())),
486                    }
487                }
488                #[cfg(feature = "http1")]
489                ConnStateProj::H1 { conn } => {
490                    return conn.poll(cx).map_err(Into::into);
491                }
492                #[cfg(feature = "http2")]
493                ConnStateProj::H2 { conn } => {
494                    return conn.poll(cx).map_err(Into::into);
495                }
496                #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
497                _ => unreachable!(),
498            }
499        }
500    }
501}
502
503pin_project! {
504    #[must_use = "futures do nothing unless polled"]
510    pub struct UpgradeableConnection<'a, I, S, E>
511    where
512        S: HttpService<Incoming>,
513    {
514        #[pin]
515        state: UpgradeableConnState<'a, I, S, E>,
516    }
517}
518
519#[cfg(feature = "http1")]
520type Http1UpgradeableConnection<I, S> = hyper::server::conn::http1::UpgradeableConnection<I, S>;
521
522#[cfg(not(feature = "http1"))]
523type Http1UpgradeableConnection<I, S> = (PhantomData<I>, PhantomData<S>);
524
525pin_project! {
526    #[project = UpgradeableConnStateProj]
527    enum UpgradeableConnState<'a, I, S, E>
528    where
529        S: HttpService<Incoming>,
530    {
531        ReadVersion {
532            #[pin]
533            read_version: ReadVersion<I>,
534            builder: Cow<'a, Builder<E>>,
535            service: Option<S>,
536        },
537        H1 {
538            #[pin]
539            conn: Http1UpgradeableConnection<Rewind<I>, S>,
540        },
541        H2 {
542            #[pin]
543            conn: Http2Connection<I, S, E>,
544        },
545    }
546}
547
548impl<I, S, E, B> UpgradeableConnection<'_, I, S, E>
549where
550    S: HttpService<Incoming, ResBody = B>,
551    S::Error: Into<Box<dyn StdError + Send + Sync>>,
552    I: Read + Write + Unpin,
553    B: Body + 'static,
554    B::Error: Into<Box<dyn StdError + Send + Sync>>,
555    E: HttpServerConnExec<S::Future, B>,
556{
557    pub fn graceful_shutdown(self: Pin<&mut Self>) {
566        match self.project().state.project() {
567            UpgradeableConnStateProj::ReadVersion { read_version, .. } => read_version.cancel(),
568            #[cfg(feature = "http1")]
569            UpgradeableConnStateProj::H1 { conn } => conn.graceful_shutdown(),
570            #[cfg(feature = "http2")]
571            UpgradeableConnStateProj::H2 { conn } => conn.graceful_shutdown(),
572            #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
573            _ => unreachable!(),
574        }
575    }
576
577    pub fn into_owned(self) -> UpgradeableConnection<'static, I, S, E>
579    where
580        Builder<E>: Clone,
581    {
582        UpgradeableConnection {
583            state: match self.state {
584                UpgradeableConnState::ReadVersion {
585                    read_version,
586                    builder,
587                    service,
588                } => UpgradeableConnState::ReadVersion {
589                    read_version,
590                    service,
591                    builder: Cow::Owned(builder.clone()),
592                },
593                #[cfg(feature = "http1")]
594                UpgradeableConnState::H1 { conn } => UpgradeableConnState::H1 { conn },
595                #[cfg(feature = "http2")]
596                UpgradeableConnState::H2 { conn } => UpgradeableConnState::H2 { conn },
597                #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
598                _ => unreachable!(),
599            },
600        }
601    }
602}
603
604impl<I, S, E, B> Future for UpgradeableConnection<'_, I, S, E>
605where
606    S: Service<Request<Incoming>, Response = Response<B>>,
607    S::Future: 'static,
608    S::Error: Into<Box<dyn StdError + Send + Sync>>,
609    B: Body + 'static,
610    B::Error: Into<Box<dyn StdError + Send + Sync>>,
611    I: Read + Write + Unpin + Send + 'static,
612    E: HttpServerConnExec<S::Future, B>,
613{
614    type Output = Result<()>;
615
616    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
617        loop {
618            let mut this = self.as_mut().project();
619
620            match this.state.as_mut().project() {
621                UpgradeableConnStateProj::ReadVersion {
622                    read_version,
623                    builder,
624                    service,
625                } => {
626                    let (version, io) = ready!(read_version.poll(cx))?;
627                    let service = service.take().unwrap();
628                    match version {
629                        #[cfg(feature = "http1")]
630                        Version::H1 => {
631                            let conn = builder.http1.serve_connection(io, service).with_upgrades();
632                            this.state.set(UpgradeableConnState::H1 { conn });
633                        }
634                        #[cfg(feature = "http2")]
635                        Version::H2 => {
636                            let conn = builder.http2.serve_connection(io, service);
637                            this.state.set(UpgradeableConnState::H2 { conn });
638                        }
639                        #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
640                        _ => return Poll::Ready(Err(version.unsupported())),
641                    }
642                }
643                #[cfg(feature = "http1")]
644                UpgradeableConnStateProj::H1 { conn } => {
645                    return conn.poll(cx).map_err(Into::into);
646                }
647                #[cfg(feature = "http2")]
648                UpgradeableConnStateProj::H2 { conn } => {
649                    return conn.poll(cx).map_err(Into::into);
650                }
651                #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
652                _ => unreachable!(),
653            }
654        }
655    }
656}
657
658#[cfg(feature = "http1")]
660pub struct Http1Builder<'a, E> {
661    inner: &'a mut Builder<E>,
662}
663
664#[cfg(feature = "http1")]
665impl<E> Http1Builder<'_, E> {
666    #[cfg(feature = "http2")]
668    pub fn http2(&mut self) -> Http2Builder<'_, E> {
669        Http2Builder { inner: self.inner }
670    }
671
672    pub fn auto_date_header(&mut self, enabled: bool) -> &mut Self {
678        self.inner.http1.auto_date_header(enabled);
679        self
680    }
681
682    pub fn half_close(&mut self, val: bool) -> &mut Self {
691        self.inner.http1.half_close(val);
692        self
693    }
694
695    pub fn keep_alive(&mut self, val: bool) -> &mut Self {
699        self.inner.http1.keep_alive(val);
700        self
701    }
702
703    pub fn title_case_headers(&mut self, enabled: bool) -> &mut Self {
710        self.inner.http1.title_case_headers(enabled);
711        self
712    }
713
714    pub fn ignore_invalid_headers(&mut self, enabled: bool) -> &mut Self {
722        self.inner.http1.ignore_invalid_headers(enabled);
723        self
724    }
725
726    pub fn preserve_header_case(&mut self, enabled: bool) -> &mut Self {
740        self.inner.http1.preserve_header_case(enabled);
741        self
742    }
743
744    pub fn max_headers(&mut self, val: usize) -> &mut Self {
760        self.inner.http1.max_headers(val);
761        self
762    }
763
764    pub fn header_read_timeout(&mut self, read_timeout: impl Into<Option<Duration>>) -> &mut Self {
774        self.inner.http1.header_read_timeout(read_timeout);
775        self
776    }
777
778    pub fn writev(&mut self, val: bool) -> &mut Self {
791        self.inner.http1.writev(val);
792        self
793    }
794
795    pub fn max_buf_size(&mut self, max: usize) -> &mut Self {
803        self.inner.http1.max_buf_size(max);
804        self
805    }
806
807    pub fn pipeline_flush(&mut self, enabled: bool) -> &mut Self {
813        self.inner.http1.pipeline_flush(enabled);
814        self
815    }
816
817    pub fn timer<M>(&mut self, timer: M) -> &mut Self
819    where
820        M: Timer + Send + Sync + 'static,
821    {
822        self.inner.http1.timer(timer);
823        self
824    }
825
826    #[cfg(feature = "http2")]
828    pub async fn serve_connection<I, S, B>(&self, io: I, service: S) -> Result<()>
829    where
830        S: Service<Request<Incoming>, Response = Response<B>>,
831        S::Future: 'static,
832        S::Error: Into<Box<dyn StdError + Send + Sync>>,
833        B: Body + 'static,
834        B::Error: Into<Box<dyn StdError + Send + Sync>>,
835        I: Read + Write + Unpin + 'static,
836        E: HttpServerConnExec<S::Future, B>,
837    {
838        self.inner.serve_connection(io, service).await
839    }
840
841    #[cfg(not(feature = "http2"))]
843    pub async fn serve_connection<I, S, B>(&self, io: I, service: S) -> Result<()>
844    where
845        S: Service<Request<Incoming>, Response = Response<B>>,
846        S::Future: 'static,
847        S::Error: Into<Box<dyn StdError + Send + Sync>>,
848        B: Body + 'static,
849        B::Error: Into<Box<dyn StdError + Send + Sync>>,
850        I: Read + Write + Unpin + 'static,
851    {
852        self.inner.serve_connection(io, service).await
853    }
854
855    #[cfg(feature = "http2")]
859    pub fn serve_connection_with_upgrades<I, S, B>(
860        &self,
861        io: I,
862        service: S,
863    ) -> UpgradeableConnection<'_, I, S, E>
864    where
865        S: Service<Request<Incoming>, Response = Response<B>>,
866        S::Future: 'static,
867        S::Error: Into<Box<dyn StdError + Send + Sync>>,
868        B: Body + 'static,
869        B::Error: Into<Box<dyn StdError + Send + Sync>>,
870        I: Read + Write + Unpin + Send + 'static,
871        E: HttpServerConnExec<S::Future, B>,
872    {
873        self.inner.serve_connection_with_upgrades(io, service)
874    }
875}
876
877#[cfg(feature = "http2")]
879pub struct Http2Builder<'a, E> {
880    inner: &'a mut Builder<E>,
881}
882
883#[cfg(feature = "http2")]
884impl<E> Http2Builder<'_, E> {
885    #[cfg(feature = "http1")]
886    pub fn http1(&mut self) -> Http1Builder<'_, E> {
888        Http1Builder { inner: self.inner }
889    }
890
891    pub fn max_pending_accept_reset_streams(&mut self, max: impl Into<Option<usize>>) -> &mut Self {
898        self.inner.http2.max_pending_accept_reset_streams(max);
899        self
900    }
901
902    pub fn max_local_error_reset_streams(&mut self, max: impl Into<Option<usize>>) -> &mut Self {
911        self.inner.http2.max_local_error_reset_streams(max);
912        self
913    }
914
915    pub fn initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
924        self.inner.http2.initial_stream_window_size(sz);
925        self
926    }
927
928    pub fn initial_connection_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
934        self.inner.http2.initial_connection_window_size(sz);
935        self
936    }
937
938    pub fn adaptive_window(&mut self, enabled: bool) -> &mut Self {
944        self.inner.http2.adaptive_window(enabled);
945        self
946    }
947
948    pub fn max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
954        self.inner.http2.max_frame_size(sz);
955        self
956    }
957
958    pub fn max_concurrent_streams(&mut self, max: impl Into<Option<u32>>) -> &mut Self {
965        self.inner.http2.max_concurrent_streams(max);
966        self
967    }
968
969    pub fn keep_alive_interval(&mut self, interval: impl Into<Option<Duration>>) -> &mut Self {
979        self.inner.http2.keep_alive_interval(interval);
980        self
981    }
982
983    pub fn keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self {
993        self.inner.http2.keep_alive_timeout(timeout);
994        self
995    }
996
997    pub fn max_send_buf_size(&mut self, max: usize) -> &mut Self {
1005        self.inner.http2.max_send_buf_size(max);
1006        self
1007    }
1008
1009    pub fn enable_connect_protocol(&mut self) -> &mut Self {
1013        self.inner.http2.enable_connect_protocol();
1014        self
1015    }
1016
1017    pub fn max_header_list_size(&mut self, max: u32) -> &mut Self {
1021        self.inner.http2.max_header_list_size(max);
1022        self
1023    }
1024
1025    pub fn timer<M>(&mut self, timer: M) -> &mut Self
1027    where
1028        M: Timer + Send + Sync + 'static,
1029    {
1030        self.inner.http2.timer(timer);
1031        self
1032    }
1033
1034    pub fn auto_date_header(&mut self, enabled: bool) -> &mut Self {
1040        self.inner.http2.auto_date_header(enabled);
1041        self
1042    }
1043
1044    pub async fn serve_connection<I, S, B>(&self, io: I, service: S) -> Result<()>
1046    where
1047        S: Service<Request<Incoming>, Response = Response<B>>,
1048        S::Future: 'static,
1049        S::Error: Into<Box<dyn StdError + Send + Sync>>,
1050        B: Body + 'static,
1051        B::Error: Into<Box<dyn StdError + Send + Sync>>,
1052        I: Read + Write + Unpin + 'static,
1053        E: HttpServerConnExec<S::Future, B>,
1054    {
1055        self.inner.serve_connection(io, service).await
1056    }
1057
1058    pub fn serve_connection_with_upgrades<I, S, B>(
1062        &self,
1063        io: I,
1064        service: S,
1065    ) -> UpgradeableConnection<'_, I, S, E>
1066    where
1067        S: Service<Request<Incoming>, Response = Response<B>>,
1068        S::Future: 'static,
1069        S::Error: Into<Box<dyn StdError + Send + Sync>>,
1070        B: Body + 'static,
1071        B::Error: Into<Box<dyn StdError + Send + Sync>>,
1072        I: Read + Write + Unpin + Send + 'static,
1073        E: HttpServerConnExec<S::Future, B>,
1074    {
1075        self.inner.serve_connection_with_upgrades(io, service)
1076    }
1077}
1078
1079#[cfg(test)]
1080mod tests {
1081    use crate::{
1082        rt::{TokioExecutor, TokioIo},
1083        server::conn::auto,
1084    };
1085    use http::{Request, Response};
1086    use http_body::Body;
1087    use http_body_util::{BodyExt, Empty, Full};
1088    use hyper::{body, body::Bytes, client, service::service_fn};
1089    use std::{convert::Infallible, error::Error as StdError, net::SocketAddr, time::Duration};
1090    use tokio::{
1091        net::{TcpListener, TcpStream},
1092        pin,
1093    };
1094
1095    const BODY: &[u8] = b"Hello, world!";
1096
1097    #[test]
1098    fn configuration() {
1099        auto::Builder::new(TokioExecutor::new())
1101            .http1()
1102            .keep_alive(true)
1103            .http2()
1104            .keep_alive_interval(None);
1105        let mut builder = auto::Builder::new(TokioExecutor::new());
1109
1110        builder.http1().keep_alive(true);
1111        builder.http2().keep_alive_interval(None);
1112        }
1114
1115    #[cfg(not(miri))]
1116    #[tokio::test]
1117    async fn http1() {
1118        let addr = start_server(false, false).await;
1119        let mut sender = connect_h1(addr).await;
1120
1121        let response = sender
1122            .send_request(Request::new(Empty::<Bytes>::new()))
1123            .await
1124            .unwrap();
1125
1126        let body = response.into_body().collect().await.unwrap().to_bytes();
1127
1128        assert_eq!(body, BODY);
1129    }
1130
1131    #[cfg(not(miri))]
1132    #[tokio::test]
1133    async fn http2() {
1134        let addr = start_server(false, false).await;
1135        let mut sender = connect_h2(addr).await;
1136
1137        let response = sender
1138            .send_request(Request::new(Empty::<Bytes>::new()))
1139            .await
1140            .unwrap();
1141
1142        let body = response.into_body().collect().await.unwrap().to_bytes();
1143
1144        assert_eq!(body, BODY);
1145    }
1146
1147    #[cfg(not(miri))]
1148    #[tokio::test]
1149    async fn http2_only() {
1150        let addr = start_server(false, true).await;
1151        let mut sender = connect_h2(addr).await;
1152
1153        let response = sender
1154            .send_request(Request::new(Empty::<Bytes>::new()))
1155            .await
1156            .unwrap();
1157
1158        let body = response.into_body().collect().await.unwrap().to_bytes();
1159
1160        assert_eq!(body, BODY);
1161    }
1162
1163    #[cfg(not(miri))]
1164    #[tokio::test]
1165    async fn http2_only_fail_if_client_is_http1() {
1166        let addr = start_server(false, true).await;
1167        let mut sender = connect_h1(addr).await;
1168
1169        let _ = sender
1170            .send_request(Request::new(Empty::<Bytes>::new()))
1171            .await
1172            .expect_err("should fail");
1173    }
1174
1175    #[cfg(not(miri))]
1176    #[tokio::test]
1177    async fn http1_only() {
1178        let addr = start_server(true, false).await;
1179        let mut sender = connect_h1(addr).await;
1180
1181        let response = sender
1182            .send_request(Request::new(Empty::<Bytes>::new()))
1183            .await
1184            .unwrap();
1185
1186        let body = response.into_body().collect().await.unwrap().to_bytes();
1187
1188        assert_eq!(body, BODY);
1189    }
1190
1191    #[cfg(not(miri))]
1192    #[tokio::test]
1193    async fn http1_only_fail_if_client_is_http2() {
1194        let addr = start_server(true, false).await;
1195        let mut sender = connect_h2(addr).await;
1196
1197        let _ = sender
1198            .send_request(Request::new(Empty::<Bytes>::new()))
1199            .await
1200            .expect_err("should fail");
1201    }
1202
1203    #[cfg(not(miri))]
1204    #[tokio::test]
1205    async fn graceful_shutdown() {
1206        let listener = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0)))
1207            .await
1208            .unwrap();
1209
1210        let listener_addr = listener.local_addr().unwrap();
1211
1212        let listen_task = tokio::spawn(async move { listener.accept().await.unwrap() });
1214        let _stream = TcpStream::connect(listener_addr).await.unwrap();
1216
1217        let (stream, _) = listen_task.await.unwrap();
1218        let stream = TokioIo::new(stream);
1219        let builder = auto::Builder::new(TokioExecutor::new());
1220        let connection = builder.serve_connection(stream, service_fn(hello));
1221
1222        pin!(connection);
1223
1224        connection.as_mut().graceful_shutdown();
1225
1226        let connection_error = tokio::time::timeout(Duration::from_millis(200), connection)
1227            .await
1228            .expect("Connection should have finished in a timely manner after graceful shutdown.")
1229            .expect_err("Connection should have been interrupted.");
1230
1231        let connection_error = connection_error
1232            .downcast_ref::<std::io::Error>()
1233            .expect("The error should have been `std::io::Error`.");
1234        assert_eq!(connection_error.kind(), std::io::ErrorKind::Interrupted);
1235    }
1236
1237    async fn connect_h1<B>(addr: SocketAddr) -> client::conn::http1::SendRequest<B>
1238    where
1239        B: Body + Send + 'static,
1240        B::Data: Send,
1241        B::Error: Into<Box<dyn StdError + Send + Sync>>,
1242    {
1243        let stream = TokioIo::new(TcpStream::connect(addr).await.unwrap());
1244        let (sender, connection) = client::conn::http1::handshake(stream).await.unwrap();
1245
1246        tokio::spawn(connection);
1247
1248        sender
1249    }
1250
1251    async fn connect_h2<B>(addr: SocketAddr) -> client::conn::http2::SendRequest<B>
1252    where
1253        B: Body + Unpin + Send + 'static,
1254        B::Data: Send,
1255        B::Error: Into<Box<dyn StdError + Send + Sync>>,
1256    {
1257        let stream = TokioIo::new(TcpStream::connect(addr).await.unwrap());
1258        let (sender, connection) = client::conn::http2::Builder::new(TokioExecutor::new())
1259            .handshake(stream)
1260            .await
1261            .unwrap();
1262
1263        tokio::spawn(connection);
1264
1265        sender
1266    }
1267
1268    async fn start_server(h1_only: bool, h2_only: bool) -> SocketAddr {
1269        let addr: SocketAddr = ([127, 0, 0, 1], 0).into();
1270        let listener = TcpListener::bind(addr).await.unwrap();
1271
1272        let local_addr = listener.local_addr().unwrap();
1273
1274        tokio::spawn(async move {
1275            loop {
1276                let (stream, _) = listener.accept().await.unwrap();
1277                let stream = TokioIo::new(stream);
1278                tokio::task::spawn(async move {
1279                    let mut builder = auto::Builder::new(TokioExecutor::new());
1280                    if h1_only {
1281                        builder = builder.http1_only();
1282                        builder.serve_connection(stream, service_fn(hello)).await
1283                    } else if h2_only {
1284                        builder = builder.http2_only();
1285                        builder.serve_connection(stream, service_fn(hello)).await
1286                    } else {
1287                        builder
1288                            .http2()
1289                            .max_header_list_size(4096)
1290                            .serve_connection_with_upgrades(stream, service_fn(hello))
1291                            .await
1292                    }
1293                    .unwrap();
1294                });
1295            }
1296        });
1297
1298        local_addr
1299    }
1300
1301    async fn hello(_req: Request<body::Incoming>) -> Result<Response<Full<Bytes>>, Infallible> {
1302        Ok(Response::new(Full::new(Bytes::from(BODY))))
1303    }
1304}