1use crate::acceptor::{AcmeAccept, AcmeAcceptor};
2use crate::AcmeState;
3use futures::stream::{FusedStream, FuturesUnordered};
4use futures::Stream;
5use rustls::ServerConfig;
6use std::fmt::Debug;
7use std::pin::Pin;
8use std::sync::Arc;
9use std::task::{Context, Poll};
10use tokio::io::{AsyncRead, AsyncWrite};
11use tokio_rustls::{server::TlsStream, Accept};
12
13pub struct Incoming<
14 TCP: AsyncRead + AsyncWrite + Unpin,
15 ETCP,
16 ITCP: Stream<Item = Result<TCP, ETCP>> + Unpin,
17 EC: Debug + 'static,
18 EA: Debug + 'static,
19> {
20 state: AcmeState<EC, EA>,
21 acceptor: AcmeAcceptor,
22 rustls_config: Arc<ServerConfig>,
23 tcp_incoming: Option<ITCP>,
24 acme_accepting: FuturesUnordered<AcmeAccept<TCP>>,
25 tls_accepting: FuturesUnordered<Accept<TCP>>,
26}
27
28impl<
29 TCP: AsyncRead + AsyncWrite + Unpin,
30 ETCP,
31 ITCP: Stream<Item = Result<TCP, ETCP>> + Unpin,
32 EC: Debug + 'static,
33 EA: Debug + 'static,
34 > Unpin for Incoming<TCP, ETCP, ITCP, EC, EA>
35{
36}
37
38impl<
39 TCP: AsyncRead + AsyncWrite + Unpin,
40 ETCP,
41 ITCP: Stream<Item = Result<TCP, ETCP>> + Unpin,
42 EC: Debug + 'static,
43 EA: Debug + 'static,
44 > Incoming<TCP, ETCP, ITCP, EC, EA>
45{
46 pub fn new(
47 tcp_incoming: ITCP,
48 state: AcmeState<EC, EA>,
49 acceptor: AcmeAcceptor,
50 alpn_protocols: Vec<Vec<u8>>,
51 ) -> Self {
52 let mut config = ServerConfig::builder()
53 .with_no_client_auth()
54 .with_cert_resolver(state.resolver());
55 config.alpn_protocols = alpn_protocols;
56 Self {
57 state,
58 acceptor,
59 rustls_config: Arc::new(config),
60 tcp_incoming: Some(tcp_incoming),
61 acme_accepting: FuturesUnordered::new(),
62 tls_accepting: FuturesUnordered::new(),
63 }
64 }
65}
66
67impl<
68 TCP: AsyncRead + AsyncWrite + Unpin,
69 ETCP,
70 ITCP: Stream<Item = Result<TCP, ETCP>> + Unpin,
71 EC: Debug + 'static,
72 EA: Debug + 'static,
73 > Stream for Incoming<TCP, ETCP, ITCP, EC, EA>
74{
75 type Item = Result<TlsStream<TCP>, ETCP>;
76
77 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
78 loop {
79 match Pin::new(&mut self.state).poll_next(cx) {
80 Poll::Ready(Some(event)) => {
81 match event {
82 Ok(ok) => log::info!("event: {:?}", ok),
83 Err(err) => log::error!("event: {:?}", err),
84 }
85 continue;
86 }
87 Poll::Ready(None) => unreachable!(),
88 Poll::Pending => {}
89 }
90 match Pin::new(&mut self.acme_accepting).poll_next(cx) {
91 Poll::Ready(Some(Ok(Some(tls)))) => self
92 .tls_accepting
93 .push(tls.into_stream(self.rustls_config.clone())),
94 Poll::Ready(Some(Ok(None))) => {
95 log::info!("received TLS-ALPN-01 validation request");
96 continue;
97 }
98 Poll::Ready(Some(Err(err))) => {
99 log::error!("tls accept failed, {:?}", err);
100 continue;
101 }
102 Poll::Ready(None) | Poll::Pending => {}
103 }
104 match Pin::new(&mut self.tls_accepting).poll_next(cx) {
105 Poll::Ready(Some(Ok(tls))) => return Poll::Ready(Some(Ok(tls))),
106 Poll::Ready(Some(Err(err))) => {
107 log::error!("tls accept failed, {:?}", err);
108 continue;
109 }
110 Poll::Ready(None) | Poll::Pending => {}
111 }
112 let tcp_incoming = match &mut self.tcp_incoming {
113 Some(tcp_incoming) => tcp_incoming,
114 None => match self.is_terminated() {
115 true => return Poll::Ready(None),
116 false => return Poll::Pending,
117 },
118 };
119 match Pin::new(tcp_incoming).poll_next(cx) {
120 Poll::Ready(Some(Ok(tcp))) => self.acme_accepting.push(self.acceptor.accept(tcp)),
121 Poll::Ready(Some(Err(err))) => return Poll::Ready(Some(Err(err))),
122 Poll::Ready(None) => drop(self.tcp_incoming.as_mut().take()),
123 Poll::Pending => return Poll::Pending,
124 }
125 }
126 }
127}
128
129impl<
130 TCP: AsyncRead + AsyncWrite + Unpin,
131 ETCP,
132 ITCP: Stream<Item = Result<TCP, ETCP>> + Unpin,
133 EC: Debug + 'static,
134 EA: Debug + 'static,
135 > FusedStream for Incoming<TCP, ETCP, ITCP, EC, EA>
136{
137 fn is_terminated(&self) -> bool {
138 self.tcp_incoming.is_none()
139 && self.acme_accepting.is_terminated()
140 && self.tls_accepting.is_terminated()
141 }
142}