rs2_stream/
resource_manager.rs

1use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
2use std::sync::Arc;
3use tokio::sync::RwLock;
4use std::collections::HashMap;
5use std::time::{Duration, Instant};
6
7/// Resource management configuration
8#[derive(Debug, Clone)]
9pub struct ResourceConfig {
10    /// Maximum memory usage in bytes
11    pub max_memory_bytes: u64,
12    /// Maximum number of active keys/streams
13    pub max_keys: usize,
14    /// Memory threshold for circuit breaker (percentage of max_memory)
15    pub memory_threshold_percent: u8,
16    /// Buffer overflow threshold
17    pub buffer_overflow_threshold: usize,
18    /// Cleanup interval
19    pub cleanup_interval: Duration,
20    /// Emergency cleanup trigger threshold (percentage of max_memory)
21    pub emergency_cleanup_threshold: u8,
22}
23
24impl Default for ResourceConfig {
25    fn default() -> Self {
26        Self {
27            max_memory_bytes: 1024 * 1024 * 1024, // 1GB
28            max_keys: 100_000,
29            memory_threshold_percent: 80,
30            buffer_overflow_threshold: 10_000,
31            cleanup_interval: Duration::from_secs(30),
32            emergency_cleanup_threshold: 95,
33        }
34    }
35}
36
37/// Circuit breaker states
38#[derive(Debug, Clone, PartialEq)]
39pub enum CircuitBreakerState {
40    Closed,
41    Open,
42    HalfOpen,
43}
44
45/// Resource usage metrics
46#[derive(Debug, Clone)]
47pub struct ResourceMetrics {
48    pub current_memory_bytes: u64,
49    pub peak_memory_bytes: u64,
50    pub active_keys: usize,
51    pub buffer_overflow_count: u64,
52    pub circuit_breaker_trips: u64,
53    pub emergency_cleanups: u64,
54}
55
56/// Production-grade resource manager for RS2 streaming library
57pub struct ResourceManager {
58    config: ResourceConfig,
59    memory_usage: Arc<AtomicU64>,
60    active_keys: Arc<AtomicUsize>,
61    buffer_overflow_count: Arc<AtomicU64>,
62    circuit_breaker_trips: Arc<AtomicU64>,
63    emergency_cleanups: Arc<AtomicU64>,
64    peak_memory: Arc<AtomicU64>,
65    circuit_breaker_state: Arc<RwLock<CircuitBreakerState>>,
66    last_cleanup: Arc<RwLock<Instant>>,
67    resource_trackers: Arc<RwLock<HashMap<String, u64>>>,
68}
69
70impl ResourceManager {
71    /// Create a new resource manager with default configuration
72    pub fn new() -> Self {
73        Self::with_config(ResourceConfig::default())
74    }
75
76    /// Create a new resource manager with custom configuration
77    pub fn with_config(config: ResourceConfig) -> Self {
78        Self {
79            config,
80            memory_usage: Arc::new(AtomicU64::new(0)),
81            active_keys: Arc::new(AtomicUsize::new(0)),
82            buffer_overflow_count: Arc::new(AtomicU64::new(0)),
83            circuit_breaker_trips: Arc::new(AtomicU64::new(0)),
84            emergency_cleanups: Arc::new(AtomicU64::new(0)),
85            peak_memory: Arc::new(AtomicU64::new(0)),
86            circuit_breaker_state: Arc::new(RwLock::new(CircuitBreakerState::Closed)),
87            last_cleanup: Arc::new(RwLock::new(Instant::now())),
88            resource_trackers: Arc::new(RwLock::new(HashMap::new())),
89        }
90    }
91
92    /// Track memory allocation
93    pub async fn track_memory_allocation(&self, bytes: u64) -> Result<(), ResourceError> {
94        let current = self.memory_usage.fetch_add(bytes, Ordering::Relaxed) + bytes;
95        
96        // Update peak memory
97        let mut peak = self.peak_memory.load(Ordering::Relaxed);
98        while current > peak {
99            match self.peak_memory.compare_exchange_weak(
100                peak, current, Ordering::Relaxed, Ordering::Relaxed
101            ) {
102                Ok(_) => break,
103                Err(new_peak) => peak = new_peak,
104            }
105        }
106
107        // Check memory threshold
108        if current > self.config.max_memory_bytes * self.config.memory_threshold_percent as u64 / 100 {
109            self.trip_circuit_breaker().await;
110        }
111
112        // Emergency cleanup if needed
113        if current > self.config.max_memory_bytes * self.config.emergency_cleanup_threshold as u64 / 100 {
114            self.emergency_cleanup().await;
115        }
116
117        Ok(())
118    }
119
120    /// Track memory deallocation
121    pub async fn track_memory_deallocation(&self, bytes: u64) {
122        self.memory_usage.fetch_sub(bytes, Ordering::Relaxed);
123    }
124
125    /// Track key creation
126    pub async fn track_key_creation(&self) -> Result<(), ResourceError> {
127        let current_keys = self.active_keys.fetch_add(1, Ordering::Relaxed) + 1;
128        
129        if current_keys > self.config.max_keys {
130            self.active_keys.fetch_sub(1, Ordering::Relaxed);
131            return Err(ResourceError::MaxKeysExceeded);
132        }
133
134        Ok(())
135    }
136
137    /// Track key removal
138    pub async fn track_key_removal(&self) {
139        self.active_keys.fetch_sub(1, Ordering::Relaxed);
140    }
141
142    /// Track buffer overflow
143    pub async fn track_buffer_overflow(&self) -> Result<(), ResourceError> {
144        self.buffer_overflow_count.fetch_add(1, Ordering::Relaxed);
145        
146        let overflow_count = self.buffer_overflow_count.load(Ordering::Relaxed);
147        if overflow_count > self.config.buffer_overflow_threshold as u64 {
148            self.trip_circuit_breaker().await;
149            return Err(ResourceError::BufferOverflowThresholdExceeded);
150        }
151
152        Ok(())
153    }
154
155    /// Get current resource metrics
156    pub async fn get_metrics(&self) -> ResourceMetrics {
157        ResourceMetrics {
158            current_memory_bytes: self.memory_usage.load(Ordering::Relaxed),
159            peak_memory_bytes: self.peak_memory.load(Ordering::Relaxed),
160            active_keys: self.active_keys.load(Ordering::Relaxed),
161            buffer_overflow_count: self.buffer_overflow_count.load(Ordering::Relaxed),
162            circuit_breaker_trips: self.circuit_breaker_trips.load(Ordering::Relaxed),
163            emergency_cleanups: self.emergency_cleanups.load(Ordering::Relaxed),
164        }
165    }
166
167    /// Check if circuit breaker is open
168    pub async fn is_circuit_open(&self) -> bool {
169        let state = self.circuit_breaker_state.read().await;
170        *state == CircuitBreakerState::Open
171    }
172
173    /// Trip the circuit breaker
174    async fn trip_circuit_breaker(&self) {
175        let mut state = self.circuit_breaker_state.write().await;
176        if *state == CircuitBreakerState::Closed {
177            *state = CircuitBreakerState::Open;
178            self.circuit_breaker_trips.fetch_add(1, Ordering::Relaxed);
179        }
180    }
181
182    /// Reset circuit breaker to half-open state
183    pub async fn reset_circuit_breaker(&self) {
184        let mut state = self.circuit_breaker_state.write().await;
185        *state = CircuitBreakerState::HalfOpen;
186    }
187
188    /// Close circuit breaker
189    pub async fn close_circuit_breaker(&self) {
190        let mut state = self.circuit_breaker_state.write().await;
191        *state = CircuitBreakerState::Closed;
192    }
193
194    /// Perform emergency cleanup
195    async fn emergency_cleanup(&self) {
196        self.emergency_cleanups.fetch_add(1, Ordering::Relaxed);
197        log::warn!("Emergency cleanup triggered - memory pressure detected");
198        self.perform_cleanup().await;
199        self.perform_emergency_measures().await;
200        if self.is_under_memory_pressure().await {
201            log::error!("Memory pressure persists after emergency cleanup - tripping circuit breaker");
202            self.trip_circuit_breaker().await;
203        }
204    }
205
206    /// Perform emergency measures for severe memory pressure
207    async fn perform_emergency_measures(&self) {
208        let mut trackers = self.resource_trackers.write().await;
209        trackers.clear();
210        drop(trackers);
211        log::info!("Emergency measures completed - cleared non-essential resources");
212    }
213
214    /// Perform periodic cleanup
215    pub async fn periodic_cleanup(&self) {
216        let mut last_cleanup = self.last_cleanup.write().await;
217        if last_cleanup.elapsed() >= self.config.cleanup_interval {
218            self.perform_cleanup().await;
219            *last_cleanup = Instant::now();
220        }
221    }
222
223    /// Perform actual cleanup operations
224    async fn perform_cleanup(&self) {
225        log::debug!("Performing periodic cleanup");
226        
227        let mut trackers = self.resource_trackers.write().await;
228        let before_count = trackers.len();
229        
230       
231        if trackers.len() > 1000 {
232            let to_remove: Vec<String> = trackers.keys().take(100).cloned().collect();
233            for key in to_remove {
234                trackers.remove(&key);
235            }
236        }
237        
238        let after_count = trackers.len();
239        drop(trackers);
240        
241        if before_count != after_count {
242            log::info!("Cleanup completed: removed {} resources", before_count - after_count);
243        }
244        
245        let overflow_count = self.buffer_overflow_count.load(Ordering::Relaxed);
246        if overflow_count > 1000 {
247            self.buffer_overflow_count.store(0, Ordering::Relaxed);
248            log::info!("Reset buffer overflow counter");
249        }
250    }
251
252    /// Track a specific resource
253    pub async fn track_resource(&self, resource_id: String, size_bytes: u64) {
254        let mut trackers = self.resource_trackers.write().await;
255        trackers.insert(resource_id, size_bytes);
256    }
257
258    /// Untrack a specific resource
259    pub async fn untrack_resource(&self, resource_id: &str) -> Option<u64> {
260        let mut trackers = self.resource_trackers.write().await;
261        trackers.remove(resource_id)
262    }
263
264    /// Get memory pressure level (0-100)
265    pub async fn get_memory_pressure(&self) -> u8 {
266        let current = self.memory_usage.load(Ordering::Relaxed);
267        let max = self.config.max_memory_bytes;
268        ((current * 100) / max) as u8
269    }
270
271    /// Check if system is under memory pressure
272    pub async fn is_under_memory_pressure(&self) -> bool {
273        let pressure = self.get_memory_pressure().await;
274        pressure > self.config.memory_threshold_percent
275    }
276}
277
278/// Resource management errors
279#[derive(Debug, thiserror::Error)]
280pub enum ResourceError {
281    #[error("Maximum memory usage exceeded")]
282    MaxMemoryExceeded,
283    #[error("Maximum number of keys exceeded")]
284    MaxKeysExceeded,
285    #[error("Buffer overflow threshold exceeded")]
286    BufferOverflowThresholdExceeded,
287    #[error("Circuit breaker is open")]
288    CircuitBreakerOpen,
289    #[error("Resource allocation failed: {0}")]
290    AllocationFailed(String),
291}
292
293/// Global resource manager instance
294lazy_static::lazy_static! {
295    pub static ref GLOBAL_RESOURCE_MANAGER: Arc<ResourceManager> = Arc::new(ResourceManager::new());
296}
297
298/// Get the global resource manager
299pub fn get_global_resource_manager() -> Arc<ResourceManager> {
300    GLOBAL_RESOURCE_MANAGER.clone()
301}