ipfrs_storage/
prefetch.rs

1//! Predictive Prefetching for intelligent block preloading
2//!
3//! This module provides predictive prefetching capabilities:
4//! - Access pattern analysis and prediction
5//! - Sequential access detection
6//! - Co-location patterns (blocks accessed together)
7//! - Time-based prediction (blocks accessed at similar times)
8//! - Adaptive prefetch depth based on cache hit rates
9//! - Background prefetching with priority control
10
11use crate::traits::BlockStore;
12use dashmap::DashMap;
13use ipfrs_core::Cid;
14use serde::{Deserialize, Serialize};
15use std::collections::VecDeque;
16use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
17use std::sync::Arc;
18use std::time::{Duration, SystemTime};
19use tokio::sync::Semaphore;
20use tracing::{debug, trace};
21
22/// Access pattern type
23#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
24pub enum AccessPattern {
25    /// Sequential access (e.g., video streaming)
26    Sequential,
27    /// Random access with no clear pattern
28    Random,
29    /// Clustered access (accessing related blocks)
30    Clustered,
31    /// Temporal (accessing at regular intervals)
32    Temporal,
33}
34
35/// Access record for a block
36#[derive(Debug, Clone)]
37struct AccessRecord {
38    /// When the block was accessed
39    #[allow(dead_code)]
40    timestamp: SystemTime,
41    /// Previous block accessed (for pattern detection)
42    #[allow(dead_code)]
43    previous_cid: Option<Cid>,
44    /// Next block accessed (updated retroactively)
45    next_cid: Option<Cid>,
46}
47
/// Bookkeeping for a pair of blocks that tend to be accessed together.
#[derive(Clone, Debug)]
struct CoLocationPattern {
    /// Number of times the pair has been observed.
    count: u64,
    /// Most recent time the pair was observed.
    last_seen: SystemTime,
    /// Confidence score in the range 0.0 - 1.0.
    confidence: f64,
}
58
59/// Prefetch prediction
60#[derive(Debug, Clone)]
61pub struct PrefetchPrediction {
62    /// CID to prefetch
63    pub cid: Cid,
64    /// Confidence score (0.0 - 1.0)
65    pub confidence: f64,
66    /// Predicted access time
67    pub predicted_access: SystemTime,
68    /// Pattern type that generated this prediction
69    pub pattern: AccessPattern,
70}
71
/// Tunable settings for the prefetcher.
#[derive(Clone, Debug)]
pub struct PrefetchConfig {
    /// Upper bound on how many blocks may be prefetched ahead.
    pub max_prefetch_depth: usize,
    /// Predictions below this confidence (0.0 - 1.0) are not prefetched.
    pub min_confidence: f64,
    /// Upper bound on prefetch operations running at the same time.
    pub max_concurrent_prefetch: usize,
    /// Length of the time window used for pattern analysis.
    pub pattern_window: Duration,
    /// Turns sequential pattern detection on or off.
    pub enable_sequential: bool,
    /// Turns co-location pattern detection on or off.
    pub enable_colocation: bool,
    /// Turns temporal pattern detection on or off.
    pub enable_temporal: bool,
}

impl Default for PrefetchConfig {
    /// Builds the default configuration: depth 5, confidence threshold 0.6,
    /// 3 concurrent prefetches, a 5-minute pattern window, and every pattern
    /// detector enabled.
    fn default() -> Self {
        let five_minutes = Duration::from_secs(5 * 60);

        PrefetchConfig {
            enable_sequential: true,
            enable_colocation: true,
            enable_temporal: true,
            pattern_window: five_minutes,
            max_concurrent_prefetch: 3,
            min_confidence: 0.6,
            max_prefetch_depth: 5,
        }
    }
}
104
105/// Prefetch statistics
106#[derive(Debug, Default)]
107pub struct PrefetchStats {
108    /// Total prefetch attempts
109    pub prefetch_attempts: AtomicU64,
110    /// Successful prefetches (block was used)
111    pub prefetch_hits: AtomicU64,
112    /// Wasted prefetches (block was not used)
113    pub prefetch_misses: AtomicU64,
114    /// Bytes prefetched
115    pub bytes_prefetched: AtomicU64,
116    /// Average confidence of predictions
117    pub avg_confidence: parking_lot::Mutex<f64>,
118}
119
120impl PrefetchStats {
121    fn record_attempt(&self) {
122        self.prefetch_attempts.fetch_add(1, Ordering::Relaxed);
123    }
124
125    fn record_hit(&self, bytes: u64) {
126        self.prefetch_hits.fetch_add(1, Ordering::Relaxed);
127        self.bytes_prefetched.fetch_add(bytes, Ordering::Relaxed);
128    }
129
130    fn record_miss(&self) {
131        self.prefetch_misses.fetch_add(1, Ordering::Relaxed);
132    }
133
134    /// Get hit rate
135    pub fn hit_rate(&self) -> f64 {
136        let hits = self.prefetch_hits.load(Ordering::Relaxed) as f64;
137        let total = self.prefetch_attempts.load(Ordering::Relaxed) as f64;
138        if total > 0.0 {
139            hits / total
140        } else {
141            0.0
142        }
143    }
144}
145
146/// Predictive prefetcher
147pub struct PredictivePrefetcher<S: BlockStore> {
148    store: Arc<S>,
149    config: parking_lot::RwLock<PrefetchConfig>,
150    /// Access history for each CID
151    access_history: DashMap<Cid, VecDeque<AccessRecord>>,
152    /// Co-location patterns (CID -> related CIDs)
153    colocation_patterns: DashMap<Cid, DashMap<Cid, CoLocationPattern>>,
154    /// Last accessed CID (for detecting sequences)
155    last_accessed: parking_lot::Mutex<Option<Cid>>,
156    /// Prefetch queue
157    #[allow(dead_code)]
158    prefetch_queue: DashMap<Cid, PrefetchPrediction>,
159    /// Prefetch cache (blocks that have been prefetched)
160    prefetch_cache: DashMap<Cid, (Vec<u8>, SystemTime)>,
161    /// Statistics
162    stats: PrefetchStats,
163    /// Semaphore for concurrent prefetch control
164    prefetch_semaphore: Arc<Semaphore>,
165    /// Current prefetch depth (adaptive)
166    current_depth: AtomicUsize,
167}
168
169impl<S: BlockStore + Send + Sync + 'static> PredictivePrefetcher<S> {
170    /// Create a new predictive prefetcher
171    pub fn new(store: Arc<S>, config: PrefetchConfig) -> Self {
172        let max_concurrent = config.max_concurrent_prefetch;
173        let initial_depth = config.max_prefetch_depth;
174
175        Self {
176            store,
177            config: parking_lot::RwLock::new(config),
178            access_history: DashMap::new(),
179            colocation_patterns: DashMap::new(),
180            last_accessed: parking_lot::Mutex::new(None),
181            prefetch_queue: DashMap::new(),
182            prefetch_cache: DashMap::new(),
183            stats: PrefetchStats::default(),
184            prefetch_semaphore: Arc::new(Semaphore::new(max_concurrent)),
185            current_depth: AtomicUsize::new(initial_depth),
186        }
187    }
188
189    /// Record an access and update patterns
190    pub fn record_access(&self, cid: &Cid) {
191        let now = SystemTime::now();
192        let previous = self.last_accessed.lock().clone();
193
194        // Add to access history (drop guard before accessing prev_history to avoid deadlock)
195        {
196            let mut history = self
197                .access_history
198                .entry(*cid)
199                .or_insert_with(VecDeque::new);
200            history.push_back(AccessRecord {
201                timestamp: now,
202                previous_cid: previous,
203                next_cid: None,
204            });
205
206            // Limit history size
207            if history.len() > 100 {
208                history.pop_front();
209            }
210        } // Guard dropped here
211
212        // Update previous access with next CID (safe now since we dropped the guard above)
213        if let Some(prev_cid) = previous {
214            // Only update if prev_cid is different from cid to avoid unnecessary work
215            if prev_cid != *cid {
216                if let Some(mut prev_history) = self.access_history.get_mut(&prev_cid) {
217                    if let Some(last_record) = prev_history.back_mut() {
218                        last_record.next_cid = Some(*cid);
219                    }
220                }
221            }
222
223            // Update co-location patterns
224            if self.config.read().enable_colocation {
225                self.update_colocation_pattern(&prev_cid, cid);
226            }
227        }
228
229        // Update last accessed
230        *self.last_accessed.lock() = Some(*cid);
231
232        // Check if this was prefetched
233        if let Some(entry) = self.prefetch_cache.get(cid) {
234            let prefetch_time = entry.value().1;
235            let age = now.duration_since(prefetch_time).unwrap_or_default();
236            if age < Duration::from_secs(60) {
237                // Prefetch was used within 60 seconds - count as hit
238                self.stats.record_hit(0); // We don't have size info here
239            } else {
240                self.stats.record_miss();
241            }
242        }
243    }
244
245    /// Update co-location pattern
246    fn update_colocation_pattern(&self, cid1: &Cid, cid2: &Cid) {
247        let patterns = self
248            .colocation_patterns
249            .entry(*cid1)
250            .or_insert_with(DashMap::new);
251
252        patterns
253            .entry(*cid2)
254            .and_modify(|pattern| {
255                pattern.count += 1;
256                pattern.last_seen = SystemTime::now();
257                // Update confidence based on recency and frequency
258                let recency_factor = 0.9; // Decay factor
259                pattern.confidence = (pattern.confidence * recency_factor + 0.1).min(1.0);
260            })
261            .or_insert_with(|| CoLocationPattern {
262                count: 1,
263                last_seen: SystemTime::now(),
264                confidence: 0.5,
265            });
266    }
267
268    /// Predict next blocks to access
269    pub fn predict_next_blocks(&self, current_cid: &Cid) -> Vec<PrefetchPrediction> {
270        let config = self.config.read();
271        let mut predictions = Vec::new();
272
273        // Sequential pattern prediction
274        if config.enable_sequential {
275            if let Some(seq_predictions) = self.predict_sequential(current_cid) {
276                predictions.extend(seq_predictions);
277            }
278        }
279
280        // Co-location pattern prediction
281        if config.enable_colocation {
282            if let Some(coloc_predictions) = self.predict_colocation(current_cid) {
283                predictions.extend(coloc_predictions);
284            }
285        }
286
287        // Filter by confidence and limit depth
288        predictions.retain(|p| p.confidence >= config.min_confidence);
289        predictions.sort_by(|a, b| b.confidence.partial_cmp(&a.confidence).unwrap());
290
291        let depth = self.current_depth.load(Ordering::Relaxed);
292        predictions.truncate(depth);
293
294        predictions
295    }
296
297    /// Predict based on sequential access pattern
298    fn predict_sequential(&self, cid: &Cid) -> Option<Vec<PrefetchPrediction>> {
299        let history = self.access_history.get(cid)?;
300
301        // Check if there's a consistent "next" block
302        let next_counts: DashMap<Cid, u64> = DashMap::new();
303
304        for record in history.iter() {
305            if let Some(next_cid) = record.next_cid {
306                *next_counts.entry(next_cid).or_insert(0) += 1;
307            }
308        }
309
310        if next_counts.is_empty() {
311            return None;
312        }
313
314        // Find most common next block
315        let mut predictions = Vec::new();
316        let total_accesses = history.len() as f64;
317
318        for entry in next_counts.iter() {
319            let count = *entry.value() as f64;
320            let confidence = count / total_accesses;
321
322            if confidence >= 0.3 {
323                predictions.push(PrefetchPrediction {
324                    cid: *entry.key(),
325                    confidence,
326                    predicted_access: SystemTime::now(),
327                    pattern: AccessPattern::Sequential,
328                });
329            }
330        }
331
332        Some(predictions)
333    }
334
335    /// Predict based on co-location patterns
336    fn predict_colocation(&self, cid: &Cid) -> Option<Vec<PrefetchPrediction>> {
337        let patterns = self.colocation_patterns.get(cid)?;
338
339        let mut predictions = Vec::new();
340
341        for entry in patterns.iter() {
342            let pattern = entry.value();
343
344            // Check if pattern is recent
345            let age = SystemTime::now()
346                .duration_since(pattern.last_seen)
347                .unwrap_or_default();
348
349            if age < self.config.read().pattern_window {
350                predictions.push(PrefetchPrediction {
351                    cid: *entry.key(),
352                    confidence: pattern.confidence,
353                    predicted_access: SystemTime::now(),
354                    pattern: AccessPattern::Clustered,
355                });
356            }
357        }
358
359        Some(predictions)
360    }
361
362    /// Prefetch predicted blocks in background
363    pub async fn prefetch_background(&self, predictions: Vec<PrefetchPrediction>) {
364        for prediction in predictions {
365            let store = self.store.clone();
366            let cache = self.prefetch_cache.clone();
367            let stats = &self.stats;
368            let semaphore = self.prefetch_semaphore.clone();
369
370            stats.record_attempt();
371
372            let cid = prediction.cid;
373            trace!(
374                "Prefetching block {} (confidence: {:.2})",
375                cid,
376                prediction.confidence
377            );
378
379            // Spawn prefetch task
380            tokio::spawn(async move {
381                let _permit = semaphore.acquire().await.ok();
382
383                if let Ok(Some(block)) = store.get(&cid).await {
384                    cache.insert(cid, (block.data().to_vec(), SystemTime::now()));
385                    debug!("Prefetched block {}", cid);
386                }
387            });
388        }
389    }
390
391    /// Adapt prefetch depth based on hit rate
392    pub fn adapt_depth(&self) {
393        let hit_rate = self.stats.hit_rate();
394        let current = self.current_depth.load(Ordering::Relaxed);
395        let max_depth = self.config.read().max_prefetch_depth;
396
397        let new_depth = if hit_rate > 0.8 {
398            // High hit rate - increase depth
399            (current + 1).min(max_depth)
400        } else if hit_rate < 0.4 {
401            // Low hit rate - decrease depth
402            (current.saturating_sub(1)).max(1)
403        } else {
404            current
405        };
406
407        if new_depth != current {
408            self.current_depth.store(new_depth, Ordering::Relaxed);
409            debug!(
410                "Adapted prefetch depth: {} -> {} (hit rate: {:.2})",
411                current, new_depth, hit_rate
412            );
413        }
414    }
415
416    /// Get statistics
417    pub fn stats(&self) -> PrefetchStatsSnapshot {
418        PrefetchStatsSnapshot {
419            prefetch_attempts: self.stats.prefetch_attempts.load(Ordering::Relaxed),
420            prefetch_hits: self.stats.prefetch_hits.load(Ordering::Relaxed),
421            prefetch_misses: self.stats.prefetch_misses.load(Ordering::Relaxed),
422            bytes_prefetched: self.stats.bytes_prefetched.load(Ordering::Relaxed),
423            hit_rate: self.stats.hit_rate(),
424            current_depth: self.current_depth.load(Ordering::Relaxed),
425        }
426    }
427
428    /// Clear prefetch cache
429    pub fn clear_cache(&self) {
430        self.prefetch_cache.clear();
431    }
432
433    /// Get cache size
434    pub fn cache_size(&self) -> usize {
435        self.prefetch_cache.len()
436    }
437}
438
439/// Snapshot of prefetch statistics
440#[derive(Debug, Clone, Serialize, Deserialize)]
441pub struct PrefetchStatsSnapshot {
442    pub prefetch_attempts: u64,
443    pub prefetch_hits: u64,
444    pub prefetch_misses: u64,
445    pub bytes_prefetched: u64,
446    pub hit_rate: f64,
447    pub current_depth: usize,
448}
449
#[cfg(test)]
mod tests {
    use super::*;
    use crate::memory::MemoryBlockStore;
    use ipfrs_core::cid::CidBuilder;

    /// Helper to create a unique CID from an index
    fn test_cid(index: u64) -> Cid {
        CidBuilder::new()
            .build(&index.to_le_bytes())
            .expect("failed to create test cid")
    }

    /// A freshly built prefetcher reports empty statistics.
    #[tokio::test]
    async fn test_prefetcher_creation() {
        let store = Arc::new(MemoryBlockStore::new());
        let config = PrefetchConfig::default();
        let prefetcher = PredictivePrefetcher::new(store, config);

        let stats = prefetcher.stats();
        assert_eq!(stats.prefetch_attempts, 0);
        assert_eq!(stats.hit_rate, 0.0);
    }

    /// Two consecutive accesses create a co-location pattern from the first
    /// block to the second one.
    #[tokio::test]
    async fn test_access_recording() {
        let store = Arc::new(MemoryBlockStore::new());
        let prefetcher = PredictivePrefetcher::new(store, PrefetchConfig::default());

        let cid1 = test_cid(1);
        let cid2 = test_cid(2);

        prefetcher.record_access(&cid1);
        prefetcher.record_access(&cid2);

        // Should have recorded co-location pattern cid1 -> cid2
        let related = prefetcher
            .colocation_patterns
            .get(&cid1)
            .expect("cid1 should have co-location patterns");
        assert!(related.contains_key(&cid2));
    }

    /// A repeated cid1 -> cid2 sequence yields a sequential prediction of cid2.
    #[tokio::test]
    async fn test_sequential_prediction() {
        let store = Arc::new(MemoryBlockStore::new());
        let prefetcher = PredictivePrefetcher::new(store, PrefetchConfig::default());

        let cid1 = test_cid(1);
        let cid2 = test_cid(2);

        // Simulate sequential access pattern
        for _ in 0..5 {
            prefetcher.record_access(&cid1);
            prefetcher.record_access(&cid2);
        }

        let predictions = prefetcher.predict_next_blocks(&cid1);
        assert!(!predictions.is_empty());

        // Should predict cid2 after cid1
        assert!(predictions
            .iter()
            .any(|p| p.pattern == AccessPattern::Sequential && p.cid == cid2));

        // Predictions are ordered from most to least confident
        assert!(predictions
            .windows(2)
            .all(|pair| pair[0].confidence >= pair[1].confidence));
    }
}