1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
//! # Query Cache Implementation
//!
//! LRU cache with TTL for query results to improve retrieval performance.
//! Target: 2-3x speedup for repeated queries with ≥40% cache hit rate.
//!
//! This module was split from retrieval/cache.rs to meet 500 LOC compliance.
use crate::episode::Episode;
use crate::retrieval::cache::types::{
CacheKey, CacheMetrics, CachedResult, DEFAULT_CACHE_TTL, DEFAULT_MAX_ENTRIES,
};
use lru::LruCache;
use std::collections::{HashMap, HashSet};
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::sync::RwLock;
use std::time::{Duration, Instant};
/// Query cache with LRU eviction and TTL
///
/// All fields are wrapped in `Arc<RwLock<_>>` so the cache can be shared
/// across threads; each method acquires only the locks it needs.
pub struct QueryCache {
    /// LRU cache storage, keyed by the hash of a `CacheKey`
    cache: Arc<RwLock<LruCache<u64, CachedResult>>>,
    /// Domain index: maps domain -> set of cache key hashes (`Arc<str>` avoids cloning)
    domain_index: Arc<RwLock<HashMap<Arc<str>, HashSet<u64>>>>,
    /// Lazy invalidation: set of cache key hashes marked for removal
    /// Entries are not removed immediately, but filtered on access
    invalidated_hashes: Arc<RwLock<HashSet<u64>>>,
    /// Cache metrics (hits, misses, evictions, invalidations, size, capacity)
    metrics: Arc<RwLock<CacheMetrics>>,
    /// Default TTL for new entries; applied by `put()`
    default_ttl: Duration,
    /// Maximum number of entries (clamped to at least 1 at construction)
    max_entries: usize,
}
impl QueryCache {
    /// Create a new query cache with default settings
    ///
    /// Uses [`DEFAULT_MAX_ENTRIES`] and [`DEFAULT_CACHE_TTL`].
    #[must_use]
    pub fn new() -> Self {
        Self::with_capacity_and_ttl(DEFAULT_MAX_ENTRIES, DEFAULT_CACHE_TTL)
    }

    /// Create a new query cache with custom capacity and TTL
    ///
    /// A `capacity` of 0 is clamped to 1 so the `NonZeroUsize` required by
    /// `LruCache::new` can always be constructed.
    #[must_use]
    pub fn with_capacity_and_ttl(capacity: usize, ttl: Duration) -> Self {
        // Ensure capacity is at least 1 to create NonZeroUsize
        let safe_capacity = capacity.max(1);
        let cache = LruCache::new(
            NonZeroUsize::new(safe_capacity)
                .expect("QueryCache: capacity is guaranteed to be non-zero after max(1)"),
        );
        let metrics = CacheMetrics {
            capacity: safe_capacity,
            ..Default::default()
        };
        Self {
            cache: Arc::new(RwLock::new(cache)),
            domain_index: Arc::new(RwLock::new(HashMap::new())),
            invalidated_hashes: Arc::new(RwLock::new(HashSet::new())),
            metrics: Arc::new(RwLock::new(metrics)),
            default_ttl: ttl,
            max_entries: safe_capacity,
        }
    }

    /// Get a cached query result
    ///
    /// Counts a miss when the key is absent, expired, or marked for lazy
    /// invalidation. Expired and invalidated entries are physically removed
    /// on access so their LRU slots are reclaimed.
    #[must_use]
    pub fn get(&self, key: &CacheKey) -> Option<Vec<Arc<Episode>>> {
        let key_hash = key.compute_hash();
        // Fast path: check the lazy-invalidation mark under a read lock so
        // the common (not invalidated) case stays concurrent.
        let marked = {
            let invalidated = self.invalidated_hashes.read().expect(
                "QueryCache: invalidated_hashes lock poisoned - this indicates a panic in invalidation tracking",
            );
            invalidated.contains(&key_hash)
        };
        // Fix: previously an invalidated entry stayed in both the cache and
        // the invalidation set forever, permanently consuming capacity. We
        // now clear the mark here and pop the stale entry below. `remove`
        // re-checks membership, so a concurrent clear is handled safely.
        //
        // Lock ordering: both invalidated_hashes guards are released before
        // the cache lock is taken, because invalidate_all() acquires the
        // cache lock first; holding them in the opposite order could deadlock.
        let was_invalidated = marked && {
            let mut invalidated = self.invalidated_hashes.write().expect(
                "QueryCache: invalidated_hashes lock poisoned - this indicates a panic in invalidation tracking",
            );
            invalidated.remove(&key_hash)
        };
        let mut cache = self
            .cache
            .write()
            .expect("QueryCache: cache lock poisoned - this indicates a panic in cache code");
        let mut metrics = self.metrics.write().expect(
            "QueryCache: metrics lock poisoned - this indicates a panic in metrics tracking",
        );
        if was_invalidated {
            // Reclaim the slot held by the stale entry and report a miss.
            cache.pop(&key_hash);
            metrics.misses += 1;
            metrics.size = cache.len();
            return None;
        }
        // Check if entry exists and is not expired
        if let Some(result) = cache.get(&key_hash) {
            if result.is_expired() {
                // Remove expired entry
                cache.pop(&key_hash);
                metrics.misses += 1;
                metrics.evictions += 1;
                metrics.size = cache.len();
                return None;
            }
            // Cache hit - clone the Arc pointers from the slice (cheap:
            // refcount bumps only, no Episode is copied)
            metrics.hits += 1;
            Some(result.episodes.to_vec())
        } else {
            // Cache miss
            metrics.misses += 1;
            metrics.size = cache.len();
            None
        }
    }

    /// Store a query result in the cache
    ///
    /// Overwrites any existing entry for the same key and clears a pending
    /// lazy-invalidation mark so the fresh data is immediately visible.
    pub fn put(&self, key: CacheKey, episodes: Vec<Arc<Episode>>) {
        let key_hash = key.compute_hash();
        // Convert Vec<Arc<Episode>> to Arc<[Arc<Episode>]>
        // This is zero-copy for the episodes themselves
        let episodes_slice: Arc<[Arc<Episode>]> = episodes.into();
        let cached_result = CachedResult {
            episodes: episodes_slice,
            cached_at: Instant::now(),
            ttl: self.default_ttl,
        };
        let mut cache = self
            .cache
            .write()
            .expect("QueryCache: cache lock poisoned - this indicates a panic in cache code");
        // Check if this is an update to an existing entry
        let was_present = cache.contains(&key_hash);
        // Check if cache is at capacity before adding (for eviction tracking)
        let was_at_capacity = cache.len() >= self.max_entries;
        // Add to cache (LruCache automatically evicts oldest if at capacity)
        cache.put(key_hash, cached_result);
        // Fix: if this key was previously marked for lazy invalidation, the
        // mark must be cleared - otherwise get() would keep returning None
        // for the freshly cached entry. Lock order (cache, then
        // invalidated_hashes) matches invalidate_all(), so no deadlock.
        {
            let mut invalidated = self.invalidated_hashes.write().expect(
                "QueryCache: invalidated_hashes lock poisoned - this indicates a panic in invalidation tracking",
            );
            invalidated.remove(&key_hash);
        }
        // Update domain index if domain is specified
        if let Some(ref domain) = key.domain {
            let mut domain_index = self.domain_index.write().expect(
                "QueryCache: domain_index lock poisoned - this indicates a panic in domain tracking",
            );
            // Arc<str> clone is cheap (just ref count increment)
            domain_index
                .entry(Arc::clone(domain))
                .or_default()
                .insert(key_hash);
        }
        // Update metrics
        let mut metrics = self.metrics.write().expect(
            "QueryCache: metrics lock poisoned - this indicates a panic in metrics tracking",
        );
        metrics.size = cache.len();
        // If this was an update (not a new entry), don't count as eviction
        if was_present {
            return;
        }
        // If cache was at capacity and we added a new entry, an eviction occurred.
        // NOTE(review): the LRU victim's hash is not removed from domain_index
        // (there is no hash -> domain reverse lookup), so that set may retain
        // stale hashes; harmless for lookups but worth confirming for
        // long-running multi-domain workloads.
        if was_at_capacity {
            metrics.evictions += 1;
        }
    }

    /// Invalidate all cached entries (use for cross-domain changes)
    ///
    /// Clears the cache, the domain index, and the lazy-invalidation set,
    /// and adds the number of dropped entries to `metrics.invalidations`.
    pub fn invalidate_all(&self) {
        let mut cache = self
            .cache
            .write()
            .expect("QueryCache: cache lock poisoned - this indicates a panic in cache code");
        let count = cache.len();
        cache.clear();
        // Clear domain index
        let mut domain_index = self.domain_index.write().expect(
            "QueryCache: domain_index lock poisoned - this indicates a panic in domain tracking",
        );
        domain_index.clear();
        // Clear invalidation set
        let mut invalidated = self.invalidated_hashes.write().expect(
            "QueryCache: invalidated_hashes lock poisoned - this indicates a panic in invalidation tracking",
        );
        invalidated.clear();
        // Update metrics
        let mut metrics = self.metrics.write().expect(
            "QueryCache: metrics lock poisoned - this indicates a panic in metrics tracking",
        );
        metrics.size = 0;
        metrics.invalidations += count as u64;
    }

    /// Invalidate entries for a specific domain
    ///
    /// This is more efficient than `invalidate_all()` for multi-domain workloads
    /// because it only clears entries for the specified domain. Entries are
    /// marked lazily and removed the next time `get()` touches them.
    ///
    /// # Arguments
    ///
    /// * `domain` - The domain to invalidate
    pub fn invalidate_domain(&self, domain: &str) {
        // First, check if domain exists and get hashes (read lock)
        let hashes_to_invalidate = {
            let domain_index = self.domain_index.read().expect(
                "QueryCache: domain_index lock poisoned - this indicates a panic in domain tracking",
            );
            domain_index.get(domain).cloned()
        }; // Read lock released here
        if let Some(hashes) = hashes_to_invalidate {
            let count = hashes.len();
            // Mark entries for lazy invalidation
            {
                let mut invalidated = self.invalidated_hashes.write().expect(
                    "QueryCache: invalidated_hashes lock poisoned - this indicates a panic in invalidation tracking",
                );
                invalidated.extend(hashes.iter().copied());
            } // Write lock on invalidated_hashes released here
            // Remove from domain index (now safe to acquire write lock)
            {
                let mut domain_index = self.domain_index.write().expect(
                    "QueryCache: domain_index lock poisoned - this indicates a panic in domain tracking",
                );
                domain_index.remove(domain);
            } // Write lock on domain_index released here
            // Update metrics
            // NOTE(review): `count` may include hashes whose entries were
            // already LRU-evicted, so `invalidations` can slightly overcount.
            {
                let mut metrics = self.metrics.write().expect(
                    "QueryCache: metrics lock poisoned - this indicates a panic in metrics tracking",
                );
                metrics.invalidations += count as u64;
            } // Write lock on metrics released here
        }
        // If domain not found, no-op (already cleared or never existed)
    }

    /// Get current cache metrics
    #[must_use]
    pub fn metrics(&self) -> CacheMetrics {
        let metrics = self.metrics.read().expect(
            "QueryCache: metrics lock poisoned - this indicates a panic in metrics tracking",
        );
        metrics.clone()
    }

    /// Clear all metrics
    ///
    /// Resets every counter to its default while preserving the configured
    /// capacity.
    pub fn clear_metrics(&self) {
        let mut metrics = self.metrics.write().expect(
            "QueryCache: metrics lock poisoned - this indicates a panic in metrics tracking",
        );
        *metrics = CacheMetrics {
            capacity: self.max_entries,
            ..Default::default()
        };
    }

    /// Get cache size (number of entries)
    ///
    /// Note: This returns the physical size of the cache, which may include
    /// entries that are marked for lazy invalidation. These entries will be
    /// filtered out (and removed) when accessed via `get()`.
    #[must_use]
    pub fn size(&self) -> usize {
        self.cache
            .read()
            .expect("QueryCache: cache lock poisoned - this indicates a panic in cache code")
            .len()
    }

    /// Get effective cache size (excluding invalidated entries)
    ///
    /// This returns the logical size of the cache, excluding entries that
    /// are marked for lazy invalidation.
    ///
    /// NOTE(review): this is an estimate - the invalidation set may still
    /// hold hashes whose entries were already LRU-evicted, so the subtraction
    /// can undercount; `saturating_sub` guards against underflow.
    #[must_use]
    pub fn effective_size(&self) -> usize {
        let cache_size = self
            .cache
            .read()
            .expect("QueryCache: cache lock poisoned - this indicates a panic in cache code")
            .len();
        let invalidated_size = self
            .invalidated_hashes
            .read()
            .expect("QueryCache: invalidated_hashes lock poisoned - this indicates a panic in invalidation tracking")
            .len();
        cache_size.saturating_sub(invalidated_size)
    }

    /// Check if cache is empty
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.cache
            .read()
            .expect("QueryCache: cache lock poisoned - this indicates a panic in cache code")
            .is_empty()
    }
}
impl Default for QueryCache {
fn default() -> Self {
Self::new()
}
}