1#[cfg(feature = "cache")]
4use dashmap::DashMap;
5#[cfg(feature = "cache")]
6use lru::LruCache;
7use std::collections::VecDeque;
8use std::num::NonZeroUsize;
9use std::sync::Arc;
10use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
11
12#[cfg(feature = "async")]
13use tokio::sync::RwLock;
14
15use bytes::Bytes;
16use std::time::Duration;
17
18use super::CacheConfig;
19use super::metadata::{CacheEntry, CacheKey, CacheStats};
20use crate::error::{CacheError, CloudError, Result};
21
/// In-memory LRU cache with per-entry TTL support and optional gzip
/// compression of large payloads.
///
/// Capacity is bounded both by entry count (`config.max_entries`, enforced
/// by the inner `LruCache`) and by total byte size (`config.max_memory_size`,
/// tracked in `current_size`).
// NOTE(review): `RwLock` here is `tokio::sync::RwLock`, imported under the
// `async` feature while this type is gated on `cache` — confirm the `cache`
// feature implies `async` in Cargo.toml.
#[cfg(feature = "cache")]
pub struct LruTtlCache {
    /// LRU-ordered key → entry map, guarded by an async lock.
    pub(crate) cache: Arc<RwLock<LruCache<CacheKey, CacheEntry>>>,
    /// Total bytes currently resident (sum of `CacheEntry::size`).
    pub(crate) current_size: Arc<AtomicUsize>,
    /// TTL defaults, compression settings, and size limits.
    config: CacheConfig,
    /// Cumulative hit/miss/eviction/write counters.
    stats: CacheStats,
}
34
#[cfg(feature = "cache")]
impl LruTtlCache {
    /// Creates a cache bounded by `config.max_entries` entries and
    /// `config.max_memory_size` bytes.
    ///
    /// # Errors
    /// Returns `CacheError::Full` if a valid capacity cannot be derived
    /// (defensive: `max(1)` makes this unreachable in practice).
    pub fn new(config: CacheConfig) -> Result<Self> {
        let capacity = NonZeroUsize::new(config.max_entries.max(1)).ok_or_else(|| {
            CloudError::Cache(CacheError::Full {
                message: "Invalid cache capacity".to_string(),
            })
        })?;

        Ok(Self {
            cache: Arc::new(RwLock::new(LruCache::new(capacity))),
            current_size: Arc::new(AtomicUsize::new(0)),
            config,
            stats: CacheStats::default(),
        })
    }

    /// Looks up `key`, returning the (decompressed) payload on a hit.
    ///
    /// Entries past their TTL, or older than `config.max_age` when set, are
    /// evicted on access and reported as misses.
    ///
    /// # Errors
    /// `CacheError::Miss` when the key is absent or stale;
    /// `CacheError::Decompression` if a compressed payload fails to inflate.
    pub async fn get(&self, key: &CacheKey) -> Result<Bytes> {
        // A write lock is required even for reads: `get_mut` reorders the
        // LRU list and `record_access` mutates the entry.
        let mut cache = self.cache.write().await;

        if let Some(entry) = cache.get_mut(key) {
            // Lazy eviction: TTL expiry and max-age overflow are handled
            // identically (this merges two previously duplicated branches).
            let too_old = self
                .config
                .max_age
                .map_or(false, |max_age| entry.age() > max_age);
            if entry.is_expired() || too_old {
                let size = entry.size;
                cache.pop(key);
                self.current_size.fetch_sub(size, Ordering::SeqCst);
                self.stats.evictions.fetch_add(1, Ordering::Relaxed);
                return Err(CloudError::Cache(CacheError::Miss { key: key.clone() }));
            }

            entry.record_access();
            self.stats.hits.fetch_add(1, Ordering::Relaxed);

            if entry.compressed {
                self.decompress(&entry.data)
            } else {
                Ok(entry.data.clone())
            }
        } else {
            self.stats.misses.fetch_add(1, Ordering::Relaxed);
            Err(CloudError::Cache(CacheError::Miss { key: key.clone() }))
        }
    }

    /// Inserts `data` under `key`, compressing large payloads when enabled.
    ///
    /// TTL precedence: the explicit `ttl` argument, then
    /// `config.default_ttl`, otherwise the entry never expires. Expired
    /// entries and LRU victims are evicted first so the cache stays within
    /// `config.max_memory_size`.
    ///
    /// # Errors
    /// `CacheError::Full` if the (possibly compressed) entry alone exceeds
    /// the memory budget; `CacheError::Compression` on a failed compression.
    pub async fn put(&self, key: CacheKey, data: Bytes, ttl: Option<Duration>) -> Result<()> {
        let (final_data, is_compressed) =
            if self.config.compress && data.len() > self.config.compress_threshold {
                (self.compress(&data)?, true)
            } else {
                (data, false)
            };

        let entry = if let Some(ttl_duration) = ttl.or(self.config.default_ttl) {
            CacheEntry::with_ttl(final_data, is_compressed, ttl_duration)
        } else {
            CacheEntry::new(final_data, is_compressed)
        };

        let entry_size = entry.size;

        // Reject entries that could never fit: the eviction loop below
        // would otherwise drain the whole cache and still insert an entry
        // that permanently violates the byte budget.
        if entry_size > self.config.max_memory_size {
            return Err(CloudError::Cache(CacheError::Full {
                message: format!(
                    "Entry of {entry_size} bytes exceeds cache budget of {} bytes",
                    self.config.max_memory_size
                ),
            }));
        }

        let mut cache = self.cache.write().await;

        // Drop stale entries first so they don't count against the budget.
        self.evict_expired(&mut cache);

        // Evict LRU victims until the new entry fits.
        while self.current_size.load(Ordering::SeqCst) + entry_size > self.config.max_memory_size
            && !cache.is_empty()
        {
            if let Some((_, evicted_entry)) = cache.pop_lru() {
                self.current_size
                    .fetch_sub(evicted_entry.size, Ordering::SeqCst);
                self.stats.evictions.fetch_add(1, Ordering::Relaxed);
            }
        }

        // Replacing an existing key: retire the old entry's byte count.
        if let Some(old_entry) = cache.put(key, entry) {
            self.current_size
                .fetch_sub(old_entry.size, Ordering::SeqCst);
        }

        self.current_size.fetch_add(entry_size, Ordering::SeqCst);
        self.stats.writes.fetch_add(1, Ordering::Relaxed);

        Ok(())
    }

    /// Removes every expired entry, updating the size and eviction counters.
    /// Keys are collected first because `LruCache` cannot be mutated while
    /// iterated. (No `await` points, so this needn't be `async`.)
    fn evict_expired(&self, cache: &mut LruCache<CacheKey, CacheEntry>) {
        let keys_to_remove: Vec<CacheKey> = cache
            .iter()
            .filter(|(_, entry)| entry.is_expired())
            .map(|(key, _)| key.clone())
            .collect();

        for key in keys_to_remove {
            if let Some(entry) = cache.pop(&key) {
                self.current_size.fetch_sub(entry.size, Ordering::SeqCst);
                self.stats.evictions.fetch_add(1, Ordering::Relaxed);
            }
        }
    }

    /// Removes `key` if present; removing an absent key is not an error.
    pub async fn remove(&self, key: &CacheKey) -> Result<()> {
        let mut cache = self.cache.write().await;
        if let Some(entry) = cache.pop(key) {
            self.current_size.fetch_sub(entry.size, Ordering::SeqCst);
        }
        Ok(())
    }

    /// Drops every entry and resets the byte counter (stats are preserved).
    pub async fn clear(&self) -> Result<()> {
        let mut cache = self.cache.write().await;
        cache.clear();
        self.current_size.store(0, Ordering::SeqCst);
        Ok(())
    }

    /// Returns the cumulative hit/miss/eviction/write counters.
    #[must_use]
    pub fn stats(&self) -> &CacheStats {
        &self.stats
    }

    /// Gzip-compresses `data` at level 6.
    fn compress(&self, data: &Bytes) -> Result<Bytes> {
        oxiarc_archive::gzip::compress(data, 6)
            .map(Bytes::from)
            .map_err(|e| {
                CloudError::Cache(CacheError::Compression {
                    message: format!("Compression failed: {e}"),
                })
            })
    }

    /// Inflates a gzip payload produced by [`Self::compress`].
    fn decompress(&self, data: &Bytes) -> Result<Bytes> {
        let mut reader = std::io::Cursor::new(data.as_ref());
        oxiarc_archive::gzip::decompress(&mut reader)
            .map(Bytes::from)
            .map_err(|e| {
                CloudError::Cache(CacheError::Decompression {
                    message: format!("Decompression failed: {e}"),
                })
            })
    }
}
201
/// Least-Frequently-Used cache: evicts the entry with the smallest access
/// count when the byte budget is exceeded.
#[cfg(feature = "cache")]
pub struct LfuCache {
    /// Key → entry payload store.
    storage: Arc<DashMap<CacheKey, CacheEntry>>,
    /// Key → access-frequency counters used to pick eviction victims.
    frequency_map: Arc<DashMap<CacheKey, u64>>,
    // NOTE(review): written in `put`/`clear` but never read in this file —
    // confirm whether this counter is still needed.
    min_frequency: Arc<AtomicU64>,
    /// Total bytes currently held (sum of `CacheEntry::size`).
    current_size: Arc<AtomicUsize>,
    /// TTL defaults and size limits.
    config: CacheConfig,
    /// Cumulative hit/miss/eviction/write counters.
    stats: CacheStats,
}
218
#[cfg(feature = "cache")]
impl LfuCache {
    /// Creates an empty LFU cache bounded by `config.max_memory_size` bytes.
    #[must_use]
    pub fn new(config: CacheConfig) -> Self {
        Self {
            storage: Arc::new(DashMap::new()),
            frequency_map: Arc::new(DashMap::new()),
            min_frequency: Arc::new(AtomicU64::new(0)),
            current_size: Arc::new(AtomicUsize::new(0)),
            config,
            stats: CacheStats::default(),
        }
    }

    /// Looks up `key`, bumping its access frequency on a hit.
    ///
    /// Expired entries are removed on access and reported as misses.
    ///
    /// # Errors
    /// `CacheError::Miss` when the key is absent or expired.
    pub async fn get(&self, key: &CacheKey) -> Result<Bytes> {
        if let Some(mut entry) = self.storage.get_mut(key) {
            if entry.is_expired() {
                // Release the shard guard before `remove` re-locks the same
                // DashMap shard, which would deadlock.
                drop(entry);
                self.remove(key).await?;
                return Err(CloudError::Cache(CacheError::Miss { key: key.clone() }));
            }

            entry.record_access();
            self.frequency_map
                .entry(key.clone())
                .and_modify(|f| *f += 1)
                .or_insert(1);

            self.stats.hits.fetch_add(1, Ordering::Relaxed);
            Ok(entry.data.clone())
        } else {
            self.stats.misses.fetch_add(1, Ordering::Relaxed);
            Err(CloudError::Cache(CacheError::Miss { key: key.clone() }))
        }
    }

    /// Inserts `data` under `key`, evicting least-frequently-used entries
    /// until the new entry fits in `config.max_memory_size`.
    ///
    /// Re-inserting an existing key resets its frequency to 1 (a re-put is
    /// treated as a fresh entry).
    ///
    /// # Errors
    /// `CacheError::Full` if the entry alone exceeds the memory budget.
    pub async fn put(&self, key: CacheKey, data: Bytes, ttl: Option<Duration>) -> Result<()> {
        let entry = if let Some(ttl_duration) = ttl.or(self.config.default_ttl) {
            CacheEntry::with_ttl(data, false, ttl_duration)
        } else {
            CacheEntry::new(data, false)
        };

        let entry_size = entry.size;

        // Reject entries that could never fit: the loop below would empty
        // the cache and still leave it permanently over budget.
        if entry_size > self.config.max_memory_size {
            return Err(CloudError::Cache(CacheError::Full {
                message: format!(
                    "Entry of {entry_size} bytes exceeds cache budget of {} bytes",
                    self.config.max_memory_size
                ),
            }));
        }

        while self.current_size.load(Ordering::SeqCst) + entry_size > self.config.max_memory_size
            && !self.storage.is_empty()
        {
            self.evict_lfu().await;
        }

        if let Some(old_entry) = self.storage.insert(key.clone(), entry) {
            self.current_size
                .fetch_sub(old_entry.size, Ordering::SeqCst);
        }

        self.current_size.fetch_add(entry_size, Ordering::SeqCst);
        self.frequency_map.insert(key, 1);
        // NOTE(review): `min_frequency` is only ever written here and in
        // `clear`; confirm it is read elsewhere before relying on it.
        self.min_frequency.store(1, Ordering::SeqCst);
        self.stats.writes.fetch_add(1, Ordering::Relaxed);

        Ok(())
    }

    /// Evicts the entry with the lowest access frequency (O(n) scan of the
    /// frequency map — acceptable for modest cache sizes).
    async fn evict_lfu(&self) {
        let mut min_freq = u64::MAX;
        // Was `Option<String>`, which only compiled if `CacheKey` aliases
        // `String`; use the key type directly.
        let mut min_key: Option<CacheKey> = None;

        for entry in self.frequency_map.iter() {
            if *entry.value() < min_freq {
                min_freq = *entry.value();
                min_key = Some(entry.key().clone());
            }
        }

        if let Some(key) = min_key {
            // Always drop the frequency record, even if the storage entry is
            // already gone; a stale record would otherwise make this a no-op
            // and stall `put`'s eviction loop forever.
            self.frequency_map.remove(&key);
            if let Some((_, entry)) = self.storage.remove(&key) {
                self.current_size.fetch_sub(entry.size, Ordering::SeqCst);
                self.stats.evictions.fetch_add(1, Ordering::Relaxed);
            }
        } else if let Some(any_key) = self.storage.iter().next().map(|e| e.key().clone()) {
            // Defensive fallback: frequency map empty while storage is not;
            // evict an arbitrary entry so eviction loops make progress.
            if let Some((_, entry)) = self.storage.remove(&any_key) {
                self.current_size.fetch_sub(entry.size, Ordering::SeqCst);
                self.stats.evictions.fetch_add(1, Ordering::Relaxed);
            }
        }
    }

    /// Removes `key` and its frequency record; absent keys are not an error.
    pub async fn remove(&self, key: &CacheKey) -> Result<()> {
        if let Some((_, entry)) = self.storage.remove(key) {
            self.current_size.fetch_sub(entry.size, Ordering::SeqCst);
            self.frequency_map.remove(key);
        }
        Ok(())
    }

    /// Drops all entries and counters except the cumulative stats.
    pub async fn clear(&self) -> Result<()> {
        self.storage.clear();
        self.frequency_map.clear();
        self.current_size.store(0, Ordering::SeqCst);
        self.min_frequency.store(0, Ordering::SeqCst);
        Ok(())
    }

    /// Returns the cumulative hit/miss/eviction/write counters.
    #[must_use]
    pub fn stats(&self) -> &CacheStats {
        &self.stats
    }
}
333
/// Adaptive Replacement Cache (ARC): balances a recency list (T1) against a
/// frequency list (T2), with ghost lists (B1/B2) of recently evicted keys
/// steering the adaptive target `p`.
#[cfg(feature = "cache")]
pub struct ArcCache {
    /// T1: resident keys seen once recently (recency side).
    t1: Arc<RwLock<VecDeque<CacheKey>>>,
    /// T2: resident keys seen more than once (frequency side).
    t2: Arc<RwLock<VecDeque<CacheKey>>>,
    /// B1: ghost list of keys recently evicted from T1 (keys only, no data).
    b1: Arc<RwLock<VecDeque<CacheKey>>>,
    /// B2: ghost list of keys recently evicted from T2 (keys only, no data).
    b2: Arc<RwLock<VecDeque<CacheKey>>>,
    /// Payload storage shared by T1 and T2 members.
    storage: Arc<DashMap<CacheKey, CacheEntry>>,
    /// Adaptive target size for T1, clamped to `[0, c]`.
    p: Arc<RwLock<f64>>,
    /// Entry-count bound for the ghost lists (from `config.max_entries`).
    c: usize,
    /// Total bytes currently resident in `storage`.
    current_size: Arc<AtomicUsize>,
    /// TTL defaults and size limits.
    config: CacheConfig,
    /// Cumulative hit/miss/eviction/write counters.
    stats: CacheStats,
}
360
#[cfg(feature = "cache")]
impl ArcCache {
    /// Creates an empty ARC cache; `c` (the ghost-list capacity) is taken
    /// from `config.max_entries`.
    #[must_use]
    pub fn new(config: CacheConfig) -> Self {
        let c = config.max_entries;
        Self {
            t1: Arc::new(RwLock::new(VecDeque::new())),
            t2: Arc::new(RwLock::new(VecDeque::new())),
            b1: Arc::new(RwLock::new(VecDeque::new())),
            b2: Arc::new(RwLock::new(VecDeque::new())),
            storage: Arc::new(DashMap::new()),
            p: Arc::new(RwLock::new(0.0)),
            c,
            current_size: Arc::new(AtomicUsize::new(0)),
            config,
            stats: CacheStats::default(),
        }
    }

    /// Looks up `key`; on a hit the key is promoted per ARC (a T1 hit
    /// graduates to T2, a T2 hit refreshes its MRU position in T2).
    ///
    /// # Errors
    /// `CacheError::Miss` when the key is absent or its entry has expired.
    pub async fn get(&self, key: &CacheKey) -> Result<Bytes> {
        if let Some(mut entry) = self.storage.get_mut(key) {
            if entry.is_expired() {
                // Release the shard guard before `remove` re-locks the same
                // DashMap shard, which would deadlock.
                drop(entry);
                self.remove(key).await?;
                return Err(CloudError::Cache(CacheError::Miss { key: key.clone() }));
            }

            entry.record_access();
            // Clone the payload and release the shard guard *before*
            // awaiting the queue locks: holding a DashMap guard across an
            // `.await` can deadlock other tasks on the same shard.
            let data = entry.data.clone();
            drop(entry);

            // ARC promotion rule.
            let moved_from_t1 = {
                let mut t1 = self.t1.write().await;
                match t1.iter().position(|k| k == key) {
                    Some(pos) => {
                        t1.remove(pos);
                        true
                    }
                    None => false,
                }
            };
            {
                let mut t2 = self.t2.write().await;
                if moved_from_t1 {
                    t2.push_back(key.clone());
                } else if let Some(pos) = t2.iter().position(|k| k == key) {
                    // Refresh MRU position within T2 (the original left T2
                    // untouched on a T2 hit, which degrades ARC to FIFO there).
                    t2.remove(pos);
                    t2.push_back(key.clone());
                }
            }

            self.stats.hits.fetch_add(1, Ordering::Relaxed);
            Ok(data)
        } else {
            self.stats.misses.fetch_add(1, Ordering::Relaxed);
            Err(CloudError::Cache(CacheError::Miss { key: key.clone() }))
        }
    }

    /// Inserts `data` under `key`, adapting the T1 target `p` on ghost-list
    /// hits and evicting per the ARC replacement rule until the entry fits.
    ///
    /// # Errors
    /// `CacheError::Full` if the entry alone exceeds the memory budget.
    pub async fn put(&self, key: CacheKey, data: Bytes, ttl: Option<Duration>) -> Result<()> {
        let entry = if let Some(ttl_duration) = ttl.or(self.config.default_ttl) {
            CacheEntry::with_ttl(data, false, ttl_duration)
        } else {
            CacheEntry::new(data, false)
        };

        let entry_size = entry.size;

        // Reject entries that could never fit. Combined with the progress
        // check below, this fixes an infinite loop in the original: the
        // eviction `while` had no emptiness guard, and `replace` silently
        // no-ops once the cache is drained.
        if entry_size > self.config.max_memory_size {
            return Err(CloudError::Cache(CacheError::Full {
                message: format!(
                    "Entry of {entry_size} bytes exceeds cache budget of {} bytes",
                    self.config.max_memory_size
                ),
            }));
        }

        let in_b1 = self.b1.read().await.contains(&key);
        let in_b2 = self.b2.read().await.contains(&key);

        if in_b1 {
            // B1 ghost hit: the recency side was undersized — grow the T1
            // target `p`, clamped to the total capacity `c`.
            let b1_len = self.b1.read().await.len();
            let b2_len = self.b2.read().await.len();
            let delta = if b1_len >= b2_len {
                1.0
            } else {
                b2_len as f64 / b1_len as f64
            };
            {
                let mut p = self.p.write().await;
                *p = (*p + delta).min(self.c as f64);
            }

            let mut b1 = self.b1.write().await;
            if let Some(pos) = b1.iter().position(|k| k == &key) {
                b1.remove(pos);
            }
        } else if in_b2 {
            // B2 ghost hit: shrink `p` so the frequency side gets more room.
            let b1_len = self.b1.read().await.len();
            let b2_len = self.b2.read().await.len();
            let delta = if b2_len >= b1_len {
                1.0
            } else {
                b1_len as f64 / b2_len as f64
            };
            {
                let mut p = self.p.write().await;
                *p = (*p - delta).max(0.0);
            }

            let mut b2 = self.b2.write().await;
            if let Some(pos) = b2.iter().position(|k| k == &key) {
                b2.remove(pos);
            }
        }

        // Evict until the entry fits; stop if no progress is possible.
        while self.current_size.load(Ordering::SeqCst) + entry_size > self.config.max_memory_size {
            if !self.replace(&key).await {
                break;
            }
        }

        // Ensure a key is queued at most once: drop any prior occurrence
        // (the original pushed unconditionally, duplicating resident keys).
        {
            let mut t1 = self.t1.write().await;
            if let Some(pos) = t1.iter().position(|k| k == &key) {
                t1.remove(pos);
            }
        }
        {
            let mut t2 = self.t2.write().await;
            if let Some(pos) = t2.iter().position(|k| k == &key) {
                t2.remove(pos);
            }
        }

        // Per ARC, keys resurrected from a ghost list go straight to the
        // frequency list T2; genuinely new keys enter the recency list T1.
        if in_b1 || in_b2 {
            self.t2.write().await.push_back(key.clone());
        } else {
            self.t1.write().await.push_back(key.clone());
        }

        if let Some(old) = self.storage.insert(key, entry) {
            self.current_size.fetch_sub(old.size, Ordering::SeqCst);
        }
        self.current_size.fetch_add(entry_size, Ordering::SeqCst);
        self.stats.writes.fetch_add(1, Ordering::Relaxed);

        Ok(())
    }

    /// Evicts one resident entry following the ARC rule: T1 is the victim
    /// when it exceeds the adaptive target `p` (or when T2 is empty),
    /// otherwise T2 is. The victim's key moves to the corresponding ghost
    /// list (B1/B2), trimmed to at most `c` keys.
    ///
    /// Returns `true` when an entry was evicted, `false` when no progress
    /// could be made (storage or both queues empty).
    async fn replace(&self, _key: &CacheKey) -> bool {
        if self.storage.is_empty() {
            return false;
        }

        let t1_len = self.t1.read().await.len();
        let p = *self.p.read().await;

        if t1_len > 0 && (t1_len as f64 > p || self.t2.read().await.is_empty()) {
            let evicted = self.t1.write().await.pop_front();
            if let Some(evict_key) = evicted {
                if let Some((_, entry)) = self.storage.remove(&evict_key) {
                    self.current_size.fetch_sub(entry.size, Ordering::SeqCst);
                }
                let mut b1 = self.b1.write().await;
                b1.push_back(evict_key);
                while b1.len() > self.c {
                    b1.pop_front();
                }
                self.stats.evictions.fetch_add(1, Ordering::Relaxed);
                return true;
            }
            false
        } else {
            let evicted = self.t2.write().await.pop_front();
            if let Some(evict_key) = evicted {
                if let Some((_, entry)) = self.storage.remove(&evict_key) {
                    self.current_size.fetch_sub(entry.size, Ordering::SeqCst);
                }
                let mut b2 = self.b2.write().await;
                b2.push_back(evict_key);
                while b2.len() > self.c {
                    b2.pop_front();
                }
                self.stats.evictions.fetch_add(1, Ordering::Relaxed);
                return true;
            }
            false
        }
    }

    /// Removes `key` from storage and both resident queues; ghost lists are
    /// intentionally left alone (they hold keys only, matching the original
    /// behavior).
    pub async fn remove(&self, key: &CacheKey) -> Result<()> {
        if let Some((_, entry)) = self.storage.remove(key) {
            self.current_size.fetch_sub(entry.size, Ordering::SeqCst);
        }

        {
            let mut t1 = self.t1.write().await;
            if let Some(pos) = t1.iter().position(|k| k == key) {
                t1.remove(pos);
            }
        }
        {
            let mut t2 = self.t2.write().await;
            if let Some(pos) = t2.iter().position(|k| k == key) {
                t2.remove(pos);
            }
        }

        Ok(())
    }

    /// Resets every list, the byte counter, and the adaptive target `p`
    /// (cumulative stats are preserved).
    pub async fn clear(&self) -> Result<()> {
        self.storage.clear();
        self.t1.write().await.clear();
        self.t2.write().await.clear();
        self.b1.write().await.clear();
        self.b2.write().await.clear();
        self.current_size.store(0, Ordering::SeqCst);
        *self.p.write().await = 0.0;
        Ok(())
    }

    /// Returns the cumulative hit/miss/eviction/write counters.
    #[must_use]
    pub fn stats(&self) -> &CacheStats {
        &self.stats
    }
}