Skip to main content

harn_hostlib/fs_watch/
mod.rs

1//! File-system watch host capability.
2//!
3//! Wraps `notify` to deliver coalesced file-change batches into Harn's
4//! session-scoped `AgentEvent` stream.
5
6use std::collections::{BTreeMap, BTreeSet, HashMap};
7use std::path::{Path, PathBuf};
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::sync::{mpsc, Mutex, OnceLock};
10use std::thread;
11use std::time::{Duration, SystemTime, UNIX_EPOCH};
12
13use globset::{Glob, GlobSet, GlobSetBuilder};
14use harn_vm::agent_events::{AgentEvent, FsWatchEvent};
15use harn_vm::VmValue;
16use ignore::gitignore::{Gitignore, GitignoreBuilder};
17use notify::event::{ModifyKind, RenameMode};
18use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
19
20use crate::error::HostlibError;
21use crate::registry::{BuiltinRegistry, HostlibCapability, RegisteredBuiltin, SyncHandler};
22use crate::tools::args::{
23    build_dict, dict_arg, optional_bool, optional_int, optional_string, str_value,
24};
25
/// Registry name of the `fs_watch.subscribe` builtin.
const SUBSCRIBE_BUILTIN: &'static str = "hostlib_fs_watch_subscribe";
/// Registry name of the `fs_watch.unsubscribe` builtin.
const UNSUBSCRIBE_BUILTIN: &'static str = "hostlib_fs_watch_unsubscribe";
/// Debounce window, in milliseconds, used when the caller omits `debounce_ms`.
const DEFAULT_DEBOUNCE_MS: u64 = 50;

/// Process-wide sequence number embedded in every subscription id; starts at 1.
static NEXT_SUBSCRIPTION_ID: AtomicU64 = AtomicU64::new(1);
31
32/// File-watch capability handle.
33#[derive(Default)]
34pub struct FsWatchCapability;
35
36impl HostlibCapability for FsWatchCapability {
37    fn module_name(&self) -> &'static str {
38        "fs_watch"
39    }
40
41    fn register_builtins(&self, registry: &mut BuiltinRegistry) {
42        registry.register(RegisteredBuiltin {
43            name: SUBSCRIBE_BUILTIN,
44            module: "fs_watch",
45            method: "subscribe",
46            handler: subscribe_handler(),
47        });
48        registry.register(RegisteredBuiltin {
49            name: UNSUBSCRIBE_BUILTIN,
50            module: "fs_watch",
51            method: "unsubscribe",
52            handler: unsubscribe_handler(),
53        });
54    }
55}
56
57fn subscribe_handler() -> SyncHandler {
58    std::sync::Arc::new(subscribe)
59}
60
61fn unsubscribe_handler() -> SyncHandler {
62    std::sync::Arc::new(unsubscribe)
63}
64
65struct Subscription {
66    _watcher: RecommendedWatcher,
67    stop_tx: mpsc::Sender<WatchMessage>,
68    worker: Option<thread::JoinHandle<()>>,
69}
70
71impl Drop for Subscription {
72    fn drop(&mut self) {
73        let _ = self.stop_tx.send(WatchMessage::Stop);
74        if let Some(worker) = self.worker.take() {
75            let _ = worker.join();
76        }
77    }
78}
79
80enum WatchMessage {
81    Event(Event),
82    Error(String),
83    Stop,
84}
85
86#[derive(Clone)]
87struct WatchFilter {
88    session_id: String,
89    subscription_id: String,
90    root: PathBuf,
91    globs: Option<GlobSet>,
92    gitignore: Option<Gitignore>,
93    kinds: BTreeSet<String>,
94}
95
96fn subscriptions() -> &'static Mutex<HashMap<String, Subscription>> {
97    static SUBSCRIPTIONS: OnceLock<Mutex<HashMap<String, Subscription>>> = OnceLock::new();
98    SUBSCRIPTIONS.get_or_init(|| Mutex::new(HashMap::new()))
99}
100
101fn subscribe(args: &[VmValue]) -> Result<VmValue, HostlibError> {
102    let raw = dict_arg(SUBSCRIBE_BUILTIN, args)?;
103    let dict = raw.as_ref();
104    let request = SubscribeRequest::from_dict(dict)?;
105    let subscription_id = next_subscription_id();
106    let (tx, rx) = mpsc::channel();
107
108    let filter = WatchFilter {
109        session_id: request.session_id.clone(),
110        subscription_id: subscription_id.clone(),
111        root: request.root.clone(),
112        globs: request.globs,
113        gitignore: request.gitignore,
114        kinds: request.kinds,
115    };
116    let debounce = Duration::from_millis(request.debounce_ms);
117    let worker = thread::Builder::new()
118        .name(format!("harn-fs-watch-{subscription_id}"))
119        .spawn(move || watch_worker(rx, debounce, filter))
120        .map_err(|err| HostlibError::Backend {
121            builtin: SUBSCRIBE_BUILTIN,
122            message: format!("failed to spawn watch worker: {err}"),
123        })?;
124
125    let notify_tx = tx.clone();
126    let mut watcher = notify::recommended_watcher(move |result: notify::Result<Event>| {
127        let message = match result {
128            Ok(event) => WatchMessage::Event(event),
129            Err(err) => WatchMessage::Error(err.to_string()),
130        };
131        let _ = notify_tx.send(message);
132    })
133    .map_err(|err| HostlibError::Backend {
134        builtin: SUBSCRIBE_BUILTIN,
135        message: format!("failed to create watcher: {err}"),
136    })?;
137
138    let mode = if request.recursive {
139        RecursiveMode::Recursive
140    } else {
141        RecursiveMode::NonRecursive
142    };
143    for path in &request.watch_paths {
144        watcher
145            .watch(path, mode)
146            .map_err(|err| HostlibError::Backend {
147                builtin: SUBSCRIBE_BUILTIN,
148                message: format!("failed to watch {}: {err}", path.display()),
149            })?;
150    }
151
152    subscriptions()
153        .lock()
154        .expect("fs_watch mutex poisoned")
155        .insert(
156            subscription_id.clone(),
157            Subscription {
158                _watcher: watcher,
159                stop_tx: tx,
160                worker: Some(worker),
161            },
162        );
163
164    Ok(build_dict([(
165        "subscription_id",
166        str_value(subscription_id.as_str()),
167    )]))
168}
169
170fn unsubscribe(args: &[VmValue]) -> Result<VmValue, HostlibError> {
171    let raw = dict_arg(UNSUBSCRIBE_BUILTIN, args)?;
172    let dict = raw.as_ref();
173    let subscription_id = match dict.get("subscription_id") {
174        Some(VmValue::String(value)) if !value.trim().is_empty() => value.to_string(),
175        Some(other) => {
176            return Err(HostlibError::InvalidParameter {
177                builtin: UNSUBSCRIBE_BUILTIN,
178                param: "subscription_id",
179                message: format!("expected non-empty string, got {}", other.type_name()),
180            });
181        }
182        None => {
183            return Err(HostlibError::MissingParameter {
184                builtin: UNSUBSCRIBE_BUILTIN,
185                param: "subscription_id",
186            });
187        }
188    };
189    let removed = subscriptions()
190        .lock()
191        .expect("fs_watch mutex poisoned")
192        .remove(&subscription_id)
193        .is_some();
194    Ok(build_dict([("removed", VmValue::Bool(removed))]))
195}
196
197struct SubscribeRequest {
198    session_id: String,
199    root: PathBuf,
200    watch_paths: Vec<PathBuf>,
201    recursive: bool,
202    debounce_ms: u64,
203    globs: Option<GlobSet>,
204    gitignore: Option<Gitignore>,
205    kinds: BTreeSet<String>,
206}
207
208impl SubscribeRequest {
209    fn from_dict(dict: &BTreeMap<String, VmValue>) -> Result<Self, HostlibError> {
210        let root_param = optional_string(SUBSCRIBE_BUILTIN, dict, "root")?;
211        let raw_paths = optional_string_list(SUBSCRIBE_BUILTIN, dict, "paths")?;
212        let raw_globs = optional_string_list(SUBSCRIBE_BUILTIN, dict, "globs")?;
213        let session_id = optional_string(SUBSCRIBE_BUILTIN, dict, "session_id")?
214            .or_else(harn_vm::agent_sessions::current_session_id)
215            .ok_or(HostlibError::MissingParameter {
216                builtin: SUBSCRIBE_BUILTIN,
217                param: "session_id",
218            })?;
219
220        if session_id.trim().is_empty() {
221            return Err(HostlibError::InvalidParameter {
222                builtin: SUBSCRIBE_BUILTIN,
223                param: "session_id",
224                message: "must not be empty".to_string(),
225            });
226        }
227
228        if root_param.is_none() && raw_paths.is_none() {
229            return Err(HostlibError::MissingParameter {
230                builtin: SUBSCRIBE_BUILTIN,
231                param: "root",
232            });
233        }
234
235        let root = match root_param.as_deref() {
236            Some(root) => normalize_existing_path(SUBSCRIBE_BUILTIN, "root", root)?,
237            None => std::env::current_dir().map_err(|err| HostlibError::Backend {
238                builtin: SUBSCRIBE_BUILTIN,
239                message: format!("failed to resolve current directory: {err}"),
240            })?,
241        };
242
243        let raw_paths = raw_paths.unwrap_or_else(|| {
244            root_param
245                .as_ref()
246                .map(|root| vec![root.clone()])
247                .unwrap_or_default()
248        });
249        if raw_paths.is_empty() {
250            return Err(HostlibError::InvalidParameter {
251                builtin: SUBSCRIBE_BUILTIN,
252                param: "paths",
253                message: "must contain at least one path".to_string(),
254            });
255        }
256
257        let mut watch_paths = Vec::with_capacity(raw_paths.len());
258        for path in raw_paths {
259            let path = PathBuf::from(path);
260            let resolved = if path.is_relative() && root_param.is_some() {
261                root.join(path)
262            } else {
263                path
264            };
265            let normalized = normalize_existing_path_buf(SUBSCRIBE_BUILTIN, "paths", &resolved)?;
266            if normalized.strip_prefix(&root).is_err() {
267                return Err(HostlibError::InvalidParameter {
268                    builtin: SUBSCRIBE_BUILTIN,
269                    param: "paths",
270                    message: format!(
271                        "watch path `{}` is outside root `{}`",
272                        normalized.display(),
273                        root.display()
274                    ),
275                });
276            }
277            watch_paths.push(normalized);
278        }
279
280        let recursive = optional_bool(SUBSCRIBE_BUILTIN, dict, "recursive", true)?;
281        let debounce_ms = optional_int(
282            SUBSCRIBE_BUILTIN,
283            dict,
284            "debounce_ms",
285            DEFAULT_DEBOUNCE_MS as i64,
286        )?;
287        if debounce_ms < 0 {
288            return Err(HostlibError::InvalidParameter {
289                builtin: SUBSCRIBE_BUILTIN,
290                param: "debounce_ms",
291                message: "must be >= 0".to_string(),
292            });
293        }
294        let respect_gitignore = optional_bool(SUBSCRIBE_BUILTIN, dict, "respect_gitignore", false)?;
295
296        Ok(Self {
297            session_id,
298            gitignore: if respect_gitignore {
299                Some(build_gitignore(&root))
300            } else {
301                None
302            },
303            globs: build_globs(raw_globs.unwrap_or_default())?,
304            kinds: parse_kinds(dict)?,
305            root,
306            watch_paths,
307            recursive,
308            debounce_ms: debounce_ms as u64,
309        })
310    }
311}
312
313fn watch_worker(rx: mpsc::Receiver<WatchMessage>, debounce: Duration, filter: WatchFilter) {
314    let mut pending = Vec::new();
315    loop {
316        match rx.recv() {
317            Ok(WatchMessage::Event(event)) => {
318                pending.push(event);
319                loop {
320                    match rx.recv_timeout(debounce) {
321                        Ok(WatchMessage::Event(event)) => pending.push(event),
322                        Ok(WatchMessage::Error(error)) => emit_watch_error(&filter, error),
323                        Ok(WatchMessage::Stop) | Err(mpsc::RecvTimeoutError::Disconnected) => {
324                            emit_pending(&filter, &mut pending);
325                            return;
326                        }
327                        Err(mpsc::RecvTimeoutError::Timeout) => break,
328                    }
329                }
330                emit_pending(&filter, &mut pending);
331            }
332            Ok(WatchMessage::Error(error)) => emit_watch_error(&filter, error),
333            Ok(WatchMessage::Stop) | Err(_) => return,
334        }
335    }
336}
337
338fn emit_pending(filter: &WatchFilter, pending: &mut Vec<Event>) {
339    if pending.is_empty() {
340        return;
341    }
342    let events = coalesce_events(std::mem::take(pending), filter);
343    if events.is_empty() {
344        return;
345    }
346    harn_vm::agent_events::emit_event(&AgentEvent::FsWatch {
347        session_id: filter.session_id.clone(),
348        subscription_id: filter.subscription_id.clone(),
349        events,
350    });
351}
352
353fn emit_watch_error(filter: &WatchFilter, error: String) {
354    harn_vm::agent_events::emit_event(&AgentEvent::FsWatch {
355        session_id: filter.session_id.clone(),
356        subscription_id: filter.subscription_id.clone(),
357        events: vec![FsWatchEvent {
358            kind: "error".to_string(),
359            paths: Vec::new(),
360            relative_paths: Vec::new(),
361            raw_kind: "error".to_string(),
362            error: Some(error),
363        }],
364    });
365}
366
367fn coalesce_events(events: Vec<Event>, filter: &WatchFilter) -> Vec<FsWatchEvent> {
368    let mut seen = BTreeSet::new();
369    let mut output = Vec::new();
370    for event in events {
371        let kind = normalize_kind(&event.kind);
372        if !filter.kinds.contains(kind) {
373            continue;
374        }
375        let mut paths = Vec::new();
376        let mut relative_paths = Vec::new();
377        for path in &event.paths {
378            if !filter.matches_path(path) {
379                continue;
380            }
381            paths.push(path_to_string(path));
382            relative_paths.push(filter.relative_path(path));
383        }
384        if paths.is_empty() {
385            continue;
386        }
387        paths.sort();
388        paths.dedup();
389        relative_paths.sort();
390        relative_paths.dedup();
391        let raw_kind = format!("{:?}", event.kind);
392        if !seen.insert((kind.to_string(), paths.clone(), raw_kind.clone())) {
393            continue;
394        }
395        output.push(FsWatchEvent {
396            kind: kind.to_string(),
397            paths,
398            relative_paths,
399            raw_kind,
400            error: None,
401        });
402    }
403    output
404}
405
406impl WatchFilter {
407    fn matches_path(&self, path: &Path) -> bool {
408        if let Some(gitignore) = &self.gitignore {
409            if gitignore.matched(path, path.is_dir()).is_ignore() {
410                return false;
411            }
412        }
413        if let Some(globs) = &self.globs {
414            let relative = self.relative_path(path);
415            return globs.is_match(relative);
416        }
417        true
418    }
419
420    fn relative_path(&self, path: &Path) -> String {
421        let relative = path.strip_prefix(&self.root).unwrap_or(path);
422        let value = path_to_string(relative);
423        if value.is_empty() {
424            ".".to_string()
425        } else {
426            value
427        }
428    }
429}
430
431fn normalize_kind(kind: &EventKind) -> &'static str {
432    match kind {
433        EventKind::Create(_) => "create",
434        EventKind::Remove(_) => "remove",
435        EventKind::Modify(ModifyKind::Name(
436            RenameMode::Any
437            | RenameMode::To
438            | RenameMode::From
439            | RenameMode::Both
440            | RenameMode::Other,
441        )) => "rename",
442        EventKind::Modify(_) | EventKind::Any => "modify",
443        EventKind::Access(_) => "access",
444        EventKind::Other => "other",
445    }
446}
447
448fn parse_kinds(dict: &BTreeMap<String, VmValue>) -> Result<BTreeSet<String>, HostlibError> {
449    let values = optional_string_list(SUBSCRIBE_BUILTIN, dict, "kinds")?.unwrap_or_else(|| {
450        vec![
451            "create".to_string(),
452            "modify".to_string(),
453            "remove".to_string(),
454            "rename".to_string(),
455        ]
456    });
457    let mut kinds = BTreeSet::new();
458    for kind in values {
459        match kind.as_str() {
460            "create" | "modify" | "remove" | "rename" => {
461                kinds.insert(kind);
462            }
463            _ => {
464                return Err(HostlibError::InvalidParameter {
465                    builtin: SUBSCRIBE_BUILTIN,
466                    param: "kinds",
467                    message: format!("unsupported event kind `{kind}`"),
468                });
469            }
470        }
471    }
472    Ok(kinds)
473}
474
475fn build_globs(globs: Vec<String>) -> Result<Option<GlobSet>, HostlibError> {
476    if globs.is_empty() {
477        return Ok(None);
478    }
479    let mut builder = GlobSetBuilder::new();
480    for glob in globs {
481        let normalized = normalize_glob(&glob);
482        builder.add(
483            Glob::new(&normalized).map_err(|err| HostlibError::InvalidParameter {
484                builtin: SUBSCRIBE_BUILTIN,
485                param: "globs",
486                message: format!("invalid glob `{glob}`: {err}"),
487            })?,
488        );
489    }
490    Ok(Some(builder.build().map_err(|err| {
491        HostlibError::InvalidParameter {
492            builtin: SUBSCRIBE_BUILTIN,
493            param: "globs",
494            message: format!("invalid glob set: {err}"),
495        }
496    })?))
497}
498
499fn build_gitignore(root: &Path) -> Gitignore {
500    let mut builder = GitignoreBuilder::new(root);
501    let gitignore = root.join(".gitignore");
502    if gitignore.exists() {
503        let _ = builder.add(gitignore);
504    }
505    let exclude = root.join(".git").join("info").join("exclude");
506    if exclude.exists() {
507        let _ = builder.add(exclude);
508    }
509    builder.build().unwrap_or_else(|_| Gitignore::empty())
510}
511
/// Normalizes a user glob for matching against root-relative, `/`-separated paths.
///
/// - Backslashes become `/`.
/// - Leading `./` segments are removed. The relative paths the filter matches
///   never start with `./`, so such a pattern could otherwise never match. A
///   `./` prefix marks the pattern as anchored, so `**/` is not added to it.
/// - Any other pattern without a `/` (except the bare `*`) is prefixed with
///   `**/` so that, e.g., `*.rs` matches at any depth.
/// - A pattern that is nothing but `./` is returned unchanged.
fn normalize_glob(glob: &str) -> String {
    let glob = glob.replace('\\', "/");
    let mut trimmed = glob.as_str();
    while let Some(rest) = trimmed.strip_prefix("./") {
        trimmed = rest;
    }
    if trimmed.is_empty() {
        return glob;
    }
    let anchored = trimmed.len() != glob.len();
    if anchored || trimmed == "*" || trimmed.contains('/') {
        trimmed.to_string()
    } else {
        format!("**/{trimmed}")
    }
}
520
521fn optional_string_list(
522    builtin: &'static str,
523    dict: &BTreeMap<String, VmValue>,
524    key: &'static str,
525) -> Result<Option<Vec<String>>, HostlibError> {
526    let Some(value) = dict.get(key) else {
527        return Ok(None);
528    };
529    match value {
530        VmValue::Nil => Ok(None),
531        VmValue::List(items) => items
532            .iter()
533            .enumerate()
534            .map(|(idx, item)| match item {
535                VmValue::String(value) => Ok(value.to_string()),
536                other => Err(HostlibError::InvalidParameter {
537                    builtin,
538                    param: key,
539                    message: format!("item {idx} must be a string, got {}", other.type_name()),
540                }),
541            })
542            .collect::<Result<Vec<_>, _>>()
543            .map(Some),
544        other => Err(HostlibError::InvalidParameter {
545            builtin,
546            param: key,
547            message: format!("expected list of strings, got {}", other.type_name()),
548        }),
549    }
550}
551
552fn normalize_existing_path(
553    builtin: &'static str,
554    param: &'static str,
555    path: &str,
556) -> Result<PathBuf, HostlibError> {
557    normalize_existing_path_buf(builtin, param, &PathBuf::from(path))
558}
559
560fn normalize_existing_path_buf(
561    builtin: &'static str,
562    param: &'static str,
563    path: &Path,
564) -> Result<PathBuf, HostlibError> {
565    path.canonicalize()
566        .map_err(|err| HostlibError::InvalidParameter {
567            builtin,
568            param,
569            message: format!(
570                "{} does not resolve to an existing path: {err}",
571                path.display()
572            ),
573        })
574}
575
/// Renders `path` as a string using `/` as the only separator.
///
/// Non-UTF-8 sequences are replaced lossily.
fn path_to_string(path: &Path) -> String {
    path.to_string_lossy()
        .chars()
        .map(|ch| if ch == '\\' { '/' } else { ch })
        .collect()
}
579
580fn next_subscription_id() -> String {
581    let seq = NEXT_SUBSCRIPTION_ID.fetch_add(1, Ordering::Relaxed);
582    let millis = SystemTime::now()
583        .duration_since(UNIX_EPOCH)
584        .map(|duration| duration.as_millis())
585        .unwrap_or(0);
586    format!("fsw-{millis}-{seq}")
587}
588
#[cfg(test)]
mod tests {
    use super::*;
    use notify::event::{CreateKind, RemoveKind};

    /// Builds a `notify` event carrying exactly one path.
    fn single_path_event(kind: EventKind, path: impl Into<PathBuf>) -> Event {
        let path: PathBuf = path.into();
        Event::new(kind).add_path(path)
    }

    /// Filter rooted at `root` with the default kinds, no gitignore and,
    /// optionally, the given glob patterns.
    fn make_filter(root: PathBuf, globs: Option<Vec<&str>>) -> WatchFilter {
        let globs = globs.map(|patterns| {
            let owned: Vec<String> = patterns.iter().map(|p| p.to_string()).collect();
            build_globs(owned)
                .expect("test globs compile")
                .expect("a non-empty pattern list yields a glob set")
        });
        WatchFilter {
            session_id: "session".to_string(),
            subscription_id: "sub".to_string(),
            root,
            globs,
            gitignore: None,
            kinds: parse_kinds(&BTreeMap::new()).expect("default kinds are valid"),
        }
    }

    #[test]
    fn coalesce_deduplicates_same_kind_and_path() {
        let root = std::env::current_dir().unwrap();
        let path = root.join("src/lib.rs");
        let watch_filter = make_filter(root, None);
        let modify = || EventKind::Modify(ModifyKind::Any);

        let batch = vec![
            single_path_event(modify(), &path),
            single_path_event(modify(), &path),
        ];
        let coalesced = coalesce_events(batch, &watch_filter);

        assert_eq!(coalesced.len(), 1);
        assert_eq!(coalesced[0].kind, "modify");
    }

    #[test]
    fn glob_filter_uses_relative_paths() {
        let root = std::env::current_dir().unwrap();
        let watch_filter = make_filter(root.clone(), Some(vec!["*.rs"]));
        let created = || EventKind::Create(CreateKind::Any);

        let batch = vec![
            single_path_event(created(), root.join("src/lib.rs")),
            single_path_event(created(), root.join("README.md")),
        ];
        let coalesced = coalesce_events(batch, &watch_filter);

        assert_eq!(coalesced.len(), 1);
        assert_eq!(coalesced[0].relative_paths, vec!["src/lib.rs"]);
    }

    #[test]
    fn kind_filter_drops_unrequested_events() {
        let root = std::env::current_dir().unwrap();
        let lib = root.join("src/lib.rs");
        let mut watch_filter = make_filter(root, None);
        watch_filter.kinds = BTreeSet::from(["remove".to_string()]);

        let batch = vec![
            single_path_event(EventKind::Create(CreateKind::Any), &lib),
            single_path_event(EventKind::Remove(RemoveKind::Any), &lib),
        ];
        let coalesced = coalesce_events(batch, &watch_filter);

        assert_eq!(coalesced.len(), 1);
        assert_eq!(coalesced[0].kind, "remove");
    }

    #[test]
    fn gitignore_filter_drops_ignored_paths() {
        let temp = tempfile::tempdir().unwrap();
        let dir = temp.path();
        std::fs::write(dir.join(".gitignore"), "ignored.txt\n").unwrap();
        let mut watch_filter = make_filter(dir.to_path_buf(), None);
        watch_filter.gitignore = Some(build_gitignore(dir));
        let modify = || EventKind::Modify(ModifyKind::Any);

        let batch = vec![
            single_path_event(modify(), dir.join("allowed.txt")),
            single_path_event(modify(), dir.join("ignored.txt")),
        ];
        let coalesced = coalesce_events(batch, &watch_filter);

        assert_eq!(coalesced.len(), 1);
        assert_eq!(coalesced[0].relative_paths, vec!["allowed.txt"]);
    }
}