Skip to main content

hyperi_rustlib/memory/
guard.rs

1// Project:   hyperi-rustlib
2// File:      src/memory/guard.rs
3// Purpose:   Memory guard with backpressure signals
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Memory guard with backpressure signals.
10
11use std::sync::OnceLock;
12use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
13
14use super::cgroup;
15
/// Process-wide total-heap byte source, set once at startup.
///
/// See [`set_heap_source`] for the rationale. Allocator-agnostic: any
/// `fn() -> usize` returning live heap bytes (e.g. `cap::Cap::allocated`,
/// jemalloc `stats.allocated`).
///
/// Being a `OnceLock`, it can be written at most once per process and is
/// never cleared afterwards; `heap_bytes` reads it lazily, so every guard's
/// read and admission paths see whatever is registered at call time.
static HEAP_SOURCE: OnceLock<fn() -> usize> = OnceLock::new();
22
23/// Register a process-wide source of total live-heap bytes.
24///
25/// When set, every [`MemoryGuard`] switches its read path
26/// ([`current_bytes`](MemoryGuard::current_bytes), pressure checks, and
27/// [`try_reserve`](MemoryGuard::try_reserve) admission) from the per-batch
28/// reservation counter to this source -- a cheap, accurate, *total-process*
29/// heap figure that also catches growth the per-batch reservations never see
30/// (e.g. a transform ballooning a `Vec`).
31///
32/// **Why a global hook and not a dependency:** a tracking allocator must be
33/// the binary's single `#[global_allocator]`, which is the *application's*
34/// choice, not a library's -- and rustlib is `#![forbid(unsafe_code)]`, so it
35/// cannot implement one anyway. The application installs its allocator and
36/// wires it here in a few lines. This keeps rustlib allocator-agnostic with no
37/// allocator dependency in its graph.
38///
39/// The first call wins and returns `true`; later calls are a no-op and return
40/// `false` (the existing source is kept). Call once at startup, before
41/// constructing guards.
42///
43/// The application picks a tracking allocator -- prefer an actively-maintained
44/// one such as `tikv-jemalloc-ctl` (`stats.allocated`); the `cap` crate also
45/// works but is effectively unmaintained (last release 2023).
46///
47/// ```ignore
48/// // In the application binary, using jemalloc:
49/// #[global_allocator]
50/// static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
51///
52/// fn main() {
53///     hyperi_rustlib::memory::set_heap_source(|| {
54///         tikv_jemalloc_ctl::epoch::advance().ok();
55///         tikv_jemalloc_ctl::stats::allocated::read().unwrap_or(0)
56///     });
57///     // ... build ServiceRuntime / MemoryGuard ...
58/// }
59/// ```
60#[must_use]
61pub fn set_heap_source(source: fn() -> usize) -> bool {
62    HEAP_SOURCE.set(source).is_ok()
63}
64
65/// Read the registered total-heap source, if any.
66#[inline]
67fn heap_bytes() -> Option<u64> {
68    HEAP_SOURCE.get().map(|f| f() as u64)
69}
70
/// Look up `{PREFIX}_{SUFFIX}` in the process environment and parse it as `T`.
///
/// Yields `None` when the variable is unset, is not valid Unicode, or does not
/// parse as `T`; all three are treated as "not configured".
fn env_parsed<T: std::str::FromStr>(prefix: &str, suffix: &str) -> Option<T> {
    let key = format!("{prefix}_{suffix}");
    match std::env::var(&key) {
        Ok(raw) => raw.parse::<T>().ok(),
        Err(_) => None,
    }
}
77
/// Coarse memory-pressure band, as reported by `MemoryGuard::pressure`.
///
/// Bands are decided on `ratio = usage / limit`. `High` is tested first, so
/// `Medium` is never reported when `pressure_threshold <= 0.5`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum MemoryPressure {
    /// `ratio < 0.5` (and below `pressure_threshold`).
    Low,
    /// `0.5 <= ratio < pressure_threshold`.
    Medium,
    /// `ratio >= pressure_threshold` -- apply backpressure.
    High,
}
88
/// Configuration for `MemoryGuard`.
///
/// When the `config` feature is enabled, this can be loaded from the config
/// cascade under the `memory` key:
///
/// ```yaml
/// memory:
///   limit_bytes: 0           # 0 = auto-detect from cgroup/system
///   pressure_threshold: 0.80 # backpressure at 80% of effective limit
///   cgroup_headroom: 0.85    # use 85% of cgroup limit
/// ```
///
/// Keys missing from the section fall back to the values shown above.
/// Both fractions must be finite and in `(0.0, 1.0]`: [`validate`](Self::validate)
/// reports violations, while `MemoryGuard::new` replaces invalid values with
/// their defaults (logging an error) instead of failing.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct MemoryGuardConfig {
    /// Explicit memory limit in bytes. 0 = auto-detect from cgroup/system.
    #[serde(default)]
    pub limit_bytes: u64,
    /// Fraction of limit at which backpressure activates (default 0.8).
    #[serde(default = "default_pressure_threshold")]
    pub pressure_threshold: f64,
    /// Fraction of cgroup limit to use as the effective limit (default 0.85).
    /// Leaves headroom for the process itself (stack, code, etc.).
    /// Only applied when `limit_bytes` is 0 (auto-detect).
    #[serde(default = "default_cgroup_headroom")]
    pub cgroup_headroom: f64,
}
113
114fn default_pressure_threshold() -> f64 {
115    DEFAULT_PRESSURE_THRESHOLD
116}
117
118fn default_cgroup_headroom() -> f64 {
119    DEFAULT_CGROUP_HEADROOM
120}
121
/// Validate that `v` is a usable fraction: finite and within `(0.0, 1.0]`.
///
/// `name` is the config key (without the `memory.` prefix) quoted in the
/// error message.
///
/// # Errors
///
/// Returns a human-readable message for `NaN`, infinities, values `<= 0.0`,
/// and values `> 1.0`.
fn check_fraction(v: f64, name: &str) -> Result<(), String> {
    let in_range = v.is_finite() && v > 0.0 && v <= 1.0;
    if in_range {
        Ok(())
    } else {
        Err(format!(
            "memory.{name} must be a finite fraction in (0.0, 1.0], got {v}"
        ))
    }
}
131
132/// Return `v` if it is a valid fraction, else log an error and substitute
133/// `default`. Defensive guard so a bad config cannot produce a zero/`NaN`
134/// limit and a divide-by-zero pressure ratio.
135fn sane_fraction(v: f64, default: f64, name: &str) -> f64 {
136    if check_fraction(v, name).is_err() {
137        tracing::error!(
138            value = v,
139            "invalid memory.{name} (need finite fraction in (0,1]); using default {default}"
140        );
141        default
142    } else {
143        v
144    }
145}
146
/// Scale the detected cgroup/system limit by `headroom`, then cap it at the
/// soft throttle (`memory.high`) when one is set and is lower.
///
/// The kernel reclaims hard and throttles allocations at `memory.high`
/// (before the `memory.max` OOM-kill), so admitting past it courts a latency
/// cliff. Kept free of I/O so the capping rule can be unit-tested without
/// reading real cgroup files.
#[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
fn effective_auto_limit(detected: u64, headroom: f64, high: Option<u64>) -> u64 {
    let scaled = (detected as f64 * headroom) as u64;
    high.map_or(scaled, |soft_cap| scaled.min(soft_cap))
}
162
/// Default cgroup headroom: use 85% of cgroup limit.
///
/// Rationale: Rust has no GC so no spike headroom needed (unlike JVM 75% / Go 80%).
/// 15% headroom covers jemalloc fragmentation, kernel overhead, and page cache.
///
/// Only used when the limit is auto-detected (`limit_bytes == 0`); also the
/// substitute `MemoryGuard::new` uses when a configured headroom is invalid.
const DEFAULT_CGROUP_HEADROOM: f64 = 0.85;

/// Default pressure threshold: backpressure at 80% of effective limit.
///
/// With 85% headroom, backpressure activates at ~68% of actual cgroup limit
/// (0.85 x 0.80 = 0.68; lower still when `memory.high` caps the limit).
/// Matches OTel Collector's `limit_percentage: 80` philosophy. Also the
/// substitute `MemoryGuard::new` uses when a configured threshold is invalid.
const DEFAULT_PRESSURE_THRESHOLD: f64 = 0.80;
174
175impl Default for MemoryGuardConfig {
176    fn default() -> Self {
177        Self {
178            limit_bytes: 0, // auto-detect
179            pressure_threshold: DEFAULT_PRESSURE_THRESHOLD,
180            cgroup_headroom: DEFAULT_CGROUP_HEADROOM,
181        }
182    }
183}
184
185impl MemoryGuardConfig {
186    /// Load from the config cascade, falling back to defaults.
187    ///
188    /// When the `config` feature is enabled and `config::setup()` has been
189    /// called, reads the `memory` key from the cascade. Otherwise returns
190    /// [`MemoryGuardConfig::default()`].
191    #[must_use]
192    pub fn from_cascade() -> Self {
193        #[cfg(feature = "config")]
194        {
195            if let Some(cfg) = crate::config::try_get()
196                && let Ok(memory) = cfg.unmarshal_key_registered::<Self>("memory")
197            {
198                return memory;
199            }
200        }
201        Self::default()
202    }
203
204    /// Create config from environment variables with a prefix.
205    ///
206    /// Reads standard env vars for memory configuration:
207    /// - `{PREFIX}_MEMORY_LIMIT_BYTES` -- explicit limit (0 or unset = auto-detect from cgroup)
208    /// - `{PREFIX}_MEMORY_PRESSURE_THRESHOLD` -- backpressure trigger (default 0.80)
209    /// - `{PREFIX}_MEMORY_CGROUP_HEADROOM` -- fraction of cgroup limit to use (default 0.85)
210    ///
211    /// # Example
212    ///
213    /// ```bash
214    /// DFE_MEMORY_LIMIT_BYTES=4294967296      # 4 GiB explicit
215    /// DFE_MEMORY_PRESSURE_THRESHOLD=0.75     # backpressure at 75%
216    /// DFE_MEMORY_CGROUP_HEADROOM=0.90        # use 90% of cgroup
217    /// ```
218    ///
219    /// ```rust,no_run
220    /// use hyperi_rustlib::memory::MemoryGuardConfig;
221    /// let config = MemoryGuardConfig::from_env("DFE");
222    /// ```
223    #[must_use]
224    #[cfg(feature = "config")]
225    pub fn from_env(prefix: &str) -> Self {
226        let mut config = Self::default();
227        config.apply_flat_env(prefix);
228        config
229    }
230
231    /// Overlay the flat `{PREFIX}_MEMORY_*` env vars onto this config in place.
232    ///
233    /// This is the legacy DFE single-underscore convention
234    /// (`DFE_MEMORY_LIMIT_BYTES`), distinct from the config cascade's nested
235    /// `__` env layer (`{APP}__MEMORY__LIMIT_BYTES`). A present var wins; an
236    /// absent var leaves the field untouched.
237    #[cfg(feature = "config")]
238    fn apply_flat_env(&mut self, prefix: &str) {
239        use crate::config::flat_env::flat_env_parsed;
240
241        if let Some(v) = flat_env_parsed::<u64>(prefix, "MEMORY_LIMIT_BYTES") {
242            self.limit_bytes = v;
243        }
244        if let Some(v) = flat_env_parsed::<f64>(prefix, "MEMORY_PRESSURE_THRESHOLD") {
245            self.pressure_threshold = v;
246        }
247        if let Some(v) = flat_env_parsed::<f64>(prefix, "MEMORY_CGROUP_HEADROOM") {
248            self.cgroup_headroom = v;
249        }
250    }
251
252    /// Cascade-honouring resolution: the config cascade `memory:` section
253    /// (`defaults.yaml` < `settings.yaml` < `settings.{env}.yaml` < nested `__`
254    /// env) is the BASE, with the legacy flat `{PREFIX}_MEMORY_*` env vars
255    /// overlaid as the most-specific operator override.
256    ///
257    /// This is what the live guard should use. [`from_env`](Self::from_env)
258    /// alone ignores the cascade `memory:` section entirely -- setting it in
259    /// YAML did nothing for the guard before 2.8.12 (the recurring
260    /// "set a cascade value, nothing happens" footgun). Requires
261    /// `config::setup()` to have populated the cascade (which `run_app` now
262    /// does, since 2.8.11).
263    #[must_use]
264    #[cfg(feature = "config")]
265    pub fn from_cascade_with_env(prefix: &str) -> Self {
266        let mut config = Self::from_cascade();
267        config.apply_flat_env(prefix);
268        config
269    }
270
271    /// Create config from environment variables without requiring `config` feature.
272    ///
273    /// Same as [`from_env`](Self::from_env) but uses `std::env` directly.
274    #[must_use]
275    pub fn from_env_raw(prefix: &str) -> Self {
276        let mut config = Self::default();
277
278        if let Some(v) = env_parsed::<u64>(prefix, "MEMORY_LIMIT_BYTES") {
279            config.limit_bytes = v;
280        }
281        if let Some(v) = env_parsed::<f64>(prefix, "MEMORY_PRESSURE_THRESHOLD") {
282            config.pressure_threshold = v;
283        }
284        if let Some(v) = env_parsed::<f64>(prefix, "MEMORY_CGROUP_HEADROOM") {
285            config.cgroup_headroom = v;
286        }
287
288        config
289    }
290
291    /// Validate the config, returning an error describing the first invalid
292    /// field. `pressure_threshold` and `cgroup_headroom` must each be a finite
293    /// fraction in `(0.0, 1.0]`. Call this at startup to fail fast on bad
294    /// config rather than relying on [`MemoryGuard::new`]'s defensive clamping.
295    ///
296    /// # Errors
297    ///
298    /// Returns `Err` with a human-readable message if a fraction field is
299    /// non-finite, `<= 0.0`, or `> 1.0`.
300    pub fn validate(&self) -> Result<(), String> {
301        check_fraction(self.pressure_threshold, "pressure_threshold")?;
302        check_fraction(self.cgroup_headroom, "cgroup_headroom")?;
303        Ok(())
304    }
305}
306
/// Cgroup-aware memory tracking with backpressure signals.
///
/// By default it tracks application-level memory usage (not process RSS): the
/// sum of outstanding reservations made through
/// [`try_reserve`](Self::try_reserve) and [`add_bytes`](Self::add_bytes) and
/// not yet given back with [`release`](Self::release). When a
/// [`set_heap_source`] hook is registered, reads and admission checks use
/// that total live-heap figure instead. Either way it provides fast atomic
/// checks for the hot path. Designed for data pipeline services where
/// incoming data must be rejected (503) before hitting the container memory
/// limit.
///
/// # Usage
///
/// ```rust,no_run
/// use hyperi_rustlib::memory::{MemoryGuard, MemoryGuardConfig};
///
/// let guard = MemoryGuard::new(MemoryGuardConfig::default());
///
/// // On data arrival -- check before accepting
/// let payload_len = 1024u64;
/// if guard.try_reserve(payload_len) {
///     // ... accept and process the payload ...
///
///     // After data is flushed/sent, hand the reservation back. Only release
///     // what was actually reserved: a rejected `try_reserve` reserved nothing.
///     guard.release(payload_len);
/// } else {
///     // return 503 -- backpressure
/// }
///
/// // Fast hot-path check
/// if guard.under_pressure() {
///     // return 503
/// }
/// ```
pub struct MemoryGuard {
    /// Outstanding reserved bytes (application-level, not RSS). Not consulted
    /// by the read path while a heap source is registered.
    current_bytes: AtomicU64,
    /// Effective memory limit in bytes; `new` floors it at 1 so pressure
    /// ratios never divide by zero.
    limit_bytes: u64,
    /// Fraction of `limit_bytes`, sanitised into `(0.0, 1.0]` by `new`, at
    /// which usage counts as "under pressure".
    pressure_threshold: f64,
    /// Cached pressure flag for the counter path, maintained by
    /// `try_reserve` / `add_bytes` / `release`.
    under_pressure: AtomicBool,
}
345
346impl MemoryGuard {
347    /// Create a new memory guard.
348    ///
349    /// If `config.limit_bytes` is 0, auto-detects from cgroup (K8s) or system memory,
350    /// then applies `cgroup_headroom` factor to leave room for process overhead.
351    #[must_use]
352    #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
353    pub fn new(config: MemoryGuardConfig) -> Self {
354        // Defensive: a non-finite / out-of-range threshold or headroom would
355        // produce a zero/NaN limit and a divide-by-zero pressure ratio. Clamp
356        // to the safe default and log loudly. Callers wanting hard rejection
357        // should call `config.validate()` at startup.
358        let pressure_threshold = sane_fraction(
359            config.pressure_threshold,
360            DEFAULT_PRESSURE_THRESHOLD,
361            "pressure_threshold",
362        );
363        let cgroup_headroom = sane_fraction(
364            config.cgroup_headroom,
365            DEFAULT_CGROUP_HEADROOM,
366            "cgroup_headroom",
367        );
368
369        let raw_limit = if config.limit_bytes > 0 {
370            config.limit_bytes
371        } else {
372            effective_auto_limit(
373                cgroup::detect_memory_limit(),
374                cgroup_headroom,
375                cgroup::detect_memory_high(),
376            )
377        };
378        // Never permit a zero effective limit: every pressure calculation
379        // divides by it.
380        let limit_bytes = raw_limit.max(1);
381
382        tracing::info!(limit_bytes, pressure_threshold, "memory guard initialised");
383
384        Self {
385            current_bytes: AtomicU64::new(0),
386            limit_bytes,
387            pressure_threshold,
388            under_pressure: AtomicBool::new(false),
389        }
390    }
391
392    /// Try to reserve bytes. Returns false if over the limit (backpressure).
393    ///
394    /// With a registered [`set_heap_source`], this is a projected-admission
395    /// check against the *true total heap* (`heap() + bytes <= limit`) and does
396    /// NOT mutate the reservation counter -- the allocator already accounts the
397    /// bytes once they are allocated, and frees them on drop, so no `release`
398    /// is needed. Without a source it is the classic atomic check-and-add on
399    /// the per-batch counter (rolled back if it would exceed the limit).
400    #[inline]
401    pub fn try_reserve(&self, bytes: u64) -> bool {
402        if let Some(heap) = heap_bytes() {
403            return heap + bytes <= self.limit_bytes;
404        }
405        let current = self.current_bytes.fetch_add(bytes, Ordering::Relaxed) + bytes;
406        if current > self.limit_bytes {
407            // Over limit -- roll back
408            self.current_bytes.fetch_sub(bytes, Ordering::Relaxed);
409            self.under_pressure.store(true, Ordering::Relaxed);
410            return false;
411        }
412        self.update_pressure(current);
413        true
414    }
415
416    /// Add bytes without checking the limit (for tracking only).
417    /// Use when data is already accepted and you just need to track it.
418    #[inline]
419    pub fn add_bytes(&self, bytes: u64) {
420        let new_total = self.current_bytes.fetch_add(bytes, Ordering::Relaxed) + bytes;
421        self.update_pressure(new_total);
422    }
423
424    /// Release bytes after data is flushed/sent/dropped.
425    ///
426    /// Uses saturating subtraction to prevent underflow wrapping.
427    #[inline]
428    pub fn release(&self, bytes: u64) {
429        let prev = self
430            .current_bytes
431            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
432                Some(current.saturating_sub(bytes))
433            })
434            // Always succeeds (closure always returns Some).
435            .unwrap_or_else(|v| v);
436        self.update_pressure(prev.saturating_sub(bytes));
437    }
438
439    /// Fast hot-path pressure check.
440    ///
441    /// With a registered [`set_heap_source`], computes live from the true heap
442    /// (one atomic load + compare); otherwise reads the cached flag maintained
443    /// by `try_reserve`/`add_bytes`/`release`.
444    #[inline]
445    pub fn under_pressure(&self) -> bool {
446        if heap_bytes().is_some() {
447            return self.pressure_ratio() >= self.pressure_threshold;
448        }
449        self.under_pressure.load(Ordering::Relaxed)
450    }
451
452    /// Current pressure level.
453    #[inline]
454    pub fn pressure(&self) -> MemoryPressure {
455        let ratio = self.pressure_ratio();
456        if ratio >= self.pressure_threshold {
457            MemoryPressure::High
458        } else if ratio >= 0.5 {
459            MemoryPressure::Medium
460        } else {
461            MemoryPressure::Low
462        }
463    }
464
465    /// Current usage as fraction of limit (0.0 - 1.0+).
466    #[inline]
467    pub fn pressure_ratio(&self) -> f64 {
468        self.current_bytes() as f64 / self.limit_bytes as f64
469    }
470
471    /// Current memory usage in bytes.
472    ///
473    /// Returns the true total live heap when a [`set_heap_source`] is
474    /// registered, otherwise the sum of outstanding per-batch reservations.
475    #[inline]
476    pub fn current_bytes(&self) -> u64 {
477        heap_bytes().unwrap_or_else(|| self.current_bytes.load(Ordering::Relaxed))
478    }
479
480    /// Configured memory limit in bytes.
481    #[inline]
482    pub fn limit_bytes(&self) -> u64 {
483        self.limit_bytes
484    }
485
486    /// Update the pressure flag based on current usage.
487    #[inline]
488    fn update_pressure(&self, current: u64) {
489        let ratio = current as f64 / self.limit_bytes as f64;
490        self.under_pressure
491            .store(ratio >= self.pressure_threshold, Ordering::Relaxed);
492    }
493}
494
#[cfg(test)]
mod tests {
    // Unit tests for the config helpers and for both MemoryGuard read paths:
    // the per-batch reservation counter and the registered heap source.
    use super::*;

    #[test]
    fn test_memory_guard_default() {
        let guard = MemoryGuard::new(MemoryGuardConfig {
            limit_bytes: 1_000_000, // 1MB explicit
            ..Default::default()
        });
        assert_eq!(guard.limit_bytes(), 1_000_000);
        assert_eq!(guard.current_bytes(), 0);
        assert!(!guard.under_pressure());
        assert_eq!(guard.pressure(), MemoryPressure::Low);
    }

    #[test]
    fn test_try_reserve_within_limit() {
        let guard = MemoryGuard::new(MemoryGuardConfig {
            limit_bytes: 1000,
            ..Default::default()
        });
        assert!(guard.try_reserve(500));
        assert_eq!(guard.current_bytes(), 500);
    }

    #[test]
    fn test_try_reserve_over_limit() {
        let guard = MemoryGuard::new(MemoryGuardConfig {
            limit_bytes: 1000,
            ..Default::default()
        });
        assert!(guard.try_reserve(500));
        assert!(!guard.try_reserve(600)); // would exceed 1000
        assert_eq!(guard.current_bytes(), 500); // rolled back
        // A rejected reservation raises the pressure flag even though usage
        // itself (50%) is below the threshold.
        assert!(guard.under_pressure());
    }

    #[test]
    fn test_release_reduces_pressure() {
        let guard = MemoryGuard::new(MemoryGuardConfig {
            limit_bytes: 1000,
            pressure_threshold: 0.8,
            ..Default::default()
        });
        guard.add_bytes(900); // 90% -- over threshold
        assert!(guard.under_pressure());
        assert_eq!(guard.pressure(), MemoryPressure::High);

        guard.release(500); // down to 400 = 40%
        assert!(!guard.under_pressure());
        assert_eq!(guard.pressure(), MemoryPressure::Low);
    }

    #[test]
    fn test_pressure_levels() {
        let guard = MemoryGuard::new(MemoryGuardConfig {
            limit_bytes: 1000,
            pressure_threshold: 0.8,
            ..Default::default()
        });

        // Low (< 50%)
        guard.add_bytes(400);
        assert_eq!(guard.pressure(), MemoryPressure::Low);

        // Medium (50-80%)
        guard.add_bytes(200); // 600 = 60%
        assert_eq!(guard.pressure(), MemoryPressure::Medium);

        // High (>= 80%)
        guard.add_bytes(300); // 900 = 90%
        assert_eq!(guard.pressure(), MemoryPressure::High);
    }

    #[test]
    fn test_pressure_ratio() {
        let guard = MemoryGuard::new(MemoryGuardConfig {
            limit_bytes: 1000,
            ..Default::default()
        });
        guard.add_bytes(250);
        let ratio = guard.pressure_ratio();
        assert!((ratio - 0.25).abs() < 0.001);
    }

    #[test]
    fn test_release_saturating() {
        let guard = MemoryGuard::new(MemoryGuardConfig {
            limit_bytes: 1000,
            ..Default::default()
        });
        guard.add_bytes(100);
        guard.release(200); // release more than added -- saturates to 0
        assert_eq!(
            guard.current_bytes(),
            0,
            "over-release must saturate to 0, not wrap"
        );
        assert!(!guard.under_pressure());
        assert_eq!(guard.pressure(), MemoryPressure::Low);

        // Verify the guard is still functional after over-release
        assert!(guard.try_reserve(500));
        assert_eq!(guard.current_bytes(), 500);
    }

    #[test]
    fn test_concurrent_reserve_release() {
        use std::sync::Arc;
        use std::thread;

        let guard = Arc::new(MemoryGuard::new(MemoryGuardConfig {
            limit_bytes: 100_000,
            pressure_threshold: 0.8,
            ..Default::default()
        }));

        let mut handles = vec![];
        for _ in 0..10 {
            let g = Arc::clone(&guard);
            handles.push(thread::spawn(move || {
                for _ in 0..100 {
                    g.add_bytes(100);
                    g.release(100);
                }
            }));
        }
        for h in handles {
            h.join().unwrap();
        }
        // Every add is paired with a release on the same thread, so once all
        // threads have joined the counter should be back at 0; the bound
        // asserted below is deliberately looser than that.
        assert!(
            guard.current_bytes() < 1000,
            "leaked bytes: {}",
            guard.current_bytes()
        );
    }

    #[test]
    fn test_try_reserve_rollback_is_atomic() {
        let guard = MemoryGuard::new(MemoryGuardConfig {
            limit_bytes: 100,
            ..Default::default()
        });
        assert!(guard.try_reserve(90));
        assert!(!guard.try_reserve(20)); // over limit, rolled back
        assert_eq!(guard.current_bytes(), 90); // not 110
        assert!(guard.try_reserve(10)); // exactly at limit
        assert_eq!(guard.current_bytes(), 100);
    }

    // Process-global heap source for the switch test. `HEAP_SOURCE` is a
    // `OnceLock` that can never be cleared, so once this test registers it,
    // every `MemoryGuard` in the same process reads the heap source instead of
    // its reservation counter. That stays contained only when each test runs
    // in its own process, as under nextest. NOTE(review): under plain
    // `cargo test` the tests share one process and run on parallel threads, so
    // the counter-path tests above can observe `TEST_HEAP` and fail
    // intermittently -- confirm CI always uses nextest, or isolate this test.
    // (This is the single test in this module that touches the global hook.)
    static TEST_HEAP: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0);
    fn test_heap_source() -> usize {
        TEST_HEAP.load(Ordering::Relaxed)
    }

    #[test]
    fn heap_source_overrides_read_path_and_admission() {
        assert!(set_heap_source(test_heap_source), "first set wins");
        assert!(
            !set_heap_source(test_heap_source),
            "second set is a no-op (first-wins)"
        );

        let guard = MemoryGuard::new(MemoryGuardConfig {
            limit_bytes: 1_000,
            pressure_threshold: 0.8,
            ..Default::default()
        });

        // Reads come from the heap source, not the reservation counter.
        TEST_HEAP.store(250, Ordering::Relaxed);
        assert_eq!(guard.current_bytes(), 250);
        assert!((guard.pressure_ratio() - 0.25).abs() < 0.001);
        assert!(!guard.under_pressure());

        // Pressure tracks the live heap -- including growth never reserved,
        // which the per-batch counter would have been blind to.
        TEST_HEAP.store(850, Ordering::Relaxed);
        assert!(
            guard.under_pressure(),
            "85% live heap is over the 80% threshold"
        );
        assert_eq!(guard.pressure(), MemoryPressure::High);

        // try_reserve is a projected-admission check against the true heap and
        // does NOT mutate the reservation counter.
        TEST_HEAP.store(900, Ordering::Relaxed);
        assert!(guard.try_reserve(100), "900 + 100 == limit, admitted");
        assert!(!guard.try_reserve(200), "900 + 200 > limit, rejected");
        assert_eq!(
            guard.current_bytes(),
            900,
            "counter untouched by try_reserve"
        );
    }

    #[test]
    fn effective_auto_limit_caps_at_memory_high() {
        // headroom 0.85 of 1000 = 850; high 600 is lower -> cap at the soft
        // throttle so we shed before the kernel does.
        assert_eq!(effective_auto_limit(1000, 0.85, Some(600)), 600);
        // high above the headroom limit -> headroom wins (no change).
        assert_eq!(effective_auto_limit(1000, 0.85, Some(900)), 850);
        // no memory.high in force -> headroom limit.
        assert_eq!(effective_auto_limit(1000, 0.85, None), 850);
    }

    #[test]
    fn test_config_defaults() {
        let config = MemoryGuardConfig::default();
        assert_eq!(config.limit_bytes, 0);
        assert!((config.pressure_threshold - 0.80).abs() < 0.001);
        assert!((config.cgroup_headroom - 0.85).abs() < 0.001);
    }

    #[test]
    fn test_from_env_raw_defaults_when_unset() {
        // With no env vars set, should return defaults
        let config = MemoryGuardConfig::from_env_raw("TEST_MG_UNSET");
        assert_eq!(config.limit_bytes, 0);
        assert!((config.pressure_threshold - 0.80).abs() < 0.001);
        assert!((config.cgroup_headroom - 0.85).abs() < 0.001);
    }

    #[test]
    fn test_env_parsed_helper() {
        // env_parsed returns None for unset vars
        assert!(env_parsed::<u64>("NONEXISTENT_PREFIX_XYZ", "FOO").is_none());
        assert!(env_parsed::<f64>("NONEXISTENT_PREFIX_XYZ", "BAR").is_none());
    }

    #[test]
    fn test_guard_with_explicit_config_overrides() {
        // Simulates what from_env would produce with overrides
        let config = MemoryGuardConfig {
            limit_bytes: 2_147_483_648,
            pressure_threshold: 0.75,
            cgroup_headroom: 0.90,
        };
        let guard = MemoryGuard::new(config);
        assert_eq!(guard.limit_bytes(), 2_147_483_648);
    }

    #[test]
    fn test_guard_with_custom_headroom() {
        // Auto-detect with 85% headroom (e.g. ~870 MiB effective on a 1 GiB
        // cgroup); the actual detected amount depends on the host.
        let config = MemoryGuardConfig {
            limit_bytes: 0, // auto-detect
            pressure_threshold: 0.80,
            cgroup_headroom: 0.85,
        };
        let guard = MemoryGuard::new(config);
        // Auto-detected, so limit should be 85% of system/cgroup memory
        assert!(guard.limit_bytes() > 0);
    }

    #[test]
    fn test_validate_accepts_defaults_and_rejects_bad_fractions() {
        assert!(MemoryGuardConfig::default().validate().is_ok());

        for bad in [0.0, -0.1, 1.5, f64::NAN, f64::INFINITY] {
            let cfg = MemoryGuardConfig {
                pressure_threshold: bad,
                ..Default::default()
            };
            assert!(
                cfg.validate().is_err(),
                "pressure_threshold={bad} must be rejected"
            );
            let cfg = MemoryGuardConfig {
                cgroup_headroom: bad,
                ..Default::default()
            };
            assert!(
                cfg.validate().is_err(),
                "cgroup_headroom={bad} must be rejected"
            );
        }
    }

    #[test]
    fn test_new_clamps_invalid_config_no_divide_by_zero() {
        // A zero/NaN headroom with auto-detect could yield a zero limit ->
        // divide-by-zero. A zero pressure_threshold would make every ratio
        // "over". new() must clamp to safe defaults and keep ratios finite.
        let guard = MemoryGuard::new(MemoryGuardConfig {
            limit_bytes: 0,
            pressure_threshold: 0.0,
            cgroup_headroom: 0.0,
        });
        assert!(guard.limit_bytes() >= 1, "limit floored at >=1");
        guard.add_bytes(10);
        assert!(
            guard.pressure_ratio().is_finite(),
            "pressure ratio must be finite, not div-by-zero"
        );
    }

    #[test]
    fn test_new_with_nan_threshold_is_finite() {
        let guard = MemoryGuard::new(MemoryGuardConfig {
            limit_bytes: 1000,
            pressure_threshold: f64::NAN,
            cgroup_headroom: f64::NAN,
        });
        assert_eq!(guard.limit_bytes(), 1000);
        guard.add_bytes(900);
        // Clamped threshold (0.8 default) -> 90% is over -> under pressure.
        assert!(guard.under_pressure());
    }

    #[test]
    fn test_auto_detect_limit() {
        // With limit_bytes = 0, should auto-detect from system
        let guard = MemoryGuard::new(MemoryGuardConfig::default());
        assert!(
            guard.limit_bytes() > 0,
            "auto-detected limit should be positive"
        );
        // Expected to be below total system memory (headroom applied), but
        // that upper bound is not asserted here.
    }
}