Skip to main content

reddb_server/runtime/
disk_space_monitor.rs

1//! DiskSpaceMonitor — edge-triggered disk space watchdog.
2//!
3//! Linux: opens a fanotify group watching FAN_CLOSE_WRITE on the
4//! data directory's mount point. Every write event triggers a
5//! statvfs check; if used% ≥ threshold and the debounce window has
6//! cleared, emits OperatorEvent::DiskSpaceCritical. Falls back to
7//! polling when fanotify_init returns EPERM (unprivileged container).
8//!
9//! Non-Linux: polls via a tokio timer at POLL_INTERVAL.
10
11use std::path::{Path, PathBuf};
12use std::time::{Duration, Instant};
13
14use crate::telemetry::operator_event::OperatorEvent;
15
/// Debounce window: don't re-emit within this duration after the last emit.
/// `check` compares the time elapsed since the previous emission against this
/// value before emitting again.
const DEBOUNCE: Duration = Duration::from_secs(30);

/// Poll interval for the non-fanotify fallback path (non-Linux or EPERM).
/// Used as the period of the timer in `run_poll`.
const POLL_INTERVAL: Duration = Duration::from_secs(30);
21
/// Background disk-space watchdog. Spawn with [`DiskSpaceMonitor::spawn`].
///
/// The type holds only plain configuration (a path and a percentage), so it
/// derives `Debug` for diagnostics and `Clone` so a prepared configuration can
/// be reused.
#[derive(Debug, Clone)]
pub struct DiskSpaceMonitor {
    /// Path whose filesystem usage is measured and which is reported in
    /// emitted events.
    path: PathBuf,
    /// 1–99. Default 90 (= emit when used% ≥ 90).
    critical_pct: u8,
}
28
29impl DiskSpaceMonitor {
30    pub fn new(path: impl Into<PathBuf>, critical_pct: u8) -> Self {
31        Self {
32            path: path.into(),
33            critical_pct: critical_pct.clamp(1, 99),
34        }
35    }
36
37    pub fn with_default_threshold(path: impl Into<PathBuf>) -> Self {
38        Self::new(path, 90)
39    }
40
41    /// Spawn the monitor as detached background work. When the caller is inside
42    /// a Tokio runtime this uses that runtime; otherwise it creates a small
43    /// current-thread runtime for the monitor. The monitor is expected to live
44    /// for the full server lifetime, so no cancellation handle is exposed.
45    pub fn spawn(self) {
46        let path = self.path;
47        let critical_pct = self.critical_pct;
48
49        if let Ok(handle) = tokio::runtime::Handle::try_current() {
50            handle.spawn(run(path, critical_pct));
51            return;
52        }
53
54        std::thread::Builder::new()
55            .name("reddb-disk-space-monitor".into())
56            .spawn(move || {
57                let runtime = tokio::runtime::Builder::new_current_thread()
58                    .enable_time()
59                    .build()
60                    .expect("disk space monitor runtime");
61                runtime.block_on(run(path, critical_pct));
62            })
63            .expect("disk space monitor thread spawn");
64    }
65}
66
67async fn run(path: PathBuf, critical_pct: u8) {
68    #[cfg(target_os = "linux")]
69    {
70        if run_fanotify(&path, critical_pct).await {
71            return;
72        }
73        // fanotify failed (EPERM / unsupported kernel) — fall through to poll.
74    }
75    run_poll(&path, critical_pct).await;
76}
77
78// ---------------------------------------------------------------------------
79// Shared: check disk usage and conditionally emit
80// ---------------------------------------------------------------------------
81
82/// Returns `true` if used% ≥ `critical_pct` and the event was (or would be)
83/// emitted. `last_emit` is updated on each actual emission.
84fn check(path: &Path, critical_pct: u8, last_emit: &mut Option<Instant>) -> bool {
85    let (free, total) = match disk_free_total(path) {
86        Some(pair) => pair,
87        None => return false,
88    };
89    if total == 0 {
90        return false;
91    }
92    let used = total.saturating_sub(free);
93    let used_pct = used as f64 / total as f64 * 100.0;
94    if used_pct >= critical_pct as f64 {
95        let should_emit = last_emit.is_none_or(|t| t.elapsed() >= DEBOUNCE);
96        if should_emit {
97            let threshold_bytes = (total as f64 * ((100 - critical_pct) as f64 / 100.0)) as u64;
98            OperatorEvent::DiskSpaceCritical {
99                path: path.to_string_lossy().into_owned(),
100                available_bytes: free,
101                threshold_bytes,
102            }
103            .emit_global();
104            *last_emit = Some(Instant::now());
105        }
106        return true;
107    }
108    false
109}
110
111fn disk_free_total(path: &Path) -> Option<(u64, u64)> {
112    let free = fs2::free_space(path).ok()?;
113    let total = fs2::total_space(path).ok()?;
114    Some((free, total))
115}
116
117// ---------------------------------------------------------------------------
118// Linux: fanotify path
119// ---------------------------------------------------------------------------
120
121#[cfg(target_os = "linux")]
122async fn run_fanotify(path: &Path, critical_pct: u8) -> bool {
123    match FanotifyWatcher::open(path) {
124        Ok(watcher) => {
125            let mut last_emit: Option<Instant> = None;
126            watcher.run_loop(path, critical_pct, &mut last_emit).await;
127            true
128        }
129        Err(_) => false,
130    }
131}
132
133#[cfg(target_os = "linux")]
134struct FanotifyWatcher {
135    fd: libc::c_int,
136}
137
138#[cfg(target_os = "linux")]
139impl FanotifyWatcher {
140    fn open(path: &Path) -> Result<Self, ()> {
141        use std::ffi::CString;
142        use std::os::unix::ffi::OsStrExt;
143
144        // FAN_CLOEXEC | FAN_CLASS_NOTIF
145        let fd = unsafe {
146            libc::fanotify_init(
147                libc::FAN_CLOEXEC | libc::FAN_CLASS_NOTIF,
148                libc::O_RDONLY as libc::c_uint,
149            )
150        };
151        if fd < 0 {
152            return Err(());
153        }
154
155        let path_cstr = match CString::new(path.as_os_str().as_bytes()) {
156            Ok(s) => s,
157            Err(_) => {
158                unsafe { libc::close(fd) };
159                return Err(());
160            }
161        };
162
163        // Watch FAN_CLOSE_WRITE on the directory (mark the mount).
164        let rc = unsafe {
165            libc::fanotify_mark(
166                fd,
167                libc::FAN_MARK_ADD | libc::FAN_MARK_MOUNT,
168                libc::FAN_CLOSE_WRITE,
169                libc::AT_FDCWD,
170                path_cstr.as_ptr(),
171            )
172        };
173        if rc < 0 {
174            unsafe { libc::close(fd) };
175            return Err(());
176        }
177
178        Ok(Self { fd })
179    }
180
181    /// Block-read fanotify events using a background blocking thread so the
182    /// tokio executor doesn't stall. Each event wakes a check.
183    async fn run_loop(&self, path: &Path, critical_pct: u8, last_emit: &mut Option<Instant>) {
184        let fd = self.fd;
185        let path = path.to_path_buf();
186
187        // Channel: blocking reader → async checker.
188        let (tx, mut rx) = tokio::sync::mpsc::channel::<()>(64);
189
190        // Blocking thread reads fanotify events. It doesn't need the
191        // event data — the occurrence is enough to trigger a statvfs check.
192        std::thread::spawn(move || {
193            let mut buf = [0u8; 4096];
194            loop {
195                let n = unsafe { libc::read(fd, buf.as_mut_ptr() as *mut libc::c_void, buf.len()) };
196                if n <= 0 {
197                    break;
198                }
199                // If receiver is gone, stop.
200                if tx.blocking_send(()).is_err() {
201                    break;
202                }
203            }
204        });
205
206        while rx.recv().await.is_some() {
207            check(&path, critical_pct, last_emit);
208        }
209    }
210}
211
212#[cfg(target_os = "linux")]
213impl Drop for FanotifyWatcher {
214    fn drop(&mut self) {
215        unsafe { libc::close(self.fd) };
216    }
217}
218
219// ---------------------------------------------------------------------------
220// Polling fallback (non-Linux or when fanotify is unavailable)
221// ---------------------------------------------------------------------------
222
223async fn run_poll(path: &Path, critical_pct: u8) {
224    let mut last_emit: Option<Instant> = None;
225    let mut interval = tokio::time::interval(POLL_INTERVAL);
226    interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
227    loop {
228        interval.tick().await;
229        check(path, critical_pct, &mut last_emit);
230    }
231}
232
233// ---------------------------------------------------------------------------
234// Tests
235// ---------------------------------------------------------------------------
236
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Instant;

    /// `last_emit` must stay untouched whenever `check` reports "not fired".
    #[test]
    fn check_no_emit_below_threshold() {
        // The temp dir exists, so disk_free_total returns real values. With a
        // 99% threshold this is false on any host that isn't nearly full.
        let path = std::env::temp_dir();
        let mut last: Option<Instant> = None;
        let fired = check(&path, 99, &mut last);
        // We can't assert false in CI (disk could be full), but last_emit
        // shouldn't advance unless fired.
        if !fired {
            assert!(last.is_none());
        }
    }

    /// A threshold of 0 is raised to the minimum of 1.
    #[test]
    fn check_threshold_zero_excluded_by_clamp() {
        let monitor = DiskSpaceMonitor::new(std::env::temp_dir(), 0);
        assert_eq!(monitor.critical_pct, 1);
    }

    /// A threshold of 100 is lowered to the maximum of 99.
    #[test]
    fn check_threshold_100_excluded_by_clamp() {
        let monitor = DiskSpaceMonitor::new(std::env::temp_dir(), 100);
        assert_eq!(monitor.critical_pct, 99);
    }

    /// The convenience constructor uses the 90% threshold.
    #[test]
    fn with_default_threshold_uses_90() {
        let monitor = DiskSpaceMonitor::with_default_threshold(std::env::temp_dir());
        assert_eq!(monitor.critical_pct, 90);
    }

    /// When stats cannot be read, `check` reports false and leaves the
    /// debounce state exactly as it was.
    #[test]
    fn check_returns_false_when_stats_unavailable() {
        let before = Some(Instant::now());
        let mut last = before;
        let fired = check(Path::new("/nonexistent-path-for-test"), 1, &mut last);
        assert!(!fired);
        assert_eq!(last, before);
    }

    /// Stats for the platform temp dir are readable and self-consistent.
    #[test]
    fn disk_free_total_returns_values_for_temp_dir() {
        let result = disk_free_total(&std::env::temp_dir());
        assert!(result.is_some(), "stats for the temp dir should be readable");
        let (free, total) = result.unwrap();
        assert!(total > 0);
        assert!(free <= total);
    }
}