Skip to main content

hexz_ops/
pack.rs

1//! High-level archive packing operations.
2//!
3//! This module implements the core business logic for creating Hexz archive files
4//! from raw disk and memory images. It orchestrates a multi-stage pipeline that
5//! transforms raw input data into compressed, indexed, and optionally encrypted
6//! archive files optimized for fast random access and deduplication.
7//!
8//! # Core Capabilities
9//!
10//! - **Dictionary Training**: Intelligent sampling and Zstd dictionary optimization
11//! - **Chunking Strategies**: Fixed-size blocks or content-defined (`FastCDC`) for better deduplication
12//! - **Compression**: LZ4 (fast) or Zstd (high-ratio) with optional dictionary support
13//! - **Encryption**: Per-block AES-256-GCM authenticated encryption
14//! - **Deduplication**: BLAKE3 based content deduplication (disabled for encrypted data)
15//! - **Hierarchical Indexing**: Two-level index structure for efficient random access
16//! - **Progress Reporting**: Optional callback interface for UI integration
17//!
18//! # Architecture
19//!
20//! The packing process follows a carefully orchestrated pipeline. Each stage is designed
21//! to be memory-efficient (streaming) and to minimize write amplification:
22//!
23//! ```text
24//! ┌─────────────────────────────────────────────────────────────────────┐
25//! │ Stage 1: Dictionary Training (Optional, Zstd only)                  │
26//! │                                                                      │
27//! │  Input File → Stratified Sampling → Entropy Filtering → Zstd Train │
28//! │                                                                      │
29//! │  - Samples ~4000 blocks evenly distributed across input             │
30//! │  - Filters out zero blocks and high-entropy data (>6.0 bits/byte)   │
31//! │  - Produces dictionary (max 110 KiB) optimized for dataset          │
32//! │  - Training time: 2-5 seconds for typical VM images                 │
33//! └─────────────────────────────────────────────────────────────────────┘
34//!                                  ↓
35//! ┌─────────────────────────────────────────────────────────────────────┐
36//! │ Stage 2: Stream Processing (Per Input: Disk, Memory)                │
37//! │                                                                      │
38//! │  Raw Input → Chunking → Compression → Encryption → Dedup → Write   │
39//! │                                                                      │
40//! │  Chunking:                                                           │
41//! │   - Fixed-size: Divide into equal blocks (default 64 KiB)           │
42//! │   - FastCDC: Content-defined boundaries for better deduplication    │
43//! │                                                                      │
44//! │  Zero Block Optimization:                                            │
45//! │   - Detect all-zero chunks (common in VM images)                    │
46//! │   - Store as metadata only (offset=0, length=0)                     │
47//! │   - Saves significant space for sparse images                       │
48//! │                                                                      │
49//! │  Deduplication (Unencrypted only):                                  │
50//! │   - Compute BLAKE3 hash of compressed data                           │
51//! │   - Check hash table for existing block                             │
52//! │   - Reuse offset if duplicate found                                 │
53//! │   - Note: Disabled for encrypted data (unique nonces prevent dedup) │
54//! │                                                                      │
55//! │  Index Page Building:                                                │
56//! │   - Accumulate BlockInfo metadata (offset, length, checksum)        │
//! │   - Flush page when reaching 4096 entries (~256 MB logical data)    │
58//! │   - Write serialized page to output, record PageEntry               │
59//! └─────────────────────────────────────────────────────────────────────┘
60//!                                  ↓
61//! ┌─────────────────────────────────────────────────────────────────────┐
62//! │ Stage 3: Index Finalization                                          │
63//! │                                                                      │
64//! │  MasterIndex (main_pages[], auxiliary_pages[], sizes) → Serialize      │
65//! │                                                                      │
66//! │  - Collect all PageEntry records from both streams                  │
67//! │  - Write master index at end of file                                │
68//! │  - Record index offset in header                                    │
69//! └─────────────────────────────────────────────────────────────────────┘
70//!                                  ↓
71//! ┌─────────────────────────────────────────────────────────────────────┐
72//! │ Stage 4: Header Writing                                              │
73//! │                                                                      │
74//! │  - Seek to file start (reserved 512 bytes)                          │
75//! │  - Write Header with format metadata                          │
76//! │  - Includes: compression type, encryption params, index offset      │
77//! │  - Flush to ensure atomicity                                        │
78//! └─────────────────────────────────────────────────────────────────────┘
79//! ```
80//!
81//! # Optimization Strategies
82//!
83//! ## Dictionary Training Algorithm
84//!
85//! The dictionary training process improves compression ratios by 10-30% for
86//! structured data (file systems, databases) by building a Zstd shared dictionary:
87//!
88//! 1. **Stratified Sampling**: Sample blocks evenly across input to capture diversity
89//!    - Step size = `file_size` / `target_samples` (typically 4000 samples)
90//!    - Ensures coverage of different file system regions
91//!
92//! 2. **Quality Filtering**: Exclude unsuitable blocks
93//!    - Skip all-zero blocks (no compressible patterns)
94//!    - Compute Shannon entropy for each block
95//!    - Reject blocks with entropy > 6.0 bits/byte (likely encrypted/random)
96//!
97//! 3. **Training**: Feed filtered samples to Zstd dictionary builder
98//!    - Target dictionary size: 110 KiB (fits in L2 cache)
99//!    - Uses Zstd's COVER algorithm to extract common patterns
100//!
101//! ## Deduplication Mechanism
102//!
103//! Content-based deduplication eliminates redundant blocks:
104//!
105//! - **Hash Table**: Maps BLAKE3 hash → physical offset for each unique compressed block
106//! - **Collision Handling**: BLAKE3 collisions are astronomically unlikely (2^128 blocks)
107//! - **Memory Usage**: ~48 bytes per unique block (32-byte hash + 8-byte offset + `HashMap` overhead)
108//! - **Write Behavior**: Only write each unique block once; reuse offset for duplicates
109//! - **Encryption Interaction**: Disabled when encrypting (each block gets unique nonce/ciphertext)
110//!
111//! ## Index Page Management
112//!
113//! The two-level index hierarchy balances random access performance and metadata overhead:
114//!
115//! - **Page Size**: 4096 entries per page
116//!   - With 64 KiB blocks: Each page covers ~256 MB of logical data
117//!   - Serialized page size: ~64 KiB (fits in L2 cache)
118//!
119//! - **Flushing Strategy**: Eager flush when page fills
120//!   - Prevents memory growth during large packs
121//!   - Enables streaming operation (constant memory)
122//!
123//! - **Master Index**: Array of `PageEntry` records
124//!   - Binary search for O(log N) page lookup
125//!   - Typical overhead: 1 KiB per GB of data
126//!
127//! # Memory Usage Patterns
128//!
129//! The packing operation is designed for constant memory usage regardless of input size:
130//!
131//! - **Chunking Buffer**: 1 block (64 KiB default)
132//! - **Compression Output**: ~1.5× block size (worst case: incompressible data)
133//! - **Current Index Page**: Up to 4096 × 20 bytes = 80 KiB
134//! - **Deduplication Map**: ~48 bytes × `unique_blocks`
//!   - Example: 10 GB image with 50% dedup = ~80K unique 64 KiB blocks = ~4 MB `HashMap`
136//! - **Dictionary**: 110 KiB (if trained)
137//!
138//! Total typical memory: 100-200 MB for dedup hash table + ~1 MB working set.
139//!
140//! # Error Recovery
141//!
142//! The packing operation is not atomic. On failure:
143//!
144//! - **Partial File**: Output file is left in incomplete state
145//! - **Header Invalid**: Header is written last, so partial packs have zeroed header
146//! - **Detection**: Readers validate magic bytes and header checksum
147//! - **Recovery**: None; must delete partial file and retry pack operation
148//!
149//! Future enhancement: Two-phase commit with temporary file + atomic rename.
150//!
151//! # Usage Contexts
152//!
153//! This module is designed to be called from multiple contexts:
154//!
155//! - **CLI Commands**: `hexz data pack` (with terminal progress bars)
156//! - **Python Bindings**: `hexz.pack()` (with optional callbacks)
157//! - **Rust Applications**: Direct API usage for embedded scenarios
158//!
159//! By keeping pack operations separate from UI/CLI code, we avoid pulling in
160//! heavy dependencies (`clap`, `indicatif`) into library contexts.
161//!
162//! # Examples
163//!
164//! ## Basic Packing (LZ4, No Encryption)
165//!
166//! ```no_run
167//! use hexz_ops::pack::{pack_archive, PackConfig};
168//! use std::path::PathBuf;
169//!
170//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
171//! let config = PackConfig {
172//!     input: PathBuf::from("disk.raw"),
173//!     output: PathBuf::from("archive.hxz"),
174//!     compression: "lz4".to_string(),
175//!     ..Default::default()
176//! };
177//!
178//! pack_archive::<fn(u64, u64)>(&config, None)?;
179//! # Ok(())
180//! # }
181//! ```
182//!
183//! ## Advanced Packing (Zstd with Dictionary, CDC, Encryption)
184//!
185//! ```no_run
186//! use hexz_ops::pack::{pack_archive, PackConfig, PackTransformFlags};
187//! use std::path::PathBuf;
188//!
189//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
190//! let config = PackConfig {
191//!     input: PathBuf::from("ubuntu.qcow2"),
192//!     output: PathBuf::from("ubuntu.hxz"),
193//!     compression: "zstd".to_string(),
194//!     password: Some("secure_passphrase".to_string()),
195//!     min_chunk: Some(16384),   // 16 KiB minimum chunk
196//!     avg_chunk: Some(65536),   // 64 KiB average chunk
197//!     max_chunk: Some(262144),  // 256 KiB maximum chunk
198//!     transform: PackTransformFlags {
199//!         train_dict: true,     // Train dictionary for better ratio
200//!         encrypt: true,
201//!         ..Default::default()
202//!     },
203//!     ..Default::default()
204//! };
205//!
206//! pack_archive::<fn(u64, u64)>(&config, None)?;
207//! # Ok(())
208//! # }
209//! ```
210//!
211//! ## Progress Reporting
212//!
213//! ```no_run
214//! use hexz_ops::pack::{pack_archive, PackConfig};
215//! use std::path::PathBuf;
216//!
217//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
218//! let config = PackConfig {
219//!     input: PathBuf::from("disk.raw"),
220//!     output: PathBuf::from("archive.hxz"),
221//!     ..Default::default()
222//! };
223//!
224//! // Callback receives (current_logical_pos, total_size)
225//! let cb = |pos: u64, total: u64| {
226//!     let pct = (pos as f64 / total as f64) * 100.0;
227//!     println!("Packing: {:.1}%", pct);
228//! };
229//! pack_archive(&config, Some(&cb))?;
230//! # Ok(())
231//! # }
232//! ```
233//!
234//! # Performance Characteristics
235//!
236//! ## Throughput (Single-Threaded, i7-14700K)
237//!
238//! Validated benchmarks (see `docs/project-docs/BENCHMARKS.md` for details):
239//!
240//! - **LZ4 Compression**: 22 GB/s (minimal CPU overhead)
241//! - **LZ4 Decompression**: 31 GB/s
242//! - **Zstd Level 3 Compression**: 8.7 GB/s
243//! - **Zstd Level 3 Decompression**: 12.9 GB/s
244//! - **BLAKE3 Hashing**: 5.3 GB/s (2.2× faster than SHA-256)
245//! - **SHA-256 Hashing**: 2.5 GB/s
246//! - **`FastCDC` Chunking**: 2.7 GB/s (gear-based rolling hash)
247//! - **AES-256-GCM Encryption**: 2.1 GB/s (hardware AES-NI acceleration)
248//! - **Pack Throughput (LZ4, no CDC)**: 4.9 GB/s (64KB blocks)
249//! - **Pack Throughput (LZ4 + CDC)**: 1.9 GB/s (CDC adds 2.6× overhead)
250//! - **Pack Throughput (Zstd-3)**: 1.6 GB/s
251//! - **Block Size Impact**: 2.3 GB/s (4KB) → 4.7 GB/s (64KB) → 5.1 GB/s (1MB)
252//!
253//! Typical bottleneck: CDC chunking (when enabled) or compression CPU time. SSD I/O rarely limits.
254//!
255//! Run benchmarks: `cargo bench --bench compression`, `cargo bench --bench hashing`, `cargo bench --bench cdc_chunking`, `cargo bench --bench encryption`, `cargo bench --bench write_throughput`, and `cargo bench --bench block_size_tradeoffs`
256//!
257//! ## Compression Ratios (Typical VM Images)
258//!
259//! - **LZ4**: 2-3× (fast but lower ratio)
260//! - **Zstd Level 3**: 3-5× (good balance)
261//! - **Zstd + Dictionary**: 4-7× (+30% improvement from dictionary)
262//! - **CDC Deduplication**: Not validated - need benchmark comparing CDC vs fixed-size chunking
263//!
264//! ## Time Estimates (64 GB VM Image, Single Thread)
265//!
266//! - **LZ4, Fixed Blocks**: ~30-45 seconds
267//! - **Zstd, Fixed Blocks**: ~2-3 minutes
268//! - **Zstd + Dictionary + CDC**: ~3-5 minutes (includes 2-5s training time)
269//!
270//! # Atomicity and Crash Safety
271//!
272//! **WARNING**: Pack operations are NOT atomic. If interrupted:
273//!
274//! - Output file is left in a partially written state
275//! - The header (written last) will be all zeros
276//! - Readers will reject the file due to invalid magic bytes
277//! - Manual cleanup is required (delete partial file)
278//!
279//! For production use cases requiring atomicity, write to a temporary file and
280//! perform an atomic rename after successful completion.
281
282use hexz_common::constants::{DICT_TRAINING_SIZE, ENTROPY_THRESHOLD};
283use hexz_common::crypto::KeyDerivationParams;
284use hexz_common::{Error, Result};
285use hexz_core::api::file::Archive;
286use hexz_core::format::header::Header;
287use ignore::WalkBuilder;
288use std::fs::File;
289use std::io::{Read, Seek, SeekFrom, Write};
290use std::path::{Path, PathBuf};
291use std::sync::Arc;
292use walkdir::WalkDir;
293
294use crate::archive_writer::ArchiveWriter;
295use crate::parallel_pack::{CompressedChunk, RawChunk};
296use hexz_core::algo::compression::{create_compressor_from_str, zstd::ZstdCompressor};
297use hexz_core::algo::dedup::cdc::{StreamChunker, analyze_stream};
298use hexz_core::algo::dedup::dcam::{DedupeParams, optimize_params};
299use hexz_core::algo::encryption::{Encryptor, aes_gcm::AesGcmEncryptor};
300use hexz_core::api::manifest::{ArchiveManifest, FileEntry};
301#[cfg(unix)]
302use std::os::unix::fs::MetadataExt;
303
/// Feature flags controlling how data is transformed during packing.
///
/// Every flag is `false` under the derived [`Default`]. Note that
/// `PackConfig::default()` deliberately enables `parallel`.
#[derive(Debug, Clone, Default)]
pub struct PackTransformFlags {
    /// Enable encryption.
    pub encrypt: bool,
    /// Train a compression dictionary (zstd only).
    pub train_dict: bool,
    /// Enable parallel compression (use multiple CPU cores).
    pub parallel: bool,
}

/// Feature flags controlling analysis and UI during packing.
///
/// Every flag is `false` under the derived [`Default`]. Note that
/// `PackConfig::default()` deliberately enables `show_progress`.
#[derive(Debug, Clone, Default)]
pub struct PackAnalysisFlags {
    /// Show progress bar (if no callback provided).
    pub show_progress: bool,
    /// Run DCAM analysis to auto-detect optimal CDC parameters.
    /// When false (default), uses fixed global defaults: min=16 KiB, avg=64 KiB, max=256 KiB.
    pub use_dcam: bool,
    /// If true, DCAM will sweep a wider range of parameters (up to 16MB average chunks).
    pub dcam_optimal: bool,
}

/// Configuration parameters for archive packing.
///
/// This struct encapsulates all settings for the packing process. It's designed
/// to be easily constructed from CLI arguments or programmatic APIs.
///
/// # Examples
///
/// ```
/// use hexz_ops::pack::PackConfig;
/// use std::path::PathBuf;
///
/// // Basic configuration with defaults
/// let config = PackConfig {
///     input: PathBuf::from("data/"),
///     output: PathBuf::from("archive.hxz"),
///     ..Default::default()
/// };
///
/// // Advanced configuration with CDC and encryption
/// let advanced = PackConfig {
///     input: PathBuf::from("data/"),
///     output: PathBuf::from("archive.hxz"),
///     compression: "zstd".to_string(),
///     password: Some("secret".to_string()),
///     min_chunk: Some(16384),
///     avg_chunk: Some(65536),
///     max_chunk: Some(131072),
///     transform: hexz_ops::pack::PackTransformFlags { encrypt: true, ..Default::default() },
///     ..Default::default()
/// };
/// ```
#[derive(Debug, Clone)]
pub struct PackConfig {
    /// Input path (file or directory).
    pub input: PathBuf,
    /// Base archive for delta deduplication.
    pub base: Option<PathBuf>,
    /// Output archive file path.
    pub output: PathBuf,
    /// Compression algorithm ("lz4" or "zstd").
    pub compression: String,
    /// Encryption password (required if `transform.encrypt` is true).
    pub password: Option<String>,
    /// Block size in bytes.
    pub block_size: u32,
    /// Minimum chunk size for CDC (auto-detected if None).
    pub min_chunk: Option<u32>,
    /// Average chunk size for CDC (auto-detected if None).
    pub avg_chunk: Option<u32>,
    /// Maximum chunk size for CDC (auto-detected if None).
    pub max_chunk: Option<u32>,
    /// Number of worker threads (0 = auto-detect).
    pub num_workers: usize,
    /// Data transformation flags (encryption, dictionary, parallelism).
    pub transform: PackTransformFlags,
    /// Analysis and UI flags (progress, DCAM).
    pub analysis: PackAnalysisFlags,
}
386
387impl Default for PackConfig {
388    fn default() -> Self {
389        Self {
390            input: PathBuf::new(),
391            base: None,
392            output: PathBuf::from("output.hxz"),
393            compression: "lz4".to_string(),
394            password: None,
395            block_size: 65536,
396            min_chunk: None,
397            avg_chunk: None,
398            max_chunk: None,
399            num_workers: 0, // Auto-detect CPU cores
400            transform: PackTransformFlags {
401                encrypt: false,
402                train_dict: false,
403                parallel: true, // Enable by default for performance
404            },
405            analysis: PackAnalysisFlags {
406                show_progress: true, // Show progress by default
407                use_dcam: false,     // Use fixed defaults; opt-in DCAM with --dcam
408                dcam_optimal: false,
409            },
410        }
411    }
412}
413
/// Calculates the Shannon entropy of a byte slice, in bits per byte.
///
/// Shannon entropy measures the "randomness" or information content of data:
/// - **0.0**: All bytes are identical (highly compressible)
/// - **8.0**: Maximum entropy, every byte value equally likely (incompressible)
///
/// # Formula
///
/// ```text
/// H(X) = -Σ p(x) * log2(p(x))
/// ```
///
/// Where `p(x)` is the relative frequency of each byte value in `data`.
///
/// # Usage
///
/// Used during dictionary training to filter out high-entropy (random) blocks
/// that wouldn't benefit from compression. Only blocks with entropy below
/// `ENTROPY_THRESHOLD` are included in the training set.
///
/// # Parameters
///
/// - `data`: Byte slice to analyze
///
/// # Returns
///
/// Entropy value from 0.0 (homogeneous) to 8.0 (uniformly distributed).
/// An empty slice yields 0.0.
///
/// # Examples
///
/// ```
/// # use hexz_ops::pack::calculate_entropy;
/// // Homogeneous data (low entropy)
/// let zeros = vec![0u8; 1024];
/// let entropy = calculate_entropy(&zeros);
/// assert_eq!(entropy, 0.0);
///
/// // Uniformly distributed byte values (maximum entropy)
/// let uniform: Vec<u8> = (0..=255).cycle().take(1024).collect();
/// let entropy = calculate_entropy(&uniform);
/// assert!(entropy > 7.0);
/// ```
pub fn calculate_entropy(data: &[u8]) -> f64 {
    if data.is_empty() {
        return 0.0;
    }

    // Byte-value histogram. Counters are u64 so that a slice holding 2^32 or
    // more occurrences of one value cannot overflow (a u32 counter would panic
    // in debug builds and silently wrap in release builds).
    let mut frequencies = [0u64; 256];
    for &byte in data {
        frequencies[usize::from(byte)] += 1;
    }

    let len = data.len() as f64;

    // Accumulate -p * log2(p) over the byte values that actually occur.
    // Absent values are skipped because log2(0) is undefined. The fused
    // multiply-add keeps a single rounding step per term.
    frequencies
        .iter()
        .filter(|&&count| count > 0)
        .fold(0.0_f64, |entropy, &count| {
            let p = count as f64 / len;
            p.mul_add(-p.log2(), entropy)
        })
}
478
/// Default CDC minimum chunk size (16 KiB).
///
/// One of the three global CDC defaults used when DCAM auto-detection is
/// disabled:
///
/// - min: 16 KiB  — avoids pathologically tiny chunks on large files
/// - avg: 64 KiB  — good balance of dedup granularity vs. metadata overhead
/// - max: 256 KiB — caps pathologically large chunks from low-entropy regions
pub const CDC_DEFAULT_MIN: u32 = 16 * 1024;
/// Default CDC average chunk size (64 KiB, i.e. `f = 16` since 2^16 = 65 536).
pub const CDC_DEFAULT_AVG: u32 = 64 * 1024;
/// Default CDC maximum chunk size (256 KiB).
pub const CDC_DEFAULT_MAX: u32 = 256 * 1024;
489
490/// Resolve CDC parameters for packing.
491///
492/// Resolution order (highest to lowest priority):
493/// 1. Explicit user flags (`--min-chunk`, `--avg-chunk`, `--max-chunk`)
494/// 2. DCAM analysis (only when `config.analysis.use_dcam = true`) — scans the full file
495/// 3. Global defaults: min=16 KiB, avg=64 KiB, max=256 KiB
496///
497/// Partial overrides are supported at every level: e.g. only `--min-chunk`
498/// specified while the other two come from DCAM or the defaults.
499pub fn resolve_cdc_params(path: &Path, config: &PackConfig) -> Result<DedupeParams> {
500    /// Build a `DedupeParams` from the three logical sizes, filling w/v with
501    /// the standard values used throughout the rest of the code-base.
502    fn from_sizes(min: u32, avg: u32, max: u32) -> DedupeParams {
503        DedupeParams {
504            f: (avg as f64).log2().round() as u32,
505            m: min,
506            z: max,
507            w: 48,
508            v: 52,
509        }
510    }
511
512    // If the user supplied all three, short-circuit immediately — no file scan.
513    if let (Some(min), Some(avg), Some(max)) =
514        (config.min_chunk, config.avg_chunk, config.max_chunk)
515    {
516        return Ok(from_sizes(min, avg, max));
517    }
518
519    // Try to inherit from base archive if provided
520    if let Some(ref base_path) = config.base {
521        if let Ok(base_file) = File::open(base_path) {
522            let mut reader = std::io::BufReader::new(base_file);
523            if let Ok(header) = Header::read_from(&mut reader) {
524                if let Some((f, m, z)) = header.cdc_params {
525                    let avg = 1u32 << f;
526                    tracing::debug!(
527                        "Inheriting CDC params from base archive: f={} m={} z={}",
528                        f,
529                        m,
530                        z
531                    );
532                    return Ok(from_sizes(m, avg, z));
533                }
534            }
535        }
536    }
537
538    // Determine the base (min, avg, max) triple, either from DCAM or defaults.
539    let (base_min, base_avg, base_max) = if config.analysis.use_dcam {
540        // DCAM: scan the entire file to find data-adaptive optimal params.
541        let baseline = DedupeParams::lbfs_baseline();
542        let file = File::open(path)?;
543        let file_size = file.metadata()?.len();
544
545        if file_size == 0 {
546            (CDC_DEFAULT_MIN, CDC_DEFAULT_AVG, CDC_DEFAULT_MAX)
547        } else {
548            let stats = analyze_stream(file, &baseline)?;
549            let optimized = optimize_params(
550                file_size,
551                stats.unique_bytes,
552                &baseline,
553                config.analysis.dcam_optimal,
554            );
555            let p = &optimized.params;
556            let avg = (2u32).pow(p.f);
557            tracing::debug!(
558                "DCAM auto-detected CDC params: f={} m={} z={} (change_rate={:.4}, predicted_ratio={:.4})",
559                p.f,
560                p.m,
561                p.z,
562                optimized.change_rate,
563                optimized.predicted_ratio,
564            );
565            (p.m, avg, p.z)
566        }
567    } else {
568        (CDC_DEFAULT_MIN, CDC_DEFAULT_AVG, CDC_DEFAULT_MAX)
569    };
570
571    // Apply any user-provided partial overrides on top of the base triple.
572    let min = config.min_chunk.unwrap_or(base_min);
573    let avg = config.avg_chunk.unwrap_or(base_avg);
574    let max = config.max_chunk.unwrap_or(base_max);
575
576    Ok(from_sizes(min, avg, max))
577}
578
/// Packs an archive file from disk and/or memory images.
580///
581/// This is the main entry point for creating Hexz archive files. It orchestrates
582/// the complete packing pipeline: dictionary training, stream processing, index
583/// building, and header finalization.
584///
585/// # Workflow
586///
587/// 1. **Validation**: Ensure at least one input (disk or memory) is provided
588/// 2. **File Creation**: Create output file, reserve 512 bytes for header
589/// 3. **Dictionary Training**: If requested (Zstd only), train dictionary from input samples
590/// 4. **Dictionary Writing**: If trained, write dictionary immediately after header
591/// 5. **Compressor Initialization**: Create LZ4 or Zstd compressor (with optional dictionary)
592/// 6. **Encryptor Initialization**: If requested, derive key from password using PBKDF2
593/// 7. **Stream Processing**: Process main stream (if provided), then auxiliary stream (if provided)
594///    - Each stream independently chunks, compresses, encrypts, deduplicates, and indexes
595/// 8. **Master Index Writing**: Serialize master index (all `PageEntry` records) to end of file
596/// 9. **Header Writing**: Seek to start, write complete header with metadata and offsets
597/// 10. **Flush**: Ensure all data is written to disk
598///
599/// # Parameters
600///
601/// - `config`: Packing configuration parameters (see [`PackConfig`])
602/// - `progress_callback`: Optional callback for progress reporting
603///   - Called frequently during stream processing (~once per 64 KiB)
604///   - Signature: `Fn(logical_pos: u64, total_size: u64)`
605///   - Example: `|pos, total| println!("Progress: {:.1}%", (pos as f64 / total as f64) * 100.0)`
606///
607/// # Returns
608///
609/// - `Ok(())`: Archive packed successfully
610/// - `Err(Error::Io)`: I/O error (file access, disk full, permission denied)
611/// - `Err(Error::Compression)`: Compression error (unlikely, usually indicates invalid state)
612/// - `Err(Error::Encryption)`: Encryption error (invalid password format, crypto failure)
613///
614/// # Errors
615///
616/// This function can fail for several reasons:
617///
618/// ## I/O Errors
619///
/// - **Input file not found**: `config.input` or `config.base` path doesn't exist
621/// - **Permission denied**: Cannot read input or write output
622/// - **Disk full**: Insufficient space for output file
623/// - **Output exists**: May overwrite existing file without warning
624///
625/// ## Configuration Errors
626///
627/// - **No inputs**: Neither `disk` nor `memory` is provided
628/// - **Missing password**: `encrypt = true` but `password = None`
629/// - **Invalid block size**: Block size too small (<1 KiB) or too large (>16 MiB)
/// - **Invalid CDC params**: `min_chunk <= avg_chunk <= max_chunk` constraint violated
631///
632/// ## Compression/Encryption Errors
633///
634/// - **Dictionary training failure**: Zstd training fails (rare, usually on corrupted input)
635/// - **Compression failure**: Compressor returns error (rare, usually indicates bug)
636/// - **Encryption failure**: Key derivation or cipher initialization fails
637///
638/// # Examples
639///
640/// ## Basic Usage
641///
642/// ```no_run
643/// use hexz_ops::pack::{pack_archive, PackConfig};
644/// use std::path::PathBuf;
645///
646/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
647/// let config = PackConfig {
648///     input: PathBuf::from("disk.raw"),
649///     output: PathBuf::from("archive.hxz"),
650///     ..Default::default()
651/// };
652///
653/// pack_archive::<fn(u64, u64)>(&config, None)?;
654/// # Ok(())
655/// # }
656/// ```
657///
658/// ## With Progress Reporting
659///
660/// ```no_run
661/// use hexz_ops::pack::{pack_archive, PackConfig, PackTransformFlags};
662/// use std::path::PathBuf;
663///
664/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
665/// let config = PackConfig {
666///     input: PathBuf::from("ubuntu.qcow2"),
667///     output: PathBuf::from("ubuntu.hxz"),
668///     compression: "zstd".to_string(),
669///     transform: PackTransformFlags { train_dict: true, ..Default::default() },
670///     ..Default::default()
671/// };
672///
673/// let cb = |pos: u64, total: u64| {
674///     eprint!("\rPacking: {:.1}%", (pos as f64 / total as f64) * 100.0);
675/// };
676/// pack_archive(&config, Some(&cb))?;
677/// eprintln!("\nDone!");
678/// # Ok(())
679/// # }
680/// ```
681///
682/// ## Encrypted Archive
683///
684/// ```no_run
685/// use hexz_ops::pack::{pack_archive, PackConfig, PackTransformFlags};
686/// use std::path::PathBuf;
687///
688/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
689/// let config = PackConfig {
690///     input: PathBuf::from("sensitive.raw"),
691///     output: PathBuf::from("sensitive.hxz"),
692///     password: Some("strong_passphrase".to_string()),
693///     transform: PackTransformFlags { encrypt: true, ..Default::default() },
694///     ..Default::default()
695/// };
696///
697/// pack_archive::<fn(u64, u64)>(&config, None)?;
698/// println!("Encrypted archive created");
699/// # Ok(())
700/// # }
701/// ```
702///
703/// ## Content-Defined Chunking for Deduplication
704///
705/// ```no_run
706/// use hexz_ops::pack::{pack_archive, PackConfig};
707/// use std::path::PathBuf;
708///
709/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
710/// let config = PackConfig {
711///     input: PathBuf::from("incremental-backup.raw"),
712///     output: PathBuf::from("backup.hxz"),
713///     min_chunk: Some(16384),   // 16 KiB
714///     avg_chunk: Some(65536),   // 64 KiB
715///     max_chunk: Some(262144),  // 256 KiB
716///     ..Default::default()
717/// };
718///
719/// pack_archive::<fn(u64, u64)>(&config, None)?;
720/// # Ok(())
721/// # }
722/// ```
723///
724/// # Performance
725///
726/// See module-level documentation for detailed performance characteristics.
727///
728/// Typical throughput for a 64 GB VM image on modern hardware (Intel i7, `NVMe` SSD):
729///
730/// - **LZ4, no encryption**: ~2 GB/s (~30 seconds total)
731/// - **Zstd level 3, no encryption**: ~500 MB/s (~2 minutes total)
732/// - **Zstd + dictionary + CDC**: ~400 MB/s (~3 minutes including training)
733///
734/// # Atomicity
735///
736/// This operation is NOT atomic. On failure, the output file will be left in a
737/// partially written state. The file header is written last, so incomplete files
738/// will have an all-zero header and will be rejected by readers.
739///
740/// For atomic pack operations, write to a temporary file and perform an atomic
741/// rename after success:
742///
743/// ```no_run
744/// # use hexz_ops::pack::{pack_archive, PackConfig};
745/// # use std::path::PathBuf;
746/// # use std::fs;
747/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
748/// let config = PackConfig {
749///     input: PathBuf::from("disk.raw"),
750///     output: PathBuf::from("archive.hxz.tmp"),
751///     ..Default::default()
752/// };
753///
754/// pack_archive::<fn(u64, u64)>(&config, None)?;
755/// fs::rename("archive.hxz.tmp", "archive.hxz")?;
756/// # Ok(())
757/// # }
758/// ```
759///
760/// # Thread Safety
761///
762/// This function is not thread-safe with respect to the output file. Do not call
763/// `pack_archive` concurrently with the same output path. Concurrent packing to
764/// different output files is safe.
765///
/// The progress callback type must be `Send + Sync`; this is enforced by the
/// function's trait bounds regardless of which thread calls `pack_archive`.
768pub fn pack_archive<F>(config: &PackConfig, progress_callback: Option<&F>) -> Result<()>
769where
770    F: Fn(u64, u64) + Send + Sync,
771{
772    // 1. Resolve inputs
773    let input_path = &config.input;
774
775    // 2. Load parent archive if specified (for thin snapshots)
776    let parent = if let Some(ref base_path) = config.base {
777        Some(hexz_store::open_local(base_path, None)?)
778    } else {
779        None
780    };
781
782    // 3. Train compression dictionary if requested
783    let dictionary = if config.compression == "zstd" && config.transform.train_dict {
784        let sample_path = if input_path.is_dir() {
785            // Sample from the first file in the directory
786            WalkDir::new(input_path)
787                .into_iter()
788                .filter_map(std::result::Result::ok)
789                .find(|e| e.file_type().is_file())
790                .ok_or_else(|| {
791                    Error::Io(std::io::Error::new(
792                        std::io::ErrorKind::NotFound,
793                        "No files found for dictionary training",
794                    ))
795                })?
796                .path()
797                .to_path_buf()
798        } else {
799            input_path.clone()
800        };
801        Some(train_dictionary(&sample_path, config.block_size)?)
802    } else {
803        None
804    };
805
806    // 4. Initialize compressor & encryptor
807    let (compressor, compression_type) =
808        create_compressor_from_str(&config.compression, None, dictionary.as_deref())?;
809
810    let (encryptor, enc_params): (Option<Box<dyn Encryptor>>, _) = if config.transform.encrypt {
811        let password = config.password.clone().ok_or_else(|| {
812            Error::Io(std::io::Error::new(
813                std::io::ErrorKind::InvalidInput,
814                "Password required for encryption",
815            ))
816        })?;
817        let params = KeyDerivationParams::default();
818        let enc = AesGcmEncryptor::new(password.as_bytes(), &params.salt, params.iterations)?;
819        (Some(Box::new(enc) as Box<dyn Encryptor>), Some(params))
820    } else {
821        (None, None)
822    };
823
824    // 5. Resolve CDC parameters
825    let cdc_params = if input_path.is_file() {
826        resolve_cdc_params(input_path, config)?
827    } else {
828        DedupeParams::lbfs_baseline()
829    };
830
831    // 5. Create ArchiveWriter
832    let mut builder = ArchiveWriter::builder(&config.output, compressor, compression_type)
833        .block_size(config.block_size)
834        .variable_blocks(true)
835        .cdc_params(Some((cdc_params.f, cdc_params.m, cdc_params.z)));
836
837    if let Some(parent_snap) = parent {
838        builder = builder.parent(parent_snap);
839    }
840
841    if let (Some(enc), Some(params)) = (encryptor, enc_params) {
842        builder = builder.encryption(enc, params);
843    }
844
845    let mut writer = builder.build()?;
846
847    if let Some(d) = &dictionary {
848        writer.write_dictionary(d)?;
849    }
850
851    // 7. Process input
852    let dict_ref = dictionary.as_deref();
853    let manifest = if input_path.is_dir() {
854        Some(pack_directory(
855            input_path,
856            &mut writer,
857            &cdc_params,
858            config,
859            dict_ref,
860            progress_callback,
861        )?)
862    } else {
863        let total_size = input_path.metadata()?.len();
864        let progress_bar =
865            if config.analysis.show_progress && progress_callback.is_none() && total_size > 0 {
866                Some(crate::progress::PackProgress::new(total_size, "Packing"))
867            } else {
868                None
869            };
870
871        let cb = |pos: u64, total: u64| {
872            if let Some(ref pb) = progress_bar {
873                pb.set_position(pos);
874            }
875            if let Some(ref cb) = progress_callback {
876                cb(pos, total);
877            }
878        };
879
880        process_stream(
881            input_path,
882            true,
883            &mut writer,
884            &cdc_params,
885            config,
886            dict_ref,
887            Some(&cb),
888        )?;
889
890        if let Some(ref pb) = progress_bar {
891            pb.finish();
892        }
893        None
894    };
895
896    // 8. Finalize
897    let metadata = if let Some(m) = manifest {
898        Some(serde_json::to_vec(&m).map_err(|e| Error::Format(e.to_string()))?)
899    } else {
900        None
901    };
902
903    let parent_paths = if let Some(ref base) = config.base {
904        vec![base.to_string_lossy().into_owned()]
905    } else {
906        Vec::new()
907    };
908
909    writer.finalize(parent_paths, metadata.as_deref())?;
910
911    Ok(())
912}
913
914/// Recursively packs a directory into Main (and optionally Auxiliary) archive streams.
915///
916/// A file named `memory` at the root of the input directory is packed into the
917/// Auxiliary stream. All other files go into the Main stream.
918fn pack_directory<F>(
919    root: &Path,
920    writer: &mut ArchiveWriter,
921    cdc_params: &DedupeParams,
922    config: &PackConfig,
923    dictionary: Option<&[u8]>,
924    progress_callback: Option<&F>,
925) -> Result<ArchiveManifest>
926where
927    F: Fn(u64, u64) + Send + Sync,
928{
929    // Collect all file entries, separating main vs auxiliary
930    let mut main_entries: Vec<(PathBuf, String, std::fs::Metadata)> = Vec::new();
931    let mut aux_entries: Vec<(PathBuf, String, std::fs::Metadata)> = Vec::new();
932
933    let walker = WalkBuilder::new(root)
934        .standard_filters(true)
935        .add_custom_ignore_filename(".hexzignore")
936        .hidden(false)
937        .build();
938
939    for entry in walker.filter_map(std::result::Result::ok) {
940        if !entry.file_type().is_some_and(|ft| ft.is_file()) {
941            continue;
942        }
943        let path = entry.path().to_path_buf();
944        if path.components().any(|c| c.as_os_str() == ".hexz") {
945            continue;
946        }
947        let rel_path = path
948            .strip_prefix(root)
949            .map_err(|e| Error::Format(e.to_string()))?
950            .to_string_lossy()
951            .into_owned();
952        let metadata = entry
953            .metadata()
954            .map_err(|e| Error::Io(std::io::Error::other(e.to_string())))?;
955
956        // A file named "memory" at the directory root goes to the Auxiliary stream
957        if rel_path == "memory" {
958            aux_entries.push((path, rel_path, metadata));
959        } else {
960            main_entries.push((path, rel_path, metadata));
961        }
962    }
963
964    let main_size: u64 = main_entries.iter().map(|(_, _, m)| m.len()).sum();
965    let aux_size: u64 = aux_entries.iter().map(|(_, _, m)| m.len()).sum();
966    let total_size = main_size + aux_size;
967
968    let progress_bar =
969        if config.analysis.show_progress && progress_callback.is_none() && total_size > 0 {
970            Some(crate::progress::PackProgress::new(
971                total_size,
972                "Packing Directory",
973            ))
974        } else {
975            None
976        };
977
978    // Pack main stream
979    let mut files = Vec::new();
980    let mut current_logical_offset = 0u64;
981    let mut global_progress = 0u64;
982
983    writer.begin_stream(true, main_size);
984
985    for (path, rel_path, metadata) in &main_entries {
986        let size = metadata.len();
987        let file_entry = FileEntry {
988            path: rel_path.clone(),
989            offset: current_logical_offset,
990            size,
991            mode: {
992                #[cfg(unix)]
993                {
994                    metadata.mode()
995                }
996                #[cfg(not(unix))]
997                {
998                    0o644
999                }
1000            },
1001            mtime: metadata
1002                .modified()?
1003                .duration_since(std::time::UNIX_EPOCH)
1004                .unwrap_or_default()
1005                .as_secs(),
1006        };
1007
1008        let cur_offset = current_logical_offset;
1009        let cb = |pos: u64, _total: u64| {
1010            let gp = global_progress + pos;
1011            if let Some(ref pb) = progress_bar {
1012                pb.set_position(gp);
1013            }
1014            if let Some(ref cb) = progress_callback {
1015                cb(cur_offset + pos, total_size);
1016            }
1017        };
1018
1019        pack_file_to_stream(path, writer, cdc_params, config, dictionary, Some(&cb))?;
1020        writer.flush_stream()?;
1021
1022        files.push(file_entry);
1023        current_logical_offset = writer.current_logical_pos();
1024        global_progress += size;
1025    }
1026
1027    writer.end_stream()?;
1028
1029    // Pack auxiliary stream if there are memory files
1030    if !aux_entries.is_empty() {
1031        writer.begin_stream(false, aux_size);
1032
1033        for (path, rel_path, metadata) in &aux_entries {
1034            let size = metadata.len();
1035            let file_entry = FileEntry {
1036                path: rel_path.clone(),
1037                offset: 0,
1038                size,
1039                mode: {
1040                    #[cfg(unix)]
1041                    {
1042                        metadata.mode()
1043                    }
1044                    #[cfg(not(unix))]
1045                    {
1046                        0o644
1047                    }
1048                },
1049                mtime: metadata
1050                    .modified()?
1051                    .duration_since(std::time::UNIX_EPOCH)
1052                    .unwrap_or_default()
1053                    .as_secs(),
1054            };
1055
1056            let cb = |pos: u64, _total: u64| {
1057                let gp = global_progress + pos;
1058                if let Some(ref pb) = progress_bar {
1059                    pb.set_position(gp);
1060                }
1061                if let Some(ref cb) = progress_callback {
1062                    cb(gp, total_size);
1063                }
1064            };
1065
1066            pack_file_to_stream(path, writer, cdc_params, config, dictionary, Some(&cb))?;
1067            writer.flush_stream()?;
1068
1069            files.push(file_entry);
1070            global_progress += size;
1071        }
1072
1073        writer.end_stream()?;
1074    }
1075
1076    if let Some(ref pb) = progress_bar {
1077        pb.finish();
1078    }
1079
1080    Ok(ArchiveManifest { files })
1081}
1082
1083/// Packs a single file into the current active stream of the `ArchiveWriter`.
1084fn pack_file_to_stream<F>(
1085    path: &Path,
1086    writer: &mut ArchiveWriter,
1087    cdc_params: &DedupeParams,
1088    config: &PackConfig,
1089    dictionary: Option<&[u8]>,
1090    progress_callback: Option<&F>,
1091) -> Result<()>
1092where
1093    F: Fn(u64, u64),
1094{
1095    let f = File::open(path)?;
1096    let len = f.metadata()?.len();
1097
1098    if config.transform.parallel && !config.transform.encrypt {
1099        process_stream_parallel(
1100            f,
1101            len,
1102            writer,
1103            cdc_params,
1104            config,
1105            dictionary,
1106            progress_callback,
1107        )?;
1108    } else {
1109        process_stream_serial(f, len, writer, cdc_params, progress_callback)?;
1110    }
1111
1112    Ok(())
1113}
1114
1115/// Extracts an archive back to the filesystem.
1116///
1117/// If the archive contains a manifest, it will be extracted as a directory.
1118/// Otherwise, the Main stream will be extracted as a single raw file.
1119pub fn extract_archive(
1120    input_path: &Path,
1121    output_path: &Path,
1122    password: Option<String>,
1123) -> Result<()> {
1124    use hexz_core::ArchiveStream;
1125    use hexz_core::algo::compression::create_compressor;
1126    use hexz_core::algo::encryption::aes_gcm::AesGcmEncryptor;
1127    use hexz_core::api::file::ParentLoader;
1128    use hexz_core::format::header::Header;
1129    use hexz_store::local::MmapBackend;
1130
1131    let backend = Arc::new(MmapBackend::new(input_path)?);
1132    let header = Header::read_from_backend(backend.as_ref())?;
1133
1134    let encryptor = if let (Some(params), Some(pass)) = (header.encryption.as_ref(), password) {
1135        let enc = AesGcmEncryptor::new(pass.as_bytes(), &params.salt, params.iterations)?;
1136        Some(Box::new(enc) as Box<dyn hexz_core::algo::encryption::Encryptor>)
1137    } else {
1138        None
1139    };
1140
1141    let dictionary = header.load_dictionary(backend.as_ref())?;
1142    let compressor = create_compressor(header.compression, None, dictionary.as_deref());
1143
1144    // Provide a parent loader that resolves relative to the input archive
1145    let archive_dir = input_path
1146        .parent()
1147        .unwrap_or_else(|| Path::new("."))
1148        .to_path_buf();
1149    let loader: ParentLoader = Box::new(move |parent_path: &str| {
1150        let p = Path::new(parent_path);
1151
1152        // Try resolving in this order:
1153        // 1. As provided (if absolute or relative to CWD)
1154        // 2. Relative to the archive being extracted
1155        let full_parent_path = if p.exists() {
1156            p.to_path_buf()
1157        } else {
1158            let rel = archive_dir.join(parent_path);
1159            if rel.exists() {
1160                rel
1161            } else {
1162                // Fallback to p and let it fail with proper error if not found
1163                p.to_path_buf()
1164            }
1165        };
1166
1167        let pb: Arc<dyn hexz_core::store::StorageBackend> =
1168            Arc::new(MmapBackend::new(&full_parent_path)?);
1169        Archive::open(pb, None)
1170    });
1171
1172    let archive =
1173        Archive::with_cache_and_loader(backend, compressor, encryptor, None, None, Some(&loader))?;
1174
1175    // Check for manifest in metadata
1176    if let Some(metadata) = &archive.metadata {
1177        if let Ok(manifest) = serde_json::from_slice::<ArchiveManifest>(metadata) {
1178            std::fs::create_dir_all(output_path)?;
1179
1180            for file in manifest.files {
1181                let out_path = output_path.join(&file.path);
1182                if let Some(parent) = out_path.parent() {
1183                    std::fs::create_dir_all(parent)?;
1184                }
1185
1186                let mut out_file = File::create(&out_path)?;
1187                let data = archive.read_at(ArchiveStream::Main, file.offset, file.size as usize)?;
1188                out_file.write_all(&data)?;
1189
1190                #[cfg(unix)]
1191                {
1192                    use std::os::unix::fs::PermissionsExt;
1193                    std::fs::set_permissions(
1194                        &out_path,
1195                        std::fs::Permissions::from_mode(file.mode),
1196                    )?;
1197                }
1198            }
1199            return Ok(());
1200        }
1201    }
1202
1203    // Fallback: extract Main stream to a single file
1204    let mut out_file = File::create(output_path)?;
1205    let size = archive.size(ArchiveStream::Main);
1206
1207    // Extract in chunks to avoid huge memory usage
1208    let chunk_size = 1024 * 1024; // 1MB
1209    let mut pos = 0u64;
1210    while pos < size {
1211        let len = std::cmp::min(chunk_size as u64, size - pos) as usize;
1212        let data = archive.read_at(ArchiveStream::Main, pos, len)?;
1213        out_file.write_all(&data)?;
1214        pos += len as u64;
1215    }
1216
1217    Ok(())
1218}
1219
1220/// Trains a Zstd compression dictionary from stratified samples.
1221///
1222/// Dictionary training analyzes a representative sample of input blocks to build
1223/// a shared dictionary that improves compression ratios for structured data
1224/// (file systems, databases, logs) by capturing common patterns.
1225///
1226/// # Algorithm
1227///
1228/// 1. **Stratified Sampling**: Sample blocks evenly across the file
1229///    - Compute step size: `file_size / target_samples`
1230///    - Read one block at each sample point
1231///    - Ensures coverage of different regions (boot sector, metadata, data)
1232///
1233/// 2. **Quality Filtering**: Exclude unsuitable blocks
1234///    - Skip all-zero blocks (no compressible patterns)
1235///    - Compute Shannon entropy (0-8 bits per byte)
1236///    - Reject blocks with entropy > `ENTROPY_THRESHOLD` (6.0)
1237///    - Rationale: High-entropy data (encrypted, random) doesn't benefit from dictionaries
1238///
1239/// 3. **Dictionary Training**: Feed filtered samples to Zstd
1240///    - Uses Zstd's COVER algorithm (`fast_cover` variant)
1241///    - Analyzes n-grams to find common subsequences
1242///    - Outputs dictionary up to `DICT_TRAINING_SIZE` (110 KiB)
1243///
1244/// # Parameters
1245///
1246/// - `input_path`: Path to the input file to sample from
1247/// - `block_size`: Size of each sample block in bytes
1248///
1249/// # Returns
1250///
1251/// - `Ok(Vec<u8>)`: Trained dictionary bytes (empty if training fails or no suitable samples)
1252/// - `Err(Error)`: I/O error reading input file
1253///
1254/// # Performance
1255///
1256/// - **Sampling time**: ~100-500 ms (depends on file size and disk speed)
1257/// - **Training time**: ~2-5 seconds for 4000 samples
1258/// - **Memory usage**: ~256 MB (sample corpus in RAM)
1259///
1260/// # Compression Improvement
1261///
1262/// - **Typical**: 10-30% better ratio vs. no dictionary
1263/// - **Best case**: 50%+ improvement for highly structured data (databases)
1264/// - **Worst case**: No improvement or slight regression (already compressed data)
1265///
1266/// # Edge Cases
1267///
1268/// - **Empty file**: Returns empty dictionary with warning
1269/// - **All high-entropy data**: Returns empty dictionary with warning
1270/// - **Small files**: May not reach target sample count (trains on available data)
1271///
1272/// # Examples
1273///
1274/// Called internally by `pack_archive` when `train_dict` is enabled:
1275///
1276/// ```text
1277/// let dict = train_dictionary(Path::new("disk.raw"), 65536)?;
1278/// // dict: Vec<u8> containing the trained zstd dictionary
1279/// ```
1280fn train_dictionary(input_path: &Path, block_size: u32) -> Result<Vec<u8>> {
1281    let mut f = File::open(input_path)?;
1282    let file_len = f.metadata()?.len();
1283
1284    let mut samples = Vec::new();
1285    let mut buffer = vec![0u8; block_size as usize];
1286    let target_samples = DICT_TRAINING_SIZE;
1287
1288    let step = if file_len > 0 {
1289        (file_len / target_samples as u64).max(block_size as u64)
1290    } else {
1291        0
1292    };
1293
1294    let mut attempts = 0;
1295    while samples.len() < target_samples && attempts < target_samples * 2 {
1296        let offset = attempts as u64 * step;
1297        if offset >= file_len {
1298            break;
1299        }
1300
1301        _ = f.seek(SeekFrom::Start(offset))?;
1302        let n = f.read(&mut buffer)?;
1303        if n == 0 {
1304            break;
1305        }
1306        let chunk = &buffer[..n];
1307        let is_zeros = chunk.iter().all(|&b| b == 0);
1308
1309        if !is_zeros {
1310            let entropy = calculate_entropy(chunk);
1311            if entropy < ENTROPY_THRESHOLD {
1312                samples.push(chunk.to_vec());
1313            }
1314        }
1315        attempts += 1;
1316    }
1317
1318    if samples.is_empty() {
1319        tracing::warn!("Input seems to be empty or high entropy. Dictionary will be empty.");
1320        Ok(Vec::new())
1321    } else {
1322        let dict_bytes = ZstdCompressor::train(&samples, DICT_TRAINING_SIZE)?;
1323        tracing::info!("Dictionary trained: {} bytes", dict_bytes.len());
1324        Ok(dict_bytes)
1325    }
1326}
1327
1328/// Processes a single input stream (disk or memory) via the [`ArchiveWriter`].
1329fn process_stream<F>(
1330    path: &Path,
1331    is_disk: bool,
1332    writer: &mut ArchiveWriter,
1333    cdc_params: &DedupeParams,
1334    config: &PackConfig,
1335    dictionary: Option<&[u8]>,
1336    progress_callback: Option<&F>,
1337) -> Result<()>
1338where
1339    F: Fn(u64, u64),
1340{
1341    let f = File::open(path)?;
1342    let len = f.metadata()?.len();
1343
1344    writer.begin_stream(is_disk, len);
1345
1346    // Use parallel path when enabled and not encrypting (encryption needs sequential nonces)
1347    if config.transform.parallel && !config.transform.encrypt {
1348        process_stream_parallel(
1349            f,
1350            len,
1351            writer,
1352            cdc_params,
1353            config,
1354            dictionary,
1355            progress_callback,
1356        )?;
1357    } else {
1358        process_stream_serial(f, len, writer, cdc_params, progress_callback)?;
1359    }
1360
1361    writer.end_stream()?;
1362    Ok(())
1363}
1364
1365/// Serial (original) stream processing path.
1366fn process_stream_serial<F>(
1367    f: File,
1368    len: u64,
1369    writer: &mut ArchiveWriter,
1370    cdc_params: &DedupeParams,
1371    progress_callback: Option<&F>,
1372) -> Result<()>
1373where
1374    F: Fn(u64, u64),
1375{
1376    let mut logical_pos = 0u64;
1377    let mut chunk_buf = Vec::with_capacity(cdc_params.z as usize);
1378
1379    let mut chunker = StreamChunker::new(f, *cdc_params);
1380    while let Some(res) = chunker.next_into(&mut chunk_buf) {
1381        let n = res?;
1382        logical_pos += n as u64;
1383        writer.write_data_block(&chunk_buf)?;
1384        if let Some(callback) = progress_callback {
1385            callback(logical_pos, len);
1386        }
1387    }
1388
1389    Ok(())
1390}
1391
1392/// Parallel stream processing: single persistent pipeline for the entire stream.
1393///
1394/// Architecture:
1395/// - Reader thread: reads input file, chunks it, sends to workers
1396/// - N worker threads: compress + BLAKE3 hash chunks in parallel
1397/// - Main thread: receives compressed chunks, reorders via `BTreeMap`, writes sequentially
1398///
1399/// This avoids per-batch thread pool creation overhead (the old approach created
1400/// ~2800 thread pools for a 180GB file).
1401fn process_stream_parallel<F>(
1402    f: File,
1403    len: u64,
1404    writer: &mut ArchiveWriter,
1405    cdc_params: &DedupeParams,
1406    config: &PackConfig,
1407    dictionary: Option<&[u8]>,
1408    progress_callback: Option<&F>,
1409) -> Result<()>
1410where
1411    F: Fn(u64, u64),
1412{
1413    use crossbeam::channel::bounded;
1414    use hexz_core::algo::compression::Compressor;
1415    use std::collections::BTreeMap;
1416    use std::sync::Arc;
1417
1418    let num_workers = if config.num_workers > 0 {
1419        config.num_workers
1420    } else {
1421        num_cpus::get()
1422    };
1423
1424    // Create shared compressor for all workers, passing the trained dictionary
1425    let (compressor, _) = create_compressor_from_str(&config.compression, None, dictionary)?;
1426    let compressor: Arc<Box<dyn Compressor + Send + Sync>> = Arc::new(compressor);
1427
1428    // Bounded channels for backpressure: enough to keep workers busy without excessive memory.
1429    // Each in-flight chunk is ~64KB, so num_workers*4 chunks ≈ num_workers*256KB.
1430    let channel_size = num_workers * 4;
1431    let (tx_raw, rx_raw) = bounded::<(u64, RawChunk)>(channel_size);
1432    let (tx_compressed, rx_compressed) = bounded::<(u64, CompressedChunk)>(channel_size);
1433
1434    // Spawn persistent compression workers
1435    let mut workers = Vec::with_capacity(num_workers);
1436    for _ in 0..num_workers {
1437        let rx = rx_raw.clone();
1438        let tx = tx_compressed.clone();
1439        let comp = compressor.clone();
1440        workers.push(std::thread::spawn(move || -> Result<()> {
1441            for (seq, chunk) in rx {
1442                let compressed_data = comp.compress(&chunk.data)?;
1443                let hash = blake3::hash(&chunk.data);
1444                if tx
1445                    .send((
1446                        seq,
1447                        CompressedChunk {
1448                            compressed: compressed_data,
1449                            hash: hash.into(),
1450                            logical_offset: chunk.logical_offset,
1451                            original_size: chunk.data.len(),
1452                        },
1453                    ))
1454                    .is_err()
1455                {
1456                    break; // Receiver dropped, pipeline shutting down
1457                }
1458            }
1459            Ok(())
1460        }));
1461    }
1462
1463    // Drop our copies so channels close when all real holders finish
1464    drop(rx_raw);
1465    drop(tx_compressed);
1466
1467    // Spawn reader thread: reads input, chunks it, feeds workers
1468    let reader_cdc_params = *cdc_params;
1469    let reader = std::thread::spawn(move || -> Result<()> {
1470        let mut logical_pos = 0u64;
1471
1472        let chunker = StreamChunker::new(f, reader_cdc_params);
1473        for (seq, chunk_res) in chunker.enumerate() {
1474            let chunk = chunk_res?;
1475            let chunk_len = chunk.len();
1476            if tx_raw
1477                .send((
1478                    seq as u64,
1479                    RawChunk {
1480                        data: chunk,
1481                        logical_offset: logical_pos,
1482                    },
1483                ))
1484                .is_err()
1485            {
1486                break; // Workers shut down
1487            }
1488            logical_pos += chunk_len as u64;
1489        }
1490        Ok(())
1491    });
1492
1493    // Main thread: receive compressed chunks, reorder, write sequentially.
1494    // Workers return chunks out-of-order; BTreeMap restores logical order.
1495    let mut next_seq = 0u64;
1496    let mut reorder_buf: BTreeMap<u64, CompressedChunk> = BTreeMap::new();
1497    let mut write_error: Option<Error> = None;
1498
1499    for (seq, compressed) in &rx_compressed {
1500        _ = reorder_buf.insert(seq, compressed);
1501
1502        // Drain all consecutive chunks ready to write
1503        while let Some(chunk) = reorder_buf.remove(&next_seq) {
1504            match writer.write_precompressed_block(
1505                &chunk.compressed,
1506                &chunk.hash,
1507                chunk.original_size as u32,
1508            ) {
1509                Ok(()) => {
1510                    if let Some(callback) = progress_callback {
1511                        callback(chunk.logical_offset + chunk.original_size as u64, len);
1512                    }
1513                    next_seq += 1;
1514                }
1515                Err(e) => {
1516                    write_error = Some(e);
1517                    break;
1518                }
1519            }
1520        }
1521        if write_error.is_some() {
1522            break;
1523        }
1524    }
1525
1526    // Drop receiver to unblock workers/reader if we exited early due to write error.
1527    // This causes workers' send() to fail → workers exit → reader's send() fails → reader exits.
1528    drop(rx_compressed);
1529
1530    // Wait for all threads to finish
1531    let reader_result = reader
1532        .join()
1533        .map_err(|_| Error::Io(std::io::Error::other("Reader thread panicked")))?;
1534
1535    for worker in workers {
1536        _ = worker
1537            .join()
1538            .map_err(|_| Error::Io(std::io::Error::other("Worker thread panicked")))?
1539            .ok(); // Ignore worker errors if we already have a write error
1540    }
1541
1542    // Propagate errors (write errors take priority)
1543    if let Some(e) = write_error {
1544        return Err(e);
1545    }
1546    reader_result?;
1547
1548    Ok(())
1549}
1550
#[cfg(test)]
// Exact float comparison is intended: the entropy of empty and single-byte
// inputs is asserted to be exactly 0.0.
#[allow(clippy::float_cmp)]
mod tests {
    use super::*;

    #[test]
    fn test_calculate_entropy_empty() {
        assert_eq!(calculate_entropy(&[]), 0.0);
    }

    #[test]
    fn test_calculate_entropy_uniform() {
        // All same byte - lowest entropy
        let data = vec![0x42; 1000];
        let entropy = calculate_entropy(&data);
        assert!(
            entropy < 0.01,
            "Entropy should be near 0.0 for uniform data"
        );
    }

    #[test]
    fn test_calculate_entropy_binary() {
        // Two equally frequent values - about one bit per byte
        let mut data = vec![0u8; 500];
        data.extend(vec![1u8; 500]);
        let entropy = calculate_entropy(&data);
        assert!(
            entropy > 0.9 && entropy < 1.1,
            "Entropy should be ~1.0 for binary data"
        );
    }

    #[test]
    fn test_calculate_entropy_random() {
        // All 256 values, equally frequent - high entropy
        let data: Vec<u8> = (0..=255).cycle().take(256 * 4).collect();
        let entropy = calculate_entropy(&data);
        assert!(
            entropy > 7.5,
            "Entropy should be high for all byte values: got {entropy}"
        );
    }

    #[test]
    fn test_calculate_entropy_single_byte() {
        assert_eq!(calculate_entropy(&[42]), 0.0);
    }

    #[test]
    fn test_calculate_entropy_two_different_bytes() {
        let data = vec![0, 255];
        let entropy = calculate_entropy(&data);
        assert!(entropy > 0.9 && entropy < 1.1, "Entropy should be ~1.0");
    }

    #[test]
    fn test_pack_config_default() {
        let config = PackConfig::default();

        assert_eq!(config.compression, "lz4");
        assert!(!config.transform.encrypt);
        assert_eq!(config.password, None);
        assert!(!config.transform.train_dict);
        assert_eq!(config.block_size, 65536);
        assert_eq!(config.min_chunk, None);
        assert_eq!(config.avg_chunk, None);
        assert_eq!(config.max_chunk, None);
    }

    #[test]
    fn test_pack_config_clone() {
        let config1 = PackConfig {
            input: PathBuf::from("/dev/sda"),
            output: PathBuf::from("output.hxz"),
            compression: "zstd".to_string(),
            password: Some("secret".to_string()),
            transform: PackTransformFlags {
                encrypt: true,
                ..Default::default()
            },
            ..Default::default()
        };

        let config2 = config1.clone();

        assert_eq!(config2.input, config1.input);
        assert_eq!(config2.output, config1.output);
        assert_eq!(config2.compression, config1.compression);
        assert_eq!(config2.transform.encrypt, config1.transform.encrypt);
        assert_eq!(config2.password, config1.password);
    }

    #[test]
    fn test_pack_config_debug() {
        let config = PackConfig::default();
        let debug_str = format!("{config:?}");

        assert!(debug_str.contains("PackConfig"));
        assert!(debug_str.contains("lz4"));
    }

    #[test]
    fn test_entropy_threshold_filtering() {
        // Test data with entropy below threshold (compressible)
        let low_entropy_data = vec![0u8; 1024];
        assert!(calculate_entropy(&low_entropy_data) < ENTROPY_THRESHOLD);

        // Test data with entropy above threshold. 7 is coprime with 256, so
        // `(i * 7) % 256` over 1024 consecutive `i` hits every byte value
        // exactly four times: a perfectly uniform distribution (8 bits/byte).
        let high_entropy_data: Vec<u8> = (0..1024).map(|i| ((i * 7) % 256) as u8).collect();
        let entropy = calculate_entropy(&high_entropy_data);
        assert!((0.0..=8.0).contains(&entropy));
        assert!(
            entropy > ENTROPY_THRESHOLD,
            "Uniformly distributed bytes must be rejected by the filter: got {entropy}"
        );
    }

    #[test]
    fn test_entropy_calculation_properties() {
        // Entropy should increase with more unique values
        let data1 = vec![0u8; 100];
        let data2 = [0u8, 1u8].repeat(50);
        let mut data3 = Vec::new();
        for i in 0..100 {
            data3.push((i % 10) as u8);
        }

        let entropy1 = calculate_entropy(&data1);
        let entropy2 = calculate_entropy(&data2);
        let entropy3 = calculate_entropy(&data3);

        assert!(
            entropy1 < entropy2,
            "More unique values should increase entropy"
        );
        assert!(
            entropy2 < entropy3,
            "Even more unique values should further increase entropy"
        );
    }
}