multi_tier_cache/backends/
dashmap_cache.rs1use crate::traits::{CacheBackend, L2CacheBackend};
2use anyhow::Result;
3use bytes::Bytes;
4use dashmap::DashMap;
5use futures_util::future::BoxFuture;
6use std::sync::Arc;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::time::{Duration, Instant};
9use tracing::{debug, info};
10
/// A cached payload paired with the moment it stops being valid.
#[derive(Debug, Clone)]
struct CacheEntry {
    /// The stored bytes handed back to callers on a successful lookup.
    value: Bytes,
    /// Deadline after which the entry counts as expired.
    /// `None` means there is no deadline: `is_expired` reports `false` for it.
    expires_at: Option<Instant>,
}
17
18impl CacheEntry {
19 fn new(value: Bytes, ttl: Duration) -> Self {
20 Self {
21 value,
22 expires_at: Some(Instant::now() + ttl),
23 }
24 }
25
26 fn is_expired(&self) -> bool {
27 self.expires_at
28 .is_some_and(|expires_at| Instant::now() > expires_at)
29 }
30}
31
/// Cache backend that keeps its entries in an `Arc`-shared `DashMap`,
/// alongside three `Arc`-shared `AtomicU64` counters.
pub struct DashMapCache {
    /// Key-to-entry storage.
    map: Arc<DashMap<String, CacheEntry>>,
    /// Incremented by `get_with_ttl` each time it returns a live entry.
    hits: Arc<AtomicU64>,
    /// Incremented by `get_with_ttl` each time the key is absent or expired.
    misses: Arc<AtomicU64>,
    /// Incremented by `set_with_ttl` on every insert.
    sets: Arc<AtomicU64>,
}
42
43impl DashMapCache {
44 pub fn new() -> Self {
46 info!("Initializing DashMap Cache (concurrent HashMap)");
47
48 Self {
49 map: Arc::new(DashMap::new()),
50 hits: Arc::new(AtomicU64::new(0)),
51 misses: Arc::new(AtomicU64::new(0)),
52 sets: Arc::new(AtomicU64::new(0)),
53 }
54 }
55
56 pub fn cleanup_expired(&self) -> usize {
58 let mut removed = 0;
59 self.map.retain(|_, entry| {
60 if entry.is_expired() {
61 removed += 1;
62 false
63 } else {
64 true
65 }
66 });
67 if removed > 0 {
68 debug!(count = removed, "[DashMap] Cleaned up expired entries");
69 }
70 removed
71 }
72
73 #[must_use]
75 pub fn len(&self) -> usize {
76 self.map.len()
77 }
78
79 #[must_use]
81 pub fn is_empty(&self) -> bool {
82 self.map.is_empty()
83 }
84}
85
86impl Default for DashMapCache {
87 fn default() -> Self {
88 Self::new()
89 }
90}
91
92impl CacheBackend for DashMapCache {
96 fn get<'a>(&'a self, key: &'a str) -> BoxFuture<'a, Option<Bytes>> {
97 Box::pin(async move {
98 match self.map.get(key) {
99 Some(entry) => {
100 if entry.is_expired() {
101 drop(entry);
102 self.map.remove(key);
103 None
104 } else {
105 Some(entry.value.clone())
106 }
107 }
108 _ => None,
109 }
110 })
111 }
112
113 fn set_with_ttl<'a>(
114 &'a self,
115 key: &'a str,
116 value: Bytes,
117 ttl: Duration,
118 ) -> BoxFuture<'a, Result<()>> {
119 Box::pin(async move {
120 let entry = CacheEntry::new(value, ttl);
121 self.map.insert(key.to_string(), entry);
122 self.sets.fetch_add(1, Ordering::Relaxed);
123 debug!(key = %key, ttl_secs = %ttl.as_secs(), "[DashMap] Cached key bytes with TTL");
124 Ok(())
125 })
126 }
127
128 fn remove<'a>(&'a self, key: &'a str) -> BoxFuture<'a, Result<()>> {
129 Box::pin(async move {
130 self.map.remove(key);
131 Ok(())
132 })
133 }
134
135 fn health_check(&self) -> BoxFuture<'_, bool> {
136 Box::pin(async move { true })
137 }
138
139 fn name(&self) -> &'static str {
140 "DashMap"
141 }
142}
143
144impl L2CacheBackend for DashMapCache {
145 fn get_with_ttl<'a>(
146 &'a self,
147 key: &'a str,
148 ) -> BoxFuture<'a, Option<(Bytes, Option<Duration>)>> {
149 Box::pin(async move {
150 if let Some(entry) = self.map.get(key) {
151 if entry.is_expired() {
152 drop(entry);
153 self.map.remove(key);
154 self.misses.fetch_add(1, Ordering::Relaxed);
155 None
156 } else {
157 let now = Instant::now();
158 if let Some(expires_at) = entry.expires_at {
159 let ttl = expires_at.checked_duration_since(now);
160 if ttl.is_none() {
161 drop(entry);
163 self.map.remove(key);
164 self.misses.fetch_add(1, Ordering::Relaxed);
165 return None;
166 }
167 self.hits.fetch_add(1, Ordering::Relaxed);
168 Some((entry.value.clone(), ttl))
169 } else {
170 self.hits.fetch_add(1, Ordering::Relaxed);
171 Some((entry.value.clone(), None))
172 }
173 }
174 } else {
175 self.misses.fetch_add(1, Ordering::Relaxed);
176 None
177 }
178 })
179 }
180}