// lean_ctx/core/memory_guard.rs

//! Process-level RAM guardian with adaptive eviction and hard OOM protection.
//!
//! Monitors RSS via platform-specific APIs and triggers tiered cache eviction
//! when memory usage exceeds configurable thresholds (default: 5% of system RAM).
//! At critical levels (>3x limit), raises the abort flag and repeatedly evicts
//! caches and purges allocator memory to try to avoid an OS OOM kill.
6
7use std::sync::atomic::{AtomicBool, AtomicU64, AtomicU8, Ordering};
8use std::sync::Arc;
9
/// Highest RSS, in bytes, observed by any snapshot since process start.
static PEAK_RSS: AtomicU64 = AtomicU64::new(0);
/// Flipped to `true` by the first `start_guard` call so later calls are no-ops.
static GUARD_RUNNING: AtomicBool = AtomicBool::new(false);
/// Cooperative cancellation flag exposed to background work via `abort_requested`.
static ABORT_REQUESTED: AtomicBool = AtomicBool::new(false);
/// Last pressure level seen by the guardian, stored as its `u8` discriminant
/// (0 corresponds to `PressureLevel::Normal`).
static CURRENT_PRESSURE: AtomicU8 = AtomicU8::new(0);
14
15/// Current process RSS in bytes, or `None` if unavailable.
16pub fn get_rss_bytes() -> Option<u64> {
17    #[cfg(target_os = "linux")]
18    {
19        linux_rss()
20    }
21    #[cfg(target_os = "macos")]
22    {
23        macos_rss()
24    }
25    #[cfg(not(any(target_os = "linux", target_os = "macos")))]
26    {
27        None
28    }
29}
30
31/// Total physical RAM in bytes, or `None` if unavailable.
32pub fn get_system_ram_bytes() -> Option<u64> {
33    #[cfg(target_os = "linux")]
34    {
35        linux_memtotal()
36    }
37    #[cfg(target_os = "macos")]
38    {
39        macos_memsize()
40    }
41    #[cfg(not(any(target_os = "linux", target_os = "macos")))]
42    {
43        None
44    }
45}
46
47/// Returns the RSS limit in bytes based on `max_ram_percent` config.
48pub fn rss_limit_bytes() -> Option<u64> {
49    let sys_ram = get_system_ram_bytes()?;
50    let cfg = super::config::Config::load();
51    let pct = super::config::MemoryGuardConfig::effective(&cfg).max_ram_percent;
52    Some(sys_ram / 100 * u64::from(pct))
53}
54
55/// Recorded peak RSS since process start.
56pub fn peak_rss_bytes() -> u64 {
57    PEAK_RSS.load(Ordering::Relaxed)
58}
59
60/// Snapshot of current memory state for diagnostics.
61#[derive(Debug, Clone, serde::Serialize)]
62pub struct MemorySnapshot {
63    pub rss_bytes: u64,
64    pub peak_rss_bytes: u64,
65    pub system_ram_bytes: u64,
66    pub rss_limit_bytes: u64,
67    pub rss_percent: f64,
68    pub pressure_level: PressureLevel,
69}
70
71#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, serde::Serialize)]
72#[serde(rename_all = "lowercase")]
73#[repr(u8)]
74pub enum PressureLevel {
75    Normal = 0,
76    Soft = 1,
77    Medium = 2,
78    Hard = 3,
79    Critical = 4,
80}
81
82impl PressureLevel {
83    fn from_u8(v: u8) -> Self {
84        match v {
85            1 => Self::Soft,
86            2 => Self::Medium,
87            3 => Self::Hard,
88            4 => Self::Critical,
89            _ => Self::Normal,
90        }
91    }
92}
93
94impl MemorySnapshot {
95    pub fn capture() -> Option<Self> {
96        let rss = get_rss_bytes()?;
97        let sys = get_system_ram_bytes()?;
98        let limit = rss_limit_bytes()?;
99        let pct = if sys > 0 {
100            (rss as f64 / sys as f64) * 100.0
101        } else {
102            0.0
103        };
104
105        PEAK_RSS.fetch_max(rss, Ordering::Relaxed);
106
107        let cfg = super::config::Config::load();
108        let guard_cfg = super::config::MemoryGuardConfig::effective(&cfg);
109        let base = f64::from(guard_cfg.max_ram_percent);
110
111        let level = if pct > base * 3.0 {
112            PressureLevel::Critical
113        } else if pct > base * 2.0 {
114            PressureLevel::Hard
115        } else if pct > base * 1.4 {
116            PressureLevel::Medium
117        } else if pct > base {
118            PressureLevel::Soft
119        } else {
120            PressureLevel::Normal
121        };
122
123        Some(Self {
124            rss_bytes: rss,
125            peak_rss_bytes: PEAK_RSS.load(Ordering::Relaxed),
126            system_ram_bytes: sys,
127            rss_limit_bytes: limit,
128            rss_percent: pct,
129            pressure_level: level,
130        })
131    }
132}
133
/// Force-purge all jemalloc arenas to return dirty pages to the OS.
///
/// Addresses `arena.4096.purge`, where 4096 is jemalloc's `MALLCTL_ARENAS_ALL`
/// sentinel meaning "every arena". This compiles to a no-op unless the
/// `jemalloc` feature is enabled on a non-Windows target.
///
/// A failed purge is not fatal. The first failure is logged at `warn` so it is
/// visible under default log levels. Later failures are logged at `debug` to
/// avoid flooding the log, because the guardian may call this every second
/// while under pressure.
pub fn jemalloc_purge() {
    #[cfg(all(feature = "jemalloc", not(windows)))]
    {
        use std::sync::atomic::{AtomicBool, Ordering};
        use tikv_jemalloc_ctl::raw;

        static FAILURE_REPORTED: AtomicBool = AtomicBool::new(false);

        let purge_mib = b"arena.4096.purge\0";
        // SAFETY: `purge_mib` is a NUL-terminated jemalloc ctl name, as
        // `raw::write` requires.
        // NOTE(review): jemalloc documents `arena.<i>.purge` as a *void* ctl
        // (no old/new value). Passing a `u64` here may be rejected with EPERM,
        // which would make every purge a no-op. Confirm, and if so invoke
        // `mallctl` with null old/new pointers instead.
        if let Err(e) = unsafe { raw::write(purge_mib, 0u64) } {
            if FAILURE_REPORTED.swap(true, Ordering::Relaxed) {
                tracing::debug!("[memory_guard] jemalloc purge failed: {e}");
            } else {
                tracing::warn!("[memory_guard] jemalloc purge failed: {e}");
            }
        }
    }
}
149
150/// Returns `true` if the guardian has requested background tasks to abort.
151pub fn abort_requested() -> bool {
152    ABORT_REQUESTED.load(Ordering::Relaxed)
153}
154
155/// Quick, non-allocating memory pressure check for hot loops (scanners, indexers).
156/// Reads the cached atomic flag set by the guardian thread — O(1), no syscalls.
157pub fn is_under_pressure() -> bool {
158    current_pressure() >= PressureLevel::Soft
159}
160
161/// Returns the current pressure level as last observed by the guardian thread.
162pub fn current_pressure() -> PressureLevel {
163    PressureLevel::from_u8(CURRENT_PRESSURE.load(Ordering::Relaxed))
164}
165
166/// Start the background memory guardian task (idempotent).
167/// Polls every 3s (normal) or 1s (under pressure). At Critical level, performs
168/// emergency shutdown to prevent OS OOM kill.
169pub fn start_guard(eviction_callback: Arc<dyn Fn(PressureLevel) + Send + Sync>) {
170    if GUARD_RUNNING.swap(true, Ordering::SeqCst) {
171        return;
172    }
173    std::thread::Builder::new()
174        .name("memory-guard".into())
175        .spawn(move || {
176            let mut poll_secs = 3u64;
177            loop {
178                std::thread::sleep(std::time::Duration::from_secs(poll_secs));
179                let Some(snap) = MemorySnapshot::capture() else {
180                    continue;
181                };
182
183                CURRENT_PRESSURE.store(snap.pressure_level as u8, Ordering::Relaxed);
184
185                if snap.pressure_level == PressureLevel::Critical {
186                    tracing::error!(
187                        "[memory_guard] CRITICAL: RSS={:.0}MB ({:.1}% of {:.0}GB) — \
188                         aggressive eviction to prevent OS OOM kill",
189                        snap.rss_bytes as f64 / 1_048_576.0,
190                        snap.rss_percent,
191                        snap.system_ram_bytes as f64 / 1_073_741_824.0,
192                    );
193                    ABORT_REQUESTED.store(true, Ordering::SeqCst);
194                    (eviction_callback)(PressureLevel::Critical);
195                    jemalloc_purge();
196
197                    for attempt in 1..=3 {
198                        std::thread::sleep(std::time::Duration::from_secs(2));
199                        (eviction_callback)(PressureLevel::Critical);
200                        jemalloc_purge();
201                        if let Some(recheck) = MemorySnapshot::capture() {
202                            if recheck.pressure_level < PressureLevel::Hard {
203                                tracing::info!(
204                                    "[memory_guard] eviction attempt {attempt} succeeded — \
205                                     RSS={:.0}MB, pressure={:?}",
206                                    recheck.rss_bytes as f64 / 1_048_576.0,
207                                    recheck.pressure_level,
208                                );
209                                break;
210                            }
211                            tracing::error!(
212                                "[memory_guard] eviction attempt {attempt}/3 — still {:?} \
213                                 (RSS={:.0}MB)",
214                                recheck.pressure_level,
215                                recheck.rss_bytes as f64 / 1_048_576.0,
216                            );
217                        }
218                    }
219                }
220
221                if snap.pressure_level >= PressureLevel::Soft {
222                    poll_secs = 1;
223                    ABORT_REQUESTED
224                        .store(snap.pressure_level >= PressureLevel::Hard, Ordering::SeqCst);
225                    tracing::warn!(
226                        "[memory_guard] pressure={:?} RSS={:.0}MB limit={:.0}MB ({:.1}% of {:.0}GB)",
227                        snap.pressure_level,
228                        snap.rss_bytes as f64 / 1_048_576.0,
229                        snap.rss_limit_bytes as f64 / 1_048_576.0,
230                        snap.rss_percent,
231                        snap.system_ram_bytes as f64 / 1_073_741_824.0,
232                    );
233                    (eviction_callback)(snap.pressure_level);
234
235                    if snap.pressure_level >= PressureLevel::Hard {
236                        jemalloc_purge();
237                    }
238                } else {
239                    poll_secs = 3;
240                    if ABORT_REQUESTED.load(Ordering::Relaxed) {
241                        ABORT_REQUESTED.store(false, Ordering::SeqCst);
242                        tracing::info!("[memory_guard] pressure normalized, clearing abort flag");
243                    }
244                }
245            }
246        })
247        .ok();
248}
249
250/// Force immediate purge of all caches and jemalloc arenas.
251pub fn force_purge() {
252    jemalloc_purge();
253    tracing::info!("[memory_guard] force_purge completed");
254}
255
256// --- Platform-specific implementations ---
257
/// Find the first line of a `/proc` text file that starts with `key`, and parse
/// its `<n> kB` value into bytes.
///
/// Returns `None` if no line starts with `key`, or if that first match does
/// not hold an integer.
#[cfg(target_os = "linux")]
fn proc_kb_field(text: &str, key: &str) -> Option<u64> {
    let raw_value = text.lines().find_map(|line| line.strip_prefix(key))?;
    let kib: u64 = raw_value.trim().trim_end_matches(" kB").trim().parse().ok()?;
    Some(kib * 1024)
}

/// Current process RSS in bytes, from the `VmRSS:` field of `/proc/self/status`.
#[cfg(target_os = "linux")]
fn linux_rss() -> Option<u64> {
    let status = std::fs::read_to_string("/proc/self/status").ok()?;
    proc_kb_field(&status, "VmRSS:")
}

/// Total physical RAM in bytes, from the `MemTotal:` field of `/proc/meminfo`.
#[cfg(target_os = "linux")]
fn linux_memtotal() -> Option<u64> {
    let meminfo = std::fs::read_to_string("/proc/meminfo").ok()?;
    proc_kb_field(&meminfo, "MemTotal:")
}
281
/// Current process RSS in bytes, via Mach `task_info(MACH_TASK_BASIC_INFO)`.
///
/// Returns `None` if the kernel call does not report `KERN_SUCCESS`.
#[cfg(target_os = "macos")]
#[allow(deprecated, clippy::borrow_as_ptr, clippy::ptr_as_ptr)]
fn macos_rss() -> Option<u64> {
    // task_info takes the buffer capacity in `natural_t` words, not bytes.
    let mut word_count = (std::mem::size_of::<libc::mach_task_basic_info_data_t>()
        / std::mem::size_of::<libc::natural_t>()) as libc::mach_msg_type_number_t;
    // SAFETY: mach_task_basic_info_data_t is a plain C struct of integer
    // fields, so the all-zero bit pattern is a valid value.
    let mut basic: libc::mach_task_basic_info_data_t = unsafe { std::mem::zeroed() };
    // SAFETY: both out-pointers refer to live locals, and `word_count` matches
    // the size of `basic`.
    let status = unsafe {
        libc::task_info(
            libc::mach_task_self(),
            libc::MACH_TASK_BASIC_INFO,
            std::ptr::from_mut(&mut basic).cast::<i32>(),
            std::ptr::from_mut(&mut word_count),
        )
    };
    (status == libc::KERN_SUCCESS).then_some(basic.resident_size)
}
303
/// Total physical RAM in bytes, via the `hw.memsize` sysctl.
///
/// Returns `None` if `sysctlbyname` reports an error.
#[cfg(target_os = "macos")]
#[allow(clippy::borrow_as_ptr, clippy::ptr_as_ptr)]
fn macos_memsize() -> Option<u64> {
    let key = b"hw.memsize\0";
    let mut total_bytes: u64 = 0;
    let mut out_len = std::mem::size_of::<u64>();
    // SAFETY: `key` is NUL-terminated; the output pointer and `out_len` describe
    // a live, correctly sized u64. No new value is written (null/0).
    let rc = unsafe {
        libc::sysctlbyname(
            key.as_ptr().cast(),
            std::ptr::from_mut(&mut total_bytes).cast::<libc::c_void>(),
            std::ptr::from_mut(&mut out_len),
            std::ptr::null_mut(),
            0,
        )
    };
    (rc == 0).then_some(total_bytes)
}
326
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn rss_returns_some_on_supported_os() {
        if cfg!(any(target_os = "linux", target_os = "macos")) {
            let rss = get_rss_bytes();
            assert!(rss.is_some(), "RSS should be readable");
            assert!(rss.unwrap() > 0, "RSS should be > 0");
        }
    }

    #[test]
    fn system_ram_returns_some_on_supported_os() {
        if cfg!(any(target_os = "linux", target_os = "macos")) {
            let ram = get_system_ram_bytes();
            assert!(ram.is_some(), "System RAM should be readable");
            assert!(ram.unwrap() > 1_000_000, "System RAM should be > 1MB");
        }
    }

    #[test]
    fn snapshot_captures_correctly() {
        if cfg!(any(target_os = "linux", target_os = "macos")) {
            let snap = MemorySnapshot::capture();
            assert!(snap.is_some());
            let s = snap.unwrap();
            assert!(s.rss_bytes > 0);
            assert!(s.system_ram_bytes > s.rss_bytes);
            assert!(s.rss_percent > 0.0 && s.rss_percent < 100.0);
        }
    }

    #[test]
    fn peak_rss_tracks_maximum() {
        // Exercise the fetch_max semantics on a local atomic. The global
        // PEAK_RSS is also updated by `MemorySnapshot::capture()` in tests that
        // run in parallel, so resetting it and asserting an exact value is racy.
        let peak = AtomicU64::new(0);
        peak.fetch_max(100, Ordering::Relaxed);
        peak.fetch_max(50, Ordering::Relaxed);
        assert_eq!(peak.load(Ordering::Relaxed), 100);

        // The global peak is monotonic and always covers the latest sample.
        let before = peak_rss_bytes();
        if let Some(snap) = MemorySnapshot::capture() {
            assert!(snap.peak_rss_bytes >= snap.rss_bytes);
            assert!(peak_rss_bytes() >= before);
        }
    }

    #[test]
    fn pressure_level_roundtrip() {
        for level in [
            PressureLevel::Normal,
            PressureLevel::Soft,
            PressureLevel::Medium,
            PressureLevel::Hard,
            PressureLevel::Critical,
        ] {
            assert_eq!(PressureLevel::from_u8(level as u8), level);
        }
    }

    #[test]
    fn atomic_pressure_defaults_to_normal() {
        assert_eq!(current_pressure(), PressureLevel::Normal);
    }
}