zeph_core/
config_watcher.rs1use std::path::Path;
5use std::time::Duration;
6
7use notify_debouncer_mini::{DebouncedEventKind, new_debouncer};
8use tokio::sync::mpsc;
9
10#[non_exhaustive]
11#[derive(Debug, thiserror::Error)]
12pub enum ConfigWatcherError {
13 #[error("config path has no parent directory")]
14 NoParentDir,
15
16 #[error("config path has no filename")]
17 NoFilename,
18
19 #[error("filesystem watcher error: {0}")]
20 Notify(#[from] notify::Error),
21}
22
/// Notification sent by [`ConfigWatcher`] on its output channel.
///
/// The enum is `#[non_exhaustive]`, so code outside this crate must include a
/// wildcard arm when matching on it.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConfigEvent {
    /// The watched config file was reported as changed.
    Changed,
}
28
29pub struct ConfigWatcher {
30 _handle: tokio::task::JoinHandle<()>,
31}
32
33impl ConfigWatcher {
34 pub fn start(path: &Path, tx: mpsc::Sender<ConfigEvent>) -> Result<Self, ConfigWatcherError> {
44 let dir = path
45 .parent()
46 .ok_or(ConfigWatcherError::NoParentDir)?
47 .to_path_buf();
48 let filename = path
49 .file_name()
50 .ok_or(ConfigWatcherError::NoFilename)?
51 .to_os_string();
52
53 let (notify_tx, mut notify_rx) = mpsc::channel(16);
54
55 let mut debouncer = new_debouncer(
56 Duration::from_millis(500),
57 move |events: Result<Vec<notify_debouncer_mini::DebouncedEvent>, notify::Error>| {
58 let events = match events {
59 Ok(events) => events,
60 Err(e) => {
61 tracing::warn!("config watcher error: {e}");
62 return;
63 }
64 };
65
66 let has_change = events.iter().any(|e| {
67 e.kind == DebouncedEventKind::Any
68 && e.path.file_name().is_some_and(|n| n == filename)
69 });
70
71 if has_change {
72 let _ = notify_tx.blocking_send(());
73 }
74 },
75 )?;
76
77 debouncer
78 .watcher()
79 .watch(&dir, notify::RecursiveMode::NonRecursive)?;
80
81 let handle = tokio::spawn(async move {
82 let _debouncer = debouncer;
83 while notify_rx.recv().await.is_some() {
84 if tx.send(ConfigEvent::Changed).await.is_err() {
85 break;
86 }
87 }
88 });
89
90 Ok(Self { _handle: handle })
91 }
92}
93
#[cfg(test)]
mod tests {
    use super::*;

    /// `start` succeeds for an existing file inside an existing directory.
    #[tokio::test]
    async fn start_with_valid_config_file() {
        let dir = tempfile::tempdir().unwrap();
        let config_path = dir.path().join("config.toml");
        std::fs::write(&config_path, "key = 1").unwrap();
        let (tx, _rx) = mpsc::channel(16);
        let watcher = ConfigWatcher::start(&config_path, tx);
        assert!(watcher.is_ok());
    }

    /// `start` returns an error when the parent directory cannot be watched.
    #[tokio::test]
    async fn start_with_nonexistent_parent_fails() {
        let (tx, _rx) = mpsc::channel(16);
        let result = ConfigWatcher::start(Path::new("/nonexistent/dir/config.toml"), tx);
        assert!(result.is_err());
    }

    /// Rewriting the watched file yields a `ConfigEvent::Changed`.
    #[tokio::test]
    async fn detects_config_file_change() {
        let dir = tempfile::tempdir().unwrap();
        let config_path = dir.path().join("config.toml");
        std::fs::write(&config_path, "initial = true").unwrap();

        let (tx, mut rx) = mpsc::channel(16);
        let _watcher = ConfigWatcher::start(&config_path, tx).unwrap();

        tokio::time::sleep(Duration::from_millis(100)).await;
        std::fs::write(&config_path, "updated = true").unwrap();

        let result = tokio::time::timeout(Duration::from_secs(3), rx.recv()).await;
        // Require an actual event: `Ok(None)` would mean the channel closed
        // without delivering anything and must not count as success.
        assert!(
            matches!(result, Ok(Some(ConfigEvent::Changed))),
            "expected ConfigEvent::Changed within timeout"
        );
    }

    /// Changes to sibling files in the watched directory produce no event.
    #[tokio::test]
    async fn ignores_other_files_in_directory() {
        let dir = tempfile::tempdir().unwrap();
        let config_path = dir.path().join("config.toml");
        std::fs::write(&config_path, "key = 1").unwrap();

        let (tx, mut rx) = mpsc::channel(16);
        let _watcher = ConfigWatcher::start(&config_path, tx).unwrap();

        // Wait longer than the 500 ms debounce window, then discard anything
        // already queued so only the write below can produce an event.
        tokio::time::sleep(Duration::from_millis(800)).await;
        while rx.try_recv().is_ok() {}

        let other_path = dir.path().join("other.txt");
        std::fs::write(&other_path, "content").unwrap();

        let result = tokio::time::timeout(Duration::from_millis(1500), rx.recv()).await;
        assert!(
            result.is_err(),
            "should not receive event for non-config file"
        );
    }
}