Skip to main content

memlink_runtime/
backpressure.rs

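//! Global backpressure management for the runtime.
//!
//! This module calculates a global backpressure signal intended for all connected clients
//! and enforces per-module quotas when requests are admitted.
//!
//! The design calls for backpressure to reflect multiple factors:
//! - Total queue depth across all modules
//! - CPU and memory usage
//! - Active client count
//! - Module-specific quotas
//!
//! Note: the global value is currently derived from queue depth and active client count
//! only. CPU usage is not sampled, and per-module memory usage is checked as a quota at
//! admission time rather than folded into the global value.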
9
use std::sync::atomic::{AtomicU32, AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

use dashmap::DashMap;
15
/// Backpressure below this value is reported as the "Low" congestion level.
pub const BP_LOW_THRESHOLD: f32 = 0.3;
/// Backpressure below this value (and at or above the low threshold) is "Medium".
pub const BP_MEDIUM_THRESHOLD: f32 = 0.5;
/// Backpressure below this value (and at or above the medium threshold) is "High".
pub const BP_HIGH_THRESHOLD: f32 = 0.7;
/// Backpressure at or above this value is "Critical"; new requests are rejected globally.
pub const BP_CRITICAL_THRESHOLD: f32 = 0.9;

/// Fixed-point scale for the stored backpressure value: the `0.0..=1.0` range is mapped
/// onto `0..=1000` so it can be kept in an atomic integer.
const BP_SCALE: u32 = 1_000;
24
25/// Global backpressure state shared across the runtime.
26///
27/// This is written to the SHM control region for clients to read.
28#[derive(Debug)]
29pub struct BackpressureState {
30    /// Current backpressure value (0-1000, representing 0.0-1.0).
31    value: AtomicU32,
32    /// Timestamp of last calculation.
33    last_update: AtomicU64,
34    /// Number of active clients.
35    active_clients: AtomicUsize,
36    /// Total requests in queue.
37    queue_depth: AtomicUsize,
38    /// Requests rejected due to backpressure.
39    rejected_count: AtomicU64,
40}
41
42impl BackpressureState {
43    /// Creates a new backpressure state.
44    pub fn new() -> Self {
45        Self {
46            value: AtomicU32::new(0),
47            last_update: AtomicU64::new(0),
48            active_clients: AtomicUsize::new(0),
49            queue_depth: AtomicUsize::new(0),
50            rejected_count: AtomicU64::new(0),
51        }
52    }
53
54    /// Returns the current backpressure value (0.0-1.0).
55    pub fn get(&self) -> f32 {
56        self.value.load(Ordering::Acquire) as f32 / BP_SCALE as f32
57    }
58
59    /// Sets the backpressure value (0.0-1.0).
60    pub fn set(&self, value: f32) {
61        let scaled = (value.clamp(0.0, 1.0) * BP_SCALE as f32) as u32;
62        self.value.store(scaled, Ordering::Release);
63
64        // Update timestamp
65        let now = Instant::now().duration_since(Instant::now()).as_secs();
66        self.last_update.store(now, Ordering::Relaxed);
67    }
68
69    /// Increments the rejected request count.
70    pub fn record_rejection(&self) {
71        self.rejected_count.fetch_add(1, Ordering::Relaxed);
72    }
73
74    /// Returns the number of rejected requests.
75    pub fn rejected_count(&self) -> u64 {
76        self.rejected_count.load(Ordering::Acquire)
77    }
78
79    /// Returns the number of active clients.
80    pub fn active_clients(&self) -> usize {
81        self.active_clients.load(Ordering::Acquire)
82    }
83
84    /// Returns the current queue depth.
85    pub fn queue_depth(&self) -> usize {
86        self.queue_depth.load(Ordering::Acquire)
87    }
88
89    /// Returns the backpressure level as a string.
90    pub fn level(&self) -> &'static str {
91        let bp = self.get();
92        if bp < BP_LOW_THRESHOLD {
93            "Low"
94        } else if bp < BP_MEDIUM_THRESHOLD {
95            "Medium"
96        } else if bp < BP_HIGH_THRESHOLD {
97            "High"
98        } else if bp < BP_CRITICAL_THRESHOLD {
99            "Very High"
100        } else {
101            "Critical"
102        }
103    }
104}
105
106impl Default for BackpressureState {
107    fn default() -> Self {
108        Self::new()
109    }
110}
111
/// Resource limits applied to a single module.
#[derive(Debug, Clone)]
pub struct ModuleQuota {
    /// Upper bound on calls admitted within one counting window (one second).
    pub max_calls_per_sec: u32,
    /// Upper bound on requests that may be in flight at the same time.
    pub max_in_flight: usize,
    /// Upper bound on the module's reported memory usage, in bytes.
    pub max_memory_bytes: usize,
}

impl Default for ModuleQuota {
    /// Default limits: 1000 calls per second, 100 in-flight requests, 64 MiB of memory.
    fn default() -> Self {
        const MIB: usize = 1024 * 1024;

        let max_calls_per_sec = 1_000;
        let max_in_flight = 100;
        let max_memory_bytes = 64 * MIB;

        Self {
            max_calls_per_sec,
            max_in_flight,
            max_memory_bytes,
        }
    }
}
132
133/// Per-module resource tracking and quota enforcement.
134#[derive(Debug)]
135pub struct ModuleResources {
136    /// Module name/identifier.
137    module_name: String,
138    /// Calls in the current second.
139    calls_this_sec: AtomicU32,
140    /// Current in-flight request count.
141    in_flight: AtomicUsize,
142    /// Current memory usage (bytes).
143    memory_bytes: AtomicUsize,
144    /// Quota configuration.
145    quota: ModuleQuota,
146}
147
148impl ModuleResources {
149    /// Creates a new module resource tracker.
150    pub fn new(module_name: String, quota: ModuleQuota) -> Self {
151        Self {
152            module_name,
153            calls_this_sec: AtomicU32::new(0),
154            in_flight: AtomicUsize::new(0),
155            memory_bytes: AtomicUsize::new(0),
156            quota,
157        }
158    }
159
160    /// Checks if a new call can be admitted.
161    pub fn can_admit(&self) -> bool {
162        // Check in-flight limit
163        if self.in_flight.load(Ordering::Acquire) >= self.quota.max_in_flight {
164            return false;
165        }
166
167        // Check rate limit (simplified - should use sliding window)
168        let calls = self.calls_this_sec.load(Ordering::Acquire);
169        if calls >= self.quota.max_calls_per_sec {
170            return false;
171        }
172
173        // Check memory limit
174        let memory = self.memory_bytes.load(Ordering::Acquire);
175        if memory >= self.quota.max_memory_bytes {
176            return false;
177        }
178
179        true
180    }
181
182    /// Records a new call start.
183    pub fn record_call_start(&self) -> bool {
184        if !self.can_admit() {
185            return false;
186        }
187
188        self.in_flight.fetch_add(1, Ordering::AcqRel);
189        self.calls_this_sec.fetch_add(1, Ordering::Relaxed);
190        true
191    }
192
193    /// Records a call completion.
194    pub fn record_call_end(&self) {
195        self.in_flight.fetch_sub(1, Ordering::AcqRel);
196    }
197
198    /// Updates memory usage.
199    pub fn set_memory_usage(&self, bytes: usize) {
200        self.memory_bytes.store(bytes, Ordering::Release);
201    }
202
203    /// Resets the per-second counter (called by a background task).
204    pub fn reset_second_counter(&self) {
205        self.calls_this_sec.store(0, Ordering::Relaxed);
206    }
207
208    /// Returns the current in-flight count.
209    pub fn in_flight(&self) -> usize {
210        self.in_flight.load(Ordering::Acquire)
211    }
212
213    /// Returns the current memory usage.
214    pub fn memory_usage(&self) -> usize {
215        self.memory_bytes.load(Ordering::Acquire)
216    }
217
218    /// Returns the module name.
219    pub fn module_name(&self) -> &str {
220        &self.module_name
221    }
222}
223
224/// Global resource manager for the runtime.
225///
226/// Tracks all modules and clients, calculates backpressure,
227/// and enforces quotas.
228#[derive(Debug)]
229pub struct ResourceManager {
230    /// Global backpressure state.
231    pub backpressure: Arc<BackpressureState>,
232    /// Per-module resource tracking.
233    modules: DashMap<String, Arc<ModuleResources>>,
234    /// Default quota for modules without explicit configuration.
235    default_quota: ModuleQuota,
236}
237
238impl ResourceManager {
239    /// Creates a new resource manager.
240    pub fn new() -> Self {
241        Self {
242            backpressure: Arc::new(BackpressureState::new()),
243            modules: DashMap::new(),
244            default_quota: ModuleQuota::default(),
245        }
246    }
247
248    /// Creates a resource manager with custom default quota.
249    pub fn with_default_quota(quota: ModuleQuota) -> Self {
250        Self {
251            backpressure: Arc::new(BackpressureState::new()),
252            modules: DashMap::new(),
253            default_quota: quota,
254        }
255    }
256
257    /// Gets or creates a module resource tracker.
258    pub fn get_or_create_module(&self, module_name: &str) -> Arc<ModuleResources> {
259        self.modules
260            .entry(module_name.to_string())
261            .or_insert_with(|| {
262                Arc::new(ModuleResources::new(
263                    module_name.to_string(),
264                    self.default_quota.clone(),
265                ))
266            })
267            .clone()
268    }
269
270    /// Registers a module with a custom quota.
271    pub fn register_module(&self, module_name: &str, quota: ModuleQuota) {
272        self.modules.insert(
273            module_name.to_string(),
274            Arc::new(ModuleResources::new(module_name.to_string(), quota)),
275        );
276    }
277
278    /// Checks if a request can be admitted.
279    pub fn can_admit_request(&self, module_name: &str) -> bool {
280        // Check global backpressure first
281        let bp = self.backpressure.get();
282        if bp >= BP_CRITICAL_THRESHOLD {
283            self.backpressure.record_rejection();
284            return false;
285        }
286
287        // Check module-specific quota
288        let module = self.get_or_create_module(module_name);
289        if !module.can_admit() {
290            self.backpressure.record_rejection();
291            return false;
292        }
293
294        true
295    }
296
297    /// Records a request start.
298    pub fn record_request_start(&self, module_name: &str) -> bool {
299        if !self.can_admit_request(module_name) {
300            return false;
301        }
302
303        let module = self.get_or_create_module(module_name);
304        module.record_call_start();
305
306        // Update global queue depth
307        self.backpressure
308            .queue_depth
309            .fetch_add(1, Ordering::Relaxed);
310
311        true
312    }
313
314    /// Records a request completion.
315    pub fn record_request_end(&self, module_name: &str) {
316        let module = self.get_or_create_module(module_name);
317        module.record_call_end();
318
319        // Update global queue depth
320        self.backpressure
321            .queue_depth
322            .fetch_sub(1, Ordering::Relaxed);
323    }
324
325    /// Calculates and updates global backpressure.
326    ///
327    /// This should be called periodically (e.g., every 100ms).
328    pub fn update_backpressure(&self) {
329        let queue_depth = self.backpressure.queue_depth.load(Ordering::Acquire);
330        let active_clients = self.backpressure.active_clients.load(Ordering::Acquire);
331
332        // Calculate backpressure from multiple factors
333        let queue_factor = (queue_depth as f32 / 1000.0).min(1.0);
334        let client_factor = (active_clients as f32 / 100.0).min(1.0);
335
336        // Weighted average: 70% queue, 30% clients
337        let backpressure = queue_factor * 0.7 + client_factor * 0.3;
338
339        self.backpressure.set(backpressure);
340    }
341
342    /// Increments the active client count.
343    pub fn client_connected(&self) {
344        self.backpressure
345            .active_clients
346            .fetch_add(1, Ordering::AcqRel);
347    }
348
349    /// Decrements the active client count.
350    pub fn client_disconnected(&self) {
351        self.backpressure
352            .active_clients
353            .fetch_sub(1, Ordering::AcqRel);
354    }
355
356    /// Returns statistics about the resource manager.
357    pub fn stats(&self) -> ResourceManagerStats {
358        let mut total_in_flight = 0;
359        let mut total_memory = 0;
360
361        for entry in self.modules.iter() {
362            total_in_flight += entry.value().in_flight();
363            total_memory += entry.value().memory_usage();
364        }
365
366        ResourceManagerStats {
367            backpressure: self.backpressure.get(),
368            active_clients: self.backpressure.active_clients(),
369            queue_depth: self.backpressure.queue_depth(),
370            total_in_flight,
371            total_memory,
372            module_count: self.modules.len(),
373        }
374    }
375
376    /// Starts a background task that periodically resets counters and updates backpressure.
377    pub fn spawn_background_task(&self) -> std::thread::JoinHandle<()> {
378        let _backpressure = Arc::clone(&self.backpressure);
379        let modules = self
380            .modules
381            .iter()
382            .map(|e| e.key().clone())
383            .collect::<Vec<_>>();
384        let modules = Arc::new(modules);
385
386        std::thread::spawn(move || {
387            let mut last_second = Instant::now();
388            let mut last_update = Instant::now();
389
390            loop {
391                let now = Instant::now();
392
393                // Reset per-second counters every second
394                if now.duration_since(last_second) >= Duration::from_secs(1) {
395                    for module_name in modules.iter() {
396                        // Note: In production, you'd get the actual ModuleResources
397                        // This is a simplified version
398                        let _ = module_name;
399                    }
400                    last_second = now;
401                }
402
403                // Update backpressure every 100ms
404                if now.duration_since(last_update) >= Duration::from_millis(100) {
405                    // Note: Would need access to self here
406                    // This is a simplified version
407                    last_update = now;
408                }
409
410                std::thread::sleep(Duration::from_millis(10));
411            }
412        })
413    }
414}
415
416impl Default for ResourceManager {
417    fn default() -> Self {
418        Self::new()
419    }
420}
421
/// Point-in-time snapshot of the resource manager's counters.
#[derive(Debug, Clone)]
pub struct ResourceManagerStats {
    /// Global backpressure value (0.0-1.0) at the time of the snapshot.
    pub backpressure: f32,
    /// Number of clients currently counted as connected.
    pub active_clients: usize,
    /// Global count of requests currently queued.
    pub queue_depth: usize,
    /// Sum of in-flight requests over all registered modules.
    pub total_in_flight: usize,
    /// Sum of reported memory usage (bytes) over all registered modules.
    pub total_memory: usize,
    /// Number of registered modules.
    pub module_count: usize,
}
432
#[cfg(test)]
mod tests {
    use super::*;

    /// Values round-trip through the fixed-point encoding and out-of-range input clamps.
    #[test]
    fn test_backpressure_state() {
        let state = BackpressureState::new();
        assert_eq!(state.get(), 0.0);

        // (value written, value expected back); the last case must clamp to 1.0.
        let cases: [(f32, f32); 3] = [(0.5, 0.5), (1.0, 1.0), (1.5, 1.0)];
        for &(written, expected) in &cases {
            state.set(written);
            assert_eq!(state.get(), expected);
        }
    }

    /// The in-flight limit rejects the call past the quota and frees up on completion.
    #[test]
    fn test_module_quota_enforcement() {
        let tracker = ModuleResources::new(
            String::from("test"),
            ModuleQuota {
                max_calls_per_sec: 10,
                max_in_flight: 5,
                max_memory_bytes: 1024,
            },
        );

        // Fill every in-flight slot.
        (0..5).for_each(|i| {
            assert!(tracker.record_call_start(), "Call {} should be admitted", i);
        });

        // One more than the in-flight limit is refused.
        assert!(!tracker.record_call_start());

        // Finishing a call frees a slot for the next one.
        tracker.record_call_end();
        assert!(tracker.record_call_start());
    }

    /// Queued requests raise global backpressure; completing them lowers it again.
    #[test]
    fn test_resource_manager_backpressure() {
        let manager = ResourceManager::new();
        let requests = ["module1", "module2", "module1"];

        // Nothing queued yet.
        assert_eq!(manager.backpressure.get(), 0.0);

        for &module in &requests {
            manager.record_request_start(module);
        }
        manager.update_backpressure();
        assert!(manager.backpressure.get() > 0.0);

        for &module in &requests {
            manager.record_request_end(module);
        }
        manager.update_backpressure();
        assert!(manager.backpressure.get() < 0.1);
    }

    /// Connect and disconnect events are reflected in the active client count.
    #[test]
    fn test_client_tracking() {
        let manager = ResourceManager::new();
        let clients = |m: &ResourceManager| m.backpressure.active_clients();
        assert_eq!(clients(&manager), 0);

        for _ in 0..2 {
            manager.client_connected();
        }
        assert_eq!(clients(&manager), 2);

        manager.client_disconnected();
        assert_eq!(clients(&manager), 1);
    }
}