nntp_proxy/session/handlers/
standard.rs

1//! Standard 1:1 routing mode handler
2
3use crate::session::ClientSession;
4use anyhow::Result;
5use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
6use tokio::net::TcpStream;
7use tracing::{debug, warn};
8
9use crate::command::CommandHandler;
10use crate::constants::buffer::COMMAND;
11use crate::types::BytesTransferred;
12
13impl ClientSession {
14    /// Handle a client connection with a dedicated backend connection (standard 1:1 mode)
15    pub async fn handle_with_pooled_backend<T>(
16        &self,
17        mut client_stream: TcpStream,
18        backend_conn: T,
19    ) -> Result<(u64, u64)>
20    where
21        T: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
22    {
23        use tokio::io::BufReader;
24
25        // Split streams for independent read/write
26        let (client_read, mut client_write) = client_stream.split();
27        let (mut backend_read, mut backend_write) = tokio::io::split(backend_conn);
28        let mut client_reader = BufReader::new(client_read);
29
30        let mut client_to_backend_bytes = BytesTransferred::zero();
31        let mut backend_to_client_bytes = BytesTransferred::zero();
32
33        // Reuse line buffer to avoid per-iteration allocations
34        let mut line = String::with_capacity(COMMAND);
35
36        // Auth state: username from AUTHINFO USER command
37        let mut auth_username: Option<String> = None;
38
39        // PERFORMANCE: Cache authenticated state to avoid atomic loads after auth succeeds
40        // Auth is disabled or auth happens once, then we skip checks for rest of session
41        let mut skip_auth_check = !self.auth_handler.is_enabled();
42
43        debug!("Client {} session loop starting", self.client_addr);
44
45        // Handle the initial command/response phase where we intercept auth
46        loop {
47            line.clear();
48            let mut buffer = self.buffer_pool.get_buffer().await;
49
50            tokio::select! {
51                // Read command from client
52                result = client_reader.read_line(&mut line) => {
53                    match result {
54                        Ok(0) => {
55                            debug!("Client {} disconnected (0 bytes read)", self.client_addr);
56                            break; // Client disconnected
57                        }
58                        Ok(n) => {
59                            debug!("Client {} sent {} bytes: {:?}", self.client_addr, n, line.trim());
60                            let trimmed = line.trim();
61                            debug!("Client {} command: {}", self.client_addr, trimmed);
62
63                            // PERFORMANCE OPTIMIZATION: Skip auth checking after first auth
64                            // Auth happens ONCE per session, then thousands of ARTICLE commands follow
65                            //
66                            // Cache the authenticated state to avoid atomic loads on every command.
67                            // Once authenticated, we never go back, so caching is safe.
68                            skip_auth_check = skip_auth_check || self.authenticated.load(std::sync::atomic::Ordering::Acquire);
69                            if skip_auth_check {
70                                // Already authenticated - just forward everything (HOT PATH)
71                                backend_write.write_all(line.as_bytes()).await?;
72                                client_to_backend_bytes.add(line.len());
73                            } else {
74                                // Not yet authenticated and auth is enabled - check for auth commands
75                                use crate::command::CommandAction;
76                                let action = CommandHandler::handle_command(&line);
77                                match action {
78                                    CommandAction::ForwardStateless => {
79                                        // Reject all non-auth commands before authentication
80                                        let response = b"480 Authentication required\r\n";
81                                        client_write.write_all(response).await?;
82                                        backend_to_client_bytes.add(response.len());
83                                    }
84                                    CommandAction::InterceptAuth(auth_action) => {
85                                        // Store username if this is AUTHINFO USER
86                                        if let crate::command::AuthAction::RequestPassword(ref username) = auth_action {
87                                            auth_username = Some(username.clone());
88                                        }
89
90                                        // Handle auth and validate
91                                        let (bytes, auth_success) = self
92                                            .auth_handler
93                                            .handle_auth_command(auth_action, &mut client_write, auth_username.as_deref())
94                                            .await?;
95                                        backend_to_client_bytes.add(bytes);
96
97                                        if auth_success {
98                                            self.authenticated.store(true, std::sync::atomic::Ordering::Release);
99                                        }
100                                    }
101                                    CommandAction::Reject(response) => {
102                                        // Send rejection response inline
103                                        client_write.write_all(response.as_bytes()).await?;
104                                        backend_to_client_bytes.add(response.len());
105                                    }
106                                }
107                            }
108                        }
109                        Err(e) => {
110                            warn!("Error reading from client {}: {}", self.client_addr, e);
111                            break;
112                        }
113                    }
114                }
115
116                // Read response from backend and forward to client (for non-auth commands)
117                n = buffer.read_from(&mut backend_read) => {
118                    match n {
119                        Ok(0) => {
120                            break; // Backend disconnected
121                        }
122                        Ok(n) => {
123                            client_write.write_all(&buffer[..n]).await?;
124                            backend_to_client_bytes.add(n);
125                        }
126                        Err(e) => {
127                            warn!("Error reading from backend for client {}: {}", self.client_addr, e);
128                            break;
129                        }
130                    }
131                }
132            }
133        }
134
135        Ok((
136            client_to_backend_bytes.as_u64(),
137            backend_to_client_bytes.as_u64(),
138        ))
139    }
140}