1use crate as aft;
2use crate::context::{
3 AppContext, SemanticIndexEvent, SemanticIndexStatus, SemanticRefreshEvent,
4 SemanticRefreshRequest,
5};
6use crate::log_ctx;
7use crate::lsp::client::LspEvent;
8use crate::protocol::PushFrame;
9use crate::watcher_filter::{watcher_path_is_infra_skip, WatcherDispatchEvent};
10use std::collections::HashSet;
11use std::path::Path;
12use std::sync::Arc;
13use std::thread;
14use std::time::Duration;
15
16pub fn drain_configure_warning_events(ctx: &AppContext) {
17 for (generation, frame) in ctx.drain_configure_warnings() {
18 if ctx.configure_generation() != generation {
19 aft::slog_info!(
20 "dropping stale configure_warnings for generation {} (current {})",
21 generation,
22 ctx.configure_generation()
23 );
24 continue;
25 }
26
27 if let Some(sender) = ctx.progress_sender_handle() {
28 sender(PushFrame::ConfigureWarnings(frame));
29 }
30 }
31}
32
33pub fn drain_inspect_events(ctx: &AppContext) {
34 let drained = ctx.inspect_manager().drain_completions();
35 let reuse_completed = ctx.take_new_reuse_completions();
40 if drained > 0 || reuse_completed {
45 if let Some(project_root) = ctx.config().project_root.clone() {
46 let (dead_code, unused_exports, duplicates) = ctx
47 .inspect_manager()
48 .latest_tier2_counts(ctx.inspect_dir(), project_root);
49 let stale = ctx.inspect_manager().tier2_any_in_flight();
55 ctx.update_status_bar_tier2(dead_code, unused_exports, duplicates, None, stale);
56 ctx.status_emitter().signal(ctx.build_status_snapshot());
64 }
65 }
66}
67
68pub fn drain_build_completions(ctx: &AppContext) {
73 drain_search_index_events(ctx);
74 drain_callgraph_store_events(ctx);
75 drain_semantic_index_events(ctx);
76}
77
78pub fn any_build_in_flight(ctx: &AppContext) -> bool {
83 {
84 let rx = ctx
85 .search_index_rx()
86 .read()
87 .unwrap_or_else(std::sync::PoisonError::into_inner);
88 if rx.is_some() {
89 return true;
90 }
91 }
92
93 {
94 let rx = ctx.callgraph_store_rx().lock();
95 if rx.is_some() {
96 return true;
97 }
98 }
99
100 {
101 let rx = ctx.semantic_index_rx().lock();
102 rx.is_some()
103 }
104}
105
106pub fn watcher_path_is_ignored_by_current_matcher(ctx: &AppContext, path: &Path) -> bool {
107 if watcher_path_is_infra_skip(path) {
108 return true;
109 }
110
111 if let Some(matcher) = ctx.gitignore() {
112 if path.starts_with(matcher.path()) {
113 let is_dir = path.is_dir();
114 return matcher
115 .matched_path_or_any_parents(path, is_dir)
116 .is_ignore();
117 }
118 }
119
120 false
121}
122
123fn replay_search_index_pending_updates(
124 ctx: &AppContext,
125 index: &mut crate::search_index::SearchIndex,
126 pending_paths: Vec<std::path::PathBuf>,
127) {
128 for path in pending_paths {
129 if path.exists() {
130 if watcher_path_is_ignored_by_current_matcher(ctx, &path) {
131 index.remove_file(&path);
132 } else {
133 index.update_file(&path);
134 }
135 } else {
136 index.remove_file(&path);
137 }
138 }
139}
140
/// Returns whether `path` has an extension the semantic index covers, as
/// decided by `semantic_index::is_semantic_indexed_extension`.
pub fn watcher_path_is_semantic_source(path: &Path) -> bool {
    crate::semantic_index::is_semantic_indexed_extension(path)
}
144
/// Records a successful whole-corpus semantic refresh: clears every per-path
/// retry counter and resets the refresh circuit breaker.
pub fn mark_semantic_corpus_refresh_success(ctx: &AppContext) {
    ctx.clear_all_semantic_refresh_retry_attempts();
    ctx.reset_semantic_refresh_circuit_after_success();
}
149
150pub fn drain_search_index_events(ctx: &AppContext) {
151 let (latest, disconnected) = {
152 let rx_ref = ctx
153 .search_index_rx()
154 .read()
155 .unwrap_or_else(std::sync::PoisonError::into_inner);
156 let Some(rx) = rx_ref.as_ref() else {
157 return;
158 };
159
160 let mut latest = None;
161 let mut disconnected = false;
162 loop {
163 match rx.try_recv() {
164 Ok(index) => latest = Some(index),
165 Err(crossbeam_channel::TryRecvError::Empty) => break,
166 Err(crossbeam_channel::TryRecvError::Disconnected) => {
167 disconnected = true;
168 break;
169 }
170 }
171 }
172 (latest, disconnected)
173 };
174
175 let mut status_changed = false;
176 let mut installed_index = false;
177 if let Some(mut index) = latest {
178 let pending_paths = ctx.take_pending_search_index_paths();
179 if !pending_paths.is_empty() {
180 replay_search_index_pending_updates(ctx, &mut index, pending_paths);
181 }
182 *ctx.search_index()
183 .write()
184 .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(index);
185 installed_index = true;
186 status_changed = true;
187 }
188
189 if disconnected || installed_index {
190 *ctx.search_index_rx()
191 .write()
192 .unwrap_or_else(std::sync::PoisonError::into_inner) = None;
193 if disconnected && !installed_index {
194 let _ = ctx.take_pending_search_index_paths();
195 }
196 status_changed = true;
197 }
198
199 if status_changed {
200 ctx.status_emitter().signal(ctx.build_status_snapshot());
201 }
202}
203
204pub fn drain_callgraph_store_events(ctx: &AppContext) {
210 let (latest, disconnected) = {
211 let rx_ref = ctx.callgraph_store_rx().lock();
212 let Some(rx) = rx_ref.as_ref() else {
213 return;
214 };
215
216 let mut latest = None;
217 let mut disconnected = false;
218 loop {
219 match rx.try_recv() {
220 Ok(store) => latest = Some(store),
221 Err(crossbeam_channel::TryRecvError::Empty) => break,
222 Err(crossbeam_channel::TryRecvError::Disconnected) => {
223 disconnected = true;
224 break;
225 }
226 }
227 }
228 (latest, disconnected)
229 };
230
231 let mut status_changed = false;
232 let mut installed = false;
233 if let Some(store) = latest {
234 let pending = ctx.take_pending_callgraph_store_paths();
237 if !pending.is_empty() {
238 if let Err(error) = store.refresh_files(&pending) {
239 crate::slog_warn!(
240 "callgraph store post-build pending refresh failed: {}",
241 error
242 );
243 if let Err(mark_error) = store.mark_files_stale(&pending) {
244 crate::slog_warn!(
245 "failed to mark callgraph store files stale after post-build refresh failure: {}",
246 mark_error
247 );
248 }
249 }
250 }
251 *ctx.callgraph_store()
252 .write()
253 .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(Arc::new(store));
254 installed = true;
255 status_changed = true;
256 }
257
258 if disconnected || installed {
259 *ctx.callgraph_store_rx().lock() = None;
260 if disconnected && !installed {
261 let _ = ctx.take_pending_callgraph_store_paths();
264 }
265 status_changed = true;
266 }
267
268 if status_changed {
269 ctx.status_emitter().signal(ctx.build_status_snapshot());
270 }
271}
272
/// Applies queued events from the semantic index build worker.
///
/// Returns immediately when no build receiver is installed, or when the
/// channel is empty and still connected. Otherwise each event updates the
/// shared semantic index and its status. After the events are applied, any
/// work that was deferred while the build ran (a whole-corpus refresh, or
/// individual file refreshes) is handed to the semantic refresh worker. A
/// status snapshot is signalled if anything changed.
pub fn drain_semantic_index_events(ctx: &AppContext) {
    // Drain the channel while holding the receiver lock; the events are
    // processed after the lock is released.
    let (events, disconnected) = {
        let rx_ref = ctx.semantic_index_rx().lock();
        let Some(rx) = rx_ref.as_ref() else {
            return;
        };

        let mut events = Vec::new();
        let mut disconnected = false;
        loop {
            match rx.try_recv() {
                Ok(event) => events.push(event),
                Err(crossbeam_channel::TryRecvError::Empty) => break,
                Err(crossbeam_channel::TryRecvError::Disconnected) => {
                    disconnected = true;
                    break;
                }
            }
        }
        (events, disconnected)
    };

    if events.is_empty() && !disconnected {
        return;
    }

    // `keep_receiver` stays true until a terminal event (Ready/Failed) or an
    // unexpected disconnect is seen.
    let mut keep_receiver = true;
    let mut status_changed = false;
    let mut replay_refresh_paths = Vec::new();
    let mut replay_corpus_refresh = false;
    for event in events {
        match event {
            SemanticIndexEvent::Progress {
                stage,
                files,
                entries_done,
                entries_total,
            } => {
                *ctx.semantic_index_status()
                    .write()
                    .unwrap_or_else(std::sync::PoisonError::into_inner) =
                    SemanticIndexStatus::Building {
                        stage,
                        files,
                        entries_done,
                        entries_total,
                    };
                status_changed = true;
            }
            SemanticIndexEvent::Ready(mut index) => {
                mark_semantic_corpus_refresh_success(ctx);
                // Files that changed while the build ran are invalidated in
                // the fresh index and queued to be refreshed afterwards.
                let pending_paths = ctx.take_pending_semantic_index_paths();
                for path in pending_paths {
                    if watcher_path_is_semantic_source(&path) {
                        index.invalidate_file(&path);
                        replay_refresh_paths.push(path);
                    }
                }
                replay_corpus_refresh = ctx.take_pending_semantic_corpus_refresh();
                *ctx.semantic_index()
                    .write()
                    .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(index);
                *ctx.semantic_index_status()
                    .write()
                    .unwrap_or_else(std::sync::PoisonError::into_inner) =
                    SemanticIndexStatus::ready();
                keep_receiver = false;
                status_changed = true;
            }
            SemanticIndexEvent::Failed(error) => {
                // A failed build discards all deferred work and the index.
                let _ = ctx.take_pending_semantic_index_paths();
                let _ = ctx.take_pending_semantic_corpus_refresh();
                *ctx.semantic_index()
                    .write()
                    .unwrap_or_else(std::sync::PoisonError::into_inner) = None;
                ctx.clear_semantic_refresh_worker();
                *ctx.semantic_index_status()
                    .write()
                    .unwrap_or_else(std::sync::PoisonError::into_inner) =
                    SemanticIndexStatus::Failed(error);
                keep_receiver = false;
                status_changed = true;
            }
        }
    }

    // The worker went away without sending Ready or Failed: handle it the
    // same way as an explicit failure.
    if disconnected && keep_receiver {
        let _ = ctx.take_pending_semantic_index_paths();
        let _ = ctx.take_pending_semantic_corpus_refresh();
        *ctx.semantic_index()
            .write()
            .unwrap_or_else(std::sync::PoisonError::into_inner) = None;
        ctx.clear_semantic_refresh_worker();
        *ctx.semantic_index_status()
            .write()
            .unwrap_or_else(std::sync::PoisonError::into_inner) = SemanticIndexStatus::Failed(
            "semantic index build worker disconnected before reporting completion".to_string(),
        );
        keep_receiver = false;
        status_changed = true;
    }

    if !keep_receiver {
        *ctx.semantic_index_rx().lock() = None;
    }

    // A deferred corpus refresh takes precedence over the per-file replay:
    // when it was requested, the individual paths are not sent.
    if replay_corpus_refresh {
        // The corpus refresh is only requested when a canonical cache root
        // is available.
        if ctx.canonical_cache_root_opt().is_some() {
            *ctx.semantic_index_status()
                .write()
                .unwrap_or_else(std::sync::PoisonError::into_inner) =
                SemanticIndexStatus::Building {
                    stage: "refreshing_corpus".to_string(),
                    files: None,
                    entries_done: None,
                    entries_total: None,
                };
            let sent = ctx
                .semantic_refresh_sender()
                .is_some_and(|sender| sender.send(SemanticRefreshRequest::Corpus).is_ok());
            if !sent {
                *ctx.semantic_index_status()
                    .write()
                    .unwrap_or_else(std::sync::PoisonError::into_inner) =
                    SemanticIndexStatus::Failed(
                        "semantic corpus refresh worker unavailable".to_string(),
                    );
            }
            status_changed = true;
        }
    } else if !replay_refresh_paths.is_empty() {
        {
            // Show the replayed files as refreshing, but only while the
            // status is Ready.
            let mut status = ctx
                .semantic_index_status()
                .write()
                .unwrap_or_else(std::sync::PoisonError::into_inner);
            if matches!(&*status, SemanticIndexStatus::Ready { .. }) {
                for path in &replay_refresh_paths {
                    status.add_refreshing_file(path.clone());
                }
                status_changed = true;
            }
        }
        let sent = ctx.semantic_refresh_sender().is_some_and(|sender| {
            sender
                .send(SemanticRefreshRequest::Files {
                    paths: replay_refresh_paths.clone(),
                })
                .is_ok()
        });
        if !sent {
            crate::slog_warn!(
                "semantic refresh worker unavailable; dropping {} replayed file(s)",
                replay_refresh_paths.len()
            );
            // Nothing will refresh these files, so take them back out of the
            // refreshing set.
            let mut status = ctx
                .semantic_index_status()
                .write()
                .unwrap_or_else(std::sync::PoisonError::into_inner);
            for path in &replay_refresh_paths {
                status.cancel_refreshing_file(path);
            }
            status_changed = true;
        }
    }

    if status_changed {
        ctx.status_emitter().signal(ctx.build_status_snapshot());
    }
}
449
/// Number of retry attempts a single path may accumulate; once a path's
/// recorded attempt count reaches this value the retry planner reports it as
/// capped instead of scheduling another retry.
pub const MAX_RETRY_ATTEMPTS: usize = 6;
/// Threshold passed to `AppContext::record_semantic_refresh_transient_failure`
/// when a transient refresh failure is recorded.
pub const BREAKER_TRIP_THRESHOLD: usize = 3;
452
/// Computes how long to wait before retrying a semantic refresh.
///
/// If `AFT_SEMANTIC_RETRY_BACKOFF_MS` is set to a valid unsigned integer, that
/// many milliseconds is returned regardless of `attempt`. Otherwise the delay
/// follows a 15s / 30s / 60s schedule indexed by `attempt` (zero-based), and
/// every attempt past the end of the schedule uses the final 60s entry.
fn semantic_refresh_retry_backoff(attempt: usize) -> Duration {
    const SCHEDULE_SECS: [u64; 3] = [15, 30, 60];

    let override_ms = std::env::var("AFT_SEMANTIC_RETRY_BACKOFF_MS")
        .ok()
        .and_then(|raw| raw.parse::<u64>().ok());

    match override_ms {
        Some(ms) => Duration::from_millis(ms),
        None => {
            // Clamp so late attempts reuse the last schedule entry.
            let slot = attempt.min(SCHEDULE_SECS.len() - 1);
            Duration::from_secs(SCHEDULE_SECS[slot])
        }
    }
}
471
/// Outcome of planning a retry for a batch of paths whose semantic refresh
/// failed.
struct SemanticRefreshRetryPlan {
    // Paths that are still under the attempt cap and should be retried.
    retry_paths: Vec<std::path::PathBuf>,
    // Paths that already reached `MAX_RETRY_ATTEMPTS`.
    capped_paths: Vec<std::path::PathBuf>,
    // Delay before the retry; `None` when `retry_paths` is empty.
    delay: Option<Duration>,
}
477
478fn next_semantic_refresh_retry_plan(
479 ctx: &AppContext,
480 paths: Vec<std::path::PathBuf>,
481) -> SemanticRefreshRetryPlan {
482 let mut retry_paths = Vec::new();
483 let mut capped_paths = Vec::new();
484 let mut max_attempt = 0usize;
485
486 ctx.with_semantic_refresh_retry_attempts_mut(|attempts| {
487 for path in paths {
488 let attempt = attempts.get(&path).copied().unwrap_or(0);
489 if attempt >= MAX_RETRY_ATTEMPTS {
490 capped_paths.push(path);
491 continue;
492 }
493 max_attempt = max_attempt.max(attempt);
494 attempts.insert(path.clone(), attempt.saturating_add(1));
495 retry_paths.push(path);
496 }
497 });
498
499 let delay = if retry_paths.is_empty() {
500 None
501 } else {
502 Some(semantic_refresh_retry_backoff(max_attempt))
503 };
504
505 SemanticRefreshRetryPlan {
506 retry_paths,
507 capped_paths,
508 delay,
509 }
510}
511
/// Clears the recorded retry attempts for `paths` by delegating to the
/// context method of the same name.
fn clear_semantic_refresh_retry_attempts(ctx: &AppContext, paths: &[std::path::PathBuf]) {
    ctx.clear_semantic_refresh_retry_attempts(paths);
}
515
516fn clear_completed_pending_semantic_index_paths(
517 ctx: &AppContext,
518 completed_paths: &[std::path::PathBuf],
519) {
520 if completed_paths.is_empty() {
521 return;
522 }
523
524 let completed = completed_paths.iter().cloned().collect::<HashSet<_>>();
525 let remaining = ctx
526 .take_pending_semantic_index_paths()
527 .into_iter()
528 .filter(|path| !completed.contains(path))
529 .collect::<Vec<_>>();
530 if !remaining.is_empty() {
531 ctx.add_pending_semantic_index_paths(remaining);
532 }
533}
534
/// Delay before a circuit-breaker probe fires.
///
/// Asks the retry backoff for attempt `usize::MAX`, which yields the last
/// entry of the backoff schedule unless the environment override is set.
fn semantic_refresh_probe_delay() -> Duration {
    semantic_refresh_retry_backoff(usize::MAX)
}
538
/// Returns whether the context reports the semantic refresh circuit breaker
/// as open.
pub fn semantic_refresh_circuit_is_open(ctx: &AppContext) -> bool {
    ctx.semantic_refresh_circuit_is_open()
}
542
/// Records one transient semantic refresh failure against the context, using
/// `BREAKER_TRIP_THRESHOLD` as the threshold, and returns the context's
/// verdict.
///
/// NOTE(review): the caller in `drain_semantic_refresh_events` treats `true`
/// as "breaker is open, park the paths and schedule a probe" — confirm against
/// the `AppContext` implementation.
pub fn record_semantic_refresh_transient_failure(ctx: &AppContext) -> bool {
    ctx.record_semantic_refresh_transient_failure(BREAKER_TRIP_THRESHOLD)
}
546
/// Resets the context's counter of transient semantic refresh failures.
fn reset_semantic_refresh_transient_failure_count(ctx: &AppContext) {
    ctx.reset_semantic_refresh_transient_failure_count();
}
550
/// Tells the context to reset the semantic refresh circuit breaker after a
/// successful refresh.
fn reset_semantic_refresh_circuit_after_success(ctx: &AppContext) {
    ctx.reset_semantic_refresh_circuit_after_success();
}
554
/// Bookkeeping for a successful per-file refresh: clears the retry counters
/// for `completed_paths`, removes them from the pending queue, and resets the
/// circuit breaker.
fn mark_semantic_refresh_success(ctx: &AppContext, completed_paths: &[std::path::PathBuf]) {
    clear_semantic_refresh_retry_attempts(ctx, completed_paths);
    clear_completed_pending_semantic_index_paths(ctx, completed_paths);
    reset_semantic_refresh_circuit_after_success(ctx);
}
560
/// Test-only accessor for the context's transient refresh failure counter.
#[doc(hidden)]
pub fn semantic_refresh_transient_failure_count_for_test(ctx: &AppContext) -> usize {
    ctx.semantic_refresh_transient_failure_count()
}
565
/// Test-only accessor reporting whether a refresh probe is currently
/// scheduled on the context.
#[doc(hidden)]
pub fn semantic_refresh_probe_is_scheduled_for_test(ctx: &AppContext) -> bool {
    ctx.semantic_refresh_probe_is_scheduled()
}
570
/// Asks the context to make sure a refresh probe is scheduled, passing the
/// delay from `semantic_refresh_probe_delay`.
fn ensure_semantic_refresh_probe_scheduled(ctx: &AppContext) {
    ctx.ensure_semantic_refresh_probe_scheduled(semantic_refresh_probe_delay());
}
574
575fn maybe_fire_semantic_refresh_probe(ctx: &AppContext) {
576 if !ctx.take_semantic_refresh_probe_ready() {
577 return;
578 }
579 if !semantic_refresh_circuit_is_open(ctx) {
580 return;
581 }
582
583 let pending_paths = ctx.take_pending_semantic_index_paths();
584 if pending_paths.is_empty() {
585 return;
586 }
587
588 let sent = ctx.semantic_refresh_sender().is_some_and(|sender| {
589 sender
590 .send(SemanticRefreshRequest::Files {
591 paths: pending_paths.clone(),
592 })
593 .is_ok()
594 });
595 if !sent {
596 ctx.add_pending_semantic_index_paths(pending_paths);
597 }
598}
599
600pub fn schedule_semantic_refresh_retry(
601 ctx: &AppContext,
602 paths: Vec<std::path::PathBuf>,
603 error: &str,
604) -> bool {
605 if paths.is_empty() {
606 return false;
607 }
608 let Some(sender) = ctx.semantic_refresh_sender() else {
609 return false;
610 };
611
612 let SemanticRefreshRetryPlan {
613 retry_paths,
614 capped_paths,
615 delay,
616 } = next_semantic_refresh_retry_plan(ctx, paths);
617
618 if !capped_paths.is_empty() {
619 aft::slog_warn!(
620 "semantic refresh retry limit reached for {} file(s); preserving for next watcher/configure refresh",
621 capped_paths.len(),
622 );
623 ctx.add_pending_semantic_index_paths(capped_paths);
624 }
625
626 let Some(delay) = delay else {
627 return true;
628 };
629
630 let clean = aft::semantic_index::strip_transient_embedding_marker(error);
631 aft::slog_warn!(
632 "semantic refresh hit a transient backend error ({}); retrying {} file(s) in {}ms",
633 clean,
634 retry_paths.len(),
635 delay.as_millis(),
636 );
637
638 let session_id = log_ctx::current_session();
639 thread::spawn(move || {
640 log_ctx::with_session(session_id, || {
641 thread::sleep(delay);
642 let _ = sender.send(SemanticRefreshRequest::Files { paths: retry_paths });
643 });
644 });
645 true
646}
647
/// Applies queued events from the semantic refresh worker.
///
/// Returns immediately when no event receiver is installed. When the channel
/// is empty and still connected, only the circuit-breaker probe is given a
/// chance to fire. Otherwise each event updates the semantic index, its
/// status and the retry bookkeeping; a disconnect tears down the worker
/// handle and cancels in-flight file refreshes; and files that changed during
/// a corpus refresh are re-sent. A status snapshot is signalled if anything
/// changed.
pub fn drain_semantic_refresh_events(ctx: &AppContext) {
    // Drain the channel while holding the receiver lock; the events are
    // processed after the lock is released.
    let (events, disconnected) = {
        let rx_ref = ctx.semantic_refresh_event_rx().lock();
        let Some(rx) = rx_ref.as_ref() else {
            return;
        };

        let mut events = Vec::new();
        let mut disconnected = false;
        loop {
            match rx.try_recv() {
                Ok(event) => events.push(event),
                Err(crossbeam_channel::TryRecvError::Empty) => break,
                Err(crossbeam_channel::TryRecvError::Disconnected) => {
                    disconnected = true;
                    break;
                }
            }
        }
        (events, disconnected)
    };

    if events.is_empty() && !disconnected {
        maybe_fire_semantic_refresh_probe(ctx);
        return;
    }

    let had_events = !events.is_empty();
    let mut status_changed = false;
    let mut replay_refresh_paths = Vec::new();
    for event in events {
        match event {
            SemanticRefreshEvent::Started { paths } => {
                // Per-file progress is only tracked while the status is Ready.
                let mut status = ctx
                    .semantic_index_status()
                    .write()
                    .unwrap_or_else(std::sync::PoisonError::into_inner);
                if matches!(&*status, SemanticIndexStatus::Ready { .. }) {
                    for path in paths {
                        status.start_refreshing_file(path);
                    }
                    status_changed = true;
                }
            }
            SemanticRefreshEvent::CorpusStarted { files } => {
                *ctx.semantic_index_status()
                    .write()
                    .unwrap_or_else(std::sync::PoisonError::into_inner) =
                    SemanticIndexStatus::Building {
                        stage: "refreshing_corpus".to_string(),
                        files: Some(files),
                        entries_done: None,
                        entries_total: None,
                    };
                status_changed = true;
            }
            SemanticRefreshEvent::Completed {
                added_entries,
                updated_metadata,
                completed_paths,
            } => {
                // The update is applied only if an index is installed; the
                // success bookkeeping runs either way.
                if let Some(index) = ctx
                    .semantic_index()
                    .write()
                    .unwrap_or_else(std::sync::PoisonError::into_inner)
                    .as_mut()
                {
                    index.apply_refresh_update(added_entries, updated_metadata, &completed_paths);
                }
                mark_semantic_refresh_success(ctx, &completed_paths);
                let mut status = ctx
                    .semantic_index_status()
                    .write()
                    .unwrap_or_else(std::sync::PoisonError::into_inner);
                if matches!(&*status, SemanticIndexStatus::Ready { .. }) {
                    for path in &completed_paths {
                        status.complete_refreshing_file(path);
                    }
                    status_changed = true;
                }
            }
            SemanticRefreshEvent::CorpusCompleted {
                mut index,
                changed,
                added,
                deleted,
                total_processed,
            } => {
                aft::runtime_drain::mark_semantic_corpus_refresh_success(ctx);
                if changed > 0 || added > 0 || deleted > 0 {
                    aft::slog_info!(
                        "semantic corpus refresh completed: {} changed, {} new, {} deleted, {} total processed",
                        changed,
                        added,
                        deleted,
                        total_processed
                    );
                }
                // Files that changed during the corpus refresh are
                // invalidated in the new index; those not currently ignored
                // are queued to be refreshed again.
                let pending_paths = ctx.take_pending_semantic_index_paths();
                for path in pending_paths {
                    if !aft::runtime_drain::watcher_path_is_semantic_source(&path) {
                        continue;
                    }
                    index.invalidate_file(&path);
                    if !aft::runtime_drain::watcher_path_is_ignored_by_current_matcher(ctx, &path) {
                        replay_refresh_paths.push(path);
                    }
                }
                *ctx.semantic_index()
                    .write()
                    .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(index);
                *ctx.semantic_index_status()
                    .write()
                    .unwrap_or_else(std::sync::PoisonError::into_inner) =
                    SemanticIndexStatus::ready();
                status_changed = true;
            }
            SemanticRefreshEvent::Failed { paths, error } => {
                if aft::semantic_index::embedding_failure_is_transient(&error) {
                    // Transient failure: either park the paths and schedule a
                    // probe (when recording the failure returns true), or
                    // schedule a delayed retry; if that cannot be scheduled
                    // the paths are kept as pending.
                    if record_semantic_refresh_transient_failure(ctx) {
                        ctx.add_pending_semantic_index_paths(paths);
                        ensure_semantic_refresh_probe_scheduled(ctx);
                    } else if !schedule_semantic_refresh_retry(ctx, paths.clone(), &error) {
                        aft::slog_warn!(
                            "semantic refresh worker unavailable; preserving {} transiently failed file(s) for retry",
                            paths.len(),
                        );
                        ctx.add_pending_semantic_index_paths(paths);
                    }
                } else {
                    // Non-transient failure: no retry. The retry bookkeeping
                    // for these paths is cleared and they stop being shown
                    // as refreshing.
                    aft::slog_warn!("semantic refresh failed: {}", error);
                    reset_semantic_refresh_transient_failure_count(ctx);
                    clear_semantic_refresh_retry_attempts(ctx, &paths);
                    let mut status = ctx
                        .semantic_index_status()
                        .write()
                        .unwrap_or_else(std::sync::PoisonError::into_inner);
                    if matches!(&*status, SemanticIndexStatus::Ready { .. }) {
                        for path in &paths {
                            status.complete_refreshing_file(path);
                        }
                        status_changed = true;
                    }
                }
            }
            SemanticRefreshEvent::CorpusFailed { error } => {
                if aft::semantic_index::embedding_failure_is_transient(&error) {
                    let clean = aft::semantic_index::strip_transient_embedding_marker(&error);
                    let has_index = ctx
                        .semantic_index()
                        .read()
                        .unwrap_or_else(std::sync::PoisonError::into_inner)
                        .is_some();
                    // On a transient failure an existing index is kept and
                    // the status returns to ready; without an index the
                    // status becomes Failed.
                    if has_index {
                        aft::slog_warn!(
                            "semantic corpus refresh hit a transient backend error ({}); keeping the existing index",
                            clean,
                        );
                        *ctx.semantic_index_status()
                            .write()
                            .unwrap_or_else(std::sync::PoisonError::into_inner) =
                            SemanticIndexStatus::ready();
                    } else {
                        aft::slog_warn!("semantic corpus refresh failed: {}", clean);
                        *ctx.semantic_index_status()
                            .write()
                            .unwrap_or_else(std::sync::PoisonError::into_inner) =
                            SemanticIndexStatus::Failed(clean);
                    }
                    status_changed = true;
                } else {
                    // Non-transient corpus failure: drop the pending paths
                    // and the index itself.
                    aft::slog_warn!("semantic corpus refresh failed: {}", error);
                    let _ = ctx.take_pending_semantic_index_paths();
                    *ctx.semantic_index()
                        .write()
                        .unwrap_or_else(std::sync::PoisonError::into_inner) = None;
                    *ctx.semantic_index_status()
                        .write()
                        .unwrap_or_else(std::sync::PoisonError::into_inner) =
                        SemanticIndexStatus::Failed(error);
                    status_changed = true;
                }
            }
        }
    }

    if disconnected {
        ctx.clear_semantic_refresh_worker();
        // Snapshot the refreshing list under a read lock, then cancel each
        // entry under a separate write lock.
        let refreshing_paths = {
            let status = ctx
                .semantic_index_status()
                .read()
                .unwrap_or_else(std::sync::PoisonError::into_inner);
            match &*status {
                SemanticIndexStatus::Ready { refreshing, .. } => refreshing.clone(),
                _ => Vec::new(),
            }
        };
        if !refreshing_paths.is_empty() {
            let mut status = ctx
                .semantic_index_status()
                .write()
                .unwrap_or_else(std::sync::PoisonError::into_inner);
            for path in &refreshing_paths {
                status.cancel_refreshing_file(path);
            }
        }
        if !refreshing_paths.is_empty() || had_events {
            status_changed = true;
        }
    }

    if !replay_refresh_paths.is_empty() {
        {
            // Show the replayed files as refreshing, but only while the
            // status is Ready.
            let mut status = ctx
                .semantic_index_status()
                .write()
                .unwrap_or_else(std::sync::PoisonError::into_inner);
            if matches!(&*status, SemanticIndexStatus::Ready { .. }) {
                for path in &replay_refresh_paths {
                    status.add_refreshing_file(path.clone());
                }
                status_changed = true;
            }
        }
        let sent = ctx.semantic_refresh_sender().is_some_and(|sender| {
            sender
                .send(SemanticRefreshRequest::Files {
                    paths: replay_refresh_paths.clone(),
                })
                .is_ok()
        });
        if !sent {
            aft::slog_warn!(
                "semantic refresh worker unavailable; dropping {} replayed corpus file(s)",
                replay_refresh_paths.len()
            );
            // Nothing will refresh these files, so take them back out of the
            // refreshing set.
            let mut status = ctx
                .semantic_index_status()
                .write()
                .unwrap_or_else(std::sync::PoisonError::into_inner);
            for path in &replay_refresh_paths {
                status.cancel_refreshing_file(path);
            }
            status_changed = true;
        }
    }

    maybe_fire_semantic_refresh_probe(ctx);

    if status_changed {
        ctx.status_emitter().signal(ctx.build_status_snapshot());
    }
}
911
/// File extensions (without the leading dot) that `watcher_path_is_source`
/// accepts as source files. The comparison there is exact, so these are
/// matched case-sensitively.
const SOURCE_EXTENSIONS: &[&str] = &[
    "ts", "tsx", "mts", "cts", "js", "jsx", "mjs", "cjs", "py", "pyi", "rs", "go",
];
916
// NOTE(review): the use of this cap is not visible in this part of the file;
// presumably it bounds how many watcher paths are processed inline per
// batch — confirm against the watcher drain code.
pub const WATCHER_BATCH_INLINE_CAP: usize = 256;
918
/// Returns whether `path` names a TypeScript/JavaScript project config file.
///
/// Only the final path component is inspected. It matches when it is exactly
/// `tsconfig.json` or `jsconfig.json`, or when it starts with `tsconfig.` or
/// `jsconfig.` and ends with `.json` (for example `tsconfig.build.json`).
/// Paths without a file name, or whose file name is not valid UTF-8, do not
/// match.
pub fn watcher_path_is_tsconfig(path: &std::path::Path) -> bool {
    let Some(name) = path.file_name().and_then(|raw| raw.to_str()) else {
        return false;
    };

    match name {
        "tsconfig.json" | "jsconfig.json" => true,
        other => {
            let has_config_prefix =
                other.starts_with("tsconfig.") || other.starts_with("jsconfig.");
            has_config_prefix && other.ends_with(".json")
        }
    }
}
937
938pub fn watcher_path_is_source(path: &std::path::Path) -> bool {
939 path.extension()
940 .and_then(|ext| ext.to_str())
941 .is_some_and(|ext| SOURCE_EXTENSIONS.contains(&ext))
942}
943
/// Returns whether `path` is a file the callgraph store indexes, i.e. whether
/// `parser::detect_language` recognizes a language for it.
pub fn watcher_path_is_callgraph_indexed(path: &std::path::Path) -> bool {
    aft::parser::detect_language(path).is_some()
}
954
955pub fn semantic_corpus_refresh_in_progress(ctx: &AppContext) -> bool {
956 let status = ctx
957 .semantic_index_status()
958 .read()
959 .unwrap_or_else(std::sync::PoisonError::into_inner);
960 matches!(
961 &*status,
962 SemanticIndexStatus::Building { stage, .. } if stage == "refreshing_corpus"
963 )
964}
965
/// Debug-build hook that can delay publishing a rebuilt search index.
///
/// Sleeps for the number of milliseconds given by
/// `AFT_TEST_SEARCH_REBUILD_PUBLISH_DELAY_MS` when that variable is set to a
/// valid unsigned integer; otherwise returns immediately.
#[cfg(debug_assertions)]
pub fn delay_search_rebuild_publish_for_debug() {
    let requested_ms = match std::env::var("AFT_TEST_SEARCH_REBUILD_PUBLISH_DELAY_MS") {
        Ok(raw) => raw.parse::<u64>().ok(),
        Err(_) => None,
    };
    if let Some(ms) = requested_ms {
        thread::sleep(Duration::from_millis(ms));
    }
}

/// Release-build variant of the hook: does nothing.
#[cfg(not(debug_assertions))]
pub fn delay_search_rebuild_publish_for_debug() {}
979
980pub fn spawn_search_corpus_refresh(
981 ctx: &AppContext,
982 root: std::path::PathBuf,
983 config: Arc<aft::config::Config>,
984) {
985 {
986 let mut search_index = ctx
987 .search_index()
988 .write()
989 .unwrap_or_else(std::sync::PoisonError::into_inner);
990 if let Some(index) = search_index.as_mut() {
991 index.ready = false;
992 }
993 }
994
995 let (tx, rx): (
996 crossbeam_channel::Sender<aft::search_index::SearchIndex>,
997 crossbeam_channel::Receiver<aft::search_index::SearchIndex>,
998 ) = crossbeam_channel::unbounded();
999 *ctx.search_index_rx()
1000 .write()
1001 .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(rx);
1002 ctx.reset_symbol_cache();
1003
1004 let is_worktree_bridge = ctx.is_worktree_bridge();
1005 let session_id = log_ctx::current_session();
1006 thread::spawn(move || {
1007 log_ctx::with_session(session_id, || {
1008 let cache_dir =
1009 aft::search_index::resolve_cache_dir(&root, config.storage_dir.as_deref());
1010 let _cache_lock = if is_worktree_bridge {
1011 None
1012 } else {
1013 match aft::search_index::CacheLock::acquire(&cache_dir) {
1014 Ok(lock) => Some(lock),
1015 Err(error) => {
1016 aft::slog_warn!(
1017 "failed to acquire search cache lock for ignore refresh: {}",
1018 error
1019 );
1020 None
1021 }
1022 }
1023 };
1024 let index = aft::search_index::SearchIndex::build_with_limit(
1025 &root,
1026 config.search_index_max_file_size,
1027 );
1028 delay_search_rebuild_publish_for_debug();
1029 if !is_worktree_bridge {
1030 index.write_to_disk(&cache_dir, index.stored_git_head());
1031 }
1032 let _ = tx.send(index);
1033 });
1034 });
1035}
1036
1037pub fn refresh_project_corpus(
1038 ctx: &AppContext,
1039 reason: &str,
1040 invalidate_ignore_paths: bool,
1041) -> bool {
1042 let Some(root) = ctx.canonical_cache_root_opt() else {
1043 return false;
1044 };
1045 let config = ctx.config();
1046 let mut status_changed = false;
1047
1048 if invalidate_ignore_paths {
1049 if let Some(graph) = ctx.callgraph().lock().as_mut() {
1050 graph.invalidate_file(&root.join(".gitignore"));
1051 graph.invalidate_file(&root.join(".aftignore"));
1052 }
1053 }
1054
1055 if !ctx.is_worktree_bridge() {
1056 let callgraph_store_resident = {
1078 let guard = ctx
1079 .callgraph_store()
1080 .read()
1081 .unwrap_or_else(std::sync::PoisonError::into_inner);
1082 guard.is_some()
1083 };
1084 if callgraph_store_resident || ctx.callgraph_store_rx().lock().is_some() {
1085 *ctx.callgraph_store()
1086 .write()
1087 .unwrap_or_else(std::sync::PoisonError::into_inner) = None;
1088 ctx.mark_callgraph_store_force_rebuild();
1089 status_changed = true;
1090 aft::slog_info!(
1091 "callgraph store scheduled for background rebuild after {}",
1092 reason
1093 );
1094 }
1095 }
1096
1097 if config.search_index {
1098 spawn_search_corpus_refresh(ctx, root.clone(), config.clone());
1099 status_changed = true;
1100 aft::slog_info!("started search index refresh after {}", reason);
1101 }
1102
1103 if config.semantic_search {
1104 if let Some(sender) = ctx.semantic_refresh_sender() {
1105 *ctx.semantic_index_status()
1106 .write()
1107 .unwrap_or_else(std::sync::PoisonError::into_inner) =
1108 SemanticIndexStatus::Building {
1109 stage: "refreshing_corpus".to_string(),
1110 files: None,
1111 entries_done: None,
1112 entries_total: None,
1113 };
1114 match sender.send(SemanticRefreshRequest::Corpus) {
1115 Ok(()) => {
1116 status_changed = true;
1117 }
1118 Err(error) => {
1119 *ctx.semantic_index_status()
1120 .write()
1121 .unwrap_or_else(std::sync::PoisonError::into_inner) =
1122 SemanticIndexStatus::Failed(format!(
1123 "semantic corpus refresh worker unavailable: {error}"
1124 ));
1125 status_changed = true;
1126 }
1127 }
1128 } else if ctx.semantic_index_rx().lock().is_some() {
1129 ctx.mark_pending_semantic_corpus_refresh();
1130 }
1131 }
1132
1133 status_changed
1134}
1135
/// Refreshes the project corpus after an ignore-rule change, also
/// invalidating the ignore files in the call graph. Returns whatever
/// `refresh_project_corpus` returns.
pub fn refresh_corpus_after_ignore_change(ctx: &AppContext) -> bool {
    refresh_project_corpus(ctx, "ignore-rule change", true)
}
1139
1140pub fn refresh_project_after_watcher_rescan(ctx: &AppContext) -> bool {
1141 let Some(root) = ctx.canonical_cache_root_opt() else {
1142 return false;
1143 };
1144 ctx.clear_pending_index_updates();
1145 ctx.reset_symbol_cache();
1146 let _ = ctx.mark_status_bar_tier2_stale();
1147 ctx.clear_tsconfig_membership_cache();
1148 let mut status_changed = true;
1149
1150 if ctx.callgraph().lock().is_some() {
1151 *ctx.callgraph().lock() = Some(aft::callgraph::CallGraph::new(root));
1152 }
1153
1154 status_changed |= refresh_project_corpus(ctx, "watcher overflow", false);
1155 status_changed
1156}
1157
1158pub fn refresh_callgraph_store_for_watcher(
1159 ctx: &AppContext,
1160 changed: &HashSet<std::path::PathBuf>,
1161) {
1162 if ctx.is_worktree_bridge() {
1163 return;
1164 }
1165 let source_paths = changed
1166 .iter()
1167 .filter(|path| watcher_path_is_callgraph_indexed(path))
1168 .cloned()
1169 .collect::<Vec<_>>();
1170 if source_paths.is_empty() {
1171 return;
1172 }
1173 ctx.revalidate_callgraph_store_generation();
1178 let store = {
1179 let guard = ctx
1180 .callgraph_store()
1181 .read()
1182 .unwrap_or_else(std::sync::PoisonError::into_inner);
1183 guard.as_ref().map(Arc::clone)
1184 };
1185 let Some(store) = store else {
1186 if ctx.callgraph_store_rx().lock().is_some() {
1191 ctx.add_pending_callgraph_store_paths(source_paths);
1192 }
1193 return;
1194 };
1195 if let Err(error) = store.refresh_files(&source_paths) {
1196 aft::slog_warn!("callgraph store refresh failed: {}", error);
1197 match store.mark_files_stale(&source_paths) {
1198 Ok(marked) => aft::slog_warn!(
1199 "marked {} callgraph store file(s) stale after refresh failure",
1200 marked.len()
1201 ),
1202 Err(mark_error) => aft::slog_warn!(
1203 "failed to mark callgraph store files stale after refresh failure: {}",
1204 mark_error
1205 ),
1206 }
1207 }
1208}
1209
1210pub fn drain_watcher_events(ctx: &AppContext) {
1216 let mut changed: HashSet<std::path::PathBuf> = HashSet::new();
1217 let mut ignore_file_changed = false;
1218 let mut rescan_required = false;
1219 let mut watcher_failed = None;
1220 let mut root_deleted = false;
1221
1222 {
1223 let rx_ref = ctx.watcher_rx().lock();
1224 let rx = match rx_ref.as_ref() {
1225 Some(rx) => rx,
1226 None => {
1227 ctx.tick_tier2_refresh_scheduler(0);
1228 return; }
1230 };
1231
1232 loop {
1233 match rx.try_recv() {
1234 Ok(WatcherDispatchEvent::Paths(paths)) => {
1235 if !rescan_required {
1236 changed.extend(paths);
1237 }
1238 }
1239 Ok(WatcherDispatchEvent::RescanRequired) => {
1240 rescan_required = true;
1241 changed.clear();
1242 }
1243 Ok(WatcherDispatchEvent::IgnoreRulesChanged { path }) => {
1244 ignore_file_changed = true;
1245 log::debug!(
1246 "watcher: ignore rules changed at {}, rebuilding matcher",
1247 path.display()
1248 );
1249 if !rescan_required {
1250 ctx.rebuild_gitignore();
1251 }
1252 }
1253 Ok(WatcherDispatchEvent::RootDeleted) => {
1254 root_deleted = true;
1255 break;
1256 }
1257 Ok(WatcherDispatchEvent::Error(error)) => {
1258 watcher_failed = Some(error);
1259 break;
1260 }
1261 Err(crossbeam_channel::TryRecvError::Empty) => break,
1262 Err(crossbeam_channel::TryRecvError::Disconnected) => {
1263 watcher_failed = Some("watcher channel disconnected".to_string());
1264 break;
1265 }
1266 }
1267 }
1268 }
1269
1270 let mut watcher_status_changed = false;
1271 if root_deleted {
1272 ctx.stop_watcher_runtime();
1273 let _ = ctx.add_degraded_reason("project_root_deleted".to_string());
1274 aft::slog_warn!(
1275 "project root deleted; dropping watcher to avoid delete-storm: {:?}",
1276 ctx.canonical_cache_root_opt()
1277 );
1278 watcher_status_changed = true;
1279 changed.clear();
1280 rescan_required = false;
1281 } else if let Some(error) = watcher_failed {
1282 ctx.stop_watcher_runtime();
1283 let _ = ctx.add_degraded_reason("watcher_unavailable".to_string());
1284 aft::slog_warn!(
1285 "file watcher unavailable; continuing without live external-change invalidation: {}",
1286 error
1287 );
1288 watcher_status_changed = true;
1289 rescan_required = false;
1290 }
1291
1292 let mut status_changed = watcher_status_changed;
1293 let mut project_corpus_refresh_requested = false;
1294 if rescan_required {
1295 aft::slog_warn!("watcher overflow: forcing project rescan");
1296 ctx.rebuild_gitignore();
1297 status_changed |= refresh_project_after_watcher_rescan(ctx);
1298 project_corpus_refresh_requested = true;
1299 changed.clear();
1300 } else if ignore_file_changed {
1301 status_changed |= refresh_corpus_after_ignore_change(ctx);
1302 project_corpus_refresh_requested = true;
1303 }
1304
1305 let scheduler_changed_path_count = if rescan_required {
1306 aft::inspect::tier2_scheduler::TIER2_REFRESH_STORM_PATH_THRESHOLD + 1
1307 } else if ignore_file_changed {
1308 changed.len().max(1)
1309 } else {
1310 changed.len()
1311 };
1312 if changed.is_empty() {
1313 if status_changed {
1314 ctx.status_emitter().signal(ctx.build_status_snapshot());
1315 }
1316 ctx.tick_tier2_refresh_scheduler(scheduler_changed_path_count);
1317 return;
1318 }
1319
1320 ctx.add_pending_tier2_paths(changed.iter().cloned());
1321
1322 if ctx.mark_status_bar_tier2_stale() {
1326 status_changed = true;
1327 }
1328
1329 if changed.iter().any(|path| watcher_path_is_tsconfig(path)) {
1334 ctx.clear_tsconfig_membership_cache();
1335 status_changed = true;
1336 }
1337
1338 let oversized_inline_batch = changed.len() > WATCHER_BATCH_INLINE_CAP;
1339 if oversized_inline_batch {
1340 aft::slog_warn!(
1341 "watcher batch of {} paths exceeds inline cap {}; scheduling corpus refresh",
1342 changed.len(),
1343 WATCHER_BATCH_INLINE_CAP
1344 );
1345 if !project_corpus_refresh_requested {
1346 status_changed |= refresh_project_corpus(ctx, "oversized watcher batch", false);
1347 }
1348 }
1349
1350 let search_build_in_progress = {
1351 let search_index_rx = ctx
1352 .search_index_rx()
1353 .read()
1354 .unwrap_or_else(std::sync::PoisonError::into_inner);
1355 search_index_rx.is_some()
1356 };
1357 if !oversized_inline_batch && search_build_in_progress {
1358 ctx.add_pending_search_index_paths(changed.iter().cloned());
1359 }
1360 let semantic_source_paths = changed
1361 .iter()
1362 .filter(|path| aft::runtime_drain::watcher_path_is_semantic_source(path))
1363 .cloned()
1364 .collect::<Vec<_>>();
1365 let semantic_build_in_progress = ctx.semantic_index_rx().lock().is_some();
1366 let semantic_corpus_refresh_in_progress = semantic_corpus_refresh_in_progress(ctx);
1367 if !oversized_inline_batch
1368 && (semantic_build_in_progress || semantic_corpus_refresh_in_progress)
1369 && !semantic_source_paths.is_empty()
1370 {
1371 ctx.add_pending_semantic_index_paths(semantic_source_paths.clone());
1372 }
1373
1374 if let Ok(mut symbol_cache) = ctx.symbol_cache().write() {
1375 for path in &changed {
1376 symbol_cache.invalidate(path);
1377 }
1378 }
1379
1380 let mut graph_ref = ctx.callgraph().lock();
1382 if let Some(graph) = graph_ref.as_mut() {
1383 for path in &changed {
1384 if watcher_path_is_source(path) {
1385 graph.invalidate_file(path);
1386 }
1387 }
1388 }
1389 drop(graph_ref);
1390
1391 let mut semantic_refresh_paths = Vec::new();
1392 if !oversized_inline_batch {
1393 refresh_callgraph_store_for_watcher(ctx, &changed);
1394
1395 {
1396 let mut index_ref = ctx
1397 .search_index()
1398 .write()
1399 .unwrap_or_else(std::sync::PoisonError::into_inner);
1400 if let Some(index) = index_ref.as_mut() {
1401 for path in &changed {
1402 if path.exists() {
1403 index.update_file(path);
1404 } else {
1405 index.remove_file(path);
1406 }
1407 }
1408 }
1409 }
1410
1411 let stale_paths = {
1412 let mut semantic_index_ref = ctx
1413 .semantic_index()
1414 .write()
1415 .unwrap_or_else(std::sync::PoisonError::into_inner);
1416 let mut stale_paths = Vec::new();
1417 if let Some(index) = semantic_index_ref.as_mut() {
1418 for path in &semantic_source_paths {
1419 index.invalidate_file(path);
1420 stale_paths.push(path.clone());
1421 }
1422 }
1423 stale_paths
1424 };
1425 if !stale_paths.is_empty() {
1426 let mut status = ctx
1427 .semantic_index_status()
1428 .write()
1429 .unwrap_or_else(std::sync::PoisonError::into_inner);
1430 if matches!(&*status, SemanticIndexStatus::Ready { .. }) {
1431 for path in &stale_paths {
1432 status.add_refreshing_file(path.clone());
1433 }
1434 semantic_refresh_paths = stale_paths;
1435 status_changed = true;
1436 }
1437 }
1438 }
1439
1440 for path in &changed {
1455 if !path.exists() && ctx.lsp_clear_diagnostics_for_file(path) {
1456 status_changed = true;
1457 }
1458 }
1459
1460 if !semantic_refresh_paths.is_empty() {
1461 let sent = ctx.semantic_refresh_sender().is_some_and(|sender| {
1462 sender
1463 .send(SemanticRefreshRequest::Files {
1464 paths: semantic_refresh_paths.clone(),
1465 })
1466 .is_ok()
1467 });
1468 if !sent {
1469 aft::slog_warn!(
1470 "semantic refresh worker unavailable; dropping {} refreshing file(s)",
1471 semantic_refresh_paths.len()
1472 );
1473 let mut status = ctx
1474 .semantic_index_status()
1475 .write()
1476 .unwrap_or_else(std::sync::PoisonError::into_inner);
1477 for path in &semantic_refresh_paths {
1478 status.cancel_refreshing_file(path);
1479 }
1480 status_changed = true;
1481 }
1482 }
1483
1484 aft::slog_info!("invalidated {} files", changed.len());
1485 if status_changed {
1486 ctx.status_emitter().signal(ctx.build_status_snapshot());
1487 }
1488 ctx.tick_tier2_refresh_scheduler(scheduler_changed_path_count);
1489}
1490
1491pub fn drain_lsp_events(ctx: &AppContext) {
1492 let drained = {
1493 let mut lsp = ctx.lsp();
1494 lsp.drain_events()
1495 };
1496 let mut status_changed = drained.diagnostics_changed;
1497 for event in drained.events {
1498 match event {
1499 LspEvent::Notification {
1500 server_kind,
1501 root,
1502 method,
1503 params,
1504 } => {
1505 log::debug!(
1506 "[aft-lsp] notification {:?} {} {} {}",
1507 server_kind,
1508 root.display(),
1509 method,
1510 params.unwrap_or(serde_json::Value::Null)
1511 );
1512 }
1513 LspEvent::ServerRequest {
1514 server_kind,
1515 root,
1516 id,
1517 method,
1518 params,
1519 } => {
1520 log::debug!(
1521 "[aft-lsp] request {:?} {} {:?} {} {}",
1522 server_kind,
1523 root.display(),
1524 id,
1525 method,
1526 params.unwrap_or(serde_json::Value::Null)
1527 );
1528 }
1529 LspEvent::ServerExited { server_kind, root } => {
1530 aft::slog_info!("exited {:?} {}", server_kind, root.display());
1531 status_changed = true;
1532 }
1533 }
1534 }
1535 if status_changed {
1536 ctx.status_emitter().signal(ctx.build_status_snapshot());
1537 }
1538}