1mod pattern;
4
5use std::collections::{HashMap, HashSet};
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7use std::sync::{Arc, Mutex};
8use std::thread::{self, JoinHandle};
9
10use crate::error::{NxsError, Result};
11
12pub use pattern::{AccessPattern, AccessPatternDetector, UPGRADE_SEQUENTIAL_THRESHOLD};
13
/// Default page size in bytes (64 KiB). Byte offsets are divided by the page size
/// to obtain page indices.
pub const DEFAULT_PAGE_SIZE: usize = 65_536;
/// Default page-cache capacity, counted in pages.
pub const DEFAULT_MAX_PAGES: usize = 256;
/// Default page-coalescing gap: consecutive requested pages whose indices differ by
/// at most this value are merged into one fetch, so 1 merges only adjacent pages.
pub const DEFAULT_COALESCE_GAP_PAGES: usize = 1;
/// Default number of upcoming records the sequential prefetcher asks the access
/// pattern detector to predict.
pub const DEFAULT_PREFETCH_DEPTH: usize = 4;
/// Eager-loading size limit in MiB (1024 * 1024 bytes); see `initial_strategy` for
/// how it is compared with the file size.
pub const EAGER_THRESHOLD_MB: usize = 10;
/// Size in MiB (1024 * 1024 bytes) above which a file starts with the lazy strategy.
pub const LAZY_THRESHOLD_MB: usize = 50;
20
/// Caller-supplied hint describing how a file is expected to be read.
///
/// Within this file only `Full` changes behaviour (it is checked by
/// `initial_strategy`); the other variants are accepted but not consulted here.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
// NOTE(review): the explicit `u8` discriminants presumably keep the values stable for
// bindings/serialisation — confirm before renumbering.
#[repr(u8)]
pub enum AccessHint {
    /// No hint supplied (the default).
    #[default]
    Unknown = 0,
    /// Sequential access expected.
    Sequential = 1,
    /// Random access expected.
    Random = 2,
    /// The whole file is expected to be read; small files may be loaded eagerly.
    Full = 3,
    /// Only part of the file is expected to be read.
    Partial = 4,
}
32
/// How aggressively the prefetch engine populates its page cache.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PrefetchStrategy {
    /// Only records page hits/misses on access; no speculative prefetch.
    Lazy,
    /// Prefetches predicted records while access looks sequential and may be
    /// upgraded to `Eager`.
    Adaptive,
    /// A background thread copies the row-data region into the cache.
    Eager,
}

impl PrefetchStrategy {
    /// Lower-case name of the strategy, as reported in [`CacheStats::strategy`].
    pub fn as_str(self) -> &'static str {
        match self {
            PrefetchStrategy::Eager => "eager",
            PrefetchStrategy::Adaptive => "adaptive",
            PrefetchStrategy::Lazy => "lazy",
        }
    }
}
50
51#[derive(Debug, Clone)]
53pub struct OpenOptions {
54 pub hint: AccessHint,
55 pub max_pages: usize,
56 pub page_size: usize,
57 pub coalesce_gap_pages: usize,
58 pub prefetch_depth: usize,
59}
60
61impl Default for OpenOptions {
62 fn default() -> Self {
63 Self {
64 hint: AccessHint::Unknown,
65 max_pages: DEFAULT_MAX_PAGES,
66 page_size: DEFAULT_PAGE_SIZE,
67 coalesce_gap_pages: DEFAULT_COALESCE_GAP_PAGES,
68 prefetch_depth: DEFAULT_PREFETCH_DEPTH,
69 }
70 }
71}
72
73impl OpenOptions {
74 pub fn new() -> Self {
75 Self::default()
76 }
77
78 pub fn hint(mut self, hint: AccessHint) -> Self {
79 self.hint = hint;
80 self
81 }
82
83 pub fn max_pages(mut self, max_pages: usize) -> Self {
84 self.max_pages = max_pages;
85 self
86 }
87
88 pub fn page_size(mut self, page_size: usize) -> Self {
89 self.page_size = page_size;
90 self
91 }
92
93 pub fn coalesce_gap_pages(mut self, gap: usize) -> Self {
94 self.coalesce_gap_pages = gap;
95 self
96 }
97
98 pub fn prefetch_depth(mut self, depth: usize) -> Self {
99 self.prefetch_depth = depth;
100 self
101 }
102
103 pub fn validate(&self) -> Result<()> {
104 if self.page_size == 0 {
105 return Err(NxsError::ParseError(
106 "prefetch page_size must be greater than 0".into(),
107 ));
108 }
109 Ok(())
110 }
111}
112
113pub fn initial_strategy(hint: AccessHint, file_size: usize) -> PrefetchStrategy {
114 let file_size_mb = file_size / (1024 * 1024);
115 if hint == AccessHint::Full && file_size_mb <= EAGER_THRESHOLD_MB {
116 PrefetchStrategy::Eager
117 } else if file_size_mb > LAZY_THRESHOLD_MB {
118 PrefetchStrategy::Lazy
119 } else {
120 PrefetchStrategy::Adaptive
121 }
122}
123
/// Locates the row-data region and returns it as `(offset, length)`.
///
/// The region always starts at byte 32 (presumably just past a fixed-size header —
/// TODO confirm) and ends at `tail_start`. When `tail_start` is not in
/// `33..=file_size`, the length is reported as 0.
pub fn row_data_sector(tail_start: usize, file_size: usize) -> (usize, usize) {
    const SECTOR_START: usize = 32;
    let tail_in_bounds = (SECTOR_START + 1..=file_size).contains(&tail_start);
    let length = if tail_in_bounds {
        tail_start - SECTOR_START
    } else {
        0
    };
    (SECTOR_START, length)
}
133
/// A run of pages fetched as one unit, plus the byte span it covers.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CoalescedRange {
    /// First page of the run (inclusive).
    pub page_start: u32,
    /// Last page of the run (inclusive).
    pub page_end: u32,
    /// Byte offset of `page_start`, i.e. `page_start * page_size`.
    pub byte_start: usize,
    /// Bytes covered: `(page_end - page_start + 1) * page_size`, unless shortened by
    /// [`clamp_ranges`].
    pub byte_length: usize,
}

/// Sorts and de-duplicates `indices`, then merges them into runs.
///
/// After sorting, an index joins the current run when it is at most `gap_pages`
/// past the run's last page; otherwise it starts a new run. With `gap_pages == 1`
/// only strictly adjacent pages merge, and with `gap_pages == 0` nothing merges. A
/// run covers every page from its first to its last index, including unrequested
/// pages inside a gap.
///
/// Runs whose byte offset or byte length would overflow `usize` are dropped.
pub fn coalesce_page_indices(
    indices: &[u32],
    gap_pages: usize,
    page_size: usize,
) -> Vec<CoalescedRange> {
    let mut uniq: Vec<u32> = indices.to_vec();
    uniq.sort_unstable();
    uniq.dedup();

    // Saturate rather than truncate: `gap_pages as u32` would wrap huge gaps to 0.
    let max_step = gap_pages.min(u32::MAX as usize) as u32;

    let mut spans: Vec<(u32, u32)> = Vec::new();
    for idx in uniq {
        if let Some(last) = spans.last_mut() {
            if idx.saturating_sub(last.1) <= max_step {
                last.1 = idx;
                continue;
            }
        }
        spans.push((idx, idx));
    }

    spans
        .into_iter()
        .filter_map(|(a, b)| {
            // Count pages in usize so a span over the whole u32 range does not overflow.
            let count = ((b - a) as usize).checked_add(1)?;
            let byte_start = (a as usize).checked_mul(page_size)?;
            let byte_length = count.checked_mul(page_size)?;
            Some(CoalescedRange {
                page_start: a,
                page_end: b,
                byte_start,
                byte_length,
            })
        })
        .collect()
}

/// Trims `ranges` to a file of `file_size` bytes.
///
/// Ranges that start at or past `file_size` are dropped. A range running past the
/// end is shortened to stop exactly at `file_size`, and its page indices are left
/// unchanged. Ranges with a zero byte length are dropped.
pub fn clamp_ranges(ranges: Vec<CoalescedRange>, file_size: usize) -> Vec<CoalescedRange> {
    ranges
        .into_iter()
        .filter_map(|mut r| {
            if r.byte_start >= file_size {
                return None;
            }
            // `file_size - byte_start` cannot overflow, unlike `byte_start + byte_length`,
            // so an oversized range is trimmed instead of being silently discarded.
            r.byte_length = r.byte_length.min(file_size - r.byte_start);
            (r.byte_length > 0).then_some(r)
        })
        .collect()
}
203
/// One cached page together with its LRU bookkeeping.
struct PageEntry {
    /// Page bytes exactly as handed to [`PageCache::set`].
    data: Vec<u8>,
    /// Logical-clock stamp of the latest `set` or successful `get`.
    last_used: u64,
    /// Pinned entries are never chosen for eviction.
    pinned: bool,
}

/// Bounded page cache keyed by page index, with least-recently-used eviction.
///
/// Recency is a logical clock that advances on every insert and every successful
/// lookup. Eviction skips pinned pages. A capacity of 0 disables the cache (`set`
/// always returns `false`).
pub struct PageCache {
    max_pages: usize,
    page_size: usize,
    pages: HashMap<u32, PageEntry>,
    clock: u64,
    hits: u64,
    misses: u64,
}

impl PageCache {
    /// Creates an empty cache for at most `max_pages` entries. `page_size` is only
    /// recorded and reported back by [`PageCache::page_size`].
    pub fn new(max_pages: usize, page_size: usize) -> Self {
        PageCache {
            pages: HashMap::new(),
            max_pages,
            page_size,
            clock: 0,
            hits: 0,
            misses: 0,
        }
    }

    /// Page size this cache was created with.
    pub fn page_size(&self) -> usize {
        self.page_size
    }

    /// Whether `page_index` is cached. Unlike [`PageCache::get`], this touches
    /// neither recency nor the hit counter.
    pub fn has(&self, page_index: u32) -> bool {
        self.pages.contains_key(&page_index)
    }

    /// Looks up a page. When found, it becomes most recently used and a hit is counted.
    ///
    /// A `None` result does not count a miss; callers record misses through
    /// [`PageCache::note_miss`].
    pub fn get(&mut self, page_index: u32) -> Option<&[u8]> {
        let PageCache {
            pages, clock, hits, ..
        } = self;
        let entry = pages.get_mut(&page_index)?;
        *clock = clock.saturating_add(1);
        *hits = hits.saturating_add(1);
        entry.last_used = *clock;
        Some(&entry.data[..])
    }

    /// Inserts or replaces a page and reports whether it was stored.
    ///
    /// Adding a new page to a full cache first evicts least-recently-used unpinned
    /// pages. If none can be evicted, or the capacity is 0, nothing is stored and
    /// `false` is returned. Replacing an existing page never evicts and overwrites
    /// its pin flag.
    pub fn set(&mut self, page_index: u32, data: Vec<u8>, pinned: bool) -> bool {
        if self.max_pages == 0 {
            return false;
        }
        let needs_slot = !self.pages.contains_key(&page_index);
        while needs_slot && self.pages.len() >= self.max_pages {
            if !self.evict_one() {
                return false;
            }
        }
        self.clock = self.clock.saturating_add(1);
        let entry = PageEntry {
            data,
            last_used: self.clock,
            pinned,
        };
        self.pages.insert(page_index, entry);
        true
    }

    /// Removes the unpinned page with the oldest stamp. Returns `false` if every
    /// page is pinned or the cache is empty.
    fn evict_one(&mut self) -> bool {
        let oldest = self
            .pages
            .iter()
            .filter(|(_, entry)| !entry.pinned)
            .min_by_key(|(_, entry)| entry.last_used)
            .map(|(&index, _)| index);
        match oldest {
            Some(index) => {
                self.pages.remove(&index);
                true
            }
            None => false,
        }
    }

    /// Pins every listed page that is currently cached; absent indices are ignored.
    pub fn pin_pages(&mut self, page_indices: &[u32]) {
        for index in page_indices {
            if let Some(entry) = self.pages.get_mut(index) {
                entry.pinned = true;
            }
        }
    }

    /// Clears the pin flag on every cached page.
    pub fn unpin_all(&mut self) {
        self.pages
            .values_mut()
            .for_each(|entry| entry.pinned = false);
    }

    /// Number of pages currently cached.
    pub fn pages_cached(&self) -> usize {
        self.pages.len()
    }

    /// Total bytes of page data currently cached.
    pub fn memory_used_bytes(&self) -> usize {
        self.pages
            .values()
            .fold(0, |total, entry| total + entry.data.len())
    }

    /// Number of successful [`PageCache::get`] lookups.
    pub fn hits(&self) -> u64 {
        self.hits
    }

    /// Number of misses recorded via [`PageCache::note_miss`].
    pub fn misses(&self) -> u64 {
        self.misses
    }

    /// Records one cache miss.
    pub fn note_miss(&mut self) {
        self.misses = self.misses.saturating_add(1);
    }
}
319
/// Snapshot of page-cache and prefetch counters, as built by
/// `PrefetchEngine::cache_stats`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CacheStats {
    /// Number of pages currently held in the cache.
    pub pages_cached: usize,
    /// Configured cache capacity in pages (`OpenOptions::max_pages`).
    pub pages_max: usize,
    /// Total bytes of cached page data.
    pub memory_used_bytes: usize,
    /// Successful cache lookups.
    pub cache_hits: u64,
    /// Cache misses recorded by the engine.
    pub cache_misses: u64,
    /// Coalesced range fetches issued (one per range, not per page).
    pub fetches_issued: u64,
    /// Column fetch count. `PrefetchEngine::cache_stats` always reports 0 here;
    /// presumably the columnar reader path fills it in — TODO confirm.
    pub column_fetches_issued: u64,
    /// Current strategy name (`PrefetchStrategy::as_str`).
    pub strategy: String,
    /// Detected access-pattern name (the detector pattern's `as_str`).
    pub pattern: String,
}
332
/// Coordination state for the one-shot eager background load.
///
/// `cancelled` and `complete` are `Arc`-shared so the worker thread can own clones
/// of them.
struct EagerState {
    /// Raised to ask the worker thread to stop early.
    cancelled: Arc<AtomicBool>,
    /// Raised once every page has been copied, or when there was nothing to load.
    complete: Arc<AtomicBool>,
    /// Ensures the load is started at most once.
    started: AtomicBool,
    /// Worker thread handle, taken by whoever joins it.
    join: Mutex<Option<JoinHandle<()>>>,
}

impl EagerState {
    /// Returns state with every flag cleared and no worker handle.
    fn new() -> Self {
        let shared_flag = || Arc::new(AtomicBool::new(false));
        EagerState {
            cancelled: shared_flag(),
            complete: shared_flag(),
            started: AtomicBool::new(false),
            join: Mutex::new(None),
        }
    }
}
350
/// Page cache plus the policy that decides when and what to prefetch into it.
pub struct PrefetchEngine {
    /// Page cache, shared with the eager worker thread.
    cache: Arc<Mutex<PageCache>>,
    /// Pages currently being copied into the cache; consulted before scheduling fetches.
    in_flight: Arc<Mutex<HashSet<u32>>>,
    /// Number of coalesced ranges fetched (viewport, speculative and eager).
    fetches_issued: Arc<AtomicU64>,
    /// Options the engine was created with.
    options: OpenOptions,
    /// Current strategy; after construction it is only ever changed from Adaptive to Eager.
    strategy: Mutex<PrefetchStrategy>,
    /// Classifies the observed record-index sequence.
    detector: Mutex<AccessPatternDetector>,
    /// File size in bytes given to `new`; used by the eager-upgrade size check.
    file_size: usize,
    /// State of the optional eager background load.
    eager: EagerState,
    /// When set, automatic prefetching (access tracking, eager start, speculation) is suppressed.
    paused: AtomicBool,
}
363
364impl PrefetchEngine {
365 pub fn new(options: OpenOptions, file_size: usize) -> Self {
366 let strategy = initial_strategy(options.hint, file_size);
367 Self {
368 cache: Arc::new(Mutex::new(PageCache::new(
369 options.max_pages,
370 options.page_size,
371 ))),
372 in_flight: Arc::new(Mutex::new(HashSet::new())),
373 fetches_issued: Arc::new(AtomicU64::new(0)),
374 options,
375 strategy: Mutex::new(strategy),
376 detector: Mutex::new(AccessPatternDetector::new()),
377 file_size,
378 eager: EagerState::new(),
379 paused: AtomicBool::new(false),
380 }
381 }
382
383 pub fn pause_prefetch(&self) {
384 self.paused.store(true, Ordering::Release);
385 }
386
387 pub fn resume_prefetch(&self) {
388 self.paused.store(false, Ordering::Release);
389 }
390
391 fn is_paused(&self) -> bool {
392 self.paused.load(Ordering::Acquire)
393 }
394
395 pub fn options(&self) -> &OpenOptions {
396 &self.options
397 }
398
399 pub fn strategy(&self) -> PrefetchStrategy {
400 *self.strategy.lock().expect("prefetch strategy lock")
401 }
402
403 pub fn is_eager(&self) -> bool {
404 self.strategy() == PrefetchStrategy::Eager && self.eager.complete.load(Ordering::Acquire)
405 }
406
407 pub fn cache_stats(&self) -> CacheStats {
408 let cache = self.cache.lock().expect("prefetch cache lock");
409 let detector = self.detector.lock().expect("prefetch detector lock");
410 CacheStats {
411 pages_cached: cache.pages_cached(),
412 pages_max: self.options.max_pages,
413 memory_used_bytes: cache.memory_used_bytes(),
414 cache_hits: cache.hits(),
415 cache_misses: cache.misses(),
416 fetches_issued: self.fetches_issued.load(Ordering::Relaxed),
417 column_fetches_issued: 0,
418 strategy: self.strategy().as_str().to_string(),
419 pattern: detector.pattern().as_str().to_string(),
420 }
421 }
422
423 pub fn start_eager_background(&self, data: Vec<u8>, tail_start: usize) {
425 if self.strategy() != PrefetchStrategy::Eager || self.is_paused() {
426 return;
427 }
428 if self.eager.started.swap(true, Ordering::AcqRel) {
429 return;
430 }
431 let (sector_start, sector_len) = row_data_sector(tail_start, data.len());
432 if sector_len == 0 {
433 self.eager.complete.store(true, Ordering::Release);
434 return;
435 }
436 let cancelled = Arc::clone(&self.eager.cancelled);
437 let complete = Arc::clone(&self.eager.complete);
438 let fetches = Arc::clone(&self.fetches_issued);
439 let page_size = self.options.page_size;
440 let gap = self.options.coalesce_gap_pages;
441 let cache = Arc::clone(&self.cache);
442 let in_flight = Arc::clone(&self.in_flight);
443
444 let handle = thread::spawn(move || {
445 if cancelled.load(Ordering::Acquire) {
446 return;
447 }
448 let end = sector_start.saturating_add(sector_len).min(data.len());
449 let first_page = sector_start / page_size;
450 let last_page = (end.saturating_sub(1)) / page_size;
451 let indices: Vec<u32> = (first_page..=last_page).map(|p| p as u32).collect();
452 let ranges = clamp_ranges(coalesce_page_indices(&indices, gap, page_size), data.len());
453 for range in ranges {
454 if cancelled.load(Ordering::Acquire) {
455 return;
456 }
457 fetches.fetch_add(1, Ordering::Relaxed);
458 for p in range.page_start..=range.page_end {
459 in_flight.lock().expect("in_flight").insert(p);
460 }
461 for p in range.page_start..=range.page_end {
462 if cancelled.load(Ordering::Acquire) {
463 return;
464 }
465 let p_usize = p as usize;
466 let byte_start = p_usize * page_size;
467 if byte_start >= data.len() {
468 in_flight.lock().expect("in_flight").remove(&p);
469 continue;
470 }
471 let byte_end = ((p_usize + 1) * page_size).min(data.len());
472 let page_data = data[byte_start..byte_end].to_vec();
473 cache.lock().expect("cache").set(p, page_data, false);
474 in_flight.lock().expect("in_flight").remove(&p);
475 }
476 }
477 complete.store(true, Ordering::Release);
478 });
479 *self.eager.join.lock().expect("eager join") = Some(handle);
480 }
481
482 pub fn warmup(&self) {
483 while !self.eager.complete.load(Ordering::Acquire)
484 && !self.eager.cancelled.load(Ordering::Acquire)
485 {
486 std::thread::yield_now();
487 }
488 if let Some(handle) = self.eager.join.lock().expect("eager join").take() {
489 let _ = handle.join();
490 }
491 }
492
493 pub fn on_access(&self, data: &[u8], tail_start: usize, record_count: usize, index: usize) {
494 if record_count == 0 || self.is_paused() {
495 return;
496 }
497 {
498 let mut detector = self.detector.lock().expect("prefetch detector lock");
499 detector.observe(index);
500 self.maybe_upgrade_to_eager(&detector, data, tail_start);
501 }
502 if self.is_eager() || self.strategy() == PrefetchStrategy::Eager {
503 return;
504 }
505 if let Some(off) = row_record_offset(data, tail_start, index) {
506 let page_index = (off / self.options.page_size) as u32;
507 self.touch_page(page_index);
508 }
509 if self.strategy() == PrefetchStrategy::Adaptive {
510 let detector = self.detector.lock().expect("prefetch detector lock");
511 if detector.pattern() == AccessPattern::Sequential {
512 self.speculative_prefetch(data, tail_start, record_count, &detector);
513 }
514 }
515 }
516
517 fn maybe_upgrade_to_eager(
518 &self,
519 detector: &AccessPatternDetector,
520 data: &[u8],
521 tail_start: usize,
522 ) {
523 let mut strategy = self.strategy.lock().expect("prefetch strategy lock");
524 if *strategy != PrefetchStrategy::Adaptive || self.is_paused() {
525 return;
526 }
527 if detector.pattern() != AccessPattern::Sequential {
528 return;
529 }
530 if detector.sequential_runs() < UPGRADE_SEQUENTIAL_THRESHOLD {
531 return;
532 }
533 let file_size_mb = self.file_size / (1024 * 1024);
534 if file_size_mb > EAGER_THRESHOLD_MB {
535 return;
536 }
537 *strategy = PrefetchStrategy::Eager;
538 drop(strategy);
539 self.start_eager_background(data.to_vec(), tail_start);
540 }
541
542 fn speculative_prefetch(
543 &self,
544 data: &[u8],
545 tail_start: usize,
546 record_count: usize,
547 detector: &AccessPatternDetector,
548 ) {
549 if self.is_paused() || self.eager.cancelled.load(Ordering::Acquire) {
550 return;
551 }
552 let depth = self.options.prefetch_depth;
553 let predicted = detector.predict_next(depth, record_count);
554 let page_size = self.options.page_size;
555 let mut page_indices: Vec<u32> = Vec::new();
556 for idx in predicted {
557 if let Some(off) = row_record_offset(data, tail_start, idx) {
558 page_indices.push((off / page_size) as u32);
559 }
560 }
561 page_indices.sort_unstable();
562 page_indices.dedup();
563 let needed: Vec<u32> = {
564 let cache = self.cache.lock().expect("prefetch cache lock");
565 let in_flight = self.in_flight.lock().expect("prefetch in_flight lock");
566 page_indices
567 .into_iter()
568 .filter(|&p| !cache.has(p) && !in_flight.contains(&p))
569 .collect()
570 };
571 if needed.is_empty() {
572 return;
573 }
574 let gap = self.options.coalesce_gap_pages;
575 let file_size = data.len();
576 let ranges = clamp_ranges(coalesce_page_indices(&needed, gap, page_size), file_size);
577 for range in ranges {
578 self.fetch_range(data, &range, page_size, file_size);
579 }
580 }
581
582 pub fn touch_page(&self, page_index: u32) {
583 if self.is_eager() {
584 return;
585 }
586 let mut cache = self.cache.lock().expect("prefetch cache lock");
587 if cache.get(page_index).is_none() {
588 cache.note_miss();
589 }
590 }
591
592 pub fn prefetch_viewport(
593 &self,
594 data: &[u8],
595 tail_start: usize,
596 record_count: usize,
597 start_index: usize,
598 end_index: usize,
599 ) {
600 if record_count == 0 || data.is_empty() {
601 return;
602 }
603 let start = start_index.min(record_count - 1);
604 let end = end_index.min(record_count - 1);
605 if start > end {
606 return;
607 }
608
609 let page_size = self.options.page_size;
610 let gap = self.options.coalesce_gap_pages;
611 let file_size = data.len();
612
613 let mut page_indices: Vec<u32> = Vec::new();
614 for i in start..=end {
615 if let Some(off) = row_record_offset(data, tail_start, i) {
616 page_indices.push((off / page_size) as u32);
617 }
618 }
619 page_indices.sort_unstable();
620 page_indices.dedup();
621
622 let pinned: Vec<u32> = page_indices.clone();
623
624 let needed: Vec<u32> = {
625 let cache = self.cache.lock().expect("prefetch cache lock");
626 let in_flight = self.in_flight.lock().expect("prefetch in_flight lock");
627 page_indices
628 .into_iter()
629 .filter(|&p| !cache.has(p) && !in_flight.contains(&p))
630 .collect()
631 };
632
633 let ranges = clamp_ranges(coalesce_page_indices(&needed, gap, page_size), file_size);
634
635 for range in ranges {
636 self.fetch_range(data, &range, page_size, file_size);
637 }
638
639 let mut cache = self.cache.lock().expect("prefetch cache lock");
640 cache.pin_pages(&pinned);
641 cache.unpin_all();
642 }
643
644 fn fetch_range(&self, data: &[u8], range: &CoalescedRange, page_size: usize, file_size: usize) {
645 {
646 let mut in_flight = self.in_flight.lock().expect("prefetch in_flight lock");
647 for p in range.page_start..=range.page_end {
648 in_flight.insert(p);
649 }
650 }
651
652 self.fetches_issued.fetch_add(1, Ordering::Relaxed);
653
654 for p in range.page_start..=range.page_end {
655 let p_usize = p as usize;
656 let byte_start = p_usize * page_size;
657 if byte_start >= file_size {
658 self.in_flight
659 .lock()
660 .expect("prefetch in_flight lock")
661 .remove(&p);
662 continue;
663 }
664 let byte_end = ((p_usize + 1) * page_size).min(file_size);
665 let page_data = data[byte_start..byte_end].to_vec();
666 let mut cache = self.cache.lock().expect("prefetch cache lock");
667 cache.set(p, page_data, false);
668 self.in_flight
669 .lock()
670 .expect("prefetch in_flight lock")
671 .remove(&p);
672 }
673 }
674}
675
676impl Drop for PrefetchEngine {
677 fn drop(&mut self) {
678 self.eager.cancelled.store(true, Ordering::Release);
679 if let Some(handle) = self.eager.join.lock().expect("eager join").take() {
680 let _ = handle.join();
681 }
682 }
683}
684
/// Reads the byte offset of record `index` from the row index starting at `tail_start`.
///
/// Index entries are 10 bytes long. The offset is the little-endian `u64` stored in
/// bytes 2..10 of the entry. Returns `None` if the entry extends past the end of
/// `data` or if computing its position overflows.
pub fn row_record_offset(data: &[u8], tail_start: usize, index: usize) -> Option<usize> {
    const ENTRY_LEN: usize = 10;
    const OFFSET_FIELD_START: usize = 2;

    let entry_start = index
        .checked_mul(ENTRY_LEN)
        .and_then(|relative| tail_start.checked_add(relative))?;
    let entry_end = entry_start.checked_add(ENTRY_LEN)?;
    let field = data.get(entry_start + OFFSET_FIELD_START..entry_end)?;
    let raw: [u8; 8] = field.try_into().ok()?;
    Some(u64::from_le_bytes(raw) as usize)
}
690
/// Returns the sorted, de-duplicated page indices holding records
/// `start_index..=end_index`.
///
/// `record_offset` maps a record index to its byte offset; records for which it
/// returns `None` are skipped. An empty range (`start_index > end_index`) yields an
/// empty vector.
pub fn page_indices_for_viewport(
    start_index: usize,
    end_index: usize,
    page_size: usize,
    record_offset: impl Fn(usize) -> Option<usize>,
) -> Vec<u32> {
    let mut pages: Vec<u32> = (start_index..=end_index)
        .filter_map(&record_offset)
        .map(|offset| (offset / page_size) as u32)
        .collect();
    pages.sort_unstable();
    pages.dedup();
    pages
}
707
#[cfg(test)]
mod tests {
    use super::*;
    use crate::query::{Layout, Reader};
    use crate::writer::{NxsWriter, Schema};

    /// Builds an `n`-record file whose `payload` strings are 4 KiB or more each, so
    /// records spread across many pages.
    fn make_sparse_nxb(n: usize) -> Vec<u8> {
        let schema = Schema::new(&["id", "payload"]);
        let mut w = NxsWriter::new(&schema);
        for i in 0..n {
            w.begin_object();
            w.write_i64(crate::writer::Slot(0), i as i64);
            // Vary the padding so record sizes (and page boundaries) are not uniform.
            let pad = format!("record-{i:04}-{}", "x".repeat(4096 + (i % 7) * 512));
            w.write_str(crate::writer::Slot(1), &pad);
            w.end_object();
        }
        w.finish()
    }

    /// Builds an `n`-record file with short `tag` strings (many records per page).
    fn make_compact_nxb(n: usize) -> Vec<u8> {
        let schema = Schema::new(&["id", "tag"]);
        let mut w = NxsWriter::new(&schema);
        for i in 0..n {
            w.begin_object();
            w.write_i64(crate::writer::Slot(0), i as i64);
            w.write_str(crate::writer::Slot(1), &format!("r{i}"));
            w.end_object();
        }
        w.finish()
    }

    /// A `Full` hint on a file within the eager size limit selects Eager at open.
    #[test]
    fn hint_full_small_file_eager_at_open() {
        let data = make_compact_nxb(200);
        assert!(data.len() <= EAGER_THRESHOLD_MB * 1024 * 1024);
        let opts = OpenOptions::new().hint(AccessHint::Full);
        let reader = Reader::with_options(&data, opts).unwrap();
        reader.warmup();
        assert_eq!(reader.cache_stats().strategy, "eager");
    }

    /// Reading records in order eventually upgrades the default strategy to Eager.
    #[test]
    fn sequential_upgrade_to_eager() {
        let data = make_compact_nxb(200);
        let reader = Reader::with_options(&data, OpenOptions::new()).unwrap();
        for i in 0..150 {
            let _ = reader.record(i);
        }
        reader.warmup();
        assert_eq!(reader.cache_stats().strategy, "eager");
        assert_eq!(reader.cache_stats().pattern, "sequential");
    }

    /// While paused, sequential reads issue no fetches; after resuming they do again.
    #[test]
    fn pause_stops_speculative_prefetch() {
        // Small pages and no coalescing so each predicted record needs its own fetch.
        let opts = OpenOptions::new()
            .page_size(4096)
            .coalesce_gap_pages(0)
            .prefetch_depth(4);
        let data = make_sparse_nxb(50);
        let reader = Reader::with_options(&data, opts).unwrap();
        for i in 0..21 {
            let _ = reader.record(i);
        }
        assert_eq!(reader.cache_stats().pattern, "sequential");
        let before = reader.cache_stats().fetches_issued;
        reader.pause_prefetch();
        for i in 21..26 {
            let _ = reader.record(i);
        }
        assert_eq!(reader.cache_stats().fetches_issued, before);
        reader.resume_prefetch();
        let _ = reader.record(26);
        assert!(reader.cache_stats().fetches_issued > before);
    }

    /// Conformance vector: closing a reader right after an eager open stays within
    /// 50 fetches. Skipped silently when the vector file is not present.
    // NOTE(review): the count is read before `drop`, so this does not observe fetches
    // issued after close.
    #[test]
    fn prefetch_cancel_conformance_vector() {
        let path = std::path::Path::new(env!("CARGO_MANIFEST_DIR"))
            .join("../conformance/prefetch/prefetch_cancel.nxb");
        let data = match std::fs::read(&path) {
            Ok(d) => d,
            Err(_) => return,
        };
        let opts = OpenOptions::new().hint(AccessHint::Full);
        let reader = Reader::with_options(&data, opts).unwrap();
        let issued_before_close = reader.cache_stats().fetches_issued;
        drop(reader);
        assert!(issued_before_close <= 50);
    }

    /// Dropping an eagerly-opened reader right away completes (the worker is
    /// cancelled and joined) with a bounded number of fetches.
    #[test]
    fn eager_cancel_on_close_no_extra_fetches() {
        let data = make_compact_nxb(500);
        let opts = OpenOptions::new().hint(AccessHint::Full);
        let reader = Reader::with_options(&data, opts).unwrap();
        let issued = reader.cache_stats().fetches_issued;
        drop(reader);
        assert!(issued <= 50);
    }

    /// With a gap of 1 only adjacent pages merge: [3,4], [6,7] and [12].
    #[test]
    fn coalesce_adjacent_pages() {
        let indices = vec![3, 4, 6, 7, 12];
        let ranges = coalesce_page_indices(&indices, 1, DEFAULT_PAGE_SIZE);
        assert_eq!(ranges.len(), 3);
    }

    /// Prefetching a viewport leaves at least one page in the cache.
    #[test]
    fn prefetch_viewport_populates_cache() {
        let data = make_sparse_nxb(50);
        let reader = Reader::with_options(&data, OpenOptions::new()).unwrap();
        reader.prefetch_viewport(0, 49).unwrap();
        assert!(reader.cache_stats().pages_cached > 0);
    }

    /// Conformance vector: prefetching a column twice issues one column fetch, and
    /// the column still sums correctly. Skipped when the vector file is not present.
    #[test]
    fn prefetch_columnar_fast_path_conformance_vector() {
        let path = std::path::Path::new(env!("CARGO_MANIFEST_DIR"))
            .join("../conformance/prefetch/prefetch_columnar_fast_path.nxb");
        let data = match std::fs::read(&path) {
            Ok(d) => d,
            Err(_) => return,
        };
        let reader = Reader::new(&data).unwrap();
        assert_eq!(reader.layout(), Layout::Columnar);
        reader.prefetch_column("score").unwrap();
        assert_eq!(reader.cache_stats().column_fetches_issued, 1);
        reader.prefetch_column("score").unwrap();
        assert_eq!(reader.cache_stats().column_fetches_issued, 1);
        // Expected sum of the vector's `score` column.
        let sum = reader.col_sum_f64("score").unwrap();
        assert!((sum - 2475.0).abs() < 1e-9);
    }

    /// Conformance vector: sequential reads upgrade to Eager and issue at least one
    /// fetch. Skipped when the vector file is not present.
    #[test]
    fn prefetch_sequential_upgrade_conformance_vector() {
        let path = std::path::Path::new(env!("CARGO_MANIFEST_DIR"))
            .join("../conformance/prefetch/prefetch_sequential_upgrade.nxb");
        let data = match std::fs::read(&path) {
            Ok(d) => d,
            Err(_) => return,
        };
        let reader = Reader::with_options(&data, OpenOptions::new()).unwrap();
        for i in 0..150 {
            let _ = reader.record(i);
        }
        reader.warmup();
        let stats = reader.cache_stats();
        assert_eq!(stats.strategy, "eager");
        assert_eq!(stats.pattern, "sequential");
        assert!(stats.fetches_issued >= 1);
    }

    /// A zero page size is rejected by `validate`.
    #[test]
    fn open_options_rejects_zero_page_size() {
        let opts = OpenOptions::new().page_size(0);
        assert!(opts.validate().is_err());
    }
}
867}