Skip to main content

matrixcode_core/tools/codegraph/
watcher.rs

1//! File watcher for automatic index synchronization.
2
3use anyhow::Result;
4use notify::{Config, Event, RecommendedWatcher, RecursiveMode, Watcher};
5use std::path::{Path, PathBuf};
6use std::sync::Arc;
7use std::sync::Mutex;
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::time::{Duration, Instant};
10use tokio::sync::{RwLock, broadcast, mpsc};
11use tokio::time::sleep;
12
13use super::git::{
14    check_mcp_daemon_active, check_sync_lock_owner, get_git_status_changes, has_version_changed,
15    is_git_fsmonitor_running, is_git_repository, is_source_file, release_sync_lock,
16    release_watcher_lock, start_git_fsmonitor, try_acquire_sync_lock, try_acquire_watcher_lock,
17    update_version_after_sync, update_watcher_heartbeat,
18};
19use super::ignore::IgnoreMatcher;
20use super::install::get_codegraph_path;
21use super::manager::CodeGraphManager;
22use super::project::find_project_root;
23use super::types::CodeGraphEnv;
24use crate::cancel::CancellationToken;
25use crate::constants::CODEGRAPH_SYNC_INTERVAL_SECS;
26use crate::memory::ProjectStructureAnalyzer;
27
/// Git status polling interval, in seconds, used by the watcher loop's
/// `git status` polling branch.
///
/// NOTE(review): this was originally described as the "non-fsmonitor fallback",
/// but `run_watcher_loop` only arms the polling timer when its `git_monitoring`
/// flag is true, i.e. when the Git fsmonitor daemon was started or is already
/// running. Confirm which of the two is intended.
const GIT_STATUS_POLL_INTERVAL_SECS: u64 = 2;
30
/// Handle to manage a running CodeGraph watcher.
/// Provides lifecycle management: start, stop, status check.
/// Internally uses Arc, so it can be cloned and shared across threads.
/// Clones share the same task slot, so every clone observes and controls the
/// same watcher task.
#[derive(Clone)]
pub struct WatcherHandle {
    /// Slot holding the spawned watcher task; `None` until a watcher has been started.
    handle: Arc<Mutex<Option<tokio::task::JoinHandle<()>>>>,
    /// Project directory that the watcher is (or will be) started for.
    project_path: PathBuf,
}
39
40impl WatcherHandle {
41    /// Create a new handle (no watcher running yet).
42    pub fn new(project_path: &Path) -> Self {
43        Self {
44            handle: Arc::new(Mutex::new(None)),
45            project_path: project_path.to_path_buf(),
46        }
47    }
48
49    /// Create handle with automatic project root detection.
50    pub fn with_auto_detect(start_path: &Path) -> Self {
51        let project_path = find_project_root(start_path);
52        log::info!(
53            "CodeGraph: detected project root at {}",
54            project_path.display()
55        );
56        Self::new(&project_path)
57    }
58
59    /// Check if watcher is currently running.
60    pub fn is_running(&self) -> bool {
61        let guard = self.handle.lock().unwrap();
62        guard.as_ref().map(|h| !h.is_finished()).unwrap_or(false)
63    }
64
65    /// Start watcher if not running and no daemon conflict.
66    /// Returns true if watcher was started.
67    pub fn start_if_needed(&self, cancel_token: CancellationToken) -> bool {
68        if self.is_running() {
69            log::info!("CodeGraph watcher already running");
70            return false;
71        }
72
73        if CodeGraphWatcher::is_daemon_running(&self.project_path) {
74            log::info!("CodeGraph MCP daemon detected, skipping watcher to avoid conflict");
75            return false;
76        }
77
78        let watcher = CodeGraphWatcher::new(&self.project_path);
79        let handle = watcher.start(cancel_token);
80        log::info!("CodeGraph watcher started (no MCP daemon detected)");
81
82        *self.handle.lock().unwrap() = Some(handle);
83        true
84    }
85
86    /// Stop the watcher if running.
87    pub fn stop(&self) {
88        let guard = self.handle.lock().unwrap();
89        if let Some(ref h) = *guard
90            && !h.is_finished()
91        {
92            log::info!("Aborting CodeGraph watcher...");
93            h.abort();
94        }
95    }
96
97    /// Get the underlying handle for passing to async contexts.
98    pub fn inner(&self) -> Arc<Mutex<Option<tokio::task::JoinHandle<()>>>> {
99        self.handle.clone()
100    }
101
102    /// Get the project path.
103    pub fn project_path(&self) -> &Path {
104        &self.project_path
105    }
106}
107
/// CodeGraph file watcher for auto-sync.
pub struct CodeGraphWatcher {
    /// Root directory of the project being watched.
    project_path: PathBuf,
    /// Sender side of the stop broadcast used by `stop()`.
    /// NOTE(review): no receiver is subscribed anywhere in this file, so a
    /// stop signal currently has no listener — confirm whether one exists elsewhere.
    stop_tx: broadcast::Sender<()>,
    /// Interval handed to the watcher loop by `start()`; the loop currently
    /// ignores it (its parameter is named `_sync_interval`).
    sync_interval: Duration,
}
114
115impl CodeGraphWatcher {
116    /// Check if CodeGraph MCP daemon is already running.
117    /// Returns true if daemon is active (skip watcher to avoid conflict).
118    pub fn is_daemon_running(project_path: &Path) -> bool {
119        // Method 1: Check daemon.pid file
120        let daemon_pid_path = project_path.join(".codegraph").join("daemon.pid");
121        if daemon_pid_path.exists() {
122            let pid_running = std::fs::read_to_string(&daemon_pid_path)
123                .ok()
124                .and_then(|pid| pid.trim().parse::<u32>().ok())
125                .map(|pid| {
126                    #[cfg(target_os = "windows")]
127                    {
128                        use std::os::windows::process::CommandExt;
129                        const CREATE_NO_WINDOW: u32 = 0x08000000;
130                        std::process::Command::new("tasklist")
131                            .args(["/FI", &format!("PID eq {}", pid)])
132                            .creation_flags(CREATE_NO_WINDOW)
133                            .output()
134                            .map(|o| String::from_utf8_lossy(&o.stdout).contains(&pid.to_string()))
135                            .unwrap_or(false)
136                    }
137                    #[cfg(not(target_os = "windows"))]
138                    std::path::Path::new("/proc").join(pid.to_string()).exists()
139                })
140                .unwrap_or(false);
141            if pid_running {
142                return true;
143            }
144        }
145
146        // Method 2: Check daemon.log for recent activity
147        let daemon_log_path = project_path.join(".codegraph").join("daemon.log");
148        if daemon_log_path.exists() {
149            if let Ok(metadata) = std::fs::metadata(&daemon_log_path) {
150                if let Ok(modified) = metadata.modified() {
151                    let now = std::time::SystemTime::now();
152                    let elapsed = now
153                        .duration_since(modified)
154                        .unwrap_or(std::time::Duration::MAX);
155                    if elapsed < std::time::Duration::from_secs(60) {
156                        log::info!("CodeGraph: daemon.log recently modified, daemon likely active");
157                        return true;
158                    }
159                }
160            }
161        }
162
163        false
164    }
165
166    /// Create a new watcher for the project.
167    pub fn new(project_path: &Path) -> Self {
168        let (stop_tx, _) = broadcast::channel(1);
169        Self {
170            project_path: project_path.to_path_buf(),
171            stop_tx,
172            sync_interval: Duration::from_secs(CODEGRAPH_SYNC_INTERVAL_SECS),
173        }
174    }
175
176    /// Create watcher with automatic project root detection.
177    pub fn with_auto_detect(start_path: &Path) -> Self {
178        let project_path = find_project_root(start_path);
179        log::info!(
180            "CodeGraph: detected project root at {}",
181            project_path.display()
182        );
183        Self::new(&project_path)
184    }
185
186    /// Start watching for file changes.
187    pub fn start(&self, cancel_token: CancellationToken) -> tokio::task::JoinHandle<()> {
188        let project_path = self.project_path.clone();
189        let sync_interval = self.sync_interval;
190
191        tokio::spawn(async move {
192            Self::run_watcher_loop(project_path, sync_interval, cancel_token).await;
193        })
194    }
195
196    /// Stop the watcher via broadcast signal.
197    pub fn stop(&self) {
198        let _ = self.stop_tx.send(());
199    }
200
201    /// Run the watcher loop with dual-path monitoring.
202    async fn run_watcher_loop(
203        project_path: PathBuf,
204        _sync_interval: Duration,
205        cancel_token: CancellationToken,
206    ) {
207        // Check if CodeGraph CLI is available (no auto-install)
208        if get_codegraph_path().is_none() {
209            log::warn!(
210                "CodeGraph CLI not found, watcher disabled. Please install CodeGraph manually."
211            );
212            return;
213        }
214
215        // Try to acquire watcher lock (prevent multiple instances)
216        if !try_acquire_watcher_lock(&project_path) {
217            log::info!("CodeGraph: another instance is watching this project, exiting");
218            return;
219        }
220
221        // Check if this is a code project
222        let analyzer = ProjectStructureAnalyzer::new(project_path.clone());
223        if analyzer.detect_project_type().is_none() {
224            log::info!(
225                "CodeGraph: skipping non-code directory: {}",
226                project_path.display()
227            );
228            return;
229        }
230
231        // Check if CodeGraph is initialized - DO NOT auto-initialize
232        let manager = CodeGraphManager::new(&project_path);
233        if !manager.is_initialized() {
234            log::info!(
235                "CodeGraph: not initialized for {}, skipping watcher. Run 'codegraph init -i' to create index.",
236                project_path.display()
237            );
238            release_watcher_lock(&project_path);
239            return;
240        }
241
242        // Detect environment type
243        let env_type = if is_git_repository(&project_path) {
244            CodeGraphEnv::Git
245        } else {
246            CodeGraphEnv::NonGit
247        };
248
249        log::info!(
250            "CodeGraph: environment detected as {} for: {}",
251            match env_type {
252                CodeGraphEnv::Git => "Git repository",
253                CodeGraphEnv::NonGit => "non-Git directory",
254            },
255            project_path.display()
256        );
257
258        // Check version consistency before starting
259        if env_type == CodeGraphEnv::Git && has_version_changed(&project_path) {
260            log::info!("CodeGraph: version changed, performing sync before starting watcher");
261            if let Err(e) = manager.sync().await {
262                log::warn!("CodeGraph version sync failed: {}", e);
263            }
264            update_version_after_sync(&project_path);
265        }
266
267        // Initial sync on startup
268        log::info!("CodeGraph: performing initial sync on startup");
269        if let Err(e) = manager.sync().await {
270            log::warn!("CodeGraph initial sync failed: {}", e);
271        }
272        update_version_after_sync(&project_path);
273
274        // Channel for file change events
275        let (change_tx, mut change_rx) = mpsc::channel::<PathBuf>(100);
276
277        // Create notify file watcher
278        let watcher_result = Self::create_file_watcher(&project_path, change_tx.clone());
279        if watcher_result.is_err() {
280            log::warn!(
281                "CodeGraph notify watcher failed to start: {}",
282                watcher_result.err().unwrap()
283            );
284            release_watcher_lock(&project_path);
285            return;
286        }
287        let _watcher = watcher_result.unwrap();
288
289        // Load ignore matcher
290        let ignore_matcher = IgnoreMatcher::load(&project_path);
291
292        // Track sync state
293        let syncing = Arc::new(AtomicBool::new(false));
294        let syncing_clone = syncing.clone();
295        let changed_files = Arc::new(RwLock::new(std::collections::HashSet::<PathBuf>::new()));
296        let last_change = Arc::new(std::sync::Mutex::new(Instant::now()));
297
298        // Debounce settings
299        let debounce_delay = Duration::from_secs(CODEGRAPH_SYNC_INTERVAL_SECS);
300        let git_poll_interval = Duration::from_secs(GIT_STATUS_POLL_INTERVAL_SECS);
301
302        // Start Git monitoring if in Git environment
303        let git_monitoring = if env_type == CodeGraphEnv::Git {
304            if start_git_fsmonitor(&project_path) {
305                log::info!("CodeGraph: Git fsmonitor daemon started");
306                true
307            } else if is_git_fsmonitor_running(&project_path) {
308                log::info!("CodeGraph: Git fsmonitor daemon already running");
309                true
310            } else {
311                log::info!("CodeGraph: Git fsmonitor not available, using git status polling");
312                false
313            }
314        } else {
315            false
316        };
317
318        log::info!(
319            "CodeGraph watcher started (Git monitoring: {}, notify fallback: always)",
320            git_monitoring
321        );
322
323        let check_interval = Duration::from_secs(1);
324
325        loop {
326            if cancel_token.is_cancelled() {
327                // Final sync before exit
328                let pending_count = changed_files.read().await.len();
329                if pending_count > 0 {
330                    log::info!(
331                        "CodeGraph: final sync before exit ({} unique files)",
332                        pending_count
333                    );
334                    let manager = CodeGraphManager::new(&project_path);
335                    if manager.is_initialized() {
336                        let _ = manager.sync().await;
337                        update_version_after_sync(&project_path);
338                    }
339                }
340                release_watcher_lock(&project_path);
341                log::info!("CodeGraph watcher stopped");
342                break;
343            }
344
345            // Update heartbeat
346            update_watcher_heartbeat(&project_path);
347
348            tokio::select! {
349                // Notify file changes
350                Some(path) = change_rx.recv() => {
351                    if cancel_token.is_cancelled() {
352                        break;
353                    }
354                    if is_source_file(&path)
355                        && !ignore_matcher.should_ignore(&path, &project_path) {
356                        {
357                            let mut files = changed_files.write().await;
358                            if files.insert(path.clone()) {
359                                *last_change.lock().unwrap() = Instant::now();
360                                log::debug!(
361                                    "CodeGraph [notify]: new file {} (total unique: {})",
362                                    path.display(),
363                                    files.len()
364                                );
365                            }
366                        }
367                    }
368                }
369
370                // Git status polling
371                _ = sleep(git_poll_interval), if git_monitoring => {
372                    if cancel_token.is_cancelled() {
373                        break;
374                    }
375                    let changes = get_git_status_changes(&project_path);
376                    if changes.has_changes() {
377                        let mut new_count = 0;
378                        {
379                            let mut files = changed_files.write().await;
380                            for path in changes.modified.iter().chain(&changes.added).chain(&changes.deleted) {
381                                if files.insert(path.clone()) {
382                                    new_count += 1;
383                                }
384                            }
385                            if new_count > 0 {
386                                log::debug!(
387                                    "CodeGraph [git]: {} new changes (total unique: {})",
388                                    new_count,
389                                    files.len()
390                                );
391                            }
392                        }
393                        if new_count > 0 {
394                            *last_change.lock().unwrap() = Instant::now();
395                        }
396                    }
397                }
398
399                // Periodic sync check
400                _ = sleep(check_interval) => {
401                    if cancel_token.is_cancelled() {
402                        break;
403                    }
404
405                    let files_count = changed_files.read().await.len();
406                    let elapsed = last_change.lock().unwrap().elapsed();
407
408                    if !syncing_clone.load(Ordering::SeqCst)
409                        && files_count > 0
410                        && elapsed >= debounce_delay {
411                        syncing_clone.store(true, Ordering::SeqCst);
412                        log::info!("CodeGraph: auto-sync triggered ({} unique files changed)", files_count);
413
414                        // Check if MCP daemon is active before syncing
415                        if check_mcp_daemon_active(&project_path) {
416                            log::info!("CodeGraph: MCP daemon active, skipping our sync to avoid conflict");
417                            syncing_clone.store(false, Ordering::SeqCst);
418                        } else {
419                            let our_timestamp = try_acquire_sync_lock(&project_path);
420                            if our_timestamp > 0 {
421                                let manager = CodeGraphManager::new(&project_path);
422                                if manager.is_initialized() {
423                                    if let Err(e) = manager.sync().await {
424                                        log::warn!("CodeGraph sync failed: {}", e);
425                                    } else {
426                                        // Check if lock still belongs to us before updating
427                                        if check_sync_lock_owner(&project_path, our_timestamp) {
428                                            update_version_after_sync(&project_path);
429                                            changed_files.write().await.clear();
430                                            log::debug!("CodeGraph: sync completed, lock verified");
431                                        } else {
432                                            // Lock was stolen by another process, abandon this sync
433                                            log::info!("CodeGraph: sync abandoned, another process took over");
434                                            // Don't clear changed_files, let next sync handle them
435                                        }
436                                    }
437                                }
438                                release_sync_lock(&project_path);
439                            } else {
440                                log::debug!("CodeGraph: skipping sync, another instance is syncing");
441                            }
442                            syncing_clone.store(false, Ordering::SeqCst);
443                        }
444                    }
445                }
446            }
447        }
448    }
449
450    /// Create the underlying file watcher with optimized config.
451    fn create_file_watcher(
452        project_path: &Path,
453        change_tx: mpsc::Sender<PathBuf>,
454    ) -> Result<RecommendedWatcher> {
455        let tx = change_tx.clone();
456
457        let handler = move |event: Result<Event, notify::Error>| {
458            if let Ok(event) = event {
459                if !event.kind.is_access() && !event.kind.is_other() {
460                    for path in event.paths {
461                        let _ = tx.try_send(path);
462                    }
463                }
464            }
465        };
466
467        let config = Config::default()
468            .with_poll_interval(Duration::from_secs(2))
469            .with_compare_contents(false);
470
471        let mut watcher = RecommendedWatcher::new(handler, config)?;
472        watcher.watch(project_path, RecursiveMode::Recursive)?;
473
474        Ok(watcher)
475    }
476}