zeph_core/
config_watcher.rs1use std::path::Path;
5use std::time::Duration;
6
7use notify_debouncer_mini::{DebouncedEventKind, new_debouncer};
8use tokio::sync::mpsc;
9
10pub enum ConfigEvent {
11 Changed,
12}
13
14pub struct ConfigWatcher {
15 _handle: tokio::task::JoinHandle<()>,
16}
17
18impl ConfigWatcher {
19 pub fn start(path: &Path, tx: mpsc::Sender<ConfigEvent>) -> anyhow::Result<Self> {
29 let dir = path
30 .parent()
31 .ok_or_else(|| anyhow::anyhow!("config path has no parent directory"))?
32 .to_path_buf();
33 let filename = path
34 .file_name()
35 .ok_or_else(|| anyhow::anyhow!("config path has no filename"))?
36 .to_os_string();
37
38 let (notify_tx, mut notify_rx) = mpsc::channel(16);
39
40 let mut debouncer = new_debouncer(
41 Duration::from_millis(500),
42 move |events: Result<Vec<notify_debouncer_mini::DebouncedEvent>, notify::Error>| {
43 let events = match events {
44 Ok(events) => events,
45 Err(e) => {
46 tracing::warn!("config watcher error: {e}");
47 return;
48 }
49 };
50
51 let has_change = events.iter().any(|e| {
52 e.kind == DebouncedEventKind::Any
53 && e.path.file_name().is_some_and(|n| n == filename)
54 });
55
56 if has_change {
57 let _ = notify_tx.blocking_send(());
58 }
59 },
60 )?;
61
62 debouncer
63 .watcher()
64 .watch(&dir, notify::RecursiveMode::NonRecursive)?;
65
66 let handle = tokio::spawn(async move {
67 let _debouncer = debouncer;
68 while notify_rx.recv().await.is_some() {
69 if tx.send(ConfigEvent::Changed).await.is_err() {
70 break;
71 }
72 }
73 });
74
75 Ok(Self { _handle: handle })
76 }
77}
78
79#[cfg(test)]
80mod tests {
81 use super::*;
82
83 #[tokio::test]
84 async fn start_with_valid_config_file() {
85 let dir = tempfile::tempdir().unwrap();
86 let config_path = dir.path().join("config.toml");
87 std::fs::write(&config_path, "key = 1").unwrap();
88 let (tx, _rx) = mpsc::channel(16);
89 let watcher = ConfigWatcher::start(&config_path, tx);
90 assert!(watcher.is_ok());
91 }
92
93 #[tokio::test]
94 async fn start_with_nonexistent_parent_fails() {
95 let (tx, _rx) = mpsc::channel(16);
96 let result = ConfigWatcher::start(Path::new("/nonexistent/dir/config.toml"), tx);
97 assert!(result.is_err());
98 }
99
100 #[tokio::test]
101 async fn detects_config_file_change() {
102 let dir = tempfile::tempdir().unwrap();
103 let config_path = dir.path().join("config.toml");
104 std::fs::write(&config_path, "initial = true").unwrap();
105
106 let (tx, mut rx) = mpsc::channel(16);
107 let _watcher = ConfigWatcher::start(&config_path, tx).unwrap();
108
109 tokio::time::sleep(Duration::from_millis(100)).await;
110 std::fs::write(&config_path, "updated = true").unwrap();
111
112 let result = tokio::time::timeout(Duration::from_secs(3), rx.recv()).await;
113 assert!(
114 result.is_ok(),
115 "expected ConfigEvent::Changed within timeout"
116 );
117 }
118
119 #[tokio::test]
120 async fn ignores_other_files_in_directory() {
121 let dir = tempfile::tempdir().unwrap();
122 let config_path = dir.path().join("config.toml");
123 std::fs::write(&config_path, "key = 1").unwrap();
124
125 let (tx, mut rx) = mpsc::channel(16);
126 let _watcher = ConfigWatcher::start(&config_path, tx).unwrap();
127
128 tokio::time::sleep(Duration::from_millis(800)).await;
130 while rx.try_recv().is_ok() {}
131
132 let other_path = dir.path().join("other.txt");
133 std::fs::write(&other_path, "content").unwrap();
134
135 let result = tokio::time::timeout(Duration::from_millis(1500), rx.recv()).await;
136 assert!(
137 result.is_err(),
138 "should not receive event for non-config file"
139 );
140 }
141}