1use std::collections::{BTreeSet, HashMap, VecDeque};
2use std::fmt;
3use std::fs;
4use std::path::{Path, PathBuf};
5use std::sync::{Mutex, RwLock};
6use std::time::{Duration, SystemTime, UNIX_EPOCH};
7
8use rusqlite::{params, Connection, OptionalExtension};
9
10use crate::cache_freshness::{FileFreshness, FreshnessVerdict};
11
12use super::job::{FileContribution, InspectCategory, JobKey};
13
/// Batch of pending changes to the stored Tier-2 contribution rows of one category.
///
/// Consumed by `InspectCache::apply_contribution_updates`, which applies the deletes,
/// then the metadata updates, then the upserts inside a single transaction.
#[derive(Debug, Default)]
pub(crate) struct Tier2ContributionUpdates {
    /// Contributions to insert, or to overwrite when a row for the same file already exists.
    pub upserts: Vec<FileContribution>,
    /// Files whose rows are removed; each path is matched verbatim against the stored `file_path`.
    pub deletes: Vec<PathBuf>,
    /// Files whose stored mtime/size/hash are replaced while the contribution blob is kept.
    pub metadata_updates: Vec<(PathBuf, FileFreshness)>,
}
20
/// Failures surfaced by the inspect cache.
#[derive(Debug)]
pub enum InspectCacheError {
    /// Filesystem failure (for example creating the cache directory or reading a manifest).
    Io(std::io::Error),
    /// Error reported by SQLite through `rusqlite`.
    Sql(rusqlite::Error),
    /// A JSON payload could not be serialized or deserialized.
    Json(serde_json::Error),
    /// A lock was poisoned; the payload names which one (`"memory"` or `"connection"`).
    LockPoisoned(&'static str),
    /// A stored file hash was not a hex-encoded 32-byte value; carries the offending text.
    InvalidHash(String),
}
29
30impl fmt::Display for InspectCacheError {
31 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
32 match self {
33 InspectCacheError::Io(error) => write!(formatter, "inspect cache io error: {error}"),
34 InspectCacheError::Sql(error) => {
35 write!(formatter, "inspect cache sqlite error: {error}")
36 }
37 InspectCacheError::Json(error) => {
38 write!(formatter, "inspect cache json error: {error}")
39 }
40 InspectCacheError::LockPoisoned(name) => {
41 write!(formatter, "inspect cache lock poisoned: {name}")
42 }
43 InspectCacheError::InvalidHash(hash) => {
44 write!(formatter, "inspect cache invalid blake3 hash: {hash}")
45 }
46 }
47 }
48}
49
50impl std::error::Error for InspectCacheError {}
51
52impl From<std::io::Error> for InspectCacheError {
53 fn from(error: std::io::Error) -> Self {
54 Self::Io(error)
55 }
56}
57
58impl From<rusqlite::Error> for InspectCacheError {
59 fn from(error: rusqlite::Error) -> Self {
60 Self::Sql(error)
61 }
62}
63
64impl From<serde_json::Error> for InspectCacheError {
65 fn from(error: serde_json::Error) -> Self {
66 Self::Json(error)
67 }
68}
69
/// One stored Tier-2 contribution row, as returned by `InspectCache::load_tier2_contributions`.
#[derive(Debug, Clone)]
pub struct ContributionRecord {
    /// Category the row was loaded for.
    pub category: InspectCategory,
    /// File path exactly as stored in the `file_path` column.
    pub file_path: PathBuf,
    /// mtime, size and content hash recorded for the file when the row was written.
    pub freshness: FileFreshness,
    /// The per-file contribution, decoded from the stored JSON blob.
    pub contribution: serde_json::Value,
}
77
/// Aggregate payload held in the in-memory layer of the cache.
#[derive(Debug, Clone)]
struct MemoryAggregate {
    /// The aggregated JSON value handed back to callers.
    payload: serde_json::Value,
    /// Unix timestamp (seconds) taken when the entry was placed in memory.
    generated_at: i64,
    /// Contribution-set hash the payload was stored under; `None` for entries stored
    /// through `store_aggregated`, which records no hash.
    contribution_set_hash: Option<String>,
}
84
/// Maximum number of entries a Tier-1 file memo keeps; inserting beyond this evicts the
/// least recently used entries. The lazy LRU queue is compacted at twice this size.
const TIER1_FILE_MEMO_MAX_ENTRIES: usize = 4_096;
86
/// A memoized per-file value together with the freshness data used to validate it.
#[derive(Debug, Clone)]
struct Tier1MemoEntry<T> {
    /// File metadata/hash captured when `value` was computed.
    freshness: FileFreshness,
    /// The cached scan result.
    value: T,
    /// Recency stamp; only the LRU node carrying the same stamp is considered current.
    generation: u64,
}
93
/// Element of the lazy LRU queue: a path plus the generation it had when it was queued.
#[derive(Debug, Clone)]
struct LruNode {
    /// Key of the memo entry this node refers to.
    path: PathBuf,
    /// Stamp at queue time; a node is stale once the entry's stamp has moved on.
    generation: u64,
}
99
/// Mutable state behind a Tier-1 file memo: the entries plus a lazily-pruned recency queue.
#[derive(Debug)]
struct Tier1MemoState<T> {
    /// Live entries keyed by file path.
    entries: HashMap<PathBuf, Tier1MemoEntry<T>>,
    /// Recency queue, oldest first; may contain stale nodes for re-stamped or removed entries.
    lru: VecDeque<LruNode>,
    /// Next recency stamp to hand out.
    next_generation: u64,
}
106
107impl<T> Default for Tier1MemoState<T> {
108 fn default() -> Self {
109 Self {
110 entries: HashMap::new(),
111 lru: VecDeque::new(),
112 next_generation: 0,
113 }
114 }
115}
116
117impl<T> Tier1MemoState<T> {
118 fn insert(&mut self, path: PathBuf, mut entry: Tier1MemoEntry<T>) {
119 let generation = self.allocate_generation();
120 entry.generation = generation;
121 self.entries.insert(path.clone(), entry);
122 self.lru.push_back(LruNode { path, generation });
123 self.compact_lru_if_needed();
124 self.evict_lru();
125 }
126
127 fn remove(&mut self, path: &Path) {
128 self.entries.remove(path);
129 self.compact_lru_if_needed();
130 }
131
132 fn touch(&mut self, path: &Path) {
133 if !self.entries.contains_key(path) {
134 return;
135 }
136
137 let generation = self.allocate_generation();
138 if let Some(entry) = self.entries.get_mut(path) {
139 entry.generation = generation;
140 self.lru.push_back(LruNode {
141 path: path.to_path_buf(),
142 generation,
143 });
144 }
145 self.compact_lru_if_needed();
146 }
147
148 fn allocate_generation(&mut self) -> u64 {
149 if self.next_generation == u64::MAX {
150 self.rebuild_lru();
151 }
152 let generation = self.next_generation;
153 self.next_generation += 1;
154 generation
155 }
156
157 fn compact_lru_if_needed(&mut self) {
158 let max_lru_nodes = TIER1_FILE_MEMO_MAX_ENTRIES
159 .saturating_mul(2)
160 .max(self.entries.len());
161 if self.lru.len() > max_lru_nodes {
162 self.rebuild_lru();
163 }
164 }
165
166 fn rebuild_lru(&mut self) {
167 let mut live_nodes = self
168 .entries
169 .iter()
170 .map(|(path, entry)| (entry.generation, path.clone()))
171 .collect::<Vec<_>>();
172 live_nodes.sort_by_key(|(generation, _)| *generation);
173
174 self.lru.clear();
175 for (generation, (_, path)) in live_nodes.into_iter().enumerate() {
176 let generation = generation as u64;
177 if let Some(entry) = self.entries.get_mut(&path) {
178 entry.generation = generation;
179 }
180 self.lru.push_back(LruNode { path, generation });
181 }
182 self.next_generation = self.lru.len() as u64;
183 }
184
185 fn evict_lru(&mut self) {
186 while self.entries.len() > TIER1_FILE_MEMO_MAX_ENTRIES {
187 let Some(node) = self.lru.pop_front() else {
188 break;
189 };
190 if self
191 .entries
192 .get(&node.path)
193 .is_some_and(|entry| entry.generation == node.generation)
194 {
195 self.entries.remove(&node.path);
196 }
197 }
198 self.compact_lru_if_needed();
199 }
200}
201
/// Per-file memo of scan results, validated against file freshness on every lookup.
#[derive(Debug)]
pub(crate) struct Tier1FileMemo<T> {
    /// Entries and recency queue, guarded by a mutex.
    state: Mutex<Tier1MemoState<T>>,
}
206
207impl<T> Default for Tier1FileMemo<T> {
208 fn default() -> Self {
209 Self {
210 state: Mutex::new(Tier1MemoState::default()),
211 }
212 }
213}
214
215impl<T: Clone> Tier1FileMemo<T> {
216 pub(crate) fn get_or_insert_with<F>(&self, path: &Path, scan: F) -> T
217 where
218 F: FnOnce(&Path) -> (Option<FileFreshness>, T),
219 {
220 if let Some(cached) = self.cached_value(path) {
221 return cached;
222 }
223
224 let (freshness, value) = scan(path);
225 if let Ok(mut state) = self.state.lock() {
226 if let Some(freshness) = freshness {
227 state.insert(
228 path.to_path_buf(),
229 Tier1MemoEntry {
230 freshness,
231 value: value.clone(),
232 generation: 0,
233 },
234 );
235 } else {
236 state.remove(path);
237 }
238 }
239 value
240 }
241
242 fn cached_value(&self, path: &Path) -> Option<T> {
243 let mut cached = self
244 .state
245 .lock()
246 .ok()
247 .and_then(|state| state.entries.get(path).cloned())?;
248
249 match crate::cache_freshness::verify_file(path, &cached.freshness) {
250 FreshnessVerdict::HotFresh => {
251 if let Ok(mut state) = self.state.lock() {
252 state.touch(path);
253 }
254 Some(cached.value)
255 }
256 FreshnessVerdict::ContentFresh {
257 new_mtime,
258 new_size,
259 } => {
260 cached.freshness.mtime = new_mtime;
261 cached.freshness.size = new_size;
262 let value = cached.value.clone();
263 if let Ok(mut state) = self.state.lock() {
264 state.insert(path.to_path_buf(), cached);
265 }
266 Some(value)
267 }
268 FreshnessVerdict::Stale => None,
269 FreshnessVerdict::Deleted => {
270 if let Ok(mut state) = self.state.lock() {
271 state.remove(path);
272 }
273 None
274 }
275 }
276 }
277}
278
/// Two-layer cache of inspect results: an in-memory map in front of a per-project SQLite file.
#[derive(Debug)]
pub struct InspectCache {
    /// Root directory of the project this cache belongs to.
    project_root: PathBuf,
    /// Key derived from `project_root`; names the SQLite file and scopes every row.
    project_key: String,
    /// Location of the SQLite database file.
    sqlite_path: PathBuf,
    /// The single SQLite connection, serialized behind a mutex.
    conn: Mutex<Connection>,
    /// In-memory aggregates keyed by job.
    memory: RwLock<HashMap<JobKey, MemoryAggregate>>,
}
287
288impl InspectCache {
289 pub fn open(inspect_dir: PathBuf, project_root: PathBuf) -> Result<Self, InspectCacheError> {
290 std::fs::create_dir_all(&inspect_dir)?;
291 let project_key = crate::search_index::project_cache_key(&project_root);
292 let sqlite_path = inspect_dir.join(format!("{project_key}.sqlite"));
293 let conn = Connection::open(&sqlite_path)?;
294 configure_connection(&conn)?;
295 initialize_schema(&conn)?;
296 Ok(Self {
297 project_root,
298 project_key,
299 sqlite_path,
300 conn: Mutex::new(conn),
301 memory: RwLock::new(HashMap::new()),
302 })
303 }
304
305 pub fn project_root(&self) -> &Path {
306 &self.project_root
307 }
308
309 pub fn project_key(&self) -> &str {
310 &self.project_key
311 }
312
313 pub fn sqlite_path(&self) -> &Path {
314 &self.sqlite_path
315 }
316
317 pub fn store_aggregated(
318 &self,
319 key: JobKey,
320 payload: serde_json::Value,
321 ) -> Result<(), InspectCacheError> {
322 self.store_memory_aggregate(key, payload, None)
323 }
324
325 fn store_memory_aggregate(
326 &self,
327 key: JobKey,
328 payload: serde_json::Value,
329 contribution_set_hash: Option<String>,
330 ) -> Result<(), InspectCacheError> {
331 self.memory
332 .write()
333 .map_err(|_| InspectCacheError::LockPoisoned("memory"))?
334 .insert(
335 key,
336 MemoryAggregate {
337 payload,
338 generated_at: unix_seconds_now(),
339 contribution_set_hash,
340 },
341 );
342 Ok(())
343 }
344
345 pub fn get_aggregated(
346 &self,
347 key: &JobKey,
348 ) -> Result<Option<serde_json::Value>, InspectCacheError> {
349 if !key.category.is_tier2() {
350 return Ok(self
351 .memory
352 .read()
353 .map_err(|_| InspectCacheError::LockPoisoned("memory"))?
354 .get(key)
355 .map(|entry| entry.payload.clone()));
356 }
357
358 let current_hash = {
359 let conn = self
360 .conn
361 .lock()
362 .map_err(|_| InspectCacheError::LockPoisoned("connection"))?;
363 contribution_set_hash_with_conn(
364 &conn,
365 key.category,
366 &self.project_key,
367 &self.project_root,
368 )?
369 };
370
371 let memory_entry = {
372 self.memory
373 .read()
374 .map_err(|_| InspectCacheError::LockPoisoned("memory"))?
375 .get(key)
376 .cloned()
377 };
378 if let Some(entry) = memory_entry {
379 if entry.contribution_set_hash.as_deref() == Some(current_hash.as_str()) {
380 return Ok(Some(entry.payload));
381 }
382 self.memory
383 .write()
384 .map_err(|_| InspectCacheError::LockPoisoned("memory"))?
385 .remove(key);
386 }
387
388 let payload = {
389 let conn = self
390 .conn
391 .lock()
392 .map_err(|_| InspectCacheError::LockPoisoned("connection"))?;
393 conn.query_row(
394 "SELECT aggregate FROM tier2_aggregates \
395 WHERE category = ?1 AND project_key = ?2 AND contribution_set_hash = ?3",
396 params![key.category.as_str(), self.project_key, current_hash],
397 |row| row.get::<_, Vec<u8>>(0),
398 )
399 .optional()?
400 };
401
402 match payload {
403 Some(bytes) => {
404 let value = serde_json::from_slice::<serde_json::Value>(&bytes)?;
405 self.store_memory_aggregate(key.clone(), value.clone(), Some(current_hash))?;
406 Ok(Some(value))
407 }
408 None => Ok(None),
409 }
410 }
411
412 pub fn store_tier2_result(
413 &self,
414 key: JobKey,
415 scanned_files: &[PathBuf],
416 contributions: &[FileContribution],
417 aggregate: serde_json::Value,
418 ) -> Result<(), InspectCacheError> {
419 if !key.category.is_tier2() {
420 self.store_aggregated(key, aggregate)?;
421 return Ok(());
422 }
423
424 let now = unix_seconds_now();
425 let mut conn = self
426 .conn
427 .lock()
428 .map_err(|_| InspectCacheError::LockPoisoned("connection"))?;
429 let tx = conn.transaction()?;
430
431 let scanned_relative = scanned_files
432 .iter()
433 .map(|path| relative_string(&self.project_root, path))
434 .collect::<BTreeSet<_>>();
435 let existing = existing_contribution_paths(&tx, key.category, &self.project_key)?;
436 for file_path in existing {
437 if !scanned_relative.contains(&file_path) {
438 tx.execute(
439 "DELETE FROM tier2_contributions WHERE category = ?1 AND project_key = ?2 AND file_path = ?3",
440 params![key.category.as_str(), self.project_key, file_path],
441 )?;
442 }
443 }
444
445 for contribution in contributions {
446 let file_path = relative_string(&self.project_root, &contribution.file_path);
447 let blob = serde_json::to_vec(&contribution.contribution)?;
448 tx.execute(
449 "INSERT INTO tier2_contributions \
450 (category, project_key, file_path, file_mtime_ns, file_size, file_hash, contribution, generated_at) \
451 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8) \
452 ON CONFLICT(category, project_key, file_path) DO UPDATE SET \
453 file_mtime_ns = excluded.file_mtime_ns, \
454 file_size = excluded.file_size, \
455 file_hash = excluded.file_hash, \
456 contribution = excluded.contribution, \
457 generated_at = excluded.generated_at",
458 params![
459 contribution.category.as_str(),
460 self.project_key,
461 file_path,
462 system_time_to_ns(contribution.freshness.mtime),
463 contribution.freshness.size as i64,
464 hash_to_hex(contribution.freshness.content_hash),
465 blob,
466 now,
467 ],
468 )?;
469 }
470
471 let contribution_set_hash = contribution_set_hash_with_conn(
472 &tx,
473 key.category,
474 &self.project_key,
475 &self.project_root,
476 )?;
477 let aggregate_blob = serde_json::to_vec(&aggregate)?;
478 tx.execute(
479 "INSERT INTO tier2_aggregates \
480 (category, project_key, contribution_set_hash, aggregate, generated_at) \
481 VALUES (?1, ?2, ?3, ?4, ?5) \
482 ON CONFLICT(category, project_key) DO UPDATE SET \
483 contribution_set_hash = excluded.contribution_set_hash, \
484 aggregate = excluded.aggregate, \
485 generated_at = excluded.generated_at",
486 params![
487 key.category.as_str(),
488 self.project_key,
489 contribution_set_hash,
490 aggregate_blob,
491 now,
492 ],
493 )?;
494 tx.execute(
495 "INSERT INTO tier2_meta (category, project_key, last_full_run) VALUES (?1, ?2, ?3) \
496 ON CONFLICT(category, project_key) DO UPDATE SET last_full_run = excluded.last_full_run",
497 params![key.category.as_str(), self.project_key, now],
498 )?;
499 tx.commit()?;
500
501 self.store_memory_aggregate(key, aggregate, Some(contribution_set_hash))
502 }
503
504 pub(crate) fn apply_contribution_updates(
505 &self,
506 category: InspectCategory,
507 updates: Tier2ContributionUpdates,
508 ) -> Result<String, InspectCacheError> {
509 let now = unix_seconds_now();
510 let mut conn = self
511 .conn
512 .lock()
513 .map_err(|_| InspectCacheError::LockPoisoned("connection"))?;
514 let tx = conn.transaction()?;
515
516 for relative_file in updates.deletes {
517 tx.execute(
518 "DELETE FROM tier2_contributions WHERE category = ?1 AND project_key = ?2 AND file_path = ?3",
519 params![
520 category.as_str(),
521 self.project_key,
522 relative_file.to_string_lossy().to_string()
523 ],
524 )?;
525 }
526
527 for (relative_file, freshness) in updates.metadata_updates {
528 tx.execute(
529 "UPDATE tier2_contributions \
530 SET file_mtime_ns = ?4, file_size = ?5, file_hash = ?6 \
531 WHERE category = ?1 AND project_key = ?2 AND file_path = ?3",
532 params![
533 category.as_str(),
534 self.project_key,
535 relative_file.to_string_lossy().to_string(),
536 system_time_to_ns(freshness.mtime),
537 freshness.size as i64,
538 hash_to_hex(freshness.content_hash),
539 ],
540 )?;
541 }
542
543 for contribution in updates.upserts {
544 let file_path = relative_string(&self.project_root, &contribution.file_path);
545 let blob = serde_json::to_vec(&contribution.contribution)?;
546 tx.execute(
547 "INSERT INTO tier2_contributions \
548 (category, project_key, file_path, file_mtime_ns, file_size, file_hash, contribution, generated_at) \
549 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8) \
550 ON CONFLICT(category, project_key, file_path) DO UPDATE SET \
551 file_mtime_ns = excluded.file_mtime_ns, \
552 file_size = excluded.file_size, \
553 file_hash = excluded.file_hash, \
554 contribution = excluded.contribution, \
555 generated_at = excluded.generated_at",
556 params![
557 contribution.category.as_str(),
558 self.project_key,
559 file_path,
560 system_time_to_ns(contribution.freshness.mtime),
561 contribution.freshness.size as i64,
562 hash_to_hex(contribution.freshness.content_hash),
563 blob,
564 now,
565 ],
566 )?;
567 }
568
569 let contribution_set_hash =
570 contribution_set_hash_with_conn(&tx, category, &self.project_key, &self.project_root)?;
571 tx.commit()?;
572
573 self.memory
574 .write()
575 .map_err(|_| InspectCacheError::LockPoisoned("memory"))?
576 .remove(&JobKey::for_project_category(category));
577
578 Ok(contribution_set_hash)
579 }
580
581 pub(crate) fn load_aggregate_if_hash_matches(
582 &self,
583 category: InspectCategory,
584 contribution_set_hash: &str,
585 ) -> Result<Option<serde_json::Value>, InspectCacheError> {
586 let payload = {
587 let conn = self
588 .conn
589 .lock()
590 .map_err(|_| InspectCacheError::LockPoisoned("connection"))?;
591 conn.query_row(
592 "SELECT aggregate FROM tier2_aggregates \
593 WHERE category = ?1 AND project_key = ?2 AND contribution_set_hash = ?3",
594 params![category.as_str(), self.project_key, contribution_set_hash],
595 |row| row.get::<_, Vec<u8>>(0),
596 )
597 .optional()?
598 };
599
600 match payload {
601 Some(bytes) => {
602 let value = serde_json::from_slice::<serde_json::Value>(&bytes)?;
603 self.store_memory_aggregate(
604 JobKey::for_project_category(category),
605 value.clone(),
606 Some(contribution_set_hash.to_string()),
607 )?;
608 Ok(Some(value))
609 }
610 None => Ok(None),
611 }
612 }
613
614 pub(crate) fn latest_aggregate_any_hash(
615 &self,
616 category: InspectCategory,
617 ) -> Result<Option<serde_json::Value>, InspectCacheError> {
618 let payload = {
619 let conn = self
620 .conn
621 .lock()
622 .map_err(|_| InspectCacheError::LockPoisoned("connection"))?;
623 conn.query_row(
624 "SELECT aggregate FROM tier2_aggregates \
625 WHERE category = ?1 AND project_key = ?2 \
626 ORDER BY generated_at DESC LIMIT 1",
627 params![category.as_str(), self.project_key],
628 |row| row.get::<_, Vec<u8>>(0),
629 )
630 .optional()?
631 };
632
633 match payload {
634 Some(bytes) => serde_json::from_slice::<serde_json::Value>(&bytes)
635 .map(Some)
636 .map_err(InspectCacheError::from),
637 None => Ok(None),
638 }
639 }
640
641 pub(crate) fn touch_tier2_last_full_run(
642 &self,
643 category: InspectCategory,
644 ) -> Result<i64, InspectCacheError> {
645 let mut conn = self
646 .conn
647 .lock()
648 .map_err(|_| InspectCacheError::LockPoisoned("connection"))?;
649 let tx = conn.transaction()?;
650 let previous = tx
651 .query_row(
652 "SELECT last_full_run FROM tier2_meta WHERE category = ?1 AND project_key = ?2",
653 params![category.as_str(), self.project_key],
654 |row| row.get::<_, i64>(0),
655 )
656 .optional()?;
657 let now = unix_seconds_now();
658 let last_full_run = previous.map_or(now, |previous| now.max(previous.saturating_add(1)));
659 tx.execute(
660 "INSERT INTO tier2_meta (category, project_key, last_full_run) VALUES (?1, ?2, ?3) ON CONFLICT(category, project_key) DO UPDATE SET last_full_run = excluded.last_full_run",
661 params![category.as_str(), self.project_key, last_full_run],
662 )?;
663 tx.commit()?;
664 Ok(last_full_run)
665 }
666
667 pub(crate) fn store_tier2_aggregate(
668 &self,
669 key: JobKey,
670 contribution_set_hash: &str,
671 aggregate: serde_json::Value,
672 ) -> Result<(), InspectCacheError> {
673 if !key.category.is_tier2() {
674 self.store_aggregated(key, aggregate)?;
675 return Ok(());
676 }
677
678 let now = unix_seconds_now();
679 let aggregate_blob = serde_json::to_vec(&aggregate)?;
680 let mut conn = self
681 .conn
682 .lock()
683 .map_err(|_| InspectCacheError::LockPoisoned("connection"))?;
684 let tx = conn.transaction()?;
685 tx.execute(
686 "INSERT INTO tier2_aggregates \
687 (category, project_key, contribution_set_hash, aggregate, generated_at) \
688 VALUES (?1, ?2, ?3, ?4, ?5) \
689 ON CONFLICT(category, project_key) DO UPDATE SET \
690 contribution_set_hash = excluded.contribution_set_hash, \
691 aggregate = excluded.aggregate, \
692 generated_at = excluded.generated_at",
693 params![
694 key.category.as_str(),
695 self.project_key,
696 contribution_set_hash,
697 aggregate_blob,
698 now,
699 ],
700 )?;
701 tx.execute(
702 "INSERT INTO tier2_meta (category, project_key, last_full_run) VALUES (?1, ?2, ?3) \
703 ON CONFLICT(category, project_key) DO UPDATE SET last_full_run = excluded.last_full_run",
704 params![key.category.as_str(), self.project_key, now],
705 )?;
706 tx.commit()?;
707
708 self.store_memory_aggregate(key, aggregate, Some(contribution_set_hash.to_string()))
709 }
710
711 pub fn load_tier2_contributions(
712 &self,
713 category: InspectCategory,
714 ) -> Result<Vec<ContributionRecord>, InspectCacheError> {
715 let conn = self
716 .conn
717 .lock()
718 .map_err(|_| InspectCacheError::LockPoisoned("connection"))?;
719 let mut stmt = conn.prepare(
720 "SELECT file_path, file_mtime_ns, file_size, file_hash, contribution \
721 FROM tier2_contributions \
722 WHERE category = ?1 AND project_key = ?2 \
723 ORDER BY file_path ASC",
724 )?;
725 let rows = stmt.query_map(params![category.as_str(), self.project_key], |row| {
726 let file_path: String = row.get(0)?;
727 let mtime_ns: i64 = row.get(1)?;
728 let file_size: i64 = row.get(2)?;
729 let file_hash: String = row.get(3)?;
730 let contribution: Vec<u8> = row.get(4)?;
731 Ok((file_path, mtime_ns, file_size, file_hash, contribution))
732 })?;
733
734 let mut records = Vec::new();
735 for row in rows {
736 let (file_path, mtime_ns, file_size, file_hash, contribution) = row?;
737 records.push(ContributionRecord {
738 category,
739 file_path: PathBuf::from(file_path),
740 freshness: FileFreshness {
741 mtime: ns_to_system_time(mtime_ns),
742 size: file_size.max(0) as u64,
743 content_hash: hash_from_hex(&file_hash)?,
744 },
745 contribution: serde_json::from_slice(&contribution)?,
746 });
747 }
748 Ok(records)
749 }
750
751 pub fn delete_tier2_contribution(
752 &self,
753 category: InspectCategory,
754 relative_file: &Path,
755 ) -> Result<(), InspectCacheError> {
756 let conn = self
757 .conn
758 .lock()
759 .map_err(|_| InspectCacheError::LockPoisoned("connection"))?;
760 conn.execute(
761 "DELETE FROM tier2_contributions WHERE category = ?1 AND project_key = ?2 AND file_path = ?3",
762 params![
763 category.as_str(),
764 self.project_key,
765 relative_file.to_string_lossy().to_string()
766 ],
767 )?;
768 Ok(())
769 }
770
771 pub fn update_content_fresh_metadata(
772 &self,
773 category: InspectCategory,
774 relative_file: &Path,
775 freshness: &FileFreshness,
776 ) -> Result<(), InspectCacheError> {
777 let conn = self
778 .conn
779 .lock()
780 .map_err(|_| InspectCacheError::LockPoisoned("connection"))?;
781 conn.execute(
782 "UPDATE tier2_contributions \
783 SET file_mtime_ns = ?4, file_size = ?5, file_hash = ?6 \
784 WHERE category = ?1 AND project_key = ?2 AND file_path = ?3",
785 params![
786 category.as_str(),
787 self.project_key,
788 relative_file.to_string_lossy().to_string(),
789 system_time_to_ns(freshness.mtime),
790 freshness.size as i64,
791 hash_to_hex(freshness.content_hash),
792 ],
793 )?;
794 Ok(())
795 }
796
797 pub(crate) fn contribution_fingerprint(
798 &self,
799 category: InspectCategory,
800 ) -> Result<(usize, String), InspectCacheError> {
801 let conn = self
802 .conn
803 .lock()
804 .map_err(|_| InspectCacheError::LockPoisoned("connection"))?;
805 let mut stmt = conn.prepare(
806 "SELECT file_path, file_mtime_ns, file_size \
807 FROM tier2_contributions \
808 WHERE category = ?1 AND project_key = ?2 \
809 ORDER BY file_path ASC",
810 )?;
811 let rows = stmt.query_map(params![category.as_str(), self.project_key], |row| {
812 Ok((
813 row.get::<_, String>(0)?,
814 row.get::<_, i64>(1)?,
815 row.get::<_, i64>(2)?,
816 ))
817 })?;
818
819 let mut count = 0usize;
820 let mut hasher = blake3::Hasher::new();
821 for row in rows {
822 let (file_path, mtime_ns, file_size) = row?;
823 count += 1;
824 update_contribution_fingerprint_hash(
825 &mut hasher,
826 &file_path,
827 mtime_ns.max(0),
828 file_size.max(0) as u64,
829 );
830 }
831
832 Ok((count, hasher.finalize().to_hex().to_string()))
833 }
834
835 pub(crate) fn contribution_freshness(
836 &self,
837 category: InspectCategory,
838 ) -> Result<Vec<(PathBuf, FileFreshness)>, InspectCacheError> {
839 let conn = self
840 .conn
841 .lock()
842 .map_err(|_| InspectCacheError::LockPoisoned("connection"))?;
843 let mut stmt = conn.prepare(
844 "SELECT file_path, file_mtime_ns, file_size, file_hash \
845 FROM tier2_contributions \
846 WHERE category = ?1 AND project_key = ?2 \
847 ORDER BY file_path ASC",
848 )?;
849 let rows = stmt.query_map(params![category.as_str(), self.project_key], |row| {
850 Ok((
851 row.get::<_, String>(0)?,
852 row.get::<_, i64>(1)?,
853 row.get::<_, i64>(2)?,
854 row.get::<_, String>(3)?,
855 ))
856 })?;
857
858 let mut records = Vec::new();
859 for row in rows {
860 let (file_path, mtime_ns, file_size, file_hash) = row?;
861 records.push((
862 PathBuf::from(file_path),
863 FileFreshness {
864 mtime: ns_to_system_time(mtime_ns),
865 size: file_size.max(0) as u64,
866 content_hash: hash_from_hex(&file_hash)?,
867 },
868 ));
869 }
870 Ok(records)
871 }
872
873 pub fn contribution_set_hash(
874 &self,
875 category: InspectCategory,
876 ) -> Result<String, InspectCacheError> {
877 let conn = self
878 .conn
879 .lock()
880 .map_err(|_| InspectCacheError::LockPoisoned("connection"))?;
881 contribution_set_hash_with_conn(&conn, category, &self.project_key, &self.project_root)
882 }
883
884 pub fn last_full_run(
885 &self,
886 category: InspectCategory,
887 ) -> Result<Option<i64>, InspectCacheError> {
888 let conn = self
889 .conn
890 .lock()
891 .map_err(|_| InspectCacheError::LockPoisoned("connection"))?;
892 conn.query_row(
893 "SELECT last_full_run FROM tier2_meta WHERE category = ?1 AND project_key = ?2",
894 params![category.as_str(), self.project_key],
895 |row| row.get::<_, i64>(0),
896 )
897 .optional()
898 .map_err(InspectCacheError::from)
899 }
900
901 pub fn memory_generated_at(&self, key: &JobKey) -> Result<Option<i64>, InspectCacheError> {
902 Ok(self
903 .memory
904 .read()
905 .map_err(|_| InspectCacheError::LockPoisoned("memory"))?
906 .get(key)
907 .map(|entry| entry.generated_at))
908 }
909}
910
911fn configure_connection(conn: &Connection) -> Result<(), InspectCacheError> {
912 conn.pragma_update(None, "journal_mode", "WAL")?;
913 conn.pragma_update(None, "busy_timeout", 5_000)?;
914 Ok(())
915}
916
/// Creates the three cache tables if they do not exist yet.
///
/// * `tier2_contributions` — one row per (category, project, file) with the file's
///   freshness data and its contribution blob.
/// * `tier2_aggregates` — one row per (category, project) holding the aggregate blob and
///   the contribution-set hash it was stored under.
/// * `tier2_meta` — one row per (category, project) holding `last_full_run`.
fn initialize_schema(conn: &Connection) -> Result<(), InspectCacheError> {
    conn.execute_batch(
        "CREATE TABLE IF NOT EXISTS tier2_contributions (
            category TEXT NOT NULL,
            project_key TEXT NOT NULL,
            file_path TEXT NOT NULL,
            file_mtime_ns INTEGER NOT NULL,
            file_size INTEGER NOT NULL,
            file_hash TEXT NOT NULL,
            contribution BLOB NOT NULL,
            generated_at INTEGER NOT NULL,
            PRIMARY KEY (category, project_key, file_path)
        );

        CREATE TABLE IF NOT EXISTS tier2_aggregates (
            category TEXT NOT NULL,
            project_key TEXT NOT NULL,
            contribution_set_hash TEXT NOT NULL,
            aggregate BLOB NOT NULL,
            generated_at INTEGER NOT NULL,
            PRIMARY KEY (category, project_key)
        );

        CREATE TABLE IF NOT EXISTS tier2_meta (
            category TEXT NOT NULL,
            project_key TEXT NOT NULL,
            last_full_run INTEGER NOT NULL,
            PRIMARY KEY (category, project_key)
        );",
    )?;
    Ok(())
}
949
950fn existing_contribution_paths(
951 conn: &Connection,
952 category: InspectCategory,
953 project_key: &str,
954) -> Result<Vec<String>, InspectCacheError> {
955 let mut stmt = conn.prepare(
956 "SELECT file_path FROM tier2_contributions WHERE category = ?1 AND project_key = ?2",
957 )?;
958 let rows = stmt.query_map(params![category.as_str(), project_key], |row| {
959 row.get::<_, String>(0)
960 })?;
961 rows.collect::<Result<Vec<_>, _>>()
962 .map_err(InspectCacheError::from)
963}
964
965fn contribution_set_hash_with_conn(
966 conn: &Connection,
967 category: InspectCategory,
968 project_key: &str,
969 project_root: &Path,
970) -> Result<String, InspectCacheError> {
971 let mut stmt = conn.prepare(
972 "SELECT file_path, file_hash FROM tier2_contributions \
973 WHERE category = ?1 AND project_key = ?2 ORDER BY file_path ASC",
974 )?;
975 let rows = stmt.query_map(params![category.as_str(), project_key], |row| {
976 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
977 })?;
978
979 let mut hasher = blake3::Hasher::new();
980 hasher.update(b"tier2-contributions\0");
981 for row in rows {
982 let (file_path, file_hash) = row?;
983 hasher.update(file_path.as_bytes());
984 hasher.update(b"\0");
985 hasher.update(file_hash.as_bytes());
986 hasher.update(b"\0");
987 }
988 update_manifest_fingerprint_hash(&mut hasher, project_root)?;
989 Ok(hasher.finalize().to_hex().to_string())
990}
991
992fn update_manifest_fingerprint_hash(
993 hasher: &mut blake3::Hasher,
994 project_root: &Path,
995) -> Result<(), InspectCacheError> {
996 let manifest_root =
997 fs::canonicalize(project_root).unwrap_or_else(|_| project_root.to_path_buf());
998 hasher.update(b"entry-point-manifests\0");
999 for manifest in super::entry_points::collect_entry_point_manifests(project_root) {
1000 let relative_path = manifest
1001 .strip_prefix(&manifest_root)
1002 .unwrap_or(manifest.as_path())
1003 .to_string_lossy()
1004 .replace('\\', "/");
1005 let content_hash = blake3::hash(&fs::read(&manifest)?);
1006 hasher.update(relative_path.as_bytes());
1007 hasher.update(b"\0");
1008 hasher.update(content_hash.as_bytes());
1009 hasher.update(b"\0");
1010 }
1011 Ok(())
1012}
1013
1014fn update_contribution_fingerprint_hash(
1015 hasher: &mut blake3::Hasher,
1016 relative_path: &str,
1017 mtime_ns: i64,
1018 file_size: u64,
1019) {
1020 hasher.update(relative_path.as_bytes());
1021 hasher.update(&[0]);
1022 hasher.update(&mtime_ns.to_le_bytes());
1023 hasher.update(&file_size.to_le_bytes());
1024}
1025
/// Renders `path` relative to `project_root` as a (lossy) string; paths outside the
/// root are rendered unchanged.
fn relative_string(project_root: &Path, path: &Path) -> String {
    let relative = match path.strip_prefix(project_root) {
        Ok(stripped) => stripped,
        Err(_) => path,
    };
    relative.to_string_lossy().into_owned()
}
1032
/// Converts `time` to nanoseconds since the Unix epoch, clamped to `0..=i64::MAX`
/// (times before the epoch become 0).
fn system_time_to_ns(time: SystemTime) -> i64 {
    let since_epoch = match time.duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        Err(_) => Duration::from_secs(0),
    };
    i64::try_from(since_epoch.as_nanos()).unwrap_or(i64::MAX)
}
1040
/// Converts nanoseconds since the Unix epoch back to a `SystemTime`; negative inputs
/// map to the epoch itself.
fn ns_to_system_time(value: i64) -> SystemTime {
    let nanos = u64::try_from(value).unwrap_or(0);
    UNIX_EPOCH + Duration::from_nanos(nanos)
}
1044
1045fn hash_to_hex(hash: blake3::Hash) -> String {
1046 hash.to_hex().to_string()
1047}
1048
1049fn hash_from_hex(value: &str) -> Result<blake3::Hash, InspectCacheError> {
1050 if value.len() != 64 {
1051 return Err(InspectCacheError::InvalidHash(value.to_string()));
1052 }
1053 let mut bytes = [0u8; 32];
1054 for (index, chunk) in value.as_bytes().chunks(2).enumerate() {
1055 let hex = std::str::from_utf8(chunk)
1056 .map_err(|_| InspectCacheError::InvalidHash(value.to_string()))?;
1057 bytes[index] = u8::from_str_radix(hex, 16)
1058 .map_err(|_| InspectCacheError::InvalidHash(value.to_string()))?;
1059 }
1060 Ok(blake3::Hash::from_bytes(bytes))
1061}
1062
/// Current wall-clock time as whole seconds since the Unix epoch, clamped to
/// `0..=i64::MAX` (a clock set before the epoch yields 0).
fn unix_seconds_now() -> i64 {
    let seconds = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    };
    i64::try_from(seconds).unwrap_or(i64::MAX)
}
1070
#[cfg(test)]
mod tests {
    use super::*;
    use std::cell::Cell;
    use std::fs;
    use std::path::Path;

    /// Collects real freshness data for a file that is known to exist.
    fn collect_freshness(path: &Path) -> FileFreshness {
        crate::cache_freshness::collect(path).unwrap()
    }

    /// Creates `TIER1_FILE_MEMO_MAX_ENTRIES` files under `dir` and memoizes each one with
    /// its index as the value. Returns the paths and the values the memo handed back.
    fn fill_to_capacity(memo: &Tier1FileMemo<usize>, dir: &Path) -> (Vec<PathBuf>, Vec<usize>) {
        let mut paths = Vec::with_capacity(TIER1_FILE_MEMO_MAX_ENTRIES);
        let mut values = Vec::with_capacity(TIER1_FILE_MEMO_MAX_ENTRIES);
        for index in 0..TIER1_FILE_MEMO_MAX_ENTRIES {
            let path = dir.join(format!("file-{index}.txt"));
            fs::write(&path, index.to_string()).unwrap();
            let value =
                memo.get_or_insert_with(&path, |path| (Some(collect_freshness(path)), index));
            values.push(value);
            paths.push(path);
        }
        (paths, values)
    }

    /// Memoizes one extra file so that a full memo has to evict; returns its path and value.
    fn insert_overflow_entry(memo: &Tier1FileMemo<usize>, dir: &Path) -> (PathBuf, usize) {
        let path = dir.join("new-file.txt");
        fs::write(&path, "new").unwrap();
        let value = memo.get_or_insert_with(&path, |path| {
            (Some(collect_freshness(path)), TIER1_FILE_MEMO_MAX_ENTRIES)
        });
        (path, value)
    }

    #[test]
    fn tier1_file_memo_evicts_lru_and_keeps_recent_hits() {
        let temp = tempfile::tempdir().unwrap();
        let memo = Tier1FileMemo::<usize>::default();

        let (paths, values) = fill_to_capacity(&memo, temp.path());
        assert!(values.iter().copied().eq(0..TIER1_FILE_MEMO_MAX_ENTRIES));

        // Hit the oldest entry so it becomes the most recently used one.
        let recent_path = paths[0].clone();
        let recent_value = memo.get_or_insert_with(&recent_path, |_| {
            panic!("recently inserted entry should hit before eviction")
        });
        assert_eq!(recent_value, 0);

        let (evicting_path, evicting_value) = insert_overflow_entry(&memo, temp.path());
        assert_eq!(evicting_value, TIER1_FILE_MEMO_MAX_ENTRIES);

        {
            let state = memo.state.lock().unwrap();
            assert_eq!(state.entries.len(), TIER1_FILE_MEMO_MAX_ENTRIES);
            assert!(state.entries.contains_key(&recent_path));
            assert!(state.entries.contains_key(&evicting_path));
            // The second-oldest entry is the one that had to go.
            assert!(!state.entries.contains_key(&paths[1]));
        }

        let recent_value = memo.get_or_insert_with(&recent_path, |_| {
            panic!("recently used entry should survive eviction")
        });
        assert_eq!(recent_value, 0);
    }

    #[test]
    fn tier1_file_memo_repeated_touches_keep_lazy_lru_bounded() {
        let temp = tempfile::tempdir().unwrap();
        let memo = Tier1FileMemo::<usize>::default();
        let (paths, _) = fill_to_capacity(&memo, temp.path());

        let touches = TIER1_FILE_MEMO_MAX_ENTRIES * 3;
        for _ in 0..touches {
            let value = memo.get_or_insert_with(&paths[0], |_| {
                panic!("hot entry should stay cached while it is repeatedly touched")
            });
            assert_eq!(value, 0);
        }

        let (evicting_path, _) = insert_overflow_entry(&memo, temp.path());

        let state = memo.state.lock().unwrap();
        assert_eq!(state.entries.len(), TIER1_FILE_MEMO_MAX_ENTRIES);
        assert!(state.entries.contains_key(&paths[0]));
        assert!(state.entries.contains_key(&evicting_path));
        assert!(!state.entries.contains_key(&paths[1]));
        assert!(
            state.lru.len() <= TIER1_FILE_MEMO_MAX_ENTRIES * 2,
            "lazy LRU queue should be compacted instead of growing without bound"
        );
    }

    #[test]
    fn tier1_file_memo_reuses_fresh_entries_and_rescans_stale_files() {
        let temp = tempfile::tempdir().unwrap();
        let path = temp.path().join("memo.txt");
        fs::write(&path, "first").unwrap();

        let memo = Tier1FileMemo::<String>::default();
        let scans = Cell::new(0);
        let bump_scans = || scans.set(scans.get() + 1);

        let first = memo.get_or_insert_with(&path, |path| {
            bump_scans();
            (Some(collect_freshness(path)), "first scan".to_string())
        });
        assert_eq!(first, "first scan");
        assert_eq!(scans.get(), 1);

        let unchanged =
            memo.get_or_insert_with(&path, |_| panic!("unchanged file should reuse Tier-1 memo"));
        assert_eq!(unchanged, "first scan");
        assert_eq!(scans.get(), 1);

        // Different length and content, so the memoized entry can no longer verify.
        fs::write(&path, "changed file contents").unwrap();
        let changed = memo.get_or_insert_with(&path, |path| {
            bump_scans();
            (Some(collect_freshness(path)), "second scan".to_string())
        });
        assert_eq!(changed, "second scan");
        assert_eq!(scans.get(), 2);

        let fresh_after_rescan = memo.get_or_insert_with(&path, |_| {
            panic!("rescanned file should reuse refreshed Tier-1 memo")
        });
        assert_eq!(fresh_after_rescan, "second scan");
        assert_eq!(scans.get(), 2);
    }
}