hyper_serve/tls_rustls/
future.rs1use crate::tls_rustls::RustlsConfig;
4use pin_project_lite::pin_project;
5use std::io::{Error, ErrorKind};
6use std::time::Duration;
7use std::{
8 fmt,
9 future::Future,
10 io,
11 pin::Pin,
12 task::{Context, Poll},
13};
14use tokio::io::{AsyncRead, AsyncWrite};
15use tokio::time::{timeout, Timeout};
16use tokio_rustls::{server::TlsStream, Accept, TlsAcceptor};
17
18pin_project! {
19 pub struct RustlsAcceptorFuture<F, I, S> {
21 #[pin]
22 inner: AcceptFuture<F, I, S>,
23 config: Option<RustlsConfig>,
24 }
25}
26
27impl<F, I, S> RustlsAcceptorFuture<F, I, S> {
28 pub(crate) fn new(future: F, config: RustlsConfig, handshake_timeout: Duration) -> Self {
29 let inner = AcceptFuture::Inner {
30 future,
31 handshake_timeout,
32 };
33 let config = Some(config);
34
35 Self { inner, config }
36 }
37}
38
39impl<F, I, S> fmt::Debug for RustlsAcceptorFuture<F, I, S> {
40 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
41 f.debug_struct("RustlsAcceptorFuture").finish()
42 }
43}
44
45pin_project! {
46 #[project = AcceptFutureProj]
47 enum AcceptFuture<F, I, S> {
48 Inner {
49 #[pin]
50 future: F,
51 handshake_timeout: Duration,
52 },
53 Accept {
54 #[pin]
55 future: Timeout<Accept<I>>,
56 service: Option<S>,
57 },
58 }
59}
60
61impl<F, I, S> Future for RustlsAcceptorFuture<F, I, S>
62where
63 F: Future<Output = io::Result<(I, S)>>,
64 I: AsyncRead + AsyncWrite + Unpin,
65{
66 type Output = io::Result<(TlsStream<I>, S)>;
67
68 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
69 let mut this = self.project();
70
71 loop {
72 match this.inner.as_mut().project() {
73 AcceptFutureProj::Inner {
74 future,
75 handshake_timeout,
76 } => {
77 match future.poll(cx) {
78 Poll::Ready(Ok((stream, service))) => {
79 let server_config = this.config
80 .take()
81 .expect("config is not set. this is a bug in hyper-serve, please report")
82 .get_inner();
83
84 let acceptor = TlsAcceptor::from(server_config);
85 let future = acceptor.accept(stream);
86
87 let service = Some(service);
88 let handshake_timeout = *handshake_timeout;
89
90 this.inner.set(AcceptFuture::Accept {
91 future: timeout(handshake_timeout, future),
92 service,
93 });
94 }
95 Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
96 Poll::Pending => return Poll::Pending,
97 }
98 }
99 AcceptFutureProj::Accept { future, service } => match future.poll(cx) {
100 Poll::Ready(Ok(Ok(stream))) => {
101 let service = service.take().expect("future polled after ready");
102
103 return Poll::Ready(Ok((stream, service)));
104 }
105 Poll::Ready(Ok(Err(e))) => return Poll::Ready(Err(e)),
106 Poll::Ready(Err(timeout)) => {
107 return Poll::Ready(Err(Error::new(ErrorKind::TimedOut, timeout)))
108 }
109 Poll::Pending => return Poll::Pending,
110 },
111 }
112 }
113 }
114}