Skip to main content

musefs_core/
readahead.rs

1//! Per-handle backing read-ahead: an adaptive window over raw backing-file
2//! bytes, a global byte budget with eviction, and the `BackingReader` shim that
3//! every backing read flows through. See
4//! `docs/superpowers/specs/2026-06-14-read-ahead-overlap-design.md`.
5
6use std::collections::HashMap;
7use std::io;
8use std::sync::atomic::{AtomicU64 as Epoch, Ordering as O};
9use std::sync::{Arc, Mutex, MutexGuard};
10
/// Smallest read-ahead window: a brand-new stream, or one that has just
/// seeked, still fetches at least this many bytes per backing read.
pub const WINDOW_FLOOR: u64 = 512 << 10;

/// Hard ceiling on any single stream's window, applied no matter how large
/// the global budget is.
pub const WINDOW_ABS_CAP: u64 = 8 << 20;

/// A single stream's share of the global budget is at most
/// `budget / PER_STREAM_DIVISOR` bytes.
const PER_STREAM_DIVISOR: u64 = 4;
18
19struct StreamEntry {
20    buf: Arc<Mutex<ReadAhead>>,
21    last_served: u64,
22}
23
24/// The process-wide read-ahead allocator: one byte budget shared by all active
25/// streams, with `try_lock` LRU eviction. Deadlock-free by construction — the
26/// budget is a lock-free atomic, the registry lock is a leaf (released before any
27/// buffer mutex), and eviction never blocks on a buffer mutex (`try_lock` + skip).
28pub struct ReadAheadPool {
29    /// Total RAM envelope; `0` means read-ahead is disabled.
30    budget: u64,
31    /// Currently charged bytes (sum of registered buffers' lengths).
32    charged: Epoch,
33    /// Active streaming handles keyed by slab key. Only sequential streams register.
34    streams: Mutex<HashMap<usize, StreamEntry>>,
35    /// Monotonic source for `last_served` stamps (LRU ordering).
36    clock: Epoch,
37}
38
39impl ReadAheadPool {
40    pub fn new(budget: u64) -> Self {
41        ReadAheadPool {
42            budget,
43            charged: Epoch::new(0),
44            streams: Mutex::new(HashMap::new()),
45            clock: Epoch::new(0),
46        }
47    }
48
49    pub fn enabled(&self) -> bool {
50        self.budget > 0
51    }
52
53    /// Per-stream window cap derived from the budget.
54    pub fn per_stream_cap(&self) -> u64 {
55        if self.budget == 0 {
56            return 0;
57        }
58        (self.budget / PER_STREAM_DIVISOR).clamp(WINDOW_FLOOR, WINDOW_ABS_CAP)
59    }
60
61    /// Lazily register a handle's buffer once sequential access is detected.
62    pub fn register(&self, key: usize, buf: Arc<Mutex<ReadAhead>>) {
63        let last_served = self.clock.fetch_add(1, O::Relaxed);
64        self.streams
65            .lock()
66            .unwrap()
67            .insert(key, StreamEntry { buf, last_served });
68    }
69
70    /// Deregister on release; uncharges the buffer's bytes. Drops the `streams`
71    /// lock BEFORE locking the buffer: a concurrent read holds its buffer mutex
72    /// and then blocking-acquires `streams` (via `permitted_window`), so holding
73    /// `streams` while locking the buffer here would invert that order and
74    /// deadlock. Keeping `streams` a leaf (released before any buffer mutex)
75    /// preserves the pool's deadlock-free invariant.
76    pub fn deregister(&self, key: usize) {
77        let entry = self.streams.lock().unwrap().remove(&key);
78        // Clear under the buffer lock rather than merely reading `len`: clearing is
79        // what makes the buffer length the authoritative "already uncharged"
80        // signal, so a racing eviction holding a stale `Arc` to this buffer reads
81        // `len 0` and uncharges nothing — neither side double-counts nor leaks
82        // (#536).
83        let freed = match entry {
84            Some(e) => e.buf.lock().unwrap().clear(),
85            None => 0,
86        };
87        if freed > 0 {
88            self.charged.fetch_sub(freed, O::Relaxed);
89        }
90    }
91
92    /// Mark `key` as most-recently-served (LRU bump). No-op if unregistered.
93    ///
94    /// Best-effort: a `try_lock` that SKIPS rather than blocks on contention. It
95    /// runs at the end of every backing pread, on every stream, so a blocking
96    /// `lock` here serializes all concurrent playback on this one registry mutex
97    /// (#519). A skipped bump only leaves a stamp slightly stale, at worst evicting
98    /// a marginally-warmer stream — and eviction is already best-effort
99    /// (`try_lock` + skip), so the LRU order was never exact to begin with.
100    pub fn touch(&self, key: usize) {
101        let stamp = self.clock.fetch_add(1, O::Relaxed);
102        if let Ok(mut g) = self.streams.try_lock()
103            && let Some(e) = g.get_mut(&key)
104        {
105            e.last_served = stamp;
106        }
107    }
108
109    /// Decide the largest window (≤ `desired`, ≤ per-stream cap) a stream may grow
110    /// to right now, given a current size of `old_len`. Evicts colder OTHER
111    /// streams as needed to make room for the `(window - old_len)` delta, but does
112    /// NOT charge — charging happens in `reconcile` against the ACTUAL bytes read.
113    /// Never blocks on a buffer mutex (`try_lock` + skip). Call only on a miss.
114    pub fn permitted_window(&self, key: usize, old_len: u64, desired: u64) -> u64 {
115        if self.budget == 0 {
116            return 0;
117        }
118        let desired = desired.min(self.per_stream_cap()).max(old_len);
119        let need = desired - old_len;
120        loop {
121            let cur = self.charged.load(O::Relaxed);
122            let room = self.budget.saturating_sub(cur);
123            if room >= need {
124                return desired;
125            }
126            // Try to evict the coldest OTHER stream to free room.
127            match self.evict_one_coldest(key) {
128                Some(_) => {}
129                None => {
130                    // Nothing evictable: permit only what fits now.
131                    return old_len + room;
132                }
133            }
134        }
135    }
136
137    /// Charge the budget by the ACTUAL window-size change `(old_len → new_len)`.
138    /// Keeps the invariant `charged == Σ(registered buffers' bytes.len())`.
139    pub fn reconcile(&self, old_len: u64, new_len: u64) {
140        if new_len > old_len {
141            self.charged.fetch_add(new_len - old_len, O::Relaxed);
142        } else if new_len < old_len {
143            self.charged.fetch_sub(old_len - new_len, O::Relaxed);
144        }
145    }
146
147    /// Best-effort check that `need` bytes of *free* (uncharged) budget exist.
148    /// Speculative prefetch uses this rather than evicting live streams: under
149    /// memory pressure it simply declines to prefetch. Racy by design — the
150    /// caller still `reconcile`s the actual stored delta, so the
151    /// `charged == Σ bytes` invariant holds regardless; this only bounds overshoot.
152    pub fn has_room_for(&self, need: u64) -> bool {
153        self.budget > 0 && self.budget.saturating_sub(self.charged.load(O::Relaxed)) >= need
154    }
155
156    /// Bytes currently held across all registered read-ahead buffers (telemetry
157    /// `musefs_readahead_charged_bytes`; #394).
158    pub fn charged(&self) -> u64 {
159        self.charged.load(O::Relaxed)
160    }
161
162    /// Total read-ahead RAM envelope; `0` when read-ahead is disabled (telemetry
163    /// `musefs_readahead_budget_bytes`; #394).
164    pub fn budget(&self) -> u64 {
165        self.budget
166    }
167
168    /// Find and clear the coldest registered buffer other than `except`, using
169    /// `try_lock` so eviction never blocks on an in-progress read. Returns the
170    /// freed byte count, or `None` if nothing was evictable this pass.
171    fn evict_one_coldest(&self, except: usize) -> Option<u64> {
172        // Snapshot the candidates under the `streams` lock, then sort by warmth
173        // *after* releasing it — the ordering work no longer blocks registration
174        // or deregistration of other streams.
175        let mut candidates: Vec<(usize, u64, Arc<Mutex<ReadAhead>>)> = {
176            let g = self.streams.lock().unwrap();
177            g.iter()
178                .filter(|(k, _)| **k != except)
179                .map(|(k, e)| (*k, e.last_served, Arc::clone(&e.buf)))
180                .collect()
181        };
182        candidates.sort_by_key(|(_, ls, _)| *ls); // coldest (smallest stamp) first
183        for (_, _, buf) in candidates {
184            if let Ok(mut ra) = buf.try_lock() {
185                // Uncharging is tied to physically clearing the buffer under its
186                // own lock: the clearer uncharges exactly the bytes it removed, and
187                // a racing `deregister`/eviction holding a stale `Arc` to the same
188                // buffer then reads `len 0` and uncharges nothing. That makes the
189                // buffer length the single coordination point — re-checking the
190                // registry instead raced `deregister` and leaked `freed` whenever
191                // the deregister landed after this clear (#536).
192                let freed = ra.clear();
193                drop(ra);
194                if freed > 0 {
195                    self.charged.fetch_sub(freed, O::Relaxed);
196                    return Some(freed);
197                }
198            }
199        }
200        None
201    }
202}
203
/// One contiguous run of cached backing-file bytes.
struct Window {
    /// Absolute backing-file offset of `bytes[0]`.
    start: u64,
    /// The cached bytes, covering `start .. start + bytes.len()`.
    bytes: Vec<u8>,
}

/// Per-handle read-ahead state: the cached windows plus the adaptive sizing
/// and eviction knobs that govern how the next fill is sized and what is kept.
pub struct ReadAhead {
    /// Cached windows, kept sorted by `start`.
    windows: Vec<Window>,
    /// Running total of every window's `bytes.len()`. It is updated on each
    /// insert, eviction and clear so `len()` stays O(1) on the hot read path,
    /// instead of re-summing windows under the per-handle lock.
    cached_bytes: u64,
    /// Offset a sequential reader would ask for next; `u64::MAX` when unknown.
    next_expected: u64,
    /// Current adaptive fill size in bytes.
    window: u64,
    /// Upper bound on `window`.
    cap: u64,
    /// How many windows may be retained before one is evicted.
    max_windows: usize,
}
220
221impl ReadAhead {
222    pub fn new(cap: u64) -> Self {
223        ReadAhead {
224            windows: Vec::new(),
225            cached_bytes: 0,
226            next_expected: u64::MAX,
227            window: WINDOW_FLOOR,
228            cap,
229            max_windows: 1,
230        }
231    }
232    pub fn set_max_windows(&mut self, n: usize) {
233        self.max_windows = n.max(1);
234    }
235    pub fn next_expected(&self) -> u64 {
236        self.next_expected
237    }
238    /// The current adaptive window size (grows geometrically on sequential
239    /// access, resets to the floor on seek). Drives prefetch depth.
240    pub fn window(&self) -> u64 {
241        self.window
242    }
243
244    #[allow(clippy::len_without_is_empty)]
245    pub fn len(&self) -> u64 {
246        self.cached_bytes
247    }
248    pub fn clear(&mut self) -> u64 {
249        let freed = self.cached_bytes;
250        self.windows.clear();
251        self.cached_bytes = 0;
252        self.window = WINDOW_FLOOR.min(self.cap);
253        self.next_expected = u64::MAX;
254        freed
255    }
256    pub fn set_cap(&mut self, cap: u64) {
257        self.cap = cap;
258        if self.window > self.cap {
259            self.window = self.cap;
260        }
261    }
262
263    pub fn covers(&self, off: u64, len: usize) -> bool {
264        let end = off.saturating_add(len as u64);
265        self.windows
266            .iter()
267            .any(|w| off >= w.start && end <= w.start + w.bytes.len() as u64)
268    }
269
270    fn window_containing(&self, off: u64, len: usize) -> Option<&Window> {
271        let end = off.saturating_add(len as u64);
272        self.windows
273            .iter()
274            .find(|w| off >= w.start && end <= w.start + w.bytes.len() as u64)
275    }
276
277    fn insert_window(&mut self, w: Window) {
278        let w_bytes = w.bytes.len() as u64;
279        // `windows` is kept sorted by `start`, so storing a window is a binary
280        // search + positional insert rather than a full re-sort on every miss
281        // and every completed prefetch.
282        match self.windows.binary_search_by_key(&w.start, |x| x.start) {
283            Ok(i) => {
284                self.cached_bytes -= self.windows[i].bytes.len() as u64;
285                self.cached_bytes += w_bytes;
286                self.windows[i] = w;
287            }
288            Err(i) => {
289                self.cached_bytes += w_bytes;
290                self.windows.insert(i, w);
291            }
292        }
293        while self.windows.len() > self.max_windows {
294            let frontier = self.next_expected;
295            // Sorted by `start`, so the first window lying fully behind the read
296            // frontier is also the lowest-start such window — the victim the
297            // previous min-by-start scan picked. Fall back to the oldest (index
298            // 0) when nothing is fully consumed yet.
299            let idx = self
300                .windows
301                .iter()
302                .position(|w| w.start + w.bytes.len() as u64 <= frontier)
303                .unwrap_or(0);
304            self.cached_bytes -= self.windows[idx].bytes.len() as u64;
305            self.windows.remove(idx);
306        }
307    }
308
309    pub fn store_window(&mut self, start: u64, bytes: Vec<u8>) -> (u64, u64) {
310        let old = self.len();
311        self.insert_window(Window { start, bytes });
312        (old, self.len())
313    }
314
315    pub fn read_into(
316        &mut self,
317        dst: &mut [u8],
318        off: u64,
319        backing_len: u64,
320        mut fill: impl FnMut(&mut [u8], u64) -> io::Result<()>,
321    ) -> io::Result<(u64, u64)> {
322        let len = dst.len();
323        if len == 0 {
324            let n = self.len();
325            return Ok((n, n));
326        }
327        if let Some(w) = self.window_containing(off, len) {
328            #[expect(clippy::cast_possible_truncation)]
329            let lo = (off - w.start) as usize;
330            dst.copy_from_slice(&w.bytes[lo..lo + len]);
331            self.next_expected = off + len as u64;
332            let n = self.len();
333            return Ok((n, n));
334        }
335        let old = self.len();
336        if off == self.next_expected {
337            self.window = self.window.saturating_mul(2).min(self.cap);
338        } else {
339            self.window = WINDOW_FLOOR.min(self.cap);
340        }
341        // A read straddling EOF (`off + len > backing_len`) would clamp `want`
342        // below `len` and panic on `buf[..len]` below. The serve path never
343        // requests past the backing length, but this is a `pub` method, so fail
344        // closed with `read_exact` semantics rather than panic in release.
345        if off >= backing_len || off.saturating_add(len as u64) > backing_len {
346            return Err(io::Error::from(io::ErrorKind::UnexpectedEof));
347        }
348        let want = self
349            .window
350            .max(len as u64)
351            .min(self.cap.max(len as u64))
352            .min(backing_len.saturating_sub(off));
353        #[expect(clippy::cast_possible_truncation)]
354        let mut buf = vec![0u8; want as usize];
355        fill(&mut buf, off)?;
356        dst.copy_from_slice(&buf[..len]);
357        self.insert_window(Window {
358            start: off,
359            bytes: buf,
360        });
361        self.next_expected = off + len as u64;
362        Ok((old, self.len()))
363    }
364}
365
366/// Lock a per-handle read-ahead buffer, recovering from a poisoned mutex rather
367/// than panicking. A poison means a prior reader — a foreground read or a
368/// prefetch worker — panicked mid-mutation while holding this lock, so the
369/// buffer may be partially updated; reset it to the known-good empty state (the
370/// next read cold-fills) instead of letting `unwrap()` propagate the poison into
371/// a panic on every subsequent read of this handle. Mirrors `lock::lock_or_clear`
372/// but also uncharges the cleared bytes, which a bare clear would leave
373/// over-counted against the global budget.
374fn lock_buf_or_clear<'a>(
375    buf: &'a Mutex<ReadAhead>,
376    pool: &ReadAheadPool,
377) -> MutexGuard<'a, ReadAhead> {
378    match buf.lock() {
379        Ok(g) => g,
380        Err(e) => {
381            log::error!("cleared poisoned read-ahead buffer lock");
382            buf.clear_poison();
383            let mut g = e.into_inner();
384            let freed = g.clear();
385            pool.reconcile(freed, 0);
386            g
387        }
388    }
389}
390
391/// Store a prefetched window into `buf` iff the handle's epoch is unchanged, and
392/// charge the global budget by the resulting size delta so the
393/// `charged == Σ(registered buffers' bytes.len())` invariant is preserved. A
394/// stale epoch (seek/release/refresh since dispatch) drops the window untouched.
395pub fn try_store_prefetch(
396    pool: &ReadAheadPool,
397    buf: &Arc<Mutex<ReadAhead>>,
398    epoch: &Epoch,
399    dispatched_epoch: u64,
400    start: u64,
401    bytes: Vec<u8>,
402) -> bool {
403    let mut ra = lock_buf_or_clear(buf, pool);
404    if epoch.load(O::Acquire) != dispatched_epoch {
405        return false;
406    }
407    let (old, new) = ra.store_window(start, bytes);
408    drop(ra);
409    pool.reconcile(old, new);
410    true
411}
412
/// How many `window`-sized next-windows to keep in flight for a sequential
/// stream.
///
/// The depth is the number of windows that fit in one per-stream budget share
/// (`cap / window`), clamped to `1..=4` to bound the worker fan-out. It
/// shrinks as the adaptive window grows toward `cap` (fewer, larger chunks)
/// and rises again once a seek resets the window to the floor. A zero
/// `window` is treated as one byte. The result is never below 1, even when
/// `cap` is smaller than `window`.
pub fn prefetch_depth(cap: u64, window: u64) -> u64 {
    let fitting = cap.checked_div(window).unwrap_or(cap);
    fitting.clamp(1, 4)
}
422
/// Plan which next-windows to enqueue, skipping whatever was already
/// dispatched up to the watermark `prefetched_upto`.
///
/// Anchoring dispatch to the moving `next_expected` made every read re-request
/// the same forward windows at shifted offsets, piling up overlapping preads.
/// Instead, contiguous blocks spaced one `window` apart are fetched from the
/// watermark up to the horizon `start + depth * window` (clamped to EOF). A
/// sequential reader therefore enqueues only the freshly exposed tail,
/// roughly one window per window consumed.
///
/// If the watermark lies outside `[start, horizon]`, dispatch restarts from
/// `start`. That happens when the reader seeked before the watermark, or the
/// watermark ran past the new horizon. Returns the planned start offsets and
/// the new watermark (clamped to `backing_len`). A zero `window` or a `start`
/// at/after EOF plans nothing and leaves the watermark unchanged.
pub fn plan_prefetch(
    prefetched_upto: u64,
    start: u64,
    window: u64,
    depth: u64,
    backing_len: u64,
) -> (Vec<u64>, u64) {
    if window == 0 || start >= backing_len {
        return (Vec::new(), prefetched_upto);
    }
    let horizon = start
        .saturating_add(depth.saturating_mul(window))
        .min(backing_len);
    let from = if (start..=horizon).contains(&prefetched_upto) {
        prefetched_upto
    } else {
        start
    };
    // Number of `window` strides needed to reach the horizon from `from`.
    let count = if from < horizon {
        (horizon - from).div_ceil(window)
    } else {
        0
    };
    let starts: Vec<u64> = (0..count).map(|k| from + k * window).collect();
    let watermark = from + count * window;
    (starts, watermark.min(backing_len))
}
460
461use std::cell::Cell;
462
463/// The per-dispatch inputs shared by every window planned from one read: the
464/// same file/buffer/pool/epoch at the same dispatched epoch. Held once behind an
465/// `Arc` so each `PrefetchJob` bumps a single refcount instead of re-cloning the
466/// four Arcs per window (#431).
467pub struct PrefetchContext {
468    pub file: Arc<std::fs::File>,
469    pub buf: Arc<Mutex<ReadAhead>>,
470    pub pool: Arc<ReadAheadPool>,
471    pub epoch: Arc<Epoch>,
472    pub dispatched_epoch: u64,
473    pub len: u64,
474    pub backing_len: u64,
475}
476
477pub struct PrefetchJob {
478    pub ctx: Arc<PrefetchContext>,
479    pub start: u64,
480}
481
482pub struct PrefetchWorkers {
483    tx: crossbeam_channel::Sender<PrefetchJob>,
484}
485
486impl PrefetchWorkers {
487    pub fn new(threads: usize) -> Self {
488        // A multi-consumer channel: every worker owns its own `Receiver` clone and
489        // blocks in `recv` independently, so job hand-off and wake-ups fan out
490        // across all threads instead of serializing behind one shared lock (#430).
491        let (tx, rx) = crossbeam_channel::bounded::<PrefetchJob>(threads * 4);
492        for _ in 0..threads {
493            let rx = rx.clone();
494            std::thread::spawn(move || {
495                while let Ok(job) = rx.recv() {
496                    // Isolate each job: a panic in `run_job` (e.g. a parser bug on
497                    // a hostile backing file) must not kill this worker, which
498                    // would silently drain the pool and disable prefetch. The
499                    // foreground read path recovers any buffer lock the job
500                    // poisoned via `lock_buf_or_clear`.
501                    let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
502                        Self::run_job(job);
503                    }));
504                }
505            });
506        }
507        PrefetchWorkers { tx }
508    }
509
510    #[expect(clippy::needless_pass_by_value)]
511    pub fn run_job(job: PrefetchJob) {
512        use std::os::unix::fs::FileExt;
513        let ctx = &job.ctx;
514        if ctx.epoch.load(std::sync::atomic::Ordering::Acquire) != ctx.dispatched_epoch {
515            return;
516        }
517        let want = ctx.len.min(ctx.backing_len.saturating_sub(job.start));
518        if want == 0 {
519            return;
520        }
521        // Speculative: only prefetch into free budget, never evicting a live
522        // stream. Under pressure we skip rather than thrash real reads.
523        if !ctx.pool.has_room_for(want) {
524            return;
525        }
526        #[expect(clippy::cast_possible_truncation)]
527        let mut bytes = vec![0u8; want as usize];
528        if ctx.file.read_exact_at(&mut bytes, job.start).is_err() {
529            return;
530        }
531        let _ = try_store_prefetch(
532            &ctx.pool,
533            &ctx.buf,
534            &ctx.epoch,
535            ctx.dispatched_epoch,
536            job.start,
537            bytes,
538        );
539    }
540
541    pub fn request(&self, job: PrefetchJob) {
542        let _ = self.tx.try_send(job);
543    }
544}
545
546pub struct BackingReader<'a> {
547    file: &'a std::fs::File,
548    buf: &'a Arc<Mutex<ReadAhead>>,
549    pool: &'a ReadAheadPool,
550    key: usize,
551    backing_len: u64,
552    fills: Cell<u64>,
553    epoch: &'a std::sync::atomic::AtomicU64,
554    /// When set, each backing read sizes the eviction ring and stashes the
555    /// post-read frontier/window below, so the caller can plan prefetch without
556    /// re-acquiring the per-handle lock (#429). Off by default.
557    prefetch: bool,
558    planned_next_expected: Cell<u64>,
559    planned_window: Cell<u64>,
560}
561
562impl<'a> BackingReader<'a> {
563    pub fn new(
564        file: &'a std::fs::File,
565        buf: &'a Arc<Mutex<ReadAhead>>,
566        pool: &'a ReadAheadPool,
567        key: usize,
568        backing_len: u64,
569        epoch: &'a std::sync::atomic::AtomicU64,
570    ) -> Self {
571        BackingReader {
572            file,
573            buf,
574            pool,
575            key,
576            backing_len,
577            fills: Cell::new(0),
578            epoch,
579            prefetch: false,
580            planned_next_expected: Cell::new(u64::MAX),
581            planned_window: Cell::new(0),
582        }
583    }
584
585    /// Opt into prefetch-plan capture: every backing read sizes the eviction
586    /// ring and records the post-read `next_expected`/`window` under the read
587    /// lock, so the caller reads them via [`Self::prefetch_plan`] without a
588    /// second lock acquisition (#429).
589    #[must_use]
590    pub fn with_prefetch_planning(mut self) -> Self {
591        self.prefetch = true;
592        self
593    }
594
595    /// The `(next_expected, window)` captured by the most recent backing read,
596    /// or `(u64::MAX, 0)` if no backing read occurred. Only meaningful when
597    /// constructed via [`Self::with_prefetch_planning`].
598    pub fn prefetch_plan(&self) -> (u64, u64) {
599        (self.planned_next_expected.get(), self.planned_window.get())
600    }
601
602    pub fn fills(&self) -> u64 {
603        self.fills.get()
604    }
605
606    pub fn file(&self) -> &std::fs::File {
607        self.file
608    }
609
610    pub fn read_exact_at(&self, dst: &mut [u8], abs_offset: u64) -> std::io::Result<()> {
611        if !self.pool.enabled() {
612            self.fills.set(self.fills.get() + 1);
613            crate::metrics::on_readahead_miss();
614            crate::metrics::on_pread(dst.len() as u64);
615            return crate::metrics::backing_read_exact_at(self.file, dst, abs_offset);
616        }
617        let mut ra = lock_buf_or_clear(self.buf, self.pool);
618        if ra.covers(abs_offset, dst.len()) {
619            crate::metrics::on_readahead_hit();
620        } else {
621            crate::metrics::on_readahead_miss();
622            if abs_offset != ra.next_expected() {
623                self.epoch.fetch_add(1, std::sync::atomic::Ordering::AcqRel);
624            }
625            let cap = self
626                .pool
627                .permitted_window(self.key, ra.len(), self.pool.per_stream_cap());
628            ra.set_cap(cap);
629        }
630        let file = self.file;
631        let fills = &self.fills;
632        let (old_len, new_len) = ra.read_into(dst, abs_offset, self.backing_len, |b, o| {
633            fills.set(fills.get() + 1);
634            crate::metrics::on_pread(b.len() as u64);
635            crate::metrics::backing_read_exact_at(file, b, o)
636        })?;
637        if self.prefetch {
638            // Size the eviction ring and capture the post-read frontier/window
639            // under the lock we already hold, so the caller plans prefetch
640            // without re-locking this per-handle mutex (#429).
641            let window = ra.window();
642            let depth = prefetch_depth(self.pool.per_stream_cap(), window);
643            ra.set_max_windows(usize::try_from(depth).unwrap_or(4) + 1);
644            self.planned_next_expected.set(ra.next_expected());
645            self.planned_window.set(window);
646        }
647        self.pool.reconcile(old_len, new_len);
648        drop(ra);
649        self.pool.touch(self.key);
650        Ok(())
651    }
652}
653
#[cfg(test)]
mod window_tests {
    use super::*;

    /// In-memory stand-in for a backing file. `bytes` is the whole file, and
    /// `log` records every `(offset, len)` fill so tests can count preads.
    struct Fake {
        bytes: Vec<u8>,
        log: Vec<(u64, usize)>,
    }

    impl Fake {
        fn new(len: usize) -> Self {
            #[expect(clippy::cast_possible_truncation)]
            let bytes = (0..len).map(|i| (i % 251) as u8).collect();
            Fake {
                bytes,
                log: Vec::new(),
            }
        }

        #[expect(clippy::unnecessary_wraps)]
        fn fill(&mut self, buf: &mut [u8], off: u64) -> io::Result<()> {
            self.log.push((off, buf.len()));
            #[expect(clippy::cast_possible_truncation)]
            let from = off as usize;
            buf.copy_from_slice(&self.bytes[from..from + buf.len()]);
            Ok(())
        }
    }

    /// Read `len` bytes at `off` through `ra`, backed by `fake`.
    fn serve(ra: &mut ReadAhead, fake: &mut Fake, off: u64, len: usize) -> Vec<u8> {
        let backing_len = fake.bytes.len() as u64;
        let mut out = vec![0u8; len];
        ra.read_into(&mut out, off, backing_len, |b, o| fake.fill(b, o))
            .unwrap();
        out
    }

    #[test]
    fn first_read_misses_then_sequential_reads_hit() {
        const CHUNK: usize = 64 * 1024;
        let mut fake = Fake::new(4 * 1024 * 1024);
        let mut ra = ReadAhead::new(WINDOW_ABS_CAP);
        // Cold start: the first chunk misses and fills one floor-sized window.
        let first = serve(&mut ra, &mut fake, 0, CHUNK);
        assert_eq!(first, fake.bytes[..CHUNK]);
        assert_eq!(fake.log.len(), 1, "first read must fill once");
        // The following chunk lies inside that window, so no pread happens.
        let second = serve(&mut ra, &mut fake, CHUNK as u64, CHUNK);
        assert_eq!(second, fake.bytes[CHUNK..2 * CHUNK]);
        assert_eq!(fake.log.len(), 1, "sequential hit must not pread");
    }

    #[test]
    fn sequential_miss_grows_window_geometrically() {
        let mut fake = Fake::new(16 * 1024 * 1024);
        let mut ra = ReadAhead::new(WINDOW_ABS_CAP);
        #[expect(clippy::cast_possible_truncation)]
        let floor = WINDOW_FLOOR as usize;
        // Each read consumes exactly one floor window, so the second read is a
        // sequential miss right at the window boundary.
        serve(&mut ra, &mut fake, 0, floor);
        serve(&mut ra, &mut fake, WINDOW_FLOOR, floor);
        let (_, grown) = fake.log[1];
        let grown = grown as u64;
        assert!(grown > WINDOW_FLOOR, "window must grow on sequential miss");
        assert!(grown <= WINDOW_ABS_CAP, "window capped");
    }

    #[test]
    fn seek_resets_window_to_floor() {
        let mut fake = Fake::new(16 * 1024 * 1024);
        let mut ra = ReadAhead::new(WINDOW_ABS_CAP);
        #[expect(clippy::cast_possible_truncation)]
        let floor = WINDOW_FLOOR as usize;
        serve(&mut ra, &mut fake, 0, floor);
        serve(&mut ra, &mut fake, WINDOW_FLOOR, floor); // grows the window
        // Jump far away: the next fill must be back at the floor.
        serve(&mut ra, &mut fake, 12 * 1024 * 1024, 4096);
        let (_, seek_fill_len) = *fake.log.last().unwrap();
        assert_eq!(seek_fill_len as u64, WINDOW_FLOOR, "seek resets to floor");
    }

    #[test]
    fn window_clamps_to_backing_len_at_eof() {
        const EOF: usize = 700 * 1024; // smaller than the abs cap
        let mut fake = Fake::new(EOF);
        let mut ra = ReadAhead::new(WINDOW_ABS_CAP);
        // A valid request near EOF, where a full window would overrun the file.
        let out = serve(&mut ra, &mut fake, 680 * 1024, 20 * 1024);
        assert_eq!(out, fake.bytes[680 * 1024..EOF]);
        let (off, len) = fake.log[0];
        assert!(off + len as u64 <= EOF as u64, "fill must not read past EOF");
    }

    #[test]
    fn read_straddling_eof_errors_not_panics() {
        let mut fake = Fake::new(1000);
        let mut ra = ReadAhead::new(WINDOW_ABS_CAP);
        let backing_len = fake.bytes.len() as u64;
        let mut dst = vec![0u8; 20];
        // 990 + 20 = 1010 > 1000: the range ends past EOF.
        let err = ra
            .read_into(&mut dst, 990, backing_len, |b, o| fake.fill(b, o))
            .expect_err("a read past EOF must fail, not panic");
        assert_eq!(err.kind(), io::ErrorKind::UnexpectedEof);
        // 980 + 20 == 1000: ending exactly at EOF is still in range.
        let ok = ra.read_into(&mut dst, 980, backing_len, |b, o| fake.fill(b, o));
        assert!(ok.is_ok(), "off+len == backing_len is in range");
    }
}
767
#[cfg(test)]
mod pool_budget_tests {
    use super::*;
    use std::sync::{Arc, Mutex};

    /// A zero budget switches read-ahead off: the pool reports itself disabled
    /// and every stream's share is zero bytes.
    #[test]
    fn disabled_pool_grants_nothing_and_reports_disabled() {
        let off = ReadAheadPool::new(0);
        assert!(!off.enabled());
        assert_eq!(off.per_stream_cap(), 0);
    }

    /// A stream's share is `budget / PER_STREAM_DIVISOR`, never exceeding
    /// `WINDOW_ABS_CAP`.
    #[test]
    fn per_stream_cap_is_budget_over_divisor_capped_by_abs() {
        // 16 MiB / 4 = 4 MiB, still under the 8 MiB absolute cap.
        let modest = ReadAheadPool::new(16 * 1024 * 1024);
        assert_eq!(modest.per_stream_cap(), 4 * 1024 * 1024);
        // A 1 GiB budget would give far more than the cap, so the cap wins.
        let huge = ReadAheadPool::new(1024 * 1024 * 1024);
        assert_eq!(huge.per_stream_cap(), WINDOW_ABS_CAP);
    }

    /// Recovering a poisoned buffer mutex empties the buffer, returns its bytes
    /// to the budget and clears the poison, all without panicking.
    #[test]
    fn poisoned_buffer_lock_recovers_and_uncharges() {
        let pool = ReadAheadPool::new(WINDOW_ABS_CAP);
        let shared = Arc::new(Mutex::new(ReadAhead::new(pool.per_stream_cap())));

        // Seed a 4 KiB window and charge it to the budget.
        let (before, after) = shared.lock().unwrap().store_window(0, vec![0u8; 4096]);
        pool.reconcile(before, after);
        assert_eq!(pool.charged(), 4096);

        // A thread takes the lock and panics while holding it, as a reader that
        // blew up mid-mutation would, leaving the mutex poisoned.
        let poisoner = Arc::clone(&shared);
        let _ = std::thread::spawn(move || {
            let _guard = poisoner.lock().unwrap();
            panic!("poison the read-ahead buffer");
        })
        .join();
        assert!(shared.is_poisoned());

        let recovered_len = lock_buf_or_clear(&shared, &pool).len();
        assert_eq!(recovered_len, 0);
        assert!(!shared.is_poisoned(), "poison cleared after recovery");
        assert_eq!(pool.charged(), 0, "cleared bytes uncharged from the budget");
    }

    /// Growth is granted while it fits, then clamped once the stream already
    /// holds its per-stream share.
    #[test]
    fn permitted_window_grants_up_to_budget_then_clamps() {
        // 4 MiB budget → 1 MiB per-stream cap.
        let pool = ReadAheadPool::new(4 * 1024 * 1024);
        let stream = Arc::new(Mutex::new(ReadAhead::new(pool.per_stream_cap())));
        pool.register(1, Arc::clone(&stream));

        // 0 → 1 MiB fits both the cap and the free budget: granted in full,
        // after which the actual growth is charged.
        let first = pool.permitted_window(1, 0, 1024 * 1024);
        assert_eq!(first, 1024 * 1024);
        pool.reconcile(0, 1024 * 1024);

        // Already at the 1 MiB cap: an 8 MiB request is capped back down to the
        // current length, i.e. no growth.
        let second = pool.permitted_window(1, 1024 * 1024, 8 * 1024 * 1024);
        assert_eq!(second, 1024 * 1024);
    }
}
831
#[cfg(test)]
mod eviction_tests {
    use super::*;
    use std::sync::{Arc, Mutex};

    /// Build a buffer holding exactly `bytes` real backing bytes and register it
    /// with the pool under `key`, charging the pool for those bytes (mirrors what
    /// a real miss does via `permitted_window` + `reconcile`).
    ///
    /// `bytes` must be >= `WINDOW_FLOOR` for the stored window to equal `bytes`.
    /// The callers below pass 1 MiB, which is also the per-stream cap of their
    /// 4 MiB pools. Registration order sets LRU order: the first key registered
    /// is the coldest.
    fn register_filled(pool: &ReadAheadPool, key: usize, bytes: usize) -> Arc<Mutex<ReadAhead>> {
        let arc = Arc::new(Mutex::new(ReadAhead::new(pool.per_stream_cap())));
        let data = vec![7u8; bytes * 2];
        let mut dst = vec![0u8; bytes];
        let (old, new) = arc
            .lock()
            .unwrap()
            .read_into(&mut dst, 0, (bytes * 2) as u64, |b, _| {
                b.copy_from_slice(&data[..b.len()]);
                Ok(())
            })
            .unwrap();
        pool.register(key, Arc::clone(&arc));
        pool.reconcile(old, new);
        arc
    }

    #[test]
    fn permitted_window_evicts_coldest_other_stream_under_pressure() {
        // Budget 4 MiB, per-stream cap 1 MiB. Fill the budget with four 1 MiB
        // streams (registered keys 1..4, so key 1 is coldest), then a fifth stream
        // wants to grow → must evict the coldest (key 1).
        let pool = ReadAheadPool::new(4 * 1024 * 1024);
        let mib = 1024 * 1024usize;
        let cold = register_filled(&pool, 1, mib);
        register_filled(&pool, 2, mib);
        register_filled(&pool, 3, mib);
        register_filled(&pool, 4, mib);
        // Budget is now full (4 x 1 MiB). A fresh hot stream wants 1 MiB.
        let hot = Arc::new(Mutex::new(ReadAhead::new(pool.per_stream_cap())));
        pool.register(5, Arc::clone(&hot));
        let granted = pool.permitted_window(5, 0, pool.per_stream_cap());
        assert_eq!(granted, mib as u64, "eviction frees room for the full cap");
        assert_eq!(cold.lock().unwrap().len(), 0, "coldest stream was evicted");
    }

    #[test]
    fn locked_victim_is_skipped_not_blocked() {
        let pool = ReadAheadPool::new(4 * 1024 * 1024);
        let mib = 1024 * 1024;
        let victim = register_filled(&pool, 1, mib);
        register_filled(&pool, 2, mib);
        register_filled(&pool, 3, mib);
        register_filled(&pool, 4, mib);
        // Hold the coldest victim's lock: eviction must skip it (try_lock), not hang.
        let held = victim.lock().unwrap();
        let hot = Arc::new(Mutex::new(ReadAhead::new(pool.per_stream_cap())));
        pool.register(5, Arc::clone(&hot));
        // Returns promptly: evicts the next-coldest unlocked victim instead.
        let granted = pool.permitted_window(5, 0, pool.per_stream_cap());
        assert!(granted > 0 && granted <= pool.per_stream_cap());
        // "Skipped" must really mean skipped: the locked buffer keeps its bytes.
        assert_ne!(held.len(), 0, "locked victim must be skipped, not evicted");
    }

    /// End-to-end `BackingReader` checks against a real temp file. This module
    /// is nested inside `eviction_tests`, so its `use super::*` resolves through
    /// that module's imports.
    #[cfg(test)]
    mod backing_reader_tests {
        use super::*;
        use std::io::Write;
        use std::os::unix::fs::FileExt;
        use std::sync::{Arc, Mutex};

        /// Create a temp file of `len` bytes filled with `i % 251` and reopen it
        /// read-only. The `TempDir` must outlive the file's use.
        #[expect(clippy::cast_possible_truncation)]
        fn temp_file(len: usize) -> (tempfile::TempDir, std::fs::File, Vec<u8>) {
            let dir = tempfile::tempdir().unwrap();
            let path = dir.path().join("backing.bin");
            let data: Vec<u8> = (0..len).map(|i| (i % 251) as u8).collect();
            std::fs::File::create(&path)
                .unwrap()
                .write_all(&data)
                .unwrap();
            let f = std::fs::File::open(&path).unwrap();
            (dir, f, data)
        }

        #[test]
        fn sequential_reads_collapse_to_one_pread_per_window() {
            let (_d, file, data) = temp_file(4 * 1024 * 1024);
            let pool = ReadAheadPool::new(64 * 1024 * 1024);
            let buf = Arc::new(Mutex::new(ReadAhead::new(pool.per_stream_cap())));
            pool.register(1, Arc::clone(&buf));
            let backing_len = data.len() as u64;
            let epoch = std::sync::atomic::AtomicU64::new(0);
            let br = BackingReader::new(&file, &buf, &pool, 1, backing_len, &epoch);
            let mut out = vec![0u8; 64 * 1024];
            #[expect(clippy::cast_possible_truncation)]
            for chunk in 0..16u64 {
                br.read_exact_at(&mut out, chunk * 64 * 1024).unwrap();
                assert_eq!(out, data[(chunk * 64 * 1024) as usize..][..64 * 1024]);
            }
            assert!(
                br.fills() < 16,
                "read-ahead must collapse preads, got {}",
                br.fills()
            );
        }

        #[test]
        fn bytes_match_direct_pread_for_random_access() {
            let (_d, file, data) = temp_file(2 * 1024 * 1024);
            let pool = ReadAheadPool::new(64 * 1024 * 1024);
            let buf = Arc::new(Mutex::new(ReadAhead::new(pool.per_stream_cap())));
            pool.register(1, Arc::clone(&buf));
            let epoch = std::sync::atomic::AtomicU64::new(0);
            let br = BackingReader::new(&file, &buf, &pool, 1, data.len() as u64, &epoch);
            // The last case ends exactly at EOF (2_097_000 + 152 == 2 MiB).
            for &(off, len) in &[
                (0u64, 100usize),
                (1_000_000, 4096),
                (5000, 700),
                (2_097_000, 152),
            ] {
                let mut a = vec![0u8; len];
                br.read_exact_at(&mut a, off).unwrap();
                let mut b = vec![0u8; len];
                file.read_exact_at(&mut b, off).unwrap();
                assert_eq!(a, b, "read-ahead byte mismatch at {off}+{len}");
            }
        }
    }
}
959
#[cfg(test)]
mod ring_tests {
    use super::*;

    /// With the default ring depth a `ReadAhead` acts like the Phase-1 single
    /// window: a miss far from the current window replaces it.
    #[test]
    fn default_ring_holds_one_window_like_phase1() {
        let mut ra = ReadAhead::new(WINDOW_ABS_CAP);
        let data: Vec<u8> = (0..2 * 1024 * 1024u64).map(|i| (i % 251) as u8).collect();
        let blen = data.len() as u64;
        let mut dst = vec![0u8; 4096];
        // Two cold reads about 1 MB apart, each filled straight from `data`.
        for off in [0, 1_000_000] {
            ra.read_into(&mut dst, off, blen, |b, o| {
                #[expect(clippy::cast_possible_truncation)]
                let o = o as usize;
                b.copy_from_slice(&data[o..][..b.len()]);
                Ok(())
            })
            .unwrap();
        }
        assert!(!ra.covers(0, 4096), "single-window ring evicts the old window");
        assert!(ra.covers(1_000_000, 4096));
    }

    /// With two ring slots, a foreground miss must not clobber a window a
    /// prefetch already parked elsewhere in the file.
    #[test]
    fn ring_of_two_keeps_current_and_prefetched_window() {
        let mut ring = ReadAhead::new(WINDOW_ABS_CAP);
        ring.set_max_windows(2);
        // Stand-in for a completed prefetch: 512 KiB at the 1 MiB mark.
        ring.store_window(1024 * 1024, vec![9u8; 512 * 1024]);
        // A miss at offset 0 fills the other slot.
        let mut dst = vec![0u8; 4096];
        ring.read_into(&mut dst, 0, 4 * 1024 * 1024, |b, _| {
            b.fill(1u8);
            Ok(())
        })
        .unwrap();
        assert!(ring.covers(0, 4096), "current window present");
        assert!(ring.covers(1024 * 1024, 4096), "prefetched window NOT clobbered");
    }

    /// Regression for the `cap / cap == 1` tautology: depth must follow the live
    /// adaptive window. It fans out while the window is small, collapses to 1 as
    /// the window reaches `cap`, and keeps in-flight prefetch (`depth * window`)
    /// within one per-stream budget share.
    #[test]
    fn prefetch_depth_tracks_window_and_bounds_inflight() {
        let cap = 8 * 1024 * 1024;
        // (window, expected depth). The floor-sized window of a fresh or
        // just-seeked stream fans out (16, clamped to 4) rather than the 1 the
        // old formula produced. The degenerate 0 and 2*cap inputs must not
        // panic or yield 0.
        for (window, depth) in [
            (WINDOW_FLOOR, 4),
            (2 * 1024 * 1024, 4),
            (4 * 1024 * 1024, 2),
            (cap, 1),
            (0, 4),
            (cap * 2, 1),
        ] {
            assert_eq!(prefetch_depth(cap, window), depth);
        }
        // In-flight bytes never exceed one budget share along the growth curve.
        for w in [WINDOW_FLOOR, 1 << 20, 2 << 20, 4 << 20, cap] {
            assert!(prefetch_depth(cap, w) * w <= cap, "overshoot at window {w}");
        }
    }

    /// A sequential stream must not re-request windows it already dispatched:
    /// after the initial fan-out, each later read enqueues only the newly
    /// exposed tail, so preads never overlap.
    #[test]
    fn plan_prefetch_dedups_sequential_dispatch() {
        let win = WINDOW_FLOOR;
        let depth = prefetch_depth(8 * 1024 * 1024, win); // 4 at the floor window
        let blen = 100 * 1024 * 1024;
        // First read ends at `win`: fan out `depth` windows past it.
        let (first_jobs, mark) = plan_prefetch(0, win, win, depth, blen);
        assert_eq!(first_jobs, vec![win, 2 * win, 3 * win, 4 * win]);
        assert_eq!(mark, 5 * win);
        // The reader moved on by half a window. Everything below `mark` is
        // already in flight, so only the single newly exposed window is queued.
        let (next_jobs, next_mark) = plan_prefetch(mark, win + win / 2, win, depth, blen);
        assert_eq!(next_jobs, vec![5 * win], "must not re-dispatch buffered windows");
        assert_eq!(next_mark, 6 * win);
    }

    /// Seeking back well before the watermark restarts dispatch at the new
    /// read position instead of continuing from the old watermark.
    #[test]
    fn plan_prefetch_seek_resets_watermark() {
        let win = WINDOW_FLOOR;
        let blen = 100 * 1024 * 1024;
        let (_jobs, mark) = plan_prefetch(0, 10 * win, win, 4, blen);
        let (jobs, new_mark) = plan_prefetch(mark, 2 * win, win, 4, blen);
        assert_eq!(jobs.first(), Some(&(2 * win)));
        assert_eq!(new_mark, 6 * win);
    }

    /// Near EOF neither job starts nor the watermark may run past the file.
    #[test]
    fn plan_prefetch_clamps_to_backing_len() {
        let win = WINDOW_FLOOR;
        let blen = 3 * win + 100;
        let (jobs, mark) = plan_prefetch(0, win, win, 4, blen);
        assert!(!jobs.iter().any(|&start| start >= blen), "no job starts past EOF");
        assert!(mark <= blen, "watermark clamped to EOF");
    }

    /// `len` sums every window in the ring; `clear` drops them all and reports
    /// how many bytes it released.
    #[test]
    fn len_sums_all_windows_and_clear_drops_all() {
        let mut ring = ReadAhead::new(WINDOW_ABS_CAP);
        ring.set_max_windows(3);
        ring.store_window(0, vec![0u8; 100]);
        ring.store_window(1000, vec![0u8; 200]);
        assert_eq!(ring.len(), 300);
        assert_eq!(ring.clear(), 300);
        assert_eq!(ring.len(), 0);
    }
}
1087
#[cfg(test)]
mod prefetch_store_tests {
    use super::*;
    use std::sync::atomic::AtomicU64;

    /// A prefetch dispatched under an epoch that has since advanced is dropped:
    /// nothing is stored and nothing is charged to the budget.
    #[test]
    fn store_with_stale_epoch_is_discarded() {
        let pool = ReadAheadPool::new(64 * 1024 * 1024);
        let ra = Arc::new(Mutex::new(ReadAhead::new(WINDOW_ABS_CAP)));
        let epoch = AtomicU64::new(0);
        epoch.fetch_add(1, std::sync::atomic::Ordering::AcqRel);
        assert!(!try_store_prefetch(&pool, &ra, &epoch, 0, 0, vec![1, 2, 3]));
        assert_eq!(ra.lock().unwrap().len(), 0);
        // A discarded window must not leak a charge into the global budget.
        assert_eq!(pool.charged(), 0, "discarded prefetch must not be charged");
    }

    /// A prefetch whose dispatch epoch still matches is stored and charged.
    #[test]
    fn store_with_current_epoch_is_accepted_and_charges_budget() {
        let pool = ReadAheadPool::new(64 * 1024 * 1024);
        let ra = Arc::new(Mutex::new(ReadAhead::new(WINDOW_ABS_CAP)));
        ra.lock().unwrap().set_max_windows(2);
        let epoch = AtomicU64::new(5);
        assert!(try_store_prefetch(
            &pool,
            &ra,
            &epoch,
            5,
            1000,
            vec![0u8; 4096]
        ));
        assert!(ra.lock().unwrap().covers(1000, 4096));
        // The stored window is charged against the global budget.
        assert_eq!(pool.charged(), 4096);
    }

    #[test]
    fn pool_reports_its_budget() {
        assert_eq!(ReadAheadPool::new(64 * 1024).budget(), 64 * 1024);
        assert_eq!(ReadAheadPool::new(0).budget(), 0);
        assert_eq!(ReadAheadPool::new(0).charged(), 0);
    }

    /// Regression: a prefetched window must charge the budget so that the
    /// subsequent uncharge on eviction/release cannot drive `charged` negative
    /// (an underflow that silently disabled read-ahead process-wide).
    #[test]
    #[expect(clippy::cast_possible_truncation)]
    fn prefetch_charge_survives_release_without_underflow() {
        let pool = ReadAheadPool::new(2 * 1024 * 1024); // per-stream cap 512K
        let cap = pool.per_stream_cap();
        let buf = Arc::new(Mutex::new(ReadAhead::new(cap)));
        buf.lock().unwrap().set_max_windows(2);
        pool.register(1, Arc::clone(&buf));
        // Sync read fills + charges one cap-sized window at offset 0.
        let mut dst = vec![0u8; cap as usize];
        let (o, n) = buf
            .lock()
            .unwrap()
            .read_into(&mut dst, 0, 8 * 1024 * 1024, |b, _| {
                b.fill(1);
                Ok(())
            })
            .unwrap();
        pool.reconcile(o, n);
        // Prefetch a second cap-sized window — now charged via try_store_prefetch.
        let epoch = AtomicU64::new(0);
        assert!(try_store_prefetch(
            &pool,
            &buf,
            &epoch,
            0,
            cap,
            vec![2u8; cap as usize]
        ));
        assert_eq!(pool.charged(), 2 * cap);
        pool.deregister(1);
        assert_eq!(pool.charged(), 0, "release must not underflow");
        // A fresh stream still gets its full grant — read-ahead is not disabled.
        let hot = Arc::new(Mutex::new(ReadAhead::new(cap)));
        pool.register(2, Arc::clone(&hot));
        assert_eq!(pool.permitted_window(2, 0, cap), cap);
    }

    /// Under budget pressure, prefetch declines (no free room) rather than
    /// evicting a live stream or overshooting the budget.
    #[test]
    fn prefetch_declines_when_budget_full() {
        let pool = ReadAheadPool::new(1024 * 1024);
        assert!(pool.has_room_for(512 * 1024));
        pool.reconcile(0, 1024 * 1024); // fill the budget
        assert!(!pool.has_room_for(1));
    }
}
1180
#[cfg(test)]
mod prefetch_worker_tests {
    use super::*;
    use std::io::Write;
    use std::sync::atomic::AtomicU64;
    use std::sync::{Arc, Mutex};
    use std::time::Duration;

    /// Length of every fixture backing file (8 MiB).
    const FILE_LEN: u64 = 8 * 1024 * 1024;
    /// Backing offset of the window each test prefetches.
    const WINDOW_START: u64 = 1024 * 1024;
    /// Length of the window each test prefetches.
    const WINDOW_LEN: u64 = 1024 * 1024;

    /// Shared setup for the worker tests: a temp backing file filled with
    /// `i % 251` and one stream (key 1) registered in a roomy 64 MiB pool.
    struct Fixture {
        file: Arc<std::fs::File>,
        data: Vec<u8>,
        pool: Arc<ReadAheadPool>,
        buf: Arc<Mutex<ReadAhead>>,
        epoch: Arc<AtomicU64>,
        // Declared last so the temp dir is removed after the other fields drop.
        _dir: tempfile::TempDir,
    }

    impl Fixture {
        /// Write the backing file as `name` in a fresh temp dir and register
        /// the stream.
        fn new(name: &str) -> Self {
            let dir = tempfile::tempdir().unwrap();
            let path = dir.path().join(name);
            let data: Vec<u8> = (0..FILE_LEN).map(|i| (i % 251) as u8).collect();
            std::fs::File::create(&path)
                .unwrap()
                .write_all(&data)
                .unwrap();
            let file = Arc::new(std::fs::File::open(&path).unwrap());

            let pool = Arc::new(ReadAheadPool::new(64 * 1024 * 1024));
            let buf = Arc::new(Mutex::new(ReadAhead::new(pool.per_stream_cap())));
            let epoch = Arc::new(AtomicU64::new(0));
            pool.register(1, Arc::clone(&buf));

            Fixture {
                file,
                data,
                pool,
                buf,
                epoch,
                _dir: dir,
            }
        }

        /// A job prefetching `WINDOW_LEN` bytes at `WINDOW_START`, dispatched
        /// at epoch 0 (the fixture's current epoch).
        fn job(&self) -> PrefetchJob {
            PrefetchJob {
                ctx: Arc::new(PrefetchContext {
                    file: Arc::clone(&self.file),
                    buf: Arc::clone(&self.buf),
                    pool: Arc::clone(&self.pool),
                    epoch: Arc::clone(&self.epoch),
                    dispatched_epoch: 0,
                    len: WINDOW_LEN,
                    backing_len: self.data.len() as u64,
                }),
                start: WINDOW_START,
            }
        }

        /// Assert that a 4 KiB read at `WINDOW_START` is served from the
        /// buffer (the fill callback never runs) with the file's real bytes.
        /// `msg` is the failure message for the no-pread check.
        fn assert_served_without_pread(&self, msg: &str) {
            let start = usize::try_from(WINDOW_START).unwrap();
            let mut out = vec![0u8; 4096];
            let mut fills = 0;
            self.buf
                .lock()
                .unwrap()
                .read_into(&mut out, WINDOW_START, self.data.len() as u64, |_, _| {
                    fills += 1;
                    Ok(())
                })
                .unwrap();
            assert_eq!(fills, 0, "{msg}");
            assert_eq!(out, self.data[start..start + 4096]);
        }
    }

    #[test]
    fn prefetch_fills_next_window_for_a_sequential_stream() {
        let fx = Fixture::new("b.bin");
        PrefetchWorkers::run_job(fx.job());
        fx.assert_served_without_pread("prefetched window should serve without a pread");
    }

    /// A job whose target buffer lock is already poisoned must not panic the
    /// worker thread (which would silently drain the pool). `run_job` recovers
    /// the poison via `lock_buf_or_clear` and still stores the window.
    #[test]
    fn run_job_recovers_a_poisoned_buffer_lock() {
        let fx = Fixture::new("p.bin");

        let b2 = Arc::clone(&fx.buf);
        let _ = std::thread::spawn(move || {
            let _g = b2.lock().unwrap();
            panic!("poison the read-ahead buffer");
        })
        .join();
        assert!(fx.buf.is_poisoned());

        // Previously `buf.lock().unwrap()` in `try_store_prefetch` would panic here.
        PrefetchWorkers::run_job(fx.job());
        assert!(!fx.buf.is_poisoned(), "poison cleared by the recovering lock");
        assert!(fx.pool.charged() > 0, "window stored after recovery");
    }

    /// The worker-pool path the single-stream test skips: a job pushed through
    /// `request` must reach one of the recv-ing worker threads and run, storing
    /// the window. Guards the crossbeam hand-off (#430).
    #[test]
    fn dispatched_job_reaches_a_worker_and_fills_the_buffer() {
        let fx = Fixture::new("w.bin");
        let workers = PrefetchWorkers::new(2);
        workers.request(fx.job());

        // Hand-off is asynchronous; poll until the worker has charged the stored
        // window. Bounded so a broken hand-off fails fast rather than hanging.
        let step = Duration::from_millis(5);
        let mut waited = Duration::ZERO;
        while fx.pool.charged() == 0 && waited < Duration::from_secs(5) {
            std::thread::sleep(step);
            waited += step;
        }
        assert!(fx.pool.charged() > 0, "dispatched job never reached a worker");

        fx.assert_served_without_pread("worker-prefetched window should serve without a pread");
    }
}
1329
1330#[cfg(test)]
1331mod concurrency_tests {
1332    use super::*;
1333    use std::io::Write;
1334    use std::os::unix::fs::FileExt;
1335    use std::sync::Arc;
1336
1337    #[test]
1338    fn concurrent_reads_same_handle_match_oracle() {
1339        let dir = tempfile::tempdir().unwrap();
1340        let path = dir.path().join("concurrent.bin");
1341        #[expect(clippy::cast_sign_loss)]
1342        let data: Vec<u8> = (0..4 * 1024 * 1024).map(|i| (i % 251) as u8).collect();
1343        std::fs::File::create(&path)
1344            .unwrap()
1345            .write_all(&data)
1346            .unwrap();
1347        let file = Arc::new(std::fs::File::open(&path).unwrap());
1348
1349        let pool = Arc::new(ReadAheadPool::new(64 * 1024 * 1024));
1350        let buf = Arc::new(Mutex::new(ReadAhead::new(pool.per_stream_cap())));
1351        let epoch = Arc::new(std::sync::atomic::AtomicU64::new(0));
1352        pool.register(1, Arc::clone(&buf));
1353
1354        let backing_len = data.len() as u64;
1355        let num_threads: u64 = 8;
1356        let reads_per_thread = 200;
1357
1358        std::thread::scope(|s| {
1359            for tid in 0..num_threads {
1360                let file = Arc::clone(&file);
1361                let buf = Arc::clone(&buf);
1362                let pool = Arc::clone(&pool);
1363                let epoch = Arc::clone(&epoch);
1364                s.spawn(move || {
1365                    let br = BackingReader::new(&file, &buf, &pool, 1, backing_len, &epoch);
1366                    let mut rng_state: u64 = tid * 7919;
1367                    for _ in 0..reads_per_thread {
1368                        rng_state = rng_state
1369                            .wrapping_mul(6_364_136_223_846_793_005)
1370                            .wrapping_add(1);
1371                        let off = rng_state % (backing_len.saturating_sub(4096).max(1));
1372                        #[expect(clippy::cast_possible_truncation)]
1373                        let len = 4096usize.min((backing_len - off) as usize);
1374                        let mut got = vec![0u8; len];
1375                        br.read_exact_at(&mut got, off).unwrap();
1376                        let mut expected = vec![0u8; len];
1377                        file.read_exact_at(&mut expected, off).unwrap();
1378                        assert_eq!(
1379                            got, expected,
1380                            "mismatch at off={off} len={len} from thread {tid}"
1381                        );
1382                    }
1383                });
1384            }
1385        });
1386    }
1387
1388    /// Stress the Phase-2 surface single-stream tests miss: worker-side
1389    /// `run_job` (store + budget `reconcile`) racing reads, while a second
1390    /// contended stream forces cross-buffer `try_lock` eviction — all hammering
1391    /// the shared `charged` budget. Asserts byte-correctness; run under TSan for
1392    /// race detection.
1393    #[test]
1394    #[expect(clippy::cast_possible_truncation)]
1395    fn workers_and_eviction_race_reads_without_corruption() {
1396        let dir = tempfile::tempdir().unwrap();
1397        let path = dir.path().join("race.bin");
1398        let data: Vec<u8> = (0u64..4 * 1024 * 1024).map(|i| (i % 251) as u8).collect();
1399        std::fs::File::create(&path)
1400            .unwrap()
1401            .write_all(&data)
1402            .unwrap();
1403        let file = Arc::new(std::fs::File::open(&path).unwrap());
1404        let backing_len = data.len() as u64;
1405
1406        // Small budget: the worker's has_room_for gate bites and stream 2's
1407        // misses must evict stream 1 to make room.
1408        let pool = Arc::new(ReadAheadPool::new(2 * 1024 * 1024));
1409        let window = pool.per_stream_cap();
1410        let buf1 = Arc::new(Mutex::new(ReadAhead::new(window)));
1411        buf1.lock().unwrap().set_max_windows(4);
1412        let buf2 = Arc::new(Mutex::new(ReadAhead::new(window)));
1413        let epoch1 = Arc::new(std::sync::atomic::AtomicU64::new(0));
1414        let epoch2 = Arc::new(std::sync::atomic::AtomicU64::new(0));
1415        pool.register(1, Arc::clone(&buf1));
1416        pool.register(2, Arc::clone(&buf2));
1417
1418        std::thread::scope(|s| {
1419            // Stream 1: one sequential reader (epoch stays stable, so the
1420            // prefetched windows actually store + reconcile).
1421            {
1422                let (file, buf1, pool, epoch1, data) = (&file, &buf1, &pool, &epoch1, &data);
1423                s.spawn(move || {
1424                    let br = BackingReader::new(file, buf1, pool, 1, backing_len, epoch1);
1425                    let mut off = 0u64;
1426                    while off < backing_len {
1427                        let len = (64 * 1024).min((backing_len - off) as usize);
1428                        let mut got = vec![0u8; len];
1429                        br.read_exact_at(&mut got, off).unwrap();
1430                        assert_eq!(got, data[off as usize..off as usize + len]);
1431                        off += len as u64;
1432                    }
1433                });
1434            }
1435            // Two prefetchers driving real `run_job` (store + reconcile) for
1436            // stream 1, concurrent with its reader.
1437            for _ in 0..2 {
1438                let (file, buf1, pool, epoch1) = (&file, &buf1, &pool, &epoch1);
1439                s.spawn(move || {
1440                    for _ in 0..4 {
1441                        let mut start = 0u64;
1442                        while start < backing_len {
1443                            let ctx = Arc::new(PrefetchContext {
1444                                file: Arc::clone(file),
1445                                buf: Arc::clone(buf1),
1446                                pool: Arc::clone(pool),
1447                                epoch: Arc::clone(epoch1),
1448                                dispatched_epoch: epoch1.load(std::sync::atomic::Ordering::Acquire),
1449                                len: window,
1450                                backing_len,
1451                            });
1452                            PrefetchWorkers::run_job(PrefetchJob { ctx, start });
1453                            start += window;
1454                        }
1455                    }
1456                });
1457            }
1458            // Stream 2: random-offset readers → misses → permitted_window →
1459            // cross-buffer eviction of stream 1 (try_lock), churning `charged`.
1460            for tid in 0..4u64 {
1461                let (file, buf2, pool, epoch2, data) = (&file, &buf2, &pool, &epoch2, &data);
1462                s.spawn(move || {
1463                    let br = BackingReader::new(file, buf2, pool, 2, backing_len, epoch2);
1464                    let mut rng: u64 = tid * 7919 + 1;
1465                    for _ in 0..300 {
1466                        rng = rng.wrapping_mul(6_364_136_223_846_793_005).wrapping_add(1);
1467                        let off = rng % backing_len.saturating_sub(4096).max(1);
1468                        let len = 4096usize.min((backing_len - off) as usize);
1469                        let mut got = vec![0u8; len];
1470                        br.read_exact_at(&mut got, off).unwrap();
1471                        assert_eq!(got, data[off as usize..off as usize + len]);
1472                    }
1473                });
1474            }
1475        });
1476    }
1477
1478    /// Regression for the deregister deadlock: `deregister` takes `streams` then
1479    /// the buffer mutex, while a read holds the buffer mutex then blocking-locks
1480    /// `streams` via `permitted_window` → eviction. Racing them on the same key
1481    /// would deadlock if `deregister` held `streams` across the buffer lock.
1482    /// Asserts bytes stay correct and the run completes; the tsan job's
1483    /// deadlock detector flags a regression even without an actual hang.
1484    #[test]
1485    #[expect(clippy::cast_possible_truncation)]
1486    fn deregister_races_reads_without_deadlock() {
1487        let dir = tempfile::tempdir().unwrap();
1488        let path = dir.path().join("dereg.bin");
1489        let data: Vec<u8> = (0u64..2 * 1024 * 1024).map(|i| (i % 251) as u8).collect();
1490        std::fs::File::create(&path)
1491            .unwrap()
1492            .write_all(&data)
1493            .unwrap();
1494        let file = Arc::new(std::fs::File::open(&path).unwrap());
1495        let backing_len = data.len() as u64;
1496
1497        // Small budget so reads miss and run permitted_window → eviction, which
1498        // blocking-locks `streams` while the read holds the buffer mutex.
1499        let pool = Arc::new(ReadAheadPool::new(1024 * 1024));
1500        let buf = Arc::new(Mutex::new(ReadAhead::new(pool.per_stream_cap())));
1501        let epoch = Arc::new(std::sync::atomic::AtomicU64::new(0));
1502        pool.register(1, Arc::clone(&buf));
1503        // A second registered stream gives eviction a candidate to walk.
1504        let buf2 = Arc::new(Mutex::new(ReadAhead::new(pool.per_stream_cap())));
1505        pool.register(2, Arc::clone(&buf2));
1506
1507        std::thread::scope(|s| {
1508            for tid in 0..4u64 {
1509                let (file, buf, pool, epoch, data) = (&file, &buf, &pool, &epoch, &data);
1510                s.spawn(move || {
1511                    let br = BackingReader::new(file, buf, pool, 1, backing_len, epoch);
1512                    let mut rng = tid * 7919 + 1;
1513                    for _ in 0..300 {
1514                        rng = rng.wrapping_mul(6_364_136_223_846_793_005).wrapping_add(1);
1515                        let off = rng % backing_len.saturating_sub(4096).max(1);
1516                        let len = 4096usize.min((backing_len - off) as usize);
1517                        let mut got = vec![0u8; len];
1518                        br.read_exact_at(&mut got, off).unwrap();
1519                        assert_eq!(got, data[off as usize..off as usize + len]);
1520                    }
1521                });
1522            }
1523            // Churn registration of the SAME key the readers use — the path that
1524            // takes `streams` and then the buffer mutex.
1525            {
1526                let (pool, buf) = (&pool, &buf);
1527                s.spawn(move || {
1528                    for _ in 0..2000 {
1529                        pool.deregister(1);
1530                        pool.register(1, Arc::clone(buf));
1531                    }
1532                });
1533            }
1534        });
1535    }
1536
1537    /// Regression for the eviction/deregister budget leak (#536): `evict_one_coldest`
1538    /// clears a victim buffer and drops its lock BEFORE re-checking the registry to
1539    /// decide whether to uncharge. A `deregister` landing in that window removes the
1540    /// stream, then reads the already-cleared buffer's `len() == 0` and uncharges
1541    /// nothing — so the freed bytes are uncharged by neither side and leak from
1542    /// `charged` permanently. Each cycle here charges then uncharges N bytes, so
1543    /// absent the race `charged` returns to 0; a single leaked race never recovers.
1544    #[test]
1545    #[expect(clippy::cast_possible_truncation)]
1546    fn eviction_racing_deregister_does_not_leak_charged() {
1547        use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
1548        const N: u64 = 64 * 1024;
1549        let pool = Arc::new(ReadAheadPool::new(8 * 1024 * 1024));
1550        let victim_key = 2usize;
1551        let stop = Arc::new(AtomicBool::new(false));
1552        let cycles = Arc::new(AtomicU64::new(0));
1553
1554        std::thread::scope(|s| {
1555            // Churn: install a buffer holding N charged bytes under `victim_key`,
1556            // then deregister it. Charge-then-uncharge nets 0 absent a racing evict.
1557            {
1558                let (pool, stop, cycles) = (&pool, &stop, &cycles);
1559                s.spawn(move || {
1560                    while !stop.load(Ordering::Relaxed) {
1561                        let buf = Arc::new(Mutex::new(ReadAhead::new(pool.per_stream_cap())));
1562                        buf.lock().unwrap().store_window(0, vec![0u8; N as usize]);
1563                        pool.reconcile(0, N);
1564                        pool.register(victim_key, Arc::clone(&buf));
1565                        pool.deregister(victim_key);
1566                        cycles.fetch_add(1, Ordering::Relaxed);
1567                    }
1568                });
1569            }
1570            // Two evictors hammering the victim, racing the churn's deregister.
1571            for _ in 0..2 {
1572                let (pool, stop, cycles) = (&pool, &stop, &cycles);
1573                s.spawn(move || {
1574                    while cycles.load(Ordering::Relaxed) < 200_000 {
1575                        pool.evict_one_coldest(1);
1576                    }
1577                    stop.store(true, Ordering::Relaxed);
1578                });
1579            }
1580        });
1581
1582        // The victim is deregistered at the end of every cycle, so nothing is
1583        // registered now and `charged` must be exactly 0.
1584        assert_eq!(
1585            pool.charged(),
1586            0,
1587            "eviction racing deregister leaked charged budget"
1588        );
1589    }
1590}
1591
/// Focused unit tests that pin the read-ahead pool/buffer arithmetic and
/// accounting against the mutation gate (#255): each asserts an EXACT observable
/// (charged bytes, grant size, window size, fill/epoch counts) rather than a
/// loose range, so an operator/return mutation flips the assertion.
#[cfg(test)]
mod mutation_guard_tests {
    use super::*;
    use std::sync::atomic::{AtomicU64, Ordering as AO};
    use std::sync::{Arc, Mutex};

    /// Stand-in backing fill for `ReadAhead::read_into`: writes `7` into every
    /// byte of `b` and ignores the offset.
    #[expect(clippy::unnecessary_wraps)]
    fn fillb(b: &mut [u8], _o: u64) -> io::Result<()> {
        b.fill(7);
        Ok(())
    }

    /// Writes a `len`-byte file holding the pattern `i % 251` into a fresh temp
    /// directory and returns that file opened for reading, together with the
    /// owning `TempDir`, which callers keep bound so the directory outlives the test.
    #[expect(clippy::cast_possible_truncation)]
    fn bk_temp(len: usize) -> (tempfile::TempDir, std::fs::File) {
        use std::io::Write;
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("bk.bin");
        let data: Vec<u8> = (0..len).map(|i| (i % 251) as u8).collect();
        std::fs::File::create(&path)
            .unwrap()
            .write_all(&data)
            .unwrap();
        (dir, std::fs::File::open(&path).unwrap())
    }

    #[test]
    fn next_expected_equals_consumed_tail() {
        let mut ra = ReadAhead::new(WINDOW_ABS_CAP);
        let mut dst = vec![0u8; 4096];
        ra.read_into(&mut dst, 1000, 8 << 20, fillb).unwrap();
        assert_eq!(ra.next_expected(), 1000 + 4096); // miss path: off + len
        let mut d2 = vec![0u8; 100];
        ra.read_into(&mut d2, 5096, 8 << 20, fillb).unwrap();
        assert_eq!(ra.next_expected(), 5096 + 100); // hit path: off + len
    }

    #[test]
    fn window_doubles_on_sequential_miss_and_floors_on_seek() {
        let mut ra = ReadAhead::new(WINDOW_ABS_CAP);
        let blen = 16 << 20;
        #[expect(clippy::cast_possible_truncation)]
        let mut dst = vec![0u8; WINDOW_FLOOR as usize];
        ra.read_into(&mut dst, 0, blen, fillb).unwrap();
        assert_eq!(ra.window(), WINDOW_FLOOR); // first miss
        ra.read_into(&mut dst, WINDOW_FLOOR, blen, fillb).unwrap();
        assert_eq!(ra.window(), WINDOW_FLOOR * 2); // sequential miss doubles
        ra.read_into(&mut dst, 12 << 20, blen, fillb).unwrap();
        assert_eq!(ra.window(), WINDOW_FLOOR); // seek resets to floor
    }

    #[test]
    fn set_cap_clamps_window_down_only() {
        let mut ra = ReadAhead::new(WINDOW_ABS_CAP);
        ra.set_cap(WINDOW_FLOOR / 2);
        assert_eq!(ra.window(), WINDOW_FLOOR / 2); // window > cap → clamped
        ra.set_cap(WINDOW_ABS_CAP);
        assert_eq!(ra.window(), WINDOW_FLOOR / 2); // window < cap → untouched
    }

    #[test]
    fn reconcile_charges_growth_and_uncharges_shrink() {
        let pool = ReadAheadPool::new(64 << 20);
        pool.reconcile(0, 1000);
        assert_eq!(pool.charged(), 1000);
        pool.reconcile(1000, 250); // shrink must uncharge by old-new
        assert_eq!(pool.charged(), 250);
        pool.reconcile(250, 250); // equal: no change
        assert_eq!(pool.charged(), 250);
    }

    #[test]
    fn has_room_for_zero_need_is_false_when_disabled() {
        assert!(!ReadAheadPool::new(0).has_room_for(0)); // budget==0 guard
        // A fully charged pool has no room for even a single byte.
        let p = ReadAheadPool::new(1 << 20);
        p.reconcile(0, 1 << 20);
        assert!(!p.has_room_for(1));
    }

    #[test]
    fn permitted_window_need_is_relative_to_old_len() {
        // budget 2 MiB → cap 512K. Charge all but one cap (nothing registered, so
        // nothing is evictable), leaving room == cap. A stream at old_len = cap/4
        // asks for cap: need = cap − cap/4 = 384K ≤ room → full grant. If `need`
        // were `cap + cap/4` (640K > room) it would fall through to the clamp and
        // return old_len + room (640K) instead.
        let pool = ReadAheadPool::new(2 << 20);
        let cap = pool.per_stream_cap();
        pool.reconcile(0, (2 << 20) - cap); // room == cap
        assert_eq!(pool.permitted_window(2, cap / 4, cap), cap);
    }

    #[test]
    fn permitted_window_clamps_to_room_when_nothing_evictable() {
        // budget 2 MiB → cap 512K. Charge to leave room == 256K. A stream at
        // old_len = 128K asks for cap: need = 384K > room, nothing evictable, so
        // the grant clamps to old_len + room = 384K. A mutant computing
        // `old_len − room` instead would underflow (128K − 256K).
        let pool = ReadAheadPool::new(2 << 20);
        let cap = pool.per_stream_cap();
        pool.reconcile(0, (2 << 20) - 256 * 1024); // room == 256K
        assert_eq!(
            pool.permitted_window(2, 128 * 1024, cap),
            128 * 1024 + 256 * 1024
        );
    }

    #[test]
    #[expect(clippy::cast_possible_truncation)]
    fn touch_keeps_a_stream_off_the_eviction_block() {
        let pool = ReadAheadPool::new(2 << 20); // cap 512K, holds 4 streams
        let cap = pool.per_stream_cap();
        let mk = |key: usize| {
            let arc = Arc::new(Mutex::new(ReadAhead::new(cap)));
            let mut d = vec![0u8; cap as usize];
            let (o, n) = arc
                .lock()
                .unwrap()
                .read_into(&mut d, 0, cap * 4, fillb)
                .unwrap();
            pool.register(key, Arc::clone(&arc));
            pool.reconcile(o, n);
            arc
        };
        let s1 = mk(1);
        let s2 = mk(2);
        let _s3 = mk(3);
        let _s4 = mk(4); // budget full (4×cap)
        pool.touch(1); // stream 1 now most-recent → stream 2 is coldest
        let hot = Arc::new(Mutex::new(ReadAhead::new(cap)));
        pool.register(5, Arc::clone(&hot));
        pool.permitted_window(5, 0, cap); // must evict the coldest OTHER (stream 2)
        assert_eq!(s2.lock().unwrap().len(), 0, "coldest stream evicted");
        assert!(s1.lock().unwrap().len() > 0, "touched stream survives");
    }

    #[test]
    fn fills_count_is_exact() {
        let (_d, file) = bk_temp(256 * 1024);
        let mut d = vec![0u8; 4096];
        // Disabled pool: every read is one physical fill.
        let pool0 = ReadAheadPool::new(0);
        let buf0 = Arc::new(Mutex::new(ReadAhead::new(0)));
        let ep0 = AtomicU64::new(0);
        let br0 = BackingReader::new(&file, &buf0, &pool0, 0, 256 * 1024, &ep0);
        br0.read_exact_at(&mut d, 0).unwrap();
        br0.read_exact_at(&mut d, 100_000).unwrap();
        assert_eq!(br0.fills(), 2);
        // Enabled pool: a cold read is one amplified fill; the next sequential
        // read hits the window and adds no fill.
        let pool = ReadAheadPool::new(64 << 20);
        let buf = Arc::new(Mutex::new(ReadAhead::new(pool.per_stream_cap())));
        pool.register(1, Arc::clone(&buf));
        let ep = AtomicU64::new(0);
        let br = BackingReader::new(&file, &buf, &pool, 1, 256 * 1024, &ep);
        br.read_exact_at(&mut d, 0).unwrap();
        assert_eq!(br.fills(), 1);
        br.read_exact_at(&mut d, 4096).unwrap();
        assert_eq!(br.fills(), 1);
    }

    #[test]
    fn epoch_bumps_on_seek_not_on_sequential() {
        let (_d, file) = bk_temp(1 << 20);
        let pool = ReadAheadPool::new(64 << 20);
        let buf = Arc::new(Mutex::new(ReadAhead::new(pool.per_stream_cap())));
        pool.register(1, Arc::clone(&buf));
        // A plain atomic suffices (as in the sibling tests): the reader only
        // borrows the epoch.
        let ep = AtomicU64::new(0);
        let br = BackingReader::new(&file, &buf, &pool, 1, 1 << 20, &ep);
        let mut d = vec![0u8; 4096];
        br.read_exact_at(&mut d, 0).unwrap(); // first read: a "seek" off MAX
        let base = ep.load(AO::Relaxed);
        br.read_exact_at(&mut d, 4096).unwrap(); // sequential hit
        assert_eq!(ep.load(AO::Relaxed), base, "sequential must not bump epoch");
        br.read_exact_at(&mut d, 900_000).unwrap(); // genuine seek (miss, off != next)
        assert_eq!(ep.load(AO::Relaxed), base + 1, "seek bumps epoch");
    }

    #[test]
    fn ring_trim_evicts_window_fully_behind_the_reader() {
        let mut ra = ReadAhead::new(WINDOW_ABS_CAP);
        ra.set_max_windows(2);
        ra.store_window(0, vec![0u8; 1000]); // [0, 1000)
        ra.store_window(1000, vec![0u8; 1000]); // [1000, 2000)
        // Advance the reader to 2000 so both windows are fully behind it.
        let mut dst = vec![0u8; 10];
        ra.read_into(&mut dst, 1990, 1 << 20, fillb).unwrap(); // hit → next_expected = 2000
        // A third window forces a trim: the victim is the fully-behind window with
        // the smallest start ([0,1000)) — NOT the just-stored ahead window. The
        // `<=` filter (`start+len <= frontier`) selects which windows are behind.
        ra.store_window(2000, vec![0u8; 1000]); // [2000, 3000)
        assert!(!ra.covers(0, 10), "fully-behind window evicted");
        assert!(ra.covers(1000, 10), "nearer behind window kept");
        assert!(ra.covers(2000, 10), "just-stored ahead window kept");
    }

    #[test]
    fn plan_prefetch_no_jobs_when_watermark_at_horizon() {
        // Watermark already at the horizon (start + depth*window): nothing new to
        // dispatch. The `s > horizon` reset guard must NOT fire at s == horizon
        // (`>=` would reset to start and re-queue the whole horizon).
        let win = WINDOW_FLOOR;
        let depth = 4;
        let start = 10 * win;
        let horizon = start + depth * win;
        let (starts, upto) = plan_prefetch(horizon, start, win, depth, 100 << 20);
        assert!(starts.is_empty(), "caught up to horizon → no jobs");
        assert_eq!(upto, horizon);
    }

    #[test]
    fn plan_prefetch_window_zero_leaves_watermark_unchanged() {
        // window == 0 short-circuits via the `||` early-out, returning the
        // watermark UNCHANGED. With `&&` it would fall through and return `start`.
        let (starts, upto) = plan_prefetch(500, 1000, 0, 4, 1 << 20);
        assert!(starts.is_empty());
        assert_eq!(upto, 500);
    }

    #[test]
    fn cached_len_replaces_same_start_window() {
        let mut ra = ReadAhead::new(WINDOW_ABS_CAP);
        ra.store_window(100, vec![0u8; 1000]);
        assert_eq!(ra.len(), 1000);
        // Re-storing at the same start must subtract the old window's bytes and
        // add the new ones — not sum them, scale either, or leave the count
        // stale. A single window of 2500 bytes remains.
        ra.store_window(100, vec![0u8; 2500]);
        assert_eq!(ra.len(), 2500);
    }

    #[test]
    fn cached_len_drops_evicted_window_bytes() {
        let mut ra = ReadAhead::new(WINDOW_ABS_CAP);
        ra.set_max_windows(2);
        ra.store_window(0, vec![0u8; 1000]);
        ra.store_window(1000, vec![0u8; 1000]);
        assert_eq!(ra.len(), 2000);
        let mut dst = vec![0u8; 10];
        ra.read_into(&mut dst, 1990, 1 << 20, fillb).unwrap(); // hit → next_expected = 2000
        // The third window trims one fully-behind window; len() must drop exactly
        // the evicted window's bytes (3000 stored − 1000 evicted == 2000).
        ra.store_window(2000, vec![0u8; 1000]);
        assert_eq!(ra.len(), 2000);
    }

    #[test]
    fn prefetch_plan_reports_captured_frontier_and_window() {
        let (_d, file) = bk_temp(256 * 1024);
        let pool = ReadAheadPool::new(64 << 20);
        let buf = Arc::new(Mutex::new(ReadAhead::new(pool.per_stream_cap())));
        pool.register(1, Arc::clone(&buf));
        let ep = AtomicU64::new(0);
        let br =
            BackingReader::new(&file, &buf, &pool, 1, 256 * 1024, &ep).with_prefetch_planning();
        let mut d = vec![0u8; 4096];
        br.read_exact_at(&mut d, 0).unwrap();
        // The read above captured the post-read frontier (off + len) and the
        // first-miss window (the floor); `prefetch_plan` must return those exact
        // values, not a constant placeholder.
        assert_eq!(br.prefetch_plan(), (4096, WINDOW_FLOOR));
    }
}