lean_ctx/core/
memory_guard.rs1use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
8use std::sync::Arc;
9
/// Highest resident-set size (bytes) seen so far; raised via `fetch_max` in
/// `MemorySnapshot::capture` and read back by `peak_rss_bytes`.
static PEAK_RSS: AtomicU64 = AtomicU64::new(0);
/// Set by `start_guard` the first time it is called so that later calls
/// return without spawning a second guard thread.
static GUARD_RUNNING: AtomicBool = AtomicBool::new(false);
/// Written by the guard thread (true while pressure is `Hard` or above,
/// cleared once pressure drops) and exposed through `abort_requested`.
static ABORT_REQUESTED: AtomicBool = AtomicBool::new(false);
13
14pub fn get_rss_bytes() -> Option<u64> {
16 #[cfg(target_os = "linux")]
17 {
18 linux_rss()
19 }
20 #[cfg(target_os = "macos")]
21 {
22 macos_rss()
23 }
24 #[cfg(not(any(target_os = "linux", target_os = "macos")))]
25 {
26 None
27 }
28}
29
30pub fn get_system_ram_bytes() -> Option<u64> {
32 #[cfg(target_os = "linux")]
33 {
34 linux_memtotal()
35 }
36 #[cfg(target_os = "macos")]
37 {
38 macos_memsize()
39 }
40 #[cfg(not(any(target_os = "linux", target_os = "macos")))]
41 {
42 None
43 }
44}
45
46pub fn rss_limit_bytes() -> Option<u64> {
48 let sys_ram = get_system_ram_bytes()?;
49 let cfg = super::config::Config::load();
50 let pct = super::config::MemoryGuardConfig::effective(&cfg).max_ram_percent;
51 Some(sys_ram / 100 * u64::from(pct))
52}
53
/// Returns the highest RSS value (bytes) recorded so far in `PEAK_RSS`.
///
/// The value is only raised by `MemorySnapshot::capture`, so it is `0` until
/// the first successful capture.
pub fn peak_rss_bytes() -> u64 {
    PEAK_RSS.load(Ordering::Relaxed)
}
58
/// Point-in-time view of the process's memory usage, produced by
/// `MemorySnapshot::capture`. Serializable via serde.
#[derive(Debug, Clone, serde::Serialize)]
pub struct MemorySnapshot {
    /// Resident-set size of this process at capture time, in bytes.
    pub rss_bytes: u64,
    /// Highest RSS recorded in `PEAK_RSS` as of this capture, in bytes.
    pub peak_rss_bytes: u64,
    /// Total system RAM, in bytes.
    pub system_ram_bytes: u64,
    /// RSS budget returned by `rss_limit_bytes()`, in bytes.
    pub rss_limit_bytes: u64,
    /// `rss_bytes` as a percentage of `system_ram_bytes` (0.0 if RAM is 0).
    pub rss_percent: f64,
    /// Pressure classification derived from `rss_percent`.
    pub pressure_level: PressureLevel,
}
69
/// Memory-pressure classification, ordered from least to most severe.
///
/// The derived `Ord` follows declaration order, so comparisons such as
/// `level >= PressureLevel::Soft` rely on the variants staying in this order.
/// Variants serialize as lowercase strings. Thresholds below are the ones
/// applied in `MemorySnapshot::capture`, where `base` is the configured
/// `max_ram_percent`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, serde::Serialize)]
#[serde(rename_all = "lowercase")]
pub enum PressureLevel {
    /// RSS percentage is at or below `base`.
    Normal,
    /// RSS percentage exceeds `base`.
    Soft,
    /// RSS percentage exceeds `base * 1.4`.
    Medium,
    /// RSS percentage exceeds `base * 2.0`.
    Hard,
    /// RSS percentage exceeds `base * 3.0`.
    Critical,
}
79
80impl MemorySnapshot {
81 pub fn capture() -> Option<Self> {
82 let rss = get_rss_bytes()?;
83 let sys = get_system_ram_bytes()?;
84 let limit = rss_limit_bytes()?;
85 let pct = if sys > 0 {
86 (rss as f64 / sys as f64) * 100.0
87 } else {
88 0.0
89 };
90
91 PEAK_RSS.fetch_max(rss, Ordering::Relaxed);
92
93 let cfg = super::config::Config::load();
94 let guard_cfg = super::config::MemoryGuardConfig::effective(&cfg);
95 let base = f64::from(guard_cfg.max_ram_percent);
96
97 let level = if pct > base * 3.0 {
98 PressureLevel::Critical
99 } else if pct > base * 2.0 {
100 PressureLevel::Hard
101 } else if pct > base * 1.4 {
102 PressureLevel::Medium
103 } else if pct > base {
104 PressureLevel::Soft
105 } else {
106 PressureLevel::Normal
107 };
108
109 Some(Self {
110 rss_bytes: rss,
111 peak_rss_bytes: PEAK_RSS.load(Ordering::Relaxed),
112 system_ram_bytes: sys,
113 rss_limit_bytes: limit,
114 rss_percent: pct,
115 pressure_level: level,
116 })
117 }
118}
119
/// Asks jemalloc to purge dirty pages from its arenas so the memory can be
/// returned to the OS.
///
/// Compiles to a no-op unless the `jemalloc` feature is enabled on a
/// non-Windows target. A failing purge request is logged instead of being
/// silently dropped, so a rejected ctl does not masquerade as a successful
/// purge.
pub fn jemalloc_purge() {
    #[cfg(all(feature = "jemalloc", not(windows)))]
    {
        use tikv_jemalloc_ctl::raw;

        // Arena index 4096 is presumably jemalloc's MALLCTL_ARENAS_ALL
        // pseudo-index ("all arenas") — confirm against the bundled jemalloc.
        let purge_ctl = b"arena.4096.purge\0";

        // SAFETY: `purge_ctl` is a NUL-terminated literal, as the raw mallctl
        // API requires.
        // NOTE(review): jemalloc documents `arena.<i>.purge` as a ctl that
        // takes no value; verify that writing a `u64` is accepted rather than
        // rejected (EPERM). The error is logged below so this is observable.
        let outcome = unsafe { raw::write(purge_ctl, 0u64) };
        if let Err(err) = outcome {
            tracing::warn!("[memory_guard] jemalloc purge request failed: {err}");
        }
    }
}
131
/// Returns the current value of the `ABORT_REQUESTED` flag.
///
/// The guard thread started by `start_guard` sets the flag while memory
/// pressure is `Hard` or above and clears it once pressure drops again.
pub fn abort_requested() -> bool {
    ABORT_REQUESTED.load(Ordering::Relaxed)
}
136
137pub fn is_under_pressure() -> bool {
140 let Some(snap) = MemorySnapshot::capture() else {
141 return false;
142 };
143 snap.pressure_level >= PressureLevel::Soft
144}
145
146pub fn start_guard(eviction_callback: Arc<dyn Fn(PressureLevel) + Send + Sync>) {
150 if GUARD_RUNNING.swap(true, Ordering::SeqCst) {
151 return;
152 }
153 std::thread::Builder::new()
154 .name("memory-guard".into())
155 .spawn(move || {
156 let mut poll_secs = 3u64;
157 loop {
158 std::thread::sleep(std::time::Duration::from_secs(poll_secs));
159 let Some(snap) = MemorySnapshot::capture() else {
160 continue;
161 };
162
163 if snap.pressure_level == PressureLevel::Critical {
164 tracing::error!(
165 "[memory_guard] CRITICAL: RSS={:.0}MB ({:.1}% of {:.0}GB) — \
166 aggressive eviction to prevent OS OOM kill",
167 snap.rss_bytes as f64 / 1_048_576.0,
168 snap.rss_percent,
169 snap.system_ram_bytes as f64 / 1_073_741_824.0,
170 );
171 ABORT_REQUESTED.store(true, Ordering::SeqCst);
172 (eviction_callback)(PressureLevel::Critical);
173 jemalloc_purge();
174
175 for attempt in 1..=3 {
176 std::thread::sleep(std::time::Duration::from_secs(2));
177 (eviction_callback)(PressureLevel::Critical);
178 jemalloc_purge();
179 if let Some(recheck) = MemorySnapshot::capture() {
180 if recheck.pressure_level < PressureLevel::Hard {
181 tracing::info!(
182 "[memory_guard] eviction attempt {attempt} succeeded — \
183 RSS={:.0}MB, pressure={:?}",
184 recheck.rss_bytes as f64 / 1_048_576.0,
185 recheck.pressure_level,
186 );
187 break;
188 }
189 tracing::error!(
190 "[memory_guard] eviction attempt {attempt}/3 — still {:?} \
191 (RSS={:.0}MB)",
192 recheck.pressure_level,
193 recheck.rss_bytes as f64 / 1_048_576.0,
194 );
195 }
196 }
197 }
198
199 if snap.pressure_level >= PressureLevel::Soft {
200 poll_secs = 1;
201 ABORT_REQUESTED
202 .store(snap.pressure_level >= PressureLevel::Hard, Ordering::SeqCst);
203 tracing::warn!(
204 "[memory_guard] pressure={:?} RSS={:.0}MB limit={:.0}MB ({:.1}% of {:.0}GB)",
205 snap.pressure_level,
206 snap.rss_bytes as f64 / 1_048_576.0,
207 snap.rss_limit_bytes as f64 / 1_048_576.0,
208 snap.rss_percent,
209 snap.system_ram_bytes as f64 / 1_073_741_824.0,
210 );
211 (eviction_callback)(snap.pressure_level);
212
213 if snap.pressure_level >= PressureLevel::Hard {
214 jemalloc_purge();
215 }
216 } else {
217 poll_secs = 3;
218 if ABORT_REQUESTED.load(Ordering::Relaxed) {
219 ABORT_REQUESTED.store(false, Ordering::SeqCst);
220 tracing::info!("[memory_guard] pressure normalized, clearing abort flag");
221 }
222 }
223 }
224 })
225 .ok();
226}
227
/// Runs `jemalloc_purge` immediately, independent of the guard thread's
/// polling, and logs an info line afterwards.
pub fn force_purge() {
    jemalloc_purge();
    tracing::info!("[memory_guard] force_purge completed");
}
233
/// Finds the first line of `contents` starting with `key` (for example
/// `"VmRSS:"`), parses the number that follows as a kB count, and returns it
/// in bytes.
///
/// Returns `None` when no line carries the key, the value is not an unsigned
/// integer, the unit is something other than `kB`, or the byte count would
/// overflow `u64`.
#[cfg(target_os = "linux")]
fn parse_proc_kib_field(contents: &str, key: &str) -> Option<u64> {
    let rest = contents.lines().find_map(|line| line.strip_prefix(key))?;
    let mut tokens = rest.split_whitespace();
    let kib: u64 = tokens.next()?.parse().ok()?;
    match tokens.next() {
        // A bare number is accepted as well as the usual "<n> kB" form.
        None | Some("kB") => kib.checked_mul(1024),
        Some(_) => None,
    }
}

/// Reads this process's resident-set size in bytes from the `VmRSS:` line of
/// `/proc/self/status`.
///
/// Returns `None` if the file cannot be read or the field is missing or
/// malformed.
#[cfg(target_os = "linux")]
fn linux_rss() -> Option<u64> {
    let status = std::fs::read_to_string("/proc/self/status").ok()?;
    parse_proc_kib_field(&status, "VmRSS:")
}

/// Reads total system RAM in bytes from the `MemTotal:` line of
/// `/proc/meminfo`.
///
/// Returns `None` if the file cannot be read or the field is missing or
/// malformed.
#[cfg(target_os = "linux")]
fn linux_memtotal() -> Option<u64> {
    let info = std::fs::read_to_string("/proc/meminfo").ok()?;
    parse_proc_kib_field(&info, "MemTotal:")
}
259
/// Queries the Mach kernel for this task's resident-set size in bytes.
///
/// Returns `None` when `task_info` reports anything other than
/// `KERN_SUCCESS`.
#[cfg(target_os = "macos")]
#[allow(deprecated, clippy::borrow_as_ptr, clippy::ptr_as_ptr)]
fn macos_rss() -> Option<u64> {
    type BasicInfo = libc::mach_task_basic_info_data_t;

    // The count passed to `task_info` is the buffer size measured in
    // `natural_t` units rather than bytes.
    let units = std::mem::size_of::<BasicInfo>() / std::mem::size_of::<libc::natural_t>();
    let mut unit_count = units as libc::mach_msg_type_number_t;

    // SAFETY: the struct is a plain C record of integers, for which the
    // all-zero bit pattern is a valid value.
    let mut basic_info: BasicInfo = unsafe { std::mem::zeroed() };

    // SAFETY: both pointers refer to live, writable locals, and `unit_count`
    // describes the size of `basic_info`.
    let status = unsafe {
        libc::task_info(
            libc::mach_task_self(),
            libc::MACH_TASK_BASIC_INFO,
            std::ptr::from_mut(&mut basic_info).cast::<i32>(),
            std::ptr::from_mut(&mut unit_count),
        )
    };

    (status == libc::KERN_SUCCESS).then_some(basic_info.resident_size)
}
281
/// Reads total physical memory in bytes from the `hw.memsize` sysctl.
///
/// Returns `None` when `sysctlbyname` returns a non-zero status.
#[cfg(target_os = "macos")]
#[allow(clippy::borrow_as_ptr, clippy::ptr_as_ptr)]
fn macos_memsize() -> Option<u64> {
    // NUL-terminated because it is handed to C as a string.
    const KEY: &[u8] = b"hw.memsize\0";

    let mut total_bytes: u64 = 0;
    let mut out_len = std::mem::size_of::<u64>();

    // SAFETY: `KEY` is NUL-terminated; the output pointer and its length
    // refer to live locals, with `out_len` equal to the size of the output
    // buffer. No new value is written (null pointer, length 0).
    let rc = unsafe {
        libc::sysctlbyname(
            KEY.as_ptr().cast(),
            std::ptr::from_mut(&mut total_bytes).cast::<libc::c_void>(),
            std::ptr::from_mut(&mut out_len),
            std::ptr::null_mut(),
            0,
        )
    };

    match rc {
        0 => Some(total_bytes),
        _ => None,
    }
}
304
#[cfg(test)]
mod tests {
    use super::*;

    /// RSS must be readable and non-zero on the platforms with an
    /// implementation.
    #[test]
    fn rss_returns_some_on_supported_os() {
        if cfg!(any(target_os = "linux", target_os = "macos")) {
            let rss = get_rss_bytes().expect("RSS should be readable");
            assert!(rss > 0, "RSS should be > 0");
        }
    }

    /// Total RAM must be readable and plausible on supported platforms.
    #[test]
    fn system_ram_returns_some_on_supported_os() {
        if cfg!(any(target_os = "linux", target_os = "macos")) {
            let ram = get_system_ram_bytes().expect("System RAM should be readable");
            assert!(ram > 1_000_000, "System RAM should be > 1MB");
        }
    }

    /// A snapshot must be internally consistent on supported platforms.
    #[test]
    fn snapshot_captures_correctly() {
        if cfg!(any(target_os = "linux", target_os = "macos")) {
            let s = MemorySnapshot::capture().expect("snapshot should be capturable");
            assert!(s.rss_bytes > 0);
            assert!(s.system_ram_bytes > s.rss_bytes);
            assert!(s.rss_percent > 0.0 && s.rss_percent < 100.0);
        }
    }

    /// `fetch_max` must raise the peak and never lower it.
    ///
    /// The global is never reset here: other tests run in parallel and may
    /// raise `PEAK_RSS` through `MemorySnapshot::capture`, so only
    /// monotonic lower bounds are asserted.
    #[test]
    fn peak_rss_tracks_maximum() {
        let baseline = peak_rss_bytes();
        let raised = baseline + 100;
        PEAK_RSS.fetch_max(raised, Ordering::Relaxed);
        // A smaller candidate must leave the recorded peak untouched.
        PEAK_RSS.fetch_max(baseline, Ordering::Relaxed);
        assert!(peak_rss_bytes() >= raised);
    }
}
346}