skp_cache_storage/memory/
backend.rs1use async_trait::async_trait;
4use dashmap::DashMap;
5use parking_lot::RwLock;
6use std::collections::HashSet;
7use std::sync::Arc;
8use std::time::{Duration, SystemTime};
9
10use skp_cache_core::{CacheBackend, CacheEntry, CacheOptions, CacheStats, DependencyBackend, Result, TaggableBackend};
11
12use super::ttl_index::TtlIndex;
13
14#[derive(Debug, Clone)]
16pub struct MemoryConfig {
17 pub max_capacity: usize,
19 pub cleanup_interval: Duration,
21 pub max_ttl: Duration,
23 pub enable_ttl_index: bool,
25}
26
27impl Default for MemoryConfig {
28 fn default() -> Self {
29 Self {
30 max_capacity: 10_000,
31 cleanup_interval: Duration::from_secs(60),
32 max_ttl: Duration::from_secs(86400), enable_ttl_index: true,
34 }
35 }
36}
37
38impl MemoryConfig {
39 pub fn with_capacity(capacity: usize) -> Self {
41 Self {
42 max_capacity: capacity,
43 ..Default::default()
44 }
45 }
46
47 pub fn unlimited() -> Self {
49 Self {
50 max_capacity: 0,
51 ..Default::default()
52 }
53 }
54}
55
56#[derive(Debug, Default)]
58struct MemoryStats {
59 hits: u64,
60 misses: u64,
61 stale_hits: u64,
62 writes: u64,
63 deletes: u64,
64 evictions: u64,
65}
66
67type TagIndex = DashMap<String, HashSet<String>>;
69type DepIndex = DashMap<String, HashSet<String>>;
71
72#[derive(Clone)]
77pub struct MemoryBackend {
78 data: Arc<DashMap<String, CacheEntry<Vec<u8>>>>,
80 tag_index: Arc<TagIndex>,
82 dep_index: Arc<DepIndex>,
84 ttl_index: Arc<RwLock<TtlIndex>>,
86 stats: Arc<RwLock<MemoryStats>>,
88 config: MemoryConfig,
90}
91
92impl MemoryBackend {
93 pub fn new(config: MemoryConfig) -> Self {
95 let ttl_index = TtlIndex::new(Duration::from_secs(1), config.max_ttl);
96
97 Self {
98 data: Arc::new(DashMap::with_capacity(config.max_capacity.min(10_000))),
99 tag_index: Arc::new(DashMap::new()),
100 dep_index: Arc::new(DashMap::new()),
101 ttl_index: Arc::new(RwLock::new(ttl_index)),
102 stats: Arc::new(RwLock::new(MemoryStats::default())),
103 config,
104 }
105 }
106
107 pub fn with_defaults() -> Self {
109 Self::new(MemoryConfig::default())
110 }
111
112 fn maybe_evict(&self) {
114 if self.config.max_capacity == 0 {
115 return; }
117
118 if self.data.len() < self.config.max_capacity {
120 return;
121 }
122
123 let keys_to_remove: Vec<String> = self
125 .data
126 .iter()
127 .take(self.data.len().saturating_sub(self.config.max_capacity - 1))
128 .map(|entry| entry.key().clone())
129 .collect();
130
131 for key in keys_to_remove {
132 self.data.remove(&key);
133 self.ttl_index.write().remove(&key);
134 self.stats.write().evictions += 1;
135 }
136 }
137
138 fn remove_entry(&self, key: &str) {
140 if let Some((_, entry)) = self.data.remove(key) {
141 self.ttl_index.write().remove(key);
143
144 for tag in &entry.tags {
146 if let Some(mut keys) = self.tag_index.get_mut(tag) {
147 keys.remove(key);
148 }
149 }
150
151 for dep in &entry.dependencies {
153 if let Some(mut dependents) = self.dep_index.get_mut(dep) {
154 dependents.remove(key);
155 }
156 }
157 }
158 }
159
160 pub fn cleanup_expired(&self) -> usize {
162 let expired = self.ttl_index.write().tick();
163 let mut count = 0;
164
165 for key in expired {
166 if let Some(entry) = self.data.get(&key) {
167 if entry.is_expired() && !entry.is_stale() {
168 drop(entry);
169 self.remove_entry(&key);
170 self.stats.write().evictions += 1;
171 count += 1;
172 }
173 }
174 }
175
176 count
177 }
178
179 pub fn memory_usage(&self) -> usize {
181 self.data
182 .iter()
183 .map(|entry| entry.size + entry.key().len())
184 .sum()
185 }
186}
187
188#[async_trait]
189impl CacheBackend for MemoryBackend {
190 async fn get(&self, key: &str) -> Result<Option<CacheEntry<Vec<u8>>>> {
191 match self.data.get_mut(key) {
192 Some(mut entry) => {
193 if entry.is_expired() && !entry.is_stale() {
195 drop(entry);
196 self.remove_entry(key);
197 self.stats.write().misses += 1;
198 return Ok(None);
199 }
200
201 entry.last_accessed = SystemTime::now();
203 entry.access_count += 1;
204
205 let mut stats = self.stats.write();
207 if entry.is_stale() {
208 stats.stale_hits += 1;
209 } else {
210 stats.hits += 1;
211 }
212
213 Ok(Some(entry.clone()))
214 }
215 None => {
216 self.stats.write().misses += 1;
217 Ok(None)
218 }
219 }
220 }
221
222 async fn set(&self, key: &str, value: Vec<u8>, options: &CacheOptions) -> Result<()> {
223 self.maybe_evict();
224
225 let size = value.len();
226 let now = SystemTime::now();
227
228 let entry = CacheEntry {
229 value,
230 created_at: now,
231 last_accessed: now,
232 access_count: 0,
233 ttl: options.ttl,
234 stale_while_revalidate: options.stale_while_revalidate,
235 tags: options.tags.clone(),
236 dependencies: options.dependencies.clone(),
237 cost: options.cost.unwrap_or(1),
238 size,
239 etag: options.etag.clone(),
240 version: 0,
241 };
242
243 if self.config.enable_ttl_index {
245 if let Some(ttl) = options.ttl {
246 let total_ttl = ttl + options.stale_while_revalidate.unwrap_or_default();
247 self.ttl_index.write().schedule(key.to_string(), total_ttl);
248 }
249 }
250
251 for tag in &options.tags {
253 self.tag_index
254 .entry(tag.clone())
255 .or_insert_with(HashSet::new)
256 .insert(key.to_string());
257 }
258
259 for dep in &options.dependencies {
261 self.dep_index
262 .entry(dep.clone())
263 .or_insert_with(HashSet::new)
264 .insert(key.to_string());
265 }
266
267 if let Some(old_entry) = self.data.insert(key.to_string(), entry) {
268 for dep in old_entry.dependencies {
270 if !options.dependencies.contains(&dep) {
271 if let Some(mut dependents) = self.dep_index.get_mut(&dep) {
272 dependents.remove(key);
273 }
274 }
275 }
276 }
277
278 self.stats.write().writes += 1;
279
280 Ok(())
281 }
282
283 async fn delete(&self, key: &str) -> Result<bool> {
284 if self.data.contains_key(key) {
285 self.remove_entry(key);
286 self.stats.write().deletes += 1;
287 Ok(true)
288 } else {
289 Ok(false)
290 }
291 }
292
293 async fn exists(&self, key: &str) -> Result<bool> {
294 match self.data.get(key) {
295 Some(entry) => Ok(!entry.is_expired() || entry.is_stale()),
296 None => Ok(false),
297 }
298 }
299
300 async fn delete_many(&self, keys: &[&str]) -> Result<u64> {
301 let mut count = 0;
302 for key in keys {
303 if self.delete(key).await? {
304 count += 1;
305 }
306 }
307 Ok(count)
308 }
309
310 async fn get_many(&self, keys: &[&str]) -> Result<Vec<Option<CacheEntry<Vec<u8>>>>> {
311 let mut results = Vec::with_capacity(keys.len());
312 for key in keys {
313 results.push(self.get(key).await?);
314 }
315 Ok(results)
316 }
317
318 async fn set_many(&self, entries: &[(&str, Vec<u8>, &CacheOptions)]) -> Result<()> {
319 for (key, value, options) in entries {
320 self.set(key, value.clone(), options).await?;
321 }
322 Ok(())
323 }
324
325 async fn clear(&self) -> Result<()> {
326 self.data.clear();
327 self.tag_index.clear();
328 self.dep_index.clear();
329 *self.ttl_index.write() = TtlIndex::new(Duration::from_secs(1), self.config.max_ttl);
330 Ok(())
331 }
332
333 async fn stats(&self) -> Result<CacheStats> {
334 let stats = self.stats.read();
335 Ok(CacheStats {
336 hits: stats.hits,
337 misses: stats.misses,
338 stale_hits: stats.stale_hits,
339 writes: stats.writes,
340 deletes: stats.deletes,
341 evictions: stats.evictions,
342 size: self.data.len(),
343 memory_bytes: self.memory_usage(),
344 })
345 }
346
347 async fn len(&self) -> Result<usize> {
348 Ok(self.data.len())
349 }
350}
351
352
353
354#[async_trait]
355impl TaggableBackend for MemoryBackend {
356 async fn get_by_tag(&self, tag: &str) -> Result<Vec<String>> {
357 if let Some(keys) = self.tag_index.get(tag) {
358 Ok(keys.iter().cloned().collect())
359 } else {
360 Ok(Vec::new())
361 }
362 }
363
364 async fn delete_by_tag(&self, tag: &str) -> Result<u64> {
365 if let Some((_, keys)) = self.tag_index.remove(tag) {
367 let mut count = 0;
368 for key in keys {
369 if self.data.contains_key(&key) {
371 self.remove_entry(&key);
372 self.stats.write().deletes += 1;
373 count += 1;
374 }
375 }
376 Ok(count)
377 } else {
378 Ok(0)
379 }
380 }
381}
382
383#[async_trait]
384impl DependencyBackend for MemoryBackend {
385 async fn get_dependents(&self, key: &str) -> Result<Vec<String>> {
386 if let Some(dependents) = self.dep_index.get(key) {
387 Ok(dependents.iter().cloned().collect())
388 } else {
389 Ok(Vec::new())
390 }
391 }
392}
393
394#[cfg(test)]
395mod tests {
396 use super::*;
397
398 #[tokio::test]
399 async fn test_basic_get_set() {
400 let backend = MemoryBackend::new(MemoryConfig::default());
401
402 let options = CacheOptions {
403 ttl: Some(Duration::from_secs(60)),
404 ..Default::default()
405 };
406
407 backend
408 .set("key1", b"value1".to_vec(), &options)
409 .await
410 .unwrap();
411
412 let result = backend.get("key1").await.unwrap();
413 assert!(result.is_some());
414 assert_eq!(result.unwrap().value, b"value1".to_vec());
415 }
416
417 #[tokio::test]
418 async fn test_delete() {
419 let backend = MemoryBackend::new(MemoryConfig::default());
420 let options = CacheOptions::default();
421
422 backend
423 .set("key1", b"value1".to_vec(), &options)
424 .await
425 .unwrap();
426 assert!(backend.exists("key1").await.unwrap());
427
428 let deleted = backend.delete("key1").await.unwrap();
429 assert!(deleted);
430 assert!(!backend.exists("key1").await.unwrap());
431 }
432
433 #[tokio::test]
434 async fn test_get_nonexistent() {
435 let backend = MemoryBackend::new(MemoryConfig::default());
436 let result = backend.get("nonexistent").await.unwrap();
437 assert!(result.is_none());
438 }
439
440 #[tokio::test]
441 async fn test_clear() {
442 let backend = MemoryBackend::new(MemoryConfig::default());
443 let options = CacheOptions::default();
444
445 backend
446 .set("key1", b"value1".to_vec(), &options)
447 .await
448 .unwrap();
449 backend
450 .set("key2", b"value2".to_vec(), &options)
451 .await
452 .unwrap();
453
454 assert_eq!(backend.len().await.unwrap(), 2);
455
456 backend.clear().await.unwrap();
457 assert_eq!(backend.len().await.unwrap(), 0);
458 }
459
460 #[tokio::test]
461 async fn test_stats() {
462 let backend = MemoryBackend::new(MemoryConfig::default());
463 let options = CacheOptions::default();
464
465 backend
466 .set("key1", b"value1".to_vec(), &options)
467 .await
468 .unwrap();
469 backend.get("key1").await.unwrap();
470 backend.get("nonexistent").await.unwrap();
471
472 let stats = backend.stats().await.unwrap();
473 assert_eq!(stats.hits, 1);
474 assert_eq!(stats.misses, 1);
475 assert_eq!(stats.writes, 1);
476 }
477
478 #[tokio::test]
479 async fn test_capacity_eviction() {
480 let config = MemoryConfig {
481 max_capacity: 2,
482 ..Default::default()
483 };
484 let backend = MemoryBackend::new(config);
485 let options = CacheOptions::default();
486
487 backend
488 .set("key1", b"value1".to_vec(), &options)
489 .await
490 .unwrap();
491 backend
492 .set("key2", b"value2".to_vec(), &options)
493 .await
494 .unwrap();
495 backend
496 .set("key3", b"value3".to_vec(), &options)
497 .await
498 .unwrap();
499
500 assert!(backend.len().await.unwrap() <= 2);
502 }
503
504 #[tokio::test]
505 async fn test_get_many() {
506 let backend = MemoryBackend::new(MemoryConfig::default());
507 let options = CacheOptions::default();
508
509 backend
510 .set("key1", b"value1".to_vec(), &options)
511 .await
512 .unwrap();
513 backend
514 .set("key2", b"value2".to_vec(), &options)
515 .await
516 .unwrap();
517
518 let results = backend.get_many(&["key1", "key2", "key3"]).await.unwrap();
519 assert_eq!(results.len(), 3);
520 assert!(results[0].is_some());
521 assert!(results[1].is_some());
522 assert!(results[2].is_none());
523 }
524
525 #[tokio::test]
526 async fn test_dependencies() {
527 use skp_cache_core::{DependencyBackend, CacheOptions};
528 let backend = MemoryBackend::new(MemoryConfig::default());
529
530 let mut opts = CacheOptions::default();
531 opts.dependencies = vec!["dep1".to_string(), "dep2".to_string()];
532
533 backend.set("key1", b"val".to_vec(), &opts).await.unwrap();
534
535 let deps1 = backend.get_dependents("dep1").await.unwrap();
536 assert!(deps1.contains(&"key1".to_string()));
537
538 let deps2 = backend.get_dependents("dep2").await.unwrap();
539 assert!(deps2.contains(&"key1".to_string()));
540
541 opts.dependencies = vec!["dep1".to_string(), "dep3".to_string()];
543 backend.set("key1", b"val".to_vec(), &opts).await.unwrap();
544
545 assert!(backend.get_dependents("dep1").await.unwrap().contains(&"key1".to_string()));
549 assert!(!backend.get_dependents("dep2").await.unwrap().contains(&"key1".to_string()));
550 assert!(backend.get_dependents("dep3").await.unwrap().contains(&"key1".to_string()));
551
552 backend.delete("key1").await.unwrap();
554 assert!(backend.get_dependents("dep1").await.unwrap().is_empty());
555 assert!(backend.get_dependents("dep3").await.unwrap().is_empty());
556 }
557}