sqry_db/cache.rs
//! 64-shard query cache with three-tier invalidation.
//!
//! Each shard is a `parking_lot::RwLock<HashMap<QueryKey, CachedResult>>`.
//! Sharding reduces lock contention: concurrent reads of query types that
//! live in different shards never contend. The shard count is configurable
//! (must be a power of two).
//!
//! # PN3 raw-byte retention
//!
//! For queries with `PERSISTENT = true`, [`ShardedCache::insert_query`]
//! serialises both the input key and the output value via `postcard` at insert
//! time and stores the raw bytes alongside the typed value. Streaming the
//! cache to disk through `ShardedCache::iter_persistent` therefore needs no
//! re-serialisation during save — the retained bytes are handed out as cheap
//! `Arc` clones.
//!
//! Entries whose serialised value exceeds [`QueryDbConfig::max_entry_size_bytes`]
//! are **not** stored (soft skip — insert returns `Ok(())`). The caller's
//! computed value is unaffected because `QueryDb::get` returns the value
//! directly without going through the cache for that invocation.
//!
//! For `PERSISTENT = false` queries the raw bytes are set to empty slices and
//! `ShardedCache::iter_persistent` skips them.
22
23use std::any::Any;
24use std::collections::HashMap;
25use std::sync::Arc;
26
27use parking_lot::RwLock;
28use serde::Serialize;
29use smallvec::SmallVec;
30
31use crate::config::QueryDbConfig;
32use crate::dependency::FileDep;
33use crate::input::FileInputStore;
34use crate::persistence::QueryDeps;
35use crate::query::{DerivedQuery, QueryKey};
36
37/// A cache entry yielded by [`ShardedCache::iter_persistent`].
38///
39/// Each entry carries everything the SAVE_PATH unit needs to write the entry to
40/// disk: the stable on-disk discriminator, the serialised key + value, and the
41/// dependency metadata needed to validate the entry on reload.
42///
43/// `Arc<[u8]>` is used instead of `Vec<u8>` so that collecting entries from a
44/// shard (while holding the shard lock) does only cheap reference-count bumps,
45/// not byte copies. The save loop can then release all shard locks before
46/// performing any I/O.
47// SAVE_PATH (the next DAG unit) constructs and consumes this type.
48// Allow dead-code lint until that unit is implemented.
49#[allow(dead_code)]
50pub(crate) struct PersistableEntry {
51 /// Stable on-disk discriminator from [`DerivedQuery::QUERY_TYPE_ID`].
52 pub query_type_id: u32,
53 /// Postcard-serialised form of the query's input key.
54 pub raw_key_bytes: Arc<[u8]>,
55 /// Postcard-serialised form of the query's output value.
56 pub raw_result_bytes: Arc<[u8]>,
57 /// Dependency metadata for three-tier cache validation on reload.
58 pub deps: QueryDeps,
59}
60
61/// A cached query result with dependency metadata for three-tier validation.
62///
63/// # Raw byte fields
64///
65/// `raw_key_bytes` and `raw_result_bytes` are populated at insert time by
66/// [`ShardedCache::insert_query`] for queries with `PERSISTENT = true`. They
67/// hold the postcard-serialised key and value respectively, enabling
68/// [`ShardedCache::iter_persistent`] to stream entries to disk without
69/// acquiring shard locks during I/O.
70///
71/// For `PERSISTENT = false` queries (or entries inserted via the bare
72/// [`ShardedCache::insert`] method) both byte slices are empty.
73///
74/// The typed `value` field is always populated for both persistent and
75/// non-persistent queries; the raw bytes are a read-side convenience only.
76pub struct CachedResult {
77 /// Type-erased query result value.
78 value: Box<dyn Any + Send + Sync>,
79 /// Tier 1: File-level dependencies recorded during execution.
80 ///
81 /// Each entry is `(FileId, revision_at_read_time)`. SmallVec with inline
82 /// capacity 8 covers the common case of queries touching ≤8 files without
83 /// heap allocation.
84 file_deps: SmallVec<[FileDep; 8]>,
85 /// Tier 2: Global edge revision at cache time (None if query doesn't track).
86 edge_revision: Option<u64>,
87 /// Tier 3: Global metadata revision at cache time (None if query doesn't track).
88 metadata_revision: Option<u64>,
89 /// Postcard-serialised input key (empty for non-persistent queries).
90 raw_key_bytes: Arc<[u8]>,
91 /// Postcard-serialised output value (empty for non-persistent queries).
92 raw_result_bytes: Arc<[u8]>,
93 /// Stable on-disk discriminator for the query type.
94 ///
95 /// Zero for entries inserted via the bare [`ShardedCache::insert`] path
96 /// (non-typed, no serialisation). Set to [`DerivedQuery::QUERY_TYPE_ID`]
97 /// by [`ShardedCache::insert_query`].
98 query_type_id: u32,
99 /// Whether this entry is eligible for persistence.
100 ///
101 /// `true` only when inserted via [`ShardedCache::insert_query`] for a
102 /// query whose `PERSISTENT = true` and whose serialised size is within
103 /// [`QueryDbConfig::max_entry_size_bytes`].
104 persistent: bool,
105}
106
107impl CachedResult {
108 /// Creates a new cached result with dependency metadata.
109 ///
110 /// Raw-byte fields are left empty; `persistent` is `false`. Use
111 /// [`ShardedCache::insert_query`] when raw-byte retention is required.
112 pub fn new<V: Clone + Send + Sync + 'static>(
113 value: V,
114 file_deps: SmallVec<[FileDep; 8]>,
115 edge_revision: Option<u64>,
116 metadata_revision: Option<u64>,
117 ) -> Self {
118 let empty: Arc<[u8]> = Arc::from(Vec::<u8>::new().into_boxed_slice());
119 Self {
120 value: Box::new(value),
121 file_deps,
122 edge_revision,
123 metadata_revision,
124 raw_key_bytes: Arc::clone(&empty),
125 raw_result_bytes: empty,
126 query_type_id: 0,
127 persistent: false,
128 }
129 }
130
131 /// Creates a fully-populated cached result for a persistent query.
132 ///
133 /// This is called by [`ShardedCache::insert_query`] after serialisation.
134 fn new_persistent<V: Clone + Send + Sync + 'static>(
135 value: V,
136 file_deps: SmallVec<[FileDep; 8]>,
137 edge_revision: Option<u64>,
138 metadata_revision: Option<u64>,
139 raw_key_bytes: Arc<[u8]>,
140 raw_result_bytes: Arc<[u8]>,
141 query_type_id: u32,
142 ) -> Self {
143 Self {
144 value: Box::new(value),
145 file_deps,
146 edge_revision,
147 metadata_revision,
148 raw_key_bytes,
149 raw_result_bytes,
150 query_type_id,
151 persistent: true,
152 }
153 }
154
155 /// Attempts to downcast the value to the expected type.
156 #[must_use]
157 pub fn downcast_value<V: Clone + 'static>(&self) -> Option<&V> {
158 self.value.downcast_ref::<V>()
159 }
160
161 /// Returns the cached edge revision, if tracked.
162 #[inline]
163 #[must_use]
164 pub fn edge_revision(&self) -> Option<u64> {
165 self.edge_revision
166 }
167
168 /// Returns the cached metadata revision, if tracked.
169 #[inline]
170 #[must_use]
171 pub fn metadata_revision(&self) -> Option<u64> {
172 self.metadata_revision
173 }
174
175 /// Returns the file deps for external validation.
176 #[inline]
177 #[must_use]
178 pub fn file_deps(&self) -> &SmallVec<[FileDep; 8]> {
179 &self.file_deps
180 }
181
182 /// Returns the raw postcard-serialised key bytes.
183 ///
184 /// Empty (`is_empty() == true`) for non-persistent entries.
185 #[inline]
186 #[must_use]
187 pub fn raw_key_bytes(&self) -> &Arc<[u8]> {
188 &self.raw_key_bytes
189 }
190
191 /// Returns the raw postcard-serialised result bytes.
192 ///
193 /// Empty (`is_empty() == true`) for non-persistent entries.
194 #[inline]
195 #[must_use]
196 pub fn raw_result_bytes(&self) -> &Arc<[u8]> {
197 &self.raw_result_bytes
198 }
199
200 /// Returns the stable on-disk query type discriminator.
201 ///
202 /// Zero for non-typed entries (those inserted without [`ShardedCache::insert_query`]).
203 #[inline]
204 #[must_use]
205 pub fn query_type_id(&self) -> u32 {
206 self.query_type_id
207 }
208
209 /// Returns whether this entry is eligible for persistence.
210 #[inline]
211 #[must_use]
212 pub fn persistent(&self) -> bool {
213 self.persistent
214 }
215
216 /// Validates Tier 1 file-level dependencies against the current input store.
217 ///
218 /// Returns `true` if ALL recorded `(FileId, revision)` pairs match the
219 /// current revision in the store. Returns `false` if any file's revision
220 /// has advanced or if a file has been removed from the store.
221 #[must_use]
222 pub fn validate_file_deps(&self, inputs: &FileInputStore) -> bool {
223 self.file_deps
224 .iter()
225 .all(|&(fid, rev)| inputs.revision(fid) == Some(rev))
226 }
227}
228
229// CachedResult cannot derive Debug due to Box<dyn Any>, so manual impl.
230impl std::fmt::Debug for CachedResult {
231 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
232 f.debug_struct("CachedResult")
233 .field("file_deps", &self.file_deps)
234 .field("edge_revision", &self.edge_revision)
235 .field("metadata_revision", &self.metadata_revision)
236 .field("raw_key_bytes_len", &self.raw_key_bytes.len())
237 .field("raw_result_bytes_len", &self.raw_result_bytes.len())
238 .field("query_type_id", &self.query_type_id)
239 .field("persistent", &self.persistent)
240 .finish_non_exhaustive()
241 }
242}
243
244/// 64-shard query cache.
245///
246/// Each shard protects a `HashMap<QueryKey, CachedResult>` behind a
247/// `parking_lot::RwLock`. The query registry assigns each query type to a
248/// specific shard via `TypeId` hashing, so reads for different query types
249/// never contend on the same lock.
250///
251/// # Raw-byte retention and persistence
252///
253/// Use [`insert_query`] (generic over `Q: DerivedQuery`) to insert entries with
254/// raw-byte retention. The method serialises the key and value at insert time
255/// and enforces the `max_entry_size_bytes` cap from [`QueryDbConfig`].
256///
257/// Use [`iter_persistent`] to stream all persistent entries for the SAVE_PATH unit.
258/// It collects cheap `Arc` clones under each shard lock, then releases the lock
259/// before yielding, so shard locks are never held during I/O.
260pub struct ShardedCache {
261 shards: Vec<RwLock<HashMap<QueryKey, CachedResult>>>,
262}
263
264impl ShardedCache {
265 /// Creates a new cache with the given number of shards.
266 ///
267 /// # Panics
268 ///
269 /// Panics if `shard_count` is zero or not a power of two.
270 #[must_use]
271 pub fn new(shard_count: usize) -> Self {
272 assert!(shard_count > 0 && shard_count.is_power_of_two());
273 let shards = (0..shard_count)
274 .map(|_| RwLock::new(HashMap::new()))
275 .collect();
276 Self { shards }
277 }
278
279 /// Returns the number of shards.
280 #[inline]
281 #[must_use]
282 pub fn shard_count(&self) -> usize {
283 self.shards.len()
284 }
285
286 /// Attempts to retrieve a cached value, validating and downcasting within
287 /// the read lock scope.
288 ///
289 /// The `validate` closure receives the cached result and should return
290 /// `true` if the cache entry is still valid. If valid, the value is
291 /// downcast and cloned. Returns `None` on miss, failed validation, or
292 /// downcast failure.
293 ///
294 /// This design avoids lifetime issues with read guards by performing all
295 /// work within the lock scope.
296 pub fn get_if_valid<V: Clone + 'static>(
297 &self,
298 shard_idx: usize,
299 key: &QueryKey,
300 validate: impl FnOnce(&CachedResult) -> bool,
301 ) -> Option<V> {
302 let shard = self.shards[shard_idx].read();
303 let cached = shard.get(key)?;
304 if !validate(cached) {
305 return None;
306 }
307 cached.downcast_value::<V>().cloned()
308 }
309
310 /// Cold-load rehydration lookup.
311 ///
312 /// Companion to [`get_if_valid`] for entries placed by
313 /// [`ShardedCache::insert_validated`] during PN3's `load_derived`. Those
314 /// entries carry raw `postcard` bytes for the value but only a unit
315 /// placeholder in the typed `Box<dyn Any>` slot, so [`get_if_valid`]'s
316 /// downcast returns `None` on them. This method:
317 ///
318 /// 1. Reads the cached entry; returns `None` on miss.
319 /// 2. Runs `validate` (three-tier revision check). Returns `None` on fail.
320 /// 3. If the entry's raw bytes decode into `V` via `postcard::from_bytes`,
321 /// **promotes** the entry in place **only if** no other thread has
322 /// already written a typed value into the slot. Subsequent lookups
323 /// hit the fast `get_if_valid` path. Returns the decoded value.
324 ///
325 /// The read lock is dropped before the decode so the (moderately-sized)
326 /// postcard work does not block the shard. The write lock for promotion
327 /// is re-acquired briefly.
328 ///
329 /// # Concurrent-update safety
330 ///
331 /// Between this method's read and write phases another thread may have
332 /// recomputed the entry, producing a fresher typed value, OR loaded a
333 /// more recent revision from disk. Blindly overwriting `cached.value`
334 /// in that gap would clobber the newer result with the stale cold-
335 /// loaded one (Codex review finding on commit `a41787179`). The promote
336 /// step therefore:
337 ///
338 /// 1. Re-reads the entry under the write lock.
339 /// 2. Verifies it is still the unit placeholder left by
340 /// [`insert_validated`] — i.e. `value.downcast_ref::<()>()` succeeds.
341 /// If anything else lives there (a typed `V`, a different revision's
342 /// value, or a newly-recomputed result), skip the overwrite.
343 /// 3. Verifies the raw bytes have not changed (e.g., a concurrent
344 /// `insert_query` with a different value).
345 /// 4. Only then writes the decoded value.
346 ///
347 /// The caller still receives the decoded value it returned up, because
348 /// correctness for this individual call does not depend on the promote
349 /// succeeding — a lost promotion just means the next reader pays the
350 /// decode cost too. The race window is bounded to at most one decode
351 /// per concurrent reader per entry per cold-start session.
352 ///
353 /// # Errors
354 ///
355 /// Returns `None` on miss, validation failure, or decode failure. The
356 /// caller falls back to recomputation on `None` just as it would for a
357 /// cold cache miss.
358 pub fn get_cold_if_valid<V: Clone + Send + Sync + serde::de::DeserializeOwned + 'static>(
359 &self,
360 shard_idx: usize,
361 key: &QueryKey,
362 validate: impl FnOnce(&CachedResult) -> bool,
363 ) -> Option<V> {
364 // Read phase: load the entry, validate tiers, clone out the raw bytes.
365 let raw_bytes_snapshot: Arc<[u8]> = {
366 let shard = self.shards[shard_idx].read();
367 let cached = shard.get(key)?;
368 if !validate(cached) {
369 return None;
370 }
371 // Require the placeholder shape for the cold path. If the entry
372 // already carries a typed V, the warm `get_if_valid` in the
373 // caller path would have taken it; finding a non-placeholder here
374 // implies a concurrent recompute landed between the caller's
375 // warm-probe and this cold-probe. Defer to that result by
376 // returning None so the caller falls through to recomputation
377 // (which will find and reuse the concurrent result on its own
378 // warm probe).
379 cached.value.downcast_ref::<()>()?;
380 Arc::clone(&cached.raw_result_bytes)
381 };
382
383 // Decode outside any lock.
384 let decoded: V = postcard::from_bytes(&raw_bytes_snapshot).ok()?;
385
386 // Write phase: promote the entry only if it is still the same
387 // placeholder we decoded from. Skip the write if anything changed.
388 {
389 let mut shard = self.shards[shard_idx].write();
390 if let Some(cached) = shard.get_mut(key) {
391 let still_placeholder = cached.value.downcast_ref::<()>().is_some();
392 let bytes_unchanged = Arc::ptr_eq(&cached.raw_result_bytes, &raw_bytes_snapshot);
393 if still_placeholder && bytes_unchanged {
394 cached.value = Box::new(decoded.clone());
395 }
396 // else: a concurrent thread promoted, recomputed, or
397 // replaced the entry — leave their work intact.
398 }
399 }
400
401 Some(decoded)
402 }
403
404 /// Inserts a pre-built [`CachedResult`] into the specified shard.
405 ///
406 /// This is the low-level, non-generic insert used internally and by tests
407 /// that construct [`CachedResult`] directly (without needing raw-byte
408 /// retention). For production call-sites that require serialisation and the
409 /// `max_entry_size_bytes` cap, use [`insert_query`] instead.
410 pub fn insert(&self, shard_idx: usize, key: QueryKey, result: CachedResult) {
411 let mut shard = self.shards[shard_idx].write();
412 shard.insert(key, result);
413 }
414
415 /// Type-aware insert for queries that require raw-byte retention.
416 ///
417 /// Serialises `key` and `value` via `postcard` at insert time. If
418 /// `Q::PERSISTENT = true` and `raw_result_bytes.len() <=
419 /// config.max_entry_size_bytes`, the entry is stored with full persistence
420 /// metadata. If the serialised value exceeds the cap, the entry is **not**
421 /// stored (soft skip — returns `Ok(())`); the caller's computed value is
422 /// unaffected.
423 ///
424 /// For `Q::PERSISTENT = false`, the typed value is stored but raw bytes are
425 /// left empty and `persistent = false` — the entry is invisible to
426 /// [`iter_persistent`].
427 ///
428 /// # Errors
429 ///
430 /// Returns an error only if `postcard` serialisation of the key or value
431 /// fails (should not occur for well-formed types that implement `Serialize`).
432 #[allow(clippy::too_many_arguments)]
433 pub fn insert_query<Q: DerivedQuery>(
434 &self,
435 shard_idx: usize,
436 query_key: QueryKey,
437 key: &Q::Key,
438 value: Q::Value,
439 file_deps: SmallVec<[FileDep; 8]>,
440 edge_revision: Option<u64>,
441 metadata_revision: Option<u64>,
442 config: &QueryDbConfig,
443 ) -> Result<(), postcard::Error>
444 where
445 Q::Key: Serialize,
446 Q::Value: Serialize,
447 {
448 if !Q::PERSISTENT {
449 // Non-persistent query: store the typed value without raw bytes.
450 let result = CachedResult::new(value, file_deps, edge_revision, metadata_revision);
451 let mut shard = self.shards[shard_idx].write();
452 shard.insert(query_key, result);
453 return Ok(());
454 }
455
456 // Serialise key and value.
457 let raw_key = postcard::to_allocvec(key)?;
458 let raw_value = postcard::to_allocvec(&value)?;
459
460 // Enforce the per-entry size cap on the value bytes.
461 if raw_value.len() > config.max_entry_size_bytes {
462 log::debug!(
463 "sqry-db: skipping oversized cache entry (query_type_id={:#06x}, \
464 raw_result_bytes={} bytes, max={})",
465 Q::QUERY_TYPE_ID,
466 raw_value.len(),
467 config.max_entry_size_bytes,
468 );
469 // Soft skip: do NOT store the entry. The caller's computed value is
470 // still returned by QueryDb::get directly.
471 return Ok(());
472 }
473
474 let raw_key_bytes: Arc<[u8]> = Arc::from(raw_key.into_boxed_slice());
475 let raw_result_bytes: Arc<[u8]> = Arc::from(raw_value.into_boxed_slice());
476
477 let result = CachedResult::new_persistent(
478 value,
479 file_deps,
480 edge_revision,
481 metadata_revision,
482 raw_key_bytes,
483 raw_result_bytes,
484 Q::QUERY_TYPE_ID,
485 );
486
487 let mut shard = self.shards[shard_idx].write();
488 shard.insert(query_key, result);
489 Ok(())
490 }
491
492 /// Removes a specific key from a shard.
493 pub fn remove(&self, shard_idx: usize, key: &QueryKey) -> bool {
494 let mut shard = self.shards[shard_idx].write();
495 shard.remove(key).is_some()
496 }
497
498 /// Clears all entries from all shards.
499 pub fn clear_all(&self) {
500 for shard in &self.shards {
501 shard.write().clear();
502 }
503 }
504
505 /// Returns the total number of cached entries across all shards.
506 #[must_use]
507 pub fn total_entries(&self) -> usize {
508 self.shards.iter().map(|s| s.read().len()).sum()
509 }
510
511 /// Returns per-shard entry counts for diagnostics.
512 #[must_use]
513 pub fn shard_entry_counts(&self) -> Vec<usize> {
514 self.shards.iter().map(|s| s.read().len()).collect()
515 }
516
517 /// Inserts a pre-validated cold-load entry using raw bytes only.
518 ///
519 /// Bypasses the typed deserialise path because at cold-load time only the
520 /// raw postcard bytes from disk are available — no typed value has been
521 /// decoded. The shard is selected by hashing `raw_key_bytes`.
522 ///
523 /// **Infallible by construction**: uses only `HashMap::insert`. Called
524 /// exclusively from [`QueryDb::commit_staged_load`] — the single
525 /// infallible commit boundary in LOAD_PATH.
526 ///
527 /// # Typed vs raw-only entries
528 ///
529 /// Entries inserted here have `value = Box::new(())` (a unit placeholder)
530 /// because the typed value is not available at cold-load time. The cache
531 /// entry is still valid for persistence re-export (raw bytes are
532 /// populated and `persistent = true`) but `get_if_valid::<V>` will
533 /// return `None` for any typed `V` on first access — the cache will
534 /// transparently recompute and replace the entry with a fully-typed
535 /// version at that point.
536 pub(crate) fn insert_validated(
537 &self,
538 query_type_id: u32,
539 raw_key_bytes: Arc<[u8]>,
540 raw_result_bytes: Arc<[u8]>,
541 deps: crate::persistence::QueryDeps,
542 ) {
543 // INVARIANT: all calls below are infallible — see spec §5.7
544 //
545 // Shard selection MUST match warm-path `QueryRegistry::shard_for::<Q>`
546 // so that a later typed `QueryDb::get::<Q>(&key)` probes the same
547 // shard this cold-load insert is writing to. Both paths route by
548 // `u64::from(Q::QUERY_TYPE_ID) & (shard_count - 1)`.
549 use std::hash::{Hash, Hasher};
550 let shard_idx =
551 crate::query::QueryRegistry::shard_for_query_type_id(query_type_id, self.shards.len());
552
553 // Key hash MUST also match warm-path `QueryKey::new::<Q>(&key)` so
554 // that `get::<Q>` finds the rehydrated entry on the FIRST call. Warm
555 // path hashes `postcard::to_allocvec(&key)`; cold-load hashes
556 // `raw_key_bytes`, which IS that same postcard encoding (set by
557 // `insert_query`).
558 let mut hasher = std::hash::DefaultHasher::new();
559 raw_key_bytes.hash(&mut hasher);
560 let hash = hasher.finish();
561
562 let file_deps: SmallVec<[crate::dependency::FileDep; 8]> =
563 deps.file_deps.iter().copied().collect();
564
565 // Store a placeholder unit value — the typed value will be populated
566 // on the first typed cache hit via QueryDb::get.
567 let result = CachedResult {
568 value: Box::new(()),
569 file_deps,
570 edge_revision: deps.edge_revision,
571 metadata_revision: deps.metadata_revision,
572 raw_key_bytes,
573 raw_result_bytes,
574 query_type_id,
575 persistent: true,
576 };
577
578 // QueryKey for the cold-load entry: the same `(type_hash, key_hash)`
579 // pair `QueryKey::new::<Q>(&key)` produces on the warm path —
580 // `type_hash = u64::from(Q::QUERY_TYPE_ID)`, `key_hash =
581 // hash(postcard_encoding(&key))`. This is what fulfils the spec §2
582 // promise that "the first query after a cold start is free": a
583 // typed `QueryDb::get::<Q>(&key)` issued immediately after
584 // `load_derived` finds this entry on its first lookup.
585 //
586 // Typed-value reconstruction note: `insert_validated` stores a unit
587 // placeholder in the `value: Box<dyn Any>` slot because cold-load
588 // does not know `Q` and therefore cannot deserialise
589 // `raw_result_bytes` into `Q::Value`. On the first typed `get::<Q>`,
590 // `get_if_valid` passes the `CachedResult` to the validator — if
591 // the revision tiers pass but the typed downcast fails, the caller
592 // (`QueryDb::get`) decodes `raw_result_bytes` via
593 // `postcard::from_bytes::<Q::Value>` and replaces the placeholder
594 // in-place with the properly typed value. That promotion path is
595 // implemented in the `get::<Q>` body.
596 let shard_key = QueryKey::from_raw(u64::from(query_type_id), hash);
597
598 let mut shard = self.shards[shard_idx].write();
599 shard.insert(shard_key, result);
600 }
601
602 /// Yields all persistent cache entries as [`PersistableEntry`] values.
603 ///
604 /// This is the feed for the SAVE_PATH persistence unit.
605 ///
606 /// # Implementation note
607 ///
608 /// To avoid holding shard locks during I/O, the method collects cheap
609 /// `Arc` clones within each shard lock, then releases the lock before
610 /// yielding from the collected `Vec`. No bytes are copied — the `Arc`
611 /// reference counts are simply incremented.
612 // SAVE_PATH (the next DAG unit) calls this method. Allow dead-code until then.
613 #[allow(dead_code)]
614 pub(crate) fn iter_persistent(&self) -> impl Iterator<Item = PersistableEntry> + '_ {
615 self.shards.iter().flat_map(|shard| {
616 // Take the read lock, collect persistent entries as cheap Arc clones,
617 // then drop the lock before yielding.
618 let guard = shard.read();
619 let entries: Vec<PersistableEntry> = guard
620 .values()
621 .filter(|e| e.persistent)
622 .map(|e| PersistableEntry {
623 query_type_id: e.query_type_id,
624 raw_key_bytes: Arc::clone(&e.raw_key_bytes),
625 raw_result_bytes: Arc::clone(&e.raw_result_bytes),
626 // Convert SmallVec to Vec for the serialisable QueryDeps type.
627 deps: QueryDeps {
628 file_deps: e.file_deps.to_vec(),
629 edge_revision: e.edge_revision,
630 metadata_revision: e.metadata_revision,
631 },
632 })
633 .collect();
634 drop(guard);
635 entries.into_iter()
636 })
637 }
638}
639
640// SAFETY: All mutation is behind `parking_lot::RwLock`.
641unsafe impl Send for ShardedCache {}
642unsafe impl Sync for ShardedCache {}
643
644#[cfg(test)]
645mod tests {
646 use super::*;
647 use serde::{Deserialize, Serialize};
648 use sqry_core::graph::unified::concurrent::CodeGraph;
649
650 use sqry_core::graph::unified::file::id::FileId;
651
652 use crate::query::QueryKey;
653
654 // ---------------------------------------------------------------------------
655 // Helpers
656 // ---------------------------------------------------------------------------
657
658 fn empty_snapshot() -> Arc<sqry_core::graph::unified::concurrent::GraphSnapshot> {
659 Arc::new(CodeGraph::new().snapshot())
660 }
661
662 // Test query: persistent, with serialisable key + value.
663 struct PersistentTestQuery;
664
665 #[derive(Serialize, Deserialize, Hash, Eq, PartialEq, Clone)]
666 struct PersistentTestKey(u32);
667
668 impl DerivedQuery for PersistentTestQuery {
669 type Key = PersistentTestKey;
670 type Value = Vec<u8>;
671 const QUERY_TYPE_ID: u32 = 0xF100;
672 const PERSISTENT: bool = true;
673
674 fn execute(
675 _key: &Self::Key,
676 _db: &crate::QueryDb,
677 _snapshot: &sqry_core::graph::unified::concurrent::GraphSnapshot,
678 ) -> Self::Value {
679 vec![]
680 }
681 }
682
683 // Test query: non-persistent.
684 struct NonPersistentTestQuery;
685
686 #[derive(Serialize, Deserialize, Hash, Eq, PartialEq, Clone)]
687 struct NonPersistentTestKey(u32);
688
689 impl DerivedQuery for NonPersistentTestQuery {
690 type Key = NonPersistentTestKey;
691 type Value = String;
692 const QUERY_TYPE_ID: u32 = 0xF101;
693 const PERSISTENT: bool = false;
694
695 fn execute(
696 key: &Self::Key,
697 _db: &crate::QueryDb,
698 _snapshot: &sqry_core::graph::unified::concurrent::GraphSnapshot,
699 ) -> Self::Value {
700 format!("result_{}", key.0)
701 }
702 }
703
704 // ---------------------------------------------------------------------------
705 // Original tests (preserved, using bare CachedResult::new)
706 // ---------------------------------------------------------------------------
707
708 #[test]
709 fn sharded_cache_basic_ops() {
710 let cache = ShardedCache::new(4);
711 assert_eq!(cache.shard_count(), 4);
712 assert_eq!(cache.total_entries(), 0);
713
714 let key = QueryKey::from_raw(42, 0);
715 let result = CachedResult::new(vec![1u32, 2, 3], SmallVec::new(), None, None);
716
717 cache.insert(0, key.clone(), result);
718 assert_eq!(cache.total_entries(), 1);
719
720 let val: Option<Vec<u32>> = cache.get_if_valid(0, &key, |_| true);
721 assert_eq!(val, Some(vec![1u32, 2, 3]));
722
723 assert!(cache.remove(0, &key));
724 assert_eq!(cache.total_entries(), 0);
725 }
726
727 #[test]
728 fn sharded_cache_validation_rejects() {
729 let cache = ShardedCache::new(4);
730 let key = QueryKey::from_raw(1, 0);
731 cache.insert(
732 0,
733 key.clone(),
734 CachedResult::new(42u32, SmallVec::new(), None, None),
735 );
736
737 // Validation fails — should return None
738 let val: Option<u32> = cache.get_if_valid(0, &key, |_| false);
739 assert!(val.is_none());
740
741 // Validation passes
742 let val: Option<u32> = cache.get_if_valid(0, &key, |_| true);
743 assert_eq!(val, Some(42));
744 }
745
746 #[test]
747 fn sharded_cache_clear_all() {
748 let cache = ShardedCache::new(4);
749 for i in 0..4 {
750 let key = QueryKey::from_raw(i as u64, 0);
751 cache.insert(i, key, CachedResult::new(i, SmallVec::new(), None, None));
752 }
753 assert_eq!(cache.total_entries(), 4);
754 cache.clear_all();
755 assert_eq!(cache.total_entries(), 0);
756 }
757
758 #[test]
759 fn cached_result_validates_file_deps() {
760 let mut store = crate::input::FileInputStore::new();
761 store.insert(
762 FileId::new(1),
763 crate::input::FileInput::new(Default::default()),
764 );
765 store.insert(
766 FileId::new(2),
767 crate::input::FileInput::new(Default::default()),
768 );
769
770 let mut deps: SmallVec<[FileDep; 8]> = SmallVec::new();
771 deps.push((FileId::new(1), 1)); // matches initial revision
772 deps.push((FileId::new(2), 1));
773
774 let result = CachedResult::new(42u32, deps, None, None);
775 assert!(result.validate_file_deps(&store));
776
777 // Bump file 1's revision
778 store
779 .get_mut(FileId::new(1))
780 .unwrap()
781 .update(Default::default());
782 assert!(
783 !result.validate_file_deps(&store),
784 "should invalidate after revision bump"
785 );
786 }
787
788 #[test]
789 #[should_panic(expected = "is_power_of_two")]
790 fn sharded_cache_rejects_non_power_of_two() {
791 let _ = ShardedCache::new(3);
792 }
793
794 #[test]
795 fn shard_entry_counts() {
796 let cache = ShardedCache::new(4);
797 cache.insert(
798 0,
799 QueryKey::from_raw(1, 0),
800 CachedResult::new(1u32, SmallVec::new(), None, None),
801 );
802 cache.insert(
803 0,
804 QueryKey::from_raw(2, 0),
805 CachedResult::new(2u32, SmallVec::new(), None, None),
806 );
807 cache.insert(
808 2,
809 QueryKey::from_raw(3, 0),
810 CachedResult::new(3u32, SmallVec::new(), None, None),
811 );
812
813 let counts = cache.shard_entry_counts();
814 assert_eq!(counts, vec![2, 0, 1, 0]);
815 }
816
817 // ---------------------------------------------------------------------------
818 // New tests: insert_query (typed, raw-byte retention)
819 // ---------------------------------------------------------------------------
820
821 fn default_config() -> QueryDbConfig {
822 QueryDbConfig::default()
823 }
824
825 /// CachedResult::new leaves raw bytes empty and persistent=false.
826 #[test]
827 fn cached_result_new_has_empty_raw_bytes() {
828 let r = CachedResult::new(42u32, SmallVec::new(), None, None);
829 assert!(r.raw_key_bytes().is_empty());
830 assert!(r.raw_result_bytes().is_empty());
831 assert_eq!(r.query_type_id(), 0);
832 assert!(!r.persistent());
833 }
834
/// Inserting a persistent query keeps the typed value readable and makes the
/// entry visible to `iter_persistent`, carrying its query type id together
/// with non-empty raw key and result bytes.
#[test]
fn insert_query_persistent_stores_raw_bytes() {
    use std::hash::{Hash, Hasher};

    let cache = ShardedCache::new(4);
    let config = default_config();

    let input = PersistentTestKey(7);
    let payload: Vec<u8> = vec![0xDE, 0xAD, 0xBE, 0xEF];
    let cache_key = QueryKey::new::<PersistentTestQuery>(&input);

    // Hash the query's `TypeId` and mask it down to the four shards created above.
    let mut hasher = std::collections::hash_map::DefaultHasher::new();
    std::any::TypeId::of::<PersistentTestQuery>().hash(&mut hasher);
    let shard_idx = (hasher.finish() & 3) as usize;

    let outcome = cache.insert_query::<PersistentTestQuery>(
        shard_idx,
        cache_key.clone(),
        &input,
        payload.clone(),
        SmallVec::new(),
        None,
        None,
        &config,
    );
    outcome.expect("insert_query should not fail");

    // The typed value can still be read back from the cache.
    let fetched: Option<Vec<u8>> = cache.get_if_valid(shard_idx, &cache_key, |_| true);
    assert_eq!(fetched, Some(payload));

    // Exactly one entry is exposed for persistence, with its raw bytes filled in.
    let entries: Vec<_> = cache.iter_persistent().collect();
    assert_eq!(entries.len(), 1);
    let only = &entries[0];
    assert_eq!(only.query_type_id, 0xF100);
    assert!(!only.raw_key_bytes.is_empty());
    assert!(!only.raw_result_bytes.is_empty());
}
877
/// An entry larger than the configured maximum is soft-skipped: the insert
/// still returns `Ok`, a later lookup misses, and `iter_persistent` yields
/// nothing.
#[test]
fn insert_query_oversize_entry_skipped() {
    use std::hash::{Hash, Hasher};

    let cache = ShardedCache::new(4);
    // Cap entries at 1024 bytes; the payload below is 2048 bytes before the
    // serialisation header is even added, so it is over the limit.
    let config = QueryDbConfig::builder().max_entry_size_bytes(1024).build();

    let input = PersistentTestKey(99);
    let payload: Vec<u8> = vec![0xAB_u8; 2048];
    let cache_key = QueryKey::new::<PersistentTestQuery>(&input);

    // Hash the query's `TypeId` and mask it down to the four shards created above.
    let mut hasher = std::collections::hash_map::DefaultHasher::new();
    std::any::TypeId::of::<PersistentTestQuery>().hash(&mut hasher);
    let shard_idx = (hasher.finish() & 3) as usize;

    let outcome = cache.insert_query::<PersistentTestQuery>(
        shard_idx,
        cache_key.clone(),
        &input,
        payload,
        SmallVec::new(),
        None,
        None,
        &config,
    );
    outcome.expect("oversize soft-skip must still return Ok");

    // Nothing was stored, so the lookup is a cache miss.
    let fetched: Option<Vec<u8>> = cache.get_if_valid(shard_idx, &cache_key, |_| true);
    assert!(
        fetched.is_none(),
        "oversize entry must not be present in cache"
    );

    // The skipped entry must not surface for persistence either.
    let entries: Vec<_> = cache.iter_persistent().collect();
    assert!(
        entries.is_empty(),
        "oversize entry must not appear in iter_persistent"
    );
}
922
/// A non-persistent query is cached and readable as a typed value, but the
/// stored entry has empty raw key/result bytes, reports `persistent() ==
/// false`, and is never yielded by `iter_persistent`.
///
/// The stored entry is inspected directly through the shard map, after which
/// the `iter_persistent` output is checked to be empty.
#[test]
fn insert_query_non_persistent_invisible_to_iter_persistent() {
    use std::hash::{Hash, Hasher};

    let cache = ShardedCache::new(4);
    let config = default_config();

    let input = NonPersistentTestKey(42);
    let text = "hello".to_owned();
    let cache_key = QueryKey::new::<NonPersistentTestQuery>(&input);

    // Hash the query's `TypeId` and mask it down to the four shards created above.
    let mut hasher = std::collections::hash_map::DefaultHasher::new();
    std::any::TypeId::of::<NonPersistentTestQuery>().hash(&mut hasher);
    let shard_idx = (hasher.finish() & 3) as usize;

    let outcome = cache.insert_query::<NonPersistentTestQuery>(
        shard_idx,
        cache_key.clone(),
        &input,
        text.clone(),
        SmallVec::new(),
        None,
        None,
        &config,
    );
    outcome.expect("non-persistent insert must succeed");

    // The typed value can be read back from the cache.
    let fetched: Option<String> = cache.get_if_valid(shard_idx, &cache_key, |_| true);
    assert_eq!(fetched, Some(text));

    // Look at the stored entry itself; the read guard is confined to this
    // scope so it is released before `iter_persistent` runs.
    {
        let guard = cache.shards[shard_idx].read();
        let stored = guard.get(&cache_key).expect("entry must be present");
        assert!(
            stored.raw_key_bytes().is_empty(),
            "non-persistent entry must have empty raw_key_bytes"
        );
        assert!(
            stored.raw_result_bytes().is_empty(),
            "non-persistent entry must have empty raw_result_bytes"
        );
        assert!(
            !stored.persistent(),
            "PERSISTENT=false must set persistent=false"
        );
    }

    // The entry must be skipped by the persistence iterator.
    let entries: Vec<_> = cache.iter_persistent().collect();
    assert!(
        entries.is_empty(),
        "non-persistent entry must not appear in iter_persistent"
    );
}
985
/// The file dependencies, edge revision, and metadata revision passed to
/// `insert_query` come back unchanged in the `deps` of the entry yielded by
/// `iter_persistent`.
#[test]
fn insert_query_deps_propagated_to_persistable_entry() {
    use std::hash::{Hash, Hasher};

    let cache = ShardedCache::new(4);
    let config = default_config();

    let input = PersistentTestKey(1);
    let payload: Vec<u8> = vec![1, 2, 3];
    let cache_key = QueryKey::new::<PersistentTestQuery>(&input);

    // Hash the query's `TypeId` and mask it down to the four shards created above.
    let mut hasher = std::collections::hash_map::DefaultHasher::new();
    std::any::TypeId::of::<PersistentTestQuery>().hash(&mut hasher);
    let shard_idx = (hasher.finish() & 3) as usize;

    // A single file dependency: file 10 paired with the value 5.
    let tracked_files: SmallVec<[FileDep; 8]> =
        std::iter::once((FileId::new(10), 5)).collect();

    let outcome = cache.insert_query::<PersistentTestQuery>(
        shard_idx,
        cache_key,
        &input,
        payload,
        tracked_files,
        Some(42),
        Some(7),
        &config,
    );
    outcome.expect("insert_query should succeed");

    let entries: Vec<_> = cache.iter_persistent().collect();
    assert_eq!(entries.len(), 1);

    // Every piece of dependency metadata must match what was inserted.
    let recorded = &entries[0].deps;
    assert_eq!(recorded.file_deps.len(), 1);
    assert_eq!(recorded.file_deps[0], (FileId::new(10), 5));
    assert_eq!(recorded.edge_revision, Some(42));
    assert_eq!(recorded.metadata_revision, Some(7));
}
1029
/// With two persistent entries and one non-persistent entry in the cache,
/// `iter_persistent` yields exactly the two persistent ones.
#[test]
fn iter_persistent_counts_correctly() {
    /// Hashes the `TypeId` of `Q` and masks it down to one of four shards.
    fn shard_of<Q: 'static>() -> usize {
        use std::hash::{Hash, Hasher};
        let mut hasher = std::collections::hash_map::DefaultHasher::new();
        std::any::TypeId::of::<Q>().hash(&mut hasher);
        (hasher.finish() & 3) as usize
    }

    let cache = ShardedCache::new(4);
    let config = default_config();

    // Two persistent entries with distinct keys, both in the same shard.
    let persistent_shard = shard_of::<PersistentTestQuery>();
    for (id, byte) in [(1, 1u8), (2, 2u8)] {
        let input = PersistentTestKey(id);
        let cache_key = QueryKey::new::<PersistentTestQuery>(&input);
        cache
            .insert_query::<PersistentTestQuery>(
                persistent_shard,
                cache_key,
                &input,
                vec![byte],
                SmallVec::new(),
                None,
                None,
                &config,
            )
            .unwrap();
    }

    // One non-persistent entry that the iterator must ignore.
    let skipped_input = NonPersistentTestKey(3);
    let skipped_key = QueryKey::new::<NonPersistentTestQuery>(&skipped_input);
    cache
        .insert_query::<NonPersistentTestQuery>(
            shard_of::<NonPersistentTestQuery>(),
            skipped_key,
            &skipped_input,
            "skip".to_owned(),
            SmallVec::new(),
            None,
            None,
            &config,
        )
        .unwrap();

    let yielded = cache.iter_persistent().count();
    assert_eq!(yielded, 2, "only the two persistent entries should appear");
}
1102
/// Calls the `empty_snapshot` test helper so that it stays referenced and
/// does not trigger an unused-import warning.
#[test]
fn empty_snapshot_compiles() {
    let _snapshot = empty_snapshot();
}
1108}