1use crate::{P2PError, Result};
12use serde::{Deserialize, Serialize};
13use std::sync::atomic::{AtomicU64, Ordering};
14use std::sync::Arc;
15use std::time::{Duration, Instant, SystemTime};
16use tokio::sync::{RwLock, Semaphore};
17use tokio::time::interval;
18use tracing::{debug, info, warn, error};
19
20#[derive(Debug, Clone, Serialize, Deserialize)]
22pub struct ProductionConfig {
23 pub max_connections: usize,
25 pub max_memory_bytes: u64,
27 pub max_bandwidth_bps: u64,
29 pub connection_timeout: Duration,
31 pub keep_alive_interval: Duration,
33 pub health_check_interval: Duration,
35 pub metrics_interval: Duration,
37 pub enable_performance_tracking: bool,
39 pub enable_auto_cleanup: bool,
41 pub shutdown_timeout: Duration,
43 pub rate_limits: RateLimitConfig,
45}
46
47#[derive(Debug, Clone, Serialize, Deserialize)]
49pub struct RateLimitConfig {
50 pub dht_ops_per_sec: u32,
52 pub mcp_calls_per_sec: u32,
54 pub messages_per_sec: u32,
56 pub burst_capacity: u32,
58 pub window_duration: Duration,
60}
61
62#[derive(Debug, Clone, Serialize, Deserialize)]
64pub struct ResourceMetrics {
65 pub memory_used: u64,
67 pub active_connections: usize,
69 pub bandwidth_usage: u64,
71 pub cpu_usage: f64,
73 pub network_latency: LatencyStats,
75 pub dht_metrics: DHTMetrics,
77 pub mcp_metrics: MCPMetrics,
79 pub timestamp: SystemTime,
81}
82
83#[derive(Debug, Clone, Serialize, Deserialize)]
85pub struct LatencyStats {
86 pub avg_ms: f64,
88 pub min_ms: f64,
90 pub max_ms: f64,
92 pub p95_ms: f64,
94 pub sample_count: u64,
96}
97
98#[derive(Debug, Clone, Serialize, Deserialize)]
100pub struct DHTMetrics {
101 pub ops_per_sec: f64,
103 pub avg_latency_ms: f64,
105 pub success_rate: f64,
107 pub cache_hit_rate: f64,
109}
110
111#[derive(Debug, Clone, Serialize, Deserialize)]
113pub struct MCPMetrics {
114 pub calls_per_sec: f64,
116 pub avg_latency_ms: f64,
118 pub success_rate: f64,
120 pub active_services: usize,
122}
123
124pub struct ResourceManager {
126 config: ProductionConfig,
127 metrics: Arc<RwLock<ResourceMetrics>>,
128 connection_semaphore: Arc<Semaphore>,
129 bandwidth_tracker: Arc<BandwidthTracker>,
130 rate_limiters: Arc<RwLock<std::collections::HashMap<String, RateLimiter>>>,
131 shutdown_signal: Arc<tokio::sync::Notify>,
132 is_shutting_down: Arc<std::sync::atomic::AtomicBool>,
133}
134
135struct BandwidthTracker {
137 bytes_sent: AtomicU64,
138 bytes_received: AtomicU64,
139 last_reset: Arc<RwLock<Instant>>,
140 window_duration: Duration,
141}
142
143struct RateLimiter {
145 tokens: Arc<std::sync::Mutex<f64>>,
146 last_refill: Arc<std::sync::Mutex<Instant>>,
147 max_tokens: f64,
148 refill_rate: f64, }
150
151impl Default for ProductionConfig {
152 fn default() -> Self {
153 Self {
154 max_connections: 1000,
155 max_memory_bytes: 1024 * 1024 * 1024, max_bandwidth_bps: 100 * 1024 * 1024, connection_timeout: Duration::from_secs(30),
158 keep_alive_interval: Duration::from_secs(30),
159 health_check_interval: Duration::from_secs(60),
160 metrics_interval: Duration::from_secs(10),
161 enable_performance_tracking: true,
162 enable_auto_cleanup: true,
163 shutdown_timeout: Duration::from_secs(30),
164 rate_limits: RateLimitConfig::default(),
165 }
166 }
167}
168
169impl Default for RateLimitConfig {
170 fn default() -> Self {
171 Self {
172 dht_ops_per_sec: 100,
173 mcp_calls_per_sec: 50,
174 messages_per_sec: 200,
175 burst_capacity: 10,
176 window_duration: Duration::from_secs(1),
177 }
178 }
179}
180
181impl ResourceManager {
182 pub fn new(config: ProductionConfig) -> Self {
184 let connection_semaphore = Arc::new(Semaphore::new(config.max_connections));
185 let bandwidth_tracker = Arc::new(BandwidthTracker::new(Duration::from_secs(1)));
186
187 let initial_metrics = ResourceMetrics {
188 memory_used: 0,
189 active_connections: 0,
190 bandwidth_usage: 0,
191 cpu_usage: 0.0,
192 network_latency: LatencyStats::default(),
193 dht_metrics: DHTMetrics::default(),
194 mcp_metrics: MCPMetrics::default(),
195 timestamp: SystemTime::now(),
196 };
197
198 Self {
199 config,
200 metrics: Arc::new(RwLock::new(initial_metrics)),
201 connection_semaphore,
202 bandwidth_tracker,
203 rate_limiters: Arc::new(RwLock::new(std::collections::HashMap::new())),
204 shutdown_signal: Arc::new(tokio::sync::Notify::new()),
205 is_shutting_down: Arc::new(std::sync::atomic::AtomicBool::new(false)),
206 }
207 }
208
209 pub async fn start(&self) -> Result<()> {
211 info!("Starting production resource manager");
212
213 if self.config.enable_performance_tracking {
215 self.spawn_metrics_collector().await;
216 }
217
218 self.spawn_health_checker().await;
220
221 if self.config.enable_auto_cleanup {
223 self.spawn_cleanup_task().await;
224 }
225
226 info!("Production resource manager started successfully");
227 Ok(())
228 }
229
230 pub async fn shutdown(&self) -> Result<()> {
232 info!("Initiating graceful shutdown of resource manager");
233
234 self.is_shutting_down.store(true, Ordering::SeqCst);
235 self.shutdown_signal.notify_waiters();
236
237 tokio::time::timeout(self.config.shutdown_timeout, async {
239 while self.connection_semaphore.available_permits() < self.config.max_connections {
241 tokio::time::sleep(Duration::from_millis(100)).await;
242 }
243 }).await.map_err(|_| P2PError::Network("Shutdown timeout exceeded".to_string()))?;
244
245 info!("Resource manager shutdown completed");
246 Ok(())
247 }
248
249 pub async fn acquire_connection(&self) -> Result<ConnectionGuard<'_>> {
251 if self.is_shutting_down.load(Ordering::SeqCst) {
252 return Err(P2PError::Network("System is shutting down".to_string()));
253 }
254
255 let permit = self.connection_semaphore.clone()
256 .acquire_owned()
257 .await
258 .map_err(|_| P2PError::Network("Connection semaphore closed".to_string()))?;
259
260 debug!("Connection acquired, {} remaining", self.connection_semaphore.available_permits());
261 Ok(ConnectionGuard { permit, _manager: self })
262 }
263
264 pub async fn check_rate_limit(&self, peer_id: &str, operation: &str) -> Result<bool> {
266 let limit = match operation {
267 "dht" => self.config.rate_limits.dht_ops_per_sec,
268 "mcp" => self.config.rate_limits.mcp_calls_per_sec,
269 "message" => self.config.rate_limits.messages_per_sec,
270 _ => return Ok(true), };
272
273 let mut limiters = self.rate_limiters.write().await;
274 let limiter = limiters.entry(peer_id.to_string())
275 .or_insert_with(|| RateLimiter::new(limit as f64, self.config.rate_limits.burst_capacity as f64));
276
277 Ok(limiter.try_acquire())
278 }
279
280 pub fn record_bandwidth(&self, bytes_sent: u64, bytes_received: u64) {
282 self.bandwidth_tracker.record(bytes_sent, bytes_received);
283 }
284
285 pub async fn get_metrics(&self) -> ResourceMetrics {
287 self.metrics.read().await.clone()
288 }
289
290 pub async fn health_check(&self) -> Result<()> {
292 let metrics = self.get_metrics().await;
293
294 if self.config.max_memory_bytes > 0 && metrics.memory_used > self.config.max_memory_bytes {
296 warn!("Memory usage ({} bytes) exceeds limit ({} bytes)",
297 metrics.memory_used, self.config.max_memory_bytes);
298 return Err(P2PError::Network("Memory limit exceeded".to_string()));
299 }
300
301 if metrics.bandwidth_usage > self.config.max_bandwidth_bps {
303 warn!("Bandwidth usage ({} bps) exceeds limit ({} bps)",
304 metrics.bandwidth_usage, self.config.max_bandwidth_bps);
305 }
306
307 if metrics.active_connections >= self.config.max_connections {
309 warn!("Connection count ({}) at maximum ({})",
310 metrics.active_connections, self.config.max_connections);
311 }
312
313 debug!("Health check passed: {} connections, {} bytes memory, {} bps bandwidth",
314 metrics.active_connections, metrics.memory_used, metrics.bandwidth_usage);
315
316 Ok(())
317 }
318
319 async fn spawn_metrics_collector(&self) {
321 let manager = self.clone();
322 tokio::spawn(async move {
323 let mut interval = interval(manager.config.metrics_interval);
324 loop {
325 tokio::select! {
326 _ = interval.tick() => {
327 if let Err(e) = manager.collect_metrics().await {
328 error!("Failed to collect metrics: {}", e);
329 }
330 }
331 _ = manager.shutdown_signal.notified() => {
332 debug!("Metrics collector shutting down");
333 break;
334 }
335 }
336 }
337 });
338 }
339
340 async fn spawn_health_checker(&self) {
342 let manager = self.clone();
343 tokio::spawn(async move {
344 let mut interval = interval(manager.config.health_check_interval);
345 loop {
346 tokio::select! {
347 _ = interval.tick() => {
348 if let Err(e) = manager.health_check().await {
349 error!("Health check failed: {}", e);
350 }
351 }
352 _ = manager.shutdown_signal.notified() => {
353 debug!("Health checker shutting down");
354 break;
355 }
356 }
357 }
358 });
359 }
360
361 async fn spawn_cleanup_task(&self) {
363 let manager = self.clone();
364 tokio::spawn(async move {
365 let mut interval = interval(Duration::from_secs(300)); loop {
367 tokio::select! {
368 _ = interval.tick() => {
369 manager.cleanup_resources().await;
370 }
371 _ = manager.shutdown_signal.notified() => {
372 debug!("Cleanup task shutting down");
373 break;
374 }
375 }
376 }
377 });
378 }
379
380 async fn collect_metrics(&self) -> Result<()> {
382 let mut metrics = self.metrics.write().await;
383
384 metrics.bandwidth_usage = self.bandwidth_tracker.current_usage();
386
387 metrics.active_connections = self.config.max_connections - self.connection_semaphore.available_permits();
389
390 metrics.timestamp = SystemTime::now();
392
393 debug!("Metrics updated: {} connections, {} bps bandwidth",
394 metrics.active_connections, metrics.bandwidth_usage);
395
396 Ok(())
397 }
398
399 async fn cleanup_resources(&self) {
401 debug!("Starting resource cleanup");
402
403 let mut limiters = self.rate_limiters.write().await;
405 let now = Instant::now();
406 limiters.retain(|_, limiter| !limiter.is_expired(now));
407
408 debug!("Cleanup completed, {} rate limiters remaining", limiters.len());
409 }
410}
411
412impl Clone for ResourceManager {
414 fn clone(&self) -> Self {
415 Self {
416 config: self.config.clone(),
417 metrics: self.metrics.clone(),
418 connection_semaphore: self.connection_semaphore.clone(),
419 bandwidth_tracker: self.bandwidth_tracker.clone(),
420 rate_limiters: self.rate_limiters.clone(),
421 shutdown_signal: self.shutdown_signal.clone(),
422 is_shutting_down: self.is_shutting_down.clone(),
423 }
424 }
425}
426
427pub struct ConnectionGuard<'a> {
429 #[allow(dead_code)]
430 permit: tokio::sync::OwnedSemaphorePermit,
431 _manager: &'a ResourceManager,
432}
433
434impl<'a> Drop for ConnectionGuard<'a> {
435 fn drop(&mut self) {
436 debug!("Connection released");
437 }
438}
439
440impl BandwidthTracker {
441 fn new(window_duration: Duration) -> Self {
442 Self {
443 bytes_sent: AtomicU64::new(0),
444 bytes_received: AtomicU64::new(0),
445 last_reset: Arc::new(RwLock::new(Instant::now())),
446 window_duration,
447 }
448 }
449
450 fn record(&self, bytes_sent: u64, bytes_received: u64) {
451 self.bytes_sent.fetch_add(bytes_sent, Ordering::Relaxed);
452 self.bytes_received.fetch_add(bytes_received, Ordering::Relaxed);
453 }
454
455 fn current_usage(&self) -> u64 {
456 let now = Instant::now();
457
458 let last_reset = {
460 if let Ok(guard) = self.last_reset.try_read() {
461 *guard
462 } else {
463 let sent = self.bytes_sent.load(Ordering::Relaxed);
465 let received = self.bytes_received.load(Ordering::Relaxed);
466 return sent + received; }
468 };
469
470 if now.duration_since(last_reset) >= self.window_duration {
471 if let Ok(mut guard) = self.last_reset.try_write() {
473 self.bytes_sent.store(0, Ordering::Relaxed);
474 self.bytes_received.store(0, Ordering::Relaxed);
475 *guard = now;
476 return 0;
477 }
478 }
479
480 let sent = self.bytes_sent.load(Ordering::Relaxed);
481 let received = self.bytes_received.load(Ordering::Relaxed);
482
483 let elapsed_secs = now.duration_since(last_reset).as_secs_f64();
485 if elapsed_secs > 0.0 {
486 ((sent + received) as f64 / elapsed_secs) as u64
487 } else {
488 0
489 }
490 }
491}
492
493impl RateLimiter {
494 fn new(max_tokens: f64, refill_rate: f64) -> Self {
495 Self {
496 tokens: Arc::new(std::sync::Mutex::new(max_tokens)),
497 last_refill: Arc::new(std::sync::Mutex::new(Instant::now())),
498 max_tokens,
499 refill_rate,
500 }
501 }
502
503 fn try_acquire(&self) -> bool {
504 let now = Instant::now();
505
506 {
508 let mut last_refill = self.last_refill.lock().unwrap();
509 let elapsed = now.duration_since(*last_refill).as_secs_f64();
510
511 if elapsed > 0.0 {
512 let mut tokens = self.tokens.lock().unwrap();
513 *tokens = (*tokens + elapsed * self.refill_rate).min(self.max_tokens);
514 *last_refill = now;
515 }
516 }
517
518 let mut tokens = self.tokens.lock().unwrap();
520 if *tokens >= 1.0 {
521 *tokens -= 1.0;
522 true
523 } else {
524 false
525 }
526 }
527
528 fn is_expired(&self, now: Instant) -> bool {
529 let last_refill = *self.last_refill.lock().unwrap();
530 now.duration_since(last_refill) > Duration::from_secs(300) }
532}
533
534impl Default for LatencyStats {
535 fn default() -> Self {
536 Self {
537 avg_ms: 0.0,
538 min_ms: 0.0,
539 max_ms: 0.0,
540 p95_ms: 0.0,
541 sample_count: 0,
542 }
543 }
544}
545
546impl Default for DHTMetrics {
547 fn default() -> Self {
548 Self {
549 ops_per_sec: 0.0,
550 avg_latency_ms: 0.0,
551 success_rate: 1.0,
552 cache_hit_rate: 0.0,
553 }
554 }
555}
556
557impl Default for MCPMetrics {
558 fn default() -> Self {
559 Self {
560 calls_per_sec: 0.0,
561 avg_latency_ms: 0.0,
562 success_rate: 1.0,
563 active_services: 0,
564 }
565 }
566}
567
568#[cfg(test)]
569mod tests {
570 use super::*;
571 use tokio::time::sleep;
572
573 fn create_test_config() -> ProductionConfig {
574 ProductionConfig {
575 max_connections: 10,
576 max_memory_bytes: 1024 * 1024, max_bandwidth_bps: 1024 * 1024, connection_timeout: Duration::from_millis(100),
579 keep_alive_interval: Duration::from_millis(50),
580 health_check_interval: Duration::from_millis(50),
581 metrics_interval: Duration::from_millis(50),
582 enable_performance_tracking: true,
583 enable_auto_cleanup: true,
584 shutdown_timeout: Duration::from_millis(200),
585 rate_limits: RateLimitConfig {
586 dht_ops_per_sec: 5,
587 mcp_calls_per_sec: 3,
588 messages_per_sec: 10,
589 burst_capacity: 5,
590 window_duration: Duration::from_millis(100),
591 },
592 }
593 }
594
595 #[test]
596 fn test_production_config_default() {
597 let config = ProductionConfig::default();
598 assert_eq!(config.max_connections, 1000);
599 assert_eq!(config.max_memory_bytes, 1024 * 1024 * 1024); assert_eq!(config.max_bandwidth_bps, 100 * 1024 * 1024); assert_eq!(config.connection_timeout, Duration::from_secs(30));
602 assert_eq!(config.keep_alive_interval, Duration::from_secs(30));
603 assert_eq!(config.health_check_interval, Duration::from_secs(60));
604 assert_eq!(config.metrics_interval, Duration::from_secs(10));
605 assert!(config.enable_performance_tracking);
606 assert!(config.enable_auto_cleanup);
607 assert_eq!(config.shutdown_timeout, Duration::from_secs(30));
608 }
609
610 #[test]
611 fn test_rate_limit_config_default() {
612 let config = RateLimitConfig::default();
613 assert_eq!(config.dht_ops_per_sec, 100);
614 assert_eq!(config.mcp_calls_per_sec, 50);
615 assert_eq!(config.messages_per_sec, 200);
616 assert_eq!(config.burst_capacity, 10);
617 assert_eq!(config.window_duration, Duration::from_secs(1));
618 }
619
620 #[test]
621 fn test_latency_stats_default() {
622 let stats = LatencyStats::default();
623 assert_eq!(stats.avg_ms, 0.0);
624 assert_eq!(stats.min_ms, 0.0);
625 assert_eq!(stats.max_ms, 0.0);
626 assert_eq!(stats.p95_ms, 0.0);
627 assert_eq!(stats.sample_count, 0);
628 }
629
630 #[test]
631 fn test_dht_metrics_default() {
632 let metrics = DHTMetrics::default();
633 assert_eq!(metrics.ops_per_sec, 0.0);
634 assert_eq!(metrics.avg_latency_ms, 0.0);
635 assert_eq!(metrics.success_rate, 1.0);
636 assert_eq!(metrics.cache_hit_rate, 0.0);
637 }
638
639 #[test]
640 fn test_mcp_metrics_default() {
641 let metrics = MCPMetrics::default();
642 assert_eq!(metrics.calls_per_sec, 0.0);
643 assert_eq!(metrics.avg_latency_ms, 0.0);
644 assert_eq!(metrics.success_rate, 1.0);
645 assert_eq!(metrics.active_services, 0);
646 }
647
648 #[tokio::test]
649 async fn test_resource_manager_creation() {
650 let config = create_test_config();
651 let manager = ResourceManager::new(config.clone());
652
653 let metrics = manager.get_metrics().await;
654 assert_eq!(metrics.active_connections, 0);
655 assert_eq!(metrics.bandwidth_usage, 0);
656 assert_eq!(metrics.memory_used, 0);
657 assert_eq!(metrics.cpu_usage, 0.0);
658 assert_eq!(metrics.network_latency.sample_count, 0);
659 assert_eq!(metrics.dht_metrics.success_rate, 1.0);
660 assert_eq!(metrics.mcp_metrics.success_rate, 1.0);
661 }
662
663 #[tokio::test]
664 async fn test_resource_manager_cloning() {
665 let config = create_test_config();
666 let manager = ResourceManager::new(config);
667 let cloned = manager.clone();
668
669 let _guard1 = manager.acquire_connection().await.unwrap();
671 let _guard2 = cloned.acquire_connection().await.unwrap();
672
673 assert_eq!(manager.connection_semaphore.available_permits(), 8);
675 assert_eq!(cloned.connection_semaphore.available_permits(), 8);
676 }
677
678 #[tokio::test]
679 async fn test_connection_acquisition() {
680 let config = ProductionConfig {
681 max_connections: 2,
682 ..create_test_config()
683 };
684 let manager = ResourceManager::new(config);
685
686 let _guard1 = manager.acquire_connection().await.unwrap();
688 assert_eq!(manager.connection_semaphore.available_permits(), 1);
689
690 let _guard2 = manager.acquire_connection().await.unwrap();
692 assert_eq!(manager.connection_semaphore.available_permits(), 0);
693
694 drop(_guard1);
696 sleep(Duration::from_millis(1)).await; assert_eq!(manager.connection_semaphore.available_permits(), 1);
698 }
699
700 #[tokio::test]
701 async fn test_connection_acquisition_during_shutdown() {
702 let config = create_test_config();
703 let manager = ResourceManager::new(config);
704
705 manager.is_shutting_down.store(true, Ordering::SeqCst);
707
708 let result = manager.acquire_connection().await;
710 assert!(result.is_err());
711 match result {
712 Err(e) => assert!(e.to_string().contains("shutting down")),
713 Ok(_) => panic!("Expected error but got success"),
714 }
715 }
716
717 #[tokio::test]
718 async fn test_connection_guard_drop() {
719 let config = create_test_config();
720 let manager = ResourceManager::new(config);
721
722 let initial_permits = manager.connection_semaphore.available_permits();
723 {
724 let _guard = manager.acquire_connection().await.unwrap();
725 assert_eq!(manager.connection_semaphore.available_permits(), initial_permits - 1);
726 }
727 assert_eq!(manager.connection_semaphore.available_permits(), initial_permits);
729 }
730
731 #[tokio::test]
732 async fn test_rate_limiting_dht_operations() {
733 let config = ProductionConfig {
734 rate_limits: RateLimitConfig {
735 dht_ops_per_sec: 2,
736 burst_capacity: 2,
737 ..Default::default()
738 },
739 ..create_test_config()
740 };
741 let manager = ResourceManager::new(config);
742
743 assert!(manager.check_rate_limit("peer1", "dht").await.unwrap());
745 assert!(manager.check_rate_limit("peer1", "dht").await.unwrap());
746
747 assert!(!manager.check_rate_limit("peer1", "dht").await.unwrap());
749 }
750
751 #[tokio::test]
752 async fn test_rate_limiting_mcp_operations() {
753 let config = ProductionConfig {
754 rate_limits: RateLimitConfig {
755 mcp_calls_per_sec: 1,
756 burst_capacity: 1,
757 ..Default::default()
758 },
759 ..create_test_config()
760 };
761 let manager = ResourceManager::new(config);
762
763 assert!(manager.check_rate_limit("peer2", "mcp").await.unwrap());
765
766 assert!(!manager.check_rate_limit("peer2", "mcp").await.unwrap());
768 }
769
770 #[tokio::test]
771 async fn test_rate_limiting_message_operations() {
772 let config = ProductionConfig {
773 rate_limits: RateLimitConfig {
774 messages_per_sec: 3,
775 burst_capacity: 3,
776 ..Default::default()
777 },
778 ..create_test_config()
779 };
780 let manager = ResourceManager::new(config);
781
782 for _ in 0..3 {
784 assert!(manager.check_rate_limit("peer3", "message").await.unwrap());
785 }
786
787 assert!(!manager.check_rate_limit("peer3", "message").await.unwrap());
789 }
790
791 #[tokio::test]
792 async fn test_rate_limiting_unknown_operation() {
793 let config = create_test_config();
794 let manager = ResourceManager::new(config);
795
796 assert!(manager.check_rate_limit("peer4", "unknown").await.unwrap());
798 assert!(manager.check_rate_limit("peer4", "unknown").await.unwrap());
799 }
800
801 #[tokio::test]
802 async fn test_rate_limiting_different_peers() {
803 let config = ProductionConfig {
804 rate_limits: RateLimitConfig {
805 dht_ops_per_sec: 1,
806 burst_capacity: 1,
807 ..Default::default()
808 },
809 ..create_test_config()
810 };
811 let manager = ResourceManager::new(config);
812
813 assert!(manager.check_rate_limit("peer1", "dht").await.unwrap());
815 assert!(manager.check_rate_limit("peer2", "dht").await.unwrap());
816
817 assert!(!manager.check_rate_limit("peer1", "dht").await.unwrap());
819 assert!(!manager.check_rate_limit("peer2", "dht").await.unwrap());
820 }
821
822 #[tokio::test]
823 async fn test_bandwidth_tracking() {
824 let tracker = BandwidthTracker::new(Duration::from_millis(100)); tracker.record(1000, 2000);
827 let usage = tracker.current_usage();
828 assert!(usage > 0);
829
830 sleep(Duration::from_millis(150)).await; let usage_after_reset = tracker.current_usage();
833 assert_eq!(usage_after_reset, 0);
834 }
835
836 #[tokio::test]
837 async fn test_bandwidth_tracking_rate_calculation() {
838 let tracker = BandwidthTracker::new(Duration::from_secs(1));
839
840 tracker.record(500, 500); sleep(Duration::from_millis(50)).await;
845
846 let usage = tracker.current_usage();
847 assert!(usage > 10000); }
850
851 #[tokio::test]
852 async fn test_bandwidth_tracking_multiple_records() {
853 let tracker = BandwidthTracker::new(Duration::from_millis(200));
854
855 tracker.record(100, 200);
856 tracker.record(300, 400);
857 tracker.record(500, 600);
858
859 let usage = tracker.current_usage();
860 assert!(usage > 0);
861
862 let sent = tracker.bytes_sent.load(Ordering::Relaxed);
864 let received = tracker.bytes_received.load(Ordering::Relaxed);
865 assert_eq!(sent, 900); assert_eq!(received, 1200); }
868
869 #[tokio::test]
870 async fn test_manager_bandwidth_recording() {
871 let config = create_test_config();
872 let manager = ResourceManager::new(config);
873
874 manager.record_bandwidth(1000, 2000);
876
877 let usage = manager.bandwidth_tracker.current_usage();
879 assert!(usage > 0);
880 }
881
882 #[tokio::test]
883 async fn test_health_check_success() {
884 let config = ProductionConfig {
885 max_memory_bytes: 2048, max_bandwidth_bps: 10000, max_connections: 5,
888 ..create_test_config()
889 };
890 let manager = ResourceManager::new(config);
891
892 let result = manager.health_check().await;
894 assert!(result.is_ok());
895 }
896
897 #[tokio::test]
898 async fn test_health_check_memory_limit_exceeded() {
899 let config = ProductionConfig {
900 max_memory_bytes: 100, ..create_test_config()
902 };
903 let manager = ResourceManager::new(config);
904
905 {
907 let mut metrics = manager.metrics.write().await;
908 metrics.memory_used = 200; }
910
911 let result = manager.health_check().await;
913 assert!(result.is_err());
914 assert!(result.unwrap_err().to_string().contains("Memory limit exceeded"));
915 }
916
917 #[tokio::test]
918 async fn test_health_check_bandwidth_warning() {
919 let config = ProductionConfig {
920 max_bandwidth_bps: 1000,
921 ..create_test_config()
922 };
923 let manager = ResourceManager::new(config);
924
925 {
927 let mut metrics = manager.metrics.write().await;
928 metrics.bandwidth_usage = 2000; }
930
931 let result = manager.health_check().await;
933 assert!(result.is_ok());
934 }
935
936 #[tokio::test]
937 async fn test_health_check_connection_warning() {
938 let config = ProductionConfig {
939 max_connections: 2,
940 ..create_test_config()
941 };
942 let manager = ResourceManager::new(config);
943
944 let _guard1 = manager.acquire_connection().await.unwrap();
946 let _guard2 = manager.acquire_connection().await.unwrap();
947
948 {
950 let mut metrics = manager.metrics.write().await;
951 metrics.active_connections = 2;
952 }
953
954 let result = manager.health_check().await;
956 assert!(result.is_ok());
957 }
958
959 #[tokio::test]
960 async fn test_metrics_collection() {
961 let config = create_test_config();
962 let manager = ResourceManager::new(config);
963
964 manager.record_bandwidth(500, 1000);
966 let _guard = manager.acquire_connection().await.unwrap();
967
968 manager.collect_metrics().await.unwrap();
970
971 let metrics = manager.get_metrics().await;
972 assert_eq!(metrics.active_connections, 1);
973 assert!(metrics.bandwidth_usage > 0);
974 assert!(metrics.timestamp.elapsed().unwrap().as_millis() < 100); }
976
977 #[tokio::test]
978 async fn test_graceful_shutdown() {
979 let config = ProductionConfig {
980 shutdown_timeout: Duration::from_millis(100),
981 ..create_test_config()
982 };
983 let manager = ResourceManager::new(config);
984
985 manager.start().await.unwrap();
987
988 let result = manager.shutdown().await;
990 assert!(result.is_ok());
991
992 assert!(manager.is_shutting_down.load(Ordering::SeqCst));
994 }
995
996 #[tokio::test]
997 async fn test_graceful_shutdown_with_connections() {
998 let config = ProductionConfig {
999 shutdown_timeout: Duration::from_millis(200),
1000 max_connections: 2,
1001 ..create_test_config()
1002 };
1003 let manager = ResourceManager::new(config);
1004
1005 let guard = manager.acquire_connection().await.unwrap();
1007
1008 let manager_clone = manager.clone();
1010 let shutdown_task = tokio::spawn(async move {
1011 manager_clone.shutdown().await
1012 });
1013
1014 sleep(Duration::from_millis(50)).await;
1016 drop(guard);
1017
1018 let result = shutdown_task.await.unwrap();
1020 assert!(result.is_ok());
1021 }
1022
1023 #[tokio::test]
1024 async fn test_shutdown_timeout() {
1025 let config = ProductionConfig {
1026 shutdown_timeout: Duration::from_millis(50), max_connections: 1,
1028 ..create_test_config()
1029 };
1030 let manager = ResourceManager::new(config);
1031
1032 let _guard = manager.acquire_connection().await.unwrap();
1034
1035 let result = manager.shutdown().await;
1037 assert!(result.is_err());
1038 assert!(result.unwrap_err().to_string().contains("Shutdown timeout"));
1039 }
1040
1041 #[tokio::test]
1042 async fn test_start_with_disabled_features() {
1043 let config = ProductionConfig {
1044 enable_performance_tracking: false,
1045 enable_auto_cleanup: false,
1046 ..create_test_config()
1047 };
1048 let manager = ResourceManager::new(config);
1049
1050 let result = manager.start().await;
1052 assert!(result.is_ok());
1053 }
1054
1055 #[test]
1056 fn test_rate_limiter_creation() {
1057 let limiter = RateLimiter::new(10.0, 5.0); assert!(limiter.try_acquire());
1061 }
1062
1063 #[test]
1064 fn test_rate_limiter_token_exhaustion() {
1065 let limiter = RateLimiter::new(2.0, 1.0); assert!(limiter.try_acquire());
1069 assert!(limiter.try_acquire());
1070
1071 assert!(!limiter.try_acquire());
1073 }
1074
1075 #[tokio::test]
1076 async fn test_rate_limiter_refill() {
1077 let limiter = RateLimiter::new(1.0, 10.0); assert!(limiter.try_acquire());
1081 assert!(!limiter.try_acquire());
1082
1083 sleep(Duration::from_millis(200)).await; assert!(limiter.try_acquire());
1088 }
1089
1090 #[test]
1091 fn test_rate_limiter_expiration() {
1092 let limiter = RateLimiter::new(10.0, 5.0);
1093
1094 assert!(!limiter.is_expired(Instant::now()));
1096
1097 let future_time = Instant::now() + Duration::from_secs(400);
1099 assert!(limiter.is_expired(future_time));
1100 }
1101
1102 #[tokio::test]
1103 async fn test_cleanup_resources() {
1104 let config = create_test_config();
1105 let manager = ResourceManager::new(config);
1106
1107 manager.check_rate_limit("peer1", "dht").await.unwrap();
1109 manager.check_rate_limit("peer2", "mcp").await.unwrap();
1110
1111 {
1113 let limiters = manager.rate_limiters.read().await;
1114 assert_eq!(limiters.len(), 2);
1115 }
1116
1117 manager.cleanup_resources().await;
1119
1120 {
1121 let limiters = manager.rate_limiters.read().await;
1122 assert_eq!(limiters.len(), 2); }
1124 }
1125
1126 #[test]
1127 fn test_bandwidth_tracker_creation() {
1128 let tracker = BandwidthTracker::new(Duration::from_secs(1));
1129
1130 assert_eq!(tracker.current_usage(), 0);
1132 }
1133
1134 #[test]
1135 fn test_bandwidth_tracker_window_reset() {
1136 let tracker = BandwidthTracker::new(Duration::from_millis(1)); tracker.record(1000, 2000);
1139
1140 let initial_usage = tracker.current_usage();
1142 assert!(initial_usage > 0);
1143
1144 std::thread::sleep(Duration::from_millis(10));
1146 let usage_after_window = tracker.current_usage();
1147 assert_eq!(usage_after_window, 0);
1148 }
1149
1150 #[tokio::test]
1151 async fn test_resource_metrics_structure() {
1152 let config = create_test_config();
1153 let manager = ResourceManager::new(config);
1154
1155 let metrics = manager.get_metrics().await;
1156
1157 assert_eq!(metrics.memory_used, 0);
1159 assert_eq!(metrics.active_connections, 0);
1160 assert_eq!(metrics.bandwidth_usage, 0);
1161 assert_eq!(metrics.cpu_usage, 0.0);
1162
1163 assert_eq!(metrics.network_latency.sample_count, 0);
1165 assert_eq!(metrics.dht_metrics.ops_per_sec, 0.0);
1166 assert_eq!(metrics.mcp_metrics.calls_per_sec, 0.0);
1167
1168 assert!(metrics.timestamp.elapsed().unwrap().as_secs() < 1);
1170 }
1171}