matrixcode_core/tools/codegraph/
watcher.rs1use anyhow::Result;
4use notify::{Config, Event, RecommendedWatcher, RecursiveMode, Watcher};
5use std::path::{Path, PathBuf};
6use std::sync::Arc;
7use std::sync::Mutex;
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::time::{Duration, Instant};
10use tokio::sync::{RwLock, broadcast, mpsc};
11use tokio::time::sleep;
12
13use super::git::{
14 check_mcp_daemon_active, check_sync_lock_owner, get_git_status_changes, has_version_changed,
15 is_git_fsmonitor_running, is_git_repository, is_source_file, release_sync_lock,
16 release_watcher_lock, start_git_fsmonitor, try_acquire_sync_lock, try_acquire_watcher_lock,
17 update_version_after_sync, update_watcher_heartbeat,
18};
19use super::ignore::IgnoreMatcher;
20use super::install::get_codegraph_path;
21use super::manager::CodeGraphManager;
22use super::project::find_project_root;
23use super::types::{CodeGraphEnv, PendingChanges};
24use crate::cancel::CancellationToken;
25use crate::constants::CODEGRAPH_SYNC_INTERVAL_SECS;
26use crate::debug::debug_log;
27use crate::event::AgentEvent;
28use crate::memory::ProjectStructureAnalyzer;
29
30const GIT_STATUS_POLL_INTERVAL_SECS: u64 = 2;
32
33const STATUS_UPDATE_INTERVAL_SECS: u64 = 5;
35
36#[derive(Clone)]
40pub struct WatcherHandle {
41 handle: Arc<Mutex<Option<tokio::task::JoinHandle<()>>>>,
42 project_path: PathBuf,
43}
44
45impl WatcherHandle {
46 pub fn new(project_path: &Path) -> Self {
48 Self {
49 handle: Arc::new(Mutex::new(None)),
50 project_path: project_path.to_path_buf(),
51 }
52 }
53
54 pub fn with_auto_detect(start_path: &Path) -> Self {
56 let project_path = find_project_root(start_path);
57 debug_log().log("codegraph", &format!(
58 "detected project root at {}",
59 project_path.display()
60 ));
61 Self::new(&project_path)
62 }
63
64 pub fn is_running(&self) -> bool {
66 let guard = self.handle.lock().unwrap();
67 guard.as_ref().map(|h| !h.is_finished()).unwrap_or(false)
68 }
69
70 pub fn start_if_needed(&self, cancel_token: CancellationToken) -> bool {
73 if self.is_running() {
74 debug_log().log("codegraph", "watcher already running");
75 return false;
76 }
77
78 if CodeGraphWatcher::is_daemon_running(&self.project_path) {
79 debug_log().log("codegraph", "MCP daemon detected, skipping watcher to avoid conflict");
80 return false;
81 }
82
83 let watcher = CodeGraphWatcher::new(&self.project_path);
84 let handle = watcher.start(cancel_token);
85 debug_log().log("codegraph", "watcher started (no MCP daemon detected)");
86
87 *self.handle.lock().unwrap() = Some(handle);
88 true
89 }
90
91 pub fn stop(&self) {
93 let guard = self.handle.lock().unwrap();
94 if let Some(ref h) = *guard
95 && !h.is_finished()
96 {
97 debug_log().log("codegraph", "aborting watcher...");
98 h.abort();
99 }
100 }
101
102 pub fn inner(&self) -> Arc<Mutex<Option<tokio::task::JoinHandle<()>>>> {
104 self.handle.clone()
105 }
106
107 pub fn project_path(&self) -> &Path {
109 &self.project_path
110 }
111}
112
113pub struct CodeGraphWatcher {
115 project_path: PathBuf,
116 stop_tx: broadcast::Sender<()>,
117 sync_interval: Duration,
118}
119
120impl CodeGraphWatcher {
121 pub fn is_daemon_running(project_path: &Path) -> bool {
124 let daemon_pid_path = project_path.join(".codegraph").join("daemon.pid");
126 if daemon_pid_path.exists() {
127 let pid_running = std::fs::read_to_string(&daemon_pid_path)
128 .ok()
129 .and_then(|pid| pid.trim().parse::<u32>().ok())
130 .map(|pid| {
131 #[cfg(target_os = "windows")]
132 {
133 use std::os::windows::process::CommandExt;
134 const CREATE_NO_WINDOW: u32 = 0x08000000;
135 std::process::Command::new("tasklist")
136 .args(["/FI", &format!("PID eq {}", pid)])
137 .creation_flags(CREATE_NO_WINDOW)
138 .output()
139 .map(|o| String::from_utf8_lossy(&o.stdout).contains(&pid.to_string()))
140 .unwrap_or(false)
141 }
142 #[cfg(not(target_os = "windows"))]
143 std::path::Path::new("/proc").join(pid.to_string()).exists()
144 })
145 .unwrap_or(false);
146 if pid_running {
147 return true;
148 }
149 }
150
151 let daemon_log_path = project_path.join(".codegraph").join("daemon.log");
153 if daemon_log_path.exists() {
154 if let Ok(metadata) = std::fs::metadata(&daemon_log_path) {
155 if let Ok(modified) = metadata.modified() {
156 let now = std::time::SystemTime::now();
157 let elapsed = now
158 .duration_since(modified)
159 .unwrap_or(std::time::Duration::MAX);
160 if elapsed < std::time::Duration::from_secs(60) {
161 debug_log().log("codegraph", "daemon.log recently modified, daemon likely active");
162 return true;
163 }
164 }
165 }
166 }
167
168 false
169 }
170
171 pub fn new(project_path: &Path) -> Self {
173 let (stop_tx, _) = broadcast::channel(1);
174 Self {
175 project_path: project_path.to_path_buf(),
176 stop_tx,
177 sync_interval: Duration::from_secs(CODEGRAPH_SYNC_INTERVAL_SECS),
178 }
179 }
180
181 pub fn with_auto_detect(start_path: &Path) -> Self {
183 let project_path = find_project_root(start_path);
184 debug_log().log("codegraph", &format!(
185 "detected project root at {}",
186 project_path.display()
187 ));
188 Self::new(&project_path)
189 }
190
191 pub fn start(&self, cancel_token: CancellationToken) -> tokio::task::JoinHandle<()> {
193 let project_path = self.project_path.clone();
194 let sync_interval = self.sync_interval;
195
196 tokio::spawn(async move {
197 Self::run_watcher_loop(project_path, sync_interval, cancel_token, None).await;
198 })
199 }
200
201 pub fn start_with_status_updates(
203 &self,
204 cancel_token: CancellationToken,
205 event_tx: mpsc::Sender<AgentEvent>,
206 ) -> tokio::task::JoinHandle<()> {
207 let project_path = self.project_path.clone();
208 let sync_interval = self.sync_interval;
209
210 tokio::spawn(async move {
211 Self::run_watcher_loop(project_path, sync_interval, cancel_token, Some(event_tx)).await;
212 })
213 }
214
215 pub fn stop(&self) {
217 let _ = self.stop_tx.send(());
218 }
219
220 async fn send_status_update(
223 project_path: &Path,
224 event_tx: &Option<mpsc::Sender<AgentEvent>>,
225 pending_count: usize,
226 ) {
227 if let Some(tx) = event_tx {
228 let manager = CodeGraphManager::new(project_path);
229 if manager.is_initialized() {
230 if let Ok(mut status) = manager.status() {
231 status.pending_changes = PendingChanges {
234 added: pending_count as u32,
235 modified: 0,
236 removed: 0,
237 };
238 debug_log().log("codegraph", &format!(
239 "sending status update (pending: {}, nodes: {})",
240 pending_count,
241 status.node_count
242 ));
243 let _ = tx.send(AgentEvent::codegraph_status(status)).await;
244 } else {
245 debug_log().log("codegraph", "failed to get status");
246 }
247 } else {
248 debug_log().log("codegraph", "not initialized, skipping status update");
249 }
250 }
251 }
252
253 async fn run_watcher_loop(
255 project_path: PathBuf,
256 _sync_interval: Duration,
257 cancel_token: CancellationToken,
258 event_tx: Option<mpsc::Sender<AgentEvent>>,
259 ) {
260 if get_codegraph_path().is_none() {
262 debug_log().log("codegraph", "CLI not found, watcher disabled. Please install CodeGraph manually.");
263 return;
264 }
265
266 if !try_acquire_watcher_lock(&project_path) {
268 debug_log().log("codegraph", "another instance is watching this project, exiting");
269 return;
270 }
271
272 let analyzer = ProjectStructureAnalyzer::new(project_path.clone());
274 if analyzer.detect_project_type().is_none() {
275 debug_log().log("codegraph", &format!(
276 "skipping non-code directory: {}",
277 project_path.display()
278 ));
279 return;
280 }
281
282 let manager = CodeGraphManager::new(&project_path);
284 if !manager.is_initialized() {
285 debug_log().log("codegraph", &format!(
286 "not initialized for {}, skipping watcher. Run 'codegraph init -i' to create index.",
287 project_path.display()
288 ));
289 release_watcher_lock(&project_path);
290 return;
291 }
292
293 let env_type = if is_git_repository(&project_path) {
295 CodeGraphEnv::Git
296 } else {
297 CodeGraphEnv::NonGit
298 };
299
300 debug_log().log("codegraph", &format!(
301 "environment detected as {} for: {}",
302 match env_type {
303 CodeGraphEnv::Git => "Git repository",
304 CodeGraphEnv::NonGit => "non-Git directory",
305 },
306 project_path.display()
307 ));
308
309 if env_type == CodeGraphEnv::Git && has_version_changed(&project_path) {
311 debug_log().log("codegraph", "version changed, performing sync before starting watcher");
312 if let Err(e) = manager.sync().await {
313 debug_log().log("codegraph", &format!("version sync failed: {}", e));
314 }
315 update_version_after_sync(&project_path);
316 }
317
318 debug_log().log("codegraph", "performing initial sync on startup");
320 if let Err(e) = manager.sync().await {
321 debug_log().log("codegraph", &format!("initial sync failed: {}", e));
322 }
323 update_version_after_sync(&project_path);
324
325 Self::send_status_update(&project_path, &event_tx, 0).await;
327
328 let (change_tx, mut change_rx) = mpsc::channel::<PathBuf>(100);
330
331 let watcher_result = Self::create_file_watcher(&project_path, change_tx.clone());
333 if watcher_result.is_err() {
334 debug_log().log("codegraph", &format!(
335 "notify watcher failed to start: {}",
336 watcher_result.err().unwrap()
337 ));
338 release_watcher_lock(&project_path);
339 return;
340 }
341 let _watcher = watcher_result.unwrap();
342
343 let ignore_matcher = IgnoreMatcher::load(&project_path);
345
346 let syncing = Arc::new(AtomicBool::new(false));
348 let syncing_clone = syncing.clone();
349 let changed_files = Arc::new(RwLock::new(std::collections::HashSet::<PathBuf>::new()));
350 let last_change = Arc::new(std::sync::Mutex::new(Instant::now()));
351
352 let debounce_delay = Duration::from_secs(CODEGRAPH_SYNC_INTERVAL_SECS);
354 let git_poll_interval = Duration::from_secs(GIT_STATUS_POLL_INTERVAL_SECS);
355
356 let git_monitoring = if env_type == CodeGraphEnv::Git {
358 if start_git_fsmonitor(&project_path) {
359 debug_log().log("codegraph", "Git fsmonitor daemon started");
360 true
361 } else if is_git_fsmonitor_running(&project_path) {
362 debug_log().log("codegraph", "Git fsmonitor daemon already running");
363 true
364 } else {
365 debug_log().log("codegraph", "Git fsmonitor not available, using git status polling");
366 false
367 }
368 } else {
369 false
370 };
371
372 debug_log().log("codegraph", &format!(
373 "watcher started (Git monitoring: {}, notify fallback: always)",
374 git_monitoring
375 ));
376
377 let check_interval = Duration::from_secs(1);
378 let status_update_interval = Duration::from_secs(STATUS_UPDATE_INTERVAL_SECS);
379 let mut last_status_update = Instant::now();
380
381 loop {
382 if cancel_token.is_cancelled() {
383 let pending_count = changed_files.read().await.len();
385 if pending_count > 0 {
386 debug_log().log("codegraph", &format!(
387 "final sync before exit ({} unique files)",
388 pending_count
389 ));
390 let manager = CodeGraphManager::new(&project_path);
391 if manager.is_initialized() {
392 let _ = manager.sync().await;
393 update_version_after_sync(&project_path);
394 }
395 }
396 release_watcher_lock(&project_path);
397 debug_log().log("codegraph", "watcher stopped");
398 break;
399 }
400
401 update_watcher_heartbeat(&project_path);
403
404 if last_status_update.elapsed() >= status_update_interval {
406 let pending = changed_files.read().await.len();
407 Self::send_status_update(&project_path, &event_tx, pending).await;
408 last_status_update = Instant::now();
409 }
410
411 tokio::select! {
412 Some(path) = change_rx.recv() => {
414 if cancel_token.is_cancelled() {
415 break;
416 }
417 if is_source_file(&path)
418 && !ignore_matcher.should_ignore(&path, &project_path) {
419 {
420 let mut files = changed_files.write().await;
421 if files.insert(path.clone()) {
422 *last_change.lock().unwrap() = Instant::now();
423 }
424 }
425 }
426 }
427
428 _ = sleep(git_poll_interval), if git_monitoring => {
430 if cancel_token.is_cancelled() {
431 break;
432 }
433 let changes = get_git_status_changes(&project_path);
434 if changes.has_changes() {
435 let mut new_count = 0;
436 {
437 let mut files = changed_files.write().await;
438 for path in changes.modified.iter().chain(&changes.added).chain(&changes.deleted) {
439 if files.insert(path.clone()) {
440 new_count += 1;
441 }
442 }
443 }
444 if new_count > 0 {
445 *last_change.lock().unwrap() = Instant::now();
446 }
447 }
448 }
449
450 _ = sleep(check_interval) => {
452 if cancel_token.is_cancelled() {
453 break;
454 }
455
456 let files_count = changed_files.read().await.len();
457 let elapsed = last_change.lock().unwrap().elapsed();
458
459 if !syncing_clone.load(Ordering::SeqCst)
460 && files_count > 0
461 && elapsed >= debounce_delay {
462 syncing_clone.store(true, Ordering::SeqCst);
463 debug_log().log("codegraph", &format!(
464 "auto-sync triggered ({} unique files changed)",
465 files_count
466 ));
467
468 if check_mcp_daemon_active(&project_path) {
470 debug_log().log("codegraph", "MCP daemon active, skipping our sync to avoid conflict");
471 syncing_clone.store(false, Ordering::SeqCst);
472 } else {
473 let our_timestamp = try_acquire_sync_lock(&project_path);
474 if our_timestamp > 0 {
475 let manager = CodeGraphManager::new(&project_path);
476 if manager.is_initialized() {
477 if let Err(e) = manager.sync().await {
478 debug_log().log("codegraph", &format!("sync failed: {}", e));
479 } else {
480 if check_sync_lock_owner(&project_path, our_timestamp) {
482 update_version_after_sync(&project_path);
483 changed_files.write().await.clear();
484
485 Self::send_status_update(&project_path, &event_tx, 0).await;
487 } else {
488 debug_log().log("codegraph", "sync abandoned, another process took over");
490 }
492 }
493 }
494 release_sync_lock(&project_path);
495 }
496 syncing_clone.store(false, Ordering::SeqCst);
497 }
498 }
499 }
500 }
501 }
502 }
503
504 fn create_file_watcher(
506 project_path: &Path,
507 change_tx: mpsc::Sender<PathBuf>,
508 ) -> Result<RecommendedWatcher> {
509 let tx = change_tx.clone();
510
511 let handler = move |event: Result<Event, notify::Error>| {
512 if let Ok(event) = event {
513 if !event.kind.is_access() && !event.kind.is_other() {
514 for path in event.paths {
515 let _ = tx.try_send(path);
516 }
517 }
518 }
519 };
520
521 let config = Config::default()
522 .with_poll_interval(Duration::from_secs(2))
523 .with_compare_contents(false);
524
525 let mut watcher = RecommendedWatcher::new(handler, config)?;
526 watcher.watch(project_path, RecursiveMode::Recursive)?;
527
528 Ok(watcher)
529 }
530}