Skip to main content

harn_hostlib/fs_watch/
mod.rs

//! File-system watch host capability.
//!
//! Wraps `notify` to deliver coalesced file-change batches into Harn's
//! session-scoped `AgentEvent` stream.
5
6use std::collections::{BTreeMap, BTreeSet, HashMap};
7use std::path::{Path, PathBuf};
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::sync::{mpsc, Mutex, OnceLock};
10use std::thread;
11use std::time::{Duration, SystemTime, UNIX_EPOCH};
12
13use globset::{Glob, GlobSet, GlobSetBuilder};
14use harn_vm::agent_events::{AgentEvent, FsWatchEvent};
15use harn_vm::VmValue;
16use ignore::gitignore::{Gitignore, GitignoreBuilder};
17use notify::event::{ModifyKind, RenameMode};
18use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
19
20use crate::error::HostlibError;
21use crate::registry::{BuiltinRegistry, HostlibCapability, RegisteredBuiltin, SyncHandler};
22use crate::tools::args::{
23    build_dict, dict_arg, optional_bool, optional_int, optional_string, str_value,
24};
25use crate::value_args::optional_string_list;
26
/// Registry name of the subscribe builtin; also used to tag its errors.
const SUBSCRIBE_BUILTIN: &str = "hostlib_fs_watch_subscribe";
/// Registry name of the unsubscribe builtin; also used to tag its errors.
const UNSUBSCRIBE_BUILTIN: &str = "hostlib_fs_watch_unsubscribe";
/// Debounce window, in milliseconds, applied when the request omits `debounce_ms`.
const DEFAULT_DEBOUNCE_MS: u64 = 50;
/// Event kinds delivered when the request omits `kinds`.
const DEFAULT_KINDS: &[&str] = &["create", "modify", "remove", "rename"];
/// Every event kind a request is allowed to ask for.
const SUPPORTED_KINDS: &[&str] = &["access", "create", "modify", "other", "remove", "rename"];

/// Process-wide sequence number mixed into generated subscription ids.
static NEXT_SUBSCRIPTION_ID: AtomicU64 = AtomicU64::new(1);
34
/// File-watch capability handle.
///
/// A unit struct: it carries no state of its own and exists to implement
/// `HostlibCapability` for the `fs_watch` module.
#[derive(Default)]
pub struct FsWatchCapability;
38
39impl HostlibCapability for FsWatchCapability {
40    fn module_name(&self) -> &'static str {
41        "fs_watch"
42    }
43
44    fn register_builtins(&self, registry: &mut BuiltinRegistry) {
45        registry.register(RegisteredBuiltin {
46            name: SUBSCRIBE_BUILTIN,
47            module: "fs_watch",
48            method: "subscribe",
49            handler: subscribe_handler(),
50        });
51        registry.register(RegisteredBuiltin {
52            name: UNSUBSCRIBE_BUILTIN,
53            module: "fs_watch",
54            method: "unsubscribe",
55            handler: unsubscribe_handler(),
56        });
57    }
58}
59
60fn subscribe_handler() -> SyncHandler {
61    std::sync::Arc::new(subscribe)
62}
63
64fn unsubscribe_handler() -> SyncHandler {
65    std::sync::Arc::new(unsubscribe)
66}
67
68struct Subscription {
69    _watcher: RecommendedWatcher,
70    stop_tx: mpsc::Sender<WatchMessage>,
71    worker: Option<thread::JoinHandle<()>>,
72}
73
74impl Drop for Subscription {
75    fn drop(&mut self) {
76        let _ = self.stop_tx.send(WatchMessage::Stop);
77        if let Some(worker) = self.worker.take() {
78            let _ = worker.join();
79        }
80    }
81}
82
83enum WatchMessage {
84    Event(Event),
85    Error(String),
86    Stop,
87}
88
89#[derive(Clone)]
90struct WatchFilter {
91    session_id: String,
92    subscription_id: String,
93    root: PathBuf,
94    globs: Option<GlobSet>,
95    gitignore: Option<Gitignore>,
96    kinds: BTreeSet<String>,
97}
98
99fn subscriptions() -> &'static Mutex<HashMap<String, Subscription>> {
100    static SUBSCRIPTIONS: OnceLock<Mutex<HashMap<String, Subscription>>> = OnceLock::new();
101    SUBSCRIPTIONS.get_or_init(|| Mutex::new(HashMap::new()))
102}
103
104fn subscribe(args: &[VmValue]) -> Result<VmValue, HostlibError> {
105    let raw = dict_arg(SUBSCRIBE_BUILTIN, args)?;
106    let dict = raw.as_ref();
107    let request = SubscribeRequest::from_dict(dict)?;
108    let subscription_id = next_subscription_id();
109    let (tx, rx) = mpsc::channel();
110
111    let filter = WatchFilter {
112        session_id: request.session_id.clone(),
113        subscription_id: subscription_id.clone(),
114        root: request.root.clone(),
115        globs: request.globs,
116        gitignore: request.gitignore,
117        kinds: request.kinds,
118    };
119    let debounce = Duration::from_millis(request.debounce_ms);
120    let worker = thread::Builder::new()
121        .name(format!("harn-fs-watch-{subscription_id}"))
122        .spawn(move || watch_worker(rx, debounce, filter))
123        .map_err(|err| HostlibError::Backend {
124            builtin: SUBSCRIBE_BUILTIN,
125            message: format!("failed to spawn watch worker: {err}"),
126        })?;
127
128    let notify_tx = tx.clone();
129    let mut watcher = notify::recommended_watcher(move |result: notify::Result<Event>| {
130        let message = match result {
131            Ok(event) => WatchMessage::Event(event),
132            Err(err) => WatchMessage::Error(err.to_string()),
133        };
134        let _ = notify_tx.send(message);
135    })
136    .map_err(|err| HostlibError::Backend {
137        builtin: SUBSCRIBE_BUILTIN,
138        message: format!("failed to create watcher: {err}"),
139    })?;
140
141    let mode = if request.recursive {
142        RecursiveMode::Recursive
143    } else {
144        RecursiveMode::NonRecursive
145    };
146    for path in &request.watch_paths {
147        watcher
148            .watch(path, mode)
149            .map_err(|err| HostlibError::Backend {
150                builtin: SUBSCRIBE_BUILTIN,
151                message: format!("failed to watch {}: {err}", path.display()),
152            })?;
153    }
154
155    subscriptions()
156        .lock()
157        .expect("fs_watch mutex poisoned")
158        .insert(
159            subscription_id.clone(),
160            Subscription {
161                _watcher: watcher,
162                stop_tx: tx,
163                worker: Some(worker),
164            },
165        );
166
167    Ok(build_dict([(
168        "subscription_id",
169        str_value(subscription_id.as_str()),
170    )]))
171}
172
173fn unsubscribe(args: &[VmValue]) -> Result<VmValue, HostlibError> {
174    let raw = dict_arg(UNSUBSCRIBE_BUILTIN, args)?;
175    let dict = raw.as_ref();
176    let subscription_id = match dict.get("subscription_id") {
177        Some(VmValue::String(value)) if !value.trim().is_empty() => value.to_string(),
178        Some(other) => {
179            return Err(HostlibError::InvalidParameter {
180                builtin: UNSUBSCRIBE_BUILTIN,
181                param: "subscription_id",
182                message: format!("expected non-empty string, got {}", other.type_name()),
183            });
184        }
185        None => {
186            return Err(HostlibError::MissingParameter {
187                builtin: UNSUBSCRIBE_BUILTIN,
188                param: "subscription_id",
189            });
190        }
191    };
192    let removed = subscriptions()
193        .lock()
194        .expect("fs_watch mutex poisoned")
195        .remove(&subscription_id)
196        .is_some();
197    Ok(build_dict([("removed", VmValue::Bool(removed))]))
198}
199
200struct SubscribeRequest {
201    session_id: String,
202    root: PathBuf,
203    watch_paths: Vec<PathBuf>,
204    recursive: bool,
205    debounce_ms: u64,
206    globs: Option<GlobSet>,
207    gitignore: Option<Gitignore>,
208    kinds: BTreeSet<String>,
209}
210
211impl SubscribeRequest {
212    fn from_dict(dict: &BTreeMap<String, VmValue>) -> Result<Self, HostlibError> {
213        let root_param = optional_string(SUBSCRIBE_BUILTIN, dict, "root")?;
214        let raw_paths = optional_string_list(SUBSCRIBE_BUILTIN, dict, "paths")?;
215        let raw_globs = optional_string_list(SUBSCRIBE_BUILTIN, dict, "globs")?;
216        let session_id = optional_string(SUBSCRIBE_BUILTIN, dict, "session_id")?
217            .or_else(harn_vm::agent_sessions::current_session_id)
218            .ok_or(HostlibError::MissingParameter {
219                builtin: SUBSCRIBE_BUILTIN,
220                param: "session_id",
221            })?;
222
223        if session_id.trim().is_empty() {
224            return Err(HostlibError::InvalidParameter {
225                builtin: SUBSCRIBE_BUILTIN,
226                param: "session_id",
227                message: "must not be empty".to_string(),
228            });
229        }
230
231        if root_param.is_none() && raw_paths.is_none() {
232            return Err(HostlibError::MissingParameter {
233                builtin: SUBSCRIBE_BUILTIN,
234                param: "root",
235            });
236        }
237
238        let root = match root_param.as_deref() {
239            Some(root) => normalize_existing_path(SUBSCRIBE_BUILTIN, "root", root)?,
240            None => std::env::current_dir().map_err(|err| HostlibError::Backend {
241                builtin: SUBSCRIBE_BUILTIN,
242                message: format!("failed to resolve current directory: {err}"),
243            })?,
244        };
245
246        let raw_paths = raw_paths.unwrap_or_else(|| {
247            root_param
248                .as_ref()
249                .map(|root| vec![root.clone()])
250                .unwrap_or_default()
251        });
252        if raw_paths.is_empty() {
253            return Err(HostlibError::InvalidParameter {
254                builtin: SUBSCRIBE_BUILTIN,
255                param: "paths",
256                message: "must contain at least one path".to_string(),
257            });
258        }
259
260        let mut watch_paths = Vec::with_capacity(raw_paths.len());
261        for path in raw_paths {
262            let path = PathBuf::from(path);
263            let resolved = if path.is_relative() && root_param.is_some() {
264                root.join(path)
265            } else {
266                path
267            };
268            let normalized = normalize_existing_path_buf(SUBSCRIBE_BUILTIN, "paths", &resolved)?;
269            if normalized.strip_prefix(&root).is_err() {
270                return Err(HostlibError::InvalidParameter {
271                    builtin: SUBSCRIBE_BUILTIN,
272                    param: "paths",
273                    message: format!(
274                        "watch path `{}` is outside root `{}`",
275                        normalized.display(),
276                        root.display()
277                    ),
278                });
279            }
280            watch_paths.push(normalized);
281        }
282
283        let recursive = optional_bool(SUBSCRIBE_BUILTIN, dict, "recursive", true)?;
284        let debounce_ms = optional_int(
285            SUBSCRIBE_BUILTIN,
286            dict,
287            "debounce_ms",
288            DEFAULT_DEBOUNCE_MS as i64,
289        )?;
290        if debounce_ms < 0 {
291            return Err(HostlibError::InvalidParameter {
292                builtin: SUBSCRIBE_BUILTIN,
293                param: "debounce_ms",
294                message: "must be >= 0".to_string(),
295            });
296        }
297        let respect_gitignore = optional_bool(SUBSCRIBE_BUILTIN, dict, "respect_gitignore", false)?;
298
299        Ok(Self {
300            session_id,
301            gitignore: if respect_gitignore {
302                Some(build_gitignore(&root))
303            } else {
304                None
305            },
306            globs: build_globs(raw_globs.unwrap_or_default())?,
307            kinds: parse_kinds(dict)?,
308            root,
309            watch_paths,
310            recursive,
311            debounce_ms: debounce_ms as u64,
312        })
313    }
314}
315
316fn watch_worker(rx: mpsc::Receiver<WatchMessage>, debounce: Duration, filter: WatchFilter) {
317    let mut pending = Vec::new();
318    loop {
319        match rx.recv() {
320            Ok(WatchMessage::Event(event)) => {
321                pending.push(event);
322                loop {
323                    match rx.recv_timeout(debounce) {
324                        Ok(WatchMessage::Event(event)) => pending.push(event),
325                        Ok(WatchMessage::Error(error)) => emit_watch_error(&filter, error),
326                        Ok(WatchMessage::Stop) | Err(mpsc::RecvTimeoutError::Disconnected) => {
327                            emit_pending(&filter, &mut pending);
328                            return;
329                        }
330                        Err(mpsc::RecvTimeoutError::Timeout) => break,
331                    }
332                }
333                emit_pending(&filter, &mut pending);
334            }
335            Ok(WatchMessage::Error(error)) => emit_watch_error(&filter, error),
336            Ok(WatchMessage::Stop) | Err(_) => return,
337        }
338    }
339}
340
341fn emit_pending(filter: &WatchFilter, pending: &mut Vec<Event>) {
342    if pending.is_empty() {
343        return;
344    }
345    let events = coalesce_events(std::mem::take(pending), filter);
346    if events.is_empty() {
347        return;
348    }
349    harn_vm::agent_events::emit_event(&AgentEvent::FsWatch {
350        session_id: filter.session_id.clone(),
351        subscription_id: filter.subscription_id.clone(),
352        events,
353    });
354}
355
356fn emit_watch_error(filter: &WatchFilter, error: String) {
357    harn_vm::agent_events::emit_event(&AgentEvent::FsWatch {
358        session_id: filter.session_id.clone(),
359        subscription_id: filter.subscription_id.clone(),
360        events: vec![FsWatchEvent {
361            kind: "error".to_string(),
362            paths: Vec::new(),
363            relative_paths: Vec::new(),
364            raw_kind: "error".to_string(),
365            error: Some(error),
366        }],
367    });
368}
369
370fn coalesce_events(events: Vec<Event>, filter: &WatchFilter) -> Vec<FsWatchEvent> {
371    let mut seen = BTreeSet::new();
372    let mut output = Vec::new();
373    for event in events {
374        let kind = normalize_kind(&event.kind);
375        if !filter.kinds.contains(kind) {
376            continue;
377        }
378        let mut paths = Vec::new();
379        let mut relative_paths = Vec::new();
380        for path in &event.paths {
381            if !filter.matches_path(path) {
382                continue;
383            }
384            paths.push(path_to_string(path));
385            relative_paths.push(filter.relative_path(path));
386        }
387        if paths.is_empty() {
388            continue;
389        }
390        paths.sort();
391        paths.dedup();
392        relative_paths.sort();
393        relative_paths.dedup();
394        let raw_kind = format!("{:?}", event.kind);
395        if !seen.insert((kind.to_string(), paths.clone(), raw_kind.clone())) {
396            continue;
397        }
398        output.push(FsWatchEvent {
399            kind: kind.to_string(),
400            paths,
401            relative_paths,
402            raw_kind,
403            error: None,
404        });
405    }
406    output
407}
408
409impl WatchFilter {
410    fn matches_path(&self, path: &Path) -> bool {
411        if let Some(gitignore) = &self.gitignore {
412            if gitignore.matched(path, path.is_dir()).is_ignore() {
413                return false;
414            }
415        }
416        if let Some(globs) = &self.globs {
417            let relative = self.relative_path(path);
418            return globs.is_match(relative);
419        }
420        true
421    }
422
423    fn relative_path(&self, path: &Path) -> String {
424        let relative = path.strip_prefix(&self.root).unwrap_or(path);
425        let value = path_to_string(relative);
426        if value.is_empty() {
427            ".".to_string()
428        } else {
429            value
430        }
431    }
432}
433
434fn normalize_kind(kind: &EventKind) -> &'static str {
435    match kind {
436        EventKind::Create(_) => "create",
437        EventKind::Remove(_) => "remove",
438        EventKind::Modify(ModifyKind::Name(
439            RenameMode::Any
440            | RenameMode::To
441            | RenameMode::From
442            | RenameMode::Both
443            | RenameMode::Other,
444        )) => "rename",
445        EventKind::Modify(_) | EventKind::Any => "modify",
446        EventKind::Access(_) => "access",
447        EventKind::Other => "other",
448    }
449}
450
451fn parse_kinds(dict: &BTreeMap<String, VmValue>) -> Result<BTreeSet<String>, HostlibError> {
452    let values = optional_string_list(SUBSCRIBE_BUILTIN, dict, "kinds")?.unwrap_or_else(|| {
453        DEFAULT_KINDS
454            .iter()
455            .map(|kind| (*kind).to_string())
456            .collect()
457    });
458    let mut kinds = BTreeSet::new();
459    for kind in values {
460        if SUPPORTED_KINDS.contains(&kind.as_str()) {
461            kinds.insert(kind);
462        } else {
463            return Err(HostlibError::InvalidParameter {
464                builtin: SUBSCRIBE_BUILTIN,
465                param: "kinds",
466                message: format!("unsupported event kind `{kind}`"),
467            });
468        }
469    }
470    Ok(kinds)
471}
472
473fn build_globs(globs: Vec<String>) -> Result<Option<GlobSet>, HostlibError> {
474    if globs.is_empty() {
475        return Ok(None);
476    }
477    let mut builder = GlobSetBuilder::new();
478    for glob in globs {
479        let normalized = normalize_glob(&glob);
480        builder.add(
481            Glob::new(&normalized).map_err(|err| HostlibError::InvalidParameter {
482                builtin: SUBSCRIBE_BUILTIN,
483                param: "globs",
484                message: format!("invalid glob `{glob}`: {err}"),
485            })?,
486        );
487    }
488    Ok(Some(builder.build().map_err(|err| {
489        HostlibError::InvalidParameter {
490            builtin: SUBSCRIBE_BUILTIN,
491            param: "globs",
492            message: format!("invalid glob set: {err}"),
493        }
494    })?))
495}
496
497fn build_gitignore(root: &Path) -> Gitignore {
498    let mut builder = GitignoreBuilder::new(root);
499    let gitignore = root.join(".gitignore");
500    if gitignore.exists() {
501        let _ = builder.add(gitignore);
502    }
503    let exclude = root.join(".git").join("info").join("exclude");
504    if exclude.exists() {
505        let _ = builder.add(exclude);
506    }
507    builder.build().unwrap_or_else(|_| Gitignore::empty())
508}
509
/// Normalizes a user-supplied glob before compilation.
///
/// Backslashes become `/`. A pattern that is exactly `*` or that contains a
/// `/` is then used as-is; any other pattern gets a `**/` prefix.
fn normalize_glob(glob: &str) -> String {
    let unified = glob.replace('\\', "/");
    // Anything starting with `**/` necessarily contains a `/`.
    let keep_as_is = unified == "*" || unified.contains('/');
    if keep_as_is {
        unified
    } else {
        format!("**/{unified}")
    }
}
518
519fn normalize_existing_path(
520    builtin: &'static str,
521    param: &'static str,
522    path: &str,
523) -> Result<PathBuf, HostlibError> {
524    normalize_existing_path_buf(builtin, param, &PathBuf::from(path))
525}
526
527fn normalize_existing_path_buf(
528    builtin: &'static str,
529    param: &'static str,
530    path: &Path,
531) -> Result<PathBuf, HostlibError> {
532    path.canonicalize()
533        .map_err(|err| HostlibError::InvalidParameter {
534            builtin,
535            param,
536            message: format!(
537                "{} does not resolve to an existing path: {err}",
538                path.display()
539            ),
540        })
541}
542
/// Renders a path as text with every backslash turned into a forward slash.
///
/// Invalid Unicode is replaced lossily.
fn path_to_string(path: &Path) -> String {
    path.to_string_lossy()
        .chars()
        .map(|ch| if ch == '\\' { '/' } else { ch })
        .collect()
}
546
547fn next_subscription_id() -> String {
548    let seq = NEXT_SUBSCRIPTION_ID.fetch_add(1, Ordering::Relaxed);
549    let millis = SystemTime::now()
550        .duration_since(UNIX_EPOCH)
551        .map(|duration| duration.as_millis())
552        .unwrap_or(0);
553    format!("fsw-{millis}-{seq}")
554}
555
#[cfg(test)]
mod tests {
    use super::*;
    use notify::event::{AccessKind, CreateKind, RemoveKind};

    /// Builds a notify event of `kind` that reports a single path.
    fn make_event(kind: EventKind, path: impl Into<PathBuf>) -> Event {
        Event::new(kind).add_path(path.into())
    }

    /// Builds a filter with the default kinds, no gitignore, and optional globs.
    fn make_filter(root: PathBuf, globs: Option<Vec<&str>>) -> WatchFilter {
        let globs = globs.map(|patterns| {
            let owned: Vec<String> = patterns.iter().map(|pattern| (*pattern).to_owned()).collect();
            build_globs(owned).unwrap().unwrap()
        });
        WatchFilter {
            session_id: "session".to_string(),
            subscription_id: "sub".to_string(),
            root,
            globs,
            gitignore: None,
            kinds: parse_kinds(&BTreeMap::new()).unwrap(),
        }
    }

    #[test]
    fn coalesce_deduplicates_same_kind_and_path() {
        let root = std::env::current_dir().unwrap();
        let target = root.join("src/lib.rs");
        let watch_filter = make_filter(root, None);

        // The same modification reported twice must collapse into one entry.
        let batch = vec![
            make_event(EventKind::Modify(ModifyKind::Any), target.clone()),
            make_event(EventKind::Modify(ModifyKind::Any), target),
        ];
        let events = coalesce_events(batch, &watch_filter);

        assert_eq!(events.len(), 1);
        assert_eq!(events[0].kind, "modify");
    }

    #[test]
    fn glob_filter_uses_relative_paths() {
        let root = std::env::current_dir().unwrap();
        let watch_filter = make_filter(root.clone(), Some(vec!["*.rs"]));

        let batch = vec![
            make_event(EventKind::Create(CreateKind::Any), root.join("src/lib.rs")),
            make_event(EventKind::Create(CreateKind::Any), root.join("README.md")),
        ];
        let events = coalesce_events(batch, &watch_filter);

        assert_eq!(events.len(), 1);
        assert_eq!(events[0].relative_paths, vec!["src/lib.rs"]);
    }

    #[test]
    fn kind_filter_drops_unrequested_events() {
        let root = std::env::current_dir().unwrap();
        let mut watch_filter = make_filter(root.clone(), None);
        watch_filter.kinds = BTreeSet::from(["remove".to_string()]);

        let batch = vec![
            make_event(EventKind::Create(CreateKind::Any), root.join("src/lib.rs")),
            make_event(EventKind::Remove(RemoveKind::Any), root.join("src/lib.rs")),
        ];
        let events = coalesce_events(batch, &watch_filter);

        assert_eq!(events.len(), 1);
        assert_eq!(events[0].kind, "remove");
    }

    #[test]
    fn kind_filter_allows_access_and_other_events() {
        let root = std::env::current_dir().unwrap();

        // Request the two kinds that are supported but not enabled by default.
        let requested = vec![
            VmValue::String(std::sync::Arc::from("access")),
            VmValue::String(std::sync::Arc::from("other")),
        ];
        let mut config = BTreeMap::new();
        config.insert(
            "kinds".to_string(),
            VmValue::List(std::sync::Arc::new(requested)),
        );
        let mut watch_filter = make_filter(root.clone(), None);
        watch_filter.kinds = parse_kinds(&config).unwrap();

        let batch = vec![
            make_event(EventKind::Access(AccessKind::Any), root.join("src/lib.rs")),
            make_event(EventKind::Other, root.join("README.md")),
        ];
        let events = coalesce_events(batch, &watch_filter);

        assert_eq!(events.len(), 2);
        assert_eq!(events[0].kind, "access");
        assert_eq!(events[1].kind, "other");
    }

    #[test]
    fn gitignore_filter_drops_ignored_paths() {
        let temp = tempfile::tempdir().unwrap();
        let root = temp.path();
        std::fs::write(root.join(".gitignore"), "ignored.txt\n").unwrap();
        let mut watch_filter = make_filter(root.to_path_buf(), None);
        watch_filter.gitignore = Some(build_gitignore(root));

        let batch = vec![
            make_event(EventKind::Modify(ModifyKind::Any), root.join("allowed.txt")),
            make_event(EventKind::Modify(ModifyKind::Any), root.join("ignored.txt")),
        ];
        let events = coalesce_events(batch, &watch_filter);

        assert_eq!(events.len(), 1);
        assert_eq!(events[0].relative_paths, vec!["allowed.txt"]);
    }
}