Skip to main content

lean_ctx/core/
memory_guard.rs

//! Process-level RAM guardian with adaptive eviction and hard OOM protection.
//!
//! Monitors RSS via platform-specific APIs and triggers tiered cache eviction
//! when memory usage exceeds configurable thresholds (default: 5% of system RAM).
//! At critical levels (>3x limit), asks background tasks to abort and runs repeated
//! aggressive eviction passes to avoid an OS OOM kill.
6
7use std::sync::atomic::{AtomicBool, AtomicU64, AtomicU8, Ordering};
8use std::sync::Arc;
9
/// Highest RSS (in bytes) recorded by any snapshot capture since process start.
static PEAK_RSS: AtomicU64 = AtomicU64::new(0);
/// Set by the first `start_guard` call so that later calls return immediately.
static GUARD_RUNNING: AtomicBool = AtomicBool::new(false);
/// Cooperative abort flag written by the guardian thread and read via `abort_requested`.
static ABORT_REQUESTED: AtomicBool = AtomicBool::new(false);
/// Last `PressureLevel` observed by the guardian thread, stored as its `u8` discriminant.
static CURRENT_PRESSURE: AtomicU8 = AtomicU8::new(0);
14
/// Current process RSS in bytes, or `None` if unavailable.
///
/// The implementation is selected at compile time: Linux uses `linux_rss`,
/// macOS uses `macos_rss`, and every other target always returns `None`.
pub fn get_rss_bytes() -> Option<u64> {
    #[cfg(target_os = "linux")]
    {
        linux_rss()
    }
    #[cfg(target_os = "macos")]
    {
        macos_rss()
    }
    #[cfg(not(any(target_os = "linux", target_os = "macos")))]
    {
        None
    }
}
30
/// RSS of an arbitrary process by PID, or `None` if unavailable/dead.
///
/// The implementation is selected at compile time: Linux uses `linux_rss_for_pid`,
/// macOS uses `macos_rss_for_pid`, and every other target always returns `None`.
pub fn get_rss_bytes_for_pid(pid: u32) -> Option<u64> {
    #[cfg(target_os = "linux")]
    {
        linux_rss_for_pid(pid)
    }
    #[cfg(target_os = "macos")]
    {
        macos_rss_for_pid(pid)
    }
    #[cfg(not(any(target_os = "linux", target_os = "macos")))]
    {
        // Keep the parameter "used" on targets without an implementation.
        let _ = pid;
        None
    }
}
47
/// Total physical RAM in bytes, or `None` if unavailable.
///
/// The implementation is selected at compile time: Linux uses `linux_memtotal`,
/// macOS uses `macos_memsize`, and every other target always returns `None`.
pub fn get_system_ram_bytes() -> Option<u64> {
    #[cfg(target_os = "linux")]
    {
        linux_memtotal()
    }
    #[cfg(target_os = "macos")]
    {
        macos_memsize()
    }
    #[cfg(not(any(target_os = "linux", target_os = "macos")))]
    {
        None
    }
}
63
64/// Returns the RSS limit in bytes based on `max_ram_percent` config.
65pub fn rss_limit_bytes() -> Option<u64> {
66    let sys_ram = get_system_ram_bytes()?;
67    let cfg = super::config::Config::load();
68    let pct = super::config::MemoryGuardConfig::effective(&cfg).max_ram_percent;
69    Some(sys_ram / 100 * u64::from(pct))
70}
71
/// Recorded peak RSS since process start.
///
/// Reads the process-global `PEAK_RSS` counter; the value stays `0` until the first
/// snapshot capture has updated it.
pub fn peak_rss_bytes() -> u64 {
    PEAK_RSS.load(Ordering::Relaxed)
}
76
/// Snapshot of current memory state for diagnostics.
#[derive(Debug, Clone, serde::Serialize)]
pub struct MemorySnapshot {
    /// Resident set size of the sampled process, in bytes.
    pub rss_bytes: u64,
    /// Highest RSS recorded in the process-global peak counter at capture time, in bytes.
    pub peak_rss_bytes: u64,
    /// Total physical RAM of the machine, in bytes.
    pub system_ram_bytes: u64,
    /// RSS budget derived from the configured `max_ram_percent` of system RAM, in bytes.
    pub rss_limit_bytes: u64,
    /// `rss_bytes` as a percentage of `system_ram_bytes` (`0.0` if system RAM is reported as 0).
    pub rss_percent: f64,
    /// Pressure tier derived from `rss_percent` relative to the configured percentage.
    pub pressure_level: PressureLevel,
}
87
/// Memory pressure tiers, ordered from least to most severe.
///
/// A tier is assigned by comparing RSS (as a percentage of system RAM) against the
/// configured `max_ram_percent`. The explicit `u8` discriminants are what the guardian
/// thread stores in its cached atomic.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, serde::Serialize)]
#[serde(rename_all = "lowercase")]
#[repr(u8)]
pub enum PressureLevel {
    /// RSS is at or below the configured percentage.
    Normal = 0,
    /// RSS exceeds the configured percentage.
    Soft = 1,
    /// RSS exceeds 1.4x the configured percentage.
    Medium = 2,
    /// RSS exceeds 2x the configured percentage.
    Hard = 3,
    /// RSS exceeds 3x the configured percentage.
    Critical = 4,
}
98
99impl PressureLevel {
100    fn from_u8(v: u8) -> Self {
101        match v {
102            1 => Self::Soft,
103            2 => Self::Medium,
104            3 => Self::Hard,
105            4 => Self::Critical,
106            _ => Self::Normal,
107        }
108    }
109}
110
111impl MemorySnapshot {
112    /// Capture memory snapshot of the **current** process.
113    pub fn capture() -> Option<Self> {
114        Self::capture_impl(get_rss_bytes()?)
115    }
116
117    /// Capture memory snapshot for the **daemon** process (by PID).
118    /// Falls back to the current process if the PID is dead or unreadable.
119    pub fn capture_for_pid(pid: u32) -> Option<Self> {
120        let rss = get_rss_bytes_for_pid(pid).or_else(get_rss_bytes)?;
121        Self::capture_impl(rss)
122    }
123
124    fn capture_impl(rss: u64) -> Option<Self> {
125        let sys = get_system_ram_bytes()?;
126        let limit = rss_limit_bytes()?;
127        let pct = if sys > 0 {
128            (rss as f64 / sys as f64) * 100.0
129        } else {
130            0.0
131        };
132
133        PEAK_RSS.fetch_max(rss, Ordering::Relaxed);
134
135        let cfg = super::config::Config::load();
136        let guard_cfg = super::config::MemoryGuardConfig::effective(&cfg);
137        let base = f64::from(guard_cfg.max_ram_percent);
138
139        let level = if pct > base * 3.0 {
140            PressureLevel::Critical
141        } else if pct > base * 2.0 {
142            PressureLevel::Hard
143        } else if pct > base * 1.4 {
144            PressureLevel::Medium
145        } else if pct > base {
146            PressureLevel::Soft
147        } else {
148            PressureLevel::Normal
149        };
150
151        Some(Self {
152            rss_bytes: rss,
153            peak_rss_bytes: PEAK_RSS.load(Ordering::Relaxed),
154            system_ram_bytes: sys,
155            rss_limit_bytes: limit,
156            rss_percent: pct,
157            pressure_level: level,
158        })
159    }
160}
161
/// Asks jemalloc to purge every arena so that unused pages go back to the OS.
///
/// The arena index `4096` is jemalloc's `MALLCTL_ARENAS_ALL` sentinel ("all arenas").
/// A failing call is reported at debug level rather than ignored. Without the
/// `jemalloc` feature, or on Windows, this function does nothing.
pub fn jemalloc_purge() {
    #[cfg(all(feature = "jemalloc", not(windows)))]
    {
        const PURGE_ALL_ARENAS: &[u8] = b"arena.4096.purge\0";

        // NOTE(review): the jemalloc manual lists `arena.<i>.purge` as a `(void)` control.
        // Passing a new value through `raw::write` may be rejected with EPERM, in which
        // case the purge never runs — confirm against the bundled jemalloc version.
        //
        // SAFETY: the name is a NUL-terminated byte string, and the written value is a
        // plain `u64` that jemalloc only reads for the duration of the call.
        let outcome = unsafe { tikv_jemalloc_ctl::raw::write(PURGE_ALL_ARENAS, 0u64) };
        if let Err(e) = outcome {
            tracing::debug!("[memory_guard] jemalloc purge failed: {e}");
        }
    }
}
177
/// Returns `true` if the guardian has requested background tasks to abort.
///
/// This is a single relaxed atomic load. The guardian thread sets the flag at
/// `Hard`/`Critical` pressure and clears it again once pressure drops below `Hard`.
pub fn abort_requested() -> bool {
    ABORT_REQUESTED.load(Ordering::Relaxed)
}
182
/// Quick, non-allocating memory pressure check for hot loops (scanners, indexers).
/// Reads the cached atomic flag set by the guardian thread — O(1), no syscalls.
///
/// Returns `true` for every level from `Soft` upwards. The cached value stays at
/// `Normal` until the guardian thread has published a reading.
pub fn is_under_pressure() -> bool {
    current_pressure() >= PressureLevel::Soft
}
188
/// Returns the current pressure level as last observed by the guardian thread.
///
/// Decodes the cached `u8` discriminant; the initial value `0` decodes to `Normal`.
pub fn current_pressure() -> PressureLevel {
    PressureLevel::from_u8(CURRENT_PRESSURE.load(Ordering::Relaxed))
}
193
194/// Start the background memory guardian task (idempotent).
195/// Polls every 3s (normal) or 1s (under pressure). At Critical level, performs
196/// emergency shutdown to prevent OS OOM kill.
197pub fn start_guard(eviction_callback: Arc<dyn Fn(PressureLevel) + Send + Sync>) {
198    if GUARD_RUNNING.swap(true, Ordering::SeqCst) {
199        return;
200    }
201    std::thread::Builder::new()
202        .name("memory-guard".into())
203        .spawn(move || {
204            let mut poll_secs = 3u64;
205            loop {
206                std::thread::sleep(std::time::Duration::from_secs(poll_secs));
207                let Some(snap) = MemorySnapshot::capture() else {
208                    continue;
209                };
210
211                CURRENT_PRESSURE.store(snap.pressure_level as u8, Ordering::Relaxed);
212
213                if snap.pressure_level == PressureLevel::Critical {
214                    tracing::error!(
215                        "[memory_guard] CRITICAL: RSS={:.0}MB ({:.1}% of {:.0}GB) — \
216                         aggressive eviction to prevent OS OOM kill",
217                        snap.rss_bytes as f64 / 1_048_576.0,
218                        snap.rss_percent,
219                        snap.system_ram_bytes as f64 / 1_073_741_824.0,
220                    );
221                    ABORT_REQUESTED.store(true, Ordering::SeqCst);
222                    (eviction_callback)(PressureLevel::Critical);
223                    jemalloc_purge();
224
225                    for attempt in 1..=3 {
226                        std::thread::sleep(std::time::Duration::from_secs(2));
227                        (eviction_callback)(PressureLevel::Critical);
228                        jemalloc_purge();
229                        if let Some(recheck) = MemorySnapshot::capture() {
230                            if recheck.pressure_level < PressureLevel::Hard {
231                                tracing::info!(
232                                    "[memory_guard] eviction attempt {attempt} succeeded — \
233                                     RSS={:.0}MB, pressure={:?}",
234                                    recheck.rss_bytes as f64 / 1_048_576.0,
235                                    recheck.pressure_level,
236                                );
237                                break;
238                            }
239                            tracing::error!(
240                                "[memory_guard] eviction attempt {attempt}/3 — still {:?} \
241                                 (RSS={:.0}MB)",
242                                recheck.pressure_level,
243                                recheck.rss_bytes as f64 / 1_048_576.0,
244                            );
245                        }
246                    }
247                }
248
249                if snap.pressure_level >= PressureLevel::Soft {
250                    poll_secs = 1;
251                    ABORT_REQUESTED
252                        .store(snap.pressure_level >= PressureLevel::Hard, Ordering::SeqCst);
253                    tracing::warn!(
254                        "[memory_guard] pressure={:?} RSS={:.0}MB limit={:.0}MB ({:.1}% of {:.0}GB)",
255                        snap.pressure_level,
256                        snap.rss_bytes as f64 / 1_048_576.0,
257                        snap.rss_limit_bytes as f64 / 1_048_576.0,
258                        snap.rss_percent,
259                        snap.system_ram_bytes as f64 / 1_073_741_824.0,
260                    );
261                    (eviction_callback)(snap.pressure_level);
262
263                    if snap.pressure_level >= PressureLevel::Hard {
264                        jemalloc_purge();
265                    }
266                } else {
267                    poll_secs = 3;
268                    if ABORT_REQUESTED.load(Ordering::Relaxed) {
269                        ABORT_REQUESTED.store(false, Ordering::SeqCst);
270                        tracing::info!("[memory_guard] pressure normalized, clearing abort flag");
271                    }
272                }
273            }
274        })
275        .ok();
276}
277
/// Force an immediate purge of all jemalloc arenas and log completion.
///
/// Only the allocator is purged here; no cache eviction callback is invoked.
pub fn force_purge() {
    jemalloc_purge();
    tracing::info!("[memory_guard] force_purge completed");
}
283
284// --- Platform-specific implementations ---
285
/// RSS of the current process on Linux, looked up through its own PID.
#[cfg(target_os = "linux")]
fn linux_rss() -> Option<u64> {
    linux_rss_for_pid(std::process::id())
}
290
/// Reads the `VmRSS:` field from `/proc/<pid>/status` and converts it from kB to bytes.
///
/// Returns `None` if the file cannot be read, the field is absent, or its value does
/// not parse as an unsigned integer.
#[cfg(target_os = "linux")]
fn linux_rss_for_pid(pid: u32) -> Option<u64> {
    let status_text = std::fs::read_to_string(format!("/proc/{pid}/status")).ok()?;
    // Only the first `VmRSS:` line is considered.
    let raw_value = status_text
        .lines()
        .find_map(|entry| entry.strip_prefix("VmRSS:"))?;
    let kib = raw_value
        .trim()
        .trim_end_matches(" kB")
        .trim()
        .parse::<u64>()
        .ok()?;
    Some(kib * 1024)
}
303
/// Reads the `MemTotal:` field from `/proc/meminfo` and converts it from kB to bytes.
///
/// Returns `None` if the file cannot be read, the field is absent, or its value does
/// not parse as an unsigned integer.
#[cfg(target_os = "linux")]
fn linux_memtotal() -> Option<u64> {
    let meminfo = std::fs::read_to_string("/proc/meminfo").ok()?;
    // Only the first `MemTotal:` line is considered.
    let raw_value = meminfo
        .lines()
        .find_map(|entry| entry.strip_prefix("MemTotal:"))?;
    let kib = raw_value
        .trim()
        .trim_end_matches(" kB")
        .trim()
        .parse::<u64>()
        .ok()?;
    Some(kib * 1024)
}
315
/// RSS of the current process on macOS, queried via `task_info(MACH_TASK_BASIC_INFO)`.
///
/// Returns `None` when the kernel call does not report `KERN_SUCCESS`.
#[cfg(target_os = "macos")]
#[allow(deprecated, clippy::borrow_as_ptr, clippy::ptr_as_ptr)]
fn macos_rss() -> Option<u64> {
    use std::mem::{size_of, zeroed};

    // SAFETY: the info struct is only used as an output buffer for `task_info`;
    // presumably it is a plain C struct for which all-zero bytes are valid — confirm
    // against the `libc` definition.
    let mut basic_info: libc::mach_task_basic_info_data_t = unsafe { zeroed() };

    // Buffer capacity expressed in `natural_t` words, as passed to `task_info`.
    let capacity_words =
        size_of::<libc::mach_task_basic_info_data_t>() / size_of::<libc::natural_t>();
    let mut count = capacity_words as libc::mach_msg_type_number_t;

    // SAFETY: both pointers refer to live, writable locals, and `count` matches the
    // size of the buffer behind the info pointer.
    let status = unsafe {
        libc::task_info(
            libc::mach_task_self(),
            libc::MACH_TASK_BASIC_INFO,
            std::ptr::from_mut(&mut basic_info).cast::<i32>(),
            std::ptr::from_mut(&mut count),
        )
    };

    if status != libc::KERN_SUCCESS {
        return None;
    }
    Some(basic_info.resident_size)
}
337
/// RSS of an arbitrary process on macOS, obtained by running `ps -o rss= -p <pid>`.
///
/// `ps` is used because `task_for_pid` requires root/entitlements and
/// `proc_pid_rusage` is private API. The output is interpreted as kB and converted
/// to bytes. Returns `None` if `ps` cannot be run, exits unsuccessfully, or prints
/// something that is not an unsigned integer.
#[cfg(target_os = "macos")]
fn macos_rss_for_pid(pid: u32) -> Option<u64> {
    let pid_arg = pid.to_string();
    let ps_run = std::process::Command::new("ps")
        .args(["-o", "rss=", "-p", pid_arg.as_str()])
        .output()
        .ok()?;
    if !ps_run.status.success() {
        return None;
    }
    String::from_utf8_lossy(&ps_run.stdout)
        .trim()
        .parse::<u64>()
        .ok()
        .map(|kib| kib * 1024)
}
353
/// Total physical RAM on macOS, read from the `hw.memsize` sysctl.
///
/// Returns `None` when `sysctlbyname` reports a non-zero status.
#[cfg(target_os = "macos")]
#[allow(clippy::borrow_as_ptr, clippy::ptr_as_ptr)]
fn macos_memsize() -> Option<u64> {
    const KEY: &[u8] = b"hw.memsize\0";

    let mut total_bytes: u64 = 0;
    let mut out_len = std::mem::size_of::<u64>();

    // SAFETY: `KEY` is NUL-terminated, the output pointer refers to a live `u64`, and
    // `out_len` holds that buffer's size; no new value is supplied (null pointer, length 0).
    let rc = unsafe {
        libc::sysctlbyname(
            KEY.as_ptr().cast(),
            std::ptr::from_mut(&mut total_bytes).cast::<libc::c_void>(),
            std::ptr::from_mut(&mut out_len),
            std::ptr::null_mut(),
            0,
        )
    };

    (rc == 0).then_some(total_bytes)
}
376
#[cfg(test)]
mod tests {
    use super::*;

    /// RSS of the test process must be readable and non-zero on supported targets.
    #[test]
    fn rss_returns_some_on_supported_os() {
        if cfg!(any(target_os = "linux", target_os = "macos")) {
            let rss = get_rss_bytes();
            assert!(rss.is_some(), "RSS should be readable");
            assert!(rss.unwrap() > 0, "RSS should be > 0");
        }
    }

    /// Total system RAM must be readable and plausible on supported targets.
    #[test]
    fn system_ram_returns_some_on_supported_os() {
        if cfg!(any(target_os = "linux", target_os = "macos")) {
            let ram = get_system_ram_bytes();
            assert!(ram.is_some(), "System RAM should be readable");
            assert!(ram.unwrap() > 1_000_000, "System RAM should be > 1MB");
        }
    }

    /// A snapshot of the test process must contain internally consistent values.
    #[test]
    fn snapshot_captures_correctly() {
        if cfg!(any(target_os = "linux", target_os = "macos")) {
            let snap = MemorySnapshot::capture();
            assert!(snap.is_some());
            let s = snap.unwrap();
            assert!(s.rss_bytes > 0);
            assert!(s.system_ram_bytes > s.rss_bytes);
            assert!(s.rss_percent > 0.0 && s.rss_percent < 100.0);
        }
    }

    /// The peak counter only ever grows.
    ///
    /// `PEAK_RSS` is process-global and other tests raise it concurrently through
    /// `MemorySnapshot::capture`, so this test neither resets it nor asserts an exact
    /// value. It checks only lower bounds, which hold regardless of test interleaving.
    #[test]
    fn peak_rss_tracks_maximum() {
        PEAK_RSS.fetch_max(100, Ordering::Relaxed);
        let after_raise = peak_rss_bytes();
        assert!(after_raise >= 100);

        // A smaller sample must never lower the recorded peak.
        PEAK_RSS.fetch_max(50, Ordering::Relaxed);
        assert!(peak_rss_bytes() >= after_raise);
    }

    /// Every level must survive the `as u8` / `from_u8` round trip.
    #[test]
    fn pressure_level_roundtrip() {
        for level in [
            PressureLevel::Normal,
            PressureLevel::Soft,
            PressureLevel::Medium,
            PressureLevel::Hard,
            PressureLevel::Critical,
        ] {
            assert_eq!(PressureLevel::from_u8(level as u8), level);
        }
    }

    /// Without a running guardian the cached pressure stays at its initial value.
    #[test]
    fn atomic_pressure_defaults_to_normal() {
        assert_eq!(current_pressure(), PressureLevel::Normal);
    }

    /// The PID-based lookup for our own PID must roughly agree with the direct lookup.
    #[test]
    fn rss_for_own_pid_matches_self() {
        if cfg!(any(target_os = "linux", target_os = "macos")) {
            let self_rss = get_rss_bytes().unwrap();
            let pid_rss = get_rss_bytes_for_pid(std::process::id()).unwrap();
            let ratio = self_rss as f64 / pid_rss as f64;
            assert!(
                (0.5..2.0).contains(&ratio),
                "self RSS ({self_rss}) and pid-based RSS ({pid_rss}) should be within 2x"
            );
        }
    }

    /// A PID that cannot exist must yield `None` on every target.
    #[test]
    fn rss_for_dead_pid_returns_none() {
        let dead_pid = 999_999_999u32;
        assert!(get_rss_bytes_for_pid(dead_pid).is_none());
    }

    /// `capture_for_pid` must fall back to the current process for a dead PID.
    #[test]
    fn capture_for_pid_falls_back_on_dead_pid() {
        if cfg!(any(target_os = "linux", target_os = "macos")) {
            let snap = MemorySnapshot::capture_for_pid(999_999_999);
            assert!(snap.is_some(), "should fall back to self RSS");
        }
    }
}
463}