Skip to main content

sochdb_storage/
prefetch.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Memory Prefetching for Range Scans
19//!
20//! Implements proactive prefetching to eliminate page fault latency during
21//! sequential scans of memory-mapped data.
22//!
23//! ## jj.md Task 7: Prefetching for Range Scans
24//!
25//! Goals:
26//! - Eliminate page fault latency during scans (~50μs per fault)
27//! - 2-3x faster cold range scans
28//! - Better utilization of I/O bandwidth
29//!
30//! ## Implementation
31//!
32//! Uses platform-specific prefetching:
//! - Unix: `madvise` with `MADV_SEQUENTIAL` and `MADV_WILLNEED` advice
34//! - x86_64: `_mm_prefetch` intrinsic for CPU cache prefetching
35//!
36//! For 1M edge scan with 4KB pages (32 edges/page):
37//! - Without prefetch: ~31,250 page faults × 50μs = 1.56 seconds stalled
38//! - With prefetch: Near-zero stall time
39
/// Size in bytes of one edge record (fixed-width format).
pub const EDGE_SIZE: usize = 128;

/// How many edges ahead of the read position to prefetch during iteration.
///
/// 16 edges × 128 bytes = 2048 bytes (2 KiB), i.e. half of a 4 KiB page,
/// a window chosen to sit well within the L1 cache.
pub const PREFETCH_DISTANCE: usize = 16;
46
47/// Advise the OS about memory access patterns for a range scan.
48///
49/// This should be called before starting a sequential scan over memory-mapped
50/// data to enable read-ahead and sequential prefetching.
51///
52/// # Arguments
53/// * `data` - The memory-mapped region that will be scanned
54/// * `start_offset` - Starting offset within the region
55/// * `length` - Expected length of the scan
56///
57/// # Platform Support
58/// - Unix: Uses `madvise` with `MADV_SEQUENTIAL` and `MADV_WILLNEED`
59/// - Other platforms: No-op (relies on OS default behavior)
60#[cfg(unix)]
61pub fn advise_sequential(data: &[u8], start_offset: usize, length: usize) {
62    use std::cmp::min;
63
64    // Validate bounds
65    let start = min(start_offset, data.len());
66    let end = min(start + length, data.len());
67
68    if end <= start {
69        return;
70    }
71
72    // Align to page boundaries (typically 4KB)
73    let page_size = page_size();
74    let aligned_start = (start / page_size) * page_size;
75    let aligned_length = (end - aligned_start).div_ceil(page_size) * page_size;
76
77    unsafe {
78        let ptr = data.as_ptr().add(aligned_start) as *mut libc::c_void;
79
80        // MADV_SEQUENTIAL: Expect sequential access, enable aggressive read-ahead
81        // MADV_WILLNEED: Bring pages into memory proactively
82        let advice = libc::MADV_SEQUENTIAL | libc::MADV_WILLNEED;
83
84        let result = libc::madvise(ptr, aligned_length, advice);
85        if result != 0 {
86            // Non-fatal: log but continue
87            #[cfg(debug_assertions)]
88            eprintln!(
89                "madvise failed: {} (continuing without prefetch)",
90                std::io::Error::last_os_error()
91            );
92        }
93    }
94}
95
96#[cfg(not(unix))]
97pub fn advise_sequential(_data: &[u8], _start_offset: usize, _length: usize) {
98    // No-op on non-Unix platforms
99}
100
101/// Advise the OS that a memory region will be accessed randomly.
102///
103/// This disables sequential read-ahead for point lookups.
104#[cfg(unix)]
105pub fn advise_random(data: &[u8]) {
106    unsafe {
107        let ptr = data.as_ptr() as *mut libc::c_void;
108        let result = libc::madvise(ptr, data.len(), libc::MADV_RANDOM);
109        if result != 0 {
110            #[cfg(debug_assertions)]
111            eprintln!("madvise RANDOM failed: {}", std::io::Error::last_os_error());
112        }
113    }
114}
115
116#[cfg(not(unix))]
117pub fn advise_random(_data: &[u8]) {
118    // No-op on non-Unix platforms
119}
120
121/// Advise the OS that a memory region is no longer needed.
122///
123/// Allows the OS to free pages if memory pressure is high.
124#[cfg(unix)]
125pub fn advise_dontneed(data: &[u8], start_offset: usize, length: usize) {
126    use std::cmp::min;
127
128    let start = min(start_offset, data.len());
129    let end = min(start + length, data.len());
130
131    if end <= start {
132        return;
133    }
134
135    let page_size = page_size();
136    let aligned_start = (start / page_size) * page_size;
137    let aligned_length = (end - aligned_start).div_ceil(page_size) * page_size;
138
139    unsafe {
140        let ptr = data.as_ptr().add(aligned_start) as *mut libc::c_void;
141        // MADV_DONTNEED: We're done with this memory
142        libc::madvise(ptr, aligned_length, libc::MADV_DONTNEED);
143    }
144}
145
146#[cfg(not(unix))]
147pub fn advise_dontneed(_data: &[u8], _start_offset: usize, _length: usize) {
148    // No-op on non-Unix platforms
149}
150
/// Prefetch data into CPU cache during iteration.
///
/// On x86_64 with SSE this issues an `_mm_prefetch` hint (`_MM_HINT_T0`) for
/// the byte at `current_offset + prefetch_distance`, asking the CPU to pull
/// the containing cache line towards the L1 cache ahead of the read.
///
/// # Arguments
/// * `data` - The memory region
/// * `current_offset` - Current read position, in bytes
/// * `prefetch_distance` - Number of bytes ahead to prefetch
///
/// # Safety
/// Safe to call with any offset - bounds are checked internally. A target
/// that falls outside `data`, or whose computation would overflow `usize`,
/// is silently skipped.
#[cfg(all(target_arch = "x86_64", target_feature = "sse"))]
pub fn prefetch_ahead(data: &[u8], current_offset: usize, prefetch_distance: usize) {
    use std::arch::x86_64::{_MM_HINT_T0, _mm_prefetch};

    // `checked_add` turns an overflowing sum into "nothing to prefetch"
    // instead of a debug-build panic or a wrapped offset in release builds;
    // `get` performs the bounds check.
    let target = current_offset
        .checked_add(prefetch_distance)
        .and_then(|offset| data.get(offset));

    if let Some(byte) = target {
        // SAFETY: the pointer comes from a valid reference into `data`, and a
        // prefetch is only a hint that does not access memory architecturally.
        unsafe {
            _mm_prefetch(
                (byte as *const u8).cast::<i8>(),
                _MM_HINT_T0, // Prefetch to L1 cache
            );
        }
    }
}

/// Prefetch data into CPU cache during iteration.
///
/// No-op on targets other than x86_64 with SSE; all arguments are ignored.
///
/// This includes aarch64: its prefetch intrinsic is currently unstable in
/// Rust, so this falls back to a no-op until stabilized (issue #117217).
/// The OS-level madvise still provides significant benefits.
#[cfg(not(all(target_arch = "x86_64", target_feature = "sse")))]
pub fn prefetch_ahead(_data: &[u8], _current_offset: usize, _prefetch_distance: usize) {
    // No-op on unsupported platforms
}
193
194/// Get the system page size.
195#[cfg(unix)]
196fn page_size() -> usize {
197    unsafe { libc::sysconf(libc::_SC_PAGESIZE) as usize }
198}
199
200#[cfg(not(unix))]
201fn page_size() -> usize {
202    4096 // Common default
203}
204
205/// Iterator wrapper that adds prefetching to an existing iterator.
206///
207/// Wraps any byte-slice based iterator and adds proactive prefetching
208/// to reduce cache misses and page faults.
209pub struct PrefetchingIterator<'a, I> {
210    inner: I,
211    data: &'a [u8],
212    current_offset: usize,
213    prefetch_distance_bytes: usize,
214}
215
216impl<'a, I> PrefetchingIterator<'a, I> {
217    /// Create a new prefetching iterator.
218    ///
219    /// # Arguments
220    /// * `inner` - The underlying iterator
221    /// * `data` - The memory-mapped data being iterated
222    /// * `prefetch_distance` - Number of items to prefetch ahead
223    /// * `item_size` - Size of each item in bytes
224    pub fn new(inner: I, data: &'a [u8], prefetch_distance: usize, item_size: usize) -> Self {
225        Self {
226            inner,
227            data,
228            current_offset: 0,
229            prefetch_distance_bytes: prefetch_distance * item_size,
230        }
231    }
232
233    /// Set the current offset (for resuming iteration)
234    pub fn set_offset(&mut self, offset: usize) {
235        self.current_offset = offset;
236    }
237}
238
239impl<'a, I, T> Iterator for PrefetchingIterator<'a, I>
240where
241    I: Iterator<Item = T>,
242{
243    type Item = T;
244
245    fn next(&mut self) -> Option<Self::Item> {
246        // Prefetch ahead before reading current item
247        prefetch_ahead(self.data, self.current_offset, self.prefetch_distance_bytes);
248
249        match self.inner.next() {
250            Some(item) => {
251                self.current_offset += EDGE_SIZE; // Advance by one edge
252                Some(item)
253            }
254            None => None,
255        }
256    }
257}
258
/// Counters describing prefetch activity (useful for debugging/tuning).
///
/// The counters only change through the `record_*` methods (or direct field
/// writes); nothing in this type observes real prefetch calls by itself.
#[derive(Clone, Debug, Default)]
pub struct PrefetchStats {
    /// Number of madvise calls recorded
    pub madvise_calls: u64,
    /// Number of prefetch intrinsic calls recorded
    pub prefetch_calls: u64,
    /// Sum of the byte counts passed to `record_madvise`
    pub bytes_advised: u64,
}

impl PrefetchStats {
    /// Create a stats tracker with every counter at zero.
    pub fn new() -> Self {
        Self {
            madvise_calls: 0,
            prefetch_calls: 0,
            bytes_advised: 0,
        }
    }

    /// Record one madvise call that covered `bytes` bytes.
    pub fn record_madvise(&mut self, bytes: usize) {
        let advised = bytes as u64;
        self.madvise_calls += 1;
        self.bytes_advised += advised;
    }

    /// Record one prefetch intrinsic call.
    pub fn record_prefetch(&mut self) {
        self.prefetch_calls += 1;
    }
}
287
#[cfg(test)]
mod tests {
    use super::*;

    /// The reported page size is at least 4 KiB and a power of two.
    #[test]
    fn test_page_size() {
        let ps = page_size();
        assert!(ps >= 4096, "Page size too small: {}", ps);
        assert!(ps.is_power_of_two(), "Page size not power of 2: {}", ps);
    }

    /// `advise_sequential` accepts in-range, out-of-range and empty windows
    /// without panicking.
    #[test]
    fn test_advise_sequential_bounds() {
        let data = vec![0u8; 4096 * 10]; // 10 pages
        let total = data.len();

        // (start_offset, length) pairs
        let windows = [
            (0, total),           // Normal case
            (4096, 8192),         // Partial range
            (total + 1000, 1000), // Beyond bounds (should handle gracefully)
            (0, 0),               // Zero length
        ];

        for (start_offset, length) in windows {
            advise_sequential(&data, start_offset, length);
        }
    }

    /// `prefetch_ahead` accepts targets inside and outside the buffer without
    /// panicking.
    #[test]
    fn test_prefetch_ahead_bounds() {
        let data = vec![0u8; 1024];

        // (current_offset, prefetch_distance) pairs
        let probes = [
            (0, 512),    // Normal case
            (900, 200),  // Near end
            (1000, 500), // Beyond bounds (should handle gracefully)
        ];

        for (current_offset, prefetch_distance) in probes {
            prefetch_ahead(&data, current_offset, prefetch_distance);
        }
    }

    /// Wrapping an iterator yields the same number of items with the same
    /// first and last values.
    #[test]
    fn test_prefetching_iterator() {
        let data = vec![0u8; 128 * 100]; // 100 edges worth of data
        let items = (0..100).collect::<Vec<i32>>();

        let collected: Vec<i32> =
            PrefetchingIterator::new(items.into_iter(), &data, PREFETCH_DISTANCE, EDGE_SIZE)
                .collect();

        assert_eq!(collected.len(), 100);
        assert_eq!(collected.first(), Some(&0));
        assert_eq!(collected.last(), Some(&99));
    }

    /// The `record_*` methods accumulate into the public counters.
    #[test]
    fn test_prefetch_stats() {
        let mut stats = PrefetchStats::new();

        for bytes in [4096, 8192] {
            stats.record_madvise(bytes);
        }
        for _ in 0..3 {
            stats.record_prefetch();
        }

        assert_eq!(stats.madvise_calls, 2);
        assert_eq!(stats.prefetch_calls, 3);
        assert_eq!(stats.bytes_advised, 4096 + 8192);
    }
}