use crate::{P2PError, Result};
use serde::{Deserialize, Serialize};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};
use tokio::sync::{RwLock, Semaphore};
use tokio::time::interval;
use tracing::{debug, info, warn, error};
/// Tunable limits, intervals and feature flags for running the node in
/// production. Constructed directly or via `Default`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProductionConfig {
/// Maximum concurrent connections; sizes the connection permit semaphore.
pub max_connections: usize,
/// Memory ceiling in bytes; exceeding it makes `health_check` return an error.
pub max_memory_bytes: u64,
/// Bandwidth ceiling; exceeding it only logs a warning in `health_check`.
pub max_bandwidth_bps: u64,
/// Per-connection timeout. NOTE(review): not read anywhere in this file —
/// presumably consumed by connection-handling code elsewhere; confirm.
pub connection_timeout: Duration,
/// Keep-alive period. NOTE(review): also unused within this file — confirm.
pub keep_alive_interval: Duration,
/// How often the background health-check task runs.
pub health_check_interval: Duration,
/// How often the background metrics-collection task runs.
pub metrics_interval: Duration,
/// When true, `start` spawns the metrics collector task.
pub enable_performance_tracking: bool,
/// When true, `start` spawns the periodic rate-limiter cleanup task.
pub enable_auto_cleanup: bool,
/// Upper bound on how long `shutdown` waits for connections to drain.
pub shutdown_timeout: Duration,
/// Per-peer rate limiting configuration.
pub rate_limits: RateLimitConfig,
}
/// Per-operation rate limits applied to each peer by `check_rate_limit`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RateLimitConfig {
/// Allowed DHT operations per second per peer.
pub dht_ops_per_sec: u32,
/// Allowed MCP calls per second per peer.
pub mcp_calls_per_sec: u32,
/// Allowed messages per second per peer.
pub messages_per_sec: u32,
/// Token-bucket burst allowance.
pub burst_capacity: u32,
/// Measurement window. NOTE(review): not read anywhere in this file —
/// the limiter refill math uses wall-clock elapsed time instead; confirm.
pub window_duration: Duration,
}
/// Point-in-time snapshot of resource usage, returned by
/// `ResourceManager::get_metrics` and refreshed by the collector task.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceMetrics {
/// Bytes of memory in use. NOTE(review): never written by the collector in
/// this file — confirm something else populates it.
pub memory_used: u64,
/// Connections currently held (max_connections minus available permits).
pub active_connections: usize,
/// Throughput estimate from `BandwidthTracker::current_usage` — computed as
/// bytes per second (despite the "bps" naming elsewhere).
pub bandwidth_usage: u64,
/// CPU usage fraction. NOTE(review): never updated in this file — confirm.
pub cpu_usage: f64,
/// Network latency statistics.
pub network_latency: LatencyStats,
/// DHT subsystem metrics.
pub dht_metrics: DHTMetrics,
/// MCP subsystem metrics.
pub mcp_metrics: MCPMetrics,
/// When this snapshot was taken.
pub timestamp: SystemTime,
}
/// Aggregated latency statistics in milliseconds.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LatencyStats {
/// Mean latency.
pub avg_ms: f64,
/// Minimum observed latency.
pub min_ms: f64,
/// Maximum observed latency.
pub max_ms: f64,
/// 95th-percentile latency.
pub p95_ms: f64,
/// Number of samples the statistics are based on.
pub sample_count: u64,
}
/// Performance counters for the DHT subsystem.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DHTMetrics {
/// Operations completed per second.
pub ops_per_sec: f64,
/// Mean operation latency in milliseconds.
pub avg_latency_ms: f64,
/// Fraction of operations that succeeded (1.0 = all).
pub success_rate: f64,
/// Fraction of lookups served from cache.
pub cache_hit_rate: f64,
}
/// Performance counters for the MCP subsystem.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MCPMetrics {
/// Calls completed per second.
pub calls_per_sec: f64,
/// Mean call latency in milliseconds.
pub avg_latency_ms: f64,
/// Fraction of calls that succeeded (1.0 = all).
pub success_rate: f64,
/// Number of services currently registered/active.
pub active_services: usize,
}
/// Central coordinator for production resource limits: connection permits,
/// bandwidth accounting, per-peer rate limiting, periodic metrics/health
/// tasks and graceful shutdown. Cheap to clone — all state is shared via
/// `Arc` (see the manual `Clone` impl).
pub struct ResourceManager {
// Limits and intervals; deep-copied on clone.
config: ProductionConfig,
// Latest metrics snapshot, shared with the collector task.
metrics: Arc<RwLock<ResourceMetrics>>,
// Bounds concurrent connections; permits returned when guards drop.
connection_semaphore: Arc<Semaphore>,
// Accumulates sent/received byte counts for throughput estimates.
bandwidth_tracker: Arc<BandwidthTracker>,
// Token-bucket limiters, keyed per peer.
rate_limiters: Arc<RwLock<std::collections::HashMap<String, RateLimiter>>>,
// Wakes background tasks so they can exit during shutdown.
shutdown_signal: Arc<tokio::sync::Notify>,
// Set once shutdown starts; new connection requests are then refused.
is_shutting_down: Arc<std::sync::atomic::AtomicBool>,
}
/// Windowed byte counter used to estimate throughput in bytes per second.
struct BandwidthTracker {
// Total bytes sent since the last window reset.
bytes_sent: AtomicU64,
// Total bytes received since the last window reset.
bytes_received: AtomicU64,
// Start of the current averaging window.
last_reset: Arc<RwLock<Instant>>,
// Length of the averaging window before counters are zeroed.
window_duration: Duration,
}
/// Token-bucket rate limiter.
///
/// NOTE(review): `tokens` and `last_refill` are guarded by two separate
/// mutexes; `try_acquire` locks them in sequence, so interleavings between
/// the refill and the take are possible. A single mutex over both values
/// would be simpler — left as-is since the fields are public to this file's
/// impl block.
struct RateLimiter {
// Fractional tokens currently available.
tokens: Arc<std::sync::Mutex<f64>>,
// Last time tokens were replenished.
last_refill: Arc<std::sync::Mutex<Instant>>,
// Bucket capacity (upper bound on accumulated tokens).
max_tokens: f64,
// Tokens added per second of elapsed time.
refill_rate: f64, }
impl Default for ProductionConfig {
fn default() -> Self {
Self {
max_connections: 1000,
max_memory_bytes: 1024 * 1024 * 1024, max_bandwidth_bps: 100 * 1024 * 1024, connection_timeout: Duration::from_secs(30),
keep_alive_interval: Duration::from_secs(30),
health_check_interval: Duration::from_secs(60),
metrics_interval: Duration::from_secs(10),
enable_performance_tracking: true,
enable_auto_cleanup: true,
shutdown_timeout: Duration::from_secs(30),
rate_limits: RateLimitConfig::default(),
}
}
}
impl Default for RateLimitConfig {
fn default() -> Self {
Self {
dht_ops_per_sec: 100,
mcp_calls_per_sec: 50,
messages_per_sec: 200,
burst_capacity: 10,
window_duration: Duration::from_secs(1),
}
}
}
impl ResourceManager {
    /// Builds a manager for the given production `config` with all permits
    /// available and zeroed metrics.
    pub fn new(config: ProductionConfig) -> Self {
        let connection_semaphore = Arc::new(Semaphore::new(config.max_connections));
        // NOTE(review): the bandwidth averaging window is hard-coded to 1 s
        // rather than derived from config — confirm this is intentional.
        let bandwidth_tracker = Arc::new(BandwidthTracker::new(Duration::from_secs(1)));
        let initial_metrics = ResourceMetrics {
            memory_used: 0,
            active_connections: 0,
            bandwidth_usage: 0,
            cpu_usage: 0.0,
            network_latency: LatencyStats::default(),
            dht_metrics: DHTMetrics::default(),
            mcp_metrics: MCPMetrics::default(),
            timestamp: SystemTime::now(),
        };
        Self {
            config,
            metrics: Arc::new(RwLock::new(initial_metrics)),
            connection_semaphore,
            bandwidth_tracker,
            rate_limiters: Arc::new(RwLock::new(std::collections::HashMap::new())),
            shutdown_signal: Arc::new(tokio::sync::Notify::new()),
            is_shutting_down: Arc::new(std::sync::atomic::AtomicBool::new(false)),
        }
    }
    /// Starts the background tasks according to the configured feature
    /// flags: metrics collection (optional), health checking (always), and
    /// rate-limiter cleanup (optional).
    pub async fn start(&self) -> Result<()> {
        info!("Starting production resource manager");
        if self.config.enable_performance_tracking {
            self.spawn_metrics_collector().await;
        }
        self.spawn_health_checker().await;
        if self.config.enable_auto_cleanup {
            self.spawn_cleanup_task().await;
        }
        info!("Production resource manager started successfully");
        Ok(())
    }
    /// Initiates graceful shutdown: marks the manager as shutting down,
    /// wakes every background task, then waits for all connection permits
    /// to be returned.
    ///
    /// # Errors
    /// Returns a network error if connections are still outstanding after
    /// `config.shutdown_timeout`.
    pub async fn shutdown(&self) -> Result<()> {
        info!("Initiating graceful shutdown of resource manager");
        self.is_shutting_down.store(true, Ordering::SeqCst);
        self.shutdown_signal.notify_waiters();
        // Poll until every permit is back; bounded by the shutdown timeout.
        tokio::time::timeout(self.config.shutdown_timeout, async {
            while self.connection_semaphore.available_permits() < self.config.max_connections {
                tokio::time::sleep(Duration::from_millis(100)).await;
            }
        })
        .await
        .map_err(|_| P2PError::Network("Shutdown timeout exceeded".to_string()))?;
        info!("Resource manager shutdown completed");
        Ok(())
    }
    /// Acquires a connection permit, waiting if the pool is exhausted. The
    /// permit is released when the returned guard drops.
    ///
    /// # Errors
    /// Fails immediately once shutdown has begun, or if the semaphore has
    /// been closed.
    pub async fn acquire_connection(&self) -> Result<ConnectionGuard<'_>> {
        if self.is_shutting_down.load(Ordering::SeqCst) {
            return Err(P2PError::Network("System is shutting down".to_string()));
        }
        let permit = self.connection_semaphore.clone()
            .acquire_owned()
            .await
            .map_err(|_| P2PError::Network("Connection semaphore closed".to_string()))?;
        debug!("Connection acquired, {} remaining", self.connection_semaphore.available_permits());
        Ok(ConnectionGuard { permit, _manager: self })
    }
    /// Checks (and consumes) one rate-limit token for `peer_id` performing
    /// `operation` ("dht", "mcp" or "message").
    ///
    /// Returns `Ok(true)` if the operation is allowed. Unknown operations
    /// are intentionally never throttled.
    pub async fn check_rate_limit(&self, peer_id: &str, operation: &str) -> Result<bool> {
        let limit = match operation {
            "dht" => self.config.rate_limits.dht_ops_per_sec,
            "mcp" => self.config.rate_limits.mcp_calls_per_sec,
            "message" => self.config.rate_limits.messages_per_sec,
            _ => return Ok(true),
        };
        let mut limiters = self.rate_limiters.write().await;
        // Key limiters by (peer, operation). Previously a single limiter per
        // peer was shared by every operation kind, so whichever operation a
        // peer performed first fixed the limit applied to all of them.
        let key = format!("{}:{}", peer_id, operation);
        let limiter = limiters.entry(key).or_insert_with(|| {
            // Token bucket: capacity is the burst allowance, refill rate is
            // the configured per-second limit. (These two arguments were
            // previously swapped, which made the sustained rate equal to
            // `burst_capacity` instead of the configured `*_per_sec` value.)
            RateLimiter::new(
                self.config.rate_limits.burst_capacity as f64,
                limit as f64,
            )
        });
        Ok(limiter.try_acquire())
    }
    /// Adds sent/received byte counts to the bandwidth tracker.
    pub fn record_bandwidth(&self, bytes_sent: u64, bytes_received: u64) {
        self.bandwidth_tracker.record(bytes_sent, bytes_received);
    }
    /// Returns a clone of the most recent metrics snapshot.
    pub async fn get_metrics(&self) -> ResourceMetrics {
        self.metrics.read().await.clone()
    }
    /// Validates current usage against configured limits.
    ///
    /// # Errors
    /// Only the memory ceiling is a hard failure; bandwidth and connection
    /// saturation merely log warnings.
    pub async fn health_check(&self) -> Result<()> {
        let metrics = self.get_metrics().await;
        if self.config.max_memory_bytes > 0 && metrics.memory_used > self.config.max_memory_bytes {
            warn!("Memory usage ({} bytes) exceeds limit ({} bytes)",
                metrics.memory_used, self.config.max_memory_bytes);
            return Err(P2PError::Network("Memory limit exceeded".to_string()));
        }
        if metrics.bandwidth_usage > self.config.max_bandwidth_bps {
            warn!("Bandwidth usage ({} bps) exceeds limit ({} bps)",
                metrics.bandwidth_usage, self.config.max_bandwidth_bps);
        }
        if metrics.active_connections >= self.config.max_connections {
            warn!("Connection count ({}) at maximum ({})",
                metrics.active_connections, self.config.max_connections);
        }
        debug!("Health check passed: {} connections, {} bytes memory, {} bps bandwidth",
            metrics.active_connections, metrics.memory_used, metrics.bandwidth_usage);
        Ok(())
    }
    /// Spawns the periodic metrics-collection task; exits on shutdown signal.
    async fn spawn_metrics_collector(&self) {
        let manager = self.clone();
        tokio::spawn(async move {
            let mut interval = interval(manager.config.metrics_interval);
            loop {
                tokio::select! {
                    _ = interval.tick() => {
                        if let Err(e) = manager.collect_metrics().await {
                            error!("Failed to collect metrics: {}", e);
                        }
                    }
                    _ = manager.shutdown_signal.notified() => {
                        debug!("Metrics collector shutting down");
                        break;
                    }
                }
            }
        });
    }
    /// Spawns the periodic health-check task; exits on shutdown signal.
    async fn spawn_health_checker(&self) {
        let manager = self.clone();
        tokio::spawn(async move {
            let mut interval = interval(manager.config.health_check_interval);
            loop {
                tokio::select! {
                    _ = interval.tick() => {
                        if let Err(e) = manager.health_check().await {
                            error!("Health check failed: {}", e);
                        }
                    }
                    _ = manager.shutdown_signal.notified() => {
                        debug!("Health checker shutting down");
                        break;
                    }
                }
            }
        });
    }
    /// Spawns the rate-limiter cleanup task (every 5 minutes); exits on
    /// shutdown signal.
    async fn spawn_cleanup_task(&self) {
        let manager = self.clone();
        tokio::spawn(async move {
            let mut interval = interval(Duration::from_secs(300));
            loop {
                tokio::select! {
                    _ = interval.tick() => {
                        manager.cleanup_resources().await;
                    }
                    _ = manager.shutdown_signal.notified() => {
                        debug!("Cleanup task shutting down");
                        break;
                    }
                }
            }
        });
    }
    /// Refreshes bandwidth, connection-count and timestamp fields of the
    /// shared metrics snapshot.
    async fn collect_metrics(&self) -> Result<()> {
        let mut metrics = self.metrics.write().await;
        metrics.bandwidth_usage = self.bandwidth_tracker.current_usage();
        // Connections in use = total permits minus those still available.
        metrics.active_connections = self.config.max_connections - self.connection_semaphore.available_permits();
        metrics.timestamp = SystemTime::now();
        debug!("Metrics updated: {} connections, {} bps bandwidth",
            metrics.active_connections, metrics.bandwidth_usage);
        Ok(())
    }
    /// Drops rate limiters that have been idle past their expiry window.
    async fn cleanup_resources(&self) {
        debug!("Starting resource cleanup");
        let mut limiters = self.rate_limiters.write().await;
        let now = Instant::now();
        limiters.retain(|_, limiter| !limiter.is_expired(now));
        debug!("Cleanup completed, {} rate limiters remaining", limiters.len());
    }
}
impl Clone for ResourceManager {
fn clone(&self) -> Self {
Self {
config: self.config.clone(),
metrics: self.metrics.clone(),
connection_semaphore: self.connection_semaphore.clone(),
bandwidth_tracker: self.bandwidth_tracker.clone(),
rate_limiters: self.rate_limiters.clone(),
shutdown_signal: self.shutdown_signal.clone(),
is_shutting_down: self.is_shutting_down.clone(),
}
}
}
/// RAII handle for one connection slot; the semaphore permit it holds is
/// returned to the pool automatically when the guard is dropped.
pub struct ConnectionGuard<'a> {
// Held only for its Drop behavior (releases the semaphore permit).
#[allow(dead_code)]
permit: tokio::sync::OwnedSemaphorePermit,
// Ties the guard's lifetime to the manager that issued it.
_manager: &'a ResourceManager,
}
impl<'a> Drop for ConnectionGuard<'a> {
/// Logs the release; the permit itself is returned by the
/// `OwnedSemaphorePermit`'s own drop, which runs after this.
fn drop(&mut self) {
debug!("Connection released");
}
}
impl BandwidthTracker {
/// Creates a tracker averaging traffic over `window_duration`.
fn new(window_duration: Duration) -> Self {
Self {
bytes_sent: AtomicU64::new(0),
bytes_received: AtomicU64::new(0),
last_reset: Arc::new(RwLock::new(Instant::now())),
window_duration,
}
}
/// Adds sent/received byte counts to the current window (lock-free).
fn record(&self, bytes_sent: u64, bytes_received: u64) {
self.bytes_sent.fetch_add(bytes_sent, Ordering::Relaxed);
self.bytes_received.fetch_add(bytes_received, Ordering::Relaxed);
}
/// Returns a throughput estimate in bytes per second; once the averaging
/// window has elapsed, counters are zeroed and 0 is returned for this call.
fn current_usage(&self) -> u64 {
let now = Instant::now();
let last_reset = {
// Non-blocking read so this can be called from sync contexts.
if let Ok(guard) = self.last_reset.try_read() {
*guard
} else {
// Lock contended: fall back to the raw byte total.
// NOTE(review): this path returns total bytes, not bytes/sec like
// the normal path — inconsistent units under contention; confirm.
let sent = self.bytes_sent.load(Ordering::Relaxed);
let received = self.bytes_received.load(Ordering::Relaxed);
return sent + received; }
};
if now.duration_since(last_reset) >= self.window_duration {
// Window elapsed: zero the counters and start a new window. If the
// write lock is contended we skip the reset and fall through.
if let Ok(mut guard) = self.last_reset.try_write() {
self.bytes_sent.store(0, Ordering::Relaxed);
self.bytes_received.store(0, Ordering::Relaxed);
*guard = now;
return 0;
}
}
let sent = self.bytes_sent.load(Ordering::Relaxed);
let received = self.bytes_received.load(Ordering::Relaxed);
// Average total traffic over the time elapsed since the last reset.
let elapsed_secs = now.duration_since(last_reset).as_secs_f64();
if elapsed_secs > 0.0 {
((sent + received) as f64 / elapsed_secs) as u64
} else {
0
}
}
}
impl RateLimiter {
    /// Builds a token bucket that starts full at `max_tokens` and is
    /// replenished at `refill_rate` tokens per second.
    fn new(max_tokens: f64, refill_rate: f64) -> Self {
        Self {
            tokens: Arc::new(std::sync::Mutex::new(max_tokens)),
            last_refill: Arc::new(std::sync::Mutex::new(Instant::now())),
            max_tokens,
            refill_rate,
        }
    }
    /// Attempts to take one token, first topping up the bucket based on the
    /// wall-clock time elapsed since the previous refill. Returns `true`
    /// when a token was available.
    fn try_acquire(&self) -> bool {
        let at = Instant::now();
        // Refill phase: credit elapsed-time tokens, capped at capacity.
        {
            let mut refill_mark = self.last_refill.lock().unwrap();
            let secs_since = at.duration_since(*refill_mark).as_secs_f64();
            if secs_since > 0.0 {
                let mut bucket = self.tokens.lock().unwrap();
                *bucket = self.max_tokens.min(*bucket + secs_since * self.refill_rate);
                *refill_mark = at;
            }
        }
        // Acquire phase: spend one whole token if present.
        let mut bucket = self.tokens.lock().unwrap();
        let granted = *bucket >= 1.0;
        if granted {
            *bucket -= 1.0;
        }
        granted
    }
    /// A limiter is considered stale once no refill has happened for five
    /// minutes; stale limiters are dropped by the cleanup task.
    fn is_expired(&self, now: Instant) -> bool {
        let idle_for = now.duration_since(*self.last_refill.lock().unwrap());
        idle_for > Duration::from_secs(300)
    }
}
impl Default for LatencyStats {
fn default() -> Self {
Self {
avg_ms: 0.0,
min_ms: 0.0,
max_ms: 0.0,
p95_ms: 0.0,
sample_count: 0,
}
}
}
impl Default for DHTMetrics {
fn default() -> Self {
Self {
ops_per_sec: 0.0,
avg_latency_ms: 0.0,
success_rate: 1.0,
cache_hit_rate: 0.0,
}
}
}
impl Default for MCPMetrics {
fn default() -> Self {
Self {
calls_per_sec: 0.0,
avg_latency_ms: 0.0,
success_rate: 1.0,
active_services: 0,
}
}
}
// Unit tests for the resource manager: config defaults, connection permits,
// per-peer rate limiting, bandwidth tracking, health checks and shutdown.
// Several tests are timing-sensitive (short sleeps against short windows)
// and may be flaky on heavily loaded machines.
#[cfg(test)]
mod tests {
use super::*;
use tokio::time::sleep;
// Small limits and short intervals so tests complete quickly.
fn create_test_config() -> ProductionConfig {
ProductionConfig {
max_connections: 10,
max_memory_bytes: 1024 * 1024, max_bandwidth_bps: 1024 * 1024, connection_timeout: Duration::from_millis(100),
keep_alive_interval: Duration::from_millis(50),
health_check_interval: Duration::from_millis(50),
metrics_interval: Duration::from_millis(50),
enable_performance_tracking: true,
enable_auto_cleanup: true,
shutdown_timeout: Duration::from_millis(200),
rate_limits: RateLimitConfig {
dht_ops_per_sec: 5,
mcp_calls_per_sec: 3,
messages_per_sec: 10,
burst_capacity: 5,
window_duration: Duration::from_millis(100),
},
}
}
// --- Default-value tests for the config and metrics structs ---
#[test]
fn test_production_config_default() {
let config = ProductionConfig::default();
assert_eq!(config.max_connections, 1000);
assert_eq!(config.max_memory_bytes, 1024 * 1024 * 1024); assert_eq!(config.max_bandwidth_bps, 100 * 1024 * 1024); assert_eq!(config.connection_timeout, Duration::from_secs(30));
assert_eq!(config.keep_alive_interval, Duration::from_secs(30));
assert_eq!(config.health_check_interval, Duration::from_secs(60));
assert_eq!(config.metrics_interval, Duration::from_secs(10));
assert!(config.enable_performance_tracking);
assert!(config.enable_auto_cleanup);
assert_eq!(config.shutdown_timeout, Duration::from_secs(30));
}
#[test]
fn test_rate_limit_config_default() {
let config = RateLimitConfig::default();
assert_eq!(config.dht_ops_per_sec, 100);
assert_eq!(config.mcp_calls_per_sec, 50);
assert_eq!(config.messages_per_sec, 200);
assert_eq!(config.burst_capacity, 10);
assert_eq!(config.window_duration, Duration::from_secs(1));
}
#[test]
fn test_latency_stats_default() {
let stats = LatencyStats::default();
assert_eq!(stats.avg_ms, 0.0);
assert_eq!(stats.min_ms, 0.0);
assert_eq!(stats.max_ms, 0.0);
assert_eq!(stats.p95_ms, 0.0);
assert_eq!(stats.sample_count, 0);
}
#[test]
fn test_dht_metrics_default() {
let metrics = DHTMetrics::default();
assert_eq!(metrics.ops_per_sec, 0.0);
assert_eq!(metrics.avg_latency_ms, 0.0);
assert_eq!(metrics.success_rate, 1.0);
assert_eq!(metrics.cache_hit_rate, 0.0);
}
#[test]
fn test_mcp_metrics_default() {
let metrics = MCPMetrics::default();
assert_eq!(metrics.calls_per_sec, 0.0);
assert_eq!(metrics.avg_latency_ms, 0.0);
assert_eq!(metrics.success_rate, 1.0);
assert_eq!(metrics.active_services, 0);
}
// --- Manager construction, cloning and connection-permit behavior ---
#[tokio::test]
async fn test_resource_manager_creation() {
let config = create_test_config();
let manager = ResourceManager::new(config.clone());
let metrics = manager.get_metrics().await;
assert_eq!(metrics.active_connections, 0);
assert_eq!(metrics.bandwidth_usage, 0);
assert_eq!(metrics.memory_used, 0);
assert_eq!(metrics.cpu_usage, 0.0);
assert_eq!(metrics.network_latency.sample_count, 0);
assert_eq!(metrics.dht_metrics.success_rate, 1.0);
assert_eq!(metrics.mcp_metrics.success_rate, 1.0);
}
// Clones share the semaphore, so permits taken via either handle are
// visible through both.
#[tokio::test]
async fn test_resource_manager_cloning() {
let config = create_test_config();
let manager = ResourceManager::new(config);
let cloned = manager.clone();
let _guard1 = manager.acquire_connection().await.unwrap();
let _guard2 = cloned.acquire_connection().await.unwrap();
assert_eq!(manager.connection_semaphore.available_permits(), 8);
assert_eq!(cloned.connection_semaphore.available_permits(), 8);
}
#[tokio::test]
async fn test_connection_acquisition() {
let config = ProductionConfig {
max_connections: 2,
..create_test_config()
};
let manager = ResourceManager::new(config);
let _guard1 = manager.acquire_connection().await.unwrap();
assert_eq!(manager.connection_semaphore.available_permits(), 1);
let _guard2 = manager.acquire_connection().await.unwrap();
assert_eq!(manager.connection_semaphore.available_permits(), 0);
drop(_guard1);
sleep(Duration::from_millis(1)).await; assert_eq!(manager.connection_semaphore.available_permits(), 1);
}
#[tokio::test]
async fn test_connection_acquisition_during_shutdown() {
let config = create_test_config();
let manager = ResourceManager::new(config);
manager.is_shutting_down.store(true, Ordering::SeqCst);
let result = manager.acquire_connection().await;
assert!(result.is_err());
match result {
Err(e) => assert!(e.to_string().contains("shutting down")),
Ok(_) => panic!("Expected error but got success"),
}
}
// Dropping the guard must return its permit to the pool.
#[tokio::test]
async fn test_connection_guard_drop() {
let config = create_test_config();
let manager = ResourceManager::new(config);
let initial_permits = manager.connection_semaphore.available_permits();
{
let _guard = manager.acquire_connection().await.unwrap();
assert_eq!(manager.connection_semaphore.available_permits(), initial_permits - 1);
}
assert_eq!(manager.connection_semaphore.available_permits(), initial_permits);
}
// --- Per-peer rate limiting ---
#[tokio::test]
async fn test_rate_limiting_dht_operations() {
let config = ProductionConfig {
rate_limits: RateLimitConfig {
dht_ops_per_sec: 2,
burst_capacity: 2,
..Default::default()
},
..create_test_config()
};
let manager = ResourceManager::new(config);
assert!(manager.check_rate_limit("peer1", "dht").await.unwrap());
assert!(manager.check_rate_limit("peer1", "dht").await.unwrap());
assert!(!manager.check_rate_limit("peer1", "dht").await.unwrap());
}
#[tokio::test]
async fn test_rate_limiting_mcp_operations() {
let config = ProductionConfig {
rate_limits: RateLimitConfig {
mcp_calls_per_sec: 1,
burst_capacity: 1,
..Default::default()
},
..create_test_config()
};
let manager = ResourceManager::new(config);
assert!(manager.check_rate_limit("peer2", "mcp").await.unwrap());
assert!(!manager.check_rate_limit("peer2", "mcp").await.unwrap());
}
#[tokio::test]
async fn test_rate_limiting_message_operations() {
let config = ProductionConfig {
rate_limits: RateLimitConfig {
messages_per_sec: 3,
burst_capacity: 3,
..Default::default()
},
..create_test_config()
};
let manager = ResourceManager::new(config);
for _ in 0..3 {
assert!(manager.check_rate_limit("peer3", "message").await.unwrap());
}
assert!(!manager.check_rate_limit("peer3", "message").await.unwrap());
}
// Operations with no configured limit are never throttled.
#[tokio::test]
async fn test_rate_limiting_unknown_operation() {
let config = create_test_config();
let manager = ResourceManager::new(config);
assert!(manager.check_rate_limit("peer4", "unknown").await.unwrap());
assert!(manager.check_rate_limit("peer4", "unknown").await.unwrap());
}
// Each peer gets its own limiter; exhausting one must not affect another.
#[tokio::test]
async fn test_rate_limiting_different_peers() {
let config = ProductionConfig {
rate_limits: RateLimitConfig {
dht_ops_per_sec: 1,
burst_capacity: 1,
..Default::default()
},
..create_test_config()
};
let manager = ResourceManager::new(config);
assert!(manager.check_rate_limit("peer1", "dht").await.unwrap());
assert!(manager.check_rate_limit("peer2", "dht").await.unwrap());
assert!(!manager.check_rate_limit("peer1", "dht").await.unwrap());
assert!(!manager.check_rate_limit("peer2", "dht").await.unwrap());
}
// --- Bandwidth tracking ---
#[tokio::test]
async fn test_bandwidth_tracking() {
let tracker = BandwidthTracker::new(Duration::from_millis(100));
tracker.record(1000, 2000);
let usage = tracker.current_usage();
assert!(usage > 0);
sleep(Duration::from_millis(150)).await; let usage_after_reset = tracker.current_usage();
assert_eq!(usage_after_reset, 0);
}
// Timing-sensitive: 1000 bytes over ~50ms should report well over 10 kB/s.
#[tokio::test]
async fn test_bandwidth_tracking_rate_calculation() {
let tracker = BandwidthTracker::new(Duration::from_secs(1));
tracker.record(500, 500);
sleep(Duration::from_millis(50)).await;
let usage = tracker.current_usage();
assert!(usage > 10000); }
#[tokio::test]
async fn test_bandwidth_tracking_multiple_records() {
let tracker = BandwidthTracker::new(Duration::from_millis(200));
tracker.record(100, 200);
tracker.record(300, 400);
tracker.record(500, 600);
let usage = tracker.current_usage();
assert!(usage > 0);
let sent = tracker.bytes_sent.load(Ordering::Relaxed);
let received = tracker.bytes_received.load(Ordering::Relaxed);
assert_eq!(sent, 900); assert_eq!(received, 1200); }
#[tokio::test]
async fn test_manager_bandwidth_recording() {
let config = create_test_config();
let manager = ResourceManager::new(config);
manager.record_bandwidth(1000, 2000);
let usage = manager.bandwidth_tracker.current_usage();
assert!(usage > 0);
}
// --- Health checks: memory is a hard error, bandwidth/connections warn ---
#[tokio::test]
async fn test_health_check_success() {
let config = ProductionConfig {
max_memory_bytes: 2048, max_bandwidth_bps: 10000, max_connections: 5,
..create_test_config()
};
let manager = ResourceManager::new(config);
let result = manager.health_check().await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_health_check_memory_limit_exceeded() {
let config = ProductionConfig {
max_memory_bytes: 100, ..create_test_config()
};
let manager = ResourceManager::new(config);
{
let mut metrics = manager.metrics.write().await;
metrics.memory_used = 200; }
let result = manager.health_check().await;
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("Memory limit exceeded"));
}
#[tokio::test]
async fn test_health_check_bandwidth_warning() {
let config = ProductionConfig {
max_bandwidth_bps: 1000,
..create_test_config()
};
let manager = ResourceManager::new(config);
{
let mut metrics = manager.metrics.write().await;
metrics.bandwidth_usage = 2000; }
let result = manager.health_check().await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_health_check_connection_warning() {
let config = ProductionConfig {
max_connections: 2,
..create_test_config()
};
let manager = ResourceManager::new(config);
let _guard1 = manager.acquire_connection().await.unwrap();
let _guard2 = manager.acquire_connection().await.unwrap();
{
let mut metrics = manager.metrics.write().await;
metrics.active_connections = 2;
}
let result = manager.health_check().await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_metrics_collection() {
let config = create_test_config();
let manager = ResourceManager::new(config);
manager.record_bandwidth(500, 1000);
let _guard = manager.acquire_connection().await.unwrap();
manager.collect_metrics().await.unwrap();
let metrics = manager.get_metrics().await;
assert_eq!(metrics.active_connections, 1);
assert!(metrics.bandwidth_usage > 0);
assert!(metrics.timestamp.elapsed().unwrap().as_millis() < 100); }
// --- Graceful shutdown ---
#[tokio::test]
async fn test_graceful_shutdown() {
let config = ProductionConfig {
shutdown_timeout: Duration::from_millis(100),
..create_test_config()
};
let manager = ResourceManager::new(config);
manager.start().await.unwrap();
let result = manager.shutdown().await;
assert!(result.is_ok());
assert!(manager.is_shutting_down.load(Ordering::SeqCst));
}
// Shutdown must block until the outstanding guard is dropped.
#[tokio::test]
async fn test_graceful_shutdown_with_connections() {
let config = ProductionConfig {
shutdown_timeout: Duration::from_millis(200),
max_connections: 2,
..create_test_config()
};
let manager = ResourceManager::new(config);
let guard = manager.acquire_connection().await.unwrap();
let manager_clone = manager.clone();
let shutdown_task = tokio::spawn(async move {
manager_clone.shutdown().await
});
sleep(Duration::from_millis(50)).await;
drop(guard);
let result = shutdown_task.await.unwrap();
assert!(result.is_ok());
}
// A never-released guard must make shutdown fail at the timeout.
#[tokio::test]
async fn test_shutdown_timeout() {
let config = ProductionConfig {
shutdown_timeout: Duration::from_millis(50), max_connections: 1,
..create_test_config()
};
let manager = ResourceManager::new(config);
let _guard = manager.acquire_connection().await.unwrap();
let result = manager.shutdown().await;
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("Shutdown timeout"));
}
#[tokio::test]
async fn test_start_with_disabled_features() {
let config = ProductionConfig {
enable_performance_tracking: false,
enable_auto_cleanup: false,
..create_test_config()
};
let manager = ResourceManager::new(config);
let result = manager.start().await;
assert!(result.is_ok());
}
// --- RateLimiter internals ---
#[test]
fn test_rate_limiter_creation() {
let limiter = RateLimiter::new(10.0, 5.0);
assert!(limiter.try_acquire());
}
#[test]
fn test_rate_limiter_token_exhaustion() {
let limiter = RateLimiter::new(2.0, 1.0);
assert!(limiter.try_acquire());
assert!(limiter.try_acquire());
assert!(!limiter.try_acquire());
}
// 10 tokens/sec refill: 200ms is plenty to regain one token.
#[tokio::test]
async fn test_rate_limiter_refill() {
let limiter = RateLimiter::new(1.0, 10.0);
assert!(limiter.try_acquire());
assert!(!limiter.try_acquire());
sleep(Duration::from_millis(200)).await;
assert!(limiter.try_acquire());
}
#[test]
fn test_rate_limiter_expiration() {
let limiter = RateLimiter::new(10.0, 5.0);
assert!(!limiter.is_expired(Instant::now()));
let future_time = Instant::now() + Duration::from_secs(400);
assert!(limiter.is_expired(future_time));
}
// Fresh limiters are not expired, so cleanup must retain them.
#[tokio::test]
async fn test_cleanup_resources() {
let config = create_test_config();
let manager = ResourceManager::new(config);
manager.check_rate_limit("peer1", "dht").await.unwrap();
manager.check_rate_limit("peer2", "mcp").await.unwrap();
{
let limiters = manager.rate_limiters.read().await;
assert_eq!(limiters.len(), 2);
}
manager.cleanup_resources().await;
{
let limiters = manager.rate_limiters.read().await;
assert_eq!(limiters.len(), 2); }
}
// --- BandwidthTracker internals ---
#[test]
fn test_bandwidth_tracker_creation() {
let tracker = BandwidthTracker::new(Duration::from_secs(1));
assert_eq!(tracker.current_usage(), 0);
}
#[test]
fn test_bandwidth_tracker_window_reset() {
let tracker = BandwidthTracker::new(Duration::from_millis(1));
tracker.record(1000, 2000);
let initial_usage = tracker.current_usage();
assert!(initial_usage > 0);
std::thread::sleep(Duration::from_millis(10));
let usage_after_window = tracker.current_usage();
assert_eq!(usage_after_window, 0);
}
#[tokio::test]
async fn test_resource_metrics_structure() {
let config = create_test_config();
let manager = ResourceManager::new(config);
let metrics = manager.get_metrics().await;
assert_eq!(metrics.memory_used, 0);
assert_eq!(metrics.active_connections, 0);
assert_eq!(metrics.bandwidth_usage, 0);
assert_eq!(metrics.cpu_usage, 0.0);
assert_eq!(metrics.network_latency.sample_count, 0);
assert_eq!(metrics.dht_metrics.ops_per_sec, 0.0);
assert_eq!(metrics.mcp_metrics.calls_per_sec, 0.0);
assert!(metrics.timestamp.elapsed().unwrap().as_secs() < 1);
}
}