1use std::collections::{BTreeMap, BTreeSet, HashMap};
7use std::path::{Path, PathBuf};
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::sync::{mpsc, Mutex, OnceLock};
10use std::thread;
11use std::time::{Duration, SystemTime, UNIX_EPOCH};
12
13use globset::{Glob, GlobSet, GlobSetBuilder};
14use harn_vm::agent_events::{AgentEvent, FsWatchEvent};
15use harn_vm::VmValue;
16use ignore::gitignore::{Gitignore, GitignoreBuilder};
17use notify::event::{ModifyKind, RenameMode};
18use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
19
20use crate::error::HostlibError;
21use crate::registry::{BuiltinRegistry, HostlibCapability, RegisteredBuiltin, SyncHandler};
22use crate::tools::args::{
23 build_dict, dict_arg, optional_bool, optional_int, optional_string, str_value,
24};
25
/// Builtin name under which the `fs_watch` `subscribe` method is registered.
const SUBSCRIBE_BUILTIN: &str = "hostlib_fs_watch_subscribe";
/// Builtin name under which the `fs_watch` `unsubscribe` method is registered.
const UNSUBSCRIBE_BUILTIN: &str = "hostlib_fs_watch_unsubscribe";
/// Debounce window, in milliseconds, used when the caller omits `debounce_ms`.
const DEFAULT_DEBOUNCE_MS: u64 = 50;

/// Process-wide counter supplying the sequence part of generated subscription ids.
static NEXT_SUBSCRIPTION_ID: AtomicU64 = AtomicU64::new(1);
31
/// Hostlib capability that registers the `fs_watch` subscribe and unsubscribe builtins.
#[derive(Default)]
pub struct FsWatchCapability;
35
36impl HostlibCapability for FsWatchCapability {
37 fn module_name(&self) -> &'static str {
38 "fs_watch"
39 }
40
41 fn register_builtins(&self, registry: &mut BuiltinRegistry) {
42 registry.register(RegisteredBuiltin {
43 name: SUBSCRIBE_BUILTIN,
44 module: "fs_watch",
45 method: "subscribe",
46 handler: subscribe_handler(),
47 });
48 registry.register(RegisteredBuiltin {
49 name: UNSUBSCRIBE_BUILTIN,
50 module: "fs_watch",
51 method: "unsubscribe",
52 handler: unsubscribe_handler(),
53 });
54 }
55}
56
57fn subscribe_handler() -> SyncHandler {
58 std::sync::Arc::new(subscribe)
59}
60
61fn unsubscribe_handler() -> SyncHandler {
62 std::sync::Arc::new(unsubscribe)
63}
64
/// A registered watch; dropping it sends `WatchMessage::Stop` and joins the worker thread.
struct Subscription {
    /// Never read; stored so the watcher lives exactly as long as the subscription.
    _watcher: RecommendedWatcher,
    /// Channel sender used on drop to deliver `WatchMessage::Stop` to the worker.
    stop_tx: mpsc::Sender<WatchMessage>,
    /// Join handle of the worker thread; taken (left as `None`) once joined on drop.
    worker: Option<thread::JoinHandle<()>>,
}
70
71impl Drop for Subscription {
72 fn drop(&mut self) {
73 let _ = self.stop_tx.send(WatchMessage::Stop);
74 if let Some(worker) = self.worker.take() {
75 let _ = worker.join();
76 }
77 }
78}
79
/// Messages delivered to a subscription's worker thread over its channel.
enum WatchMessage {
    /// A filesystem event reported successfully by the watcher callback.
    Event(Event),
    /// A watcher failure, already rendered to text by the callback.
    Error(String),
    /// Request to shut the worker down; sent when the `Subscription` is dropped.
    Stop,
}
85
/// Per-subscription settings the worker uses to filter and label emitted events.
#[derive(Clone)]
struct WatchFilter {
    /// Session id attached to every emitted `AgentEvent::FsWatch`.
    session_id: String,
    /// Subscription id attached to every emitted `AgentEvent::FsWatch`.
    subscription_id: String,
    /// Base directory stripped from event paths to produce relative paths.
    root: PathBuf,
    /// When set, a path passes only if its root-relative form matches this set.
    globs: Option<GlobSet>,
    /// When set, paths this matcher reports as ignored are dropped.
    gitignore: Option<Gitignore>,
    /// Normalized event kinds (e.g. `"create"`) that are allowed through.
    kinds: BTreeSet<String>,
}
95
96fn subscriptions() -> &'static Mutex<HashMap<String, Subscription>> {
97 static SUBSCRIPTIONS: OnceLock<Mutex<HashMap<String, Subscription>>> = OnceLock::new();
98 SUBSCRIPTIONS.get_or_init(|| Mutex::new(HashMap::new()))
99}
100
101fn subscribe(args: &[VmValue]) -> Result<VmValue, HostlibError> {
102 let raw = dict_arg(SUBSCRIBE_BUILTIN, args)?;
103 let dict = raw.as_ref();
104 let request = SubscribeRequest::from_dict(dict)?;
105 let subscription_id = next_subscription_id();
106 let (tx, rx) = mpsc::channel();
107
108 let filter = WatchFilter {
109 session_id: request.session_id.clone(),
110 subscription_id: subscription_id.clone(),
111 root: request.root.clone(),
112 globs: request.globs,
113 gitignore: request.gitignore,
114 kinds: request.kinds,
115 };
116 let debounce = Duration::from_millis(request.debounce_ms);
117 let worker = thread::Builder::new()
118 .name(format!("harn-fs-watch-{subscription_id}"))
119 .spawn(move || watch_worker(rx, debounce, filter))
120 .map_err(|err| HostlibError::Backend {
121 builtin: SUBSCRIBE_BUILTIN,
122 message: format!("failed to spawn watch worker: {err}"),
123 })?;
124
125 let notify_tx = tx.clone();
126 let mut watcher = notify::recommended_watcher(move |result: notify::Result<Event>| {
127 let message = match result {
128 Ok(event) => WatchMessage::Event(event),
129 Err(err) => WatchMessage::Error(err.to_string()),
130 };
131 let _ = notify_tx.send(message);
132 })
133 .map_err(|err| HostlibError::Backend {
134 builtin: SUBSCRIBE_BUILTIN,
135 message: format!("failed to create watcher: {err}"),
136 })?;
137
138 let mode = if request.recursive {
139 RecursiveMode::Recursive
140 } else {
141 RecursiveMode::NonRecursive
142 };
143 for path in &request.watch_paths {
144 watcher
145 .watch(path, mode)
146 .map_err(|err| HostlibError::Backend {
147 builtin: SUBSCRIBE_BUILTIN,
148 message: format!("failed to watch {}: {err}", path.display()),
149 })?;
150 }
151
152 subscriptions()
153 .lock()
154 .expect("fs_watch mutex poisoned")
155 .insert(
156 subscription_id.clone(),
157 Subscription {
158 _watcher: watcher,
159 stop_tx: tx,
160 worker: Some(worker),
161 },
162 );
163
164 Ok(build_dict([(
165 "subscription_id",
166 str_value(subscription_id.as_str()),
167 )]))
168}
169
170fn unsubscribe(args: &[VmValue]) -> Result<VmValue, HostlibError> {
171 let raw = dict_arg(UNSUBSCRIBE_BUILTIN, args)?;
172 let dict = raw.as_ref();
173 let subscription_id = match dict.get("subscription_id") {
174 Some(VmValue::String(value)) if !value.trim().is_empty() => value.to_string(),
175 Some(other) => {
176 return Err(HostlibError::InvalidParameter {
177 builtin: UNSUBSCRIBE_BUILTIN,
178 param: "subscription_id",
179 message: format!("expected non-empty string, got {}", other.type_name()),
180 });
181 }
182 None => {
183 return Err(HostlibError::MissingParameter {
184 builtin: UNSUBSCRIBE_BUILTIN,
185 param: "subscription_id",
186 });
187 }
188 };
189 let removed = subscriptions()
190 .lock()
191 .expect("fs_watch mutex poisoned")
192 .remove(&subscription_id)
193 .is_some();
194 Ok(build_dict([("removed", VmValue::Bool(removed))]))
195}
196
/// Validated form of the subscribe builtin's argument dict.
struct SubscribeRequest {
    /// Session id from the `session_id` argument or the current agent session.
    session_id: String,
    /// Canonicalized `root` argument, or the current directory when `root` is omitted.
    root: PathBuf,
    /// Canonicalized paths to hand to the watcher.
    watch_paths: Vec<PathBuf>,
    /// Whether paths are watched recursively (`recursive`, default `true`).
    recursive: bool,
    /// Debounce window in milliseconds (`debounce_ms`, default `DEFAULT_DEBOUNCE_MS`).
    debounce_ms: u64,
    /// Compiled `globs` argument; `None` when no globs were supplied.
    globs: Option<GlobSet>,
    /// Gitignore matcher built from `root` when `respect_gitignore` is true.
    gitignore: Option<Gitignore>,
    /// Event kinds requested through `kinds`, validated by `parse_kinds`.
    kinds: BTreeSet<String>,
}
207
208impl SubscribeRequest {
209 fn from_dict(dict: &BTreeMap<String, VmValue>) -> Result<Self, HostlibError> {
210 let root_param = optional_string(SUBSCRIBE_BUILTIN, dict, "root")?;
211 let raw_paths = optional_string_list(SUBSCRIBE_BUILTIN, dict, "paths")?;
212 let raw_globs = optional_string_list(SUBSCRIBE_BUILTIN, dict, "globs")?;
213 let session_id = optional_string(SUBSCRIBE_BUILTIN, dict, "session_id")?
214 .or_else(harn_vm::agent_sessions::current_session_id)
215 .ok_or(HostlibError::MissingParameter {
216 builtin: SUBSCRIBE_BUILTIN,
217 param: "session_id",
218 })?;
219
220 if session_id.trim().is_empty() {
221 return Err(HostlibError::InvalidParameter {
222 builtin: SUBSCRIBE_BUILTIN,
223 param: "session_id",
224 message: "must not be empty".to_string(),
225 });
226 }
227
228 if root_param.is_none() && raw_paths.is_none() {
229 return Err(HostlibError::MissingParameter {
230 builtin: SUBSCRIBE_BUILTIN,
231 param: "root",
232 });
233 }
234
235 let root = match root_param.as_deref() {
236 Some(root) => normalize_existing_path(SUBSCRIBE_BUILTIN, "root", root)?,
237 None => std::env::current_dir().map_err(|err| HostlibError::Backend {
238 builtin: SUBSCRIBE_BUILTIN,
239 message: format!("failed to resolve current directory: {err}"),
240 })?,
241 };
242
243 let raw_paths = raw_paths.unwrap_or_else(|| {
244 root_param
245 .as_ref()
246 .map(|root| vec![root.clone()])
247 .unwrap_or_default()
248 });
249 if raw_paths.is_empty() {
250 return Err(HostlibError::InvalidParameter {
251 builtin: SUBSCRIBE_BUILTIN,
252 param: "paths",
253 message: "must contain at least one path".to_string(),
254 });
255 }
256
257 let mut watch_paths = Vec::with_capacity(raw_paths.len());
258 for path in raw_paths {
259 let path = PathBuf::from(path);
260 let resolved = if path.is_relative() && root_param.is_some() {
261 root.join(path)
262 } else {
263 path
264 };
265 watch_paths.push(normalize_existing_path_buf(
266 SUBSCRIBE_BUILTIN,
267 "paths",
268 &resolved,
269 )?);
270 }
271
272 let recursive = optional_bool(SUBSCRIBE_BUILTIN, dict, "recursive", true)?;
273 let debounce_ms = optional_int(
274 SUBSCRIBE_BUILTIN,
275 dict,
276 "debounce_ms",
277 DEFAULT_DEBOUNCE_MS as i64,
278 )?;
279 if debounce_ms < 0 {
280 return Err(HostlibError::InvalidParameter {
281 builtin: SUBSCRIBE_BUILTIN,
282 param: "debounce_ms",
283 message: "must be >= 0".to_string(),
284 });
285 }
286 let respect_gitignore = optional_bool(SUBSCRIBE_BUILTIN, dict, "respect_gitignore", false)?;
287
288 Ok(Self {
289 session_id,
290 gitignore: if respect_gitignore {
291 Some(build_gitignore(&root))
292 } else {
293 None
294 },
295 globs: build_globs(raw_globs.unwrap_or_default())?,
296 kinds: parse_kinds(dict)?,
297 root,
298 watch_paths,
299 recursive,
300 debounce_ms: debounce_ms as u64,
301 })
302 }
303}
304
305fn watch_worker(rx: mpsc::Receiver<WatchMessage>, debounce: Duration, filter: WatchFilter) {
306 let mut pending = Vec::new();
307 loop {
308 match rx.recv() {
309 Ok(WatchMessage::Event(event)) => {
310 pending.push(event);
311 loop {
312 match rx.recv_timeout(debounce) {
313 Ok(WatchMessage::Event(event)) => pending.push(event),
314 Ok(WatchMessage::Error(error)) => emit_watch_error(&filter, error),
315 Ok(WatchMessage::Stop) | Err(mpsc::RecvTimeoutError::Disconnected) => {
316 emit_pending(&filter, &mut pending);
317 return;
318 }
319 Err(mpsc::RecvTimeoutError::Timeout) => break,
320 }
321 }
322 emit_pending(&filter, &mut pending);
323 }
324 Ok(WatchMessage::Error(error)) => emit_watch_error(&filter, error),
325 Ok(WatchMessage::Stop) | Err(_) => return,
326 }
327 }
328}
329
330fn emit_pending(filter: &WatchFilter, pending: &mut Vec<Event>) {
331 if pending.is_empty() {
332 return;
333 }
334 let events = coalesce_events(std::mem::take(pending), filter);
335 if events.is_empty() {
336 return;
337 }
338 harn_vm::agent_events::emit_event(&AgentEvent::FsWatch {
339 session_id: filter.session_id.clone(),
340 subscription_id: filter.subscription_id.clone(),
341 events,
342 });
343}
344
345fn emit_watch_error(filter: &WatchFilter, error: String) {
346 harn_vm::agent_events::emit_event(&AgentEvent::FsWatch {
347 session_id: filter.session_id.clone(),
348 subscription_id: filter.subscription_id.clone(),
349 events: vec![FsWatchEvent {
350 kind: "error".to_string(),
351 paths: Vec::new(),
352 relative_paths: Vec::new(),
353 raw_kind: "error".to_string(),
354 error: Some(error),
355 }],
356 });
357}
358
359fn coalesce_events(events: Vec<Event>, filter: &WatchFilter) -> Vec<FsWatchEvent> {
360 let mut seen = BTreeSet::new();
361 let mut output = Vec::new();
362 for event in events {
363 let kind = normalize_kind(&event.kind);
364 if !filter.kinds.contains(kind) {
365 continue;
366 }
367 let mut paths = Vec::new();
368 let mut relative_paths = Vec::new();
369 for path in &event.paths {
370 if !filter.matches_path(path) {
371 continue;
372 }
373 paths.push(path_to_string(path));
374 relative_paths.push(filter.relative_path(path));
375 }
376 if paths.is_empty() {
377 continue;
378 }
379 paths.sort();
380 paths.dedup();
381 relative_paths.sort();
382 relative_paths.dedup();
383 let raw_kind = format!("{:?}", event.kind);
384 if !seen.insert((kind.to_string(), paths.clone(), raw_kind.clone())) {
385 continue;
386 }
387 output.push(FsWatchEvent {
388 kind: kind.to_string(),
389 paths,
390 relative_paths,
391 raw_kind,
392 error: None,
393 });
394 }
395 output
396}
397
398impl WatchFilter {
399 fn matches_path(&self, path: &Path) -> bool {
400 if let Some(gitignore) = &self.gitignore {
401 if gitignore.matched(path, path.is_dir()).is_ignore() {
402 return false;
403 }
404 }
405 if let Some(globs) = &self.globs {
406 let relative = self.relative_path(path);
407 return globs.is_match(relative);
408 }
409 true
410 }
411
412 fn relative_path(&self, path: &Path) -> String {
413 let relative = path.strip_prefix(&self.root).unwrap_or(path);
414 let value = path_to_string(relative);
415 if value.is_empty() {
416 ".".to_string()
417 } else {
418 value
419 }
420 }
421}
422
423fn normalize_kind(kind: &EventKind) -> &'static str {
424 match kind {
425 EventKind::Create(_) => "create",
426 EventKind::Remove(_) => "remove",
427 EventKind::Modify(ModifyKind::Name(
428 RenameMode::Any
429 | RenameMode::To
430 | RenameMode::From
431 | RenameMode::Both
432 | RenameMode::Other,
433 )) => "rename",
434 EventKind::Modify(_) | EventKind::Any => "modify",
435 EventKind::Access(_) => "access",
436 EventKind::Other => "other",
437 }
438}
439
440fn parse_kinds(dict: &BTreeMap<String, VmValue>) -> Result<BTreeSet<String>, HostlibError> {
441 let values = optional_string_list(SUBSCRIBE_BUILTIN, dict, "kinds")?.unwrap_or_else(|| {
442 vec![
443 "create".to_string(),
444 "modify".to_string(),
445 "remove".to_string(),
446 "rename".to_string(),
447 ]
448 });
449 let mut kinds = BTreeSet::new();
450 for kind in values {
451 match kind.as_str() {
452 "create" | "modify" | "remove" | "rename" => {
453 kinds.insert(kind);
454 }
455 _ => {
456 return Err(HostlibError::InvalidParameter {
457 builtin: SUBSCRIBE_BUILTIN,
458 param: "kinds",
459 message: format!("unsupported event kind `{kind}`"),
460 });
461 }
462 }
463 }
464 Ok(kinds)
465}
466
467fn build_globs(globs: Vec<String>) -> Result<Option<GlobSet>, HostlibError> {
468 if globs.is_empty() {
469 return Ok(None);
470 }
471 let mut builder = GlobSetBuilder::new();
472 for glob in globs {
473 let normalized = normalize_glob(&glob);
474 builder.add(
475 Glob::new(&normalized).map_err(|err| HostlibError::InvalidParameter {
476 builtin: SUBSCRIBE_BUILTIN,
477 param: "globs",
478 message: format!("invalid glob `{glob}`: {err}"),
479 })?,
480 );
481 }
482 Ok(Some(builder.build().map_err(|err| {
483 HostlibError::InvalidParameter {
484 builtin: SUBSCRIBE_BUILTIN,
485 param: "globs",
486 message: format!("invalid glob set: {err}"),
487 }
488 })?))
489}
490
491fn build_gitignore(root: &Path) -> Gitignore {
492 let mut builder = GitignoreBuilder::new(root);
493 let gitignore = root.join(".gitignore");
494 if gitignore.exists() {
495 let _ = builder.add(gitignore);
496 }
497 let exclude = root.join(".git").join("info").join("exclude");
498 if exclude.exists() {
499 let _ = builder.add(exclude);
500 }
501 builder.build().unwrap_or_else(|_| Gitignore::empty())
502}
503
/// Normalizes a caller-supplied glob: backslashes become `/`, and a pattern
/// with no `/` (other than a lone `*`) is prefixed with `**/`.
fn normalize_glob(glob: &str) -> String {
    let unified = glob.replace('\\', "/");
    // A pattern starting with `**/` necessarily contains `/`, so checking for
    // the separator covers that case as well.
    let needs_prefix = unified != "*" && !unified.contains('/');
    if needs_prefix {
        format!("**/{unified}")
    } else {
        unified
    }
}
512
513fn optional_string_list(
514 builtin: &'static str,
515 dict: &BTreeMap<String, VmValue>,
516 key: &'static str,
517) -> Result<Option<Vec<String>>, HostlibError> {
518 let Some(value) = dict.get(key) else {
519 return Ok(None);
520 };
521 match value {
522 VmValue::Nil => Ok(None),
523 VmValue::List(items) => items
524 .iter()
525 .enumerate()
526 .map(|(idx, item)| match item {
527 VmValue::String(value) => Ok(value.to_string()),
528 other => Err(HostlibError::InvalidParameter {
529 builtin,
530 param: key,
531 message: format!("item {idx} must be a string, got {}", other.type_name()),
532 }),
533 })
534 .collect::<Result<Vec<_>, _>>()
535 .map(Some),
536 other => Err(HostlibError::InvalidParameter {
537 builtin,
538 param: key,
539 message: format!("expected list of strings, got {}", other.type_name()),
540 }),
541 }
542}
543
544fn normalize_existing_path(
545 builtin: &'static str,
546 param: &'static str,
547 path: &str,
548) -> Result<PathBuf, HostlibError> {
549 normalize_existing_path_buf(builtin, param, &PathBuf::from(path))
550}
551
552fn normalize_existing_path_buf(
553 builtin: &'static str,
554 param: &'static str,
555 path: &Path,
556) -> Result<PathBuf, HostlibError> {
557 path.canonicalize()
558 .map_err(|err| HostlibError::InvalidParameter {
559 builtin,
560 param,
561 message: format!(
562 "{} does not resolve to an existing path: {err}",
563 path.display()
564 ),
565 })
566}
567
/// Renders a path as text (lossily for non-UTF-8 data) with every backslash
/// replaced by a forward slash.
fn path_to_string(path: &Path) -> String {
    let text = path.to_string_lossy();
    text.chars()
        .map(|ch| if ch == '\\' { '/' } else { ch })
        .collect()
}
571
572fn next_subscription_id() -> String {
573 let seq = NEXT_SUBSCRIPTION_ID.fetch_add(1, Ordering::Relaxed);
574 let millis = SystemTime::now()
575 .duration_since(UNIX_EPOCH)
576 .map(|duration| duration.as_millis())
577 .unwrap_or(0);
578 format!("fsw-{millis}-{seq}")
579}
580
/// Unit tests for event coalescing and the glob, kind and gitignore filters.
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a watcher event of `kind` carrying a single path.
    fn event(kind: EventKind, path: impl Into<PathBuf>) -> Event {
        Event::new(kind).add_path(path.into())
    }

    /// Builds a filter rooted at `root` with optional globs, no gitignore
    /// matcher, and the default event kinds.
    fn filter(root: PathBuf, globs: Option<Vec<&str>>) -> WatchFilter {
        WatchFilter {
            session_id: "session".to_string(),
            subscription_id: "sub".to_string(),
            root,
            globs: globs.map(|patterns| {
                build_globs(patterns.into_iter().map(str::to_string).collect())
                    .unwrap()
                    .unwrap()
            }),
            gitignore: None,
            kinds: parse_kinds(&BTreeMap::new()).unwrap(),
        }
    }

    /// Two identical modify events on the same path collapse into one.
    #[test]
    fn coalesce_deduplicates_same_kind_and_path() {
        let root = std::env::current_dir().unwrap();
        let path = root.join("src/lib.rs");
        let filter = filter(root, None);
        let events = coalesce_events(
            vec![
                event(EventKind::Modify(ModifyKind::Any), &path),
                event(EventKind::Modify(ModifyKind::Any), &path),
            ],
            &filter,
        );
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].kind, "modify");
    }

    /// A bare `*.rs` glob matches a nested file by its root-relative path and
    /// rejects a non-matching file.
    #[test]
    fn glob_filter_uses_relative_paths() {
        let root = std::env::current_dir().unwrap();
        let filter = filter(root.clone(), Some(vec!["*.rs"]));
        let events = coalesce_events(
            vec![
                event(
                    EventKind::Create(notify::event::CreateKind::Any),
                    root.join("src/lib.rs"),
                ),
                event(
                    EventKind::Create(notify::event::CreateKind::Any),
                    root.join("README.md"),
                ),
            ],
            &filter,
        );
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].relative_paths, vec!["src/lib.rs"]);
    }

    /// With only `remove` requested, a create event on the same path is dropped.
    #[test]
    fn kind_filter_drops_unrequested_events() {
        let root = std::env::current_dir().unwrap();
        let mut filter = filter(root.clone(), None);
        filter.kinds = BTreeSet::from(["remove".to_string()]);

        let events = coalesce_events(
            vec![
                event(
                    EventKind::Create(notify::event::CreateKind::Any),
                    root.join("src/lib.rs"),
                ),
                event(
                    EventKind::Remove(notify::event::RemoveKind::Any),
                    root.join("src/lib.rs"),
                ),
            ],
            &filter,
        );

        assert_eq!(events.len(), 1);
        assert_eq!(events[0].kind, "remove");
    }

    /// A path listed in the root's `.gitignore` is dropped; others pass.
    #[test]
    fn gitignore_filter_drops_ignored_paths() {
        let temp = tempfile::tempdir().unwrap();
        std::fs::write(temp.path().join(".gitignore"), "ignored.txt\n").unwrap();
        let mut filter = filter(temp.path().to_path_buf(), None);
        filter.gitignore = Some(build_gitignore(temp.path()));

        let events = coalesce_events(
            vec![
                event(
                    EventKind::Modify(ModifyKind::Any),
                    temp.path().join("allowed.txt"),
                ),
                event(
                    EventKind::Modify(ModifyKind::Any),
                    temp.path().join("ignored.txt"),
                ),
            ],
            &filter,
        );

        assert_eq!(events.len(), 1);
        assert_eq!(events[0].relative_paths, vec!["allowed.txt"]);
    }
}