1use crate::types::ThreadCount;
9use anyhow::Result;
10
/// Settings that decide how the proxy's Tokio runtime is constructed.
///
/// Values are produced by [`RuntimeConfig::from_args`] (or [`Default`]) and
/// consumed by [`RuntimeConfig::build_runtime`].
#[derive(Clone, Debug)]
pub struct RuntimeConfig {
    /// Number of Tokio worker threads; exactly `1` selects the
    /// current-thread runtime in `build_runtime`.
    worker_threads: usize,
    /// Whether `build_runtime` attempts to set CPU affinity.
    enable_cpu_pinning: bool,
}
19
20impl RuntimeConfig {
21 #[must_use]
27 pub fn from_args(threads: Option<ThreadCount>) -> Self {
28 let worker_threads = threads.map(|t| t.get()).unwrap_or(1);
29
30 Self {
31 worker_threads,
32 enable_cpu_pinning: true,
33 }
34 }
35
36 #[must_use]
38 pub fn without_cpu_pinning(mut self) -> Self {
39 self.enable_cpu_pinning = false;
40 self
41 }
42
43 #[must_use]
45 pub const fn worker_threads(&self) -> usize {
46 self.worker_threads
47 }
48
49 #[must_use]
51 pub const fn is_single_threaded(&self) -> bool {
52 self.worker_threads == 1
53 }
54
55 pub fn build_runtime(self) -> Result<tokio::runtime::Runtime> {
63 let rt = if self.is_single_threaded() {
64 tracing::info!("Starting NNTP proxy with single-threaded runtime");
65 tokio::runtime::Builder::new_current_thread()
66 .enable_all()
67 .build()?
68 } else {
69 let num_cpus = std::thread::available_parallelism()
70 .map(|p| p.get())
71 .unwrap_or(1);
72 tracing::info!(
73 "Starting NNTP proxy with {} worker threads (detected {} CPUs)",
74 self.worker_threads,
75 num_cpus
76 );
77 tokio::runtime::Builder::new_multi_thread()
78 .worker_threads(self.worker_threads)
79 .enable_all()
80 .build()?
81 };
82
83 if self.enable_cpu_pinning {
84 pin_to_cpu_cores(self.worker_threads)?;
85 }
86
87 Ok(rt)
88 }
89}
90
91impl Default for RuntimeConfig {
92 fn default() -> Self {
93 Self::from_args(None)
94 }
95}
96
97#[cfg(target_os = "linux")]
104fn pin_to_cpu_cores(num_cores: usize) -> Result<()> {
105 use nix::sched::{CpuSet, sched_setaffinity};
106 use nix::unistd::Pid;
107
108 let mut cpu_set = CpuSet::new();
109 for core in 0..num_cores {
110 let _ = cpu_set.set(core);
111 }
112
113 match sched_setaffinity(Pid::from_raw(0), &cpu_set) {
114 Ok(()) => {
115 tracing::info!(
116 "Successfully pinned process to {} CPU cores for optimal performance",
117 num_cores
118 );
119 Ok(())
120 }
121 Err(e) => {
122 tracing::warn!(
123 "Failed to set CPU affinity: {}, continuing without pinning",
124 e
125 );
126 Ok(()) }
128 }
129}
130
/// Fallback for non-Linux targets: logs that pinning is unavailable and
/// reports success without touching any affinity settings.
#[cfg(not(target_os = "linux"))]
fn pin_to_cpu_cores(_num_cores: usize) -> Result<()> {
    use tracing::info;

    info!("CPU pinning not available on this platform");
    Ok(())
}
136
137pub async fn shutdown_signal() {
141 let ctrl_c = async {
142 tokio::signal::ctrl_c()
143 .await
144 .expect("Failed to install Ctrl+C handler");
145 };
146
147 #[cfg(unix)]
148 let terminate = async {
149 tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
150 .expect("Failed to install signal handler")
151 .recv()
152 .await;
153 };
154
155 #[cfg(not(unix))]
156 let terminate = std::future::pending::<()>();
157
158 tokio::select! {
159 _ = ctrl_c => {},
160 _ = terminate => {},
161 }
162}
163
164pub fn load_and_log_config(
175 config_path: &str,
176) -> Result<(crate::config::Config, crate::config::ConfigSource)> {
177 use crate::load_config_with_fallback;
178 use tracing::info;
179
180 let (config, source) = load_config_with_fallback(config_path)?;
181
182 info!("Loaded configuration from {}", source.description());
183 info!("Loaded {} backend servers:", config.servers.len());
184 for server in &config.servers {
185 info!(" - {} ({}:{})", server.name, server.host, server.port);
186 }
187
188 Ok((config, source))
189}
190
191#[must_use]
195pub fn resolve_listen_address(
196 host_arg: Option<&str>,
197 port_arg: Option<crate::types::Port>,
198 config: &crate::config::Config,
199) -> (String, crate::types::Port) {
200 let host = host_arg
201 .map(String::from)
202 .unwrap_or_else(|| config.proxy.host.clone());
203 let port = port_arg.unwrap_or(config.proxy.port);
204 (host, port)
205}
206
207pub async fn bind_listener(
212 host: &str,
213 port: crate::types::Port,
214 routing_mode: crate::RoutingMode,
215) -> Result<tokio::net::TcpListener> {
216 use tracing::info;
217
218 let listen_addr = format!("{}:{}", host, port.get());
219 let listener = tokio::net::TcpListener::bind(&listen_addr).await?;
220
221 info!("NNTP proxy listening on {} ({})", listen_addr, routing_mode);
222
223 Ok(listener)
224}
225
226pub fn spawn_connection_prewarming(proxy: &std::sync::Arc<crate::NntpProxy>) {
230 use std::sync::Arc;
231 use tracing::{info, warn};
232
233 let proxy = Arc::clone(proxy);
234 tokio::spawn(async move {
235 info!("Prewarming connection pools...");
236 if let Err(e) = proxy.prewarm_connections().await {
237 warn!("Failed to prewarm connection pools: {}", e);
238 return;
239 }
240 info!("Connection pools ready");
241 });
242}
243
244pub fn spawn_stats_flusher(stats: &crate::metrics::ConnectionStatsAggregator) {
248 let stats = stats.clone();
249 tokio::spawn(async move {
250 let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(30));
251 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
252
253 loop {
254 interval.tick().await;
255 stats.flush();
256 }
257 });
258}
259
260pub fn spawn_cache_stats_logger(proxy: &std::sync::Arc<crate::NntpProxy>) {
265 use std::sync::Arc;
266 use tracing::debug;
267
268 if !tracing::enabled!(tracing::Level::DEBUG) {
270 return;
271 }
272
273 let cache = Arc::clone(proxy.cache());
275 tokio::spawn(async move {
276 let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(60));
277 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
278
279 loop {
280 interval.tick().await;
281 let entries = cache.entry_count();
282 let size_bytes = cache.weighted_size();
283 let hit_rate = cache.hit_rate();
284 debug!(
285 "Cache stats: entries={}, size={} ({:.1}% hit rate)",
286 entries,
287 crate::formatting::format_bytes(size_bytes),
288 hit_rate
289 );
290 }
291 });
292}
293
294#[must_use]
302pub fn spawn_shutdown_handler(
303 proxy: &std::sync::Arc<crate::NntpProxy>,
304) -> tokio::sync::mpsc::Receiver<()> {
305 use std::sync::Arc;
306 use tracing::info;
307
308 let (shutdown_tx, shutdown_rx) = tokio::sync::mpsc::channel::<()>(1);
309 let proxy = Arc::clone(proxy);
310
311 tokio::spawn(async move {
312 shutdown_signal().await;
313 info!("Shutdown signal received");
314
315 let _ = shutdown_tx.send(()).await;
317
318 proxy.graceful_shutdown().await;
320 info!("Graceful shutdown complete");
321 });
322
323 shutdown_rx
324}
325
326pub async fn run_accept_loop(
334 proxy: std::sync::Arc<crate::NntpProxy>,
335 listener: tokio::net::TcpListener,
336 mut shutdown_rx: tokio::sync::mpsc::Receiver<()>,
337 routing_mode: crate::RoutingMode,
338) -> Result<()> {
339 use tracing::{error, info};
340
341 let uses_per_command = routing_mode.supports_per_command_routing();
342
343 loop {
344 tokio::select! {
345 _ = shutdown_rx.recv() => {
346 info!("Shutdown initiated, stopping accept loop");
347 break;
348 }
349
350 accept_result = listener.accept() => {
351 let (stream, addr) = accept_result?;
352 let proxy = proxy.clone();
353
354 tokio::spawn(async move {
355 let result = if uses_per_command {
356 proxy.handle_client_per_command_routing(stream, addr.into()).await
357 } else {
358 proxy.handle_client(stream, addr.into()).await
359 };
360
361 if let Err(e) = result {
362 if !crate::is_client_disconnect_error(&e) {
365 error!("Error handling client {}: {:?}", addr, e);
366 }
367 }
368 });
369 }
370 }
371 }
372
373 proxy.graceful_shutdown().await;
374 info!("Proxy shutdown complete");
375
376 Ok(())
377}
378
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a config from an explicit, already validated thread count.
    fn explicit(threads: ThreadCount) -> RuntimeConfig {
        RuntimeConfig::from_args(Some(threads))
    }

    // --- Construction -----------------------------------------------------

    #[test]
    fn test_runtime_config_from_args_default() {
        let cfg = RuntimeConfig::from_args(None);

        assert_eq!(cfg.worker_threads(), 1);
        assert!(cfg.is_single_threaded());
        assert!(cfg.enable_cpu_pinning);
    }

    #[test]
    fn test_runtime_config_from_args_explicit() {
        let cfg = explicit(ThreadCount::new(4).unwrap());

        assert_eq!(cfg.worker_threads(), 4);
        assert!(!cfg.is_single_threaded());
    }

    #[test]
    fn test_runtime_config_single_threaded() {
        let cfg = explicit(ThreadCount::new(1).unwrap());

        assert_eq!(cfg.worker_threads(), 1);
        assert!(cfg.is_single_threaded());
    }

    #[test]
    fn test_runtime_config_multi_threaded() {
        let cfg = explicit(ThreadCount::new(8).unwrap());

        assert_eq!(cfg.worker_threads(), 8);
        assert!(cfg.enable_cpu_pinning);
    }

    #[test]
    fn test_runtime_config_without_cpu_pinning() {
        let cfg = explicit(ThreadCount::new(4).unwrap()).without_cpu_pinning();

        assert_eq!(cfg.worker_threads(), 4);
        assert!(!cfg.enable_cpu_pinning);
    }

    #[test]
    fn test_runtime_config_default() {
        let from_default = RuntimeConfig::default();
        let from_none = RuntimeConfig::from_args(None);

        assert_eq!(from_default.worker_threads(), from_none.worker_threads());
    }

    #[test]
    fn test_pin_to_cpu_cores_non_fatal() {
        assert!(pin_to_cpu_cores(1).is_ok());
    }

    #[test]
    fn test_runtime_config_zero_threads_auto() {
        // No thread count supplied: the config falls back to one worker.
        let cfg = RuntimeConfig::from_args(None);
        assert_eq!(cfg.worker_threads(), 1);
    }

    #[test]
    fn test_runtime_config_large_thread_count() {
        let cfg = explicit(ThreadCount::new(128).unwrap());

        assert_eq!(cfg.worker_threads(), 128);
        assert!(!cfg.is_single_threaded());
    }

    // --- Derived traits ---------------------------------------------------

    #[test]
    fn test_runtime_config_clone() {
        let original = explicit(ThreadCount::new(4).unwrap());
        let duplicate = original.clone();

        assert_eq!(duplicate.worker_threads(), original.worker_threads());
        assert_eq!(duplicate.enable_cpu_pinning, original.enable_cpu_pinning);
    }

    #[test]
    fn test_runtime_config_debug() {
        let cfg = explicit(ThreadCount::new(2).unwrap());
        let rendered = format!("{cfg:?}");

        for needle in ["RuntimeConfig", "worker_threads", "enable_cpu_pinning"] {
            assert!(rendered.contains(needle));
        }
    }

    // --- Builder-style methods --------------------------------------------

    #[test]
    fn test_runtime_config_builder_chaining() {
        let cfg = explicit(ThreadCount::new(4).unwrap()).without_cpu_pinning();

        assert_eq!(cfg.worker_threads(), 4);
        assert!(!cfg.enable_cpu_pinning);
    }

    #[test]
    fn test_runtime_config_builder_multiple_calls() {
        // Disabling twice must leave pinning disabled.
        let cfg = explicit(ThreadCount::new(8).unwrap())
            .without_cpu_pinning()
            .without_cpu_pinning();

        assert!(!cfg.enable_cpu_pinning);
    }

    // --- Accessors --------------------------------------------------------

    #[test]
    fn test_worker_threads_getter() {
        let cfg = explicit(ThreadCount::new(6).unwrap());
        assert_eq!(cfg.worker_threads(), 6);
    }

    #[test]
    fn test_is_single_threaded_true() {
        let cfg = explicit(ThreadCount::new(1).unwrap());
        assert!(cfg.is_single_threaded());
    }

    #[test]
    fn test_is_single_threaded_false() {
        let cfg = explicit(ThreadCount::new(2).unwrap());
        assert!(!cfg.is_single_threaded());
    }

    #[test]
    fn test_is_single_threaded_false_for_large() {
        let cfg = explicit(ThreadCount::new(100).unwrap());
        assert!(!cfg.is_single_threaded());
    }

    // --- CPU pinning ------------------------------------------------------
    // These call the real `pin_to_cpu_cores`; it reports `Ok` even when the
    // affinity request itself is rejected, because that failure is only logged.

    #[test]
    #[cfg(target_os = "linux")]
    fn test_pin_to_cpu_cores_linux_single_core() {
        assert!(pin_to_cpu_cores(1).is_ok());
    }

    #[test]
    #[cfg(target_os = "linux")]
    fn test_pin_to_cpu_cores_linux_multi_core() {
        assert!(pin_to_cpu_cores(4).is_ok());
    }

    #[test]
    #[cfg(target_os = "linux")]
    fn test_pin_to_cpu_cores_linux_zero_cores() {
        // An empty CPU set must still be non-fatal.
        assert!(pin_to_cpu_cores(0).is_ok());
    }

    #[test]
    #[cfg(target_os = "linux")]
    fn test_pin_to_cpu_cores_linux_many_cores() {
        // Requesting far more cores than a typical machine has is non-fatal.
        assert!(pin_to_cpu_cores(1024).is_ok());
    }

    #[test]
    #[cfg(not(target_os = "linux"))]
    fn test_pin_to_cpu_cores_non_linux() {
        assert!(pin_to_cpu_cores(4).is_ok());
    }

    // --- Default ----------------------------------------------------------

    #[test]
    fn test_default_matches_from_args_none() {
        let from_default = RuntimeConfig::default();
        let from_none = RuntimeConfig::from_args(None);

        assert_eq!(from_default.worker_threads(), from_none.worker_threads());
        assert_eq!(
            from_default.enable_cpu_pinning,
            from_none.enable_cpu_pinning
        );
    }

    #[test]
    fn test_default_is_single_threaded() {
        assert!(RuntimeConfig::default().is_single_threaded());
    }

    #[test]
    fn test_default_has_cpu_pinning_enabled() {
        assert!(RuntimeConfig::default().enable_cpu_pinning);
    }

    // --- Thread-count / pinning combinations ------------------------------

    #[test]
    fn test_config_single_threaded_with_pinning() {
        let cfg = explicit(ThreadCount::new(1).unwrap());

        assert!(cfg.is_single_threaded());
        assert!(cfg.enable_cpu_pinning);
    }

    #[test]
    fn test_config_single_threaded_without_pinning() {
        let cfg = explicit(ThreadCount::new(1).unwrap()).without_cpu_pinning();

        assert!(cfg.is_single_threaded());
        assert!(!cfg.enable_cpu_pinning);
    }

    #[test]
    fn test_config_multi_threaded_with_pinning() {
        let cfg = explicit(ThreadCount::new(4).unwrap());

        assert!(!cfg.is_single_threaded());
        assert!(cfg.enable_cpu_pinning);
    }

    #[test]
    fn test_config_multi_threaded_without_pinning() {
        let cfg = explicit(ThreadCount::new(4).unwrap()).without_cpu_pinning();

        assert!(!cfg.is_single_threaded());
        assert!(!cfg.enable_cpu_pinning);
    }
}