Skip to main content

vtcode_core/tools/
edited_file_monitor.rs

1use crate::audit::{FileConflictAuditEvent, FileConflictAuditLog};
2use crate::config::PermissionsConfig;
3use crate::tools::file_ops::{build_diff_preview, diff_preview_error_skip};
4use anyhow::{Context, Result};
5use notify::{EventKind, RecommendedWatcher, RecursiveMode, Watcher};
6use parking_lot::Mutex;
7use serde_json::{Value, json};
8use std::collections::{HashMap, HashSet, VecDeque};
9use std::path::{Path, PathBuf};
10use std::sync::Arc;
11use std::sync::Mutex as StdMutex;
12use std::sync::atomic::{AtomicU64, Ordering};
13use std::sync::mpsc::{self, Receiver, RecvTimeoutError};
14use std::thread;
15use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
16use tokio::sync::Notify;
17use vtcode_commons::utils::calculate_sha256;
18
19pub const FILE_CONFLICT_OVERRIDE_ARG: &str = "__vtcode_conflict_override";
20pub const FILE_CONFLICT_DETECTED_FIELD: &str = "conflict_detected";
21pub const FILE_CONFLICT_PATH_FIELD: &str = "conflict_path";
22
23#[derive(Clone, Debug, PartialEq, Eq)]
24pub struct FileSnapshot {
25    pub exists: bool,
26    pub size_bytes: u64,
27    pub modified_millis: Option<u128>,
28    pub sha256: String,
29    pub text_content: Option<String>,
30}
31
32impl FileSnapshot {
33    fn fingerprint(&self) -> String {
34        let modified = self
35            .modified_millis
36            .map(|value| value.to_string())
37            .unwrap_or_else(|| "none".to_string());
38        format!(
39            "exists={};size={};modified={};sha256={}",
40            self.exists, self.size_bytes, modified, self.sha256
41        )
42    }
43
44    fn to_json(&self) -> Value {
45        json!({
46            "exists": self.exists,
47            "size_bytes": self.size_bytes,
48            "modified_millis": self.modified_millis,
49            "sha256": self.sha256,
50        })
51    }
52
53    fn same_contents(&self, other: &Self) -> bool {
54        self.exists == other.exists
55            && self.size_bytes == other.size_bytes
56            && self.sha256 == other.sha256
57    }
58
59    fn same_identity(&self, other: &Self) -> bool {
60        self.same_contents(other) && self.modified_millis == other.modified_millis
61    }
62
63    fn from_identity_value(value: &Value) -> Option<Self> {
64        let object = value.as_object()?;
65        let modified_millis = match object.get("modified_millis") {
66            Some(Value::Null) | None => None,
67            Some(value) => value.as_u64().map(u128::from).or_else(|| {
68                value
69                    .as_i64()
70                    .filter(|millis| *millis >= 0)
71                    .map(|millis| millis as u128)
72            }),
73        };
74
75        Some(Self {
76            exists: object.get("exists")?.as_bool()?,
77            size_bytes: object.get("size_bytes")?.as_u64()?,
78            modified_millis,
79            sha256: object.get("sha256")?.as_str()?.to_string(),
80            text_content: None,
81        })
82    }
83
84    fn from_text_content(content: &str) -> Self {
85        let bytes = content.as_bytes();
86        let sha256 = calculate_sha256(bytes);
87
88        Self {
89            exists: true,
90            size_bytes: bytes.len() as u64,
91            modified_millis: None,
92            sha256,
93            text_content: Some(content.to_string()),
94        }
95    }
96
97    fn missing() -> Self {
98        Self {
99            exists: false,
100            size_bytes: 0,
101            modified_millis: None,
102            sha256: String::new(),
103            text_content: None,
104        }
105    }
106}
107
108#[derive(Clone, Debug)]
109pub struct FileConflict {
110    pub path: PathBuf,
111    pub read_snapshot: Option<FileSnapshot>,
112    pub disk_snapshot: Option<FileSnapshot>,
113    pub intended_content: Option<String>,
114    pub emit_hitl_notification: bool,
115}
116
117#[derive(Clone, Debug, PartialEq, Eq)]
118pub struct TrackedPathFreshness {
119    pub path: PathBuf,
120    pub is_stale: bool,
121    pub fingerprint: Option<String>,
122}
123
124impl FileConflict {
125    pub fn to_tool_output(&self, workspace_root: &Path) -> Value {
126        let display_path = workspace_relative_display(workspace_root, &self.path);
127        let disk_content = self
128            .disk_snapshot
129            .as_ref()
130            .and_then(|snapshot| snapshot.text_content.clone());
131        let diff_preview = match (&disk_content, &self.intended_content) {
132            (Some(before), Some(after)) => build_diff_preview(&display_path, Some(before), after),
133            (None, Some(_)) => diff_preview_error_skip("binary_or_non_utf8_disk_content", None),
134            _ => diff_preview_error_skip("missing_intended_content", None),
135        };
136
137        json!({
138            "success": true,
139            FILE_CONFLICT_DETECTED_FIELD: true,
140            FILE_CONFLICT_PATH_FIELD: display_path,
141            "message": "File changed on disk since the agent last read it.",
142            "resolution": "pending",
143            "emit_hitl_notification": self.emit_hitl_notification,
144            "disk_content": disk_content,
145            "intended_content": self.intended_content,
146            "read_snapshot": self.read_snapshot.as_ref().map(FileSnapshot::to_json),
147            "disk_snapshot": self.disk_snapshot.as_ref().map(FileSnapshot::to_json),
148            "diff_preview": diff_preview,
149        })
150    }
151}
152
153#[derive(Clone, Debug)]
154struct StaleConflictState {
155    notification_emitted: bool,
156}
157
158#[derive(Clone, Debug)]
159struct TrackedFileState {
160    last_read_snapshot: Option<FileSnapshot>,
161    last_known_disk_snapshot: Option<FileSnapshot>,
162    last_agent_write_snapshot: Option<FileSnapshot>,
163    active_mutation: Option<u64>,
164    pending_mutations: VecDeque<u64>,
165    stale_conflict: Option<StaleConflictState>,
166    notify: Arc<Notify>,
167}
168
169impl TrackedFileState {
170    fn new() -> Self {
171        Self {
172            last_read_snapshot: None,
173            last_known_disk_snapshot: None,
174            last_agent_write_snapshot: None,
175            active_mutation: None,
176            pending_mutations: VecDeque::new(),
177            stale_conflict: None,
178            notify: Arc::new(Notify::new()),
179        }
180    }
181}
182
183#[derive(Default)]
184struct MonitorState {
185    tracked_files: HashMap<PathBuf, TrackedFileState>,
186    watched_parents: HashSet<PathBuf>,
187}
188
189struct EditedFileMonitorInner {
190    state: Mutex<MonitorState>,
191    watcher: StdMutex<Option<RecommendedWatcher>>,
192    audit_log: Mutex<Option<FileConflictAuditLog>>,
193    next_mutation_id: AtomicU64,
194    debounce_duration: Duration,
195}
196
197#[derive(Clone)]
198pub struct EditedFileMonitor {
199    inner: Arc<EditedFileMonitorInner>,
200}
201
202pub struct MutationLease {
203    inner: Arc<EditedFileMonitorInner>,
204    path: PathBuf,
205    mutation_id: u64,
206    released: bool,
207}
208
209struct PendingMutationGuard {
210    inner: Arc<EditedFileMonitorInner>,
211    path: PathBuf,
212    mutation_id: u64,
213    armed: bool,
214}
215
216impl PendingMutationGuard {
217    fn new(inner: Arc<EditedFileMonitorInner>, path: PathBuf, mutation_id: u64) -> Self {
218        Self {
219            inner,
220            path,
221            mutation_id,
222            armed: true,
223        }
224    }
225
226    fn disarm(&mut self) {
227        self.armed = false;
228    }
229}
230
231impl Drop for PendingMutationGuard {
232    fn drop(&mut self) {
233        if !self.armed {
234            return;
235        }
236
237        let mut state = self.inner.state.lock();
238        let Some(entry) = state.tracked_files.get_mut(&self.path) else {
239            return;
240        };
241
242        if remove_pending_mutation(entry, self.mutation_id) {
243            entry.notify.notify_waiters();
244        }
245    }
246}
247
248impl MutationLease {
249    pub fn path(&self) -> &Path {
250        &self.path
251    }
252}
253
254impl Drop for MutationLease {
255    fn drop(&mut self) {
256        if self.released {
257            return;
258        }
259        let mut state = self.inner.state.lock();
260        if let Some(entry) = state.tracked_files.get_mut(&self.path) {
261            let mut released_active = false;
262            if entry.active_mutation == Some(self.mutation_id) {
263                entry.active_mutation = None;
264                released_active = true;
265            }
266            let removed_pending = remove_pending_mutation(entry, self.mutation_id);
267            if released_active || removed_pending {
268                entry.notify.notify_waiters();
269            }
270        }
271        self.released = true;
272    }
273}
274
275impl EditedFileMonitor {
276    pub fn new() -> Self {
277        let (event_tx, event_rx) = mpsc::channel::<PathBuf>();
278        let watcher = RecommendedWatcher::new(
279            move |event: notify::Result<notify::Event>| {
280                let Ok(event) = event else {
281                    return;
282                };
283
284                if !matches!(
285                    event.kind,
286                    EventKind::Modify(_) | EventKind::Create(_) | EventKind::Remove(_)
287                ) {
288                    return;
289                }
290
291                for path in event.paths {
292                    let _ = event_tx.send(path);
293                }
294            },
295            notify::Config::default(),
296        )
297        .ok();
298
299        let inner = Arc::new(EditedFileMonitorInner {
300            state: Mutex::new(MonitorState::default()),
301            watcher: StdMutex::new(watcher),
302            audit_log: Mutex::new(None),
303            next_mutation_id: AtomicU64::new(1),
304            debounce_duration: Duration::from_millis(250),
305        });
306
307        let monitor = Self {
308            inner: Arc::clone(&inner),
309        };
310        spawn_event_loop(inner, event_rx);
311        monitor
312    }
313
314    pub fn apply_permissions_config(&self, permissions: &PermissionsConfig) {
315        let mut audit_log = self.inner.audit_log.lock();
316        if !permissions.audit_enabled {
317            *audit_log = None;
318            return;
319        }
320
321        let audit_dir = vtcode_commons::paths::expand_tilde(&permissions.audit_directory);
322        match FileConflictAuditLog::new(audit_dir) {
323            Ok(log) => *audit_log = Some(log),
324            Err(err) => {
325                tracing::warn!(error = %err, "Failed to initialize file conflict audit log");
326                *audit_log = None;
327            }
328        }
329    }
330
331    pub async fn track_read(&self, path: &Path) -> Result<()> {
332        let path = normalize_event_path(path);
333        let snapshot = snapshot_path_async(path.clone()).await?;
334        self.record_read_snapshot(&path, snapshot)
335    }
336
337    pub async fn accept_disk_version(&self, path: &Path) -> Result<()> {
338        let path = normalize_event_path(path);
339        let snapshot = snapshot_path_async(path.clone()).await?;
340        self.record_read_snapshot(&path, snapshot)
341    }
342
343    pub fn record_read_snapshot(&self, path: &Path, snapshot: FileSnapshot) -> Result<()> {
344        let path = normalize_event_path(path);
345        {
346            let mut state = self.inner.state.lock();
347            let entry = state
348                .tracked_files
349                .entry(path.clone())
350                .or_insert_with(TrackedFileState::new);
351            entry.last_read_snapshot = Some(snapshot.clone());
352            entry.last_known_disk_snapshot = Some(snapshot);
353            entry.stale_conflict = None;
354        }
355        self.watch_parent(&path);
356        Ok(())
357    }
358
359    pub fn record_read_text(&self, path: &Path, content: &str) -> Result<()> {
360        self.record_read_snapshot(path, FileSnapshot::from_text_content(content))
361    }
362
363    pub fn record_agent_write_snapshot(&self, path: &Path, snapshot: FileSnapshot) -> Result<()> {
364        let path = normalize_event_path(path);
365        {
366            let mut state = self.inner.state.lock();
367            let entry = state
368                .tracked_files
369                .entry(path.clone())
370                .or_insert_with(TrackedFileState::new);
371            let snap_for_disk = snapshot.clone();
372            entry.last_read_snapshot = Some(snap_for_disk);
373            entry.last_known_disk_snapshot = Some(snapshot.clone());
374            entry.last_agent_write_snapshot = Some(snapshot);
375            entry.stale_conflict = None;
376        }
377        self.watch_parent(&path);
378        Ok(())
379    }
380
381    pub fn record_agent_write_text(&self, path: &Path, content: &str) -> Result<()> {
382        self.record_agent_write_snapshot(path, FileSnapshot::from_text_content(content))
383    }
384
385    pub fn record_agent_removal(&self, path: &Path) -> Result<()> {
386        self.record_agent_write_snapshot(path, FileSnapshot::missing())
387    }
388
389    pub async fn tracked_read_text(&self, path: &Path) -> Option<String> {
390        let path = normalize_event_path(path);
391        let state = self.inner.state.lock();
392        state
393            .tracked_files
394            .get(&path)
395            .and_then(|entry| entry.last_read_snapshot.as_ref())
396            .and_then(|snapshot| snapshot.text_content.clone())
397    }
398
399    pub fn tracked_paths(&self) -> Vec<PathBuf> {
400        let state = self.inner.state.lock();
401        let mut paths = state.tracked_files.keys().cloned().collect::<Vec<_>>();
402        paths.sort();
403        paths
404    }
405
406    pub fn stale_tracked_paths(&self) -> Vec<PathBuf> {
407        self.tracked_path_freshness()
408            .into_iter()
409            .filter_map(|entry| entry.is_stale.then_some(entry.path))
410            .collect()
411    }
412
413    pub fn tracked_path_freshness(&self) -> Vec<TrackedPathFreshness> {
414        let state = self.inner.state.lock();
415        let mut entries = state
416            .tracked_files
417            .iter()
418            .map(|(path, entry)| {
419                let is_stale = entry
420                    .last_read_snapshot
421                    .as_ref()
422                    .zip(entry.last_known_disk_snapshot.as_ref())
423                    .is_some_and(|(read_snapshot, disk_snapshot)| {
424                        !read_snapshot.same_contents(disk_snapshot)
425                    });
426                TrackedPathFreshness {
427                    path: path.clone(),
428                    is_stale,
429                    fingerprint: entry
430                        .last_known_disk_snapshot
431                        .as_ref()
432                        .map(FileSnapshot::fingerprint),
433                }
434            })
435            .collect::<Vec<_>>();
436        entries.sort_by(|left, right| left.path.cmp(&right.path));
437        entries
438    }
439
440    pub fn tracked_path_fingerprint(&self, path: &Path) -> Option<String> {
441        let path = normalize_event_path(path);
442        let state = self.inner.state.lock();
443        state
444            .tracked_files
445            .get(&path)
446            .and_then(|entry| entry.last_known_disk_snapshot.as_ref())
447            .map(FileSnapshot::fingerprint)
448    }
449
450    pub async fn acquire_mutation(&self, path: &Path) -> MutationLease {
451        let path = normalize_event_path(path);
452        let mutation_id = self.inner.next_mutation_id.fetch_add(1, Ordering::SeqCst);
453        let mut pending_guard =
454            PendingMutationGuard::new(Arc::clone(&self.inner), path.clone(), mutation_id);
455
456        loop {
457            let notify = {
458                let mut state = self.inner.state.lock();
459                let entry = state
460                    .tracked_files
461                    .entry(path.clone())
462                    .or_insert_with(TrackedFileState::new);
463
464                if entry.active_mutation.is_none() {
465                    if let Some(front) = entry.pending_mutations.front() {
466                        if *front == mutation_id {
467                            let _ = entry.pending_mutations.pop_front();
468                            entry.active_mutation = Some(mutation_id);
469                            pending_guard.disarm();
470                            return MutationLease {
471                                inner: Arc::clone(&self.inner),
472                                path,
473                                mutation_id,
474                                released: false,
475                            };
476                        }
477                    } else {
478                        entry.active_mutation = Some(mutation_id);
479                        pending_guard.disarm();
480                        return MutationLease {
481                            inner: Arc::clone(&self.inner),
482                            path,
483                            mutation_id,
484                            released: false,
485                        };
486                    }
487                }
488
489                if !entry
490                    .pending_mutations
491                    .iter()
492                    .any(|pending_id| *pending_id == mutation_id)
493                {
494                    entry.pending_mutations.push_back(mutation_id);
495                }
496
497                entry.notify.clone()
498            };
499
500            notify.notified().await;
501        }
502    }
503
504    pub async fn detect_conflict(
505        &self,
506        path: &Path,
507        intended_content: Option<String>,
508        approved_snapshot: Option<FileSnapshot>,
509    ) -> Result<Option<FileConflict>> {
510        let path = normalize_event_path(path);
511        let current_snapshot = snapshot_path_async(path.clone()).await?;
512        let mut should_audit = false;
513
514        let maybe_conflict = {
515            let mut state = self.inner.state.lock();
516            let entry = state
517                .tracked_files
518                .entry(path.clone())
519                .or_insert_with(TrackedFileState::new);
520            entry.last_known_disk_snapshot = Some(current_snapshot.clone());
521
522            if entry
523                .last_agent_write_snapshot
524                .as_ref()
525                .is_some_and(|snapshot| snapshot.same_contents(&current_snapshot))
526            {
527                entry.last_agent_write_snapshot = None;
528                entry.last_read_snapshot = Some(current_snapshot);
529                entry.stale_conflict = None;
530                return Ok(None);
531            }
532
533            let Some(read_snapshot) = entry.last_read_snapshot.clone() else {
534                return Ok(None);
535            };
536
537            if read_snapshot.same_contents(&current_snapshot) {
538                entry.stale_conflict = None;
539                return Ok(None);
540            }
541
542            if approved_snapshot
543                .as_ref()
544                .is_some_and(|snapshot| snapshot.same_identity(&current_snapshot))
545            {
546                entry.stale_conflict = None;
547                return Ok(None);
548            }
549
550            let emit_hitl_notification = match entry.stale_conflict.as_mut() {
551                Some(existing) => {
552                    let emit = !existing.notification_emitted;
553                    existing.notification_emitted = true;
554                    emit
555                }
556                None => {
557                    entry.stale_conflict = Some(StaleConflictState {
558                        notification_emitted: true,
559                    });
560                    should_audit = true;
561                    true
562                }
563            };
564
565            Some(FileConflict {
566                path: path.clone(),
567                read_snapshot: Some(read_snapshot),
568                disk_snapshot: Some(current_snapshot.clone()),
569                intended_content,
570                emit_hitl_notification,
571            })
572        };
573
574        if should_audit {
575            self.record_conflict_audit(&path, &current_snapshot, "pre_write_conflict");
576        }
577
578        Ok(maybe_conflict)
579    }
580
581    #[cfg(test)]
582    pub async fn debug_process_path_change(&self, path: &Path) -> Result<()> {
583        self.process_path_change(path.to_path_buf())
584    }
585
586    fn watch_parent(&self, path: &Path) {
587        let Some(parent) = path.parent().map(Path::to_path_buf) else {
588            return;
589        };
590
591        let should_watch = {
592            let mut state = self.inner.state.lock();
593            state.watched_parents.insert(parent.clone())
594        };
595
596        if !should_watch {
597            return;
598        }
599
600        let Ok(mut watcher) = self.inner.watcher.lock() else {
601            return;
602        };
603        let Some(watcher) = watcher.as_mut() else {
604            return;
605        };
606
607        if let Err(err) = watcher.watch(&parent, RecursiveMode::NonRecursive) {
608            tracing::warn!(path = %parent.display(), error = %err, "Failed to watch edited-file parent directory");
609        }
610    }
611
612    fn record_conflict_audit(&self, path: &Path, snapshot: &FileSnapshot, reason: &str) {
613        let event = FileConflictAuditEvent {
614            timestamp: chrono::Local::now(),
615            path: path.to_path_buf(),
616            reason: reason.to_string(),
617            file_exists: snapshot.exists,
618            size_bytes: snapshot.exists.then_some(snapshot.size_bytes),
619            sha256: snapshot.exists.then_some(snapshot.sha256.clone()),
620        };
621
622        if let Some(log) = self.inner.audit_log.lock().as_mut()
623            && let Err(err) = log.record(&event)
624        {
625            tracing::warn!(error = %err, "Failed to record file conflict audit event");
626        }
627    }
628
629    fn process_path_change(&self, path: PathBuf) -> Result<()> {
630        let tracked_path = normalize_event_path(&path);
631        let snapshot = snapshot_path_sync(&tracked_path).with_context(|| {
632            format!(
633                "Failed to snapshot externally modified file {}",
634                tracked_path.display()
635            )
636        })?;
637
638        let mut should_audit = false;
639
640        {
641            let mut state = self.inner.state.lock();
642            let Some(entry) = state.tracked_files.get_mut(&tracked_path) else {
643                return Ok(());
644            };
645
646            if entry
647                .last_known_disk_snapshot
648                .as_ref()
649                .is_some_and(|known| known.same_contents(&snapshot))
650            {
651                return Ok(());
652            }
653
654            entry.last_known_disk_snapshot = Some(snapshot.clone());
655
656            if entry
657                .last_agent_write_snapshot
658                .as_ref()
659                .is_some_and(|known| known.same_contents(&snapshot))
660            {
661                entry.last_read_snapshot = Some(snapshot);
662                entry.last_agent_write_snapshot = None;
663                entry.stale_conflict = None;
664                return Ok(());
665            }
666
667            if entry
668                .last_read_snapshot
669                .as_ref()
670                .is_some_and(|known| known.same_contents(&snapshot))
671            {
672                entry.stale_conflict = None;
673                return Ok(());
674            }
675
676            if entry
677                .last_read_snapshot
678                .as_ref()
679                .is_some_and(|read_snapshot| !read_snapshot.same_contents(&snapshot))
680            {
681                should_audit = true;
682                match entry.stale_conflict.as_mut() {
683                    Some(_) => {}
684                    None => {
685                        entry.stale_conflict = Some(StaleConflictState {
686                            notification_emitted: false,
687                        });
688                    }
689                }
690            }
691        }
692
693        if should_audit {
694            self.record_conflict_audit(
695                &tracked_path,
696                &snapshot,
697                "watcher_detected_external_change",
698            );
699        }
700
701        Ok(())
702    }
703}
704
705impl Default for EditedFileMonitor {
706    fn default() -> Self {
707        Self::new()
708    }
709}
710
711fn spawn_event_loop(inner: Arc<EditedFileMonitorInner>, event_rx: Receiver<PathBuf>) {
712    thread::spawn(move || {
713        let mut pending = HashMap::<PathBuf, Instant>::new();
714
715        loop {
716            match event_rx.recv_timeout(Duration::from_millis(50)) {
717                Ok(path) => {
718                    pending.insert(normalize_event_path(&path), Instant::now());
719                }
720                Err(RecvTimeoutError::Timeout) => {}
721                Err(RecvTimeoutError::Disconnected) => break,
722            }
723
724            let now = Instant::now();
725            let ready = pending
726                .iter()
727                .filter_map(|(path, observed_at)| {
728                    (now.duration_since(*observed_at) >= inner.debounce_duration)
729                        .then_some(path.clone())
730                })
731                .collect::<Vec<_>>();
732
733            for path in ready {
734                pending.remove(&path);
735                let monitor = EditedFileMonitor {
736                    inner: Arc::clone(&inner),
737                };
738                if let Err(err) = monitor.process_path_change(path.clone()) {
739                    tracing::debug!(path = %path.display(), error = %err, "Edited-file watcher refresh failed");
740                }
741            }
742        }
743    });
744}
745
746fn normalize_event_path(path: &Path) -> PathBuf {
747    std::fs::canonicalize(path).unwrap_or_else(|_| {
748        path.parent()
749            .and_then(|parent| std::fs::canonicalize(parent).ok())
750            .and_then(|parent| path.file_name().map(|name| parent.join(name)))
751            .unwrap_or_else(|| path.to_path_buf())
752    })
753}
754
755async fn snapshot_path_async(path: PathBuf) -> Result<FileSnapshot> {
756    tokio::task::spawn_blocking(move || snapshot_path_sync(&path))
757        .await
758        .context("Failed to join file snapshot task")?
759}
760
761fn snapshot_path_sync(path: &Path) -> Result<FileSnapshot> {
762    let metadata = match std::fs::metadata(path) {
763        Ok(metadata) => metadata,
764        Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
765            return Ok(FileSnapshot {
766                exists: false,
767                size_bytes: 0,
768                modified_millis: None,
769                sha256: String::new(),
770                text_content: None,
771            });
772        }
773        Err(err) => {
774            return Err(err)
775                .with_context(|| format!("Failed to read metadata for {}", path.display()));
776        }
777    };
778
779    if !metadata.is_file() {
780        return Ok(FileSnapshot {
781            exists: false,
782            size_bytes: 0,
783            modified_millis: metadata.modified().ok().and_then(system_time_to_millis),
784            sha256: String::new(),
785            text_content: None,
786        });
787    }
788
789    let bytes = std::fs::read(path)
790        .with_context(|| format!("Failed to read file bytes for {}", path.display()))?;
791    let sha256 = calculate_sha256(&bytes);
792
793    Ok(FileSnapshot {
794        exists: true,
795        size_bytes: metadata.len(),
796        modified_millis: metadata.modified().ok().and_then(system_time_to_millis),
797        sha256,
798        text_content: String::from_utf8(bytes).ok(),
799    })
800}
801
802fn system_time_to_millis(time: SystemTime) -> Option<u128> {
803    time.duration_since(UNIX_EPOCH)
804        .ok()
805        .map(|duration| duration.as_millis())
806}
807
808fn workspace_relative_display(workspace_root: &Path, path: &Path) -> String {
809    if let Ok(relative) = path.strip_prefix(workspace_root) {
810        return relative.to_string_lossy().to_string();
811    }
812    if let Ok(canonical_root) = std::fs::canonicalize(workspace_root)
813        && let Ok(relative) = path.strip_prefix(canonical_root)
814    {
815        return relative.to_string_lossy().to_string();
816    }
817    path.to_string_lossy().to_string()
818}
819
820pub fn conflict_override_snapshot(args: &Value) -> Option<FileSnapshot> {
821    args.get(FILE_CONFLICT_OVERRIDE_ARG)
822        .and_then(FileSnapshot::from_identity_value)
823}
824
825fn remove_pending_mutation(entry: &mut TrackedFileState, mutation_id: u64) -> bool {
826    let Some(index) = entry
827        .pending_mutations
828        .iter()
829        .position(|pending_id| *pending_id == mutation_id)
830    else {
831        return false;
832    };
833
834    let _ = entry.pending_mutations.remove(index);
835    true
836}
837
838#[cfg(test)]
839mod tests {
840    use super::*;
841    use tempfile::TempDir;
842
843    #[tokio::test]
844    async fn detects_same_mtime_same_size_hash_mismatch() -> Result<()> {
845        let temp = TempDir::new()?;
846        let file = temp.path().join("sample.txt");
847        std::fs::write(&file, "before\n")?;
848
849        let first = snapshot_path_async(file.clone()).await?;
850        std::fs::write(&file, "after!\n")?;
851        let second = snapshot_path_async(file.clone()).await?;
852
853        assert_eq!(first.size_bytes, second.size_bytes);
854        assert_ne!(first.sha256, second.sha256);
855        Ok(())
856    }
857
858    #[tokio::test]
859    async fn suppresses_self_write_event() -> Result<()> {
860        let temp = TempDir::new()?;
861        let file = temp.path().join("sample.txt");
862        std::fs::write(&file, "before\n")?;
863        let monitor = EditedFileMonitor::new();
864
865        monitor.track_read(&file).await?;
866        std::fs::write(&file, "after\n")?;
867        monitor.record_agent_write_text(&file, "after\n")?;
868        monitor.debug_process_path_change(&file).await?;
869
870        assert!(
871            monitor
872                .detect_conflict(&file, Some("after\n".to_string()), None)
873                .await?
874                .is_none()
875        );
876        Ok(())
877    }
878
879    #[tokio::test]
880    async fn expected_agent_snapshot_does_not_hide_external_follow_up_change() -> Result<()> {
881        let temp = TempDir::new()?;
882        let file = temp.path().join("sample.txt");
883        std::fs::write(&file, "before\n")?;
884        let monitor = EditedFileMonitor::new();
885
886        monitor.track_read(&file).await?;
887        std::fs::write(&file, "after\n")?;
888        monitor.record_agent_write_text(&file, "after\n")?;
889
890        std::fs::write(&file, "after formatted\n")?;
891        monitor.debug_process_path_change(&file).await?;
892
893        let conflict = monitor
894            .detect_conflict(&file, Some("agent\n".to_string()), None)
895            .await?;
896        assert!(conflict.is_some());
897        Ok(())
898    }
899
900    #[tokio::test]
901    async fn queues_mutations_in_order() -> Result<()> {
902        let temp = TempDir::new()?;
903        let file = temp.path().join("sample.txt");
904        std::fs::write(&file, "hello\n")?;
905        let monitor = Arc::new(EditedFileMonitor::new());
906        let normalized_file = normalize_event_path(&file);
907
908        let first = monitor.acquire_mutation(&file).await;
909        let monitor_clone = Arc::clone(&monitor);
910        let file_clone = file.clone();
911        let waiter = tokio::spawn(async move {
912            let lease = monitor_clone.acquire_mutation(&file_clone).await;
913            lease.path().to_path_buf()
914        });
915
916        tokio::time::sleep(Duration::from_millis(50)).await;
917        drop(first);
918        let acquired = waiter.await?;
919        assert_eq!(acquired, normalized_file);
920        Ok(())
921    }
922
923    #[tokio::test]
924    async fn detects_external_change_after_read() -> Result<()> {
925        let temp = TempDir::new()?;
926        let file = temp.path().join("sample.txt");
927        std::fs::write(&file, "before\n")?;
928        let monitor = EditedFileMonitor::new();
929
930        monitor.track_read(&file).await?;
931        std::fs::write(&file, "external\n")?;
932        monitor.debug_process_path_change(&file).await?;
933
934        let conflict = monitor
935            .detect_conflict(&file, Some("agent\n".to_string()), None)
936            .await?;
937        assert!(conflict.is_some());
938        Ok(())
939    }
940
941    #[tokio::test]
942    async fn cancelled_waiter_does_not_block_following_mutation() -> Result<()> {
943        let temp = TempDir::new()?;
944        let file = temp.path().join("sample.txt");
945        std::fs::write(&file, "hello\n")?;
946        let monitor = Arc::new(EditedFileMonitor::new());
947        let normalized_file = normalize_event_path(&file);
948
949        let first = monitor.acquire_mutation(&file).await;
950
951        let pending_monitor = Arc::clone(&monitor);
952        let pending_file = file.clone();
953        let pending = tokio::spawn(async move {
954            let _lease = pending_monitor.acquire_mutation(&pending_file).await;
955        });
956        tokio::time::sleep(Duration::from_millis(50)).await;
957        pending.abort();
958
959        let next_monitor = Arc::clone(&monitor);
960        let next_file = file.clone();
961        let next = tokio::spawn(async move {
962            let lease = next_monitor.acquire_mutation(&next_file).await;
963            lease.path().to_path_buf()
964        });
965
966        drop(first);
967
968        let acquired = tokio::time::timeout(Duration::from_secs(1), next).await??;
969        assert_eq!(acquired, normalized_file);
970        Ok(())
971    }
972
973    #[tokio::test]
974    async fn clears_conflict_state_when_disk_returns_to_read_snapshot() -> Result<()> {
975        let temp = TempDir::new()?;
976        let file = temp.path().join("sample.txt");
977        std::fs::write(&file, "before\n")?;
978        let monitor = EditedFileMonitor::new();
979
980        monitor.track_read(&file).await?;
981        std::fs::write(&file, "external one\n")?;
982        monitor.debug_process_path_change(&file).await?;
983
984        let first_conflict = monitor
985            .detect_conflict(&file, Some("agent\n".to_string()), None)
986            .await?
987            .expect("expected initial conflict");
988        assert!(first_conflict.emit_hitl_notification);
989
990        std::fs::write(&file, "before\n")?;
991        monitor.debug_process_path_change(&file).await?;
992        assert!(
993            monitor
994                .detect_conflict(&file, Some("agent\n".to_string()), None)
995                .await?
996                .is_none()
997        );
998
999        std::fs::write(&file, "external two\n")?;
1000        monitor.debug_process_path_change(&file).await?;
1001        let second_conflict = monitor
1002            .detect_conflict(&file, Some("agent\n".to_string()), None)
1003            .await?
1004            .expect("expected renewed conflict");
1005        assert!(second_conflict.emit_hitl_notification);
1006        Ok(())
1007    }
1008
1009    #[tokio::test]
1010    async fn stale_path_queries_expose_external_change_without_clearing_state() -> Result<()> {
1011        let temp = TempDir::new()?;
1012        let file = temp.path().join("sample.txt");
1013        std::fs::write(&file, "before\n")?;
1014        let monitor = EditedFileMonitor::new();
1015
1016        monitor.track_read(&file).await?;
1017        let tracked_paths = monitor.tracked_paths();
1018        assert_eq!(tracked_paths.len(), 1);
1019        let tracked_path = tracked_paths[0].clone();
1020        assert!(monitor.stale_tracked_paths().is_empty());
1021
1022        std::fs::write(&file, "external\n")?;
1023        monitor.debug_process_path_change(&file).await?;
1024
1025        let freshness = monitor.tracked_path_freshness();
1026        assert_eq!(freshness.len(), 1);
1027        assert_eq!(freshness[0].path, tracked_path);
1028        assert!(freshness[0].is_stale);
1029        assert!(freshness[0].fingerprint.is_some());
1030        assert_eq!(
1031            monitor.stale_tracked_paths(),
1032            vec![freshness[0].path.clone()]
1033        );
1034        assert_eq!(
1035            monitor.tracked_path_fingerprint(&freshness[0].path),
1036            freshness[0].fingerprint
1037        );
1038        Ok(())
1039    }
1040
1041    #[tokio::test]
1042    async fn override_requires_matching_approved_snapshot() -> Result<()> {
1043        let temp = TempDir::new()?;
1044        let file = temp.path().join("sample.txt");
1045        std::fs::write(&file, "before\n")?;
1046        let monitor = EditedFileMonitor::new();
1047
1048        monitor.track_read(&file).await?;
1049        std::fs::write(&file, "external one\n")?;
1050        let approved_snapshot = snapshot_path_async(file.clone()).await?;
1051
1052        std::fs::write(&file, "external two\n")?;
1053
1054        let conflict = monitor
1055            .detect_conflict(&file, Some("agent\n".to_string()), Some(approved_snapshot))
1056            .await?;
1057        assert!(conflict.is_some());
1058        Ok(())
1059    }
1060
1061    #[test]
1062    fn normalizes_missing_event_paths_via_canonical_parent() -> Result<()> {
1063        let temp = TempDir::new()?;
1064        let missing = temp.path().join("missing.txt");
1065        let canonical_parent = std::fs::canonicalize(temp.path())?;
1066
1067        assert_eq!(
1068            normalize_event_path(&missing),
1069            canonical_parent.join("missing.txt")
1070        );
1071        Ok(())
1072    }
1073}