skp_cache_storage/memory/
backend.rs

1//! In-memory cache backend using DashMap
2
3use async_trait::async_trait;
4use dashmap::DashMap;
5use parking_lot::RwLock;
6use std::collections::HashSet;
7use std::sync::Arc;
8use std::time::{Duration, SystemTime};
9
10use skp_cache_core::{CacheBackend, CacheEntry, CacheOptions, CacheStats, DependencyBackend, Result, TaggableBackend};
11
12use super::ttl_index::TtlIndex;
13
14/// Configuration for the memory backend
15#[derive(Debug, Clone)]
16pub struct MemoryConfig {
17    /// Maximum number of entries (0 = unlimited)
18    pub max_capacity: usize,
19    /// Cleanup interval for expired entries
20    pub cleanup_interval: Duration,
21    /// Maximum TTL supported (for TTL index sizing)
22    pub max_ttl: Duration,
23    /// Enable TTL index for efficient expiration
24    pub enable_ttl_index: bool,
25}
26
27impl Default for MemoryConfig {
28    fn default() -> Self {
29        Self {
30            max_capacity: 10_000,
31            cleanup_interval: Duration::from_secs(60),
32            max_ttl: Duration::from_secs(86400), // 24 hours
33            enable_ttl_index: true,
34        }
35    }
36}
37
38impl MemoryConfig {
39    /// Create config with specific capacity
40    pub fn with_capacity(capacity: usize) -> Self {
41        Self {
42            max_capacity: capacity,
43            ..Default::default()
44        }
45    }
46
47    /// Create config with unlimited capacity
48    pub fn unlimited() -> Self {
49        Self {
50            max_capacity: 0,
51            ..Default::default()
52        }
53    }
54}
55
56/// Internal statistics tracking
57#[derive(Debug, Default)]
58struct MemoryStats {
59    hits: u64,
60    misses: u64,
61    stale_hits: u64,
62    writes: u64,
63    deletes: u64,
64    evictions: u64,
65}
66
67/// Tag index for tag-based lookups
68type TagIndex = DashMap<String, HashSet<String>>;
69/// Dependency index for dependency-based invalidation (Dependency -> Dependent Keys)
70type DepIndex = DashMap<String, HashSet<String>>;
71
72/// In-memory cache backend
73///
74/// Uses `DashMap` for concurrent access and `TtlIndex` for efficient expiration.
75/// Cloning creates a new handle to the SAME underlying store.
76#[derive(Clone)]
77pub struct MemoryBackend {
78    /// Main data store
79    data: Arc<DashMap<String, CacheEntry<Vec<u8>>>>,
80    /// Tag -> keys index
81    tag_index: Arc<TagIndex>,
82    /// Dependency -> keys index
83    dep_index: Arc<DepIndex>,
84    /// TTL expiration index
85    ttl_index: Arc<RwLock<TtlIndex>>,
86    /// Statistics
87    stats: Arc<RwLock<MemoryStats>>,
88    /// Configuration
89    config: MemoryConfig,
90}
91
92impl MemoryBackend {
93    /// Create a new memory backend
94    pub fn new(config: MemoryConfig) -> Self {
95        let ttl_index = TtlIndex::new(Duration::from_secs(1), config.max_ttl);
96
97        Self {
98            data: Arc::new(DashMap::with_capacity(config.max_capacity.min(10_000))),
99            tag_index: Arc::new(DashMap::new()),
100            dep_index: Arc::new(DashMap::new()),
101            ttl_index: Arc::new(RwLock::new(ttl_index)),
102            stats: Arc::new(RwLock::new(MemoryStats::default())),
103            config,
104        }
105    }
106
107    /// Create with default configuration
108    pub fn with_defaults() -> Self {
109        Self::new(MemoryConfig::default())
110    }
111
112    /// Evict entries if at capacity
113    fn maybe_evict(&self) {
114        if self.config.max_capacity == 0 {
115            return; // Unlimited
116        }
117
118        // Only evict if we're at or over capacity
119        if self.data.len() < self.config.max_capacity {
120            return;
121        }
122
123        // Simple eviction: collect keys to remove first
124        let keys_to_remove: Vec<String> = self
125            .data
126            .iter()
127            .take(self.data.len().saturating_sub(self.config.max_capacity - 1))
128            .map(|entry| entry.key().clone())
129            .collect();
130
131        for key in keys_to_remove {
132            self.data.remove(&key);
133            self.ttl_index.write().remove(&key);
134            self.stats.write().evictions += 1;
135        }
136    }
137
138    /// Remove an entry and clean up indexes
139    fn remove_entry(&self, key: &str) {
140        if let Some((_, entry)) = self.data.remove(key) {
141            // Remove from TTL index
142            self.ttl_index.write().remove(key);
143
144            // Remove from tag index
145            for tag in &entry.tags {
146                if let Some(mut keys) = self.tag_index.get_mut(tag) {
147                    keys.remove(key);
148                }
149            }
150
151            // Remove from dependency index
152            for dep in &entry.dependencies {
153                if let Some(mut dependents) = self.dep_index.get_mut(dep) {
154                    dependents.remove(key);
155                }
156            }
157        }
158    }
159
160    /// Run TTL cleanup and return number of expired entries removed
161    pub fn cleanup_expired(&self) -> usize {
162        let expired = self.ttl_index.write().tick();
163        let mut count = 0;
164
165        for key in expired {
166            if let Some(entry) = self.data.get(&key) {
167                if entry.is_expired() && !entry.is_stale() {
168                    drop(entry);
169                    self.remove_entry(&key);
170                    self.stats.write().evictions += 1;
171                    count += 1;
172                }
173            }
174        }
175
176        count
177    }
178
179    /// Get approximate memory usage
180    pub fn memory_usage(&self) -> usize {
181        self.data
182            .iter()
183            .map(|entry| entry.size + entry.key().len())
184            .sum()
185    }
186}
187
188#[async_trait]
189impl CacheBackend for MemoryBackend {
190    async fn get(&self, key: &str) -> Result<Option<CacheEntry<Vec<u8>>>> {
191        match self.data.get_mut(key) {
192            Some(mut entry) => {
193                // Check expiration
194                if entry.is_expired() && !entry.is_stale() {
195                    drop(entry);
196                    self.remove_entry(key);
197                    self.stats.write().misses += 1;
198                    return Ok(None);
199                }
200
201                // Update access metadata
202                entry.last_accessed = SystemTime::now();
203                entry.access_count += 1;
204
205                // Update stats
206                let mut stats = self.stats.write();
207                if entry.is_stale() {
208                    stats.stale_hits += 1;
209                } else {
210                    stats.hits += 1;
211                }
212
213                Ok(Some(entry.clone()))
214            }
215            None => {
216                self.stats.write().misses += 1;
217                Ok(None)
218            }
219        }
220    }
221
222    async fn set(&self, key: &str, value: Vec<u8>, options: &CacheOptions) -> Result<()> {
223        self.maybe_evict();
224
225        let size = value.len();
226        let now = SystemTime::now();
227
228        let entry = CacheEntry {
229            value,
230            created_at: now,
231            last_accessed: now,
232            access_count: 0,
233            ttl: options.ttl,
234            stale_while_revalidate: options.stale_while_revalidate,
235            tags: options.tags.clone(),
236            dependencies: options.dependencies.clone(),
237            cost: options.cost.unwrap_or(1),
238            size,
239            etag: options.etag.clone(),
240            version: 0,
241        };
242
243        // Schedule TTL expiration
244        if self.config.enable_ttl_index {
245            if let Some(ttl) = options.ttl {
246                let total_ttl = ttl + options.stale_while_revalidate.unwrap_or_default();
247                self.ttl_index.write().schedule(key.to_string(), total_ttl);
248            }
249        }
250
251        // Update tag index
252        for tag in &options.tags {
253            self.tag_index
254                .entry(tag.clone())
255                .or_insert_with(HashSet::new)
256                .insert(key.to_string());
257        }
258
259        // Update dependency index
260        for dep in &options.dependencies {
261            self.dep_index
262                .entry(dep.clone())
263                .or_insert_with(HashSet::new)
264                .insert(key.to_string());
265        }
266
267        if let Some(old_entry) = self.data.insert(key.to_string(), entry) {
268            // Clean up old dependencies that are no longer present
269            for dep in old_entry.dependencies {
270                if !options.dependencies.contains(&dep) {
271                    if let Some(mut dependents) = self.dep_index.get_mut(&dep) {
272                        dependents.remove(key);
273                    }
274                }
275            }
276        }
277        
278        self.stats.write().writes += 1;
279
280        Ok(())
281    }
282
283    async fn delete(&self, key: &str) -> Result<bool> {
284        if self.data.contains_key(key) {
285            self.remove_entry(key);
286            self.stats.write().deletes += 1;
287            Ok(true)
288        } else {
289            Ok(false)
290        }
291    }
292
293    async fn exists(&self, key: &str) -> Result<bool> {
294        match self.data.get(key) {
295            Some(entry) => Ok(!entry.is_expired() || entry.is_stale()),
296            None => Ok(false),
297        }
298    }
299
300    async fn delete_many(&self, keys: &[&str]) -> Result<u64> {
301        let mut count = 0;
302        for key in keys {
303            if self.delete(key).await? {
304                count += 1;
305            }
306        }
307        Ok(count)
308    }
309
310    async fn get_many(&self, keys: &[&str]) -> Result<Vec<Option<CacheEntry<Vec<u8>>>>> {
311        let mut results = Vec::with_capacity(keys.len());
312        for key in keys {
313            results.push(self.get(key).await?);
314        }
315        Ok(results)
316    }
317
318    async fn set_many(&self, entries: &[(&str, Vec<u8>, &CacheOptions)]) -> Result<()> {
319        for (key, value, options) in entries {
320            self.set(key, value.clone(), options).await?;
321        }
322        Ok(())
323    }
324
325    async fn clear(&self) -> Result<()> {
326        self.data.clear();
327        self.tag_index.clear();
328        self.dep_index.clear();
329        *self.ttl_index.write() = TtlIndex::new(Duration::from_secs(1), self.config.max_ttl);
330        Ok(())
331    }
332
333    async fn stats(&self) -> Result<CacheStats> {
334        let stats = self.stats.read();
335        Ok(CacheStats {
336            hits: stats.hits,
337            misses: stats.misses,
338            stale_hits: stats.stale_hits,
339            writes: stats.writes,
340            deletes: stats.deletes,
341            evictions: stats.evictions,
342            size: self.data.len(),
343            memory_bytes: self.memory_usage(),
344        })
345    }
346
347    async fn len(&self) -> Result<usize> {
348        Ok(self.data.len())
349    }
350}
351
352
353
354#[async_trait]
355impl TaggableBackend for MemoryBackend {
356    async fn get_by_tag(&self, tag: &str) -> Result<Vec<String>> {
357        if let Some(keys) = self.tag_index.get(tag) {
358             Ok(keys.iter().cloned().collect())
359        } else {
360             Ok(Vec::new())
361        }
362    }
363
364    async fn delete_by_tag(&self, tag: &str) -> Result<u64> {
365        // We get the keys and remove the tag entry first
366        if let Some((_, keys)) = self.tag_index.remove(tag) {
367             let mut count = 0;
368             for key in keys {
369                 // Check if key exists (it might have been evicted)
370                 if self.data.contains_key(&key) {
371                     self.remove_entry(&key);
372                     self.stats.write().deletes += 1;
373                     count += 1;
374                 }
375             }
376             Ok(count)
377        } else {
378             Ok(0)
379        }
380    }
381}
382
383#[async_trait]
384impl DependencyBackend for MemoryBackend {
385    async fn get_dependents(&self, key: &str) -> Result<Vec<String>> {
386        if let Some(dependents) = self.dep_index.get(key) {
387             Ok(dependents.iter().cloned().collect())
388        } else {
389             Ok(Vec::new())
390        }
391    }
392}
393
394#[cfg(test)]
395mod tests {
396    use super::*;
397
398    #[tokio::test]
399    async fn test_basic_get_set() {
400        let backend = MemoryBackend::new(MemoryConfig::default());
401
402        let options = CacheOptions {
403            ttl: Some(Duration::from_secs(60)),
404            ..Default::default()
405        };
406
407        backend
408            .set("key1", b"value1".to_vec(), &options)
409            .await
410            .unwrap();
411
412        let result = backend.get("key1").await.unwrap();
413        assert!(result.is_some());
414        assert_eq!(result.unwrap().value, b"value1".to_vec());
415    }
416
417    #[tokio::test]
418    async fn test_delete() {
419        let backend = MemoryBackend::new(MemoryConfig::default());
420        let options = CacheOptions::default();
421
422        backend
423            .set("key1", b"value1".to_vec(), &options)
424            .await
425            .unwrap();
426        assert!(backend.exists("key1").await.unwrap());
427
428        let deleted = backend.delete("key1").await.unwrap();
429        assert!(deleted);
430        assert!(!backend.exists("key1").await.unwrap());
431    }
432
433    #[tokio::test]
434    async fn test_get_nonexistent() {
435        let backend = MemoryBackend::new(MemoryConfig::default());
436        let result = backend.get("nonexistent").await.unwrap();
437        assert!(result.is_none());
438    }
439
440    #[tokio::test]
441    async fn test_clear() {
442        let backend = MemoryBackend::new(MemoryConfig::default());
443        let options = CacheOptions::default();
444
445        backend
446            .set("key1", b"value1".to_vec(), &options)
447            .await
448            .unwrap();
449        backend
450            .set("key2", b"value2".to_vec(), &options)
451            .await
452            .unwrap();
453
454        assert_eq!(backend.len().await.unwrap(), 2);
455
456        backend.clear().await.unwrap();
457        assert_eq!(backend.len().await.unwrap(), 0);
458    }
459
460    #[tokio::test]
461    async fn test_stats() {
462        let backend = MemoryBackend::new(MemoryConfig::default());
463        let options = CacheOptions::default();
464
465        backend
466            .set("key1", b"value1".to_vec(), &options)
467            .await
468            .unwrap();
469        backend.get("key1").await.unwrap();
470        backend.get("nonexistent").await.unwrap();
471
472        let stats = backend.stats().await.unwrap();
473        assert_eq!(stats.hits, 1);
474        assert_eq!(stats.misses, 1);
475        assert_eq!(stats.writes, 1);
476    }
477
478    #[tokio::test]
479    async fn test_capacity_eviction() {
480        let config = MemoryConfig {
481            max_capacity: 2,
482            ..Default::default()
483        };
484        let backend = MemoryBackend::new(config);
485        let options = CacheOptions::default();
486
487        backend
488            .set("key1", b"value1".to_vec(), &options)
489            .await
490            .unwrap();
491        backend
492            .set("key2", b"value2".to_vec(), &options)
493            .await
494            .unwrap();
495        backend
496            .set("key3", b"value3".to_vec(), &options)
497            .await
498            .unwrap();
499
500        // Should have evicted one entry
501        assert!(backend.len().await.unwrap() <= 2);
502    }
503
504    #[tokio::test]
505    async fn test_get_many() {
506        let backend = MemoryBackend::new(MemoryConfig::default());
507        let options = CacheOptions::default();
508
509        backend
510            .set("key1", b"value1".to_vec(), &options)
511            .await
512            .unwrap();
513        backend
514            .set("key2", b"value2".to_vec(), &options)
515            .await
516            .unwrap();
517
518        let results = backend.get_many(&["key1", "key2", "key3"]).await.unwrap();
519        assert_eq!(results.len(), 3);
520        assert!(results[0].is_some());
521        assert!(results[1].is_some());
522        assert!(results[2].is_none());
523    }
524
525    #[tokio::test]
526    async fn test_dependencies() {
527        use skp_cache_core::{DependencyBackend, CacheOptions};
528        let backend = MemoryBackend::new(MemoryConfig::default());
529        
530        let mut opts = CacheOptions::default();
531        opts.dependencies = vec!["dep1".to_string(), "dep2".to_string()];
532
533        backend.set("key1", b"val".to_vec(), &opts).await.unwrap();
534        
535        let deps1 = backend.get_dependents("dep1").await.unwrap();
536        assert!(deps1.contains(&"key1".to_string()));
537        
538        let deps2 = backend.get_dependents("dep2").await.unwrap();
539        assert!(deps2.contains(&"key1".to_string()));
540        
541        // Update
542        opts.dependencies = vec!["dep1".to_string(), "dep3".to_string()];
543        backend.set("key1", b"val".to_vec(), &opts).await.unwrap();
544        
545        // dep1: still has key1
546        // dep2: key1 removed
547        // dep3: key1 added
548        assert!(backend.get_dependents("dep1").await.unwrap().contains(&"key1".to_string()));
549        assert!(!backend.get_dependents("dep2").await.unwrap().contains(&"key1".to_string()));
550        assert!(backend.get_dependents("dep3").await.unwrap().contains(&"key1".to_string()));
551        
552        // Delete
553        backend.delete("key1").await.unwrap();
554        assert!(backend.get_dependents("dep1").await.unwrap().is_empty());
555        assert!(backend.get_dependents("dep3").await.unwrap().is_empty());
556    }
557}