Skip to main content

hyperi_rustlib/memory/
guard.rs

1// Project:   hyperi-rustlib
2// File:      src/memory/guard.rs
3// Purpose:   Memory guard with backpressure signals
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Memory guard with backpressure signals.
10
11use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
12
13use super::cgroup;
14
/// Look up the environment variable named `{prefix}_{suffix}` and parse its
/// value as `T`.
///
/// Returns `None` when the variable is unset, is not valid Unicode, or its
/// value does not parse as `T`.
fn env_parsed<T: std::str::FromStr>(prefix: &str, suffix: &str) -> Option<T> {
    let key = format!("{prefix}_{suffix}");
    let raw = std::env::var(key).ok()?;
    raw.parse().ok()
}
21
/// Coarse classification of tracked usage relative to the effective limit.
///
/// Variants are listed from least to most severe.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum MemoryPressure {
    /// Usage is below 50% of the limit (and below the pressure threshold).
    Low,
    /// Usage is at least 50% of the limit but still below the pressure
    /// threshold.
    Medium,
    /// Usage has reached or exceeded the pressure threshold -- apply
    /// backpressure.
    High,
}
32
33/// Configuration for `MemoryGuard`.
34///
35/// When the `config` feature is enabled, this can be loaded from the config
36/// cascade under the `memory` key:
37///
38/// ```yaml
39/// memory:
40///   limit_bytes: 0           # 0 = auto-detect from cgroup/system
41///   pressure_threshold: 0.80 # backpressure at 80% of effective limit
42///   cgroup_headroom: 0.85    # use 85% of cgroup limit
43/// ```
44#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
45pub struct MemoryGuardConfig {
46    /// Explicit memory limit in bytes. 0 = auto-detect from cgroup/system.
47    #[serde(default)]
48    pub limit_bytes: u64,
49    /// Fraction of limit at which backpressure activates (default 0.8).
50    #[serde(default = "default_pressure_threshold")]
51    pub pressure_threshold: f64,
52    /// Fraction of cgroup limit to use as the effective limit (default 0.85).
53    /// Leaves headroom for the process itself (stack, code, etc.).
54    #[serde(default = "default_cgroup_headroom")]
55    pub cgroup_headroom: f64,
56}
57
58fn default_pressure_threshold() -> f64 {
59    DEFAULT_PRESSURE_THRESHOLD
60}
61
62fn default_cgroup_headroom() -> f64 {
63    DEFAULT_CGROUP_HEADROOM
64}
65
/// Share of the cgroup limit treated as usable when nothing else is
/// configured: 85%.
///
/// Rationale: Rust has no GC, so no collection-spike headroom is needed
/// (unlike JVM 75% / Go 80%). The remaining 15% covers jemalloc
/// fragmentation, kernel overhead, and page cache.
const DEFAULT_CGROUP_HEADROOM: f64 = 0.85;
71
/// Share of the effective limit at which backpressure starts when nothing
/// else is configured: 80%.
///
/// Combined with the default 85% cgroup headroom, backpressure activates at
/// roughly 68% (0.85 x 0.80) of the actual cgroup limit. Matches the
/// OTel Collector's `limit_percentage: 80` philosophy.
const DEFAULT_PRESSURE_THRESHOLD: f64 = 0.80;
77
78impl Default for MemoryGuardConfig {
79    fn default() -> Self {
80        Self {
81            limit_bytes: 0, // auto-detect
82            pressure_threshold: DEFAULT_PRESSURE_THRESHOLD,
83            cgroup_headroom: DEFAULT_CGROUP_HEADROOM,
84        }
85    }
86}
87
88impl MemoryGuardConfig {
89    /// Load from the config cascade, falling back to defaults.
90    ///
91    /// When the `config` feature is enabled and `config::setup()` has been
92    /// called, reads the `memory` key from the cascade. Otherwise returns
93    /// [`MemoryGuardConfig::default()`].
94    #[must_use]
95    pub fn from_cascade() -> Self {
96        #[cfg(feature = "config")]
97        {
98            if let Some(cfg) = crate::config::try_get()
99                && let Ok(memory) = cfg.unmarshal_key_registered::<Self>("memory")
100            {
101                return memory;
102            }
103        }
104        Self::default()
105    }
106
107    /// Create config from environment variables with a prefix.
108    ///
109    /// Reads standard env vars for memory configuration:
110    /// - `{PREFIX}_MEMORY_LIMIT_BYTES` -- explicit limit (0 or unset = auto-detect from cgroup)
111    /// - `{PREFIX}_MEMORY_PRESSURE_THRESHOLD` -- backpressure trigger (default 0.80)
112    /// - `{PREFIX}_MEMORY_CGROUP_HEADROOM` -- fraction of cgroup limit to use (default 0.85)
113    ///
114    /// # Example
115    ///
116    /// ```bash
117    /// DFE_MEMORY_LIMIT_BYTES=4294967296      # 4 GiB explicit
118    /// DFE_MEMORY_PRESSURE_THRESHOLD=0.75     # backpressure at 75%
119    /// DFE_MEMORY_CGROUP_HEADROOM=0.90        # use 90% of cgroup
120    /// ```
121    ///
122    /// ```rust,no_run
123    /// use hyperi_rustlib::memory::MemoryGuardConfig;
124    /// let config = MemoryGuardConfig::from_env("DFE");
125    /// ```
126    #[must_use]
127    #[cfg(feature = "config")]
128    pub fn from_env(prefix: &str) -> Self {
129        use crate::config::flat_env::flat_env_parsed;
130
131        let mut config = Self::default();
132
133        if let Some(v) = flat_env_parsed::<u64>(prefix, "MEMORY_LIMIT_BYTES") {
134            config.limit_bytes = v;
135        }
136        if let Some(v) = flat_env_parsed::<f64>(prefix, "MEMORY_PRESSURE_THRESHOLD") {
137            config.pressure_threshold = v;
138        }
139        if let Some(v) = flat_env_parsed::<f64>(prefix, "MEMORY_CGROUP_HEADROOM") {
140            config.cgroup_headroom = v;
141        }
142
143        config
144    }
145
146    /// Create config from environment variables without requiring `config` feature.
147    ///
148    /// Same as [`from_env`](Self::from_env) but uses `std::env` directly.
149    #[must_use]
150    pub fn from_env_raw(prefix: &str) -> Self {
151        let mut config = Self::default();
152
153        if let Some(v) = env_parsed::<u64>(prefix, "MEMORY_LIMIT_BYTES") {
154            config.limit_bytes = v;
155        }
156        if let Some(v) = env_parsed::<f64>(prefix, "MEMORY_PRESSURE_THRESHOLD") {
157            config.pressure_threshold = v;
158        }
159        if let Some(v) = env_parsed::<f64>(prefix, "MEMORY_CGROUP_HEADROOM") {
160            config.cgroup_headroom = v;
161        }
162
163        config
164    }
165}
166
/// Cgroup-aware memory tracking with backpressure signals.
///
/// Tracks application-level memory usage (not process RSS) and provides
/// fast atomic checks for the hot path. Designed for data pipeline services
/// where incoming data must be rejected (503) before hitting the container
/// memory limit.
///
/// # Usage
///
/// ```rust,no_run
/// use hyperi_rustlib::memory::{MemoryGuard, MemoryGuardConfig};
///
/// let guard = MemoryGuard::new(MemoryGuardConfig::default());
///
/// // Fast hot-path check
/// if guard.under_pressure() {
///     // return 503
/// }
///
/// // On data arrival -- reserve before accepting
/// let payload_len = 1024u64;
/// if guard.try_reserve(payload_len) {
///     // ... buffer the payload, then once it is flushed/sent:
///     guard.release(payload_len);
/// } else {
///     // return 503 -- backpressure (nothing was reserved, so nothing to release)
/// }
/// ```
#[derive(Debug)]
pub struct MemoryGuard {
    /// Current tracked bytes (application-level, not RSS).
    current_bytes: AtomicU64,
    /// Effective memory limit in bytes.
    limit_bytes: u64,
    /// Pressure threshold as a fraction of `limit_bytes` (0.0-1.0).
    pressure_threshold: f64,
    /// Cached "usage is at or above the threshold" flag, so the hot-path
    /// check is a single atomic load.
    under_pressure: AtomicBool,
}
205
206impl MemoryGuard {
207    /// Create a new memory guard.
208    ///
209    /// If `config.limit_bytes` is 0, auto-detects from cgroup (K8s) or system memory,
210    /// then applies `cgroup_headroom` factor to leave room for process overhead.
211    #[must_use]
212    #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
213    pub fn new(config: MemoryGuardConfig) -> Self {
214        let raw_limit = if config.limit_bytes > 0 {
215            config.limit_bytes
216        } else {
217            let detected = cgroup::detect_memory_limit();
218            // Apply headroom -- don't use 100% of cgroup limit
219            (detected as f64 * config.cgroup_headroom) as u64
220        };
221
222        tracing::info!(
223            limit_bytes = raw_limit,
224            pressure_threshold = config.pressure_threshold,
225            "memory guard initialised"
226        );
227
228        Self {
229            current_bytes: AtomicU64::new(0),
230            limit_bytes: raw_limit,
231            pressure_threshold: config.pressure_threshold,
232            under_pressure: AtomicBool::new(false),
233        }
234    }
235
236    /// Try to reserve bytes. Returns false if over the limit (backpressure).
237    ///
238    /// This is an atomic check-and-add. If the reservation would exceed
239    /// the limit, the bytes are NOT added and false is returned.
240    #[inline]
241    pub fn try_reserve(&self, bytes: u64) -> bool {
242        let current = self.current_bytes.fetch_add(bytes, Ordering::Relaxed) + bytes;
243        if current > self.limit_bytes {
244            // Over limit -- roll back
245            self.current_bytes.fetch_sub(bytes, Ordering::Relaxed);
246            self.under_pressure.store(true, Ordering::Relaxed);
247            return false;
248        }
249        self.update_pressure(current);
250        true
251    }
252
253    /// Add bytes without checking the limit (for tracking only).
254    /// Use when data is already accepted and you just need to track it.
255    #[inline]
256    pub fn add_bytes(&self, bytes: u64) {
257        let new_total = self.current_bytes.fetch_add(bytes, Ordering::Relaxed) + bytes;
258        self.update_pressure(new_total);
259    }
260
261    /// Release bytes after data is flushed/sent/dropped.
262    ///
263    /// Uses saturating subtraction to prevent underflow wrapping.
264    #[inline]
265    pub fn release(&self, bytes: u64) {
266        let prev = self
267            .current_bytes
268            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
269                Some(current.saturating_sub(bytes))
270            })
271            // Always succeeds (closure always returns Some).
272            .unwrap_or_else(|v| v);
273        self.update_pressure(prev.saturating_sub(bytes));
274    }
275
276    /// Fast hot-path pressure check (single atomic load).
277    #[inline]
278    pub fn under_pressure(&self) -> bool {
279        self.under_pressure.load(Ordering::Relaxed)
280    }
281
282    /// Current pressure level.
283    #[inline]
284    pub fn pressure(&self) -> MemoryPressure {
285        let ratio = self.pressure_ratio();
286        if ratio >= self.pressure_threshold {
287            MemoryPressure::High
288        } else if ratio >= 0.5 {
289            MemoryPressure::Medium
290        } else {
291            MemoryPressure::Low
292        }
293    }
294
295    /// Current usage as fraction of limit (0.0 - 1.0+).
296    #[inline]
297    pub fn pressure_ratio(&self) -> f64 {
298        self.current_bytes.load(Ordering::Relaxed) as f64 / self.limit_bytes as f64
299    }
300
301    /// Current tracked bytes.
302    #[inline]
303    pub fn current_bytes(&self) -> u64 {
304        self.current_bytes.load(Ordering::Relaxed)
305    }
306
307    /// Configured memory limit in bytes.
308    #[inline]
309    pub fn limit_bytes(&self) -> u64 {
310        self.limit_bytes
311    }
312
313    /// Update the pressure flag based on current usage.
314    #[inline]
315    fn update_pressure(&self, current: u64) {
316        let ratio = current as f64 / self.limit_bytes as f64;
317        self.under_pressure
318            .store(ratio >= self.pressure_threshold, Ordering::Relaxed);
319    }
320}
321
#[cfg(test)]
mod tests {
    use super::*;

    /// Guard with an explicit limit and default threshold/headroom.
    fn guard_with_limit(limit_bytes: u64) -> MemoryGuard {
        MemoryGuard::new(MemoryGuardConfig {
            limit_bytes,
            ..Default::default()
        })
    }

    /// Guard with an explicit limit and an explicit pressure threshold.
    fn guard_with_threshold(limit_bytes: u64, pressure_threshold: f64) -> MemoryGuard {
        MemoryGuard::new(MemoryGuardConfig {
            limit_bytes,
            pressure_threshold,
            ..Default::default()
        })
    }

    #[test]
    fn test_memory_guard_default() {
        let guard = guard_with_limit(1_000_000); // 1MB explicit
        assert_eq!(guard.limit_bytes(), 1_000_000);
        assert_eq!(guard.current_bytes(), 0);
        assert!(!guard.under_pressure());
        assert_eq!(guard.pressure(), MemoryPressure::Low);
    }

    #[test]
    fn test_try_reserve_within_limit() {
        let guard = guard_with_limit(1000);
        assert!(guard.try_reserve(500));
        assert_eq!(guard.current_bytes(), 500);
    }

    #[test]
    fn test_try_reserve_over_limit() {
        let guard = guard_with_limit(1000);
        assert!(guard.try_reserve(500));
        assert!(!guard.try_reserve(600)); // would exceed 1000
        assert_eq!(guard.current_bytes(), 500); // rejected bytes are not counted
        assert!(guard.under_pressure());
    }

    #[test]
    fn test_release_reduces_pressure() {
        let guard = guard_with_threshold(1000, 0.8);
        guard.add_bytes(900); // 90% -- over threshold
        assert!(guard.under_pressure());
        assert_eq!(guard.pressure(), MemoryPressure::High);

        guard.release(500); // down to 400 = 40%
        assert!(!guard.under_pressure());
        assert_eq!(guard.pressure(), MemoryPressure::Low);
    }

    #[test]
    fn test_pressure_levels() {
        let guard = guard_with_threshold(1000, 0.8);

        // Low (< 50%)
        guard.add_bytes(400);
        assert_eq!(guard.pressure(), MemoryPressure::Low);

        // Medium (50-80%)
        guard.add_bytes(200); // 600 = 60%
        assert_eq!(guard.pressure(), MemoryPressure::Medium);

        // High (>= 80%)
        guard.add_bytes(300); // 900 = 90%
        assert_eq!(guard.pressure(), MemoryPressure::High);
    }

    #[test]
    fn test_pressure_ratio() {
        let guard = guard_with_limit(1000);
        guard.add_bytes(250);
        let ratio = guard.pressure_ratio();
        assert!((ratio - 0.25).abs() < 0.001);
    }

    #[test]
    fn test_release_saturating() {
        let guard = guard_with_limit(1000);
        guard.add_bytes(100);
        guard.release(200); // release more than added -- saturates to 0
        assert_eq!(
            guard.current_bytes(),
            0,
            "over-release must saturate to 0, not wrap"
        );
        assert!(!guard.under_pressure());
        assert_eq!(guard.pressure(), MemoryPressure::Low);

        // Verify the guard is still functional after over-release
        assert!(guard.try_reserve(500));
        assert_eq!(guard.current_bytes(), 500);
    }

    #[test]
    fn test_concurrent_reserve_release() {
        use std::sync::Arc;
        use std::thread;

        let guard = Arc::new(guard_with_threshold(100_000, 0.8));

        let handles: Vec<_> = (0..10)
            .map(|_| {
                let g = Arc::clone(&guard);
                thread::spawn(move || {
                    for _ in 0..100 {
                        g.add_bytes(100);
                        g.release(100);
                    }
                })
            })
            .collect();
        for h in handles {
            h.join().unwrap();
        }

        // Every add is an atomic read-modify-write that is followed by a
        // matching release on the same thread, so the counter never drops
        // below a thread's own outstanding bytes and the saturating release
        // always subtracts the full amount. After all threads have joined the
        // balance must therefore be exactly zero -- anything else is a leak.
        assert_eq!(
            guard.current_bytes(),
            0,
            "leaked bytes: {}",
            guard.current_bytes()
        );
    }

    #[test]
    fn test_try_reserve_rollback_is_atomic() {
        let guard = guard_with_limit(100);
        assert!(guard.try_reserve(90));
        assert!(!guard.try_reserve(20)); // over limit, rejected
        assert_eq!(guard.current_bytes(), 90); // not 110
        assert!(guard.try_reserve(10)); // exactly at limit
        assert_eq!(guard.current_bytes(), 100);
    }

    #[test]
    fn test_config_defaults() {
        let config = MemoryGuardConfig::default();
        assert_eq!(config.limit_bytes, 0);
        assert!((config.pressure_threshold - 0.80).abs() < 0.001);
        assert!((config.cgroup_headroom - 0.85).abs() < 0.001);
    }

    #[test]
    fn test_from_env_raw_defaults_when_unset() {
        // With no env vars set, should return defaults
        let config = MemoryGuardConfig::from_env_raw("TEST_MG_UNSET");
        assert_eq!(config.limit_bytes, 0);
        assert!((config.pressure_threshold - 0.80).abs() < 0.001);
        assert!((config.cgroup_headroom - 0.85).abs() < 0.001);
    }

    #[test]
    fn test_env_parsed_helper() {
        // env_parsed returns None for unset vars
        assert!(env_parsed::<u64>("NONEXISTENT_PREFIX_XYZ", "FOO").is_none());
        assert!(env_parsed::<f64>("NONEXISTENT_PREFIX_XYZ", "BAR").is_none());
    }

    #[test]
    fn test_guard_with_explicit_config_overrides() {
        // Simulates what from_env would produce with overrides
        let config = MemoryGuardConfig {
            limit_bytes: 2_147_483_648,
            pressure_threshold: 0.75,
            cgroup_headroom: 0.90,
        };
        let guard = MemoryGuard::new(config);
        // An explicit limit is used as-is; headroom is not applied to it.
        assert_eq!(guard.limit_bytes(), 2_147_483_648);
    }

    #[test]
    fn test_guard_with_custom_headroom() {
        // limit_bytes = 0 takes the auto-detect path, where the headroom
        // fraction is applied to whatever limit the host reports.
        let config = MemoryGuardConfig {
            limit_bytes: 0, // auto-detect
            pressure_threshold: 0.80,
            cgroup_headroom: 0.85,
        };
        let guard = MemoryGuard::new(config);
        // The detected value is host-dependent, so only positivity is checked.
        assert!(guard.limit_bytes() > 0);
    }

    #[test]
    fn test_auto_detect_limit() {
        // With limit_bytes = 0, should auto-detect from system
        let guard = MemoryGuard::new(MemoryGuardConfig::default());
        assert!(
            guard.limit_bytes() > 0,
            "auto-detected limit should be positive"
        );
        // The upper bound (total system memory) is not asserted here because
        // the test has no independent way to read it.
    }
}