Skip to main content

saorsa_core/
production.rs

1// Copyright 2024 Saorsa Labs Limited
2//
3// This software is dual-licensed under:
4// - GNU Affero General Public License v3.0 or later (AGPL-3.0-or-later)
5// - Commercial License
6//
7// For AGPL-3.0 license, see LICENSE-AGPL-3.0
8// For commercial licensing, contact: david@saorsalabs.com
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under these licenses is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
14//! Production hardening features for the P2P Foundation
15//!
16//! This module provides essential production-ready capabilities including:
17//! - Resource management and limits
18//! - Performance monitoring and metrics
19//! - Graceful shutdown handling
20//! - Configuration validation
21//! - Rate limiting and throttling
22//! - Health checks and diagnostics
23
24use crate::error::NetworkError;
25use crate::{P2PError, Result};
26use serde::{Deserialize, Serialize};
27use std::sync::Arc;
28use std::sync::atomic::{AtomicU64, Ordering};
29use std::time::{Duration, Instant, SystemTime};
30use tokio::sync::{RwLock, Semaphore};
31use tokio::time::interval;
32use tracing::{debug, error, info, warn};
33
34/// Production configuration with resource limits and performance tuning
35#[derive(Debug, Clone, Serialize, Deserialize)]
36pub struct ProductionConfig {
37    /// Maximum number of concurrent connections
38    pub max_connections: usize,
39    /// Maximum memory usage in bytes (0 = unlimited)
40    pub max_memory_bytes: u64,
41    /// Maximum bandwidth per second in bytes
42    pub max_bandwidth_bps: u64,
43    /// Connection timeout for new peers
44    pub connection_timeout: Duration,
45    /// Keep-alive interval for existing connections
46    pub keep_alive_interval: Duration,
47    /// Health check interval
48    pub health_check_interval: Duration,
49    /// Metrics collection interval
50    pub metrics_interval: Duration,
51    /// Enable detailed performance tracking
52    pub enable_performance_tracking: bool,
53    /// Enable automatic resource cleanup
54    pub enable_auto_cleanup: bool,
55    /// Graceful shutdown timeout
56    pub shutdown_timeout: Duration,
57    /// Rate limiting configuration
58    pub rate_limits: RateLimitConfig,
59}
60
61/// Rate limiting configuration for different operations
62#[derive(Debug, Clone, Serialize, Deserialize)]
63pub struct RateLimitConfig {
64    /// Maximum DHT operations per second per peer
65    pub dht_ops_per_sec: u32,
66    /// Maximum MCP calls per second per peer
67    pub mcp_calls_per_sec: u32,
68    /// Maximum messages per second per peer
69    pub messages_per_sec: u32,
70    /// Burst capacity for rate limiting
71    pub burst_capacity: u32,
72    /// Rate limit window duration
73    pub window_duration: Duration,
74}
75
76/// System resource usage metrics
77#[derive(Debug, Clone, Serialize, Deserialize)]
78pub struct ResourceMetrics {
79    /// Current memory usage in bytes
80    pub memory_used: u64,
81    /// Current number of connections
82    pub active_connections: usize,
83    /// Current bandwidth usage in bytes per second
84    pub bandwidth_usage: u64,
85    /// CPU usage percentage (0.0 - 100.0)
86    pub cpu_usage: f64,
87    /// Network latency statistics
88    pub network_latency: LatencyStats,
89    /// DHT performance metrics
90    pub dht_metrics: DHTMetrics,
91    /// MCP performance metrics
92    pub mcp_metrics: MCPMetrics,
93    /// Timestamp of metrics collection
94    pub timestamp: SystemTime,
95}
96
97/// Network latency statistics
98#[derive(Debug, Clone, Serialize, Deserialize)]
99pub struct LatencyStats {
100    /// Average latency in milliseconds
101    pub avg_ms: f64,
102    /// Minimum latency in milliseconds
103    pub min_ms: f64,
104    /// Maximum latency in milliseconds
105    pub max_ms: f64,
106    /// 95th percentile latency in milliseconds
107    pub p95_ms: f64,
108    /// Number of samples
109    pub sample_count: u64,
110}
111
112/// DHT performance metrics
113#[derive(Debug, Clone, Serialize, Deserialize)]
114pub struct DHTMetrics {
115    /// Operations per second
116    pub ops_per_sec: f64,
117    /// Average operation latency in milliseconds
118    pub avg_latency_ms: f64,
119    /// Success rate (0.0 - 1.0)
120    pub success_rate: f64,
121    /// Cache hit rate (0.0 - 1.0)
122    pub cache_hit_rate: f64,
123}
124
125/// MCP performance metrics
126#[derive(Debug, Clone, Serialize, Deserialize)]
127pub struct MCPMetrics {
128    /// Tool calls per second
129    pub calls_per_sec: f64,
130    /// Average call latency in milliseconds
131    pub avg_latency_ms: f64,
132    /// Success rate (0.0 - 1.0)
133    pub success_rate: f64,
134    /// Active service count
135    pub active_services: usize,
136}
137
138/// Production resource manager for enforcing limits and monitoring
139pub struct ResourceManager {
140    pub config: ProductionConfig,
141    metrics: Arc<RwLock<ResourceMetrics>>,
142    connection_semaphore: Arc<Semaphore>,
143    bandwidth_tracker: Arc<BandwidthTracker>,
144    rate_limiters: Arc<RwLock<std::collections::HashMap<String, RateLimiter>>>,
145    shutdown_signal: Arc<tokio::sync::Notify>,
146    is_shutting_down: Arc<std::sync::atomic::AtomicBool>,
147}
148
149/// Bandwidth tracking for monitoring and limiting
150struct BandwidthTracker {
151    bytes_sent: AtomicU64,
152    bytes_received: AtomicU64,
153    last_reset: Arc<RwLock<Instant>>,
154    window_duration: Duration,
155}
156
/// Token bucket: each granted request consumes one token, and tokens are
/// replenished over time up to `max_tokens`.
struct RateLimiter {
    /// Tokens currently available.
    tokens: Arc<std::sync::Mutex<f64>>,
    /// When tokens were last replenished.
    last_refill: Arc<std::sync::Mutex<Instant>>,
    /// Bucket capacity.
    max_tokens: f64,
    /// Replenishment speed, in tokens per second.
    refill_rate: f64,
}
164
165impl Default for ProductionConfig {
166    fn default() -> Self {
167        Self {
168            max_connections: 1000,
169            max_memory_bytes: 1024 * 1024 * 1024, // 1GB
170            max_bandwidth_bps: 100 * 1024 * 1024, // 100 MB/s
171            connection_timeout: Duration::from_secs(30),
172            keep_alive_interval: Duration::from_secs(30),
173            health_check_interval: Duration::from_secs(60),
174            metrics_interval: Duration::from_secs(10),
175            enable_performance_tracking: true,
176            enable_auto_cleanup: true,
177            shutdown_timeout: Duration::from_secs(30),
178            rate_limits: RateLimitConfig::default(),
179        }
180    }
181}
182
183impl Default for RateLimitConfig {
184    fn default() -> Self {
185        Self {
186            dht_ops_per_sec: 100,
187            mcp_calls_per_sec: 50,
188            messages_per_sec: 200,
189            burst_capacity: 10,
190            window_duration: Duration::from_secs(1),
191        }
192    }
193}
194
195impl ResourceManager {
196    /// Create a new resource manager with the given configuration
197    pub fn new(config: ProductionConfig) -> Self {
198        let connection_semaphore = Arc::new(Semaphore::new(config.max_connections));
199        let bandwidth_tracker = Arc::new(BandwidthTracker::new(Duration::from_secs(1)));
200
201        let initial_metrics = ResourceMetrics {
202            memory_used: 0,
203            active_connections: 0,
204            bandwidth_usage: 0,
205            cpu_usage: 0.0,
206            network_latency: LatencyStats::default(),
207            dht_metrics: DHTMetrics::default(),
208            mcp_metrics: MCPMetrics::default(),
209            timestamp: SystemTime::now(),
210        };
211
212        Self {
213            config,
214            metrics: Arc::new(RwLock::new(initial_metrics)),
215            connection_semaphore,
216            bandwidth_tracker,
217            rate_limiters: Arc::new(RwLock::new(std::collections::HashMap::new())),
218            shutdown_signal: Arc::new(tokio::sync::Notify::new()),
219            is_shutting_down: Arc::new(std::sync::atomic::AtomicBool::new(false)),
220        }
221    }
222
223    /// Start the resource manager background tasks
224    pub async fn start(&self) -> Result<()> {
225        info!("Starting production resource manager");
226
227        // Start metrics collection task
228        if self.config.enable_performance_tracking {
229            self.spawn_metrics_collector().await;
230        }
231
232        // Start health check task
233        self.spawn_health_checker().await;
234
235        // Start cleanup task
236        if self.config.enable_auto_cleanup {
237            self.spawn_cleanup_task().await;
238        }
239
240        info!("Production resource manager started successfully");
241        Ok(())
242    }
243
244    /// Gracefully shutdown the resource manager
245    pub async fn shutdown(&self) -> Result<()> {
246        info!("Initiating graceful shutdown of resource manager");
247
248        self.is_shutting_down.store(true, Ordering::SeqCst);
249        self.shutdown_signal.notify_waiters();
250
251        // Wait for shutdown timeout
252        tokio::time::timeout(self.config.shutdown_timeout, async {
253            // Wait for all connections to close gracefully
254            while self.connection_semaphore.available_permits() < self.config.max_connections {
255                tokio::time::sleep(Duration::from_millis(100)).await;
256            }
257        })
258        .await
259        .map_err(|_| {
260            P2PError::Network(crate::error::NetworkError::ProtocolError(
261                "Shutdown timeout exceeded".to_string().into(),
262            ))
263        })?;
264
265        info!("Resource manager shutdown completed");
266        Ok(())
267    }
268
269    /// Attempt to acquire a connection slot
270    pub async fn acquire_connection(&self) -> Result<ConnectionGuard<'_>> {
271        if self.is_shutting_down.load(Ordering::SeqCst) {
272            return Err(P2PError::Network(
273                crate::error::NetworkError::ProtocolError(
274                    "System is shutting down".to_string().into(),
275                ),
276            ));
277        }
278
279        let permit = self
280            .connection_semaphore
281            .clone()
282            .acquire_owned()
283            .await
284            .map_err(|_| {
285                P2PError::Network(crate::error::NetworkError::ProtocolError(
286                    "Connection semaphore closed".to_string().into(),
287                ))
288            })?;
289
290        debug!(
291            "Connection acquired, {} remaining",
292            self.connection_semaphore.available_permits()
293        );
294        Ok(ConnectionGuard {
295            permit,
296            _manager: self,
297        })
298    }
299
300    /// Check if a peer is within rate limits for the given operation
301    pub async fn check_rate_limit(&self, peer_id: &str, operation: &str) -> Result<bool> {
302        let limit = match operation {
303            "dht" => self.config.rate_limits.dht_ops_per_sec,
304            "mcp" => self.config.rate_limits.mcp_calls_per_sec,
305            "message" => self.config.rate_limits.messages_per_sec,
306            _ => return Ok(true), // Unknown operation, allow
307        };
308
309        let mut limiters = self.rate_limiters.write().await;
310        let limiter = limiters.entry(peer_id.to_string()).or_insert_with(|| {
311            RateLimiter::new(limit as f64, self.config.rate_limits.burst_capacity as f64)
312        });
313
314        limiter.try_acquire()
315    }
316
317    /// Record bandwidth usage
318    pub fn record_bandwidth(&self, bytes_sent: u64, bytes_received: u64) {
319        self.bandwidth_tracker.record(bytes_sent, bytes_received);
320    }
321
322    /// Get current resource metrics
323    pub async fn get_metrics(&self) -> ResourceMetrics {
324        self.metrics.read().await.clone()
325    }
326
327    /// Check if the system is healthy
328    pub async fn health_check(&self) -> Result<()> {
329        let metrics = self.get_metrics().await;
330
331        // Check memory usage
332        if self.config.max_memory_bytes > 0 && metrics.memory_used > self.config.max_memory_bytes {
333            warn!(
334                "Memory usage ({} bytes) exceeds limit ({} bytes)",
335                metrics.memory_used, self.config.max_memory_bytes
336            );
337            return Err(P2PError::Network(
338                crate::error::NetworkError::ProtocolError(
339                    "Memory limit exceeded".to_string().into(),
340                ),
341            ));
342        }
343
344        // Check bandwidth usage
345        if metrics.bandwidth_usage > self.config.max_bandwidth_bps {
346            warn!(
347                "Bandwidth usage ({} bps) exceeds limit ({} bps)",
348                metrics.bandwidth_usage, self.config.max_bandwidth_bps
349            );
350        }
351
352        // Check connection count
353        if metrics.active_connections >= self.config.max_connections {
354            warn!(
355                "Connection count ({}) at maximum ({})",
356                metrics.active_connections, self.config.max_connections
357            );
358        }
359
360        debug!(
361            "Health check passed: {} connections, {} bytes memory, {} bps bandwidth",
362            metrics.active_connections, metrics.memory_used, metrics.bandwidth_usage
363        );
364
365        Ok(())
366    }
367
368    /// Spawn metrics collection background task
369    async fn spawn_metrics_collector(&self) {
370        let manager = self.clone();
371        tokio::spawn(async move {
372            let mut interval = interval(manager.config.metrics_interval);
373            loop {
374                tokio::select! {
375                    _ = interval.tick() => {
376                        if let Err(e) = manager.collect_metrics().await {
377                            error!("Failed to collect metrics: {}", e);
378                        }
379                    }
380                    _ = manager.shutdown_signal.notified() => {
381                        debug!("Metrics collector shutting down");
382                        break;
383                    }
384                }
385            }
386        });
387    }
388
389    /// Spawn health check background task
390    async fn spawn_health_checker(&self) {
391        let manager = self.clone();
392        tokio::spawn(async move {
393            let mut interval = interval(manager.config.health_check_interval);
394            loop {
395                tokio::select! {
396                    _ = interval.tick() => {
397                        if let Err(e) = manager.health_check().await {
398                            error!("Health check failed: {}", e);
399                        }
400                    }
401                    _ = manager.shutdown_signal.notified() => {
402                        debug!("Health checker shutting down");
403                        break;
404                    }
405                }
406            }
407        });
408    }
409
410    /// Spawn cleanup background task
411    async fn spawn_cleanup_task(&self) {
412        let manager = self.clone();
413        tokio::spawn(async move {
414            let mut interval = interval(Duration::from_secs(300)); // Cleanup every 5 minutes
415            loop {
416                tokio::select! {
417                    _ = interval.tick() => {
418                        manager.cleanup_resources().await;
419                    }
420                    _ = manager.shutdown_signal.notified() => {
421                        debug!("Cleanup task shutting down");
422                        break;
423                    }
424                }
425            }
426        });
427    }
428
429    /// Collect current system metrics
430    async fn collect_metrics(&self) -> Result<()> {
431        let mut metrics = self.metrics.write().await;
432
433        // Update bandwidth usage
434        metrics.bandwidth_usage = self.bandwidth_tracker.current_usage();
435
436        // Update connection count
437        metrics.active_connections =
438            self.config.max_connections - self.connection_semaphore.available_permits();
439
440        // Update timestamp
441        metrics.timestamp = SystemTime::now();
442
443        debug!(
444            "Metrics updated: {} connections, {} bps bandwidth",
445            metrics.active_connections, metrics.bandwidth_usage
446        );
447
448        Ok(())
449    }
450
451    /// Clean up expired resources
452    async fn cleanup_resources(&self) {
453        debug!("Starting resource cleanup");
454
455        // Clean up expired rate limiters
456        let mut limiters = self.rate_limiters.write().await;
457        let now = Instant::now();
458        limiters.retain(|_, limiter| {
459            // If is_expired fails, assume it's expired and remove it
460            match limiter.is_expired(now) {
461                Ok(expired) => !expired,
462                Err(_) => false,
463            }
464        });
465
466        debug!(
467            "Cleanup completed, {} rate limiters remaining",
468            limiters.len()
469        );
470    }
471}
472
473// Implement Clone for ResourceManager to allow sharing across tasks
474impl Clone for ResourceManager {
475    fn clone(&self) -> Self {
476        Self {
477            config: self.config.clone(),
478            metrics: self.metrics.clone(),
479            connection_semaphore: self.connection_semaphore.clone(),
480            bandwidth_tracker: self.bandwidth_tracker.clone(),
481            rate_limiters: self.rate_limiters.clone(),
482            shutdown_signal: self.shutdown_signal.clone(),
483            is_shutting_down: self.is_shutting_down.clone(),
484        }
485    }
486}
487
488/// RAII guard for connection permits
489pub struct ConnectionGuard<'a> {
490    #[allow(dead_code)]
491    permit: tokio::sync::OwnedSemaphorePermit,
492    _manager: &'a ResourceManager,
493}
494
495impl<'a> Drop for ConnectionGuard<'a> {
496    fn drop(&mut self) {
497        debug!("Connection released");
498    }
499}
500
501impl BandwidthTracker {
502    fn new(window_duration: Duration) -> Self {
503        Self {
504            bytes_sent: AtomicU64::new(0),
505            bytes_received: AtomicU64::new(0),
506            last_reset: Arc::new(RwLock::new(Instant::now())),
507            window_duration,
508        }
509    }
510
511    fn record(&self, bytes_sent: u64, bytes_received: u64) {
512        self.bytes_sent.fetch_add(bytes_sent, Ordering::Relaxed);
513        self.bytes_received
514            .fetch_add(bytes_received, Ordering::Relaxed);
515    }
516
517    fn current_usage(&self) -> u64 {
518        let now = Instant::now();
519
520        // Try to read last reset time non-blocking first
521        let last_reset = {
522            if let Ok(guard) = self.last_reset.try_read() {
523                *guard
524            } else {
525                // If we can't get a read lock, return current values without reset
526                let sent = self.bytes_sent.load(Ordering::Relaxed);
527                let received = self.bytes_received.load(Ordering::Relaxed);
528                return sent + received; // Return raw bytes without rate calculation
529            }
530        };
531
532        if now.duration_since(last_reset) >= self.window_duration {
533            // Try to reset counters for new window
534            if let Ok(mut guard) = self.last_reset.try_write() {
535                self.bytes_sent.store(0, Ordering::Relaxed);
536                self.bytes_received.store(0, Ordering::Relaxed);
537                *guard = now;
538                return 0;
539            }
540        }
541
542        let sent = self.bytes_sent.load(Ordering::Relaxed);
543        let received = self.bytes_received.load(Ordering::Relaxed);
544
545        // Calculate bytes per second
546        let elapsed_secs = now.duration_since(last_reset).as_secs_f64();
547        if elapsed_secs > 0.0 {
548            ((sent + received) as f64 / elapsed_secs) as u64
549        } else {
550            0
551        }
552    }
553}
554
555impl RateLimiter {
556    fn new(max_tokens: f64, refill_rate: f64) -> Self {
557        Self {
558            tokens: Arc::new(std::sync::Mutex::new(max_tokens)),
559            last_refill: Arc::new(std::sync::Mutex::new(Instant::now())),
560            max_tokens,
561            refill_rate,
562        }
563    }
564
565    fn try_acquire(&self) -> Result<bool> {
566        let now = Instant::now();
567
568        // Refill tokens based on elapsed time
569        {
570            let mut last_refill = self.last_refill.lock().map_err(|_| {
571                P2PError::Network(NetworkError::ProtocolError(
572                    "mutex lock failed".to_string().into(),
573                ))
574            })?;
575            let elapsed = now.duration_since(*last_refill).as_secs_f64();
576
577            if elapsed > 0.0 {
578                let mut tokens = self.tokens.lock().map_err(|_| {
579                    P2PError::Network(NetworkError::ProtocolError(
580                        "mutex lock failed".to_string().into(),
581                    ))
582                })?;
583                *tokens = (*tokens + elapsed * self.refill_rate).min(self.max_tokens);
584                *last_refill = now;
585            }
586        }
587
588        // Try to consume a token
589        let mut tokens = self.tokens.lock().map_err(|_| {
590            P2PError::Network(NetworkError::ProtocolError(
591                "mutex lock failed".to_string().into(),
592            ))
593        })?;
594        if *tokens >= 1.0 {
595            *tokens -= 1.0;
596            Ok(true)
597        } else {
598            Ok(false)
599        }
600    }
601
602    fn is_expired(&self, now: Instant) -> Result<bool> {
603        let last_refill = *self.last_refill.lock().map_err(|_| {
604            P2PError::Network(NetworkError::ProtocolError(
605                "mutex lock failed".to_string().into(),
606            ))
607        })?;
608        Ok(now.duration_since(last_refill) > Duration::from_secs(300)) // 5 minutes
609    }
610}
611
612impl Default for LatencyStats {
613    fn default() -> Self {
614        Self {
615            avg_ms: 0.0,
616            min_ms: 0.0,
617            max_ms: 0.0,
618            p95_ms: 0.0,
619            sample_count: 0,
620        }
621    }
622}
623
624impl Default for DHTMetrics {
625    fn default() -> Self {
626        Self {
627            ops_per_sec: 0.0,
628            avg_latency_ms: 0.0,
629            success_rate: 1.0,
630            cache_hit_rate: 0.0,
631        }
632    }
633}
634
635impl Default for MCPMetrics {
636    fn default() -> Self {
637        Self {
638            calls_per_sec: 0.0,
639            avg_latency_ms: 0.0,
640            success_rate: 1.0,
641            active_services: 0,
642        }
643    }
644}
645
646#[cfg(test)]
647mod tests {
648    use super::*;
649    use tokio::time::sleep;
650
651    fn create_test_config() -> ProductionConfig {
652        ProductionConfig {
653            max_connections: 10,
654            max_memory_bytes: 1024 * 1024,  // 1MB for testing
655            max_bandwidth_bps: 1024 * 1024, // 1MB/s for testing
656            connection_timeout: Duration::from_millis(100),
657            keep_alive_interval: Duration::from_millis(50),
658            health_check_interval: Duration::from_millis(50),
659            metrics_interval: Duration::from_millis(50),
660            enable_performance_tracking: true,
661            enable_auto_cleanup: true,
662            shutdown_timeout: Duration::from_millis(200),
663            rate_limits: RateLimitConfig {
664                dht_ops_per_sec: 5,
665                mcp_calls_per_sec: 3,
666                messages_per_sec: 10,
667                burst_capacity: 5,
668                window_duration: Duration::from_millis(100),
669            },
670        }
671    }
672
673    #[test]
674    fn test_production_config_default() {
675        let config = ProductionConfig::default();
676        assert_eq!(config.max_connections, 1000);
677        assert_eq!(config.max_memory_bytes, 1024 * 1024 * 1024); // 1GB
678        assert_eq!(config.max_bandwidth_bps, 100 * 1024 * 1024); // 100MB/s
679        assert_eq!(config.connection_timeout, Duration::from_secs(30));
680        assert_eq!(config.keep_alive_interval, Duration::from_secs(30));
681        assert_eq!(config.health_check_interval, Duration::from_secs(60));
682        assert_eq!(config.metrics_interval, Duration::from_secs(10));
683        assert!(config.enable_performance_tracking);
684        assert!(config.enable_auto_cleanup);
685        assert_eq!(config.shutdown_timeout, Duration::from_secs(30));
686    }
687
688    #[test]
689    fn test_rate_limit_config_default() {
690        let config = RateLimitConfig::default();
691        assert_eq!(config.dht_ops_per_sec, 100);
692        assert_eq!(config.mcp_calls_per_sec, 50);
693        assert_eq!(config.messages_per_sec, 200);
694        assert_eq!(config.burst_capacity, 10);
695        assert_eq!(config.window_duration, Duration::from_secs(1));
696    }
697
698    #[test]
699    fn test_latency_stats_default() {
700        let stats = LatencyStats::default();
701        assert_eq!(stats.avg_ms, 0.0);
702        assert_eq!(stats.min_ms, 0.0);
703        assert_eq!(stats.max_ms, 0.0);
704        assert_eq!(stats.p95_ms, 0.0);
705        assert_eq!(stats.sample_count, 0);
706    }
707
708    #[test]
709    fn test_dht_metrics_default() {
710        let metrics = DHTMetrics::default();
711        assert_eq!(metrics.ops_per_sec, 0.0);
712        assert_eq!(metrics.avg_latency_ms, 0.0);
713        assert_eq!(metrics.success_rate, 1.0);
714        assert_eq!(metrics.cache_hit_rate, 0.0);
715    }
716
717    #[test]
718    fn test_mcp_metrics_default() {
719        let metrics = MCPMetrics::default();
720        assert_eq!(metrics.calls_per_sec, 0.0);
721        assert_eq!(metrics.avg_latency_ms, 0.0);
722        assert_eq!(metrics.success_rate, 1.0);
723        assert_eq!(metrics.active_services, 0);
724    }
725
726    #[tokio::test]
727    async fn test_resource_manager_creation() {
728        let config = create_test_config();
729        let manager = ResourceManager::new(config.clone());
730
731        let metrics = manager.get_metrics().await;
732        assert_eq!(metrics.active_connections, 0);
733        assert_eq!(metrics.bandwidth_usage, 0);
734        assert_eq!(metrics.memory_used, 0);
735        assert_eq!(metrics.cpu_usage, 0.0);
736        assert_eq!(metrics.network_latency.sample_count, 0);
737        assert_eq!(metrics.dht_metrics.success_rate, 1.0);
738        assert_eq!(metrics.mcp_metrics.success_rate, 1.0);
739    }
740
741    #[tokio::test]
742    async fn test_resource_manager_cloning() -> Result<()> {
743        let config = create_test_config();
744        let manager = ResourceManager::new(config);
745        let cloned = manager.clone();
746
747        // Both should work independently
748        let _guard1 = manager.acquire_connection().await?;
749        let _guard2 = cloned.acquire_connection().await?;
750
751        // But they share the same semaphore
752        assert_eq!(manager.connection_semaphore.available_permits(), 8);
753        assert_eq!(cloned.connection_semaphore.available_permits(), 8);
754
755        Ok(())
756    }
757
758    #[tokio::test]
759    async fn test_connection_acquisition() -> Result<()> {
760        let config = ProductionConfig {
761            max_connections: 2,
762            ..create_test_config()
763        };
764        let manager = ResourceManager::new(config);
765
766        // Acquire first connection
767        let _guard1 = manager.acquire_connection().await?;
768        assert_eq!(manager.connection_semaphore.available_permits(), 1);
769
770        // Acquire second connection
771        let _guard2 = manager.acquire_connection().await?;
772        assert_eq!(manager.connection_semaphore.available_permits(), 0);
773
774        // Drop first guard and check permit is released
775        drop(_guard1);
776        sleep(Duration::from_millis(1)).await; // Allow time for cleanup
777        assert_eq!(manager.connection_semaphore.available_permits(), 1);
778
779        Ok(())
780    }
781
782    #[tokio::test]
783    async fn test_connection_acquisition_during_shutdown() {
784        let config = create_test_config();
785        let manager = ResourceManager::new(config);
786
787        // Mark as shutting down
788        manager.is_shutting_down.store(true, Ordering::SeqCst);
789
790        // Should fail to acquire connection during shutdown
791        let result = manager.acquire_connection().await;
792        assert!(result.is_err());
793        match result {
794            Err(e) => assert!(e.to_string().contains("shutting down")),
795            Ok(_) => panic!("Expected error but got success"),
796        }
797    }
798
799    #[tokio::test]
800    async fn test_connection_guard_drop() -> Result<()> {
801        let config = create_test_config();
802        let manager = ResourceManager::new(config);
803
804        let initial_permits = manager.connection_semaphore.available_permits();
805        {
806            let _guard = manager.acquire_connection().await?;
807            assert_eq!(
808                manager.connection_semaphore.available_permits(),
809                initial_permits - 1
810            );
811        }
812        // Guard should be dropped and permit released
813        assert_eq!(
814            manager.connection_semaphore.available_permits(),
815            initial_permits
816        );
817
818        Ok(())
819    }
820
821    #[tokio::test]
822    async fn test_rate_limiting_dht_operations() -> Result<()> {
823        let config = ProductionConfig {
824            rate_limits: RateLimitConfig {
825                dht_ops_per_sec: 2,
826                burst_capacity: 2,
827                ..Default::default()
828            },
829            ..create_test_config()
830        };
831        let manager = ResourceManager::new(config);
832
833        // Should allow burst capacity
834        assert!(manager.check_rate_limit("peer1", "dht").await?);
835        assert!(manager.check_rate_limit("peer1", "dht").await?);
836
837        // Should deny after burst exhausted
838        assert!(!manager.check_rate_limit("peer1", "dht").await?);
839
840        Ok(())
841    }
842
843    #[tokio::test]
844    async fn test_rate_limiting_mcp_operations() -> Result<()> {
845        let config = ProductionConfig {
846            rate_limits: RateLimitConfig {
847                mcp_calls_per_sec: 1,
848                burst_capacity: 1,
849                ..Default::default()
850            },
851            ..create_test_config()
852        };
853        let manager = ResourceManager::new(config);
854
855        // Should allow first MCP call
856        assert!(manager.check_rate_limit("peer2", "mcp").await?);
857
858        // Should deny second MCP call
859        assert!(!manager.check_rate_limit("peer2", "mcp").await?);
860        Ok(())
861    }
862
863    #[tokio::test]
864    async fn test_rate_limiting_message_operations() -> Result<()> {
865        let config = ProductionConfig {
866            rate_limits: RateLimitConfig {
867                messages_per_sec: 3,
868                burst_capacity: 3,
869                ..Default::default()
870            },
871            ..create_test_config()
872        };
873        let manager = ResourceManager::new(config);
874
875        // Should allow burst capacity for messages
876        for _ in 0..3 {
877            assert!(manager.check_rate_limit("peer3", "message").await?);
878        }
879
880        // Should deny after burst exhausted
881        assert!(!manager.check_rate_limit("peer3", "message").await?);
882
883        Ok(())
884    }
885
886    #[tokio::test]
887    async fn test_rate_limiting_unknown_operation() -> Result<()> {
888        let config = create_test_config();
889        let manager = ResourceManager::new(config);
890
891        // Unknown operations should be allowed
892        assert!(manager.check_rate_limit("peer4", "unknown").await?);
893        assert!(manager.check_rate_limit("peer4", "unknown").await?);
894
895        Ok(())
896    }
897
898    #[tokio::test]
899    async fn test_rate_limiting_different_peers() -> Result<()> {
900        let config = ProductionConfig {
901            rate_limits: RateLimitConfig {
902                dht_ops_per_sec: 1,
903                burst_capacity: 1,
904                ..Default::default()
905            },
906            ..create_test_config()
907        };
908        let manager = ResourceManager::new(config);
909
910        // Each peer should have independent rate limits
911        assert!(manager.check_rate_limit("peer1", "dht").await?);
912        assert!(manager.check_rate_limit("peer2", "dht").await?);
913
914        // But each peer should be limited individually
915        assert!(!manager.check_rate_limit("peer1", "dht").await?);
916        assert!(!manager.check_rate_limit("peer2", "dht").await?);
917
918        Ok(())
919    }
920
921    #[tokio::test]
922    async fn test_bandwidth_tracking() {
923        let tracker = BandwidthTracker::new(Duration::from_millis(100)); // Shorter window for testing
924
925        tracker.record(1000, 2000);
926        let usage = tracker.current_usage();
927        assert!(usage > 0);
928
929        // Test reset after window
930        sleep(Duration::from_millis(150)).await; // Wait longer than window
931        let usage_after_reset = tracker.current_usage();
932        assert_eq!(usage_after_reset, 0);
933    }
934
935    #[tokio::test]
936    async fn test_bandwidth_tracking_rate_calculation() {
937        let tracker = BandwidthTracker::new(Duration::from_secs(1));
938
939        // Record some bytes
940        tracker.record(500, 500); // 1000 bytes total
941
942        // Wait a short time
943        sleep(Duration::from_millis(50)).await;
944
945        let usage = tracker.current_usage();
946        // Should calculate bytes per second
947        assert!(usage > 10000); // 1000 bytes in 0.05 seconds = ~20000 bps
948    }
949
950    #[tokio::test]
951    async fn test_bandwidth_tracking_multiple_records() {
952        let tracker = BandwidthTracker::new(Duration::from_millis(200));
953
954        tracker.record(100, 200);
955        tracker.record(300, 400);
956        tracker.record(500, 600);
957
958        let usage = tracker.current_usage();
959        assert!(usage > 0);
960
961        // All records should be included in calculation
962        let sent = tracker.bytes_sent.load(Ordering::Relaxed);
963        let received = tracker.bytes_received.load(Ordering::Relaxed);
964        assert_eq!(sent, 900); // 100 + 300 + 500
965        assert_eq!(received, 1200); // 200 + 400 + 600
966    }
967
968    #[tokio::test]
969    async fn test_manager_bandwidth_recording() {
970        let config = create_test_config();
971        let manager = ResourceManager::new(config);
972
973        // Record some bandwidth usage
974        manager.record_bandwidth(1000, 2000);
975
976        // Should be reflected in current usage
977        let usage = manager.bandwidth_tracker.current_usage();
978        assert!(usage > 0);
979    }
980
981    #[tokio::test]
982    async fn test_health_check_success() {
983        let config = ProductionConfig {
984            max_memory_bytes: 2048,   // 2KB
985            max_bandwidth_bps: 10000, // 10KB/s
986            max_connections: 5,
987            ..create_test_config()
988        };
989        let manager = ResourceManager::new(config);
990
991        // Health check should pass with default metrics
992        let result = manager.health_check().await;
993        assert!(result.is_ok());
994    }
995
996    #[tokio::test]
997    async fn test_health_check_memory_limit_exceeded() {
998        let config = ProductionConfig {
999            max_memory_bytes: 100, // Very low limit
1000            ..create_test_config()
1001        };
1002        let manager = ResourceManager::new(config);
1003
1004        // Manually set high memory usage
1005        {
1006            let mut metrics = manager.metrics.write().await;
1007            metrics.memory_used = 200; // Exceeds limit
1008        }
1009
1010        // Health check should fail
1011        let result = manager.health_check().await;
1012        assert!(result.is_err());
1013        assert!(
1014            result
1015                .unwrap_err()
1016                .to_string()
1017                .contains("Memory limit exceeded")
1018        );
1019    }
1020
1021    #[tokio::test]
1022    async fn test_health_check_bandwidth_warning() {
1023        let config = ProductionConfig {
1024            max_bandwidth_bps: 1000,
1025            ..create_test_config()
1026        };
1027        let manager = ResourceManager::new(config);
1028
1029        // Manually set high bandwidth usage
1030        {
1031            let mut metrics = manager.metrics.write().await;
1032            metrics.bandwidth_usage = 2000; // Exceeds limit but doesn't fail
1033        }
1034
1035        // Health check should still pass (bandwidth warning only)
1036        let result = manager.health_check().await;
1037        assert!(result.is_ok());
1038    }
1039
1040    #[tokio::test]
1041    async fn test_health_check_connection_warning() -> Result<()> {
1042        let config = ProductionConfig {
1043            max_connections: 2,
1044            ..create_test_config()
1045        };
1046        let manager = ResourceManager::new(config);
1047
1048        // Acquire maximum connections
1049        let _guard1 = manager.acquire_connection().await?;
1050        let _guard2 = manager.acquire_connection().await?;
1051
1052        // Manually update metrics to reflect max connections
1053        {
1054            let mut metrics = manager.metrics.write().await;
1055            metrics.active_connections = 2;
1056        }
1057
1058        // Health check should still pass (connection warning only)
1059        let result = manager.health_check().await;
1060        assert!(result.is_ok());
1061        Ok(())
1062    }
1063
1064    #[tokio::test]
1065    async fn test_metrics_collection() -> Result<()> {
1066        let config = create_test_config();
1067        let manager = ResourceManager::new(config);
1068
1069        // Record some activity
1070        manager.record_bandwidth(500, 1000);
1071        let _guard = manager.acquire_connection().await?;
1072
1073        // Collect metrics
1074        manager.collect_metrics().await?;
1075
1076        let metrics = manager.get_metrics().await;
1077        assert_eq!(metrics.active_connections, 1);
1078        assert!(metrics.bandwidth_usage > 0);
1079        assert!(metrics.timestamp.elapsed().unwrap().as_millis() < 100); // Recent timestamp
1080
1081        Ok(())
1082    }
1083
1084    #[tokio::test]
1085    async fn test_graceful_shutdown() -> Result<()> {
1086        let config = ProductionConfig {
1087            shutdown_timeout: Duration::from_millis(100),
1088            ..create_test_config()
1089        };
1090        let manager = ResourceManager::new(config);
1091
1092        // Start manager
1093        manager.start().await?;
1094
1095        // Shutdown should complete successfully
1096        let result = manager.shutdown().await;
1097        assert!(result.is_ok());
1098
1099        // Should be marked as shutting down
1100        assert!(manager.is_shutting_down.load(Ordering::SeqCst));
1101
1102        Ok(())
1103    }
1104
1105    #[tokio::test]
1106    async fn test_graceful_shutdown_with_connections() -> Result<()> {
1107        let config = ProductionConfig {
1108            shutdown_timeout: Duration::from_millis(200),
1109            max_connections: 2,
1110            ..create_test_config()
1111        };
1112        let manager = ResourceManager::new(config);
1113
1114        // Acquire a connection
1115        let guard = manager.acquire_connection().await?;
1116
1117        // Start shutdown in background
1118        let manager_clone = manager.clone();
1119        let shutdown_task = tokio::spawn(async move { manager_clone.shutdown().await });
1120
1121        // Wait a bit then release connection
1122        sleep(Duration::from_millis(50)).await;
1123        drop(guard);
1124
1125        // Shutdown should complete
1126        let result = shutdown_task.await.expect("Task panicked");
1127        assert!(result.is_ok());
1128        Ok(())
1129    }
1130
1131    #[tokio::test]
1132    async fn test_shutdown_timeout() -> Result<()> {
1133        let config = ProductionConfig {
1134            shutdown_timeout: Duration::from_millis(50), // Very short timeout
1135            max_connections: 1,
1136            ..create_test_config()
1137        };
1138        let manager = ResourceManager::new(config);
1139
1140        // Acquire and hold a connection
1141        let _guard = manager.acquire_connection().await?;
1142
1143        // Shutdown should timeout
1144        let result = manager.shutdown().await;
1145        assert!(result.is_err());
1146        assert!(result.unwrap_err().to_string().contains("Shutdown timeout"));
1147        Ok(())
1148    }
1149
1150    #[tokio::test]
1151    async fn test_start_with_disabled_features() {
1152        let config = ProductionConfig {
1153            enable_performance_tracking: false,
1154            enable_auto_cleanup: false,
1155            ..create_test_config()
1156        };
1157        let manager = ResourceManager::new(config);
1158
1159        // Should start successfully even with features disabled
1160        let result = manager.start().await;
1161        assert!(result.is_ok());
1162    }
1163
1164    #[test]
1165    fn test_rate_limiter_creation() {
1166        let limiter = RateLimiter::new(10.0, 5.0); // 10 tokens max, 5 per second refill
1167
1168        // Should start with full capacity
1169        assert!(limiter.try_acquire().expect("Should succeed in test"));
1170    }
1171
1172    #[test]
1173    fn test_rate_limiter_token_exhaustion() {
1174        let limiter = RateLimiter::new(2.0, 1.0); // 2 tokens max, 1 per second refill
1175
1176        // Should allow 2 acquisitions
1177        assert!(limiter.try_acquire().expect("Should succeed in test"));
1178        assert!(limiter.try_acquire().expect("Should succeed in test"));
1179
1180        // Should deny third acquisition
1181        assert!(!limiter.try_acquire().expect("Should succeed in test"));
1182    }
1183
1184    #[tokio::test]
1185    async fn test_rate_limiter_refill() {
1186        let limiter = RateLimiter::new(1.0, 10.0); // 1 token max, 10 per second refill
1187
1188        // Exhaust tokens
1189        assert!(limiter.try_acquire().expect("Should succeed in test"));
1190        assert!(!limiter.try_acquire().expect("Should succeed in test"));
1191
1192        // Wait for refill
1193        sleep(Duration::from_millis(200)).await; // Should refill at least 2 tokens
1194
1195        // Should allow acquisition again
1196        assert!(limiter.try_acquire().expect("Should succeed in test"));
1197    }
1198
1199    #[test]
1200    fn test_rate_limiter_expiration() {
1201        let limiter = RateLimiter::new(10.0, 5.0);
1202
1203        // Should not be expired initially
1204        assert!(
1205            !limiter
1206                .is_expired(Instant::now())
1207                .expect("Should succeed in test")
1208        );
1209
1210        // Should be expired after 5+ minutes
1211        let future_time = Instant::now() + Duration::from_secs(400);
1212        assert!(
1213            limiter
1214                .is_expired(future_time)
1215                .expect("Should succeed in test")
1216        );
1217    }
1218
1219    #[tokio::test]
1220    async fn test_cleanup_resources() -> Result<()> {
1221        let config = create_test_config();
1222        let manager = ResourceManager::new(config);
1223
1224        // Add some rate limiters
1225        manager.check_rate_limit("peer1", "dht").await?;
1226        manager.check_rate_limit("peer2", "mcp").await?;
1227
1228        // Should have rate limiters
1229        {
1230            let limiters = manager.rate_limiters.read().await;
1231            assert_eq!(limiters.len(), 2);
1232        }
1233
1234        // Run cleanup (shouldn't remove recent limiters)
1235        manager.cleanup_resources().await;
1236
1237        {
1238            let limiters = manager.rate_limiters.read().await;
1239            assert_eq!(limiters.len(), 2); // Should still have both
1240        }
1241        Ok(())
1242    }
1243
1244    #[test]
1245    fn test_bandwidth_tracker_creation() {
1246        let tracker = BandwidthTracker::new(Duration::from_secs(1));
1247
1248        // Should start with zero usage
1249        assert_eq!(tracker.current_usage(), 0);
1250    }
1251
1252    #[test]
1253    fn test_bandwidth_tracker_window_reset() {
1254        let tracker = BandwidthTracker::new(Duration::from_millis(1)); // Very short window
1255
1256        tracker.record(1000, 2000);
1257
1258        // Immediately check usage
1259        let initial_usage = tracker.current_usage();
1260        assert!(initial_usage > 0);
1261
1262        // Wait for window to expire and check again
1263        std::thread::sleep(Duration::from_millis(10));
1264        let usage_after_window = tracker.current_usage();
1265        assert_eq!(usage_after_window, 0);
1266    }
1267
1268    #[tokio::test]
1269    async fn test_resource_metrics_structure() {
1270        let config = create_test_config();
1271        let manager = ResourceManager::new(config);
1272
1273        let metrics = manager.get_metrics().await;
1274
1275        // Verify all fields are properly initialized
1276        assert_eq!(metrics.memory_used, 0);
1277        assert_eq!(metrics.active_connections, 0);
1278        assert_eq!(metrics.bandwidth_usage, 0);
1279        assert_eq!(metrics.cpu_usage, 0.0);
1280
1281        // Verify nested structures
1282        assert_eq!(metrics.network_latency.sample_count, 0);
1283        assert_eq!(metrics.dht_metrics.ops_per_sec, 0.0);
1284        assert_eq!(metrics.mcp_metrics.calls_per_sec, 0.0);
1285
1286        // Verify timestamp is recent
1287        assert!(metrics.timestamp.elapsed().unwrap().as_secs() < 1);
1288    }
1289}