1use std::collections::HashMap;
7use std::io;
8use std::sync::atomic::{AtomicU64 as Epoch, Ordering as O};
9use std::sync::{Arc, Mutex, MutexGuard};
10
/// Smallest read-ahead window (512 KiB). A non-sequential miss resets the
/// window to this size (clamped to the stream's cap), and the per-stream cap
/// never drops below it while the pool is enabled.
pub const WINDOW_FLOOR: u64 = 1 << 19;
/// Hard upper bound (8 MiB) on the per-stream cap, however large the budget.
pub const WINDOW_ABS_CAP: u64 = 8 << 20;

/// The per-stream cap is the pool budget divided by this value, then clamped
/// to `[WINDOW_FLOOR, WINDOW_ABS_CAP]`.
const PER_STREAM_DIVISOR: u64 = 4;
18
/// Bookkeeping for one stream registered with a `ReadAheadPool`.
struct StreamEntry {
    /// The stream's read-ahead buffer, shared so the pool can clear it on eviction.
    buf: Arc<Mutex<ReadAhead>>,
    /// Pool clock value when the stream was last registered or touched; eviction
    /// picks the smallest stamp (coldest stream) first.
    last_served: u64,
}
23
/// Shared memory budget for the read-ahead buffers of all registered streams.
pub struct ReadAheadPool {
    /// Maximum bytes all registered buffers may hold together; 0 disables read-ahead.
    budget: u64,
    /// Bytes currently charged against `budget`. Adjusted through `reconcile`
    /// and by eviction/deregistration. (`Epoch` is this file's alias for `AtomicU64`.)
    charged: Epoch,
    /// Registered streams, keyed by the caller-chosen stream key.
    streams: Mutex<HashMap<usize, StreamEntry>>,
    /// Monotonic counter that hands out `last_served` stamps.
    clock: Epoch,
}
38
39impl ReadAheadPool {
40 pub fn new(budget: u64) -> Self {
41 ReadAheadPool {
42 budget,
43 charged: Epoch::new(0),
44 streams: Mutex::new(HashMap::new()),
45 clock: Epoch::new(0),
46 }
47 }
48
49 pub fn enabled(&self) -> bool {
50 self.budget > 0
51 }
52
53 pub fn per_stream_cap(&self) -> u64 {
55 if self.budget == 0 {
56 return 0;
57 }
58 (self.budget / PER_STREAM_DIVISOR).clamp(WINDOW_FLOOR, WINDOW_ABS_CAP)
59 }
60
61 pub fn register(&self, key: usize, buf: Arc<Mutex<ReadAhead>>) {
63 let last_served = self.clock.fetch_add(1, O::Relaxed);
64 self.streams
65 .lock()
66 .unwrap()
67 .insert(key, StreamEntry { buf, last_served });
68 }
69
70 pub fn deregister(&self, key: usize) {
77 let entry = self.streams.lock().unwrap().remove(&key);
78 let freed = match entry {
84 Some(e) => e.buf.lock().unwrap().clear(),
85 None => 0,
86 };
87 if freed > 0 {
88 self.charged.fetch_sub(freed, O::Relaxed);
89 }
90 }
91
92 pub fn touch(&self, key: usize) {
101 let stamp = self.clock.fetch_add(1, O::Relaxed);
102 if let Ok(mut g) = self.streams.try_lock()
103 && let Some(e) = g.get_mut(&key)
104 {
105 e.last_served = stamp;
106 }
107 }
108
109 pub fn permitted_window(&self, key: usize, old_len: u64, desired: u64) -> u64 {
115 if self.budget == 0 {
116 return 0;
117 }
118 let desired = desired.min(self.per_stream_cap()).max(old_len);
119 let need = desired - old_len;
120 loop {
121 let cur = self.charged.load(O::Relaxed);
122 let room = self.budget.saturating_sub(cur);
123 if room >= need {
124 return desired;
125 }
126 match self.evict_one_coldest(key) {
128 Some(_) => {}
129 None => {
130 return old_len + room;
132 }
133 }
134 }
135 }
136
137 pub fn reconcile(&self, old_len: u64, new_len: u64) {
140 if new_len > old_len {
141 self.charged.fetch_add(new_len - old_len, O::Relaxed);
142 } else if new_len < old_len {
143 self.charged.fetch_sub(old_len - new_len, O::Relaxed);
144 }
145 }
146
147 pub fn has_room_for(&self, need: u64) -> bool {
153 self.budget > 0 && self.budget.saturating_sub(self.charged.load(O::Relaxed)) >= need
154 }
155
156 pub fn charged(&self) -> u64 {
159 self.charged.load(O::Relaxed)
160 }
161
162 pub fn budget(&self) -> u64 {
165 self.budget
166 }
167
168 fn evict_one_coldest(&self, except: usize) -> Option<u64> {
172 let mut candidates: Vec<(usize, u64, Arc<Mutex<ReadAhead>>)> = {
176 let g = self.streams.lock().unwrap();
177 g.iter()
178 .filter(|(k, _)| **k != except)
179 .map(|(k, e)| (*k, e.last_served, Arc::clone(&e.buf)))
180 .collect()
181 };
182 candidates.sort_by_key(|(_, ls, _)| *ls); for (_, _, buf) in candidates {
184 if let Ok(mut ra) = buf.try_lock() {
185 let freed = ra.clear();
193 drop(ra);
194 if freed > 0 {
195 self.charged.fetch_sub(freed, O::Relaxed);
196 return Some(freed);
197 }
198 }
199 }
200 None
201 }
202}
203
/// One contiguous run of cached backing bytes covering
/// `[start, start + bytes.len())`.
struct Window {
    /// Backing offset of `bytes[0]`.
    start: u64,
    /// The cached data.
    bytes: Vec<u8>,
}
208
/// Per-stream read-ahead cache: an offset-sorted ring of windows plus the state
/// used to detect sequential access and size the next fill.
pub struct ReadAhead {
    /// Cached windows, kept sorted by `start`; trimmed to `max_windows` on insert.
    windows: Vec<Window>,
    /// Sum of `bytes.len()` over `windows`.
    cached_bytes: u64,
    /// Offset just past the last served read; `u64::MAX` when nothing has been
    /// served since creation or the last `clear`.
    next_expected: u64,
    /// Size of the next fill: doubles on a sequential miss, resets on a seek.
    window: u64,
    /// Upper bound applied to `window` when it grows.
    cap: u64,
    /// Maximum number of windows retained (at least 1).
    max_windows: usize,
}
220
221impl ReadAhead {
222 pub fn new(cap: u64) -> Self {
223 ReadAhead {
224 windows: Vec::new(),
225 cached_bytes: 0,
226 next_expected: u64::MAX,
227 window: WINDOW_FLOOR,
228 cap,
229 max_windows: 1,
230 }
231 }
232 pub fn set_max_windows(&mut self, n: usize) {
233 self.max_windows = n.max(1);
234 }
235 pub fn next_expected(&self) -> u64 {
236 self.next_expected
237 }
238 pub fn window(&self) -> u64 {
241 self.window
242 }
243
244 #[allow(clippy::len_without_is_empty)]
245 pub fn len(&self) -> u64 {
246 self.cached_bytes
247 }
248 pub fn clear(&mut self) -> u64 {
249 let freed = self.cached_bytes;
250 self.windows.clear();
251 self.cached_bytes = 0;
252 self.window = WINDOW_FLOOR.min(self.cap);
253 self.next_expected = u64::MAX;
254 freed
255 }
256 pub fn set_cap(&mut self, cap: u64) {
257 self.cap = cap;
258 if self.window > self.cap {
259 self.window = self.cap;
260 }
261 }
262
263 pub fn covers(&self, off: u64, len: usize) -> bool {
264 let end = off.saturating_add(len as u64);
265 self.windows
266 .iter()
267 .any(|w| off >= w.start && end <= w.start + w.bytes.len() as u64)
268 }
269
270 fn window_containing(&self, off: u64, len: usize) -> Option<&Window> {
271 let end = off.saturating_add(len as u64);
272 self.windows
273 .iter()
274 .find(|w| off >= w.start && end <= w.start + w.bytes.len() as u64)
275 }
276
277 fn insert_window(&mut self, w: Window) {
278 let w_bytes = w.bytes.len() as u64;
279 match self.windows.binary_search_by_key(&w.start, |x| x.start) {
283 Ok(i) => {
284 self.cached_bytes -= self.windows[i].bytes.len() as u64;
285 self.cached_bytes += w_bytes;
286 self.windows[i] = w;
287 }
288 Err(i) => {
289 self.cached_bytes += w_bytes;
290 self.windows.insert(i, w);
291 }
292 }
293 while self.windows.len() > self.max_windows {
294 let frontier = self.next_expected;
295 let idx = self
300 .windows
301 .iter()
302 .position(|w| w.start + w.bytes.len() as u64 <= frontier)
303 .unwrap_or(0);
304 self.cached_bytes -= self.windows[idx].bytes.len() as u64;
305 self.windows.remove(idx);
306 }
307 }
308
309 pub fn store_window(&mut self, start: u64, bytes: Vec<u8>) -> (u64, u64) {
310 let old = self.len();
311 self.insert_window(Window { start, bytes });
312 (old, self.len())
313 }
314
315 pub fn read_into(
316 &mut self,
317 dst: &mut [u8],
318 off: u64,
319 backing_len: u64,
320 mut fill: impl FnMut(&mut [u8], u64) -> io::Result<()>,
321 ) -> io::Result<(u64, u64)> {
322 let len = dst.len();
323 if len == 0 {
324 let n = self.len();
325 return Ok((n, n));
326 }
327 if let Some(w) = self.window_containing(off, len) {
328 #[expect(clippy::cast_possible_truncation)]
329 let lo = (off - w.start) as usize;
330 dst.copy_from_slice(&w.bytes[lo..lo + len]);
331 self.next_expected = off + len as u64;
332 let n = self.len();
333 return Ok((n, n));
334 }
335 let old = self.len();
336 if off == self.next_expected {
337 self.window = self.window.saturating_mul(2).min(self.cap);
338 } else {
339 self.window = WINDOW_FLOOR.min(self.cap);
340 }
341 if off >= backing_len || off.saturating_add(len as u64) > backing_len {
346 return Err(io::Error::from(io::ErrorKind::UnexpectedEof));
347 }
348 let want = self
349 .window
350 .max(len as u64)
351 .min(self.cap.max(len as u64))
352 .min(backing_len.saturating_sub(off));
353 #[expect(clippy::cast_possible_truncation)]
354 let mut buf = vec![0u8; want as usize];
355 fill(&mut buf, off)?;
356 dst.copy_from_slice(&buf[..len]);
357 self.insert_window(Window {
358 start: off,
359 bytes: buf,
360 });
361 self.next_expected = off + len as u64;
362 Ok((old, self.len()))
363 }
364}
365
366fn lock_buf_or_clear<'a>(
375 buf: &'a Mutex<ReadAhead>,
376 pool: &ReadAheadPool,
377) -> MutexGuard<'a, ReadAhead> {
378 match buf.lock() {
379 Ok(g) => g,
380 Err(e) => {
381 log::error!("cleared poisoned read-ahead buffer lock");
382 buf.clear_poison();
383 let mut g = e.into_inner();
384 let freed = g.clear();
385 pool.reconcile(freed, 0);
386 g
387 }
388 }
389}
390
391pub fn try_store_prefetch(
396 pool: &ReadAheadPool,
397 buf: &Arc<Mutex<ReadAhead>>,
398 epoch: &Epoch,
399 dispatched_epoch: u64,
400 start: u64,
401 bytes: Vec<u8>,
402) -> bool {
403 let mut ra = lock_buf_or_clear(buf, pool);
404 if epoch.load(O::Acquire) != dispatched_epoch {
405 return false;
406 }
407 let (old, new) = ra.store_window(start, bytes);
408 drop(ra);
409 pool.reconcile(old, new);
410 true
411}
412
/// Number of `window`-sized prefetches to keep in flight: as many as fit in
/// `cap`, but always between 1 and 4. A zero `window` is treated as 1 byte.
pub fn prefetch_depth(cap: u64, window: u64) -> u64 {
    let per_window = if window == 0 { 1 } else { window };
    let fits = cap / per_window;
    fits.max(1).min(4)
}
422
/// Plans the window starts to dispatch for a reader whose next window begins at
/// `start`.
///
/// Prefetching covers `[start, start + depth * window)`, clipped to
/// `backing_len`. `prefetched_upto` is the watermark returned by the previous
/// call. When it lies inside `[start, horizon]`, planning resumes from it, so
/// windows already dispatched are not requested again; otherwise (after a seek)
/// planning restarts at `start`.
///
/// Returns `(starts, new_watermark)`; the watermark is clamped to `backing_len`.
/// With a zero `window` or `start` at/after EOF, nothing is planned and the
/// watermark is returned unchanged.
pub fn plan_prefetch(
    prefetched_upto: u64,
    start: u64,
    window: u64,
    depth: u64,
    backing_len: u64,
) -> (Vec<u64>, u64) {
    let nothing_to_do = window == 0 || start >= backing_len;
    if nothing_to_do {
        return (Vec::new(), prefetched_upto);
    }
    let reach = start.saturating_add(depth.saturating_mul(window));
    let horizon = backing_len.min(reach);
    let resumable = prefetched_upto >= start && prefetched_upto <= horizon;
    let first = if resumable { prefetched_upto } else { start };
    let starts: Vec<u64> = std::iter::successors(Some(first), |&at| Some(at + window))
        .take_while(|&at| at < horizon)
        .collect();
    let watermark = match starts.last() {
        Some(&last) => last + window,
        None => first,
    };
    (starts, watermark.min(backing_len))
}
460
461use std::cell::Cell;
462
/// State shared by the prefetch jobs dispatched for one stream at one epoch.
pub struct PrefetchContext {
    /// Backing file the jobs read from.
    pub file: Arc<std::fs::File>,
    /// Read-ahead buffer that receives the prefetched windows.
    pub buf: Arc<Mutex<ReadAhead>>,
    /// Pool that must have room before a job reads, and that is charged for stored windows.
    pub pool: Arc<ReadAheadPool>,
    /// The stream's epoch; jobs are dropped once it no longer equals `dispatched_epoch`.
    pub epoch: Arc<Epoch>,
    /// Value of `epoch` when the jobs were dispatched.
    pub dispatched_epoch: u64,
    /// Bytes each job reads, truncated at `backing_len`.
    pub len: u64,
    /// Length of the backing data; jobs never read past it.
    pub backing_len: u64,
}
476
/// One prefetch request: read a window at `start` using the shared `ctx`.
pub struct PrefetchJob {
    /// Stream/epoch state shared with the other jobs of the same dispatch.
    pub ctx: Arc<PrefetchContext>,
    /// Backing offset where this job's window begins.
    pub start: u64,
}
481
/// Handle to a pool of background prefetch threads.
pub struct PrefetchWorkers {
    /// Sending side of the bounded job queue the workers consume; dropping the
    /// last sender lets the worker threads exit.
    tx: crossbeam_channel::Sender<PrefetchJob>,
}
485
486impl PrefetchWorkers {
487 pub fn new(threads: usize) -> Self {
488 let (tx, rx) = crossbeam_channel::bounded::<PrefetchJob>(threads * 4);
492 for _ in 0..threads {
493 let rx = rx.clone();
494 std::thread::spawn(move || {
495 while let Ok(job) = rx.recv() {
496 let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
502 Self::run_job(job);
503 }));
504 }
505 });
506 }
507 PrefetchWorkers { tx }
508 }
509
510 #[expect(clippy::needless_pass_by_value)]
511 pub fn run_job(job: PrefetchJob) {
512 use std::os::unix::fs::FileExt;
513 let ctx = &job.ctx;
514 if ctx.epoch.load(std::sync::atomic::Ordering::Acquire) != ctx.dispatched_epoch {
515 return;
516 }
517 let want = ctx.len.min(ctx.backing_len.saturating_sub(job.start));
518 if want == 0 {
519 return;
520 }
521 if !ctx.pool.has_room_for(want) {
524 return;
525 }
526 #[expect(clippy::cast_possible_truncation)]
527 let mut bytes = vec![0u8; want as usize];
528 if ctx.file.read_exact_at(&mut bytes, job.start).is_err() {
529 return;
530 }
531 let _ = try_store_prefetch(
532 &ctx.pool,
533 &ctx.buf,
534 &ctx.epoch,
535 ctx.dispatched_epoch,
536 job.start,
537 bytes,
538 );
539 }
540
541 pub fn request(&self, job: PrefetchJob) {
542 let _ = self.tx.try_send(job);
543 }
544}
545
/// Reads backing bytes on behalf of one stream. When the pool is enabled, reads
/// go through the stream's shared read-ahead buffer.
pub struct BackingReader<'a> {
    /// Backing file read on a miss.
    file: &'a std::fs::File,
    /// The stream's read-ahead buffer.
    buf: &'a Arc<Mutex<ReadAhead>>,
    /// Budget pool the buffer is charged against.
    pool: &'a ReadAheadPool,
    /// Key identifying this stream in `pool` (passed to `permitted_window` and `touch`).
    key: usize,
    /// Length of the backing data; read-ahead fills are clamped to it.
    backing_len: u64,
    /// Number of backing preads this reader has issued.
    fills: Cell<u64>,
    /// Stream epoch, bumped on a non-sequential miss so in-flight prefetches are discarded.
    epoch: &'a std::sync::atomic::AtomicU64,
    /// Whether reads record a prefetch plan (enabled by `with_prefetch_planning`).
    prefetch: bool,
    /// Buffer's `next_expected` after the last planned read (`u64::MAX` until then).
    planned_next_expected: Cell<u64>,
    /// Buffer's window size after the last planned read (0 until then).
    planned_window: Cell<u64>,
}
561
562impl<'a> BackingReader<'a> {
563 pub fn new(
564 file: &'a std::fs::File,
565 buf: &'a Arc<Mutex<ReadAhead>>,
566 pool: &'a ReadAheadPool,
567 key: usize,
568 backing_len: u64,
569 epoch: &'a std::sync::atomic::AtomicU64,
570 ) -> Self {
571 BackingReader {
572 file,
573 buf,
574 pool,
575 key,
576 backing_len,
577 fills: Cell::new(0),
578 epoch,
579 prefetch: false,
580 planned_next_expected: Cell::new(u64::MAX),
581 planned_window: Cell::new(0),
582 }
583 }
584
585 #[must_use]
590 pub fn with_prefetch_planning(mut self) -> Self {
591 self.prefetch = true;
592 self
593 }
594
595 pub fn prefetch_plan(&self) -> (u64, u64) {
599 (self.planned_next_expected.get(), self.planned_window.get())
600 }
601
602 pub fn fills(&self) -> u64 {
603 self.fills.get()
604 }
605
606 pub fn file(&self) -> &std::fs::File {
607 self.file
608 }
609
610 pub fn read_exact_at(&self, dst: &mut [u8], abs_offset: u64) -> std::io::Result<()> {
611 if !self.pool.enabled() {
612 self.fills.set(self.fills.get() + 1);
613 crate::metrics::on_readahead_miss();
614 crate::metrics::on_pread(dst.len() as u64);
615 return crate::metrics::backing_read_exact_at(self.file, dst, abs_offset);
616 }
617 let mut ra = lock_buf_or_clear(self.buf, self.pool);
618 if ra.covers(abs_offset, dst.len()) {
619 crate::metrics::on_readahead_hit();
620 } else {
621 crate::metrics::on_readahead_miss();
622 if abs_offset != ra.next_expected() {
623 self.epoch.fetch_add(1, std::sync::atomic::Ordering::AcqRel);
624 }
625 let cap = self
626 .pool
627 .permitted_window(self.key, ra.len(), self.pool.per_stream_cap());
628 ra.set_cap(cap);
629 }
630 let file = self.file;
631 let fills = &self.fills;
632 let (old_len, new_len) = ra.read_into(dst, abs_offset, self.backing_len, |b, o| {
633 fills.set(fills.get() + 1);
634 crate::metrics::on_pread(b.len() as u64);
635 crate::metrics::backing_read_exact_at(file, b, o)
636 })?;
637 if self.prefetch {
638 let window = ra.window();
642 let depth = prefetch_depth(self.pool.per_stream_cap(), window);
643 ra.set_max_windows(usize::try_from(depth).unwrap_or(4) + 1);
644 self.planned_next_expected.set(ra.next_expected());
645 self.planned_window.set(window);
646 }
647 self.pool.reconcile(old_len, new_len);
648 drop(ra);
649 self.pool.touch(self.key);
650 Ok(())
651 }
652}
653
/// Unit tests for the single-buffer `ReadAhead` logic, driven by an in-memory fake backing.
#[cfg(test)]
mod window_tests {
    use super::*;

    /// In-memory stand-in for the backing file that records every fill as `(offset, length)`.
    struct Fake {
        data: Vec<u8>,
        reads: Vec<(u64, usize)>,
    }
    impl Fake {
        /// `len` bytes of a repeating 0..251 pattern, so misplaced offsets show up as mismatches.
        fn new(len: usize) -> Self {
            #[expect(clippy::cast_possible_truncation)]
            let data = (0..len).map(|i| (i % 251) as u8).collect();
            Fake {
                data,
                reads: Vec::new(),
            }
        }
        /// Fill callback: logs the request and copies the matching slice of `data`.
        #[expect(clippy::unnecessary_wraps)]
        fn fill(&mut self, buf: &mut [u8], off: u64) -> io::Result<()> {
            self.reads.push((off, buf.len()));
            #[expect(clippy::cast_possible_truncation)]
            let o = off as usize;
            buf.copy_from_slice(&self.data[o..o + buf.len()]);
            Ok(())
        }
    }

    /// Reads `len` bytes at `off` through `ra`, backed by `fake`, and returns them.
    fn serve(ra: &mut ReadAhead, fake: &mut Fake, off: u64, len: usize) -> Vec<u8> {
        let mut dst = vec![0u8; len];
        let backing_len = fake.data.len() as u64;
        ra.read_into(&mut dst, off, backing_len, |b, o| fake.fill(b, o))
            .unwrap();
        dst
    }

    #[test]
    fn first_read_misses_then_sequential_reads_hit() {
        let mut fake = Fake::new(4 * 1024 * 1024);
        let mut ra = ReadAhead::new(WINDOW_ABS_CAP);
        let a = serve(&mut ra, &mut fake, 0, 64 * 1024);
        assert_eq!(a, fake.data[0..64 * 1024]);
        assert_eq!(fake.reads.len(), 1, "first read must fill once");
        // The first fill was a full floor-sized window, so the next 64 KiB is already cached.
        let b = serve(&mut ra, &mut fake, 64 * 1024, 64 * 1024);
        assert_eq!(b, fake.data[64 * 1024..128 * 1024]);
        assert_eq!(fake.reads.len(), 1, "sequential hit must not pread");
    }

    #[test]
    fn sequential_miss_grows_window_geometrically() {
        let mut fake = Fake::new(16 * 1024 * 1024);
        let mut ra = ReadAhead::new(WINDOW_ABS_CAP);
        #[expect(clippy::cast_possible_truncation)]
        let floor = WINDOW_FLOOR as usize;
        // Each read consumes a whole floor window, so the second read is a
        // sequential miss that must double the window.
        serve(&mut ra, &mut fake, 0, floor);
        serve(&mut ra, &mut fake, WINDOW_FLOOR, floor);
        let second_fill_len = fake.reads[1].1 as u64;
        assert!(
            second_fill_len > WINDOW_FLOOR,
            "window must grow on sequential miss"
        );
        assert!(second_fill_len <= WINDOW_ABS_CAP, "window capped");
    }

    #[test]
    fn seek_resets_window_to_floor() {
        let mut fake = Fake::new(16 * 1024 * 1024);
        let mut ra = ReadAhead::new(WINDOW_ABS_CAP);
        #[expect(clippy::cast_possible_truncation)]
        serve(&mut ra, &mut fake, 0, WINDOW_FLOOR as usize);
        #[expect(clippy::cast_possible_truncation)]
        serve(&mut ra, &mut fake, WINDOW_FLOOR, WINDOW_FLOOR as usize);
        // Jump far ahead: a non-sequential miss.
        serve(&mut ra, &mut fake, 12 * 1024 * 1024, 4096);
        let seek_fill_len = fake.reads.last().unwrap().1 as u64;
        assert_eq!(seek_fill_len, WINDOW_FLOOR, "seek resets to floor");
    }

    #[test]
    fn window_clamps_to_backing_len_at_eof() {
        // 700 KiB backing: a floor-sized window at 680 KiB would run past EOF.
        let mut fake = Fake::new(700 * 1024);
        let mut ra = ReadAhead::new(WINDOW_ABS_CAP);
        let out = serve(&mut ra, &mut fake, 680 * 1024, 20 * 1024);
        assert_eq!(out, fake.data[680 * 1024..700 * 1024]);
        let (off, len) = fake.reads[0];
        assert!(
            off + len as u64 <= 700 * 1024,
            "fill must not read past EOF"
        );
    }

    #[test]
    fn read_straddling_eof_errors_not_panics() {
        let mut fake = Fake::new(1000);
        let mut ra = ReadAhead::new(WINDOW_ABS_CAP);
        let backing_len = fake.data.len() as u64;
        let mut dst = vec![0u8; 20];
        // 990 + 20 > 1000: must be reported as UnexpectedEof.
        let err = ra
            .read_into(&mut dst, 990, backing_len, |b, o| fake.fill(b, o))
            .expect_err("a read past EOF must fail, not panic");
        assert_eq!(err.kind(), io::ErrorKind::UnexpectedEof);
        // 980 + 20 == 1000: ends exactly at EOF, which is valid.
        let ok = ra.read_into(&mut dst, 980, backing_len, |b, o| fake.fill(b, o));
        assert!(ok.is_ok(), "off+len == backing_len is in range");
    }
}
767
/// Tests for `ReadAheadPool` budget arithmetic and poisoned-lock recovery.
#[cfg(test)]
mod pool_budget_tests {
    use super::*;
    use std::sync::{Arc, Mutex};

    #[test]
    fn disabled_pool_grants_nothing_and_reports_disabled() {
        let pool = ReadAheadPool::new(0);
        assert!(!pool.enabled());
        assert_eq!(pool.per_stream_cap(), 0);
    }

    #[test]
    fn per_stream_cap_is_budget_over_divisor_capped_by_abs() {
        // 16 MiB / PER_STREAM_DIVISOR (4) = 4 MiB, inside the clamp range.
        let pool = ReadAheadPool::new(16 * 1024 * 1024);
        assert_eq!(pool.per_stream_cap(), 4 * 1024 * 1024);
        // 1 GiB / 4 = 256 MiB, clamped down to WINDOW_ABS_CAP.
        let big = ReadAheadPool::new(1024 * 1024 * 1024);
        assert_eq!(big.per_stream_cap(), WINDOW_ABS_CAP);
    }

    #[test]
    fn poisoned_buffer_lock_recovers_and_uncharges() {
        let pool = ReadAheadPool::new(WINDOW_ABS_CAP);
        let buf = Arc::new(Mutex::new(ReadAhead::new(pool.per_stream_cap())));
        let (old, new) = buf.lock().unwrap().store_window(0, vec![0u8; 4096]);
        pool.reconcile(old, new);
        assert_eq!(pool.charged(), 4096);

        // Poison the buffer mutex by panicking while holding it on another thread.
        let b2 = Arc::clone(&buf);
        let _ = std::thread::spawn(move || {
            let _g = b2.lock().unwrap();
            panic!("poison the read-ahead buffer");
        })
        .join();
        assert!(buf.is_poisoned());

        // Recovery empties the buffer, clears the poison and uncharges its bytes.
        assert_eq!(lock_buf_or_clear(&buf, &pool).len(), 0);
        assert!(!buf.is_poisoned(), "poison cleared after recovery");
        assert_eq!(pool.charged(), 0, "cleared bytes uncharged from the budget");
    }

    #[test]
    fn permitted_window_grants_up_to_budget_then_clamps() {
        // 4 MiB budget => per-stream cap of 1 MiB.
        let pool = ReadAheadPool::new(4 * 1024 * 1024);
        let buf = Arc::new(Mutex::new(ReadAhead::new(pool.per_stream_cap())));
        pool.register(1, Arc::clone(&buf));
        assert_eq!(pool.permitted_window(1, 0, 1024 * 1024), 1024 * 1024);
        pool.reconcile(0, 1024 * 1024);
        // An 8 MiB request is clamped to the 1 MiB per-stream cap.
        assert_eq!(
            pool.permitted_window(1, 1024 * 1024, 8 * 1024 * 1024),
            1024 * 1024
        );
    }
}
831
/// Tests for cross-stream eviction under budget pressure, plus end-to-end
/// `BackingReader` tests against a real temporary file.
#[cfg(test)]
mod eviction_tests {
    use super::*;
    use std::sync::{Arc, Mutex};

    /// Creates a buffer holding `bytes` of cached data, registers it under `key`
    /// and charges those bytes to `pool`.
    fn register_filled(pool: &ReadAheadPool, key: usize, bytes: usize) -> Arc<Mutex<ReadAhead>> {
        let arc = Arc::new(Mutex::new(ReadAhead::new(pool.per_stream_cap())));
        let data = vec![7u8; bytes * 2];
        let mut dst = vec![0u8; bytes];
        let (old, new) = arc
            .lock()
            .unwrap()
            .read_into(&mut dst, 0, (bytes * 2) as u64, |b, _| {
                b.copy_from_slice(&data[..b.len()]);
                Ok(())
            })
            .unwrap();
        pool.register(key, Arc::clone(&arc));
        pool.reconcile(old, new);
        arc
    }

    #[test]
    fn permitted_window_evicts_coldest_other_stream_under_pressure() {
        // 4 MiB budget filled by four 1 MiB streams; stream 1 was registered first (coldest).
        let pool = ReadAheadPool::new(4 * 1024 * 1024);
        let mib = 1024 * 1024usize;
        let cold = register_filled(&pool, 1, mib);
        register_filled(&pool, 2, mib);
        register_filled(&pool, 3, mib);
        register_filled(&pool, 4, mib);
        let hot = Arc::new(Mutex::new(ReadAhead::new(pool.per_stream_cap())));
        pool.register(5, Arc::clone(&hot));
        let granted = pool.permitted_window(5, 0, pool.per_stream_cap());
        assert_eq!(granted, mib as u64, "eviction frees room for the full cap");
        assert_eq!(cold.lock().unwrap().len(), 0, "coldest stream was evicted");
    }

    #[test]
    fn locked_victim_is_skipped_not_blocked() {
        let pool = ReadAheadPool::new(4 * 1024 * 1024);
        let mib = 1024 * 1024;
        let victim = register_filled(&pool, 1, mib);
        register_filled(&pool, 2, mib);
        register_filled(&pool, 3, mib);
        register_filled(&pool, 4, mib);
        // Hold the coldest buffer's lock: eviction must try_lock past it, not deadlock.
        let _held = victim.lock().unwrap();
        let hot = Arc::new(Mutex::new(ReadAhead::new(pool.per_stream_cap())));
        pool.register(5, Arc::clone(&hot));
        let granted = pool.permitted_window(5, 0, pool.per_stream_cap());
        assert!(granted > 0 && granted <= pool.per_stream_cap());
    }

    /// End-to-end `BackingReader` tests over a real temporary file.
    #[cfg(test)]
    mod backing_reader_tests {
        use super::*;
        use std::io::Write;
        use std::os::unix::fs::FileExt;
        use std::sync::{Arc, Mutex};

        /// Writes `len` pattern bytes to a temp file; returns the dir guard, an open handle and the bytes.
        #[expect(clippy::cast_possible_truncation)]
        fn temp_file(len: usize) -> (tempfile::TempDir, std::fs::File, Vec<u8>) {
            let dir = tempfile::tempdir().unwrap();
            let path = dir.path().join("backing.bin");
            let data: Vec<u8> = (0..len).map(|i| (i % 251) as u8).collect();
            std::fs::File::create(&path)
                .unwrap()
                .write_all(&data)
                .unwrap();
            let f = std::fs::File::open(&path).unwrap();
            (dir, f, data)
        }

        #[test]
        fn sequential_reads_collapse_to_one_pread_per_window() {
            let (_d, file, data) = temp_file(4 * 1024 * 1024);
            let pool = ReadAheadPool::new(64 * 1024 * 1024);
            let buf = Arc::new(Mutex::new(ReadAhead::new(pool.per_stream_cap())));
            pool.register(1, Arc::clone(&buf));
            let backing_len = data.len() as u64;
            let epoch = std::sync::atomic::AtomicU64::new(0);
            let br = BackingReader::new(&file, &buf, &pool, 1, backing_len, &epoch);
            let mut out = vec![0u8; 64 * 1024];
            // Sixteen sequential 64 KiB reads must need far fewer than sixteen preads.
            #[expect(clippy::cast_possible_truncation)]
            for chunk in 0..16u64 {
                br.read_exact_at(&mut out, chunk * 64 * 1024).unwrap();
                assert_eq!(out, data[(chunk * 64 * 1024) as usize..][..64 * 1024]);
            }
            assert!(
                br.fills() < 16,
                "read-ahead must collapse preads, got {}",
                br.fills()
            );
        }

        #[test]
        fn bytes_match_direct_pread_for_random_access() {
            let (_d, file, data) = temp_file(2 * 1024 * 1024);
            let pool = ReadAheadPool::new(64 * 1024 * 1024);
            let buf = Arc::new(Mutex::new(ReadAhead::new(pool.per_stream_cap())));
            pool.register(1, Arc::clone(&buf));
            let epoch = std::sync::atomic::AtomicU64::new(0);
            let br = BackingReader::new(&file, &buf, &pool, 1, data.len() as u64, &epoch);
            // Forward and backward jumps, including a read ending exactly at EOF.
            for &(off, len) in &[
                (0u64, 100usize),
                (1_000_000, 4096),
                (5000, 700),
                (2_097_000, 152),
            ] {
                let mut a = vec![0u8; len];
                br.read_exact_at(&mut a, off).unwrap();
                let mut b = vec![0u8; len];
                file.read_exact_at(&mut b, off).unwrap();
                assert_eq!(a, b, "read-ahead byte mismatch at {off}+{len}");
            }
        }
    }
}
959
/// Tests for the multi-window ring, prefetch depth and prefetch planning.
#[cfg(test)]
mod ring_tests {
    use super::*;

    #[test]
    fn default_ring_holds_one_window_like_phase1() {
        let mut ra = ReadAhead::new(WINDOW_ABS_CAP);
        let data = (0..2 * 1024 * 1024u64)
            .map(|i| (i % 251) as u8)
            .collect::<Vec<u8>>();
        let blen = data.len() as u64;
        let mut dst = vec![0u8; 4096];
        ra.read_into(&mut dst, 0, blen, |b, o| {
            #[expect(clippy::cast_possible_truncation)]
            let o = o as usize;
            b.copy_from_slice(&data[o..][..b.len()]);
            Ok(())
        })
        .unwrap();
        // A second, non-overlapping fill with max_windows == 1 must displace the first.
        ra.read_into(&mut dst, 1_000_000, blen, |b, o| {
            #[expect(clippy::cast_possible_truncation)]
            let o = o as usize;
            b.copy_from_slice(&data[o..][..b.len()]);
            Ok(())
        })
        .unwrap();
        assert!(
            !ra.covers(0, 4096),
            "single-window ring evicts the old window"
        );
        assert!(ra.covers(1_000_000, 4096));
    }

    #[test]
    fn ring_of_two_keeps_current_and_prefetched_window() {
        let mut ra = ReadAhead::new(WINDOW_ABS_CAP);
        ra.set_max_windows(2);
        // Simulate a completed prefetch at 1 MiB, then a demand read at 0.
        ra.store_window(1024 * 1024, vec![9u8; 512 * 1024]);
        let mut dst = vec![0u8; 4096];
        ra.read_into(&mut dst, 0, 4 * 1024 * 1024, |b, _| {
            b.fill(1u8);
            Ok(())
        })
        .unwrap();
        assert!(ra.covers(0, 4096), "current window present");
        assert!(
            ra.covers(1024 * 1024, 4096),
            "prefetched window NOT clobbered"
        );
    }

    #[test]
    fn prefetch_depth_tracks_window_and_bounds_inflight() {
        let cap = 8 * 1024 * 1024;
        // 8 MiB / 512 KiB = 16, clamped to the maximum of 4.
        assert_eq!(prefetch_depth(cap, WINDOW_FLOOR), 4);
        assert_eq!(prefetch_depth(cap, 2 * 1024 * 1024), 4);
        assert_eq!(prefetch_depth(cap, 4 * 1024 * 1024), 2);
        assert_eq!(prefetch_depth(cap, cap), 1);
        // A zero window is treated as 1 byte, then clamped to 4.
        assert_eq!(prefetch_depth(cap, 0), 4);
        // A window larger than cap still gets depth 1.
        assert_eq!(prefetch_depth(cap, cap * 2), 1);
        // For windows up to cap, depth * window never exceeds cap.
        for w in [WINDOW_FLOOR, 1 << 20, 2 << 20, 4 << 20, cap] {
            assert!(prefetch_depth(cap, w) * w <= cap, "overshoot at window {w}");
        }
    }

    #[test]
    fn plan_prefetch_dedups_sequential_dispatch() {
        let win = WINDOW_FLOOR;
        let cap = 8 * 1024 * 1024;
        let depth = prefetch_depth(cap, win);
        let blen = 100 * 1024 * 1024;
        let (s1, w1) = plan_prefetch(0, win, win, depth, blen);
        assert_eq!(s1, vec![win, 2 * win, 3 * win, 4 * win]);
        assert_eq!(w1, 5 * win);
        // Reader advanced half a window: only the newly exposed window is planned.
        let start2 = win + win / 2;
        let (s2, w2) = plan_prefetch(w1, start2, win, depth, blen);
        assert_eq!(s2, vec![5 * win], "must not re-dispatch buffered windows");
        assert_eq!(w2, 6 * win);
    }

    #[test]
    fn plan_prefetch_seek_resets_watermark() {
        let win = WINDOW_FLOOR;
        let depth = 4;
        let blen = 100 * 1024 * 1024;
        let (_s, w) = plan_prefetch(0, 10 * win, win, depth, blen);
        // Seeking back before the watermark restarts planning at the new start.
        let (s2, w2) = plan_prefetch(w, 2 * win, win, depth, blen);
        assert_eq!(s2.first(), Some(&(2 * win)));
        assert_eq!(w2, 6 * win);
    }

    #[test]
    fn plan_prefetch_clamps_to_backing_len() {
        let win = WINDOW_FLOOR;
        let blen = 3 * win + 100;
        let (s, w) = plan_prefetch(0, win, win, 4, blen);
        assert!(s.iter().all(|&x| x < blen), "no job starts past EOF");
        assert!(w <= blen, "watermark clamped to EOF");
    }

    #[test]
    fn len_sums_all_windows_and_clear_drops_all() {
        let mut ra = ReadAhead::new(WINDOW_ABS_CAP);
        ra.set_max_windows(3);
        ra.store_window(0, vec![0u8; 100]);
        ra.store_window(1000, vec![0u8; 200]);
        assert_eq!(ra.len(), 300);
        assert_eq!(ra.clear(), 300);
        assert_eq!(ra.len(), 0);
    }
}
1087
/// Tests for storing prefetched windows: epoch validation and budget charging.
#[cfg(test)]
mod prefetch_store_tests {
    use super::*;
    use std::sync::atomic::AtomicU64;

    #[test]
    fn store_with_stale_epoch_is_discarded() {
        let pool = ReadAheadPool::new(64 * 1024 * 1024);
        let ra = Arc::new(Mutex::new(ReadAhead::new(WINDOW_ABS_CAP)));
        let epoch = AtomicU64::new(0);
        // Epoch advanced after the job was dispatched at epoch 0.
        epoch.fetch_add(1, std::sync::atomic::Ordering::AcqRel);
        assert!(!try_store_prefetch(&pool, &ra, &epoch, 0, 0, vec![1, 2, 3]));
        assert_eq!(ra.lock().unwrap().len(), 0);
    }

    #[test]
    fn store_with_current_epoch_is_accepted_and_charges_budget() {
        let pool = ReadAheadPool::new(64 * 1024 * 1024);
        let ra = Arc::new(Mutex::new(ReadAhead::new(WINDOW_ABS_CAP)));
        ra.lock().unwrap().set_max_windows(2);
        let epoch = AtomicU64::new(5);
        assert!(try_store_prefetch(
            &pool,
            &ra,
            &epoch,
            5,
            1000,
            vec![0u8; 4096]
        ));
        assert!(ra.lock().unwrap().covers(1000, 4096));
        assert_eq!(pool.charged(), 4096);
    }

    #[test]
    fn pool_reports_its_budget() {
        assert_eq!(ReadAheadPool::new(64 * 1024).budget(), 64 * 1024);
        assert_eq!(ReadAheadPool::new(0).budget(), 0);
        assert_eq!(ReadAheadPool::new(0).charged(), 0);
    }

    /// Demand fill plus a prefetched window are both charged, and deregistering
    /// the stream returns exactly that much, leaving room for a new stream.
    #[test]
    #[expect(clippy::cast_possible_truncation)]
    fn prefetch_charge_survives_release_without_underflow() {
        // 2 MiB / 4 = 512 KiB, which is also the floor.
        let pool = ReadAheadPool::new(2 * 1024 * 1024);
        let cap = pool.per_stream_cap();
        let buf = Arc::new(Mutex::new(ReadAhead::new(cap)));
        buf.lock().unwrap().set_max_windows(2);
        pool.register(1, Arc::clone(&buf));
        let mut dst = vec![0u8; cap as usize];
        let (o, n) = buf
            .lock()
            .unwrap()
            .read_into(&mut dst, 0, 8 * 1024 * 1024, |b, _| {
                b.fill(1);
                Ok(())
            })
            .unwrap();
        pool.reconcile(o, n);
        let epoch = Epoch::new(0);
        assert!(try_store_prefetch(
            &pool,
            &buf,
            &epoch,
            0,
            cap,
            vec![2u8; cap as usize]
        ));
        assert_eq!(pool.charged(), 2 * cap);
        pool.deregister(1);
        assert_eq!(pool.charged(), 0, "release must not underflow");
        let hot = Arc::new(Mutex::new(ReadAhead::new(cap)));
        pool.register(2, Arc::clone(&hot));
        assert_eq!(pool.permitted_window(2, 0, cap), cap);
    }

    /// `has_room_for` is what prefetch workers consult before reading.
    #[test]
    fn prefetch_declines_when_budget_full() {
        let pool = ReadAheadPool::new(1024 * 1024);
        assert!(pool.has_room_for(512 * 1024));
        pool.reconcile(0, 1024 * 1024);
        assert!(!pool.has_room_for(1));
    }
}
1180
/// Tests for `PrefetchWorkers`: running a job inline, poison recovery, and
/// dispatch through the worker threads.
#[cfg(test)]
mod prefetch_worker_tests {
    use super::*;
    use std::io::Write;
    use std::sync::{Arc, Mutex};

    #[test]
    fn prefetch_fills_next_window_for_a_sequential_stream() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("b.bin");
        let data: Vec<u8> = (0u64..8 * 1024 * 1024).map(|i| (i % 251) as u8).collect();
        std::fs::File::create(&path)
            .unwrap()
            .write_all(&data)
            .unwrap();
        let file = Arc::new(std::fs::File::open(&path).unwrap());

        let pool = Arc::new(ReadAheadPool::new(64 * 1024 * 1024));
        let buf = Arc::new(Mutex::new(ReadAhead::new(pool.per_stream_cap())));
        let epoch = Arc::new(std::sync::atomic::AtomicU64::new(0));
        pool.register(1, Arc::clone(&buf));

        // Run the job inline (no worker threads) for a 1 MiB window at 1 MiB.
        PrefetchWorkers::run_job(PrefetchJob {
            ctx: Arc::new(PrefetchContext {
                file: Arc::clone(&file),
                buf: Arc::clone(&buf),
                pool: Arc::clone(&pool),
                epoch: Arc::clone(&epoch),
                dispatched_epoch: 0,
                len: 1024 * 1024,
                backing_len: data.len() as u64,
            }),
            start: 1024 * 1024,
        });
        let mut out = vec![0u8; 4096];
        let mut ra = buf.lock().unwrap();
        let mut fills = 0;
        // The fill closure only counts calls; a hit must never invoke it.
        ra.read_into(&mut out, 1024 * 1024, data.len() as u64, |_, _| {
            fills += 1;
            Ok(())
        })
        .unwrap();
        assert_eq!(fills, 0, "prefetched window should serve without a pread");
        assert_eq!(out, data[1024 * 1024..1024 * 1024 + 4096]);
    }

    /// A job targeting a poisoned buffer recovers the lock and still stores its window.
    #[test]
    fn run_job_recovers_a_poisoned_buffer_lock() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("p.bin");
        let data: Vec<u8> = (0u64..8 * 1024 * 1024).map(|i| (i % 251) as u8).collect();
        std::fs::File::create(&path)
            .unwrap()
            .write_all(&data)
            .unwrap();
        let file = Arc::new(std::fs::File::open(&path).unwrap());

        let pool = Arc::new(ReadAheadPool::new(64 * 1024 * 1024));
        let buf = Arc::new(Mutex::new(ReadAhead::new(pool.per_stream_cap())));
        let epoch = Arc::new(std::sync::atomic::AtomicU64::new(0));
        pool.register(1, Arc::clone(&buf));

        let b2 = Arc::clone(&buf);
        let _ = std::thread::spawn(move || {
            let _g = b2.lock().unwrap();
            panic!("poison the read-ahead buffer");
        })
        .join();
        assert!(buf.is_poisoned());

        PrefetchWorkers::run_job(PrefetchJob {
            ctx: Arc::new(PrefetchContext {
                file: Arc::clone(&file),
                buf: Arc::clone(&buf),
                pool: Arc::clone(&pool),
                epoch: Arc::clone(&epoch),
                dispatched_epoch: 0,
                len: 1024 * 1024,
                backing_len: data.len() as u64,
            }),
            start: 1024 * 1024,
        });
        assert!(!buf.is_poisoned(), "poison cleared by the recovering lock");
        assert!(pool.charged() > 0, "window stored after recovery");
    }

    /// A job sent through `request` is picked up by a background worker.
    #[test]
    fn dispatched_job_reaches_a_worker_and_fills_the_buffer() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("w.bin");
        let data: Vec<u8> = (0u64..8 * 1024 * 1024).map(|i| (i % 251) as u8).collect();
        std::fs::File::create(&path)
            .unwrap()
            .write_all(&data)
            .unwrap();
        let file = Arc::new(std::fs::File::open(&path).unwrap());

        let pool = Arc::new(ReadAheadPool::new(64 * 1024 * 1024));
        let buf = Arc::new(Mutex::new(ReadAhead::new(pool.per_stream_cap())));
        let epoch = Arc::new(std::sync::atomic::AtomicU64::new(0));
        pool.register(1, Arc::clone(&buf));

        let workers = PrefetchWorkers::new(2);
        workers.request(PrefetchJob {
            ctx: Arc::new(PrefetchContext {
                file: Arc::clone(&file),
                buf: Arc::clone(&buf),
                pool: Arc::clone(&pool),
                epoch: Arc::clone(&epoch),
                dispatched_epoch: 0,
                len: 1024 * 1024,
                backing_len: data.len() as u64,
            }),
            start: 1024 * 1024,
        });

        // Poll (up to 5 s) until the worker has stored and charged the window.
        let step = std::time::Duration::from_millis(5);
        let mut waited = std::time::Duration::ZERO;
        while pool.charged() == 0 && waited < std::time::Duration::from_secs(5) {
            std::thread::sleep(step);
            waited += step;
        }
        assert!(pool.charged() > 0, "dispatched job never reached a worker");

        let mut out = vec![0u8; 4096];
        let mut ra = buf.lock().unwrap();
        let mut fills = 0;
        ra.read_into(&mut out, 1024 * 1024, data.len() as u64, |_, _| {
            fills += 1;
            Ok(())
        })
        .unwrap();
        assert_eq!(
            fills, 0,
            "worker-prefetched window should serve without a pread"
        );
        assert_eq!(out, data[1024 * 1024..1024 * 1024 + 4096]);
    }
}
1329
/// Multi-threaded stress tests. Readers, prefetch jobs, eviction, and
/// register/deregister churn all race on shared pools. Reads are checked
/// byte-for-byte against the backing file. The last test checks that the
/// pool's charged budget returns to zero.
#[cfg(test)]
mod concurrency_tests {
    use super::*;
    use std::io::Write;
    use std::os::unix::fs::FileExt;
    use std::sync::Arc;

    #[test]
    /// Eight threads share one buffer and stream key 1. Each thread issues 200
    /// pseudo-random 4 KiB reads through its own `BackingReader`, and every
    /// read must match a direct `read_exact_at` on the file.
    fn concurrent_reads_same_handle_match_oracle() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("concurrent.bin");
        #[expect(clippy::cast_sign_loss)]
        let data: Vec<u8> = (0..4 * 1024 * 1024).map(|i| (i % 251) as u8).collect();
        std::fs::File::create(&path)
            .unwrap()
            .write_all(&data)
            .unwrap();
        let file = Arc::new(std::fs::File::open(&path).unwrap());

        let pool = Arc::new(ReadAheadPool::new(64 * 1024 * 1024));
        let buf = Arc::new(Mutex::new(ReadAhead::new(pool.per_stream_cap())));
        let epoch = Arc::new(std::sync::atomic::AtomicU64::new(0));
        pool.register(1, Arc::clone(&buf));

        let backing_len = data.len() as u64;
        let num_threads: u64 = 8;
        let reads_per_thread = 200;

        std::thread::scope(|s| {
            for tid in 0..num_threads {
                let file = Arc::clone(&file);
                let buf = Arc::clone(&buf);
                let pool = Arc::clone(&pool);
                let epoch = Arc::clone(&epoch);
                s.spawn(move || {
                    let br = BackingReader::new(&file, &buf, &pool, 1, backing_len, &epoch);
                    // A 64-bit LCG gives each thread its own reproducible
                    // offset sequence.
                    let mut rng_state: u64 = tid * 7919;
                    for _ in 0..reads_per_thread {
                        rng_state = rng_state
                            .wrapping_mul(6_364_136_223_846_793_005)
                            .wrapping_add(1);
                        let off = rng_state % (backing_len.saturating_sub(4096).max(1));
                        #[expect(clippy::cast_possible_truncation)]
                        let len = 4096usize.min((backing_len - off) as usize);
                        let mut got = vec![0u8; len];
                        br.read_exact_at(&mut got, off).unwrap();
                        let mut expected = vec![0u8; len];
                        file.read_exact_at(&mut expected, off).unwrap();
                        assert_eq!(
                            got, expected,
                            "mismatch at off={off} len={len} from thread {tid}"
                        );
                    }
                });
            }
        });
    }

    #[test]
    /// Races three kinds of work on a 2 MiB pool:
    /// - a sequential reader on stream 1, whose ring holds up to 4 windows;
    /// - two threads running prefetch jobs for stream 1 directly, four passes
    ///   over the whole file each;
    /// - four random readers on stream 2.
    ///
    /// Every read must still return the file's bytes.
    #[expect(clippy::cast_possible_truncation)]
    fn workers_and_eviction_race_reads_without_corruption() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("race.bin");
        let data: Vec<u8> = (0u64..4 * 1024 * 1024).map(|i| (i % 251) as u8).collect();
        std::fs::File::create(&path)
            .unwrap()
            .write_all(&data)
            .unwrap();
        let file = Arc::new(std::fs::File::open(&path).unwrap());
        let backing_len = data.len() as u64;

        let pool = Arc::new(ReadAheadPool::new(2 * 1024 * 1024));
        let window = pool.per_stream_cap();
        let buf1 = Arc::new(Mutex::new(ReadAhead::new(window)));
        buf1.lock().unwrap().set_max_windows(4);
        let buf2 = Arc::new(Mutex::new(ReadAhead::new(window)));
        let epoch1 = Arc::new(std::sync::atomic::AtomicU64::new(0));
        let epoch2 = Arc::new(std::sync::atomic::AtomicU64::new(0));
        pool.register(1, Arc::clone(&buf1));
        pool.register(2, Arc::clone(&buf2));

        std::thread::scope(|s| {
            // Sequential reader on stream 1, 64 KiB per read.
            {
                let (file, buf1, pool, epoch1, data) = (&file, &buf1, &pool, &epoch1, &data);
                s.spawn(move || {
                    let br = BackingReader::new(file, buf1, pool, 1, backing_len, epoch1);
                    let mut off = 0u64;
                    while off < backing_len {
                        let len = (64 * 1024).min((backing_len - off) as usize);
                        let mut got = vec![0u8; len];
                        br.read_exact_at(&mut got, off).unwrap();
                        assert_eq!(got, data[off as usize..off as usize + len]);
                        off += len as u64;
                    }
                });
            }
            // Two threads run stream-1 prefetch jobs inline. Each job is tagged
            // with the epoch that is current when it is built.
            for _ in 0..2 {
                let (file, buf1, pool, epoch1) = (&file, &buf1, &pool, &epoch1);
                s.spawn(move || {
                    for _ in 0..4 {
                        let mut start = 0u64;
                        while start < backing_len {
                            let ctx = Arc::new(PrefetchContext {
                                file: Arc::clone(file),
                                buf: Arc::clone(buf1),
                                pool: Arc::clone(pool),
                                epoch: Arc::clone(epoch1),
                                dispatched_epoch: epoch1
                                    .load(std::sync::atomic::Ordering::Acquire),
                                len: window,
                                backing_len,
                            });
                            PrefetchWorkers::run_job(PrefetchJob { ctx, start });
                            start += window;
                        }
                    }
                });
            }
            // Four random 4 KiB readers on stream 2.
            for tid in 0..4u64 {
                let (file, buf2, pool, epoch2, data) = (&file, &buf2, &pool, &epoch2, &data);
                s.spawn(move || {
                    let br = BackingReader::new(file, buf2, pool, 2, backing_len, epoch2);
                    let mut rng: u64 = tid * 7919 + 1;
                    for _ in 0..300 {
                        rng = rng.wrapping_mul(6_364_136_223_846_793_005).wrapping_add(1);
                        let off = rng % backing_len.saturating_sub(4096).max(1);
                        let len = 4096usize.min((backing_len - off) as usize);
                        let mut got = vec![0u8; len];
                        br.read_exact_at(&mut got, off).unwrap();
                        assert_eq!(got, data[off as usize..off as usize + len]);
                    }
                });
            }
        });
    }

    #[test]
    /// Four random readers on stream 1 race a thread that deregisters and
    /// re-registers stream 1 2000 times, while stream 2 stays registered.
    /// The test must complete (a deadlock would hang it), and every read must
    /// match the file.
    #[expect(clippy::cast_possible_truncation)]
    fn deregister_races_reads_without_deadlock() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("dereg.bin");
        let data: Vec<u8> = (0u64..2 * 1024 * 1024).map(|i| (i % 251) as u8).collect();
        std::fs::File::create(&path)
            .unwrap()
            .write_all(&data)
            .unwrap();
        let file = Arc::new(std::fs::File::open(&path).unwrap());
        let backing_len = data.len() as u64;

        let pool = Arc::new(ReadAheadPool::new(1024 * 1024));
        let buf = Arc::new(Mutex::new(ReadAhead::new(pool.per_stream_cap())));
        let epoch = Arc::new(std::sync::atomic::AtomicU64::new(0));
        pool.register(1, Arc::clone(&buf));
        let buf2 = Arc::new(Mutex::new(ReadAhead::new(pool.per_stream_cap())));
        pool.register(2, Arc::clone(&buf2));

        std::thread::scope(|s| {
            for tid in 0..4u64 {
                let (file, buf, pool, epoch, data) = (&file, &buf, &pool, &epoch, &data);
                s.spawn(move || {
                    let br = BackingReader::new(file, buf, pool, 1, backing_len, epoch);
                    let mut rng = tid * 7919 + 1;
                    for _ in 0..300 {
                        rng = rng.wrapping_mul(6_364_136_223_846_793_005).wrapping_add(1);
                        let off = rng % backing_len.saturating_sub(4096).max(1);
                        let len = 4096usize.min((backing_len - off) as usize);
                        let mut got = vec![0u8; len];
                        br.read_exact_at(&mut got, off).unwrap();
                        assert_eq!(got, data[off as usize..off as usize + len]);
                    }
                });
            }
            {
                let (pool, buf) = (&pool, &buf);
                s.spawn(move || {
                    for _ in 0..2000 {
                        pool.deregister(1);
                        pool.register(1, Arc::clone(buf));
                    }
                });
            }
        });
    }

    #[test]
    /// One thread repeatedly does the following:
    /// - creates a stream with a charged 64 KiB window;
    /// - registers the stream;
    /// - deregisters it.
    ///
    /// Meanwhile, two threads call `evict_one_coldest` until 200 000 cycles
    /// have run. Afterwards the pool's charged budget must be exactly zero,
    /// so no window's charge is leaked when eviction races deregistration.
    #[expect(clippy::cast_possible_truncation)]
    fn eviction_racing_deregister_does_not_leak_charged() {
        use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};

        /// Raises the wrapped flag when dropped, including during unwinding.
        /// A panic in the cycling thread therefore releases the evictor
        /// threads instead of leaving them spinning forever inside
        /// `thread::scope`.
        struct StopOnDrop<'a>(&'a AtomicBool);

        impl Drop for StopOnDrop<'_> {
            fn drop(&mut self) {
                self.0.store(true, Ordering::Relaxed);
            }
        }

        const N: u64 = 64 * 1024;
        let pool = Arc::new(ReadAheadPool::new(8 * 1024 * 1024));
        let victim_key = 2usize;
        let stop = Arc::new(AtomicBool::new(false));
        let cycles = Arc::new(AtomicU64::new(0));

        std::thread::scope(|s| {
            {
                let (pool, stop, cycles) = (&pool, &stop, &cycles);
                s.spawn(move || {
                    let _release_evictors = StopOnDrop(stop);
                    while !stop.load(Ordering::Relaxed) {
                        // Charge a window, make the stream visible to the
                        // evictors, then tear it down again.
                        let buf = Arc::new(Mutex::new(ReadAhead::new(pool.per_stream_cap())));
                        buf.lock().unwrap().store_window(0, vec![0u8; N as usize]);
                        pool.reconcile(0, N);
                        pool.register(victim_key, Arc::clone(&buf));
                        pool.deregister(victim_key);
                        cycles.fetch_add(1, Ordering::Relaxed);
                    }
                });
            }
            for _ in 0..2 {
                let (pool, stop, cycles) = (&pool, &stop, &cycles);
                s.spawn(move || {
                    // Also exit once `stop` is raised, so a cycler that died
                    // early (it raises `stop` on unwind) cannot hang the test.
                    while !stop.load(Ordering::Relaxed)
                        && cycles.load(Ordering::Relaxed) < 200_000
                    {
                        pool.evict_one_coldest(1);
                    }
                    stop.store(true, Ordering::Relaxed);
                });
            }
        });

        assert_eq!(
            pool.charged(),
            0,
            "eviction racing deregister leaked charged budget"
        );
    }
}
1591
/// Small, targeted tests that each pin one precise behavior of `ReadAhead`,
/// `ReadAheadPool`, `BackingReader`, and `plan_prefetch`. Judging by the
/// module name, they are presumably meant to catch mutation-testing
/// survivors.
#[cfg(test)]
mod mutation_guard_tests {
    use super::*;
    use std::sync::atomic::Ordering as AO;
    use std::sync::{Arc, Mutex};

    /// Fill callback for `read_into` that writes the constant byte 7 instead
    /// of reading from a file.
    #[expect(clippy::unnecessary_wraps)]
    fn fillb(b: &mut [u8], _o: u64) -> io::Result<()> {
        b.fill(7);
        Ok(())
    }

    /// Writes `len` bytes of the `i % 251` pattern to a temp file and reopens
    /// it for reading. The `TempDir` is returned alongside the file so that
    /// the caller controls when the directory is removed.
    #[expect(clippy::cast_possible_truncation)]
    fn bk_temp(len: usize) -> (tempfile::TempDir, std::fs::File) {
        use std::io::Write;
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("bk.bin");
        let data: Vec<u8> = (0..len).map(|i| (i % 251) as u8).collect();
        std::fs::File::create(&path)
            .unwrap()
            .write_all(&data)
            .unwrap();
        (dir, std::fs::File::open(&path).unwrap())
    }

    #[test]
    /// `next_expected` equals the end of the most recently consumed range.
    fn next_expected_equals_consumed_tail() {
        let mut ra = ReadAhead::new(WINDOW_ABS_CAP);
        let mut dst = vec![0u8; 4096];
        ra.read_into(&mut dst, 1000, 8 << 20, fillb).unwrap();
        assert_eq!(ra.next_expected(), 1000 + 4096);
        let mut d2 = vec![0u8; 100];
        ra.read_into(&mut d2, 5096, 8 << 20, fillb).unwrap();
        assert_eq!(ra.next_expected(), 5096 + 100);
    }

    #[test]
    /// The window starts at `WINDOW_FLOOR` and doubles after a sequential
    /// read. A jump to a distant offset resets it to `WINDOW_FLOOR`.
    fn window_doubles_on_sequential_miss_and_floors_on_seek() {
        let mut ra = ReadAhead::new(WINDOW_ABS_CAP);
        let blen = 16 << 20;
        #[expect(clippy::cast_possible_truncation)]
        let mut dst = vec![0u8; WINDOW_FLOOR as usize];
        ra.read_into(&mut dst, 0, blen, fillb).unwrap();
        assert_eq!(ra.window(), WINDOW_FLOOR);
        ra.read_into(&mut dst, WINDOW_FLOOR, blen, fillb).unwrap();
        assert_eq!(ra.window(), WINDOW_FLOOR * 2);
        ra.read_into(&mut dst, 12 << 20, blen, fillb).unwrap();
        assert_eq!(ra.window(), WINDOW_FLOOR);
    }

    #[test]
    /// Lowering the cap with `set_cap` shrinks the window. Raising the cap
    /// again does not grow the window back.
    fn set_cap_clamps_window_down_only() {
        let mut ra = ReadAhead::new(WINDOW_ABS_CAP);
        ra.set_cap(WINDOW_FLOOR / 2);
        assert_eq!(ra.window(), WINDOW_FLOOR / 2);
        ra.set_cap(WINDOW_ABS_CAP);
        assert_eq!(ra.window(), WINDOW_FLOOR / 2);
    }

    #[test]
    /// `reconcile(old, new)` moves `charged` by `new - old` in either
    /// direction and leaves it unchanged when the two lengths match.
    fn reconcile_charges_growth_and_uncharges_shrink() {
        let pool = ReadAheadPool::new(64 << 20);
        pool.reconcile(0, 1000);
        assert_eq!(pool.charged(), 1000);
        pool.reconcile(1000, 250);
        assert_eq!(pool.charged(), 250);
        pool.reconcile(250, 250);
        assert_eq!(pool.charged(), 250);
    }

    #[test]
    /// A zero-budget pool reports no room even for a zero-byte request. A
    /// fully charged pool reports no room for one more byte.
    fn has_room_for_zero_need_is_false_when_disabled() {
        assert!(!ReadAheadPool::new(0).has_room_for(0));
        let p = ReadAheadPool::new(1 << 20);
        p.reconcile(0, 1 << 20);
        assert!(!p.has_room_for(1));
    }

    #[test]
    /// With exactly `cap` bytes of budget free, growing a window from
    /// `cap / 4` to `cap` is granted in full. Only the growth has to fit,
    /// not the whole window.
    fn permitted_window_need_is_relative_to_old_len() {
        let pool = ReadAheadPool::new(2 << 20);
        let cap = pool.per_stream_cap();
        pool.reconcile(0, (2 << 20) - cap);
        assert_eq!(pool.permitted_window(2, cap / 4, cap), cap);
    }

    #[test]
    /// With no streams registered there is nothing to evict. The grant is
    /// then clamped to `old_len` plus the 256 KiB of remaining room.
    fn permitted_window_clamps_to_room_when_nothing_evictable() {
        let pool = ReadAheadPool::new(2 << 20);
        let cap = pool.per_stream_cap();
        pool.reconcile(0, (2 << 20) - 256 * 1024);
        assert_eq!(
            pool.permitted_window(2, 128 * 1024, cap),
            128 * 1024 + 256 * 1024
        );
    }

    #[test]
    /// Four streams each read `cap` bytes and charge the resulting window to
    /// a 2 MiB pool, registered in order 1..=4. Stream 1 is then touched.
    /// A fifth stream's `permitted_window` request must evict stream 2,
    /// which is now the least recently served, and leave stream 1 intact.
    #[expect(clippy::cast_possible_truncation)]
    fn touch_keeps_a_stream_off_the_eviction_block() {
        let pool = ReadAheadPool::new(2 << 20);
        let cap = pool.per_stream_cap();
        let mk = |key: usize| {
            let arc = Arc::new(Mutex::new(ReadAhead::new(cap)));
            let mut d = vec![0u8; cap as usize];
            let (o, n) = arc
                .lock()
                .unwrap()
                .read_into(&mut d, 0, cap * 4, fillb)
                .unwrap();
            pool.register(key, Arc::clone(&arc));
            pool.reconcile(o, n);
            arc
        };
        let s1 = mk(1);
        let s2 = mk(2);
        let _s3 = mk(3);
        let _s4 = mk(4);
        pool.touch(1);
        let hot = Arc::new(Mutex::new(ReadAhead::new(cap)));
        pool.register(5, Arc::clone(&hot));
        pool.permitted_window(5, 0, cap);
        assert_eq!(s2.lock().unwrap().len(), 0, "coldest stream evicted");
        assert!(s1.lock().unwrap().len() > 0, "touched stream survives");
    }

    #[test]
    /// `fills` counts backing-file reads exactly. On a zero-budget pool,
    /// every read is one fill. On an enabled pool, the second sequential
    /// read is served from the window without another fill.
    fn fills_count_is_exact() {
        let (_d, file) = bk_temp(256 * 1024);
        let mut d = vec![0u8; 4096];
        let pool0 = ReadAheadPool::new(0);
        let buf0 = Arc::new(Mutex::new(ReadAhead::new(0)));
        let ep0 = std::sync::atomic::AtomicU64::new(0);
        let br0 = BackingReader::new(&file, &buf0, &pool0, 0, 256 * 1024, &ep0);
        br0.read_exact_at(&mut d, 0).unwrap();
        br0.read_exact_at(&mut d, 100_000).unwrap();
        assert_eq!(br0.fills(), 2);
        let pool = ReadAheadPool::new(64 << 20);
        let buf = Arc::new(Mutex::new(ReadAhead::new(pool.per_stream_cap())));
        pool.register(1, Arc::clone(&buf));
        let ep = std::sync::atomic::AtomicU64::new(0);
        let br = BackingReader::new(&file, &buf, &pool, 1, 256 * 1024, &ep);
        br.read_exact_at(&mut d, 0).unwrap();
        assert_eq!(br.fills(), 1);
        br.read_exact_at(&mut d, 4096).unwrap();
        assert_eq!(br.fills(), 1);
    }

    #[test]
    /// The shared epoch stays put for a sequential continuation and is
    /// bumped exactly once by a seek.
    fn epoch_bumps_on_seek_not_on_sequential() {
        let (_d, file) = bk_temp(1 << 20);
        let pool = ReadAheadPool::new(64 << 20);
        let buf = Arc::new(Mutex::new(ReadAhead::new(pool.per_stream_cap())));
        pool.register(1, Arc::clone(&buf));
        let ep = Arc::new(std::sync::atomic::AtomicU64::new(0));
        let br = BackingReader::new(&file, &buf, &pool, 1, 1 << 20, &ep);
        let mut d = vec![0u8; 4096];
        br.read_exact_at(&mut d, 0).unwrap();
        let base = ep.load(AO::Relaxed);
        br.read_exact_at(&mut d, 4096).unwrap();
        assert_eq!(ep.load(AO::Relaxed), base, "sequential must not bump epoch");
        br.read_exact_at(&mut d, 900_000).unwrap();
        assert_eq!(ep.load(AO::Relaxed), base + 1, "seek bumps epoch");
    }

    #[test]
    /// With a two-window ring, the reader first consumes up to offset 2000,
    /// and then a third window is stored at 2000. That store drops the
    /// oldest window `[0, 1000)` but keeps `[1000, 2000)` and the new one.
    fn ring_trim_evicts_window_fully_behind_the_reader() {
        let mut ra = ReadAhead::new(WINDOW_ABS_CAP);
        ra.set_max_windows(2);
        ra.store_window(0, vec![0u8; 1000]);
        ra.store_window(1000, vec![0u8; 1000]);
        let mut dst = vec![0u8; 10];
        ra.read_into(&mut dst, 1990, 1 << 20, fillb).unwrap();
        ra.store_window(2000, vec![0u8; 1000]);
        assert!(!ra.covers(0, 10), "fully-behind window evicted");
        assert!(ra.covers(1000, 10), "nearer behind window kept");
        assert!(ra.covers(2000, 10), "just-stored ahead window kept");
    }

    #[test]
    /// When the watermark passed in already equals `start + depth * win`,
    /// the planner returns no job starts and returns that watermark as-is.
    fn plan_prefetch_no_jobs_when_watermark_at_horizon() {
        let win = WINDOW_FLOOR;
        let depth = 4;
        let start = 10 * win;
        let horizon = start + depth * win;
        let (starts, upto) = plan_prefetch(horizon, start, win, depth, 100 << 20);
        assert!(starts.is_empty(), "caught up to horizon → no jobs");
        assert_eq!(upto, horizon);
    }

    #[test]
    /// A zero window plans no jobs and leaves the watermark (500) unchanged.
    fn plan_prefetch_window_zero_leaves_watermark_unchanged() {
        let (starts, upto) = plan_prefetch(500, 1000, 0, 4, 1 << 20);
        assert!(starts.is_empty());
        assert_eq!(upto, 500);
    }

    #[test]
    /// Storing a window at an already-cached start replaces it. `len` then
    /// reflects only the new window's bytes.
    fn cached_len_replaces_same_start_window() {
        let mut ra = ReadAhead::new(WINDOW_ABS_CAP);
        ra.store_window(100, vec![0u8; 1000]);
        assert_eq!(ra.len(), 1000);
        ra.store_window(100, vec![0u8; 2500]);
        assert_eq!(ra.len(), 2500);
    }

    #[test]
    /// When the ring trims a window, that window's bytes are removed from
    /// `len`. The count stays at 2000 after a third 1000-byte window is
    /// stored into a two-window ring.
    fn cached_len_drops_evicted_window_bytes() {
        let mut ra = ReadAhead::new(WINDOW_ABS_CAP);
        ra.set_max_windows(2);
        ra.store_window(0, vec![0u8; 1000]);
        ra.store_window(1000, vec![0u8; 1000]);
        assert_eq!(ra.len(), 2000);
        let mut dst = vec![0u8; 10];
        ra.read_into(&mut dst, 1990, 1 << 20, fillb).unwrap();
        ra.store_window(2000, vec![0u8; 1000]);
        assert_eq!(ra.len(), 2000);
    }

    #[test]
    /// With prefetch planning enabled, a first 4096-byte read at offset 0
    /// makes `prefetch_plan` report `(4096, WINDOW_FLOOR)`.
    fn prefetch_plan_reports_captured_frontier_and_window() {
        let (_d, file) = bk_temp(256 * 1024);
        let pool = ReadAheadPool::new(64 << 20);
        let buf = Arc::new(Mutex::new(ReadAhead::new(pool.per_stream_cap())));
        pool.register(1, Arc::clone(&buf));
        let ep = std::sync::atomic::AtomicU64::new(0);
        let br =
            BackingReader::new(&file, &buf, &pool, 1, 256 * 1024, &ep).with_prefetch_planning();
        let mut d = vec![0u8; 4096];
        br.read_exact_at(&mut d, 0).unwrap();
        assert_eq!(br.prefetch_plan(), (4096, WINDOW_FLOOR));
    }
}