use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use super::swizzled_ptr::{DiskLocation, SwizzledPtr};
/// Atomic counters describing prefetch activity.
///
/// Every field is an [`AtomicU64`], so the counters can be updated through a
/// shared reference. All counters start at zero (via `Default`).
#[derive(Debug, Default)]
pub struct PrefetchStats {
/// Number of prefetch requests recorded via `record_request`.
pub requests: AtomicU64,
/// Number of requests recorded as cache hits via `record_cache_hit`
/// (the prefetcher does this when the pointer is already swizzled).
pub cache_hits: AtomicU64,
/// Number of requests recorded via `record_io` (the prefetcher does this
/// when a request is accepted into its queue).
pub io_issued: AtomicU64,
/// Number of requests recorded via `record_dropped` (the prefetcher does
/// this when its queue is already at capacity).
pub dropped: AtomicU64,
}
impl PrefetchStats {
pub fn new() -> Self {
Self::default()
}
pub fn record_request(&self) {
self.requests.fetch_add(1, Ordering::Relaxed);
}
pub fn record_cache_hit(&self) {
self.cache_hits.fetch_add(1, Ordering::Relaxed);
}
pub fn record_io(&self) {
self.io_issued.fetch_add(1, Ordering::Relaxed);
}
pub fn record_dropped(&self) {
self.dropped.fetch_add(1, Ordering::Relaxed);
}
pub fn snapshot(&self) -> PrefetchStatsSnapshot {
PrefetchStatsSnapshot {
requests: self.requests.load(Ordering::Relaxed),
cache_hits: self.cache_hits.load(Ordering::Relaxed),
io_issued: self.io_issued.load(Ordering::Relaxed),
dropped: self.dropped.load(Ordering::Relaxed),
}
}
}
/// Plain-integer copy of the [`PrefetchStats`] counters, as produced by
/// `PrefetchStats::snapshot`.
#[derive(Debug, Clone, Copy)]
pub struct PrefetchStatsSnapshot {
/// Value of the `requests` counter when the snapshot was taken.
pub requests: u64,
/// Value of the `cache_hits` counter when the snapshot was taken.
pub cache_hits: u64,
/// Value of the `io_issued` counter when the snapshot was taken.
pub io_issued: u64,
/// Value of the `dropped` counter when the snapshot was taken.
pub dropped: u64,
}
impl PrefetchStatsSnapshot {
    /// Fraction of requests that were cache hits, or `0.0` when no requests
    /// have been recorded.
    pub fn hit_rate(&self) -> f64 {
        self.share_of_requests(self.cache_hits)
    }

    /// Fraction of requests that were dropped, or `0.0` when no requests have
    /// been recorded.
    pub fn drop_rate(&self) -> f64 {
        self.share_of_requests(self.dropped)
    }

    /// Divides `count` by the request total, treating an empty total as a
    /// rate of zero so callers never see a division by zero.
    fn share_of_requests(&self, count: u64) -> f64 {
        match self.requests {
            0 => 0.0,
            total => count as f64 / total as f64,
        }
    }
}
/// Policy that decides which prefetch requests a `Prefetcher` accepts.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PrefetchStrategy {
/// No child-count or depth limit is applied. This is the `Default` variant.
Immediate,
/// `prefetch_children` and `prefetch_children_bounded` only consider the
/// first N children of the slice they are given.
FirstN(usize),
/// `prefetch_with_depth` and `prefetch_children_bounded` skip requests whose
/// depth is greater than this value.
DepthLimited(usize),
/// `prefetch`, `prefetch_children` and `prefetch_children_bounded` return
/// without queuing anything; `set_strategy` with this variant also clears
/// the prefetcher's enabled flag.
Disabled,
}
impl Default for PrefetchStrategy {
fn default() -> Self {
PrefetchStrategy::Immediate
}
}
/// One queued prefetch: where the data lives plus scheduling metadata.
#[derive(Debug, Clone)]
pub struct PrefetchRequest {
/// On-disk location taken from the pointer's `disk_location()`.
pub location: DiskLocation,
/// Priority value set by the producer: `Prefetcher::prefetch` uses 0 and
/// `Prefetcher::prefetch_with_depth` derives it from the depth.
/// NOTE(review): whether a lower or higher number is served first is decided
/// by the consumer, which is not visible in this file.
pub priority: u8,
/// Depth passed to `prefetch_with_depth` (0 for plain `prefetch`).
pub depth: u16,
}
/// Bounded queue of pending prefetch requests together with the policy and
/// counters that govern it.
pub struct Prefetcher {
/// Pending requests; new ones are pushed at the back and `drain_requests`
/// removes them front to back.
queue: Mutex<VecDeque<PrefetchRequest>>,
/// Queue length at which further requests are counted as dropped instead of
/// being queued.
max_queue_size: usize,
/// Policy consulted by the prefetch methods.
strategy: PrefetchStrategy,
/// Shared activity counters; `stats()` hands out clones of this `Arc`.
stats: Arc<PrefetchStats>,
/// Runtime on/off switch read by `is_enabled` and written by `set_enabled`.
enabled: AtomicBool,
}
impl Prefetcher {
    /// Creates an enabled prefetcher with a 1024-entry queue bound and the
    /// default strategy.
    pub fn new() -> Self {
        Self::with_config(1024, PrefetchStrategy::default())
    }

    /// Creates an enabled prefetcher with an explicit queue bound and strategy.
    ///
    /// At most `max_queue_size` requests are held at once; anything arriving
    /// while the queue is full is counted as dropped. The up-front allocation
    /// is capped at 1024 slots so a very large bound does not reserve memory
    /// eagerly.
    pub fn with_config(max_queue_size: usize, strategy: PrefetchStrategy) -> Self {
        Prefetcher {
            queue: Mutex::new(VecDeque::with_capacity(max_queue_size.min(1024))),
            max_queue_size,
            strategy,
            stats: Arc::new(PrefetchStats::new()),
            enabled: AtomicBool::new(true),
        }
    }

    /// Creates a prefetcher that is switched off: the strategy is
    /// [`PrefetchStrategy::Disabled`] and the enabled flag is cleared.
    pub fn disabled() -> Self {
        let prefetcher = Self::with_config(1024, PrefetchStrategy::Disabled);
        prefetcher.enabled.store(false, Ordering::Relaxed);
        prefetcher
    }

    /// Returns the current value of the runtime enabled flag.
    pub fn is_enabled(&self) -> bool {
        self.enabled.load(Ordering::Relaxed)
    }

    /// Sets the runtime enabled flag. This does not touch the strategy, so a
    /// prefetcher whose strategy is `Disabled` still queues nothing.
    pub fn set_enabled(&self, enabled: bool) {
        self.enabled.store(enabled, Ordering::Relaxed);
    }

    /// Returns the active strategy.
    pub fn strategy(&self) -> PrefetchStrategy {
        self.strategy
    }

    /// Replaces the strategy.
    ///
    /// Switching to `Disabled` also clears the enabled flag. Switching to any
    /// other strategy leaves the flag as it is; call [`Self::set_enabled`] to
    /// turn the prefetcher back on.
    pub fn set_strategy(&mut self, strategy: PrefetchStrategy) {
        self.strategy = strategy;
        if strategy == PrefetchStrategy::Disabled {
            self.enabled.store(false, Ordering::Relaxed);
        }
    }

    /// Requests a prefetch of `ptr` with priority 0 and depth 0.
    ///
    /// Does nothing when the prefetcher is switched off. Otherwise the request
    /// is counted, and then either recorded as a cache hit (pointer already
    /// swizzled) or queued (pointer has a disk location).
    pub fn prefetch(&self, ptr: &SwizzledPtr) {
        if !self.is_active() {
            return;
        }
        self.submit(ptr, 0, 0);
    }

    /// Requests a prefetch of `ptr` found at tree depth `depth`.
    ///
    /// Does nothing when the prefetcher is switched off (cleared flag *or*
    /// `Disabled` strategy, matching [`Self::prefetch`]) or when a
    /// `DepthLimited` strategy is active and `depth` exceeds its limit.
    ///
    /// The request's priority is the depth saturated to `u8::MAX`, so depths
    /// above 255 no longer wrap around to low priority numbers.
    pub fn prefetch_with_depth(&self, ptr: &SwizzledPtr, depth: u16) {
        if !self.is_active() || self.exceeds_depth_limit(depth) {
            return;
        }
        let priority = depth.min(u16::from(u8::MAX)) as u8;
        self.submit(ptr, priority, depth);
    }

    /// Requests a prefetch for each child pointer, honouring a `FirstN` limit.
    pub fn prefetch_children(&self, children: &[(u8, SwizzledPtr)]) {
        if !self.is_active() {
            return;
        }
        let limit = self.child_limit(children.len());
        for (_, ptr) in children.iter().take(limit) {
            self.prefetch(ptr);
        }
    }

    /// Depth-aware variant of [`Self::prefetch_children`]: every child is
    /// requested at `depth`, subject to both the `FirstN` and `DepthLimited`
    /// limits.
    pub fn prefetch_children_bounded(&self, children: &[(u8, SwizzledPtr)], depth: u16) {
        // Checked once up front so an over-deep call skips the whole slice.
        if !self.is_active() || self.exceeds_depth_limit(depth) {
            return;
        }
        let limit = self.child_limit(children.len());
        for (_, ptr) in children.iter().take(limit) {
            self.prefetch_with_depth(ptr, depth);
        }
    }

    /// True when both the runtime flag and the strategy allow prefetching.
    fn is_active(&self) -> bool {
        self.is_enabled() && self.strategy != PrefetchStrategy::Disabled
    }

    /// True when a `DepthLimited` strategy is active and `depth` is past it.
    fn exceeds_depth_limit(&self, depth: u16) -> bool {
        match self.strategy {
            PrefetchStrategy::DepthLimited(max_depth) => usize::from(depth) > max_depth,
            PrefetchStrategy::Immediate
            | PrefetchStrategy::FirstN(_)
            | PrefetchStrategy::Disabled => false,
        }
    }

    /// Number of children (out of `available`) the strategy lets through.
    fn child_limit(&self, available: usize) -> usize {
        match self.strategy {
            PrefetchStrategy::FirstN(n) => n.min(available),
            PrefetchStrategy::Disabled => 0,
            PrefetchStrategy::Immediate | PrefetchStrategy::DepthLimited(_) => available,
        }
    }

    /// Counts the request, then records a cache hit or queues the I/O.
    fn submit(&self, ptr: &SwizzledPtr, priority: u8, depth: u16) {
        self.stats.record_request();
        if ptr.is_swizzled() {
            self.stats.record_cache_hit();
            return;
        }
        // A pointer that is neither swizzled nor backed by a disk location
        // is counted as a request but produces no work.
        if let Some(location) = ptr.disk_location() {
            self.queue_prefetch(PrefetchRequest {
                location,
                priority,
                depth,
            });
        }
    }

    /// Appends `request` to the queue, or counts it as dropped when the queue
    /// already holds `max_queue_size` entries.
    ///
    /// # Panics
    /// Panics if the queue mutex is poisoned.
    fn queue_prefetch(&self, request: PrefetchRequest) {
        let mut queue = self.queue.lock().expect("prefetch queue poisoned");
        if queue.len() >= self.max_queue_size {
            self.stats.record_dropped();
            return;
        }
        queue.push_back(request);
        self.stats.record_io();
    }

    /// Removes and returns every queued request, oldest first.
    ///
    /// # Panics
    /// Panics if the queue mutex is poisoned.
    pub fn drain_requests(&self) -> Vec<PrefetchRequest> {
        let mut queue = self.queue.lock().expect("prefetch queue poisoned");
        queue.drain(..).collect()
    }

    /// Returns the number of requests currently queued.
    ///
    /// # Panics
    /// Panics if the queue mutex is poisoned.
    pub fn queue_len(&self) -> usize {
        self.queue.lock().expect("prefetch queue poisoned").len()
    }

    /// Returns a shared handle to the activity counters.
    pub fn stats(&self) -> Arc<PrefetchStats> {
        Arc::clone(&self.stats)
    }

    /// Discards every queued request without touching the counters.
    ///
    /// # Panics
    /// Panics if the queue mutex is poisoned.
    pub fn clear(&self) {
        let mut queue = self.queue.lock().expect("prefetch queue poisoned");
        queue.clear();
    }
}
impl Default for Prefetcher {
fn default() -> Self {
Self::new()
}
}
/// Implemented by types that can suggest pointers worth prefetching.
///
/// NOTE(review): no implementor or caller is visible in this file, so the
/// exact contract below is inferred from the signatures — confirm against the
/// node types that implement it.
pub trait PrefetchHint {
/// Returns the pointers this value suggests prefetching, as an owned `Vec`.
fn prefetch_hints(&self) -> Vec<SwizzledPtr>;
/// Returns a child count for this value.
/// NOTE(review): whether this always equals `prefetch_hints().len()` is not
/// established here.
fn child_count(&self) -> usize;
}
/// Wrapper around [`Prefetcher`] that periodically compares the observed hit
/// rate with a target.
///
/// As written in this file, the periodic check only records a checkpoint; the
/// branch taken when the hit rate is low has an empty body.
pub struct AdaptivePrefetcher {
/// The wrapped prefetcher that does the actual queuing.
inner: Prefetcher,
/// Desired cache-hit rate; `new` clamps it to `0.0..=1.0`.
target_hit_rate: f64,
/// Number of requests that must accumulate between two checks (`new` uses 1000).
check_interval: u64,
/// Request count observed at the most recent check.
last_check: AtomicU64,
}
impl AdaptivePrefetcher {
    /// Creates an adaptive prefetcher around a default [`Prefetcher`].
    ///
    /// `target_hit_rate` is clamped to `0.0..=1.0` (a NaN input stays NaN, in
    /// which case the low-hit-rate comparison is never true). The hit rate is
    /// re-evaluated every 1000 requests.
    pub fn new(target_hit_rate: f64) -> Self {
        AdaptivePrefetcher {
            inner: Prefetcher::new(),
            target_hit_rate: target_hit_rate.clamp(0.0, 1.0),
            check_interval: 1000,
            last_check: AtomicU64::new(0),
        }
    }

    /// Forwards the request to the inner prefetcher, then runs the periodic
    /// hit-rate check.
    pub fn prefetch(&self, ptr: &SwizzledPtr) {
        self.inner.prefetch(ptr);
        self.maybe_adjust();
    }

    /// Runs the hit-rate check once at least `check_interval` requests have
    /// been recorded since the previous check.
    fn maybe_adjust(&self) {
        let stats = self.inner.stats.snapshot();
        let last = self.last_check.load(Ordering::Relaxed);

        // A concurrent caller may have stored a checkpoint newer than the
        // snapshot taken above, making `last > stats.requests`. Saturating
        // keeps that race from underflowing (a panic in debug builds, a
        // spurious check in release builds).
        if stats.requests.saturating_sub(last) < self.check_interval {
            return;
        }
        self.last_check.store(stats.requests, Ordering::Relaxed);

        if stats.hit_rate() < self.target_hit_rate * 0.8 {
            // Placeholder: the hit rate is more than 20% below target, but no
            // adjustment is implemented yet. The inner strategy can only be
            // changed through `&mut Prefetcher`, which is not available here.
        }
    }

    /// Gives read access to the wrapped prefetcher (queue, stats, flags).
    pub fn inner(&self) -> &Prefetcher {
        &self.inner
    }
}
#[cfg(test)]
mod tests {
    use super::super::swizzled_ptr::NodeType;
    use super::*;

    /// Builds an on-disk (unswizzled) bucket pointer; `id` is passed as the
    /// first `on_disk` argument and the second is fixed at 0.
    fn bucket_ptr(id: u32) -> SwizzledPtr {
        SwizzledPtr::on_disk(id, 0, NodeType::Bucket)
    }

    /// Builds `count` on-disk `Node4` children keyed `0..count`.
    fn node4_children(count: u8) -> Vec<(u8, SwizzledPtr)> {
        (0..count)
            .map(|i| (i, SwizzledPtr::on_disk(u32::from(i), 0, NodeType::Node4)))
            .collect()
    }

    #[test]
    fn test_prefetch_stats() {
        let stats = PrefetchStats::new();
        stats.record_request();
        stats.record_request();
        stats.record_cache_hit();
        stats.record_io();
        stats.record_dropped();
        let snapshot = stats.snapshot();
        assert_eq!(snapshot.requests, 2);
        assert_eq!(snapshot.cache_hits, 1);
        assert_eq!(snapshot.io_issued, 1);
        assert_eq!(snapshot.dropped, 1);
        assert_eq!(snapshot.hit_rate(), 0.5);
    }

    #[test]
    fn test_prefetcher_disabled() {
        let prefetcher = Prefetcher::disabled();
        assert!(!prefetcher.is_enabled());
        prefetcher.prefetch(&bucket_ptr(0));
        assert_eq!(prefetcher.queue_len(), 0);
    }

    #[test]
    fn test_prefetcher_memory_pointer() {
        let prefetcher = Prefetcher::new();
        let ptr = SwizzledPtr::null();
        prefetcher.prefetch(&ptr);
        let stats = prefetcher.stats().snapshot();
        // An enabled prefetcher counts the request before it inspects the
        // pointer, so exactly one request is recorded whatever `null()` is.
        assert_eq!(stats.requests, 1);
        // A single request against an empty 1024-slot queue cannot be dropped.
        assert_eq!(stats.dropped, 0);
    }

    #[test]
    fn test_prefetcher_queue() {
        let prefetcher = Prefetcher::new();
        for i in 0..10 {
            prefetcher.prefetch(&bucket_ptr(i));
        }
        assert_eq!(prefetcher.queue_len(), 10);
        let requests = prefetcher.drain_requests();
        assert_eq!(requests.len(), 10);
        assert_eq!(prefetcher.queue_len(), 0);
    }

    #[test]
    fn test_prefetcher_max_queue() {
        let prefetcher = Prefetcher::with_config(5, PrefetchStrategy::Immediate);
        for i in 0..10 {
            prefetcher.prefetch(&bucket_ptr(i));
        }
        assert_eq!(prefetcher.queue_len(), 5);
        let stats = prefetcher.stats().snapshot();
        assert_eq!(stats.dropped, 5);
    }

    #[test]
    fn test_prefetch_strategy_first_n() {
        let prefetcher = Prefetcher::with_config(100, PrefetchStrategy::FirstN(3));
        let children: Vec<(u8, SwizzledPtr)> =
            (0..10).map(|i| (i, bucket_ptr(u32::from(i)))).collect();
        prefetcher.prefetch_children(&children);
        assert_eq!(prefetcher.queue_len(), 3);
    }

    #[test]
    fn test_prefetch_depth_limited() {
        let prefetcher = Prefetcher::with_config(100, PrefetchStrategy::DepthLimited(2));
        let ptr = bucket_ptr(0);
        prefetcher.prefetch_with_depth(&ptr, 0);
        assert_eq!(prefetcher.queue_len(), 1);
        prefetcher.prefetch_with_depth(&ptr, 1);
        assert_eq!(prefetcher.queue_len(), 2);
        prefetcher.prefetch_with_depth(&ptr, 2);
        assert_eq!(prefetcher.queue_len(), 3);
        // Depth 3 is past the limit of 2, so nothing more is queued.
        prefetcher.prefetch_with_depth(&ptr, 3);
        assert_eq!(prefetcher.queue_len(), 3);
    }

    #[test]
    fn test_adaptive_prefetcher() {
        let prefetcher = AdaptivePrefetcher::new(0.5);
        assert!(prefetcher.inner().is_enabled());
        prefetcher.prefetch(&bucket_ptr(0));
        assert_eq!(prefetcher.inner().queue_len(), 1);
    }

    #[test]
    fn test_prefetch_children_bounded_depth_limited() {
        let prefetcher = Prefetcher::with_config(100, PrefetchStrategy::DepthLimited(2));
        let children = node4_children(5);
        prefetcher.prefetch_children_bounded(&children, 0);
        assert_eq!(prefetcher.queue_len(), 5);
        prefetcher.clear();
        prefetcher.prefetch_children_bounded(&children, 1);
        assert_eq!(prefetcher.queue_len(), 5);
        prefetcher.clear();
        prefetcher.prefetch_children_bounded(&children, 2);
        assert_eq!(prefetcher.queue_len(), 5);
        prefetcher.clear();
        prefetcher.prefetch_children_bounded(&children, 3);
        assert_eq!(prefetcher.queue_len(), 0);
    }

    #[test]
    fn test_prefetch_children_bounded_first_n() {
        let prefetcher = Prefetcher::with_config(100, PrefetchStrategy::FirstN(3));
        let children = node4_children(10);
        prefetcher.prefetch_children_bounded(&children, 0);
        assert_eq!(prefetcher.queue_len(), 3);
        prefetcher.clear();
        // `FirstN` carries no depth limit, so a deep call still queues three.
        prefetcher.prefetch_children_bounded(&children, 5);
        assert_eq!(prefetcher.queue_len(), 3);
    }

    #[test]
    fn test_prefetch_children_bounded_disabled() {
        let prefetcher = Prefetcher::with_config(100, PrefetchStrategy::Disabled);
        let children = node4_children(5);
        prefetcher.prefetch_children_bounded(&children, 0);
        assert_eq!(prefetcher.queue_len(), 0);
    }

    #[test]
    fn test_prefetch_children_bounded_immediate() {
        let prefetcher = Prefetcher::with_config(100, PrefetchStrategy::Immediate);
        let children = node4_children(7);
        prefetcher.prefetch_children_bounded(&children, 0);
        assert_eq!(prefetcher.queue_len(), 7);
        prefetcher.clear();
        prefetcher.prefetch_children_bounded(&children, 100);
        assert_eq!(prefetcher.queue_len(), 7);
    }
}