matrixcode_core/tools/codegraph/
watcher.rs1use anyhow::Result;
4use notify::{Config, Event, RecommendedWatcher, RecursiveMode, Watcher};
5use std::path::{Path, PathBuf};
6use std::sync::Arc;
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::Mutex;
9use std::time::{Duration, Instant};
10use tokio::sync::{broadcast, mpsc, RwLock};
11use tokio::time::sleep;
12
13use super::manager::CodeGraphManager;
14use super::git::{
15 is_git_repository, get_git_status_changes, is_git_fsmonitor_running, start_git_fsmonitor,
16 has_version_changed, update_version_after_sync,
17 try_acquire_watcher_lock, release_watcher_lock,
18 try_acquire_sync_lock, release_sync_lock, check_sync_lock_owner,
19 update_watcher_heartbeat,
20 is_source_file,
21 check_mcp_daemon_active,
22};
23use super::ignore::IgnoreMatcher;
24use super::install::get_codegraph_path;
25use super::project::find_project_root;
26use super::types::CodeGraphEnv;
27use crate::constants::CODEGRAPH_SYNC_INTERVAL_SECS;
28use crate::memory::ProjectStructureAnalyzer;
29use crate::cancel::CancellationToken;
30
/// Seconds between `git status` polls performed by the watcher loop while
/// Git-based change monitoring is enabled.
const GIT_STATUS_POLL_INTERVAL_SECS: u64 = 2;
33
34#[derive(Clone)]
38pub struct WatcherHandle {
39 handle: Arc<Mutex<Option<tokio::task::JoinHandle<()>>>>,
40 project_path: PathBuf,
41}
42
43impl WatcherHandle {
44 pub fn new(project_path: &Path) -> Self {
46 Self {
47 handle: Arc::new(Mutex::new(None)),
48 project_path: project_path.to_path_buf(),
49 }
50 }
51
52 pub fn with_auto_detect(start_path: &Path) -> Self {
54 let project_path = find_project_root(start_path);
55 log::info!("CodeGraph: detected project root at {}", project_path.display());
56 Self::new(&project_path)
57 }
58
59 pub fn is_running(&self) -> bool {
61 let guard = self.handle.lock().unwrap();
62 guard.as_ref().map(|h| !h.is_finished()).unwrap_or(false)
63 }
64
65 pub fn start_if_needed(&self, cancel_token: CancellationToken) -> bool {
68 if self.is_running() {
69 log::info!("CodeGraph watcher already running");
70 return false;
71 }
72
73 if CodeGraphWatcher::is_daemon_running(&self.project_path) {
74 log::info!("CodeGraph MCP daemon detected, skipping watcher to avoid conflict");
75 return false;
76 }
77
78 let watcher = CodeGraphWatcher::new(&self.project_path);
79 let handle = watcher.start(cancel_token);
80 log::info!("CodeGraph watcher started (no MCP daemon detected)");
81
82 *self.handle.lock().unwrap() = Some(handle);
83 true
84 }
85
86 pub fn stop(&self) {
88 let guard = self.handle.lock().unwrap();
89 if let Some(ref h) = *guard
90 && !h.is_finished() {
91 log::info!("Aborting CodeGraph watcher...");
92 h.abort();
93 }
94 }
95
96 pub fn inner(&self) -> Arc<Mutex<Option<tokio::task::JoinHandle<()>>>> {
98 self.handle.clone()
99 }
100
101 pub fn project_path(&self) -> &Path {
103 &self.project_path
104 }
105}
106
107pub struct CodeGraphWatcher {
109 project_path: PathBuf,
110 stop_tx: broadcast::Sender<()>,
111 sync_interval: Duration,
112}
113
114impl CodeGraphWatcher {
115 pub fn is_daemon_running(project_path: &Path) -> bool {
118 let daemon_pid_path = project_path.join(".codegraph").join("daemon.pid");
120 if daemon_pid_path.exists() {
121 let pid_running = std::fs::read_to_string(&daemon_pid_path)
122 .ok()
123 .and_then(|pid| pid.trim().parse::<u32>().ok())
124 .map(|pid| {
125 #[cfg(target_os = "windows")]
126 {
127 use std::os::windows::process::CommandExt;
128 const CREATE_NO_WINDOW: u32 = 0x08000000;
129 std::process::Command::new("tasklist")
130 .args(["/FI", &format!("PID eq {}", pid)])
131 .creation_flags(CREATE_NO_WINDOW)
132 .output()
133 .map(|o| String::from_utf8_lossy(&o.stdout).contains(&pid.to_string()))
134 .unwrap_or(false)
135 }
136 #[cfg(not(target_os = "windows"))]
137 std::path::Path::new("/proc").join(pid.to_string()).exists()
138 })
139 .unwrap_or(false);
140 if pid_running {
141 return true;
142 }
143 }
144
145 let daemon_log_path = project_path.join(".codegraph").join("daemon.log");
147 if daemon_log_path.exists() {
148 if let Ok(metadata) = std::fs::metadata(&daemon_log_path) {
149 if let Ok(modified) = metadata.modified() {
150 let now = std::time::SystemTime::now();
151 let elapsed = now.duration_since(modified).unwrap_or(std::time::Duration::MAX);
152 if elapsed < std::time::Duration::from_secs(60) {
153 log::info!("CodeGraph: daemon.log recently modified, daemon likely active");
154 return true;
155 }
156 }
157 }
158 }
159
160 false
161 }
162
163 pub fn new(project_path: &Path) -> Self {
165 let (stop_tx, _) = broadcast::channel(1);
166 Self {
167 project_path: project_path.to_path_buf(),
168 stop_tx,
169 sync_interval: Duration::from_secs(CODEGRAPH_SYNC_INTERVAL_SECS),
170 }
171 }
172
173 pub fn with_auto_detect(start_path: &Path) -> Self {
175 let project_path = find_project_root(start_path);
176 log::info!("CodeGraph: detected project root at {}", project_path.display());
177 Self::new(&project_path)
178 }
179
180 pub fn start(&self, cancel_token: CancellationToken) -> tokio::task::JoinHandle<()> {
182 let project_path = self.project_path.clone();
183 let sync_interval = self.sync_interval;
184
185 tokio::spawn(async move {
186 Self::run_watcher_loop(project_path, sync_interval, cancel_token).await;
187 })
188 }
189
190 pub fn stop(&self) {
192 let _ = self.stop_tx.send(());
193 }
194
195 async fn run_watcher_loop(
197 project_path: PathBuf,
198 _sync_interval: Duration,
199 cancel_token: CancellationToken,
200 ) {
201 if get_codegraph_path().is_none() {
203 log::warn!("CodeGraph CLI not found, watcher disabled. Please install CodeGraph manually.");
204 return;
205 }
206
207 if !try_acquire_watcher_lock(&project_path) {
209 log::info!("CodeGraph: another instance is watching this project, exiting");
210 return;
211 }
212
213 let analyzer = ProjectStructureAnalyzer::new(project_path.clone());
215 if analyzer.detect_project_type().is_none() {
216 log::info!(
217 "CodeGraph: skipping non-code directory: {}",
218 project_path.display()
219 );
220 return;
221 }
222
223 let manager = CodeGraphManager::new(&project_path);
225 if !manager.is_initialized() {
226 log::info!(
227 "CodeGraph: not initialized for {}, skipping watcher. Run 'codegraph init -i' to create index.",
228 project_path.display()
229 );
230 release_watcher_lock(&project_path);
231 return;
232 }
233
234 let env_type = if is_git_repository(&project_path) {
236 CodeGraphEnv::Git
237 } else {
238 CodeGraphEnv::NonGit
239 };
240
241 log::info!(
242 "CodeGraph: environment detected as {} for: {}",
243 match env_type {
244 CodeGraphEnv::Git => "Git repository",
245 CodeGraphEnv::NonGit => "non-Git directory",
246 },
247 project_path.display()
248 );
249
250 if env_type == CodeGraphEnv::Git && has_version_changed(&project_path) {
252 log::info!("CodeGraph: version changed, performing sync before starting watcher");
253 if let Err(e) = manager.sync().await {
254 log::warn!("CodeGraph version sync failed: {}", e);
255 }
256 update_version_after_sync(&project_path);
257 }
258
259 log::info!("CodeGraph: performing initial sync on startup");
261 if let Err(e) = manager.sync().await {
262 log::warn!("CodeGraph initial sync failed: {}", e);
263 }
264 update_version_after_sync(&project_path);
265
266 let (change_tx, mut change_rx) = mpsc::channel::<PathBuf>(100);
268
269 let watcher_result = Self::create_file_watcher(&project_path, change_tx.clone());
271 if watcher_result.is_err() {
272 log::warn!("CodeGraph notify watcher failed to start: {}", watcher_result.err().unwrap());
273 release_watcher_lock(&project_path);
274 return;
275 }
276 let _watcher = watcher_result.unwrap();
277
278 let ignore_matcher = IgnoreMatcher::load(&project_path);
280
281 let syncing = Arc::new(AtomicBool::new(false));
283 let syncing_clone = syncing.clone();
284 let changed_files = Arc::new(RwLock::new(std::collections::HashSet::<PathBuf>::new()));
285 let last_change = Arc::new(std::sync::Mutex::new(Instant::now()));
286
287 let debounce_delay = Duration::from_secs(CODEGRAPH_SYNC_INTERVAL_SECS);
289 let git_poll_interval = Duration::from_secs(GIT_STATUS_POLL_INTERVAL_SECS);
290
291 let git_monitoring = if env_type == CodeGraphEnv::Git {
293 if start_git_fsmonitor(&project_path) {
294 log::info!("CodeGraph: Git fsmonitor daemon started");
295 true
296 } else if is_git_fsmonitor_running(&project_path) {
297 log::info!("CodeGraph: Git fsmonitor daemon already running");
298 true
299 } else {
300 log::info!("CodeGraph: Git fsmonitor not available, using git status polling");
301 false
302 }
303 } else {
304 false
305 };
306
307 log::info!(
308 "CodeGraph watcher started (Git monitoring: {}, notify fallback: always)",
309 git_monitoring
310 );
311
312 let check_interval = Duration::from_secs(1);
313
314 loop {
315 if cancel_token.is_cancelled() {
316 let pending_count = changed_files.read().await.len();
318 if pending_count > 0 {
319 log::info!(
320 "CodeGraph: final sync before exit ({} unique files)",
321 pending_count
322 );
323 let manager = CodeGraphManager::new(&project_path);
324 if manager.is_initialized() {
325 let _ = manager.sync().await;
326 update_version_after_sync(&project_path);
327 }
328 }
329 release_watcher_lock(&project_path);
330 log::info!("CodeGraph watcher stopped");
331 break;
332 }
333
334 update_watcher_heartbeat(&project_path);
336
337 tokio::select! {
338 Some(path) = change_rx.recv() => {
340 if cancel_token.is_cancelled() {
341 break;
342 }
343 if is_source_file(&path)
344 && !ignore_matcher.should_ignore(&path, &project_path) {
345 {
346 let mut files = changed_files.write().await;
347 if files.insert(path.clone()) {
348 *last_change.lock().unwrap() = Instant::now();
349 log::debug!(
350 "CodeGraph [notify]: new file {} (total unique: {})",
351 path.display(),
352 files.len()
353 );
354 }
355 }
356 }
357 }
358
359 _ = sleep(git_poll_interval), if git_monitoring => {
361 if cancel_token.is_cancelled() {
362 break;
363 }
364 let changes = get_git_status_changes(&project_path);
365 if changes.has_changes() {
366 let mut new_count = 0;
367 {
368 let mut files = changed_files.write().await;
369 for path in changes.modified.iter().chain(&changes.added).chain(&changes.deleted) {
370 if files.insert(path.clone()) {
371 new_count += 1;
372 }
373 }
374 if new_count > 0 {
375 log::debug!(
376 "CodeGraph [git]: {} new changes (total unique: {})",
377 new_count,
378 files.len()
379 );
380 }
381 }
382 if new_count > 0 {
383 *last_change.lock().unwrap() = Instant::now();
384 }
385 }
386 }
387
388 _ = sleep(check_interval) => {
390 if cancel_token.is_cancelled() {
391 break;
392 }
393
394 let files_count = changed_files.read().await.len();
395 let elapsed = last_change.lock().unwrap().elapsed();
396
397 if !syncing_clone.load(Ordering::SeqCst)
398 && files_count > 0
399 && elapsed >= debounce_delay {
400 syncing_clone.store(true, Ordering::SeqCst);
401 log::info!("CodeGraph: auto-sync triggered ({} unique files changed)", files_count);
402
403 if check_mcp_daemon_active(&project_path) {
405 log::info!("CodeGraph: MCP daemon active, skipping our sync to avoid conflict");
406 syncing_clone.store(false, Ordering::SeqCst);
407 } else {
408 let our_timestamp = try_acquire_sync_lock(&project_path);
409 if our_timestamp > 0 {
410 let manager = CodeGraphManager::new(&project_path);
411 if manager.is_initialized() {
412 if let Err(e) = manager.sync().await {
413 log::warn!("CodeGraph sync failed: {}", e);
414 } else {
415 if check_sync_lock_owner(&project_path, our_timestamp) {
417 update_version_after_sync(&project_path);
418 changed_files.write().await.clear();
419 log::debug!("CodeGraph: sync completed, lock verified");
420 } else {
421 log::info!("CodeGraph: sync abandoned, another process took over");
423 }
425 }
426 }
427 release_sync_lock(&project_path);
428 } else {
429 log::debug!("CodeGraph: skipping sync, another instance is syncing");
430 }
431 syncing_clone.store(false, Ordering::SeqCst);
432 }
433 }
434 }
435 }
436 }
437 }
438
439 fn create_file_watcher(
441 project_path: &Path,
442 change_tx: mpsc::Sender<PathBuf>,
443 ) -> Result<RecommendedWatcher> {
444 let tx = change_tx.clone();
445
446 let handler = move |event: Result<Event, notify::Error>| {
447 if let Ok(event) = event {
448 if !event.kind.is_access() && !event.kind.is_other() {
449 for path in event.paths {
450 let _ = tx.try_send(path);
451 }
452 }
453 }
454 };
455
456 let config = Config::default()
457 .with_poll_interval(Duration::from_secs(2))
458 .with_compare_contents(false);
459
460 let mut watcher = RecommendedWatcher::new(handler, config)?;
461 watcher.watch(project_path, RecursiveMode::Recursive)?;
462
463 Ok(watcher)
464 }
465}