hexz_core/api/file.rs

//! High-level snapshot file API and logical stream types.

use crate::algo::compression::{Compressor, create_compressor};
use crate::algo::encryption::Encryptor;
use crate::cache::lru::{BlockCache, ShardedPageCache};
use crate::cache::prefetch::Prefetcher;
use crate::format::header::Header;
use crate::format::index::{BlockInfo, IndexPage, MasterIndex, PageEntry};
use crate::format::magic::{HEADER_SIZE, MAGIC_BYTES};
use crate::format::version::{VersionCompatibility, check_version, compatibility_message};
use crate::store::StorageBackend;
use crate::store::local::file::FileBackend;
use bytes::Bytes;
use crc32fast::hash as crc32_hash;
use std::mem::MaybeUninit;
use std::path::Path;
use std::ptr;
use std::sync::{Arc, Mutex};

use hexz_common::constants::{BLOCK_OFFSET_PARENT, DEFAULT_BLOCK_SIZE};
use hexz_common::{Error, Result};
use rayon::prelude::*;

/// Shared zero block for the default block size to avoid allocating when returning zero blocks.
static ZEROS_64K: [u8; DEFAULT_BLOCK_SIZE as usize] = [0u8; DEFAULT_BLOCK_SIZE as usize];

/// Work item for block decompression: (block_idx, info, buf_offset, offset_in_block, to_copy)
type WorkItem = (u64, BlockInfo, usize, usize, usize);

/// Logical stream identifier for dual-stream snapshots.
///
/// Hexz snapshots can store two independent data streams:
/// - **Disk**: Persistent storage (disk image, filesystem data)
/// - **Memory**: Volatile state (RAM contents, process memory)
///
/// # Example
///
/// ```no_run
/// use hexz_core::{File, SnapshotStream};
/// # use std::sync::Arc;
/// # fn example(snapshot: Arc<File>) -> Result<(), Box<dyn std::error::Error>> {
/// // Read 4KB from disk stream
/// let disk_data = snapshot.read_at(SnapshotStream::Disk, 0, 4096)?;
///
/// // Read 4KB from memory stream (if present)
/// let mem_data = snapshot.read_at(SnapshotStream::Memory, 0, 4096)?;
/// # Ok(())
/// # }
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum SnapshotStream {
    /// Persistent disk/storage stream
    Disk = 0,
    /// Volatile memory stream
    Memory = 1,
}

/// Read-only interface for accessing Hexz snapshot data.
///
/// `File` is the primary API for reading compressed, block-indexed snapshots.
/// It handles:
/// - Block-level decompression with LRU caching
/// - Optional AES-256-GCM decryption
/// - Thin snapshot parent chaining
/// - Dual-stream access (disk and memory)
/// - Random access with minimal I/O
///
/// # Thread Safety
///
/// `File` is `Send + Sync` and can be safely shared across threads via `Arc`.
/// Internal caches use `Mutex` for synchronization.
///
/// # Performance
///
/// - **Cache hit latency**: ~80μs (warm cache)
/// - **Cache miss latency**: ~1ms (cold cache, local storage)
/// - **Sequential throughput**: ~2-3 GB/s (NVMe + LZ4)
/// - **Memory overhead**: ~150MB typical (configurable)
///
/// # Examples
///
/// ## Basic Usage
///
/// ```no_run
/// use hexz_core::{File, SnapshotStream};
/// use hexz_core::store::local::FileBackend;
/// use hexz_core::algo::compression::lz4::Lz4Compressor;
/// use std::sync::Arc;
///
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let backend = Arc::new(FileBackend::new("snapshot.hxz".as_ref())?);
/// let compressor = Box::new(Lz4Compressor::new());
/// let snapshot = File::new(backend, compressor, None)?;
///
/// // Read 4KB at offset 1MB
/// let data = snapshot.read_at(SnapshotStream::Disk, 1024 * 1024, 4096)?;
/// assert_eq!(data.len(), 4096);
/// # Ok(())
/// # }
/// ```
///
/// ## Thin Snapshots (with parent)
///
/// ```no_run
/// use hexz_core::File;
/// use hexz_core::store::local::FileBackend;
/// use hexz_core::algo::compression::lz4::Lz4Compressor;
/// use std::sync::Arc;
///
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// // Open base snapshot
/// let base_backend = Arc::new(FileBackend::new("base.hxz".as_ref())?);
/// let base = File::new(
///     base_backend,
///     Box::new(Lz4Compressor::new()),
///     None
/// )?;
///
/// // The thin snapshot will automatically load its parent based on
/// // the parent_path field in the header
/// let thin_backend = Arc::new(FileBackend::new("incremental.hxz".as_ref())?);
/// let thin = File::new(
///     thin_backend,
///     Box::new(Lz4Compressor::new()),
///     None
/// )?;
///
/// // Reads automatically fall back to base for unchanged blocks
/// let data = thin.read_at(hexz_core::SnapshotStream::Disk, 0, 4096)?;
/// # Ok(())
/// # }
/// ```
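///
/// ## Concurrent Access
///
/// A minimal sketch of sharing one `File` across threads via `Arc` (thread count and
/// offsets are illustrative):
///
/// ```no_run
/// use hexz_core::{File, SnapshotStream};
/// use std::sync::Arc;
///
/// # fn example(snapshot: Arc<File>) {
/// let mut handles = Vec::new();
/// for i in 0..4u64 {
///     let snap = Arc::clone(&snapshot);
///     handles.push(std::thread::spawn(move || {
///         // Internal caches are Mutex-protected, so concurrent reads are safe.
///         snap.read_at(SnapshotStream::Disk, i * 4096, 4096).expect("read failed")
///     }));
/// }
/// for handle in handles {
///     let _block = handle.join().expect("reader thread panicked");
/// }
/// # }
/// ```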
pub struct File {
    /// Snapshot metadata (sizes, compression, encryption settings)
    pub header: Header,

    /// Master index containing top-level page entries
    master: MasterIndex,

    /// Storage backend for reading raw snapshot data
    backend: Arc<dyn StorageBackend>,

    /// Compression algorithm (LZ4 or Zstandard)
    compressor: Box<dyn Compressor>,

    /// Optional encryption (AES-256-GCM)
    encryptor: Option<Box<dyn Encryptor>>,

    /// Optional parent snapshot for thin (incremental) snapshots.
    /// When a block's offset is BLOCK_OFFSET_PARENT, data is fetched from parent.
    parent: Option<Arc<File>>,

    /// LRU cache for decompressed blocks (per-stream, per-block-index)
    cache_l1: BlockCache,

    /// Sharded LRU cache for deserialized index pages
    page_cache: ShardedPageCache,

    /// Optional prefetcher for background data loading
    prefetcher: Option<Prefetcher>,
}

impl File {
    /// Opens a Hexz snapshot with default cache settings.
    ///
    /// This is the primary constructor for `File`. It:
    /// 1. Reads and validates the snapshot header (magic bytes, version)
    /// 2. Deserializes the master index
    /// 3. Recursively loads parent snapshots (for thin snapshots)
    /// 4. Initializes block and page caches
    ///
    /// # Parameters
    ///
    /// - `backend`: Storage backend (local file, HTTP, S3, etc.)
    /// - `compressor`: Compression algorithm matching the snapshot format
    /// - `encryptor`: Optional decryption handler (pass `None` for unencrypted snapshots)
    ///
    /// # Returns
    ///
    /// - `Ok(Arc<File>)` on success
    /// - `Err(Error::Format)` if magic bytes or version are invalid
    /// - `Err(Error::Io)` if storage backend fails
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use hexz_core::{File, SnapshotStream};
    /// use hexz_core::store::local::FileBackend;
    /// use hexz_core::algo::compression::lz4::Lz4Compressor;
    /// use std::sync::Arc;
    ///
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let backend = Arc::new(FileBackend::new("snapshot.hxz".as_ref())?);
    /// let compressor = Box::new(Lz4Compressor::new());
    /// let snapshot = File::new(backend, compressor, None)?;
    ///
    /// println!("Disk size: {} bytes", snapshot.size(SnapshotStream::Disk));
    /// # Ok(())
    /// # }
    /// ```
    pub fn new(
        backend: Arc<dyn StorageBackend>,
        compressor: Box<dyn Compressor>,
        encryptor: Option<Box<dyn Encryptor>>,
    ) -> Result<Arc<Self>> {
        Self::with_cache(backend, compressor, encryptor, None, None)
    }

    /// Opens a snapshot, auto-detecting the compression algorithm and dictionary from the header.
    ///
    /// This eliminates the three-step boilerplate of reading the header, loading the
    /// dictionary, and creating a matching compressor. Equivalent to
    /// `File::new(backend, auto_compressor, encryptor)`.
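    ///
    /// # Example
    ///
    /// A minimal sketch (the snapshot path is illustrative):
    ///
    /// ```no_run
    /// use hexz_core::{File, SnapshotStream};
    /// use hexz_core::store::local::FileBackend;
    /// use std::sync::Arc;
    ///
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let backend = Arc::new(FileBackend::new("snapshot.hxz".as_ref())?);
    /// // Compression algorithm and dictionary are detected from the header.
    /// let snapshot = File::open(backend, None)?;
    /// println!("Disk size: {} bytes", snapshot.size(SnapshotStream::Disk));
    /// # Ok(())
    /// # }
    /// ```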
    pub fn open(
        backend: Arc<dyn StorageBackend>,
        encryptor: Option<Box<dyn Encryptor>>,
    ) -> Result<Arc<Self>> {
        Self::open_with_cache(backend, encryptor, None, None)
    }

    /// Like [`open`](Self::open) but with custom cache and prefetch settings.
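    ///
    /// # Example
    ///
    /// A minimal sketch; the 256 MB capacity and 4-block prefetch window are
    /// illustrative values, not recommendations:
    ///
    /// ```no_run
    /// use hexz_core::File;
    /// use hexz_core::store::local::FileBackend;
    /// use std::sync::Arc;
    ///
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let backend = Arc::new(FileBackend::new("snapshot.hxz".as_ref())?);
    /// let _snapshot = File::open_with_cache(
    ///     backend,
    ///     None,                    // no decryption
    ///     Some(256 * 1024 * 1024), // 256 MB of decompressed-block cache
    ///     Some(4),                 // prefetch 4 blocks ahead of each read
    /// )?;
    /// # Ok(())
    /// # }
    /// ```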
    pub fn open_with_cache(
        backend: Arc<dyn StorageBackend>,
        encryptor: Option<Box<dyn Encryptor>>,
        cache_capacity_bytes: Option<usize>,
        prefetch_window_size: Option<u32>,
    ) -> Result<Arc<Self>> {
        let header = Header::read_from_backend(backend.as_ref())?;
        let dictionary = header.load_dictionary(backend.as_ref())?;
        let compressor = create_compressor(header.compression, None, dictionary);
        Self::with_cache(
            backend,
            compressor,
            encryptor,
            cache_capacity_bytes,
            prefetch_window_size,
        )
    }

    /// Opens a Hexz snapshot with custom cache capacity and prefetching.
    ///
    /// Identical to [`new`](Self::new) but allows specifying cache size and prefetch window.
    ///
    /// # Parameters
    ///
    /// - `backend`: Storage backend
    /// - `compressor`: Compression algorithm
    /// - `encryptor`: Optional decryption handler
    /// - `cache_capacity_bytes`: Block cache size in bytes (default: 1000 blocks)
    /// - `prefetch_window_size`: Number of blocks to prefetch ahead (default: disabled)
    ///
    /// # Cache Sizing
    ///
    /// The cache stores decompressed blocks. Given a block size of 4KB:
    /// - `Some(100_000_000)` → ~24,000 blocks (~96MB effective)
    /// - `None` → 1000 blocks (~4MB effective)
    ///
    /// Larger caches reduce repeated decompression but increase memory usage.
    ///
    /// # Prefetching
    ///
    /// When `prefetch_window_size` is set, the system will automatically fetch the next N blocks
    /// in the background after each read, optimizing sequential access patterns:
    /// - `Some(4)` → Prefetch 4 blocks ahead
    /// - `None` or `Some(0)` → Disable prefetching
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use hexz_core::File;
    /// use hexz_core::store::local::FileBackend;
    /// use hexz_core::algo::compression::lz4::Lz4Compressor;
    /// use std::sync::Arc;
    ///
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let backend = Arc::new(FileBackend::new("snapshot.hxz".as_ref())?);
    /// let compressor = Box::new(Lz4Compressor::new());
    ///
    /// // Allocate 256MB for cache, prefetch 4 blocks ahead
    /// let snapshot = File::with_cache(
    ///     backend,
    ///     compressor,
    ///     None,
    ///     Some(256 * 1024 * 1024),
    ///     Some(4)
    /// )?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn with_cache(
        backend: Arc<dyn StorageBackend>,
        compressor: Box<dyn Compressor>,
        encryptor: Option<Box<dyn Encryptor>>,
        cache_capacity_bytes: Option<usize>,
        prefetch_window_size: Option<u32>,
    ) -> Result<Arc<Self>> {
        let header_bytes = backend.read_exact(0, HEADER_SIZE)?;
        let header: Header = bincode::deserialize(&header_bytes)?;

        if &header.magic != MAGIC_BYTES {
            return Err(Error::Format("Invalid magic bytes".into()));
        }

        // Check version compatibility
        let compatibility = check_version(header.version);
        match compatibility {
            VersionCompatibility::Full => {
                // Perfect match, proceed silently
            }
            VersionCompatibility::Degraded => {
                // Newer version, issue warning but allow
                tracing::warn!("{}", compatibility_message(header.version));
            }
            VersionCompatibility::Incompatible => {
                // Too old or too new, reject
                return Err(Error::Format(compatibility_message(header.version)));
            }
        }

        let index_bytes = backend.read_exact(
            header.index_offset,
            (backend.len() - header.index_offset) as usize,
        )?;

        let master: MasterIndex = bincode::deserialize(&index_bytes)?;

        // Recursively load parent if present
        let parent = if let Some(parent_path) = &header.parent_path {
            tracing::info!("Loading parent snapshot: {}", parent_path);
            let p_backend = Arc::new(FileBackend::new(Path::new(parent_path))?);
            Some(File::open(p_backend, None)?)
        } else {
            None
        };

        let block_size = header.block_size as usize;
        let l1_capacity = if let Some(bytes) = cache_capacity_bytes {
            (bytes / block_size).max(1)
        } else {
            1000
        };

        // Initialize prefetcher if window size is specified and > 0
        let prefetcher = prefetch_window_size.filter(|&w| w > 0).map(Prefetcher::new);

        Ok(Arc::new(Self {
            header,
            master,
            backend,
            compressor,
            encryptor,
            parent,
            cache_l1: BlockCache::with_capacity(l1_capacity),
            page_cache: ShardedPageCache::default(),
            prefetcher,
        }))
    }

    /// Returns the total number of prefetch operations spawned since this file was opened.
    /// Returns 0 if prefetching is disabled.
    pub fn prefetch_spawn_count(&self) -> u64 {
        self.prefetcher.as_ref().map_or(0, |p| p.spawn_count())
    }

    /// Returns the logical size of a stream in bytes.
    ///
    /// # Parameters
    ///
    /// - `stream`: The stream to query (Disk or Memory)
    ///
    /// # Returns
    ///
    /// The uncompressed, logical size of the stream. This is the size you would
    /// get if you decompressed all blocks and concatenated them.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use hexz_core::{File, SnapshotStream};
    /// # use std::sync::Arc;
    /// # fn example(snapshot: Arc<File>) {
    /// let disk_bytes = snapshot.size(SnapshotStream::Disk);
    /// let mem_bytes = snapshot.size(SnapshotStream::Memory);
    ///
    /// println!("Disk: {} GB", disk_bytes / (1024 * 1024 * 1024));
    /// println!("Memory: {} MB", mem_bytes / (1024 * 1024));
    /// # }
    /// ```
    pub fn size(&self, stream: SnapshotStream) -> u64 {
        match stream {
            SnapshotStream::Disk => self.master.disk_size,
            SnapshotStream::Memory => self.master.memory_size,
        }
    }

    /// Reads data from a snapshot stream at a given offset.
    ///
    /// This is the primary read method for random access. It:
    /// 1. Identifies which blocks overlap the requested range
    /// 2. Fetches blocks from cache or decompresses from storage
    /// 3. Handles thin snapshot fallback to parent
    /// 4. Assembles the final buffer from block slices
    ///
    /// Ranges that span multiple blocks are decompressed in parallel.
    ///
    /// # Parameters
    ///
    /// - `stream`: Which stream to read from (Disk or Memory)
    /// - `offset`: Starting byte offset (0-indexed)
    /// - `len`: Number of bytes to read
    ///
    /// # Returns
    ///
    /// A `Vec<u8>` containing up to `len` bytes. The returned vector may be shorter
    /// if:
    /// - `offset` is beyond the stream size (returns empty vector)
    /// - `offset + len` exceeds stream size (returns partial data)
    ///
    /// Missing data (sparse regions) is zero-filled.
    ///
    /// # Errors
    ///
    /// - `Error::Io` if backend read fails (e.g. truncated file)
    /// - `Error::Corruption(block_idx)` if block checksum does not match
    /// - `Error::Decompression` if block decompression fails
    /// - `Error::Decryption` if block decryption fails
    ///
    /// # Performance
    ///
    /// - **Cache hit**: ~80μs latency, no I/O
    /// - **Cache miss**: ~1ms latency (local storage), includes decompression
    /// - **Remote storage**: Latency depends on network (HTTP: ~50ms, S3: ~100ms)
    ///
    /// Aligned reads (offset % block_size == 0) are most efficient.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use hexz_core::{File, SnapshotStream};
    /// # use std::sync::Arc;
    /// # fn example(snapshot: Arc<File>) -> Result<(), Box<dyn std::error::Error>> {
    /// // Read first 512 bytes of disk stream
    /// let boot_sector = snapshot.read_at(SnapshotStream::Disk, 0, 512)?;
    ///
    /// // Read from arbitrary offset
    /// let chunk = snapshot.read_at(SnapshotStream::Disk, 1024 * 1024, 4096)?;
    ///
    /// // Reading beyond stream size returns empty vector
    /// let empty = snapshot.read_at(SnapshotStream::Disk, u64::MAX, 100)?;
    /// assert!(empty.is_empty());
    /// # Ok(())
    /// # }
    /// ```
    pub fn read_at(
        self: &Arc<Self>,
        stream: SnapshotStream,
        offset: u64,
        len: usize,
    ) -> Result<Vec<u8>> {
        let stream_size = self.size(stream);
        if offset >= stream_size {
            return Ok(Vec::new());
        }
        let actual_len = std::cmp::min(len as u64, stream_size - offset) as usize;
        if actual_len == 0 {
            return Ok(Vec::new());
        }

        let pages = match stream {
            SnapshotStream::Disk => &self.master.disk_pages,
            SnapshotStream::Memory => &self.master.memory_pages,
        };

        if pages.is_empty() {
            if let Some(parent) = &self.parent {
                return parent.read_at(stream, offset, actual_len);
            }
            return Ok(vec![0u8; actual_len]);
        }

        let mut buf: Vec<MaybeUninit<u8>> = Vec::new();
        buf.resize_with(actual_len, MaybeUninit::uninit);
        self.read_at_into_uninit(stream, offset, &mut buf)?;
        let ptr = buf.as_mut_ptr().cast::<u8>();
        let len = buf.len();
        let cap = buf.capacity();
        std::mem::forget(buf);
        // SAFETY: `buf` was a Vec<MaybeUninit<u8>> that we fully initialized via
        // `read_at_into_uninit` (which writes every byte). We `forget` the original
        // Vec to avoid a double-free and reconstruct it with the same ptr/len/cap.
        // MaybeUninit<u8> has the same layout as u8.
        Ok(unsafe { Vec::from_raw_parts(ptr, len, cap) })
    }

    /// Reads into a provided buffer. Any portion of the buffer past the end of the stream is
    /// zero-filled. Uses parallel decompression when the range spans multiple blocks.
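    ///
    /// # Example
    ///
    /// A minimal sketch reading into a caller-owned buffer (offset and size are illustrative):
    ///
    /// ```no_run
    /// use hexz_core::{File, SnapshotStream};
    /// # use std::sync::Arc;
    /// # fn example(snapshot: Arc<File>) -> Result<(), Box<dyn std::error::Error>> {
    /// let mut buf = vec![0u8; 4096];
    /// snapshot.read_at_into(SnapshotStream::Disk, 1024 * 1024, &mut buf)?;
    /// // If the read extends past the end of the stream, the tail of `buf` is zeroed.
    /// # Ok(())
    /// # }
    /// ```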
    pub fn read_at_into(
        self: &Arc<Self>,
        stream: SnapshotStream,
        offset: u64,
        buffer: &mut [u8],
    ) -> Result<()> {
        let len = buffer.len();
        if len == 0 {
            return Ok(());
        }
        let stream_size = self.size(stream);
        if offset >= stream_size {
            buffer.fill(0);
            return Ok(());
        }
        let actual_len = std::cmp::min(len as u64, stream_size - offset) as usize;
        if actual_len < len {
            buffer[actual_len..].fill(0);
        }
        self.read_at_into_uninit_bytes(stream, offset, &mut buffer[0..actual_len])
    }

    /// Minimum number of local blocks to use the parallel decompression path.
    const PARALLEL_MIN_BLOCKS: usize = 2;

    /// Collects work items for blocks that need decompression.
    ///
    /// This method iterates through index pages and blocks, handling:
    /// - Parent blocks: delegate to parent snapshot or zero-fill
    /// - Zero blocks: zero-fill directly
    /// - Regular blocks: add to work queue for later decompression
    ///
    /// Returns the work items still to be decompressed and the number of buffer bytes
    /// already accounted for (the running `buf_offset`).
    fn collect_work_items(
        &self,
        stream: SnapshotStream,
        pages: &[PageEntry],
        page_idx: usize,
        target: &mut [MaybeUninit<u8>],
        offset: u64,
        actual_len: usize,
    ) -> Result<(Vec<WorkItem>, usize)> {
        let mut local_work: Vec<WorkItem> = Vec::new();
        let mut buf_offset = 0;
        let mut current_pos = offset;
        let mut remaining = actual_len;

        for page_entry in pages.iter().skip(page_idx) {
            if remaining == 0 {
                break;
            }
            if page_entry.start_logical > current_pos + remaining as u64 {
                break;
            }

            let page = self.get_page(page_entry)?;
            let mut block_logical_start = page_entry.start_logical;

            for (block_idx_in_page, block) in page.blocks.iter().enumerate() {
                let block_end = block_logical_start + block.logical_len as u64;

                if block_end > current_pos {
                    let global_block_idx = page_entry.start_block + block_idx_in_page as u64;
                    let offset_in_block = (current_pos - block_logical_start) as usize;
                    let to_copy = std::cmp::min(
                        remaining,
                        (block.logical_len as usize).saturating_sub(offset_in_block),
                    );

                    if block.offset == BLOCK_OFFSET_PARENT {
                        // Parent block: delegate or zero-fill
                        if let Some(parent) = &self.parent {
                            let dest = &mut target[buf_offset..buf_offset + to_copy];
                            parent.read_at_into_uninit(stream, current_pos, dest)?;
                        } else {
                            Self::zero_fill_uninit(&mut target[buf_offset..buf_offset + to_copy]);
                        }
                        current_pos += to_copy as u64;
                        buf_offset += to_copy;
                        remaining -= to_copy;
                    } else if block.length == 0 {
                        // Zero block: fill with zeros
                        Self::zero_fill_uninit(&mut target[buf_offset..buf_offset + to_copy]);
                        current_pos += to_copy as u64;
                        buf_offset += to_copy;
                        remaining -= to_copy;
                    } else {
                        // Regular block: add to work queue
                        if to_copy > 0 {
                            local_work.push((
                                global_block_idx,
                                *block,
                                buf_offset,
                                offset_in_block,
                                to_copy,
                            ));
                            buf_offset += to_copy;
                            current_pos += to_copy as u64;
                            remaining -= to_copy;
                        }
                    }

                    if remaining == 0 {
                        break;
                    }
                }
                block_logical_start += block.logical_len as u64;
            }
        }

        Ok((local_work, buf_offset))
    }

    /// Executes parallel decompression for multiple blocks.
    ///
    /// Uses a two-phase approach:
    /// 1. Parallel I/O: Fetch all raw blocks concurrently
    /// 2. Parallel CPU: Decompress and copy to target buffer
    fn execute_parallel_decompression(
        self: &Arc<Self>,
        stream: SnapshotStream,
        work_items: &[WorkItem],
        target: &mut [MaybeUninit<u8>],
        actual_len: usize,
    ) -> Result<()> {
        let snap = Arc::clone(self);
        let target_addr = target.as_mut_ptr() as usize;

        // Phase 1: Parallel fetch all raw blocks
        let raw_blocks: Vec<Result<Bytes>> = work_items
            .par_iter()
            .map(|(block_idx, info, _, _, _)| snap.fetch_raw_block(stream, *block_idx, info))
            .collect();

        // Phase 2: Parallel decompress and copy
        let err: Mutex<Option<Error>> = Mutex::new(None);
        work_items
            .par_iter()
            .zip(raw_blocks)
            .for_each(|(work_item, raw_result)| {
                if err.lock().map_or(true, |e| e.is_some()) {
                    return;
                }

                let (block_idx, info, buf_offset, offset_in_block, to_copy) = work_item;

                // Handle fetch errors
                let raw = match raw_result {
                    Ok(r) => r,
                    Err(e) => {
                        if let Ok(mut guard) = err.lock() {
                            let _ = guard.replace(e);
                        }
                        return;
                    }
                };

                // If cache hit or zero block, raw is already decompressed
                let data = if info.length == 0 || snap.cache_l1.get(stream, *block_idx).is_some() {
                    raw
                } else {
                    // Decompress and verify
                    match snap.decompress_and_verify(raw, *block_idx, info) {
                        Ok(d) => {
                            // Cache the result
                            snap.cache_l1.insert(stream, *block_idx, d.clone());
                            d
                        }
                        Err(e) => {
                            if let Ok(mut guard) = err.lock() {
                                let _ = guard.replace(e);
                            }
                            return;
                        }
                    }
                };

                // Copy to target buffer
                let src = data.as_ref();
                let start = *offset_in_block;
                let len = *to_copy;
                if start < src.len() && len <= src.len() - start {
                    // Defensive assertion: ensure destination write is within bounds
                    debug_assert!(
                        buf_offset + len <= actual_len,
                        "Buffer overflow: attempting to write {} bytes at offset {} into buffer of length {}",
                        len,
                        buf_offset,
                        actual_len
                    );
                    let dest = (target_addr + buf_offset) as *mut u8;
                    // SAFETY: `src[start..start+len]` is in-bounds (checked above).
                    // `dest` points into the `target` MaybeUninit buffer at a unique
                    // non-overlapping offset (each work item has a distinct `buf_offset`),
                    // and the rayon par_iter ensures each item writes to a disjoint region.
                    // The debug_assert above validates buf_offset + len <= actual_len.
                    unsafe { ptr::copy_nonoverlapping(src[start..].as_ptr(), dest, len) };
                }
            });

        if let Some(e) = err.lock().ok().and_then(|mut guard| guard.take()) {
            return Err(e);
        }

        Ok(())
    }

    /// Executes serial decompression for a small number of blocks.
    fn execute_serial_decompression(
        &self,
        stream: SnapshotStream,
        work_items: &[WorkItem],
        target: &mut [MaybeUninit<u8>],
        actual_len: usize,
    ) -> Result<()> {
        for (block_idx, info, buf_offset, offset_in_block, to_copy) in work_items {
            let data = self.resolve_block_data(stream, *block_idx, info)?;
            let src = data.as_ref();
            let start = *offset_in_block;
            if start < src.len() && *to_copy <= src.len() - start {
                // Defensive assertion: ensure destination write is within bounds
                debug_assert!(
                    *buf_offset + *to_copy <= actual_len,
                    "Buffer overflow: attempting to write {} bytes at offset {} into buffer of length {}",
                    to_copy,
                    buf_offset,
                    actual_len
                );
                // SAFETY: `src[start..start+to_copy]` is in-bounds (checked above).
                // `target[buf_offset..]` has sufficient room because `buf_offset + to_copy`
                // never exceeds `actual_len` (tracked during work-item collection).
                // The debug_assert above validates this invariant.
                // MaybeUninit<u8> has the same layout as u8.
                unsafe {
                    ptr::copy_nonoverlapping(
                        src[start..].as_ptr(),
                        target[*buf_offset..].as_mut_ptr() as *mut u8,
                        *to_copy,
                    );
                }
            }
        }
        Ok(())
    }

    /// Zero-fills a slice of uninitialized memory.
    ///
    /// This helper centralizes all unsafe zero-filling operations to improve
    /// safety auditing and reduce code duplication.
    ///
    /// # Safety
    ///
    /// This function writes zeros to the provided buffer, making it fully initialized.
    /// The caller must ensure the buffer is valid for writes.
    #[inline]
    fn zero_fill_uninit(buffer: &mut [MaybeUninit<u8>]) {
        if !buffer.is_empty() {
            // SAFETY: buffer is a valid &mut [MaybeUninit<u8>] slice, so writing
            // buffer.len() zero bytes through its pointer is in-bounds.
            unsafe { ptr::write_bytes(buffer.as_mut_ptr(), 0, buffer.len()) };
        }
    }

    /// Fills an uninitialized buffer from the stream. Any portion of the buffer past the end of
    /// the stream is zero-filled. Uses parallel decompression when the range spans multiple blocks.
    ///
    /// **On error:** The buffer contents are undefined (possibly partially written).
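    ///
    /// # Example
    ///
    /// A minimal sketch; once the call returns `Ok`, every byte of the buffer has been
    /// written (stream data, parent data, or zero fill):
    ///
    /// ```no_run
    /// use hexz_core::{File, SnapshotStream};
    /// use std::mem::MaybeUninit;
    /// # use std::sync::Arc;
    /// # fn example(snapshot: Arc<File>) -> Result<(), Box<dyn std::error::Error>> {
    /// let mut buf = vec![MaybeUninit::<u8>::uninit(); 4096];
    /// snapshot.read_at_into_uninit(SnapshotStream::Disk, 0, &mut buf)?;
    /// # Ok(())
    /// # }
    /// ```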
    pub fn read_at_into_uninit(
        self: &Arc<Self>,
        stream: SnapshotStream,
        offset: u64,
        buffer: &mut [MaybeUninit<u8>],
    ) -> Result<()> {
        self.read_at_uninit_inner(stream, offset, buffer, false)
    }

    /// Inner implementation of [`read_at_into_uninit`](Self::read_at_into_uninit).
    ///
    /// The `is_prefetch` flag prevents recursive prefetch thread spawning:
    /// when `true`, the prefetch block is skipped to avoid unbounded thread creation.
    fn read_at_uninit_inner(
        self: &Arc<Self>,
        stream: SnapshotStream,
        offset: u64,
        buffer: &mut [MaybeUninit<u8>],
        is_prefetch: bool,
    ) -> Result<()> {
        // Early validation
        let len = buffer.len();
        if len == 0 {
            return Ok(());
        }

        let stream_size = self.size(stream);
        if offset >= stream_size {
            Self::zero_fill_uninit(buffer);
            return Ok(());
        }

        // Calculate actual read length and zero-fill suffix if needed
        let actual_len = std::cmp::min(len as u64, stream_size - offset) as usize;
        if actual_len < len {
            Self::zero_fill_uninit(&mut buffer[actual_len..]);
        }

        let target = &mut buffer[0..actual_len];

        // Get page list for stream
        let pages = match stream {
            SnapshotStream::Disk => &self.master.disk_pages,
            SnapshotStream::Memory => &self.master.memory_pages,
        };

        // Delegate to parent if no index pages
        if pages.is_empty() {
            if let Some(parent) = &self.parent {
                return parent.read_at_into_uninit(stream, offset, target);
            }
            Self::zero_fill_uninit(target);
            return Ok(());
        }

        // Find starting page index
        let page_idx = match pages.binary_search_by(|p| p.start_logical.cmp(&offset)) {
            Ok(idx) => idx,
            Err(idx) => idx.saturating_sub(1),
        };

        // Collect work items (handles parent blocks, zero blocks, and queues regular blocks)
        let (work_items, buf_offset) =
            self.collect_work_items(stream, pages, page_idx, target, offset, actual_len)?;

        // Choose parallel or serial decompression based on work item count
        if work_items.len() >= Self::PARALLEL_MIN_BLOCKS {
            self.execute_parallel_decompression(stream, &work_items, target, actual_len)?;
        } else {
            self.execute_serial_decompression(stream, &work_items, target, actual_len)?;
        }

        // Handle any remaining unprocessed data
        let remaining = actual_len - buf_offset;
        if remaining > 0 {
            if let Some(parent) = &self.parent {
                let current_pos = offset + buf_offset as u64;
                parent.read_at_into_uninit(stream, current_pos, &mut target[buf_offset..])?;
            } else {
                Self::zero_fill_uninit(&mut target[buf_offset..]);
            }
        }

        // Trigger prefetch for next sequential blocks if enabled.
        // Guards:
        // 1. `is_prefetch` prevents recursive spawning (prefetch thread spawning another)
        // 2. `try_start()` limits to one in-flight prefetch at a time, preventing
        //    unbounded thread creation under rapid sequential reads
        if let Some(prefetcher) = &self.prefetcher {
            if !is_prefetch && !work_items.is_empty() && prefetcher.try_start() {
                let next_offset = offset + actual_len as u64;
                let prefetch_len = (self.header.block_size * 4) as usize;
                let snap = Arc::clone(self);
                let stream_copy = stream;
                std::thread::spawn(move || {
                    let mut buf = vec![MaybeUninit::uninit(); prefetch_len];
                    let _ = snap.read_at_uninit_inner(stream_copy, next_offset, &mut buf, true);
                    // Release the in-flight guard so the next read can prefetch
                    if let Some(pf) = &snap.prefetcher {
                        pf.clear_in_flight();
                    }
                });
            }
        }

        Ok(())
    }

    /// Like [`read_at_into_uninit`](Self::read_at_into_uninit) but accepts `&mut [u8]`. Use from FFI (e.g. Python).
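    ///
    /// # Example
    ///
    /// A minimal sketch with an already-initialized byte buffer, as an FFI caller would
    /// typically hold:
    ///
    /// ```no_run
    /// use hexz_core::{File, SnapshotStream};
    /// # use std::sync::Arc;
    /// # fn example(snapshot: Arc<File>) -> Result<(), Box<dyn std::error::Error>> {
    /// let mut buf = vec![0u8; 8192];
    /// snapshot.read_at_into_uninit_bytes(SnapshotStream::Disk, 0, &mut buf)?;
    /// # Ok(())
    /// # }
    /// ```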
    #[inline]
    pub fn read_at_into_uninit_bytes(
        self: &Arc<Self>,
        stream: SnapshotStream,
        offset: u64,
        buf: &mut [u8],
    ) -> Result<()> {
        if buf.is_empty() {
            return Ok(());
        }
        // SAFETY: &mut [u8] and &mut [MaybeUninit<u8>] have identical layout (both
        // are slices of single-byte types). Initialized u8 values are valid MaybeUninit<u8>.
        // The borrow is derived from `buf` so no aliasing occurs.
        let uninit = unsafe { &mut *(buf as *mut [u8] as *mut [MaybeUninit<u8>]) };
        self.read_at_into_uninit(stream, offset, uninit)
    }

    /// Fetches an index page from cache or storage.
    ///
    /// Index pages map logical offsets to physical block locations. This method
    /// maintains an LRU cache to avoid repeated deserialization.
    ///
    /// # Parameters
    ///
    /// - `entry`: Page metadata from master index
    ///
    /// # Returns
    ///
    /// A shared reference to the deserialized index page.
    ///
    /// # Thread Safety
    ///
    /// This method acquires a lock on the page cache only for cache lookup and insertion.
    /// I/O and deserialization are performed without holding the lock to avoid blocking
    /// other threads during cache misses.
    fn get_page(&self, entry: &PageEntry) -> Result<Arc<IndexPage>> {
        // Fast path: check sharded cache
        if let Some(p) = self.page_cache.get(entry.offset) {
            return Ok(p);
        }

        // Slow path: I/O and deserialization without holding any lock
        let bytes = self
            .backend
            .read_exact(entry.offset, entry.length as usize)?;
        let page: IndexPage = bincode::deserialize(&bytes)?;
        let arc = Arc::new(page);

        // Check again in case another thread inserted while we were doing I/O
        if let Some(p) = self.page_cache.get(entry.offset) {
            return Ok(p);
        }
        self.page_cache.insert(entry.offset, arc.clone());

        Ok(arc)
    }

    /// Fetches raw compressed block data from cache or storage.
    ///
    /// This is the I/O portion of block resolution, separated to enable parallel I/O.
    /// It:
    /// 1. Checks the block cache
    /// 2. Handles zero-length blocks
    /// 3. Reads raw compressed data from backend
    ///
    /// # Parameters
    ///
    /// - `stream`: Stream identifier (for cache key)
    /// - `block_idx`: Global block index
    /// - `info`: Block metadata (offset, length)
    ///
    /// # Returns
    ///
    /// Raw block data (potentially compressed/encrypted) or cached decompressed data.
    fn fetch_raw_block(
        &self,
        stream: SnapshotStream,
        block_idx: u64,
        info: &BlockInfo,
    ) -> Result<Bytes> {
        // Check cache first - return decompressed data if available
        if let Some(data) = self.cache_l1.get(stream, block_idx) {
            return Ok(data);
        }

        // Handle zero blocks
        if info.length == 0 {
            let len = info.logical_len as usize;
            if len == 0 {
                return Ok(Bytes::new());
            }
            if len == ZEROS_64K.len() {
                return Ok(Bytes::from_static(&ZEROS_64K));
            }
            return Ok(Bytes::from(vec![0u8; len]));
        }

        // Fetch raw compressed data; this backend read is the part that runs in parallel
        // across blocks on the parallel decompression path.
        self.backend.read_exact(info.offset, info.length as usize)
    }

    /// Decompresses and verifies a raw block.
    ///
    /// This is the CPU portion of block resolution, separated to enable parallel decompression.
    /// It:
    /// 1. Verifies CRC32 checksum
    /// 2. Decrypts (if encrypted)
    /// 3. Decompresses
    ///
    /// # Parameters
    ///
    /// - `raw`: Raw block data (potentially compressed/encrypted)
    /// - `block_idx`: Global block index (for error reporting and decryption)
    /// - `info`: Block metadata (checksum)
    ///
    /// # Returns
    ///
    /// Decompressed block data as `Bytes`.
    ///
    /// # Performance
    ///
    /// Decompression throughput:
    /// - LZ4: ~2 GB/s per core
    /// - Zstd: ~500 MB/s per core
    fn decompress_and_verify(&self, raw: Bytes, block_idx: u64, info: &BlockInfo) -> Result<Bytes> {
        // Verify stored checksum (CRC32 of compressed/encrypted data) before decrypt/decompress
        if info.checksum != 0 {
            let computed = crc32_hash(&raw);
            if computed != info.checksum {
                return Err(Error::Corruption(block_idx));
            }
        }

        // Pre-allocate exact output buffer to avoid over-allocation inside decompressor.
        // We use decompress_into() instead of decompress() to eliminate the allocation
        // and potential reallocation overhead inside the compression library.
        //
        // Performance impact: Avoids zero-initialization overhead (~16% improvement for
        // high-thread-count workloads based on benchmarks).
        let out_len = info.logical_len as usize;
        let mut out = Vec::with_capacity(out_len);

        // SAFETY: This unsafe block is required to create an uninitialized buffer for
        // decompress_into() to write into. This is safe because:
        //
        // 1. Contract guarantee: Both LZ4 and Zstd decompress_into() implementations
        //    promise to either:
        //    a) Write exactly `out.len()` bytes (the full decompressed size), OR
        //    b) Return an Err() if decompression fails (buffer underrun, corruption, etc.)
        //
        // 2. Size accuracy: We set out.len() to info.logical_len, which is the exact
        //    decompressed size recorded in the block metadata during compression.
        //    The decompressor will write exactly this many bytes or fail.
        //
        // 3. Error propagation: If decompress_into() returns Err(), we propagate it
        //    immediately via the ? operator. The uninitialized buffer is dropped
        //    without ever being read.
        //
        // 4. No partial writes: The decompressor APIs do not support partial writes.
        //    They either fully succeed or fully fail. We never access a partially
        //    initialized buffer.
        //
        // 5. Memory safety: We never read from `out` before decompress_into() succeeds.
        //    The only subsequent access is Bytes::from(out), which transfers ownership
        //    of the now-fully-initialized buffer.
        //
        // This is a well-established pattern for zero-copy decompression. The clippy
        // lint is conservative and warns about ANY use of set_len() after with_capacity(),
        // but in this case we have explicit API guarantees from the decompressor.
        #[allow(clippy::uninit_vec)]
        unsafe {
            out.set_len(out_len);
        }

        if let Some(enc) = &self.encryptor {
            let compressed = enc.decrypt(&raw, block_idx)?;
            self.compressor.decompress_into(&compressed, &mut out)?;
        } else {
            self.compressor.decompress_into(raw.as_ref(), &mut out)?;
        }

        Ok(Bytes::from(out))
    }

    /// Resolves raw block data by fetching from cache or decompressing from storage.
    ///
    /// This is the core decompression path. It:
    /// 1. Checks the block cache
    /// 2. Reads compressed block from backend
    /// 3. Verifies CRC32 checksum (if stored) and returns `Corruption(block_idx)` on mismatch
    /// 4. Decrypts (if encrypted)
    /// 5. Decompresses
    /// 6. Caches the result
    ///
    /// # Parameters
    ///
    /// - `stream`: Stream identifier (for cache key)
    /// - `block_idx`: Global block index
    /// - `info`: Block metadata (offset, length, compression)
    ///
    /// # Returns
    ///
    /// Decompressed block data as `Bytes` (zero-copy on cache hit).
    ///
    /// # Performance
    ///
    /// This method is the hot path for cache misses. Decompression throughput:
    /// - LZ4: ~2 GB/s per core
    /// - Zstd: ~500 MB/s per core
    fn resolve_block_data(
        &self,
        stream: SnapshotStream,
        block_idx: u64,
        info: &BlockInfo,
    ) -> Result<Bytes> {
        // Fetch raw block (from cache or I/O)
        let raw = self.fetch_raw_block(stream, block_idx, info)?;

        // If cache hit or zero block, raw is already decompressed
        if info.length == 0 || self.cache_l1.get(stream, block_idx).is_some() {
            return Ok(raw);
        }

        // Decompress and verify
        let data = self.decompress_and_verify(raw, block_idx, info)?;

        // Cache the result
        self.cache_l1.insert(stream, block_idx, data.clone());
        Ok(data)
    }
}