parcode/reader.rs
//! The Read-Side Engine: Parallel Reconstruction & Random Access.
//!
//! This module implements the complete reading pipeline for Parcode files, providing both
//! eager (full deserialization) and lazy (on-demand) loading strategies. It leverages memory
//! mapping, parallel reconstruction, and zero-copy techniques to maximize performance.
//!
//! ## Core Architecture
//!
//! The reader is built on three foundational techniques:
//!
//! ### 1. Memory Mapping (`mmap`)
//!
//! Instead of reading the entire file into memory, Parcode uses `mmap` to map the file
//! directly into the process's address space. This provides several benefits:
//!
//! - **Instant Startup:** Opening a file is O(1) regardless of size
//! - **OS-Managed Paging:** The operating system handles loading pages on demand
//! - **Zero-Copy Reads:** Uncompressed data can be read directly from the mapped region
//! - **Shared Memory:** Multiple processes can share the same mapped file
//!
//! ### 2. Lazy Traversal
//!
//! The file is traversed lazily - we only read and decompress bytes when a specific node
//! is requested. This enables:
//!
//! - **Cold Start Performance:** Applications can start in microseconds
//! - **Selective Loading:** Load only the data you need
//! - **Deep Navigation:** Traverse object hierarchies without I/O
//!
//! ### 3. Parallel Zero-Copy Stitching
//!
//! When reconstructing a large `Vec<T>`, we use a sophisticated parallel algorithm:
//!
//! ```text
//! ┌─────────────────────────────────────────────────────────────┐
//! │ 1. Pre-allocate uninitialized buffer (MaybeUninit<T>)       │
//! ├─────────────────────────────────────────────────────────────┤
//! │ 2. Calculate destination offset for each shard              │
//! ├─────────────────────────────────────────────────────────────┤
//! │ 3. Spawn parallel workers (Rayon)                           │
//! ├─────────────────────────────────────────────────────────────┤
//! │ 4. Each worker:                                             │
//! │    - Decompresses its shard                                 │
//! │    - Deserializes items                                     │
//! │    - Writes directly to final buffer (ptr::copy)            │
//! ├─────────────────────────────────────────────────────────────┤
//! │ 5. Transmute buffer to Vec<T> (all items initialized)       │
//! └─────────────────────────────────────────────────────────────┘
//! ```
//!
//! **Result:** Maximum memory bandwidth, zero intermediate allocations, perfect parallelism.
//!
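//! The sketch below illustrates the same write-to-disjoint-regions idea with safe
//! code (hypothetical shards of five items each); the real implementation skips the
//! zero-fill by writing into `MaybeUninit<T>` slots with `ptr::copy_nonoverlapping`:
//!
//! ```rust
//! use rayon::prelude::*;
//!
//! // Pretend each shard already decoded into a fixed-size run of items.
//! let shards: Vec<Vec<u32>> = (0u32..4).map(|s| (0..5).map(|i| s * 5 + i).collect()).collect();
//! let total: usize = shards.iter().map(Vec::len).sum();
//!
//! // One pre-allocated destination; each worker owns a disjoint slice.
//! let mut out = vec![0u32; total];
//! out.par_chunks_mut(5)
//!     .zip(shards.par_iter())
//!     .for_each(|(dst, src)| dst.copy_from_slice(src));
//!
//! assert_eq!(out, (0..20).collect::<Vec<u32>>());
//! ```
//!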
//! ## O(1) Arithmetic Navigation
//!
//! Using the RLE (Run-Length Encoding) metadata stored in container nodes, we can calculate
//! exactly which physical chunk holds the Nth item of a collection; a worked sketch of the
//! arithmetic follows the list below. This enables:
//!
//! - **Random Access:** `vec.get(1_000_000)` without loading the entire vector
//! - **Constant Time:** O(1) shard selection via arithmetic
//! - **Minimal I/O:** Load only the shard containing the target item
//!
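//! With runs encoded as `(items_per_shard, repeat)` pairs, locating the shard for a
//! global index is a short scan over the compact run list. A minimal sketch of that
//! arithmetic, assuming a hypothetical single run of ten 1000-item shards:
//!
//! ```rust
//! let runs = [(1000usize, 10usize)]; // (items per shard, number of such shards)
//! let index = 5_432;
//!
//! let (mut base, mut shard_base, mut hit) = (0, 0, None);
//! for (count, repeat) in runs {
//!     let span = count * repeat;
//!     if index < base + span {
//!         let off = index - base;
//!         hit = Some((shard_base + off / count, off % count));
//!         break;
//!     }
//!     base += span;
//!     shard_base += repeat;
//! }
//! // Item 5432 lives in shard 5, at local index 432.
//! assert_eq!(hit, Some((5, 432)));
//! ```
//!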
//! ## Trait System for Strategy Selection
//!
//! The module defines two key traits that enable automatic strategy selection:
//!
//! ### [`ParcodeNative`]
//!
//! Types implementing this trait know how to reconstruct themselves from a [`ChunkNode`].
//! The high-level API ([`Parcode::read`](crate::Parcode::read)) uses this trait to
//! automatically select the optimal reconstruction strategy:
//!
//! - **`Vec<T>`:** Uses parallel reconstruction across shards
//! - **`HashMap<K, V>`:** Reconstructs all shards and merges entries
//! - **Primitives/Structs:** Uses sequential deserialization
//!
//! ### [`ParcodeItem`]
//!
//! Types implementing this trait can be read from a shard (payload + children). This trait
//! is used internally during parallel reconstruction to deserialize individual items or
//! slices of items from shard payloads.
//!
//! ## Usage Patterns
//!
//! ### Eager Loading (Full Deserialization)
//!
//! ```rust
//! use parcode::Parcode;
//!
//! // Load entire object into memory
//! let data = vec![1, 2, 3];
//! Parcode::save("numbers_reader.par", &data).unwrap();
//! let data: Vec<i32> = Parcode::read("numbers_reader.par").unwrap();
//! # std::fs::remove_file("numbers_reader.par").ok();
//! ```
//!
//! ### Lazy Loading (On-Demand)
//!
//! ```rust
//! use parcode::{Parcode, ParcodeReader, ParcodeObject};
//! use serde::{Serialize, Deserialize};
//!
//! #[derive(Serialize, Deserialize, ParcodeObject)]
//! struct Assets {
//!     #[parcode(chunkable)]
//!     data: Vec<u8>,
//! }
//!
//! #[derive(Serialize, Deserialize, ParcodeObject)]
//! struct GameState {
//!     level: u32,
//!     #[parcode(chunkable)]
//!     assets: Assets,
//! }
//!
//! // Setup
//! let state = GameState { level: 1, assets: Assets { data: vec![0; 10] } };
//! Parcode::save("game_reader.par", &state).unwrap();
//!
//! let reader = ParcodeReader::open("game_reader.par").unwrap();
//! let game_lazy = reader.read_lazy::<GameState>().unwrap();
//!
//! // Access local fields (instant, already in memory)
//! println!("Level: {}", game_lazy.level);
//!
//! // Load remote fields on demand
//! let assets_data = game_lazy.assets.data.load().unwrap();
//! # std::fs::remove_file("game_reader.par").ok();
//! ```
//!
//! ### Random Access
//!
//! ```rust
//! use parcode::{Parcode, ParcodeReader, ParcodeObject};
//! use serde::{Serialize, Deserialize};
//!
//! #[derive(Serialize, Deserialize, ParcodeObject, Clone, Debug)]
//! struct MyStruct { val: u32 }
//!
//! // Setup
//! let data: Vec<MyStruct> = (0..100).map(|i| MyStruct { val: i }).collect();
//! Parcode::save("data_random.par", &data).unwrap();
//!
//! let reader = ParcodeReader::open("data_random.par").unwrap();
//! let root = reader.root().unwrap();
//!
//! // For large sharded files, `ChunkNode::get` loads only the shard holding the
//! // target index. This small dataset may fit in a single chunk, so the demo
//! // decodes the collection and indexes into it.
//! let items: Vec<MyStruct> = root.decode_parallel_collection().unwrap();
//! let item: MyStruct = items[50].clone();
//! # assert_eq!(item.val, 50);
//! # std::fs::remove_file("data_random.par").ok();
//! ```
//!
//! ### Streaming Iteration
//!
//! ```rust
//! use parcode::{Parcode, ParcodeReader, ParcodeObject};
//! use serde::{Serialize, Deserialize};
//!
//! #[derive(Serialize, Deserialize, ParcodeObject, Clone, Debug)]
//! struct MyStruct { val: u32 }
//!
//! fn process(item: MyStruct) { println!("{:?}", item); }
//!
//! // Setup
//! let data: Vec<MyStruct> = (0..10).map(|i| MyStruct { val: i }).collect();
//! Parcode::save("data_iter.par", &data).unwrap();
//!
//! let reader = ParcodeReader::open("data_iter.par").unwrap();
//! let root = reader.root().unwrap();
//!
//! // For large sharded files, `ChunkNode::iter` streams one shard at a time with
//! // constant memory. This small dataset may fit in a single chunk, so the demo
//! // decodes the collection and iterates over it.
//! let items: Vec<MyStruct> = root.decode_parallel_collection().unwrap();
//! for item in items {
//!     process(item);
//! }
//! # std::fs::remove_file("data_iter.par").ok();
//! ```
//!
//! ## Performance Characteristics
//!
//! - **File Opening:** O(1) - just maps the file
//! - **Root Access:** O(1) - reads only the global header
//! - **Random Access:** O(1) - arithmetic shard selection + single shard load
//! - **Parallel Reconstruction:** O(N/cores) - scales linearly with CPU cores
//! - **Memory Usage (Lazy):** O(accessed chunks) - only loaded data consumes RAM
//! - **Memory Usage (Eager):** O(N) - entire object in memory
//!
//! ## Thread Safety
//!
//! - **[`ParcodeReader`]:** Holds the map behind an `Arc`; safe to share across threads
//! - **[`ChunkNode`]:** Immutable view, safe to share across threads
//! - **Parallel Reconstruction:** Uses Rayon's work-stealing scheduler
//!
//! ## Safety Considerations
//!
//! The module uses `unsafe` code in two specific contexts:
//!
//! 1. **Memory Mapping:** `mmap` is inherently unsafe if the file is modified externally.
//!    We assume files are immutable during reading.
//!
//! 2. **Parallel Stitching:** Uses `MaybeUninit` and pointer arithmetic to avoid
//!    initialization overhead. All unsafe operations are carefully encapsulated and
//!    documented with safety invariants.

use memmap2::Mmap;
use rayon::prelude::*;
use serde::{Deserialize, de::DeserializeOwned};
use std::borrow::Cow;
use std::collections::HashMap;
use std::fs::File;
use std::hash::Hash;
use std::marker::PhantomData;
use std::mem::{ManuallyDrop, MaybeUninit};
use std::path::Path;
use std::sync::Arc;

use crate::compression::CompressorRegistry;
use crate::error::{ParcodeError, Result};
use crate::format::{ChildRef, GLOBAL_HEADER_SIZE, GlobalHeader, MAGIC_BYTES, MetaByte};
use crate::rt::ParcodeLazyRef;

// --- TRAIT SYSTEM FOR AUTOMATIC STRATEGY SELECTION ---

/// A trait for types that know how to reconstruct themselves from a [`ChunkNode`].
///
/// This trait enables the high-level API ([`Parcode::read`](crate::Parcode::read)) to
/// automatically select the optimal reconstruction strategy based on the type being read.
///
/// ## Strategy Selection
///
/// Different types use different reconstruction strategies:
///
/// - **`Vec<T>`:** Parallel reconstruction across shards (see [`ChunkNode::decode_parallel_collection`])
/// - **`HashMap<K, V>`:** Shard merging with SOA deserialization
/// - **Primitives:** Direct bincode deserialization
/// - **Custom Structs:** Sequential deserialization of local fields + recursive child loading
///
/// ## Automatic Implementation
///
/// This trait is automatically implemented by the `#[derive(ParcodeObject)]` macro for custom
/// structs. Primitive types and standard collections have manual implementations in this module.
///
/// ## Example
///
/// ```rust
/// use parcode::Parcode;
/// use parcode::reader::ParcodeNative;
///
/// // Automatically selects parallel reconstruction for Vec
/// let data = vec![1, 2, 3];
/// Parcode::save("numbers_native.par", &data).unwrap();
/// let data: Vec<i32> = Parcode::read("numbers_native.par").unwrap();
///
/// // Automatically selects sequential deserialization for primitives
/// let val = 42;
/// Parcode::save("value_native.par", &val).unwrap();
/// let value: i32 = Parcode::read("value_native.par").unwrap();
/// # std::fs::remove_file("numbers_native.par").ok();
/// # std::fs::remove_file("value_native.par").ok();
/// ```
pub trait ParcodeNative: Sized {
    /// Reconstructs the object from the given graph node.
    ///
    /// This method is called by [`Parcode::read`](crate::Parcode::read) after opening the
    /// file and locating the root chunk. Implementations should choose the most efficient
    /// reconstruction strategy for their type.
    ///
    /// ## Parameters
    ///
    /// * `node`: The chunk node to reconstruct from (typically the root node)
    ///
    /// ## Returns
    ///
    /// The fully reconstructed object of type `Self`.
    ///
    /// ## Errors
    ///
    /// Returns an error if:
    /// - Decompression fails
    /// - Deserialization fails (type mismatch, corrupted data)
    /// - Child nodes are missing or invalid
    fn from_node(node: &ChunkNode<'_>) -> Result<Self>;
}

/// A trait for types that can be read from a shard (payload + children).
///
/// This trait is used internally during parallel reconstruction to deserialize individual
/// items or slices of items from shard payloads. It provides two methods:
///
/// - [`read_from_shard`](Self::read_from_shard): Reads a single item
/// - [`read_slice_from_shard`](Self::read_slice_from_shard): Reads multiple items (optimized)
///
/// ## Automatic Implementation
///
/// This trait is automatically implemented by the `#[derive(ParcodeObject)]` macro. Primitive
/// types have optimized implementations that use bulk deserialization.
///
/// ## Thread Safety
///
/// Implementations must be `Send + Sync + 'static` to support parallel reconstruction across
/// threads. This is automatically satisfied for most types.
pub trait ParcodeItem: Sized + Send + Sync + 'static {
    /// Reads a single item from the shard payload and children.
    ///
    /// This method is called during deserialization to reconstruct individual items from
    /// a shard's payload. For types with chunkable fields, this method should deserialize
    /// local fields from the reader and reconstruct remote fields from the children iterator.
    ///
    /// ## Parameters
    ///
    /// * `reader`: Cursor over the shard's decompressed payload
    /// * `children`: Iterator over child nodes (for chunkable fields)
    ///
    /// ## Returns
    ///
    /// The deserialized item.
    ///
    /// ## Errors
    ///
    /// Returns an error if deserialization fails or children are missing.
    fn read_from_shard(
        reader: &mut std::io::Cursor<&[u8]>,
        children: &mut std::vec::IntoIter<ChunkNode<'_>>,
    ) -> Result<Self>;

    /// Reads a slice of items from the shard payload and children.
    ///
    /// This method provides an optimization opportunity for types that can deserialize
    /// multiple items more efficiently than calling [`read_from_shard`](Self::read_from_shard)
    /// in a loop.
    ///
    /// ## Default Implementation
    ///
    /// The default implementation reads the slice length (u64) and then calls
    /// `read_from_shard` for each item. Primitive types override this to use bulk
    /// deserialization.
    ///
    /// ## Parameters
    ///
    /// * `reader`: Cursor over the shard's decompressed payload
    /// * `children`: Iterator over child nodes (for chunkable fields)
    ///
    /// ## Returns
    ///
    /// A vector containing all deserialized items.
    ///
    /// ## Errors
    ///
    /// Returns an error if deserialization fails or the slice length exceeds `usize`.
    fn read_slice_from_shard(
        reader: &mut std::io::Cursor<&[u8]>,
        children: &mut std::vec::IntoIter<ChunkNode<'_>>,
    ) -> Result<Vec<Self>> {
        // Default implementation: Read length, then loop
        let len =
            bincode::serde::decode_from_std_read::<u64, _, _>(reader, bincode::config::standard())
                .map_err(|e| ParcodeError::Serialization(e.to_string()))?;

        let mut vec = Vec::with_capacity(
            usize::try_from(len)
                .map_err(|_| ParcodeError::Serialization("Vector length exceeds usize".into()))?,
        );
        for _ in 0..len {
            vec.push(Self::read_from_shard(reader, children)?);
        }
        Ok(vec)
    }
}

macro_rules! impl_primitive_parcode_item {
    ($($t:ty),*) => {
        $(
            impl ParcodeItem for $t {
                fn read_from_shard(
                    reader: &mut std::io::Cursor<&[u8]>,
                    _children: &mut std::vec::IntoIter<ChunkNode<'_>>,
                ) -> Result<Self> {
                    bincode::serde::decode_from_std_read(reader, bincode::config::standard())
                        .map_err(|e| ParcodeError::Serialization(e.to_string()))
                }

                // Optimize slice reading for primitives (bulk read)
                fn read_slice_from_shard(
                    reader: &mut std::io::Cursor<&[u8]>,
                    _children: &mut std::vec::IntoIter<ChunkNode<'_>>,
                ) -> Result<Vec<Self>> {
                    bincode::serde::decode_from_std_read(reader, bincode::config::standard())
                        .map_err(|e| ParcodeError::Serialization(e.to_string()))
                }
            }
        )*
    }
}

impl_primitive_parcode_item!(
    u8, u16, u32, u64, u128, i8, i16, i32, i64, i128, f32, f64, bool, String
);

macro_rules! impl_primitive_parcode_native {
    ($($t:ty),*) => {
        $(
            impl ParcodeNative for $t {
                fn from_node(node: &ChunkNode<'_>) -> Result<Self> {
                    node.decode()
                }
            }
        )*
    }
}

impl_primitive_parcode_native!(
    u8, u16, u32, u64, u128, i8, i16, i32, i64, i128, f32, f64, bool, String
);

/// Optimized implementation for vectors: uses parallel stitching.
impl<T> ParcodeNative for Vec<T>
where
    T: ParcodeItem,
{
    fn from_node(node: &ChunkNode<'_>) -> Result<Self> {
        node.decode_parallel_collection()
    }
}

impl<K, V> ParcodeNative for HashMap<K, V>
where
    K: DeserializeOwned + Eq + Hash + Send + Sync,
    V: DeserializeOwned + Send + Sync,
{
    fn from_node(node: &ChunkNode<'_>) -> Result<Self> {
        // 1. Read container (num shards)
        let container_payload = node.read_raw()?;
        if container_payload.len() < 4 {
            return Ok(Self::new());
        }

        // If it's a Blob, node.child_count == 0.
        if node.child_count == 0 {
            return node.decode(); // Fallback to normal Bincode
        }

        // If it has children, it's a Sharded Map.
        let shards = node.children()?;
        let mut map = Self::new();

        for shard in shards {
            let payload = shard.read_raw()?;
            if payload.len() < 8 {
                continue;
            }

            let count = u32::from_le_bytes(
                payload
                    .get(0..4)
                    .ok_or_else(|| ParcodeError::Format("Payload too short for count".into()))?
                    .try_into()
                    .map_err(|_| ParcodeError::Format("Failed to read count".into()))?,
            ) as usize;
            let offsets_start = 8 + (count * 8);
            let data_start = offsets_start + (count * 4);
            let offsets_bytes = payload
                .get(offsets_start..data_start)
                .ok_or_else(|| ParcodeError::Format("Offsets out of bounds".into()))?;

            for i in 0..count {
                let off_bytes = offsets_bytes
                    .get(i * 4..(i + 1) * 4)
                    .ok_or_else(|| ParcodeError::Format("Offset index out of bounds".into()))?;
                let offset = u32::from_le_bytes(
                    off_bytes
                        .try_into()
                        .map_err(|_| ParcodeError::Format("Failed to read offset".into()))?,
                ) as usize;
                let data_slice = payload
                    .get(data_start + offset..)
                    .ok_or_else(|| ParcodeError::Format("Data slice out of bounds".into()))?;
                let (k, v) =
                    bincode::serde::decode_from_slice(data_slice, bincode::config::standard())
                        .map_err(|e| ParcodeError::Serialization(e.to_string()))?
                        .0;
                map.insert(k, v);
            }
        }
        Ok(map)
    }
}

// --- CORE READER HANDLE ---

/// The main handle for an open Parcode file.
///
/// It holds the memory map, the global file header, and the registry of
/// available decompression algorithms. The map lives behind an `Arc`, so
/// sharing a handle across threads is cheap.
#[derive(Debug)]
pub struct ParcodeReader {
    /// Memory-mapped file content.
    mmap: Arc<Mmap>,
    /// Parsed global footer/header information.
    header: GlobalHeader,
    /// Total size of the file in bytes.
    file_size: u64,
    /// Registry containing available decompression algorithms (Lz4, etc.).
    registry: CompressorRegistry,
}

impl ParcodeReader {
    /// Opens a Parcode file, maps it into memory, and validates the header.
    ///
    /// # Errors
    /// Returns an error if the file does not exist, is smaller than the header,
    /// or contains invalid magic bytes/version.
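    ///
    /// ## Example
    ///
    /// A minimal end-to-end example (file name is illustrative):
    ///
    /// ```rust
    /// use parcode::{Parcode, ParcodeReader};
    ///
    /// Parcode::save("open_doc.par", &vec![1u32, 2, 3]).unwrap();
    ///
    /// let reader = ParcodeReader::open("open_doc.par").unwrap();
    /// let numbers: Vec<u32> = reader.root().unwrap().decode_parallel_collection().unwrap();
    /// assert_eq!(numbers, vec![1, 2, 3]);
    /// # std::fs::remove_file("open_doc.par").ok();
    /// ```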
    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
        let file = File::open(path)?;
        let file_size = file.metadata()?.len();

        if file_size < GLOBAL_HEADER_SIZE as u64 {
            return Err(ParcodeError::Format(
                "File is smaller than the global header".into(),
            ));
        }

        // SAFETY: Mmap is fundamentally unsafe in the presence of external modification
        // (e.g., another process truncating the file). We assume the file is treated
        // as immutable by the OS while we read it.
        #[allow(unsafe_code)]
        let mmap = unsafe { Mmap::map(&file)? };

        // Read Global Header (located at the very end of the file)
        let header_start = usize::try_from(file_size)
            .map_err(|_| ParcodeError::Format("File too large for address space".into()))?
            - GLOBAL_HEADER_SIZE;
        let header_bytes = mmap
            .get(header_start..)
            .ok_or_else(|| ParcodeError::Format("Header start out of bounds".into()))?;

        if header_bytes.get(0..4) != Some(&MAGIC_BYTES) {
            return Err(ParcodeError::Format(
                "Invalid Magic Bytes. Not a Parcode file.".into(),
            ));
        }

        let version = u16::from_le_bytes(
            header_bytes
                .get(4..6)
                .ok_or_else(|| ParcodeError::Format("Version out of bounds".into()))?
                .try_into()
                .map_err(|_| ParcodeError::Format("Failed to read version".into()))?,
        );
        if version != 4 {
            return Err(ParcodeError::Format(format!(
                "Unsupported version: {version}. Expected V4."
            )));
        }

        let root_offset = u64::from_le_bytes(
            header_bytes
                .get(6..14)
                .ok_or_else(|| ParcodeError::Format("Root offset out of bounds".into()))?
                .try_into()
                .map_err(|_| ParcodeError::Format("Failed to read root_offset".into()))?,
        );
        let root_length = u64::from_le_bytes(
            header_bytes
                .get(14..22)
                .ok_or_else(|| ParcodeError::Format("Root length out of bounds".into()))?
                .try_into()
                .map_err(|_| ParcodeError::Format("Failed to read root_length".into()))?,
        );
        let checksum = u32::from_le_bytes(
            header_bytes
                .get(22..26)
                .ok_or_else(|| ParcodeError::Format("Checksum out of bounds".into()))?
                .try_into()
                .map_err(|_| ParcodeError::Format("Failed to read checksum".into()))?,
        );

        Ok(Self {
            mmap: Arc::new(mmap),
            header: GlobalHeader {
                magic: MAGIC_BYTES,
                version,
                root_offset,
                root_length,
                checksum,
            },
            file_size,
            // Initialize registry with default algorithms (NoCompression, Lz4 if enabled)
            registry: CompressorRegistry::new(),
        })
    }

    /// Helper to read a u32 from a byte slice (Little Endian).
    fn read_u32(slice: &[u8]) -> Result<u32> {
        slice
            .try_into()
            .map(u32::from_le_bytes)
            .map_err(|_| ParcodeError::Format("Failed to read u32".into()))
    }

    /// Returns a cursor to the Root Chunk of the object graph.
    pub fn root(&self) -> Result<ChunkNode<'_>> {
        self.get_chunk(self.header.root_offset, self.header.root_length)
    }

    /// Internal: Resolves a physical offset/length into a `ChunkNode`.
    /// Parses the footer to determine if the chunk has children.
    ///
    /// # Arguments
    /// * `offset`: Absolute byte offset in the file.
    /// * `length`: Total length of the chunk including metadata.
    fn get_chunk(&self, offset: u64, length: u64) -> Result<ChunkNode<'_>> {
        // Use checked arithmetic: both values come from (potentially corrupt) file data.
        if length == 0 {
            return Err(ParcodeError::Format("Zero-length chunk".into()));
        }
        let chunk_extent = offset
            .checked_add(length)
            .ok_or_else(|| ParcodeError::Format("Chunk extent overflows u64".into()))?;
        if chunk_extent > self.file_size {
            return Err(ParcodeError::Format(format!(
                "Chunk out of bounds: {} + {}",
                offset, length
            )));
        }
        let chunk_end = usize::try_from(chunk_extent)
            .map_err(|_| ParcodeError::Format("Chunk end exceeds address space".into()))?;

        // Read the MetaByte (Last byte of the chunk)
        let meta = MetaByte::from_byte(
            *self
                .mmap
                .get(chunk_end - 1)
                .ok_or_else(|| ParcodeError::Format("MetaByte out of bounds".into()))?,
        );

        let mut child_count = 0;
        let mut payload_end = chunk_end - 1; // Default: payload ends just before MetaByte

        if meta.is_chunkable() {
            // Layout: [Payload] ... [ChildRefs] [ChildCount (4 bytes)] [MetaByte (1 byte)]
            if length < 5 {
                return Err(ParcodeError::Format("Chunk too small for metadata".into()));
            }

            let count_start = chunk_end - 5;
            let child_count_bytes = self
                .mmap
                .get(count_start..count_start + 4)
                .ok_or_else(|| ParcodeError::Format("Child count out of bounds".into()))?;
            child_count = Self::read_u32(child_count_bytes)?;

            let footer_size = child_count as usize * ChildRef::SIZE;
            let total_meta_size = 1 + 4 + footer_size;

            if length < total_meta_size as u64 {
                return Err(ParcodeError::Format("Invalid footer size".into()));
            }
            payload_end = chunk_end - total_meta_size;
        }

        Ok(ChunkNode {
            reader: self,
            offset,
            length,
            meta,
            child_count,
            // `payload_end` is already an absolute file position.
            payload_end_offset: payload_end as u64,
        })
    }

    /// Reads an object lazily, returning a generated Mirror struct.
    /// This parses local fields immediately but keeps remote fields as handles.
    ///
    /// The returned Lazy object is tied to the lifetime of the `ParcodeReader`.
    pub fn read_lazy<'a, T>(&'a self) -> Result<T::Lazy>
    where
        T: ParcodeLazyRef<'a>,
    {
        let root = self.root()?;
        T::create_lazy(root)
    }
}

// --- CHUNK NODE API ---

/// A lightweight cursor pointing to a specific node in the dependency graph.
///
/// This struct contains the logic to read, decompress, and navigate from this node.
/// It is a "view" into the `ParcodeReader` and holds a lifetime reference to it.
#[derive(Debug, Clone)]
pub struct ChunkNode<'a> {
    reader: &'a ParcodeReader,
    /// Physical start offset.
    offset: u64,
    /// Total physical length.
    #[allow(dead_code)]
    length: u64,
    /// Parsed metadata flags.
    meta: MetaByte,
    /// Number of direct children (Shards).
    child_count: u32,
    /// Absolute file offset at which the payload data ends.
    payload_end_offset: u64,
}

/// Helper struct for deserializing RLE metadata stored in vector headers.
#[derive(Deserialize, Debug, Clone)]
struct ShardRun {
    item_count: u32,
    repeat: u32,
}

impl<'a> ChunkNode<'a> {
    /// Reads and decompresses the local payload of this chunk.
    ///
    /// This returns `Cow`, so if no compression was used, it returns a direct reference
    /// to the mmap (Zero-Copy). If compressed, it allocates the decompressed buffer.
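    ///
    /// A minimal example reading the root chunk's raw payload (file name is illustrative):
    ///
    /// ```rust
    /// use parcode::{Parcode, ParcodeReader};
    ///
    /// Parcode::save("raw_doc.par", &1u8).unwrap();
    /// let reader = ParcodeReader::open("raw_doc.par").unwrap();
    /// let bytes = reader.root().unwrap().read_raw().unwrap();
    /// assert!(!bytes.is_empty());
    /// # std::fs::remove_file("raw_doc.par").ok();
    /// ```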
    pub fn read_raw(&self) -> Result<Cow<'a, [u8]>> {
        let start = usize::try_from(self.offset)
            .map_err(|_| ParcodeError::Format("Offset exceeds address space".into()))?;
        let end = usize::try_from(self.payload_end_offset)
            .map_err(|_| ParcodeError::Format("End offset exceeds address space".into()))?;

        if end > self.reader.mmap.len() {
            return Err(ParcodeError::Format(
                "Payload offset out of mmap bounds".into(),
            ));
        }

        let raw = self
            .reader
            .mmap
            .get(start..end)
            .ok_or_else(|| ParcodeError::Format("Payload out of bounds".into()))?;
        let method_id = self.meta.compression_method();

        // Delegate decompression to the registry.
        // This supports pluggable algorithms (e.g., Lz4).
        self.reader.registry.get(method_id)?.decompress(raw)
    }

    /// Returns a list of all direct child nodes.
    ///
    /// This allows manual traversal of the dependency graph (e.g., iterating over specific shards).
    /// Note: This does not deserialize the children, only loads their metadata (offsets).
    pub fn children(&self) -> Result<Vec<Self>> {
        let mut list = Vec::with_capacity(self.child_count as usize);
        for i in 0..self.child_count {
            list.push(self.get_child_by_index(i as usize)?);
        }
        Ok(list)
    }

    /// Standard single-threaded deserialization.
    /// Use this for leaf nodes or simple structs that fit in memory.
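    ///
    /// A minimal example with a primitive root value (file name is illustrative):
    ///
    /// ```rust
    /// use parcode::{Parcode, ParcodeReader};
    ///
    /// Parcode::save("decode_doc.par", &7u32).unwrap();
    /// let reader = ParcodeReader::open("decode_doc.par").unwrap();
    /// let value: u32 = reader.root().unwrap().decode().unwrap();
    /// assert_eq!(value, 7);
    /// # std::fs::remove_file("decode_doc.par").ok();
    /// ```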
    pub fn decode<T: DeserializeOwned>(&self) -> Result<T> {
        let payload = self.read_raw()?;
        bincode::serde::decode_from_slice(&payload, bincode::config::standard())
            .map(|(obj, _)| obj)
            .map_err(|e| ParcodeError::Serialization(e.to_string()))
    }

    /// **Parallel Shard Reconstruction**
    ///
    /// Reconstructs a `Vec<T>` by deserializing shards concurrently.
    ///
    /// # Safety
    /// This function uses `unsafe` to write directly into an uninitialized buffer.
    /// To ensure safety:
    /// 1. We pre-calculate correct offsets using RLE metadata.
    /// 2. We cast the buffer pointer to `usize` to safely pass it to Rayon threads.
    /// 3. **CRITICAL:** We verify that the number of items read from a shard matches
    ///    the RLE expectation (`expected_count`). If not, we abort before writing,
    ///    preventing partial initialization UB.
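    ///
    /// A minimal round-trip example (file name is illustrative):
    ///
    /// ```rust
    /// use parcode::{Parcode, ParcodeReader};
    ///
    /// let data: Vec<u32> = (0..100).collect();
    /// Parcode::save("par_col.par", &data).unwrap();
    ///
    /// let reader = ParcodeReader::open("par_col.par").unwrap();
    /// let restored: Vec<u32> = reader.root().unwrap().decode_parallel_collection().unwrap();
    /// assert_eq!(restored, data);
    /// # std::fs::remove_file("par_col.par").ok();
    /// ```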
    pub fn decode_parallel_collection<T>(&self) -> Result<Vec<T>>
    where
        T: ParcodeItem,
    {
        let payload = self.read_raw()?;

        // Tiny payloads carry no container header: treat the payload as a plain
        // serialized slice (blob fallback).
        if payload.len() < 8 {
            let mut cursor = std::io::Cursor::new(payload.as_ref());
            let children = self.children()?;
            let mut child_iter = children.into_iter();
            return T::read_slice_from_shard(&mut cursor, &mut child_iter);
        }

        // 1. Read the container header: total item count followed by the RLE runs.
        let total_items = usize::try_from(u64::from_le_bytes(
            payload
                .get(0..8)
                .ok_or_else(|| ParcodeError::Format("Payload too short for header".into()))?
                .try_into()
                .map_err(|_| ParcodeError::Format("Failed to read total_items".into()))?,
        ))
        .map_err(|_| ParcodeError::Format("total_items exceeds usize range".into()))?;

        let runs_data = payload.get(8..).unwrap_or(&[]);
        let shard_runs: Vec<ShardRun> =
            bincode::serde::decode_from_slice(runs_data, bincode::config::standard())
                .map(|(obj, _)| obj)
                .map_err(|e| ParcodeError::Serialization(e.to_string()))?;

        // 2. Expand RLE into explicit shard jobs.
        let mut shard_jobs = Vec::with_capacity(self.child_count as usize);
        let mut current_shard_idx = 0;
        let mut current_global_idx: usize = 0;

        for run in shard_runs {
            let items_per_shard = run.item_count as usize;
            for _ in 0..run.repeat {
                if current_global_idx.checked_add(items_per_shard).is_none() {
                    return Err(ParcodeError::Format(
                        "Integer overflow in RLE calculation".into(),
                    ));
                }
                shard_jobs.push((current_shard_idx, current_global_idx, items_per_shard));
                current_shard_idx += 1;
                current_global_idx += items_per_shard;
            }
        }

        if current_global_idx != total_items {
            return Err(ParcodeError::Format(format!(
                "Metadata mismatch: Header says {} items, RLE implies {}",
                total_items, current_global_idx
            )));
        }

        if shard_jobs.is_empty() {
            return Ok(Vec::new());
        }

        // 3. Allocate uninitialized buffer
        let mut result_buffer: Vec<MaybeUninit<T>> = Vec::with_capacity(total_items);

        // SAFETY: We create a "hole" in memory. We MUST fill every slot before converting to Vec<T>.
        #[allow(unsafe_code)]
        unsafe {
            result_buffer.set_len(total_items);
        }

        // 4. Perform parallel stitching
        // SAFETY: Cast pointer to usize to pass it safely to Rayon threads (usize is Send+Sync).
        // This is safe because threads write to mathematically disjoint regions (start_idx).
        let buffer_addr = result_buffer.as_mut_ptr() as usize;

        shard_jobs.into_par_iter().try_for_each(
            move |(shard_idx, start_idx, expected_count)| -> Result<()> {
                let shard_node = self.get_child_by_index(shard_idx)?;
                let payload = shard_node.read_raw()?;
                let mut cursor = std::io::Cursor::new(payload.as_ref());
                let children = shard_node.children()?;
                let mut child_iter = children.into_iter();

                let items: Vec<T> = T::read_slice_from_shard(&mut cursor, &mut child_iter)?;
                let count = items.len();

                // CRITICAL SAFETY CHECK:
                // Verify shard integrity before touching the shared buffer.
                if count != expected_count {
                    return Err(ParcodeError::Format(format!(
                        "Shard integrity error: Expected {} items, found {}",
                        expected_count, count
                    )));
                }

                if start_idx + count > total_items {
                    return Err(ParcodeError::Format(
                        "Shard items overflowed allocated buffer".into(),
                    ));
                }

                // Prevent double-free of the source items
                let src_items = ManuallyDrop::new(items);

                #[allow(unsafe_code)]
                unsafe {
                    // Reconstruct pointers
                    let dest_base = buffer_addr as *mut MaybeUninit<T>;
                    let dest_uninit = dest_base.add(start_idx);

                    // Cast MaybeUninit<T> -> T (layout compatible)
                    let dest_ptr = dest_uninit as *mut T;
                    let src_ptr = src_items.as_ptr();

                    // Efficient copy
                    std::ptr::copy_nonoverlapping(src_ptr, dest_ptr, count);
                }
                Ok(())
            },
        )?;

        // 5. Bless the buffer
        // SAFETY: At this point, try_for_each returned Ok(), so every shard reported
        // exactly the expected number of items. The buffer is fully initialized.
        #[allow(unsafe_code)]
        let final_vec = unsafe {
            let mut manual_buffer = ManuallyDrop::new(result_buffer);
            Vec::from_raw_parts(
                manual_buffer.as_mut_ptr() as *mut T,
                manual_buffer.len(),
                manual_buffer.capacity(),
            )
        };

        Ok(final_vec)
    }

    // --- COLLECTION UTILITIES ---

    /// Returns the logical number of items in this container,
    /// or 0 if the payload does not carry a container header.
    pub fn len(&self) -> u64 {
        if let Ok(payload) = self.read_raw()
            && payload.len() >= 8
            && let Some(bytes) = payload.get(0..8).and_then(|s| s.try_into().ok())
        {
            return u64::from_le_bytes(bytes);
        }
        0
    }

    /// Checks if the container is empty.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Internal helper: locates the shard node and local index for a global index.
    /// Used by `ParcodeCollectionPromise::get_lazy`.
    pub(crate) fn locate_shard_item(&self, index: usize) -> Result<(Self, usize)> {
        let payload = self.read_raw()?;

        if payload.len() < 8 {
            return Err(ParcodeError::Format("Invalid container payload".into()));
        }

        // Skip total_items (8 bytes)
        let runs_data = payload.get(8..).unwrap_or(&[]);
        let shard_runs: Vec<ShardRun> =
            bincode::serde::decode_from_slice(runs_data, bincode::config::standard())
                .map(|(obj, _)| obj)
                .map_err(|e| ParcodeError::Serialization(e.to_string()))?;

        let (shard_idx, local_idx) = self.resolve_rle_index(index, &shard_runs)?;

        let shard = self.get_child_by_index(shard_idx)?;
        Ok((shard, local_idx))
    }

    /// Retrieves the item at `index` using RLE arithmetic.
    ///
    /// This calculates which shard holds the item, loads ONLY that shard,
    /// and returns the specific item.
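    ///
    /// A sketch, assuming the saved vector was large enough to be sharded
    /// (compile-checked but not run; file name is illustrative):
    ///
    /// ```rust,no_run
    /// use parcode::{Parcode, ParcodeReader};
    ///
    /// let data: Vec<u64> = (0..1_000_000).collect();
    /// Parcode::save("huge.par", &data).unwrap();
    ///
    /// let reader = ParcodeReader::open("huge.par").unwrap();
    /// // Loads only the shard containing index 123_456.
    /// let item: u64 = reader.root().unwrap().get(123_456).unwrap();
    /// assert_eq!(item, 123_456);
    /// ```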
    pub fn get<T: ParcodeItem>(&self, index: usize) -> Result<T> {
        let payload = self.read_raw()?;
        if payload.len() < 8 {
            return Err(ParcodeError::Format("Not a valid container".into()));
        }

        let runs_data = payload.get(8..).unwrap_or(&[]);
        let shard_runs: Vec<ShardRun> =
            bincode::serde::decode_from_slice(runs_data, bincode::config::standard())
                .map(|(obj, _)| obj)
                .map_err(|e| ParcodeError::Serialization(e.to_string()))?;

        let (target_shard_idx, index_in_shard) = self.resolve_rle_index(index, &shard_runs)?;

        let shard_node = self.get_child_by_index(target_shard_idx)?;

        // Decode the target shard only, then pick the requested item out of it.
        let payload = shard_node.read_raw()?;
        let mut cursor = std::io::Cursor::new(payload.as_ref());
        let children = shard_node.children()?;
        let mut child_iter = children.into_iter();

        let shard_data: Vec<T> = T::read_slice_from_shard(&mut cursor, &mut child_iter)?;

        shard_data
            .into_iter()
            .nth(index_in_shard)
            .ok_or_else(|| ParcodeError::Internal("Shard index mismatch".into()))
    }

    /// Creates a streaming iterator over the collection.
    /// Memory usage is constant (size of one shard) regardless of total size.
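    ///
    /// A sketch for a large, sharded file (compile-checked but not run;
    /// file name is illustrative):
    ///
    /// ```rust,no_run
    /// use parcode::{Parcode, ParcodeReader};
    ///
    /// let data: Vec<u64> = (0..1_000_000).collect();
    /// Parcode::save("huge_iter.par", &data).unwrap();
    ///
    /// let reader = ParcodeReader::open("huge_iter.par").unwrap();
    /// for item in reader.root().unwrap().iter::<u64>().unwrap() {
    ///     let _value = item.unwrap();
    ///     // ... process `_value` without materializing the whole vector ...
    /// }
    /// ```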
    pub fn iter<T: ParcodeItem>(self) -> Result<ChunkIterator<'a, T>> {
        let payload = self.read_raw()?;
        if payload.is_empty() && self.child_count == 0 {
            return Ok(ChunkIterator::empty(self));
        }
        if payload.len() < 8 {
            return Err(ParcodeError::Format("Not a valid container".into()));
        }

        let total_len = usize::try_from(u64::from_le_bytes(
            payload
                .get(0..8)
                .ok_or_else(|| ParcodeError::Format("Payload too short".into()))?
                .try_into()
                .map_err(|_| ParcodeError::Format("Failed to read total_len".into()))?,
        ))
        .map_err(|_| ParcodeError::Format("total_len exceeds usize range".into()))?;
        let runs_data = payload.get(8..).unwrap_or(&[]);
        let shard_runs: Vec<ShardRun> =
            bincode::serde::decode_from_slice(runs_data, bincode::config::standard())
                .map(|(obj, _)| obj)
                .map_err(|e| ParcodeError::Serialization(e.to_string()))?;

        Ok(ChunkIterator {
            container: self,
            shard_runs,
            total_items: total_len,
            current_global_idx: 0,
            current_shard_idx: 0,
            current_items_in_shard: Vec::new().into_iter(),
            _marker: PhantomData,
        })
    }

    // --- INTERNAL HELPERS ---

    /// Retrieves a child `ChunkNode` by its index in the footer.
    pub fn get_child_by_index(&self, index: usize) -> Result<Self> {
        if index >= self.child_count as usize {
            return Err(ParcodeError::Format("Child index out of bounds".into()));
        }
        let footer_start = usize::try_from(self.payload_end_offset)
            .map_err(|_| ParcodeError::Format("Offset exceeds usize range".into()))?;
        let entry_start = footer_start + (index * ChildRef::SIZE);
        let bytes = self
            .reader
            .mmap
            .get(entry_start..entry_start + ChildRef::SIZE)
            .ok_or_else(|| ParcodeError::Format("ChildRef index out of bounds".into()))?;

        let r = ChildRef::from_bytes(bytes)?;
        self.reader.get_chunk(r.offset, r.length)
    }

    /// Maps a global item index to a specific (`shard_index`, `internal_index`).
    fn resolve_rle_index(&self, global_index: usize, runs: &[ShardRun]) -> Result<(usize, usize)> {
        let mut current_base = 0;
        let mut shard_base = 0;

        for run in runs {
            let count = run.item_count as usize;
            let total_run = count * run.repeat as usize;

            if global_index < current_base + total_run {
                let offset = global_index - current_base;
                // Integer division gives the shard within the run; modulo gives the index inside it.
                return Ok((shard_base + (offset / count), offset % count));
            }
            current_base += total_run;
            shard_base += run.repeat as usize;
        }
        Err(ParcodeError::Format("Index out of bounds".into()))
    }

    /// Returns the absolute file offset of this chunk.
    pub fn offset(&self) -> u64 {
        self.offset
    }

    /// Returns the total physical length of this chunk.
    pub fn length(&self) -> u64 {
        self.length
    }

    /// Returns the number of children.
    pub fn child_count(&self) -> u32 {
        self.child_count
    }

    /// Returns the metadata flags.
    pub fn meta(&self) -> crate::format::MetaByte {
        self.meta
    }

    /// Calculates the size of the payload (excluding metadata/footer).
    pub fn payload_len(&self) -> u64 {
        // `payload_end_offset` is the absolute end of the payload (set in `get_chunk`).
        self.payload_end_offset - self.offset
    }
}

// --- STREAMING ITERATOR ---

/// An iterator that loads shards on demand, allowing iteration over datasets
/// larger than available RAM.
///
/// It buffers only one shard at a time.
#[derive(Debug)]
pub struct ChunkIterator<'a, T> {
    container: ChunkNode<'a>,
    #[allow(dead_code)]
    shard_runs: Vec<ShardRun>, // Reserved for future skip logic
    total_items: usize,
    current_global_idx: usize,

    // State
    current_shard_idx: usize,
    current_items_in_shard: std::vec::IntoIter<T>,

    _marker: PhantomData<T>,
}

impl<'a, T> ChunkIterator<'a, T> {
    fn empty(node: ChunkNode<'a>) -> Self {
        Self {
            container: node,
            shard_runs: vec![],
            total_items: 0,
            current_global_idx: 0,
            current_shard_idx: 0,
            current_items_in_shard: Vec::new().into_iter(),
            _marker: PhantomData,
        }
    }
}

impl<'a, T: ParcodeItem> Iterator for ChunkIterator<'a, T> {
    type Item = Result<T>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.current_global_idx >= self.total_items {
            return None;
        }

        // 1. Try to pull from the currently loaded shard.
        if let Some(item) = self.current_items_in_shard.next() {
            self.current_global_idx += 1;
            return Some(Ok(item));
        }

        // 2. Buffer empty? Load the next shard.
        if self.current_shard_idx >= self.container.child_count as usize {
            return Some(Err(ParcodeError::Internal(
                "Iterator mismatch: ran out of shards".into(),
            )));
        }

        // Decode the entire next shard into the one-shard buffer.
        let next_shard_res = self
            .container
            .get_child_by_index(self.current_shard_idx)
            .and_then(|node| {
                let payload = node.read_raw()?;
                let mut cursor = std::io::Cursor::new(payload.as_ref());
                let children = node.children()?;
                let mut child_iter = children.into_iter();
                T::read_slice_from_shard(&mut cursor, &mut child_iter)
            });

        match next_shard_res {
            Ok(items) => {
                self.current_items_in_shard = items.into_iter();
                self.current_shard_idx += 1;
                // Recursively call next() to yield the first item of the new shard.
                self.next()
            }
            Err(e) => Some(Err(e)),
        }
    }
}