flv_tls_proxy/
proxy.rs

1use std::{io::Error as IoError, sync::Arc};
2
3use anyhow::Result;
4use event_listener::Event;
5use futures_util::stream::StreamExt;
6use tracing::{debug, error, info, instrument};
7
8use fluvio_future::net::TcpStream;
9use fluvio_future::rust_tls::{DefaultServerTlsStream, TlsAcceptor};
10
11type TerminateEvent = Arc<Event>;
12
13use crate::authenticator::{Authenticator, NullAuthenticator};
14
15type SharedAuthenticator = Arc<Box<dyn Authenticator>>;
16
17/// start TLS proxy at addr to target
18pub async fn start(addr: &str, acceptor: TlsAcceptor, target: String) -> Result<(), IoError> {
19    let builder = ProxyBuilder::new(addr.to_string(), acceptor, target);
20    builder.start().await
21}
22
23/// start TLS proxy with authenticator at addr to target
24pub async fn start_with_authenticator(
25    addr: &str,
26    acceptor: TlsAcceptor,
27    target: String,
28    authenticator: Box<dyn Authenticator>,
29) -> Result<(), IoError> {
30    let builder =
31        ProxyBuilder::new(addr.to_string(), acceptor, target).with_authenticator(authenticator);
32    builder.start().await
33}
34
35pub struct ProxyBuilder {
36    addr: String,
37    acceptor: TlsAcceptor,
38    target: String,
39    authenticator: Box<dyn Authenticator>,
40    terminate: TerminateEvent,
41}
42
43impl ProxyBuilder {
44    pub fn new(addr: String, acceptor: TlsAcceptor, target: String) -> Self {
45        Self {
46            addr,
47            acceptor,
48            target,
49            authenticator: Box::new(NullAuthenticator),
50            terminate: Arc::new(Event::new()),
51        }
52    }
53
54    pub fn with_authenticator(mut self, authenticator: Box<dyn Authenticator>) -> Self {
55        self.authenticator = authenticator;
56        self
57    }
58
59    pub fn with_terminate(mut self, terminate: TerminateEvent) -> Self {
60        self.terminate = terminate;
61        self
62    }
63
64    #[instrument(skip(self))]
65    pub async fn start(self) -> Result<(), IoError> {
66        use tokio::select;
67
68        use fluvio_future::net::TcpListener;
69        use fluvio_future::task::spawn;
70
71        let listener = TcpListener::bind(&self.addr).await?;
72        info!(self.addr, "proxy started at");
73        let mut incoming = listener.incoming();
74        let shared_authenticator = Arc::new(self.authenticator);
75
76        loop {
77            select! {
78                _ = self.terminate.listen() => {
79                    info!("terminate event received");
80                    return Ok(());
81                }
82                incoming_stream = incoming.next() => {
83                    if let Some(stream) = incoming_stream {
84                        debug!("server: got connection from client");
85                        if let Ok(tcp_stream) = stream {
86                            let acceptor = self.acceptor.clone();
87                            let target = self.target.clone();
88                            spawn(process_stream(
89                                acceptor,
90                                tcp_stream,
91                                target,
92                                shared_authenticator.clone()
93                            ));
94                        } else {
95                            error!("no stream detected");
96                            return Ok(());
97                        }
98
99                    } else {
100                        info!("no more incoming streaming");
101                        return Ok(());
102                    }
103                }
104
105            }
106        }
107    }
108}
109
110/// start TLS stream at addr to target
111#[instrument(skip(acceptor, raw_stream, authenticator))]
112async fn process_stream(
113    acceptor: TlsAcceptor,
114    raw_stream: TcpStream,
115    target: String,
116    authenticator: SharedAuthenticator,
117) {
118    let source = raw_stream
119        .peer_addr()
120        .map(|addr| addr.to_string())
121        .unwrap_or_else(|_| "".to_owned());
122
123    info!(source, "new connection from");
124
125    let handshake = acceptor.accept(raw_stream).await;
126
127    match handshake {
128        Ok(inner_stream) => {
129            info!(source, "handshake success");
130            if let Err(err) = proxy(inner_stream, target, source.clone(), authenticator).await {
131                error!(?source, ?err, "error processing tls");
132            }
133        }
134        Err(err) => error!(?source, ?err, "error handshaking"),
135    }
136}
137
138#[instrument(skip(tls_stream, authenticator))]
139async fn proxy(
140    tls_stream: DefaultServerTlsStream,
141    target: String,
142    source: String,
143    authenticator: SharedAuthenticator,
144) -> Result<()> {
145    use tokio_util::compat::FuturesAsyncReadCompatExt;
146
147    debug!("trying to connect to target");
148    let tcp_stream = TcpStream::connect(&target).await?;
149    info!("open tcp stream");
150
151    let auth_success = authenticator.authenticate(&tls_stream, &tcp_stream).await?;
152    if !auth_success {
153        info!("authentication failed, dropping connection");
154        return Ok(());
155    } else {
156        info!("authentication succeeded");
157    }
158
159    debug!(?source, ?target, "starting bidirectional copy between",);
160
161    // Convert futures AsyncRead/AsyncWrite to tokio AsyncRead/AsyncWrite
162    let mut tls_compat = tls_stream.compat();
163    let mut tcp_compat = tcp_stream.compat();
164
165    match tokio::io::copy_bidirectional(&mut tls_compat, &mut tcp_compat).await {
166        Ok((tls_to_target_bytes, target_to_tls_bytes)) => {
167            info!(
168                ?source,
169                ?tls_to_target_bytes,
170                ?target_to_tls_bytes,
171                "proxy connection completed",
172            );
173        }
174        Err(err) => {
175            error!(?source, ?err, "error in bidirectional");
176        }
177    }
178
179    Ok(())
180}