// matrixcode_core/tools/codegraph/watcher.rs

1//! File watcher for automatic index synchronization.
2
3use anyhow::Result;
4use notify::{Config, Event, RecommendedWatcher, RecursiveMode, Watcher};
5use std::path::{Path, PathBuf};
6use std::sync::Arc;
7use std::sync::Mutex;
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::time::{Duration, Instant};
10use tokio::sync::{RwLock, broadcast, mpsc};
11use tokio::time::sleep;
12
13use super::git::{
14    check_mcp_daemon_active, check_sync_lock_owner, get_git_status_changes, has_version_changed,
15    is_git_fsmonitor_running, is_git_repository, is_source_file, release_sync_lock,
16    release_watcher_lock, start_git_fsmonitor, try_acquire_sync_lock, try_acquire_watcher_lock,
17    update_version_after_sync, update_watcher_heartbeat,
18};
19use super::ignore::IgnoreMatcher;
20use super::install::get_codegraph_path;
21use super::manager::CodeGraphManager;
22use super::project::find_project_root;
23use super::types::{CodeGraphEnv, PendingChanges};
24use crate::cancel::CancellationToken;
25use crate::constants::CODEGRAPH_SYNC_INTERVAL_SECS;
26use crate::debug::debug_log;
27use crate::event::AgentEvent;
28use crate::memory::ProjectStructureAnalyzer;
29
/// Interval, in seconds, between `git status` polls in the watcher loop.
const GIT_STATUS_POLL_INTERVAL_SECS: u64 = 2;

/// Minimum time, in seconds, between periodic CodeGraph status events sent to the UI.
const STATUS_UPDATE_INTERVAL_SECS: u64 = 5;
35
/// Handle to manage a running CodeGraph watcher.
///
/// Provides lifecycle management: start, stop, status check. The task's
/// `JoinHandle` is stored behind an `Arc<Mutex<..>>`, so clones of this handle
/// share the same watcher slot and can be used from multiple threads.
#[derive(Clone)]
pub struct WatcherHandle {
    /// Join handle of the spawned watcher task; `None` until a watcher has been started.
    handle: Arc<Mutex<Option<tokio::task::JoinHandle<()>>>>,
    /// Project directory the watcher is bound to.
    project_path: PathBuf,
}
44
45impl WatcherHandle {
46    /// Create a new handle (no watcher running yet).
47    pub fn new(project_path: &Path) -> Self {
48        Self {
49            handle: Arc::new(Mutex::new(None)),
50            project_path: project_path.to_path_buf(),
51        }
52    }
53
54    /// Create handle with automatic project root detection.
55    pub fn with_auto_detect(start_path: &Path) -> Self {
56        let project_path = find_project_root(start_path);
57        debug_log().log("codegraph", &format!(
58            "detected project root at {}",
59            project_path.display()
60        ));
61        Self::new(&project_path)
62    }
63
64    /// Check if watcher is currently running.
65    pub fn is_running(&self) -> bool {
66        let guard = self.handle.lock().unwrap();
67        guard.as_ref().map(|h| !h.is_finished()).unwrap_or(false)
68    }
69
70    /// Start watcher if not running and no daemon conflict.
71    /// Returns true if watcher was started.
72    pub fn start_if_needed(&self, cancel_token: CancellationToken) -> bool {
73        if self.is_running() {
74            debug_log().log("codegraph", "watcher already running");
75            return false;
76        }
77
78        if CodeGraphWatcher::is_daemon_running(&self.project_path) {
79            debug_log().log("codegraph", "MCP daemon detected, skipping watcher to avoid conflict");
80            return false;
81        }
82
83        let watcher = CodeGraphWatcher::new(&self.project_path);
84        let handle = watcher.start(cancel_token);
85        debug_log().log("codegraph", "watcher started (no MCP daemon detected)");
86
87        *self.handle.lock().unwrap() = Some(handle);
88        true
89    }
90
91    /// Stop the watcher if running.
92    pub fn stop(&self) {
93        let guard = self.handle.lock().unwrap();
94        if let Some(ref h) = *guard
95            && !h.is_finished()
96        {
97            debug_log().log("codegraph", "aborting watcher...");
98            h.abort();
99        }
100    }
101
102    /// Get the underlying handle for passing to async contexts.
103    pub fn inner(&self) -> Arc<Mutex<Option<tokio::task::JoinHandle<()>>>> {
104        self.handle.clone()
105    }
106
107    /// Get the project path.
108    pub fn project_path(&self) -> &Path {
109        &self.project_path
110    }
111}
112
/// CodeGraph file watcher for auto-sync.
pub struct CodeGraphWatcher {
    /// Project directory to watch.
    project_path: PathBuf,
    /// Sender half of the stop broadcast channel used by `stop()`.
    stop_tx: broadcast::Sender<()>,
    /// Sync interval handed to the watcher loop; initialised from
    /// `CODEGRAPH_SYNC_INTERVAL_SECS` in `new()`.
    sync_interval: Duration,
}
119
120impl CodeGraphWatcher {
121    /// Check if CodeGraph MCP daemon is already running.
122    /// Returns true if daemon is active (skip watcher to avoid conflict).
123    pub fn is_daemon_running(project_path: &Path) -> bool {
124        // Method 1: Check daemon.pid file
125        let daemon_pid_path = project_path.join(".codegraph").join("daemon.pid");
126        if daemon_pid_path.exists() {
127            let pid_running = std::fs::read_to_string(&daemon_pid_path)
128                .ok()
129                .and_then(|pid| pid.trim().parse::<u32>().ok())
130                .map(|pid| {
131                    #[cfg(target_os = "windows")]
132                    {
133                        use std::os::windows::process::CommandExt;
134                        const CREATE_NO_WINDOW: u32 = 0x08000000;
135                        std::process::Command::new("tasklist")
136                            .args(["/FI", &format!("PID eq {}", pid)])
137                            .creation_flags(CREATE_NO_WINDOW)
138                            .output()
139                            .map(|o| String::from_utf8_lossy(&o.stdout).contains(&pid.to_string()))
140                            .unwrap_or(false)
141                    }
142                    #[cfg(not(target_os = "windows"))]
143                    std::path::Path::new("/proc").join(pid.to_string()).exists()
144                })
145                .unwrap_or(false);
146            if pid_running {
147                return true;
148            }
149        }
150
151        // Method 2: Check daemon.log for recent activity
152        let daemon_log_path = project_path.join(".codegraph").join("daemon.log");
153        if daemon_log_path.exists() {
154            if let Ok(metadata) = std::fs::metadata(&daemon_log_path) {
155                if let Ok(modified) = metadata.modified() {
156                    let now = std::time::SystemTime::now();
157                    let elapsed = now
158                        .duration_since(modified)
159                        .unwrap_or(std::time::Duration::MAX);
160                    if elapsed < std::time::Duration::from_secs(60) {
161                        debug_log().log("codegraph", "daemon.log recently modified, daemon likely active");
162                        return true;
163                    }
164                }
165            }
166        }
167
168        false
169    }
170
171    /// Create a new watcher for the project.
172    pub fn new(project_path: &Path) -> Self {
173        let (stop_tx, _) = broadcast::channel(1);
174        Self {
175            project_path: project_path.to_path_buf(),
176            stop_tx,
177            sync_interval: Duration::from_secs(CODEGRAPH_SYNC_INTERVAL_SECS),
178        }
179    }
180
181    /// Create watcher with automatic project root detection.
182    pub fn with_auto_detect(start_path: &Path) -> Self {
183        let project_path = find_project_root(start_path);
184        debug_log().log("codegraph", &format!(
185            "detected project root at {}",
186            project_path.display()
187        ));
188        Self::new(&project_path)
189    }
190
191    /// Start watching for file changes.
192    pub fn start(&self, cancel_token: CancellationToken) -> tokio::task::JoinHandle<()> {
193        let project_path = self.project_path.clone();
194        let sync_interval = self.sync_interval;
195
196        tokio::spawn(async move {
197            Self::run_watcher_loop(project_path, sync_interval, cancel_token, None).await;
198        })
199    }
200
201    /// Start watching with status updates to UI.
202    pub fn start_with_status_updates(
203        &self,
204        cancel_token: CancellationToken,
205        event_tx: mpsc::Sender<AgentEvent>,
206    ) -> tokio::task::JoinHandle<()> {
207        let project_path = self.project_path.clone();
208        let sync_interval = self.sync_interval;
209
210        tokio::spawn(async move {
211            Self::run_watcher_loop(project_path, sync_interval, cancel_token, Some(event_tx)).await;
212        })
213    }
214
215    /// Stop the watcher via broadcast signal.
216    pub fn stop(&self) {
217        let _ = self.stop_tx.send(());
218    }
219
220    /// Send CodeGraph status update to UI if event_tx is available.
221    /// pending_count is the number of files waiting to be synced (from watcher's internal tracking).
222    async fn send_status_update(
223        project_path: &Path,
224        event_tx: &Option<mpsc::Sender<AgentEvent>>,
225        pending_count: usize,
226    ) {
227        if let Some(tx) = event_tx {
228            let manager = CodeGraphManager::new(project_path);
229            if manager.is_initialized() {
230                if let Ok(mut status) = manager.status() {
231                    // Override pending_changes with actual watcher state
232                    // pending_count represents files detected by watcher but not yet synced
233                    status.pending_changes = PendingChanges {
234                        added: pending_count as u32,
235                        modified: 0,
236                        removed: 0,
237                    };
238                    debug_log().log("codegraph", &format!(
239                        "sending status update (pending: {}, nodes: {})",
240                        pending_count,
241                        status.node_count
242                    ));
243                    let _ = tx.send(AgentEvent::codegraph_status(status)).await;
244                } else {
245                    debug_log().log("codegraph", "failed to get status");
246                }
247            } else {
248                debug_log().log("codegraph", "not initialized, skipping status update");
249            }
250        }
251    }
252
253    /// Run the watcher loop with dual-path monitoring.
254    async fn run_watcher_loop(
255        project_path: PathBuf,
256        _sync_interval: Duration,
257        cancel_token: CancellationToken,
258        event_tx: Option<mpsc::Sender<AgentEvent>>,
259    ) {
260        // Check if CodeGraph CLI is available (no auto-install)
261        if get_codegraph_path().is_none() {
262            debug_log().log("codegraph", "CLI not found, watcher disabled. Please install CodeGraph manually.");
263            return;
264        }
265
266        // Try to acquire watcher lock (prevent multiple instances)
267        if !try_acquire_watcher_lock(&project_path) {
268            debug_log().log("codegraph", "another instance is watching this project, exiting");
269            return;
270        }
271
272        // Check if this is a code project
273        let analyzer = ProjectStructureAnalyzer::new(project_path.clone());
274        if analyzer.detect_project_type().is_none() {
275            debug_log().log("codegraph", &format!(
276                "skipping non-code directory: {}",
277                project_path.display()
278            ));
279            return;
280        }
281
282        // Check if CodeGraph is initialized - DO NOT auto-initialize
283        let manager = CodeGraphManager::new(&project_path);
284        if !manager.is_initialized() {
285            debug_log().log("codegraph", &format!(
286                "not initialized for {}, skipping watcher. Run 'codegraph init -i' to create index.",
287                project_path.display()
288            ));
289            release_watcher_lock(&project_path);
290            return;
291        }
292
293        // Detect environment type
294        let env_type = if is_git_repository(&project_path) {
295            CodeGraphEnv::Git
296        } else {
297            CodeGraphEnv::NonGit
298        };
299
300        debug_log().log("codegraph", &format!(
301            "environment detected as {} for: {}",
302            match env_type {
303                CodeGraphEnv::Git => "Git repository",
304                CodeGraphEnv::NonGit => "non-Git directory",
305            },
306            project_path.display()
307        ));
308
309        // Check version consistency before starting
310        if env_type == CodeGraphEnv::Git && has_version_changed(&project_path) {
311            debug_log().log("codegraph", "version changed, performing sync before starting watcher");
312            if let Err(e) = manager.sync().await {
313                debug_log().log("codegraph", &format!("version sync failed: {}", e));
314            }
315            update_version_after_sync(&project_path);
316        }
317
318        // Initial sync on startup
319        debug_log().log("codegraph", "performing initial sync on startup");
320        if let Err(e) = manager.sync().await {
321            debug_log().log("codegraph", &format!("initial sync failed: {}", e));
322        }
323        update_version_after_sync(&project_path);
324
325        // Send initial status update to UI (pending = 0 after initial sync)
326        Self::send_status_update(&project_path, &event_tx, 0).await;
327
328        // Channel for file change events
329        let (change_tx, mut change_rx) = mpsc::channel::<PathBuf>(100);
330
331        // Create notify file watcher
332        let watcher_result = Self::create_file_watcher(&project_path, change_tx.clone());
333        if watcher_result.is_err() {
334            debug_log().log("codegraph", &format!(
335                "notify watcher failed to start: {}",
336                watcher_result.err().unwrap()
337            ));
338            release_watcher_lock(&project_path);
339            return;
340        }
341        let _watcher = watcher_result.unwrap();
342
343        // Load ignore matcher
344        let ignore_matcher = IgnoreMatcher::load(&project_path);
345
346        // Track sync state
347        let syncing = Arc::new(AtomicBool::new(false));
348        let syncing_clone = syncing.clone();
349        let changed_files = Arc::new(RwLock::new(std::collections::HashSet::<PathBuf>::new()));
350        let last_change = Arc::new(std::sync::Mutex::new(Instant::now()));
351
352        // Debounce settings
353        let debounce_delay = Duration::from_secs(CODEGRAPH_SYNC_INTERVAL_SECS);
354        let git_poll_interval = Duration::from_secs(GIT_STATUS_POLL_INTERVAL_SECS);
355
356        // Start Git monitoring if in Git environment
357        let git_monitoring = if env_type == CodeGraphEnv::Git {
358            if start_git_fsmonitor(&project_path) {
359                debug_log().log("codegraph", "Git fsmonitor daemon started");
360                true
361            } else if is_git_fsmonitor_running(&project_path) {
362                debug_log().log("codegraph", "Git fsmonitor daemon already running");
363                true
364            } else {
365                debug_log().log("codegraph", "Git fsmonitor not available, using git status polling");
366                false
367            }
368        } else {
369            false
370        };
371
372        debug_log().log("codegraph", &format!(
373            "watcher started (Git monitoring: {}, notify fallback: always)",
374            git_monitoring
375        ));
376
377        let check_interval = Duration::from_secs(1);
378        let status_update_interval = Duration::from_secs(STATUS_UPDATE_INTERVAL_SECS);
379        let mut last_status_update = Instant::now();
380
381        loop {
382            if cancel_token.is_cancelled() {
383                // Final sync before exit
384                let pending_count = changed_files.read().await.len();
385                if pending_count > 0 {
386                    debug_log().log("codegraph", &format!(
387                        "final sync before exit ({} unique files)",
388                        pending_count
389                    ));
390                    let manager = CodeGraphManager::new(&project_path);
391                    if manager.is_initialized() {
392                        let _ = manager.sync().await;
393                        update_version_after_sync(&project_path);
394                    }
395                }
396                release_watcher_lock(&project_path);
397                debug_log().log("codegraph", "watcher stopped");
398                break;
399            }
400
401            // Update heartbeat
402            update_watcher_heartbeat(&project_path);
403
404            // Periodic status update to UI (with current pending count)
405            if last_status_update.elapsed() >= status_update_interval {
406                let pending = changed_files.read().await.len();
407                Self::send_status_update(&project_path, &event_tx, pending).await;
408                last_status_update = Instant::now();
409            }
410
411            tokio::select! {
412                // Notify file changes
413                Some(path) = change_rx.recv() => {
414                    if cancel_token.is_cancelled() {
415                        break;
416                    }
417                    if is_source_file(&path)
418                        && !ignore_matcher.should_ignore(&path, &project_path) {
419                        {
420                            let mut files = changed_files.write().await;
421                            if files.insert(path.clone()) {
422                                *last_change.lock().unwrap() = Instant::now();
423                            }
424                        }
425                    }
426                }
427
428                // Git status polling
429                _ = sleep(git_poll_interval), if git_monitoring => {
430                    if cancel_token.is_cancelled() {
431                        break;
432                    }
433                    let changes = get_git_status_changes(&project_path);
434                    if changes.has_changes() {
435                        let mut new_count = 0;
436                        {
437                            let mut files = changed_files.write().await;
438                            for path in changes.modified.iter().chain(&changes.added).chain(&changes.deleted) {
439                                if files.insert(path.clone()) {
440                                    new_count += 1;
441                                }
442                            }
443                        }
444                        if new_count > 0 {
445                            *last_change.lock().unwrap() = Instant::now();
446                        }
447                    }
448                }
449
450                // Periodic sync check
451                _ = sleep(check_interval) => {
452                    if cancel_token.is_cancelled() {
453                        break;
454                    }
455
456                    let files_count = changed_files.read().await.len();
457                    let elapsed = last_change.lock().unwrap().elapsed();
458
459                    if !syncing_clone.load(Ordering::SeqCst)
460                        && files_count > 0
461                        && elapsed >= debounce_delay {
462                        syncing_clone.store(true, Ordering::SeqCst);
463                        debug_log().log("codegraph", &format!(
464                            "auto-sync triggered ({} unique files changed)",
465                            files_count
466                        ));
467
468                        // Check if MCP daemon is active before syncing
469                        if check_mcp_daemon_active(&project_path) {
470                            debug_log().log("codegraph", "MCP daemon active, skipping our sync to avoid conflict");
471                            syncing_clone.store(false, Ordering::SeqCst);
472                        } else {
473                            let our_timestamp = try_acquire_sync_lock(&project_path);
474                            if our_timestamp > 0 {
475                                let manager = CodeGraphManager::new(&project_path);
476                                if manager.is_initialized() {
477                                    if let Err(e) = manager.sync().await {
478                                        debug_log().log("codegraph", &format!("sync failed: {}", e));
479                                    } else {
480                                        // Check if lock still belongs to us before updating
481                                        if check_sync_lock_owner(&project_path, our_timestamp) {
482                                            update_version_after_sync(&project_path);
483                                            changed_files.write().await.clear();
484
485                                            // Send status update to UI after successful sync (pending = 0)
486                                            Self::send_status_update(&project_path, &event_tx, 0).await;
487                                        } else {
488                                            // Lock was stolen by another process, abandon this sync
489                                            debug_log().log("codegraph", "sync abandoned, another process took over");
490                                            // Don't clear changed_files, let next sync handle them
491                                        }
492                                    }
493                                }
494                                release_sync_lock(&project_path);
495                            }
496                            syncing_clone.store(false, Ordering::SeqCst);
497                        }
498                    }
499                }
500            }
501        }
502    }
503
504    /// Create the underlying file watcher with optimized config.
505    fn create_file_watcher(
506        project_path: &Path,
507        change_tx: mpsc::Sender<PathBuf>,
508    ) -> Result<RecommendedWatcher> {
509        let tx = change_tx.clone();
510
511        let handler = move |event: Result<Event, notify::Error>| {
512            if let Ok(event) = event {
513                if !event.kind.is_access() && !event.kind.is_other() {
514                    for path in event.paths {
515                        let _ = tx.try_send(path);
516                    }
517                }
518            }
519        };
520
521        let config = Config::default()
522            .with_poll_interval(Duration::from_secs(2))
523            .with_compare_contents(false);
524
525        let mut watcher = RecommendedWatcher::new(handler, config)?;
526        watcher.watch(project_path, RecursiveMode::Recursive)?;
527
528        Ok(watcher)
529    }
530}