// oxigdal_cache_advanced/tiering/policy.rs
use crate::error::{CacheError, Result};
use crate::multi_tier::CacheKey;
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;

/// Static description and live occupancy of a single cache tier.
#[derive(Debug, Clone)]
pub struct TierInfo {
    /// Human-readable tier name (e.g. "L1").
    pub name: String,
    /// Tier position; index 0 is the fastest tier (see the policies' promote/demote logic).
    pub level: usize,
    /// Storage cost per byte, consumed by the cost-aware policy.
    pub cost_per_byte: f64,
    /// Typical access latency in microseconds.
    pub latency_us: u64,
    /// Bytes currently stored in this tier.
    pub current_size: usize,
    /// Capacity limit in bytes.
    pub max_size: usize,
}

impl TierInfo {
    /// Returns `true` when `bytes` more can be stored without exceeding `max_size`.
    ///
    /// Uses checked addition: the original `current_size + bytes` overflowed
    /// for large `bytes` (panicking in debug builds, wrapping — and thus
    /// wrongly reporting free space — in release builds).
    pub fn has_space(&self, bytes: usize) -> bool {
        self.current_size
            .checked_add(bytes)
            .map_or(false, |total| total <= self.max_size)
    }

    /// Current fill level as a percentage (0.0–100.0).
    /// A zero-capacity tier reports 0.0 rather than dividing by zero.
    pub fn utilization(&self) -> f64 {
        if self.max_size == 0 {
            0.0
        } else {
            (self.current_size as f64 / self.max_size as f64) * 100.0
        }
    }
}
49
/// Per-key access history consumed by the tiering policies.
#[derive(Debug, Clone)]
pub struct AccessStats {
    /// Total recorded accesses (starts at 1: creation counts as the first access).
    pub access_count: u64,
    /// Timestamp of the most recent access.
    pub last_access: Instant,
    /// Timestamp of the first access (creation time).
    pub first_access: Instant,
    /// Sliding window of the most recent access timestamps, capped at 100 entries.
    pub access_times: VecDeque<Instant>,
    /// Index of the tier currently holding the item.
    pub current_tier: usize,
    /// Item payload size in bytes.
    pub size_bytes: usize,
}

impl AccessStats {
    /// Creates stats for a freshly inserted item on `tier` of `size` bytes,
    /// counting the insert itself as the first access.
    pub fn new(tier: usize, size: usize) -> Self {
        let now = Instant::now();
        let mut times = VecDeque::with_capacity(100);
        times.push_back(now);

        Self {
            access_count: 1,
            last_access: now,
            first_access: now,
            access_times: times,
            current_tier: tier,
            size_bytes: size,
        }
    }

    /// Records one access, keeping the timestamp window at most 100 entries.
    ///
    /// The clock is sampled exactly once so `last_access` and the pushed
    /// window entry are identical; the original called `Instant::now()`
    /// twice and the two timestamps could disagree.
    pub fn record_access(&mut self) {
        let now = Instant::now();
        self.access_count += 1;
        self.last_access = now;

        if self.access_times.len() >= 100 {
            self.access_times.pop_front();
        }
        self.access_times.push_back(now);
    }

    /// Accesses per second over the observed lifetime
    /// (`first_access` → `last_access`).
    ///
    /// For lifetimes under one whole second this returns the raw access
    /// count — this avoids division by zero; sub-second precision is
    /// intentionally ignored to preserve the existing threshold semantics.
    pub fn frequency(&self) -> f64 {
        let duration = self.last_access.duration_since(self.first_access);
        if duration.as_secs() == 0 {
            self.access_count as f64
        } else {
            self.access_count as f64 / duration.as_secs() as f64
        }
    }

    /// Linear recency score in [0, 1]: 1.0 for a just-accessed item,
    /// decaying to 0.0 at or beyond `max_age`.
    pub fn recency_score(&self, max_age: Duration) -> f64 {
        let age = self.last_access.elapsed();
        let age_secs = age.as_secs_f64();
        let max_secs = max_age.as_secs_f64();

        if age_secs >= max_secs {
            0.0
        } else {
            1.0 - (age_secs / max_secs)
        }
    }

    /// Combined "heat" in [0, 1]: 40% frequency (capped at 10 accesses/sec,
    /// normalized) plus 60% recency.
    pub fn heat_score(&self, max_age: Duration) -> f64 {
        let freq = self.frequency();
        let recency = self.recency_score(max_age);

        0.4 * freq.min(10.0) / 10.0 + 0.6 * recency
    }
}
128
/// Decision produced by a tiering policy for a single cache item.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TieringAction {
    /// Move the item to the given (lower-index, faster) tier.
    Promote(usize),
    /// Move the item to the given (higher-index, slower) tier.
    Demote(usize),
    /// Leave the item on its current tier.
    Stay,
}
139
/// Tiering policy that promotes and demotes items purely on
/// access-frequency thresholds.
pub struct FrequencyBasedPolicy {
    /// Shared per-key access statistics.
    stats: Arc<RwLock<HashMap<CacheKey, AccessStats>>>,
    /// Tier descriptions; index 0 is the fastest tier.
    tiers: Vec<TierInfo>,
    /// Accesses/sec at or above which an item is promoted one tier.
    promotion_threshold: f64,
    /// Accesses/sec at or below which an item is demoted one tier.
    demotion_threshold: f64,
}
151
152impl FrequencyBasedPolicy {
153 pub fn new(tiers: Vec<TierInfo>, promotion_threshold: f64, demotion_threshold: f64) -> Self {
155 Self {
156 stats: Arc::new(RwLock::new(HashMap::new())),
157 tiers,
158 promotion_threshold,
159 demotion_threshold,
160 }
161 }
162
163 pub async fn record_access(&self, key: CacheKey, tier: usize, size: usize) {
165 let mut stats = self.stats.write().await;
166 stats
167 .entry(key)
168 .and_modify(|s| s.record_access())
169 .or_insert_with(|| AccessStats::new(tier, size));
170 }
171
172 pub async fn evaluate(&self, key: &CacheKey) -> Result<TieringAction> {
174 let stats = self.stats.read().await;
175 let item_stats = stats
176 .get(key)
177 .ok_or_else(|| CacheError::KeyNotFound(key.clone()))?;
178
179 let freq = item_stats.frequency();
180 let current_tier = item_stats.current_tier;
181
182 if freq >= self.promotion_threshold && current_tier > 0 {
183 Ok(TieringAction::Promote(current_tier - 1))
185 } else if freq <= self.demotion_threshold && current_tier < self.tiers.len() - 1 {
186 Ok(TieringAction::Demote(current_tier + 1))
188 } else {
189 Ok(TieringAction::Stay)
190 }
191 }
192
193 pub async fn get_promotion_candidates(&self, tier: usize, limit: usize) -> Vec<CacheKey> {
195 let stats = self.stats.read().await;
196 let mut candidates: Vec<_> = stats
197 .iter()
198 .filter(|(_, s)| s.current_tier == tier && s.frequency() >= self.promotion_threshold)
199 .map(|(k, s)| (k.clone(), s.frequency()))
200 .collect();
201
202 candidates.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
203 candidates.truncate(limit);
204 candidates.into_iter().map(|(k, _)| k).collect()
205 }
206
207 pub async fn get_demotion_candidates(&self, tier: usize, limit: usize) -> Vec<CacheKey> {
209 let stats = self.stats.read().await;
210 let mut candidates: Vec<_> = stats
211 .iter()
212 .filter(|(_, s)| s.current_tier == tier && s.frequency() <= self.demotion_threshold)
213 .map(|(k, s)| (k.clone(), s.frequency()))
214 .collect();
215
216 candidates.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
217 candidates.truncate(limit);
218 candidates.into_iter().map(|(k, _)| k).collect()
219 }
220}
221
/// Tiering policy that weighs an item's heat against the storage cost
/// of placing it on each tier.
pub struct CostAwarePolicy {
    /// Shared per-key access statistics.
    stats: Arc<RwLock<HashMap<CacheKey, AccessStats>>>,
    /// Tier descriptions; index 0 is the fastest tier.
    tiers: Vec<TierInfo>,
    /// Age at which an item's recency score decays to zero.
    max_age: Duration,
}
231
232impl CostAwarePolicy {
233 pub fn new(tiers: Vec<TierInfo>, max_age: Duration) -> Self {
235 Self {
236 stats: Arc::new(RwLock::new(HashMap::new())),
237 tiers,
238 max_age,
239 }
240 }
241
242 pub async fn record_access(&self, key: CacheKey, tier: usize, size: usize) {
244 let mut stats = self.stats.write().await;
245 stats
246 .entry(key)
247 .and_modify(|s| s.record_access())
248 .or_insert_with(|| AccessStats::new(tier, size));
249 }
250
251 fn value_score(&self, item_stats: &AccessStats, target_tier: usize) -> f64 {
253 if target_tier >= self.tiers.len() {
254 return 0.0;
255 }
256
257 let tier = &self.tiers[target_tier];
258 let heat = item_stats.heat_score(self.max_age);
259 let cost = item_stats.size_bytes as f64 * tier.cost_per_byte;
260
261 if cost > 0.0 { heat / cost } else { heat }
262 }
263
264 pub async fn evaluate(&self, key: &CacheKey) -> Result<TieringAction> {
266 let stats = self.stats.read().await;
267 let item_stats = stats
268 .get(key)
269 .ok_or_else(|| CacheError::KeyNotFound(key.clone()))?;
270
271 let current_tier = item_stats.current_tier;
272 let current_value = self.value_score(item_stats, current_tier);
273
274 if current_tier > 0 {
276 let promote_value = self.value_score(item_stats, current_tier - 1);
277 if promote_value > current_value * 1.2 {
278 return Ok(TieringAction::Promote(current_tier - 1));
280 }
281 }
282
283 if current_tier < self.tiers.len() - 1 {
285 let demote_value = self.value_score(item_stats, current_tier + 1);
286 if current_value < demote_value * 0.8 {
287 return Ok(TieringAction::Demote(current_tier + 1));
289 }
290 }
291
292 Ok(TieringAction::Stay)
293 }
294
295 pub async fn get_optimal_tier(&self, key: &CacheKey) -> Result<usize> {
297 let stats = self.stats.read().await;
298 let item_stats = stats
299 .get(key)
300 .ok_or_else(|| CacheError::KeyNotFound(key.clone()))?;
301
302 let mut best_tier = 0;
303 let mut best_value = 0.0;
304
305 for (tier_idx, _tier) in self.tiers.iter().enumerate() {
306 let value = self.value_score(item_stats, tier_idx);
307 if value > best_value {
308 best_value = value;
309 best_tier = tier_idx;
310 }
311 }
312
313 Ok(best_tier)
314 }
315}
316
/// Periodically grows or shrinks tier capacities toward a target
/// utilization percentage.
pub struct AdaptiveTierSizer {
    /// Shared, mutable tier configuration.
    tiers: Arc<RwLock<Vec<TierInfo>>>,
    /// Desired utilization percentage per tier (e.g. 80.0).
    target_utilization: f64,
    /// Fraction of current capacity to resize by per adjustment pass.
    resize_step: f64,
}
326
327impl AdaptiveTierSizer {
328 pub fn new(tiers: Vec<TierInfo>, target_utilization: f64, resize_step: f64) -> Self {
330 Self {
331 tiers: Arc::new(RwLock::new(tiers)),
332 target_utilization,
333 resize_step,
334 }
335 }
336
337 pub async fn adjust_sizes(&self) -> Vec<TierInfo> {
339 let mut tiers = self.tiers.write().await;
340 let mut adjustments = Vec::new();
341
342 for tier in tiers.iter_mut() {
343 let utilization = tier.utilization();
344
345 if utilization > self.target_utilization {
346 let increase = (tier.max_size as f64 * self.resize_step) as usize;
348 tier.max_size += increase;
349 adjustments.push(tier.clone());
350 } else if utilization < self.target_utilization * 0.5 {
351 let decrease = (tier.max_size as f64 * self.resize_step * 0.5) as usize;
353 tier.max_size = tier.max_size.saturating_sub(decrease);
354 tier.max_size = tier.max_size.max(tier.current_size); adjustments.push(tier.clone());
356 }
357 }
358
359 tiers.clone()
360 }
361
362 pub async fn get_tiers(&self) -> Vec<TierInfo> {
364 self.tiers.read().await.clone()
365 }
366}
367
#[cfg(test)]
mod tests {
    use super::*;

    // AccessStats counting, window behavior, and heat bounds.
    #[test]
    fn test_access_stats() {
        let mut stats = AccessStats::new(0, 1024);
        assert_eq!(stats.access_count, 1);

        stats.record_access();
        assert_eq!(stats.access_count, 2);

        // Freshly accessed item: heat must be positive and within [0, 1].
        let heat = stats.heat_score(Duration::from_secs(60));
        assert!(heat > 0.0 && heat <= 1.0);
    }

    // A single recorded access cannot clear the 5.0 accesses/sec promotion
    // threshold, so the policy should say Stay.
    #[tokio::test]
    async fn test_frequency_based_policy() {
        let tiers = vec![
            TierInfo {
                name: "L1".to_string(),
                level: 0,
                cost_per_byte: 1.0,
                latency_us: 10,
                current_size: 0,
                max_size: 1024 * 1024,
            },
            TierInfo {
                name: "L2".to_string(),
                level: 1,
                cost_per_byte: 0.1,
                latency_us: 100,
                current_size: 0,
                max_size: 10 * 1024 * 1024,
            },
        ];

        let policy = FrequencyBasedPolicy::new(tiers, 5.0, 0.1);

        let key = "test_key".to_string();
        policy.record_access(key.clone(), 1, 1024).await;

        let action = policy.evaluate(&key).await.unwrap_or(TieringAction::Stay);
        assert!(matches!(action, TieringAction::Stay));
    }

    // With two tiers the optimal tier index must be 0 or 1.
    #[tokio::test]
    async fn test_cost_aware_policy() {
        let tiers = vec![
            TierInfo {
                name: "L1".to_string(),
                level: 0,
                cost_per_byte: 1.0,
                latency_us: 10,
                current_size: 0,
                max_size: 1024 * 1024,
            },
            TierInfo {
                name: "L2".to_string(),
                level: 1,
                cost_per_byte: 0.1,
                latency_us: 100,
                current_size: 0,
                max_size: 10 * 1024 * 1024,
            },
        ];

        let policy = CostAwarePolicy::new(tiers, Duration::from_secs(60));

        let key = "test_key".to_string();
        policy.record_access(key.clone(), 1, 1024).await;

        let optimal = policy.get_optimal_tier(&key).await.unwrap_or(0);
        assert!(optimal < 2);
    }

    // 900 KiB / 1 MiB ≈ 87.9% utilization exceeds the 80% target,
    // so one adjust pass must grow the tier's capacity.
    #[tokio::test]
    async fn test_adaptive_tier_sizer() {
        let tiers = vec![TierInfo {
            name: "L1".to_string(),
            level: 0,
            cost_per_byte: 1.0,
            latency_us: 10,
            current_size: 900 * 1024,
            max_size: 1024 * 1024,
        }];

        let sizer = AdaptiveTierSizer::new(tiers.clone(), 80.0, 0.1);
        let adjusted = sizer.adjust_sizes().await;

        assert!(adjusted[0].max_size > 1024 * 1024);
    }
}