use std::fs::File;
use std::io::{Read, Write};
use std::path::Path;

use ::serde::{Deserialize, Serialize};
use bincode::{config, serde as bincode_serde};
use scirs2_core::ndarray::{ArrayBase, Data, Dimension, IxDyn, OwnedRepr};

use super::{compress_data, decompress_data, CompressionAlgorithm};
use crate::error::{IoError, Result};
use scirs2_core::parallel_ops::*;
use scirs2_core::simd_ops::PlatformCapabilities;

/// Metadata stored alongside a compressed array.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompressedArrayMetadata {
    /// Shape of the original array.
    pub shape: Vec<usize>,
    /// Element type name, as reported by `std::any::type_name`.
    pub dtype: String,
    /// Size of one element in bytes.
    pub element_size: usize,
    /// Compression algorithm used (the `Debug` form of [`CompressionAlgorithm`]).
    pub algorithm: String,
    /// Serialized size before compression, in bytes.
    pub original_size: usize,
    /// Size after compression, in bytes.
    pub compressed_size: usize,
    /// Ratio of original to compressed size (higher is better).
    pub compression_ratio: f64,
    /// Compression level that was applied.
    pub compression_level: u32,
    /// Extra key-value metadata supplied by the caller or the writer.
    pub additional_metadata: std::collections::HashMap<String, String>,
}

/// A compressed array: descriptive metadata plus the compressed payload.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompressedArray {
    /// Metadata describing the original array and how it was compressed.
    pub metadata: CompressedArrayMetadata,
    /// The compressed bytes.
    pub data: Vec<u8>,
}

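/// Compresses an n-dimensional array and writes it to a file.
///
/// The array is serialized with bincode, compressed with `algorithm`
/// (level defaults to 6), wrapped in a [`CompressedArray`] together with
/// its [`CompressedArrayMetadata`], and written as a single bincode blob.
///
/// A minimal usage sketch (illustrative only; assumes this function and
/// [`CompressionAlgorithm`] are in scope and the crate's `ndarray`
/// re-export is available):
///
/// ```ignore
/// use scirs2_core::ndarray::Array2;
///
/// let data = Array2::from_shape_fn((64, 64), |(i, j)| (i * j) as f64);
/// compress_array(
///     "data.bin.zst",
///     &data,
///     CompressionAlgorithm::Zstd,
///     Some(6), // compression level; `None` also defaults to 6
///     None,    // no additional metadata
/// )?;
/// ```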
#[allow(dead_code)]
pub fn compress_array<P, A, S, D>(
    path: P,
    array: &ArrayBase<S, D>,
    algorithm: CompressionAlgorithm,
    level: Option<u32>,
    additional_metadata: Option<std::collections::HashMap<String, String>>,
) -> Result<()>
where
    P: AsRef<Path>,
    A: Serialize + Clone,
    S: Data<Elem = A>,
    D: Dimension + Serialize,
{
    let cfg = config::standard();
    let flat_data: Vec<u8> = bincode_serde::encode_to_vec(array, cfg)
        .map_err(|e| IoError::SerializationError(e.to_string()))?;

    let level = level.unwrap_or(6);
    let compressed_data = compress_data(&flat_data, algorithm, Some(level))?;

    let metadata = CompressedArrayMetadata {
        shape: array.shape().to_vec(),
        dtype: std::any::type_name::<A>().to_string(),
        element_size: std::mem::size_of::<A>(),
        algorithm: format!("{algorithm:?}"),
        original_size: flat_data.len(),
        compressed_size: compressed_data.len(),
        compression_ratio: flat_data.len() as f64 / compressed_data.len() as f64,
        compression_level: level,
        additional_metadata: additional_metadata.unwrap_or_default(),
    };

    let compressed_array = CompressedArray {
        metadata,
        data: compressed_data,
    };

    let serialized = bincode_serde::encode_to_vec(&compressed_array, cfg)
        .map_err(|e| IoError::SerializationError(e.to_string()))?;

    File::create(path)
        .map_err(|e| IoError::FileError(format!("Failed to create output file: {e}")))?
        .write_all(&serialized)
        .map_err(|e| IoError::FileError(format!("Failed to write to output file: {e}")))?;

    Ok(())
}

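/// Reads a file written by [`compress_array`] and reconstructs the array.
///
/// The element type `A` and dimension type `D` must match the values used
/// when the file was written; a mismatch surfaces as a deserialization or
/// decompression error. Sketch (illustrative only):
///
/// ```ignore
/// use scirs2_core::ndarray::{Array2, Ix2};
///
/// let restored: Array2<f64> = decompress_array::<_, f64, Ix2>("data.bin.zst")?;
/// ```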
#[allow(dead_code)]
pub fn decompress_array<P, A, D>(path: P) -> Result<ArrayBase<OwnedRepr<A>, D>>
where
    P: AsRef<Path>,
    A: for<'de> Deserialize<'de> + Clone,
    D: Dimension + for<'de> Deserialize<'de>,
{
    let mut file = File::open(path)
        .map_err(|e| IoError::FileError(format!("Failed to open input file: {e}")))?;

    let mut serialized = Vec::new();
    file.read_to_end(&mut serialized)
        .map_err(|e| IoError::FileError(format!("Failed to read input file: {e}")))?;

    let cfg = config::standard();
    let (compressed_array, _len): (CompressedArray, usize) =
        bincode_serde::decode_from_slice(&serialized, cfg)
            .map_err(|e| IoError::DeserializationError(e.to_string()))?;

    let algorithm = match compressed_array.metadata.algorithm.as_str() {
        "Gzip" => CompressionAlgorithm::Gzip,
        "Zstd" => CompressionAlgorithm::Zstd,
        "Lz4" => CompressionAlgorithm::Lz4,
        "Bzip2" => CompressionAlgorithm::Bzip2,
        _ => {
            return Err(IoError::DecompressionError(format!(
                "Unknown compression algorithm: {}",
                compressed_array.metadata.algorithm
            )))
        }
    };

    let decompressed_data = decompress_data(&compressed_array.data, algorithm)?;

    let (array, _len): (ArrayBase<OwnedRepr<A>, D>, usize) =
        bincode_serde::decode_from_slice(&decompressed_data, cfg)
            .map_err(|e| IoError::DeserializationError(e.to_string()))?;

    Ok(array)
}

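/// Compresses a dynamic-dimensional array in independently compressed
/// chunks of `chunk_size` elements, so very large arrays never need a
/// single contiguous serialized buffer.
///
/// On-disk layout: `[metadata_len: u64 LE][metadata][num_chunks: u64 LE]`,
/// followed by `[chunk_len: u64 LE][chunk bytes]` per chunk.
/// Sketch (illustrative only):
///
/// ```ignore
/// use scirs2_core::ndarray::Array1;
///
/// let big = Array1::from_iter((0..1_000_000).map(|x| x as f32)).into_dyn();
/// compress_array_chunked("big.chunks", &big, CompressionAlgorithm::Lz4, None, 65_536)?;
/// ```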
#[allow(dead_code)]
pub fn compress_array_chunked<P, A, S>(
    path: P,
    array: &ArrayBase<S, IxDyn>,
    algorithm: CompressionAlgorithm,
    level: Option<u32>,
    chunk_size: usize,
) -> Result<()>
where
    P: AsRef<Path>,
    A: Serialize + Clone,
    S: Data<Elem = A>,
{
    let cfg = config::standard();
    let mut compressed_chunks = Vec::new();
    let mut total_original_size = 0;
    let mut total_compressed_size = 0;

    // Serialize and compress each chunk of `chunk_size` elements independently.
    let total_chunks = array.len().div_ceil(chunk_size);
    for chunk_idx in 0..total_chunks {
        let start = chunk_idx * chunk_size;
        let end = (start + chunk_size).min(array.len());

        let chunk_data: Vec<A> = array
            .iter()
            .skip(start)
            .take(end - start)
            .cloned()
            .collect();

        let serialized_chunk = bincode_serde::encode_to_vec(&chunk_data, cfg)
            .map_err(|e| IoError::SerializationError(e.to_string()))?;

        let compressed_chunk = compress_data(&serialized_chunk, algorithm, level)?;

        total_original_size += serialized_chunk.len();
        total_compressed_size += compressed_chunk.len();

        compressed_chunks.push(compressed_chunk);
    }

    let metadata = CompressedArrayMetadata {
        shape: array.shape().to_vec(),
        dtype: std::any::type_name::<A>().to_string(),
        element_size: std::mem::size_of::<A>(),
        algorithm: format!("{algorithm:?}"),
        original_size: total_original_size,
        compressed_size: total_compressed_size,
        compression_ratio: total_original_size as f64 / total_compressed_size as f64,
        compression_level: level.unwrap_or(6),
        additional_metadata: {
            let mut map = std::collections::HashMap::new();
            map.insert("chunked".to_string(), "true".to_string());
            map.insert(
                "num_chunks".to_string(),
                compressed_chunks.len().to_string(),
            );
            map.insert("chunk_size".to_string(), chunk_size.to_string());
            map
        },
    };

    let mut file = File::create(path)
        .map_err(|e| IoError::FileError(format!("Failed to create output file: {e}")))?;

    let serialized_metadata = bincode_serde::encode_to_vec(&metadata, cfg)
        .map_err(|e| IoError::SerializationError(e.to_string()))?;

    // File layout: [metadata_len: u64 LE][metadata][num_chunks: u64 LE],
    // then [chunk_len: u64 LE][chunk bytes] for each chunk.
    let metadata_size = serialized_metadata.len() as u64;
    file.write_all(&metadata_size.to_le_bytes())
        .map_err(|e| IoError::FileError(format!("Failed to write metadata size: {e}")))?;

    file.write_all(&serialized_metadata)
        .map_err(|e| IoError::FileError(format!("Failed to write metadata: {e}")))?;

    let num_chunks = compressed_chunks.len() as u64;
    file.write_all(&num_chunks.to_le_bytes())
        .map_err(|e| IoError::FileError(format!("Failed to write chunk count: {e}")))?;

    for chunk in compressed_chunks {
        let chunk_len = chunk.len() as u64;
        file.write_all(&chunk_len.to_le_bytes())
            .map_err(|e| IoError::FileError(format!("Failed to write chunk size: {e}")))?;

        file.write_all(&chunk)
            .map_err(|e| IoError::FileError(format!("Failed to write chunk data: {e}")))?;
    }

    Ok(())
}

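/// Reads a file written by [`compress_array_chunked`], returning the
/// reassembled dynamic-dimensional array together with its stored
/// metadata. Sketch (illustrative only):
///
/// ```ignore
/// let (array, meta) = decompress_array_chunked::<_, f32>("big.chunks")?;
/// assert_eq!(array.shape(), meta.shape.as_slice());
/// ```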
#[allow(dead_code)]
pub fn decompress_array_chunked<P, A>(
    path: P,
) -> Result<(ArrayBase<OwnedRepr<A>, IxDyn>, CompressedArrayMetadata)>
where
    P: AsRef<Path>,
    A: for<'de> Deserialize<'de> + Clone,
{
    let mut file = File::open(path)
        .map_err(|e| IoError::FileError(format!("Failed to open input file: {e}")))?;

    let mut metadata_size_bytes = [0u8; 8];
    file.read_exact(&mut metadata_size_bytes)
        .map_err(|e| IoError::FileError(format!("Failed to read metadata size: {e}")))?;

    let metadata_size = u64::from_le_bytes(metadata_size_bytes) as usize;

    let mut metadata_bytes = vec![0u8; metadata_size];
    file.read_exact(&mut metadata_bytes)
        .map_err(|e| IoError::FileError(format!("Failed to read metadata: {e}")))?;

    let cfg = config::standard();
    let (metadata, _len): (CompressedArrayMetadata, usize) =
        bincode_serde::decode_from_slice(&metadata_bytes, cfg)
            .map_err(|e| IoError::DeserializationError(e.to_string()))?;

    let algorithm = match metadata.algorithm.as_str() {
        "Gzip" => CompressionAlgorithm::Gzip,
        "Zstd" => CompressionAlgorithm::Zstd,
        "Lz4" => CompressionAlgorithm::Lz4,
        "Bzip2" => CompressionAlgorithm::Bzip2,
        _ => {
            return Err(IoError::DecompressionError(format!(
                "Unknown compression algorithm: {}",
                metadata.algorithm
            )))
        }
    };

    let mut num_chunks_bytes = [0u8; 8];
    file.read_exact(&mut num_chunks_bytes)
        .map_err(|e| IoError::FileError(format!("Failed to read chunk count: {e}")))?;

    let num_chunks = u64::from_le_bytes(num_chunks_bytes) as usize;

    let total_elements: usize = metadata.shape.iter().product();
    let mut all_elements = Vec::with_capacity(total_elements);

    for _ in 0..num_chunks {
        let mut chunk_size_bytes = [0u8; 8];
        file.read_exact(&mut chunk_size_bytes)
            .map_err(|e| IoError::FileError(format!("Failed to read chunk size: {e}")))?;

        let chunk_size = u64::from_le_bytes(chunk_size_bytes) as usize;

        let mut chunk_bytes = vec![0u8; chunk_size];
        file.read_exact(&mut chunk_bytes)
            .map_err(|e| IoError::FileError(format!("Failed to read chunk data: {e}")))?;

        let decompressed_chunk = decompress_data(&chunk_bytes, algorithm)?;

        let (chunk_elements, _len): (Vec<A>, usize) =
            bincode_serde::decode_from_slice(&decompressed_chunk, cfg)
                .map_err(|e| IoError::DeserializationError(e.to_string()))?;

        all_elements.extend(chunk_elements);
    }

    let array = ArrayBase::from_shape_vec(IxDyn(&metadata.shape), all_elements)
        .map_err(|e| IoError::DeserializationError(e.to_string()))?;

    Ok((array, metadata))
}

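/// Serializes `array` once, compresses it with each algorithm in
/// `algorithms`, and returns `(algorithm, compression_ratio,
/// compressed_size)` tuples sorted by ratio, best first. Useful for
/// picking an algorithm empirically. Sketch (illustrative only):
///
/// ```ignore
/// let results = compare_compression_algorithms(
///     &data,
///     &[
///         CompressionAlgorithm::Gzip,
///         CompressionAlgorithm::Zstd,
///         CompressionAlgorithm::Lz4,
///     ],
///     None,
/// )?;
/// for (alg, ratio, size) in &results {
///     println!("{alg:?}: {ratio:.2}x -> {size} bytes");
/// }
/// ```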
#[allow(dead_code)]
pub fn compare_compression_algorithms<A, S, D>(
    array: &ArrayBase<S, D>,
    algorithms: &[CompressionAlgorithm],
    level: Option<u32>,
) -> Result<Vec<(CompressionAlgorithm, f64, usize)>>
where
    A: Serialize + Clone,
    S: Data<Elem = A>,
    D: Dimension + Serialize,
{
    let cfg = config::standard();
    let serialized = bincode_serde::encode_to_vec(array, cfg)
        .map_err(|e| IoError::SerializationError(e.to_string()))?;

    let original_size = serialized.len();

    let mut results = Vec::new();

    for &algorithm in algorithms {
        let compressed = compress_data(&serialized, algorithm, level)?;
        let compressed_size = compressed.len();
        let ratio = original_size as f64 / compressed_size as f64;

        results.push((algorithm, ratio, compressed_size));
    }

    results.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));

    Ok(results)
}

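/// Compresses a contiguous, standard-layout array without per-element
/// serialization: the element buffer is reinterpreted as raw bytes via
/// `bytemuck` and compressed in fixed-size chunks, in parallel when the
/// platform looks capable and the array has more than 10,000 elements.
/// The result is returned in memory as a [`CompressedArray`] whose `data`
/// holds a self-describing chunk table (see the comments in the body).
///
/// Sketch (illustrative only; `A` must be `bytemuck::Pod`):
///
/// ```ignore
/// use scirs2_core::ndarray::Array1;
///
/// let data = Array1::from_elem(100_000, 0.5f64).into_dyn();
/// let compressed =
///     compress_array_zero_copy(&data, CompressionAlgorithm::Lz4, None, 16_384)?;
/// println!("ratio: {:.2}", compressed.metadata.compression_ratio);
/// ```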
#[allow(dead_code)]
pub fn compress_array_zero_copy<A, S>(
    array: &ArrayBase<S, IxDyn>,
    algorithm: CompressionAlgorithm,
    level: Option<u32>,
    chunk_size: usize,
) -> Result<CompressedArray>
where
    A: Serialize + Clone + bytemuck::Pod,
    S: Data<Elem = A>,
{
    if !array.is_standard_layout() {
        return Err(IoError::FormatError(
            "Array must be in standard layout for zero-copy compression".to_string(),
        ));
    }

    // Heuristic: take the parallel path for large arrays on capable hardware.
    let capabilities = PlatformCapabilities::detect();
    let use_parallel = capabilities.simd_available && array.len() > 10_000;

    if let Some(slice) = array.as_slice() {
        // Reinterpret the element slice as raw bytes without copying.
        let bytes = bytemuck::cast_slice(slice);
        let bytes_per_chunk = chunk_size * std::mem::size_of::<A>();

        let (compressed_chunks, total_original_size, total_compressed_size) = if use_parallel {
            let chunks: Vec<&[u8]> = bytes.chunks(bytes_per_chunk).collect();
            // Compress chunks in parallel, propagating the first error instead
            // of silently substituting empty data for failed chunks.
            let results: Vec<Result<(Vec<u8>, usize)>> = chunks
                .into_par_iter()
                .map(|chunk_bytes| -> Result<(Vec<u8>, usize)> {
                    let compressed = compress_data(chunk_bytes, algorithm, level)?;
                    Ok((compressed, chunk_bytes.len()))
                })
                .collect();

            let mut compressed_chunks = Vec::new();
            let mut total_original = 0;
            let mut total_compressed = 0;

            for result in results {
                let (compressed, orig_size) = result?;
                total_original += orig_size;
                total_compressed += compressed.len();
                compressed_chunks.push(compressed);
            }

            (compressed_chunks, total_original, total_compressed)
        } else {
            let mut compressed_chunks = Vec::new();
            let mut total_original_size = 0;
            let mut total_compressed_size = 0;

            for chunk_bytes in bytes.chunks(bytes_per_chunk) {
                let compressed_chunk = compress_data(chunk_bytes, algorithm, level)?;
                total_original_size += chunk_bytes.len();
                total_compressed_size += compressed_chunk.len();
                compressed_chunks.push(compressed_chunk);
            }

            (
                compressed_chunks,
                total_original_size,
                total_compressed_size,
            )
        };

        // Pack everything into a single buffer:
        // [num_chunks: u64 LE][chunk lengths: u64 LE each][chunk bytes...].
        let mut combined_data =
            Vec::with_capacity(total_compressed_size + compressed_chunks.len() * 8 + 8);

        combined_data.extend_from_slice(&(compressed_chunks.len() as u64).to_le_bytes());

        for chunk in &compressed_chunks {
            combined_data.extend_from_slice(&(chunk.len() as u64).to_le_bytes());
        }

        for chunk in compressed_chunks {
            combined_data.extend_from_slice(&chunk);
        }

        let metadata = CompressedArrayMetadata {
            shape: array.shape().to_vec(),
            dtype: std::any::type_name::<A>().to_string(),
            element_size: std::mem::size_of::<A>(),
            algorithm: format!("{algorithm:?}"),
            original_size: total_original_size,
            compressed_size: combined_data.len(),
            compression_ratio: total_original_size as f64 / combined_data.len() as f64,
            compression_level: level.unwrap_or(6),
            additional_metadata: {
                let mut map = std::collections::HashMap::new();
                map.insert("zero_copy".to_string(), "true".to_string());
                map.insert("chunk_size".to_string(), chunk_size.to_string());
                map.insert("parallel".to_string(), use_parallel.to_string());
                map
            },
        };

        Ok(CompressedArray {
            metadata,
            data: combined_data,
        })
    } else {
        Err(IoError::FormatError(
            "Array must be contiguous for zero-copy compression".to_string(),
        ))
    }
}

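/// Inverse of [`compress_array_zero_copy`]: parses the chunk table at the
/// front of `compressed.data`, decompresses every chunk (in parallel for
/// large inputs), and reinterprets the resulting bytes as elements of `A`.
/// Sketch (illustrative only):
///
/// ```ignore
/// let restored = decompress_array_zero_copy::<f64>(&compressed)?;
/// assert_eq!(restored.shape(), compressed.metadata.shape.as_slice());
/// ```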
#[allow(dead_code)]
pub fn decompress_array_zero_copy<A>(
    compressed: &CompressedArray,
) -> Result<scirs2_core::ndarray::Array<A, IxDyn>>
where
    A: for<'de> Deserialize<'de> + Clone + bytemuck::Pod,
{
    let algorithm = match compressed.metadata.algorithm.as_str() {
        "Gzip" => CompressionAlgorithm::Gzip,
        "Lz4" => CompressionAlgorithm::Lz4,
        "Zstd" => CompressionAlgorithm::Zstd,
        "Bzip2" => CompressionAlgorithm::Bzip2,
        _ => {
            return Err(IoError::FormatError(format!(
                "Unknown compression algorithm: {}",
                compressed.metadata.algorithm
            )))
        }
    };

    let capabilities = PlatformCapabilities::detect();
    let data = &compressed.data;

    // The buffer starts with the chunk count as a u64 (little-endian).
    if data.len() < 8 {
        return Err(IoError::DecompressionError(
            "Invalid compressed data".to_string(),
        ));
    }

    let chunk_count = u64::from_le_bytes(data[0..8].try_into().unwrap()) as usize;

    let header_size = 8 + chunk_count * 8;
    if data.len() < header_size {
        return Err(IoError::DecompressionError(
            "Invalid chunk headers".to_string(),
        ));
    }

    let mut chunk_sizes = Vec::with_capacity(chunk_count);
    for i in 0..chunk_count {
        let start = 8 + i * 8;
        let size = u64::from_le_bytes(data[start..start + 8].try_into().unwrap()) as usize;
        chunk_sizes.push(size);
    }

    let mut chunks = Vec::with_capacity(chunk_count);
    let mut offset = header_size;

    for &size in &chunk_sizes {
        if offset + size > data.len() {
            return Err(IoError::DecompressionError(
                "Truncated chunk data".to_string(),
            ));
        }
        chunks.push(&data[offset..offset + size]);
        offset += size;
    }

    let total_elements: usize = compressed.metadata.shape.iter().product();

    let use_parallel = capabilities.simd_available && chunks.len() > 4 && total_elements > 10_000;

    let decompressed_data = if use_parallel {
        // Decompress chunks in parallel, propagating the first error instead
        // of silently substituting empty data for failed chunks.
        let decompressed_chunks: Vec<Result<Vec<u8>>> = chunks
            .into_par_iter()
            .map(|chunk| decompress_data(chunk, algorithm))
            .collect();

        let mut result = Vec::with_capacity(total_elements);
        for chunk_data in decompressed_chunks {
            let chunk_data = chunk_data?;
            if chunk_data.len() % std::mem::size_of::<A>() != 0 {
                return Err(IoError::DecompressionError(
                    "Decompressed chunk size is not a multiple of the element size".to_string(),
                ));
            }
            // Copy into an `A`-typed buffer; unlike `cast_slice`, this does not
            // require the byte buffer to be aligned for `A`.
            let elements: Vec<A> = bytemuck::pod_collect_to_vec(&chunk_data);
            result.extend_from_slice(&elements);
        }
        result
    } else {
        let mut decompressed_data = Vec::with_capacity(total_elements);

        for chunk in chunks {
            let decompressed_chunk = decompress_data(chunk, algorithm)?;

            if decompressed_chunk.len() % std::mem::size_of::<A>() != 0 {
                return Err(IoError::DecompressionError(
                    "Decompressed chunk size is not a multiple of the element size".to_string(),
                ));
            }

            // See the note above: copy rather than cast to avoid alignment panics.
            let elements: Vec<A> = bytemuck::pod_collect_to_vec(&decompressed_chunk);
            decompressed_data.extend_from_slice(&elements);
        }
        decompressed_data
    };

    scirs2_core::ndarray::Array::from_shape_vec(
        IxDyn(&compressed.metadata.shape),
        decompressed_data,
    )
    .map_err(|e| IoError::DeserializationError(e.to_string()))
}