1use std::borrow::Cow;
10
11#[cfg(feature = "blosc2")]
12use crate::compression::Blosc2Compressor;
13#[cfg(feature = "lz4")]
14use crate::compression::Lz4Compressor;
15#[cfg(feature = "sz3")]
16use crate::compression::Sz3Compressor;
17#[cfg(feature = "szip")]
18use crate::compression::SzipCompressor;
19#[cfg(feature = "szip-pure")]
20use crate::compression::SzipPureCompressor;
21#[cfg(feature = "zfp")]
22use crate::compression::ZfpCompressor;
23#[cfg(feature = "zstd")]
24use crate::compression::ZstdCompressor;
25#[cfg(feature = "zstd-pure")]
26use crate::compression::ZstdPureCompressor;
27use crate::compression::{CompressResult, CompressionError, Compressor};
28use crate::shuffle;
29use crate::simple_packing::{self, PackingError, SimplePackingParams};
30use serde::{Deserialize, Serialize};
31use std::sync::OnceLock;
32use thiserror::Error;
33
/// Endianness of multi-byte values in an encoded payload.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ByteOrder {
    /// Most-significant byte first (network order).
    Big,
    /// Least-significant byte first.
    Little,
}
40
41impl ByteOrder {
42 #[inline]
44 pub fn native() -> Self {
45 #[cfg(target_endian = "little")]
46 {
47 ByteOrder::Little
48 }
49 #[cfg(target_endian = "big")]
50 {
51 ByteOrder::Big
52 }
53 }
54}
55
56pub fn byteswap(data: &mut [u8], unit_size: usize) -> Result<(), PipelineError> {
64 if unit_size <= 1 {
65 return Ok(());
66 }
67 if !data.len().is_multiple_of(unit_size) {
68 return Err(PipelineError::Range(format!(
69 "byteswap: data length {} is not a multiple of unit_size {}",
70 data.len(),
71 unit_size,
72 )));
73 }
74 for chunk in data.chunks_exact_mut(unit_size) {
75 chunk.reverse();
76 }
77 Ok(())
78}
79
/// Errors produced by the encode/decode pipelines.
#[derive(Debug, Error)]
pub enum PipelineError {
    /// Simple-packing encode/decode failure (wraps [`PackingError`]).
    #[error("encoding error: {0}")]
    Encoding(#[from] PackingError),
    /// Compressor/decompressor failure (wraps [`CompressionError`]).
    #[error("compression error: {0}")]
    Compression(#[from] CompressionError),
    /// Shuffle/unshuffle filter failure.
    #[error("shuffle error: {0}")]
    Shuffle(String),
    /// Out-of-bounds or misaligned length/offset arguments.
    #[error("range error: {0}")]
    Range(String),
    /// Encoding name not recognized when parsing a configuration.
    #[error("unknown encoding: {0}")]
    UnknownEncoding(String),
    /// Filter name not recognized when parsing a configuration.
    #[error("unknown filter: {0}")]
    UnknownFilter(String),
    /// Compression name not recognized when parsing a configuration.
    #[error("unknown compression: {0}")]
    UnknownCompression(String),
}
97
/// Value-level encoding applied before the filter/compression stages.
#[derive(Debug, Clone)]
pub enum EncodingType {
    /// Pass the raw bytes through unchanged.
    None,
    /// Fixed-width quantization of f64 values (see the `simple_packing` module).
    SimplePacking(SimplePackingParams),
}
103
104impl std::fmt::Display for EncodingType {
105 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
106 match self {
107 EncodingType::None => write!(f, "none"),
108 EncodingType::SimplePacking(_) => write!(f, "simple_packing"),
109 }
110 }
111}
112
/// Byte-level filter applied between value encoding and compression.
#[derive(Debug, Clone)]
pub enum FilterType {
    /// No filtering.
    None,
    /// Byte-shuffle over `element_size`-byte elements, grouping bytes of
    /// equal significance together to improve compressibility.
    Shuffle { element_size: usize },
}
118
/// Inner codec selection for the Blosc2 meta-compressor.
#[cfg(feature = "blosc2")]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum Blosc2Codec {
    Blosclz,
    Lz4,
    Lz4hc,
    Zlib,
    Zstd,
}
129
/// Lossy-compression mode for the ZFP codec.
#[cfg(feature = "zfp")]
#[derive(Debug, Clone)]
pub enum ZfpMode {
    /// Fixed compressed bits per value.
    FixedRate { rate: f64 },
    /// Fixed number of uncompressed bits retained per value.
    FixedPrecision { precision: u32 },
    /// Bounded absolute error per value.
    FixedAccuracy { tolerance: f64 },
}
137
/// Error-bound mode for the SZ3 lossy codec.
#[cfg(feature = "sz3")]
#[derive(Debug, Clone)]
pub enum Sz3ErrorBound {
    /// Maximum absolute error.
    Absolute(f64),
    /// Maximum error relative to the data's value range.
    Relative(f64),
    /// Target peak signal-to-noise ratio.
    Psnr(f64),
}
145
/// Compression codec applied as the final encode stage (first decode stage).
/// Variants are compiled in only when the matching cargo feature is enabled.
#[derive(Debug, Clone)]
pub enum CompressionType {
    /// No compression.
    None,
    /// SZIP (extended-Rice) compression.
    #[cfg(any(feature = "szip", feature = "szip-pure"))]
    Szip {
        rsi: u32,
        block_size: u32,
        flags: u32,
        bits_per_sample: u32,
    },
    /// Zstandard at the given compression level.
    #[cfg(any(feature = "zstd", feature = "zstd-pure"))]
    Zstd {
        level: i32,
    },
    /// LZ4 (no tunables).
    #[cfg(feature = "lz4")]
    Lz4,
    /// Blosc2 meta-compressor wrapping an inner codec.
    #[cfg(feature = "blosc2")]
    Blosc2 {
        codec: Blosc2Codec,
        clevel: i32,
        typesize: usize,
    },
    /// ZFP lossy floating-point compression.
    #[cfg(feature = "zfp")]
    Zfp {
        mode: ZfpMode,
    },
    /// SZ3 lossy floating-point compression.
    #[cfg(feature = "sz3")]
    Sz3 {
        error_bound: Sz3ErrorBound,
    },
}
177
/// Which implementation family to prefer when a codec is built with both a
/// native (FFI) and a pure-Rust backend.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompressionBackend {
    /// Bindings to the native C library.
    Ffi,
    /// Pure-Rust implementation (forced on wasm32).
    Pure,
}
196
impl Default for CompressionBackend {
    /// Delegates to the process-wide cached default (target- and
    /// environment-dependent; see [`default_compression_backend`]).
    fn default() -> Self {
        default_compression_backend()
    }
}
202
203pub fn default_compression_backend() -> CompressionBackend {
210 static DEFAULT: OnceLock<CompressionBackend> = OnceLock::new();
211 *DEFAULT.get_or_init(|| {
212 if cfg!(target_arch = "wasm32") {
213 return CompressionBackend::Pure;
214 }
215 if let Ok(val) = std::env::var("TENSOGRAM_COMPRESSION_BACKEND") {
217 return parse_backend(&val);
218 }
219 CompressionBackend::Ffi
221 })
222}
223
224fn parse_backend(val: &str) -> CompressionBackend {
225 match val.trim().to_ascii_lowercase().as_str() {
226 "pure" | "rust" => CompressionBackend::Pure,
227 _ => CompressionBackend::Ffi,
228 }
229}
230
/// Full description of how a payload is (de)serialized: value encoding,
/// byte filter, compression codec, plus the metadata each stage needs.
#[derive(Debug, Clone)]
pub struct PipelineConfig {
    /// Value-level encoding applied before filtering/compression.
    pub encoding: EncodingType,
    /// Byte-level filter applied between encoding and compression.
    pub filter: FilterType,
    /// Compression codec applied last on encode, first on decode.
    pub compression: CompressionType,
    /// Number of logical values in the payload.
    pub num_values: usize,
    /// Byte order of the payload on the wire.
    pub byte_order: ByteOrder,
    /// Width in bytes of one element of the original dtype.
    pub dtype_byte_width: usize,
    /// Unit size passed to [`byteswap`] when converting to native order; may
    /// be smaller than `dtype_byte_width` (the complex64 test swaps 4-byte
    /// components inside an 8-byte element).
    pub swap_unit_size: usize,
    /// Preferred compressor implementation (FFI vs pure Rust).
    pub compression_backend: CompressionBackend,
    /// Thread count forwarded to codecs and filters; 0 presumably means
    /// "let the codec decide" — TODO confirm against the codec implementations.
    pub intra_codec_threads: u32,
    /// Whether to compute an xxh3-64 hash of the final encoded payload.
    pub compute_hash: bool,
}
279
/// Output of one encode-pipeline run.
pub struct PipelineResult {
    /// Final payload after encoding, filtering, and compression.
    pub encoded_bytes: Vec<u8>,
    /// Per-block byte offsets reported by block-based compressors (enables
    /// partial range decodes); `None` when the codec does not produce them.
    pub block_offsets: Option<Vec<u64>>,
    /// xxh3-64 hash of `encoded_bytes`; `Some` only when
    /// `PipelineConfig::compute_hash` was set.
    pub hash: Option<u64>,
}
291
/// Select an SZIP compressor implementation according to the enabled cargo
/// features and, when both implementations are present, the requested backend.
#[cfg(any(feature = "szip", feature = "szip-pure"))]
fn build_szip_compressor(
    // Only consulted when both implementations are compiled in.
    #[allow(unused_variables)] backend: CompressionBackend,
    rsi: u32,
    block_size: u32,
    flags: u32,
    bits_per_sample: u32,
) -> Box<dyn Compressor> {
    // Both backends available: Pure short-circuits to the Rust implementation.
    #[cfg(all(feature = "szip", feature = "szip-pure"))]
    if matches!(backend, CompressionBackend::Pure) {
        return Box::new(SzipPureCompressor {
            rsi,
            block_size,
            flags,
            bits_per_sample,
        });
    }

    // FFI implementation whenever the `szip` feature is enabled (default path).
    #[cfg(feature = "szip")]
    {
        Box::new(SzipCompressor {
            rsi,
            block_size,
            flags,
            bits_per_sample,
        })
    }

    // Pure-Rust fallback when only `szip-pure` is enabled.
    #[cfg(all(feature = "szip-pure", not(feature = "szip")))]
    {
        Box::new(SzipPureCompressor {
            rsi,
            block_size,
            flags,
            bits_per_sample,
        })
    }
}
336
/// Select a Zstandard compressor implementation according to the enabled
/// cargo features and, when both are present, the requested backend.
#[cfg(any(feature = "zstd", feature = "zstd-pure"))]
fn build_zstd_compressor(
    // Only consulted when both implementations are compiled in.
    #[allow(unused_variables)] backend: CompressionBackend,
    level: i32,
    // Only used by the FFI implementation (multithreaded compression).
    #[allow(unused_variables)] nb_workers: u32,
) -> Box<dyn Compressor> {
    // Both backends available: Pure short-circuits to the Rust implementation.
    #[cfg(all(feature = "zstd", feature = "zstd-pure"))]
    if matches!(backend, CompressionBackend::Pure) {
        return Box::new(ZstdPureCompressor { level });
    }

    // FFI implementation whenever the `zstd` feature is enabled (default path).
    #[cfg(feature = "zstd")]
    {
        Box::new(ZstdCompressor { level, nb_workers })
    }

    // Pure-Rust fallback when only `zstd-pure` is enabled.
    #[cfg(all(feature = "zstd-pure", not(feature = "zstd")))]
    {
        Box::new(ZstdPureCompressor { level })
    }
}
363
/// Instantiate the compressor described by `compression`, or `Ok(None)` for
/// the pass-through case. `config` supplies codec-wide knobs (backend choice,
/// thread count, value count, byte order).
fn build_compressor(
    compression: &CompressionType,
    // Unused when only `CompressionType::None` is compiled in.
    #[allow(unused_variables)] config: &PipelineConfig,
) -> Result<Option<Box<dyn Compressor>>, CompressionError> {
    match compression {
        CompressionType::None => Ok(None),
        #[cfg(any(feature = "szip", feature = "szip-pure"))]
        CompressionType::Szip {
            rsi,
            block_size,
            flags,
            bits_per_sample,
        } => {
            let mut szip_flags = *flags;
            if matches!(config.encoding, EncodingType::SimplePacking(_)) {
                // NOTE(review): bit 0x4 is forced on when the input is
                // simple-packed; presumably an SZIP option-mask flag required
                // for packed samples — confirm against the SZIP flag constants.
                szip_flags |= 4; }

            let compressor: Box<dyn Compressor> = build_szip_compressor(
                config.compression_backend,
                *rsi,
                *block_size,
                szip_flags,
                *bits_per_sample,
            );
            Ok(Some(compressor))
        }
        #[cfg(any(feature = "zstd", feature = "zstd-pure"))]
        CompressionType::Zstd { level } => {
            let compressor: Box<dyn Compressor> = build_zstd_compressor(
                config.compression_backend,
                *level,
                config.intra_codec_threads,
            );
            Ok(Some(compressor))
        }
        #[cfg(feature = "lz4")]
        CompressionType::Lz4 => Ok(Some(Box::new(Lz4Compressor))),
        #[cfg(feature = "blosc2")]
        CompressionType::Blosc2 {
            codec,
            clevel,
            typesize,
        } => Ok(Some(Box::new(Blosc2Compressor {
            codec: *codec,
            clevel: *clevel,
            typesize: *typesize,
            nthreads: config.intra_codec_threads,
        }))),
        // ZFP and SZ3 operate on typed values, so they also need the value
        // count and wire byte order from the config.
        #[cfg(feature = "zfp")]
        CompressionType::Zfp { mode } => Ok(Some(Box::new(ZfpCompressor {
            mode: mode.clone(),
            num_values: config.num_values,
            byte_order: config.byte_order,
        }))),
        #[cfg(feature = "sz3")]
        CompressionType::Sz3 { error_bound } => Ok(Some(Box::new(Sz3Compressor {
            error_bound: error_bound.clone(),
            num_values: config.num_values,
            byte_order: config.byte_order,
        }))),
    }
}
438
439#[inline]
453fn copy_and_hash(src: &[u8], hasher: Option<&mut xxhash_rust::xxh3::Xxh3Default>) -> Vec<u8> {
454 match hasher {
455 None => src.to_vec(),
456 Some(h) => {
457 const CHUNK: usize = 64 * 1024;
458 let mut dst = Vec::with_capacity(src.len());
459 let mut offset = 0;
460 while offset < src.len() {
461 let end = (offset + CHUNK).min(src.len());
462 let chunk = &src[offset..end];
463 h.update(chunk);
464 dst.extend_from_slice(chunk);
465 offset = end;
466 }
467 dst
468 }
469 }
470}
471
472#[inline]
476fn update_hasher(bytes: &[u8], hasher: Option<&mut xxhash_rust::xxh3::Xxh3Default>) {
477 if let Some(h) = hasher {
478 h.update(bytes);
479 }
480}
481
/// Run the full encode pipeline over a raw byte buffer:
/// value encoding → byte filter → compression, optionally hashing the final
/// payload while it is produced.
#[tracing::instrument(skip(data, config), fields(data_len = data.len(), encoding = %config.encoding))]
pub fn encode_pipeline(
    data: &[u8],
    config: &PipelineConfig,
) -> Result<PipelineResult, PipelineError> {
    // The hash covers the *final* payload bytes, so the hasher is created up
    // front and fed as the last stage's output is materialized.
    let mut hasher = config
        .compute_hash
        .then(xxhash_rust::xxh3::Xxh3Default::new);

    let encoded: Cow<'_, [u8]> = match &config.encoding {
        EncodingType::None => Cow::Borrowed(data),
        EncodingType::SimplePacking(params) => {
            // Simple packing consumes f64 values, so reinterpret the raw
            // bytes using the configured wire byte order first.
            let values = bytes_to_f64(data, config.byte_order);
            Cow::Owned(simple_packing::encode_with_threads(
                &values,
                params,
                config.intra_codec_threads,
            )?)
        }
    };

    let filtered: Cow<'_, [u8]> = match &config.filter {
        FilterType::None => encoded,
        FilterType::Shuffle { element_size } => Cow::Owned(
            shuffle::shuffle_with_threads(&encoded, *element_size, config.intra_codec_threads)
                .map_err(|e| PipelineError::Shuffle(e.to_string()))?,
        ),
    };

    let compressor = build_compressor(&config.compression, config)?;
    let (encoded_bytes, block_offsets) = match compressor {
        None => {
            let owned = match filtered {
                // Borrowed data has to be copied out anyway — hash while
                // copying to avoid a second pass over the bytes.
                Cow::Borrowed(src) => copy_and_hash(src, hasher.as_mut()),
                Cow::Owned(buf) => {
                    update_hasher(&buf, hasher.as_mut());
                    buf
                }
            };
            (owned, None)
        }
        Some(compressor) => {
            let CompressResult {
                data: compressed,
                block_offsets,
            } = compressor.compress(&filtered)?;
            update_hasher(&compressed, hasher.as_mut());
            (compressed, block_offsets)
        }
    };

    Ok(PipelineResult {
        encoded_bytes,
        block_offsets,
        hash: hasher.map(|h| h.digest()),
    })
}
556
/// Like [`encode_pipeline`], but starts from f64 values instead of raw bytes,
/// skipping the bytes→f64 reinterpretation step for the simple-packing case.
#[tracing::instrument(skip(values, config), fields(num_values = values.len(), encoding = %config.encoding))]
pub fn encode_pipeline_f64(
    values: &[f64],
    config: &PipelineConfig,
) -> Result<PipelineResult, PipelineError> {
    // Hash covers the final payload; created up front, fed at the last stage.
    let mut hasher = config
        .compute_hash
        .then(xxhash_rust::xxh3::Xxh3Default::new);

    let encoded: Cow<'_, [u8]> = match &config.encoding {
        // No encoding: serialize the values in the configured wire byte order.
        EncodingType::None => Cow::Owned(f64_to_bytes(values, config.byte_order)),
        EncodingType::SimplePacking(params) => Cow::Owned(simple_packing::encode_with_threads(
            values,
            params,
            config.intra_codec_threads,
        )?),
    };

    let filtered: Cow<'_, [u8]> = match &config.filter {
        FilterType::None => encoded,
        FilterType::Shuffle { element_size } => Cow::Owned(
            shuffle::shuffle_with_threads(&encoded, *element_size, config.intra_codec_threads)
                .map_err(|e| PipelineError::Shuffle(e.to_string()))?,
        ),
    };

    let compressor = build_compressor(&config.compression, config)?;
    let (encoded_bytes, block_offsets) = match compressor {
        None => {
            // Both encoding arms above produce Cow::Owned, so this is a move,
            // not a copy.
            let owned = filtered.into_owned();
            update_hasher(&owned, hasher.as_mut());
            (owned, None)
        }
        Some(compressor) => {
            let CompressResult {
                data: compressed,
                block_offsets,
            } = compressor.compress(&filtered)?;
            update_hasher(&compressed, hasher.as_mut());
            (compressed, block_offsets)
        }
    };

    Ok(PipelineResult {
        encoded_bytes,
        block_offsets,
        hash: hasher.map(|h| h.digest()),
    })
}
614
/// Reverse of [`encode_pipeline`]: decompress → unshuffle → decode.
///
/// When `native_byte_order` is true the returned bytes are in the host's
/// endianness; otherwise they remain in the configured wire byte order.
#[tracing::instrument(skip(encoded, config), fields(encoded_len = encoded.len()))]
pub fn decode_pipeline(
    encoded: &[u8],
    config: &PipelineConfig,
    native_byte_order: bool,
) -> Result<Vec<u8>, PipelineError> {
    let decompressed: Cow<'_, [u8]> = match build_compressor(&config.compression, config)? {
        None => Cow::Borrowed(encoded),
        Some(compressor) => {
            // Size hint lets the decompressor preallocate its output buffer.
            let expected_size = estimate_decompressed_size(config);
            Cow::Owned(compressor.decompress(encoded, expected_size)?)
        }
    };

    let unfiltered: Cow<'_, [u8]> = match &config.filter {
        FilterType::None => decompressed,
        FilterType::Shuffle { element_size } => Cow::Owned(
            shuffle::unshuffle_with_threads(
                &decompressed,
                *element_size,
                config.intra_codec_threads,
            )
            .map_err(|e| PipelineError::Shuffle(e.to_string()))?,
        ),
    };

    let target_byte_order = if native_byte_order {
        ByteOrder::native()
    } else {
        config.byte_order
    };

    let mut decoded = match &config.encoding {
        EncodingType::None => unfiltered.into_owned(),
        EncodingType::SimplePacking(params) => {
            // Simple packing decodes to f64, which is serialized directly in
            // the requested target byte order — no separate swap needed.
            let values = simple_packing::decode_with_threads(
                &unfiltered,
                config.num_values,
                params,
                config.intra_codec_threads,
            )?;
            f64_to_bytes(&values, target_byte_order)
        }
    };

    // Unencoded payloads are still in wire order at this point; swap in place
    // when the caller asked for native order and the orders differ.
    if native_byte_order
        && matches!(config.encoding, EncodingType::None)
        && config.byte_order != ByteOrder::native()
    {
        byteswap(&mut decoded, config.swap_unit_size)?;
    }

    Ok(decoded)
}
685
/// Decode only samples `[sample_offset, sample_offset + sample_count)` of an
/// encoded payload, using `block_offsets` to decompress just the covering
/// byte range. Not supported when a shuffle filter was applied (the filter
/// scatters each element's bytes across the whole payload).
pub fn decode_range_pipeline(
    encoded: &[u8],
    config: &PipelineConfig,
    block_offsets: &[u64],
    sample_offset: u64,
    sample_count: u64,
    native_byte_order: bool,
) -> Result<Vec<u8>, PipelineError> {
    if matches!(config.filter, FilterType::Shuffle { .. }) {
        return Err(PipelineError::Shuffle(
            "partial range decode is not supported with shuffle filter".to_string(),
        ));
    }

    // Map the sample range to a byte range in the (decompressed) payload.
    // Simple packing is bit-addressed, so also track the bit offset of the
    // first sample inside the first fetched byte.
    let (byte_start, byte_size, bit_offset_in_chunk) = match &config.encoding {
        EncodingType::SimplePacking(params) => {
            let bit_start = sample_offset * params.bits_per_value as u64;
            let bit_count = sample_count * params.bits_per_value as u64;
            let bs = (bit_start / 8) as usize;
            // End byte rounds up to cover a final partial byte.
            let be = (bit_start + bit_count).div_ceil(8) as usize;
            (bs, be - bs, Some((bit_start % 8) as usize))
        }
        EncodingType::None => {
            let elem_size = config.dtype_byte_width;
            let bs = (sample_offset as usize)
                .checked_mul(elem_size)
                .ok_or_else(|| PipelineError::Range("byte offset overflow".to_string()))?;
            let sz = (sample_count as usize)
                .checked_mul(elem_size)
                .ok_or_else(|| PipelineError::Range("byte count overflow".to_string()))?;
            (bs, sz, None)
        }
    };

    let decompressed = match build_compressor(&config.compression, config)? {
        None => {
            // Uncompressed payload: slice the byte range out directly,
            // with overflow- and bounds-checked arithmetic.
            let byte_end = byte_start
                .checked_add(byte_size)
                .ok_or_else(|| PipelineError::Range("byte end overflow".to_string()))?;
            if byte_end > encoded.len() {
                return Err(PipelineError::Range(format!(
                    "range ({sample_offset}, {sample_count}) exceeds payload size"
                )));
            }
            encoded[byte_start..byte_end].to_vec()
        }
        Some(compressor) => {
            compressor.decompress_range(encoded, block_offsets, byte_start, byte_size)?
        }
    };

    let target_byte_order = if native_byte_order {
        ByteOrder::native()
    } else {
        config.byte_order
    };

    match &config.encoding {
        EncodingType::None => {
            let mut result = decompressed;
            if native_byte_order && config.byte_order != ByteOrder::native() {
                byteswap(&mut result, config.swap_unit_size)?;
            }
            Ok(result)
        }
        EncodingType::SimplePacking(params) => {
            let values = simple_packing::decode_range(
                &decompressed,
                bit_offset_in_chunk.unwrap_or(0),
                sample_count as usize,
                params,
            )?;
            Ok(f64_to_bytes(&values, target_byte_order))
        }
    }
}
776
777fn estimate_decompressed_size(config: &PipelineConfig) -> usize {
778 match &config.encoding {
779 EncodingType::None => config.num_values.saturating_mul(config.dtype_byte_width),
780 EncodingType::SimplePacking(params) => {
781 let total_bits =
782 (config.num_values as u128).saturating_mul(params.bits_per_value as u128);
783 total_bits.div_ceil(8).min(usize::MAX as u128) as usize
784 }
785 }
786}
787
788fn bytes_to_f64(data: &[u8], byte_order: ByteOrder) -> Vec<f64> {
789 data.chunks_exact(8)
790 .map(|chunk| {
791 let mut arr = [0u8; 8];
792 arr.copy_from_slice(chunk);
793 match byte_order {
794 ByteOrder::Big => f64::from_be_bytes(arr),
795 ByteOrder::Little => f64::from_le_bytes(arr),
796 }
797 })
798 .collect()
799}
800
801fn f64_to_bytes(values: &[f64], byte_order: ByteOrder) -> Vec<u8> {
802 values
803 .iter()
804 .flat_map(|v| match byte_order {
805 ByteOrder::Big => v.to_be_bytes(),
806 ByteOrder::Little => v.to_le_bytes(),
807 })
808 .collect()
809}
810
#[cfg(test)]
mod tests {
    use super::*;

    // Round trip with every pipeline stage disabled: bytes must pass through
    // unchanged in both directions.
    #[test]
    fn test_passthrough_pipeline() {
        let data = vec![1u8, 2, 3, 4, 5, 6, 7, 8];
        let config = PipelineConfig {
            encoding: EncodingType::None,
            filter: FilterType::None,
            compression: CompressionType::None,
            num_values: 1,
            byte_order: ByteOrder::Little,
            dtype_byte_width: 8,
            swap_unit_size: 8,
            compression_backend: CompressionBackend::default(),
            intra_codec_threads: 0,
            compute_hash: false,
        };
        let result = encode_pipeline(&data, &config).unwrap();
        assert_eq!(result.encoded_bytes, data);
        let decoded = decode_pipeline(&result.encoded_bytes, &config, false).unwrap();
        assert_eq!(decoded, data);
    }

    // Lossy simple-packing round trip: values must come back within the
    // quantization error implied by 16 bits per value over this range.
    #[test]
    fn test_simple_packing_pipeline() {
        let values: Vec<f64> = (0..50).map(|i| 200.0 + i as f64 * 0.1).collect();
        let data: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();
        let params = simple_packing::compute_params(&values, 16, 0).unwrap();

        let config = PipelineConfig {
            encoding: EncodingType::SimplePacking(params),
            filter: FilterType::None,
            compression: CompressionType::None,
            num_values: values.len(),
            byte_order: ByteOrder::Little,
            dtype_byte_width: 8,
            swap_unit_size: 8,
            compression_backend: CompressionBackend::default(),
            intra_codec_threads: 0,
            compute_hash: false,
        };

        let result = encode_pipeline(&data, &config).unwrap();
        let decoded = decode_pipeline(&result.encoded_bytes, &config, false).unwrap();
        let decoded_values = bytes_to_f64(&decoded, ByteOrder::Little);

        for (orig, dec) in values.iter().zip(decoded_values.iter()) {
            assert!((orig - dec).abs() < 0.01, "orig={orig}, dec={dec}");
        }
    }

    // Shuffle must rearrange the bytes on encode and restore them on decode.
    #[test]
    fn test_shuffle_pipeline() {
        let data: Vec<u8> = (0..16).collect();
        let config = PipelineConfig {
            encoding: EncodingType::None,
            filter: FilterType::Shuffle { element_size: 4 },
            compression: CompressionType::None,
            num_values: 4,
            byte_order: ByteOrder::Little,
            dtype_byte_width: 4,
            swap_unit_size: 4,
            compression_backend: CompressionBackend::default(),
            intra_codec_threads: 0,
            compute_hash: false,
        };

        let result = encode_pipeline(&data, &config).unwrap();
        assert_ne!(result.encoded_bytes, data);
        let decoded = decode_pipeline(&result.encoded_bytes, &config, false).unwrap();
        assert_eq!(decoded, data);
    }

    // SZIP round trip; also checks that block offsets are produced, since the
    // range-decode path depends on them.
    #[cfg(any(feature = "szip", feature = "szip-pure"))]
    #[test]
    fn test_szip_round_trip_pipeline() {
        let data: Vec<u8> = (0..2048).map(|i| (i % 256) as u8).collect();

        // NOTE(review): flag value 8 presumably enables an SZIP preprocessing
        // option — confirm against the SZIP flag constants.
        let preprocess_flag = 8u32;

        let config = PipelineConfig {
            encoding: EncodingType::None,
            filter: FilterType::None,
            compression: CompressionType::Szip {
                rsi: 128,
                block_size: 16,
                flags: preprocess_flag,
                bits_per_sample: 8,
            },
            num_values: 2048,
            byte_order: ByteOrder::Little,
            dtype_byte_width: 1,
            swap_unit_size: 1,
            compression_backend: CompressionBackend::default(),
            intra_codec_threads: 0,
            compute_hash: false,
        };

        let result = encode_pipeline(&data, &config).unwrap();
        assert!(result.block_offsets.is_some());

        let decoded = decode_pipeline(&result.encoded_bytes, &config, false).unwrap();
        assert_eq!(decoded, data);
    }

    // Unit sizes 0 and 1 must leave the buffer untouched.
    #[test]
    fn test_byteswap_noop_for_single_byte() {
        let mut data = vec![1, 2, 3, 4];
        let original = data.clone();
        byteswap(&mut data, 1).unwrap();
        assert_eq!(data, original);
        byteswap(&mut data, 0).unwrap();
        assert_eq!(data, original);
    }

    #[test]
    fn test_byteswap_2_bytes() {
        let mut data = vec![0xAA, 0xBB, 0xCC, 0xDD];
        byteswap(&mut data, 2).unwrap();
        assert_eq!(data, vec![0xBB, 0xAA, 0xDD, 0xCC]);
    }

    #[test]
    fn test_byteswap_4_bytes() {
        let mut data = vec![1, 2, 3, 4, 5, 6, 7, 8];
        byteswap(&mut data, 4).unwrap();
        assert_eq!(data, vec![4, 3, 2, 1, 8, 7, 6, 5]);
    }

    #[test]
    fn test_byteswap_8_bytes() {
        let mut data: Vec<u8> = (1..=16).collect();
        byteswap(&mut data, 8).unwrap();
        assert_eq!(
            data,
            vec![8, 7, 6, 5, 4, 3, 2, 1, 16, 15, 14, 13, 12, 11, 10, 9]
        );
    }

    // byteswap is an involution: applying it twice restores the input.
    #[test]
    fn test_byteswap_round_trip() {
        let original = vec![1u8, 2, 3, 4, 5, 6, 7, 8];
        let mut data = original.clone();
        byteswap(&mut data, 4).unwrap();
        assert_ne!(data, original);
        byteswap(&mut data, 4).unwrap();
        assert_eq!(data, original);
    }

    // 5 bytes cannot be split into 4-byte units — must error, not panic.
    #[test]
    fn test_byteswap_misaligned_returns_error() {
        let mut data = vec![1, 2, 3, 4, 5];
        let result = byteswap(&mut data, 4);
        assert!(result.is_err());
    }

    // A big-endian wire payload decoded with native_byte_order=true must be
    // readable with from_ne_bytes; with false it stays in wire (BE) order.
    #[test]
    fn test_decode_native_byte_order_encoding_none() {
        let value: f32 = 42.0;
        let be_bytes = value.to_be_bytes();
        let config = PipelineConfig {
            encoding: EncodingType::None,
            filter: FilterType::None,
            compression: CompressionType::None,
            num_values: 1,
            byte_order: ByteOrder::Big,
            dtype_byte_width: 4,
            swap_unit_size: 4,
            compression_backend: CompressionBackend::default(),
            intra_codec_threads: 0,
            compute_hash: false,
        };

        let result = encode_pipeline(&be_bytes, &config).unwrap();

        let native_decoded = decode_pipeline(&result.encoded_bytes, &config, true).unwrap();
        let ne_value = f32::from_ne_bytes(native_decoded[..4].try_into().unwrap());
        assert_eq!(ne_value, value);

        let wire_decoded = decode_pipeline(&result.encoded_bytes, &config, false).unwrap();
        let be_value = f32::from_be_bytes(wire_decoded[..4].try_into().unwrap());
        assert_eq!(be_value, value);
    }

    // Simple packing serializes decoded f64s directly in the target order, so
    // both native and wire decodes must agree with the source values.
    #[test]
    fn test_decode_native_byte_order_simple_packing() {
        let values: Vec<f64> = vec![100.0, 200.0, 300.0, 400.0];
        let data: Vec<u8> = values.iter().flat_map(|v| v.to_be_bytes()).collect();
        let params = simple_packing::compute_params(&values, 24, 0).unwrap();

        let config = PipelineConfig {
            encoding: EncodingType::SimplePacking(params),
            filter: FilterType::None,
            compression: CompressionType::None,
            num_values: values.len(),
            byte_order: ByteOrder::Big,
            dtype_byte_width: 8,
            swap_unit_size: 8,
            compression_backend: CompressionBackend::default(),
            intra_codec_threads: 0,
            compute_hash: false,
        };

        let result = encode_pipeline(&data, &config).unwrap();

        let native_decoded = decode_pipeline(&result.encoded_bytes, &config, true).unwrap();
        let decoded_values: Vec<f64> = native_decoded
            .chunks_exact(8)
            .map(|c| f64::from_ne_bytes(c.try_into().unwrap()))
            .collect();
        for (orig, dec) in values.iter().zip(decoded_values.iter()) {
            assert!((orig - dec).abs() < 1.0, "orig={orig}, dec={dec}");
        }

        let wire_decoded = decode_pipeline(&result.encoded_bytes, &config, false).unwrap();
        let wire_values: Vec<f64> = wire_decoded
            .chunks_exact(8)
            .map(|c| f64::from_be_bytes(c.try_into().unwrap()))
            .collect();
        for (orig, dec) in values.iter().zip(wire_values.iter()) {
            assert!((orig - dec).abs() < 1.0, "orig={orig}, dec={dec}");
        }
    }

    // When wire order equals native order, requesting native order must not
    // change the output (no spurious byteswap).
    #[test]
    fn test_native_byte_order_same_as_wire_is_noop() {
        let values: Vec<f32> = vec![1.0, 2.0, 3.0, 4.0];
        let data: Vec<u8> = values.iter().flat_map(|v| v.to_ne_bytes()).collect();

        let config = PipelineConfig {
            encoding: EncodingType::None,
            filter: FilterType::None,
            compression: CompressionType::None,
            num_values: values.len(),
            byte_order: ByteOrder::native(),
            dtype_byte_width: 4,
            swap_unit_size: 4,
            compression_backend: CompressionBackend::default(),
            intra_codec_threads: 0,
            compute_hash: false,
        };

        let result = encode_pipeline(&data, &config).unwrap();
        let native_decoded = decode_pipeline(&result.encoded_bytes, &config, true).unwrap();
        let wire_decoded = decode_pipeline(&result.encoded_bytes, &config, false).unwrap();
        assert_eq!(native_decoded, wire_decoded);
    }

    #[test]
    fn test_decode_native_byte_order_2byte_dtype() {
        let value: u16 = 0x0102;
        let be_bytes = value.to_be_bytes();
        let config = PipelineConfig {
            encoding: EncodingType::None,
            filter: FilterType::None,
            compression: CompressionType::None,
            num_values: 1,
            byte_order: ByteOrder::Big,
            dtype_byte_width: 2,
            swap_unit_size: 2,
            compression_backend: CompressionBackend::default(),
            intra_codec_threads: 0,
            compute_hash: false,
        };
        let result = encode_pipeline(&be_bytes, &config).unwrap();
        let native = decode_pipeline(&result.encoded_bytes, &config, true).unwrap();
        assert_eq!(u16::from_ne_bytes(native[..2].try_into().unwrap()), value);
    }

    #[test]
    fn test_decode_native_byte_order_8byte_dtype() {
        let value: f64 = std::f64::consts::E;
        let be_bytes = value.to_be_bytes();
        let config = PipelineConfig {
            encoding: EncodingType::None,
            filter: FilterType::None,
            compression: CompressionType::None,
            num_values: 1,
            byte_order: ByteOrder::Big,
            dtype_byte_width: 8,
            swap_unit_size: 8,
            compression_backend: CompressionBackend::default(),
            intra_codec_threads: 0,
            compute_hash: false,
        };
        let result = encode_pipeline(&be_bytes, &config).unwrap();
        let native = decode_pipeline(&result.encoded_bytes, &config, true).unwrap();
        assert_eq!(f64::from_ne_bytes(native[..8].try_into().unwrap()), value);
    }

    // complex64: the element is 8 bytes but each f32 component must be
    // swapped independently, hence swap_unit_size (4) < dtype_byte_width (8).
    #[test]
    fn test_decode_native_byte_order_complex64() {
        let real: f32 = 1.5;
        let imag: f32 = 2.5;
        let mut be_bytes = Vec::new();
        be_bytes.extend_from_slice(&real.to_be_bytes());
        be_bytes.extend_from_slice(&imag.to_be_bytes());
        let config = PipelineConfig {
            encoding: EncodingType::None,
            filter: FilterType::None,
            compression: CompressionType::None,
            num_values: 1,
            byte_order: ByteOrder::Big,
            dtype_byte_width: 8,
            swap_unit_size: 4,
            compression_backend: CompressionBackend::default(),
            intra_codec_threads: 0,
            compute_hash: false,
        };
        let result = encode_pipeline(&be_bytes, &config).unwrap();
        let native = decode_pipeline(&result.encoded_bytes, &config, true).unwrap();
        let decoded_real = f32::from_ne_bytes(native[0..4].try_into().unwrap());
        let decoded_imag = f32::from_ne_bytes(native[4..8].try_into().unwrap());
        assert_eq!(decoded_real, real);
        assert_eq!(decoded_imag, imag);
    }

    // 1-byte dtype: byte order is meaningless, so native decode is a no-op
    // even though the wire order (Big) differs from most hosts.
    #[test]
    fn test_decode_native_byte_order_uint8_noop() {
        let data = vec![1u8, 2, 3, 4, 5];
        let config = PipelineConfig {
            encoding: EncodingType::None,
            filter: FilterType::None,
            compression: CompressionType::None,
            num_values: 5,
            byte_order: ByteOrder::Big,
            dtype_byte_width: 1,
            swap_unit_size: 1,
            compression_backend: CompressionBackend::default(),
            intra_codec_threads: 0,
            compute_hash: false,
        };
        let result = encode_pipeline(&data, &config).unwrap();
        let native = decode_pipeline(&result.encoded_bytes, &config, true).unwrap();
        assert_eq!(native, data);
    }

    // Shared fixture for the hashing tests: an all-pass-through config over
    // `num_values` single-byte values.
    fn passthrough_config(num_values: usize, compute_hash: bool) -> PipelineConfig {
        PipelineConfig {
            encoding: EncodingType::None,
            filter: FilterType::None,
            compression: CompressionType::None,
            num_values,
            byte_order: ByteOrder::Little,
            dtype_byte_width: 1,
            swap_unit_size: 1,
            compression_backend: CompressionBackend::default(),
            intra_codec_threads: 0,
            compute_hash,
        }
    }

    // Sanity check on the hashing dependency itself: streaming updates must
    // match the one-shot hash regardless of chunking. The 239/240 sizes
    // straddle xxh3's short-input boundary.
    #[test]
    fn streaming_and_oneshot_xxh3_agree() {
        use xxhash_rust::xxh3::{Xxh3Default, xxh3_64};

        for size in [0usize, 1, 239, 240, 1024 * 1024 + 17] {
            let data: Vec<u8> = (0..size).map(|i| (i * 31 + 7) as u8).collect();

            let one_shot = xxh3_64(&data);

            let mut h = Xxh3Default::new();
            for chunk in data.chunks(64 * 1024) {
                h.update(chunk);
            }
            assert_eq!(h.digest(), one_shot, "streaming vs one-shot at size {size}");

            let mut h = Xxh3Default::new();
            for chunk in data.chunks(1) {
                h.update(chunk);
            }
            assert_eq!(
                h.digest(),
                one_shot,
                "streaming 1-byte chunks vs one-shot at size {size}"
            );
        }
    }

    #[test]
    fn pipeline_hash_none_when_disabled() {
        let data: Vec<u8> = (0..64).collect();
        let config = passthrough_config(data.len(), false);
        let result = encode_pipeline(&data, &config).unwrap();
        assert!(
            result.hash.is_none(),
            "compute_hash = false must leave PipelineResult.hash = None"
        );
    }

    // Hash-while-encoding must equal hashing the finished payload afterwards.
    // Sizes straddle the 64 KiB streaming chunk used by copy_and_hash.
    #[test]
    fn pipeline_hash_matches_post_hoc_for_passthrough() {
        use xxhash_rust::xxh3::xxh3_64;

        for size in [0usize, 1, 64 * 1024 - 1, 64 * 1024, 64 * 1024 + 1, 250_000] {
            let data: Vec<u8> = (0..size).map(|i| (i as u32 ^ 0xA5A5A5A5) as u8).collect();
            let config = passthrough_config(size, true);
            let result = encode_pipeline(&data, &config).unwrap();
            let expected = xxh3_64(&result.encoded_bytes);
            assert_eq!(
                result.hash,
                Some(expected),
                "passthrough hash-while-encoding mismatch at size {size}"
            );
            assert_eq!(
                result.encoded_bytes, data,
                "passthrough must still produce identical bytes at size {size}"
            );
        }
    }

    #[test]
    fn pipeline_hash_matches_post_hoc_for_simple_packing() {
        use xxhash_rust::xxh3::xxh3_64;

        let values: Vec<f64> = (0..10_000).map(|i| 200.0 + i as f64 * 0.1).collect();
        let data: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();
        let params = simple_packing::compute_params(&values, 16, 0).unwrap();

        let config = PipelineConfig {
            encoding: EncodingType::SimplePacking(params),
            filter: FilterType::None,
            compression: CompressionType::None,
            num_values: values.len(),
            byte_order: ByteOrder::Little,
            dtype_byte_width: 8,
            swap_unit_size: 8,
            compression_backend: CompressionBackend::default(),
            intra_codec_threads: 0,
            compute_hash: true,
        };
        let result = encode_pipeline(&data, &config).unwrap();
        let expected = xxh3_64(&result.encoded_bytes);
        assert_eq!(result.hash, Some(expected));
    }

    // The hash must cover the *compressed* payload, not the input.
    #[cfg(feature = "lz4")]
    #[test]
    fn pipeline_hash_matches_post_hoc_for_lz4() {
        use xxhash_rust::xxh3::xxh3_64;

        let data: Vec<u8> = (0..16_000).map(|i| (i % 257) as u8).collect();
        let config = PipelineConfig {
            encoding: EncodingType::None,
            filter: FilterType::None,
            compression: CompressionType::Lz4,
            num_values: data.len(),
            byte_order: ByteOrder::Little,
            dtype_byte_width: 1,
            swap_unit_size: 1,
            compression_backend: CompressionBackend::default(),
            intra_codec_threads: 0,
            compute_hash: true,
        };
        let result = encode_pipeline(&data, &config).unwrap();
        let expected = xxh3_64(&result.encoded_bytes);
        assert_eq!(result.hash, Some(expected));
    }

    #[test]
    fn pipeline_f64_hash_matches_post_hoc() {
        use xxhash_rust::xxh3::xxh3_64;

        let values: Vec<f64> = (0..1_000).map(|i| (i as f64).sqrt()).collect();
        let config = PipelineConfig {
            encoding: EncodingType::None,
            filter: FilterType::None,
            compression: CompressionType::None,
            num_values: values.len(),
            byte_order: ByteOrder::Little,
            dtype_byte_width: 8,
            swap_unit_size: 8,
            compression_backend: CompressionBackend::default(),
            intra_codec_threads: 0,
            compute_hash: true,
        };
        let result = encode_pipeline_f64(&values, &config).unwrap();
        let expected = xxh3_64(&result.encoded_bytes);
        assert_eq!(result.hash, Some(expected));
    }

    // The intra-codec thread count must not change the encoded bytes (and
    // therefore the hash) for simple packing.
    #[test]
    fn pipeline_hash_byte_identical_across_threads_transparent() {
        let values: Vec<f64> = (0..50_000)
            .map(|i| 280.0 + (i as f64 * 0.001).sin())
            .collect();
        let data: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();
        let params = simple_packing::compute_params(&values, 24, 0).unwrap();

        let mut hashes = Vec::new();
        for threads in [0u32, 1, 2, 4] {
            let config = PipelineConfig {
                encoding: EncodingType::SimplePacking(params.clone()),
                filter: FilterType::None,
                compression: CompressionType::None,
                num_values: values.len(),
                byte_order: ByteOrder::Little,
                dtype_byte_width: 8,
                swap_unit_size: 8,
                compression_backend: CompressionBackend::default(),
                intra_codec_threads: threads,
                compute_hash: true,
            };
            let result = encode_pipeline(&data, &config).unwrap();
            hashes.push(result.hash);
        }
        assert!(
            hashes.windows(2).all(|w| w[0] == w[1]),
            "transparent simple_packing must produce byte-identical hashes across thread counts: {hashes:?}"
        );
    }
}