tokio_rustls_acme/
incoming.rs

1use crate::acceptor::{AcmeAccept, AcmeAcceptor};
2use crate::AcmeState;
3use futures::stream::{FusedStream, FuturesUnordered};
4use futures::Stream;
5use rustls::ServerConfig;
6use std::fmt::Debug;
7use std::pin::Pin;
8use std::sync::Arc;
9use std::task::{Context, Poll};
10use tokio::io::{AsyncRead, AsyncWrite};
11use tokio_rustls::{server::TlsStream, Accept};
12
13pub struct Incoming<
14    TCP: AsyncRead + AsyncWrite + Unpin,
15    ETCP,
16    ITCP: Stream<Item = Result<TCP, ETCP>> + Unpin,
17    EC: Debug + 'static,
18    EA: Debug + 'static,
19> {
20    state: AcmeState<EC, EA>,
21    acceptor: AcmeAcceptor,
22    rustls_config: Arc<ServerConfig>,
23    tcp_incoming: Option<ITCP>,
24    acme_accepting: FuturesUnordered<AcmeAccept<TCP>>,
25    tls_accepting: FuturesUnordered<Accept<TCP>>,
26}
27
28impl<
29        TCP: AsyncRead + AsyncWrite + Unpin,
30        ETCP,
31        ITCP: Stream<Item = Result<TCP, ETCP>> + Unpin,
32        EC: Debug + 'static,
33        EA: Debug + 'static,
34    > Unpin for Incoming<TCP, ETCP, ITCP, EC, EA>
35{
36}
37
38impl<
39        TCP: AsyncRead + AsyncWrite + Unpin,
40        ETCP,
41        ITCP: Stream<Item = Result<TCP, ETCP>> + Unpin,
42        EC: Debug + 'static,
43        EA: Debug + 'static,
44    > Incoming<TCP, ETCP, ITCP, EC, EA>
45{
46    pub fn new(
47        tcp_incoming: ITCP,
48        state: AcmeState<EC, EA>,
49        acceptor: AcmeAcceptor,
50        alpn_protocols: Vec<Vec<u8>>,
51    ) -> Self {
52        let mut config = ServerConfig::builder()
53            .with_no_client_auth()
54            .with_cert_resolver(state.resolver());
55        config.alpn_protocols = alpn_protocols;
56        Self {
57            state,
58            acceptor,
59            rustls_config: Arc::new(config),
60            tcp_incoming: Some(tcp_incoming),
61            acme_accepting: FuturesUnordered::new(),
62            tls_accepting: FuturesUnordered::new(),
63        }
64    }
65}
66
67impl<
68        TCP: AsyncRead + AsyncWrite + Unpin,
69        ETCP,
70        ITCP: Stream<Item = Result<TCP, ETCP>> + Unpin,
71        EC: Debug + 'static,
72        EA: Debug + 'static,
73    > Stream for Incoming<TCP, ETCP, ITCP, EC, EA>
74{
75    type Item = Result<TlsStream<TCP>, ETCP>;
76
77    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
78        loop {
79            match Pin::new(&mut self.state).poll_next(cx) {
80                Poll::Ready(Some(event)) => {
81                    match event {
82                        Ok(ok) => log::info!("event: {:?}", ok),
83                        Err(err) => log::error!("event: {:?}", err),
84                    }
85                    continue;
86                }
87                Poll::Ready(None) => unreachable!(),
88                Poll::Pending => {}
89            }
90            match Pin::new(&mut self.acme_accepting).poll_next(cx) {
91                Poll::Ready(Some(Ok(Some(tls)))) => self
92                    .tls_accepting
93                    .push(tls.into_stream(self.rustls_config.clone())),
94                Poll::Ready(Some(Ok(None))) => {
95                    log::info!("received TLS-ALPN-01 validation request");
96                    continue;
97                }
98                Poll::Ready(Some(Err(err))) => {
99                    log::error!("tls accept failed, {:?}", err);
100                    continue;
101                }
102                Poll::Ready(None) | Poll::Pending => {}
103            }
104            match Pin::new(&mut self.tls_accepting).poll_next(cx) {
105                Poll::Ready(Some(Ok(tls))) => return Poll::Ready(Some(Ok(tls))),
106                Poll::Ready(Some(Err(err))) => {
107                    log::error!("tls accept failed, {:?}", err);
108                    continue;
109                }
110                Poll::Ready(None) | Poll::Pending => {}
111            }
112            let tcp_incoming = match &mut self.tcp_incoming {
113                Some(tcp_incoming) => tcp_incoming,
114                None => match self.is_terminated() {
115                    true => return Poll::Ready(None),
116                    false => return Poll::Pending,
117                },
118            };
119            match Pin::new(tcp_incoming).poll_next(cx) {
120                Poll::Ready(Some(Ok(tcp))) => self.acme_accepting.push(self.acceptor.accept(tcp)),
121                Poll::Ready(Some(Err(err))) => return Poll::Ready(Some(Err(err))),
122                Poll::Ready(None) => drop(self.tcp_incoming.as_mut().take()),
123                Poll::Pending => return Poll::Pending,
124            }
125        }
126    }
127}
128
129impl<
130        TCP: AsyncRead + AsyncWrite + Unpin,
131        ETCP,
132        ITCP: Stream<Item = Result<TCP, ETCP>> + Unpin,
133        EC: Debug + 'static,
134        EA: Debug + 'static,
135    > FusedStream for Incoming<TCP, ETCP, ITCP, EC, EA>
136{
137    fn is_terminated(&self) -> bool {
138        self.tcp_incoming.is_none()
139            && self.acme_accepting.is_terminated()
140            && self.tls_accepting.is_terminated()
141    }
142}