rs2_stream/
resource_manager.rs1use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
2use std::sync::Arc;
3use tokio::sync::RwLock;
4use std::collections::HashMap;
5use std::time::{Duration, Instant};
6
7#[derive(Debug, Clone)]
9pub struct ResourceConfig {
10 pub max_memory_bytes: u64,
12 pub max_keys: usize,
14 pub memory_threshold_percent: u8,
16 pub buffer_overflow_threshold: usize,
18 pub cleanup_interval: Duration,
20 pub emergency_cleanup_threshold: u8,
22}
23
24impl Default for ResourceConfig {
25 fn default() -> Self {
26 Self {
27 max_memory_bytes: 1024 * 1024 * 1024, max_keys: 100_000,
29 memory_threshold_percent: 80,
30 buffer_overflow_threshold: 10_000,
31 cleanup_interval: Duration::from_secs(30),
32 emergency_cleanup_threshold: 95,
33 }
34 }
35}
36
37#[derive(Debug, Clone, PartialEq)]
39pub enum CircuitBreakerState {
40 Closed,
41 Open,
42 HalfOpen,
43}
44
45#[derive(Debug, Clone)]
47pub struct ResourceMetrics {
48 pub current_memory_bytes: u64,
49 pub peak_memory_bytes: u64,
50 pub active_keys: usize,
51 pub buffer_overflow_count: u64,
52 pub circuit_breaker_trips: u64,
53 pub emergency_cleanups: u64,
54}
55
56pub struct ResourceManager {
58 config: ResourceConfig,
59 memory_usage: Arc<AtomicU64>,
60 active_keys: Arc<AtomicUsize>,
61 buffer_overflow_count: Arc<AtomicU64>,
62 circuit_breaker_trips: Arc<AtomicU64>,
63 emergency_cleanups: Arc<AtomicU64>,
64 peak_memory: Arc<AtomicU64>,
65 circuit_breaker_state: Arc<RwLock<CircuitBreakerState>>,
66 last_cleanup: Arc<RwLock<Instant>>,
67 resource_trackers: Arc<RwLock<HashMap<String, u64>>>,
68}
69
70impl ResourceManager {
71 pub fn new() -> Self {
73 Self::with_config(ResourceConfig::default())
74 }
75
76 pub fn with_config(config: ResourceConfig) -> Self {
78 Self {
79 config,
80 memory_usage: Arc::new(AtomicU64::new(0)),
81 active_keys: Arc::new(AtomicUsize::new(0)),
82 buffer_overflow_count: Arc::new(AtomicU64::new(0)),
83 circuit_breaker_trips: Arc::new(AtomicU64::new(0)),
84 emergency_cleanups: Arc::new(AtomicU64::new(0)),
85 peak_memory: Arc::new(AtomicU64::new(0)),
86 circuit_breaker_state: Arc::new(RwLock::new(CircuitBreakerState::Closed)),
87 last_cleanup: Arc::new(RwLock::new(Instant::now())),
88 resource_trackers: Arc::new(RwLock::new(HashMap::new())),
89 }
90 }
91
92 pub async fn track_memory_allocation(&self, bytes: u64) -> Result<(), ResourceError> {
94 let current = self.memory_usage.fetch_add(bytes, Ordering::Relaxed) + bytes;
95
96 let mut peak = self.peak_memory.load(Ordering::Relaxed);
98 while current > peak {
99 match self.peak_memory.compare_exchange_weak(
100 peak, current, Ordering::Relaxed, Ordering::Relaxed
101 ) {
102 Ok(_) => break,
103 Err(new_peak) => peak = new_peak,
104 }
105 }
106
107 if current > self.config.max_memory_bytes * self.config.memory_threshold_percent as u64 / 100 {
109 self.trip_circuit_breaker().await;
110 }
111
112 if current > self.config.max_memory_bytes * self.config.emergency_cleanup_threshold as u64 / 100 {
114 self.emergency_cleanup().await;
115 }
116
117 Ok(())
118 }
119
120 pub async fn track_memory_deallocation(&self, bytes: u64) {
122 self.memory_usage.fetch_sub(bytes, Ordering::Relaxed);
123 }
124
125 pub async fn track_key_creation(&self) -> Result<(), ResourceError> {
127 let current_keys = self.active_keys.fetch_add(1, Ordering::Relaxed) + 1;
128
129 if current_keys > self.config.max_keys {
130 self.active_keys.fetch_sub(1, Ordering::Relaxed);
131 return Err(ResourceError::MaxKeysExceeded);
132 }
133
134 Ok(())
135 }
136
137 pub async fn track_key_removal(&self) {
139 self.active_keys.fetch_sub(1, Ordering::Relaxed);
140 }
141
142 pub async fn track_buffer_overflow(&self) -> Result<(), ResourceError> {
144 self.buffer_overflow_count.fetch_add(1, Ordering::Relaxed);
145
146 let overflow_count = self.buffer_overflow_count.load(Ordering::Relaxed);
147 if overflow_count > self.config.buffer_overflow_threshold as u64 {
148 self.trip_circuit_breaker().await;
149 return Err(ResourceError::BufferOverflowThresholdExceeded);
150 }
151
152 Ok(())
153 }
154
155 pub async fn get_metrics(&self) -> ResourceMetrics {
157 ResourceMetrics {
158 current_memory_bytes: self.memory_usage.load(Ordering::Relaxed),
159 peak_memory_bytes: self.peak_memory.load(Ordering::Relaxed),
160 active_keys: self.active_keys.load(Ordering::Relaxed),
161 buffer_overflow_count: self.buffer_overflow_count.load(Ordering::Relaxed),
162 circuit_breaker_trips: self.circuit_breaker_trips.load(Ordering::Relaxed),
163 emergency_cleanups: self.emergency_cleanups.load(Ordering::Relaxed),
164 }
165 }
166
167 pub async fn is_circuit_open(&self) -> bool {
169 let state = self.circuit_breaker_state.read().await;
170 *state == CircuitBreakerState::Open
171 }
172
173 async fn trip_circuit_breaker(&self) {
175 let mut state = self.circuit_breaker_state.write().await;
176 if *state == CircuitBreakerState::Closed {
177 *state = CircuitBreakerState::Open;
178 self.circuit_breaker_trips.fetch_add(1, Ordering::Relaxed);
179 }
180 }
181
182 pub async fn reset_circuit_breaker(&self) {
184 let mut state = self.circuit_breaker_state.write().await;
185 *state = CircuitBreakerState::HalfOpen;
186 }
187
188 pub async fn close_circuit_breaker(&self) {
190 let mut state = self.circuit_breaker_state.write().await;
191 *state = CircuitBreakerState::Closed;
192 }
193
194 async fn emergency_cleanup(&self) {
196 self.emergency_cleanups.fetch_add(1, Ordering::Relaxed);
197 log::warn!("Emergency cleanup triggered - memory pressure detected");
198 self.perform_cleanup().await;
199 self.perform_emergency_measures().await;
200 if self.is_under_memory_pressure().await {
201 log::error!("Memory pressure persists after emergency cleanup - tripping circuit breaker");
202 self.trip_circuit_breaker().await;
203 }
204 }
205
206 async fn perform_emergency_measures(&self) {
208 let mut trackers = self.resource_trackers.write().await;
209 trackers.clear();
210 drop(trackers);
211 log::info!("Emergency measures completed - cleared non-essential resources");
212 }
213
214 pub async fn periodic_cleanup(&self) {
216 let mut last_cleanup = self.last_cleanup.write().await;
217 if last_cleanup.elapsed() >= self.config.cleanup_interval {
218 self.perform_cleanup().await;
219 *last_cleanup = Instant::now();
220 }
221 }
222
223 async fn perform_cleanup(&self) {
225 log::debug!("Performing periodic cleanup");
226
227 let mut trackers = self.resource_trackers.write().await;
228 let before_count = trackers.len();
229
230
231 if trackers.len() > 1000 {
232 let to_remove: Vec<String> = trackers.keys().take(100).cloned().collect();
233 for key in to_remove {
234 trackers.remove(&key);
235 }
236 }
237
238 let after_count = trackers.len();
239 drop(trackers);
240
241 if before_count != after_count {
242 log::info!("Cleanup completed: removed {} resources", before_count - after_count);
243 }
244
245 let overflow_count = self.buffer_overflow_count.load(Ordering::Relaxed);
246 if overflow_count > 1000 {
247 self.buffer_overflow_count.store(0, Ordering::Relaxed);
248 log::info!("Reset buffer overflow counter");
249 }
250 }
251
252 pub async fn track_resource(&self, resource_id: String, size_bytes: u64) {
254 let mut trackers = self.resource_trackers.write().await;
255 trackers.insert(resource_id, size_bytes);
256 }
257
258 pub async fn untrack_resource(&self, resource_id: &str) -> Option<u64> {
260 let mut trackers = self.resource_trackers.write().await;
261 trackers.remove(resource_id)
262 }
263
264 pub async fn get_memory_pressure(&self) -> u8 {
266 let current = self.memory_usage.load(Ordering::Relaxed);
267 let max = self.config.max_memory_bytes;
268 ((current * 100) / max) as u8
269 }
270
271 pub async fn is_under_memory_pressure(&self) -> bool {
273 let pressure = self.get_memory_pressure().await;
274 pressure > self.config.memory_threshold_percent
275 }
276}
277
278#[derive(Debug, thiserror::Error)]
280pub enum ResourceError {
281 #[error("Maximum memory usage exceeded")]
282 MaxMemoryExceeded,
283 #[error("Maximum number of keys exceeded")]
284 MaxKeysExceeded,
285 #[error("Buffer overflow threshold exceeded")]
286 BufferOverflowThresholdExceeded,
287 #[error("Circuit breaker is open")]
288 CircuitBreakerOpen,
289 #[error("Resource allocation failed: {0}")]
290 AllocationFailed(String),
291}
292
293lazy_static::lazy_static! {
295 pub static ref GLOBAL_RESOURCE_MANAGER: Arc<ResourceManager> = Arc::new(ResourceManager::new());
296}
297
298pub fn get_global_resource_manager() -> Arc<ResourceManager> {
300 GLOBAL_RESOURCE_MANAGER.clone()
301}