Skip to main content

hexz_core/api/
file.rs

1//! High-level snapshot file API and logical stream types.
2
3use crate::algo::compression::{Compressor, create_compressor};
4use crate::algo::encryption::Encryptor;
5use crate::cache::buffer_pool::BufferPool;
6use crate::cache::lru::{BlockCache, ShardedPageCache};
7use crate::cache::prefetch::Prefetcher;
8use crate::format::header::Header;
9use crate::format::index::{BlockInfo, IndexPage, MasterIndex, PageEntry};
10use crate::format::magic::{HEADER_SIZE, MAGIC_BYTES};
11use crate::format::version::{VersionCompatibility, check_version, compatibility_message};
12use crate::store::StorageBackend;
13use bytes::Bytes;
14use crc32fast::hash as crc32_hash;
15use std::mem::MaybeUninit;
16use std::ptr;
17use std::sync::atomic::{AtomicBool, Ordering};
18use std::sync::{Arc, Mutex};
19
20use hexz_common::constants::{BLOCK_OFFSET_PARENT, DEFAULT_BLOCK_SIZE};
21use hexz_common::{Error, Result};
22use rayon::prelude::*;
23
/// A factory function that opens a parent snapshot by path.
///
/// Provided by the caller of [`File::with_cache_and_loader`] so that the
/// core read API has no hard dependency on any specific storage backend
/// implementation. Storage crates supply a concrete loader; callers that
/// know parents cannot exist may pass `None`.
///
/// The loader must be `Send + Sync` because opened snapshots are shared
/// across threads via `Arc`.
pub type ParentLoader = Box<dyn Fn(&str) -> Result<Arc<File>> + Send + Sync>;
31
/// Shared zero block for the default block size to avoid allocating when returning zero blocks.
///
/// NOTE(review): this occupies `DEFAULT_BLOCK_SIZE` bytes of static data; the
/// name implies 64 KiB — confirm it still matches if `DEFAULT_BLOCK_SIZE` changes.
static ZEROS_64K: [u8; DEFAULT_BLOCK_SIZE as usize] = [0u8; DEFAULT_BLOCK_SIZE as usize];
34
/// Work item for block decompression.
///
/// Fields: `(global_block_idx, info, buf_offset, offset_in_block, to_copy)`:
/// - `global_block_idx`: block index across the whole stream (for cache keys / error reporting)
/// - `info`: the block's index metadata
/// - `buf_offset`: destination offset in the output buffer
/// - `offset_in_block`: starting offset within the decompressed block
/// - `to_copy`: number of bytes to copy into the output buffer
type WorkItem = (u64, BlockInfo, usize, usize, usize);
37
/// Result of fetching a block from cache or storage.
///
/// Eliminates TOCTOU races by tracking data state at fetch time rather than
/// re-checking the cache later (which can give a different answer if a
/// background prefetch thread modifies the cache between check and use).
enum FetchResult {
    /// Data is already decompressed (came from L1 cache or is a zero block).
    Decompressed(Bytes),
    /// Data is raw compressed bytes from storage; still needs decompression
    /// and verification downstream.
    Compressed(Bytes),
}
49
/// Logical stream identifier for dual-stream snapshots.
///
/// Hexz snapshots can store two independent data streams:
/// - **Primary**: Persistent storage (disk image, filesystem data, or main tensor weights)
/// - **Secondary**: Volatile state (RAM contents, process memory, or auxiliary data)
///
/// Represented as a `u8` (`#[repr(u8)]`), with `Primary = 0` and `Secondary = 1`.
///
/// # Example
///
/// ```ignore
/// use hexz_core::{File, SnapshotStream};
/// # use std::sync::Arc;
/// # fn example(snapshot: Arc<File>) -> Result<(), Box<dyn std::error::Error>> {
/// // Read 4KB from primary stream
/// let data = snapshot.read_at(SnapshotStream::Primary, 0, 4096)?;
///
/// // Read 4KB from secondary stream (if present)
/// let aux = snapshot.read_at(SnapshotStream::Secondary, 0, 4096)?;
/// # Ok(())
/// # }
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum SnapshotStream {
    /// Persistent primary stream (formerly Disk)
    Primary = 0,
    /// Volatile secondary stream (formerly Memory)
    Secondary = 1,
}
78
/// Read-only interface for accessing Hexz snapshot data.
///
/// `File` is the primary API for reading compressed, block-indexed snapshots.
/// It handles:
/// - Block-level decompression with LRU caching
/// - Optional AES-256-GCM decryption
/// - Thin snapshot parent chaining
/// - Dual-stream access (primary and secondary)
/// - Random access with minimal I/O
///
/// # Thread Safety
///
/// `File` is `Send + Sync` and can be safely shared across threads via `Arc`.
/// Internal caches use `Mutex` for synchronization.
///
/// # Performance
///
/// - **Cache hit latency**: ~80μs (warm cache)
/// - **Cache miss latency**: ~1ms (cold cache, local storage)
/// - **Sequential throughput**: ~2-3 GB/s (NVMe + LZ4)
/// - **Memory overhead**: ~150MB typical (configurable)
///
/// # Examples
///
/// ## Basic Usage
///
/// ```ignore
/// use hexz_core::{File, SnapshotStream};
/// use hexz_store::local::FileBackend;
/// use hexz_core::algo::compression::lz4::Lz4Compressor;
/// use std::sync::Arc;
///
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let backend = Arc::new(FileBackend::new("snapshot.hxz".as_ref())?);
/// let compressor = Box::new(Lz4Compressor::new());
/// let snapshot = File::new(backend, compressor, None)?;
///
/// // Read 4KB at offset 1MB
/// let data = snapshot.read_at(SnapshotStream::Primary, 1024 * 1024, 4096)?;
/// assert_eq!(data.len(), 4096);
/// # Ok(())
/// # }
/// ```
///
/// ## Thin Snapshots (with parent)
///
/// ```ignore
/// use hexz_core::File;
/// use hexz_store::local::FileBackend;
/// use hexz_core::algo::compression::lz4::Lz4Compressor;
/// use std::sync::Arc;
///
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// // Open base snapshot
/// let base_backend = Arc::new(FileBackend::new("base.hxz".as_ref())?);
/// let base = File::new(
///     base_backend,
///     Box::new(Lz4Compressor::new()),
///     None
/// )?;
///
/// // The thin snapshot will automatically load its parent based on
/// // the parent_paths field in the header
/// let thin_backend = Arc::new(FileBackend::new("incremental.hxz".as_ref())?);
/// let thin = File::new(
///     thin_backend,
///     Box::new(Lz4Compressor::new()),
///     None
/// )?;
///
/// // Reads automatically fall back to base for unchanged blocks
/// let data = thin.read_at(hexz_core::SnapshotStream::Primary, 0, 4096)?;
/// # Ok(())
/// # }
/// ```
pub struct File {
    /// Snapshot metadata (sizes, compression, encryption settings)
    pub header: Header,

    /// Master index containing top-level page entries
    pub(crate) master: MasterIndex,

    /// Storage backend for reading raw snapshot data
    backend: Arc<dyn StorageBackend>,

    /// Compression algorithm (LZ4 or Zstandard)
    compressor: Box<dyn Compressor>,

    /// Optional encryption (AES-256-GCM)
    encryptor: Option<Box<dyn Encryptor>>,

    /// Optional parent snapshots for thin (incremental) snapshots.
    /// When a block's offset is BLOCK_OFFSET_PARENT, data is fetched from the
    /// first parent (in order) that has the block.
    parents: Vec<Arc<File>>,

    /// LRU cache for decompressed blocks (per-stream, per-block-index)
    cache_l1: BlockCache,

    /// Sharded LRU cache for deserialized index pages
    page_cache: ShardedPageCache,

    /// Optional prefetcher for background data loading
    prefetcher: Option<Prefetcher>,

    /// Reusable buffer pool for decompression output buffers.
    /// Reduces allocator contention in parallel decompression and avoids
    /// repeated alloc/dealloc of identically-sized buffers in serial reads.
    decompress_pool: BufferPool,
}
188
189impl File {
    /// Opens a snapshot, auto-detecting compression and dictionary from the header.
    ///
    /// This eliminates the 3-step boilerplate of: read header, load dict, create
    /// compressor. Equivalent to `File::new(backend, auto_compressor, encryptor)`.
    ///
    /// # Parameters
    ///
    /// - `backend`: Storage backend (local file, HTTP, S3, etc.)
    /// - `encryptor`: Optional decryption handler (pass `None` for unencrypted snapshots)
    ///
    /// # Errors
    ///
    /// - `Error::Format` if magic bytes or version are invalid
    /// - `Error::Io` if the storage backend fails
    ///
    /// # Examples
    ///
    /// ```ignore
    /// use hexz_core::{File, SnapshotStream};
    /// use hexz_store::local::FileBackend;
    /// use std::sync::Arc;
    ///
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let backend = Arc::new(FileBackend::new("snapshot.hxz".as_ref())?);
    /// let snapshot = File::open(backend, None)?;
    ///
    /// println!("Primary size: {} bytes", snapshot.size(SnapshotStream::Primary));
    /// # Ok(())
    /// # }
    /// ```
    pub fn open(
        backend: Arc<dyn StorageBackend>,
        encryptor: Option<Box<dyn Encryptor>>,
    ) -> Result<Arc<Self>> {
        Self::open_with_cache(backend, encryptor, None, None)
    }
237
    /// Like [`open`](Self::open) but with custom cache and prefetch settings.
    ///
    /// # Parameters
    ///
    /// - `backend`: Storage backend (local file, HTTP, S3, etc.)
    /// - `encryptor`: Optional decryption handler (pass `None` for unencrypted snapshots)
    /// - `cache_capacity_bytes`: Block cache budget in bytes (`None` = default of 1000 blocks)
    /// - `prefetch_window_size`: Blocks to prefetch ahead (`None` or `Some(0)` disables)
    pub fn open_with_cache(
        backend: Arc<dyn StorageBackend>,
        encryptor: Option<Box<dyn Encryptor>>,
        cache_capacity_bytes: Option<usize>,
        prefetch_window_size: Option<u32>,
    ) -> Result<Arc<Self>> {
        Self::open_with_cache_and_loader(
            backend,
            encryptor,
            cache_capacity_bytes,
            prefetch_window_size,
            None,
        )
    }
253
254    /// Like [`open_with_cache`](Self::open_with_cache) but accepts an optional parent loader.
255    pub fn open_with_cache_and_loader(
256        backend: Arc<dyn StorageBackend>,
257        encryptor: Option<Box<dyn Encryptor>>,
258        cache_capacity_bytes: Option<usize>,
259        prefetch_window_size: Option<u32>,
260        parent_loader: Option<&ParentLoader>,
261    ) -> Result<Arc<Self>> {
262        let header = Header::read_from_backend(backend.as_ref())?;
263        let dictionary = header.load_dictionary(backend.as_ref())?;
264        let compressor = create_compressor(header.compression, None, dictionary);
265        Self::with_cache_and_loader(
266            backend,
267            compressor,
268            encryptor,
269            cache_capacity_bytes,
270            prefetch_window_size,
271            parent_loader,
272        )
273    }
274
    /// Opens a Hexz snapshot with default cache settings.
    ///
    /// This is the primary constructor for `File`. It:
    /// 1. Reads and validates the snapshot header (magic bytes, version)
    /// 2. Deserializes the master index
    /// 3. Recursively loads parent snapshots (for thin snapshots)
    /// 4. Initializes block and page caches
    ///
    /// # Parameters
    ///
    /// - `backend`: Storage backend (local file, HTTP, S3, etc.)
    /// - `compressor`: Compression algorithm matching the snapshot format
    /// - `encryptor`: Optional decryption handler (pass `None` for unencrypted snapshots)
    ///
    /// # Errors
    ///
    /// - `Error::Format` if magic bytes or version are invalid
    /// - `Error::Io` if the storage backend fails
    pub fn new(
        backend: Arc<dyn StorageBackend>,
        compressor: Box<dyn Compressor>,
        encryptor: Option<Box<dyn Encryptor>>,
    ) -> Result<Arc<Self>> {
        Self::with_cache(backend, compressor, encryptor, None, None)
    }
282
    /// Opens a Hexz snapshot with custom cache capacity and prefetching.
    ///
    /// Identical to [`new`](Self::new) but allows specifying cache size and prefetch window.
    ///
    /// # Parameters
    ///
    /// - `backend`: Storage backend
    /// - `compressor`: Compression algorithm
    /// - `encryptor`: Optional decryption handler
    /// - `cache_capacity_bytes`: Block cache size in bytes (default: ~400MB for 4KB blocks)
    /// - `prefetch_window_size`: Number of blocks to prefetch ahead (default: disabled)
    ///
    /// # Cache Sizing
    ///
    /// The cache stores decompressed blocks. Given a block size of 4KB:
    /// - `Some(100_000_000)` → ~24,000 blocks (~96MB effective)
    /// - `None` → 1000 blocks (~4MB effective)
    ///
    /// Larger caches reduce repeated decompression but increase memory usage.
    ///
    /// # Prefetching
    ///
    /// When `prefetch_window_size` is set, the system will automatically fetch the next N blocks
    /// in the background after each read, optimizing sequential access patterns:
    /// - `Some(4)` → Prefetch 4 blocks ahead
    /// - `None` or `Some(0)` → Disable prefetching
    ///
    /// # Errors
    ///
    /// - `Error::Format` if magic bytes or version are invalid
    /// - `Error::Io` if the storage backend fails
    ///
    /// # Examples
    ///
    /// ```ignore
    /// use hexz_core::File;
    /// use hexz_store::local::FileBackend;
    /// use hexz_core::algo::compression::lz4::Lz4Compressor;
    /// use std::sync::Arc;
    ///
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let backend = Arc::new(FileBackend::new("snapshot.hxz".as_ref())?);
    /// let compressor = Box::new(Lz4Compressor::new());
    ///
    /// // Allocate 256MB for cache, prefetch 4 blocks ahead
    /// let snapshot = File::with_cache(
    ///     backend,
    ///     compressor,
    ///     None,
    ///     Some(256 * 1024 * 1024),
    ///     Some(4)
    /// )?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn with_cache(
        backend: Arc<dyn StorageBackend>,
        compressor: Box<dyn Compressor>,
        encryptor: Option<Box<dyn Encryptor>>,
        cache_capacity_bytes: Option<usize>,
        prefetch_window_size: Option<u32>,
    ) -> Result<Arc<Self>> {
        Self::with_cache_and_loader(
            backend,
            compressor,
            encryptor,
            cache_capacity_bytes,
            prefetch_window_size,
            None,
        )
    }
349
350    /// Like [`with_cache`](Self::with_cache) but accepts an optional parent loader.
351    ///
352    /// The `parent_loader` is called for each path in the snapshot's `parent_paths`
353    /// header field. Passing `None` disables automatic parent chaining — reads that
354    /// fall through to a parent will return an error unless no parent paths are present.
355    ///
356    /// This is the primary constructor used by `hexz-store` to supply a
357    /// `FileBackend`-based loader without coupling this crate to any backend impl.
358    pub fn with_cache_and_loader(
359        backend: Arc<dyn StorageBackend>,
360        compressor: Box<dyn Compressor>,
361        encryptor: Option<Box<dyn Encryptor>>,
362        cache_capacity_bytes: Option<usize>,
363        prefetch_window_size: Option<u32>,
364        parent_loader: Option<&ParentLoader>,
365    ) -> Result<Arc<Self>> {
366        let header_bytes = backend.read_exact(0, HEADER_SIZE)?;
367        let header: Header = bincode::deserialize(&header_bytes)?;
368
369        if &header.magic != MAGIC_BYTES {
370            return Err(Error::Format("Invalid magic bytes".into()));
371        }
372
373        // Check version compatibility
374        let compatibility = check_version(header.version);
375        match compatibility {
376            VersionCompatibility::Full => {
377                // Perfect match, proceed silently
378            }
379            VersionCompatibility::Degraded => {
380                // Newer version, issue warning but allow
381                tracing::warn!("{}", compatibility_message(header.version));
382            }
383            VersionCompatibility::Incompatible => {
384                // Too old or too new, reject
385                return Err(Error::Format(compatibility_message(header.version)));
386            }
387        }
388
389        let file_len = backend.len();
390        if header.index_offset >= file_len {
391            return Err(Error::Format(format!(
392                "index_offset ({}) is at or past end of file ({})",
393                header.index_offset, file_len
394            )));
395        }
396        let index_bytes = backend.read_exact(
397            header.index_offset,
398            (file_len - header.index_offset) as usize,
399        )?;
400
401        let master: MasterIndex = bincode::deserialize(&index_bytes)?;
402
403        // Recursively load parent snapshots if a loader is provided.
404        let mut parents = Vec::new();
405        if let Some(loader) = parent_loader {
406            for parent_path in &header.parent_paths {
407                tracing::info!("Loading parent snapshot: {}", parent_path);
408                parents.push(loader(parent_path)?);
409            }
410        } else if !header.parent_paths.is_empty() {
411            tracing::warn!(
412                "Snapshot has {} parent path(s) but no parent_loader was provided; \
413                 parent-reference blocks will not be resolvable.",
414                header.parent_paths.len()
415            );
416        }
417
418        let block_size = header.block_size as usize;
419        let l1_capacity = if let Some(bytes) = cache_capacity_bytes {
420            (bytes / block_size).max(1)
421        } else {
422            1000
423        };
424
425        // Initialize prefetcher if window size is specified and > 0
426        let prefetcher = prefetch_window_size.filter(|&w| w > 0).map(Prefetcher::new);
427
428        // Size the decompression buffer pool: one buffer per rayon thread plus
429        // a few extra for the serial path and prefetch thread.
430        let pool_size = rayon::current_num_threads() + 2;
431
432        Ok(Arc::new(Self {
433            header,
434            master,
435            backend,
436            compressor,
437            encryptor,
438            parents,
439            cache_l1: BlockCache::with_capacity(l1_capacity),
440            page_cache: ShardedPageCache::default(),
441            prefetcher,
442            decompress_pool: BufferPool::new(pool_size),
443        }))
444    }
445
    /// Returns the total number of prefetch operations spawned since this file was opened.
    ///
    /// Returns 0 if prefetching is disabled (no prefetcher was configured at
    /// construction time).
    pub fn prefetch_spawn_count(&self) -> u64 {
        self.prefetcher.as_ref().map_or(0, |p| p.spawn_count())
    }
475
    /// Returns the logical size of a stream in bytes.
    ///
    /// # Parameters
    ///
    /// - `stream`: The stream to query (Primary or Secondary)
    ///
    /// # Returns
    ///
    /// The uncompressed, logical size of the stream. This is the size you would
    /// get if you decompressed all blocks and concatenated them.
    ///
    /// # Examples
    ///
    /// ```ignore
    /// use hexz_core::{File, SnapshotStream};
    /// # use std::sync::Arc;
    /// # fn example(snapshot: Arc<File>) {
    /// let disk_bytes = snapshot.size(SnapshotStream::Primary);
    /// let mem_bytes = snapshot.size(SnapshotStream::Secondary);
    ///
    /// println!("Primary: {} GB", disk_bytes / (1024 * 1024 * 1024));
    /// println!("Secondary: {} MB", mem_bytes / (1024 * 1024));
    /// # }
    /// ```
    pub fn size(&self, stream: SnapshotStream) -> u64 {
        match stream {
            SnapshotStream::Primary => self.master.primary_size,
            SnapshotStream::Secondary => self.master.secondary_size,
        }
    }
482
483    /// Iterates all non-sparse block hashes for the given stream.
484    ///
485    /// Used by `hexz-ops` to build a `ParentIndex` for cross-file deduplication
486    /// without requiring access to private fields.
487    pub fn iter_block_hashes(&self, stream: SnapshotStream) -> Result<Vec<[u8; 32]>> {
488        let pages = match stream {
489            SnapshotStream::Primary => &self.master.primary_pages,
490            SnapshotStream::Secondary => &self.master.secondary_pages,
491        };
492        let mut hashes = Vec::new();
493        for page_entry in pages {
494            let page = self.get_page(page_entry)?;
495            for block_info in &page.blocks {
496                if !block_info.is_sparse() && block_info.hash != [0u8; 32] {
497                    hashes.push(block_info.hash);
498                }
499            }
500        }
501        Ok(hashes)
502    }
503
504    /// Returns the block metadata for a given logical offset.
505    pub fn get_block_info(
506        &self,
507        stream: SnapshotStream,
508        offset: u64,
509    ) -> Result<Option<(u64, BlockInfo)>> {
510        let pages = match stream {
511            SnapshotStream::Primary => &self.master.primary_pages,
512            SnapshotStream::Secondary => &self.master.secondary_pages,
513        };
514
515        if pages.is_empty() {
516            return Ok(None);
517        }
518
519        let page_idx = match pages.binary_search_by(|p| p.start_logical.cmp(&offset)) {
520            Ok(idx) => idx,
521            Err(idx) => idx.saturating_sub(1),
522        };
523
524        let page_entry = &pages[page_idx];
525        let page = self.get_page(page_entry)?;
526        let mut block_logical_start = page_entry.start_logical;
527
528        for (i, block) in page.blocks.iter().enumerate() {
529            let block_end = block_logical_start + block.logical_len as u64;
530            if offset >= block_logical_start && offset < block_end {
531                let global_idx = page_entry.start_block + i as u64;
532                return Ok(Some((global_idx, *block)));
533            }
534            block_logical_start = block_end;
535        }
536
537        Ok(None)
538    }
539
    /// Reads data from a snapshot stream at a given offset.
    ///
    /// This is the primary read method for random access; it uses parallel
    /// block decompression when the range spans multiple blocks. It:
    /// 1. Identifies which blocks overlap the requested range
    /// 2. Fetches blocks from cache or decompresses from storage
    /// 3. Handles thin snapshot fallback to parent
    /// 4. Assembles the final buffer from block slices
    ///
    /// # Parameters
    ///
    /// - `stream`: Which stream to read from (Primary or Secondary)
    /// - `offset`: Starting byte offset (0-indexed)
    /// - `len`: Number of bytes to read
    ///
    /// # Returns
    ///
    /// A `Vec<u8>` containing up to `len` bytes. The returned vector may be shorter
    /// if:
    /// - `offset` is beyond the stream size (returns empty vector)
    /// - `offset + len` exceeds stream size (returns partial data)
    ///
    /// Missing data (sparse regions) is zero-filled.
    ///
    /// # Errors
    ///
    /// - `Error::Io` if backend read fails (e.g. truncated file)
    /// - `Error::Corruption(block_idx)` if block checksum does not match
    /// - `Error::Decompression` if block decompression fails
    /// - `Error::Decryption` if block decryption fails
    ///
    /// # Performance
    ///
    /// - **Cache hit**: ~80μs latency, no I/O
    /// - **Cache miss**: ~1ms latency (local storage), includes decompression
    /// - **Remote storage**: Latency depends on network (HTTP: ~50ms, S3: ~100ms)
    ///
    /// Aligned reads (offset % block_size == 0) are most efficient.
    ///
    /// # Examples
    ///
    /// ```ignore
    /// use hexz_core::{File, SnapshotStream};
    /// # use std::sync::Arc;
    /// # fn example(snapshot: Arc<File>) -> Result<(), Box<dyn std::error::Error>> {
    /// // Read first 512 bytes of primary stream
    /// let boot_sector = snapshot.read_at(SnapshotStream::Primary, 0, 512)?;
    ///
    /// // Read from arbitrary offset
    /// let chunk = snapshot.read_at(SnapshotStream::Primary, 1024 * 1024, 4096)?;
    ///
    /// // Reading beyond stream size returns empty vector
    /// let empty = snapshot.read_at(SnapshotStream::Primary, u64::MAX, 100)?;
    /// assert!(empty.is_empty());
    /// # Ok(())
    /// # }
    /// ```
    pub fn read_at(
        self: &Arc<Self>,
        stream: SnapshotStream,
        offset: u64,
        len: usize,
    ) -> Result<Vec<u8>> {
        let stream_size = self.size(stream);
        if offset >= stream_size {
            return Ok(Vec::new());
        }
        let actual_len = std::cmp::min(len as u64, stream_size - offset) as usize;
        if actual_len == 0 {
            return Ok(Vec::new());
        }

        let pages = match stream {
            SnapshotStream::Primary => &self.master.primary_pages,
            SnapshotStream::Secondary => &self.master.secondary_pages,
        };

        // No local pages: either a parent covers this range (thin snapshot)
        // or the range is implicitly sparse (all zeros).
        if pages.is_empty() {
            for parent in &self.parents {
                if parent.get_block_info(stream, offset)?.is_some() {
                    return parent.read_at(stream, offset, actual_len);
                }
            }
            return Ok(vec![0u8; actual_len]);
        }

        let mut buf: Vec<MaybeUninit<u8>> = Vec::with_capacity(actual_len);
        // SAFETY: MaybeUninit<u8> does not require initialization; we only need
        // the allocation. read_at_into_uninit writes every byte before we
        // transmute to Vec<u8> below.
        unsafe { buf.set_len(actual_len) };
        self.read_at_into_uninit(stream, offset, &mut buf)?;
        let ptr = buf.as_mut_ptr().cast::<u8>();
        let len = buf.len();
        let cap = buf.capacity();
        std::mem::forget(buf);
        // SAFETY: `buf` was a Vec<MaybeUninit<u8>> that we fully initialized via
        // `read_at_into_uninit` (which writes every byte). We `forget` the original
        // Vec to avoid a double-free and reconstruct it with the same ptr/len/cap.
        // MaybeUninit<u8> has the same layout as u8.
        Ok(unsafe { Vec::from_raw_parts(ptr, len, cap) })
    }
642
643    /// Reads into a provided buffer. Unused suffix is zero-filled. Uses parallel decompression when spanning multiple blocks.
644    pub fn read_at_into(
645        self: &Arc<Self>,
646        stream: SnapshotStream,
647        offset: u64,
648        buffer: &mut [u8],
649    ) -> Result<()> {
650        let len = buffer.len();
651        if len == 0 {
652            return Ok(());
653        }
654        let stream_size = self.size(stream);
655        if offset >= stream_size {
656            buffer.fill(0);
657            return Ok(());
658        }
659        let actual_len = std::cmp::min(len as u64, stream_size - offset) as usize;
660        if actual_len < len {
661            buffer[actual_len..].fill(0);
662        }
663        self.read_at_into_uninit_bytes(stream, offset, &mut buffer[0..actual_len])
664    }
665
    /// Minimum number of local blocks to use the parallel decompression path.
    ///
    /// Rayon task dispatch adds ~5-10μs overhead per task. With LZ4 at ~2GB/s,
    /// a 64KB block decompresses in ~32μs. Below 4 blocks the overhead fraction
    /// is significant; serial decompression avoids the synchronization cost.
    const PARALLEL_MIN_BLOCKS: usize = 4;
672
    /// Collects work items for blocks that need decompression.
    ///
    /// This method iterates through index pages and blocks, handling:
    /// - Parent blocks: delegate to parent snapshot or zero-fill
    /// - Zero blocks: zero-fill directly
    /// - Regular blocks: add to work queue for later decompression
    ///
    /// # Parameters
    ///
    /// - `stream`: Which logical stream the read targets
    /// - `pages`: The stream's page entries
    /// - `page_idx`: Index of the first page that may overlap the read
    /// - `target`: Destination buffer; parent and zero regions are written here directly
    /// - `offset`: Logical starting offset of the read
    /// - `actual_len`: Number of bytes to read (already clamped to stream size)
    ///
    /// # Returns
    ///
    /// `(work_items, buf_offset)` — the queued decompression work and the
    /// number of destination bytes accounted for so far (already written or
    /// reserved by queued items).
    fn collect_work_items(
        &self,
        stream: SnapshotStream,
        pages: &[PageEntry],
        page_idx: usize,
        target: &mut [MaybeUninit<u8>],
        offset: u64,
        actual_len: usize,
    ) -> Result<(Vec<WorkItem>, usize)> {
        // .max(1) guards against a corrupt zero block_size in the header.
        let block_size = self.header.block_size.max(1) as usize;
        let estimated_blocks = actual_len.div_ceil(block_size);
        let mut local_work: Vec<WorkItem> = Vec::with_capacity(estimated_blocks);
        let mut buf_offset = 0;
        let mut current_pos = offset;
        let mut remaining = actual_len;

        for page_entry in pages.iter().skip(page_idx) {
            if remaining == 0 {
                break;
            }
            // Page starts past the end of the requested range: nothing further overlaps.
            if page_entry.start_logical > current_pos + remaining as u64 {
                break;
            }

            let page = self.get_page(page_entry)?;
            let mut block_logical_start = page_entry.start_logical;

            for (block_idx_in_page, block) in page.blocks.iter().enumerate() {
                let block_end = block_logical_start + block.logical_len as u64;

                // Only blocks whose logical range reaches past the current
                // read position contribute data.
                if block_end > current_pos {
                    let global_block_idx = page_entry.start_block + block_idx_in_page as u64;
                    let offset_in_block = (current_pos - block_logical_start) as usize;
                    let to_copy = std::cmp::min(
                        remaining,
                        (block.logical_len as usize).saturating_sub(offset_in_block),
                    );

                    if block.offset == BLOCK_OFFSET_PARENT {
                        // Parent block: delegate to the first parent that has it, or zero-fill
                        let mut found = false;
                        for parent in &self.parents {
                            if parent.get_block_info(stream, current_pos)?.is_some() {
                                let dest = &mut target[buf_offset..buf_offset + to_copy];
                                parent.read_at_into_uninit(stream, current_pos, dest)?;
                                found = true;
                                break;
                            }
                        }
                        if !found {
                            Self::zero_fill_uninit(&mut target[buf_offset..buf_offset + to_copy]);
                        }
                        current_pos += to_copy as u64;
                        buf_offset += to_copy;
                        remaining -= to_copy;
                    } else if block.length == 0 {
                        // Zero block: fill with zeros
                        Self::zero_fill_uninit(&mut target[buf_offset..buf_offset + to_copy]);
                        current_pos += to_copy as u64;
                        buf_offset += to_copy;
                        remaining -= to_copy;
                    } else {
                        // Regular block: add to work queue
                        if to_copy > 0 {
                            local_work.push((
                                global_block_idx,
                                *block,
                                buf_offset,
                                offset_in_block,
                                to_copy,
                            ));
                            buf_offset += to_copy;
                            current_pos += to_copy as u64;
                            remaining -= to_copy;
                        }
                    }

                    if remaining == 0 {
                        break;
                    }
                }
                block_logical_start += block.logical_len as u64;
            }
        }

        Ok((local_work, buf_offset))
    }
768
769    /// Executes parallel decompression for multiple blocks.
770    ///
771    /// Uses a two-phase approach:
772    /// 1. Parallel I/O: Fetch all raw blocks concurrently
773    /// 2. Parallel CPU: Decompress and copy to target buffer
774    fn execute_parallel_decompression(
775        self: &Arc<Self>,
776        stream: SnapshotStream,
777        work_items: &[WorkItem],
778        target: &mut [MaybeUninit<u8>],
779        actual_len: usize,
780    ) -> Result<()> {
781        let snap = Arc::clone(self);
782        let target_addr = target.as_mut_ptr() as usize;
783
784        // Phase 1: Parallel fetch all raw blocks. Each result tracks whether
785        // the data is already decompressed (cache hit / zero block) or still
786        // compressed (storage read), eliminating a TOCTOU race where a background
787        // prefetch thread could modify the cache between fetch and decompression.
788        let raw_blocks: Vec<Result<FetchResult>> = work_items
789            .par_iter()
790            .map(|(block_idx, info, _, _, _)| snap.fetch_raw_block(stream, *block_idx, info))
791            .collect();
792
793        // Phase 2: Parallel decompress and copy
794        let has_error = AtomicBool::new(false);
795        let err: Mutex<Option<Error>> = Mutex::new(None);
796        work_items
797            .par_iter()
798            .zip(raw_blocks)
799            .for_each(|(work_item, fetch_result)| {
800                // Fast path: single atomic load (~1 cycle), no lock contention.
801                // Relaxed ordering is sufficient — this is a best-effort early-exit
802                // hint; the final err.lock() after the loop provides synchronization.
803                if has_error.load(Ordering::Relaxed) {
804                    return;
805                }
806
807                let (block_idx, info, buf_offset, offset_in_block, to_copy) = work_item;
808
809                // Handle fetch errors
810                let fetched = match fetch_result {
811                    Ok(r) => r,
812                    Err(e) => {
813                        has_error.store(true, Ordering::Relaxed);
814                        if let Ok(mut guard) = err.lock() {
815                            let _ = guard.replace(e);
816                        }
817                        return;
818                    }
819                };
820
821                // Use the FetchResult to determine if decompression is needed,
822                // rather than re-checking the cache (which could give a stale answer).
823                let data = match fetched {
824                    FetchResult::Decompressed(data) => data,
825                    FetchResult::Compressed(raw) => {
826                        match snap.decompress_and_verify(raw, *block_idx, info) {
827                            Ok(d) => {
828                                // Cache the result
829                                snap.cache_l1.insert(stream, *block_idx, d.clone());
830                                d
831                            }
832                            Err(e) => {
833                                has_error.store(true, Ordering::Relaxed);
834                                if let Ok(mut guard) = err.lock() {
835                                    let _ = guard.replace(e);
836                                }
837                                return;
838                            }
839                        }
840                    }
841                };
842
843                // Copy to target buffer
844                let src = data.as_ref();
845                let start = *offset_in_block;
846                let len = *to_copy;
847                if start < src.len() && len <= src.len() - start {
848                    // Defensive assertion: ensure destination write is within bounds
849                    debug_assert!(
850                        buf_offset + len <= actual_len,
851                        "Buffer overflow: attempting to write {} bytes at offset {} into buffer of length {}",
852                        len,
853                        buf_offset,
854                        actual_len
855                    );
856                    let dest = (target_addr + buf_offset) as *mut u8;
857                    // SAFETY: `src[start..start+len]` is in-bounds (checked above).
858                    // `dest` points into the `target` MaybeUninit buffer at a unique
859                    // non-overlapping offset (each work item has a distinct `buf_offset`),
860                    // and the rayon par_iter ensures each item writes to a disjoint region.
861                    // The debug_assert above validates buf_offset + len <= actual_len.
862                    unsafe { ptr::copy_nonoverlapping(src[start..].as_ptr(), dest, len) };
863                }
864            });
865
866        if let Some(e) = err.lock().ok().and_then(|mut guard| guard.take()) {
867            return Err(e);
868        }
869
870        Ok(())
871    }
872
873    /// Executes serial decompression for a small number of blocks.
874    fn execute_serial_decompression(
875        &self,
876        stream: SnapshotStream,
877        work_items: &[WorkItem],
878        target: &mut [MaybeUninit<u8>],
879        actual_len: usize,
880    ) -> Result<()> {
881        for (block_idx, info, buf_offset, offset_in_block, to_copy) in work_items {
882            let fetched = self.fetch_raw_block(stream, *block_idx, info)?;
883            let data = match fetched {
884                FetchResult::Decompressed(data) => data,
885                FetchResult::Compressed(raw) => {
886                    let data = self.decompress_and_verify(raw, *block_idx, info)?;
887                    self.cache_l1.insert(stream, *block_idx, data.clone());
888                    data
889                }
890            };
891
892            let src = data.as_ref();
893            let start = *offset_in_block;
894            if start < src.len() && *to_copy <= src.len() - start {
895                debug_assert!(
896                    *buf_offset + *to_copy <= actual_len,
897                    "Buffer overflow: attempting to write {} bytes at offset {} into buffer of length {}",
898                    to_copy,
899                    buf_offset,
900                    actual_len
901                );
902                // SAFETY: `src[start..start+to_copy]` is in-bounds (checked above).
903                // `target[buf_offset..]` has sufficient room because `buf_offset + to_copy`
904                // never exceeds `actual_len` (tracked during work-item collection).
905                // MaybeUninit<u8> has the same layout as u8.
906                unsafe {
907                    ptr::copy_nonoverlapping(
908                        src[start..].as_ptr(),
909                        target[*buf_offset..].as_mut_ptr() as *mut u8,
910                        *to_copy,
911                    );
912                }
913            }
914        }
915        Ok(())
916    }
917
918    /// Zero-fills a slice of uninitialized memory.
919    ///
920    /// This helper centralizes all unsafe zero-filling operations to improve
921    /// safety auditing and reduce code duplication.
922    ///
923    /// # Safety
924    ///
925    /// This function writes zeros to the provided buffer, making it fully initialized.
926    /// The caller must ensure the buffer is valid for writes.
927    #[inline]
928    fn zero_fill_uninit(buffer: &mut [MaybeUninit<u8>]) {
929        if !buffer.is_empty() {
930            // SAFETY: buffer is a valid &mut [MaybeUninit<u8>] slice, so writing
931            // buffer.len() zero bytes through its pointer is in-bounds.
932            unsafe { ptr::write_bytes(buffer.as_mut_ptr(), 0, buffer.len()) };
933        }
934    }
935
    /// Writes into uninitialized memory. Unused suffix is zero-filled. Uses parallel decompression when spanning multiple blocks.
    ///
    /// Reads starting at or past the end of the stream, and any suffix of the
    /// buffer beyond the stream's end, are zero-filled rather than errored.
    ///
    /// **On error:** The buffer contents are undefined (possibly partially written).
    pub fn read_at_into_uninit(
        self: &Arc<Self>,
        stream: SnapshotStream,
        offset: u64,
        buffer: &mut [MaybeUninit<u8>],
    ) -> Result<()> {
        // `is_prefetch = false`: this is a foreground read, so it is allowed to
        // trigger a background prefetch of the following sequential blocks.
        self.read_at_uninit_inner(stream, offset, buffer, false)
    }
947
    /// Inner implementation of [`read_at_into_uninit`](Self::read_at_into_uninit).
    ///
    /// The `is_prefetch` flag prevents recursive prefetch thread spawning:
    /// when `true`, the prefetch block is skipped to avoid unbounded thread creation.
    ///
    /// High-level flow:
    /// 1. Clamp the read to the stream size; zero-fill the out-of-range suffix.
    /// 2. If this snapshot has no index pages, delegate the whole read to the
    ///    first parent that indexes the offset (or zero-fill if none does).
    /// 3. Collect work items and decompress them — in parallel at or above
    ///    `PARALLEL_MIN_BLOCKS` items, serially otherwise.
    /// 4. Any tail that work-item collection did not cover is served by a
    ///    parent or zero-filled.
    /// 5. Optionally kick off a background prefetch of the following blocks.
    fn read_at_uninit_inner(
        self: &Arc<Self>,
        stream: SnapshotStream,
        offset: u64,
        buffer: &mut [MaybeUninit<u8>],
        is_prefetch: bool,
    ) -> Result<()> {
        // Early validation
        let len = buffer.len();
        if len == 0 {
            return Ok(());
        }

        // Reads entirely past the end of the stream succeed with all zeros.
        let stream_size = self.size(stream);
        if offset >= stream_size {
            Self::zero_fill_uninit(buffer);
            return Ok(());
        }

        // Calculate actual read length and zero-fill suffix if needed
        let actual_len = std::cmp::min(len as u64, stream_size - offset) as usize;
        if actual_len < len {
            Self::zero_fill_uninit(&mut buffer[actual_len..]);
        }

        let target = &mut buffer[0..actual_len];

        // Get page list for stream
        let pages = match stream {
            SnapshotStream::Primary => &self.master.primary_pages,
            SnapshotStream::Secondary => &self.master.secondary_pages,
        };

        // Delegate to parent if no index pages. The first parent whose index
        // covers `offset` serves the whole clamped range; if none does, the
        // range reads as zeros.
        if pages.is_empty() {
            for parent in &self.parents {
                if parent.get_block_info(stream, offset)?.is_some() {
                    return parent.read_at_into_uninit(stream, offset, target);
                }
            }
            Self::zero_fill_uninit(target);
            return Ok(());
        }

        // Find starting page index. On an inexact match, `Err(idx)` is the
        // insertion point, so `idx - 1` is the last page starting before `offset`.
        let page_idx = match pages.binary_search_by(|p| p.start_logical.cmp(&offset)) {
            Ok(idx) => idx,
            Err(idx) => idx.saturating_sub(1),
        };

        // Collect work items (handles parent blocks, zero blocks, and queues regular blocks)
        let (work_items, buf_offset) =
            self.collect_work_items(stream, pages, page_idx, target, offset, actual_len)?;

        // Choose parallel or serial decompression based on work item count
        if work_items.len() >= Self::PARALLEL_MIN_BLOCKS {
            self.execute_parallel_decompression(stream, &work_items, target, actual_len)?;
        } else {
            self.execute_serial_decompression(stream, &work_items, target, actual_len)?;
        }

        // Handle any remaining unprocessed data: `buf_offset` is how far
        // work-item collection advanced; the tail is read from the first parent
        // that indexes it, otherwise zero-filled.
        let remaining = actual_len - buf_offset;
        if remaining > 0 {
            let current_pos = offset + buf_offset as u64;
            let mut found = false;
            for parent in &self.parents {
                if parent.get_block_info(stream, current_pos)?.is_some() {
                    parent.read_at_into_uninit(stream, current_pos, &mut target[buf_offset..])?;
                    found = true;
                    break;
                }
            }
            if !found {
                Self::zero_fill_uninit(&mut target[buf_offset..]);
            }
        }

        // Trigger prefetch for next sequential blocks if enabled.
        // Guards:
        // 1. `is_prefetch` prevents recursive spawning (prefetch thread spawning another)
        // 2. `try_start()` limits to one in-flight prefetch at a time, preventing
        //    unbounded thread creation under rapid sequential reads
        if let Some(prefetcher) = &self.prefetcher {
            if !is_prefetch && !work_items.is_empty() && prefetcher.try_start() {
                let next_offset = offset + actual_len as u64;
                // NOTE(review): `block_size * 4` could overflow if block_size is
                // near its type's max — presumably it is bounded well below that;
                // confirm against header validation.
                let prefetch_len = (self.header.block_size * 4) as usize;
                let snap = Arc::clone(self);
                let stream_copy = stream;
                rayon::spawn(move || {
                    // Warming is best-effort; errors are deliberately ignored.
                    let _ = snap.warm_blocks(stream_copy, next_offset, prefetch_len);
                    // Release the in-flight guard so the next read can prefetch
                    if let Some(pf) = &snap.prefetcher {
                        pf.clear_in_flight();
                    }
                });
            }
        }

        Ok(())
    }
1053
    /// Warms the block cache for the given byte range without allocating a target buffer.
    ///
    /// Unlike [`read_at_into_uninit`](Self::read_at_into_uninit), this method only fetches,
    /// decompresses, and inserts blocks into the L1 cache. It skips blocks that are already
    /// cached, zero-length, or parent-delegated. No output buffer is allocated or written to.
    ///
    /// Used by the prefetcher to reduce overhead: the old path allocated a throwaway buffer
    /// of `block_size * 4` bytes and copied decompressed data into it, only to discard it.
    ///
    /// Warming is best-effort: per-block fetch and decompression failures are
    /// intentionally swallowed (the `if let Ok(..)` patterns below) so a
    /// transient error never aborts the prefetch.
    ///
    /// # Errors
    ///
    /// Only index-page lookup failures (`get_page`) propagate as `Err`.
    fn warm_blocks(&self, stream: SnapshotStream, offset: u64, len: usize) -> Result<()> {
        if len == 0 {
            return Ok(());
        }
        let stream_size = self.size(stream);
        if offset >= stream_size {
            return Ok(());
        }
        // Clamp the warmed range to the logical end of the stream.
        let actual_len = std::cmp::min(len as u64, stream_size - offset) as usize;

        let pages = match stream {
            SnapshotStream::Primary => &self.master.primary_pages,
            SnapshotStream::Secondary => &self.master.secondary_pages,
        };
        if pages.is_empty() {
            // No local index pages — nothing to warm.
            return Ok(());
        }

        // Locate the page containing `offset`; on an inexact match `idx - 1`
        // is the last page starting before the offset.
        let page_idx = match pages.binary_search_by(|p| p.start_logical.cmp(&offset)) {
            Ok(idx) => idx,
            Err(idx) => idx.saturating_sub(1),
        };

        let mut current_pos = offset;
        let mut remaining = actual_len;

        for page_entry in pages.iter().skip(page_idx) {
            if remaining == 0 {
                break;
            }
            // Once a page starts past the end of the requested range, no later
            // page can overlap it (pages are binary-searchable, i.e. ordered).
            if page_entry.start_logical > current_pos + remaining as u64 {
                break;
            }

            let page = self.get_page(page_entry)?;
            let mut block_logical_start = page_entry.start_logical;

            for (i, block) in page.blocks.iter().enumerate() {
                if remaining == 0 {
                    break;
                }
                let block_end = block_logical_start + block.logical_len as u64;

                // Only blocks overlapping [current_pos, current_pos + remaining) matter.
                if block_end > current_pos {
                    let offset_in_block = (current_pos - block_logical_start) as usize;
                    // How many requested bytes this block accounts for.
                    let to_advance = std::cmp::min(
                        remaining,
                        (block.logical_len as usize).saturating_sub(offset_in_block),
                    );

                    // Only warm regular blocks (skip parent-delegated and zero blocks).
                    // fetch_raw_block handles the cache check internally — on a hit it
                    // returns Decompressed which we simply ignore via the Compressed match.
                    if block.offset != BLOCK_OFFSET_PARENT && block.length > 0 {
                        let global_idx = page_entry.start_block + i as u64;
                        if let Ok(FetchResult::Compressed(raw)) =
                            self.fetch_raw_block(stream, global_idx, block)
                        {
                            if let Ok(data) = self.decompress_and_verify(raw, global_idx, block) {
                                self.cache_l1.insert(stream, global_idx, data);
                            }
                        }
                    }

                    current_pos += to_advance as u64;
                    remaining -= to_advance;
                }
                block_logical_start += block.logical_len as u64;
            }
        }

        Ok(())
    }
1135
    /// Like [`read_at_into_uninit`](Self::read_at_into_uninit) but accepts `&mut [u8]`. Use from FFI (e.g. Python).
    ///
    /// The buffer is treated as write-only scratch: its prior contents are
    /// ignored and overwritten by the read (or zero-filled past the end of the
    /// stream).
    ///
    /// # Errors
    ///
    /// Same as [`read_at_into_uninit`](Self::read_at_into_uninit); on error the
    /// buffer contents are undefined (possibly partially written).
    #[inline]
    pub fn read_at_into_uninit_bytes(
        self: &Arc<Self>,
        stream: SnapshotStream,
        offset: u64,
        buf: &mut [u8],
    ) -> Result<()> {
        // Cheap short-circuit; the inner read also no-ops on an empty buffer.
        if buf.is_empty() {
            return Ok(());
        }
        // SAFETY: &mut [u8] and &mut [MaybeUninit<u8>] have identical layout (both
        // are slices of single-byte types). Initialized u8 values are valid MaybeUninit<u8>.
        // The borrow is derived from `buf` so no aliasing occurs.
        let uninit = unsafe { &mut *(buf as *mut [u8] as *mut [MaybeUninit<u8>]) };
        self.read_at_into_uninit(stream, offset, uninit)
    }
1153
1154    /// Fetches an index page from cache or storage.
1155    ///
1156    /// Index pages map logical offsets to physical block locations. This method
1157    /// maintains an LRU cache to avoid repeated deserialization.
1158    ///
1159    /// # Parameters
1160    ///
1161    /// - `entry`: Page metadata from master index
1162    ///
1163    /// # Returns
1164    ///
1165    /// A shared reference to the deserialized index page.
1166    ///
1167    /// # Thread Safety
1168    ///
1169    /// This method acquires a lock on the page cache only for cache lookup and insertion.
1170    /// I/O and deserialization are performed without holding the lock to avoid blocking
1171    /// other threads during cache misses.
1172    pub(crate) fn get_page(&self, entry: &PageEntry) -> Result<Arc<IndexPage>> {
1173        // Fast path: check sharded cache
1174        if let Some(p) = self.page_cache.get(entry.offset) {
1175            return Ok(p);
1176        }
1177
1178        // Slow path: I/O and deserialization without holding any lock
1179        let bytes = self
1180            .backend
1181            .read_exact(entry.offset, entry.length as usize)?;
1182        let page: IndexPage = bincode::deserialize(&bytes)?;
1183        let arc = Arc::new(page);
1184
1185        // Check again in case another thread inserted while we were doing I/O
1186        if let Some(p) = self.page_cache.get(entry.offset) {
1187            return Ok(p);
1188        }
1189        self.page_cache.insert(entry.offset, arc.clone());
1190
1191        Ok(arc)
1192    }
1193
1194    /// Fetches raw compressed block data from cache or storage.
1195    ///
1196    /// This is the I/O portion of block resolution, separated to enable parallel I/O.
1197    /// It:
1198    /// 1. Checks the block cache
1199    /// 2. Handles zero-length blocks
1200    /// 3. Reads raw compressed data from backend
1201    ///
1202    /// # Parameters
1203    ///
1204    /// - `stream`: Stream identifier (for cache key)
1205    /// - `block_idx`: Global block index
1206    /// - `info`: Block metadata (offset, length)
1207    ///
1208    /// # Returns
1209    ///
1210    /// Raw block data (potentially compressed/encrypted) or cached decompressed data.
1211    fn fetch_raw_block(
1212        &self,
1213        stream: SnapshotStream,
1214        block_idx: u64,
1215        info: &BlockInfo,
1216    ) -> Result<FetchResult> {
1217        // Check cache first - return decompressed data if available
1218        if let Some(data) = self.cache_l1.get(stream, block_idx) {
1219            return Ok(FetchResult::Decompressed(data));
1220        }
1221
1222        // Handle zero blocks
1223        if info.length == 0 {
1224            let len = info.logical_len as usize;
1225            if len == 0 {
1226                return Ok(FetchResult::Decompressed(Bytes::new()));
1227            }
1228            if len == ZEROS_64K.len() {
1229                return Ok(FetchResult::Decompressed(Bytes::from_static(&ZEROS_64K)));
1230            }
1231            return Ok(FetchResult::Decompressed(Bytes::from(vec![0u8; len])));
1232        }
1233
1234        // Fetch raw compressed data (THIS IS THE PARALLEL PART)
1235        self.backend
1236            .read_exact(info.offset, info.length as usize)
1237            .map(FetchResult::Compressed)
1238    }
1239
    /// Decompresses and verifies a raw block.
    ///
    /// This is the CPU portion of block resolution, separated to enable parallel decompression.
    /// It:
    /// 1. Verifies CRC32 checksum
    /// 2. Decrypts (if encrypted)
    /// 3. Decompresses
    ///
    /// # Parameters
    ///
    /// - `raw`: Raw block data (potentially compressed/encrypted)
    /// - `block_idx`: Global block index (for error reporting and decryption)
    /// - `info`: Block metadata (checksum)
    ///
    /// # Returns
    ///
    /// Decompressed block data as `Bytes`, exactly `info.logical_len` bytes
    /// long on success.
    ///
    /// # Errors
    ///
    /// - `Error::Corruption` when the stored CRC32 does not match the raw data
    /// - Decryption and decompression errors from the configured codecs
    ///
    /// # Performance
    ///
    /// Decompression throughput:
    /// - LZ4: ~2 GB/s per core
    /// - Zstd: ~500 MB/s per core
    fn decompress_and_verify(&self, raw: Bytes, block_idx: u64, info: &BlockInfo) -> Result<Bytes> {
        // Verify stored checksum (CRC32 of compressed/encrypted data) before decrypt/decompress.
        // A stored checksum of 0 disables verification for this block.
        if info.checksum != 0 {
            let computed = crc32_hash(&raw);
            if computed != info.checksum {
                return Err(Error::Corruption(block_idx));
            }
        }

        // Checkout a buffer from the pool instead of allocating a new Vec.
        // The pool maintains pre-allocated buffers of common sizes (typically
        // block_size = 64KB), eliminating allocator overhead for the hot
        // decompression path. The buffer is consumed by Bytes::from() below,
        // so it is not returned to the pool — the pool acts as a fast allocator
        // for the common case where recently-freed buffers are available.
        let out_len = info.logical_len as usize;
        let mut out = self.decompress_pool.checkout(out_len);

        // SAFETY: See decompress_into_slice() for the full safety argument.
        // In short: decompress_into() writes exactly out_len bytes or returns Err.
        // On the Err path `out` (with len covering uninit bytes) is simply
        // dropped; Vec<u8>'s drop never reads its contents, so that is sound.
        #[allow(clippy::uninit_vec)]
        unsafe {
            out.set_len(out_len);
        }

        if let Some(enc) = &self.encryptor {
            // Decrypt first (the checksum covered the encrypted bytes), then
            // decompress the plaintext into the pooled buffer.
            let compressed = enc.decrypt(&raw, block_idx)?;
            self.compressor.decompress_into(&compressed, &mut out)?;
        } else {
            self.compressor.decompress_into(raw.as_ref(), &mut out)?;
        }

        Ok(Bytes::from(out))
    }
1297}