zeph_core/
config_watcher.rs1use std::path::Path;
5use std::time::Duration;
6
7use notify_debouncer_mini::{DebouncedEventKind, new_debouncer};
8use tokio::sync::mpsc;
9
10#[derive(Debug, thiserror::Error)]
11pub enum ConfigWatcherError {
12 #[error("config path has no parent directory")]
13 NoParentDir,
14
15 #[error("config path has no filename")]
16 NoFilename,
17
18 #[error("filesystem watcher error: {0}")]
19 Notify(#[from] notify::Error),
20}
21
/// Notification sent to the consumer of a [`ConfigWatcher`].
///
/// Derives `Debug`, `PartialEq` and `Eq` in addition to `Clone` so events can
/// be logged and compared in assertions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConfigEvent {
    /// The watched configuration file was reported as changed.
    Changed,
}
26
27pub struct ConfigWatcher {
28 _handle: tokio::task::JoinHandle<()>,
29}
30
31impl ConfigWatcher {
32 pub fn start(path: &Path, tx: mpsc::Sender<ConfigEvent>) -> Result<Self, ConfigWatcherError> {
42 let dir = path
43 .parent()
44 .ok_or(ConfigWatcherError::NoParentDir)?
45 .to_path_buf();
46 let filename = path
47 .file_name()
48 .ok_or(ConfigWatcherError::NoFilename)?
49 .to_os_string();
50
51 let (notify_tx, mut notify_rx) = mpsc::channel(16);
52
53 let mut debouncer = new_debouncer(
54 Duration::from_millis(500),
55 move |events: Result<Vec<notify_debouncer_mini::DebouncedEvent>, notify::Error>| {
56 let events = match events {
57 Ok(events) => events,
58 Err(e) => {
59 tracing::warn!("config watcher error: {e}");
60 return;
61 }
62 };
63
64 let has_change = events.iter().any(|e| {
65 e.kind == DebouncedEventKind::Any
66 && e.path.file_name().is_some_and(|n| n == filename)
67 });
68
69 if has_change {
70 let _ = notify_tx.blocking_send(());
71 }
72 },
73 )?;
74
75 debouncer
76 .watcher()
77 .watch(&dir, notify::RecursiveMode::NonRecursive)?;
78
79 let handle = tokio::spawn(async move {
80 let _debouncer = debouncer;
81 while notify_rx.recv().await.is_some() {
82 if tx.send(ConfigEvent::Changed).await.is_err() {
83 break;
84 }
85 }
86 });
87
88 Ok(Self { _handle: handle })
89 }
90}
91
#[cfg(test)]
mod tests {
    use super::*;

    /// Starting a watcher for an existing file in an existing directory succeeds.
    #[tokio::test]
    async fn start_with_valid_config_file() {
        let dir = tempfile::tempdir().unwrap();
        let config_path = dir.path().join("config.toml");
        std::fs::write(&config_path, "key = 1").unwrap();
        let (tx, _rx) = mpsc::channel(16);
        let watcher = ConfigWatcher::start(&config_path, tx);
        assert!(watcher.is_ok());
    }

    /// A config path whose parent directory does not exist is rejected.
    #[tokio::test]
    async fn start_with_nonexistent_parent_fails() {
        let (tx, _rx) = mpsc::channel(16);
        let result = ConfigWatcher::start(Path::new("/nonexistent/dir/config.toml"), tx);
        assert!(result.is_err());
    }

    /// Rewriting the watched file produces a `ConfigEvent::Changed`.
    #[tokio::test]
    async fn detects_config_file_change() {
        let dir = tempfile::tempdir().unwrap();
        let config_path = dir.path().join("config.toml");
        std::fs::write(&config_path, "initial = true").unwrap();

        let (tx, mut rx) = mpsc::channel(16);
        let _watcher = ConfigWatcher::start(&config_path, tx).unwrap();

        // Give the watcher a moment to register before modifying the file.
        tokio::time::sleep(Duration::from_millis(100)).await;
        std::fs::write(&config_path, "updated = true").unwrap();

        let result = tokio::time::timeout(Duration::from_secs(3), rx.recv()).await;
        // Require an actual event: `Ok(None)` would only mean the channel was
        // closed, which must not count as a detected change.
        assert!(
            matches!(result, Ok(Some(ConfigEvent::Changed))),
            "expected ConfigEvent::Changed within timeout"
        );
    }

    /// Writing a different file in the watched directory produces no event.
    #[tokio::test]
    async fn ignores_other_files_in_directory() {
        let dir = tempfile::tempdir().unwrap();
        let config_path = dir.path().join("config.toml");
        std::fs::write(&config_path, "key = 1").unwrap();

        let (tx, mut rx) = mpsc::channel(16);
        let _watcher = ConfigWatcher::start(&config_path, tx).unwrap();

        // Wait past the debounce window, then discard anything already queued
        // so only the write below can trigger an event.
        tokio::time::sleep(Duration::from_millis(800)).await;
        while rx.try_recv().is_ok() {}

        let other_path = dir.path().join("other.txt");
        std::fs::write(&other_path, "content").unwrap();

        let result = tokio::time::timeout(Duration::from_millis(1500), rx.recv()).await;
        assert!(
            result.is_err(),
            "should not receive event for non-config file"
        );
    }
}