Skip to main content

lean_ctx/core/
memory_guard.rs

//! Process-level RAM guardian with adaptive eviction and OOM mitigation.
//!
//! Monitors RSS via platform-specific APIs and triggers tiered cache eviction
//! when memory usage exceeds configurable thresholds (default: 5% of system RAM).
//! At critical levels (>3x limit), it raises the abort flag and runs repeated
//! aggressive eviction and allocator purges to avoid an OS OOM kill; it does not
//! terminate the process itself.
6
7use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
8use std::sync::Arc;
9
// Process-wide guard state, shared between the guard thread and callers.
/// Largest RSS (bytes) recorded so far; only ever raised via `fetch_max`.
static PEAK_RSS: AtomicU64 = AtomicU64::new(0);
/// Set by `start_guard` so repeated calls do not spawn a second guard thread.
static GUARD_RUNNING: AtomicBool = AtomicBool::new(false);
/// Advisory stop signal for background work, exposed via `abort_requested`.
static ABORT_REQUESTED: AtomicBool = AtomicBool::new(false);
13
14/// Current process RSS in bytes, or `None` if unavailable.
15pub fn get_rss_bytes() -> Option<u64> {
16    #[cfg(target_os = "linux")]
17    {
18        linux_rss()
19    }
20    #[cfg(target_os = "macos")]
21    {
22        macos_rss()
23    }
24    #[cfg(not(any(target_os = "linux", target_os = "macos")))]
25    {
26        None
27    }
28}
29
30/// Total physical RAM in bytes, or `None` if unavailable.
31pub fn get_system_ram_bytes() -> Option<u64> {
32    #[cfg(target_os = "linux")]
33    {
34        linux_memtotal()
35    }
36    #[cfg(target_os = "macos")]
37    {
38        macos_memsize()
39    }
40    #[cfg(not(any(target_os = "linux", target_os = "macos")))]
41    {
42        None
43    }
44}
45
46/// Returns the RSS limit in bytes based on `max_ram_percent` config.
47pub fn rss_limit_bytes() -> Option<u64> {
48    let sys_ram = get_system_ram_bytes()?;
49    let cfg = super::config::Config::load();
50    let pct = super::config::MemoryGuardConfig::effective(&cfg).max_ram_percent;
51    Some(sys_ram / 100 * u64::from(pct))
52}
53
54/// Recorded peak RSS since process start.
55pub fn peak_rss_bytes() -> u64 {
56    PEAK_RSS.load(Ordering::Relaxed)
57}
58
59/// Snapshot of current memory state for diagnostics.
60#[derive(Debug, Clone, serde::Serialize)]
61pub struct MemorySnapshot {
62    pub rss_bytes: u64,
63    pub peak_rss_bytes: u64,
64    pub system_ram_bytes: u64,
65    pub rss_limit_bytes: u64,
66    pub rss_percent: f64,
67    pub pressure_level: PressureLevel,
68}
69
70#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, serde::Serialize)]
71#[serde(rename_all = "lowercase")]
72pub enum PressureLevel {
73    Normal,
74    Soft,
75    Medium,
76    Hard,
77    Critical,
78}
79
80impl MemorySnapshot {
81    pub fn capture() -> Option<Self> {
82        let rss = get_rss_bytes()?;
83        let sys = get_system_ram_bytes()?;
84        let limit = rss_limit_bytes()?;
85        let pct = if sys > 0 {
86            (rss as f64 / sys as f64) * 100.0
87        } else {
88            0.0
89        };
90
91        PEAK_RSS.fetch_max(rss, Ordering::Relaxed);
92
93        let cfg = super::config::Config::load();
94        let guard_cfg = super::config::MemoryGuardConfig::effective(&cfg);
95        let base = f64::from(guard_cfg.max_ram_percent);
96
97        let level = if pct > base * 3.0 {
98            PressureLevel::Critical
99        } else if pct > base * 2.0 {
100            PressureLevel::Hard
101        } else if pct > base * 1.4 {
102            PressureLevel::Medium
103        } else if pct > base {
104            PressureLevel::Soft
105        } else {
106            PressureLevel::Normal
107        };
108
109        Some(Self {
110            rss_bytes: rss,
111            peak_rss_bytes: PEAK_RSS.load(Ordering::Relaxed),
112            system_ram_bytes: sys,
113            rss_limit_bytes: limit,
114            rss_percent: pct,
115            pressure_level: level,
116        })
117    }
118}
119
/// Ask jemalloc to purge unused dirty pages from all arenas, returning memory
/// to the OS.
///
/// This is a no-op unless the crate is built with the `jemalloc` feature on a
/// non-Windows target. It is best-effort: failures are logged at debug level
/// and otherwise ignored.
pub fn jemalloc_purge() {
    #[cfg(all(feature = "jemalloc", not(windows)))]
    {
        use tikv_jemalloc_ctl::raw;
        // 4096 is jemalloc's MALLCTL_ARENAS_ALL index: apply the purge to every arena.
        let purge_mib = b"arena.4096.purge\0";
        // NOTE(review): jemalloc documents `arena.<i>.purge` as a `(void)` control.
        // Its handler rejects any old/new value with EPERM, and `raw::write` always
        // supplies a new value (here a `u64`). If so, this call never purges. A real
        // purge needs `mallctl(name, NULL, NULL, NULL, 0)`, e.g. via
        // `tikv_jemalloc_sys`. The error is logged rather than discarded so the
        // failure is at least observable.
        // SAFETY: `purge_mib` is a NUL-terminated mallctl name as `raw::write`
        // requires, and the written value is a plain `u64` living on the stack for
        // the duration of the call.
        if let Err(err) = unsafe { raw::write(purge_mib, 0u64) } {
            tracing::debug!("[memory_guard] jemalloc arena purge failed: {err:?}");
        }
    }
}
131
132/// Returns `true` if the guardian has requested background tasks to abort.
133pub fn abort_requested() -> bool {
134    ABORT_REQUESTED.load(Ordering::Relaxed)
135}
136
137/// Quick, non-allocating memory pressure check for hot loops (scanners, indexers).
138/// Returns `true` if memory is at or above Soft pressure and work should be paused/stopped.
139pub fn is_under_pressure() -> bool {
140    let Some(snap) = MemorySnapshot::capture() else {
141        return false;
142    };
143    snap.pressure_level >= PressureLevel::Soft
144}
145
146/// Start the background memory guardian task (idempotent).
147/// Polls every 3s (normal) or 1s (under pressure). At Critical level, performs
148/// emergency shutdown to prevent OS OOM kill.
149pub fn start_guard(eviction_callback: Arc<dyn Fn(PressureLevel) + Send + Sync>) {
150    if GUARD_RUNNING.swap(true, Ordering::SeqCst) {
151        return;
152    }
153    std::thread::Builder::new()
154        .name("memory-guard".into())
155        .spawn(move || {
156            let mut poll_secs = 3u64;
157            loop {
158                std::thread::sleep(std::time::Duration::from_secs(poll_secs));
159                let Some(snap) = MemorySnapshot::capture() else {
160                    continue;
161                };
162
163                if snap.pressure_level == PressureLevel::Critical {
164                    tracing::error!(
165                        "[memory_guard] CRITICAL: RSS={:.0}MB ({:.1}% of {:.0}GB) — \
166                         aggressive eviction to prevent OS OOM kill",
167                        snap.rss_bytes as f64 / 1_048_576.0,
168                        snap.rss_percent,
169                        snap.system_ram_bytes as f64 / 1_073_741_824.0,
170                    );
171                    ABORT_REQUESTED.store(true, Ordering::SeqCst);
172                    (eviction_callback)(PressureLevel::Critical);
173                    jemalloc_purge();
174
175                    for attempt in 1..=3 {
176                        std::thread::sleep(std::time::Duration::from_secs(2));
177                        (eviction_callback)(PressureLevel::Critical);
178                        jemalloc_purge();
179                        if let Some(recheck) = MemorySnapshot::capture() {
180                            if recheck.pressure_level < PressureLevel::Hard {
181                                tracing::info!(
182                                    "[memory_guard] eviction attempt {attempt} succeeded — \
183                                     RSS={:.0}MB, pressure={:?}",
184                                    recheck.rss_bytes as f64 / 1_048_576.0,
185                                    recheck.pressure_level,
186                                );
187                                break;
188                            }
189                            tracing::error!(
190                                "[memory_guard] eviction attempt {attempt}/3 — still {:?} \
191                                 (RSS={:.0}MB)",
192                                recheck.pressure_level,
193                                recheck.rss_bytes as f64 / 1_048_576.0,
194                            );
195                        }
196                    }
197                }
198
199                if snap.pressure_level >= PressureLevel::Soft {
200                    poll_secs = 1;
201                    ABORT_REQUESTED
202                        .store(snap.pressure_level >= PressureLevel::Hard, Ordering::SeqCst);
203                    tracing::warn!(
204                        "[memory_guard] pressure={:?} RSS={:.0}MB limit={:.0}MB ({:.1}% of {:.0}GB)",
205                        snap.pressure_level,
206                        snap.rss_bytes as f64 / 1_048_576.0,
207                        snap.rss_limit_bytes as f64 / 1_048_576.0,
208                        snap.rss_percent,
209                        snap.system_ram_bytes as f64 / 1_073_741_824.0,
210                    );
211                    (eviction_callback)(snap.pressure_level);
212
213                    if snap.pressure_level >= PressureLevel::Hard {
214                        jemalloc_purge();
215                    }
216                } else {
217                    poll_secs = 3;
218                    if ABORT_REQUESTED.load(Ordering::Relaxed) {
219                        ABORT_REQUESTED.store(false, Ordering::SeqCst);
220                        tracing::info!("[memory_guard] pressure normalized, clearing abort flag");
221                    }
222                }
223            }
224        })
225        .ok();
226}
227
228/// Force immediate purge of all caches and jemalloc arenas.
229pub fn force_purge() {
230    jemalloc_purge();
231    tracing::info!("[memory_guard] force_purge completed");
232}
233
234// --- Platform-specific implementations ---
235
/// Parses the first `key` entry (e.g. `"VmRSS:"`) of a `/proc`-style listing
/// whose lines look like `Key:   1234 kB`, returning the value in bytes.
///
/// Returns `None` if the key is absent, its value is not an integer, or the
/// conversion to bytes would overflow `u64`.
#[cfg(target_os = "linux")]
fn parse_proc_kb(text: &str, key: &str) -> Option<u64> {
    let value = text.lines().find_map(|line| line.strip_prefix(key))?;
    let kb: u64 = value.trim().trim_end_matches(" kB").trim().parse().ok()?;
    kb.checked_mul(1024)
}

/// Current process RSS in bytes, read from the `VmRSS` field of `/proc/self/status`.
#[cfg(target_os = "linux")]
fn linux_rss() -> Option<u64> {
    let status = std::fs::read_to_string("/proc/self/status").ok()?;
    parse_proc_kb(&status, "VmRSS:")
}

/// Total physical RAM in bytes, read from the `MemTotal` field of `/proc/meminfo`.
#[cfg(target_os = "linux")]
fn linux_memtotal() -> Option<u64> {
    let info = std::fs::read_to_string("/proc/meminfo").ok()?;
    parse_proc_kb(&info, "MemTotal:")
}
259
/// Current process RSS in bytes on macOS, via Mach `task_info` with the
/// `MACH_TASK_BASIC_INFO` flavor. Returns `None` if the call does not report
/// `KERN_SUCCESS`.
#[cfg(target_os = "macos")]
// NOTE(review): `deprecated` presumably silences libc's deprecation of the Mach
// bindings used here (e.g. `mach_task_self`) — confirm. The clippy pointer-cast
// allows may be stale now that `ptr::from_mut`/`.cast()` are used.
#[allow(deprecated, clippy::borrow_as_ptr, clippy::ptr_as_ptr)]
fn macos_rss() -> Option<u64> {
    use std::mem;
    // SAFETY: assumes `mach_task_basic_info_data_t` is a plain C struct of integer
    // fields, for which the all-zero bit pattern is a valid value (confirm against
    // the libc definition).
    let mut info: libc::mach_task_basic_info_data_t = unsafe { mem::zeroed() };
    // `task_info` expects the buffer capacity in units of `natural_t`, not bytes.
    let mut count = (mem::size_of::<libc::mach_task_basic_info_data_t>()
        / mem::size_of::<libc::natural_t>()) as libc::mach_msg_type_number_t;
    // SAFETY: `info` and `count` are live, writable locals for the whole call, and
    // `count` describes exactly the capacity of `info`.
    let kr = unsafe {
        libc::task_info(
            libc::mach_task_self(),
            libc::MACH_TASK_BASIC_INFO,
            std::ptr::from_mut(&mut info).cast::<i32>(),
            std::ptr::from_mut(&mut count),
        )
    };
    if kr == libc::KERN_SUCCESS {
        Some(info.resident_size)
    } else {
        None
    }
}
281
/// Total physical RAM in bytes on macOS, read from the `hw.memsize` sysctl.
/// Returns `None` if `sysctlbyname` reports failure (a non-zero return).
#[cfg(target_os = "macos")]
// NOTE(review): the body already uses `ptr::from_mut`/`.cast()`, so these clippy
// allows may be stale.
#[allow(clippy::borrow_as_ptr, clippy::ptr_as_ptr)]
fn macos_memsize() -> Option<u64> {
    use std::mem;
    let mut memsize: u64 = 0;
    // Passed as the capacity, in bytes, of the `memsize` output buffer.
    let mut len = mem::size_of::<u64>();
    // NUL-terminated because `sysctlbyname` takes a C string name.
    let name = b"hw.memsize\0";
    // SAFETY: `name` is NUL-terminated; `memsize` and `len` are writable locals
    // that outlive the call, with `len` equal to the size of `memsize`; no new
    // value is set (null pointer, length 0).
    let ret = unsafe {
        libc::sysctlbyname(
            name.as_ptr().cast(),
            std::ptr::from_mut(&mut memsize).cast::<libc::c_void>(),
            std::ptr::from_mut(&mut len),
            std::ptr::null_mut(),
            0,
        )
    };
    if ret == 0 {
        Some(memsize)
    } else {
        None
    }
}
304
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn rss_returns_some_on_supported_os() {
        if cfg!(any(target_os = "linux", target_os = "macos")) {
            let rss = get_rss_bytes();
            assert!(rss.is_some(), "RSS should be readable");
            assert!(rss.unwrap() > 0, "RSS should be > 0");
        }
    }

    #[test]
    fn system_ram_returns_some_on_supported_os() {
        if cfg!(any(target_os = "linux", target_os = "macos")) {
            let ram = get_system_ram_bytes();
            assert!(ram.is_some(), "System RAM should be readable");
            assert!(ram.unwrap() > 1_000_000, "System RAM should be > 1MB");
        }
    }

    #[test]
    fn snapshot_captures_correctly() {
        if cfg!(any(target_os = "linux", target_os = "macos")) {
            let snap = MemorySnapshot::capture();
            assert!(snap.is_some());
            let s = snap.unwrap();
            assert!(s.rss_bytes > 0);
            assert!(s.system_ram_bytes > s.rss_bytes);
            assert!(s.rss_percent > 0.0 && s.rss_percent < 100.0);
        }
    }

    #[test]
    fn peak_rss_tracks_maximum() {
        // Exercise the `fetch_max` peak pattern on a local counter. The global
        // `PEAK_RSS` is raised concurrently by other tests calling
        // `MemorySnapshot::capture()`, so resetting and asserting on it would race.
        let peak = AtomicU64::new(0);
        peak.fetch_max(100, Ordering::Relaxed);
        peak.fetch_max(50, Ordering::Relaxed);
        assert_eq!(peak.load(Ordering::Relaxed), 100);
    }

    #[test]
    fn captured_peak_never_below_current_rss() {
        if cfg!(any(target_os = "linux", target_os = "macos")) {
            let snap = MemorySnapshot::capture().expect("snapshot should be available");
            // Monotonic properties only: concurrent captures may raise the peak further.
            assert!(snap.peak_rss_bytes >= snap.rss_bytes);
            assert!(peak_rss_bytes() >= snap.rss_bytes);
        }
    }
}