zeph_core/
config_watcher.rs1use std::path::Path;
5use std::time::Duration;
6
7use notify_debouncer_mini::{DebouncedEventKind, new_debouncer};
8use tokio::sync::mpsc;
9
/// Notification emitted by [`ConfigWatcher`] on the channel passed to
/// [`ConfigWatcher::start`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConfigEvent {
    /// The watched configuration file was reported as changed.
    Changed,
}
14
15pub struct ConfigWatcher {
16 _handle: tokio::task::JoinHandle<()>,
17}
18
19impl ConfigWatcher {
20 pub fn start(path: &Path, tx: mpsc::Sender<ConfigEvent>) -> anyhow::Result<Self> {
30 let dir = path
31 .parent()
32 .ok_or_else(|| anyhow::anyhow!("config path has no parent directory"))?
33 .to_path_buf();
34 let filename = path
35 .file_name()
36 .ok_or_else(|| anyhow::anyhow!("config path has no filename"))?
37 .to_os_string();
38
39 let (notify_tx, mut notify_rx) = mpsc::channel(16);
40
41 let mut debouncer = new_debouncer(
42 Duration::from_millis(500),
43 move |events: Result<Vec<notify_debouncer_mini::DebouncedEvent>, notify::Error>| {
44 let events = match events {
45 Ok(events) => events,
46 Err(e) => {
47 tracing::warn!("config watcher error: {e}");
48 return;
49 }
50 };
51
52 let has_change = events.iter().any(|e| {
53 e.kind == DebouncedEventKind::Any
54 && e.path.file_name().is_some_and(|n| n == filename)
55 });
56
57 if has_change {
58 let _ = notify_tx.blocking_send(());
59 }
60 },
61 )?;
62
63 debouncer
64 .watcher()
65 .watch(&dir, notify::RecursiveMode::NonRecursive)?;
66
67 let handle = tokio::spawn(async move {
68 let _debouncer = debouncer;
69 while notify_rx.recv().await.is_some() {
70 if tx.send(ConfigEvent::Changed).await.is_err() {
71 break;
72 }
73 }
74 });
75
76 Ok(Self { _handle: handle })
77 }
78}
79
#[cfg(test)]
mod tests {
    use super::*;

    /// Starting a watcher on an existing file in an existing directory succeeds.
    #[tokio::test]
    async fn start_with_valid_config_file() {
        let dir = tempfile::tempdir().unwrap();
        let config_path = dir.path().join("config.toml");
        std::fs::write(&config_path, "key = 1").unwrap();
        let (tx, _rx) = mpsc::channel(16);
        let watcher = ConfigWatcher::start(&config_path, tx);
        assert!(watcher.is_ok());
    }

    /// Watching a path whose parent directory does not exist reports an error.
    #[tokio::test]
    async fn start_with_nonexistent_parent_fails() {
        let (tx, _rx) = mpsc::channel(16);
        let result = ConfigWatcher::start(Path::new("/nonexistent/dir/config.toml"), tx);
        assert!(result.is_err());
    }

    /// Rewriting the watched file produces a `ConfigEvent::Changed`.
    #[tokio::test]
    async fn detects_config_file_change() {
        let dir = tempfile::tempdir().unwrap();
        let config_path = dir.path().join("config.toml");
        std::fs::write(&config_path, "initial = true").unwrap();

        let (tx, mut rx) = mpsc::channel(16);
        let _watcher = ConfigWatcher::start(&config_path, tx).unwrap();

        // Short pause between starting the watcher and modifying the file.
        tokio::time::sleep(Duration::from_millis(100)).await;
        std::fs::write(&config_path, "updated = true").unwrap();

        let result = tokio::time::timeout(Duration::from_secs(3), rx.recv()).await;
        // `Ok(None)` would mean the channel closed without delivering an event,
        // so require an actual `Changed` value rather than just "no timeout".
        assert!(
            matches!(result, Ok(Some(ConfigEvent::Changed))),
            "expected ConfigEvent::Changed within timeout"
        );
    }

    /// Writing a different file in the watched directory produces no event.
    #[tokio::test]
    async fn ignores_other_files_in_directory() {
        let dir = tempfile::tempdir().unwrap();
        let config_path = dir.path().join("config.toml");
        std::fs::write(&config_path, "key = 1").unwrap();

        let (tx, mut rx) = mpsc::channel(16);
        let _watcher = ConfigWatcher::start(&config_path, tx).unwrap();

        // Wait longer than the 500 ms debounce window, then discard anything
        // that was already delivered before the unrelated file is written.
        tokio::time::sleep(Duration::from_millis(800)).await;
        while rx.try_recv().is_ok() {}

        let other_path = dir.path().join("other.txt");
        std::fs::write(&other_path, "content").unwrap();

        // A timeout (`Err`) is the expected outcome: nothing should arrive.
        let result = tokio::time::timeout(Duration::from_millis(1500), rx.recv()).await;
        assert!(
            result.is_err(),
            "should not receive event for non-config file"
        );
    }
}