nntp_proxy/session/
connection.rs

1//! Connection utilities for stateful session handling
2//!
3//! Handles bidirectional forwarding and error logging for client-backend connections.
4
5use anyhow::Result;
6use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt};
7use tracing::{debug, error, warn};
8
9use crate::constants::buffer::COMMAND;
10use crate::pool::BufferPool;
11use crate::types::{BytesTransferred, TransferMetrics};
12
13/// Result of bidirectional forwarding
14pub enum ForwardResult {
15    /// Normal disconnection
16    NormalDisconnect(TransferMetrics),
17    /// Backend error - connection should be removed from pool
18    BackendError(TransferMetrics),
19}
20
21/// Bidirectional forwarding between client and backend in stateful mode
22///
23/// Uses tokio::select! to forward data in both directions until either side disconnects.
24/// Returns ForwardResult indicating whether connection should be removed from pool.
25#[allow(clippy::too_many_arguments)]
26pub async fn bidirectional_forward<R, W, B>(
27    client_reader: &mut R,
28    client_write: &mut W,
29    pooled_conn: &mut B,
30    buffer_pool: &BufferPool,
31    client_addr: std::net::SocketAddr,
32    client_to_backend_bytes: BytesTransferred,
33    backend_to_client_bytes: BytesTransferred,
34) -> Result<ForwardResult>
35where
36    R: AsyncBufReadExt + Unpin,
37    W: AsyncWriteExt + Unpin,
38    B: AsyncReadExt + AsyncWriteExt + Unpin,
39{
40    debug!(
41        "Client {} entering stateful bidirectional forwarding",
42        client_addr
43    );
44
45    let mut buffer_b2c = buffer_pool.get_buffer().await;
46    let mut command = String::with_capacity(COMMAND);
47
48    let mut c2b = client_to_backend_bytes;
49    let mut b2c = backend_to_client_bytes;
50
51    loop {
52        tokio::select! {
53            // Read from client and forward to backend
54            result = client_reader.read_line(&mut command) => {
55                match result {
56                    Ok(0) => {
57                        debug!("Client {} disconnected from stateful session", client_addr);
58                        break;
59                    }
60                    Ok(n) => {
61                        if let Err(e) = pooled_conn.write_all(command.as_bytes()).await {
62                            let err: anyhow::Error = e.into();
63                            if crate::pool::is_connection_error(&err) {
64                                debug!("Backend write error for client {} ({}), removing connection from pool", client_addr, err);
65                                return Ok(ForwardResult::BackendError(
66                                    TransferMetrics {
67                                        client_to_backend: c2b,
68                                        backend_to_client: b2c,
69                                    }
70                                ));
71                            }
72                            debug!("Backend write error for client {}: {}", client_addr, err);
73                            break;
74                        }
75                        c2b.add(n);
76                        command.clear();
77                    }
78                    Err(e) => {
79                        debug!("Client {} read error in stateful mode: {}", client_addr, e);
80                        break;
81                    }
82                }
83            }
84
85            // Read from backend and forward to client
86            n = buffer_b2c.read_from(pooled_conn) => {
87                match n {
88                    Ok(0) => {
89                        debug!("Backend disconnected while in stateful mode for client {}", client_addr);
90                        break;
91                    }
92                    Ok(n) => {
93                        if let Err(e) = client_write.write_all(&buffer_b2c[..n]).await {
94                            debug!("Client write error for {}: {}", client_addr, e);
95                            break;
96                        }
97                        b2c.add(n);
98                    }
99                    Err(e) => {
100                        let err: anyhow::Error = e.into();
101                        if crate::pool::is_connection_error(&err) {
102                            debug!("Backend connection error for client {} ({}), removing from pool", client_addr, err);
103                            return Ok(ForwardResult::BackendError(
104                                TransferMetrics {
105                                    client_to_backend: c2b,
106                                    backend_to_client: b2c,
107                                }
108                            ));
109                        }
110                        debug!("Backend read error for client {}: {}", client_addr, err);
111                        break;
112                    }
113                }
114            }
115        }
116    }
117
118    Ok(ForwardResult::NormalDisconnect(TransferMetrics {
119        client_to_backend: c2b,
120        backend_to_client: b2c,
121    }))
122}
123
124/// Log client disconnect/error with appropriate log level and context
125pub fn log_client_error(
126    client_addr: std::net::SocketAddr,
127    error: &std::io::Error,
128    metrics: TransferMetrics,
129) {
130    let (c2b, b2c) = metrics.as_tuple();
131    match error.kind() {
132        std::io::ErrorKind::UnexpectedEof => {
133            debug!(
134                "Client {} closed connection (EOF) | ↑{} ↓{}",
135                client_addr,
136                crate::formatting::format_bytes(c2b),
137                crate::formatting::format_bytes(b2c)
138            );
139        }
140        std::io::ErrorKind::BrokenPipe => {
141            debug!(
142                "Client {} connection broken pipe | ↑{} ↓{}",
143                client_addr,
144                crate::formatting::format_bytes(c2b),
145                crate::formatting::format_bytes(b2c)
146            );
147        }
148        std::io::ErrorKind::ConnectionReset => {
149            warn!(
150                "Client {} connection reset | ↑{} ↓{}",
151                client_addr,
152                crate::formatting::format_bytes(c2b),
153                crate::formatting::format_bytes(b2c)
154            );
155        }
156        _ => {
157            warn!(
158                "Error reading from client {}: {} ({:?}) | ↑{} ↓{}",
159                client_addr,
160                error,
161                error.kind(),
162                crate::formatting::format_bytes(c2b),
163                crate::formatting::format_bytes(b2c)
164            );
165        }
166    }
167}
168
169/// Log routing command error with detailed context
170pub fn log_routing_error(
171    client_addr: std::net::SocketAddr,
172    error: &std::io::Error,
173    command: &str,
174    metrics: TransferMetrics,
175    backend_id: crate::types::BackendId,
176) {
177    let (c2b, b2c) = metrics.as_tuple();
178    let trimmed = command.trim();
179    match error.kind() {
180        std::io::ErrorKind::BrokenPipe => {
181            warn!(
182                "Client {} disconnected during '{}' → {:?} (broken pipe) | ↑{} ↓{} | Client closed connection early",
183                client_addr,
184                trimmed,
185                backend_id,
186                crate::formatting::format_bytes(c2b),
187                crate::formatting::format_bytes(b2c)
188            );
189        }
190        std::io::ErrorKind::ConnectionReset => {
191            warn!(
192                "Client {} connection reset during '{}' → {:?} | ↑{} ↓{} | Network issue or client crash",
193                client_addr,
194                trimmed,
195                backend_id,
196                crate::formatting::format_bytes(c2b),
197                crate::formatting::format_bytes(b2c)
198            );
199        }
200        std::io::ErrorKind::ConnectionAborted => {
201            warn!(
202                "Client {} connection aborted during '{}' → {:?} | ↑{} ↓{} | Check debug logs for details",
203                client_addr,
204                trimmed,
205                backend_id,
206                crate::formatting::format_bytes(c2b),
207                crate::formatting::format_bytes(b2c)
208            );
209        }
210        _ => {
211            error!(
212                "Client {} error during '{}' → {:?}: {} ({:?}) | ↑{} ↓{} | Check debug logs",
213                client_addr,
214                trimmed,
215                backend_id,
216                error,
217                error.kind(),
218                crate::formatting::format_bytes(c2b),
219                crate::formatting::format_bytes(b2c)
220            );
221        }
222    }
223}