Skip to main content

aft/inspect/
manager.rs

1use std::collections::{BTreeMap, BTreeSet, HashMap};
2use std::path::{Path, PathBuf};
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::sync::{Arc, Mutex};
5use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
6
7use crossbeam_channel::{after, bounded, select, Receiver, Sender};
8use serde::Deserialize;
9use serde_json::{json, Value};
10
11use super::cache::{InspectCache, Tier2ContributionUpdates};
12use super::dispatch::{default_worker, start_dispatch_loop, InspectWorker};
13use super::freshness::ContributionFreshness;
14use super::job::{
15    normalize_path, CallgraphExport, CallgraphOutboundCall, CallgraphSnapshot, FileContribution,
16    InspectCategory, InspectJob, InspectResult, InspectScanSuccess, InspectSnapshot, JobKey,
17    JobOutcome, JobScope, DISPATCHED_CALLEE_SEPARATOR,
18};
19use super::scanners::DEFAULT_EXPORT_MARKER_KIND;
20use crate::cache_freshness::{self, FileFreshness, FreshnessVerdict};
21use crate::callgraph::{is_bare_callee, resolve_symbol_query_in_data, CallGraph, EdgeResolution};
22use crate::symbols::SymbolKind;
23
/// Soft deadline that `InspectManager::new` passes to
/// `InspectManager::with_worker`.
const DEFAULT_SOFT_DEADLINE: Duration = Duration::from_secs(1);

/// Sending half of a channel that carries a `JobOutcome`.
type WaiterTx = Sender<JobOutcome>;
27
/// One party registered against an in-flight job key in
/// `InspectManager::in_flight`.
#[derive(Clone)]
struct Waiter {
    /// Channel end on which a `JobOutcome` can be sent to this waiter.
    tx: WaiterTx,
}
32
/// A cached contribution's file path together with the freshness data that was
/// recorded for it.
struct CachedContributionFreshness {
    /// Path of the contributing file. Callers in this file handle both
    /// absolute paths and paths relative to the project root.
    file_path: PathBuf,
    /// Freshness data recorded for the file; passed to
    /// `verify_contribution_file_strict` when the record is re-checked.
    freshness: FileFreshness,
}
37
/// Summary of a set of contributions/files used to decide whether a cached
/// aggregate can be reused without any rescanning.
///
/// Two fingerprints are only treated as a match by the quick-reuse path when
/// both report `hash_complete` and all fields compare equal.
#[derive(PartialEq, Eq)]
struct ContributionFingerprint {
    /// Number of entries covered by the fingerprint.
    count: usize,
    /// Hash over the covered set.
    // NOTE(review): the exact hashing scheme is defined outside this view.
    set_hash: String,
    /// `false` when the hash could not be computed for every entry; such a
    /// fingerprint never qualifies for quick reuse.
    hash_complete: bool,
}
44
/// Reason a single category was rejected from a Tier 2 run submission.
#[derive(Debug, Clone)]
pub struct Tier2RunSubmissionError {
    /// The category that could not be queued.
    pub category: InspectCategory,
    /// Human-readable explanation (disabled category, non-Tier-2 category, or
    /// a poisoned in-flight lock).
    pub message: String,
}
50
/// Result of submitting a batch of Tier 2 categories for a background run.
#[derive(Debug, Clone, Default)]
pub struct Tier2RunSubmission {
    /// Every accepted category, including ones that were already in flight
    /// when the submission was made.
    pub queued_categories: Vec<InspectCategory>,
    /// The subset of accepted categories that this submission newly added to
    /// the in-flight map.
    pub newly_queued_categories: Vec<InspectCategory>,
    /// One entry per rejected category.
    pub errors: Vec<Tier2RunSubmissionError>,
}
57
58impl Tier2RunSubmission {
59    pub fn has_new_work(&self) -> bool {
60        !self.newly_queued_categories.is_empty()
61    }
62}
63
/// Coordinates inspect jobs: submits them to the dispatch loop, tracks which
/// job keys are in flight, and hands out per-project caches.
pub struct InspectManager {
    /// Job-request sender obtained from `start_dispatch_loop`.
    request_tx: Sender<InspectJob>,
    /// Receiver of finished job results; emptied by `drain_completions`.
    result_rx: Receiver<InspectResult>,
    /// Thread pool obtained from `start_dispatch_loop`; the Tier 2 reuse
    /// submission methods also spawn their background work on it.
    #[allow(dead_code)]
    pool: Arc<rayon::ThreadPool>,
    /// Job keys currently in flight, each with the waiters registered for it.
    in_flight: Mutex<HashMap<JobKey, Vec<Waiter>>>,
    /// Opened caches, keyed by the path of their sqlite file.
    caches: Mutex<HashMap<PathBuf, Arc<InspectCache>>>,
    /// Soft deadline supplied at construction.
    // NOTE(review): where the deadline is applied is outside this view.
    soft_deadline: Duration,
    /// Source of job ids; advanced with `fetch_add` for every job created.
    next_job_id: AtomicU64,
    /// Monotonic count of Tier-2 completions delivered via the reuse path
    /// (watcher-driven scheduler runs). These bypass `result_rx`/
    /// `drain_completions`, so the `&AppContext`-side drain polls this counter
    /// to know when to refresh the agent status bar after a background scan.
    reuse_completions: AtomicU64,
}
79
80impl InspectManager {
81    pub fn new() -> Self {
82        Self::with_worker(default_worker(), DEFAULT_SOFT_DEADLINE)
83    }
84
85    #[doc(hidden)]
86    pub fn with_worker(worker: InspectWorker, soft_deadline: Duration) -> Self {
87        let handles = start_dispatch_loop(worker);
88        Self {
89            request_tx: handles.request_tx,
90            result_rx: handles.result_rx,
91            pool: handles.pool,
92            in_flight: Mutex::new(HashMap::new()),
93            caches: Mutex::new(HashMap::new()),
94            soft_deadline,
95            next_job_id: AtomicU64::new(1),
96            reuse_completions: AtomicU64::new(0),
97        }
98    }
99
100    pub fn submit_category(
101        &self,
102        snapshot: InspectSnapshot,
103        category: InspectCategory,
104        caller_scope: JobScope,
105    ) -> JobOutcome {
106        self.submit_category_with_callgraph(snapshot, category, caller_scope, None)
107    }
108
109    pub fn submit_category_with_callgraph(
110        &self,
111        snapshot: InspectSnapshot,
112        category: InspectCategory,
113        caller_scope: JobScope,
114        callgraph_snapshot: Option<Arc<CallgraphSnapshot>>,
115    ) -> JobOutcome {
116        if !category.is_active() {
117            return JobOutcome::Failed {
118                message: format!("inspect category '{category}' is disabled in v0.33"),
119            };
120        }
121
122        let cache = match self.cache_for_snapshot(&snapshot) {
123            Ok(cache) => cache,
124            Err(message) => return JobOutcome::Failed { message },
125        };
126        let key = JobKey::for_category_scope(category, &caller_scope);
127        let (waiter_tx, waiter_rx) = bounded(1);
128
129        let wait_snapshot = snapshot.clone();
130        match self.enqueue_with_waiter(
131            snapshot,
132            category,
133            caller_scope.clone(),
134            key.clone(),
135            waiter_tx,
136            callgraph_snapshot,
137        ) {
138            Ok(()) => self.wait_for_outcome(key, caller_scope, cache, waiter_rx, wait_snapshot),
139            Err(message) => JobOutcome::Failed { message },
140        }
141    }
142
143    pub fn submit_background(
144        &self,
145        snapshot: InspectSnapshot,
146        category: InspectCategory,
147        caller_scope: JobScope,
148    ) -> Result<JobKey, String> {
149        self.submit_background_with_callgraph(snapshot, category, caller_scope, None)
150    }
151
152    pub fn submit_background_with_callgraph(
153        &self,
154        snapshot: InspectSnapshot,
155        category: InspectCategory,
156        caller_scope: JobScope,
157        callgraph_snapshot: Option<Arc<CallgraphSnapshot>>,
158    ) -> Result<JobKey, String> {
159        if !category.is_active() {
160            return Err(format!(
161                "inspect category '{category}' is disabled in v0.33"
162            ));
163        }
164        let key = JobKey::for_category_scope(category, &caller_scope);
165        self.enqueue_without_waiter(
166            snapshot,
167            category,
168            caller_scope,
169            key.clone(),
170            callgraph_snapshot,
171        )?;
172        Ok(key)
173    }
174
175    pub fn submit_tier2_run_with_reuse_background(
176        self: &Arc<Self>,
177        snapshot: InspectSnapshot,
178        category: InspectCategory,
179    ) -> Result<JobKey, String> {
180        if !category.is_active() {
181            return Err(format!(
182                "inspect category '{category}' is disabled in v0.33"
183            ));
184        }
185        if !category.is_tier2() {
186            return Err(format!(
187                "inspect category '{category}' is not a Tier 2 category"
188            ));
189        }
190
191        let job = self.tier2_reuse_job(snapshot, category, None);
192        let key = job.key.clone();
193        let mut in_flight = self
194            .in_flight
195            .lock()
196            .map_err(|_| "inspect in-flight map lock poisoned".to_string())?;
197        if in_flight.contains_key(&key) {
198            return Ok(key);
199        }
200        in_flight.insert(key.clone(), Vec::new());
201        drop(in_flight);
202
203        let manager = Arc::clone(self);
204        let pool = Arc::clone(&self.pool);
205        pool.spawn(move || {
206            let result = manager.tier2_run_with_reuse_job_result(job);
207            manager.route_tier2_reuse_completion(result);
208        });
209
210        Ok(key)
211    }
212
213    pub fn submit_tier2_run_with_reuse_serial_background(
214        self: &Arc<Self>,
215        snapshot: InspectSnapshot,
216        categories: Vec<InspectCategory>,
217    ) -> Tier2RunSubmission {
218        let mut submission = Tier2RunSubmission::default();
219        let mut requested = Vec::new();
220
221        for category in categories {
222            if !category.is_active() {
223                submission.errors.push(Tier2RunSubmissionError {
224                    category,
225                    message: format!("inspect category '{category}' is disabled in v0.33"),
226                });
227                continue;
228            }
229            if !category.is_tier2() {
230                submission.errors.push(Tier2RunSubmissionError {
231                    category,
232                    message: format!("inspect category '{category}' is not a Tier 2 category"),
233                });
234                continue;
235            }
236            requested.push(category);
237        }
238
239        if requested.is_empty() {
240            return submission;
241        }
242
243        let mut in_flight = match self.in_flight.lock() {
244            Ok(in_flight) => in_flight,
245            Err(_) => {
246                for category in requested {
247                    submission.errors.push(Tier2RunSubmissionError {
248                        category,
249                        message: "inspect in-flight map lock poisoned".to_string(),
250                    });
251                }
252                return submission;
253            }
254        };
255
256        for category in requested {
257            let key = JobKey::for_project_category(category);
258            submission.queued_categories.push(category);
259            if in_flight.contains_key(&key) {
260                continue;
261            }
262            in_flight.insert(key, Vec::new());
263            submission.newly_queued_categories.push(category);
264        }
265        drop(in_flight);
266
267        if submission.newly_queued_categories.is_empty() {
268            return submission;
269        }
270
271        let categories_for_worker = submission.newly_queued_categories.clone();
272        let manager = Arc::clone(self);
273        let pool = Arc::clone(&self.pool);
274        pool.spawn(move || {
275            for category in categories_for_worker {
276                let result = manager.tier2_run_with_reuse_result(snapshot.clone(), category, None);
277                manager.route_tier2_reuse_completion(result);
278            }
279        });
280
281        submission
282    }
283
284    pub fn tier2_any_in_flight(&self) -> bool {
285        self.in_flight
286            .lock()
287            .map(|in_flight| in_flight.keys().any(|key| key.category.is_tier2()))
288            .unwrap_or(false)
289    }
290
291    pub fn drain_completions(&self) -> usize {
292        let mut drained = 0usize;
293        while let Ok(result) = self.result_rx.try_recv() {
294            self.route_completion(result);
295            drained += 1;
296        }
297        drained
298    }
299
300    pub fn cache_for_snapshot(
301        &self,
302        snapshot: &InspectSnapshot,
303    ) -> Result<Arc<InspectCache>, String> {
304        self.cache_for_paths(snapshot.inspect_dir.clone(), snapshot.project_root.clone())
305    }
306
307    /// Latest persisted counts for the three Tier-2 categories, in
308    /// `(dead_code, unused_exports, duplicates)` order. Reads the most recent
309    /// aggregate regardless of contribution-hash freshness (last-known), so the
310    /// agent status bar can refresh after a background scan completes without a
311    /// freshness round-trip. A category with no readable aggregate reports
312    /// `None` (never a fabricated `0`), so the status bar can preserve any
313    /// last-known value and stay suppressed until every category is real (#1).
314    pub fn latest_tier2_counts(
315        &self,
316        inspect_dir: PathBuf,
317        project_root: PathBuf,
318    ) -> (Option<usize>, Option<usize>, Option<usize>) {
319        let Ok(cache) = self.cache_for_paths(inspect_dir, project_root) else {
320            return (None, None, None);
321        };
322        let count_of = |category: InspectCategory| -> Option<usize> {
323            cache
324                .latest_aggregate_any_hash(category)
325                .ok()
326                .flatten()
327                .and_then(|payload| {
328                    payload
329                        .get("count")
330                        .and_then(serde_json::Value::as_u64)
331                        .map(|count| count as usize)
332                })
333        };
334        (
335            count_of(InspectCategory::DeadCode),
336            count_of(InspectCategory::UnusedExports),
337            count_of(InspectCategory::Duplicates),
338        )
339    }
340
341    pub fn cache_for_paths(
342        &self,
343        inspect_dir: PathBuf,
344        project_root: PathBuf,
345    ) -> Result<Arc<InspectCache>, String> {
346        let project_key = crate::search_index::project_cache_key(&project_root);
347        let sqlite_path = inspect_dir.join(format!("{project_key}.sqlite"));
348        let mut caches = self
349            .caches
350            .lock()
351            .map_err(|_| "inspect manager cache map lock poisoned".to_string())?;
352        if let Some(cache) = caches.get(&sqlite_path) {
353            return Ok(Arc::clone(cache));
354        }
355        let cache = Arc::new(
356            InspectCache::open(inspect_dir, project_root)
357                .map_err(|error| format!("failed to open inspect cache: {error}"))?,
358        );
359        caches.insert(sqlite_path, Arc::clone(&cache));
360        Ok(cache)
361    }
362
363    pub fn tier2_run_with_reuse(
364        &self,
365        snapshot: InspectSnapshot,
366        category: InspectCategory,
367        caller_scope: JobScope,
368        callgraph_snapshot: Option<Arc<CallgraphSnapshot>>,
369    ) -> JobOutcome {
370        let result =
371            self.tier2_run_with_reuse_result(snapshot.clone(), category, callgraph_snapshot);
372        let outcome = match result.outcome {
373            Ok(success) => JobOutcome::Fresh {
374                payload: success.aggregate,
375            },
376            Err(message) => JobOutcome::Failed { message },
377        };
378        match self.cache_for_snapshot(&snapshot) {
379            Ok(cache) => filter_outcome_for_scope_with_contributions(
380                outcome,
381                &snapshot,
382                category,
383                cache.as_ref(),
384                &caller_scope,
385            ),
386            Err(message) => JobOutcome::Failed { message },
387        }
388    }
389
390    /// Read-only Tier 2 aggregate lookup for `aft_inspect`. Does NOT run any
391    /// scanner — returns the latest cached aggregate if present and verifies
392    /// its contribution freshness so warm cache hits are reported as fresh.
393    /// This is the non-blocking variant intended for the synchronous `inspect`
394    /// command path; Tier 2 scans run via the watcher-driven scheduler or the
395    /// compatibility `aft_inspect_tier2_run` command.
396    pub fn tier2_read_cached(
397        &self,
398        snapshot: InspectSnapshot,
399        category: InspectCategory,
400        caller_scope: JobScope,
401    ) -> JobOutcome {
402        if let Err(outcome) = validate_tier2_read_category(category) {
403            return outcome;
404        }
405        let cache = match self.cache_for_snapshot(&snapshot) {
406            Ok(cache) => cache,
407            Err(message) => return JobOutcome::Failed { message },
408        };
409        self.tier2_read_cached_from_cache(&snapshot, category, &caller_scope, cache.as_ref())
410    }
411
412    pub fn tier2_read_cached_readonly(
413        &self,
414        snapshot: InspectSnapshot,
415        category: InspectCategory,
416        caller_scope: JobScope,
417    ) -> JobOutcome {
418        if let Err(outcome) = validate_tier2_read_category(category) {
419            return outcome;
420        }
421        let key = JobKey::for_project_category(category);
422        let in_flight = self
423            .in_flight
424            .lock()
425            .map(|guard| guard.contains_key(&key))
426            .unwrap_or(false);
427        let cache = match InspectCache::open_readonly(
428            snapshot.inspect_dir.clone(),
429            snapshot.project_root.clone(),
430        ) {
431            Ok(Some(cache)) => cache,
432            Ok(None) => return JobOutcome::Pending { in_flight },
433            Err(error) => {
434                return JobOutcome::Failed {
435                    message: error.to_string(),
436                }
437            }
438        };
439        self.tier2_read_cached_from_cache(&snapshot, category, &caller_scope, &cache)
440    }
441
442    fn tier2_read_cached_from_cache(
443        &self,
444        snapshot: &InspectSnapshot,
445        category: InspectCategory,
446        caller_scope: &JobScope,
447        cache: &InspectCache,
448    ) -> JobOutcome {
449        let key = JobKey::for_project_category(category);
450        let in_flight = self
451            .in_flight
452            .lock()
453            .map(|guard| guard.contains_key(&key))
454            .unwrap_or(false);
455        match cache.get_aggregated(&key) {
456            Ok(Some(payload)) => {
457                match self.tier2_cached_aggregate_is_fresh(snapshot, category, cache) {
458                    Ok(true) => filter_outcome_for_scope_with_contributions(
459                        JobOutcome::Fresh { payload },
460                        snapshot,
461                        category,
462                        cache,
463                        caller_scope,
464                    ),
465                    Ok(false) => filter_outcome_for_scope_with_contributions(
466                        JobOutcome::Stale {
467                            cached: Some(payload),
468                            in_flight,
469                        },
470                        snapshot,
471                        category,
472                        cache,
473                        caller_scope,
474                    ),
475                    Err(message) => JobOutcome::Failed { message },
476                }
477            }
478            Ok(None) => match cache.latest_aggregate_any_hash(category) {
479                Ok(Some(payload)) => filter_outcome_for_scope_with_contributions(
480                    JobOutcome::Stale {
481                        cached: Some(payload),
482                        in_flight,
483                    },
484                    snapshot,
485                    category,
486                    cache,
487                    caller_scope,
488                ),
489                Ok(None) => JobOutcome::Pending { in_flight },
490                Err(error) => JobOutcome::Failed {
491                    message: error.to_string(),
492                },
493            },
494            Err(error) => JobOutcome::Failed {
495                message: error.to_string(),
496            },
497        }
498    }
499
500    fn tier2_cached_aggregate_is_fresh(
501        &self,
502        snapshot: &InspectSnapshot,
503        category: InspectCategory,
504        cache: &InspectCache,
505    ) -> Result<bool, String> {
506        let cached_records = load_contribution_freshness(cache, category)?;
507        let project_scope = JobScope::for_project(snapshot.project_root.clone());
508        let project_files = scope_files(&snapshot.project_root, &project_scope);
509        let current_by_relative = current_project_files(&snapshot.project_root, &project_files);
510        let cached_relative = cached_records
511            .iter()
512            .map(freshness_record_relative_key)
513            .collect::<BTreeSet<_>>();
514
515        for record in &cached_records {
516            let relative = freshness_record_relative_key(record);
517            if !current_by_relative.contains_key(&relative) {
518                return Ok(false);
519            }
520
521            let absolute = if record.file_path.is_absolute() {
522                record.file_path.clone()
523            } else {
524                snapshot.project_root.join(&record.file_path)
525            };
526            match verify_contribution_file_strict(&absolute, &record.freshness) {
527                ContributionFreshness::Fresh { .. } => {}
528                ContributionFreshness::Stale | ContributionFreshness::Deleted => return Ok(false),
529            }
530        }
531
532        Ok(current_by_relative
533            .keys()
534            .all(|relative| cached_relative.contains(relative)))
535    }
536
537    #[doc(hidden)]
538    pub fn tier2_run_with_reuse_result(
539        &self,
540        snapshot: InspectSnapshot,
541        category: InspectCategory,
542        callgraph_snapshot: Option<Arc<CallgraphSnapshot>>,
543    ) -> InspectResult {
544        let job = self.tier2_reuse_job(snapshot, category, callgraph_snapshot);
545        self.tier2_run_with_reuse_job_result(job)
546    }
547
548    fn tier2_run_with_reuse_job_result(&self, mut job: InspectJob) -> InspectResult {
549        let started = Instant::now();
550        if !job.category.is_active() {
551            return InspectResult::failed(
552                &job,
553                format!("inspect category '{}' is disabled in v0.33", job.category),
554                started.elapsed(),
555            );
556        }
557        if !job.category.is_tier2() {
558            return InspectResult::failed(
559                &job,
560                format!(
561                    "inspect category '{}' is not a Tier 2 category",
562                    job.category
563                ),
564                started.elapsed(),
565            );
566        }
567
568        let project_scope = JobScope::for_project(job.project_root.clone());
569        job.scope_files = scope_files(&job.project_root, &project_scope);
570        let cache = match self.cache_for_paths(job.inspect_dir.clone(), job.project_root.clone()) {
571            Ok(cache) => cache,
572            Err(message) => return InspectResult::failed(&job, message, started.elapsed()),
573        };
574        if let Ok(Some(success)) = self.tier2_quick_reuse_success(&job, cache.as_ref()) {
575            return InspectResult::success(&job, success, started.elapsed());
576        }
577
578        match self.tier2_run_with_reuse_job(&job, &cache) {
579            Ok(success) => InspectResult::success(&job, success, started.elapsed()),
580            Err(message) => InspectResult::failed(&job, message, started.elapsed()),
581        }
582    }
583
584    fn tier2_reuse_job(
585        &self,
586        snapshot: InspectSnapshot,
587        category: InspectCategory,
588        callgraph_snapshot: Option<Arc<CallgraphSnapshot>>,
589    ) -> InspectJob {
590        InspectJob {
591            job_id: self.next_job_id.fetch_add(1, Ordering::Relaxed),
592            key: JobKey::for_project_category(category),
593            category,
594            scope_files: Vec::new(),
595            project_root: snapshot.project_root,
596            inspect_dir: snapshot.inspect_dir,
597            config: snapshot.config,
598            symbol_cache: snapshot.symbol_cache,
599            callgraph_snapshot,
600        }
601    }
602
603    fn tier2_quick_reuse_success(
604        &self,
605        job: &InspectJob,
606        cache: &InspectCache,
607    ) -> Result<Option<InspectScanSuccess>, String> {
608        let Some(aggregate) = cache
609            .get_aggregated(&job.key)
610            .map_err(|error| error.to_string())?
611        else {
612            return Ok(None);
613        };
614        let cached = load_contribution_fingerprint(cache, job.category)?;
615        let current = current_file_fingerprint(&job.project_root, &job.scope_files)?;
616        if !cached.hash_complete || !current.hash_complete || cached != current {
617            return Ok(None);
618        }
619
620        cache
621            .touch_tier2_last_full_run(job.category)
622            .map_err(|error| error.to_string())?;
623        Ok(Some(InspectScanSuccess {
624            scanned_files: Vec::new(),
625            contributions: Vec::new(),
626            aggregate,
627        }))
628    }
629
    /// Brings the cached Tier 2 contributions for `job.category` up to date and
    /// returns the matching aggregate, rescanning only the files that need it.
    ///
    /// Steps, in order:
    /// 1. Compare the cached contribution records against `job.scope_files`:
    ///    records whose file is gone are deleted, stale records are deleted
    ///    and queued for rescan, fresh records only get a metadata update when
    ///    their metadata changed, and files with no record are queued for scan.
    /// 2. Scan the queued files (if any) and collect the new contributions.
    /// 3. If nothing changed and an aggregate is cached for `job.key`, return it.
    /// 4. Otherwise apply the updates (or read the current contribution-set
    ///    hash) and reuse a stored aggregate whose hash matches.
    /// 5. For categories whose contributions depend on entry points, rescan
    ///    every current file on a hash miss and retry the hash lookup.
    /// 6. Finally roll the contributions up into a new aggregate and store it.
    ///
    /// # Errors
    ///
    /// Returns the stringified error of any failing cache operation, or a
    /// message naming the category when a scan fails.
    fn tier2_run_with_reuse_job(
        &self,
        job: &InspectJob,
        cache: &InspectCache,
    ) -> Result<InspectScanSuccess, String> {
        let cached_records = load_contribution_freshness(cache, job.category)?;
        let current_by_relative = current_project_files(&job.project_root, &job.scope_files);
        let cached_relative = cached_records
            .iter()
            .map(freshness_record_relative_key)
            .collect::<BTreeSet<_>>();
        // Debug builds only: remember whether there were no cached records at all.
        #[cfg(debug_assertions)]
        let cold_cache = cached_relative.is_empty();

        let mut updates = Tier2ContributionUpdates::default();
        // Files to (re)scan, keyed by relative path so each file is queued once.
        let mut scan_by_relative = BTreeMap::<String, PathBuf>::new();
        // Copy of the job used for the final roll-up; it inherits whichever
        // callgraph snapshot the scans below end up using.
        let mut aggregate_job = job.clone();

        for record in cached_records {
            let relative = freshness_record_relative_key(&record);
            let relative_path = PathBuf::from(&relative);
            // Cached record for a file that is no longer in scope: drop it.
            let Some(current_file) = current_by_relative.get(&relative) else {
                updates.deletes.push(relative_path);
                continue;
            };

            let absolute = job.project_root.join(&record.file_path);
            match verify_contribution_file_strict(&absolute, &record.freshness) {
                ContributionFreshness::Fresh {
                    metadata_changed,
                    freshness,
                } => {
                    // Still fresh, but the stored metadata needs refreshing.
                    if metadata_changed {
                        updates.metadata_updates.push((relative_path, freshness));
                    }
                }
                ContributionFreshness::Stale => {
                    // Drop the old contribution and rescan the current file.
                    updates.deletes.push(relative_path);
                    scan_by_relative.insert(relative, current_file.clone());
                }
                ContributionFreshness::Deleted => {
                    updates.deletes.push(relative_path);
                }
            }
        }

        // Files in scope that have no cached record yet must be scanned too.
        for (relative, file) in &current_by_relative {
            if !cached_relative.contains(relative) {
                scan_by_relative.insert(relative.clone(), file.clone());
            }
        }

        let mut scan_files = scan_by_relative.into_values().collect::<Vec<_>>();
        if !scan_files.is_empty() {
            let mut scan_job = job.clone();
            scan_job.job_id = self.next_job_id.fetch_add(1, Ordering::Relaxed);
            scan_job.scope_files = scan_files.clone();
            // Dead-code scans get a callgraph snapshot built here when the
            // caller did not supply one.
            if scan_job.category == InspectCategory::DeadCode
                && scan_job.callgraph_snapshot.is_none()
            {
                scan_job.callgraph_snapshot =
                    Some(build_tier2_callgraph_snapshot(&scan_job.project_root));
            }
            aggregate_job.callgraph_snapshot = scan_job.callgraph_snapshot.clone();
            // NOTE(review): debug-only 10 ms pause before a cold-cache scan; the
            // reason is not visible in this block — presumably a timing aid for
            // tests. Confirm before changing.
            #[cfg(debug_assertions)]
            if cold_cache {
                std::thread::sleep(Duration::from_millis(10));
            }
            let scan_result = run_tier2_scan(&scan_job);
            let scan_success = scan_result.outcome.map_err(|message| {
                format!("{} incremental scan failed: {message}", job.category)
            })?;
            updates.upserts.extend(scan_success.contributions);
        }

        let has_updates = !updates.upserts.is_empty()
            || !updates.deletes.is_empty()
            || !updates.metadata_updates.is_empty();
        // Nothing changed: reuse the aggregate cached for this key, if any.
        if !has_updates {
            if let Some(aggregate) = cache
                .get_aggregated(&job.key)
                .map_err(|error| error.to_string())?
            {
                cache
                    .touch_tier2_last_full_run(job.category)
                    .map_err(|error| error.to_string())?;
                return Ok(InspectScanSuccess {
                    scanned_files: scan_files,
                    contributions: Vec::new(),
                    aggregate,
                });
            }
        }

        // Persist the changes (which yields the new contribution-set hash), or
        // just read the current hash when there is nothing to persist.
        let mut contribution_set_hash = if has_updates {
            cache
                .apply_contribution_updates(job.category, updates)
                .map_err(|error| error.to_string())?
        } else {
            cache
                .contribution_set_hash(job.category)
                .map_err(|error| error.to_string())?
        };

        // A stored aggregate for exactly this contribution set can be reused.
        if let Some(aggregate) = cache
            .load_aggregate_if_hash_matches(job.category, &contribution_set_hash)
            .map_err(|error| error.to_string())?
        {
            cache
                .touch_tier2_last_full_run(job.category)
                .map_err(|error| error.to_string())?;
            let contributions = load_contributions(cache, job)?;
            return Ok(InspectScanSuccess {
                scanned_files: scan_files,
                contributions,
                aggregate,
            });
        }

        if category_contributions_depend_on_entry_points(job.category) {
            // Manifest edits can change entry/public roots without touching any
            // source file. Dead-code and unused-export file contributions embed
            // those roots, so an aggregate hash miss for these categories must
            // refresh every current contribution before rolling up again.
            let full_scan_files = current_by_relative.into_values().collect::<Vec<_>>();
            if !full_scan_files.is_empty() {
                let mut rescan_job = job.clone();
                rescan_job.job_id = self.next_job_id.fetch_add(1, Ordering::Relaxed);
                rescan_job.scope_files = full_scan_files.clone();
                if rescan_job.category == InspectCategory::DeadCode
                    && rescan_job.callgraph_snapshot.is_none()
                {
                    rescan_job.callgraph_snapshot =
                        Some(build_tier2_callgraph_snapshot(&rescan_job.project_root));
                }
                let scan_result = run_tier2_scan(&rescan_job);
                let scan_success = scan_result.outcome.map_err(|message| {
                    format!(
                        "{} full rescan after entry-point cache miss failed: {message}",
                        job.category
                    )
                })?;
                let rescan_updates = Tier2ContributionUpdates {
                    upserts: scan_success.contributions,
                    ..Tier2ContributionUpdates::default()
                };
                contribution_set_hash = cache
                    .apply_contribution_updates(job.category, rescan_updates)
                    .map_err(|error| error.to_string())?;
                aggregate_job.callgraph_snapshot = rescan_job.callgraph_snapshot.clone();
                // From here on the reported scanned files are the full set.
                scan_files = full_scan_files;

                // The full rescan may have landed on a contribution set that
                // already has a stored aggregate.
                if let Some(aggregate) = cache
                    .load_aggregate_if_hash_matches(job.category, &contribution_set_hash)
                    .map_err(|error| error.to_string())?
                {
                    cache
                        .touch_tier2_last_full_run(job.category)
                        .map_err(|error| error.to_string())?;
                    let contributions = load_contributions(cache, job)?;
                    return Ok(InspectScanSuccess {
                        scanned_files: scan_files,
                        contributions,
                        aggregate,
                    });
                }
            }
        }

        // No reusable aggregate: build one from the stored contributions. The
        // dead-code roll-up gets a callgraph snapshot if none was set above.
        if aggregate_job.category == InspectCategory::DeadCode
            && aggregate_job.callgraph_snapshot.is_none()
        {
            aggregate_job.callgraph_snapshot =
                Some(build_tier2_callgraph_snapshot(&aggregate_job.project_root));
        }
        let contributions = load_contributions(cache, &aggregate_job)?;
        let aggregate = roll_up_tier2_contributions(&aggregate_job, &contributions);
        cache
            .store_tier2_aggregate(job.key.clone(), &contribution_set_hash, aggregate.clone())
            .map_err(|error| error.to_string())?;

        Ok(InspectScanSuccess {
            scanned_files: scan_files,
            contributions,
            aggregate,
        })
    }
817
818    fn enqueue_with_waiter(
819        &self,
820        snapshot: InspectSnapshot,
821        category: InspectCategory,
822        caller_scope: JobScope,
823        key: JobKey,
824        waiter_tx: WaiterTx,
825        callgraph_snapshot: Option<Arc<CallgraphSnapshot>>,
826    ) -> Result<(), String> {
827        let mut in_flight = self
828            .in_flight
829            .lock()
830            .map_err(|_| "inspect in-flight map lock poisoned".to_string())?;
831        if let Some(waiters) = in_flight.get_mut(&key) {
832            waiters.push(Waiter { tx: waiter_tx });
833            return Ok(());
834        }
835
836        in_flight.insert(key.clone(), vec![Waiter { tx: waiter_tx }]);
837        drop(in_flight);
838
839        if let Err(message) = self.enqueue_new_job(
840            snapshot,
841            category,
842            caller_scope,
843            key.clone(),
844            callgraph_snapshot,
845        ) {
846            if let Ok(mut in_flight) = self.in_flight.lock() {
847                in_flight.remove(&key);
848            }
849            return Err(message);
850        }
851        Ok(())
852    }
853
854    fn enqueue_without_waiter(
855        &self,
856        snapshot: InspectSnapshot,
857        category: InspectCategory,
858        caller_scope: JobScope,
859        key: JobKey,
860        callgraph_snapshot: Option<Arc<CallgraphSnapshot>>,
861    ) -> Result<(), String> {
862        let mut in_flight = self
863            .in_flight
864            .lock()
865            .map_err(|_| "inspect in-flight map lock poisoned".to_string())?;
866        if in_flight.contains_key(&key) {
867            return Ok(());
868        }
869        in_flight.insert(key.clone(), Vec::new());
870        drop(in_flight);
871
872        if let Err(message) = self.enqueue_new_job(
873            snapshot,
874            category,
875            caller_scope,
876            key.clone(),
877            callgraph_snapshot,
878        ) {
879            if let Ok(mut in_flight) = self.in_flight.lock() {
880                in_flight.remove(&key);
881            }
882            return Err(message);
883        }
884        Ok(())
885    }
886
887    fn enqueue_new_job(
888        &self,
889        snapshot: InspectSnapshot,
890        category: InspectCategory,
891        caller_scope: JobScope,
892        key: JobKey,
893        callgraph_snapshot: Option<Arc<CallgraphSnapshot>>,
894    ) -> Result<(), String> {
895        let scan_scope = if category.is_tier2() {
896            JobScope::for_project(snapshot.project_root.clone())
897        } else {
898            caller_scope
899        };
900        let scope_files = scope_files(&snapshot.project_root, &scan_scope);
901        let job = InspectJob {
902            job_id: self.next_job_id.fetch_add(1, Ordering::Relaxed),
903            key,
904            category,
905            scope_files,
906            project_root: snapshot.project_root,
907            inspect_dir: snapshot.inspect_dir,
908            config: snapshot.config,
909            symbol_cache: snapshot.symbol_cache,
910            callgraph_snapshot,
911        };
912        self.request_tx
913            .send(job)
914            .map_err(|_| "inspect dispatch loop is unavailable".to_string())
915    }
916
917    fn wait_for_outcome(
918        &self,
919        key: JobKey,
920        caller_scope: JobScope,
921        cache: Arc<InspectCache>,
922        waiter_rx: Receiver<JobOutcome>,
923        snapshot: InspectSnapshot,
924    ) -> JobOutcome {
925        let timeout = after(self.soft_deadline);
926        let result_rx = self.result_rx.clone();
927        loop {
928            select! {
929                recv(waiter_rx) -> outcome => {
930                    return match outcome {
931                        Ok(outcome) => filter_outcome_for_scope_with_contributions(
932                            outcome,
933                            &snapshot,
934                            key.category,
935                            cache.as_ref(),
936                            &caller_scope,
937                        ),
938                        Err(_) => self.timeout_outcome(&key, &caller_scope, &cache, &snapshot),
939                    };
940                }
941                recv(result_rx) -> result => {
942                    match result {
943                        Ok(result) => self.route_completion(result),
944                        Err(_) => return self.timeout_outcome(&key, &caller_scope, &cache, &snapshot),
945                    }
946                }
947                recv(timeout) -> _ => {
948                    return self.timeout_outcome(&key, &caller_scope, &cache, &snapshot);
949                }
950            }
951        }
952    }
953
954    fn timeout_outcome(
955        &self,
956        key: &JobKey,
957        caller_scope: &JobScope,
958        cache: &InspectCache,
959        snapshot: &InspectSnapshot,
960    ) -> JobOutcome {
961        match cache.get_aggregated(key) {
962            Ok(Some(cached)) => filter_outcome_for_scope_with_contributions(
963                JobOutcome::Stale {
964                    cached: Some(cached),
965                    in_flight: true,
966                },
967                snapshot,
968                key.category,
969                cache,
970                caller_scope,
971            ),
972            Ok(None) => JobOutcome::Pending { in_flight: true },
973            Err(error) => JobOutcome::Failed {
974                message: error.to_string(),
975            },
976        }
977    }
978
979    fn route_completion(&self, result: InspectResult) {
980        let outcome = self.completion_outcome(result.clone());
981        let waiters = self
982            .in_flight
983            .lock()
984            .ok()
985            .and_then(|mut in_flight| in_flight.remove(&result.key))
986            .unwrap_or_default();
987        for waiter in waiters {
988            let _ = waiter.tx.send(outcome.clone());
989        }
990    }
991
992    fn route_tier2_reuse_completion(&self, result: InspectResult) {
993        let outcome = match result.outcome.clone() {
994            Ok(success) => JobOutcome::Fresh {
995                payload: success.aggregate,
996            },
997            Err(message) => JobOutcome::Failed { message },
998        };
999        let waiters = self
1000            .in_flight
1001            .lock()
1002            .ok()
1003            .and_then(|mut in_flight| in_flight.remove(&result.key))
1004            .unwrap_or_default();
1005        for waiter in waiters {
1006            let _ = waiter.tx.send(outcome.clone());
1007        }
1008        // Signal the main-thread drain that a background (watcher-driven) Tier-2
1009        // scan finished so it can refresh the status bar. This path bypasses
1010        // `result_rx`/`drain_completions`, so without this counter the bar's
1011        // counts and `~` marker would only update on a manual `aft_inspect`.
1012        self.reuse_completions.fetch_add(1, Ordering::SeqCst);
1013    }
1014
1015    /// Snapshot the cumulative count of reuse-path (watcher-driven) Tier-2
1016    /// completions. The main-thread drain compares this against its last-seen
1017    /// value to detect background scans that finished since the previous tick.
1018    pub fn reuse_completion_count(&self) -> u64 {
1019        self.reuse_completions.load(Ordering::SeqCst)
1020    }
1021
1022    fn completion_outcome(&self, result: InspectResult) -> JobOutcome {
1023        let cache =
1024            match self.cache_for_paths(result.inspect_dir.clone(), result.project_root.clone()) {
1025                Ok(cache) => cache,
1026                Err(message) => return JobOutcome::Failed { message },
1027            };
1028
1029        match result.outcome {
1030            Ok(success) => {
1031                let store_result = if result.category.is_tier2() {
1032                    cache.store_tier2_result(
1033                        result.key.clone(),
1034                        &success.scanned_files,
1035                        &success.contributions,
1036                        success.aggregate.clone(),
1037                    )
1038                } else {
1039                    cache.store_aggregated(result.key, success.aggregate.clone())
1040                };
1041
1042                match store_result {
1043                    Ok(()) => JobOutcome::Fresh {
1044                        payload: success.aggregate,
1045                    },
1046                    Err(error) => JobOutcome::Failed {
1047                        message: error.to_string(),
1048                    },
1049                }
1050            }
1051            Err(message) => JobOutcome::Failed { message },
1052        }
1053    }
1054}
1055
1056impl Default for InspectManager {
1057    fn default() -> Self {
1058        Self::new()
1059    }
1060}
1061
1062fn validate_tier2_read_category(category: InspectCategory) -> Result<(), JobOutcome> {
1063    if !category.is_active() {
1064        return Err(JobOutcome::Failed {
1065            message: format!("inspect category '{category}' is disabled in v0.33"),
1066        });
1067    }
1068    if !category.is_tier2() {
1069        return Err(JobOutcome::Failed {
1070            message: format!("inspect category '{category}' is not a Tier 2 category"),
1071        });
1072    }
1073    Ok(())
1074}
1075
1076fn scope_files(project_root: &Path, scope: &JobScope) -> Vec<PathBuf> {
1077    let mut files = crate::callgraph::walk_project_files(project_root)
1078        .filter(|path| scope.contains(path))
1079        .collect::<Vec<_>>();
1080    files.sort();
1081    files
1082}
1083
1084fn current_project_files(project_root: &Path, files: &[PathBuf]) -> BTreeMap<String, PathBuf> {
1085    files
1086        .iter()
1087        .map(|file| (relative_cache_key(project_root, file), file.clone()))
1088        .collect()
1089}
1090
1091fn build_tier2_callgraph_snapshot(project_root: &Path) -> Arc<CallgraphSnapshot> {
1092    let mut graph = CallGraph::new(project_root.to_path_buf());
1093    let graph_files = graph.project_files().to_vec();
1094    let files = graph_files
1095        .iter()
1096        .map(canonicalize_for_snapshot)
1097        .collect::<Vec<_>>();
1098    let resolved_entry_points = super::entry_points::resolve_entry_points(project_root);
1099
1100    let mut exported_symbols = Vec::new();
1101    let mut outbound_calls = Vec::new();
1102    let mut entry_points = BTreeSet::new();
1103
1104    for file in &graph_files {
1105        let snapshot_file = canonicalize_for_snapshot(file);
1106        if is_entry_point_file(&resolved_entry_points, &snapshot_file) {
1107            entry_points.insert(snapshot_file.clone());
1108        }
1109
1110        let file_data = match graph.build_file(file) {
1111            Ok(file_data) => file_data.clone(),
1112            Err(_) => continue,
1113        };
1114
1115        for symbol in &file_data.exported_symbols {
1116            let metadata = file_data.symbol_metadata_for(symbol);
1117            exported_symbols.push(CallgraphExport {
1118                file: snapshot_file.clone(),
1119                symbol: symbol.clone(),
1120                kind: metadata
1121                    .map(|metadata| symbol_kind_name(&metadata.kind))
1122                    .unwrap_or("unknown")
1123                    .to_string(),
1124                line: metadata.map(|metadata| metadata.line).unwrap_or(1),
1125            });
1126        }
1127
1128        if let Some(default_symbol) = &file_data.default_export_symbol {
1129            let metadata = file_data.symbol_metadata_for(default_symbol);
1130            exported_symbols.push(CallgraphExport {
1131                file: snapshot_file.clone(),
1132                symbol: default_symbol.clone(),
1133                kind: DEFAULT_EXPORT_MARKER_KIND.to_string(),
1134                line: metadata.map(|metadata| metadata.line).unwrap_or(1),
1135            });
1136        }
1137
1138        for (caller_symbol, calls) in &file_data.calls_by_symbol {
1139            for call in calls {
1140                let target = match graph.resolve_cross_file_edge(
1141                    &call.full_callee,
1142                    &call.callee_name,
1143                    file,
1144                    &file_data.import_block,
1145                ) {
1146                    EdgeResolution::Resolved { file, symbol } => {
1147                        let file = canonicalize_for_snapshot(&file);
1148                        format!("{}::{symbol}", file.display())
1149                    }
1150                    // Unresolved cross-file edge. Before falling back to a bare
1151                    // callee name, try to resolve it to a symbol DEFINED IN THE
1152                    // SAME FILE (private functions included) — mirroring
1153                    // build_reverse_index. This is what makes a local call like
1154                    // `main()` -> `dispatch()` resolve to `main.rs::dispatch`
1155                    // (the private command router) instead of leaking a bare
1156                    // `dispatch` that dead_code then misresolves to an unrelated
1157                    // exported `dispatch` in another file. Without this, liveness
1158                    // breaks at every private same-file intermediary.
1159                    EdgeResolution::Unresolved { callee_name } => {
1160                        if is_bare_callee(&call.full_callee, &callee_name) {
1161                            match resolve_symbol_query_in_data(&file_data, file, &callee_name) {
1162                                Ok(symbol) => {
1163                                    format!("{}::{symbol}", snapshot_file.display())
1164                                }
1165                                Err(_) => callee_name,
1166                            }
1167                        } else {
1168                            callee_name
1169                        }
1170                    }
1171                };
1172                let target = if is_method_dispatch_callee(&call.full_callee, &call.callee_name) {
1173                    format!("{target}{DISPATCHED_CALLEE_SEPARATOR}{}", call.full_callee)
1174                } else {
1175                    target
1176                };
1177                outbound_calls.push(CallgraphOutboundCall {
1178                    caller_file: snapshot_file.clone(),
1179                    caller_symbol: caller_symbol.clone(),
1180                    target,
1181                    line: call.line,
1182                });
1183            }
1184        }
1185    }
1186
1187    Arc::new(CallgraphSnapshot {
1188        generated_at: Some(SystemTime::now()),
1189        files,
1190        exported_symbols,
1191        outbound_calls,
1192        entry_points,
1193    })
1194}
1195
1196fn canonicalize_for_snapshot(path: &PathBuf) -> PathBuf {
1197    std::fs::canonicalize(path).unwrap_or_else(|_| normalize_path(path))
1198}
1199
/// Reports whether `file` is an entry point according to `entry_points`.
///
/// Thin wrapper that delegates to `EntryPointSet::is_entry_point`.
fn is_entry_point_file(entry_points: &super::entry_points::EntryPointSet, file: &Path) -> bool {
    entry_points.is_entry_point(file)
}
1203
/// Decides whether `full_callee` is a dotted expression whose final segment
/// names `callee_name`.
///
/// Both inputs are trimmed first. The result is `false` when the full callee
/// has no `.` or is identical to the callee name. Otherwise the text after the
/// last `.` is trimmed, any leading `?` characters are dropped, and the
/// remainder is compared with the callee name.
fn is_method_dispatch_callee(full_callee: &str, callee_name: &str) -> bool {
    let qualified = full_callee.trim();
    let bare = callee_name.trim();
    if qualified == bare {
        return false;
    }

    match qualified.rsplit_once('.') {
        Some((_, last_segment)) => last_segment.trim().trim_start_matches('?') == bare,
        None => false,
    }
}
1216
1217fn symbol_kind_name(kind: &SymbolKind) -> &'static str {
1218    match kind {
1219        SymbolKind::Function => "function",
1220        SymbolKind::Method => "method",
1221        SymbolKind::Class => "class",
1222        SymbolKind::Struct => "struct",
1223        SymbolKind::Interface => "interface",
1224        SymbolKind::Enum => "enum",
1225        SymbolKind::TypeAlias => "type_alias",
1226        SymbolKind::Variable => "variable",
1227        SymbolKind::Heading => "heading",
1228        SymbolKind::FileSummary => "file_summary",
1229    }
1230}
1231
1232fn load_contribution_fingerprint(
1233    cache: &InspectCache,
1234    category: InspectCategory,
1235) -> Result<ContributionFingerprint, String> {
1236    let (count, set_hash, hash_complete) = cache
1237        .contribution_fingerprint(category)
1238        .map_err(|error| error.to_string())?;
1239    Ok(ContributionFingerprint {
1240        count,
1241        set_hash,
1242        hash_complete,
1243    })
1244}
1245
1246fn current_file_fingerprint(
1247    project_root: &Path,
1248    files: &[PathBuf],
1249) -> Result<ContributionFingerprint, String> {
1250    let mut entries = Vec::with_capacity(files.len());
1251    let mut hash_complete = true;
1252    for file in files {
1253        let freshness = cache_freshness::collect(file)
1254            .map_err(|error| format!("failed to fingerprint {}: {error}", file.display()))?;
1255        let relative_path = relative_cache_key(project_root, file);
1256        let mtime_ns = system_time_to_ns_i64(freshness.mtime);
1257        if freshness.content_hash == cache_freshness::zero_hash() {
1258            hash_complete = false;
1259        }
1260        entries.push((
1261            relative_path,
1262            mtime_ns,
1263            freshness.size,
1264            freshness.content_hash.to_hex().to_string(),
1265        ));
1266    }
1267    entries.sort_by(|left, right| left.0.cmp(&right.0));
1268
1269    let mut hasher = blake3::Hasher::new();
1270    for (relative_path, mtime_ns, file_size, file_hash) in &entries {
1271        update_contribution_fingerprint_hash(
1272            &mut hasher,
1273            relative_path,
1274            *mtime_ns,
1275            *file_size,
1276            file_hash,
1277        );
1278    }
1279
1280    Ok(ContributionFingerprint {
1281        count: entries.len(),
1282        set_hash: hasher.finalize().to_hex().to_string(),
1283        hash_complete,
1284    })
1285}
1286
1287fn update_contribution_fingerprint_hash(
1288    hasher: &mut blake3::Hasher,
1289    relative_path: &str,
1290    mtime_ns: i64,
1291    file_size: u64,
1292    file_hash: &str,
1293) {
1294    hasher.update(relative_path.as_bytes());
1295    hasher.update(&[0]);
1296    hasher.update(&mtime_ns.to_le_bytes());
1297    hasher.update(&file_size.to_le_bytes());
1298    hasher.update(&[0]);
1299    hasher.update(file_hash.as_bytes());
1300}
1301
1302fn verify_contribution_file_strict(path: &Path, cached: &FileFreshness) -> ContributionFreshness {
1303    match cache_freshness::verify_file_strict(path, cached) {
1304        FreshnessVerdict::HotFresh => ContributionFreshness::Fresh {
1305            metadata_changed: false,
1306            freshness: *cached,
1307        },
1308        FreshnessVerdict::ContentFresh {
1309            new_mtime,
1310            new_size,
1311        } => ContributionFreshness::Fresh {
1312            metadata_changed: true,
1313            freshness: FileFreshness {
1314                mtime: new_mtime,
1315                size: new_size,
1316                content_hash: cached.content_hash,
1317            },
1318        },
1319        FreshnessVerdict::Stale => ContributionFreshness::Stale,
1320        FreshnessVerdict::Deleted => ContributionFreshness::Deleted,
1321    }
1322}
1323
1324fn load_contribution_freshness(
1325    cache: &InspectCache,
1326    category: InspectCategory,
1327) -> Result<Vec<CachedContributionFreshness>, String> {
1328    cache
1329        .contribution_freshness(category)
1330        .map_err(|error| error.to_string())
1331        .map(|records| {
1332            records
1333                .into_iter()
1334                .map(|(file_path, freshness)| CachedContributionFreshness {
1335                    file_path,
1336                    freshness,
1337                })
1338                .collect()
1339        })
1340}
1341
1342fn freshness_record_relative_key(record: &CachedContributionFreshness) -> String {
1343    record.file_path.to_string_lossy().to_string()
1344}
1345
/// Converts `time` to nanoseconds since the Unix epoch as an `i64`.
///
/// Times before the epoch map to `0`; values too large for `i64` saturate at
/// `i64::MAX`.
fn system_time_to_ns_i64(time: SystemTime) -> i64 {
    let since_epoch = match time.duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        Err(_) => Duration::ZERO,
    };
    i64::try_from(since_epoch.as_nanos()).unwrap_or(i64::MAX)
}
1353
/// Builds the cache key for `path`: the path relative to `project_root`, or
/// the path unchanged when it does not lie under the root. The result is a
/// lossy string rendering.
fn relative_cache_key(project_root: &Path, path: &Path) -> String {
    let relative = match path.strip_prefix(project_root) {
        Ok(stripped) => stripped,
        Err(_) => path,
    };
    relative.to_string_lossy().into_owned()
}
1360
1361fn load_contributions(
1362    cache: &InspectCache,
1363    job: &InspectJob,
1364) -> Result<Vec<FileContribution>, String> {
1365    cache
1366        .load_tier2_contributions(job.category)
1367        .map_err(|error| error.to_string())
1368        .map(|records| {
1369            records
1370                .into_iter()
1371                .map(|record| contribution_from_record(&job.project_root, record))
1372                .collect()
1373        })
1374}
1375
1376fn contribution_from_record(
1377    project_root: &Path,
1378    record: super::cache::ContributionRecord,
1379) -> FileContribution {
1380    FileContribution::new(
1381        record.category,
1382        project_root.join(record.file_path),
1383        record.freshness,
1384        record.contribution,
1385    )
1386    .with_type_ref_names(record.type_ref_names)
1387}
1388
1389fn run_tier2_scan(job: &InspectJob) -> InspectResult {
1390    use super::scanners;
1391
1392    match job.category {
1393        InspectCategory::DeadCode => scanners::dead_code::run_dead_code_scan(job),
1394        InspectCategory::UnusedExports => scanners::unused_exports::run_unused_exports_scan(job),
1395        InspectCategory::Duplicates => scanners::duplicates::run_duplicates_scan(job),
1396        other => InspectResult::failed(
1397            job,
1398            format!("inspect category '{other}' is not an active Tier 2 scanner"),
1399            Duration::from_secs(0),
1400        ),
1401    }
1402}
1403
1404fn roll_up_tier2_contributions(job: &InspectJob, contributions: &[FileContribution]) -> Value {
1405    roll_up_tier2_contributions_with_limit(job, contributions, Some(MAX_DRILL_DOWN_ITEMS))
1406}
1407
1408fn roll_up_tier2_contributions_with_limit(
1409    job: &InspectJob,
1410    contributions: &[FileContribution],
1411    drill_down_limit: Option<usize>,
1412) -> Value {
1413    match job.category {
1414        InspectCategory::DeadCode => {
1415            roll_up_dead_code_contributions(job, contributions, drill_down_limit)
1416        }
1417        InspectCategory::UnusedExports => {
1418            roll_up_unused_exports_contributions(job, contributions, drill_down_limit)
1419        }
1420        InspectCategory::Duplicates => {
1421            roll_up_duplicate_contributions(job, contributions, drill_down_limit)
1422        }
1423        _ => json!({
1424            "count": 0,
1425            "items": [],
1426            "scanned_files": contributions.len(),
1427        }),
1428    }
1429}
1430
1431fn scoped_tier2_payload_from_contributions(
1432    snapshot: &InspectSnapshot,
1433    category: InspectCategory,
1434    cache: &InspectCache,
1435    project_payload: Value,
1436    scope: &JobScope,
1437) -> Result<Value, String> {
1438    if scope.is_project_wide() {
1439        return Ok(project_payload);
1440    }
1441
1442    let project_scope = JobScope::for_project(snapshot.project_root.clone());
1443    let rollup_job = scoped_tier2_rollup_job(snapshot, category, &project_scope);
1444    let contributions = load_contributions(cache, &rollup_job)?;
1445    let full_payload = roll_up_tier2_contributions_with_limit(&rollup_job, &contributions, None);
1446    let scoped_payload = filter_payload_for_scope(full_payload, scope);
1447    Ok(cap_payload_drill_down(scoped_payload, MAX_DRILL_DOWN_ITEMS))
1448}
1449
1450fn scoped_tier2_rollup_job(
1451    snapshot: &InspectSnapshot,
1452    category: InspectCategory,
1453    scope: &JobScope,
1454) -> InspectJob {
1455    InspectJob {
1456        job_id: 0,
1457        key: JobKey::for_project_category(category),
1458        category,
1459        scope_files: scope_files(&snapshot.project_root, scope),
1460        project_root: snapshot.project_root.clone(),
1461        inspect_dir: snapshot.inspect_dir.clone(),
1462        config: Arc::clone(&snapshot.config),
1463        symbol_cache: Arc::clone(&snapshot.symbol_cache),
1464        callgraph_snapshot: (category == InspectCategory::DeadCode)
1465            .then(|| Arc::new(CallgraphSnapshot::default())),
1466    }
1467}
1468
1469fn roll_up_dead_code_contributions(
1470    job: &InspectJob,
1471    contributions: &[FileContribution],
1472    drill_down_limit: Option<usize>,
1473) -> Value {
1474    if job.callgraph_snapshot.is_none() {
1475        return super::scanners::dead_code::callgraph_unavailable_aggregate(job.scope_files.len());
1476    }
1477
1478    let public_api_files = super::scanners::dead_code::collect_public_api_files(&job.project_root);
1479    let roles = super::entry_points::resolve_project_roles(&job.project_root);
1480    super::scanners::dead_code::aggregate_dead_code_contributions_with_limit(
1481        contributions,
1482        &public_api_files,
1483        &roles,
1484        drill_down_limit,
1485    )
1486}
1487
1488fn roll_up_unused_exports_contributions(
1489    job: &InspectJob,
1490    contributions: &[FileContribution],
1491    drill_down_limit: Option<usize>,
1492) -> Value {
1493    let parsed = contributions
1494        .iter()
1495        .filter_map(|contribution| {
1496            serde_json::from_value::<UnusedExportsContribution>(contribution.contribution.clone())
1497                .ok()
1498        })
1499        .collect::<Vec<_>>();
1500
1501    let (public_api_entries, package_warnings) = unused_public_api_entries(&job.project_root);
1502    let mut imported_by: BTreeMap<(String, String), BTreeSet<String>> = BTreeMap::new();
1503    let mut uncertain_by: BTreeMap<String, BTreeSet<String>> = BTreeMap::new();
1504    for scan in &parsed {
1505        for import in &scan.imports {
1506            let Some(resolved_file) = &import.resolved_file else {
1507                continue;
1508            };
1509            for name in &import.named {
1510                if name == "*" {
1511                    uncertain_by
1512                        .entry(resolved_file.clone())
1513                        .or_default()
1514                        .insert(scan.file.clone());
1515                } else {
1516                    imported_by
1517                        .entry((resolved_file.clone(), name.clone()))
1518                        .or_default()
1519                        .insert(scan.file.clone());
1520                }
1521            }
1522        }
1523    }
1524
1525    let mut count = 0usize;
1526    let mut items = Vec::new();
1527    let mut uncertain_count = 0usize;
1528    let mut uncertain_items = Vec::new();
1529    for scan in &parsed {
1530        if public_api_entries.contains(&scan.file) {
1531            continue;
1532        }
1533        // Mirror the fresh-scan path: fixtures/corpora/mock data are consumed
1534        // by path, never imported, so their exports always look unused.
1535        if super::job::is_test_support_file(&scan.file) {
1536            continue;
1537        }
1538
1539        for export in &scan.exports {
1540            let imported = imported_by
1541                .get(&(scan.file.clone(), export.symbol.clone()))
1542                .map(|files| !files.is_empty())
1543                .unwrap_or(false);
1544            let uncertain = uncertain_by
1545                .get(&scan.file)
1546                .map(|files| !files.is_empty())
1547                .unwrap_or(false);
1548
1549            if imported {
1550                continue;
1551            }
1552            if uncertain {
1553                uncertain_count += 1;
1554                if drill_down_limit.is_none_or(|limit| uncertain_items.len() < limit) {
1555                    uncertain_items.push(json!({
1556                        "file": scan.file,
1557                        "symbol": export.symbol,
1558                        "kind": export.kind,
1559                        "line": export.line,
1560                        "reason": "wildcard_import",
1561                    }));
1562                }
1563                continue;
1564            }
1565
1566            count += 1;
1567            // Collect uncapped; rank by signal tier and truncate below.
1568            items.push(json!({
1569                "file": scan.file,
1570                "symbol": export.symbol,
1571                "kind": export.kind,
1572                "line": export.line,
1573            }));
1574        }
1575    }
1576
1577    let roles = super::entry_points::resolve_project_roles(&job.project_root);
1578    let items = super::entry_points::rank_and_truncate_items(items, &roles, drill_down_limit);
1579    let top = super::entry_points::top_preview_symbols(&items);
1580
1581    let mut aggregate = json!({
1582        "count": count,
1583        "items": items,
1584        "top": top,
1585        "drill_down_capped": drill_down_limit.is_some_and(|limit| count > limit),
1586        "scanned_files": parsed.len(),
1587        "languages_skipped": skipped_languages(&job.scope_files, LanguageSkipMode::UnusedExports),
1588        "uncertain_count": uncertain_count,
1589        "uncertain_items": uncertain_items,
1590    });
1591    if !package_warnings.is_empty() {
1592        aggregate["note"] = Value::String(package_warnings.join("; "));
1593    }
1594    aggregate
1595}
1596
1597fn roll_up_duplicate_contributions(
1598    job: &InspectJob,
1599    contributions: &[FileContribution],
1600    drill_down_limit: Option<usize>,
1601) -> Value {
1602    super::scanners::duplicates::aggregate_duplicate_contributions_with_limit(
1603        contributions,
1604        skipped_languages(&job.scope_files, LanguageSkipMode::Duplicates),
1605        drill_down_limit,
1606    )
1607}
1608
1609fn cap_payload_drill_down(mut payload: Value, limit: usize) -> Value {
1610    let mut capped = false;
1611    if let Some(items) = payload.get_mut("items").and_then(Value::as_array_mut) {
1612        capped |= items.len() > limit;
1613        items.truncate(limit);
1614    }
1615    if let Some(groups) = payload.get_mut("groups").and_then(Value::as_array_mut) {
1616        capped |= groups.len() > limit;
1617        groups.truncate(limit);
1618    }
1619    if let Some(object) = payload.as_object_mut() {
1620        object.insert("drill_down_capped".to_string(), json!(capped));
1621    }
1622    payload
1623}
1624
/// Default cap on drill-down entries: used as the limit for the standard
/// Tier 2 roll-up and when capping scope-filtered payloads.
const MAX_DRILL_DOWN_ITEMS: usize = 100;
1626
/// One exported symbol listed in an `UnusedExportsContribution`.
#[derive(Debug, Clone, Deserialize)]
struct ExportContribution {
    // Name matched against the names other files import from this file.
    symbol: String,
    // Copied verbatim into the reported item's "kind" field.
    kind: String,
    // Copied verbatim into the reported item's "line" field.
    line: u32,
}
1633
/// Per-file unused-exports data, deserialized from a cached contribution's
/// JSON payload during roll-up.
#[derive(Debug, Clone, Deserialize)]
struct UnusedExportsContribution {
    // File identifier; compared against imports' `resolved_file` values.
    file: String,
    // Symbols this file exports.
    exports: Vec<ExportContribution>,
    // Imports this file makes from other files.
    imports: Vec<ImportContribution>,
}
1640
/// One import recorded in an `UnusedExportsContribution`.
#[derive(Debug, Clone, Deserialize)]
struct ImportContribution {
    // Target file of the import; imports without one are ignored by the roll-up.
    resolved_file: Option<String>,
    // Imported names; the roll-up treats "*" as a wildcard import.
    named: Vec<String>,
}
1646
/// Selects which scanner's language-support rule `skipped_language` applies.
#[derive(Debug, Clone, Copy)]
enum LanguageSkipMode {
    // Skip languages the duplicates scanner does not support; files with no
    // detected language are reported as "unknown".
    Duplicates,
    // Skip every language that is not JS/TS; files with no detected language
    // are not reported.
    UnusedExports,
}
1652
1653fn category_contributions_depend_on_entry_points(category: InspectCategory) -> bool {
1654    matches!(
1655        category,
1656        InspectCategory::DeadCode | InspectCategory::UnusedExports
1657    )
1658}
1659
1660fn skipped_languages(files: &[PathBuf], mode: LanguageSkipMode) -> Vec<String> {
1661    files
1662        .iter()
1663        .filter_map(|file| skipped_language(file, mode))
1664        .collect::<BTreeSet<_>>()
1665        .into_iter()
1666        .collect()
1667}
1668
1669fn skipped_language(file: &Path, mode: LanguageSkipMode) -> Option<String> {
1670    let Some(language) = crate::parser::detect_language(file) else {
1671        return match mode {
1672            LanguageSkipMode::Duplicates => Some("unknown".to_string()),
1673            LanguageSkipMode::UnusedExports => None,
1674        };
1675    };
1676
1677    let skipped = match mode {
1678        LanguageSkipMode::Duplicates => !duplicates_supports_language(language),
1679        LanguageSkipMode::UnusedExports => !is_js_ts_language(language),
1680    };
1681    skipped.then(|| language_name(language).to_string())
1682}
1683
1684fn duplicates_supports_language(language: crate::parser::LangId) -> bool {
1685    !matches!(
1686        language,
1687        crate::parser::LangId::Bash
1688            | crate::parser::LangId::Html
1689            | crate::parser::LangId::Json
1690            | crate::parser::LangId::Scala
1691            | crate::parser::LangId::Solidity
1692            | crate::parser::LangId::Scss
1693            | crate::parser::LangId::Vue
1694            | crate::parser::LangId::Markdown
1695            | crate::parser::LangId::Java
1696            | crate::parser::LangId::Ruby
1697            | crate::parser::LangId::Kotlin
1698            | crate::parser::LangId::Swift
1699            | crate::parser::LangId::Php
1700            | crate::parser::LangId::Lua
1701            | crate::parser::LangId::Perl
1702    )
1703}
1704
1705fn is_js_ts_language(language: crate::parser::LangId) -> bool {
1706    matches!(
1707        language,
1708        crate::parser::LangId::TypeScript
1709            | crate::parser::LangId::Tsx
1710            | crate::parser::LangId::JavaScript
1711    )
1712}
1713
1714fn language_name(language: crate::parser::LangId) -> &'static str {
1715    match language {
1716        crate::parser::LangId::TypeScript => "typescript",
1717        crate::parser::LangId::Tsx => "tsx",
1718        crate::parser::LangId::JavaScript => "javascript",
1719        crate::parser::LangId::Python => "python",
1720        crate::parser::LangId::Rust => "rust",
1721        crate::parser::LangId::Go => "go",
1722        crate::parser::LangId::C => "c",
1723        crate::parser::LangId::Cpp => "cpp",
1724        crate::parser::LangId::Zig => "zig",
1725        crate::parser::LangId::CSharp => "csharp",
1726        crate::parser::LangId::Bash => "bash",
1727        crate::parser::LangId::Html => "html",
1728        crate::parser::LangId::Markdown => "markdown",
1729        crate::parser::LangId::Yaml => "yaml",
1730        crate::parser::LangId::Solidity => "solidity",
1731        crate::parser::LangId::Scss => "scss",
1732        crate::parser::LangId::Vue => "vue",
1733        crate::parser::LangId::Json => "json",
1734        crate::parser::LangId::Scala => "scala",
1735        crate::parser::LangId::Java => "java",
1736        crate::parser::LangId::Ruby => "ruby",
1737        crate::parser::LangId::Kotlin => "kotlin",
1738        crate::parser::LangId::Swift => "swift",
1739        crate::parser::LangId::Php => "php",
1740        crate::parser::LangId::Lua => "lua",
1741        crate::parser::LangId::Perl => "perl",
1742    }
1743}
1744
1745fn unused_public_api_entries(project_root: &Path) -> (BTreeSet<String>, Vec<String>) {
1746    let entry_points = super::entry_points::resolve_entry_points(project_root);
1747    (
1748        entry_points.public_api_files_relative(project_root),
1749        entry_points.warnings().to_vec(),
1750    )
1751}
1752
1753fn filter_outcome_for_scope_with_contributions(
1754    outcome: JobOutcome,
1755    snapshot: &InspectSnapshot,
1756    category: InspectCategory,
1757    cache: &InspectCache,
1758    scope: &JobScope,
1759) -> JobOutcome {
1760    if !category.is_tier2() || scope.is_project_wide() {
1761        return filter_outcome_for_scope(outcome, scope);
1762    }
1763
1764    match outcome {
1765        JobOutcome::Fresh { payload } => {
1766            match scoped_tier2_payload_from_contributions(snapshot, category, cache, payload, scope)
1767            {
1768                Ok(payload) => JobOutcome::Fresh { payload },
1769                Err(message) => JobOutcome::Failed { message },
1770            }
1771        }
1772        JobOutcome::Stale { cached, in_flight } => match cached {
1773            Some(payload) => {
1774                match scoped_tier2_payload_from_contributions(
1775                    snapshot, category, cache, payload, scope,
1776                ) {
1777                    Ok(payload) => JobOutcome::Stale {
1778                        cached: Some(payload),
1779                        in_flight,
1780                    },
1781                    Err(message) => JobOutcome::Failed { message },
1782                }
1783            }
1784            None => JobOutcome::Stale {
1785                cached: None,
1786                in_flight,
1787            },
1788        },
1789        JobOutcome::Pending { in_flight } => JobOutcome::Pending { in_flight },
1790        JobOutcome::Failed { message } => JobOutcome::Failed { message },
1791    }
1792}
1793
1794fn filter_outcome_for_scope(outcome: JobOutcome, scope: &JobScope) -> JobOutcome {
1795    match outcome {
1796        JobOutcome::Fresh { payload } => JobOutcome::Fresh {
1797            payload: filter_payload_for_scope(payload, scope),
1798        },
1799        JobOutcome::Stale { cached, in_flight } => JobOutcome::Stale {
1800            cached: cached.map(|payload| filter_payload_for_scope(payload, scope)),
1801            in_flight,
1802        },
1803        JobOutcome::Pending { in_flight } => JobOutcome::Pending { in_flight },
1804        JobOutcome::Failed { message } => JobOutcome::Failed { message },
1805    }
1806}
1807
1808fn filter_payload_for_scope(mut payload: serde_json::Value, scope: &JobScope) -> serde_json::Value {
1809    if scope.is_project_wide() {
1810        return payload;
1811    }
1812
1813    // Scoped Tier 2 callers pass an uncapped rollup into this filter and cap
1814    // drill-down only afterwards, so the recomputed count below remains the
1815    // true in-scope total rather than the size of a capped sample.
1816    if let Some(items) = payload
1817        .get_mut("items")
1818        .and_then(|value| value.as_array_mut())
1819    {
1820        let count = filter_values_for_scope(items, scope);
1821        if let Some(object) = payload.as_object_mut() {
1822            object.insert("count".to_string(), serde_json::json!(count));
1823            if object.contains_key("total_groups") {
1824                object.insert("total_groups".to_string(), serde_json::json!(count));
1825            }
1826            if object.contains_key("groups_count") {
1827                object.insert("groups_count".to_string(), serde_json::json!(count));
1828            }
1829        }
1830    }
1831
1832    if let Some(groups) = payload
1833        .get_mut("groups")
1834        .and_then(|value| value.as_array_mut())
1835    {
1836        let count = filter_values_for_scope(groups, scope);
1837        if let Some(object) = payload.as_object_mut() {
1838            object.insert("count".to_string(), serde_json::json!(count));
1839            object.insert("total_groups".to_string(), serde_json::json!(count));
1840            if object.contains_key("groups_count") {
1841                object.insert("groups_count".to_string(), serde_json::json!(count));
1842            }
1843        }
1844    }
1845
1846    payload
1847}
1848
1849fn filter_values_for_scope(values: &mut Vec<serde_json::Value>, scope: &JobScope) -> usize {
1850    values.retain_mut(|value| prune_value_for_scope(value, scope));
1851    values.len()
1852}
1853
1854fn prune_value_for_scope(value: &mut serde_json::Value, scope: &JobScope) -> bool {
1855    if let Some(file) = value.get("file").and_then(|file| file.as_str()) {
1856        return scope.contains_display_path(file);
1857    }
1858
1859    let first_scoped_occurrence = if let Some(files) = value
1860        .get_mut("files")
1861        .and_then(|files| files.as_array_mut())
1862    {
1863        files.retain(|file| {
1864            file.as_str()
1865                .is_some_and(|file| scope.contains_display_path(display_file_from_occurrence(file)))
1866        });
1867        if files.len() < 2 {
1868            return false;
1869        }
1870        files.first().and_then(Value::as_str).map(str::to_string)
1871    } else {
1872        None
1873    };
1874
1875    if let Some(occurrence) = first_scoped_occurrence {
1876        update_duplicate_group_sample(value, &occurrence);
1877    }
1878
1879    true
1880}
1881
1882fn update_duplicate_group_sample(value: &mut serde_json::Value, occurrence: &str) {
1883    let Some((file, start_line, end_line)) = parse_duplicate_occurrence(occurrence) else {
1884        return;
1885    };
1886    let Some(object) = value.as_object_mut() else {
1887        return;
1888    };
1889
1890    if object.contains_key("sample_file") {
1891        object.insert("sample_file".to_string(), json!(file));
1892    }
1893    if object.contains_key("sample_start_line") {
1894        object.insert("sample_start_line".to_string(), json!(start_line));
1895    }
1896    if object.contains_key("sample_end_line") {
1897        object.insert("sample_end_line".to_string(), json!(end_line));
1898    }
1899}
1900
/// Splits an occurrence string of the form `file:start-end` into its parts.
///
/// The split happens at the last `:` so paths that themselves contain colons
/// still work. Both line numbers must consist solely of ASCII digits (a
/// leading `+`, which `u64::from_str` would otherwise accept, is rejected) and
/// must fit in a `u64`. Returns `None` for anything else.
fn parse_duplicate_occurrence(value: &str) -> Option<(&str, u64, u64)> {
    let digits_only = |text: &str| text.chars().all(|ch| ch.is_ascii_digit());

    let (file, range) = value.rsplit_once(':')?;
    let (start, end) = range.split_once('-')?;
    if digits_only(start) && digits_only(end) {
        let start_line = start.parse::<u64>().ok()?;
        let end_line = end.parse::<u64>().ok()?;
        Some((file, start_line, end_line))
    } else {
        None
    }
}
1912
/// Returns the file portion of an occurrence string of the form
/// `file:start-end`, or `value` unchanged when it carries no such suffix.
///
/// The suffix is recognised only when both `start` and `end` are non-empty
/// runs of ASCII digits. The non-empty requirement matters: `Iterator::all`
/// is vacuously true on an empty string, so without it inputs such as
/// `"a.rs:-"` or `"a.rs:5-"` would be truncated to `"a.rs"` even though
/// `parse_duplicate_occurrence` rejects them as occurrences.
fn display_file_from_occurrence(value: &str) -> &str {
    fn is_line_number(text: &str) -> bool {
        !text.is_empty() && text.chars().all(|ch| ch.is_ascii_digit())
    }

    // Split at the last ':' so paths containing colons keep their prefix.
    let Some((file, range)) = value.rsplit_once(':') else {
        return value;
    };
    let Some((start, end)) = range.split_once('-') else {
        return value;
    };

    if is_line_number(start) && is_line_number(end) {
        file
    } else {
        value
    }
}
1928
1929#[allow(dead_code)]
1930fn normalize_scope_root(path: &Path) -> PathBuf {
1931    std::fs::canonicalize(path).unwrap_or_else(|_| normalize_path(path))
1932}