Skip to main content

lean_ctx/core/
memory_guard.rs

1//! Process-level RAM guardian with adaptive eviction and hard OOM protection.
2//!
3//! Monitors RSS via platform-specific APIs and triggers tiered cache eviction
4//! when memory usage exceeds configurable thresholds (default: 5% of system RAM).
5//! At critical levels (>3x limit), performs emergency shutdown to prevent OS OOM kill.
6
7use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
8use std::sync::Arc;
9
/// Highest RSS value (bytes) folded in by `MemorySnapshot::capture` since process start.
static PEAK_RSS: AtomicU64 = AtomicU64::new(0);
/// Claimed by the first `start_guard` call; later calls see `true` and return early.
static GUARD_RUNNING: AtomicBool = AtomicBool::new(false);
/// Written by the guardian loop (true at Hard/Critical pressure); read via `abort_requested`.
static ABORT_REQUESTED: AtomicBool = AtomicBool::new(false);
13
14/// Current process RSS in bytes, or `None` if unavailable.
15pub fn get_rss_bytes() -> Option<u64> {
16    #[cfg(target_os = "linux")]
17    {
18        linux_rss()
19    }
20    #[cfg(target_os = "macos")]
21    {
22        macos_rss()
23    }
24    #[cfg(not(any(target_os = "linux", target_os = "macos")))]
25    {
26        None
27    }
28}
29
30/// Total physical RAM in bytes, or `None` if unavailable.
31pub fn get_system_ram_bytes() -> Option<u64> {
32    #[cfg(target_os = "linux")]
33    {
34        linux_memtotal()
35    }
36    #[cfg(target_os = "macos")]
37    {
38        macos_memsize()
39    }
40    #[cfg(not(any(target_os = "linux", target_os = "macos")))]
41    {
42        None
43    }
44}
45
46/// Returns the RSS limit in bytes based on `max_ram_percent` config.
47pub fn rss_limit_bytes() -> Option<u64> {
48    let sys_ram = get_system_ram_bytes()?;
49    let cfg = super::config::Config::load();
50    let pct = super::config::MemoryGuardConfig::effective(&cfg).max_ram_percent;
51    Some(sys_ram / 100 * u64::from(pct))
52}
53
/// Recorded peak RSS since process start, in bytes.
///
/// The value only advances when `MemorySnapshot::capture` runs, so it is the
/// largest RSS *observed by a snapshot*, and `0` if no snapshot was taken yet.
pub fn peak_rss_bytes() -> u64 {
    PEAK_RSS.load(Ordering::Relaxed)
}
58
/// Snapshot of current memory state for diagnostics.
///
/// Built by `MemorySnapshot::capture`; derives `serde::Serialize` so it can be
/// emitted as-is.
#[derive(Debug, Clone, serde::Serialize)]
pub struct MemorySnapshot {
    /// Resident set size of this process at capture time, in bytes.
    pub rss_bytes: u64,
    /// Peak RSS recorded so far, in bytes (read after folding in `rss_bytes`).
    pub peak_rss_bytes: u64,
    /// Total physical RAM reported by the platform probe, in bytes.
    pub system_ram_bytes: u64,
    /// RSS budget in bytes (`max_ram_percent` of system RAM).
    pub rss_limit_bytes: u64,
    /// `rss_bytes` as a percentage of `system_ram_bytes`; `0.0` if RAM is reported as 0.
    pub rss_percent: f64,
    /// Pressure tier derived from `rss_percent` versus the configured percentage.
    pub pressure_level: PressureLevel,
}
69
/// Memory pressure tier, ordered from least to most severe.
///
/// The derived `Ord` follows declaration order, so comparisons such as
/// `level >= PressureLevel::Soft` are meaningful. Serialized in lowercase.
/// Thresholds below are the ones applied by `MemorySnapshot::capture`, where
/// "base" is the configured `max_ram_percent`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, serde::Serialize)]
#[serde(rename_all = "lowercase")]
pub enum PressureLevel {
    /// RSS percentage is at or below base.
    Normal,
    /// RSS percentage is above base.
    Soft,
    /// RSS percentage is above 1.4x base.
    Medium,
    /// RSS percentage is above 2x base.
    Hard,
    /// RSS percentage is above 3x base.
    Critical,
}
79
80impl MemorySnapshot {
81    pub fn capture() -> Option<Self> {
82        let rss = get_rss_bytes()?;
83        let sys = get_system_ram_bytes()?;
84        let limit = rss_limit_bytes()?;
85        let pct = if sys > 0 {
86            (rss as f64 / sys as f64) * 100.0
87        } else {
88            0.0
89        };
90
91        PEAK_RSS.fetch_max(rss, Ordering::Relaxed);
92
93        let cfg = super::config::Config::load();
94        let guard_cfg = super::config::MemoryGuardConfig::effective(&cfg);
95        let base = f64::from(guard_cfg.max_ram_percent);
96
97        let level = if pct > base * 3.0 {
98            PressureLevel::Critical
99        } else if pct > base * 2.0 {
100            PressureLevel::Hard
101        } else if pct > base * 1.4 {
102            PressureLevel::Medium
103        } else if pct > base {
104            PressureLevel::Soft
105        } else {
106            PressureLevel::Normal
107        };
108
109        Some(Self {
110            rss_bytes: rss,
111            peak_rss_bytes: PEAK_RSS.load(Ordering::Relaxed),
112            system_ram_bytes: sys,
113            rss_limit_bytes: limit,
114            rss_percent: pct,
115            pressure_level: level,
116        })
117    }
118}
119
/// Force-purge all jemalloc arenas to return memory to the OS.
///
/// Compiles to a no-op unless the `jemalloc` feature is enabled on a
/// non-Windows target. The purge is best-effort: a failing `mallctl` is logged
/// as a warning and never propagated to the caller.
pub fn jemalloc_purge() {
    #[cfg(all(feature = "jemalloc", not(windows)))]
    {
        use tikv_jemalloc_ctl::raw;
        // 4096 is jemalloc's `MALLCTL_ARENAS_ALL` pseudo-index ("every arena").
        let purge_key = b"arena.4096.purge\0";
        // SAFETY: `purge_key` is a NUL-terminated byte-string literal and the
        // value is a plain `u64` passed by value, so no invalid memory is handed
        // to `mallctl`.
        let outcome = unsafe { raw::write(purge_key, 0u64) };
        if let Err(err) = outcome {
            // Previously this error was discarded, which made a non-working
            // purge indistinguishable from a successful one.
            // NOTE(review): jemalloc documents `arena.<i>.purge` as a *void*
            // mallctl; confirm that writing a value is accepted rather than
            // rejected (EPERM). If this warning fires, the purge is not happening.
            tracing::warn!("[memory_guard] jemalloc purge failed: {err}");
        }
    }
}
131
/// Returns `true` if the guardian has requested background tasks to abort.
///
/// The flag is maintained by the thread started in `start_guard`: it is set
/// while the last poll saw `Hard` or `Critical` pressure and cleared again once
/// pressure drops below that. Always `false` if the guard was never started.
pub fn abort_requested() -> bool {
    ABORT_REQUESTED.load(Ordering::Relaxed)
}
136
137/// Quick, non-allocating memory pressure check for hot loops (scanners, indexers).
138/// Returns `true` if memory is at or above Soft pressure and work should be paused/stopped.
139pub fn is_under_pressure() -> bool {
140    let Some(snap) = MemorySnapshot::capture() else {
141        return false;
142    };
143    snap.pressure_level >= PressureLevel::Soft
144}
145
146/// Start the background memory guardian task (idempotent).
147/// Polls every 3s (normal) or 1s (under pressure). At Critical level, performs
148/// emergency shutdown to prevent OS OOM kill.
149pub fn start_guard(eviction_callback: Arc<dyn Fn(PressureLevel) + Send + Sync>) {
150    if GUARD_RUNNING.swap(true, Ordering::SeqCst) {
151        return;
152    }
153    std::thread::Builder::new()
154        .name("memory-guard".into())
155        .spawn(move || {
156            let mut poll_secs = 3u64;
157            loop {
158                std::thread::sleep(std::time::Duration::from_secs(poll_secs));
159                let Some(snap) = MemorySnapshot::capture() else {
160                    continue;
161                };
162
163                if snap.pressure_level == PressureLevel::Critical {
164                    tracing::error!(
165                        "[memory_guard] CRITICAL: RSS={:.0}MB ({:.1}% of {:.0}GB) — \
166                         emergency shutdown to prevent OS OOM kill",
167                        snap.rss_bytes as f64 / 1_048_576.0,
168                        snap.rss_percent,
169                        snap.system_ram_bytes as f64 / 1_073_741_824.0,
170                    );
171                    ABORT_REQUESTED.store(true, Ordering::SeqCst);
172                    (eviction_callback)(PressureLevel::Critical);
173                    jemalloc_purge();
174
175                    std::thread::sleep(std::time::Duration::from_secs(2));
176                    if let Some(recheck) = MemorySnapshot::capture() {
177                        if recheck.pressure_level >= PressureLevel::Hard {
178                            tracing::error!(
179                                "[memory_guard] still at {:?} after emergency eviction — \
180                                 exiting process (RSS={:.0}MB)",
181                                recheck.pressure_level,
182                                recheck.rss_bytes as f64 / 1_048_576.0,
183                            );
184                            std::process::exit(137);
185                        }
186                    }
187                }
188
189                if snap.pressure_level >= PressureLevel::Soft {
190                    poll_secs = 1;
191                    ABORT_REQUESTED
192                        .store(snap.pressure_level >= PressureLevel::Hard, Ordering::SeqCst);
193                    tracing::warn!(
194                        "[memory_guard] pressure={:?} RSS={:.0}MB limit={:.0}MB ({:.1}% of {:.0}GB)",
195                        snap.pressure_level,
196                        snap.rss_bytes as f64 / 1_048_576.0,
197                        snap.rss_limit_bytes as f64 / 1_048_576.0,
198                        snap.rss_percent,
199                        snap.system_ram_bytes as f64 / 1_073_741_824.0,
200                    );
201                    (eviction_callback)(snap.pressure_level);
202
203                    if snap.pressure_level >= PressureLevel::Hard {
204                        jemalloc_purge();
205                    }
206                } else {
207                    poll_secs = 3;
208                    if ABORT_REQUESTED.load(Ordering::Relaxed) {
209                        ABORT_REQUESTED.store(false, Ordering::SeqCst);
210                        tracing::info!("[memory_guard] pressure normalized, clearing abort flag");
211                    }
212                }
213            }
214        })
215        .ok();
216}
217
/// Force an immediate purge of the jemalloc arenas and log completion.
///
/// This only calls `jemalloc_purge`; it does not evict any application-level
/// caches itself. The info message is logged whether or not the purge had any
/// effect.
pub fn force_purge() {
    jemalloc_purge();
    tracing::info!("[memory_guard] force_purge completed");
}
223
224// --- Platform-specific implementations ---
225
/// Read this process's resident set size from `/proc/self/status`.
///
/// Looks for the first `VmRSS:` line, whose value is reported in kB, and
/// converts it to bytes. Returns `None` if the file cannot be read, the line is
/// missing, or the number does not parse.
#[cfg(target_os = "linux")]
fn linux_rss() -> Option<u64> {
    let contents = std::fs::read_to_string("/proc/self/status").ok()?;
    let field = contents
        .lines()
        .find_map(|entry| entry.strip_prefix("VmRSS:"))?;
    let kib = field
        .trim()
        .trim_end_matches(" kB")
        .trim()
        .parse::<u64>()
        .ok()?;
    Some(kib * 1024)
}
237
/// Read total physical RAM from `/proc/meminfo`.
///
/// Looks for the first `MemTotal:` line, whose value is reported in kB, and
/// converts it to bytes. Returns `None` if the file cannot be read, the line is
/// missing, or the number does not parse.
#[cfg(target_os = "linux")]
fn linux_memtotal() -> Option<u64> {
    let contents = std::fs::read_to_string("/proc/meminfo").ok()?;
    let field = contents
        .lines()
        .find_map(|entry| entry.strip_prefix("MemTotal:"))?;
    let kib = field
        .trim()
        .trim_end_matches(" kB")
        .trim()
        .parse::<u64>()
        .ok()?;
    Some(kib * 1024)
}
249
/// Query this task's resident size via Mach `task_info(MACH_TASK_BASIC_INFO)`.
///
/// Returns `info.resident_size` when the call reports `KERN_SUCCESS`, otherwise
/// `None`.
// NOTE(review): the `deprecated` allow presumably covers a libc Mach binding
// marked deprecated — confirm which one and whether a replacement exists.
#[cfg(target_os = "macos")]
#[allow(deprecated, clippy::borrow_as_ptr, clippy::ptr_as_ptr)]
fn macos_rss() -> Option<u64> {
    use std::mem;
    // Start from an all-zero struct; `task_info` fills it in on success.
    let mut info: libc::mach_task_basic_info_data_t = unsafe { mem::zeroed() };
    // Capacity of `info` expressed in `natural_t` units; passed as an in/out count.
    let mut count = (mem::size_of::<libc::mach_task_basic_info_data_t>()
        / mem::size_of::<libc::natural_t>()) as libc::mach_msg_type_number_t;
    // Both pointers refer to live locals (`info`, `count`) for the duration of
    // the call, and `count` is computed from the size of `info` above.
    let kr = unsafe {
        libc::task_info(
            libc::mach_task_self(),
            libc::MACH_TASK_BASIC_INFO,
            std::ptr::from_mut(&mut info).cast::<i32>(),
            std::ptr::from_mut(&mut count),
        )
    };
    if kr == libc::KERN_SUCCESS {
        Some(info.resident_size)
    } else {
        None
    }
}
271
/// Query total physical RAM via `sysctlbyname("hw.memsize")`.
///
/// Returns the value written into a `u64` when the call returns 0, otherwise
/// `None`.
#[cfg(target_os = "macos")]
#[allow(clippy::borrow_as_ptr, clippy::ptr_as_ptr)]
fn macos_memsize() -> Option<u64> {
    use std::mem;
    let mut memsize: u64 = 0;
    // In/out length: tells sysctl how many bytes the output buffer can hold.
    let mut len = mem::size_of::<u64>();
    // NUL-terminated so it can be passed as a C string.
    let name = b"hw.memsize\0";
    // `memsize` and `len` are live locals for the duration of the call, and
    // `len` is the size of `memsize`. The null pointer and 0 mean no new value
    // is being set (read-only query).
    let ret = unsafe {
        libc::sysctlbyname(
            name.as_ptr().cast(),
            std::ptr::from_mut(&mut memsize).cast::<libc::c_void>(),
            std::ptr::from_mut(&mut len),
            std::ptr::null_mut(),
            0,
        )
    };
    if ret == 0 {
        Some(memsize)
    } else {
        None
    }
}
294
#[cfg(test)]
mod tests {
    use super::*;

    /// True when the build target has real RSS / RAM probes (Linux or macOS).
    const SUPPORTED_OS: bool = cfg!(any(target_os = "linux", target_os = "macos"));

    /// The RSS probe must yield a positive value wherever a probe exists.
    #[test]
    fn rss_returns_some_on_supported_os() {
        if !SUPPORTED_OS {
            return;
        }
        let rss = get_rss_bytes();
        assert!(rss.is_some(), "RSS should be readable");
        assert!(rss.unwrap() > 0, "RSS should be > 0");
    }

    /// The RAM probe must yield a plausible size wherever a probe exists.
    #[test]
    fn system_ram_returns_some_on_supported_os() {
        if !SUPPORTED_OS {
            return;
        }
        let ram = get_system_ram_bytes();
        assert!(ram.is_some(), "System RAM should be readable");
        assert!(ram.unwrap() > 1_000_000, "System RAM should be > 1MB");
    }

    /// A snapshot must be internally consistent: RSS below total RAM, percent in (0, 100).
    #[test]
    fn snapshot_captures_correctly() {
        if !SUPPORTED_OS {
            return;
        }
        let captured = MemorySnapshot::capture();
        assert!(captured.is_some());
        let snapshot = captured.unwrap();
        assert!(snapshot.rss_bytes > 0);
        assert!(snapshot.system_ram_bytes > snapshot.rss_bytes);
        assert!(snapshot.rss_percent > 0.0 && snapshot.rss_percent < 100.0);
    }

    /// `fetch_max` keeps the larger value, so a smaller later sample cannot lower the peak.
    #[test]
    fn peak_rss_tracks_maximum() {
        PEAK_RSS.store(0, Ordering::Relaxed);
        for sample in [100, 50] {
            PEAK_RSS.fetch_max(sample, Ordering::Relaxed);
        }
        assert_eq!(PEAK_RSS.load(Ordering::Relaxed), 100);
    }
}