Skip to main content

lean_ctx/core/
memory_guard.rs

//! Process-level RAM guardian with adaptive eviction.
//!
//! Monitors RSS via platform-specific APIs and triggers tiered cache eviction
//! when memory usage exceeds configurable thresholds (default: 5% of system RAM).
5
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7use std::sync::Arc;
8
/// Highest RSS value (in bytes) observed by `MemorySnapshot::capture` so far.
///
/// Starts at 0 and is only ever raised via `fetch_max`.
static PEAK_RSS: AtomicU64 = AtomicU64::new(0);

/// Set to `true` by the first `start_guard` call so later calls become no-ops.
static GUARD_RUNNING: AtomicBool = AtomicBool::new(false);
11
12/// Current process RSS in bytes, or `None` if unavailable.
13pub fn get_rss_bytes() -> Option<u64> {
14    #[cfg(target_os = "linux")]
15    {
16        linux_rss()
17    }
18    #[cfg(target_os = "macos")]
19    {
20        macos_rss()
21    }
22    #[cfg(not(any(target_os = "linux", target_os = "macos")))]
23    {
24        None
25    }
26}
27
28/// Total physical RAM in bytes, or `None` if unavailable.
29pub fn get_system_ram_bytes() -> Option<u64> {
30    #[cfg(target_os = "linux")]
31    {
32        linux_memtotal()
33    }
34    #[cfg(target_os = "macos")]
35    {
36        macos_memsize()
37    }
38    #[cfg(not(any(target_os = "linux", target_os = "macos")))]
39    {
40        None
41    }
42}
43
44/// Returns the RSS limit in bytes based on `max_ram_percent` config.
45pub fn rss_limit_bytes() -> Option<u64> {
46    let sys_ram = get_system_ram_bytes()?;
47    let cfg = super::config::Config::load();
48    let pct = super::config::MemoryGuardConfig::effective(&cfg).max_ram_percent;
49    Some(sys_ram / 100 * u64::from(pct))
50}
51
52/// Recorded peak RSS since process start.
53pub fn peak_rss_bytes() -> u64 {
54    PEAK_RSS.load(Ordering::Relaxed)
55}
56
57/// Snapshot of current memory state for diagnostics.
58#[derive(Debug, Clone, serde::Serialize)]
59pub struct MemorySnapshot {
60    pub rss_bytes: u64,
61    pub peak_rss_bytes: u64,
62    pub system_ram_bytes: u64,
63    pub rss_limit_bytes: u64,
64    pub rss_percent: f64,
65    pub pressure_level: PressureLevel,
66}
67
68#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
69#[serde(rename_all = "lowercase")]
70pub enum PressureLevel {
71    Normal,
72    Soft,
73    Medium,
74    Hard,
75}
76
77impl MemorySnapshot {
78    pub fn capture() -> Option<Self> {
79        let rss = get_rss_bytes()?;
80        let sys = get_system_ram_bytes()?;
81        let limit = rss_limit_bytes()?;
82        let pct = if sys > 0 {
83            (rss as f64 / sys as f64) * 100.0
84        } else {
85            0.0
86        };
87
88        PEAK_RSS.fetch_max(rss, Ordering::Relaxed);
89
90        let cfg = super::config::Config::load();
91        let guard_cfg = super::config::MemoryGuardConfig::effective(&cfg);
92        let base = f64::from(guard_cfg.max_ram_percent);
93
94        let level = if pct > base * 2.0 {
95            PressureLevel::Hard
96        } else if pct > base * 1.4 {
97            PressureLevel::Medium
98        } else if pct > base {
99            PressureLevel::Soft
100        } else {
101            PressureLevel::Normal
102        };
103
104        Some(Self {
105            rss_bytes: rss,
106            peak_rss_bytes: PEAK_RSS.load(Ordering::Relaxed),
107            system_ram_bytes: sys,
108            rss_limit_bytes: limit,
109            rss_percent: pct,
110            pressure_level: level,
111        })
112    }
113}
114
/// Asks jemalloc to purge all of its arenas so unused memory can be returned
/// to the OS.
///
/// Does nothing unless the crate is built with the `jemalloc` feature.
/// A failing control call is logged as a warning instead of being silently
/// discarded; the function itself never fails.
pub fn jemalloc_purge() {
    #[cfg(feature = "jemalloc")]
    {
        use tikv_jemalloc_ctl::raw;

        // Arena index 4096 is jemalloc's "all arenas" pseudo-index
        // (MALLCTL_ARENAS_ALL).
        let purge_name = b"arena.4096.purge\0";

        // NOTE(review): the jemalloc manual lists `arena.<i>.purge` as a void
        // control (no value read or written). Confirm that writing a value is
        // accepted; if it is rejected the warning below will show it, and the
        // call should be replaced by a value-less `mallctl` invocation.
        //
        // SAFETY: `purge_name` is a NUL-terminated byte string naming a
        // jemalloc control, and the value is passed by value, so no pointer
        // outlives the call.
        let outcome = unsafe { raw::write(purge_name, 0u64) };
        if let Err(err) = outcome {
            tracing::warn!("[memory_guard] jemalloc arena purge failed: {}", err);
        }
    }
}
126
127/// Start the background memory guardian task (idempotent).
128pub fn start_guard(eviction_callback: Arc<dyn Fn(PressureLevel) + Send + Sync>) {
129    if GUARD_RUNNING.swap(true, Ordering::SeqCst) {
130        return;
131    }
132    tokio::spawn(async move {
133        let mut interval = tokio::time::interval(std::time::Duration::from_secs(10));
134        loop {
135            interval.tick().await;
136            let Some(snap) = MemorySnapshot::capture() else {
137                continue;
138            };
139            if snap.pressure_level != PressureLevel::Normal {
140                tracing::warn!(
141                    "[memory_guard] pressure={:?} RSS={:.0}MB limit={:.0}MB ({:.1}% of {:.0}GB)",
142                    snap.pressure_level,
143                    snap.rss_bytes as f64 / 1_048_576.0,
144                    snap.rss_limit_bytes as f64 / 1_048_576.0,
145                    snap.rss_percent,
146                    snap.system_ram_bytes as f64 / 1_073_741_824.0,
147                );
148                (eviction_callback)(snap.pressure_level);
149
150                if snap.pressure_level == PressureLevel::Hard {
151                    jemalloc_purge();
152                }
153            }
154        }
155    });
156}
157
158/// Force immediate purge of all caches and jemalloc arenas.
159pub fn force_purge() {
160    jemalloc_purge();
161    tracing::info!("[memory_guard] force_purge completed");
162}
163
164// --- Platform-specific implementations ---
165
/// Reads this process's RSS from the `VmRSS:` line of `/proc/self/status`.
///
/// The value in that line is parsed as a kB count and converted to bytes.
/// Returns `None` if the file cannot be read, the line is missing, or the
/// number does not parse.
#[cfg(target_os = "linux")]
fn linux_rss() -> Option<u64> {
    let contents = std::fs::read_to_string("/proc/self/status").ok()?;

    // Only the first `VmRSS:` line is considered.
    let field = contents
        .lines()
        .find_map(|entry| entry.strip_prefix("VmRSS:"))?;

    let kib = field
        .trim()
        .trim_end_matches(" kB")
        .trim()
        .parse::<u64>()
        .ok()?;
    Some(kib * 1024)
}
177
/// Reads total physical memory from the `MemTotal:` line of `/proc/meminfo`.
///
/// The value in that line is parsed as a kB count and converted to bytes.
/// Returns `None` if the file cannot be read, the line is missing, or the
/// number does not parse.
#[cfg(target_os = "linux")]
fn linux_memtotal() -> Option<u64> {
    let contents = std::fs::read_to_string("/proc/meminfo").ok()?;

    // Only the first `MemTotal:` line is considered.
    contents
        .lines()
        .find_map(|entry| entry.strip_prefix("MemTotal:"))
        .and_then(|field| {
            field
                .trim()
                .trim_end_matches(" kB")
                .trim()
                .parse::<u64>()
                .ok()
        })
        .map(|kib| kib * 1024)
}
189
/// Queries the Mach kernel for this task's resident size, in bytes.
///
/// Calls `task_info` with the `MACH_TASK_BASIC_INFO` flavor and returns the
/// `resident_size` field, or `None` if the call does not report success.
#[cfg(target_os = "macos")]
#[allow(deprecated, clippy::borrow_as_ptr, clippy::ptr_as_ptr)]
fn macos_rss() -> Option<u64> {
    use std::mem::{size_of, zeroed};

    type BasicInfo = libc::mach_task_basic_info_data_t;

    // SAFETY: the info struct is a C data record that the kernel fills in;
    // an all-zero bit pattern is assumed to be a valid initial value for it.
    let mut basic_info: BasicInfo = unsafe { zeroed() };

    // `task_info` expects the buffer size expressed in `natural_t` words.
    let mut word_count =
        (size_of::<BasicInfo>() / size_of::<libc::natural_t>()) as libc::mach_msg_type_number_t;

    // SAFETY: both pointers refer to live locals, and `word_count` describes
    // the size of `basic_info`, so the kernel writes only within that buffer.
    let status = unsafe {
        libc::task_info(
            libc::mach_task_self(),
            libc::MACH_TASK_BASIC_INFO,
            std::ptr::from_mut(&mut basic_info).cast::<i32>(),
            std::ptr::from_mut(&mut word_count),
        )
    };

    (status == libc::KERN_SUCCESS).then_some(basic_info.resident_size)
}
211
/// Reads total physical memory, in bytes, from the `hw.memsize` sysctl.
///
/// Returns `None` when `sysctlbyname` reports a non-zero status.
#[cfg(target_os = "macos")]
#[allow(clippy::borrow_as_ptr, clippy::ptr_as_ptr)]
fn macos_memsize() -> Option<u64> {
    let mut total_bytes: u64 = 0;
    let mut value_len = std::mem::size_of::<u64>();
    let key = b"hw.memsize\0";

    // SAFETY: `key` is NUL-terminated, the output pointer refers to a live
    // `u64`, and `value_len` holds that buffer's size. No new value is set
    // (null pointer with length 0).
    let status = unsafe {
        libc::sysctlbyname(
            key.as_ptr().cast(),
            std::ptr::from_mut(&mut total_bytes).cast::<libc::c_void>(),
            std::ptr::from_mut(&mut value_len),
            std::ptr::null_mut(),
            0,
        )
    };

    match status {
        0 => Some(total_bytes),
        _ => None,
    }
}
234
#[cfg(test)]
mod tests {
    use super::*;

    /// True on the targets for which this module has RSS / RAM backends.
    const SUPPORTED_OS: bool = cfg!(any(target_os = "linux", target_os = "macos"));

    #[test]
    fn rss_returns_some_on_supported_os() {
        if SUPPORTED_OS {
            let rss = get_rss_bytes().expect("RSS should be readable");
            assert!(rss > 0, "RSS should be > 0");
        }
    }

    #[test]
    fn system_ram_returns_some_on_supported_os() {
        if SUPPORTED_OS {
            let ram = get_system_ram_bytes().expect("System RAM should be readable");
            assert!(ram > 1_000_000, "System RAM should be > 1MB");
        }
    }

    #[test]
    fn snapshot_captures_correctly() {
        if SUPPORTED_OS {
            let snap = MemorySnapshot::capture().expect("snapshot should be available");
            assert!(snap.rss_bytes > 0);
            assert!(snap.system_ram_bytes > snap.rss_bytes);
            assert!(snap.rss_percent > 0.0 && snap.rss_percent < 100.0);
        }
    }

    #[test]
    fn peak_rss_tracks_maximum() {
        // `PEAK_RSS` is process-global and tests run in parallel: another test
        // calling `MemorySnapshot::capture` may raise it at any moment. So the
        // value is never reset here and is never compared for equality; the
        // test only checks that it rises to a larger value and is not lowered
        // by a smaller one.
        let raised = peak_rss_bytes().saturating_add(100);
        PEAK_RSS.fetch_max(raised, Ordering::Relaxed);
        assert!(peak_rss_bytes() >= raised);

        PEAK_RSS.fetch_max(50, Ordering::Relaxed);
        assert!(peak_rss_bytes() >= raised);
    }
}