1use std::path::PathBuf;
37use std::sync::Arc;
38use std::time::Instant;
39
40use parking_lot::RwLock;
41use tokio::sync::broadcast;
42
/// A configuration file known to the watcher, tagged with the role it plays.
///
/// Every variant carries the path of the file it refers to; the two struct
/// variants additionally carry a label that ends up in [`ConfigSource::name`].
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ConfigSource {
    /// Source reported under the name `main`.
    Main(PathBuf),
    /// Source reported under the name `device`.
    Device(PathBuf),
    /// Source reported under the name `scenario`.
    Scenario(PathBuf),
    /// Source tied to a named protocol, reported as `protocol:<protocol>`.
    Protocol {
        /// Protocol label used when building the source name.
        protocol: String,
        /// Location of the configuration file.
        path: PathBuf,
    },
    /// Caller-defined source, reported as `custom:<name>`.
    Custom {
        /// Free-form label used when building the source name.
        name: String,
        /// Location of the configuration file.
        path: PathBuf,
    },
}
57
58impl ConfigSource {
59 pub fn path(&self) -> &PathBuf {
61 match self {
62 Self::Main(p) => p,
63 Self::Device(p) => p,
64 Self::Scenario(p) => p,
65 Self::Protocol { path, .. } => path,
66 Self::Custom { path, .. } => path,
67 }
68 }
69
70 pub fn name(&self) -> String {
72 match self {
73 Self::Main(_) => "main".to_string(),
74 Self::Device(_) => "device".to_string(),
75 Self::Scenario(_) => "scenario".to_string(),
76 Self::Protocol { protocol, .. } => format!("protocol:{}", protocol),
77 Self::Custom { name, .. } => format!("custom:{}", name),
78 }
79 }
80}
81
82#[derive(Debug, Clone)]
84pub enum ConfigEvent {
85 Created {
87 source: ConfigSource,
88 timestamp: Instant,
89 },
90 Modified {
92 source: ConfigSource,
93 timestamp: Instant,
94 },
95 Deleted {
97 source: ConfigSource,
98 timestamp: Instant,
99 },
100 Renamed {
102 source: ConfigSource,
103 old_path: PathBuf,
104 new_path: PathBuf,
105 timestamp: Instant,
106 },
107 Error {
109 source: Option<ConfigSource>,
110 message: String,
111 timestamp: Instant,
112 },
113 Reloaded {
115 source: ConfigSource,
116 timestamp: Instant,
117 },
118}
119
120impl ConfigEvent {
121 pub fn timestamp(&self) -> Instant {
123 match self {
124 Self::Created { timestamp, .. } => *timestamp,
125 Self::Modified { timestamp, .. } => *timestamp,
126 Self::Deleted { timestamp, .. } => *timestamp,
127 Self::Renamed { timestamp, .. } => *timestamp,
128 Self::Error { timestamp, .. } => *timestamp,
129 Self::Reloaded { timestamp, .. } => *timestamp,
130 }
131 }
132
133 pub fn source(&self) -> Option<&ConfigSource> {
135 match self {
136 Self::Created { source, .. } => Some(source),
137 Self::Modified { source, .. } => Some(source),
138 Self::Deleted { source, .. } => Some(source),
139 Self::Renamed { source, .. } => Some(source),
140 Self::Error { source, .. } => source.as_ref(),
141 Self::Reloaded { source, .. } => Some(source),
142 }
143 }
144
145 pub fn is_error(&self) -> bool {
147 matches!(self, Self::Error { .. })
148 }
149}
150
151pub trait ConfigEventHandler: Send + Sync {
155 fn on_config_change(&self, event: ConfigEvent);
157
158 fn before_reload(&self, _source: &ConfigSource) -> bool {
161 true
162 }
163
164 fn after_reload(&self, _source: &ConfigSource) {}
166}
167
/// Lifecycle state of a `ConfigWatcher`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WatcherState {
    /// Initial state of a new watcher; also the state set by `stop`.
    Stopped,
    /// State set by `start` and `resume`; the only state in which events are emitted.
    Running,
    /// State set by `pause`.
    Paused,
}
178
179pub struct ConfigWatcher {
185 state: RwLock<WatcherState>,
187
188 sources: RwLock<Vec<ConfigSource>>,
190
191 event_tx: broadcast::Sender<ConfigEvent>,
193
194 debounce_ms: u64,
196
197 last_events: RwLock<std::collections::HashMap<String, Instant>>,
199}
200
201impl ConfigWatcher {
202 pub fn new() -> Self {
204 let (event_tx, _) = broadcast::channel(256);
205
206 Self {
207 state: RwLock::new(WatcherState::Stopped),
208 sources: RwLock::new(Vec::new()),
209 event_tx,
210 debounce_ms: 100,
211 last_events: RwLock::new(std::collections::HashMap::new()),
212 }
213 }
214
215 pub fn with_debounce(debounce_ms: u64) -> Self {
217 let mut watcher = Self::new();
218 watcher.debounce_ms = debounce_ms;
219 watcher
220 }
221
222 pub fn state(&self) -> WatcherState {
224 *self.state.read()
225 }
226
227 pub fn start(&self) {
229 *self.state.write() = WatcherState::Running;
230 }
231
232 pub fn stop(&self) {
234 *self.state.write() = WatcherState::Stopped;
235 }
236
237 pub fn pause(&self) {
239 *self.state.write() = WatcherState::Paused;
240 }
241
242 pub fn resume(&self) {
244 *self.state.write() = WatcherState::Running;
245 }
246
247 pub fn register(&self, source: ConfigSource) {
249 let mut sources = self.sources.write();
250 if !sources.iter().any(|s| s.path() == source.path()) {
251 sources.push(source);
252 }
253 }
254
255 pub fn unregister(&self, path: &PathBuf) {
257 self.sources.write().retain(|s| s.path() != path);
258 }
259
260 pub fn sources(&self) -> Vec<ConfigSource> {
262 self.sources.read().clone()
263 }
264
265 pub fn subscribe(&self) -> broadcast::Receiver<ConfigEvent> {
267 self.event_tx.subscribe()
268 }
269
270 pub fn emit(&self, event: ConfigEvent) {
275 if *self.state.read() != WatcherState::Running {
277 return;
278 }
279
280 if let Some(source) = event.source() {
282 let source_key = source.name();
283 let now = Instant::now();
284
285 let mut last_events = self.last_events.write();
286 if let Some(last_time) = last_events.get(&source_key) {
287 if now.duration_since(*last_time).as_millis() < self.debounce_ms as u128 {
288 return; }
290 }
291 last_events.insert(source_key, now);
292 }
293
294 let _ = self.event_tx.send(event);
296 }
297
298 pub fn emit_modified(&self, source: ConfigSource) {
300 self.emit(ConfigEvent::Modified {
301 source,
302 timestamp: Instant::now(),
303 });
304 }
305
306 pub fn emit_reloaded(&self, source: ConfigSource) {
308 self.emit(ConfigEvent::Reloaded {
309 source,
310 timestamp: Instant::now(),
311 });
312 }
313
314 pub fn emit_error(&self, source: Option<ConfigSource>, message: impl Into<String>) {
316 self.emit(ConfigEvent::Error {
317 source,
318 message: message.into(),
319 timestamp: Instant::now(),
320 });
321 }
322}
323
324impl Default for ConfigWatcher {
325 fn default() -> Self {
326 Self::new()
327 }
328}
329
330pub type SharedConfigWatcher = Arc<ConfigWatcher>;
332
333pub fn create_config_watcher() -> SharedConfigWatcher {
335 Arc::new(ConfigWatcher::new())
336}
337
338pub struct CallbackHandler<F>
340where
341 F: Fn(ConfigEvent) + Send + Sync,
342{
343 callback: F,
344}
345
346impl<F> CallbackHandler<F>
347where
348 F: Fn(ConfigEvent) + Send + Sync,
349{
350 pub fn new(callback: F) -> Self {
352 Self { callback }
353 }
354}
355
356impl<F> ConfigEventHandler for CallbackHandler<F>
357where
358 F: Fn(ConfigEvent) + Send + Sync,
359{
360 fn on_config_change(&self, event: ConfigEvent) {
361 (self.callback)(event);
362 }
363}
364
#[cfg(test)]
mod tests {
    use super::*;

    /// Names and paths are derived from the variant and its payload.
    #[test]
    fn test_config_source() {
        let main_path = PathBuf::from("/etc/config.yaml");
        let main = ConfigSource::Main(main_path.clone());
        assert_eq!(main.name(), "main");
        assert_eq!(main.path(), &main_path);

        let modbus = ConfigSource::Protocol {
            protocol: "modbus".to_string(),
            path: PathBuf::from("/etc/modbus.yaml"),
        };
        assert_eq!(modbus.name(), "protocol:modbus");
    }

    /// Walks the watcher through start, pause, resume and stop.
    #[test]
    fn test_config_watcher_lifecycle() {
        let watcher = ConfigWatcher::new();
        // A fresh watcher has not been started yet.
        assert_eq!(WatcherState::Stopped, watcher.state());

        watcher.start();
        assert_eq!(WatcherState::Running, watcher.state());

        watcher.pause();
        assert_eq!(WatcherState::Paused, watcher.state());

        watcher.resume();
        assert_eq!(WatcherState::Running, watcher.state());

        watcher.stop();
        assert_eq!(WatcherState::Stopped, watcher.state());
    }

    /// Sources can be registered and later removed by path.
    #[test]
    fn test_config_watcher_sources() {
        let watcher = ConfigWatcher::new();
        let first_path = PathBuf::from("/etc/config1.yaml");

        watcher.register(ConfigSource::Main(first_path.clone()));
        watcher.register(ConfigSource::Device(PathBuf::from("/etc/config2.yaml")));
        assert_eq!(watcher.sources().len(), 2);

        watcher.unregister(&first_path);
        assert_eq!(watcher.sources().len(), 1);
    }

    /// A running watcher delivers an emitted event to a subscriber.
    #[tokio::test]
    async fn test_config_watcher_events() {
        let watcher = ConfigWatcher::new();
        watcher.start();
        let mut receiver = watcher.subscribe();

        watcher.emit_modified(ConfigSource::Main(PathBuf::from("/etc/config.yaml")));

        let wait_limit = std::time::Duration::from_millis(100);
        let received = tokio::time::timeout(wait_limit, receiver.recv())
            .await
            .expect("Timeout")
            .expect("Channel closed");

        assert!(matches!(received, ConfigEvent::Modified { .. }));
    }

    /// `is_error` and `source` report what the event was built with.
    #[test]
    fn test_config_event_methods() {
        let modified = ConfigEvent::Modified {
            source: ConfigSource::Main(PathBuf::from("/etc/config.yaml")),
            timestamp: Instant::now(),
        };
        assert!(!modified.is_error());
        let reported = modified.source().unwrap();
        assert_eq!(reported.name(), "main");

        let failure = ConfigEvent::Error {
            source: None,
            message: "Test error".to_string(),
            timestamp: Instant::now(),
        };
        assert!(failure.is_error());
    }
}