1use std::collections::{BTreeMap, BTreeSet, HashMap};
7use std::path::{Path, PathBuf};
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::sync::{mpsc, Mutex, OnceLock};
10use std::thread;
11use std::time::{Duration, SystemTime, UNIX_EPOCH};
12
13use globset::{Glob, GlobSet, GlobSetBuilder};
14use harn_vm::agent_events::{AgentEvent, FsWatchEvent};
15use harn_vm::VmValue;
16use ignore::gitignore::{Gitignore, GitignoreBuilder};
17use notify::event::{ModifyKind, RenameMode};
18use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
19
20use crate::error::HostlibError;
21use crate::registry::{BuiltinRegistry, HostlibCapability, RegisteredBuiltin, SyncHandler};
22use crate::tools::args::{
23 build_dict, dict_arg, optional_bool, optional_int, optional_string, str_value,
24};
25use crate::value_args::optional_string_list;
26
/// Registered name of the subscribe builtin; also used to tag its errors.
const SUBSCRIBE_BUILTIN: &str = "hostlib_fs_watch_subscribe";
/// Registered name of the unsubscribe builtin; also used to tag its errors.
const UNSUBSCRIBE_BUILTIN: &str = "hostlib_fs_watch_unsubscribe";

/// Quiet period (milliseconds) applied when the caller passes no `debounce_ms`.
const DEFAULT_DEBOUNCE_MS: u64 = 50;

/// Event kinds delivered when the caller passes no `kinds` list.
/// `access` and `other` are excluded and must be requested explicitly.
const DEFAULT_KINDS: &[&str] = &["create", "modify", "remove", "rename"];

/// Every value accepted in the `kinds` list; these are exactly the labels
/// produced by `normalize_kind`.
const SUPPORTED_KINDS: &[&str] = &[
    "access",
    "create",
    "modify",
    "other",
    "remove",
    "rename",
];

/// Process-wide counter that keeps subscription ids unique.
static NEXT_SUBSCRIPTION_ID: AtomicU64 = AtomicU64::new(1);
34
35#[derive(Default)]
37pub struct FsWatchCapability;
38
39impl HostlibCapability for FsWatchCapability {
40 fn module_name(&self) -> &'static str {
41 "fs_watch"
42 }
43
44 fn register_builtins(&self, registry: &mut BuiltinRegistry) {
45 registry.register(RegisteredBuiltin {
46 name: SUBSCRIBE_BUILTIN,
47 module: "fs_watch",
48 method: "subscribe",
49 handler: subscribe_handler(),
50 });
51 registry.register(RegisteredBuiltin {
52 name: UNSUBSCRIBE_BUILTIN,
53 module: "fs_watch",
54 method: "unsubscribe",
55 handler: unsubscribe_handler(),
56 });
57 }
58}
59
60fn subscribe_handler() -> SyncHandler {
61 std::sync::Arc::new(subscribe)
62}
63
64fn unsubscribe_handler() -> SyncHandler {
65 std::sync::Arc::new(unsubscribe)
66}
67
68struct Subscription {
69 _watcher: RecommendedWatcher,
70 stop_tx: mpsc::Sender<WatchMessage>,
71 worker: Option<thread::JoinHandle<()>>,
72}
73
74impl Drop for Subscription {
75 fn drop(&mut self) {
76 let _ = self.stop_tx.send(WatchMessage::Stop);
77 if let Some(worker) = self.worker.take() {
78 let _ = worker.join();
79 }
80 }
81}
82
83enum WatchMessage {
84 Event(Event),
85 Error(String),
86 Stop,
87}
88
89#[derive(Clone)]
90struct WatchFilter {
91 session_id: String,
92 subscription_id: String,
93 root: PathBuf,
94 globs: Option<GlobSet>,
95 gitignore: Option<Gitignore>,
96 kinds: BTreeSet<String>,
97}
98
99fn subscriptions() -> &'static Mutex<HashMap<String, Subscription>> {
100 static SUBSCRIPTIONS: OnceLock<Mutex<HashMap<String, Subscription>>> = OnceLock::new();
101 SUBSCRIPTIONS.get_or_init(|| Mutex::new(HashMap::new()))
102}
103
104fn subscribe(args: &[VmValue]) -> Result<VmValue, HostlibError> {
105 let raw = dict_arg(SUBSCRIBE_BUILTIN, args)?;
106 let dict = raw.as_ref();
107 let request = SubscribeRequest::from_dict(dict)?;
108 let subscription_id = next_subscription_id();
109 let (tx, rx) = mpsc::channel();
110
111 let filter = WatchFilter {
112 session_id: request.session_id.clone(),
113 subscription_id: subscription_id.clone(),
114 root: request.root.clone(),
115 globs: request.globs,
116 gitignore: request.gitignore,
117 kinds: request.kinds,
118 };
119 let debounce = Duration::from_millis(request.debounce_ms);
120 let worker = thread::Builder::new()
121 .name(format!("harn-fs-watch-{subscription_id}"))
122 .spawn(move || watch_worker(rx, debounce, filter))
123 .map_err(|err| HostlibError::Backend {
124 builtin: SUBSCRIBE_BUILTIN,
125 message: format!("failed to spawn watch worker: {err}"),
126 })?;
127
128 let notify_tx = tx.clone();
129 let mut watcher = notify::recommended_watcher(move |result: notify::Result<Event>| {
130 let message = match result {
131 Ok(event) => WatchMessage::Event(event),
132 Err(err) => WatchMessage::Error(err.to_string()),
133 };
134 let _ = notify_tx.send(message);
135 })
136 .map_err(|err| HostlibError::Backend {
137 builtin: SUBSCRIBE_BUILTIN,
138 message: format!("failed to create watcher: {err}"),
139 })?;
140
141 let mode = if request.recursive {
142 RecursiveMode::Recursive
143 } else {
144 RecursiveMode::NonRecursive
145 };
146 for path in &request.watch_paths {
147 watcher
148 .watch(path, mode)
149 .map_err(|err| HostlibError::Backend {
150 builtin: SUBSCRIBE_BUILTIN,
151 message: format!("failed to watch {}: {err}", path.display()),
152 })?;
153 }
154
155 subscriptions()
156 .lock()
157 .expect("fs_watch mutex poisoned")
158 .insert(
159 subscription_id.clone(),
160 Subscription {
161 _watcher: watcher,
162 stop_tx: tx,
163 worker: Some(worker),
164 },
165 );
166
167 Ok(build_dict([(
168 "subscription_id",
169 str_value(subscription_id.as_str()),
170 )]))
171}
172
173fn unsubscribe(args: &[VmValue]) -> Result<VmValue, HostlibError> {
174 let raw = dict_arg(UNSUBSCRIBE_BUILTIN, args)?;
175 let dict = raw.as_ref();
176 let subscription_id = match dict.get("subscription_id") {
177 Some(VmValue::String(value)) if !value.trim().is_empty() => value.to_string(),
178 Some(other) => {
179 return Err(HostlibError::InvalidParameter {
180 builtin: UNSUBSCRIBE_BUILTIN,
181 param: "subscription_id",
182 message: format!("expected non-empty string, got {}", other.type_name()),
183 });
184 }
185 None => {
186 return Err(HostlibError::MissingParameter {
187 builtin: UNSUBSCRIBE_BUILTIN,
188 param: "subscription_id",
189 });
190 }
191 };
192 let removed = subscriptions()
193 .lock()
194 .expect("fs_watch mutex poisoned")
195 .remove(&subscription_id)
196 .is_some();
197 Ok(build_dict([("removed", VmValue::Bool(removed))]))
198}
199
200struct SubscribeRequest {
201 session_id: String,
202 root: PathBuf,
203 watch_paths: Vec<PathBuf>,
204 recursive: bool,
205 debounce_ms: u64,
206 globs: Option<GlobSet>,
207 gitignore: Option<Gitignore>,
208 kinds: BTreeSet<String>,
209}
210
211impl SubscribeRequest {
212 fn from_dict(dict: &BTreeMap<String, VmValue>) -> Result<Self, HostlibError> {
213 let root_param = optional_string(SUBSCRIBE_BUILTIN, dict, "root")?;
214 let raw_paths = optional_string_list(SUBSCRIBE_BUILTIN, dict, "paths")?;
215 let raw_globs = optional_string_list(SUBSCRIBE_BUILTIN, dict, "globs")?;
216 let session_id = optional_string(SUBSCRIBE_BUILTIN, dict, "session_id")?
217 .or_else(harn_vm::agent_sessions::current_session_id)
218 .ok_or(HostlibError::MissingParameter {
219 builtin: SUBSCRIBE_BUILTIN,
220 param: "session_id",
221 })?;
222
223 if session_id.trim().is_empty() {
224 return Err(HostlibError::InvalidParameter {
225 builtin: SUBSCRIBE_BUILTIN,
226 param: "session_id",
227 message: "must not be empty".to_string(),
228 });
229 }
230
231 if root_param.is_none() && raw_paths.is_none() {
232 return Err(HostlibError::MissingParameter {
233 builtin: SUBSCRIBE_BUILTIN,
234 param: "root",
235 });
236 }
237
238 let root = match root_param.as_deref() {
239 Some(root) => normalize_existing_path(SUBSCRIBE_BUILTIN, "root", root)?,
240 None => std::env::current_dir().map_err(|err| HostlibError::Backend {
241 builtin: SUBSCRIBE_BUILTIN,
242 message: format!("failed to resolve current directory: {err}"),
243 })?,
244 };
245
246 let raw_paths = raw_paths.unwrap_or_else(|| {
247 root_param
248 .as_ref()
249 .map(|root| vec![root.clone()])
250 .unwrap_or_default()
251 });
252 if raw_paths.is_empty() {
253 return Err(HostlibError::InvalidParameter {
254 builtin: SUBSCRIBE_BUILTIN,
255 param: "paths",
256 message: "must contain at least one path".to_string(),
257 });
258 }
259
260 let mut watch_paths = Vec::with_capacity(raw_paths.len());
261 for path in raw_paths {
262 let path = PathBuf::from(path);
263 let resolved = if path.is_relative() && root_param.is_some() {
264 root.join(path)
265 } else {
266 path
267 };
268 let normalized = normalize_existing_path_buf(SUBSCRIBE_BUILTIN, "paths", &resolved)?;
269 if normalized.strip_prefix(&root).is_err() {
270 return Err(HostlibError::InvalidParameter {
271 builtin: SUBSCRIBE_BUILTIN,
272 param: "paths",
273 message: format!(
274 "watch path `{}` is outside root `{}`",
275 normalized.display(),
276 root.display()
277 ),
278 });
279 }
280 watch_paths.push(normalized);
281 }
282
283 let recursive = optional_bool(SUBSCRIBE_BUILTIN, dict, "recursive", true)?;
284 let debounce_ms = optional_int(
285 SUBSCRIBE_BUILTIN,
286 dict,
287 "debounce_ms",
288 DEFAULT_DEBOUNCE_MS as i64,
289 )?;
290 if debounce_ms < 0 {
291 return Err(HostlibError::InvalidParameter {
292 builtin: SUBSCRIBE_BUILTIN,
293 param: "debounce_ms",
294 message: "must be >= 0".to_string(),
295 });
296 }
297 let respect_gitignore = optional_bool(SUBSCRIBE_BUILTIN, dict, "respect_gitignore", false)?;
298
299 Ok(Self {
300 session_id,
301 gitignore: if respect_gitignore {
302 Some(build_gitignore(&root))
303 } else {
304 None
305 },
306 globs: build_globs(raw_globs.unwrap_or_default())?,
307 kinds: parse_kinds(dict)?,
308 root,
309 watch_paths,
310 recursive,
311 debounce_ms: debounce_ms as u64,
312 })
313 }
314}
315
316fn watch_worker(rx: mpsc::Receiver<WatchMessage>, debounce: Duration, filter: WatchFilter) {
317 let mut pending = Vec::new();
318 loop {
319 match rx.recv() {
320 Ok(WatchMessage::Event(event)) => {
321 pending.push(event);
322 loop {
323 match rx.recv_timeout(debounce) {
324 Ok(WatchMessage::Event(event)) => pending.push(event),
325 Ok(WatchMessage::Error(error)) => emit_watch_error(&filter, error),
326 Ok(WatchMessage::Stop) | Err(mpsc::RecvTimeoutError::Disconnected) => {
327 emit_pending(&filter, &mut pending);
328 return;
329 }
330 Err(mpsc::RecvTimeoutError::Timeout) => break,
331 }
332 }
333 emit_pending(&filter, &mut pending);
334 }
335 Ok(WatchMessage::Error(error)) => emit_watch_error(&filter, error),
336 Ok(WatchMessage::Stop) | Err(_) => return,
337 }
338 }
339}
340
341fn emit_pending(filter: &WatchFilter, pending: &mut Vec<Event>) {
342 if pending.is_empty() {
343 return;
344 }
345 let events = coalesce_events(std::mem::take(pending), filter);
346 if events.is_empty() {
347 return;
348 }
349 harn_vm::agent_events::emit_event(&AgentEvent::FsWatch {
350 session_id: filter.session_id.clone(),
351 subscription_id: filter.subscription_id.clone(),
352 events,
353 });
354}
355
356fn emit_watch_error(filter: &WatchFilter, error: String) {
357 harn_vm::agent_events::emit_event(&AgentEvent::FsWatch {
358 session_id: filter.session_id.clone(),
359 subscription_id: filter.subscription_id.clone(),
360 events: vec![FsWatchEvent {
361 kind: "error".to_string(),
362 paths: Vec::new(),
363 relative_paths: Vec::new(),
364 raw_kind: "error".to_string(),
365 error: Some(error),
366 }],
367 });
368}
369
370fn coalesce_events(events: Vec<Event>, filter: &WatchFilter) -> Vec<FsWatchEvent> {
371 let mut seen = BTreeSet::new();
372 let mut output = Vec::new();
373 for event in events {
374 let kind = normalize_kind(&event.kind);
375 if !filter.kinds.contains(kind) {
376 continue;
377 }
378 let mut paths = Vec::new();
379 let mut relative_paths = Vec::new();
380 for path in &event.paths {
381 if !filter.matches_path(path) {
382 continue;
383 }
384 paths.push(path_to_string(path));
385 relative_paths.push(filter.relative_path(path));
386 }
387 if paths.is_empty() {
388 continue;
389 }
390 paths.sort();
391 paths.dedup();
392 relative_paths.sort();
393 relative_paths.dedup();
394 let raw_kind = format!("{:?}", event.kind);
395 if !seen.insert((kind.to_string(), paths.clone(), raw_kind.clone())) {
396 continue;
397 }
398 output.push(FsWatchEvent {
399 kind: kind.to_string(),
400 paths,
401 relative_paths,
402 raw_kind,
403 error: None,
404 });
405 }
406 output
407}
408
409impl WatchFilter {
410 fn matches_path(&self, path: &Path) -> bool {
411 if let Some(gitignore) = &self.gitignore {
412 if gitignore.matched(path, path.is_dir()).is_ignore() {
413 return false;
414 }
415 }
416 if let Some(globs) = &self.globs {
417 let relative = self.relative_path(path);
418 return globs.is_match(relative);
419 }
420 true
421 }
422
423 fn relative_path(&self, path: &Path) -> String {
424 let relative = path.strip_prefix(&self.root).unwrap_or(path);
425 let value = path_to_string(relative);
426 if value.is_empty() {
427 ".".to_string()
428 } else {
429 value
430 }
431 }
432}
433
434fn normalize_kind(kind: &EventKind) -> &'static str {
435 match kind {
436 EventKind::Create(_) => "create",
437 EventKind::Remove(_) => "remove",
438 EventKind::Modify(ModifyKind::Name(
439 RenameMode::Any
440 | RenameMode::To
441 | RenameMode::From
442 | RenameMode::Both
443 | RenameMode::Other,
444 )) => "rename",
445 EventKind::Modify(_) | EventKind::Any => "modify",
446 EventKind::Access(_) => "access",
447 EventKind::Other => "other",
448 }
449}
450
451fn parse_kinds(dict: &BTreeMap<String, VmValue>) -> Result<BTreeSet<String>, HostlibError> {
452 let values = optional_string_list(SUBSCRIBE_BUILTIN, dict, "kinds")?.unwrap_or_else(|| {
453 DEFAULT_KINDS
454 .iter()
455 .map(|kind| (*kind).to_string())
456 .collect()
457 });
458 let mut kinds = BTreeSet::new();
459 for kind in values {
460 if SUPPORTED_KINDS.contains(&kind.as_str()) {
461 kinds.insert(kind);
462 } else {
463 return Err(HostlibError::InvalidParameter {
464 builtin: SUBSCRIBE_BUILTIN,
465 param: "kinds",
466 message: format!("unsupported event kind `{kind}`"),
467 });
468 }
469 }
470 Ok(kinds)
471}
472
473fn build_globs(globs: Vec<String>) -> Result<Option<GlobSet>, HostlibError> {
474 if globs.is_empty() {
475 return Ok(None);
476 }
477 let mut builder = GlobSetBuilder::new();
478 for glob in globs {
479 let normalized = normalize_glob(&glob);
480 builder.add(
481 Glob::new(&normalized).map_err(|err| HostlibError::InvalidParameter {
482 builtin: SUBSCRIBE_BUILTIN,
483 param: "globs",
484 message: format!("invalid glob `{glob}`: {err}"),
485 })?,
486 );
487 }
488 Ok(Some(builder.build().map_err(|err| {
489 HostlibError::InvalidParameter {
490 builtin: SUBSCRIBE_BUILTIN,
491 param: "globs",
492 message: format!("invalid glob set: {err}"),
493 }
494 })?))
495}
496
497fn build_gitignore(root: &Path) -> Gitignore {
498 let mut builder = GitignoreBuilder::new(root);
499 let gitignore = root.join(".gitignore");
500 if gitignore.exists() {
501 let _ = builder.add(gitignore);
502 }
503 let exclude = root.join(".git").join("info").join("exclude");
504 if exclude.exists() {
505 let _ = builder.add(exclude);
506 }
507 builder.build().unwrap_or_else(|_| Gitignore::empty())
508}
509
/// Rewrites a user glob so it matches root-relative, `/`-separated paths.
///
/// Backslashes become `/`. A pattern without any `/` (other than the bare
/// `*`) is made to match at any depth by prefixing `**/`. For example, `*.rs`
/// becomes `**/*.rs`, while `src/*.rs` and `**/x` are left as written.
fn normalize_glob(glob: &str) -> String {
    let unified: String = glob
        .chars()
        .map(|ch| if ch == '\\' { '/' } else { ch })
        .collect();
    // Any pattern starting with `**/` already contains a `/`.
    let keep_as_is = unified == "*" || unified.contains('/');
    if keep_as_is {
        unified
    } else {
        format!("**/{unified}")
    }
}
518
519fn normalize_existing_path(
520 builtin: &'static str,
521 param: &'static str,
522 path: &str,
523) -> Result<PathBuf, HostlibError> {
524 normalize_existing_path_buf(builtin, param, &PathBuf::from(path))
525}
526
527fn normalize_existing_path_buf(
528 builtin: &'static str,
529 param: &'static str,
530 path: &Path,
531) -> Result<PathBuf, HostlibError> {
532 path.canonicalize()
533 .map_err(|err| HostlibError::InvalidParameter {
534 builtin,
535 param,
536 message: format!(
537 "{} does not resolve to an existing path: {err}",
538 path.display()
539 ),
540 })
541}
542
/// Renders `path` as a string with `/` separators. Non-UTF-8 segments are
/// replaced lossily.
fn path_to_string(path: &Path) -> String {
    let lossy = path.to_string_lossy();
    lossy.split('\\').collect::<Vec<_>>().join("/")
}
546
547fn next_subscription_id() -> String {
548 let seq = NEXT_SUBSCRIPTION_ID.fetch_add(1, Ordering::Relaxed);
549 let millis = SystemTime::now()
550 .duration_since(UNIX_EPOCH)
551 .map(|duration| duration.as_millis())
552 .unwrap_or(0);
553 format!("fsw-{millis}-{seq}")
554}
555
/// Unit tests for the pure filtering / coalescing pipeline. No OS watcher or
/// worker thread is started here.
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a notify event of `kind` carrying a single `path`.
    fn event(kind: EventKind, path: impl Into<PathBuf>) -> Event {
        Event::new(kind).add_path(path.into())
    }

    /// Builds a `WatchFilter` rooted at `root` with the default kind set, no
    /// gitignore, and (optionally) the given glob patterns.
    fn filter(root: PathBuf, globs: Option<Vec<&str>>) -> WatchFilter {
        WatchFilter {
            session_id: "session".to_string(),
            subscription_id: "sub".to_string(),
            root,
            globs: globs.map(|patterns| {
                build_globs(patterns.into_iter().map(str::to_string).collect())
                    .unwrap()
                    .unwrap()
            }),
            gitignore: None,
            kinds: parse_kinds(&BTreeMap::new()).unwrap(),
        }
    }

    // Two identical modify events for one path collapse into a single output event.
    #[test]
    fn coalesce_deduplicates_same_kind_and_path() {
        let root = std::env::current_dir().unwrap();
        let path = root.join("src/lib.rs");
        let filter = filter(root, None);
        let events = coalesce_events(
            vec![
                event(EventKind::Modify(ModifyKind::Any), &path),
                event(EventKind::Modify(ModifyKind::Any), &path),
            ],
            &filter,
        );
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].kind, "modify");
    }

    // `*.rs` is normalized to `**/*.rs` and matched against the root-relative
    // path, so `src/lib.rs` passes and `README.md` is dropped.
    #[test]
    fn glob_filter_uses_relative_paths() {
        let root = std::env::current_dir().unwrap();
        let filter = filter(root.clone(), Some(vec!["*.rs"]));
        let events = coalesce_events(
            vec![
                event(
                    EventKind::Create(notify::event::CreateKind::Any),
                    root.join("src/lib.rs"),
                ),
                event(
                    EventKind::Create(notify::event::CreateKind::Any),
                    root.join("README.md"),
                ),
            ],
            &filter,
        );
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].relative_paths, vec!["src/lib.rs"]);
    }

    // Only kinds present in `filter.kinds` survive coalescing.
    #[test]
    fn kind_filter_drops_unrequested_events() {
        let root = std::env::current_dir().unwrap();
        let mut filter = filter(root.clone(), None);
        filter.kinds = BTreeSet::from(["remove".to_string()]);

        let events = coalesce_events(
            vec![
                event(
                    EventKind::Create(notify::event::CreateKind::Any),
                    root.join("src/lib.rs"),
                ),
                event(
                    EventKind::Remove(notify::event::RemoveKind::Any),
                    root.join("src/lib.rs"),
                ),
            ],
            &filter,
        );

        assert_eq!(events.len(), 1);
        assert_eq!(events[0].kind, "remove");
    }

    // `access` and `other` are not default kinds but are accepted when listed
    // explicitly through the `kinds` argument.
    #[test]
    fn kind_filter_allows_access_and_other_events() {
        let root = std::env::current_dir().unwrap();
        let mut config = BTreeMap::new();
        config.insert(
            "kinds".to_string(),
            VmValue::List(std::sync::Arc::new(vec![
                VmValue::String(std::sync::Arc::from("access")),
                VmValue::String(std::sync::Arc::from("other")),
            ])),
        );
        let mut filter = filter(root.clone(), None);
        filter.kinds = parse_kinds(&config).unwrap();

        let events = coalesce_events(
            vec![
                event(
                    EventKind::Access(notify::event::AccessKind::Any),
                    root.join("src/lib.rs"),
                ),
                event(EventKind::Other, root.join("README.md")),
            ],
            &filter,
        );

        assert_eq!(events.len(), 2);
        assert_eq!(events[0].kind, "access");
        assert_eq!(events[1].kind, "other");
    }

    // A path listed in the root `.gitignore` is dropped; an unlisted sibling passes.
    #[test]
    fn gitignore_filter_drops_ignored_paths() {
        let temp = tempfile::tempdir().unwrap();
        std::fs::write(temp.path().join(".gitignore"), "ignored.txt\n").unwrap();
        let mut filter = filter(temp.path().to_path_buf(), None);
        filter.gitignore = Some(build_gitignore(temp.path()));

        let events = coalesce_events(
            vec![
                event(
                    EventKind::Modify(ModifyKind::Any),
                    temp.path().join("allowed.txt"),
                ),
                event(
                    EventKind::Modify(ModifyKind::Any),
                    temp.path().join("ignored.txt"),
                ),
            ],
            &filter,
        );

        assert_eq!(events.len(), 1);
        assert_eq!(events[0].relative_paths, vec!["allowed.txt"]);
    }
}