// oxigdal_cloud/cache/multi.rs
use std::collections::{HashMap, VecDeque};
4use std::sync::Arc;
5use std::sync::atomic::Ordering;
6use std::time::Duration;
7
8#[cfg(feature = "async")]
9use tokio::sync::RwLock;
10
11use bytes::Bytes;
12
13use super::backends::PersistentDiskCache;
14use super::eviction::LruTtlCache;
15use super::metadata::{CacheKey, CacheStats};
16use super::{CacheConfig, WarmingStrategy};
17use crate::error::{CacheError, CloudError, Result};
18
/// Predictive cache warmer: records the order in which keys are accessed
/// and uses that history to suggest keys likely to be requested next.
#[cfg(feature = "cache")]
pub struct CacheWarmer<C> {
    // Cache to be warmed. NOTE(review): never read by the methods in this
    // file — targets are computed but nothing here inserts into `cache`;
    // presumably a background task consumes `get_warming_targets`. Confirm.
    cache: Arc<C>,
    // Strategy selecting how warming targets are derived from history.
    strategy: WarmingStrategy,
    // Recently accessed keys, oldest first; bounded by `max_history`.
    access_history: Arc<RwLock<VecDeque<CacheKey>>>,
    // Maximum number of entries retained in `access_history`.
    max_history: usize,
}
31
#[cfg(feature = "cache")]
impl<C> CacheWarmer<C> {
    /// Creates a warmer for `cache` using the given `strategy`.
    ///
    /// Access history is bounded to the 1000 most recent keys.
    pub fn new(cache: Arc<C>, strategy: WarmingStrategy) -> Self {
        Self {
            cache,
            strategy,
            access_history: Arc::new(RwLock::new(VecDeque::new())),
            max_history: 1000,
        }
    }

    /// Records `key` as the most recent access, evicting the oldest entries
    /// once the history grows past `max_history`.
    pub async fn record_access(&self, key: &CacheKey) {
        let mut history = self.access_history.write().await;
        history.push_back(key.clone());
        // `while` (not `if`) keeps the bound correct even if `max_history`
        // were ever lowered after entries have accumulated.
        while history.len() > self.max_history {
            history.pop_front();
        }
    }

    /// Returns keys worth pre-loading after an access to `current_key`.
    ///
    /// Only `AccessPattern` currently produces targets; the remaining
    /// strategies are unimplemented and yield an empty list.
    pub async fn get_warming_targets(&self, current_key: &CacheKey) -> Vec<CacheKey> {
        match self.strategy {
            WarmingStrategy::None => Vec::new(),
            WarmingStrategy::AccessPattern => self.get_pattern_targets(current_key).await,
            // TODO: spatial-adjacent, pyramid-level, and custom warming are
            // not implemented yet.
            WarmingStrategy::SpatialAdjacent
            | WarmingStrategy::PyramidLevels
            | WarmingStrategy::Custom => Vec::new(),
        }
    }

    /// Counts the keys historically observed immediately after `current_key`
    /// and returns up to the five most frequent successors.
    async fn get_pattern_targets(&self, current_key: &CacheKey) -> Vec<CacheKey> {
        let history = self.access_history.read().await;

        // Pair consecutive history entries lazily via zip/skip instead of
        // collecting the whole deque into a Vec just to call `windows(2)`.
        let mut next_keys: HashMap<CacheKey, usize> = HashMap::new();
        for (prev, next) in history.iter().zip(history.iter().skip(1)) {
            if prev == current_key {
                *next_keys.entry(next.clone()).or_insert(0) += 1;
            }
        }

        // Sort successors by descending frequency and keep the top five.
        let mut targets: Vec<_> = next_keys.into_iter().collect();
        targets.sort_by_key(|x| std::cmp::Reverse(x.1));
        targets.into_iter().take(5).map(|(k, _)| k).collect()
    }
}
83
/// In-memory (L1) cache layer: LRU eviction with per-entry TTL support.
#[cfg(feature = "cache")]
pub type MemoryCache = LruTtlCache;

/// On-disk (L2) cache layer backed by persistent storage.
// NOTE(review): unlike `MemoryCache`, this alias is not gated on
// `feature = "cache"` — confirm whether that asymmetry is intentional.
pub type DiskCache = PersistentDiskCache;
/// Two-tier cache: a mandatory in-memory LRU/TTL layer (L1) fronting an
/// optional persistent on-disk layer (L2), with optional predictive warming.
#[cfg(feature = "cache")]
pub struct MultiLevelCache {
    // L1: always present; consulted first on every lookup.
    pub(crate) memory: LruTtlCache,
    // L2: present only when the config enables persistence with a cache dir.
    disk: Option<PersistentDiskCache>,
    // Optional access-pattern tracker used to pick keys to pre-load.
    warmer: Option<Arc<CacheWarmer<LruTtlCache>>>,
}
101
#[cfg(feature = "cache")]
impl MultiLevelCache {
    /// Builds the cache hierarchy from `config`.
    ///
    /// The disk layer is created only when `config.persistent` is set AND a
    /// `cache_dir` is provided; the warmer only when a warming strategy other
    /// than `None` is configured.
    ///
    /// # Errors
    ///
    /// Propagates any error from constructing the memory or disk backends.
    pub fn new(config: CacheConfig) -> Result<Self> {
        let memory = LruTtlCache::new(config.clone())?;

        let disk = if config.persistent && config.cache_dir.is_some() {
            Some(PersistentDiskCache::new(config.clone())?)
        } else {
            None
        };

        let warmer = if config.warming_strategy != WarmingStrategy::None {
            // NOTE(review): the warmer is handed a *separate* LruTtlCache
            // instance, not the `memory` field above — likely intended to
            // share `memory` (which would require Arc-ing the field). Confirm.
            Some(Arc::new(CacheWarmer::new(
                Arc::new(LruTtlCache::new(config.clone())?),
                config.warming_strategy,
            )))
        } else {
            None
        };

        Ok(Self {
            memory,
            disk,
            warmer,
        })
    }

    /// Looks `key` up in memory first, then disk.
    ///
    /// A disk hit is promoted into the memory layer (best-effort; promotion
    /// failures are ignored). Every lookup is recorded with the warmer, if any.
    ///
    /// # Errors
    ///
    /// Returns `CacheError::Miss` when the key is found in neither layer.
    pub async fn get(&self, key: &CacheKey) -> Result<Bytes> {
        if let Some(warmer) = &self.warmer {
            warmer.record_access(key).await;
        }

        // L1: memory. Any error (miss or otherwise) falls through to disk.
        if let Ok(data) = self.memory.get(key).await {
            tracing::trace!("Cache hit (memory): {}", key);
            return Ok(data);
        }

        // L2: disk, if configured.
        if let Some(disk) = &self.disk {
            if let Ok(data) = disk.get(key).await {
                tracing::trace!("Cache hit (disk): {}", key);

                // Promote to memory so the next lookup is a fast hit;
                // failure is ignored — the data is returned either way.
                self.memory.put(key.clone(), data.clone(), None).await.ok();

                return Ok(data);
            }
        }

        tracing::trace!("Cache miss: {}", key);
        Err(CloudError::Cache(CacheError::Miss { key: key.clone() }))
    }

    /// Stores `data` under `key` in every configured layer, with no TTL.
    ///
    /// # Errors
    ///
    /// Fails if either layer rejects the write; a disk failure leaves the
    /// entry already present in memory.
    pub async fn put(&self, key: CacheKey, data: Bytes) -> Result<()> {
        self.put_levels(key, data, None).await
    }

    /// Stores `data` under `key` in every configured layer with the given TTL.
    ///
    /// # Errors
    ///
    /// Same failure semantics as [`Self::put`].
    pub async fn put_with_ttl(&self, key: CacheKey, data: Bytes, ttl: Duration) -> Result<()> {
        self.put_levels(key, data, Some(ttl)).await
    }

    /// Shared write path for `put`/`put_with_ttl`: memory first, then disk.
    async fn put_levels(&self, key: CacheKey, data: Bytes, ttl: Option<Duration>) -> Result<()> {
        self.memory.put(key.clone(), data.clone(), ttl).await?;

        if let Some(disk) = &self.disk {
            disk.put(key, data, ttl).await?;
        }

        Ok(())
    }

    /// Removes `key` from every configured layer.
    ///
    /// # Errors
    ///
    /// Propagates the first removal failure.
    pub async fn remove(&self, key: &CacheKey) -> Result<()> {
        self.memory.remove(key).await?;

        if let Some(disk) = &self.disk {
            disk.remove(key).await?;
        }

        Ok(())
    }

    /// Empties every configured layer.
    ///
    /// # Errors
    ///
    /// Propagates the first clear failure.
    pub async fn clear(&self) -> Result<()> {
        self.memory.clear().await?;

        if let Some(disk) = &self.disk {
            disk.clear().await?;
        }

        Ok(())
    }

    /// Statistics for the memory layer only; disk stats are not aggregated.
    #[must_use]
    pub fn stats(&self) -> &CacheStats {
        self.memory.stats()
    }

    /// Current size of the memory layer, as tracked by its atomic counter.
    #[must_use]
    pub fn memory_size(&self) -> usize {
        self.memory.current_size.load(Ordering::SeqCst)
    }
}