1use anyhow::{Context, Result};
7use std::sync::Arc;
8use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
9use std::time::{Duration, Instant};
10use tokio::net::TcpStream;
11use tracing::{debug, info, warn};
12
13use crate::auth::AuthHandler;
14use crate::cache::{HybridCacheConfig, UnifiedCache};
15use crate::config::{Config, RoutingMode, Server};
16use crate::constants::buffer::{POOL, POOL_COUNT};
17use crate::metrics::{ConnectionStatsAggregator, MetricsCollector};
18use crate::network::NetworkOptimizer;
19use crate::pool::{BufferPool, ConnectionProvider, DeadpoolConnectionProvider, prewarm_pools};
20use crate::router;
21use crate::session::ClientSession;
22use crate::types::ClientAddress;
23use crate::types::{self, BufferSize, TransferMetrics};
24
25#[derive(Debug)]
63pub struct NntpProxyBuilder {
64 config: Config,
65 routing_mode: RoutingMode,
66 buffer_size: Option<usize>,
67 buffer_count: Option<usize>,
68}
69
70impl NntpProxyBuilder {
71 #[must_use]
75 pub fn new(config: Config) -> Self {
76 Self {
77 config,
78 routing_mode: RoutingMode::Stateful,
79 buffer_size: None,
80 buffer_count: None,
81 }
82 }
83
84 #[must_use]
91 pub fn with_routing_mode(mut self, mode: RoutingMode) -> Self {
92 self.routing_mode = mode;
93 self
94 }
95
96 #[must_use]
101 pub fn with_buffer_pool_size(mut self, size: usize) -> Self {
102 self.buffer_size = Some(size);
103 self
104 }
105
106 #[must_use]
111 pub fn with_buffer_pool_count(mut self, count: usize) -> Self {
112 self.buffer_count = Some(count);
113 self
114 }
115
116 pub async fn build(self) -> Result<NntpProxy> {
126 if self.config.servers.is_empty() {
127 anyhow::bail!("No servers configured in configuration");
128 }
129
130 let buffer_size = self.buffer_size.unwrap_or(POOL);
132 let buffer_count = self.buffer_count.unwrap_or(POOL_COUNT);
133
134 let connection_providers: Result<Vec<DeadpoolConnectionProvider>> = self
136 .config
137 .servers
138 .iter()
139 .map(|server| {
140 info!(
141 "Configuring deadpool connection provider for '{}'",
142 server.name
143 );
144 DeadpoolConnectionProvider::from_server_config(server)
145 })
146 .collect();
147
148 let connection_providers = connection_providers?;
149
150 let buffer_pool = BufferPool::new(
151 BufferSize::try_new(buffer_size)
152 .map_err(|_| anyhow::anyhow!("Buffer size must be non-zero"))?,
153 buffer_count,
154 );
155
156 let metrics = MetricsCollector::new(self.config.servers.len());
158
159 let servers = Arc::new(self.config.servers);
160
161 let router = Arc::new({
163 use types::BackendId;
164 let backend_strategy = self.config.proxy.backend_selection;
165 connection_providers.iter().enumerate().fold(
166 router::BackendSelector::with_strategy(backend_strategy),
167 |mut r, (idx, provider)| {
168 let backend_id = BackendId::from_index(idx);
169 r.add_backend(
170 backend_id,
171 servers[idx].name.clone(),
172 provider.clone(),
173 servers[idx].tier,
174 );
175 r
176 },
177 )
178 });
179
180 let auth_handler = {
182 let all_users: Vec<(String, String)> = self
183 .config
184 .client_auth
185 .all_users()
186 .into_iter()
187 .map(|(u, p)| (u.to_string(), p.to_string()))
188 .collect();
189
190 if all_users.is_empty() {
191 Arc::new(AuthHandler::default())
192 } else {
193 Arc::new(AuthHandler::with_users(all_users).with_context(|| {
194 "Invalid authentication configuration. \
195 If you set username/password in config, they cannot be empty. \
196 Remove them entirely to disable authentication."
197 })?)
198 }
199 };
200
201 let (cache, cache_articles) = if let Some(cache_config) = &self.config.cache {
204 let capacity = cache_config.max_capacity.as_u64();
205 let cache_articles = cache_config.cache_articles;
206
207 let cache = if let Some(disk_config) = &cache_config.disk {
209 let hybrid_config = HybridCacheConfig {
211 memory_capacity: capacity,
212 disk_path: disk_config.path.clone(),
213 disk_capacity: disk_config.capacity.as_u64(),
214 shards: disk_config.shards,
215 compression: disk_config.compression,
216 ttl: cache_config.ttl,
217 cache_articles,
218 };
219
220 info!(
221 "Initializing hybrid cache: memory={}MB, disk={}GB at {:?}",
222 capacity / (1024 * 1024),
223 disk_config.capacity.as_u64() / (1024 * 1024 * 1024),
224 disk_config.path
225 );
226
227 Arc::new(
228 UnifiedCache::hybrid(hybrid_config)
229 .await
230 .context("Failed to initialize hybrid disk cache")?,
231 )
232 } else {
233 Arc::new(UnifiedCache::memory(
235 capacity,
236 cache_config.ttl,
237 cache_articles,
238 ))
239 };
240
241 if capacity > 0 {
242 if cache_articles {
243 info!(
244 "Article cache enabled: max_capacity={}, ttl={}s (full caching)",
245 cache_config.max_capacity,
246 cache_config.ttl.as_secs()
247 );
248 } else {
249 info!(
250 "Article cache enabled: max_capacity={}, ttl={}s (availability-only, bodies not cached)",
251 cache_config.max_capacity,
252 cache_config.ttl.as_secs()
253 );
254 }
255 } else {
256 info!("Backend availability tracking enabled (cache disabled, capacity=0)");
257 }
258 (cache, cache_articles)
259 } else {
260 debug!("Cache not configured, using availability-only mode (capacity=0)");
261 (
262 Arc::new(UnifiedCache::memory(0, Duration::from_secs(3600), false)),
263 true,
264 )
265 };
266
267 let adaptive_precheck = self
269 .config
270 .cache
271 .as_ref()
272 .map(|c| c.adaptive_precheck)
273 .unwrap_or(false);
274
275 let start_instant = Instant::now();
276
277 Ok(NntpProxy {
278 servers,
279 router,
280 connection_providers,
281 buffer_pool,
282 routing_mode: self.routing_mode,
283 auth_handler,
284 metrics,
285 connection_stats: ConnectionStatsAggregator::new(),
286 cache,
287 cache_articles,
288 adaptive_precheck,
289 last_activity_nanos: Arc::new(AtomicU64::new(0)),
290 active_clients: Arc::new(AtomicUsize::new(0)),
291 start_instant,
292 })
293 }
294
295 pub fn build_sync(self) -> Result<NntpProxy> {
308 if self.config.servers.is_empty() {
309 anyhow::bail!("No servers configured in configuration");
310 }
311
312 let buffer_size = self.buffer_size.unwrap_or(POOL);
314 let buffer_count = self.buffer_count.unwrap_or(POOL_COUNT);
315
316 let connection_providers: Result<Vec<DeadpoolConnectionProvider>> = self
318 .config
319 .servers
320 .iter()
321 .map(|server| {
322 info!(
323 "Configuring deadpool connection provider for '{}'",
324 server.name
325 );
326 DeadpoolConnectionProvider::from_server_config(server)
327 })
328 .collect();
329
330 let connection_providers = connection_providers?;
331
332 let buffer_pool = BufferPool::new(
333 BufferSize::try_new(buffer_size)
334 .map_err(|_| anyhow::anyhow!("Buffer size must be non-zero"))?,
335 buffer_count,
336 );
337
338 let metrics = MetricsCollector::new(self.config.servers.len());
340
341 let servers = Arc::new(self.config.servers);
342
343 let router = Arc::new({
345 use types::BackendId;
346 let backend_strategy = self.config.proxy.backend_selection;
347 connection_providers.iter().enumerate().fold(
348 router::BackendSelector::with_strategy(backend_strategy),
349 |mut r, (idx, provider)| {
350 let backend_id = BackendId::from_index(idx);
351 r.add_backend(
352 backend_id,
353 servers[idx].name.clone(),
354 provider.clone(),
355 servers[idx].tier,
356 );
357 r
358 },
359 )
360 });
361
362 let auth_handler = {
364 let all_users: Vec<(String, String)> = self
365 .config
366 .client_auth
367 .all_users()
368 .into_iter()
369 .map(|(u, p)| (u.to_string(), p.to_string()))
370 .collect();
371
372 if all_users.is_empty() {
373 Arc::new(AuthHandler::default())
374 } else {
375 Arc::new(AuthHandler::with_users(all_users).with_context(|| {
376 "Invalid authentication configuration. \
377 If you set username/password in config, they cannot be empty. \
378 Remove them entirely to disable authentication."
379 })?)
380 }
381 };
382
383 let (cache, cache_articles) = if let Some(cache_config) = &self.config.cache {
385 let capacity = cache_config.max_capacity.as_u64();
386 let cache_articles = cache_config.cache_articles;
387
388 if cache_config.disk.is_some() {
390 warn!(
391 "Disk cache configured but build_sync() called - using memory-only cache. Use build() for disk cache support."
392 );
393 }
394
395 let cache = Arc::new(UnifiedCache::memory(
397 capacity,
398 cache_config.ttl,
399 cache_articles,
400 ));
401
402 if capacity > 0 {
403 if cache_articles {
404 info!(
405 "Article cache enabled: max_capacity={}, ttl={}s (full caching)",
406 cache_config.max_capacity,
407 cache_config.ttl.as_secs()
408 );
409 } else {
410 info!(
411 "Article cache enabled: max_capacity={}, ttl={}s (availability-only, bodies not cached)",
412 cache_config.max_capacity,
413 cache_config.ttl.as_secs()
414 );
415 }
416 } else {
417 info!("Backend availability tracking enabled (cache disabled, capacity=0)");
418 }
419 (cache, cache_articles)
420 } else {
421 debug!("Cache not configured, using availability-only mode (capacity=0)");
422 (
423 Arc::new(UnifiedCache::memory(0, Duration::from_secs(3600), false)),
424 true,
425 )
426 };
427
428 let adaptive_precheck = self
430 .config
431 .cache
432 .as_ref()
433 .map(|c| c.adaptive_precheck)
434 .unwrap_or(false);
435
436 let start_instant = Instant::now();
437
438 Ok(NntpProxy {
439 servers,
440 router,
441 connection_providers,
442 buffer_pool,
443 routing_mode: self.routing_mode,
444 auth_handler,
445 metrics,
446 connection_stats: ConnectionStatsAggregator::new(),
447 cache,
448 cache_articles,
449 adaptive_precheck,
450 last_activity_nanos: Arc::new(AtomicU64::new(0)),
451 active_clients: Arc::new(AtomicUsize::new(0)),
452 start_instant,
453 })
454 }
455}
456
457#[derive(Debug, Clone)]
458pub struct NntpProxy {
459 servers: Arc<Vec<Server>>,
460 router: Arc<router::BackendSelector>,
462 connection_providers: Vec<DeadpoolConnectionProvider>,
464 buffer_pool: BufferPool,
466 routing_mode: RoutingMode,
468 auth_handler: Arc<AuthHandler>,
470 metrics: MetricsCollector,
472 connection_stats: ConnectionStatsAggregator,
474 cache: Arc<UnifiedCache>,
476 cache_articles: bool,
478 adaptive_precheck: bool,
480 last_activity_nanos: Arc<AtomicU64>,
483 active_clients: Arc<AtomicUsize>,
485 start_instant: Instant,
487}
488
489#[inline]
508pub fn is_client_disconnect_error(e: &anyhow::Error) -> bool {
509 crate::session::error_classification::ErrorClassifier::is_client_disconnect(e)
510}
511
512impl NntpProxy {
/// How long the proxy must have been without active clients before idle
/// pooled connections are cleared (five minutes).
const IDLE_TIMEOUT: Duration = Duration::from_secs(300);
517
518 #[inline]
519 fn record_connection_opened(&self) {
520 self.metrics.connection_opened();
521 }
522
523 #[inline]
524 fn record_connection_closed(&self) {
525 self.metrics.connection_closed();
526 }
527
528 #[inline]
532 fn increment_active_clients(&self) {
533 self.active_clients.fetch_add(1, Ordering::Relaxed);
534 }
535
536 #[inline]
540 fn decrement_active_clients(&self) {
541 let prev = self.active_clients.fetch_sub(1, Ordering::Relaxed);
542
543 if prev == 1 {
545 let nanos = self.start_instant.elapsed().as_nanos() as u64;
546 self.last_activity_nanos.store(nanos, Ordering::Relaxed);
547 }
548 }
549
550 fn check_and_clear_stale_pools(&self) -> bool {
559 if self.active_clients.load(Ordering::Relaxed) > 0 {
561 return false;
562 }
563
564 let last_activity_nanos = self.last_activity_nanos.load(Ordering::Relaxed);
565
566 if last_activity_nanos == 0 {
568 return false;
569 }
570
571 let last_activity = Duration::from_nanos(last_activity_nanos);
572 let now = self.start_instant.elapsed();
573 let idle_duration = now.saturating_sub(last_activity);
574
575 if idle_duration > Self::IDLE_TIMEOUT {
576 info!(
577 idle_secs = idle_duration.as_secs(),
578 pool_count = self.connection_providers.len(),
579 "Clearing stale pool connections after idle timeout"
580 );
581
582 for provider in &self.connection_providers {
583 provider.clear_idle_connections();
584 }
585
586 true
587 } else {
588 false
589 }
590 }
591
592 fn build_session(
594 &self,
595 client_addr: ClientAddress,
596 router: Option<Arc<router::BackendSelector>>,
597 routing_mode: RoutingMode,
598 cache: Arc<UnifiedCache>,
599 ) -> ClientSession {
600 let builder = ClientSession::builder(
602 client_addr,
603 self.buffer_pool.clone(),
604 self.auth_handler.clone(),
605 self.metrics.clone(),
606 )
607 .with_routing_mode(routing_mode)
608 .with_connection_stats(self.connection_stats.clone())
609 .with_cache(cache)
610 .with_cache_articles(self.cache_articles)
611 .with_adaptive_precheck(self.adaptive_precheck);
612
613 let builder = match router {
615 Some(r) => builder.with_router(r),
616 None => builder,
617 };
618
619 builder.build()
620 }
621
622 fn log_session_completion(
624 &self,
625 client_addr: ClientAddress,
626 session_id: &str,
627 session: &ClientSession,
628 routing_mode: crate::config::RoutingMode,
629 metrics: &types::TransferMetrics,
630 ) {
631 self.connection_stats
632 .record_disconnection(session.username().as_deref(), routing_mode.short_name());
633
634 debug!(
635 "Session {} [{}] ↑{} ↓{}",
636 client_addr,
637 session_id,
638 crate::formatting::format_bytes(metrics.client_to_backend.as_u64()),
639 crate::formatting::format_bytes(metrics.backend_to_client.as_u64())
640 );
641 }
642
643 pub async fn new(config: Config, routing_mode: RoutingMode) -> Result<Self> {
661 NntpProxyBuilder::new(config)
662 .with_routing_mode(routing_mode)
663 .build()
664 .await
665 }
666
667 pub fn new_sync(config: Config, routing_mode: RoutingMode) -> Result<Self> {
684 NntpProxyBuilder::new(config)
685 .with_routing_mode(routing_mode)
686 .build_sync()
687 }
688
689 #[must_use]
708 pub fn builder(config: Config) -> NntpProxyBuilder {
709 NntpProxyBuilder::new(config)
710 }
711
712 pub async fn prewarm_connections(&self) -> Result<()> {
715 prewarm_pools(&self.connection_providers, &self.servers).await
716 }
717
718 pub async fn graceful_shutdown(&self) {
720 info!("Initiating graceful shutdown...");
721
722 info!("Flushing disk cache writes...");
725 if let Err(e) = self.cache.close().await {
726 warn!("Error closing cache: {}", e);
727 }
728
729 info!("Shutting down connection pools...");
730 for provider in &self.connection_providers {
731 provider.graceful_shutdown().await;
732 }
733
734 info!("All connection pools have been shut down gracefully");
735 }
736
737 #[must_use]
739 #[inline]
740 pub fn servers(&self) -> &[Server] {
741 &self.servers
742 }
743
744 #[must_use]
746 #[inline]
747 pub fn router(&self) -> &Arc<router::BackendSelector> {
748 &self.router
749 }
750
751 #[must_use]
753 #[inline]
754 pub fn connection_providers(&self) -> &[DeadpoolConnectionProvider] {
755 &self.connection_providers
756 }
757
758 #[must_use]
760 #[inline]
761 pub fn buffer_pool(&self) -> &BufferPool {
762 &self.buffer_pool
763 }
764
765 #[must_use]
767 #[inline]
768 pub fn cache(&self) -> &Arc<UnifiedCache> {
769 &self.cache
770 }
771
772 #[must_use]
774 #[inline]
775 pub fn metrics(&self) -> &MetricsCollector {
776 &self.metrics
777 }
778
779 #[must_use]
781 #[inline]
782 pub fn connection_stats(&self) -> &ConnectionStatsAggregator {
783 &self.connection_stats
784 }
785
786 #[inline]
788 fn log_routing_selection(
789 &self,
790 client_addr: ClientAddress,
791 backend_id: crate::types::BackendId,
792 server: &Server,
793 ) {
794 info!(
795 "Routing client {} to backend {:?} ({}:{})",
796 client_addr, backend_id, server.host, server.port
797 );
798 }
799
800 #[inline]
802 fn log_pool_status(&self, server_idx: usize) {
803 let pool_status = self.connection_providers[server_idx].status();
804 debug!(
805 "Pool status for {}: {}/{} available, {} created",
806 self.servers[server_idx].name,
807 pool_status.available,
808 pool_status.max_size,
809 pool_status.created
810 );
811 }
812
813 async fn prepare_stateful_connection(
815 &self,
816 client_stream: &mut TcpStream,
817 client_addr: ClientAddress,
818 ) -> Result<crate::types::BackendId> {
819 self.record_connection_opened();
820
821 let client_id = types::ClientId::new();
822 let backend_id = self.router.route_command(client_id, "")?;
823 let server_idx = backend_id.as_index();
824
825 self.log_routing_selection(client_addr, backend_id, &self.servers[server_idx]);
826 self.send_greeting(client_stream, client_addr).await?;
827 self.log_pool_status(server_idx);
828 self.apply_tcp_optimizations(client_stream);
829
830 Ok(backend_id)
831 }
832
833 async fn prepare_per_command_connection(
835 &self,
836 client_stream: &mut TcpStream,
837 client_addr: ClientAddress,
838 ) -> Result<()> {
839 self.record_connection_opened();
840 self.send_greeting(client_stream, client_addr).await?;
841 self.apply_tcp_optimizations(client_stream);
842 Ok(())
843 }
844
845 #[inline]
847 fn create_session(
848 &self,
849 client_addr: ClientAddress,
850 router: Option<Arc<crate::router::BackendSelector>>,
851 ) -> ClientSession {
852 self.build_session(client_addr, router, self.routing_mode, self.cache.clone())
853 }
854
855 #[inline]
857 fn generate_session_id(&self, session: &ClientSession) -> String {
858 crate::formatting::short_id(session.client_id().as_uuid())
859 }
860
861 #[inline]
863 async fn send_greeting(
864 &self,
865 client_stream: &mut TcpStream,
866 client_addr: ClientAddress,
867 ) -> Result<()> {
868 crate::protocol::send_proxy_greeting(client_stream, client_addr).await
869 }
870
871 #[inline]
873 fn apply_tcp_optimizations(&self, client_stream: &TcpStream) {
874 use crate::network::TcpOptimizer;
875 TcpOptimizer::new(client_stream)
876 .optimize()
877 .map_err(|e| debug!("Failed to optimize client socket: {}", e))
878 .ok();
879 }
880
881 #[inline]
883 fn routing_mode_display_name(&self) -> &'static str {
884 if self.cache.entry_count() > 0 {
885 "caching"
886 } else {
887 "per-command"
888 }
889 }
890
891 fn finalize_stateful_session(
893 &self,
894 metrics: Result<TransferMetrics>,
895 client_addr: ClientAddress,
896 session_id: &str,
897 session: &ClientSession,
898 backend_id: crate::types::BackendId,
899 ) -> Result<()> {
900 self.record_connection_if_unauthenticated(session);
901 self.router.complete_command(backend_id);
902 self.record_session_metrics(metrics, client_addr, session_id, session, Some(backend_id))?;
903 self.record_connection_closed();
904 Ok(())
905 }
906
907 fn finalize_per_command_session(
909 &self,
910 metrics: Result<TransferMetrics>,
911 client_addr: ClientAddress,
912 session_id: &str,
913 session: &ClientSession,
914 ) -> Result<()> {
915 self.record_session_metrics(metrics, client_addr, session_id, session, None)?;
916 self.record_connection_closed();
917 Ok(())
918 }
919
920 #[inline]
922 fn record_connection_if_unauthenticated(&self, session: &ClientSession) {
923 if !self.auth_handler.is_enabled() || session.username().is_none() {
924 let mode = self.session_mode_label(session.mode());
925 self.connection_stats
926 .record_connection(session.username().as_deref(), mode);
927 }
928 }
929
930 fn record_session_metrics(
932 &self,
933 metrics: Result<TransferMetrics>,
934 client_addr: ClientAddress,
935 session_id: &str,
936 session: &ClientSession,
937 backend_id: Option<crate::types::BackendId>,
938 ) -> Result<()> {
939 match metrics {
940 Ok(m) => {
941 self.log_session_completion(
942 client_addr,
943 session_id,
944 session,
945 self.routing_mode,
946 &m,
947 );
948
949 if let Some(bid) = backend_id {
950 self.metrics
951 .record_client_to_backend_bytes_for(bid, m.client_to_backend.as_u64());
952 self.metrics
953 .record_backend_to_client_bytes_for(bid, m.backend_to_client.as_u64());
954 }
955 Ok(())
956 }
957 Err(e) => {
958 if let Some(bid) = backend_id {
959 self.metrics.record_error(bid);
960 }
961
962 if !is_client_disconnect_error(&e) {
964 warn!("Session error for client {}: {:?}", client_addr, e);
965 }
966 Err(e)
967 }
968 }
969 }
970
971 #[inline]
973 fn session_mode_label(&self, session_mode: crate::session::SessionMode) -> &'static str {
974 use crate::session::SessionMode;
975 match (session_mode, self.routing_mode) {
976 (SessionMode::PerCommand, _) => "per-command",
977 (SessionMode::Stateful, RoutingMode::Stateful) => "standard",
978 (SessionMode::Stateful, RoutingMode::Hybrid) => "hybrid",
979 (SessionMode::Stateful, _) => "stateful",
980 }
981 }
982
983 pub async fn handle_client(
984 &self,
985 mut client_stream: TcpStream,
986 client_addr: ClientAddress,
987 ) -> Result<()> {
988 debug!("New client connection from {}", client_addr);
989
990 self.check_and_clear_stale_pools();
992 self.increment_active_clients();
993
994 let result = async {
995 let backend_id = self
996 .prepare_stateful_connection(&mut client_stream, client_addr)
997 .await?;
998 let server_idx = backend_id.as_index();
999
1000 let session = self.create_session(client_addr, None);
1001 let session_id = self.generate_session_id(&session);
1002
1003 debug!("Starting stateful session for client {}", client_addr);
1004
1005 let metrics = session
1006 .handle_stateful_session(
1007 client_stream,
1008 backend_id,
1009 &self.connection_providers[server_idx],
1010 &self.servers[server_idx].name,
1011 )
1012 .await;
1013
1014 self.finalize_stateful_session(metrics, client_addr, &session_id, &session, backend_id)
1015 }
1016 .await;
1017
1018 self.decrement_active_clients();
1019 result
1020 }
1021
1022 pub async fn handle_client_per_command_routing(
1027 &self,
1028 client_stream: TcpStream,
1029 client_addr: ClientAddress,
1030 ) -> Result<()> {
1031 self.check_and_clear_stale_pools();
1033 self.increment_active_clients();
1034
1035 let result = self
1036 .handle_per_command_client(client_stream, client_addr)
1037 .await;
1038
1039 self.decrement_active_clients();
1040 result
1041 }
1042
1043 async fn handle_per_command_client(
1045 &self,
1046 mut client_stream: TcpStream,
1047 client_addr: ClientAddress,
1048 ) -> Result<()> {
1049 let mode_label = self.routing_mode_display_name();
1050 debug!(
1051 "New {} routing client connection from {}",
1052 mode_label, client_addr
1053 );
1054
1055 self.prepare_per_command_connection(&mut client_stream, client_addr)
1056 .await?;
1057
1058 let session = self.create_session(client_addr, Some(self.router.clone()));
1059 let session_id = self.generate_session_id(&session);
1060
1061 let metrics = session
1062 .handle_per_command_routing(client_stream)
1063 .await
1064 .with_context(|| {
1065 format!(
1066 "{} routing session failed for {} [{}]",
1067 mode_label, client_addr, session_id
1068 )
1069 });
1070
1071 self.finalize_per_command_session(metrics, client_addr, &session_id, &session)
1072 }
1073}
1074
1075#[cfg(test)]
1076mod tests {
1077 use super::*;
1078 use std::sync::Arc;
1079
1080 fn create_test_config() -> Config {
1081 use crate::config::{health_check_max_per_cycle, health_check_pool_timeout};
1082 use crate::types::{HostName, MaxConnections, Port, ServerName};
1083 Config {
1084 servers: vec![
1085 Server {
1086 host: HostName::try_new("server1.example.com".to_string()).unwrap(),
1087 port: Port::try_new(119).unwrap(),
1088 name: ServerName::try_new("Test Server 1".to_string()).unwrap(),
1089 username: None,
1090 password: None,
1091 max_connections: MaxConnections::try_new(5).unwrap(),
1092 use_tls: false,
1093 tls_verify_cert: true,
1094 tls_cert_path: None,
1095 connection_keepalive: None,
1096 health_check_max_per_cycle: health_check_max_per_cycle(),
1097 health_check_pool_timeout: health_check_pool_timeout(),
1098 tier: 0,
1099 },
1100 Server {
1101 host: HostName::try_new("server2.example.com".to_string()).unwrap(),
1102 port: Port::try_new(119).unwrap(),
1103 name: ServerName::try_new("Test Server 2".to_string()).unwrap(),
1104 username: None,
1105 password: None,
1106 max_connections: MaxConnections::try_new(8).unwrap(),
1107 use_tls: false,
1108 tls_verify_cert: true,
1109 tls_cert_path: None,
1110 connection_keepalive: None,
1111 health_check_max_per_cycle: health_check_max_per_cycle(),
1112 health_check_pool_timeout: health_check_pool_timeout(),
1113 tier: 0,
1114 },
1115 Server {
1116 host: HostName::try_new("server3.example.com".to_string()).unwrap(),
1117 port: Port::try_new(119).unwrap(),
1118 name: ServerName::try_new("Test Server 3".to_string()).unwrap(),
1119 username: None,
1120 password: None,
1121 max_connections: MaxConnections::try_new(12).unwrap(),
1122 use_tls: false,
1123 tls_verify_cert: true,
1124 tls_cert_path: None,
1125 connection_keepalive: None,
1126 health_check_max_per_cycle: health_check_max_per_cycle(),
1127 health_check_pool_timeout: health_check_pool_timeout(),
1128 tier: 0,
1129 },
1130 ],
1131 ..Default::default()
1132 }
1133 }
1134
1135 #[test]
1136 fn test_proxy_creation_with_servers() {
1137 let config = create_test_config();
1138 let proxy = Arc::new(
1139 NntpProxy::new_sync(config, RoutingMode::Stateful).expect("Failed to create proxy"),
1140 );
1141
1142 assert_eq!(proxy.servers().len(), 3);
1143 assert_eq!(proxy.servers()[0].name.as_str(), "Test Server 1");
1144 }
1145
1146 #[test]
1147 fn test_proxy_creation_with_empty_servers() {
1148 let config = Config {
1149 servers: vec![],
1150 ..Default::default()
1151 };
1152 let result = NntpProxy::new_sync(config, RoutingMode::Stateful);
1153
1154 assert!(result.is_err());
1155 assert!(
1156 result
1157 .unwrap_err()
1158 .to_string()
1159 .contains("No servers configured")
1160 );
1161 }
1162
1163 #[test]
1164 fn test_proxy_has_router() {
1165 let config = create_test_config();
1166 let proxy = Arc::new(
1167 NntpProxy::new_sync(config, RoutingMode::Stateful).expect("Failed to create proxy"),
1168 );
1169
1170 assert_eq!(proxy.router.backend_count(), 3);
1172 }
1173
1174 #[test]
1175 fn test_builder_basic_usage() {
1176 let config = create_test_config();
1177 let proxy = NntpProxy::builder(config)
1178 .build_sync()
1179 .expect("Failed to build proxy");
1180
1181 assert_eq!(proxy.servers().len(), 3);
1182 assert_eq!(proxy.router.backend_count(), 3);
1183 }
1184
1185 #[test]
1186 fn test_builder_with_routing_mode() {
1187 let config = create_test_config();
1188 let proxy = NntpProxy::builder(config)
1189 .with_routing_mode(RoutingMode::PerCommand)
1190 .build_sync()
1191 .expect("Failed to build proxy");
1192
1193 assert_eq!(proxy.servers().len(), 3);
1194 }
1195
1196 #[test]
1197 fn test_builder_with_custom_buffer_pool() {
1198 let config = create_test_config();
1199 let proxy = NntpProxy::builder(config)
1200 .with_buffer_pool_size(512 * 1024)
1201 .with_buffer_pool_count(64)
1202 .build_sync()
1203 .expect("Failed to build proxy");
1204
1205 assert_eq!(proxy.servers().len(), 3);
1206 }
1208
1209 #[test]
1210 fn test_builder_with_all_options() {
1211 let config = create_test_config();
1212 let proxy = NntpProxy::builder(config)
1213 .with_routing_mode(RoutingMode::Hybrid)
1214 .with_buffer_pool_size(1024 * 1024)
1215 .with_buffer_pool_count(16)
1216 .build_sync()
1217 .expect("Failed to build proxy");
1218
1219 assert_eq!(proxy.servers().len(), 3);
1220 assert_eq!(proxy.router.backend_count(), 3);
1221 }
1222
1223 #[test]
1224 fn test_builder_empty_servers_error() {
1225 let config = Config {
1226 servers: vec![],
1227 ..Default::default()
1228 };
1229 let result = NntpProxy::builder(config).build_sync();
1230
1231 assert!(result.is_err());
1232 assert!(
1233 result
1234 .unwrap_err()
1235 .to_string()
1236 .contains("No servers configured")
1237 );
1238 }
1239
1240 #[test]
1241 fn test_backward_compatibility_new() {
1242 let config = create_test_config();
1244 let proxy = NntpProxy::new_sync(config, RoutingMode::Stateful)
1245 .expect("Failed to create proxy with new_sync()");
1246
1247 assert_eq!(proxy.servers().len(), 3);
1248 assert_eq!(proxy.router.backend_count(), 3);
1249 }
1250
1251 mod error_classification {
1253 use super::*;
1254 use std::io::{Error, ErrorKind};
1255
1256 #[test]
1257 fn test_broken_pipe_is_client_disconnect() {
1258 let io_err = Error::from(ErrorKind::BrokenPipe);
1259 let err = anyhow::Error::from(io_err);
1260 assert!(is_client_disconnect_error(&err));
1261 }
1262
1263 #[test]
1264 fn test_connection_reset_is_client_disconnect() {
1265 let io_err = Error::from(ErrorKind::ConnectionReset);
1266 let err = anyhow::Error::from(io_err);
1267 assert!(is_client_disconnect_error(&err));
1268 }
1269
1270 #[test]
1271 fn test_other_io_errors_not_client_disconnect() {
1272 let error_kinds = vec![
1273 ErrorKind::NotFound,
1274 ErrorKind::PermissionDenied,
1275 ErrorKind::ConnectionRefused,
1276 ErrorKind::ConnectionAborted,
1277 ErrorKind::AddrInUse,
1278 ErrorKind::AddrNotAvailable,
1279 ErrorKind::TimedOut,
1280 ErrorKind::Interrupted,
1281 ErrorKind::UnexpectedEof,
1282 ErrorKind::WouldBlock,
1283 ];
1284
1285 for kind in error_kinds {
1286 let io_err = Error::from(kind);
1287 let err = anyhow::Error::from(io_err);
1288 assert!(
1289 !is_client_disconnect_error(&err),
1290 "{:?} should not be classified as client disconnect",
1291 kind
1292 );
1293 }
1294 }
1295
1296 #[test]
1297 fn test_non_io_error_not_client_disconnect() {
1298 let err = anyhow::anyhow!("generic error message");
1299 assert!(!is_client_disconnect_error(&err));
1300 }
1301
1302 #[test]
1303 fn test_wrapped_broken_pipe_error() {
1304 let io_err = Error::from(ErrorKind::BrokenPipe);
1305 let err = anyhow::Error::from(io_err).context("failed to write to client");
1306 assert!(is_client_disconnect_error(&err));
1307 }
1308
1309 #[test]
1310 fn test_wrapped_connection_reset_error() {
1311 let io_err = Error::from(ErrorKind::ConnectionReset);
1312 let err = anyhow::Error::from(io_err).context("failed to read from client");
1313 assert!(is_client_disconnect_error(&err));
1314 }
1315
1316 #[test]
1317 fn test_deeply_wrapped_error() {
1318 let io_err = Error::from(ErrorKind::BrokenPipe);
1319 let err = anyhow::Error::from(io_err)
1320 .context("inner context")
1321 .context("outer context");
1322 assert!(is_client_disconnect_error(&err));
1323 }
1324
1325 #[test]
1326 fn test_custom_io_error_message() {
1327 let io_err = Error::new(ErrorKind::BrokenPipe, "custom broken pipe");
1328 let err = anyhow::Error::from(io_err);
1329 assert!(is_client_disconnect_error(&err));
1330 }
1331 }
1332
1333 mod helper_methods {
1335 use super::*;
1336 use crate::session::SessionMode;
1337
1338 #[test]
1339 fn test_session_mode_label_per_command() {
1340 let config = create_test_config();
1341 let proxy = NntpProxy::new_sync(config, RoutingMode::PerCommand).unwrap();
1342
1343 let label = proxy.session_mode_label(SessionMode::PerCommand);
1344 assert_eq!(label, "per-command");
1345 }
1346
1347 #[test]
1348 fn test_session_mode_label_stateful_standard() {
1349 let config = create_test_config();
1350 let proxy = NntpProxy::new_sync(config, RoutingMode::Stateful).unwrap();
1351
1352 let label = proxy.session_mode_label(SessionMode::Stateful);
1353 assert_eq!(label, "standard");
1354 }
1355
1356 #[test]
1357 fn test_session_mode_label_stateful_hybrid() {
1358 let config = create_test_config();
1359 let proxy = NntpProxy::new_sync(config, RoutingMode::Hybrid).unwrap();
1360
1361 let label = proxy.session_mode_label(SessionMode::Stateful);
1362 assert_eq!(label, "hybrid");
1363 }
1364
1365 #[test]
1366 fn test_routing_mode_display_name_caching() {
1367 let config = create_test_config();
1368 let proxy = NntpProxy::new_sync(config, RoutingMode::PerCommand).unwrap();
1369
1370 assert_eq!(proxy.routing_mode_display_name(), "per-command");
1372 }
1373
1374 #[test]
1375 fn test_generate_session_id_format() {
1376 let config = create_test_config();
1377 let proxy = NntpProxy::new_sync(config, RoutingMode::Stateful).unwrap();
1378
1379 let session = proxy.create_session(
1380 ClientAddress::from("127.0.0.1:12345".parse::<std::net::SocketAddr>().unwrap()),
1381 None,
1382 );
1383
1384 let session_id = proxy.generate_session_id(&session);
1385
1386 assert_eq!(session_id.len(), 8);
1388 }
1389
1390 #[test]
1391 fn test_create_session_without_router() {
1392 let config = create_test_config();
1393 let proxy = NntpProxy::new_sync(config, RoutingMode::Stateful).unwrap();
1394
1395 let session = proxy.create_session(
1396 ClientAddress::from("127.0.0.1:12345".parse::<std::net::SocketAddr>().unwrap()),
1397 None,
1398 );
1399
1400 assert_eq!(session.mode(), SessionMode::Stateful);
1402 }
1403
/// Creating a session with the proxy's router attached must yield a session
/// whose mode is `SessionMode::PerCommand`.
#[test]
fn test_create_session_with_router() {
    let proxy = NntpProxy::new_sync(create_test_config(), RoutingMode::PerCommand).unwrap();

    let peer: std::net::SocketAddr = "127.0.0.1:12345".parse().unwrap();
    let router = proxy.router.clone();
    let session = proxy.create_session(ClientAddress::from(peer), Some(router));

    assert_eq!(session.mode(), SessionMode::PerCommand);
}
1418
/// Smoke test: `record_connection_if_unauthenticated` is invoked on a freshly
/// created session. There are no assertions, so the test passes as long as
/// the call returns without panicking.
#[test]
fn test_record_connection_if_unauthenticated_no_auth() {
    let proxy = Arc::new(
        NntpProxy::new_sync(create_test_config(), RoutingMode::Stateful).unwrap(),
    );

    let peer: std::net::SocketAddr = "127.0.0.1:12345".parse().unwrap();
    let session = proxy.create_session(ClientAddress::from(peer), None);

    proxy.record_connection_if_unauthenticated(&session);
}
1434
/// Passing an `Ok(TransferMetrics)` together with a backend id to
/// `record_session_metrics` must produce an `Ok` result.
#[test]
fn test_record_session_metrics_success() {
    let proxy = Arc::new(
        NntpProxy::new_sync(create_test_config(), RoutingMode::Stateful).unwrap(),
    );

    let peer: std::net::SocketAddr = "127.0.0.1:12345".parse().unwrap();
    let session = proxy.create_session(ClientAddress::from(peer), None);
    let session_id = proxy.generate_session_id(&session);

    let transferred = TransferMetrics {
        client_to_backend: crate::types::ClientToBackendBytes::new(1024),
        backend_to_client: crate::types::BackendToClientBytes::new(2048),
    };
    let backend = Some(crate::types::BackendId::from_index(0));

    let outcome = proxy.record_session_metrics(
        Ok(transferred),
        ClientAddress::from(peer),
        &session_id,
        &session,
        backend,
    );

    assert!(outcome.is_ok());
}
1461
/// Passing an `Err` to `record_session_metrics` must hand back an error whose
/// message is still `"test error"`.
#[test]
fn test_record_session_metrics_error() {
    let proxy = Arc::new(
        NntpProxy::new_sync(create_test_config(), RoutingMode::Stateful).unwrap(),
    );

    let peer: std::net::SocketAddr = "127.0.0.1:12345".parse().unwrap();
    let session = proxy.create_session(ClientAddress::from(peer), None);
    let session_id = proxy.generate_session_id(&session);
    let backend = Some(crate::types::BackendId::from_index(0));

    let outcome = proxy.record_session_metrics(
        Err(anyhow::anyhow!("test error")),
        ClientAddress::from(peer),
        &session_id,
        &session,
        backend,
    );

    assert!(outcome.is_err());
    let message = outcome.unwrap_err().to_string();
    assert_eq!(message, "test error");
}
1484
1485 #[tokio::test]
1486 async fn test_prepare_per_command_connection() {
1487 let config = create_test_config();
1488 let proxy = Arc::new(
1489 NntpProxy::new(config, RoutingMode::PerCommand)
1490 .await
1491 .unwrap(),
1492 );
1493
1494 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1495 let addr = listener.local_addr().unwrap();
1496
1497 tokio::spawn(async move {
1499 let (stream, _) = listener.accept().await.unwrap();
1500 let mut buf = [0u8; 1024];
1501 let _ = stream.try_read(&mut buf); });
1503
1504 let mut stream = tokio::net::TcpStream::connect(addr).await.unwrap();
1505 let client_addr = ClientAddress::from(stream.peer_addr().unwrap());
1506
1507 let result = proxy
1508 .prepare_per_command_connection(&mut stream, client_addr)
1509 .await;
1510 assert!(result.is_ok());
1511 }
1512
/// A proxy built with `RoutingMode::Hybrid` from `create_test_config()` must
/// report `"per-command"` as its routing-mode display name.
///
/// NOTE(review): the memory cache constructed below is never handed to the
/// proxy, so it does not influence the assertion — confirm whether it was
/// meant to be wired in.
#[test]
fn test_routing_mode_display_name_empty_cache() {
    let proxy = NntpProxy::new_sync(create_test_config(), RoutingMode::Hybrid).unwrap();

    let ttl = std::time::Duration::from_secs(3600);
    let _unused_cache = Arc::new(crate::cache::UnifiedCache::memory(100, ttl, false));

    let display_name = proxy.routing_mode_display_name();
    assert_eq!(display_name, "per-command");
}
1525
/// Smoke test: `log_routing_selection` is called for backend index 0 and the
/// first configured server. There are no assertions, so the test passes as
/// long as the call returns without panicking.
#[test]
fn test_log_routing_selection() {
    let proxy = Arc::new(
        NntpProxy::new_sync(create_test_config(), RoutingMode::Stateful).unwrap(),
    );

    let peer: std::net::SocketAddr = "127.0.0.1:12345".parse().unwrap();
    let client_addr = ClientAddress::from(peer);
    let backend_id = crate::types::BackendId::from_index(0);

    let first_server = &proxy.servers()[0];
    proxy.log_routing_selection(client_addr, backend_id, first_server);
}
1538
/// Smoke test: `log_pool_status(0)` is called on a stateful proxy. There are
/// no assertions, so the test passes as long as the call does not panic.
#[test]
fn test_log_pool_status() {
    let proxy = Arc::new(
        NntpProxy::new_sync(create_test_config(), RoutingMode::Stateful).unwrap(),
    );

    proxy.log_pool_status(0);
}
1547
1548 #[tokio::test]
1549 async fn test_apply_tcp_optimizations() {
1550 let config = create_test_config();
1551 let proxy = Arc::new(NntpProxy::new(config, RoutingMode::Stateful).await.unwrap());
1552
1553 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1554 let addr = listener.local_addr().unwrap();
1555
1556 tokio::spawn(async move {
1557 let (_stream, _) = listener.accept().await.unwrap();
1558 });
1559
1560 let stream = tokio::net::TcpStream::connect(addr).await.unwrap();
1561
1562 proxy.apply_tcp_optimizations(&stream);
1564 }
1565
/// Table-driven check of `session_mode_label` for three routing-mode /
/// session-mode pairings: per-command, stateful ("standard") and hybrid.
#[test]
fn test_session_mode_labels_all_combinations() {
    use crate::session::SessionMode;

    let config = create_test_config();

    // (routing mode of the proxy, session mode queried, expected label)
    let cases = [
        (RoutingMode::PerCommand, SessionMode::PerCommand, "per-command"),
        (RoutingMode::Stateful, SessionMode::Stateful, "standard"),
        (RoutingMode::Hybrid, SessionMode::Stateful, "hybrid"),
    ];

    for (routing_mode, session_mode, expected) in cases {
        let proxy = NntpProxy::new_sync(config.clone(), routing_mode).unwrap();
        assert_eq!(proxy.session_mode_label(session_mode), expected);
    }
}
1586
/// `finalize_stateful_session` must return `Ok` when it is given successful
/// transfer metrics.
#[test]
fn test_finalize_stateful_session_success() {
    let proxy = Arc::new(
        NntpProxy::new_sync(create_test_config(), RoutingMode::Stateful).unwrap(),
    );

    let peer: std::net::SocketAddr = "127.0.0.1:12345".parse().unwrap();
    let client_addr = ClientAddress::from(peer);
    let session = proxy.create_session(client_addr, None);
    let session_id = proxy.generate_session_id(&session);
    let backend_id = crate::types::BackendId::from_index(0);

    let transferred = TransferMetrics {
        client_to_backend: crate::types::ClientToBackendBytes::new(512),
        backend_to_client: crate::types::BackendToClientBytes::new(1024),
    };

    let outcome = proxy.finalize_stateful_session(
        Ok(transferred),
        client_addr,
        &session_id,
        &session,
        backend_id,
    );

    assert!(outcome.is_ok());
}
1613
/// `finalize_stateful_session` must return `Err` when it is given a failed
/// session result.
#[test]
fn test_finalize_stateful_session_error() {
    let proxy = Arc::new(
        NntpProxy::new_sync(create_test_config(), RoutingMode::Stateful).unwrap(),
    );

    let peer: std::net::SocketAddr = "127.0.0.1:12345".parse().unwrap();
    let client_addr = ClientAddress::from(peer);
    let session = proxy.create_session(client_addr, None);
    let session_id = proxy.generate_session_id(&session);
    let backend_id = crate::types::BackendId::from_index(0);

    let failure = Err(anyhow::anyhow!("connection error"));
    let outcome =
        proxy.finalize_stateful_session(failure, client_addr, &session_id, &session, backend_id);

    assert!(outcome.is_err());
}
1635
/// `finalize_per_command_session` must return `Ok` when it is given
/// successful transfer metrics for a router-backed session.
#[test]
fn test_finalize_per_command_session_success() {
    let proxy = Arc::new(
        NntpProxy::new_sync(create_test_config(), RoutingMode::PerCommand).unwrap(),
    );

    let peer: std::net::SocketAddr = "127.0.0.1:12345".parse().unwrap();
    let client_addr = ClientAddress::from(peer);
    let router = proxy.router.clone();
    let session = proxy.create_session(client_addr, Some(router));
    let session_id = proxy.generate_session_id(&session);

    let transferred = TransferMetrics {
        client_to_backend: crate::types::ClientToBackendBytes::new(256),
        backend_to_client: crate::types::BackendToClientBytes::new(512),
    };

    let outcome = proxy.finalize_per_command_session(
        Ok(transferred),
        client_addr,
        &session_id,
        &session,
    );

    assert!(outcome.is_ok());
}
1656
/// `finalize_per_command_session` must return `Err` when it is given a failed
/// session result.
#[test]
fn test_finalize_per_command_session_error() {
    let proxy = Arc::new(
        NntpProxy::new_sync(create_test_config(), RoutingMode::PerCommand).unwrap(),
    );

    let peer: std::net::SocketAddr = "127.0.0.1:12345".parse().unwrap();
    let client_addr = ClientAddress::from(peer);
    let router = proxy.router.clone();
    let session = proxy.create_session(client_addr, Some(router));
    let session_id = proxy.generate_session_id(&session);

    let failure = Err(anyhow::anyhow!("session failed"));
    let outcome =
        proxy.finalize_per_command_session(failure, client_addr, &session_id, &session);

    assert!(outcome.is_err());
}
1676
/// `record_session_metrics` must return `Ok` for successful metrics even when
/// no backend id is supplied (`None`).
#[test]
fn test_record_session_metrics_without_backend() {
    let proxy = Arc::new(
        NntpProxy::new_sync(create_test_config(), RoutingMode::PerCommand).unwrap(),
    );

    let peer: std::net::SocketAddr = "127.0.0.1:12345".parse().unwrap();
    let client_addr = ClientAddress::from(peer);
    let router = proxy.router.clone();
    let session = proxy.create_session(client_addr, Some(router));
    let session_id = proxy.generate_session_id(&session);

    let transferred = TransferMetrics {
        client_to_backend: crate::types::ClientToBackendBytes::new(128),
        backend_to_client: crate::types::BackendToClientBytes::new(256),
    };

    let outcome = proxy.record_session_metrics(
        Ok(transferred),
        client_addr,
        &session_id,
        &session,
        None,
    );

    assert!(outcome.is_ok());
}
1698
/// Two sessions created for the same client address must receive different
/// session ids.
#[test]
fn test_generate_session_id_uniqueness() {
    let proxy = NntpProxy::new_sync(create_test_config(), RoutingMode::Stateful).unwrap();

    let peer: std::net::SocketAddr = "127.0.0.1:12345".parse().unwrap();
    let client_addr = ClientAddress::from(peer);

    let first_session = proxy.create_session(client_addr, None);
    let second_session = proxy.create_session(client_addr, None);

    let first_id = proxy.generate_session_id(&first_session);
    let second_id = proxy.generate_session_id(&second_session);

    assert_ne!(first_id, second_id);
}
1716 }
1717
/// Tests for the proxy's active-client counter, its last-activity timestamp,
/// and `check_and_clear_stale_pools`.
mod idle_tracking {
    use super::*;
    use std::time::Duration;

    /// `NntpProxy::IDLE_TIMEOUT` must equal five minutes.
    #[test]
    fn test_idle_timeout_constant() {
        let five_minutes = Duration::from_secs(5 * 60);
        assert_eq!(NntpProxy::IDLE_TIMEOUT, five_minutes);
    }

    /// The active-client counter starts at zero and moves by exactly one per
    /// increment / decrement call.
    #[test]
    fn test_active_clients_increment_decrement() {
        let proxy = NntpProxy::new_sync(create_test_config(), RoutingMode::Stateful).unwrap();
        let active = || proxy.active_clients.load(Ordering::Relaxed);

        assert_eq!(active(), 0);

        proxy.increment_active_clients();
        assert_eq!(active(), 1);

        proxy.increment_active_clients();
        assert_eq!(active(), 2);

        proxy.decrement_active_clients();
        assert_eq!(active(), 1);

        proxy.decrement_active_clients();
        assert_eq!(active(), 0);
    }

    /// `last_activity_nanos` stays at zero while at least one client remains
    /// and becomes non-zero once the final client is decremented away.
    #[test]
    fn test_last_activity_updated_on_last_client_disconnect() {
        let proxy = NntpProxy::new_sync(create_test_config(), RoutingMode::Stateful).unwrap();
        let last_activity = || proxy.last_activity_nanos.load(Ordering::Relaxed);

        // Fresh proxy: nothing recorded yet.
        assert_eq!(last_activity(), 0);

        proxy.increment_active_clients();
        proxy.increment_active_clients();

        // One of two clients leaves: still untouched.
        proxy.decrement_active_clients();
        assert_eq!(last_activity(), 0);

        // The last client leaves: a timestamp is now recorded.
        proxy.decrement_active_clients();
        assert!(last_activity() > 0);
    }

    /// With a non-zero last-activity value stored and one client active,
    /// `check_and_clear_stale_pools` must return `false`.
    #[test]
    fn test_check_and_clear_skips_when_clients_active() {
        let proxy = NntpProxy::new_sync(create_test_config(), RoutingMode::Stateful).unwrap();

        proxy.last_activity_nanos.store(1, Ordering::Relaxed);
        proxy.increment_active_clients();

        assert!(!proxy.check_and_clear_stale_pools());
    }

    /// On a freshly built proxy `check_and_clear_stale_pools` must return
    /// `false`.
    #[test]
    fn test_check_and_clear_skips_when_never_active() {
        let proxy = NntpProxy::new_sync(create_test_config(), RoutingMode::Stateful).unwrap();

        assert!(!proxy.check_and_clear_stale_pools());
    }

    /// Immediately after a client has connected and disconnected,
    /// `check_and_clear_stale_pools` must return `false`.
    #[test]
    fn test_check_and_clear_skips_when_recently_active() {
        let proxy = NntpProxy::new_sync(create_test_config(), RoutingMode::Stateful).unwrap();

        proxy.increment_active_clients();
        proxy.decrement_active_clients();

        assert!(!proxy.check_and_clear_stale_pools());
    }
}
1807}