1use crate as aft;
2use crate::context::{
3 AppContext, SemanticIndexEvent, SemanticIndexStatus, SemanticRefreshEvent,
4 SemanticRefreshRequest,
5};
6use crate::log_ctx;
7use crate::lsp::client::LspEvent;
8use crate::protocol::PushFrame;
9use crate::watcher_filter::{watcher_path_is_infra_skip, WatcherDispatchEvent};
10use std::collections::HashSet;
11use std::path::Path;
12use std::sync::Arc;
13use std::thread;
14use std::time::Duration;
15
16pub fn drain_configure_warning_events(ctx: &AppContext) {
17 for (generation, frame) in ctx.drain_configure_warnings() {
18 if ctx.configure_generation() != generation {
19 aft::slog_info!(
20 "dropping stale configure_warnings for generation {} (current {})",
21 generation,
22 ctx.configure_generation()
23 );
24 continue;
25 }
26
27 if let Some(sender) = ctx.progress_sender_handle() {
28 sender(PushFrame::ConfigureWarnings(frame));
29 }
30 }
31}
32
33pub fn drain_inspect_events(ctx: &AppContext) {
34 let drained = ctx.inspect_manager().drain_completions();
35 let reuse_completed = ctx.take_new_reuse_completions();
40 if drained > 0 || reuse_completed {
45 if let Some(project_root) = ctx.config().project_root.clone() {
46 let (dead_code, unused_exports, duplicates) = ctx
47 .inspect_manager()
48 .latest_tier2_counts(ctx.inspect_dir(), project_root);
49 let stale = ctx.inspect_manager().tier2_any_in_flight();
55 ctx.update_status_bar_tier2(dead_code, unused_exports, duplicates, None, stale);
56 ctx.status_emitter().signal(ctx.build_status_snapshot());
64 }
65 }
66}
67
/// Drains every background index-build channel in one pass.
///
/// Delegates, in order, to `drain_search_index_events`,
/// `drain_callgraph_store_events`, and `drain_semantic_index_events`.
pub fn drain_build_completions(ctx: &AppContext) {
    drain_search_index_events(ctx);
    drain_callgraph_store_events(ctx);
    drain_semantic_index_events(ctx);
}
77
78pub fn any_build_in_flight(ctx: &AppContext) -> bool {
83 {
84 let rx = ctx
85 .search_index_rx()
86 .read()
87 .unwrap_or_else(std::sync::PoisonError::into_inner);
88 if rx.is_some() {
89 return true;
90 }
91 }
92
93 {
94 let rx = ctx.callgraph_store_rx().lock();
95 if rx.is_some() {
96 return true;
97 }
98 }
99
100 {
101 let rx = ctx.semantic_index_rx().lock();
102 rx.is_some()
103 }
104}
105
106pub fn watcher_path_is_ignored_by_current_matcher(ctx: &AppContext, path: &Path) -> bool {
107 if watcher_path_is_infra_skip(path) {
108 return true;
109 }
110
111 if let Some(matcher) = ctx.gitignore() {
112 if path.starts_with(matcher.path()) {
113 let is_dir = path.is_dir();
114 return matcher
115 .matched_path_or_any_parents(path, is_dir)
116 .is_ignore();
117 }
118 }
119
120 false
121}
122
123fn replay_search_index_pending_updates(
124 ctx: &AppContext,
125 index: &mut crate::search_index::SearchIndex,
126 pending_paths: Vec<std::path::PathBuf>,
127) {
128 for path in pending_paths {
129 if path.exists() {
130 if watcher_path_is_ignored_by_current_matcher(ctx, &path) {
131 index.remove_file(&path);
132 } else {
133 index.update_file(&path);
134 }
135 } else {
136 index.remove_file(&path);
137 }
138 }
139}
140
/// Thin wrapper: forwards `path` to
/// `semantic_index::is_semantic_indexed_extension` and returns its verdict.
pub fn watcher_path_is_semantic_source(path: &Path) -> bool {
    crate::semantic_index::is_semantic_indexed_extension(path)
}
144
/// Records a successful semantic corpus refresh on the context.
///
/// Calls `clear_all_semantic_refresh_retry_attempts` and then
/// `reset_semantic_refresh_circuit_after_success` on `ctx`.
pub fn mark_semantic_corpus_refresh_success(ctx: &AppContext) {
    ctx.clear_all_semantic_refresh_retry_attempts();
    ctx.reset_semantic_refresh_circuit_after_success();
}
149
150pub fn drain_search_index_events(ctx: &AppContext) {
151 let (latest, disconnected) = {
152 let rx_ref = ctx
153 .search_index_rx()
154 .read()
155 .unwrap_or_else(std::sync::PoisonError::into_inner);
156 let Some(rx) = rx_ref.as_ref() else {
157 return;
158 };
159
160 let mut latest = None;
161 let mut disconnected = false;
162 loop {
163 match rx.try_recv() {
164 Ok(index) => latest = Some(index),
165 Err(crossbeam_channel::TryRecvError::Empty) => break,
166 Err(crossbeam_channel::TryRecvError::Disconnected) => {
167 disconnected = true;
168 break;
169 }
170 }
171 }
172 (latest, disconnected)
173 };
174
175 let mut status_changed = false;
176 let mut installed_index = false;
177 if let Some(mut index) = latest {
178 let pending_paths = ctx.take_pending_search_index_paths();
179 if !pending_paths.is_empty() {
180 replay_search_index_pending_updates(ctx, &mut index, pending_paths);
181 }
182 *ctx.search_index()
183 .write()
184 .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(index);
185 installed_index = true;
186 status_changed = true;
187 }
188
189 if disconnected || installed_index {
190 *ctx.search_index_rx()
191 .write()
192 .unwrap_or_else(std::sync::PoisonError::into_inner) = None;
193 if disconnected && !installed_index {
194 let _ = ctx.take_pending_search_index_paths();
195 }
196 status_changed = true;
197 }
198
199 if status_changed {
200 ctx.status_emitter().signal(ctx.build_status_snapshot());
201 }
202}
203
204pub fn drain_callgraph_store_events(ctx: &AppContext) {
210 let (latest, disconnected) = {
211 let rx_ref = ctx.callgraph_store_rx().lock();
212 let Some(rx) = rx_ref.as_ref() else {
213 return;
214 };
215
216 let mut latest = None;
217 let mut disconnected = false;
218 loop {
219 match rx.try_recv() {
220 Ok(store) => latest = Some(store),
221 Err(crossbeam_channel::TryRecvError::Empty) => break,
222 Err(crossbeam_channel::TryRecvError::Disconnected) => {
223 disconnected = true;
224 break;
225 }
226 }
227 }
228 (latest, disconnected)
229 };
230
231 let mut status_changed = false;
232 let mut installed = false;
233 if let Some(store) = latest {
234 let pending = ctx.take_pending_callgraph_store_paths();
237 if !pending.is_empty() {
238 if let Err(error) = store.refresh_files(&pending) {
239 crate::slog_warn!(
240 "callgraph store post-build pending refresh failed: {}",
241 error
242 );
243 if let Err(mark_error) = store.mark_files_stale(&pending) {
244 crate::slog_warn!(
245 "failed to mark callgraph store files stale after post-build refresh failure: {}",
246 mark_error
247 );
248 }
249 }
250 }
251 *ctx.callgraph_store()
252 .write()
253 .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(Arc::new(store));
254 installed = true;
255 status_changed = true;
256 }
257
258 if disconnected || installed {
259 *ctx.callgraph_store_rx().lock() = None;
260 if disconnected && !installed {
261 let _ = ctx.take_pending_callgraph_store_paths();
264 }
265 status_changed = true;
266 }
267
268 if status_changed {
269 ctx.status_emitter().signal(ctx.build_status_snapshot());
270 }
271}
272
/// Processes every queued event from the semantic index build worker.
///
/// Returns immediately when no receiver is installed, or when nothing was
/// queued and the channel is still connected. Otherwise each event updates the
/// context:
///
/// * `Progress` sets the status to `Building` with the reported counters.
/// * `ColdSeedGateCleared` resumes deferred work on the context.
/// * `Ready` installs the index, sets the status to ready, and collects the
///   paths / corpus-refresh flag that were queued during the build for replay.
/// * `Failed` clears the index and refresh worker and sets the status to
///   `Failed`.
///
/// A disconnect without a `Ready`/`Failed` event is handled like a failure.
/// After the events, a queued corpus refresh (or, failing that, the queued
/// file refreshes) is sent to the refresh worker, and a status snapshot is
/// signalled if anything changed.
pub fn drain_semantic_index_events(ctx: &AppContext) {
    // Collect events while holding the receiver lock; the guard is dropped at
    // the end of this block, before the receiver slot is written further down.
    let (events, disconnected) = {
        let rx_ref = ctx.semantic_index_rx().lock();
        let Some(rx) = rx_ref.as_ref() else {
            return;
        };

        let mut events = Vec::new();
        let mut disconnected = false;
        loop {
            match rx.try_recv() {
                Ok(event) => events.push(event),
                Err(crossbeam_channel::TryRecvError::Empty) => break,
                Err(crossbeam_channel::TryRecvError::Disconnected) => {
                    disconnected = true;
                    break;
                }
            }
        }
        (events, disconnected)
    };

    if events.is_empty() && !disconnected {
        return;
    }

    // `keep_receiver` flips to false once the build reaches a terminal state.
    let mut keep_receiver = true;
    let mut status_changed = false;
    let mut replay_refresh_paths = Vec::new();
    let mut replay_corpus_refresh = false;
    for event in events {
        match event {
            SemanticIndexEvent::Progress {
                stage,
                files,
                entries_done,
                entries_total,
            } => {
                *ctx.semantic_index_status()
                    .write()
                    .unwrap_or_else(std::sync::PoisonError::into_inner) =
                    SemanticIndexStatus::Building {
                        stage,
                        files,
                        entries_done,
                        entries_total,
                    };
                status_changed = true;
            }
            SemanticIndexEvent::ColdSeedGateCleared => {
                ctx.resume_deferred_work_after_semantic_cold_seed_gate_cleared();
            }
            SemanticIndexEvent::Ready(mut index) => {
                mark_semantic_corpus_refresh_success(ctx);
                // Paths touched during the build are invalidated in the fresh
                // index and remembered so they can be re-sent to the refresh
                // worker below. Non-semantic paths are dropped here.
                let pending_paths = ctx.take_pending_semantic_index_paths();
                for path in pending_paths {
                    if watcher_path_is_semantic_source(&path) {
                        index.invalidate_file(&path);
                        replay_refresh_paths.push(path);
                    }
                }
                replay_corpus_refresh = ctx.take_pending_semantic_corpus_refresh();
                *ctx.semantic_index()
                    .write()
                    .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(index);
                *ctx.semantic_index_status()
                    .write()
                    .unwrap_or_else(std::sync::PoisonError::into_inner) =
                    SemanticIndexStatus::ready();
                keep_receiver = false;
                status_changed = true;
                ctx.clear_semantic_cold_seed_gate_and_resume_deferred_work();
            }
            SemanticIndexEvent::Failed(error) => {
                // Queued work is discarded: there is no index to apply it to.
                let _ = ctx.take_pending_semantic_index_paths();
                let _ = ctx.take_pending_semantic_corpus_refresh();
                *ctx.semantic_index()
                    .write()
                    .unwrap_or_else(std::sync::PoisonError::into_inner) = None;
                ctx.clear_semantic_refresh_worker();
                *ctx.semantic_index_status()
                    .write()
                    .unwrap_or_else(std::sync::PoisonError::into_inner) =
                    SemanticIndexStatus::Failed(error);
                keep_receiver = false;
                status_changed = true;
                ctx.clear_semantic_cold_seed_gate_and_resume_deferred_work();
            }
        }
    }

    // The worker went away without reporting `Ready` or `Failed`: apply the
    // same cleanup as the `Failed` arm, with a synthetic error message.
    if disconnected && keep_receiver {
        let _ = ctx.take_pending_semantic_index_paths();
        let _ = ctx.take_pending_semantic_corpus_refresh();
        *ctx.semantic_index()
            .write()
            .unwrap_or_else(std::sync::PoisonError::into_inner) = None;
        ctx.clear_semantic_refresh_worker();
        *ctx.semantic_index_status()
            .write()
            .unwrap_or_else(std::sync::PoisonError::into_inner) = SemanticIndexStatus::Failed(
            "semantic index build worker disconnected before reporting completion".to_string(),
        );
        keep_receiver = false;
        status_changed = true;
        ctx.clear_semantic_cold_seed_gate_and_resume_deferred_work();
    }

    if !keep_receiver {
        *ctx.semantic_index_rx().lock() = None;
    }

    // Only a `Ready` event can set `replay_corpus_refresh` or fill
    // `replay_refresh_paths`. A corpus refresh takes precedence over replaying
    // individual files; note the file replay is skipped whenever the corpus
    // flag is set, even if no canonical cache root is available.
    if replay_corpus_refresh {
        if ctx.canonical_cache_root_opt().is_some() {
            *ctx.semantic_index_status()
                .write()
                .unwrap_or_else(std::sync::PoisonError::into_inner) =
                SemanticIndexStatus::Building {
                    stage: "refreshing_corpus".to_string(),
                    files: None,
                    entries_done: None,
                    entries_total: None,
                };
            let sent = ctx
                .semantic_refresh_sender()
                .is_some_and(|sender| sender.send(SemanticRefreshRequest::Corpus).is_ok());
            if !sent {
                *ctx.semantic_index_status()
                    .write()
                    .unwrap_or_else(std::sync::PoisonError::into_inner) =
                    SemanticIndexStatus::Failed(
                        "semantic corpus refresh worker unavailable".to_string(),
                    );
            }
            status_changed = true;
        }
    } else if !replay_refresh_paths.is_empty() {
        // Mark the files as refreshing first; the write guard is scoped so it
        // is released before the request is sent.
        {
            let mut status = ctx
                .semantic_index_status()
                .write()
                .unwrap_or_else(std::sync::PoisonError::into_inner);
            if matches!(&*status, SemanticIndexStatus::Ready { .. }) {
                for path in &replay_refresh_paths {
                    status.add_refreshing_file(path.clone());
                }
                status_changed = true;
            }
        }
        let sent = ctx.semantic_refresh_sender().is_some_and(|sender| {
            sender
                .send(SemanticRefreshRequest::Files {
                    paths: replay_refresh_paths.clone(),
                })
                .is_ok()
        });
        if !sent {
            // No worker to take the request: undo the refreshing markers.
            crate::slog_warn!(
                "semantic refresh worker unavailable; dropping {} replayed file(s)",
                replay_refresh_paths.len()
            );
            let mut status = ctx
                .semantic_index_status()
                .write()
                .unwrap_or_else(std::sync::PoisonError::into_inner);
            for path in &replay_refresh_paths {
                status.cancel_refreshing_file(path);
            }
            status_changed = true;
        }
    }

    if status_changed {
        ctx.status_emitter().signal(ctx.build_status_snapshot());
    }
}
455
/// Per-path retry budget: `next_semantic_refresh_retry_plan` stops scheduling
/// retries for a path once its recorded attempt count reaches this value.
pub const MAX_RETRY_ATTEMPTS: usize = 6;
/// Threshold passed to `AppContext::record_semantic_refresh_transient_failure`.
/// NOTE(review): presumably the number of transient failures that opens the
/// refresh circuit breaker — confirm against `AppContext`.
pub const BREAKER_TRIP_THRESHOLD: usize = 3;
458
/// Returns how long to wait before retry number `attempt` (zero-based).
///
/// When `AFT_SEMANTIC_RETRY_BACKOFF_MS` is set to a valid `u64`, that many
/// milliseconds is used for every attempt. Otherwise the delay follows a
/// 15s / 30s / 60s schedule, with every attempt past the end of the schedule
/// using the final entry.
fn semantic_refresh_retry_backoff(attempt: usize) -> Duration {
    const SCHEDULE_SECS: [u64; 3] = [15, 30, 60];

    let override_ms = std::env::var("AFT_SEMANTIC_RETRY_BACKOFF_MS")
        .ok()
        .and_then(|raw| raw.parse::<u64>().ok());

    match override_ms {
        Some(ms) => Duration::from_millis(ms),
        None => {
            // Clamp to the last slot so late attempts reuse the longest delay.
            let slot = attempt.min(SCHEDULE_SECS.len() - 1);
            Duration::from_secs(SCHEDULE_SECS[slot])
        }
    }
}
477
/// Result of planning a retry for a batch of transiently failed refresh paths,
/// as produced by `next_semantic_refresh_retry_plan`.
struct SemanticRefreshRetryPlan {
    /// Paths still below `MAX_RETRY_ATTEMPTS`; their attempt counters have
    /// already been incremented.
    retry_paths: Vec<std::path::PathBuf>,
    /// Paths whose attempt counter had already reached `MAX_RETRY_ATTEMPTS`.
    capped_paths: Vec<std::path::PathBuf>,
    /// Backoff to wait before resending `retry_paths`; `None` when
    /// `retry_paths` is empty.
    delay: Option<Duration>,
}
483
484fn next_semantic_refresh_retry_plan(
485 ctx: &AppContext,
486 paths: Vec<std::path::PathBuf>,
487) -> SemanticRefreshRetryPlan {
488 let mut retry_paths = Vec::new();
489 let mut capped_paths = Vec::new();
490 let mut max_attempt = 0usize;
491
492 ctx.with_semantic_refresh_retry_attempts_mut(|attempts| {
493 for path in paths {
494 let attempt = attempts.get(&path).copied().unwrap_or(0);
495 if attempt >= MAX_RETRY_ATTEMPTS {
496 capped_paths.push(path);
497 continue;
498 }
499 max_attempt = max_attempt.max(attempt);
500 attempts.insert(path.clone(), attempt.saturating_add(1));
501 retry_paths.push(path);
502 }
503 });
504
505 let delay = if retry_paths.is_empty() {
506 None
507 } else {
508 Some(semantic_refresh_retry_backoff(max_attempt))
509 };
510
511 SemanticRefreshRetryPlan {
512 retry_paths,
513 capped_paths,
514 delay,
515 }
516}
517
/// Forwards to `AppContext::clear_semantic_refresh_retry_attempts` for `paths`.
fn clear_semantic_refresh_retry_attempts(ctx: &AppContext, paths: &[std::path::PathBuf]) {
    ctx.clear_semantic_refresh_retry_attempts(paths);
}
521
522fn clear_completed_pending_semantic_index_paths(
523 ctx: &AppContext,
524 completed_paths: &[std::path::PathBuf],
525) {
526 if completed_paths.is_empty() {
527 return;
528 }
529
530 let completed = completed_paths.iter().cloned().collect::<HashSet<_>>();
531 let remaining = ctx
532 .take_pending_semantic_index_paths()
533 .into_iter()
534 .filter(|path| !completed.contains(path))
535 .collect::<Vec<_>>();
536 if !remaining.is_empty() {
537 ctx.add_pending_semantic_index_paths(remaining);
538 }
539}
540
/// Delay used when scheduling the semantic refresh probe.
///
/// Asks `semantic_refresh_retry_backoff` for attempt `usize::MAX`, which falls
/// past the end of its schedule and therefore yields the final schedule entry
/// (or the environment override, when one is set).
fn semantic_refresh_probe_delay() -> Duration {
    semantic_refresh_retry_backoff(usize::MAX)
}
544
/// Returns the context's answer to whether the semantic refresh circuit is
/// currently open (forwards to `AppContext::semantic_refresh_circuit_is_open`).
pub fn semantic_refresh_circuit_is_open(ctx: &AppContext) -> bool {
    ctx.semantic_refresh_circuit_is_open()
}
548
/// Records one transient refresh failure on the context, using
/// `BREAKER_TRIP_THRESHOLD` as the trip threshold, and returns the context's
/// result.
///
/// `drain_semantic_refresh_events` treats a `true` result as "do not schedule
/// a timed retry": it parks the failed paths as pending and schedules the
/// probe instead.
pub fn record_semantic_refresh_transient_failure(ctx: &AppContext) -> bool {
    ctx.record_semantic_refresh_transient_failure(BREAKER_TRIP_THRESHOLD)
}
552
/// Forwards to `AppContext::reset_semantic_refresh_transient_failure_count`.
fn reset_semantic_refresh_transient_failure_count(ctx: &AppContext) {
    ctx.reset_semantic_refresh_transient_failure_count();
}
556
/// Forwards to `AppContext::reset_semantic_refresh_circuit_after_success`.
fn reset_semantic_refresh_circuit_after_success(ctx: &AppContext) {
    ctx.reset_semantic_refresh_circuit_after_success();
}
560
/// Bookkeeping after a file-level semantic refresh succeeded.
///
/// For `completed_paths`: clears their retry attempt counters, removes them
/// from the pending semantic index paths, and then resets the refresh circuit.
fn mark_semantic_refresh_success(ctx: &AppContext, completed_paths: &[std::path::PathBuf]) {
    clear_semantic_refresh_retry_attempts(ctx, completed_paths);
    clear_completed_pending_semantic_index_paths(ctx, completed_paths);
    reset_semantic_refresh_circuit_after_success(ctx);
}
566
/// Test-only accessor: returns the context's current transient failure count.
#[doc(hidden)]
pub fn semantic_refresh_transient_failure_count_for_test(ctx: &AppContext) -> usize {
    ctx.semantic_refresh_transient_failure_count()
}
571
/// Test-only accessor: reports whether the context has a refresh probe
/// scheduled.
#[doc(hidden)]
pub fn semantic_refresh_probe_is_scheduled_for_test(ctx: &AppContext) -> bool {
    ctx.semantic_refresh_probe_is_scheduled()
}
576
/// Asks the context to make sure a refresh probe is scheduled, using
/// `semantic_refresh_probe_delay()` as the delay.
fn ensure_semantic_refresh_probe_scheduled(ctx: &AppContext) {
    ctx.ensure_semantic_refresh_probe_scheduled(semantic_refresh_probe_delay());
}
580
581fn maybe_fire_semantic_refresh_probe(ctx: &AppContext) {
582 if !ctx.take_semantic_refresh_probe_ready() {
583 return;
584 }
585 if !semantic_refresh_circuit_is_open(ctx) {
586 return;
587 }
588
589 let pending_paths = ctx.take_pending_semantic_index_paths();
590 if pending_paths.is_empty() {
591 return;
592 }
593
594 let sent = ctx.semantic_refresh_sender().is_some_and(|sender| {
595 sender
596 .send(SemanticRefreshRequest::Files {
597 paths: pending_paths.clone(),
598 })
599 .is_ok()
600 });
601 if !sent {
602 ctx.add_pending_semantic_index_paths(pending_paths);
603 }
604}
605
606pub fn schedule_semantic_refresh_retry(
607 ctx: &AppContext,
608 paths: Vec<std::path::PathBuf>,
609 error: &str,
610) -> bool {
611 if paths.is_empty() {
612 return false;
613 }
614 let Some(sender) = ctx.semantic_refresh_sender() else {
615 return false;
616 };
617
618 let SemanticRefreshRetryPlan {
619 retry_paths,
620 capped_paths,
621 delay,
622 } = next_semantic_refresh_retry_plan(ctx, paths);
623
624 if !capped_paths.is_empty() {
625 aft::slog_warn!(
626 "semantic refresh retry limit reached for {} file(s); preserving for next watcher/configure refresh",
627 capped_paths.len(),
628 );
629 ctx.add_pending_semantic_index_paths(capped_paths);
630 }
631
632 let Some(delay) = delay else {
633 return true;
634 };
635
636 let clean = aft::semantic_index::strip_transient_embedding_marker(error);
637 aft::slog_warn!(
638 "semantic refresh hit a transient backend error ({}); retrying {} file(s) in {}ms",
639 clean,
640 retry_paths.len(),
641 delay.as_millis(),
642 );
643
644 let session_id = log_ctx::current_session();
645 thread::spawn(move || {
646 log_ctx::with_session(session_id, || {
647 thread::sleep(delay);
648 let _ = sender.send(SemanticRefreshRequest::Files { paths: retry_paths });
649 });
650 });
651 true
652}
653
/// Processes every queued event from the semantic refresh worker.
///
/// Returns immediately when no event receiver is installed. When nothing is
/// queued and the channel is still connected, only the refresh probe is given
/// a chance to fire. Otherwise each event updates the index, status, retry
/// bookkeeping, and pending paths as described on the match arms below; a
/// disconnect clears the refresh worker and cancels in-progress file markers.
/// Finally, paths collected for replay after a corpus refresh are sent to the
/// worker, the probe is given a chance to fire, and a status snapshot is
/// signalled if anything changed.
pub fn drain_semantic_refresh_events(ctx: &AppContext) {
    // Collect events while holding the receiver lock; the guard is dropped at
    // the end of this block.
    let (events, disconnected) = {
        let rx_ref = ctx.semantic_refresh_event_rx().lock();
        let Some(rx) = rx_ref.as_ref() else {
            return;
        };

        let mut events = Vec::new();
        let mut disconnected = false;
        loop {
            match rx.try_recv() {
                Ok(event) => events.push(event),
                Err(crossbeam_channel::TryRecvError::Empty) => break,
                Err(crossbeam_channel::TryRecvError::Disconnected) => {
                    disconnected = true;
                    break;
                }
            }
        }
        (events, disconnected)
    };

    if events.is_empty() && !disconnected {
        maybe_fire_semantic_refresh_probe(ctx);
        return;
    }

    let had_events = !events.is_empty();
    let mut status_changed = false;
    let mut replay_refresh_paths = Vec::new();
    for event in events {
        match event {
            // File refresh began: mark the files as refreshing, but only while
            // the status is `Ready`.
            SemanticRefreshEvent::Started { paths } => {
                let mut status = ctx
                    .semantic_index_status()
                    .write()
                    .unwrap_or_else(std::sync::PoisonError::into_inner);
                if matches!(&*status, SemanticIndexStatus::Ready { .. }) {
                    for path in paths {
                        status.start_refreshing_file(path);
                    }
                    status_changed = true;
                }
            }
            // Corpus refresh began: status becomes `Building` with the
            // "refreshing_corpus" stage and the reported file count.
            SemanticRefreshEvent::CorpusStarted { files } => {
                *ctx.semantic_index_status()
                    .write()
                    .unwrap_or_else(std::sync::PoisonError::into_inner) =
                    SemanticIndexStatus::Building {
                        stage: "refreshing_corpus".to_string(),
                        files: Some(files),
                        entries_done: None,
                        entries_total: None,
                    };
                status_changed = true;
            }
            // File refresh finished: apply the update to the installed index
            // (if there is one), record the success, and clear the markers.
            SemanticRefreshEvent::Completed {
                added_entries,
                updated_metadata,
                completed_paths,
            } => {
                if let Some(index) = ctx
                    .semantic_index()
                    .write()
                    .unwrap_or_else(std::sync::PoisonError::into_inner)
                    .as_mut()
                {
                    index.apply_refresh_update(added_entries, updated_metadata, &completed_paths);
                }
                mark_semantic_refresh_success(ctx, &completed_paths);
                let mut status = ctx
                    .semantic_index_status()
                    .write()
                    .unwrap_or_else(std::sync::PoisonError::into_inner);
                if matches!(&*status, SemanticIndexStatus::Ready { .. }) {
                    for path in &completed_paths {
                        status.complete_refreshing_file(path);
                    }
                    status_changed = true;
                }
            }
            // Corpus refresh finished: install the new index and replay paths
            // that changed while it was being built.
            SemanticRefreshEvent::CorpusCompleted {
                mut index,
                changed,
                added,
                deleted,
                total_processed,
            } => {
                aft::runtime_drain::mark_semantic_corpus_refresh_success(ctx);
                if changed > 0 || added > 0 || deleted > 0 {
                    aft::slog_info!(
                        "semantic corpus refresh completed: {} changed, {} new, {} deleted, {} total processed",
                        changed,
                        added,
                        deleted,
                        total_processed
                    );
                }
                let pending_paths = ctx.take_pending_semantic_index_paths();
                for path in pending_paths {
                    if !aft::runtime_drain::watcher_path_is_semantic_source(&path) {
                        continue;
                    }
                    // Every semantic-source path is invalidated, but only the
                    // ones not currently ignored are queued for a re-refresh.
                    index.invalidate_file(&path);
                    if !aft::runtime_drain::watcher_path_is_ignored_by_current_matcher(ctx, &path) {
                        replay_refresh_paths.push(path);
                    }
                }
                *ctx.semantic_index()
                    .write()
                    .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(index);
                *ctx.semantic_index_status()
                    .write()
                    .unwrap_or_else(std::sync::PoisonError::into_inner) =
                    SemanticIndexStatus::ready();
                status_changed = true;
            }
            SemanticRefreshEvent::Failed { paths, error } => {
                if aft::semantic_index::embedding_failure_is_transient(&error) {
                    // Transient failure: either park the paths for the probe
                    // (when recording the failure returns true) or schedule a
                    // timed retry; if scheduling is not possible the paths are
                    // kept as pending.
                    if record_semantic_refresh_transient_failure(ctx) {
                        ctx.add_pending_semantic_index_paths(paths);
                        ensure_semantic_refresh_probe_scheduled(ctx);
                    } else if !schedule_semantic_refresh_retry(ctx, paths.clone(), &error) {
                        aft::slog_warn!(
                            "semantic refresh worker unavailable; preserving {} transiently failed file(s) for retry",
                            paths.len(),
                        );
                        ctx.add_pending_semantic_index_paths(paths);
                    }
                } else {
                    // Non-transient failure: give up on these paths, reset the
                    // retry bookkeeping, and clear their refreshing markers.
                    aft::slog_warn!("semantic refresh failed: {}", error);
                    reset_semantic_refresh_transient_failure_count(ctx);
                    clear_semantic_refresh_retry_attempts(ctx, &paths);
                    let mut status = ctx
                        .semantic_index_status()
                        .write()
                        .unwrap_or_else(std::sync::PoisonError::into_inner);
                    if matches!(&*status, SemanticIndexStatus::Ready { .. }) {
                        for path in &paths {
                            status.complete_refreshing_file(path);
                        }
                        status_changed = true;
                    }
                }
            }
            SemanticRefreshEvent::CorpusFailed { error } => {
                if aft::semantic_index::embedding_failure_is_transient(&error) {
                    let clean = aft::semantic_index::strip_transient_embedding_marker(&error);
                    let has_index = ctx
                        .semantic_index()
                        .read()
                        .unwrap_or_else(std::sync::PoisonError::into_inner)
                        .is_some();
                    if has_index {
                        // An index is already installed: keep serving it and
                        // return the status to ready.
                        aft::slog_warn!(
                            "semantic corpus refresh hit a transient backend error ({}); keeping the existing index",
                            clean,
                        );
                        *ctx.semantic_index_status()
                            .write()
                            .unwrap_or_else(std::sync::PoisonError::into_inner) =
                            SemanticIndexStatus::ready();
                    } else {
                        aft::slog_warn!("semantic corpus refresh failed: {}", clean);
                        *ctx.semantic_index_status()
                            .write()
                            .unwrap_or_else(std::sync::PoisonError::into_inner) =
                            SemanticIndexStatus::Failed(clean);
                    }
                    status_changed = true;
                } else {
                    // Non-transient corpus failure: drop the pending paths and
                    // the installed index, and report the failure.
                    aft::slog_warn!("semantic corpus refresh failed: {}", error);
                    let _ = ctx.take_pending_semantic_index_paths();
                    *ctx.semantic_index()
                        .write()
                        .unwrap_or_else(std::sync::PoisonError::into_inner) = None;
                    *ctx.semantic_index_status()
                        .write()
                        .unwrap_or_else(std::sync::PoisonError::into_inner) =
                        SemanticIndexStatus::Failed(error);
                    status_changed = true;
                }
            }
        }
    }

    if disconnected {
        ctx.clear_semantic_refresh_worker();
        // Snapshot the refreshing list under a read lock, then cancel each
        // entry under a separate write lock.
        let refreshing_paths = {
            let status = ctx
                .semantic_index_status()
                .read()
                .unwrap_or_else(std::sync::PoisonError::into_inner);
            match &*status {
                SemanticIndexStatus::Ready { refreshing, .. } => refreshing.clone(),
                _ => Vec::new(),
            }
        };
        if !refreshing_paths.is_empty() {
            let mut status = ctx
                .semantic_index_status()
                .write()
                .unwrap_or_else(std::sync::PoisonError::into_inner);
            for path in &refreshing_paths {
                status.cancel_refreshing_file(path);
            }
        }
        if !refreshing_paths.is_empty() || had_events {
            status_changed = true;
        }
    }

    // Only `CorpusCompleted` fills `replay_refresh_paths`.
    if !replay_refresh_paths.is_empty() {
        // Mark the files as refreshing first; the write guard is scoped so it
        // is released before the request is sent.
        {
            let mut status = ctx
                .semantic_index_status()
                .write()
                .unwrap_or_else(std::sync::PoisonError::into_inner);
            if matches!(&*status, SemanticIndexStatus::Ready { .. }) {
                for path in &replay_refresh_paths {
                    status.add_refreshing_file(path.clone());
                }
                status_changed = true;
            }
        }
        let sent = ctx.semantic_refresh_sender().is_some_and(|sender| {
            sender
                .send(SemanticRefreshRequest::Files {
                    paths: replay_refresh_paths.clone(),
                })
                .is_ok()
        });
        if !sent {
            // No worker to take the request: undo the refreshing markers.
            aft::slog_warn!(
                "semantic refresh worker unavailable; dropping {} replayed corpus file(s)",
                replay_refresh_paths.len()
            );
            let mut status = ctx
                .semantic_index_status()
                .write()
                .unwrap_or_else(std::sync::PoisonError::into_inner);
            for path in &replay_refresh_paths {
                status.cancel_refreshing_file(path);
            }
            status_changed = true;
        }
    }

    maybe_fire_semantic_refresh_probe(ctx);

    if status_changed {
        ctx.status_emitter().signal(ctx.build_status_snapshot());
    }
}
917
/// File extensions (without the leading dot) that `watcher_path_is_source`
/// accepts. The comparison there is an exact, case-sensitive match.
const SOURCE_EXTENSIONS: &[&str] = &[
    "ts", "tsx", "mts", "cts", "js", "jsx", "mjs", "cjs", "py", "pyi", "rs", "go",
];
922
/// NOTE(review): no consumer of this constant is visible in this part of the
/// file; presumably the largest watcher batch that is processed inline rather
/// than via a full rescan — confirm against the code that reads it.
pub const WATCHER_BATCH_INLINE_CAP: usize = 256;
924
/// Returns `true` when the file name of `path` is a TypeScript/JavaScript
/// project config: `tsconfig.json`, `jsconfig.json`, or any
/// `tsconfig.<anything>.json` / `jsconfig.<anything>.json` variant.
///
/// Paths without a file name, or whose file name is not valid UTF-8, yield
/// `false`.
pub fn watcher_path_is_tsconfig(path: &std::path::Path) -> bool {
    let Some(name) = path.file_name().and_then(|raw| raw.to_str()) else {
        return false;
    };

    // The plain `tsconfig.json` / `jsconfig.json` names satisfy both tests
    // below, so they need no separate equality check.
    let has_config_prefix = ["tsconfig.", "jsconfig."]
        .iter()
        .any(|prefix| name.starts_with(*prefix));
    has_config_prefix && name.ends_with(".json")
}
943
944pub fn watcher_path_is_source(path: &std::path::Path) -> bool {
945 path.extension()
946 .and_then(|ext| ext.to_str())
947 .is_some_and(|ext| SOURCE_EXTENSIONS.contains(&ext))
948}
949
/// Returns `true` when `parser::detect_language` recognizes a language for
/// `path`.
pub fn watcher_path_is_callgraph_indexed(path: &std::path::Path) -> bool {
    aft::parser::detect_language(path).is_some()
}
960
961pub fn semantic_corpus_refresh_in_progress(ctx: &AppContext) -> bool {
962 let status = ctx
963 .semantic_index_status()
964 .read()
965 .unwrap_or_else(std::sync::PoisonError::into_inner);
966 matches!(
967 &*status,
968 SemanticIndexStatus::Building { stage, .. } if stage == "refreshing_corpus"
969 )
970}
971
/// Debug-build hook that can stall publication of a rebuilt search index.
///
/// When `AFT_TEST_SEARCH_REBUILD_PUBLISH_DELAY_MS` holds a valid `u64`, the
/// calling thread sleeps for that many milliseconds; otherwise this returns
/// immediately.
#[cfg(debug_assertions)]
pub fn delay_search_rebuild_publish_for_debug() {
    let configured_ms = std::env::var("AFT_TEST_SEARCH_REBUILD_PUBLISH_DELAY_MS")
        .ok()
        .and_then(|raw| raw.parse::<u64>().ok());
    if let Some(delay_ms) = configured_ms {
        thread::sleep(Duration::from_millis(delay_ms));
    }
}

/// Release-build counterpart of the debug hook: does nothing.
#[cfg(not(debug_assertions))]
pub fn delay_search_rebuild_publish_for_debug() {}
985
/// Starts a background rebuild of the search index for `root`.
///
/// On the calling thread this marks the currently installed index (if any) as
/// not ready, installs a fresh receiver for the rebuilt index, and resets the
/// symbol cache. A spawned thread then builds the index, optionally persists
/// it, and sends it over the channel; `drain_search_index_events` is the
/// consumer that installs it.
pub fn spawn_search_corpus_refresh(
    ctx: &AppContext,
    root: std::path::PathBuf,
    config: Arc<aft::config::Config>,
) {
    // Scoped so the write guard is released before the receiver slot is
    // written below.
    {
        let mut search_index = ctx
            .search_index()
            .write()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        if let Some(index) = search_index.as_mut() {
            index.ready = false;
        }
    }

    let (tx, rx): (
        crossbeam_channel::Sender<aft::search_index::SearchIndex>,
        crossbeam_channel::Receiver<aft::search_index::SearchIndex>,
    ) = crossbeam_channel::unbounded();
    // Overwrites any previously installed receiver.
    *ctx.search_index_rx()
        .write()
        .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(rx);
    ctx.reset_symbol_cache();

    // Read on the calling thread: `ctx` is a borrow and is not moved into the
    // worker closure.
    let is_worktree_bridge = ctx.is_worktree_bridge();
    let session_id = log_ctx::current_session();
    thread::spawn(move || {
        log_ctx::with_session(session_id, || {
            let cache_dir =
                aft::search_index::resolve_cache_dir(&root, config.storage_dir.as_deref());
            // Worktree bridges neither take the cache lock nor write the cache
            // (see the matching check before `write_to_disk`). The binding is
            // kept alive until the end of the closure, so a successfully
            // acquired lock spans both the build and the disk write. A failed
            // acquisition is logged and the build proceeds without the lock.
            let _cache_lock = if is_worktree_bridge {
                None
            } else {
                match aft::search_index::CacheLock::acquire(&cache_dir) {
                    Ok(lock) => Some(lock),
                    Err(error) => {
                        aft::slog_warn!(
                            "failed to acquire search cache lock for ignore refresh: {}",
                            error
                        );
                        None
                    }
                }
            };
            let mut index = aft::search_index::SearchIndex::build_with_limit_to_cache_dir(
                &root,
                config.search_index_max_file_size,
                &cache_dir,
            );
            // Test hook; a no-op unless the debug delay variable is set.
            delay_search_rebuild_publish_for_debug();
            if !is_worktree_bridge {
                let head = index.stored_git_head().map(str::to_owned);
                index.write_to_disk(&cache_dir, head.as_deref());
            }
            // A send error means the receiver was dropped or replaced; the
            // result is intentionally discarded.
            let _ = tx.send(index);
        });
    });
}
1044
1045pub fn refresh_project_corpus(
1046 ctx: &AppContext,
1047 reason: &str,
1048 _invalidate_ignore_paths: bool,
1049) -> bool {
1050 let Some(root) = ctx.canonical_cache_root_opt() else {
1051 return false;
1052 };
1053 let config = ctx.config();
1054 let mut status_changed = false;
1055
1056 if !ctx.is_worktree_bridge() {
1057 let callgraph_store_resident = {
1079 let guard = ctx
1080 .callgraph_store()
1081 .read()
1082 .unwrap_or_else(std::sync::PoisonError::into_inner);
1083 guard.is_some()
1084 };
1085 if callgraph_store_resident || ctx.callgraph_store_rx().lock().is_some() {
1086 *ctx.callgraph_store()
1087 .write()
1088 .unwrap_or_else(std::sync::PoisonError::into_inner) = None;
1089 ctx.mark_callgraph_store_force_rebuild();
1090 status_changed = true;
1091 aft::slog_info!(
1092 "callgraph store scheduled for background rebuild after {}",
1093 reason
1094 );
1095 }
1096 }
1097
1098 if config.search_index {
1099 spawn_search_corpus_refresh(ctx, root.clone(), config.clone());
1100 status_changed = true;
1101 aft::slog_info!("started search index refresh after {}", reason);
1102 }
1103
1104 if config.semantic_search {
1105 if let Some(sender) = ctx.semantic_refresh_sender() {
1106 *ctx.semantic_index_status()
1107 .write()
1108 .unwrap_or_else(std::sync::PoisonError::into_inner) =
1109 SemanticIndexStatus::Building {
1110 stage: "refreshing_corpus".to_string(),
1111 files: None,
1112 entries_done: None,
1113 entries_total: None,
1114 };
1115 match sender.send(SemanticRefreshRequest::Corpus) {
1116 Ok(()) => {
1117 status_changed = true;
1118 }
1119 Err(error) => {
1120 *ctx.semantic_index_status()
1121 .write()
1122 .unwrap_or_else(std::sync::PoisonError::into_inner) =
1123 SemanticIndexStatus::Failed(format!(
1124 "semantic corpus refresh worker unavailable: {error}"
1125 ));
1126 status_changed = true;
1127 }
1128 }
1129 } else if ctx.semantic_index_rx().lock().is_some() {
1130 ctx.mark_pending_semantic_corpus_refresh();
1131 }
1132 }
1133
1134 status_changed
1135}
1136
/// Runs `refresh_project_corpus` with the reason `"ignore-rule change"` and
/// the ignore-path flag set, returning its result.
pub fn refresh_corpus_after_ignore_change(ctx: &AppContext) -> bool {
    refresh_project_corpus(ctx, "ignore-rule change", true)
}
1140
1141pub fn refresh_project_after_watcher_rescan(ctx: &AppContext) -> bool {
1142 if ctx.canonical_cache_root_opt().is_none() {
1143 return false;
1144 }
1145 ctx.clear_pending_index_updates();
1146 ctx.reset_symbol_cache();
1147 let _ = ctx.mark_status_bar_tier2_stale();
1148 ctx.clear_tsconfig_membership_cache();
1149 let mut status_changed = true;
1150
1151 status_changed |= refresh_project_corpus(ctx, "watcher overflow", false);
1152 status_changed
1153}
1154
1155pub fn refresh_callgraph_store_for_watcher(
1156 ctx: &AppContext,
1157 changed: &HashSet<std::path::PathBuf>,
1158) {
1159 if ctx.is_worktree_bridge() {
1160 return;
1161 }
1162 let source_paths = changed
1163 .iter()
1164 .filter(|path| watcher_path_is_callgraph_indexed(path))
1165 .cloned()
1166 .collect::<Vec<_>>();
1167 if source_paths.is_empty() {
1168 return;
1169 }
1170 ctx.revalidate_callgraph_store_generation();
1175 let store = {
1176 let guard = ctx
1177 .callgraph_store()
1178 .read()
1179 .unwrap_or_else(std::sync::PoisonError::into_inner);
1180 guard.as_ref().map(Arc::clone)
1181 };
1182 let Some(store) = store else {
1183 if ctx.callgraph_store_rx().lock().is_some() {
1188 ctx.add_pending_callgraph_store_paths(source_paths);
1189 }
1190 return;
1191 };
1192 if let Err(error) = store.refresh_files(&source_paths) {
1193 aft::slog_warn!("callgraph store refresh failed: {}", error);
1194 match store.mark_files_stale(&source_paths) {
1195 Ok(marked) => aft::slog_warn!(
1196 "marked {} callgraph store file(s) stale after refresh failure",
1197 marked.len()
1198 ),
1199 Err(mark_error) => aft::slog_warn!(
1200 "failed to mark callgraph store files stale after refresh failure: {}",
1201 mark_error
1202 ),
1203 }
1204 }
1205}
1206
1207pub fn drain_watcher_events(ctx: &AppContext) {
1213 let mut changed: HashSet<std::path::PathBuf> = HashSet::new();
1214 let mut ignore_file_changed = false;
1215 let mut rescan_required = false;
1216 let mut watcher_failed = None;
1217 let mut root_deleted = false;
1218
1219 {
1220 let rx_ref = ctx.watcher_rx().lock();
1221 let rx = match rx_ref.as_ref() {
1222 Some(rx) => rx,
1223 None => {
1224 ctx.tick_tier2_refresh_scheduler(0);
1225 return; }
1227 };
1228
1229 loop {
1230 match rx.try_recv() {
1231 Ok(WatcherDispatchEvent::Paths(paths)) => {
1232 if !rescan_required {
1233 changed.extend(paths);
1234 }
1235 }
1236 Ok(WatcherDispatchEvent::RescanRequired) => {
1237 rescan_required = true;
1238 changed.clear();
1239 }
1240 Ok(WatcherDispatchEvent::IgnoreRulesChanged { path }) => {
1241 ignore_file_changed = true;
1242 log::debug!(
1243 "watcher: ignore rules changed at {}, rebuilding matcher",
1244 path.display()
1245 );
1246 if !rescan_required {
1247 ctx.rebuild_gitignore();
1248 }
1249 }
1250 Ok(WatcherDispatchEvent::RootDeleted) => {
1251 root_deleted = true;
1252 break;
1253 }
1254 Ok(WatcherDispatchEvent::Error(error)) => {
1255 watcher_failed = Some(error);
1256 break;
1257 }
1258 Err(crossbeam_channel::TryRecvError::Empty) => break,
1259 Err(crossbeam_channel::TryRecvError::Disconnected) => {
1260 watcher_failed = Some("watcher channel disconnected".to_string());
1261 break;
1262 }
1263 }
1264 }
1265 }
1266
1267 let mut watcher_status_changed = false;
1268 if root_deleted {
1269 ctx.stop_watcher_runtime();
1270 let _ = ctx.add_degraded_reason("project_root_deleted".to_string());
1271 aft::slog_warn!(
1272 "project root deleted; dropping watcher to avoid delete-storm: {:?}",
1273 ctx.canonical_cache_root_opt()
1274 );
1275 watcher_status_changed = true;
1276 changed.clear();
1277 rescan_required = false;
1278 } else if let Some(error) = watcher_failed {
1279 ctx.stop_watcher_runtime();
1280 let _ = ctx.add_degraded_reason("watcher_unavailable".to_string());
1281 aft::slog_warn!(
1282 "file watcher unavailable; continuing without live external-change invalidation: {}",
1283 error
1284 );
1285 watcher_status_changed = true;
1286 rescan_required = false;
1287 }
1288
1289 let mut status_changed = watcher_status_changed;
1290 let mut project_corpus_refresh_requested = false;
1291 if rescan_required {
1292 aft::slog_warn!("watcher overflow: forcing project rescan");
1293 ctx.rebuild_gitignore();
1294 status_changed |= refresh_project_after_watcher_rescan(ctx);
1295 project_corpus_refresh_requested = true;
1296 changed.clear();
1297 } else if ignore_file_changed {
1298 status_changed |= refresh_corpus_after_ignore_change(ctx);
1299 project_corpus_refresh_requested = true;
1300 }
1301
1302 let scheduler_changed_path_count = if rescan_required {
1303 aft::inspect::tier2_scheduler::TIER2_REFRESH_STORM_PATH_THRESHOLD + 1
1304 } else if ignore_file_changed {
1305 changed.len().max(1)
1306 } else {
1307 changed.len()
1308 };
1309 if changed.is_empty() {
1310 if status_changed {
1311 ctx.status_emitter().signal(ctx.build_status_snapshot());
1312 }
1313 ctx.tick_tier2_refresh_scheduler(scheduler_changed_path_count);
1314 return;
1315 }
1316
1317 ctx.add_pending_tier2_paths(changed.iter().cloned());
1318
1319 if ctx.mark_status_bar_tier2_stale() {
1323 status_changed = true;
1324 }
1325
1326 if changed.iter().any(|path| watcher_path_is_tsconfig(path)) {
1331 ctx.clear_tsconfig_membership_cache();
1332 status_changed = true;
1333 }
1334
1335 let oversized_inline_batch = changed.len() > WATCHER_BATCH_INLINE_CAP;
1336 if oversized_inline_batch {
1337 aft::slog_warn!(
1338 "watcher batch of {} paths exceeds inline cap {}; scheduling corpus refresh",
1339 changed.len(),
1340 WATCHER_BATCH_INLINE_CAP
1341 );
1342 if !project_corpus_refresh_requested {
1343 status_changed |= refresh_project_corpus(ctx, "oversized watcher batch", false);
1344 }
1345 }
1346
1347 let search_build_in_progress = {
1348 let search_index_rx = ctx
1349 .search_index_rx()
1350 .read()
1351 .unwrap_or_else(std::sync::PoisonError::into_inner);
1352 search_index_rx.is_some()
1353 };
1354 if !oversized_inline_batch && search_build_in_progress {
1355 ctx.add_pending_search_index_paths(changed.iter().cloned());
1356 }
1357 let semantic_source_paths = changed
1358 .iter()
1359 .filter(|path| aft::runtime_drain::watcher_path_is_semantic_source(path))
1360 .cloned()
1361 .collect::<Vec<_>>();
1362 let semantic_build_in_progress = ctx.semantic_index_rx().lock().is_some();
1363 let semantic_corpus_refresh_in_progress = semantic_corpus_refresh_in_progress(ctx);
1364 if !oversized_inline_batch
1365 && (semantic_build_in_progress || semantic_corpus_refresh_in_progress)
1366 && !semantic_source_paths.is_empty()
1367 {
1368 ctx.add_pending_semantic_index_paths(semantic_source_paths.clone());
1369 }
1370
1371 if let Ok(mut symbol_cache) = ctx.symbol_cache().write() {
1372 for path in &changed {
1373 symbol_cache.invalidate(path);
1374 }
1375 }
1376
1377 let mut semantic_refresh_paths = Vec::new();
1378 if !oversized_inline_batch {
1379 refresh_callgraph_store_for_watcher(ctx, &changed);
1380
1381 {
1382 let mut index_ref = ctx
1383 .search_index()
1384 .write()
1385 .unwrap_or_else(std::sync::PoisonError::into_inner);
1386 if let Some(index) = index_ref.as_mut() {
1387 for path in &changed {
1388 if path.exists() {
1389 index.update_file(path);
1390 } else {
1391 index.remove_file(path);
1392 }
1393 }
1394 }
1395 }
1396
1397 let stale_paths = {
1398 let mut semantic_index_ref = ctx
1399 .semantic_index()
1400 .write()
1401 .unwrap_or_else(std::sync::PoisonError::into_inner);
1402 let mut stale_paths = Vec::new();
1403 if let Some(index) = semantic_index_ref.as_mut() {
1404 for path in &semantic_source_paths {
1405 index.invalidate_file(path);
1406 stale_paths.push(path.clone());
1407 }
1408 }
1409 stale_paths
1410 };
1411 if !stale_paths.is_empty() {
1412 let mut status = ctx
1413 .semantic_index_status()
1414 .write()
1415 .unwrap_or_else(std::sync::PoisonError::into_inner);
1416 if matches!(&*status, SemanticIndexStatus::Ready { .. }) {
1417 for path in &stale_paths {
1418 status.add_refreshing_file(path.clone());
1419 }
1420 semantic_refresh_paths = stale_paths;
1421 status_changed = true;
1422 }
1423 }
1424 }
1425
1426 for path in &changed {
1441 if !path.exists() && ctx.lsp_clear_diagnostics_for_file(path) {
1442 status_changed = true;
1443 }
1444 }
1445
1446 if !semantic_refresh_paths.is_empty() {
1447 let sent = ctx.semantic_refresh_sender().is_some_and(|sender| {
1448 sender
1449 .send(SemanticRefreshRequest::Files {
1450 paths: semantic_refresh_paths.clone(),
1451 })
1452 .is_ok()
1453 });
1454 if !sent {
1455 aft::slog_warn!(
1456 "semantic refresh worker unavailable; dropping {} refreshing file(s)",
1457 semantic_refresh_paths.len()
1458 );
1459 let mut status = ctx
1460 .semantic_index_status()
1461 .write()
1462 .unwrap_or_else(std::sync::PoisonError::into_inner);
1463 for path in &semantic_refresh_paths {
1464 status.cancel_refreshing_file(path);
1465 }
1466 status_changed = true;
1467 }
1468 }
1469
1470 aft::slog_info!("invalidated {} files", changed.len());
1471 if status_changed {
1472 ctx.status_emitter().signal(ctx.build_status_snapshot());
1473 }
1474 ctx.tick_tier2_refresh_scheduler(scheduler_changed_path_count);
1475}
1476
1477pub fn drain_lsp_events(ctx: &AppContext) {
1478 let drained = {
1479 let mut lsp = ctx.lsp();
1480 lsp.drain_events()
1481 };
1482 let mut status_changed = drained.diagnostics_changed;
1483 for event in drained.events {
1484 match event {
1485 LspEvent::Notification {
1486 server_kind,
1487 root,
1488 method,
1489 params,
1490 } => {
1491 log::debug!(
1492 "[aft-lsp] notification {:?} {} {} {}",
1493 server_kind,
1494 root.display(),
1495 method,
1496 params.unwrap_or(serde_json::Value::Null)
1497 );
1498 }
1499 LspEvent::ServerRequest {
1500 server_kind,
1501 root,
1502 id,
1503 method,
1504 params,
1505 } => {
1506 log::debug!(
1507 "[aft-lsp] request {:?} {} {:?} {} {}",
1508 server_kind,
1509 root.display(),
1510 id,
1511 method,
1512 params.unwrap_or(serde_json::Value::Null)
1513 );
1514 }
1515 LspEvent::ServerExited { server_kind, root } => {
1516 aft::slog_info!("exited {:?} {}", server_kind, root.display());
1517 status_changed = true;
1518 }
1519 }
1520 }
1521 if status_changed {
1522 ctx.status_emitter().signal(ctx.build_status_snapshot());
1523 }
1524}