// scirs2_io/compression/mod.rs

//! Compression utilities for scientific data
//!
//! This module provides functionality for compressing and decompressing data using
//! various algorithms suitable for scientific computing. It focuses on lossless
//! compression to ensure data integrity while reducing storage requirements.
//!
//! ## Features
//!
//! - Multiple compression algorithms (GZIP, ZSTD, LZ4, BZIP2)
//! - Configurable compression levels
//! - Memory-efficient compression of large datasets
//! - Metadata preservation during compression
//! - Array-specific compression optimizations
//!
//! ## Sub-modules
//!
//! - `ndarray`: Specialized compression utilities for ndarray types
//!
//! ## Examples
//!
//! ```rust,no_run
//! use scirs2_io::compression::{compress_data, decompress_data, CompressionAlgorithm};
//! use std::fs::File;
//! use std::io::prelude::*;
//!
//! // Compress some data using ZSTD with the default compression level
//! let data = b"Large scientific dataset with repetitive patterns";
//! let compressed = compress_data(data, CompressionAlgorithm::Zstd, None).unwrap();
//!
//! // Save the compressed data to a file
//! let mut file = File::create("data.zst").unwrap();
//! file.write_all(&compressed).unwrap();
//!
//! // Later, read and decompress the data
//! let mut compressed_data = Vec::new();
//! File::open("data.zst").unwrap().read_to_end(&mut compressed_data).unwrap();
//! let original = decompress_data(&compressed_data, CompressionAlgorithm::Zstd).unwrap();
//! assert_eq!(original, data);
//! ```

use std::fs::File;
use std::io::{Read, Write};
use std::path::Path;

use bzip2::read::{BzDecoder, BzEncoder};
use bzip2::Compression as Bzip2Compression;
use flate2::read::{GzDecoder, GzEncoder};
use flate2::Compression as GzipCompression;
use lz4::{Decoder, EncoderBuilder};
use zstd::{decode_all, encode_all};

// Re-export ndarray submodule
pub mod ndarray;

use crate::error::{IoError, Result};
/// Compression algorithms supported by the library
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompressionAlgorithm {
    /// GZIP compression (good balance of speed and compression ratio)
    Gzip,
    /// Zstandard compression (excellent compression ratio, fast decompression)
    Zstd,
    /// LZ4 compression (extremely fast, moderate compression ratio)
    Lz4,
    /// BZIP2 compression (high compression ratio, slower speed)
    Bzip2,
}

impl CompressionAlgorithm {
    /// Get the file extension associated with this compression algorithm
    pub fn extension(&self) -> &'static str {
        match self {
            CompressionAlgorithm::Gzip => "gz",
            CompressionAlgorithm::Zstd => "zst",
            CompressionAlgorithm::Lz4 => "lz4",
            CompressionAlgorithm::Bzip2 => "bz2",
        }
    }

    /// Try to determine the compression algorithm from a file extension
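    ///
    /// # Examples
    ///
    /// A short doc-test sketch of the extension mapping (mirrors the match arms below;
    /// matching is case-insensitive):
    ///
    /// ```
    /// use scirs2_io::compression::CompressionAlgorithm;
    ///
    /// assert_eq!(CompressionAlgorithm::from_extension("zst"), Some(CompressionAlgorithm::Zstd));
    /// assert_eq!(CompressionAlgorithm::from_extension("GZ"), Some(CompressionAlgorithm::Gzip));
    /// assert_eq!(CompressionAlgorithm::from_extension("txt"), None);
    /// ```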
    pub fn from_extension(ext: &str) -> Option<Self> {
        match ext.to_lowercase().as_str() {
            "gz" | "gzip" => Some(CompressionAlgorithm::Gzip),
            "zst" | "zstd" => Some(CompressionAlgorithm::Zstd),
            "lz4" => Some(CompressionAlgorithm::Lz4),
            "bz2" | "bzip2" => Some(CompressionAlgorithm::Bzip2),
            _ => None,
        }
    }
}

/// Convert a compression level (0-9) to the appropriate internal level for each algorithm
fn normalize_compression_level(level: Option<u32>, algorithm: CompressionAlgorithm) -> Result<u32> {
    let level = level.unwrap_or(6); // Default compression level

    if level > 9 {
        return Err(IoError::CompressionError(format!(
            "Compression level must be between 0 and 9, got {}",
            level
        )));
    }

    // Each compression library has a different range of compression levels
    match algorithm {
        CompressionAlgorithm::Gzip => Ok(level),
        CompressionAlgorithm::Zstd => {
            // ZSTD supports levels 1-22; map our 0-9 onto 1-22
            // (e.g. level 6 maps to 1 + (6 * 21) / 9 = 15)
            Ok(1 + (level * 21) / 9)
        }
        CompressionAlgorithm::Lz4 => Ok(level),
        // BZIP2 only accepts levels 1-9, so clamp 0 up to the minimum
        CompressionAlgorithm::Bzip2 => Ok(level.max(1)),
    }
}

/// Compress data using the specified algorithm and compression level
///
/// # Arguments
///
/// * `data` - The data to compress
/// * `algorithm` - The compression algorithm to use
/// * `level` - The compression level (0-9, where 0 is the fastest/lowest setting and 9 is
///   maximum compression; defaults to 6 if `None`)
///
/// # Returns
///
/// The compressed data as a `Vec<u8>`
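///
/// # Examples
///
/// A minimal in-memory round trip (mirrors the module-level example):
///
/// ```rust,no_run
/// use scirs2_io::compression::{compress_data, decompress_data, CompressionAlgorithm};
///
/// let data = b"example data";
/// let compressed = compress_data(data, CompressionAlgorithm::Gzip, Some(9)).unwrap();
/// let original = decompress_data(&compressed, CompressionAlgorithm::Gzip).unwrap();
/// assert_eq!(original, data);
/// ```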
pub fn compress_data(
    data: &[u8],
    algorithm: CompressionAlgorithm,
    level: Option<u32>,
) -> Result<Vec<u8>> {
    let normalized_level = normalize_compression_level(level, algorithm)?;

    match algorithm {
        CompressionAlgorithm::Gzip => {
            let mut encoder = GzEncoder::new(data, GzipCompression::new(normalized_level));
            let mut compressed = Vec::new();
            encoder
                .read_to_end(&mut compressed)
                .map_err(|e| IoError::CompressionError(e.to_string()))?;
            Ok(compressed)
        }
        CompressionAlgorithm::Zstd => encode_all(data, normalized_level as i32)
            .map_err(|e| IoError::CompressionError(e.to_string())),
        CompressionAlgorithm::Lz4 => {
            let mut encoder = EncoderBuilder::new()
                .level(normalized_level)
                .build(Vec::new())
                .map_err(|e| IoError::CompressionError(e.to_string()))?;

            encoder
                .write_all(data)
                .map_err(|e| IoError::CompressionError(e.to_string()))?;

            let (compressed, result) = encoder.finish();
            result.map_err(|e| IoError::CompressionError(e.to_string()))?;

            Ok(compressed)
        }
        CompressionAlgorithm::Bzip2 => {
            let mut encoder = BzEncoder::new(data, Bzip2Compression::new(normalized_level));
            let mut compressed = Vec::new();
            encoder
                .read_to_end(&mut compressed)
                .map_err(|e| IoError::CompressionError(e.to_string()))?;
            Ok(compressed)
        }
    }
}

/// Decompress data using the specified algorithm
///
/// # Arguments
///
/// * `data` - The compressed data
/// * `algorithm` - The compression algorithm used
///
/// # Returns
///
/// The decompressed data as a `Vec<u8>`
pub fn decompress_data(data: &[u8], algorithm: CompressionAlgorithm) -> Result<Vec<u8>> {
    match algorithm {
        CompressionAlgorithm::Gzip => {
            let mut decoder = GzDecoder::new(data);
            let mut decompressed = Vec::new();
            decoder
                .read_to_end(&mut decompressed)
                .map_err(|e| IoError::DecompressionError(e.to_string()))?;
            Ok(decompressed)
        }
        CompressionAlgorithm::Zstd => {
            decode_all(data).map_err(|e| IoError::DecompressionError(e.to_string()))
        }
        CompressionAlgorithm::Lz4 => {
            let mut decoder =
                Decoder::new(data).map_err(|e| IoError::DecompressionError(e.to_string()))?;
            let mut decompressed = Vec::new();
            decoder
                .read_to_end(&mut decompressed)
                .map_err(|e| IoError::DecompressionError(e.to_string()))?;
            Ok(decompressed)
        }
        CompressionAlgorithm::Bzip2 => {
            let mut decoder = BzDecoder::new(data);
            let mut decompressed = Vec::new();
            decoder
                .read_to_end(&mut decompressed)
                .map_err(|e| IoError::DecompressionError(e.to_string()))?;
            Ok(decompressed)
        }
    }
}

/// Compress a file using the specified algorithm and save it to a new file
///
/// # Arguments
///
/// * `input_path` - Path to the file to compress
/// * `output_path` - Path to save the compressed file (if `None`, appends the algorithm's
///   extension to `input_path`)
/// * `algorithm` - The compression algorithm to use
/// * `level` - The compression level (0-9, where 0 is the fastest/lowest setting and 9 is
///   maximum compression; defaults to 6 if `None`)
///
/// # Returns
///
/// The path to the compressed file
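///
/// # Examples
///
/// A file-based sketch (the paths are illustrative):
///
/// ```rust,no_run
/// use scirs2_io::compression::{compress_file, decompress_file, CompressionAlgorithm};
///
/// // Produces "data.csv.zst" next to the input and returns its path
/// let compressed_path = compress_file("data.csv", None, CompressionAlgorithm::Zstd, None).unwrap();
/// // Algorithm inferred from the ".zst" extension; restores "data.csv"
/// let restored = decompress_file(compressed_path.as_str(), None, None).unwrap();
/// println!("restored to {}", restored);
/// ```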
pub fn compress_file<P: AsRef<Path>>(
    input_path: P,
    output_path: Option<P>,
    algorithm: CompressionAlgorithm,
    level: Option<u32>,
) -> Result<String> {
    // Read input file
    let mut input_data = Vec::new();
    File::open(input_path.as_ref())
        .map_err(|e| IoError::FileError(format!("Failed to open input file: {}", e)))?
        .read_to_end(&mut input_data)
        .map_err(|e| IoError::FileError(format!("Failed to read input file: {}", e)))?;

    // Compress the data
    let compressed_data = compress_data(&input_data, algorithm, level)?;

    // Determine output path
    let output_path_string = match output_path {
        Some(path) => path.as_ref().to_string_lossy().to_string(),
        None => {
            // Generate output path by appending the algorithm extension
            let mut path_buf = input_path.as_ref().to_path_buf();
            let ext = algorithm.extension();

            // Get the file name as a string
            let file_name = path_buf
                .file_name()
                .ok_or_else(|| IoError::FileError("Invalid input file path".to_string()))?
                .to_string_lossy()
                .to_string();

            // Append the extension and update the file name
            let new_file_name = format!("{}.{}", file_name, ext);
            path_buf.set_file_name(new_file_name);

            path_buf.to_string_lossy().to_string()
        }
    };

    // Write the compressed data to the output file
    File::create(&output_path_string)
        .map_err(|e| IoError::FileError(format!("Failed to create output file: {}", e)))?
        .write_all(&compressed_data)
        .map_err(|e| IoError::FileError(format!("Failed to write to output file: {}", e)))?;

    Ok(output_path_string)
}

/// Decompress a file using the specified algorithm and save it to a new file
///
/// # Arguments
///
/// * `input_path` - Path to the compressed file
/// * `output_path` - Path to save the decompressed file (if `None`, removes the algorithm's
///   extension from `input_path`)
/// * `algorithm` - The compression algorithm to use (if `None`, determined from the file extension)
///
/// # Returns
///
/// The path to the decompressed file
pub fn decompress_file<P: AsRef<Path>>(
    input_path: P,
    output_path: Option<P>,
    algorithm: Option<CompressionAlgorithm>,
) -> Result<String> {
    // Determine the compression algorithm
    let algorithm = match algorithm {
        Some(algo) => algo,
        None => {
            // Try to determine from the file extension
            let ext = input_path
                .as_ref()
                .extension()
                .ok_or_else(|| {
                    IoError::DecompressionError("Unable to determine file extension".to_string())
                })?
                .to_string_lossy()
                .to_string();

            CompressionAlgorithm::from_extension(&ext)
                .ok_or(IoError::UnsupportedCompressionAlgorithm(ext))?
        }
    };

    // Read input file
    let mut input_data = Vec::new();
    File::open(input_path.as_ref())
        .map_err(|e| IoError::FileError(format!("Failed to open input file: {}", e)))?
        .read_to_end(&mut input_data)
        .map_err(|e| IoError::FileError(format!("Failed to read input file: {}", e)))?;

    // Decompress the data
    let decompressed_data = decompress_data(&input_data, algorithm)?;

    // Determine output path
    let output_path_string = match output_path {
        Some(path) => path.as_ref().to_string_lossy().to_string(),
        None => {
            // Generate output path by removing the algorithm extension
            let path_str = input_path.as_ref().to_string_lossy().to_string();
            let ext = algorithm.extension();

            if path_str.ends_with(&format!(".{}", ext)) {
                // Remove the extension
                path_str[0..path_str.len() - ext.len() - 1].to_string()
            } else {
                // If the extension doesn't match, add a ".decompressed" suffix
                format!("{}.decompressed", path_str)
            }
        }
    };

    // Write the decompressed data to the output file
    File::create(&output_path_string)
        .map_err(|e| IoError::FileError(format!("Failed to create output file: {}", e)))?
        .write_all(&decompressed_data)
        .map_err(|e| IoError::FileError(format!("Failed to write to output file: {}", e)))?;

    Ok(output_path_string)
}

/// Calculate the compression ratio for the given data and algorithm
///
/// # Arguments
///
/// * `data` - The original data
/// * `algorithm` - The compression algorithm to use
/// * `level` - The compression level (0-9, optional)
///
/// # Returns
///
/// The compression ratio (original size / compressed size)
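///
/// # Examples
///
/// A quick sketch on highly repetitive data, where the ratio should exceed 1.0:
///
/// ```rust,no_run
/// use scirs2_io::compression::{compression_ratio, CompressionAlgorithm};
///
/// let data = vec![0u8; 1_000_000];
/// let ratio = compression_ratio(&data, CompressionAlgorithm::Gzip, None).unwrap();
/// assert!(ratio > 1.0);
/// ```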
pub fn compression_ratio(
    data: &[u8],
    algorithm: CompressionAlgorithm,
    level: Option<u32>,
) -> Result<f64> {
    let compressed = compress_data(data, algorithm, level)?;
    let original_size = data.len() as f64;
    let compressed_size = compressed.len() as f64;

    // Avoid division by zero
    if compressed_size == 0.0 {
        return Err(IoError::CompressionError(
            "Compressed data has zero size".to_string(),
        ));
    }

    Ok(original_size / compressed_size)
}

/// Information about a compression algorithm
pub struct CompressionInfo {
    /// Name of the compression algorithm
    pub name: String,
    /// Brief description of the algorithm
    pub description: String,
    /// Typical compression ratio for scientific data (higher is better)
    pub typical_compression_ratio: f64,
    /// Relative compression speed (1-10, higher is faster)
    pub compression_speed: u8,
    /// Relative decompression speed (1-10, higher is faster)
    pub decompression_speed: u8,
    /// File extension associated with this compression
    pub file_extension: String,
}

/// Get information about a specific compression algorithm
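///
/// # Examples
///
/// A small sketch reading the static metadata table below:
///
/// ```
/// use scirs2_io::compression::{algorithm_info, CompressionAlgorithm};
///
/// let info = algorithm_info(CompressionAlgorithm::Zstd);
/// assert_eq!(info.file_extension, "zst");
/// println!("{}: {}", info.name, info.description);
/// ```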
pub fn algorithm_info(algorithm: CompressionAlgorithm) -> CompressionInfo {
    match algorithm {
        CompressionAlgorithm::Gzip => CompressionInfo {
            name: "GZIP".to_string(),
            description: "General-purpose compression algorithm with good balance of speed and compression ratio".to_string(),
            typical_compression_ratio: 2.5,
            compression_speed: 6,
            decompression_speed: 7,
            file_extension: "gz".to_string(),
        },
        CompressionAlgorithm::Zstd => CompressionInfo {
            name: "Zstandard".to_string(),
            description: "Modern compression algorithm with excellent compression ratio and fast decompression".to_string(),
            typical_compression_ratio: 3.2,
            compression_speed: 7,
            decompression_speed: 9,
            file_extension: "zst".to_string(),
        },
        CompressionAlgorithm::Lz4 => CompressionInfo {
            name: "LZ4".to_string(),
            description: "Extremely fast compression algorithm with moderate compression ratio".to_string(),
            typical_compression_ratio: 1.8,
            compression_speed: 10,
            decompression_speed: 10,
            file_extension: "lz4".to_string(),
        },
        CompressionAlgorithm::Bzip2 => CompressionInfo {
            name: "BZIP2".to_string(),
            description: "High compression ratio but slower speed, good for archival storage".to_string(),
            typical_compression_ratio: 3.5,
            compression_speed: 3,
            decompression_speed: 4,
            file_extension: "bz2".to_string(),
        },
    }
}

/// Magic bytes for different compression formats
const GZIP_MAGIC: &[u8] = &[0x1f, 0x8b];
const ZSTD_MAGIC: &[u8] = &[0x28, 0xb5, 0x2f, 0xfd];
const LZ4_MAGIC: &[u8] = &[0x04, 0x22, 0x4d, 0x18];
const BZIP2_MAGIC: &[u8] = &[0x42, 0x5a, 0x68];

/// Detect compression algorithm from magic bytes
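///
/// # Examples
///
/// A short sketch against the magic-byte constants above:
///
/// ```
/// use scirs2_io::compression::{detect_compression_from_bytes, CompressionAlgorithm};
///
/// // A GZIP stream always begins with the two magic bytes 0x1f 0x8b
/// let header = [0x1f, 0x8b, 0x08, 0x00];
/// assert_eq!(
///     detect_compression_from_bytes(&header),
///     Some(CompressionAlgorithm::Gzip)
/// );
/// assert_eq!(detect_compression_from_bytes(b"plain text"), None);
/// ```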
pub fn detect_compression_from_bytes(data: &[u8]) -> Option<CompressionAlgorithm> {
    if data.starts_with(GZIP_MAGIC) {
        Some(CompressionAlgorithm::Gzip)
    } else if data.starts_with(ZSTD_MAGIC) {
        Some(CompressionAlgorithm::Zstd)
    } else if data.starts_with(LZ4_MAGIC) {
        Some(CompressionAlgorithm::Lz4)
    } else if data.starts_with(BZIP2_MAGIC) {
        Some(CompressionAlgorithm::Bzip2)
    } else {
        None
    }
}

/// Transparent file handler that automatically handles compression/decompression
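///
/// # Examples
///
/// A sketch of round-tripping through the handler (the path is illustrative):
///
/// ```rust,no_run
/// use scirs2_io::compression::TransparentFileHandler;
///
/// let handler = TransparentFileHandler::default();
/// // The ".zst" extension selects Zstandard compression on write
/// handler.write_file("results.zst", b"simulation output").unwrap();
/// // Decompression is applied automatically on read
/// let data = handler.read_file("results.zst").unwrap();
/// assert_eq!(data, b"simulation output");
/// ```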
pub struct TransparentFileHandler {
    /// Automatically detect compression from file extension
    pub auto_detect_extension: bool,
    /// Automatically detect compression from file content
    pub auto_detect_content: bool,
    /// Default compression algorithm when creating new files
    pub default_algorithm: CompressionAlgorithm,
    /// Default compression level
    pub default_level: Option<u32>,
}

impl Default for TransparentFileHandler {
    fn default() -> Self {
        Self {
            auto_detect_extension: true,
            auto_detect_content: true,
            default_algorithm: CompressionAlgorithm::Zstd,
            default_level: Some(6),
        }
    }
}

impl TransparentFileHandler {
    /// Create a new transparent file handler with custom settings
    pub fn new(
        auto_detect_extension: bool,
        auto_detect_content: bool,
        default_algorithm: CompressionAlgorithm,
        default_level: Option<u32>,
    ) -> Self {
        Self {
            auto_detect_extension,
            auto_detect_content,
            default_algorithm,
            default_level,
        }
    }

    /// Read a file with automatic decompression
    pub fn read_file<P: AsRef<Path>>(&self, path: P) -> Result<Vec<u8>> {
        let mut file_data = Vec::new();
        File::open(path.as_ref())
            .map_err(|e| IoError::FileError(format!("Failed to open file: {}", e)))?
            .read_to_end(&mut file_data)
            .map_err(|e| IoError::FileError(format!("Failed to read file: {}", e)))?;

        // Try to detect compression
        let mut algorithm = None;

        // Check file extension first if enabled
        if self.auto_detect_extension {
            if let Some(ext) = path.as_ref().extension() {
                algorithm = CompressionAlgorithm::from_extension(&ext.to_string_lossy());
            }
        }

        // Check content magic bytes if enabled and extension detection failed
        if algorithm.is_none() && self.auto_detect_content {
            algorithm = detect_compression_from_bytes(&file_data);
        }

        // Decompress if compression was detected
        match algorithm {
            Some(algo) => decompress_data(&file_data, algo),
            None => Ok(file_data), // Return as-is if no compression detected
        }
    }

    /// Write a file with automatic compression
    pub fn write_file<P: AsRef<Path>>(&self, path: P, data: &[u8]) -> Result<()> {
        let mut algorithm = None;
        let level = self.default_level;

        // Check if we should compress based on file extension
        if self.auto_detect_extension {
            if let Some(ext) = path.as_ref().extension() {
                algorithm = CompressionAlgorithm::from_extension(&ext.to_string_lossy());
            }
        }

        // Use the default algorithm if no compression was selected by extension
        // but the handler is configured to compress by default
        if algorithm.is_none() && self.should_compress_by_default(&path) {
            algorithm = Some(self.default_algorithm);
        }

        // Compress data if needed
        let output_data = match algorithm {
            Some(algo) => compress_data(data, algo, level)?,
            None => data.to_vec(),
        };

        // Write to file
        File::create(path.as_ref())
            .map_err(|e| IoError::FileError(format!("Failed to create file: {}", e)))?
            .write_all(&output_data)
            .map_err(|e| IoError::FileError(format!("Failed to write file: {}", e)))?;

        Ok(())
    }

553    fn should_compress_by_default<P: AsRef<Path>>(&self, path: P) -> bool {
554        // Don't compress if file already has a compression extension
555        if let Some(ext) = path.as_ref().extension() {
556            let ext_str = ext.to_string_lossy().to_lowercase();
557            matches!(
558                ext_str.as_str(),
559                "gz" | "gzip" | "zst" | "zstd" | "lz4" | "bz2" | "bzip2"
560            )
561        } else {
562            false
563        }
564    }

    /// Copy a file with transparent compression/decompression
    pub fn copy_file<P: AsRef<Path>, Q: AsRef<Path>>(
        &self,
        source: P,
        destination: Q,
    ) -> Result<()> {
        let data = self.read_file(source)?;
        self.write_file(destination, &data)?;
        Ok(())
    }

    /// Get file info including compression details
    pub fn file_info<P: AsRef<Path>>(&self, path: P) -> Result<FileCompressionInfo> {
        let mut file_data = Vec::new();
        File::open(path.as_ref())
            .map_err(|e| IoError::FileError(format!("Failed to open file: {}", e)))?
            .read_to_end(&mut file_data)
            .map_err(|e| IoError::FileError(format!("Failed to read file: {}", e)))?;

        let original_size = file_data.len();

        // Detect compression from content
        let detected_algorithm = if self.auto_detect_content {
            detect_compression_from_bytes(&file_data)
        } else {
            None
        };

        // Detect compression from the file extension
        let extension_algorithm = if self.auto_detect_extension {
            path.as_ref()
                .extension()
                .and_then(|ext| CompressionAlgorithm::from_extension(&ext.to_string_lossy()))
        } else {
            None
        };

        let is_compressed = detected_algorithm.is_some() || extension_algorithm.is_some();
        let algorithm = detected_algorithm.or(extension_algorithm);

        // Try to recover the uncompressed size; a failed decompression leaves it unknown
        let uncompressed_size = if let Some(algo) = algorithm {
            decompress_data(&file_data, algo).ok().map(|d| d.len())
        } else {
            Some(original_size)
        };

        let compressed_size = if is_compressed {
            Some(original_size)
        } else {
            None
        };

        // Ratio is only meaningful when both sizes are known
        let compression_ratio = match (compressed_size, uncompressed_size) {
            (Some(compressed), Some(uncompressed)) => {
                Some(uncompressed as f64 / compressed as f64)
            }
            _ => None,
        };

        Ok(FileCompressionInfo {
            path: path.as_ref().to_path_buf(),
            is_compressed,
            algorithm,
            compressed_size,
            uncompressed_size,
            compression_ratio,
        })
    }
}

/// Information about a file's compression status
#[derive(Debug, Clone)]
pub struct FileCompressionInfo {
    /// Path to the file
    pub path: std::path::PathBuf,
    /// Whether the file is compressed
    pub is_compressed: bool,
    /// Detected compression algorithm
    pub algorithm: Option<CompressionAlgorithm>,
    /// Size of compressed data (if compressed)
    pub compressed_size: Option<usize>,
    /// Size of uncompressed data
    pub uncompressed_size: Option<usize>,
    /// Compression ratio (uncompressed / compressed)
    pub compression_ratio: Option<f64>,
}

/// Global transparent file handler instance
static GLOBAL_HANDLER: std::sync::OnceLock<TransparentFileHandler> = std::sync::OnceLock::new();

/// Initialize the global transparent file handler
///
/// Note: this only takes effect if called before the first use of the global handler;
/// a `OnceLock` that has already been set keeps its existing value.
pub fn init_global_handler(handler: TransparentFileHandler) {
    let _ = GLOBAL_HANDLER.set(handler);
}

/// Get a reference to the global transparent file handler
pub fn global_handler() -> &'static TransparentFileHandler {
    GLOBAL_HANDLER.get_or_init(TransparentFileHandler::default)
}

/// Convenient function to read a file with automatic decompression using the global handler
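///
/// # Examples
///
/// A sketch using the global handler (the path is illustrative):
///
/// ```rust,no_run
/// use scirs2_io::compression::{read_file_transparent, write_file_transparent};
///
/// // Compressed with the global handler's default algorithm (Zstandard)
/// write_file_transparent("output.zst", b"data").unwrap();
/// let data = read_file_transparent("output.zst").unwrap();
/// assert_eq!(data, b"data");
/// ```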
pub fn read_file_transparent<P: AsRef<Path>>(path: P) -> Result<Vec<u8>> {
    global_handler().read_file(path)
}

/// Convenient function to write a file with automatic compression using the global handler
pub fn write_file_transparent<P: AsRef<Path>>(path: P, data: &[u8]) -> Result<()> {
    global_handler().write_file(path, data)
}

/// Convenient function to copy a file with transparent compression/decompression using the global handler
pub fn copy_file_transparent<P: AsRef<Path>, Q: AsRef<Path>>(
    source: P,
    destination: Q,
) -> Result<()> {
    global_handler().copy_file(source, destination)
}

/// Convenient function to get file compression info using the global handler
pub fn file_info_transparent<P: AsRef<Path>>(path: P) -> Result<FileCompressionInfo> {
    global_handler().file_info(path)
}

//
// Parallel Compression/Decompression
//

use scirs2_core::parallel_ops::*;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Instant;

/// Configuration for parallel compression/decompression operations
#[derive(Debug, Clone)]
pub struct ParallelCompressionConfig {
    /// Number of threads to use (0 means use all available cores)
    pub num_threads: usize,
    /// Size of each chunk in bytes for parallel processing
    pub chunk_size: usize,
    /// Buffer size for I/O operations
    pub buffer_size: usize,
    /// Whether to enable memory mapping for large files
    pub enable_memory_mapping: bool,
}

impl Default for ParallelCompressionConfig {
    fn default() -> Self {
        Self {
            num_threads: 0,          // Use all available cores
            chunk_size: 1024 * 1024, // 1 MB chunks
            buffer_size: 64 * 1024,  // 64 KB buffer
            enable_memory_mapping: true,
        }
    }
}

/// Statistics for parallel compression/decompression operations
#[derive(Debug, Clone)]
pub struct ParallelCompressionStats {
    /// Total number of chunks processed
    pub chunks_processed: usize,
    /// Total bytes processed (input to the operation)
    pub bytes_processed: usize,
    /// Total bytes output (compressed/decompressed)
    pub bytes_output: usize,
    /// Time taken for the operation in milliseconds
    pub operation_time_ms: f64,
    /// Throughput in bytes per second
    pub throughput_bps: f64,
    /// Compression ratio (uncompressed size / compressed size)
    pub compression_ratio: f64,
    /// Number of threads used
    pub threads_used: usize,
}

/// Compress data in parallel using multiple threads
///
/// Inputs larger than `config.chunk_size` are split into chunks that are compressed
/// independently and written into a custom container: a little-endian `u64` chunk count,
/// one `u64` length per chunk, then the compressed chunks themselves. Such output must be
/// decompressed with [`decompress_data_parallel`]; it is not a plain stream for the chosen
/// algorithm. Inputs at or below `config.chunk_size` are compressed sequentially and
/// produce a plain stream.
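///
/// # Examples
///
/// A round-trip sketch with the default configuration:
///
/// ```rust,no_run
/// use scirs2_io::compression::{
///     compress_data_parallel, decompress_data_parallel, CompressionAlgorithm,
///     ParallelCompressionConfig,
/// };
///
/// let data = vec![42u8; 8 * 1024 * 1024]; // 8 MB of repetitive data
/// let config = ParallelCompressionConfig::default();
/// let (compressed, stats) =
///     compress_data_parallel(&data, CompressionAlgorithm::Zstd, None, config.clone()).unwrap();
/// println!("compressed {} chunks at {:.0} B/s", stats.chunks_processed, stats.throughput_bps);
/// let (restored, _) =
///     decompress_data_parallel(&compressed, CompressionAlgorithm::Zstd, config).unwrap();
/// assert_eq!(restored, data);
/// ```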
pub fn compress_data_parallel(
    data: &[u8],
    algorithm: CompressionAlgorithm,
    level: Option<u32>,
    config: ParallelCompressionConfig,
) -> Result<(Vec<u8>, ParallelCompressionStats)> {
    let start_time = Instant::now();
    let input_size = data.len();

    // Determine the number of worker threads (0 means all available cores)
    let num_threads = if config.num_threads == 0 {
        num_threads()
    } else {
        config.num_threads
    };

    // For small inputs, fall back to sequential compression
    if input_size <= config.chunk_size {
        let compressed = compress_data(data, algorithm, level)?;
        let operation_time = start_time.elapsed().as_secs_f64() * 1000.0;

        let stats = ParallelCompressionStats {
            chunks_processed: 1,
            bytes_processed: input_size,
            bytes_output: compressed.len(),
            operation_time_ms: operation_time,
            throughput_bps: input_size as f64 / (operation_time / 1000.0),
            compression_ratio: input_size as f64 / compressed.len() as f64,
            threads_used: 1,
        };

        return Ok((compressed, stats));
    }

    // Split data into chunks
    let chunk_size = config.chunk_size;
    let chunks: Vec<&[u8]> = data.chunks(chunk_size).collect();
    let chunk_count = chunks.len();

    // Compress chunks in parallel
    let processed_count = Arc::new(AtomicUsize::new(0));
    let compressed_chunks: Result<Vec<Vec<u8>>> = chunks
        .into_par_iter()
        .map(|chunk| {
            let result = compress_data(chunk, algorithm, level);
            processed_count.fetch_add(1, Ordering::Relaxed);
            result
        })
        .collect();

    let compressed_chunks = compressed_chunks?;

    // Calculate total size and concatenate results
    let total_compressed_size: usize = compressed_chunks.iter().map(|chunk| chunk.len()).sum();
    let mut result = Vec::with_capacity(total_compressed_size + 8 + (chunk_count * 8)); // Extra space for the header

    // Write the container header: chunk count, then one length per chunk
    result.extend_from_slice(&(chunk_count as u64).to_le_bytes());
    for chunk in &compressed_chunks {
        result.extend_from_slice(&(chunk.len() as u64).to_le_bytes());
    }

    // Write compressed chunks
    for chunk in compressed_chunks {
        result.extend_from_slice(&chunk);
    }

    let operation_time = start_time.elapsed().as_secs_f64() * 1000.0;

    let stats = ParallelCompressionStats {
        chunks_processed: chunk_count,
        bytes_processed: input_size,
        bytes_output: result.len(),
        operation_time_ms: operation_time,
        throughput_bps: input_size as f64 / (operation_time / 1000.0),
        compression_ratio: input_size as f64 / result.len() as f64,
        threads_used: num_threads,
    };

    Ok((result, stats))
}

/// Decompress data in parallel using multiple threads
///
/// Expects the chunked container produced by [`compress_data_parallel`]. If the input is
/// too small to carry a container header, or the leading chunk count fails a sanity check,
/// the data is treated as a plain stream and decompressed sequentially instead.
pub fn decompress_data_parallel(
    data: &[u8],
    algorithm: CompressionAlgorithm,
    config: ParallelCompressionConfig,
) -> Result<(Vec<u8>, ParallelCompressionStats)> {
    let start_time = Instant::now();
    let input_size = data.len();

    // Determine the number of worker threads (0 means all available cores)
    let num_threads = if config.num_threads == 0 {
        num_threads()
    } else {
        config.num_threads
    };

    // Check if this is parallel-compressed data by looking for chunk headers
    if data.len() < 8 {
        // Too small to be parallel-compressed data; use sequential decompression
        let decompressed = decompress_data(data, algorithm)?;
        let operation_time = start_time.elapsed().as_secs_f64() * 1000.0;

        let stats = ParallelCompressionStats {
            chunks_processed: 1,
            bytes_processed: input_size,
            bytes_output: decompressed.len(),
            operation_time_ms: operation_time,
            throughput_bps: decompressed.len() as f64 / (operation_time / 1000.0),
            compression_ratio: decompressed.len() as f64 / input_size as f64,
            threads_used: 1,
        };

        return Ok((decompressed, stats));
    }

    // Read the chunk count
    let chunk_count = u64::from_le_bytes(
        data[0..8]
            .try_into()
            .map_err(|_| IoError::DecompressionError("Invalid chunk header".to_string()))?,
    ) as usize;

    if chunk_count == 0 || chunk_count > data.len() / 8 {
        // Header fails the sanity check: not parallel-compressed data,
        // so fall back to sequential decompression
        let decompressed = decompress_data(data, algorithm)?;
        let operation_time = start_time.elapsed().as_secs_f64() * 1000.0;

        let stats = ParallelCompressionStats {
            chunks_processed: 1,
            bytes_processed: input_size,
            bytes_output: decompressed.len(),
            operation_time_ms: operation_time,
            throughput_bps: decompressed.len() as f64 / (operation_time / 1000.0),
            compression_ratio: decompressed.len() as f64 / input_size as f64,
            threads_used: 1,
        };

        return Ok((decompressed, stats));
    }

    // Read the per-chunk sizes
    let header_size = 8 + (chunk_count * 8);
    if data.len() < header_size {
        return Err(IoError::DecompressionError(
            "Truncated chunk headers".to_string(),
        ));
    }

    let mut chunk_sizes = Vec::with_capacity(chunk_count);
    for i in 0..chunk_count {
        let start_idx = 8 + (i * 8);
        let size = u64::from_le_bytes(
            data[start_idx..start_idx + 8]
                .try_into()
                .map_err(|_| IoError::DecompressionError("Invalid chunk size".to_string()))?,
        ) as usize;
        chunk_sizes.push(size);
    }

    // Extract compressed chunks
    let mut chunks = Vec::with_capacity(chunk_count);
    let mut offset = header_size;

    for &size in &chunk_sizes {
        if offset + size > data.len() {
            return Err(IoError::DecompressionError(
                "Truncated chunk data".to_string(),
            ));
        }
        chunks.push(&data[offset..offset + size]);
        offset += size;
    }

    // Decompress chunks in parallel
    let processed_count = Arc::new(AtomicUsize::new(0));
    let decompressed_chunks: Result<Vec<Vec<u8>>> = chunks
        .into_par_iter()
        .map(|chunk| {
            let result = decompress_data(chunk, algorithm);
            processed_count.fetch_add(1, Ordering::Relaxed);
            result
        })
        .collect();

    let decompressed_chunks = decompressed_chunks?;

    // Concatenate results
    let total_size: usize = decompressed_chunks.iter().map(|chunk| chunk.len()).sum();
    let mut result = Vec::with_capacity(total_size);

    for chunk in decompressed_chunks {
        result.extend_from_slice(&chunk);
    }

    let operation_time = start_time.elapsed().as_secs_f64() * 1000.0;

    let stats = ParallelCompressionStats {
        chunks_processed: chunk_count,
        bytes_processed: input_size,
        bytes_output: result.len(),
        operation_time_ms: operation_time,
        throughput_bps: result.len() as f64 / (operation_time / 1000.0),
        compression_ratio: result.len() as f64 / input_size as f64,
        threads_used: num_threads,
    };

    Ok((result, stats))
}

/// Compress a file in parallel and save it to a new file
pub fn compress_file_parallel<P: AsRef<Path>>(
    input_path: P,
    output_path: Option<P>,
    algorithm: CompressionAlgorithm,
    level: Option<u32>,
    config: ParallelCompressionConfig,
) -> Result<(String, ParallelCompressionStats)> {
    // Read input file
    let mut input_data = Vec::new();
    File::open(input_path.as_ref())
        .map_err(|e| IoError::FileError(format!("Failed to open input file: {}", e)))?
        .read_to_end(&mut input_data)
        .map_err(|e| IoError::FileError(format!("Failed to read input file: {}", e)))?;

    // Compress the data in parallel
    let (compressed_data, stats) = compress_data_parallel(&input_data, algorithm, level, config)?;

    // Determine output path
    let output_path_string = match output_path {
        Some(path) => path.as_ref().to_string_lossy().to_string(),
        None => {
            let mut path_buf = input_path.as_ref().to_path_buf();
            let ext = algorithm.extension();
            let file_name = path_buf
                .file_name()
                .ok_or_else(|| IoError::FileError("Invalid input file path".to_string()))?
                .to_string_lossy()
                .to_string();
            let new_file_name = format!("{}.{}", file_name, ext);
            path_buf.set_file_name(new_file_name);
            path_buf.to_string_lossy().to_string()
        }
    };

    // Write the compressed data to the output file
    File::create(&output_path_string)
        .map_err(|e| IoError::FileError(format!("Failed to create output file: {}", e)))?
        .write_all(&compressed_data)
        .map_err(|e| IoError::FileError(format!("Failed to write to output file: {}", e)))?;

    Ok((output_path_string, stats))
}

/// Decompress a file in parallel and save it to a new file
pub fn decompress_file_parallel<P: AsRef<Path>>(
    input_path: P,
    output_path: Option<P>,
    algorithm: Option<CompressionAlgorithm>,
    config: ParallelCompressionConfig,
) -> Result<(String, ParallelCompressionStats)> {
    // Determine the compression algorithm
    let algorithm = match algorithm {
        Some(algo) => algo,
        None => {
            let ext = input_path
                .as_ref()
                .extension()
                .ok_or_else(|| {
                    IoError::DecompressionError("Unable to determine file extension".to_string())
                })?
                .to_string_lossy()
                .to_string();

            CompressionAlgorithm::from_extension(&ext)
                .ok_or(IoError::UnsupportedCompressionAlgorithm(ext))?
        }
    };

    // Read input file
    let mut input_data = Vec::new();
    File::open(input_path.as_ref())
        .map_err(|e| IoError::FileError(format!("Failed to open input file: {}", e)))?
        .read_to_end(&mut input_data)
        .map_err(|e| IoError::FileError(format!("Failed to read input file: {}", e)))?;

    // Decompress the data in parallel
    let (decompressed_data, stats) = decompress_data_parallel(&input_data, algorithm, config)?;

    // Determine output path
    let output_path_string = match output_path {
        Some(path) => path.as_ref().to_string_lossy().to_string(),
        None => {
            let path_str = input_path.as_ref().to_string_lossy().to_string();
            let ext = algorithm.extension();

            if path_str.ends_with(&format!(".{}", ext)) {
                path_str[0..path_str.len() - ext.len() - 1].to_string()
            } else {
                format!("{}.decompressed", path_str)
            }
        }
    };

    // Write the decompressed data to the output file
    File::create(&output_path_string)
        .map_err(|e| IoError::FileError(format!("Failed to create output file: {}", e)))?
        .write_all(&decompressed_data)
        .map_err(|e| IoError::FileError(format!("Failed to write to output file: {}", e)))?;

    Ok((output_path_string, stats))
}

/// Benchmark compression performance for different algorithms and configurations
///
/// Every algorithm/level/config combination is verified with a full round trip; a
/// mismatch is reported as a `DecompressionError` rather than a panic.
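///
/// # Examples
///
/// A sketch comparing GZIP and ZSTD at two levels with the default parallel config:
///
/// ```rust,no_run
/// use scirs2_io::compression::{
///     benchmark_compression_algorithms, CompressionAlgorithm, ParallelCompressionConfig,
/// };
///
/// let data = vec![7u8; 4 * 1024 * 1024];
/// let results = benchmark_compression_algorithms(
///     &data,
///     &[CompressionAlgorithm::Gzip, CompressionAlgorithm::Zstd],
///     &[3, 9],
///     &[ParallelCompressionConfig::default()],
/// ).unwrap();
/// for r in &results {
///     println!(
///         "{:?} level {}: ratio {:.2}, parallel speedup {:.2}x",
///         r.algorithm, r.level, r.compression_ratio, r.compression_speedup()
///     );
/// }
/// ```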
pub fn benchmark_compression_algorithms(
    data: &[u8],
    algorithms: &[CompressionAlgorithm],
    levels: &[u32],
    parallel_configs: &[ParallelCompressionConfig],
) -> Result<Vec<CompressionBenchmarkResult>> {
    let mut results = Vec::new();

    for &algorithm in algorithms {
        for &level in levels {
            // Sequential compression
            let start_time = Instant::now();
            let compressed = compress_data(data, algorithm, Some(level))?;
            let sequential_time = start_time.elapsed().as_secs_f64() * 1000.0;

            // Time sequential decompression separately
            let decomp_start = Instant::now();
            let decompressed = decompress_data(&compressed, algorithm)?;
            let sequential_decomp_time = decomp_start.elapsed().as_secs_f64() * 1000.0;

            if data != decompressed.as_slice() {
                return Err(IoError::DecompressionError(format!(
                    "Round-trip failed for {:?}",
                    algorithm
                )));
            }

            // Parallel compression for each config
            for config in parallel_configs {
                let (par_compressed, par_comp_stats) =
                    compress_data_parallel(data, algorithm, Some(level), config.clone())?;
                let (par_decompressed, par_decomp_stats) =
                    decompress_data_parallel(&par_compressed, algorithm, config.clone())?;

                if data != par_decompressed.as_slice() {
                    return Err(IoError::DecompressionError(format!(
                        "Parallel round-trip failed for {:?}",
                        algorithm
                    )));
                }

                results.push(CompressionBenchmarkResult {
                    algorithm,
                    level,
                    config: config.clone(),
                    input_size: data.len(),
                    compressed_size: compressed.len(),
                    parallel_compressed_size: par_compressed.len(),
                    sequential_compression_time_ms: sequential_time,
                    sequential_decompression_time_ms: sequential_decomp_time,
                    parallel_compression_stats: par_comp_stats,
                    parallel_decompression_stats: par_decomp_stats,
                    compression_ratio: data.len() as f64 / compressed.len() as f64,
                    parallel_compression_ratio: data.len() as f64 / par_compressed.len() as f64,
                });
            }
        }
    }

    Ok(results)
}

/// Results from compression benchmarking
#[derive(Debug, Clone)]
pub struct CompressionBenchmarkResult {
    /// The compression algorithm tested
    pub algorithm: CompressionAlgorithm,
    /// The compression level used
    pub level: u32,
    /// The parallel configuration used
    pub config: ParallelCompressionConfig,
    /// Size of input data
    pub input_size: usize,
    /// Size of sequentially compressed data
    pub compressed_size: usize,
    /// Size of parallel compressed data
    pub parallel_compressed_size: usize,
    /// Time for sequential compression
    pub sequential_compression_time_ms: f64,
    /// Time for sequential decompression
    pub sequential_decompression_time_ms: f64,
    /// Statistics from parallel compression
    pub parallel_compression_stats: ParallelCompressionStats,
    /// Statistics from parallel decompression
    pub parallel_decompression_stats: ParallelCompressionStats,
    /// Compression ratio for sequential compression
    pub compression_ratio: f64,
    /// Compression ratio for parallel compression
    pub parallel_compression_ratio: f64,
}

impl CompressionBenchmarkResult {
    /// Calculate the speedup factor of parallel compression over sequential
    pub fn compression_speedup(&self) -> f64 {
        self.sequential_compression_time_ms / self.parallel_compression_stats.operation_time_ms
    }

    /// Calculate the speedup factor of parallel decompression over sequential
    pub fn decompression_speedup(&self) -> f64 {
        self.sequential_decompression_time_ms / self.parallel_decompression_stats.operation_time_ms
    }

    /// Calculate the size overhead of the parallel container relative to sequential output
    pub fn compression_overhead(&self) -> f64 {
        self.parallel_compressed_size as f64 / self.compressed_size as f64
    }
}