Skip to main content

aft/
runtime_drain.rs

1use crate as aft;
2use crate::context::{
3    AppContext, SemanticIndexEvent, SemanticIndexStatus, SemanticRefreshEvent,
4    SemanticRefreshRequest,
5};
6use crate::log_ctx;
7use crate::lsp::client::LspEvent;
8use crate::protocol::PushFrame;
9use crate::watcher_filter::{watcher_path_is_infra_skip, WatcherDispatchEvent};
10use std::collections::HashSet;
11use std::path::Path;
12use std::sync::Arc;
13use std::thread;
14use std::time::Duration;
15
16pub fn drain_configure_warning_events(ctx: &AppContext) {
17    for (generation, frame) in ctx.drain_configure_warnings() {
18        if ctx.configure_generation() != generation {
19            aft::slog_info!(
20                "dropping stale configure_warnings for generation {} (current {})",
21                generation,
22                ctx.configure_generation()
23            );
24            continue;
25        }
26
27        if let Some(sender) = ctx.progress_sender_handle() {
28            sender(PushFrame::ConfigureWarnings(frame));
29        }
30    }
31}
32
33pub fn drain_inspect_events(ctx: &AppContext) {
34    let drained = ctx.inspect_manager().drain_completions();
35    // Watcher-driven Tier-2 scans complete via the reuse path, which bypasses
36    // `result_rx`/`drain_completions`. Poll the manager's reuse counter so a
37    // background scan still refreshes the bar (#3) — otherwise the counts and
38    // `~` marker would only update on a manual `aft_inspect`.
39    let reuse_completed = ctx.take_new_reuse_completions();
40    // A completed background Tier-2 scan refreshes the agent status-bar counts
41    // to the freshly-persisted aggregate, and clears the stale marker — so the
42    // bar reflects the new numbers on the next tool result without waiting for
43    // an explicit aft_inspect call.
44    if drained > 0 || reuse_completed {
45        if let Some(project_root) = ctx.config().project_root.clone() {
46            let (dead_code, unused_exports, duplicates) = ctx
47                .inspect_manager()
48                .latest_tier2_counts(ctx.inspect_dir(), project_root);
49            // Don't clear the `~` stale marker until the whole serial Tier-2
50            // cycle has drained — while any category is still in flight the
51            // already-persisted categories may predate the latest edit, so
52            // claiming fresh would be premature (#20). `None` counts preserve
53            // the last-known value rather than fabricating a `0` (#1).
54            let stale = ctx.inspect_manager().tier2_any_in_flight();
55            ctx.update_status_bar_tier2(dead_code, unused_exports, duplicates, None, stale);
56            // Push the refreshed snapshot so the sidebar reflects the new Tier-2
57            // counts immediately. `update_status_bar_tier2` only mutates the
58            // in-memory counts (which the agent status bar reads live on each
59            // tool result); the push-driven sidebar would otherwise keep showing
60            // the pre-population snapshot — where `status_bar` was null and the
61            // Code Health section stayed hidden — until some unrelated event
62            // happened to emit a status frame.
63            ctx.status_emitter().signal(ctx.build_status_snapshot());
64        }
65    }
66}
67
/// Drain every background build-completion receiver, in a fixed order.
///
/// The search index is drained first so watcher/pending updates apply to the
/// freshest index, then the callgraph store, then the semantic index. Each
/// call returns immediately when its receiver is not set.
pub fn drain_build_completions(ctx: &AppContext) {
    drain_search_index_events(ctx);
    drain_callgraph_store_events(ctx);
    drain_semantic_index_events(ctx);
}
77
78/// Return true when any background build-completion receiver is currently set.
79///
80/// Each receiver is checked under its own short lock; no lock is held while
81/// checking the next subsystem.
82pub fn any_build_in_flight(ctx: &AppContext) -> bool {
83    {
84        let rx = ctx
85            .search_index_rx()
86            .read()
87            .unwrap_or_else(std::sync::PoisonError::into_inner);
88        if rx.is_some() {
89            return true;
90        }
91    }
92
93    {
94        let rx = ctx.callgraph_store_rx().lock();
95        if rx.is_some() {
96            return true;
97        }
98    }
99
100    {
101        let rx = ctx.semantic_index_rx().lock();
102        rx.is_some()
103    }
104}
105
106pub fn watcher_path_is_ignored_by_current_matcher(ctx: &AppContext, path: &Path) -> bool {
107    if watcher_path_is_infra_skip(path) {
108        return true;
109    }
110
111    if let Some(matcher) = ctx.gitignore() {
112        if path.starts_with(matcher.path()) {
113            let is_dir = path.is_dir();
114            return matcher
115                .matched_path_or_any_parents(path, is_dir)
116                .is_ignore();
117        }
118    }
119
120    false
121}
122
123fn replay_search_index_pending_updates(
124    ctx: &AppContext,
125    index: &mut crate::search_index::SearchIndex,
126    pending_paths: Vec<std::path::PathBuf>,
127) {
128    for path in pending_paths {
129        if path.exists() {
130            if watcher_path_is_ignored_by_current_matcher(ctx, &path) {
131                index.remove_file(&path);
132            } else {
133                index.update_file(&path);
134            }
135        } else {
136            index.remove_file(&path);
137        }
138    }
139}
140
/// Return whether `path` has an extension the semantic index covers.
///
/// Thin wrapper over `semantic_index::is_semantic_indexed_extension`.
pub fn watcher_path_is_semantic_source(path: &Path) -> bool {
    crate::semantic_index::is_semantic_indexed_extension(path)
}
144
/// Record a successful whole-corpus semantic build or refresh.
///
/// Clears the retry-attempt bookkeeping for all paths and resets the
/// semantic refresh circuit via the context.
pub fn mark_semantic_corpus_refresh_success(ctx: &AppContext) {
    ctx.clear_all_semantic_refresh_retry_attempts();
    ctx.reset_semantic_refresh_circuit_after_success();
}
149
150pub fn drain_search_index_events(ctx: &AppContext) {
151    let (latest, disconnected) = {
152        let rx_ref = ctx
153            .search_index_rx()
154            .read()
155            .unwrap_or_else(std::sync::PoisonError::into_inner);
156        let Some(rx) = rx_ref.as_ref() else {
157            return;
158        };
159
160        let mut latest = None;
161        let mut disconnected = false;
162        loop {
163            match rx.try_recv() {
164                Ok(index) => latest = Some(index),
165                Err(crossbeam_channel::TryRecvError::Empty) => break,
166                Err(crossbeam_channel::TryRecvError::Disconnected) => {
167                    disconnected = true;
168                    break;
169                }
170            }
171        }
172        (latest, disconnected)
173    };
174
175    let mut status_changed = false;
176    let mut installed_index = false;
177    if let Some(mut index) = latest {
178        let pending_paths = ctx.take_pending_search_index_paths();
179        if !pending_paths.is_empty() {
180            replay_search_index_pending_updates(ctx, &mut index, pending_paths);
181        }
182        *ctx.search_index()
183            .write()
184            .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(index);
185        installed_index = true;
186        status_changed = true;
187    }
188
189    if disconnected || installed_index {
190        *ctx.search_index_rx()
191            .write()
192            .unwrap_or_else(std::sync::PoisonError::into_inner) = None;
193        if disconnected && !installed_index {
194            let _ = ctx.take_pending_search_index_paths();
195        }
196        status_changed = true;
197    }
198
199    if status_changed {
200        ctx.status_emitter().signal(ctx.build_status_snapshot());
201    }
202}
203
204/// Install a background-built callgraph store once its cold build completes.
205/// Mirrors `drain_search_index_events`: drains the receiver, installs the
206/// freshest store, replays paths that changed during the build, and clears the
207/// receiver. On build failure (channel disconnected with nothing installed) the
208/// receiver is cleared so a later op can retry the cold build.
209pub fn drain_callgraph_store_events(ctx: &AppContext) {
210    let (latest, disconnected) = {
211        let rx_ref = ctx.callgraph_store_rx().lock();
212        let Some(rx) = rx_ref.as_ref() else {
213            return;
214        };
215
216        let mut latest = None;
217        let mut disconnected = false;
218        loop {
219            match rx.try_recv() {
220                Ok(store) => latest = Some(store),
221                Err(crossbeam_channel::TryRecvError::Empty) => break,
222                Err(crossbeam_channel::TryRecvError::Disconnected) => {
223                    disconnected = true;
224                    break;
225                }
226            }
227        }
228        (latest, disconnected)
229    };
230
231    let mut status_changed = false;
232    let mut installed = false;
233    if let Some(store) = latest {
234        // Replay source files that changed while the cold build was running so
235        // the freshly-installed store reflects mid-build edits.
236        let pending = ctx.take_pending_callgraph_store_paths();
237        if !pending.is_empty() {
238            if let Err(error) = store.refresh_files(&pending) {
239                crate::slog_warn!(
240                    "callgraph store post-build pending refresh failed: {}",
241                    error
242                );
243                if let Err(mark_error) = store.mark_files_stale(&pending) {
244                    crate::slog_warn!(
245                        "failed to mark callgraph store files stale after post-build refresh failure: {}",
246                        mark_error
247                    );
248                }
249            }
250        }
251        *ctx.callgraph_store()
252            .write()
253            .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(Arc::new(store));
254        installed = true;
255        status_changed = true;
256    }
257
258    if disconnected || installed {
259        *ctx.callgraph_store_rx().lock() = None;
260        if disconnected && !installed {
261            // Build failed: discard pending paths (no store to apply them to);
262            // a later op restarts the build and re-walks the project.
263            let _ = ctx.take_pending_callgraph_store_paths();
264        }
265        status_changed = true;
266    }
267
268    if status_changed {
269        ctx.status_emitter().signal(ctx.build_status_snapshot());
270    }
271}
272
/// Process queued events from the background semantic index build.
///
/// All events currently on the receiver are collected under the receiver
/// lock, then handled in arrival order with the lock released:
///
/// * `Progress` updates the status to `Building`.
/// * `ColdSeedGateCleared` resumes work deferred behind the cold-seed gate.
/// * `Ready` installs the new index, after invalidating any pending paths that
///   are semantic sources and queueing them for a follow-up refresh.
/// * `Failed` drops the index and pending work and records the error.
///
/// A disconnect with no `Ready`/`Failed` event is treated as a failure. After
/// a terminal outcome the receiver is cleared. A pending corpus refresh takes
/// precedence over replaying individual files. A status snapshot is emitted
/// if anything visible changed.
pub fn drain_semantic_index_events(ctx: &AppContext) {
    // Collect first, act later: the receiver lock is held only for this block.
    let (events, disconnected) = {
        let rx_ref = ctx.semantic_index_rx().lock();
        let Some(rx) = rx_ref.as_ref() else {
            return;
        };

        let mut events = Vec::new();
        let mut disconnected = false;
        loop {
            match rx.try_recv() {
                Ok(event) => events.push(event),
                Err(crossbeam_channel::TryRecvError::Empty) => break,
                Err(crossbeam_channel::TryRecvError::Disconnected) => {
                    disconnected = true;
                    break;
                }
            }
        }
        (events, disconnected)
    };

    if events.is_empty() && !disconnected {
        return;
    }

    // `keep_receiver` stays true until a terminal outcome (Ready, Failed, or a
    // bare disconnect) has been handled.
    let mut keep_receiver = true;
    let mut status_changed = false;
    let mut replay_refresh_paths = Vec::new();
    let mut replay_corpus_refresh = false;
    for event in events {
        match event {
            SemanticIndexEvent::Progress {
                stage,
                files,
                entries_done,
                entries_total,
            } => {
                *ctx.semantic_index_status()
                    .write()
                    .unwrap_or_else(std::sync::PoisonError::into_inner) =
                    SemanticIndexStatus::Building {
                        stage,
                        files,
                        entries_done,
                        entries_total,
                    };
                // Push progress to the sidebar. Without this, a long rebuild
                // (e.g. a slow local embedding backend re-indexing after a prior
                // failure) leaves the sidebar showing the stale prior state —
                // "failed" with an old error — for the entire build, even though
                // it is actively embedding. Progress transitions are exactly
                // when the user needs to see "building".
                status_changed = true;
            }
            SemanticIndexEvent::ColdSeedGateCleared => {
                ctx.resume_deferred_work_after_semantic_cold_seed_gate_cleared();
            }
            SemanticIndexEvent::Ready(mut index) => {
                mark_semantic_corpus_refresh_success(ctx);
                // Paths that changed while the build ran are invalidated in the
                // new index and queued for a follow-up refresh; pending paths
                // that are not semantic sources are dropped.
                let pending_paths = ctx.take_pending_semantic_index_paths();
                for path in pending_paths {
                    if watcher_path_is_semantic_source(&path) {
                        index.invalidate_file(&path);
                        replay_refresh_paths.push(path);
                    }
                }
                replay_corpus_refresh = ctx.take_pending_semantic_corpus_refresh();
                *ctx.semantic_index()
                    .write()
                    .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(index);
                *ctx.semantic_index_status()
                    .write()
                    .unwrap_or_else(std::sync::PoisonError::into_inner) =
                    SemanticIndexStatus::ready();
                keep_receiver = false;
                status_changed = true;
                ctx.clear_semantic_cold_seed_gate_and_resume_deferred_work();
            }
            SemanticIndexEvent::Failed(error) => {
                // Discard work queued for an index that will not exist.
                let _ = ctx.take_pending_semantic_index_paths();
                let _ = ctx.take_pending_semantic_corpus_refresh();
                *ctx.semantic_index()
                    .write()
                    .unwrap_or_else(std::sync::PoisonError::into_inner) = None;
                ctx.clear_semantic_refresh_worker();
                *ctx.semantic_index_status()
                    .write()
                    .unwrap_or_else(std::sync::PoisonError::into_inner) =
                    SemanticIndexStatus::Failed(error);
                keep_receiver = false;
                status_changed = true;
                ctx.clear_semantic_cold_seed_gate_and_resume_deferred_work();
            }
        }
    }

    // The worker went away without reporting Ready or Failed: apply the same
    // cleanup as an explicit failure, with a fixed error message.
    if disconnected && keep_receiver {
        let _ = ctx.take_pending_semantic_index_paths();
        let _ = ctx.take_pending_semantic_corpus_refresh();
        *ctx.semantic_index()
            .write()
            .unwrap_or_else(std::sync::PoisonError::into_inner) = None;
        ctx.clear_semantic_refresh_worker();
        *ctx.semantic_index_status()
            .write()
            .unwrap_or_else(std::sync::PoisonError::into_inner) = SemanticIndexStatus::Failed(
            "semantic index build worker disconnected before reporting completion".to_string(),
        );
        keep_receiver = false;
        status_changed = true;
        ctx.clear_semantic_cold_seed_gate_and_resume_deferred_work();
    }

    if !keep_receiver {
        *ctx.semantic_index_rx().lock() = None;
    }

    // A queued corpus refresh takes precedence: when it is set, the individual
    // replay paths are not sent separately.
    // NOTE(review): when `canonical_cache_root_opt()` is `None` neither the
    // corpus refresh nor the replayed files are sent — confirm this is intended.
    if replay_corpus_refresh {
        if ctx.canonical_cache_root_opt().is_some() {
            *ctx.semantic_index_status()
                .write()
                .unwrap_or_else(std::sync::PoisonError::into_inner) =
                SemanticIndexStatus::Building {
                    stage: "refreshing_corpus".to_string(),
                    files: None,
                    entries_done: None,
                    entries_total: None,
                };
            let sent = ctx
                .semantic_refresh_sender()
                .is_some_and(|sender| sender.send(SemanticRefreshRequest::Corpus).is_ok());
            if !sent {
                *ctx.semantic_index_status()
                    .write()
                    .unwrap_or_else(std::sync::PoisonError::into_inner) =
                    SemanticIndexStatus::Failed(
                        "semantic corpus refresh worker unavailable".to_string(),
                    );
            }
            status_changed = true;
        }
    } else if !replay_refresh_paths.is_empty() {
        // Mark the files as refreshing (only while the status is `Ready`)
        // before handing them to the refresh worker; the status lock is
        // released before the send.
        {
            let mut status = ctx
                .semantic_index_status()
                .write()
                .unwrap_or_else(std::sync::PoisonError::into_inner);
            if matches!(&*status, SemanticIndexStatus::Ready { .. }) {
                for path in &replay_refresh_paths {
                    status.add_refreshing_file(path.clone());
                }
                status_changed = true;
            }
        }
        let sent = ctx.semantic_refresh_sender().is_some_and(|sender| {
            sender
                .send(SemanticRefreshRequest::Files {
                    paths: replay_refresh_paths.clone(),
                })
                .is_ok()
        });
        if !sent {
            // No worker to hand the files to: undo the "refreshing" markers.
            crate::slog_warn!(
                "semantic refresh worker unavailable; dropping {} replayed file(s)",
                replay_refresh_paths.len()
            );
            let mut status = ctx
                .semantic_index_status()
                .write()
                .unwrap_or_else(std::sync::PoisonError::into_inner);
            for path in &replay_refresh_paths {
                status.cancel_refreshing_file(path);
            }
            status_changed = true;
        }
    }

    if status_changed {
        ctx.status_emitter().signal(ctx.build_status_snapshot());
    }
}
455
/// Per-path cap on live semantic refresh retries: a path whose recorded
/// attempt count has reached this value is no longer scheduled for retry.
pub const MAX_RETRY_ATTEMPTS: usize = 6;
/// Threshold handed to the context when recording a transient refresh failure.
// NOTE(review): presumably the number of consecutive transient failures that
// opens the refresh circuit — confirm against `AppContext`.
pub const BREAKER_TRIP_THRESHOLD: usize = 3;
458
/// Delay before retrying a live semantic refresh after a transient embedding
/// backend failure.
///
/// `attempt` is the zero-based retry index. The schedule is 15s, 30s, then
/// 60s for every later attempt, mirroring the cold-build retry cadence so a
/// down backend cannot spin the watcher/refresh loop hot while still
/// self-healing once the backend returns.
///
/// Test seam (intentionally matching the build-level retry override): when
/// `AFT_SEMANTIC_RETRY_BACKOFF_MS` is set to a value that parses as `u64`,
/// that many milliseconds is returned for every attempt. An unset or
/// unparsable value falls back to the schedule.
fn semantic_refresh_retry_backoff(attempt: usize) -> Duration {
    const SCHEDULE_SECS: [u64; 3] = [15, 30, 60];

    let override_ms = std::env::var("AFT_SEMANTIC_RETRY_BACKOFF_MS")
        .ok()
        .and_then(|raw| raw.parse::<u64>().ok());

    match override_ms {
        Some(ms) => Duration::from_millis(ms),
        None => {
            // Attempts past the end of the schedule reuse its final entry.
            let slot = attempt.min(SCHEDULE_SECS.len() - 1);
            Duration::from_secs(SCHEDULE_SECS[slot])
        }
    }
}
477
/// Outcome of planning a retry for a batch of paths whose refresh failed.
struct SemanticRefreshRetryPlan {
    /// Paths still under the per-path retry cap; to be re-sent after `delay`.
    retry_paths: Vec<std::path::PathBuf>,
    /// Paths that already reached `MAX_RETRY_ATTEMPTS` and will not be retried.
    capped_paths: Vec<std::path::PathBuf>,
    /// Backoff before re-sending `retry_paths`; `None` when `retry_paths` is empty.
    delay: Option<Duration>,
}
483
484fn next_semantic_refresh_retry_plan(
485    ctx: &AppContext,
486    paths: Vec<std::path::PathBuf>,
487) -> SemanticRefreshRetryPlan {
488    let mut retry_paths = Vec::new();
489    let mut capped_paths = Vec::new();
490    let mut max_attempt = 0usize;
491
492    ctx.with_semantic_refresh_retry_attempts_mut(|attempts| {
493        for path in paths {
494            let attempt = attempts.get(&path).copied().unwrap_or(0);
495            if attempt >= MAX_RETRY_ATTEMPTS {
496                capped_paths.push(path);
497                continue;
498            }
499            max_attempt = max_attempt.max(attempt);
500            attempts.insert(path.clone(), attempt.saturating_add(1));
501            retry_paths.push(path);
502        }
503    });
504
505    let delay = if retry_paths.is_empty() {
506        None
507    } else {
508        Some(semantic_refresh_retry_backoff(max_attempt))
509    };
510
511    SemanticRefreshRetryPlan {
512        retry_paths,
513        capped_paths,
514        delay,
515    }
516}
517
/// Forget the recorded retry attempts for `paths` (delegates to the context).
fn clear_semantic_refresh_retry_attempts(ctx: &AppContext, paths: &[std::path::PathBuf]) {
    ctx.clear_semantic_refresh_retry_attempts(paths);
}
521
522fn clear_completed_pending_semantic_index_paths(
523    ctx: &AppContext,
524    completed_paths: &[std::path::PathBuf],
525) {
526    if completed_paths.is_empty() {
527        return;
528    }
529
530    let completed = completed_paths.iter().cloned().collect::<HashSet<_>>();
531    let remaining = ctx
532        .take_pending_semantic_index_paths()
533        .into_iter()
534        .filter(|path| !completed.contains(path))
535        .collect::<Vec<_>>();
536    if !remaining.is_empty() {
537        ctx.add_pending_semantic_index_paths(remaining);
538    }
539}
540
/// Delay before a circuit probe fires.
///
/// Passing `usize::MAX` selects the final (longest) entry of the retry
/// backoff schedule, or the env-var override when one is set.
fn semantic_refresh_probe_delay() -> Duration {
    semantic_refresh_retry_backoff(usize::MAX)
}
544
/// Return whether the context reports the semantic refresh circuit as open.
pub fn semantic_refresh_circuit_is_open(ctx: &AppContext) -> bool {
    ctx.semantic_refresh_circuit_is_open()
}
548
/// Record one transient refresh failure against `BREAKER_TRIP_THRESHOLD`.
///
/// Returns the context's verdict unchanged.
// NOTE(review): the caller treats `true` as "circuit is open — park the paths
// and schedule a probe"; confirm against `AppContext`.
pub fn record_semantic_refresh_transient_failure(ctx: &AppContext) -> bool {
    ctx.record_semantic_refresh_transient_failure(BREAKER_TRIP_THRESHOLD)
}
552
/// Reset the context's transient refresh failure counter.
fn reset_semantic_refresh_transient_failure_count(ctx: &AppContext) {
    ctx.reset_semantic_refresh_transient_failure_count();
}
556
/// Reset the semantic refresh circuit after a successful refresh (delegates
/// to the context).
fn reset_semantic_refresh_circuit_after_success(ctx: &AppContext) {
    ctx.reset_semantic_refresh_circuit_after_success();
}
560
/// Bookkeeping after a per-file refresh succeeded for `completed_paths`.
///
/// Clears their retry attempts, removes them from the pending path list, and
/// resets the refresh circuit.
fn mark_semantic_refresh_success(ctx: &AppContext, completed_paths: &[std::path::PathBuf]) {
    clear_semantic_refresh_retry_attempts(ctx, completed_paths);
    clear_completed_pending_semantic_index_paths(ctx, completed_paths);
    reset_semantic_refresh_circuit_after_success(ctx);
}
566
/// Test-only accessor: the context's current transient refresh failure count.
#[doc(hidden)]
pub fn semantic_refresh_transient_failure_count_for_test(ctx: &AppContext) -> usize {
    ctx.semantic_refresh_transient_failure_count()
}
571
/// Test-only accessor: whether the context has a refresh probe scheduled.
#[doc(hidden)]
pub fn semantic_refresh_probe_is_scheduled_for_test(ctx: &AppContext) -> bool {
    ctx.semantic_refresh_probe_is_scheduled()
}
576
/// Ask the context to schedule a refresh probe after the probe delay.
// NOTE(review): the name suggests this is a no-op when a probe is already
// scheduled — confirm against `AppContext`.
fn ensure_semantic_refresh_probe_scheduled(ctx: &AppContext) {
    ctx.ensure_semantic_refresh_probe_scheduled(semantic_refresh_probe_delay());
}
580
581fn maybe_fire_semantic_refresh_probe(ctx: &AppContext) {
582    if !ctx.take_semantic_refresh_probe_ready() {
583        return;
584    }
585    if !semantic_refresh_circuit_is_open(ctx) {
586        return;
587    }
588
589    let pending_paths = ctx.take_pending_semantic_index_paths();
590    if pending_paths.is_empty() {
591        return;
592    }
593
594    let sent = ctx.semantic_refresh_sender().is_some_and(|sender| {
595        sender
596            .send(SemanticRefreshRequest::Files {
597                paths: pending_paths.clone(),
598            })
599            .is_ok()
600    });
601    if !sent {
602        ctx.add_pending_semantic_index_paths(pending_paths);
603    }
604}
605
606pub fn schedule_semantic_refresh_retry(
607    ctx: &AppContext,
608    paths: Vec<std::path::PathBuf>,
609    error: &str,
610) -> bool {
611    if paths.is_empty() {
612        return false;
613    }
614    let Some(sender) = ctx.semantic_refresh_sender() else {
615        return false;
616    };
617
618    let SemanticRefreshRetryPlan {
619        retry_paths,
620        capped_paths,
621        delay,
622    } = next_semantic_refresh_retry_plan(ctx, paths);
623
624    if !capped_paths.is_empty() {
625        aft::slog_warn!(
626            "semantic refresh retry limit reached for {} file(s); preserving for next watcher/configure refresh",
627            capped_paths.len(),
628        );
629        ctx.add_pending_semantic_index_paths(capped_paths);
630    }
631
632    let Some(delay) = delay else {
633        return true;
634    };
635
636    let clean = aft::semantic_index::strip_transient_embedding_marker(error);
637    aft::slog_warn!(
638        "semantic refresh hit a transient backend error ({}); retrying {} file(s) in {}ms",
639        clean,
640        retry_paths.len(),
641        delay.as_millis(),
642    );
643
644    let session_id = log_ctx::current_session();
645    thread::spawn(move || {
646        log_ctx::with_session(session_id, || {
647            thread::sleep(delay);
648            let _ = sender.send(SemanticRefreshRequest::Files { paths: retry_paths });
649        });
650    });
651    true
652}
653
654pub fn drain_semantic_refresh_events(ctx: &AppContext) {
655    let (events, disconnected) = {
656        let rx_ref = ctx.semantic_refresh_event_rx().lock();
657        let Some(rx) = rx_ref.as_ref() else {
658            return;
659        };
660
661        let mut events = Vec::new();
662        let mut disconnected = false;
663        loop {
664            match rx.try_recv() {
665                Ok(event) => events.push(event),
666                Err(crossbeam_channel::TryRecvError::Empty) => break,
667                Err(crossbeam_channel::TryRecvError::Disconnected) => {
668                    disconnected = true;
669                    break;
670                }
671            }
672        }
673        (events, disconnected)
674    };
675
676    if events.is_empty() && !disconnected {
677        maybe_fire_semantic_refresh_probe(ctx);
678        return;
679    }
680
681    let had_events = !events.is_empty();
682    let mut status_changed = false;
683    let mut replay_refresh_paths = Vec::new();
684    for event in events {
685        match event {
686            SemanticRefreshEvent::Started { paths } => {
687                let mut status = ctx
688                    .semantic_index_status()
689                    .write()
690                    .unwrap_or_else(std::sync::PoisonError::into_inner);
691                if matches!(&*status, SemanticIndexStatus::Ready { .. }) {
692                    for path in paths {
693                        status.start_refreshing_file(path);
694                    }
695                    status_changed = true;
696                }
697            }
698            SemanticRefreshEvent::CorpusStarted { files } => {
699                *ctx.semantic_index_status()
700                    .write()
701                    .unwrap_or_else(std::sync::PoisonError::into_inner) =
702                    SemanticIndexStatus::Building {
703                        stage: "refreshing_corpus".to_string(),
704                        files: Some(files),
705                        entries_done: None,
706                        entries_total: None,
707                    };
708                status_changed = true;
709            }
710            SemanticRefreshEvent::Completed {
711                added_entries,
712                updated_metadata,
713                completed_paths,
714            } => {
715                if let Some(index) = ctx
716                    .semantic_index()
717                    .write()
718                    .unwrap_or_else(std::sync::PoisonError::into_inner)
719                    .as_mut()
720                {
721                    index.apply_refresh_update(added_entries, updated_metadata, &completed_paths);
722                }
723                mark_semantic_refresh_success(ctx, &completed_paths);
724                let mut status = ctx
725                    .semantic_index_status()
726                    .write()
727                    .unwrap_or_else(std::sync::PoisonError::into_inner);
728                if matches!(&*status, SemanticIndexStatus::Ready { .. }) {
729                    for path in &completed_paths {
730                        status.complete_refreshing_file(path);
731                    }
732                    status_changed = true;
733                }
734            }
735            SemanticRefreshEvent::CorpusCompleted {
736                mut index,
737                changed,
738                added,
739                deleted,
740                total_processed,
741            } => {
742                aft::runtime_drain::mark_semantic_corpus_refresh_success(ctx);
743                if changed > 0 || added > 0 || deleted > 0 {
744                    aft::slog_info!(
745                        "semantic corpus refresh completed: {} changed, {} new, {} deleted, {} total processed",
746                        changed,
747                        added,
748                        deleted,
749                        total_processed
750                    );
751                }
752                let pending_paths = ctx.take_pending_semantic_index_paths();
753                for path in pending_paths {
754                    if !aft::runtime_drain::watcher_path_is_semantic_source(&path) {
755                        continue;
756                    }
757                    index.invalidate_file(&path);
758                    if !aft::runtime_drain::watcher_path_is_ignored_by_current_matcher(ctx, &path) {
759                        replay_refresh_paths.push(path);
760                    }
761                }
762                *ctx.semantic_index()
763                    .write()
764                    .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(index);
765                *ctx.semantic_index_status()
766                    .write()
767                    .unwrap_or_else(std::sync::PoisonError::into_inner) =
768                    SemanticIndexStatus::ready();
769                status_changed = true;
770            }
771            SemanticRefreshEvent::Failed { paths, error } => {
772                if aft::semantic_index::embedding_failure_is_transient(&error) {
773                    if record_semantic_refresh_transient_failure(ctx) {
774                        ctx.add_pending_semantic_index_paths(paths);
775                        ensure_semantic_refresh_probe_scheduled(ctx);
776                    } else if !schedule_semantic_refresh_retry(ctx, paths.clone(), &error) {
777                        aft::slog_warn!(
778                            "semantic refresh worker unavailable; preserving {} transiently failed file(s) for retry",
779                            paths.len(),
780                        );
781                        ctx.add_pending_semantic_index_paths(paths);
782                    }
783                } else {
784                    aft::slog_warn!("semantic refresh failed: {}", error);
785                    reset_semantic_refresh_transient_failure_count(ctx);
786                    clear_semantic_refresh_retry_attempts(ctx, &paths);
787                    let mut status = ctx
788                        .semantic_index_status()
789                        .write()
790                        .unwrap_or_else(std::sync::PoisonError::into_inner);
791                    if matches!(&*status, SemanticIndexStatus::Ready { .. }) {
792                        for path in &paths {
793                            status.complete_refreshing_file(path);
794                        }
795                        status_changed = true;
796                    }
797                }
798            }
799            SemanticRefreshEvent::CorpusFailed { error } => {
800                // A transient backend blip during a corpus refresh must NOT
801                // destroy the working index — the prior index is still valid and
802                // serving. Keep it Ready and let the next watcher/ignore change
803                // re-trigger the refresh, rather than nuking everything to
804                // `Failed` over a connection hiccup (the same park-forever trap
805                // the initial build now rides out). Permanent errors (dimension
806                // mismatch, too-many-files) still drop the index and surface the
807                // real failure.
808                if aft::semantic_index::embedding_failure_is_transient(&error) {
809                    let clean = aft::semantic_index::strip_transient_embedding_marker(&error);
810                    let has_index = ctx
811                        .semantic_index()
812                        .read()
813                        .unwrap_or_else(std::sync::PoisonError::into_inner)
814                        .is_some();
815                    if has_index {
816                        aft::slog_warn!(
817                            "semantic corpus refresh hit a transient backend error ({}); keeping the existing index",
818                            clean,
819                        );
820                        *ctx.semantic_index_status()
821                            .write()
822                            .unwrap_or_else(std::sync::PoisonError::into_inner) =
823                            SemanticIndexStatus::ready();
824                    } else {
825                        // No index to fall back on — surface the clean message.
826                        aft::slog_warn!("semantic corpus refresh failed: {}", clean);
827                        *ctx.semantic_index_status()
828                            .write()
829                            .unwrap_or_else(std::sync::PoisonError::into_inner) =
830                            SemanticIndexStatus::Failed(clean);
831                    }
832                    status_changed = true;
833                } else {
834                    aft::slog_warn!("semantic corpus refresh failed: {}", error);
835                    let _ = ctx.take_pending_semantic_index_paths();
836                    *ctx.semantic_index()
837                        .write()
838                        .unwrap_or_else(std::sync::PoisonError::into_inner) = None;
839                    *ctx.semantic_index_status()
840                        .write()
841                        .unwrap_or_else(std::sync::PoisonError::into_inner) =
842                        SemanticIndexStatus::Failed(error);
843                    status_changed = true;
844                }
845            }
846        }
847    }
848
849    if disconnected {
850        ctx.clear_semantic_refresh_worker();
851        let refreshing_paths = {
852            let status = ctx
853                .semantic_index_status()
854                .read()
855                .unwrap_or_else(std::sync::PoisonError::into_inner);
856            match &*status {
857                SemanticIndexStatus::Ready { refreshing, .. } => refreshing.clone(),
858                _ => Vec::new(),
859            }
860        };
861        if !refreshing_paths.is_empty() {
862            let mut status = ctx
863                .semantic_index_status()
864                .write()
865                .unwrap_or_else(std::sync::PoisonError::into_inner);
866            for path in &refreshing_paths {
867                status.cancel_refreshing_file(path);
868            }
869        }
870        if !refreshing_paths.is_empty() || had_events {
871            status_changed = true;
872        }
873    }
874
875    if !replay_refresh_paths.is_empty() {
876        {
877            let mut status = ctx
878                .semantic_index_status()
879                .write()
880                .unwrap_or_else(std::sync::PoisonError::into_inner);
881            if matches!(&*status, SemanticIndexStatus::Ready { .. }) {
882                for path in &replay_refresh_paths {
883                    status.add_refreshing_file(path.clone());
884                }
885                status_changed = true;
886            }
887        }
888        let sent = ctx.semantic_refresh_sender().is_some_and(|sender| {
889            sender
890                .send(SemanticRefreshRequest::Files {
891                    paths: replay_refresh_paths.clone(),
892                })
893                .is_ok()
894        });
895        if !sent {
896            aft::slog_warn!(
897                "semantic refresh worker unavailable; dropping {} replayed corpus file(s)",
898                replay_refresh_paths.len()
899            );
900            let mut status = ctx
901                .semantic_index_status()
902                .write()
903                .unwrap_or_else(std::sync::PoisonError::into_inner);
904            for path in &replay_refresh_paths {
905                status.cancel_refreshing_file(path);
906            }
907            status_changed = true;
908        }
909    }
910
911    maybe_fire_semantic_refresh_probe(ctx);
912
913    if status_changed {
914        ctx.status_emitter().signal(ctx.build_status_snapshot());
915    }
916}
917
/// File extensions that [`watcher_path_is_source`] accepts as source files.
///
/// NOTE(review): presumably this is the trigram/search extension set. The
/// callgraph store is refreshed for every detected language (see
/// [`watcher_path_is_callgraph_indexed`]), so this list should not be read as
/// "what the call graph supports" — confirm against the remaining consumers.
const SOURCE_EXTENSIONS: &[&str] = &[
    // TypeScript
    "ts", "tsx", "mts", "cts",
    // JavaScript
    "js", "jsx", "mjs", "cjs",
    // Python
    "py", "pyi",
    // Rust
    "rs",
    // Go
    "go",
];

/// Largest watcher batch (in paths) that `drain_watcher_events` processes
/// inline. A batch with more paths than this schedules a project corpus
/// refresh instead of per-file index updates.
pub const WATCHER_BATCH_INLINE_CAP: usize = 256;
924
/// Reports whether `path` names a TypeScript/JavaScript project config:
/// `tsconfig.json`, `jsconfig.json`, or a dotted variant such as
/// `tsconfig.base.json`.
///
/// Any such file can change which files `tsc` checks, so a match means the
/// status-bar membership cache has to be invalidated. The match is deliberately
/// loose about the variant suffix and does not follow `extends` chains: the
/// cache is cleared wholesale on a hit, and base configs almost always use the
/// `tsconfig*.json` naming. A base config with a non-standard name is picked up
/// on the next `tsconfig.json` change or `configure`.
///
/// Returns `false` when the path has no final component or the file name is not
/// valid UTF-8.
pub fn watcher_path_is_tsconfig(path: &std::path::Path) -> bool {
    let Some(file_name) = path.file_name().and_then(|raw| raw.to_str()) else {
        return false;
    };
    // The bare `tsconfig.json` / `jsconfig.json` names also satisfy this
    // prefix + suffix test (the prefix's trailing dot is the suffix's leading
    // dot), so no separate exact-name comparison is needed.
    let has_config_prefix =
        file_name.starts_with("tsconfig.") || file_name.starts_with("jsconfig.");
    has_config_prefix && file_name.ends_with(".json")
}
943
944pub fn watcher_path_is_source(path: &std::path::Path) -> bool {
945    path.extension()
946        .and_then(|ext| ext.to_str())
947        .is_some_and(|ext| SOURCE_EXTENSIONS.contains(&ext))
948}
949
950/// A file the callgraph STORE would have indexed at cold-build time. The store
951/// indexes every file `walk_project_files` yields (i.e. any detected language),
952/// not just the trigram `SOURCE_EXTENSIONS` set. Gating the store's watcher
953/// refresh on the narrower trigram set left edits to Java/C/C++/C#/Kotlin/Ruby/
954/// PHP/… (all of which the store extracts calls for) serving stale results until
955/// a full rebuild. Mirror cold-build exactly so refresh coverage == index
956/// coverage.
957pub fn watcher_path_is_callgraph_indexed(path: &std::path::Path) -> bool {
958    aft::parser::detect_language(path).is_some()
959}
960
961pub fn semantic_corpus_refresh_in_progress(ctx: &AppContext) -> bool {
962    let status = ctx
963        .semantic_index_status()
964        .read()
965        .unwrap_or_else(std::sync::PoisonError::into_inner);
966    matches!(
967        &*status,
968        SemanticIndexStatus::Building { stage, .. } if stage == "refreshing_corpus"
969    )
970}
971
/// Debug-build test hook: sleeps for the number of milliseconds given by the
/// `AFT_TEST_SEARCH_REBUILD_PUBLISH_DELAY_MS` environment variable before the
/// caller continues. Does nothing when the variable is unset or does not parse
/// as a `u64`.
#[cfg(debug_assertions)]
pub fn delay_search_rebuild_publish_for_debug() {
    let configured_delay_ms = std::env::var("AFT_TEST_SEARCH_REBUILD_PUBLISH_DELAY_MS")
        .ok()
        .and_then(|raw| raw.parse::<u64>().ok());
    if let Some(delay_ms) = configured_delay_ms {
        thread::sleep(Duration::from_millis(delay_ms));
    }
}

/// Release-build counterpart of the debug hook: always a no-op.
#[cfg(not(debug_assertions))]
pub fn delay_search_rebuild_publish_for_debug() {}
985
986pub fn spawn_search_corpus_refresh(
987    ctx: &AppContext,
988    root: std::path::PathBuf,
989    config: Arc<aft::config::Config>,
990) {
991    {
992        let mut search_index = ctx
993            .search_index()
994            .write()
995            .unwrap_or_else(std::sync::PoisonError::into_inner);
996        if let Some(index) = search_index.as_mut() {
997            index.ready = false;
998        }
999    }
1000
1001    let (tx, rx): (
1002        crossbeam_channel::Sender<aft::search_index::SearchIndex>,
1003        crossbeam_channel::Receiver<aft::search_index::SearchIndex>,
1004    ) = crossbeam_channel::unbounded();
1005    *ctx.search_index_rx()
1006        .write()
1007        .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(rx);
1008    ctx.reset_symbol_cache();
1009
1010    let is_worktree_bridge = ctx.is_worktree_bridge();
1011    let session_id = log_ctx::current_session();
1012    thread::spawn(move || {
1013        log_ctx::with_session(session_id, || {
1014            let cache_dir =
1015                aft::search_index::resolve_cache_dir(&root, config.storage_dir.as_deref());
1016            let _cache_lock = if is_worktree_bridge {
1017                None
1018            } else {
1019                match aft::search_index::CacheLock::acquire(&cache_dir) {
1020                    Ok(lock) => Some(lock),
1021                    Err(error) => {
1022                        aft::slog_warn!(
1023                            "failed to acquire search cache lock for ignore refresh: {}",
1024                            error
1025                        );
1026                        None
1027                    }
1028                }
1029            };
1030            let mut index = aft::search_index::SearchIndex::build_with_limit_to_cache_dir(
1031                &root,
1032                config.search_index_max_file_size,
1033                &cache_dir,
1034            );
1035            delay_search_rebuild_publish_for_debug();
1036            if !is_worktree_bridge {
1037                let head = index.stored_git_head().map(str::to_owned);
1038                index.write_to_disk(&cache_dir, head.as_deref());
1039            }
1040            let _ = tx.send(index);
1041        });
1042    });
1043}
1044
1045pub fn refresh_project_corpus(
1046    ctx: &AppContext,
1047    reason: &str,
1048    _invalidate_ignore_paths: bool,
1049) -> bool {
1050    let Some(root) = ctx.canonical_cache_root_opt() else {
1051        return false;
1052    };
1053    let config = ctx.config();
1054    let mut status_changed = false;
1055
1056    if !ctx.is_worktree_bridge() {
1057        // Do NOT cold-build the callgraph store synchronously here. This function
1058        // runs on the single-threaded dispatch loop from `drain_watcher_events`,
1059        // which fires before EVERY request (and on idle ticks). A full O(repo)
1060        // `refresh_corpus` (= `cold_build`: parse all files + resolve refs +
1061        // rewrite SQLite) blocks ALL queued requests — including `configure` and
1062        // `bash` — for its entire duration, which exceeds the 30s transport
1063        // timeout on a large repo. On a long-lived bridge (OpenCode Desktop) an
1064        // FSEvents overflow triggers this drain, so the user sees configure/bash
1065        // time out (regression: the watcher-overflow path that calls this is new
1066        // in 0.39.1; the ignore-rule path that also calls this had the same
1067        // latent inline block, just rarely triggered).
1068        //
1069        // Instead, drop the resident store and force a BACKGROUND rebuild: the
1070        // next `callgraph_store_for_ops()` spawns the cold build off-thread and
1071        // returns `Building` (callgraph ops + dead_code projection already handle
1072        // `Building`/unavailable gracefully). This mirrors the search/semantic
1073        // refreshes below, which are already async. A build already in flight
1074        // keeps running; the resident drop + force flag make the next op converge
1075        // to a fresh full rebuild.
1076        // Mirror the original "act only when the callgraph is actually loaded or
1077        // building" guard, but reschedule instead of inline-building.
1078        let callgraph_store_resident = {
1079            let guard = ctx
1080                .callgraph_store()
1081                .read()
1082                .unwrap_or_else(std::sync::PoisonError::into_inner);
1083            guard.is_some()
1084        };
1085        if callgraph_store_resident || ctx.callgraph_store_rx().lock().is_some() {
1086            *ctx.callgraph_store()
1087                .write()
1088                .unwrap_or_else(std::sync::PoisonError::into_inner) = None;
1089            ctx.mark_callgraph_store_force_rebuild();
1090            status_changed = true;
1091            aft::slog_info!(
1092                "callgraph store scheduled for background rebuild after {}",
1093                reason
1094            );
1095        }
1096    }
1097
1098    if config.search_index {
1099        spawn_search_corpus_refresh(ctx, root.clone(), config.clone());
1100        status_changed = true;
1101        aft::slog_info!("started search index refresh after {}", reason);
1102    }
1103
1104    if config.semantic_search {
1105        if let Some(sender) = ctx.semantic_refresh_sender() {
1106            *ctx.semantic_index_status()
1107                .write()
1108                .unwrap_or_else(std::sync::PoisonError::into_inner) =
1109                SemanticIndexStatus::Building {
1110                    stage: "refreshing_corpus".to_string(),
1111                    files: None,
1112                    entries_done: None,
1113                    entries_total: None,
1114                };
1115            match sender.send(SemanticRefreshRequest::Corpus) {
1116                Ok(()) => {
1117                    status_changed = true;
1118                }
1119                Err(error) => {
1120                    *ctx.semantic_index_status()
1121                        .write()
1122                        .unwrap_or_else(std::sync::PoisonError::into_inner) =
1123                        SemanticIndexStatus::Failed(format!(
1124                            "semantic corpus refresh worker unavailable: {error}"
1125                        ));
1126                    status_changed = true;
1127                }
1128            }
1129        } else if ctx.semantic_index_rx().lock().is_some() {
1130            ctx.mark_pending_semantic_corpus_refresh();
1131        }
1132    }
1133
1134    status_changed
1135}
1136
1137pub fn refresh_corpus_after_ignore_change(ctx: &AppContext) -> bool {
1138    refresh_project_corpus(ctx, "ignore-rule change", true)
1139}
1140
1141pub fn refresh_project_after_watcher_rescan(ctx: &AppContext) -> bool {
1142    if ctx.canonical_cache_root_opt().is_none() {
1143        return false;
1144    }
1145    ctx.clear_pending_index_updates();
1146    ctx.reset_symbol_cache();
1147    let _ = ctx.mark_status_bar_tier2_stale();
1148    ctx.clear_tsconfig_membership_cache();
1149    let mut status_changed = true;
1150
1151    status_changed |= refresh_project_corpus(ctx, "watcher overflow", false);
1152    status_changed
1153}
1154
1155pub fn refresh_callgraph_store_for_watcher(
1156    ctx: &AppContext,
1157    changed: &HashSet<std::path::PathBuf>,
1158) {
1159    if ctx.is_worktree_bridge() {
1160        return;
1161    }
1162    let source_paths = changed
1163        .iter()
1164        .filter(|path| watcher_path_is_callgraph_indexed(path))
1165        .cloned()
1166        .collect::<Vec<_>>();
1167    if source_paths.is_empty() {
1168        return;
1169    }
1170    // Converge to the current generation before writing: if another process
1171    // published a newer one, drop our stale store so the changed paths get
1172    // recorded as pending and replayed against the fresh store (rather than
1173    // incrementally written into a superseded generation).
1174    ctx.revalidate_callgraph_store_generation();
1175    let store = {
1176        let guard = ctx
1177            .callgraph_store()
1178            .read()
1179            .unwrap_or_else(std::sync::PoisonError::into_inner);
1180        guard.as_ref().map(Arc::clone)
1181    };
1182    let Some(store) = store else {
1183        // Store not resident yet. If a cold build is in flight, record the
1184        // changed paths so they're replayed once the freshly-built store lands
1185        // (otherwise mid-build edits would be silently lost). If no build is
1186        // running, there's nothing to refresh.
1187        if ctx.callgraph_store_rx().lock().is_some() {
1188            ctx.add_pending_callgraph_store_paths(source_paths);
1189        }
1190        return;
1191    };
1192    if let Err(error) = store.refresh_files(&source_paths) {
1193        aft::slog_warn!("callgraph store refresh failed: {}", error);
1194        match store.mark_files_stale(&source_paths) {
1195            Ok(marked) => aft::slog_warn!(
1196                "marked {} callgraph store file(s) stale after refresh failure",
1197                marked.len()
1198            ),
1199            Err(mark_error) => aft::slog_warn!(
1200                "failed to mark callgraph store files stale after refresh failure: {}",
1201                mark_error
1202            ),
1203        }
1204    }
1205}
1206
1207/// Drain pre-filtered watcher events and apply cache invalidations on the
1208/// dispatch thread. The watcher filter thread owns notify receive/decode,
1209/// metadata filtering, ignore matching, root-deleted detection, and path
1210/// coalescing; this drain only reacts to compact control events and surviving
1211/// paths because the cache/index state below is not Send.
1212pub fn drain_watcher_events(ctx: &AppContext) {
1213    let mut changed: HashSet<std::path::PathBuf> = HashSet::new();
1214    let mut ignore_file_changed = false;
1215    let mut rescan_required = false;
1216    let mut watcher_failed = None;
1217    let mut root_deleted = false;
1218
1219    {
1220        let rx_ref = ctx.watcher_rx().lock();
1221        let rx = match rx_ref.as_ref() {
1222            Some(rx) => rx,
1223            None => {
1224                ctx.tick_tier2_refresh_scheduler(0);
1225                return; // No watcher configured
1226            }
1227        };
1228
1229        loop {
1230            match rx.try_recv() {
1231                Ok(WatcherDispatchEvent::Paths(paths)) => {
1232                    if !rescan_required {
1233                        changed.extend(paths);
1234                    }
1235                }
1236                Ok(WatcherDispatchEvent::RescanRequired) => {
1237                    rescan_required = true;
1238                    changed.clear();
1239                }
1240                Ok(WatcherDispatchEvent::IgnoreRulesChanged { path }) => {
1241                    ignore_file_changed = true;
1242                    log::debug!(
1243                        "watcher: ignore rules changed at {}, rebuilding matcher",
1244                        path.display()
1245                    );
1246                    if !rescan_required {
1247                        ctx.rebuild_gitignore();
1248                    }
1249                }
1250                Ok(WatcherDispatchEvent::RootDeleted) => {
1251                    root_deleted = true;
1252                    break;
1253                }
1254                Ok(WatcherDispatchEvent::Error(error)) => {
1255                    watcher_failed = Some(error);
1256                    break;
1257                }
1258                Err(crossbeam_channel::TryRecvError::Empty) => break,
1259                Err(crossbeam_channel::TryRecvError::Disconnected) => {
1260                    watcher_failed = Some("watcher channel disconnected".to_string());
1261                    break;
1262                }
1263            }
1264        }
1265    }
1266
1267    let mut watcher_status_changed = false;
1268    if root_deleted {
1269        ctx.stop_watcher_runtime();
1270        let _ = ctx.add_degraded_reason("project_root_deleted".to_string());
1271        aft::slog_warn!(
1272            "project root deleted; dropping watcher to avoid delete-storm: {:?}",
1273            ctx.canonical_cache_root_opt()
1274        );
1275        watcher_status_changed = true;
1276        changed.clear();
1277        rescan_required = false;
1278    } else if let Some(error) = watcher_failed {
1279        ctx.stop_watcher_runtime();
1280        let _ = ctx.add_degraded_reason("watcher_unavailable".to_string());
1281        aft::slog_warn!(
1282            "file watcher unavailable; continuing without live external-change invalidation: {}",
1283            error
1284        );
1285        watcher_status_changed = true;
1286        rescan_required = false;
1287    }
1288
1289    let mut status_changed = watcher_status_changed;
1290    let mut project_corpus_refresh_requested = false;
1291    if rescan_required {
1292        aft::slog_warn!("watcher overflow: forcing project rescan");
1293        ctx.rebuild_gitignore();
1294        status_changed |= refresh_project_after_watcher_rescan(ctx);
1295        project_corpus_refresh_requested = true;
1296        changed.clear();
1297    } else if ignore_file_changed {
1298        status_changed |= refresh_corpus_after_ignore_change(ctx);
1299        project_corpus_refresh_requested = true;
1300    }
1301
1302    let scheduler_changed_path_count = if rescan_required {
1303        aft::inspect::tier2_scheduler::TIER2_REFRESH_STORM_PATH_THRESHOLD + 1
1304    } else if ignore_file_changed {
1305        changed.len().max(1)
1306    } else {
1307        changed.len()
1308    };
1309    if changed.is_empty() {
1310        if status_changed {
1311            ctx.status_emitter().signal(ctx.build_status_snapshot());
1312        }
1313        ctx.tick_tier2_refresh_scheduler(scheduler_changed_path_count);
1314        return;
1315    }
1316
1317    ctx.add_pending_tier2_paths(changed.iter().cloned());
1318
1319    // A real source change makes the last-known Tier-2 counts stale until the
1320    // next background scan reconciles them — surface that in the status bar
1321    // immediately (the `~` marker) so the agent never reads them as live.
1322    if ctx.mark_status_bar_tier2_stale() {
1323        status_changed = true;
1324    }
1325
1326    // A tsconfig change can shift which files `tsc` checks, which is the policy
1327    // the status-bar E/W count filters on. Clear the membership cache wholesale
1328    // so the next bar count re-resolves from disk (handles new nested configs,
1329    // edited `extends` parents, and deletions without per-key bookkeeping).
1330    if changed.iter().any(|path| watcher_path_is_tsconfig(path)) {
1331        ctx.clear_tsconfig_membership_cache();
1332        status_changed = true;
1333    }
1334
1335    let oversized_inline_batch = changed.len() > WATCHER_BATCH_INLINE_CAP;
1336    if oversized_inline_batch {
1337        aft::slog_warn!(
1338            "watcher batch of {} paths exceeds inline cap {}; scheduling corpus refresh",
1339            changed.len(),
1340            WATCHER_BATCH_INLINE_CAP
1341        );
1342        if !project_corpus_refresh_requested {
1343            status_changed |= refresh_project_corpus(ctx, "oversized watcher batch", false);
1344        }
1345    }
1346
1347    let search_build_in_progress = {
1348        let search_index_rx = ctx
1349            .search_index_rx()
1350            .read()
1351            .unwrap_or_else(std::sync::PoisonError::into_inner);
1352        search_index_rx.is_some()
1353    };
1354    if !oversized_inline_batch && search_build_in_progress {
1355        ctx.add_pending_search_index_paths(changed.iter().cloned());
1356    }
1357    let semantic_source_paths = changed
1358        .iter()
1359        .filter(|path| aft::runtime_drain::watcher_path_is_semantic_source(path))
1360        .cloned()
1361        .collect::<Vec<_>>();
1362    let semantic_build_in_progress = ctx.semantic_index_rx().lock().is_some();
1363    let semantic_corpus_refresh_in_progress = semantic_corpus_refresh_in_progress(ctx);
1364    if !oversized_inline_batch
1365        && (semantic_build_in_progress || semantic_corpus_refresh_in_progress)
1366        && !semantic_source_paths.is_empty()
1367    {
1368        ctx.add_pending_semantic_index_paths(semantic_source_paths.clone());
1369    }
1370
1371    if let Ok(mut symbol_cache) = ctx.symbol_cache().write() {
1372        for path in &changed {
1373            symbol_cache.invalidate(path);
1374        }
1375    }
1376
1377    let mut semantic_refresh_paths = Vec::new();
1378    if !oversized_inline_batch {
1379        refresh_callgraph_store_for_watcher(ctx, &changed);
1380
1381        {
1382            let mut index_ref = ctx
1383                .search_index()
1384                .write()
1385                .unwrap_or_else(std::sync::PoisonError::into_inner);
1386            if let Some(index) = index_ref.as_mut() {
1387                for path in &changed {
1388                    if path.exists() {
1389                        index.update_file(path);
1390                    } else {
1391                        index.remove_file(path);
1392                    }
1393                }
1394            }
1395        }
1396
1397        let stale_paths = {
1398            let mut semantic_index_ref = ctx
1399                .semantic_index()
1400                .write()
1401                .unwrap_or_else(std::sync::PoisonError::into_inner);
1402            let mut stale_paths = Vec::new();
1403            if let Some(index) = semantic_index_ref.as_mut() {
1404                for path in &semantic_source_paths {
1405                    index.invalidate_file(path);
1406                    stale_paths.push(path.clone());
1407                }
1408            }
1409            stale_paths
1410        };
1411        if !stale_paths.is_empty() {
1412            let mut status = ctx
1413                .semantic_index_status()
1414                .write()
1415                .unwrap_or_else(std::sync::PoisonError::into_inner);
1416            if matches!(&*status, SemanticIndexStatus::Ready { .. }) {
1417                for path in &stale_paths {
1418                    status.add_refreshing_file(path.clone());
1419                }
1420                semantic_refresh_paths = stale_paths;
1421                status_changed = true;
1422            }
1423        }
1424    }
1425
1426    // A vanished file's LSP diagnostics would otherwise linger in the warm set
1427    // forever (no server republishes for a path that no longer exists),
1428    // inflating the error/warning counts in the status bar and `aft_inspect`.
1429    // Clear them here so every deletion source is covered (AFT delete, `rm`,
1430    // `git checkout`, branch switch) — not just the delete command. The agent
1431    // status bar reads E/W live from the warm set on each response, so clearing
1432    // the store is sufficient; the next tool call's bar reflects the new count.
1433    //
1434    // Not gated on the trigram `SOURCE_EXTENSIONS` set: any registered LSP
1435    // server (Bash, YAML, Solidity, Vue, C/C++, custom servers, …) can publish
1436    // diagnostics for files outside that set, and gating on it left their
1437    // diagnostics stranded after deletion. `clear_for_file` is a cheap no-op
1438    // when the store holds nothing for the path, so clearing unconditionally
1439    // for every vanished path is safe.
1440    for path in &changed {
1441        if !path.exists() && ctx.lsp_clear_diagnostics_for_file(path) {
1442            status_changed = true;
1443        }
1444    }
1445
1446    if !semantic_refresh_paths.is_empty() {
1447        let sent = ctx.semantic_refresh_sender().is_some_and(|sender| {
1448            sender
1449                .send(SemanticRefreshRequest::Files {
1450                    paths: semantic_refresh_paths.clone(),
1451                })
1452                .is_ok()
1453        });
1454        if !sent {
1455            aft::slog_warn!(
1456                "semantic refresh worker unavailable; dropping {} refreshing file(s)",
1457                semantic_refresh_paths.len()
1458            );
1459            let mut status = ctx
1460                .semantic_index_status()
1461                .write()
1462                .unwrap_or_else(std::sync::PoisonError::into_inner);
1463            for path in &semantic_refresh_paths {
1464                status.cancel_refreshing_file(path);
1465            }
1466            status_changed = true;
1467        }
1468    }
1469
1470    aft::slog_info!("invalidated {} files", changed.len());
1471    if status_changed {
1472        ctx.status_emitter().signal(ctx.build_status_snapshot());
1473    }
1474    ctx.tick_tier2_refresh_scheduler(scheduler_changed_path_count);
1475}
1476
1477pub fn drain_lsp_events(ctx: &AppContext) {
1478    let drained = {
1479        let mut lsp = ctx.lsp();
1480        lsp.drain_events()
1481    };
1482    let mut status_changed = drained.diagnostics_changed;
1483    for event in drained.events {
1484        match event {
1485            LspEvent::Notification {
1486                server_kind,
1487                root,
1488                method,
1489                params,
1490            } => {
1491                log::debug!(
1492                    "[aft-lsp] notification {:?} {} {} {}",
1493                    server_kind,
1494                    root.display(),
1495                    method,
1496                    params.unwrap_or(serde_json::Value::Null)
1497                );
1498            }
1499            LspEvent::ServerRequest {
1500                server_kind,
1501                root,
1502                id,
1503                method,
1504                params,
1505            } => {
1506                log::debug!(
1507                    "[aft-lsp] request {:?} {} {:?} {} {}",
1508                    server_kind,
1509                    root.display(),
1510                    id,
1511                    method,
1512                    params.unwrap_or(serde_json::Value::Null)
1513                );
1514            }
1515            LspEvent::ServerExited { server_kind, root } => {
1516                aft::slog_info!("exited {:?} {}", server_kind, root.display());
1517                status_changed = true;
1518            }
1519        }
1520    }
1521    if status_changed {
1522        ctx.status_emitter().signal(ctx.build_status_snapshot());
1523    }
1524}