sochdb_storage/
prefetch.rs

1// Copyright 2025 Sushanth (https://github.com/sushanthpy)
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Memory Prefetching for Range Scans
16//!
17//! Implements proactive prefetching to eliminate page fault latency during
18//! sequential scans of memory-mapped data.
19//!
20//! ## jj.md Task 7: Prefetching for Range Scans
21//!
22//! Goals:
23//! - Eliminate page fault latency during scans (~50μs per fault)
24//! - 2-3x faster cold range scans
25//! - Better utilization of I/O bandwidth
26//!
27//! ## Implementation
28//!
29//! Uses platform-specific prefetching:
//! - Unix: `madvise` with the `MADV_SEQUENTIAL` and `MADV_WILLNEED` advice values
31//! - x86_64: `_mm_prefetch` intrinsic for CPU cache prefetching
32//!
33//! For 1M edge scan with 4KB pages (32 edges/page):
34//! - Without prefetch: ~31,250 page faults × 50μs = 1.56 seconds stalled
35//! - With prefetch: Near-zero stall time
36
/// Size in bytes of a single edge record (edges use a fixed-size format).
pub const EDGE_SIZE: usize = 128;

/// Number of edges to prefetch ahead during iteration.
///
/// 16 edges x 128 bytes per edge = 2KB, i.e. half of a 4KB page; good for L1 cache.
pub const PREFETCH_DISTANCE: usize = 16;
43
44/// Advise the OS about memory access patterns for a range scan.
45///
46/// This should be called before starting a sequential scan over memory-mapped
47/// data to enable read-ahead and sequential prefetching.
48///
49/// # Arguments
50/// * `data` - The memory-mapped region that will be scanned
51/// * `start_offset` - Starting offset within the region
52/// * `length` - Expected length of the scan
53///
54/// # Platform Support
55/// - Unix: Uses `madvise` with `MADV_SEQUENTIAL` and `MADV_WILLNEED`
56/// - Other platforms: No-op (relies on OS default behavior)
57#[cfg(unix)]
58pub fn advise_sequential(data: &[u8], start_offset: usize, length: usize) {
59    use std::cmp::min;
60
61    // Validate bounds
62    let start = min(start_offset, data.len());
63    let end = min(start + length, data.len());
64
65    if end <= start {
66        return;
67    }
68
69    // Align to page boundaries (typically 4KB)
70    let page_size = page_size();
71    let aligned_start = (start / page_size) * page_size;
72    let aligned_length = (end - aligned_start).div_ceil(page_size) * page_size;
73
74    unsafe {
75        let ptr = data.as_ptr().add(aligned_start) as *mut libc::c_void;
76
77        // MADV_SEQUENTIAL: Expect sequential access, enable aggressive read-ahead
78        // MADV_WILLNEED: Bring pages into memory proactively
79        let advice = libc::MADV_SEQUENTIAL | libc::MADV_WILLNEED;
80
81        let result = libc::madvise(ptr, aligned_length, advice);
82        if result != 0 {
83            // Non-fatal: log but continue
84            #[cfg(debug_assertions)]
85            eprintln!(
86                "madvise failed: {} (continuing without prefetch)",
87                std::io::Error::last_os_error()
88            );
89        }
90    }
91}
92
93#[cfg(not(unix))]
94pub fn advise_sequential(_data: &[u8], _start_offset: usize, _length: usize) {
95    // No-op on non-Unix platforms
96}
97
98/// Advise the OS that a memory region will be accessed randomly.
99///
100/// This disables sequential read-ahead for point lookups.
101#[cfg(unix)]
102pub fn advise_random(data: &[u8]) {
103    unsafe {
104        let ptr = data.as_ptr() as *mut libc::c_void;
105        let result = libc::madvise(ptr, data.len(), libc::MADV_RANDOM);
106        if result != 0 {
107            #[cfg(debug_assertions)]
108            eprintln!("madvise RANDOM failed: {}", std::io::Error::last_os_error());
109        }
110    }
111}
112
113#[cfg(not(unix))]
114pub fn advise_random(_data: &[u8]) {
115    // No-op on non-Unix platforms
116}
117
118/// Advise the OS that a memory region is no longer needed.
119///
120/// Allows the OS to free pages if memory pressure is high.
121#[cfg(unix)]
122pub fn advise_dontneed(data: &[u8], start_offset: usize, length: usize) {
123    use std::cmp::min;
124
125    let start = min(start_offset, data.len());
126    let end = min(start + length, data.len());
127
128    if end <= start {
129        return;
130    }
131
132    let page_size = page_size();
133    let aligned_start = (start / page_size) * page_size;
134    let aligned_length = (end - aligned_start).div_ceil(page_size) * page_size;
135
136    unsafe {
137        let ptr = data.as_ptr().add(aligned_start) as *mut libc::c_void;
138        // MADV_DONTNEED: We're done with this memory
139        libc::madvise(ptr, aligned_length, libc::MADV_DONTNEED);
140    }
141}
142
143#[cfg(not(unix))]
144pub fn advise_dontneed(_data: &[u8], _start_offset: usize, _length: usize) {
145    // No-op on non-Unix platforms
146}
147
/// Prefetch data into CPU cache during iteration.
///
/// Uses the x86_64 `_mm_prefetch` intrinsic when available to request that
/// the cache line at `current_offset + prefetch_distance` be loaded into the
/// L1 cache ahead of the actual read.
///
/// # Arguments
/// * `data` - The memory region
/// * `current_offset` - Current read position
/// * `prefetch_distance` - Number of bytes ahead to prefetch
///
/// # Bounds
/// Safe to call with any offset and distance: if the target offset overflows
/// `usize` or lies at or beyond `data.len()`, the call does nothing.
#[cfg(all(target_arch = "x86_64", target_feature = "sse"))]
pub fn prefetch_ahead(data: &[u8], current_offset: usize, prefetch_distance: usize) {
    use std::arch::x86_64::{_MM_HINT_T0, _mm_prefetch};

    // `checked_add` rejects offsets that would wrap around; `get` rejects
    // targets past the end of the slice.
    let target = current_offset
        .checked_add(prefetch_distance)
        .and_then(|offset| data.get(offset));

    if let Some(byte) = target {
        // SAFETY: `byte` is a reference into `data`, so the pointer is valid,
        // and the `cfg` on this function guarantees SSE is available.
        unsafe {
            _mm_prefetch(
                (byte as *const u8).cast::<i8>(),
                _MM_HINT_T0, // Prefetch to L1 cache
            );
        }
    }
}

/// Prefetch data into CPU cache during iteration.
///
/// Currently a no-op on aarch64; all arguments are ignored.
#[cfg(target_arch = "aarch64")]
pub fn prefetch_ahead(data: &[u8], current_offset: usize, prefetch_distance: usize) {
    // aarch64 prefetch is currently unstable in Rust
    // Fall back to no-op until stabilized (issue #117217)
    // The OS-level madvise still provides significant benefits
    let _ = (data, current_offset, prefetch_distance);
}

/// Prefetch data into CPU cache during iteration.
///
/// No-op on platforms without a supported prefetch intrinsic.
#[cfg(not(any(
    all(target_arch = "x86_64", target_feature = "sse"),
    target_arch = "aarch64"
)))]
pub fn prefetch_ahead(_data: &[u8], _current_offset: usize, _prefetch_distance: usize) {
    // No-op on unsupported platforms
}
190
191/// Get the system page size.
192#[cfg(unix)]
193fn page_size() -> usize {
194    unsafe { libc::sysconf(libc::_SC_PAGESIZE) as usize }
195}
196
197#[cfg(not(unix))]
198fn page_size() -> usize {
199    4096 // Common default
200}
201
202/// Iterator wrapper that adds prefetching to an existing iterator.
203///
204/// Wraps any byte-slice based iterator and adds proactive prefetching
205/// to reduce cache misses and page faults.
206pub struct PrefetchingIterator<'a, I> {
207    inner: I,
208    data: &'a [u8],
209    current_offset: usize,
210    prefetch_distance_bytes: usize,
211}
212
213impl<'a, I> PrefetchingIterator<'a, I> {
214    /// Create a new prefetching iterator.
215    ///
216    /// # Arguments
217    /// * `inner` - The underlying iterator
218    /// * `data` - The memory-mapped data being iterated
219    /// * `prefetch_distance` - Number of items to prefetch ahead
220    /// * `item_size` - Size of each item in bytes
221    pub fn new(inner: I, data: &'a [u8], prefetch_distance: usize, item_size: usize) -> Self {
222        Self {
223            inner,
224            data,
225            current_offset: 0,
226            prefetch_distance_bytes: prefetch_distance * item_size,
227        }
228    }
229
230    /// Set the current offset (for resuming iteration)
231    pub fn set_offset(&mut self, offset: usize) {
232        self.current_offset = offset;
233    }
234}
235
236impl<'a, I, T> Iterator for PrefetchingIterator<'a, I>
237where
238    I: Iterator<Item = T>,
239{
240    type Item = T;
241
242    fn next(&mut self) -> Option<Self::Item> {
243        // Prefetch ahead before reading current item
244        prefetch_ahead(self.data, self.current_offset, self.prefetch_distance_bytes);
245
246        match self.inner.next() {
247            Some(item) => {
248                self.current_offset += EDGE_SIZE; // Advance by one edge
249                Some(item)
250            }
251            None => None,
252        }
253    }
254}
255
/// Counters describing prefetch activity (handy when debugging or tuning).
///
/// The counters are plain public fields; the `record_*` methods are
/// convenience helpers for updating them.
#[derive(Debug, Default, Clone)]
pub struct PrefetchStats {
    /// How many madvise calls have been recorded
    pub madvise_calls: u64,
    /// How many prefetch intrinsic calls have been recorded
    pub prefetch_calls: u64,
    /// Sum of the byte counts passed to `record_madvise`
    pub bytes_advised: u64,
}

impl PrefetchStats {
    /// Build a tracker with every counter at zero.
    pub fn new() -> Self {
        Self {
            madvise_calls: 0,
            prefetch_calls: 0,
            bytes_advised: 0,
        }
    }

    /// Count one madvise call that covered `bytes` bytes.
    pub fn record_madvise(&mut self, bytes: usize) {
        let Self {
            madvise_calls,
            bytes_advised,
            ..
        } = self;
        *madvise_calls += 1;
        *bytes_advised += bytes as u64;
    }

    /// Count one prefetch intrinsic call.
    pub fn record_prefetch(&mut self) {
        let counter = &mut self.prefetch_calls;
        *counter += 1;
    }
}
284
#[cfg(test)]
mod tests {
    use super::*;

    /// The reported page size must be at least 4096 and a power of two.
    #[test]
    fn test_page_size() {
        let size = page_size();
        assert!(size >= 4096, "Page size too small: {}", size);
        assert!(size.is_power_of_two(), "Page size not power of 2: {}", size);
    }

    /// `advise_sequential` must accept in-range, out-of-range and empty
    /// requests without panicking.
    #[test]
    fn test_advise_sequential_bounds() {
        let region = vec![0u8; 4096 * 10]; // 10 pages

        let requests = [
            (0, region.len()),           // Normal case
            (4096, 8192),                // Partial range
            (region.len() + 1000, 1000), // Beyond bounds (should handle gracefully)
            (0, 0),                      // Zero length
        ];
        for (offset, length) in requests {
            advise_sequential(&region, offset, length);
        }
    }

    /// `prefetch_ahead` must tolerate targets inside, near the end of, and
    /// beyond the slice.
    #[test]
    fn test_prefetch_ahead_bounds() {
        let region = vec![0u8; 1024];

        let requests = [
            (0, 512),    // Normal case
            (900, 200),  // Near end
            (1000, 500), // Beyond bounds (should handle gracefully)
        ];
        for (offset, distance) in requests {
            prefetch_ahead(&region, offset, distance);
        }
    }

    /// Wrapping an iterator must not change the items it yields.
    #[test]
    fn test_prefetching_iterator() {
        let backing = vec![0u8; 128 * 100]; // 100 edges worth of data
        let source: Vec<i32> = (0..100).collect();

        let collected: Vec<i32> =
            PrefetchingIterator::new(source.into_iter(), &backing, PREFETCH_DISTANCE, EDGE_SIZE)
                .collect();

        assert_eq!(collected.len(), 100);
        assert_eq!(collected.first(), Some(&0));
        assert_eq!(collected.last(), Some(&99));
    }

    /// The `record_*` helpers must accumulate into the public counters.
    #[test]
    fn test_prefetch_stats() {
        let mut stats = PrefetchStats::new();

        for bytes in [4096, 8192] {
            stats.record_madvise(bytes);
        }
        for _ in 0..3 {
            stats.record_prefetch();
        }

        assert_eq!(stats.madvise_calls, 2);
        assert_eq!(stats.prefetch_calls, 3);
        assert_eq!(stats.bytes_advised, 4096 + 8192);
    }
}