1use std::sync::atomic::{AtomicBool, AtomicU64, AtomicU8, Ordering};
8use std::sync::Arc;
9
/// Highest RSS, in bytes, recorded by any `MemorySnapshot` capture in this process
/// (updated with `fetch_max`, so it only ever grows).
static PEAK_RSS: AtomicU64 = AtomicU64::new(0);
/// Guard-thread claim flag: `start_guard` swaps it to `true` so that only one
/// guard thread is started.
static GUARD_RUNNING: AtomicBool = AtomicBool::new(false);
/// Abort flag raised by the guard thread under `Hard`/`Critical` pressure and
/// exposed through `abort_requested`.
static ABORT_REQUESTED: AtomicBool = AtomicBool::new(false);
/// Latest `PressureLevel` published by the guard thread, stored as its `u8`
/// discriminant (0 == `Normal`) and decoded by `current_pressure`.
static CURRENT_PRESSURE: AtomicU8 = AtomicU8::new(0);
14
15pub fn get_rss_bytes() -> Option<u64> {
17 #[cfg(target_os = "linux")]
18 {
19 linux_rss()
20 }
21 #[cfg(target_os = "macos")]
22 {
23 macos_rss()
24 }
25 #[cfg(not(any(target_os = "linux", target_os = "macos")))]
26 {
27 None
28 }
29}
30
31pub fn get_rss_bytes_for_pid(pid: u32) -> Option<u64> {
33 #[cfg(target_os = "linux")]
34 {
35 linux_rss_for_pid(pid)
36 }
37 #[cfg(target_os = "macos")]
38 {
39 macos_rss_for_pid(pid)
40 }
41 #[cfg(not(any(target_os = "linux", target_os = "macos")))]
42 {
43 let _ = pid;
44 None
45 }
46}
47
48pub fn get_system_ram_bytes() -> Option<u64> {
50 #[cfg(target_os = "linux")]
51 {
52 linux_memtotal()
53 }
54 #[cfg(target_os = "macos")]
55 {
56 macos_memsize()
57 }
58 #[cfg(not(any(target_os = "linux", target_os = "macos")))]
59 {
60 None
61 }
62}
63
64pub fn rss_limit_bytes() -> Option<u64> {
66 let sys_ram = get_system_ram_bytes()?;
67 let cfg = super::config::Config::load();
68 let pct = super::config::MemoryGuardConfig::effective(&cfg).max_ram_percent;
69 Some(sys_ram / 100 * u64::from(pct))
70}
71
72pub fn peak_rss_bytes() -> u64 {
74 PEAK_RSS.load(Ordering::Relaxed)
75}
76
/// Point-in-time view of memory usage, produced by `MemorySnapshot::capture`
/// or `MemorySnapshot::capture_for_pid`. Serializable with serde; fields are
/// emitted in declaration order.
#[derive(Debug, Clone, serde::Serialize)]
pub struct MemorySnapshot {
    /// Resident set size of the sampled process, in bytes.
    pub rss_bytes: u64,
    /// Highest RSS recorded by any capture in this process so far, in bytes.
    pub peak_rss_bytes: u64,
    /// Total system RAM as reported by the OS, in bytes.
    pub system_ram_bytes: u64,
    /// Configured ceiling: `max_ram_percent` percent of `system_ram_bytes`.
    pub rss_limit_bytes: u64,
    /// `rss_bytes` as a percentage of `system_ram_bytes` (scaled by 100, not a fraction).
    pub rss_percent: f64,
    /// Pressure classification derived from `rss_percent`.
    pub pressure_level: PressureLevel,
}
87
/// Memory-pressure severity, ordered from least to most severe
/// (`Normal < Soft < Medium < Hard < Critical`).
///
/// Thresholds are multiples of the configured `max_ram_percent` ("budget"),
/// compared against RSS as a percentage of system RAM. Serialized as lowercase
/// strings; the explicit `u8` discriminants are what the guard stores atomically.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, serde::Serialize)]
#[serde(rename_all = "lowercase")]
#[repr(u8)]
pub enum PressureLevel {
    /// RSS at or below the budget.
    Normal = 0,
    /// RSS above the budget.
    Soft = 1,
    /// RSS above 1.4x the budget.
    Medium = 2,
    /// RSS above 2x the budget; the guard raises its abort flag and purges jemalloc.
    Hard = 3,
    /// RSS above 3x the budget; the guard runs its emergency eviction retries.
    Critical = 4,
}
98
99impl PressureLevel {
100 fn from_u8(v: u8) -> Self {
101 match v {
102 1 => Self::Soft,
103 2 => Self::Medium,
104 3 => Self::Hard,
105 4 => Self::Critical,
106 _ => Self::Normal,
107 }
108 }
109}
110
111impl MemorySnapshot {
112 pub fn capture() -> Option<Self> {
114 Self::capture_impl(get_rss_bytes()?)
115 }
116
117 pub fn capture_for_pid(pid: u32) -> Option<Self> {
120 let rss = get_rss_bytes_for_pid(pid).or_else(get_rss_bytes)?;
121 Self::capture_impl(rss)
122 }
123
124 fn capture_impl(rss: u64) -> Option<Self> {
125 let sys = get_system_ram_bytes()?;
126 let limit = rss_limit_bytes()?;
127 let pct = if sys > 0 {
128 (rss as f64 / sys as f64) * 100.0
129 } else {
130 0.0
131 };
132
133 PEAK_RSS.fetch_max(rss, Ordering::Relaxed);
134
135 let cfg = super::config::Config::load();
136 let guard_cfg = super::config::MemoryGuardConfig::effective(&cfg);
137 let base = f64::from(guard_cfg.max_ram_percent);
138
139 let level = if pct > base * 3.0 {
140 PressureLevel::Critical
141 } else if pct > base * 2.0 {
142 PressureLevel::Hard
143 } else if pct > base * 1.4 {
144 PressureLevel::Medium
145 } else if pct > base {
146 PressureLevel::Soft
147 } else {
148 PressureLevel::Normal
149 };
150
151 Some(Self {
152 rss_bytes: rss,
153 peak_rss_bytes: PEAK_RSS.load(Ordering::Relaxed),
154 system_ram_bytes: sys,
155 rss_limit_bytes: limit,
156 rss_percent: pct,
157 pressure_level: level,
158 })
159 }
160}
161
/// Asks jemalloc to purge unused dirty pages via the `arena.4096.purge` mallctl
/// (4096 presumably being `MALLCTL_ARENAS_ALL`, i.e. all arenas — confirm for
/// the linked jemalloc version).
///
/// Compiled to a no-op unless the `jemalloc` feature is enabled on a non-Windows
/// target. Errors are logged at `debug` level and otherwise ignored.
///
/// NOTE(review): jemalloc documents `arena.<i>.purge` as a `(void)` mallctl that
/// accepts neither an old nor a new value, whereas `raw::write` appears to pass a
/// pointer to the `u64` plus a non-zero length. If jemalloc rejects that (EPERM),
/// this purge never happens and the failure is only visible at debug level —
/// verify, and if so call `mallctl` with all-null pointers and zero length instead.
pub fn jemalloc_purge() {
    #[cfg(all(feature = "jemalloc", not(windows)))]
    {
        use tikv_jemalloc_ctl::raw;
        // NUL-terminated mallctl name, as `raw::write` requires.
        let purge_mib = b"arena.4096.purge\0";
        // SAFETY: `raw::write` requires the value type to match the mallctl's
        // declared type. NOTE(review): purge is declared `(void)`, so writing a
        // `u64` does not obviously satisfy that — see the note above.
        unsafe {
            if let Err(e) = raw::write(purge_mib, 0u64) {
                tracing::debug!("[memory_guard] jemalloc purge failed: {e}");
            }
        }
    }
}
177
178pub fn abort_requested() -> bool {
180 ABORT_REQUESTED.load(Ordering::Relaxed)
181}
182
183pub fn is_under_pressure() -> bool {
186 current_pressure() >= PressureLevel::Soft
187}
188
189pub fn current_pressure() -> PressureLevel {
191 PressureLevel::from_u8(CURRENT_PRESSURE.load(Ordering::Relaxed))
192}
193
194pub fn start_guard(eviction_callback: Arc<dyn Fn(PressureLevel) + Send + Sync>) {
198 if GUARD_RUNNING.swap(true, Ordering::SeqCst) {
199 return;
200 }
201 std::thread::Builder::new()
202 .name("memory-guard".into())
203 .spawn(move || {
204 let mut poll_secs = 3u64;
205 loop {
206 std::thread::sleep(std::time::Duration::from_secs(poll_secs));
207 let Some(snap) = MemorySnapshot::capture() else {
208 continue;
209 };
210
211 CURRENT_PRESSURE.store(snap.pressure_level as u8, Ordering::Relaxed);
212
213 if snap.pressure_level == PressureLevel::Critical {
214 tracing::error!(
215 "[memory_guard] CRITICAL: RSS={:.0}MB ({:.1}% of {:.0}GB) — \
216 aggressive eviction to prevent OS OOM kill",
217 snap.rss_bytes as f64 / 1_048_576.0,
218 snap.rss_percent,
219 snap.system_ram_bytes as f64 / 1_073_741_824.0,
220 );
221 ABORT_REQUESTED.store(true, Ordering::SeqCst);
222 (eviction_callback)(PressureLevel::Critical);
223 jemalloc_purge();
224
225 for attempt in 1..=3 {
226 std::thread::sleep(std::time::Duration::from_secs(2));
227 (eviction_callback)(PressureLevel::Critical);
228 jemalloc_purge();
229 if let Some(recheck) = MemorySnapshot::capture() {
230 if recheck.pressure_level < PressureLevel::Hard {
231 tracing::info!(
232 "[memory_guard] eviction attempt {attempt} succeeded — \
233 RSS={:.0}MB, pressure={:?}",
234 recheck.rss_bytes as f64 / 1_048_576.0,
235 recheck.pressure_level,
236 );
237 break;
238 }
239 tracing::error!(
240 "[memory_guard] eviction attempt {attempt}/3 — still {:?} \
241 (RSS={:.0}MB)",
242 recheck.pressure_level,
243 recheck.rss_bytes as f64 / 1_048_576.0,
244 );
245 }
246 }
247 }
248
249 if snap.pressure_level >= PressureLevel::Soft {
250 poll_secs = 1;
251 ABORT_REQUESTED
252 .store(snap.pressure_level >= PressureLevel::Hard, Ordering::SeqCst);
253 tracing::warn!(
254 "[memory_guard] pressure={:?} RSS={:.0}MB limit={:.0}MB ({:.1}% of {:.0}GB)",
255 snap.pressure_level,
256 snap.rss_bytes as f64 / 1_048_576.0,
257 snap.rss_limit_bytes as f64 / 1_048_576.0,
258 snap.rss_percent,
259 snap.system_ram_bytes as f64 / 1_073_741_824.0,
260 );
261 (eviction_callback)(snap.pressure_level);
262
263 if snap.pressure_level >= PressureLevel::Hard {
264 jemalloc_purge();
265 }
266 } else {
267 poll_secs = 3;
268 if ABORT_REQUESTED.load(Ordering::Relaxed) {
269 ABORT_REQUESTED.store(false, Ordering::SeqCst);
270 tracing::info!("[memory_guard] pressure normalized, clearing abort flag");
271 }
272 }
273 }
274 })
275 .ok();
276}
277
/// Manually triggers `jemalloc_purge` and logs at info level.
///
/// The completion message is logged unconditionally: `jemalloc_purge` returns
/// nothing, so it is also logged when the `jemalloc` feature is disabled (the
/// purge is then a no-op) or when the purge itself failed.
pub fn force_purge() {
    jemalloc_purge();
    tracing::info!("[memory_guard] force_purge completed");
}
283
/// RSS of the current process in bytes (Linux).
#[cfg(target_os = "linux")]
fn linux_rss() -> Option<u64> {
    linux_rss_for_pid(std::process::id())
}

/// RSS of process `pid` in bytes, from the `VmRSS:` line of `/proc/<pid>/status`.
///
/// Returns `None` if the file cannot be read (e.g. no such process), has no
/// `VmRSS:` line, or the value does not parse.
#[cfg(target_os = "linux")]
fn linux_rss_for_pid(pid: u32) -> Option<u64> {
    let status = std::fs::read_to_string(format!("/proc/{pid}/status")).ok()?;
    proc_kb_field_bytes(&status, "VmRSS:")
}

/// Total system RAM in bytes, from the `MemTotal:` line of `/proc/meminfo`.
#[cfg(target_os = "linux")]
fn linux_memtotal() -> Option<u64> {
    let info = std::fs::read_to_string("/proc/meminfo").ok()?;
    proc_kb_field_bytes(&info, "MemTotal:")
}

/// Parses a `/proc`-style `Key:   <n> kB` entry and returns `<n>` in bytes.
///
/// Only the first line starting with `key` is considered. Returns `None` if no
/// such line exists, its value does not parse as an integer, or the byte count
/// would overflow `u64`.
#[cfg(target_os = "linux")]
fn proc_kb_field_bytes(text: &str, key: &str) -> Option<u64> {
    let value = text.lines().find_map(|line| line.strip_prefix(key))?;
    let kb: u64 = value.trim().trim_end_matches(" kB").trim().parse().ok()?;
    kb.checked_mul(1024)
}
315
/// RSS of the current process in bytes (macOS), taken from the `resident_size`
/// field of Mach `task_info` with the `MACH_TASK_BASIC_INFO` flavor.
///
/// Returns `None` if `task_info` does not return `KERN_SUCCESS`.
// NOTE(review): `deprecated` is presumably allowed for libc's Mach bindings
// (e.g. `mach_task_self`); the clippy pointer lints look unnecessary now that
// `ptr::from_mut`/`.cast()` are used — confirm before removing.
#[cfg(target_os = "macos")]
#[allow(deprecated, clippy::borrow_as_ptr, clippy::ptr_as_ptr)]
fn macos_rss() -> Option<u64> {
    use std::mem;
    // SAFETY: assumes the basic-info type is a plain C struct for which all-zero
    // bytes are a valid value (NOTE(review): confirm against the libc definition).
    let mut info: libc::mach_task_basic_info_data_t = unsafe { mem::zeroed() };
    // task_info takes the buffer capacity in `natural_t`-sized words.
    let mut count = (mem::size_of::<libc::mach_task_basic_info_data_t>()
        / mem::size_of::<libc::natural_t>()) as libc::mach_msg_type_number_t;
    // SAFETY: `info` and `count` are live, writable locals for the whole call,
    // and `count` describes the size of `info` in the unit task_info expects.
    let kr = unsafe {
        libc::task_info(
            libc::mach_task_self(),
            libc::MACH_TASK_BASIC_INFO,
            std::ptr::from_mut(&mut info).cast::<i32>(),
            std::ptr::from_mut(&mut count),
        )
    };
    if kr == libc::KERN_SUCCESS {
        Some(info.resident_size)
    } else {
        None
    }
}
337
/// RSS of process `pid` in bytes (macOS), obtained by running
/// `ps -o rss= -p <pid>` and scaling its KiB figure by 1024.
///
/// Spawns a subprocess on every call. Returns `None` if `ps` cannot be run,
/// exits unsuccessfully (e.g. no such process), or prints a non-integer.
#[cfg(target_os = "macos")]
fn macos_rss_for_pid(pid: u32) -> Option<u64> {
    let pid_arg = pid.to_string();
    let ps_output = std::process::Command::new("ps")
        .args(["-o", "rss=", "-p", pid_arg.as_str()])
        .output()
        .ok()
        .filter(|out| out.status.success())?;
    String::from_utf8_lossy(&ps_output.stdout)
        .trim()
        .parse::<u64>()
        .ok()
        .map(|kib| kib * 1024)
}
353
/// Total system RAM in bytes (macOS), read via `sysctlbyname("hw.memsize")`.
///
/// Returns `None` if the sysctl call reports failure.
#[cfg(target_os = "macos")]
#[allow(clippy::borrow_as_ptr, clippy::ptr_as_ptr)]
fn macos_memsize() -> Option<u64> {
    const SYSCTL_NAME: &[u8] = b"hw.memsize\0";
    let mut total_bytes: u64 = 0;
    let mut value_len = std::mem::size_of::<u64>();
    // SAFETY: SYSCTL_NAME is NUL-terminated; the old-value pointer and length
    // describe `total_bytes`, a live u64 local; no new value is supplied (null, 0).
    let status = unsafe {
        libc::sysctlbyname(
            SYSCTL_NAME.as_ptr().cast(),
            std::ptr::from_mut(&mut total_bytes).cast::<libc::c_void>(),
            std::ptr::from_mut(&mut value_len),
            std::ptr::null_mut(),
            0,
        )
    };
    (status == 0).then_some(total_bytes)
}
376
377#[cfg(test)]
378mod tests {
379 use super::*;
380
381 #[test]
382 fn rss_returns_some_on_supported_os() {
383 if cfg!(any(target_os = "linux", target_os = "macos")) {
384 let rss = get_rss_bytes();
385 assert!(rss.is_some(), "RSS should be readable");
386 assert!(rss.unwrap() > 0, "RSS should be > 0");
387 }
388 }
389
390 #[test]
391 fn system_ram_returns_some_on_supported_os() {
392 if cfg!(any(target_os = "linux", target_os = "macos")) {
393 let ram = get_system_ram_bytes();
394 assert!(ram.is_some(), "System RAM should be readable");
395 assert!(ram.unwrap() > 1_000_000, "System RAM should be > 1MB");
396 }
397 }
398
399 #[test]
400 fn snapshot_captures_correctly() {
401 if cfg!(any(target_os = "linux", target_os = "macos")) {
402 let snap = MemorySnapshot::capture();
403 assert!(snap.is_some());
404 let s = snap.unwrap();
405 assert!(s.rss_bytes > 0);
406 assert!(s.system_ram_bytes > s.rss_bytes);
407 assert!(s.rss_percent > 0.0 && s.rss_percent < 100.0);
408 }
409 }
410
411 #[test]
412 fn peak_rss_tracks_maximum() {
413 PEAK_RSS.store(0, Ordering::Relaxed);
414 PEAK_RSS.fetch_max(100, Ordering::Relaxed);
415 PEAK_RSS.fetch_max(50, Ordering::Relaxed);
416 assert_eq!(PEAK_RSS.load(Ordering::Relaxed), 100);
417 }
418
419 #[test]
420 fn pressure_level_roundtrip() {
421 for level in [
422 PressureLevel::Normal,
423 PressureLevel::Soft,
424 PressureLevel::Medium,
425 PressureLevel::Hard,
426 PressureLevel::Critical,
427 ] {
428 assert_eq!(PressureLevel::from_u8(level as u8), level);
429 }
430 }
431
432 #[test]
433 fn atomic_pressure_defaults_to_normal() {
434 assert_eq!(current_pressure(), PressureLevel::Normal);
435 }
436
437 #[test]
438 fn rss_for_own_pid_matches_self() {
439 if cfg!(any(target_os = "linux", target_os = "macos")) {
440 let self_rss = get_rss_bytes().unwrap();
441 let pid_rss = get_rss_bytes_for_pid(std::process::id()).unwrap();
442 let ratio = self_rss as f64 / pid_rss as f64;
443 assert!(
444 (0.5..2.0).contains(&ratio),
445 "self RSS ({self_rss}) and pid-based RSS ({pid_rss}) should be within 2x"
446 );
447 }
448 }
449
450 #[test]
451 fn rss_for_dead_pid_returns_none() {
452 let dead_pid = 999_999_999u32;
453 assert!(get_rss_bytes_for_pid(dead_pid).is_none());
454 }
455
456 #[test]
457 fn capture_for_pid_falls_back_on_dead_pid() {
458 if cfg!(any(target_os = "linux", target_os = "macos")) {
459 let snap = MemorySnapshot::capture_for_pid(999_999_999);
460 assert!(snap.is_some(), "should fall back to self RSS");
461 }
462 }
463}