// Source: oxigdal_cache_advanced/tiering/policy.rs

1//! Intelligent tiering policies
2//!
3//! Automatic promotion and demotion between cache tiers based on:
4//! - Access frequency
5//! - Access recency
6//! - Cost-aware placement
7//! - Predictive promotion
8//! - Adaptive tier sizing
9
10use crate::error::{CacheError, Result};
11use crate::multi_tier::CacheKey;
12use std::collections::{HashMap, VecDeque};
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15use tokio::sync::RwLock;
16
/// Tier information
#[derive(Debug, Clone)]
pub struct TierInfo {
    /// Tier name
    pub name: String,
    /// Tier level (0 = fastest/most expensive)
    pub level: usize,
    /// Cost per byte (arbitrary units)
    pub cost_per_byte: f64,
    /// Access latency (microseconds)
    pub latency_us: u64,
    /// Current size in bytes
    pub current_size: usize,
    /// Maximum size in bytes
    pub max_size: usize,
}

impl TierInfo {
    /// Check if the tier can hold `bytes` additional bytes.
    ///
    /// Uses checked addition so a very large `bytes` cannot overflow
    /// (`current_size + bytes` would panic in debug builds and wrap in
    /// release builds); on overflow the tier is reported as full.
    pub fn has_space(&self, bytes: usize) -> bool {
        self.current_size
            .checked_add(bytes)
            .map_or(false, |total| total <= self.max_size)
    }

    /// Get utilization percentage (0.0 when `max_size` is zero, to avoid
    /// division by zero).
    pub fn utilization(&self) -> f64 {
        if self.max_size == 0 {
            0.0
        } else {
            (self.current_size as f64 / self.max_size as f64) * 100.0
        }
    }
}
49
/// Access statistics for a cache item
#[derive(Debug, Clone)]
pub struct AccessStats {
    /// Total number of accesses
    pub access_count: u64,
    /// Last access time
    pub last_access: Instant,
    /// First access time
    pub first_access: Instant,
    /// Access timestamps (for frequency analysis); bounded to `MAX_HISTORY`
    pub access_times: VecDeque<Instant>,
    /// Current tier level
    pub current_tier: usize,
    /// Item size in bytes
    pub size_bytes: usize,
}

impl AccessStats {
    /// Maximum number of access timestamps retained for frequency analysis.
    const MAX_HISTORY: usize = 100;

    /// Create new access stats; the creation itself counts as the first access.
    pub fn new(tier: usize, size: usize) -> Self {
        let now = Instant::now();
        let mut times = VecDeque::with_capacity(Self::MAX_HISTORY);
        times.push_back(now);

        Self {
            access_count: 1,
            last_access: now,
            first_access: now,
            access_times: times,
            current_tier: tier,
            size_bytes: size,
        }
    }

    /// Record an access.
    ///
    /// A single timestamp is taken so `last_access` and the history entry
    /// agree exactly (the original sampled `Instant::now()` twice).
    pub fn record_access(&mut self) {
        let now = Instant::now();
        self.access_count += 1;
        self.last_access = now;

        // Keep recent access history bounded.
        if self.access_times.len() >= Self::MAX_HISTORY {
            self.access_times.pop_front();
        }
        self.access_times.push_back(now);
    }

    /// Calculate access frequency (accesses per second).
    ///
    /// For lifetimes shorter than one second the raw access count is
    /// returned, which avoids division by (near-)zero.
    pub fn frequency(&self) -> f64 {
        let duration = self.last_access.duration_since(self.first_access);
        if duration.as_secs() == 0 {
            self.access_count as f64
        } else {
            self.access_count as f64 / duration.as_secs() as f64
        }
    }

    /// Calculate recency score (0.0 = at or past `max_age`, 1.0 = just accessed).
    pub fn recency_score(&self, max_age: Duration) -> f64 {
        let age_secs = self.last_access.elapsed().as_secs_f64();
        let max_secs = max_age.as_secs_f64();

        if age_secs >= max_secs {
            0.0
        } else {
            1.0 - (age_secs / max_secs)
        }
    }

    /// Calculate heat score in [0, 1]: a weighted mix of frequency
    /// (capped at 10 accesses/sec, weight 0.4) and recency (weight 0.6).
    pub fn heat_score(&self, max_age: Duration) -> f64 {
        let freq = self.frequency();
        let recency = self.recency_score(max_age);

        // Weighted combination (favor recency slightly)
        0.4 * freq.min(10.0) / 10.0 + 0.6 * recency
    }
}
128
/// Tiering policy decisions
///
/// Returned by the policies' `evaluate` methods; the payload is the
/// destination tier level (0 = fastest tier).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TieringAction {
    /// Promote to the (faster) tier at the given level
    Promote(usize),
    /// Demote to the (slower) tier at the given level
    Demote(usize),
    /// Stay in current tier
    Stay,
}
139
140/// Frequency-based tiering policy
141pub struct FrequencyBasedPolicy {
142    /// Access statistics
143    stats: Arc<RwLock<HashMap<CacheKey, AccessStats>>>,
144    /// Tier information
145    tiers: Vec<TierInfo>,
146    /// Promotion threshold (accesses per second)
147    promotion_threshold: f64,
148    /// Demotion threshold (accesses per second)
149    demotion_threshold: f64,
150}
151
152impl FrequencyBasedPolicy {
153    /// Create new frequency-based policy
154    pub fn new(tiers: Vec<TierInfo>, promotion_threshold: f64, demotion_threshold: f64) -> Self {
155        Self {
156            stats: Arc::new(RwLock::new(HashMap::new())),
157            tiers,
158            promotion_threshold,
159            demotion_threshold,
160        }
161    }
162
163    /// Record access
164    pub async fn record_access(&self, key: CacheKey, tier: usize, size: usize) {
165        let mut stats = self.stats.write().await;
166        stats
167            .entry(key)
168            .and_modify(|s| s.record_access())
169            .or_insert_with(|| AccessStats::new(tier, size));
170    }
171
172    /// Evaluate tiering action for a key
173    pub async fn evaluate(&self, key: &CacheKey) -> Result<TieringAction> {
174        let stats = self.stats.read().await;
175        let item_stats = stats
176            .get(key)
177            .ok_or_else(|| CacheError::KeyNotFound(key.clone()))?;
178
179        let freq = item_stats.frequency();
180        let current_tier = item_stats.current_tier;
181
182        if freq >= self.promotion_threshold && current_tier > 0 {
183            // Promote to higher tier
184            Ok(TieringAction::Promote(current_tier - 1))
185        } else if freq <= self.demotion_threshold && current_tier < self.tiers.len() - 1 {
186            // Demote to lower tier
187            Ok(TieringAction::Demote(current_tier + 1))
188        } else {
189            Ok(TieringAction::Stay)
190        }
191    }
192
193    /// Get items to promote
194    pub async fn get_promotion_candidates(&self, tier: usize, limit: usize) -> Vec<CacheKey> {
195        let stats = self.stats.read().await;
196        let mut candidates: Vec<_> = stats
197            .iter()
198            .filter(|(_, s)| s.current_tier == tier && s.frequency() >= self.promotion_threshold)
199            .map(|(k, s)| (k.clone(), s.frequency()))
200            .collect();
201
202        candidates.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
203        candidates.truncate(limit);
204        candidates.into_iter().map(|(k, _)| k).collect()
205    }
206
207    /// Get items to demote
208    pub async fn get_demotion_candidates(&self, tier: usize, limit: usize) -> Vec<CacheKey> {
209        let stats = self.stats.read().await;
210        let mut candidates: Vec<_> = stats
211            .iter()
212            .filter(|(_, s)| s.current_tier == tier && s.frequency() <= self.demotion_threshold)
213            .map(|(k, s)| (k.clone(), s.frequency()))
214            .collect();
215
216        candidates.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
217        candidates.truncate(limit);
218        candidates.into_iter().map(|(k, _)| k).collect()
219    }
220}
221
222/// Cost-aware tiering policy
223pub struct CostAwarePolicy {
224    /// Access statistics
225    stats: Arc<RwLock<HashMap<CacheKey, AccessStats>>>,
226    /// Tier information
227    tiers: Vec<TierInfo>,
228    /// Maximum age for recency calculation
229    max_age: Duration,
230}
231
232impl CostAwarePolicy {
233    /// Create new cost-aware policy
234    pub fn new(tiers: Vec<TierInfo>, max_age: Duration) -> Self {
235        Self {
236            stats: Arc::new(RwLock::new(HashMap::new())),
237            tiers,
238            max_age,
239        }
240    }
241
242    /// Record access
243    pub async fn record_access(&self, key: CacheKey, tier: usize, size: usize) {
244        let mut stats = self.stats.write().await;
245        stats
246            .entry(key)
247            .and_modify(|s| s.record_access())
248            .or_insert_with(|| AccessStats::new(tier, size));
249    }
250
251    /// Calculate value score (benefit per cost)
252    fn value_score(&self, item_stats: &AccessStats, target_tier: usize) -> f64 {
253        if target_tier >= self.tiers.len() {
254            return 0.0;
255        }
256
257        let tier = &self.tiers[target_tier];
258        let heat = item_stats.heat_score(self.max_age);
259        let cost = item_stats.size_bytes as f64 * tier.cost_per_byte;
260
261        if cost > 0.0 { heat / cost } else { heat }
262    }
263
264    /// Evaluate tiering action
265    pub async fn evaluate(&self, key: &CacheKey) -> Result<TieringAction> {
266        let stats = self.stats.read().await;
267        let item_stats = stats
268            .get(key)
269            .ok_or_else(|| CacheError::KeyNotFound(key.clone()))?;
270
271        let current_tier = item_stats.current_tier;
272        let current_value = self.value_score(item_stats, current_tier);
273
274        // Check if promotion makes sense
275        if current_tier > 0 {
276            let promote_value = self.value_score(item_stats, current_tier - 1);
277            if promote_value > current_value * 1.2 {
278                // 20% improvement threshold
279                return Ok(TieringAction::Promote(current_tier - 1));
280            }
281        }
282
283        // Check if demotion makes sense
284        if current_tier < self.tiers.len() - 1 {
285            let demote_value = self.value_score(item_stats, current_tier + 1);
286            if current_value < demote_value * 0.8 {
287                // Stay only if current is at least 80% as good
288                return Ok(TieringAction::Demote(current_tier + 1));
289            }
290        }
291
292        Ok(TieringAction::Stay)
293    }
294
295    /// Get optimal tier for a key based on value score
296    pub async fn get_optimal_tier(&self, key: &CacheKey) -> Result<usize> {
297        let stats = self.stats.read().await;
298        let item_stats = stats
299            .get(key)
300            .ok_or_else(|| CacheError::KeyNotFound(key.clone()))?;
301
302        let mut best_tier = 0;
303        let mut best_value = 0.0;
304
305        for (tier_idx, _tier) in self.tiers.iter().enumerate() {
306            let value = self.value_score(item_stats, tier_idx);
307            if value > best_value {
308                best_value = value;
309                best_tier = tier_idx;
310            }
311        }
312
313        Ok(best_tier)
314    }
315}
316
317/// Adaptive tier sizing
318pub struct AdaptiveTierSizer {
319    /// Tier information
320    tiers: Arc<RwLock<Vec<TierInfo>>>,
321    /// Target utilization percentage
322    target_utilization: f64,
323    /// Resize step size (percentage)
324    resize_step: f64,
325}
326
327impl AdaptiveTierSizer {
328    /// Create new adaptive tier sizer
329    pub fn new(tiers: Vec<TierInfo>, target_utilization: f64, resize_step: f64) -> Self {
330        Self {
331            tiers: Arc::new(RwLock::new(tiers)),
332            target_utilization,
333            resize_step,
334        }
335    }
336
337    /// Adjust tier sizes based on utilization
338    pub async fn adjust_sizes(&self) -> Vec<TierInfo> {
339        let mut tiers = self.tiers.write().await;
340        let mut adjustments = Vec::new();
341
342        for tier in tiers.iter_mut() {
343            let utilization = tier.utilization();
344
345            if utilization > self.target_utilization {
346                // Increase size
347                let increase = (tier.max_size as f64 * self.resize_step) as usize;
348                tier.max_size += increase;
349                adjustments.push(tier.clone());
350            } else if utilization < self.target_utilization * 0.5 {
351                // Decrease size (if very under-utilized)
352                let decrease = (tier.max_size as f64 * self.resize_step * 0.5) as usize;
353                tier.max_size = tier.max_size.saturating_sub(decrease);
354                tier.max_size = tier.max_size.max(tier.current_size); // Don't shrink below current
355                adjustments.push(tier.clone());
356            }
357        }
358
359        tiers.clone()
360    }
361
362    /// Get current tier sizes
363    pub async fn get_tiers(&self) -> Vec<TierInfo> {
364        self.tiers.read().await.clone()
365    }
366}
367
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a `TierInfo` fixture without repeating the field list.
    fn make_tier(
        name: &str,
        level: usize,
        cost_per_byte: f64,
        latency_us: u64,
        current_size: usize,
        max_size: usize,
    ) -> TierInfo {
        TierInfo {
            name: name.to_string(),
            level,
            cost_per_byte,
            latency_us,
            current_size,
            max_size,
        }
    }

    /// Standard two-tier layout used by the policy tests.
    fn two_tier_layout() -> Vec<TierInfo> {
        vec![
            make_tier("L1", 0, 1.0, 10, 0, 1024 * 1024),
            make_tier("L2", 1, 0.1, 100, 0, 10 * 1024 * 1024),
        ]
    }

    #[test]
    fn test_access_stats() {
        let mut stats = AccessStats::new(0, 1024);
        assert_eq!(stats.access_count, 1);

        stats.record_access();
        assert_eq!(stats.access_count, 2);

        let heat = stats.heat_score(Duration::from_secs(60));
        assert!(heat > 0.0 && heat <= 1.0);
    }

    #[tokio::test]
    async fn test_frequency_based_policy() {
        let policy = FrequencyBasedPolicy::new(two_tier_layout(), 5.0, 0.1);

        let key = "test_key".to_string();
        policy.record_access(key.clone(), 1, 1024).await;

        // One access is between both thresholds -> no movement.
        let action = policy.evaluate(&key).await.unwrap_or(TieringAction::Stay);
        assert!(matches!(action, TieringAction::Stay));
    }

    #[tokio::test]
    async fn test_cost_aware_policy() {
        let policy = CostAwarePolicy::new(two_tier_layout(), Duration::from_secs(60));

        let key = "test_key".to_string();
        policy.record_access(key.clone(), 1, 1024).await;

        let optimal = policy.get_optimal_tier(&key).await.unwrap_or(0);
        assert!(optimal < 2);
    }

    #[tokio::test]
    async fn test_adaptive_tier_sizer() {
        // ~88% utilization, above the 80% target.
        let tiers = vec![make_tier("L1", 0, 1.0, 10, 900 * 1024, 1024 * 1024)];

        let sizer = AdaptiveTierSizer::new(tiers, 80.0, 0.1);
        let adjusted = sizer.adjust_sizes().await;

        // Should have increased size due to high utilization
        assert!(adjusted[0].max_size > 1024 * 1024);
    }
}