ruvector_graph/executor/
cache.rs

1//! Query result caching for performance optimization
2//!
3//! Implements LRU cache with TTL support
4
5use crate::executor::pipeline::RowBatch;
6use std::collections::HashMap;
7use std::sync::{Arc, RwLock};
8use std::time::{Duration, Instant};
9
10/// Cache configuration
/// Tunable limits for the query cache.
#[derive(Debug, Clone)]
pub struct CacheConfig {
    /// Maximum number of cached entries
    pub max_entries: usize,
    /// Maximum memory usage in bytes
    pub max_memory_bytes: usize,
    /// Time-to-live for cache entries in seconds
    pub ttl_seconds: u64,
}

impl CacheConfig {
    /// Build a config from explicit entry, memory, and TTL limits.
    pub fn new(max_entries: usize, max_memory_bytes: usize, ttl_seconds: u64) -> Self {
        CacheConfig {
            max_entries,
            max_memory_bytes,
            ttl_seconds,
        }
    }
}

impl Default for CacheConfig {
    /// Defaults: 1000 entries, 100 MiB, 5-minute TTL.
    fn default() -> Self {
        Self::new(1000, 100 * 1024 * 1024, 300)
    }
}
41
42/// Cache entry with metadata
43#[derive(Debug, Clone)]
44pub struct CacheEntry {
45    /// Cached query results
46    pub results: Vec<RowBatch>,
47    /// Entry creation time
48    pub created_at: Instant,
49    /// Last access time
50    pub last_accessed: Instant,
51    /// Estimated memory size in bytes
52    pub size_bytes: usize,
53    /// Access count
54    pub access_count: u64,
55}
56
57impl CacheEntry {
58    /// Create new cache entry
59    pub fn new(results: Vec<RowBatch>) -> Self {
60        let size_bytes = Self::estimate_size(&results);
61        let now = Instant::now();
62
63        Self {
64            results,
65            created_at: now,
66            last_accessed: now,
67            size_bytes,
68            access_count: 0,
69        }
70    }
71
72    /// Estimate memory size of results
73    fn estimate_size(results: &[RowBatch]) -> usize {
74        results
75            .iter()
76            .map(|batch| {
77                // Rough estimate: 8 bytes per value + overhead
78                batch.len() * batch.schema.columns.len() * 8 + 1024
79            })
80            .sum()
81    }
82
83    /// Check if entry is expired
84    pub fn is_expired(&self, ttl: Duration) -> bool {
85        self.created_at.elapsed() > ttl
86    }
87
88    /// Update access metadata
89    pub fn mark_accessed(&mut self) {
90        self.last_accessed = Instant::now();
91        self.access_count += 1;
92    }
93}
94
95/// LRU cache for query results
96pub struct QueryCache {
97    /// Cache storage
98    entries: Arc<RwLock<HashMap<String, CacheEntry>>>,
99    /// LRU tracking
100    lru_order: Arc<RwLock<Vec<String>>>,
101    /// Configuration
102    config: CacheConfig,
103    /// Current memory usage
104    memory_used: Arc<RwLock<usize>>,
105    /// Cache statistics
106    stats: Arc<RwLock<CacheStats>>,
107}
108
109impl QueryCache {
110    /// Create a new query cache
111    pub fn new(config: CacheConfig) -> Self {
112        Self {
113            entries: Arc::new(RwLock::new(HashMap::new())),
114            lru_order: Arc::new(RwLock::new(Vec::new())),
115            config,
116            memory_used: Arc::new(RwLock::new(0)),
117            stats: Arc::new(RwLock::new(CacheStats::default())),
118        }
119    }
120
121    /// Get cached results
122    pub fn get(&self, key: &str) -> Option<CacheEntry> {
123        let mut entries = self.entries.write().ok()?;
124        let mut lru = self.lru_order.write().ok()?;
125        let mut stats = self.stats.write().ok()?;
126
127        if let Some(entry) = entries.get_mut(key) {
128            // Check if expired
129            if entry.is_expired(Duration::from_secs(self.config.ttl_seconds)) {
130                stats.misses += 1;
131                return None;
132            }
133
134            // Update LRU order
135            if let Some(pos) = lru.iter().position(|k| k == key) {
136                lru.remove(pos);
137            }
138            lru.push(key.to_string());
139
140            // Update access metadata
141            entry.mark_accessed();
142            stats.hits += 1;
143
144            Some(entry.clone())
145        } else {
146            stats.misses += 1;
147            None
148        }
149    }
150
151    /// Insert results into cache
152    pub fn insert(&self, key: String, results: Vec<RowBatch>) {
153        let entry = CacheEntry::new(results);
154        let entry_size = entry.size_bytes;
155
156        let mut entries = self.entries.write().unwrap();
157        let mut lru = self.lru_order.write().unwrap();
158        let mut memory = self.memory_used.write().unwrap();
159        let mut stats = self.stats.write().unwrap();
160
161        // Evict if necessary
162        while (entries.len() >= self.config.max_entries
163            || *memory + entry_size > self.config.max_memory_bytes)
164            && !lru.is_empty()
165        {
166            if let Some(old_key) = lru.first().cloned() {
167                if let Some(old_entry) = entries.remove(&old_key) {
168                    *memory = memory.saturating_sub(old_entry.size_bytes);
169                    stats.evictions += 1;
170                }
171                lru.remove(0);
172            }
173        }
174
175        // Insert new entry
176        entries.insert(key.clone(), entry);
177        lru.push(key);
178        *memory += entry_size;
179        stats.inserts += 1;
180    }
181
182    /// Remove entry from cache
183    pub fn remove(&self, key: &str) -> bool {
184        let mut entries = self.entries.write().unwrap();
185        let mut lru = self.lru_order.write().unwrap();
186        let mut memory = self.memory_used.write().unwrap();
187
188        if let Some(entry) = entries.remove(key) {
189            *memory = memory.saturating_sub(entry.size_bytes);
190            if let Some(pos) = lru.iter().position(|k| k == key) {
191                lru.remove(pos);
192            }
193            true
194        } else {
195            false
196        }
197    }
198
199    /// Clear all cache entries
200    pub fn clear(&self) {
201        let mut entries = self.entries.write().unwrap();
202        let mut lru = self.lru_order.write().unwrap();
203        let mut memory = self.memory_used.write().unwrap();
204
205        entries.clear();
206        lru.clear();
207        *memory = 0;
208    }
209
210    /// Get cache statistics
211    pub fn stats(&self) -> CacheStats {
212        self.stats.read().unwrap().clone()
213    }
214
215    /// Get current memory usage
216    pub fn memory_used(&self) -> usize {
217        *self.memory_used.read().unwrap()
218    }
219
220    /// Get number of cached entries
221    pub fn len(&self) -> usize {
222        self.entries.read().unwrap().len()
223    }
224
225    /// Check if cache is empty
226    pub fn is_empty(&self) -> bool {
227        self.entries.read().unwrap().is_empty()
228    }
229
230    /// Clean expired entries
231    pub fn clean_expired(&self) {
232        let ttl = Duration::from_secs(self.config.ttl_seconds);
233        let mut entries = self.entries.write().unwrap();
234        let mut lru = self.lru_order.write().unwrap();
235        let mut memory = self.memory_used.write().unwrap();
236        let mut stats = self.stats.write().unwrap();
237
238        let expired_keys: Vec<_> = entries
239            .iter()
240            .filter(|(_, entry)| entry.is_expired(ttl))
241            .map(|(key, _)| key.clone())
242            .collect();
243
244        for key in expired_keys {
245            if let Some(entry) = entries.remove(&key) {
246                *memory = memory.saturating_sub(entry.size_bytes);
247                if let Some(pos) = lru.iter().position(|k| k == &key) {
248                    lru.remove(pos);
249                }
250                stats.evictions += 1;
251            }
252        }
253    }
254}
255
256/// Cache statistics
/// Running counters describing cache effectiveness.
#[derive(Debug, Clone, Default)]
pub struct CacheStats {
    /// Number of cache hits
    pub hits: u64,
    /// Number of cache misses
    pub misses: u64,
    /// Number of insertions
    pub inserts: u64,
    /// Number of evictions
    pub evictions: u64,
}

impl CacheStats {
    /// Fraction of lookups that hit; 0.0 when no lookups were recorded.
    pub fn hit_rate(&self) -> f64 {
        match self.hits + self.misses {
            0 => 0.0,
            total => self.hits as f64 / total as f64,
        }
    }

    /// Zero out every counter.
    pub fn reset(&mut self) {
        *self = Self::default();
    }
}
288
#[cfg(test)]
mod tests {
    use super::*;
    use crate::executor::plan::{ColumnDef, DataType, QuerySchema};

    /// Build an empty single-column (`id: Int64`) batch for cache tests.
    fn create_test_batch() -> RowBatch {
        let columns = vec![ColumnDef {
            name: "id".to_string(),
            data_type: DataType::Int64,
            nullable: false,
        }];
        RowBatch::new(QuerySchema::new(columns))
    }

    #[test]
    fn test_cache_insert_and_get() {
        let cache = QueryCache::new(CacheConfig::default());

        cache.insert("test_key".to_string(), vec![create_test_batch()]);

        assert_eq!(cache.len(), 1);
        assert!(cache.get("test_key").is_some());
    }

    #[test]
    fn test_cache_miss() {
        let cache = QueryCache::new(CacheConfig::default());

        assert!(cache.get("nonexistent").is_none());
        assert_eq!(cache.stats().misses, 1);
    }

    #[test]
    fn test_cache_eviction() {
        // Capacity of two entries forces the third insert to evict.
        let cache = QueryCache::new(CacheConfig::new(2, 1024 * 1024, 300));

        for key in ["key1", "key2", "key3"] {
            cache.insert(key.to_string(), vec![create_test_batch()]);
        }

        // The coldest entry (key1) must be gone.
        assert_eq!(cache.len(), 2);
        assert!(cache.get("key1").is_none());
    }

    #[test]
    fn test_cache_clear() {
        let cache = QueryCache::new(CacheConfig::default());

        cache.insert("key1".to_string(), vec![create_test_batch()]);
        cache.insert("key2".to_string(), vec![create_test_batch()]);
        cache.clear();

        assert_eq!(cache.len(), 0);
        assert_eq!(cache.memory_used(), 0);
    }

    #[test]
    fn test_hit_rate() {
        let stats = CacheStats {
            hits: 7,
            misses: 3,
            ..Default::default()
        };

        assert!((stats.hit_rate() - 0.7).abs() < 0.001);
    }
}