agentzero_config/
watcher.rs1use crate::loader;
2use crate::model::AgentZeroConfig;
3use std::path::{Path, PathBuf};
4use std::time::{Duration, SystemTime};
5use tokio::sync::watch;
6
7pub struct ConfigWatcher {
13 config_path: PathBuf,
14 poll_interval: Duration,
15 tx: watch::Sender<AgentZeroConfig>,
16 rx: watch::Receiver<AgentZeroConfig>,
17}
18
19impl ConfigWatcher {
20 pub fn new(config_path: PathBuf, poll_interval: Duration) -> anyhow::Result<Self> {
24 let initial = loader::load(&config_path)?;
25 let (tx, rx) = watch::channel(initial);
26 Ok(Self {
27 config_path,
28 poll_interval,
29 tx,
30 rx,
31 })
32 }
33
34 pub fn from_config(
36 config_path: PathBuf,
37 poll_interval: Duration,
38 config: AgentZeroConfig,
39 ) -> Self {
40 let (tx, rx) = watch::channel(config);
41 Self {
42 config_path,
43 poll_interval,
44 tx,
45 rx,
46 }
47 }
48
49 pub fn subscribe(&self) -> watch::Receiver<AgentZeroConfig> {
51 self.rx.clone()
52 }
53
54 pub fn current(&self) -> AgentZeroConfig {
56 self.rx.borrow().clone()
57 }
58
59 pub async fn run(self, cancel: tokio::sync::watch::Receiver<bool>) {
64 let mut last_modified = file_mtime(&self.config_path).await;
65 let mut cancel = cancel;
66
67 loop {
68 tokio::select! {
69 _ = tokio::time::sleep(self.poll_interval) => {}
70 result = cancel.changed() => {
71 if result.is_err() || *cancel.borrow() {
72 tracing::debug!("config watcher shutting down");
73 return;
74 }
75 }
76 }
77
78 let current_mtime = file_mtime(&self.config_path).await;
79 if current_mtime == last_modified {
80 continue;
81 }
82
83 tracing::info!(path = %self.config_path.display(), "config file changed, reloading");
84 last_modified = current_mtime;
85
86 match loader::load(&self.config_path) {
87 Ok(new_config) => {
88 if self.tx.send(new_config).is_err() {
89 tracing::debug!("all config subscribers dropped, stopping watcher");
90 return;
91 }
92 tracing::info!("config reloaded successfully");
93 }
94 Err(e) => {
95 tracing::warn!(error = %e, "config reload failed, keeping previous config");
96 }
97 }
98 }
99 }
100}
101
102async fn file_mtime(path: &Path) -> Option<SystemTime> {
103 tokio::fs::metadata(path)
104 .await
105 .ok()
106 .and_then(|m| m.modified().ok())
107}
108
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::UNIX_EPOCH;

    /// Config body that sets `provider.kind` to "anthropic".
    const ANTHROPIC_TOML: &str = "[provider]\nkind = \"anthropic\"\nmodel = \"claude-3\"\n\n[security]\nallowed_root = \".\"\nallowed_commands = [\"echo\"]\n";

    /// Config body that sets `provider.kind` to "openai".
    const OPENAI_TOML: &str = "[provider]\nkind = \"openai\"\nmodel = \"gpt-4o\"\n\n[security]\nallowed_root = \".\"\nallowed_commands = [\"echo\"]\n";

    /// Deliberately malformed content used to provoke a failed reload.
    const BROKEN_TOML: &str = "[provider\nkind = broken toml ~~~\n";

    /// Config body that contains only a `[security]` section.
    fn minimal_config_toml() -> &'static str {
        "[security]\nallowed_root = \".\"\nallowed_commands = [\"echo\"]\n"
    }

    /// Creates a fresh directory under the system temp dir. The name combines
    /// the process id, the current time in nanoseconds and a per-process
    /// counter.
    fn temp_dir() -> PathBuf {
        static SEQ: AtomicU64 = AtomicU64::new(0);
        let nanos = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_nanos();
        let seq = SEQ.fetch_add(1, Ordering::Relaxed);
        let name = format!("agentzero-watcher-{}-{nanos}-{seq}", std::process::id());
        let dir = std::env::temp_dir().join(name);
        fs::create_dir_all(&dir).unwrap();
        dir
    }

    /// Signals cancellation, waits for the watcher task to finish and removes
    /// the scratch directory. Every failure is ignored.
    async fn stop_and_cleanup(
        cancel_tx: watch::Sender<bool>,
        handle: tokio::task::JoinHandle<()>,
        dir: PathBuf,
    ) {
        let _ = cancel_tx.send(true);
        let _ = handle.await;
        let _ = fs::remove_dir_all(dir);
    }

    #[test]
    fn from_config_provides_initial_value() {
        let expected = AgentZeroConfig::default();
        let watcher = ConfigWatcher::from_config(
            PathBuf::from("/tmp/fake.toml"),
            Duration::from_secs(5),
            expected.clone(),
        );
        assert_eq!(watcher.current().provider.kind, expected.provider.kind);
    }

    #[test]
    fn subscribe_returns_cloned_receiver() {
        let watcher = ConfigWatcher::from_config(
            PathBuf::from("/tmp/fake.toml"),
            Duration::from_secs(5),
            AgentZeroConfig::default(),
        );
        let first = watcher.subscribe();
        let second = watcher.subscribe();
        assert_eq!(first.borrow().provider.kind, second.borrow().provider.kind);
    }

    #[tokio::test]
    async fn detects_config_change() {
        let dir = temp_dir();
        let config_path = dir.join("agentzero.toml");
        fs::write(&config_path, minimal_config_toml()).unwrap();

        let watcher = ConfigWatcher::new(config_path.clone(), Duration::from_millis(50)).unwrap();
        let mut updates = watcher.subscribe();
        let (cancel_tx, cancel_rx) = watch::channel(false);
        let handle = tokio::spawn(watcher.run(cancel_rx));

        // Give the watcher time to start before rewriting the file.
        tokio::time::sleep(Duration::from_millis(100)).await;
        fs::write(&config_path, ANTHROPIC_TOML).unwrap();

        // The rewritten config has to be published within two seconds.
        let outcome = tokio::time::timeout(Duration::from_secs(2), updates.changed()).await;
        assert!(outcome.is_ok(), "should receive change notification");
        assert_eq!(updates.borrow().provider.kind, "anthropic");

        stop_and_cleanup(cancel_tx, handle, dir).await;
    }

    #[tokio::test]
    async fn skips_invalid_config_change() {
        let dir = temp_dir();
        let config_path = dir.join("agentzero.toml");
        fs::write(&config_path, OPENAI_TOML).unwrap();

        let watcher = ConfigWatcher::new(config_path.clone(), Duration::from_millis(50)).unwrap();
        let updates = watcher.subscribe();
        let (cancel_tx, cancel_rx) = watch::channel(false);
        let handle = tokio::spawn(watcher.run(cancel_rx));

        // Give the watcher time to start, then overwrite the file with garbage.
        tokio::time::sleep(Duration::from_millis(100)).await;
        fs::write(&config_path, BROKEN_TOML).unwrap();

        // Wait long enough for several poll intervals to elapse.
        tokio::time::sleep(Duration::from_millis(200)).await;

        // The previously loaded config must still be the published one.
        assert_eq!(updates.borrow().provider.kind, "openai");

        stop_and_cleanup(cancel_tx, handle, dir).await;
    }

    #[tokio::test]
    async fn stops_on_cancel_signal() {
        let watcher = ConfigWatcher::from_config(
            PathBuf::from("/tmp/nonexistent.toml"),
            Duration::from_millis(50),
            AgentZeroConfig::default(),
        );
        let (cancel_tx, cancel_rx) = watch::channel(false);
        let handle = tokio::spawn(watcher.run(cancel_rx));

        tokio::time::sleep(Duration::from_millis(100)).await;
        let _ = cancel_tx.send(true);

        let joined = tokio::time::timeout(Duration::from_secs(2), handle).await;
        assert!(joined.is_ok(), "watcher should stop within timeout");
    }
}