1use crate::error::NetworkError;
25use crate::{P2PError, Result};
26use serde::{Deserialize, Serialize};
27use std::sync::Arc;
28use std::sync::atomic::{AtomicU64, Ordering};
29use std::time::{Duration, Instant, SystemTime};
30use tokio::sync::{RwLock, Semaphore};
31use tokio::time::interval;
32use tracing::{debug, error, info, warn};
33
34#[derive(Debug, Clone, Serialize, Deserialize)]
36pub struct ProductionConfig {
37 pub max_connections: usize,
39 pub max_memory_bytes: u64,
41 pub max_bandwidth_bps: u64,
43 pub connection_timeout: Duration,
45 pub keep_alive_interval: Duration,
47 pub health_check_interval: Duration,
49 pub metrics_interval: Duration,
51 pub enable_performance_tracking: bool,
53 pub enable_auto_cleanup: bool,
55 pub shutdown_timeout: Duration,
57 pub rate_limits: RateLimitConfig,
59}
60
61#[derive(Debug, Clone, Serialize, Deserialize)]
63pub struct RateLimitConfig {
64 pub dht_ops_per_sec: u32,
66 pub mcp_calls_per_sec: u32,
68 pub messages_per_sec: u32,
70 pub burst_capacity: u32,
72 pub window_duration: Duration,
74}
75
76#[derive(Debug, Clone, Serialize, Deserialize)]
78pub struct ResourceMetrics {
79 pub memory_used: u64,
81 pub active_connections: usize,
83 pub bandwidth_usage: u64,
85 pub cpu_usage: f64,
87 pub network_latency: LatencyStats,
89 pub dht_metrics: DHTMetrics,
91 pub mcp_metrics: MCPMetrics,
93 pub timestamp: SystemTime,
95}
96
97#[derive(Debug, Clone, Serialize, Deserialize)]
99pub struct LatencyStats {
100 pub avg_ms: f64,
102 pub min_ms: f64,
104 pub max_ms: f64,
106 pub p95_ms: f64,
108 pub sample_count: u64,
110}
111
112#[derive(Debug, Clone, Serialize, Deserialize)]
114pub struct DHTMetrics {
115 pub ops_per_sec: f64,
117 pub avg_latency_ms: f64,
119 pub success_rate: f64,
121 pub cache_hit_rate: f64,
123}
124
125#[derive(Debug, Clone, Serialize, Deserialize)]
127pub struct MCPMetrics {
128 pub calls_per_sec: f64,
130 pub avg_latency_ms: f64,
132 pub success_rate: f64,
134 pub active_services: usize,
136}
137
138pub struct ResourceManager {
140 pub config: ProductionConfig,
141 metrics: Arc<RwLock<ResourceMetrics>>,
142 connection_semaphore: Arc<Semaphore>,
143 bandwidth_tracker: Arc<BandwidthTracker>,
144 rate_limiters: Arc<RwLock<std::collections::HashMap<String, RateLimiter>>>,
145 shutdown_signal: Arc<tokio::sync::Notify>,
146 is_shutting_down: Arc<std::sync::atomic::AtomicBool>,
147}
148
149struct BandwidthTracker {
151 bytes_sent: AtomicU64,
152 bytes_received: AtomicU64,
153 last_reset: Arc<RwLock<Instant>>,
154 window_duration: Duration,
155}
156
/// Token-bucket state for a single rate-limited key.
struct RateLimiter {
    /// Tokens currently available; each granted request consumes one.
    tokens: Arc<std::sync::Mutex<f64>>,
    /// Instant of the most recent refill.
    last_refill: Arc<std::sync::Mutex<Instant>>,
    /// Upper bound on `tokens`.
    max_tokens: f64,
    /// Tokens added per elapsed second.
    refill_rate: f64,
}
164
165impl Default for ProductionConfig {
166 fn default() -> Self {
167 Self {
168 max_connections: 1000,
169 max_memory_bytes: 1024 * 1024 * 1024, max_bandwidth_bps: 100 * 1024 * 1024, connection_timeout: Duration::from_secs(30),
172 keep_alive_interval: Duration::from_secs(30),
173 health_check_interval: Duration::from_secs(60),
174 metrics_interval: Duration::from_secs(10),
175 enable_performance_tracking: true,
176 enable_auto_cleanup: true,
177 shutdown_timeout: Duration::from_secs(30),
178 rate_limits: RateLimitConfig::default(),
179 }
180 }
181}
182
183impl Default for RateLimitConfig {
184 fn default() -> Self {
185 Self {
186 dht_ops_per_sec: 100,
187 mcp_calls_per_sec: 50,
188 messages_per_sec: 200,
189 burst_capacity: 10,
190 window_duration: Duration::from_secs(1),
191 }
192 }
193}
194
195impl ResourceManager {
196 pub fn new(config: ProductionConfig) -> Self {
198 let connection_semaphore = Arc::new(Semaphore::new(config.max_connections));
199 let bandwidth_tracker = Arc::new(BandwidthTracker::new(Duration::from_secs(1)));
200
201 let initial_metrics = ResourceMetrics {
202 memory_used: 0,
203 active_connections: 0,
204 bandwidth_usage: 0,
205 cpu_usage: 0.0,
206 network_latency: LatencyStats::default(),
207 dht_metrics: DHTMetrics::default(),
208 mcp_metrics: MCPMetrics::default(),
209 timestamp: SystemTime::now(),
210 };
211
212 Self {
213 config,
214 metrics: Arc::new(RwLock::new(initial_metrics)),
215 connection_semaphore,
216 bandwidth_tracker,
217 rate_limiters: Arc::new(RwLock::new(std::collections::HashMap::new())),
218 shutdown_signal: Arc::new(tokio::sync::Notify::new()),
219 is_shutting_down: Arc::new(std::sync::atomic::AtomicBool::new(false)),
220 }
221 }
222
223 pub async fn start(&self) -> Result<()> {
225 info!("Starting production resource manager");
226
227 if self.config.enable_performance_tracking {
229 self.spawn_metrics_collector().await;
230 }
231
232 self.spawn_health_checker().await;
234
235 if self.config.enable_auto_cleanup {
237 self.spawn_cleanup_task().await;
238 }
239
240 info!("Production resource manager started successfully");
241 Ok(())
242 }
243
244 pub async fn shutdown(&self) -> Result<()> {
246 info!("Initiating graceful shutdown of resource manager");
247
248 self.is_shutting_down.store(true, Ordering::SeqCst);
249 self.shutdown_signal.notify_waiters();
250
251 tokio::time::timeout(self.config.shutdown_timeout, async {
253 while self.connection_semaphore.available_permits() < self.config.max_connections {
255 tokio::time::sleep(Duration::from_millis(100)).await;
256 }
257 })
258 .await
259 .map_err(|_| {
260 P2PError::Network(crate::error::NetworkError::ProtocolError(
261 "Shutdown timeout exceeded".to_string().into(),
262 ))
263 })?;
264
265 info!("Resource manager shutdown completed");
266 Ok(())
267 }
268
269 pub async fn acquire_connection(&self) -> Result<ConnectionGuard<'_>> {
271 if self.is_shutting_down.load(Ordering::SeqCst) {
272 return Err(P2PError::Network(
273 crate::error::NetworkError::ProtocolError(
274 "System is shutting down".to_string().into(),
275 ),
276 ));
277 }
278
279 let permit = self
280 .connection_semaphore
281 .clone()
282 .acquire_owned()
283 .await
284 .map_err(|_| {
285 P2PError::Network(crate::error::NetworkError::ProtocolError(
286 "Connection semaphore closed".to_string().into(),
287 ))
288 })?;
289
290 debug!(
291 "Connection acquired, {} remaining",
292 self.connection_semaphore.available_permits()
293 );
294 Ok(ConnectionGuard {
295 permit,
296 _manager: self,
297 })
298 }
299
300 pub async fn check_rate_limit(&self, peer_id: &str, operation: &str) -> Result<bool> {
302 let limit = match operation {
303 "dht" => self.config.rate_limits.dht_ops_per_sec,
304 "mcp" => self.config.rate_limits.mcp_calls_per_sec,
305 "message" => self.config.rate_limits.messages_per_sec,
306 _ => return Ok(true), };
308
309 let mut limiters = self.rate_limiters.write().await;
310 let limiter = limiters.entry(peer_id.to_string()).or_insert_with(|| {
311 RateLimiter::new(limit as f64, self.config.rate_limits.burst_capacity as f64)
312 });
313
314 limiter.try_acquire()
315 }
316
317 pub fn record_bandwidth(&self, bytes_sent: u64, bytes_received: u64) {
319 self.bandwidth_tracker.record(bytes_sent, bytes_received);
320 }
321
322 pub async fn get_metrics(&self) -> ResourceMetrics {
324 self.metrics.read().await.clone()
325 }
326
327 pub async fn health_check(&self) -> Result<()> {
329 let metrics = self.get_metrics().await;
330
331 if self.config.max_memory_bytes > 0 && metrics.memory_used > self.config.max_memory_bytes {
333 warn!(
334 "Memory usage ({} bytes) exceeds limit ({} bytes)",
335 metrics.memory_used, self.config.max_memory_bytes
336 );
337 return Err(P2PError::Network(
338 crate::error::NetworkError::ProtocolError(
339 "Memory limit exceeded".to_string().into(),
340 ),
341 ));
342 }
343
344 if metrics.bandwidth_usage > self.config.max_bandwidth_bps {
346 warn!(
347 "Bandwidth usage ({} bps) exceeds limit ({} bps)",
348 metrics.bandwidth_usage, self.config.max_bandwidth_bps
349 );
350 }
351
352 if metrics.active_connections >= self.config.max_connections {
354 warn!(
355 "Connection count ({}) at maximum ({})",
356 metrics.active_connections, self.config.max_connections
357 );
358 }
359
360 debug!(
361 "Health check passed: {} connections, {} bytes memory, {} bps bandwidth",
362 metrics.active_connections, metrics.memory_used, metrics.bandwidth_usage
363 );
364
365 Ok(())
366 }
367
368 async fn spawn_metrics_collector(&self) {
370 let manager = self.clone();
371 tokio::spawn(async move {
372 let mut interval = interval(manager.config.metrics_interval);
373 loop {
374 tokio::select! {
375 _ = interval.tick() => {
376 if let Err(e) = manager.collect_metrics().await {
377 error!("Failed to collect metrics: {}", e);
378 }
379 }
380 _ = manager.shutdown_signal.notified() => {
381 debug!("Metrics collector shutting down");
382 break;
383 }
384 }
385 }
386 });
387 }
388
389 async fn spawn_health_checker(&self) {
391 let manager = self.clone();
392 tokio::spawn(async move {
393 let mut interval = interval(manager.config.health_check_interval);
394 loop {
395 tokio::select! {
396 _ = interval.tick() => {
397 if let Err(e) = manager.health_check().await {
398 error!("Health check failed: {}", e);
399 }
400 }
401 _ = manager.shutdown_signal.notified() => {
402 debug!("Health checker shutting down");
403 break;
404 }
405 }
406 }
407 });
408 }
409
410 async fn spawn_cleanup_task(&self) {
412 let manager = self.clone();
413 tokio::spawn(async move {
414 let mut interval = interval(Duration::from_secs(300)); loop {
416 tokio::select! {
417 _ = interval.tick() => {
418 manager.cleanup_resources().await;
419 }
420 _ = manager.shutdown_signal.notified() => {
421 debug!("Cleanup task shutting down");
422 break;
423 }
424 }
425 }
426 });
427 }
428
429 async fn collect_metrics(&self) -> Result<()> {
431 let mut metrics = self.metrics.write().await;
432
433 metrics.bandwidth_usage = self.bandwidth_tracker.current_usage();
435
436 metrics.active_connections =
438 self.config.max_connections - self.connection_semaphore.available_permits();
439
440 metrics.timestamp = SystemTime::now();
442
443 debug!(
444 "Metrics updated: {} connections, {} bps bandwidth",
445 metrics.active_connections, metrics.bandwidth_usage
446 );
447
448 Ok(())
449 }
450
451 async fn cleanup_resources(&self) {
453 debug!("Starting resource cleanup");
454
455 let mut limiters = self.rate_limiters.write().await;
457 let now = Instant::now();
458 limiters.retain(|_, limiter| {
459 match limiter.is_expired(now) {
461 Ok(expired) => !expired,
462 Err(_) => false,
463 }
464 });
465
466 debug!(
467 "Cleanup completed, {} rate limiters remaining",
468 limiters.len()
469 );
470 }
471}
472
473impl Clone for ResourceManager {
475 fn clone(&self) -> Self {
476 Self {
477 config: self.config.clone(),
478 metrics: self.metrics.clone(),
479 connection_semaphore: self.connection_semaphore.clone(),
480 bandwidth_tracker: self.bandwidth_tracker.clone(),
481 rate_limiters: self.rate_limiters.clone(),
482 shutdown_signal: self.shutdown_signal.clone(),
483 is_shutting_down: self.is_shutting_down.clone(),
484 }
485 }
486}
487
488pub struct ConnectionGuard<'a> {
490 #[allow(dead_code)]
491 permit: tokio::sync::OwnedSemaphorePermit,
492 _manager: &'a ResourceManager,
493}
494
495impl<'a> Drop for ConnectionGuard<'a> {
496 fn drop(&mut self) {
497 debug!("Connection released");
498 }
499}
500
501impl BandwidthTracker {
502 fn new(window_duration: Duration) -> Self {
503 Self {
504 bytes_sent: AtomicU64::new(0),
505 bytes_received: AtomicU64::new(0),
506 last_reset: Arc::new(RwLock::new(Instant::now())),
507 window_duration,
508 }
509 }
510
511 fn record(&self, bytes_sent: u64, bytes_received: u64) {
512 self.bytes_sent.fetch_add(bytes_sent, Ordering::Relaxed);
513 self.bytes_received
514 .fetch_add(bytes_received, Ordering::Relaxed);
515 }
516
517 fn current_usage(&self) -> u64 {
518 let now = Instant::now();
519
520 let last_reset = {
522 if let Ok(guard) = self.last_reset.try_read() {
523 *guard
524 } else {
525 let sent = self.bytes_sent.load(Ordering::Relaxed);
527 let received = self.bytes_received.load(Ordering::Relaxed);
528 return sent + received; }
530 };
531
532 if now.duration_since(last_reset) >= self.window_duration {
533 if let Ok(mut guard) = self.last_reset.try_write() {
535 self.bytes_sent.store(0, Ordering::Relaxed);
536 self.bytes_received.store(0, Ordering::Relaxed);
537 *guard = now;
538 return 0;
539 }
540 }
541
542 let sent = self.bytes_sent.load(Ordering::Relaxed);
543 let received = self.bytes_received.load(Ordering::Relaxed);
544
545 let elapsed_secs = now.duration_since(last_reset).as_secs_f64();
547 if elapsed_secs > 0.0 {
548 ((sent + received) as f64 / elapsed_secs) as u64
549 } else {
550 0
551 }
552 }
553}
554
555impl RateLimiter {
556 fn new(max_tokens: f64, refill_rate: f64) -> Self {
557 Self {
558 tokens: Arc::new(std::sync::Mutex::new(max_tokens)),
559 last_refill: Arc::new(std::sync::Mutex::new(Instant::now())),
560 max_tokens,
561 refill_rate,
562 }
563 }
564
565 fn try_acquire(&self) -> Result<bool> {
566 let now = Instant::now();
567
568 {
570 let mut last_refill = self.last_refill.lock().map_err(|_| {
571 P2PError::Network(NetworkError::ProtocolError(
572 "mutex lock failed".to_string().into(),
573 ))
574 })?;
575 let elapsed = now.duration_since(*last_refill).as_secs_f64();
576
577 if elapsed > 0.0 {
578 let mut tokens = self.tokens.lock().map_err(|_| {
579 P2PError::Network(NetworkError::ProtocolError(
580 "mutex lock failed".to_string().into(),
581 ))
582 })?;
583 *tokens = (*tokens + elapsed * self.refill_rate).min(self.max_tokens);
584 *last_refill = now;
585 }
586 }
587
588 let mut tokens = self.tokens.lock().map_err(|_| {
590 P2PError::Network(NetworkError::ProtocolError(
591 "mutex lock failed".to_string().into(),
592 ))
593 })?;
594 if *tokens >= 1.0 {
595 *tokens -= 1.0;
596 Ok(true)
597 } else {
598 Ok(false)
599 }
600 }
601
602 fn is_expired(&self, now: Instant) -> Result<bool> {
603 let last_refill = *self.last_refill.lock().map_err(|_| {
604 P2PError::Network(NetworkError::ProtocolError(
605 "mutex lock failed".to_string().into(),
606 ))
607 })?;
608 Ok(now.duration_since(last_refill) > Duration::from_secs(300)) }
610}
611
612impl Default for LatencyStats {
613 fn default() -> Self {
614 Self {
615 avg_ms: 0.0,
616 min_ms: 0.0,
617 max_ms: 0.0,
618 p95_ms: 0.0,
619 sample_count: 0,
620 }
621 }
622}
623
624impl Default for DHTMetrics {
625 fn default() -> Self {
626 Self {
627 ops_per_sec: 0.0,
628 avg_latency_ms: 0.0,
629 success_rate: 1.0,
630 cache_hit_rate: 0.0,
631 }
632 }
633}
634
635impl Default for MCPMetrics {
636 fn default() -> Self {
637 Self {
638 calls_per_sec: 0.0,
639 avg_latency_ms: 0.0,
640 success_rate: 1.0,
641 active_services: 0,
642 }
643 }
644}
645
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::time::sleep;

    /// Configuration with small limits and short intervals so the tests
    /// finish quickly.
    fn create_test_config() -> ProductionConfig {
        let fifty_ms = Duration::from_millis(50);
        ProductionConfig {
            max_connections: 10,
            max_memory_bytes: 1024 * 1024,
            max_bandwidth_bps: 1024 * 1024,
            connection_timeout: Duration::from_millis(100),
            keep_alive_interval: fifty_ms,
            health_check_interval: fifty_ms,
            metrics_interval: fifty_ms,
            enable_performance_tracking: true,
            enable_auto_cleanup: true,
            shutdown_timeout: Duration::from_millis(200),
            rate_limits: RateLimitConfig {
                dht_ops_per_sec: 5,
                mcp_calls_per_sec: 3,
                messages_per_sec: 10,
                burst_capacity: 5,
                window_duration: Duration::from_millis(100),
            },
        }
    }

    // ---- defaults ----

    #[test]
    fn test_production_config_default() {
        let defaults = ProductionConfig::default();
        assert_eq!(defaults.max_connections, 1000);
        assert_eq!(defaults.max_memory_bytes, 1024 * 1024 * 1024);
        assert_eq!(defaults.max_bandwidth_bps, 100 * 1024 * 1024);
        assert_eq!(defaults.connection_timeout, Duration::from_secs(30));
        assert_eq!(defaults.keep_alive_interval, Duration::from_secs(30));
        assert_eq!(defaults.health_check_interval, Duration::from_secs(60));
        assert_eq!(defaults.metrics_interval, Duration::from_secs(10));
        assert!(defaults.enable_performance_tracking);
        assert!(defaults.enable_auto_cleanup);
        assert_eq!(defaults.shutdown_timeout, Duration::from_secs(30));
    }

    #[test]
    fn test_rate_limit_config_default() {
        let defaults = RateLimitConfig::default();
        assert_eq!(defaults.dht_ops_per_sec, 100);
        assert_eq!(defaults.mcp_calls_per_sec, 50);
        assert_eq!(defaults.messages_per_sec, 200);
        assert_eq!(defaults.burst_capacity, 10);
        assert_eq!(defaults.window_duration, Duration::from_secs(1));
    }

    #[test]
    fn test_latency_stats_default() {
        let defaults = LatencyStats::default();
        assert_eq!(defaults.avg_ms, 0.0);
        assert_eq!(defaults.min_ms, 0.0);
        assert_eq!(defaults.max_ms, 0.0);
        assert_eq!(defaults.p95_ms, 0.0);
        assert_eq!(defaults.sample_count, 0);
    }

    #[test]
    fn test_dht_metrics_default() {
        let defaults = DHTMetrics::default();
        assert_eq!(defaults.ops_per_sec, 0.0);
        assert_eq!(defaults.avg_latency_ms, 0.0);
        assert_eq!(defaults.success_rate, 1.0);
        assert_eq!(defaults.cache_hit_rate, 0.0);
    }

    #[test]
    fn test_mcp_metrics_default() {
        let defaults = MCPMetrics::default();
        assert_eq!(defaults.calls_per_sec, 0.0);
        assert_eq!(defaults.avg_latency_ms, 0.0);
        assert_eq!(defaults.success_rate, 1.0);
        assert_eq!(defaults.active_services, 0);
    }

    // ---- construction and cloning ----

    #[tokio::test]
    async fn test_resource_manager_creation() {
        let config = create_test_config();
        let manager = ResourceManager::new(config.clone());

        let snapshot = manager.get_metrics().await;
        assert_eq!(snapshot.active_connections, 0);
        assert_eq!(snapshot.bandwidth_usage, 0);
        assert_eq!(snapshot.memory_used, 0);
        assert_eq!(snapshot.cpu_usage, 0.0);
        assert_eq!(snapshot.network_latency.sample_count, 0);
        assert_eq!(snapshot.dht_metrics.success_rate, 1.0);
        assert_eq!(snapshot.mcp_metrics.success_rate, 1.0);
    }

    #[tokio::test]
    async fn test_resource_manager_cloning() -> Result<()> {
        let manager = ResourceManager::new(create_test_config());
        let cloned = manager.clone();

        // One permit taken through each handle.
        let _from_original = manager.acquire_connection().await?;
        let _from_clone = cloned.acquire_connection().await?;

        // Both handles see the same semaphore: 10 - 2 permits left.
        assert_eq!(manager.connection_semaphore.available_permits(), 8);
        assert_eq!(cloned.connection_semaphore.available_permits(), 8);

        Ok(())
    }

    // ---- connection permits ----

    #[tokio::test]
    async fn test_connection_acquisition() -> Result<()> {
        let manager = ResourceManager::new(ProductionConfig {
            max_connections: 2,
            ..create_test_config()
        });

        let first = manager.acquire_connection().await?;
        assert_eq!(manager.connection_semaphore.available_permits(), 1);

        let _second = manager.acquire_connection().await?;
        assert_eq!(manager.connection_semaphore.available_permits(), 0);

        // Releasing one guard frees exactly one permit.
        drop(first);
        sleep(Duration::from_millis(1)).await;
        assert_eq!(manager.connection_semaphore.available_permits(), 1);

        Ok(())
    }

    #[tokio::test]
    async fn test_connection_acquisition_during_shutdown() {
        let manager = ResourceManager::new(create_test_config());

        // Simulate a shutdown in progress.
        manager.is_shutting_down.store(true, Ordering::SeqCst);

        let outcome = manager.acquire_connection().await;
        assert!(outcome.is_err());
        match outcome {
            Ok(_) => panic!("Expected error but got success"),
            Err(e) => assert!(e.to_string().contains("shutting down")),
        }
    }

    #[tokio::test]
    async fn test_connection_guard_drop() -> Result<()> {
        let manager = ResourceManager::new(create_test_config());
        let permits_before = manager.connection_semaphore.available_permits();

        {
            let _guard = manager.acquire_connection().await?;
            assert_eq!(
                manager.connection_semaphore.available_permits(),
                permits_before - 1
            );
        }

        // The guard left scope, so its permit is back.
        assert_eq!(
            manager.connection_semaphore.available_permits(),
            permits_before
        );

        Ok(())
    }

    // ---- rate limiting through the manager ----

    #[tokio::test]
    async fn test_rate_limiting_dht_operations() -> Result<()> {
        let manager = ResourceManager::new(ProductionConfig {
            rate_limits: RateLimitConfig {
                dht_ops_per_sec: 2,
                burst_capacity: 2,
                ..Default::default()
            },
            ..create_test_config()
        });

        // Two requests pass, the third is rejected.
        assert!(manager.check_rate_limit("peer1", "dht").await?);
        assert!(manager.check_rate_limit("peer1", "dht").await?);
        assert!(!manager.check_rate_limit("peer1", "dht").await?);

        Ok(())
    }

    #[tokio::test]
    async fn test_rate_limiting_mcp_operations() -> Result<()> {
        let manager = ResourceManager::new(ProductionConfig {
            rate_limits: RateLimitConfig {
                mcp_calls_per_sec: 1,
                burst_capacity: 1,
                ..Default::default()
            },
            ..create_test_config()
        });

        assert!(manager.check_rate_limit("peer2", "mcp").await?);
        assert!(!manager.check_rate_limit("peer2", "mcp").await?);
        Ok(())
    }

    #[tokio::test]
    async fn test_rate_limiting_message_operations() -> Result<()> {
        let manager = ResourceManager::new(ProductionConfig {
            rate_limits: RateLimitConfig {
                messages_per_sec: 3,
                burst_capacity: 3,
                ..Default::default()
            },
            ..create_test_config()
        });

        for _ in 0..3 {
            assert!(manager.check_rate_limit("peer3", "message").await?);
        }
        assert!(!manager.check_rate_limit("peer3", "message").await?);

        Ok(())
    }

    #[tokio::test]
    async fn test_rate_limiting_unknown_operation() -> Result<()> {
        let manager = ResourceManager::new(create_test_config());

        // Operations without a configured limit are always allowed.
        assert!(manager.check_rate_limit("peer4", "unknown").await?);
        assert!(manager.check_rate_limit("peer4", "unknown").await?);

        Ok(())
    }

    #[tokio::test]
    async fn test_rate_limiting_different_peers() -> Result<()> {
        let manager = ResourceManager::new(ProductionConfig {
            rate_limits: RateLimitConfig {
                dht_ops_per_sec: 1,
                burst_capacity: 1,
                ..Default::default()
            },
            ..create_test_config()
        });

        // Each peer has its own budget.
        assert!(manager.check_rate_limit("peer1", "dht").await?);
        assert!(manager.check_rate_limit("peer2", "dht").await?);

        assert!(!manager.check_rate_limit("peer1", "dht").await?);
        assert!(!manager.check_rate_limit("peer2", "dht").await?);

        Ok(())
    }

    // ---- bandwidth tracking ----

    #[tokio::test]
    async fn test_bandwidth_tracking() {
        let tracker = BandwidthTracker::new(Duration::from_millis(100));
        tracker.record(1000, 2000);
        assert!(tracker.current_usage() > 0);

        // After the window has passed, the next query resets the counters.
        sleep(Duration::from_millis(150)).await;
        assert_eq!(tracker.current_usage(), 0);
    }

    #[tokio::test]
    async fn test_bandwidth_tracking_rate_calculation() {
        let tracker = BandwidthTracker::new(Duration::from_secs(1));

        tracker.record(500, 500);
        sleep(Duration::from_millis(50)).await;

        let rate = tracker.current_usage();
        assert!(rate > 10000);
    }

    #[tokio::test]
    async fn test_bandwidth_tracking_multiple_records() {
        let tracker = BandwidthTracker::new(Duration::from_millis(200));

        for &(sent, received) in &[(100, 200), (300, 400), (500, 600)] {
            tracker.record(sent, received);
        }

        assert!(tracker.current_usage() > 0);

        // The raw counters hold the sums of everything recorded.
        assert_eq!(tracker.bytes_sent.load(Ordering::Relaxed), 900);
        assert_eq!(tracker.bytes_received.load(Ordering::Relaxed), 1200);
    }

    #[tokio::test]
    async fn test_manager_bandwidth_recording() {
        let manager = ResourceManager::new(create_test_config());

        manager.record_bandwidth(1000, 2000);

        assert!(manager.bandwidth_tracker.current_usage() > 0);
    }

    // ---- health checks ----

    #[tokio::test]
    async fn test_health_check_success() {
        let manager = ResourceManager::new(ProductionConfig {
            max_memory_bytes: 2048,
            max_bandwidth_bps: 10000,
            max_connections: 5,
            ..create_test_config()
        });

        assert!(manager.health_check().await.is_ok());
    }

    #[tokio::test]
    async fn test_health_check_memory_limit_exceeded() {
        let manager = ResourceManager::new(ProductionConfig {
            max_memory_bytes: 100,
            ..create_test_config()
        });

        // Force the recorded memory use above the limit.
        manager.metrics.write().await.memory_used = 200;

        let outcome = manager.health_check().await;
        assert!(outcome.is_err());
        assert!(
            outcome
                .unwrap_err()
                .to_string()
                .contains("Memory limit exceeded")
        );
    }

    #[tokio::test]
    async fn test_health_check_bandwidth_warning() {
        let manager = ResourceManager::new(ProductionConfig {
            max_bandwidth_bps: 1000,
            ..create_test_config()
        });

        manager.metrics.write().await.bandwidth_usage = 2000;

        // Exceeding the bandwidth limit only warns; the check still passes.
        assert!(manager.health_check().await.is_ok());
    }

    #[tokio::test]
    async fn test_health_check_connection_warning() -> Result<()> {
        let manager = ResourceManager::new(ProductionConfig {
            max_connections: 2,
            ..create_test_config()
        });

        let _first = manager.acquire_connection().await?;
        let _second = manager.acquire_connection().await?;

        manager.metrics.write().await.active_connections = 2;

        // Being at the connection cap only warns; the check still passes.
        assert!(manager.health_check().await.is_ok());
        Ok(())
    }

    // ---- metrics collection ----

    #[tokio::test]
    async fn test_metrics_collection() -> Result<()> {
        let manager = ResourceManager::new(create_test_config());

        manager.record_bandwidth(500, 1000);
        let _guard = manager.acquire_connection().await?;

        manager.collect_metrics().await?;

        let snapshot = manager.get_metrics().await;
        assert_eq!(snapshot.active_connections, 1);
        assert!(snapshot.bandwidth_usage > 0);
        assert!(snapshot.timestamp.elapsed().unwrap().as_millis() < 100);
        Ok(())
    }

    // ---- startup and shutdown ----

    #[tokio::test]
    async fn test_graceful_shutdown() -> Result<()> {
        let manager = ResourceManager::new(ProductionConfig {
            shutdown_timeout: Duration::from_millis(100),
            ..create_test_config()
        });

        manager.start().await?;

        assert!(manager.shutdown().await.is_ok());
        assert!(manager.is_shutting_down.load(Ordering::SeqCst));

        Ok(())
    }

    #[tokio::test]
    async fn test_graceful_shutdown_with_connections() -> Result<()> {
        let manager = ResourceManager::new(ProductionConfig {
            shutdown_timeout: Duration::from_millis(200),
            max_connections: 2,
            ..create_test_config()
        });

        let guard = manager.acquire_connection().await?;

        // Shut down in the background while the connection is still open.
        let background = manager.clone();
        let shutdown_task = tokio::spawn(async move { background.shutdown().await });

        // Release the connection before the timeout expires.
        sleep(Duration::from_millis(50)).await;
        drop(guard);

        let outcome = shutdown_task.await.expect("Task panicked");
        assert!(outcome.is_ok());
        Ok(())
    }

    #[tokio::test]
    async fn test_shutdown_timeout() -> Result<()> {
        let manager = ResourceManager::new(ProductionConfig {
            shutdown_timeout: Duration::from_millis(50),
            max_connections: 1,
            ..create_test_config()
        });

        // Holding the only permit makes shutdown time out.
        let _guard = manager.acquire_connection().await?;

        let outcome = manager.shutdown().await;
        assert!(outcome.is_err());
        assert!(outcome.unwrap_err().to_string().contains("Shutdown timeout"));
        Ok(())
    }

    #[tokio::test]
    async fn test_start_with_disabled_features() {
        let manager = ResourceManager::new(ProductionConfig {
            enable_performance_tracking: false,
            enable_auto_cleanup: false,
            ..create_test_config()
        });

        assert!(manager.start().await.is_ok());
    }

    // ---- RateLimiter in isolation ----

    #[test]
    fn test_rate_limiter_creation() {
        let limiter = RateLimiter::new(10.0, 5.0);
        assert!(limiter.try_acquire().expect("Should succeed in test"));
    }

    #[test]
    fn test_rate_limiter_token_exhaustion() {
        let limiter = RateLimiter::new(2.0, 1.0);

        assert!(limiter.try_acquire().expect("Should succeed in test"));
        assert!(limiter.try_acquire().expect("Should succeed in test"));

        // The bucket is empty now.
        assert!(!limiter.try_acquire().expect("Should succeed in test"));
    }

    #[tokio::test]
    async fn test_rate_limiter_refill() {
        let limiter = RateLimiter::new(1.0, 10.0);

        assert!(limiter.try_acquire().expect("Should succeed in test"));
        assert!(!limiter.try_acquire().expect("Should succeed in test"));

        // Wait long enough for at least one token to be refilled.
        sleep(Duration::from_millis(200)).await;
        assert!(limiter.try_acquire().expect("Should succeed in test"));
    }

    #[test]
    fn test_rate_limiter_expiration() {
        let limiter = RateLimiter::new(10.0, 5.0);

        let fresh = limiter
            .is_expired(Instant::now())
            .expect("Should succeed in test");
        assert!(!fresh);

        // 400 s ahead is beyond the 300 s expiry threshold.
        let future_time = Instant::now() + Duration::from_secs(400);
        let stale = limiter
            .is_expired(future_time)
            .expect("Should succeed in test");
        assert!(stale);
    }

    #[tokio::test]
    async fn test_cleanup_resources() -> Result<()> {
        let manager = ResourceManager::new(create_test_config());

        manager.check_rate_limit("peer1", "dht").await?;
        manager.check_rate_limit("peer2", "mcp").await?;

        assert_eq!(manager.rate_limiters.read().await.len(), 2);

        manager.cleanup_resources().await;

        // Both limiters were just used, so neither has expired.
        assert_eq!(manager.rate_limiters.read().await.len(), 2);
        Ok(())
    }

    // ---- BandwidthTracker in isolation ----

    #[test]
    fn test_bandwidth_tracker_creation() {
        let tracker = BandwidthTracker::new(Duration::from_secs(1));

        assert_eq!(tracker.current_usage(), 0);
    }

    #[test]
    fn test_bandwidth_tracker_window_reset() {
        let tracker = BandwidthTracker::new(Duration::from_millis(1));
        tracker.record(1000, 2000);

        assert!(tracker.current_usage() > 0);

        // Well past the 1 ms window: the next query resets to zero.
        std::thread::sleep(Duration::from_millis(10));
        assert_eq!(tracker.current_usage(), 0);
    }

    #[tokio::test]
    async fn test_resource_metrics_structure() {
        let manager = ResourceManager::new(create_test_config());

        let snapshot = manager.get_metrics().await;

        assert_eq!(snapshot.memory_used, 0);
        assert_eq!(snapshot.active_connections, 0);
        assert_eq!(snapshot.bandwidth_usage, 0);
        assert_eq!(snapshot.cpu_usage, 0.0);

        assert_eq!(snapshot.network_latency.sample_count, 0);
        assert_eq!(snapshot.dht_metrics.ops_per_sec, 0.0);
        assert_eq!(snapshot.mcp_metrics.calls_per_sec, 0.0);

        assert!(snapshot.timestamp.elapsed().unwrap().as_secs() < 1);
    }
}