1use std::{io::Error as IoError, sync::Arc};
2
3use anyhow::Result;
4use event_listener::Event;
5use futures_util::stream::StreamExt;
6use tracing::{debug, error, info, instrument};
7
8use fluvio_future::net::TcpStream;
9use fluvio_future::rust_tls::{DefaultServerTlsStream, TlsAcceptor};
10
11type TerminateEvent = Arc<Event>;
12
13use crate::authenticator::{Authenticator, NullAuthenticator};
14
15type SharedAuthenticator = Arc<Box<dyn Authenticator>>;
16
17pub async fn start(addr: &str, acceptor: TlsAcceptor, target: String) -> Result<(), IoError> {
19 let builder = ProxyBuilder::new(addr.to_string(), acceptor, target);
20 builder.start().await
21}
22
23pub async fn start_with_authenticator(
25 addr: &str,
26 acceptor: TlsAcceptor,
27 target: String,
28 authenticator: Box<dyn Authenticator>,
29) -> Result<(), IoError> {
30 let builder =
31 ProxyBuilder::new(addr.to_string(), acceptor, target).with_authenticator(authenticator);
32 builder.start().await
33}
34
35pub struct ProxyBuilder {
36 addr: String,
37 acceptor: TlsAcceptor,
38 target: String,
39 authenticator: Box<dyn Authenticator>,
40 terminate: TerminateEvent,
41}
42
43impl ProxyBuilder {
44 pub fn new(addr: String, acceptor: TlsAcceptor, target: String) -> Self {
45 Self {
46 addr,
47 acceptor,
48 target,
49 authenticator: Box::new(NullAuthenticator),
50 terminate: Arc::new(Event::new()),
51 }
52 }
53
54 pub fn with_authenticator(mut self, authenticator: Box<dyn Authenticator>) -> Self {
55 self.authenticator = authenticator;
56 self
57 }
58
59 pub fn with_terminate(mut self, terminate: TerminateEvent) -> Self {
60 self.terminate = terminate;
61 self
62 }
63
64 #[instrument(skip(self))]
65 pub async fn start(self) -> Result<(), IoError> {
66 use tokio::select;
67
68 use fluvio_future::net::TcpListener;
69 use fluvio_future::task::spawn;
70
71 let listener = TcpListener::bind(&self.addr).await?;
72 info!(self.addr, "proxy started at");
73 let mut incoming = listener.incoming();
74 let shared_authenticator = Arc::new(self.authenticator);
75
76 loop {
77 select! {
78 _ = self.terminate.listen() => {
79 info!("terminate event received");
80 return Ok(());
81 }
82 incoming_stream = incoming.next() => {
83 if let Some(stream) = incoming_stream {
84 debug!("server: got connection from client");
85 if let Ok(tcp_stream) = stream {
86 let acceptor = self.acceptor.clone();
87 let target = self.target.clone();
88 spawn(process_stream(
89 acceptor,
90 tcp_stream,
91 target,
92 shared_authenticator.clone()
93 ));
94 } else {
95 error!("no stream detected");
96 return Ok(());
97 }
98
99 } else {
100 info!("no more incoming streaming");
101 return Ok(());
102 }
103 }
104
105 }
106 }
107 }
108}
109
110#[instrument(skip(acceptor, raw_stream, authenticator))]
112async fn process_stream(
113 acceptor: TlsAcceptor,
114 raw_stream: TcpStream,
115 target: String,
116 authenticator: SharedAuthenticator,
117) {
118 let source = raw_stream
119 .peer_addr()
120 .map(|addr| addr.to_string())
121 .unwrap_or_else(|_| "".to_owned());
122
123 info!(source, "new connection from");
124
125 let handshake = acceptor.accept(raw_stream).await;
126
127 match handshake {
128 Ok(inner_stream) => {
129 info!(source, "handshake success");
130 if let Err(err) = proxy(inner_stream, target, source.clone(), authenticator).await {
131 error!(?source, ?err, "error processing tls");
132 }
133 }
134 Err(err) => error!(?source, ?err, "error handshaking"),
135 }
136}
137
138#[instrument(skip(tls_stream, authenticator))]
139async fn proxy(
140 tls_stream: DefaultServerTlsStream,
141 target: String,
142 source: String,
143 authenticator: SharedAuthenticator,
144) -> Result<()> {
145 use tokio_util::compat::FuturesAsyncReadCompatExt;
146
147 debug!("trying to connect to target");
148 let tcp_stream = TcpStream::connect(&target).await?;
149 info!("open tcp stream");
150
151 let auth_success = authenticator.authenticate(&tls_stream, &tcp_stream).await?;
152 if !auth_success {
153 info!("authentication failed, dropping connection");
154 return Ok(());
155 } else {
156 info!("authentication succeeded");
157 }
158
159 debug!(?source, ?target, "starting bidirectional copy between",);
160
161 let mut tls_compat = tls_stream.compat();
163 let mut tcp_compat = tcp_stream.compat();
164
165 match tokio::io::copy_bidirectional(&mut tls_compat, &mut tcp_compat).await {
166 Ok((tls_to_target_bytes, target_to_tls_bytes)) => {
167 info!(
168 ?source,
169 ?tls_to_target_bytes,
170 ?target_to_tls_bytes,
171 "proxy connection completed",
172 );
173 }
174 Err(err) => {
175 error!(?source, ?err, "error in bidirectional");
176 }
177 }
178
179 Ok(())
180}