1pub mod upgrade;
4
5use hyper::service::HttpService;
6use std::future::Future;
7use std::marker::PhantomPinned;
8use std::mem::MaybeUninit;
9use std::pin::Pin;
10use std::task::{ready, Context, Poll};
11use std::{error::Error as StdError, io, time::Duration};
12
13use bytes::Bytes;
14use http::{Request, Response};
15use http_body::Body;
16use hyper::{
17 body::Incoming,
18 rt::{Read, ReadBuf, Timer, Write},
19 service::Service,
20};
21
22#[cfg(feature = "http1")]
23use hyper::server::conn::http1;
24
25#[cfg(feature = "http2")]
26use hyper::{rt::bounds::Http2ServerConnExec, server::conn::http2};
27
28#[cfg(any(not(feature = "http2"), not(feature = "http1")))]
29use std::marker::PhantomData;
30
31use pin_project_lite::pin_project;
32
33use crate::common::rewind::Rewind;
34
35type Error = Box<dyn std::error::Error + Send + Sync>;
36
37type Result<T> = std::result::Result<T, Error>;
38
39const H2_PREFACE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
40
41#[cfg(feature = "http2")]
43pub trait HttpServerConnExec<A, B: Body>: Http2ServerConnExec<A, B> {}
44
45#[cfg(feature = "http2")]
46impl<A, B: Body, T: Http2ServerConnExec<A, B>> HttpServerConnExec<A, B> for T {}
47
48#[cfg(not(feature = "http2"))]
50pub trait HttpServerConnExec<A, B: Body> {}
51
52#[cfg(not(feature = "http2"))]
53impl<A, B: Body, T> HttpServerConnExec<A, B> for T {}
54
55#[derive(Clone, Debug)]
57pub struct Builder<E> {
58 #[cfg(feature = "http1")]
59 http1: http1::Builder,
60 #[cfg(feature = "http2")]
61 http2: http2::Builder<E>,
62 #[cfg(any(feature = "http1", feature = "http2"))]
63 version: Option<Version>,
64 #[cfg(not(feature = "http2"))]
65 _executor: E,
66}
67
68impl<E: Default> Default for Builder<E> {
69 fn default() -> Self {
70 Self::new(E::default())
71 }
72}
73
74impl<E> Builder<E> {
75 pub fn new(executor: E) -> Self {
91 Self {
92 #[cfg(feature = "http1")]
93 http1: http1::Builder::new(),
94 #[cfg(feature = "http2")]
95 http2: http2::Builder::new(executor),
96 #[cfg(any(feature = "http1", feature = "http2"))]
97 version: None,
98 #[cfg(not(feature = "http2"))]
99 _executor: executor,
100 }
101 }
102
103 #[cfg(feature = "http1")]
105 pub fn http1(&mut self) -> Http1Builder<'_, E> {
106 Http1Builder { inner: self }
107 }
108
109 #[cfg(feature = "http2")]
111 pub fn http2(&mut self) -> Http2Builder<'_, E> {
112 Http2Builder { inner: self }
113 }
114
115 #[cfg(feature = "http2")]
121 pub fn http2_only(mut self) -> Self {
122 assert!(self.version.is_none());
123 self.version = Some(Version::H2);
124 self
125 }
126
127 #[cfg(feature = "http1")]
133 pub fn http1_only(mut self) -> Self {
134 assert!(self.version.is_none());
135 self.version = Some(Version::H1);
136 self
137 }
138
139 pub fn is_http1_available(&self) -> bool {
141 match self.version {
142 #[cfg(feature = "http1")]
143 Some(Version::H1) => true,
144 #[cfg(feature = "http2")]
145 Some(Version::H2) => false,
146 #[cfg(any(feature = "http1", feature = "http2"))]
147 _ => true,
148 }
149 }
150
151 pub fn is_http2_available(&self) -> bool {
153 match self.version {
154 #[cfg(feature = "http1")]
155 Some(Version::H1) => false,
156 #[cfg(feature = "http2")]
157 Some(Version::H2) => true,
158 #[cfg(any(feature = "http1", feature = "http2"))]
159 _ => true,
160 }
161 }
162
163 #[cfg(feature = "http1")]
183 pub fn title_case_headers(mut self, enabled: bool) -> Self {
184 self.http1.title_case_headers(enabled);
185 self
186 }
187
188 #[cfg(feature = "http1")]
207 pub fn preserve_header_case(mut self, enabled: bool) -> Self {
208 self.http1.preserve_header_case(enabled);
209 self
210 }
211
212 pub fn serve_connection<I, S, B>(&self, io: I, service: S) -> Connection<'_, I, S, E>
214 where
215 S: Service<Request<Incoming>, Response = Response<B>>,
216 S::Future: 'static,
217 S::Error: Into<Box<dyn StdError + Send + Sync>>,
218 B: Body + 'static,
219 B::Error: Into<Box<dyn StdError + Send + Sync>>,
220 I: Read + Write + Unpin + 'static,
221 E: HttpServerConnExec<S::Future, B>,
222 {
223 let state = match self.version {
224 #[cfg(feature = "http1")]
225 Some(Version::H1) => {
226 let io = Rewind::new_buffered(io, Bytes::new());
227 let conn = self.http1.serve_connection(io, service);
228 ConnState::H1 { conn }
229 }
230 #[cfg(feature = "http2")]
231 Some(Version::H2) => {
232 let io = Rewind::new_buffered(io, Bytes::new());
233 let conn = self.http2.serve_connection(io, service);
234 ConnState::H2 { conn }
235 }
236 #[cfg(any(feature = "http1", feature = "http2"))]
237 _ => ConnState::ReadVersion {
238 read_version: read_version(io),
239 builder: Cow::Borrowed(self),
240 service: Some(service),
241 },
242 };
243
244 Connection { state }
245 }
246
247 pub fn serve_connection_with_upgrades<I, S, B>(
257 &self,
258 io: I,
259 service: S,
260 ) -> UpgradeableConnection<'_, I, S, E>
261 where
262 S: Service<Request<Incoming>, Response = Response<B>>,
263 S::Future: 'static,
264 S::Error: Into<Box<dyn StdError + Send + Sync>>,
265 B: Body + 'static,
266 B::Error: Into<Box<dyn StdError + Send + Sync>>,
267 I: Read + Write + Unpin + Send + 'static,
268 E: HttpServerConnExec<S::Future, B>,
269 {
270 UpgradeableConnection {
271 state: UpgradeableConnState::ReadVersion {
272 read_version: read_version(io),
273 builder: Cow::Borrowed(self),
274 service: Some(service),
275 },
276 }
277 }
278}
279
280#[derive(Copy, Clone, Debug)]
281enum Version {
282 H1,
283 H2,
284}
285
286impl Version {
287 #[must_use]
288 #[cfg(any(not(feature = "http2"), not(feature = "http1")))]
289 pub fn unsupported(self) -> Error {
290 match self {
291 Version::H1 => Error::from("HTTP/1 is not supported"),
292 Version::H2 => Error::from("HTTP/2 is not supported"),
293 }
294 }
295}
296
297fn read_version<I>(io: I) -> ReadVersion<I>
298where
299 I: Read + Unpin,
300{
301 ReadVersion {
302 io: Some(io),
303 buf: [MaybeUninit::uninit(); 24],
304 filled: 0,
305 version: Version::H2,
306 cancelled: false,
307 _pin: PhantomPinned,
308 }
309}
310
311pin_project! {
312 struct ReadVersion<I> {
313 io: Option<I>,
314 buf: [MaybeUninit<u8>; 24],
315 filled: usize,
317 version: Version,
318 cancelled: bool,
319 #[pin]
321 _pin: PhantomPinned,
322 }
323}
324
325impl<I> ReadVersion<I> {
326 pub fn cancel(self: Pin<&mut Self>) {
327 *self.project().cancelled = true;
328 }
329}
330
331impl<I> Future for ReadVersion<I>
332where
333 I: Read + Unpin,
334{
335 type Output = io::Result<(Version, Rewind<I>)>;
336
337 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
338 let this = self.project();
339 if *this.cancelled {
340 return Poll::Ready(Err(io::Error::new(io::ErrorKind::Interrupted, "Cancelled")));
341 }
342
343 let mut buf = ReadBuf::uninit(&mut *this.buf);
344 unsafe {
347 buf.unfilled().advance(*this.filled);
348 };
349
350 while buf.filled().len() < H2_PREFACE.len() {
352 let len = buf.filled().len();
353 ready!(Pin::new(this.io.as_mut().unwrap()).poll_read(cx, buf.unfilled()))?;
354 *this.filled = buf.filled().len();
355
356 if buf.filled().len() == len
358 || buf.filled()[len..] != H2_PREFACE[len..buf.filled().len()]
359 {
360 *this.version = Version::H1;
361 break;
362 }
363 }
364
365 let io = this.io.take().unwrap();
366 let buf = buf.filled().to_vec();
367 Poll::Ready(Ok((
368 *this.version,
369 Rewind::new_buffered(io, Bytes::from(buf)),
370 )))
371 }
372}
373
374pin_project! {
375 #[must_use = "futures do nothing unless polled"]
381 pub struct Connection<'a, I, S, E>
382 where
383 S: HttpService<Incoming>,
384 {
385 #[pin]
386 state: ConnState<'a, I, S, E>,
387 }
388}
389
390enum Cow<'a, T> {
392 Borrowed(&'a T),
393 Owned(T),
394}
395
396impl<T> std::ops::Deref for Cow<'_, T> {
397 type Target = T;
398 fn deref(&self) -> &T {
399 match self {
400 Cow::Borrowed(t) => &*t,
401 Cow::Owned(ref t) => t,
402 }
403 }
404}
405
406#[cfg(feature = "http1")]
407type Http1Connection<I, S> = hyper::server::conn::http1::Connection<Rewind<I>, S>;
408
409#[cfg(not(feature = "http1"))]
410type Http1Connection<I, S> = (PhantomData<I>, PhantomData<S>);
411
412#[cfg(feature = "http2")]
413type Http2Connection<I, S, E> = hyper::server::conn::http2::Connection<Rewind<I>, S, E>;
414
415#[cfg(not(feature = "http2"))]
416type Http2Connection<I, S, E> = (PhantomData<I>, PhantomData<S>, PhantomData<E>);
417
418pin_project! {
419 #[project = ConnStateProj]
420 enum ConnState<'a, I, S, E>
421 where
422 S: HttpService<Incoming>,
423 {
424 ReadVersion {
425 #[pin]
426 read_version: ReadVersion<I>,
427 builder: Cow<'a, Builder<E>>,
428 service: Option<S>,
429 },
430 H1 {
431 #[pin]
432 conn: Http1Connection<I, S>,
433 },
434 H2 {
435 #[pin]
436 conn: Http2Connection<I, S, E>,
437 },
438 }
439}
440
441impl<I, S, E, B> Connection<'_, I, S, E>
442where
443 S: HttpService<Incoming, ResBody = B>,
444 S::Error: Into<Box<dyn StdError + Send + Sync>>,
445 I: Read + Write + Unpin,
446 B: Body + 'static,
447 B::Error: Into<Box<dyn StdError + Send + Sync>>,
448 E: HttpServerConnExec<S::Future, B>,
449{
450 pub fn graceful_shutdown(self: Pin<&mut Self>) {
459 match self.project().state.project() {
460 ConnStateProj::ReadVersion { read_version, .. } => read_version.cancel(),
461 #[cfg(feature = "http1")]
462 ConnStateProj::H1 { conn } => conn.graceful_shutdown(),
463 #[cfg(feature = "http2")]
464 ConnStateProj::H2 { conn } => conn.graceful_shutdown(),
465 #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
466 _ => unreachable!(),
467 }
468 }
469
470 pub fn into_owned(self) -> Connection<'static, I, S, E>
472 where
473 Builder<E>: Clone,
474 {
475 Connection {
476 state: match self.state {
477 ConnState::ReadVersion {
478 read_version,
479 builder,
480 service,
481 } => ConnState::ReadVersion {
482 read_version,
483 service,
484 builder: Cow::Owned(builder.clone()),
485 },
486 #[cfg(feature = "http1")]
487 ConnState::H1 { conn } => ConnState::H1 { conn },
488 #[cfg(feature = "http2")]
489 ConnState::H2 { conn } => ConnState::H2 { conn },
490 #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
491 _ => unreachable!(),
492 },
493 }
494 }
495}
496
497impl<I, S, E, B> Future for Connection<'_, I, S, E>
498where
499 S: Service<Request<Incoming>, Response = Response<B>>,
500 S::Future: 'static,
501 S::Error: Into<Box<dyn StdError + Send + Sync>>,
502 B: Body + 'static,
503 B::Error: Into<Box<dyn StdError + Send + Sync>>,
504 I: Read + Write + Unpin + 'static,
505 E: HttpServerConnExec<S::Future, B>,
506{
507 type Output = Result<()>;
508
509 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
510 loop {
511 let mut this = self.as_mut().project();
512
513 match this.state.as_mut().project() {
514 ConnStateProj::ReadVersion {
515 read_version,
516 builder,
517 service,
518 } => {
519 let (version, io) = ready!(read_version.poll(cx))?;
520 let service = service.take().unwrap();
521 match version {
522 #[cfg(feature = "http1")]
523 Version::H1 => {
524 let conn = builder.http1.serve_connection(io, service);
525 this.state.set(ConnState::H1 { conn });
526 }
527 #[cfg(feature = "http2")]
528 Version::H2 => {
529 let conn = builder.http2.serve_connection(io, service);
530 this.state.set(ConnState::H2 { conn });
531 }
532 #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
533 _ => return Poll::Ready(Err(version.unsupported())),
534 }
535 }
536 #[cfg(feature = "http1")]
537 ConnStateProj::H1 { conn } => {
538 return conn.poll(cx).map_err(Into::into);
539 }
540 #[cfg(feature = "http2")]
541 ConnStateProj::H2 { conn } => {
542 return conn.poll(cx).map_err(Into::into);
543 }
544 #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
545 _ => unreachable!(),
546 }
547 }
548 }
549}
550
551pin_project! {
552 #[must_use = "futures do nothing unless polled"]
558 pub struct UpgradeableConnection<'a, I, S, E>
559 where
560 S: HttpService<Incoming>,
561 {
562 #[pin]
563 state: UpgradeableConnState<'a, I, S, E>,
564 }
565}
566
567#[cfg(feature = "http1")]
568type Http1UpgradeableConnection<I, S> = hyper::server::conn::http1::UpgradeableConnection<I, S>;
569
570#[cfg(not(feature = "http1"))]
571type Http1UpgradeableConnection<I, S> = (PhantomData<I>, PhantomData<S>);
572
573pin_project! {
574 #[project = UpgradeableConnStateProj]
575 enum UpgradeableConnState<'a, I, S, E>
576 where
577 S: HttpService<Incoming>,
578 {
579 ReadVersion {
580 #[pin]
581 read_version: ReadVersion<I>,
582 builder: Cow<'a, Builder<E>>,
583 service: Option<S>,
584 },
585 H1 {
586 #[pin]
587 conn: Http1UpgradeableConnection<Rewind<I>, S>,
588 },
589 H2 {
590 #[pin]
591 conn: Http2Connection<I, S, E>,
592 },
593 }
594}
595
596impl<I, S, E, B> UpgradeableConnection<'_, I, S, E>
597where
598 S: HttpService<Incoming, ResBody = B>,
599 S::Error: Into<Box<dyn StdError + Send + Sync>>,
600 I: Read + Write + Unpin,
601 B: Body + 'static,
602 B::Error: Into<Box<dyn StdError + Send + Sync>>,
603 E: HttpServerConnExec<S::Future, B>,
604{
605 pub fn graceful_shutdown(self: Pin<&mut Self>) {
614 match self.project().state.project() {
615 UpgradeableConnStateProj::ReadVersion { read_version, .. } => read_version.cancel(),
616 #[cfg(feature = "http1")]
617 UpgradeableConnStateProj::H1 { conn } => conn.graceful_shutdown(),
618 #[cfg(feature = "http2")]
619 UpgradeableConnStateProj::H2 { conn } => conn.graceful_shutdown(),
620 #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
621 _ => unreachable!(),
622 }
623 }
624
625 pub fn into_owned(self) -> UpgradeableConnection<'static, I, S, E>
627 where
628 Builder<E>: Clone,
629 {
630 UpgradeableConnection {
631 state: match self.state {
632 UpgradeableConnState::ReadVersion {
633 read_version,
634 builder,
635 service,
636 } => UpgradeableConnState::ReadVersion {
637 read_version,
638 service,
639 builder: Cow::Owned(builder.clone()),
640 },
641 #[cfg(feature = "http1")]
642 UpgradeableConnState::H1 { conn } => UpgradeableConnState::H1 { conn },
643 #[cfg(feature = "http2")]
644 UpgradeableConnState::H2 { conn } => UpgradeableConnState::H2 { conn },
645 #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
646 _ => unreachable!(),
647 },
648 }
649 }
650}
651
652impl<I, S, E, B> Future for UpgradeableConnection<'_, I, S, E>
653where
654 S: Service<Request<Incoming>, Response = Response<B>>,
655 S::Future: 'static,
656 S::Error: Into<Box<dyn StdError + Send + Sync>>,
657 B: Body + 'static,
658 B::Error: Into<Box<dyn StdError + Send + Sync>>,
659 I: Read + Write + Unpin + Send + 'static,
660 E: HttpServerConnExec<S::Future, B>,
661{
662 type Output = Result<()>;
663
664 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
665 loop {
666 let mut this = self.as_mut().project();
667
668 match this.state.as_mut().project() {
669 UpgradeableConnStateProj::ReadVersion {
670 read_version,
671 builder,
672 service,
673 } => {
674 let (version, io) = ready!(read_version.poll(cx))?;
675 let service = service.take().unwrap();
676 match version {
677 #[cfg(feature = "http1")]
678 Version::H1 => {
679 let conn = builder.http1.serve_connection(io, service).with_upgrades();
680 this.state.set(UpgradeableConnState::H1 { conn });
681 }
682 #[cfg(feature = "http2")]
683 Version::H2 => {
684 let conn = builder.http2.serve_connection(io, service);
685 this.state.set(UpgradeableConnState::H2 { conn });
686 }
687 #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
688 _ => return Poll::Ready(Err(version.unsupported())),
689 }
690 }
691 #[cfg(feature = "http1")]
692 UpgradeableConnStateProj::H1 { conn } => {
693 return conn.poll(cx).map_err(Into::into);
694 }
695 #[cfg(feature = "http2")]
696 UpgradeableConnStateProj::H2 { conn } => {
697 return conn.poll(cx).map_err(Into::into);
698 }
699 #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
700 _ => unreachable!(),
701 }
702 }
703 }
704}
705
706#[cfg(feature = "http1")]
708pub struct Http1Builder<'a, E> {
709 inner: &'a mut Builder<E>,
710}
711
712#[cfg(feature = "http1")]
713impl<E> Http1Builder<'_, E> {
714 #[cfg(feature = "http2")]
716 pub fn http2(&mut self) -> Http2Builder<'_, E> {
717 Http2Builder { inner: self.inner }
718 }
719
720 pub fn auto_date_header(&mut self, enabled: bool) -> &mut Self {
726 self.inner.http1.auto_date_header(enabled);
727 self
728 }
729
730 pub fn half_close(&mut self, val: bool) -> &mut Self {
739 self.inner.http1.half_close(val);
740 self
741 }
742
743 pub fn keep_alive(&mut self, val: bool) -> &mut Self {
747 self.inner.http1.keep_alive(val);
748 self
749 }
750
751 pub fn title_case_headers(&mut self, enabled: bool) -> &mut Self {
758 self.inner.http1.title_case_headers(enabled);
759 self
760 }
761
762 pub fn ignore_invalid_headers(&mut self, enabled: bool) -> &mut Self {
770 self.inner.http1.ignore_invalid_headers(enabled);
771 self
772 }
773
774 pub fn preserve_header_case(&mut self, enabled: bool) -> &mut Self {
788 self.inner.http1.preserve_header_case(enabled);
789 self
790 }
791
792 pub fn max_headers(&mut self, val: usize) -> &mut Self {
808 self.inner.http1.max_headers(val);
809 self
810 }
811
812 pub fn header_read_timeout(&mut self, read_timeout: impl Into<Option<Duration>>) -> &mut Self {
822 self.inner.http1.header_read_timeout(read_timeout);
823 self
824 }
825
826 pub fn writev(&mut self, val: bool) -> &mut Self {
839 self.inner.http1.writev(val);
840 self
841 }
842
843 pub fn max_buf_size(&mut self, max: usize) -> &mut Self {
851 self.inner.http1.max_buf_size(max);
852 self
853 }
854
855 pub fn pipeline_flush(&mut self, enabled: bool) -> &mut Self {
861 self.inner.http1.pipeline_flush(enabled);
862 self
863 }
864
865 pub fn timer<M>(&mut self, timer: M) -> &mut Self
867 where
868 M: Timer + Send + Sync + 'static,
869 {
870 self.inner.http1.timer(timer);
871 self
872 }
873
874 #[cfg(feature = "http2")]
876 pub async fn serve_connection<I, S, B>(&self, io: I, service: S) -> Result<()>
877 where
878 S: Service<Request<Incoming>, Response = Response<B>>,
879 S::Future: 'static,
880 S::Error: Into<Box<dyn StdError + Send + Sync>>,
881 B: Body + 'static,
882 B::Error: Into<Box<dyn StdError + Send + Sync>>,
883 I: Read + Write + Unpin + 'static,
884 E: HttpServerConnExec<S::Future, B>,
885 {
886 self.inner.serve_connection(io, service).await
887 }
888
889 #[cfg(not(feature = "http2"))]
891 pub async fn serve_connection<I, S, B>(&self, io: I, service: S) -> Result<()>
892 where
893 S: Service<Request<Incoming>, Response = Response<B>>,
894 S::Future: 'static,
895 S::Error: Into<Box<dyn StdError + Send + Sync>>,
896 B: Body + 'static,
897 B::Error: Into<Box<dyn StdError + Send + Sync>>,
898 I: Read + Write + Unpin + 'static,
899 {
900 self.inner.serve_connection(io, service).await
901 }
902
903 #[cfg(feature = "http2")]
907 pub fn serve_connection_with_upgrades<I, S, B>(
908 &self,
909 io: I,
910 service: S,
911 ) -> UpgradeableConnection<'_, I, S, E>
912 where
913 S: Service<Request<Incoming>, Response = Response<B>>,
914 S::Future: 'static,
915 S::Error: Into<Box<dyn StdError + Send + Sync>>,
916 B: Body + 'static,
917 B::Error: Into<Box<dyn StdError + Send + Sync>>,
918 I: Read + Write + Unpin + Send + 'static,
919 E: HttpServerConnExec<S::Future, B>,
920 {
921 self.inner.serve_connection_with_upgrades(io, service)
922 }
923}
924
925#[cfg(feature = "http2")]
927pub struct Http2Builder<'a, E> {
928 inner: &'a mut Builder<E>,
929}
930
931#[cfg(feature = "http2")]
932impl<E> Http2Builder<'_, E> {
933 #[cfg(feature = "http1")]
934 pub fn http1(&mut self) -> Http1Builder<'_, E> {
936 Http1Builder { inner: self.inner }
937 }
938
939 pub fn max_pending_accept_reset_streams(&mut self, max: impl Into<Option<usize>>) -> &mut Self {
946 self.inner.http2.max_pending_accept_reset_streams(max);
947 self
948 }
949
950 pub fn max_local_error_reset_streams(&mut self, max: impl Into<Option<usize>>) -> &mut Self {
959 self.inner.http2.max_local_error_reset_streams(max);
960 self
961 }
962
963 pub fn initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
972 self.inner.http2.initial_stream_window_size(sz);
973 self
974 }
975
976 pub fn initial_connection_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
982 self.inner.http2.initial_connection_window_size(sz);
983 self
984 }
985
986 pub fn adaptive_window(&mut self, enabled: bool) -> &mut Self {
992 self.inner.http2.adaptive_window(enabled);
993 self
994 }
995
996 pub fn max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
1002 self.inner.http2.max_frame_size(sz);
1003 self
1004 }
1005
1006 pub fn max_concurrent_streams(&mut self, max: impl Into<Option<u32>>) -> &mut Self {
1013 self.inner.http2.max_concurrent_streams(max);
1014 self
1015 }
1016
1017 pub fn keep_alive_interval(&mut self, interval: impl Into<Option<Duration>>) -> &mut Self {
1027 self.inner.http2.keep_alive_interval(interval);
1028 self
1029 }
1030
1031 pub fn keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self {
1041 self.inner.http2.keep_alive_timeout(timeout);
1042 self
1043 }
1044
1045 pub fn max_send_buf_size(&mut self, max: usize) -> &mut Self {
1053 self.inner.http2.max_send_buf_size(max);
1054 self
1055 }
1056
1057 pub fn enable_connect_protocol(&mut self) -> &mut Self {
1061 self.inner.http2.enable_connect_protocol();
1062 self
1063 }
1064
1065 pub fn max_header_list_size(&mut self, max: u32) -> &mut Self {
1069 self.inner.http2.max_header_list_size(max);
1070 self
1071 }
1072
1073 pub fn timer<M>(&mut self, timer: M) -> &mut Self
1075 where
1076 M: Timer + Send + Sync + 'static,
1077 {
1078 self.inner.http2.timer(timer);
1079 self
1080 }
1081
1082 pub fn auto_date_header(&mut self, enabled: bool) -> &mut Self {
1088 self.inner.http2.auto_date_header(enabled);
1089 self
1090 }
1091
1092 pub async fn serve_connection<I, S, B>(&self, io: I, service: S) -> Result<()>
1094 where
1095 S: Service<Request<Incoming>, Response = Response<B>>,
1096 S::Future: 'static,
1097 S::Error: Into<Box<dyn StdError + Send + Sync>>,
1098 B: Body + 'static,
1099 B::Error: Into<Box<dyn StdError + Send + Sync>>,
1100 I: Read + Write + Unpin + 'static,
1101 E: HttpServerConnExec<S::Future, B>,
1102 {
1103 self.inner.serve_connection(io, service).await
1104 }
1105
1106 pub fn serve_connection_with_upgrades<I, S, B>(
1110 &self,
1111 io: I,
1112 service: S,
1113 ) -> UpgradeableConnection<'_, I, S, E>
1114 where
1115 S: Service<Request<Incoming>, Response = Response<B>>,
1116 S::Future: 'static,
1117 S::Error: Into<Box<dyn StdError + Send + Sync>>,
1118 B: Body + 'static,
1119 B::Error: Into<Box<dyn StdError + Send + Sync>>,
1120 I: Read + Write + Unpin + Send + 'static,
1121 E: HttpServerConnExec<S::Future, B>,
1122 {
1123 self.inner.serve_connection_with_upgrades(io, service)
1124 }
1125}
1126
1127#[cfg(test)]
1128mod tests {
1129 use crate::{
1130 rt::{TokioExecutor, TokioIo},
1131 server::conn::auto,
1132 };
1133 use http::{Request, Response};
1134 use http_body::Body;
1135 use http_body_util::{BodyExt, Empty, Full};
1136 use hyper::{body, body::Bytes, client, service::service_fn};
1137 use std::{convert::Infallible, error::Error as StdError, net::SocketAddr, time::Duration};
1138 use tokio::{
1139 net::{TcpListener, TcpStream},
1140 pin,
1141 };
1142
1143 const BODY: &[u8] = b"Hello, world!";
1144
1145 #[test]
1146 fn configuration() {
1147 auto::Builder::new(TokioExecutor::new())
1149 .http1()
1150 .keep_alive(true)
1151 .http2()
1152 .keep_alive_interval(None);
1153 let mut builder = auto::Builder::new(TokioExecutor::new());
1157
1158 builder.http1().keep_alive(true);
1159 builder.http2().keep_alive_interval(None);
1160 }
1162
1163 #[test]
1164 #[cfg(feature = "http1")]
1165 fn title_case_headers_configuration() {
1166 auto::Builder::new(TokioExecutor::new()).title_case_headers(true);
1168
1169 auto::Builder::new(TokioExecutor::new())
1171 .title_case_headers(true)
1172 .http1_only();
1173 }
1174
1175 #[test]
1176 #[cfg(feature = "http1")]
1177 fn preserve_header_case_configuration() {
1178 auto::Builder::new(TokioExecutor::new()).preserve_header_case(true);
1180
1181 auto::Builder::new(TokioExecutor::new())
1183 .preserve_header_case(true)
1184 .http1_only();
1185 }
1186
1187 #[cfg(not(miri))]
1188 #[tokio::test]
1189 async fn http1() {
1190 let addr = start_server(false, false).await;
1191 let mut sender = connect_h1(addr).await;
1192
1193 let response = sender
1194 .send_request(Request::new(Empty::<Bytes>::new()))
1195 .await
1196 .unwrap();
1197
1198 let body = response.into_body().collect().await.unwrap().to_bytes();
1199
1200 assert_eq!(body, BODY);
1201 }
1202
1203 #[cfg(not(miri))]
1204 #[tokio::test]
1205 async fn http2() {
1206 let addr = start_server(false, false).await;
1207 let mut sender = connect_h2(addr).await;
1208
1209 let response = sender
1210 .send_request(Request::new(Empty::<Bytes>::new()))
1211 .await
1212 .unwrap();
1213
1214 let body = response.into_body().collect().await.unwrap().to_bytes();
1215
1216 assert_eq!(body, BODY);
1217 }
1218
1219 #[cfg(not(miri))]
1220 #[tokio::test]
1221 async fn http2_only() {
1222 let addr = start_server(false, true).await;
1223 let mut sender = connect_h2(addr).await;
1224
1225 let response = sender
1226 .send_request(Request::new(Empty::<Bytes>::new()))
1227 .await
1228 .unwrap();
1229
1230 let body = response.into_body().collect().await.unwrap().to_bytes();
1231
1232 assert_eq!(body, BODY);
1233 }
1234
1235 #[cfg(not(miri))]
1236 #[tokio::test]
1237 async fn http2_only_fail_if_client_is_http1() {
1238 let addr = start_server(false, true).await;
1239 let mut sender = connect_h1(addr).await;
1240
1241 let _ = sender
1242 .send_request(Request::new(Empty::<Bytes>::new()))
1243 .await
1244 .expect_err("should fail");
1245 }
1246
1247 #[cfg(not(miri))]
1248 #[tokio::test]
1249 async fn http1_only() {
1250 let addr = start_server(true, false).await;
1251 let mut sender = connect_h1(addr).await;
1252
1253 let response = sender
1254 .send_request(Request::new(Empty::<Bytes>::new()))
1255 .await
1256 .unwrap();
1257
1258 let body = response.into_body().collect().await.unwrap().to_bytes();
1259
1260 assert_eq!(body, BODY);
1261 }
1262
1263 #[cfg(not(miri))]
1264 #[tokio::test]
1265 async fn http1_only_fail_if_client_is_http2() {
1266 let addr = start_server(true, false).await;
1267 let mut sender = connect_h2(addr).await;
1268
1269 let _ = sender
1270 .send_request(Request::new(Empty::<Bytes>::new()))
1271 .await
1272 .expect_err("should fail");
1273 }
1274
1275 #[cfg(not(miri))]
1276 #[tokio::test]
1277 async fn graceful_shutdown() {
1278 let listener = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0)))
1279 .await
1280 .unwrap();
1281
1282 let listener_addr = listener.local_addr().unwrap();
1283
1284 let listen_task = tokio::spawn(async move { listener.accept().await.unwrap() });
1286 let _stream = TcpStream::connect(listener_addr).await.unwrap();
1288
1289 let (stream, _) = listen_task.await.unwrap();
1290 let stream = TokioIo::new(stream);
1291 let builder = auto::Builder::new(TokioExecutor::new());
1292 let connection = builder.serve_connection(stream, service_fn(hello));
1293
1294 pin!(connection);
1295
1296 connection.as_mut().graceful_shutdown();
1297
1298 let connection_error = tokio::time::timeout(Duration::from_millis(200), connection)
1299 .await
1300 .expect("Connection should have finished in a timely manner after graceful shutdown.")
1301 .expect_err("Connection should have been interrupted.");
1302
1303 let connection_error = connection_error
1304 .downcast_ref::<std::io::Error>()
1305 .expect("The error should have been `std::io::Error`.");
1306 assert_eq!(connection_error.kind(), std::io::ErrorKind::Interrupted);
1307 }
1308
1309 async fn connect_h1<B>(addr: SocketAddr) -> client::conn::http1::SendRequest<B>
1310 where
1311 B: Body + Send + 'static,
1312 B::Data: Send,
1313 B::Error: Into<Box<dyn StdError + Send + Sync>>,
1314 {
1315 let stream = TokioIo::new(TcpStream::connect(addr).await.unwrap());
1316 let (sender, connection) = client::conn::http1::handshake(stream).await.unwrap();
1317
1318 tokio::spawn(connection);
1319
1320 sender
1321 }
1322
1323 async fn connect_h2<B>(addr: SocketAddr) -> client::conn::http2::SendRequest<B>
1324 where
1325 B: Body + Unpin + Send + 'static,
1326 B::Data: Send,
1327 B::Error: Into<Box<dyn StdError + Send + Sync>>,
1328 {
1329 let stream = TokioIo::new(TcpStream::connect(addr).await.unwrap());
1330 let (sender, connection) = client::conn::http2::Builder::new(TokioExecutor::new())
1331 .handshake(stream)
1332 .await
1333 .unwrap();
1334
1335 tokio::spawn(connection);
1336
1337 sender
1338 }
1339
1340 async fn start_server(h1_only: bool, h2_only: bool) -> SocketAddr {
1341 let addr: SocketAddr = ([127, 0, 0, 1], 0).into();
1342 let listener = TcpListener::bind(addr).await.unwrap();
1343
1344 let local_addr = listener.local_addr().unwrap();
1345
1346 tokio::spawn(async move {
1347 loop {
1348 let (stream, _) = listener.accept().await.unwrap();
1349 let stream = TokioIo::new(stream);
1350 tokio::task::spawn(async move {
1351 let mut builder = auto::Builder::new(TokioExecutor::new());
1352 if h1_only {
1353 builder = builder.http1_only();
1354 builder.serve_connection(stream, service_fn(hello)).await
1355 } else if h2_only {
1356 builder = builder.http2_only();
1357 builder.serve_connection(stream, service_fn(hello)).await
1358 } else {
1359 builder
1360 .http2()
1361 .max_header_list_size(4096)
1362 .serve_connection_with_upgrades(stream, service_fn(hello))
1363 .await
1364 }
1365 .unwrap();
1366 });
1367 }
1368 });
1369
1370 local_addr
1371 }
1372
1373 async fn hello(_req: Request<body::Incoming>) -> Result<Response<Full<Bytes>>, Infallible> {
1374 Ok(Response::new(Full::new(Bytes::from(BODY))))
1375 }
1376}