Skip to main content

nntp_proxy/
proxy.rs

1//! NNTP Proxy implementation
2//!
3//! This module contains the main `NntpProxy` struct which orchestrates
4//! connection handling, routing, and resource management.
5
6use anyhow::{Context, Result};
7use std::sync::Arc;
8use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
9use std::time::{Duration, Instant};
10use tokio::net::TcpStream;
11use tracing::{debug, info, warn};
12
13use crate::auth::AuthHandler;
14use crate::cache::{HybridCacheConfig, UnifiedCache};
15use crate::config::{Config, RoutingMode, Server};
16use crate::constants::buffer::{POOL, POOL_COUNT};
17use crate::metrics::{ConnectionStatsAggregator, MetricsCollector};
18use crate::network::NetworkOptimizer;
19use crate::pool::{BufferPool, ConnectionProvider, DeadpoolConnectionProvider, prewarm_pools};
20use crate::router;
21use crate::session::ClientSession;
22use crate::types::ClientAddress;
23use crate::types::{self, BufferSize, TransferMetrics};
24
25/// Builder for constructing an `NntpProxy` with optional configuration overrides
26///
27/// # Examples
28///
29/// Basic usage with defaults:
30/// ```no_run
31/// # #[tokio::main]
32/// # async fn main() -> anyhow::Result<()> {
33/// use nntp_proxy::{NntpProxyBuilder, Config, RoutingMode};
34/// use nntp_proxy::config::load_config;
35///
36/// let config = load_config("config.toml")?;
37/// let proxy = NntpProxyBuilder::new(config)
38///     .with_routing_mode(RoutingMode::Hybrid)
39///     .build()
40///     .await?;
41/// # Ok(())
42/// # }
43/// ```
44///
45/// With custom buffer pool size:
46/// ```no_run
47/// # #[tokio::main]
48/// # async fn main() -> anyhow::Result<()> {
49/// use nntp_proxy::{NntpProxyBuilder, Config, RoutingMode};
50/// use nntp_proxy::config::load_config;
51///
52/// let config = load_config("config.toml")?;
53/// let proxy = NntpProxyBuilder::new(config)
54///     .with_routing_mode(RoutingMode::PerCommand)
55///     .with_buffer_pool_size(512 * 1024)  // 512KB buffers
56///     .with_buffer_pool_count(64)         // 64 buffers
57///     .build()
58///     .await?;
59/// # Ok(())
60/// # }
61/// ```
#[derive(Debug)]
pub struct NntpProxyBuilder {
    /// Proxy configuration the build step reads servers, auth, and cache settings from
    config: Config,
    /// Routing mode handed to the built proxy (`new` picks the initial value)
    routing_mode: RoutingMode,
    /// Optional per-buffer size override; `None` means the build step uses `POOL`
    buffer_size: Option<usize>,
    /// Optional buffer count override; `None` means the build step uses `POOL_COUNT`
    buffer_count: Option<usize>,
}
69
70impl NntpProxyBuilder {
    /// Create a new builder with the given configuration
    ///
    /// The routing mode defaults to `RoutingMode::Stateful` (1:1) mode. Buffer pool
    /// size and count start unset, so the build step falls back to the crate defaults
    /// unless they are overridden.
    #[must_use]
    pub fn new(config: Config) -> Self {
        Self {
            config,
            routing_mode: RoutingMode::Stateful,
            buffer_size: None,
            buffer_count: None,
        }
    }
83
    /// Set the routing mode
    ///
    /// Available modes:
    /// - `Stateful`: 1:1 client-to-backend mapping (default)
    /// - `PerCommand`: Each command routes to a different backend
    /// - `Hybrid`: Starts in per-command mode, switches to stateful when needed
    #[must_use]
    pub fn with_routing_mode(mut self, mode: RoutingMode) -> Self {
        self.routing_mode = mode;
        self
    }
95
96    /// Override the default buffer pool size (256KB)
97    ///
98    /// This affects the size of each buffer in the pool. Larger buffers
99    /// can improve throughput for large article transfers but use more memory.
100    #[must_use]
101    pub fn with_buffer_pool_size(mut self, size: usize) -> Self {
102        self.buffer_size = Some(size);
103        self
104    }
105
106    /// Override the default buffer pool count (32)
107    ///
108    /// This affects how many buffers are pre-allocated. Should roughly match
109    /// the expected number of concurrent connections.
110    #[must_use]
111    pub fn with_buffer_pool_count(mut self, count: usize) -> Self {
112        self.buffer_count = Some(count);
113        self
114    }
115
116    /// Build the `NntpProxy` instance
117    ///
118    /// # Errors
119    ///
120    /// Returns an error if:
121    /// - No servers are configured
122    /// - Connection providers cannot be created
123    /// - Buffer size is zero
124    /// - Hybrid cache initialization fails (if disk cache configured)
125    pub async fn build(self) -> Result<NntpProxy> {
126        if self.config.servers.is_empty() {
127            anyhow::bail!("No servers configured in configuration");
128        }
129
130        // Use provided values or defaults
131        let buffer_size = self.buffer_size.unwrap_or(POOL);
132        let buffer_count = self.buffer_count.unwrap_or(POOL_COUNT);
133
134        // Create deadpool connection providers for each server
135        let connection_providers: Result<Vec<DeadpoolConnectionProvider>> = self
136            .config
137            .servers
138            .iter()
139            .map(|server| {
140                info!(
141                    "Configuring deadpool connection provider for '{}'",
142                    server.name
143                );
144                DeadpoolConnectionProvider::from_server_config(server)
145            })
146            .collect();
147
148        let connection_providers = connection_providers?;
149
150        let buffer_pool = BufferPool::new(
151            BufferSize::try_new(buffer_size)
152                .map_err(|_| anyhow::anyhow!("Buffer size must be non-zero"))?,
153            buffer_count,
154        );
155
156        // Create metrics collector (before moving servers)
157        let metrics = MetricsCollector::new(self.config.servers.len());
158
159        let servers = Arc::new(self.config.servers);
160
161        // Create backend selector and add all backends
162        let router = Arc::new({
163            use types::BackendId;
164            let backend_strategy = self.config.proxy.backend_selection;
165            connection_providers.iter().enumerate().fold(
166                router::BackendSelector::with_strategy(backend_strategy),
167                |mut r, (idx, provider)| {
168                    let backend_id = BackendId::from_index(idx);
169                    r.add_backend(
170                        backend_id,
171                        servers[idx].name.clone(),
172                        provider.clone(),
173                        servers[idx].tier,
174                    );
175                    r
176                },
177            )
178        });
179
180        // Create auth handler from config
181        let auth_handler = {
182            let all_users: Vec<(String, String)> = self
183                .config
184                .client_auth
185                .all_users()
186                .into_iter()
187                .map(|(u, p)| (u.to_string(), p.to_string()))
188                .collect();
189
190            if all_users.is_empty() {
191                Arc::new(AuthHandler::default())
192            } else {
193                Arc::new(AuthHandler::with_users(all_users).with_context(|| {
194                    "Invalid authentication configuration. \
195                             If you set username/password in config, they cannot be empty. \
196                             Remove them entirely to disable authentication."
197                })?)
198            }
199        };
200
201        // Create article cache (always enabled for availability tracking)
202        // If max_capacity=0, only tracks which backends have articles (no content caching)
203        let (cache, cache_articles) = if let Some(cache_config) = &self.config.cache {
204            let capacity = cache_config.max_capacity.as_u64();
205            let cache_articles = cache_config.cache_articles;
206
207            // Check if disk cache is configured
208            let cache = if let Some(disk_config) = &cache_config.disk {
209                // Create hybrid cache with disk tier
210                let hybrid_config = HybridCacheConfig {
211                    memory_capacity: capacity,
212                    disk_path: disk_config.path.clone(),
213                    disk_capacity: disk_config.capacity.as_u64(),
214                    shards: disk_config.shards,
215                    compression: disk_config.compression,
216                    ttl: cache_config.ttl,
217                    cache_articles,
218                };
219
220                info!(
221                    "Initializing hybrid cache: memory={}MB, disk={}GB at {:?}",
222                    capacity / (1024 * 1024),
223                    disk_config.capacity.as_u64() / (1024 * 1024 * 1024),
224                    disk_config.path
225                );
226
227                Arc::new(
228                    UnifiedCache::hybrid(hybrid_config)
229                        .await
230                        .context("Failed to initialize hybrid disk cache")?,
231                )
232            } else {
233                // Memory-only cache
234                Arc::new(UnifiedCache::memory(
235                    capacity,
236                    cache_config.ttl,
237                    cache_articles,
238                ))
239            };
240
241            if capacity > 0 {
242                if cache_articles {
243                    info!(
244                        "Article cache enabled: max_capacity={}, ttl={}s (full caching)",
245                        cache_config.max_capacity,
246                        cache_config.ttl.as_secs()
247                    );
248                } else {
249                    info!(
250                        "Article cache enabled: max_capacity={}, ttl={}s (availability-only, bodies not cached)",
251                        cache_config.max_capacity,
252                        cache_config.ttl.as_secs()
253                    );
254                }
255            } else {
256                info!("Backend availability tracking enabled (cache disabled, capacity=0)");
257            }
258            (cache, cache_articles)
259        } else {
260            debug!("Cache not configured, using availability-only mode (capacity=0)");
261            (
262                Arc::new(UnifiedCache::memory(0, Duration::from_secs(3600), false)),
263                true,
264            )
265        };
266
267        // Extract adaptive_precheck from cache config (default: false)
268        let adaptive_precheck = self
269            .config
270            .cache
271            .as_ref()
272            .map(|c| c.adaptive_precheck)
273            .unwrap_or(false);
274
275        let start_instant = Instant::now();
276
277        Ok(NntpProxy {
278            servers,
279            router,
280            connection_providers,
281            buffer_pool,
282            routing_mode: self.routing_mode,
283            auth_handler,
284            metrics,
285            connection_stats: ConnectionStatsAggregator::new(),
286            cache,
287            cache_articles,
288            adaptive_precheck,
289            last_activity_nanos: Arc::new(AtomicU64::new(0)),
290            active_clients: Arc::new(AtomicUsize::new(0)),
291            start_instant,
292        })
293    }
294
295    /// Build the `NntpProxy` instance (synchronous version)
296    ///
297    /// This version only supports memory-only cache. If disk cache is configured
298    /// in the config, it will be ignored and a warning will be logged.
299    /// Use `build()` for full disk cache support.
300    ///
301    /// # Errors
302    ///
303    /// Returns an error if:
304    /// - No servers are configured
305    /// - Connection providers cannot be created
306    /// - Buffer size is zero
307    pub fn build_sync(self) -> Result<NntpProxy> {
308        if self.config.servers.is_empty() {
309            anyhow::bail!("No servers configured in configuration");
310        }
311
312        // Use provided values or defaults
313        let buffer_size = self.buffer_size.unwrap_or(POOL);
314        let buffer_count = self.buffer_count.unwrap_or(POOL_COUNT);
315
316        // Create deadpool connection providers for each server
317        let connection_providers: Result<Vec<DeadpoolConnectionProvider>> = self
318            .config
319            .servers
320            .iter()
321            .map(|server| {
322                info!(
323                    "Configuring deadpool connection provider for '{}'",
324                    server.name
325                );
326                DeadpoolConnectionProvider::from_server_config(server)
327            })
328            .collect();
329
330        let connection_providers = connection_providers?;
331
332        let buffer_pool = BufferPool::new(
333            BufferSize::try_new(buffer_size)
334                .map_err(|_| anyhow::anyhow!("Buffer size must be non-zero"))?,
335            buffer_count,
336        );
337
338        // Create metrics collector (before moving servers)
339        let metrics = MetricsCollector::new(self.config.servers.len());
340
341        let servers = Arc::new(self.config.servers);
342
343        // Create backend selector and add all backends
344        let router = Arc::new({
345            use types::BackendId;
346            let backend_strategy = self.config.proxy.backend_selection;
347            connection_providers.iter().enumerate().fold(
348                router::BackendSelector::with_strategy(backend_strategy),
349                |mut r, (idx, provider)| {
350                    let backend_id = BackendId::from_index(idx);
351                    r.add_backend(
352                        backend_id,
353                        servers[idx].name.clone(),
354                        provider.clone(),
355                        servers[idx].tier,
356                    );
357                    r
358                },
359            )
360        });
361
362        // Create auth handler from config
363        let auth_handler = {
364            let all_users: Vec<(String, String)> = self
365                .config
366                .client_auth
367                .all_users()
368                .into_iter()
369                .map(|(u, p)| (u.to_string(), p.to_string()))
370                .collect();
371
372            if all_users.is_empty() {
373                Arc::new(AuthHandler::default())
374            } else {
375                Arc::new(AuthHandler::with_users(all_users).with_context(|| {
376                    "Invalid authentication configuration. \
377                             If you set username/password in config, they cannot be empty. \
378                             Remove them entirely to disable authentication."
379                })?)
380            }
381        };
382
383        // Create article cache (memory-only in sync version)
384        let (cache, cache_articles) = if let Some(cache_config) = &self.config.cache {
385            let capacity = cache_config.max_capacity.as_u64();
386            let cache_articles = cache_config.cache_articles;
387
388            // Warn if disk cache is configured but we're using sync build
389            if cache_config.disk.is_some() {
390                warn!(
391                    "Disk cache configured but build_sync() called - using memory-only cache. Use build() for disk cache support."
392                );
393            }
394
395            // Memory-only cache
396            let cache = Arc::new(UnifiedCache::memory(
397                capacity,
398                cache_config.ttl,
399                cache_articles,
400            ));
401
402            if capacity > 0 {
403                if cache_articles {
404                    info!(
405                        "Article cache enabled: max_capacity={}, ttl={}s (full caching)",
406                        cache_config.max_capacity,
407                        cache_config.ttl.as_secs()
408                    );
409                } else {
410                    info!(
411                        "Article cache enabled: max_capacity={}, ttl={}s (availability-only, bodies not cached)",
412                        cache_config.max_capacity,
413                        cache_config.ttl.as_secs()
414                    );
415                }
416            } else {
417                info!("Backend availability tracking enabled (cache disabled, capacity=0)");
418            }
419            (cache, cache_articles)
420        } else {
421            debug!("Cache not configured, using availability-only mode (capacity=0)");
422            (
423                Arc::new(UnifiedCache::memory(0, Duration::from_secs(3600), false)),
424                true,
425            )
426        };
427
428        // Extract adaptive_precheck from cache config (default: false)
429        let adaptive_precheck = self
430            .config
431            .cache
432            .as_ref()
433            .map(|c| c.adaptive_precheck)
434            .unwrap_or(false);
435
436        let start_instant = Instant::now();
437
438        Ok(NntpProxy {
439            servers,
440            router,
441            connection_providers,
442            buffer_pool,
443            routing_mode: self.routing_mode,
444            auth_handler,
445            metrics,
446            connection_stats: ConnectionStatsAggregator::new(),
447            cache,
448            cache_articles,
449            adaptive_precheck,
450            last_activity_nanos: Arc::new(AtomicU64::new(0)),
451            active_clients: Arc::new(AtomicUsize::new(0)),
452            start_instant,
453        })
454    }
455}
456
/// NNTP proxy state: backend servers and their connection providers, the backend
/// selector, shared buffers, client auth, metrics, and the article cache.
///
/// `Clone` is derived; the `Arc`-wrapped fields (servers, router, auth handler,
/// cache, and the activity counters) are shared between clones.
#[derive(Debug, Clone)]
pub struct NntpProxy {
    /// Configured backend servers, in the same order as `connection_providers`
    servers: Arc<Vec<Server>>,
    /// Backend selector; its selection strategy comes from `config.proxy.backend_selection`
    router: Arc<router::BackendSelector>,
    /// Connection providers per server - easily swappable implementation
    connection_providers: Vec<DeadpoolConnectionProvider>,
    /// Buffer pool for I/O operations
    buffer_pool: BufferPool,
    /// Routing mode (Stateful, PerCommand, or Hybrid)
    routing_mode: RoutingMode,
    /// Authentication handler for client auth interception
    auth_handler: Arc<AuthHandler>,
    /// Metrics collector for TUI and monitoring (always enabled)
    metrics: MetricsCollector,
    /// Connection statistics aggregator (reduces log spam)
    connection_stats: ConnectionStatsAggregator,
    /// Article cache (always present - tracks backend availability even with capacity=0)
    cache: Arc<UnifiedCache>,
    /// Whether to cache article bodies (config-driven)
    cache_articles: bool,
    /// Whether to use adaptive availability prechecking for STAT/HEAD
    adaptive_precheck: bool,
    /// When the last client disconnected, stored as nanoseconds elapsed since
    /// `start_instant` (not wall-clock epoch time); 0 means no client has
    /// disconnected yet. Used for idle detection.
    last_activity_nanos: Arc<AtomicU64>,
    /// Number of currently active client connections
    active_clients: Arc<AtomicUsize>,
    /// Reference instant that `last_activity_nanos` is measured from
    start_instant: Instant,
}
488
489/// Classify an error as a client disconnect (broken pipe/connection reset)
490///
491/// Returns true for errors that indicate the client disconnected normally,
492/// which should be logged at DEBUG level rather than WARN.
493///
494/// # Examples
495///
496/// ```
497/// use std::io::{Error, ErrorKind};
498/// use nntp_proxy::is_client_disconnect_error;
499///
500/// let broken_pipe = Error::from(ErrorKind::BrokenPipe);
501/// let wrapped = anyhow::Error::from(broken_pipe);
502/// assert!(is_client_disconnect_error(&wrapped));
503///
504/// let other_error = anyhow::anyhow!("some other error");
505/// assert!(!is_client_disconnect_error(&other_error));
506/// ```
507#[inline]
508pub fn is_client_disconnect_error(e: &anyhow::Error) -> bool {
509    crate::session::error_classification::ErrorClassifier::is_client_disconnect(e)
510}
511
512impl NntpProxy {
513    // Helper methods for session management
514
    /// Idle timeout after which pools are cleared when a new client connects (5 minutes)
    ///
    /// `check_and_clear_stale_pools` clears only when the idle time is strictly
    /// greater than this value.
    const IDLE_TIMEOUT: Duration = Duration::from_secs(5 * 60);
517
    /// Tell the metrics collector that a client connection was opened
    #[inline]
    fn record_connection_opened(&self) {
        self.metrics.connection_opened();
    }
522
    /// Tell the metrics collector that a client connection was closed
    #[inline]
    fn record_connection_closed(&self) {
        self.metrics.connection_closed();
    }
527
    /// Increment active client count
    ///
    /// Call this when a new client connection is accepted. Each call should be
    /// balanced by one `decrement_active_clients` call when that client closes.
    #[inline]
    fn increment_active_clients(&self) {
        // Relaxed: only the counter value itself is read elsewhere
        self.active_clients.fetch_add(1, Ordering::Relaxed);
    }
535
536    /// Decrement active client count and update last activity timestamp
537    ///
538    /// Call this when a client connection closes.
539    #[inline]
540    fn decrement_active_clients(&self) {
541        let prev = self.active_clients.fetch_sub(1, Ordering::Relaxed);
542
543        // When last client disconnects, record the timestamp
544        if prev == 1 {
545            let nanos = self.start_instant.elapsed().as_nanos() as u64;
546            self.last_activity_nanos.store(nanos, Ordering::Relaxed);
547        }
548    }
549
550    /// Check if pools should be cleared due to idle timeout
551    ///
552    /// Returns true if pools were cleared.
553    /// Pools are cleared when:
554    /// 1. No clients are currently active
555    /// 2. The last activity was more than IDLE_TIMEOUT ago
556    ///
557    /// This prevents stale connections from accumulating during overnight idle periods.
558    fn check_and_clear_stale_pools(&self) -> bool {
559        // Fast path: if there are active clients, pools are in use
560        if self.active_clients.load(Ordering::Relaxed) > 0 {
561            return false;
562        }
563
564        let last_activity_nanos = self.last_activity_nanos.load(Ordering::Relaxed);
565
566        // If never been active, no need to clear
567        if last_activity_nanos == 0 {
568            return false;
569        }
570
571        let last_activity = Duration::from_nanos(last_activity_nanos);
572        let now = self.start_instant.elapsed();
573        let idle_duration = now.saturating_sub(last_activity);
574
575        if idle_duration > Self::IDLE_TIMEOUT {
576            info!(
577                idle_secs = idle_duration.as_secs(),
578                pool_count = self.connection_providers.len(),
579                "Clearing stale pool connections after idle timeout"
580            );
581
582            for provider in &self.connection_providers {
583                provider.clear_idle_connections();
584            }
585
586            true
587        } else {
588            false
589        }
590    }
591
592    /// Build a session with standard configuration (conditionally enables metrics)
593    fn build_session(
594        &self,
595        client_addr: ClientAddress,
596        router: Option<Arc<router::BackendSelector>>,
597        routing_mode: RoutingMode,
598        cache: Arc<UnifiedCache>,
599    ) -> ClientSession {
600        // Start with base builder
601        let builder = ClientSession::builder(
602            client_addr,
603            self.buffer_pool.clone(),
604            self.auth_handler.clone(),
605            self.metrics.clone(),
606        )
607        .with_routing_mode(routing_mode)
608        .with_connection_stats(self.connection_stats.clone())
609        .with_cache(cache)
610        .with_cache_articles(self.cache_articles)
611        .with_adaptive_precheck(self.adaptive_precheck);
612
613        // Apply optional router
614        let builder = match router {
615            Some(r) => builder.with_router(r),
616            None => builder,
617        };
618
619        builder.build()
620    }
621
622    /// Log session completion and record stats
623    fn log_session_completion(
624        &self,
625        client_addr: ClientAddress,
626        session_id: &str,
627        session: &ClientSession,
628        routing_mode: crate::config::RoutingMode,
629        metrics: &types::TransferMetrics,
630    ) {
631        self.connection_stats
632            .record_disconnection(session.username().as_deref(), routing_mode.short_name());
633
634        debug!(
635            "Session {} [{}] ↑{} ↓{}",
636            client_addr,
637            session_id,
638            crate::formatting::format_bytes(metrics.client_to_backend.as_u64()),
639            crate::formatting::format_bytes(metrics.backend_to_client.as_u64())
640        );
641    }
642
643    /// Create a new `NntpProxy` with the given configuration and routing mode
644    ///
645    /// This is a convenience method that uses the builder internally.
646    /// For more control over configuration, use [`NntpProxy::builder`].
647    ///
648    /// # Examples
649    ///
650    /// ```no_run
651    /// # use nntp_proxy::{NntpProxy, Config, RoutingMode};
652    /// # use nntp_proxy::config::load_config;
653    /// # #[tokio::main]
654    /// # async fn main() -> anyhow::Result<()> {
655    /// let config = load_config("config.toml")?;
656    /// let proxy = NntpProxy::new(config, RoutingMode::Hybrid).await?;
657    /// # Ok(())
658    /// # }
659    /// ```
660    pub async fn new(config: Config, routing_mode: RoutingMode) -> Result<Self> {
661        NntpProxyBuilder::new(config)
662            .with_routing_mode(routing_mode)
663            .build()
664            .await
665    }
666
667    /// Create a new `NntpProxy` synchronously (memory-only cache)
668    ///
669    /// This version only supports memory-only cache. If disk cache is configured,
670    /// it will be ignored. Use [`NntpProxy::new`] for full disk cache support.
671    ///
672    /// # Examples
673    ///
674    /// ```no_run
675    /// # use nntp_proxy::{NntpProxy, Config, RoutingMode};
676    /// # use nntp_proxy::config::load_config;
677    /// # fn main() -> anyhow::Result<()> {
678    /// let config = load_config("config.toml")?;
679    /// let proxy = NntpProxy::new_sync(config, RoutingMode::Hybrid)?;
680    /// # Ok(())
681    /// # }
682    /// ```
683    pub fn new_sync(config: Config, routing_mode: RoutingMode) -> Result<Self> {
684        NntpProxyBuilder::new(config)
685            .with_routing_mode(routing_mode)
686            .build_sync()
687    }
688
    /// Create a builder for more fine-grained control over proxy configuration
    ///
    /// Equivalent to calling `NntpProxyBuilder::new(config)`; the returned builder
    /// starts with the builder's default routing mode and no buffer pool overrides.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # use nntp_proxy::{NntpProxy, Config, RoutingMode};
    /// # use nntp_proxy::config::load_config;
    /// # #[tokio::main]
    /// # async fn main() -> anyhow::Result<()> {
    /// let config = load_config("config.toml")?;
    /// let proxy = NntpProxy::builder(config)
    ///     .with_routing_mode(RoutingMode::Hybrid)
    ///     .with_buffer_pool_size(512 * 1024)
    ///     .build()
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```
    #[must_use]
    pub fn builder(config: Config) -> NntpProxyBuilder {
        NntpProxyBuilder::new(config)
    }
711
712    /// Prewarm all connection pools before accepting clients
713    /// Creates all connections concurrently and returns when ready
714    pub async fn prewarm_connections(&self) -> Result<()> {
715        prewarm_pools(&self.connection_providers, &self.servers).await
716    }
717
718    /// Gracefully shutdown all connection pools
719    pub async fn graceful_shutdown(&self) {
720        info!("Initiating graceful shutdown...");
721
722        // Flush cache writes to disk before shutting down
723        // With WriteOnInsertion, writes are enqueued async - close() waits for completion
724        info!("Flushing disk cache writes...");
725        if let Err(e) = self.cache.close().await {
726            warn!("Error closing cache: {}", e);
727        }
728
729        info!("Shutting down connection pools...");
730        for provider in &self.connection_providers {
731            provider.graceful_shutdown().await;
732        }
733
734        info!("All connection pools have been shut down gracefully");
735    }
736
737    /// Get the list of servers
738    #[must_use]
739    #[inline]
740    pub fn servers(&self) -> &[Server] {
741        &self.servers
742    }
743
    /// Get the router
    ///
    /// Returns a reference to the shared `Arc`; clone it to keep the selector
    /// beyond the borrow of `self`.
    #[must_use]
    #[inline]
    pub fn router(&self) -> &Arc<router::BackendSelector> {
        &self.router
    }
750
751    /// Get the connection providers
752    #[must_use]
753    #[inline]
754    pub fn connection_providers(&self) -> &[DeadpoolConnectionProvider] {
755        &self.connection_providers
756    }
757
    /// Get the buffer pool
    ///
    /// This is the pool that each client session receives a clone of.
    #[must_use]
    #[inline]
    pub fn buffer_pool(&self) -> &BufferPool {
        &self.buffer_pool
    }
764
    /// Get the article cache (always present - capacity 0 if not configured)
    ///
    /// Returns a reference to the shared `Arc`; clone it to keep the cache
    /// beyond the borrow of `self`.
    #[must_use]
    #[inline]
    pub fn cache(&self) -> &Arc<UnifiedCache> {
        &self.cache
    }
771
    /// Get the metrics collector
    ///
    /// This is the collector that connection open/close events are recorded on.
    #[must_use]
    #[inline]
    pub fn metrics(&self) -> &MetricsCollector {
        &self.metrics
    }
778
    /// Get connection stats aggregator
    ///
    /// This is the aggregator that session disconnections are recorded on.
    #[must_use]
    #[inline]
    pub fn connection_stats(&self) -> &ConnectionStatsAggregator {
        &self.connection_stats
    }
785
786    /// Log backend routing selection
787    #[inline]
788    fn log_routing_selection(
789        &self,
790        client_addr: ClientAddress,
791        backend_id: crate::types::BackendId,
792        server: &Server,
793    ) {
794        info!(
795            "Routing client {} to backend {:?} ({}:{})",
796            client_addr, backend_id, server.host, server.port
797        );
798    }
799
800    /// Log connection pool status for monitoring
801    #[inline]
802    fn log_pool_status(&self, server_idx: usize) {
803        let pool_status = self.connection_providers[server_idx].status();
804        debug!(
805            "Pool status for {}: {}/{} available, {} created",
806            self.servers[server_idx].name,
807            pool_status.available,
808            pool_status.max_size,
809            pool_status.created
810        );
811    }
812
813    /// Prepare stateful connection - route, greet, optimize
814    async fn prepare_stateful_connection(
815        &self,
816        client_stream: &mut TcpStream,
817        client_addr: ClientAddress,
818    ) -> Result<crate::types::BackendId> {
819        self.record_connection_opened();
820
821        let client_id = types::ClientId::new();
822        let backend_id = self.router.route_command(client_id, "")?;
823        let server_idx = backend_id.as_index();
824
825        self.log_routing_selection(client_addr, backend_id, &self.servers[server_idx]);
826        self.send_greeting(client_stream, client_addr).await?;
827        self.log_pool_status(server_idx);
828        self.apply_tcp_optimizations(client_stream);
829
830        Ok(backend_id)
831    }
832
833    /// Prepare per-command connection - record, greet, optimize
834    async fn prepare_per_command_connection(
835        &self,
836        client_stream: &mut TcpStream,
837        client_addr: ClientAddress,
838    ) -> Result<()> {
839        self.record_connection_opened();
840        self.send_greeting(client_stream, client_addr).await?;
841        self.apply_tcp_optimizations(client_stream);
842        Ok(())
843    }
844
845    /// Create session with router and cache configuration
846    #[inline]
847    fn create_session(
848        &self,
849        client_addr: ClientAddress,
850        router: Option<Arc<crate::router::BackendSelector>>,
851    ) -> ClientSession {
852        self.build_session(client_addr, router, self.routing_mode, self.cache.clone())
853    }
854
855    /// Generate short session ID for logging
856    #[inline]
857    fn generate_session_id(&self, session: &ClientSession) -> String {
858        crate::formatting::short_id(session.client_id().as_uuid())
859    }
860
861    /// Send greeting to client
862    #[inline]
863    async fn send_greeting(
864        &self,
865        client_stream: &mut TcpStream,
866        client_addr: ClientAddress,
867    ) -> Result<()> {
868        crate::protocol::send_proxy_greeting(client_stream, client_addr).await
869    }
870
871    /// Apply TCP optimizations to client socket
872    #[inline]
873    fn apply_tcp_optimizations(&self, client_stream: &TcpStream) {
874        use crate::network::TcpOptimizer;
875        TcpOptimizer::new(client_stream)
876            .optimize()
877            .map_err(|e| debug!("Failed to optimize client socket: {}", e))
878            .ok();
879    }
880
881    /// Get display name for current routing mode
882    #[inline]
883    fn routing_mode_display_name(&self) -> &'static str {
884        if self.cache.entry_count() > 0 {
885            "caching"
886        } else {
887            "per-command"
888        }
889    }
890
891    /// Finalize stateful session with metrics and cleanup
892    fn finalize_stateful_session(
893        &self,
894        metrics: Result<TransferMetrics>,
895        client_addr: ClientAddress,
896        session_id: &str,
897        session: &ClientSession,
898        backend_id: crate::types::BackendId,
899    ) -> Result<()> {
900        self.record_connection_if_unauthenticated(session);
901        self.router.complete_command(backend_id);
902        self.record_session_metrics(metrics, client_addr, session_id, session, Some(backend_id))?;
903        self.record_connection_closed();
904        Ok(())
905    }
906
907    /// Finalize per-command session with logging and cleanup
908    fn finalize_per_command_session(
909        &self,
910        metrics: Result<TransferMetrics>,
911        client_addr: ClientAddress,
912        session_id: &str,
913        session: &ClientSession,
914    ) -> Result<()> {
915        self.record_session_metrics(metrics, client_addr, session_id, session, None)?;
916        self.record_connection_closed();
917        Ok(())
918    }
919
920    /// Record connection for unauthenticated sessions only
921    #[inline]
922    fn record_connection_if_unauthenticated(&self, session: &ClientSession) {
923        if !self.auth_handler.is_enabled() || session.username().is_none() {
924            let mode = self.session_mode_label(session.mode());
925            self.connection_stats
926                .record_connection(session.username().as_deref(), mode);
927        }
928    }
929
930    /// Record session metrics and log completion or errors
931    fn record_session_metrics(
932        &self,
933        metrics: Result<TransferMetrics>,
934        client_addr: ClientAddress,
935        session_id: &str,
936        session: &ClientSession,
937        backend_id: Option<crate::types::BackendId>,
938    ) -> Result<()> {
939        match metrics {
940            Ok(m) => {
941                self.log_session_completion(
942                    client_addr,
943                    session_id,
944                    session,
945                    self.routing_mode,
946                    &m,
947                );
948
949                if let Some(bid) = backend_id {
950                    self.metrics
951                        .record_client_to_backend_bytes_for(bid, m.client_to_backend.as_u64());
952                    self.metrics
953                        .record_backend_to_client_bytes_for(bid, m.backend_to_client.as_u64());
954                }
955                Ok(())
956            }
957            Err(e) => {
958                if let Some(bid) = backend_id {
959                    self.metrics.record_error(bid);
960                }
961
962                // Only log non-client-disconnect errors (avoid spam from normal disconnects)
963                if !is_client_disconnect_error(&e) {
964                    warn!("Session error for client {}: {:?}", client_addr, e);
965                }
966                Err(e)
967            }
968        }
969    }
970
971    /// Get session mode label for logging
972    #[inline]
973    fn session_mode_label(&self, session_mode: crate::session::SessionMode) -> &'static str {
974        use crate::session::SessionMode;
975        match (session_mode, self.routing_mode) {
976            (SessionMode::PerCommand, _) => "per-command",
977            (SessionMode::Stateful, RoutingMode::Stateful) => "standard",
978            (SessionMode::Stateful, RoutingMode::Hybrid) => "hybrid",
979            (SessionMode::Stateful, _) => "stateful",
980        }
981    }
982
983    pub async fn handle_client(
984        &self,
985        mut client_stream: TcpStream,
986        client_addr: ClientAddress,
987    ) -> Result<()> {
988        debug!("New client connection from {}", client_addr);
989
990        // Check for stale pools before handling (lazy recreation after idle)
991        self.check_and_clear_stale_pools();
992        self.increment_active_clients();
993
994        let result = async {
995            let backend_id = self
996                .prepare_stateful_connection(&mut client_stream, client_addr)
997                .await?;
998            let server_idx = backend_id.as_index();
999
1000            let session = self.create_session(client_addr, None);
1001            let session_id = self.generate_session_id(&session);
1002
1003            debug!("Starting stateful session for client {}", client_addr);
1004
1005            let metrics = session
1006                .handle_stateful_session(
1007                    client_stream,
1008                    backend_id,
1009                    &self.connection_providers[server_idx],
1010                    &self.servers[server_idx].name,
1011                )
1012                .await;
1013
1014            self.finalize_stateful_session(metrics, client_addr, &session_id, &session, backend_id)
1015        }
1016        .await;
1017
1018        self.decrement_active_clients();
1019        result
1020    }
1021
1022    /// Handle client connection using per-command routing mode
1023    ///
1024    /// This creates a session with the router, allowing commands from this client
1025    /// to be routed to different backends based on load balancing.  
1026    pub async fn handle_client_per_command_routing(
1027        &self,
1028        client_stream: TcpStream,
1029        client_addr: ClientAddress,
1030    ) -> Result<()> {
1031        // Check for stale pools before handling (lazy recreation after idle)
1032        self.check_and_clear_stale_pools();
1033        self.increment_active_clients();
1034
1035        let result = self
1036            .handle_per_command_client(client_stream, client_addr)
1037            .await;
1038
1039        self.decrement_active_clients();
1040        result
1041    }
1042
1043    /// Handle a per-command routing session
1044    async fn handle_per_command_client(
1045        &self,
1046        mut client_stream: TcpStream,
1047        client_addr: ClientAddress,
1048    ) -> Result<()> {
1049        let mode_label = self.routing_mode_display_name();
1050        debug!(
1051            "New {} routing client connection from {}",
1052            mode_label, client_addr
1053        );
1054
1055        self.prepare_per_command_connection(&mut client_stream, client_addr)
1056            .await?;
1057
1058        let session = self.create_session(client_addr, Some(self.router.clone()));
1059        let session_id = self.generate_session_id(&session);
1060
1061        let metrics = session
1062            .handle_per_command_routing(client_stream)
1063            .await
1064            .with_context(|| {
1065                format!(
1066                    "{} routing session failed for {} [{}]",
1067                    mode_label, client_addr, session_id
1068                )
1069            });
1070
1071        self.finalize_per_command_session(metrics, client_addr, &session_id, &session)
1072    }
1073}
1074
1075#[cfg(test)]
1076mod tests {
1077    use super::*;
1078    use std::sync::Arc;
1079
1080    fn create_test_config() -> Config {
1081        use crate::config::{health_check_max_per_cycle, health_check_pool_timeout};
1082        use crate::types::{HostName, MaxConnections, Port, ServerName};
1083        Config {
1084            servers: vec![
1085                Server {
1086                    host: HostName::try_new("server1.example.com".to_string()).unwrap(),
1087                    port: Port::try_new(119).unwrap(),
1088                    name: ServerName::try_new("Test Server 1".to_string()).unwrap(),
1089                    username: None,
1090                    password: None,
1091                    max_connections: MaxConnections::try_new(5).unwrap(),
1092                    use_tls: false,
1093                    tls_verify_cert: true,
1094                    tls_cert_path: None,
1095                    connection_keepalive: None,
1096                    health_check_max_per_cycle: health_check_max_per_cycle(),
1097                    health_check_pool_timeout: health_check_pool_timeout(),
1098                    tier: 0,
1099                },
1100                Server {
1101                    host: HostName::try_new("server2.example.com".to_string()).unwrap(),
1102                    port: Port::try_new(119).unwrap(),
1103                    name: ServerName::try_new("Test Server 2".to_string()).unwrap(),
1104                    username: None,
1105                    password: None,
1106                    max_connections: MaxConnections::try_new(8).unwrap(),
1107                    use_tls: false,
1108                    tls_verify_cert: true,
1109                    tls_cert_path: None,
1110                    connection_keepalive: None,
1111                    health_check_max_per_cycle: health_check_max_per_cycle(),
1112                    health_check_pool_timeout: health_check_pool_timeout(),
1113                    tier: 0,
1114                },
1115                Server {
1116                    host: HostName::try_new("server3.example.com".to_string()).unwrap(),
1117                    port: Port::try_new(119).unwrap(),
1118                    name: ServerName::try_new("Test Server 3".to_string()).unwrap(),
1119                    username: None,
1120                    password: None,
1121                    max_connections: MaxConnections::try_new(12).unwrap(),
1122                    use_tls: false,
1123                    tls_verify_cert: true,
1124                    tls_cert_path: None,
1125                    connection_keepalive: None,
1126                    health_check_max_per_cycle: health_check_max_per_cycle(),
1127                    health_check_pool_timeout: health_check_pool_timeout(),
1128                    tier: 0,
1129                },
1130            ],
1131            ..Default::default()
1132        }
1133    }
1134
1135    #[test]
1136    fn test_proxy_creation_with_servers() {
1137        let config = create_test_config();
1138        let proxy = Arc::new(
1139            NntpProxy::new_sync(config, RoutingMode::Stateful).expect("Failed to create proxy"),
1140        );
1141
1142        assert_eq!(proxy.servers().len(), 3);
1143        assert_eq!(proxy.servers()[0].name.as_str(), "Test Server 1");
1144    }
1145
1146    #[test]
1147    fn test_proxy_creation_with_empty_servers() {
1148        let config = Config {
1149            servers: vec![],
1150            ..Default::default()
1151        };
1152        let result = NntpProxy::new_sync(config, RoutingMode::Stateful);
1153
1154        assert!(result.is_err());
1155        assert!(
1156            result
1157                .unwrap_err()
1158                .to_string()
1159                .contains("No servers configured")
1160        );
1161    }
1162
1163    #[test]
1164    fn test_proxy_has_router() {
1165        let config = create_test_config();
1166        let proxy = Arc::new(
1167            NntpProxy::new_sync(config, RoutingMode::Stateful).expect("Failed to create proxy"),
1168        );
1169
1170        // Proxy should have a router with backends
1171        assert_eq!(proxy.router.backend_count(), 3);
1172    }
1173
1174    #[test]
1175    fn test_builder_basic_usage() {
1176        let config = create_test_config();
1177        let proxy = NntpProxy::builder(config)
1178            .build_sync()
1179            .expect("Failed to build proxy");
1180
1181        assert_eq!(proxy.servers().len(), 3);
1182        assert_eq!(proxy.router.backend_count(), 3);
1183    }
1184
1185    #[test]
1186    fn test_builder_with_routing_mode() {
1187        let config = create_test_config();
1188        let proxy = NntpProxy::builder(config)
1189            .with_routing_mode(RoutingMode::PerCommand)
1190            .build_sync()
1191            .expect("Failed to build proxy");
1192
1193        assert_eq!(proxy.servers().len(), 3);
1194    }
1195
1196    #[test]
1197    fn test_builder_with_custom_buffer_pool() {
1198        let config = create_test_config();
1199        let proxy = NntpProxy::builder(config)
1200            .with_buffer_pool_size(512 * 1024)
1201            .with_buffer_pool_count(64)
1202            .build_sync()
1203            .expect("Failed to build proxy");
1204
1205        assert_eq!(proxy.servers().len(), 3);
1206        // Pool size and count are used internally but not exposed for verification
1207    }
1208
1209    #[test]
1210    fn test_builder_with_all_options() {
1211        let config = create_test_config();
1212        let proxy = NntpProxy::builder(config)
1213            .with_routing_mode(RoutingMode::Hybrid)
1214            .with_buffer_pool_size(1024 * 1024)
1215            .with_buffer_pool_count(16)
1216            .build_sync()
1217            .expect("Failed to build proxy");
1218
1219        assert_eq!(proxy.servers().len(), 3);
1220        assert_eq!(proxy.router.backend_count(), 3);
1221    }
1222
1223    #[test]
1224    fn test_builder_empty_servers_error() {
1225        let config = Config {
1226            servers: vec![],
1227            ..Default::default()
1228        };
1229        let result = NntpProxy::builder(config).build_sync();
1230
1231        assert!(result.is_err());
1232        assert!(
1233            result
1234                .unwrap_err()
1235                .to_string()
1236                .contains("No servers configured")
1237        );
1238    }
1239
1240    #[test]
1241    fn test_backward_compatibility_new() {
1242        // Ensure NntpProxy::new_sync() still works (it uses builder internally)
1243        let config = create_test_config();
1244        let proxy = NntpProxy::new_sync(config, RoutingMode::Stateful)
1245            .expect("Failed to create proxy with new_sync()");
1246
1247        assert_eq!(proxy.servers().len(), 3);
1248        assert_eq!(proxy.router.backend_count(), 3);
1249    }
1250
1251    // Tests for is_client_disconnect_error function
1252    mod error_classification {
1253        use super::*;
1254        use std::io::{Error, ErrorKind};
1255
1256        #[test]
1257        fn test_broken_pipe_is_client_disconnect() {
1258            let io_err = Error::from(ErrorKind::BrokenPipe);
1259            let err = anyhow::Error::from(io_err);
1260            assert!(is_client_disconnect_error(&err));
1261        }
1262
1263        #[test]
1264        fn test_connection_reset_is_client_disconnect() {
1265            let io_err = Error::from(ErrorKind::ConnectionReset);
1266            let err = anyhow::Error::from(io_err);
1267            assert!(is_client_disconnect_error(&err));
1268        }
1269
1270        #[test]
1271        fn test_other_io_errors_not_client_disconnect() {
1272            let error_kinds = vec![
1273                ErrorKind::NotFound,
1274                ErrorKind::PermissionDenied,
1275                ErrorKind::ConnectionRefused,
1276                ErrorKind::ConnectionAborted,
1277                ErrorKind::AddrInUse,
1278                ErrorKind::AddrNotAvailable,
1279                ErrorKind::TimedOut,
1280                ErrorKind::Interrupted,
1281                ErrorKind::UnexpectedEof,
1282                ErrorKind::WouldBlock,
1283            ];
1284
1285            for kind in error_kinds {
1286                let io_err = Error::from(kind);
1287                let err = anyhow::Error::from(io_err);
1288                assert!(
1289                    !is_client_disconnect_error(&err),
1290                    "{:?} should not be classified as client disconnect",
1291                    kind
1292                );
1293            }
1294        }
1295
1296        #[test]
1297        fn test_non_io_error_not_client_disconnect() {
1298            let err = anyhow::anyhow!("generic error message");
1299            assert!(!is_client_disconnect_error(&err));
1300        }
1301
1302        #[test]
1303        fn test_wrapped_broken_pipe_error() {
1304            let io_err = Error::from(ErrorKind::BrokenPipe);
1305            let err = anyhow::Error::from(io_err).context("failed to write to client");
1306            assert!(is_client_disconnect_error(&err));
1307        }
1308
1309        #[test]
1310        fn test_wrapped_connection_reset_error() {
1311            let io_err = Error::from(ErrorKind::ConnectionReset);
1312            let err = anyhow::Error::from(io_err).context("failed to read from client");
1313            assert!(is_client_disconnect_error(&err));
1314        }
1315
1316        #[test]
1317        fn test_deeply_wrapped_error() {
1318            let io_err = Error::from(ErrorKind::BrokenPipe);
1319            let err = anyhow::Error::from(io_err)
1320                .context("inner context")
1321                .context("outer context");
1322            assert!(is_client_disconnect_error(&err));
1323        }
1324
1325        #[test]
1326        fn test_custom_io_error_message() {
1327            let io_err = Error::new(ErrorKind::BrokenPipe, "custom broken pipe");
1328            let err = anyhow::Error::from(io_err);
1329            assert!(is_client_disconnect_error(&err));
1330        }
1331    }
1332
1333    // Tests for new DRY helper methods
1334    mod helper_methods {
1335        use super::*;
1336        use crate::session::SessionMode;
1337
1338        #[test]
1339        fn test_session_mode_label_per_command() {
1340            let config = create_test_config();
1341            let proxy = NntpProxy::new_sync(config, RoutingMode::PerCommand).unwrap();
1342
1343            let label = proxy.session_mode_label(SessionMode::PerCommand);
1344            assert_eq!(label, "per-command");
1345        }
1346
1347        #[test]
1348        fn test_session_mode_label_stateful_standard() {
1349            let config = create_test_config();
1350            let proxy = NntpProxy::new_sync(config, RoutingMode::Stateful).unwrap();
1351
1352            let label = proxy.session_mode_label(SessionMode::Stateful);
1353            assert_eq!(label, "standard");
1354        }
1355
1356        #[test]
1357        fn test_session_mode_label_stateful_hybrid() {
1358            let config = create_test_config();
1359            let proxy = NntpProxy::new_sync(config, RoutingMode::Hybrid).unwrap();
1360
1361            let label = proxy.session_mode_label(SessionMode::Stateful);
1362            assert_eq!(label, "hybrid");
1363        }
1364
1365        #[test]
1366        fn test_routing_mode_display_name_caching() {
1367            let config = create_test_config();
1368            let proxy = NntpProxy::new_sync(config, RoutingMode::PerCommand).unwrap();
1369
1370            // Empty cache (default 0 capacity) should return "per-command"
1371            assert_eq!(proxy.routing_mode_display_name(), "per-command");
1372        }
1373
1374        #[test]
1375        fn test_generate_session_id_format() {
1376            let config = create_test_config();
1377            let proxy = NntpProxy::new_sync(config, RoutingMode::Stateful).unwrap();
1378
1379            let session = proxy.create_session(
1380                ClientAddress::from("127.0.0.1:12345".parse::<std::net::SocketAddr>().unwrap()),
1381                None,
1382            );
1383
1384            let session_id = proxy.generate_session_id(&session);
1385
1386            // Should be a short UUID (8 characters)
1387            assert_eq!(session_id.len(), 8);
1388        }
1389
1390        #[test]
1391        fn test_create_session_without_router() {
1392            let config = create_test_config();
1393            let proxy = NntpProxy::new_sync(config, RoutingMode::Stateful).unwrap();
1394
1395            let session = proxy.create_session(
1396                ClientAddress::from("127.0.0.1:12345".parse::<std::net::SocketAddr>().unwrap()),
1397                None,
1398            );
1399
1400            // Session should be created successfully
1401            assert_eq!(session.mode(), SessionMode::Stateful);
1402        }
1403
1404        #[test]
1405        fn test_create_session_with_router() {
1406            let config = create_test_config();
1407            let proxy = NntpProxy::new_sync(config, RoutingMode::PerCommand).unwrap();
1408
1409            let session = proxy.create_session(
1410                ClientAddress::from("127.0.0.1:12345".parse::<std::net::SocketAddr>().unwrap()),
1411                Some(proxy.router.clone()),
1412            );
1413
1414            // Session mode depends on router presence and config
1415            // With router, it should be per-command
1416            assert_eq!(session.mode(), SessionMode::PerCommand);
1417        }
1418
1419        #[test]
1420        fn test_record_connection_if_unauthenticated_no_auth() {
1421            let config = create_test_config();
1422            let proxy = Arc::new(NntpProxy::new_sync(config, RoutingMode::Stateful).unwrap());
1423
1424            let session = proxy.create_session(
1425                ClientAddress::from("127.0.0.1:12345".parse::<std::net::SocketAddr>().unwrap()),
1426                None,
1427            );
1428
1429            // Should not panic
1430            proxy.record_connection_if_unauthenticated(&session);
1431
1432            // Connection should be recorded (we can't easily verify without exposing internals)
1433        }
1434
1435        #[test]
1436        fn test_record_session_metrics_success() {
1437            let config = create_test_config();
1438            let proxy = Arc::new(NntpProxy::new_sync(config, RoutingMode::Stateful).unwrap());
1439
1440            let session = proxy.create_session(
1441                ClientAddress::from("127.0.0.1:12345".parse::<std::net::SocketAddr>().unwrap()),
1442                None,
1443            );
1444            let session_id = proxy.generate_session_id(&session);
1445
1446            let metrics = TransferMetrics {
1447                client_to_backend: crate::types::ClientToBackendBytes::new(1024),
1448                backend_to_client: crate::types::BackendToClientBytes::new(2048),
1449            };
1450
1451            let result = proxy.record_session_metrics(
1452                Ok(metrics),
1453                ClientAddress::from("127.0.0.1:12345".parse::<std::net::SocketAddr>().unwrap()),
1454                &session_id,
1455                &session,
1456                Some(crate::types::BackendId::from_index(0)),
1457            );
1458
1459            assert!(result.is_ok());
1460        }
1461
1462        #[test]
1463        fn test_record_session_metrics_error() {
1464            let config = create_test_config();
1465            let proxy = Arc::new(NntpProxy::new_sync(config, RoutingMode::Stateful).unwrap());
1466
1467            let session = proxy.create_session(
1468                ClientAddress::from("127.0.0.1:12345".parse::<std::net::SocketAddr>().unwrap()),
1469                None,
1470            );
1471            let session_id = proxy.generate_session_id(&session);
1472
1473            let result = proxy.record_session_metrics(
1474                Err(anyhow::anyhow!("test error")),
1475                ClientAddress::from("127.0.0.1:12345".parse::<std::net::SocketAddr>().unwrap()),
1476                &session_id,
1477                &session,
1478                Some(crate::types::BackendId::from_index(0)),
1479            );
1480
1481            assert!(result.is_err());
1482            assert_eq!(result.unwrap_err().to_string(), "test error");
1483        }
1484
1485        #[tokio::test]
1486        async fn test_prepare_per_command_connection() {
1487            let config = create_test_config();
1488            let proxy = Arc::new(
1489                NntpProxy::new(config, RoutingMode::PerCommand)
1490                    .await
1491                    .unwrap(),
1492            );
1493
1494            let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1495            let addr = listener.local_addr().unwrap();
1496
1497            // Spawn a simple acceptor that reads greeting
1498            tokio::spawn(async move {
1499                let (stream, _) = listener.accept().await.unwrap();
1500                let mut buf = [0u8; 1024];
1501                let _ = stream.try_read(&mut buf); // Read greeting
1502            });
1503
1504            let mut stream = tokio::net::TcpStream::connect(addr).await.unwrap();
1505            let client_addr = ClientAddress::from(stream.peer_addr().unwrap());
1506
1507            let result = proxy
1508                .prepare_per_command_connection(&mut stream, client_addr)
1509                .await;
1510            assert!(result.is_ok());
1511        }
1512
1513        #[test]
1514        fn test_routing_mode_display_name_empty_cache() {
1515            let config = create_test_config();
1516            let proxy = NntpProxy::new_sync(config, RoutingMode::Hybrid).unwrap();
1517
1518            let _empty_cache = Arc::new(crate::cache::UnifiedCache::memory(
1519                100,
1520                std::time::Duration::from_secs(3600),
1521                false,
1522            ));
1523            assert_eq!(proxy.routing_mode_display_name(), "per-command");
1524        }
1525
1526        #[test]
1527        fn test_log_routing_selection() {
1528            let config = create_test_config();
1529            let proxy = Arc::new(NntpProxy::new_sync(config, RoutingMode::Stateful).unwrap());
1530
1531            let backend_id = crate::types::BackendId::from_index(0);
1532            let client_addr =
1533                ClientAddress::from("127.0.0.1:12345".parse::<std::net::SocketAddr>().unwrap());
1534
1535            // Should not panic
1536            proxy.log_routing_selection(client_addr, backend_id, &proxy.servers()[0]);
1537        }
1538
1539        #[test]
1540        fn test_log_pool_status() {
1541            let config = create_test_config();
1542            let proxy = Arc::new(NntpProxy::new_sync(config, RoutingMode::Stateful).unwrap());
1543
1544            // Should not panic
1545            proxy.log_pool_status(0);
1546        }
1547
1548        #[tokio::test]
1549        async fn test_apply_tcp_optimizations() {
1550            let config = create_test_config();
1551            let proxy = Arc::new(NntpProxy::new(config, RoutingMode::Stateful).await.unwrap());
1552
1553            let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1554            let addr = listener.local_addr().unwrap();
1555
1556            tokio::spawn(async move {
1557                let (_stream, _) = listener.accept().await.unwrap();
1558            });
1559
1560            let stream = tokio::net::TcpStream::connect(addr).await.unwrap();
1561
1562            // Should not panic
1563            proxy.apply_tcp_optimizations(&stream);
1564        }
1565
1566        #[test]
1567        fn test_session_mode_labels_all_combinations() {
1568            use crate::session::SessionMode;
1569
1570            // PerCommand mode
1571            let config = create_test_config();
1572            let proxy = NntpProxy::new_sync(config.clone(), RoutingMode::PerCommand).unwrap();
1573            assert_eq!(
1574                proxy.session_mode_label(SessionMode::PerCommand),
1575                "per-command"
1576            );
1577
1578            // Stateful mode
1579            let proxy = NntpProxy::new_sync(config.clone(), RoutingMode::Stateful).unwrap();
1580            assert_eq!(proxy.session_mode_label(SessionMode::Stateful), "standard");
1581
1582            // Hybrid mode
1583            let proxy = NntpProxy::new_sync(config, RoutingMode::Hybrid).unwrap();
1584            assert_eq!(proxy.session_mode_label(SessionMode::Stateful), "hybrid");
1585        }
1586
1587        #[test]
1588        fn test_finalize_stateful_session_success() {
1589            let config = create_test_config();
1590            let proxy = Arc::new(NntpProxy::new_sync(config, RoutingMode::Stateful).unwrap());
1591
1592            let client_addr =
1593                ClientAddress::from("127.0.0.1:12345".parse::<std::net::SocketAddr>().unwrap());
1594            let session = proxy.create_session(client_addr, None);
1595            let session_id = proxy.generate_session_id(&session);
1596            let backend_id = crate::types::BackendId::from_index(0);
1597
1598            let metrics = TransferMetrics {
1599                client_to_backend: crate::types::ClientToBackendBytes::new(512),
1600                backend_to_client: crate::types::BackendToClientBytes::new(1024),
1601            };
1602
1603            let result = proxy.finalize_stateful_session(
1604                Ok(metrics),
1605                client_addr,
1606                &session_id,
1607                &session,
1608                backend_id,
1609            );
1610
1611            assert!(result.is_ok());
1612        }
1613
1614        #[test]
1615        fn test_finalize_stateful_session_error() {
1616            let config = create_test_config();
1617            let proxy = Arc::new(NntpProxy::new_sync(config, RoutingMode::Stateful).unwrap());
1618
1619            let client_addr =
1620                ClientAddress::from("127.0.0.1:12345".parse::<std::net::SocketAddr>().unwrap());
1621            let session = proxy.create_session(client_addr, None);
1622            let session_id = proxy.generate_session_id(&session);
1623            let backend_id = crate::types::BackendId::from_index(0);
1624
1625            let result = proxy.finalize_stateful_session(
1626                Err(anyhow::anyhow!("connection error")),
1627                client_addr,
1628                &session_id,
1629                &session,
1630                backend_id,
1631            );
1632
1633            assert!(result.is_err());
1634        }
1635
1636        #[test]
1637        fn test_finalize_per_command_session_success() {
1638            let config = create_test_config();
1639            let proxy = Arc::new(NntpProxy::new_sync(config, RoutingMode::PerCommand).unwrap());
1640
1641            let client_addr =
1642                ClientAddress::from("127.0.0.1:12345".parse::<std::net::SocketAddr>().unwrap());
1643            let session = proxy.create_session(client_addr, Some(proxy.router.clone()));
1644            let session_id = proxy.generate_session_id(&session);
1645
1646            let metrics = TransferMetrics {
1647                client_to_backend: crate::types::ClientToBackendBytes::new(256),
1648                backend_to_client: crate::types::BackendToClientBytes::new(512),
1649            };
1650
1651            let result =
1652                proxy.finalize_per_command_session(Ok(metrics), client_addr, &session_id, &session);
1653
1654            assert!(result.is_ok());
1655        }
1656
1657        #[test]
1658        fn test_finalize_per_command_session_error() {
1659            let config = create_test_config();
1660            let proxy = Arc::new(NntpProxy::new_sync(config, RoutingMode::PerCommand).unwrap());
1661
1662            let client_addr =
1663                ClientAddress::from("127.0.0.1:12345".parse::<std::net::SocketAddr>().unwrap());
1664            let session = proxy.create_session(client_addr, Some(proxy.router.clone()));
1665            let session_id = proxy.generate_session_id(&session);
1666
1667            let result = proxy.finalize_per_command_session(
1668                Err(anyhow::anyhow!("session failed")),
1669                client_addr,
1670                &session_id,
1671                &session,
1672            );
1673
1674            assert!(result.is_err());
1675        }
1676
1677        #[test]
1678        fn test_record_session_metrics_without_backend() {
1679            let config = create_test_config();
1680            let proxy = Arc::new(NntpProxy::new_sync(config, RoutingMode::PerCommand).unwrap());
1681
1682            let client_addr =
1683                ClientAddress::from("127.0.0.1:12345".parse::<std::net::SocketAddr>().unwrap());
1684            let session = proxy.create_session(client_addr, Some(proxy.router.clone()));
1685            let session_id = proxy.generate_session_id(&session);
1686
1687            let metrics = TransferMetrics {
1688                client_to_backend: crate::types::ClientToBackendBytes::new(128),
1689                backend_to_client: crate::types::BackendToClientBytes::new(256),
1690            };
1691
1692            // Without backend_id (per-command mode)
1693            let result =
1694                proxy.record_session_metrics(Ok(metrics), client_addr, &session_id, &session, None);
1695
1696            assert!(result.is_ok());
1697        }
1698
1699        #[test]
1700        fn test_generate_session_id_uniqueness() {
1701            let config = create_test_config();
1702            let proxy = NntpProxy::new_sync(config, RoutingMode::Stateful).unwrap();
1703
1704            let client_addr =
1705                ClientAddress::from("127.0.0.1:12345".parse::<std::net::SocketAddr>().unwrap());
1706
1707            let session1 = proxy.create_session(client_addr, None);
1708            let session2 = proxy.create_session(client_addr, None);
1709
1710            let id1 = proxy.generate_session_id(&session1);
1711            let id2 = proxy.generate_session_id(&session2);
1712
1713            // Should generate different IDs for different sessions
1714            assert_ne!(id1, id2);
1715        }
1716    }
1717
1718    mod idle_tracking {
1719        use super::*;
1720        use std::time::Duration;
1721
1722        #[test]
1723        fn test_idle_timeout_constant() {
1724            // Ensure the idle timeout is 5 minutes
1725            assert_eq!(NntpProxy::IDLE_TIMEOUT, Duration::from_secs(5 * 60));
1726        }
1727
1728        #[test]
1729        fn test_active_clients_increment_decrement() {
1730            let config = create_test_config();
1731            let proxy = NntpProxy::new_sync(config, RoutingMode::Stateful).unwrap();
1732
1733            assert_eq!(proxy.active_clients.load(Ordering::Relaxed), 0);
1734
1735            proxy.increment_active_clients();
1736            assert_eq!(proxy.active_clients.load(Ordering::Relaxed), 1);
1737
1738            proxy.increment_active_clients();
1739            assert_eq!(proxy.active_clients.load(Ordering::Relaxed), 2);
1740
1741            proxy.decrement_active_clients();
1742            assert_eq!(proxy.active_clients.load(Ordering::Relaxed), 1);
1743
1744            proxy.decrement_active_clients();
1745            assert_eq!(proxy.active_clients.load(Ordering::Relaxed), 0);
1746        }
1747
1748        #[test]
1749        fn test_last_activity_updated_on_last_client_disconnect() {
1750            let config = create_test_config();
1751            let proxy = NntpProxy::new_sync(config, RoutingMode::Stateful).unwrap();
1752
1753            // Initially no activity
1754            assert_eq!(proxy.last_activity_nanos.load(Ordering::Relaxed), 0);
1755
1756            // Connect two clients
1757            proxy.increment_active_clients();
1758            proxy.increment_active_clients();
1759
1760            // First client disconnects - should not update timestamp
1761            proxy.decrement_active_clients();
1762            assert_eq!(proxy.last_activity_nanos.load(Ordering::Relaxed), 0);
1763
1764            // Second (last) client disconnects - should update timestamp
1765            proxy.decrement_active_clients();
1766            assert!(proxy.last_activity_nanos.load(Ordering::Relaxed) > 0);
1767        }
1768
1769        #[test]
1770        fn test_check_and_clear_skips_when_clients_active() {
1771            let config = create_test_config();
1772            let proxy = NntpProxy::new_sync(config, RoutingMode::Stateful).unwrap();
1773
1774            // Simulate a past activity
1775            proxy.last_activity_nanos.store(1, Ordering::Relaxed);
1776
1777            // With active clients, should not clear
1778            proxy.increment_active_clients();
1779            let cleared = proxy.check_and_clear_stale_pools();
1780            assert!(!cleared);
1781        }
1782
1783        #[test]
1784        fn test_check_and_clear_skips_when_never_active() {
1785            let config = create_test_config();
1786            let proxy = NntpProxy::new_sync(config, RoutingMode::Stateful).unwrap();
1787
1788            // No prior activity (last_activity_nanos = 0)
1789            let cleared = proxy.check_and_clear_stale_pools();
1790            assert!(!cleared);
1791        }
1792
1793        #[test]
1794        fn test_check_and_clear_skips_when_recently_active() {
1795            let config = create_test_config();
1796            let proxy = NntpProxy::new_sync(config, RoutingMode::Stateful).unwrap();
1797
1798            // Connect and disconnect to set timestamp
1799            proxy.increment_active_clients();
1800            proxy.decrement_active_clients();
1801
1802            // Should not clear - just disconnected (within timeout)
1803            let cleared = proxy.check_and_clear_stale_pools();
1804            assert!(!cleared);
1805        }
1806    }
1807}