hexz_core/api/file.rs
1//! High-level snapshot file API and logical stream types.
2
3use crate::algo::compression::{Compressor, create_compressor};
4use crate::algo::encryption::Encryptor;
5use crate::cache::buffer_pool::BufferPool;
6use crate::cache::lru::{BlockCache, ShardedPageCache};
7use crate::cache::prefetch::Prefetcher;
8use crate::format::header::Header;
9use crate::format::index::{BlockInfo, IndexPage, MasterIndex, PageEntry};
10use crate::format::magic::{HEADER_SIZE, MAGIC_BYTES};
11use crate::format::version::{VersionCompatibility, check_version, compatibility_message};
12use crate::store::StorageBackend;
13use bytes::Bytes;
14use crc32fast::hash as crc32_hash;
15use std::mem::MaybeUninit;
16use std::ptr;
17use std::sync::atomic::{AtomicBool, Ordering};
18use std::sync::{Arc, Mutex};
19
20use hexz_common::constants::{BLOCK_OFFSET_PARENT, DEFAULT_BLOCK_SIZE};
21use hexz_common::{Error, Result};
22use rayon::prelude::*;
23
/// A factory function that opens a parent snapshot by path.
///
/// Provided by the caller of [`File::with_cache_and_loader`] so that the
/// core read API has no hard dependency on any specific storage backend
/// implementation. Storage crates supply a concrete loader; callers that
/// know parents cannot exist may pass `None`.
///
/// The loader receives one raw entry from the header's `parent_paths` list
/// and must return a fully opened parent [`File`].
pub type ParentLoader = Box<dyn Fn(&str) -> Result<Arc<File>> + Send + Sync>;
31
/// Shared zero block for the default block size to avoid allocating when returning zero blocks.
// NOTE(review): not referenced anywhere in this chunk; presumably used by the
// read/prefetch paths later in the file — confirm before removing.
static ZEROS_64K: [u8; DEFAULT_BLOCK_SIZE as usize] = [0u8; DEFAULT_BLOCK_SIZE as usize];

/// Work item for block decompression: (block_idx, info, buf_offset, offset_in_block, to_copy)
///
/// - `block_idx`: global block index within the stream
/// - `info`: block metadata copied out of the index page
/// - `buf_offset`: destination offset in the caller's output buffer
/// - `offset_in_block`: first byte of the decompressed block to copy
/// - `to_copy`: number of bytes to copy into the output buffer
type WorkItem = (u64, BlockInfo, usize, usize, usize);
37
/// Result of fetching a block from cache or storage.
///
/// Eliminates TOCTOU races by tracking data state at fetch time rather than
/// re-checking the cache later (which can give a different answer if a
/// background prefetch thread modifies the cache between check and use).
enum FetchResult {
    /// Data is already decompressed (came from L1 cache or is a zero block).
    /// Can be copied straight to the output buffer.
    Decompressed(Bytes),
    /// Data is raw compressed bytes from storage (needs decompression).
    /// Consumers run `decompress_and_verify` and then insert into the L1 cache.
    Compressed(Bytes),
}
49
50/// Logical stream identifier for dual-stream snapshots.
51///
52/// Hexz snapshots can store two independent data streams:
53/// - **Primary**: Persistent storage (disk image, filesystem data, or main tensor weights)
54/// - **Secondary**: Volatile state (RAM contents, process memory, or auxiliary data)
55///
56/// # Example
57///
58/// ```ignore
59/// use hexz_core::{File, SnapshotStream};
60/// # use std::sync::Arc;
61/// # fn example(snapshot: Arc<File>) -> Result<(), Box<dyn std::error::Error>> {
62/// // Read 4KB from primary stream
63/// let data = snapshot.read_at(SnapshotStream::Primary, 0, 4096)?;
64///
65/// // Read 4KB from secondary stream (if present)
66/// let aux = snapshot.read_at(SnapshotStream::Secondary, 0, 4096)?;
67/// # Ok(())
68/// # }
69/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum SnapshotStream {
    /// Persistent primary stream (formerly Disk).
    /// Backed by the master index's `primary_pages` / `primary_size`.
    Primary = 0,
    /// Volatile secondary stream (formerly Memory).
    /// Backed by the master index's `secondary_pages` / `secondary_size`.
    Secondary = 1,
}
78
79/// Read-only interface for accessing Hexz snapshot data.
80///
81/// `File` is the primary API for reading compressed, block-indexed snapshots.
82/// It handles:
83/// - Block-level decompression with LRU caching
84/// - Optional AES-256-GCM decryption
85/// - Thin snapshot parent chaining
86/// - Dual-stream access (disk and memory)
87/// - Random access with minimal I/O
88///
89/// # Thread Safety
90///
91/// `File` is `Send + Sync` and can be safely shared across threads via `Arc`.
92/// Internal caches use `Mutex` for synchronization.
93///
94/// # Performance
95///
96/// - **Cache hit latency**: ~80μs (warm cache)
97/// - **Cache miss latency**: ~1ms (cold cache, local storage)
98/// - **Sequential throughput**: ~2-3 GB/s (NVMe + LZ4)
99/// - **Memory overhead**: ~150MB typical (configurable)
100///
101/// # Examples
102///
103/// ## Basic Usage
104///
105/// ```ignore
106/// use hexz_core::{File, SnapshotStream};
107/// use hexz_store::local::FileBackend;
108/// use hexz_core::algo::compression::lz4::Lz4Compressor;
109/// use std::sync::Arc;
110///
111/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
112/// let backend = Arc::new(FileBackend::new("snapshot.hxz".as_ref())?);
113/// let compressor = Box::new(Lz4Compressor::new());
114/// let snapshot = File::new(backend, compressor, None)?;
115///
116/// // Read 4KB at offset 1MB
117/// let data = snapshot.read_at(SnapshotStream::Primary, 1024 * 1024, 4096)?;
118/// assert_eq!(data.len(), 4096);
119/// # Ok(())
120/// # }
121/// ```
122///
123/// ## Thin Snapshots (with parent)
124///
125/// ```ignore
126/// use hexz_core::File;
127/// use hexz_store::local::FileBackend;
128/// use hexz_core::algo::compression::lz4::Lz4Compressor;
129/// use std::sync::Arc;
130///
131/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
132/// // Open base snapshot
133/// let base_backend = Arc::new(FileBackend::new("base.hxz".as_ref())?);
134/// let base = File::new(
135/// base_backend,
136/// Box::new(Lz4Compressor::new()),
137/// None
138/// )?;
139///
140/// // The thin snapshot will automatically load its parent based on
141/// // the parent_paths field in the header
142/// let thin_backend = Arc::new(FileBackend::new("incremental.hxz".as_ref())?);
143/// let thin = File::new(
144/// thin_backend,
145/// Box::new(Lz4Compressor::new()),
146/// None
147/// )?;
148///
149/// // Reads automatically fall back to base for unchanged blocks
150/// let data = thin.read_at(hexz_core::SnapshotStream::Primary, 0, 4096)?;
151/// # Ok(())
152/// # }
153/// ```
pub struct File {
    /// Snapshot metadata (sizes, compression, encryption settings)
    pub header: Header,

    /// Master index containing top-level page entries
    pub(crate) master: MasterIndex,

    /// Storage backend for reading raw snapshot data
    backend: Arc<dyn StorageBackend>,

    /// Compression algorithm (LZ4 or Zstandard)
    compressor: Box<dyn Compressor>,

    /// Optional encryption (AES-256-GCM)
    encryptor: Option<Box<dyn Encryptor>>,

    /// Optional parent snapshots for thin (incremental) snapshots, in the
    /// order listed by `header.parent_paths`. When a block's offset is
    /// BLOCK_OFFSET_PARENT, data is fetched from the first parent that has
    /// the block. Empty when no parent loader was supplied at open time.
    parents: Vec<Arc<File>>,

    /// LRU cache for decompressed blocks (per-stream, per-block-index)
    cache_l1: BlockCache,

    /// Sharded LRU cache for deserialized index pages
    page_cache: ShardedPageCache,

    /// Optional prefetcher for background data loading; `None` disables
    /// prefetching entirely.
    prefetcher: Option<Prefetcher>,

    /// Reusable buffer pool for decompression output buffers.
    /// Reduces allocator contention in parallel decompression and avoids
    /// repeated alloc/dealloc of identically-sized buffers in serial reads.
    decompress_pool: BufferPool,
}
188
189impl File {
    /// Opens a snapshot, auto-detecting compression and dictionary from the header.
    ///
    /// This eliminates the 3-step boilerplate of: read header, load dict, create
    /// compressor. Equivalent to `File::new(backend, auto_compressor, encryptor)`.
    ///
    /// # Parameters
    ///
    /// - `backend`: Storage backend (local file, HTTP, S3, etc.)
    /// - `encryptor`: Optional decryption handler (pass `None` for unencrypted snapshots)
    ///
    /// # Errors
    ///
    /// Fails if the header cannot be read or parsed, if the magic bytes or
    /// version are invalid, or if the compression dictionary cannot be loaded
    /// from the backend.
    pub fn open(
        backend: Arc<dyn StorageBackend>,
        encryptor: Option<Box<dyn Encryptor>>,
    ) -> Result<Arc<Self>> {
        Self::open_with_cache(backend, encryptor, None, None)
    }
237
    /// Like [`open`](Self::open) but with custom cache and prefetch settings.
    ///
    /// - `cache_capacity_bytes`: decompressed-block cache budget in bytes
    ///   (`None` uses the default; see [`with_cache`](Self::with_cache))
    /// - `prefetch_window_size`: blocks to prefetch ahead (`None` or `Some(0)` disables)
    pub fn open_with_cache(
        backend: Arc<dyn StorageBackend>,
        encryptor: Option<Box<dyn Encryptor>>,
        cache_capacity_bytes: Option<usize>,
        prefetch_window_size: Option<u32>,
    ) -> Result<Arc<Self>> {
        // No parent loader: snapshots that declare parent paths will log a
        // warning and leave parent-reference blocks unresolvable.
        Self::open_with_cache_and_loader(
            backend,
            encryptor,
            cache_capacity_bytes,
            prefetch_window_size,
            None,
        )
    }
253
    /// Like [`open_with_cache`](Self::open_with_cache) but accepts an optional parent loader.
    ///
    /// Reads the header once here to discover the compression algorithm and
    /// optional dictionary, builds a matching compressor, then delegates to
    /// [`with_cache_and_loader`](Self::with_cache_and_loader) (which re-reads
    /// and validates the header itself).
    pub fn open_with_cache_and_loader(
        backend: Arc<dyn StorageBackend>,
        encryptor: Option<Box<dyn Encryptor>>,
        cache_capacity_bytes: Option<usize>,
        prefetch_window_size: Option<u32>,
        parent_loader: Option<&ParentLoader>,
    ) -> Result<Arc<Self>> {
        // The header is parsed twice (here and in `with_cache_and_loader`);
        // the extra read is one small backend request and keeps the two
        // constructors independent of each other.
        let header = Header::read_from_backend(backend.as_ref())?;
        let dictionary = header.load_dictionary(backend.as_ref())?;
        let compressor = create_compressor(header.compression, None, dictionary);
        Self::with_cache_and_loader(
            backend,
            compressor,
            encryptor,
            cache_capacity_bytes,
            prefetch_window_size,
            parent_loader,
        )
    }
274
    /// Opens a Hexz snapshot with default cache settings.
    ///
    /// This is the primary constructor for `File`. It:
    /// 1. Reads and validates the snapshot header (magic bytes, version)
    /// 2. Deserializes the master index
    /// 3. Warns if parent paths exist (this constructor supplies no parent loader)
    /// 4. Initializes block and page caches
    ///
    /// # Parameters
    ///
    /// - `backend`: Storage backend (local file, HTTP, S3, etc.)
    /// - `compressor`: Compression algorithm matching the snapshot format
    /// - `encryptor`: Optional decryption handler (pass `None` for unencrypted snapshots)
    ///
    /// # Returns
    ///
    /// - `Ok(Arc<File>)` on success
    /// - `Err(Error::Format)` if magic bytes or version are invalid
    /// - `Err(Error::Io)` if the storage backend fails
    pub fn new(
        backend: Arc<dyn StorageBackend>,
        compressor: Box<dyn Compressor>,
        encryptor: Option<Box<dyn Encryptor>>,
    ) -> Result<Arc<Self>> {
        Self::with_cache(backend, compressor, encryptor, None, None)
    }
282
283 /// Opens a Hexz snapshot with custom cache capacity and prefetching.
284 ///
285 /// Identical to [`new`](Self::new) but allows specifying cache size and prefetch window.
286 ///
287 /// # Parameters
288 ///
289 /// - `backend`: Storage backend
290 /// - `compressor`: Compression algorithm
291 /// - `encryptor`: Optional decryption handler
292 /// - `cache_capacity_bytes`: Block cache size in bytes (default: ~400MB for 4KB blocks)
293 /// - `prefetch_window_size`: Number of blocks to prefetch ahead (default: disabled)
294 ///
295 /// # Cache Sizing
296 ///
297 /// The cache stores decompressed blocks. Given a block size of 4KB:
298 /// - `Some(100_000_000)` → ~24,000 blocks (~96MB effective)
299 /// - `None` → 1000 blocks (~4MB effective)
300 ///
301 /// Larger caches reduce repeated decompression but increase memory usage.
302 ///
303 /// # Prefetching
304 ///
305 /// When `prefetch_window_size` is set, the system will automatically fetch the next N blocks
306 /// in the background after each read, optimizing sequential access patterns:
307 /// - `Some(4)` → Prefetch 4 blocks ahead
308 /// - `None` or `Some(0)` → Disable prefetching
309 ///
310 /// # Examples
311 ///
312 /// ```ignore
313 /// use hexz_core::File;
314 /// use hexz_store::local::FileBackend;
315 /// use hexz_core::algo::compression::lz4::Lz4Compressor;
316 /// use std::sync::Arc;
317 ///
318 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
319 /// let backend = Arc::new(FileBackend::new("snapshot.hxz".as_ref())?);
320 /// let compressor = Box::new(Lz4Compressor::new());
321 ///
322 /// // Allocate 256MB for cache, prefetch 4 blocks ahead
323 /// let snapshot = File::with_cache(
324 /// backend,
325 /// compressor,
326 /// None,
327 /// Some(256 * 1024 * 1024),
328 /// Some(4)
329 /// )?;
330 /// # Ok(())
331 /// # }
332 /// ```
    pub fn with_cache(
        backend: Arc<dyn StorageBackend>,
        compressor: Box<dyn Compressor>,
        encryptor: Option<Box<dyn Encryptor>>,
        cache_capacity_bytes: Option<usize>,
        prefetch_window_size: Option<u32>,
    ) -> Result<Arc<Self>> {
        // Delegate with no parent loader; snapshots that declare parent paths
        // will log a warning and leave parent-reference blocks unresolvable.
        Self::with_cache_and_loader(
            backend,
            compressor,
            encryptor,
            cache_capacity_bytes,
            prefetch_window_size,
            None,
        )
    }
349
350 /// Like [`with_cache`](Self::with_cache) but accepts an optional parent loader.
351 ///
352 /// The `parent_loader` is called for each path in the snapshot's `parent_paths`
353 /// header field. Passing `None` disables automatic parent chaining — reads that
354 /// fall through to a parent will return an error unless no parent paths are present.
355 ///
356 /// This is the primary constructor used by `hexz-store` to supply a
357 /// `FileBackend`-based loader without coupling this crate to any backend impl.
358 pub fn with_cache_and_loader(
359 backend: Arc<dyn StorageBackend>,
360 compressor: Box<dyn Compressor>,
361 encryptor: Option<Box<dyn Encryptor>>,
362 cache_capacity_bytes: Option<usize>,
363 prefetch_window_size: Option<u32>,
364 parent_loader: Option<&ParentLoader>,
365 ) -> Result<Arc<Self>> {
366 let header_bytes = backend.read_exact(0, HEADER_SIZE)?;
367 let header: Header = bincode::deserialize(&header_bytes)?;
368
369 if &header.magic != MAGIC_BYTES {
370 return Err(Error::Format("Invalid magic bytes".into()));
371 }
372
373 // Check version compatibility
374 let compatibility = check_version(header.version);
375 match compatibility {
376 VersionCompatibility::Full => {
377 // Perfect match, proceed silently
378 }
379 VersionCompatibility::Degraded => {
380 // Newer version, issue warning but allow
381 tracing::warn!("{}", compatibility_message(header.version));
382 }
383 VersionCompatibility::Incompatible => {
384 // Too old or too new, reject
385 return Err(Error::Format(compatibility_message(header.version)));
386 }
387 }
388
389 let file_len = backend.len();
390 if header.index_offset >= file_len {
391 return Err(Error::Format(format!(
392 "index_offset ({}) is at or past end of file ({})",
393 header.index_offset, file_len
394 )));
395 }
396 let index_bytes = backend.read_exact(
397 header.index_offset,
398 (file_len - header.index_offset) as usize,
399 )?;
400
401 let master: MasterIndex = bincode::deserialize(&index_bytes)?;
402
403 // Recursively load parent snapshots if a loader is provided.
404 let mut parents = Vec::new();
405 if let Some(loader) = parent_loader {
406 for parent_path in &header.parent_paths {
407 tracing::info!("Loading parent snapshot: {}", parent_path);
408 parents.push(loader(parent_path)?);
409 }
410 } else if !header.parent_paths.is_empty() {
411 tracing::warn!(
412 "Snapshot has {} parent path(s) but no parent_loader was provided; \
413 parent-reference blocks will not be resolvable.",
414 header.parent_paths.len()
415 );
416 }
417
418 let block_size = header.block_size as usize;
419 let l1_capacity = if let Some(bytes) = cache_capacity_bytes {
420 (bytes / block_size).max(1)
421 } else {
422 1000
423 };
424
425 // Initialize prefetcher if window size is specified and > 0
426 let prefetcher = prefetch_window_size.filter(|&w| w > 0).map(Prefetcher::new);
427
428 // Size the decompression buffer pool: one buffer per rayon thread plus
429 // a few extra for the serial path and prefetch thread.
430 let pool_size = rayon::current_num_threads() + 2;
431
432 Ok(Arc::new(Self {
433 header,
434 master,
435 backend,
436 compressor,
437 encryptor,
438 parents,
439 cache_l1: BlockCache::with_capacity(l1_capacity),
440 page_cache: ShardedPageCache::default(),
441 prefetcher,
442 decompress_pool: BufferPool::new(pool_size),
443 }))
444 }
445
    /// Returns the total number of prefetch operations spawned since this file was opened.
    ///
    /// Returns 0 if prefetching is disabled (no prefetcher was configured at
    /// open time).
    pub fn prefetch_spawn_count(&self) -> u64 {
        self.prefetcher.as_ref().map_or(0, |p| p.spawn_count())
    }
475
    /// Returns the logical size of a stream in bytes.
    ///
    /// # Parameters
    ///
    /// - `stream`: The stream to query (Primary or Secondary)
    ///
    /// # Returns
    ///
    /// The uncompressed, logical size of the stream. This is the size you would
    /// get if you decompressed all blocks and concatenated them.
    ///
    /// # Examples
    ///
    /// ```ignore
    /// use hexz_core::{File, SnapshotStream};
    /// # use std::sync::Arc;
    /// # fn example(snapshot: Arc<File>) {
    /// let disk_bytes = snapshot.size(SnapshotStream::Primary);
    /// let mem_bytes = snapshot.size(SnapshotStream::Secondary);
    /// # }
    /// ```
    pub fn size(&self, stream: SnapshotStream) -> u64 {
        match stream {
            SnapshotStream::Primary => self.master.primary_size,
            SnapshotStream::Secondary => self.master.secondary_size,
        }
    }
482
483 /// Iterates all non-sparse block hashes for the given stream.
484 ///
485 /// Used by `hexz-ops` to build a `ParentIndex` for cross-file deduplication
486 /// without requiring access to private fields.
487 pub fn iter_block_hashes(&self, stream: SnapshotStream) -> Result<Vec<[u8; 32]>> {
488 let pages = match stream {
489 SnapshotStream::Primary => &self.master.primary_pages,
490 SnapshotStream::Secondary => &self.master.secondary_pages,
491 };
492 let mut hashes = Vec::new();
493 for page_entry in pages {
494 let page = self.get_page(page_entry)?;
495 for block_info in &page.blocks {
496 if !block_info.is_sparse() && block_info.hash != [0u8; 32] {
497 hashes.push(block_info.hash);
498 }
499 }
500 }
501 Ok(hashes)
502 }
503
504 /// Returns the block metadata for a given logical offset.
505 pub fn get_block_info(
506 &self,
507 stream: SnapshotStream,
508 offset: u64,
509 ) -> Result<Option<(u64, BlockInfo)>> {
510 let pages = match stream {
511 SnapshotStream::Primary => &self.master.primary_pages,
512 SnapshotStream::Secondary => &self.master.secondary_pages,
513 };
514
515 if pages.is_empty() {
516 return Ok(None);
517 }
518
519 let page_idx = match pages.binary_search_by(|p| p.start_logical.cmp(&offset)) {
520 Ok(idx) => idx,
521 Err(idx) => idx.saturating_sub(1),
522 };
523
524 let page_entry = &pages[page_idx];
525 let page = self.get_page(page_entry)?;
526 let mut block_logical_start = page_entry.start_logical;
527
528 for (i, block) in page.blocks.iter().enumerate() {
529 let block_end = block_logical_start + block.logical_len as u64;
530 if offset >= block_logical_start && offset < block_end {
531 let global_idx = page_entry.start_block + i as u64;
532 return Ok(Some((global_idx, *block)));
533 }
534 block_logical_start = block_end;
535 }
536
537 Ok(None)
538 }
539
540 /// Reads data from a snapshot stream at a given offset.
541 ///
542 /// This is the primary read method for random access. It:
543 /// 1. Identifies which blocks overlap the requested range
544 /// 2. Fetches blocks from cache or decompresses from storage
545 /// 3. Handles thin snapshot fallback to parent
546 /// 4. Assembles the final buffer from block slices
547 ///
548 /// # Parameters
549 ///
550 /// - `stream`: Which stream to read from (Disk or Memory)
551 /// - `offset`: Starting byte offset (0-indexed)
552 /// - `len`: Number of bytes to read
553 ///
554 /// # Returns
555 ///
556 /// A `Vec<u8>` containing up to `len` bytes. The returned vector may be shorter
557 /// if:
558 /// - `offset` is beyond the stream size (returns empty vector)
559 /// - `offset + len` exceeds stream size (returns partial data)
560 ///
561 /// Missing data (sparse regions) is zero-filled.
562 ///
563 /// # Errors
564 ///
565 /// - `Error::Io` if backend read fails (e.g. truncated file)
566 /// - `Error::Corruption(block_idx)` if block checksum does not match
567 /// - `Error::Decompression` if block decompression fails
568 /// - `Error::Decryption` if block decryption fails
569 ///
570 /// # Performance
571 ///
572 /// - **Cache hit**: ~80μs latency, no I/O
573 /// - **Cache miss**: ~1ms latency (local storage), includes decompression
574 /// - **Remote storage**: Latency depends on network (HTTP: ~50ms, S3: ~100ms)
575 ///
576 /// Aligned reads (offset % block_size == 0) are most efficient.
577 ///
578 /// # Examples
579 ///
580 /// ```ignore
581 /// use hexz_core::{File, SnapshotStream};
582 /// # use std::sync::Arc;
583 /// # fn example(snapshot: Arc<File>) -> Result<(), Box<dyn std::error::Error>> {
584 /// // Read first 512 bytes of primary stream
585 /// let boot_sector = snapshot.read_at(SnapshotStream::Primary, 0, 512)?;
586 ///
587 /// // Read from arbitrary offset
588 /// let chunk = snapshot.read_at(SnapshotStream::Primary, 1024 * 1024, 4096)?;
589 ///
590 /// // Reading beyond stream size returns empty vector
591 /// let empty = snapshot.read_at(SnapshotStream::Primary, u64::MAX, 100)?;
592 /// assert!(empty.is_empty());
593 /// # Ok(())
594 /// # }
595 /// ```
596 /// Reads a byte range. Uses parallel block decompression when the range spans multiple blocks.
    pub fn read_at(
        self: &Arc<Self>,
        stream: SnapshotStream,
        offset: u64,
        len: usize,
    ) -> Result<Vec<u8>> {
        // Clamp the request to the logical stream size; reads at or past EOF
        // return an empty vector rather than an error.
        let stream_size = self.size(stream);
        if offset >= stream_size {
            return Ok(Vec::new());
        }
        let actual_len = std::cmp::min(len as u64, stream_size - offset) as usize;
        if actual_len == 0 {
            return Ok(Vec::new());
        }

        let pages = match stream {
            SnapshotStream::Primary => &self.master.primary_pages,
            SnapshotStream::Secondary => &self.master.secondary_pages,
        };

        // No local index pages: the data is either entirely inherited from a
        // parent (thin snapshot) or entirely sparse (zero-filled).
        if pages.is_empty() {
            for parent in &self.parents {
                if parent.get_block_info(stream, offset)?.is_some() {
                    return parent.read_at(stream, offset, actual_len);
                }
            }
            return Ok(vec![0u8; actual_len]);
        }

        // Allocate an uninitialized buffer so the block-assembly path writes
        // each byte exactly once (no upfront zero-fill of `actual_len` bytes).
        let mut buf: Vec<MaybeUninit<u8>> = Vec::with_capacity(actual_len);
        // SAFETY: MaybeUninit<u8> does not require initialization; we only need
        // the allocation. read_at_into_uninit writes every byte before we
        // transmute to Vec<u8> below.
        unsafe { buf.set_len(actual_len) };
        // On error, `buf` is dropped here as Vec<MaybeUninit<u8>> — sound,
        // since MaybeUninit has no drop glue.
        self.read_at_into_uninit(stream, offset, &mut buf)?;
        let ptr = buf.as_mut_ptr().cast::<u8>();
        let len = buf.len();
        let cap = buf.capacity();
        std::mem::forget(buf);
        // SAFETY: `buf` was a Vec<MaybeUninit<u8>> that we fully initialized via
        // `read_at_into_uninit` (which writes every byte). We `forget` the original
        // Vec to avoid a double-free and reconstruct it with the same ptr/len/cap.
        // MaybeUninit<u8> has the same layout as u8.
        Ok(unsafe { Vec::from_raw_parts(ptr, len, cap) })
    }
642
643 /// Reads into a provided buffer. Unused suffix is zero-filled. Uses parallel decompression when spanning multiple blocks.
644 pub fn read_at_into(
645 self: &Arc<Self>,
646 stream: SnapshotStream,
647 offset: u64,
648 buffer: &mut [u8],
649 ) -> Result<()> {
650 let len = buffer.len();
651 if len == 0 {
652 return Ok(());
653 }
654 let stream_size = self.size(stream);
655 if offset >= stream_size {
656 buffer.fill(0);
657 return Ok(());
658 }
659 let actual_len = std::cmp::min(len as u64, stream_size - offset) as usize;
660 if actual_len < len {
661 buffer[actual_len..].fill(0);
662 }
663 self.read_at_into_uninit_bytes(stream, offset, &mut buffer[0..actual_len])
664 }
665
    /// Minimum number of local blocks to use the parallel decompression path.
    ///
    /// Rayon task dispatch adds ~5-10μs overhead per task. With LZ4 at ~2GB/s,
    /// a 64KB block decompresses in ~32μs. Below 4 blocks the overhead fraction
    /// is significant; serial decompression avoids the synchronization cost.
    ///
    /// NOTE(review): not referenced in this portion of the file; presumably
    /// consulted further down to choose between
    /// `execute_parallel_decompression` and `execute_serial_decompression`.
    const PARALLEL_MIN_BLOCKS: usize = 4;
672
673 /// Collects work items for blocks that need decompression.
674 ///
675 /// This method iterates through index pages and blocks, handling:
676 /// - Parent blocks: delegate to parent snapshot or zero-fill
677 /// - Zero blocks: zero-fill directly
678 /// - Regular blocks: add to work queue for later decompression
679 ///
680 /// Returns the work items to process and updates the tracking variables.
    fn collect_work_items(
        &self,
        stream: SnapshotStream,
        pages: &[PageEntry],
        page_idx: usize,
        target: &mut [MaybeUninit<u8>],
        offset: u64,
        actual_len: usize,
    ) -> Result<(Vec<WorkItem>, usize)> {
        // `.max(1)` guards the division against a corrupted zero block_size.
        let block_size = self.header.block_size.max(1) as usize;
        let estimated_blocks = actual_len.div_ceil(block_size);
        let mut local_work: Vec<WorkItem> = Vec::with_capacity(estimated_blocks);
        // Cursor state: destination offset in `target`, logical stream
        // position, and bytes still to account for.
        let mut buf_offset = 0;
        let mut current_pos = offset;
        let mut remaining = actual_len;

        for page_entry in pages.iter().skip(page_idx) {
            if remaining == 0 {
                break;
            }
            // This page starts past the end of the requested range — done.
            if page_entry.start_logical > current_pos + remaining as u64 {
                break;
            }

            let page = self.get_page(page_entry)?;
            let mut block_logical_start = page_entry.start_logical;

            for (block_idx_in_page, block) in page.blocks.iter().enumerate() {
                let block_end = block_logical_start + block.logical_len as u64;

                // Skip blocks that end at or before the cursor; handle any
                // block overlapping [current_pos, current_pos + remaining).
                if block_end > current_pos {
                    let global_block_idx = page_entry.start_block + block_idx_in_page as u64;
                    // NOTE(review): assumes pages/blocks are logically
                    // contiguous, i.e. `block_logical_start <= current_pos`
                    // whenever `block_end > current_pos`. If the format ever
                    // allowed a logical gap between pages, this u64
                    // subtraction would underflow — confirm the contiguity
                    // invariant with the index writer.
                    let offset_in_block = (current_pos - block_logical_start) as usize;
                    let to_copy = std::cmp::min(
                        remaining,
                        (block.logical_len as usize).saturating_sub(offset_in_block),
                    );

                    if block.offset == BLOCK_OFFSET_PARENT {
                        // Parent block: delegate to the first parent that has
                        // it, or zero-fill when no parent covers this offset.
                        let mut found = false;
                        for parent in &self.parents {
                            if parent.get_block_info(stream, current_pos)?.is_some() {
                                let dest = &mut target[buf_offset..buf_offset + to_copy];
                                parent.read_at_into_uninit(stream, current_pos, dest)?;
                                found = true;
                                break;
                            }
                        }
                        if !found {
                            Self::zero_fill_uninit(&mut target[buf_offset..buf_offset + to_copy]);
                        }
                        current_pos += to_copy as u64;
                        buf_offset += to_copy;
                        remaining -= to_copy;
                    } else if block.length == 0 {
                        // Zero block: fill with zeros
                        Self::zero_fill_uninit(&mut target[buf_offset..buf_offset + to_copy]);
                        current_pos += to_copy as u64;
                        buf_offset += to_copy;
                        remaining -= to_copy;
                    } else {
                        // Regular block: add to work queue for the serial or
                        // parallel decompression path; nothing is copied here.
                        if to_copy > 0 {
                            local_work.push((
                                global_block_idx,
                                *block,
                                buf_offset,
                                offset_in_block,
                                to_copy,
                            ));
                            buf_offset += to_copy;
                            current_pos += to_copy as u64;
                            remaining -= to_copy;
                        }
                    }

                    if remaining == 0 {
                        break;
                    }
                }
                block_logical_start += block.logical_len as u64;
            }
        }

        Ok((local_work, buf_offset))
    }
768
769 /// Executes parallel decompression for multiple blocks.
770 ///
771 /// Uses a two-phase approach:
772 /// 1. Parallel I/O: Fetch all raw blocks concurrently
773 /// 2. Parallel CPU: Decompress and copy to target buffer
774 fn execute_parallel_decompression(
775 self: &Arc<Self>,
776 stream: SnapshotStream,
777 work_items: &[WorkItem],
778 target: &mut [MaybeUninit<u8>],
779 actual_len: usize,
780 ) -> Result<()> {
781 let snap = Arc::clone(self);
782 let target_addr = target.as_mut_ptr() as usize;
783
784 // Phase 1: Parallel fetch all raw blocks. Each result tracks whether
785 // the data is already decompressed (cache hit / zero block) or still
786 // compressed (storage read), eliminating a TOCTOU race where a background
787 // prefetch thread could modify the cache between fetch and decompression.
788 let raw_blocks: Vec<Result<FetchResult>> = work_items
789 .par_iter()
790 .map(|(block_idx, info, _, _, _)| snap.fetch_raw_block(stream, *block_idx, info))
791 .collect();
792
793 // Phase 2: Parallel decompress and copy
794 let has_error = AtomicBool::new(false);
795 let err: Mutex<Option<Error>> = Mutex::new(None);
796 work_items
797 .par_iter()
798 .zip(raw_blocks)
799 .for_each(|(work_item, fetch_result)| {
800 // Fast path: single atomic load (~1 cycle), no lock contention.
801 // Relaxed ordering is sufficient — this is a best-effort early-exit
802 // hint; the final err.lock() after the loop provides synchronization.
803 if has_error.load(Ordering::Relaxed) {
804 return;
805 }
806
807 let (block_idx, info, buf_offset, offset_in_block, to_copy) = work_item;
808
809 // Handle fetch errors
810 let fetched = match fetch_result {
811 Ok(r) => r,
812 Err(e) => {
813 has_error.store(true, Ordering::Relaxed);
814 if let Ok(mut guard) = err.lock() {
815 let _ = guard.replace(e);
816 }
817 return;
818 }
819 };
820
821 // Use the FetchResult to determine if decompression is needed,
822 // rather than re-checking the cache (which could give a stale answer).
823 let data = match fetched {
824 FetchResult::Decompressed(data) => data,
825 FetchResult::Compressed(raw) => {
826 match snap.decompress_and_verify(raw, *block_idx, info) {
827 Ok(d) => {
828 // Cache the result
829 snap.cache_l1.insert(stream, *block_idx, d.clone());
830 d
831 }
832 Err(e) => {
833 has_error.store(true, Ordering::Relaxed);
834 if let Ok(mut guard) = err.lock() {
835 let _ = guard.replace(e);
836 }
837 return;
838 }
839 }
840 }
841 };
842
843 // Copy to target buffer
844 let src = data.as_ref();
845 let start = *offset_in_block;
846 let len = *to_copy;
847 if start < src.len() && len <= src.len() - start {
848 // Defensive assertion: ensure destination write is within bounds
849 debug_assert!(
850 buf_offset + len <= actual_len,
851 "Buffer overflow: attempting to write {} bytes at offset {} into buffer of length {}",
852 len,
853 buf_offset,
854 actual_len
855 );
856 let dest = (target_addr + buf_offset) as *mut u8;
857 // SAFETY: `src[start..start+len]` is in-bounds (checked above).
858 // `dest` points into the `target` MaybeUninit buffer at a unique
859 // non-overlapping offset (each work item has a distinct `buf_offset`),
860 // and the rayon par_iter ensures each item writes to a disjoint region.
861 // The debug_assert above validates buf_offset + len <= actual_len.
862 unsafe { ptr::copy_nonoverlapping(src[start..].as_ptr(), dest, len) };
863 }
864 });
865
866 if let Some(e) = err.lock().ok().and_then(|mut guard| guard.take()) {
867 return Err(e);
868 }
869
870 Ok(())
871 }
872
873 /// Executes serial decompression for a small number of blocks.
874 fn execute_serial_decompression(
875 &self,
876 stream: SnapshotStream,
877 work_items: &[WorkItem],
878 target: &mut [MaybeUninit<u8>],
879 actual_len: usize,
880 ) -> Result<()> {
881 for (block_idx, info, buf_offset, offset_in_block, to_copy) in work_items {
882 let fetched = self.fetch_raw_block(stream, *block_idx, info)?;
883 let data = match fetched {
884 FetchResult::Decompressed(data) => data,
885 FetchResult::Compressed(raw) => {
886 let data = self.decompress_and_verify(raw, *block_idx, info)?;
887 self.cache_l1.insert(stream, *block_idx, data.clone());
888 data
889 }
890 };
891
892 let src = data.as_ref();
893 let start = *offset_in_block;
894 if start < src.len() && *to_copy <= src.len() - start {
895 debug_assert!(
896 *buf_offset + *to_copy <= actual_len,
897 "Buffer overflow: attempting to write {} bytes at offset {} into buffer of length {}",
898 to_copy,
899 buf_offset,
900 actual_len
901 );
902 // SAFETY: `src[start..start+to_copy]` is in-bounds (checked above).
903 // `target[buf_offset..]` has sufficient room because `buf_offset + to_copy`
904 // never exceeds `actual_len` (tracked during work-item collection).
905 // MaybeUninit<u8> has the same layout as u8.
906 unsafe {
907 ptr::copy_nonoverlapping(
908 src[start..].as_ptr(),
909 target[*buf_offset..].as_mut_ptr() as *mut u8,
910 *to_copy,
911 );
912 }
913 }
914 }
915 Ok(())
916 }
917
918 /// Zero-fills a slice of uninitialized memory.
919 ///
920 /// This helper centralizes all unsafe zero-filling operations to improve
921 /// safety auditing and reduce code duplication.
922 ///
923 /// # Safety
924 ///
925 /// This function writes zeros to the provided buffer, making it fully initialized.
926 /// The caller must ensure the buffer is valid for writes.
927 #[inline]
928 fn zero_fill_uninit(buffer: &mut [MaybeUninit<u8>]) {
929 if !buffer.is_empty() {
930 // SAFETY: buffer is a valid &mut [MaybeUninit<u8>] slice, so writing
931 // buffer.len() zero bytes through its pointer is in-bounds.
932 unsafe { ptr::write_bytes(buffer.as_mut_ptr(), 0, buffer.len()) };
933 }
934 }
935
    /// Writes into uninitialized memory. Unused suffix is zero-filled. Uses parallel decompression when spanning multiple blocks.
    ///
    /// Reads at or past the logical end of the stream are zero-extended rather
    /// than treated as errors (see `read_at_uninit_inner`). On success, every
    /// byte of `buffer` has been written (data or zeros) and is initialized.
    ///
    /// **On error:** The buffer contents are undefined (possibly partially written).
    pub fn read_at_into_uninit(
        self: &Arc<Self>,
        stream: SnapshotStream,
        offset: u64,
        buffer: &mut [MaybeUninit<u8>],
    ) -> Result<()> {
        // Public entry point: `is_prefetch = false` allows this read to kick
        // off a background prefetch of the following blocks.
        self.read_at_uninit_inner(stream, offset, buffer, false)
    }
947
    /// Inner implementation of [`read_at_into_uninit`](Self::read_at_into_uninit).
    ///
    /// The `is_prefetch` flag prevents recursive prefetch thread spawning:
    /// when `true`, the prefetch block is skipped to avoid unbounded thread creation.
    ///
    /// Read semantics:
    /// - A zero-length buffer is a no-op.
    /// - An offset at or past the stream size yields all zeros, not an error.
    /// - A read straddling the stream end is satisfied up to the end; the
    ///   suffix of `buffer` is zero-filled.
    /// - Ranges not covered by this snapshot's blocks are delegated to the
    ///   first parent that has a block there, else zero-filled.
    fn read_at_uninit_inner(
        self: &Arc<Self>,
        stream: SnapshotStream,
        offset: u64,
        buffer: &mut [MaybeUninit<u8>],
        is_prefetch: bool,
    ) -> Result<()> {
        // Early validation
        let len = buffer.len();
        if len == 0 {
            return Ok(());
        }

        // Entire read beyond the logical end: zero-extend and succeed.
        let stream_size = self.size(stream);
        if offset >= stream_size {
            Self::zero_fill_uninit(buffer);
            return Ok(());
        }

        // Calculate actual read length and zero-fill suffix if needed
        let actual_len = std::cmp::min(len as u64, stream_size - offset) as usize;
        if actual_len < len {
            Self::zero_fill_uninit(&mut buffer[actual_len..]);
        }

        // From here on, only the in-range prefix is written.
        let target = &mut buffer[0..actual_len];

        // Get page list for stream
        let pages = match stream {
            SnapshotStream::Primary => &self.master.primary_pages,
            SnapshotStream::Secondary => &self.master.secondary_pages,
        };

        // Delegate to parent if no index pages. The first parent that reports
        // a block at `offset` handles the whole (clamped) read; a parent read
        // past its own end zero-extends by the same rule as above.
        if pages.is_empty() {
            for parent in &self.parents {
                if parent.get_block_info(stream, offset)?.is_some() {
                    return parent.read_at_into_uninit(stream, offset, target);
                }
            }
            Self::zero_fill_uninit(target);
            return Ok(());
        }

        // Find starting page index: exact match, or the page preceding the
        // insertion point (whose range may contain `offset`).
        let page_idx = match pages.binary_search_by(|p| p.start_logical.cmp(&offset)) {
            Ok(idx) => idx,
            Err(idx) => idx.saturating_sub(1),
        };

        // Collect work items (handles parent blocks, zero blocks, and queues regular blocks)
        let (work_items, buf_offset) =
            self.collect_work_items(stream, pages, page_idx, target, offset, actual_len)?;

        // Choose parallel or serial decompression based on work item count
        if work_items.len() >= Self::PARALLEL_MIN_BLOCKS {
            self.execute_parallel_decompression(stream, &work_items, target, actual_len)?;
        } else {
            self.execute_serial_decompression(stream, &work_items, target, actual_len)?;
        }

        // Handle any remaining unprocessed data: `buf_offset` is how far the
        // work items covered; the tail falls back to parents or zeros.
        let remaining = actual_len - buf_offset;
        if remaining > 0 {
            let current_pos = offset + buf_offset as u64;
            let mut found = false;
            for parent in &self.parents {
                if parent.get_block_info(stream, current_pos)?.is_some() {
                    parent.read_at_into_uninit(stream, current_pos, &mut target[buf_offset..])?;
                    found = true;
                    break;
                }
            }
            if !found {
                Self::zero_fill_uninit(&mut target[buf_offset..]);
            }
        }

        // Trigger prefetch for next sequential blocks if enabled.
        // Guards:
        // 1. `is_prefetch` prevents recursive spawning (prefetch thread spawning another)
        // 2. `try_start()` limits to one in-flight prefetch at a time, preventing
        //    unbounded thread creation under rapid sequential reads
        // NOTE(review): if `warm_blocks` panics, `clear_in_flight` is never
        // reached and prefetching stays disabled — confirm rayon's panic
        // policy makes this acceptable.
        if let Some(prefetcher) = &self.prefetcher {
            if !is_prefetch && !work_items.is_empty() && prefetcher.try_start() {
                let next_offset = offset + actual_len as u64;
                let prefetch_len = (self.header.block_size * 4) as usize;
                let snap = Arc::clone(self);
                let stream_copy = stream;
                rayon::spawn(move || {
                    let _ = snap.warm_blocks(stream_copy, next_offset, prefetch_len);
                    // Release the in-flight guard so the next read can prefetch
                    if let Some(pf) = &snap.prefetcher {
                        pf.clear_in_flight();
                    }
                });
            }
        }

        Ok(())
    }
1053
1054 /// Warms the block cache for the given byte range without allocating a target buffer.
1055 ///
1056 /// Unlike [`read_at_into_uninit`](Self::read_at_into_uninit), this method only fetches,
1057 /// decompresses, and inserts blocks into the L1 cache. It skips blocks that are already
1058 /// cached, zero-length, or parent-delegated. No output buffer is allocated or written to.
1059 ///
1060 /// Used by the prefetcher to reduce overhead: the old path allocated a throwaway buffer
1061 /// of `block_size * 4` bytes and copied decompressed data into it, only to discard it.
1062 fn warm_blocks(&self, stream: SnapshotStream, offset: u64, len: usize) -> Result<()> {
1063 if len == 0 {
1064 return Ok(());
1065 }
1066 let stream_size = self.size(stream);
1067 if offset >= stream_size {
1068 return Ok(());
1069 }
1070 let actual_len = std::cmp::min(len as u64, stream_size - offset) as usize;
1071
1072 let pages = match stream {
1073 SnapshotStream::Primary => &self.master.primary_pages,
1074 SnapshotStream::Secondary => &self.master.secondary_pages,
1075 };
1076 if pages.is_empty() {
1077 return Ok(());
1078 }
1079
1080 let page_idx = match pages.binary_search_by(|p| p.start_logical.cmp(&offset)) {
1081 Ok(idx) => idx,
1082 Err(idx) => idx.saturating_sub(1),
1083 };
1084
1085 let mut current_pos = offset;
1086 let mut remaining = actual_len;
1087
1088 for page_entry in pages.iter().skip(page_idx) {
1089 if remaining == 0 {
1090 break;
1091 }
1092 if page_entry.start_logical > current_pos + remaining as u64 {
1093 break;
1094 }
1095
1096 let page = self.get_page(page_entry)?;
1097 let mut block_logical_start = page_entry.start_logical;
1098
1099 for (i, block) in page.blocks.iter().enumerate() {
1100 if remaining == 0 {
1101 break;
1102 }
1103 let block_end = block_logical_start + block.logical_len as u64;
1104
1105 if block_end > current_pos {
1106 let offset_in_block = (current_pos - block_logical_start) as usize;
1107 let to_advance = std::cmp::min(
1108 remaining,
1109 (block.logical_len as usize).saturating_sub(offset_in_block),
1110 );
1111
1112 // Only warm regular blocks (skip parent-delegated and zero blocks).
1113 // fetch_raw_block handles the cache check internally — on a hit it
1114 // returns Decompressed which we simply ignore via the Compressed match.
1115 if block.offset != BLOCK_OFFSET_PARENT && block.length > 0 {
1116 let global_idx = page_entry.start_block + i as u64;
1117 if let Ok(FetchResult::Compressed(raw)) =
1118 self.fetch_raw_block(stream, global_idx, block)
1119 {
1120 if let Ok(data) = self.decompress_and_verify(raw, global_idx, block) {
1121 self.cache_l1.insert(stream, global_idx, data);
1122 }
1123 }
1124 }
1125
1126 current_pos += to_advance as u64;
1127 remaining -= to_advance;
1128 }
1129 block_logical_start += block.logical_len as u64;
1130 }
1131 }
1132
1133 Ok(())
1134 }
1135
    /// Like [`read_at_into_uninit`](Self::read_at_into_uninit) but accepts `&mut [u8]`. Use from FFI (e.g. Python).
    ///
    /// On success `buf` holds stream data for the in-range prefix and zeros
    /// for any suffix past the stream end, exactly like the `MaybeUninit`
    /// variant. The read path only ever writes initialized bytes (zero fills
    /// and copies from decompressed data), so `buf` remains valid `u8`s.
    #[inline]
    pub fn read_at_into_uninit_bytes(
        self: &Arc<Self>,
        stream: SnapshotStream,
        offset: u64,
        buf: &mut [u8],
    ) -> Result<()> {
        // Cheap early-out; the inner path also handles len == 0.
        if buf.is_empty() {
            return Ok(());
        }
        // SAFETY: &mut [u8] and &mut [MaybeUninit<u8>] have identical layout (both
        // are slices of single-byte types). Initialized u8 values are valid MaybeUninit<u8>.
        // The borrow is derived from `buf` so no aliasing occurs.
        let uninit = unsafe { &mut *(buf as *mut [u8] as *mut [MaybeUninit<u8>]) };
        self.read_at_into_uninit(stream, offset, uninit)
    }
1153
1154 /// Fetches an index page from cache or storage.
1155 ///
1156 /// Index pages map logical offsets to physical block locations. This method
1157 /// maintains an LRU cache to avoid repeated deserialization.
1158 ///
1159 /// # Parameters
1160 ///
1161 /// - `entry`: Page metadata from master index
1162 ///
1163 /// # Returns
1164 ///
1165 /// A shared reference to the deserialized index page.
1166 ///
1167 /// # Thread Safety
1168 ///
1169 /// This method acquires a lock on the page cache only for cache lookup and insertion.
1170 /// I/O and deserialization are performed without holding the lock to avoid blocking
1171 /// other threads during cache misses.
1172 pub(crate) fn get_page(&self, entry: &PageEntry) -> Result<Arc<IndexPage>> {
1173 // Fast path: check sharded cache
1174 if let Some(p) = self.page_cache.get(entry.offset) {
1175 return Ok(p);
1176 }
1177
1178 // Slow path: I/O and deserialization without holding any lock
1179 let bytes = self
1180 .backend
1181 .read_exact(entry.offset, entry.length as usize)?;
1182 let page: IndexPage = bincode::deserialize(&bytes)?;
1183 let arc = Arc::new(page);
1184
1185 // Check again in case another thread inserted while we were doing I/O
1186 if let Some(p) = self.page_cache.get(entry.offset) {
1187 return Ok(p);
1188 }
1189 self.page_cache.insert(entry.offset, arc.clone());
1190
1191 Ok(arc)
1192 }
1193
1194 /// Fetches raw compressed block data from cache or storage.
1195 ///
1196 /// This is the I/O portion of block resolution, separated to enable parallel I/O.
1197 /// It:
1198 /// 1. Checks the block cache
1199 /// 2. Handles zero-length blocks
1200 /// 3. Reads raw compressed data from backend
1201 ///
1202 /// # Parameters
1203 ///
1204 /// - `stream`: Stream identifier (for cache key)
1205 /// - `block_idx`: Global block index
1206 /// - `info`: Block metadata (offset, length)
1207 ///
1208 /// # Returns
1209 ///
1210 /// Raw block data (potentially compressed/encrypted) or cached decompressed data.
1211 fn fetch_raw_block(
1212 &self,
1213 stream: SnapshotStream,
1214 block_idx: u64,
1215 info: &BlockInfo,
1216 ) -> Result<FetchResult> {
1217 // Check cache first - return decompressed data if available
1218 if let Some(data) = self.cache_l1.get(stream, block_idx) {
1219 return Ok(FetchResult::Decompressed(data));
1220 }
1221
1222 // Handle zero blocks
1223 if info.length == 0 {
1224 let len = info.logical_len as usize;
1225 if len == 0 {
1226 return Ok(FetchResult::Decompressed(Bytes::new()));
1227 }
1228 if len == ZEROS_64K.len() {
1229 return Ok(FetchResult::Decompressed(Bytes::from_static(&ZEROS_64K)));
1230 }
1231 return Ok(FetchResult::Decompressed(Bytes::from(vec![0u8; len])));
1232 }
1233
1234 // Fetch raw compressed data (THIS IS THE PARALLEL PART)
1235 self.backend
1236 .read_exact(info.offset, info.length as usize)
1237 .map(FetchResult::Compressed)
1238 }
1239
    /// Decompresses and verifies a raw block.
    ///
    /// This is the CPU portion of block resolution, separated to enable parallel decompression.
    /// It:
    /// 1. Verifies CRC32 checksum
    /// 2. Decrypts (if encrypted)
    /// 3. Decompresses
    ///
    /// # Parameters
    ///
    /// - `raw`: Raw block data (potentially compressed/encrypted)
    /// - `block_idx`: Global block index (for error reporting and decryption)
    /// - `info`: Block metadata (checksum, logical length)
    ///
    /// # Returns
    ///
    /// Decompressed block data as `Bytes`.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Corruption`] when the stored CRC32 does not match;
    /// decryption and decompression failures propagate from their
    /// implementations.
    ///
    /// # Performance
    ///
    /// Decompression throughput:
    /// - LZ4: ~2 GB/s per core
    /// - Zstd: ~500 MB/s per core
    fn decompress_and_verify(&self, raw: Bytes, block_idx: u64, info: &BlockInfo) -> Result<Bytes> {
        // Verify stored checksum (CRC32 of compressed/encrypted data) before decrypt/decompress.
        // A checksum of 0 skips verification — presumably "no checksum recorded";
        // TODO(review): confirm the writer never emits a real CRC32 of 0.
        if info.checksum != 0 {
            let computed = crc32_hash(&raw);
            if computed != info.checksum {
                return Err(Error::Corruption(block_idx));
            }
        }

        // Checkout a buffer from the pool instead of allocating a new Vec.
        // The pool maintains pre-allocated buffers of common sizes (typically
        // block_size = 64KB), eliminating allocator overhead for the hot
        // decompression path. The buffer is consumed by Bytes::from() below,
        // so it is not returned to the pool — the pool acts as a fast allocator
        // for the common case where recently-freed buffers are available.
        let out_len = info.logical_len as usize;
        let mut out = self.decompress_pool.checkout(out_len);

        // SAFETY: See decompress_into_slice() for the full safety argument.
        // In short: decompress_into() writes exactly out_len bytes or returns Err.
        // On the Err path `out` is dropped holding uninitialized bytes, which is
        // never read (u8 has no Drop glue).
        #[allow(clippy::uninit_vec)]
        unsafe {
            out.set_len(out_len);
        }

        if let Some(enc) = &self.encryptor {
            // Encrypted snapshot: decrypt first, then decompress the plaintext.
            // `block_idx` is passed through to the decryptor.
            let compressed = enc.decrypt(&raw, block_idx)?;
            self.compressor.decompress_into(&compressed, &mut out)?;
        } else {
            self.compressor.decompress_into(raw.as_ref(), &mut out)?;
        }

        // Transfers ownership of the buffer into an immutable, cheaply-clonable Bytes.
        Ok(Bytes::from(out))
    }
1297}