1use std::sync::atomic::{AtomicBool, AtomicU64, AtomicU8, Ordering};
8use std::sync::Arc;
9
/// Highest resident-set size, in bytes, seen by any `MemorySnapshot::capture` call so far.
static PEAK_RSS: AtomicU64 = AtomicU64::new(0);
/// Latch set by `start_guard` so that at most one guard thread is spawned.
static GUARD_RUNNING: AtomicBool = AtomicBool::new(false);
/// Cooperative abort flag written by the guard thread and read through `abort_requested`.
static ABORT_REQUESTED: AtomicBool = AtomicBool::new(false);
/// Last `PressureLevel` published by the guard thread, stored as its `u8` discriminant
/// (0 = `Normal`).
static CURRENT_PRESSURE: AtomicU8 = AtomicU8::new(0);
14
15pub fn get_rss_bytes() -> Option<u64> {
17 #[cfg(target_os = "linux")]
18 {
19 linux_rss()
20 }
21 #[cfg(target_os = "macos")]
22 {
23 macos_rss()
24 }
25 #[cfg(not(any(target_os = "linux", target_os = "macos")))]
26 {
27 None
28 }
29}
30
31pub fn get_system_ram_bytes() -> Option<u64> {
33 #[cfg(target_os = "linux")]
34 {
35 linux_memtotal()
36 }
37 #[cfg(target_os = "macos")]
38 {
39 macos_memsize()
40 }
41 #[cfg(not(any(target_os = "linux", target_os = "macos")))]
42 {
43 None
44 }
45}
46
47pub fn rss_limit_bytes() -> Option<u64> {
49 let sys_ram = get_system_ram_bytes()?;
50 let cfg = super::config::Config::load();
51 let pct = super::config::MemoryGuardConfig::effective(&cfg).max_ram_percent;
52 Some(sys_ram / 100 * u64::from(pct))
53}
54
55pub fn peak_rss_bytes() -> u64 {
57 PEAK_RSS.load(Ordering::Relaxed)
58}
59
60#[derive(Debug, Clone, serde::Serialize)]
62pub struct MemorySnapshot {
63 pub rss_bytes: u64,
64 pub peak_rss_bytes: u64,
65 pub system_ram_bytes: u64,
66 pub rss_limit_bytes: u64,
67 pub rss_percent: f64,
68 pub pressure_level: PressureLevel,
69}
70
71#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, serde::Serialize)]
72#[serde(rename_all = "lowercase")]
73#[repr(u8)]
74pub enum PressureLevel {
75 Normal = 0,
76 Soft = 1,
77 Medium = 2,
78 Hard = 3,
79 Critical = 4,
80}
81
82impl PressureLevel {
83 fn from_u8(v: u8) -> Self {
84 match v {
85 1 => Self::Soft,
86 2 => Self::Medium,
87 3 => Self::Hard,
88 4 => Self::Critical,
89 _ => Self::Normal,
90 }
91 }
92}
93
94impl MemorySnapshot {
95 pub fn capture() -> Option<Self> {
96 let rss = get_rss_bytes()?;
97 let sys = get_system_ram_bytes()?;
98 let limit = rss_limit_bytes()?;
99 let pct = if sys > 0 {
100 (rss as f64 / sys as f64) * 100.0
101 } else {
102 0.0
103 };
104
105 PEAK_RSS.fetch_max(rss, Ordering::Relaxed);
106
107 let cfg = super::config::Config::load();
108 let guard_cfg = super::config::MemoryGuardConfig::effective(&cfg);
109 let base = f64::from(guard_cfg.max_ram_percent);
110
111 let level = if pct > base * 3.0 {
112 PressureLevel::Critical
113 } else if pct > base * 2.0 {
114 PressureLevel::Hard
115 } else if pct > base * 1.4 {
116 PressureLevel::Medium
117 } else if pct > base {
118 PressureLevel::Soft
119 } else {
120 PressureLevel::Normal
121 };
122
123 Some(Self {
124 rss_bytes: rss,
125 peak_rss_bytes: PEAK_RSS.load(Ordering::Relaxed),
126 system_ram_bytes: sys,
127 rss_limit_bytes: limit,
128 rss_percent: pct,
129 pressure_level: level,
130 })
131 }
132}
133
/// Asks jemalloc to purge unused dirty pages; failures are logged at debug level.
///
/// Compiled to a no-op unless the `jemalloc` feature is enabled on a
/// non-Windows target.
pub fn jemalloc_purge() {
    #[cfg(all(feature = "jemalloc", not(windows)))]
    {
        use tikv_jemalloc_ctl::raw;

        // Presumably 4096 is jemalloc's `MALLCTL_ARENAS_ALL` pseudo-index, so
        // this key addresses every arena — TODO confirm against the bundled
        // jemalloc version.
        let purge_key = b"arena.4096.purge\0";

        // SAFETY: `purge_key` is a NUL-terminated byte-string literal.
        // NOTE(review): jemalloc documents `arena.<i>.purge` as a control that
        // takes no value; confirm that writing a `u64` is accepted. If it is
        // rejected, the error is only logged at debug level and the purge
        // silently does nothing.
        let outcome = unsafe { raw::write(purge_key, 0u64) };
        if let Err(e) = outcome {
            tracing::debug!("[memory_guard] jemalloc purge failed: {e}");
        }
    }
}
149
150pub fn abort_requested() -> bool {
152 ABORT_REQUESTED.load(Ordering::Relaxed)
153}
154
155pub fn is_under_pressure() -> bool {
158 current_pressure() >= PressureLevel::Soft
159}
160
161pub fn current_pressure() -> PressureLevel {
163 PressureLevel::from_u8(CURRENT_PRESSURE.load(Ordering::Relaxed))
164}
165
166pub fn start_guard(eviction_callback: Arc<dyn Fn(PressureLevel) + Send + Sync>) {
170 if GUARD_RUNNING.swap(true, Ordering::SeqCst) {
171 return;
172 }
173 std::thread::Builder::new()
174 .name("memory-guard".into())
175 .spawn(move || {
176 let mut poll_secs = 3u64;
177 loop {
178 std::thread::sleep(std::time::Duration::from_secs(poll_secs));
179 let Some(snap) = MemorySnapshot::capture() else {
180 continue;
181 };
182
183 CURRENT_PRESSURE.store(snap.pressure_level as u8, Ordering::Relaxed);
184
185 if snap.pressure_level == PressureLevel::Critical {
186 tracing::error!(
187 "[memory_guard] CRITICAL: RSS={:.0}MB ({:.1}% of {:.0}GB) — \
188 aggressive eviction to prevent OS OOM kill",
189 snap.rss_bytes as f64 / 1_048_576.0,
190 snap.rss_percent,
191 snap.system_ram_bytes as f64 / 1_073_741_824.0,
192 );
193 ABORT_REQUESTED.store(true, Ordering::SeqCst);
194 (eviction_callback)(PressureLevel::Critical);
195 jemalloc_purge();
196
197 for attempt in 1..=3 {
198 std::thread::sleep(std::time::Duration::from_secs(2));
199 (eviction_callback)(PressureLevel::Critical);
200 jemalloc_purge();
201 if let Some(recheck) = MemorySnapshot::capture() {
202 if recheck.pressure_level < PressureLevel::Hard {
203 tracing::info!(
204 "[memory_guard] eviction attempt {attempt} succeeded — \
205 RSS={:.0}MB, pressure={:?}",
206 recheck.rss_bytes as f64 / 1_048_576.0,
207 recheck.pressure_level,
208 );
209 break;
210 }
211 tracing::error!(
212 "[memory_guard] eviction attempt {attempt}/3 — still {:?} \
213 (RSS={:.0}MB)",
214 recheck.pressure_level,
215 recheck.rss_bytes as f64 / 1_048_576.0,
216 );
217 }
218 }
219 }
220
221 if snap.pressure_level >= PressureLevel::Soft {
222 poll_secs = 1;
223 ABORT_REQUESTED
224 .store(snap.pressure_level >= PressureLevel::Hard, Ordering::SeqCst);
225 tracing::warn!(
226 "[memory_guard] pressure={:?} RSS={:.0}MB limit={:.0}MB ({:.1}% of {:.0}GB)",
227 snap.pressure_level,
228 snap.rss_bytes as f64 / 1_048_576.0,
229 snap.rss_limit_bytes as f64 / 1_048_576.0,
230 snap.rss_percent,
231 snap.system_ram_bytes as f64 / 1_073_741_824.0,
232 );
233 (eviction_callback)(snap.pressure_level);
234
235 if snap.pressure_level >= PressureLevel::Hard {
236 jemalloc_purge();
237 }
238 } else {
239 poll_secs = 3;
240 if ABORT_REQUESTED.load(Ordering::Relaxed) {
241 ABORT_REQUESTED.store(false, Ordering::SeqCst);
242 tracing::info!("[memory_guard] pressure normalized, clearing abort flag");
243 }
244 }
245 }
246 })
247 .ok();
248}
249
250pub fn force_purge() {
252 jemalloc_purge();
253 tracing::info!("[memory_guard] force_purge completed");
254}
255
/// Reads this process's resident set size from the `VmRSS:` line of `/proc/self/status`.
///
/// The field is parsed as a kibibyte count and converted to bytes. Returns
/// `None` when the file cannot be read, the line is absent, the value does
/// not parse, or the byte count would overflow `u64`.
#[cfg(target_os = "linux")]
fn linux_rss() -> Option<u64> {
    let status = std::fs::read_to_string("/proc/self/status").ok()?;
    let field = status
        .lines()
        .find_map(|line| line.strip_prefix("VmRSS:"))?;
    let kb: u64 = field.trim().trim_end_matches(" kB").trim().parse().ok()?;
    kb.checked_mul(1024)
}
269
/// Reads total system RAM from the `MemTotal:` line of `/proc/meminfo`.
///
/// The field is parsed as a kibibyte count and converted to bytes. Returns
/// `None` when the file cannot be read, the line is absent, the value does
/// not parse, or the byte count would overflow `u64`.
#[cfg(target_os = "linux")]
fn linux_memtotal() -> Option<u64> {
    let info = std::fs::read_to_string("/proc/meminfo").ok()?;
    let field = info
        .lines()
        .find_map(|line| line.strip_prefix("MemTotal:"))?;
    let kb: u64 = field.trim().trim_end_matches(" kB").trim().parse().ok()?;
    kb.checked_mul(1024)
}
281
/// Queries the resident size of the current task through Mach `task_info`.
///
/// Returns `None` when the call does not report `KERN_SUCCESS`.
// NOTE(review): `deprecated` is presumably allowed because `libc` deprecates
// its Mach bindings — confirm before removing the allow.
#[cfg(target_os = "macos")]
#[allow(deprecated, clippy::borrow_as_ptr, clippy::ptr_as_ptr)]
fn macos_rss() -> Option<u64> {
    use std::mem;

    // SAFETY: the info struct is a plain C data struct in the `libc`
    // bindings, for which the all-zero bit pattern is a valid value.
    let mut info: libc::mach_task_basic_info_data_t = unsafe { mem::zeroed() };
    // `task_info` takes the buffer size in `natural_t` units, not bytes.
    let mut count = (mem::size_of::<libc::mach_task_basic_info_data_t>()
        / mem::size_of::<libc::natural_t>()) as libc::mach_msg_type_number_t;
    // SAFETY: both pointers refer to live, writable locals, and `count`
    // describes the full size of `info`.
    let kr = unsafe {
        libc::task_info(
            libc::mach_task_self(),
            libc::MACH_TASK_BASIC_INFO,
            std::ptr::from_mut(&mut info).cast::<i32>(),
            std::ptr::from_mut(&mut count),
        )
    };
    if kr == libc::KERN_SUCCESS {
        Some(info.resident_size)
    } else {
        None
    }
}
303
/// Reads total system RAM in bytes from the `hw.memsize` sysctl.
///
/// Returns `None` when `sysctlbyname` reports a non-zero status.
#[cfg(target_os = "macos")]
#[allow(clippy::borrow_as_ptr, clippy::ptr_as_ptr)]
fn macos_memsize() -> Option<u64> {
    use std::mem;

    let mut memsize: u64 = 0;
    let mut len = mem::size_of::<u64>();
    let name = b"hw.memsize\0";
    // SAFETY: `name` is a NUL-terminated literal, the output pointer and its
    // length describe the live local `memsize`, and no new value is supplied
    // (null pointer with length 0).
    let ret = unsafe {
        libc::sysctlbyname(
            name.as_ptr().cast(),
            std::ptr::from_mut(&mut memsize).cast::<libc::c_void>(),
            std::ptr::from_mut(&mut len),
            std::ptr::null_mut(),
            0,
        )
    };
    if ret == 0 {
        Some(memsize)
    } else {
        None
    }
}
326
#[cfg(test)]
mod tests {
    use super::*;

    /// RSS must be readable and non-zero on the platforms with an implementation.
    #[test]
    fn rss_returns_some_on_supported_os() {
        if cfg!(any(target_os = "linux", target_os = "macos")) {
            let rss = get_rss_bytes().expect("RSS should be readable");
            assert!(rss > 0, "RSS should be > 0");
        }
    }

    /// Total RAM must be readable and plausible on the platforms with an implementation.
    #[test]
    fn system_ram_returns_some_on_supported_os() {
        if cfg!(any(target_os = "linux", target_os = "macos")) {
            let ram = get_system_ram_bytes().expect("System RAM should be readable");
            assert!(ram > 1_000_000, "System RAM should be > 1MB");
        }
    }

    /// A captured snapshot must be internally consistent.
    #[test]
    fn snapshot_captures_correctly() {
        if cfg!(any(target_os = "linux", target_os = "macos")) {
            let s = MemorySnapshot::capture().expect("snapshot should be capturable");
            assert!(s.rss_bytes > 0);
            assert!(s.system_ram_bytes > s.rss_bytes);
            assert!(s.rss_percent > 0.0 && s.rss_percent < 100.0);
        }
    }

    /// `PEAK_RSS` must never move below a value that was folded into it.
    ///
    /// The global is deliberately not reset and no exact value is asserted:
    /// tests run in parallel and `snapshot_captures_correctly` updates the
    /// same counter with the real RSS.
    #[test]
    fn peak_rss_tracks_maximum() {
        PEAK_RSS.fetch_max(100, Ordering::Relaxed);
        PEAK_RSS.fetch_max(50, Ordering::Relaxed);
        assert!(peak_rss_bytes() >= 100);
    }

    /// Every level must survive the `u8` encoding used by `CURRENT_PRESSURE`.
    #[test]
    fn pressure_level_roundtrip() {
        for level in [
            PressureLevel::Normal,
            PressureLevel::Soft,
            PressureLevel::Medium,
            PressureLevel::Hard,
            PressureLevel::Critical,
        ] {
            assert_eq!(PressureLevel::from_u8(level as u8), level);
        }
    }

    /// With no guard thread running, the published level is the initial `Normal`.
    #[test]
    fn atomic_pressure_defaults_to_normal() {
        assert_eq!(current_pressure(), PressureLevel::Normal);
    }
}