use std::fs::File;
use std::io::{Read, Write};
use std::path::Path;

use bzip2::read::{BzDecoder, BzEncoder};
use bzip2::Compression as Bzip2Compression;
use flate2::read::{GzDecoder, GzEncoder};
use flate2::Compression as GzipCompression;
use lz4::{Decoder, EncoderBuilder};
use zstd::{decode_all, encode_all};

/// Compression-aware I/O support for `ndarray` arrays.
pub mod ndarray;

use crate::error::{IoError, Result};

/// Supported compression algorithms.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompressionAlgorithm {
    /// gzip (DEFLATE-based) compression via `flate2`.
    Gzip,
    /// Zstandard compression via `zstd`.
    Zstd,
    /// LZ4 frame compression via `lz4`.
    Lz4,
    /// bzip2 compression via `bzip2`.
    Bzip2,
}

impl CompressionAlgorithm {
    /// Return the canonical file extension for this algorithm.
    pub fn extension(&self) -> &'static str {
        match self {
            CompressionAlgorithm::Gzip => "gz",
            CompressionAlgorithm::Zstd => "zst",
            CompressionAlgorithm::Lz4 => "lz4",
            CompressionAlgorithm::Bzip2 => "bz2",
        }
    }

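    /// Parse an algorithm from a file extension (case-insensitive).
    ///
    /// A minimal sketch of the mapping; the `scirs2_io::compression` path is
    /// an assumption about this crate's layout, so adjust it to match yours:
    ///
    /// ```
    /// use scirs2_io::compression::CompressionAlgorithm;
    ///
    /// let algo = CompressionAlgorithm::from_extension("GZ");
    /// assert_eq!(algo, Some(CompressionAlgorithm::Gzip));
    /// assert_eq!(CompressionAlgorithm::from_extension("zip"), None);
    /// ```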
    pub fn from_extension(ext: &str) -> Option<Self> {
        match ext.to_lowercase().as_str() {
            "gz" | "gzip" => Some(CompressionAlgorithm::Gzip),
            "zst" | "zstd" => Some(CompressionAlgorithm::Zstd),
            "lz4" => Some(CompressionAlgorithm::Lz4),
            "bz2" | "bzip2" => Some(CompressionAlgorithm::Bzip2),
            _ => None,
        }
    }
}

/// Validate a user-supplied compression level (0-9) and map it onto the
/// scale expected by the given algorithm's backend.
fn normalize_compression_level(level: Option<u32>, algorithm: CompressionAlgorithm) -> Result<u32> {
    let level = level.unwrap_or(6);
    if level > 9 {
        return Err(IoError::CompressionError(format!(
            "Compression level must be between 0 and 9, got {}",
            level
        )));
    }

    match algorithm {
        CompressionAlgorithm::Gzip => Ok(level),
        // Map the 0-9 scale onto zstd's 1-22 scale (0 -> 1, 9 -> 22).
        CompressionAlgorithm::Zstd => Ok(1 + (level * 21) / 9),
        CompressionAlgorithm::Lz4 => Ok(level),
        // bzip2 only accepts levels 1-9, so clamp 0 up to 1.
        CompressionAlgorithm::Bzip2 => Ok(level.max(1)),
    }
}

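/// Compress a byte slice with the chosen algorithm at an optional level
/// (0-9; `None` means 6).
///
/// A minimal round-trip sketch; the `scirs2_io::compression` path is an
/// assumption about this crate's layout, so adjust it to match yours:
///
/// ```
/// use scirs2_io::compression::{compress_data, decompress_data, CompressionAlgorithm};
///
/// let data = vec![42u8; 4096];
/// let compressed = compress_data(&data, CompressionAlgorithm::Zstd, Some(6)).unwrap();
/// let restored = decompress_data(&compressed, CompressionAlgorithm::Zstd).unwrap();
/// assert_eq!(restored, data);
/// ```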
pub fn compress_data(
    data: &[u8],
    algorithm: CompressionAlgorithm,
    level: Option<u32>,
) -> Result<Vec<u8>> {
    let normalized_level = normalize_compression_level(level, algorithm)?;

    match algorithm {
        CompressionAlgorithm::Gzip => {
            // Read-side encoder: reading from it yields the compressed bytes.
            let mut encoder = GzEncoder::new(data, GzipCompression::new(normalized_level));
            let mut compressed = Vec::new();
            encoder
                .read_to_end(&mut compressed)
                .map_err(|e| IoError::CompressionError(e.to_string()))?;
            Ok(compressed)
        }
        CompressionAlgorithm::Zstd => encode_all(data, normalized_level as i32)
            .map_err(|e| IoError::CompressionError(e.to_string())),
        CompressionAlgorithm::Lz4 => {
            let mut encoder = EncoderBuilder::new()
                .level(normalized_level)
                .build(Vec::new())
                .map_err(|e| IoError::CompressionError(e.to_string()))?;

            encoder
                .write_all(data)
                .map_err(|e| IoError::CompressionError(e.to_string()))?;

            // `finish` returns the output buffer and the final write result.
            let (compressed, result) = encoder.finish();
            result.map_err(|e| IoError::CompressionError(e.to_string()))?;

            Ok(compressed)
        }
        CompressionAlgorithm::Bzip2 => {
            let mut encoder = BzEncoder::new(data, Bzip2Compression::new(normalized_level));
            let mut compressed = Vec::new();
            encoder
                .read_to_end(&mut compressed)
                .map_err(|e| IoError::CompressionError(e.to_string()))?;
            Ok(compressed)
        }
    }
}

/// Decompress `data` that was compressed with `algorithm`.
pub fn decompress_data(data: &[u8], algorithm: CompressionAlgorithm) -> Result<Vec<u8>> {
    match algorithm {
        CompressionAlgorithm::Gzip => {
            let mut decoder = GzDecoder::new(data);
            let mut decompressed = Vec::new();
            decoder
                .read_to_end(&mut decompressed)
                .map_err(|e| IoError::DecompressionError(e.to_string()))?;
            Ok(decompressed)
        }
        CompressionAlgorithm::Zstd => {
            decode_all(data).map_err(|e| IoError::DecompressionError(e.to_string()))
        }
        CompressionAlgorithm::Lz4 => {
            let mut decoder =
                Decoder::new(data).map_err(|e| IoError::DecompressionError(e.to_string()))?;
            let mut decompressed = Vec::new();
            decoder
                .read_to_end(&mut decompressed)
                .map_err(|e| IoError::DecompressionError(e.to_string()))?;
            Ok(decompressed)
        }
        CompressionAlgorithm::Bzip2 => {
            let mut decoder = BzDecoder::new(data);
            let mut decompressed = Vec::new();
            decoder
                .read_to_end(&mut decompressed)
                .map_err(|e| IoError::DecompressionError(e.to_string()))?;
            Ok(decompressed)
        }
    }
}

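/// Compress a file on disk. When `output_path` is `None`, the algorithm's
/// extension is appended to the input path (e.g. `data.bin` becomes
/// `data.bin.gz`). Returns the path of the written file.
///
/// A usage sketch (paths illustrative, crate path assumed):
///
/// ```no_run
/// use scirs2_io::compression::{compress_file, CompressionAlgorithm};
///
/// let written = compress_file("data.bin", None, CompressionAlgorithm::Gzip, Some(6)).unwrap();
/// assert_eq!(written, "data.bin.gz");
/// ```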
pub fn compress_file<P: AsRef<Path>>(
    input_path: P,
    output_path: Option<P>,
    algorithm: CompressionAlgorithm,
    level: Option<u32>,
) -> Result<String> {
    let mut input_data = Vec::new();
    File::open(input_path.as_ref())
        .map_err(|e| IoError::FileError(format!("Failed to open input file: {}", e)))?
        .read_to_end(&mut input_data)
        .map_err(|e| IoError::FileError(format!("Failed to read input file: {}", e)))?;

    let compressed_data = compress_data(&input_data, algorithm, level)?;

    let output_path_string = match output_path {
        Some(path) => path.as_ref().to_string_lossy().to_string(),
        None => {
            // Append the algorithm's extension to the input file name.
            let mut path_buf = input_path.as_ref().to_path_buf();
            let ext = algorithm.extension();

            let file_name = path_buf
                .file_name()
                .ok_or_else(|| IoError::FileError("Invalid input file path".to_string()))?
                .to_string_lossy()
                .to_string();

            let new_file_name = format!("{}.{}", file_name, ext);
            path_buf.set_file_name(new_file_name);

            path_buf.to_string_lossy().to_string()
        }
    };

    File::create(&output_path_string)
        .map_err(|e| IoError::FileError(format!("Failed to create output file: {}", e)))?
        .write_all(&compressed_data)
        .map_err(|e| IoError::FileError(format!("Failed to write to output file: {}", e)))?;

    Ok(output_path_string)
}

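/// Decompress a file on disk. When `algorithm` is `None` it is inferred from
/// the input file's extension; when `output_path` is `None` the compression
/// extension is stripped (or `.decompressed` is appended if it is absent).
/// Returns the path of the written file.
///
/// A usage sketch (paths illustrative, crate path assumed):
///
/// ```no_run
/// use scirs2_io::compression::decompress_file;
///
/// // Algorithm inferred from `.gz`; output written to `data.bin`.
/// let written = decompress_file("data.bin.gz", None, None).unwrap();
/// assert_eq!(written, "data.bin");
/// ```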
pub fn decompress_file<P: AsRef<Path>>(
    input_path: P,
    output_path: Option<P>,
    algorithm: Option<CompressionAlgorithm>,
) -> Result<String> {
    // Infer the algorithm from the file extension when not given explicitly.
    let algorithm = match algorithm {
        Some(algo) => algo,
        None => {
            let ext = input_path
                .as_ref()
                .extension()
                .ok_or_else(|| {
                    IoError::DecompressionError("Unable to determine file extension".to_string())
                })?
                .to_string_lossy()
                .to_string();

            CompressionAlgorithm::from_extension(&ext)
                .ok_or(IoError::UnsupportedCompressionAlgorithm(ext))?
        }
    };

    let mut input_data = Vec::new();
    File::open(input_path.as_ref())
        .map_err(|e| IoError::FileError(format!("Failed to open input file: {}", e)))?
        .read_to_end(&mut input_data)
        .map_err(|e| IoError::FileError(format!("Failed to read input file: {}", e)))?;

    let decompressed_data = decompress_data(&input_data, algorithm)?;

    let output_path_string = match output_path {
        Some(path) => path.as_ref().to_string_lossy().to_string(),
        None => {
            let path_str = input_path.as_ref().to_string_lossy().to_string();
            let ext = algorithm.extension();

            // Strip the compression extension if present, otherwise append a
            // `.decompressed` suffix.
            if path_str.ends_with(&format!(".{}", ext)) {
                path_str[0..path_str.len() - ext.len() - 1].to_string()
            } else {
                format!("{}.decompressed", path_str)
            }
        }
    };

    File::create(&output_path_string)
        .map_err(|e| IoError::FileError(format!("Failed to create output file: {}", e)))?
        .write_all(&decompressed_data)
        .map_err(|e| IoError::FileError(format!("Failed to write to output file: {}", e)))?;

    Ok(output_path_string)
}

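/// Compute the compression ratio (original size / compressed size) that
/// `algorithm` achieves on `data`.
///
/// A quick sketch on highly repetitive input (crate path assumed):
///
/// ```
/// use scirs2_io::compression::{compression_ratio, CompressionAlgorithm};
///
/// let data = vec![0u8; 1 << 16];
/// let ratio = compression_ratio(&data, CompressionAlgorithm::Zstd, None).unwrap();
/// assert!(ratio > 1.0); // all-zero data compresses extremely well
/// ```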
pub fn compression_ratio(
    data: &[u8],
    algorithm: CompressionAlgorithm,
    level: Option<u32>,
) -> Result<f64> {
    let compressed = compress_data(data, algorithm, level)?;
    let original_size = data.len() as f64;
    let compressed_size = compressed.len() as f64;

    // Guard against division by zero.
    if compressed_size == 0.0 {
        return Err(IoError::CompressionError(
            "Compressed data has zero size".to_string(),
        ));
    }

    Ok(original_size / compressed_size)
}

/// Human-readable characteristics of a compression algorithm.
pub struct CompressionInfo {
    /// Algorithm name.
    pub name: String,
    /// Short description of the algorithm's trade-offs.
    pub description: String,
    /// Typical compression ratio on general data.
    pub typical_compression_ratio: f64,
    /// Relative compression speed on a 1-10 scale.
    pub compression_speed: u8,
    /// Relative decompression speed on a 1-10 scale.
    pub decompression_speed: u8,
    /// Canonical file extension.
    pub file_extension: String,
}

/// Return descriptive information about a compression algorithm.
pub fn algorithm_info(algorithm: CompressionAlgorithm) -> CompressionInfo {
    match algorithm {
        CompressionAlgorithm::Gzip => CompressionInfo {
            name: "GZIP".to_string(),
            description: "General-purpose compression algorithm with a good balance of speed and compression ratio".to_string(),
            typical_compression_ratio: 2.5,
            compression_speed: 6,
            decompression_speed: 7,
            file_extension: "gz".to_string(),
        },
        CompressionAlgorithm::Zstd => CompressionInfo {
            name: "Zstandard".to_string(),
            description: "Modern compression algorithm with an excellent compression ratio and fast decompression".to_string(),
            typical_compression_ratio: 3.2,
            compression_speed: 7,
            decompression_speed: 9,
            file_extension: "zst".to_string(),
        },
        CompressionAlgorithm::Lz4 => CompressionInfo {
            name: "LZ4".to_string(),
            description: "Extremely fast compression algorithm with a moderate compression ratio".to_string(),
            typical_compression_ratio: 1.8,
            compression_speed: 10,
            decompression_speed: 10,
            file_extension: "lz4".to_string(),
        },
        CompressionAlgorithm::Bzip2 => CompressionInfo {
            name: "BZIP2".to_string(),
            description: "High compression ratio but slower speed, good for archival storage".to_string(),
            typical_compression_ratio: 3.5,
            compression_speed: 3,
            decompression_speed: 4,
            file_extension: "bz2".to_string(),
        },
    }
}

// Magic bytes used to sniff the compression format from file contents.
const GZIP_MAGIC: &[u8] = &[0x1f, 0x8b];
const ZSTD_MAGIC: &[u8] = &[0x28, 0xb5, 0x2f, 0xfd];
const LZ4_MAGIC: &[u8] = &[0x04, 0x22, 0x4d, 0x18];
const BZIP2_MAGIC: &[u8] = &[0x42, 0x5a, 0x68]; // "BZh"

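/// Sniff the compression algorithm from a buffer's leading magic bytes.
///
/// A minimal sketch using the gzip magic prefix (crate path assumed):
///
/// ```
/// use scirs2_io::compression::{detect_compression_from_bytes, CompressionAlgorithm};
///
/// let gzip_header = [0x1f, 0x8b, 0x08, 0x00];
/// assert_eq!(
///     detect_compression_from_bytes(&gzip_header),
///     Some(CompressionAlgorithm::Gzip)
/// );
/// assert_eq!(detect_compression_from_bytes(b"plain text"), None);
/// ```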
pub fn detect_compression_from_bytes(data: &[u8]) -> Option<CompressionAlgorithm> {
    if data.starts_with(GZIP_MAGIC) {
        Some(CompressionAlgorithm::Gzip)
    } else if data.starts_with(ZSTD_MAGIC) {
        Some(CompressionAlgorithm::Zstd)
    } else if data.starts_with(LZ4_MAGIC) {
        Some(CompressionAlgorithm::Lz4)
    } else if data.starts_with(BZIP2_MAGIC) {
        Some(CompressionAlgorithm::Bzip2)
    } else {
        None
    }
}

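/// A file handler that compresses and decompresses transparently, based on
/// file extensions and/or magic-byte content sniffing.
///
/// A usage sketch (paths illustrative, crate path assumed):
///
/// ```no_run
/// use scirs2_io::compression::TransparentFileHandler;
///
/// let handler = TransparentFileHandler::default();
/// // The `.zst` extension makes the handler compress with zstd on write.
/// handler.write_file("results.zst", b"some bytes").unwrap();
/// // Reading detects the compression and returns the original bytes.
/// let bytes = handler.read_file("results.zst").unwrap();
/// assert_eq!(bytes, b"some bytes");
/// ```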
pub struct TransparentFileHandler {
    /// Detect compression from the file extension.
    pub auto_detect_extension: bool,
    /// Detect compression from magic bytes in the file contents.
    pub auto_detect_content: bool,
    /// Algorithm used when compression is requested without an explicit choice.
    pub default_algorithm: CompressionAlgorithm,
    /// Default compression level (0-9); `None` uses the algorithm default.
    pub default_level: Option<u32>,
}

impl Default for TransparentFileHandler {
    fn default() -> Self {
        Self {
            auto_detect_extension: true,
            auto_detect_content: true,
            default_algorithm: CompressionAlgorithm::Zstd,
            default_level: Some(6),
        }
    }
}

impl TransparentFileHandler {
    /// Create a handler with explicit detection settings and defaults.
    pub fn new(
        auto_detect_extension: bool,
        auto_detect_content: bool,
        default_algorithm: CompressionAlgorithm,
        default_level: Option<u32>,
    ) -> Self {
        Self {
            auto_detect_extension,
            auto_detect_content,
            default_algorithm,
            default_level,
        }
    }

    /// Read a file, transparently decompressing it when compression is
    /// detected from the extension or the content's magic bytes.
    pub fn read_file<P: AsRef<Path>>(&self, path: P) -> Result<Vec<u8>> {
        let mut file_data = Vec::new();
        File::open(path.as_ref())
            .map_err(|e| IoError::FileError(format!("Failed to open file: {}", e)))?
            .read_to_end(&mut file_data)
            .map_err(|e| IoError::FileError(format!("Failed to read file: {}", e)))?;

        let mut algorithm = None;

        if self.auto_detect_extension {
            if let Some(ext) = path.as_ref().extension() {
                algorithm = CompressionAlgorithm::from_extension(&ext.to_string_lossy());
            }
        }

        if algorithm.is_none() && self.auto_detect_content {
            algorithm = detect_compression_from_bytes(&file_data);
        }

        match algorithm {
            Some(algo) => decompress_data(&file_data, algo),
            // No compression detected: return the raw bytes.
            None => Ok(file_data),
        }
    }

    /// Write a file, compressing the data when the destination's extension
    /// maps to a known compression format.
    pub fn write_file<P: AsRef<Path>>(&self, path: P, data: &[u8]) -> Result<()> {
        let mut algorithm = None;
        let level = self.default_level;

        if self.auto_detect_extension {
            if let Some(ext) = path.as_ref().extension() {
                algorithm = CompressionAlgorithm::from_extension(&ext.to_string_lossy());
            }
        }

        if algorithm.is_none() && self.should_compress_by_default(&path) {
            algorithm = Some(self.default_algorithm);
        }

        let output_data = match algorithm {
            Some(algo) => compress_data(data, algo, level)?,
            None => data.to_vec(),
        };

        File::create(path.as_ref())
            .map_err(|e| IoError::FileError(format!("Failed to create file: {}", e)))?
            .write_all(&output_data)
            .map_err(|e| IoError::FileError(format!("Failed to write file: {}", e)))?;

        Ok(())
    }

    /// Whether the path's extension names a compressed format; used as the
    /// fallback compression trigger when extension detection is disabled.
    fn should_compress_by_default<P: AsRef<Path>>(&self, path: P) -> bool {
        if let Some(ext) = path.as_ref().extension() {
            let ext_str = ext.to_string_lossy().to_lowercase();
            matches!(
                ext_str.as_str(),
                "gz" | "gzip" | "zst" | "zstd" | "lz4" | "bz2" | "bzip2"
            )
        } else {
            false
        }
    }

    /// Copy a file through the handler: the source is transparently
    /// decompressed and the destination is compressed as its name dictates.
    pub fn copy_file<P: AsRef<Path>, Q: AsRef<Path>>(
        &self,
        source: P,
        destination: Q,
    ) -> Result<()> {
        let data = self.read_file(source)?;
        self.write_file(destination, &data)?;
        Ok(())
    }

    /// Inspect a file and report whether it is compressed, which algorithm
    /// was detected, and the sizes involved.
    pub fn file_info<P: AsRef<Path>>(&self, path: P) -> Result<FileCompressionInfo> {
        let mut file_data = Vec::new();
        File::open(path.as_ref())
            .map_err(|e| IoError::FileError(format!("Failed to open file: {}", e)))?
            .read_to_end(&mut file_data)
            .map_err(|e| IoError::FileError(format!("Failed to read file: {}", e)))?;

        let original_size = file_data.len();

        let detected_algorithm = if self.auto_detect_content {
            detect_compression_from_bytes(&file_data)
        } else {
            None
        };

        let extension_algorithm = if self.auto_detect_extension {
            path.as_ref()
                .extension()
                .and_then(|ext| CompressionAlgorithm::from_extension(&ext.to_string_lossy()))
        } else {
            None
        };

        let is_compressed = detected_algorithm.is_some() || extension_algorithm.is_some();
        // Content sniffing takes precedence over the extension.
        let algorithm = detected_algorithm.or(extension_algorithm);

        let uncompressed_size = if let Some(algo) = algorithm {
            match decompress_data(&file_data, algo) {
                Ok(decompressed) => Some(decompressed.len()),
                Err(_) => None,
            }
        } else {
            Some(original_size)
        };

        let compressed_size = if is_compressed {
            Some(original_size)
        } else {
            None
        };

        let compression_ratio = match (compressed_size, uncompressed_size) {
            (Some(compressed), Some(uncompressed)) => {
                Some(uncompressed as f64 / compressed as f64)
            }
            _ => None,
        };

        Ok(FileCompressionInfo {
            path: path.as_ref().to_path_buf(),
            is_compressed,
            algorithm,
            compressed_size,
            uncompressed_size,
            compression_ratio,
        })
    }
}

/// Compression metadata for a file on disk.
#[derive(Debug, Clone)]
pub struct FileCompressionInfo {
    /// Path of the inspected file.
    pub path: std::path::PathBuf,
    /// Whether the file appears to be compressed.
    pub is_compressed: bool,
    /// Detected compression algorithm, if any.
    pub algorithm: Option<CompressionAlgorithm>,
    /// On-disk (compressed) size in bytes, if compressed.
    pub compressed_size: Option<usize>,
    /// Uncompressed size in bytes, when it could be determined.
    pub uncompressed_size: Option<usize>,
    /// Ratio of uncompressed to compressed size, when both are known.
    pub compression_ratio: Option<f64>,
}

// Global handler used by the module-level `*_transparent` convenience
// functions.
static GLOBAL_HANDLER: std::sync::OnceLock<TransparentFileHandler> = std::sync::OnceLock::new();

/// Install a custom global handler. Has no effect if one was already set.
pub fn init_global_handler(handler: TransparentFileHandler) {
    let _ = GLOBAL_HANDLER.set(handler);
}

/// Get the global handler, initializing it with defaults on first use.
pub fn global_handler() -> &'static TransparentFileHandler {
    GLOBAL_HANDLER.get_or_init(TransparentFileHandler::default)
}

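/// Read a file via the global handler, transparently decompressing it.
///
/// The module-level `*_transparent` helpers mirror the
/// [`TransparentFileHandler`] methods. A quick sketch (paths illustrative,
/// crate path assumed):
///
/// ```no_run
/// use scirs2_io::compression::{read_file_transparent, write_file_transparent};
///
/// write_file_transparent("log.txt.gz", b"hello").unwrap();
/// let bytes = read_file_transparent("log.txt.gz").unwrap();
/// assert_eq!(bytes, b"hello");
/// ```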
pub fn read_file_transparent<P: AsRef<Path>>(path: P) -> Result<Vec<u8>> {
    global_handler().read_file(path)
}

/// Write a file via the global handler, compressing as the extension dictates.
pub fn write_file_transparent<P: AsRef<Path>>(path: P, data: &[u8]) -> Result<()> {
    global_handler().write_file(path, data)
}

/// Copy a file via the global handler.
pub fn copy_file_transparent<P: AsRef<Path>, Q: AsRef<Path>>(
    source: P,
    destination: Q,
) -> Result<()> {
    global_handler().copy_file(source, destination)
}

/// Inspect a file's compression metadata via the global handler.
pub fn file_info_transparent<P: AsRef<Path>>(path: P) -> Result<FileCompressionInfo> {
    global_handler().file_info(path)
}

// ---------------------------------------------------------------------------
// Parallel compression support
// ---------------------------------------------------------------------------

use scirs2_core::parallel_ops::*;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Instant;

/// Configuration for parallel compression and decompression.
#[derive(Debug, Clone)]
pub struct ParallelCompressionConfig {
    /// Number of worker threads (0 = use all available threads).
    pub num_threads: usize,
    /// Size of each independently compressed chunk, in bytes.
    pub chunk_size: usize,
    /// I/O buffer size in bytes.
    pub buffer_size: usize,
    /// Whether memory mapping may be used for file I/O.
    pub enable_memory_mapping: bool,
}

impl Default for ParallelCompressionConfig {
    fn default() -> Self {
        Self {
            num_threads: 0,          // all available threads
            chunk_size: 1024 * 1024, // 1 MiB chunks
            buffer_size: 64 * 1024,  // 64 KiB buffer
            enable_memory_mapping: true,
        }
    }
}

/// Statistics reported by the parallel compression routines.
#[derive(Debug, Clone)]
pub struct ParallelCompressionStats {
    /// Number of chunks processed.
    pub chunks_processed: usize,
    /// Number of input bytes processed.
    pub bytes_processed: usize,
    /// Number of output bytes produced.
    pub bytes_output: usize,
    /// Wall-clock time of the operation in milliseconds.
    pub operation_time_ms: f64,
    /// Throughput in bytes per second.
    pub throughput_bps: f64,
    /// Ratio of uncompressed size to compressed size.
    pub compression_ratio: f64,
    /// Number of threads used.
    pub threads_used: usize,
}

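/// Compress `data` in parallel by splitting it into chunks and compressing
/// each chunk independently.
///
/// The output is a custom container, not a standard stream: a `u64`
/// little-endian chunk count, then one `u64` little-endian length per chunk,
/// then the compressed chunks back to back. Use
/// [`decompress_data_parallel`] to decode it. Inputs no larger than one
/// chunk are compressed sequentially in the plain format.
///
/// A usage sketch (crate path assumed):
///
/// ```no_run
/// use scirs2_io::compression::{
///     compress_data_parallel, decompress_data_parallel, CompressionAlgorithm,
///     ParallelCompressionConfig,
/// };
///
/// let data = vec![7u8; 8 * 1024 * 1024]; // 8 MiB of compressible data
/// let config = ParallelCompressionConfig::default();
/// let (compressed, stats) =
///     compress_data_parallel(&data, CompressionAlgorithm::Lz4, None, config.clone()).unwrap();
/// let (restored, _) =
///     decompress_data_parallel(&compressed, CompressionAlgorithm::Lz4, config).unwrap();
/// assert_eq!(restored, data);
/// println!("{} chunks, {:.1}x ratio", stats.chunks_processed, stats.compression_ratio);
/// ```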
pub fn compress_data_parallel(
    data: &[u8],
    algorithm: CompressionAlgorithm,
    level: Option<u32>,
    config: ParallelCompressionConfig,
) -> Result<(Vec<u8>, ParallelCompressionStats)> {
    let start_time = Instant::now();
    let input_size = data.len();

    // 0 means "use all available threads".
    let num_threads = if config.num_threads == 0 {
        num_threads()
    } else {
        config.num_threads
    };

    // Small inputs are not worth chunking: compress sequentially in the
    // plain (container-free) format.
    if input_size <= config.chunk_size {
        let compressed = compress_data(data, algorithm, level)?;
        let operation_time = start_time.elapsed().as_secs_f64() * 1000.0;

        let stats = ParallelCompressionStats {
            chunks_processed: 1,
            bytes_processed: input_size,
            bytes_output: compressed.len(),
            operation_time_ms: operation_time,
            throughput_bps: input_size as f64 / (operation_time / 1000.0),
            compression_ratio: input_size as f64 / compressed.len() as f64,
            threads_used: 1,
        };

        return Ok((compressed, stats));
    }

    let chunk_size = config.chunk_size;
    let chunks: Vec<&[u8]> = data.chunks(chunk_size).collect();
    let chunk_count = chunks.len();

    // Compress all chunks in parallel, counting completions for bookkeeping.
    let processed_count = Arc::new(AtomicUsize::new(0));
    let compressed_chunks: Result<Vec<Vec<u8>>> = chunks
        .into_par_iter()
        .map(|chunk| {
            let result = compress_data(chunk, algorithm, level);
            processed_count.fetch_add(1, Ordering::Relaxed);
            result
        })
        .collect();

    let compressed_chunks = compressed_chunks?;

    // Assemble the container: chunk count, per-chunk lengths, then the data.
    let total_compressed_size: usize = compressed_chunks.iter().map(|chunk| chunk.len()).sum();
    let mut result = Vec::with_capacity(8 + chunk_count * 8 + total_compressed_size);
    result.extend_from_slice(&(chunk_count as u64).to_le_bytes());
    for chunk in &compressed_chunks {
        result.extend_from_slice(&(chunk.len() as u64).to_le_bytes());
    }

    for chunk in compressed_chunks {
        result.extend_from_slice(&chunk);
    }

    let operation_time = start_time.elapsed().as_secs_f64() * 1000.0;

    let stats = ParallelCompressionStats {
        chunks_processed: chunk_count,
        bytes_processed: input_size,
        bytes_output: result.len(),
        operation_time_ms: operation_time,
        throughput_bps: input_size as f64 / (operation_time / 1000.0),
        compression_ratio: input_size as f64 / result.len() as f64,
        threads_used: num_threads,
    };

    Ok((result, stats))
}

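/// Decompress data produced by [`compress_data_parallel`].
///
/// The chunked container header is parsed and the chunks are decompressed in
/// parallel. If the header is absent or implausible (input shorter than 8
/// bytes, or a chunk count that cannot fit in the buffer), the input is
/// treated as a plain compressed stream and decompressed sequentially.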
pub fn decompress_data_parallel(
    data: &[u8],
    algorithm: CompressionAlgorithm,
    config: ParallelCompressionConfig,
) -> Result<(Vec<u8>, ParallelCompressionStats)> {
    let start_time = Instant::now();
    let input_size = data.len();

    let num_threads = if config.num_threads == 0 {
        num_threads()
    } else {
        config.num_threads
    };

    // Too short to hold a container header: treat as a plain stream.
    if data.len() < 8 {
        let decompressed = decompress_data(data, algorithm)?;
        let operation_time = start_time.elapsed().as_secs_f64() * 1000.0;

        let stats = ParallelCompressionStats {
            chunks_processed: 1,
            bytes_processed: input_size,
            bytes_output: decompressed.len(),
            operation_time_ms: operation_time,
            throughput_bps: decompressed.len() as f64 / (operation_time / 1000.0),
            compression_ratio: decompressed.len() as f64 / input_size as f64,
            threads_used: 1,
        };

        return Ok((decompressed, stats));
    }

    let chunk_count = u64::from_le_bytes(
        data[0..8]
            .try_into()
            .map_err(|_| IoError::DecompressionError("Invalid chunk header".to_string()))?,
    ) as usize;

    // Plausibility check: the container must have room for its per-chunk
    // length table. If the count is implausible, assume a plain stream and
    // decompress sequentially.
    if chunk_count == 0 || chunk_count > data.len() / 8 {
        let decompressed = decompress_data(data, algorithm)?;
        let operation_time = start_time.elapsed().as_secs_f64() * 1000.0;

        let stats = ParallelCompressionStats {
            chunks_processed: 1,
            bytes_processed: input_size,
            bytes_output: decompressed.len(),
            operation_time_ms: operation_time,
            throughput_bps: decompressed.len() as f64 / (operation_time / 1000.0),
            compression_ratio: decompressed.len() as f64 / input_size as f64,
            threads_used: 1,
        };

        return Ok((decompressed, stats));
    }

    let header_size = 8 + (chunk_count * 8);
    if data.len() < header_size {
        return Err(IoError::DecompressionError(
            "Truncated chunk headers".to_string(),
        ));
    }

    // Read the per-chunk compressed sizes from the length table.
    let mut chunk_sizes = Vec::with_capacity(chunk_count);
    for i in 0..chunk_count {
        let start_idx = 8 + (i * 8);
        let size = u64::from_le_bytes(
            data[start_idx..start_idx + 8]
                .try_into()
                .map_err(|_| IoError::DecompressionError("Invalid chunk size".to_string()))?,
        ) as usize;
        chunk_sizes.push(size);
    }

    // Slice the payload into chunks, validating bounds as we go.
    let mut chunks = Vec::with_capacity(chunk_count);
    let mut offset = header_size;

    for &size in &chunk_sizes {
        if offset + size > data.len() {
            return Err(IoError::DecompressionError(
                "Truncated chunk data".to_string(),
            ));
        }
        chunks.push(&data[offset..offset + size]);
        offset += size;
    }

    // Decompress all chunks in parallel.
    let processed_count = Arc::new(AtomicUsize::new(0));
    let decompressed_chunks: Result<Vec<Vec<u8>>> = chunks
        .into_par_iter()
        .map(|chunk| {
            let result = decompress_data(chunk, algorithm);
            processed_count.fetch_add(1, Ordering::Relaxed);
            result
        })
        .collect();

    let decompressed_chunks = decompressed_chunks?;

    // Concatenate the decompressed chunks in their original order.
    let total_size: usize = decompressed_chunks.iter().map(|chunk| chunk.len()).sum();
    let mut result = Vec::with_capacity(total_size);

    for chunk in decompressed_chunks {
        result.extend_from_slice(&chunk);
    }

    let operation_time = start_time.elapsed().as_secs_f64() * 1000.0;

    let stats = ParallelCompressionStats {
        chunks_processed: chunk_count,
        bytes_processed: input_size,
        bytes_output: result.len(),
        operation_time_ms: operation_time,
        throughput_bps: result.len() as f64 / (operation_time / 1000.0),
        compression_ratio: result.len() as f64 / input_size as f64,
        threads_used: num_threads,
    };

    Ok((result, stats))
}

/// Compress a file using the parallel chunked container format. Output path
/// semantics match [`compress_file`]; returns the written path and statistics.
pub fn compress_file_parallel<P: AsRef<Path>>(
    input_path: P,
    output_path: Option<P>,
    algorithm: CompressionAlgorithm,
    level: Option<u32>,
    config: ParallelCompressionConfig,
) -> Result<(String, ParallelCompressionStats)> {
    let mut input_data = Vec::new();
    File::open(input_path.as_ref())
        .map_err(|e| IoError::FileError(format!("Failed to open input file: {}", e)))?
        .read_to_end(&mut input_data)
        .map_err(|e| IoError::FileError(format!("Failed to read input file: {}", e)))?;

    let (compressed_data, stats) = compress_data_parallel(&input_data, algorithm, level, config)?;

    let output_path_string = match output_path {
        Some(path) => path.as_ref().to_string_lossy().to_string(),
        None => {
            let mut path_buf = input_path.as_ref().to_path_buf();
            let ext = algorithm.extension();
            let file_name = path_buf
                .file_name()
                .ok_or_else(|| IoError::FileError("Invalid input file path".to_string()))?
                .to_string_lossy()
                .to_string();
            let new_file_name = format!("{}.{}", file_name, ext);
            path_buf.set_file_name(new_file_name);
            path_buf.to_string_lossy().to_string()
        }
    };

    File::create(&output_path_string)
        .map_err(|e| IoError::FileError(format!("Failed to create output file: {}", e)))?
        .write_all(&compressed_data)
        .map_err(|e| IoError::FileError(format!("Failed to write to output file: {}", e)))?;

    Ok((output_path_string, stats))
}

/// Decompress a file written by [`compress_file_parallel`]. The `algorithm`
/// and `output_path` semantics match [`decompress_file`]; returns the written
/// path and statistics.
pub fn decompress_file_parallel<P: AsRef<Path>>(
    input_path: P,
    output_path: Option<P>,
    algorithm: Option<CompressionAlgorithm>,
    config: ParallelCompressionConfig,
) -> Result<(String, ParallelCompressionStats)> {
    // Infer the algorithm from the file extension when not given explicitly.
    let algorithm = match algorithm {
        Some(algo) => algo,
        None => {
            let ext = input_path
                .as_ref()
                .extension()
                .ok_or_else(|| {
                    IoError::DecompressionError("Unable to determine file extension".to_string())
                })?
                .to_string_lossy()
                .to_string();

            CompressionAlgorithm::from_extension(&ext)
                .ok_or(IoError::UnsupportedCompressionAlgorithm(ext))?
        }
    };

    let mut input_data = Vec::new();
    File::open(input_path.as_ref())
        .map_err(|e| IoError::FileError(format!("Failed to open input file: {}", e)))?
        .read_to_end(&mut input_data)
        .map_err(|e| IoError::FileError(format!("Failed to read input file: {}", e)))?;

    let (decompressed_data, stats) = decompress_data_parallel(&input_data, algorithm, config)?;

    let output_path_string = match output_path {
        Some(path) => path.as_ref().to_string_lossy().to_string(),
        None => {
            let path_str = input_path.as_ref().to_string_lossy().to_string();
            let ext = algorithm.extension();

            // Strip the compression extension if present, otherwise append a
            // `.decompressed` suffix.
            if path_str.ends_with(&format!(".{}", ext)) {
                path_str[0..path_str.len() - ext.len() - 1].to_string()
            } else {
                format!("{}.decompressed", path_str)
            }
        }
    };

    File::create(&output_path_string)
        .map_err(|e| IoError::FileError(format!("Failed to create output file: {}", e)))?
        .write_all(&decompressed_data)
        .map_err(|e| IoError::FileError(format!("Failed to write to output file: {}", e)))?;

    Ok((output_path_string, stats))
}

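/// Benchmark the given algorithms and levels, both sequentially and with each
/// parallel configuration, verifying every round-trip along the way.
///
/// A usage sketch (crate path assumed):
///
/// ```no_run
/// use scirs2_io::compression::{
///     benchmark_compression_algorithms, CompressionAlgorithm, ParallelCompressionConfig,
/// };
///
/// let data = vec![1u8; 4 * 1024 * 1024];
/// let results = benchmark_compression_algorithms(
///     &data,
///     &[CompressionAlgorithm::Zstd, CompressionAlgorithm::Lz4],
///     &[3, 6, 9],
///     &[ParallelCompressionConfig::default()],
/// ).unwrap();
/// for r in &results {
///     println!("{:?} level {}: {:.2}x speedup", r.algorithm, r.level, r.compression_speedup());
/// }
/// ```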
pub fn benchmark_compression_algorithms(
    data: &[u8],
    algorithms: &[CompressionAlgorithm],
    levels: &[u32],
    parallel_configs: &[ParallelCompressionConfig],
) -> Result<Vec<CompressionBenchmarkResult>> {
    let mut results = Vec::new();

    for &algorithm in algorithms {
        for &level in levels {
            // Sequential baseline.
            let start_time = Instant::now();
            let compressed = compress_data(data, algorithm, Some(level))?;
            let sequential_time = start_time.elapsed().as_secs_f64() * 1000.0;

            let decompressed = decompress_data(&compressed, algorithm)?;
            let sequential_decomp_time =
                start_time.elapsed().as_secs_f64() * 1000.0 - sequential_time;

            // Sanity-check the round-trip; as a benchmark helper, this panics
            // on mismatch rather than returning an error.
            assert_eq!(data, &decompressed, "Round-trip failed for {:?}", algorithm);

            for config in parallel_configs {
                let (par_compressed, par_comp_stats) =
                    compress_data_parallel(data, algorithm, Some(level), config.clone())?;
                let (par_decompressed, par_decomp_stats) =
                    decompress_data_parallel(&par_compressed, algorithm, config.clone())?;

                assert_eq!(
                    data, &par_decompressed,
                    "Parallel round-trip failed for {:?}",
                    algorithm
                );

                results.push(CompressionBenchmarkResult {
                    algorithm,
                    level,
                    config: config.clone(),
                    input_size: data.len(),
                    compressed_size: compressed.len(),
                    parallel_compressed_size: par_compressed.len(),
                    sequential_compression_time_ms: sequential_time,
                    sequential_decompression_time_ms: sequential_decomp_time,
                    parallel_compression_stats: par_comp_stats,
                    parallel_decompression_stats: par_decomp_stats,
                    compression_ratio: data.len() as f64 / compressed.len() as f64,
                    parallel_compression_ratio: data.len() as f64 / par_compressed.len() as f64,
                });
            }
        }
    }

    Ok(results)
}

/// Result of a single compression benchmark run.
#[derive(Debug, Clone)]
pub struct CompressionBenchmarkResult {
    /// Algorithm benchmarked.
    pub algorithm: CompressionAlgorithm,
    /// Compression level used (0-9).
    pub level: u32,
    /// Parallel configuration used.
    pub config: ParallelCompressionConfig,
    /// Input size in bytes.
    pub input_size: usize,
    /// Sequentially compressed size in bytes.
    pub compressed_size: usize,
    /// Parallel (chunked container) compressed size in bytes.
    pub parallel_compressed_size: usize,
    /// Sequential compression time in milliseconds.
    pub sequential_compression_time_ms: f64,
    /// Sequential decompression time in milliseconds.
    pub sequential_decompression_time_ms: f64,
    /// Statistics from the parallel compression run.
    pub parallel_compression_stats: ParallelCompressionStats,
    /// Statistics from the parallel decompression run.
    pub parallel_decompression_stats: ParallelCompressionStats,
    /// Sequential compression ratio (input / compressed).
    pub compression_ratio: f64,
    /// Parallel compression ratio (input / parallel compressed).
    pub parallel_compression_ratio: f64,
}

impl CompressionBenchmarkResult {
    /// Speedup of parallel over sequential compression (>1.0 means faster).
    pub fn compression_speedup(&self) -> f64 {
        self.sequential_compression_time_ms / self.parallel_compression_stats.operation_time_ms
    }

    /// Speedup of parallel over sequential decompression (>1.0 means faster).
    pub fn decompression_speedup(&self) -> f64 {
        self.sequential_decompression_time_ms / self.parallel_decompression_stats.operation_time_ms
    }

    /// Size overhead of the chunked container relative to the sequential
    /// stream (>1.0 means the parallel output is larger).
    pub fn compression_overhead(&self) -> f64 {
        self.parallel_compressed_size as f64 / self.compressed_size as f64
    }
}
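
#[cfg(test)]
mod tests {
    // A minimal round-trip test sketch exercising the sequential and parallel
    // paths; the chunk size is chosen small so the parallel container format
    // is actually used. These tests are additions, not part of the original
    // suite.
    use super::*;

    #[test]
    fn sequential_round_trip_all_algorithms() {
        let data: Vec<u8> = (0..10_000u32).map(|i| (i % 251) as u8).collect();
        for algo in [
            CompressionAlgorithm::Gzip,
            CompressionAlgorithm::Zstd,
            CompressionAlgorithm::Lz4,
            CompressionAlgorithm::Bzip2,
        ] {
            let compressed = compress_data(&data, algo, Some(6)).unwrap();
            let restored = decompress_data(&compressed, algo).unwrap();
            assert_eq!(restored, data, "round-trip failed for {:?}", algo);
        }
    }

    #[test]
    fn parallel_round_trip_uses_chunked_container() {
        let data: Vec<u8> = (0..100_000u32).map(|i| (i % 13) as u8).collect();
        let config = ParallelCompressionConfig {
            chunk_size: 16 * 1024, // force multiple chunks
            ..Default::default()
        };
        let (compressed, stats) =
            compress_data_parallel(&data, CompressionAlgorithm::Zstd, None, config.clone())
                .unwrap();
        assert!(stats.chunks_processed > 1);
        let (restored, _) =
            decompress_data_parallel(&compressed, CompressionAlgorithm::Zstd, config).unwrap();
        assert_eq!(restored, data);
    }
}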