Skip to main content

harn_hostlib/fs_watch/
mod.rs

1//! File-system watch host capability.
2//!
3//! Wraps `notify` to deliver coalesced file-change batches into Harn's
4//! session-scoped `AgentEvent` stream.
5
6use std::collections::{BTreeMap, BTreeSet, HashMap};
7use std::path::{Path, PathBuf};
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::sync::{mpsc, Mutex, OnceLock};
10use std::thread;
11use std::time::{Duration, SystemTime, UNIX_EPOCH};
12
13use globset::{Glob, GlobSet, GlobSetBuilder};
14use harn_vm::agent_events::{AgentEvent, FsWatchEvent};
15use harn_vm::VmValue;
16use ignore::gitignore::{Gitignore, GitignoreBuilder};
17use notify::event::{ModifyKind, RenameMode};
18use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
19
20use crate::error::HostlibError;
21use crate::registry::{BuiltinRegistry, HostlibCapability, RegisteredBuiltin, SyncHandler};
22use crate::tools::args::{
23    build_dict, dict_arg, optional_bool, optional_int, optional_string, str_value,
24};
25
/// Registry name of the subscribe builtin.
const SUBSCRIBE_BUILTIN: &str = "hostlib_fs_watch_subscribe";
/// Registry name of the unsubscribe builtin.
const UNSUBSCRIBE_BUILTIN: &str = "hostlib_fs_watch_unsubscribe";
/// Quiet period, in milliseconds, used when the caller does not pass `debounce_ms`.
const DEFAULT_DEBOUNCE_MS: u64 = 50;
/// Event kinds delivered when the caller does not pass `kinds`.
const DEFAULT_KINDS: &[&str] = &["create", "modify", "remove", "rename"];
/// Every kind name accepted in the `kinds` parameter; these are exactly the
/// names `normalize_kind` can produce.
const SUPPORTED_KINDS: &[&str] = &["access", "create", "modify", "other", "remove", "rename"];

/// Counter mixed into subscription ids so ids minted within the same
/// millisecond remain distinct.
static NEXT_SUBSCRIPTION_ID: AtomicU64 = AtomicU64::new(1);
33
34/// File-watch capability handle.
35#[derive(Default)]
36pub struct FsWatchCapability;
37
38impl HostlibCapability for FsWatchCapability {
39    fn module_name(&self) -> &'static str {
40        "fs_watch"
41    }
42
43    fn register_builtins(&self, registry: &mut BuiltinRegistry) {
44        registry.register(RegisteredBuiltin {
45            name: SUBSCRIBE_BUILTIN,
46            module: "fs_watch",
47            method: "subscribe",
48            handler: subscribe_handler(),
49        });
50        registry.register(RegisteredBuiltin {
51            name: UNSUBSCRIBE_BUILTIN,
52            module: "fs_watch",
53            method: "unsubscribe",
54            handler: unsubscribe_handler(),
55        });
56    }
57}
58
59fn subscribe_handler() -> SyncHandler {
60    std::sync::Arc::new(subscribe)
61}
62
63fn unsubscribe_handler() -> SyncHandler {
64    std::sync::Arc::new(unsubscribe)
65}
66
67struct Subscription {
68    _watcher: RecommendedWatcher,
69    stop_tx: mpsc::Sender<WatchMessage>,
70    worker: Option<thread::JoinHandle<()>>,
71}
72
73impl Drop for Subscription {
74    fn drop(&mut self) {
75        let _ = self.stop_tx.send(WatchMessage::Stop);
76        if let Some(worker) = self.worker.take() {
77            let _ = worker.join();
78        }
79    }
80}
81
/// Messages delivered to a subscription's worker thread.
enum WatchMessage {
    /// A raw event reported by the `notify` watcher.
    Event(Event),
    /// A watcher error, already rendered to a string.
    Error(String),
    /// Sent by `Subscription::drop`: flush any pending batch and exit.
    Stop,
}
87
/// Per-subscription filtering and labeling state; `subscribe` moves it into
/// the worker thread.
#[derive(Clone)]
struct WatchFilter {
    /// Session whose event stream receives this subscription's batches.
    session_id: String,
    /// Id returned by `subscribe`; echoed in every emitted batch.
    subscription_id: String,
    /// Root directory; relative paths and glob matching are computed against it.
    root: PathBuf,
    /// Optional glob allow-list matched against root-relative paths (`None` accepts all).
    globs: Option<GlobSet>,
    /// Optional ignore matcher (`None` means ignore rules are not consulted).
    gitignore: Option<Gitignore>,
    /// Normalized kind names (see `SUPPORTED_KINDS`) the caller asked for.
    kinds: BTreeSet<String>,
}
97
98fn subscriptions() -> &'static Mutex<HashMap<String, Subscription>> {
99    static SUBSCRIPTIONS: OnceLock<Mutex<HashMap<String, Subscription>>> = OnceLock::new();
100    SUBSCRIPTIONS.get_or_init(|| Mutex::new(HashMap::new()))
101}
102
103fn subscribe(args: &[VmValue]) -> Result<VmValue, HostlibError> {
104    let raw = dict_arg(SUBSCRIBE_BUILTIN, args)?;
105    let dict = raw.as_ref();
106    let request = SubscribeRequest::from_dict(dict)?;
107    let subscription_id = next_subscription_id();
108    let (tx, rx) = mpsc::channel();
109
110    let filter = WatchFilter {
111        session_id: request.session_id.clone(),
112        subscription_id: subscription_id.clone(),
113        root: request.root.clone(),
114        globs: request.globs,
115        gitignore: request.gitignore,
116        kinds: request.kinds,
117    };
118    let debounce = Duration::from_millis(request.debounce_ms);
119    let worker = thread::Builder::new()
120        .name(format!("harn-fs-watch-{subscription_id}"))
121        .spawn(move || watch_worker(rx, debounce, filter))
122        .map_err(|err| HostlibError::Backend {
123            builtin: SUBSCRIBE_BUILTIN,
124            message: format!("failed to spawn watch worker: {err}"),
125        })?;
126
127    let notify_tx = tx.clone();
128    let mut watcher = notify::recommended_watcher(move |result: notify::Result<Event>| {
129        let message = match result {
130            Ok(event) => WatchMessage::Event(event),
131            Err(err) => WatchMessage::Error(err.to_string()),
132        };
133        let _ = notify_tx.send(message);
134    })
135    .map_err(|err| HostlibError::Backend {
136        builtin: SUBSCRIBE_BUILTIN,
137        message: format!("failed to create watcher: {err}"),
138    })?;
139
140    let mode = if request.recursive {
141        RecursiveMode::Recursive
142    } else {
143        RecursiveMode::NonRecursive
144    };
145    for path in &request.watch_paths {
146        watcher
147            .watch(path, mode)
148            .map_err(|err| HostlibError::Backend {
149                builtin: SUBSCRIBE_BUILTIN,
150                message: format!("failed to watch {}: {err}", path.display()),
151            })?;
152    }
153
154    subscriptions()
155        .lock()
156        .expect("fs_watch mutex poisoned")
157        .insert(
158            subscription_id.clone(),
159            Subscription {
160                _watcher: watcher,
161                stop_tx: tx,
162                worker: Some(worker),
163            },
164        );
165
166    Ok(build_dict([(
167        "subscription_id",
168        str_value(subscription_id.as_str()),
169    )]))
170}
171
172fn unsubscribe(args: &[VmValue]) -> Result<VmValue, HostlibError> {
173    let raw = dict_arg(UNSUBSCRIBE_BUILTIN, args)?;
174    let dict = raw.as_ref();
175    let subscription_id = match dict.get("subscription_id") {
176        Some(VmValue::String(value)) if !value.trim().is_empty() => value.to_string(),
177        Some(other) => {
178            return Err(HostlibError::InvalidParameter {
179                builtin: UNSUBSCRIBE_BUILTIN,
180                param: "subscription_id",
181                message: format!("expected non-empty string, got {}", other.type_name()),
182            });
183        }
184        None => {
185            return Err(HostlibError::MissingParameter {
186                builtin: UNSUBSCRIBE_BUILTIN,
187                param: "subscription_id",
188            });
189        }
190    };
191    let removed = subscriptions()
192        .lock()
193        .expect("fs_watch mutex poisoned")
194        .remove(&subscription_id)
195        .is_some();
196    Ok(build_dict([("removed", VmValue::Bool(removed))]))
197}
198
/// Validated arguments for `hostlib_fs_watch_subscribe`.
struct SubscribeRequest {
    /// Explicit `session_id`, or the current agent session when omitted.
    session_id: String,
    /// Root used for containment checks, relative-path reporting and ignore rules.
    root: PathBuf,
    /// Canonicalized paths to watch; each one lies under `root`.
    watch_paths: Vec<PathBuf>,
    /// Whether directories are watched recursively (default `true`).
    recursive: bool,
    /// Quiet period before a batch is emitted (default `DEFAULT_DEBOUNCE_MS`).
    debounce_ms: u64,
    /// Compiled `globs` allow-list; `None` when no globs were given.
    globs: Option<GlobSet>,
    /// Ignore matcher; present only when `respect_gitignore` is true.
    gitignore: Option<Gitignore>,
    /// Requested event kinds (default `DEFAULT_KINDS`).
    kinds: BTreeSet<String>,
}
209
210impl SubscribeRequest {
211    fn from_dict(dict: &BTreeMap<String, VmValue>) -> Result<Self, HostlibError> {
212        let root_param = optional_string(SUBSCRIBE_BUILTIN, dict, "root")?;
213        let raw_paths = optional_string_list(SUBSCRIBE_BUILTIN, dict, "paths")?;
214        let raw_globs = optional_string_list(SUBSCRIBE_BUILTIN, dict, "globs")?;
215        let session_id = optional_string(SUBSCRIBE_BUILTIN, dict, "session_id")?
216            .or_else(harn_vm::agent_sessions::current_session_id)
217            .ok_or(HostlibError::MissingParameter {
218                builtin: SUBSCRIBE_BUILTIN,
219                param: "session_id",
220            })?;
221
222        if session_id.trim().is_empty() {
223            return Err(HostlibError::InvalidParameter {
224                builtin: SUBSCRIBE_BUILTIN,
225                param: "session_id",
226                message: "must not be empty".to_string(),
227            });
228        }
229
230        if root_param.is_none() && raw_paths.is_none() {
231            return Err(HostlibError::MissingParameter {
232                builtin: SUBSCRIBE_BUILTIN,
233                param: "root",
234            });
235        }
236
237        let root = match root_param.as_deref() {
238            Some(root) => normalize_existing_path(SUBSCRIBE_BUILTIN, "root", root)?,
239            None => std::env::current_dir().map_err(|err| HostlibError::Backend {
240                builtin: SUBSCRIBE_BUILTIN,
241                message: format!("failed to resolve current directory: {err}"),
242            })?,
243        };
244
245        let raw_paths = raw_paths.unwrap_or_else(|| {
246            root_param
247                .as_ref()
248                .map(|root| vec![root.clone()])
249                .unwrap_or_default()
250        });
251        if raw_paths.is_empty() {
252            return Err(HostlibError::InvalidParameter {
253                builtin: SUBSCRIBE_BUILTIN,
254                param: "paths",
255                message: "must contain at least one path".to_string(),
256            });
257        }
258
259        let mut watch_paths = Vec::with_capacity(raw_paths.len());
260        for path in raw_paths {
261            let path = PathBuf::from(path);
262            let resolved = if path.is_relative() && root_param.is_some() {
263                root.join(path)
264            } else {
265                path
266            };
267            let normalized = normalize_existing_path_buf(SUBSCRIBE_BUILTIN, "paths", &resolved)?;
268            if normalized.strip_prefix(&root).is_err() {
269                return Err(HostlibError::InvalidParameter {
270                    builtin: SUBSCRIBE_BUILTIN,
271                    param: "paths",
272                    message: format!(
273                        "watch path `{}` is outside root `{}`",
274                        normalized.display(),
275                        root.display()
276                    ),
277                });
278            }
279            watch_paths.push(normalized);
280        }
281
282        let recursive = optional_bool(SUBSCRIBE_BUILTIN, dict, "recursive", true)?;
283        let debounce_ms = optional_int(
284            SUBSCRIBE_BUILTIN,
285            dict,
286            "debounce_ms",
287            DEFAULT_DEBOUNCE_MS as i64,
288        )?;
289        if debounce_ms < 0 {
290            return Err(HostlibError::InvalidParameter {
291                builtin: SUBSCRIBE_BUILTIN,
292                param: "debounce_ms",
293                message: "must be >= 0".to_string(),
294            });
295        }
296        let respect_gitignore = optional_bool(SUBSCRIBE_BUILTIN, dict, "respect_gitignore", false)?;
297
298        Ok(Self {
299            session_id,
300            gitignore: if respect_gitignore {
301                Some(build_gitignore(&root))
302            } else {
303                None
304            },
305            globs: build_globs(raw_globs.unwrap_or_default())?,
306            kinds: parse_kinds(dict)?,
307            root,
308            watch_paths,
309            recursive,
310            debounce_ms: debounce_ms as u64,
311        })
312    }
313}
314
315fn watch_worker(rx: mpsc::Receiver<WatchMessage>, debounce: Duration, filter: WatchFilter) {
316    let mut pending = Vec::new();
317    loop {
318        match rx.recv() {
319            Ok(WatchMessage::Event(event)) => {
320                pending.push(event);
321                loop {
322                    match rx.recv_timeout(debounce) {
323                        Ok(WatchMessage::Event(event)) => pending.push(event),
324                        Ok(WatchMessage::Error(error)) => emit_watch_error(&filter, error),
325                        Ok(WatchMessage::Stop) | Err(mpsc::RecvTimeoutError::Disconnected) => {
326                            emit_pending(&filter, &mut pending);
327                            return;
328                        }
329                        Err(mpsc::RecvTimeoutError::Timeout) => break,
330                    }
331                }
332                emit_pending(&filter, &mut pending);
333            }
334            Ok(WatchMessage::Error(error)) => emit_watch_error(&filter, error),
335            Ok(WatchMessage::Stop) | Err(_) => return,
336        }
337    }
338}
339
340fn emit_pending(filter: &WatchFilter, pending: &mut Vec<Event>) {
341    if pending.is_empty() {
342        return;
343    }
344    let events = coalesce_events(std::mem::take(pending), filter);
345    if events.is_empty() {
346        return;
347    }
348    harn_vm::agent_events::emit_event(&AgentEvent::FsWatch {
349        session_id: filter.session_id.clone(),
350        subscription_id: filter.subscription_id.clone(),
351        events,
352    });
353}
354
355fn emit_watch_error(filter: &WatchFilter, error: String) {
356    harn_vm::agent_events::emit_event(&AgentEvent::FsWatch {
357        session_id: filter.session_id.clone(),
358        subscription_id: filter.subscription_id.clone(),
359        events: vec![FsWatchEvent {
360            kind: "error".to_string(),
361            paths: Vec::new(),
362            relative_paths: Vec::new(),
363            raw_kind: "error".to_string(),
364            error: Some(error),
365        }],
366    });
367}
368
369fn coalesce_events(events: Vec<Event>, filter: &WatchFilter) -> Vec<FsWatchEvent> {
370    let mut seen = BTreeSet::new();
371    let mut output = Vec::new();
372    for event in events {
373        let kind = normalize_kind(&event.kind);
374        if !filter.kinds.contains(kind) {
375            continue;
376        }
377        let mut paths = Vec::new();
378        let mut relative_paths = Vec::new();
379        for path in &event.paths {
380            if !filter.matches_path(path) {
381                continue;
382            }
383            paths.push(path_to_string(path));
384            relative_paths.push(filter.relative_path(path));
385        }
386        if paths.is_empty() {
387            continue;
388        }
389        paths.sort();
390        paths.dedup();
391        relative_paths.sort();
392        relative_paths.dedup();
393        let raw_kind = format!("{:?}", event.kind);
394        if !seen.insert((kind.to_string(), paths.clone(), raw_kind.clone())) {
395            continue;
396        }
397        output.push(FsWatchEvent {
398            kind: kind.to_string(),
399            paths,
400            relative_paths,
401            raw_kind,
402            error: None,
403        });
404    }
405    output
406}
407
408impl WatchFilter {
409    fn matches_path(&self, path: &Path) -> bool {
410        if let Some(gitignore) = &self.gitignore {
411            if gitignore.matched(path, path.is_dir()).is_ignore() {
412                return false;
413            }
414        }
415        if let Some(globs) = &self.globs {
416            let relative = self.relative_path(path);
417            return globs.is_match(relative);
418        }
419        true
420    }
421
422    fn relative_path(&self, path: &Path) -> String {
423        let relative = path.strip_prefix(&self.root).unwrap_or(path);
424        let value = path_to_string(relative);
425        if value.is_empty() {
426            ".".to_string()
427        } else {
428            value
429        }
430    }
431}
432
433fn normalize_kind(kind: &EventKind) -> &'static str {
434    match kind {
435        EventKind::Create(_) => "create",
436        EventKind::Remove(_) => "remove",
437        EventKind::Modify(ModifyKind::Name(
438            RenameMode::Any
439            | RenameMode::To
440            | RenameMode::From
441            | RenameMode::Both
442            | RenameMode::Other,
443        )) => "rename",
444        EventKind::Modify(_) | EventKind::Any => "modify",
445        EventKind::Access(_) => "access",
446        EventKind::Other => "other",
447    }
448}
449
450fn parse_kinds(dict: &BTreeMap<String, VmValue>) -> Result<BTreeSet<String>, HostlibError> {
451    let values = optional_string_list(SUBSCRIBE_BUILTIN, dict, "kinds")?.unwrap_or_else(|| {
452        DEFAULT_KINDS
453            .iter()
454            .map(|kind| (*kind).to_string())
455            .collect()
456    });
457    let mut kinds = BTreeSet::new();
458    for kind in values {
459        if SUPPORTED_KINDS.contains(&kind.as_str()) {
460            kinds.insert(kind);
461        } else {
462            return Err(HostlibError::InvalidParameter {
463                builtin: SUBSCRIBE_BUILTIN,
464                param: "kinds",
465                message: format!("unsupported event kind `{kind}`"),
466            });
467        }
468    }
469    Ok(kinds)
470}
471
472fn build_globs(globs: Vec<String>) -> Result<Option<GlobSet>, HostlibError> {
473    if globs.is_empty() {
474        return Ok(None);
475    }
476    let mut builder = GlobSetBuilder::new();
477    for glob in globs {
478        let normalized = normalize_glob(&glob);
479        builder.add(
480            Glob::new(&normalized).map_err(|err| HostlibError::InvalidParameter {
481                builtin: SUBSCRIBE_BUILTIN,
482                param: "globs",
483                message: format!("invalid glob `{glob}`: {err}"),
484            })?,
485        );
486    }
487    Ok(Some(builder.build().map_err(|err| {
488        HostlibError::InvalidParameter {
489            builtin: SUBSCRIBE_BUILTIN,
490            param: "globs",
491            message: format!("invalid glob set: {err}"),
492        }
493    })?))
494}
495
496fn build_gitignore(root: &Path) -> Gitignore {
497    let mut builder = GitignoreBuilder::new(root);
498    let gitignore = root.join(".gitignore");
499    if gitignore.exists() {
500        let _ = builder.add(gitignore);
501    }
502    let exclude = root.join(".git").join("info").join("exclude");
503    if exclude.exists() {
504        let _ = builder.add(exclude);
505    }
506    builder.build().unwrap_or_else(|_| Gitignore::empty())
507}
508
/// Normalizes a user glob before compilation.
///
/// Backslashes become `/`. A pattern with no directory part (e.g. `*.rs`) is
/// prefixed with `**/` so it matches at any depth. The lone `*` is left as-is.
fn normalize_glob(glob: &str) -> String {
    let unified = glob.replace('\\', "/");
    // Any pattern containing `/` (including those starting with `**/`) is
    // already anchored explicitly.
    let needs_anchor = unified != "*" && !unified.contains('/');
    if needs_anchor {
        format!("**/{unified}")
    } else {
        unified
    }
}
517
518fn optional_string_list(
519    builtin: &'static str,
520    dict: &BTreeMap<String, VmValue>,
521    key: &'static str,
522) -> Result<Option<Vec<String>>, HostlibError> {
523    let Some(value) = dict.get(key) else {
524        return Ok(None);
525    };
526    match value {
527        VmValue::Nil => Ok(None),
528        VmValue::List(items) => items
529            .iter()
530            .enumerate()
531            .map(|(idx, item)| match item {
532                VmValue::String(value) => Ok(value.to_string()),
533                other => Err(HostlibError::InvalidParameter {
534                    builtin,
535                    param: key,
536                    message: format!("item {idx} must be a string, got {}", other.type_name()),
537                }),
538            })
539            .collect::<Result<Vec<_>, _>>()
540            .map(Some),
541        other => Err(HostlibError::InvalidParameter {
542            builtin,
543            param: key,
544            message: format!("expected list of strings, got {}", other.type_name()),
545        }),
546    }
547}
548
549fn normalize_existing_path(
550    builtin: &'static str,
551    param: &'static str,
552    path: &str,
553) -> Result<PathBuf, HostlibError> {
554    normalize_existing_path_buf(builtin, param, &PathBuf::from(path))
555}
556
557fn normalize_existing_path_buf(
558    builtin: &'static str,
559    param: &'static str,
560    path: &Path,
561) -> Result<PathBuf, HostlibError> {
562    path.canonicalize()
563        .map_err(|err| HostlibError::InvalidParameter {
564            builtin,
565            param,
566            message: format!(
567                "{} does not resolve to an existing path: {err}",
568                path.display()
569            ),
570        })
571}
572
/// Renders `path` as a (lossy) UTF-8 string using `/` as the only separator.
fn path_to_string(path: &Path) -> String {
    path.to_string_lossy()
        .chars()
        .map(|ch| if ch == '\\' { '/' } else { ch })
        .collect()
}
576
577fn next_subscription_id() -> String {
578    let seq = NEXT_SUBSCRIPTION_ID.fetch_add(1, Ordering::Relaxed);
579    let millis = SystemTime::now()
580        .duration_since(UNIX_EPOCH)
581        .map(|duration| duration.as_millis())
582        .unwrap_or(0);
583    format!("fsw-{millis}-{seq}")
584}
585
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `notify` event of `kind` carrying a single path.
    fn event(kind: EventKind, path: impl Into<PathBuf>) -> Event {
        Event::new(kind).add_path(path.into())
    }

    /// Builds a filter rooted at `root` with the default kinds, no ignore
    /// matcher, and (optionally) the given glob allow-list.
    fn filter(root: PathBuf, globs: Option<Vec<&str>>) -> WatchFilter {
        WatchFilter {
            session_id: "session".to_string(),
            subscription_id: "sub".to_string(),
            root,
            globs: globs.map(|patterns| {
                build_globs(patterns.into_iter().map(str::to_string).collect())
                    .unwrap()
                    .unwrap()
            }),
            gitignore: None,
            kinds: parse_kinds(&BTreeMap::new()).unwrap(),
        }
    }

    // Two identical modify events in one batch collapse into a single entry.
    #[test]
    fn coalesce_deduplicates_same_kind_and_path() {
        let root = std::env::current_dir().unwrap();
        let path = root.join("src/lib.rs");
        let filter = filter(root, None);
        let events = coalesce_events(
            vec![
                event(EventKind::Modify(ModifyKind::Any), &path),
                event(EventKind::Modify(ModifyKind::Any), &path),
            ],
            &filter,
        );
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].kind, "modify");
    }

    // `*.rs` is normalized to `**/*.rs` and matched against the root-relative
    // path, so `src/lib.rs` passes and `README.md` is dropped.
    #[test]
    fn glob_filter_uses_relative_paths() {
        let root = std::env::current_dir().unwrap();
        let filter = filter(root.clone(), Some(vec!["*.rs"]));
        let events = coalesce_events(
            vec![
                event(
                    EventKind::Create(notify::event::CreateKind::Any),
                    root.join("src/lib.rs"),
                ),
                event(
                    EventKind::Create(notify::event::CreateKind::Any),
                    root.join("README.md"),
                ),
            ],
            &filter,
        );
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].relative_paths, vec!["src/lib.rs"]);
    }

    // Only kinds present in `filter.kinds` survive coalescing.
    #[test]
    fn kind_filter_drops_unrequested_events() {
        let root = std::env::current_dir().unwrap();
        let mut filter = filter(root.clone(), None);
        filter.kinds = BTreeSet::from(["remove".to_string()]);

        let events = coalesce_events(
            vec![
                event(
                    EventKind::Create(notify::event::CreateKind::Any),
                    root.join("src/lib.rs"),
                ),
                event(
                    EventKind::Remove(notify::event::RemoveKind::Any),
                    root.join("src/lib.rs"),
                ),
            ],
            &filter,
        );

        assert_eq!(events.len(), 1);
        assert_eq!(events[0].kind, "remove");
    }

    // `access` and `other` are opt-in kinds (not in `DEFAULT_KINDS`). Once
    // requested via `kinds`, they are delivered in arrival order.
    #[test]
    fn kind_filter_allows_access_and_other_events() {
        let root = std::env::current_dir().unwrap();
        let mut config = BTreeMap::new();
        config.insert(
            "kinds".to_string(),
            VmValue::List(std::sync::Arc::new(vec![
                VmValue::String(std::sync::Arc::from("access")),
                VmValue::String(std::sync::Arc::from("other")),
            ])),
        );
        let mut filter = filter(root.clone(), None);
        filter.kinds = parse_kinds(&config).unwrap();

        let events = coalesce_events(
            vec![
                event(
                    EventKind::Access(notify::event::AccessKind::Any),
                    root.join("src/lib.rs"),
                ),
                event(EventKind::Other, root.join("README.md")),
            ],
            &filter,
        );

        assert_eq!(events.len(), 2);
        assert_eq!(events[0].kind, "access");
        assert_eq!(events[1].kind, "other");
    }

    // A `.gitignore` at the root hides matching paths when a matcher is set.
    #[test]
    fn gitignore_filter_drops_ignored_paths() {
        let temp = tempfile::tempdir().unwrap();
        std::fs::write(temp.path().join(".gitignore"), "ignored.txt\n").unwrap();
        let mut filter = filter(temp.path().to_path_buf(), None);
        filter.gitignore = Some(build_gitignore(temp.path()));

        let events = coalesce_events(
            vec![
                event(
                    EventKind::Modify(ModifyKind::Any),
                    temp.path().join("allowed.txt"),
                ),
                event(
                    EventKind::Modify(ModifyKind::Any),
                    temp.path().join("ignored.txt"),
                ),
            ],
            &filter,
        );

        assert_eq!(events.len(), 1);
        assert_eq!(events[0].relative_paths, vec!["allowed.txt"]);
    }
}