Skip to main content

nntp_proxy/session/
mod.rs

1//! Session management module
2//!
3//! Handles client sessions with different routing modes.
4//!
5//! This module handles the lifecycle of a client connection, including
6//! command processing, authentication interception, and data transfer.
7//!
8//! # Quick Start
9//!
10//! ## Basic Stateful Session (1:1 mapping)
11//!
12//! ```no_run
13//! use std::net::SocketAddr;
14//! use std::sync::Arc;
15//! use nntp_proxy::session::ClientSession;
16//! use nntp_proxy::pool::BufferPool;
17//! use nntp_proxy::types::BufferSize;
18//! use nntp_proxy::auth::AuthHandler;
19//! use nntp_proxy::metrics::MetricsCollector;
20//!
21//! # fn example() -> anyhow::Result<()> {
22//! let client_addr: SocketAddr = "127.0.0.1:50000".parse()?;
23//! let buffer_pool = BufferPool::new(BufferSize::try_new(8192)?, 10);
24//! let auth = Arc::new(AuthHandler::new(None, None)?);
25//! let metrics = MetricsCollector::new(1);
26//!
27//! // Create a simple 1:1 session (no load balancing)
28//! let session = ClientSession::new(client_addr.into(), buffer_pool, auth, metrics);
29//! assert_eq!(session.mode(), nntp_proxy::session::SessionMode::Stateful);
30//! # Ok(())
31//! # }
32//! ```
33//!
34//! ## Per-Command Routing (Load Balancing)
35//!
36//! ```no_run
37//! use std::sync::Arc;
38//! use std::net::SocketAddr;
39//! use nntp_proxy::session::ClientSession;
40//! use nntp_proxy::pool::BufferPool;
41//! use nntp_proxy::router::BackendSelector;
42//! use nntp_proxy::config::RoutingMode;
43//! use nntp_proxy::types::BufferSize;
44//! use nntp_proxy::auth::AuthHandler;
45//! use nntp_proxy::metrics::MetricsCollector;
46//!
47//! # fn example() -> anyhow::Result<()> {
48//! let addr: SocketAddr = "127.0.0.1:50000".parse()?;
49//! let buffer_pool = BufferPool::new(BufferSize::try_new(8192)?, 10);
50//! let router = Arc::new(BackendSelector::new());
51//! let auth = Arc::new(AuthHandler::new(None, None)?);
52//! let metrics = MetricsCollector::new(1);
53//!
54//! // Each command routed to potentially different backend
55//! let session = ClientSession::builder(addr.into(), buffer_pool, auth, metrics)
56//!     .with_router(router)
57//!     .with_routing_mode(RoutingMode::PerCommand)
58//!     .build();
59//!
60//! assert!(session.is_per_command_routing());
61//! # Ok(())
62//! # }
63//! ```
64//!
65//! ## Hybrid Mode (Best of Both Worlds)
66//!
67//! ```no_run
68//! use std::sync::Arc;
69//! use std::net::SocketAddr;
70//! use nntp_proxy::session::ClientSession;
71//! use nntp_proxy::pool::BufferPool;
72//! use nntp_proxy::router::BackendSelector;
73//! use nntp_proxy::config::RoutingMode;
74//! use nntp_proxy::types::BufferSize;
75//! use nntp_proxy::auth::AuthHandler;
76//! use nntp_proxy::metrics::MetricsCollector;
77//!
78//! # fn example() -> anyhow::Result<()> {
79//! let addr: SocketAddr = "127.0.0.1:50000".parse()?;
80//! let buffer_pool = BufferPool::new(BufferSize::try_new(8192)?, 10);
81//! let router = Arc::new(BackendSelector::new());
82//! let auth = Arc::new(AuthHandler::new(None, None)?);
83//! let metrics = MetricsCollector::new(1);
84//!
85//! // Starts in per-command mode, auto-switches to stateful when needed
86//! let session = ClientSession::builder(addr.into(), buffer_pool, auth, metrics)
87//!     .with_router(router)
88//!     .with_routing_mode(RoutingMode::Hybrid)
89//!     .build();
90//!
91//! // Initially per-command for load balancing
92//! assert_eq!(session.mode(), nntp_proxy::session::SessionMode::PerCommand);
93//! // Will switch to Stateful automatically on GROUP, NEXT, LAST, etc.
94//! # Ok(())
95//! # }
96//! ```
97//!
98//! ## With Caching
99//!
100//! ```no_run
101//! use std::sync::Arc;
102//! use std::net::SocketAddr;
103//! use std::time::Duration;
104//! use nntp_proxy::session::ClientSession;
105//! use nntp_proxy::pool::BufferPool;
106//! use nntp_proxy::router::BackendSelector;
107//! use nntp_proxy::config::RoutingMode;
108//! use nntp_proxy::types::BufferSize;
109//! use nntp_proxy::auth::AuthHandler;
110//! use nntp_proxy::metrics::MetricsCollector;
111//! use nntp_proxy::cache::UnifiedCache;
112//!
113//! # fn example() -> anyhow::Result<()> {
114//! let addr: SocketAddr = "127.0.0.1:50000".parse()?;
115//! let buffer_pool = BufferPool::new(BufferSize::try_new(8192)?, 10);
116//! let router = Arc::new(BackendSelector::new());
117//! let auth = Arc::new(AuthHandler::new(None, None)?);
118//! let metrics = MetricsCollector::new(2); // 2 backends
119//! let cache = Arc::new(UnifiedCache::memory(1000, Duration::from_secs(3600), true));
120//!
121//! // Full-featured session with caching
122//! let session = ClientSession::builder(addr.into(), buffer_pool, auth, metrics)
123//!     .with_router(router)
124//!     .with_routing_mode(RoutingMode::Hybrid)
125//!     .with_cache(cache)
126//!     .build();
127//! # Ok(())
128//! # }
129//! ```
130//!
131//! # Architecture Overview
132//!
133//! ## Three Operating Modes
134//!
135//! 1. **Stateful (1:1) Mode** - `handle_with_pooled_backend()`
136//!    - One client maps to one backend connection for entire session
137//!    - Lowest latency, simplest model
138//!    - Used when routing_mode = Stateful
139//!
140//! 2. **Per-Command Mode (Stateless)** - `handle_per_command_routing()`
141//!    - Each command is independently routed to potentially different backends
142//!    - Enables load balancing across multiple backend servers
143//!    - Rejects stateful commands (GROUP, NEXT, LAST, etc.)
144//!    - Used when routing_mode = PerCommand
145//!
146//! 3. **Hybrid Mode** - `handle_per_command_routing()` + dynamic switching
147//!    - Starts in per-command mode (stateless) for load balancing
148//!    - Automatically switches to stateful mode when stateful command detected
149//!    - Best of both worlds: load balancing + stateful command support
150//!    - Used when routing_mode = Hybrid
151//!
152//! ## Key Functions
153//!
154//! - `handle_stateful_proxy_loop()` - **PERFORMANCE CRITICAL HOT PATH**
155//!   - Bidirectional streaming with tokio::select! for concurrent I/O
156//!   - Used by both stateful mode and hybrid mode after switching
157//!
158//! - `switch_to_stateful_mode()` - Hybrid mode transition
159//!   - Acquires dedicated backend connection
160//!   - Hands off to stateful proxy loop
161//!
162//! - `route_and_execute_command()` - Per-command orchestration
163//!   - Routes command to backend
164//!   - Handles connection pool management
165//!   - Distinguishes backend errors from client disconnects
166
167pub mod auth_state;
168pub mod backend;
169pub(crate) mod common;
170pub mod connection;
171pub mod error_classification;
172pub mod handlers;
173pub mod metrics_ext;
174pub mod mode_state;
175pub mod precheck;
176pub(crate) mod routing;
177pub mod state;
178pub mod streaming;
179
180use std::sync::Arc;
181
182use crate::auth::AuthHandler;
183use crate::config::RoutingMode;
184use crate::metrics::MetricsCollector;
185use crate::pool::BufferPool;
186use crate::router::BackendSelector;
187use crate::types::{ClientAddress, ClientId};
188
189pub use auth_state::AuthState;
190pub use metrics_ext::MetricsRecorder;
191pub use mode_state::{ModeState, SessionMode};
192pub use state::SessionLoopState;
193
194// SessionMode is now exported from mode_state module
195
196/// Represents an active client session
197pub struct ClientSession {
198    client_addr: ClientAddress,
199    buffer_pool: BufferPool,
200    /// Unique identifier for this client
201    client_id: ClientId,
202    /// Optional router for per-command routing mode
203    router: Option<Arc<BackendSelector>>,
204    /// Session mode state (encapsulates mode and routing mode)
205    mode_state: ModeState,
206    /// Authentication handler
207    auth_handler: Arc<AuthHandler>,
208    /// Authentication state (encapsulates auth status and username)
209    auth_state: AuthState,
210    /// Metrics collector for session statistics (always enabled)
211    metrics: crate::metrics::MetricsCollector,
212
213    /// Connection statistics aggregator for logging connection creation
214    connection_stats: Option<crate::metrics::ConnectionStatsAggregator>,
215
216    /// Article cache (always present - tracks backend availability even with capacity=0)
217    cache: Arc<crate::cache::UnifiedCache>,
218
219    /// Whether to cache article bodies (config-driven)
220    cache_articles: bool,
221
222    /// Whether to use adaptive availability prechecking for STAT/HEAD
223    adaptive_precheck: bool,
224}
225
226/// Builder for constructing `ClientSession` instances
227///
228/// Provides a fluent API for creating client sessions with different routing modes.
229///
230/// # Examples
231///
232/// ```
233/// use std::net::SocketAddr;
234/// use std::sync::Arc;
235/// use nntp_proxy::session::ClientSession;
236/// use nntp_proxy::pool::BufferPool;
237/// use nntp_proxy::router::BackendSelector;
238/// use nntp_proxy::config::RoutingMode;
239/// use nntp_proxy::types::BufferSize;
240/// use nntp_proxy::auth::AuthHandler;
241/// use nntp_proxy::metrics::MetricsCollector;
242///
243/// let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap();
244/// let buffer_pool = BufferPool::new(BufferSize::try_new(8192).unwrap(), 10);
245/// let auth_handler = Arc::new(AuthHandler::new(None, None).unwrap());
246/// let metrics = MetricsCollector::new(1);
247///
248/// // Stateful 1:1 routing mode
249/// let session = ClientSession::builder(addr.into(), buffer_pool.clone(), auth_handler.clone(), metrics.clone())
250///     .build();
251///
252/// // Per-command routing mode
253/// let router = Arc::new(BackendSelector::new());
254/// let session = ClientSession::builder(addr.into(), buffer_pool.clone(), auth_handler, metrics)
255///     .with_router(router)
256///     .with_routing_mode(RoutingMode::PerCommand)
257///     .build();
258/// ```
259pub struct ClientSessionBuilder {
260    client_addr: ClientAddress,
261    buffer_pool: BufferPool,
262    router: Option<Arc<BackendSelector>>,
263    routing_mode: RoutingMode,
264    auth_handler: Arc<AuthHandler>,
265    metrics: MetricsCollector,
266    connection_stats: Option<crate::metrics::ConnectionStatsAggregator>,
267    cache: Arc<crate::cache::UnifiedCache>,
268    cache_articles: bool,
269    adaptive_precheck: bool,
270}
271
272impl ClientSessionBuilder {
273    /// Configure the session to use per-command routing with a backend router
274    ///
275    /// When a router is provided, the session will route each command independently
276    /// to potentially different backend servers.
277    #[must_use]
278    pub fn with_router(mut self, router: Arc<BackendSelector>) -> Self {
279        self.router = Some(router);
280        self
281    }
282
283    /// Set the routing mode for this session
284    ///
285    /// # Arguments
286    /// * `mode` - The routing mode (Stateful, PerCommand, or Hybrid)
287    ///
288    /// Note: If you use `with_router()`, you typically want PerCommand or Hybrid mode.
289    #[must_use]
290    pub fn with_routing_mode(mut self, mode: RoutingMode) -> Self {
291        self.routing_mode = mode;
292        self
293    }
294
295    /// Set the authentication handler
296    #[must_use]
297    pub fn with_auth_handler(mut self, auth_handler: Arc<AuthHandler>) -> Self {
298        self.auth_handler = auth_handler;
299        self
300    }
301
302    /// Add connection stats aggregation to this session
303    #[must_use]
304    pub fn with_connection_stats(
305        mut self,
306        connection_stats: crate::metrics::ConnectionStatsAggregator,
307    ) -> Self {
308        self.connection_stats = Some(connection_stats);
309        self
310    }
311
312    /// Add article cache to this session (always present for backend availability tracking)
313    #[must_use]
314    pub fn with_cache(mut self, cache: Arc<crate::cache::UnifiedCache>) -> Self {
315        self.cache = cache;
316        self
317    }
318
319    /// Set whether to cache article bodies (default: true)
320    ///
321    /// When false, only backend availability is tracked (saves memory).
322    /// When true, full article bodies are cached.
323    #[must_use]
324    pub fn with_cache_articles(mut self, cache: bool) -> Self {
325        self.cache_articles = cache;
326        self
327    }
328
329    /// Configure adaptive availability prechecking
330    pub fn with_adaptive_precheck(mut self, enable: bool) -> Self {
331        self.adaptive_precheck = enable;
332        self
333    }
334
335    /// Build the client session
336    ///
337    /// Creates a new `ClientSession` with a unique client ID and the configured
338    /// routing mode.
339    #[must_use]
340    pub fn build(self) -> ClientSession {
341        let (mode, routing_mode) = match (&self.router, self.routing_mode) {
342            // Per-command or Hybrid: start in per-command mode (stateless)
343            (Some(_), RoutingMode::PerCommand | RoutingMode::Hybrid) => {
344                (SessionMode::PerCommand, self.routing_mode)
345            }
346            // Stateful mode with router: honor the Stateful mode request
347            (Some(_), RoutingMode::Stateful) => (SessionMode::Stateful, RoutingMode::Stateful),
348            // No router: always Stateful mode
349            (None, _) => (SessionMode::Stateful, RoutingMode::Stateful),
350        };
351
352        ClientSession {
353            client_addr: self.client_addr,
354            buffer_pool: self.buffer_pool,
355            client_id: ClientId::new(),
356            router: self.router,
357            mode_state: ModeState::new(mode, routing_mode),
358            auth_handler: self.auth_handler,
359            auth_state: AuthState::new(),
360            metrics: self.metrics,
361            connection_stats: self.connection_stats,
362            cache: self.cache,
363            cache_articles: self.cache_articles,
364            adaptive_precheck: self.adaptive_precheck,
365        }
366    }
367}
368
369impl ClientSession {
370    /// Create default cache for availability tracking only (no content caching)
371    fn default_cache() -> Arc<crate::cache::UnifiedCache> {
372        const DEFAULT_TTL: std::time::Duration = std::time::Duration::from_secs(3600);
373        Arc::new(crate::cache::UnifiedCache::memory(0, DEFAULT_TTL, false))
374    }
375
376    /// Create a new client session for 1:1 backend mapping
377    #[must_use]
378    pub fn new(
379        client_addr: ClientAddress,
380        buffer_pool: BufferPool,
381        auth_handler: Arc<AuthHandler>,
382        metrics: MetricsCollector,
383    ) -> Self {
384        Self {
385            client_addr,
386            buffer_pool,
387            client_id: ClientId::new(),
388            router: None,
389            mode_state: ModeState::new(SessionMode::Stateful, RoutingMode::Stateful),
390            auth_handler,
391            auth_state: AuthState::new(),
392            metrics,
393            connection_stats: None,
394            cache: Self::default_cache(),
395            cache_articles: true,
396            adaptive_precheck: false,
397        }
398    }
399
400    /// Create a new client session with per-command routing
401    ///
402    /// Each command will be routed to a potentially different backend server
403    /// using round-robin load balancing.
404    #[must_use]
405    pub fn new_with_router(
406        client_addr: ClientAddress,
407        buffer_pool: BufferPool,
408        router: Arc<BackendSelector>,
409        routing_mode: RoutingMode,
410        auth_handler: Arc<AuthHandler>,
411        metrics: MetricsCollector,
412    ) -> Self {
413        Self {
414            client_addr,
415            buffer_pool,
416            client_id: ClientId::new(),
417            router: Some(router),
418            mode_state: ModeState::new(SessionMode::PerCommand, routing_mode),
419            auth_handler,
420            auth_state: AuthState::new(),
421            metrics,
422            connection_stats: None,
423            cache: Arc::new(crate::cache::UnifiedCache::memory(
424                0,
425                std::time::Duration::from_secs(3600),
426                false,
427            )),
428            cache_articles: true,
429            adaptive_precheck: false,
430        }
431    }
432
433    /// Create a builder for constructing a client session
434    ///
435    /// # Examples
436    ///
437    /// ```
438    /// use std::net::SocketAddr;
439    /// use std::sync::Arc;
440    /// use nntp_proxy::session::ClientSession;
441    /// use nntp_proxy::pool::BufferPool;
442    /// use nntp_proxy::types::BufferSize;
443    /// use nntp_proxy::auth::AuthHandler;
444    /// use nntp_proxy::metrics::MetricsCollector;
445    ///
446    /// let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap();
447    /// let buffer_pool = BufferPool::new(BufferSize::try_new(8192).unwrap(), 10);
448    /// let auth_handler = Arc::new(AuthHandler::new(None, None).unwrap());
449    /// let metrics = MetricsCollector::new(1);
450    ///
451    /// let session = ClientSession::builder(addr.into(), buffer_pool, auth_handler, metrics)
452    ///     .build();
453    /// ```
454    #[must_use]
455    pub fn builder(
456        client_addr: ClientAddress,
457        buffer_pool: BufferPool,
458        auth_handler: Arc<AuthHandler>,
459        metrics: MetricsCollector,
460    ) -> ClientSessionBuilder {
461        ClientSessionBuilder {
462            client_addr,
463            buffer_pool,
464            router: None,
465            routing_mode: RoutingMode::Stateful,
466            auth_handler,
467            metrics,
468            connection_stats: None,
469            cache: Arc::new(crate::cache::UnifiedCache::memory(
470                0,
471                std::time::Duration::from_secs(3600),
472                false,
473            )),
474            cache_articles: true,
475            adaptive_precheck: false,
476        }
477    }
478
479    // Getters and helper methods
480
481    /// Get the unique client ID
482    #[must_use]
483    #[inline]
484    pub fn client_id(&self) -> ClientId {
485        self.client_id
486    }
487
488    /// Check if this session is using per-command routing
489    ///
490    /// Returns true if this session has a router available (regardless of current mode).
491    /// This is slightly different from checking routing mode - a session can have a router
492    /// but be in Stateful mode (e.g., after hybrid mode switches).
493    #[must_use]
494    #[inline]
495    pub fn is_per_command_routing(&self) -> bool {
496        self.router.is_some()
497    }
498
499    /// Get the current session mode
500    #[must_use]
501    #[inline]
502    pub fn mode(&self) -> SessionMode {
503        self.mode_state.mode()
504    }
505
506    /// Get the authenticated username (if any) - zero-cost reference
507    ///
508    /// Returns the authenticated username as an `Arc<str>` for cheap cloning.
509    /// Returns None if the client has not authenticated yet.
510    #[inline]
511    #[must_use]
512    pub fn username(&self) -> Option<Arc<str>> {
513        self.auth_state.username()
514    }
515
516    /// Set the authenticated username (write-once)
517    ///
518    /// This marks the session as authenticated and stores the username.
519    /// Called after successful authentication with the backend.
520    pub(crate) fn set_username(&self, username: Option<String>) {
521        if let Some(name) = username {
522            self.auth_state.mark_authenticated(name);
523        }
524    }
525
526    /// Get the connection stats aggregator (if enabled)
527    #[must_use]
528    #[inline]
529    pub(crate) fn connection_stats(&self) -> Option<&crate::metrics::ConnectionStatsAggregator> {
530        self.connection_stats.as_ref()
531    }
532
533    /// Check if already authenticated (cached for performance)
534    ///
535    /// # Arguments
536    /// * `skip_auth_check` - If true, bypasses the authentication check
537    ///
538    /// # Returns
539    /// Returns true if authenticated or if skip_auth_check is true
540    #[inline]
541    pub(crate) fn is_authenticated_cached(&self, skip_auth_check: bool) -> bool {
542        self.auth_state.is_authenticated_or_skipped(skip_auth_check)
543    }
544}
545
546#[cfg(test)]
547mod tests {
548    use super::*;
549    use crate::auth::AuthHandler;
550    use crate::metrics::MetricsCollector;
551    use crate::types::BufferSize;
552    use std::net::{IpAddr, Ipv4Addr, SocketAddr};
553    use std::sync::Arc;
554
555    /// Helper to create a default AuthHandler for tests (no auth)
556    fn test_auth_handler() -> Arc<AuthHandler> {
557        Arc::new(AuthHandler::new(None, None).unwrap())
558    }
559
560    /// Helper to create a MetricsCollector for tests
561    fn test_metrics() -> MetricsCollector {
562        MetricsCollector::new(1)
563    }
564
565    #[test]
566    fn test_client_session_creation() {
567        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
568        let buffer_pool = BufferPool::new(BufferSize::try_new(1024).unwrap(), 4);
569        let session = ClientSession::new(
570            addr.into(),
571            buffer_pool.clone(),
572            test_auth_handler(),
573            test_metrics(),
574        );
575
576        assert_eq!(session.client_addr.port(), 8080);
577        assert_eq!(
578            session.client_addr.ip(),
579            IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))
580        );
581    }
582
583    #[test]
584    fn test_client_session_with_different_ports() {
585        let buffer_pool = BufferPool::new(BufferSize::try_new(1024).unwrap(), 4);
586
587        let addr1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
588        let session1 = ClientSession::new(
589            addr1.into(),
590            buffer_pool.clone(),
591            test_auth_handler(),
592            test_metrics(),
593        );
594
595        let addr2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 9090);
596        let session2 = ClientSession::new(
597            addr2.into(),
598            buffer_pool.clone(),
599            test_auth_handler(),
600            test_metrics(),
601        );
602
603        assert_ne!(session1.client_addr.port(), session2.client_addr.port());
604        assert_eq!(session1.client_addr.port(), 8080);
605        assert_eq!(session2.client_addr.port(), 9090);
606    }
607
608    #[test]
609    fn test_client_session_with_ipv6() {
610        let buffer_pool = BufferPool::new(BufferSize::try_new(1024).unwrap(), 4);
611        let addr = SocketAddr::new(IpAddr::V6("::1".parse().unwrap()), 8119);
612        let session = ClientSession::new(
613            addr.into(),
614            buffer_pool,
615            test_auth_handler(),
616            test_metrics(),
617        );
618
619        assert_eq!(session.client_addr.port(), 8119);
620        assert!(session.client_addr.is_ipv6());
621    }
622
623    #[test]
624    fn test_buffer_pool_cloning() {
625        let buffer_pool = BufferPool::new(BufferSize::try_new(8192).unwrap(), 10);
626        let buffer_pool_clone = buffer_pool.clone();
627
628        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 1234);
629        let _session1 = ClientSession::new(
630            addr.into(),
631            buffer_pool,
632            test_auth_handler(),
633            test_metrics(),
634        );
635        let _session2 = ClientSession::new(
636            addr.into(),
637            buffer_pool_clone,
638            test_auth_handler(),
639            test_metrics(),
640        );
641    }
642
643    #[test]
644    fn test_session_addr_formatting() {
645        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), 5555);
646        let buffer_pool = BufferPool::new(BufferSize::try_new(1024).unwrap(), 4);
647        let session = ClientSession::new(
648            addr.into(),
649            buffer_pool,
650            test_auth_handler(),
651            test_metrics(),
652        );
653
654        let addr_str = format!("{}", session.client_addr);
655        assert!(addr_str.contains("10.0.0.1"));
656        assert!(addr_str.contains("5555"));
657    }
658
659    #[test]
660    fn test_multiple_sessions_same_buffer_pool() {
661        let buffer_pool = BufferPool::new(BufferSize::try_new(4096).unwrap(), 8);
662        let sessions: Vec<_> = (0..5)
663            .map(|i| {
664                let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8000 + i);
665                ClientSession::new(
666                    addr.into(),
667                    buffer_pool.clone(),
668                    test_auth_handler(),
669                    test_metrics(),
670                )
671            })
672            .collect();
673
674        assert_eq!(sessions.len(), 5);
675        for (i, session) in sessions.iter().enumerate() {
676            assert_eq!(session.client_addr.port(), 8000 + i as u16);
677        }
678    }
679
680    #[test]
681    fn test_loopback_address() {
682        let buffer_pool = BufferPool::new(BufferSize::try_new(1024).unwrap(), 4);
683        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8119);
684        let session = ClientSession::new(
685            addr.into(),
686            buffer_pool,
687            test_auth_handler(),
688            test_metrics(),
689        );
690
691        assert!(session.client_addr.ip().is_loopback());
692    }
693
694    #[test]
695    fn test_unspecified_address() {
696        let buffer_pool = BufferPool::new(BufferSize::try_new(1024).unwrap(), 4);
697        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0);
698        let session = ClientSession::new(
699            addr.into(),
700            buffer_pool,
701            test_auth_handler(),
702            test_metrics(),
703        );
704
705        assert!(session.client_addr.ip().is_unspecified());
706        assert_eq!(session.client_addr.port(), 0);
707    }
708
709    #[test]
710    fn test_session_without_router() {
711        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
712        let buffer_pool = BufferPool::new(BufferSize::try_new(1024).unwrap(), 4);
713        let session = ClientSession::new(
714            addr.into(),
715            buffer_pool,
716            test_auth_handler(),
717            test_metrics(),
718        );
719
720        assert!(!session.is_per_command_routing());
721        assert_eq!(session.client_addr.port(), 8080);
722    }
723
724    #[test]
725    fn test_session_with_router() {
726        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
727        let buffer_pool = BufferPool::new(BufferSize::try_new(1024).unwrap(), 4);
728        let router = Arc::new(BackendSelector::new());
729        let session = ClientSession::new_with_router(
730            addr.into(),
731            buffer_pool,
732            router,
733            RoutingMode::PerCommand,
734            test_auth_handler(),
735            test_metrics(),
736        );
737
738        assert!(session.is_per_command_routing());
739        assert_eq!(session.client_addr.port(), 8080);
740    }
741
742    #[test]
743    fn test_client_id_uniqueness() {
744        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
745        let buffer_pool = BufferPool::new(BufferSize::try_new(1024).unwrap(), 4);
746
747        let session1 = ClientSession::new(
748            addr.into(),
749            buffer_pool.clone(),
750            test_auth_handler(),
751            test_metrics(),
752        );
753        let session2 = ClientSession::new(
754            addr.into(),
755            buffer_pool,
756            test_auth_handler(),
757            test_metrics(),
758        );
759
760        assert_ne!(session1.client_id(), session2.client_id());
761    }
762
763    #[test]
764    fn test_session_mode_enum() {
765        assert_eq!(SessionMode::PerCommand, SessionMode::PerCommand);
766        assert_eq!(SessionMode::Stateful, SessionMode::Stateful);
767        assert_ne!(SessionMode::PerCommand, SessionMode::Stateful);
768
769        let per_command = format!("{:?}", SessionMode::PerCommand);
770        let stateful = format!("{:?}", SessionMode::Stateful);
771        assert!(per_command.contains("PerCommand"));
772        assert!(stateful.contains("Stateful"));
773    }
774
775    #[test]
776    fn test_hybrid_session_creation() {
777        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
778        let buffer_pool = BufferPool::new(BufferSize::try_new(1024).unwrap(), 4);
779        let router = Arc::new(BackendSelector::new());
780
781        let session = ClientSession::new_with_router(
782            addr.into(),
783            buffer_pool,
784            router,
785            RoutingMode::Hybrid,
786            test_auth_handler(),
787            test_metrics(),
788        );
789
790        assert!(session.is_per_command_routing());
791        assert_eq!(session.mode_state.routing_mode(), RoutingMode::Hybrid);
792        assert_eq!(session.mode(), SessionMode::PerCommand);
793    }
794
795    #[test]
796    fn test_routing_mode_configurations() {
797        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
798        let buffer_pool = BufferPool::new(BufferSize::try_new(1024).unwrap(), 4);
799        let router = Arc::new(BackendSelector::new());
800
801        // Stateful mode
802        let session = ClientSession::new_with_router(
803            addr.into(),
804            buffer_pool.clone(),
805            router.clone(),
806            RoutingMode::Stateful,
807            test_auth_handler(),
808            test_metrics(),
809        );
810        assert!(session.is_per_command_routing());
811        assert_eq!(session.mode_state.routing_mode(), RoutingMode::Stateful);
812
813        // PerCommand mode
814        let session = ClientSession::new_with_router(
815            addr.into(),
816            buffer_pool.clone(),
817            router.clone(),
818            RoutingMode::PerCommand,
819            test_auth_handler(),
820            test_metrics(),
821        );
822        assert!(session.is_per_command_routing());
823        assert_eq!(session.mode_state.routing_mode(), RoutingMode::PerCommand);
824        assert_eq!(session.mode(), SessionMode::PerCommand);
825
826        // Hybrid mode
827        let session = ClientSession::new_with_router(
828            addr.into(),
829            buffer_pool,
830            router,
831            RoutingMode::Hybrid,
832            test_auth_handler(),
833            test_metrics(),
834        );
835        assert!(session.is_per_command_routing());
836        assert_eq!(session.mode_state.routing_mode(), RoutingMode::Hybrid);
837        assert_eq!(session.mode(), SessionMode::PerCommand);
838    }
839
840    #[test]
841    fn test_hybrid_mode_initial_state() {
842        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
843        let buffer_pool = BufferPool::new(BufferSize::try_new(1024).unwrap(), 4);
844        let router = Arc::new(BackendSelector::new());
845
846        let session = ClientSession::new_with_router(
847            addr.into(),
848            buffer_pool,
849            router,
850            RoutingMode::Hybrid,
851            test_auth_handler(),
852            test_metrics(),
853        );
854
855        assert_eq!(session.mode(), SessionMode::PerCommand);
856        assert_eq!(session.mode_state.routing_mode(), RoutingMode::Hybrid);
857        assert!(session.is_per_command_routing());
858    }
859
860    #[test]
861    fn test_is_per_command_routing_logic() {
862        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
863        let buffer_pool = BufferPool::new(BufferSize::try_new(1024).unwrap(), 4);
864        let router = Arc::new(BackendSelector::new());
865
866        // Stateful mode has router capability
867        let session = ClientSession::new_with_router(
868            addr.into(),
869            buffer_pool.clone(),
870            router.clone(),
871            RoutingMode::Stateful,
872            test_auth_handler(),
873            test_metrics(),
874        );
875        assert!(session.is_per_command_routing());
876
877        // PerCommand mode
878        let session = ClientSession::new_with_router(
879            addr.into(),
880            buffer_pool.clone(),
881            router.clone(),
882            RoutingMode::PerCommand,
883            test_auth_handler(),
884            test_metrics(),
885        );
886        assert!(session.is_per_command_routing());
887
888        // Hybrid mode (initially)
889        let session = ClientSession::new_with_router(
890            addr.into(),
891            buffer_pool.clone(),
892            router,
893            RoutingMode::Hybrid,
894            test_auth_handler(),
895            test_metrics(),
896        );
897        assert!(session.is_per_command_routing());
898
899        // Session without router
900        let session = ClientSession::new(
901            addr.into(),
902            buffer_pool,
903            test_auth_handler(),
904            test_metrics(),
905        );
906        assert!(!session.is_per_command_routing());
907    }
908
909    // ==================== ClientSessionBuilder Tests ====================
910
911    #[test]
912    fn test_builder_basic_construction() {
913        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
914        let buffer_pool = BufferPool::new(BufferSize::try_new(1024).unwrap(), 4);
915        let auth_handler = test_auth_handler();
916
917        let session =
918            ClientSession::builder(addr.into(), buffer_pool, auth_handler, test_metrics()).build();
919
920        assert_eq!(*session.client_addr, addr);
921        assert!(!session.is_per_command_routing());
922        assert_eq!(session.mode(), SessionMode::Stateful);
923        assert_eq!(session.mode_state.routing_mode(), RoutingMode::Stateful);
924    }
925
926    #[test]
927    fn test_builder_with_router() {
928        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
929        let buffer_pool = BufferPool::new(BufferSize::try_new(1024).unwrap(), 4);
930        let router = Arc::new(BackendSelector::new());
931
932        let session = ClientSession::builder(
933            addr.into(),
934            buffer_pool,
935            test_auth_handler(),
936            test_metrics(),
937        )
938        .with_router(router)
939        .with_routing_mode(RoutingMode::PerCommand)
940        .build();
941
942        assert!(session.is_per_command_routing());
943        assert_eq!(session.mode(), SessionMode::PerCommand);
944        assert_eq!(session.mode_state.routing_mode(), RoutingMode::PerCommand);
945    }
946
947    #[test]
948    fn test_builder_with_hybrid_mode() {
949        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
950        let buffer_pool = BufferPool::new(BufferSize::try_new(1024).unwrap(), 4);
951        let router = Arc::new(BackendSelector::new());
952
953        let session = ClientSession::builder(
954            addr.into(),
955            buffer_pool,
956            test_auth_handler(),
957            test_metrics(),
958        )
959        .with_router(router)
960        .with_routing_mode(RoutingMode::Hybrid)
961        .build();
962
963        assert!(session.is_per_command_routing());
964        assert_eq!(session.mode(), SessionMode::PerCommand);
965        assert_eq!(session.mode_state.routing_mode(), RoutingMode::Hybrid);
966    }
967
968    #[test]
969    fn test_builder_with_stateful_mode() {
970        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
971        let buffer_pool = BufferPool::new(BufferSize::try_new(1024).unwrap(), 4);
972        let router = Arc::new(BackendSelector::new());
973
974        // Builder with router but Stateful mode requested
975        let session = ClientSession::builder(
976            addr.into(),
977            buffer_pool,
978            test_auth_handler(),
979            test_metrics(),
980        )
981        .with_router(router)
982        .with_routing_mode(RoutingMode::Stateful)
983        .build();
984
985        assert!(session.is_per_command_routing());
986        assert_eq!(session.mode(), SessionMode::Stateful);
987        assert_eq!(session.mode_state.routing_mode(), RoutingMode::Stateful);
988    }
989
990    #[test]
991    fn test_builder_without_router_ignores_mode() {
992        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
993        let buffer_pool = BufferPool::new(BufferSize::try_new(1024).unwrap(), 4);
994
995        // Request PerCommand mode but no router - should default to Stateful
996        let session = ClientSession::builder(
997            addr.into(),
998            buffer_pool,
999            test_auth_handler(),
1000            test_metrics(),
1001        )
1002        .with_routing_mode(RoutingMode::PerCommand)
1003        .build();
1004
1005        assert!(!session.is_per_command_routing());
1006        assert_eq!(session.mode(), SessionMode::Stateful);
1007        assert_eq!(session.mode_state.routing_mode(), RoutingMode::Stateful);
1008    }
1009
1010    #[test]
1011    fn test_builder_with_auth_handler() {
1012        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
1013        let buffer_pool = BufferPool::new(BufferSize::try_new(1024).unwrap(), 4);
1014        let auth_handler =
1015            Arc::new(AuthHandler::new(Some("user".to_string()), Some("pass".to_string())).unwrap());
1016
1017        let session = ClientSession::builder(
1018            addr.into(),
1019            buffer_pool,
1020            test_auth_handler(),
1021            test_metrics(),
1022        )
1023        .with_auth_handler(auth_handler.clone())
1024        .build();
1025
1026        // Verify auth_handler is set (can't test internals, but creation succeeds)
1027        assert_eq!(*session.client_addr, addr);
1028    }
1029
1030    #[test]
1031    fn test_builder_with_metrics() {
1032        use crate::metrics::MetricsCollector;
1033
1034        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
1035        let buffer_pool = BufferPool::new(BufferSize::try_new(1024).unwrap(), 4);
1036        let metrics = MetricsCollector::new(1); // 1 backend
1037
1038        let session =
1039            ClientSession::builder(addr.into(), buffer_pool, test_auth_handler(), metrics).build();
1040
1041        // Metrics is always present now, not optional
1042        let _ = session.metrics; // Just verify it exists
1043    }
1044
1045    #[test]
1046    fn test_builder_with_connection_stats() {
1047        use crate::metrics::ConnectionStatsAggregator;
1048
1049        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
1050        let buffer_pool = BufferPool::new(BufferSize::try_new(1024).unwrap(), 4);
1051        let stats = ConnectionStatsAggregator::default();
1052
1053        let session = ClientSession::builder(
1054            addr.into(),
1055            buffer_pool,
1056            test_auth_handler(),
1057            test_metrics(),
1058        )
1059        .with_connection_stats(stats)
1060        .build();
1061
1062        assert!(session.connection_stats().is_some());
1063    }
1064
1065    #[test]
1066    fn test_builder_with_cache() {
1067        use crate::cache::UnifiedCache;
1068        use std::time::Duration;
1069
1070        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
1071        let buffer_pool = BufferPool::new(BufferSize::try_new(1024).unwrap(), 4);
1072        let cache = Arc::new(UnifiedCache::memory(100, Duration::from_secs(3600), true));
1073
1074        let session = ClientSession::builder(
1075            addr.into(),
1076            buffer_pool,
1077            test_auth_handler(),
1078            test_metrics(),
1079        )
1080        .with_cache(cache.clone())
1081        .build();
1082
1083        // Cache is always present now (Arc not Option)
1084        assert_eq!(session.cache.capacity(), 100);
1085    }
1086
1087    #[test]
1088    fn test_builder_method_chaining() {
1089        use crate::metrics::MetricsCollector;
1090
1091        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
1092        let buffer_pool = BufferPool::new(BufferSize::try_new(1024).unwrap(), 4);
1093        let router = Arc::new(BackendSelector::new());
1094        let metrics = MetricsCollector::new(1); // 1 backend
1095
1096        // Chain all builder methods
1097        let session =
1098            ClientSession::builder(addr.into(), buffer_pool, test_auth_handler(), metrics)
1099                .with_router(router)
1100                .with_routing_mode(RoutingMode::Hybrid)
1101                .build();
1102
1103        assert!(session.is_per_command_routing());
1104        assert_eq!(session.mode(), SessionMode::PerCommand);
1105        assert_eq!(session.mode_state.routing_mode(), RoutingMode::Hybrid);
1106        // Metrics is always present now, not optional
1107        let _ = session.metrics; // Just verify it exists
1108    }
1109
1110    // ==================== Session Business Logic Tests ====================
1111
1112    #[test]
1113    fn test_mode_getter() {
1114        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
1115        let buffer_pool = BufferPool::new(BufferSize::try_new(1024).unwrap(), 4);
1116        let router = Arc::new(BackendSelector::new());
1117
1118        // Per-command mode
1119        let session = ClientSession::new_with_router(
1120            addr.into(),
1121            buffer_pool.clone(),
1122            router.clone(),
1123            RoutingMode::PerCommand,
1124            test_auth_handler(),
1125            test_metrics(),
1126        );
1127        assert_eq!(session.mode(), SessionMode::PerCommand);
1128
1129        // Stateful mode (no router)
1130        let session = ClientSession::new(
1131            addr.into(),
1132            buffer_pool,
1133            test_auth_handler(),
1134            test_metrics(),
1135        );
1136        assert_eq!(session.mode(), SessionMode::Stateful);
1137    }
1138
1139    #[test]
1140    fn test_username_initially_none() {
1141        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
1142        let buffer_pool = BufferPool::new(BufferSize::try_new(1024).unwrap(), 4);
1143        let session = ClientSession::new(
1144            addr.into(),
1145            buffer_pool,
1146            test_auth_handler(),
1147            test_metrics(),
1148        );
1149
1150        assert!(session.username().is_none());
1151    }
1152
1153    #[test]
1154    fn test_set_username_and_get() {
1155        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
1156        let buffer_pool = BufferPool::new(BufferSize::try_new(1024).unwrap(), 4);
1157        let session = ClientSession::new(
1158            addr.into(),
1159            buffer_pool,
1160            test_auth_handler(),
1161            test_metrics(),
1162        );
1163
1164        session.set_username(Some("testuser".to_string()));
1165
1166        let username = session.username();
1167        assert!(username.is_some());
1168        assert_eq!(username.as_deref(), Some("testuser"));
1169    }
1170
1171    #[test]
1172    fn test_set_username_none() {
1173        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
1174        let buffer_pool = BufferPool::new(BufferSize::try_new(1024).unwrap(), 4);
1175        let session = ClientSession::new(
1176            addr.into(),
1177            buffer_pool,
1178            test_auth_handler(),
1179            test_metrics(),
1180        );
1181
1182        session.set_username(None);
1183
1184        assert!(session.username().is_none());
1185    }
1186
1187    #[test]
1188    fn test_username_cheap_clone() {
1189        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
1190        let buffer_pool = BufferPool::new(BufferSize::try_new(1024).unwrap(), 4);
1191        let session = ClientSession::new(
1192            addr.into(),
1193            buffer_pool,
1194            test_auth_handler(),
1195            test_metrics(),
1196        );
1197
1198        session.set_username(Some("testuser".to_string()));
1199
1200        let username1 = session.username();
1201        let username2 = session.username();
1202
1203        assert!(username1.is_some());
1204        assert!(username2.is_some());
1205        assert_eq!(username1, username2);
1206        assert_eq!(username1.as_deref(), Some("testuser"));
1207    }
1208
1209    #[test]
1210    fn test_connection_stats_none_by_default() {
1211        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
1212        let buffer_pool = BufferPool::new(BufferSize::try_new(1024).unwrap(), 4);
1213        let session = ClientSession::new(
1214            addr.into(),
1215            buffer_pool,
1216            test_auth_handler(),
1217            test_metrics(),
1218        );
1219
1220        assert!(session.connection_stats().is_none());
1221    }
1222
1223    #[test]
1224    fn test_connection_stats_with_aggregator() {
1225        use crate::metrics::ConnectionStatsAggregator;
1226
1227        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
1228        let buffer_pool = BufferPool::new(BufferSize::try_new(1024).unwrap(), 4);
1229        let stats = ConnectionStatsAggregator::default();
1230
1231        let session = ClientSession::builder(
1232            addr.into(),
1233            buffer_pool,
1234            test_auth_handler(),
1235            test_metrics(),
1236        )
1237        .with_connection_stats(stats)
1238        .build();
1239
1240        assert!(session.connection_stats().is_some());
1241    }
1242
1243    // ==================== Metrics Helper Methods Tests ====================
1244
1245    #[test]
1246    fn test_metrics_direct_calls_no_metrics() {
1247        use crate::types::BackendId;
1248
1249        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
1250        let buffer_pool = BufferPool::new(BufferSize::try_new(1024).unwrap(), 4);
1251        let session = ClientSession::new(
1252            addr.into(),
1253            buffer_pool,
1254            test_auth_handler(),
1255            test_metrics(),
1256        );
1257
1258        // Metrics is always present now
1259        session.metrics.record_command(BackendId::from_index(0));
1260        session.metrics.user_command(session.username().as_deref());
1261        session.metrics.stateful_session_started();
1262        session.metrics.stateful_session_ended();
1263        session
1264            .metrics
1265            .user_bytes_sent(session.username().as_deref(), 1024);
1266        session
1267            .metrics
1268            .user_bytes_received(session.username().as_deref(), 2048);
1269    }
1270
1271    #[test]
1272    fn test_metrics_direct_calls_with_metrics() {
1273        use crate::metrics::MetricsCollector;
1274        use crate::types::BackendId;
1275
1276        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
1277        let buffer_pool = BufferPool::new(BufferSize::try_new(1024).unwrap(), 4);
1278        let metrics = MetricsCollector::new(1); // 1 backend
1279
1280        let session = ClientSession::builder(
1281            addr.into(),
1282            buffer_pool,
1283            test_auth_handler(),
1284            metrics.clone(),
1285        )
1286        .build();
1287
1288        session.set_username(Some("testuser".to_string()));
1289
1290        // Call metrics directly - should record
1291        session.metrics.record_command(BackendId::from_index(0));
1292        session.metrics.user_command(session.username().as_deref());
1293        session.metrics.stateful_session_started();
1294        session.metrics.stateful_session_ended();
1295        session
1296            .metrics
1297            .user_bytes_sent(session.username().as_deref(), 1024);
1298        session
1299            .metrics
1300            .user_bytes_received(session.username().as_deref(), 2048);
1301
1302        // Verify metrics were recorded (snapshot should have data)
1303        let snapshot = metrics.snapshot(None);
1304        assert!(!snapshot.backend_stats.is_empty());
1305        assert!(snapshot.backend_stats[0].total_commands.get() > 0);
1306    }
1307
1308    #[test]
1309    fn test_record_command_with_metrics() {
1310        use crate::metrics::MetricsCollector;
1311        use crate::types::BackendId;
1312
1313        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
1314        let buffer_pool = BufferPool::new(BufferSize::try_new(1024).unwrap(), 4);
1315        let metrics = MetricsCollector::new(1); // 1 backend
1316
1317        let session = ClientSession::builder(
1318            addr.into(),
1319            buffer_pool,
1320            test_auth_handler(),
1321            metrics.clone(),
1322        )
1323        .build();
1324
1325        let backend_id = BackendId::from_index(0);
1326        session.metrics.record_command(backend_id);
1327
1328        let snapshot = metrics.snapshot(None);
1329        assert_eq!(snapshot.backend_stats[0].total_commands.get(), 1);
1330    }
1331
1332    #[test]
1333    fn test_user_bytes_tracking() {
1334        use crate::metrics::MetricsCollector;
1335
1336        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
1337        let buffer_pool = BufferPool::new(BufferSize::try_new(1024).unwrap(), 4);
1338        let metrics = MetricsCollector::new(1); // 1 backend
1339
1340        let session = ClientSession::builder(
1341            addr.into(),
1342            buffer_pool,
1343            test_auth_handler(),
1344            metrics.clone(),
1345        )
1346        .build();
1347
1348        session.set_username(Some("testuser".to_string()));
1349        session
1350            .metrics
1351            .user_bytes_sent(session.username().as_deref(), 1024);
1352        session
1353            .metrics
1354            .user_bytes_received(session.username().as_deref(), 2048);
1355
1356        let snapshot = metrics.snapshot(None);
1357        let user_stats = snapshot
1358            .user_stats
1359            .iter()
1360            .find(|s| s.username == "testuser");
1361        assert!(user_stats.is_some());
1362
1363        let stats = user_stats.unwrap();
1364        assert_eq!(stats.bytes_sent.as_u64(), 1024);
1365        assert_eq!(stats.bytes_received.as_u64(), 2048);
1366    }
1367
1368    #[test]
1369    fn test_stateful_session_tracking() {
1370        use crate::metrics::MetricsCollector;
1371
1372        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
1373        let buffer_pool = BufferPool::new(BufferSize::try_new(1024).unwrap(), 4);
1374        let metrics = MetricsCollector::new(1); // 1 backend
1375
1376        let session = ClientSession::builder(
1377            addr.into(),
1378            buffer_pool,
1379            test_auth_handler(),
1380            metrics.clone(),
1381        )
1382        .build();
1383
1384        session.metrics.stateful_session_started();
1385
1386        let snapshot = metrics.snapshot(None);
1387        assert_eq!(snapshot.stateful_sessions, 1);
1388
1389        session.metrics.stateful_session_ended();
1390
1391        let snapshot = metrics.snapshot(None);
1392        assert_eq!(snapshot.stateful_sessions, 0);
1393    }
1394}