Skip to main content

reddb_server/runtime/
config_watcher.rs

1//! Hot-reload watcher for the server config file.
2//!
3//! Watches the config file (default `/etc/reddb/config.json`, override
4//! via `REDDB_CONFIG_FILE`) for changes and applies hot-reloadable keys
5//! to `red_config` without a server restart.
6//!
//! **Hot-reloadable:** `red.logging.{level,format,keep_days,dir,file_prefix}`,
//! `slow_query.{threshold_ms,sample_pct}`, `disk_space.critical_pct`.
8//!
9//! **Restart-required:** everything else. Detected changes to non-hot-reloadable
10//! fields emit `OperatorEvent::ConfigChangeRequiresRestart` and are NOT applied.
11//! Hot-reloadable fields in the same reload ARE applied.
12//!
13//! Atomicity: if JSON parse fails, nothing is applied (parse-then-apply).
14//!
15//! Linux: inotify on the parent directory catches atomic rename-swaps
16//! (vim's default save pattern: write temp → `rename(2)` → `IN_MOVED_TO`).
//! Non-Linux, or when inotify cannot be set up: falls back to a 5-second poll loop.
18
19use std::path::PathBuf;
20use std::sync::Arc;
21use std::time::Duration;
22
23use crate::serde_json::Value as JsonValue;
24use crate::storage::UnifiedStore;
25
/// Interval between file checks in the polling fallback.
const POLL_INTERVAL: Duration = Duration::from_millis(5_000);

/// Dotted config keys that may change in a running process without a restart.
///
/// A changed key that is not listed here is reported as restart-required and
/// left unapplied.
const HOT_RELOAD_WHITELIST: &[&str] = &[
    // red.logging.*
    "red.logging.level",
    "red.logging.format",
    "red.logging.keep_days",
    "red.logging.dir",
    "red.logging.file_prefix",
    // slow_query.*
    "slow_query.threshold_ms",
    "slow_query.sample_pct",
    // disk_space.*
    "disk_space.critical_pct",
];
39
40/// Background watcher that hot-reloads the server config file on change.
41pub struct ConfigWatcher {
42    path: PathBuf,
43    store: Arc<UnifiedStore>,
44}
45
46impl ConfigWatcher {
47    pub fn new(path: impl Into<PathBuf>, store: Arc<UnifiedStore>) -> Self {
48        Self {
49            path: path.into(),
50            store,
51        }
52    }
53
54    /// Spawn the watcher as a detached background thread (lives until
55    /// process exit — no cancellation handle needed).
56    pub fn spawn(self) -> std::thread::JoinHandle<()> {
57        std::thread::Builder::new()
58            .name("reddb-config-watcher".into())
59            .spawn(move || run(self.path, self.store))
60            .expect("config watcher thread spawn")
61    }
62}
63
64fn run(path: PathBuf, store: Arc<UnifiedStore>) {
65    #[cfg(target_os = "linux")]
66    {
67        if run_inotify(&path, &store) {
68            return;
69        }
70        // inotify_init1 or inotify_add_watch failed — fall through to poll.
71    }
72    run_poll(&path, &store);
73}
74
75// ---------------------------------------------------------------------------
76// Linux: inotify path (catches atomic rename-swaps)
77// ---------------------------------------------------------------------------
78
79#[cfg(target_os = "linux")]
80fn run_inotify(path: &std::path::Path, store: &UnifiedStore) -> bool {
81    use std::ffi::CString;
82    use std::os::unix::ffi::OsStrExt;
83
84    let fd = unsafe { libc::inotify_init1(libc::O_CLOEXEC) };
85    if fd < 0 {
86        return false;
87    }
88
89    let dir = path
90        .parent()
91        .filter(|d| !d.as_os_str().is_empty())
92        .map(|d| d.to_path_buf())
93        .unwrap_or_else(|| PathBuf::from("."));
94
95    let file_name = match path.file_name() {
96        Some(n) => n.to_os_string(),
97        None => {
98            unsafe { libc::close(fd) };
99            return false;
100        }
101    };
102
103    let dir_cstr = match CString::new(dir.as_os_str().as_bytes()) {
104        Ok(s) => s,
105        Err(_) => {
106            unsafe { libc::close(fd) };
107            return false;
108        }
109    };
110
111    // Watch for both direct writes and atomic renames (vim).
112    let mask = libc::IN_CLOSE_WRITE | libc::IN_MOVED_TO;
113    let wd = unsafe { libc::inotify_add_watch(fd, dir_cstr.as_ptr(), mask) };
114    if wd < 0 {
115        unsafe { libc::close(fd) };
116        return false;
117    }
118
119    // Blocking read loop — owns the fd until process exit or read error.
120    let mut buf = vec![0u8; 4096];
121    loop {
122        let n = unsafe { libc::read(fd, buf.as_mut_ptr() as *mut libc::c_void, buf.len()) };
123        if n <= 0 {
124            break;
125        }
126        let mut offset = 0usize;
127        let n = n as usize;
128        while offset + 16 <= n {
129            // inotify_event layout: wd(4) mask(4) cookie(4) len(4) name(len)
130            let len = u32::from_ne_bytes([
131                buf[offset + 12],
132                buf[offset + 13],
133                buf[offset + 14],
134                buf[offset + 15],
135            ]) as usize;
136            let name_end = offset + 16 + len;
137            if name_end > n {
138                break;
139            }
140            if len > 0 {
141                let name_bytes = &buf[offset + 16..name_end];
142                let nul = name_bytes
143                    .iter()
144                    .position(|&b| b == 0)
145                    .unwrap_or(name_bytes.len());
146                let name = std::ffi::OsStr::from_bytes(&name_bytes[..nul]);
147                if name == file_name.as_os_str() {
148                    apply_hot_reload(path, store);
149                }
150            }
151            offset = name_end;
152        }
153    }
154
155    unsafe { libc::close(fd) };
156    true
157}
158
159// ---------------------------------------------------------------------------
160// Polling fallback (non-Linux or when inotify is unavailable)
161// ---------------------------------------------------------------------------
162
163fn run_poll(path: &std::path::Path, store: &UnifiedStore) {
164    let mut last_mtime: Option<std::time::SystemTime> = None;
165    loop {
166        std::thread::sleep(POLL_INTERVAL);
167        let mtime = path.metadata().ok().and_then(|m| m.modified().ok());
168        if mtime.is_some() && mtime != last_mtime {
169            if last_mtime.is_some() {
170                // Only reload on actual changes, not the first observation.
171                apply_hot_reload(path, store);
172            }
173            last_mtime = mtime;
174        }
175    }
176}
177
178// ---------------------------------------------------------------------------
179// Hot-reload logic
180// ---------------------------------------------------------------------------
181
182fn apply_hot_reload(path: &std::path::Path, store: &UnifiedStore) {
183    let raw = match std::fs::read_to_string(path) {
184        Ok(s) => s,
185        Err(err) => {
186            tracing::warn!(path = %path.display(), error = %err, "config watcher: read failed");
187            return;
188        }
189    };
190
191    // Parse-then-apply: if parse fails, nothing is applied.
192    let parsed: JsonValue = match crate::serde_json::from_str(&raw) {
193        Ok(v) => v,
194        Err(err) => {
195            tracing::warn!(
196                path = %path.display(),
197                error = %err,
198                "config watcher: JSON parse failed — not applying"
199            );
200            return;
201        }
202    };
203    let JsonValue::Object(_) = &parsed else {
204        tracing::warn!(
205            path = %path.display(),
206            "config watcher: root must be JSON object — not applying"
207        );
208        return;
209    };
210
211    let mut flat: Vec<(String, JsonValue)> = Vec::new();
212    flatten_json("", &parsed, &mut flat);
213
214    let changed_by = format!("config_watcher::{}", path.display());
215    let mut non_hot: Vec<String> = Vec::new();
216
217    for (key, new_json) in flat {
218        let new_str = json_to_str(&new_json);
219        if HOT_RELOAD_WHITELIST.contains(&key.as_str()) {
220            let old_str = store
221                .get_config(&key)
222                .as_ref()
223                .map(schema_value_to_str)
224                .unwrap_or_default();
225            if old_str == new_str {
226                continue;
227            }
228            store.set_config_tree(&key, &new_json);
229            crate::telemetry::operator_event::OperatorEvent::ConfigChanged {
230                key,
231                old_value: old_str,
232                new_value: new_str,
233                changed_by: changed_by.clone(),
234            }
235            .emit_global();
236        } else {
237            // Non-hot-reloadable: warn if the value actually changed.
238            let old_str = store
239                .get_config(&key)
240                .as_ref()
241                .map(schema_value_to_str)
242                .unwrap_or_default();
243            if old_str != new_str {
244                non_hot.push(key);
245            }
246        }
247    }
248
249    if !non_hot.is_empty() {
250        crate::telemetry::operator_event::OperatorEvent::ConfigChangeRequiresRestart {
251            fields_changed: non_hot.join(", "),
252        }
253        .emit_global();
254    }
255}
256
257fn json_to_str(v: &JsonValue) -> String {
258    match v {
259        JsonValue::String(s) => s.clone(),
260        other => other.to_string(),
261    }
262}
263
264fn schema_value_to_str(v: &crate::storage::schema::Value) -> String {
265    format!("{v}")
266}
267
268/// Flatten a JSON object to dotted key-value pairs (mirrors `config_overlay`).
269fn flatten_json(prefix: &str, value: &JsonValue, out: &mut Vec<(String, JsonValue)>) {
270    match value {
271        JsonValue::Object(map) => {
272            for (k, v) in map {
273                let key = if prefix.is_empty() {
274                    k.clone()
275                } else {
276                    format!("{prefix}.{k}")
277                };
278                flatten_json(&key, v, out);
279            }
280        }
281        _ if !prefix.is_empty() => out.push((prefix.to_string(), value.clone())),
282        _ => {}
283    }
284}
285
286// ---------------------------------------------------------------------------
287// Tests
288// ---------------------------------------------------------------------------
289
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn hot_reload_whitelist_contains_expected_keys() {
        assert!(HOT_RELOAD_WHITELIST.contains(&"red.logging.level"));
        assert!(HOT_RELOAD_WHITELIST.contains(&"red.logging.format"));
        assert!(HOT_RELOAD_WHITELIST.contains(&"slow_query.threshold_ms"));
        assert!(HOT_RELOAD_WHITELIST.contains(&"slow_query.sample_pct"));
        assert!(HOT_RELOAD_WHITELIST.contains(&"disk_space.critical_pct"));
    }

    #[test]
    fn hot_reload_whitelist_has_no_duplicates() {
        let mut keys = HOT_RELOAD_WHITELIST.to_vec();
        keys.sort_unstable();
        keys.dedup();
        assert_eq!(keys.len(), HOT_RELOAD_WHITELIST.len());
    }

    #[test]
    fn flatten_json_produces_dotted_keys() {
        let raw = r#"{"red":{"logging":{"level":"info","format":"json"}},"slow_query":{"threshold_ms":500}}"#;
        let json: JsonValue = crate::serde_json::from_str(raw).unwrap();
        let mut flat = Vec::new();
        flatten_json("", &json, &mut flat);
        let keys: Vec<&str> = flat.iter().map(|(k, _)| k.as_str()).collect();
        assert!(keys.contains(&"red.logging.level"));
        assert!(keys.contains(&"red.logging.format"));
        assert!(keys.contains(&"slow_query.threshold_ms"));
    }

    #[test]
    fn flatten_json_nested_object() {
        let raw = r#"{"a":{"b":1}}"#;
        let json: JsonValue = crate::serde_json::from_str(raw).unwrap();
        let mut out = Vec::new();
        flatten_json("", &json, &mut out);
        assert_eq!(out.len(), 1);
        assert_eq!(out[0].0, "a.b");
    }

    #[test]
    fn flatten_json_treats_arrays_as_leaves_and_skips_empty_objects() {
        let raw = r#"{"a":[1,2],"b":{}}"#;
        let json: JsonValue = crate::serde_json::from_str(raw).unwrap();
        let mut out = Vec::new();
        flatten_json("", &json, &mut out);
        assert_eq!(out.len(), 1);
        assert_eq!(out[0].0, "a");
    }

    #[test]
    fn flatten_json_drops_keyless_scalar_root() {
        let mut out = Vec::new();
        flatten_json("", &JsonValue::String("x".into()), &mut out);
        assert!(out.is_empty());
    }

    #[test]
    fn flatten_json_keeps_scalar_under_prefix() {
        let mut out = Vec::new();
        flatten_json(
            "red.logging.level",
            &JsonValue::String("debug".into()),
            &mut out,
        );
        assert_eq!(out.len(), 1);
        assert_eq!(out[0].0, "red.logging.level");
        assert_eq!(json_to_str(&out[0].1), "debug");
    }

    #[test]
    fn json_to_str_unquotes_strings() {
        assert_eq!(json_to_str(&JsonValue::String("info".into())), "info");
        // Numbers and bools render without quotes
        let raw = r#"{"n":42,"b":true}"#;
        let v: JsonValue = crate::serde_json::from_str(raw).unwrap();
        if let JsonValue::Object(map) = v {
            assert_eq!(json_to_str(map.get("n").unwrap()), "42");
            assert_eq!(json_to_str(map.get("b").unwrap()), "true");
        } else {
            panic!("expected object");
        }
    }

    #[test]
    fn schema_value_to_str_uses_display() {
        use crate::storage::schema::Value;
        assert_eq!(schema_value_to_str(&Value::Integer(-1)), "-1");
        assert_eq!(schema_value_to_str(&Value::UnsignedInteger(42)), "42");
        assert_eq!(schema_value_to_str(&Value::Boolean(true)), "true");
    }
}