1use futures_util::ready;
4use hyper::service::HttpService;
5use std::future::Future;
6use std::marker::PhantomPinned;
7use std::mem::MaybeUninit;
8use std::pin::Pin;
9use std::task::{Context, Poll};
10use std::{error::Error as StdError, io, time::Duration};
11
12use bytes::Bytes;
13use http::{Request, Response};
14use http_body::Body;
15use hyper::{
16 body::Incoming,
17 rt::{Read, ReadBuf, Timer, Write},
18 service::Service,
19};
20
21#[cfg(feature = "http1")]
22use hyper::server::conn::http1;
23
24#[cfg(feature = "http2")]
25use hyper::{rt::bounds::Http2ServerConnExec, server::conn::http2};
26
27#[cfg(any(not(feature = "http2"), not(feature = "http1")))]
28use std::marker::PhantomData;
29
30use pin_project_lite::pin_project;
31
32use crate::common::rewind::Rewind;
33
34type Error = Box<dyn std::error::Error + Send + Sync>;
35
36type Result<T> = std::result::Result<T, Error>;
37
38const H2_PREFACE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
39
40#[cfg(feature = "http2")]
42pub trait HttpServerConnExec<A, B: Body>: Http2ServerConnExec<A, B> {}
43
44#[cfg(feature = "http2")]
45impl<A, B: Body, T: Http2ServerConnExec<A, B>> HttpServerConnExec<A, B> for T {}
46
47#[cfg(not(feature = "http2"))]
49pub trait HttpServerConnExec<A, B: Body> {}
50
51#[cfg(not(feature = "http2"))]
52impl<A, B: Body, T> HttpServerConnExec<A, B> for T {}
53
54#[derive(Clone, Debug)]
56pub struct Builder<E> {
57 #[cfg(feature = "http1")]
58 http1: http1::Builder,
59 #[cfg(feature = "http2")]
60 http2: http2::Builder<E>,
61 #[cfg(any(feature = "http1", feature = "http2"))]
62 version: Option<Version>,
63 #[cfg(not(feature = "http2"))]
64 _executor: E,
65}
66
67impl<E> Builder<E> {
68 pub fn new(executor: E) -> Self {
84 Self {
85 #[cfg(feature = "http1")]
86 http1: http1::Builder::new(),
87 #[cfg(feature = "http2")]
88 http2: http2::Builder::new(executor),
89 #[cfg(any(feature = "http1", feature = "http2"))]
90 version: None,
91 #[cfg(not(feature = "http2"))]
92 _executor: executor,
93 }
94 }
95
96 #[cfg(feature = "http1")]
98 pub fn http1(&mut self) -> Http1Builder<'_, E> {
99 Http1Builder { inner: self }
100 }
101
102 #[cfg(feature = "http2")]
104 pub fn http2(&mut self) -> Http2Builder<'_, E> {
105 Http2Builder { inner: self }
106 }
107
108 #[cfg(feature = "http2")]
112 pub fn http2_only(mut self) -> Self {
113 assert!(self.version.is_none());
114 self.version = Some(Version::H2);
115 self
116 }
117
118 #[cfg(feature = "http1")]
122 pub fn http1_only(mut self) -> Self {
123 assert!(self.version.is_none());
124 self.version = Some(Version::H1);
125 self
126 }
127
128 pub fn serve_connection<I, S, B>(&self, io: I, service: S) -> Connection<'_, I, S, E>
130 where
131 S: Service<Request<Incoming>, Response = Response<B>>,
132 S::Future: 'static,
133 S::Error: Into<Box<dyn StdError + Send + Sync>>,
134 B: Body + 'static,
135 B::Error: Into<Box<dyn StdError + Send + Sync>>,
136 I: Read + Write + Unpin + 'static,
137 E: HttpServerConnExec<S::Future, B>,
138 {
139 let state = match self.version {
140 #[cfg(feature = "http1")]
141 Some(Version::H1) => {
142 let io = Rewind::new_buffered(io, Bytes::new());
143 let conn = self.http1.serve_connection(io, service);
144 ConnState::H1 { conn }
145 }
146 #[cfg(feature = "http2")]
147 Some(Version::H2) => {
148 let io = Rewind::new_buffered(io, Bytes::new());
149 let conn = self.http2.serve_connection(io, service);
150 ConnState::H2 { conn }
151 }
152 #[cfg(any(feature = "http1", feature = "http2"))]
153 _ => ConnState::ReadVersion {
154 read_version: read_version(io),
155 builder: Cow::Borrowed(self),
156 service: Some(service),
157 },
158 };
159
160 Connection { state }
161 }
162
163 pub fn serve_connection_with_upgrades<I, S, B>(
167 &self,
168 io: I,
169 service: S,
170 ) -> UpgradeableConnection<'_, I, S, E>
171 where
172 S: Service<Request<Incoming>, Response = Response<B>>,
173 S::Future: 'static,
174 S::Error: Into<Box<dyn StdError + Send + Sync>>,
175 B: Body + 'static,
176 B::Error: Into<Box<dyn StdError + Send + Sync>>,
177 I: Read + Write + Unpin + Send + 'static,
178 E: HttpServerConnExec<S::Future, B>,
179 {
180 UpgradeableConnection {
181 state: UpgradeableConnState::ReadVersion {
182 read_version: read_version(io),
183 builder: Cow::Borrowed(self),
184 service: Some(service),
185 },
186 }
187 }
188}
189
190#[derive(Copy, Clone, Debug)]
191enum Version {
192 H1,
193 H2,
194}
195
196impl Version {
197 #[must_use]
198 #[cfg(any(not(feature = "http2"), not(feature = "http1")))]
199 pub fn unsupported(self) -> Error {
200 match self {
201 Version::H1 => Error::from("HTTP/1 is not supported"),
202 Version::H2 => Error::from("HTTP/2 is not supported"),
203 }
204 }
205}
206
207fn read_version<I>(io: I) -> ReadVersion<I>
208where
209 I: Read + Unpin,
210{
211 ReadVersion {
212 io: Some(io),
213 buf: [MaybeUninit::uninit(); 24],
214 filled: 0,
215 version: Version::H2,
216 cancelled: false,
217 _pin: PhantomPinned,
218 }
219}
220
221pin_project! {
222 struct ReadVersion<I> {
223 io: Option<I>,
224 buf: [MaybeUninit<u8>; 24],
225 filled: usize,
227 version: Version,
228 cancelled: bool,
229 #[pin]
231 _pin: PhantomPinned,
232 }
233}
234
235impl<I> ReadVersion<I> {
236 pub fn cancel(self: Pin<&mut Self>) {
237 *self.project().cancelled = true;
238 }
239}
240
241impl<I> Future for ReadVersion<I>
242where
243 I: Read + Unpin,
244{
245 type Output = io::Result<(Version, Rewind<I>)>;
246
247 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
248 let this = self.project();
249 if *this.cancelled {
250 return Poll::Ready(Err(io::Error::new(io::ErrorKind::Interrupted, "Cancelled")));
251 }
252
253 let mut buf = ReadBuf::uninit(&mut *this.buf);
254 unsafe {
257 buf.unfilled().advance(*this.filled);
258 };
259
260 while buf.filled().len() < H2_PREFACE.len() {
262 let len = buf.filled().len();
263 ready!(Pin::new(this.io.as_mut().unwrap()).poll_read(cx, buf.unfilled()))?;
264 *this.filled = buf.filled().len();
265
266 if buf.filled().len() == len
268 || buf.filled()[len..] != H2_PREFACE[len..buf.filled().len()]
269 {
270 *this.version = Version::H1;
271 break;
272 }
273 }
274
275 let io = this.io.take().unwrap();
276 let buf = buf.filled().to_vec();
277 Poll::Ready(Ok((
278 *this.version,
279 Rewind::new_buffered(io, Bytes::from(buf)),
280 )))
281 }
282}
283
284pin_project! {
285 pub struct Connection<'a, I, S, E>
287 where
288 S: HttpService<Incoming>,
289 {
290 #[pin]
291 state: ConnState<'a, I, S, E>,
292 }
293}
294
295enum Cow<'a, T> {
297 Borrowed(&'a T),
298 Owned(T),
299}
300
301impl<'a, T> std::ops::Deref for Cow<'a, T> {
302 type Target = T;
303 fn deref(&self) -> &T {
304 match self {
305 Cow::Borrowed(t) => &*t,
306 Cow::Owned(ref t) => t,
307 }
308 }
309}
310
311#[cfg(feature = "http1")]
312type Http1Connection<I, S> = hyper::server::conn::http1::Connection<Rewind<I>, S>;
313
314#[cfg(not(feature = "http1"))]
315type Http1Connection<I, S> = (PhantomData<I>, PhantomData<S>);
316
317#[cfg(feature = "http2")]
318type Http2Connection<I, S, E> = hyper::server::conn::http2::Connection<Rewind<I>, S, E>;
319
320#[cfg(not(feature = "http2"))]
321type Http2Connection<I, S, E> = (PhantomData<I>, PhantomData<S>, PhantomData<E>);
322
323pin_project! {
324 #[project = ConnStateProj]
325 enum ConnState<'a, I, S, E>
326 where
327 S: HttpService<Incoming>,
328 {
329 ReadVersion {
330 #[pin]
331 read_version: ReadVersion<I>,
332 builder: Cow<'a, Builder<E>>,
333 service: Option<S>,
334 },
335 H1 {
336 #[pin]
337 conn: Http1Connection<I, S>,
338 },
339 H2 {
340 #[pin]
341 conn: Http2Connection<I, S, E>,
342 },
343 }
344}
345
346impl<I, S, E, B> Connection<'_, I, S, E>
347where
348 S: HttpService<Incoming, ResBody = B>,
349 S::Error: Into<Box<dyn StdError + Send + Sync>>,
350 I: Read + Write + Unpin,
351 B: Body + 'static,
352 B::Error: Into<Box<dyn StdError + Send + Sync>>,
353 E: HttpServerConnExec<S::Future, B>,
354{
355 pub fn graceful_shutdown(self: Pin<&mut Self>) {
364 match self.project().state.project() {
365 ConnStateProj::ReadVersion { read_version, .. } => read_version.cancel(),
366 #[cfg(feature = "http1")]
367 ConnStateProj::H1 { conn } => conn.graceful_shutdown(),
368 #[cfg(feature = "http2")]
369 ConnStateProj::H2 { conn } => conn.graceful_shutdown(),
370 #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
371 _ => unreachable!(),
372 }
373 }
374
375 pub fn into_owned(self) -> Connection<'static, I, S, E>
377 where
378 Builder<E>: Clone,
379 {
380 Connection {
381 state: match self.state {
382 ConnState::ReadVersion {
383 read_version,
384 builder,
385 service,
386 } => ConnState::ReadVersion {
387 read_version,
388 service,
389 builder: Cow::Owned(builder.clone()),
390 },
391 #[cfg(feature = "http1")]
392 ConnState::H1 { conn } => ConnState::H1 { conn },
393 #[cfg(feature = "http2")]
394 ConnState::H2 { conn } => ConnState::H2 { conn },
395 #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
396 _ => unreachable!(),
397 },
398 }
399 }
400}
401
402impl<I, S, E, B> Future for Connection<'_, I, S, E>
403where
404 S: Service<Request<Incoming>, Response = Response<B>>,
405 S::Future: 'static,
406 S::Error: Into<Box<dyn StdError + Send + Sync>>,
407 B: Body + 'static,
408 B::Error: Into<Box<dyn StdError + Send + Sync>>,
409 I: Read + Write + Unpin + 'static,
410 E: HttpServerConnExec<S::Future, B>,
411{
412 type Output = Result<()>;
413
414 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
415 loop {
416 let mut this = self.as_mut().project();
417
418 match this.state.as_mut().project() {
419 ConnStateProj::ReadVersion {
420 read_version,
421 builder,
422 service,
423 } => {
424 let (version, io) = ready!(read_version.poll(cx))?;
425 let service = service.take().unwrap();
426 match version {
427 #[cfg(feature = "http1")]
428 Version::H1 => {
429 let conn = builder.http1.serve_connection(io, service);
430 this.state.set(ConnState::H1 { conn });
431 }
432 #[cfg(feature = "http2")]
433 Version::H2 => {
434 let conn = builder.http2.serve_connection(io, service);
435 this.state.set(ConnState::H2 { conn });
436 }
437 #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
438 _ => return Poll::Ready(Err(version.unsupported())),
439 }
440 }
441 #[cfg(feature = "http1")]
442 ConnStateProj::H1 { conn } => {
443 return conn.poll(cx).map_err(Into::into);
444 }
445 #[cfg(feature = "http2")]
446 ConnStateProj::H2 { conn } => {
447 return conn.poll(cx).map_err(Into::into);
448 }
449 #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
450 _ => unreachable!(),
451 }
452 }
453 }
454}
455
456pin_project! {
457 pub struct UpgradeableConnection<'a, I, S, E>
459 where
460 S: HttpService<Incoming>,
461 {
462 #[pin]
463 state: UpgradeableConnState<'a, I, S, E>,
464 }
465}
466
467#[cfg(feature = "http1")]
468type Http1UpgradeableConnection<I, S> = hyper::server::conn::http1::UpgradeableConnection<I, S>;
469
470#[cfg(not(feature = "http1"))]
471type Http1UpgradeableConnection<I, S> = (PhantomData<I>, PhantomData<S>);
472
473pin_project! {
474 #[project = UpgradeableConnStateProj]
475 enum UpgradeableConnState<'a, I, S, E>
476 where
477 S: HttpService<Incoming>,
478 {
479 ReadVersion {
480 #[pin]
481 read_version: ReadVersion<I>,
482 builder: Cow<'a, Builder<E>>,
483 service: Option<S>,
484 },
485 H1 {
486 #[pin]
487 conn: Http1UpgradeableConnection<Rewind<I>, S>,
488 },
489 H2 {
490 #[pin]
491 conn: Http2Connection<I, S, E>,
492 },
493 }
494}
495
496impl<I, S, E, B> UpgradeableConnection<'_, I, S, E>
497where
498 S: HttpService<Incoming, ResBody = B>,
499 S::Error: Into<Box<dyn StdError + Send + Sync>>,
500 I: Read + Write + Unpin,
501 B: Body + 'static,
502 B::Error: Into<Box<dyn StdError + Send + Sync>>,
503 E: HttpServerConnExec<S::Future, B>,
504{
505 pub fn graceful_shutdown(self: Pin<&mut Self>) {
514 match self.project().state.project() {
515 UpgradeableConnStateProj::ReadVersion { read_version, .. } => read_version.cancel(),
516 #[cfg(feature = "http1")]
517 UpgradeableConnStateProj::H1 { conn } => conn.graceful_shutdown(),
518 #[cfg(feature = "http2")]
519 UpgradeableConnStateProj::H2 { conn } => conn.graceful_shutdown(),
520 #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
521 _ => unreachable!(),
522 }
523 }
524
525 pub fn into_owned(self) -> UpgradeableConnection<'static, I, S, E>
527 where
528 Builder<E>: Clone,
529 {
530 UpgradeableConnection {
531 state: match self.state {
532 UpgradeableConnState::ReadVersion {
533 read_version,
534 builder,
535 service,
536 } => UpgradeableConnState::ReadVersion {
537 read_version,
538 service,
539 builder: Cow::Owned(builder.clone()),
540 },
541 #[cfg(feature = "http1")]
542 UpgradeableConnState::H1 { conn } => UpgradeableConnState::H1 { conn },
543 #[cfg(feature = "http2")]
544 UpgradeableConnState::H2 { conn } => UpgradeableConnState::H2 { conn },
545 #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
546 _ => unreachable!(),
547 },
548 }
549 }
550}
551
552impl<I, S, E, B> Future for UpgradeableConnection<'_, I, S, E>
553where
554 S: Service<Request<Incoming>, Response = Response<B>>,
555 S::Future: 'static,
556 S::Error: Into<Box<dyn StdError + Send + Sync>>,
557 B: Body + 'static,
558 B::Error: Into<Box<dyn StdError + Send + Sync>>,
559 I: Read + Write + Unpin + Send + 'static,
560 E: HttpServerConnExec<S::Future, B>,
561{
562 type Output = Result<()>;
563
564 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
565 loop {
566 let mut this = self.as_mut().project();
567
568 match this.state.as_mut().project() {
569 UpgradeableConnStateProj::ReadVersion {
570 read_version,
571 builder,
572 service,
573 } => {
574 let (version, io) = ready!(read_version.poll(cx))?;
575 let service = service.take().unwrap();
576 match version {
577 #[cfg(feature = "http1")]
578 Version::H1 => {
579 let conn = builder.http1.serve_connection(io, service).with_upgrades();
580 this.state.set(UpgradeableConnState::H1 { conn });
581 }
582 #[cfg(feature = "http2")]
583 Version::H2 => {
584 let conn = builder.http2.serve_connection(io, service);
585 this.state.set(UpgradeableConnState::H2 { conn });
586 }
587 #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
588 _ => return Poll::Ready(Err(version.unsupported())),
589 }
590 }
591 #[cfg(feature = "http1")]
592 UpgradeableConnStateProj::H1 { conn } => {
593 return conn.poll(cx).map_err(Into::into);
594 }
595 #[cfg(feature = "http2")]
596 UpgradeableConnStateProj::H2 { conn } => {
597 return conn.poll(cx).map_err(Into::into);
598 }
599 #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
600 _ => unreachable!(),
601 }
602 }
603 }
604}
605
606#[cfg(feature = "http1")]
608pub struct Http1Builder<'a, E> {
609 inner: &'a mut Builder<E>,
610}
611
612#[cfg(feature = "http1")]
613impl<E> Http1Builder<'_, E> {
614 #[cfg(feature = "http2")]
616 pub fn http2(&mut self) -> Http2Builder<'_, E> {
617 Http2Builder { inner: self.inner }
618 }
619
620 pub fn half_close(&mut self, val: bool) -> &mut Self {
629 self.inner.http1.half_close(val);
630 self
631 }
632
633 pub fn keep_alive(&mut self, val: bool) -> &mut Self {
637 self.inner.http1.keep_alive(val);
638 self
639 }
640
641 pub fn title_case_headers(&mut self, enabled: bool) -> &mut Self {
648 self.inner.http1.title_case_headers(enabled);
649 self
650 }
651
652 pub fn preserve_header_case(&mut self, enabled: bool) -> &mut Self {
666 self.inner.http1.preserve_header_case(enabled);
667 self
668 }
669
670 pub fn max_headers(&mut self, val: usize) -> &mut Self {
686 self.inner.http1.max_headers(val);
687 self
688 }
689
690 pub fn header_read_timeout(&mut self, read_timeout: Duration) -> &mut Self {
695 self.inner.http1.header_read_timeout(read_timeout);
696 self
697 }
698
699 pub fn writev(&mut self, val: bool) -> &mut Self {
712 self.inner.http1.writev(val);
713 self
714 }
715
716 pub fn max_buf_size(&mut self, max: usize) -> &mut Self {
724 self.inner.http1.max_buf_size(max);
725 self
726 }
727
728 pub fn pipeline_flush(&mut self, enabled: bool) -> &mut Self {
734 self.inner.http1.pipeline_flush(enabled);
735 self
736 }
737
738 pub fn timer<M>(&mut self, timer: M) -> &mut Self
740 where
741 M: Timer + Send + Sync + 'static,
742 {
743 self.inner.http1.timer(timer);
744 self
745 }
746
747 #[cfg(feature = "http2")]
749 pub async fn serve_connection<I, S, B>(&self, io: I, service: S) -> Result<()>
750 where
751 S: Service<Request<Incoming>, Response = Response<B>>,
752 S::Future: 'static,
753 S::Error: Into<Box<dyn StdError + Send + Sync>>,
754 B: Body + 'static,
755 B::Error: Into<Box<dyn StdError + Send + Sync>>,
756 I: Read + Write + Unpin + 'static,
757 E: HttpServerConnExec<S::Future, B>,
758 {
759 self.inner.serve_connection(io, service).await
760 }
761
762 #[cfg(not(feature = "http2"))]
764 pub async fn serve_connection<I, S, B>(&self, io: I, service: S) -> Result<()>
765 where
766 S: Service<Request<Incoming>, Response = Response<B>>,
767 S::Future: 'static,
768 S::Error: Into<Box<dyn StdError + Send + Sync>>,
769 B: Body + 'static,
770 B::Error: Into<Box<dyn StdError + Send + Sync>>,
771 I: Read + Write + Unpin + 'static,
772 {
773 self.inner.serve_connection(io, service).await
774 }
775
776 #[cfg(feature = "http2")]
780 pub fn serve_connection_with_upgrades<I, S, B>(
781 &self,
782 io: I,
783 service: S,
784 ) -> UpgradeableConnection<'_, I, S, E>
785 where
786 S: Service<Request<Incoming>, Response = Response<B>>,
787 S::Future: 'static,
788 S::Error: Into<Box<dyn StdError + Send + Sync>>,
789 B: Body + 'static,
790 B::Error: Into<Box<dyn StdError + Send + Sync>>,
791 I: Read + Write + Unpin + Send + 'static,
792 E: HttpServerConnExec<S::Future, B>,
793 {
794 self.inner.serve_connection_with_upgrades(io, service)
795 }
796}
797
798#[cfg(feature = "http2")]
800pub struct Http2Builder<'a, E> {
801 inner: &'a mut Builder<E>,
802}
803
804#[cfg(feature = "http2")]
805impl<E> Http2Builder<'_, E> {
806 #[cfg(feature = "http1")]
807 pub fn http1(&mut self) -> Http1Builder<'_, E> {
809 Http1Builder { inner: self.inner }
810 }
811
812 pub fn max_pending_accept_reset_streams(&mut self, max: impl Into<Option<usize>>) -> &mut Self {
819 self.inner.http2.max_pending_accept_reset_streams(max);
820 self
821 }
822
823 pub fn initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
832 self.inner.http2.initial_stream_window_size(sz);
833 self
834 }
835
836 pub fn initial_connection_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
842 self.inner.http2.initial_connection_window_size(sz);
843 self
844 }
845
846 pub fn adaptive_window(&mut self, enabled: bool) -> &mut Self {
852 self.inner.http2.adaptive_window(enabled);
853 self
854 }
855
856 pub fn max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
862 self.inner.http2.max_frame_size(sz);
863 self
864 }
865
866 pub fn max_concurrent_streams(&mut self, max: impl Into<Option<u32>>) -> &mut Self {
873 self.inner.http2.max_concurrent_streams(max);
874 self
875 }
876
877 pub fn keep_alive_interval(&mut self, interval: impl Into<Option<Duration>>) -> &mut Self {
887 self.inner.http2.keep_alive_interval(interval);
888 self
889 }
890
891 pub fn keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self {
901 self.inner.http2.keep_alive_timeout(timeout);
902 self
903 }
904
905 pub fn max_send_buf_size(&mut self, max: usize) -> &mut Self {
913 self.inner.http2.max_send_buf_size(max);
914 self
915 }
916
917 pub fn enable_connect_protocol(&mut self) -> &mut Self {
921 self.inner.http2.enable_connect_protocol();
922 self
923 }
924
925 pub fn max_header_list_size(&mut self, max: u32) -> &mut Self {
929 self.inner.http2.max_header_list_size(max);
930 self
931 }
932
933 pub fn timer<M>(&mut self, timer: M) -> &mut Self
935 where
936 M: Timer + Send + Sync + 'static,
937 {
938 self.inner.http2.timer(timer);
939 self
940 }
941
942 pub async fn serve_connection<I, S, B>(&self, io: I, service: S) -> Result<()>
944 where
945 S: Service<Request<Incoming>, Response = Response<B>>,
946 S::Future: 'static,
947 S::Error: Into<Box<dyn StdError + Send + Sync>>,
948 B: Body + 'static,
949 B::Error: Into<Box<dyn StdError + Send + Sync>>,
950 I: Read + Write + Unpin + 'static,
951 E: HttpServerConnExec<S::Future, B>,
952 {
953 self.inner.serve_connection(io, service).await
954 }
955
956 pub fn serve_connection_with_upgrades<I, S, B>(
960 &self,
961 io: I,
962 service: S,
963 ) -> UpgradeableConnection<'_, I, S, E>
964 where
965 S: Service<Request<Incoming>, Response = Response<B>>,
966 S::Future: 'static,
967 S::Error: Into<Box<dyn StdError + Send + Sync>>,
968 B: Body + 'static,
969 B::Error: Into<Box<dyn StdError + Send + Sync>>,
970 I: Read + Write + Unpin + Send + 'static,
971 E: HttpServerConnExec<S::Future, B>,
972 {
973 self.inner.serve_connection_with_upgrades(io, service)
974 }
975}
976
977#[cfg(test)]
978mod tests {
979 use crate::{
980 rt::{TokioExecutor, TokioIo},
981 server::conn::auto,
982 };
983 use http::{Request, Response};
984 use http_body::Body;
985 use http_body_util::{BodyExt, Empty, Full};
986 use hyper::{body, body::Bytes, client, service::service_fn};
987 use std::{convert::Infallible, error::Error as StdError, net::SocketAddr, time::Duration};
988 use tokio::{
989 net::{TcpListener, TcpStream},
990 pin,
991 };
992
993 const BODY: &[u8] = b"Hello, world!";
994
995 #[test]
996 fn configuration() {
997 auto::Builder::new(TokioExecutor::new())
999 .http1()
1000 .keep_alive(true)
1001 .http2()
1002 .keep_alive_interval(None);
1003 let mut builder = auto::Builder::new(TokioExecutor::new());
1007
1008 builder.http1().keep_alive(true);
1009 builder.http2().keep_alive_interval(None);
1010 }
1012
1013 #[cfg(not(miri))]
1014 #[tokio::test]
1015 async fn http1() {
1016 let addr = start_server(false, false).await;
1017 let mut sender = connect_h1(addr).await;
1018
1019 let response = sender
1020 .send_request(Request::new(Empty::<Bytes>::new()))
1021 .await
1022 .unwrap();
1023
1024 let body = response.into_body().collect().await.unwrap().to_bytes();
1025
1026 assert_eq!(body, BODY);
1027 }
1028
1029 #[cfg(not(miri))]
1030 #[tokio::test]
1031 async fn http2() {
1032 let addr = start_server(false, false).await;
1033 let mut sender = connect_h2(addr).await;
1034
1035 let response = sender
1036 .send_request(Request::new(Empty::<Bytes>::new()))
1037 .await
1038 .unwrap();
1039
1040 let body = response.into_body().collect().await.unwrap().to_bytes();
1041
1042 assert_eq!(body, BODY);
1043 }
1044
1045 #[cfg(not(miri))]
1046 #[tokio::test]
1047 async fn http2_only() {
1048 let addr = start_server(false, true).await;
1049 let mut sender = connect_h2(addr).await;
1050
1051 let response = sender
1052 .send_request(Request::new(Empty::<Bytes>::new()))
1053 .await
1054 .unwrap();
1055
1056 let body = response.into_body().collect().await.unwrap().to_bytes();
1057
1058 assert_eq!(body, BODY);
1059 }
1060
1061 #[cfg(not(miri))]
1062 #[tokio::test]
1063 async fn http2_only_fail_if_client_is_http1() {
1064 let addr = start_server(false, true).await;
1065 let mut sender = connect_h1(addr).await;
1066
1067 let _ = sender
1068 .send_request(Request::new(Empty::<Bytes>::new()))
1069 .await
1070 .expect_err("should fail");
1071 }
1072
1073 #[cfg(not(miri))]
1074 #[tokio::test]
1075 async fn http1_only() {
1076 let addr = start_server(true, false).await;
1077 let mut sender = connect_h1(addr).await;
1078
1079 let response = sender
1080 .send_request(Request::new(Empty::<Bytes>::new()))
1081 .await
1082 .unwrap();
1083
1084 let body = response.into_body().collect().await.unwrap().to_bytes();
1085
1086 assert_eq!(body, BODY);
1087 }
1088
1089 #[cfg(not(miri))]
1090 #[tokio::test]
1091 async fn http1_only_fail_if_client_is_http2() {
1092 let addr = start_server(true, false).await;
1093 let mut sender = connect_h2(addr).await;
1094
1095 let _ = sender
1096 .send_request(Request::new(Empty::<Bytes>::new()))
1097 .await
1098 .expect_err("should fail");
1099 }
1100
1101 #[cfg(not(miri))]
1102 #[tokio::test]
1103 async fn graceful_shutdown() {
1104 let listener = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0)))
1105 .await
1106 .unwrap();
1107
1108 let listener_addr = listener.local_addr().unwrap();
1109
1110 let listen_task = tokio::spawn(async move { listener.accept().await.unwrap() });
1112 let _stream = TcpStream::connect(listener_addr).await.unwrap();
1114
1115 let (stream, _) = listen_task.await.unwrap();
1116 let stream = TokioIo::new(stream);
1117 let builder = auto::Builder::new(TokioExecutor::new());
1118 let connection = builder.serve_connection(stream, service_fn(hello));
1119
1120 pin!(connection);
1121
1122 connection.as_mut().graceful_shutdown();
1123
1124 let connection_error = tokio::time::timeout(Duration::from_millis(200), connection)
1125 .await
1126 .expect("Connection should have finished in a timely manner after graceful shutdown.")
1127 .expect_err("Connection should have been interrupted.");
1128
1129 let connection_error = connection_error
1130 .downcast_ref::<std::io::Error>()
1131 .expect("The error should have been `std::io::Error`.");
1132 assert_eq!(connection_error.kind(), std::io::ErrorKind::Interrupted);
1133 }
1134
1135 async fn connect_h1<B>(addr: SocketAddr) -> client::conn::http1::SendRequest<B>
1136 where
1137 B: Body + Send + 'static,
1138 B::Data: Send,
1139 B::Error: Into<Box<dyn StdError + Send + Sync>>,
1140 {
1141 let stream = TokioIo::new(TcpStream::connect(addr).await.unwrap());
1142 let (sender, connection) = client::conn::http1::handshake(stream).await.unwrap();
1143
1144 tokio::spawn(connection);
1145
1146 sender
1147 }
1148
1149 async fn connect_h2<B>(addr: SocketAddr) -> client::conn::http2::SendRequest<B>
1150 where
1151 B: Body + Unpin + Send + 'static,
1152 B::Data: Send,
1153 B::Error: Into<Box<dyn StdError + Send + Sync>>,
1154 {
1155 let stream = TokioIo::new(TcpStream::connect(addr).await.unwrap());
1156 let (sender, connection) = client::conn::http2::Builder::new(TokioExecutor::new())
1157 .handshake(stream)
1158 .await
1159 .unwrap();
1160
1161 tokio::spawn(connection);
1162
1163 sender
1164 }
1165
1166 async fn start_server(h1_only: bool, h2_only: bool) -> SocketAddr {
1167 let addr: SocketAddr = ([127, 0, 0, 1], 0).into();
1168 let listener = TcpListener::bind(addr).await.unwrap();
1169
1170 let local_addr = listener.local_addr().unwrap();
1171
1172 tokio::spawn(async move {
1173 loop {
1174 let (stream, _) = listener.accept().await.unwrap();
1175 let stream = TokioIo::new(stream);
1176 tokio::task::spawn(async move {
1177 let mut builder = auto::Builder::new(TokioExecutor::new());
1178 if h1_only {
1179 builder = builder.http1_only();
1180 builder.serve_connection(stream, service_fn(hello)).await
1181 } else if h2_only {
1182 builder = builder.http2_only();
1183 builder.serve_connection(stream, service_fn(hello)).await
1184 } else {
1185 builder
1186 .http2()
1187 .max_header_list_size(4096)
1188 .serve_connection_with_upgrades(stream, service_fn(hello))
1189 .await
1190 }
1191 .unwrap();
1192 });
1193 }
1194 });
1195
1196 local_addr
1197 }
1198
1199 async fn hello(_req: Request<body::Incoming>) -> Result<Response<Full<Bytes>>, Infallible> {
1200 Ok(Response::new(Full::new(Bytes::from(BODY))))
1201 }
1202}