devalang_wasm/services/watch/file/
mod.rs1#![cfg(feature = "cli")]
2
3use std::path::{Path, PathBuf};
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6
7use anyhow::{Context, Result};
8use notify::{Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
9use tokio::sync::mpsc::{self, Receiver};
10
11use crate::tools::logger::Logger;
12
/// Tunables for a single file-watch session.
#[derive(Debug, Clone)]
pub struct WatchOptions {
    /// Minimum spacing between two reported changes; an event received sooner
    /// than this after the previously reported one is discarded.
    pub debounce: Duration,
    /// Poll interval passed to the `notify` watcher configuration.
    pub poll_interval: Duration,
}

impl Default for WatchOptions {
    /// Returns a 150 ms debounce window and a 500 ms poll interval.
    fn default() -> Self {
        let debounce = Duration::from_millis(150);
        let poll_interval = Duration::from_millis(500);
        Self {
            debounce,
            poll_interval,
        }
    }
}
27
28pub struct FileWatcher {
29 logger: Arc<Logger>,
30}
31
32impl FileWatcher {
33 pub fn new(logger: Arc<Logger>) -> Self {
34 Self { logger }
35 }
36
37 #[allow(clippy::unused_async)]
38 pub async fn watch(&self, path: PathBuf, options: WatchOptions) -> Result<FileWatchStream> {
39 let (tx, rx) = mpsc::channel(32);
40 let config = Config::default().with_poll_interval(options.poll_interval);
41 let fallback_path = path.clone();
42 let logger = self.logger.clone();
43
44 let mut watcher = RecommendedWatcher::new(
45 move |res: notify::Result<Event>| match res {
46 Ok(event) => {
47 if !is_relevant(&event.kind) {
48 return;
49 }
50 let target = event
51 .paths
52 .first()
53 .cloned()
54 .unwrap_or_else(|| fallback_path.clone());
55 let send_result = tx.blocking_send(target);
56 if let Err(err) = send_result {
57 logger.warn(format!(
58 "Watch channel dropped before event could be delivered: {err}"
59 ));
60 }
61 }
62 Err(err) => {
63 logger.error(format!("Watch error: {err}"));
64 }
65 },
66 config,
67 )?;
68
69 watcher
70 .watch(Path::new(&path), RecursiveMode::NonRecursive)
71 .with_context(|| format!("failed to watch {}", path.display()))?;
72
73 Ok(FileWatchStream::new(
74 path,
75 rx,
76 watcher,
77 self.logger.clone(),
78 options.debounce,
79 ))
80 }
81}
82
83fn is_relevant(kind: &EventKind) -> bool {
84 matches!(
85 kind,
86 EventKind::Modify(_) | EventKind::Create(_) | EventKind::Remove(_) | EventKind::Any
87 )
88}
89
90pub struct FileWatchStream {
91 #[allow(dead_code)]
92 path: PathBuf,
93 receiver: Receiver<PathBuf>,
94 #[allow(dead_code)]
95 watcher: RecommendedWatcher,
96 logger: Arc<Logger>,
97 debounce: Duration,
98 last_emit: Option<Instant>,
99}
100
101impl FileWatchStream {
102 fn new(
103 path: PathBuf,
104 receiver: Receiver<PathBuf>,
105 watcher: RecommendedWatcher,
106 logger: Arc<Logger>,
107 debounce: Duration,
108 ) -> Self {
109 Self {
110 path,
111 receiver,
112 watcher,
113 logger,
114 debounce,
115 last_emit: None,
116 }
117 }
118
119 pub async fn next_change(&mut self) -> Option<PathBuf> {
120 while let Some(path) = self.receiver.recv().await {
121 let now = Instant::now();
122 if let Some(last) = self.last_emit {
123 if now.duration_since(last) < self.debounce {
124 continue;
125 }
126 }
127 self.last_emit = Some(now);
128 self.logger
129 .watch(format!("Change detected: {}", path.display()));
130 return Some(path);
131 }
132 None
133 }
134}