nntp_proxy/
runtime.rs

1//! Tokio runtime configuration and builder
2//!
3//! This module provides testable runtime configuration and builder logic,
4//! extracted from the binary for better separation of concerns.
5
6use crate::types::ThreadCount;
7use anyhow::Result;
8
9/// Runtime configuration
10#[derive(Debug, Clone)]
11pub struct RuntimeConfig {
12    /// Number of worker threads
13    worker_threads: usize,
14    /// Whether to enable CPU pinning (Linux only)
15    enable_cpu_pinning: bool,
16}
17
18impl RuntimeConfig {
19    /// Create runtime config from optional thread count
20    ///
21    /// If `threads` is None, defaults to 1 thread.
22    /// If `threads` is Some(ThreadCount(0)), uses number of CPU cores.
23    /// Single-threaded runtime is used if threads == 1.
24    #[must_use]
25    pub fn from_args(threads: Option<ThreadCount>) -> Self {
26        let worker_threads = threads.map(|t| t.get()).unwrap_or(1);
27
28        Self {
29            worker_threads,
30            enable_cpu_pinning: true,
31        }
32    }
33
34    /// Disable CPU pinning
35    #[must_use]
36    pub fn without_cpu_pinning(mut self) -> Self {
37        self.enable_cpu_pinning = false;
38        self
39    }
40
41    /// Get number of worker threads
42    #[must_use]
43    pub const fn worker_threads(&self) -> usize {
44        self.worker_threads
45    }
46
47    /// Check if single-threaded
48    #[must_use]
49    pub const fn is_single_threaded(&self) -> bool {
50        self.worker_threads == 1
51    }
52
53    /// Build the tokio runtime
54    ///
55    /// Creates either a current-thread or multi-threaded runtime based on
56    /// the configured worker thread count. Applies CPU pinning if enabled.
57    ///
58    /// # Errors
59    /// Returns error if runtime creation fails or CPU pinning fails
60    pub fn build_runtime(self) -> Result<tokio::runtime::Runtime> {
61        let rt = if self.is_single_threaded() {
62            tracing::info!("Starting NNTP proxy with single-threaded runtime");
63            tokio::runtime::Builder::new_current_thread()
64                .enable_all()
65                .build()?
66        } else {
67            let num_cpus = std::thread::available_parallelism()
68                .map(|p| p.get())
69                .unwrap_or(1);
70            tracing::info!(
71                "Starting NNTP proxy with {} worker threads (detected {} CPUs)",
72                self.worker_threads,
73                num_cpus
74            );
75            tokio::runtime::Builder::new_multi_thread()
76                .worker_threads(self.worker_threads)
77                .enable_all()
78                .build()?
79        };
80
81        if self.enable_cpu_pinning {
82            pin_to_cpu_cores(self.worker_threads)?;
83        }
84
85        Ok(rt)
86    }
87}
88
89impl Default for RuntimeConfig {
90    fn default() -> Self {
91        Self::from_args(None)
92    }
93}
94
95/// Pin current process to specific CPU cores for optimal performance
96///
97/// This is a best-effort operation - failures are logged but not fatal.
98///
99/// # Arguments
100/// * `num_cores` - Number of CPU cores to pin to (0..num_cores)
101#[cfg(target_os = "linux")]
102fn pin_to_cpu_cores(num_cores: usize) -> Result<()> {
103    use nix::sched::{CpuSet, sched_setaffinity};
104    use nix::unistd::Pid;
105
106    let mut cpu_set = CpuSet::new();
107    for core in 0..num_cores {
108        let _ = cpu_set.set(core);
109    }
110
111    match sched_setaffinity(Pid::from_raw(0), &cpu_set) {
112        Ok(()) => {
113            tracing::info!(
114                "Successfully pinned process to {} CPU cores for optimal performance",
115                num_cores
116            );
117            Ok(())
118        }
119        Err(e) => {
120            tracing::warn!(
121                "Failed to set CPU affinity: {}, continuing without pinning",
122                e
123            );
124            Ok(()) // Non-fatal
125        }
126    }
127}
128
129#[cfg(not(target_os = "linux"))]
130fn pin_to_cpu_cores(_num_cores: usize) -> Result<()> {
131    tracing::info!("CPU pinning not available on this platform");
132    Ok(())
133}
134
135/// Wait for shutdown signal (Ctrl+C or SIGTERM on Unix)
136///
137/// This is a common utility for all binary targets.
138pub async fn shutdown_signal() {
139    let ctrl_c = async {
140        tokio::signal::ctrl_c()
141            .await
142            .expect("Failed to install Ctrl+C handler");
143    };
144
145    #[cfg(unix)]
146    let terminate = async {
147        tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
148            .expect("Failed to install signal handler")
149            .recv()
150            .await;
151    };
152
153    #[cfg(not(unix))]
154    let terminate = std::future::pending::<()>();
155
156    tokio::select! {
157        _ = ctrl_c => {},
158        _ = terminate => {},
159    }
160}
161
162#[cfg(test)]
163mod tests {
164    use super::*;
165
166    #[test]
167    fn test_runtime_config_from_args_default() {
168        let config = RuntimeConfig::from_args(None);
169
170        // Should default to single-threaded (1 thread)
171        assert_eq!(config.worker_threads(), 1);
172        assert!(config.is_single_threaded());
173        assert!(config.enable_cpu_pinning); // CPU pinning helps even with 1 thread
174    }
175
176    #[test]
177    fn test_runtime_config_from_args_explicit() {
178        let thread_count = ThreadCount::new(4).unwrap();
179        let config = RuntimeConfig::from_args(Some(thread_count));
180
181        assert_eq!(config.worker_threads(), 4);
182        assert!(!config.is_single_threaded());
183    }
184
185    #[test]
186    fn test_runtime_config_single_threaded() {
187        let thread_count = ThreadCount::new(1).unwrap();
188        let config = RuntimeConfig::from_args(Some(thread_count));
189
190        assert_eq!(config.worker_threads(), 1);
191        assert!(config.is_single_threaded());
192    }
193
194    #[test]
195    fn test_runtime_config_multi_threaded() {
196        let thread_count = ThreadCount::new(8).unwrap();
197        let config = RuntimeConfig::from_args(Some(thread_count));
198
199        assert_eq!(config.worker_threads(), 8);
200        assert!(config.enable_cpu_pinning);
201    }
202
203    #[test]
204    fn test_runtime_config_without_cpu_pinning() {
205        let thread_count = ThreadCount::new(4).unwrap();
206        let config = RuntimeConfig::from_args(Some(thread_count)).without_cpu_pinning();
207
208        assert_eq!(config.worker_threads(), 4);
209        assert!(!config.enable_cpu_pinning);
210    }
211
212    #[test]
213    fn test_runtime_config_default() {
214        let config = RuntimeConfig::default();
215
216        // Should match from_args(None)
217        let expected = RuntimeConfig::from_args(None);
218        assert_eq!(config.worker_threads(), expected.worker_threads());
219    }
220
221    #[test]
222    fn test_pin_to_cpu_cores_non_fatal() {
223        // Should not panic even if pinning fails
224        let result = pin_to_cpu_cores(1);
225        assert!(result.is_ok());
226    }
227
228    // Edge case tests
229
230    #[test]
231    fn test_runtime_config_zero_threads_auto() {
232        // ThreadCount(0) means auto-detect CPUs
233        // We can't test ThreadCount::new(0) directly as it returns None,
234        // but we can test the from_args behavior
235        let config = RuntimeConfig::from_args(None);
236        assert_eq!(config.worker_threads(), 1); // Defaults to 1
237    }
238
239    #[test]
240    fn test_runtime_config_large_thread_count() {
241        let thread_count = ThreadCount::new(128).unwrap();
242        let config = RuntimeConfig::from_args(Some(thread_count));
243
244        assert_eq!(config.worker_threads(), 128);
245        assert!(!config.is_single_threaded());
246    }
247
248    #[test]
249    fn test_runtime_config_clone() {
250        let config = RuntimeConfig::from_args(Some(ThreadCount::new(4).unwrap()));
251        let cloned = config.clone();
252
253        assert_eq!(cloned.worker_threads(), config.worker_threads());
254        assert_eq!(cloned.enable_cpu_pinning, config.enable_cpu_pinning);
255    }
256
257    #[test]
258    fn test_runtime_config_debug() {
259        let config = RuntimeConfig::from_args(Some(ThreadCount::new(2).unwrap()));
260        let debug_str = format!("{:?}", config);
261
262        assert!(debug_str.contains("RuntimeConfig"));
263        assert!(debug_str.contains("worker_threads"));
264        assert!(debug_str.contains("enable_cpu_pinning"));
265    }
266
267    // Builder pattern tests
268
269    #[test]
270    fn test_runtime_config_builder_chaining() {
271        let config =
272            RuntimeConfig::from_args(Some(ThreadCount::new(4).unwrap())).without_cpu_pinning();
273
274        assert_eq!(config.worker_threads(), 4);
275        assert!(!config.enable_cpu_pinning);
276    }
277
278    #[test]
279    fn test_runtime_config_builder_multiple_calls() {
280        let config = RuntimeConfig::from_args(Some(ThreadCount::new(8).unwrap()))
281            .without_cpu_pinning()
282            .without_cpu_pinning(); // Idempotent
283
284        assert!(!config.enable_cpu_pinning);
285    }
286
287    // Getter tests
288
289    #[test]
290    fn test_worker_threads_getter() {
291        let config = RuntimeConfig::from_args(Some(ThreadCount::new(6).unwrap()));
292        assert_eq!(config.worker_threads(), 6);
293    }
294
295    #[test]
296    fn test_is_single_threaded_true() {
297        let config = RuntimeConfig::from_args(Some(ThreadCount::new(1).unwrap()));
298        assert!(config.is_single_threaded());
299    }
300
301    #[test]
302    fn test_is_single_threaded_false() {
303        let config = RuntimeConfig::from_args(Some(ThreadCount::new(2).unwrap()));
304        assert!(!config.is_single_threaded());
305    }
306
307    #[test]
308    fn test_is_single_threaded_false_for_large() {
309        let config = RuntimeConfig::from_args(Some(ThreadCount::new(100).unwrap()));
310        assert!(!config.is_single_threaded());
311    }
312
313    // CPU pinning platform-specific tests
314
315    #[test]
316    #[cfg(target_os = "linux")]
317    fn test_pin_to_cpu_cores_linux_single_core() {
318        let result = pin_to_cpu_cores(1);
319        assert!(result.is_ok());
320    }
321
322    #[test]
323    #[cfg(target_os = "linux")]
324    fn test_pin_to_cpu_cores_linux_multi_core() {
325        let result = pin_to_cpu_cores(4);
326        assert!(result.is_ok());
327    }
328
329    #[test]
330    #[cfg(target_os = "linux")]
331    fn test_pin_to_cpu_cores_linux_zero_cores() {
332        // Edge case: 0 cores should still succeed (no-op)
333        let result = pin_to_cpu_cores(0);
334        assert!(result.is_ok());
335    }
336
337    #[test]
338    #[cfg(target_os = "linux")]
339    fn test_pin_to_cpu_cores_linux_many_cores() {
340        // Try to pin to more cores than available - should not panic
341        let result = pin_to_cpu_cores(1024);
342        assert!(result.is_ok()); // Best-effort, won't fail
343    }
344
345    #[test]
346    #[cfg(not(target_os = "linux"))]
347    fn test_pin_to_cpu_cores_non_linux() {
348        // Non-Linux platforms should gracefully no-op
349        let result = pin_to_cpu_cores(4);
350        assert!(result.is_ok());
351    }
352
353    // Default implementation tests
354
355    #[test]
356    fn test_default_matches_from_args_none() {
357        let default_config = RuntimeConfig::default();
358        let explicit_config = RuntimeConfig::from_args(None);
359
360        assert_eq!(
361            default_config.worker_threads(),
362            explicit_config.worker_threads()
363        );
364        assert_eq!(
365            default_config.enable_cpu_pinning,
366            explicit_config.enable_cpu_pinning
367        );
368    }
369
370    #[test]
371    fn test_default_is_single_threaded() {
372        let config = RuntimeConfig::default();
373        assert!(config.is_single_threaded());
374    }
375
376    #[test]
377    fn test_default_has_cpu_pinning_enabled() {
378        let config = RuntimeConfig::default();
379        assert!(config.enable_cpu_pinning);
380    }
381
382    // Configuration combination tests
383
384    #[test]
385    fn test_config_single_threaded_with_pinning() {
386        let config = RuntimeConfig::from_args(Some(ThreadCount::new(1).unwrap()));
387        assert!(config.is_single_threaded());
388        assert!(config.enable_cpu_pinning);
389    }
390
391    #[test]
392    fn test_config_single_threaded_without_pinning() {
393        let config =
394            RuntimeConfig::from_args(Some(ThreadCount::new(1).unwrap())).without_cpu_pinning();
395        assert!(config.is_single_threaded());
396        assert!(!config.enable_cpu_pinning);
397    }
398
399    #[test]
400    fn test_config_multi_threaded_with_pinning() {
401        let config = RuntimeConfig::from_args(Some(ThreadCount::new(4).unwrap()));
402        assert!(!config.is_single_threaded());
403        assert!(config.enable_cpu_pinning);
404    }
405
406    #[test]
407    fn test_config_multi_threaded_without_pinning() {
408        let config =
409            RuntimeConfig::from_args(Some(ThreadCount::new(4).unwrap())).without_cpu_pinning();
410        assert!(!config.is_single_threaded());
411        assert!(!config.enable_cpu_pinning);
412    }
413}