zlayer_proxy/stream/
tcp.rs1use std::net::SocketAddr;
7use std::sync::Arc;
8use tokio::net::TcpListener;
9
10use super::registry::StreamRegistry;
11
12pub struct TcpStreamService {
17 registry: Arc<StreamRegistry>,
18 listen_port: u16,
19}
20
21impl TcpStreamService {
22 #[must_use]
24 pub fn new(registry: Arc<StreamRegistry>, listen_port: u16) -> Self {
25 Self {
26 registry,
27 listen_port,
28 }
29 }
30
31 #[must_use]
33 pub fn port(&self) -> u16 {
34 self.listen_port
35 }
36
37 #[must_use]
39 pub fn registry(&self) -> &Arc<StreamRegistry> {
40 &self.registry
41 }
42
43 pub async fn serve(self: Arc<Self>, listener: TcpListener) {
49 tracing::info!(port = self.listen_port, "TCP stream proxy listening");
50
51 loop {
52 let (client_stream, client_addr) = match listener.accept().await {
53 Ok(conn) => conn,
54 Err(e) => {
55 tracing::warn!(
57 port = self.listen_port,
58 error = %e,
59 "TCP accept error, retrying"
60 );
61 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
62 continue;
63 }
64 };
65
66 let svc = Arc::clone(&self);
67 tokio::spawn(async move {
68 svc.handle_raw_connection(client_stream, client_addr).await;
69 });
70 }
71 }
72
73 async fn handle_raw_connection(
75 &self,
76 client_stream: tokio::net::TcpStream,
77 client_addr: SocketAddr,
78 ) {
79 let Some(service) = self.registry.resolve_tcp(self.listen_port) else {
81 tracing::warn!(
82 port = self.listen_port,
83 client = %client_addr,
84 "No service registered for TCP port"
85 );
86 return;
87 };
88
89 let Some(backend) = service.select_backend() else {
91 tracing::warn!(
92 port = self.listen_port,
93 service = %service.name,
94 client = %client_addr,
95 "No backends available for TCP service"
96 );
97 return;
98 };
99
100 tracing::debug!(
101 port = self.listen_port,
102 service = %service.name,
103 client = %client_addr,
104 backend = %backend,
105 "Proxying TCP connection"
106 );
107
108 let upstream = match tokio::net::TcpStream::connect(backend).await {
110 Ok(stream) => stream,
111 Err(e) => {
112 tracing::warn!(
113 error = %e,
114 backend = %backend,
115 service = %service.name,
116 client = %client_addr,
117 "Failed to connect to TCP backend"
118 );
119 return;
120 }
121 };
122
123 Self::duplex_raw(client_stream, upstream).await;
125 }
126
127 async fn duplex_raw(
132 mut downstream: tokio::net::TcpStream,
133 mut upstream: tokio::net::TcpStream,
134 ) {
135 match tokio::io::copy_bidirectional(&mut downstream, &mut upstream).await {
136 Ok((down_to_up, up_to_down)) => {
137 tracing::debug!(
138 down_to_up = down_to_up,
139 up_to_down = up_to_down,
140 "TCP tunnel closed"
141 );
142 }
143 Err(e) => {
144 tracing::debug!(error = %e, "TCP tunnel error");
145 }
146 }
147 }
148}