rez_next_cache/
predictive_preheater.rs1use crate::PreheatingConfig;
7use serde::{Deserialize, Serialize};
8use std::{
9 collections::{HashMap, VecDeque},
10 hash::Hash,
11 sync::{Arc, RwLock},
12 time::{Duration, SystemTime},
13};
14use tokio::time::Interval;
15
16#[derive(Debug, Clone, Serialize, Deserialize)]
18pub struct AccessPattern {
19 pub key_hash: u64,
21 pub access_times: VecDeque<SystemTime>,
23 pub frequency: f64,
25 pub regularity: f64,
27 pub confidence: f64,
29 pub last_prediction: SystemTime,
31 pub accuracy_history: VecDeque<f64>,
33}
34
35impl AccessPattern {
36 pub fn new(key_hash: u64) -> Self {
38 Self {
39 key_hash,
40 access_times: VecDeque::new(),
41 frequency: 0.0,
42 regularity: 0.0,
43 confidence: 0.0,
44 last_prediction: SystemTime::UNIX_EPOCH,
45 accuracy_history: VecDeque::new(),
46 }
47 }
48
49 pub fn record_access(&mut self, time: SystemTime) {
51 self.access_times.push_back(time);
52
53 let cutoff = time - Duration::from_secs(24 * 3600);
55 while let Some(&front_time) = self.access_times.front() {
56 if front_time < cutoff {
57 self.access_times.pop_front();
58 } else {
59 break;
60 }
61 }
62
63 self.update_metrics();
64 }
65
66 fn update_metrics(&mut self) {
68 if self.access_times.len() < 2 {
69 return;
70 }
71
72 let time_span = self
74 .access_times
75 .back()
76 .unwrap()
77 .duration_since(*self.access_times.front().unwrap())
78 .unwrap_or(Duration::from_secs(1))
79 .as_secs_f64()
80 / 3600.0; self.frequency = if time_span > 0.0 {
83 self.access_times.len() as f64 / time_span
84 } else {
85 0.0
86 };
87
88 if self.access_times.len() >= 3 {
90 let intervals: Vec<f64> = self
91 .access_times
92 .iter()
93 .zip(self.access_times.iter().skip(1))
94 .map(|(a, b)| {
95 b.duration_since(*a)
96 .unwrap_or(Duration::from_secs(0))
97 .as_secs_f64()
98 })
99 .collect();
100
101 let mean_interval = intervals.iter().sum::<f64>() / intervals.len() as f64;
102 let variance = intervals
103 .iter()
104 .map(|&x| (x - mean_interval).powi(2))
105 .sum::<f64>()
106 / intervals.len() as f64;
107
108 self.regularity = if mean_interval > 0.0 {
110 1.0 / (1.0 + (variance.sqrt() / mean_interval))
111 } else {
112 0.0
113 };
114 }
115
116 self.confidence = (self.frequency.min(10.0) / 10.0) * self.regularity;
118 }
119
120 pub fn predict_next_access(&self) -> Option<SystemTime> {
122 if self.access_times.len() < 2 || self.confidence < 0.3 {
123 return None;
124 }
125
126 let intervals: Vec<Duration> = self
128 .access_times
129 .iter()
130 .zip(self.access_times.iter().skip(1))
131 .map(|(a, b)| b.duration_since(*a).unwrap_or(Duration::from_secs(0)))
132 .collect();
133
134 if intervals.is_empty() {
135 return None;
136 }
137
138 let avg_interval_secs =
139 intervals.iter().map(|d| d.as_secs_f64()).sum::<f64>() / intervals.len() as f64;
140
141 let last_access = *self.access_times.back()?;
142 let predicted_time = last_access + Duration::from_secs_f64(avg_interval_secs);
143
144 Some(predicted_time)
145 }
146
147 pub fn calculate_cache_score(&self) -> f64 {
149 let recency_factor = if let Some(last_access) = self.access_times.back() {
151 let age = SystemTime::now()
152 .duration_since(*last_access)
153 .unwrap_or(Duration::from_secs(u64::MAX))
154 .as_secs_f64()
155 / 3600.0; (-age / 24.0).exp() } else {
160 0.0
161 };
162
163 self.frequency * self.regularity * self.confidence * recency_factor
164 }
165}
166
167#[derive(Debug)]
172pub struct PredictivePreheater<K>
173where
174 K: Clone + Hash + Eq + Send + Sync + std::fmt::Debug + 'static,
175{
176 config: PreheatingConfig,
178 patterns: Arc<RwLock<HashMap<K, AccessPattern>>>,
180 preheat_queue: Arc<RwLock<VecDeque<(K, f64, SystemTime)>>>, stats: Arc<RwLock<PreheatingStats>>,
184 _preheating_interval: Option<Interval>,
186}
187
188#[derive(Debug, Clone, Serialize, Deserialize)]
190pub struct PreheatingStats {
191 pub predictions_made: u64,
193 pub successful_predictions: u64,
195 pub failed_predictions: u64,
197 pub accuracy_rate: f64,
199 pub preheat_operations: u64,
201 pub avg_confidence: f64,
203 pub patterns_learned: usize,
205}
206
207impl Default for PreheatingStats {
208 fn default() -> Self {
209 Self {
210 predictions_made: 0,
211 successful_predictions: 0,
212 failed_predictions: 0,
213 accuracy_rate: 0.0,
214 preheat_operations: 0,
215 avg_confidence: 0.0,
216 patterns_learned: 0,
217 }
218 }
219}
220
221impl<K> PredictivePreheater<K>
222where
223 K: Clone + Hash + Eq + Send + Sync + std::fmt::Debug + 'static,
224{
225 pub fn new(config: PreheatingConfig) -> Self {
227 Self {
228 config,
229 patterns: Arc::new(RwLock::new(HashMap::new())),
230 preheat_queue: Arc::new(RwLock::new(VecDeque::new())),
231 stats: Arc::new(RwLock::new(PreheatingStats::default())),
232 _preheating_interval: None,
233 }
234 }
235
236 pub async fn record_access(&self, key: &K) {
238 if !self.config.enable_pattern_learning {
239 return;
240 }
241
242 let key_hash = self.calculate_key_hash(key);
243 let now = SystemTime::now();
244
245 let mut patterns = self.patterns.write().unwrap();
246 let pattern = patterns
247 .entry(key.clone())
248 .or_insert_with(|| AccessPattern::new(key_hash));
249
250 pattern.record_access(now);
251
252 {
254 let mut stats = self.stats.write().unwrap();
255 stats.patterns_learned = patterns.len();
256 }
257 }
258
259 pub async fn predict_and_preheat(&self, key: &K) {
261 if !self.config.enable_predictive_preheating {
262 return;
263 }
264
265 let patterns = self.patterns.read().unwrap();
266 if let Some(pattern) = patterns.get(key) {
267 if let Some(predicted_time) = pattern.predict_next_access() {
268 let score = pattern.calculate_cache_score();
269
270 if pattern.confidence >= self.config.min_confidence_threshold {
272 let mut queue = self.preheat_queue.write().unwrap();
273 queue.push_back((key.clone(), score, predicted_time));
274
275 if queue.len() > self.config.max_preheat_queue_size {
277 queue.pop_front();
278 }
279
280 {
282 let mut stats = self.stats.write().unwrap();
283 stats.predictions_made += 1;
284 }
285 }
286 }
287 }
288 }
289
290 pub async fn get_preheat_recommendations(&self) -> Vec<(K, f64)> {
292 let mut queue = self.preheat_queue.write().unwrap();
293 let now = SystemTime::now();
294
295 let mut recommendations: Vec<(K, f64)> = queue
297 .iter()
298 .filter(|(_, _, predicted_time)| {
299 let time_diff = predicted_time
301 .duration_since(now)
302 .unwrap_or_else(|_| {
303 now.duration_since(*predicted_time)
304 .unwrap_or(Duration::from_secs(0))
305 })
306 .as_secs();
307
308 time_diff <= self.config.preheat_window_seconds
309 })
310 .map(|(key, score, _)| (key.clone(), *score))
311 .collect();
312
313 recommendations.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
315
316 queue.retain(|(_, _, predicted_time)| {
318 let time_diff = predicted_time
319 .duration_since(now)
320 .unwrap_or_else(|_| {
321 now.duration_since(*predicted_time)
322 .unwrap_or(Duration::from_secs(0))
323 })
324 .as_secs();
325
326 time_diff > self.config.preheat_window_seconds
327 });
328
329 recommendations
330 .into_iter()
331 .take(self.config.max_concurrent_preheats)
332 .collect()
333 }
334
335 pub fn calculate_cache_score(&self, key: &K) -> f64 {
337 let patterns = self.patterns.read().unwrap();
338 patterns
339 .get(key)
340 .map(|pattern| pattern.calculate_cache_score())
341 .unwrap_or(0.0)
342 }
343
344 pub fn get_stats(&self) -> PreheatingStats {
346 let stats = self.stats.read().unwrap();
347 let mut result = stats.clone();
348
349 if result.predictions_made > 0 {
351 result.accuracy_rate =
352 result.successful_predictions as f64 / result.predictions_made as f64;
353 }
354
355 let patterns = self.patterns.read().unwrap();
357 if !patterns.is_empty() {
358 result.avg_confidence =
359 patterns.values().map(|p| p.confidence).sum::<f64>() / patterns.len() as f64;
360 }
361
362 result
363 }
364
365 fn calculate_key_hash(&self, key: &K) -> u64 {
367 use std::collections::hash_map::DefaultHasher;
368 use std::hash::Hasher;
369
370 let mut hasher = DefaultHasher::new();
371 key.hash(&mut hasher);
372 hasher.finish()
373 }
374
375 pub async fn record_prediction_outcome(&self, _key: &K, was_hit: bool) {
377 let mut stats = self.stats.write().unwrap();
378 if was_hit {
379 stats.successful_predictions += 1;
380 } else {
381 stats.failed_predictions += 1;
382 }
383 }
384}