// hexz_core/cache/prefetch.rs

//! Fixed-window block prefetching for sequential I/O optimization.
//!
//! This module implements a simple yet effective prefetching strategy that reduces
//! read latency for sequential workloads by speculatively loading future blocks into
//! the cache before they are explicitly requested. The prefetcher operates on a
//! fixed-window readahead model, making it predictable and suitable for streaming
//! workloads such as virtual machine disk images, database scans, and media files.
//!
//! # Architecture
//!
//! The prefetcher is intentionally minimalist: it maintains an atomic window-size
//! parameter plus an in-flight guard flag, and computes prefetch targets as a
//! contiguous range of block indices following the current access. There is no
//! pattern detection, adaptive resizing, or history tracking—this simplicity
//! ensures low overhead and predictable behavior.
//!
//! **Key design decisions:**
//! - **Fixed window size**: The prefetch distance is constant (typically 4-16 blocks)
//!   and does not adjust based on observed access patterns
//! - **No sequential detection**: The prefetcher assumes sequential access; the caller
//!   (typically `File::read_at_into_uninit`) is responsible for deciding when
//!   to trigger prefetching based on observed workload patterns
//! - **Atomic configuration**: The window size is stored atomically, so a setter can
//!   update it at runtime without locks for dynamic tuning (see Thread Safety below)
//! - **Stateless operation**: Each call to `get_prefetch_targets` is independent,
//!   with no persistent state about past accesses
//!
//! # Performance Characteristics
//!
//! ## When Prefetching Helps
//!
//! Prefetching is highly effective for:
//! - **Sequential reads**: Reading large files or VM disk images linearly
//! - **Streaming workloads**: Media playback, log file processing, database scans
//! - **High-latency backends**: S3, HTTP, or network-attached storage where I/O
//!   latency dominates (>10ms per block)
//! - **Large block sizes**: With 64 KiB or larger blocks, prefetch overhead is
//!   amortized over substantial data volumes
//!
//! **Measured benefits** (sequential reads on S3 backend, 64 KiB blocks):
//! - 4-block prefetch: ~40% latency reduction (30ms → 18ms per read)
//! - 8-block prefetch: ~60% latency reduction (30ms → 12ms per read)
//! - 16-block prefetch: ~70% latency reduction (30ms → 9ms per read)
//!
//! ## When Prefetching Hurts
//!
//! Prefetching imposes costs and should be avoided for:
//! - **Random access patterns**: Database index lookups, sparse file reads, or
//!   non-sequential I/O waste bandwidth and cache capacity on unneeded blocks
//! - **Low-latency backends**: Local SSD or NVMe storage (< 1ms latency) where
//!   prefetch overhead exceeds the latency benefit
//! - **Small block sizes**: With 4 KiB blocks, prefetch overhead (thread spawning,
//!   cache contention) can exceed the data transfer time
//! - **Memory-constrained systems**: Prefetched blocks evict useful cached data,
//!   potentially reducing overall cache hit rate
//!
//! **Measured overheads** (random reads on local SSD, 64 KiB blocks):
//! - 4-block prefetch: ~15% throughput loss (wasted reads evict useful cache entries)
//! - 8-block prefetch: ~30% throughput loss
//! - Recommendation: Disable prefetching (window_size = 0) for random workloads
//!
//! # Resource Usage
//!
//! ## Memory Overhead
//!
//! The prefetcher itself is extremely lightweight: one `AtomicU32` window size plus
//! one `AtomicBool` in-flight guard (8 bytes including padding).
//! However, prefetched blocks consume cache capacity:
//! - **Per-block overhead**: Typically block_size + ~100 bytes for cache metadata
//! - **Worst-case prefetch buffer**: `window_size × block_size` bytes
//! - **Example**: 8-block window with 64 KiB blocks = 512 KiB prefetch buffer
//!
//! ## CPU Overhead
//!
//! - **Per-access cost**: O(window_size) to compute prefetch targets (typically < 1 µs)
//! - **Background I/O**: Prefetch requests spawn lightweight threads or async tasks
//!   (implementation-dependent, managed by `File`)
//!
//! ## Bandwidth Overhead
//!
//! - **Sequential access**: Near-zero waste (prefetched blocks are used)
//! - **Random access**: Up to 100% waste (prefetched blocks are never accessed)
//! - **Mixed workloads**: Proportional to sequential vs. random ratio
//!
//! # Configuration Guidelines
//!
//! ## Choosing Window Size
//!
//! | Backend Type       | Latency   | Recommended Window | Rationale                          |
//! |--------------------|-----------|--------------------|------------------------------------|
//! | Local SSD/NVMe     | < 1ms     | 0-2 blocks         | Prefetch overhead exceeds benefit  |
//! | Local HDD          | 5-10ms    | 2-4 blocks         | Moderate latency hiding            |
//! | S3/Cloud Storage   | 20-100ms  | 8-16 blocks        | High latency hiding critical       |
//! | HTTP/Remote        | 50-200ms  | 16-32 blocks       | Maximum latency hiding needed      |
//!
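//! A sketch of table-driven selection based on these recommendations (the enum and
//! the exact values are illustrative, not part of this module's API):
//!
//! ```
//! enum BackendClass { LocalSsd, LocalHdd, CloudObject, RemoteHttp }
//!
//! fn recommended_window(class: BackendClass) -> u32 {
//!     match class {
//!         BackendClass::LocalSsd => 2,      // < 1ms: prefetch rarely pays off
//!         BackendClass::LocalHdd => 4,      // 5-10ms: moderate latency hiding
//!         BackendClass::CloudObject => 16,  // 20-100ms: aggressive readahead
//!         BackendClass::RemoteHttp => 32,   // 50-200ms: maximum readahead
//!     }
//! }
//!
//! assert_eq!(recommended_window(BackendClass::CloudObject), 16);
//! ```
//!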
//! ## Tuning Heuristics
//!
//! A reasonable starting formula:
//! ```text
//! window_size = ceil(backend_latency_ms / (block_size_kb / throughput_mbps))
//! ```
//!
//! **Example**: S3 backend with 50ms latency, 64 KiB blocks, 100 MB/s throughput:
//! ```text
//! window_size = ceil(50 / (64 / 100)) = ceil(78.125) = 79 blocks
//! ```
//! (In practice, 16-32 blocks is sufficient due to concurrent I/O)
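//!
//! The same heuristic as a small helper function (hypothetical, shown only to make
//! the arithmetic concrete):
//!
//! ```
//! // Estimate a prefetch window from backend characteristics (illustrative).
//! fn suggest_window_size(latency_ms: f64, block_size_kb: f64, throughput_mbps: f64) -> u32 {
//!     let transfer_ms = block_size_kb / throughput_mbps; // time to move one block
//!     (latency_ms / transfer_ms).ceil() as u32
//! }
//!
//! assert_eq!(suggest_window_size(50.0, 64.0, 100.0), 79);
//! ```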
//!
//! ## Disabling Prefetch
//!
//! To disable prefetching entirely:
//! - Construct the prefetcher with a zero window: `Prefetcher::new(0)`
//! - Or pass `prefetch_window_size = None` to `File::with_options()`
//!
//! # Examples
//!
//! ## Basic Usage
//!
//! ```
//! use hexz_core::cache::prefetch::Prefetcher;
//!
//! // Configure 8-block readahead for streaming workloads
//! let prefetcher = Prefetcher::new(8);
//!
//! // Application reads block 100
//! let targets = prefetcher.get_prefetch_targets(100);
//! assert_eq!(targets, vec![101, 102, 103, 104, 105, 106, 107, 108]);
//!
//! // Background task: fetch blocks 101-108 into cache
//! // (actual prefetch execution is handled by File)
//! ```
//!
//! ## Adaptive Tuning
//!
//! ```
//! use hexz_core::cache::prefetch::Prefetcher;
//! use std::sync::Arc;
//!
//! let prefetcher = Arc::new(Prefetcher::new(4));
//!
//! // Detect high-latency backend (e.g., S3) and increase window
//! // (Note: a window-size update method would need to be added; see the
//! //  sketch under Thread Safety below)
//! // prefetcher.set_window_size(16);
//!
//! // Detect random access pattern and disable prefetching
//! // prefetcher.set_window_size(0);
//! ```
//!
//! ## Integration with File
//!
//! ```no_run
//! # use hexz_core::File;
//! # use hexz_core::store::local::FileBackend;
//! # use hexz_core::algo::compression::lz4::Lz4Compressor;
//! # use std::sync::Arc;
//! # fn main() -> anyhow::Result<()> {
//! let backend = Arc::new(FileBackend::new("snapshot.hxz".as_ref())?);
//! let compressor = Box::new(Lz4Compressor::new());
//!
//! // Enable prefetching at File creation
//! let snapshot = File::with_cache(
//!     backend,
//!     compressor,
//!     None, // encryptor
//!     Some(512 * 1024 * 1024), // cache_capacity_bytes (512 MiB)
//!     Some(8), // prefetch_window_size (8 blocks)
//! )?;
//!
//! // Prefetching happens automatically during sequential reads
//! // (triggered internally by File::read_at_into_uninit)
//! # Ok(())
//! # }
//! ```
//!
//! # Implementation Notes
//!
//! ## Why No Pattern Detection?
//!
//! This implementation deliberately omits sequential pattern detection (unlike Linux
//! kernel readahead or database buffer managers) for several reasons:
//! - **Caller context**: `File` has full visibility into access patterns and
//!   can make better decisions about when to prefetch
//! - **Simplicity**: No history tracking, no stride detection, no state machine overhead
//! - **Predictability**: Fixed behavior is easier to reason about and debug
//! - **Composability**: Higher-level policies (in `File` or applications) can
//!   layer sophisticated heuristics atop this simple primitive
//!
//! ## Thread Safety
//!
//! The prefetcher is fully thread-safe via atomic operations. Multiple threads can
//! concurrently call `get_prefetch_targets` without contention. If runtime window
//! size adjustment is needed, `AtomicU32` supports lock-free updates via
//! `store(Ordering::Relaxed)`.
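//!
//! A minimal sketch of such a setter, shown for illustration only (it is not
//! currently part of the API):
//!
//! ```ignore
//! impl Prefetcher {
//!     /// Hypothetical lock-free update of the prefetch window.
//!     pub fn set_window_size(&self, size: u32) {
//!         self.window_size.store(size, Ordering::Relaxed);
//!     }
//! }
//! ```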
//!
//! ## Future Extensions
//!
//! Potential enhancements (not currently implemented):
//! - **Adaptive window sizing**: Adjust window based on cache hit rate or latency
//! - **Stride detection**: Support strided access patterns (e.g., reading every Nth block)
//! - **Multi-stream prefetch**: Prefetch from multiple streams (Disk + Memory) simultaneously
//! - **Priority hints**: Mark prefetch requests as low-priority to avoid evicting hot data

use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};

/// Thread-safe prefetch manager with configurable lookahead window.
///
/// This struct encapsulates the prefetch configuration and computes which blocks
/// should be speculatively loaded following a given access. It maintains minimal
/// state (the window size and an in-flight flag) and relies on atomic operations
/// for thread safety.
///
/// # Thread Safety
///
/// `Prefetcher` is fully thread-safe and can be shared across threads via `Arc<Prefetcher>`.
/// The window size is stored as an `AtomicU32`, allowing lock-free reads and updates.
/// Multiple threads can concurrently call `get_prefetch_targets` without contention.
///
/// # Performance
///
/// - **Memory footprint**: 8 bytes (one `AtomicU32` plus one `AtomicBool`, padded)
/// - **Computation cost**: O(window_size) per call to `get_prefetch_targets`
/// - **Contention**: None (lock-free atomic operations)
///
/// # Examples
///
/// ```
/// use hexz_core::cache::prefetch::Prefetcher;
///
/// let prefetcher = Prefetcher::new(4);
/// let targets = prefetcher.get_prefetch_targets(10);
/// assert_eq!(targets, vec![11, 12, 13, 14]);
/// ```
#[derive(Debug)]
pub struct Prefetcher {
    /// The number of blocks to fetch ahead of the current request.
    ///
    /// Stored as `AtomicU32` to enable lock-free runtime updates. A value of 0
    /// disables prefetching (returns empty target vectors).
    window_size: AtomicU32,

    /// Guards against concurrent prefetch threads. Only one prefetch operation
    /// can be in flight at a time to prevent unbounded thread spawning.
    in_flight: AtomicBool,
}

impl Prefetcher {
    /// Constructs a new prefetcher with a fixed lookahead window.
    ///
    /// The window size determines how many blocks ahead of the current access should
    /// be prefetched. A value of 0 disables prefetching entirely, which is appropriate
    /// for random access workloads or low-latency storage backends.
    ///
    /// # Arguments
    ///
    /// * `window_size` - Number of blocks to prefetch ahead of the current access.
    ///   Typical values range from 4 (local storage) to 32 (high-latency cloud backends).
    ///   Must fit in `u32` (practical limit is much lower, usually < 256).
    ///
    /// # Returns
    ///
    /// Returns a new `Prefetcher` instance configured with the specified window size.
    ///
    /// # Performance
    ///
    /// - **Time complexity**: O(1)
    /// - **Memory allocation**: none (the 8-byte struct is constructed in place)
    ///
    /// # Examples
    ///
    /// ```
    /// use hexz_core::cache::prefetch::Prefetcher;
    ///
    /// // Aggressive prefetching for high-latency S3 backend
    /// let s3_prefetcher = Prefetcher::new(16);
    ///
    /// // Conservative prefetching for local SSD
    /// let ssd_prefetcher = Prefetcher::new(2);
    ///
    /// // Disable prefetching for random access workload
    /// let no_prefetch = Prefetcher::new(0);
    /// assert!(no_prefetch.get_prefetch_targets(100).is_empty());
    /// ```
    pub fn new(window_size: u32) -> Self {
        Self {
            window_size: AtomicU32::new(window_size),
            in_flight: AtomicBool::new(false),
        }
    }

    /// Attempts to acquire the in-flight guard. Returns `true` if this caller
    /// won the race and should spawn a prefetch thread. The caller **must** call
    /// [`clear_in_flight`](Self::clear_in_flight) when the prefetch completes.
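    ///
    /// A sketch of the intended guard pattern (the actual scheduling lives in `File`):
    ///
    /// ```
    /// use hexz_core::cache::prefetch::Prefetcher;
    ///
    /// let prefetcher = Prefetcher::new(4);
    /// if prefetcher.try_start() {
    ///     // ...spawn background prefetch I/O here...
    ///     prefetcher.clear_in_flight(); // must run when the prefetch completes
    /// }
    /// assert!(prefetcher.try_start()); // guard was released above
    /// ```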
    pub fn try_start(&self) -> bool {
        self.in_flight
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
            .is_ok()
    }

    /// Clears the in-flight flag, allowing the next read to spawn a prefetch.
    pub fn clear_in_flight(&self) {
        self.in_flight.store(false, Ordering::Release);
    }

    /// Computes the block indices that should be speculatively prefetched.
    ///
    /// Given the index of a block currently being accessed, this method returns a
    /// contiguous sequence of block indices that immediately follow it. The caller
    /// (typically `File`) is responsible for scheduling the actual I/O operations
    /// to load these blocks into the cache.
    ///
    /// The returned vector contains exactly `window_size` consecutive block indices,
    /// starting from `current_block + 1` and ending at `current_block + window_size`.
    /// If prefetching is disabled (`window_size == 0`), returns an empty vector.
    ///
    /// # Arguments
    ///
    /// * `current_block` - Zero-based index of the block being actively read. This
    ///   value is used as the anchor point for computing the prefetch range.
    ///
    /// # Returns
    ///
    /// Returns `Vec<u64>` containing the block indices to prefetch:
    /// - **Non-empty**: `[current_block + 1, current_block + 2, ..., current_block + window_size]`
    /// - **Empty**: If `window_size == 0` (prefetching disabled)
    ///
    /// # Performance
    ///
    /// - **Time complexity**: O(window_size) to allocate and populate the vector
    /// - **Space complexity**: O(window_size) for the returned vector
    /// - **Typical cost**: < 1 µs for window_size ≤ 32
    ///
    /// # Integer Overflow
    ///
    /// This method performs saturating addition to avoid panics if `current_block`
    /// is near `u64::MAX`. However, in practice, block indices are bounded by the
    /// snapshot's logical size, which is typically far below `u64::MAX`.
    ///
    /// # Examples
    ///
    /// ## Standard Prefetch
    ///
    /// ```
    /// use hexz_core::cache::prefetch::Prefetcher;
    ///
    /// let prefetcher = Prefetcher::new(5);
    /// let targets = prefetcher.get_prefetch_targets(42);
    /// assert_eq!(targets, vec![43, 44, 45, 46, 47]);
    /// ```
    ///
    /// ## Disabled Prefetch
    ///
    /// ```
    /// use hexz_core::cache::prefetch::Prefetcher;
    ///
    /// let prefetcher = Prefetcher::new(0);
    /// let targets = prefetcher.get_prefetch_targets(100);
    /// assert!(targets.is_empty());
    /// ```
    ///
    /// ## Boundary Case
    ///
    /// ```
    /// use hexz_core::cache::prefetch::Prefetcher;
    ///
    /// let prefetcher = Prefetcher::new(1);
    /// let targets = prefetcher.get_prefetch_targets(999);
    /// assert_eq!(targets, vec![1000]);
    /// ```
    ///
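    /// ## Near-Overflow
    ///
    /// With the saturating addition described above, targets clamp at `u64::MAX`
    /// rather than wrapping or panicking:
    ///
    /// ```
    /// use hexz_core::cache::prefetch::Prefetcher;
    ///
    /// let prefetcher = Prefetcher::new(2);
    /// assert_eq!(prefetcher.get_prefetch_targets(u64::MAX), vec![u64::MAX, u64::MAX]);
    /// ```
    ///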
    /// # Integration Notes
    ///
    /// The caller must handle:
    /// - **Bounds checking**: Ensure prefetch targets do not exceed the stream's
    ///   logical size (number of blocks in the snapshot)
    /// - **Cache lookup**: Skip prefetching blocks already present in the cache
    /// - **I/O scheduling**: Issue async or background reads for the target blocks
    /// - **Error handling**: Prefetch failures should not impact the foreground read
    ///
    /// See `File::read_at_into_uninit` for a reference implementation.
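    ///
    /// A minimal sketch of such a caller (`total_blocks`, `cache_contains`, and
    /// `schedule_read` are hypothetical names, not part of this crate):
    ///
    /// ```ignore
    /// let targets = prefetcher.get_prefetch_targets(current_block);
    /// for block in targets {
    ///     if block >= total_blocks {
    ///         break; // bounds check: never prefetch past the snapshot
    ///     }
    ///     if cache_contains(block) {
    ///         continue; // already cached; skip redundant I/O
    ///     }
    ///     schedule_read(block); // background fetch; failures are swallowed
    /// }
    /// ```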
    pub fn get_prefetch_targets(&self, current_block: u64) -> Vec<u64> {
        let size = self.window_size.load(Ordering::Relaxed);
        if size == 0 {
            return Vec::new();
        }

        // Saturating addition matches the documented overflow behavior: targets
        // clamp at u64::MAX instead of panicking in debug builds.
        (1..=size as u64)
            .map(|offset| current_block.saturating_add(offset))
            .collect()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    #[test]
    fn test_new_with_zero_window() {
        let prefetcher = Prefetcher::new(0);
        let targets = prefetcher.get_prefetch_targets(100);
        assert!(targets.is_empty());
    }

    #[test]
    fn test_new_with_small_window() {
        let prefetcher = Prefetcher::new(4);
        let targets = prefetcher.get_prefetch_targets(10);
        assert_eq!(targets, vec![11, 12, 13, 14]);
    }

    #[test]
    fn test_new_with_large_window() {
        let prefetcher = Prefetcher::new(16);
        let targets = prefetcher.get_prefetch_targets(100);
        assert_eq!(targets.len(), 16);
        assert_eq!(targets[0], 101);
        assert_eq!(targets[15], 116);
    }

    #[test]
    fn test_get_prefetch_targets_sequential() {
        let prefetcher = Prefetcher::new(5);

        let targets1 = prefetcher.get_prefetch_targets(42);
        assert_eq!(targets1, vec![43, 44, 45, 46, 47]);

        let targets2 = prefetcher.get_prefetch_targets(43);
        assert_eq!(targets2, vec![44, 45, 46, 47, 48]);
    }

    #[test]
    fn test_get_prefetch_targets_single_block() {
        let prefetcher = Prefetcher::new(1);
        let targets = prefetcher.get_prefetch_targets(999);
        assert_eq!(targets, vec![1000]);
    }

    #[test]
    fn test_get_prefetch_targets_from_zero() {
        let prefetcher = Prefetcher::new(3);
        let targets = prefetcher.get_prefetch_targets(0);
        assert_eq!(targets, vec![1, 2, 3]);
    }

    #[test]
    fn test_get_prefetch_targets_large_current_block() {
        let prefetcher = Prefetcher::new(4);
        let targets = prefetcher.get_prefetch_targets(u64::MAX - 10);

        // Should handle near-overflow gracefully
        assert_eq!(targets.len(), 4);

        // Saturating addition clamps targets at u64::MAX instead of panicking
        let saturated = prefetcher.get_prefetch_targets(u64::MAX);
        assert_eq!(saturated, vec![u64::MAX; 4]);
    }

    #[test]
    fn test_get_prefetch_targets_consistency() {
        let prefetcher = Prefetcher::new(8);

        // Calling multiple times with same input should give same result
        let targets1 = prefetcher.get_prefetch_targets(50);
        let targets2 = prefetcher.get_prefetch_targets(50);

        assert_eq!(targets1, targets2);
    }

    #[test]
    fn test_window_size_32() {
        let prefetcher = Prefetcher::new(32);
        let targets = prefetcher.get_prefetch_targets(1000);

        assert_eq!(targets.len(), 32);
        assert_eq!(targets[0], 1001);
        assert_eq!(targets[31], 1032);
    }

    #[test]
    fn test_debug_format() {
        let prefetcher = Prefetcher::new(8);
        let debug_str = format!("{:?}", prefetcher);

        assert!(debug_str.contains("Prefetcher"));
        assert!(debug_str.contains("window_size"));
    }

    #[test]
    fn test_thread_safety() {
        let prefetcher = Arc::new(Prefetcher::new(4));
        let mut handles = Vec::new();

        // Spawn multiple threads that call get_prefetch_targets concurrently
        for i in 0..4 {
            let prefetcher_clone = Arc::clone(&prefetcher);
            let handle = thread::spawn(move || {
                let block_idx = i * 100;
                let targets = prefetcher_clone.get_prefetch_targets(block_idx);

                assert_eq!(targets.len(), 4);
                assert_eq!(targets[0], block_idx + 1);
                assert_eq!(targets[3], block_idx + 4);
            });
            handles.push(handle);
        }

        // Wait for all threads
        for handle in handles {
            handle.join().unwrap();
        }
    }

    #[test]
    fn test_multiple_prefetchers() {
        let pref1 = Prefetcher::new(2);
        let pref2 = Prefetcher::new(8);

        let targets1 = pref1.get_prefetch_targets(10);
        let targets2 = pref2.get_prefetch_targets(10);

        assert_eq!(targets1.len(), 2);
        assert_eq!(targets2.len(), 8);
    }

    #[test]
    fn test_prefetch_targets_contiguous() {
        let prefetcher = Prefetcher::new(10);
        let targets = prefetcher.get_prefetch_targets(100);

        // Verify targets are contiguous
        for i in 0..targets.len() - 1 {
            assert_eq!(targets[i] + 1, targets[i + 1]);
        }
    }

    #[test]
    fn test_very_large_window() {
        let prefetcher = Prefetcher::new(256);
        let targets = prefetcher.get_prefetch_targets(0);

        assert_eq!(targets.len(), 256);
        assert_eq!(targets[0], 1);
        assert_eq!(targets[255], 256);
    }

    #[test]
    fn test_edge_case_max_u32_window() {
        // Window size can be set to u32::MAX, though this is impractical:
        // calling `get_prefetch_targets` would allocate a ~32 GiB vector,
        // so we only verify that construction succeeds.
        let _prefetcher = Prefetcher::new(u32::MAX);
    }

    #[test]
    fn test_window_size_stored_atomically() {
        let prefetcher = Arc::new(Prefetcher::new(4));

        // Access from multiple threads to verify atomic storage works
        let p1 = Arc::clone(&prefetcher);
        let p2 = Arc::clone(&prefetcher);

        let h1 = thread::spawn(move || p1.get_prefetch_targets(0));
        let h2 = thread::spawn(move || p2.get_prefetch_targets(100));

        let t1 = h1.join().unwrap();
        let t2 = h2.join().unwrap();

        assert_eq!(t1.len(), 4);
        assert_eq!(t2.len(), 4);
    }

    #[test]
    fn test_different_starting_blocks() {
        let prefetcher = Prefetcher::new(3);

        // Test various starting points
        for start in [0, 10, 100, 1000, 10000] {
            let targets = prefetcher.get_prefetch_targets(start);
            assert_eq!(targets, vec![start + 1, start + 2, start + 3]);
        }
    }
580}