Skip to main content

gatel_core/stream/
mod.rs

1//! L4 TCP stream proxy.
2//!
3//! Provides simple bidirectional TCP proxying for non-HTTP protocols such as
4//! MySQL, Redis, or any other TCP-based service. Each configured listener
5//! binds on a local address and forwards all traffic to a single upstream
6//! target using `tokio::io::copy_bidirectional`.
7
8use tokio::net::TcpListener;
9use tokio::task::JoinHandle;
10use tracing::{debug, error, info};
11
12use crate::ProxyError;
13use crate::config::StreamConfig;
14
15/// Start TCP stream listeners for all entries in the given `StreamConfig`.
16///
17/// Returns a `Vec<JoinHandle<()>>` — one per listener. Each handle runs
18/// indefinitely, accepting connections and proxying them.
19pub async fn start_stream_listeners(
20    config: &StreamConfig,
21) -> Result<Vec<JoinHandle<()>>, ProxyError> {
22    let mut handles = Vec::with_capacity(config.listeners.len());
23
24    for listener_cfg in &config.listeners {
25        let listen_addr = listener_cfg.listen;
26        let target = listener_cfg.proxy.clone();
27
28        let tcp_listener = TcpListener::bind(listen_addr).await?;
29        info!(
30            listen = %listen_addr,
31            target = %target,
32            "stream proxy listening"
33        );
34
35        let handle = tokio::spawn(async move {
36            run_stream_listener(tcp_listener, target).await;
37        });
38
39        handles.push(handle);
40    }
41
42    Ok(handles)
43}
44
45/// Accept loop for a single stream listener.
46///
47/// For each incoming connection, spawns a task that connects to the upstream
48/// target and copies bytes bidirectionally.
49async fn run_stream_listener(listener: TcpListener, target: String) {
50    loop {
51        let (inbound, client_addr) = match listener.accept().await {
52            Ok(conn) => conn,
53            Err(e) => {
54                error!("stream accept error: {e}");
55                continue;
56            }
57        };
58
59        let target = target.clone();
60        tokio::spawn(async move {
61            debug!(
62                client = %client_addr,
63                target = %target,
64                "stream proxy: new connection"
65            );
66
67            match tokio::net::TcpStream::connect(&target).await {
68                Ok(outbound) => {
69                    if let Err(e) = proxy_bidirectional(inbound, outbound).await {
70                        debug!(
71                            client = %client_addr,
72                            target = %target,
73                            error = %e,
74                            "stream proxy: connection ended"
75                        );
76                    } else {
77                        debug!(
78                            client = %client_addr,
79                            target = %target,
80                            "stream proxy: connection closed"
81                        );
82                    }
83                }
84                Err(e) => {
85                    error!(
86                        client = %client_addr,
87                        target = %target,
88                        error = %e,
89                        "stream proxy: failed to connect to upstream"
90                    );
91                }
92            }
93        });
94    }
95}
96
97/// Copy bytes bidirectionally between two TCP streams until either side
98/// closes or an error occurs.
99async fn proxy_bidirectional(
100    mut inbound: tokio::net::TcpStream,
101    mut outbound: tokio::net::TcpStream,
102) -> Result<(), std::io::Error> {
103    let (client_to_server, server_to_client) =
104        tokio::io::copy_bidirectional(&mut inbound, &mut outbound).await?;
105
106    debug!(
107        client_to_server = client_to_server,
108        server_to_client = server_to_client,
109        "stream proxy: transfer complete"
110    );
111
112    Ok(())
113}