mecha10_core/
config_watcher.rs

1//! Configuration Hot-Reload Support
2//!
3//! Provides file watching and automatic reloading of configuration files
4//! with validation and change notifications.
5//!
6//! # Features
7//!
8//! - File system watching with debouncing
9//! - Automatic validation on reload
10//! - Change callbacks for application updates
11//! - Support for JSON, YAML, and TOML configs
12//! - Thread-safe shared configuration state
13//! - Graceful error handling (keeps old config on invalid changes)
14//!
15//! # Example
16//!
17//! ```rust
18//! use mecha10::prelude::*;
19//! use mecha10::config_watcher::ConfigWatcher;
20//!
21//! #[derive(Debug, Clone, Deserialize)]
22//! struct MyConfig {
23//!     rate: f32,
24//!     enabled: bool,
25//! }
26//!
27//! #[tokio::main]
28//! async fn main() -> Result<()> {
29//!     // Create watcher with callback
30//!     let watcher = ConfigWatcher::new("config.yaml", |config: &MyConfig| {
31//!         println!("Config reloaded! Rate: {}", config.rate);
32//!     })?;
33//!
34//!     // Get current config
35//!     let config = watcher.get();
36//!     println!("Initial rate: {}", config.rate);
37//!
38//!     // Config automatically reloads when file changes
39//!     tokio::signal::ctrl_c().await?;
40//!     Ok(())
41//! }
42//! ```
43
44use crate::{Mecha10Error, Result};
45use anyhow::Context as _;
46use notify::{Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher as NotifyWatcher};
47use serde::de::DeserializeOwned;
48use std::path::{Path, PathBuf};
49use std::sync::Arc;
50use std::time::Duration;
51use tokio::sync::RwLock;
52use tokio::time::sleep;
53use tracing::{debug, error, info, warn};
54
55// ============================================================================
56// Configuration Watcher
57// ============================================================================
58
59/// Hot-reload configuration watcher
60///
61/// Watches a configuration file for changes and automatically reloads it.
62/// Validates new configurations before applying them.
63///
64/// # Type Parameters
65///
66/// * `T` - The configuration type (must implement `DeserializeOwned + Clone + Send + Sync`)
67///
68/// # Example
69///
70/// ```rust
71/// use mecha10::config_watcher::ConfigWatcher;
72/// use serde::Deserialize;
73///
74/// #[derive(Debug, Clone, Deserialize)]
75/// struct AppConfig {
76///     port: u16,
77///     debug: bool,
78/// }
79///
80/// let watcher = ConfigWatcher::new("config.yaml", |config: &AppConfig| {
81///     println!("Port changed to: {}", config.port);
82/// })?;
83///
84/// // Get current config
85/// let config = watcher.get();
86/// ```
87pub struct ConfigWatcher<T>
88where
89    T: DeserializeOwned + Clone + Send + Sync + 'static,
90{
91    /// Current configuration
92    config: Arc<RwLock<T>>,
93
94    /// Path to config file
95    path: PathBuf,
96
97    /// File watcher
98    _watcher: RecommendedWatcher,
99}
100
101impl<T> ConfigWatcher<T>
102where
103    T: DeserializeOwned + Clone + Send + Sync + 'static,
104{
105    /// Create a new configuration watcher
106    ///
107    /// # Arguments
108    ///
109    /// * `path` - Path to configuration file
110    /// * `on_change` - Callback invoked when configuration changes
111    ///
112    /// # Returns
113    ///
114    /// A new `ConfigWatcher` instance
115    ///
116    /// # Errors
117    ///
118    /// Returns error if:
119    /// - File doesn't exist
120    /// - Initial configuration is invalid
121    /// - File watcher can't be created
122    ///
123    /// # Example
124    ///
125    /// ```rust
126    /// let watcher = ConfigWatcher::new("config.yaml", |config: &MyConfig| {
127    ///     // Handle configuration change
128    ///     println!("Config updated!");
129    /// })?;
130    /// ```
131    pub fn new<P, F>(path: P, on_change: F) -> Result<Self>
132    where
133        P: AsRef<Path>,
134        F: Fn(&T) + Send + Sync + 'static,
135    {
136        let path = path.as_ref().to_path_buf();
137
138        // Load initial configuration
139        let initial_config = Self::load_config(&path)?;
140        let config = Arc::new(RwLock::new(initial_config));
141
142        // Create callback wrapper
143        let config_clone = Arc::clone(&config);
144        let path_clone = path.clone();
145        let on_change = Arc::new(on_change);
146
147        // Create file watcher
148        let (tx, mut rx) = tokio::sync::mpsc::channel(100);
149
150        let mut watcher = RecommendedWatcher::new(
151            move |res: notify::Result<Event>| {
152                if let Ok(event) = res {
153                    // Only care about modify events
154                    if matches!(event.kind, EventKind::Modify(_) | EventKind::Create(_)) {
155                        let _ = tx.blocking_send(event);
156                    }
157                }
158            },
159            Config::default().with_poll_interval(Duration::from_millis(500)),
160        )
161        .map_err(|e| Mecha10Error::Configuration(format!("Failed to create file watcher: {}", e)))?;
162
163        // Watch the config file
164        watcher
165            .watch(&path, RecursiveMode::NonRecursive)
166            .map_err(|e| Mecha10Error::Configuration(format!("Failed to watch file {}: {}", path.display(), e)))?;
167
168        info!("Started watching config file: {}", path.display());
169
170        // Spawn reload task
171        tokio::spawn(async move {
172            let mut last_reload = tokio::time::Instant::now();
173
174            while let Some(_event) = rx.recv().await {
175                // Debounce: ignore events within 1 second of last reload
176                if last_reload.elapsed() < Duration::from_secs(1) {
177                    continue;
178                }
179
180                debug!("Config file changed, debouncing...");
181
182                // Wait a bit for file writes to complete
183                sleep(Duration::from_millis(100)).await;
184
185                // Attempt to reload
186                match Self::load_config(&path_clone) {
187                    Ok(new_config) => {
188                        info!("Successfully reloaded config from: {}", path_clone.display());
189
190                        // Update shared config
191                        {
192                            let mut config = config_clone.write().await;
193                            *config = new_config.clone();
194                        }
195
196                        // Invoke callback
197                        on_change(&new_config);
198
199                        last_reload = tokio::time::Instant::now();
200                    }
201                    Err(e) => {
202                        error!(
203                            "Failed to reload config from {}: {}. Keeping old config.",
204                            path_clone.display(),
205                            e
206                        );
207                        warn!("Fix the configuration errors to apply changes.");
208                    }
209                }
210            }
211        });
212
213        Ok(Self {
214            config,
215            path,
216            _watcher: watcher,
217        })
218    }
219
220    /// Get the current configuration
221    ///
222    /// Returns a clone of the current configuration.
223    ///
224    /// # Example
225    ///
226    /// ```rust
227    /// let config = watcher.get();
228    /// println!("Current rate: {}", config.rate);
229    /// ```
230    pub fn get(&self) -> T {
231        let config = self.config.blocking_read();
232        config.clone()
233    }
234
235    /// Get the current configuration asynchronously
236    ///
237    /// Returns a clone of the current configuration.
238    ///
239    /// # Example
240    ///
241    /// ```rust
242    /// let config = watcher.get_async().await;
243    /// println!("Current rate: {}", config.rate);
244    /// ```
245    pub async fn get_async(&self) -> T {
246        let config = self.config.read().await;
247        config.clone()
248    }
249
250    /// Manually reload the configuration
251    ///
252    /// Forces a reload of the configuration from disk.
253    ///
254    /// # Returns
255    ///
256    /// Ok(()) if reload succeeded, Err if loading or validation failed
257    ///
258    /// # Example
259    ///
260    /// ```rust
261    /// watcher.reload().await?;
262    /// ```
263    pub async fn reload(&self) -> Result<()> {
264        let new_config = Self::load_config(&self.path)?;
265
266        let mut config = self.config.write().await;
267        *config = new_config;
268
269        info!("Manually reloaded config from: {}", self.path.display());
270        Ok(())
271    }
272
273    /// Get the path to the configuration file
274    pub fn path(&self) -> &Path {
275        &self.path
276    }
277
278    // Helper: Load and parse configuration file
279    fn load_config(path: &Path) -> Result<T> {
280        use crate::config::load_config;
281
282        load_config(path)
283            .with_context(|| format!("Failed to load configuration from {}", path.display()))
284            .map_err(|e| Mecha10Error::Configuration(format!("{:#}", e)))
285    }
286}
287
288// ============================================================================
289// Validated Configuration Watcher
290// ============================================================================
291
292/// Configuration watcher with custom validation
293///
294/// Like `ConfigWatcher`, but also runs custom validation logic before applying changes.
295///
296/// # Example
297///
298/// ```rust
299/// use mecha10::config_watcher::ValidatedConfigWatcher;
300/// use mecha10::config::Validate;
301///
302/// #[derive(Debug, Clone, Deserialize)]
303/// struct AppConfig {
304///     port: u16,
305/// }
306///
307/// impl Validate for AppConfig {
308///     fn validate(&self) -> Result<()> {
309///         if self.port < 1024 {
310///             return Err(Mecha10Error::Configuration(
311///                 "Port must be >= 1024".to_string()
312///             ));
313///         }
314///         Ok(())
315///     }
316/// }
317///
318/// let watcher = ValidatedConfigWatcher::new("config.yaml", |config: &AppConfig| {
319///     println!("Valid config loaded!");
320/// })?;
321/// ```
322pub struct ValidatedConfigWatcher<T>
323where
324    T: DeserializeOwned + Clone + Send + Sync + crate::config::Validate + 'static,
325{
326    inner: ConfigWatcher<T>,
327}
328
329impl<T> ValidatedConfigWatcher<T>
330where
331    T: DeserializeOwned + Clone + Send + Sync + crate::config::Validate + 'static,
332{
333    /// Create a new validated configuration watcher
334    ///
335    /// # Arguments
336    ///
337    /// * `path` - Path to configuration file
338    /// * `on_change` - Callback invoked when configuration changes
339    ///
340    /// # Returns
341    ///
342    /// A new `ValidatedConfigWatcher` instance
343    ///
344    /// # Errors
345    ///
346    /// Returns error if initial configuration is invalid
347    pub fn new<P, F>(path: P, on_change: F) -> Result<Self>
348    where
349        P: AsRef<Path>,
350        F: Fn(&T) + Send + Sync + 'static,
351    {
352        // Wrap on_change to include validation
353        let inner = ConfigWatcher::new(path, move |config: &T| {
354            if let Err(e) = config.validate() {
355                error!("Configuration validation failed: {}", e);
356                warn!("Keeping previous configuration.");
357                return;
358            }
359            on_change(config);
360        })?;
361
362        Ok(Self { inner })
363    }
364
365    /// Get the current configuration
366    pub fn get(&self) -> T {
367        self.inner.get()
368    }
369
370    /// Get the current configuration asynchronously
371    pub async fn get_async(&self) -> T {
372        self.inner.get_async().await
373    }
374
375    /// Manually reload and validate the configuration
376    pub async fn reload(&self) -> Result<()> {
377        self.inner.reload().await?;
378        self.inner.get().validate()?;
379        Ok(())
380    }
381
382    /// Get the path to the configuration file
383    pub fn path(&self) -> &Path {
384        self.inner.path()
385    }
386}