reddb_server/runtime/
disk_space_monitor.rs1use std::path::{Path, PathBuf};
12use std::time::{Duration, Instant};
13
14use crate::telemetry::operator_event::OperatorEvent;
15
/// Minimum spacing between two `DiskSpaceCritical` events emitted by the same monitor loop.
const DEBOUNCE: Duration = Duration::from_millis(30_000);

/// Period between disk-usage samples when the monitor runs in polling mode.
const POLL_INTERVAL: Duration = Duration::from_millis(30_000);
21
22pub struct DiskSpaceMonitor {
24 path: PathBuf,
25 critical_pct: u8,
27}
28
29impl DiskSpaceMonitor {
30 pub fn new(path: impl Into<PathBuf>, critical_pct: u8) -> Self {
31 Self {
32 path: path.into(),
33 critical_pct: critical_pct.clamp(1, 99),
34 }
35 }
36
37 pub fn with_default_threshold(path: impl Into<PathBuf>) -> Self {
38 Self::new(path, 90)
39 }
40
41 pub fn spawn(self) {
46 let path = self.path;
47 let critical_pct = self.critical_pct;
48
49 if let Ok(handle) = tokio::runtime::Handle::try_current() {
50 handle.spawn(run(path, critical_pct));
51 return;
52 }
53
54 std::thread::Builder::new()
55 .name("reddb-disk-space-monitor".into())
56 .spawn(move || {
57 let runtime = tokio::runtime::Builder::new_current_thread()
58 .enable_time()
59 .build()
60 .expect("disk space monitor runtime");
61 runtime.block_on(run(path, critical_pct));
62 })
63 .expect("disk space monitor thread spawn");
64 }
65}
66
67async fn run(path: PathBuf, critical_pct: u8) {
68 #[cfg(target_os = "linux")]
69 {
70 if run_fanotify(&path, critical_pct).await {
71 return;
72 }
73 }
75 run_poll(&path, critical_pct).await;
76}
77
78fn check(path: &Path, critical_pct: u8, last_emit: &mut Option<Instant>) -> bool {
85 let (free, total) = match disk_free_total(path) {
86 Some(pair) => pair,
87 None => return false,
88 };
89 if total == 0 {
90 return false;
91 }
92 let used = total.saturating_sub(free);
93 let used_pct = used as f64 / total as f64 * 100.0;
94 if used_pct >= critical_pct as f64 {
95 let should_emit = last_emit.is_none_or(|t| t.elapsed() >= DEBOUNCE);
96 if should_emit {
97 let threshold_bytes = (total as f64 * ((100 - critical_pct) as f64 / 100.0)) as u64;
98 OperatorEvent::DiskSpaceCritical {
99 path: path.to_string_lossy().into_owned(),
100 available_bytes: free,
101 threshold_bytes,
102 }
103 .emit_global();
104 *last_emit = Some(Instant::now());
105 }
106 return true;
107 }
108 false
109}
110
111fn disk_free_total(path: &Path) -> Option<(u64, u64)> {
112 let free = fs2::free_space(path).ok()?;
113 let total = fs2::total_space(path).ok()?;
114 Some((free, total))
115}
116
117#[cfg(target_os = "linux")]
122async fn run_fanotify(path: &Path, critical_pct: u8) -> bool {
123 match FanotifyWatcher::open(path) {
124 Ok(watcher) => {
125 let mut last_emit: Option<Instant> = None;
126 watcher.run_loop(path, critical_pct, &mut last_emit).await;
127 true
128 }
129 Err(_) => false,
130 }
131}
132
133#[cfg(target_os = "linux")]
134struct FanotifyWatcher {
135 fd: libc::c_int,
136}
137
138#[cfg(target_os = "linux")]
139impl FanotifyWatcher {
140 fn open(path: &Path) -> Result<Self, ()> {
141 use std::ffi::CString;
142 use std::os::unix::ffi::OsStrExt;
143
144 let fd = unsafe {
146 libc::fanotify_init(
147 libc::FAN_CLOEXEC | libc::FAN_CLASS_NOTIF,
148 libc::O_RDONLY as libc::c_uint,
149 )
150 };
151 if fd < 0 {
152 return Err(());
153 }
154
155 let path_cstr = match CString::new(path.as_os_str().as_bytes()) {
156 Ok(s) => s,
157 Err(_) => {
158 unsafe { libc::close(fd) };
159 return Err(());
160 }
161 };
162
163 let rc = unsafe {
165 libc::fanotify_mark(
166 fd,
167 libc::FAN_MARK_ADD | libc::FAN_MARK_MOUNT,
168 libc::FAN_CLOSE_WRITE,
169 libc::AT_FDCWD,
170 path_cstr.as_ptr(),
171 )
172 };
173 if rc < 0 {
174 unsafe { libc::close(fd) };
175 return Err(());
176 }
177
178 Ok(Self { fd })
179 }
180
181 async fn run_loop(&self, path: &Path, critical_pct: u8, last_emit: &mut Option<Instant>) {
184 let fd = self.fd;
185 let path = path.to_path_buf();
186
187 let (tx, mut rx) = tokio::sync::mpsc::channel::<()>(64);
189
190 std::thread::spawn(move || {
193 let mut buf = [0u8; 4096];
194 loop {
195 let n = unsafe { libc::read(fd, buf.as_mut_ptr() as *mut libc::c_void, buf.len()) };
196 if n <= 0 {
197 break;
198 }
199 if tx.blocking_send(()).is_err() {
201 break;
202 }
203 }
204 });
205
206 while rx.recv().await.is_some() {
207 check(&path, critical_pct, last_emit);
208 }
209 }
210}
211
212#[cfg(target_os = "linux")]
213impl Drop for FanotifyWatcher {
214 fn drop(&mut self) {
215 unsafe { libc::close(self.fd) };
216 }
217}
218
219async fn run_poll(path: &Path, critical_pct: u8) {
224 let mut last_emit: Option<Instant> = None;
225 let mut interval = tokio::time::interval(POLL_INTERVAL);
226 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
227 loop {
228 interval.tick().await;
229 check(path, critical_pct, &mut last_emit);
230 }
231}
232
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Instant;

    #[test]
    fn check_no_emit_below_threshold() {
        let path = std::env::temp_dir();
        let mut last: Option<Instant> = None;
        let fired = check(&path, 99, &mut last);
        // The host disk might genuinely be >= 99% full; only assert when it is not.
        if !fired {
            assert!(last.is_none());
        }
    }

    #[test]
    fn check_threshold_zero_excluded_by_clamp() {
        let monitor = DiskSpaceMonitor::new("/tmp", 0);
        assert_eq!(monitor.critical_pct, 1);
    }

    #[test]
    fn check_threshold_100_excluded_by_clamp() {
        let monitor = DiskSpaceMonitor::new("/tmp", 100);
        assert_eq!(monitor.critical_pct, 99);
    }

    #[test]
    fn debounce_suppresses_second_emit() {
        // The previous emit is inside the DEBOUNCE window. If usage is above the 1% threshold,
        // check must not emit again, so `last` keeps its old timestamp. If usage is below the
        // threshold, `last` is untouched anyway. Either way no event is emitted here.
        let recent = Instant::now();
        let mut last = Some(recent);
        let _ = check(&std::env::temp_dir(), 1, &mut last);
        assert_eq!(last, Some(recent));
    }

    #[test]
    fn check_returns_false_when_stats_unavailable() {
        let mut last: Option<Instant> = None;
        let fired = check(Path::new("/nonexistent-path-for-test"), 1, &mut last);
        assert!(!fired);
        assert!(last.is_none());
    }

    #[test]
    fn disk_free_total_returns_values_for_tmp() {
        let (free, total) = disk_free_total(&std::env::temp_dir())
            .expect("statvfs on the temp dir should succeed");
        assert!(total > 0);
        assert!(free <= total);
    }
}