1#[cfg(feature = "cache")]
4use dashmap::DashMap;
5#[cfg(feature = "cache")]
6use lru::LruCache;
7use std::collections::VecDeque;
8use std::num::NonZeroUsize;
9use std::sync::Arc;
10use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
11
12#[cfg(feature = "async")]
13use tokio::sync::RwLock;
14
15use bytes::Bytes;
16use std::time::Duration;
17
18use super::CacheConfig;
19use super::metadata::{CacheEntry, CacheKey, CacheStats};
20use crate::error::{CacheError, CloudError, Result};
21
#[cfg(feature = "cache")]
/// In-memory cache that combines least-recently-used ordering with per-entry
/// expiry.
///
/// Every operation in this file takes the write half of the lock, because
/// even a lookup reorders the LRU list.
pub struct LruTtlCache {
    /// LRU-ordered map from key to stored entry, behind an async `RwLock`.
    pub(crate) cache: Arc<RwLock<LruCache<CacheKey, CacheEntry>>>,
    /// Running sum of `CacheEntry::size` for the stored entries; compared
    /// against `CacheConfig::max_memory_size` on insert.
    /// NOTE(review): presumably a byte count — confirm against `CacheEntry`.
    pub(crate) current_size: Arc<AtomicUsize>,
    /// Settings read by this cache: `max_entries`, `max_memory_size`,
    /// `max_age`, `default_ttl`, `compress` and `compress_threshold`.
    config: CacheConfig,
    /// Hit / miss / write / eviction counters handed out by `stats()`.
    stats: CacheStats,
}
34
#[cfg(feature = "cache")]
impl LruTtlCache {
    /// Builds an empty cache sized from `config`.
    ///
    /// `config.max_entries` is clamped to at least one slot.
    ///
    /// # Errors
    ///
    /// Returns `CacheError::Full` if the capacity is not a valid non-zero
    /// value. The clamp above makes this arm defensive only.
    pub fn new(config: CacheConfig) -> Result<Self> {
        let capacity = NonZeroUsize::new(config.max_entries.max(1)).ok_or_else(|| {
            CloudError::Cache(CacheError::Full {
                message: "Invalid cache capacity".to_string(),
            })
        })?;

        Ok(Self {
            cache: Arc::new(RwLock::new(LruCache::new(capacity))),
            current_size: Arc::new(AtomicUsize::new(0)),
            config,
            stats: CacheStats::default(),
        })
    }

    /// Looks up `key`, marking it most recently used, and returns its
    /// (decompressed) payload.
    ///
    /// An entry that has expired, or that is older than `config.max_age`,
    /// is removed and reported as a miss; it is counted both as an eviction
    /// and as a miss.
    ///
    /// # Errors
    ///
    /// * `CacheError::Miss` when the key is absent or stale.
    /// * `CacheError::Decompression` when a compressed payload cannot be
    ///   inflated.
    pub async fn get(&self, key: &CacheKey) -> Result<Bytes> {
        // Only the bookkeeping runs under the lock. The payload handle is
        // cloned out so that decompression does not stall other callers.
        let (payload, compressed) = {
            let mut cache = self.cache.write().await;

            let entry = match cache.get_mut(key) {
                Some(entry) => entry,
                None => {
                    self.stats.misses.fetch_add(1, Ordering::Relaxed);
                    return Err(CloudError::Cache(CacheError::Miss { key: key.clone() }));
                }
            };

            let mut stale = entry.is_expired();
            if !stale {
                if let Some(limit) = self.config.max_age {
                    stale = entry.age() > limit;
                }
            }

            if stale {
                let size = entry.size;
                cache.pop(key);
                self.current_size.fetch_sub(size, Ordering::SeqCst);
                self.stats.evictions.fetch_add(1, Ordering::Relaxed);
                // The caller still got nothing back, so this is a miss too.
                self.stats.misses.fetch_add(1, Ordering::Relaxed);
                return Err(CloudError::Cache(CacheError::Miss { key: key.clone() }));
            }

            entry.record_access();
            self.stats.hits.fetch_add(1, Ordering::Relaxed);
            (entry.data.clone(), entry.compressed)
        };

        if compressed {
            self.decompress(&payload)
        } else {
            Ok(payload)
        }
    }

    /// Stores `data` under `key`, replacing any previous value.
    ///
    /// The payload is gzip-compressed when `config.compress` is set and it
    /// is longer than `config.compress_threshold`. The entry expires after
    /// `ttl`, falling back to `config.default_ttl`; with neither it is
    /// created without a TTL.
    ///
    /// Before inserting, expired entries are swept and least-recently-used
    /// entries are evicted until the new entry fits in
    /// `config.max_memory_size` or the cache is empty. An entry larger than
    /// the whole budget is still stored once the cache has been emptied.
    ///
    /// # Errors
    ///
    /// Returns `CacheError::Compression` if compressing the payload fails.
    pub async fn put(&self, key: CacheKey, data: Bytes, ttl: Option<Duration>) -> Result<()> {
        let (payload, compressed) =
            if self.config.compress && data.len() > self.config.compress_threshold {
                (self.compress(&data)?, true)
            } else {
                (data, false)
            };

        let entry = match ttl.or(self.config.default_ttl) {
            Some(lifetime) => CacheEntry::with_ttl(payload, compressed, lifetime),
            None => CacheEntry::new(payload, compressed),
        };
        let entry_size = entry.size;

        let mut cache = self.cache.write().await;

        self.evict_expired(&mut cache).await;

        // Credit the value being overwritten before checking the budget;
        // otherwise unrelated entries are evicted to make room for bytes
        // that are about to be freed anyway.
        if let Some(previous) = cache.pop(&key) {
            self.current_size.fetch_sub(previous.size, Ordering::SeqCst);
        }

        while self
            .current_size
            .load(Ordering::SeqCst)
            .saturating_add(entry_size)
            > self.config.max_memory_size
            && !cache.is_empty()
        {
            if let Some((_, evicted)) = cache.pop_lru() {
                self.current_size.fetch_sub(evicted.size, Ordering::SeqCst);
                self.stats.evictions.fetch_add(1, Ordering::Relaxed);
            }
        }

        // `push` hands back the entry displaced by the `max_entries` limit.
        // `put` would drop it silently and leave `current_size` too high.
        // The same-key case was handled by the `pop` above, so anything
        // returned here is a capacity eviction.
        if let Some((_, displaced)) = cache.push(key, entry) {
            self.current_size.fetch_sub(displaced.size, Ordering::SeqCst);
            self.stats.evictions.fetch_add(1, Ordering::Relaxed);
        }

        self.current_size.fetch_add(entry_size, Ordering::SeqCst);
        self.stats.writes.fetch_add(1, Ordering::Relaxed);

        Ok(())
    }

    /// Removes every expired entry from `cache`, updating the size total and
    /// the eviction counter. The caller must already hold the write lock.
    async fn evict_expired(&self, cache: &mut LruCache<CacheKey, CacheEntry>) {
        // Keys are collected first because the map cannot be mutated while
        // it is being iterated.
        let expired: Vec<CacheKey> = cache
            .iter()
            .filter(|(_, entry)| entry.is_expired())
            .map(|(key, _)| key.clone())
            .collect();

        for key in expired {
            if let Some(entry) = cache.pop(&key) {
                self.current_size.fetch_sub(entry.size, Ordering::SeqCst);
                self.stats.evictions.fetch_add(1, Ordering::Relaxed);
            }
        }
    }

    /// Deletes `key` if present. Removing an absent key is not an error.
    pub async fn remove(&self, key: &CacheKey) -> Result<()> {
        let mut cache = self.cache.write().await;
        if let Some(entry) = cache.pop(key) {
            self.current_size.fetch_sub(entry.size, Ordering::SeqCst);
        }
        Ok(())
    }

    /// Drops every entry and resets the size total to zero. The statistics
    /// counters are left untouched.
    pub async fn clear(&self) -> Result<()> {
        let mut cache = self.cache.write().await;
        cache.clear();
        self.current_size.store(0, Ordering::SeqCst);
        Ok(())
    }

    /// Returns the hit / miss / write / eviction counters.
    #[must_use]
    pub fn stats(&self) -> &CacheStats {
        &self.stats
    }

    /// Gzip-compresses `data` at the default compression level.
    fn compress(&self, data: &Bytes) -> Result<Bytes> {
        use flate2::Compression;
        use flate2::write::GzEncoder;
        use std::io::Write;

        // Both the write and the finish step report failure the same way.
        fn failure(err: std::io::Error) -> CloudError {
            CloudError::Cache(CacheError::Compression {
                message: format!("Compression failed: {err}"),
            })
        }

        let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
        encoder.write_all(data).map_err(failure)?;
        let compressed = encoder.finish().map_err(failure)?;

        Ok(Bytes::from(compressed))
    }

    /// Inflates a gzip payload produced by `compress`.
    fn decompress(&self, data: &Bytes) -> Result<Bytes> {
        use flate2::read::GzDecoder;
        use std::io::Read;

        let mut decoder = GzDecoder::new(&data[..]);
        let mut decompressed = Vec::new();

        decoder.read_to_end(&mut decompressed).map_err(|e| {
            CloudError::Cache(CacheError::Decompression {
                message: format!("Decompression failed: {e}"),
            })
        })?;

        Ok(Bytes::from(decompressed))
    }
}
217
#[cfg(feature = "cache")]
/// In-memory cache that evicts the entry with the lowest access count.
pub struct LfuCache {
    /// Concurrent map from key to stored entry.
    storage: Arc<DashMap<CacheKey, CacheEntry>>,
    /// Per-key access counter: set to 1 by `put`, incremented by a `get` hit.
    frequency_map: Arc<DashMap<CacheKey, u64>>,
    /// Written by `put` (1) and `clear` (0).
    /// NOTE(review): never read in the code visible here — confirm whether
    /// it is still needed.
    min_frequency: Arc<AtomicU64>,
    /// Running sum of `CacheEntry::size` for the stored entries; compared
    /// against `CacheConfig::max_memory_size` on insert.
    current_size: Arc<AtomicUsize>,
    /// Settings read by this cache: `default_ttl` and `max_memory_size`.
    config: CacheConfig,
    /// Hit / miss / write / eviction counters handed out by `stats()`.
    stats: CacheStats,
}
234
#[cfg(feature = "cache")]
impl LfuCache {
    /// Builds an empty cache that uses `config` for its TTL default and
    /// memory budget.
    pub fn new(config: CacheConfig) -> Self {
        Self {
            storage: Arc::new(DashMap::new()),
            frequency_map: Arc::new(DashMap::new()),
            min_frequency: Arc::new(AtomicU64::new(0)),
            current_size: Arc::new(AtomicUsize::new(0)),
            config,
            stats: CacheStats::default(),
        }
    }

    /// Returns the payload stored under `key` and bumps its access counter.
    ///
    /// An expired entry is removed and reported, and counted, as a miss.
    ///
    /// # Errors
    ///
    /// Returns `CacheError::Miss` when the key is absent or expired.
    pub async fn get(&self, key: &CacheKey) -> Result<Bytes> {
        // `None` marks "present but expired". The map guard lives only for
        // this statement, so it is released before the `.await` below.
        let live = match self.storage.get_mut(key) {
            None => {
                self.stats.misses.fetch_add(1, Ordering::Relaxed);
                return Err(CloudError::Cache(CacheError::Miss { key: key.clone() }));
            }
            Some(mut entry) => {
                if entry.is_expired() {
                    None
                } else {
                    entry.record_access();
                    // Bumped while the storage guard is held, as before, so
                    // a concurrent `remove` of this key cannot run in
                    // between and leave an orphaned counter behind.
                    self.frequency_map
                        .entry(key.clone())
                        .and_modify(|count| *count += 1)
                        .or_insert(1);
                    Some(entry.data.clone())
                }
            }
        };

        match live {
            Some(data) => {
                self.stats.hits.fetch_add(1, Ordering::Relaxed);
                Ok(data)
            }
            None => {
                self.remove(key).await?;
                // The caller got nothing back, so this lookup is a miss.
                self.stats.misses.fetch_add(1, Ordering::Relaxed);
                Err(CloudError::Cache(CacheError::Miss { key: key.clone() }))
            }
        }
    }

    /// Stores `data` under `key` with an access count of 1, replacing any
    /// previous value.
    ///
    /// The entry expires after `ttl`, falling back to `config.default_ttl`;
    /// with neither it is created without a TTL. Least-frequently-used
    /// entries are evicted until the new entry fits in
    /// `config.max_memory_size` or nothing more can be evicted; the entry is
    /// stored either way.
    pub async fn put(&self, key: CacheKey, data: Bytes, ttl: Option<Duration>) -> Result<()> {
        let entry = match ttl.or(self.config.default_ttl) {
            Some(lifetime) => CacheEntry::with_ttl(data, false, lifetime),
            None => CacheEntry::new(data, false),
        };
        let entry_size = entry.size;

        // Retire the value being overwritten first so its size is credited
        // before the budget check. Otherwise unrelated entries are evicted
        // for space that is about to be freed anyway.
        if let Some((_, previous)) = self.storage.remove(&key) {
            self.current_size.fetch_sub(previous.size, Ordering::SeqCst);
        }
        self.frequency_map.remove(&key);

        while self
            .current_size
            .load(Ordering::SeqCst)
            .saturating_add(entry_size)
            > self.config.max_memory_size
            && !self.storage.is_empty()
        {
            // Stop as soon as eviction can make no progress; spinning here
            // would hang the calling task.
            if !self.evict_lfu().await {
                break;
            }
        }

        // A concurrent `put` of the same key may have slipped in meanwhile.
        if let Some(raced) = self.storage.insert(key.clone(), entry) {
            self.current_size.fetch_sub(raced.size, Ordering::SeqCst);
        }

        self.current_size.fetch_add(entry_size, Ordering::SeqCst);
        self.frequency_map.insert(key, 1);
        self.min_frequency.store(1, Ordering::SeqCst);
        self.stats.writes.fetch_add(1, Ordering::Relaxed);

        Ok(())
    }

    /// Evicts the key with the smallest access counter.
    ///
    /// Returns `true` when a candidate was found and its counter removed,
    /// `false` when there are no counters left. An eviction is counted only
    /// if the key still had a stored entry.
    async fn evict_lfu(&self) -> bool {
        let mut coldest: Option<(CacheKey, u64)> = None;

        for slot in self.frequency_map.iter() {
            let count = *slot.value();
            let colder = match &coldest {
                Some((_, best)) => count < *best,
                None => true,
            };
            if colder {
                coldest = Some((slot.key().clone(), count));
            }
        }

        let victim = match coldest {
            Some((key, _)) => key,
            None => return false,
        };

        // The counter is dropped even when no entry is stored for it.
        // Leaving a stale counter in place would make every later call pick
        // it again and never free anything.
        self.frequency_map.remove(&victim);

        if let Some((_, entry)) = self.storage.remove(&victim) {
            self.current_size.fetch_sub(entry.size, Ordering::SeqCst);
            self.stats.evictions.fetch_add(1, Ordering::Relaxed);
        }

        true
    }

    /// Deletes `key` and its access counter if present. Removing an absent
    /// key is not an error.
    pub async fn remove(&self, key: &CacheKey) -> Result<()> {
        if let Some((_, entry)) = self.storage.remove(key) {
            self.current_size.fetch_sub(entry.size, Ordering::SeqCst);
            self.frequency_map.remove(key);
        }
        Ok(())
    }

    /// Drops every entry and counter and resets the size total. The
    /// statistics counters are left untouched.
    pub async fn clear(&self) -> Result<()> {
        self.storage.clear();
        self.frequency_map.clear();
        self.current_size.store(0, Ordering::SeqCst);
        self.min_frequency.store(0, Ordering::SeqCst);
        Ok(())
    }

    /// Returns the hit / miss / write / eviction counters.
    #[must_use]
    pub fn stats(&self) -> &CacheStats {
        &self.stats
    }
}
349
#[cfg(feature = "cache")]
/// In-memory cache that keeps two resident key lists (`t1`, `t2`) and two
/// "ghost" lists of recently evicted keys (`b1`, `b2`), and uses hits on the
/// ghost lists to adjust the balance `p` between the resident lists.
///
/// NOTE(review): the field names match the Adaptive Replacement Cache
/// algorithm; confirm that is the intended reference.
pub struct ArcCache {
    /// Keys of resident entries that have not been promoted yet; the front
    /// is the next eviction candidate.
    t1: Arc<RwLock<VecDeque<CacheKey>>>,
    /// Keys of resident entries that were promoted after being requested
    /// again; the front is the next eviction candidate.
    t2: Arc<RwLock<VecDeque<CacheKey>>>,
    /// Keys recently evicted from `t1` (no data kept); at most `c` long.
    b1: Arc<RwLock<VecDeque<CacheKey>>>,
    /// Keys recently evicted from `t2` (no data kept); at most `c` long.
    b2: Arc<RwLock<VecDeque<CacheKey>>>,
    /// Concurrent map from key to stored entry.
    storage: Arc<DashMap<CacheKey, CacheEntry>>,
    /// Adaptation value, kept within `0.0..=c`: raised when a written key is
    /// found in `b1`, lowered when it is found in `b2`, and compared with
    /// the length of `t1` to choose which list to evict from.
    p: Arc<RwLock<f64>>,
    /// Copy of `CacheConfig::max_entries`; upper bound for `p` and for the
    /// length of each ghost list.
    c: usize,
    /// Running sum of `CacheEntry::size` for the stored entries; compared
    /// against `CacheConfig::max_memory_size` on insert.
    current_size: Arc<AtomicUsize>,
    /// Settings read by this cache: `max_entries`, `default_ttl` and
    /// `max_memory_size`.
    config: CacheConfig,
    /// Hit / miss / write / eviction counters handed out by `stats()`.
    stats: CacheStats,
}
376
#[cfg(feature = "cache")]
impl ArcCache {
    /// Builds an empty cache; `config.max_entries` bounds the ghost lists
    /// and the adaptation value.
    pub fn new(config: CacheConfig) -> Self {
        let c = config.max_entries;
        Self {
            t1: Arc::new(RwLock::new(VecDeque::new())),
            t2: Arc::new(RwLock::new(VecDeque::new())),
            b1: Arc::new(RwLock::new(VecDeque::new())),
            b2: Arc::new(RwLock::new(VecDeque::new())),
            storage: Arc::new(DashMap::new()),
            p: Arc::new(RwLock::new(0.0)),
            c,
            current_size: Arc::new(AtomicUsize::new(0)),
            config,
            stats: CacheStats::default(),
        }
    }

    /// Returns the payload stored under `key`.
    ///
    /// A hit on a key in T1 promotes it to the back of T2; a hit on a key
    /// already in T2 moves it to the back of T2. An expired entry is removed
    /// and reported, and counted, as a miss.
    ///
    /// # Errors
    ///
    /// Returns `CacheError::Miss` when the key is absent or expired.
    pub async fn get(&self, key: &CacheKey) -> Result<Bytes> {
        // The map guard must be gone before any `.await`. `replace` holds
        // the T1 lock while calling the blocking `storage.remove`, so
        // waiting for T1 with a shard guard in hand can deadlock.
        // `None` marks "present but expired".
        let live = match self.storage.get_mut(key) {
            None => {
                self.stats.misses.fetch_add(1, Ordering::Relaxed);
                return Err(CloudError::Cache(CacheError::Miss { key: key.clone() }));
            }
            Some(mut entry) => {
                if entry.is_expired() {
                    None
                } else {
                    entry.record_access();
                    Some(entry.data.clone())
                }
            }
        };

        let data = match live {
            Some(data) => data,
            None => {
                self.remove(key).await?;
                // The caller got nothing back, so this lookup is a miss.
                self.stats.misses.fetch_add(1, Ordering::Relaxed);
                return Err(CloudError::Cache(CacheError::Miss { key: key.clone() }));
            }
        };

        {
            let mut t1 = self.t1.write().await;
            if Self::unlink_key(&mut t1, key) {
                // Second access: promote from T1 to T2.
                self.t2.write().await.push_back(key.clone());
            } else {
                drop(t1);
                // Already in T2: refresh its position so that T2 stays
                // ordered by recency instead of by insertion.
                let mut t2 = self.t2.write().await;
                if Self::unlink_key(&mut t2, key) {
                    t2.push_back(key.clone());
                }
            }
        }

        self.stats.hits.fetch_add(1, Ordering::Relaxed);
        Ok(data)
    }

    /// Stores `data` under `key`, replacing any previous value.
    ///
    /// The entry expires after `ttl`, falling back to `config.default_ttl`;
    /// with neither it is created without a TTL. If the key is found in a
    /// ghost list, the adaptation value is moved towards the list it was
    /// evicted from and the key re-enters through T2. A key that was
    /// resident in T2 stays in T2; everything else enters through T1.
    ///
    /// Entries are evicted until the new one fits in
    /// `config.max_memory_size` or nothing more can be evicted; the entry is
    /// stored either way.
    pub async fn put(&self, key: CacheKey, data: Bytes, ttl: Option<Duration>) -> Result<()> {
        let entry = match ttl.or(self.config.default_ttl) {
            Some(lifetime) => CacheEntry::with_ttl(data, false, lifetime),
            None => CacheEntry::new(data, false),
        };
        let entry_size = entry.size;

        let in_b1 = self.b1.read().await.contains(&key);
        let in_b2 = self.b2.read().await.contains(&key);

        if in_b1 {
            // Ghost hit on the T1 side: grow the share given to T1.
            let b1_len = self.b1.read().await.len();
            let b2_len = self.b2.read().await.len();
            let delta = if b1_len >= b2_len {
                1.0
            } else {
                // `max(1)` guards the division if the list was emptied by
                // another task since the membership check.
                b2_len as f64 / b1_len.max(1) as f64
            };
            {
                let mut p = self.p.write().await;
                *p = (*p + delta).min(self.c as f64);
            }
            Self::unlink_key(&mut *self.b1.write().await, &key);
        } else if in_b2 {
            // Ghost hit on the T2 side: shrink the share given to T1.
            let b1_len = self.b1.read().await.len();
            let b2_len = self.b2.read().await.len();
            let delta = if b2_len >= b1_len {
                1.0
            } else {
                b1_len as f64 / b2_len.max(1) as f64
            };
            {
                let mut p = self.p.write().await;
                *p = (*p - delta).max(0.0);
            }
            Self::unlink_key(&mut *self.b2.write().await, &key);
        }

        // Retire any resident copy first. Its size is credited before the
        // budget check, and the key cannot end up listed twice.
        if let Some((_, previous)) = self.storage.remove(&key) {
            self.current_size.fetch_sub(previous.size, Ordering::SeqCst);
        }
        Self::unlink_key(&mut *self.t1.write().await, &key);
        let was_in_t2 = Self::unlink_key(&mut *self.t2.write().await, &key);

        while self
            .current_size
            .load(Ordering::SeqCst)
            .saturating_add(entry_size)
            > self.config.max_memory_size
        {
            // Stop when nothing is left to evict (for example when the
            // entry alone exceeds the budget); spinning here would hang the
            // calling task.
            if !self.replace(&key).await {
                break;
            }
        }

        // Keys seen before (ghost hit, or previously in T2) go to T2.
        let seen_before = in_b1 || in_b2 || was_in_t2;
        {
            let mut resident = if seen_before {
                self.t2.write().await
            } else {
                self.t1.write().await
            };
            resident.push_back(key.clone());

            // A concurrent `put` of the same key may have slipped in.
            if let Some(raced) = self.storage.insert(key, entry) {
                self.current_size.fetch_sub(raced.size, Ordering::SeqCst);
            }
            self.current_size.fetch_add(entry_size, Ordering::SeqCst);
        }
        self.stats.writes.fetch_add(1, Ordering::Relaxed);

        Ok(())
    }

    /// Evicts one resident key, choosing T1 when it is longer than the
    /// adaptation value (or T2 is empty) and T2 otherwise.
    ///
    /// Returns `true` if a key was taken off a resident list, `false` if
    /// there was nothing to evict.
    async fn replace(&self, _key: &CacheKey) -> bool {
        if self.storage.is_empty() {
            return false;
        }

        let t1_len = self.t1.read().await.len();
        let p = *self.p.read().await;
        let from_t1 = t1_len > 0 && (t1_len as f64 > p || self.t2.read().await.is_empty());

        if from_t1 {
            self.evict_oldest(&self.t1, &self.b1).await
        } else {
            self.evict_oldest(&self.t2, &self.b2).await
        }
    }

    /// Pops the front key of `resident`, drops its stored entry, and records
    /// the key at the back of `ghost`, trimming `ghost` to `c` keys.
    ///
    /// Returns `false` when `resident` is empty.
    async fn evict_oldest(
        &self,
        resident: &RwLock<VecDeque<CacheKey>>,
        ghost: &RwLock<VecDeque<CacheKey>>,
    ) -> bool {
        let mut resident_keys = resident.write().await;
        let victim = match resident_keys.pop_front() {
            Some(key) => key,
            None => return false,
        };

        if let Some((_, entry)) = self.storage.remove(&victim) {
            self.current_size.fetch_sub(entry.size, Ordering::SeqCst);
        }

        let mut ghost_keys = ghost.write().await;
        ghost_keys.push_back(victim);
        while ghost_keys.len() > self.c {
            ghost_keys.pop_front();
        }
        self.stats.evictions.fetch_add(1, Ordering::Relaxed);

        true
    }

    /// Removes the first occurrence of `key` from `list`; returns whether it
    /// was present.
    fn unlink_key(list: &mut VecDeque<CacheKey>, key: &CacheKey) -> bool {
        match list.iter().position(|candidate| candidate == key) {
            Some(index) => {
                list.remove(index);
                true
            }
            None => false,
        }
    }

    /// Deletes `key` from storage and from both resident lists. The ghost
    /// lists are left alone. Removing an absent key is not an error.
    pub async fn remove(&self, key: &CacheKey) -> Result<()> {
        if let Some((_, entry)) = self.storage.remove(key) {
            self.current_size.fetch_sub(entry.size, Ordering::SeqCst);
        }

        Self::unlink_key(&mut *self.t1.write().await, key);
        Self::unlink_key(&mut *self.t2.write().await, key);

        Ok(())
    }

    /// Drops every entry, empties all four lists, and resets the size total
    /// and the adaptation value. The statistics counters are left untouched.
    pub async fn clear(&self) -> Result<()> {
        self.storage.clear();
        self.t1.write().await.clear();
        self.t2.write().await.clear();
        self.b1.write().await.clear();
        self.b2.write().await.clear();
        self.current_size.store(0, Ordering::SeqCst);
        *self.p.write().await = 0.0;
        Ok(())
    }

    /// Returns the hit / miss / write / eviction counters.
    #[must_use]
    pub fn stats(&self) -> &CacheStats {
        &self.stats
    }
}