hexz_core/api/file.rs
//! High-level snapshot file API and logical stream types.

use crate::algo::compression::{Compressor, create_compressor};
use crate::algo::encryption::Encryptor;
use crate::cache::lru::{BlockCache, ShardedPageCache};
use crate::cache::prefetch::Prefetcher;
use crate::format::header::Header;
use crate::format::index::{BlockInfo, IndexPage, MasterIndex, PageEntry};
use crate::format::magic::{HEADER_SIZE, MAGIC_BYTES};
use crate::format::version::{VersionCompatibility, check_version, compatibility_message};
use crate::store::StorageBackend;
use crate::store::local::file::FileBackend;
use bytes::Bytes;
use crc32fast::hash as crc32_hash;
use std::mem::MaybeUninit;
use std::path::Path;
use std::ptr;
use std::sync::{Arc, Mutex};

use hexz_common::constants::{BLOCK_OFFSET_PARENT, DEFAULT_BLOCK_SIZE};
use hexz_common::{Error, Result};
use rayon::prelude::*;

/// Shared zero block for the default block size to avoid allocating when returning zero blocks.
static ZEROS_64K: [u8; DEFAULT_BLOCK_SIZE as usize] = [0u8; DEFAULT_BLOCK_SIZE as usize];

/// Work item for block decompression: (block_idx, info, buf_offset, offset_in_block, to_copy)
type WorkItem = (u64, BlockInfo, usize, usize, usize);

/// Logical stream identifier for dual-stream snapshots.
///
/// Hexz snapshots can store two independent data streams:
/// - **Disk**: Persistent storage (disk image, filesystem data)
/// - **Memory**: Volatile state (RAM contents, process memory)
///
/// # Example
///
/// ```no_run
/// use hexz_core::{File, SnapshotStream};
/// # use std::sync::Arc;
/// # fn example(snapshot: Arc<File>) -> Result<(), Box<dyn std::error::Error>> {
/// // Read 4KB from the disk stream
/// let disk_data = snapshot.read_at(SnapshotStream::Disk, 0, 4096)?;
///
/// // Read 4KB from the memory stream (if present)
/// let mem_data = snapshot.read_at(SnapshotStream::Memory, 0, 4096)?;
/// # Ok(())
/// # }
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum SnapshotStream {
    /// Persistent disk/storage stream
    Disk = 0,
    /// Volatile memory stream
    Memory = 1,
}

/// Read-only interface for accessing Hexz snapshot data.
///
/// `File` is the primary API for reading compressed, block-indexed snapshots.
/// It handles:
/// - Block-level decompression with LRU caching
/// - Optional AES-256-GCM decryption
/// - Thin snapshot parent chaining
/// - Dual-stream access (disk and memory)
/// - Random access with minimal I/O
///
/// # Thread Safety
///
/// `File` is `Send + Sync` and can be safely shared across threads via `Arc`.
/// Internal caches use `Mutex` for synchronization.
///
/// # Performance
///
/// - **Cache hit latency**: ~80μs (warm cache)
/// - **Cache miss latency**: ~1ms (cold cache, local storage)
/// - **Sequential throughput**: ~2-3 GB/s (NVMe + LZ4)
/// - **Memory overhead**: ~150MB typical (configurable)
///
/// # Examples
///
/// ## Basic Usage
///
/// ```no_run
/// use hexz_core::{File, SnapshotStream};
/// use hexz_core::store::local::FileBackend;
/// use hexz_core::algo::compression::lz4::Lz4Compressor;
/// use std::sync::Arc;
///
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let backend = Arc::new(FileBackend::new("snapshot.hxz".as_ref())?);
/// let compressor = Box::new(Lz4Compressor::new());
/// let snapshot = File::new(backend, compressor, None)?;
///
/// // Read 4KB at offset 1MB
/// let data = snapshot.read_at(SnapshotStream::Disk, 1024 * 1024, 4096)?;
/// assert_eq!(data.len(), 4096);
/// # Ok(())
/// # }
/// ```
///
/// ## Thin Snapshots (with parent)
///
/// ```no_run
/// use hexz_core::File;
/// use hexz_core::store::local::FileBackend;
/// use hexz_core::algo::compression::lz4::Lz4Compressor;
/// use std::sync::Arc;
///
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// // Open base snapshot
/// let base_backend = Arc::new(FileBackend::new("base.hxz".as_ref())?);
/// let base = File::new(
///     base_backend,
///     Box::new(Lz4Compressor::new()),
///     None,
/// )?;
///
/// // The thin snapshot will automatically load its parent based on
/// // the parent_path field in the header
/// let thin_backend = Arc::new(FileBackend::new("incremental.hxz".as_ref())?);
/// let thin = File::new(
///     thin_backend,
///     Box::new(Lz4Compressor::new()),
///     None,
/// )?;
///
/// // Reads automatically fall back to base for unchanged blocks
/// let data = thin.read_at(hexz_core::SnapshotStream::Disk, 0, 4096)?;
/// # Ok(())
/// # }
/// ```
pub struct File {
    /// Snapshot metadata (sizes, compression, encryption settings)
    pub header: Header,

    /// Master index containing top-level page entries
    master: MasterIndex,

    /// Storage backend for reading raw snapshot data
    backend: Arc<dyn StorageBackend>,

    /// Compression algorithm (LZ4 or Zstandard)
    compressor: Box<dyn Compressor>,

    /// Optional encryption (AES-256-GCM)
    encryptor: Option<Box<dyn Encryptor>>,

    /// Optional parent snapshot for thin (incremental) snapshots.
    /// When a block's offset is BLOCK_OFFSET_PARENT, data is fetched from the parent.
    parent: Option<Arc<File>>,

    /// LRU cache for decompressed blocks (per-stream, per-block-index)
    cache_l1: BlockCache,

    /// Sharded LRU cache for deserialized index pages
    page_cache: ShardedPageCache,

    /// Optional prefetcher for background data loading
    prefetcher: Option<Prefetcher>,
}

impl File {
    /// Opens a Hexz snapshot with default cache settings.
    ///
    /// This is the primary constructor for `File`. It:
    /// 1. Reads and validates the snapshot header (magic bytes, version)
    /// 2. Deserializes the master index
    /// 3. Recursively loads parent snapshots (for thin snapshots)
    /// 4. Initializes block and page caches
    ///
    /// # Parameters
    ///
    /// - `backend`: Storage backend (local file, HTTP, S3, etc.)
    /// - `compressor`: Compression algorithm matching the snapshot format
    /// - `encryptor`: Optional decryption handler (pass `None` for unencrypted snapshots)
    ///
    /// # Returns
    ///
    /// - `Ok(File)` on success
    /// - `Err(Error::Format)` if magic bytes or version are invalid
    /// - `Err(Error::Io)` if the storage backend fails
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use hexz_core::{File, SnapshotStream};
    /// use hexz_core::store::local::FileBackend;
    /// use hexz_core::algo::compression::lz4::Lz4Compressor;
    /// use std::sync::Arc;
    ///
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let backend = Arc::new(FileBackend::new("snapshot.hxz".as_ref())?);
    /// let compressor = Box::new(Lz4Compressor::new());
    /// let snapshot = File::new(backend, compressor, None)?;
    ///
    /// println!("Disk size: {} bytes", snapshot.size(SnapshotStream::Disk));
    /// # Ok(())
    /// # }
    /// ```
    pub fn new(
        backend: Arc<dyn StorageBackend>,
        compressor: Box<dyn Compressor>,
        encryptor: Option<Box<dyn Encryptor>>,
    ) -> Result<Arc<Self>> {
        Self::with_cache(backend, compressor, encryptor, None, None)
    }

    /// Opens a snapshot, auto-detecting compression and dictionary from the header.
    ///
    /// This eliminates the three-step boilerplate of reading the header, loading the
    /// dictionary, and creating the compressor. Equivalent to
    /// `File::new(backend, auto_compressor, encryptor)`.
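    ///
    /// # Examples
    ///
    /// A minimal sketch; `snapshot.hxz` is a placeholder path, and the compressor
    /// is created from the compression settings recorded in the header:
    ///
    /// ```no_run
    /// use hexz_core::{File, SnapshotStream};
    /// use hexz_core::store::local::FileBackend;
    /// use std::sync::Arc;
    ///
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let backend = Arc::new(FileBackend::new("snapshot.hxz".as_ref())?);
    /// // No compressor argument: it is derived from the snapshot header.
    /// let snapshot = File::open(backend, None)?;
    /// let data = snapshot.read_at(SnapshotStream::Disk, 0, 4096)?;
    /// assert!(data.len() <= 4096);
    /// # Ok(())
    /// # }
    /// ```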
    pub fn open(
        backend: Arc<dyn StorageBackend>,
        encryptor: Option<Box<dyn Encryptor>>,
    ) -> Result<Arc<Self>> {
        Self::open_with_cache(backend, encryptor, None, None)
    }

    /// Like [`open`](Self::open) but with custom cache and prefetch settings.
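    ///
    /// # Examples
    ///
    /// A sketch assuming a 256MB block cache and a 4-block prefetch window suit
    /// the workload; tune both to the access pattern:
    ///
    /// ```no_run
    /// use hexz_core::File;
    /// use hexz_core::store::local::FileBackend;
    /// use std::sync::Arc;
    ///
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let backend = Arc::new(FileBackend::new("snapshot.hxz".as_ref())?);
    /// let snapshot = File::open_with_cache(
    ///     backend,
    ///     None,                    // unencrypted snapshot
    ///     Some(256 * 1024 * 1024), // block cache capacity in bytes
    ///     Some(4),                 // prefetch 4 blocks ahead
    /// )?;
    /// # Ok(())
    /// # }
    /// ```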
    pub fn open_with_cache(
        backend: Arc<dyn StorageBackend>,
        encryptor: Option<Box<dyn Encryptor>>,
        cache_capacity_bytes: Option<usize>,
        prefetch_window_size: Option<u32>,
    ) -> Result<Arc<Self>> {
        let header = Header::read_from_backend(backend.as_ref())?;
        let dictionary = header.load_dictionary(backend.as_ref())?;
        let compressor = create_compressor(header.compression, None, dictionary);
        Self::with_cache(
            backend,
            compressor,
            encryptor,
            cache_capacity_bytes,
            prefetch_window_size,
        )
    }

    /// Opens a Hexz snapshot with custom cache capacity and prefetching.
    ///
    /// Identical to [`new`](Self::new) but allows specifying cache size and prefetch window.
    ///
    /// # Parameters
    ///
    /// - `backend`: Storage backend
    /// - `compressor`: Compression algorithm
    /// - `encryptor`: Optional decryption handler
    /// - `cache_capacity_bytes`: Block cache size in bytes (default: 1000 blocks, ~4MB for 4KB blocks)
    /// - `prefetch_window_size`: Number of blocks to prefetch ahead (default: disabled)
    ///
    /// # Cache Sizing
    ///
    /// The cache stores decompressed blocks. Given a block size of 4KB:
    /// - `Some(100_000_000)` → ~24,000 blocks (~96MB effective)
    /// - `None` → 1000 blocks (~4MB effective)
    ///
    /// Larger caches reduce repeated decompression but increase memory usage.
    ///
    /// # Prefetching
    ///
    /// When `prefetch_window_size` is set, the system will automatically fetch the next N blocks
    /// in the background after each read, optimizing sequential access patterns:
    /// - `Some(4)` → Prefetch 4 blocks ahead
    /// - `None` or `Some(0)` → Disable prefetching
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use hexz_core::File;
    /// use hexz_core::store::local::FileBackend;
    /// use hexz_core::algo::compression::lz4::Lz4Compressor;
    /// use std::sync::Arc;
    ///
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let backend = Arc::new(FileBackend::new("snapshot.hxz".as_ref())?);
    /// let compressor = Box::new(Lz4Compressor::new());
    ///
    /// // Allocate 256MB for cache, prefetch 4 blocks ahead
    /// let snapshot = File::with_cache(
    ///     backend,
    ///     compressor,
    ///     None,
    ///     Some(256 * 1024 * 1024),
    ///     Some(4),
    /// )?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn with_cache(
        backend: Arc<dyn StorageBackend>,
        compressor: Box<dyn Compressor>,
        encryptor: Option<Box<dyn Encryptor>>,
        cache_capacity_bytes: Option<usize>,
        prefetch_window_size: Option<u32>,
    ) -> Result<Arc<Self>> {
        let header_bytes = backend.read_exact(0, HEADER_SIZE)?;
        let header: Header = bincode::deserialize(&header_bytes)?;

        if &header.magic != MAGIC_BYTES {
            return Err(Error::Format("Invalid magic bytes".into()));
        }

        // Check version compatibility
        let compatibility = check_version(header.version);
        match compatibility {
            VersionCompatibility::Full => {
                // Perfect match, proceed silently
            }
            VersionCompatibility::Degraded => {
                // Newer version, issue warning but allow
                tracing::warn!("{}", compatibility_message(header.version));
            }
            VersionCompatibility::Incompatible => {
                // Too old or too new, reject
                return Err(Error::Format(compatibility_message(header.version)));
            }
        }

        let index_bytes = backend.read_exact(
            header.index_offset,
            (backend.len() - header.index_offset) as usize,
        )?;

        let master: MasterIndex = bincode::deserialize(&index_bytes)?;

        // Recursively load parent if present
        let parent = if let Some(parent_path) = &header.parent_path {
            tracing::info!("Loading parent snapshot: {}", parent_path);
            let p_backend = Arc::new(FileBackend::new(Path::new(parent_path))?);
            Some(File::open(p_backend, None)?)
        } else {
            None
        };

        let block_size = header.block_size as usize;
        let l1_capacity = if let Some(bytes) = cache_capacity_bytes {
            (bytes / block_size).max(1)
        } else {
            1000
        };

        // Initialize prefetcher if window size is specified and > 0
        let prefetcher = prefetch_window_size.filter(|&w| w > 0).map(Prefetcher::new);

        Ok(Arc::new(Self {
            header,
            master,
            backend,
            compressor,
            encryptor,
            parent,
            cache_l1: BlockCache::with_capacity(l1_capacity),
            page_cache: ShardedPageCache::default(),
            prefetcher,
        }))
    }

    /// Returns the logical size of a stream in bytes.
    ///
    /// # Parameters
    ///
    /// - `stream`: The stream to query (Disk or Memory)
    ///
    /// # Returns
    ///
    /// The uncompressed, logical size of the stream. This is the size you would
    /// get if you decompressed all blocks and concatenated them.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use hexz_core::{File, SnapshotStream};
    /// # use std::sync::Arc;
    /// # fn example(snapshot: Arc<File>) {
    /// let disk_bytes = snapshot.size(SnapshotStream::Disk);
    /// let mem_bytes = snapshot.size(SnapshotStream::Memory);
    ///
    /// println!("Disk: {} GB", disk_bytes / (1024 * 1024 * 1024));
    /// println!("Memory: {} MB", mem_bytes / (1024 * 1024));
    /// # }
    /// ```
    pub fn size(&self, stream: SnapshotStream) -> u64 {
        match stream {
            SnapshotStream::Disk => self.master.disk_size,
            SnapshotStream::Memory => self.master.memory_size,
        }
    }

    /// Reads data from a snapshot stream at a given offset.
    ///
    /// This is the primary read method for random access. It:
    /// 1. Identifies which blocks overlap the requested range
    /// 2. Fetches blocks from cache or decompresses from storage
    /// 3. Handles thin snapshot fallback to the parent
    /// 4. Assembles the final buffer from block slices
    ///
    /// # Parameters
    ///
    /// - `stream`: Which stream to read from (Disk or Memory)
    /// - `offset`: Starting byte offset (0-indexed)
    /// - `len`: Number of bytes to read
    ///
    /// # Returns
    ///
    /// A `Vec<u8>` containing up to `len` bytes. The returned vector may be shorter
    /// if:
    /// - `offset` is beyond the stream size (returns an empty vector)
    /// - `offset + len` exceeds the stream size (returns partial data)
    ///
    /// Missing data (sparse regions) is zero-filled.
    ///
    /// # Errors
    ///
    /// - `Error::Io` if a backend read fails (e.g. truncated file)
    /// - `Error::Corruption(block_idx)` if a block checksum does not match
    /// - `Error::Decompression` if block decompression fails
    /// - `Error::Decryption` if block decryption fails
    ///
    /// # Performance
    ///
    /// - **Cache hit**: ~80μs latency, no I/O
    /// - **Cache miss**: ~1ms latency (local storage), includes decompression
    /// - **Remote storage**: Latency depends on network (HTTP: ~50ms, S3: ~100ms)
    ///
    /// Aligned reads (offset % block_size == 0) are most efficient. When a read
    /// spans multiple blocks, the blocks are decompressed in parallel.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use hexz_core::{File, SnapshotStream};
    /// # use std::sync::Arc;
    /// # fn example(snapshot: Arc<File>) -> Result<(), Box<dyn std::error::Error>> {
    /// // Read first 512 bytes of disk stream
    /// let boot_sector = snapshot.read_at(SnapshotStream::Disk, 0, 512)?;
    ///
    /// // Read from arbitrary offset
    /// let chunk = snapshot.read_at(SnapshotStream::Disk, 1024 * 1024, 4096)?;
    ///
    /// // Reading beyond stream size returns empty vector
    /// let empty = snapshot.read_at(SnapshotStream::Disk, u64::MAX, 100)?;
    /// assert!(empty.is_empty());
    /// # Ok(())
    /// # }
    /// ```
    pub fn read_at(
        self: &Arc<Self>,
        stream: SnapshotStream,
        offset: u64,
        len: usize,
    ) -> Result<Vec<u8>> {
        let stream_size = self.size(stream);
        if offset >= stream_size {
            return Ok(Vec::new());
        }
        let actual_len = std::cmp::min(len as u64, stream_size - offset) as usize;
        if actual_len == 0 {
            return Ok(Vec::new());
        }

        let pages = match stream {
            SnapshotStream::Disk => &self.master.disk_pages,
            SnapshotStream::Memory => &self.master.memory_pages,
        };

        if pages.is_empty() {
            if let Some(parent) = &self.parent {
                return parent.read_at(stream, offset, actual_len);
            }
            return Ok(vec![0u8; actual_len]);
        }

        let mut buf: Vec<MaybeUninit<u8>> = Vec::new();
        buf.resize_with(actual_len, MaybeUninit::uninit);
        self.read_at_into_uninit(stream, offset, &mut buf)?;
        let ptr = buf.as_mut_ptr().cast::<u8>();
        let len = buf.len();
        let cap = buf.capacity();
        std::mem::forget(buf);
        // SAFETY: `buf` was a Vec<MaybeUninit<u8>> that we fully initialized via
        // `read_at_into_uninit` (which writes every byte). We `forget` the original
        // Vec to avoid a double-free and reconstruct it with the same ptr/len/cap.
        // MaybeUninit<u8> has the same layout as u8.
        Ok(unsafe { Vec::from_raw_parts(ptr, len, cap) })
    }

    /// Reads into a provided buffer. Unused suffix is zero-filled.
    /// Uses parallel decompression when spanning multiple blocks.
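    ///
    /// # Examples
    ///
    /// A minimal sketch reading into a caller-owned buffer (the 4096-byte size is
    /// arbitrary):
    ///
    /// ```no_run
    /// use hexz_core::{File, SnapshotStream};
    /// # use std::sync::Arc;
    /// # fn example(snapshot: Arc<File>) -> Result<(), Box<dyn std::error::Error>> {
    /// let mut buf = vec![0u8; 4096];
    /// snapshot.read_at_into(SnapshotStream::Disk, 0, &mut buf)?;
    /// // Any portion past the end of the stream has been zero-filled.
    /// # Ok(())
    /// # }
    /// ```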
    pub fn read_at_into(
        self: &Arc<Self>,
        stream: SnapshotStream,
        offset: u64,
        buffer: &mut [u8],
    ) -> Result<()> {
        let len = buffer.len();
        if len == 0 {
            return Ok(());
        }
        let stream_size = self.size(stream);
        if offset >= stream_size {
            buffer.fill(0);
            return Ok(());
        }
        let actual_len = std::cmp::min(len as u64, stream_size - offset) as usize;
        if actual_len < len {
            buffer[actual_len..].fill(0);
        }
        self.read_at_into_uninit_bytes(stream, offset, &mut buffer[0..actual_len])
    }

    /// Minimum number of local blocks to use the parallel decompression path.
    const PARALLEL_MIN_BLOCKS: usize = 2;

    /// Collects work items for blocks that need decompression.
    ///
    /// This method iterates through index pages and blocks, handling:
    /// - Parent blocks: delegate to the parent snapshot or zero-fill
    /// - Zero blocks: zero-fill directly
    /// - Regular blocks: add to the work queue for later decompression
    ///
    /// Returns the queued work items together with the number of buffer bytes
    /// already accounted for (`buf_offset`).
    fn collect_work_items(
        &self,
        stream: SnapshotStream,
        pages: &[PageEntry],
        page_idx: usize,
        target: &mut [MaybeUninit<u8>],
        offset: u64,
        actual_len: usize,
    ) -> Result<(Vec<WorkItem>, usize)> {
        let mut local_work: Vec<WorkItem> = Vec::new();
        let mut buf_offset = 0;
        let mut current_pos = offset;
        let mut remaining = actual_len;

        for page_entry in pages.iter().skip(page_idx) {
            if remaining == 0 {
                break;
            }
            if page_entry.start_logical > current_pos + remaining as u64 {
                break;
            }

            let page = self.get_page(page_entry)?;
            let mut block_logical_start = page_entry.start_logical;

            for (block_idx_in_page, block) in page.blocks.iter().enumerate() {
                let block_end = block_logical_start + block.logical_len as u64;

                if block_end > current_pos {
                    let global_block_idx = page_entry.start_block + block_idx_in_page as u64;
                    let offset_in_block = (current_pos - block_logical_start) as usize;
                    let to_copy = std::cmp::min(
                        remaining,
                        (block.logical_len as usize).saturating_sub(offset_in_block),
                    );

                    if block.offset == BLOCK_OFFSET_PARENT {
                        // Parent block: delegate or zero-fill
                        if let Some(parent) = &self.parent {
                            let dest = &mut target[buf_offset..buf_offset + to_copy];
                            parent.read_at_into_uninit(stream, current_pos, dest)?;
                        } else {
                            Self::zero_fill_uninit(&mut target[buf_offset..buf_offset + to_copy]);
                        }
                        current_pos += to_copy as u64;
                        buf_offset += to_copy;
                        remaining -= to_copy;
                    } else if block.length == 0 {
                        // Zero block: fill with zeros
                        Self::zero_fill_uninit(&mut target[buf_offset..buf_offset + to_copy]);
                        current_pos += to_copy as u64;
                        buf_offset += to_copy;
                        remaining -= to_copy;
                    } else {
                        // Regular block: add to work queue
                        if to_copy > 0 {
                            local_work.push((
                                global_block_idx,
                                *block,
                                buf_offset,
                                offset_in_block,
                                to_copy,
                            ));
                            buf_offset += to_copy;
                            current_pos += to_copy as u64;
                            remaining -= to_copy;
                        }
                    }

                    if remaining == 0 {
                        break;
                    }
                }
                block_logical_start += block.logical_len as u64;
            }
        }

        Ok((local_work, buf_offset))
    }

    /// Executes parallel decompression for multiple blocks.
    ///
    /// Uses a two-phase approach:
    /// 1. Parallel I/O: Fetch all raw blocks concurrently
    /// 2. Parallel CPU: Decompress and copy to the target buffer
    fn execute_parallel_decompression(
        self: &Arc<Self>,
        stream: SnapshotStream,
        work_items: &[WorkItem],
        target: &mut [MaybeUninit<u8>],
        actual_len: usize,
    ) -> Result<()> {
        let snap = Arc::clone(self);
        let target_addr = target.as_mut_ptr() as usize;

        // Phase 1: Parallel fetch all raw blocks
        let raw_blocks: Vec<Result<Bytes>> = work_items
            .par_iter()
            .map(|(block_idx, info, _, _, _)| snap.fetch_raw_block(stream, *block_idx, info))
            .collect();

        // Phase 2: Parallel decompress and copy
        let err: Mutex<Option<Error>> = Mutex::new(None);
        work_items
            .par_iter()
            .zip(raw_blocks)
            .for_each(|(work_item, raw_result)| {
                if err.lock().map_or(true, |e| e.is_some()) {
                    return;
                }

                let (block_idx, info, buf_offset, offset_in_block, to_copy) = work_item;

                // Handle fetch errors
                let raw = match raw_result {
                    Ok(r) => r,
                    Err(e) => {
                        if let Ok(mut guard) = err.lock() {
                            let _ = guard.replace(e);
                        }
                        return;
                    }
                };

                // If cache hit or zero block, raw is already decompressed
                let data = if info.length == 0 || snap.cache_l1.get(stream, *block_idx).is_some() {
                    raw
                } else {
                    // Decompress and verify
                    match snap.decompress_and_verify(raw, *block_idx, info) {
                        Ok(d) => {
                            // Cache the result
                            snap.cache_l1.insert(stream, *block_idx, d.clone());
                            d
                        }
                        Err(e) => {
                            if let Ok(mut guard) = err.lock() {
                                let _ = guard.replace(e);
                            }
                            return;
                        }
                    }
                };

                // Copy to target buffer
                let src = data.as_ref();
                let start = *offset_in_block;
                let len = *to_copy;
                if start < src.len() && len <= src.len() - start {
                    // Defensive assertion: ensure destination write is within bounds
                    debug_assert!(
                        buf_offset + len <= actual_len,
                        "Buffer overflow: attempting to write {} bytes at offset {} into buffer of length {}",
                        len,
                        buf_offset,
                        actual_len
                    );
                    let dest = (target_addr + buf_offset) as *mut u8;
                    // SAFETY: `src[start..start+len]` is in-bounds (checked above).
                    // `dest` points into the `target` MaybeUninit buffer at a unique
                    // non-overlapping offset (each work item has a distinct `buf_offset`),
                    // and the rayon par_iter ensures each item writes to a disjoint region.
                    // The debug_assert above validates buf_offset + len <= actual_len.
                    unsafe { ptr::copy_nonoverlapping(src[start..].as_ptr(), dest, len) };
                }
            });

        if let Some(e) = err.lock().ok().and_then(|mut guard| guard.take()) {
            return Err(e);
        }

        Ok(())
    }

    /// Executes serial decompression for a small number of blocks.
    fn execute_serial_decompression(
        &self,
        stream: SnapshotStream,
        work_items: &[WorkItem],
        target: &mut [MaybeUninit<u8>],
        actual_len: usize,
    ) -> Result<()> {
        for (block_idx, info, buf_offset, offset_in_block, to_copy) in work_items {
            let data = self.resolve_block_data(stream, *block_idx, info)?;
            let src = data.as_ref();
            let start = *offset_in_block;
            if start < src.len() && *to_copy <= src.len() - start {
                // Defensive assertion: ensure destination write is within bounds
                debug_assert!(
                    *buf_offset + *to_copy <= actual_len,
                    "Buffer overflow: attempting to write {} bytes at offset {} into buffer of length {}",
                    to_copy,
                    buf_offset,
                    actual_len
                );
                // SAFETY: `src[start..start+to_copy]` is in-bounds (checked above).
                // `target[buf_offset..]` has sufficient room because `buf_offset + to_copy`
                // never exceeds `actual_len` (tracked during work-item collection).
                // The debug_assert above validates this invariant.
                // MaybeUninit<u8> has the same layout as u8.
                unsafe {
                    ptr::copy_nonoverlapping(
                        src[start..].as_ptr(),
                        target[*buf_offset..].as_mut_ptr() as *mut u8,
                        *to_copy,
                    );
                }
            }
        }
        Ok(())
    }

    /// Zero-fills a slice of uninitialized memory.
    ///
    /// This helper centralizes all unsafe zero-filling operations to improve
    /// safety auditing and reduce code duplication.
    ///
    /// # Safety
    ///
    /// This function writes zeros to the provided buffer, making it fully initialized.
    /// The caller must ensure the buffer is valid for writes.
    #[inline]
    fn zero_fill_uninit(buffer: &mut [MaybeUninit<u8>]) {
        if !buffer.is_empty() {
            // SAFETY: buffer is a valid &mut [MaybeUninit<u8>] slice, so writing
            // buffer.len() zero bytes through its pointer is in-bounds.
            unsafe { ptr::write_bytes(buffer.as_mut_ptr(), 0, buffer.len()) };
        }
    }

    /// Reads into an uninitialized buffer. Unused suffix is zero-filled.
    /// Uses parallel decompression when spanning multiple blocks.
    ///
    /// **On error:** The buffer contents are undefined (possibly partially written).
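    ///
    /// # Examples
    ///
    /// A sketch using an uninitialized buffer to avoid paying for zero-initialization;
    /// the 4096-byte size is arbitrary:
    ///
    /// ```no_run
    /// use hexz_core::{File, SnapshotStream};
    /// use std::mem::MaybeUninit;
    /// # use std::sync::Arc;
    /// # fn example(snapshot: Arc<File>) -> Result<(), Box<dyn std::error::Error>> {
    /// let mut buf: Vec<MaybeUninit<u8>> = Vec::new();
    /// buf.resize_with(4096, MaybeUninit::uninit);
    /// snapshot.read_at_into_uninit(SnapshotStream::Disk, 0, &mut buf)?;
    /// // On success, every byte of `buf` is initialized (data or zero-fill).
    /// # Ok(())
    /// # }
    /// ```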
    pub fn read_at_into_uninit(
        self: &Arc<Self>,
        stream: SnapshotStream,
        offset: u64,
        buffer: &mut [MaybeUninit<u8>],
    ) -> Result<()> {
        self.read_at_uninit_inner(stream, offset, buffer, false)
    }

    /// Inner implementation of [`read_at_into_uninit`](Self::read_at_into_uninit).
    ///
    /// The `is_prefetch` flag prevents recursive prefetch thread spawning:
    /// when `true`, the prefetch block is skipped to avoid unbounded thread creation.
    fn read_at_uninit_inner(
        self: &Arc<Self>,
        stream: SnapshotStream,
        offset: u64,
        buffer: &mut [MaybeUninit<u8>],
        is_prefetch: bool,
    ) -> Result<()> {
        // Early validation
        let len = buffer.len();
        if len == 0 {
            return Ok(());
        }

        let stream_size = self.size(stream);
        if offset >= stream_size {
            Self::zero_fill_uninit(buffer);
            return Ok(());
        }

        // Calculate actual read length and zero-fill suffix if needed
        let actual_len = std::cmp::min(len as u64, stream_size - offset) as usize;
        if actual_len < len {
            Self::zero_fill_uninit(&mut buffer[actual_len..]);
        }

        let target = &mut buffer[0..actual_len];

        // Get page list for stream
        let pages = match stream {
            SnapshotStream::Disk => &self.master.disk_pages,
            SnapshotStream::Memory => &self.master.memory_pages,
        };

        // Delegate to parent if no index pages
        if pages.is_empty() {
            if let Some(parent) = &self.parent {
                return parent.read_at_into_uninit(stream, offset, target);
            }
            Self::zero_fill_uninit(target);
            return Ok(());
        }

        // Find starting page index
        let page_idx = match pages.binary_search_by(|p| p.start_logical.cmp(&offset)) {
            Ok(idx) => idx,
            Err(idx) => idx.saturating_sub(1),
        };

        // Collect work items (handles parent blocks, zero blocks, and queues regular blocks)
        let (work_items, buf_offset) =
            self.collect_work_items(stream, pages, page_idx, target, offset, actual_len)?;

        // Choose parallel or serial decompression based on work item count
        if work_items.len() >= Self::PARALLEL_MIN_BLOCKS {
            self.execute_parallel_decompression(stream, &work_items, target, actual_len)?;
        } else {
            self.execute_serial_decompression(stream, &work_items, target, actual_len)?;
        }

        // Handle any remaining unprocessed data
        let remaining = actual_len - buf_offset;
        if remaining > 0 {
            if let Some(parent) = &self.parent {
                let current_pos = offset + buf_offset as u64;
                parent.read_at_into_uninit(stream, current_pos, &mut target[buf_offset..])?;
            } else {
                Self::zero_fill_uninit(&mut target[buf_offset..]);
            }
        }

        // Trigger prefetch for next sequential blocks if enabled.
        // Guards:
        // 1. `is_prefetch` prevents recursive spawning (prefetch thread spawning another)
        // 2. `try_start()` limits to one in-flight prefetch at a time, preventing
        //    unbounded thread creation under rapid sequential reads
        if let Some(prefetcher) = &self.prefetcher {
            if !is_prefetch && !work_items.is_empty() && prefetcher.try_start() {
                let next_offset = offset + actual_len as u64;
                let prefetch_len = (self.header.block_size * 4) as usize;
                let snap = Arc::clone(self);
                let stream_copy = stream;
                std::thread::spawn(move || {
                    let mut buf = vec![MaybeUninit::uninit(); prefetch_len];
                    let _ = snap.read_at_uninit_inner(stream_copy, next_offset, &mut buf, true);
                    // Release the in-flight guard so the next read can prefetch
                    if let Some(pf) = &snap.prefetcher {
                        pf.clear_in_flight();
                    }
                });
            }
        }

        Ok(())
    }

    /// Like [`read_at_into_uninit`](Self::read_at_into_uninit) but accepts `&mut [u8]`.
    /// Use from FFI (e.g. Python).
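    ///
    /// # Examples
    ///
    /// A sketch for callers that already hold an initialized `&mut [u8]` (typical
    /// for FFI buffers); the offset and size are arbitrary:
    ///
    /// ```no_run
    /// use hexz_core::{File, SnapshotStream};
    /// # use std::sync::Arc;
    /// # fn example(snapshot: Arc<File>) -> Result<(), Box<dyn std::error::Error>> {
    /// let mut page = [0u8; 4096];
    /// snapshot.read_at_into_uninit_bytes(SnapshotStream::Disk, 0, &mut page)?;
    /// # Ok(())
    /// # }
    /// ```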
    #[inline]
    pub fn read_at_into_uninit_bytes(
        self: &Arc<Self>,
        stream: SnapshotStream,
        offset: u64,
        buf: &mut [u8],
    ) -> Result<()> {
        if buf.is_empty() {
            return Ok(());
        }
        // SAFETY: &mut [u8] and &mut [MaybeUninit<u8>] have identical layout (both
        // are slices of single-byte types). Initialized u8 values are valid MaybeUninit<u8>.
        // The borrow is derived from `buf` so no aliasing occurs.
        let uninit = unsafe { &mut *(buf as *mut [u8] as *mut [MaybeUninit<u8>]) };
        self.read_at_into_uninit(stream, offset, uninit)
    }

    /// Fetches an index page from cache or storage.
    ///
    /// Index pages map logical offsets to physical block locations. This method
    /// maintains an LRU cache to avoid repeated deserialization.
    ///
    /// # Parameters
    ///
    /// - `entry`: Page metadata from the master index
    ///
    /// # Returns
    ///
    /// A shared reference to the deserialized index page.
    ///
    /// # Thread Safety
    ///
    /// This method acquires a lock on the page cache only for cache lookup and insertion.
    /// I/O and deserialization are performed without holding the lock to avoid blocking
    /// other threads during cache misses.
    fn get_page(&self, entry: &PageEntry) -> Result<Arc<IndexPage>> {
        // Fast path: check sharded cache
        if let Some(p) = self.page_cache.get(entry.offset) {
            return Ok(p);
        }

        // Slow path: I/O and deserialization without holding any lock
        let bytes = self
            .backend
            .read_exact(entry.offset, entry.length as usize)?;
        let page: IndexPage = bincode::deserialize(&bytes)?;
        let arc = Arc::new(page);

        // Check again in case another thread inserted while we were doing I/O
        if let Some(p) = self.page_cache.get(entry.offset) {
            return Ok(p);
        }
        self.page_cache.insert(entry.offset, arc.clone());

        Ok(arc)
    }

    /// Fetches raw compressed block data from cache or storage.
    ///
    /// This is the I/O portion of block resolution, separated to enable parallel I/O.
    /// It:
    /// 1. Checks the block cache
    /// 2. Handles zero-length blocks
    /// 3. Reads raw compressed data from the backend
    ///
    /// # Parameters
    ///
    /// - `stream`: Stream identifier (for cache key)
    /// - `block_idx`: Global block index
    /// - `info`: Block metadata (offset, length)
    ///
    /// # Returns
    ///
    /// Raw block data (potentially compressed/encrypted) or cached decompressed data.
    fn fetch_raw_block(
        &self,
        stream: SnapshotStream,
        block_idx: u64,
        info: &BlockInfo,
    ) -> Result<Bytes> {
        // Check cache first - return decompressed data if available
        if let Some(data) = self.cache_l1.get(stream, block_idx) {
            return Ok(data);
        }

        // Handle zero blocks
        if info.length == 0 {
            let len = info.logical_len as usize;
            if len == 0 {
                return Ok(Bytes::new());
            }
            if len == ZEROS_64K.len() {
                return Ok(Bytes::from_static(&ZEROS_64K));
            }
            return Ok(Bytes::from(vec![0u8; len]));
        }

        // Fetch raw compressed data (THIS IS THE PARALLEL PART)
        self.backend.read_exact(info.offset, info.length as usize)
    }

    /// Decompresses and verifies a raw block.
    ///
    /// This is the CPU portion of block resolution, separated to enable parallel decompression.
    /// It:
    /// 1. Verifies the CRC32 checksum
    /// 2. Decrypts (if encrypted)
    /// 3. Decompresses
    ///
    /// # Parameters
    ///
    /// - `raw`: Raw block data (potentially compressed/encrypted)
    /// - `block_idx`: Global block index (for error reporting and decryption)
    /// - `info`: Block metadata (checksum)
    ///
    /// # Returns
    ///
    /// Decompressed block data as `Bytes`.
    ///
    /// # Performance
    ///
    /// Decompression throughput:
    /// - LZ4: ~2 GB/s per core
    /// - Zstd: ~500 MB/s per core
    fn decompress_and_verify(&self, raw: Bytes, block_idx: u64, info: &BlockInfo) -> Result<Bytes> {
        // Verify stored checksum (CRC32 of compressed/encrypted data) before decrypt/decompress
        if info.checksum != 0 {
            let computed = crc32_hash(&raw);
            if computed != info.checksum {
                return Err(Error::Corruption(block_idx));
            }
        }

        // Pre-allocate exact output buffer to avoid over-allocation inside decompressor.
        // We use decompress_into() instead of decompress() to eliminate the allocation
        // and potential reallocation overhead inside the compression library.
        //
        // Performance impact: Avoids zero-initialization overhead (~16% improvement for
        // high-thread-count workloads based on benchmarks).
        let out_len = info.logical_len as usize;
        let mut out = Vec::with_capacity(out_len);

        // SAFETY: This unsafe block is required to create an uninitialized buffer for
        // decompress_into() to write into. This is safe because:
        //
        // 1. Contract guarantee: Both LZ4 and Zstd decompress_into() implementations
        //    promise to either:
        //    a) Write exactly `out.len()` bytes (the full decompressed size), OR
        //    b) Return an Err() if decompression fails (buffer underrun, corruption, etc.)
        //
        // 2. Size accuracy: We set out.len() to info.logical_len, which is the exact
        //    decompressed size recorded in the block metadata during compression.
        //    The decompressor will write exactly this many bytes or fail.
        //
        // 3. Error propagation: If decompress_into() returns Err(), we propagate it
        //    immediately via the ? operator. The uninitialized buffer is dropped
        //    without ever being read.
        //
        // 4. No partial writes: The decompressor APIs do not support partial writes.
        //    They either fully succeed or fully fail. We never access a partially
        //    initialized buffer.
        //
        // 5. Memory safety: We never read from `out` before decompress_into() succeeds.
        //    The only subsequent access is Bytes::from(out), which transfers ownership
        //    of the now-fully-initialized buffer.
        //
        // This is a well-established pattern for zero-copy decompression. The clippy
        // lint is conservative and warns about ANY use of set_len() after with_capacity(),
        // but in this case we have explicit API guarantees from the decompressor.
        #[allow(clippy::uninit_vec)]
        unsafe {
            out.set_len(out_len);
        }

        if let Some(enc) = &self.encryptor {
            let compressed = enc.decrypt(&raw, block_idx)?;
            self.compressor.decompress_into(&compressed, &mut out)?;
        } else {
            self.compressor.decompress_into(raw.as_ref(), &mut out)?;
        }

        Ok(Bytes::from(out))
    }

    /// Resolves raw block data by fetching from cache or decompressing from storage.
    ///
    /// This is the core decompression path. It:
    /// 1. Checks the block cache
    /// 2. Reads the compressed block from the backend
    /// 3. Verifies the CRC32 checksum (if stored) and returns `Corruption(block_idx)` on mismatch
    /// 4. Decrypts (if encrypted)
    /// 5. Decompresses
    /// 6. Caches the result
    ///
    /// # Parameters
    ///
    /// - `stream`: Stream identifier (for cache key)
    /// - `block_idx`: Global block index
    /// - `info`: Block metadata (offset, length, compression)
    ///
    /// # Returns
    ///
    /// Decompressed block data as `Bytes` (zero-copy on cache hit).
    ///
    /// # Performance
    ///
    /// This method is the hot path for cache misses. Decompression throughput:
    /// - LZ4: ~2 GB/s per core
    /// - Zstd: ~500 MB/s per core
    fn resolve_block_data(
        &self,
        stream: SnapshotStream,
        block_idx: u64,
        info: &BlockInfo,
    ) -> Result<Bytes> {
        // Fetch raw block (from cache or I/O)
        let raw = self.fetch_raw_block(stream, block_idx, info)?;

        // If cache hit or zero block, raw is already decompressed
        if info.length == 0 || self.cache_l1.get(stream, block_idx).is_some() {
            return Ok(raw);
        }

        // Decompress and verify
        let data = self.decompress_and_verify(raw, block_idx, info)?;

        // Cache the result
        self.cache_l1.insert(stream, block_idx, data.clone());
        Ok(data)
    }
}