1use std::collections::{BTreeMap, BTreeSet, HashMap};
7use std::path::{Path, PathBuf};
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::sync::{mpsc, Mutex, OnceLock};
10use std::thread;
11use std::time::{Duration, SystemTime, UNIX_EPOCH};
12
13use globset::{Glob, GlobSet, GlobSetBuilder};
14use harn_vm::agent_events::{AgentEvent, FsWatchEvent};
15use harn_vm::VmValue;
16use ignore::gitignore::{Gitignore, GitignoreBuilder};
17use notify::event::{ModifyKind, RenameMode};
18use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
19
20use crate::error::HostlibError;
21use crate::registry::{BuiltinRegistry, HostlibCapability, RegisteredBuiltin, SyncHandler};
22use crate::tools::args::{
23 build_dict, dict_arg, optional_bool, optional_int, optional_string, str_value,
24};
25
/// Registry name of the builtin registered as `fs_watch.subscribe`.
const SUBSCRIBE_BUILTIN: &str = "hostlib_fs_watch_subscribe";
/// Registry name of the builtin registered as `fs_watch.unsubscribe`.
const UNSUBSCRIBE_BUILTIN: &str = "hostlib_fs_watch_unsubscribe";
/// Debounce window in milliseconds handed to `optional_int` as the fallback
/// for the `debounce_ms` request field.
const DEFAULT_DEBOUNCE_MS: u64 = 50;
29
/// Process-wide sequence counter, starting at 1, that `next_subscription_id`
/// increments to build subscription ids.
static NEXT_SUBSCRIPTION_ID: AtomicU64 = AtomicU64::new(1);
31
/// Hostlib capability that registers the `fs_watch` subscribe and
/// unsubscribe builtins.
#[derive(Default)]
pub struct FsWatchCapability;
35
36impl HostlibCapability for FsWatchCapability {
37 fn module_name(&self) -> &'static str {
38 "fs_watch"
39 }
40
41 fn register_builtins(&self, registry: &mut BuiltinRegistry) {
42 registry.register(RegisteredBuiltin {
43 name: SUBSCRIBE_BUILTIN,
44 module: "fs_watch",
45 method: "subscribe",
46 handler: subscribe_handler(),
47 });
48 registry.register(RegisteredBuiltin {
49 name: UNSUBSCRIBE_BUILTIN,
50 module: "fs_watch",
51 method: "unsubscribe",
52 handler: unsubscribe_handler(),
53 });
54 }
55}
56
57fn subscribe_handler() -> SyncHandler {
58 std::sync::Arc::new(subscribe)
59}
60
61fn unsubscribe_handler() -> SyncHandler {
62 std::sync::Arc::new(unsubscribe)
63}
64
/// A live watch subscription as stored in the process-wide registry.
///
/// Dropping a `Subscription` sends `WatchMessage::Stop` to its worker and
/// joins the worker thread (see the `Drop` impl below).
struct Subscription {
    /// Never read; held so the notify watcher is owned by (and dropped with)
    /// the subscription.
    _watcher: RecommendedWatcher,
    /// Sender half of the worker channel, used to deliver the stop message.
    stop_tx: mpsc::Sender<WatchMessage>,
    /// Handle of the worker thread; an `Option` so `Drop` can take and join it.
    worker: Option<thread::JoinHandle<()>>,
}
70
71impl Drop for Subscription {
72 fn drop(&mut self) {
73 let _ = self.stop_tx.send(WatchMessage::Stop);
74 if let Some(worker) = self.worker.take() {
75 let _ = worker.join();
76 }
77 }
78}
79
/// Messages delivered to a subscription's worker thread over its channel.
enum WatchMessage {
    /// A filesystem event reported by the notify watcher.
    Event(Event),
    /// A watcher error, already rendered to text.
    Error(String),
    /// Request to shut the worker down (sent when the subscription is dropped).
    Stop,
}
85
/// Per-subscription state the worker uses to filter and label events.
#[derive(Clone)]
struct WatchFilter {
    /// Session id copied into every emitted `AgentEvent::FsWatch`.
    session_id: String,
    /// Subscription id copied into every emitted `AgentEvent::FsWatch`.
    subscription_id: String,
    /// Root directory that reported paths are made relative to.
    root: PathBuf,
    /// Optional glob set matched against root-relative paths; `None` accepts all paths.
    globs: Option<GlobSet>,
    /// Optional gitignore matcher; paths it reports as ignored are dropped.
    gitignore: Option<Gitignore>,
    /// Normalized event kinds (e.g. `"create"`, `"modify"`) that are let through.
    kinds: BTreeSet<String>,
}
95
96fn subscriptions() -> &'static Mutex<HashMap<String, Subscription>> {
97 static SUBSCRIPTIONS: OnceLock<Mutex<HashMap<String, Subscription>>> = OnceLock::new();
98 SUBSCRIPTIONS.get_or_init(|| Mutex::new(HashMap::new()))
99}
100
101fn subscribe(args: &[VmValue]) -> Result<VmValue, HostlibError> {
102 let raw = dict_arg(SUBSCRIBE_BUILTIN, args)?;
103 let dict = raw.as_ref();
104 let request = SubscribeRequest::from_dict(dict)?;
105 let subscription_id = next_subscription_id();
106 let (tx, rx) = mpsc::channel();
107
108 let filter = WatchFilter {
109 session_id: request.session_id.clone(),
110 subscription_id: subscription_id.clone(),
111 root: request.root.clone(),
112 globs: request.globs,
113 gitignore: request.gitignore,
114 kinds: request.kinds,
115 };
116 let debounce = Duration::from_millis(request.debounce_ms);
117 let worker = thread::Builder::new()
118 .name(format!("harn-fs-watch-{subscription_id}"))
119 .spawn(move || watch_worker(rx, debounce, filter))
120 .map_err(|err| HostlibError::Backend {
121 builtin: SUBSCRIBE_BUILTIN,
122 message: format!("failed to spawn watch worker: {err}"),
123 })?;
124
125 let notify_tx = tx.clone();
126 let mut watcher = notify::recommended_watcher(move |result: notify::Result<Event>| {
127 let message = match result {
128 Ok(event) => WatchMessage::Event(event),
129 Err(err) => WatchMessage::Error(err.to_string()),
130 };
131 let _ = notify_tx.send(message);
132 })
133 .map_err(|err| HostlibError::Backend {
134 builtin: SUBSCRIBE_BUILTIN,
135 message: format!("failed to create watcher: {err}"),
136 })?;
137
138 let mode = if request.recursive {
139 RecursiveMode::Recursive
140 } else {
141 RecursiveMode::NonRecursive
142 };
143 for path in &request.watch_paths {
144 watcher
145 .watch(path, mode)
146 .map_err(|err| HostlibError::Backend {
147 builtin: SUBSCRIBE_BUILTIN,
148 message: format!("failed to watch {}: {err}", path.display()),
149 })?;
150 }
151
152 subscriptions()
153 .lock()
154 .expect("fs_watch mutex poisoned")
155 .insert(
156 subscription_id.clone(),
157 Subscription {
158 _watcher: watcher,
159 stop_tx: tx,
160 worker: Some(worker),
161 },
162 );
163
164 Ok(build_dict([(
165 "subscription_id",
166 str_value(subscription_id.as_str()),
167 )]))
168}
169
170fn unsubscribe(args: &[VmValue]) -> Result<VmValue, HostlibError> {
171 let raw = dict_arg(UNSUBSCRIBE_BUILTIN, args)?;
172 let dict = raw.as_ref();
173 let subscription_id = match dict.get("subscription_id") {
174 Some(VmValue::String(value)) if !value.trim().is_empty() => value.to_string(),
175 Some(other) => {
176 return Err(HostlibError::InvalidParameter {
177 builtin: UNSUBSCRIBE_BUILTIN,
178 param: "subscription_id",
179 message: format!("expected non-empty string, got {}", other.type_name()),
180 });
181 }
182 None => {
183 return Err(HostlibError::MissingParameter {
184 builtin: UNSUBSCRIBE_BUILTIN,
185 param: "subscription_id",
186 });
187 }
188 };
189 let removed = subscriptions()
190 .lock()
191 .expect("fs_watch mutex poisoned")
192 .remove(&subscription_id)
193 .is_some();
194 Ok(build_dict([("removed", VmValue::Bool(removed))]))
195}
196
/// Validated form of the dictionary passed to the subscribe builtin.
struct SubscribeRequest {
    /// Session the emitted events are attributed to.
    session_id: String,
    /// Root directory; every watch path must lie under it.
    root: PathBuf,
    /// Canonicalized paths handed to the watcher.
    watch_paths: Vec<PathBuf>,
    /// Whether paths are watched with `RecursiveMode::Recursive`.
    recursive: bool,
    /// Debounce window in milliseconds.
    debounce_ms: u64,
    /// Compiled `globs` patterns, or `None` when none were given.
    globs: Option<GlobSet>,
    /// Gitignore matcher, present only when `respect_gitignore` was true.
    gitignore: Option<Gitignore>,
    /// Event kinds the subscriber asked for.
    kinds: BTreeSet<String>,
}
207
208impl SubscribeRequest {
209 fn from_dict(dict: &BTreeMap<String, VmValue>) -> Result<Self, HostlibError> {
210 let root_param = optional_string(SUBSCRIBE_BUILTIN, dict, "root")?;
211 let raw_paths = optional_string_list(SUBSCRIBE_BUILTIN, dict, "paths")?;
212 let raw_globs = optional_string_list(SUBSCRIBE_BUILTIN, dict, "globs")?;
213 let session_id = optional_string(SUBSCRIBE_BUILTIN, dict, "session_id")?
214 .or_else(harn_vm::agent_sessions::current_session_id)
215 .ok_or(HostlibError::MissingParameter {
216 builtin: SUBSCRIBE_BUILTIN,
217 param: "session_id",
218 })?;
219
220 if session_id.trim().is_empty() {
221 return Err(HostlibError::InvalidParameter {
222 builtin: SUBSCRIBE_BUILTIN,
223 param: "session_id",
224 message: "must not be empty".to_string(),
225 });
226 }
227
228 if root_param.is_none() && raw_paths.is_none() {
229 return Err(HostlibError::MissingParameter {
230 builtin: SUBSCRIBE_BUILTIN,
231 param: "root",
232 });
233 }
234
235 let root = match root_param.as_deref() {
236 Some(root) => normalize_existing_path(SUBSCRIBE_BUILTIN, "root", root)?,
237 None => std::env::current_dir().map_err(|err| HostlibError::Backend {
238 builtin: SUBSCRIBE_BUILTIN,
239 message: format!("failed to resolve current directory: {err}"),
240 })?,
241 };
242
243 let raw_paths = raw_paths.unwrap_or_else(|| {
244 root_param
245 .as_ref()
246 .map(|root| vec![root.clone()])
247 .unwrap_or_default()
248 });
249 if raw_paths.is_empty() {
250 return Err(HostlibError::InvalidParameter {
251 builtin: SUBSCRIBE_BUILTIN,
252 param: "paths",
253 message: "must contain at least one path".to_string(),
254 });
255 }
256
257 let mut watch_paths = Vec::with_capacity(raw_paths.len());
258 for path in raw_paths {
259 let path = PathBuf::from(path);
260 let resolved = if path.is_relative() && root_param.is_some() {
261 root.join(path)
262 } else {
263 path
264 };
265 let normalized = normalize_existing_path_buf(SUBSCRIBE_BUILTIN, "paths", &resolved)?;
266 if normalized.strip_prefix(&root).is_err() {
267 return Err(HostlibError::InvalidParameter {
268 builtin: SUBSCRIBE_BUILTIN,
269 param: "paths",
270 message: format!(
271 "watch path `{}` is outside root `{}`",
272 normalized.display(),
273 root.display()
274 ),
275 });
276 }
277 watch_paths.push(normalized);
278 }
279
280 let recursive = optional_bool(SUBSCRIBE_BUILTIN, dict, "recursive", true)?;
281 let debounce_ms = optional_int(
282 SUBSCRIBE_BUILTIN,
283 dict,
284 "debounce_ms",
285 DEFAULT_DEBOUNCE_MS as i64,
286 )?;
287 if debounce_ms < 0 {
288 return Err(HostlibError::InvalidParameter {
289 builtin: SUBSCRIBE_BUILTIN,
290 param: "debounce_ms",
291 message: "must be >= 0".to_string(),
292 });
293 }
294 let respect_gitignore = optional_bool(SUBSCRIBE_BUILTIN, dict, "respect_gitignore", false)?;
295
296 Ok(Self {
297 session_id,
298 gitignore: if respect_gitignore {
299 Some(build_gitignore(&root))
300 } else {
301 None
302 },
303 globs: build_globs(raw_globs.unwrap_or_default())?,
304 kinds: parse_kinds(dict)?,
305 root,
306 watch_paths,
307 recursive,
308 debounce_ms: debounce_ms as u64,
309 })
310 }
311}
312
313fn watch_worker(rx: mpsc::Receiver<WatchMessage>, debounce: Duration, filter: WatchFilter) {
314 let mut pending = Vec::new();
315 loop {
316 match rx.recv() {
317 Ok(WatchMessage::Event(event)) => {
318 pending.push(event);
319 loop {
320 match rx.recv_timeout(debounce) {
321 Ok(WatchMessage::Event(event)) => pending.push(event),
322 Ok(WatchMessage::Error(error)) => emit_watch_error(&filter, error),
323 Ok(WatchMessage::Stop) | Err(mpsc::RecvTimeoutError::Disconnected) => {
324 emit_pending(&filter, &mut pending);
325 return;
326 }
327 Err(mpsc::RecvTimeoutError::Timeout) => break,
328 }
329 }
330 emit_pending(&filter, &mut pending);
331 }
332 Ok(WatchMessage::Error(error)) => emit_watch_error(&filter, error),
333 Ok(WatchMessage::Stop) | Err(_) => return,
334 }
335 }
336}
337
338fn emit_pending(filter: &WatchFilter, pending: &mut Vec<Event>) {
339 if pending.is_empty() {
340 return;
341 }
342 let events = coalesce_events(std::mem::take(pending), filter);
343 if events.is_empty() {
344 return;
345 }
346 harn_vm::agent_events::emit_event(&AgentEvent::FsWatch {
347 session_id: filter.session_id.clone(),
348 subscription_id: filter.subscription_id.clone(),
349 events,
350 });
351}
352
353fn emit_watch_error(filter: &WatchFilter, error: String) {
354 harn_vm::agent_events::emit_event(&AgentEvent::FsWatch {
355 session_id: filter.session_id.clone(),
356 subscription_id: filter.subscription_id.clone(),
357 events: vec![FsWatchEvent {
358 kind: "error".to_string(),
359 paths: Vec::new(),
360 relative_paths: Vec::new(),
361 raw_kind: "error".to_string(),
362 error: Some(error),
363 }],
364 });
365}
366
367fn coalesce_events(events: Vec<Event>, filter: &WatchFilter) -> Vec<FsWatchEvent> {
368 let mut seen = BTreeSet::new();
369 let mut output = Vec::new();
370 for event in events {
371 let kind = normalize_kind(&event.kind);
372 if !filter.kinds.contains(kind) {
373 continue;
374 }
375 let mut paths = Vec::new();
376 let mut relative_paths = Vec::new();
377 for path in &event.paths {
378 if !filter.matches_path(path) {
379 continue;
380 }
381 paths.push(path_to_string(path));
382 relative_paths.push(filter.relative_path(path));
383 }
384 if paths.is_empty() {
385 continue;
386 }
387 paths.sort();
388 paths.dedup();
389 relative_paths.sort();
390 relative_paths.dedup();
391 let raw_kind = format!("{:?}", event.kind);
392 if !seen.insert((kind.to_string(), paths.clone(), raw_kind.clone())) {
393 continue;
394 }
395 output.push(FsWatchEvent {
396 kind: kind.to_string(),
397 paths,
398 relative_paths,
399 raw_kind,
400 error: None,
401 });
402 }
403 output
404}
405
406impl WatchFilter {
407 fn matches_path(&self, path: &Path) -> bool {
408 if let Some(gitignore) = &self.gitignore {
409 if gitignore.matched(path, path.is_dir()).is_ignore() {
410 return false;
411 }
412 }
413 if let Some(globs) = &self.globs {
414 let relative = self.relative_path(path);
415 return globs.is_match(relative);
416 }
417 true
418 }
419
420 fn relative_path(&self, path: &Path) -> String {
421 let relative = path.strip_prefix(&self.root).unwrap_or(path);
422 let value = path_to_string(relative);
423 if value.is_empty() {
424 ".".to_string()
425 } else {
426 value
427 }
428 }
429}
430
431fn normalize_kind(kind: &EventKind) -> &'static str {
432 match kind {
433 EventKind::Create(_) => "create",
434 EventKind::Remove(_) => "remove",
435 EventKind::Modify(ModifyKind::Name(
436 RenameMode::Any
437 | RenameMode::To
438 | RenameMode::From
439 | RenameMode::Both
440 | RenameMode::Other,
441 )) => "rename",
442 EventKind::Modify(_) | EventKind::Any => "modify",
443 EventKind::Access(_) => "access",
444 EventKind::Other => "other",
445 }
446}
447
448fn parse_kinds(dict: &BTreeMap<String, VmValue>) -> Result<BTreeSet<String>, HostlibError> {
449 let values = optional_string_list(SUBSCRIBE_BUILTIN, dict, "kinds")?.unwrap_or_else(|| {
450 vec![
451 "create".to_string(),
452 "modify".to_string(),
453 "remove".to_string(),
454 "rename".to_string(),
455 ]
456 });
457 let mut kinds = BTreeSet::new();
458 for kind in values {
459 match kind.as_str() {
460 "create" | "modify" | "remove" | "rename" => {
461 kinds.insert(kind);
462 }
463 _ => {
464 return Err(HostlibError::InvalidParameter {
465 builtin: SUBSCRIBE_BUILTIN,
466 param: "kinds",
467 message: format!("unsupported event kind `{kind}`"),
468 });
469 }
470 }
471 }
472 Ok(kinds)
473}
474
475fn build_globs(globs: Vec<String>) -> Result<Option<GlobSet>, HostlibError> {
476 if globs.is_empty() {
477 return Ok(None);
478 }
479 let mut builder = GlobSetBuilder::new();
480 for glob in globs {
481 let normalized = normalize_glob(&glob);
482 builder.add(
483 Glob::new(&normalized).map_err(|err| HostlibError::InvalidParameter {
484 builtin: SUBSCRIBE_BUILTIN,
485 param: "globs",
486 message: format!("invalid glob `{glob}`: {err}"),
487 })?,
488 );
489 }
490 Ok(Some(builder.build().map_err(|err| {
491 HostlibError::InvalidParameter {
492 builtin: SUBSCRIBE_BUILTIN,
493 param: "globs",
494 message: format!("invalid glob set: {err}"),
495 }
496 })?))
497}
498
499fn build_gitignore(root: &Path) -> Gitignore {
500 let mut builder = GitignoreBuilder::new(root);
501 let gitignore = root.join(".gitignore");
502 if gitignore.exists() {
503 let _ = builder.add(gitignore);
504 }
505 let exclude = root.join(".git").join("info").join("exclude");
506 if exclude.exists() {
507 let _ = builder.add(exclude);
508 }
509 builder.build().unwrap_or_else(|_| Gitignore::empty())
510}
511
/// Normalizes a user-supplied glob before compilation.
///
/// Backslashes become forward slashes. A pattern without any `/` (other than
/// a lone `*`) is prefixed with `**/`; anything containing a `/` is kept as
/// written.
fn normalize_glob(glob: &str) -> String {
    let unified: String = glob
        .chars()
        .map(|ch| if ch == '\\' { '/' } else { ch })
        .collect();
    // Patterns starting with `**/` contain a `/`, so they are covered by the
    // second test as well.
    let keep_as_is = unified == "*" || unified.contains('/');
    if keep_as_is {
        return unified;
    }
    format!("**/{unified}")
}
520
521fn optional_string_list(
522 builtin: &'static str,
523 dict: &BTreeMap<String, VmValue>,
524 key: &'static str,
525) -> Result<Option<Vec<String>>, HostlibError> {
526 let Some(value) = dict.get(key) else {
527 return Ok(None);
528 };
529 match value {
530 VmValue::Nil => Ok(None),
531 VmValue::List(items) => items
532 .iter()
533 .enumerate()
534 .map(|(idx, item)| match item {
535 VmValue::String(value) => Ok(value.to_string()),
536 other => Err(HostlibError::InvalidParameter {
537 builtin,
538 param: key,
539 message: format!("item {idx} must be a string, got {}", other.type_name()),
540 }),
541 })
542 .collect::<Result<Vec<_>, _>>()
543 .map(Some),
544 other => Err(HostlibError::InvalidParameter {
545 builtin,
546 param: key,
547 message: format!("expected list of strings, got {}", other.type_name()),
548 }),
549 }
550}
551
552fn normalize_existing_path(
553 builtin: &'static str,
554 param: &'static str,
555 path: &str,
556) -> Result<PathBuf, HostlibError> {
557 normalize_existing_path_buf(builtin, param, &PathBuf::from(path))
558}
559
560fn normalize_existing_path_buf(
561 builtin: &'static str,
562 param: &'static str,
563 path: &Path,
564) -> Result<PathBuf, HostlibError> {
565 path.canonicalize()
566 .map_err(|err| HostlibError::InvalidParameter {
567 builtin,
568 param,
569 message: format!(
570 "{} does not resolve to an existing path: {err}",
571 path.display()
572 ),
573 })
574}
575
/// Renders `path` as text (lossily for non-UTF-8 data) with every backslash
/// replaced by a forward slash.
fn path_to_string(path: &Path) -> String {
    let lossy = path.to_string_lossy();
    lossy
        .chars()
        .map(|ch| if ch == '\\' { '/' } else { ch })
        .collect()
}
579
580fn next_subscription_id() -> String {
581 let seq = NEXT_SUBSCRIPTION_ID.fetch_add(1, Ordering::Relaxed);
582 let millis = SystemTime::now()
583 .duration_since(UNIX_EPOCH)
584 .map(|duration| duration.as_millis())
585 .unwrap_or(0);
586 format!("fsw-{millis}-{seq}")
587}
588
// Unit tests for the event filtering / coalescing pipeline. They build
// notify events by hand and never start a real watcher.
#[cfg(test)]
mod tests {
    use super::*;

    // Builds a notify event of `kind` that carries the single `path`.
    fn event(kind: EventKind, path: impl Into<PathBuf>) -> Event {
        Event::new(kind).add_path(path.into())
    }

    // Builds a filter rooted at `root` with optional glob patterns, no
    // gitignore matcher and the default set of event kinds.
    fn filter(root: PathBuf, globs: Option<Vec<&str>>) -> WatchFilter {
        WatchFilter {
            session_id: "session".to_string(),
            subscription_id: "sub".to_string(),
            root,
            globs: globs.map(|patterns| {
                build_globs(patterns.into_iter().map(str::to_string).collect())
                    .unwrap()
                    .unwrap()
            }),
            gitignore: None,
            kinds: parse_kinds(&BTreeMap::new()).unwrap(),
        }
    }

    // Two identical modify events for the same path collapse into one.
    #[test]
    fn coalesce_deduplicates_same_kind_and_path() {
        let root = std::env::current_dir().unwrap();
        let path = root.join("src/lib.rs");
        let filter = filter(root, None);
        let events = coalesce_events(
            vec![
                event(EventKind::Modify(ModifyKind::Any), &path),
                event(EventKind::Modify(ModifyKind::Any), &path),
            ],
            &filter,
        );
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].kind, "modify");
    }

    // `*.rs` is normalized to `**/*.rs` and matched against the
    // root-relative path, so the nested `src/lib.rs` passes while
    // `README.md` is dropped.
    #[test]
    fn glob_filter_uses_relative_paths() {
        let root = std::env::current_dir().unwrap();
        let filter = filter(root.clone(), Some(vec!["*.rs"]));
        let events = coalesce_events(
            vec![
                event(
                    EventKind::Create(notify::event::CreateKind::Any),
                    root.join("src/lib.rs"),
                ),
                event(
                    EventKind::Create(notify::event::CreateKind::Any),
                    root.join("README.md"),
                ),
            ],
            &filter,
        );
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].relative_paths, vec!["src/lib.rs"]);
    }

    // With only `remove` requested, the create event for the same path is
    // filtered out.
    #[test]
    fn kind_filter_drops_unrequested_events() {
        let root = std::env::current_dir().unwrap();
        let mut filter = filter(root.clone(), None);
        filter.kinds = BTreeSet::from(["remove".to_string()]);

        let events = coalesce_events(
            vec![
                event(
                    EventKind::Create(notify::event::CreateKind::Any),
                    root.join("src/lib.rs"),
                ),
                event(
                    EventKind::Remove(notify::event::RemoveKind::Any),
                    root.join("src/lib.rs"),
                ),
            ],
            &filter,
        );

        assert_eq!(events.len(), 1);
        assert_eq!(events[0].kind, "remove");
    }

    // A `.gitignore` in a temporary root lists `ignored.txt`; only the event
    // for `allowed.txt` survives.
    #[test]
    fn gitignore_filter_drops_ignored_paths() {
        let temp = tempfile::tempdir().unwrap();
        std::fs::write(temp.path().join(".gitignore"), "ignored.txt\n").unwrap();
        let mut filter = filter(temp.path().to_path_buf(), None);
        filter.gitignore = Some(build_gitignore(temp.path()));

        let events = coalesce_events(
            vec![
                event(
                    EventKind::Modify(ModifyKind::Any),
                    temp.path().join("allowed.txt"),
                ),
                event(
                    EventKind::Modify(ModifyKind::Any),
                    temp.path().join("ignored.txt"),
                ),
            ],
            &filter,
        );

        assert_eq!(events.len(), 1);
        assert_eq!(events[0].relative_paths, vec!["allowed.txt"]);
    }
}