Skip to main content

aft/inspect/
cache.rs

1use std::collections::{BTreeSet, HashMap, VecDeque};
2use std::fmt;
3use std::fs;
4use std::path::{Path, PathBuf};
5use std::sync::{Mutex, RwLock};
6use std::time::{Duration, SystemTime, UNIX_EPOCH};
7
8use rusqlite::{params, Connection, OptionalExtension};
9
10use crate::cache_freshness::{FileFreshness, FreshnessVerdict};
11
12use super::job::{FileContribution, InspectCategory, JobKey};
13
/// A batch of incremental changes to the persisted tier-2 contributions of one category.
///
/// Consumed by `InspectCache::apply_contribution_updates`, which applies `deletes`, then
/// `metadata_updates`, then `upserts` inside a single SQLite transaction.
#[derive(Debug, Default)]
pub(crate) struct Tier2ContributionUpdates {
    /// Contributions to insert or replace; each is stored under
    /// `relative_string(project_root, contribution.file_path)`.
    pub upserts: Vec<FileContribution>,
    /// Files whose stored contribution row should be removed. The path is matched verbatim
    /// (via `to_string_lossy`) against the stored `file_path`, so it is expected to already be
    /// in the stored (project-relative) form.
    pub deletes: Vec<PathBuf>,
    /// Files whose stored mtime, size and hash should be rewritten while the contribution blob
    /// itself is left untouched.
    pub metadata_updates: Vec<(PathBuf, FileFreshness)>,
}
20
/// Errors produced by inspect-cache operations.
#[derive(Debug)]
pub enum InspectCacheError {
    /// Filesystem failure, e.g. while creating the cache directory.
    Io(std::io::Error),
    /// Failure reported by SQLite (opening, querying, or committing a transaction).
    Sql(rusqlite::Error),
    /// Failure serialising or deserialising a JSON payload.
    Json(serde_json::Error),
    /// A `Mutex`/`RwLock` was poisoned; the payload names the lock (in this file:
    /// `"connection"` or `"memory"`).
    LockPoisoned(&'static str),
    /// A stored content hash could not be parsed as a BLAKE3 digest (presumably raised by
    /// `hash_from_hex`; confirm against its definition).
    InvalidHash(String),
}
29
30impl fmt::Display for InspectCacheError {
31    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
32        match self {
33            InspectCacheError::Io(error) => write!(formatter, "inspect cache io error: {error}"),
34            InspectCacheError::Sql(error) => {
35                write!(formatter, "inspect cache sqlite error: {error}")
36            }
37            InspectCacheError::Json(error) => {
38                write!(formatter, "inspect cache json error: {error}")
39            }
40            InspectCacheError::LockPoisoned(name) => {
41                write!(formatter, "inspect cache lock poisoned: {name}")
42            }
43            InspectCacheError::InvalidHash(hash) => {
44                write!(formatter, "inspect cache invalid blake3 hash: {hash}")
45            }
46        }
47    }
48}
49
// `source()` is intentionally left at its default (`None`): the `Display` impl already embeds
// the wrapped error's message, so also returning it from `source()` would print it twice in
// error-chain reporters.
impl std::error::Error for InspectCacheError {}
51
52impl From<std::io::Error> for InspectCacheError {
53    fn from(error: std::io::Error) -> Self {
54        Self::Io(error)
55    }
56}
57
58impl From<rusqlite::Error> for InspectCacheError {
59    fn from(error: rusqlite::Error) -> Self {
60        Self::Sql(error)
61    }
62}
63
64impl From<serde_json::Error> for InspectCacheError {
65    fn from(error: serde_json::Error) -> Self {
66        Self::Json(error)
67    }
68}
69
/// One persisted tier-2 contribution row, as returned by `InspectCache::load_tier2_contributions`.
#[derive(Debug, Clone)]
pub struct ContributionRecord {
    /// Category the row belongs to.
    pub category: InspectCategory,
    /// File path exactly as stored in SQLite (the form written via `relative_string`).
    pub file_path: PathBuf,
    /// The mtime, size and content hash recorded for the file when the row was last written.
    pub freshness: FileFreshness,
    /// The per-file contribution payload, decoded from its stored JSON blob.
    pub contribution: serde_json::Value,
}
77
/// An aggregate payload held in `InspectCache`'s in-memory map.
#[derive(Debug, Clone)]
struct MemoryAggregate {
    /// The aggregated JSON result.
    payload: serde_json::Value,
    /// Value of `unix_seconds_now()` when the entry was stored (presumably Unix seconds).
    generated_at: i64,
    /// Hash of the contribution set the payload was built from. `None` for entries written by
    /// `store_aggregated`; tier-2 lookups in `get_aggregated` never accept such an entry.
    contribution_set_hash: Option<String>,
}
84
/// Maximum number of live entries a `Tier1FileMemo` keeps before evicting the least recently
/// used ones. The LRU queue is compacted once it grows beyond twice this size.
const TIER1_FILE_MEMO_MAX_ENTRIES: usize = 4_096;
86
/// A memoised per-file value together with the freshness it was computed against.
#[derive(Debug, Clone)]
struct Tier1MemoEntry<T> {
    /// File metadata and content hash used to validate the entry on lookup.
    freshness: FileFreshness,
    /// The memoised value.
    value: T,
    /// Recency stamp. Only the `LruNode` carrying this same generation represents the entry;
    /// older nodes for the same path are stale.
    generation: u64,
}
93
/// A recency record in the LRU queue.
///
/// Nodes are not removed when their entry is refreshed or deleted. A node is honoured only while
/// its `generation` equals the entry's current generation; otherwise it is skipped when popped
/// and dropped on the next rebuild.
#[derive(Debug, Clone)]
struct LruNode {
    path: PathBuf,
    generation: u64,
}
99
/// The mutable state behind a `Tier1FileMemo`: the entry map plus a lazily pruned LRU queue.
#[derive(Debug)]
struct Tier1MemoState<T> {
    /// Live entries keyed by file path.
    entries: HashMap<PathBuf, Tier1MemoEntry<T>>,
    /// Recency queue, oldest at the front; may also hold stale nodes for superseded generations.
    lru: VecDeque<LruNode>,
    /// Next generation stamp to hand out.
    next_generation: u64,
}
106
107impl<T> Default for Tier1MemoState<T> {
108    fn default() -> Self {
109        Self {
110            entries: HashMap::new(),
111            lru: VecDeque::new(),
112            next_generation: 0,
113        }
114    }
115}
116
117impl<T> Tier1MemoState<T> {
118    fn insert(&mut self, path: PathBuf, mut entry: Tier1MemoEntry<T>) {
119        let generation = self.allocate_generation();
120        entry.generation = generation;
121        self.entries.insert(path.clone(), entry);
122        self.lru.push_back(LruNode { path, generation });
123        self.compact_lru_if_needed();
124        self.evict_lru();
125    }
126
127    fn remove(&mut self, path: &Path) {
128        self.entries.remove(path);
129        self.compact_lru_if_needed();
130    }
131
132    fn touch(&mut self, path: &Path) {
133        if !self.entries.contains_key(path) {
134            return;
135        }
136
137        let generation = self.allocate_generation();
138        if let Some(entry) = self.entries.get_mut(path) {
139            entry.generation = generation;
140            self.lru.push_back(LruNode {
141                path: path.to_path_buf(),
142                generation,
143            });
144        }
145        self.compact_lru_if_needed();
146    }
147
148    fn allocate_generation(&mut self) -> u64 {
149        if self.next_generation == u64::MAX {
150            self.rebuild_lru();
151        }
152        let generation = self.next_generation;
153        self.next_generation += 1;
154        generation
155    }
156
157    fn compact_lru_if_needed(&mut self) {
158        let max_lru_nodes = TIER1_FILE_MEMO_MAX_ENTRIES
159            .saturating_mul(2)
160            .max(self.entries.len());
161        if self.lru.len() > max_lru_nodes {
162            self.rebuild_lru();
163        }
164    }
165
166    fn rebuild_lru(&mut self) {
167        let mut live_nodes = self
168            .entries
169            .iter()
170            .map(|(path, entry)| (entry.generation, path.clone()))
171            .collect::<Vec<_>>();
172        live_nodes.sort_by_key(|(generation, _)| *generation);
173
174        self.lru.clear();
175        for (generation, (_, path)) in live_nodes.into_iter().enumerate() {
176            let generation = generation as u64;
177            if let Some(entry) = self.entries.get_mut(&path) {
178                entry.generation = generation;
179            }
180            self.lru.push_back(LruNode { path, generation });
181        }
182        self.next_generation = self.lru.len() as u64;
183    }
184
185    fn evict_lru(&mut self) {
186        while self.entries.len() > TIER1_FILE_MEMO_MAX_ENTRIES {
187            let Some(node) = self.lru.pop_front() else {
188                break;
189            };
190            if self
191                .entries
192                .get(&node.path)
193                .is_some_and(|entry| entry.generation == node.generation)
194            {
195                self.entries.remove(&node.path);
196            }
197        }
198        self.compact_lru_if_needed();
199    }
200}
201
/// A size-bounded memo of per-file values, shared behind a `Mutex`. Entries are validated
/// against the file on disk on every lookup (see `get_or_insert_with`).
#[derive(Debug)]
pub(crate) struct Tier1FileMemo<T> {
    state: Mutex<Tier1MemoState<T>>,
}
206
207impl<T> Default for Tier1FileMemo<T> {
208    fn default() -> Self {
209        Self {
210            state: Mutex::new(Tier1MemoState::default()),
211        }
212    }
213}
214
215impl<T: Clone> Tier1FileMemo<T> {
216    pub(crate) fn get_or_insert_with<F>(&self, path: &Path, scan: F) -> T
217    where
218        F: FnOnce(&Path) -> (Option<FileFreshness>, T),
219    {
220        if let Some(cached) = self.cached_value(path) {
221            return cached;
222        }
223
224        let (freshness, value) = scan(path);
225        if let Ok(mut state) = self.state.lock() {
226            if let Some(freshness) = freshness {
227                state.insert(
228                    path.to_path_buf(),
229                    Tier1MemoEntry {
230                        freshness,
231                        value: value.clone(),
232                        generation: 0,
233                    },
234                );
235            } else {
236                state.remove(path);
237            }
238        }
239        value
240    }
241
242    fn cached_value(&self, path: &Path) -> Option<T> {
243        let mut cached = self
244            .state
245            .lock()
246            .ok()
247            .and_then(|state| state.entries.get(path).cloned())?;
248
249        match crate::cache_freshness::verify_file(path, &cached.freshness) {
250            FreshnessVerdict::HotFresh => {
251                if let Ok(mut state) = self.state.lock() {
252                    state.touch(path);
253                }
254                Some(cached.value)
255            }
256            FreshnessVerdict::ContentFresh {
257                new_mtime,
258                new_size,
259            } => {
260                cached.freshness.mtime = new_mtime;
261                cached.freshness.size = new_size;
262                let value = cached.value.clone();
263                if let Ok(mut state) = self.state.lock() {
264                    state.insert(path.to_path_buf(), cached);
265                }
266                Some(value)
267            }
268            FreshnessVerdict::Stale => None,
269            FreshnessVerdict::Deleted => {
270                if let Ok(mut state) = self.state.lock() {
271                    state.remove(path);
272                }
273                None
274            }
275        }
276    }
277}
278
/// Cache of inspect results for one project.
///
/// Tier-2 categories are persisted in a per-project SQLite database and mirrored in memory. The
/// database holds per-file contributions, aggregates keyed by contribution-set hash, and
/// last-full-run metadata. All other categories live only in the in-memory map.
#[derive(Debug)]
pub struct InspectCache {
    /// Root of the inspected project; stored file paths are made relative to it.
    project_root: PathBuf,
    /// Key derived from `project_root`; names the SQLite file and scopes every row.
    project_key: String,
    /// Location of the SQLite database file.
    sqlite_path: PathBuf,
    /// The single SQLite connection, serialised behind a mutex.
    conn: Mutex<Connection>,
    /// In-memory aggregates keyed by job.
    memory: RwLock<HashMap<JobKey, MemoryAggregate>>,
}
287
288impl InspectCache {
289    pub fn open(inspect_dir: PathBuf, project_root: PathBuf) -> Result<Self, InspectCacheError> {
290        std::fs::create_dir_all(&inspect_dir)?;
291        let project_key = crate::search_index::project_cache_key(&project_root);
292        let sqlite_path = inspect_dir.join(format!("{project_key}.sqlite"));
293        let conn = Connection::open(&sqlite_path)?;
294        configure_connection(&conn)?;
295        initialize_schema(&conn)?;
296        Ok(Self {
297            project_root,
298            project_key,
299            sqlite_path,
300            conn: Mutex::new(conn),
301            memory: RwLock::new(HashMap::new()),
302        })
303    }
304
305    pub fn project_root(&self) -> &Path {
306        &self.project_root
307    }
308
309    pub fn project_key(&self) -> &str {
310        &self.project_key
311    }
312
313    pub fn sqlite_path(&self) -> &Path {
314        &self.sqlite_path
315    }
316
317    pub fn store_aggregated(
318        &self,
319        key: JobKey,
320        payload: serde_json::Value,
321    ) -> Result<(), InspectCacheError> {
322        self.store_memory_aggregate(key, payload, None)
323    }
324
325    fn store_memory_aggregate(
326        &self,
327        key: JobKey,
328        payload: serde_json::Value,
329        contribution_set_hash: Option<String>,
330    ) -> Result<(), InspectCacheError> {
331        self.memory
332            .write()
333            .map_err(|_| InspectCacheError::LockPoisoned("memory"))?
334            .insert(
335                key,
336                MemoryAggregate {
337                    payload,
338                    generated_at: unix_seconds_now(),
339                    contribution_set_hash,
340                },
341            );
342        Ok(())
343    }
344
345    pub fn get_aggregated(
346        &self,
347        key: &JobKey,
348    ) -> Result<Option<serde_json::Value>, InspectCacheError> {
349        if !key.category.is_tier2() {
350            return Ok(self
351                .memory
352                .read()
353                .map_err(|_| InspectCacheError::LockPoisoned("memory"))?
354                .get(key)
355                .map(|entry| entry.payload.clone()));
356        }
357
358        let current_hash = {
359            let conn = self
360                .conn
361                .lock()
362                .map_err(|_| InspectCacheError::LockPoisoned("connection"))?;
363            contribution_set_hash_with_conn(
364                &conn,
365                key.category,
366                &self.project_key,
367                &self.project_root,
368            )?
369        };
370
371        let memory_entry = {
372            self.memory
373                .read()
374                .map_err(|_| InspectCacheError::LockPoisoned("memory"))?
375                .get(key)
376                .cloned()
377        };
378        if let Some(entry) = memory_entry {
379            if entry.contribution_set_hash.as_deref() == Some(current_hash.as_str()) {
380                return Ok(Some(entry.payload));
381            }
382            self.memory
383                .write()
384                .map_err(|_| InspectCacheError::LockPoisoned("memory"))?
385                .remove(key);
386        }
387
388        let payload = {
389            let conn = self
390                .conn
391                .lock()
392                .map_err(|_| InspectCacheError::LockPoisoned("connection"))?;
393            conn.query_row(
394                "SELECT aggregate FROM tier2_aggregates \
395                 WHERE category = ?1 AND project_key = ?2 AND contribution_set_hash = ?3",
396                params![key.category.as_str(), self.project_key, current_hash],
397                |row| row.get::<_, Vec<u8>>(0),
398            )
399            .optional()?
400        };
401
402        match payload {
403            Some(bytes) => {
404                let value = serde_json::from_slice::<serde_json::Value>(&bytes)?;
405                self.store_memory_aggregate(key.clone(), value.clone(), Some(current_hash))?;
406                Ok(Some(value))
407            }
408            None => Ok(None),
409        }
410    }
411
412    pub fn store_tier2_result(
413        &self,
414        key: JobKey,
415        scanned_files: &[PathBuf],
416        contributions: &[FileContribution],
417        aggregate: serde_json::Value,
418    ) -> Result<(), InspectCacheError> {
419        if !key.category.is_tier2() {
420            self.store_aggregated(key, aggregate)?;
421            return Ok(());
422        }
423
424        let now = unix_seconds_now();
425        let mut conn = self
426            .conn
427            .lock()
428            .map_err(|_| InspectCacheError::LockPoisoned("connection"))?;
429        let tx = conn.transaction()?;
430
431        let scanned_relative = scanned_files
432            .iter()
433            .map(|path| relative_string(&self.project_root, path))
434            .collect::<BTreeSet<_>>();
435        let existing = existing_contribution_paths(&tx, key.category, &self.project_key)?;
436        for file_path in existing {
437            if !scanned_relative.contains(&file_path) {
438                tx.execute(
439                    "DELETE FROM tier2_contributions WHERE category = ?1 AND project_key = ?2 AND file_path = ?3",
440                    params![key.category.as_str(), self.project_key, file_path],
441                )?;
442            }
443        }
444
445        for contribution in contributions {
446            let file_path = relative_string(&self.project_root, &contribution.file_path);
447            let blob = serde_json::to_vec(&contribution.contribution)?;
448            tx.execute(
449                "INSERT INTO tier2_contributions \
450                 (category, project_key, file_path, file_mtime_ns, file_size, file_hash, contribution, generated_at) \
451                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8) \
452                 ON CONFLICT(category, project_key, file_path) DO UPDATE SET \
453                 file_mtime_ns = excluded.file_mtime_ns, \
454                 file_size = excluded.file_size, \
455                 file_hash = excluded.file_hash, \
456                 contribution = excluded.contribution, \
457                 generated_at = excluded.generated_at",
458                params![
459                    contribution.category.as_str(),
460                    self.project_key,
461                    file_path,
462                    system_time_to_ns(contribution.freshness.mtime),
463                    contribution.freshness.size as i64,
464                    hash_to_hex(contribution.freshness.content_hash),
465                    blob,
466                    now,
467                ],
468            )?;
469        }
470
471        let contribution_set_hash = contribution_set_hash_with_conn(
472            &tx,
473            key.category,
474            &self.project_key,
475            &self.project_root,
476        )?;
477        let aggregate_blob = serde_json::to_vec(&aggregate)?;
478        tx.execute(
479            "INSERT INTO tier2_aggregates \
480             (category, project_key, contribution_set_hash, aggregate, generated_at) \
481             VALUES (?1, ?2, ?3, ?4, ?5) \
482             ON CONFLICT(category, project_key) DO UPDATE SET \
483             contribution_set_hash = excluded.contribution_set_hash, \
484             aggregate = excluded.aggregate, \
485             generated_at = excluded.generated_at",
486            params![
487                key.category.as_str(),
488                self.project_key,
489                contribution_set_hash,
490                aggregate_blob,
491                now,
492            ],
493        )?;
494        tx.execute(
495            "INSERT INTO tier2_meta (category, project_key, last_full_run) VALUES (?1, ?2, ?3) \
496             ON CONFLICT(category, project_key) DO UPDATE SET last_full_run = excluded.last_full_run",
497            params![key.category.as_str(), self.project_key, now],
498        )?;
499        tx.commit()?;
500
501        self.store_memory_aggregate(key, aggregate, Some(contribution_set_hash))
502    }
503
504    pub(crate) fn apply_contribution_updates(
505        &self,
506        category: InspectCategory,
507        updates: Tier2ContributionUpdates,
508    ) -> Result<String, InspectCacheError> {
509        let now = unix_seconds_now();
510        let mut conn = self
511            .conn
512            .lock()
513            .map_err(|_| InspectCacheError::LockPoisoned("connection"))?;
514        let tx = conn.transaction()?;
515
516        for relative_file in updates.deletes {
517            tx.execute(
518                "DELETE FROM tier2_contributions WHERE category = ?1 AND project_key = ?2 AND file_path = ?3",
519                params![
520                    category.as_str(),
521                    self.project_key,
522                    relative_file.to_string_lossy().to_string()
523                ],
524            )?;
525        }
526
527        for (relative_file, freshness) in updates.metadata_updates {
528            tx.execute(
529                "UPDATE tier2_contributions \
530                 SET file_mtime_ns = ?4, file_size = ?5, file_hash = ?6 \
531                 WHERE category = ?1 AND project_key = ?2 AND file_path = ?3",
532                params![
533                    category.as_str(),
534                    self.project_key,
535                    relative_file.to_string_lossy().to_string(),
536                    system_time_to_ns(freshness.mtime),
537                    freshness.size as i64,
538                    hash_to_hex(freshness.content_hash),
539                ],
540            )?;
541        }
542
543        for contribution in updates.upserts {
544            let file_path = relative_string(&self.project_root, &contribution.file_path);
545            let blob = serde_json::to_vec(&contribution.contribution)?;
546            tx.execute(
547                "INSERT INTO tier2_contributions \
548                 (category, project_key, file_path, file_mtime_ns, file_size, file_hash, contribution, generated_at) \
549                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8) \
550                 ON CONFLICT(category, project_key, file_path) DO UPDATE SET \
551                 file_mtime_ns = excluded.file_mtime_ns, \
552                 file_size = excluded.file_size, \
553                 file_hash = excluded.file_hash, \
554                 contribution = excluded.contribution, \
555                 generated_at = excluded.generated_at",
556                params![
557                    contribution.category.as_str(),
558                    self.project_key,
559                    file_path,
560                    system_time_to_ns(contribution.freshness.mtime),
561                    contribution.freshness.size as i64,
562                    hash_to_hex(contribution.freshness.content_hash),
563                    blob,
564                    now,
565                ],
566            )?;
567        }
568
569        let contribution_set_hash =
570            contribution_set_hash_with_conn(&tx, category, &self.project_key, &self.project_root)?;
571        tx.commit()?;
572
573        self.memory
574            .write()
575            .map_err(|_| InspectCacheError::LockPoisoned("memory"))?
576            .remove(&JobKey::for_project_category(category));
577
578        Ok(contribution_set_hash)
579    }
580
581    pub(crate) fn load_aggregate_if_hash_matches(
582        &self,
583        category: InspectCategory,
584        contribution_set_hash: &str,
585    ) -> Result<Option<serde_json::Value>, InspectCacheError> {
586        let payload = {
587            let conn = self
588                .conn
589                .lock()
590                .map_err(|_| InspectCacheError::LockPoisoned("connection"))?;
591            conn.query_row(
592                "SELECT aggregate FROM tier2_aggregates \
593                 WHERE category = ?1 AND project_key = ?2 AND contribution_set_hash = ?3",
594                params![category.as_str(), self.project_key, contribution_set_hash],
595                |row| row.get::<_, Vec<u8>>(0),
596            )
597            .optional()?
598        };
599
600        match payload {
601            Some(bytes) => {
602                let value = serde_json::from_slice::<serde_json::Value>(&bytes)?;
603                self.store_memory_aggregate(
604                    JobKey::for_project_category(category),
605                    value.clone(),
606                    Some(contribution_set_hash.to_string()),
607                )?;
608                Ok(Some(value))
609            }
610            None => Ok(None),
611        }
612    }
613
614    pub(crate) fn latest_aggregate_any_hash(
615        &self,
616        category: InspectCategory,
617    ) -> Result<Option<serde_json::Value>, InspectCacheError> {
618        let payload = {
619            let conn = self
620                .conn
621                .lock()
622                .map_err(|_| InspectCacheError::LockPoisoned("connection"))?;
623            conn.query_row(
624                "SELECT aggregate FROM tier2_aggregates \
625                 WHERE category = ?1 AND project_key = ?2 \
626                 ORDER BY generated_at DESC LIMIT 1",
627                params![category.as_str(), self.project_key],
628                |row| row.get::<_, Vec<u8>>(0),
629            )
630            .optional()?
631        };
632
633        match payload {
634            Some(bytes) => serde_json::from_slice::<serde_json::Value>(&bytes)
635                .map(Some)
636                .map_err(InspectCacheError::from),
637            None => Ok(None),
638        }
639    }
640
641    pub(crate) fn touch_tier2_last_full_run(
642        &self,
643        category: InspectCategory,
644    ) -> Result<i64, InspectCacheError> {
645        let mut conn = self
646            .conn
647            .lock()
648            .map_err(|_| InspectCacheError::LockPoisoned("connection"))?;
649        let tx = conn.transaction()?;
650        let previous = tx
651            .query_row(
652                "SELECT last_full_run FROM tier2_meta WHERE category = ?1 AND project_key = ?2",
653                params![category.as_str(), self.project_key],
654                |row| row.get::<_, i64>(0),
655            )
656            .optional()?;
657        let now = unix_seconds_now();
658        let last_full_run = previous.map_or(now, |previous| now.max(previous.saturating_add(1)));
659        tx.execute(
660            "INSERT INTO tier2_meta (category, project_key, last_full_run) VALUES (?1, ?2, ?3)              ON CONFLICT(category, project_key) DO UPDATE SET last_full_run = excluded.last_full_run",
661            params![category.as_str(), self.project_key, last_full_run],
662        )?;
663        tx.commit()?;
664        Ok(last_full_run)
665    }
666
667    pub(crate) fn store_tier2_aggregate(
668        &self,
669        key: JobKey,
670        contribution_set_hash: &str,
671        aggregate: serde_json::Value,
672    ) -> Result<(), InspectCacheError> {
673        if !key.category.is_tier2() {
674            self.store_aggregated(key, aggregate)?;
675            return Ok(());
676        }
677
678        let now = unix_seconds_now();
679        let aggregate_blob = serde_json::to_vec(&aggregate)?;
680        let mut conn = self
681            .conn
682            .lock()
683            .map_err(|_| InspectCacheError::LockPoisoned("connection"))?;
684        let tx = conn.transaction()?;
685        tx.execute(
686            "INSERT INTO tier2_aggregates \
687             (category, project_key, contribution_set_hash, aggregate, generated_at) \
688             VALUES (?1, ?2, ?3, ?4, ?5) \
689             ON CONFLICT(category, project_key) DO UPDATE SET \
690             contribution_set_hash = excluded.contribution_set_hash, \
691             aggregate = excluded.aggregate, \
692             generated_at = excluded.generated_at",
693            params![
694                key.category.as_str(),
695                self.project_key,
696                contribution_set_hash,
697                aggregate_blob,
698                now,
699            ],
700        )?;
701        tx.execute(
702            "INSERT INTO tier2_meta (category, project_key, last_full_run) VALUES (?1, ?2, ?3) \
703             ON CONFLICT(category, project_key) DO UPDATE SET last_full_run = excluded.last_full_run",
704            params![key.category.as_str(), self.project_key, now],
705        )?;
706        tx.commit()?;
707
708        self.store_memory_aggregate(key, aggregate, Some(contribution_set_hash.to_string()))
709    }
710
711    pub fn load_tier2_contributions(
712        &self,
713        category: InspectCategory,
714    ) -> Result<Vec<ContributionRecord>, InspectCacheError> {
715        let conn = self
716            .conn
717            .lock()
718            .map_err(|_| InspectCacheError::LockPoisoned("connection"))?;
719        let mut stmt = conn.prepare(
720            "SELECT file_path, file_mtime_ns, file_size, file_hash, contribution \
721             FROM tier2_contributions \
722             WHERE category = ?1 AND project_key = ?2 \
723             ORDER BY file_path ASC",
724        )?;
725        let rows = stmt.query_map(params![category.as_str(), self.project_key], |row| {
726            let file_path: String = row.get(0)?;
727            let mtime_ns: i64 = row.get(1)?;
728            let file_size: i64 = row.get(2)?;
729            let file_hash: String = row.get(3)?;
730            let contribution: Vec<u8> = row.get(4)?;
731            Ok((file_path, mtime_ns, file_size, file_hash, contribution))
732        })?;
733
734        let mut records = Vec::new();
735        for row in rows {
736            let (file_path, mtime_ns, file_size, file_hash, contribution) = row?;
737            records.push(ContributionRecord {
738                category,
739                file_path: PathBuf::from(file_path),
740                freshness: FileFreshness {
741                    mtime: ns_to_system_time(mtime_ns),
742                    size: file_size.max(0) as u64,
743                    content_hash: hash_from_hex(&file_hash)?,
744                },
745                contribution: serde_json::from_slice(&contribution)?,
746            });
747        }
748        Ok(records)
749    }
750
751    pub fn delete_tier2_contribution(
752        &self,
753        category: InspectCategory,
754        relative_file: &Path,
755    ) -> Result<(), InspectCacheError> {
756        let conn = self
757            .conn
758            .lock()
759            .map_err(|_| InspectCacheError::LockPoisoned("connection"))?;
760        conn.execute(
761            "DELETE FROM tier2_contributions WHERE category = ?1 AND project_key = ?2 AND file_path = ?3",
762            params![
763                category.as_str(),
764                self.project_key,
765                relative_file.to_string_lossy().to_string()
766            ],
767        )?;
768        Ok(())
769    }
770
771    pub fn update_content_fresh_metadata(
772        &self,
773        category: InspectCategory,
774        relative_file: &Path,
775        freshness: &FileFreshness,
776    ) -> Result<(), InspectCacheError> {
777        let conn = self
778            .conn
779            .lock()
780            .map_err(|_| InspectCacheError::LockPoisoned("connection"))?;
781        conn.execute(
782            "UPDATE tier2_contributions \
783             SET file_mtime_ns = ?4, file_size = ?5, file_hash = ?6 \
784             WHERE category = ?1 AND project_key = ?2 AND file_path = ?3",
785            params![
786                category.as_str(),
787                self.project_key,
788                relative_file.to_string_lossy().to_string(),
789                system_time_to_ns(freshness.mtime),
790                freshness.size as i64,
791                hash_to_hex(freshness.content_hash),
792            ],
793        )?;
794        Ok(())
795    }
796
797    pub(crate) fn contribution_fingerprint(
798        &self,
799        category: InspectCategory,
800    ) -> Result<(usize, String), InspectCacheError> {
801        let conn = self
802            .conn
803            .lock()
804            .map_err(|_| InspectCacheError::LockPoisoned("connection"))?;
805        let mut stmt = conn.prepare(
806            "SELECT file_path, file_mtime_ns, file_size \
807             FROM tier2_contributions \
808             WHERE category = ?1 AND project_key = ?2 \
809             ORDER BY file_path ASC",
810        )?;
811        let rows = stmt.query_map(params![category.as_str(), self.project_key], |row| {
812            Ok((
813                row.get::<_, String>(0)?,
814                row.get::<_, i64>(1)?,
815                row.get::<_, i64>(2)?,
816            ))
817        })?;
818
819        let mut count = 0usize;
820        let mut hasher = blake3::Hasher::new();
821        for row in rows {
822            let (file_path, mtime_ns, file_size) = row?;
823            count += 1;
824            update_contribution_fingerprint_hash(
825                &mut hasher,
826                &file_path,
827                mtime_ns.max(0),
828                file_size.max(0) as u64,
829            );
830        }
831
832        Ok((count, hasher.finalize().to_hex().to_string()))
833    }
834
835    pub(crate) fn contribution_freshness(
836        &self,
837        category: InspectCategory,
838    ) -> Result<Vec<(PathBuf, FileFreshness)>, InspectCacheError> {
839        let conn = self
840            .conn
841            .lock()
842            .map_err(|_| InspectCacheError::LockPoisoned("connection"))?;
843        let mut stmt = conn.prepare(
844            "SELECT file_path, file_mtime_ns, file_size, file_hash \
845             FROM tier2_contributions \
846             WHERE category = ?1 AND project_key = ?2 \
847             ORDER BY file_path ASC",
848        )?;
849        let rows = stmt.query_map(params![category.as_str(), self.project_key], |row| {
850            Ok((
851                row.get::<_, String>(0)?,
852                row.get::<_, i64>(1)?,
853                row.get::<_, i64>(2)?,
854                row.get::<_, String>(3)?,
855            ))
856        })?;
857
858        let mut records = Vec::new();
859        for row in rows {
860            let (file_path, mtime_ns, file_size, file_hash) = row?;
861            records.push((
862                PathBuf::from(file_path),
863                FileFreshness {
864                    mtime: ns_to_system_time(mtime_ns),
865                    size: file_size.max(0) as u64,
866                    content_hash: hash_from_hex(&file_hash)?,
867                },
868            ));
869        }
870        Ok(records)
871    }
872
873    pub fn contribution_set_hash(
874        &self,
875        category: InspectCategory,
876    ) -> Result<String, InspectCacheError> {
877        let conn = self
878            .conn
879            .lock()
880            .map_err(|_| InspectCacheError::LockPoisoned("connection"))?;
881        contribution_set_hash_with_conn(&conn, category, &self.project_key, &self.project_root)
882    }
883
884    pub fn last_full_run(
885        &self,
886        category: InspectCategory,
887    ) -> Result<Option<i64>, InspectCacheError> {
888        let conn = self
889            .conn
890            .lock()
891            .map_err(|_| InspectCacheError::LockPoisoned("connection"))?;
892        conn.query_row(
893            "SELECT last_full_run FROM tier2_meta WHERE category = ?1 AND project_key = ?2",
894            params![category.as_str(), self.project_key],
895            |row| row.get::<_, i64>(0),
896        )
897        .optional()
898        .map_err(InspectCacheError::from)
899    }
900
901    pub fn memory_generated_at(&self, key: &JobKey) -> Result<Option<i64>, InspectCacheError> {
902        Ok(self
903            .memory
904            .read()
905            .map_err(|_| InspectCacheError::LockPoisoned("memory"))?
906            .get(key)
907            .map(|entry| entry.generated_at))
908    }
909}
910
911fn configure_connection(conn: &Connection) -> Result<(), InspectCacheError> {
912    conn.pragma_update(None, "journal_mode", "WAL")?;
913    conn.pragma_update(None, "busy_timeout", 5_000)?;
914    Ok(())
915}
916
917fn initialize_schema(conn: &Connection) -> Result<(), InspectCacheError> {
918    conn.execute_batch(
919        "CREATE TABLE IF NOT EXISTS tier2_contributions (
920            category        TEXT NOT NULL,
921            project_key     TEXT NOT NULL,
922            file_path       TEXT NOT NULL,
923            file_mtime_ns   INTEGER NOT NULL,
924            file_size       INTEGER NOT NULL,
925            file_hash       TEXT NOT NULL,
926            contribution    BLOB NOT NULL,
927            generated_at    INTEGER NOT NULL,
928            PRIMARY KEY (category, project_key, file_path)
929        );
930
931        CREATE TABLE IF NOT EXISTS tier2_aggregates (
932            category        TEXT NOT NULL,
933            project_key     TEXT NOT NULL,
934            contribution_set_hash TEXT NOT NULL,
935            aggregate       BLOB NOT NULL,
936            generated_at    INTEGER NOT NULL,
937            PRIMARY KEY (category, project_key)
938        );
939
940        CREATE TABLE IF NOT EXISTS tier2_meta (
941            category        TEXT NOT NULL,
942            project_key     TEXT NOT NULL,
943            last_full_run   INTEGER NOT NULL,
944            PRIMARY KEY (category, project_key)
945        );",
946    )?;
947    Ok(())
948}
949
950fn existing_contribution_paths(
951    conn: &Connection,
952    category: InspectCategory,
953    project_key: &str,
954) -> Result<Vec<String>, InspectCacheError> {
955    let mut stmt = conn.prepare(
956        "SELECT file_path FROM tier2_contributions WHERE category = ?1 AND project_key = ?2",
957    )?;
958    let rows = stmt.query_map(params![category.as_str(), project_key], |row| {
959        row.get::<_, String>(0)
960    })?;
961    rows.collect::<Result<Vec<_>, _>>()
962        .map_err(InspectCacheError::from)
963}
964
965fn contribution_set_hash_with_conn(
966    conn: &Connection,
967    category: InspectCategory,
968    project_key: &str,
969    project_root: &Path,
970) -> Result<String, InspectCacheError> {
971    let mut stmt = conn.prepare(
972        "SELECT file_path, file_hash FROM tier2_contributions \
973         WHERE category = ?1 AND project_key = ?2 ORDER BY file_path ASC",
974    )?;
975    let rows = stmt.query_map(params![category.as_str(), project_key], |row| {
976        Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
977    })?;
978
979    let mut hasher = blake3::Hasher::new();
980    hasher.update(b"tier2-contributions\0");
981    for row in rows {
982        let (file_path, file_hash) = row?;
983        hasher.update(file_path.as_bytes());
984        hasher.update(b"\0");
985        hasher.update(file_hash.as_bytes());
986        hasher.update(b"\0");
987    }
988    update_manifest_fingerprint_hash(&mut hasher, project_root)?;
989    Ok(hasher.finalize().to_hex().to_string())
990}
991
992fn update_manifest_fingerprint_hash(
993    hasher: &mut blake3::Hasher,
994    project_root: &Path,
995) -> Result<(), InspectCacheError> {
996    let manifest_root =
997        fs::canonicalize(project_root).unwrap_or_else(|_| project_root.to_path_buf());
998    hasher.update(b"entry-point-manifests\0");
999    for manifest in super::entry_points::collect_entry_point_manifests(project_root) {
1000        let relative_path = manifest
1001            .strip_prefix(&manifest_root)
1002            .unwrap_or(manifest.as_path())
1003            .to_string_lossy()
1004            .replace('\\', "/");
1005        let content_hash = blake3::hash(&fs::read(&manifest)?);
1006        hasher.update(relative_path.as_bytes());
1007        hasher.update(b"\0");
1008        hasher.update(content_hash.as_bytes());
1009        hasher.update(b"\0");
1010    }
1011    Ok(())
1012}
1013
1014fn update_contribution_fingerprint_hash(
1015    hasher: &mut blake3::Hasher,
1016    relative_path: &str,
1017    mtime_ns: i64,
1018    file_size: u64,
1019) {
1020    hasher.update(relative_path.as_bytes());
1021    hasher.update(&[0]);
1022    hasher.update(&mtime_ns.to_le_bytes());
1023    hasher.update(&file_size.to_le_bytes());
1024}
1025
/// Renders `path` relative to `project_root` as a (lossily decoded) string.
///
/// If `path` is not under `project_root` (compared component-wise), it is rendered
/// unchanged. Separators are not normalised.
fn relative_string(project_root: &Path, path: &Path) -> String {
    let shown = match path.strip_prefix(project_root) {
        Ok(inside_root) => inside_root,
        Err(_) => path,
    };
    shown.to_string_lossy().into_owned()
}
1032
/// Converts `time` to nanoseconds since the Unix epoch.
///
/// Times before the epoch map to `0`. Times too far in the future to fit in an `i64`
/// saturate to `i64::MAX`.
fn system_time_to_ns(time: SystemTime) -> i64 {
    let since_epoch = time.duration_since(UNIX_EPOCH).unwrap_or(Duration::ZERO);
    i64::try_from(since_epoch.as_nanos()).unwrap_or(i64::MAX)
}
1040
/// Converts nanoseconds since the Unix epoch back into a `SystemTime`.
///
/// Negative inputs are treated as `0`, i.e. they map to `UNIX_EPOCH`.
fn ns_to_system_time(value: i64) -> SystemTime {
    let non_negative = u64::try_from(value).unwrap_or(0);
    UNIX_EPOCH + Duration::from_nanos(non_negative)
}
1044
1045fn hash_to_hex(hash: blake3::Hash) -> String {
1046    hash.to_hex().to_string()
1047}
1048
1049fn hash_from_hex(value: &str) -> Result<blake3::Hash, InspectCacheError> {
1050    if value.len() != 64 {
1051        return Err(InspectCacheError::InvalidHash(value.to_string()));
1052    }
1053    let mut bytes = [0u8; 32];
1054    for (index, chunk) in value.as_bytes().chunks(2).enumerate() {
1055        let hex = std::str::from_utf8(chunk)
1056            .map_err(|_| InspectCacheError::InvalidHash(value.to_string()))?;
1057        bytes[index] = u8::from_str_radix(hex, 16)
1058            .map_err(|_| InspectCacheError::InvalidHash(value.to_string()))?;
1059    }
1060    Ok(blake3::Hash::from_bytes(bytes))
1061}
1062
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// A clock set before the epoch yields `0`. The value saturates at `i64::MAX`.
fn unix_seconds_now() -> i64 {
    let elapsed = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(duration) => duration,
        Err(_) => Duration::from_secs(0),
    };
    i64::try_from(elapsed.as_secs()).unwrap_or(i64::MAX)
}
1070
#[cfg(test)]
mod tests {
    use super::*;
    use std::cell::Cell;
    use std::fs;
    use std::path::Path;

    // Test helper: reads real on-disk freshness metadata for `path`, panicking if the
    // collection fails.
    fn collect_freshness(path: &Path) -> FileFreshness {
        crate::cache_freshness::collect(path).unwrap()
    }

    // Fills the memo to exactly TIER1_FILE_MEMO_MAX_ENTRIES, touches the oldest entry
    // (paths[0]), then inserts one more file. The untouched oldest entry (paths[1])
    // must be evicted, while the touched entry and the new entry stay cached.
    #[test]
    fn tier1_file_memo_evicts_lru_and_keeps_recent_hits() {
        let temp = tempfile::tempdir().unwrap();
        let memo = Tier1FileMemo::<usize>::default();
        let mut paths = Vec::with_capacity(TIER1_FILE_MEMO_MAX_ENTRIES);

        // Populate the memo to capacity; each file's cached value is its index.
        for index in 0..TIER1_FILE_MEMO_MAX_ENTRIES {
            let path = temp.path().join(format!("file-{index}.txt"));
            fs::write(&path, index.to_string()).unwrap();
            let value =
                memo.get_or_insert_with(&path, |path| (Some(collect_freshness(path)), index));
            assert_eq!(value, index);
            paths.push(path);
        }

        // A hit on paths[0] must not invoke the loader (the closure panics if it does),
        // and it makes paths[0] the most recently used entry.
        let recent_path = paths[0].clone();
        let recent_value = memo.get_or_insert_with(&recent_path, |_| {
            panic!("recently inserted entry should hit before eviction")
        });
        assert_eq!(recent_value, 0);

        // One insert beyond capacity forces a single eviction.
        let evicting_path = temp.path().join("new-file.txt");
        fs::write(&evicting_path, "new").unwrap();
        let evicting_value = memo.get_or_insert_with(&evicting_path, |path| {
            (Some(collect_freshness(path)), TIER1_FILE_MEMO_MAX_ENTRIES)
        });
        assert_eq!(evicting_value, TIER1_FILE_MEMO_MAX_ENTRIES);

        // Inspect internal state: still at capacity, and paths[1] was the victim.
        let state = memo.state.lock().unwrap();
        assert_eq!(state.entries.len(), TIER1_FILE_MEMO_MAX_ENTRIES);
        assert!(state.entries.contains_key(&recent_path));
        assert!(state.entries.contains_key(&evicting_path));
        assert!(!state.entries.contains_key(&paths[1]));
        // Release the state lock before calling back into the memo, which presumably
        // locks the same mutex.
        drop(state);

        let recent_value = memo.get_or_insert_with(&recent_path, |_| {
            panic!("recently used entry should survive eviction")
        });
        assert_eq!(recent_value, 0);
    }

    // Repeatedly touching one hot entry must not let the lazily maintained LRU queue
    // grow without bound. After an eviction, the queue length must stay within
    // 2x capacity, and LRU order must still be respected.
    #[test]
    fn tier1_file_memo_repeated_touches_keep_lazy_lru_bounded() {
        let temp = tempfile::tempdir().unwrap();
        let memo = Tier1FileMemo::<usize>::default();
        let mut paths = Vec::with_capacity(TIER1_FILE_MEMO_MAX_ENTRIES);

        for index in 0..TIER1_FILE_MEMO_MAX_ENTRIES {
            let path = temp.path().join(format!("file-{index}.txt"));
            fs::write(&path, index.to_string()).unwrap();
            memo.get_or_insert_with(&path, |path| (Some(collect_freshness(path)), index));
            paths.push(path);
        }

        // Three times the capacity in hits on the same entry: enough touches that an
        // uncompacted queue would exceed the 2x bound asserted below.
        for _ in 0..(TIER1_FILE_MEMO_MAX_ENTRIES * 3) {
            let value = memo.get_or_insert_with(&paths[0], |_| {
                panic!("hot entry should stay cached while it is repeatedly touched")
            });
            assert_eq!(value, 0);
        }

        let evicting_path = temp.path().join("new-file.txt");
        fs::write(&evicting_path, "new").unwrap();
        memo.get_or_insert_with(&evicting_path, |path| {
            (Some(collect_freshness(path)), TIER1_FILE_MEMO_MAX_ENTRIES)
        });

        let state = memo.state.lock().unwrap();
        assert_eq!(state.entries.len(), TIER1_FILE_MEMO_MAX_ENTRIES);
        assert!(state.entries.contains_key(&paths[0]));
        assert!(state.entries.contains_key(&evicting_path));
        assert!(!state.entries.contains_key(&paths[1]));
        assert!(
            state.lru.len() <= TIER1_FILE_MEMO_MAX_ENTRIES * 2,
            "lazy LRU queue should be compacted instead of growing without bound"
        );
    }

    // An unchanged file is served from the memo without rescanning. After the file's
    // contents change, the next lookup rescans once, and the refreshed value is then
    // reused. `scans` counts how many times the loader closure actually ran.
    #[test]
    fn tier1_file_memo_reuses_fresh_entries_and_rescans_stale_files() {
        let temp = tempfile::tempdir().unwrap();
        let path = temp.path().join("memo.txt");
        fs::write(&path, "first").unwrap();

        let memo = Tier1FileMemo::<String>::default();
        let scans = Cell::new(0);

        let first = memo.get_or_insert_with(&path, |path| {
            scans.set(scans.get() + 1);
            (Some(collect_freshness(path)), "first scan".to_string())
        });
        assert_eq!(first, "first scan");
        assert_eq!(scans.get(), 1);

        let unchanged =
            memo.get_or_insert_with(&path, |_| panic!("unchanged file should reuse Tier-1 memo"));
        assert_eq!(unchanged, "first scan");
        assert_eq!(scans.get(), 1);

        // The new contents differ in length from "first". The size change presumably
        // makes the staleness detectable even if the filesystem's mtime resolution
        // is too coarse to register the rewrite.
        fs::write(&path, "changed file contents").unwrap();
        let changed = memo.get_or_insert_with(&path, |path| {
            scans.set(scans.get() + 1);
            (Some(collect_freshness(path)), "second scan".to_string())
        });
        assert_eq!(changed, "second scan");
        assert_eq!(scans.get(), 2);

        let fresh_after_rescan = memo.get_or_insert_with(&path, |_| {
            panic!("rescanned file should reuse refreshed Tier-1 memo")
        });
        assert_eq!(fresh_after_rescan, "second scan");
        assert_eq!(scans.get(), 2);
    }
}