1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
//! DiskSpaceMonitor — edge-triggered disk space watchdog.
//!
//! Linux: opens a fanotify group watching FAN_CLOSE_WRITE on the
//! data directory's mount point. Every write event triggers a
//! statvfs check; if used% ≥ threshold and the debounce window has
//! cleared, emits OperatorEvent::DiskSpaceCritical. Falls back to
//! polling when fanotify_init returns EPERM (unprivileged container).
//!
//! Non-Linux: polls via a tokio timer at POLL_INTERVAL.
use std::path::{Path, PathBuf};
use std::time::{Duration, Instant};
use crate::telemetry::operator_event::OperatorEvent;
/// Debounce window: don't re-emit within this duration after the last emit.
const DEBOUNCE: Duration = Duration::from_secs(30);
/// Poll interval for the non-fanotify fallback path (non-Linux or EPERM).
const POLL_INTERVAL: Duration = Duration::from_secs(30);
/// Background disk-space watchdog. Spawn with [`DiskSpaceMonitor::spawn`].
pub struct DiskSpaceMonitor {
path: PathBuf,
/// 1–99. Default 90 (= emit when used% ≥ 90).
critical_pct: u8,
}
impl DiskSpaceMonitor {
pub fn new(path: impl Into<PathBuf>, critical_pct: u8) -> Self {
Self {
path: path.into(),
critical_pct: critical_pct.clamp(1, 99),
}
}
pub fn with_default_threshold(path: impl Into<PathBuf>) -> Self {
Self::new(path, 90)
}
/// Spawn the monitor as detached background work. When the caller is inside
/// a Tokio runtime this uses that runtime; otherwise it creates a small
/// current-thread runtime for the monitor. The monitor is expected to live
/// for the full server lifetime, so no cancellation handle is exposed.
pub fn spawn(self) {
let path = self.path;
let critical_pct = self.critical_pct;
if let Ok(handle) = tokio::runtime::Handle::try_current() {
handle.spawn(run(path, critical_pct));
return;
}
std::thread::Builder::new()
.name("reddb-disk-space-monitor".into())
.spawn(move || {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_time()
.build()
.expect("disk space monitor runtime");
runtime.block_on(run(path, critical_pct));
})
.expect("disk space monitor thread spawn");
}
}
async fn run(path: PathBuf, critical_pct: u8) {
#[cfg(target_os = "linux")]
{
if run_fanotify(&path, critical_pct).await {
return;
}
// fanotify failed (EPERM / unsupported kernel) — fall through to poll.
}
run_poll(&path, critical_pct).await;
}
// ---------------------------------------------------------------------------
// Shared: check disk usage and conditionally emit
// ---------------------------------------------------------------------------
/// Returns `true` if used% ≥ `critical_pct` and the event was (or would be)
/// emitted. `last_emit` is updated on each actual emission.
fn check(path: &Path, critical_pct: u8, last_emit: &mut Option<Instant>) -> bool {
let (free, total) = match disk_free_total(path) {
Some(pair) => pair,
None => return false,
};
if total == 0 {
return false;
}
let used = total.saturating_sub(free);
let used_pct = used as f64 / total as f64 * 100.0;
if used_pct >= critical_pct as f64 {
let should_emit = last_emit.is_none_or(|t| t.elapsed() >= DEBOUNCE);
if should_emit {
let threshold_bytes = (total as f64 * ((100 - critical_pct) as f64 / 100.0)) as u64;
OperatorEvent::DiskSpaceCritical {
path: path.to_string_lossy().into_owned(),
available_bytes: free,
threshold_bytes,
}
.emit_global();
*last_emit = Some(Instant::now());
}
return true;
}
false
}
fn disk_free_total(path: &Path) -> Option<(u64, u64)> {
let free = fs2::free_space(path).ok()?;
let total = fs2::total_space(path).ok()?;
Some((free, total))
}
// ---------------------------------------------------------------------------
// Linux: fanotify path
// ---------------------------------------------------------------------------
#[cfg(target_os = "linux")]
async fn run_fanotify(path: &Path, critical_pct: u8) -> bool {
match FanotifyWatcher::open(path) {
Ok(watcher) => {
let mut last_emit: Option<Instant> = None;
watcher.run_loop(path, critical_pct, &mut last_emit).await;
true
}
Err(_) => false,
}
}
#[cfg(target_os = "linux")]
struct FanotifyWatcher {
fd: libc::c_int,
}
#[cfg(target_os = "linux")]
impl FanotifyWatcher {
fn open(path: &Path) -> Result<Self, ()> {
use std::ffi::CString;
use std::os::unix::ffi::OsStrExt;
// FAN_CLOEXEC | FAN_CLASS_NOTIF
let fd = unsafe {
libc::fanotify_init(
libc::FAN_CLOEXEC | libc::FAN_CLASS_NOTIF,
libc::O_RDONLY as libc::c_uint,
)
};
if fd < 0 {
return Err(());
}
let path_cstr = match CString::new(path.as_os_str().as_bytes()) {
Ok(s) => s,
Err(_) => {
unsafe { libc::close(fd) };
return Err(());
}
};
// Watch FAN_CLOSE_WRITE on the directory (mark the mount).
let rc = unsafe {
libc::fanotify_mark(
fd,
libc::FAN_MARK_ADD | libc::FAN_MARK_MOUNT,
libc::FAN_CLOSE_WRITE,
libc::AT_FDCWD,
path_cstr.as_ptr(),
)
};
if rc < 0 {
unsafe { libc::close(fd) };
return Err(());
}
Ok(Self { fd })
}
/// Block-read fanotify events using a background blocking thread so the
/// tokio executor doesn't stall. Each event wakes a check.
async fn run_loop(&self, path: &Path, critical_pct: u8, last_emit: &mut Option<Instant>) {
let fd = self.fd;
let path = path.to_path_buf();
// Channel: blocking reader → async checker.
let (tx, mut rx) = tokio::sync::mpsc::channel::<()>(64);
// Blocking thread reads fanotify events. It doesn't need the
// event data — the occurrence is enough to trigger a statvfs check.
std::thread::spawn(move || {
let mut buf = [0u8; 4096];
loop {
let n = unsafe { libc::read(fd, buf.as_mut_ptr() as *mut libc::c_void, buf.len()) };
if n <= 0 {
break;
}
// If receiver is gone, stop.
if tx.blocking_send(()).is_err() {
break;
}
}
});
while rx.recv().await.is_some() {
check(&path, critical_pct, last_emit);
}
}
}
#[cfg(target_os = "linux")]
impl Drop for FanotifyWatcher {
fn drop(&mut self) {
unsafe { libc::close(self.fd) };
}
}
// ---------------------------------------------------------------------------
// Polling fallback (non-Linux or when fanotify is unavailable)
// ---------------------------------------------------------------------------
async fn run_poll(path: &Path, critical_pct: u8) {
let mut last_emit: Option<Instant> = None;
let mut interval = tokio::time::interval(POLL_INTERVAL);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
loop {
interval.tick().await;
check(path, critical_pct, &mut last_emit);
}
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
#[cfg(test)]
mod tests {
use super::*;
use std::time::Instant;
#[test]
fn check_no_emit_below_threshold() {
// Tmp dir exists, so disk_free_total returns real values. With a 99%
// threshold on a host that isn't completely full this should be false.
let path = std::env::temp_dir();
let mut last: Option<Instant> = None;
// threshold=99 → only fires when disk is ≥99% full, extremely unlikely
let fired = check(&path, 99, &mut last);
// We can't assert false in CI (disk could be full), but last_emit
// shouldn't advance unless fired.
if !fired {
assert!(last.is_none());
}
}
#[test]
fn check_threshold_zero_excluded_by_clamp() {
// clamp(0, 1, 99) → 1, which is always ≥1% used → fires on any non-empty disk
let monitor = DiskSpaceMonitor::new("/tmp", 0);
assert_eq!(monitor.critical_pct, 1);
}
#[test]
fn check_threshold_100_excluded_by_clamp() {
let monitor = DiskSpaceMonitor::new("/tmp", 100);
assert_eq!(monitor.critical_pct, 99);
}
#[test]
fn debounce_suppresses_second_emit() {
// Simulate two consecutive calls when disk is "full" by passing a
// synthetic path check via a local helper.
let mut last: Option<Instant> = Some(Instant::now()); // pretend just emitted
// disk_free_total("/nonexistent") → None → check returns false, no emit
let fired = check(Path::new("/nonexistent-path-for-test"), 1, &mut last);
assert!(!fired); // can't get disk stats for nonexistent path
}
#[test]
fn disk_free_total_returns_values_for_tmp() {
let result = disk_free_total(Path::new("/tmp"));
assert!(result.is_some(), "statvfs /tmp should succeed");
let (free, total) = result.unwrap();
assert!(total > 0);
assert!(free <= total);
}
}