1use std::sync::atomic::{AtomicU32, AtomicU64, AtomicUsize, Ordering};
11use std::sync::Arc;
12use std::time::{Duration, Instant};
13
14use dashmap::DashMap;
15
/// Exclusive upper bound of the "Low" backpressure band.
pub const BP_LOW_THRESHOLD: f32 = 0.3;
/// Exclusive upper bound of the "Medium" backpressure band.
pub const BP_MEDIUM_THRESHOLD: f32 = 0.5;
/// Exclusive upper bound of the "High" backpressure band.
pub const BP_HIGH_THRESHOLD: f32 = 0.7;
/// Backpressure at or above this value is "Critical"; below it (and at or above
/// [`BP_HIGH_THRESHOLD`]) it is "Very High".
pub const BP_CRITICAL_THRESHOLD: f32 = 0.9;

/// Fixed-point scale: a backpressure fraction in `[0.0, 1.0]` is stored as an integer in
/// `0..=BP_SCALE`.
const BP_SCALE: u32 = 1_000;
24
25#[derive(Debug)]
29pub struct BackpressureState {
30 value: AtomicU32,
32 last_update: AtomicU64,
34 active_clients: AtomicUsize,
36 queue_depth: AtomicUsize,
38 rejected_count: AtomicU64,
40}
41
42impl BackpressureState {
43 pub fn new() -> Self {
45 Self {
46 value: AtomicU32::new(0),
47 last_update: AtomicU64::new(0),
48 active_clients: AtomicUsize::new(0),
49 queue_depth: AtomicUsize::new(0),
50 rejected_count: AtomicU64::new(0),
51 }
52 }
53
54 pub fn get(&self) -> f32 {
56 self.value.load(Ordering::Acquire) as f32 / BP_SCALE as f32
57 }
58
59 pub fn set(&self, value: f32) {
61 let scaled = (value.clamp(0.0, 1.0) * BP_SCALE as f32) as u32;
62 self.value.store(scaled, Ordering::Release);
63
64 let now = Instant::now().duration_since(Instant::now()).as_secs();
66 self.last_update.store(now, Ordering::Relaxed);
67 }
68
69 pub fn record_rejection(&self) {
71 self.rejected_count.fetch_add(1, Ordering::Relaxed);
72 }
73
74 pub fn rejected_count(&self) -> u64 {
76 self.rejected_count.load(Ordering::Acquire)
77 }
78
79 pub fn active_clients(&self) -> usize {
81 self.active_clients.load(Ordering::Acquire)
82 }
83
84 pub fn queue_depth(&self) -> usize {
86 self.queue_depth.load(Ordering::Acquire)
87 }
88
89 pub fn level(&self) -> &'static str {
91 let bp = self.get();
92 if bp < BP_LOW_THRESHOLD {
93 "Low"
94 } else if bp < BP_MEDIUM_THRESHOLD {
95 "Medium"
96 } else if bp < BP_HIGH_THRESHOLD {
97 "High"
98 } else if bp < BP_CRITICAL_THRESHOLD {
99 "Very High"
100 } else {
101 "Critical"
102 }
103 }
104}
105
106impl Default for BackpressureState {
107 fn default() -> Self {
108 Self::new()
109 }
110}
111
/// Resource limits applied to a single module.
#[derive(Debug, Clone)]
pub struct ModuleQuota {
    /// Maximum number of calls that may be started per one-second window.
    pub max_calls_per_sec: u32,
    /// Maximum number of calls that may be running at the same time.
    pub max_in_flight: usize,
    /// Memory ceiling in bytes; a module at or above it admits no further calls.
    pub max_memory_bytes: usize,
}

impl Default for ModuleQuota {
    /// Defaults: 1000 calls per second, 100 concurrent calls, 64 MiB of memory.
    fn default() -> Self {
        const MIB: usize = 1024 * 1024;

        Self {
            max_calls_per_sec: 1_000,
            max_in_flight: 100,
            max_memory_bytes: 64 * MIB,
        }
    }
}
132
133#[derive(Debug)]
135pub struct ModuleResources {
136 module_name: String,
138 calls_this_sec: AtomicU32,
140 in_flight: AtomicUsize,
142 memory_bytes: AtomicUsize,
144 quota: ModuleQuota,
146}
147
148impl ModuleResources {
149 pub fn new(module_name: String, quota: ModuleQuota) -> Self {
151 Self {
152 module_name,
153 calls_this_sec: AtomicU32::new(0),
154 in_flight: AtomicUsize::new(0),
155 memory_bytes: AtomicUsize::new(0),
156 quota,
157 }
158 }
159
160 pub fn can_admit(&self) -> bool {
162 if self.in_flight.load(Ordering::Acquire) >= self.quota.max_in_flight {
164 return false;
165 }
166
167 let calls = self.calls_this_sec.load(Ordering::Acquire);
169 if calls >= self.quota.max_calls_per_sec {
170 return false;
171 }
172
173 let memory = self.memory_bytes.load(Ordering::Acquire);
175 if memory >= self.quota.max_memory_bytes {
176 return false;
177 }
178
179 true
180 }
181
182 pub fn record_call_start(&self) -> bool {
184 if !self.can_admit() {
185 return false;
186 }
187
188 self.in_flight.fetch_add(1, Ordering::AcqRel);
189 self.calls_this_sec.fetch_add(1, Ordering::Relaxed);
190 true
191 }
192
193 pub fn record_call_end(&self) {
195 self.in_flight.fetch_sub(1, Ordering::AcqRel);
196 }
197
198 pub fn set_memory_usage(&self, bytes: usize) {
200 self.memory_bytes.store(bytes, Ordering::Release);
201 }
202
203 pub fn reset_second_counter(&self) {
205 self.calls_this_sec.store(0, Ordering::Relaxed);
206 }
207
208 pub fn in_flight(&self) -> usize {
210 self.in_flight.load(Ordering::Acquire)
211 }
212
213 pub fn memory_usage(&self) -> usize {
215 self.memory_bytes.load(Ordering::Acquire)
216 }
217
218 pub fn module_name(&self) -> &str {
220 &self.module_name
221 }
222}
223
224#[derive(Debug)]
229pub struct ResourceManager {
230 pub backpressure: Arc<BackpressureState>,
232 modules: DashMap<String, Arc<ModuleResources>>,
234 default_quota: ModuleQuota,
236}
237
238impl ResourceManager {
239 pub fn new() -> Self {
241 Self {
242 backpressure: Arc::new(BackpressureState::new()),
243 modules: DashMap::new(),
244 default_quota: ModuleQuota::default(),
245 }
246 }
247
248 pub fn with_default_quota(quota: ModuleQuota) -> Self {
250 Self {
251 backpressure: Arc::new(BackpressureState::new()),
252 modules: DashMap::new(),
253 default_quota: quota,
254 }
255 }
256
257 pub fn get_or_create_module(&self, module_name: &str) -> Arc<ModuleResources> {
259 self.modules
260 .entry(module_name.to_string())
261 .or_insert_with(|| {
262 Arc::new(ModuleResources::new(
263 module_name.to_string(),
264 self.default_quota.clone(),
265 ))
266 })
267 .clone()
268 }
269
270 pub fn register_module(&self, module_name: &str, quota: ModuleQuota) {
272 self.modules.insert(
273 module_name.to_string(),
274 Arc::new(ModuleResources::new(module_name.to_string(), quota)),
275 );
276 }
277
278 pub fn can_admit_request(&self, module_name: &str) -> bool {
280 let bp = self.backpressure.get();
282 if bp >= BP_CRITICAL_THRESHOLD {
283 self.backpressure.record_rejection();
284 return false;
285 }
286
287 let module = self.get_or_create_module(module_name);
289 if !module.can_admit() {
290 self.backpressure.record_rejection();
291 return false;
292 }
293
294 true
295 }
296
297 pub fn record_request_start(&self, module_name: &str) -> bool {
299 if !self.can_admit_request(module_name) {
300 return false;
301 }
302
303 let module = self.get_or_create_module(module_name);
304 module.record_call_start();
305
306 self.backpressure
308 .queue_depth
309 .fetch_add(1, Ordering::Relaxed);
310
311 true
312 }
313
314 pub fn record_request_end(&self, module_name: &str) {
316 let module = self.get_or_create_module(module_name);
317 module.record_call_end();
318
319 self.backpressure
321 .queue_depth
322 .fetch_sub(1, Ordering::Relaxed);
323 }
324
325 pub fn update_backpressure(&self) {
329 let queue_depth = self.backpressure.queue_depth.load(Ordering::Acquire);
330 let active_clients = self.backpressure.active_clients.load(Ordering::Acquire);
331
332 let queue_factor = (queue_depth as f32 / 1000.0).min(1.0);
334 let client_factor = (active_clients as f32 / 100.0).min(1.0);
335
336 let backpressure = queue_factor * 0.7 + client_factor * 0.3;
338
339 self.backpressure.set(backpressure);
340 }
341
342 pub fn client_connected(&self) {
344 self.backpressure
345 .active_clients
346 .fetch_add(1, Ordering::AcqRel);
347 }
348
349 pub fn client_disconnected(&self) {
351 self.backpressure
352 .active_clients
353 .fetch_sub(1, Ordering::AcqRel);
354 }
355
356 pub fn stats(&self) -> ResourceManagerStats {
358 let mut total_in_flight = 0;
359 let mut total_memory = 0;
360
361 for entry in self.modules.iter() {
362 total_in_flight += entry.value().in_flight();
363 total_memory += entry.value().memory_usage();
364 }
365
366 ResourceManagerStats {
367 backpressure: self.backpressure.get(),
368 active_clients: self.backpressure.active_clients(),
369 queue_depth: self.backpressure.queue_depth(),
370 total_in_flight,
371 total_memory,
372 module_count: self.modules.len(),
373 }
374 }
375
376 pub fn spawn_background_task(&self) -> std::thread::JoinHandle<()> {
378 let _backpressure = Arc::clone(&self.backpressure);
379 let modules = self
380 .modules
381 .iter()
382 .map(|e| e.key().clone())
383 .collect::<Vec<_>>();
384 let modules = Arc::new(modules);
385
386 std::thread::spawn(move || {
387 let mut last_second = Instant::now();
388 let mut last_update = Instant::now();
389
390 loop {
391 let now = Instant::now();
392
393 if now.duration_since(last_second) >= Duration::from_secs(1) {
395 for module_name in modules.iter() {
396 let _ = module_name;
399 }
400 last_second = now;
401 }
402
403 if now.duration_since(last_update) >= Duration::from_millis(100) {
405 last_update = now;
408 }
409
410 std::thread::sleep(Duration::from_millis(10));
411 }
412 })
413 }
414}
415
416impl Default for ResourceManager {
417 fn default() -> Self {
418 Self::new()
419 }
420}
421
/// Snapshot of the resource manager's counters, as returned by `ResourceManager::stats`.
#[derive(Debug, Clone)]
pub struct ResourceManagerStats {
    /// Global backpressure fraction at the time of the snapshot.
    pub backpressure: f32,
    /// Value of the active-client counter.
    pub active_clients: usize,
    /// Value of the global queue-depth counter.
    pub queue_depth: usize,
    /// Sum of the in-flight call counts of all modules.
    pub total_in_flight: usize,
    /// Sum of the recorded memory usage of all modules, in bytes.
    pub total_memory: usize,
    /// Number of modules in the registry.
    pub module_count: usize,
}
432
#[cfg(test)]
mod tests {
    use super::*;

    /// `set` stores values inside `[0, 1]` unchanged and clamps values outside it.
    #[test]
    fn test_backpressure_state() {
        let bp = BackpressureState::new();
        assert_eq!(bp.get(), 0.0);

        let cases: [(f32, f32); 4] = [(0.5, 0.5), (1.0, 1.0), (1.5, 1.0), (-0.5, 0.0)];
        for &(input, expected) in cases.iter() {
            bp.set(input);
            assert_eq!(bp.get(), expected, "set({}) should read back as {}", input, expected);
        }
    }

    /// The in-flight limit rejects the call past the cap and frees up after a call ends.
    #[test]
    fn test_module_quota_enforcement() {
        let quota = ModuleQuota {
            max_calls_per_sec: 10,
            max_in_flight: 5,
            max_memory_bytes: 1024,
        };
        let module = ModuleResources::new("test".to_string(), quota);

        for i in 0..5 {
            assert!(module.record_call_start(), "Call {} should be admitted", i);
        }
        assert_eq!(module.in_flight(), 5);

        // The cap is reached: the next call is refused and leaves the counter untouched.
        assert!(!module.record_call_start());
        assert_eq!(module.in_flight(), 5);

        module.record_call_end();
        assert_eq!(module.in_flight(), 4);

        assert!(module.record_call_start());
        assert_eq!(module.in_flight(), 5);
    }

    /// Backpressure rises while requests are in flight and falls back once they end.
    #[test]
    fn test_resource_manager_backpressure() {
        let rm = ResourceManager::new();
        assert_eq!(rm.backpressure.get(), 0.0);

        // Check the admission result: a silently refused request would make the
        // assertions below meaningless.
        for name in ["module1", "module2", "module1"].iter() {
            assert!(rm.record_request_start(name), "{} should be admitted", name);
        }

        let busy = rm.stats();
        assert_eq!(busy.queue_depth, 3);
        assert_eq!(busy.total_in_flight, 3);
        assert_eq!(busy.module_count, 2);
        assert_eq!(rm.backpressure.rejected_count(), 0);

        rm.update_backpressure();
        assert!(rm.backpressure.get() > 0.0);

        for name in ["module1", "module2", "module1"].iter() {
            rm.record_request_end(name);
        }

        let idle = rm.stats();
        assert_eq!(idle.queue_depth, 0);
        assert_eq!(idle.total_in_flight, 0);

        rm.update_backpressure();
        assert!(rm.backpressure.get() < 0.1);
    }

    /// Connect and disconnect events are reflected in the active-client counter.
    #[test]
    fn test_client_tracking() {
        let rm = ResourceManager::new();
        assert_eq!(rm.backpressure.active_clients(), 0);

        rm.client_connected();
        rm.client_connected();
        assert_eq!(rm.backpressure.active_clients(), 2);

        rm.client_disconnected();
        assert_eq!(rm.backpressure.active_clients(), 1);
    }
}