use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use tokio::sync::RwLock;
use std::collections::HashMap;
use std::time::{Duration, Instant};
#[derive(Debug, Clone)]
pub struct ResourceConfig {
pub max_memory_bytes: u64,
pub max_keys: usize,
pub memory_threshold_percent: u8,
pub buffer_overflow_threshold: usize,
pub cleanup_interval: Duration,
pub emergency_cleanup_threshold: u8,
}
impl Default for ResourceConfig {
fn default() -> Self {
Self {
max_memory_bytes: 1024 * 1024 * 1024, max_keys: 100_000,
memory_threshold_percent: 80,
buffer_overflow_threshold: 10_000,
cleanup_interval: Duration::from_secs(30),
emergency_cleanup_threshold: 95,
}
}
}
#[derive(Debug, Clone, PartialEq)]
pub enum CircuitBreakerState {
Closed,
Open,
HalfOpen,
}
#[derive(Debug, Clone)]
pub struct ResourceMetrics {
pub current_memory_bytes: u64,
pub peak_memory_bytes: u64,
pub active_keys: usize,
pub buffer_overflow_count: u64,
pub circuit_breaker_trips: u64,
pub emergency_cleanups: u64,
}
pub struct ResourceManager {
config: ResourceConfig,
memory_usage: Arc<AtomicU64>,
active_keys: Arc<AtomicUsize>,
buffer_overflow_count: Arc<AtomicU64>,
circuit_breaker_trips: Arc<AtomicU64>,
emergency_cleanups: Arc<AtomicU64>,
peak_memory: Arc<AtomicU64>,
circuit_breaker_state: Arc<RwLock<CircuitBreakerState>>,
last_cleanup: Arc<RwLock<Instant>>,
resource_trackers: Arc<RwLock<HashMap<String, u64>>>,
}
impl ResourceManager {
pub fn new() -> Self {
Self::with_config(ResourceConfig::default())
}
pub fn with_config(config: ResourceConfig) -> Self {
Self {
config,
memory_usage: Arc::new(AtomicU64::new(0)),
active_keys: Arc::new(AtomicUsize::new(0)),
buffer_overflow_count: Arc::new(AtomicU64::new(0)),
circuit_breaker_trips: Arc::new(AtomicU64::new(0)),
emergency_cleanups: Arc::new(AtomicU64::new(0)),
peak_memory: Arc::new(AtomicU64::new(0)),
circuit_breaker_state: Arc::new(RwLock::new(CircuitBreakerState::Closed)),
last_cleanup: Arc::new(RwLock::new(Instant::now())),
resource_trackers: Arc::new(RwLock::new(HashMap::new())),
}
}
pub async fn track_memory_allocation(&self, bytes: u64) -> Result<(), ResourceError> {
let current = self.memory_usage.fetch_add(bytes, Ordering::Relaxed) + bytes;
let mut peak = self.peak_memory.load(Ordering::Relaxed);
while current > peak {
match self.peak_memory.compare_exchange_weak(
peak, current, Ordering::Relaxed, Ordering::Relaxed
) {
Ok(_) => break,
Err(new_peak) => peak = new_peak,
}
}
if current > self.config.max_memory_bytes * self.config.memory_threshold_percent as u64 / 100 {
self.trip_circuit_breaker().await;
}
if current > self.config.max_memory_bytes * self.config.emergency_cleanup_threshold as u64 / 100 {
self.emergency_cleanup().await;
}
Ok(())
}
pub async fn track_memory_deallocation(&self, bytes: u64) {
self.memory_usage.fetch_sub(bytes, Ordering::Relaxed);
}
pub async fn track_key_creation(&self) -> Result<(), ResourceError> {
let current_keys = self.active_keys.fetch_add(1, Ordering::Relaxed) + 1;
if current_keys > self.config.max_keys {
self.active_keys.fetch_sub(1, Ordering::Relaxed);
return Err(ResourceError::MaxKeysExceeded);
}
Ok(())
}
pub async fn track_key_removal(&self) {
self.active_keys.fetch_sub(1, Ordering::Relaxed);
}
pub async fn track_buffer_overflow(&self) -> Result<(), ResourceError> {
self.buffer_overflow_count.fetch_add(1, Ordering::Relaxed);
let overflow_count = self.buffer_overflow_count.load(Ordering::Relaxed);
if overflow_count > self.config.buffer_overflow_threshold as u64 {
self.trip_circuit_breaker().await;
return Err(ResourceError::BufferOverflowThresholdExceeded);
}
Ok(())
}
pub async fn get_metrics(&self) -> ResourceMetrics {
ResourceMetrics {
current_memory_bytes: self.memory_usage.load(Ordering::Relaxed),
peak_memory_bytes: self.peak_memory.load(Ordering::Relaxed),
active_keys: self.active_keys.load(Ordering::Relaxed),
buffer_overflow_count: self.buffer_overflow_count.load(Ordering::Relaxed),
circuit_breaker_trips: self.circuit_breaker_trips.load(Ordering::Relaxed),
emergency_cleanups: self.emergency_cleanups.load(Ordering::Relaxed),
}
}
pub async fn is_circuit_open(&self) -> bool {
let state = self.circuit_breaker_state.read().await;
*state == CircuitBreakerState::Open
}
async fn trip_circuit_breaker(&self) {
let mut state = self.circuit_breaker_state.write().await;
if *state == CircuitBreakerState::Closed {
*state = CircuitBreakerState::Open;
self.circuit_breaker_trips.fetch_add(1, Ordering::Relaxed);
}
}
pub async fn reset_circuit_breaker(&self) {
let mut state = self.circuit_breaker_state.write().await;
*state = CircuitBreakerState::HalfOpen;
}
pub async fn close_circuit_breaker(&self) {
let mut state = self.circuit_breaker_state.write().await;
*state = CircuitBreakerState::Closed;
}
async fn emergency_cleanup(&self) {
self.emergency_cleanups.fetch_add(1, Ordering::Relaxed);
log::warn!("Emergency cleanup triggered - memory pressure detected");
self.perform_cleanup().await;
self.perform_emergency_measures().await;
if self.is_under_memory_pressure().await {
log::error!("Memory pressure persists after emergency cleanup - tripping circuit breaker");
self.trip_circuit_breaker().await;
}
}
async fn perform_emergency_measures(&self) {
let mut trackers = self.resource_trackers.write().await;
trackers.clear();
drop(trackers);
log::info!("Emergency measures completed - cleared non-essential resources");
}
pub async fn periodic_cleanup(&self) {
let mut last_cleanup = self.last_cleanup.write().await;
if last_cleanup.elapsed() >= self.config.cleanup_interval {
self.perform_cleanup().await;
*last_cleanup = Instant::now();
}
}
async fn perform_cleanup(&self) {
log::debug!("Performing periodic cleanup");
let mut trackers = self.resource_trackers.write().await;
let before_count = trackers.len();
if trackers.len() > 1000 {
let to_remove: Vec<String> = trackers.keys().take(100).cloned().collect();
for key in to_remove {
trackers.remove(&key);
}
}
let after_count = trackers.len();
drop(trackers);
if before_count != after_count {
log::info!("Cleanup completed: removed {} resources", before_count - after_count);
}
let overflow_count = self.buffer_overflow_count.load(Ordering::Relaxed);
if overflow_count > 1000 {
self.buffer_overflow_count.store(0, Ordering::Relaxed);
log::info!("Reset buffer overflow counter");
}
}
pub async fn track_resource(&self, resource_id: String, size_bytes: u64) {
let mut trackers = self.resource_trackers.write().await;
trackers.insert(resource_id, size_bytes);
}
pub async fn untrack_resource(&self, resource_id: &str) -> Option<u64> {
let mut trackers = self.resource_trackers.write().await;
trackers.remove(resource_id)
}
pub async fn get_memory_pressure(&self) -> u8 {
let current = self.memory_usage.load(Ordering::Relaxed);
let max = self.config.max_memory_bytes;
((current * 100) / max) as u8
}
pub async fn is_under_memory_pressure(&self) -> bool {
let pressure = self.get_memory_pressure().await;
pressure > self.config.memory_threshold_percent
}
}
#[derive(Debug, thiserror::Error)]
pub enum ResourceError {
#[error("Maximum memory usage exceeded")]
MaxMemoryExceeded,
#[error("Maximum number of keys exceeded")]
MaxKeysExceeded,
#[error("Buffer overflow threshold exceeded")]
BufferOverflowThresholdExceeded,
#[error("Circuit breaker is open")]
CircuitBreakerOpen,
#[error("Resource allocation failed: {0}")]
AllocationFailed(String),
}
lazy_static::lazy_static! {
pub static ref GLOBAL_RESOURCE_MANAGER: Arc<ResourceManager> = Arc::new(ResourceManager::new());
}
pub fn get_global_resource_manager() -> Arc<ResourceManager> {
GLOBAL_RESOURCE_MANAGER.clone()
}