1use tokio::net::TcpListener;
9use tokio::task::JoinHandle;
10use tracing::{debug, error, info};
11
12use crate::ProxyError;
13use crate::config::StreamConfig;
14
15pub async fn start_stream_listeners(
20 config: &StreamConfig,
21) -> Result<Vec<JoinHandle<()>>, ProxyError> {
22 let mut handles = Vec::with_capacity(config.listeners.len());
23
24 for listener_cfg in &config.listeners {
25 let listen_addr = listener_cfg.listen;
26 let target = listener_cfg.proxy.clone();
27
28 let tcp_listener = TcpListener::bind(listen_addr).await?;
29 info!(
30 listen = %listen_addr,
31 target = %target,
32 "stream proxy listening"
33 );
34
35 let handle = tokio::spawn(async move {
36 run_stream_listener(tcp_listener, target).await;
37 });
38
39 handles.push(handle);
40 }
41
42 Ok(handles)
43}
44
45async fn run_stream_listener(listener: TcpListener, target: String) {
50 loop {
51 let (inbound, client_addr) = match listener.accept().await {
52 Ok(conn) => conn,
53 Err(e) => {
54 error!("stream accept error: {e}");
55 continue;
56 }
57 };
58
59 let target = target.clone();
60 tokio::spawn(async move {
61 debug!(
62 client = %client_addr,
63 target = %target,
64 "stream proxy: new connection"
65 );
66
67 match tokio::net::TcpStream::connect(&target).await {
68 Ok(outbound) => {
69 if let Err(e) = proxy_bidirectional(inbound, outbound).await {
70 debug!(
71 client = %client_addr,
72 target = %target,
73 error = %e,
74 "stream proxy: connection ended"
75 );
76 } else {
77 debug!(
78 client = %client_addr,
79 target = %target,
80 "stream proxy: connection closed"
81 );
82 }
83 }
84 Err(e) => {
85 error!(
86 client = %client_addr,
87 target = %target,
88 error = %e,
89 "stream proxy: failed to connect to upstream"
90 );
91 }
92 }
93 });
94 }
95}
96
97async fn proxy_bidirectional(
100 mut inbound: tokio::net::TcpStream,
101 mut outbound: tokio::net::TcpStream,
102) -> Result<(), std::io::Error> {
103 let (client_to_server, server_to_client) =
104 tokio::io::copy_bidirectional(&mut inbound, &mut outbound).await?;
105
106 debug!(
107 client_to_server = client_to_server,
108 server_to_client = server_to_client,
109 "stream proxy: transfer complete"
110 );
111
112 Ok(())
113}