multi_tier_cache/backends/
dashmap_cache.rs1use std::sync::Arc;
7use std::time::{Duration, Instant};
8use anyhow::Result;
9use serde_json;
10use dashmap::DashMap;
11use std::sync::atomic::{AtomicU64, Ordering};
12use tracing::{info, debug};
13
14#[derive(Debug, Clone)]
16struct CacheEntry {
17 value: serde_json::Value,
18 expires_at: Option<Instant>,
19}
20
21impl CacheEntry {
22 fn new(value: serde_json::Value, ttl: Duration) -> Self {
23 Self {
24 value,
25 expires_at: Some(Instant::now() + ttl),
26 }
27 }
28
29 fn is_expired(&self) -> bool {
30 match self.expires_at {
31 Some(expires_at) => Instant::now() > expires_at,
32 None => false,
33 }
34 }
35}
36
37pub struct DashMapCache {
74 map: Arc<DashMap<String, CacheEntry>>,
76 hits: Arc<AtomicU64>,
78 misses: Arc<AtomicU64>,
80 sets: Arc<AtomicU64>,
82}
83
84impl DashMapCache {
85 pub fn new() -> Self {
87 info!("Initializing DashMap Cache (concurrent HashMap)");
88
89 Self {
90 map: Arc::new(DashMap::new()),
91 hits: Arc::new(AtomicU64::new(0)),
92 misses: Arc::new(AtomicU64::new(0)),
93 sets: Arc::new(AtomicU64::new(0)),
94 }
95 }
96
97 pub fn cleanup_expired(&self) -> usize {
102 let mut removed = 0;
103 self.map.retain(|_, entry| {
104 if entry.is_expired() {
105 removed += 1;
106 false } else {
108 true }
110 });
111 if removed > 0 {
112 debug!(count = removed, "[DashMap] Cleaned up expired entries");
113 }
114 removed
115 }
116
117 pub fn len(&self) -> usize {
119 self.map.len()
120 }
121
122 pub fn is_empty(&self) -> bool {
124 self.map.is_empty()
125 }
126}
127
128impl Default for DashMapCache {
129 fn default() -> Self {
130 Self::new()
131 }
132}
133
134use crate::traits::CacheBackend;
137use async_trait::async_trait;
138
139#[async_trait]
141impl CacheBackend for DashMapCache {
142 async fn get(&self, key: &str) -> Option<serde_json::Value> {
143 match self.map.get(key) {
144 Some(entry) => {
145 if entry.is_expired() {
146 drop(entry); self.map.remove(key);
149 self.misses.fetch_add(1, Ordering::Relaxed);
150 None
151 } else {
152 self.hits.fetch_add(1, Ordering::Relaxed);
153 Some(entry.value.clone())
154 }
155 }
156 None => {
157 self.misses.fetch_add(1, Ordering::Relaxed);
158 None
159 }
160 }
161 }
162
163 async fn set_with_ttl(
164 &self,
165 key: &str,
166 value: serde_json::Value,
167 ttl: Duration,
168 ) -> Result<()> {
169 let entry = CacheEntry::new(value, ttl);
170 self.map.insert(key.to_string(), entry);
171 self.sets.fetch_add(1, Ordering::Relaxed);
172 debug!(key = %key, ttl_secs = %ttl.as_secs(), "[DashMap] Cached key with TTL");
173 Ok(())
174 }
175
176 async fn remove(&self, key: &str) -> Result<()> {
177 self.map.remove(key);
178 Ok(())
179 }
180
181 async fn health_check(&self) -> bool {
182 let test_key = "health_check_dashmap";
183 let test_value = serde_json::json!({"test": true});
184
185 match self.set_with_ttl(test_key, test_value.clone(), Duration::from_secs(60)).await {
186 Ok(_) => {
187 match self.get(test_key).await {
188 Some(retrieved) => {
189 let _ = self.remove(test_key).await;
190 retrieved == test_value
191 }
192 None => false
193 }
194 }
195 Err(_) => false
196 }
197 }
198
199 fn name(&self) -> &str {
200 "DashMap"
201 }
202}