hexz_core/api/file.rs
//! High-level snapshot file API and logical stream types.

use crate::algo::compression::{Compressor, create_compressor};
use crate::algo::encryption::Encryptor;
use crate::cache::lru::{BlockCache, ShardedPageCache};
use crate::cache::prefetch::Prefetcher;
use crate::format::header::Header;
use crate::format::index::{BlockInfo, IndexPage, MasterIndex, PageEntry};
use crate::format::magic::{HEADER_SIZE, MAGIC_BYTES};
use crate::format::version::{VersionCompatibility, check_version, compatibility_message};
use crate::store::StorageBackend;
use crate::store::local::file::FileBackend;
use bytes::Bytes;
use crc32fast::hash as crc32_hash;
use std::mem::MaybeUninit;
use std::path::Path;
use std::ptr;
use std::sync::{Arc, Mutex};

use hexz_common::constants::{BLOCK_OFFSET_PARENT, DEFAULT_BLOCK_SIZE};
use hexz_common::{Error, Result};
use rayon::prelude::*;

/// Shared zero block for the default block size to avoid allocating when returning zero blocks.
static ZEROS_64K: [u8; DEFAULT_BLOCK_SIZE as usize] = [0u8; DEFAULT_BLOCK_SIZE as usize];

/// Work item for block decompression: (block_idx, info, buf_offset, offset_in_block, to_copy)
type WorkItem = (u64, BlockInfo, usize, usize, usize);

/// Result of fetching a block from cache or storage.
///
/// Eliminates TOCTOU races by tracking data state at fetch time rather than
/// re-checking the cache later (which can give a different answer if a
/// background prefetch thread modifies the cache between check and use).
enum FetchResult {
    /// Data is already decompressed (came from the L1 cache or is a zero block).
    Decompressed(Bytes),
    /// Data is raw compressed bytes from storage (needs decompression).
    Compressed(Bytes),
}

/// Logical stream identifier for dual-stream snapshots.
///
/// Hexz snapshots can store two independent data streams:
/// - **Primary**: Persistent storage (disk image, filesystem data, or main tensor weights)
/// - **Secondary**: Volatile state (RAM contents, process memory, or auxiliary data)
///
/// # Example
///
/// ```no_run
/// use hexz_core::{File, SnapshotStream};
/// # use std::sync::Arc;
/// # fn example(snapshot: Arc<File>) -> Result<(), Box<dyn std::error::Error>> {
/// // Read 4KB from the primary stream
/// let data = snapshot.read_at(SnapshotStream::Primary, 0, 4096)?;
///
/// // Read 4KB from the secondary stream (if present)
/// let aux = snapshot.read_at(SnapshotStream::Secondary, 0, 4096)?;
/// # Ok(())
/// # }
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum SnapshotStream {
    /// Persistent primary stream (formerly `Disk`)
    Primary = 0,
    /// Volatile secondary stream (formerly `Memory`)
    Secondary = 1,
}

/// Read-only interface for accessing Hexz snapshot data.
///
/// `File` is the primary API for reading compressed, block-indexed snapshots.
/// It handles:
/// - Block-level decompression with LRU caching
/// - Optional AES-256-GCM decryption
/// - Thin snapshot parent chaining
/// - Dual-stream access (primary and secondary)
/// - Random access with minimal I/O
///
/// # Thread Safety
///
/// `File` is `Send + Sync` and can be safely shared across threads via `Arc`.
/// Internal caches use `Mutex` for synchronization.
///
/// # Performance
///
/// - **Cache hit latency**: ~80μs (warm cache)
/// - **Cache miss latency**: ~1ms (cold cache, local storage)
/// - **Sequential throughput**: ~2-3 GB/s (NVMe + LZ4)
/// - **Memory overhead**: ~150MB typical (configurable)
///
/// # Examples
///
/// ## Basic Usage
///
/// ```no_run
/// use hexz_core::{File, SnapshotStream};
/// use hexz_core::store::local::FileBackend;
/// use hexz_core::algo::compression::lz4::Lz4Compressor;
/// use std::sync::Arc;
///
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let backend = Arc::new(FileBackend::new("snapshot.hxz".as_ref())?);
/// let compressor = Box::new(Lz4Compressor::new());
/// let snapshot = File::new(backend, compressor, None)?;
///
/// // Read 4KB at offset 1MB
/// let data = snapshot.read_at(SnapshotStream::Primary, 1024 * 1024, 4096)?;
/// assert_eq!(data.len(), 4096);
/// # Ok(())
/// # }
/// ```
///
/// ## Thin Snapshots (with parent)
///
/// ```no_run
/// use hexz_core::File;
/// use hexz_core::store::local::FileBackend;
/// use hexz_core::algo::compression::lz4::Lz4Compressor;
/// use std::sync::Arc;
///
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// // Open base snapshot
/// let base_backend = Arc::new(FileBackend::new("base.hxz".as_ref())?);
/// let base = File::new(
///     base_backend,
///     Box::new(Lz4Compressor::new()),
///     None
/// )?;
///
/// // The thin snapshot will automatically load its parent based on
/// // the parent_path field in the header
/// let thin_backend = Arc::new(FileBackend::new("incremental.hxz".as_ref())?);
/// let thin = File::new(
///     thin_backend,
///     Box::new(Lz4Compressor::new()),
///     None
/// )?;
///
/// // Reads automatically fall back to base for unchanged blocks
/// let data = thin.read_at(hexz_core::SnapshotStream::Primary, 0, 4096)?;
/// # Ok(())
/// # }
/// ```
pub struct File {
    /// Snapshot metadata (sizes, compression, encryption settings)
    pub header: Header,

    /// Master index containing top-level page entries
    pub(crate) master: MasterIndex,

    /// Storage backend for reading raw snapshot data
    backend: Arc<dyn StorageBackend>,

    /// Compression algorithm (LZ4 or Zstandard)
    compressor: Box<dyn Compressor>,

    /// Optional encryption (AES-256-GCM)
    encryptor: Option<Box<dyn Encryptor>>,

    /// Optional parent snapshot for thin (incremental) snapshots.
    /// When a block's offset is BLOCK_OFFSET_PARENT, data is fetched from parent.
    parent: Option<Arc<File>>,

    /// LRU cache for decompressed blocks (per-stream, per-block-index)
    cache_l1: BlockCache,

    /// Sharded LRU cache for deserialized index pages
    page_cache: ShardedPageCache,

    /// Optional prefetcher for background data loading
    prefetcher: Option<Prefetcher>,
}

impl File {
    /// Opens a snapshot, auto-detecting compression and dictionary from the header.
    ///
    /// This eliminates the three-step boilerplate of reading the header, loading the
    /// dictionary, and creating the compressor. Equivalent to
    /// `File::new(backend, auto_compressor, encryptor)`.
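    ///
    /// # Examples
    ///
    /// A minimal usage sketch (assuming an unencrypted snapshot; the path is illustrative):
    ///
    /// ```no_run
    /// use hexz_core::{File, SnapshotStream};
    /// use hexz_core::store::local::FileBackend;
    /// use std::sync::Arc;
    ///
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let backend = Arc::new(FileBackend::new("snapshot.hxz".as_ref())?);
    /// // No compressor argument: it is created from the header's compression field.
    /// let snapshot = File::open(backend, None)?;
    /// let data = snapshot.read_at(SnapshotStream::Primary, 0, 4096)?;
    /// # Ok(())
    /// # }
    /// ```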
    pub fn open(
        backend: Arc<dyn StorageBackend>,
        encryptor: Option<Box<dyn Encryptor>>,
    ) -> Result<Arc<Self>> {
        Self::open_with_cache(backend, encryptor, None, None)
    }

    /// Like [`open`](Self::open) but with custom cache and prefetch settings.
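    ///
    /// A brief sketch (the 256MB cache size and 4-block window are illustrative):
    ///
    /// ```no_run
    /// use hexz_core::File;
    /// use hexz_core::store::local::FileBackend;
    /// use std::sync::Arc;
    ///
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let backend = Arc::new(FileBackend::new("snapshot.hxz".as_ref())?);
    /// // 256MB block cache, prefetch 4 blocks ahead.
    /// let snapshot = File::open_with_cache(backend, None, Some(256 * 1024 * 1024), Some(4))?;
    /// # Ok(())
    /// # }
    /// ```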
    pub fn open_with_cache(
        backend: Arc<dyn StorageBackend>,
        encryptor: Option<Box<dyn Encryptor>>,
        cache_capacity_bytes: Option<usize>,
        prefetch_window_size: Option<u32>,
    ) -> Result<Arc<Self>> {
        let header = Header::read_from_backend(backend.as_ref())?;
        let dictionary = header.load_dictionary(backend.as_ref())?;
        let compressor = create_compressor(header.compression, None, dictionary);
        Self::with_cache(
            backend,
            compressor,
            encryptor,
            cache_capacity_bytes,
            prefetch_window_size,
        )
    }

    /// Opens a Hexz snapshot with default cache settings.
    ///
    /// This is the primary constructor for `File`. It:
    /// 1. Reads and validates the snapshot header (magic bytes, version)
    /// 2. Deserializes the master index
    /// 3. Recursively loads parent snapshots (for thin snapshots)
    /// 4. Initializes block and page caches
    ///
    /// # Parameters
    ///
    /// - `backend`: Storage backend (local file, HTTP, S3, etc.)
    /// - `compressor`: Compression algorithm matching the snapshot format
    /// - `encryptor`: Optional decryption handler (pass `None` for unencrypted snapshots)
    ///
    /// # Returns
    ///
    /// - `Ok(File)` on success
    /// - `Err(Error::Format)` if magic bytes or version are invalid
    /// - `Err(Error::Io)` if the storage backend fails
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use hexz_core::{File, SnapshotStream};
    /// use hexz_core::store::local::FileBackend;
    /// use hexz_core::algo::compression::lz4::Lz4Compressor;
    /// use std::sync::Arc;
    ///
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let backend = Arc::new(FileBackend::new("snapshot.hxz".as_ref())?);
    /// let compressor = Box::new(Lz4Compressor::new());
    /// let snapshot = File::new(backend, compressor, None)?;
    ///
    /// println!("Primary size: {} bytes", snapshot.size(SnapshotStream::Primary));
    /// # Ok(())
    /// # }
    /// ```
    pub fn new(
        backend: Arc<dyn StorageBackend>,
        compressor: Box<dyn Compressor>,
        encryptor: Option<Box<dyn Encryptor>>,
    ) -> Result<Arc<Self>> {
        Self::with_cache(backend, compressor, encryptor, None, None)
    }

    /// Opens a Hexz snapshot with custom cache capacity and prefetching.
    ///
    /// Identical to [`new`](Self::new) but allows specifying cache size and prefetch window.
    ///
    /// # Parameters
    ///
    /// - `backend`: Storage backend
    /// - `compressor`: Compression algorithm
    /// - `encryptor`: Optional decryption handler
    /// - `cache_capacity_bytes`: Block cache size in bytes (default: 1000 blocks, ~4MB at a 4KB block size)
    /// - `prefetch_window_size`: Number of blocks to prefetch ahead (default: disabled)
    ///
    /// # Cache Sizing
    ///
    /// The cache stores decompressed blocks. Given a block size of 4KB:
    /// - `Some(100_000_000)` → ~24,000 blocks (~96MB effective)
    /// - `None` → 1000 blocks (~4MB effective)
    ///
    /// Larger caches reduce repeated decompression but increase memory usage.
    ///
    /// # Prefetching
    ///
    /// When `prefetch_window_size` is set, the system will automatically fetch the next N blocks
    /// in the background after each read, optimizing sequential access patterns:
    /// - `Some(4)` → Prefetch 4 blocks ahead
    /// - `None` or `Some(0)` → Disable prefetching
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use hexz_core::File;
    /// use hexz_core::store::local::FileBackend;
    /// use hexz_core::algo::compression::lz4::Lz4Compressor;
    /// use std::sync::Arc;
    ///
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let backend = Arc::new(FileBackend::new("snapshot.hxz".as_ref())?);
    /// let compressor = Box::new(Lz4Compressor::new());
    ///
    /// // Allocate 256MB for cache, prefetch 4 blocks ahead
    /// let snapshot = File::with_cache(
    ///     backend,
    ///     compressor,
    ///     None,
    ///     Some(256 * 1024 * 1024),
    ///     Some(4)
    /// )?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn with_cache(
        backend: Arc<dyn StorageBackend>,
        compressor: Box<dyn Compressor>,
        encryptor: Option<Box<dyn Encryptor>>,
        cache_capacity_bytes: Option<usize>,
        prefetch_window_size: Option<u32>,
    ) -> Result<Arc<Self>> {
        let header_bytes = backend.read_exact(0, HEADER_SIZE)?;
        let header: Header = bincode::deserialize(&header_bytes)?;

        if &header.magic != MAGIC_BYTES {
            return Err(Error::Format("Invalid magic bytes".into()));
        }

        // Check version compatibility
        let compatibility = check_version(header.version);
        match compatibility {
            VersionCompatibility::Full => {
                // Perfect match, proceed silently
            }
            VersionCompatibility::Degraded => {
                // Newer version, issue warning but allow
                tracing::warn!("{}", compatibility_message(header.version));
            }
            VersionCompatibility::Incompatible => {
                // Too old or too new, reject
                return Err(Error::Format(compatibility_message(header.version)));
            }
        }

        let file_len = backend.len();
        if header.index_offset >= file_len {
            return Err(Error::Format(format!(
                "index_offset ({}) is at or past end of file ({})",
                header.index_offset, file_len
            )));
        }
        let index_bytes = backend.read_exact(
            header.index_offset,
            (file_len - header.index_offset) as usize,
        )?;

        let master: MasterIndex = bincode::deserialize(&index_bytes)?;

        // Recursively load parent if present
        let parent = if let Some(parent_path) = &header.parent_path {
            tracing::info!("Loading parent snapshot: {}", parent_path);
            let p_backend = Arc::new(FileBackend::new(Path::new(parent_path))?);
            Some(File::open(p_backend, None)?)
        } else {
            None
        };

        let block_size = header.block_size as usize;
        let l1_capacity = if let Some(bytes) = cache_capacity_bytes {
            (bytes / block_size).max(1)
        } else {
            1000
        };

        // Initialize prefetcher if window size is specified and > 0
        let prefetcher = prefetch_window_size.filter(|&w| w > 0).map(Prefetcher::new);

        Ok(Arc::new(Self {
            header,
            master,
            backend,
            compressor,
            encryptor,
            parent,
            cache_l1: BlockCache::with_capacity(l1_capacity),
            page_cache: ShardedPageCache::default(),
            prefetcher,
        }))
    }

    /// Returns the total number of prefetch operations spawned since this file was opened.
    /// Returns 0 if prefetching is disabled.
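    ///
    /// A small sketch of how this counter might be used (the read loop is illustrative):
    ///
    /// ```no_run
    /// use hexz_core::{File, SnapshotStream};
    /// # use std::sync::Arc;
    /// # fn example(snapshot: Arc<File>) -> Result<(), Box<dyn std::error::Error>> {
    /// for i in 0..16 {
    ///     let _ = snapshot.read_at(SnapshotStream::Primary, i * 4096, 4096)?;
    /// }
    /// // Nonzero only when a prefetch window was configured at open time.
    /// println!("prefetches spawned: {}", snapshot.prefetch_spawn_count());
    /// # Ok(())
    /// # }
    /// ```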
    pub fn prefetch_spawn_count(&self) -> u64 {
        self.prefetcher.as_ref().map_or(0, |p| p.spawn_count())
    }

    /// Returns the logical size of a stream in bytes.
    ///
    /// # Parameters
    ///
    /// - `stream`: The stream to query (`Primary` or `Secondary`)
    ///
    /// # Returns
    ///
    /// The uncompressed, logical size of the stream. This is the size you would
    /// get if you decompressed all blocks and concatenated them.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use hexz_core::{File, SnapshotStream};
    /// # use std::sync::Arc;
    /// # fn example(snapshot: Arc<File>) {
    /// let disk_bytes = snapshot.size(SnapshotStream::Primary);
    /// let mem_bytes = snapshot.size(SnapshotStream::Secondary);
    ///
    /// println!("Primary: {} GB", disk_bytes / (1024 * 1024 * 1024));
    /// println!("Secondary: {} MB", mem_bytes / (1024 * 1024));
    /// # }
    /// ```
    pub fn size(&self, stream: SnapshotStream) -> u64 {
        match stream {
            SnapshotStream::Primary => self.master.primary_size,
            SnapshotStream::Secondary => self.master.secondary_size,
        }
    }

    /// Returns the block metadata for a given logical offset.
    ///
    /// Returns `Ok(None)` if no indexed block covers the offset.
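    ///
    /// A small sketch (the 1 MiB offset is arbitrary):
    ///
    /// ```no_run
    /// use hexz_core::{File, SnapshotStream};
    /// # use std::sync::Arc;
    /// # fn example(snapshot: Arc<File>) -> Result<(), Box<dyn std::error::Error>> {
    /// if let Some((block_idx, _info)) = snapshot.get_block_info(SnapshotStream::Primary, 1024 * 1024)? {
    ///     println!("offset 1 MiB is backed by block {}", block_idx);
    /// }
    /// # Ok(())
    /// # }
    /// ```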
    pub fn get_block_info(
        &self,
        stream: SnapshotStream,
        offset: u64,
    ) -> Result<Option<(u64, BlockInfo)>> {
        let pages = match stream {
            SnapshotStream::Primary => &self.master.primary_pages,
            SnapshotStream::Secondary => &self.master.secondary_pages,
        };

        if pages.is_empty() {
            return Ok(None);
        }

        let page_idx = match pages.binary_search_by(|p| p.start_logical.cmp(&offset)) {
            Ok(idx) => idx,
            Err(idx) => idx.saturating_sub(1),
        };

        let page_entry = &pages[page_idx];
        let page = self.get_page(page_entry)?;
        let mut block_logical_start = page_entry.start_logical;

        for (i, block) in page.blocks.iter().enumerate() {
            let block_end = block_logical_start + block.logical_len as u64;
            if offset >= block_logical_start && offset < block_end {
                let global_idx = page_entry.start_block + i as u64;
                return Ok(Some((global_idx, *block)));
            }
            block_logical_start = block_end;
        }

        Ok(None)
    }

    /// Reads data from a snapshot stream at a given offset.
    ///
    /// This is the primary read method for random access. It:
    /// 1. Identifies which blocks overlap the requested range
    /// 2. Fetches blocks from cache or decompresses from storage
    /// 3. Handles thin snapshot fallback to parent
    /// 4. Assembles the final buffer from block slices
    ///
    /// # Parameters
    ///
    /// - `stream`: Which stream to read from (`Primary` or `Secondary`)
    /// - `offset`: Starting byte offset (0-indexed)
    /// - `len`: Number of bytes to read
    ///
    /// # Returns
    ///
    /// A `Vec<u8>` containing up to `len` bytes. The returned vector may be shorter
    /// if:
    /// - `offset` is beyond the stream size (returns empty vector)
    /// - `offset + len` exceeds stream size (returns partial data)
    ///
    /// Missing data (sparse regions) is zero-filled.
    ///
    /// # Errors
    ///
    /// - `Error::Io` if a backend read fails (e.g. truncated file)
    /// - `Error::Corruption(block_idx)` if a block checksum does not match
    /// - `Error::Decompression` if block decompression fails
    /// - `Error::Decryption` if block decryption fails
    ///
    /// # Performance
    ///
    /// - **Cache hit**: ~80μs latency, no I/O
    /// - **Cache miss**: ~1ms latency (local storage), includes decompression
    /// - **Remote storage**: Latency depends on network (HTTP: ~50ms, S3: ~100ms)
    ///
    /// Aligned reads (`offset % block_size == 0`) are most efficient. Ranges that
    /// span multiple blocks are decompressed in parallel.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use hexz_core::{File, SnapshotStream};
    /// # use std::sync::Arc;
    /// # fn example(snapshot: Arc<File>) -> Result<(), Box<dyn std::error::Error>> {
    /// // Read first 512 bytes of primary stream
    /// let boot_sector = snapshot.read_at(SnapshotStream::Primary, 0, 512)?;
    ///
    /// // Read from arbitrary offset
    /// let chunk = snapshot.read_at(SnapshotStream::Primary, 1024 * 1024, 4096)?;
    ///
    /// // Reading beyond stream size returns empty vector
    /// let empty = snapshot.read_at(SnapshotStream::Primary, u64::MAX, 100)?;
    /// assert!(empty.is_empty());
    /// # Ok(())
    /// # }
    /// ```
    pub fn read_at(
        self: &Arc<Self>,
        stream: SnapshotStream,
        offset: u64,
        len: usize,
    ) -> Result<Vec<u8>> {
        let stream_size = self.size(stream);
        if offset >= stream_size {
            return Ok(Vec::new());
        }
        let actual_len = std::cmp::min(len as u64, stream_size - offset) as usize;
        if actual_len == 0 {
            return Ok(Vec::new());
        }

        let pages = match stream {
            SnapshotStream::Primary => &self.master.primary_pages,
            SnapshotStream::Secondary => &self.master.secondary_pages,
        };

        if pages.is_empty() {
            if let Some(parent) = &self.parent {
                return parent.read_at(stream, offset, actual_len);
            }
            return Ok(vec![0u8; actual_len]);
        }

        let mut buf: Vec<MaybeUninit<u8>> = Vec::new();
        buf.resize_with(actual_len, MaybeUninit::uninit);
        self.read_at_into_uninit(stream, offset, &mut buf)?;
        let ptr = buf.as_mut_ptr().cast::<u8>();
        let len = buf.len();
        let cap = buf.capacity();
        std::mem::forget(buf);
        // SAFETY: `buf` was a Vec<MaybeUninit<u8>> that we fully initialized via
        // `read_at_into_uninit` (which writes every byte). We `forget` the original
        // Vec to avoid a double-free and reconstruct it with the same ptr/len/cap.
        // MaybeUninit<u8> has the same layout as u8.
        Ok(unsafe { Vec::from_raw_parts(ptr, len, cap) })
    }

    /// Reads into a provided buffer. Unused suffix is zero-filled. Uses parallel decompression when spanning multiple blocks.
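    ///
    /// A brief sketch reading into a caller-owned buffer:
    ///
    /// ```no_run
    /// use hexz_core::{File, SnapshotStream};
    /// # use std::sync::Arc;
    /// # fn example(snapshot: Arc<File>) -> Result<(), Box<dyn std::error::Error>> {
    /// let mut buf = vec![0u8; 8192];
    /// snapshot.read_at_into(SnapshotStream::Primary, 0, &mut buf)?;
    /// // Any portion past the end of the stream is zero-filled.
    /// # Ok(())
    /// # }
    /// ```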
    pub fn read_at_into(
        self: &Arc<Self>,
        stream: SnapshotStream,
        offset: u64,
        buffer: &mut [u8],
    ) -> Result<()> {
        let len = buffer.len();
        if len == 0 {
            return Ok(());
        }
        let stream_size = self.size(stream);
        if offset >= stream_size {
            buffer.fill(0);
            return Ok(());
        }
        let actual_len = std::cmp::min(len as u64, stream_size - offset) as usize;
        if actual_len < len {
            buffer[actual_len..].fill(0);
        }
        self.read_at_into_uninit_bytes(stream, offset, &mut buffer[0..actual_len])
    }

    /// Minimum number of local blocks to use the parallel decompression path.
    const PARALLEL_MIN_BLOCKS: usize = 2;

    /// Collects work items for blocks that need decompression.
    ///
    /// This method iterates through index pages and blocks, handling:
    /// - Parent blocks: delegate to parent snapshot or zero-fill
    /// - Zero blocks: zero-fill directly
    /// - Regular blocks: add to work queue for later decompression
    ///
    /// Returns the work items to process along with the number of bytes already
    /// written or queued (`buf_offset`).
    fn collect_work_items(
        &self,
        stream: SnapshotStream,
        pages: &[PageEntry],
        page_idx: usize,
        target: &mut [MaybeUninit<u8>],
        offset: u64,
        actual_len: usize,
    ) -> Result<(Vec<WorkItem>, usize)> {
        let mut local_work: Vec<WorkItem> = Vec::new();
        let mut buf_offset = 0;
        let mut current_pos = offset;
        let mut remaining = actual_len;

        for page_entry in pages.iter().skip(page_idx) {
            if remaining == 0 {
                break;
            }
            if page_entry.start_logical > current_pos + remaining as u64 {
                break;
            }

            let page = self.get_page(page_entry)?;
            let mut block_logical_start = page_entry.start_logical;

            for (block_idx_in_page, block) in page.blocks.iter().enumerate() {
                let block_end = block_logical_start + block.logical_len as u64;

                if block_end > current_pos {
                    let global_block_idx = page_entry.start_block + block_idx_in_page as u64;
                    let offset_in_block = (current_pos - block_logical_start) as usize;
                    let to_copy = std::cmp::min(
                        remaining,
                        (block.logical_len as usize).saturating_sub(offset_in_block),
                    );

                    if block.offset == BLOCK_OFFSET_PARENT {
                        // Parent block: delegate or zero-fill
                        if let Some(parent) = &self.parent {
                            let dest = &mut target[buf_offset..buf_offset + to_copy];
                            parent.read_at_into_uninit(stream, current_pos, dest)?;
                        } else {
                            Self::zero_fill_uninit(&mut target[buf_offset..buf_offset + to_copy]);
                        }
                        current_pos += to_copy as u64;
                        buf_offset += to_copy;
                        remaining -= to_copy;
                    } else if block.length == 0 {
                        // Zero block: fill with zeros
                        Self::zero_fill_uninit(&mut target[buf_offset..buf_offset + to_copy]);
                        current_pos += to_copy as u64;
                        buf_offset += to_copy;
                        remaining -= to_copy;
                    } else {
                        // Regular block: add to work queue
                        if to_copy > 0 {
                            local_work.push((
                                global_block_idx,
                                *block,
                                buf_offset,
                                offset_in_block,
                                to_copy,
                            ));
                            buf_offset += to_copy;
                            current_pos += to_copy as u64;
                            remaining -= to_copy;
                        }
                    }

                    if remaining == 0 {
                        break;
                    }
                }
                block_logical_start += block.logical_len as u64;
            }
        }

        Ok((local_work, buf_offset))
    }

    /// Executes parallel decompression for multiple blocks.
    ///
    /// Uses a two-phase approach:
    /// 1. Parallel I/O: Fetch all raw blocks concurrently
    /// 2. Parallel CPU: Decompress and copy to target buffer
    fn execute_parallel_decompression(
        self: &Arc<Self>,
        stream: SnapshotStream,
        work_items: &[WorkItem],
        target: &mut [MaybeUninit<u8>],
        actual_len: usize,
    ) -> Result<()> {
        let snap = Arc::clone(self);
        let target_addr = target.as_mut_ptr() as usize;

        // Phase 1: Fetch all raw blocks in parallel. Each result tracks whether
        // the data is already decompressed (cache hit / zero block) or still
        // compressed (storage read), eliminating a TOCTOU race where a background
        // prefetch thread could modify the cache between fetch and decompression.
        let raw_blocks: Vec<Result<FetchResult>> = work_items
            .par_iter()
            .map(|(block_idx, info, _, _, _)| snap.fetch_raw_block(stream, *block_idx, info))
            .collect();

        // Phase 2: Parallel decompress and copy
        let err: Mutex<Option<Error>> = Mutex::new(None);
        work_items
            .par_iter()
            .zip(raw_blocks)
            .for_each(|(work_item, fetch_result)| {
                if err.lock().map_or(true, |e| e.is_some()) {
                    return;
                }

                let (block_idx, info, buf_offset, offset_in_block, to_copy) = work_item;

                // Handle fetch errors
                let fetched = match fetch_result {
                    Ok(r) => r,
                    Err(e) => {
                        if let Ok(mut guard) = err.lock() {
                            let _ = guard.replace(e);
                        }
                        return;
                    }
                };

                // Use the FetchResult to determine if decompression is needed,
                // rather than re-checking the cache (which could give a stale answer).
                let data = match fetched {
                    FetchResult::Decompressed(data) => data,
                    FetchResult::Compressed(raw) => {
                        match snap.decompress_and_verify(raw, *block_idx, info) {
                            Ok(d) => {
                                // Cache the result
                                snap.cache_l1.insert(stream, *block_idx, d.clone());
                                d
                            }
                            Err(e) => {
                                if let Ok(mut guard) = err.lock() {
                                    let _ = guard.replace(e);
                                }
                                return;
                            }
                        }
                    }
                };

                // Copy to target buffer
                let src = data.as_ref();
                let start = *offset_in_block;
                let len = *to_copy;
                if start < src.len() && len <= src.len() - start {
                    // Defensive assertion: ensure destination write is within bounds
                    debug_assert!(
                        buf_offset + len <= actual_len,
                        "Buffer overflow: attempting to write {} bytes at offset {} into buffer of length {}",
                        len,
                        buf_offset,
                        actual_len
                    );
                    let dest = (target_addr + buf_offset) as *mut u8;
                    // SAFETY: `src[start..start+len]` is in-bounds (checked above).
                    // `dest` points into the `target` MaybeUninit buffer at a unique
                    // non-overlapping offset (each work item has a distinct `buf_offset`),
                    // and the rayon par_iter ensures each item writes to a disjoint region.
                    // The debug_assert above validates buf_offset + len <= actual_len.
                    unsafe { ptr::copy_nonoverlapping(src[start..].as_ptr(), dest, len) };
                }
            });

        if let Some(e) = err.lock().ok().and_then(|mut guard| guard.take()) {
            return Err(e);
        }

        Ok(())
    }

    /// Executes serial decompression for a small number of blocks.
    fn execute_serial_decompression(
        &self,
        stream: SnapshotStream,
        work_items: &[WorkItem],
        target: &mut [MaybeUninit<u8>],
        actual_len: usize,
    ) -> Result<()> {
        for (block_idx, info, buf_offset, offset_in_block, to_copy) in work_items {
            let data = self.resolve_block_data(stream, *block_idx, info)?;
            let src = data.as_ref();
            let start = *offset_in_block;
            if start < src.len() && *to_copy <= src.len() - start {
                // Defensive assertion: ensure destination write is within bounds
                debug_assert!(
                    *buf_offset + *to_copy <= actual_len,
                    "Buffer overflow: attempting to write {} bytes at offset {} into buffer of length {}",
                    to_copy,
                    buf_offset,
                    actual_len
                );
                // SAFETY: `src[start..start+to_copy]` is in-bounds (checked above).
                // `target[buf_offset..]` has sufficient room because `buf_offset + to_copy`
                // never exceeds `actual_len` (tracked during work-item collection).
                // The debug_assert above validates this invariant.
                // MaybeUninit<u8> has the same layout as u8.
                unsafe {
                    ptr::copy_nonoverlapping(
                        src[start..].as_ptr(),
                        target[*buf_offset..].as_mut_ptr() as *mut u8,
                        *to_copy,
                    );
                }
            }
        }
        Ok(())
    }

    /// Zero-fills a slice of uninitialized memory.
    ///
    /// This helper centralizes all unsafe zero-filling operations to improve
    /// safety auditing and reduce code duplication.
    ///
    /// Writing zeros leaves the buffer fully initialized. The `&mut` slice
    /// guarantees the buffer is valid for writes, so this function is safe to call.
    #[inline]
    fn zero_fill_uninit(buffer: &mut [MaybeUninit<u8>]) {
        if !buffer.is_empty() {
            // SAFETY: buffer is a valid &mut [MaybeUninit<u8>] slice, so writing
            // buffer.len() zero bytes through its pointer is in-bounds.
            unsafe { ptr::write_bytes(buffer.as_mut_ptr(), 0, buffer.len()) };
        }
    }

    /// Writes into uninitialized memory. Unused suffix is zero-filled. Uses parallel decompression when spanning multiple blocks.
    ///
    /// **On error:** The buffer contents are undefined (possibly partially written).
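    ///
    /// A minimal sketch (on success, every byte of the buffer has been written):
    ///
    /// ```no_run
    /// use hexz_core::{File, SnapshotStream};
    /// use std::mem::MaybeUninit;
    /// # use std::sync::Arc;
    /// # fn example(snapshot: Arc<File>) -> Result<(), Box<dyn std::error::Error>> {
    /// let mut buf: Vec<MaybeUninit<u8>> = vec![MaybeUninit::uninit(); 4096];
    /// snapshot.read_at_into_uninit(SnapshotStream::Primary, 0, &mut buf)?;
    /// // SAFETY: the call succeeded, so all 4096 bytes are initialized.
    /// let bytes: &[u8] = unsafe { std::slice::from_raw_parts(buf.as_ptr().cast(), buf.len()) };
    /// # let _ = bytes;
    /// # Ok(())
    /// # }
    /// ```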
    pub fn read_at_into_uninit(
        self: &Arc<Self>,
        stream: SnapshotStream,
        offset: u64,
        buffer: &mut [MaybeUninit<u8>],
    ) -> Result<()> {
        self.read_at_uninit_inner(stream, offset, buffer, false)
    }

    /// Inner implementation of [`read_at_into_uninit`](Self::read_at_into_uninit).
    ///
    /// The `is_prefetch` flag prevents recursive prefetch thread spawning:
    /// when `true`, the prefetch block is skipped to avoid unbounded thread creation.
    fn read_at_uninit_inner(
        self: &Arc<Self>,
        stream: SnapshotStream,
        offset: u64,
        buffer: &mut [MaybeUninit<u8>],
        is_prefetch: bool,
    ) -> Result<()> {
        // Early validation
        let len = buffer.len();
        if len == 0 {
            return Ok(());
        }

        let stream_size = self.size(stream);
        if offset >= stream_size {
            Self::zero_fill_uninit(buffer);
            return Ok(());
        }

        // Calculate actual read length and zero-fill suffix if needed
        let actual_len = std::cmp::min(len as u64, stream_size - offset) as usize;
        if actual_len < len {
            Self::zero_fill_uninit(&mut buffer[actual_len..]);
        }

        let target = &mut buffer[0..actual_len];

        // Get page list for stream
        let pages = match stream {
            SnapshotStream::Primary => &self.master.primary_pages,
            SnapshotStream::Secondary => &self.master.secondary_pages,
        };

        // Delegate to parent if no index pages
        if pages.is_empty() {
            if let Some(parent) = &self.parent {
                return parent.read_at_into_uninit(stream, offset, target);
            }
            Self::zero_fill_uninit(target);
            return Ok(());
        }

        // Find starting page index
        let page_idx = match pages.binary_search_by(|p| p.start_logical.cmp(&offset)) {
            Ok(idx) => idx,
            Err(idx) => idx.saturating_sub(1),
        };

        // Collect work items (handles parent blocks, zero blocks, and queues regular blocks)
        let (work_items, buf_offset) =
            self.collect_work_items(stream, pages, page_idx, target, offset, actual_len)?;

        // Choose parallel or serial decompression based on work item count
        if work_items.len() >= Self::PARALLEL_MIN_BLOCKS {
            self.execute_parallel_decompression(stream, &work_items, target, actual_len)?;
        } else {
            self.execute_serial_decompression(stream, &work_items, target, actual_len)?;
        }

        // Handle any remaining unprocessed data
        let remaining = actual_len - buf_offset;
        if remaining > 0 {
            if let Some(parent) = &self.parent {
                let current_pos = offset + buf_offset as u64;
                parent.read_at_into_uninit(stream, current_pos, &mut target[buf_offset..])?;
            } else {
                Self::zero_fill_uninit(&mut target[buf_offset..]);
            }
        }

        // Trigger prefetch for next sequential blocks if enabled.
        // Guards:
        // 1. `is_prefetch` prevents recursive spawning (a prefetch thread spawning another)
        // 2. `try_start()` limits to one in-flight prefetch at a time, preventing
        //    unbounded thread creation under rapid sequential reads
        if let Some(prefetcher) = &self.prefetcher {
            if !is_prefetch && !work_items.is_empty() && prefetcher.try_start() {
                let next_offset = offset + actual_len as u64;
                let prefetch_len = (self.header.block_size * 4) as usize;
                let snap = Arc::clone(self);
                let stream_copy = stream;
                std::thread::spawn(move || {
                    let mut buf = vec![MaybeUninit::uninit(); prefetch_len];
                    let _ = snap.read_at_uninit_inner(stream_copy, next_offset, &mut buf, true);
                    // Release the in-flight guard so the next read can prefetch
                    if let Some(pf) = &snap.prefetcher {
                        pf.clear_in_flight();
                    }
                });
            }
        }

        Ok(())
    }

    /// Like [`read_at_into_uninit`](Self::read_at_into_uninit) but accepts `&mut [u8]`. Use from FFI (e.g. Python).
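    ///
    /// A brief sketch with a caller-owned, already-initialized buffer:
    ///
    /// ```no_run
    /// use hexz_core::{File, SnapshotStream};
    /// # use std::sync::Arc;
    /// # fn example(snapshot: Arc<File>) -> Result<(), Box<dyn std::error::Error>> {
    /// let mut buf = vec![0u8; 4096];
    /// snapshot.read_at_into_uninit_bytes(SnapshotStream::Primary, 0, &mut buf)?;
    /// # Ok(())
    /// # }
    /// ```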
    #[inline]
    pub fn read_at_into_uninit_bytes(
        self: &Arc<Self>,
        stream: SnapshotStream,
        offset: u64,
        buf: &mut [u8],
    ) -> Result<()> {
        if buf.is_empty() {
            return Ok(());
        }
        // SAFETY: &mut [u8] and &mut [MaybeUninit<u8>] have identical layout (both
        // are slices of single-byte types). Initialized u8 values are valid MaybeUninit<u8>.
        // The borrow is derived from `buf` so no aliasing occurs.
        let uninit = unsafe { &mut *(buf as *mut [u8] as *mut [MaybeUninit<u8>]) };
        self.read_at_into_uninit(stream, offset, uninit)
    }

    /// Fetches an index page from cache or storage.
    ///
    /// Index pages map logical offsets to physical block locations. This method
    /// maintains an LRU cache to avoid repeated deserialization.
    ///
    /// # Parameters
    ///
    /// - `entry`: Page metadata from the master index
    ///
    /// # Returns
    ///
    /// A shared reference to the deserialized index page.
    ///
    /// # Thread Safety
    ///
    /// This method acquires a lock on the page cache only for cache lookup and insertion.
    /// I/O and deserialization are performed without holding the lock to avoid blocking
    /// other threads during cache misses.
    pub(crate) fn get_page(&self, entry: &PageEntry) -> Result<Arc<IndexPage>> {
        // Fast path: check sharded cache
        if let Some(p) = self.page_cache.get(entry.offset) {
            return Ok(p);
        }

        // Slow path: I/O and deserialization without holding any lock
        let bytes = self
            .backend
            .read_exact(entry.offset, entry.length as usize)?;
        let page: IndexPage = bincode::deserialize(&bytes)?;
        let arc = Arc::new(page);

        // Check again in case another thread inserted while we were doing I/O
        if let Some(p) = self.page_cache.get(entry.offset) {
            return Ok(p);
        }
        self.page_cache.insert(entry.offset, arc.clone());

        Ok(arc)
    }

    /// Fetches raw compressed block data from cache or storage.
    ///
    /// This is the I/O portion of block resolution, separated to enable parallel I/O.
    /// It:
    /// 1. Checks the block cache
    /// 2. Handles zero-length blocks
    /// 3. Reads raw compressed data from the backend
    ///
    /// # Parameters
    ///
    /// - `stream`: Stream identifier (for the cache key)
    /// - `block_idx`: Global block index
    /// - `info`: Block metadata (offset, length)
    ///
    /// # Returns
    ///
    /// Raw block data (potentially compressed/encrypted) or cached decompressed data.
    fn fetch_raw_block(
        &self,
        stream: SnapshotStream,
        block_idx: u64,
        info: &BlockInfo,
    ) -> Result<FetchResult> {
        // Check cache first - return decompressed data if available
        if let Some(data) = self.cache_l1.get(stream, block_idx) {
            return Ok(FetchResult::Decompressed(data));
        }

        // Handle zero blocks
        if info.length == 0 {
            let len = info.logical_len as usize;
            if len == 0 {
                return Ok(FetchResult::Decompressed(Bytes::new()));
            }
            if len == ZEROS_64K.len() {
                return Ok(FetchResult::Decompressed(Bytes::from_static(&ZEROS_64K)));
            }
            return Ok(FetchResult::Decompressed(Bytes::from(vec![0u8; len])));
        }

        // Fetch raw compressed data (this is the part that runs in parallel)
        self.backend
            .read_exact(info.offset, info.length as usize)
            .map(FetchResult::Compressed)
    }

    /// Decompresses and verifies a raw block.
    ///
    /// This is the CPU portion of block resolution, separated to enable parallel decompression.
    /// It:
    /// 1. Verifies the CRC32 checksum
    /// 2. Decrypts (if encrypted)
    /// 3. Decompresses
    ///
    /// # Parameters
    ///
    /// - `raw`: Raw block data (potentially compressed/encrypted)
    /// - `block_idx`: Global block index (for error reporting and decryption)
    /// - `info`: Block metadata (checksum)
    ///
    /// # Returns
    ///
    /// Decompressed block data as `Bytes`.
    ///
    /// # Performance
    ///
    /// Decompression throughput:
    /// - LZ4: ~2 GB/s per core
    /// - Zstd: ~500 MB/s per core
    fn decompress_and_verify(&self, raw: Bytes, block_idx: u64, info: &BlockInfo) -> Result<Bytes> {
        // Verify stored checksum (CRC32 of compressed/encrypted data) before decrypt/decompress
        if info.checksum != 0 {
            let computed = crc32_hash(&raw);
            if computed != info.checksum {
                return Err(Error::Corruption(block_idx));
            }
        }

        // Pre-allocate exact output buffer to avoid over-allocation inside decompressor.
        // We use decompress_into() instead of decompress() to eliminate the allocation
        // and potential reallocation overhead inside the compression library.
        //
        // Performance impact: Avoids zero-initialization overhead (~16% improvement for
        // high-thread-count workloads based on benchmarks).
        let out_len = info.logical_len as usize;
        let mut out = Vec::with_capacity(out_len);

        // SAFETY: This unsafe block is required to create an uninitialized buffer for
        // decompress_into() to write into. This is safe because:
        //
        // 1. Contract guarantee: Both LZ4 and Zstd decompress_into() implementations
        //    promise to either:
        //    a) Write exactly `out.len()` bytes (the full decompressed size), OR
        //    b) Return an Err() if decompression fails (buffer underrun, corruption, etc.)
        //
        // 2. Size accuracy: We set out.len() to info.logical_len, which is the exact
        //    decompressed size recorded in the block metadata during compression.
        //    The decompressor will write exactly this many bytes or fail.
        //
        // 3. Error propagation: If decompress_into() returns Err(), we propagate it
        //    immediately via the ? operator. The uninitialized buffer is dropped
        //    without ever being read.
        //
        // 4. No partial writes: The decompressor APIs do not support partial writes.
        //    They either fully succeed or fully fail. We never access a partially
        //    initialized buffer.
        //
        // 5. Memory safety: We never read from `out` before decompress_into() succeeds.
        //    The only subsequent access is Bytes::from(out), which transfers ownership
        //    of the now-fully-initialized buffer.
        //
        // This is a well-established pattern for zero-copy decompression. The clippy
        // lint is conservative and warns about ANY use of set_len() after with_capacity(),
        // but in this case we have explicit API guarantees from the decompressor.
        #[allow(clippy::uninit_vec)]
        unsafe {
            out.set_len(out_len);
        }

        if let Some(enc) = &self.encryptor {
            let compressed = enc.decrypt(&raw, block_idx)?;
            self.compressor.decompress_into(&compressed, &mut out)?;
        } else {
            self.compressor.decompress_into(raw.as_ref(), &mut out)?;
        }

        Ok(Bytes::from(out))
    }

    /// Resolves block data by fetching from cache or decompressing from storage.
    ///
    /// This is the core decompression path. It:
    /// 1. Checks the block cache
    /// 2. Reads the compressed block from the backend
    /// 3. Verifies the CRC32 checksum (if stored) and returns `Corruption(block_idx)` on mismatch
    /// 4. Decrypts (if encrypted)
    /// 5. Decompresses
    /// 6. Caches the result
    ///
    /// # Parameters
    ///
    /// - `stream`: Stream identifier (for the cache key)
    /// - `block_idx`: Global block index
    /// - `info`: Block metadata (offset, length, compression)
    ///
    /// # Returns
    ///
    /// Decompressed block data as `Bytes` (zero-copy on cache hit).
    ///
    /// # Performance
    ///
    /// This method is the hot path for cache misses. Decompression throughput:
    /// - LZ4: ~2 GB/s per core
    /// - Zstd: ~500 MB/s per core
    pub(crate) fn resolve_block_data(
        &self,
        stream: SnapshotStream,
        block_idx: u64,
        info: &BlockInfo,
    ) -> Result<Bytes> {
        // Fetch the block (from cache or I/O). The FetchResult tracks whether
        // data is already decompressed, avoiding a TOCTOU race where a
        // background prefetch thread could modify the cache between fetch
        // and the decompression decision.
        match self.fetch_raw_block(stream, block_idx, info)? {
            FetchResult::Decompressed(data) => Ok(data),
            FetchResult::Compressed(raw) => {
                let data = self.decompress_and_verify(raw, block_idx, info)?;
                self.cache_l1.insert(stream, block_idx, data.clone());
                Ok(data)
            }
        }
    }
}
1159}