use std::collections::{HashMap, VecDeque};
#[cfg(feature = "async")]
use std::collections::BTreeMap;
#[cfg(feature = "async")]
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::time::{Duration, Instant};
#[cfg(feature = "async")]
use tokio::sync::RwLock;
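/// Access-pattern classification inferred from the recent request history.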
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum AccessPattern {
SequentialForward,
SequentialBackward,
Strided { stride: i64 },
Random,
Spatial,
TemporalPeriodic { period_ms: u64 },
Burst,
#[default]
Unknown,
}
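/// Priority assigned to prefetch targets; higher values are dequeued first.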
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
pub enum PrefetchPriority {
Critical = 4,
High = 3,
#[default]
Medium = 2,
Low = 1,
Background = 0,
}
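/// A single observed access, with optional tile coordinates, payload size, and fetch latency.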
#[derive(Debug, Clone)]
pub struct AccessRecord {
pub key: String,
pub timestamp: Instant,
pub coordinates: Option<(usize, usize, usize)>,
pub size: Option<usize>,
pub latency: Option<Duration>,
pub cache_hit: bool,
}
impl AccessRecord {
#[must_use]
pub fn new(key: String) -> Self {
Self {
key,
timestamp: Instant::now(),
coordinates: None,
size: None,
latency: None,
cache_hit: false,
}
}
#[must_use]
pub fn with_coordinates(key: String, x: usize, y: usize, z: usize) -> Self {
Self {
key,
timestamp: Instant::now(),
coordinates: Some((x, y, z)),
size: None,
latency: None,
cache_hit: false,
}
}
#[must_use]
pub fn with_size(mut self, size: usize) -> Self {
self.size = Some(size);
self
}
#[must_use]
pub fn with_latency(mut self, latency: Duration) -> Self {
self.latency = Some(latency);
self
}
#[must_use]
pub fn with_cache_hit(mut self, hit: bool) -> Self {
self.cache_hit = hit;
self
}
}
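/// A key predicted to be accessed soon, with a priority, a confidence score between 0.0 and 1.0,
/// and optional coordinates and size estimate.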
#[derive(Debug, Clone)]
pub struct PrefetchTarget {
pub key: String,
pub priority: PrefetchPriority,
pub coordinates: Option<(usize, usize, usize)>,
pub confidence: f64,
pub estimated_size: Option<usize>,
}
impl PrefetchTarget {
#[must_use]
pub fn new(key: String, priority: PrefetchPriority, confidence: f64) -> Self {
Self {
key,
priority,
coordinates: None,
confidence,
estimated_size: None,
}
}
#[must_use]
pub fn with_coordinates(mut self, x: usize, y: usize, z: usize) -> Self {
self.coordinates = Some((x, y, z));
self
}
#[must_use]
pub fn with_estimated_size(mut self, size: usize) -> Self {
self.estimated_size = Some(size);
self
}
}
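/// Analyzes inter-access intervals to detect periodic access and bursts.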
#[derive(Debug)]
pub struct TemporalPatternAnalyzer {
intervals: VecDeque<u64>,
max_intervals: usize,
detected_period: Option<u64>,
burst_threshold: f64,
}
impl TemporalPatternAnalyzer {
#[must_use]
pub fn new(max_intervals: usize) -> Self {
Self {
intervals: VecDeque::with_capacity(max_intervals),
max_intervals,
detected_period: None,
            burst_threshold: 10.0,
        }
    }
pub fn record_interval(&mut self, interval_ms: u64) {
if self.intervals.len() >= self.max_intervals {
self.intervals.pop_front();
}
self.intervals.push_back(interval_ms);
self.analyze_periodicity();
}
fn analyze_periodicity(&mut self) {
if self.intervals.len() < 5 {
self.detected_period = None;
return;
}
let sum: u64 = self.intervals.iter().sum();
let mean = sum as f64 / self.intervals.len() as f64;
let variance: f64 = self
.intervals
.iter()
.map(|&x| {
let diff = x as f64 - mean;
diff * diff
})
.sum::<f64>()
/ self.intervals.len() as f64;
        let std_dev = variance.sqrt();
        let cv = std_dev / mean;
        // Periodic when intervals are stable (coefficient of variation < 0.3)
        // and long enough (mean > 100 ms) to be worth predicting.
        if cv < 0.3 && mean > 100.0 {
self.detected_period = Some(mean as u64);
} else {
self.detected_period = None;
}
}
#[must_use]
pub fn is_burst(&self) -> bool {
if self.intervals.len() < 3 {
return false;
}
let recent: Vec<_> = self.intervals.iter().rev().take(5).copied().collect();
if recent.is_empty() {
return false;
}
let avg_interval = recent.iter().sum::<u64>() as f64 / recent.len() as f64;
        // Bursty when the recent average interval implies more than
        // `burst_threshold` accesses per second.
        avg_interval < (1000.0 / self.burst_threshold)
}
#[must_use]
pub fn detected_period(&self) -> Option<u64> {
self.detected_period
}
#[must_use]
pub fn predict_next_access(&self, last_access: Instant) -> Option<Instant> {
self.detected_period
.map(|period| last_access + Duration::from_millis(period))
}
}
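/// Tracks recent (x, y, z) tile accesses to infer a movement direction and predict adjacent tiles.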
#[derive(Debug)]
pub struct SpatialLocalityAnalyzer {
coordinates: VecDeque<(usize, usize, usize)>,
max_history: usize,
direction: Option<(i64, i64)>,
zoom_direction: Option<i64>,
}
impl SpatialLocalityAnalyzer {
#[must_use]
pub fn new(max_history: usize) -> Self {
Self {
coordinates: VecDeque::with_capacity(max_history),
max_history,
direction: None,
zoom_direction: None,
}
}
pub fn record_access(&mut self, x: usize, y: usize, z: usize) {
if let Some(&(prev_x, prev_y, prev_z)) = self.coordinates.back() {
let dx = x as i64 - prev_x as i64;
let dy = y as i64 - prev_y as i64;
let dz = z as i64 - prev_z as i64;
            // Smooth the movement direction by averaging the latest step with
            // the previous estimate.
            if let Some((curr_dx, curr_dy)) = self.direction {
                self.direction = Some(((curr_dx + dx) / 2, (curr_dy + dy) / 2));
            } else {
                self.direction = Some((dx, dy));
            }
self.zoom_direction = if dz != 0 { Some(dz) } else { None };
}
if self.coordinates.len() >= self.max_history {
self.coordinates.pop_front();
}
self.coordinates.push_back((x, y, z));
}
#[must_use]
pub fn predict_adjacent(&self, count: usize) -> Vec<(usize, usize, usize)> {
let Some(&(x, y, z)) = self.coordinates.back() else {
return Vec::new();
};
let mut predictions = Vec::with_capacity(count);
if let Some((dx, dy)) = self.direction {
let norm_dx = dx.signum();
let norm_dy = dy.signum();
if norm_dx != 0 {
let new_x = if norm_dx > 0 {
x + 1
} else {
x.saturating_sub(1)
};
predictions.push((new_x, y, z));
}
if norm_dy != 0 {
let new_y = if norm_dy > 0 {
y + 1
} else {
y.saturating_sub(1)
};
predictions.push((x, new_y, z));
}
if norm_dx != 0 && norm_dy != 0 {
let new_x = if norm_dx > 0 {
x + 1
} else {
x.saturating_sub(1)
};
let new_y = if norm_dy > 0 {
y + 1
} else {
y.saturating_sub(1)
};
predictions.push((new_x, new_y, z));
}
}
        // Fallback: fill any remaining slots with the eight neighbours of the
        // current tile, skipping coordinates that are already predicted.
        let spiral_offsets: [(i64, i64); 8] = [
(1, 0),
(0, 1),
(-1, 0),
(0, -1),
(1, 1),
(-1, 1),
(-1, -1),
(1, -1),
];
for (ox, oy) in spiral_offsets {
if predictions.len() >= count {
break;
}
let new_x = (x as i64 + ox).max(0) as usize;
let new_y = (y as i64 + oy).max(0) as usize;
let coord = (new_x, new_y, z);
if !predictions.contains(&coord) {
predictions.push(coord);
}
}
if let Some(dz) = self.zoom_direction {
let new_z = (z as i64 + dz).max(0) as usize;
if predictions.len() < count {
predictions.push((x, y, new_z));
}
}
predictions.truncate(count);
predictions
}
#[must_use]
pub fn movement_direction(&self) -> Option<(i64, i64)> {
self.direction
}
}
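/// Combines key-based, temporal, and spatial signals to classify the current
/// access pattern and generate prefetch predictions.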
pub struct PatternAnalyzer {
history: VecDeque<AccessRecord>,
max_history: usize,
current_pattern: AccessPattern,
temporal: TemporalPatternAnalyzer,
spatial: SpatialLocalityAnalyzer,
key_frequency: HashMap<String, u64>,
detected_stride: Option<i64>,
}
impl PatternAnalyzer {
#[must_use]
pub fn new(max_history: usize) -> Self {
Self {
history: VecDeque::with_capacity(max_history),
max_history,
current_pattern: AccessPattern::Unknown,
temporal: TemporalPatternAnalyzer::new(50),
spatial: SpatialLocalityAnalyzer::new(20),
key_frequency: HashMap::new(),
detected_stride: None,
}
}
pub fn record_access(&mut self, record: AccessRecord) {
if let Some(last) = self.history.back() {
let interval = record.timestamp.duration_since(last.timestamp);
self.temporal.record_interval(interval.as_millis() as u64);
}
if let Some((x, y, z)) = record.coordinates {
self.spatial.record_access(x, y, z);
}
*self.key_frequency.entry(record.key.clone()).or_insert(0) += 1;
if self.history.len() >= self.max_history {
self.history.pop_front();
}
self.history.push_back(record);
self.analyze_patterns();
}
fn analyze_patterns(&mut self) {
if self.history.len() < 3 {
self.current_pattern = AccessPattern::Unknown;
return;
}
if self.is_spatial() {
self.current_pattern = AccessPattern::Spatial;
return;
}
if let Some(stride) = self.detect_stride() {
self.detected_stride = Some(stride);
self.current_pattern = AccessPattern::Strided { stride };
return;
}
if self.is_sequential_forward() {
self.current_pattern = AccessPattern::SequentialForward;
return;
}
if self.is_sequential_backward() {
self.current_pattern = AccessPattern::SequentialBackward;
return;
}
if let Some(period) = self.temporal.detected_period() {
self.current_pattern = AccessPattern::TemporalPeriodic { period_ms: period };
return;
}
if self.temporal.is_burst() {
self.current_pattern = AccessPattern::Burst;
return;
}
self.current_pattern = AccessPattern::Random;
}
fn is_sequential_forward(&self) -> bool {
let recent: Vec<_> = self.history.iter().rev().take(5).collect();
if recent.len() < 3 {
return false;
}
let mut sequential_count = 0;
for window in recent.windows(2) {
if let (Some(&a), Some(&b)) = (window.first(), window.get(1)) {
if let (Some(na), Some(nb)) = (
extract_trailing_number(&a.key),
extract_trailing_number(&b.key),
) {
if na == nb + 1 {
sequential_count += 1;
}
}
}
}
sequential_count >= (recent.len() - 1) / 2
}
fn is_sequential_backward(&self) -> bool {
let recent: Vec<_> = self.history.iter().rev().take(5).collect();
if recent.len() < 3 {
return false;
}
let mut backward_count = 0;
for window in recent.windows(2) {
if let (Some(&a), Some(&b)) = (window.first(), window.get(1)) {
if let (Some(na), Some(nb)) = (
extract_trailing_number(&a.key),
extract_trailing_number(&b.key),
) {
if nb == na + 1 {
backward_count += 1;
}
}
}
}
backward_count >= (recent.len() - 1) / 2
}
fn detect_stride(&self) -> Option<i64> {
let recent: Vec<_> = self.history.iter().rev().take(6).collect();
if recent.len() < 4 {
return None;
}
let numbers: Vec<i64> = recent
.iter()
.filter_map(|r| extract_trailing_number(&r.key))
.collect();
if numbers.len() < 4 {
return None;
}
        // `numbers` is newest-first, so each diff is (newer - older): the
        // stride in access order.
        let diffs: Vec<i64> = numbers.windows(2).map(|w| w[0] - w[1]).collect();
if diffs.is_empty() {
return None;
}
let first_diff = diffs[0];
if first_diff.abs() <= 1 {
return None;
}
let consistent = diffs.iter().all(|&d| d == first_diff);
if consistent { Some(first_diff) } else { None }
}
fn is_spatial(&self) -> bool {
let coords_count = self
.history
.iter()
.rev()
.take(5)
.filter(|r| r.coordinates.is_some())
.count();
coords_count >= 3
}
#[must_use]
pub fn current_pattern(&self) -> AccessPattern {
self.current_pattern
}
#[must_use]
pub fn predict_next(&self, count: usize) -> Vec<PrefetchTarget> {
match self.current_pattern {
AccessPattern::SequentialForward => self.predict_sequential_forward(count),
AccessPattern::SequentialBackward => self.predict_sequential_backward(count),
AccessPattern::Strided { stride } => self.predict_strided(count, stride),
AccessPattern::Spatial => self.predict_spatial(count),
AccessPattern::Burst | AccessPattern::Random => self.predict_hot_spots(count),
AccessPattern::TemporalPeriodic { .. } => self.predict_recent(count),
AccessPattern::Unknown => Vec::new(),
}
}
fn predict_sequential_forward(&self, count: usize) -> Vec<PrefetchTarget> {
let Some(last) = self.history.back() else {
return Vec::new();
};
let mut predictions = Vec::new();
for i in 1..=count {
if let Some(next_key) = increment_key(&last.key, i as i64) {
let priority = if i == 1 {
PrefetchPriority::Critical
} else if i <= 3 {
PrefetchPriority::High
} else {
PrefetchPriority::Medium
};
let confidence = 1.0 - (i as f64 * 0.1).min(0.5);
predictions.push(PrefetchTarget::new(next_key, priority, confidence));
}
}
predictions
}
fn predict_sequential_backward(&self, count: usize) -> Vec<PrefetchTarget> {
let Some(last) = self.history.back() else {
return Vec::new();
};
let mut predictions = Vec::new();
for i in 1..=count {
if let Some(next_key) = increment_key(&last.key, -(i as i64)) {
let priority = if i == 1 {
PrefetchPriority::Critical
} else if i <= 3 {
PrefetchPriority::High
} else {
PrefetchPriority::Medium
};
let confidence = 1.0 - (i as f64 * 0.1).min(0.5);
predictions.push(PrefetchTarget::new(next_key, priority, confidence));
}
}
predictions
}
fn predict_strided(&self, count: usize, stride: i64) -> Vec<PrefetchTarget> {
let Some(last) = self.history.back() else {
return Vec::new();
};
let mut predictions = Vec::new();
for i in 1..=count {
if let Some(next_key) = increment_key(&last.key, stride * i as i64) {
let priority = if i == 1 {
PrefetchPriority::High
} else {
PrefetchPriority::Medium
};
let confidence = 0.8 - (i as f64 * 0.1).min(0.4);
predictions.push(PrefetchTarget::new(next_key, priority, confidence));
}
}
predictions
}
fn predict_spatial(&self, count: usize) -> Vec<PrefetchTarget> {
let coords = self.spatial.predict_adjacent(count);
coords
.into_iter()
.enumerate()
.map(|(i, (x, y, z))| {
let key = format!("tile_{x}_{y}_{z}");
let priority = if i == 0 {
PrefetchPriority::High
} else if i < 4 {
PrefetchPriority::Medium
} else {
PrefetchPriority::Low
};
let confidence = 0.9 - (i as f64 * 0.1).min(0.5);
PrefetchTarget::new(key, priority, confidence).with_coordinates(x, y, z)
})
.collect()
}
fn predict_hot_spots(&self, count: usize) -> Vec<PrefetchTarget> {
let mut freq_vec: Vec<_> = self.key_frequency.iter().collect();
freq_vec.sort_by(|a, b| b.1.cmp(a.1));
freq_vec
.into_iter()
.take(count)
.map(|(key, freq)| {
let priority = if *freq > 10 {
PrefetchPriority::High
} else if *freq > 5 {
PrefetchPriority::Medium
} else {
PrefetchPriority::Low
};
let confidence = (*freq as f64 / 20.0).min(0.8);
PrefetchTarget::new(key.clone(), priority, confidence)
})
.collect()
}
fn predict_recent(&self, count: usize) -> Vec<PrefetchTarget> {
self.history
.iter()
.rev()
.take(count)
.enumerate()
.map(|(i, record)| {
let priority = if i == 0 {
PrefetchPriority::Medium
} else {
PrefetchPriority::Low
};
PrefetchTarget::new(record.key.clone(), priority, 0.5)
})
.collect()
}
}
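/// Lock-free counters tracking prefetch effectiveness and a smoothed latency estimate.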
#[derive(Debug, Default)]
pub struct BufferStats {
pub total_prefetches: AtomicU64,
pub successful_prefetches: AtomicU64,
pub wasted_prefetches: AtomicU64,
pub avg_latency_us: AtomicU64,
pub utilization_pct: AtomicU64,
}
impl BufferStats {
pub fn record_prefetch(&self, used: bool, latency_us: u64) {
self.total_prefetches.fetch_add(1, Ordering::Relaxed);
if used {
self.successful_prefetches.fetch_add(1, Ordering::Relaxed);
} else {
self.wasted_prefetches.fetch_add(1, Ordering::Relaxed);
}
        // Exponential moving average of latency; starts at zero, so early
        // samples are damped.
        let current = self.avg_latency_us.load(Ordering::Relaxed);
        let new_avg = (current * 9 + latency_us) / 10;
        self.avg_latency_us.store(new_avg, Ordering::Relaxed);
}
#[must_use]
pub fn hit_rate(&self) -> f64 {
let total = self.total_prefetches.load(Ordering::Relaxed);
if total == 0 {
return 0.0;
}
let successful = self.successful_prefetches.load(Ordering::Relaxed);
successful as f64 / total as f64
}
}
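/// Periodically resizes the prefetch buffer between `min_size` and `max_size`
/// based on the observed prefetch hit rate.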
#[cfg(feature = "async")]
pub struct AdaptiveBufferSizer {
min_size: usize,
max_size: usize,
current_size: AtomicUsize,
stats: Arc<BufferStats>,
target_hit_rate: f64,
last_adjustment: RwLock<Instant>,
adjustment_interval: Duration,
}
#[cfg(feature = "async")]
impl AdaptiveBufferSizer {
#[must_use]
pub fn new(min_size: usize, max_size: usize) -> Self {
Self {
min_size,
max_size,
current_size: AtomicUsize::new(min_size),
stats: Arc::new(BufferStats::default()),
target_hit_rate: 0.7,
last_adjustment: RwLock::new(Instant::now()),
adjustment_interval: Duration::from_secs(10),
}
}
#[must_use]
pub fn current_size(&self) -> usize {
self.current_size.load(Ordering::Relaxed)
}
#[must_use]
pub fn stats(&self) -> Arc<BufferStats> {
Arc::clone(&self.stats)
}
pub async fn maybe_adjust(&self) {
let last = *self.last_adjustment.read().await;
if last.elapsed() < self.adjustment_interval {
return;
}
let hit_rate = self.stats.hit_rate();
let current = self.current_size.load(Ordering::Relaxed);
let new_size = if hit_rate < self.target_hit_rate - 0.1 {
(current as f64 * 1.5) as usize
} else if hit_rate > self.target_hit_rate + 0.1 {
(current as f64 * 0.9) as usize
} else {
current
};
let clamped = new_size.clamp(self.min_size, self.max_size);
self.current_size.store(clamped, Ordering::Relaxed);
*self.last_adjustment.write().await = Instant::now();
tracing::debug!(
"Adjusted prefetch buffer: {} -> {} bytes (hit_rate: {:.2})",
current,
clamped,
hit_rate
);
}
}
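/// Tracks prefetch memory usage and refuses new prefetches under memory pressure.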
pub struct MemoryAwarePrefetcher {
max_memory: usize,
current_usage: AtomicUsize,
pressure_threshold: f64,
}
impl MemoryAwarePrefetcher {
#[must_use]
pub fn new(max_memory: usize) -> Self {
Self {
max_memory,
current_usage: AtomicUsize::new(0),
pressure_threshold: 0.8,
}
}
#[must_use]
pub fn can_prefetch(&self, size: usize) -> bool {
let current = self.current_usage.load(Ordering::Relaxed);
let usage_ratio = current as f64 / self.max_memory as f64;
if usage_ratio > self.pressure_threshold {
return false;
}
current + size <= self.max_memory
}
pub fn allocate(&self, size: usize) -> bool {
let current = self.current_usage.load(Ordering::Relaxed);
if current + size > self.max_memory {
return false;
}
self.current_usage.fetch_add(size, Ordering::Relaxed);
true
}
pub fn release(&self, size: usize) {
self.current_usage.fetch_sub(size, Ordering::Relaxed);
}
#[must_use]
pub fn current_usage(&self) -> usize {
self.current_usage.load(Ordering::Relaxed)
}
#[must_use]
pub fn pressure(&self) -> f64 {
let current = self.current_usage.load(Ordering::Relaxed);
current as f64 / self.max_memory as f64
}
}
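/// Limits prefetch traffic to a per-window bandwidth budget and estimates recent throughput.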
#[cfg(feature = "async")]
pub struct BandwidthAwarePrefetcher {
max_bandwidth: usize,
bandwidth_used: AtomicUsize,
window_start: RwLock<Instant>,
window_duration: Duration,
samples: RwLock<VecDeque<(Instant, usize)>>,
}
#[cfg(feature = "async")]
impl BandwidthAwarePrefetcher {
#[must_use]
pub fn new(max_bandwidth: usize) -> Self {
Self {
max_bandwidth,
bandwidth_used: AtomicUsize::new(0),
window_start: RwLock::new(Instant::now()),
window_duration: Duration::from_secs(1),
samples: RwLock::new(VecDeque::with_capacity(100)),
}
}
pub async fn can_prefetch(&self, size: usize) -> bool {
self.maybe_reset_window().await;
let used = self.bandwidth_used.load(Ordering::Relaxed);
used + size <= self.max_bandwidth
}
pub async fn record_usage(&self, size: usize) {
self.maybe_reset_window().await;
self.bandwidth_used.fetch_add(size, Ordering::Relaxed);
let mut samples = self.samples.write().await;
samples.push_back((Instant::now(), size));
if samples.len() > 100 {
samples.pop_front();
}
}
async fn maybe_reset_window(&self) {
let start = *self.window_start.read().await;
if start.elapsed() >= self.window_duration {
self.bandwidth_used.store(0, Ordering::Relaxed);
*self.window_start.write().await = Instant::now();
}
}
pub async fn estimated_bandwidth(&self) -> usize {
let samples = self.samples.read().await;
if samples.len() < 2 {
return 0;
}
let first = samples.front();
let last = samples.back();
if let (Some((first_time, _)), Some((last_time, _))) = (first, last) {
let duration = last_time.duration_since(*first_time);
if duration.as_secs_f64() > 0.0 {
let total_bytes: usize = samples.iter().map(|(_, s)| s).sum();
return (total_bytes as f64 / duration.as_secs_f64()) as usize;
}
}
0
}
#[must_use]
pub fn remaining_bandwidth(&self) -> usize {
let used = self.bandwidth_used.load(Ordering::Relaxed);
self.max_bandwidth.saturating_sub(used)
}
}
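/// Configuration for the prefetch subsystem, with builder-style setters.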
#[derive(Debug, Clone)]
pub struct PrefetchConfig {
pub enabled: bool,
pub prefetch_count: usize,
pub max_concurrent: usize,
pub bandwidth_limit: Option<usize>,
pub memory_limit: usize,
pub pattern_prediction: bool,
pub adaptive_sizing: bool,
pub min_confidence: f64,
}
impl Default for PrefetchConfig {
fn default() -> Self {
Self {
enabled: true,
prefetch_count: 5,
max_concurrent: 4,
bandwidth_limit: None,
            memory_limit: 64 * 1024 * 1024,
            pattern_prediction: true,
adaptive_sizing: true,
min_confidence: 0.3,
}
}
}
impl PrefetchConfig {
#[must_use]
pub fn new() -> Self {
Self::default()
}
#[must_use]
pub fn with_enabled(mut self, enabled: bool) -> Self {
self.enabled = enabled;
self
}
#[must_use]
pub fn with_prefetch_count(mut self, count: usize) -> Self {
self.prefetch_count = count;
self
}
#[must_use]
pub fn with_max_concurrent(mut self, max: usize) -> Self {
self.max_concurrent = max;
self
}
#[must_use]
pub fn with_bandwidth_limit(mut self, limit: usize) -> Self {
self.bandwidth_limit = Some(limit);
self
}
#[must_use]
pub fn with_memory_limit(mut self, limit: usize) -> Self {
self.memory_limit = limit;
self
}
#[must_use]
pub fn with_adaptive_sizing(mut self, enabled: bool) -> Self {
self.adaptive_sizing = enabled;
self
}
}
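/// Priority queue of prefetch targets with key-based deduplication.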
#[cfg(feature = "async")]
pub struct PrefetchQueue {
queues: RwLock<BTreeMap<PrefetchPriority, VecDeque<PrefetchTarget>>>,
queued_keys: RwLock<std::collections::HashSet<String>>,
}
#[cfg(feature = "async")]
impl PrefetchQueue {
#[must_use]
pub fn new() -> Self {
Self {
queues: RwLock::new(BTreeMap::new()),
queued_keys: RwLock::new(std::collections::HashSet::new()),
}
}
pub async fn enqueue(&self, target: PrefetchTarget) {
let mut keys = self.queued_keys.write().await;
if keys.contains(&target.key) {
            return;
        }
keys.insert(target.key.clone());
let mut queues = self.queues.write().await;
queues
.entry(target.priority)
.or_insert_with(VecDeque::new)
.push_back(target);
}
    pub async fn dequeue(&self) -> Option<PrefetchTarget> {
        // Take the locks in the same order as `enqueue` (keys, then queues) to
        // avoid a lock-order inversion when enqueue and dequeue race.
        let mut keys = self.queued_keys.write().await;
        let mut queues = self.queues.write().await;
        // Highest priority first: iterate the BTreeMap in reverse key order.
        for (_, queue) in queues.iter_mut().rev() {
            if let Some(target) = queue.pop_front() {
                keys.remove(&target.key);
                return Some(target);
            }
        }
        None
    }
pub async fn len(&self) -> usize {
self.queued_keys.read().await.len()
}
pub async fn is_empty(&self) -> bool {
self.queued_keys.read().await.is_empty()
}
}
#[cfg(feature = "async")]
impl Default for PrefetchQueue {
fn default() -> Self {
Self::new()
}
}
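/// Coordinates pattern analysis, the prefetch queue, and memory/bandwidth budgets.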
#[cfg(feature = "async")]
pub struct PrefetchManager {
config: PrefetchConfig,
analyzer: Arc<RwLock<PatternAnalyzer>>,
queue: Arc<PrefetchQueue>,
memory: Arc<MemoryAwarePrefetcher>,
bandwidth: Arc<BandwidthAwarePrefetcher>,
buffer_sizer: Option<Arc<AdaptiveBufferSizer>>,
}
#[cfg(feature = "async")]
impl PrefetchManager {
#[must_use]
pub fn new(config: PrefetchConfig) -> Self {
let memory = Arc::new(MemoryAwarePrefetcher::new(config.memory_limit));
let bandwidth = Arc::new(BandwidthAwarePrefetcher::new(
config.bandwidth_limit.unwrap_or(usize::MAX),
));
let buffer_sizer = if config.adaptive_sizing {
Some(Arc::new(AdaptiveBufferSizer::new(
config.memory_limit / 4,
config.memory_limit,
)))
} else {
None
};
Self {
config,
analyzer: Arc::new(RwLock::new(PatternAnalyzer::new(100))),
queue: Arc::new(PrefetchQueue::new()),
memory,
bandwidth,
buffer_sizer,
}
}
pub async fn record_access(&self, record: AccessRecord) -> Vec<PrefetchTarget> {
if !self.config.enabled {
return Vec::new();
}
let mut analyzer = self.analyzer.write().await;
analyzer.record_access(record);
if self.config.pattern_prediction {
let mut predictions = analyzer.predict_next(self.config.prefetch_count);
predictions.retain(|t| t.confidence >= self.config.min_confidence);
for target in &predictions {
self.queue.enqueue(target.clone()).await;
}
predictions
} else {
Vec::new()
}
}
pub async fn can_prefetch(&self, size: usize) -> bool {
if !self.memory.can_prefetch(size) {
return false;
}
if !self.bandwidth.can_prefetch(size).await {
return false;
}
true
}
pub async fn record_prefetch(&self, size: usize, used: bool, latency: Duration) {
self.bandwidth.record_usage(size).await;
if let Some(ref sizer) = self.buffer_sizer {
sizer
.stats()
.record_prefetch(used, latency.as_micros() as u64);
sizer.maybe_adjust().await;
}
}
pub fn allocate_memory(&self, size: usize) -> bool {
self.memory.allocate(size)
}
pub fn release_memory(&self, size: usize) {
self.memory.release(size);
}
pub async fn next_target(&self) -> Option<PrefetchTarget> {
self.queue.dequeue().await
}
pub async fn current_pattern(&self) -> AccessPattern {
self.analyzer.read().await.current_pattern()
}
pub async fn queue_len(&self) -> usize {
self.queue.len().await
}
#[must_use]
pub fn memory_pressure(&self) -> f64 {
self.memory.pressure()
}
#[must_use]
pub fn remaining_bandwidth(&self) -> usize {
self.bandwidth.remaining_bandwidth()
}
#[must_use]
pub fn hit_rate(&self) -> f64 {
self.buffer_sizer
.as_ref()
.map_or(0.0, |s| s.stats().hit_rate())
}
}
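/// Extracts the last numeric component of a key (e.g. `"tile_10_20_3"` yields `3`).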
fn extract_trailing_number(key: &str) -> Option<i64> {
let parts: Vec<&str> = key
.split(|c: char| !c.is_ascii_digit() && c != '-')
.collect();
parts
.iter()
.rev()
.find(|s| !s.is_empty())
.and_then(|s| s.parse().ok())
}
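/// Returns a copy of `key` with its last underscore-separated numeric component
/// shifted by `offset`, or `None` if there is no numeric component or the
/// result would be negative.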
fn increment_key(key: &str, offset: i64) -> Option<String> {
let parts: Vec<&str> = key.split('_').collect();
for i in (0..parts.len()).rev() {
if let Ok(num) = parts[i].parse::<i64>() {
let new_num = num + offset;
if new_num < 0 {
return None;
}
let mut new_parts: Vec<String> = parts.iter().map(|s| (*s).to_string()).collect();
new_parts[i] = new_num.to_string();
return Some(new_parts.join("_"));
}
}
None
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_extract_trailing_number() {
assert_eq!(extract_trailing_number("file_5"), Some(5));
assert_eq!(extract_trailing_number("tile_10_20_3"), Some(3));
assert_eq!(extract_trailing_number("data"), None);
}
#[test]
fn test_increment_key() {
assert_eq!(increment_key("file_5", 1), Some("file_6".to_string()));
assert_eq!(increment_key("file_5", -1), Some("file_4".to_string()));
assert_eq!(
increment_key("tile_10_20", 1),
Some("tile_10_21".to_string())
);
assert_eq!(increment_key("file_0", -1), None);
}
#[test]
fn test_temporal_pattern_analyzer() {
let mut analyzer = TemporalPatternAnalyzer::new(20);
for _ in 0..10 {
            analyzer.record_interval(1000);
        }
assert!(analyzer.detected_period().is_some());
let period = analyzer.detected_period();
assert!(period.is_some_and(|p| (p as i64 - 1000).abs() < 100));
}
#[test]
fn test_temporal_burst_detection() {
let mut analyzer = TemporalPatternAnalyzer::new(20);
for _ in 0..10 {
            analyzer.record_interval(50);
        }
assert!(analyzer.is_burst());
}
#[test]
fn test_spatial_locality_analyzer() {
let mut analyzer = SpatialLocalityAnalyzer::new(10);
analyzer.record_access(0, 0, 0);
analyzer.record_access(1, 0, 0);
analyzer.record_access(2, 0, 0);
let predictions = analyzer.predict_adjacent(4);
assert!(!predictions.is_empty());
assert!(predictions.iter().any(|&(x, _, _)| x == 3));
}
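    #[test]
    fn test_spatial_locality_backward_direction() {
        // Companion sketch to the forward case above: leftward movement should
        // yield a negative x direction and a prediction one tile further left.
        let mut analyzer = SpatialLocalityAnalyzer::new(10);
        analyzer.record_access(5, 5, 0);
        analyzer.record_access(4, 5, 0);
        analyzer.record_access(3, 5, 0);
        assert_eq!(analyzer.movement_direction(), Some((-1, 0)));
        let predictions = analyzer.predict_adjacent(4);
        assert_eq!(predictions.first(), Some(&(2, 5, 0)));
    }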
#[test]
fn test_pattern_analyzer_sequential() {
let mut analyzer = PatternAnalyzer::new(10);
analyzer.record_access(AccessRecord::new("file_0".to_string()));
analyzer.record_access(AccessRecord::new("file_1".to_string()));
analyzer.record_access(AccessRecord::new("file_2".to_string()));
analyzer.record_access(AccessRecord::new("file_3".to_string()));
assert!(matches!(
analyzer.current_pattern(),
AccessPattern::SequentialForward | AccessPattern::SequentialBackward
));
let predictions = analyzer.predict_next(3);
assert!(!predictions.is_empty());
}
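    // Additional coverage sketches for the strided and backward-sequential
    // branches of `analyze_patterns`, using the same synthetic key scheme as
    // the sequential test above.
    #[test]
    fn test_pattern_analyzer_strided() {
        let mut analyzer = PatternAnalyzer::new(10);
        // Keys advance by a constant step of 3, which `detect_stride` should
        // pick up once at least four numeric keys are in the history.
        for i in (0..15).step_by(3) {
            analyzer.record_access(AccessRecord::new(format!("file_{i}")));
        }
        assert_eq!(
            analyzer.current_pattern(),
            AccessPattern::Strided { stride: 3 }
        );
        let predictions = analyzer.predict_next(2);
        assert_eq!(
            predictions.first().map(|t| t.key.clone()),
            Some("file_15".to_string())
        );
    }
    #[test]
    fn test_pattern_analyzer_sequential_backward() {
        let mut analyzer = PatternAnalyzer::new(10);
        // Keys count down by one, so the backward-sequential branch should win.
        for i in (2..=5).rev() {
            analyzer.record_access(AccessRecord::new(format!("file_{i}")));
        }
        assert_eq!(analyzer.current_pattern(), AccessPattern::SequentialBackward);
        let predictions = analyzer.predict_next(3);
        assert_eq!(
            predictions.first().map(|t| t.key.clone()),
            Some("file_1".to_string())
        );
    }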
#[test]
fn test_pattern_analyzer_spatial() {
let mut analyzer = PatternAnalyzer::new(10);
analyzer.record_access(AccessRecord::with_coordinates(
"tile_0_0_0".to_string(),
0,
0,
0,
));
analyzer.record_access(AccessRecord::with_coordinates(
"tile_1_0_0".to_string(),
1,
0,
0,
));
analyzer.record_access(AccessRecord::with_coordinates(
"tile_2_0_0".to_string(),
2,
0,
0,
));
analyzer.record_access(AccessRecord::with_coordinates(
"tile_3_0_0".to_string(),
3,
0,
0,
));
assert_eq!(analyzer.current_pattern(), AccessPattern::Spatial);
let predictions = analyzer.predict_next(4);
assert!(!predictions.is_empty());
}
#[test]
fn test_prefetch_target_priority_ordering() {
let targets = [
PrefetchTarget::new("low".to_string(), PrefetchPriority::Low, 0.5),
PrefetchTarget::new("critical".to_string(), PrefetchPriority::Critical, 0.9),
PrefetchTarget::new("medium".to_string(), PrefetchPriority::Medium, 0.7),
];
let mut sorted: Vec<_> = targets.iter().map(|t| t.priority).collect();
sorted.sort();
assert_eq!(sorted[0], PrefetchPriority::Low);
assert_eq!(sorted[1], PrefetchPriority::Medium);
assert_eq!(sorted[2], PrefetchPriority::Critical);
}
#[test]
fn test_memory_aware_prefetcher() {
let prefetcher = MemoryAwarePrefetcher::new(1000);
assert!(prefetcher.can_prefetch(500));
assert!(prefetcher.allocate(500));
assert_eq!(prefetcher.current_usage(), 500);
assert!(prefetcher.can_prefetch(400));
assert!(!prefetcher.can_prefetch(600));
prefetcher.release(200);
assert_eq!(prefetcher.current_usage(), 300);
}
#[test]
fn test_buffer_stats() {
let stats = BufferStats::default();
stats.record_prefetch(true, 100);
stats.record_prefetch(true, 200);
stats.record_prefetch(false, 150);
assert_eq!(stats.total_prefetches.load(Ordering::Relaxed), 3);
assert_eq!(stats.successful_prefetches.load(Ordering::Relaxed), 2);
assert_eq!(stats.wasted_prefetches.load(Ordering::Relaxed), 1);
let hit_rate = stats.hit_rate();
assert!((hit_rate - 0.666).abs() < 0.01);
}
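    #[cfg(feature = "async")]
    #[tokio::test]
    async fn test_adaptive_buffer_sizer() {
        // Sketch: the sizer starts at `min_size`, and because the 10-second
        // adjustment interval has not elapsed yet, `maybe_adjust` is a no-op here.
        let sizer = AdaptiveBufferSizer::new(1_000, 8_000);
        assert_eq!(sizer.current_size(), 1_000);
        let stats = sizer.stats();
        stats.record_prefetch(true, 100);
        stats.record_prefetch(false, 100);
        sizer.maybe_adjust().await;
        assert_eq!(sizer.current_size(), 1_000);
        assert!((stats.hit_rate() - 0.5).abs() < f64::EPSILON);
    }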
#[test]
fn test_prefetch_config_builder() {
let config = PrefetchConfig::new()
.with_enabled(true)
.with_prefetch_count(10)
.with_max_concurrent(8)
.with_bandwidth_limit(1_000_000)
.with_memory_limit(128 * 1024 * 1024)
.with_adaptive_sizing(true);
assert!(config.enabled);
assert_eq!(config.prefetch_count, 10);
assert_eq!(config.max_concurrent, 8);
assert_eq!(config.bandwidth_limit, Some(1_000_000));
assert_eq!(config.memory_limit, 128 * 1024 * 1024);
assert!(config.adaptive_sizing);
}
#[cfg(feature = "async")]
#[tokio::test]
async fn test_prefetch_queue() {
let queue = PrefetchQueue::new();
queue
.enqueue(PrefetchTarget::new(
"low".to_string(),
PrefetchPriority::Low,
0.5,
))
.await;
queue
.enqueue(PrefetchTarget::new(
"high".to_string(),
PrefetchPriority::High,
0.9,
))
.await;
queue
.enqueue(PrefetchTarget::new(
"medium".to_string(),
PrefetchPriority::Medium,
0.7,
))
.await;
assert_eq!(queue.len().await, 3);
let first = queue.dequeue().await;
assert!(first.is_some());
assert_eq!(first.map(|t| t.key), Some("high".to_string()));
let second = queue.dequeue().await;
assert!(second.is_some());
assert_eq!(second.map(|t| t.key), Some("medium".to_string()));
}
#[cfg(feature = "async")]
#[tokio::test]
async fn test_prefetch_queue_deduplication() {
let queue = PrefetchQueue::new();
queue
.enqueue(PrefetchTarget::new(
"key1".to_string(),
PrefetchPriority::High,
0.9,
))
.await;
queue
.enqueue(PrefetchTarget::new(
"key1".to_string(),
PrefetchPriority::Critical,
0.95,
))
.await;
assert_eq!(queue.len().await, 1);
}
#[cfg(feature = "async")]
#[tokio::test]
async fn test_prefetch_manager() {
let config = PrefetchConfig::new()
.with_prefetch_count(3)
.with_memory_limit(1024 * 1024);
let manager = PrefetchManager::new(config);
manager
.record_access(AccessRecord::new("file_0".to_string()))
.await;
manager
.record_access(AccessRecord::new("file_1".to_string()))
.await;
manager
.record_access(AccessRecord::new("file_2".to_string()))
.await;
let predictions = manager
.record_access(AccessRecord::new("file_3".to_string()))
.await;
assert!(!predictions.is_empty());
}
#[cfg(feature = "async")]
#[tokio::test]
async fn test_prefetch_manager_memory_pressure() {
let config = PrefetchConfig::new().with_memory_limit(1000);
let manager = PrefetchManager::new(config);
assert!(manager.allocate_memory(800));
assert!(manager.memory_pressure() > 0.7);
assert!(!manager.can_prefetch(300).await);
manager.release_memory(500);
assert!(manager.can_prefetch(300).await);
}
#[cfg(feature = "async")]
#[tokio::test]
async fn test_bandwidth_aware_prefetcher() {
let prefetcher = BandwidthAwarePrefetcher::new(1000);
assert!(prefetcher.can_prefetch(500).await);
prefetcher.record_usage(500).await;
assert!(prefetcher.can_prefetch(400).await);
prefetcher.record_usage(400).await;
assert!(!prefetcher.can_prefetch(200).await);
assert!(prefetcher.remaining_bandwidth() < 200);
}
}