matrixcode_core/tools/codegraph/
watcher.rs1use anyhow::Result;
4use notify::{Config, Event, RecommendedWatcher, RecursiveMode, Watcher};
5use std::path::{Path, PathBuf};
6use std::sync::Arc;
7use std::sync::Mutex;
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::time::{Duration, Instant};
10use tokio::sync::{RwLock, broadcast, mpsc};
11use tokio::time::sleep;
12
13use super::git::{
14 check_mcp_daemon_active, check_sync_lock_owner, get_git_status_changes, has_version_changed,
15 is_git_fsmonitor_running, is_git_repository, is_source_file, release_sync_lock,
16 release_watcher_lock, start_git_fsmonitor, try_acquire_sync_lock, try_acquire_watcher_lock,
17 update_version_after_sync, update_watcher_heartbeat,
18};
19use super::ignore::IgnoreMatcher;
20use super::install::get_codegraph_path;
21use super::manager::CodeGraphManager;
22use super::project::find_project_root;
23use super::types::CodeGraphEnv;
24use crate::cancel::CancellationToken;
25use crate::constants::CODEGRAPH_SYNC_INTERVAL_SECS;
26use crate::memory::ProjectStructureAnalyzer;
27
28const GIT_STATUS_POLL_INTERVAL_SECS: u64 = 2;
30
31#[derive(Clone)]
35pub struct WatcherHandle {
36 handle: Arc<Mutex<Option<tokio::task::JoinHandle<()>>>>,
37 project_path: PathBuf,
38}
39
40impl WatcherHandle {
41 pub fn new(project_path: &Path) -> Self {
43 Self {
44 handle: Arc::new(Mutex::new(None)),
45 project_path: project_path.to_path_buf(),
46 }
47 }
48
49 pub fn with_auto_detect(start_path: &Path) -> Self {
51 let project_path = find_project_root(start_path);
52 log::info!(
53 "CodeGraph: detected project root at {}",
54 project_path.display()
55 );
56 Self::new(&project_path)
57 }
58
59 pub fn is_running(&self) -> bool {
61 let guard = self.handle.lock().unwrap();
62 guard.as_ref().map(|h| !h.is_finished()).unwrap_or(false)
63 }
64
65 pub fn start_if_needed(&self, cancel_token: CancellationToken) -> bool {
68 if self.is_running() {
69 log::info!("CodeGraph watcher already running");
70 return false;
71 }
72
73 if CodeGraphWatcher::is_daemon_running(&self.project_path) {
74 log::info!("CodeGraph MCP daemon detected, skipping watcher to avoid conflict");
75 return false;
76 }
77
78 let watcher = CodeGraphWatcher::new(&self.project_path);
79 let handle = watcher.start(cancel_token);
80 log::info!("CodeGraph watcher started (no MCP daemon detected)");
81
82 *self.handle.lock().unwrap() = Some(handle);
83 true
84 }
85
86 pub fn stop(&self) {
88 let guard = self.handle.lock().unwrap();
89 if let Some(ref h) = *guard
90 && !h.is_finished()
91 {
92 log::info!("Aborting CodeGraph watcher...");
93 h.abort();
94 }
95 }
96
97 pub fn inner(&self) -> Arc<Mutex<Option<tokio::task::JoinHandle<()>>>> {
99 self.handle.clone()
100 }
101
102 pub fn project_path(&self) -> &Path {
104 &self.project_path
105 }
106}
107
108pub struct CodeGraphWatcher {
110 project_path: PathBuf,
111 stop_tx: broadcast::Sender<()>,
112 sync_interval: Duration,
113}
114
115impl CodeGraphWatcher {
116 pub fn is_daemon_running(project_path: &Path) -> bool {
119 let daemon_pid_path = project_path.join(".codegraph").join("daemon.pid");
121 if daemon_pid_path.exists() {
122 let pid_running = std::fs::read_to_string(&daemon_pid_path)
123 .ok()
124 .and_then(|pid| pid.trim().parse::<u32>().ok())
125 .map(|pid| {
126 #[cfg(target_os = "windows")]
127 {
128 use std::os::windows::process::CommandExt;
129 const CREATE_NO_WINDOW: u32 = 0x08000000;
130 std::process::Command::new("tasklist")
131 .args(["/FI", &format!("PID eq {}", pid)])
132 .creation_flags(CREATE_NO_WINDOW)
133 .output()
134 .map(|o| String::from_utf8_lossy(&o.stdout).contains(&pid.to_string()))
135 .unwrap_or(false)
136 }
137 #[cfg(not(target_os = "windows"))]
138 std::path::Path::new("/proc").join(pid.to_string()).exists()
139 })
140 .unwrap_or(false);
141 if pid_running {
142 return true;
143 }
144 }
145
146 let daemon_log_path = project_path.join(".codegraph").join("daemon.log");
148 if daemon_log_path.exists() {
149 if let Ok(metadata) = std::fs::metadata(&daemon_log_path) {
150 if let Ok(modified) = metadata.modified() {
151 let now = std::time::SystemTime::now();
152 let elapsed = now
153 .duration_since(modified)
154 .unwrap_or(std::time::Duration::MAX);
155 if elapsed < std::time::Duration::from_secs(60) {
156 log::info!("CodeGraph: daemon.log recently modified, daemon likely active");
157 return true;
158 }
159 }
160 }
161 }
162
163 false
164 }
165
166 pub fn new(project_path: &Path) -> Self {
168 let (stop_tx, _) = broadcast::channel(1);
169 Self {
170 project_path: project_path.to_path_buf(),
171 stop_tx,
172 sync_interval: Duration::from_secs(CODEGRAPH_SYNC_INTERVAL_SECS),
173 }
174 }
175
176 pub fn with_auto_detect(start_path: &Path) -> Self {
178 let project_path = find_project_root(start_path);
179 log::info!(
180 "CodeGraph: detected project root at {}",
181 project_path.display()
182 );
183 Self::new(&project_path)
184 }
185
186 pub fn start(&self, cancel_token: CancellationToken) -> tokio::task::JoinHandle<()> {
188 let project_path = self.project_path.clone();
189 let sync_interval = self.sync_interval;
190
191 tokio::spawn(async move {
192 Self::run_watcher_loop(project_path, sync_interval, cancel_token).await;
193 })
194 }
195
196 pub fn stop(&self) {
198 let _ = self.stop_tx.send(());
199 }
200
201 async fn run_watcher_loop(
203 project_path: PathBuf,
204 _sync_interval: Duration,
205 cancel_token: CancellationToken,
206 ) {
207 if get_codegraph_path().is_none() {
209 log::warn!(
210 "CodeGraph CLI not found, watcher disabled. Please install CodeGraph manually."
211 );
212 return;
213 }
214
215 if !try_acquire_watcher_lock(&project_path) {
217 log::info!("CodeGraph: another instance is watching this project, exiting");
218 return;
219 }
220
221 let analyzer = ProjectStructureAnalyzer::new(project_path.clone());
223 if analyzer.detect_project_type().is_none() {
224 log::info!(
225 "CodeGraph: skipping non-code directory: {}",
226 project_path.display()
227 );
228 return;
229 }
230
231 let manager = CodeGraphManager::new(&project_path);
233 if !manager.is_initialized() {
234 log::info!(
235 "CodeGraph: not initialized for {}, skipping watcher. Run 'codegraph init -i' to create index.",
236 project_path.display()
237 );
238 release_watcher_lock(&project_path);
239 return;
240 }
241
242 let env_type = if is_git_repository(&project_path) {
244 CodeGraphEnv::Git
245 } else {
246 CodeGraphEnv::NonGit
247 };
248
249 log::info!(
250 "CodeGraph: environment detected as {} for: {}",
251 match env_type {
252 CodeGraphEnv::Git => "Git repository",
253 CodeGraphEnv::NonGit => "non-Git directory",
254 },
255 project_path.display()
256 );
257
258 if env_type == CodeGraphEnv::Git && has_version_changed(&project_path) {
260 log::info!("CodeGraph: version changed, performing sync before starting watcher");
261 if let Err(e) = manager.sync().await {
262 log::warn!("CodeGraph version sync failed: {}", e);
263 }
264 update_version_after_sync(&project_path);
265 }
266
267 log::info!("CodeGraph: performing initial sync on startup");
269 if let Err(e) = manager.sync().await {
270 log::warn!("CodeGraph initial sync failed: {}", e);
271 }
272 update_version_after_sync(&project_path);
273
274 let (change_tx, mut change_rx) = mpsc::channel::<PathBuf>(100);
276
277 let watcher_result = Self::create_file_watcher(&project_path, change_tx.clone());
279 if watcher_result.is_err() {
280 log::warn!(
281 "CodeGraph notify watcher failed to start: {}",
282 watcher_result.err().unwrap()
283 );
284 release_watcher_lock(&project_path);
285 return;
286 }
287 let _watcher = watcher_result.unwrap();
288
289 let ignore_matcher = IgnoreMatcher::load(&project_path);
291
292 let syncing = Arc::new(AtomicBool::new(false));
294 let syncing_clone = syncing.clone();
295 let changed_files = Arc::new(RwLock::new(std::collections::HashSet::<PathBuf>::new()));
296 let last_change = Arc::new(std::sync::Mutex::new(Instant::now()));
297
298 let debounce_delay = Duration::from_secs(CODEGRAPH_SYNC_INTERVAL_SECS);
300 let git_poll_interval = Duration::from_secs(GIT_STATUS_POLL_INTERVAL_SECS);
301
302 let git_monitoring = if env_type == CodeGraphEnv::Git {
304 if start_git_fsmonitor(&project_path) {
305 log::info!("CodeGraph: Git fsmonitor daemon started");
306 true
307 } else if is_git_fsmonitor_running(&project_path) {
308 log::info!("CodeGraph: Git fsmonitor daemon already running");
309 true
310 } else {
311 log::info!("CodeGraph: Git fsmonitor not available, using git status polling");
312 false
313 }
314 } else {
315 false
316 };
317
318 log::info!(
319 "CodeGraph watcher started (Git monitoring: {}, notify fallback: always)",
320 git_monitoring
321 );
322
323 let check_interval = Duration::from_secs(1);
324
325 loop {
326 if cancel_token.is_cancelled() {
327 let pending_count = changed_files.read().await.len();
329 if pending_count > 0 {
330 log::info!(
331 "CodeGraph: final sync before exit ({} unique files)",
332 pending_count
333 );
334 let manager = CodeGraphManager::new(&project_path);
335 if manager.is_initialized() {
336 let _ = manager.sync().await;
337 update_version_after_sync(&project_path);
338 }
339 }
340 release_watcher_lock(&project_path);
341 log::info!("CodeGraph watcher stopped");
342 break;
343 }
344
345 update_watcher_heartbeat(&project_path);
347
348 tokio::select! {
349 Some(path) = change_rx.recv() => {
351 if cancel_token.is_cancelled() {
352 break;
353 }
354 if is_source_file(&path)
355 && !ignore_matcher.should_ignore(&path, &project_path) {
356 {
357 let mut files = changed_files.write().await;
358 if files.insert(path.clone()) {
359 *last_change.lock().unwrap() = Instant::now();
360 log::debug!(
361 "CodeGraph [notify]: new file {} (total unique: {})",
362 path.display(),
363 files.len()
364 );
365 }
366 }
367 }
368 }
369
370 _ = sleep(git_poll_interval), if git_monitoring => {
372 if cancel_token.is_cancelled() {
373 break;
374 }
375 let changes = get_git_status_changes(&project_path);
376 if changes.has_changes() {
377 let mut new_count = 0;
378 {
379 let mut files = changed_files.write().await;
380 for path in changes.modified.iter().chain(&changes.added).chain(&changes.deleted) {
381 if files.insert(path.clone()) {
382 new_count += 1;
383 }
384 }
385 if new_count > 0 {
386 log::debug!(
387 "CodeGraph [git]: {} new changes (total unique: {})",
388 new_count,
389 files.len()
390 );
391 }
392 }
393 if new_count > 0 {
394 *last_change.lock().unwrap() = Instant::now();
395 }
396 }
397 }
398
399 _ = sleep(check_interval) => {
401 if cancel_token.is_cancelled() {
402 break;
403 }
404
405 let files_count = changed_files.read().await.len();
406 let elapsed = last_change.lock().unwrap().elapsed();
407
408 if !syncing_clone.load(Ordering::SeqCst)
409 && files_count > 0
410 && elapsed >= debounce_delay {
411 syncing_clone.store(true, Ordering::SeqCst);
412 log::info!("CodeGraph: auto-sync triggered ({} unique files changed)", files_count);
413
414 if check_mcp_daemon_active(&project_path) {
416 log::info!("CodeGraph: MCP daemon active, skipping our sync to avoid conflict");
417 syncing_clone.store(false, Ordering::SeqCst);
418 } else {
419 let our_timestamp = try_acquire_sync_lock(&project_path);
420 if our_timestamp > 0 {
421 let manager = CodeGraphManager::new(&project_path);
422 if manager.is_initialized() {
423 if let Err(e) = manager.sync().await {
424 log::warn!("CodeGraph sync failed: {}", e);
425 } else {
426 if check_sync_lock_owner(&project_path, our_timestamp) {
428 update_version_after_sync(&project_path);
429 changed_files.write().await.clear();
430 log::debug!("CodeGraph: sync completed, lock verified");
431 } else {
432 log::info!("CodeGraph: sync abandoned, another process took over");
434 }
436 }
437 }
438 release_sync_lock(&project_path);
439 } else {
440 log::debug!("CodeGraph: skipping sync, another instance is syncing");
441 }
442 syncing_clone.store(false, Ordering::SeqCst);
443 }
444 }
445 }
446 }
447 }
448 }
449
450 fn create_file_watcher(
452 project_path: &Path,
453 change_tx: mpsc::Sender<PathBuf>,
454 ) -> Result<RecommendedWatcher> {
455 let tx = change_tx.clone();
456
457 let handler = move |event: Result<Event, notify::Error>| {
458 if let Ok(event) = event {
459 if !event.kind.is_access() && !event.kind.is_other() {
460 for path in event.paths {
461 let _ = tx.try_send(path);
462 }
463 }
464 }
465 };
466
467 let config = Config::default()
468 .with_poll_interval(Duration::from_secs(2))
469 .with_compare_contents(false);
470
471 let mut watcher = RecommendedWatcher::new(handler, config)?;
472 watcher.watch(project_path, RecursiveMode::Recursive)?;
473
474 Ok(watcher)
475 }
476}