reddb_server/runtime/
config_watcher.rs1use std::path::PathBuf;
20use std::sync::Arc;
21use std::time::Duration;
22
23use crate::serde_json::Value as JsonValue;
24use crate::storage::UnifiedStore;
25
/// Sleep between two consecutive mtime checks in the polling watcher
/// (`run_poll`).
const POLL_INTERVAL: Duration = Duration::from_secs(5);
27
/// Dotted config keys that `apply_hot_reload` writes to the store while the
/// process is running.
///
/// A changed key that is *not* listed here is never written; it is only
/// reported through a `ConfigChangeRequiresRestart` operator event.
const HOT_RELOAD_WHITELIST: &[&str] = &[
    // Logging.
    "red.logging.level",
    "red.logging.format",
    "red.logging.keep_days",
    "red.logging.dir",
    "red.logging.file_prefix",
    // Slow-query reporting.
    "slow_query.threshold_ms",
    "slow_query.sample_pct",
    // Disk-space monitoring.
    "disk_space.critical_pct",
];
39
40pub struct ConfigWatcher {
42 path: PathBuf,
43 store: Arc<UnifiedStore>,
44}
45
46impl ConfigWatcher {
47 pub fn new(path: impl Into<PathBuf>, store: Arc<UnifiedStore>) -> Self {
48 Self {
49 path: path.into(),
50 store,
51 }
52 }
53
54 pub fn spawn(self) -> std::thread::JoinHandle<()> {
57 std::thread::Builder::new()
58 .name("reddb-config-watcher".into())
59 .spawn(move || run(self.path, self.store))
60 .expect("config watcher thread spawn")
61 }
62}
63
64fn run(path: PathBuf, store: Arc<UnifiedStore>) {
65 #[cfg(target_os = "linux")]
66 {
67 if run_inotify(&path, &store) {
68 return;
69 }
70 }
72 run_poll(&path, &store);
73}
74
75#[cfg(target_os = "linux")]
80fn run_inotify(path: &std::path::Path, store: &UnifiedStore) -> bool {
81 use std::ffi::CString;
82 use std::os::unix::ffi::OsStrExt;
83
84 let fd = unsafe { libc::inotify_init1(libc::O_CLOEXEC) };
85 if fd < 0 {
86 return false;
87 }
88
89 let dir = path
90 .parent()
91 .filter(|d| !d.as_os_str().is_empty())
92 .map(|d| d.to_path_buf())
93 .unwrap_or_else(|| PathBuf::from("."));
94
95 let file_name = match path.file_name() {
96 Some(n) => n.to_os_string(),
97 None => {
98 unsafe { libc::close(fd) };
99 return false;
100 }
101 };
102
103 let dir_cstr = match CString::new(dir.as_os_str().as_bytes()) {
104 Ok(s) => s,
105 Err(_) => {
106 unsafe { libc::close(fd) };
107 return false;
108 }
109 };
110
111 let mask = libc::IN_CLOSE_WRITE | libc::IN_MOVED_TO;
113 let wd = unsafe { libc::inotify_add_watch(fd, dir_cstr.as_ptr(), mask) };
114 if wd < 0 {
115 unsafe { libc::close(fd) };
116 return false;
117 }
118
119 let mut buf = vec![0u8; 4096];
121 loop {
122 let n = unsafe { libc::read(fd, buf.as_mut_ptr() as *mut libc::c_void, buf.len()) };
123 if n <= 0 {
124 break;
125 }
126 let mut offset = 0usize;
127 let n = n as usize;
128 while offset + 16 <= n {
129 let len = u32::from_ne_bytes([
131 buf[offset + 12],
132 buf[offset + 13],
133 buf[offset + 14],
134 buf[offset + 15],
135 ]) as usize;
136 let name_end = offset + 16 + len;
137 if name_end > n {
138 break;
139 }
140 if len > 0 {
141 let name_bytes = &buf[offset + 16..name_end];
142 let nul = name_bytes
143 .iter()
144 .position(|&b| b == 0)
145 .unwrap_or(name_bytes.len());
146 let name = std::ffi::OsStr::from_bytes(&name_bytes[..nul]);
147 if name == file_name.as_os_str() {
148 apply_hot_reload(path, store);
149 }
150 }
151 offset = name_end;
152 }
153 }
154
155 unsafe { libc::close(fd) };
156 true
157}
158
159fn run_poll(path: &std::path::Path, store: &UnifiedStore) {
164 let mut last_mtime: Option<std::time::SystemTime> = None;
165 loop {
166 std::thread::sleep(POLL_INTERVAL);
167 let mtime = path.metadata().ok().and_then(|m| m.modified().ok());
168 if mtime.is_some() && mtime != last_mtime {
169 if last_mtime.is_some() {
170 apply_hot_reload(path, store);
172 }
173 last_mtime = mtime;
174 }
175 }
176}
177
178fn apply_hot_reload(path: &std::path::Path, store: &UnifiedStore) {
183 let raw = match std::fs::read_to_string(path) {
184 Ok(s) => s,
185 Err(err) => {
186 tracing::warn!(path = %path.display(), error = %err, "config watcher: read failed");
187 return;
188 }
189 };
190
191 let parsed: JsonValue = match crate::serde_json::from_str(&raw) {
193 Ok(v) => v,
194 Err(err) => {
195 tracing::warn!(
196 path = %path.display(),
197 error = %err,
198 "config watcher: JSON parse failed — not applying"
199 );
200 return;
201 }
202 };
203 let JsonValue::Object(_) = &parsed else {
204 tracing::warn!(
205 path = %path.display(),
206 "config watcher: root must be JSON object — not applying"
207 );
208 return;
209 };
210
211 let mut flat: Vec<(String, JsonValue)> = Vec::new();
212 flatten_json("", &parsed, &mut flat);
213
214 let changed_by = format!("config_watcher::{}", path.display());
215 let mut non_hot: Vec<String> = Vec::new();
216
217 for (key, new_json) in flat {
218 let new_str = json_to_str(&new_json);
219 if HOT_RELOAD_WHITELIST.contains(&key.as_str()) {
220 let old_str = store
221 .get_config(&key)
222 .as_ref()
223 .map(schema_value_to_str)
224 .unwrap_or_default();
225 if old_str == new_str {
226 continue;
227 }
228 store.set_config_tree(&key, &new_json);
229 crate::telemetry::operator_event::OperatorEvent::ConfigChanged {
230 key,
231 old_value: old_str,
232 new_value: new_str,
233 changed_by: changed_by.clone(),
234 }
235 .emit_global();
236 } else {
237 let old_str = store
239 .get_config(&key)
240 .as_ref()
241 .map(schema_value_to_str)
242 .unwrap_or_default();
243 if old_str != new_str {
244 non_hot.push(key);
245 }
246 }
247 }
248
249 if !non_hot.is_empty() {
250 crate::telemetry::operator_event::OperatorEvent::ConfigChangeRequiresRestart {
251 fields_changed: non_hot.join(", "),
252 }
253 .emit_global();
254 }
255}
256
257fn json_to_str(v: &JsonValue) -> String {
258 match v {
259 JsonValue::String(s) => s.clone(),
260 other => other.to_string(),
261 }
262}
263
264fn schema_value_to_str(v: &crate::storage::schema::Value) -> String {
265 format!("{v}")
266}
267
268fn flatten_json(prefix: &str, value: &JsonValue, out: &mut Vec<(String, JsonValue)>) {
270 match value {
271 JsonValue::Object(map) => {
272 for (k, v) in map {
273 let key = if prefix.is_empty() {
274 k.clone()
275 } else {
276 format!("{prefix}.{k}")
277 };
278 flatten_json(&key, v, out);
279 }
280 }
281 _ if !prefix.is_empty() => out.push((prefix.to_string(), value.clone())),
282 _ => {}
283 }
284}
285
#[cfg(test)]
mod tests {
    use super::*;

    /// Parse `raw` as JSON and return its flattened `(key, value)` pairs.
    fn flatten(raw: &str) -> Vec<(String, JsonValue)> {
        let json: JsonValue = crate::serde_json::from_str(raw).unwrap();
        let mut pairs = Vec::new();
        flatten_json("", &json, &mut pairs);
        pairs
    }

    #[test]
    fn hot_reload_whitelist_contains_expected_keys() {
        let expected = [
            "red.logging.level",
            "red.logging.format",
            "slow_query.threshold_ms",
            "slow_query.sample_pct",
            "disk_space.critical_pct",
        ];
        for key in expected {
            assert!(HOT_RELOAD_WHITELIST.contains(&key));
        }
    }

    #[test]
    fn flatten_json_produces_dotted_keys() {
        let flat = flatten(
            r#"{"red":{"logging":{"level":"info","format":"json"}},"slow_query":{"threshold_ms":500}}"#,
        );
        let keys: Vec<&str> = flat.iter().map(|(k, _)| k.as_str()).collect();
        assert!(keys.contains(&"red.logging.level"));
        assert!(keys.contains(&"red.logging.format"));
        assert!(keys.contains(&"slow_query.threshold_ms"));
    }

    #[test]
    fn flatten_json_nested_object() {
        let out = flatten(r#"{"a":{"b":1}}"#);
        assert_eq!(out.len(), 1);
        assert_eq!(out[0].0, "a.b");
    }

    #[test]
    fn json_to_str_unquotes_strings() {
        assert_eq!(json_to_str(&JsonValue::String("info".into())), "info");

        // Non-string scalars keep their JSON rendering.
        let v: JsonValue = crate::serde_json::from_str(r#"{"n":42,"b":true}"#).unwrap();
        let JsonValue::Object(map) = v else {
            panic!("expected object");
        };
        assert_eq!(json_to_str(map.get("n").unwrap()), "42");
        assert_eq!(json_to_str(map.get("b").unwrap()), "true");
    }

    #[test]
    fn schema_value_to_str_uses_display() {
        use crate::storage::schema::Value;
        assert_eq!(schema_value_to_str(&Value::Integer(-1)), "-1");
        assert_eq!(schema_value_to_str(&Value::UnsignedInteger(42)), "42");
        assert_eq!(schema_value_to_str(&Value::Boolean(true)), "true");
    }
}