mecha10_core/config_watcher.rs
1//! Configuration Hot-Reload Support
2//!
3//! Provides file watching and automatic reloading of configuration files
4//! with validation and change notifications.
5//!
6//! # Features
7//!
8//! - File system watching with debouncing
9//! - Automatic validation on reload
10//! - Change callbacks for application updates
11//! - Support for JSON, YAML, and TOML configs
12//! - Thread-safe shared configuration state
13//! - Graceful error handling (keeps old config on invalid changes)
14//!
15//! # Example
16//!
17//! ```rust
18//! use mecha10::prelude::*;
19//! use mecha10::config_watcher::ConfigWatcher;
20//!
21//! #[derive(Debug, Clone, Deserialize)]
22//! struct MyConfig {
23//! rate: f32,
24//! enabled: bool,
25//! }
26//!
27//! #[tokio::main]
28//! async fn main() -> Result<()> {
29//! // Create watcher with callback
30//! let watcher = ConfigWatcher::new("config.yaml", |config: &MyConfig| {
31//! println!("Config reloaded! Rate: {}", config.rate);
32//! })?;
33//!
34//! // Get current config
35//! let config = watcher.get();
36//! println!("Initial rate: {}", config.rate);
37//!
38//! // Config automatically reloads when file changes
39//! tokio::signal::ctrl_c().await?;
40//! Ok(())
41//! }
42//! ```
43
44use crate::{Mecha10Error, Result};
45use anyhow::Context as _;
46use notify::{Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher as NotifyWatcher};
47use serde::de::DeserializeOwned;
48use std::path::{Path, PathBuf};
49use std::sync::Arc;
50use std::time::Duration;
51use tokio::sync::RwLock;
52use tokio::time::sleep;
53use tracing::{debug, error, info, warn};
54
55// ============================================================================
56// Configuration Watcher
57// ============================================================================
58
59/// Hot-reload configuration watcher
60///
61/// Watches a configuration file for changes and automatically reloads it.
62/// Validates new configurations before applying them.
63///
64/// # Type Parameters
65///
66/// * `T` - The configuration type (must implement `DeserializeOwned + Clone + Send + Sync`)
67///
68/// # Example
69///
70/// ```rust
71/// use mecha10::config_watcher::ConfigWatcher;
72/// use serde::Deserialize;
73///
74/// #[derive(Debug, Clone, Deserialize)]
75/// struct AppConfig {
76/// port: u16,
77/// debug: bool,
78/// }
79///
80/// let watcher = ConfigWatcher::new("config.yaml", |config: &AppConfig| {
81/// println!("Port changed to: {}", config.port);
82/// })?;
83///
84/// // Get current config
85/// let config = watcher.get();
86/// ```
87pub struct ConfigWatcher<T>
88where
89 T: DeserializeOwned + Clone + Send + Sync + 'static,
90{
91 /// Current configuration
92 config: Arc<RwLock<T>>,
93
94 /// Path to config file
95 path: PathBuf,
96
97 /// File watcher
98 _watcher: RecommendedWatcher,
99}
100
101impl<T> ConfigWatcher<T>
102where
103 T: DeserializeOwned + Clone + Send + Sync + 'static,
104{
105 /// Create a new configuration watcher
106 ///
107 /// # Arguments
108 ///
109 /// * `path` - Path to configuration file
110 /// * `on_change` - Callback invoked when configuration changes
111 ///
112 /// # Returns
113 ///
114 /// A new `ConfigWatcher` instance
115 ///
116 /// # Errors
117 ///
118 /// Returns error if:
119 /// - File doesn't exist
120 /// - Initial configuration is invalid
121 /// - File watcher can't be created
122 ///
123 /// # Example
124 ///
125 /// ```rust
126 /// let watcher = ConfigWatcher::new("config.yaml", |config: &MyConfig| {
127 /// // Handle configuration change
128 /// println!("Config updated!");
129 /// })?;
130 /// ```
131 pub fn new<P, F>(path: P, on_change: F) -> Result<Self>
132 where
133 P: AsRef<Path>,
134 F: Fn(&T) + Send + Sync + 'static,
135 {
136 let path = path.as_ref().to_path_buf();
137
138 // Load initial configuration
139 let initial_config = Self::load_config(&path)?;
140 let config = Arc::new(RwLock::new(initial_config));
141
142 // Create callback wrapper
143 let config_clone = Arc::clone(&config);
144 let path_clone = path.clone();
145 let on_change = Arc::new(on_change);
146
147 // Create file watcher
148 let (tx, mut rx) = tokio::sync::mpsc::channel(100);
149
150 let mut watcher = RecommendedWatcher::new(
151 move |res: notify::Result<Event>| {
152 if let Ok(event) = res {
153 // Only care about modify events
154 if matches!(event.kind, EventKind::Modify(_) | EventKind::Create(_)) {
155 let _ = tx.blocking_send(event);
156 }
157 }
158 },
159 Config::default().with_poll_interval(Duration::from_millis(500)),
160 )
161 .map_err(|e| Mecha10Error::Configuration(format!("Failed to create file watcher: {}", e)))?;
162
163 // Watch the config file
164 watcher
165 .watch(&path, RecursiveMode::NonRecursive)
166 .map_err(|e| Mecha10Error::Configuration(format!("Failed to watch file {}: {}", path.display(), e)))?;
167
168 info!("Started watching config file: {}", path.display());
169
170 // Spawn reload task
171 tokio::spawn(async move {
172 let mut last_reload = tokio::time::Instant::now();
173
174 while let Some(_event) = rx.recv().await {
175 // Debounce: ignore events within 1 second of last reload
176 if last_reload.elapsed() < Duration::from_secs(1) {
177 continue;
178 }
179
180 debug!("Config file changed, debouncing...");
181
182 // Wait a bit for file writes to complete
183 sleep(Duration::from_millis(100)).await;
184
185 // Attempt to reload
186 match Self::load_config(&path_clone) {
187 Ok(new_config) => {
188 info!("Successfully reloaded config from: {}", path_clone.display());
189
190 // Update shared config
191 {
192 let mut config = config_clone.write().await;
193 *config = new_config.clone();
194 }
195
196 // Invoke callback
197 on_change(&new_config);
198
199 last_reload = tokio::time::Instant::now();
200 }
201 Err(e) => {
202 error!(
203 "Failed to reload config from {}: {}. Keeping old config.",
204 path_clone.display(),
205 e
206 );
207 warn!("Fix the configuration errors to apply changes.");
208 }
209 }
210 }
211 });
212
213 Ok(Self {
214 config,
215 path,
216 _watcher: watcher,
217 })
218 }
219
220 /// Get the current configuration
221 ///
222 /// Returns a clone of the current configuration.
223 ///
224 /// # Example
225 ///
226 /// ```rust
227 /// let config = watcher.get();
228 /// println!("Current rate: {}", config.rate);
229 /// ```
230 pub fn get(&self) -> T {
231 let config = self.config.blocking_read();
232 config.clone()
233 }
234
235 /// Get the current configuration asynchronously
236 ///
237 /// Returns a clone of the current configuration.
238 ///
239 /// # Example
240 ///
241 /// ```rust
242 /// let config = watcher.get_async().await;
243 /// println!("Current rate: {}", config.rate);
244 /// ```
245 pub async fn get_async(&self) -> T {
246 let config = self.config.read().await;
247 config.clone()
248 }
249
250 /// Manually reload the configuration
251 ///
252 /// Forces a reload of the configuration from disk.
253 ///
254 /// # Returns
255 ///
256 /// Ok(()) if reload succeeded, Err if loading or validation failed
257 ///
258 /// # Example
259 ///
260 /// ```rust
261 /// watcher.reload().await?;
262 /// ```
263 pub async fn reload(&self) -> Result<()> {
264 let new_config = Self::load_config(&self.path)?;
265
266 let mut config = self.config.write().await;
267 *config = new_config;
268
269 info!("Manually reloaded config from: {}", self.path.display());
270 Ok(())
271 }
272
273 /// Get the path to the configuration file
274 pub fn path(&self) -> &Path {
275 &self.path
276 }
277
278 // Helper: Load and parse configuration file
279 fn load_config(path: &Path) -> Result<T> {
280 use crate::config::load_config;
281
282 load_config(path)
283 .with_context(|| format!("Failed to load configuration from {}", path.display()))
284 .map_err(|e| Mecha10Error::Configuration(format!("{:#}", e)))
285 }
286}
287
288// ============================================================================
289// Validated Configuration Watcher
290// ============================================================================
291
292/// Configuration watcher with custom validation
293///
294/// Like `ConfigWatcher`, but also runs custom validation logic before applying changes.
295///
296/// # Example
297///
298/// ```rust
299/// use mecha10::config_watcher::ValidatedConfigWatcher;
300/// use mecha10::config::Validate;
301///
302/// #[derive(Debug, Clone, Deserialize)]
303/// struct AppConfig {
304/// port: u16,
305/// }
306///
307/// impl Validate for AppConfig {
308/// fn validate(&self) -> Result<()> {
309/// if self.port < 1024 {
310/// return Err(Mecha10Error::Configuration(
311/// "Port must be >= 1024".to_string()
312/// ));
313/// }
314/// Ok(())
315/// }
316/// }
317///
318/// let watcher = ValidatedConfigWatcher::new("config.yaml", |config: &AppConfig| {
319/// println!("Valid config loaded!");
320/// })?;
321/// ```
322pub struct ValidatedConfigWatcher<T>
323where
324 T: DeserializeOwned + Clone + Send + Sync + crate::config::Validate + 'static,
325{
326 inner: ConfigWatcher<T>,
327}
328
329impl<T> ValidatedConfigWatcher<T>
330where
331 T: DeserializeOwned + Clone + Send + Sync + crate::config::Validate + 'static,
332{
333 /// Create a new validated configuration watcher
334 ///
335 /// # Arguments
336 ///
337 /// * `path` - Path to configuration file
338 /// * `on_change` - Callback invoked when configuration changes
339 ///
340 /// # Returns
341 ///
342 /// A new `ValidatedConfigWatcher` instance
343 ///
344 /// # Errors
345 ///
346 /// Returns error if initial configuration is invalid
347 pub fn new<P, F>(path: P, on_change: F) -> Result<Self>
348 where
349 P: AsRef<Path>,
350 F: Fn(&T) + Send + Sync + 'static,
351 {
352 // Wrap on_change to include validation
353 let inner = ConfigWatcher::new(path, move |config: &T| {
354 if let Err(e) = config.validate() {
355 error!("Configuration validation failed: {}", e);
356 warn!("Keeping previous configuration.");
357 return;
358 }
359 on_change(config);
360 })?;
361
362 Ok(Self { inner })
363 }
364
365 /// Get the current configuration
366 pub fn get(&self) -> T {
367 self.inner.get()
368 }
369
370 /// Get the current configuration asynchronously
371 pub async fn get_async(&self) -> T {
372 self.inner.get_async().await
373 }
374
375 /// Manually reload and validate the configuration
376 pub async fn reload(&self) -> Result<()> {
377 self.inner.reload().await?;
378 self.inner.get().validate()?;
379 Ok(())
380 }
381
382 /// Get the path to the configuration file
383 pub fn path(&self) -> &Path {
384 self.inner.path()
385 }
386}