parcode/
reader.rs

1//! The Read-Side Engine: Parallel Reconstruction & Random Access.
2//!
3//! This module implements the complete reading pipeline for Parcode files, providing both
4//! eager (full deserialization) and lazy (on-demand) loading strategies. It leverages memory
5//! mapping, parallel reconstruction, and zero-copy techniques to maximize performance.
6//!
7//! ## Core Architecture
8//!
9//! The reader is built on three foundational techniques:
10//!
11//! ### 1. Memory Mapping (`mmap`)
12//!
13//! Instead of reading the entire file into memory, Parcode uses `mmap` to map the file
14//! directly into the process's address space. This provides several benefits:
15//!
16//! - **Instant Startup:** Opening a file is O(1) regardless of size
17//! - **OS-Managed Paging:** The operating system handles loading pages on demand
18//! - **Zero-Copy Reads:** Uncompressed data can be read directly from the mapped region
19//! - **Shared Memory:** Multiple processes can share the same mapped file
20//!
21//! ### 2. Lazy Traversal
22//!
23//! The file is traversed lazily - we only read and decompress bytes when a specific node
24//! is requested. This enables:
25//!
26//! - **Cold Start Performance:** Applications can start in microseconds
27//! - **Selective Loading:** Load only the data you need
28//! - **Deep Navigation:** Traverse object hierarchies without I/O
29//!
30//! ### 3. Parallel Zero-Copy Stitching
31//!
32//! When reconstructing large `Vec<T>`, we use a sophisticated parallel algorithm:
33//!
34//! ```text
35//! ┌─────────────────────────────────────────────────────────────┐
36//! │ 1. Pre-allocate uninitialized buffer (MaybeUninit<T>)       │
37//! ├─────────────────────────────────────────────────────────────┤
38//! │ 2. Calculate destination offset for each shard              │
39//! ├─────────────────────────────────────────────────────────────┤
40//! │ 3. Spawn parallel workers (Rayon)                           │
41//! ├─────────────────────────────────────────────────────────────┤
42//! │ 4. Each worker:                                             │
43//! │    - Decompresses its shard                                 │
44//! │    - Deserializes items                                     │
45//! │    - Writes directly to final buffer (ptr::copy)            │
46//! ├─────────────────────────────────────────────────────────────┤
47//! │ 5. Transmute buffer to Vec<T> (all items initialized)       │
48//! └─────────────────────────────────────────────────────────────┘
49//! ```
50//!
51//! **Result:** Maximum memory bandwidth, zero intermediate allocations, perfect parallelism.
52//!
53//! ## O(1) Arithmetic Navigation
54//!
55//! Using the RLE (Run-Length Encoding) metadata stored in container nodes, we can calculate
56//! exactly which physical chunk holds the Nth item of a collection. This enables:
57//!
58//! - **Random Access:** `vec.get(1_000_000)` without loading the entire vector
59//! - **Constant Time:** O(1) shard selection via arithmetic
60//! - **Minimal I/O:** Load only the shard containing the target item
61//!
62//! ## Trait System for Strategy Selection
63//!
64//! The module defines two key traits that enable automatic strategy selection:
65//!
66//! ### [`ParcodeNative`]
67//!
68//! Types implementing this trait know how to reconstruct themselves from a [`ChunkNode`].
69//! The high-level API ([`Parcode::read`](crate::Parcode::read)) uses this trait to
70//! automatically select the optimal reconstruction strategy:
71//!
72//! - **`Vec<T>`:** Uses parallel reconstruction across shards
73//! - **`HashMap<K, V>`:** Reconstructs all shards and merges entries
74//! - **Primitives/Structs:** Uses sequential deserialization
75//!
76//! ### [`ParcodeItem`]
77//!
78//! Types implementing this trait can be read from a shard (payload + children). This trait
79//! is used internally during parallel reconstruction to deserialize individual items or
80//! slices of items from shard payloads.
81//!
82//! ## Usage Patterns
83//!
84//! ### Eager Loading (Full Deserialization)
85//!
86//! ```rust
87//! use parcode::Parcode;
88//!
89//! // Load entire object into memory
90//! let data = vec![1, 2, 3];
91//! Parcode::save("numbers_reader.par", &data).unwrap();
92//! let data: Vec<i32> = Parcode::read("numbers_reader.par").unwrap();
93//! # std::fs::remove_file("numbers_reader.par").ok();
94//! ```
95//!
96//! ### Lazy Loading (On-Demand)
97//!
98//! ```rust
99//! use parcode::{Parcode, ParcodeReader, ParcodeObject};
100//! use serde::{Serialize, Deserialize};
101//!
102//! #[derive(Serialize, Deserialize, ParcodeObject)]
//! struct Assets {
//!     #[parcode(chunkable)]
//!     data: Vec<u8>,
//! }
106//!
107//! #[derive(Serialize, Deserialize, ParcodeObject)]
108//! struct GameState {
109//!     level: u32,
110//!     #[parcode(chunkable)]
111//!     assets: Assets,
112//! }
113//!
114//! // Setup
115//! let state = GameState { level: 1, assets: Assets { data: vec![0; 10] } };
116//! Parcode::save("game_reader.par", &state).unwrap();
117//!
118//! let reader = ParcodeReader::open("game_reader.par").unwrap();
119//! let game_lazy = reader.read_lazy::<GameState>().unwrap();
120//!
121//! // Access local fields (instant, already in memory)
122//! println!("Level: {}", game_lazy.level);
123//!
124//! // Load remote fields on demand
125//! let assets_data = game_lazy.assets.data.load().unwrap();
126//! # std::fs::remove_file("game_reader.par").ok();
127//! ```
128//!
129//! ### Random Access
130//!
131//! ```rust
132//! use parcode::{Parcode, ParcodeReader, ParcodeObject};
133//! use serde::{Serialize, Deserialize};
134//!
135//! #[derive(Serialize, Deserialize, ParcodeObject, Clone, Debug)]
136//! struct MyStruct { val: u32 }
137//!
138//! // Setup
139//! let data: Vec<MyStruct> = (0..100).map(|i| MyStruct { val: i }).collect();
140//! Parcode::save("data_random.par", &data).unwrap();
141//!
142//! let reader = ParcodeReader::open("data_random.par").unwrap();
143//! let root = reader.root().unwrap();
144//!
145//! // Get item at index 50 without loading the entire vector
146//! // Note: Using 50 instead of 1,000,000 for a realistic small test
147//! let item: MyStruct = root.decode_parallel_collection::<MyStruct>().unwrap().get(50).unwrap().clone();
148//! # std::fs::remove_file("data_random.par").ok();
149//! ```
150//!
151//! ### Streaming Iteration
152//!
153//! ```rust
154//! use parcode::{Parcode, ParcodeReader, ParcodeObject};
155//! use serde::{Serialize, Deserialize};
156//!
157//! #[derive(Serialize, Deserialize, ParcodeObject, Clone, Debug)]
158//! struct MyStruct { val: u32 }
159//!
160//! fn process(item: MyStruct) { println!("{:?}", item); }
161//!
162//! // Setup
163//! let data: Vec<MyStruct> = (0..10).map(|i| MyStruct { val: i }).collect();
164//! Parcode::save("data_iter.par", &data).unwrap();
165//!
166//! let reader = ParcodeReader::open("data_iter.par").unwrap();
167//! let root = reader.root().unwrap();
168//!
//! // Note: The current API does not yet expose a direct `iter` on the root for Vecs;
//! // access normally goes through `read_lazy` or `decode`. Here we decode the full
//! // collection up front and iterate over the resulting `Vec`.
172//! let items: Vec<MyStruct> = root.decode_parallel_collection().unwrap();
173//! for item in items {
174//!     process(item);
175//! }
176//! # std::fs::remove_file("data_iter.par").ok();
177//! ```
178//!
179//! ## Performance Characteristics
180//!
181//! - **File Opening:** O(1) - just maps the file
182//! - **Root Access:** O(1) - reads only the global header
183//! - **Random Access:** O(1) - arithmetic shard selection + single shard load
184//! - **Parallel Reconstruction:** O(N/cores) - scales linearly with CPU cores
185//! - **Memory Usage (Lazy):** O(accessed chunks) - only loaded data consumes RAM
186//! - **Memory Usage (Eager):** O(N) - entire object in memory
187//!
188//! ## Thread Safety
189//!
190//! - **[`ParcodeReader`]:** Cheap to clone (Arc-based), safe to share across threads
191//! - **[`ChunkNode`]:** Immutable view, safe to share across threads
192//! - **Parallel Reconstruction:** Uses Rayon's work-stealing scheduler
193//!
194//! ## Safety Considerations
195//!
196//! The module uses `unsafe` code in two specific contexts:
197//!
198//! 1. **Memory Mapping:** `mmap` is inherently unsafe if the file is modified externally.
199//!    We assume files are immutable during reading.
200//!
201//! 2. **Parallel Stitching:** Uses `MaybeUninit` and pointer arithmetic to avoid
202//!    initialization overhead. All unsafe operations are carefully encapsulated and
203//!    documented with safety invariants.
204
205use memmap2::Mmap;
206use rayon::prelude::*;
207use serde::{Deserialize, de::DeserializeOwned};
208use std::borrow::Cow;
209use std::collections::HashMap;
210use std::fs::File;
211use std::hash::Hash;
212use std::marker::PhantomData;
213use std::mem::{ManuallyDrop, MaybeUninit};
214use std::path::Path;
215use std::sync::Arc;
216
217use crate::compression::CompressorRegistry;
218use crate::error::{ParcodeError, Result};
219use crate::format::{ChildRef, GLOBAL_HEADER_SIZE, GlobalHeader, MAGIC_BYTES, MetaByte};
220use crate::rt::ParcodeLazyRef;
221
222// --- TRAIT SYSTEM FOR AUTOMATIC STRATEGY SELECTION ---
223
/// A trait for types that know how to reconstruct themselves from a [`ChunkNode`].
///
/// This trait enables the high-level API ([`Parcode::read`](crate::Parcode::read)) to
/// automatically select the optimal reconstruction strategy based on the type being read.
///
/// ## Strategy Selection
///
/// Different types use different reconstruction strategies:
///
/// - **`Vec<T>`:** Parallel reconstruction across shards (see [`ChunkNode::decode_parallel_collection`])
/// - **`HashMap<K, V>`:** Shard merging with SOA deserialization
/// - **Primitives:** Direct bincode deserialization
/// - **Custom Structs:** Sequential deserialization of local fields + recursive child loading
///
/// ## Automatic Implementation
///
/// This trait is automatically implemented by the `#[derive(ParcodeObject)]` macro for custom
/// structs. Primitive types and standard collections have manual implementations in this module.
///
/// ## Object Safety
///
/// The `Sized` supertrait makes this trait non-object-safe (`dyn ParcodeNative` is not
/// available); it is always used through generics, which also enables monomorphized,
/// zero-cost dispatch.
///
/// ## Example
///
/// ```rust
/// use parcode::Parcode;
/// use parcode::reader::ParcodeNative;
///
/// // Automatically selects parallel reconstruction for Vec
/// let data = vec![1, 2, 3];
/// Parcode::save("numbers_native.par", &data).unwrap();
/// let data: Vec<i32> = Parcode::read("numbers_native.par").unwrap();
///
/// // Automatically selects sequential deserialization for primitives
/// let val = 42;
/// Parcode::save("value_native.par", &val).unwrap();
/// let value: i32 = Parcode::read("value_native.par").unwrap();
/// # std::fs::remove_file("numbers_native.par").ok();
/// # std::fs::remove_file("value_native.par").ok();
/// ```
pub trait ParcodeNative: Sized {
    /// Reconstructs the object from the given graph node.
    ///
    /// This method is called by [`Parcode::read`](crate::Parcode::read) after opening the
    /// file and locating the root chunk. Implementations should choose the most efficient
    /// reconstruction strategy for their type.
    ///
    /// ## Parameters
    ///
    /// * `node`: The chunk node to reconstruct from (typically the root node)
    ///
    /// ## Returns
    ///
    /// The fully reconstructed object of type `Self`.
    ///
    /// ## Errors
    ///
    /// Returns an error if:
    /// - Decompression fails
    /// - Deserialization fails (type mismatch, corrupted data)
    /// - Child nodes are missing or invalid
    fn from_node(node: &ChunkNode<'_>) -> Result<Self>;
}
284
285/// A trait for types that can be read from a shard (payload + children).
286///
287/// This trait is used internally during parallel reconstruction to deserialize individual
288/// items or slices of items from shard payloads. It provides two methods:
289///
290/// - [`read_from_shard`](Self::read_from_shard): Reads a single item
291/// - [`read_slice_from_shard`](Self::read_slice_from_shard): Reads multiple items (optimized)
292///
293/// ## Automatic Implementation
294///
295/// This trait is automatically implemented by the `#[derive(ParcodeObject)]` macro. Primitive
296/// types have optimized implementations that use bulk deserialization.
297///
298/// ## Thread Safety
299///
300/// Implementations must be `Send + Sync + 'static` to support parallel reconstruction across
301/// threads. This is automatically satisfied for most types.
302pub trait ParcodeItem: Sized + Send + Sync + 'static {
303    /// Reads a single item from the shard payload and children.
304    ///
305    /// This method is called during deserialization to reconstruct individual items from
306    /// a shard's payload. For types with chunkable fields, this method should deserialize
307    /// local fields from the reader and reconstruct remote fields from the children iterator.
308    ///
309    /// ## Parameters
310    ///
311    /// * `reader`: Cursor over the shard's decompressed payload
312    /// * `children`: Iterator over child nodes (for chunkable fields)
313    ///
314    /// ## Returns
315    ///
316    /// The deserialized item.
317    ///
318    /// ## Errors
319    ///
320    /// Returns an error if deserialization fails or children are missing.
321    fn read_from_shard(
322        reader: &mut std::io::Cursor<&[u8]>,
323        children: &mut std::vec::IntoIter<ChunkNode<'_>>,
324    ) -> Result<Self>;
325
326    /// Reads a slice of items from the shard payload and children.
327    ///
328    /// This method provides an optimization opportunity for types that can deserialize
329    /// multiple items more efficiently than calling [`read_from_shard`](Self::read_from_shard)
330    /// in a loop.
331    ///
332    /// ## Default Implementation
333    ///
334    /// The default implementation reads the slice length (u64) and then calls
335    /// `read_from_shard` for each item. Primitive types override this to use bulk
336    /// deserialization.
337    ///
338    /// ## Parameters
339    ///
340    /// * `reader`: Cursor over the shard's decompressed payload
341    /// * `children`: Iterator over child nodes (for chunkable fields)
342    ///
343    /// ## Returns
344    ///
345    /// A vector containing all deserialized items.
346    ///
347    /// ## Errors
348    ///
349    /// Returns an error if deserialization fails or the slice length exceeds `usize`.
350    fn read_slice_from_shard(
351        reader: &mut std::io::Cursor<&[u8]>,
352        children: &mut std::vec::IntoIter<ChunkNode<'_>>,
353    ) -> Result<Vec<Self>> {
354        // Default implementation: Read length, then loop
355        let len =
356            bincode::serde::decode_from_std_read::<u64, _, _>(reader, bincode::config::standard())
357                .map_err(|e| ParcodeError::Serialization(e.to_string()))?;
358
359        let mut vec = Vec::with_capacity(
360            usize::try_from(len)
361                .map_err(|_| ParcodeError::Serialization("Vector length exceeds usize".into()))?,
362        );
363        for _ in 0..len {
364            vec.push(Self::read_from_shard(reader, children)?);
365        }
366        Ok(vec)
367    }
368}
369
// Implements `ParcodeItem` for leaf types that never reference child chunks,
// so the `children` iterator is intentionally ignored in both methods.
macro_rules! impl_primitive_parcode_item {
    ($($t:ty),*) => {
        $(
            impl ParcodeItem for $t {
                // Single item: one plain bincode decode from the cursor.
                fn read_from_shard(
                    reader: &mut std::io::Cursor<&[u8]>,
                    _children: &mut std::vec::IntoIter<ChunkNode<'_>>,
                ) -> Result<Self> {
                    bincode::serde::decode_from_std_read(reader, bincode::config::standard())
                        .map_err(|e| ParcodeError::Serialization(e.to_string()))
                }

                // Optimize slice reading for primitives (bulk read):
                // decodes the entire `Vec<Self>` in a single bincode call instead of
                // looping item-by-item like the trait's default implementation.
                fn read_slice_from_shard(
                    reader: &mut std::io::Cursor<&[u8]>,
                    _children: &mut std::vec::IntoIter<ChunkNode<'_>>,
                ) -> Result<Vec<Self>> {
                     bincode::serde::decode_from_std_read(reader, bincode::config::standard())
                        .map_err(|e| ParcodeError::Serialization(e.to_string()))
                }
            }
        )*
    }
}

// Leaf types that can appear as items of a sharded collection.
impl_primitive_parcode_item!(
    u8, u16, u32, u64, u128, i8, i16, i32, i64, i128, f32, f64, bool, String
);
398
// Implements `ParcodeNative` for primitive leaf types: reconstruction is a
// plain sequential `decode` of the node's payload (no sharding involved).
macro_rules! impl_primitive_parcode_native {
    ($($t:ty),*) => {
        $(
            impl ParcodeNative for $t {
                fn from_node(node: &ChunkNode<'_>) -> Result<Self> {
                    node.decode()
                }
            }
        )*
    }
}

impl_primitive_parcode_native!(
    u8, u16, u32, u64, u128, i8, i16, i32, i64, i128, f32, f64, bool, String
);
414
415/// Optimized implementation for Vectors: Uses Parallel Stitching.
416impl<T> ParcodeNative for Vec<T>
417where
418    T: ParcodeItem,
419{
420    fn from_node(node: &ChunkNode<'_>) -> Result<Self> {
421        node.decode_parallel_collection()
422    }
423}
424
impl<K, V> ParcodeNative for HashMap<K, V>
where
    K: DeserializeOwned + Eq + Hash + Send + Sync,
    V: DeserializeOwned + Send + Sync,
{
    /// Reconstructs a `HashMap` by merging the entries of all shards.
    ///
    /// Two on-disk representations are handled:
    /// - **Blob** (`child_count == 0`): the whole map is one bincode payload.
    /// - **Sharded**: each child shard carries its own entry table, decoded and
    ///   merged into the result (later shards overwrite duplicate keys).
    fn from_node(node: &ChunkNode<'_>) -> Result<Self> {
        // 1. Read container (num shards)
        let container_payload = node.read_raw()?;
        // A container payload shorter than 4 bytes cannot hold any metadata:
        // treat it as an empty map rather than an error.
        if container_payload.len() < 4 {
            return Ok(Self::new());
        }

        // If it's a Blob, node.child_count == 0.
        if node.child_count == 0 {
            return node.decode(); // Fallback to normal Bincode
        }

        // If it has children, it's a Sharded Map.
        let shards = node.children()?;
        let mut map = Self::new();

        for shard in shards {
            let payload = shard.read_raw()?;
            // Shards below 8 bytes cannot contain the fixed header; skip them.
            if payload.len() < 8 {
                continue;
            }

            // Shard payload layout (as parsed below):
            //   [0..4)             entry count (u32 LE)
            //   [4..8)             not read here — presumably flags/padding; TODO confirm against writer
            //   [8..8+count*8)     per-entry 8-byte records, skipped — likely key hashes; TODO confirm
            //   [..+count*4)       u32 LE offsets into the data region
            //   [data_start..)     bincode-encoded (K, V) tuples
            let count = u32::from_le_bytes(
                payload
                    .get(0..4)
                    .ok_or_else(|| ParcodeError::Format("Payload too short for count".into()))?
                    .try_into()
                    .map_err(|_| ParcodeError::Format("Failed to read count".into()))?,
            ) as usize;
            let offsets_start = 8 + (count * 8);
            let data_start = offsets_start + (count * 4);
            let offsets_bytes = payload
                .get(offsets_start..data_start)
                .ok_or_else(|| ParcodeError::Format("Offsets out of bounds".into()))?;

            for i in 0..count {
                let off_bytes = offsets_bytes
                    .get(i * 4..(i + 1) * 4)
                    .ok_or_else(|| ParcodeError::Format("Offset index out of bounds".into()))?;
                let offset = u32::from_le_bytes(
                    off_bytes
                        .try_into()
                        .map_err(|_| ParcodeError::Format("Failed to read offset".into()))?,
                ) as usize;
                // Each entry is decoded from its offset to the end of the payload;
                // bincode stops after one (K, V) tuple, so trailing bytes are ignored.
                let data_slice = payload
                    .get(data_start + offset..)
                    .ok_or_else(|| ParcodeError::Format("Data slice out of bounds".into()))?;
                let (k, v) =
                    bincode::serde::decode_from_slice(data_slice, bincode::config::standard())
                        .map_err(|e| ParcodeError::Serialization(e.to_string()))?
                        .0;
                map.insert(k, v);
            }
        }
        Ok(map)
    }
}
487
488// --- CORE READER HANDLE ---
489
/// The main handle for an open Parcode file.
///
/// It holds the memory map (thread-safe via Arc), the global file header,
/// and the registry of available decompression algorithms.
///
/// NOTE(review): earlier docs described this struct as cheap to clone, but no
/// `Clone` derive/impl is visible in this file — confirm whether `Clone` is
/// implemented elsewhere before relying on that claim.
#[derive(Debug)]
pub struct ParcodeReader {
    /// Memory-mapped file content.
    mmap: Arc<Mmap>,
    /// Parsed global footer/header information (magic, version, root location, checksum).
    header: GlobalHeader,
    /// Total size of the file in bytes; used for chunk bounds checks.
    file_size: u64,
    /// Registry containing available decompression algorithms (Lz4, etc.).
    registry: CompressorRegistry,
}
506
507impl ParcodeReader {
    /// Opens a Parcode file, maps it into memory, and validates integrity.
    ///
    /// Opening is O(1) regardless of file size: only the global header at the
    /// end of the file is parsed; everything else is read lazily via the mmap.
    ///
    /// # Errors
    /// Returns error if the file does not exist, is smaller than the header,
    /// or contains invalid magic bytes/version.
    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
        let file = File::open(path)?;
        let file_size = file.metadata()?.len();

        // Reject files that cannot even contain the fixed-size global header.
        if file_size < GLOBAL_HEADER_SIZE as u64 {
            return Err(ParcodeError::Format(
                "File is smaller than the global header".into(),
            ));
        }

        // SAFETY: Mmap is fundamentally unsafe in the presence of external modification
        // (e.g., another process truncating the file). We assume the file is treated
        // as immutable by the OS while we read it.
        #[allow(unsafe_code)]
        let mmap = unsafe { Mmap::map(&file)? };

        // Read Global Header (Located at the very end of the file).
        // Layout (little-endian), relative to `header_start`:
        //   [0..4)   magic bytes
        //   [4..6)   format version (u16) — only V4 is supported
        //   [6..14)  root chunk offset (u64)
        //   [14..22) root chunk length (u64)
        //   [22..26) checksum (u32) — stored as-is here, not verified by `open`
        // The subtraction cannot underflow: file_size >= GLOBAL_HEADER_SIZE was checked above.
        let header_start = usize::try_from(file_size)
            .map_err(|_| ParcodeError::Format("File too large for address space".into()))?
            - GLOBAL_HEADER_SIZE;
        let header_bytes = mmap
            .get(header_start..)
            .ok_or_else(|| ParcodeError::Format("Header start out of bounds".into()))?;

        if header_bytes.get(0..4) != Some(&MAGIC_BYTES) {
            return Err(ParcodeError::Format(
                "Invalid Magic Bytes. Not a Parcode file.".into(),
            ));
        }

        let version = u16::from_le_bytes(
            header_bytes
                .get(4..6)
                .ok_or_else(|| ParcodeError::Format("Version out of bounds".into()))?
                .try_into()
                .map_err(|_| ParcodeError::Format("Failed to read version".into()))?,
        );
        if version != 4 {
            return Err(ParcodeError::Format(format!(
                "Unsupported version: {version}. Expected V4."
            )));
        }

        let root_offset = u64::from_le_bytes(
            header_bytes
                .get(6..14)
                .ok_or_else(|| ParcodeError::Format("Root offset out of bounds".into()))?
                .try_into()
                .map_err(|_| ParcodeError::Format("Failed to read root_offset".into()))?,
        );
        let root_length = u64::from_le_bytes(
            header_bytes
                .get(14..22)
                .ok_or_else(|| ParcodeError::Format("Root length out of bounds".into()))?
                .try_into()
                .map_err(|_| ParcodeError::Format("Failed to read root_length".into()))?,
        );
        let checksum = u32::from_le_bytes(
            header_bytes
                .get(22..26)
                .ok_or_else(|| ParcodeError::Format("Checksum out of bounds".into()))?
                .try_into()
                .map_err(|_| ParcodeError::Format("Failed to read checksum".into()))?,
        );

        Ok(Self {
            mmap: Arc::new(mmap),
            header: GlobalHeader {
                magic: MAGIC_BYTES,
                version,
                root_offset,
                root_length,
                checksum,
            },
            file_size,
            // Initialize registry with default algorithms (NoCompression, Lz4 if enabled)
            registry: CompressorRegistry::new(),
        })
    }
592
593    /// Helper to read a u32 from a byte slice (Little Endian).
594    fn read_u32(slice: &[u8]) -> Result<u32> {
595        slice
596            .try_into()
597            .map(u32::from_le_bytes)
598            .map_err(|_| ParcodeError::Format("Failed to read u32".into()))
599    }
600
601    /// Returns a cursor to the Root Chunk of the object graph.
602    pub fn root(&self) -> Result<ChunkNode<'_>> {
603        self.get_chunk(self.header.root_offset, self.header.root_length)
604    }
605
606    /// Internal: Resolves a physical offset/length into a `ChunkNode`.
607    /// Parses the footer to determine if the chunk has children.
608    ///
609    /// # Arguments
610    /// * `offset`: Absolute byte offset in the file.
611    /// * `length`: Total length of the chunk including metadata.
612    fn get_chunk(&self, offset: u64, length: u64) -> Result<ChunkNode<'_>> {
613        if offset + length > self.file_size {
614            return Err(ParcodeError::Format(format!(
615                "Chunk out of bounds: {} + {}",
616                offset, length
617            )));
618        }
619        let chunk_end = usize::try_from(offset + length)
620            .map_err(|_| ParcodeError::Format("Chunk end exceeds address space".into()))?;
621
622        // Read the MetaByte (Last byte of the chunk)
623        let meta = MetaByte::from_byte(
624            *self
625                .mmap
626                .get(chunk_end - 1)
627                .ok_or_else(|| ParcodeError::Format("MetaByte out of bounds".into()))?,
628        );
629
630        let mut child_count = 0;
631        let mut payload_end = chunk_end - 1; // Default: payload ends just before MetaByte
632
633        if meta.is_chunkable() {
634            // Layout: [Payload] ... [ChildRefs] [ChildCount (4 bytes)] [MetaByte (1 byte)]
635            if length < 5 {
636                return Err(ParcodeError::Format("Chunk too small for metadata".into()));
637            }
638
639            let count_start = chunk_end - 5;
640            let child_count_bytes = self
641                .mmap
642                .get(count_start..count_start + 4)
643                .ok_or_else(|| ParcodeError::Format("Child count out of bounds".into()))?;
644            child_count = Self::read_u32(child_count_bytes)?;
645
646            let footer_size = child_count as usize * ChildRef::SIZE;
647            let total_meta_size = 1 + 4 + footer_size;
648
649            if length < total_meta_size as u64 {
650                return Err(ParcodeError::Format("Invalid footer size".into()));
651            }
652            payload_end = chunk_end - total_meta_size;
653        }
654
655        Ok(ChunkNode {
656            reader: self,
657            offset,
658            length,
659            meta,
660            child_count,
661            payload_end_offset: offset + (payload_end as u64 - offset),
662        })
663    }
664
665    /// Reads an object lazily, returning a generated Mirror struct.
666    /// This parses local fields immediately but keeps remote fields as handles.
667    ///
668    /// The returned Lazy object is tied to the lifetime of the `ParcodeReader`.
669    pub fn read_lazy<'a, T>(&'a self) -> Result<T::Lazy>
670    where
671        T: ParcodeLazyRef<'a>,
672    {
673        let root = self.root()?;
674        T::create_lazy(root)
675    }
676}
677
678// --- CHUNK NODE API ---
679
/// A lightweight cursor pointing to a specific node in the dependency graph.
///
/// This struct contains the logic to read, decompress, and navigate from this node.
/// It is a "view" into the `ParcodeReader` and holds a lifetime reference to it.
/// Cloning copies a few scalar fields plus one shared reference.
#[derive(Debug, Clone)]
pub struct ChunkNode<'a> {
    /// Backing reader; grants access to the mmap and the decompressor registry.
    reader: &'a ParcodeReader,
    /// Physical start offset.
    offset: u64,
    /// Total physical length.
    /// Not read after construction in this module (hence `dead_code`);
    /// presumably kept for debugging/future use — confirm before removing.
    #[allow(dead_code)]
    length: u64,
    /// Parsed metadata flags.
    meta: MetaByte,
    /// Number of direct children (Shards).
    child_count: u32,
    /// Calculated end of the payload data (absolute offset; footer metadata starts here).
    payload_end_offset: u64,
}
699
/// Helper struct for deserializing RLE metadata stored in vector headers.
///
/// Each run encodes `repeat` consecutive shards that all hold `item_count` items,
/// so the sum of `item_count * repeat` over all runs must equal the collection's
/// total item count (this invariant is verified during parallel reconstruction).
#[derive(Deserialize, Debug, Clone)]
struct ShardRun {
    /// Number of items stored in each shard of this run.
    item_count: u32,
    /// How many consecutive shards share this item count.
    repeat: u32,
}
706
707impl<'a> ChunkNode<'a> {
708    /// Reads and decompresses the local payload of this chunk.
709    ///
710    /// This returns `Cow`, so if no compression was used, it returns a direct reference
711    /// to the mmap (Zero-Copy). If compressed, it allocates the decompressed buffer.
712    pub fn read_raw(&self) -> Result<Cow<'a, [u8]>> {
713        let start = usize::try_from(self.offset)
714            .map_err(|_| ParcodeError::Format("Offset exceeds address space".into()))?;
715        let end = usize::try_from(self.payload_end_offset)
716            .map_err(|_| ParcodeError::Format("End offset exceeds address space".into()))?;
717
718        if end > self.reader.mmap.len() {
719            return Err(ParcodeError::Format(
720                "Payload offset out of mmap bounds".into(),
721            ));
722        }
723
724        let raw = self
725            .reader
726            .mmap
727            .get(start..end)
728            .ok_or_else(|| ParcodeError::Format("Payload out of bounds".into()))?;
729        let method_id = self.meta.compression_method();
730
731        // Delegate decompression to the registry.
732        // This supports pluggable algorithms (e.g., Lz4).
733        self.reader.registry.get(method_id)?.decompress(raw)
734    }
735
736    /// Returns a list of all direct child nodes.
737    ///
738    /// This allows manual traversal of the dependency graph (e.g., iterating over specific shards).
739    /// Note: This does not deserialize the children, only loads their metadata (offsets).
740    pub fn children(&self) -> Result<Vec<Self>> {
741        let mut list = Vec::with_capacity(self.child_count as usize);
742        for i in 0..self.child_count {
743            list.push(self.get_child_by_index(i as usize)?);
744        }
745        Ok(list)
746    }
747
748    /// Standard single-threaded deserialization.
749    /// Use this for leaf nodes or simple structs that fit in memory.
750    pub fn decode<T: DeserializeOwned>(&self) -> Result<T> {
751        let payload = self.read_raw()?;
752        bincode::serde::decode_from_slice(&payload, bincode::config::standard())
753            .map(|(obj, _)| obj)
754            .map_err(|e| ParcodeError::Serialization(e.to_string()))
755    }
756
757    /// **Parallel Shard Reconstruction**
758    ///
759    /// Reconstructs a `Vec<T>` by deserializing shards concurrently.
760    ///
761    /// # Safety
762    /// This function uses `unsafe` to write directly into an uninitialized buffer.
763    /// To ensure safety:
764    /// 1. We pre-calculate correct offsets using RLE metadata.
765    /// 2. We cast the buffer pointer to `usize` to safely pass it to Rayon threads.
766    /// 3. **CRITICAL:** We verify that the number of items read from a shard matches
767    ///    the RLE expectation (`expected_count`). If not, we abort before writing,
768    ///    preventing partial initialization UB.
769    pub fn decode_parallel_collection<T>(&self) -> Result<Vec<T>>
770    where
771        T: ParcodeItem,
772    {
773        let payload = self.read_raw()?;
774
775        if payload.len() < 8 {
776            let mut cursor = std::io::Cursor::new(payload.as_ref());
777            let children = self.children()?;
778            let mut child_iter = children.into_iter();
779            return T::read_slice_from_shard(&mut cursor, &mut child_iter);
780        }
781
782        let total_items = usize::try_from(u64::from_le_bytes(
783            payload
784                .get(0..8)
785                .ok_or_else(|| ParcodeError::Format("Payload too short for header".into()))?
786                .try_into()
787                .map_err(|_| ParcodeError::Format("Failed to read total_items".into()))?,
788        ))
789        .map_err(|_| ParcodeError::Format("total_items exceeds usize range".into()))?;
790
791        let runs_data = payload.get(8..).unwrap_or(&[]);
792        let shard_runs: Vec<ShardRun> =
793            bincode::serde::decode_from_slice(runs_data, bincode::config::standard())
794                .map(|(obj, _)| obj)
795                .map_err(|e| ParcodeError::Serialization(e.to_string()))?;
796
797        // 2. Expand RLE into explicit shard jobs.
798        let mut shard_jobs = Vec::with_capacity(self.child_count as usize);
799        let mut current_shard_idx = 0;
800        let mut current_global_idx: usize = 0;
801
802        for run in shard_runs {
803            let items_per_shard = run.item_count as usize;
804            for _ in 0..run.repeat {
805                if current_global_idx.checked_add(items_per_shard).is_none() {
806                    return Err(ParcodeError::Format(
807                        "Integer overflow in RLE calculation".into(),
808                    ));
809                }
810                shard_jobs.push((current_shard_idx, current_global_idx, items_per_shard));
811                current_shard_idx += 1;
812                current_global_idx += items_per_shard;
813            }
814        }
815
816        if current_global_idx != total_items {
817            return Err(ParcodeError::Format(format!(
818                "Metadata mismatch: Header says {} items, RLE implies {}",
819                total_items, current_global_idx
820            )));
821        }
822
823        if shard_jobs.is_empty() {
824            return Ok(Vec::new());
825        }
826
827        // 3. Allocate uninitialized buffer
828        let mut result_buffer: Vec<MaybeUninit<T>> = Vec::with_capacity(total_items);
829
830        // SAFETY: We create a "hole" in memory. We MUST fill every slot before converting to Vec<T>.
831        #[allow(unsafe_code)]
832        unsafe {
833            result_buffer.set_len(total_items);
834        }
835
836        // 4. Perform parallel stitching
837        // SAFETY: Cast pointer to usize to pass it safely to Rayon threads (usize is Send+Sync).
838        // This is safe because threads write to mathematically disjoint regions (start_idx).
839        let buffer_addr = result_buffer.as_mut_ptr() as usize;
840
841        shard_jobs.into_par_iter().try_for_each(
842            move |(shard_idx, start_idx, expected_count)| -> Result<()> {
843                let shard_node = self.get_child_by_index(shard_idx)?;
844                let payload = shard_node.read_raw()?;
845                let mut cursor = std::io::Cursor::new(payload.as_ref());
846                let children = shard_node.children()?;
847                let mut child_iter = children.into_iter();
848
849                let items: Vec<T> = T::read_slice_from_shard(&mut cursor, &mut child_iter)?;
850                let count = items.len();
851
852                // CRITICAL SAFETY CHECK:
853                // Verify shard integrity before touching the shared buffer.
854                if count != expected_count {
855                    return Err(ParcodeError::Format(format!(
856                        "Shard integrity error: Expected {} items, found {}",
857                        expected_count, count
858                    )));
859                }
860
861                if start_idx + count > total_items {
862                    return Err(ParcodeError::Format(
863                        "Shard items overflowed allocated buffer".into(),
864                    ));
865                }
866
867                // Prevent double‑free of the source items
868                let src_items = ManuallyDrop::new(items);
869
870                #[allow(unsafe_code)]
871                unsafe {
872                    // Reconstruct pointers
873                    let dest_base = buffer_addr as *mut MaybeUninit<T>;
874                    let dest_uninit = dest_base.add(start_idx);
875
876                    // Cast MaybeUninit<T> -> T (Layout compatible)
877                    let dest_ptr = dest_uninit as *mut T;
878                    let src_ptr = src_items.as_ptr();
879
880                    // Efficient copy
881                    std::ptr::copy_nonoverlapping(src_ptr, dest_ptr, count);
882                }
883                Ok(())
884            },
885        )?;
886
887        // 5. Bless the buffer
888        // SAFETY: At this point, try_for_each returned Ok(), so every shard reported
889        // exactly the expected number of items. The buffer is fully initialized.
890        #[allow(unsafe_code)]
891        let final_vec = unsafe {
892            let mut manual_buffer = ManuallyDrop::new(result_buffer);
893            Vec::from_raw_parts(
894                manual_buffer.as_mut_ptr() as *mut T,
895                manual_buffer.len(),
896                manual_buffer.capacity(),
897            )
898        };
899
900        Ok(final_vec)
901    }
902
903    // --- COLLECTION UTILITIES ---
904
905    /// Returns the logical number of items in this container.
906    pub fn len(&self) -> u64 {
907        if let Ok(payload) = self.read_raw()
908            && payload.len() >= 8
909            && let Some(bytes) = payload.get(0..8).and_then(|s| s.try_into().ok())
910        {
911            return u64::from_le_bytes(bytes);
912        }
913        0
914    }
915
916    /// Checks if the container is empty.
917    pub fn is_empty(&self) -> bool {
918        self.len() == 0
919    }
920
921    /// Helper internal: Locates the shard node and local index for a global index.
922    /// Used by `ParcodeCollectionPromise::get_lazy`.
923    pub(crate) fn locate_shard_item(&self, index: usize) -> Result<(Self, usize)> {
924        let payload = self.read_raw()?;
925
926        if payload.len() < 8 {
927            return Err(ParcodeError::Format("Invalid container payload".into()));
928        }
929
930        // Skip total_items (8 bytes)
931        let runs_data = payload.get(8..).unwrap_or(&[]);
932        let shard_runs: Vec<ShardRun> =
933            bincode::serde::decode_from_slice(runs_data, bincode::config::standard())
934                .map(|(obj, _)| obj)
935                .map_err(|e| ParcodeError::Serialization(e.to_string()))?;
936
937        let (shard_idx, local_idx) = self.resolve_rle_index(index, &shard_runs)?;
938
939        let shard = self.get_child_by_index(shard_idx)?;
940        Ok((shard, local_idx))
941    }
942
943    /// Retrieves item at `index` using RLE arithmetic.
944    ///
945    /// This calculates which shard holds the item, loads ONLY that shard,
946    /// and returns the specific item.
947    pub fn get<T: ParcodeItem>(&self, index: usize) -> Result<T> {
948        let payload = self.read_raw()?;
949        if payload.len() < 8 {
950            return Err(ParcodeError::Format("Not a valid container".into()));
951        }
952
953        let runs_data = payload.get(8..).unwrap_or(&[]);
954        let shard_runs: Vec<ShardRun> =
955            bincode::serde::decode_from_slice(runs_data, bincode::config::standard())
956                .map(|(obj, _)| obj)
957                .map_err(|e| ParcodeError::Serialization(e.to_string()))?;
958
959        let (target_shard_idx, index_in_shard) = self.resolve_rle_index(index, &shard_runs)?;
960
961        let shard_node = self.get_child_by_index(target_shard_idx)?;
962
963        // New logic:
964        let payload = shard_node.read_raw()?;
965        let mut cursor = std::io::Cursor::new(payload.as_ref());
966        let children = shard_node.children()?;
967        let mut child_iter = children.into_iter();
968
969        let shard_data: Vec<T> = T::read_slice_from_shard(&mut cursor, &mut child_iter)?;
970
971        shard_data
972            .into_iter()
973            .nth(index_in_shard)
974            .ok_or(ParcodeError::Internal("Shard index mismatch".into()))
975    }
976
977    /// Creates a streaming iterator over the collection.
978    /// Memory usage is constant (size of one shard) regardless of total size.
979    pub fn iter<T: ParcodeItem>(self) -> Result<ChunkIterator<'a, T>> {
980        let payload = self.read_raw()?;
981        if payload.is_empty() && self.child_count == 0 {
982            return Ok(ChunkIterator::empty(self));
983        }
984        if payload.len() < 8 {
985            return Err(ParcodeError::Format("Not a valid container".into()));
986        }
987
988        let total_len = usize::try_from(u64::from_le_bytes(
989            payload
990                .get(0..8)
991                .ok_or_else(|| ParcodeError::Format("Payload too short".into()))?
992                .try_into()
993                .map_err(|_| ParcodeError::Format("Failed to read total_len".into()))?,
994        ))
995        .map_err(|_| ParcodeError::Format("total_len exceeds usize range".into()))?;
996        let runs_data = payload.get(8..).unwrap_or(&[]);
997        let shard_runs: Vec<ShardRun> =
998            bincode::serde::decode_from_slice(runs_data, bincode::config::standard())
999                .map(|(obj, _)| obj)
1000                .map_err(|e| ParcodeError::Serialization(e.to_string()))?;
1001
1002        Ok(ChunkIterator {
1003            container: self,
1004            shard_runs,
1005            total_items: total_len,
1006            current_global_idx: 0,
1007            current_shard_idx: 0,
1008            current_items_in_shard: Vec::new().into_iter(),
1009            _marker: PhantomData,
1010        })
1011    }
1012
1013    // --- INTERNAL HELPERS ---
1014
1015    /// Retrieves a child `ChunkNode` by its index in the footer.
1016    pub fn get_child_by_index(&self, index: usize) -> Result<Self> {
1017        if index >= self.child_count as usize {
1018            return Err(ParcodeError::Format("Child index out of bounds".into()));
1019        }
1020        let footer_start = usize::try_from(self.payload_end_offset)
1021            .map_err(|_| ParcodeError::Format("Offset exceeds usize range".into()))?;
1022        let entry_start = footer_start + (index * ChildRef::SIZE);
1023        let bytes = self
1024            .reader
1025            .mmap
1026            .get(entry_start..entry_start + ChildRef::SIZE)
1027            .ok_or_else(|| ParcodeError::Format("ChildRef index out of bounds".into()))?;
1028
1029        let r = ChildRef::from_bytes(bytes)?;
1030        self.reader.get_chunk(r.offset, r.length)
1031    }
1032
1033    /// Maps a global item index to a specific (`shard_index`, `internal_index`).
1034    fn resolve_rle_index(&self, global_index: usize, runs: &[ShardRun]) -> Result<(usize, usize)> {
1035        let mut current_base = 0;
1036        let mut shard_base = 0;
1037
1038        for run in runs {
1039            let count = run.item_count as usize;
1040            let total_run = count * run.repeat as usize;
1041
1042            if global_index < current_base + total_run {
1043                let offset = global_index - current_base;
1044                // Integer division gives logical shard, modulo gives index inside
1045                return Ok((shard_base + (offset / count), offset % count));
1046            }
1047            current_base += total_run;
1048            shard_base += run.repeat as usize;
1049        }
1050        Err(ParcodeError::Format("Index out of bounds".into()))
1051    }
1052
    /// Returns the absolute file offset of this chunk within the mapped file.
    pub fn offset(&self) -> u64 {
        self.offset
    }
1057
    /// Returns the total physical length of this chunk in bytes
    /// (as recorded when the chunk was resolved, payload plus footer).
    pub fn length(&self) -> u64 {
        self.length
    }
1062
    /// Returns the number of children (i.e. `ChildRef` entries in the footer).
    pub fn child_count(&self) -> u32 {
        self.child_count
    }
1067
    /// Returns the metadata flags byte associated with this chunk.
    pub fn meta(&self) -> crate::format::MetaByte {
        self.meta
    }
1072
    /// Calculates the size of the payload (excluding metadata/footer).
    ///
    /// NOTE(review): assumes `payload_end_offset >= offset` — presumably
    /// established where the node is built (`get_chunk`); if that invariant
    /// ever breaks, this subtraction panics in debug builds and wraps in
    /// release. TODO confirm against `get_chunk`.
    pub fn payload_len(&self) -> u64 {
        // payload_end_offset is calculated in get_chunk, usually:
        // offset + (payload_end - offset)
        self.payload_end_offset - self.offset
    }
1079}
1080
1081// --- STREAMING ITERATOR ---
1082
/// An iterator that loads shards on demand, allowing iteration over datasets
/// larger than available RAM.
///
/// It buffers only one shard at a time.
#[derive(Debug)]
pub struct ChunkIterator<'a, T> {
    // The container node whose footer children are the shards to iterate.
    container: ChunkNode<'a>,
    #[allow(dead_code)]
    shard_runs: Vec<ShardRun>, // Reserved for future skip logic
    // Total logical item count, read from the container's 8-byte header.
    total_items: usize,
    // Number of items yielded so far, across all shards.
    current_global_idx: usize,

    // State
    // Index of the NEXT shard to load from the footer's child list.
    current_shard_idx: usize,
    // Owned buffer of the currently loaded shard's decoded items.
    current_items_in_shard: std::vec::IntoIter<T>,

    _marker: PhantomData<T>,
}
1101
1102impl<'a, T> ChunkIterator<'a, T> {
1103    fn empty(node: ChunkNode<'a>) -> Self {
1104        Self {
1105            container: node,
1106            shard_runs: vec![],
1107            total_items: 0,
1108            current_global_idx: 0,
1109            current_shard_idx: 0,
1110            current_items_in_shard: Vec::new().into_iter(),
1111            _marker: PhantomData,
1112        }
1113    }
1114}
1115
1116impl<'a, T: ParcodeItem> Iterator for ChunkIterator<'a, T> {
1117    type Item = Result<T>;
1118
1119    fn next(&mut self) -> Option<Self::Item> {
1120        if self.current_global_idx >= self.total_items {
1121            return None;
1122        }
1123
1124        // 1. Try to pull from current loaded shard
1125        if let Some(item) = self.current_items_in_shard.next() {
1126            self.current_global_idx += 1;
1127            return Some(Ok(item));
1128        }
1129
1130        // 2. Buffer empty? Load next shard
1131        if self.current_shard_idx >= self.container.child_count as usize {
1132            return Some(Err(ParcodeError::Internal(
1133                "Iterator mismatch: runs out of shards".into(),
1134            )));
1135        }
1136
1137        // Load logic
1138        let next_shard_res = self
1139            .container
1140            .get_child_by_index(self.current_shard_idx)
1141            .and_then(|node| {
1142                // New logic:
1143                let payload = node.read_raw()?;
1144                let mut cursor = std::io::Cursor::new(payload.as_ref());
1145                let children = node.children()?;
1146                let mut child_iter = children.into_iter();
1147                T::read_slice_from_shard(&mut cursor, &mut child_iter)
1148            });
1149
1150        match next_shard_res {
1151            Ok(items) => {
1152                self.current_items_in_shard = items.into_iter();
1153                self.current_shard_idx += 1;
1154                // Recursively call next to yield the first item of the new shard
1155                self.next()
1156            }
1157            Err(e) => Some(Err(e)),
1158        }
1159    }
1160}