1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
use std::{
    net::SocketAddr,
    pin::Pin,
    sync::Arc,
    task::{Context, Poll},
    time::Duration,
};

use anyhow::Result;
use futures::Stream;
use hyper::server::{
    accept::Accept,
    conn::{AddrIncoming, AddrStream},
};
use log::{error, warn};
use rustls::{server::Acceptor, ServerConfig};
use tokio::sync::{mpsc, watch};
use tokio_rustls::{server::TlsStream, LazyConfigAcceptor};
use tokio_stream::{wrappers::ReceiverStream, StreamExt};

pub struct TlsIncoming {
    incoming: StreamWrapper,
    tls_config: watch::Receiver<Option<Arc<ServerConfig>>>,
}

struct StreamWrapper(AddrIncoming);

impl Stream for StreamWrapper {
    type Item = Result<AddrStream, std::io::Error>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        Pin::new(&mut self.0).poll_accept(cx)
    }
}

impl TlsIncoming {
    pub fn new(
        listen: SocketAddr,
        nodelay: bool,
        keepalive: Option<Duration>,
        tls_config: watch::Receiver<Option<Arc<ServerConfig>>>,
    ) -> Result<Self> {
        let mut incoming = AddrIncoming::bind(&listen)?;
        incoming.set_nodelay(nodelay);
        incoming.set_keepalive(keepalive);

        Ok(Self {
            incoming: StreamWrapper(incoming),
            tls_config,
        })
    }

    pub fn start(mut self) -> impl Stream<Item = Result<TlsStream<AddrStream>, std::io::Error>> {
        let (sender, receiver) = mpsc::channel::<Result<TlsStream<AddrStream>, std::io::Error>>(10);
        tokio::spawn(async move {
            loop {
                let client = match self.incoming.next().await {
                    Some(Ok(x)) => x,
                    Some(Err(e)) => {
                        error!("error during accepting TCP client: {e}");
                        continue;
                    }
                    None => break,
                };
                let Some(server_config) = self.tls_config.borrow().clone() else {
                    warn!("inbound TLS connection dropped (no certificates loaded, but were configured)");
                    continue
                };

                let lazy = LazyConfigAcceptor::new(Acceptor::default(), client);
                let sender = sender.clone();
                tokio::spawn(async move {
                    let accepted = match lazy.await {
                        Ok(x) => x,
                        Err(e) => {
                            error!("error during TLS init: {e}");
                            return;
                        }
                    };
                    let tls_stream = accepted.into_stream(server_config).await;
                    if sender.send(tls_stream).await.is_err() {
                        error!("TLS acceptor hung");
                    }
                });
            }
        });
        ReceiverStream::new(receiver)
    }
}