1// SPDX-FileCopyrightText: 2025 Knitli Inc. <knitli@knit.li>
2// SPDX-License-Identifier: AGPL-3.0-or-later
3
4//! Query result caching for Thread pipeline
5//!
6//! This module provides LRU caching for frequently accessed query results,
7//! reducing database round-trips and improving response times.
8//!
9//! ## Features
10//!
11//! - **Async-first**: Built on moka's async cache for tokio compatibility
12//! - **Type-safe**: Generic caching with compile-time type checking
13//! - **TTL support**: Configurable time-to-live for cache entries
14//! - **Statistics**: Track cache hit/miss rates for monitoring
15//! - **Size limits**: Automatic eviction when cache exceeds capacity
16//!
17//! ## Usage
18//!
19//! ```rust,ignore
20//! use thread_flow::cache::{QueryCache, CacheConfig};
21//! use thread_services::conversion::Fingerprint;
22//!
23//! // Create cache with 1000 entry limit, 5 minute TTL
24//! let cache = QueryCache::new(CacheConfig {
25//!     max_capacity: 1000,
26//!     ttl_seconds: 300,
27//! });
28//!
29//! // Cache symbol query results
30//! let fingerprint = compute_content_fingerprint("fn main() {}");
31//! cache.insert(fingerprint, symbols).await;
32//!
33//! // Retrieve from cache
34//! if let Some(symbols) = cache.get(&fingerprint).await {
35//!     // Cache hit - saved D1 query!
36//! }
37//! ```
38//!
39//! ## Performance Impact
40//!
41//! | Scenario | Without Cache | With Cache | Savings |
42//! |----------|---------------|------------|---------|
43//! | Symbol lookup | 50-100ms (D1) | <1µs (memory) | **99.9%** |
44//! | Metadata query | 20-50ms (D1) | <1µs (memory) | **99.9%** |
45//! | Re-analysis (90% hit) | 100ms total | 10ms total | **90%** |
46
47#[cfg(feature = "caching")]
48use moka::future::Cache;
49#[cfg(feature = "caching")]
50use std::hash::Hash;
51#[cfg(feature = "caching")]
52use std::sync::Arc;
53#[cfg(feature = "caching")]
54use std::time::Duration;
55#[cfg(feature = "caching")]
56use tokio::sync::RwLock;
57
/// Tunable parameters for the query result cache.
#[derive(Debug, Clone)]
pub struct CacheConfig {
    /// Upper bound on the number of cached entries before eviction begins.
    pub max_capacity: u64,
    /// Lifetime of each entry, in seconds, before it expires.
    pub ttl_seconds: u64,
}

impl Default for CacheConfig {
    /// Defaults sized for typical workloads: room for 10k entries,
    /// each expiring five minutes after insertion.
    fn default() -> Self {
        CacheConfig {
            max_capacity: 10_000,
            ttl_seconds: 300,
        }
    }
}
75
/// Cache statistics for monitoring.
///
/// Counters are cumulative since creation (or the last reset). The
/// invariant `hits + misses == total_lookups` is maintained by the
/// cache's lookup path, not enforced by this type.
#[derive(Debug, Clone, Default)]
pub struct CacheStats {
    /// Total number of cache lookups.
    pub total_lookups: u64,
    /// Number of lookups that found a live entry.
    pub hits: u64,
    /// Number of lookups that found nothing (or an expired entry).
    pub misses: u64,
}

impl CacheStats {
    /// Cache hit rate as a percentage in `[0.0, 100.0]`.
    ///
    /// Returns `0.0` when no lookups have been recorded yet.
    pub fn hit_rate(&self) -> f64 {
        if self.total_lookups == 0 {
            0.0
        } else {
            (self.hits as f64 / self.total_lookups as f64) * 100.0
        }
    }

    /// Cache miss rate as a percentage in `[0.0, 100.0]`.
    ///
    /// Computed from the miss counter directly rather than as
    /// `100.0 - hit_rate()`, so an empty stats object reports a 0%
    /// miss rate instead of a misleading 100%.
    pub fn miss_rate(&self) -> f64 {
        if self.total_lookups == 0 {
            0.0
        } else {
            (self.misses as f64 / self.total_lookups as f64) * 100.0
        }
    }
}
102
/// Generic query result cache
///
/// Provides LRU caching with TTL for any key-value pair where:
/// - Key: Must be Clone + Hash + Eq + Send + Sync
/// - Value: Must be Clone + Send + Sync
///
/// # Examples
///
/// ```rust,ignore
/// use thread_flow::cache::{QueryCache, CacheConfig};
///
/// // Cache for symbol queries (Fingerprint -> Vec<Symbol>)
/// let symbol_cache = QueryCache::new(CacheConfig::default());
///
/// // Cache for metadata queries (String -> Metadata)
/// let metadata_cache = QueryCache::new(CacheConfig {
///     max_capacity: 5000,
///     ttl_seconds: 600,  // 10 minutes
/// });
/// ```
#[cfg(feature = "caching")]
pub struct QueryCache<K, V> {
    // Underlying moka async cache: size-bounded with per-entry TTL
    // (configured in `QueryCache::new`).
    cache: Cache<K, V>,
    // Hit/miss counters, shared behind an async RwLock so `&self`
    // methods can update them concurrently.
    stats: Arc<RwLock<CacheStats>>,
}
128
#[cfg(feature = "caching")]
impl<K, V> QueryCache<K, V>
where
    K: Hash + Eq + Send + Sync + 'static,
    V: Clone + Send + Sync + 'static,
{
    /// Create a new query cache with the given configuration.
    pub fn new(config: CacheConfig) -> Self {
        let cache = Cache::builder()
            .max_capacity(config.max_capacity)
            .time_to_live(Duration::from_secs(config.ttl_seconds))
            .build();

        Self {
            cache,
            stats: Arc::new(RwLock::new(CacheStats::default())),
        }
    }

    /// Insert a key-value pair into the cache.
    ///
    /// If the key already exists, the value will be updated and TTL reset.
    pub async fn insert(&self, key: K, value: V) {
        self.cache.insert(key, value).await;
    }

    /// Get a value from the cache.
    ///
    /// Returns `None` if the key is not found or has expired.
    /// Updates cache statistics (hit/miss counters).
    pub async fn get(&self, key: &K) -> Option<V>
    where
        K: Clone,
    {
        // Perform the lookup before touching the stats lock. The previous
        // implementation took the write lock first and held it across the
        // cache `.await`, which serialized every concurrent lookup on a
        // single lock. Now the lock is held only for the counter bumps.
        let result = self.cache.get(key).await;

        let mut stats = self.stats.write().await;
        stats.total_lookups += 1;
        if result.is_some() {
            stats.hits += 1;
        } else {
            stats.misses += 1;
        }
        result
    }

    /// Get a value from cache or compute it if missing.
    ///
    /// This is the recommended way to use the cache as it handles
    /// cache misses transparently and updates statistics correctly
    /// (via the internal call to `get`).
    ///
    /// NOTE(review): there is no request coalescing here — concurrent
    /// callers that miss on the same key will each run `f`, and the last
    /// insert wins. Acceptable for idempotent queries; confirm for
    /// side-effecting ones.
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// let symbols = cache.get_or_insert(fingerprint, || async {
    ///     // This closure only runs on cache miss
    ///     query_database_for_symbols(fingerprint).await
    /// }).await;
    /// ```
    pub async fn get_or_insert<F, Fut>(&self, key: K, f: F) -> V
    where
        K: Clone,
        F: FnOnce() -> Fut,
        Fut: std::future::Future<Output = V>,
    {
        // Check cache first (also records the hit/miss in stats)
        if let Some(value) = self.get(&key).await {
            return value;
        }

        // Cache miss: compute, store a clone, and hand the value back.
        let value = f().await;
        self.insert(key, value.clone()).await;
        value
    }

    /// Invalidate (remove) a specific cache entry.
    pub async fn invalidate(&self, key: &K) {
        self.cache.invalidate(key).await;
    }

    /// Clear all cache entries.
    pub async fn clear(&self) {
        self.cache.invalidate_all();
        // Flush pending maintenance so entries are actually removed
        // before this returns.
        self.cache.run_pending_tasks().await;
    }

    /// Get a snapshot of the current cache statistics.
    pub async fn stats(&self) -> CacheStats {
        self.stats.read().await.clone()
    }

    /// Reset cache statistics to zero.
    pub async fn reset_stats(&self) {
        let mut stats = self.stats.write().await;
        *stats = CacheStats::default();
    }

    /// Get the number of entries currently in the cache.
    ///
    /// NOTE(review): moka documents this count as approximate — it can
    /// lag until pending maintenance tasks run. Treat it as an estimate,
    /// not an exact size.
    pub fn entry_count(&self) -> u64 {
        self.cache.entry_count()
    }
}
233
/// No-op cache for when caching feature is disabled
///
/// This provides the same API but doesn't actually cache anything,
/// allowing code to compile with or without the `caching` feature.
#[cfg(not(feature = "caching"))]
pub struct QueryCache<K, V> {
    // Zero-sized marker so the type can carry the same K/V generics as
    // the real cache without storing anything.
    _phantom: std::marker::PhantomData<(K, V)>,
}
242
#[cfg(not(feature = "caching"))]
impl<K, V> QueryCache<K, V> {
    /// Create a no-op cache; the configuration is accepted but ignored.
    pub fn new(_config: CacheConfig) -> Self {
        Self {
            _phantom: std::marker::PhantomData,
        }
    }

    /// Does nothing; the value is dropped.
    pub async fn insert(&self, _key: K, _value: V) {}

    /// Always returns `None` (nothing is ever stored).
    pub async fn get(&self, _key: &K) -> Option<V> {
        None
    }

    /// Always computes: runs `f` on every call since nothing is cached.
    pub async fn get_or_insert<F, Fut>(&self, _key: K, f: F) -> V
    where
        F: FnOnce() -> Fut,
        Fut: std::future::Future<Output = V>,
    {
        f().await
    }

    /// Does nothing; there are no entries to invalidate.
    pub async fn invalidate(&self, _key: &K) {}

    /// Does nothing; there are no entries to clear.
    pub async fn clear(&self) {}

    /// Always returns zeroed statistics (no lookups are ever recorded).
    pub async fn stats(&self) -> CacheStats {
        CacheStats::default()
    }

    /// Does nothing; statistics are always zero.
    pub async fn reset_stats(&self) {}

    /// Always returns 0 (nothing is ever stored).
    pub fn entry_count(&self) -> u64 {
        0
    }
}
279
#[cfg(test)]
mod tests {
    use super::*;

    // Insert/get round trip plus a lookup for a never-inserted key.
    #[tokio::test]
    #[cfg(feature = "caching")]
    async fn test_cache_basic_operations() {
        let cache = QueryCache::new(CacheConfig {
            max_capacity: 100,
            ttl_seconds: 60,
        });

        // Insert and retrieve
        cache.insert("key1".to_string(), "value1".to_string()).await;
        let value = cache.get(&"key1".to_string()).await;
        assert_eq!(value, Some("value1".to_string()));

        // Cache miss
        let missing = cache.get(&"nonexistent".to_string()).await;
        assert_eq!(missing, None);
    }

    // Hit/miss counters and derived hit_rate across a hit then a miss.
    #[tokio::test]
    #[cfg(feature = "caching")]
    async fn test_cache_statistics() {
        let cache = QueryCache::new(CacheConfig::default());

        // Initial stats
        let stats = cache.stats().await;
        assert_eq!(stats.total_lookups, 0);
        assert_eq!(stats.hits, 0);
        assert_eq!(stats.misses, 0);

        // Insert and hit
        cache.insert(1, "one".to_string()).await;
        let _ = cache.get(&1).await;

        let stats = cache.stats().await;
        assert_eq!(stats.total_lookups, 1);
        assert_eq!(stats.hits, 1);
        assert_eq!(stats.hit_rate(), 100.0);

        // Miss
        let _ = cache.get(&2).await;

        let stats = cache.stats().await;
        assert_eq!(stats.total_lookups, 2);
        assert_eq!(stats.hits, 1);
        assert_eq!(stats.misses, 1);
        assert_eq!(stats.hit_rate(), 50.0);
    }

    // The compute closure must run exactly once: on the first (miss)
    // call but not on the second (hit) call for the same key.
    #[tokio::test]
    #[cfg(feature = "caching")]
    async fn test_get_or_insert() {
        let cache = QueryCache::new(CacheConfig::default());

        let mut call_count = 0;

        // First call - cache miss, should execute closure
        let value1 = cache
            .get_or_insert(1, || async {
                call_count += 1;
                "computed".to_string()
            })
            .await;

        assert_eq!(value1, "computed");
        assert_eq!(call_count, 1);

        // Second call - cache hit, should NOT execute closure
        let value2 = cache
            .get_or_insert(1, || async {
                call_count += 1;
                "should_not_be_called".to_string()
            })
            .await;

        assert_eq!(value2, "computed");
        assert_eq!(call_count, 1); // Closure not called on cache hit

        let stats = cache.stats().await;
        assert_eq!(stats.hits, 1);
        assert_eq!(stats.misses, 1);
    }

    // invalidate() removes exactly the targeted entry.
    #[tokio::test]
    #[cfg(feature = "caching")]
    async fn test_cache_invalidation() {
        let cache = QueryCache::new(CacheConfig::default());

        cache.insert("key", "value".to_string()).await;
        assert!(cache.get(&"key").await.is_some());

        cache.invalidate(&"key").await;
        assert!(cache.get(&"key").await.is_none());
    }

    // clear() removes every entry (and waits for pending maintenance).
    #[tokio::test]
    #[cfg(feature = "caching")]
    async fn test_cache_clear() {
        let cache = QueryCache::new(CacheConfig::default());

        cache.insert(1, "one".to_string()).await;
        cache.insert(2, "two".to_string()).await;
        cache.insert(3, "three".to_string()).await;

        // Verify entries exist
        assert!(cache.get(&1).await.is_some());
        assert!(cache.get(&2).await.is_some());
        assert!(cache.get(&3).await.is_some());

        cache.clear().await;

        // Verify entries are gone after clear
        assert!(cache.get(&1).await.is_none());
        assert!(cache.get(&2).await.is_none());
        assert!(cache.get(&3).await.is_none());
    }

    // The feature-disabled stub: stores nothing, always recomputes,
    // and reports empty statistics.
    #[tokio::test]
    #[cfg(not(feature = "caching"))]
    async fn test_no_op_cache() {
        let cache = QueryCache::new(CacheConfig::default());

        // Insert does nothing
        cache.insert("key", "value".to_string()).await;

        // Get always returns None
        assert_eq!(cache.get(&"key").await, None);

        // get_or_insert always computes
        let value = cache
            .get_or_insert("key", || async { "computed".to_string() })
            .await;
        assert_eq!(value, "computed");

        // Stats are always empty
        let stats = cache.stats().await;
        assert_eq!(stats.total_lookups, 0);
        assert_eq!(cache.entry_count(), 0);
    }
}