Skip to main content

oximedia_cache/
cache_warming.rs

1//! Predictive cache warming via access-pattern analysis.
2//!
3//! Records per-key access history, computes frequency/recency metrics, detects
4//! periodic access patterns with auto-correlation, and produces a prioritised
5//! [`WarmupPlan`] that fits within a given memory budget.
6
/// Historical access record for a single cache key.
#[derive(Debug, Clone)]
pub struct AccessPattern {
    /// The cache key this record belongs to.
    pub key: String,
    /// Unix timestamps (seconds) of past accesses, expected in ascending
    /// order.  A timestamp that is smaller than its predecessor is treated as
    /// a zero-length gap by the analysis methods (the subtraction saturates).
    pub access_times: Vec<u64>,
    /// Caller-supplied estimated byte size for this key's value.
    pub size_bytes: usize,
}

impl AccessPattern {
    /// Gaps, in seconds, between each pair of consecutive recorded accesses.
    ///
    /// Yields `access_times.len() - 1` items (nothing for fewer than two
    /// timestamps).
    fn inter_arrival_secs(&self) -> impl Iterator<Item = f64> + '_ {
        self.access_times
            .windows(2)
            .map(|w| w[1].saturating_sub(w[0]) as f64)
    }

    /// Compute accesses per hour over the entire recorded history.
    ///
    /// The rate is the number of recorded accesses divided by the time between
    /// the first and the last timestamp.
    ///
    /// Returns `0.0` when fewer than two access timestamps are known, or when
    /// the first and last timestamps do not span a positive amount of time
    /// (the time-span is undefined in both cases).
    pub fn frequency_per_hour(&self) -> f64 {
        let (first, last) = match self.access_times.as_slice() {
            [first, .., last] => (*first, *last),
            _ => return 0.0,
        };
        let span_secs = last.saturating_sub(first);
        if span_secs == 0 {
            return 0.0;
        }
        let span_hours = span_secs as f64 / 3600.0;
        self.access_times.len() as f64 / span_hours
    }

    /// Predict the next access timestamp using exponential inter-arrival
    /// smoothing (EMA with α = 0.3 on inter-arrival deltas).
    ///
    /// The EMA is seeded with the oldest gap and folded over the remaining
    /// gaps in chronological order, so recent gaps carry more weight.  The
    /// smoothed gap is rounded to whole seconds and added to the last
    /// recorded timestamp (saturating at `u64::MAX`).
    ///
    /// Returns `None` when there are fewer than two data points.
    pub fn predict_next_access(&self) -> Option<u64> {
        const ALPHA: f64 = 0.3;

        let last = *self.access_times.last()?;
        let mut gaps = self.inter_arrival_secs();
        // No first gap means fewer than two timestamps.
        let seed = gaps.next()?;
        let ema = gaps.fold(seed, |ema, gap| ALPHA * gap + (1.0 - ALPHA) * ema);

        Some(last.saturating_add(ema.round().max(0.0) as u64))
    }

    /// Attempt to detect the dominant period of the access pattern (in
    /// seconds) using normalised auto-correlation of the inter-arrival
    /// sequence.
    ///
    /// Lags `1..=n/2` (in units of gaps) are examined.  When the strongest
    /// positive correlation exceeds 0.3 (weak threshold to be liberal about
    /// detection) at lag `k`, the gap sequence repeats every `k` gaps, so the
    /// period is reported as the mean duration of `k` consecutive gaps.  For
    /// example gaps of `100, 500, 100, 500, …` correlate at lag 2 and yield a
    /// period of 600 s.
    ///
    /// When all gaps are (numerically) identical the common gap is returned.
    ///
    /// Returns `None` when fewer than four timestamps are recorded or no
    /// clear periodicity is found.
    pub fn periodicity_secs(&self) -> Option<f64> {
        if self.access_times.len() < 4 {
            return None;
        }
        let gaps: Vec<f64> = self.inter_arrival_secs().collect();
        // At least four timestamps were checked above, so `n >= 3`.
        let n = gaps.len();

        let mean = gaps.iter().sum::<f64>() / n as f64;
        // Population variance; the normalising denominator for correlation.
        let variance = gaps.iter().map(|g| (g - mean).powi(2)).sum::<f64>() / n as f64;
        if variance < 1e-10 {
            // All intervals identical → trivially periodic at that value.
            return Some(mean);
        }

        // `n >= 3` guarantees `max_lag >= 1`, and `lag <= n / 2` guarantees at
        // least one overlapping pair for every lag examined.
        let max_lag = n / 2;
        let mut best_lag = 0usize;
        let mut best_corr: f64 = 0.0;

        for lag in 1..=max_lag {
            let pairs = n - lag;
            let corr = gaps
                .iter()
                .zip(&gaps[lag..])
                .map(|(a, b)| (a - mean) * (b - mean))
                .sum::<f64>()
                / (pairs as f64 * variance);

            // Strict comparison keeps the smallest lag among equal peaks.
            if corr > best_corr {
                best_corr = corr;
                best_lag = lag;
            }
        }

        if best_corr > 0.3 && best_lag > 0 {
            // One full cycle spans `best_lag` consecutive gaps; average the
            // cycle length over every position where a full cycle fits.
            let cycles = gaps.windows(best_lag);
            let cycle_count = cycles.len();
            let cycle_total: f64 = cycles.map(|w| w.iter().sum::<f64>()).sum();
            Some(cycle_total / cycle_count as f64)
        } else {
            None
        }
    }
}
126
127// ── CacheWarmer ───────────────────────────────────────────────────────────────
128
129/// A `WarmupPlan` produced by [`CacheWarmer::plan_warmup`].
130#[derive(Debug, Clone)]
131pub struct WarmupPlan {
132    /// Keys to pre-load, ordered by descending priority.
133    pub entries_to_warm: Vec<String>,
134    /// Total byte cost of all entries in the plan.
135    pub estimated_bytes: usize,
136    /// Estimated improvement in hit rate (0.0–1.0).
137    pub estimated_hit_improvement: f64,
138}
139
140/// Predictive cache warmer.
141pub struct CacheWarmer {
142    /// All recorded access patterns, keyed by cache key.
143    pub patterns: Vec<AccessPattern>,
144    /// Look-ahead window: only warm entries whose predicted next access is
145    /// within this many seconds of `current_time`.
146    pub look_ahead_secs: u64,
147    /// Minimum accesses/hour for a key to be considered worth warming.
148    pub min_frequency: f64,
149}
150
151impl CacheWarmer {
152    /// Create a new `CacheWarmer` with sensible defaults.
153    pub fn new() -> Self {
154        Self {
155            patterns: Vec::new(),
156            look_ahead_secs: 300, // 5 minutes
157            min_frequency: 1.0,
158        }
159    }
160
161    /// Record an access to `key` at `time` (Unix seconds).
162    ///
163    /// If no pattern for this key exists yet, one is created.
164    pub fn record_access(&mut self, key: &str, size_bytes: usize, time: u64) {
165        if let Some(p) = self.patterns.iter_mut().find(|p| p.key == key) {
166            p.size_bytes = size_bytes;
167            p.access_times.push(time);
168        } else {
169            self.patterns.push(AccessPattern {
170                key: key.to_string(),
171                access_times: vec![time],
172                size_bytes,
173            });
174        }
175    }
176
177    /// Build a [`WarmupPlan`] prioritising entries by:
178    ///
179    /// ```text
180    /// score = frequency_per_hour × recency_weight × size_efficiency
181    /// ```
182    ///
183    /// where
184    /// * `recency_weight = exp(-age_hours / 1.0)` – exponential decay over 1 h
185    /// * `size_efficiency = 1.0 / (1.0 + size_bytes / 1024)`
186    ///
187    /// Only entries whose predicted next access is within `look_ahead_secs`
188    /// of `current_time` and whose frequency exceeds `min_frequency` are
189    /// considered.  Entries are added in descending score order until
190    /// `available_bytes` would be exceeded.
191    pub fn plan_warmup(&self, current_time: u64, available_bytes: usize) -> WarmupPlan {
192        // Score each qualifying pattern.
193        let mut scored: Vec<(&AccessPattern, f64)> = self
194            .patterns
195            .iter()
196            .filter_map(|p| {
197                let freq = p.frequency_per_hour();
198                if freq < self.min_frequency {
199                    return None;
200                }
201                // Check predicted next access window.
202                let next = p.predict_next_access()?;
203                let deadline = current_time.saturating_add(self.look_ahead_secs);
204                if next > deadline {
205                    return None;
206                }
207                // Recency weight: based on time since last access.
208                let last_access = p.access_times.last().copied().unwrap_or(0);
209                let age_secs = current_time.saturating_sub(last_access);
210                let age_hours = age_secs as f64 / 3600.0;
211                let recency = (-age_hours).exp(); // e^(-age_hours)
212                                                  // Size efficiency: smaller entries are cheaper to warm.
213                let size_eff = 1.0 / (1.0 + p.size_bytes as f64 / 1024.0);
214                let score = freq * recency * size_eff;
215                Some((p, score))
216            })
217            .collect();
218
219        // Sort descending by score.
220        scored.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
221
222        let mut entries_to_warm = Vec::new();
223        let mut estimated_bytes = 0usize;
224
225        for (pattern, _score) in &scored {
226            if estimated_bytes + pattern.size_bytes > available_bytes {
227                break;
228            }
229            estimated_bytes += pattern.size_bytes;
230            entries_to_warm.push(pattern.key.clone());
231        }
232
233        // Estimate hit improvement as fraction of qualifying entries included.
234        let total_qualifying = scored.len();
235        let included = entries_to_warm.len();
236        let estimated_hit_improvement = if total_qualifying == 0 {
237            0.0
238        } else {
239            included as f64 / total_qualifying as f64
240        };
241
242        WarmupPlan {
243            entries_to_warm,
244            estimated_bytes,
245            estimated_hit_improvement,
246        }
247    }
248
249    /// Return the top `n` hot keys sorted by descending frequency.
250    pub fn top_hot_keys(&self, n: usize) -> Vec<(&str, f64)> {
251        let mut scored: Vec<(&str, f64)> = self
252            .patterns
253            .iter()
254            .map(|p| (p.key.as_str(), p.frequency_per_hour()))
255            .collect();
256        scored.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
257        scored.truncate(n);
258        scored
259    }
260}
261
262// ── Tests ─────────────────────────────────────────────────────────────────────
263
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a pattern for key `"k"` with the given timestamps and size.
    fn pattern(access_times: Vec<u64>, size_bytes: usize) -> AccessPattern {
        AccessPattern {
            key: "k".into(),
            access_times,
            size_bytes,
        }
    }

    // 1. record_access creates a new pattern
    #[test]
    fn test_record_access_creates_pattern() {
        let mut warmer = CacheWarmer::new();
        warmer.record_access("key1", 512, 1_000_000);
        assert_eq!(warmer.patterns.len(), 1);
        assert_eq!(warmer.patterns[0].key, "key1");
    }

    // 2. record_access accumulates times for the same key
    #[test]
    fn test_record_access_accumulates() {
        let mut warmer = CacheWarmer::new();
        warmer.record_access("k", 100, 1000);
        warmer.record_access("k", 100, 2000);
        warmer.record_access("k", 100, 3000);
        assert_eq!(warmer.patterns.len(), 1);
        assert_eq!(warmer.patterns[0].access_times, vec![1000, 2000, 3000]);
    }

    // 3. frequency_per_hour basic
    #[test]
    fn test_frequency_per_hour() {
        // 6 accesses over 1 hour (3600 s) → 6 /h
        let p = pattern(vec![0, 720, 1440, 2160, 2880, 3600], 128);
        let freq = p.frequency_per_hour();
        assert!((freq - 6.0).abs() < 0.01, "expected ~6/h, got {freq}");
    }

    // 4. frequency_per_hour with fewer than 2 data points
    #[test]
    fn test_frequency_single_point() {
        let p = pattern(vec![1000], 64);
        assert_eq!(p.frequency_per_hour(), 0.0);
    }

    // 5. predict_next_access with uniform intervals
    #[test]
    fn test_predict_next_access_uniform() {
        // 100 s intervals
        let p = pattern(vec![1000, 1100, 1200, 1300], 64);
        let predicted = p.predict_next_access().expect("should predict");
        // With EMA the predicted interval should be close to 100 s.
        assert!(
            (1380..=1420).contains(&predicted),
            "expected ~1400, got {predicted}"
        );
    }

    // 6. predict_next_access returns None with < 2 points
    #[test]
    fn test_predict_next_access_insufficient() {
        let p = pattern(vec![500], 32);
        assert!(p.predict_next_access().is_none());
    }

    // 7. periodicity_secs detects clear period
    #[test]
    fn test_periodicity_detected() {
        // Access every 600 s (10 min) — very regular
        let p = pattern((0..20).map(|i| i * 600).collect(), 64);
        let period = p.periodicity_secs().expect("should detect periodicity");
        assert!(
            (period - 600.0).abs() < 5.0,
            "expected ~600 s, got {period}"
        );
    }

    // 8. periodicity_secs returns None for < 4 points
    #[test]
    fn test_periodicity_too_few_points() {
        // Three timestamps are below the four-point minimum, so the result
        // must be None (and the call must not panic).
        let p = pattern(vec![0, 100, 200], 64);
        assert!(p.periodicity_secs().is_none());
    }

    // 9. top_hot_keys returns correct order
    #[test]
    fn test_top_hot_keys_order() {
        let mut warmer = CacheWarmer::new();
        // "cold": 2 accesses over 1 hour → 2/h
        for t in [0u64, 3600] {
            warmer.record_access("cold", 64, t);
        }
        // "hot": 10 accesses spanning 3240 s (0.9 h) → ~11.1/h
        for i in 0..10u64 {
            warmer.record_access("hot", 64, i * 360);
        }
        let top = warmer.top_hot_keys(2);
        assert_eq!(top.len(), 2);
        assert_eq!(top[0].0, "hot");
        assert_eq!(top[1].0, "cold");
        assert!(top[0].1 > top[1].1);
    }

    // 10. top_hot_keys respects n limit
    #[test]
    fn test_top_hot_keys_limit() {
        let mut warmer = CacheWarmer::new();
        for k in ["a", "b", "c", "d", "e"] {
            warmer.record_access(k, 64, 0);
            warmer.record_access(k, 64, 3600);
        }
        assert_eq!(warmer.top_hot_keys(3).len(), 3);
    }

    // 11. plan_warmup respects available_bytes
    #[test]
    fn test_plan_warmup_respects_budget() {
        let mut warmer = CacheWarmer::new();
        warmer.look_ahead_secs = 10_000;
        warmer.min_frequency = 0.1;
        let now = 10_000u64;
        // Two keys with regular access patterns.
        for i in 0..5u64 {
            warmer.record_access("big", 5000, i * 1800);
            warmer.record_access("small", 100, i * 1800);
        }
        // Only 200 bytes available → "small" (100 B) fits and ranks first
        // thanks to its size efficiency; "big" (5000 B) must not be included.
        let plan = warmer.plan_warmup(now, 200);
        assert!(plan.estimated_bytes <= 200);
        assert!(!plan.entries_to_warm.contains(&"big".to_string()));
        assert_eq!(plan.entries_to_warm, vec!["small".to_string()]);
        assert_eq!(plan.estimated_bytes, 100);
    }

    // 12. plan_warmup excludes keys below min_frequency
    #[test]
    fn test_plan_warmup_min_frequency_filter() {
        let mut warmer = CacheWarmer::new();
        warmer.look_ahead_secs = 100_000;
        warmer.min_frequency = 100.0; // very high threshold
        // Only 2 accesses over 1 hour → 2/h, far below 100/h
        warmer.record_access("rare", 64, 0);
        warmer.record_access("rare", 64, 3600);
        let plan = warmer.plan_warmup(7200, usize::MAX);
        assert!(plan.entries_to_warm.is_empty());
        assert_eq!(plan.estimated_bytes, 0);
    }

    // 13. estimated_hit_improvement is between 0 and 1
    #[test]
    fn test_estimated_hit_improvement_range() {
        let mut warmer = CacheWarmer::new();
        warmer.look_ahead_secs = 100_000;
        warmer.min_frequency = 0.1;
        for i in 0..5u64 {
            warmer.record_access("k", 100, i * 600);
        }
        let plan = warmer.plan_warmup(3000, usize::MAX);
        assert!(plan.estimated_hit_improvement >= 0.0);
        assert!(plan.estimated_hit_improvement <= 1.0);
        // The single qualifying key fits, so the whole candidate set is warmed.
        assert_eq!(plan.entries_to_warm, vec!["k".to_string()]);
        assert!((plan.estimated_hit_improvement - 1.0).abs() < f64::EPSILON);
    }

    // 14. plan_warmup on an empty warmer yields an empty plan
    #[test]
    fn test_plan_warmup_empty() {
        let warmer = CacheWarmer::new();
        let plan = warmer.plan_warmup(1_000, 1_024);
        assert!(plan.entries_to_warm.is_empty());
        assert_eq!(plan.estimated_bytes, 0);
        assert_eq!(plan.estimated_hit_improvement, 0.0);
    }
}
443}