Skip to main content

aft/
runtime_drain.rs

1use crate as aft;
2use crate::context::{
3    AppContext, SemanticIndexEvent, SemanticIndexStatus, SemanticRefreshEvent,
4    SemanticRefreshRequest,
5};
6use crate::log_ctx;
7use crate::lsp::client::LspEvent;
8use crate::protocol::PushFrame;
9use crate::watcher_filter::{watcher_path_is_infra_skip, WatcherDispatchEvent};
10use std::collections::HashSet;
11use std::path::Path;
12use std::sync::Arc;
13use std::thread;
14use std::time::Duration;
15
16pub fn drain_configure_warning_events(ctx: &AppContext) {
17    for (generation, frame) in ctx.drain_configure_warnings() {
18        if ctx.configure_generation() != generation {
19            aft::slog_info!(
20                "dropping stale configure_warnings for generation {} (current {})",
21                generation,
22                ctx.configure_generation()
23            );
24            continue;
25        }
26
27        if let Some(sender) = ctx.progress_sender_handle() {
28            sender(PushFrame::ConfigureWarnings(frame));
29        }
30    }
31}
32
33pub fn drain_inspect_events(ctx: &AppContext) {
34    let drained = ctx.inspect_manager().drain_completions();
35    // Watcher-driven Tier-2 scans complete via the reuse path, which bypasses
36    // `result_rx`/`drain_completions`. Poll the manager's reuse counter so a
37    // background scan still refreshes the bar (#3) — otherwise the counts and
38    // `~` marker would only update on a manual `aft_inspect`.
39    let reuse_completed = ctx.take_new_reuse_completions();
40    // A completed background Tier-2 scan refreshes the agent status-bar counts
41    // to the freshly-persisted aggregate, and clears the stale marker — so the
42    // bar reflects the new numbers on the next tool result without waiting for
43    // an explicit aft_inspect call.
44    if drained > 0 || reuse_completed {
45        if let Some(project_root) = ctx.config().project_root.clone() {
46            let (dead_code, unused_exports, duplicates) = ctx
47                .inspect_manager()
48                .latest_tier2_counts(ctx.inspect_dir(), project_root);
49            // Don't clear the `~` stale marker until the whole serial Tier-2
50            // cycle has drained — while any category is still in flight the
51            // already-persisted categories may predate the latest edit, so
52            // claiming fresh would be premature (#20). `None` counts preserve
53            // the last-known value rather than fabricating a `0` (#1).
54            let stale = ctx.inspect_manager().tier2_any_in_flight();
55            ctx.update_status_bar_tier2(dead_code, unused_exports, duplicates, None, stale);
56            // Push the refreshed snapshot so the sidebar reflects the new Tier-2
57            // counts immediately. `update_status_bar_tier2` only mutates the
58            // in-memory counts (which the agent status bar reads live on each
59            // tool result); the push-driven sidebar would otherwise keep showing
60            // the pre-population snapshot — where `status_bar` was null and the
61            // Code Health section stayed hidden — until some unrelated event
62            // happened to emit a status frame.
63            ctx.status_emitter().signal(ctx.build_status_snapshot());
64        }
65    }
66}
67
68/// Drain all background build-completion receivers in standalone order.
69///
70/// Search installs first so watcher/pending updates apply to the freshest index,
71/// followed by callgraph store and semantic index completion.
72pub fn drain_build_completions(ctx: &AppContext) {
73    drain_search_index_events(ctx);
74    drain_callgraph_store_events(ctx);
75    drain_semantic_index_events(ctx);
76}
77
78/// Return true when any background build-completion receiver is currently set.
79///
80/// Each receiver is checked under its own short lock; no lock is held while
81/// checking the next subsystem.
82pub fn any_build_in_flight(ctx: &AppContext) -> bool {
83    {
84        let rx = ctx
85            .search_index_rx()
86            .read()
87            .unwrap_or_else(std::sync::PoisonError::into_inner);
88        if rx.is_some() {
89            return true;
90        }
91    }
92
93    {
94        let rx = ctx.callgraph_store_rx().lock();
95        if rx.is_some() {
96            return true;
97        }
98    }
99
100    {
101        let rx = ctx.semantic_index_rx().lock();
102        rx.is_some()
103    }
104}
105
106pub fn watcher_path_is_ignored_by_current_matcher(ctx: &AppContext, path: &Path) -> bool {
107    if watcher_path_is_infra_skip(path) {
108        return true;
109    }
110
111    if let Some(matcher) = ctx.gitignore() {
112        if path.starts_with(matcher.path()) {
113            let is_dir = path.is_dir();
114            return matcher
115                .matched_path_or_any_parents(path, is_dir)
116                .is_ignore();
117        }
118    }
119
120    false
121}
122
123fn replay_search_index_pending_updates(
124    ctx: &AppContext,
125    index: &mut crate::search_index::SearchIndex,
126    pending_paths: Vec<std::path::PathBuf>,
127) {
128    for path in pending_paths {
129        if path.exists() {
130            if watcher_path_is_ignored_by_current_matcher(ctx, &path) {
131                index.remove_file(&path);
132            } else {
133                index.update_file(&path);
134            }
135        } else {
136            index.remove_file(&path);
137        }
138    }
139}
140
141pub fn watcher_path_is_semantic_source(path: &Path) -> bool {
142    crate::semantic_index::is_semantic_indexed_extension(path)
143}
144
145pub fn mark_semantic_corpus_refresh_success(ctx: &AppContext) {
146    ctx.clear_all_semantic_refresh_retry_attempts();
147    ctx.reset_semantic_refresh_circuit_after_success();
148}
149
150pub fn drain_search_index_events(ctx: &AppContext) {
151    let (latest, disconnected) = {
152        let rx_ref = ctx
153            .search_index_rx()
154            .read()
155            .unwrap_or_else(std::sync::PoisonError::into_inner);
156        let Some(rx) = rx_ref.as_ref() else {
157            return;
158        };
159
160        let mut latest = None;
161        let mut disconnected = false;
162        loop {
163            match rx.try_recv() {
164                Ok(index) => latest = Some(index),
165                Err(crossbeam_channel::TryRecvError::Empty) => break,
166                Err(crossbeam_channel::TryRecvError::Disconnected) => {
167                    disconnected = true;
168                    break;
169                }
170            }
171        }
172        (latest, disconnected)
173    };
174
175    let mut status_changed = false;
176    let mut installed_index = false;
177    if let Some(mut index) = latest {
178        let pending_paths = ctx.take_pending_search_index_paths();
179        if !pending_paths.is_empty() {
180            replay_search_index_pending_updates(ctx, &mut index, pending_paths);
181        }
182        *ctx.search_index()
183            .write()
184            .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(index);
185        installed_index = true;
186        status_changed = true;
187    }
188
189    if disconnected || installed_index {
190        *ctx.search_index_rx()
191            .write()
192            .unwrap_or_else(std::sync::PoisonError::into_inner) = None;
193        if disconnected && !installed_index {
194            let _ = ctx.take_pending_search_index_paths();
195        }
196        status_changed = true;
197    }
198
199    if status_changed {
200        ctx.status_emitter().signal(ctx.build_status_snapshot());
201    }
202}
203
204/// Install a background-built callgraph store once its cold build completes.
205/// Mirrors `drain_search_index_events`: drains the receiver, installs the
206/// freshest store, replays paths that changed during the build, and clears the
207/// receiver. On build failure (channel disconnected with nothing installed) the
208/// receiver is cleared so a later op can retry the cold build.
209pub fn drain_callgraph_store_events(ctx: &AppContext) {
210    let (latest, disconnected) = {
211        let rx_ref = ctx.callgraph_store_rx().lock();
212        let Some(rx) = rx_ref.as_ref() else {
213            return;
214        };
215
216        let mut latest = None;
217        let mut disconnected = false;
218        loop {
219            match rx.try_recv() {
220                Ok(store) => latest = Some(store),
221                Err(crossbeam_channel::TryRecvError::Empty) => break,
222                Err(crossbeam_channel::TryRecvError::Disconnected) => {
223                    disconnected = true;
224                    break;
225                }
226            }
227        }
228        (latest, disconnected)
229    };
230
231    let mut status_changed = false;
232    let mut installed = false;
233    if let Some(store) = latest {
234        // Replay source files that changed while the cold build was running so
235        // the freshly-installed store reflects mid-build edits.
236        let pending = ctx.take_pending_callgraph_store_paths();
237        if !pending.is_empty() {
238            if let Err(error) = store.refresh_files(&pending) {
239                crate::slog_warn!(
240                    "callgraph store post-build pending refresh failed: {}",
241                    error
242                );
243                if let Err(mark_error) = store.mark_files_stale(&pending) {
244                    crate::slog_warn!(
245                        "failed to mark callgraph store files stale after post-build refresh failure: {}",
246                        mark_error
247                    );
248                }
249            }
250        }
251        *ctx.callgraph_store()
252            .write()
253            .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(Arc::new(store));
254        installed = true;
255        status_changed = true;
256    }
257
258    if disconnected || installed {
259        *ctx.callgraph_store_rx().lock() = None;
260        if disconnected && !installed {
261            // Build failed: discard pending paths (no store to apply them to);
262            // a later op restarts the build and re-walks the project.
263            let _ = ctx.take_pending_callgraph_store_paths();
264        }
265        status_changed = true;
266    }
267
268    if status_changed {
269        ctx.status_emitter().signal(ctx.build_status_snapshot());
270    }
271}
272
273pub fn drain_semantic_index_events(ctx: &AppContext) {
274    let (events, disconnected) = {
275        let rx_ref = ctx.semantic_index_rx().lock();
276        let Some(rx) = rx_ref.as_ref() else {
277            return;
278        };
279
280        let mut events = Vec::new();
281        let mut disconnected = false;
282        loop {
283            match rx.try_recv() {
284                Ok(event) => events.push(event),
285                Err(crossbeam_channel::TryRecvError::Empty) => break,
286                Err(crossbeam_channel::TryRecvError::Disconnected) => {
287                    disconnected = true;
288                    break;
289                }
290            }
291        }
292        (events, disconnected)
293    };
294
295    if events.is_empty() && !disconnected {
296        return;
297    }
298
299    let mut keep_receiver = true;
300    let mut status_changed = false;
301    let mut replay_refresh_paths = Vec::new();
302    let mut replay_corpus_refresh = false;
303    for event in events {
304        match event {
305            SemanticIndexEvent::Progress {
306                stage,
307                files,
308                entries_done,
309                entries_total,
310            } => {
311                *ctx.semantic_index_status()
312                    .write()
313                    .unwrap_or_else(std::sync::PoisonError::into_inner) =
314                    SemanticIndexStatus::Building {
315                        stage,
316                        files,
317                        entries_done,
318                        entries_total,
319                    };
320                // Push progress to the sidebar. Without this, a long rebuild
321                // (e.g. a slow local embedding backend re-indexing after a prior
322                // failure) leaves the sidebar showing the stale prior state —
323                // "failed" with an old error — for the entire build, even though
324                // it is actively embedding. Progress transitions are exactly
325                // when the user needs to see "building".
326                status_changed = true;
327            }
328            SemanticIndexEvent::Ready(mut index) => {
329                mark_semantic_corpus_refresh_success(ctx);
330                let pending_paths = ctx.take_pending_semantic_index_paths();
331                for path in pending_paths {
332                    if watcher_path_is_semantic_source(&path) {
333                        index.invalidate_file(&path);
334                        replay_refresh_paths.push(path);
335                    }
336                }
337                replay_corpus_refresh = ctx.take_pending_semantic_corpus_refresh();
338                *ctx.semantic_index()
339                    .write()
340                    .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(index);
341                *ctx.semantic_index_status()
342                    .write()
343                    .unwrap_or_else(std::sync::PoisonError::into_inner) =
344                    SemanticIndexStatus::ready();
345                keep_receiver = false;
346                status_changed = true;
347            }
348            SemanticIndexEvent::Failed(error) => {
349                let _ = ctx.take_pending_semantic_index_paths();
350                let _ = ctx.take_pending_semantic_corpus_refresh();
351                *ctx.semantic_index()
352                    .write()
353                    .unwrap_or_else(std::sync::PoisonError::into_inner) = None;
354                ctx.clear_semantic_refresh_worker();
355                *ctx.semantic_index_status()
356                    .write()
357                    .unwrap_or_else(std::sync::PoisonError::into_inner) =
358                    SemanticIndexStatus::Failed(error);
359                keep_receiver = false;
360                status_changed = true;
361            }
362        }
363    }
364
365    if disconnected && keep_receiver {
366        let _ = ctx.take_pending_semantic_index_paths();
367        let _ = ctx.take_pending_semantic_corpus_refresh();
368        *ctx.semantic_index()
369            .write()
370            .unwrap_or_else(std::sync::PoisonError::into_inner) = None;
371        ctx.clear_semantic_refresh_worker();
372        *ctx.semantic_index_status()
373            .write()
374            .unwrap_or_else(std::sync::PoisonError::into_inner) = SemanticIndexStatus::Failed(
375            "semantic index build worker disconnected before reporting completion".to_string(),
376        );
377        keep_receiver = false;
378        status_changed = true;
379    }
380
381    if !keep_receiver {
382        *ctx.semantic_index_rx().lock() = None;
383    }
384
385    if replay_corpus_refresh {
386        if ctx.canonical_cache_root_opt().is_some() {
387            *ctx.semantic_index_status()
388                .write()
389                .unwrap_or_else(std::sync::PoisonError::into_inner) =
390                SemanticIndexStatus::Building {
391                    stage: "refreshing_corpus".to_string(),
392                    files: None,
393                    entries_done: None,
394                    entries_total: None,
395                };
396            let sent = ctx
397                .semantic_refresh_sender()
398                .is_some_and(|sender| sender.send(SemanticRefreshRequest::Corpus).is_ok());
399            if !sent {
400                *ctx.semantic_index_status()
401                    .write()
402                    .unwrap_or_else(std::sync::PoisonError::into_inner) =
403                    SemanticIndexStatus::Failed(
404                        "semantic corpus refresh worker unavailable".to_string(),
405                    );
406            }
407            status_changed = true;
408        }
409    } else if !replay_refresh_paths.is_empty() {
410        {
411            let mut status = ctx
412                .semantic_index_status()
413                .write()
414                .unwrap_or_else(std::sync::PoisonError::into_inner);
415            if matches!(&*status, SemanticIndexStatus::Ready { .. }) {
416                for path in &replay_refresh_paths {
417                    status.add_refreshing_file(path.clone());
418                }
419                status_changed = true;
420            }
421        }
422        let sent = ctx.semantic_refresh_sender().is_some_and(|sender| {
423            sender
424                .send(SemanticRefreshRequest::Files {
425                    paths: replay_refresh_paths.clone(),
426                })
427                .is_ok()
428        });
429        if !sent {
430            crate::slog_warn!(
431                "semantic refresh worker unavailable; dropping {} replayed file(s)",
432                replay_refresh_paths.len()
433            );
434            let mut status = ctx
435                .semantic_index_status()
436                .write()
437                .unwrap_or_else(std::sync::PoisonError::into_inner);
438            for path in &replay_refresh_paths {
439                status.cancel_refreshing_file(path);
440            }
441            status_changed = true;
442        }
443    }
444
445    if status_changed {
446        ctx.status_emitter().signal(ctx.build_status_snapshot());
447    }
448}
449
450pub const MAX_RETRY_ATTEMPTS: usize = 6;
451pub const BREAKER_TRIP_THRESHOLD: usize = 3;
452
453/// Backoff for live semantic refresh retries after a transient embedding backend
454/// failure. Mirrors the cold-build retry cadence (15s -> 30s -> 60s capped) so
455/// a down backend cannot spin the watcher/refresh loop hot while still
456/// self-healing once the backend returns.
457fn semantic_refresh_retry_backoff(attempt: usize) -> Duration {
458    // Test seam, intentionally matching the build-level retry override.
459    if let Ok(raw) = std::env::var("AFT_SEMANTIC_RETRY_BACKOFF_MS") {
460        if let Ok(ms) = raw.parse::<u64>() {
461            return Duration::from_millis(ms);
462        }
463    }
464    const SCHEDULE_SECS: [u64; 3] = [15, 30, 60];
465    let secs = SCHEDULE_SECS
466        .get(attempt)
467        .copied()
468        .unwrap_or(*SCHEDULE_SECS.last().unwrap());
469    Duration::from_secs(secs)
470}
471
472struct SemanticRefreshRetryPlan {
473    retry_paths: Vec<std::path::PathBuf>,
474    capped_paths: Vec<std::path::PathBuf>,
475    delay: Option<Duration>,
476}
477
478fn next_semantic_refresh_retry_plan(
479    ctx: &AppContext,
480    paths: Vec<std::path::PathBuf>,
481) -> SemanticRefreshRetryPlan {
482    let mut retry_paths = Vec::new();
483    let mut capped_paths = Vec::new();
484    let mut max_attempt = 0usize;
485
486    ctx.with_semantic_refresh_retry_attempts_mut(|attempts| {
487        for path in paths {
488            let attempt = attempts.get(&path).copied().unwrap_or(0);
489            if attempt >= MAX_RETRY_ATTEMPTS {
490                capped_paths.push(path);
491                continue;
492            }
493            max_attempt = max_attempt.max(attempt);
494            attempts.insert(path.clone(), attempt.saturating_add(1));
495            retry_paths.push(path);
496        }
497    });
498
499    let delay = if retry_paths.is_empty() {
500        None
501    } else {
502        Some(semantic_refresh_retry_backoff(max_attempt))
503    };
504
505    SemanticRefreshRetryPlan {
506        retry_paths,
507        capped_paths,
508        delay,
509    }
510}
511
512fn clear_semantic_refresh_retry_attempts(ctx: &AppContext, paths: &[std::path::PathBuf]) {
513    ctx.clear_semantic_refresh_retry_attempts(paths);
514}
515
516fn clear_completed_pending_semantic_index_paths(
517    ctx: &AppContext,
518    completed_paths: &[std::path::PathBuf],
519) {
520    if completed_paths.is_empty() {
521        return;
522    }
523
524    let completed = completed_paths.iter().cloned().collect::<HashSet<_>>();
525    let remaining = ctx
526        .take_pending_semantic_index_paths()
527        .into_iter()
528        .filter(|path| !completed.contains(path))
529        .collect::<Vec<_>>();
530    if !remaining.is_empty() {
531        ctx.add_pending_semantic_index_paths(remaining);
532    }
533}
534
535fn semantic_refresh_probe_delay() -> Duration {
536    semantic_refresh_retry_backoff(usize::MAX)
537}
538
539pub fn semantic_refresh_circuit_is_open(ctx: &AppContext) -> bool {
540    ctx.semantic_refresh_circuit_is_open()
541}
542
543pub fn record_semantic_refresh_transient_failure(ctx: &AppContext) -> bool {
544    ctx.record_semantic_refresh_transient_failure(BREAKER_TRIP_THRESHOLD)
545}
546
547fn reset_semantic_refresh_transient_failure_count(ctx: &AppContext) {
548    ctx.reset_semantic_refresh_transient_failure_count();
549}
550
551fn reset_semantic_refresh_circuit_after_success(ctx: &AppContext) {
552    ctx.reset_semantic_refresh_circuit_after_success();
553}
554
555fn mark_semantic_refresh_success(ctx: &AppContext, completed_paths: &[std::path::PathBuf]) {
556    clear_semantic_refresh_retry_attempts(ctx, completed_paths);
557    clear_completed_pending_semantic_index_paths(ctx, completed_paths);
558    reset_semantic_refresh_circuit_after_success(ctx);
559}
560
561#[doc(hidden)]
562pub fn semantic_refresh_transient_failure_count_for_test(ctx: &AppContext) -> usize {
563    ctx.semantic_refresh_transient_failure_count()
564}
565
566#[doc(hidden)]
567pub fn semantic_refresh_probe_is_scheduled_for_test(ctx: &AppContext) -> bool {
568    ctx.semantic_refresh_probe_is_scheduled()
569}
570
571fn ensure_semantic_refresh_probe_scheduled(ctx: &AppContext) {
572    ctx.ensure_semantic_refresh_probe_scheduled(semantic_refresh_probe_delay());
573}
574
575fn maybe_fire_semantic_refresh_probe(ctx: &AppContext) {
576    if !ctx.take_semantic_refresh_probe_ready() {
577        return;
578    }
579    if !semantic_refresh_circuit_is_open(ctx) {
580        return;
581    }
582
583    let pending_paths = ctx.take_pending_semantic_index_paths();
584    if pending_paths.is_empty() {
585        return;
586    }
587
588    let sent = ctx.semantic_refresh_sender().is_some_and(|sender| {
589        sender
590            .send(SemanticRefreshRequest::Files {
591                paths: pending_paths.clone(),
592            })
593            .is_ok()
594    });
595    if !sent {
596        ctx.add_pending_semantic_index_paths(pending_paths);
597    }
598}
599
600pub fn schedule_semantic_refresh_retry(
601    ctx: &AppContext,
602    paths: Vec<std::path::PathBuf>,
603    error: &str,
604) -> bool {
605    if paths.is_empty() {
606        return false;
607    }
608    let Some(sender) = ctx.semantic_refresh_sender() else {
609        return false;
610    };
611
612    let SemanticRefreshRetryPlan {
613        retry_paths,
614        capped_paths,
615        delay,
616    } = next_semantic_refresh_retry_plan(ctx, paths);
617
618    if !capped_paths.is_empty() {
619        aft::slog_warn!(
620            "semantic refresh retry limit reached for {} file(s); preserving for next watcher/configure refresh",
621            capped_paths.len(),
622        );
623        ctx.add_pending_semantic_index_paths(capped_paths);
624    }
625
626    let Some(delay) = delay else {
627        return true;
628    };
629
630    let clean = aft::semantic_index::strip_transient_embedding_marker(error);
631    aft::slog_warn!(
632        "semantic refresh hit a transient backend error ({}); retrying {} file(s) in {}ms",
633        clean,
634        retry_paths.len(),
635        delay.as_millis(),
636    );
637
638    let session_id = log_ctx::current_session();
639    thread::spawn(move || {
640        log_ctx::with_session(session_id, || {
641            thread::sleep(delay);
642            let _ = sender.send(SemanticRefreshRequest::Files { paths: retry_paths });
643        });
644    });
645    true
646}
647
648pub fn drain_semantic_refresh_events(ctx: &AppContext) {
649    let (events, disconnected) = {
650        let rx_ref = ctx.semantic_refresh_event_rx().lock();
651        let Some(rx) = rx_ref.as_ref() else {
652            return;
653        };
654
655        let mut events = Vec::new();
656        let mut disconnected = false;
657        loop {
658            match rx.try_recv() {
659                Ok(event) => events.push(event),
660                Err(crossbeam_channel::TryRecvError::Empty) => break,
661                Err(crossbeam_channel::TryRecvError::Disconnected) => {
662                    disconnected = true;
663                    break;
664                }
665            }
666        }
667        (events, disconnected)
668    };
669
670    if events.is_empty() && !disconnected {
671        maybe_fire_semantic_refresh_probe(ctx);
672        return;
673    }
674
675    let had_events = !events.is_empty();
676    let mut status_changed = false;
677    let mut replay_refresh_paths = Vec::new();
678    for event in events {
679        match event {
680            SemanticRefreshEvent::Started { paths } => {
681                let mut status = ctx
682                    .semantic_index_status()
683                    .write()
684                    .unwrap_or_else(std::sync::PoisonError::into_inner);
685                if matches!(&*status, SemanticIndexStatus::Ready { .. }) {
686                    for path in paths {
687                        status.start_refreshing_file(path);
688                    }
689                    status_changed = true;
690                }
691            }
692            SemanticRefreshEvent::CorpusStarted { files } => {
693                *ctx.semantic_index_status()
694                    .write()
695                    .unwrap_or_else(std::sync::PoisonError::into_inner) =
696                    SemanticIndexStatus::Building {
697                        stage: "refreshing_corpus".to_string(),
698                        files: Some(files),
699                        entries_done: None,
700                        entries_total: None,
701                    };
702                status_changed = true;
703            }
704            SemanticRefreshEvent::Completed {
705                added_entries,
706                updated_metadata,
707                completed_paths,
708            } => {
709                if let Some(index) = ctx
710                    .semantic_index()
711                    .write()
712                    .unwrap_or_else(std::sync::PoisonError::into_inner)
713                    .as_mut()
714                {
715                    index.apply_refresh_update(added_entries, updated_metadata, &completed_paths);
716                }
717                mark_semantic_refresh_success(ctx, &completed_paths);
718                let mut status = ctx
719                    .semantic_index_status()
720                    .write()
721                    .unwrap_or_else(std::sync::PoisonError::into_inner);
722                if matches!(&*status, SemanticIndexStatus::Ready { .. }) {
723                    for path in &completed_paths {
724                        status.complete_refreshing_file(path);
725                    }
726                    status_changed = true;
727                }
728            }
729            SemanticRefreshEvent::CorpusCompleted {
730                mut index,
731                changed,
732                added,
733                deleted,
734                total_processed,
735            } => {
736                aft::runtime_drain::mark_semantic_corpus_refresh_success(ctx);
737                if changed > 0 || added > 0 || deleted > 0 {
738                    aft::slog_info!(
739                        "semantic corpus refresh completed: {} changed, {} new, {} deleted, {} total processed",
740                        changed,
741                        added,
742                        deleted,
743                        total_processed
744                    );
745                }
746                let pending_paths = ctx.take_pending_semantic_index_paths();
747                for path in pending_paths {
748                    if !aft::runtime_drain::watcher_path_is_semantic_source(&path) {
749                        continue;
750                    }
751                    index.invalidate_file(&path);
752                    if !aft::runtime_drain::watcher_path_is_ignored_by_current_matcher(ctx, &path) {
753                        replay_refresh_paths.push(path);
754                    }
755                }
756                *ctx.semantic_index()
757                    .write()
758                    .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(index);
759                *ctx.semantic_index_status()
760                    .write()
761                    .unwrap_or_else(std::sync::PoisonError::into_inner) =
762                    SemanticIndexStatus::ready();
763                status_changed = true;
764            }
765            SemanticRefreshEvent::Failed { paths, error } => {
766                if aft::semantic_index::embedding_failure_is_transient(&error) {
767                    if record_semantic_refresh_transient_failure(ctx) {
768                        ctx.add_pending_semantic_index_paths(paths);
769                        ensure_semantic_refresh_probe_scheduled(ctx);
770                    } else if !schedule_semantic_refresh_retry(ctx, paths.clone(), &error) {
771                        aft::slog_warn!(
772                            "semantic refresh worker unavailable; preserving {} transiently failed file(s) for retry",
773                            paths.len(),
774                        );
775                        ctx.add_pending_semantic_index_paths(paths);
776                    }
777                } else {
778                    aft::slog_warn!("semantic refresh failed: {}", error);
779                    reset_semantic_refresh_transient_failure_count(ctx);
780                    clear_semantic_refresh_retry_attempts(ctx, &paths);
781                    let mut status = ctx
782                        .semantic_index_status()
783                        .write()
784                        .unwrap_or_else(std::sync::PoisonError::into_inner);
785                    if matches!(&*status, SemanticIndexStatus::Ready { .. }) {
786                        for path in &paths {
787                            status.complete_refreshing_file(path);
788                        }
789                        status_changed = true;
790                    }
791                }
792            }
793            SemanticRefreshEvent::CorpusFailed { error } => {
794                // A transient backend blip during a corpus refresh must NOT
795                // destroy the working index — the prior index is still valid and
796                // serving. Keep it Ready and let the next watcher/ignore change
797                // re-trigger the refresh, rather than nuking everything to
798                // `Failed` over a connection hiccup (the same park-forever trap
799                // the initial build now rides out). Permanent errors (dimension
800                // mismatch, too-many-files) still drop the index and surface the
801                // real failure.
802                if aft::semantic_index::embedding_failure_is_transient(&error) {
803                    let clean = aft::semantic_index::strip_transient_embedding_marker(&error);
804                    let has_index = ctx
805                        .semantic_index()
806                        .read()
807                        .unwrap_or_else(std::sync::PoisonError::into_inner)
808                        .is_some();
809                    if has_index {
810                        aft::slog_warn!(
811                            "semantic corpus refresh hit a transient backend error ({}); keeping the existing index",
812                            clean,
813                        );
814                        *ctx.semantic_index_status()
815                            .write()
816                            .unwrap_or_else(std::sync::PoisonError::into_inner) =
817                            SemanticIndexStatus::ready();
818                    } else {
819                        // No index to fall back on — surface the clean message.
820                        aft::slog_warn!("semantic corpus refresh failed: {}", clean);
821                        *ctx.semantic_index_status()
822                            .write()
823                            .unwrap_or_else(std::sync::PoisonError::into_inner) =
824                            SemanticIndexStatus::Failed(clean);
825                    }
826                    status_changed = true;
827                } else {
828                    aft::slog_warn!("semantic corpus refresh failed: {}", error);
829                    let _ = ctx.take_pending_semantic_index_paths();
830                    *ctx.semantic_index()
831                        .write()
832                        .unwrap_or_else(std::sync::PoisonError::into_inner) = None;
833                    *ctx.semantic_index_status()
834                        .write()
835                        .unwrap_or_else(std::sync::PoisonError::into_inner) =
836                        SemanticIndexStatus::Failed(error);
837                    status_changed = true;
838                }
839            }
840        }
841    }
842
843    if disconnected {
844        ctx.clear_semantic_refresh_worker();
845        let refreshing_paths = {
846            let status = ctx
847                .semantic_index_status()
848                .read()
849                .unwrap_or_else(std::sync::PoisonError::into_inner);
850            match &*status {
851                SemanticIndexStatus::Ready { refreshing, .. } => refreshing.clone(),
852                _ => Vec::new(),
853            }
854        };
855        if !refreshing_paths.is_empty() {
856            let mut status = ctx
857                .semantic_index_status()
858                .write()
859                .unwrap_or_else(std::sync::PoisonError::into_inner);
860            for path in &refreshing_paths {
861                status.cancel_refreshing_file(path);
862            }
863        }
864        if !refreshing_paths.is_empty() || had_events {
865            status_changed = true;
866        }
867    }
868
869    if !replay_refresh_paths.is_empty() {
870        {
871            let mut status = ctx
872                .semantic_index_status()
873                .write()
874                .unwrap_or_else(std::sync::PoisonError::into_inner);
875            if matches!(&*status, SemanticIndexStatus::Ready { .. }) {
876                for path in &replay_refresh_paths {
877                    status.add_refreshing_file(path.clone());
878                }
879                status_changed = true;
880            }
881        }
882        let sent = ctx.semantic_refresh_sender().is_some_and(|sender| {
883            sender
884                .send(SemanticRefreshRequest::Files {
885                    paths: replay_refresh_paths.clone(),
886                })
887                .is_ok()
888        });
889        if !sent {
890            aft::slog_warn!(
891                "semantic refresh worker unavailable; dropping {} replayed corpus file(s)",
892                replay_refresh_paths.len()
893            );
894            let mut status = ctx
895                .semantic_index_status()
896                .write()
897                .unwrap_or_else(std::sync::PoisonError::into_inner);
898            for path in &replay_refresh_paths {
899                status.cancel_refreshing_file(path);
900            }
901            status_changed = true;
902        }
903    }
904
905    maybe_fire_semantic_refresh_probe(ctx);
906
907    if status_changed {
908        ctx.status_emitter().signal(ctx.build_status_snapshot());
909    }
910}
911
912/// Source file extensions that the call graph supports.
913const SOURCE_EXTENSIONS: &[&str] = &[
914    "ts", "tsx", "mts", "cts", "js", "jsx", "mjs", "cjs", "py", "pyi", "rs", "go",
915];
916
917pub const WATCHER_BATCH_INLINE_CAP: usize = 256;
918
919/// A `tsconfig.json` / `jsconfig.json` (including variant names like
920/// `tsconfig.base.json`). A change to any of these can shift TypeScript build
921/// membership (which files `tsc` checks), so the status-bar membership cache
922/// must be invalidated. Deliberately broad on the variant suffix and ignorant
923/// of `extends` graphs: the cache is cleared wholesale on a match, and base
924/// configs almost always follow the `tsconfig*.json` naming. Non-standard base
925/// names are covered on the next `tsconfig.json` change or `configure`.
926pub fn watcher_path_is_tsconfig(path: &std::path::Path) -> bool {
927    path.file_name()
928        .and_then(|n| n.to_str())
929        .map(|n| {
930            n == "tsconfig.json"
931                || n == "jsconfig.json"
932                || ((n.starts_with("tsconfig.") || n.starts_with("jsconfig."))
933                    && n.ends_with(".json"))
934        })
935        .unwrap_or(false)
936}
937
938pub fn watcher_path_is_source(path: &std::path::Path) -> bool {
939    path.extension()
940        .and_then(|ext| ext.to_str())
941        .is_some_and(|ext| SOURCE_EXTENSIONS.contains(&ext))
942}
943
944/// A file the callgraph STORE would have indexed at cold-build time. The store
945/// indexes every file `walk_project_files` yields (i.e. any detected language),
946/// not just the trigram `SOURCE_EXTENSIONS` set. Gating the store's watcher
947/// refresh on the narrower trigram set left edits to Java/C/C++/C#/Kotlin/Ruby/
948/// PHP/… (all of which the store extracts calls for) serving stale results until
949/// a full rebuild. Mirror cold-build exactly so refresh coverage == index
950/// coverage.
951pub fn watcher_path_is_callgraph_indexed(path: &std::path::Path) -> bool {
952    aft::parser::detect_language(path).is_some()
953}
954
955pub fn semantic_corpus_refresh_in_progress(ctx: &AppContext) -> bool {
956    let status = ctx
957        .semantic_index_status()
958        .read()
959        .unwrap_or_else(std::sync::PoisonError::into_inner);
960    matches!(
961        &*status,
962        SemanticIndexStatus::Building { stage, .. } if stage == "refreshing_corpus"
963    )
964}
965
966#[cfg(debug_assertions)]
967pub fn delay_search_rebuild_publish_for_debug() {
968    let Some(delay_ms) = std::env::var("AFT_TEST_SEARCH_REBUILD_PUBLISH_DELAY_MS")
969        .ok()
970        .and_then(|raw| raw.parse::<u64>().ok())
971    else {
972        return;
973    };
974    thread::sleep(Duration::from_millis(delay_ms));
975}
976
977#[cfg(not(debug_assertions))]
978pub fn delay_search_rebuild_publish_for_debug() {}
979
980pub fn spawn_search_corpus_refresh(
981    ctx: &AppContext,
982    root: std::path::PathBuf,
983    config: Arc<aft::config::Config>,
984) {
985    {
986        let mut search_index = ctx
987            .search_index()
988            .write()
989            .unwrap_or_else(std::sync::PoisonError::into_inner);
990        if let Some(index) = search_index.as_mut() {
991            index.ready = false;
992        }
993    }
994
995    let (tx, rx): (
996        crossbeam_channel::Sender<aft::search_index::SearchIndex>,
997        crossbeam_channel::Receiver<aft::search_index::SearchIndex>,
998    ) = crossbeam_channel::unbounded();
999    *ctx.search_index_rx()
1000        .write()
1001        .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(rx);
1002    ctx.reset_symbol_cache();
1003
1004    let is_worktree_bridge = ctx.is_worktree_bridge();
1005    let session_id = log_ctx::current_session();
1006    thread::spawn(move || {
1007        log_ctx::with_session(session_id, || {
1008            let cache_dir =
1009                aft::search_index::resolve_cache_dir(&root, config.storage_dir.as_deref());
1010            let _cache_lock = if is_worktree_bridge {
1011                None
1012            } else {
1013                match aft::search_index::CacheLock::acquire(&cache_dir) {
1014                    Ok(lock) => Some(lock),
1015                    Err(error) => {
1016                        aft::slog_warn!(
1017                            "failed to acquire search cache lock for ignore refresh: {}",
1018                            error
1019                        );
1020                        None
1021                    }
1022                }
1023            };
1024            let index = aft::search_index::SearchIndex::build_with_limit(
1025                &root,
1026                config.search_index_max_file_size,
1027            );
1028            delay_search_rebuild_publish_for_debug();
1029            if !is_worktree_bridge {
1030                index.write_to_disk(&cache_dir, index.stored_git_head());
1031            }
1032            let _ = tx.send(index);
1033        });
1034    });
1035}
1036
1037pub fn refresh_project_corpus(
1038    ctx: &AppContext,
1039    reason: &str,
1040    invalidate_ignore_paths: bool,
1041) -> bool {
1042    let Some(root) = ctx.canonical_cache_root_opt() else {
1043        return false;
1044    };
1045    let config = ctx.config();
1046    let mut status_changed = false;
1047
1048    if invalidate_ignore_paths {
1049        if let Some(graph) = ctx.callgraph().lock().as_mut() {
1050            graph.invalidate_file(&root.join(".gitignore"));
1051            graph.invalidate_file(&root.join(".aftignore"));
1052        }
1053    }
1054
1055    if !ctx.is_worktree_bridge() {
1056        // Do NOT cold-build the callgraph store synchronously here. This function
1057        // runs on the single-threaded dispatch loop from `drain_watcher_events`,
1058        // which fires before EVERY request (and on idle ticks). A full O(repo)
1059        // `refresh_corpus` (= `cold_build`: parse all files + resolve refs +
1060        // rewrite SQLite) blocks ALL queued requests — including `configure` and
1061        // `bash` — for its entire duration, which exceeds the 30s transport
1062        // timeout on a large repo. On a long-lived bridge (OpenCode Desktop) an
1063        // FSEvents overflow triggers this drain, so the user sees configure/bash
1064        // time out (regression: the watcher-overflow path that calls this is new
1065        // in 0.39.1; the ignore-rule path that also calls this had the same
1066        // latent inline block, just rarely triggered).
1067        //
1068        // Instead, drop the resident store and force a BACKGROUND rebuild: the
1069        // next `callgraph_store_for_ops()` spawns the cold build off-thread and
1070        // returns `Building` (callgraph ops + dead_code projection already handle
1071        // `Building`/unavailable gracefully). This mirrors the search/semantic
1072        // refreshes below, which are already async. A build already in flight
1073        // keeps running; the resident drop + force flag make the next op converge
1074        // to a fresh full rebuild.
1075        // Mirror the original "act only when the callgraph is actually loaded or
1076        // building" guard, but reschedule instead of inline-building.
1077        let callgraph_store_resident = {
1078            let guard = ctx
1079                .callgraph_store()
1080                .read()
1081                .unwrap_or_else(std::sync::PoisonError::into_inner);
1082            guard.is_some()
1083        };
1084        if callgraph_store_resident || ctx.callgraph_store_rx().lock().is_some() {
1085            *ctx.callgraph_store()
1086                .write()
1087                .unwrap_or_else(std::sync::PoisonError::into_inner) = None;
1088            ctx.mark_callgraph_store_force_rebuild();
1089            status_changed = true;
1090            aft::slog_info!(
1091                "callgraph store scheduled for background rebuild after {}",
1092                reason
1093            );
1094        }
1095    }
1096
1097    if config.search_index {
1098        spawn_search_corpus_refresh(ctx, root.clone(), config.clone());
1099        status_changed = true;
1100        aft::slog_info!("started search index refresh after {}", reason);
1101    }
1102
1103    if config.semantic_search {
1104        if let Some(sender) = ctx.semantic_refresh_sender() {
1105            *ctx.semantic_index_status()
1106                .write()
1107                .unwrap_or_else(std::sync::PoisonError::into_inner) =
1108                SemanticIndexStatus::Building {
1109                    stage: "refreshing_corpus".to_string(),
1110                    files: None,
1111                    entries_done: None,
1112                    entries_total: None,
1113                };
1114            match sender.send(SemanticRefreshRequest::Corpus) {
1115                Ok(()) => {
1116                    status_changed = true;
1117                }
1118                Err(error) => {
1119                    *ctx.semantic_index_status()
1120                        .write()
1121                        .unwrap_or_else(std::sync::PoisonError::into_inner) =
1122                        SemanticIndexStatus::Failed(format!(
1123                            "semantic corpus refresh worker unavailable: {error}"
1124                        ));
1125                    status_changed = true;
1126                }
1127            }
1128        } else if ctx.semantic_index_rx().lock().is_some() {
1129            ctx.mark_pending_semantic_corpus_refresh();
1130        }
1131    }
1132
1133    status_changed
1134}
1135
1136pub fn refresh_corpus_after_ignore_change(ctx: &AppContext) -> bool {
1137    refresh_project_corpus(ctx, "ignore-rule change", true)
1138}
1139
1140pub fn refresh_project_after_watcher_rescan(ctx: &AppContext) -> bool {
1141    let Some(root) = ctx.canonical_cache_root_opt() else {
1142        return false;
1143    };
1144    ctx.clear_pending_index_updates();
1145    ctx.reset_symbol_cache();
1146    let _ = ctx.mark_status_bar_tier2_stale();
1147    ctx.clear_tsconfig_membership_cache();
1148    let mut status_changed = true;
1149
1150    if ctx.callgraph().lock().is_some() {
1151        *ctx.callgraph().lock() = Some(aft::callgraph::CallGraph::new(root));
1152    }
1153
1154    status_changed |= refresh_project_corpus(ctx, "watcher overflow", false);
1155    status_changed
1156}
1157
1158pub fn refresh_callgraph_store_for_watcher(
1159    ctx: &AppContext,
1160    changed: &HashSet<std::path::PathBuf>,
1161) {
1162    if ctx.is_worktree_bridge() {
1163        return;
1164    }
1165    let source_paths = changed
1166        .iter()
1167        .filter(|path| watcher_path_is_callgraph_indexed(path))
1168        .cloned()
1169        .collect::<Vec<_>>();
1170    if source_paths.is_empty() {
1171        return;
1172    }
1173    // Converge to the current generation before writing: if another process
1174    // published a newer one, drop our stale store so the changed paths get
1175    // recorded as pending and replayed against the fresh store (rather than
1176    // incrementally written into a superseded generation).
1177    ctx.revalidate_callgraph_store_generation();
1178    let store = {
1179        let guard = ctx
1180            .callgraph_store()
1181            .read()
1182            .unwrap_or_else(std::sync::PoisonError::into_inner);
1183        guard.as_ref().map(Arc::clone)
1184    };
1185    let Some(store) = store else {
1186        // Store not resident yet. If a cold build is in flight, record the
1187        // changed paths so they're replayed once the freshly-built store lands
1188        // (otherwise mid-build edits would be silently lost). If no build is
1189        // running, there's nothing to refresh.
1190        if ctx.callgraph_store_rx().lock().is_some() {
1191            ctx.add_pending_callgraph_store_paths(source_paths);
1192        }
1193        return;
1194    };
1195    if let Err(error) = store.refresh_files(&source_paths) {
1196        aft::slog_warn!("callgraph store refresh failed: {}", error);
1197        match store.mark_files_stale(&source_paths) {
1198            Ok(marked) => aft::slog_warn!(
1199                "marked {} callgraph store file(s) stale after refresh failure",
1200                marked.len()
1201            ),
1202            Err(mark_error) => aft::slog_warn!(
1203                "failed to mark callgraph store files stale after refresh failure: {}",
1204                mark_error
1205            ),
1206        }
1207    }
1208}
1209
1210/// Drain pre-filtered watcher events and apply cache invalidations on the
1211/// dispatch thread. The watcher filter thread owns notify receive/decode,
1212/// metadata filtering, ignore matching, root-deleted detection, and path
1213/// coalescing; this drain only reacts to compact control events and surviving
1214/// paths because the cache/index state below is not Send.
1215pub fn drain_watcher_events(ctx: &AppContext) {
1216    let mut changed: HashSet<std::path::PathBuf> = HashSet::new();
1217    let mut ignore_file_changed = false;
1218    let mut rescan_required = false;
1219    let mut watcher_failed = None;
1220    let mut root_deleted = false;
1221
1222    {
1223        let rx_ref = ctx.watcher_rx().lock();
1224        let rx = match rx_ref.as_ref() {
1225            Some(rx) => rx,
1226            None => {
1227                ctx.tick_tier2_refresh_scheduler(0);
1228                return; // No watcher configured
1229            }
1230        };
1231
1232        loop {
1233            match rx.try_recv() {
1234                Ok(WatcherDispatchEvent::Paths(paths)) => {
1235                    if !rescan_required {
1236                        changed.extend(paths);
1237                    }
1238                }
1239                Ok(WatcherDispatchEvent::RescanRequired) => {
1240                    rescan_required = true;
1241                    changed.clear();
1242                }
1243                Ok(WatcherDispatchEvent::IgnoreRulesChanged { path }) => {
1244                    ignore_file_changed = true;
1245                    log::debug!(
1246                        "watcher: ignore rules changed at {}, rebuilding matcher",
1247                        path.display()
1248                    );
1249                    if !rescan_required {
1250                        ctx.rebuild_gitignore();
1251                    }
1252                }
1253                Ok(WatcherDispatchEvent::RootDeleted) => {
1254                    root_deleted = true;
1255                    break;
1256                }
1257                Ok(WatcherDispatchEvent::Error(error)) => {
1258                    watcher_failed = Some(error);
1259                    break;
1260                }
1261                Err(crossbeam_channel::TryRecvError::Empty) => break,
1262                Err(crossbeam_channel::TryRecvError::Disconnected) => {
1263                    watcher_failed = Some("watcher channel disconnected".to_string());
1264                    break;
1265                }
1266            }
1267        }
1268    }
1269
1270    let mut watcher_status_changed = false;
1271    if root_deleted {
1272        ctx.stop_watcher_runtime();
1273        let _ = ctx.add_degraded_reason("project_root_deleted".to_string());
1274        aft::slog_warn!(
1275            "project root deleted; dropping watcher to avoid delete-storm: {:?}",
1276            ctx.canonical_cache_root_opt()
1277        );
1278        watcher_status_changed = true;
1279        changed.clear();
1280        rescan_required = false;
1281    } else if let Some(error) = watcher_failed {
1282        ctx.stop_watcher_runtime();
1283        let _ = ctx.add_degraded_reason("watcher_unavailable".to_string());
1284        aft::slog_warn!(
1285            "file watcher unavailable; continuing without live external-change invalidation: {}",
1286            error
1287        );
1288        watcher_status_changed = true;
1289        rescan_required = false;
1290    }
1291
1292    let mut status_changed = watcher_status_changed;
1293    let mut project_corpus_refresh_requested = false;
1294    if rescan_required {
1295        aft::slog_warn!("watcher overflow: forcing project rescan");
1296        ctx.rebuild_gitignore();
1297        status_changed |= refresh_project_after_watcher_rescan(ctx);
1298        project_corpus_refresh_requested = true;
1299        changed.clear();
1300    } else if ignore_file_changed {
1301        status_changed |= refresh_corpus_after_ignore_change(ctx);
1302        project_corpus_refresh_requested = true;
1303    }
1304
1305    let scheduler_changed_path_count = if rescan_required {
1306        aft::inspect::tier2_scheduler::TIER2_REFRESH_STORM_PATH_THRESHOLD + 1
1307    } else if ignore_file_changed {
1308        changed.len().max(1)
1309    } else {
1310        changed.len()
1311    };
1312    if changed.is_empty() {
1313        if status_changed {
1314            ctx.status_emitter().signal(ctx.build_status_snapshot());
1315        }
1316        ctx.tick_tier2_refresh_scheduler(scheduler_changed_path_count);
1317        return;
1318    }
1319
1320    ctx.add_pending_tier2_paths(changed.iter().cloned());
1321
1322    // A real source change makes the last-known Tier-2 counts stale until the
1323    // next background scan reconciles them — surface that in the status bar
1324    // immediately (the `~` marker) so the agent never reads them as live.
1325    if ctx.mark_status_bar_tier2_stale() {
1326        status_changed = true;
1327    }
1328
1329    // A tsconfig change can shift which files `tsc` checks, which is the policy
1330    // the status-bar E/W count filters on. Clear the membership cache wholesale
1331    // so the next bar count re-resolves from disk (handles new nested configs,
1332    // edited `extends` parents, and deletions without per-key bookkeeping).
1333    if changed.iter().any(|path| watcher_path_is_tsconfig(path)) {
1334        ctx.clear_tsconfig_membership_cache();
1335        status_changed = true;
1336    }
1337
1338    let oversized_inline_batch = changed.len() > WATCHER_BATCH_INLINE_CAP;
1339    if oversized_inline_batch {
1340        aft::slog_warn!(
1341            "watcher batch of {} paths exceeds inline cap {}; scheduling corpus refresh",
1342            changed.len(),
1343            WATCHER_BATCH_INLINE_CAP
1344        );
1345        if !project_corpus_refresh_requested {
1346            status_changed |= refresh_project_corpus(ctx, "oversized watcher batch", false);
1347        }
1348    }
1349
1350    let search_build_in_progress = {
1351        let search_index_rx = ctx
1352            .search_index_rx()
1353            .read()
1354            .unwrap_or_else(std::sync::PoisonError::into_inner);
1355        search_index_rx.is_some()
1356    };
1357    if !oversized_inline_batch && search_build_in_progress {
1358        ctx.add_pending_search_index_paths(changed.iter().cloned());
1359    }
1360    let semantic_source_paths = changed
1361        .iter()
1362        .filter(|path| aft::runtime_drain::watcher_path_is_semantic_source(path))
1363        .cloned()
1364        .collect::<Vec<_>>();
1365    let semantic_build_in_progress = ctx.semantic_index_rx().lock().is_some();
1366    let semantic_corpus_refresh_in_progress = semantic_corpus_refresh_in_progress(ctx);
1367    if !oversized_inline_batch
1368        && (semantic_build_in_progress || semantic_corpus_refresh_in_progress)
1369        && !semantic_source_paths.is_empty()
1370    {
1371        ctx.add_pending_semantic_index_paths(semantic_source_paths.clone());
1372    }
1373
1374    if let Ok(mut symbol_cache) = ctx.symbol_cache().write() {
1375        for path in &changed {
1376            symbol_cache.invalidate(path);
1377        }
1378    }
1379
1380    // Invalidate each changed file in the call graph.
1381    let mut graph_ref = ctx.callgraph().lock();
1382    if let Some(graph) = graph_ref.as_mut() {
1383        for path in &changed {
1384            if watcher_path_is_source(path) {
1385                graph.invalidate_file(path);
1386            }
1387        }
1388    }
1389    drop(graph_ref);
1390
1391    let mut semantic_refresh_paths = Vec::new();
1392    if !oversized_inline_batch {
1393        refresh_callgraph_store_for_watcher(ctx, &changed);
1394
1395        {
1396            let mut index_ref = ctx
1397                .search_index()
1398                .write()
1399                .unwrap_or_else(std::sync::PoisonError::into_inner);
1400            if let Some(index) = index_ref.as_mut() {
1401                for path in &changed {
1402                    if path.exists() {
1403                        index.update_file(path);
1404                    } else {
1405                        index.remove_file(path);
1406                    }
1407                }
1408            }
1409        }
1410
1411        let stale_paths = {
1412            let mut semantic_index_ref = ctx
1413                .semantic_index()
1414                .write()
1415                .unwrap_or_else(std::sync::PoisonError::into_inner);
1416            let mut stale_paths = Vec::new();
1417            if let Some(index) = semantic_index_ref.as_mut() {
1418                for path in &semantic_source_paths {
1419                    index.invalidate_file(path);
1420                    stale_paths.push(path.clone());
1421                }
1422            }
1423            stale_paths
1424        };
1425        if !stale_paths.is_empty() {
1426            let mut status = ctx
1427                .semantic_index_status()
1428                .write()
1429                .unwrap_or_else(std::sync::PoisonError::into_inner);
1430            if matches!(&*status, SemanticIndexStatus::Ready { .. }) {
1431                for path in &stale_paths {
1432                    status.add_refreshing_file(path.clone());
1433                }
1434                semantic_refresh_paths = stale_paths;
1435                status_changed = true;
1436            }
1437        }
1438    }
1439
1440    // A vanished file's LSP diagnostics would otherwise linger in the warm set
1441    // forever (no server republishes for a path that no longer exists),
1442    // inflating the error/warning counts in the status bar and `aft_inspect`.
1443    // Clear them here so every deletion source is covered (AFT delete, `rm`,
1444    // `git checkout`, branch switch) — not just the delete command. The agent
1445    // status bar reads E/W live from the warm set on each response, so clearing
1446    // the store is sufficient; the next tool call's bar reflects the new count.
1447    //
1448    // Not gated on the trigram `SOURCE_EXTENSIONS` set: any registered LSP
1449    // server (Bash, YAML, Solidity, Vue, C/C++, custom servers, …) can publish
1450    // diagnostics for files outside that set, and gating on it left their
1451    // diagnostics stranded after deletion. `clear_for_file` is a cheap no-op
1452    // when the store holds nothing for the path, so clearing unconditionally
1453    // for every vanished path is safe.
1454    for path in &changed {
1455        if !path.exists() && ctx.lsp_clear_diagnostics_for_file(path) {
1456            status_changed = true;
1457        }
1458    }
1459
1460    if !semantic_refresh_paths.is_empty() {
1461        let sent = ctx.semantic_refresh_sender().is_some_and(|sender| {
1462            sender
1463                .send(SemanticRefreshRequest::Files {
1464                    paths: semantic_refresh_paths.clone(),
1465                })
1466                .is_ok()
1467        });
1468        if !sent {
1469            aft::slog_warn!(
1470                "semantic refresh worker unavailable; dropping {} refreshing file(s)",
1471                semantic_refresh_paths.len()
1472            );
1473            let mut status = ctx
1474                .semantic_index_status()
1475                .write()
1476                .unwrap_or_else(std::sync::PoisonError::into_inner);
1477            for path in &semantic_refresh_paths {
1478                status.cancel_refreshing_file(path);
1479            }
1480            status_changed = true;
1481        }
1482    }
1483
1484    aft::slog_info!("invalidated {} files", changed.len());
1485    if status_changed {
1486        ctx.status_emitter().signal(ctx.build_status_snapshot());
1487    }
1488    ctx.tick_tier2_refresh_scheduler(scheduler_changed_path_count);
1489}
1490
1491pub fn drain_lsp_events(ctx: &AppContext) {
1492    let drained = {
1493        let mut lsp = ctx.lsp();
1494        lsp.drain_events()
1495    };
1496    let mut status_changed = drained.diagnostics_changed;
1497    for event in drained.events {
1498        match event {
1499            LspEvent::Notification {
1500                server_kind,
1501                root,
1502                method,
1503                params,
1504            } => {
1505                log::debug!(
1506                    "[aft-lsp] notification {:?} {} {} {}",
1507                    server_kind,
1508                    root.display(),
1509                    method,
1510                    params.unwrap_or(serde_json::Value::Null)
1511                );
1512            }
1513            LspEvent::ServerRequest {
1514                server_kind,
1515                root,
1516                id,
1517                method,
1518                params,
1519            } => {
1520                log::debug!(
1521                    "[aft-lsp] request {:?} {} {:?} {} {}",
1522                    server_kind,
1523                    root.display(),
1524                    id,
1525                    method,
1526                    params.unwrap_or(serde_json::Value::Null)
1527                );
1528            }
1529            LspEvent::ServerExited { server_kind, root } => {
1530                aft::slog_info!("exited {:?} {}", server_kind, root.display());
1531                status_changed = true;
1532            }
1533        }
1534    }
1535    if status_changed {
1536        ctx.status_emitter().signal(ctx.build_status_snapshot());
1537    }
1538}