Skip to main content

multi_tier_cache/backends/
dashmap_cache.rs

1use crate::error::CacheResult;
2use crate::traits::{CacheBackend, L2CacheBackend};
3use bytes::Bytes;
4use dashmap::DashMap;
5use futures_util::future::BoxFuture;
6use std::sync::Arc;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::time::{Duration, Instant};
9use tracing::{debug, info};
10
11/// Cache entry with expiration tracking
12#[derive(Debug, Clone)]
13struct CacheEntry {
14    value: Bytes,
15    expires_at: Option<Instant>,
16}
17
18impl CacheEntry {
19    fn new(value: Bytes, ttl: Duration) -> Self {
20        Self {
21            value,
22            expires_at: Some(Instant::now() + ttl),
23        }
24    }
25
26    fn is_expired(&self) -> bool {
27        self.expires_at
28            .is_some_and(|expires_at| Instant::now() > expires_at)
29    }
30}
31
32pub struct DashMapCache {
33    /// Concurrent `HashMap`
34    map: Arc<DashMap<String, CacheEntry>>,
35    /// Hit counter
36    hits: Arc<AtomicU64>,
37    /// Miss counter
38    misses: Arc<AtomicU64>,
39    /// Set counter
40    sets: Arc<AtomicU64>,
41}
42
43impl DashMapCache {
44    /// Create new `DashMap` cache
45    pub fn new() -> Self {
46        info!("Initializing DashMap Cache (concurrent HashMap)");
47
48        Self {
49            map: Arc::new(DashMap::new()),
50            hits: Arc::new(AtomicU64::new(0)),
51            misses: Arc::new(AtomicU64::new(0)),
52            sets: Arc::new(AtomicU64::new(0)),
53        }
54    }
55
56    /// Cleanup expired entries
57    pub fn cleanup_expired(&self) -> usize {
58        let mut removed = 0;
59        self.map.retain(|_, entry| {
60            if entry.is_expired() {
61                removed += 1;
62                false
63            } else {
64                true
65            }
66        });
67        if removed > 0 {
68            debug!(count = removed, "[DashMap] Cleaned up expired entries");
69        }
70        removed
71    }
72
73    /// Get current cache size
74    #[must_use]
75    pub fn len(&self) -> usize {
76        self.map.len()
77    }
78
79    /// Check if cache is empty
80    #[must_use]
81    pub fn is_empty(&self) -> bool {
82        self.map.is_empty()
83    }
84}
85
86impl Default for DashMapCache {
87    fn default() -> Self {
88        Self::new()
89    }
90}
91
92// ===== Trait Implementations =====
93
94/// Implement `CacheBackend` trait for `DashMapCache`
95impl CacheBackend for DashMapCache {
96    fn get<'a>(&'a self, key: &'a str) -> BoxFuture<'a, Option<Bytes>> {
97        Box::pin(async move {
98            match self.map.get(key) {
99                Some(entry) => {
100                    if entry.is_expired() {
101                        drop(entry);
102                        self.map.remove(key);
103                        None
104                    } else {
105                        Some(entry.value.clone())
106                    }
107                }
108                None => None,
109            }
110        })
111    }
112
113    fn set_with_ttl<'a>(
114        &'a self,
115        key: &'a str,
116        value: Bytes,
117        ttl: Duration,
118    ) -> BoxFuture<'a, CacheResult<()>> {
119        Box::pin(async move {
120            let entry = CacheEntry::new(value, ttl);
121            self.map.insert(key.to_string(), entry);
122            self.sets.fetch_add(1, Ordering::Relaxed);
123            debug!(key = %key, ttl_secs = %ttl.as_secs(), "[DashMap] Cached key bytes with TTL");
124            Ok(())
125        })
126    }
127
128    fn remove<'a>(&'a self, key: &'a str) -> BoxFuture<'a, CacheResult<()>> {
129        Box::pin(async move {
130            self.map.remove(key);
131            Ok(())
132        })
133    }
134
135    fn health_check(&self) -> BoxFuture<'_, bool> {
136        Box::pin(async move { true })
137    }
138
139    fn name(&self) -> &'static str {
140        "DashMap"
141    }
142}
143
144impl L2CacheBackend for DashMapCache {
145    fn get_with_ttl<'a>(
146        &'a self,
147        key: &'a str,
148    ) -> BoxFuture<'a, Option<(Bytes, Option<Duration>)>> {
149        Box::pin(async move {
150            if let Some(entry) = self.map.get(key) {
151                if entry.is_expired() {
152                    drop(entry);
153                    self.map.remove(key);
154                    self.misses.fetch_add(1, Ordering::Relaxed);
155                    None
156                } else {
157                    let now = Instant::now();
158                    if let Some(expires_at) = entry.expires_at {
159                        let ttl = expires_at.checked_duration_since(now);
160                        if ttl.is_none() {
161                            // Expired
162                            drop(entry);
163                            self.map.remove(key);
164                            self.misses.fetch_add(1, Ordering::Relaxed);
165                            return None;
166                        }
167                        self.hits.fetch_add(1, Ordering::Relaxed);
168                        Some((entry.value.clone(), ttl))
169                    } else {
170                        self.hits.fetch_add(1, Ordering::Relaxed);
171                        Some((entry.value.clone(), None))
172                    }
173                }
174            } else {
175                self.misses.fetch_add(1, Ordering::Relaxed);
176                None
177            }
178        })
179    }
180}