1use std::collections::HashMap;
39use std::path::{Path, PathBuf};
40use std::sync::Arc;
41use std::time::{Duration, Instant};
42
43use notify::{
44 Config as NotifyConfig, Event as NotifyEvent, EventKind, RecommendedWatcher, RecursiveMode,
45 Watcher,
46};
47use parking_lot::{Mutex, RwLock};
48use tokio::sync::{mpsc, oneshot};
49
50use super::watcher::{ConfigEvent, ConfigSource, ConfigWatcher, SharedConfigWatcher};
51use crate::error::Error;
52use crate::Result;
53
/// Debounce window, in milliseconds, applied when no explicit value is configured.
///
/// `FileWatcherConfig::default()` uses this value for its `debounce_ms` field.
pub const DEFAULT_DEBOUNCE_MS: u64 = 100;
56
/// Tunable settings for the file watcher service.
#[derive(Debug, Clone)]
pub struct FileWatcherConfig {
    /// Minimum gap, in milliseconds, between two events reported for the same path.
    pub debounce_ms: u64,
    /// Whether watched paths are observed recursively.
    pub recursive: bool,
    /// File extensions (without the dot) that may be watched.
    ///
    /// An empty list disables the filter, so every path is accepted.
    pub extensions: Vec<String>,
}
67
68impl Default for FileWatcherConfig {
69 fn default() -> Self {
70 Self {
71 debounce_ms: DEFAULT_DEBOUNCE_MS,
72 recursive: false,
73 extensions: vec![
74 "yaml".to_string(),
75 "yml".to_string(),
76 "json".to_string(),
77 "toml".to_string(),
78 ],
79 }
80 }
81}
82
83impl FileWatcherConfig {
84 pub fn all_files() -> Self {
86 Self {
87 extensions: Vec::new(),
88 ..Default::default()
89 }
90 }
91
92 pub fn with_debounce(mut self, ms: u64) -> Self {
94 self.debounce_ms = ms;
95 self
96 }
97
98 pub fn with_recursive(mut self, recursive: bool) -> Self {
100 self.recursive = recursive;
101 self
102 }
103
104 pub fn with_extensions(mut self, exts: Vec<String>) -> Self {
106 self.extensions = exts;
107 self
108 }
109
110 pub fn should_watch(&self, path: &Path) -> bool {
112 if self.extensions.is_empty() {
113 return true;
114 }
115
116 path.extension()
117 .and_then(|e| e.to_str())
118 .map(|ext| self.extensions.iter().any(|e| e.eq_ignore_ascii_case(ext)))
119 .unwrap_or(false)
120 }
121}
122
123#[derive(Clone)]
125struct WatchedPath {
126 source: ConfigSource,
127 #[allow(dead_code)]
128 last_event: Option<Instant>,
129}
130
131enum ServiceState {
133 Stopped,
134 Running {
135 watcher: RecommendedWatcher,
136 shutdown_tx: oneshot::Sender<()>,
137 },
138}
139
140pub struct FileWatcherService {
144 config_watcher: SharedConfigWatcher,
146 config: FileWatcherConfig,
148 watched_paths: RwLock<HashMap<PathBuf, WatchedPath>>,
150 state: Mutex<Option<ServiceState>>,
152}
153
154impl FileWatcherService {
155 pub fn new(config_watcher: SharedConfigWatcher) -> Self {
157 Self::with_config(config_watcher, FileWatcherConfig::default())
158 }
159
160 pub fn with_config(config_watcher: SharedConfigWatcher, config: FileWatcherConfig) -> Self {
162 Self {
163 config_watcher,
164 config,
165 watched_paths: RwLock::new(HashMap::new()),
166 state: Mutex::new(Some(ServiceState::Stopped)),
167 }
168 }
169
170 pub fn config_watcher(&self) -> &SharedConfigWatcher {
172 &self.config_watcher
173 }
174
175 pub fn config(&self) -> &FileWatcherConfig {
177 &self.config
178 }
179
180 pub fn is_running(&self) -> bool {
182 matches!(
183 self.state.lock().as_ref(),
184 Some(ServiceState::Running { .. })
185 )
186 }
187
188 pub fn watch(&self, source: ConfigSource) -> Result<()> {
190 let path = source.path().clone();
191
192 if !path.exists() {
194 tracing::warn!(path = %path.display(), "Watching non-existent path");
195 }
196
197 if !self.config.should_watch(&path) {
199 return Err(Error::Config(format!(
200 "Path extension not in allowed list: {}",
201 path.display()
202 )));
203 }
204
205 self.config_watcher.register(source.clone());
207
208 let mut paths = self.watched_paths.write();
210 paths.insert(
211 path.clone(),
212 WatchedPath {
213 source,
214 last_event: None,
215 },
216 );
217
218 if let Some(ServiceState::Running { watcher, .. }) = self.state.lock().as_mut() {
220 let mode = if self.config.recursive {
221 RecursiveMode::Recursive
222 } else {
223 RecursiveMode::NonRecursive
224 };
225
226 watcher
227 .watch(&path, mode)
228 .map_err(|e| Error::Internal(format!("Failed to watch path: {}", e)))?;
229 }
230
231 tracing::debug!(path = %path.display(), "Added path to watch list");
232 Ok(())
233 }
234
235 pub fn unwatch(&self, path: &Path) -> Result<()> {
237 self.config_watcher.unregister(&path.to_path_buf());
239
240 self.watched_paths.write().remove(path);
242
243 if let Some(ServiceState::Running { watcher, .. }) = self.state.lock().as_mut() {
245 let _ = watcher.unwatch(path);
246 }
247
248 tracing::debug!(path = %path.display(), "Removed path from watch list");
249 Ok(())
250 }
251
252 pub fn watched_paths(&self) -> Vec<PathBuf> {
254 self.watched_paths.read().keys().cloned().collect()
255 }
256
257 pub fn start(&self) -> Result<()> {
259 let mut state = self.state.lock();
260
261 if matches!(state.as_ref(), Some(ServiceState::Running { .. })) {
263 return Err(Error::Engine("File watcher already running".to_string()));
264 }
265
266 let (event_tx, mut event_rx) = mpsc::channel::<NotifyEvent>(256);
268
269 let watcher_tx = event_tx.clone();
271 let notify_config = NotifyConfig::default()
272 .with_poll_interval(Duration::from_millis(self.config.debounce_ms));
273
274 let mut watcher = RecommendedWatcher::new(
275 move |res: std::result::Result<NotifyEvent, notify::Error>| {
276 if let Ok(event) = res {
277 let _ = watcher_tx.blocking_send(event);
278 }
279 },
280 notify_config,
281 )
282 .map_err(|e| Error::Internal(format!("Failed to create file watcher: {}", e)))?;
283
284 let mode = if self.config.recursive {
286 RecursiveMode::Recursive
287 } else {
288 RecursiveMode::NonRecursive
289 };
290
291 for path in self.watched_paths.read().keys() {
292 if let Err(e) = watcher.watch(path, mode) {
293 tracing::warn!(path = %path.display(), error = %e, "Failed to watch path");
294 }
295 }
296
297 let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
299
300 self.config_watcher.start();
302
303 let config_watcher = self.config_watcher.clone();
305 let watched_paths = Arc::new(self.watched_paths.read().clone());
306 let debounce_ms = self.config.debounce_ms;
307
308 tokio::spawn(async move {
309 let mut last_events: HashMap<PathBuf, Instant> = HashMap::new();
310
311 loop {
312 tokio::select! {
313 _ = &mut shutdown_rx => {
314 tracing::debug!("File watcher shutdown received");
315 break;
316 }
317 Some(event) = event_rx.recv() => {
318 Self::process_notify_event(
319 &config_watcher,
320 &watched_paths,
321 event,
322 &mut last_events,
323 debounce_ms,
324 );
325 }
326 }
327 }
328 });
329
330 *state = Some(ServiceState::Running {
331 watcher,
332 shutdown_tx,
333 });
334
335 tracing::info!("File watcher service started");
336 Ok(())
337 }
338
339 pub fn stop(&self) -> Result<()> {
341 let mut state = self.state.lock();
342
343 match state.take() {
344 Some(ServiceState::Running { shutdown_tx, .. }) => {
345 let _ = shutdown_tx.send(());
347
348 self.config_watcher.stop();
350
351 *state = Some(ServiceState::Stopped);
352 tracing::info!("File watcher service stopped");
353 Ok(())
354 }
355 Some(ServiceState::Stopped) => {
356 *state = Some(ServiceState::Stopped);
357 Err(Error::Engine("File watcher not running".to_string()))
358 }
359 None => Err(Error::Engine("File watcher in invalid state".to_string())),
360 }
361 }
362
363 fn process_notify_event(
365 config_watcher: &ConfigWatcher,
366 watched_paths: &HashMap<PathBuf, WatchedPath>,
367 event: NotifyEvent,
368 last_events: &mut HashMap<PathBuf, Instant>,
369 debounce_ms: u64,
370 ) {
371 let relevant = matches!(
373 event.kind,
374 EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_)
375 );
376
377 if !relevant {
378 return;
379 }
380
381 let now = Instant::now();
382 let debounce = Duration::from_millis(debounce_ms);
383
384 for path in event.paths {
385 if let Some(last) = last_events.get(&path) {
387 if now.duration_since(*last) < debounce {
388 continue;
389 }
390 }
391 last_events.insert(path.clone(), now);
392
393 let source = if let Some(watched) = watched_paths.get(&path) {
395 watched.source.clone()
396 } else {
397 watched_paths
399 .iter()
400 .find(|(watched_path, _)| {
401 watched_path.is_dir() && path.starts_with(watched_path)
402 })
403 .map(|_| ConfigSource::Custom {
404 name: "file".to_string(),
405 path: path.clone(),
406 })
407 .unwrap_or_else(|| ConfigSource::Custom {
408 name: "unknown".to_string(),
409 path: path.clone(),
410 })
411 };
412
413 let config_event = match event.kind {
415 EventKind::Create(_) => ConfigEvent::Created {
416 source,
417 timestamp: Instant::now(),
418 },
419 EventKind::Modify(_) => ConfigEvent::Modified {
420 source,
421 timestamp: Instant::now(),
422 },
423 EventKind::Remove(_) => ConfigEvent::Deleted {
424 source,
425 timestamp: Instant::now(),
426 },
427 _ => continue,
428 };
429
430 config_watcher.emit(config_event);
431 }
432 }
433}
434
435impl Drop for FileWatcherService {
436 fn drop(&mut self) {
437 let _ = self.stop();
438 }
439}
440
441pub struct FileWatcherServiceBuilder {
443 config_watcher: Option<SharedConfigWatcher>,
444 config: FileWatcherConfig,
445 initial_paths: Vec<ConfigSource>,
446}
447
448impl FileWatcherServiceBuilder {
449 pub fn new() -> Self {
451 Self {
452 config_watcher: None,
453 config: FileWatcherConfig::default(),
454 initial_paths: Vec::new(),
455 }
456 }
457
458 pub fn config_watcher(mut self, watcher: SharedConfigWatcher) -> Self {
460 self.config_watcher = Some(watcher);
461 self
462 }
463
464 pub fn debounce_ms(mut self, ms: u64) -> Self {
466 self.config.debounce_ms = ms;
467 self
468 }
469
470 pub fn recursive(mut self, recursive: bool) -> Self {
472 self.config.recursive = recursive;
473 self
474 }
475
476 pub fn extensions(mut self, exts: Vec<String>) -> Self {
478 self.config.extensions = exts;
479 self
480 }
481
482 pub fn watch(mut self, source: ConfigSource) -> Self {
484 self.initial_paths.push(source);
485 self
486 }
487
488 pub fn build(self) -> Result<FileWatcherService> {
490 let config_watcher = self
491 .config_watcher
492 .unwrap_or_else(|| Arc::new(ConfigWatcher::new()));
493
494 let service = FileWatcherService::with_config(config_watcher, self.config);
495
496 for source in self.initial_paths {
497 service.watch(source)?;
498 }
499
500 Ok(service)
501 }
502}
503
504impl Default for FileWatcherServiceBuilder {
505 fn default() -> Self {
506 Self::new()
507 }
508}
509
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use tempfile::tempdir;

    /// Builds a service with default settings around a fresh `ConfigWatcher`.
    fn default_service() -> FileWatcherService {
        FileWatcherService::new(Arc::new(ConfigWatcher::new()))
    }

    #[test]
    fn test_file_watcher_config_default() {
        let defaults = FileWatcherConfig::default();

        assert_eq!(defaults.debounce_ms, DEFAULT_DEBOUNCE_MS);
        assert!(!defaults.recursive);
        assert!(defaults.extensions.contains(&"yaml".to_string()));
    }

    #[test]
    fn test_file_watcher_config_should_watch() {
        let filtered = FileWatcherConfig::default();

        for accepted in ["config.yaml", "config.yml", "config.json", "config.toml"].iter() {
            assert!(filtered.should_watch(Path::new(accepted)));
        }
        assert!(!filtered.should_watch(Path::new("config.txt")));

        let unfiltered = FileWatcherConfig::all_files();
        assert!(unfiltered.should_watch(Path::new("config.txt")));
    }

    #[test]
    fn test_file_watcher_service_creation() {
        let service = default_service();

        assert!(!service.is_running());
        assert!(service.watched_paths().is_empty());
    }

    #[test]
    fn test_file_watcher_service_watch() {
        let service = default_service();

        service
            .watch(ConfigSource::Main(PathBuf::from("test.yaml")))
            .unwrap();

        let registered = service.watched_paths();
        assert!(registered.contains(&PathBuf::from("test.yaml")));
    }

    #[test]
    fn test_file_watcher_service_unwatch() {
        let service = default_service();
        let target = PathBuf::from("test.yaml");

        service.watch(ConfigSource::Main(target.clone())).unwrap();
        service.unwatch(&target).unwrap();

        assert!(!service.watched_paths().contains(&target));
    }

    #[test]
    fn test_file_watcher_service_builder() {
        let builder = FileWatcherServiceBuilder::new()
            .debounce_ms(200)
            .recursive(true)
            .watch(ConfigSource::Main(PathBuf::from("config.yaml")));
        let service = builder.build().unwrap();

        let settings = service.config();
        assert_eq!(settings.debounce_ms, 200);
        assert!(settings.recursive);
        assert!(service
            .watched_paths()
            .contains(&PathBuf::from("config.yaml")));
    }

    #[tokio::test]
    async fn test_file_watcher_start_stop() {
        let service = default_service();

        service.start().unwrap();
        assert!(service.is_running());

        service.stop().unwrap();
        assert!(!service.is_running());
    }

    #[tokio::test]
    async fn test_file_watcher_double_start() {
        let service = default_service();

        service.start().unwrap();
        let second_attempt = service.start();

        assert!(second_attempt.is_err());
        service.stop().unwrap();
    }

    #[tokio::test]
    async fn test_file_watcher_events() {
        let workspace = tempdir().unwrap();
        let target = workspace.path().join("config.yaml");
        fs::write(&target, "initial: content\n").unwrap();

        let hub = Arc::new(ConfigWatcher::new());
        let mut subscription = hub.subscribe();

        let service = FileWatcherService::new(hub.clone());
        service.watch(ConfigSource::Main(target.clone())).unwrap();
        service.start().unwrap();

        // Give the OS watcher a moment to settle before touching the file.
        tokio::time::sleep(Duration::from_millis(50)).await;

        fs::write(&target, "modified: content\n").unwrap();

        // Wait for an event, but never longer than two seconds.
        let outcome = tokio::time::timeout(Duration::from_secs(2), subscription.recv()).await;

        service.stop().unwrap();

        // NOTE(review): this condition is always true, so the test only
        // verifies that the sequence above runs without panicking; it does not
        // check that an event was delivered.
        assert!(outcome.is_ok() || outcome.is_err());
    }
}