Skip to main content

zlayer_proxy/stream/
tcp.rs

1//! TCP stream proxy service
2//!
3//! Implements raw TCP proxying with a standalone `serve()` method.
4//! Provides bidirectional tunneling between clients and backends.
5
6use std::net::SocketAddr;
7use std::sync::Arc;
8use tokio::net::TcpListener;
9
10use super::registry::StreamRegistry;
11
12/// TCP stream proxy service
13///
14/// Listens on a port and proxies TCP connections to registered backends
15/// using round-robin load balancing.
16pub struct TcpStreamService {
17    registry: Arc<StreamRegistry>,
18    listen_port: u16,
19}
20
21impl TcpStreamService {
22    /// Create a new TCP stream service
23    #[must_use]
24    pub fn new(registry: Arc<StreamRegistry>, listen_port: u16) -> Self {
25        Self {
26            registry,
27            listen_port,
28        }
29    }
30
31    /// Get the listen port
32    #[must_use]
33    pub fn port(&self) -> u16 {
34        self.listen_port
35    }
36
37    /// Get a reference to the registry
38    #[must_use]
39    pub fn registry(&self) -> &Arc<StreamRegistry> {
40        &self.registry
41    }
42
43    /// Run a standalone TCP accept loop on the given listener.
44    ///
45    /// For each accepted connection, resolves a backend from the registry and
46    /// spawns a task to perform bidirectional tunneling. This method runs
47    /// indefinitely until the listener encounters a fatal error.
48    pub async fn serve(self: Arc<Self>, listener: TcpListener) {
49        tracing::info!(port = self.listen_port, "TCP stream proxy listening");
50
51        loop {
52            let (client_stream, client_addr) = match listener.accept().await {
53                Ok(conn) => conn,
54                Err(e) => {
55                    // Transient errors (too many open files, etc.) -- log and retry
56                    tracing::warn!(
57                        port = self.listen_port,
58                        error = %e,
59                        "TCP accept error, retrying"
60                    );
61                    tokio::time::sleep(std::time::Duration::from_millis(50)).await;
62                    continue;
63                }
64            };
65
66            let svc = Arc::clone(&self);
67            tokio::spawn(async move {
68                svc.handle_raw_connection(client_stream, client_addr).await;
69            });
70        }
71    }
72
73    /// Handle a single raw TCP connection (resolve backend, tunnel).
74    async fn handle_raw_connection(
75        &self,
76        client_stream: tokio::net::TcpStream,
77        client_addr: SocketAddr,
78    ) {
79        // Resolve service for this port
80        let Some(service) = self.registry.resolve_tcp(self.listen_port) else {
81            tracing::warn!(
82                port = self.listen_port,
83                client = %client_addr,
84                "No service registered for TCP port"
85            );
86            return;
87        };
88
89        // Select backend using round-robin
90        let Some(backend) = service.select_backend() else {
91            tracing::warn!(
92                port = self.listen_port,
93                service = %service.name,
94                client = %client_addr,
95                "No backends available for TCP service"
96            );
97            return;
98        };
99
100        tracing::debug!(
101            port = self.listen_port,
102            service = %service.name,
103            client = %client_addr,
104            backend = %backend,
105            "Proxying TCP connection"
106        );
107
108        // Connect to the upstream backend
109        let upstream = match tokio::net::TcpStream::connect(backend).await {
110            Ok(stream) => stream,
111            Err(e) => {
112                tracing::warn!(
113                    error = %e,
114                    backend = %backend,
115                    service = %service.name,
116                    client = %client_addr,
117                    "Failed to connect to TCP backend"
118                );
119                return;
120            }
121        };
122
123        // Perform bidirectional tunneling using raw TcpStreams
124        Self::duplex_raw(client_stream, upstream).await;
125    }
126
127    /// Bidirectional data copy between two raw `TcpStreams`.
128    ///
129    /// Uses `tokio::io::copy_bidirectional` for efficient zero-copy-capable
130    /// proxying when the OS supports it (e.g. splice on Linux).
131    async fn duplex_raw(
132        mut downstream: tokio::net::TcpStream,
133        mut upstream: tokio::net::TcpStream,
134    ) {
135        match tokio::io::copy_bidirectional(&mut downstream, &mut upstream).await {
136            Ok((down_to_up, up_to_down)) => {
137                tracing::debug!(
138                    down_to_up = down_to_up,
139                    up_to_down = up_to_down,
140                    "TCP tunnel closed"
141                );
142            }
143            Err(e) => {
144                tracing::debug!(error = %e, "TCP tunnel error");
145            }
146        }
147    }
148}