hyperi_rustlib/memory/guard.rs
1// Project: hyperi-rustlib
2// File: src/memory/guard.rs
3// Purpose: Memory guard with backpressure signals
4// Language: Rust
5//
6// License: BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Memory guard with backpressure signals.
10
11use std::sync::OnceLock;
12use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
13
14use super::cgroup;
15
/// Global slot holding the process-wide live-heap byte reader.
///
/// Written at most once, by [`set_heap_source`]; consulted on every guard
/// read. While it is empty, guards fall back to their own per-batch
/// reservation counters. The stored `fn() -> usize` is allocator-agnostic:
/// anything that reports live heap bytes works (e.g. `cap::Cap::allocated`,
/// jemalloc `stats.allocated`).
static HEAP_SOURCE: OnceLock<fn() -> usize> = OnceLock::new();
22
23/// Register a process-wide source of total live-heap bytes.
24///
25/// When set, every [`MemoryGuard`] switches its read path
26/// ([`current_bytes`](MemoryGuard::current_bytes), pressure checks, and
27/// [`try_reserve`](MemoryGuard::try_reserve) admission) from the per-batch
28/// reservation counter to this source -- a cheap, accurate, *total-process*
29/// heap figure that also catches growth the per-batch reservations never see
30/// (e.g. a transform ballooning a `Vec`).
31///
32/// **Why a global hook and not a dependency:** a tracking allocator must be
33/// the binary's single `#[global_allocator]`, which is the *application's*
34/// choice, not a library's -- and rustlib is `#![forbid(unsafe_code)]`, so it
35/// cannot implement one anyway. The application installs its allocator and
36/// wires it here in a few lines. This keeps rustlib allocator-agnostic with no
37/// allocator dependency in its graph.
38///
39/// The first call wins and returns `true`; later calls are a no-op and return
40/// `false` (the existing source is kept). Call once at startup, before
41/// constructing guards.
42///
43/// The application picks a tracking allocator -- prefer an actively-maintained
44/// one such as `tikv-jemalloc-ctl` (`stats.allocated`); the `cap` crate also
45/// works but is effectively unmaintained (last release 2023).
46///
47/// ```ignore
48/// // In the application binary, using jemalloc:
49/// #[global_allocator]
50/// static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
51///
52/// fn main() {
53/// hyperi_rustlib::memory::set_heap_source(|| {
54/// tikv_jemalloc_ctl::epoch::advance().ok();
55/// tikv_jemalloc_ctl::stats::allocated::read().unwrap_or(0)
56/// });
57/// // ... build ServiceRuntime / MemoryGuard ...
58/// }
59/// ```
60#[must_use]
61pub fn set_heap_source(source: fn() -> usize) -> bool {
62 HEAP_SOURCE.set(source).is_ok()
63}
64
65/// Read the registered total-heap source, if any.
66#[inline]
67fn heap_bytes() -> Option<u64> {
68 HEAP_SOURCE.get().map(|f| f() as u64)
69}
70
/// Look up the environment variable named `{prefix}_{suffix}` and parse it
/// as `T`.
///
/// Yields `None` in three cases: the variable is unset, it is not valid
/// Unicode, or its value does not parse as `T`.
fn env_parsed<T: std::str::FromStr>(prefix: &str, suffix: &str) -> Option<T> {
    let key = format!("{prefix}_{suffix}");
    let raw = std::env::var(key).ok()?;
    raw.parse::<T>().ok()
}
77
/// Coarse memory-pressure classification reported by
/// [`MemoryGuard::pressure`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MemoryPressure {
    /// Usage below half of the effective limit.
    Low,
    /// Usage at or above half of the limit, but below `pressure_threshold`.
    Medium,
    /// Usage at or above `pressure_threshold` -- apply backpressure.
    High,
}
88
89/// Configuration for `MemoryGuard`.
90///
91/// When the `config` feature is enabled, this can be loaded from the config
92/// cascade under the `memory` key:
93///
94/// ```yaml
95/// memory:
96/// limit_bytes: 0 # 0 = auto-detect from cgroup/system
97/// pressure_threshold: 0.80 # backpressure at 80% of effective limit
98/// cgroup_headroom: 0.85 # use 85% of cgroup limit
99/// ```
100#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
101pub struct MemoryGuardConfig {
102 /// Explicit memory limit in bytes. 0 = auto-detect from cgroup/system.
103 #[serde(default)]
104 pub limit_bytes: u64,
105 /// Fraction of limit at which backpressure activates (default 0.8).
106 #[serde(default = "default_pressure_threshold")]
107 pub pressure_threshold: f64,
108 /// Fraction of cgroup limit to use as the effective limit (default 0.85).
109 /// Leaves headroom for the process itself (stack, code, etc.).
110 #[serde(default = "default_cgroup_headroom")]
111 pub cgroup_headroom: f64,
112}
113
114fn default_pressure_threshold() -> f64 {
115 DEFAULT_PRESSURE_THRESHOLD
116}
117
118fn default_cgroup_headroom() -> f64 {
119 DEFAULT_CGROUP_HEADROOM
120}
121
/// Accept `v` only if it is a finite fraction in `(0.0, 1.0]`.
///
/// `name` is the config field name and is used only to build the error
/// message (`memory.<name> ...`).
fn check_fraction(v: f64, name: &str) -> Result<(), String> {
    let in_range = v.is_finite() && v > 0.0 && v <= 1.0;
    if in_range {
        Ok(())
    } else {
        Err(format!(
            "memory.{name} must be a finite fraction in (0.0, 1.0], got {v}"
        ))
    }
}
131
132/// Return `v` if it is a valid fraction, else log an error and substitute
133/// `default`. Defensive guard so a bad config cannot produce a zero/`NaN`
134/// limit and a divide-by-zero pressure ratio.
135fn sane_fraction(v: f64, default: f64, name: &str) -> f64 {
136 if check_fraction(v, name).is_err() {
137 tracing::error!(
138 value = v,
139 "invalid memory.{name} (need finite fraction in (0,1]); using default {default}"
140 );
141 default
142 } else {
143 v
144 }
145}
146
/// Compute the auto-detected effective limit.
///
/// Scales the detected limit by `headroom`. If a cgroup soft throttle
/// (`memory.high`) is set below that, the result is lowered to it.
///
/// The kernel reclaims hard and throttles allocations at `memory.high`
/// (before the `memory.max` OOM-kill), so admitting past it courts a latency
/// cliff. Kept pure so the capping logic can be unit-tested without touching
/// the real cgroup files.
// The f64 scaling truncates back to u64 on purpose (whole bytes only).
#[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
fn effective_auto_limit(detected: u64, headroom: f64, high: Option<u64>) -> u64 {
    let scaled = (detected as f64 * headroom) as u64;
    high.map_or(scaled, |soft_cap| scaled.min(soft_cap))
}
162
/// Default cgroup headroom: the guard budgets 85% of the detected cgroup
/// limit.
///
/// Why not lower: Rust has no GC, so there is no collection spike to absorb
/// (unlike the JVM's 75% / Go's 80% conventions). The remaining 15% covers
/// jemalloc fragmentation, kernel overhead, and page cache.
const DEFAULT_CGROUP_HEADROOM: f64 = 0.85;

/// Default pressure threshold: signal backpressure at 80% of the effective
/// limit.
///
/// Combined with the default 85% headroom, that is roughly 68% of the raw
/// cgroup limit (0.85 * 0.80). Matches the OTel Collector's
/// `limit_percentage: 80` philosophy.
const DEFAULT_PRESSURE_THRESHOLD: f64 = 0.80;
174
175impl Default for MemoryGuardConfig {
176 fn default() -> Self {
177 Self {
178 limit_bytes: 0, // auto-detect
179 pressure_threshold: DEFAULT_PRESSURE_THRESHOLD,
180 cgroup_headroom: DEFAULT_CGROUP_HEADROOM,
181 }
182 }
183}
184
185impl MemoryGuardConfig {
186 /// Load from the config cascade, falling back to defaults.
187 ///
188 /// When the `config` feature is enabled and `config::setup()` has been
189 /// called, reads the `memory` key from the cascade. Otherwise returns
190 /// [`MemoryGuardConfig::default()`].
191 #[must_use]
192 pub fn from_cascade() -> Self {
193 #[cfg(feature = "config")]
194 {
195 if let Some(cfg) = crate::config::try_get()
196 && let Ok(memory) = cfg.unmarshal_key_registered::<Self>("memory")
197 {
198 return memory;
199 }
200 }
201 Self::default()
202 }
203
204 /// Create config from environment variables with a prefix.
205 ///
206 /// Reads standard env vars for memory configuration:
207 /// - `{PREFIX}_MEMORY_LIMIT_BYTES` -- explicit limit (0 or unset = auto-detect from cgroup)
208 /// - `{PREFIX}_MEMORY_PRESSURE_THRESHOLD` -- backpressure trigger (default 0.80)
209 /// - `{PREFIX}_MEMORY_CGROUP_HEADROOM` -- fraction of cgroup limit to use (default 0.85)
210 ///
211 /// # Example
212 ///
213 /// ```bash
214 /// DFE_MEMORY_LIMIT_BYTES=4294967296 # 4 GiB explicit
215 /// DFE_MEMORY_PRESSURE_THRESHOLD=0.75 # backpressure at 75%
216 /// DFE_MEMORY_CGROUP_HEADROOM=0.90 # use 90% of cgroup
217 /// ```
218 ///
219 /// ```rust,no_run
220 /// use hyperi_rustlib::memory::MemoryGuardConfig;
221 /// let config = MemoryGuardConfig::from_env("DFE");
222 /// ```
223 #[must_use]
224 #[cfg(feature = "config")]
225 pub fn from_env(prefix: &str) -> Self {
226 let mut config = Self::default();
227 config.apply_flat_env(prefix);
228 config
229 }
230
231 /// Overlay the flat `{PREFIX}_MEMORY_*` env vars onto this config in place.
232 ///
233 /// This is the legacy DFE single-underscore convention
234 /// (`DFE_MEMORY_LIMIT_BYTES`), distinct from the config cascade's nested
235 /// `__` env layer (`{APP}__MEMORY__LIMIT_BYTES`). A present var wins; an
236 /// absent var leaves the field untouched.
237 #[cfg(feature = "config")]
238 fn apply_flat_env(&mut self, prefix: &str) {
239 use crate::config::flat_env::flat_env_parsed;
240
241 if let Some(v) = flat_env_parsed::<u64>(prefix, "MEMORY_LIMIT_BYTES") {
242 self.limit_bytes = v;
243 }
244 if let Some(v) = flat_env_parsed::<f64>(prefix, "MEMORY_PRESSURE_THRESHOLD") {
245 self.pressure_threshold = v;
246 }
247 if let Some(v) = flat_env_parsed::<f64>(prefix, "MEMORY_CGROUP_HEADROOM") {
248 self.cgroup_headroom = v;
249 }
250 }
251
252 /// Cascade-honouring resolution: the config cascade `memory:` section
253 /// (`defaults.yaml` < `settings.yaml` < `settings.{env}.yaml` < nested `__`
254 /// env) is the BASE, with the legacy flat `{PREFIX}_MEMORY_*` env vars
255 /// overlaid as the most-specific operator override.
256 ///
257 /// This is what the live guard should use. [`from_env`](Self::from_env)
258 /// alone ignores the cascade `memory:` section entirely -- setting it in
259 /// YAML did nothing for the guard before 2.8.12 (the recurring
260 /// "set a cascade value, nothing happens" footgun). Requires
261 /// `config::setup()` to have populated the cascade (which `run_app` now
262 /// does, since 2.8.11).
263 #[must_use]
264 #[cfg(feature = "config")]
265 pub fn from_cascade_with_env(prefix: &str) -> Self {
266 let mut config = Self::from_cascade();
267 config.apply_flat_env(prefix);
268 config
269 }
270
271 /// Create config from environment variables without requiring `config` feature.
272 ///
273 /// Same as [`from_env`](Self::from_env) but uses `std::env` directly.
274 #[must_use]
275 pub fn from_env_raw(prefix: &str) -> Self {
276 let mut config = Self::default();
277
278 if let Some(v) = env_parsed::<u64>(prefix, "MEMORY_LIMIT_BYTES") {
279 config.limit_bytes = v;
280 }
281 if let Some(v) = env_parsed::<f64>(prefix, "MEMORY_PRESSURE_THRESHOLD") {
282 config.pressure_threshold = v;
283 }
284 if let Some(v) = env_parsed::<f64>(prefix, "MEMORY_CGROUP_HEADROOM") {
285 config.cgroup_headroom = v;
286 }
287
288 config
289 }
290
291 /// Validate the config, returning an error describing the first invalid
292 /// field. `pressure_threshold` and `cgroup_headroom` must each be a finite
293 /// fraction in `(0.0, 1.0]`. Call this at startup to fail fast on bad
294 /// config rather than relying on [`MemoryGuard::new`]'s defensive clamping.
295 ///
296 /// # Errors
297 ///
298 /// Returns `Err` with a human-readable message if a fraction field is
299 /// non-finite, `<= 0.0`, or `> 1.0`.
300 pub fn validate(&self) -> Result<(), String> {
301 check_fraction(self.pressure_threshold, "pressure_threshold")?;
302 check_fraction(self.cgroup_headroom, "cgroup_headroom")?;
303 Ok(())
304 }
305}
306
/// Cgroup-aware memory tracking with backpressure signals.
///
/// By default this tracks application-level memory usage, not process RSS.
/// That usage is the sum of outstanding reservations made through
/// [`try_reserve`](Self::try_reserve) / [`add_bytes`](Self::add_bytes) and
/// returned through [`release`](Self::release).
///
/// When a process-wide source is registered with [`set_heap_source`], reads,
/// pressure checks and admission use that total live-heap figure instead. In
/// that mode `try_reserve` does not touch the counter, so a later `release`
/// is unnecessary (though harmless, since releases saturate at zero).
///
/// Checks are fast atomic operations suitable for the hot path. The guard is
/// designed for data pipeline services, where incoming data must be rejected
/// (503) before it hits the container memory limit.
///
/// # Usage
///
/// ```rust,no_run
/// use hyperi_rustlib::memory::{MemoryGuard, MemoryGuardConfig};
///
/// let guard = MemoryGuard::new(MemoryGuardConfig::default());
///
/// // On data arrival -- check before accepting.
/// let payload_len = 1024u64;
/// if guard.try_reserve(payload_len) {
///     // ... accept and process the payload ...
///
///     // After the data is flushed/sent, hand the reservation back.
///     // Only release what was actually reserved.
///     guard.release(payload_len);
/// } else {
///     // return 503 -- backpressure
/// }
///
/// // Fast hot-path check
/// if guard.under_pressure() {
///     // return 503
/// }
/// ```
pub struct MemoryGuard {
    /// Outstanding reserved bytes (application-level, not RSS). Not consulted
    /// for reads while a heap source is registered.
    current_bytes: AtomicU64,
    /// Effective memory limit in bytes; `MemoryGuard::new` floors it at 1.
    limit_bytes: u64,
    /// Pressure threshold as a fraction of `limit_bytes` (0.0-1.0).
    pressure_threshold: f64,
    /// Cached pressure flag for the counter-based hot-path check, maintained
    /// by `try_reserve` / `add_bytes` / `release`.
    under_pressure: AtomicBool,
}
345
346impl MemoryGuard {
347 /// Create a new memory guard.
348 ///
349 /// If `config.limit_bytes` is 0, auto-detects from cgroup (K8s) or system memory,
350 /// then applies `cgroup_headroom` factor to leave room for process overhead.
351 #[must_use]
352 #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
353 pub fn new(config: MemoryGuardConfig) -> Self {
354 // Defensive: a non-finite / out-of-range threshold or headroom would
355 // produce a zero/NaN limit and a divide-by-zero pressure ratio. Clamp
356 // to the safe default and log loudly. Callers wanting hard rejection
357 // should call `config.validate()` at startup.
358 let pressure_threshold = sane_fraction(
359 config.pressure_threshold,
360 DEFAULT_PRESSURE_THRESHOLD,
361 "pressure_threshold",
362 );
363 let cgroup_headroom = sane_fraction(
364 config.cgroup_headroom,
365 DEFAULT_CGROUP_HEADROOM,
366 "cgroup_headroom",
367 );
368
369 let raw_limit = if config.limit_bytes > 0 {
370 config.limit_bytes
371 } else {
372 effective_auto_limit(
373 cgroup::detect_memory_limit(),
374 cgroup_headroom,
375 cgroup::detect_memory_high(),
376 )
377 };
378 // Never permit a zero effective limit: every pressure calculation
379 // divides by it.
380 let limit_bytes = raw_limit.max(1);
381
382 tracing::info!(limit_bytes, pressure_threshold, "memory guard initialised");
383
384 Self {
385 current_bytes: AtomicU64::new(0),
386 limit_bytes,
387 pressure_threshold,
388 under_pressure: AtomicBool::new(false),
389 }
390 }
391
392 /// Try to reserve bytes. Returns false if over the limit (backpressure).
393 ///
394 /// With a registered [`set_heap_source`], this is a projected-admission
395 /// check against the *true total heap* (`heap() + bytes <= limit`) and does
396 /// NOT mutate the reservation counter -- the allocator already accounts the
397 /// bytes once they are allocated, and frees them on drop, so no `release`
398 /// is needed. Without a source it is the classic atomic check-and-add on
399 /// the per-batch counter (rolled back if it would exceed the limit).
400 #[inline]
401 pub fn try_reserve(&self, bytes: u64) -> bool {
402 if let Some(heap) = heap_bytes() {
403 return heap + bytes <= self.limit_bytes;
404 }
405 let current = self.current_bytes.fetch_add(bytes, Ordering::Relaxed) + bytes;
406 if current > self.limit_bytes {
407 // Over limit -- roll back
408 self.current_bytes.fetch_sub(bytes, Ordering::Relaxed);
409 self.under_pressure.store(true, Ordering::Relaxed);
410 return false;
411 }
412 self.update_pressure(current);
413 true
414 }
415
416 /// Add bytes without checking the limit (for tracking only).
417 /// Use when data is already accepted and you just need to track it.
418 #[inline]
419 pub fn add_bytes(&self, bytes: u64) {
420 let new_total = self.current_bytes.fetch_add(bytes, Ordering::Relaxed) + bytes;
421 self.update_pressure(new_total);
422 }
423
424 /// Release bytes after data is flushed/sent/dropped.
425 ///
426 /// Uses saturating subtraction to prevent underflow wrapping.
427 #[inline]
428 pub fn release(&self, bytes: u64) {
429 let prev = self
430 .current_bytes
431 .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
432 Some(current.saturating_sub(bytes))
433 })
434 // Always succeeds (closure always returns Some).
435 .unwrap_or_else(|v| v);
436 self.update_pressure(prev.saturating_sub(bytes));
437 }
438
439 /// Fast hot-path pressure check.
440 ///
441 /// With a registered [`set_heap_source`], computes live from the true heap
442 /// (one atomic load + compare); otherwise reads the cached flag maintained
443 /// by `try_reserve`/`add_bytes`/`release`.
444 #[inline]
445 pub fn under_pressure(&self) -> bool {
446 if heap_bytes().is_some() {
447 return self.pressure_ratio() >= self.pressure_threshold;
448 }
449 self.under_pressure.load(Ordering::Relaxed)
450 }
451
452 /// Current pressure level.
453 #[inline]
454 pub fn pressure(&self) -> MemoryPressure {
455 let ratio = self.pressure_ratio();
456 if ratio >= self.pressure_threshold {
457 MemoryPressure::High
458 } else if ratio >= 0.5 {
459 MemoryPressure::Medium
460 } else {
461 MemoryPressure::Low
462 }
463 }
464
465 /// Current usage as fraction of limit (0.0 - 1.0+).
466 #[inline]
467 pub fn pressure_ratio(&self) -> f64 {
468 self.current_bytes() as f64 / self.limit_bytes as f64
469 }
470
471 /// Current memory usage in bytes.
472 ///
473 /// Returns the true total live heap when a [`set_heap_source`] is
474 /// registered, otherwise the sum of outstanding per-batch reservations.
475 #[inline]
476 pub fn current_bytes(&self) -> u64 {
477 heap_bytes().unwrap_or_else(|| self.current_bytes.load(Ordering::Relaxed))
478 }
479
480 /// Configured memory limit in bytes.
481 #[inline]
482 pub fn limit_bytes(&self) -> u64 {
483 self.limit_bytes
484 }
485
486 /// Update the pressure flag based on current usage.
487 #[inline]
488 fn update_pressure(&self, current: u64) {
489 let ratio = current as f64 / self.limit_bytes as f64;
490 self.under_pressure
491 .store(ratio >= self.pressure_threshold, Ordering::Relaxed);
492 }
493}
494
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_memory_guard_default() {
        let guard = MemoryGuard::new(MemoryGuardConfig {
            limit_bytes: 1_000_000, // 1MB explicit
            ..Default::default()
        });
        assert_eq!(guard.limit_bytes(), 1_000_000);
        assert_eq!(guard.current_bytes(), 0);
        assert!(!guard.under_pressure());
        assert_eq!(guard.pressure(), MemoryPressure::Low);
    }

    #[test]
    fn test_try_reserve_within_limit() {
        let guard = MemoryGuard::new(MemoryGuardConfig {
            limit_bytes: 1000,
            ..Default::default()
        });
        assert!(guard.try_reserve(500));
        assert_eq!(guard.current_bytes(), 500);
    }

    #[test]
    fn test_try_reserve_over_limit() {
        let guard = MemoryGuard::new(MemoryGuardConfig {
            limit_bytes: 1000,
            ..Default::default()
        });
        assert!(guard.try_reserve(500));
        assert!(!guard.try_reserve(600)); // would exceed 1000
        assert_eq!(guard.current_bytes(), 500); // rolled back
        assert!(guard.under_pressure());
    }

    #[test]
    fn test_release_reduces_pressure() {
        let guard = MemoryGuard::new(MemoryGuardConfig {
            limit_bytes: 1000,
            pressure_threshold: 0.8,
            ..Default::default()
        });
        guard.add_bytes(900); // 90% -- over threshold
        assert!(guard.under_pressure());
        assert_eq!(guard.pressure(), MemoryPressure::High);

        guard.release(500); // down to 400 = 40%
        assert!(!guard.under_pressure());
        assert_eq!(guard.pressure(), MemoryPressure::Low);
    }

    #[test]
    fn test_pressure_levels() {
        let guard = MemoryGuard::new(MemoryGuardConfig {
            limit_bytes: 1000,
            pressure_threshold: 0.8,
            ..Default::default()
        });

        // Low (< 50%)
        guard.add_bytes(400);
        assert_eq!(guard.pressure(), MemoryPressure::Low);

        // Medium (50-80%)
        guard.add_bytes(200); // 600 = 60%
        assert_eq!(guard.pressure(), MemoryPressure::Medium);

        // High (>= 80%)
        guard.add_bytes(300); // 900 = 90%
        assert_eq!(guard.pressure(), MemoryPressure::High);
    }

    #[test]
    fn test_pressure_ratio() {
        let guard = MemoryGuard::new(MemoryGuardConfig {
            limit_bytes: 1000,
            ..Default::default()
        });
        guard.add_bytes(250);
        let ratio = guard.pressure_ratio();
        assert!((ratio - 0.25).abs() < 0.001);
    }

    #[test]
    fn test_release_saturating() {
        let guard = MemoryGuard::new(MemoryGuardConfig {
            limit_bytes: 1000,
            ..Default::default()
        });
        guard.add_bytes(100);
        guard.release(200); // release more than added -- saturates to 0
        assert_eq!(
            guard.current_bytes(),
            0,
            "over-release must saturate to 0, not wrap"
        );
        assert!(!guard.under_pressure());
        assert_eq!(guard.pressure(), MemoryPressure::Low);

        // Verify the guard is still functional after over-release
        assert!(guard.try_reserve(500));
        assert_eq!(guard.current_bytes(), 500);
    }

    #[test]
    fn test_concurrent_reserve_release() {
        use std::sync::Arc;
        use std::thread;

        let guard = Arc::new(MemoryGuard::new(MemoryGuardConfig {
            limit_bytes: 100_000,
            pressure_threshold: 0.8,
            ..Default::default()
        }));

        let mut handles = vec![];
        for _ in 0..10 {
            let g = Arc::clone(&guard);
            handles.push(thread::spawn(move || {
                for _ in 0..100 {
                    g.add_bytes(100);
                    g.release(100);
                }
            }));
        }
        for h in handles {
            h.join().unwrap();
        }
        // Each thread only releases bytes it added earlier, so the counter is
        // always >= 100 when a release runs and never saturates. Every
        // operation is an atomic read-modify-write, so after all threads are
        // joined the total must be exactly zero.
        assert_eq!(
            guard.current_bytes(),
            0,
            "leaked bytes: {}",
            guard.current_bytes()
        );
    }

    #[test]
    fn test_try_reserve_rollback_is_atomic() {
        let guard = MemoryGuard::new(MemoryGuardConfig {
            limit_bytes: 100,
            ..Default::default()
        });
        assert!(guard.try_reserve(90));
        assert!(!guard.try_reserve(20)); // over limit, rolled back
        assert_eq!(guard.current_bytes(), 90); // not 110
        assert!(guard.try_reserve(10)); // exactly at limit
        assert_eq!(guard.current_bytes(), 100);
    }

    // Process-global heap source for the switch test. `HEAP_SOURCE` is a
    // `OnceLock` and can never be cleared, so once registered it redirects
    // every guard in the process. Under cargo-nextest each test runs in its
    // own process, which contains the effect. Under plain `cargo test`, all
    // tests share one process, and the registration would silently turn the
    // per-batch-counter tests above into heap-source tests and break them.
    // The test therefore only runs under nextest. (This is the single test in
    // this module that touches the global hook.)
    static TEST_HEAP: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0);
    fn test_heap_source() -> usize {
        TEST_HEAP.load(Ordering::Relaxed)
    }

    #[test]
    fn heap_source_overrides_read_path_and_admission() {
        // nextest sets `NEXTEST=1` in every test process it spawns.
        if std::env::var_os("NEXTEST").is_none() {
            eprintln!(
                "skipping heap_source_overrides_read_path_and_admission: \
                 needs process-per-test isolation (run under cargo-nextest)"
            );
            return;
        }

        assert!(set_heap_source(test_heap_source), "first set wins");
        assert!(
            !set_heap_source(test_heap_source),
            "second set is a no-op (first-wins)"
        );

        let guard = MemoryGuard::new(MemoryGuardConfig {
            limit_bytes: 1_000,
            pressure_threshold: 0.8,
            ..Default::default()
        });

        // Reads come from the heap source, not the reservation counter.
        TEST_HEAP.store(250, Ordering::Relaxed);
        assert_eq!(guard.current_bytes(), 250);
        assert!((guard.pressure_ratio() - 0.25).abs() < 0.001);
        assert!(!guard.under_pressure());

        // Pressure tracks the live heap, including growth that was never
        // reserved and that the per-batch counter would not see.
        TEST_HEAP.store(850, Ordering::Relaxed);
        assert!(
            guard.under_pressure(),
            "85% live heap is over the 80% threshold"
        );
        assert_eq!(guard.pressure(), MemoryPressure::High);

        // try_reserve is a projected-admission check against the true heap and
        // does NOT mutate the reservation counter.
        TEST_HEAP.store(900, Ordering::Relaxed);
        assert!(guard.try_reserve(100), "900 + 100 == limit, admitted");
        assert!(!guard.try_reserve(200), "900 + 200 > limit, rejected");
        assert_eq!(
            guard.current_bytes(),
            900,
            "counter untouched by try_reserve"
        );
    }

    #[test]
    fn effective_auto_limit_caps_at_memory_high() {
        // headroom 0.85 of 1000 = 850; high 600 is lower -> cap at the soft
        // throttle so we shed before the kernel does.
        assert_eq!(effective_auto_limit(1000, 0.85, Some(600)), 600);
        // high above the headroom limit -> headroom wins (no change).
        assert_eq!(effective_auto_limit(1000, 0.85, Some(900)), 850);
        // no memory.high in force -> headroom limit.
        assert_eq!(effective_auto_limit(1000, 0.85, None), 850);
    }

    #[test]
    fn test_config_defaults() {
        let config = MemoryGuardConfig::default();
        assert_eq!(config.limit_bytes, 0);
        assert!((config.pressure_threshold - 0.80).abs() < 0.001);
        assert!((config.cgroup_headroom - 0.85).abs() < 0.001);
    }

    #[test]
    fn test_from_env_raw_defaults_when_unset() {
        // With no env vars set, should return defaults
        let config = MemoryGuardConfig::from_env_raw("TEST_MG_UNSET");
        assert_eq!(config.limit_bytes, 0);
        assert!((config.pressure_threshold - 0.80).abs() < 0.001);
        assert!((config.cgroup_headroom - 0.85).abs() < 0.001);
    }

    #[test]
    fn test_env_parsed_helper() {
        // env_parsed returns None for unset vars
        assert!(env_parsed::<u64>("NONEXISTENT_PREFIX_XYZ", "FOO").is_none());
        assert!(env_parsed::<f64>("NONEXISTENT_PREFIX_XYZ", "BAR").is_none());
    }

    #[test]
    fn test_guard_with_explicit_config_overrides() {
        // Simulates what from_env would produce with overrides
        let config = MemoryGuardConfig {
            limit_bytes: 2_147_483_648,
            pressure_threshold: 0.75,
            cgroup_headroom: 0.90,
        };
        let guard = MemoryGuard::new(config);
        assert_eq!(guard.limit_bytes(), 2_147_483_648);
    }

    #[test]
    fn test_guard_with_custom_headroom() {
        // Auto-detect with an explicit 85% headroom.
        let config = MemoryGuardConfig {
            limit_bytes: 0, // auto-detect
            pressure_threshold: 0.80,
            cgroup_headroom: 0.85,
        };
        let guard = MemoryGuard::new(config);
        // Auto-detected, so limit should be 85% of system/cgroup memory
        assert!(guard.limit_bytes() > 0);
    }

    #[test]
    fn test_validate_accepts_defaults_and_rejects_bad_fractions() {
        assert!(MemoryGuardConfig::default().validate().is_ok());

        for bad in [0.0, -0.1, 1.5, f64::NAN, f64::INFINITY] {
            let cfg = MemoryGuardConfig {
                pressure_threshold: bad,
                ..Default::default()
            };
            assert!(
                cfg.validate().is_err(),
                "pressure_threshold={bad} must be rejected"
            );
            let cfg = MemoryGuardConfig {
                cgroup_headroom: bad,
                ..Default::default()
            };
            assert!(
                cfg.validate().is_err(),
                "cgroup_headroom={bad} must be rejected"
            );
        }
    }

    #[test]
    fn test_new_clamps_invalid_config_no_divide_by_zero() {
        // A zero/NaN headroom with auto-detect could yield a zero limit ->
        // divide-by-zero. A zero pressure_threshold would make every ratio
        // "over". new() must clamp to safe defaults and keep ratios finite.
        let guard = MemoryGuard::new(MemoryGuardConfig {
            limit_bytes: 0,
            pressure_threshold: 0.0,
            cgroup_headroom: 0.0,
        });
        assert!(guard.limit_bytes() >= 1, "limit floored at >=1");
        guard.add_bytes(10);
        assert!(
            guard.pressure_ratio().is_finite(),
            "pressure ratio must be finite, not div-by-zero"
        );
    }

    #[test]
    fn test_new_with_nan_threshold_is_finite() {
        let guard = MemoryGuard::new(MemoryGuardConfig {
            limit_bytes: 1000,
            pressure_threshold: f64::NAN,
            cgroup_headroom: f64::NAN,
        });
        assert_eq!(guard.limit_bytes(), 1000);
        guard.add_bytes(900);
        // Clamped threshold (0.8 default) -> 90% is over -> under pressure.
        assert!(guard.under_pressure());
    }

    #[test]
    fn test_auto_detect_limit() {
        // With limit_bytes = 0, should auto-detect from system
        let guard = MemoryGuard::new(MemoryGuardConfig::default());
        assert!(
            guard.limit_bytes() > 0,
            "auto-detected limit should be positive"
        );
        // The upper bound (headroom applied to host/cgroup memory) is
        // environment-dependent, so it is not asserted here.
    }
}