Skip to main content

nntp_proxy/
runtime.rs

1//! Tokio runtime configuration and common utilities for binary targets
2//!
3//! This module provides:
4//! - Testable runtime configuration and builder logic
5//! - Shared utilities used across multiple binary targets to reduce duplication
6//! - Shutdown signal handling
7
8use crate::types::ThreadCount;
9use anyhow::Result;
10
11/// Runtime configuration
12#[derive(Debug, Clone)]
13pub struct RuntimeConfig {
14    /// Number of worker threads
15    worker_threads: usize,
16    /// Whether to enable CPU pinning (Linux only)
17    enable_cpu_pinning: bool,
18}
19
20impl RuntimeConfig {
21    /// Create runtime config from optional thread count
22    ///
23    /// If `threads` is None, defaults to 1 thread.
24    /// If `threads` is Some(ThreadCount(0)), uses number of CPU cores.
25    /// Single-threaded runtime is used if threads == 1.
26    #[must_use]
27    pub fn from_args(threads: Option<ThreadCount>) -> Self {
28        let worker_threads = threads.map(|t| t.get()).unwrap_or(1);
29
30        Self {
31            worker_threads,
32            enable_cpu_pinning: true,
33        }
34    }
35
36    /// Disable CPU pinning
37    #[must_use]
38    pub fn without_cpu_pinning(mut self) -> Self {
39        self.enable_cpu_pinning = false;
40        self
41    }
42
43    /// Get number of worker threads
44    #[must_use]
45    pub const fn worker_threads(&self) -> usize {
46        self.worker_threads
47    }
48
49    /// Check if single-threaded
50    #[must_use]
51    pub const fn is_single_threaded(&self) -> bool {
52        self.worker_threads == 1
53    }
54
55    /// Build the tokio runtime
56    ///
57    /// Creates either a current-thread or multi-threaded runtime based on
58    /// the configured worker thread count. Applies CPU pinning if enabled.
59    ///
60    /// # Errors
61    /// Returns error if runtime creation fails or CPU pinning fails
62    pub fn build_runtime(self) -> Result<tokio::runtime::Runtime> {
63        let rt = if self.is_single_threaded() {
64            tracing::info!("Starting NNTP proxy with single-threaded runtime");
65            tokio::runtime::Builder::new_current_thread()
66                .enable_all()
67                .build()?
68        } else {
69            let num_cpus = std::thread::available_parallelism()
70                .map(|p| p.get())
71                .unwrap_or(1);
72            tracing::info!(
73                "Starting NNTP proxy with {} worker threads (detected {} CPUs)",
74                self.worker_threads,
75                num_cpus
76            );
77            tokio::runtime::Builder::new_multi_thread()
78                .worker_threads(self.worker_threads)
79                .enable_all()
80                .build()?
81        };
82
83        if self.enable_cpu_pinning {
84            pin_to_cpu_cores(self.worker_threads)?;
85        }
86
87        Ok(rt)
88    }
89}
90
91impl Default for RuntimeConfig {
92    fn default() -> Self {
93        Self::from_args(None)
94    }
95}
96
97/// Pin current process to specific CPU cores for optimal performance
98///
99/// This is a best-effort operation - failures are logged but not fatal.
100///
101/// # Arguments
102/// * `num_cores` - Number of CPU cores to pin to (0..num_cores)
103#[cfg(target_os = "linux")]
104fn pin_to_cpu_cores(num_cores: usize) -> Result<()> {
105    use nix::sched::{CpuSet, sched_setaffinity};
106    use nix::unistd::Pid;
107
108    let mut cpu_set = CpuSet::new();
109    for core in 0..num_cores {
110        let _ = cpu_set.set(core);
111    }
112
113    match sched_setaffinity(Pid::from_raw(0), &cpu_set) {
114        Ok(()) => {
115            tracing::info!(
116                "Successfully pinned process to {} CPU cores for optimal performance",
117                num_cores
118            );
119            Ok(())
120        }
121        Err(e) => {
122            tracing::warn!(
123                "Failed to set CPU affinity: {}, continuing without pinning",
124                e
125            );
126            Ok(()) // Non-fatal
127        }
128    }
129}
130
131#[cfg(not(target_os = "linux"))]
132fn pin_to_cpu_cores(_num_cores: usize) -> Result<()> {
133    tracing::info!("CPU pinning not available on this platform");
134    Ok(())
135}
136
137/// Wait for shutdown signal (Ctrl+C or SIGTERM on Unix)
138///
139/// This is a common utility for all binary targets.
140pub async fn shutdown_signal() {
141    let ctrl_c = async {
142        tokio::signal::ctrl_c()
143            .await
144            .expect("Failed to install Ctrl+C handler");
145    };
146
147    #[cfg(unix)]
148    let terminate = async {
149        tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
150            .expect("Failed to install signal handler")
151            .recv()
152            .await;
153    };
154
155    #[cfg(not(unix))]
156    let terminate = std::future::pending::<()>();
157
158    tokio::select! {
159        _ = ctrl_c => {},
160        _ = terminate => {},
161    }
162}
163
164// ============================================================================
165// Binary Utilities - Shared code across nntp-proxy, nntp-proxy-tui, etc.
166// ============================================================================
167
168/// Load configuration and log server information
169///
170/// Common pattern across all binary targets - load config and display backends.
171///
172/// # Errors
173/// Returns error if configuration loading fails
174pub fn load_and_log_config(
175    config_path: &str,
176) -> Result<(crate::config::Config, crate::config::ConfigSource)> {
177    use crate::load_config_with_fallback;
178    use tracing::info;
179
180    let (config, source) = load_config_with_fallback(config_path)?;
181
182    info!("Loaded configuration from {}", source.description());
183    info!("Loaded {} backend servers:", config.servers.len());
184    for server in &config.servers {
185        info!("  - {} ({}:{})", server.name, server.host, server.port);
186    }
187
188    Ok((config, source))
189}
190
191/// Extract listen address from CLI args or config
192///
193/// Prefers CLI args over config values.
194#[must_use]
195pub fn resolve_listen_address(
196    host_arg: Option<&str>,
197    port_arg: Option<crate::types::Port>,
198    config: &crate::config::Config,
199) -> (String, crate::types::Port) {
200    let host = host_arg
201        .map(String::from)
202        .unwrap_or_else(|| config.proxy.host.clone());
203    let port = port_arg.unwrap_or(config.proxy.port);
204    (host, port)
205}
206
207/// Bind TCP listener and log startup information
208///
209/// # Errors
210/// Returns error if binding fails
211pub async fn bind_listener(
212    host: &str,
213    port: crate::types::Port,
214    routing_mode: crate::RoutingMode,
215) -> Result<tokio::net::TcpListener> {
216    use tracing::info;
217
218    let listen_addr = format!("{}:{}", host, port.get());
219    let listener = tokio::net::TcpListener::bind(&listen_addr).await?;
220
221    info!("NNTP proxy listening on {} ({})", listen_addr, routing_mode);
222
223    Ok(listener)
224}
225
226/// Spawn background task to prewarm connection pools
227///
228/// Returns immediately without blocking. Logs errors but doesn't fail.
229pub fn spawn_connection_prewarming(proxy: &std::sync::Arc<crate::NntpProxy>) {
230    use std::sync::Arc;
231    use tracing::{info, warn};
232
233    let proxy = Arc::clone(proxy);
234    tokio::spawn(async move {
235        info!("Prewarming connection pools...");
236        if let Err(e) = proxy.prewarm_connections().await {
237            warn!("Failed to prewarm connection pools: {}", e);
238            return;
239        }
240        info!("Connection pools ready");
241    });
242}
243
244/// Spawn background task to periodically flush connection stats
245///
246/// Flushes every 30 seconds to ensure metrics are up-to-date.
247pub fn spawn_stats_flusher(stats: &crate::metrics::ConnectionStatsAggregator) {
248    let stats = stats.clone();
249    tokio::spawn(async move {
250        let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(30));
251        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
252
253        loop {
254            interval.tick().await;
255            stats.flush();
256        }
257    });
258}
259
260/// Spawn background task to periodically log cache statistics
261///
262/// Only spawns if cache is enabled AND debug logging is enabled.
263/// Logs every 60 seconds.
264pub fn spawn_cache_stats_logger(proxy: &std::sync::Arc<crate::NntpProxy>) {
265    use std::sync::Arc;
266    use tracing::debug;
267
268    // Only spawn if debug logging is enabled
269    if !tracing::enabled!(tracing::Level::DEBUG) {
270        return;
271    }
272
273    // Cache is always present now
274    let cache = Arc::clone(proxy.cache());
275    tokio::spawn(async move {
276        let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(60));
277        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
278
279        loop {
280            interval.tick().await;
281            let entries = cache.entry_count();
282            let size_bytes = cache.weighted_size();
283            let hit_rate = cache.hit_rate();
284            debug!(
285                "Cache stats: entries={}, size={} ({:.1}% hit rate)",
286                entries,
287                crate::formatting::format_bytes(size_bytes),
288                hit_rate
289            );
290        }
291    });
292}
293
294/// Spawn graceful shutdown handler
295///
296/// Waits for shutdown signal, then:
297/// 1. Sends shutdown notification via channel
298/// 2. Calls graceful_shutdown() on proxy
299///
300/// Returns the shutdown receiver channel.
301#[must_use]
302pub fn spawn_shutdown_handler(
303    proxy: &std::sync::Arc<crate::NntpProxy>,
304) -> tokio::sync::mpsc::Receiver<()> {
305    use std::sync::Arc;
306    use tracing::info;
307
308    let (shutdown_tx, shutdown_rx) = tokio::sync::mpsc::channel::<()>(1);
309    let proxy = Arc::clone(proxy);
310
311    tokio::spawn(async move {
312        shutdown_signal().await;
313        info!("Shutdown signal received");
314
315        // Notify listeners
316        let _ = shutdown_tx.send(()).await;
317
318        // Close idle connections
319        proxy.graceful_shutdown().await;
320        info!("Graceful shutdown complete");
321    });
322
323    shutdown_rx
324}
325
326/// Run the main accept loop for client connections
327///
328/// Accepts connections and spawns a task for each based on routing mode.
329/// Exits when shutdown signal is received.
330///
331/// # Errors
332/// Returns error if listener.accept() fails
333pub async fn run_accept_loop(
334    proxy: std::sync::Arc<crate::NntpProxy>,
335    listener: tokio::net::TcpListener,
336    mut shutdown_rx: tokio::sync::mpsc::Receiver<()>,
337    routing_mode: crate::RoutingMode,
338) -> Result<()> {
339    use tracing::{error, info};
340
341    let uses_per_command = routing_mode.supports_per_command_routing();
342
343    loop {
344        tokio::select! {
345            _ = shutdown_rx.recv() => {
346                info!("Shutdown initiated, stopping accept loop");
347                break;
348            }
349
350            accept_result = listener.accept() => {
351                let (stream, addr) = accept_result?;
352                let proxy = proxy.clone();
353
354                tokio::spawn(async move {
355                    let result = if uses_per_command {
356                        proxy.handle_client_per_command_routing(stream, addr.into()).await
357                    } else {
358                        proxy.handle_client(stream, addr.into()).await
359                    };
360
361                    if let Err(e) = result {
362                        // Only log non-client-disconnect errors (avoid duplicate logging)
363                        // Client disconnects are already handled gracefully in session handlers
364                        if !crate::is_client_disconnect_error(&e) {
365                            error!("Error handling client {}: {:?}", addr, e);
366                        }
367                    }
368                });
369            }
370        }
371    }
372
373    proxy.graceful_shutdown().await;
374    info!("Proxy shutdown complete");
375
376    Ok(())
377}
378
379#[cfg(test)]
380mod tests {
381    use super::*;
382
383    #[test]
384    fn test_runtime_config_from_args_default() {
385        let config = RuntimeConfig::from_args(None);
386
387        // Should default to single-threaded (1 thread)
388        assert_eq!(config.worker_threads(), 1);
389        assert!(config.is_single_threaded());
390        assert!(config.enable_cpu_pinning); // CPU pinning helps even with 1 thread
391    }
392
393    #[test]
394    fn test_runtime_config_from_args_explicit() {
395        let thread_count = ThreadCount::new(4).unwrap();
396        let config = RuntimeConfig::from_args(Some(thread_count));
397
398        assert_eq!(config.worker_threads(), 4);
399        assert!(!config.is_single_threaded());
400    }
401
402    #[test]
403    fn test_runtime_config_single_threaded() {
404        let thread_count = ThreadCount::new(1).unwrap();
405        let config = RuntimeConfig::from_args(Some(thread_count));
406
407        assert_eq!(config.worker_threads(), 1);
408        assert!(config.is_single_threaded());
409    }
410
411    #[test]
412    fn test_runtime_config_multi_threaded() {
413        let thread_count = ThreadCount::new(8).unwrap();
414        let config = RuntimeConfig::from_args(Some(thread_count));
415
416        assert_eq!(config.worker_threads(), 8);
417        assert!(config.enable_cpu_pinning);
418    }
419
420    #[test]
421    fn test_runtime_config_without_cpu_pinning() {
422        let thread_count = ThreadCount::new(4).unwrap();
423        let config = RuntimeConfig::from_args(Some(thread_count)).without_cpu_pinning();
424
425        assert_eq!(config.worker_threads(), 4);
426        assert!(!config.enable_cpu_pinning);
427    }
428
429    #[test]
430    fn test_runtime_config_default() {
431        let config = RuntimeConfig::default();
432
433        // Should match from_args(None)
434        let expected = RuntimeConfig::from_args(None);
435        assert_eq!(config.worker_threads(), expected.worker_threads());
436    }
437
438    #[test]
439    fn test_pin_to_cpu_cores_non_fatal() {
440        // Should not panic even if pinning fails
441        let result = pin_to_cpu_cores(1);
442        assert!(result.is_ok());
443    }
444
445    // Edge case tests
446
447    #[test]
448    fn test_runtime_config_zero_threads_auto() {
449        // ThreadCount(0) means auto-detect CPUs
450        // We can't test ThreadCount::new(0) directly as it returns None,
451        // but we can test the from_args behavior
452        let config = RuntimeConfig::from_args(None);
453        assert_eq!(config.worker_threads(), 1); // Defaults to 1
454    }
455
456    #[test]
457    fn test_runtime_config_large_thread_count() {
458        let thread_count = ThreadCount::new(128).unwrap();
459        let config = RuntimeConfig::from_args(Some(thread_count));
460
461        assert_eq!(config.worker_threads(), 128);
462        assert!(!config.is_single_threaded());
463    }
464
465    #[test]
466    fn test_runtime_config_clone() {
467        let config = RuntimeConfig::from_args(Some(ThreadCount::new(4).unwrap()));
468        let cloned = config.clone();
469
470        assert_eq!(cloned.worker_threads(), config.worker_threads());
471        assert_eq!(cloned.enable_cpu_pinning, config.enable_cpu_pinning);
472    }
473
474    #[test]
475    fn test_runtime_config_debug() {
476        let config = RuntimeConfig::from_args(Some(ThreadCount::new(2).unwrap()));
477        let debug_str = format!("{:?}", config);
478
479        assert!(debug_str.contains("RuntimeConfig"));
480        assert!(debug_str.contains("worker_threads"));
481        assert!(debug_str.contains("enable_cpu_pinning"));
482    }
483
484    // Builder pattern tests
485
486    #[test]
487    fn test_runtime_config_builder_chaining() {
488        let config =
489            RuntimeConfig::from_args(Some(ThreadCount::new(4).unwrap())).without_cpu_pinning();
490
491        assert_eq!(config.worker_threads(), 4);
492        assert!(!config.enable_cpu_pinning);
493    }
494
495    #[test]
496    fn test_runtime_config_builder_multiple_calls() {
497        let config = RuntimeConfig::from_args(Some(ThreadCount::new(8).unwrap()))
498            .without_cpu_pinning()
499            .without_cpu_pinning(); // Idempotent
500
501        assert!(!config.enable_cpu_pinning);
502    }
503
504    // Getter tests
505
506    #[test]
507    fn test_worker_threads_getter() {
508        let config = RuntimeConfig::from_args(Some(ThreadCount::new(6).unwrap()));
509        assert_eq!(config.worker_threads(), 6);
510    }
511
512    #[test]
513    fn test_is_single_threaded_true() {
514        let config = RuntimeConfig::from_args(Some(ThreadCount::new(1).unwrap()));
515        assert!(config.is_single_threaded());
516    }
517
518    #[test]
519    fn test_is_single_threaded_false() {
520        let config = RuntimeConfig::from_args(Some(ThreadCount::new(2).unwrap()));
521        assert!(!config.is_single_threaded());
522    }
523
524    #[test]
525    fn test_is_single_threaded_false_for_large() {
526        let config = RuntimeConfig::from_args(Some(ThreadCount::new(100).unwrap()));
527        assert!(!config.is_single_threaded());
528    }
529
530    // CPU pinning platform-specific tests
531
532    #[test]
533    #[cfg(target_os = "linux")]
534    fn test_pin_to_cpu_cores_linux_single_core() {
535        let result = pin_to_cpu_cores(1);
536        assert!(result.is_ok());
537    }
538
539    #[test]
540    #[cfg(target_os = "linux")]
541    fn test_pin_to_cpu_cores_linux_multi_core() {
542        let result = pin_to_cpu_cores(4);
543        assert!(result.is_ok());
544    }
545
546    #[test]
547    #[cfg(target_os = "linux")]
548    fn test_pin_to_cpu_cores_linux_zero_cores() {
549        // Edge case: 0 cores should still succeed (no-op)
550        let result = pin_to_cpu_cores(0);
551        assert!(result.is_ok());
552    }
553
554    #[test]
555    #[cfg(target_os = "linux")]
556    fn test_pin_to_cpu_cores_linux_many_cores() {
557        // Try to pin to more cores than available - should not panic
558        let result = pin_to_cpu_cores(1024);
559        assert!(result.is_ok()); // Best-effort, won't fail
560    }
561
562    #[test]
563    #[cfg(not(target_os = "linux"))]
564    fn test_pin_to_cpu_cores_non_linux() {
565        // Non-Linux platforms should gracefully no-op
566        let result = pin_to_cpu_cores(4);
567        assert!(result.is_ok());
568    }
569
570    // Default implementation tests
571
572    #[test]
573    fn test_default_matches_from_args_none() {
574        let default_config = RuntimeConfig::default();
575        let explicit_config = RuntimeConfig::from_args(None);
576
577        assert_eq!(
578            default_config.worker_threads(),
579            explicit_config.worker_threads()
580        );
581        assert_eq!(
582            default_config.enable_cpu_pinning,
583            explicit_config.enable_cpu_pinning
584        );
585    }
586
587    #[test]
588    fn test_default_is_single_threaded() {
589        let config = RuntimeConfig::default();
590        assert!(config.is_single_threaded());
591    }
592
593    #[test]
594    fn test_default_has_cpu_pinning_enabled() {
595        let config = RuntimeConfig::default();
596        assert!(config.enable_cpu_pinning);
597    }
598
599    // Configuration combination tests
600
601    #[test]
602    fn test_config_single_threaded_with_pinning() {
603        let config = RuntimeConfig::from_args(Some(ThreadCount::new(1).unwrap()));
604        assert!(config.is_single_threaded());
605        assert!(config.enable_cpu_pinning);
606    }
607
608    #[test]
609    fn test_config_single_threaded_without_pinning() {
610        let config =
611            RuntimeConfig::from_args(Some(ThreadCount::new(1).unwrap())).without_cpu_pinning();
612        assert!(config.is_single_threaded());
613        assert!(!config.enable_cpu_pinning);
614    }
615
616    #[test]
617    fn test_config_multi_threaded_with_pinning() {
618        let config = RuntimeConfig::from_args(Some(ThreadCount::new(4).unwrap()));
619        assert!(!config.is_single_threaded());
620        assert!(config.enable_cpu_pinning);
621    }
622
623    #[test]
624    fn test_config_multi_threaded_without_pinning() {
625        let config =
626            RuntimeConfig::from_args(Some(ThreadCount::new(4).unwrap())).without_cpu_pinning();
627        assert!(!config.is_single_threaded());
628        assert!(!config.enable_cpu_pinning);
629    }
630}