Skip to main content

sqry_db/
cache.rs

1//! 64-shard query cache with three-tier invalidation.
2//!
3//! Each shard is a `parking_lot::RwLock<HashMap<QueryKey, CachedResult>>`.
4//! Sharding reduces lock contention: concurrent reads to different query types
5//! never contend. The shard count is configurable (must be a power of two).
6//!
7//! # PN3 raw-byte retention
8//!
9//! For queries with `PERSISTENT = true`, [`ShardedCache::insert_query`]
10//! serialises both the input key and the output value via `postcard` at insert
11//! time and stores the raw bytes alongside the typed value. This makes
12//! streaming the cache to disk in [`iter_persistent`] allocation-free after
13//! the fact — no re-serialisation is needed during save.
14//!
15//! Entries whose serialised size exceeds [`QueryDbConfig::max_entry_size_bytes`]
16//! are **not** stored (soft skip — insert returns `Ok(())`). The caller's
17//! computed value is unaffected because `QueryDb::get` returns the value
18//! directly without going through the cache for that invocation.
19//!
20//! For `PERSISTENT = false` queries the raw bytes are set to empty slices and
21//! [`iter_persistent`] skips them.
22
23use std::any::Any;
24use std::collections::HashMap;
25use std::sync::Arc;
26
27use parking_lot::RwLock;
28use serde::Serialize;
29use smallvec::SmallVec;
30
31use crate::config::QueryDbConfig;
32use crate::dependency::FileDep;
33use crate::input::FileInputStore;
34use crate::persistence::QueryDeps;
35use crate::query::{DerivedQuery, QueryKey};
36
37/// A cache entry yielded by [`ShardedCache::iter_persistent`].
38///
39/// Each entry carries everything the SAVE_PATH unit needs to write the entry to
40/// disk: the stable on-disk discriminator, the serialised key + value, and the
41/// dependency metadata needed to validate the entry on reload.
42///
43/// `Arc<[u8]>` is used instead of `Vec<u8>` so that collecting entries from a
44/// shard (while holding the shard lock) does only cheap reference-count bumps,
45/// not byte copies. The save loop can then release all shard locks before
46/// performing any I/O.
47// SAVE_PATH (the next DAG unit) constructs and consumes this type.
48// Allow dead-code lint until that unit is implemented.
49#[allow(dead_code)]
50pub(crate) struct PersistableEntry {
51    /// Stable on-disk discriminator from [`DerivedQuery::QUERY_TYPE_ID`].
52    pub query_type_id: u32,
53    /// Postcard-serialised form of the query's input key.
54    pub raw_key_bytes: Arc<[u8]>,
55    /// Postcard-serialised form of the query's output value.
56    pub raw_result_bytes: Arc<[u8]>,
57    /// Dependency metadata for three-tier cache validation on reload.
58    pub deps: QueryDeps,
59}
60
61/// A cached query result with dependency metadata for three-tier validation.
62///
63/// # Raw byte fields
64///
65/// `raw_key_bytes` and `raw_result_bytes` are populated at insert time by
66/// [`ShardedCache::insert_query`] for queries with `PERSISTENT = true`. They
67/// hold the postcard-serialised key and value respectively, enabling
68/// [`ShardedCache::iter_persistent`] to stream entries to disk without
69/// acquiring shard locks during I/O.
70///
71/// For `PERSISTENT = false` queries (or entries inserted via the bare
72/// [`ShardedCache::insert`] method) both byte slices are empty.
73///
74/// The typed `value` field is always populated for both persistent and
75/// non-persistent queries; the raw bytes are a read-side convenience only.
76pub struct CachedResult {
77    /// Type-erased query result value.
78    value: Box<dyn Any + Send + Sync>,
79    /// Tier 1: File-level dependencies recorded during execution.
80    ///
81    /// Each entry is `(FileId, revision_at_read_time)`. SmallVec with inline
82    /// capacity 8 covers the common case of queries touching ≤8 files without
83    /// heap allocation.
84    file_deps: SmallVec<[FileDep; 8]>,
85    /// Tier 2: Global edge revision at cache time (None if query doesn't track).
86    edge_revision: Option<u64>,
87    /// Tier 3: Global metadata revision at cache time (None if query doesn't track).
88    metadata_revision: Option<u64>,
89    /// Postcard-serialised input key (empty for non-persistent queries).
90    raw_key_bytes: Arc<[u8]>,
91    /// Postcard-serialised output value (empty for non-persistent queries).
92    raw_result_bytes: Arc<[u8]>,
93    /// Stable on-disk discriminator for the query type.
94    ///
95    /// Zero for entries inserted via the bare [`ShardedCache::insert`] path
96    /// (non-typed, no serialisation). Set to [`DerivedQuery::QUERY_TYPE_ID`]
97    /// by [`ShardedCache::insert_query`].
98    query_type_id: u32,
99    /// Whether this entry is eligible for persistence.
100    ///
101    /// `true` only when inserted via [`ShardedCache::insert_query`] for a
102    /// query whose `PERSISTENT = true` and whose serialised size is within
103    /// [`QueryDbConfig::max_entry_size_bytes`].
104    persistent: bool,
105}
106
107impl CachedResult {
108    /// Creates a new cached result with dependency metadata.
109    ///
110    /// Raw-byte fields are left empty; `persistent` is `false`. Use
111    /// [`ShardedCache::insert_query`] when raw-byte retention is required.
112    pub fn new<V: Clone + Send + Sync + 'static>(
113        value: V,
114        file_deps: SmallVec<[FileDep; 8]>,
115        edge_revision: Option<u64>,
116        metadata_revision: Option<u64>,
117    ) -> Self {
118        let empty: Arc<[u8]> = Arc::from(Vec::<u8>::new().into_boxed_slice());
119        Self {
120            value: Box::new(value),
121            file_deps,
122            edge_revision,
123            metadata_revision,
124            raw_key_bytes: Arc::clone(&empty),
125            raw_result_bytes: empty,
126            query_type_id: 0,
127            persistent: false,
128        }
129    }
130
131    /// Creates a fully-populated cached result for a persistent query.
132    ///
133    /// This is called by [`ShardedCache::insert_query`] after serialisation.
134    fn new_persistent<V: Clone + Send + Sync + 'static>(
135        value: V,
136        file_deps: SmallVec<[FileDep; 8]>,
137        edge_revision: Option<u64>,
138        metadata_revision: Option<u64>,
139        raw_key_bytes: Arc<[u8]>,
140        raw_result_bytes: Arc<[u8]>,
141        query_type_id: u32,
142    ) -> Self {
143        Self {
144            value: Box::new(value),
145            file_deps,
146            edge_revision,
147            metadata_revision,
148            raw_key_bytes,
149            raw_result_bytes,
150            query_type_id,
151            persistent: true,
152        }
153    }
154
155    /// Attempts to downcast the value to the expected type.
156    #[must_use]
157    pub fn downcast_value<V: Clone + 'static>(&self) -> Option<&V> {
158        self.value.downcast_ref::<V>()
159    }
160
161    /// Returns the cached edge revision, if tracked.
162    #[inline]
163    #[must_use]
164    pub fn edge_revision(&self) -> Option<u64> {
165        self.edge_revision
166    }
167
168    /// Returns the cached metadata revision, if tracked.
169    #[inline]
170    #[must_use]
171    pub fn metadata_revision(&self) -> Option<u64> {
172        self.metadata_revision
173    }
174
175    /// Returns the file deps for external validation.
176    #[inline]
177    #[must_use]
178    pub fn file_deps(&self) -> &SmallVec<[FileDep; 8]> {
179        &self.file_deps
180    }
181
182    /// Returns the raw postcard-serialised key bytes.
183    ///
184    /// Empty (`is_empty() == true`) for non-persistent entries.
185    #[inline]
186    #[must_use]
187    pub fn raw_key_bytes(&self) -> &Arc<[u8]> {
188        &self.raw_key_bytes
189    }
190
191    /// Returns the raw postcard-serialised result bytes.
192    ///
193    /// Empty (`is_empty() == true`) for non-persistent entries.
194    #[inline]
195    #[must_use]
196    pub fn raw_result_bytes(&self) -> &Arc<[u8]> {
197        &self.raw_result_bytes
198    }
199
200    /// Returns the stable on-disk query type discriminator.
201    ///
202    /// Zero for non-typed entries (those inserted without [`ShardedCache::insert_query`]).
203    #[inline]
204    #[must_use]
205    pub fn query_type_id(&self) -> u32 {
206        self.query_type_id
207    }
208
209    /// Returns whether this entry is eligible for persistence.
210    #[inline]
211    #[must_use]
212    pub fn persistent(&self) -> bool {
213        self.persistent
214    }
215
216    /// Validates Tier 1 file-level dependencies against the current input store.
217    ///
218    /// Returns `true` if ALL recorded `(FileId, revision)` pairs match the
219    /// current revision in the store. Returns `false` if any file's revision
220    /// has advanced or if a file has been removed from the store.
221    #[must_use]
222    pub fn validate_file_deps(&self, inputs: &FileInputStore) -> bool {
223        self.file_deps
224            .iter()
225            .all(|&(fid, rev)| inputs.revision(fid) == Some(rev))
226    }
227}
228
229// CachedResult cannot derive Debug due to Box<dyn Any>, so manual impl.
230impl std::fmt::Debug for CachedResult {
231    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
232        f.debug_struct("CachedResult")
233            .field("file_deps", &self.file_deps)
234            .field("edge_revision", &self.edge_revision)
235            .field("metadata_revision", &self.metadata_revision)
236            .field("raw_key_bytes_len", &self.raw_key_bytes.len())
237            .field("raw_result_bytes_len", &self.raw_result_bytes.len())
238            .field("query_type_id", &self.query_type_id)
239            .field("persistent", &self.persistent)
240            .finish_non_exhaustive()
241    }
242}
243
244/// 64-shard query cache.
245///
246/// Each shard protects a `HashMap<QueryKey, CachedResult>` behind a
247/// `parking_lot::RwLock`. The query registry assigns each query type to a
248/// specific shard via `TypeId` hashing, so reads for different query types
249/// never contend on the same lock.
250///
251/// # Raw-byte retention and persistence
252///
253/// Use [`insert_query`] (generic over `Q: DerivedQuery`) to insert entries with
254/// raw-byte retention. The method serialises the key and value at insert time
255/// and enforces the `max_entry_size_bytes` cap from [`QueryDbConfig`].
256///
257/// Use [`iter_persistent`] to stream all persistent entries for the SAVE_PATH unit.
258/// It collects cheap `Arc` clones under each shard lock, then releases the lock
259/// before yielding, so shard locks are never held during I/O.
260pub struct ShardedCache {
261    shards: Vec<RwLock<HashMap<QueryKey, CachedResult>>>,
262}
263
264impl ShardedCache {
265    /// Creates a new cache with the given number of shards.
266    ///
267    /// # Panics
268    ///
269    /// Panics if `shard_count` is zero or not a power of two.
270    #[must_use]
271    pub fn new(shard_count: usize) -> Self {
272        assert!(shard_count > 0 && shard_count.is_power_of_two());
273        let shards = (0..shard_count)
274            .map(|_| RwLock::new(HashMap::new()))
275            .collect();
276        Self { shards }
277    }
278
279    /// Returns the number of shards.
280    #[inline]
281    #[must_use]
282    pub fn shard_count(&self) -> usize {
283        self.shards.len()
284    }
285
286    /// Attempts to retrieve a cached value, validating and downcasting within
287    /// the read lock scope.
288    ///
289    /// The `validate` closure receives the cached result and should return
290    /// `true` if the cache entry is still valid. If valid, the value is
291    /// downcast and cloned. Returns `None` on miss, failed validation, or
292    /// downcast failure.
293    ///
294    /// This design avoids lifetime issues with read guards by performing all
295    /// work within the lock scope.
296    pub fn get_if_valid<V: Clone + 'static>(
297        &self,
298        shard_idx: usize,
299        key: &QueryKey,
300        validate: impl FnOnce(&CachedResult) -> bool,
301    ) -> Option<V> {
302        let shard = self.shards[shard_idx].read();
303        let cached = shard.get(key)?;
304        if !validate(cached) {
305            return None;
306        }
307        cached.downcast_value::<V>().cloned()
308    }
309
310    /// Cold-load rehydration lookup.
311    ///
312    /// Companion to [`get_if_valid`] for entries placed by
313    /// [`ShardedCache::insert_validated`] during PN3's `load_derived`. Those
314    /// entries carry raw `postcard` bytes for the value but only a unit
315    /// placeholder in the typed `Box<dyn Any>` slot, so [`get_if_valid`]'s
316    /// downcast returns `None` on them. This method:
317    ///
318    /// 1. Reads the cached entry; returns `None` on miss.
319    /// 2. Runs `validate` (three-tier revision check). Returns `None` on fail.
320    /// 3. If the entry's raw bytes decode into `V` via `postcard::from_bytes`,
321    ///    **promotes** the entry in place **only if** no other thread has
322    ///    already written a typed value into the slot. Subsequent lookups
323    ///    hit the fast `get_if_valid` path. Returns the decoded value.
324    ///
325    /// The read lock is dropped before the decode so the (moderately-sized)
326    /// postcard work does not block the shard. The write lock for promotion
327    /// is re-acquired briefly.
328    ///
329    /// # Concurrent-update safety
330    ///
331    /// Between this method's read and write phases another thread may have
332    /// recomputed the entry, producing a fresher typed value, OR loaded a
333    /// more recent revision from disk. Blindly overwriting `cached.value`
334    /// in that gap would clobber the newer result with the stale cold-
335    /// loaded one (Codex review finding on commit `a41787179`). The promote
336    /// step therefore:
337    ///
338    /// 1. Re-reads the entry under the write lock.
339    /// 2. Verifies it is still the unit placeholder left by
340    ///    [`insert_validated`] — i.e. `value.downcast_ref::<()>()` succeeds.
341    ///    If anything else lives there (a typed `V`, a different revision's
342    ///    value, or a newly-recomputed result), skip the overwrite.
343    /// 3. Verifies the raw bytes have not changed (e.g., a concurrent
344    ///    `insert_query` with a different value).
345    /// 4. Only then writes the decoded value.
346    ///
347    /// The caller still receives the decoded value it returned up, because
348    /// correctness for this individual call does not depend on the promote
349    /// succeeding — a lost promotion just means the next reader pays the
350    /// decode cost too. The race window is bounded to at most one decode
351    /// per concurrent reader per entry per cold-start session.
352    ///
353    /// # Errors
354    ///
355    /// Returns `None` on miss, validation failure, or decode failure. The
356    /// caller falls back to recomputation on `None` just as it would for a
357    /// cold cache miss.
358    pub fn get_cold_if_valid<V: Clone + Send + Sync + serde::de::DeserializeOwned + 'static>(
359        &self,
360        shard_idx: usize,
361        key: &QueryKey,
362        validate: impl FnOnce(&CachedResult) -> bool,
363    ) -> Option<V> {
364        // Read phase: load the entry, validate tiers, clone out the raw bytes.
365        let raw_bytes_snapshot: Arc<[u8]> = {
366            let shard = self.shards[shard_idx].read();
367            let cached = shard.get(key)?;
368            if !validate(cached) {
369                return None;
370            }
371            // Require the placeholder shape for the cold path. If the entry
372            // already carries a typed V, the warm `get_if_valid` in the
373            // caller path would have taken it; finding a non-placeholder here
374            // implies a concurrent recompute landed between the caller's
375            // warm-probe and this cold-probe. Defer to that result by
376            // returning None so the caller falls through to recomputation
377            // (which will find and reuse the concurrent result on its own
378            // warm probe).
379            cached.value.downcast_ref::<()>()?;
380            Arc::clone(&cached.raw_result_bytes)
381        };
382
383        // Decode outside any lock.
384        let decoded: V = postcard::from_bytes(&raw_bytes_snapshot).ok()?;
385
386        // Write phase: promote the entry only if it is still the same
387        // placeholder we decoded from. Skip the write if anything changed.
388        {
389            let mut shard = self.shards[shard_idx].write();
390            if let Some(cached) = shard.get_mut(key) {
391                let still_placeholder = cached.value.downcast_ref::<()>().is_some();
392                let bytes_unchanged = Arc::ptr_eq(&cached.raw_result_bytes, &raw_bytes_snapshot);
393                if still_placeholder && bytes_unchanged {
394                    cached.value = Box::new(decoded.clone());
395                }
396                // else: a concurrent thread promoted, recomputed, or
397                // replaced the entry — leave their work intact.
398            }
399        }
400
401        Some(decoded)
402    }
403
404    /// Inserts a pre-built [`CachedResult`] into the specified shard.
405    ///
406    /// This is the low-level, non-generic insert used internally and by tests
407    /// that construct [`CachedResult`] directly (without needing raw-byte
408    /// retention). For production call-sites that require serialisation and the
409    /// `max_entry_size_bytes` cap, use [`insert_query`] instead.
410    pub fn insert(&self, shard_idx: usize, key: QueryKey, result: CachedResult) {
411        let mut shard = self.shards[shard_idx].write();
412        shard.insert(key, result);
413    }
414
415    /// Type-aware insert for queries that require raw-byte retention.
416    ///
417    /// Serialises `key` and `value` via `postcard` at insert time. If
418    /// `Q::PERSISTENT = true` and `raw_result_bytes.len() <=
419    /// config.max_entry_size_bytes`, the entry is stored with full persistence
420    /// metadata. If the serialised value exceeds the cap, the entry is **not**
421    /// stored (soft skip — returns `Ok(())`); the caller's computed value is
422    /// unaffected.
423    ///
424    /// For `Q::PERSISTENT = false`, the typed value is stored but raw bytes are
425    /// left empty and `persistent = false` — the entry is invisible to
426    /// [`iter_persistent`].
427    ///
428    /// # Errors
429    ///
430    /// Returns an error only if `postcard` serialisation of the key or value
431    /// fails (should not occur for well-formed types that implement `Serialize`).
432    #[allow(clippy::too_many_arguments)]
433    pub fn insert_query<Q: DerivedQuery>(
434        &self,
435        shard_idx: usize,
436        query_key: QueryKey,
437        key: &Q::Key,
438        value: Q::Value,
439        file_deps: SmallVec<[FileDep; 8]>,
440        edge_revision: Option<u64>,
441        metadata_revision: Option<u64>,
442        config: &QueryDbConfig,
443    ) -> Result<(), postcard::Error>
444    where
445        Q::Key: Serialize,
446        Q::Value: Serialize,
447    {
448        if !Q::PERSISTENT {
449            // Non-persistent query: store the typed value without raw bytes.
450            let result = CachedResult::new(value, file_deps, edge_revision, metadata_revision);
451            let mut shard = self.shards[shard_idx].write();
452            shard.insert(query_key, result);
453            return Ok(());
454        }
455
456        // Serialise key and value.
457        let raw_key = postcard::to_allocvec(key)?;
458        let raw_value = postcard::to_allocvec(&value)?;
459
460        // Enforce the per-entry size cap on the value bytes.
461        if raw_value.len() > config.max_entry_size_bytes {
462            log::debug!(
463                "sqry-db: skipping oversized cache entry (query_type_id={:#06x}, \
464                 raw_result_bytes={} bytes, max={})",
465                Q::QUERY_TYPE_ID,
466                raw_value.len(),
467                config.max_entry_size_bytes,
468            );
469            // Soft skip: do NOT store the entry. The caller's computed value is
470            // still returned by QueryDb::get directly.
471            return Ok(());
472        }
473
474        let raw_key_bytes: Arc<[u8]> = Arc::from(raw_key.into_boxed_slice());
475        let raw_result_bytes: Arc<[u8]> = Arc::from(raw_value.into_boxed_slice());
476
477        let result = CachedResult::new_persistent(
478            value,
479            file_deps,
480            edge_revision,
481            metadata_revision,
482            raw_key_bytes,
483            raw_result_bytes,
484            Q::QUERY_TYPE_ID,
485        );
486
487        let mut shard = self.shards[shard_idx].write();
488        shard.insert(query_key, result);
489        Ok(())
490    }
491
492    /// Removes a specific key from a shard.
493    pub fn remove(&self, shard_idx: usize, key: &QueryKey) -> bool {
494        let mut shard = self.shards[shard_idx].write();
495        shard.remove(key).is_some()
496    }
497
498    /// Clears all entries from all shards.
499    pub fn clear_all(&self) {
500        for shard in &self.shards {
501            shard.write().clear();
502        }
503    }
504
505    /// Returns the total number of cached entries across all shards.
506    #[must_use]
507    pub fn total_entries(&self) -> usize {
508        self.shards.iter().map(|s| s.read().len()).sum()
509    }
510
511    /// Returns per-shard entry counts for diagnostics.
512    #[must_use]
513    pub fn shard_entry_counts(&self) -> Vec<usize> {
514        self.shards.iter().map(|s| s.read().len()).collect()
515    }
516
517    /// Inserts a pre-validated cold-load entry using raw bytes only.
518    ///
519    /// Bypasses the typed deserialise path because at cold-load time only the
520    /// raw postcard bytes from disk are available — no typed value has been
521    /// decoded. The shard is selected by hashing `raw_key_bytes`.
522    ///
523    /// **Infallible by construction**: uses only `HashMap::insert`. Called
524    /// exclusively from [`QueryDb::commit_staged_load`] — the single
525    /// infallible commit boundary in LOAD_PATH.
526    ///
527    /// # Typed vs raw-only entries
528    ///
529    /// Entries inserted here have `value = Box::new(())` (a unit placeholder)
530    /// because the typed value is not available at cold-load time. The cache
531    /// entry is still valid for persistence re-export (raw bytes are
532    /// populated and `persistent = true`) but `get_if_valid::<V>` will
533    /// return `None` for any typed `V` on first access — the cache will
534    /// transparently recompute and replace the entry with a fully-typed
535    /// version at that point.
536    pub(crate) fn insert_validated(
537        &self,
538        query_type_id: u32,
539        raw_key_bytes: Arc<[u8]>,
540        raw_result_bytes: Arc<[u8]>,
541        deps: crate::persistence::QueryDeps,
542    ) {
543        // INVARIANT: all calls below are infallible — see spec §5.7
544        //
545        // Shard selection MUST match warm-path `QueryRegistry::shard_for::<Q>`
546        // so that a later typed `QueryDb::get::<Q>(&key)` probes the same
547        // shard this cold-load insert is writing to. Both paths route by
548        // `u64::from(Q::QUERY_TYPE_ID) & (shard_count - 1)`.
549        use std::hash::{Hash, Hasher};
550        let shard_idx =
551            crate::query::QueryRegistry::shard_for_query_type_id(query_type_id, self.shards.len());
552
553        // Key hash MUST also match warm-path `QueryKey::new::<Q>(&key)` so
554        // that `get::<Q>` finds the rehydrated entry on the FIRST call. Warm
555        // path hashes `postcard::to_allocvec(&key)`; cold-load hashes
556        // `raw_key_bytes`, which IS that same postcard encoding (set by
557        // `insert_query`).
558        let mut hasher = std::hash::DefaultHasher::new();
559        raw_key_bytes.hash(&mut hasher);
560        let hash = hasher.finish();
561
562        let file_deps: SmallVec<[crate::dependency::FileDep; 8]> =
563            deps.file_deps.iter().copied().collect();
564
565        // Store a placeholder unit value — the typed value will be populated
566        // on the first typed cache hit via QueryDb::get.
567        let result = CachedResult {
568            value: Box::new(()),
569            file_deps,
570            edge_revision: deps.edge_revision,
571            metadata_revision: deps.metadata_revision,
572            raw_key_bytes,
573            raw_result_bytes,
574            query_type_id,
575            persistent: true,
576        };
577
578        // QueryKey for the cold-load entry: the same `(type_hash, key_hash)`
579        // pair `QueryKey::new::<Q>(&key)` produces on the warm path —
580        // `type_hash = u64::from(Q::QUERY_TYPE_ID)`, `key_hash =
581        // hash(postcard_encoding(&key))`. This is what fulfils the spec §2
582        // promise that "the first query after a cold start is free": a
583        // typed `QueryDb::get::<Q>(&key)` issued immediately after
584        // `load_derived` finds this entry on its first lookup.
585        //
586        // Typed-value reconstruction note: `insert_validated` stores a unit
587        // placeholder in the `value: Box<dyn Any>` slot because cold-load
588        // does not know `Q` and therefore cannot deserialise
589        // `raw_result_bytes` into `Q::Value`. On the first typed `get::<Q>`,
590        // `get_if_valid` passes the `CachedResult` to the validator — if
591        // the revision tiers pass but the typed downcast fails, the caller
592        // (`QueryDb::get`) decodes `raw_result_bytes` via
593        // `postcard::from_bytes::<Q::Value>` and replaces the placeholder
594        // in-place with the properly typed value. That promotion path is
595        // implemented in the `get::<Q>` body.
596        let shard_key = QueryKey::from_raw(u64::from(query_type_id), hash);
597
598        let mut shard = self.shards[shard_idx].write();
599        shard.insert(shard_key, result);
600    }
601
602    /// Yields all persistent cache entries as [`PersistableEntry`] values.
603    ///
604    /// This is the feed for the SAVE_PATH persistence unit.
605    ///
606    /// # Implementation note
607    ///
608    /// To avoid holding shard locks during I/O, the method collects cheap
609    /// `Arc` clones within each shard lock, then releases the lock before
610    /// yielding from the collected `Vec`. No bytes are copied — the `Arc`
611    /// reference counts are simply incremented.
612    // SAVE_PATH (the next DAG unit) calls this method. Allow dead-code until then.
613    #[allow(dead_code)]
614    pub(crate) fn iter_persistent(&self) -> impl Iterator<Item = PersistableEntry> + '_ {
615        self.shards.iter().flat_map(|shard| {
616            // Take the read lock, collect persistent entries as cheap Arc clones,
617            // then drop the lock before yielding.
618            let guard = shard.read();
619            let entries: Vec<PersistableEntry> = guard
620                .values()
621                .filter(|e| e.persistent)
622                .map(|e| PersistableEntry {
623                    query_type_id: e.query_type_id,
624                    raw_key_bytes: Arc::clone(&e.raw_key_bytes),
625                    raw_result_bytes: Arc::clone(&e.raw_result_bytes),
626                    // Convert SmallVec to Vec for the serialisable QueryDeps type.
627                    deps: QueryDeps {
628                        file_deps: e.file_deps.to_vec(),
629                        edge_revision: e.edge_revision,
630                        metadata_revision: e.metadata_revision,
631                    },
632                })
633                .collect();
634            drop(guard);
635            entries.into_iter()
636        })
637    }
638}
639
640// SAFETY: All mutation is behind `parking_lot::RwLock`.
641unsafe impl Send for ShardedCache {}
642unsafe impl Sync for ShardedCache {}
643
644#[cfg(test)]
645mod tests {
646    use super::*;
647    use serde::{Deserialize, Serialize};
648    use sqry_core::graph::unified::concurrent::CodeGraph;
649
650    use sqry_core::graph::unified::file::id::FileId;
651
652    use crate::query::QueryKey;
653
654    // ---------------------------------------------------------------------------
655    // Helpers
656    // ---------------------------------------------------------------------------
657
658    fn empty_snapshot() -> Arc<sqry_core::graph::unified::concurrent::GraphSnapshot> {
659        Arc::new(CodeGraph::new().snapshot())
660    }
661
662    // Test query: persistent, with serialisable key + value.
663    struct PersistentTestQuery;
664
665    #[derive(Serialize, Deserialize, Hash, Eq, PartialEq, Clone)]
666    struct PersistentTestKey(u32);
667
668    impl DerivedQuery for PersistentTestQuery {
669        type Key = PersistentTestKey;
670        type Value = Vec<u8>;
671        const QUERY_TYPE_ID: u32 = 0xF100;
672        const PERSISTENT: bool = true;
673
674        fn execute(
675            _key: &Self::Key,
676            _db: &crate::QueryDb,
677            _snapshot: &sqry_core::graph::unified::concurrent::GraphSnapshot,
678        ) -> Self::Value {
679            vec![]
680        }
681    }
682
683    // Test query: non-persistent.
684    struct NonPersistentTestQuery;
685
686    #[derive(Serialize, Deserialize, Hash, Eq, PartialEq, Clone)]
687    struct NonPersistentTestKey(u32);
688
689    impl DerivedQuery for NonPersistentTestQuery {
690        type Key = NonPersistentTestKey;
691        type Value = String;
692        const QUERY_TYPE_ID: u32 = 0xF101;
693        const PERSISTENT: bool = false;
694
695        fn execute(
696            key: &Self::Key,
697            _db: &crate::QueryDb,
698            _snapshot: &sqry_core::graph::unified::concurrent::GraphSnapshot,
699        ) -> Self::Value {
700            format!("result_{}", key.0)
701        }
702    }
703
704    // ---------------------------------------------------------------------------
705    // Original tests (preserved, using bare CachedResult::new)
706    // ---------------------------------------------------------------------------
707
708    #[test]
709    fn sharded_cache_basic_ops() {
710        let cache = ShardedCache::new(4);
711        assert_eq!(cache.shard_count(), 4);
712        assert_eq!(cache.total_entries(), 0);
713
714        let key = QueryKey::from_raw(42, 0);
715        let result = CachedResult::new(vec![1u32, 2, 3], SmallVec::new(), None, None);
716
717        cache.insert(0, key.clone(), result);
718        assert_eq!(cache.total_entries(), 1);
719
720        let val: Option<Vec<u32>> = cache.get_if_valid(0, &key, |_| true);
721        assert_eq!(val, Some(vec![1u32, 2, 3]));
722
723        assert!(cache.remove(0, &key));
724        assert_eq!(cache.total_entries(), 0);
725    }
726
727    #[test]
728    fn sharded_cache_validation_rejects() {
729        let cache = ShardedCache::new(4);
730        let key = QueryKey::from_raw(1, 0);
731        cache.insert(
732            0,
733            key.clone(),
734            CachedResult::new(42u32, SmallVec::new(), None, None),
735        );
736
737        // Validation fails — should return None
738        let val: Option<u32> = cache.get_if_valid(0, &key, |_| false);
739        assert!(val.is_none());
740
741        // Validation passes
742        let val: Option<u32> = cache.get_if_valid(0, &key, |_| true);
743        assert_eq!(val, Some(42));
744    }
745
746    #[test]
747    fn sharded_cache_clear_all() {
748        let cache = ShardedCache::new(4);
749        for i in 0..4 {
750            let key = QueryKey::from_raw(i as u64, 0);
751            cache.insert(i, key, CachedResult::new(i, SmallVec::new(), None, None));
752        }
753        assert_eq!(cache.total_entries(), 4);
754        cache.clear_all();
755        assert_eq!(cache.total_entries(), 0);
756    }
757
758    #[test]
759    fn cached_result_validates_file_deps() {
760        let mut store = crate::input::FileInputStore::new();
761        store.insert(
762            FileId::new(1),
763            crate::input::FileInput::new(Default::default()),
764        );
765        store.insert(
766            FileId::new(2),
767            crate::input::FileInput::new(Default::default()),
768        );
769
770        let mut deps: SmallVec<[FileDep; 8]> = SmallVec::new();
771        deps.push((FileId::new(1), 1)); // matches initial revision
772        deps.push((FileId::new(2), 1));
773
774        let result = CachedResult::new(42u32, deps, None, None);
775        assert!(result.validate_file_deps(&store));
776
777        // Bump file 1's revision
778        store
779            .get_mut(FileId::new(1))
780            .unwrap()
781            .update(Default::default());
782        assert!(
783            !result.validate_file_deps(&store),
784            "should invalidate after revision bump"
785        );
786    }
787
788    #[test]
789    #[should_panic(expected = "is_power_of_two")]
790    fn sharded_cache_rejects_non_power_of_two() {
791        let _ = ShardedCache::new(3);
792    }
793
794    #[test]
795    fn shard_entry_counts() {
796        let cache = ShardedCache::new(4);
797        cache.insert(
798            0,
799            QueryKey::from_raw(1, 0),
800            CachedResult::new(1u32, SmallVec::new(), None, None),
801        );
802        cache.insert(
803            0,
804            QueryKey::from_raw(2, 0),
805            CachedResult::new(2u32, SmallVec::new(), None, None),
806        );
807        cache.insert(
808            2,
809            QueryKey::from_raw(3, 0),
810            CachedResult::new(3u32, SmallVec::new(), None, None),
811        );
812
813        let counts = cache.shard_entry_counts();
814        assert_eq!(counts, vec![2, 0, 1, 0]);
815    }
816
817    // ---------------------------------------------------------------------------
818    // New tests: insert_query (typed, raw-byte retention)
819    // ---------------------------------------------------------------------------
820
821    fn default_config() -> QueryDbConfig {
822        QueryDbConfig::default()
823    }
824
825    /// CachedResult::new leaves raw bytes empty and persistent=false.
826    #[test]
827    fn cached_result_new_has_empty_raw_bytes() {
828        let r = CachedResult::new(42u32, SmallVec::new(), None, None);
829        assert!(r.raw_key_bytes().is_empty());
830        assert!(r.raw_result_bytes().is_empty());
831        assert_eq!(r.query_type_id(), 0);
832        assert!(!r.persistent());
833    }
834
835    /// A persistent insert stores raw bytes, sets query_type_id, and is
836    /// visible to iter_persistent.
837    #[test]
838    fn insert_query_persistent_stores_raw_bytes() {
839        let cache = ShardedCache::new(4);
840        let cfg = default_config();
841
842        let key = PersistentTestKey(7);
843        let value: Vec<u8> = vec![0xDE, 0xAD, 0xBE, 0xEF];
844        let query_key = QueryKey::new::<PersistentTestQuery>(&key);
845        let shard_idx = {
846            use std::hash::{Hash, Hasher};
847            let tid = std::any::TypeId::of::<PersistentTestQuery>();
848            let mut h = std::collections::hash_map::DefaultHasher::new();
849            tid.hash(&mut h);
850            (h.finish() & 3) as usize
851        };
852
853        cache
854            .insert_query::<PersistentTestQuery>(
855                shard_idx,
856                query_key.clone(),
857                &key,
858                value.clone(),
859                SmallVec::new(),
860                None,
861                None,
862                &cfg,
863            )
864            .expect("insert_query should not fail");
865
866        // Typed value still retrievable.
867        let got: Option<Vec<u8>> = cache.get_if_valid(shard_idx, &query_key, |_| true);
868        assert_eq!(got, Some(value));
869
870        // iter_persistent yields this entry.
871        let persistent: Vec<_> = cache.iter_persistent().collect();
872        assert_eq!(persistent.len(), 1);
873        assert_eq!(persistent[0].query_type_id, 0xF100);
874        assert!(!persistent[0].raw_key_bytes.is_empty());
875        assert!(!persistent[0].raw_result_bytes.is_empty());
876    }
877
878    /// Oversize entries are silently skipped: get returns None (cache miss)
879    /// and iter_persistent yields nothing.
880    #[test]
881    fn insert_query_oversize_entry_skipped() {
882        let cache = ShardedCache::new(4);
883        // Max = 1024 bytes; we'll insert a value that serialises to ~2048 bytes.
884        let cfg = QueryDbConfig::builder().max_entry_size_bytes(1024).build();
885
886        let key = PersistentTestKey(99);
887        // 2048 bytes of payload → postcard adds a varint length header; still > 1024.
888        let value: Vec<u8> = vec![0xABu8; 2048];
889        let query_key = QueryKey::new::<PersistentTestQuery>(&key);
890        let shard_idx = {
891            use std::hash::{Hash, Hasher};
892            let tid = std::any::TypeId::of::<PersistentTestQuery>();
893            let mut h = std::collections::hash_map::DefaultHasher::new();
894            tid.hash(&mut h);
895            (h.finish() & 3) as usize
896        };
897
898        cache
899            .insert_query::<PersistentTestQuery>(
900                shard_idx,
901                query_key.clone(),
902                &key,
903                value,
904                SmallVec::new(),
905                None,
906                None,
907                &cfg,
908            )
909            .expect("oversize soft-skip must still return Ok");
910
911        // Entry was NOT stored → cache miss.
912        let got: Option<Vec<u8>> = cache.get_if_valid(shard_idx, &query_key, |_| true);
913        assert!(got.is_none(), "oversize entry must not be present in cache");
914
915        // iter_persistent must not yield the oversized entry.
916        let persistent: Vec<_> = cache.iter_persistent().collect();
917        assert!(
918            persistent.is_empty(),
919            "oversize entry must not appear in iter_persistent"
920        );
921    }
922
923    /// Non-persistent queries: insert succeeds, get returns value, but
924    /// iter_persistent skips the entry and raw_key_bytes is empty.
925    ///
926    /// This is verified indirectly via insert_query then iter_persistent count.
927    #[test]
928    fn insert_query_non_persistent_invisible_to_iter_persistent() {
929        let cache = ShardedCache::new(4);
930        let cfg = default_config();
931
932        let key = NonPersistentTestKey(42);
933        let value = "hello".to_owned();
934        let query_key = QueryKey::new::<NonPersistentTestQuery>(&key);
935        let shard_idx = {
936            use std::hash::{Hash, Hasher};
937            let tid = std::any::TypeId::of::<NonPersistentTestQuery>();
938            let mut h = std::collections::hash_map::DefaultHasher::new();
939            tid.hash(&mut h);
940            (h.finish() & 3) as usize
941        };
942
943        cache
944            .insert_query::<NonPersistentTestQuery>(
945                shard_idx,
946                query_key.clone(),
947                &key,
948                value.clone(),
949                SmallVec::new(),
950                None,
951                None,
952                &cfg,
953            )
954            .expect("non-persistent insert must succeed");
955
956        // Typed value is retrievable.
957        let got: Option<String> = cache.get_if_valid(shard_idx, &query_key, |_| true);
958        assert_eq!(got, Some(value));
959
960        // Raw bytes on the stored entry are empty.
961        {
962            let shard = cache.shards[shard_idx].read();
963            let entry = shard.get(&query_key).expect("entry must be present");
964            assert!(
965                entry.raw_key_bytes().is_empty(),
966                "non-persistent entry must have empty raw_key_bytes"
967            );
968            assert!(
969                entry.raw_result_bytes().is_empty(),
970                "non-persistent entry must have empty raw_result_bytes"
971            );
972            assert!(
973                !entry.persistent(),
974                "PERSISTENT=false must set persistent=false"
975            );
976        }
977
978        // iter_persistent must skip it.
979        let persistent: Vec<_> = cache.iter_persistent().collect();
980        assert!(
981            persistent.is_empty(),
982            "non-persistent entry must not appear in iter_persistent"
983        );
984    }
985
986    /// Edge-revision and metadata-revision propagate correctly to
987    /// PersistableEntry.deps.
988    #[test]
989    fn insert_query_deps_propagated_to_persistable_entry() {
990        let cache = ShardedCache::new(4);
991        let cfg = default_config();
992
993        let key = PersistentTestKey(1);
994        let value: Vec<u8> = vec![1, 2, 3];
995        let query_key = QueryKey::new::<PersistentTestQuery>(&key);
996        let shard_idx = {
997            use std::hash::{Hash, Hasher};
998            let tid = std::any::TypeId::of::<PersistentTestQuery>();
999            let mut h = std::collections::hash_map::DefaultHasher::new();
1000            tid.hash(&mut h);
1001            (h.finish() & 3) as usize
1002        };
1003
1004        let mut file_deps: SmallVec<[FileDep; 8]> = SmallVec::new();
1005        file_deps.push((FileId::new(10), 5));
1006
1007        cache
1008            .insert_query::<PersistentTestQuery>(
1009                shard_idx,
1010                query_key,
1011                &key,
1012                value,
1013                file_deps,
1014                Some(42),
1015                Some(7),
1016                &cfg,
1017            )
1018            .expect("insert_query should succeed");
1019
1020        let entries: Vec<_> = cache.iter_persistent().collect();
1021        assert_eq!(entries.len(), 1);
1022
1023        let deps = &entries[0].deps;
1024        assert_eq!(deps.file_deps.len(), 1);
1025        assert_eq!(deps.file_deps[0], (FileId::new(10), 5));
1026        assert_eq!(deps.edge_revision, Some(42));
1027        assert_eq!(deps.metadata_revision, Some(7));
1028    }
1029
1030    /// Insert two persistent and one non-persistent; iter_persistent yields
1031    /// exactly two entries.
1032    #[test]
1033    fn iter_persistent_counts_correctly() {
1034        let cache = ShardedCache::new(4);
1035        let cfg = default_config();
1036
1037        // Persistent 1
1038        let k1 = PersistentTestKey(1);
1039        let qk1 = QueryKey::new::<PersistentTestQuery>(&k1);
1040        let si1 = {
1041            use std::hash::{Hash, Hasher};
1042            let tid = std::any::TypeId::of::<PersistentTestQuery>();
1043            let mut h = std::collections::hash_map::DefaultHasher::new();
1044            tid.hash(&mut h);
1045            (h.finish() & 3) as usize
1046        };
1047        cache
1048            .insert_query::<PersistentTestQuery>(
1049                si1,
1050                qk1,
1051                &k1,
1052                vec![1u8],
1053                SmallVec::new(),
1054                None,
1055                None,
1056                &cfg,
1057            )
1058            .unwrap();
1059
1060        // Persistent 2
1061        let k2 = PersistentTestKey(2);
1062        let qk2 = QueryKey::new::<PersistentTestQuery>(&k2);
1063        cache
1064            .insert_query::<PersistentTestQuery>(
1065                si1,
1066                qk2,
1067                &k2,
1068                vec![2u8],
1069                SmallVec::new(),
1070                None,
1071                None,
1072                &cfg,
1073            )
1074            .unwrap();
1075
1076        // Non-persistent
1077        let nk = NonPersistentTestKey(3);
1078        let nqk = QueryKey::new::<NonPersistentTestQuery>(&nk);
1079        let nsi = {
1080            use std::hash::{Hash, Hasher};
1081            let tid = std::any::TypeId::of::<NonPersistentTestQuery>();
1082            let mut h = std::collections::hash_map::DefaultHasher::new();
1083            tid.hash(&mut h);
1084            (h.finish() & 3) as usize
1085        };
1086        cache
1087            .insert_query::<NonPersistentTestQuery>(
1088                nsi,
1089                nqk,
1090                &nk,
1091                "skip".to_owned(),
1092                SmallVec::new(),
1093                None,
1094                None,
1095                &cfg,
1096            )
1097            .unwrap();
1098
1099        let count = cache.iter_persistent().count();
1100        assert_eq!(count, 2, "only the two persistent entries should appear");
1101    }
1102
1103    // Verify that the test helper is used to silence the unused-import warning.
1104    #[test]
1105    fn empty_snapshot_compiles() {
1106        let _ = empty_snapshot();
1107    }
1108}