devalang_wasm/services/watch/file/
mod.rs

1#![cfg(feature = "cli")]
2
3use std::path::{Path, PathBuf};
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6
7use anyhow::{Context, Result};
8use notify::{Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
9use tokio::sync::mpsc::{self, Receiver};
10
11use crate::tools::logger::Logger;
12
13#[derive(Debug, Clone)]
14pub struct WatchOptions {
15    pub debounce: Duration,
16    pub poll_interval: Duration,
17}
18
19impl Default for WatchOptions {
20    fn default() -> Self {
21        Self {
22            debounce: Duration::from_millis(150),
23            poll_interval: Duration::from_millis(500),
24        }
25    }
26}
27
28pub struct FileWatcher {
29    logger: Arc<Logger>,
30}
31
32impl FileWatcher {
33    pub fn new(logger: Arc<Logger>) -> Self {
34        Self { logger }
35    }
36
37    #[allow(clippy::unused_async)]
38    pub async fn watch(&self, path: PathBuf, options: WatchOptions) -> Result<FileWatchStream> {
39        let (tx, rx) = mpsc::channel(32);
40        let config = Config::default().with_poll_interval(options.poll_interval);
41        let fallback_path = path.clone();
42        let logger = self.logger.clone();
43
44        let mut watcher = RecommendedWatcher::new(
45            move |res: notify::Result<Event>| match res {
46                Ok(event) => {
47                    if !is_relevant(&event.kind) {
48                        return;
49                    }
50                    let target = event
51                        .paths
52                        .first()
53                        .cloned()
54                        .unwrap_or_else(|| fallback_path.clone());
55                    let send_result = tx.blocking_send(target);
56                    if let Err(err) = send_result {
57                        logger.warn(format!(
58                            "Watch channel dropped before event could be delivered: {err}"
59                        ));
60                    }
61                }
62                Err(err) => {
63                    logger.error(format!("Watch error: {err}"));
64                }
65            },
66            config,
67        )?;
68
69        watcher
70            .watch(Path::new(&path), RecursiveMode::NonRecursive)
71            .with_context(|| format!("failed to watch {}", path.display()))?;
72
73        Ok(FileWatchStream::new(
74            path,
75            rx,
76            watcher,
77            self.logger.clone(),
78            options.debounce,
79        ))
80    }
81}
82
83fn is_relevant(kind: &EventKind) -> bool {
84    matches!(
85        kind,
86        EventKind::Modify(_) | EventKind::Create(_) | EventKind::Remove(_) | EventKind::Any
87    )
88}
89
90pub struct FileWatchStream {
91    #[allow(dead_code)]
92    path: PathBuf,
93    receiver: Receiver<PathBuf>,
94    #[allow(dead_code)]
95    watcher: RecommendedWatcher,
96    logger: Arc<Logger>,
97    debounce: Duration,
98    last_emit: Option<Instant>,
99}
100
101impl FileWatchStream {
102    fn new(
103        path: PathBuf,
104        receiver: Receiver<PathBuf>,
105        watcher: RecommendedWatcher,
106        logger: Arc<Logger>,
107        debounce: Duration,
108    ) -> Self {
109        Self {
110            path,
111            receiver,
112            watcher,
113            logger,
114            debounce,
115            last_emit: None,
116        }
117    }
118
119    pub async fn next_change(&mut self) -> Option<PathBuf> {
120        while let Some(path) = self.receiver.recv().await {
121            let now = Instant::now();
122            if let Some(last) = self.last_emit {
123                if now.duration_since(last) < self.debounce {
124                    continue;
125                }
126            }
127            self.last_emit = Some(now);
128            self.logger
129                .watch(format!("Change detected: {}", path.display()));
130            return Some(path);
131        }
132        None
133    }
134}