Skip to main content

matrixcode_core/tools/codegraph/
watcher.rs

1//! File watcher for automatic index synchronization.
2
3use anyhow::Result;
4use notify::{Config, Event, RecommendedWatcher, RecursiveMode, Watcher};
5use std::path::{Path, PathBuf};
6use std::sync::Arc;
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::Mutex;
9use std::time::{Duration, Instant};
10use tokio::sync::{broadcast, mpsc, RwLock};
11use tokio::time::sleep;
12
13use super::manager::CodeGraphManager;
14use super::git::{
15    is_git_repository, get_git_status_changes, is_git_fsmonitor_running, start_git_fsmonitor,
16    has_version_changed, update_version_after_sync,
17    try_acquire_watcher_lock, release_watcher_lock,
18    try_acquire_sync_lock, release_sync_lock, check_sync_lock_owner,
19    update_watcher_heartbeat,
20    is_source_file,
21    check_mcp_daemon_active,
22};
23use super::ignore::IgnoreMatcher;
24use super::install::get_codegraph_path;
25use super::project::find_project_root;
26use super::types::CodeGraphEnv;
27use crate::constants::CODEGRAPH_SYNC_INTERVAL_SECS;
28use crate::memory::ProjectStructureAnalyzer;
29use crate::cancel::CancellationToken;
30
/// Interval, in seconds, between `git status` polls performed by the watcher loop.
const GIT_STATUS_POLL_INTERVAL_SECS: u64 = 2;
33
34/// Handle to manage a running CodeGraph watcher.
35/// Provides lifecycle management: start, stop, status check.
36/// Internally uses Arc, so it can be cloned and shared across threads.
37#[derive(Clone)]
38pub struct WatcherHandle {
39    handle: Arc<Mutex<Option<tokio::task::JoinHandle<()>>>>,
40    project_path: PathBuf,
41}
42
43impl WatcherHandle {
44    /// Create a new handle (no watcher running yet).
45    pub fn new(project_path: &Path) -> Self {
46        Self {
47            handle: Arc::new(Mutex::new(None)),
48            project_path: project_path.to_path_buf(),
49        }
50    }
51
52    /// Create handle with automatic project root detection.
53    pub fn with_auto_detect(start_path: &Path) -> Self {
54        let project_path = find_project_root(start_path);
55        log::info!("CodeGraph: detected project root at {}", project_path.display());
56        Self::new(&project_path)
57    }
58
59    /// Check if watcher is currently running.
60    pub fn is_running(&self) -> bool {
61        let guard = self.handle.lock().unwrap();
62        guard.as_ref().map(|h| !h.is_finished()).unwrap_or(false)
63    }
64
65    /// Start watcher if not running and no daemon conflict.
66    /// Returns true if watcher was started.
67    pub fn start_if_needed(&self, cancel_token: CancellationToken) -> bool {
68        if self.is_running() {
69            log::info!("CodeGraph watcher already running");
70            return false;
71        }
72
73        if CodeGraphWatcher::is_daemon_running(&self.project_path) {
74            log::info!("CodeGraph MCP daemon detected, skipping watcher to avoid conflict");
75            return false;
76        }
77
78        let watcher = CodeGraphWatcher::new(&self.project_path);
79        let handle = watcher.start(cancel_token);
80        log::info!("CodeGraph watcher started (no MCP daemon detected)");
81
82        *self.handle.lock().unwrap() = Some(handle);
83        true
84    }
85
86    /// Stop the watcher if running.
87    pub fn stop(&self) {
88        let guard = self.handle.lock().unwrap();
89        if let Some(ref h) = *guard
90            && !h.is_finished() {
91            log::info!("Aborting CodeGraph watcher...");
92            h.abort();
93        }
94    }
95
96    /// Get the underlying handle for passing to async contexts.
97    pub fn inner(&self) -> Arc<Mutex<Option<tokio::task::JoinHandle<()>>>> {
98        self.handle.clone()
99    }
100
101    /// Get the project path.
102    pub fn project_path(&self) -> &Path {
103        &self.project_path
104    }
105}
106
107/// CodeGraph file watcher for auto-sync.
108pub struct CodeGraphWatcher {
109    project_path: PathBuf,
110    stop_tx: broadcast::Sender<()>,
111    sync_interval: Duration,
112}
113
114impl CodeGraphWatcher {
115    /// Check if CodeGraph MCP daemon is already running.
116    /// Returns true if daemon is active (skip watcher to avoid conflict).
117    pub fn is_daemon_running(project_path: &Path) -> bool {
118        // Method 1: Check daemon.pid file
119        let daemon_pid_path = project_path.join(".codegraph").join("daemon.pid");
120        if daemon_pid_path.exists() {
121            let pid_running = std::fs::read_to_string(&daemon_pid_path)
122                .ok()
123                .and_then(|pid| pid.trim().parse::<u32>().ok())
124                .map(|pid| {
125                    #[cfg(target_os = "windows")]
126                    {
127                        use std::os::windows::process::CommandExt;
128                        const CREATE_NO_WINDOW: u32 = 0x08000000;
129                        std::process::Command::new("tasklist")
130                            .args(["/FI", &format!("PID eq {}", pid)])
131                            .creation_flags(CREATE_NO_WINDOW)
132                            .output()
133                            .map(|o| String::from_utf8_lossy(&o.stdout).contains(&pid.to_string()))
134                            .unwrap_or(false)
135                    }
136                    #[cfg(not(target_os = "windows"))]
137                    std::path::Path::new("/proc").join(pid.to_string()).exists()
138                })
139                .unwrap_or(false);
140            if pid_running {
141                return true;
142            }
143        }
144
145        // Method 2: Check daemon.log for recent activity
146        let daemon_log_path = project_path.join(".codegraph").join("daemon.log");
147        if daemon_log_path.exists() {
148            if let Ok(metadata) = std::fs::metadata(&daemon_log_path) {
149                if let Ok(modified) = metadata.modified() {
150                    let now = std::time::SystemTime::now();
151                    let elapsed = now.duration_since(modified).unwrap_or(std::time::Duration::MAX);
152                    if elapsed < std::time::Duration::from_secs(60) {
153                        log::info!("CodeGraph: daemon.log recently modified, daemon likely active");
154                        return true;
155                    }
156                }
157            }
158        }
159
160        false
161    }
162
163    /// Create a new watcher for the project.
164    pub fn new(project_path: &Path) -> Self {
165        let (stop_tx, _) = broadcast::channel(1);
166        Self {
167            project_path: project_path.to_path_buf(),
168            stop_tx,
169            sync_interval: Duration::from_secs(CODEGRAPH_SYNC_INTERVAL_SECS),
170        }
171    }
172
173    /// Create watcher with automatic project root detection.
174    pub fn with_auto_detect(start_path: &Path) -> Self {
175        let project_path = find_project_root(start_path);
176        log::info!("CodeGraph: detected project root at {}", project_path.display());
177        Self::new(&project_path)
178    }
179
180    /// Start watching for file changes.
181    pub fn start(&self, cancel_token: CancellationToken) -> tokio::task::JoinHandle<()> {
182        let project_path = self.project_path.clone();
183        let sync_interval = self.sync_interval;
184
185        tokio::spawn(async move {
186            Self::run_watcher_loop(project_path, sync_interval, cancel_token).await;
187        })
188    }
189
190    /// Stop the watcher via broadcast signal.
191    pub fn stop(&self) {
192        let _ = self.stop_tx.send(());
193    }
194
195    /// Run the watcher loop with dual-path monitoring.
196    async fn run_watcher_loop(
197        project_path: PathBuf,
198        _sync_interval: Duration,
199        cancel_token: CancellationToken,
200    ) {
201        // Check if CodeGraph CLI is available (no auto-install)
202        if get_codegraph_path().is_none() {
203            log::warn!("CodeGraph CLI not found, watcher disabled. Please install CodeGraph manually.");
204            return;
205        }
206
207        // Try to acquire watcher lock (prevent multiple instances)
208        if !try_acquire_watcher_lock(&project_path) {
209            log::info!("CodeGraph: another instance is watching this project, exiting");
210            return;
211        }
212
213        // Check if this is a code project
214        let analyzer = ProjectStructureAnalyzer::new(project_path.clone());
215        if analyzer.detect_project_type().is_none() {
216            log::info!(
217                "CodeGraph: skipping non-code directory: {}",
218                project_path.display()
219            );
220            return;
221        }
222
223        // Check if CodeGraph is initialized - DO NOT auto-initialize
224        let manager = CodeGraphManager::new(&project_path);
225        if !manager.is_initialized() {
226            log::info!(
227                "CodeGraph: not initialized for {}, skipping watcher. Run 'codegraph init -i' to create index.",
228                project_path.display()
229            );
230            release_watcher_lock(&project_path);
231            return;
232        }
233
234        // Detect environment type
235        let env_type = if is_git_repository(&project_path) {
236            CodeGraphEnv::Git
237        } else {
238            CodeGraphEnv::NonGit
239        };
240        
241        log::info!(
242            "CodeGraph: environment detected as {} for: {}",
243            match env_type {
244                CodeGraphEnv::Git => "Git repository",
245                CodeGraphEnv::NonGit => "non-Git directory",
246            },
247            project_path.display()
248        );
249
250        // Check version consistency before starting
251        if env_type == CodeGraphEnv::Git && has_version_changed(&project_path) {
252            log::info!("CodeGraph: version changed, performing sync before starting watcher");
253            if let Err(e) = manager.sync().await {
254                log::warn!("CodeGraph version sync failed: {}", e);
255            }
256            update_version_after_sync(&project_path);
257        }
258
259        // Initial sync on startup
260        log::info!("CodeGraph: performing initial sync on startup");
261        if let Err(e) = manager.sync().await {
262            log::warn!("CodeGraph initial sync failed: {}", e);
263        }
264        update_version_after_sync(&project_path);
265
266        // Channel for file change events
267        let (change_tx, mut change_rx) = mpsc::channel::<PathBuf>(100);
268
269        // Create notify file watcher
270        let watcher_result = Self::create_file_watcher(&project_path, change_tx.clone());
271        if watcher_result.is_err() {
272            log::warn!("CodeGraph notify watcher failed to start: {}", watcher_result.err().unwrap());
273            release_watcher_lock(&project_path);
274            return;
275        }
276        let _watcher = watcher_result.unwrap();
277
278        // Load ignore matcher
279        let ignore_matcher = IgnoreMatcher::load(&project_path);
280
281        // Track sync state
282        let syncing = Arc::new(AtomicBool::new(false));
283        let syncing_clone = syncing.clone();
284        let changed_files = Arc::new(RwLock::new(std::collections::HashSet::<PathBuf>::new()));
285        let last_change = Arc::new(std::sync::Mutex::new(Instant::now()));
286
287        // Debounce settings
288        let debounce_delay = Duration::from_secs(CODEGRAPH_SYNC_INTERVAL_SECS);
289        let git_poll_interval = Duration::from_secs(GIT_STATUS_POLL_INTERVAL_SECS);
290
291        // Start Git monitoring if in Git environment
292        let git_monitoring = if env_type == CodeGraphEnv::Git {
293            if start_git_fsmonitor(&project_path) {
294                log::info!("CodeGraph: Git fsmonitor daemon started");
295                true
296            } else if is_git_fsmonitor_running(&project_path) {
297                log::info!("CodeGraph: Git fsmonitor daemon already running");
298                true
299            } else {
300                log::info!("CodeGraph: Git fsmonitor not available, using git status polling");
301                false
302            }
303        } else {
304            false
305        };
306
307        log::info!(
308            "CodeGraph watcher started (Git monitoring: {}, notify fallback: always)",
309            git_monitoring
310        );
311
312        let check_interval = Duration::from_secs(1);
313
314        loop {
315            if cancel_token.is_cancelled() {
316                // Final sync before exit
317                let pending_count = changed_files.read().await.len();
318                if pending_count > 0 {
319                    log::info!(
320                        "CodeGraph: final sync before exit ({} unique files)",
321                        pending_count
322                    );
323                    let manager = CodeGraphManager::new(&project_path);
324                    if manager.is_initialized() {
325                        let _ = manager.sync().await;
326                        update_version_after_sync(&project_path);
327                    }
328                }
329                release_watcher_lock(&project_path);
330                log::info!("CodeGraph watcher stopped");
331                break;
332            }
333
334            // Update heartbeat
335            update_watcher_heartbeat(&project_path);
336
337            tokio::select! {
338                // Notify file changes
339                Some(path) = change_rx.recv() => {
340                    if cancel_token.is_cancelled() {
341                        break;
342                    }
343                    if is_source_file(&path)
344                        && !ignore_matcher.should_ignore(&path, &project_path) {
345                        {
346                            let mut files = changed_files.write().await;
347                            if files.insert(path.clone()) {
348                                *last_change.lock().unwrap() = Instant::now();
349                                log::debug!(
350                                    "CodeGraph [notify]: new file {} (total unique: {})",
351                                    path.display(),
352                                    files.len()
353                                );
354                            }
355                        }
356                    }
357                }
358
359                // Git status polling
360                _ = sleep(git_poll_interval), if git_monitoring => {
361                    if cancel_token.is_cancelled() {
362                        break;
363                    }
364                    let changes = get_git_status_changes(&project_path);
365                    if changes.has_changes() {
366                        let mut new_count = 0;
367                        {
368                            let mut files = changed_files.write().await;
369                            for path in changes.modified.iter().chain(&changes.added).chain(&changes.deleted) {
370                                if files.insert(path.clone()) {
371                                    new_count += 1;
372                                }
373                            }
374                            if new_count > 0 {
375                                log::debug!(
376                                    "CodeGraph [git]: {} new changes (total unique: {})",
377                                    new_count,
378                                    files.len()
379                                );
380                            }
381                        }
382                        if new_count > 0 {
383                            *last_change.lock().unwrap() = Instant::now();
384                        }
385                    }
386                }
387
388                // Periodic sync check
389                _ = sleep(check_interval) => {
390                    if cancel_token.is_cancelled() {
391                        break;
392                    }
393
394                    let files_count = changed_files.read().await.len();
395                    let elapsed = last_change.lock().unwrap().elapsed();
396
397                    if !syncing_clone.load(Ordering::SeqCst)
398                        && files_count > 0
399                        && elapsed >= debounce_delay {
400                        syncing_clone.store(true, Ordering::SeqCst);
401                        log::info!("CodeGraph: auto-sync triggered ({} unique files changed)", files_count);
402
403                        // Check if MCP daemon is active before syncing
404                        if check_mcp_daemon_active(&project_path) {
405                            log::info!("CodeGraph: MCP daemon active, skipping our sync to avoid conflict");
406                            syncing_clone.store(false, Ordering::SeqCst);
407                        } else {
408                            let our_timestamp = try_acquire_sync_lock(&project_path);
409                            if our_timestamp > 0 {
410                                let manager = CodeGraphManager::new(&project_path);
411                                if manager.is_initialized() {
412                                    if let Err(e) = manager.sync().await {
413                                        log::warn!("CodeGraph sync failed: {}", e);
414                                    } else {
415                                        // Check if lock still belongs to us before updating
416                                        if check_sync_lock_owner(&project_path, our_timestamp) {
417                                            update_version_after_sync(&project_path);
418                                            changed_files.write().await.clear();
419                                            log::debug!("CodeGraph: sync completed, lock verified");
420                                        } else {
421                                            // Lock was stolen by another process, abandon this sync
422                                            log::info!("CodeGraph: sync abandoned, another process took over");
423                                            // Don't clear changed_files, let next sync handle them
424                                        }
425                                    }
426                                }
427                                release_sync_lock(&project_path);
428                            } else {
429                                log::debug!("CodeGraph: skipping sync, another instance is syncing");
430                            }
431                            syncing_clone.store(false, Ordering::SeqCst);
432                        }
433                    }
434                }
435            }
436        }
437    }
438
439    /// Create the underlying file watcher with optimized config.
440    fn create_file_watcher(
441        project_path: &Path,
442        change_tx: mpsc::Sender<PathBuf>,
443    ) -> Result<RecommendedWatcher> {
444        let tx = change_tx.clone();
445
446        let handler = move |event: Result<Event, notify::Error>| {
447            if let Ok(event) = event {
448                if !event.kind.is_access() && !event.kind.is_other() {
449                    for path in event.paths {
450                        let _ = tx.try_send(path);
451                    }
452                }
453            }
454        };
455
456        let config = Config::default()
457            .with_poll_interval(Duration::from_secs(2))
458            .with_compare_contents(false);
459
460        let mut watcher = RecommendedWatcher::new(handler, config)?;
461        watcher.watch(project_path, RecursiveMode::Recursive)?;
462
463        Ok(watcher)
464    }
465}