// ruvector_graph/executor/cache.rs

//! Query result cache for the executor: entries are bounded by count and by an
//! approximate memory budget, evicted in LRU order, and expired by TTL.

use crate::executor::pipeline::RowBatch;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};

/// Configuration for the query result cache.
#[derive(Debug, Clone)]
pub struct CacheConfig {
    /// Maximum number of cached entries.
    pub max_entries: usize,
    /// Maximum total memory the cache may hold, in bytes.
    pub max_memory_bytes: usize,
    /// Time-to-live for each entry, in seconds.
    pub ttl_seconds: u64,
}

impl CacheConfig {
    pub fn new(max_entries: usize, max_memory_bytes: usize, ttl_seconds: u64) -> Self {
        Self {
            max_entries,
            max_memory_bytes,
            ttl_seconds,
        }
    }
}

impl Default for CacheConfig {
    fn default() -> Self {
        Self {
            max_entries: 1000,
            max_memory_bytes: 100 * 1024 * 1024,
            ttl_seconds: 300,
        }
    }
}

/// A single cached query result together with its bookkeeping metadata.
#[derive(Debug, Clone)]
pub struct CacheEntry {
    /// The cached result batches.
    pub results: Vec<RowBatch>,
    /// When the entry was created.
    pub created_at: Instant,
    /// When the entry was last read.
    pub last_accessed: Instant,
    /// Estimated size of the entry, in bytes.
    pub size_bytes: usize,
    /// Number of times the entry has been read from the cache.
    pub access_count: u64,
}

impl CacheEntry {
    pub fn new(results: Vec<RowBatch>) -> Self {
        let size_bytes = Self::estimate_size(&results);
        let now = Instant::now();

        Self {
            results,
            created_at: now,
            last_accessed: now,
            size_bytes,
            access_count: 0,
        }
    }

    /// Rough size estimate: eight bytes per cell plus a fixed per-batch overhead.
    fn estimate_size(results: &[RowBatch]) -> usize {
        results
            .iter()
            .map(|batch| batch.len() * batch.schema.columns.len() * 8 + 1024)
            .sum()
    }

    /// Returns `true` if the entry is older than `ttl`.
    pub fn is_expired(&self, ttl: Duration) -> bool {
        self.created_at.elapsed() > ttl
    }

    /// Records a read: bumps the access counter and refreshes `last_accessed`.
    pub fn mark_accessed(&mut self) {
        self.last_accessed = Instant::now();
        self.access_count += 1;
    }
}

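/// Thread-safe, size- and TTL-bounded cache of query results, keyed by query
/// string, with least-recently-used eviction.
///
/// A minimal usage sketch (the key and the `batches` value are illustrative;
/// `RowBatch` values come from the executor pipeline):
///
/// ```ignore
/// let cache = QueryCache::new(CacheConfig::default());
/// cache.insert("normalized query text".to_string(), batches);
/// if let Some(entry) = cache.get("normalized query text") {
///     // Reuse entry.results instead of re-executing the query.
/// }
/// ```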
pub struct QueryCache {
    /// Cached entries keyed by query string.
    entries: Arc<RwLock<HashMap<String, CacheEntry>>>,
    /// Keys ordered from least to most recently used.
    lru_order: Arc<RwLock<Vec<String>>>,
    /// Size limits and TTL.
    config: CacheConfig,
    /// Estimated total size of all cached entries, in bytes.
    memory_used: Arc<RwLock<usize>>,
    /// Hit, miss, insert, and eviction counters.
    stats: Arc<RwLock<CacheStats>>,
}

impl QueryCache {
    /// Creates an empty cache with the given configuration.
    pub fn new(config: CacheConfig) -> Self {
        Self {
            entries: Arc::new(RwLock::new(HashMap::new())),
            lru_order: Arc::new(RwLock::new(Vec::new())),
            config,
            memory_used: Arc::new(RwLock::new(0)),
            stats: Arc::new(RwLock::new(CacheStats::default())),
        }
    }

    /// Looks up a cached result, refreshing its LRU position and statistics.
    ///
    /// Expired entries are reported as misses here; the stale entry itself is
    /// reclaimed later by `clean_expired`.
    pub fn get(&self, key: &str) -> Option<CacheEntry> {
        let mut entries = self.entries.write().ok()?;
        let mut lru = self.lru_order.write().ok()?;
        let mut stats = self.stats.write().ok()?;

        if let Some(entry) = entries.get_mut(key) {
            if entry.is_expired(Duration::from_secs(self.config.ttl_seconds)) {
                stats.misses += 1;
                return None;
            }

            // Move the key to the back of the LRU order (most recently used).
            if let Some(pos) = lru.iter().position(|k| k == key) {
                lru.remove(pos);
            }
            lru.push(key.to_string());

            entry.mark_accessed();
            stats.hits += 1;

            Some(entry.clone())
        } else {
            stats.misses += 1;
            None
        }
    }

    /// Inserts a result set under `key`, evicting least recently used entries
    /// until both the entry-count and memory limits are satisfied.
    pub fn insert(&self, key: String, results: Vec<RowBatch>) {
        let entry = CacheEntry::new(results);
        let entry_size = entry.size_bytes;

        let mut entries = self.entries.write().unwrap();
        let mut lru = self.lru_order.write().unwrap();
        let mut memory = self.memory_used.write().unwrap();
        let mut stats = self.stats.write().unwrap();

        // Evict from the least recently used end until the new entry fits.
        while (entries.len() >= self.config.max_entries
            || *memory + entry_size > self.config.max_memory_bytes)
            && !lru.is_empty()
        {
            if let Some(old_key) = lru.first().cloned() {
                if let Some(old_entry) = entries.remove(&old_key) {
                    *memory = memory.saturating_sub(old_entry.size_bytes);
                    stats.evictions += 1;
                }
                lru.remove(0);
            }
        }

        // Replacing an existing key must release the old entry's memory and
        // drop its stale position in the LRU order.
        if let Some(old_entry) = entries.insert(key.clone(), entry) {
            *memory = memory.saturating_sub(old_entry.size_bytes);
            if let Some(pos) = lru.iter().position(|k| k == &key) {
                lru.remove(pos);
            }
        }
        lru.push(key);
        *memory += entry_size;
        stats.inserts += 1;
    }

    /// Removes a single entry, returning `true` if it was present.
    pub fn remove(&self, key: &str) -> bool {
        let mut entries = self.entries.write().unwrap();
        let mut lru = self.lru_order.write().unwrap();
        let mut memory = self.memory_used.write().unwrap();

        if let Some(entry) = entries.remove(key) {
            *memory = memory.saturating_sub(entry.size_bytes);
            if let Some(pos) = lru.iter().position(|k| k == key) {
                lru.remove(pos);
            }
            true
        } else {
            false
        }
    }

    /// Removes all entries and resets the memory accounting.
    pub fn clear(&self) {
        let mut entries = self.entries.write().unwrap();
        let mut lru = self.lru_order.write().unwrap();
        let mut memory = self.memory_used.write().unwrap();

        entries.clear();
        lru.clear();
        *memory = 0;
    }

    /// Returns a snapshot of the cache statistics.
    pub fn stats(&self) -> CacheStats {
        self.stats.read().unwrap().clone()
    }

    /// Estimated memory currently held by cached results, in bytes.
    pub fn memory_used(&self) -> usize {
        *self.memory_used.read().unwrap()
    }

    /// Number of cached entries.
    pub fn len(&self) -> usize {
        self.entries.read().unwrap().len()
    }

    /// Returns `true` if the cache holds no entries.
    pub fn is_empty(&self) -> bool {
        self.entries.read().unwrap().is_empty()
    }

    /// Removes every expired entry, counting each removal as an eviction.
    pub fn clean_expired(&self) {
        let ttl = Duration::from_secs(self.config.ttl_seconds);
        let mut entries = self.entries.write().unwrap();
        let mut lru = self.lru_order.write().unwrap();
        let mut memory = self.memory_used.write().unwrap();
        let mut stats = self.stats.write().unwrap();

        let expired_keys: Vec<_> = entries
            .iter()
            .filter(|(_, entry)| entry.is_expired(ttl))
            .map(|(key, _)| key.clone())
            .collect();

        for key in expired_keys {
            if let Some(entry) = entries.remove(&key) {
                *memory = memory.saturating_sub(entry.size_bytes);
                if let Some(pos) = lru.iter().position(|k| k == &key) {
                    lru.remove(pos);
                }
                stats.evictions += 1;
            }
        }
    }
}

/// Counters describing cache effectiveness.
#[derive(Debug, Clone, Default)]
pub struct CacheStats {
    /// Lookups that returned a cached, unexpired entry.
    pub hits: u64,
    /// Lookups that found nothing, or only an expired entry.
    pub misses: u64,
    /// Number of insertions.
    pub inserts: u64,
    /// Entries removed to satisfy capacity limits or because they expired.
    pub evictions: u64,
}

impl CacheStats {
    /// Fraction of lookups that were hits, or `0.0` if there were none.
    pub fn hit_rate(&self) -> f64 {
        let total = self.hits + self.misses;
        if total == 0 {
            0.0
        } else {
            self.hits as f64 / total as f64
        }
    }

    /// Resets all counters to zero.
    pub fn reset(&mut self) {
        self.hits = 0;
        self.misses = 0;
        self.inserts = 0;
        self.evictions = 0;
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::executor::plan::{ColumnDef, DataType, QuerySchema};

    fn create_test_batch() -> RowBatch {
        let schema = QuerySchema::new(vec![ColumnDef {
            name: "id".to_string(),
            data_type: DataType::Int64,
            nullable: false,
        }]);
        RowBatch::new(schema)
    }

    #[test]
    fn test_cache_insert_and_get() {
        let cache = QueryCache::new(CacheConfig::default());
        let batch = create_test_batch();

        cache.insert("test_key".to_string(), vec![batch.clone()]);
        assert_eq!(cache.len(), 1);

        let cached = cache.get("test_key");
        assert!(cached.is_some());
    }

    #[test]
    fn test_cache_miss() {
        let cache = QueryCache::new(CacheConfig::default());
        let result = cache.get("nonexistent");
        assert!(result.is_none());

        let stats = cache.stats();
        assert_eq!(stats.misses, 1);
    }

    #[test]
    fn test_cache_eviction() {
        let config = CacheConfig {
            max_entries: 2,
            max_memory_bytes: 1024 * 1024,
            ttl_seconds: 300,
        };
        let cache = QueryCache::new(config);
        let batch = create_test_batch();

        cache.insert("key1".to_string(), vec![batch.clone()]);
        cache.insert("key2".to_string(), vec![batch.clone()]);
        cache.insert("key3".to_string(), vec![batch.clone()]);

        assert_eq!(cache.len(), 2);
        // key1 was the least recently used entry, so it should have been evicted.
        assert!(cache.get("key1").is_none());
    }

    #[test]
    fn test_cache_clear() {
        let cache = QueryCache::new(CacheConfig::default());
        let batch = create_test_batch();

        cache.insert("key1".to_string(), vec![batch.clone()]);
        cache.insert("key2".to_string(), vec![batch.clone()]);

        cache.clear();
        assert_eq!(cache.len(), 0);
        assert_eq!(cache.memory_used(), 0);
    }

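    // Additional check (a sketch, not part of the original suite): re-inserting
    // an existing key should replace the old entry without inflating the entry
    // count or the memory accounting.
    #[test]
    fn test_insert_overwrites_existing_key() {
        let cache = QueryCache::new(CacheConfig::default());
        let batch = create_test_batch();

        cache.insert("key".to_string(), vec![batch.clone()]);
        let memory_after_first = cache.memory_used();

        cache.insert("key".to_string(), vec![batch.clone()]);
        assert_eq!(cache.len(), 1);
        assert_eq!(cache.memory_used(), memory_after_first);
    }
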
    #[test]
    fn test_hit_rate() {
        let mut stats = CacheStats::default();
        stats.hits = 7;
        stats.misses = 3;

        assert!((stats.hit_rate() - 0.7).abs() < 0.001);
    }
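
    // Additional check (a sketch): with a zero-second TTL, an entry is already
    // expired by the time it is read back, so `get` reports a miss and leaves
    // the stale entry for `clean_expired` to reclaim.
    #[test]
    fn test_ttl_expiry() {
        let config = CacheConfig {
            max_entries: 10,
            max_memory_bytes: 1024 * 1024,
            ttl_seconds: 0,
        };
        let cache = QueryCache::new(config);
        cache.insert("key".to_string(), vec![create_test_batch()]);

        std::thread::sleep(Duration::from_millis(5));
        assert!(cache.get("key").is_none());
    }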
}