nntp_proxy/session/handlers/standard.rs
1//! Standard 1:1 routing mode handler
2
3use crate::session::ClientSession;
4use anyhow::Result;
5use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
6use tokio::net::TcpStream;
7use tracing::{debug, warn};
8
9use crate::command::CommandHandler;
10use crate::constants::buffer::COMMAND;
11use crate::types::BytesTransferred;
12
13impl ClientSession {
14 /// Handle a client connection with a dedicated backend connection (standard 1:1 mode)
15 pub async fn handle_with_pooled_backend<T>(
16 &self,
17 mut client_stream: TcpStream,
18 backend_conn: T,
19 ) -> Result<(u64, u64)>
20 where
21 T: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
22 {
23 use tokio::io::BufReader;
24
25 // Split streams for independent read/write
26 let (client_read, mut client_write) = client_stream.split();
27 let (mut backend_read, mut backend_write) = tokio::io::split(backend_conn);
28 let mut client_reader = BufReader::new(client_read);
29
30 let mut client_to_backend_bytes = BytesTransferred::zero();
31 let mut backend_to_client_bytes = BytesTransferred::zero();
32
33 // Reuse line buffer to avoid per-iteration allocations
34 let mut line = String::with_capacity(COMMAND);
35
36 // Auth state: username from AUTHINFO USER command
37 let mut auth_username: Option<String> = None;
38
39 // PERFORMANCE: Cache authenticated state to avoid atomic loads after auth succeeds
40 // Auth is disabled or auth happens once, then we skip checks for rest of session
41 let mut skip_auth_check = !self.auth_handler.is_enabled();
42
43 debug!("Client {} session loop starting", self.client_addr);
44
45 // Handle the initial command/response phase where we intercept auth
46 loop {
47 line.clear();
48 let mut buffer = self.buffer_pool.get_buffer().await;
49
50 tokio::select! {
51 // Read command from client
52 result = client_reader.read_line(&mut line) => {
53 match result {
54 Ok(0) => {
55 debug!("Client {} disconnected (0 bytes read)", self.client_addr);
56 break; // Client disconnected
57 }
58 Ok(n) => {
59 debug!("Client {} sent {} bytes: {:?}", self.client_addr, n, line.trim());
60 let trimmed = line.trim();
61 debug!("Client {} command: {}", self.client_addr, trimmed);
62
63 // PERFORMANCE OPTIMIZATION: Skip auth checking after first auth
64 // Auth happens ONCE per session, then thousands of ARTICLE commands follow
65 //
66 // Cache the authenticated state to avoid atomic loads on every command.
67 // Once authenticated, we never go back, so caching is safe.
68 skip_auth_check = skip_auth_check || self.authenticated.load(std::sync::atomic::Ordering::Acquire);
69 if skip_auth_check {
70 // Already authenticated - just forward everything (HOT PATH)
71 backend_write.write_all(line.as_bytes()).await?;
72 client_to_backend_bytes.add(line.len());
73 } else {
74 // Not yet authenticated and auth is enabled - check for auth commands
75 use crate::command::CommandAction;
76 let action = CommandHandler::handle_command(&line);
77 match action {
78 CommandAction::ForwardStateless => {
79 // Reject all non-auth commands before authentication
80 let response = b"480 Authentication required\r\n";
81 client_write.write_all(response).await?;
82 backend_to_client_bytes.add(response.len());
83 }
84 CommandAction::InterceptAuth(auth_action) => {
85 // Store username if this is AUTHINFO USER
86 if let crate::command::AuthAction::RequestPassword(ref username) = auth_action {
87 auth_username = Some(username.clone());
88 }
89
90 // Handle auth and validate
91 let (bytes, auth_success) = self
92 .auth_handler
93 .handle_auth_command(auth_action, &mut client_write, auth_username.as_deref())
94 .await?;
95 backend_to_client_bytes.add(bytes);
96
97 if auth_success {
98 self.authenticated.store(true, std::sync::atomic::Ordering::Release);
99 }
100 }
101 CommandAction::Reject(response) => {
102 // Send rejection response inline
103 client_write.write_all(response.as_bytes()).await?;
104 backend_to_client_bytes.add(response.len());
105 }
106 }
107 }
108 }
109 Err(e) => {
110 warn!("Error reading from client {}: {}", self.client_addr, e);
111 break;
112 }
113 }
114 }
115
116 // Read response from backend and forward to client (for non-auth commands)
117 n = buffer.read_from(&mut backend_read) => {
118 match n {
119 Ok(0) => {
120 break; // Backend disconnected
121 }
122 Ok(n) => {
123 client_write.write_all(&buffer[..n]).await?;
124 backend_to_client_bytes.add(n);
125 }
126 Err(e) => {
127 warn!("Error reading from backend for client {}: {}", self.client_addr, e);
128 break;
129 }
130 }
131 }
132 }
133 }
134
135 Ok((
136 client_to_backend_bytes.as_u64(),
137 backend_to_client_bytes.as_u64(),
138 ))
139 }
140}