Skip to main content

mabi_core/config/
watcher.rs

1//! Configuration file watcher for hot reload support.
2//!
3//! This module provides infrastructure for watching configuration files
4//! and notifying subscribers when changes occur. The actual file watching
5//! implementation is provided by external crates (e.g., notify) at the
6//! application level, while this module defines the interfaces and event types.
7//!
8//! # Architecture
9//!
10//! ```text
11//! ┌─────────────────┐     ┌──────────────────┐     ┌─────────────────┐
12//! │  Config File    │────▶│  ConfigWatcher   │────▶│  Subscribers    │
13//! │  (YAML/JSON)    │     │  (notify events) │     │  (engine, etc.) │
14//! └─────────────────┘     └──────────────────┘     └─────────────────┘
15//! ```
16//!
17//! # Example
18//!
19//! ```rust,ignore
20//! use mabi_core::config::{ConfigEvent, ConfigEventHandler, ConfigSource};
21//!
22//! struct MyHandler;
23//!
24//! impl ConfigEventHandler for MyHandler {
25//!     fn on_config_change(&self, event: ConfigEvent) {
26//!         match event {
27//!             ConfigEvent::Modified { source, .. } => {
28//!                 println!("Config modified: {:?}", source);
29//!             }
30//!             _ => {}
31//!         }
32//!     }
33//! }
34//! ```
35
36use std::path::PathBuf;
37use std::sync::Arc;
38use std::time::Instant;
39
40use parking_lot::RwLock;
41use tokio::sync::broadcast;
42
/// Identifies a configuration file together with the role it plays.
///
/// Every variant carries the path of the file it refers to. `Protocol` and
/// `Custom` additionally carry a label that becomes part of the string
/// returned by [`ConfigSource::name`].
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ConfigSource {
    /// Main simulator configuration file.
    Main(PathBuf),
    /// Device configuration file.
    Device(PathBuf),
    /// Scenario configuration file.
    Scenario(PathBuf),
    /// Protocol-specific configuration.
    Protocol { protocol: String, path: PathBuf },
    /// Custom configuration source.
    Custom { name: String, path: PathBuf },
}

impl ConfigSource {
    /// Borrow the file path stored in this source, whatever its variant.
    pub fn path(&self) -> &PathBuf {
        match self {
            Self::Main(file) | Self::Device(file) | Self::Scenario(file) => file,
            Self::Protocol { path, .. } | Self::Custom { path, .. } => path,
        }
    }

    /// Build a descriptive label for this source.
    ///
    /// The path-only variants yield the fixed labels `"main"`, `"device"` and
    /// `"scenario"`. `Protocol` yields `"protocol:<protocol>"` and `Custom`
    /// yields `"custom:<name>"`. The path is never part of the label.
    pub fn name(&self) -> String {
        let fixed_label = match self {
            // Labelled variants need formatting, so they return directly.
            Self::Protocol { protocol, .. } => return format!("protocol:{}", protocol),
            Self::Custom { name, .. } => return format!("custom:{}", name),
            Self::Main(_) => "main",
            Self::Device(_) => "device",
            Self::Scenario(_) => "scenario",
        };
        String::from(fixed_label)
    }
}
81
82/// Configuration change event.
83#[derive(Debug, Clone)]
84pub enum ConfigEvent {
85    /// Configuration file was created.
86    Created {
87        source: ConfigSource,
88        timestamp: Instant,
89    },
90    /// Configuration file was modified.
91    Modified {
92        source: ConfigSource,
93        timestamp: Instant,
94    },
95    /// Configuration file was deleted.
96    Deleted {
97        source: ConfigSource,
98        timestamp: Instant,
99    },
100    /// Configuration file was renamed.
101    Renamed {
102        source: ConfigSource,
103        old_path: PathBuf,
104        new_path: PathBuf,
105        timestamp: Instant,
106    },
107    /// Error occurred while watching.
108    Error {
109        source: Option<ConfigSource>,
110        message: String,
111        timestamp: Instant,
112    },
113    /// Configuration was reloaded successfully.
114    Reloaded {
115        source: ConfigSource,
116        timestamp: Instant,
117    },
118}
119
120impl ConfigEvent {
121    /// Get the timestamp of this event.
122    pub fn timestamp(&self) -> Instant {
123        match self {
124            Self::Created { timestamp, .. } => *timestamp,
125            Self::Modified { timestamp, .. } => *timestamp,
126            Self::Deleted { timestamp, .. } => *timestamp,
127            Self::Renamed { timestamp, .. } => *timestamp,
128            Self::Error { timestamp, .. } => *timestamp,
129            Self::Reloaded { timestamp, .. } => *timestamp,
130        }
131    }
132
133    /// Get the source of this event, if available.
134    pub fn source(&self) -> Option<&ConfigSource> {
135        match self {
136            Self::Created { source, .. } => Some(source),
137            Self::Modified { source, .. } => Some(source),
138            Self::Deleted { source, .. } => Some(source),
139            Self::Renamed { source, .. } => Some(source),
140            Self::Error { source, .. } => source.as_ref(),
141            Self::Reloaded { source, .. } => Some(source),
142        }
143    }
144
145    /// Check if this is an error event.
146    pub fn is_error(&self) -> bool {
147        matches!(self, Self::Error { .. })
148    }
149}
150
/// Trait for handling configuration events.
///
/// Implement this trait to receive notifications when configuration changes.
/// Implementors must be `Send + Sync`. Only `on_config_change` is required;
/// the two reload hooks have default implementations.
///
/// NOTE(review): nothing in the visible part of this module calls these
/// methods (`ConfigWatcher` distributes events through a broadcast channel),
/// so the code that drives a handler presumably lives elsewhere; confirm.
pub trait ConfigEventHandler: Send + Sync {
    /// Called when a configuration event occurs.
    ///
    /// The event is passed by value, so the handler owns it.
    fn on_config_change(&self, event: ConfigEvent);

    /// Called before configuration reload.
    /// Return false to cancel the reload.
    ///
    /// The default implementation always returns `true`.
    fn before_reload(&self, _source: &ConfigSource) -> bool {
        true
    }

    /// Called after successful configuration reload.
    ///
    /// The default implementation does nothing.
    fn after_reload(&self, _source: &ConfigSource) {}
}
167
/// Configuration watcher state.
///
/// `ConfigWatcher::emit` forwards events only while the state is `Running`;
/// in every other state it returns without broadcasting.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WatcherState {
    /// Watcher is stopped.
    Stopped,
    /// Watcher is running.
    Running,
    /// Watcher is paused. Events emitted in this state are discarded by
    /// `ConfigWatcher::emit`; they are not queued for later delivery.
    Paused,
}
178
/// Configuration watcher for hot reload support.
///
/// This struct manages configuration file watching and event distribution.
/// The actual file system watching should be implemented at the application level
/// using this struct to distribute events.
///
/// All mutable state sits behind `RwLock`s, so the methods that change it take
/// `&self` rather than `&mut self`.
pub struct ConfigWatcher {
    /// Current state. `emit` checks it before broadcasting anything.
    state: RwLock<WatcherState>,

    /// Registered sources, in registration order.
    sources: RwLock<Vec<ConfigSource>>,

    /// Event broadcaster. Receivers are handed out by `subscribe`.
    event_tx: broadcast::Sender<ConfigEvent>,

    /// Debounce duration in milliseconds.
    debounce_ms: u64,

    /// Last event time per source (for debouncing).
    /// The map key is a string derived from the source of the emitted event.
    last_events: RwLock<std::collections::HashMap<String, Instant>>,
}
200
201impl ConfigWatcher {
202    /// Create a new configuration watcher.
203    pub fn new() -> Self {
204        let (event_tx, _) = broadcast::channel(256);
205
206        Self {
207            state: RwLock::new(WatcherState::Stopped),
208            sources: RwLock::new(Vec::new()),
209            event_tx,
210            debounce_ms: 100,
211            last_events: RwLock::new(std::collections::HashMap::new()),
212        }
213    }
214
215    /// Create a new configuration watcher with custom debounce duration.
216    pub fn with_debounce(debounce_ms: u64) -> Self {
217        let mut watcher = Self::new();
218        watcher.debounce_ms = debounce_ms;
219        watcher
220    }
221
222    /// Get the current state.
223    pub fn state(&self) -> WatcherState {
224        *self.state.read()
225    }
226
227    /// Start the watcher.
228    pub fn start(&self) {
229        *self.state.write() = WatcherState::Running;
230    }
231
232    /// Stop the watcher.
233    pub fn stop(&self) {
234        *self.state.write() = WatcherState::Stopped;
235    }
236
237    /// Pause the watcher (events are still received but not processed).
238    pub fn pause(&self) {
239        *self.state.write() = WatcherState::Paused;
240    }
241
242    /// Resume the watcher.
243    pub fn resume(&self) {
244        *self.state.write() = WatcherState::Running;
245    }
246
247    /// Register a configuration source to watch.
248    pub fn register(&self, source: ConfigSource) {
249        let mut sources = self.sources.write();
250        if !sources.iter().any(|s| s.path() == source.path()) {
251            sources.push(source);
252        }
253    }
254
255    /// Unregister a configuration source.
256    pub fn unregister(&self, path: &PathBuf) {
257        self.sources.write().retain(|s| s.path() != path);
258    }
259
260    /// Get all registered sources.
261    pub fn sources(&self) -> Vec<ConfigSource> {
262        self.sources.read().clone()
263    }
264
265    /// Subscribe to configuration events.
266    pub fn subscribe(&self) -> broadcast::Receiver<ConfigEvent> {
267        self.event_tx.subscribe()
268    }
269
270    /// Emit a configuration event.
271    ///
272    /// This method should be called by the file system watcher implementation.
273    /// Events are debounced based on the configured duration.
274    pub fn emit(&self, event: ConfigEvent) {
275        // Check if watcher is running
276        if *self.state.read() != WatcherState::Running {
277            return;
278        }
279
280        // Debounce check
281        if let Some(source) = event.source() {
282            let source_key = source.name();
283            let now = Instant::now();
284
285            let mut last_events = self.last_events.write();
286            if let Some(last_time) = last_events.get(&source_key) {
287                if now.duration_since(*last_time).as_millis() < self.debounce_ms as u128 {
288                    return; // Skip debounced event
289                }
290            }
291            last_events.insert(source_key, now);
292        }
293
294        // Broadcast event
295        let _ = self.event_tx.send(event);
296    }
297
298    /// Emit a modification event for a source.
299    pub fn emit_modified(&self, source: ConfigSource) {
300        self.emit(ConfigEvent::Modified {
301            source,
302            timestamp: Instant::now(),
303        });
304    }
305
306    /// Emit a reload success event.
307    pub fn emit_reloaded(&self, source: ConfigSource) {
308        self.emit(ConfigEvent::Reloaded {
309            source,
310            timestamp: Instant::now(),
311        });
312    }
313
314    /// Emit an error event.
315    pub fn emit_error(&self, source: Option<ConfigSource>, message: impl Into<String>) {
316        self.emit(ConfigEvent::Error {
317            source,
318            message: message.into(),
319            timestamp: Instant::now(),
320        });
321    }
322}
323
324impl Default for ConfigWatcher {
325    fn default() -> Self {
326        Self::new()
327    }
328}
329
/// Arc-wrapped ConfigWatcher for shared access.
///
/// `ConfigWatcher`'s non-constructor methods all take `&self`, so a shared
/// handle is enough to register sources, change state, subscribe and emit.
pub type SharedConfigWatcher = Arc<ConfigWatcher>;
332
333/// Create a shared config watcher.
334pub fn create_config_watcher() -> SharedConfigWatcher {
335    Arc::new(ConfigWatcher::new())
336}
337
338/// Callback-based event handler adapter.
339pub struct CallbackHandler<F>
340where
341    F: Fn(ConfigEvent) + Send + Sync,
342{
343    callback: F,
344}
345
346impl<F> CallbackHandler<F>
347where
348    F: Fn(ConfigEvent) + Send + Sync,
349{
350    /// Create a new callback handler.
351    pub fn new(callback: F) -> Self {
352        Self { callback }
353    }
354}
355
356impl<F> ConfigEventHandler for CallbackHandler<F>
357where
358    F: Fn(ConfigEvent) + Send + Sync,
359{
360    fn on_config_change(&self, event: ConfigEvent) {
361        (self.callback)(event);
362    }
363}
364
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for the path literals used throughout these tests.
    fn path(literal: &str) -> PathBuf {
        PathBuf::from(literal)
    }

    /// `name` and `path` report what the variant was built with.
    #[test]
    fn test_config_source() {
        let main = ConfigSource::Main(path("/etc/config.yaml"));
        assert_eq!(main.name(), "main");
        assert_eq!(main.path(), &path("/etc/config.yaml"));

        let modbus = ConfigSource::Protocol {
            protocol: "modbus".to_string(),
            path: path("/etc/modbus.yaml"),
        };
        assert_eq!(modbus.name(), "protocol:modbus");
    }

    /// Walks Stopped -> Running -> Paused -> Running -> Stopped.
    #[test]
    fn test_config_watcher_lifecycle() {
        let w = ConfigWatcher::new();
        assert_eq!(w.state(), WatcherState::Stopped);

        w.start();
        assert_eq!(w.state(), WatcherState::Running);

        w.pause();
        assert_eq!(w.state(), WatcherState::Paused);

        w.resume();
        assert_eq!(w.state(), WatcherState::Running);

        w.stop();
        assert_eq!(w.state(), WatcherState::Stopped);
    }

    /// Registering two distinct paths and removing one leaves one source.
    #[test]
    fn test_config_watcher_sources() {
        let w = ConfigWatcher::new();

        w.register(ConfigSource::Main(path("/etc/config1.yaml")));
        w.register(ConfigSource::Device(path("/etc/config2.yaml")));
        assert_eq!(w.sources().len(), 2);

        w.unregister(&path("/etc/config1.yaml"));
        assert_eq!(w.sources().len(), 1);
    }

    /// A running watcher delivers an emitted event to a subscriber.
    #[tokio::test]
    async fn test_config_watcher_events() {
        let w = ConfigWatcher::new();
        w.start();

        // Subscribe before emitting so the event is not missed.
        let mut rx = w.subscribe();
        w.emit_modified(ConfigSource::Main(path("/etc/config.yaml")));

        let wait = std::time::Duration::from_millis(100);
        let outcome = tokio::time::timeout(wait, rx.recv()).await;
        let event = outcome.expect("Timeout").expect("Channel closed");

        assert!(matches!(event, ConfigEvent::Modified { .. }));
    }

    /// `is_error` and `source` behave for a normal and an error event.
    #[test]
    fn test_config_event_methods() {
        let modified = ConfigEvent::Modified {
            source: ConfigSource::Main(path("/etc/config.yaml")),
            timestamp: Instant::now(),
        };
        assert!(!modified.is_error());
        assert_eq!(modified.source().unwrap().name(), "main");

        let failure = ConfigEvent::Error {
            source: None,
            message: "Test error".to_string(),
            timestamp: Instant::now(),
        };
        assert!(failure.is_error());
    }
}