Skip to main content

matrixcode_core/tools/codegraph/
watcher.rs

1//! File watcher for automatic index synchronization.
2
3use anyhow::Result;
4use notify::{Config, Event, RecommendedWatcher, RecursiveMode, Watcher};
5use std::path::{Path, PathBuf};
6use std::sync::Arc;
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::time::{Duration, Instant};
9use tokio::sync::{broadcast, mpsc, RwLock};
10use tokio::time::sleep;
11
12use super::manager::CodeGraphManager;
13use super::git::{
14    is_git_repository, get_git_status_changes, is_git_fsmonitor_running, start_git_fsmonitor,
15    has_version_changed, update_version_after_sync,
16    try_acquire_watcher_lock, release_watcher_lock,
17    try_acquire_sync_lock, release_sync_lock, check_sync_lock_owner,
18    update_watcher_heartbeat,
19    is_source_file,
20};
21use super::ignore::IgnoreMatcher;
22use super::install::get_codegraph_path;
23use super::project::find_project_root;
24use super::types::CodeGraphEnv;
25use crate::constants::CODEGRAPH_SYNC_INTERVAL_SECS;
26use crate::memory::ProjectStructureAnalyzer;
27use crate::cancel::CancellationToken;
28
/// Interval, in seconds, between `git status` polls in the watcher loop.
///
/// NOTE(review): originally described as the "non-fsmonitor fallback", but the
/// watcher loop only enables its polling branch when `git_monitoring` is true,
/// i.e. when the Git fsmonitor daemon was started or is already running.
/// Confirm which of the two is intended.
const GIT_STATUS_POLL_INTERVAL_SECS: u64 = 2;
31
32/// CodeGraph file watcher for auto-sync.
33pub struct CodeGraphWatcher {
34    project_path: PathBuf,
35    stop_tx: broadcast::Sender<()>,
36    sync_interval: Duration,
37}
38
39impl CodeGraphWatcher {
40    /// Create a new watcher for the project.
41    pub fn new(project_path: &Path) -> Self {
42        let (stop_tx, _) = broadcast::channel(1);
43        Self {
44            project_path: project_path.to_path_buf(),
45            stop_tx,
46            sync_interval: Duration::from_secs(CODEGRAPH_SYNC_INTERVAL_SECS),
47        }
48    }
49
50    /// Create watcher with automatic project root detection.
51    pub fn with_auto_detect(start_path: &Path) -> Self {
52        let project_path = find_project_root(start_path);
53        log::info!("CodeGraph: detected project root at {}", project_path.display());
54        Self::new(&project_path)
55    }
56
57    /// Start watching for file changes.
58    pub fn start(&self, cancel_token: CancellationToken) -> tokio::task::JoinHandle<()> {
59        let project_path = self.project_path.clone();
60        let sync_interval = self.sync_interval;
61
62        tokio::spawn(async move {
63            Self::run_watcher_loop(project_path, sync_interval, cancel_token).await;
64        })
65    }
66
67    /// Stop the watcher via broadcast signal.
68    pub fn stop(&self) {
69        let _ = self.stop_tx.send(());
70    }
71
72    /// Run the watcher loop with dual-path monitoring.
73    async fn run_watcher_loop(
74        project_path: PathBuf,
75        _sync_interval: Duration,
76        cancel_token: CancellationToken,
77    ) {
78        // Check if CodeGraph CLI is available (no auto-install)
79        if get_codegraph_path().is_none() {
80            log::warn!("CodeGraph CLI not found, watcher disabled. Please install CodeGraph manually.");
81            return;
82        }
83
84        // Try to acquire watcher lock (prevent multiple instances)
85        if !try_acquire_watcher_lock(&project_path) {
86            log::info!("CodeGraph: another instance is watching this project, exiting");
87            return;
88        }
89
90        // Check if this is a code project
91        let analyzer = ProjectStructureAnalyzer::new(project_path.clone());
92        if analyzer.detect_project_type().is_none() {
93            log::info!(
94                "CodeGraph: skipping non-code directory: {}",
95                project_path.display()
96            );
97            return;
98        }
99
100        // Check if CodeGraph is initialized - DO NOT auto-initialize
101        let manager = CodeGraphManager::new(&project_path);
102        if !manager.is_initialized() {
103            log::info!(
104                "CodeGraph: not initialized for {}, skipping watcher. Run 'codegraph init -i' to create index.",
105                project_path.display()
106            );
107            release_watcher_lock(&project_path);
108            return;
109        }
110
111        // Detect environment type
112        let env_type = if is_git_repository(&project_path) {
113            CodeGraphEnv::Git
114        } else {
115            CodeGraphEnv::NonGit
116        };
117        
118        log::info!(
119            "CodeGraph: environment detected as {} for: {}",
120            match env_type {
121                CodeGraphEnv::Git => "Git repository",
122                CodeGraphEnv::NonGit => "non-Git directory",
123            },
124            project_path.display()
125        );
126
127        // Check version consistency before starting
128        if env_type == CodeGraphEnv::Git && has_version_changed(&project_path) {
129            log::info!("CodeGraph: version changed, performing sync before starting watcher");
130            if let Err(e) = manager.sync().await {
131                log::warn!("CodeGraph version sync failed: {}", e);
132            }
133            update_version_after_sync(&project_path);
134        }
135
136        // Initial sync on startup
137        log::info!("CodeGraph: performing initial sync on startup");
138        if let Err(e) = manager.sync().await {
139            log::warn!("CodeGraph initial sync failed: {}", e);
140        }
141        update_version_after_sync(&project_path);
142
143        // Channel for file change events
144        let (change_tx, mut change_rx) = mpsc::channel::<PathBuf>(100);
145
146        // Create notify file watcher
147        let watcher_result = Self::create_file_watcher(&project_path, change_tx.clone());
148        if watcher_result.is_err() {
149            log::warn!("CodeGraph notify watcher failed to start: {}", watcher_result.err().unwrap());
150            release_watcher_lock(&project_path);
151            return;
152        }
153        let _watcher = watcher_result.unwrap();
154
155        // Load ignore matcher
156        let ignore_matcher = IgnoreMatcher::load(&project_path);
157
158        // Track sync state
159        let syncing = Arc::new(AtomicBool::new(false));
160        let syncing_clone = syncing.clone();
161        let changed_files = Arc::new(RwLock::new(std::collections::HashSet::<PathBuf>::new()));
162        let last_change = Arc::new(std::sync::Mutex::new(Instant::now()));
163
164        // Debounce settings
165        let debounce_delay = Duration::from_secs(CODEGRAPH_SYNC_INTERVAL_SECS);
166        let git_poll_interval = Duration::from_secs(GIT_STATUS_POLL_INTERVAL_SECS);
167
168        // Start Git monitoring if in Git environment
169        let git_monitoring = if env_type == CodeGraphEnv::Git {
170            if start_git_fsmonitor(&project_path) {
171                log::info!("CodeGraph: Git fsmonitor daemon started");
172                true
173            } else if is_git_fsmonitor_running(&project_path) {
174                log::info!("CodeGraph: Git fsmonitor daemon already running");
175                true
176            } else {
177                log::info!("CodeGraph: Git fsmonitor not available, using git status polling");
178                false
179            }
180        } else {
181            false
182        };
183
184        log::info!(
185            "CodeGraph watcher started (Git monitoring: {}, notify fallback: always)",
186            git_monitoring
187        );
188
189        let check_interval = Duration::from_secs(1);
190
191        loop {
192            if cancel_token.is_cancelled() {
193                // Final sync before exit
194                let pending_count = changed_files.read().await.len();
195                if pending_count > 0 {
196                    log::info!(
197                        "CodeGraph: final sync before exit ({} unique files)",
198                        pending_count
199                    );
200                    let manager = CodeGraphManager::new(&project_path);
201                    if manager.is_initialized() {
202                        let _ = manager.sync().await;
203                        update_version_after_sync(&project_path);
204                    }
205                }
206                release_watcher_lock(&project_path);
207                log::info!("CodeGraph watcher stopped");
208                break;
209            }
210
211            // Update heartbeat
212            update_watcher_heartbeat(&project_path);
213
214            tokio::select! {
215                // Notify file changes
216                Some(path) = change_rx.recv() => {
217                    if cancel_token.is_cancelled() {
218                        break;
219                    }
220                    if is_source_file(&path)
221                        && !ignore_matcher.should_ignore(&path, &project_path) {
222                        {
223                            let mut files = changed_files.write().await;
224                            if files.insert(path.clone()) {
225                                *last_change.lock().unwrap() = Instant::now();
226                                log::debug!(
227                                    "CodeGraph [notify]: new file {} (total unique: {})",
228                                    path.display(),
229                                    files.len()
230                                );
231                            }
232                        }
233                    }
234                }
235
236                // Git status polling
237                _ = sleep(git_poll_interval), if git_monitoring => {
238                    if cancel_token.is_cancelled() {
239                        break;
240                    }
241                    let changes = get_git_status_changes(&project_path);
242                    if changes.has_changes() {
243                        let mut new_count = 0;
244                        {
245                            let mut files = changed_files.write().await;
246                            for path in changes.modified.iter().chain(&changes.added).chain(&changes.deleted) {
247                                if files.insert(path.clone()) {
248                                    new_count += 1;
249                                }
250                            }
251                            if new_count > 0 {
252                                log::debug!(
253                                    "CodeGraph [git]: {} new changes (total unique: {})",
254                                    new_count,
255                                    files.len()
256                                );
257                            }
258                        }
259                        if new_count > 0 {
260                            *last_change.lock().unwrap() = Instant::now();
261                        }
262                    }
263                }
264
265                // Periodic sync check
266                _ = sleep(check_interval) => {
267                    if cancel_token.is_cancelled() {
268                        break;
269                    }
270
271                    let files_count = changed_files.read().await.len();
272                    let elapsed = last_change.lock().unwrap().elapsed();
273
274                    if !syncing_clone.load(Ordering::SeqCst)
275                        && files_count > 0
276                        && elapsed >= debounce_delay {
277                        syncing_clone.store(true, Ordering::SeqCst);
278                        log::info!("CodeGraph: auto-sync triggered ({} unique files changed)", files_count);
279
280                        let our_timestamp = try_acquire_sync_lock(&project_path);
281                        if our_timestamp > 0 {
282                            let manager = CodeGraphManager::new(&project_path);
283                            if manager.is_initialized() {
284                                if let Err(e) = manager.sync().await {
285                                    log::warn!("CodeGraph sync failed: {}", e);
286                                } else {
287                                    // Check if lock still belongs to us before updating
288                                    if check_sync_lock_owner(&project_path, our_timestamp) {
289                                        update_version_after_sync(&project_path);
290                                        changed_files.write().await.clear();
291                                        log::debug!("CodeGraph: sync completed, lock verified");
292                                    } else {
293                                        // Lock was stolen by another process, abandon this sync
294                                        log::info!("CodeGraph: sync abandoned, another process took over");
295                                        // Don't clear changed_files, let next sync handle them
296                                    }
297                                }
298                            }
299                            release_sync_lock(&project_path);
300                        } else {
301                            log::debug!("CodeGraph: skipping sync, another instance is syncing");
302                        }
303                        syncing_clone.store(false, Ordering::SeqCst);
304                    }
305                }
306            }
307        }
308    }
309
310    /// Create the underlying file watcher with optimized config.
311    fn create_file_watcher(
312        project_path: &Path,
313        change_tx: mpsc::Sender<PathBuf>,
314    ) -> Result<RecommendedWatcher> {
315        let tx = change_tx.clone();
316
317        let handler = move |event: Result<Event, notify::Error>| {
318            if let Ok(event) = event {
319                if !event.kind.is_access() && !event.kind.is_other() {
320                    for path in event.paths {
321                        let _ = tx.try_send(path);
322                    }
323                }
324            }
325        };
326
327        let config = Config::default()
328            .with_poll_interval(Duration::from_secs(2))
329            .with_compare_contents(false);
330
331        let mut watcher = RecommendedWatcher::new(handler, config)?;
332        watcher.watch(project_path, RecursiveMode::Recursive)?;
333
334        Ok(watcher)
335    }
336}