Skip to main content

nxs/prefetch/
mod.rs

1//! Adaptive prefetch — page cache, pattern detector, strategies (spec §4–§8.4).
2
3mod pattern;
4
5use std::collections::{HashMap, HashSet};
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7use std::sync::{Arc, Mutex};
8use std::thread::{self, JoinHandle};
9
10use crate::error::{NxsError, Result};
11
12pub use pattern::{AccessPattern, AccessPatternDetector, UPGRADE_SEQUENTIAL_THRESHOLD};
13
14pub const DEFAULT_PAGE_SIZE: usize = 65_536;
15pub const DEFAULT_MAX_PAGES: usize = 256;
16pub const DEFAULT_COALESCE_GAP_PAGES: usize = 1;
17pub const DEFAULT_PREFETCH_DEPTH: usize = 4;
18pub const EAGER_THRESHOLD_MB: usize = 10;
19pub const LAZY_THRESHOLD_MB: usize = 50;
20
/// Caller access hint at open time (advisory).
///
/// The hint feeds [`initial_strategy`]; in this module only `Full` changes the
/// outcome. Discriminants are fixed by `#[repr(u8)]`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
#[repr(u8)]
pub enum AccessHint {
    /// No hint supplied; this is the `Default` value.
    #[default]
    Unknown = 0,
    Sequential = 1,
    Random = 2,
    /// Together with a small enough file, selects the eager strategy at open time.
    Full = 3,
    Partial = 4,
}
32
/// Prefetch strategy (spec §5).
///
/// Chosen once by [`initial_strategy`]; an engine may later move from
/// `Adaptive` to `Eager` after enough sequential accesses on a small file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PrefetchStrategy {
    /// Selected when the file is larger than `LAZY_THRESHOLD_MB`.
    Lazy,
    /// Fallback when neither the eager nor the lazy condition applies.
    Adaptive,
    /// Selected for `AccessHint::Full` on files within `EAGER_THRESHOLD_MB`.
    Eager,
}
40
41impl PrefetchStrategy {
42    pub fn as_str(self) -> &'static str {
43        match self {
44            Self::Lazy => "lazy",
45            Self::Adaptive => "adaptive",
46            Self::Eager => "eager",
47        }
48    }
49}
50
/// Open-time prefetch configuration.
#[derive(Debug, Clone)]
pub struct OpenOptions {
    /// Advisory access hint used when choosing the initial strategy.
    pub hint: AccessHint,
    /// Capacity handed to the page cache; `0` makes every cache insert fail.
    pub max_pages: usize,
    /// Bytes per cache page; `validate` rejects `0`.
    pub page_size: usize,
    /// Gap (in pages) passed to `coalesce_page_indices` when merging fetches.
    pub coalesce_gap_pages: usize,
    /// Number of upcoming records requested from the pattern detector.
    pub prefetch_depth: usize,
}
60
61impl Default for OpenOptions {
62    fn default() -> Self {
63        Self {
64            hint: AccessHint::Unknown,
65            max_pages: DEFAULT_MAX_PAGES,
66            page_size: DEFAULT_PAGE_SIZE,
67            coalesce_gap_pages: DEFAULT_COALESCE_GAP_PAGES,
68            prefetch_depth: DEFAULT_PREFETCH_DEPTH,
69        }
70    }
71}
72
73impl OpenOptions {
74    pub fn new() -> Self {
75        Self::default()
76    }
77
78    pub fn hint(mut self, hint: AccessHint) -> Self {
79        self.hint = hint;
80        self
81    }
82
83    pub fn max_pages(mut self, max_pages: usize) -> Self {
84        self.max_pages = max_pages;
85        self
86    }
87
88    pub fn page_size(mut self, page_size: usize) -> Self {
89        self.page_size = page_size;
90        self
91    }
92
93    pub fn coalesce_gap_pages(mut self, gap: usize) -> Self {
94        self.coalesce_gap_pages = gap;
95        self
96    }
97
98    pub fn prefetch_depth(mut self, depth: usize) -> Self {
99        self.prefetch_depth = depth;
100        self
101    }
102
103    pub fn validate(&self) -> Result<()> {
104        if self.page_size == 0 {
105            return Err(NxsError::ParseError(
106                "prefetch page_size must be greater than 0".into(),
107            ));
108        }
109        Ok(())
110    }
111}
112
113pub fn initial_strategy(hint: AccessHint, file_size: usize) -> PrefetchStrategy {
114    let file_size_mb = file_size / (1024 * 1024);
115    if hint == AccessHint::Full && file_size_mb <= EAGER_THRESHOLD_MB {
116        PrefetchStrategy::Eager
117    } else if file_size_mb > LAZY_THRESHOLD_MB {
118        PrefetchStrategy::Lazy
119    } else {
120        PrefetchStrategy::Adaptive
121    }
122}
123
/// Row-layout data sector as `(start, len)`, i.e. the byte range `[start, start+len)`.
///
/// The sector always starts at byte 32 (presumably the fixed header size —
/// confirm against the format spec) and runs up to `tail_start`. The length is
/// `0` when `tail_start` is not strictly past the start or lies beyond
/// `file_size`.
pub fn row_data_sector(tail_start: usize, file_size: usize) -> (usize, usize) {
    const SECTOR_START: usize = 32;

    let tail_in_file = tail_start <= file_size;
    let len = match tail_start.checked_sub(SECTOR_START) {
        Some(span) if span > 0 && tail_in_file => span,
        _ => 0,
    };
    (SECTOR_START, len)
}
133
/// A coalesced byte range covering one or more page indices.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CoalescedRange {
    /// First page index in the span (inclusive).
    pub page_start: u32,
    /// Last page index in the span (inclusive).
    pub page_end: u32,
    /// Byte offset of `page_start`, i.e. `page_start * page_size`.
    pub byte_start: usize,
    /// Span length in bytes; `clamp_ranges` may shorten it to fit the file.
    pub byte_length: usize,
}
142
143pub fn coalesce_page_indices(
144    indices: &[u32],
145    gap_pages: usize,
146    page_size: usize,
147) -> Vec<CoalescedRange> {
148    if indices.is_empty() {
149        return Vec::new();
150    }
151    let mut uniq: Vec<u32> = indices.to_vec();
152    uniq.sort_unstable();
153    uniq.dedup();
154
155    let mut spans: Vec<(u32, u32)> = Vec::new();
156    let mut start = uniq[0];
157    let mut end = uniq[0];
158    for &idx in &uniq[1..] {
159        if idx.saturating_sub(end) <= gap_pages as u32 {
160            end = idx;
161        } else {
162            spans.push((start, end));
163            start = idx;
164            end = idx;
165        }
166    }
167    spans.push((start, end));
168
169    spans
170        .into_iter()
171        .filter_map(|(a, b)| {
172            let count = (b - a).checked_add(1)? as usize;
173            let byte_start = (a as usize).checked_mul(page_size)?;
174            let byte_length = count.checked_mul(page_size)?;
175            Some(CoalescedRange {
176                page_start: a,
177                page_end: b,
178                byte_start,
179                byte_length,
180            })
181        })
182        .collect()
183}
184
185pub fn clamp_ranges(ranges: Vec<CoalescedRange>, file_size: usize) -> Vec<CoalescedRange> {
186    ranges
187        .into_iter()
188        .filter_map(|mut r| {
189            if r.byte_start >= file_size {
190                return None;
191            }
192            let end = r.byte_start.checked_add(r.byte_length)?;
193            if end > file_size {
194                r.byte_length = file_size - r.byte_start;
195            }
196            if r.byte_length == 0 {
197                return None;
198            }
199            Some(r)
200        })
201        .collect()
202}
203
/// One cached page plus the bookkeeping used for eviction.
struct PageEntry {
    /// Page bytes as supplied to `PageCache::set`.
    data: Vec<u8>,
    /// Value of the cache clock when the entry was last inserted or read.
    last_used: u64,
    /// Pinned entries are skipped when choosing an eviction victim.
    pinned: bool,
}
209
/// Bounded page cache that evicts the least recently used unpinned page.
pub struct PageCache {
    /// Maximum number of entries; `0` disables inserts.
    max_pages: usize,
    /// Page size reported by `page_size()`; not enforced on inserted data.
    page_size: usize,
    /// Cached pages keyed by page index.
    pages: HashMap<u32, PageEntry>,
    /// Logical clock bumped on every successful `get` and every insert.
    clock: u64,
    /// Number of successful `get` calls.
    hits: u64,
    /// Number of misses reported through `note_miss`.
    misses: u64,
}
218
219impl PageCache {
220    pub fn new(max_pages: usize, page_size: usize) -> Self {
221        Self {
222            max_pages,
223            page_size,
224            pages: HashMap::new(),
225            clock: 0,
226            hits: 0,
227            misses: 0,
228        }
229    }
230
231    pub fn page_size(&self) -> usize {
232        self.page_size
233    }
234
235    pub fn has(&self, page_index: u32) -> bool {
236        self.pages.contains_key(&page_index)
237    }
238
239    pub fn get(&mut self, page_index: u32) -> Option<&[u8]> {
240        let entry = self.pages.get_mut(&page_index)?;
241        self.clock = self.clock.saturating_add(1);
242        entry.last_used = self.clock;
243        self.hits = self.hits.saturating_add(1);
244        Some(entry.data.as_slice())
245    }
246
247    pub fn set(&mut self, page_index: u32, data: Vec<u8>, pinned: bool) -> bool {
248        if self.max_pages == 0 {
249            return false;
250        }
251        if !self.pages.contains_key(&page_index) {
252            while self.pages.len() >= self.max_pages {
253                if !self.evict_one() {
254                    return false;
255                }
256            }
257        }
258        self.clock = self.clock.saturating_add(1);
259        self.pages.insert(
260            page_index,
261            PageEntry {
262                data,
263                last_used: self.clock,
264                pinned,
265            },
266        );
267        true
268    }
269
270    fn evict_one(&mut self) -> bool {
271        let victim = self
272            .pages
273            .iter()
274            .filter(|(_, e)| !e.pinned)
275            .min_by_key(|(_, e)| e.last_used)
276            .map(|(k, _)| *k);
277        if let Some(v) = victim {
278            self.pages.remove(&v);
279            true
280        } else {
281            false
282        }
283    }
284
285    pub fn pin_pages(&mut self, page_indices: &[u32]) {
286        for &p in page_indices {
287            if let Some(e) = self.pages.get_mut(&p) {
288                e.pinned = true;
289            }
290        }
291    }
292
293    pub fn unpin_all(&mut self) {
294        for e in self.pages.values_mut() {
295            e.pinned = false;
296        }
297    }
298
299    pub fn pages_cached(&self) -> usize {
300        self.pages.len()
301    }
302
303    pub fn memory_used_bytes(&self) -> usize {
304        self.pages.values().map(|e| e.data.len()).sum()
305    }
306
307    pub fn hits(&self) -> u64 {
308        self.hits
309    }
310
311    pub fn misses(&self) -> u64 {
312        self.misses
313    }
314
315    pub fn note_miss(&mut self) {
316        self.misses = self.misses.saturating_add(1);
317    }
318}
319
/// Point-in-time counters and labels describing a prefetch engine.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CacheStats {
    /// Pages currently held in the cache.
    pub pages_cached: usize,
    /// Configured cache capacity in pages.
    pub pages_max: usize,
    /// Sum of the byte lengths of all cached pages.
    pub memory_used_bytes: usize,
    /// Successful cache lookups.
    pub cache_hits: u64,
    /// Recorded cache misses.
    pub cache_misses: u64,
    /// Coalesced range fetches issued so far.
    pub fetches_issued: u64,
    /// Always `0` when produced by `PrefetchEngine::cache_stats`.
    pub column_fetches_issued: u64,
    /// Strategy label as returned by `PrefetchStrategy::as_str`.
    pub strategy: String,
    /// Access-pattern label reported by the pattern detector.
    pub pattern: String,
}
332
/// Shared flags and the worker handle for the eager background load.
struct EagerState {
    /// Set when the engine is dropped; the worker checks it between steps.
    cancelled: Arc<AtomicBool>,
    /// Set once the background load has finished (or had nothing to load).
    complete: Arc<AtomicBool>,
    /// Set by the first caller that gets to start the background load.
    started: AtomicBool,
    /// Join handle of the worker thread, taken by whoever joins it.
    join: Mutex<Option<JoinHandle<()>>>,
}
339
340impl EagerState {
341    fn new() -> Self {
342        Self {
343            cancelled: Arc::new(AtomicBool::new(false)),
344            complete: Arc::new(AtomicBool::new(false)),
345            started: AtomicBool::new(false),
346            join: Mutex::new(None),
347        }
348    }
349}
350
/// Per-reader prefetch engine (`Send + Sync`).
pub struct PrefetchEngine {
    /// Page cache, shared with the eager worker thread.
    cache: Arc<Mutex<PageCache>>,
    /// Page indices currently being loaded, shared with the eager worker.
    in_flight: Arc<Mutex<HashSet<u32>>>,
    /// Count of coalesced range fetches issued.
    fetches_issued: Arc<AtomicU64>,
    /// Configuration captured at construction.
    options: OpenOptions,
    /// Current strategy; may change from `Adaptive` to `Eager`.
    strategy: Mutex<PrefetchStrategy>,
    /// Access-pattern detector fed by `on_access`.
    detector: Mutex<AccessPatternDetector>,
    /// File size supplied at construction, used for the upgrade size check.
    file_size: usize,
    /// State of the eager background load.
    eager: EagerState,
    /// When set, `on_access` and eager start-up do nothing.
    paused: AtomicBool,
}
363
364impl PrefetchEngine {
365    pub fn new(options: OpenOptions, file_size: usize) -> Self {
366        let strategy = initial_strategy(options.hint, file_size);
367        Self {
368            cache: Arc::new(Mutex::new(PageCache::new(
369                options.max_pages,
370                options.page_size,
371            ))),
372            in_flight: Arc::new(Mutex::new(HashSet::new())),
373            fetches_issued: Arc::new(AtomicU64::new(0)),
374            options,
375            strategy: Mutex::new(strategy),
376            detector: Mutex::new(AccessPatternDetector::new()),
377            file_size,
378            eager: EagerState::new(),
379            paused: AtomicBool::new(false),
380        }
381    }
382
    /// Sets the paused flag; while set, `on_access` returns without doing anything
    /// and `start_eager_background` refuses to start.
    pub fn pause_prefetch(&self) {
        self.paused.store(true, Ordering::Release);
    }
386
    /// Clears the paused flag set by `pause_prefetch`.
    pub fn resume_prefetch(&self) {
        self.paused.store(false, Ordering::Release);
    }
390
    /// Reads the paused flag.
    fn is_paused(&self) -> bool {
        self.paused.load(Ordering::Acquire)
    }
394
    /// The configuration this engine was built with.
    pub fn options(&self) -> &OpenOptions {
        &self.options
    }
398
    /// Current strategy.
    ///
    /// # Panics
    /// Panics if the strategy mutex is poisoned.
    pub fn strategy(&self) -> PrefetchStrategy {
        *self.strategy.lock().expect("prefetch strategy lock")
    }
402
    /// True only when the strategy is `Eager` AND the background load has
    /// flagged completion; an eager load still in progress reports `false`.
    pub fn is_eager(&self) -> bool {
        self.strategy() == PrefetchStrategy::Eager && self.eager.complete.load(Ordering::Acquire)
    }
406
407    pub fn cache_stats(&self) -> CacheStats {
408        let cache = self.cache.lock().expect("prefetch cache lock");
409        let detector = self.detector.lock().expect("prefetch detector lock");
410        CacheStats {
411            pages_cached: cache.pages_cached(),
412            pages_max: self.options.max_pages,
413            memory_used_bytes: cache.memory_used_bytes(),
414            cache_hits: cache.hits(),
415            cache_misses: cache.misses(),
416            fetches_issued: self.fetches_issued.load(Ordering::Relaxed),
417            column_fetches_issued: 0,
418            strategy: self.strategy().as_str().to_string(),
419            pattern: detector.pattern().as_str().to_string(),
420        }
421    }
422
    /// Start eager background load of the row data sector (§7.3).
    ///
    /// Does nothing unless the strategy is `Eager` and prefetch is not paused,
    /// and only the first successful call starts work (guarded by `started`).
    /// `data` is the full file contents, moved into the worker thread;
    /// `tail_start` bounds the row data sector (see `row_data_sector`).
    ///
    /// The worker copies every page of the sector into the cache and sets
    /// `complete` when it reaches the end. If cancellation is observed it
    /// returns early without setting `complete`.
    ///
    /// # Panics
    /// Panics if the strategy or join mutex is poisoned. The worker thread
    /// panics on a poisoned cache / in-flight mutex, and divides by
    /// `page_size`, so a zero page size panics inside the worker.
    pub fn start_eager_background(&self, data: Vec<u8>, tail_start: usize) {
        if self.strategy() != PrefetchStrategy::Eager || self.is_paused() {
            return;
        }
        // `swap` returns the previous value: `true` means someone already started.
        if self.eager.started.swap(true, Ordering::AcqRel) {
            return;
        }
        let (sector_start, sector_len) = row_data_sector(tail_start, data.len());
        if sector_len == 0 {
            // Nothing to load; report completion right away.
            self.eager.complete.store(true, Ordering::Release);
            return;
        }
        // Clone the shared handles the worker needs; `self` is not captured.
        let cancelled = Arc::clone(&self.eager.cancelled);
        let complete = Arc::clone(&self.eager.complete);
        let fetches = Arc::clone(&self.fetches_issued);
        let page_size = self.options.page_size;
        let gap = self.options.coalesce_gap_pages;
        let cache = Arc::clone(&self.cache);
        let in_flight = Arc::clone(&self.in_flight);

        let handle = thread::spawn(move || {
            if cancelled.load(Ordering::Acquire) {
                return;
            }
            // Page span covering the sector, clamped to the data actually present.
            let end = sector_start.saturating_add(sector_len).min(data.len());
            let first_page = sector_start / page_size;
            let last_page = (end.saturating_sub(1)) / page_size;
            let indices: Vec<u32> = (first_page..=last_page).map(|p| p as u32).collect();
            let ranges = clamp_ranges(coalesce_page_indices(&indices, gap, page_size), data.len());
            for range in ranges {
                if cancelled.load(Ordering::Acquire) {
                    return;
                }
                // One coalesced range counts as one issued fetch.
                fetches.fetch_add(1, Ordering::Relaxed);
                // Mark the whole range in flight before copying any page.
                for p in range.page_start..=range.page_end {
                    in_flight.lock().expect("in_flight").insert(p);
                }
                for p in range.page_start..=range.page_end {
                    // NOTE(review): returning here leaves the remaining pages of
                    // this range marked in flight.
                    if cancelled.load(Ordering::Acquire) {
                        return;
                    }
                    let p_usize = p as usize;
                    let byte_start = p_usize * page_size;
                    if byte_start >= data.len() {
                        in_flight.lock().expect("in_flight").remove(&p);
                        continue;
                    }
                    // The final page may be shorter than `page_size`.
                    let byte_end = ((p_usize + 1) * page_size).min(data.len());
                    let page_data = data[byte_start..byte_end].to_vec();
                    cache.lock().expect("cache").set(p, page_data, false);
                    in_flight.lock().expect("in_flight").remove(&p);
                }
            }
            complete.store(true, Ordering::Release);
        });
        // Keep the handle so `warmup` / `Drop` can join the worker.
        *self.eager.join.lock().expect("eager join") = Some(handle);
    }
481
482    pub fn warmup(&self) {
483        while !self.eager.complete.load(Ordering::Acquire)
484            && !self.eager.cancelled.load(Ordering::Acquire)
485        {
486            std::thread::yield_now();
487        }
488        if let Some(handle) = self.eager.join.lock().expect("eager join").take() {
489            let _ = handle.join();
490        }
491    }
492
493    pub fn on_access(&self, data: &[u8], tail_start: usize, record_count: usize, index: usize) {
494        if record_count == 0 || self.is_paused() {
495            return;
496        }
497        {
498            let mut detector = self.detector.lock().expect("prefetch detector lock");
499            detector.observe(index);
500            self.maybe_upgrade_to_eager(&detector, data, tail_start);
501        }
502        if self.is_eager() || self.strategy() == PrefetchStrategy::Eager {
503            return;
504        }
505        if let Some(off) = row_record_offset(data, tail_start, index) {
506            let page_index = (off / self.options.page_size) as u32;
507            self.touch_page(page_index);
508        }
509        if self.strategy() == PrefetchStrategy::Adaptive {
510            let detector = self.detector.lock().expect("prefetch detector lock");
511            if detector.pattern() == AccessPattern::Sequential {
512                self.speculative_prefetch(data, tail_start, record_count, &detector);
513            }
514        }
515    }
516
517    fn maybe_upgrade_to_eager(
518        &self,
519        detector: &AccessPatternDetector,
520        data: &[u8],
521        tail_start: usize,
522    ) {
523        let mut strategy = self.strategy.lock().expect("prefetch strategy lock");
524        if *strategy != PrefetchStrategy::Adaptive || self.is_paused() {
525            return;
526        }
527        if detector.pattern() != AccessPattern::Sequential {
528            return;
529        }
530        if detector.sequential_runs() < UPGRADE_SEQUENTIAL_THRESHOLD {
531            return;
532        }
533        let file_size_mb = self.file_size / (1024 * 1024);
534        if file_size_mb > EAGER_THRESHOLD_MB {
535            return;
536        }
537        *strategy = PrefetchStrategy::Eager;
538        drop(strategy);
539        self.start_eager_background(data.to_vec(), tail_start);
540    }
541
542    fn speculative_prefetch(
543        &self,
544        data: &[u8],
545        tail_start: usize,
546        record_count: usize,
547        detector: &AccessPatternDetector,
548    ) {
549        if self.is_paused() || self.eager.cancelled.load(Ordering::Acquire) {
550            return;
551        }
552        let depth = self.options.prefetch_depth;
553        let predicted = detector.predict_next(depth, record_count);
554        let page_size = self.options.page_size;
555        let mut page_indices: Vec<u32> = Vec::new();
556        for idx in predicted {
557            if let Some(off) = row_record_offset(data, tail_start, idx) {
558                page_indices.push((off / page_size) as u32);
559            }
560        }
561        page_indices.sort_unstable();
562        page_indices.dedup();
563        let needed: Vec<u32> = {
564            let cache = self.cache.lock().expect("prefetch cache lock");
565            let in_flight = self.in_flight.lock().expect("prefetch in_flight lock");
566            page_indices
567                .into_iter()
568                .filter(|&p| !cache.has(p) && !in_flight.contains(&p))
569                .collect()
570        };
571        if needed.is_empty() {
572            return;
573        }
574        let gap = self.options.coalesce_gap_pages;
575        let file_size = data.len();
576        let ranges = clamp_ranges(coalesce_page_indices(&needed, gap, page_size), file_size);
577        for range in ranges {
578            self.fetch_range(data, &range, page_size, file_size);
579        }
580    }
581
582    pub fn touch_page(&self, page_index: u32) {
583        if self.is_eager() {
584            return;
585        }
586        let mut cache = self.cache.lock().expect("prefetch cache lock");
587        if cache.get(page_index).is_none() {
588            cache.note_miss();
589        }
590    }
591
592    pub fn prefetch_viewport(
593        &self,
594        data: &[u8],
595        tail_start: usize,
596        record_count: usize,
597        start_index: usize,
598        end_index: usize,
599    ) {
600        if record_count == 0 || data.is_empty() {
601            return;
602        }
603        let start = start_index.min(record_count - 1);
604        let end = end_index.min(record_count - 1);
605        if start > end {
606            return;
607        }
608
609        let page_size = self.options.page_size;
610        let gap = self.options.coalesce_gap_pages;
611        let file_size = data.len();
612
613        let mut page_indices: Vec<u32> = Vec::new();
614        for i in start..=end {
615            if let Some(off) = row_record_offset(data, tail_start, i) {
616                page_indices.push((off / page_size) as u32);
617            }
618        }
619        page_indices.sort_unstable();
620        page_indices.dedup();
621
622        let pinned: Vec<u32> = page_indices.clone();
623
624        let needed: Vec<u32> = {
625            let cache = self.cache.lock().expect("prefetch cache lock");
626            let in_flight = self.in_flight.lock().expect("prefetch in_flight lock");
627            page_indices
628                .into_iter()
629                .filter(|&p| !cache.has(p) && !in_flight.contains(&p))
630                .collect()
631        };
632
633        let ranges = clamp_ranges(coalesce_page_indices(&needed, gap, page_size), file_size);
634
635        for range in ranges {
636            self.fetch_range(data, &range, page_size, file_size);
637        }
638
639        let mut cache = self.cache.lock().expect("prefetch cache lock");
640        cache.pin_pages(&pinned);
641        cache.unpin_all();
642    }
643
644    fn fetch_range(&self, data: &[u8], range: &CoalescedRange, page_size: usize, file_size: usize) {
645        {
646            let mut in_flight = self.in_flight.lock().expect("prefetch in_flight lock");
647            for p in range.page_start..=range.page_end {
648                in_flight.insert(p);
649            }
650        }
651
652        self.fetches_issued.fetch_add(1, Ordering::Relaxed);
653
654        for p in range.page_start..=range.page_end {
655            let p_usize = p as usize;
656            let byte_start = p_usize * page_size;
657            if byte_start >= file_size {
658                self.in_flight
659                    .lock()
660                    .expect("prefetch in_flight lock")
661                    .remove(&p);
662                continue;
663            }
664            let byte_end = ((p_usize + 1) * page_size).min(file_size);
665            let page_data = data[byte_start..byte_end].to_vec();
666            let mut cache = self.cache.lock().expect("prefetch cache lock");
667            cache.set(p, page_data, false);
668            self.in_flight
669                .lock()
670                .expect("prefetch in_flight lock")
671                .remove(&p);
672        }
673    }
674}
675
676impl Drop for PrefetchEngine {
677    fn drop(&mut self) {
678        self.eager.cancelled.store(true, Ordering::Release);
679        if let Some(handle) = self.eager.join.lock().expect("eager join").take() {
680            let _ = handle.join();
681        }
682    }
683}
684
/// Reads the byte offset of record `index` from the tail index.
///
/// Each tail entry is 10 bytes starting at `tail_start + index * 10`; the
/// offset is the little-endian `u64` stored in the last 8 bytes of the entry.
/// Returns `None` when the arithmetic overflows or the entry does not fit
/// inside `data`.
pub fn row_record_offset(data: &[u8], tail_start: usize, index: usize) -> Option<usize> {
    const ENTRY_LEN: usize = 10;

    let relative = index.checked_mul(ENTRY_LEN)?;
    let entry = tail_start.checked_add(relative)?;
    let end = entry.checked_add(ENTRY_LEN)?;

    // `entry + 2` cannot overflow because `entry + 10` did not.
    let raw = data.get(entry + 2..end)?;
    let mut le = [0u8; 8];
    le.copy_from_slice(raw);
    Some(u64::from_le_bytes(le) as usize)
}
690
/// Sorted, de-duplicated page indices touched by records `start_index..=end_index`.
///
/// `record_offset` maps a record index to its byte offset; records for which
/// it returns `None` are skipped. An inverted range yields an empty vector.
///
/// # Panics
/// Panics on division by zero if `page_size` is 0 and any offset is resolved.
pub fn page_indices_for_viewport(
    start_index: usize,
    end_index: usize,
    page_size: usize,
    record_offset: impl Fn(usize) -> Option<usize>,
) -> Vec<u32> {
    let mut pages: Vec<u32> = (start_index..=end_index)
        .filter_map(|record| record_offset(record))
        .map(|offset| (offset / page_size) as u32)
        .collect();
    pages.sort_unstable();
    pages.dedup();
    pages
}
707
#[cfg(test)]
mod tests {
    use super::*;
    use crate::query::{Layout, Reader};
    use crate::writer::{NxsWriter, Schema};

    /// Builds a file of `n` records whose string payload is 4 KiB plus a
    /// varying amount of padding.
    fn make_sparse_nxb(n: usize) -> Vec<u8> {
        let schema = Schema::new(&["id", "payload"]);
        let mut w = NxsWriter::new(&schema);
        for i in 0..n {
            w.begin_object();
            w.write_i64(crate::writer::Slot(0), i as i64);
            let pad = format!("record-{i:04}-{}", "x".repeat(4096 + (i % 7) * 512));
            w.write_str(crate::writer::Slot(1), &pad);
            w.end_object();
        }
        w.finish()
    }

    /// Builds a file of `n` small records (an id and a short tag).
    fn make_compact_nxb(n: usize) -> Vec<u8> {
        let schema = Schema::new(&["id", "tag"]);
        let mut w = NxsWriter::new(&schema);
        for i in 0..n {
            w.begin_object();
            w.write_i64(crate::writer::Slot(0), i as i64);
            w.write_str(crate::writer::Slot(1), &format!("r{i}"));
            w.end_object();
        }
        w.finish()
    }

    // A `Full` hint on a file within the eager threshold reports "eager".
    #[test]
    fn hint_full_small_file_eager_at_open() {
        let data = make_compact_nxb(200);
        assert!(data.len() <= EAGER_THRESHOLD_MB * 1024 * 1024);
        let opts = OpenOptions::new().hint(AccessHint::Full);
        let reader = Reader::with_options(&data, opts).unwrap();
        reader.warmup();
        assert_eq!(reader.cache_stats().strategy, "eager");
    }

    // 150 in-order reads with default options end in the eager strategy.
    #[test]
    fn sequential_upgrade_to_eager() {
        let data = make_compact_nxb(200);
        let reader = Reader::with_options(&data, OpenOptions::new()).unwrap();
        for i in 0..150 {
            let _ = reader.record(i);
        }
        reader.warmup();
        assert_eq!(reader.cache_stats().strategy, "eager");
        assert_eq!(reader.cache_stats().pattern, "sequential");
    }

    // While paused, reads issue no fetches; after resuming, the next read does.
    #[test]
    fn pause_stops_speculative_prefetch() {
        // Small pages so speculative windows do not overlap after pause/resume.
        let opts = OpenOptions::new()
            .page_size(4096)
            .coalesce_gap_pages(0)
            .prefetch_depth(4);
        let data = make_sparse_nxb(50);
        let reader = Reader::with_options(&data, opts).unwrap();
        for i in 0..21 {
            let _ = reader.record(i);
        }
        assert_eq!(reader.cache_stats().pattern, "sequential");
        let before = reader.cache_stats().fetches_issued;
        reader.pause_prefetch();
        for i in 21..26 {
            let _ = reader.record(i);
        }
        assert_eq!(reader.cache_stats().fetches_issued, before);
        reader.resume_prefetch();
        let _ = reader.record(26);
        assert!(reader.cache_stats().fetches_issued > before);
    }

    // Conformance vector; the test returns early (and passes) when the file
    // cannot be read.
    #[test]
    fn prefetch_cancel_conformance_vector() {
        let path = std::path::Path::new(env!("CARGO_MANIFEST_DIR"))
            .join("../conformance/prefetch/prefetch_cancel.nxb");
        let data = match std::fs::read(&path) {
            Ok(d) => d,
            Err(_) => return,
        };
        let opts = OpenOptions::new().hint(AccessHint::Full);
        let reader = Reader::with_options(&data, opts).unwrap();
        // The count is sampled before the reader is dropped.
        let issued_before_close = reader.cache_stats().fetches_issued;
        drop(reader);
        assert!(issued_before_close <= 50);
    }

    // Dropping a reader opened with a `Full` hint; the fetch count sampled
    // before the drop stays within the bound.
    #[test]
    fn eager_cancel_on_close_no_extra_fetches() {
        let data = make_compact_nxb(500);
        let opts = OpenOptions::new().hint(AccessHint::Full);
        let reader = Reader::with_options(&data, opts).unwrap();
        let issued = reader.cache_stats().fetches_issued;
        drop(reader);
        assert!(issued <= 50);
    }

    // With a gap of 1, [3,4], [6,7] and [12] stay separate spans.
    #[test]
    fn coalesce_adjacent_pages() {
        let indices = vec![3, 4, 6, 7, 12];
        let ranges = coalesce_page_indices(&indices, 1, DEFAULT_PAGE_SIZE);
        assert_eq!(ranges.len(), 3);
    }

    // Prefetching the whole record range leaves at least one page cached.
    #[test]
    fn prefetch_viewport_populates_cache() {
        let data = make_sparse_nxb(50);
        let reader = Reader::with_options(&data, OpenOptions::new()).unwrap();
        reader.prefetch_viewport(0, 49).unwrap();
        assert!(reader.cache_stats().pages_cached > 0);
    }

    // Conformance vector; returns early when the file cannot be read.
    // A second `prefetch_column` for the same column issues no further fetch.
    #[test]
    fn prefetch_columnar_fast_path_conformance_vector() {
        let path = std::path::Path::new(env!("CARGO_MANIFEST_DIR"))
            .join("../conformance/prefetch/prefetch_columnar_fast_path.nxb");
        let data = match std::fs::read(&path) {
            Ok(d) => d,
            Err(_) => return,
        };
        let reader = Reader::new(&data).unwrap();
        assert_eq!(reader.layout(), Layout::Columnar);
        reader.prefetch_column("score").unwrap();
        assert_eq!(reader.cache_stats().column_fetches_issued, 1);
        reader.prefetch_column("score").unwrap();
        assert_eq!(reader.cache_stats().column_fetches_issued, 1);
        let sum = reader.col_sum_f64("score").unwrap();
        assert!((sum - 2475.0).abs() < 1e-9);
    }

    // Conformance vector; returns early when the file cannot be read.
    #[test]
    fn prefetch_sequential_upgrade_conformance_vector() {
        let path = std::path::Path::new(env!("CARGO_MANIFEST_DIR"))
            .join("../conformance/prefetch/prefetch_sequential_upgrade.nxb");
        let data = match std::fs::read(&path) {
            Ok(d) => d,
            Err(_) => return,
        };
        let reader = Reader::with_options(&data, OpenOptions::new()).unwrap();
        for i in 0..150 {
            let _ = reader.record(i);
        }
        reader.warmup();
        let stats = reader.cache_stats();
        assert_eq!(stats.strategy, "eager");
        assert_eq!(stats.pattern, "sequential");
        assert!(stats.fetches_issued >= 1);
    }

    // `validate` rejects a zero page size.
    #[test]
    fn open_options_rejects_zero_page_size() {
        let opts = OpenOptions::new().page_size(0);
        assert!(opts.validate().is_err());
    }
}