1use crate::compression::{AdaptiveCompressor, CompressedData, CompressionCodec, DataType};
15use crate::error::{CacheError, Result};
16use crate::eviction::{EvictionPolicy, LruEviction};
17use crate::{CacheConfig, CacheStats};
18use async_trait::async_trait;
19use bytes::Bytes;
20use dashmap::DashMap;
21use std::collections::HashMap;
22use std::path::PathBuf;
23use std::sync::Arc;
24use tokio::fs;
25use tokio::io::{AsyncReadExt, AsyncWriteExt};
26use tokio::sync::RwLock;
27
/// Key type shared by all cache tiers.
pub type CacheKey = String;

/// A cached payload plus the bookkeeping metadata the tiers use for
/// eviction decisions and statistics.
#[derive(Debug, Clone)]
pub struct CacheValue {
    /// Payload bytes. Note: the L2 tier keeps this empty in its index and
    /// loads the real bytes from disk on demand.
    pub data: Bytes,
    /// Payload classification used to select a compression codec.
    pub data_type: DataType,
    /// Creation timestamp (UTC).
    pub created_at: chrono::DateTime<chrono::Utc>,
    /// Timestamp of the most recent access (UTC).
    pub last_accessed: chrono::DateTime<chrono::Utc>,
    /// Number of recorded reads of this entry.
    pub access_count: u64,
    /// Size in bytes charged against the tier's capacity. For L1 this is
    /// `data.len()`; for L2 it is the serialized on-disk size.
    pub size: usize,
}
47
48impl CacheValue {
49 pub fn new(data: Bytes, data_type: DataType) -> Self {
51 let now = chrono::Utc::now();
52 let size = data.len();
53
54 Self {
55 data,
56 data_type,
57 created_at: now,
58 last_accessed: now,
59 access_count: 0,
60 size,
61 }
62 }
63
64 pub fn record_access(&mut self) {
66 self.last_accessed = chrono::Utc::now();
67 self.access_count += 1;
68 }
69
70 pub fn age_seconds(&self) -> i64 {
72 let now = chrono::Utc::now();
73 (now - self.created_at).num_seconds()
74 }
75
76 pub fn idle_seconds(&self) -> i64 {
78 let now = chrono::Utc::now();
79 (now - self.last_accessed).num_seconds()
80 }
81}
82
/// Common interface implemented by every cache tier (memory, disk, ...).
///
/// Implementations must be shareable across tasks; all mutating
/// operations take `&self`.
#[async_trait]
pub trait CacheTier: Send + Sync {
    /// Looks up `key`, returning the full value (with payload) on a hit.
    async fn get(&self, key: &CacheKey) -> Result<Option<CacheValue>>;

    /// Stores `value` under `key`, evicting other entries if needed.
    async fn put(&self, key: CacheKey, value: CacheValue) -> Result<()>;

    /// Deletes `key`; returns `true` if an entry was actually removed.
    async fn remove(&self, key: &CacheKey) -> Result<bool>;

    /// Presence check; implementations do not record this as an access.
    async fn contains(&self, key: &CacheKey) -> bool;

    /// Snapshot of this tier's statistics.
    async fn stats(&self) -> CacheStats;

    /// Removes every entry and resets statistics.
    async fn clear(&self) -> Result<()>;

    /// Human-readable tier name (e.g. "L1-Memory").
    fn name(&self) -> &str;

    /// Maximum capacity in bytes.
    fn capacity(&self) -> usize;

    /// Bytes currently charged against capacity.
    async fn current_size(&self) -> usize;
}
113
/// In-memory cache tier backed by a concurrent `DashMap`.
pub struct L1MemoryTier {
    /// Key -> value store holding the actual payload bytes.
    cache: Arc<DashMap<CacheKey, CacheValue>>,
    /// Capacity in bytes.
    max_size: usize,
    /// Running sum of the `size` of all stored values, in bytes.
    current_size: Arc<RwLock<usize>>,
    /// Pluggable eviction policy (LRU by default).
    eviction: Arc<RwLock<Box<dyn EvictionPolicy<CacheKey>>>>,
    /// Hit/miss/eviction counters for this tier.
    stats: Arc<RwLock<CacheStats>>,
}
127
128impl L1MemoryTier {
129 pub fn new(max_size: usize) -> Self {
131 Self {
132 cache: Arc::new(DashMap::new()),
133 max_size,
134 current_size: Arc::new(RwLock::new(0)),
135 eviction: Arc::new(RwLock::new(Box::new(LruEviction::new()))),
136 stats: Arc::new(RwLock::new(CacheStats::new())),
137 }
138 }
139
140 async fn make_space(&self, needed: usize) -> Result<()> {
142 let mut current = self.current_size.write().await;
143
144 while *current + needed > self.max_size {
145 let mut eviction = self.eviction.write().await;
146
147 if let Some(victim_key) = eviction.select_victim() {
148 if let Some((_, victim_value)) = self.cache.remove(&victim_key) {
149 *current = current.saturating_sub(victim_value.size);
150 eviction.on_remove(&victim_key);
151
152 let mut stats = self.stats.write().await;
153 stats.evictions += 1;
154 stats.item_count = stats.item_count.saturating_sub(1);
155 } else {
156 continue;
158 }
159 } else {
160 return Err(CacheError::CacheFull("L1 cache full".to_string()));
162 }
163 }
164
165 Ok(())
166 }
167}
168
169#[async_trait]
170impl CacheTier for L1MemoryTier {
171 async fn get(&self, key: &CacheKey) -> Result<Option<CacheValue>> {
172 let mut stats = self.stats.write().await;
173
174 if let Some(mut entry) = self.cache.get_mut(key) {
175 entry.record_access();
176 stats.hits += 1;
177
178 let mut eviction = self.eviction.write().await;
179 eviction.on_access(key);
180
181 Ok(Some(entry.clone()))
182 } else {
183 stats.misses += 1;
184 Ok(None)
185 }
186 }
187
188 async fn put(&self, key: CacheKey, value: CacheValue) -> Result<()> {
189 let size = value.size;
190
191 self.make_space(size).await?;
193
194 self.cache.insert(key.clone(), value);
196
197 let mut current_size = self.current_size.write().await;
199 *current_size += size;
200
201 let mut eviction = self.eviction.write().await;
203 eviction.on_insert(key.clone(), size);
204
205 let mut stats = self.stats.write().await;
207 stats.bytes_stored = *current_size as u64;
208 stats.item_count += 1;
209
210 Ok(())
211 }
212
213 async fn remove(&self, key: &CacheKey) -> Result<bool> {
214 if let Some((_, value)) = self.cache.remove(key) {
215 let mut current_size = self.current_size.write().await;
216 *current_size = current_size.saturating_sub(value.size);
217
218 let mut eviction = self.eviction.write().await;
219 eviction.on_remove(key);
220
221 let mut stats = self.stats.write().await;
222 stats.bytes_stored = *current_size as u64;
223 stats.item_count = stats.item_count.saturating_sub(1);
224
225 Ok(true)
226 } else {
227 Ok(false)
228 }
229 }
230
231 async fn contains(&self, key: &CacheKey) -> bool {
232 self.cache.contains_key(key)
233 }
234
235 async fn stats(&self) -> CacheStats {
236 self.stats.read().await.clone()
237 }
238
239 async fn clear(&self) -> Result<()> {
240 self.cache.clear();
241
242 let mut current_size = self.current_size.write().await;
243 *current_size = 0;
244
245 let mut eviction = self.eviction.write().await;
246 eviction.clear();
247
248 let mut stats = self.stats.write().await;
249 *stats = CacheStats::new();
250
251 Ok(())
252 }
253
254 fn name(&self) -> &str {
255 "L1-Memory"
256 }
257
258 fn capacity(&self) -> usize {
259 self.max_size
260 }
261
262 async fn current_size(&self) -> usize {
263 *self.current_size.read().await
264 }
265}
266
/// On-disk cache tier: payloads are compressed, serialized, and written as
/// one `<key>.cache` file each under `cache_dir`.
pub struct L2DiskTier {
    /// Directory holding the `*.cache` files.
    cache_dir: PathBuf,
    /// Capacity in bytes (of on-disk file sizes).
    max_size: usize,
    /// Metadata-only index: entries carry empty `data` and the on-disk
    /// file size in `size`; payloads are read from disk on `get`.
    index: Arc<DashMap<CacheKey, CacheValue>>,
    /// Running sum of indexed file sizes, in bytes.
    current_size: Arc<RwLock<usize>>,
    /// Pluggable eviction policy (LRU by default).
    eviction: Arc<RwLock<Box<dyn EvictionPolicy<CacheKey>>>>,
    /// Codec selector / compressor shared by reads and writes.
    compressor: Arc<RwLock<AdaptiveCompressor>>,
    /// Hit/miss/eviction counters for this tier.
    stats: Arc<RwLock<CacheStats>>,
}
284
285impl L2DiskTier {
286 pub async fn new(cache_dir: PathBuf, max_size: usize) -> Result<Self> {
288 fs::create_dir_all(&cache_dir).await?;
290
291 let tier = Self {
292 cache_dir,
293 max_size,
294 index: Arc::new(DashMap::new()),
295 current_size: Arc::new(RwLock::new(0)),
296 eviction: Arc::new(RwLock::new(Box::new(LruEviction::new()))),
297 compressor: Arc::new(RwLock::new(AdaptiveCompressor::new())),
298 stats: Arc::new(RwLock::new(CacheStats::new())),
299 };
300
301 tier.load_index().await?;
303
304 Ok(tier)
305 }
306
307 async fn load_index(&self) -> Result<()> {
309 let mut entries = fs::read_dir(&self.cache_dir).await?;
310 let mut total_size = 0;
311
312 while let Some(entry) = entries.next_entry().await? {
313 if let Ok(metadata) = entry.metadata().await {
314 if metadata.is_file() {
315 let file_size = metadata.len() as usize;
316 total_size += file_size;
317
318 if let Some(file_name) = entry.file_name().to_str() {
320 if file_name.ends_with(".cache") {
321 let key = file_name.trim_end_matches(".cache").to_string();
322
323 let value = CacheValue {
325 data: Bytes::new(),
326 data_type: DataType::Binary,
327 created_at: chrono::Utc::now(),
328 last_accessed: chrono::Utc::now(),
329 access_count: 0,
330 size: file_size,
331 };
332
333 self.index.insert(key.clone(), value);
334
335 let mut eviction = self.eviction.write().await;
336 eviction.on_insert(key, file_size);
337 }
338 }
339 }
340 }
341 }
342
343 let mut current_size = self.current_size.write().await;
344 *current_size = total_size;
345
346 let mut stats = self.stats.write().await;
347 stats.bytes_stored = total_size as u64;
348 stats.item_count = self.index.len();
349
350 Ok(())
351 }
352
353 fn get_file_path(&self, key: &CacheKey) -> PathBuf {
355 self.cache_dir.join(format!("{}.cache", key))
356 }
357
358 async fn make_space(&self, needed: usize) -> Result<()> {
360 let mut current = self.current_size.write().await;
361
362 while *current + needed > self.max_size {
363 let mut eviction = self.eviction.write().await;
364
365 if let Some(victim_key) = eviction.select_victim() {
366 let file_path = self.get_file_path(&victim_key);
367
368 if let Some((_, victim_value)) = self.index.remove(&victim_key) {
369 let _ = fs::remove_file(file_path).await;
371
372 *current = current.saturating_sub(victim_value.size);
373 eviction.on_remove(&victim_key);
374
375 let mut stats = self.stats.write().await;
376 stats.evictions += 1;
377 stats.item_count = stats.item_count.saturating_sub(1);
378 } else {
379 continue;
380 }
381 } else {
382 return Err(CacheError::CacheFull("L2 cache full".to_string()));
383 }
384 }
385
386 Ok(())
387 }
388}
389
390#[async_trait]
391impl CacheTier for L2DiskTier {
392 async fn get(&self, key: &CacheKey) -> Result<Option<CacheValue>> {
393 let mut stats = self.stats.write().await;
394
395 if let Some(mut index_entry) = self.index.get_mut(key) {
396 let file_path = self.get_file_path(key);
397
398 let mut file = fs::File::open(file_path).await?;
400 let mut compressed_bytes = Vec::new();
401 file.read_to_end(&mut compressed_bytes).await?;
402
403 let compressed: CompressedData = serde_json::from_slice(&compressed_bytes)?;
405
406 let mut compressor = self.compressor.write().await;
408 let data = compressed.decompress(&mut compressor)?;
409
410 index_entry.record_access();
411 stats.hits += 1;
412
413 let mut eviction = self.eviction.write().await;
414 eviction.on_access(key);
415
416 Ok(Some(CacheValue {
417 data,
418 data_type: index_entry.data_type,
419 created_at: index_entry.created_at,
420 last_accessed: index_entry.last_accessed,
421 access_count: index_entry.access_count,
422 size: index_entry.size,
423 }))
424 } else {
425 stats.misses += 1;
426 Ok(None)
427 }
428 }
429
430 async fn put(&self, key: CacheKey, value: CacheValue) -> Result<()> {
431 let mut compressor = self.compressor.write().await;
433 let codec = compressor.select_codec(value.data_type);
434 let compressed_data = compressor.compress(&value.data, codec, value.data_type)?;
435
436 let actual_codec = if compressed_data.len() == value.data.len() && value.data.len() < 1024 {
438 CompressionCodec::None
439 } else {
440 codec
441 };
442 drop(compressor);
443
444 let compressed =
445 CompressedData::new(compressed_data.to_vec(), actual_codec, value.data.len());
446
447 let serialized = serde_json::to_vec(&compressed)?;
449 let file_size = serialized.len();
450
451 self.make_space(file_size).await?;
453
454 let file_path = self.get_file_path(&key);
456 let mut file = fs::File::create(file_path).await?;
457 file.write_all(&serialized).await?;
458 file.flush().await?;
459
460 let index_value = CacheValue {
462 data: Bytes::new(),
463 data_type: value.data_type,
464 created_at: value.created_at,
465 last_accessed: value.last_accessed,
466 access_count: value.access_count,
467 size: file_size,
468 };
469
470 self.index.insert(key.clone(), index_value);
471
472 let mut current_size = self.current_size.write().await;
474 *current_size += file_size;
475
476 let mut eviction = self.eviction.write().await;
478 eviction.on_insert(key, file_size);
479
480 let mut stats = self.stats.write().await;
482 stats.bytes_stored = *current_size as u64;
483 stats.item_count += 1;
484
485 Ok(())
486 }
487
488 async fn remove(&self, key: &CacheKey) -> Result<bool> {
489 if let Some((_, value)) = self.index.remove(key) {
490 let file_path = self.get_file_path(key);
491 let _ = fs::remove_file(file_path).await;
492
493 let mut current_size = self.current_size.write().await;
494 *current_size = current_size.saturating_sub(value.size);
495
496 let mut eviction = self.eviction.write().await;
497 eviction.on_remove(key);
498
499 let mut stats = self.stats.write().await;
500 stats.bytes_stored = *current_size as u64;
501 stats.item_count = stats.item_count.saturating_sub(1);
502
503 Ok(true)
504 } else {
505 Ok(false)
506 }
507 }
508
509 async fn contains(&self, key: &CacheKey) -> bool {
510 self.index.contains_key(key)
511 }
512
513 async fn stats(&self) -> CacheStats {
514 self.stats.read().await.clone()
515 }
516
517 async fn clear(&self) -> Result<()> {
518 let mut entries = fs::read_dir(&self.cache_dir).await?;
520
521 while let Some(entry) = entries.next_entry().await? {
522 if entry.path().extension().and_then(|s| s.to_str()) == Some("cache") {
523 let _ = fs::remove_file(entry.path()).await;
524 }
525 }
526
527 self.index.clear();
528
529 let mut current_size = self.current_size.write().await;
530 *current_size = 0;
531
532 let mut eviction = self.eviction.write().await;
533 eviction.clear();
534
535 let mut stats = self.stats.write().await;
536 *stats = CacheStats::new();
537
538 Ok(())
539 }
540
541 fn name(&self) -> &str {
542 "L2-Disk"
543 }
544
545 fn capacity(&self) -> usize {
546 self.max_size
547 }
548
549 async fn current_size(&self) -> usize {
550 *self.current_size.read().await
551 }
552}
553
/// Facade that stacks up to three tiers: L1 (memory, always present),
/// optional L2 (disk), and optional L3 (not wired up yet — always `None`
/// as constructed by `new`).
pub struct MultiTierCache {
    /// Fastest tier; every read checks here first.
    l1: Arc<dyn CacheTier>,
    /// Optional second tier (disk-backed when configured).
    l2: Option<Arc<dyn CacheTier>>,
    /// Optional third tier; `new` currently always sets this to `None`.
    l3: Option<Arc<dyn CacheTier>>,
    #[allow(dead_code)]
    // Retained configuration; currently only read during construction.
    config: CacheConfig,
    /// Cache-wide hit/miss counters (tiers keep their own stats too).
    global_stats: Arc<RwLock<CacheStats>>,
}
568
569impl MultiTierCache {
570 pub async fn new(config: CacheConfig) -> Result<Self> {
572 let l1 = Arc::new(L1MemoryTier::new(config.l1_size)) as Arc<dyn CacheTier>;
573
574 let l2 = if config.l2_size > 0 {
575 if let Some(cache_dir) = &config.cache_dir {
576 let l2_dir = cache_dir.join("l2");
577 Some(Arc::new(L2DiskTier::new(l2_dir, config.l2_size).await?) as Arc<dyn CacheTier>)
578 } else {
579 None
580 }
581 } else {
582 None
583 };
584
585 Ok(Self {
586 l1,
587 l2,
588 l3: None, config,
590 global_stats: Arc::new(RwLock::new(CacheStats::new())),
591 })
592 }
593
594 pub async fn get(&self, key: &CacheKey) -> Result<Option<CacheValue>> {
596 if let Some(value) = self.l1.get(key).await? {
598 let mut stats = self.global_stats.write().await;
599 stats.hits += 1;
600 return Ok(Some(value));
601 }
602
603 if let Some(l2) = &self.l2 {
605 if let Some(value) = l2.get(key).await? {
606 let _ = self.l1.put(key.clone(), value.clone()).await;
608
609 let mut stats = self.global_stats.write().await;
610 stats.hits += 1;
611 return Ok(Some(value));
612 }
613 }
614
615 if let Some(l3) = &self.l3 {
617 if let Some(value) = l3.get(key).await? {
618 if let Some(l2) = &self.l2 {
620 let _ = l2.put(key.clone(), value.clone()).await;
621 }
622 let _ = self.l1.put(key.clone(), value.clone()).await;
623
624 let mut stats = self.global_stats.write().await;
625 stats.hits += 1;
626 return Ok(Some(value));
627 }
628 }
629
630 let mut stats = self.global_stats.write().await;
631 stats.misses += 1;
632 Ok(None)
633 }
634
635 pub async fn put(&self, key: CacheKey, value: CacheValue) -> Result<()> {
637 self.l1.put(key.clone(), value.clone()).await?;
639
640 if let Some(l2) = &self.l2 {
642 let _ = l2.put(key.clone(), value.clone()).await;
643 }
644
645 if let Some(l3) = &self.l3 {
647 let _ = l3.put(key, value).await;
648 }
649
650 Ok(())
651 }
652
653 pub async fn remove(&self, key: &CacheKey) -> Result<bool> {
655 let mut removed = false;
656
657 removed |= self.l1.remove(key).await?;
658
659 if let Some(l2) = &self.l2 {
660 removed |= l2.remove(key).await?;
661 }
662
663 if let Some(l3) = &self.l3 {
664 removed |= l3.remove(key).await?;
665 }
666
667 Ok(removed)
668 }
669
670 pub async fn contains(&self, key: &CacheKey) -> bool {
672 if self.l1.contains(key).await {
673 return true;
674 }
675
676 if let Some(l2) = &self.l2 {
677 if l2.contains(key).await {
678 return true;
679 }
680 }
681
682 if let Some(l3) = &self.l3 {
683 if l3.contains(key).await {
684 return true;
685 }
686 }
687
688 false
689 }
690
691 pub async fn stats(&self) -> CacheStats {
693 self.global_stats.read().await.clone()
694 }
695
696 pub async fn tier_stats(&self) -> HashMap<String, CacheStats> {
698 let mut stats = HashMap::new();
699
700 stats.insert(self.l1.name().to_string(), self.l1.stats().await);
701
702 if let Some(l2) = &self.l2 {
703 stats.insert(l2.name().to_string(), l2.stats().await);
704 }
705
706 if let Some(l3) = &self.l3 {
707 stats.insert(l3.name().to_string(), l3.stats().await);
708 }
709
710 stats
711 }
712
713 pub async fn clear(&self) -> Result<()> {
715 self.l1.clear().await?;
716
717 if let Some(l2) = &self.l2 {
718 l2.clear().await?;
719 }
720
721 if let Some(l3) = &self.l3 {
722 l3.clear().await?;
723 }
724
725 let mut stats = self.global_stats.write().await;
726 *stats = CacheStats::new();
727
728 Ok(())
729 }
730}
731
#[cfg(test)]
mod tests {
    use super::*;

    /// Round-trips a single entry through the memory tier and checks the
    /// resulting statistics.
    #[tokio::test]
    async fn test_l1_memory_tier() {
        let tier = L1MemoryTier::new(1024 * 1024);
        let key = String::from("test_key");
        let value = CacheValue::new(Bytes::from("test data"), DataType::Binary);

        tier.put(key.clone(), value.clone())
            .await
            .expect("put failed");

        let fetched = tier.get(&key).await.expect("get failed");
        assert!(fetched.is_some());
        assert_eq!(fetched.as_ref().map(|v| &v.data), Some(&value.data));

        let stats = tier.stats().await;
        assert_eq!(stats.hits, 1);
        assert_eq!(stats.misses, 0);
        assert_eq!(stats.item_count, 1);
    }

    /// Filling a tiny tier past capacity must trigger at least one
    /// eviction.
    #[tokio::test]
    async fn test_l1_eviction() {
        let tier = L1MemoryTier::new(100);

        for (key, fill) in [("key1", "a"), ("key2", "b"), ("key3", "c")] {
            let payload = Bytes::from(fill.repeat(40));
            let value = CacheValue::new(payload, DataType::Binary);
            tier.put(key.to_string(), value).await.expect("put failed");
        }

        assert!(tier.stats().await.evictions > 0);
    }

    /// Basic put/get through the multi-tier facade with a disk-backed L2.
    #[tokio::test]
    async fn test_multi_tier_cache() {
        let temp_dir = std::env::temp_dir().join("oxigdal_cache_test");
        let config = CacheConfig {
            l1_size: 1024,
            l2_size: 4096,
            l3_size: 0,
            enable_compression: true,
            enable_prefetch: false,
            enable_distributed: false,
            cache_dir: Some(temp_dir.clone()),
        };

        let cache = MultiTierCache::new(config)
            .await
            .expect("cache creation failed");

        let key = String::from("test_multi");
        let value = CacheValue::new(Bytes::from("multi-tier test data"), DataType::Text);

        cache
            .put(key.clone(), value.clone())
            .await
            .expect("put failed");

        assert!(cache.get(&key).await.expect("get failed").is_some());

        let _ = tokio::fs::remove_dir_all(temp_dir).await;
    }
}