ant_core/
production.rs

1//! Production hardening features for the P2P Foundation
2//!
3//! This module provides essential production-ready capabilities including:
4//! - Resource management and limits
5//! - Performance monitoring and metrics
6//! - Graceful shutdown handling
7//! - Configuration validation
8//! - Rate limiting and throttling
9//! - Health checks and diagnostics
10
11use crate::{P2PError, Result};
12use serde::{Deserialize, Serialize};
13use std::sync::atomic::{AtomicU64, Ordering};
14use std::sync::Arc;
15use std::time::{Duration, Instant, SystemTime};
16use tokio::sync::{RwLock, Semaphore};
17use tokio::time::interval;
18use tracing::{debug, info, warn, error};
19
20/// Production configuration with resource limits and performance tuning
21#[derive(Debug, Clone, Serialize, Deserialize)]
22pub struct ProductionConfig {
23    /// Maximum number of concurrent connections
24    pub max_connections: usize,
25    /// Maximum memory usage in bytes (0 = unlimited)
26    pub max_memory_bytes: u64,
27    /// Maximum bandwidth per second in bytes
28    pub max_bandwidth_bps: u64,
29    /// Connection timeout for new peers
30    pub connection_timeout: Duration,
31    /// Keep-alive interval for existing connections
32    pub keep_alive_interval: Duration,
33    /// Health check interval
34    pub health_check_interval: Duration,
35    /// Metrics collection interval
36    pub metrics_interval: Duration,
37    /// Enable detailed performance tracking
38    pub enable_performance_tracking: bool,
39    /// Enable automatic resource cleanup
40    pub enable_auto_cleanup: bool,
41    /// Graceful shutdown timeout
42    pub shutdown_timeout: Duration,
43    /// Rate limiting configuration
44    pub rate_limits: RateLimitConfig,
45}
46
47/// Rate limiting configuration for different operations
48#[derive(Debug, Clone, Serialize, Deserialize)]
49pub struct RateLimitConfig {
50    /// Maximum DHT operations per second per peer
51    pub dht_ops_per_sec: u32,
52    /// Maximum MCP calls per second per peer
53    pub mcp_calls_per_sec: u32,
54    /// Maximum messages per second per peer
55    pub messages_per_sec: u32,
56    /// Burst capacity for rate limiting
57    pub burst_capacity: u32,
58    /// Rate limit window duration
59    pub window_duration: Duration,
60}
61
62/// System resource usage metrics
63#[derive(Debug, Clone, Serialize, Deserialize)]
64pub struct ResourceMetrics {
65    /// Current memory usage in bytes
66    pub memory_used: u64,
67    /// Current number of connections
68    pub active_connections: usize,
69    /// Current bandwidth usage in bytes per second
70    pub bandwidth_usage: u64,
71    /// CPU usage percentage (0.0 - 100.0)
72    pub cpu_usage: f64,
73    /// Network latency statistics
74    pub network_latency: LatencyStats,
75    /// DHT performance metrics
76    pub dht_metrics: DHTMetrics,
77    /// MCP performance metrics
78    pub mcp_metrics: MCPMetrics,
79    /// Timestamp of metrics collection
80    pub timestamp: SystemTime,
81}
82
83/// Network latency statistics
84#[derive(Debug, Clone, Serialize, Deserialize)]
85pub struct LatencyStats {
86    /// Average latency in milliseconds
87    pub avg_ms: f64,
88    /// Minimum latency in milliseconds
89    pub min_ms: f64,
90    /// Maximum latency in milliseconds
91    pub max_ms: f64,
92    /// 95th percentile latency in milliseconds
93    pub p95_ms: f64,
94    /// Number of samples
95    pub sample_count: u64,
96}
97
98/// DHT performance metrics
99#[derive(Debug, Clone, Serialize, Deserialize)]
100pub struct DHTMetrics {
101    /// Operations per second
102    pub ops_per_sec: f64,
103    /// Average operation latency in milliseconds
104    pub avg_latency_ms: f64,
105    /// Success rate (0.0 - 1.0)
106    pub success_rate: f64,
107    /// Cache hit rate (0.0 - 1.0)
108    pub cache_hit_rate: f64,
109}
110
111/// MCP performance metrics
112#[derive(Debug, Clone, Serialize, Deserialize)]
113pub struct MCPMetrics {
114    /// Tool calls per second
115    pub calls_per_sec: f64,
116    /// Average call latency in milliseconds
117    pub avg_latency_ms: f64,
118    /// Success rate (0.0 - 1.0)
119    pub success_rate: f64,
120    /// Active service count
121    pub active_services: usize,
122}
123
124/// Production resource manager for enforcing limits and monitoring
125pub struct ResourceManager {
126    config: ProductionConfig,
127    metrics: Arc<RwLock<ResourceMetrics>>,
128    connection_semaphore: Arc<Semaphore>,
129    bandwidth_tracker: Arc<BandwidthTracker>,
130    rate_limiters: Arc<RwLock<std::collections::HashMap<String, RateLimiter>>>,
131    shutdown_signal: Arc<tokio::sync::Notify>,
132    is_shutting_down: Arc<std::sync::atomic::AtomicBool>,
133}
134
/// Byte counters for one rolling window, used to estimate bandwidth in bytes/second.
struct BandwidthTracker {
    /// Bytes sent since the current window began.
    bytes_sent: AtomicU64,
    /// Bytes received since the current window began.
    bytes_received: AtomicU64,
    /// Start of the current window.
    last_reset: Arc<RwLock<Instant>>,
    /// How long a window lasts before its counters are zeroed.
    window_duration: Duration,
}
142
/// Token bucket holding at most `max_tokens`, regaining `refill_rate` tokens per second.
struct RateLimiter {
    /// Tokens currently available; may be fractional between refills.
    tokens: Arc<std::sync::Mutex<f64>>,
    /// When `tokens` was last topped up.
    last_refill: Arc<std::sync::Mutex<Instant>>,
    /// Bucket capacity.
    max_tokens: f64,
    /// Tokens added per second of elapsed time.
    refill_rate: f64,
}
150
151impl Default for ProductionConfig {
152    fn default() -> Self {
153        Self {
154            max_connections: 1000,
155            max_memory_bytes: 1024 * 1024 * 1024, // 1GB
156            max_bandwidth_bps: 100 * 1024 * 1024, // 100 MB/s
157            connection_timeout: Duration::from_secs(30),
158            keep_alive_interval: Duration::from_secs(30),
159            health_check_interval: Duration::from_secs(60),
160            metrics_interval: Duration::from_secs(10),
161            enable_performance_tracking: true,
162            enable_auto_cleanup: true,
163            shutdown_timeout: Duration::from_secs(30),
164            rate_limits: RateLimitConfig::default(),
165        }
166    }
167}
168
169impl Default for RateLimitConfig {
170    fn default() -> Self {
171        Self {
172            dht_ops_per_sec: 100,
173            mcp_calls_per_sec: 50,
174            messages_per_sec: 200,
175            burst_capacity: 10,
176            window_duration: Duration::from_secs(1),
177        }
178    }
179}
180
181impl ResourceManager {
182    /// Create a new resource manager with the given configuration
183    pub fn new(config: ProductionConfig) -> Self {
184        let connection_semaphore = Arc::new(Semaphore::new(config.max_connections));
185        let bandwidth_tracker = Arc::new(BandwidthTracker::new(Duration::from_secs(1)));
186        
187        let initial_metrics = ResourceMetrics {
188            memory_used: 0,
189            active_connections: 0,
190            bandwidth_usage: 0,
191            cpu_usage: 0.0,
192            network_latency: LatencyStats::default(),
193            dht_metrics: DHTMetrics::default(),
194            mcp_metrics: MCPMetrics::default(),
195            timestamp: SystemTime::now(),
196        };
197
198        Self {
199            config,
200            metrics: Arc::new(RwLock::new(initial_metrics)),
201            connection_semaphore,
202            bandwidth_tracker,
203            rate_limiters: Arc::new(RwLock::new(std::collections::HashMap::new())),
204            shutdown_signal: Arc::new(tokio::sync::Notify::new()),
205            is_shutting_down: Arc::new(std::sync::atomic::AtomicBool::new(false)),
206        }
207    }
208
209    /// Start the resource manager background tasks
210    pub async fn start(&self) -> Result<()> {
211        info!("Starting production resource manager");
212
213        // Start metrics collection task
214        if self.config.enable_performance_tracking {
215            self.spawn_metrics_collector().await;
216        }
217
218        // Start health check task
219        self.spawn_health_checker().await;
220
221        // Start cleanup task
222        if self.config.enable_auto_cleanup {
223            self.spawn_cleanup_task().await;
224        }
225
226        info!("Production resource manager started successfully");
227        Ok(())
228    }
229
230    /// Gracefully shutdown the resource manager
231    pub async fn shutdown(&self) -> Result<()> {
232        info!("Initiating graceful shutdown of resource manager");
233        
234        self.is_shutting_down.store(true, Ordering::SeqCst);
235        self.shutdown_signal.notify_waiters();
236
237        // Wait for shutdown timeout
238        tokio::time::timeout(self.config.shutdown_timeout, async {
239            // Wait for all connections to close gracefully
240            while self.connection_semaphore.available_permits() < self.config.max_connections {
241                tokio::time::sleep(Duration::from_millis(100)).await;
242            }
243        }).await.map_err(|_| P2PError::Network("Shutdown timeout exceeded".to_string()))?;
244
245        info!("Resource manager shutdown completed");
246        Ok(())
247    }
248
249    /// Attempt to acquire a connection slot
250    pub async fn acquire_connection(&self) -> Result<ConnectionGuard<'_>> {
251        if self.is_shutting_down.load(Ordering::SeqCst) {
252            return Err(P2PError::Network("System is shutting down".to_string()));
253        }
254
255        let permit = self.connection_semaphore.clone()
256            .acquire_owned()
257            .await
258            .map_err(|_| P2PError::Network("Connection semaphore closed".to_string()))?;
259
260        debug!("Connection acquired, {} remaining", self.connection_semaphore.available_permits());
261        Ok(ConnectionGuard { permit, _manager: self })
262    }
263
264    /// Check if a peer is within rate limits for the given operation
265    pub async fn check_rate_limit(&self, peer_id: &str, operation: &str) -> Result<bool> {
266        let limit = match operation {
267            "dht" => self.config.rate_limits.dht_ops_per_sec,
268            "mcp" => self.config.rate_limits.mcp_calls_per_sec,
269            "message" => self.config.rate_limits.messages_per_sec,
270            _ => return Ok(true), // Unknown operation, allow
271        };
272
273        let mut limiters = self.rate_limiters.write().await;
274        let limiter = limiters.entry(peer_id.to_string())
275            .or_insert_with(|| RateLimiter::new(limit as f64, self.config.rate_limits.burst_capacity as f64));
276
277        Ok(limiter.try_acquire())
278    }
279
280    /// Record bandwidth usage
281    pub fn record_bandwidth(&self, bytes_sent: u64, bytes_received: u64) {
282        self.bandwidth_tracker.record(bytes_sent, bytes_received);
283    }
284
285    /// Get current resource metrics
286    pub async fn get_metrics(&self) -> ResourceMetrics {
287        self.metrics.read().await.clone()
288    }
289
290    /// Check if the system is healthy
291    pub async fn health_check(&self) -> Result<()> {
292        let metrics = self.get_metrics().await;
293
294        // Check memory usage
295        if self.config.max_memory_bytes > 0 && metrics.memory_used > self.config.max_memory_bytes {
296            warn!("Memory usage ({} bytes) exceeds limit ({} bytes)", 
297                  metrics.memory_used, self.config.max_memory_bytes);
298            return Err(P2PError::Network("Memory limit exceeded".to_string()));
299        }
300
301        // Check bandwidth usage
302        if metrics.bandwidth_usage > self.config.max_bandwidth_bps {
303            warn!("Bandwidth usage ({} bps) exceeds limit ({} bps)",
304                  metrics.bandwidth_usage, self.config.max_bandwidth_bps);
305        }
306
307        // Check connection count
308        if metrics.active_connections >= self.config.max_connections {
309            warn!("Connection count ({}) at maximum ({})",
310                  metrics.active_connections, self.config.max_connections);
311        }
312
313        debug!("Health check passed: {} connections, {} bytes memory, {} bps bandwidth",
314               metrics.active_connections, metrics.memory_used, metrics.bandwidth_usage);
315
316        Ok(())
317    }
318
319    /// Spawn metrics collection background task
320    async fn spawn_metrics_collector(&self) {
321        let manager = self.clone();
322        tokio::spawn(async move {
323            let mut interval = interval(manager.config.metrics_interval);
324            loop {
325                tokio::select! {
326                    _ = interval.tick() => {
327                        if let Err(e) = manager.collect_metrics().await {
328                            error!("Failed to collect metrics: {}", e);
329                        }
330                    }
331                    _ = manager.shutdown_signal.notified() => {
332                        debug!("Metrics collector shutting down");
333                        break;
334                    }
335                }
336            }
337        });
338    }
339
340    /// Spawn health check background task
341    async fn spawn_health_checker(&self) {
342        let manager = self.clone();
343        tokio::spawn(async move {
344            let mut interval = interval(manager.config.health_check_interval);
345            loop {
346                tokio::select! {
347                    _ = interval.tick() => {
348                        if let Err(e) = manager.health_check().await {
349                            error!("Health check failed: {}", e);
350                        }
351                    }
352                    _ = manager.shutdown_signal.notified() => {
353                        debug!("Health checker shutting down");
354                        break;
355                    }
356                }
357            }
358        });
359    }
360
361    /// Spawn cleanup background task
362    async fn spawn_cleanup_task(&self) {
363        let manager = self.clone();
364        tokio::spawn(async move {
365            let mut interval = interval(Duration::from_secs(300)); // Cleanup every 5 minutes
366            loop {
367                tokio::select! {
368                    _ = interval.tick() => {
369                        manager.cleanup_resources().await;
370                    }
371                    _ = manager.shutdown_signal.notified() => {
372                        debug!("Cleanup task shutting down");
373                        break;
374                    }
375                }
376            }
377        });
378    }
379
380    /// Collect current system metrics
381    async fn collect_metrics(&self) -> Result<()> {
382        let mut metrics = self.metrics.write().await;
383        
384        // Update bandwidth usage
385        metrics.bandwidth_usage = self.bandwidth_tracker.current_usage();
386        
387        // Update connection count
388        metrics.active_connections = self.config.max_connections - self.connection_semaphore.available_permits();
389        
390        // Update timestamp
391        metrics.timestamp = SystemTime::now();
392
393        debug!("Metrics updated: {} connections, {} bps bandwidth",
394               metrics.active_connections, metrics.bandwidth_usage);
395
396        Ok(())
397    }
398
399    /// Clean up expired resources
400    async fn cleanup_resources(&self) {
401        debug!("Starting resource cleanup");
402        
403        // Clean up expired rate limiters
404        let mut limiters = self.rate_limiters.write().await;
405        let now = Instant::now();
406        limiters.retain(|_, limiter| !limiter.is_expired(now));
407        
408        debug!("Cleanup completed, {} rate limiters remaining", limiters.len());
409    }
410}
411
412// Implement Clone for ResourceManager to allow sharing across tasks
413impl Clone for ResourceManager {
414    fn clone(&self) -> Self {
415        Self {
416            config: self.config.clone(),
417            metrics: self.metrics.clone(),
418            connection_semaphore: self.connection_semaphore.clone(),
419            bandwidth_tracker: self.bandwidth_tracker.clone(),
420            rate_limiters: self.rate_limiters.clone(),
421            shutdown_signal: self.shutdown_signal.clone(),
422            is_shutting_down: self.is_shutting_down.clone(),
423        }
424    }
425}
426
427/// RAII guard for connection permits
428pub struct ConnectionGuard<'a> {
429    #[allow(dead_code)]
430    permit: tokio::sync::OwnedSemaphorePermit,
431    _manager: &'a ResourceManager,
432}
433
434impl<'a> Drop for ConnectionGuard<'a> {
435    fn drop(&mut self) {
436        debug!("Connection released");
437    }
438}
439
440impl BandwidthTracker {
441    fn new(window_duration: Duration) -> Self {
442        Self {
443            bytes_sent: AtomicU64::new(0),
444            bytes_received: AtomicU64::new(0),
445            last_reset: Arc::new(RwLock::new(Instant::now())),
446            window_duration,
447        }
448    }
449
450    fn record(&self, bytes_sent: u64, bytes_received: u64) {
451        self.bytes_sent.fetch_add(bytes_sent, Ordering::Relaxed);
452        self.bytes_received.fetch_add(bytes_received, Ordering::Relaxed);
453    }
454
455    fn current_usage(&self) -> u64 {
456        let now = Instant::now();
457        
458        // Try to read last reset time non-blocking first
459        let last_reset = {
460            if let Ok(guard) = self.last_reset.try_read() {
461                *guard
462            } else {
463                // If we can't get a read lock, return current values without reset
464                let sent = self.bytes_sent.load(Ordering::Relaxed);
465                let received = self.bytes_received.load(Ordering::Relaxed);
466                return sent + received; // Return raw bytes without rate calculation
467            }
468        };
469        
470        if now.duration_since(last_reset) >= self.window_duration {
471            // Try to reset counters for new window
472            if let Ok(mut guard) = self.last_reset.try_write() {
473                self.bytes_sent.store(0, Ordering::Relaxed);
474                self.bytes_received.store(0, Ordering::Relaxed);
475                *guard = now;
476                return 0;
477            }
478        }
479
480        let sent = self.bytes_sent.load(Ordering::Relaxed);
481        let received = self.bytes_received.load(Ordering::Relaxed);
482        
483        // Calculate bytes per second
484        let elapsed_secs = now.duration_since(last_reset).as_secs_f64();
485        if elapsed_secs > 0.0 {
486            ((sent + received) as f64 / elapsed_secs) as u64
487        } else {
488            0
489        }
490    }
491}
492
493impl RateLimiter {
494    fn new(max_tokens: f64, refill_rate: f64) -> Self {
495        Self {
496            tokens: Arc::new(std::sync::Mutex::new(max_tokens)),
497            last_refill: Arc::new(std::sync::Mutex::new(Instant::now())),
498            max_tokens,
499            refill_rate,
500        }
501    }
502
503    fn try_acquire(&self) -> bool {
504        let now = Instant::now();
505        
506        // Refill tokens based on elapsed time
507        {
508            let mut last_refill = self.last_refill.lock().unwrap();
509            let elapsed = now.duration_since(*last_refill).as_secs_f64();
510            
511            if elapsed > 0.0 {
512                let mut tokens = self.tokens.lock().unwrap();
513                *tokens = (*tokens + elapsed * self.refill_rate).min(self.max_tokens);
514                *last_refill = now;
515            }
516        }
517
518        // Try to consume a token
519        let mut tokens = self.tokens.lock().unwrap();
520        if *tokens >= 1.0 {
521            *tokens -= 1.0;
522            true
523        } else {
524            false
525        }
526    }
527
528    fn is_expired(&self, now: Instant) -> bool {
529        let last_refill = *self.last_refill.lock().unwrap();
530        now.duration_since(last_refill) > Duration::from_secs(300) // 5 minutes
531    }
532}
533
534impl Default for LatencyStats {
535    fn default() -> Self {
536        Self {
537            avg_ms: 0.0,
538            min_ms: 0.0,
539            max_ms: 0.0,
540            p95_ms: 0.0,
541            sample_count: 0,
542        }
543    }
544}
545
546impl Default for DHTMetrics {
547    fn default() -> Self {
548        Self {
549            ops_per_sec: 0.0,
550            avg_latency_ms: 0.0,
551            success_rate: 1.0,
552            cache_hit_rate: 0.0,
553        }
554    }
555}
556
557impl Default for MCPMetrics {
558    fn default() -> Self {
559        Self {
560            calls_per_sec: 0.0,
561            avg_latency_ms: 0.0,
562            success_rate: 1.0,
563            active_services: 0,
564        }
565    }
566}
567
568#[cfg(test)]
569mod tests {
570    use super::*;
571    use tokio::time::sleep;
572
573    fn create_test_config() -> ProductionConfig {
574        ProductionConfig {
575            max_connections: 10,
576            max_memory_bytes: 1024 * 1024, // 1MB for testing
577            max_bandwidth_bps: 1024 * 1024, // 1MB/s for testing
578            connection_timeout: Duration::from_millis(100),
579            keep_alive_interval: Duration::from_millis(50),
580            health_check_interval: Duration::from_millis(50),
581            metrics_interval: Duration::from_millis(50),
582            enable_performance_tracking: true,
583            enable_auto_cleanup: true,
584            shutdown_timeout: Duration::from_millis(200),
585            rate_limits: RateLimitConfig {
586                dht_ops_per_sec: 5,
587                mcp_calls_per_sec: 3,
588                messages_per_sec: 10,
589                burst_capacity: 5,
590                window_duration: Duration::from_millis(100),
591            },
592        }
593    }
594
595    #[test]
596    fn test_production_config_default() {
597        let config = ProductionConfig::default();
598        assert_eq!(config.max_connections, 1000);
599        assert_eq!(config.max_memory_bytes, 1024 * 1024 * 1024); // 1GB
600        assert_eq!(config.max_bandwidth_bps, 100 * 1024 * 1024); // 100MB/s
601        assert_eq!(config.connection_timeout, Duration::from_secs(30));
602        assert_eq!(config.keep_alive_interval, Duration::from_secs(30));
603        assert_eq!(config.health_check_interval, Duration::from_secs(60));
604        assert_eq!(config.metrics_interval, Duration::from_secs(10));
605        assert!(config.enable_performance_tracking);
606        assert!(config.enable_auto_cleanup);
607        assert_eq!(config.shutdown_timeout, Duration::from_secs(30));
608    }
609
610    #[test]
611    fn test_rate_limit_config_default() {
612        let config = RateLimitConfig::default();
613        assert_eq!(config.dht_ops_per_sec, 100);
614        assert_eq!(config.mcp_calls_per_sec, 50);
615        assert_eq!(config.messages_per_sec, 200);
616        assert_eq!(config.burst_capacity, 10);
617        assert_eq!(config.window_duration, Duration::from_secs(1));
618    }
619
620    #[test]
621    fn test_latency_stats_default() {
622        let stats = LatencyStats::default();
623        assert_eq!(stats.avg_ms, 0.0);
624        assert_eq!(stats.min_ms, 0.0);
625        assert_eq!(stats.max_ms, 0.0);
626        assert_eq!(stats.p95_ms, 0.0);
627        assert_eq!(stats.sample_count, 0);
628    }
629
630    #[test]
631    fn test_dht_metrics_default() {
632        let metrics = DHTMetrics::default();
633        assert_eq!(metrics.ops_per_sec, 0.0);
634        assert_eq!(metrics.avg_latency_ms, 0.0);
635        assert_eq!(metrics.success_rate, 1.0);
636        assert_eq!(metrics.cache_hit_rate, 0.0);
637    }
638
639    #[test]
640    fn test_mcp_metrics_default() {
641        let metrics = MCPMetrics::default();
642        assert_eq!(metrics.calls_per_sec, 0.0);
643        assert_eq!(metrics.avg_latency_ms, 0.0);
644        assert_eq!(metrics.success_rate, 1.0);
645        assert_eq!(metrics.active_services, 0);
646    }
647
648    #[tokio::test]
649    async fn test_resource_manager_creation() {
650        let config = create_test_config();
651        let manager = ResourceManager::new(config.clone());
652        
653        let metrics = manager.get_metrics().await;
654        assert_eq!(metrics.active_connections, 0);
655        assert_eq!(metrics.bandwidth_usage, 0);
656        assert_eq!(metrics.memory_used, 0);
657        assert_eq!(metrics.cpu_usage, 0.0);
658        assert_eq!(metrics.network_latency.sample_count, 0);
659        assert_eq!(metrics.dht_metrics.success_rate, 1.0);
660        assert_eq!(metrics.mcp_metrics.success_rate, 1.0);
661    }
662
663    #[tokio::test]
664    async fn test_resource_manager_cloning() {
665        let config = create_test_config();
666        let manager = ResourceManager::new(config);
667        let cloned = manager.clone();
668        
669        // Both should work independently
670        let _guard1 = manager.acquire_connection().await.unwrap();
671        let _guard2 = cloned.acquire_connection().await.unwrap();
672        
673        // But they share the same semaphore
674        assert_eq!(manager.connection_semaphore.available_permits(), 8);
675        assert_eq!(cloned.connection_semaphore.available_permits(), 8);
676    }
677
678    #[tokio::test]
679    async fn test_connection_acquisition() {
680        let config = ProductionConfig {
681            max_connections: 2,
682            ..create_test_config()
683        };
684        let manager = ResourceManager::new(config);
685
686        // Acquire first connection
687        let _guard1 = manager.acquire_connection().await.unwrap();
688        assert_eq!(manager.connection_semaphore.available_permits(), 1);
689
690        // Acquire second connection
691        let _guard2 = manager.acquire_connection().await.unwrap();
692        assert_eq!(manager.connection_semaphore.available_permits(), 0);
693
694        // Drop first guard and check permit is released
695        drop(_guard1);
696        sleep(Duration::from_millis(1)).await; // Allow time for cleanup
697        assert_eq!(manager.connection_semaphore.available_permits(), 1);
698    }
699
700    #[tokio::test]
701    async fn test_connection_acquisition_during_shutdown() {
702        let config = create_test_config();
703        let manager = ResourceManager::new(config);
704        
705        // Mark as shutting down
706        manager.is_shutting_down.store(true, Ordering::SeqCst);
707        
708        // Should fail to acquire connection during shutdown
709        let result = manager.acquire_connection().await;
710        assert!(result.is_err());
711        match result {
712            Err(e) => assert!(e.to_string().contains("shutting down")),
713            Ok(_) => panic!("Expected error but got success"),
714        }
715    }
716
717    #[tokio::test]
718    async fn test_connection_guard_drop() {
719        let config = create_test_config();
720        let manager = ResourceManager::new(config);
721        
722        let initial_permits = manager.connection_semaphore.available_permits();
723        {
724            let _guard = manager.acquire_connection().await.unwrap();
725            assert_eq!(manager.connection_semaphore.available_permits(), initial_permits - 1);
726        }
727        // Guard should be dropped and permit released
728        assert_eq!(manager.connection_semaphore.available_permits(), initial_permits);
729    }
730
731    #[tokio::test]
732    async fn test_rate_limiting_dht_operations() {
733        let config = ProductionConfig {
734            rate_limits: RateLimitConfig {
735                dht_ops_per_sec: 2,
736                burst_capacity: 2,
737                ..Default::default()
738            },
739            ..create_test_config()
740        };
741        let manager = ResourceManager::new(config);
742
743        // Should allow burst capacity
744        assert!(manager.check_rate_limit("peer1", "dht").await.unwrap());
745        assert!(manager.check_rate_limit("peer1", "dht").await.unwrap());
746        
747        // Should deny after burst exhausted
748        assert!(!manager.check_rate_limit("peer1", "dht").await.unwrap());
749    }
750
751    #[tokio::test]
752    async fn test_rate_limiting_mcp_operations() {
753        let config = ProductionConfig {
754            rate_limits: RateLimitConfig {
755                mcp_calls_per_sec: 1,
756                burst_capacity: 1,
757                ..Default::default()
758            },
759            ..create_test_config()
760        };
761        let manager = ResourceManager::new(config);
762
763        // Should allow first MCP call
764        assert!(manager.check_rate_limit("peer2", "mcp").await.unwrap());
765        
766        // Should deny second MCP call
767        assert!(!manager.check_rate_limit("peer2", "mcp").await.unwrap());
768    }
769
770    #[tokio::test]
771    async fn test_rate_limiting_message_operations() {
772        let config = ProductionConfig {
773            rate_limits: RateLimitConfig {
774                messages_per_sec: 3,
775                burst_capacity: 3,
776                ..Default::default()
777            },
778            ..create_test_config()
779        };
780        let manager = ResourceManager::new(config);
781
782        // Should allow burst capacity for messages
783        for _ in 0..3 {
784            assert!(manager.check_rate_limit("peer3", "message").await.unwrap());
785        }
786        
787        // Should deny after burst exhausted
788        assert!(!manager.check_rate_limit("peer3", "message").await.unwrap());
789    }
790
791    #[tokio::test]
792    async fn test_rate_limiting_unknown_operation() {
793        let config = create_test_config();
794        let manager = ResourceManager::new(config);
795
796        // Unknown operations should be allowed
797        assert!(manager.check_rate_limit("peer4", "unknown").await.unwrap());
798        assert!(manager.check_rate_limit("peer4", "unknown").await.unwrap());
799    }
800
801    #[tokio::test]
802    async fn test_rate_limiting_different_peers() {
803        let config = ProductionConfig {
804            rate_limits: RateLimitConfig {
805                dht_ops_per_sec: 1,
806                burst_capacity: 1,
807                ..Default::default()
808            },
809            ..create_test_config()
810        };
811        let manager = ResourceManager::new(config);
812
813        // Each peer should have independent rate limits
814        assert!(manager.check_rate_limit("peer1", "dht").await.unwrap());
815        assert!(manager.check_rate_limit("peer2", "dht").await.unwrap());
816        
817        // But each peer should be limited individually
818        assert!(!manager.check_rate_limit("peer1", "dht").await.unwrap());
819        assert!(!manager.check_rate_limit("peer2", "dht").await.unwrap());
820    }
821
822    #[tokio::test]
823    async fn test_bandwidth_tracking() {
824        let tracker = BandwidthTracker::new(Duration::from_millis(100)); // Shorter window for testing
825        
826        tracker.record(1000, 2000);
827        let usage = tracker.current_usage();
828        assert!(usage > 0);
829        
830        // Test reset after window
831        sleep(Duration::from_millis(150)).await; // Wait longer than window
832        let usage_after_reset = tracker.current_usage();
833        assert_eq!(usage_after_reset, 0);
834    }
835
836    #[tokio::test]
837    async fn test_bandwidth_tracking_rate_calculation() {
838        let tracker = BandwidthTracker::new(Duration::from_secs(1));
839        
840        // Record some bytes
841        tracker.record(500, 500); // 1000 bytes total
842        
843        // Wait a short time
844        sleep(Duration::from_millis(50)).await;
845        
846        let usage = tracker.current_usage();
847        // Should calculate bytes per second
848        assert!(usage > 10000); // 1000 bytes in 0.05 seconds = ~20000 bps
849    }
850
851    #[tokio::test]
852    async fn test_bandwidth_tracking_multiple_records() {
853        let tracker = BandwidthTracker::new(Duration::from_millis(200));
854        
855        tracker.record(100, 200);
856        tracker.record(300, 400);
857        tracker.record(500, 600);
858        
859        let usage = tracker.current_usage();
860        assert!(usage > 0);
861        
862        // All records should be included in calculation
863        let sent = tracker.bytes_sent.load(Ordering::Relaxed);
864        let received = tracker.bytes_received.load(Ordering::Relaxed);
865        assert_eq!(sent, 900); // 100 + 300 + 500
866        assert_eq!(received, 1200); // 200 + 400 + 600
867    }
868
869    #[tokio::test]
870    async fn test_manager_bandwidth_recording() {
871        let config = create_test_config();
872        let manager = ResourceManager::new(config);
873        
874        // Record some bandwidth usage
875        manager.record_bandwidth(1000, 2000);
876        
877        // Should be reflected in current usage
878        let usage = manager.bandwidth_tracker.current_usage();
879        assert!(usage > 0);
880    }
881
882    #[tokio::test]
883    async fn test_health_check_success() {
884        let config = ProductionConfig {
885            max_memory_bytes: 2048, // 2KB
886            max_bandwidth_bps: 10000, // 10KB/s
887            max_connections: 5,
888            ..create_test_config()
889        };
890        let manager = ResourceManager::new(config);
891        
892        // Health check should pass with default metrics
893        let result = manager.health_check().await;
894        assert!(result.is_ok());
895    }
896
897    #[tokio::test]
898    async fn test_health_check_memory_limit_exceeded() {
899        let config = ProductionConfig {
900            max_memory_bytes: 100, // Very low limit
901            ..create_test_config()
902        };
903        let manager = ResourceManager::new(config);
904        
905        // Manually set high memory usage
906        {
907            let mut metrics = manager.metrics.write().await;
908            metrics.memory_used = 200; // Exceeds limit
909        }
910        
911        // Health check should fail
912        let result = manager.health_check().await;
913        assert!(result.is_err());
914        assert!(result.unwrap_err().to_string().contains("Memory limit exceeded"));
915    }
916
917    #[tokio::test]
918    async fn test_health_check_bandwidth_warning() {
919        let config = ProductionConfig {
920            max_bandwidth_bps: 1000,
921            ..create_test_config()
922        };
923        let manager = ResourceManager::new(config);
924        
925        // Manually set high bandwidth usage
926        {
927            let mut metrics = manager.metrics.write().await;
928            metrics.bandwidth_usage = 2000; // Exceeds limit but doesn't fail
929        }
930        
931        // Health check should still pass (bandwidth warning only)
932        let result = manager.health_check().await;
933        assert!(result.is_ok());
934    }
935
936    #[tokio::test]
937    async fn test_health_check_connection_warning() {
938        let config = ProductionConfig {
939            max_connections: 2,
940            ..create_test_config()
941        };
942        let manager = ResourceManager::new(config);
943        
944        // Acquire maximum connections
945        let _guard1 = manager.acquire_connection().await.unwrap();
946        let _guard2 = manager.acquire_connection().await.unwrap();
947        
948        // Manually update metrics to reflect max connections
949        {
950            let mut metrics = manager.metrics.write().await;
951            metrics.active_connections = 2;
952        }
953        
954        // Health check should still pass (connection warning only)
955        let result = manager.health_check().await;
956        assert!(result.is_ok());
957    }
958
959    #[tokio::test]
960    async fn test_metrics_collection() {
961        let config = create_test_config();
962        let manager = ResourceManager::new(config);
963        
964        // Record some activity
965        manager.record_bandwidth(500, 1000);
966        let _guard = manager.acquire_connection().await.unwrap();
967        
968        // Collect metrics
969        manager.collect_metrics().await.unwrap();
970        
971        let metrics = manager.get_metrics().await;
972        assert_eq!(metrics.active_connections, 1);
973        assert!(metrics.bandwidth_usage > 0);
974        assert!(metrics.timestamp.elapsed().unwrap().as_millis() < 100); // Recent timestamp
975    }
976
977    #[tokio::test]
978    async fn test_graceful_shutdown() {
979        let config = ProductionConfig {
980            shutdown_timeout: Duration::from_millis(100),
981            ..create_test_config()
982        };
983        let manager = ResourceManager::new(config);
984        
985        // Start manager
986        manager.start().await.unwrap();
987        
988        // Shutdown should complete successfully
989        let result = manager.shutdown().await;
990        assert!(result.is_ok());
991        
992        // Should be marked as shutting down
993        assert!(manager.is_shutting_down.load(Ordering::SeqCst));
994    }
995
996    #[tokio::test]
997    async fn test_graceful_shutdown_with_connections() {
998        let config = ProductionConfig {
999            shutdown_timeout: Duration::from_millis(200),
1000            max_connections: 2,
1001            ..create_test_config()
1002        };
1003        let manager = ResourceManager::new(config);
1004        
1005        // Acquire a connection
1006        let guard = manager.acquire_connection().await.unwrap();
1007        
1008        // Start shutdown in background
1009        let manager_clone = manager.clone();
1010        let shutdown_task = tokio::spawn(async move {
1011            manager_clone.shutdown().await
1012        });
1013        
1014        // Wait a bit then release connection
1015        sleep(Duration::from_millis(50)).await;
1016        drop(guard);
1017        
1018        // Shutdown should complete
1019        let result = shutdown_task.await.unwrap();
1020        assert!(result.is_ok());
1021    }
1022
1023    #[tokio::test]
1024    async fn test_shutdown_timeout() {
1025        let config = ProductionConfig {
1026            shutdown_timeout: Duration::from_millis(50), // Very short timeout
1027            max_connections: 1,
1028            ..create_test_config()
1029        };
1030        let manager = ResourceManager::new(config);
1031        
1032        // Acquire and hold a connection
1033        let _guard = manager.acquire_connection().await.unwrap();
1034        
1035        // Shutdown should timeout
1036        let result = manager.shutdown().await;
1037        assert!(result.is_err());
1038        assert!(result.unwrap_err().to_string().contains("Shutdown timeout"));
1039    }
1040
1041    #[tokio::test]
1042    async fn test_start_with_disabled_features() {
1043        let config = ProductionConfig {
1044            enable_performance_tracking: false,
1045            enable_auto_cleanup: false,
1046            ..create_test_config()
1047        };
1048        let manager = ResourceManager::new(config);
1049        
1050        // Should start successfully even with features disabled
1051        let result = manager.start().await;
1052        assert!(result.is_ok());
1053    }
1054
1055    #[test]
1056    fn test_rate_limiter_creation() {
1057        let limiter = RateLimiter::new(10.0, 5.0); // 10 tokens max, 5 per second refill
1058        
1059        // Should start with full capacity
1060        assert!(limiter.try_acquire());
1061    }
1062
1063    #[test]
1064    fn test_rate_limiter_token_exhaustion() {
1065        let limiter = RateLimiter::new(2.0, 1.0); // 2 tokens max, 1 per second refill
1066        
1067        // Should allow 2 acquisitions
1068        assert!(limiter.try_acquire());
1069        assert!(limiter.try_acquire());
1070        
1071        // Should deny third acquisition
1072        assert!(!limiter.try_acquire());
1073    }
1074
1075    #[tokio::test]
1076    async fn test_rate_limiter_refill() {
1077        let limiter = RateLimiter::new(1.0, 10.0); // 1 token max, 10 per second refill
1078        
1079        // Exhaust tokens
1080        assert!(limiter.try_acquire());
1081        assert!(!limiter.try_acquire());
1082        
1083        // Wait for refill
1084        sleep(Duration::from_millis(200)).await; // Should refill at least 2 tokens
1085        
1086        // Should allow acquisition again
1087        assert!(limiter.try_acquire());
1088    }
1089
1090    #[test]
1091    fn test_rate_limiter_expiration() {
1092        let limiter = RateLimiter::new(10.0, 5.0);
1093        
1094        // Should not be expired initially
1095        assert!(!limiter.is_expired(Instant::now()));
1096        
1097        // Should be expired after 5+ minutes
1098        let future_time = Instant::now() + Duration::from_secs(400);
1099        assert!(limiter.is_expired(future_time));
1100    }
1101
1102    #[tokio::test]
1103    async fn test_cleanup_resources() {
1104        let config = create_test_config();
1105        let manager = ResourceManager::new(config);
1106        
1107        // Add some rate limiters
1108        manager.check_rate_limit("peer1", "dht").await.unwrap();
1109        manager.check_rate_limit("peer2", "mcp").await.unwrap();
1110        
1111        // Should have rate limiters
1112        {
1113            let limiters = manager.rate_limiters.read().await;
1114            assert_eq!(limiters.len(), 2);
1115        }
1116        
1117        // Run cleanup (shouldn't remove recent limiters)
1118        manager.cleanup_resources().await;
1119        
1120        {
1121            let limiters = manager.rate_limiters.read().await;
1122            assert_eq!(limiters.len(), 2); // Should still have both
1123        }
1124    }
1125
1126    #[test]
1127    fn test_bandwidth_tracker_creation() {
1128        let tracker = BandwidthTracker::new(Duration::from_secs(1));
1129        
1130        // Should start with zero usage
1131        assert_eq!(tracker.current_usage(), 0);
1132    }
1133
1134    #[test]
1135    fn test_bandwidth_tracker_window_reset() {
1136        let tracker = BandwidthTracker::new(Duration::from_millis(1)); // Very short window
1137        
1138        tracker.record(1000, 2000);
1139        
1140        // Immediately check usage
1141        let initial_usage = tracker.current_usage();
1142        assert!(initial_usage > 0);
1143        
1144        // Wait for window to expire and check again
1145        std::thread::sleep(Duration::from_millis(10));
1146        let usage_after_window = tracker.current_usage();
1147        assert_eq!(usage_after_window, 0);
1148    }
1149
1150    #[tokio::test]
1151    async fn test_resource_metrics_structure() {
1152        let config = create_test_config();
1153        let manager = ResourceManager::new(config);
1154        
1155        let metrics = manager.get_metrics().await;
1156        
1157        // Verify all fields are properly initialized
1158        assert_eq!(metrics.memory_used, 0);
1159        assert_eq!(metrics.active_connections, 0);
1160        assert_eq!(metrics.bandwidth_usage, 0);
1161        assert_eq!(metrics.cpu_usage, 0.0);
1162        
1163        // Verify nested structures
1164        assert_eq!(metrics.network_latency.sample_count, 0);
1165        assert_eq!(metrics.dht_metrics.ops_per_sec, 0.0);
1166        assert_eq!(metrics.mcp_metrics.calls_per_sec, 0.0);
1167        
1168        // Verify timestamp is recent
1169        assert!(metrics.timestamp.elapsed().unwrap().as_secs() < 1);
1170    }
1171}