Skip to main content

aft/inspect/
cache.rs

1use std::collections::{BTreeSet, HashMap, VecDeque};
2use std::fmt;
3use std::fs;
4use std::path::{Path, PathBuf};
5use std::sync::{Mutex, RwLock};
6use std::time::{Duration, SystemTime, UNIX_EPOCH};
7
8use rusqlite::{params, Connection, OptionalExtension};
9
10use crate::cache_freshness::{self, FileFreshness, FreshnessVerdict};
11
12use super::job::{
13    contribution_with_type_ref_names, type_ref_names_from_contribution, FileContribution,
14    InspectCategory, JobKey,
15};
16
17#[derive(Debug, Default)]
18pub(crate) struct Tier2ContributionUpdates {
19    pub upserts: Vec<FileContribution>,
20    pub deletes: Vec<PathBuf>,
21    pub metadata_updates: Vec<(PathBuf, FileFreshness)>,
22}
23
24#[derive(Debug)]
25pub enum InspectCacheError {
26    Io(std::io::Error),
27    Sql(rusqlite::Error),
28    Json(serde_json::Error),
29    LockPoisoned(&'static str),
30    InvalidHash(String),
31}
32
33impl fmt::Display for InspectCacheError {
34    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
35        match self {
36            InspectCacheError::Io(error) => write!(formatter, "inspect cache io error: {error}"),
37            InspectCacheError::Sql(error) => {
38                write!(formatter, "inspect cache sqlite error: {error}")
39            }
40            InspectCacheError::Json(error) => {
41                write!(formatter, "inspect cache json error: {error}")
42            }
43            InspectCacheError::LockPoisoned(name) => {
44                write!(formatter, "inspect cache lock poisoned: {name}")
45            }
46            InspectCacheError::InvalidHash(hash) => {
47                write!(formatter, "inspect cache invalid blake3 hash: {hash}")
48            }
49        }
50    }
51}
52
53impl std::error::Error for InspectCacheError {}
54
55impl From<std::io::Error> for InspectCacheError {
56    fn from(error: std::io::Error) -> Self {
57        Self::Io(error)
58    }
59}
60
61impl From<rusqlite::Error> for InspectCacheError {
62    fn from(error: rusqlite::Error) -> Self {
63        Self::Sql(error)
64    }
65}
66
67impl From<serde_json::Error> for InspectCacheError {
68    fn from(error: serde_json::Error) -> Self {
69        Self::Json(error)
70    }
71}
72
/// Version tag of the persisted Tier-2 contribution/aggregate encoding.
///
/// Increase it whenever the JSON stored in `FileContribution.contribution`
/// changes shape in a way that requires previously persisted per-file
/// contributions to be rebuilt before they are rolled up again.
pub(crate) const TIER2_CONTRIBUTION_CACHE_VERSION: u32 = 5;
78
79#[derive(Debug, Clone)]
80pub struct ContributionRecord {
81    pub category: InspectCategory,
82    pub file_path: PathBuf,
83    pub freshness: FileFreshness,
84    pub contribution: serde_json::Value,
85    pub type_ref_names: BTreeSet<String>,
86}
87
88#[derive(Debug, Clone)]
89struct MemoryAggregate {
90    payload: serde_json::Value,
91    generated_at: i64,
92    contribution_set_hash: Option<String>,
93}
94
/// Maximum number of live entries a `Tier1FileMemo` keeps before evicting the
/// least recently used ones.
const TIER1_FILE_MEMO_MAX_ENTRIES: usize = 4096;
96
97#[derive(Debug, Clone)]
98struct Tier1MemoEntry<T> {
99    freshness: FileFreshness,
100    value: T,
101    generation: u64,
102}
103
/// Recency-queue record: a memo path plus the generation it had when queued.
///
/// A node whose generation no longer matches its entry is stale and ignored.
#[derive(Clone, Debug)]
struct LruNode {
    path: PathBuf,
    generation: u64,
}
109
110#[derive(Debug)]
111struct Tier1MemoState<T> {
112    entries: HashMap<PathBuf, Tier1MemoEntry<T>>,
113    lru: VecDeque<LruNode>,
114    next_generation: u64,
115}
116
117impl<T> Default for Tier1MemoState<T> {
118    fn default() -> Self {
119        Self {
120            entries: HashMap::new(),
121            lru: VecDeque::new(),
122            next_generation: 0,
123        }
124    }
125}
126
127impl<T> Tier1MemoState<T> {
128    fn insert(&mut self, path: PathBuf, mut entry: Tier1MemoEntry<T>) {
129        let generation = self.allocate_generation();
130        entry.generation = generation;
131        self.entries.insert(path.clone(), entry);
132        self.lru.push_back(LruNode { path, generation });
133        self.compact_lru_if_needed();
134        self.evict_lru();
135    }
136
137    fn remove(&mut self, path: &Path) {
138        self.entries.remove(path);
139        self.compact_lru_if_needed();
140    }
141
142    fn touch(&mut self, path: &Path) {
143        if !self.entries.contains_key(path) {
144            return;
145        }
146
147        let generation = self.allocate_generation();
148        if let Some(entry) = self.entries.get_mut(path) {
149            entry.generation = generation;
150            self.lru.push_back(LruNode {
151                path: path.to_path_buf(),
152                generation,
153            });
154        }
155        self.compact_lru_if_needed();
156    }
157
158    fn allocate_generation(&mut self) -> u64 {
159        if self.next_generation == u64::MAX {
160            self.rebuild_lru();
161        }
162        let generation = self.next_generation;
163        self.next_generation += 1;
164        generation
165    }
166
167    fn compact_lru_if_needed(&mut self) {
168        let max_lru_nodes = TIER1_FILE_MEMO_MAX_ENTRIES
169            .saturating_mul(2)
170            .max(self.entries.len());
171        if self.lru.len() > max_lru_nodes {
172            self.rebuild_lru();
173        }
174    }
175
176    fn rebuild_lru(&mut self) {
177        let mut live_nodes = self
178            .entries
179            .iter()
180            .map(|(path, entry)| (entry.generation, path.clone()))
181            .collect::<Vec<_>>();
182        live_nodes.sort_by_key(|(generation, _)| *generation);
183
184        self.lru.clear();
185        for (generation, (_, path)) in live_nodes.into_iter().enumerate() {
186            let generation = generation as u64;
187            if let Some(entry) = self.entries.get_mut(&path) {
188                entry.generation = generation;
189            }
190            self.lru.push_back(LruNode { path, generation });
191        }
192        self.next_generation = self.lru.len() as u64;
193    }
194
195    fn evict_lru(&mut self) {
196        while self.entries.len() > TIER1_FILE_MEMO_MAX_ENTRIES {
197            let Some(node) = self.lru.pop_front() else {
198                break;
199            };
200            if self
201                .entries
202                .get(&node.path)
203                .is_some_and(|entry| entry.generation == node.generation)
204            {
205                self.entries.remove(&node.path);
206            }
207        }
208        self.compact_lru_if_needed();
209    }
210}
211
212#[derive(Debug)]
213pub(crate) struct Tier1FileMemo<T> {
214    state: Mutex<Tier1MemoState<T>>,
215}
216
217impl<T> Default for Tier1FileMemo<T> {
218    fn default() -> Self {
219        Self {
220            state: Mutex::new(Tier1MemoState::default()),
221        }
222    }
223}
224
225impl<T: Clone> Tier1FileMemo<T> {
226    pub(crate) fn get_or_insert_with<F>(&self, path: &Path, scan: F) -> T
227    where
228        F: FnOnce(&Path) -> (Option<FileFreshness>, T),
229    {
230        if let Some(cached) = self.cached_value(path) {
231            return cached;
232        }
233
234        let (freshness, value) = scan(path);
235        if let Ok(mut state) = self.state.lock() {
236            if let Some(freshness) = freshness {
237                state.insert(
238                    path.to_path_buf(),
239                    Tier1MemoEntry {
240                        freshness,
241                        value: value.clone(),
242                        generation: 0,
243                    },
244                );
245            } else {
246                state.remove(path);
247            }
248        }
249        value
250    }
251
252    fn cached_value(&self, path: &Path) -> Option<T> {
253        let mut cached = self
254            .state
255            .lock()
256            .ok()
257            .and_then(|state| state.entries.get(path).cloned())?;
258
259        match crate::cache_freshness::verify_file(path, &cached.freshness) {
260            FreshnessVerdict::HotFresh => {
261                if let Ok(mut state) = self.state.lock() {
262                    state.touch(path);
263                }
264                Some(cached.value)
265            }
266            FreshnessVerdict::ContentFresh {
267                new_mtime,
268                new_size,
269            } => {
270                cached.freshness.mtime = new_mtime;
271                cached.freshness.size = new_size;
272                let value = cached.value.clone();
273                if let Ok(mut state) = self.state.lock() {
274                    state.insert(path.to_path_buf(), cached);
275                }
276                Some(value)
277            }
278            FreshnessVerdict::Stale => None,
279            FreshnessVerdict::Deleted => {
280                if let Ok(mut state) = self.state.lock() {
281                    state.remove(path);
282                }
283                None
284            }
285        }
286    }
287}
288
289#[derive(Debug)]
290pub struct InspectCache {
291    project_root: PathBuf,
292    project_key: String,
293    sqlite_path: PathBuf,
294    conn: Mutex<Connection>,
295    memory: RwLock<HashMap<JobKey, MemoryAggregate>>,
296}
297
298impl InspectCache {
299    pub fn open(inspect_dir: PathBuf, project_root: PathBuf) -> Result<Self, InspectCacheError> {
300        std::fs::create_dir_all(&inspect_dir)?;
301        let project_key = crate::search_index::project_cache_key(&project_root);
302        let sqlite_path = inspect_dir.join(format!("{project_key}.sqlite"));
303        let conn = Connection::open(&sqlite_path)?;
304        configure_connection(&conn)?;
305        initialize_schema(&conn)?;
306        Ok(Self {
307            project_root,
308            project_key,
309            sqlite_path,
310            conn: Mutex::new(conn),
311            memory: RwLock::new(HashMap::new()),
312        })
313    }
314
315    pub fn project_root(&self) -> &Path {
316        &self.project_root
317    }
318
319    pub fn project_key(&self) -> &str {
320        &self.project_key
321    }
322
323    pub fn sqlite_path(&self) -> &Path {
324        &self.sqlite_path
325    }
326
327    pub fn store_aggregated(
328        &self,
329        key: JobKey,
330        payload: serde_json::Value,
331    ) -> Result<(), InspectCacheError> {
332        self.store_memory_aggregate(key, payload, None)
333    }
334
335    fn store_memory_aggregate(
336        &self,
337        key: JobKey,
338        payload: serde_json::Value,
339        contribution_set_hash: Option<String>,
340    ) -> Result<(), InspectCacheError> {
341        self.memory
342            .write()
343            .map_err(|_| InspectCacheError::LockPoisoned("memory"))?
344            .insert(
345                key,
346                MemoryAggregate {
347                    payload,
348                    generated_at: unix_seconds_now(),
349                    contribution_set_hash,
350                },
351            );
352        Ok(())
353    }
354
355    pub fn get_aggregated(
356        &self,
357        key: &JobKey,
358    ) -> Result<Option<serde_json::Value>, InspectCacheError> {
359        if !key.category.is_tier2() {
360            return Ok(self
361                .memory
362                .read()
363                .map_err(|_| InspectCacheError::LockPoisoned("memory"))?
364                .get(key)
365                .map(|entry| entry.payload.clone()));
366        }
367
368        let current_hash = {
369            let conn = self
370                .conn
371                .lock()
372                .map_err(|_| InspectCacheError::LockPoisoned("connection"))?;
373            contribution_set_hash_with_conn(
374                &conn,
375                key.category,
376                &self.project_key,
377                &self.project_root,
378            )?
379        };
380
381        let memory_entry = {
382            self.memory
383                .read()
384                .map_err(|_| InspectCacheError::LockPoisoned("memory"))?
385                .get(key)
386                .cloned()
387        };
388        if let Some(entry) = memory_entry {
389            if entry.contribution_set_hash.as_deref() == Some(current_hash.as_str()) {
390                return Ok(Some(entry.payload));
391            }
392            self.memory
393                .write()
394                .map_err(|_| InspectCacheError::LockPoisoned("memory"))?
395                .remove(key);
396        }
397
398        let payload = {
399            let conn = self
400                .conn
401                .lock()
402                .map_err(|_| InspectCacheError::LockPoisoned("connection"))?;
403            conn.query_row(
404                "SELECT aggregate FROM tier2_aggregates \
405                 WHERE category = ?1 AND project_key = ?2 AND contribution_set_hash = ?3",
406                params![key.category.as_str(), self.project_key, current_hash],
407                |row| row.get::<_, Vec<u8>>(0),
408            )
409            .optional()?
410        };
411
412        match payload {
413            Some(bytes) => {
414                let value = serde_json::from_slice::<serde_json::Value>(&bytes)?;
415                self.store_memory_aggregate(key.clone(), value.clone(), Some(current_hash))?;
416                Ok(Some(value))
417            }
418            None => Ok(None),
419        }
420    }
421
422    pub fn store_tier2_result(
423        &self,
424        key: JobKey,
425        scanned_files: &[PathBuf],
426        contributions: &[FileContribution],
427        aggregate: serde_json::Value,
428    ) -> Result<(), InspectCacheError> {
429        if !key.category.is_tier2() {
430            self.store_aggregated(key, aggregate)?;
431            return Ok(());
432        }
433
434        let now = unix_seconds_now();
435        let mut conn = self
436            .conn
437            .lock()
438            .map_err(|_| InspectCacheError::LockPoisoned("connection"))?;
439        let tx = conn.transaction()?;
440
441        let scanned_relative = scanned_files
442            .iter()
443            .map(|path| relative_string(&self.project_root, path))
444            .collect::<BTreeSet<_>>();
445        let existing = existing_contribution_paths(&tx, key.category, &self.project_key)?;
446        for file_path in existing {
447            if !scanned_relative.contains(&file_path) {
448                tx.execute(
449                    "DELETE FROM tier2_contributions WHERE category = ?1 AND project_key = ?2 AND file_path = ?3",
450                    params![key.category.as_str(), self.project_key, file_path],
451                )?;
452            }
453        }
454
455        for contribution in contributions {
456            let file_path = relative_string(&self.project_root, &contribution.file_path);
457            let blob = serde_json::to_vec(&contribution_with_type_ref_names(
458                contribution.contribution.clone(),
459                &contribution.type_ref_names,
460            ))?;
461            tx.execute(
462                "INSERT INTO tier2_contributions \
463                 (category, project_key, file_path, file_mtime_ns, file_size, file_hash, contribution, generated_at) \
464                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8) \
465                 ON CONFLICT(category, project_key, file_path) DO UPDATE SET \
466                 file_mtime_ns = excluded.file_mtime_ns, \
467                 file_size = excluded.file_size, \
468                 file_hash = excluded.file_hash, \
469                 contribution = excluded.contribution, \
470                 generated_at = excluded.generated_at",
471                params![
472                    contribution.category.as_str(),
473                    self.project_key,
474                    file_path,
475                    system_time_to_ns(contribution.freshness.mtime),
476                    contribution.freshness.size as i64,
477                    hash_to_hex(contribution.freshness.content_hash),
478                    blob,
479                    now,
480                ],
481            )?;
482        }
483
484        let contribution_set_hash = contribution_set_hash_with_conn(
485            &tx,
486            key.category,
487            &self.project_key,
488            &self.project_root,
489        )?;
490        let aggregate_blob = serde_json::to_vec(&aggregate)?;
491        tx.execute(
492            "INSERT INTO tier2_aggregates \
493             (category, project_key, contribution_set_hash, aggregate, generated_at) \
494             VALUES (?1, ?2, ?3, ?4, ?5) \
495             ON CONFLICT(category, project_key) DO UPDATE SET \
496             contribution_set_hash = excluded.contribution_set_hash, \
497             aggregate = excluded.aggregate, \
498             generated_at = excluded.generated_at",
499            params![
500                key.category.as_str(),
501                self.project_key,
502                contribution_set_hash,
503                aggregate_blob,
504                now,
505            ],
506        )?;
507        tx.execute(
508            "INSERT INTO tier2_meta (category, project_key, last_full_run) VALUES (?1, ?2, ?3) \
509             ON CONFLICT(category, project_key) DO UPDATE SET last_full_run = excluded.last_full_run",
510            params![key.category.as_str(), self.project_key, now],
511        )?;
512        tx.commit()?;
513
514        self.store_memory_aggregate(key, aggregate, Some(contribution_set_hash))
515    }
516
517    pub(crate) fn apply_contribution_updates(
518        &self,
519        category: InspectCategory,
520        updates: Tier2ContributionUpdates,
521    ) -> Result<String, InspectCacheError> {
522        let now = unix_seconds_now();
523        let mut conn = self
524            .conn
525            .lock()
526            .map_err(|_| InspectCacheError::LockPoisoned("connection"))?;
527        let tx = conn.transaction()?;
528
529        for relative_file in updates.deletes {
530            tx.execute(
531                "DELETE FROM tier2_contributions WHERE category = ?1 AND project_key = ?2 AND file_path = ?3",
532                params![
533                    category.as_str(),
534                    self.project_key,
535                    relative_file.to_string_lossy().to_string()
536                ],
537            )?;
538        }
539
540        for (relative_file, freshness) in updates.metadata_updates {
541            tx.execute(
542                "UPDATE tier2_contributions \
543                 SET file_mtime_ns = ?4, file_size = ?5, file_hash = ?6 \
544                 WHERE category = ?1 AND project_key = ?2 AND file_path = ?3",
545                params![
546                    category.as_str(),
547                    self.project_key,
548                    relative_file.to_string_lossy().to_string(),
549                    system_time_to_ns(freshness.mtime),
550                    freshness.size as i64,
551                    hash_to_hex(freshness.content_hash),
552                ],
553            )?;
554        }
555
556        for contribution in updates.upserts {
557            let file_path = relative_string(&self.project_root, &contribution.file_path);
558            let blob = serde_json::to_vec(&contribution_with_type_ref_names(
559                contribution.contribution.clone(),
560                &contribution.type_ref_names,
561            ))?;
562            tx.execute(
563                "INSERT INTO tier2_contributions \
564                 (category, project_key, file_path, file_mtime_ns, file_size, file_hash, contribution, generated_at) \
565                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8) \
566                 ON CONFLICT(category, project_key, file_path) DO UPDATE SET \
567                 file_mtime_ns = excluded.file_mtime_ns, \
568                 file_size = excluded.file_size, \
569                 file_hash = excluded.file_hash, \
570                 contribution = excluded.contribution, \
571                 generated_at = excluded.generated_at",
572                params![
573                    contribution.category.as_str(),
574                    self.project_key,
575                    file_path,
576                    system_time_to_ns(contribution.freshness.mtime),
577                    contribution.freshness.size as i64,
578                    hash_to_hex(contribution.freshness.content_hash),
579                    blob,
580                    now,
581                ],
582            )?;
583        }
584
585        let contribution_set_hash =
586            contribution_set_hash_with_conn(&tx, category, &self.project_key, &self.project_root)?;
587        tx.commit()?;
588
589        self.memory
590            .write()
591            .map_err(|_| InspectCacheError::LockPoisoned("memory"))?
592            .remove(&JobKey::for_project_category(category));
593
594        Ok(contribution_set_hash)
595    }
596
597    pub(crate) fn load_aggregate_if_hash_matches(
598        &self,
599        category: InspectCategory,
600        contribution_set_hash: &str,
601    ) -> Result<Option<serde_json::Value>, InspectCacheError> {
602        let payload = {
603            let conn = self
604                .conn
605                .lock()
606                .map_err(|_| InspectCacheError::LockPoisoned("connection"))?;
607            conn.query_row(
608                "SELECT aggregate FROM tier2_aggregates \
609                 WHERE category = ?1 AND project_key = ?2 AND contribution_set_hash = ?3",
610                params![category.as_str(), self.project_key, contribution_set_hash],
611                |row| row.get::<_, Vec<u8>>(0),
612            )
613            .optional()?
614        };
615
616        match payload {
617            Some(bytes) => {
618                let value = serde_json::from_slice::<serde_json::Value>(&bytes)?;
619                self.store_memory_aggregate(
620                    JobKey::for_project_category(category),
621                    value.clone(),
622                    Some(contribution_set_hash.to_string()),
623                )?;
624                Ok(Some(value))
625            }
626            None => Ok(None),
627        }
628    }
629
630    pub(crate) fn latest_aggregate_any_hash(
631        &self,
632        category: InspectCategory,
633    ) -> Result<Option<serde_json::Value>, InspectCacheError> {
634        let payload = {
635            let conn = self
636                .conn
637                .lock()
638                .map_err(|_| InspectCacheError::LockPoisoned("connection"))?;
639            conn.query_row(
640                "SELECT aggregate FROM tier2_aggregates \
641                 WHERE category = ?1 AND project_key = ?2 \
642                 ORDER BY generated_at DESC LIMIT 1",
643                params![category.as_str(), self.project_key],
644                |row| row.get::<_, Vec<u8>>(0),
645            )
646            .optional()?
647        };
648
649        match payload {
650            Some(bytes) => serde_json::from_slice::<serde_json::Value>(&bytes)
651                .map(Some)
652                .map_err(InspectCacheError::from),
653            None => Ok(None),
654        }
655    }
656
657    pub(crate) fn touch_tier2_last_full_run(
658        &self,
659        category: InspectCategory,
660    ) -> Result<i64, InspectCacheError> {
661        let mut conn = self
662            .conn
663            .lock()
664            .map_err(|_| InspectCacheError::LockPoisoned("connection"))?;
665        let tx = conn.transaction()?;
666        let previous = tx
667            .query_row(
668                "SELECT last_full_run FROM tier2_meta WHERE category = ?1 AND project_key = ?2",
669                params![category.as_str(), self.project_key],
670                |row| row.get::<_, i64>(0),
671            )
672            .optional()?;
673        let now = unix_seconds_now();
674        let last_full_run = previous.map_or(now, |previous| now.max(previous.saturating_add(1)));
675        tx.execute(
676            "INSERT INTO tier2_meta (category, project_key, last_full_run) VALUES (?1, ?2, ?3)              ON CONFLICT(category, project_key) DO UPDATE SET last_full_run = excluded.last_full_run",
677            params![category.as_str(), self.project_key, last_full_run],
678        )?;
679        tx.commit()?;
680        Ok(last_full_run)
681    }
682
683    pub(crate) fn store_tier2_aggregate(
684        &self,
685        key: JobKey,
686        contribution_set_hash: &str,
687        aggregate: serde_json::Value,
688    ) -> Result<(), InspectCacheError> {
689        if !key.category.is_tier2() {
690            self.store_aggregated(key, aggregate)?;
691            return Ok(());
692        }
693
694        let now = unix_seconds_now();
695        let aggregate_blob = serde_json::to_vec(&aggregate)?;
696        let mut conn = self
697            .conn
698            .lock()
699            .map_err(|_| InspectCacheError::LockPoisoned("connection"))?;
700        let tx = conn.transaction()?;
701        tx.execute(
702            "INSERT INTO tier2_aggregates \
703             (category, project_key, contribution_set_hash, aggregate, generated_at) \
704             VALUES (?1, ?2, ?3, ?4, ?5) \
705             ON CONFLICT(category, project_key) DO UPDATE SET \
706             contribution_set_hash = excluded.contribution_set_hash, \
707             aggregate = excluded.aggregate, \
708             generated_at = excluded.generated_at",
709            params![
710                key.category.as_str(),
711                self.project_key,
712                contribution_set_hash,
713                aggregate_blob,
714                now,
715            ],
716        )?;
717        tx.execute(
718            "INSERT INTO tier2_meta (category, project_key, last_full_run) VALUES (?1, ?2, ?3) \
719             ON CONFLICT(category, project_key) DO UPDATE SET last_full_run = excluded.last_full_run",
720            params![key.category.as_str(), self.project_key, now],
721        )?;
722        tx.commit()?;
723
724        self.store_memory_aggregate(key, aggregate, Some(contribution_set_hash.to_string()))
725    }
726
727    pub fn load_tier2_contributions(
728        &self,
729        category: InspectCategory,
730    ) -> Result<Vec<ContributionRecord>, InspectCacheError> {
731        let conn = self
732            .conn
733            .lock()
734            .map_err(|_| InspectCacheError::LockPoisoned("connection"))?;
735        let mut stmt = conn.prepare(
736            "SELECT file_path, file_mtime_ns, file_size, file_hash, contribution \
737             FROM tier2_contributions \
738             WHERE category = ?1 AND project_key = ?2 \
739             ORDER BY file_path ASC",
740        )?;
741        let rows = stmt.query_map(params![category.as_str(), self.project_key], |row| {
742            let file_path: String = row.get(0)?;
743            let mtime_ns: i64 = row.get(1)?;
744            let file_size: i64 = row.get(2)?;
745            let file_hash: String = row.get(3)?;
746            let contribution: Vec<u8> = row.get(4)?;
747            Ok((file_path, mtime_ns, file_size, file_hash, contribution))
748        })?;
749
750        let mut records = Vec::new();
751        for row in rows {
752            let (file_path, mtime_ns, file_size, file_hash, contribution) = row?;
753            let contribution: serde_json::Value = serde_json::from_slice(&contribution)?;
754            let type_ref_names = type_ref_names_from_contribution(&contribution);
755            records.push(ContributionRecord {
756                category,
757                file_path: PathBuf::from(file_path),
758                freshness: FileFreshness {
759                    mtime: ns_to_system_time(mtime_ns),
760                    size: file_size.max(0) as u64,
761                    content_hash: hash_from_hex(&file_hash)?,
762                },
763                contribution,
764                type_ref_names,
765            });
766        }
767        Ok(records)
768    }
769
770    pub fn delete_tier2_contribution(
771        &self,
772        category: InspectCategory,
773        relative_file: &Path,
774    ) -> Result<(), InspectCacheError> {
775        let conn = self
776            .conn
777            .lock()
778            .map_err(|_| InspectCacheError::LockPoisoned("connection"))?;
779        conn.execute(
780            "DELETE FROM tier2_contributions WHERE category = ?1 AND project_key = ?2 AND file_path = ?3",
781            params![
782                category.as_str(),
783                self.project_key,
784                relative_file.to_string_lossy().to_string()
785            ],
786        )?;
787        Ok(())
788    }
789
790    pub fn update_content_fresh_metadata(
791        &self,
792        category: InspectCategory,
793        relative_file: &Path,
794        freshness: &FileFreshness,
795    ) -> Result<(), InspectCacheError> {
796        let conn = self
797            .conn
798            .lock()
799            .map_err(|_| InspectCacheError::LockPoisoned("connection"))?;
800        conn.execute(
801            "UPDATE tier2_contributions \
802             SET file_mtime_ns = ?4, file_size = ?5, file_hash = ?6 \
803             WHERE category = ?1 AND project_key = ?2 AND file_path = ?3",
804            params![
805                category.as_str(),
806                self.project_key,
807                relative_file.to_string_lossy().to_string(),
808                system_time_to_ns(freshness.mtime),
809                freshness.size as i64,
810                hash_to_hex(freshness.content_hash),
811            ],
812        )?;
813        Ok(())
814    }
815
816    pub(crate) fn contribution_fingerprint(
817        &self,
818        category: InspectCategory,
819    ) -> Result<(usize, String, bool), InspectCacheError> {
820        let conn = self
821            .conn
822            .lock()
823            .map_err(|_| InspectCacheError::LockPoisoned("connection"))?;
824        let mut stmt = conn.prepare(
825            "SELECT file_path, file_mtime_ns, file_size, file_hash \
826             FROM tier2_contributions \
827             WHERE category = ?1 AND project_key = ?2 \
828             ORDER BY file_path ASC",
829        )?;
830        let rows = stmt.query_map(params![category.as_str(), self.project_key], |row| {
831            Ok((
832                row.get::<_, String>(0)?,
833                row.get::<_, i64>(1)?,
834                row.get::<_, i64>(2)?,
835                row.get::<_, String>(3)?,
836            ))
837        })?;
838
839        let zero_hash = hash_to_hex(cache_freshness::zero_hash());
840        let mut count = 0usize;
841        let mut hash_complete = true;
842        let mut hasher = blake3::Hasher::new();
843        for row in rows {
844            let (file_path, mtime_ns, file_size, file_hash) = row?;
845            count += 1;
846            if file_hash == zero_hash {
847                hash_complete = false;
848            }
849            update_contribution_fingerprint_hash(
850                &mut hasher,
851                &file_path,
852                mtime_ns.max(0),
853                file_size.max(0) as u64,
854                &file_hash,
855            );
856        }
857
858        Ok((count, hasher.finalize().to_hex().to_string(), hash_complete))
859    }
860
861    pub(crate) fn contribution_freshness(
862        &self,
863        category: InspectCategory,
864    ) -> Result<Vec<(PathBuf, FileFreshness)>, InspectCacheError> {
865        let conn = self
866            .conn
867            .lock()
868            .map_err(|_| InspectCacheError::LockPoisoned("connection"))?;
869        let mut stmt = conn.prepare(
870            "SELECT file_path, file_mtime_ns, file_size, file_hash \
871             FROM tier2_contributions \
872             WHERE category = ?1 AND project_key = ?2 \
873             ORDER BY file_path ASC",
874        )?;
875        let rows = stmt.query_map(params![category.as_str(), self.project_key], |row| {
876            Ok((
877                row.get::<_, String>(0)?,
878                row.get::<_, i64>(1)?,
879                row.get::<_, i64>(2)?,
880                row.get::<_, String>(3)?,
881            ))
882        })?;
883
884        let mut records = Vec::new();
885        for row in rows {
886            let (file_path, mtime_ns, file_size, file_hash) = row?;
887            records.push((
888                PathBuf::from(file_path),
889                FileFreshness {
890                    mtime: ns_to_system_time(mtime_ns),
891                    size: file_size.max(0) as u64,
892                    content_hash: hash_from_hex(&file_hash)?,
893                },
894            ));
895        }
896        Ok(records)
897    }
898
899    pub fn contribution_set_hash(
900        &self,
901        category: InspectCategory,
902    ) -> Result<String, InspectCacheError> {
903        let conn = self
904            .conn
905            .lock()
906            .map_err(|_| InspectCacheError::LockPoisoned("connection"))?;
907        contribution_set_hash_with_conn(&conn, category, &self.project_key, &self.project_root)
908    }
909
910    pub fn last_full_run(
911        &self,
912        category: InspectCategory,
913    ) -> Result<Option<i64>, InspectCacheError> {
914        let conn = self
915            .conn
916            .lock()
917            .map_err(|_| InspectCacheError::LockPoisoned("connection"))?;
918        conn.query_row(
919            "SELECT last_full_run FROM tier2_meta WHERE category = ?1 AND project_key = ?2",
920            params![category.as_str(), self.project_key],
921            |row| row.get::<_, i64>(0),
922        )
923        .optional()
924        .map_err(InspectCacheError::from)
925    }
926
927    pub fn memory_generated_at(&self, key: &JobKey) -> Result<Option<i64>, InspectCacheError> {
928        Ok(self
929            .memory
930            .read()
931            .map_err(|_| InspectCacheError::LockPoisoned("memory"))?
932            .get(key)
933            .map(|entry| entry.generated_at))
934    }
935}
936
937fn configure_connection(conn: &Connection) -> Result<(), InspectCacheError> {
938    conn.pragma_update(None, "journal_mode", "WAL")?;
939    conn.pragma_update(None, "busy_timeout", 5_000)?;
940    Ok(())
941}
942
943fn initialize_schema(conn: &Connection) -> Result<(), InspectCacheError> {
944    conn.execute_batch(
945        "CREATE TABLE IF NOT EXISTS tier2_contributions (
946            category        TEXT NOT NULL,
947            project_key     TEXT NOT NULL,
948            file_path       TEXT NOT NULL,
949            file_mtime_ns   INTEGER NOT NULL,
950            file_size       INTEGER NOT NULL,
951            file_hash       TEXT NOT NULL,
952            contribution    BLOB NOT NULL,
953            generated_at    INTEGER NOT NULL,
954            PRIMARY KEY (category, project_key, file_path)
955        );
956
957        CREATE TABLE IF NOT EXISTS tier2_aggregates (
958            category        TEXT NOT NULL,
959            project_key     TEXT NOT NULL,
960            contribution_set_hash TEXT NOT NULL,
961            aggregate       BLOB NOT NULL,
962            generated_at    INTEGER NOT NULL,
963            PRIMARY KEY (category, project_key)
964        );
965
966        CREATE TABLE IF NOT EXISTS tier2_meta (
967            category        TEXT NOT NULL,
968            project_key     TEXT NOT NULL,
969            last_full_run   INTEGER NOT NULL,
970            PRIMARY KEY (category, project_key)
971        );",
972    )?;
973    Ok(())
974}
975
976fn existing_contribution_paths(
977    conn: &Connection,
978    category: InspectCategory,
979    project_key: &str,
980) -> Result<Vec<String>, InspectCacheError> {
981    let mut stmt = conn.prepare(
982        "SELECT file_path FROM tier2_contributions WHERE category = ?1 AND project_key = ?2",
983    )?;
984    let rows = stmt.query_map(params![category.as_str(), project_key], |row| {
985        row.get::<_, String>(0)
986    })?;
987    rows.collect::<Result<Vec<_>, _>>()
988        .map_err(InspectCacheError::from)
989}
990
991fn contribution_set_hash_with_conn(
992    conn: &Connection,
993    category: InspectCategory,
994    project_key: &str,
995    project_root: &Path,
996) -> Result<String, InspectCacheError> {
997    let mut stmt = conn.prepare(
998        "SELECT file_path, file_hash FROM tier2_contributions \
999         WHERE category = ?1 AND project_key = ?2 ORDER BY file_path ASC",
1000    )?;
1001    let rows = stmt.query_map(params![category.as_str(), project_key], |row| {
1002        Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
1003    })?;
1004
1005    let mut hasher = blake3::Hasher::new();
1006    hasher.update(b"tier2-contributions\0");
1007    hasher.update(&TIER2_CONTRIBUTION_CACHE_VERSION.to_le_bytes());
1008    hasher.update(b"\0");
1009    for row in rows {
1010        let (file_path, file_hash) = row?;
1011        hasher.update(file_path.as_bytes());
1012        hasher.update(b"\0");
1013        hasher.update(file_hash.as_bytes());
1014        hasher.update(b"\0");
1015    }
1016    update_manifest_fingerprint_hash(&mut hasher, project_root)?;
1017    Ok(hasher.finalize().to_hex().to_string())
1018}
1019
1020fn update_manifest_fingerprint_hash(
1021    hasher: &mut blake3::Hasher,
1022    project_root: &Path,
1023) -> Result<(), InspectCacheError> {
1024    let manifest_root =
1025        fs::canonicalize(project_root).unwrap_or_else(|_| project_root.to_path_buf());
1026    hasher.update(b"entry-point-manifests\0");
1027    for manifest in super::entry_points::collect_entry_point_manifests(project_root) {
1028        let relative_path = manifest
1029            .strip_prefix(&manifest_root)
1030            .unwrap_or(manifest.as_path())
1031            .to_string_lossy()
1032            .replace('\\', "/");
1033        let content_hash = blake3::hash(&fs::read(&manifest)?);
1034        hasher.update(relative_path.as_bytes());
1035        hasher.update(b"\0");
1036        hasher.update(content_hash.as_bytes());
1037        hasher.update(b"\0");
1038    }
1039    Ok(())
1040}
1041
1042fn update_contribution_fingerprint_hash(
1043    hasher: &mut blake3::Hasher,
1044    relative_path: &str,
1045    mtime_ns: i64,
1046    file_size: u64,
1047    file_hash: &str,
1048) {
1049    hasher.update(relative_path.as_bytes());
1050    hasher.update(&[0]);
1051    hasher.update(&mtime_ns.to_le_bytes());
1052    hasher.update(&file_size.to_le_bytes());
1053    hasher.update(&[0]);
1054    hasher.update(file_hash.as_bytes());
1055}
1056
/// Renders `path` relative to `project_root` when it lies underneath it, otherwise
/// renders `path` unchanged. Non-UTF-8 components are replaced lossily; separators are
/// left as the platform produced them.
fn relative_string(project_root: &Path, path: &Path) -> String {
    let shown = match path.strip_prefix(project_root) {
        Ok(stripped) => stripped,
        Err(_) => path,
    };
    shown.to_string_lossy().into_owned()
}
1063
/// Converts `time` to nanoseconds since the Unix epoch.
///
/// Times before the epoch map to `0`; values beyond `i64::MAX` nanoseconds saturate.
fn system_time_to_ns(time: SystemTime) -> i64 {
    let since_epoch = time.duration_since(UNIX_EPOCH).unwrap_or_default();
    i64::try_from(since_epoch.as_nanos()).unwrap_or(i64::MAX)
}
1071
/// Converts nanoseconds since the Unix epoch back into a [`SystemTime`].
///
/// Negative inputs map to the epoch itself.
fn ns_to_system_time(value: i64) -> SystemTime {
    match u64::try_from(value) {
        Ok(nanos) => UNIX_EPOCH + Duration::from_nanos(nanos),
        Err(_) => UNIX_EPOCH,
    }
}
1075
1076fn hash_to_hex(hash: blake3::Hash) -> String {
1077    hash.to_hex().to_string()
1078}
1079
1080fn hash_from_hex(value: &str) -> Result<blake3::Hash, InspectCacheError> {
1081    if value.len() != 64 {
1082        return Err(InspectCacheError::InvalidHash(value.to_string()));
1083    }
1084    let mut bytes = [0u8; 32];
1085    for (index, chunk) in value.as_bytes().chunks(2).enumerate() {
1086        let hex = std::str::from_utf8(chunk)
1087            .map_err(|_| InspectCacheError::InvalidHash(value.to_string()))?;
1088        bytes[index] = u8::from_str_radix(hex, 16)
1089            .map_err(|_| InspectCacheError::InvalidHash(value.to_string()))?;
1090    }
1091    Ok(blake3::Hash::from_bytes(bytes))
1092}
1093
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// A clock set before the epoch yields `0`; the value saturates at `i64::MAX`.
fn unix_seconds_now() -> i64 {
    let elapsed = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    i64::try_from(elapsed.as_secs()).unwrap_or(i64::MAX)
}
1101
#[cfg(test)]
mod tests {
    use super::*;
    use std::cell::Cell;
    use std::fs;
    use std::path::{Path, PathBuf};

    /// Collects on-disk freshness metadata for `path`, panicking if collection fails.
    fn collect_freshness(path: &Path) -> FileFreshness {
        crate::cache_freshness::collect(path).unwrap()
    }

    /// Fills the memo to capacity and re-touches the oldest entry (`paths[0]`).
    /// It then inserts one more path and checks three things: the untouched oldest entry
    /// (`paths[1]`) is evicted, the re-touched entry survives, and the new entry is present.
    #[test]
    fn tier1_file_memo_evicts_lru_and_keeps_recent_hits() {
        let temp = tempfile::tempdir().unwrap();
        let memo = Tier1FileMemo::<usize>::default();
        let mut paths = Vec::with_capacity(TIER1_FILE_MEMO_MAX_ENTRIES);

        // Fill the memo exactly to capacity; each entry's value is its index.
        for index in 0..TIER1_FILE_MEMO_MAX_ENTRIES {
            let path = temp.path().join(format!("file-{index}.txt"));
            fs::write(&path, index.to_string()).unwrap();
            let value =
                memo.get_or_insert_with(&path, |path| (Some(collect_freshness(path)), index));
            assert_eq!(value, index);
            paths.push(path);
        }

        // Hit the oldest entry so it becomes the most recently used one.
        let recent_path = paths[0].clone();
        let recent_value = memo.get_or_insert_with(&recent_path, |_| {
            panic!("recently inserted entry should hit before eviction")
        });
        assert_eq!(recent_value, 0);

        // One insert past capacity forces a single eviction.
        let evicting_path = temp.path().join("new-file.txt");
        fs::write(&evicting_path, "new").unwrap();
        let evicting_value = memo.get_or_insert_with(&evicting_path, |path| {
            (Some(collect_freshness(path)), TIER1_FILE_MEMO_MAX_ENTRIES)
        });
        assert_eq!(evicting_value, TIER1_FILE_MEMO_MAX_ENTRIES);

        let state = memo.state.lock().unwrap();
        assert_eq!(state.entries.len(), TIER1_FILE_MEMO_MAX_ENTRIES);
        assert!(state.entries.contains_key(&recent_path));
        assert!(state.entries.contains_key(&evicting_path));
        assert!(!state.entries.contains_key(&paths[1]));
        // Release the state lock before calling back into `memo` (presumably
        // `get_or_insert_with` takes the same mutex).
        drop(state);

        let recent_value = memo.get_or_insert_with(&recent_path, |_| {
            panic!("recently used entry should survive eviction")
        });
        assert_eq!(recent_value, 0);
    }

    /// Hammers one hot entry (3x capacity hits) and then forces an eviction. Checks that
    /// the hot entry survives, the true LRU entry (`paths[1]`) is evicted, and the lazy
    /// LRU queue stays bounded (at most 2x capacity) rather than growing per touch.
    #[test]
    fn tier1_file_memo_repeated_touches_keep_lazy_lru_bounded() {
        let temp = tempfile::tempdir().unwrap();
        let memo = Tier1FileMemo::<usize>::default();
        let mut paths = Vec::with_capacity(TIER1_FILE_MEMO_MAX_ENTRIES);

        for index in 0..TIER1_FILE_MEMO_MAX_ENTRIES {
            let path = temp.path().join(format!("file-{index}.txt"));
            fs::write(&path, index.to_string()).unwrap();
            memo.get_or_insert_with(&path, |path| (Some(collect_freshness(path)), index));
            paths.push(path);
        }

        // Repeated hits on the same key are what could grow a lazy LRU queue unboundedly.
        for _ in 0..(TIER1_FILE_MEMO_MAX_ENTRIES * 3) {
            let value = memo.get_or_insert_with(&paths[0], |_| {
                panic!("hot entry should stay cached while it is repeatedly touched")
            });
            assert_eq!(value, 0);
        }

        let evicting_path = temp.path().join("new-file.txt");
        fs::write(&evicting_path, "new").unwrap();
        memo.get_or_insert_with(&evicting_path, |path| {
            (Some(collect_freshness(path)), TIER1_FILE_MEMO_MAX_ENTRIES)
        });

        let state = memo.state.lock().unwrap();
        assert_eq!(state.entries.len(), TIER1_FILE_MEMO_MAX_ENTRIES);
        assert!(state.entries.contains_key(&paths[0]));
        assert!(state.entries.contains_key(&evicting_path));
        assert!(!state.entries.contains_key(&paths[1]));
        assert!(
            state.lru.len() <= TIER1_FILE_MEMO_MAX_ENTRIES * 2,
            "lazy LRU queue should be compacted instead of growing without bound"
        );
    }

    /// Checks that the memo reuses a cached value while the file is unchanged. It must
    /// re-run the scan exactly once after the file is rewritten, then reuse the refreshed
    /// value. `scans` counts how many times the scan closure actually runs.
    #[test]
    fn tier1_file_memo_reuses_fresh_entries_and_rescans_stale_files() {
        let temp = tempfile::tempdir().unwrap();
        let path = temp.path().join("memo.txt");
        fs::write(&path, "first").unwrap();

        let memo = Tier1FileMemo::<String>::default();
        let scans = Cell::new(0);

        let first = memo.get_or_insert_with(&path, |path| {
            scans.set(scans.get() + 1);
            (Some(collect_freshness(path)), "first scan".to_string())
        });
        assert_eq!(first, "first scan");
        assert_eq!(scans.get(), 1);

        let unchanged =
            memo.get_or_insert_with(&path, |_| panic!("unchanged file should reuse Tier-1 memo"));
        assert_eq!(unchanged, "first scan");
        assert_eq!(scans.get(), 1);

        // New contents have a different length than "first", so the size field changes
        // even if the filesystem's mtime granularity hides the rewrite.
        fs::write(&path, "changed file contents").unwrap();
        let changed = memo.get_or_insert_with(&path, |path| {
            scans.set(scans.get() + 1);
            (Some(collect_freshness(path)), "second scan".to_string())
        });
        assert_eq!(changed, "second scan");
        assert_eq!(scans.get(), 2);

        let fresh_after_rescan = memo.get_or_insert_with(&path, |_| {
            panic!("rescanned file should reuse refreshed Tier-1 memo")
        });
        assert_eq!(fresh_after_rescan, "second scan");
        assert_eq!(scans.get(), 2);
    }

    /// Serde-serializable mirror of `ContributionRecord`, used to JSON round-trip the
    /// records loaded from the cache.
    #[derive(serde::Deserialize, serde::Serialize)]
    struct RoundTripContributionRecord {
        category: String,
        file_path: PathBuf,
        contribution: serde_json::Value,
        type_ref_names: BTreeSet<String>,
    }

    impl From<&ContributionRecord> for RoundTripContributionRecord {
        fn from(record: &ContributionRecord) -> Self {
            Self {
                category: record.category.as_str().to_string(),
                file_path: record.file_path.clone(),
                contribution: record.contribution.clone(),
                type_ref_names: record.type_ref_names.clone(),
            }
        }
    }

    /// Stores a DeadCode contribution, reopens the cache from disk and loads it back.
    /// Checks that liveness metadata (`dispatched_method_names`, `type_ref_names`,
    /// `is_type_like`) survives both the SQLite round trip and a JSON round trip.
    /// Also pins `TIER2_CONTRIBUTION_CACHE_VERSION` to 5 (presumably so a format bump
    /// forces this test to be revisited).
    #[test]
    fn contribution_record_round_trip_preserves_dead_code_liveness_metadata() {
        let temp = tempfile::tempdir().unwrap();
        let project_root = temp.path().join("project");
        let inspect_dir = temp.path().join("inspect");
        let source = project_root.join("src/lib.ts");
        fs::create_dir_all(source.parent().unwrap()).unwrap();
        fs::write(&source, "export interface Widget { id: string }\n").unwrap();

        let cache = InspectCache::open(inspect_dir.clone(), project_root.clone()).unwrap();
        let contribution = FileContribution::new(
            InspectCategory::DeadCode,
            source.clone(),
            collect_freshness(&source),
            serde_json::json!({
                "file": "src/lib.ts",
                "exports": [{
                    "symbol": "Widget",
                    "kind": "interface",
                    "line": 1,
                    "is_type_like": true,
                    "is_entry_point": false,
                }],
                "internal_calls": [],
                "liveness_roots": [],
                "dispatched_method_names": ["render"],
                "type_ref_names": ["Widget"],
            }),
        )
        .with_type_ref_names(["Widget".to_string()]);
        cache
            .store_tier2_result(
                JobKey::for_project_category(InspectCategory::DeadCode),
                std::slice::from_ref(&source),
                &[contribution],
                serde_json::json!({ "count": 0, "items": [] }),
            )
            .unwrap();
        // Drop and reopen so the records are read back from disk, not from memory.
        drop(cache);

        let cache = InspectCache::open(inspect_dir, project_root).unwrap();
        let records = cache
            .load_tier2_contributions(InspectCategory::DeadCode)
            .unwrap();
        assert_eq!(records.len(), 1);

        let serialized =
            serde_json::to_vec(&RoundTripContributionRecord::from(&records[0])).unwrap();
        let decoded: RoundTripContributionRecord = serde_json::from_slice(&serialized).unwrap();
        assert_eq!(decoded.category, InspectCategory::DeadCode.as_str());
        assert_eq!(decoded.contribution["dispatched_method_names"][0], "render");
        assert_eq!(decoded.contribution["type_ref_names"][0], "Widget");
        assert!(decoded.type_ref_names.contains("Widget"));
        assert_eq!(
            decoded.contribution["exports"][0]["is_type_like"].as_bool(),
            Some(true)
        );
        assert_eq!(TIER2_CONTRIBUTION_CACHE_VERSION, 5);
    }
}