Skip to main content

harn_hostlib/fs_watch/
mod.rs

1//! File-system watch host capability.
2//!
3//! Wraps `notify` to deliver coalesced file-change batches into Harn's
4//! session-scoped `AgentEvent` stream.
5
6use std::collections::{BTreeMap, BTreeSet, HashMap};
7use std::path::{Path, PathBuf};
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::sync::{mpsc, Mutex, OnceLock};
10use std::thread;
11use std::time::{Duration, SystemTime, UNIX_EPOCH};
12
13use globset::{Glob, GlobSet, GlobSetBuilder};
14use harn_vm::agent_events::{AgentEvent, FsWatchEvent};
15use harn_vm::VmValue;
16use ignore::gitignore::{Gitignore, GitignoreBuilder};
17use notify::event::{ModifyKind, RenameMode};
18use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
19
20use crate::error::HostlibError;
21use crate::registry::{BuiltinRegistry, HostlibCapability, RegisteredBuiltin, SyncHandler};
22use crate::tools::args::{
23    build_dict, dict_arg, optional_bool, optional_int, optional_string, str_value,
24};
25
/// Builtin name registered for `fs_watch.subscribe`; also reported in its errors.
26const SUBSCRIBE_BUILTIN: &str = "hostlib_fs_watch_subscribe";
/// Builtin name registered for `fs_watch.unsubscribe`; also reported in its errors.
27const UNSUBSCRIBE_BUILTIN: &str = "hostlib_fs_watch_unsubscribe";
/// Debounce window, in milliseconds, applied when a request omits `debounce_ms`.
28const DEFAULT_DEBOUNCE_MS: u64 = 50;
29
/// Process-wide sequence number folded into every generated subscription id.
30static NEXT_SUBSCRIPTION_ID: AtomicU64 = AtomicU64::new(1);
31
32/// File-watch capability handle.
33#[derive(Default)]
34pub struct FsWatchCapability;
35
36impl HostlibCapability for FsWatchCapability {
37    fn module_name(&self) -> &'static str {
38        "fs_watch"
39    }
40
41    fn register_builtins(&self, registry: &mut BuiltinRegistry) {
42        registry.register(RegisteredBuiltin {
43            name: SUBSCRIBE_BUILTIN,
44            module: "fs_watch",
45            method: "subscribe",
46            handler: subscribe_handler(),
47        });
48        registry.register(RegisteredBuiltin {
49            name: UNSUBSCRIBE_BUILTIN,
50            module: "fs_watch",
51            method: "unsubscribe",
52            handler: unsubscribe_handler(),
53        });
54    }
55}
56
57fn subscribe_handler() -> SyncHandler {
58    std::sync::Arc::new(subscribe)
59}
60
61fn unsubscribe_handler() -> SyncHandler {
62    std::sync::Arc::new(unsubscribe)
63}
64
65struct Subscription {
66    _watcher: RecommendedWatcher,
67    stop_tx: mpsc::Sender<WatchMessage>,
68    worker: Option<thread::JoinHandle<()>>,
69}
70
71impl Drop for Subscription {
72    fn drop(&mut self) {
73        let _ = self.stop_tx.send(WatchMessage::Stop);
74        if let Some(worker) = self.worker.take() {
75            let _ = worker.join();
76        }
77    }
78}
79
/// Messages delivered to `watch_worker` over the subscription's channel.
80enum WatchMessage {
    /// A raw `notify` event forwarded by the watcher callback.
81    Event(Event),
    /// A watcher failure, already rendered to text by the callback.
82    Error(String),
    /// Asks the worker to exit; a batch that is mid-debounce is flushed first.
83    Stop,
84}
85
/// Per-subscription routing and filtering state owned by the watch worker.
86#[derive(Clone)]
87struct WatchFilter {
    /// Session id stamped on every emitted `AgentEvent::FsWatch`.
88    session_id: String,
    /// Subscription id stamped on every emitted `AgentEvent::FsWatch`.
89    subscription_id: String,
    /// Prefix stripped from event paths to produce `relative_paths`.
90    root: PathBuf,
    /// When set, only paths whose root-relative form matches are kept.
91    globs: Option<GlobSet>,
    /// When set, paths it reports as ignored are dropped.
92    gitignore: Option<Gitignore>,
    /// Normalized event kinds (see `normalize_kind`) that are let through.
93    kinds: BTreeSet<String>,
94}
95
96fn subscriptions() -> &'static Mutex<HashMap<String, Subscription>> {
97    static SUBSCRIPTIONS: OnceLock<Mutex<HashMap<String, Subscription>>> = OnceLock::new();
98    SUBSCRIPTIONS.get_or_init(|| Mutex::new(HashMap::new()))
99}
100
101fn subscribe(args: &[VmValue]) -> Result<VmValue, HostlibError> {
102    let raw = dict_arg(SUBSCRIBE_BUILTIN, args)?;
103    let dict = raw.as_ref();
104    let request = SubscribeRequest::from_dict(dict)?;
105    let subscription_id = next_subscription_id();
106    let (tx, rx) = mpsc::channel();
107
108    let filter = WatchFilter {
109        session_id: request.session_id.clone(),
110        subscription_id: subscription_id.clone(),
111        root: request.root.clone(),
112        globs: request.globs,
113        gitignore: request.gitignore,
114        kinds: request.kinds,
115    };
116    let debounce = Duration::from_millis(request.debounce_ms);
117    let worker = thread::Builder::new()
118        .name(format!("harn-fs-watch-{subscription_id}"))
119        .spawn(move || watch_worker(rx, debounce, filter))
120        .map_err(|err| HostlibError::Backend {
121            builtin: SUBSCRIBE_BUILTIN,
122            message: format!("failed to spawn watch worker: {err}"),
123        })?;
124
125    let notify_tx = tx.clone();
126    let mut watcher = notify::recommended_watcher(move |result: notify::Result<Event>| {
127        let message = match result {
128            Ok(event) => WatchMessage::Event(event),
129            Err(err) => WatchMessage::Error(err.to_string()),
130        };
131        let _ = notify_tx.send(message);
132    })
133    .map_err(|err| HostlibError::Backend {
134        builtin: SUBSCRIBE_BUILTIN,
135        message: format!("failed to create watcher: {err}"),
136    })?;
137
138    let mode = if request.recursive {
139        RecursiveMode::Recursive
140    } else {
141        RecursiveMode::NonRecursive
142    };
143    for path in &request.watch_paths {
144        watcher
145            .watch(path, mode)
146            .map_err(|err| HostlibError::Backend {
147                builtin: SUBSCRIBE_BUILTIN,
148                message: format!("failed to watch {}: {err}", path.display()),
149            })?;
150    }
151
152    subscriptions()
153        .lock()
154        .expect("fs_watch mutex poisoned")
155        .insert(
156            subscription_id.clone(),
157            Subscription {
158                _watcher: watcher,
159                stop_tx: tx,
160                worker: Some(worker),
161            },
162        );
163
164    Ok(build_dict([(
165        "subscription_id",
166        str_value(subscription_id.as_str()),
167    )]))
168}
169
170fn unsubscribe(args: &[VmValue]) -> Result<VmValue, HostlibError> {
171    let raw = dict_arg(UNSUBSCRIBE_BUILTIN, args)?;
172    let dict = raw.as_ref();
173    let subscription_id = match dict.get("subscription_id") {
174        Some(VmValue::String(value)) if !value.trim().is_empty() => value.to_string(),
175        Some(other) => {
176            return Err(HostlibError::InvalidParameter {
177                builtin: UNSUBSCRIBE_BUILTIN,
178                param: "subscription_id",
179                message: format!("expected non-empty string, got {}", other.type_name()),
180            });
181        }
182        None => {
183            return Err(HostlibError::MissingParameter {
184                builtin: UNSUBSCRIBE_BUILTIN,
185                param: "subscription_id",
186            });
187        }
188    };
189    let removed = subscriptions()
190        .lock()
191        .expect("fs_watch mutex poisoned")
192        .remove(&subscription_id)
193        .is_some();
194    Ok(build_dict([("removed", VmValue::Bool(removed))]))
195}
196
197struct SubscribeRequest {
198    session_id: String,
199    root: PathBuf,
200    watch_paths: Vec<PathBuf>,
201    recursive: bool,
202    debounce_ms: u64,
203    globs: Option<GlobSet>,
204    gitignore: Option<Gitignore>,
205    kinds: BTreeSet<String>,
206}
207
208impl SubscribeRequest {
209    fn from_dict(dict: &BTreeMap<String, VmValue>) -> Result<Self, HostlibError> {
210        let root_param = optional_string(SUBSCRIBE_BUILTIN, dict, "root")?;
211        let raw_paths = optional_string_list(SUBSCRIBE_BUILTIN, dict, "paths")?;
212        let raw_globs = optional_string_list(SUBSCRIBE_BUILTIN, dict, "globs")?;
213        let session_id = optional_string(SUBSCRIBE_BUILTIN, dict, "session_id")?
214            .or_else(harn_vm::agent_sessions::current_session_id)
215            .ok_or(HostlibError::MissingParameter {
216                builtin: SUBSCRIBE_BUILTIN,
217                param: "session_id",
218            })?;
219
220        if session_id.trim().is_empty() {
221            return Err(HostlibError::InvalidParameter {
222                builtin: SUBSCRIBE_BUILTIN,
223                param: "session_id",
224                message: "must not be empty".to_string(),
225            });
226        }
227
228        if root_param.is_none() && raw_paths.is_none() {
229            return Err(HostlibError::MissingParameter {
230                builtin: SUBSCRIBE_BUILTIN,
231                param: "root",
232            });
233        }
234
235        let root = match root_param.as_deref() {
236            Some(root) => normalize_existing_path(SUBSCRIBE_BUILTIN, "root", root)?,
237            None => std::env::current_dir().map_err(|err| HostlibError::Backend {
238                builtin: SUBSCRIBE_BUILTIN,
239                message: format!("failed to resolve current directory: {err}"),
240            })?,
241        };
242
243        let raw_paths = raw_paths.unwrap_or_else(|| {
244            root_param
245                .as_ref()
246                .map(|root| vec![root.clone()])
247                .unwrap_or_default()
248        });
249        if raw_paths.is_empty() {
250            return Err(HostlibError::InvalidParameter {
251                builtin: SUBSCRIBE_BUILTIN,
252                param: "paths",
253                message: "must contain at least one path".to_string(),
254            });
255        }
256
257        let mut watch_paths = Vec::with_capacity(raw_paths.len());
258        for path in raw_paths {
259            let path = PathBuf::from(path);
260            let resolved = if path.is_relative() && root_param.is_some() {
261                root.join(path)
262            } else {
263                path
264            };
265            watch_paths.push(normalize_existing_path_buf(
266                SUBSCRIBE_BUILTIN,
267                "paths",
268                &resolved,
269            )?);
270        }
271
272        let recursive = optional_bool(SUBSCRIBE_BUILTIN, dict, "recursive", true)?;
273        let debounce_ms = optional_int(
274            SUBSCRIBE_BUILTIN,
275            dict,
276            "debounce_ms",
277            DEFAULT_DEBOUNCE_MS as i64,
278        )?;
279        if debounce_ms < 0 {
280            return Err(HostlibError::InvalidParameter {
281                builtin: SUBSCRIBE_BUILTIN,
282                param: "debounce_ms",
283                message: "must be >= 0".to_string(),
284            });
285        }
286        let respect_gitignore = optional_bool(SUBSCRIBE_BUILTIN, dict, "respect_gitignore", false)?;
287
288        Ok(Self {
289            session_id,
290            gitignore: if respect_gitignore {
291                Some(build_gitignore(&root))
292            } else {
293                None
294            },
295            globs: build_globs(raw_globs.unwrap_or_default())?,
296            kinds: parse_kinds(dict)?,
297            root,
298            watch_paths,
299            recursive,
300            debounce_ms: debounce_ms as u64,
301        })
302    }
303}
304
305fn watch_worker(rx: mpsc::Receiver<WatchMessage>, debounce: Duration, filter: WatchFilter) {
306    let mut pending = Vec::new();
307    loop {
308        match rx.recv() {
309            Ok(WatchMessage::Event(event)) => {
310                pending.push(event);
311                loop {
312                    match rx.recv_timeout(debounce) {
313                        Ok(WatchMessage::Event(event)) => pending.push(event),
314                        Ok(WatchMessage::Error(error)) => emit_watch_error(&filter, error),
315                        Ok(WatchMessage::Stop) | Err(mpsc::RecvTimeoutError::Disconnected) => {
316                            emit_pending(&filter, &mut pending);
317                            return;
318                        }
319                        Err(mpsc::RecvTimeoutError::Timeout) => break,
320                    }
321                }
322                emit_pending(&filter, &mut pending);
323            }
324            Ok(WatchMessage::Error(error)) => emit_watch_error(&filter, error),
325            Ok(WatchMessage::Stop) | Err(_) => return,
326        }
327    }
328}
329
330fn emit_pending(filter: &WatchFilter, pending: &mut Vec<Event>) {
331    if pending.is_empty() {
332        return;
333    }
334    let events = coalesce_events(std::mem::take(pending), filter);
335    if events.is_empty() {
336        return;
337    }
338    harn_vm::agent_events::emit_event(&AgentEvent::FsWatch {
339        session_id: filter.session_id.clone(),
340        subscription_id: filter.subscription_id.clone(),
341        events,
342    });
343}
344
345fn emit_watch_error(filter: &WatchFilter, error: String) {
346    harn_vm::agent_events::emit_event(&AgentEvent::FsWatch {
347        session_id: filter.session_id.clone(),
348        subscription_id: filter.subscription_id.clone(),
349        events: vec![FsWatchEvent {
350            kind: "error".to_string(),
351            paths: Vec::new(),
352            relative_paths: Vec::new(),
353            raw_kind: "error".to_string(),
354            error: Some(error),
355        }],
356    });
357}
358
359fn coalesce_events(events: Vec<Event>, filter: &WatchFilter) -> Vec<FsWatchEvent> {
360    let mut seen = BTreeSet::new();
361    let mut output = Vec::new();
362    for event in events {
363        let kind = normalize_kind(&event.kind);
364        if !filter.kinds.contains(kind) {
365            continue;
366        }
367        let mut paths = Vec::new();
368        let mut relative_paths = Vec::new();
369        for path in &event.paths {
370            if !filter.matches_path(path) {
371                continue;
372            }
373            paths.push(path_to_string(path));
374            relative_paths.push(filter.relative_path(path));
375        }
376        if paths.is_empty() {
377            continue;
378        }
379        paths.sort();
380        paths.dedup();
381        relative_paths.sort();
382        relative_paths.dedup();
383        let raw_kind = format!("{:?}", event.kind);
384        if !seen.insert((kind.to_string(), paths.clone(), raw_kind.clone())) {
385            continue;
386        }
387        output.push(FsWatchEvent {
388            kind: kind.to_string(),
389            paths,
390            relative_paths,
391            raw_kind,
392            error: None,
393        });
394    }
395    output
396}
397
398impl WatchFilter {
399    fn matches_path(&self, path: &Path) -> bool {
400        if let Some(gitignore) = &self.gitignore {
401            if gitignore.matched(path, path.is_dir()).is_ignore() {
402                return false;
403            }
404        }
405        if let Some(globs) = &self.globs {
406            let relative = self.relative_path(path);
407            return globs.is_match(relative);
408        }
409        true
410    }
411
412    fn relative_path(&self, path: &Path) -> String {
413        let relative = path.strip_prefix(&self.root).unwrap_or(path);
414        let value = path_to_string(relative);
415        if value.is_empty() {
416            ".".to_string()
417        } else {
418            value
419        }
420    }
421}
422
423fn normalize_kind(kind: &EventKind) -> &'static str {
424    match kind {
425        EventKind::Create(_) => "create",
426        EventKind::Remove(_) => "remove",
427        EventKind::Modify(ModifyKind::Name(
428            RenameMode::Any
429            | RenameMode::To
430            | RenameMode::From
431            | RenameMode::Both
432            | RenameMode::Other,
433        )) => "rename",
434        EventKind::Modify(_) | EventKind::Any => "modify",
435        EventKind::Access(_) => "access",
436        EventKind::Other => "other",
437    }
438}
439
440fn parse_kinds(dict: &BTreeMap<String, VmValue>) -> Result<BTreeSet<String>, HostlibError> {
441    let values = optional_string_list(SUBSCRIBE_BUILTIN, dict, "kinds")?.unwrap_or_else(|| {
442        vec![
443            "create".to_string(),
444            "modify".to_string(),
445            "remove".to_string(),
446            "rename".to_string(),
447        ]
448    });
449    let mut kinds = BTreeSet::new();
450    for kind in values {
451        match kind.as_str() {
452            "create" | "modify" | "remove" | "rename" => {
453                kinds.insert(kind);
454            }
455            _ => {
456                return Err(HostlibError::InvalidParameter {
457                    builtin: SUBSCRIBE_BUILTIN,
458                    param: "kinds",
459                    message: format!("unsupported event kind `{kind}`"),
460                });
461            }
462        }
463    }
464    Ok(kinds)
465}
466
467fn build_globs(globs: Vec<String>) -> Result<Option<GlobSet>, HostlibError> {
468    if globs.is_empty() {
469        return Ok(None);
470    }
471    let mut builder = GlobSetBuilder::new();
472    for glob in globs {
473        let normalized = normalize_glob(&glob);
474        builder.add(
475            Glob::new(&normalized).map_err(|err| HostlibError::InvalidParameter {
476                builtin: SUBSCRIBE_BUILTIN,
477                param: "globs",
478                message: format!("invalid glob `{glob}`: {err}"),
479            })?,
480        );
481    }
482    Ok(Some(builder.build().map_err(|err| {
483        HostlibError::InvalidParameter {
484            builtin: SUBSCRIBE_BUILTIN,
485            param: "globs",
486            message: format!("invalid glob set: {err}"),
487        }
488    })?))
489}
490
491fn build_gitignore(root: &Path) -> Gitignore {
492    let mut builder = GitignoreBuilder::new(root);
493    let gitignore = root.join(".gitignore");
494    if gitignore.exists() {
495        let _ = builder.add(gitignore);
496    }
497    let exclude = root.join(".git").join("info").join("exclude");
498    if exclude.exists() {
499        let _ = builder.add(exclude);
500    }
501    builder.build().unwrap_or_else(|_| Gitignore::empty())
502}
503
/// Normalizes a user-supplied glob pattern.
///
/// Backslashes become `/`. A pattern without any `/` (other than the bare
/// `*`) is prefixed with `**/` so it applies at any depth; patterns that
/// already contain a `/` are returned as written.
fn normalize_glob(glob: &str) -> String {
    let unified = glob.replace('\\', "/");
    // A leading `**/` necessarily contains a separator, so checking for `/`
    // covers that case as well.
    let keep_as_is = unified == "*" || unified.contains('/');
    if keep_as_is {
        return unified;
    }
    format!("**/{unified}")
}
512
513fn optional_string_list(
514    builtin: &'static str,
515    dict: &BTreeMap<String, VmValue>,
516    key: &'static str,
517) -> Result<Option<Vec<String>>, HostlibError> {
518    let Some(value) = dict.get(key) else {
519        return Ok(None);
520    };
521    match value {
522        VmValue::Nil => Ok(None),
523        VmValue::List(items) => items
524            .iter()
525            .enumerate()
526            .map(|(idx, item)| match item {
527                VmValue::String(value) => Ok(value.to_string()),
528                other => Err(HostlibError::InvalidParameter {
529                    builtin,
530                    param: key,
531                    message: format!("item {idx} must be a string, got {}", other.type_name()),
532                }),
533            })
534            .collect::<Result<Vec<_>, _>>()
535            .map(Some),
536        other => Err(HostlibError::InvalidParameter {
537            builtin,
538            param: key,
539            message: format!("expected list of strings, got {}", other.type_name()),
540        }),
541    }
542}
543
544fn normalize_existing_path(
545    builtin: &'static str,
546    param: &'static str,
547    path: &str,
548) -> Result<PathBuf, HostlibError> {
549    normalize_existing_path_buf(builtin, param, &PathBuf::from(path))
550}
551
552fn normalize_existing_path_buf(
553    builtin: &'static str,
554    param: &'static str,
555    path: &Path,
556) -> Result<PathBuf, HostlibError> {
557    path.canonicalize()
558        .map_err(|err| HostlibError::InvalidParameter {
559            builtin,
560            param,
561            message: format!(
562                "{} does not resolve to an existing path: {err}",
563                path.display()
564            ),
565        })
566}
567
/// Renders a path as text with every backslash turned into `/`.
/// Non-UTF-8 sequences are replaced lossily.
fn path_to_string(path: &Path) -> String {
    path.to_string_lossy()
        .chars()
        .map(|ch| if ch == '\\' { '/' } else { ch })
        .collect()
}
571
572fn next_subscription_id() -> String {
573    let seq = NEXT_SUBSCRIPTION_ID.fetch_add(1, Ordering::Relaxed);
574    let millis = SystemTime::now()
575        .duration_since(UNIX_EPOCH)
576        .map(|duration| duration.as_millis())
577        .unwrap_or(0);
578    format!("fsw-{millis}-{seq}")
579}
580
#[cfg(test)]
mod tests {
    use super::*;
    use notify::event::{CreateKind, RemoveKind};

    /// Builds a single-path `notify` event of the given kind.
    fn event(kind: EventKind, path: impl Into<PathBuf>) -> Event {
        Event::new(kind).add_path(path.into())
    }

    /// Builds a filter rooted at `root` with the default kinds, no gitignore
    /// matcher, and an optional glob set.
    fn filter(root: PathBuf, globs: Option<Vec<&str>>) -> WatchFilter {
        let globs = globs.map(|patterns| {
            let owned: Vec<String> = patterns.iter().map(|pattern| pattern.to_string()).collect();
            build_globs(owned).unwrap().unwrap()
        });
        let kinds = parse_kinds(&BTreeMap::new()).unwrap();
        WatchFilter {
            session_id: "session".to_string(),
            subscription_id: "sub".to_string(),
            root,
            globs,
            gitignore: None,
            kinds,
        }
    }

    #[test]
    fn coalesce_deduplicates_same_kind_and_path() {
        let root = std::env::current_dir().unwrap();
        let target = root.join("src/lib.rs");
        let watch_filter = filter(root, None);

        let batch = vec![
            event(EventKind::Modify(ModifyKind::Any), &target),
            event(EventKind::Modify(ModifyKind::Any), &target),
        ];
        let events = coalesce_events(batch, &watch_filter);

        assert_eq!(events.len(), 1);
        assert_eq!(events[0].kind, "modify");
    }

    #[test]
    fn glob_filter_uses_relative_paths() {
        let root = std::env::current_dir().unwrap();
        let watch_filter = filter(root.clone(), Some(vec!["*.rs"]));

        let batch = vec![
            event(EventKind::Create(CreateKind::Any), root.join("src/lib.rs")),
            event(EventKind::Create(CreateKind::Any), root.join("README.md")),
        ];
        let events = coalesce_events(batch, &watch_filter);

        assert_eq!(events.len(), 1);
        assert_eq!(events[0].relative_paths, vec!["src/lib.rs"]);
    }

    #[test]
    fn kind_filter_drops_unrequested_events() {
        let root = std::env::current_dir().unwrap();
        let mut watch_filter = filter(root.clone(), None);
        watch_filter.kinds = BTreeSet::from(["remove".to_string()]);

        let target = root.join("src/lib.rs");
        let batch = vec![
            event(EventKind::Create(CreateKind::Any), target.clone()),
            event(EventKind::Remove(RemoveKind::Any), target),
        ];
        let events = coalesce_events(batch, &watch_filter);

        assert_eq!(events.len(), 1);
        assert_eq!(events[0].kind, "remove");
    }

    #[test]
    fn gitignore_filter_drops_ignored_paths() {
        let temp = tempfile::tempdir().unwrap();
        let dir = temp.path();
        std::fs::write(dir.join(".gitignore"), "ignored.txt\n").unwrap();

        let mut watch_filter = filter(dir.to_path_buf(), None);
        watch_filter.gitignore = Some(build_gitignore(dir));

        let batch = vec![
            event(EventKind::Modify(ModifyKind::Any), dir.join("allowed.txt")),
            event(EventKind::Modify(ModifyKind::Any), dir.join("ignored.txt")),
        ];
        let events = coalesce_events(batch, &watch_filter);

        assert_eq!(events.len(), 1);
        assert_eq!(events[0].relative_paths, vec!["allowed.txt"]);
    }
}
689}