1use crate::traits::BlockStore;
12use dashmap::DashMap;
13use ipfrs_core::Cid;
14use serde::{Deserialize, Serialize};
15use std::collections::VecDeque;
16use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
17use std::sync::Arc;
18use std::time::{Duration, SystemTime};
19use tokio::sync::Semaphore;
20use tracing::{debug, trace};
21
22#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
24pub enum AccessPattern {
25 Sequential,
27 Random,
29 Clustered,
31 Temporal,
33}
34
35#[derive(Debug, Clone)]
37struct AccessRecord {
38 #[allow(dead_code)]
40 timestamp: SystemTime,
41 #[allow(dead_code)]
43 previous_cid: Option<Cid>,
44 next_cid: Option<Cid>,
46}
47
/// Statistics for a single "block A was followed by block B" pair.
#[derive(Clone, Debug)]
struct CoLocationPattern {
    /// How many times the pair has been observed.
    count: u64,
    /// When the pair was last observed; patterns older than the configured
    /// window are ignored when predicting.
    last_seen: SystemTime,
    /// Recency-weighted score for the pair, capped at 1.0.
    confidence: f64,
}
58
59#[derive(Debug, Clone)]
61pub struct PrefetchPrediction {
62 pub cid: Cid,
64 pub confidence: f64,
66 pub predicted_access: SystemTime,
68 pub pattern: AccessPattern,
70}
71
/// Tuning knobs for `PredictivePrefetcher`.
#[derive(Clone, Debug)]
pub struct PrefetchConfig {
    /// Upper bound on the number of predictions returned per call; also the
    /// starting value of the adaptive depth.
    pub max_prefetch_depth: usize,
    /// Predictions scoring below this value are discarded.
    pub min_confidence: f64,
    /// Maximum number of prefetch fetches allowed in flight at once.
    pub max_concurrent_prefetch: usize,
    /// Co-location patterns not observed within this window are ignored.
    pub pattern_window: Duration,
    /// Enables predictions from per-block successor history.
    pub enable_sequential: bool,
    /// Enables co-location learning and predictions.
    pub enable_colocation: bool,
    /// NOTE(review): not read anywhere in this module.
    pub enable_temporal: bool,
}
90
91impl Default for PrefetchConfig {
92 fn default() -> Self {
93 Self {
94 max_prefetch_depth: 5,
95 min_confidence: 0.6,
96 max_concurrent_prefetch: 3,
97 pattern_window: Duration::from_secs(300), enable_sequential: true,
99 enable_colocation: true,
100 enable_temporal: true,
101 }
102 }
103}
104
105#[derive(Debug, Default)]
107pub struct PrefetchStats {
108 pub prefetch_attempts: AtomicU64,
110 pub prefetch_hits: AtomicU64,
112 pub prefetch_misses: AtomicU64,
114 pub bytes_prefetched: AtomicU64,
116 pub avg_confidence: parking_lot::Mutex<f64>,
118}
119
120impl PrefetchStats {
121 fn record_attempt(&self) {
122 self.prefetch_attempts.fetch_add(1, Ordering::Relaxed);
123 }
124
125 fn record_hit(&self, bytes: u64) {
126 self.prefetch_hits.fetch_add(1, Ordering::Relaxed);
127 self.bytes_prefetched.fetch_add(bytes, Ordering::Relaxed);
128 }
129
130 fn record_miss(&self) {
131 self.prefetch_misses.fetch_add(1, Ordering::Relaxed);
132 }
133
134 pub fn hit_rate(&self) -> f64 {
136 let hits = self.prefetch_hits.load(Ordering::Relaxed) as f64;
137 let total = self.prefetch_attempts.load(Ordering::Relaxed) as f64;
138 if total > 0.0 {
139 hits / total
140 } else {
141 0.0
142 }
143 }
144}
145
146pub struct PredictivePrefetcher<S: BlockStore> {
148 store: Arc<S>,
149 config: parking_lot::RwLock<PrefetchConfig>,
150 access_history: DashMap<Cid, VecDeque<AccessRecord>>,
152 colocation_patterns: DashMap<Cid, DashMap<Cid, CoLocationPattern>>,
154 last_accessed: parking_lot::Mutex<Option<Cid>>,
156 #[allow(dead_code)]
158 prefetch_queue: DashMap<Cid, PrefetchPrediction>,
159 prefetch_cache: DashMap<Cid, (Vec<u8>, SystemTime)>,
161 stats: PrefetchStats,
163 prefetch_semaphore: Arc<Semaphore>,
165 current_depth: AtomicUsize,
167}
168
169impl<S: BlockStore + Send + Sync + 'static> PredictivePrefetcher<S> {
170 pub fn new(store: Arc<S>, config: PrefetchConfig) -> Self {
172 let max_concurrent = config.max_concurrent_prefetch;
173 let initial_depth = config.max_prefetch_depth;
174
175 Self {
176 store,
177 config: parking_lot::RwLock::new(config),
178 access_history: DashMap::new(),
179 colocation_patterns: DashMap::new(),
180 last_accessed: parking_lot::Mutex::new(None),
181 prefetch_queue: DashMap::new(),
182 prefetch_cache: DashMap::new(),
183 stats: PrefetchStats::default(),
184 prefetch_semaphore: Arc::new(Semaphore::new(max_concurrent)),
185 current_depth: AtomicUsize::new(initial_depth),
186 }
187 }
188
189 pub fn record_access(&self, cid: &Cid) {
191 let now = SystemTime::now();
192 let previous = self.last_accessed.lock().clone();
193
194 {
196 let mut history = self
197 .access_history
198 .entry(*cid)
199 .or_insert_with(VecDeque::new);
200 history.push_back(AccessRecord {
201 timestamp: now,
202 previous_cid: previous,
203 next_cid: None,
204 });
205
206 if history.len() > 100 {
208 history.pop_front();
209 }
210 } if let Some(prev_cid) = previous {
214 if prev_cid != *cid {
216 if let Some(mut prev_history) = self.access_history.get_mut(&prev_cid) {
217 if let Some(last_record) = prev_history.back_mut() {
218 last_record.next_cid = Some(*cid);
219 }
220 }
221 }
222
223 if self.config.read().enable_colocation {
225 self.update_colocation_pattern(&prev_cid, cid);
226 }
227 }
228
229 *self.last_accessed.lock() = Some(*cid);
231
232 if let Some(entry) = self.prefetch_cache.get(cid) {
234 let prefetch_time = entry.value().1;
235 let age = now.duration_since(prefetch_time).unwrap_or_default();
236 if age < Duration::from_secs(60) {
237 self.stats.record_hit(0); } else {
240 self.stats.record_miss();
241 }
242 }
243 }
244
245 fn update_colocation_pattern(&self, cid1: &Cid, cid2: &Cid) {
247 let patterns = self
248 .colocation_patterns
249 .entry(*cid1)
250 .or_insert_with(DashMap::new);
251
252 patterns
253 .entry(*cid2)
254 .and_modify(|pattern| {
255 pattern.count += 1;
256 pattern.last_seen = SystemTime::now();
257 let recency_factor = 0.9; pattern.confidence = (pattern.confidence * recency_factor + 0.1).min(1.0);
260 })
261 .or_insert_with(|| CoLocationPattern {
262 count: 1,
263 last_seen: SystemTime::now(),
264 confidence: 0.5,
265 });
266 }
267
268 pub fn predict_next_blocks(&self, current_cid: &Cid) -> Vec<PrefetchPrediction> {
270 let config = self.config.read();
271 let mut predictions = Vec::new();
272
273 if config.enable_sequential {
275 if let Some(seq_predictions) = self.predict_sequential(current_cid) {
276 predictions.extend(seq_predictions);
277 }
278 }
279
280 if config.enable_colocation {
282 if let Some(coloc_predictions) = self.predict_colocation(current_cid) {
283 predictions.extend(coloc_predictions);
284 }
285 }
286
287 predictions.retain(|p| p.confidence >= config.min_confidence);
289 predictions.sort_by(|a, b| b.confidence.partial_cmp(&a.confidence).unwrap());
290
291 let depth = self.current_depth.load(Ordering::Relaxed);
292 predictions.truncate(depth);
293
294 predictions
295 }
296
297 fn predict_sequential(&self, cid: &Cid) -> Option<Vec<PrefetchPrediction>> {
299 let history = self.access_history.get(cid)?;
300
301 let next_counts: DashMap<Cid, u64> = DashMap::new();
303
304 for record in history.iter() {
305 if let Some(next_cid) = record.next_cid {
306 *next_counts.entry(next_cid).or_insert(0) += 1;
307 }
308 }
309
310 if next_counts.is_empty() {
311 return None;
312 }
313
314 let mut predictions = Vec::new();
316 let total_accesses = history.len() as f64;
317
318 for entry in next_counts.iter() {
319 let count = *entry.value() as f64;
320 let confidence = count / total_accesses;
321
322 if confidence >= 0.3 {
323 predictions.push(PrefetchPrediction {
324 cid: *entry.key(),
325 confidence,
326 predicted_access: SystemTime::now(),
327 pattern: AccessPattern::Sequential,
328 });
329 }
330 }
331
332 Some(predictions)
333 }
334
335 fn predict_colocation(&self, cid: &Cid) -> Option<Vec<PrefetchPrediction>> {
337 let patterns = self.colocation_patterns.get(cid)?;
338
339 let mut predictions = Vec::new();
340
341 for entry in patterns.iter() {
342 let pattern = entry.value();
343
344 let age = SystemTime::now()
346 .duration_since(pattern.last_seen)
347 .unwrap_or_default();
348
349 if age < self.config.read().pattern_window {
350 predictions.push(PrefetchPrediction {
351 cid: *entry.key(),
352 confidence: pattern.confidence,
353 predicted_access: SystemTime::now(),
354 pattern: AccessPattern::Clustered,
355 });
356 }
357 }
358
359 Some(predictions)
360 }
361
362 pub async fn prefetch_background(&self, predictions: Vec<PrefetchPrediction>) {
364 for prediction in predictions {
365 let store = self.store.clone();
366 let cache = self.prefetch_cache.clone();
367 let stats = &self.stats;
368 let semaphore = self.prefetch_semaphore.clone();
369
370 stats.record_attempt();
371
372 let cid = prediction.cid;
373 trace!(
374 "Prefetching block {} (confidence: {:.2})",
375 cid,
376 prediction.confidence
377 );
378
379 tokio::spawn(async move {
381 let _permit = semaphore.acquire().await.ok();
382
383 if let Ok(Some(block)) = store.get(&cid).await {
384 cache.insert(cid, (block.data().to_vec(), SystemTime::now()));
385 debug!("Prefetched block {}", cid);
386 }
387 });
388 }
389 }
390
391 pub fn adapt_depth(&self) {
393 let hit_rate = self.stats.hit_rate();
394 let current = self.current_depth.load(Ordering::Relaxed);
395 let max_depth = self.config.read().max_prefetch_depth;
396
397 let new_depth = if hit_rate > 0.8 {
398 (current + 1).min(max_depth)
400 } else if hit_rate < 0.4 {
401 (current.saturating_sub(1)).max(1)
403 } else {
404 current
405 };
406
407 if new_depth != current {
408 self.current_depth.store(new_depth, Ordering::Relaxed);
409 debug!(
410 "Adapted prefetch depth: {} -> {} (hit rate: {:.2})",
411 current, new_depth, hit_rate
412 );
413 }
414 }
415
416 pub fn stats(&self) -> PrefetchStatsSnapshot {
418 PrefetchStatsSnapshot {
419 prefetch_attempts: self.stats.prefetch_attempts.load(Ordering::Relaxed),
420 prefetch_hits: self.stats.prefetch_hits.load(Ordering::Relaxed),
421 prefetch_misses: self.stats.prefetch_misses.load(Ordering::Relaxed),
422 bytes_prefetched: self.stats.bytes_prefetched.load(Ordering::Relaxed),
423 hit_rate: self.stats.hit_rate(),
424 current_depth: self.current_depth.load(Ordering::Relaxed),
425 }
426 }
427
428 pub fn clear_cache(&self) {
430 self.prefetch_cache.clear();
431 }
432
433 pub fn cache_size(&self) -> usize {
435 self.prefetch_cache.len()
436 }
437}
438
439#[derive(Debug, Clone, Serialize, Deserialize)]
441pub struct PrefetchStatsSnapshot {
442 pub prefetch_attempts: u64,
443 pub prefetch_hits: u64,
444 pub prefetch_misses: u64,
445 pub bytes_prefetched: u64,
446 pub hit_rate: f64,
447 pub current_depth: usize,
448}
449
#[cfg(test)]
mod tests {
    use super::*;
    use crate::memory::MemoryBlockStore;
    use ipfrs_core::cid::CidBuilder;

    /// Builds a deterministic CID from the little-endian bytes of `index`.
    fn test_cid(index: u64) -> Cid {
        let seed = index.to_le_bytes();
        CidBuilder::new()
            .build(&seed)
            .expect("failed to create test cid")
    }

    /// Prefetcher over an empty in-memory store with the default configuration.
    fn default_prefetcher() -> PredictivePrefetcher<MemoryBlockStore> {
        PredictivePrefetcher::new(Arc::new(MemoryBlockStore::new()), PrefetchConfig::default())
    }

    #[tokio::test]
    async fn test_prefetcher_creation() {
        let snapshot = default_prefetcher().stats();

        assert_eq!(snapshot.prefetch_attempts, 0);
        assert_eq!(snapshot.hit_rate, 0.0);
    }

    #[tokio::test]
    async fn test_access_recording() {
        let prefetcher = default_prefetcher();
        let (first, second) = (test_cid(1), test_cid(2));

        prefetcher.record_access(&first);
        prefetcher.record_access(&second);

        // Accessing `second` right after `first` registers a co-location
        // pattern keyed by `first`.
        assert!(prefetcher.colocation_patterns.contains_key(&first));
    }

    #[tokio::test]
    async fn test_sequential_prediction() {
        let prefetcher = default_prefetcher();
        let (first, second) = (test_cid(1), test_cid(2));

        // Alternate the two blocks so `second` is always `first`'s successor.
        (0..5).for_each(|_| {
            prefetcher.record_access(&first);
            prefetcher.record_access(&second);
        });

        let predictions = prefetcher.predict_next_blocks(&first);
        assert!(!predictions.is_empty());

        let has_sequential = predictions
            .iter()
            .any(|p| p.pattern == AccessPattern::Sequential);
        assert!(has_sequential);
    }
}