1use anyhow::Result;
6use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt};
7use tracing::{debug, error, warn};
8
9use crate::constants::buffer::COMMAND;
10use crate::pool::BufferPool;
11use crate::types::{BytesTransferred, TransferMetrics};
12
13pub enum ForwardResult {
15 NormalDisconnect(TransferMetrics),
17 BackendError(TransferMetrics),
19}
20
21#[allow(clippy::too_many_arguments)]
26pub async fn bidirectional_forward<R, W, B>(
27 client_reader: &mut R,
28 client_write: &mut W,
29 pooled_conn: &mut B,
30 buffer_pool: &BufferPool,
31 client_addr: std::net::SocketAddr,
32 client_to_backend_bytes: BytesTransferred,
33 backend_to_client_bytes: BytesTransferred,
34) -> Result<ForwardResult>
35where
36 R: AsyncBufReadExt + Unpin,
37 W: AsyncWriteExt + Unpin,
38 B: AsyncReadExt + AsyncWriteExt + Unpin,
39{
40 debug!(
41 "Client {} entering stateful bidirectional forwarding",
42 client_addr
43 );
44
45 let mut buffer_b2c = buffer_pool.get_buffer().await;
46 let mut command = String::with_capacity(COMMAND);
47
48 let mut c2b = client_to_backend_bytes;
49 let mut b2c = backend_to_client_bytes;
50
51 loop {
52 tokio::select! {
53 result = client_reader.read_line(&mut command) => {
55 match result {
56 Ok(0) => {
57 debug!("Client {} disconnected from stateful session", client_addr);
58 break;
59 }
60 Ok(n) => {
61 if let Err(e) = pooled_conn.write_all(command.as_bytes()).await {
62 let err: anyhow::Error = e.into();
63 if crate::pool::is_connection_error(&err) {
64 debug!("Backend write error for client {} ({}), removing connection from pool", client_addr, err);
65 return Ok(ForwardResult::BackendError(
66 TransferMetrics {
67 client_to_backend: c2b,
68 backend_to_client: b2c,
69 }
70 ));
71 }
72 debug!("Backend write error for client {}: {}", client_addr, err);
73 break;
74 }
75 c2b.add(n);
76 command.clear();
77 }
78 Err(e) => {
79 debug!("Client {} read error in stateful mode: {}", client_addr, e);
80 break;
81 }
82 }
83 }
84
85 n = buffer_b2c.read_from(pooled_conn) => {
87 match n {
88 Ok(0) => {
89 debug!("Backend disconnected while in stateful mode for client {}", client_addr);
90 break;
91 }
92 Ok(n) => {
93 if let Err(e) = client_write.write_all(&buffer_b2c[..n]).await {
94 debug!("Client write error for {}: {}", client_addr, e);
95 break;
96 }
97 b2c.add(n);
98 }
99 Err(e) => {
100 let err: anyhow::Error = e.into();
101 if crate::pool::is_connection_error(&err) {
102 debug!("Backend connection error for client {} ({}), removing from pool", client_addr, err);
103 return Ok(ForwardResult::BackendError(
104 TransferMetrics {
105 client_to_backend: c2b,
106 backend_to_client: b2c,
107 }
108 ));
109 }
110 debug!("Backend read error for client {}: {}", client_addr, err);
111 break;
112 }
113 }
114 }
115 }
116 }
117
118 Ok(ForwardResult::NormalDisconnect(TransferMetrics {
119 client_to_backend: c2b,
120 backend_to_client: b2c,
121 }))
122}
123
124pub fn log_client_error(
126 client_addr: std::net::SocketAddr,
127 error: &std::io::Error,
128 metrics: TransferMetrics,
129) {
130 let (c2b, b2c) = metrics.as_tuple();
131 match error.kind() {
132 std::io::ErrorKind::UnexpectedEof => {
133 debug!(
134 "Client {} closed connection (EOF) | ↑{} ↓{}",
135 client_addr,
136 crate::formatting::format_bytes(c2b),
137 crate::formatting::format_bytes(b2c)
138 );
139 }
140 std::io::ErrorKind::BrokenPipe => {
141 debug!(
142 "Client {} connection broken pipe | ↑{} ↓{}",
143 client_addr,
144 crate::formatting::format_bytes(c2b),
145 crate::formatting::format_bytes(b2c)
146 );
147 }
148 std::io::ErrorKind::ConnectionReset => {
149 warn!(
150 "Client {} connection reset | ↑{} ↓{}",
151 client_addr,
152 crate::formatting::format_bytes(c2b),
153 crate::formatting::format_bytes(b2c)
154 );
155 }
156 _ => {
157 warn!(
158 "Error reading from client {}: {} ({:?}) | ↑{} ↓{}",
159 client_addr,
160 error,
161 error.kind(),
162 crate::formatting::format_bytes(c2b),
163 crate::formatting::format_bytes(b2c)
164 );
165 }
166 }
167}
168
169pub fn log_routing_error(
171 client_addr: std::net::SocketAddr,
172 error: &std::io::Error,
173 command: &str,
174 metrics: TransferMetrics,
175 backend_id: crate::types::BackendId,
176) {
177 let (c2b, b2c) = metrics.as_tuple();
178 let trimmed = command.trim();
179 match error.kind() {
180 std::io::ErrorKind::BrokenPipe => {
181 warn!(
182 "Client {} disconnected during '{}' → {:?} (broken pipe) | ↑{} ↓{} | Client closed connection early",
183 client_addr,
184 trimmed,
185 backend_id,
186 crate::formatting::format_bytes(c2b),
187 crate::formatting::format_bytes(b2c)
188 );
189 }
190 std::io::ErrorKind::ConnectionReset => {
191 warn!(
192 "Client {} connection reset during '{}' → {:?} | ↑{} ↓{} | Network issue or client crash",
193 client_addr,
194 trimmed,
195 backend_id,
196 crate::formatting::format_bytes(c2b),
197 crate::formatting::format_bytes(b2c)
198 );
199 }
200 std::io::ErrorKind::ConnectionAborted => {
201 warn!(
202 "Client {} connection aborted during '{}' → {:?} | ↑{} ↓{} | Check debug logs for details",
203 client_addr,
204 trimmed,
205 backend_id,
206 crate::formatting::format_bytes(c2b),
207 crate::formatting::format_bytes(b2c)
208 );
209 }
210 _ => {
211 error!(
212 "Client {} error during '{}' → {:?}: {} ({:?}) | ↑{} ↓{} | Check debug logs",
213 client_addr,
214 trimmed,
215 backend_id,
216 error,
217 error.kind(),
218 crate::formatting::format_bytes(c2b),
219 crate::formatting::format_bytes(b2c)
220 );
221 }
222 }
223}