Skip to main content

agentzero_config/
watcher.rs

1use crate::loader;
2use crate::model::AgentZeroConfig;
3use std::path::{Path, PathBuf};
4use std::time::{Duration, SystemTime};
5use tokio::sync::watch;
6
7/// Watches a config file for changes and broadcasts new configs via a watch channel.
8///
9/// Uses `tokio::fs::metadata()` polling to detect modification time changes.
10/// When a change is detected, the file is re-read, parsed, and validated.
11/// Only valid configs are broadcast; invalid files log a warning and are skipped.
12pub struct ConfigWatcher {
13    config_path: PathBuf,
14    poll_interval: Duration,
15    tx: watch::Sender<AgentZeroConfig>,
16    rx: watch::Receiver<AgentZeroConfig>,
17}
18
19impl ConfigWatcher {
20    /// Create a new watcher for the given config file.
21    ///
22    /// Loads the initial config immediately and makes it available via `subscribe()`.
23    pub fn new(config_path: PathBuf, poll_interval: Duration) -> anyhow::Result<Self> {
24        let initial = loader::load(&config_path)?;
25        let (tx, rx) = watch::channel(initial);
26        Ok(Self {
27            config_path,
28            poll_interval,
29            tx,
30            rx,
31        })
32    }
33
34    /// Create a watcher from an already-loaded config (avoids double-load at startup).
35    pub fn from_config(
36        config_path: PathBuf,
37        poll_interval: Duration,
38        config: AgentZeroConfig,
39    ) -> Self {
40        let (tx, rx) = watch::channel(config);
41        Self {
42            config_path,
43            poll_interval,
44            tx,
45            rx,
46        }
47    }
48
49    /// Get a receiver to subscribe to config changes.
50    pub fn subscribe(&self) -> watch::Receiver<AgentZeroConfig> {
51        self.rx.clone()
52    }
53
54    /// Get the current config snapshot.
55    pub fn current(&self) -> AgentZeroConfig {
56        self.rx.borrow().clone()
57    }
58
59    /// Run the polling loop. This is a long-running task — spawn it with `tokio::spawn`.
60    ///
61    /// The loop runs until the sender is dropped (all receivers dropped) or the token
62    /// signals cancellation.
63    pub async fn run(self, cancel: tokio::sync::watch::Receiver<bool>) {
64        let mut last_modified = file_mtime(&self.config_path).await;
65        let mut cancel = cancel;
66
67        loop {
68            tokio::select! {
69                _ = tokio::time::sleep(self.poll_interval) => {}
70                result = cancel.changed() => {
71                    if result.is_err() || *cancel.borrow() {
72                        tracing::debug!("config watcher shutting down");
73                        return;
74                    }
75                }
76            }
77
78            let current_mtime = file_mtime(&self.config_path).await;
79            if current_mtime == last_modified {
80                continue;
81            }
82
83            tracing::info!(path = %self.config_path.display(), "config file changed, reloading");
84            last_modified = current_mtime;
85
86            match loader::load(&self.config_path) {
87                Ok(new_config) => {
88                    if self.tx.send(new_config).is_err() {
89                        tracing::debug!("all config subscribers dropped, stopping watcher");
90                        return;
91                    }
92                    tracing::info!("config reloaded successfully");
93                }
94                Err(e) => {
95                    tracing::warn!(error = %e, "config reload failed, keeping previous config");
96                }
97            }
98        }
99    }
100}
101
102async fn file_mtime(path: &Path) -> Option<SystemTime> {
103    tokio::fs::metadata(path)
104        .await
105        .ok()
106        .and_then(|m| m.modified().ok())
107}
108
109#[cfg(test)]
110mod tests {
111    use super::*;
112    use std::fs;
113
114    fn minimal_config_toml() -> &'static str {
115        "[security]\nallowed_root = \".\"\nallowed_commands = [\"echo\"]\n"
116    }
117
118    fn temp_dir() -> PathBuf {
119        static SEQ: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
120        let nanos = std::time::SystemTime::now()
121            .duration_since(std::time::UNIX_EPOCH)
122            .unwrap()
123            .as_nanos();
124        let seq = SEQ.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
125        let dir = std::env::temp_dir().join(format!(
126            "agentzero-watcher-{}-{nanos}-{seq}",
127            std::process::id()
128        ));
129        fs::create_dir_all(&dir).unwrap();
130        dir
131    }
132
133    #[test]
134    fn from_config_provides_initial_value() {
135        let config = AgentZeroConfig::default();
136        let watcher = ConfigWatcher::from_config(
137            PathBuf::from("/tmp/fake.toml"),
138            Duration::from_secs(5),
139            config.clone(),
140        );
141        let current = watcher.current();
142        assert_eq!(current.provider.kind, config.provider.kind);
143    }
144
145    #[test]
146    fn subscribe_returns_cloned_receiver() {
147        let config = AgentZeroConfig::default();
148        let watcher = ConfigWatcher::from_config(
149            PathBuf::from("/tmp/fake.toml"),
150            Duration::from_secs(5),
151            config,
152        );
153        let rx1 = watcher.subscribe();
154        let rx2 = watcher.subscribe();
155        assert_eq!(rx1.borrow().provider.kind, rx2.borrow().provider.kind);
156    }
157
158    #[tokio::test]
159    async fn detects_config_change() {
160        let dir = temp_dir();
161        let config_path = dir.join("agentzero.toml");
162        fs::write(&config_path, minimal_config_toml()).unwrap();
163
164        let watcher = ConfigWatcher::new(config_path.clone(), Duration::from_millis(50)).unwrap();
165        let mut rx = watcher.subscribe();
166        let (cancel_tx, cancel_rx) = watch::channel(false);
167
168        let handle = tokio::spawn(watcher.run(cancel_rx));
169
170        // Modify the file
171        tokio::time::sleep(Duration::from_millis(100)).await;
172        fs::write(
173            &config_path,
174            "[provider]\nkind = \"anthropic\"\nmodel = \"claude-3\"\n\n[security]\nallowed_root = \".\"\nallowed_commands = [\"echo\"]\n",
175        )
176        .unwrap();
177
178        // Wait for change notification
179        let changed = tokio::time::timeout(Duration::from_secs(2), rx.changed()).await;
180        assert!(changed.is_ok(), "should receive change notification");
181        assert_eq!(rx.borrow().provider.kind, "anthropic");
182
183        // Shutdown
184        let _ = cancel_tx.send(true);
185        let _ = handle.await;
186        let _ = fs::remove_dir_all(dir);
187    }
188
189    #[tokio::test]
190    async fn skips_invalid_config_change() {
191        let dir = temp_dir();
192        let config_path = dir.join("agentzero.toml");
193        // Start with a known provider kind
194        fs::write(
195            &config_path,
196            "[provider]\nkind = \"openai\"\nmodel = \"gpt-4o\"\n\n[security]\nallowed_root = \".\"\nallowed_commands = [\"echo\"]\n",
197        )
198        .unwrap();
199
200        let watcher = ConfigWatcher::new(config_path.clone(), Duration::from_millis(50)).unwrap();
201        let rx = watcher.subscribe();
202        let (cancel_tx, cancel_rx) = watch::channel(false);
203
204        let handle = tokio::spawn(watcher.run(cancel_rx));
205
206        // Write syntactically invalid TOML that cannot be parsed at all.
207        // Using broken TOML ensures no environment variable override can rescue it.
208        tokio::time::sleep(Duration::from_millis(100)).await;
209        fs::write(&config_path, "[provider\nkind = broken toml ~~~\n").unwrap();
210
211        // Give it time to detect and skip
212        tokio::time::sleep(Duration::from_millis(200)).await;
213
214        // Config should still have original provider kind
215        assert_eq!(rx.borrow().provider.kind, "openai");
216
217        let _ = cancel_tx.send(true);
218        let _ = handle.await;
219        let _ = fs::remove_dir_all(dir);
220    }
221
222    #[tokio::test]
223    async fn stops_on_cancel_signal() {
224        let config = AgentZeroConfig::default();
225        let watcher = ConfigWatcher::from_config(
226            PathBuf::from("/tmp/nonexistent.toml"),
227            Duration::from_millis(50),
228            config,
229        );
230        let (cancel_tx, cancel_rx) = watch::channel(false);
231
232        let handle = tokio::spawn(watcher.run(cancel_rx));
233
234        tokio::time::sleep(Duration::from_millis(100)).await;
235        let _ = cancel_tx.send(true);
236
237        let result = tokio::time::timeout(Duration::from_secs(2), handle).await;
238        assert!(result.is_ok(), "watcher should stop within timeout");
239    }
240}