1use std::collections::{BTreeMap, BTreeSet, HashMap};
7use std::path::{Path, PathBuf};
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::sync::{mpsc, Mutex, OnceLock};
10use std::thread;
11use std::time::{Duration, SystemTime, UNIX_EPOCH};
12
13use globset::{Glob, GlobSet, GlobSetBuilder};
14use harn_vm::agent_events::{AgentEvent, FsWatchEvent};
15use harn_vm::VmValue;
16use ignore::gitignore::{Gitignore, GitignoreBuilder};
17use notify::event::{ModifyKind, RenameMode};
18use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
19
20use crate::error::HostlibError;
21use crate::registry::{BuiltinRegistry, HostlibCapability, RegisteredBuiltin, SyncHandler};
22use crate::tools::args::{
23 build_dict, dict_arg, optional_bool, optional_int, optional_string, str_value,
24};
25
/// Registry name of the subscribe builtin; also used as the `builtin` tag on its errors.
const SUBSCRIBE_BUILTIN: &str = "hostlib_fs_watch_subscribe";
/// Registry name of the unsubscribe builtin; also used as the `builtin` tag on its errors.
const UNSUBSCRIBE_BUILTIN: &str = "hostlib_fs_watch_unsubscribe";
/// Quiet period (milliseconds) the worker waits for further events before flushing a batch,
/// used when the caller does not pass `debounce_ms`.
const DEFAULT_DEBOUNCE_MS: u64 = 50;
/// Event kinds delivered when the caller does not pass `kinds` (`access`/`other` are opt-in).
const DEFAULT_KINDS: &[&str] = &["create", "modify", "remove", "rename"];
/// Every value accepted in `kinds`; matches the labels produced by `normalize_kind`.
const SUPPORTED_KINDS: &[&str] = &["access", "create", "modify", "other", "remove", "rename"];

/// Sequence counter combined with a millisecond timestamp to build subscription ids.
static NEXT_SUBSCRIPTION_ID: AtomicU64 = AtomicU64::new(1);
33
34#[derive(Default)]
36pub struct FsWatchCapability;
37
38impl HostlibCapability for FsWatchCapability {
39 fn module_name(&self) -> &'static str {
40 "fs_watch"
41 }
42
43 fn register_builtins(&self, registry: &mut BuiltinRegistry) {
44 registry.register(RegisteredBuiltin {
45 name: SUBSCRIBE_BUILTIN,
46 module: "fs_watch",
47 method: "subscribe",
48 handler: subscribe_handler(),
49 });
50 registry.register(RegisteredBuiltin {
51 name: UNSUBSCRIBE_BUILTIN,
52 module: "fs_watch",
53 method: "unsubscribe",
54 handler: unsubscribe_handler(),
55 });
56 }
57}
58
59fn subscribe_handler() -> SyncHandler {
60 std::sync::Arc::new(subscribe)
61}
62
63fn unsubscribe_handler() -> SyncHandler {
64 std::sync::Arc::new(unsubscribe)
65}
66
67struct Subscription {
68 _watcher: RecommendedWatcher,
69 stop_tx: mpsc::Sender<WatchMessage>,
70 worker: Option<thread::JoinHandle<()>>,
71}
72
73impl Drop for Subscription {
74 fn drop(&mut self) {
75 let _ = self.stop_tx.send(WatchMessage::Stop);
76 if let Some(worker) = self.worker.take() {
77 let _ = worker.join();
78 }
79 }
80}
81
/// Messages delivered to a subscription's worker thread.
enum WatchMessage {
    /// A raw filesystem event forwarded from the notify callback.
    Event(Event),
    /// A watcher error, already rendered to text by the notify callback.
    Error(String),
    /// Sent by `Subscription::drop`: flush any pending batch and end the worker.
    Stop,
}
87
/// Per-subscription settings the worker uses to filter and label events.
#[derive(Clone)]
struct WatchFilter {
    /// Session the emitted `AgentEvent::FsWatch` events are addressed to.
    session_id: String,
    /// Id returned by `subscribe`; echoed on every emitted event.
    subscription_id: String,
    /// Directory that `relative_paths` and glob matching are computed against.
    root: PathBuf,
    /// When set, only paths whose root-relative form matches one of these globs are reported.
    globs: Option<GlobSet>,
    /// When set (`respect_gitignore: true`), paths matched as ignored are dropped.
    gitignore: Option<Gitignore>,
    /// Normalized kinds (see `normalize_kind`) the caller asked to receive.
    kinds: BTreeSet<String>,
}
97
98fn subscriptions() -> &'static Mutex<HashMap<String, Subscription>> {
99 static SUBSCRIPTIONS: OnceLock<Mutex<HashMap<String, Subscription>>> = OnceLock::new();
100 SUBSCRIPTIONS.get_or_init(|| Mutex::new(HashMap::new()))
101}
102
103fn subscribe(args: &[VmValue]) -> Result<VmValue, HostlibError> {
104 let raw = dict_arg(SUBSCRIBE_BUILTIN, args)?;
105 let dict = raw.as_ref();
106 let request = SubscribeRequest::from_dict(dict)?;
107 let subscription_id = next_subscription_id();
108 let (tx, rx) = mpsc::channel();
109
110 let filter = WatchFilter {
111 session_id: request.session_id.clone(),
112 subscription_id: subscription_id.clone(),
113 root: request.root.clone(),
114 globs: request.globs,
115 gitignore: request.gitignore,
116 kinds: request.kinds,
117 };
118 let debounce = Duration::from_millis(request.debounce_ms);
119 let worker = thread::Builder::new()
120 .name(format!("harn-fs-watch-{subscription_id}"))
121 .spawn(move || watch_worker(rx, debounce, filter))
122 .map_err(|err| HostlibError::Backend {
123 builtin: SUBSCRIBE_BUILTIN,
124 message: format!("failed to spawn watch worker: {err}"),
125 })?;
126
127 let notify_tx = tx.clone();
128 let mut watcher = notify::recommended_watcher(move |result: notify::Result<Event>| {
129 let message = match result {
130 Ok(event) => WatchMessage::Event(event),
131 Err(err) => WatchMessage::Error(err.to_string()),
132 };
133 let _ = notify_tx.send(message);
134 })
135 .map_err(|err| HostlibError::Backend {
136 builtin: SUBSCRIBE_BUILTIN,
137 message: format!("failed to create watcher: {err}"),
138 })?;
139
140 let mode = if request.recursive {
141 RecursiveMode::Recursive
142 } else {
143 RecursiveMode::NonRecursive
144 };
145 for path in &request.watch_paths {
146 watcher
147 .watch(path, mode)
148 .map_err(|err| HostlibError::Backend {
149 builtin: SUBSCRIBE_BUILTIN,
150 message: format!("failed to watch {}: {err}", path.display()),
151 })?;
152 }
153
154 subscriptions()
155 .lock()
156 .expect("fs_watch mutex poisoned")
157 .insert(
158 subscription_id.clone(),
159 Subscription {
160 _watcher: watcher,
161 stop_tx: tx,
162 worker: Some(worker),
163 },
164 );
165
166 Ok(build_dict([(
167 "subscription_id",
168 str_value(subscription_id.as_str()),
169 )]))
170}
171
172fn unsubscribe(args: &[VmValue]) -> Result<VmValue, HostlibError> {
173 let raw = dict_arg(UNSUBSCRIBE_BUILTIN, args)?;
174 let dict = raw.as_ref();
175 let subscription_id = match dict.get("subscription_id") {
176 Some(VmValue::String(value)) if !value.trim().is_empty() => value.to_string(),
177 Some(other) => {
178 return Err(HostlibError::InvalidParameter {
179 builtin: UNSUBSCRIBE_BUILTIN,
180 param: "subscription_id",
181 message: format!("expected non-empty string, got {}", other.type_name()),
182 });
183 }
184 None => {
185 return Err(HostlibError::MissingParameter {
186 builtin: UNSUBSCRIBE_BUILTIN,
187 param: "subscription_id",
188 });
189 }
190 };
191 let removed = subscriptions()
192 .lock()
193 .expect("fs_watch mutex poisoned")
194 .remove(&subscription_id)
195 .is_some();
196 Ok(build_dict([("removed", VmValue::Bool(removed))]))
197}
198
199struct SubscribeRequest {
200 session_id: String,
201 root: PathBuf,
202 watch_paths: Vec<PathBuf>,
203 recursive: bool,
204 debounce_ms: u64,
205 globs: Option<GlobSet>,
206 gitignore: Option<Gitignore>,
207 kinds: BTreeSet<String>,
208}
209
210impl SubscribeRequest {
211 fn from_dict(dict: &BTreeMap<String, VmValue>) -> Result<Self, HostlibError> {
212 let root_param = optional_string(SUBSCRIBE_BUILTIN, dict, "root")?;
213 let raw_paths = optional_string_list(SUBSCRIBE_BUILTIN, dict, "paths")?;
214 let raw_globs = optional_string_list(SUBSCRIBE_BUILTIN, dict, "globs")?;
215 let session_id = optional_string(SUBSCRIBE_BUILTIN, dict, "session_id")?
216 .or_else(harn_vm::agent_sessions::current_session_id)
217 .ok_or(HostlibError::MissingParameter {
218 builtin: SUBSCRIBE_BUILTIN,
219 param: "session_id",
220 })?;
221
222 if session_id.trim().is_empty() {
223 return Err(HostlibError::InvalidParameter {
224 builtin: SUBSCRIBE_BUILTIN,
225 param: "session_id",
226 message: "must not be empty".to_string(),
227 });
228 }
229
230 if root_param.is_none() && raw_paths.is_none() {
231 return Err(HostlibError::MissingParameter {
232 builtin: SUBSCRIBE_BUILTIN,
233 param: "root",
234 });
235 }
236
237 let root = match root_param.as_deref() {
238 Some(root) => normalize_existing_path(SUBSCRIBE_BUILTIN, "root", root)?,
239 None => std::env::current_dir().map_err(|err| HostlibError::Backend {
240 builtin: SUBSCRIBE_BUILTIN,
241 message: format!("failed to resolve current directory: {err}"),
242 })?,
243 };
244
245 let raw_paths = raw_paths.unwrap_or_else(|| {
246 root_param
247 .as_ref()
248 .map(|root| vec![root.clone()])
249 .unwrap_or_default()
250 });
251 if raw_paths.is_empty() {
252 return Err(HostlibError::InvalidParameter {
253 builtin: SUBSCRIBE_BUILTIN,
254 param: "paths",
255 message: "must contain at least one path".to_string(),
256 });
257 }
258
259 let mut watch_paths = Vec::with_capacity(raw_paths.len());
260 for path in raw_paths {
261 let path = PathBuf::from(path);
262 let resolved = if path.is_relative() && root_param.is_some() {
263 root.join(path)
264 } else {
265 path
266 };
267 let normalized = normalize_existing_path_buf(SUBSCRIBE_BUILTIN, "paths", &resolved)?;
268 if normalized.strip_prefix(&root).is_err() {
269 return Err(HostlibError::InvalidParameter {
270 builtin: SUBSCRIBE_BUILTIN,
271 param: "paths",
272 message: format!(
273 "watch path `{}` is outside root `{}`",
274 normalized.display(),
275 root.display()
276 ),
277 });
278 }
279 watch_paths.push(normalized);
280 }
281
282 let recursive = optional_bool(SUBSCRIBE_BUILTIN, dict, "recursive", true)?;
283 let debounce_ms = optional_int(
284 SUBSCRIBE_BUILTIN,
285 dict,
286 "debounce_ms",
287 DEFAULT_DEBOUNCE_MS as i64,
288 )?;
289 if debounce_ms < 0 {
290 return Err(HostlibError::InvalidParameter {
291 builtin: SUBSCRIBE_BUILTIN,
292 param: "debounce_ms",
293 message: "must be >= 0".to_string(),
294 });
295 }
296 let respect_gitignore = optional_bool(SUBSCRIBE_BUILTIN, dict, "respect_gitignore", false)?;
297
298 Ok(Self {
299 session_id,
300 gitignore: if respect_gitignore {
301 Some(build_gitignore(&root))
302 } else {
303 None
304 },
305 globs: build_globs(raw_globs.unwrap_or_default())?,
306 kinds: parse_kinds(dict)?,
307 root,
308 watch_paths,
309 recursive,
310 debounce_ms: debounce_ms as u64,
311 })
312 }
313}
314
315fn watch_worker(rx: mpsc::Receiver<WatchMessage>, debounce: Duration, filter: WatchFilter) {
316 let mut pending = Vec::new();
317 loop {
318 match rx.recv() {
319 Ok(WatchMessage::Event(event)) => {
320 pending.push(event);
321 loop {
322 match rx.recv_timeout(debounce) {
323 Ok(WatchMessage::Event(event)) => pending.push(event),
324 Ok(WatchMessage::Error(error)) => emit_watch_error(&filter, error),
325 Ok(WatchMessage::Stop) | Err(mpsc::RecvTimeoutError::Disconnected) => {
326 emit_pending(&filter, &mut pending);
327 return;
328 }
329 Err(mpsc::RecvTimeoutError::Timeout) => break,
330 }
331 }
332 emit_pending(&filter, &mut pending);
333 }
334 Ok(WatchMessage::Error(error)) => emit_watch_error(&filter, error),
335 Ok(WatchMessage::Stop) | Err(_) => return,
336 }
337 }
338}
339
340fn emit_pending(filter: &WatchFilter, pending: &mut Vec<Event>) {
341 if pending.is_empty() {
342 return;
343 }
344 let events = coalesce_events(std::mem::take(pending), filter);
345 if events.is_empty() {
346 return;
347 }
348 harn_vm::agent_events::emit_event(&AgentEvent::FsWatch {
349 session_id: filter.session_id.clone(),
350 subscription_id: filter.subscription_id.clone(),
351 events,
352 });
353}
354
355fn emit_watch_error(filter: &WatchFilter, error: String) {
356 harn_vm::agent_events::emit_event(&AgentEvent::FsWatch {
357 session_id: filter.session_id.clone(),
358 subscription_id: filter.subscription_id.clone(),
359 events: vec![FsWatchEvent {
360 kind: "error".to_string(),
361 paths: Vec::new(),
362 relative_paths: Vec::new(),
363 raw_kind: "error".to_string(),
364 error: Some(error),
365 }],
366 });
367}
368
369fn coalesce_events(events: Vec<Event>, filter: &WatchFilter) -> Vec<FsWatchEvent> {
370 let mut seen = BTreeSet::new();
371 let mut output = Vec::new();
372 for event in events {
373 let kind = normalize_kind(&event.kind);
374 if !filter.kinds.contains(kind) {
375 continue;
376 }
377 let mut paths = Vec::new();
378 let mut relative_paths = Vec::new();
379 for path in &event.paths {
380 if !filter.matches_path(path) {
381 continue;
382 }
383 paths.push(path_to_string(path));
384 relative_paths.push(filter.relative_path(path));
385 }
386 if paths.is_empty() {
387 continue;
388 }
389 paths.sort();
390 paths.dedup();
391 relative_paths.sort();
392 relative_paths.dedup();
393 let raw_kind = format!("{:?}", event.kind);
394 if !seen.insert((kind.to_string(), paths.clone(), raw_kind.clone())) {
395 continue;
396 }
397 output.push(FsWatchEvent {
398 kind: kind.to_string(),
399 paths,
400 relative_paths,
401 raw_kind,
402 error: None,
403 });
404 }
405 output
406}
407
408impl WatchFilter {
409 fn matches_path(&self, path: &Path) -> bool {
410 if let Some(gitignore) = &self.gitignore {
411 if gitignore.matched(path, path.is_dir()).is_ignore() {
412 return false;
413 }
414 }
415 if let Some(globs) = &self.globs {
416 let relative = self.relative_path(path);
417 return globs.is_match(relative);
418 }
419 true
420 }
421
422 fn relative_path(&self, path: &Path) -> String {
423 let relative = path.strip_prefix(&self.root).unwrap_or(path);
424 let value = path_to_string(relative);
425 if value.is_empty() {
426 ".".to_string()
427 } else {
428 value
429 }
430 }
431}
432
433fn normalize_kind(kind: &EventKind) -> &'static str {
434 match kind {
435 EventKind::Create(_) => "create",
436 EventKind::Remove(_) => "remove",
437 EventKind::Modify(ModifyKind::Name(
438 RenameMode::Any
439 | RenameMode::To
440 | RenameMode::From
441 | RenameMode::Both
442 | RenameMode::Other,
443 )) => "rename",
444 EventKind::Modify(_) | EventKind::Any => "modify",
445 EventKind::Access(_) => "access",
446 EventKind::Other => "other",
447 }
448}
449
450fn parse_kinds(dict: &BTreeMap<String, VmValue>) -> Result<BTreeSet<String>, HostlibError> {
451 let values = optional_string_list(SUBSCRIBE_BUILTIN, dict, "kinds")?.unwrap_or_else(|| {
452 DEFAULT_KINDS
453 .iter()
454 .map(|kind| (*kind).to_string())
455 .collect()
456 });
457 let mut kinds = BTreeSet::new();
458 for kind in values {
459 if SUPPORTED_KINDS.contains(&kind.as_str()) {
460 kinds.insert(kind);
461 } else {
462 return Err(HostlibError::InvalidParameter {
463 builtin: SUBSCRIBE_BUILTIN,
464 param: "kinds",
465 message: format!("unsupported event kind `{kind}`"),
466 });
467 }
468 }
469 Ok(kinds)
470}
471
472fn build_globs(globs: Vec<String>) -> Result<Option<GlobSet>, HostlibError> {
473 if globs.is_empty() {
474 return Ok(None);
475 }
476 let mut builder = GlobSetBuilder::new();
477 for glob in globs {
478 let normalized = normalize_glob(&glob);
479 builder.add(
480 Glob::new(&normalized).map_err(|err| HostlibError::InvalidParameter {
481 builtin: SUBSCRIBE_BUILTIN,
482 param: "globs",
483 message: format!("invalid glob `{glob}`: {err}"),
484 })?,
485 );
486 }
487 Ok(Some(builder.build().map_err(|err| {
488 HostlibError::InvalidParameter {
489 builtin: SUBSCRIBE_BUILTIN,
490 param: "globs",
491 message: format!("invalid glob set: {err}"),
492 }
493 })?))
494}
495
496fn build_gitignore(root: &Path) -> Gitignore {
497 let mut builder = GitignoreBuilder::new(root);
498 let gitignore = root.join(".gitignore");
499 if gitignore.exists() {
500 let _ = builder.add(gitignore);
501 }
502 let exclude = root.join(".git").join("info").join("exclude");
503 if exclude.exists() {
504 let _ = builder.add(exclude);
505 }
506 builder.build().unwrap_or_else(|_| Gitignore::empty())
507}
508
/// Normalizes a user glob: `\` becomes `/`. A bare file pattern with no `/` (e.g. `*.rs`)
/// is prefixed with `**/` so it matches at any depth. The lone `*` is left untouched.
fn normalize_glob(glob: &str) -> String {
    let unified = glob.replace('\\', "/");
    let keep_as_is = unified == "*" || unified.starts_with("**/") || unified.contains('/');
    if keep_as_is {
        unified
    } else {
        format!("**/{unified}")
    }
}
517
518fn optional_string_list(
519 builtin: &'static str,
520 dict: &BTreeMap<String, VmValue>,
521 key: &'static str,
522) -> Result<Option<Vec<String>>, HostlibError> {
523 let Some(value) = dict.get(key) else {
524 return Ok(None);
525 };
526 match value {
527 VmValue::Nil => Ok(None),
528 VmValue::List(items) => items
529 .iter()
530 .enumerate()
531 .map(|(idx, item)| match item {
532 VmValue::String(value) => Ok(value.to_string()),
533 other => Err(HostlibError::InvalidParameter {
534 builtin,
535 param: key,
536 message: format!("item {idx} must be a string, got {}", other.type_name()),
537 }),
538 })
539 .collect::<Result<Vec<_>, _>>()
540 .map(Some),
541 other => Err(HostlibError::InvalidParameter {
542 builtin,
543 param: key,
544 message: format!("expected list of strings, got {}", other.type_name()),
545 }),
546 }
547}
548
549fn normalize_existing_path(
550 builtin: &'static str,
551 param: &'static str,
552 path: &str,
553) -> Result<PathBuf, HostlibError> {
554 normalize_existing_path_buf(builtin, param, &PathBuf::from(path))
555}
556
557fn normalize_existing_path_buf(
558 builtin: &'static str,
559 param: &'static str,
560 path: &Path,
561) -> Result<PathBuf, HostlibError> {
562 path.canonicalize()
563 .map_err(|err| HostlibError::InvalidParameter {
564 builtin,
565 param,
566 message: format!(
567 "{} does not resolve to an existing path: {err}",
568 path.display()
569 ),
570 })
571}
572
/// Renders a path as a lossy UTF-8 string with every `\` turned into `/`.
fn path_to_string(path: &Path) -> String {
    let lossy = path.to_string_lossy();
    lossy
        .chars()
        .map(|ch| if ch == '\\' { '/' } else { ch })
        .collect()
}
576
577fn next_subscription_id() -> String {
578 let seq = NEXT_SUBSCRIPTION_ID.fetch_add(1, Ordering::Relaxed);
579 let millis = SystemTime::now()
580 .duration_since(UNIX_EPOCH)
581 .map(|duration| duration.as_millis())
582 .unwrap_or(0);
583 format!("fsw-{millis}-{seq}")
584}
585
#[cfg(test)]
mod tests {
    use super::*;
    use notify::event::{AccessKind, CreateKind, RemoveKind};
    use std::sync::Arc;

    /// Builds a notify event of `kind` that touches a single path.
    fn make_event(kind: EventKind, path: impl Into<PathBuf>) -> Event {
        Event::new(kind).add_path(path.into())
    }

    /// Builds a filter rooted at `root` with the default kinds and no gitignore. `globs`, when
    /// given, is compiled through `build_globs` just like a real subscription.
    fn make_filter(root: PathBuf, globs: Option<Vec<&str>>) -> WatchFilter {
        let globs = globs.map(|patterns| {
            let owned: Vec<String> = patterns.into_iter().map(String::from).collect();
            build_globs(owned)
                .expect("test globs are valid")
                .expect("a non-empty glob list yields a set")
        });
        WatchFilter {
            session_id: "session".to_string(),
            subscription_id: "sub".to_string(),
            root,
            globs,
            gitignore: None,
            kinds: parse_kinds(&BTreeMap::new()).unwrap(),
        }
    }

    #[test]
    fn coalesce_deduplicates_same_kind_and_path() {
        let cwd = std::env::current_dir().unwrap();
        let target = cwd.join("src/lib.rs");
        let filter = make_filter(cwd, None);
        let modified = || make_event(EventKind::Modify(ModifyKind::Any), &target);

        let events = coalesce_events(vec![modified(), modified()], &filter);

        assert_eq!(events.len(), 1);
        assert_eq!(events[0].kind, "modify");
    }

    #[test]
    fn glob_filter_uses_relative_paths() {
        let cwd = std::env::current_dir().unwrap();
        let filter = make_filter(cwd.clone(), Some(vec!["*.rs"]));
        let created =
            |relative: &str| make_event(EventKind::Create(CreateKind::Any), cwd.join(relative));

        let events = coalesce_events(vec![created("src/lib.rs"), created("README.md")], &filter);

        assert_eq!(events.len(), 1);
        assert_eq!(events[0].relative_paths, vec!["src/lib.rs"]);
    }

    #[test]
    fn kind_filter_drops_unrequested_events() {
        let cwd = std::env::current_dir().unwrap();
        let target = cwd.join("src/lib.rs");
        let mut filter = make_filter(cwd, None);
        filter.kinds = BTreeSet::from(["remove".to_string()]);

        let events = coalesce_events(
            vec![
                make_event(EventKind::Create(CreateKind::Any), &target),
                make_event(EventKind::Remove(RemoveKind::Any), &target),
            ],
            &filter,
        );

        assert_eq!(events.len(), 1);
        assert_eq!(events[0].kind, "remove");
    }

    #[test]
    fn kind_filter_allows_access_and_other_events() {
        let cwd = std::env::current_dir().unwrap();
        let requested = VmValue::List(Arc::new(vec![
            VmValue::String(Arc::from("access")),
            VmValue::String(Arc::from("other")),
        ]));
        let config = BTreeMap::from([("kinds".to_string(), requested)]);
        let mut filter = make_filter(cwd.clone(), None);
        filter.kinds = parse_kinds(&config).unwrap();

        let events = coalesce_events(
            vec![
                make_event(EventKind::Access(AccessKind::Any), cwd.join("src/lib.rs")),
                make_event(EventKind::Other, cwd.join("README.md")),
            ],
            &filter,
        );

        assert_eq!(events.len(), 2);
        assert_eq!(events[0].kind, "access");
        assert_eq!(events[1].kind, "other");
    }

    #[test]
    fn gitignore_filter_drops_ignored_paths() {
        let temp = tempfile::tempdir().unwrap();
        let root = temp.path();
        std::fs::write(root.join(".gitignore"), "ignored.txt\n").unwrap();
        let mut filter = make_filter(root.to_path_buf(), None);
        filter.gitignore = Some(build_gitignore(root));
        let modified =
            |name: &str| make_event(EventKind::Modify(ModifyKind::Any), root.join(name));

        let events =
            coalesce_events(vec![modified("allowed.txt"), modified("ignored.txt")], &filter);

        assert_eq!(events.len(), 1);
        assert_eq!(events[0].relative_paths, vec!["allowed.txt"]);
    }
}