1use crate::audit::{FileConflictAuditEvent, FileConflictAuditLog};
2use crate::config::PermissionsConfig;
3use crate::tools::file_ops::{build_diff_preview, diff_preview_error_skip};
4use anyhow::{Context, Result};
5use notify::{EventKind, RecommendedWatcher, RecursiveMode, Watcher};
6use parking_lot::Mutex;
7use serde_json::{Value, json};
8use std::collections::{HashMap, HashSet, VecDeque};
9use std::path::{Path, PathBuf};
10use std::sync::Arc;
11use std::sync::Mutex as StdMutex;
12use std::sync::atomic::{AtomicU64, Ordering};
13use std::sync::mpsc::{self, Receiver, RecvTimeoutError};
14use std::thread;
15use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
16use tokio::sync::Notify;
17use vtcode_commons::utils::calculate_sha256;
18
/// Tool-argument key that [`conflict_override_snapshot`] reads to obtain an approved snapshot.
pub const FILE_CONFLICT_OVERRIDE_ARG: &str = "__vtcode_conflict_override";
/// Key set to `true` in the JSON built by `FileConflict::to_tool_output`.
pub const FILE_CONFLICT_DETECTED_FIELD: &str = "conflict_detected";
/// Key carrying the display path in the JSON built by `FileConflict::to_tool_output`.
pub const FILE_CONFLICT_PATH_FIELD: &str = "conflict_path";
22
23#[derive(Clone, Debug, PartialEq, Eq)]
24pub struct FileSnapshot {
25 pub exists: bool,
26 pub size_bytes: u64,
27 pub modified_millis: Option<u128>,
28 pub sha256: String,
29 pub text_content: Option<String>,
30}
31
32impl FileSnapshot {
33 fn fingerprint(&self) -> String {
34 let modified = self
35 .modified_millis
36 .map(|value| value.to_string())
37 .unwrap_or_else(|| "none".to_string());
38 format!(
39 "exists={};size={};modified={};sha256={}",
40 self.exists, self.size_bytes, modified, self.sha256
41 )
42 }
43
44 fn to_json(&self) -> Value {
45 json!({
46 "exists": self.exists,
47 "size_bytes": self.size_bytes,
48 "modified_millis": self.modified_millis,
49 "sha256": self.sha256,
50 })
51 }
52
53 fn same_contents(&self, other: &Self) -> bool {
54 self.exists == other.exists
55 && self.size_bytes == other.size_bytes
56 && self.sha256 == other.sha256
57 }
58
59 fn same_identity(&self, other: &Self) -> bool {
60 self.same_contents(other) && self.modified_millis == other.modified_millis
61 }
62
63 fn from_identity_value(value: &Value) -> Option<Self> {
64 let object = value.as_object()?;
65 let modified_millis = match object.get("modified_millis") {
66 Some(Value::Null) | None => None,
67 Some(value) => value.as_u64().map(u128::from).or_else(|| {
68 value
69 .as_i64()
70 .filter(|millis| *millis >= 0)
71 .map(|millis| millis as u128)
72 }),
73 };
74
75 Some(Self {
76 exists: object.get("exists")?.as_bool()?,
77 size_bytes: object.get("size_bytes")?.as_u64()?,
78 modified_millis,
79 sha256: object.get("sha256")?.as_str()?.to_string(),
80 text_content: None,
81 })
82 }
83
84 fn from_text_content(content: &str) -> Self {
85 let bytes = content.as_bytes();
86 let sha256 = calculate_sha256(bytes);
87
88 Self {
89 exists: true,
90 size_bytes: bytes.len() as u64,
91 modified_millis: None,
92 sha256,
93 text_content: Some(content.to_string()),
94 }
95 }
96
97 fn missing() -> Self {
98 Self {
99 exists: false,
100 size_bytes: 0,
101 modified_millis: None,
102 sha256: String::new(),
103 text_content: None,
104 }
105 }
106}
107
/// Description of a detected mismatch between what was last read and what is on disk.
#[derive(Clone, Debug)]
pub struct FileConflict {
    /// Path of the conflicting file.
    pub path: PathBuf,
    /// Snapshot recorded at the last read, if any.
    pub read_snapshot: Option<FileSnapshot>,
    /// Snapshot of the current on-disk state, if any.
    pub disk_snapshot: Option<FileSnapshot>,
    /// Content the caller intended to write, if supplied.
    pub intended_content: Option<String>,
    /// Copied verbatim into the `emit_hitl_notification` field of the tool output.
    pub emit_hitl_notification: bool,
}
116
/// Per-path staleness report returned by `EditedFileMonitor::tracked_path_freshness`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct TrackedPathFreshness {
    /// Tracked path this report describes.
    pub path: PathBuf,
    /// `true` when the last-read and last-known disk snapshots both exist and differ in contents.
    pub is_stale: bool,
    /// Fingerprint of the last-known disk snapshot, if one has been recorded.
    pub fingerprint: Option<String>,
}
123
124impl FileConflict {
125 pub fn to_tool_output(&self, workspace_root: &Path) -> Value {
126 let display_path = workspace_relative_display(workspace_root, &self.path);
127 let disk_content = self
128 .disk_snapshot
129 .as_ref()
130 .and_then(|snapshot| snapshot.text_content.clone());
131 let diff_preview = match (&disk_content, &self.intended_content) {
132 (Some(before), Some(after)) => build_diff_preview(&display_path, Some(before), after),
133 (None, Some(_)) => diff_preview_error_skip("binary_or_non_utf8_disk_content", None),
134 _ => diff_preview_error_skip("missing_intended_content", None),
135 };
136
137 json!({
138 "success": true,
139 FILE_CONFLICT_DETECTED_FIELD: true,
140 FILE_CONFLICT_PATH_FIELD: display_path,
141 "message": "File changed on disk since the agent last read it.",
142 "resolution": "pending",
143 "emit_hitl_notification": self.emit_hitl_notification,
144 "disk_content": disk_content,
145 "intended_content": self.intended_content,
146 "read_snapshot": self.read_snapshot.as_ref().map(FileSnapshot::to_json),
147 "disk_snapshot": self.disk_snapshot.as_ref().map(FileSnapshot::to_json),
148 "diff_preview": diff_preview,
149 })
150 }
151}
152
/// Marker stored on a tracked file while its disk contents differ from the last read.
#[derive(Clone, Debug)]
struct StaleConflictState {
    /// Set once `detect_conflict` has reported this conflict with a notification request.
    notification_emitted: bool,
}
157
158#[derive(Clone, Debug)]
159struct TrackedFileState {
160 last_read_snapshot: Option<FileSnapshot>,
161 last_known_disk_snapshot: Option<FileSnapshot>,
162 last_agent_write_snapshot: Option<FileSnapshot>,
163 active_mutation: Option<u64>,
164 pending_mutations: VecDeque<u64>,
165 stale_conflict: Option<StaleConflictState>,
166 notify: Arc<Notify>,
167}
168
169impl TrackedFileState {
170 fn new() -> Self {
171 Self {
172 last_read_snapshot: None,
173 last_known_disk_snapshot: None,
174 last_agent_write_snapshot: None,
175 active_mutation: None,
176 pending_mutations: VecDeque::new(),
177 stale_conflict: None,
178 notify: Arc::new(Notify::new()),
179 }
180 }
181}
182
/// Mutable monitor state guarded by a single mutex.
#[derive(Default)]
struct MonitorState {
    /// Per-path tracking state, keyed by the normalized path.
    tracked_files: HashMap<PathBuf, TrackedFileState>,
    /// Parent directories already handed to `watch_parent`.
    watched_parents: HashSet<PathBuf>,
}
188
/// Shared state behind every `EditedFileMonitor` handle and `MutationLease`.
struct EditedFileMonitorInner {
    /// Tracked files and watched directories.
    state: Mutex<MonitorState>,
    /// File-system watcher; `None` when it could not be created.
    watcher: StdMutex<Option<RecommendedWatcher>>,
    /// Audit sink; `None` when auditing is disabled or failed to initialize.
    audit_log: Mutex<Option<FileConflictAuditLog>>,
    /// Source of unique mutation ids.
    next_mutation_id: AtomicU64,
    /// How long a path must stay quiet before the event loop processes it.
    debounce_duration: Duration,
}
196
/// Cloneable handle to the edited-file monitor; clones share the same inner state.
#[derive(Clone)]
pub struct EditedFileMonitor {
    /// Shared monitor state.
    inner: Arc<EditedFileMonitorInner>,
}
201
/// Exclusive right to mutate one path, returned by `EditedFileMonitor::acquire_mutation`.
/// Dropping the lease releases it and wakes waiters.
pub struct MutationLease {
    /// Monitor state the lease was issued from.
    inner: Arc<EditedFileMonitorInner>,
    /// Normalized path the lease covers.
    path: PathBuf,
    /// Id assigned to this mutation.
    mutation_id: u64,
    /// Guards against releasing twice in `Drop`.
    released: bool,
}
208
209struct PendingMutationGuard {
210 inner: Arc<EditedFileMonitorInner>,
211 path: PathBuf,
212 mutation_id: u64,
213 armed: bool,
214}
215
216impl PendingMutationGuard {
217 fn new(inner: Arc<EditedFileMonitorInner>, path: PathBuf, mutation_id: u64) -> Self {
218 Self {
219 inner,
220 path,
221 mutation_id,
222 armed: true,
223 }
224 }
225
226 fn disarm(&mut self) {
227 self.armed = false;
228 }
229}
230
231impl Drop for PendingMutationGuard {
232 fn drop(&mut self) {
233 if !self.armed {
234 return;
235 }
236
237 let mut state = self.inner.state.lock();
238 let Some(entry) = state.tracked_files.get_mut(&self.path) else {
239 return;
240 };
241
242 if remove_pending_mutation(entry, self.mutation_id) {
243 entry.notify.notify_waiters();
244 }
245 }
246}
247
248impl MutationLease {
249 pub fn path(&self) -> &Path {
250 &self.path
251 }
252}
253
254impl Drop for MutationLease {
255 fn drop(&mut self) {
256 if self.released {
257 return;
258 }
259 let mut state = self.inner.state.lock();
260 if let Some(entry) = state.tracked_files.get_mut(&self.path) {
261 let mut released_active = false;
262 if entry.active_mutation == Some(self.mutation_id) {
263 entry.active_mutation = None;
264 released_active = true;
265 }
266 let removed_pending = remove_pending_mutation(entry, self.mutation_id);
267 if released_active || removed_pending {
268 entry.notify.notify_waiters();
269 }
270 }
271 self.released = true;
272 }
273}
274
275impl EditedFileMonitor {
276 pub fn new() -> Self {
277 let (event_tx, event_rx) = mpsc::channel::<PathBuf>();
278 let watcher = RecommendedWatcher::new(
279 move |event: notify::Result<notify::Event>| {
280 let Ok(event) = event else {
281 return;
282 };
283
284 if !matches!(
285 event.kind,
286 EventKind::Modify(_) | EventKind::Create(_) | EventKind::Remove(_)
287 ) {
288 return;
289 }
290
291 for path in event.paths {
292 let _ = event_tx.send(path);
293 }
294 },
295 notify::Config::default(),
296 )
297 .ok();
298
299 let inner = Arc::new(EditedFileMonitorInner {
300 state: Mutex::new(MonitorState::default()),
301 watcher: StdMutex::new(watcher),
302 audit_log: Mutex::new(None),
303 next_mutation_id: AtomicU64::new(1),
304 debounce_duration: Duration::from_millis(250),
305 });
306
307 let monitor = Self {
308 inner: Arc::clone(&inner),
309 };
310 spawn_event_loop(inner, event_rx);
311 monitor
312 }
313
314 pub fn apply_permissions_config(&self, permissions: &PermissionsConfig) {
315 let mut audit_log = self.inner.audit_log.lock();
316 if !permissions.audit_enabled {
317 *audit_log = None;
318 return;
319 }
320
321 let audit_dir = vtcode_commons::paths::expand_tilde(&permissions.audit_directory);
322 match FileConflictAuditLog::new(audit_dir) {
323 Ok(log) => *audit_log = Some(log),
324 Err(err) => {
325 tracing::warn!(error = %err, "Failed to initialize file conflict audit log");
326 *audit_log = None;
327 }
328 }
329 }
330
331 pub async fn track_read(&self, path: &Path) -> Result<()> {
332 let path = normalize_event_path(path);
333 let snapshot = snapshot_path_async(path.clone()).await?;
334 self.record_read_snapshot(&path, snapshot)
335 }
336
337 pub async fn accept_disk_version(&self, path: &Path) -> Result<()> {
338 let path = normalize_event_path(path);
339 let snapshot = snapshot_path_async(path.clone()).await?;
340 self.record_read_snapshot(&path, snapshot)
341 }
342
343 pub fn record_read_snapshot(&self, path: &Path, snapshot: FileSnapshot) -> Result<()> {
344 let path = normalize_event_path(path);
345 {
346 let mut state = self.inner.state.lock();
347 let entry = state
348 .tracked_files
349 .entry(path.clone())
350 .or_insert_with(TrackedFileState::new);
351 entry.last_read_snapshot = Some(snapshot.clone());
352 entry.last_known_disk_snapshot = Some(snapshot);
353 entry.stale_conflict = None;
354 }
355 self.watch_parent(&path);
356 Ok(())
357 }
358
359 pub fn record_read_text(&self, path: &Path, content: &str) -> Result<()> {
360 self.record_read_snapshot(path, FileSnapshot::from_text_content(content))
361 }
362
363 pub fn record_agent_write_snapshot(&self, path: &Path, snapshot: FileSnapshot) -> Result<()> {
364 let path = normalize_event_path(path);
365 {
366 let mut state = self.inner.state.lock();
367 let entry = state
368 .tracked_files
369 .entry(path.clone())
370 .or_insert_with(TrackedFileState::new);
371 let snap_for_disk = snapshot.clone();
372 entry.last_read_snapshot = Some(snap_for_disk);
373 entry.last_known_disk_snapshot = Some(snapshot.clone());
374 entry.last_agent_write_snapshot = Some(snapshot);
375 entry.stale_conflict = None;
376 }
377 self.watch_parent(&path);
378 Ok(())
379 }
380
381 pub fn record_agent_write_text(&self, path: &Path, content: &str) -> Result<()> {
382 self.record_agent_write_snapshot(path, FileSnapshot::from_text_content(content))
383 }
384
385 pub fn record_agent_removal(&self, path: &Path) -> Result<()> {
386 self.record_agent_write_snapshot(path, FileSnapshot::missing())
387 }
388
389 pub async fn tracked_read_text(&self, path: &Path) -> Option<String> {
390 let path = normalize_event_path(path);
391 let state = self.inner.state.lock();
392 state
393 .tracked_files
394 .get(&path)
395 .and_then(|entry| entry.last_read_snapshot.as_ref())
396 .and_then(|snapshot| snapshot.text_content.clone())
397 }
398
399 pub fn tracked_paths(&self) -> Vec<PathBuf> {
400 let state = self.inner.state.lock();
401 let mut paths = state.tracked_files.keys().cloned().collect::<Vec<_>>();
402 paths.sort();
403 paths
404 }
405
406 pub fn stale_tracked_paths(&self) -> Vec<PathBuf> {
407 self.tracked_path_freshness()
408 .into_iter()
409 .filter_map(|entry| entry.is_stale.then_some(entry.path))
410 .collect()
411 }
412
413 pub fn tracked_path_freshness(&self) -> Vec<TrackedPathFreshness> {
414 let state = self.inner.state.lock();
415 let mut entries = state
416 .tracked_files
417 .iter()
418 .map(|(path, entry)| {
419 let is_stale = entry
420 .last_read_snapshot
421 .as_ref()
422 .zip(entry.last_known_disk_snapshot.as_ref())
423 .is_some_and(|(read_snapshot, disk_snapshot)| {
424 !read_snapshot.same_contents(disk_snapshot)
425 });
426 TrackedPathFreshness {
427 path: path.clone(),
428 is_stale,
429 fingerprint: entry
430 .last_known_disk_snapshot
431 .as_ref()
432 .map(FileSnapshot::fingerprint),
433 }
434 })
435 .collect::<Vec<_>>();
436 entries.sort_by(|left, right| left.path.cmp(&right.path));
437 entries
438 }
439
440 pub fn tracked_path_fingerprint(&self, path: &Path) -> Option<String> {
441 let path = normalize_event_path(path);
442 let state = self.inner.state.lock();
443 state
444 .tracked_files
445 .get(&path)
446 .and_then(|entry| entry.last_known_disk_snapshot.as_ref())
447 .map(FileSnapshot::fingerprint)
448 }
449
450 pub async fn acquire_mutation(&self, path: &Path) -> MutationLease {
451 let path = normalize_event_path(path);
452 let mutation_id = self.inner.next_mutation_id.fetch_add(1, Ordering::SeqCst);
453 let mut pending_guard =
454 PendingMutationGuard::new(Arc::clone(&self.inner), path.clone(), mutation_id);
455
456 loop {
457 let notify = {
458 let mut state = self.inner.state.lock();
459 let entry = state
460 .tracked_files
461 .entry(path.clone())
462 .or_insert_with(TrackedFileState::new);
463
464 if entry.active_mutation.is_none() {
465 if let Some(front) = entry.pending_mutations.front() {
466 if *front == mutation_id {
467 let _ = entry.pending_mutations.pop_front();
468 entry.active_mutation = Some(mutation_id);
469 pending_guard.disarm();
470 return MutationLease {
471 inner: Arc::clone(&self.inner),
472 path,
473 mutation_id,
474 released: false,
475 };
476 }
477 } else {
478 entry.active_mutation = Some(mutation_id);
479 pending_guard.disarm();
480 return MutationLease {
481 inner: Arc::clone(&self.inner),
482 path,
483 mutation_id,
484 released: false,
485 };
486 }
487 }
488
489 if !entry
490 .pending_mutations
491 .iter()
492 .any(|pending_id| *pending_id == mutation_id)
493 {
494 entry.pending_mutations.push_back(mutation_id);
495 }
496
497 entry.notify.clone()
498 };
499
500 notify.notified().await;
501 }
502 }
503
504 pub async fn detect_conflict(
505 &self,
506 path: &Path,
507 intended_content: Option<String>,
508 approved_snapshot: Option<FileSnapshot>,
509 ) -> Result<Option<FileConflict>> {
510 let path = normalize_event_path(path);
511 let current_snapshot = snapshot_path_async(path.clone()).await?;
512 let mut should_audit = false;
513
514 let maybe_conflict = {
515 let mut state = self.inner.state.lock();
516 let entry = state
517 .tracked_files
518 .entry(path.clone())
519 .or_insert_with(TrackedFileState::new);
520 entry.last_known_disk_snapshot = Some(current_snapshot.clone());
521
522 if entry
523 .last_agent_write_snapshot
524 .as_ref()
525 .is_some_and(|snapshot| snapshot.same_contents(¤t_snapshot))
526 {
527 entry.last_agent_write_snapshot = None;
528 entry.last_read_snapshot = Some(current_snapshot);
529 entry.stale_conflict = None;
530 return Ok(None);
531 }
532
533 let Some(read_snapshot) = entry.last_read_snapshot.clone() else {
534 return Ok(None);
535 };
536
537 if read_snapshot.same_contents(¤t_snapshot) {
538 entry.stale_conflict = None;
539 return Ok(None);
540 }
541
542 if approved_snapshot
543 .as_ref()
544 .is_some_and(|snapshot| snapshot.same_identity(¤t_snapshot))
545 {
546 entry.stale_conflict = None;
547 return Ok(None);
548 }
549
550 let emit_hitl_notification = match entry.stale_conflict.as_mut() {
551 Some(existing) => {
552 let emit = !existing.notification_emitted;
553 existing.notification_emitted = true;
554 emit
555 }
556 None => {
557 entry.stale_conflict = Some(StaleConflictState {
558 notification_emitted: true,
559 });
560 should_audit = true;
561 true
562 }
563 };
564
565 Some(FileConflict {
566 path: path.clone(),
567 read_snapshot: Some(read_snapshot),
568 disk_snapshot: Some(current_snapshot.clone()),
569 intended_content,
570 emit_hitl_notification,
571 })
572 };
573
574 if should_audit {
575 self.record_conflict_audit(&path, ¤t_snapshot, "pre_write_conflict");
576 }
577
578 Ok(maybe_conflict)
579 }
580
581 #[cfg(test)]
582 pub async fn debug_process_path_change(&self, path: &Path) -> Result<()> {
583 self.process_path_change(path.to_path_buf())
584 }
585
586 fn watch_parent(&self, path: &Path) {
587 let Some(parent) = path.parent().map(Path::to_path_buf) else {
588 return;
589 };
590
591 let should_watch = {
592 let mut state = self.inner.state.lock();
593 state.watched_parents.insert(parent.clone())
594 };
595
596 if !should_watch {
597 return;
598 }
599
600 let Ok(mut watcher) = self.inner.watcher.lock() else {
601 return;
602 };
603 let Some(watcher) = watcher.as_mut() else {
604 return;
605 };
606
607 if let Err(err) = watcher.watch(&parent, RecursiveMode::NonRecursive) {
608 tracing::warn!(path = %parent.display(), error = %err, "Failed to watch edited-file parent directory");
609 }
610 }
611
612 fn record_conflict_audit(&self, path: &Path, snapshot: &FileSnapshot, reason: &str) {
613 let event = FileConflictAuditEvent {
614 timestamp: chrono::Local::now(),
615 path: path.to_path_buf(),
616 reason: reason.to_string(),
617 file_exists: snapshot.exists,
618 size_bytes: snapshot.exists.then_some(snapshot.size_bytes),
619 sha256: snapshot.exists.then_some(snapshot.sha256.clone()),
620 };
621
622 if let Some(log) = self.inner.audit_log.lock().as_mut()
623 && let Err(err) = log.record(&event)
624 {
625 tracing::warn!(error = %err, "Failed to record file conflict audit event");
626 }
627 }
628
629 fn process_path_change(&self, path: PathBuf) -> Result<()> {
630 let tracked_path = normalize_event_path(&path);
631 let snapshot = snapshot_path_sync(&tracked_path).with_context(|| {
632 format!(
633 "Failed to snapshot externally modified file {}",
634 tracked_path.display()
635 )
636 })?;
637
638 let mut should_audit = false;
639
640 {
641 let mut state = self.inner.state.lock();
642 let Some(entry) = state.tracked_files.get_mut(&tracked_path) else {
643 return Ok(());
644 };
645
646 if entry
647 .last_known_disk_snapshot
648 .as_ref()
649 .is_some_and(|known| known.same_contents(&snapshot))
650 {
651 return Ok(());
652 }
653
654 entry.last_known_disk_snapshot = Some(snapshot.clone());
655
656 if entry
657 .last_agent_write_snapshot
658 .as_ref()
659 .is_some_and(|known| known.same_contents(&snapshot))
660 {
661 entry.last_read_snapshot = Some(snapshot);
662 entry.last_agent_write_snapshot = None;
663 entry.stale_conflict = None;
664 return Ok(());
665 }
666
667 if entry
668 .last_read_snapshot
669 .as_ref()
670 .is_some_and(|known| known.same_contents(&snapshot))
671 {
672 entry.stale_conflict = None;
673 return Ok(());
674 }
675
676 if entry
677 .last_read_snapshot
678 .as_ref()
679 .is_some_and(|read_snapshot| !read_snapshot.same_contents(&snapshot))
680 {
681 should_audit = true;
682 match entry.stale_conflict.as_mut() {
683 Some(_) => {}
684 None => {
685 entry.stale_conflict = Some(StaleConflictState {
686 notification_emitted: false,
687 });
688 }
689 }
690 }
691 }
692
693 if should_audit {
694 self.record_conflict_audit(
695 &tracked_path,
696 &snapshot,
697 "watcher_detected_external_change",
698 );
699 }
700
701 Ok(())
702 }
703}
704
705impl Default for EditedFileMonitor {
706 fn default() -> Self {
707 Self::new()
708 }
709}
710
711fn spawn_event_loop(inner: Arc<EditedFileMonitorInner>, event_rx: Receiver<PathBuf>) {
712 thread::spawn(move || {
713 let mut pending = HashMap::<PathBuf, Instant>::new();
714
715 loop {
716 match event_rx.recv_timeout(Duration::from_millis(50)) {
717 Ok(path) => {
718 pending.insert(normalize_event_path(&path), Instant::now());
719 }
720 Err(RecvTimeoutError::Timeout) => {}
721 Err(RecvTimeoutError::Disconnected) => break,
722 }
723
724 let now = Instant::now();
725 let ready = pending
726 .iter()
727 .filter_map(|(path, observed_at)| {
728 (now.duration_since(*observed_at) >= inner.debounce_duration)
729 .then_some(path.clone())
730 })
731 .collect::<Vec<_>>();
732
733 for path in ready {
734 pending.remove(&path);
735 let monitor = EditedFileMonitor {
736 inner: Arc::clone(&inner),
737 };
738 if let Err(err) = monitor.process_path_change(path.clone()) {
739 tracing::debug!(path = %path.display(), error = %err, "Edited-file watcher refresh failed");
740 }
741 }
742 }
743 });
744}
745
/// Returns a canonical form of `path` for use as a tracking key.
///
/// Tries to canonicalize the path itself. If that fails, it canonicalizes the
/// parent directory and re-attaches the file name. If that fails too, the path
/// is returned unchanged.
fn normalize_event_path(path: &Path) -> PathBuf {
    if let Ok(canonical) = std::fs::canonicalize(path) {
        return canonical;
    }

    let via_parent = match (path.parent(), path.file_name()) {
        (Some(parent), Some(name)) => std::fs::canonicalize(parent)
            .ok()
            .map(|canonical_parent| canonical_parent.join(name)),
        _ => None,
    };

    via_parent.unwrap_or_else(|| path.to_path_buf())
}
754
755async fn snapshot_path_async(path: PathBuf) -> Result<FileSnapshot> {
756 tokio::task::spawn_blocking(move || snapshot_path_sync(&path))
757 .await
758 .context("Failed to join file snapshot task")?
759}
760
761fn snapshot_path_sync(path: &Path) -> Result<FileSnapshot> {
762 let metadata = match std::fs::metadata(path) {
763 Ok(metadata) => metadata,
764 Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
765 return Ok(FileSnapshot {
766 exists: false,
767 size_bytes: 0,
768 modified_millis: None,
769 sha256: String::new(),
770 text_content: None,
771 });
772 }
773 Err(err) => {
774 return Err(err)
775 .with_context(|| format!("Failed to read metadata for {}", path.display()));
776 }
777 };
778
779 if !metadata.is_file() {
780 return Ok(FileSnapshot {
781 exists: false,
782 size_bytes: 0,
783 modified_millis: metadata.modified().ok().and_then(system_time_to_millis),
784 sha256: String::new(),
785 text_content: None,
786 });
787 }
788
789 let bytes = std::fs::read(path)
790 .with_context(|| format!("Failed to read file bytes for {}", path.display()))?;
791 let sha256 = calculate_sha256(&bytes);
792
793 Ok(FileSnapshot {
794 exists: true,
795 size_bytes: metadata.len(),
796 modified_millis: metadata.modified().ok().and_then(system_time_to_millis),
797 sha256,
798 text_content: String::from_utf8(bytes).ok(),
799 })
800}
801
/// Converts `time` to whole milliseconds since the Unix epoch.
///
/// Returns `None` for times before the epoch.
fn system_time_to_millis(time: SystemTime) -> Option<u128> {
    match time.duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => Some(since_epoch.as_millis()),
        Err(_) => None,
    }
}
807
/// Formats `path` for display, relative to `workspace_root` when possible.
///
/// The root is tried as given first, then in canonicalized form. If neither is
/// a prefix of `path`, the full path is returned. Non-UTF-8 components are
/// converted lossily.
fn workspace_relative_display(workspace_root: &Path, path: &Path) -> String {
    let shown = path
        .strip_prefix(workspace_root)
        .ok()
        .or_else(|| {
            let canonical_root = std::fs::canonicalize(workspace_root).ok()?;
            path.strip_prefix(canonical_root).ok()
        })
        .unwrap_or(path);

    shown.to_string_lossy().to_string()
}
819
820pub fn conflict_override_snapshot(args: &Value) -> Option<FileSnapshot> {
821 args.get(FILE_CONFLICT_OVERRIDE_ARG)
822 .and_then(FileSnapshot::from_identity_value)
823}
824
825fn remove_pending_mutation(entry: &mut TrackedFileState, mutation_id: u64) -> bool {
826 let Some(index) = entry
827 .pending_mutations
828 .iter()
829 .position(|pending_id| *pending_id == mutation_id)
830 else {
831 return false;
832 };
833
834 let _ = entry.pending_mutations.remove(index);
835 true
836}
837
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Creates a temp directory containing `sample.txt` with `initial` contents.
    /// The `TempDir` is returned so the directory lives as long as the test needs it.
    fn sample_file(initial: &str) -> Result<(TempDir, PathBuf)> {
        let temp = TempDir::new()?;
        let file = temp.path().join("sample.txt");
        std::fs::write(&file, initial)?;
        Ok((temp, file))
    }

    /// Overwrites `file` and feeds the change through the watcher handler.
    async fn write_and_process(
        monitor: &EditedFileMonitor,
        file: &Path,
        content: &str,
    ) -> Result<()> {
        std::fs::write(file, content)?;
        monitor.debug_process_path_change(file).await
    }

    /// Runs conflict detection for an intended write of `"agent\n"` with no approved snapshot.
    async fn agent_conflict(
        monitor: &EditedFileMonitor,
        file: &Path,
    ) -> Result<Option<FileConflict>> {
        monitor
            .detect_conflict(file, Some("agent\n".to_string()), None)
            .await
    }

    #[tokio::test]
    async fn detects_same_mtime_same_size_hash_mismatch() -> Result<()> {
        let (_temp, file) = sample_file("before\n")?;

        let first = snapshot_path_async(file.clone()).await?;
        std::fs::write(&file, "after!\n")?;
        let second = snapshot_path_async(file.clone()).await?;

        assert_eq!(first.size_bytes, second.size_bytes);
        assert_ne!(first.sha256, second.sha256);
        Ok(())
    }

    #[tokio::test]
    async fn suppresses_self_write_event() -> Result<()> {
        let (_temp, file) = sample_file("before\n")?;
        let monitor = EditedFileMonitor::new();

        monitor.track_read(&file).await?;
        std::fs::write(&file, "after\n")?;
        monitor.record_agent_write_text(&file, "after\n")?;
        monitor.debug_process_path_change(&file).await?;

        let conflict = monitor
            .detect_conflict(&file, Some("after\n".to_string()), None)
            .await?;
        assert!(conflict.is_none());
        Ok(())
    }

    #[tokio::test]
    async fn expected_agent_snapshot_does_not_hide_external_follow_up_change() -> Result<()> {
        let (_temp, file) = sample_file("before\n")?;
        let monitor = EditedFileMonitor::new();

        monitor.track_read(&file).await?;
        std::fs::write(&file, "after\n")?;
        monitor.record_agent_write_text(&file, "after\n")?;

        write_and_process(&monitor, &file, "after formatted\n").await?;

        assert!(agent_conflict(&monitor, &file).await?.is_some());
        Ok(())
    }

    #[tokio::test]
    async fn queues_mutations_in_order() -> Result<()> {
        let (_temp, file) = sample_file("hello\n")?;
        let monitor = Arc::new(EditedFileMonitor::new());
        let normalized_file = normalize_event_path(&file);

        let first = monitor.acquire_mutation(&file).await;

        let waiter = {
            let monitor = Arc::clone(&monitor);
            let file = file.clone();
            tokio::spawn(async move {
                let lease = monitor.acquire_mutation(&file).await;
                lease.path().to_path_buf()
            })
        };

        tokio::time::sleep(Duration::from_millis(50)).await;
        drop(first);

        let acquired = waiter.await?;
        assert_eq!(acquired, normalized_file);
        Ok(())
    }

    #[tokio::test]
    async fn detects_external_change_after_read() -> Result<()> {
        let (_temp, file) = sample_file("before\n")?;
        let monitor = EditedFileMonitor::new();

        monitor.track_read(&file).await?;
        write_and_process(&monitor, &file, "external\n").await?;

        assert!(agent_conflict(&monitor, &file).await?.is_some());
        Ok(())
    }

    #[tokio::test]
    async fn cancelled_waiter_does_not_block_following_mutation() -> Result<()> {
        let (_temp, file) = sample_file("hello\n")?;
        let monitor = Arc::new(EditedFileMonitor::new());
        let normalized_file = normalize_event_path(&file);

        let first = monitor.acquire_mutation(&file).await;

        // Queue a waiter, then cancel it while it is still waiting.
        let pending = {
            let monitor = Arc::clone(&monitor);
            let file = file.clone();
            tokio::spawn(async move {
                let _lease = monitor.acquire_mutation(&file).await;
            })
        };
        tokio::time::sleep(Duration::from_millis(50)).await;
        pending.abort();

        let next = {
            let monitor = Arc::clone(&monitor);
            let file = file.clone();
            tokio::spawn(async move {
                let lease = monitor.acquire_mutation(&file).await;
                lease.path().to_path_buf()
            })
        };

        drop(first);

        let acquired = tokio::time::timeout(Duration::from_secs(1), next).await??;
        assert_eq!(acquired, normalized_file);
        Ok(())
    }

    #[tokio::test]
    async fn clears_conflict_state_when_disk_returns_to_read_snapshot() -> Result<()> {
        let (_temp, file) = sample_file("before\n")?;
        let monitor = EditedFileMonitor::new();

        monitor.track_read(&file).await?;
        write_and_process(&monitor, &file, "external one\n").await?;

        let first_conflict = agent_conflict(&monitor, &file)
            .await?
            .expect("expected initial conflict");
        assert!(first_conflict.emit_hitl_notification);

        write_and_process(&monitor, &file, "before\n").await?;
        assert!(agent_conflict(&monitor, &file).await?.is_none());

        write_and_process(&monitor, &file, "external two\n").await?;
        let second_conflict = agent_conflict(&monitor, &file)
            .await?
            .expect("expected renewed conflict");
        assert!(second_conflict.emit_hitl_notification);
        Ok(())
    }

    #[tokio::test]
    async fn stale_path_queries_expose_external_change_without_clearing_state() -> Result<()> {
        let (_temp, file) = sample_file("before\n")?;
        let monitor = EditedFileMonitor::new();

        monitor.track_read(&file).await?;
        let tracked_paths = monitor.tracked_paths();
        assert_eq!(tracked_paths.len(), 1);
        let tracked_path = tracked_paths[0].clone();
        assert!(monitor.stale_tracked_paths().is_empty());

        write_and_process(&monitor, &file, "external\n").await?;

        let freshness = monitor.tracked_path_freshness();
        assert_eq!(freshness.len(), 1);
        let report = &freshness[0];
        assert_eq!(report.path, tracked_path);
        assert!(report.is_stale);
        assert!(report.fingerprint.is_some());
        assert_eq!(monitor.stale_tracked_paths(), vec![report.path.clone()]);
        assert_eq!(
            monitor.tracked_path_fingerprint(&report.path),
            report.fingerprint
        );
        Ok(())
    }

    #[tokio::test]
    async fn override_requires_matching_approved_snapshot() -> Result<()> {
        let (_temp, file) = sample_file("before\n")?;
        let monitor = EditedFileMonitor::new();

        monitor.track_read(&file).await?;
        std::fs::write(&file, "external one\n")?;
        let approved_snapshot = snapshot_path_async(file.clone()).await?;

        // The disk moves on after approval, so the approval no longer applies.
        std::fs::write(&file, "external two\n")?;

        let conflict = monitor
            .detect_conflict(&file, Some("agent\n".to_string()), Some(approved_snapshot))
            .await?;
        assert!(conflict.is_some());
        Ok(())
    }

    #[test]
    fn normalizes_missing_event_paths_via_canonical_parent() -> Result<()> {
        let temp = TempDir::new()?;
        let missing = temp.path().join("missing.txt");
        let expected = std::fs::canonicalize(temp.path())?.join("missing.txt");

        assert_eq!(normalize_event_path(&missing), expected);
        Ok(())
    }
}