matrixcode_core/tools/codegraph/
watcher.rs1use anyhow::Result;
4use notify::{Config, Event, RecommendedWatcher, RecursiveMode, Watcher};
5use std::path::{Path, PathBuf};
6use std::sync::Arc;
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::time::{Duration, Instant};
9use tokio::sync::{broadcast, mpsc, RwLock};
10use tokio::time::sleep;
11
12use super::manager::CodeGraphManager;
13use super::git::{
14 is_git_repository, get_git_status_changes, is_git_fsmonitor_running, start_git_fsmonitor,
15 has_version_changed, update_version_after_sync,
16 try_acquire_watcher_lock, release_watcher_lock,
17 try_acquire_sync_lock, release_sync_lock, check_sync_lock_owner,
18 update_watcher_heartbeat,
19 is_source_file,
20};
21use super::ignore::IgnoreMatcher;
22use super::install::get_codegraph_path;
23use super::project::find_project_root;
24use super::types::CodeGraphEnv;
25use crate::constants::CODEGRAPH_SYNC_INTERVAL_SECS;
26use crate::memory::ProjectStructureAnalyzer;
27use crate::cancel::CancellationToken;
28
/// Period, in seconds, of the timer that triggers a `get_git_status_changes`
/// poll inside the watcher loop.
const GIT_STATUS_POLL_INTERVAL_SECS: u64 = 2;
31
32pub struct CodeGraphWatcher {
34 project_path: PathBuf,
35 stop_tx: broadcast::Sender<()>,
36 sync_interval: Duration,
37}
38
39impl CodeGraphWatcher {
40 pub fn new(project_path: &Path) -> Self {
42 let (stop_tx, _) = broadcast::channel(1);
43 Self {
44 project_path: project_path.to_path_buf(),
45 stop_tx,
46 sync_interval: Duration::from_secs(CODEGRAPH_SYNC_INTERVAL_SECS),
47 }
48 }
49
50 pub fn with_auto_detect(start_path: &Path) -> Self {
52 let project_path = find_project_root(start_path);
53 log::info!("CodeGraph: detected project root at {}", project_path.display());
54 Self::new(&project_path)
55 }
56
57 pub fn start(&self, cancel_token: CancellationToken) -> tokio::task::JoinHandle<()> {
59 let project_path = self.project_path.clone();
60 let sync_interval = self.sync_interval;
61
62 tokio::spawn(async move {
63 Self::run_watcher_loop(project_path, sync_interval, cancel_token).await;
64 })
65 }
66
67 pub fn stop(&self) {
69 let _ = self.stop_tx.send(());
70 }
71
72 async fn run_watcher_loop(
74 project_path: PathBuf,
75 _sync_interval: Duration,
76 cancel_token: CancellationToken,
77 ) {
78 if get_codegraph_path().is_none() {
80 log::warn!("CodeGraph CLI not found, watcher disabled. Please install CodeGraph manually.");
81 return;
82 }
83
84 if !try_acquire_watcher_lock(&project_path) {
86 log::info!("CodeGraph: another instance is watching this project, exiting");
87 return;
88 }
89
90 let analyzer = ProjectStructureAnalyzer::new(project_path.clone());
92 if analyzer.detect_project_type().is_none() {
93 log::info!(
94 "CodeGraph: skipping non-code directory: {}",
95 project_path.display()
96 );
97 return;
98 }
99
100 let manager = CodeGraphManager::new(&project_path);
102 if !manager.is_initialized() {
103 log::info!(
104 "CodeGraph: not initialized for {}, skipping watcher. Run 'codegraph init -i' to create index.",
105 project_path.display()
106 );
107 release_watcher_lock(&project_path);
108 return;
109 }
110
111 let env_type = if is_git_repository(&project_path) {
113 CodeGraphEnv::Git
114 } else {
115 CodeGraphEnv::NonGit
116 };
117
118 log::info!(
119 "CodeGraph: environment detected as {} for: {}",
120 match env_type {
121 CodeGraphEnv::Git => "Git repository",
122 CodeGraphEnv::NonGit => "non-Git directory",
123 },
124 project_path.display()
125 );
126
127 if env_type == CodeGraphEnv::Git && has_version_changed(&project_path) {
129 log::info!("CodeGraph: version changed, performing sync before starting watcher");
130 if let Err(e) = manager.sync().await {
131 log::warn!("CodeGraph version sync failed: {}", e);
132 }
133 update_version_after_sync(&project_path);
134 }
135
136 log::info!("CodeGraph: performing initial sync on startup");
138 if let Err(e) = manager.sync().await {
139 log::warn!("CodeGraph initial sync failed: {}", e);
140 }
141 update_version_after_sync(&project_path);
142
143 let (change_tx, mut change_rx) = mpsc::channel::<PathBuf>(100);
145
146 let watcher_result = Self::create_file_watcher(&project_path, change_tx.clone());
148 if watcher_result.is_err() {
149 log::warn!("CodeGraph notify watcher failed to start: {}", watcher_result.err().unwrap());
150 release_watcher_lock(&project_path);
151 return;
152 }
153 let _watcher = watcher_result.unwrap();
154
155 let ignore_matcher = IgnoreMatcher::load(&project_path);
157
158 let syncing = Arc::new(AtomicBool::new(false));
160 let syncing_clone = syncing.clone();
161 let changed_files = Arc::new(RwLock::new(std::collections::HashSet::<PathBuf>::new()));
162 let last_change = Arc::new(std::sync::Mutex::new(Instant::now()));
163
164 let debounce_delay = Duration::from_secs(CODEGRAPH_SYNC_INTERVAL_SECS);
166 let git_poll_interval = Duration::from_secs(GIT_STATUS_POLL_INTERVAL_SECS);
167
168 let git_monitoring = if env_type == CodeGraphEnv::Git {
170 if start_git_fsmonitor(&project_path) {
171 log::info!("CodeGraph: Git fsmonitor daemon started");
172 true
173 } else if is_git_fsmonitor_running(&project_path) {
174 log::info!("CodeGraph: Git fsmonitor daemon already running");
175 true
176 } else {
177 log::info!("CodeGraph: Git fsmonitor not available, using git status polling");
178 false
179 }
180 } else {
181 false
182 };
183
184 log::info!(
185 "CodeGraph watcher started (Git monitoring: {}, notify fallback: always)",
186 git_monitoring
187 );
188
189 let check_interval = Duration::from_secs(1);
190
191 loop {
192 if cancel_token.is_cancelled() {
193 let pending_count = changed_files.read().await.len();
195 if pending_count > 0 {
196 log::info!(
197 "CodeGraph: final sync before exit ({} unique files)",
198 pending_count
199 );
200 let manager = CodeGraphManager::new(&project_path);
201 if manager.is_initialized() {
202 let _ = manager.sync().await;
203 update_version_after_sync(&project_path);
204 }
205 }
206 release_watcher_lock(&project_path);
207 log::info!("CodeGraph watcher stopped");
208 break;
209 }
210
211 update_watcher_heartbeat(&project_path);
213
214 tokio::select! {
215 Some(path) = change_rx.recv() => {
217 if cancel_token.is_cancelled() {
218 break;
219 }
220 if is_source_file(&path)
221 && !ignore_matcher.should_ignore(&path, &project_path) {
222 {
223 let mut files = changed_files.write().await;
224 if files.insert(path.clone()) {
225 *last_change.lock().unwrap() = Instant::now();
226 log::debug!(
227 "CodeGraph [notify]: new file {} (total unique: {})",
228 path.display(),
229 files.len()
230 );
231 }
232 }
233 }
234 }
235
236 _ = sleep(git_poll_interval), if git_monitoring => {
238 if cancel_token.is_cancelled() {
239 break;
240 }
241 let changes = get_git_status_changes(&project_path);
242 if changes.has_changes() {
243 let mut new_count = 0;
244 {
245 let mut files = changed_files.write().await;
246 for path in changes.modified.iter().chain(&changes.added).chain(&changes.deleted) {
247 if files.insert(path.clone()) {
248 new_count += 1;
249 }
250 }
251 if new_count > 0 {
252 log::debug!(
253 "CodeGraph [git]: {} new changes (total unique: {})",
254 new_count,
255 files.len()
256 );
257 }
258 }
259 if new_count > 0 {
260 *last_change.lock().unwrap() = Instant::now();
261 }
262 }
263 }
264
265 _ = sleep(check_interval) => {
267 if cancel_token.is_cancelled() {
268 break;
269 }
270
271 let files_count = changed_files.read().await.len();
272 let elapsed = last_change.lock().unwrap().elapsed();
273
274 if !syncing_clone.load(Ordering::SeqCst)
275 && files_count > 0
276 && elapsed >= debounce_delay {
277 syncing_clone.store(true, Ordering::SeqCst);
278 log::info!("CodeGraph: auto-sync triggered ({} unique files changed)", files_count);
279
280 let our_timestamp = try_acquire_sync_lock(&project_path);
281 if our_timestamp > 0 {
282 let manager = CodeGraphManager::new(&project_path);
283 if manager.is_initialized() {
284 if let Err(e) = manager.sync().await {
285 log::warn!("CodeGraph sync failed: {}", e);
286 } else {
287 if check_sync_lock_owner(&project_path, our_timestamp) {
289 update_version_after_sync(&project_path);
290 changed_files.write().await.clear();
291 log::debug!("CodeGraph: sync completed, lock verified");
292 } else {
293 log::info!("CodeGraph: sync abandoned, another process took over");
295 }
297 }
298 }
299 release_sync_lock(&project_path);
300 } else {
301 log::debug!("CodeGraph: skipping sync, another instance is syncing");
302 }
303 syncing_clone.store(false, Ordering::SeqCst);
304 }
305 }
306 }
307 }
308 }
309
310 fn create_file_watcher(
312 project_path: &Path,
313 change_tx: mpsc::Sender<PathBuf>,
314 ) -> Result<RecommendedWatcher> {
315 let tx = change_tx.clone();
316
317 let handler = move |event: Result<Event, notify::Error>| {
318 if let Ok(event) = event {
319 if !event.kind.is_access() && !event.kind.is_other() {
320 for path in event.paths {
321 let _ = tx.try_send(path);
322 }
323 }
324 }
325 };
326
327 let config = Config::default()
328 .with_poll_interval(Duration::from_secs(2))
329 .with_compare_contents(false);
330
331 let mut watcher = RecommendedWatcher::new(handler, config)?;
332 watcher.watch(project_path, RecursiveMode::Recursive)?;
333
334 Ok(watcher)
335 }
336}