1use std::borrow::Cow;
10
11#[cfg(feature = "blosc2")]
12use crate::compression::Blosc2Compressor;
13#[cfg(feature = "lz4")]
14use crate::compression::Lz4Compressor;
15#[cfg(feature = "sz3")]
16use crate::compression::Sz3Compressor;
17#[cfg(feature = "szip")]
18use crate::compression::SzipCompressor;
19#[cfg(feature = "szip-pure")]
20use crate::compression::SzipPureCompressor;
21#[cfg(feature = "zfp")]
22use crate::compression::ZfpCompressor;
23#[cfg(feature = "zstd")]
24use crate::compression::ZstdCompressor;
25#[cfg(feature = "zstd-pure")]
26use crate::compression::ZstdPureCompressor;
27use crate::compression::{CompressResult, CompressionError, Compressor};
28use crate::shuffle;
29use crate::simple_packing::{self, PackingError, SimplePackingParams};
30use serde::{Deserialize, Serialize};
31use std::sync::OnceLock;
32use thiserror::Error;
33
/// Byte order (endianness) of multi-byte values in a payload.
///
/// Serialized by serde in lowercase, i.e. `"big"` / `"little"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ByteOrder {
    /// Most significant byte first.
    Big,
    /// Least significant byte first.
    Little,
}
40
41impl ByteOrder {
42 #[inline]
44 pub fn native() -> Self {
45 #[cfg(target_endian = "little")]
46 {
47 ByteOrder::Little
48 }
49 #[cfg(target_endian = "big")]
50 {
51 ByteOrder::Big
52 }
53 }
54}
55
56pub fn byteswap(data: &mut [u8], unit_size: usize) -> Result<(), PipelineError> {
64 if unit_size <= 1 {
65 return Ok(());
66 }
67 if !data.len().is_multiple_of(unit_size) {
68 return Err(PipelineError::Range(format!(
69 "byteswap: data length {} is not a multiple of unit_size {}",
70 data.len(),
71 unit_size,
72 )));
73 }
74 for chunk in data.chunks_exact_mut(unit_size) {
75 chunk.reverse();
76 }
77 Ok(())
78}
79
/// Errors produced by the encode/decode pipeline.
#[derive(Debug, Error)]
pub enum PipelineError {
    /// A simple-packing operation failed; converted automatically from [`PackingError`].
    #[error("encoding error: {0}")]
    Encoding(#[from] PackingError),
    /// A compressor failed; converted automatically from [`CompressionError`].
    #[error("compression error: {0}")]
    Compression(#[from] CompressionError),
    /// The shuffle filter failed, or was requested where it is unsupported
    /// (partial range decode).
    #[error("shuffle error: {0}")]
    Shuffle(String),
    /// A size, offset or range was invalid (overflow, out of bounds, misaligned length).
    #[error("range error: {0}")]
    Range(String),
    /// An encoding name was not recognised. Not constructed in this part of the file.
    #[error("unknown encoding: {0}")]
    UnknownEncoding(String),
    /// A filter name was not recognised. Not constructed in this part of the file.
    #[error("unknown filter: {0}")]
    UnknownFilter(String),
    /// A compression name was not recognised. Not constructed in this part of the file.
    #[error("unknown compression: {0}")]
    UnknownCompression(String),
}
97
/// Value encoding stage of the pipeline (first stage on encode, last on decode).
#[derive(Debug, Clone)]
pub enum EncodingType {
    /// Bytes are passed through unchanged.
    None,
    /// Values are treated as `f64` and packed with `simple_packing` using the given parameters.
    SimplePacking(SimplePackingParams),
}
103
104impl std::fmt::Display for EncodingType {
105 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
106 match self {
107 EncodingType::None => write!(f, "none"),
108 EncodingType::SimplePacking(_) => write!(f, "simple_packing"),
109 }
110 }
111}
112
/// Filter stage of the pipeline, applied between encoding and compression.
#[derive(Debug, Clone)]
pub enum FilterType {
    /// No filtering.
    None,
    /// Shuffle filter; `element_size` is forwarded to the `shuffle` module.
    /// Partial range decoding is rejected when this filter is active.
    Shuffle { element_size: usize },
}
118
/// Inner codec selected for Blosc2 compression.
///
/// Only available with the `blosc2` feature. Serialized by serde in lowercase
/// (e.g. `"lz4hc"`).
#[cfg(feature = "blosc2")]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum Blosc2Codec {
    Blosclz,
    Lz4,
    Lz4hc,
    Zlib,
    Zstd,
}
129
/// Operating mode handed to the ZFP compressor.
///
/// Only available with the `zfp` feature. The exact meaning and valid range
/// of each parameter are defined by the compressor implementation, which is
/// not visible in this file.
#[cfg(feature = "zfp")]
#[derive(Debug, Clone)]
pub enum ZfpMode {
    /// Fixed-rate mode with the given `rate`.
    FixedRate { rate: f64 },
    /// Fixed-precision mode with the given `precision`.
    FixedPrecision { precision: u32 },
    /// Fixed-accuracy mode with the given `tolerance`.
    FixedAccuracy { tolerance: f64 },
}
137
/// Error bound handed to the SZ3 compressor.
///
/// Only available with the `sz3` feature. How each bound is interpreted is
/// defined by the compressor implementation, which is not visible in this file.
#[cfg(feature = "sz3")]
#[derive(Debug, Clone)]
pub enum Sz3ErrorBound {
    /// Absolute error bound.
    Absolute(f64),
    /// Relative error bound.
    Relative(f64),
    /// PSNR-based bound.
    Psnr(f64),
}
145
/// Compression stage of the pipeline (last stage on encode, first on decode).
///
/// Every variant except [`CompressionType::None`] exists only when the
/// matching cargo feature is enabled.
#[derive(Debug, Clone)]
pub enum CompressionType {
    /// No compression; bytes pass through unchanged.
    None,
    /// Szip compression. The fields are forwarded to the szip compressor;
    /// `flags` may be augmented when simple packing is in use (see `build_compressor`).
    #[cfg(any(feature = "szip", feature = "szip-pure"))]
    Szip {
        rsi: u32,
        block_size: u32,
        flags: u32,
        bits_per_sample: u32,
    },
    /// Zstandard compression at the given `level`.
    #[cfg(any(feature = "zstd", feature = "zstd-pure"))]
    Zstd {
        level: i32,
    },
    /// LZ4 compression (no parameters).
    #[cfg(feature = "lz4")]
    Lz4,
    /// Blosc2 compression with the selected inner codec, compression level and type size.
    #[cfg(feature = "blosc2")]
    Blosc2 {
        codec: Blosc2Codec,
        clevel: i32,
        typesize: usize,
    },
    /// ZFP compression in the given mode.
    #[cfg(feature = "zfp")]
    Zfp {
        mode: ZfpMode,
    },
    /// SZ3 compression with the given error bound.
    #[cfg(feature = "sz3")]
    Sz3 {
        error_bound: Sz3ErrorBound,
    },
}
177
/// Which implementation family to use for codecs that ship in two variants
/// (szip and zstd in this file).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompressionBackend {
    /// Use the non-"pure" compressor (`SzipCompressor` / `ZstdCompressor`).
    /// NOTE(review): presumably these wrap native libraries via FFI — confirm.
    Ffi,
    /// Use the `*PureCompressor` types when they are compiled in.
    Pure,
}
196
/// The default backend is whatever [`default_compression_backend`] resolves to,
/// so it can depend on the target architecture and the process environment.
impl Default for CompressionBackend {
    fn default() -> Self {
        default_compression_backend()
    }
}
202
203pub fn default_compression_backend() -> CompressionBackend {
210 static DEFAULT: OnceLock<CompressionBackend> = OnceLock::new();
211 *DEFAULT.get_or_init(|| {
212 if cfg!(target_arch = "wasm32") {
213 return CompressionBackend::Pure;
214 }
215 if let Ok(val) = std::env::var("TENSOGRAM_COMPRESSION_BACKEND") {
217 return parse_backend(&val);
218 }
219 CompressionBackend::Ffi
221 })
222}
223
224fn parse_backend(val: &str) -> CompressionBackend {
225 match val.trim().to_ascii_lowercase().as_str() {
226 "pure" | "rust" => CompressionBackend::Pure,
227 _ => CompressionBackend::Ffi,
228 }
229}
230
/// Full description of how one payload is encoded, filtered and compressed.
///
/// The same configuration drives both directions: encoding runs
/// encoding → filter → compression, decoding runs the stages in reverse.
#[derive(Debug, Clone)]
pub struct PipelineConfig {
    /// Value encoding stage.
    pub encoding: EncodingType,
    /// Filter stage, applied to the encoded bytes.
    pub filter: FilterType,
    /// Compression stage, applied to the filtered bytes.
    pub compression: CompressionType,
    /// Number of values in the payload. Used to estimate the decompressed
    /// size, as the value count for simple-packing decode, and forwarded to
    /// the ZFP and SZ3 compressors.
    pub num_values: usize,
    /// Byte order of the stored values.
    pub byte_order: ByteOrder,
    /// Width in bytes of one element; used for range offsets and size
    /// estimation when `encoding` is [`EncodingType::None`].
    pub dtype_byte_width: usize,
    /// Unit width passed to [`byteswap`] when converting to native byte order.
    /// It may be smaller than `dtype_byte_width` (the complex64 test in this
    /// file uses width 8 with unit 4).
    pub swap_unit_size: usize,
    /// Which implementation family to use for szip and zstd.
    pub compression_backend: CompressionBackend,
    /// Thread count forwarded to simple packing, shuffle, zstd and Blosc2.
    /// NOTE(review): the meaning of `0` is defined by those callees — confirm.
    pub intra_codec_threads: u32,
}
261
/// Output of the encode pipeline.
pub struct PipelineResult {
    /// The fully encoded (and, if configured, filtered and compressed) bytes.
    pub encoded_bytes: Vec<u8>,
    /// Block offsets reported by the compressor, if it produced any.
    /// Always `None` when no compression is configured.
    pub block_offsets: Option<Vec<u64>>,
}
267
/// Builds the szip compressor for the requested backend.
///
/// Selection depends on which features are compiled in:
/// - both `szip` and `szip-pure`: `Pure` selects `SzipPureCompressor`,
///   anything else selects `SzipCompressor`;
/// - only `szip`: always `SzipCompressor` (`backend` is ignored);
/// - only `szip-pure`: always `SzipPureCompressor` (`backend` is ignored).
///
/// `backend` is unused in the single-feature builds, hence the
/// `allow(unused_variables)` on the parameter.
#[cfg(any(feature = "szip", feature = "szip-pure"))]
fn build_szip_compressor(
    #[allow(unused_variables)] backend: CompressionBackend,
    rsi: u32,
    block_size: u32,
    flags: u32,
    bits_per_sample: u32,
) -> Box<dyn Compressor> {
    // Only when both implementations exist can the caller actually choose.
    #[cfg(all(feature = "szip", feature = "szip-pure"))]
    if matches!(backend, CompressionBackend::Pure) {
        return Box::new(SzipPureCompressor {
            rsi,
            block_size,
            flags,
            bits_per_sample,
        });
    }

    // Default whenever the `szip` feature is available.
    #[cfg(feature = "szip")]
    {
        Box::new(SzipCompressor {
            rsi,
            block_size,
            flags,
            bits_per_sample,
        })
    }

    // Fallback when only the pure implementation is compiled in.
    #[cfg(all(feature = "szip-pure", not(feature = "szip")))]
    {
        Box::new(SzipPureCompressor {
            rsi,
            block_size,
            flags,
            bits_per_sample,
        })
    }
}
312
/// Builds the zstd compressor for the requested backend.
///
/// Selection depends on which features are compiled in:
/// - both `zstd` and `zstd-pure`: `Pure` selects `ZstdPureCompressor`,
///   anything else selects `ZstdCompressor`;
/// - only `zstd`: always `ZstdCompressor` (`backend` is ignored);
/// - only `zstd-pure`: always `ZstdPureCompressor` (`backend` is ignored).
///
/// `nb_workers` is only passed to `ZstdCompressor`; the pure compressor takes
/// just `level`. Both parameters can therefore be unused in some builds,
/// hence the `allow(unused_variables)` attributes.
#[cfg(any(feature = "zstd", feature = "zstd-pure"))]
fn build_zstd_compressor(
    #[allow(unused_variables)] backend: CompressionBackend,
    level: i32,
    #[allow(unused_variables)] nb_workers: u32,
) -> Box<dyn Compressor> {
    // Only when both implementations exist can the caller actually choose.
    #[cfg(all(feature = "zstd", feature = "zstd-pure"))]
    if matches!(backend, CompressionBackend::Pure) {
        return Box::new(ZstdPureCompressor { level });
    }

    // Default whenever the `zstd` feature is available.
    #[cfg(feature = "zstd")]
    {
        Box::new(ZstdCompressor { level, nb_workers })
    }

    // Fallback when only the pure implementation is compiled in.
    #[cfg(all(feature = "zstd-pure", not(feature = "zstd")))]
    {
        Box::new(ZstdPureCompressor { level })
    }
}
339
340fn build_compressor(
346 compression: &CompressionType,
347 #[allow(unused_variables)] config: &PipelineConfig,
348) -> Result<Option<Box<dyn Compressor>>, CompressionError> {
349 match compression {
350 CompressionType::None => Ok(None),
351 #[cfg(any(feature = "szip", feature = "szip-pure"))]
352 CompressionType::Szip {
353 rsi,
354 block_size,
355 flags,
356 bits_per_sample,
357 } => {
358 let mut szip_flags = *flags;
359 if matches!(config.encoding, EncodingType::SimplePacking(_)) {
363 szip_flags |= 4; }
365
366 let compressor: Box<dyn Compressor> = build_szip_compressor(
370 config.compression_backend,
371 *rsi,
372 *block_size,
373 szip_flags,
374 *bits_per_sample,
375 );
376 Ok(Some(compressor))
377 }
378 #[cfg(any(feature = "zstd", feature = "zstd-pure"))]
379 CompressionType::Zstd { level } => {
380 let compressor: Box<dyn Compressor> = build_zstd_compressor(
381 config.compression_backend,
382 *level,
383 config.intra_codec_threads,
384 );
385 Ok(Some(compressor))
386 }
387 #[cfg(feature = "lz4")]
388 CompressionType::Lz4 => Ok(Some(Box::new(Lz4Compressor))),
389 #[cfg(feature = "blosc2")]
390 CompressionType::Blosc2 {
391 codec,
392 clevel,
393 typesize,
394 } => Ok(Some(Box::new(Blosc2Compressor {
395 codec: *codec,
396 clevel: *clevel,
397 typesize: *typesize,
398 nthreads: config.intra_codec_threads,
399 }))),
400 #[cfg(feature = "zfp")]
401 CompressionType::Zfp { mode } => Ok(Some(Box::new(ZfpCompressor {
402 mode: mode.clone(),
403 num_values: config.num_values,
404 byte_order: config.byte_order,
405 }))),
406 #[cfg(feature = "sz3")]
407 CompressionType::Sz3 { error_bound } => Ok(Some(Box::new(Sz3Compressor {
408 error_bound: error_bound.clone(),
409 num_values: config.num_values,
410 byte_order: config.byte_order,
411 }))),
412 }
413}
414
415#[tracing::instrument(skip(data, config), fields(data_len = data.len(), encoding = %config.encoding))]
417pub fn encode_pipeline(
418 data: &[u8],
419 config: &PipelineConfig,
420) -> Result<PipelineResult, PipelineError> {
421 let encoded: Cow<'_, [u8]> = match &config.encoding {
423 EncodingType::None => Cow::Borrowed(data),
424 EncodingType::SimplePacking(params) => {
425 let values = bytes_to_f64(data, config.byte_order);
426 Cow::Owned(simple_packing::encode_with_threads(
427 &values,
428 params,
429 config.intra_codec_threads,
430 )?)
431 }
432 };
433
434 let filtered: Cow<'_, [u8]> = match &config.filter {
436 FilterType::None => encoded,
437 FilterType::Shuffle { element_size } => Cow::Owned(
438 shuffle::shuffle_with_threads(&encoded, *element_size, config.intra_codec_threads)
439 .map_err(|e| PipelineError::Shuffle(e.to_string()))?,
440 ),
441 };
442
443 let compressor = build_compressor(&config.compression, config)?;
445 match compressor {
446 None => Ok(PipelineResult {
447 encoded_bytes: filtered.into_owned(),
448 block_offsets: None,
449 }),
450 Some(compressor) => {
451 let CompressResult {
452 data: compressed,
453 block_offsets,
454 } = compressor.compress(&filtered)?;
455 Ok(PipelineResult {
456 encoded_bytes: compressed,
457 block_offsets,
458 })
459 }
460 }
461}
462
463#[tracing::instrument(skip(values, config), fields(num_values = values.len(), encoding = %config.encoding))]
466pub fn encode_pipeline_f64(
467 values: &[f64],
468 config: &PipelineConfig,
469) -> Result<PipelineResult, PipelineError> {
470 let encoded: Cow<'_, [u8]> = match &config.encoding {
471 EncodingType::None => Cow::Owned(f64_to_bytes(values, config.byte_order)),
472 EncodingType::SimplePacking(params) => Cow::Owned(simple_packing::encode_with_threads(
473 values,
474 params,
475 config.intra_codec_threads,
476 )?),
477 };
478
479 let filtered: Cow<'_, [u8]> = match &config.filter {
480 FilterType::None => encoded,
481 FilterType::Shuffle { element_size } => Cow::Owned(
482 shuffle::shuffle_with_threads(&encoded, *element_size, config.intra_codec_threads)
483 .map_err(|e| PipelineError::Shuffle(e.to_string()))?,
484 ),
485 };
486
487 let compressor = build_compressor(&config.compression, config)?;
488 match compressor {
489 None => Ok(PipelineResult {
490 encoded_bytes: filtered.into_owned(),
491 block_offsets: None,
492 }),
493 Some(compressor) => {
494 let CompressResult {
495 data: compressed,
496 block_offsets,
497 } = compressor.compress(&filtered)?;
498 Ok(PipelineResult {
499 encoded_bytes: compressed,
500 block_offsets,
501 })
502 }
503 }
504}
505
506#[tracing::instrument(skip(encoded, config), fields(encoded_len = encoded.len()))]
513pub fn decode_pipeline(
514 encoded: &[u8],
515 config: &PipelineConfig,
516 native_byte_order: bool,
517) -> Result<Vec<u8>, PipelineError> {
518 let decompressed: Cow<'_, [u8]> = match build_compressor(&config.compression, config)? {
520 None => Cow::Borrowed(encoded),
521 Some(compressor) => {
522 let expected_size = estimate_decompressed_size(config);
523 Cow::Owned(compressor.decompress(encoded, expected_size)?)
524 }
525 };
526
527 let unfiltered: Cow<'_, [u8]> = match &config.filter {
529 FilterType::None => decompressed,
530 FilterType::Shuffle { element_size } => Cow::Owned(
531 shuffle::unshuffle_with_threads(
532 &decompressed,
533 *element_size,
534 config.intra_codec_threads,
535 )
536 .map_err(|e| PipelineError::Shuffle(e.to_string()))?,
537 ),
538 };
539
540 let target_byte_order = if native_byte_order {
544 ByteOrder::native()
545 } else {
546 config.byte_order
547 };
548
549 let mut decoded = match &config.encoding {
551 EncodingType::None => unfiltered.into_owned(),
552 EncodingType::SimplePacking(params) => {
553 let values = simple_packing::decode_with_threads(
556 &unfiltered,
557 config.num_values,
558 params,
559 config.intra_codec_threads,
560 )?;
561 f64_to_bytes(&values, target_byte_order)
562 }
563 };
564
565 if native_byte_order
568 && matches!(config.encoding, EncodingType::None)
569 && config.byte_order != ByteOrder::native()
570 {
571 byteswap(&mut decoded, config.swap_unit_size)?;
572 }
573
574 Ok(decoded)
575}
576
577pub fn decode_range_pipeline(
588 encoded: &[u8],
589 config: &PipelineConfig,
590 block_offsets: &[u64],
591 sample_offset: u64,
592 sample_count: u64,
593 native_byte_order: bool,
594) -> Result<Vec<u8>, PipelineError> {
595 if matches!(config.filter, FilterType::Shuffle { .. }) {
596 return Err(PipelineError::Shuffle(
597 "partial range decode is not supported with shuffle filter".to_string(),
598 ));
599 }
600
601 let (byte_start, byte_size, bit_offset_in_chunk) = match &config.encoding {
603 EncodingType::SimplePacking(params) => {
604 let bit_start = sample_offset * params.bits_per_value as u64;
605 let bit_count = sample_count * params.bits_per_value as u64;
606 let bs = (bit_start / 8) as usize;
607 let be = (bit_start + bit_count).div_ceil(8) as usize;
608 (bs, be - bs, Some((bit_start % 8) as usize))
609 }
610 EncodingType::None => {
611 let elem_size = config.dtype_byte_width;
612 let bs = (sample_offset as usize)
613 .checked_mul(elem_size)
614 .ok_or_else(|| PipelineError::Range("byte offset overflow".to_string()))?;
615 let sz = (sample_count as usize)
616 .checked_mul(elem_size)
617 .ok_or_else(|| PipelineError::Range("byte count overflow".to_string()))?;
618 (bs, sz, None)
619 }
620 };
621
622 let decompressed = match build_compressor(&config.compression, config)? {
624 None => {
625 let byte_end = byte_start
627 .checked_add(byte_size)
628 .ok_or_else(|| PipelineError::Range("byte end overflow".to_string()))?;
629 if byte_end > encoded.len() {
630 return Err(PipelineError::Range(format!(
631 "range ({sample_offset}, {sample_count}) exceeds payload size"
632 )));
633 }
634 encoded[byte_start..byte_end].to_vec()
635 }
636 Some(compressor) => {
637 compressor.decompress_range(encoded, block_offsets, byte_start, byte_size)?
638 }
639 };
640
641 let target_byte_order = if native_byte_order {
642 ByteOrder::native()
643 } else {
644 config.byte_order
645 };
646
647 match &config.encoding {
649 EncodingType::None => {
650 let mut result = decompressed;
651 if native_byte_order && config.byte_order != ByteOrder::native() {
652 byteswap(&mut result, config.swap_unit_size)?;
653 }
654 Ok(result)
655 }
656 EncodingType::SimplePacking(params) => {
657 let values = simple_packing::decode_range(
658 &decompressed,
659 bit_offset_in_chunk.unwrap_or(0),
660 sample_count as usize,
661 params,
662 )?;
663 Ok(f64_to_bytes(&values, target_byte_order))
664 }
665 }
666}
667
668fn estimate_decompressed_size(config: &PipelineConfig) -> usize {
669 match &config.encoding {
670 EncodingType::None => config.num_values.saturating_mul(config.dtype_byte_width),
671 EncodingType::SimplePacking(params) => {
672 let total_bits =
673 (config.num_values as u128).saturating_mul(params.bits_per_value as u128);
674 total_bits.div_ceil(8).min(usize::MAX as u128) as usize
675 }
676 }
677}
678
679fn bytes_to_f64(data: &[u8], byte_order: ByteOrder) -> Vec<f64> {
680 data.chunks_exact(8)
681 .map(|chunk| {
682 let mut arr = [0u8; 8];
683 arr.copy_from_slice(chunk);
684 match byte_order {
685 ByteOrder::Big => f64::from_be_bytes(arr),
686 ByteOrder::Little => f64::from_le_bytes(arr),
687 }
688 })
689 .collect()
690}
691
692fn f64_to_bytes(values: &[f64], byte_order: ByteOrder) -> Vec<u8> {
693 values
694 .iter()
695 .flat_map(|v| match byte_order {
696 ByteOrder::Big => v.to_be_bytes(),
697 ByteOrder::Little => v.to_le_bytes(),
698 })
699 .collect()
700}
701
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a pass-through configuration (no encoding, filter or
    /// compression) with the default backend and `intra_codec_threads = 0`.
    /// Tests override individual stages with struct-update syntax.
    fn raw_config(
        num_values: usize,
        byte_order: ByteOrder,
        dtype_byte_width: usize,
        swap_unit_size: usize,
    ) -> PipelineConfig {
        PipelineConfig {
            encoding: EncodingType::None,
            filter: FilterType::None,
            compression: CompressionType::None,
            num_values,
            byte_order,
            dtype_byte_width,
            swap_unit_size,
            compression_backend: CompressionBackend::default(),
            intra_codec_threads: 0,
        }
    }

    // A pass-through pipeline must leave the bytes untouched both ways.
    #[test]
    fn test_passthrough_pipeline() {
        let data = vec![1u8, 2, 3, 4, 5, 6, 7, 8];
        let config = raw_config(1, ByteOrder::Little, 8, 8);

        let result = encode_pipeline(&data, &config).unwrap();
        assert_eq!(result.encoded_bytes, data);

        let decoded = decode_pipeline(&result.encoded_bytes, &config, false).unwrap();
        assert_eq!(decoded, data);
    }

    // Simple packing is lossy; the round trip must stay within tolerance.
    #[test]
    fn test_simple_packing_pipeline() {
        let values: Vec<f64> = (0..50).map(|i| 200.0 + i as f64 * 0.1).collect();
        let data: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();
        let params = simple_packing::compute_params(&values, 16, 0).unwrap();

        let config = PipelineConfig {
            encoding: EncodingType::SimplePacking(params),
            ..raw_config(values.len(), ByteOrder::Little, 8, 8)
        };

        let result = encode_pipeline(&data, &config).unwrap();
        let decoded = decode_pipeline(&result.encoded_bytes, &config, false).unwrap();
        let decoded_values = bytes_to_f64(&decoded, ByteOrder::Little);

        for (orig, dec) in values.iter().zip(decoded_values.iter()) {
            assert!((orig - dec).abs() < 0.01, "orig={orig}, dec={dec}");
        }
    }

    // Shuffling must change the byte layout and be exactly reversible.
    #[test]
    fn test_shuffle_pipeline() {
        let data: Vec<u8> = (0..16).collect();
        let config = PipelineConfig {
            filter: FilterType::Shuffle { element_size: 4 },
            ..raw_config(4, ByteOrder::Little, 4, 4)
        };

        let result = encode_pipeline(&data, &config).unwrap();
        assert_ne!(result.encoded_bytes, data);

        let decoded = decode_pipeline(&result.encoded_bytes, &config, false).unwrap();
        assert_eq!(decoded, data);
    }

    // Szip must round-trip losslessly and report block offsets.
    #[cfg(any(feature = "szip", feature = "szip-pure"))]
    #[test]
    fn test_szip_round_trip_pipeline() {
        let data: Vec<u8> = (0..2048).map(|i| (i % 256) as u8).collect();

        let preprocess_flag = 8u32;

        let config = PipelineConfig {
            compression: CompressionType::Szip {
                rsi: 128,
                block_size: 16,
                flags: preprocess_flag,
                bits_per_sample: 8,
            },
            ..raw_config(2048, ByteOrder::Little, 1, 1)
        };

        let result = encode_pipeline(&data, &config).unwrap();
        assert!(result.block_offsets.is_some());

        let decoded = decode_pipeline(&result.encoded_bytes, &config, false).unwrap();
        assert_eq!(decoded, data);
    }

    // Unit sizes 0 and 1 must leave the data alone.
    #[test]
    fn test_byteswap_noop_for_single_byte() {
        let mut data = vec![1, 2, 3, 4];
        let original = data.clone();
        byteswap(&mut data, 1).unwrap();
        assert_eq!(data, original);
        byteswap(&mut data, 0).unwrap();
        assert_eq!(data, original);
    }

    #[test]
    fn test_byteswap_2_bytes() {
        let mut data = vec![0xAA, 0xBB, 0xCC, 0xDD];
        byteswap(&mut data, 2).unwrap();
        assert_eq!(data, vec![0xBB, 0xAA, 0xDD, 0xCC]);
    }

    #[test]
    fn test_byteswap_4_bytes() {
        let mut data = vec![1, 2, 3, 4, 5, 6, 7, 8];
        byteswap(&mut data, 4).unwrap();
        assert_eq!(data, vec![4, 3, 2, 1, 8, 7, 6, 5]);
    }

    #[test]
    fn test_byteswap_8_bytes() {
        let mut data: Vec<u8> = (1..=16).collect();
        byteswap(&mut data, 8).unwrap();
        assert_eq!(
            data,
            vec![8, 7, 6, 5, 4, 3, 2, 1, 16, 15, 14, 13, 12, 11, 10, 9]
        );
    }

    // Swapping twice must restore the original bytes.
    #[test]
    fn test_byteswap_round_trip() {
        let original = vec![1u8, 2, 3, 4, 5, 6, 7, 8];
        let mut data = original.clone();
        byteswap(&mut data, 4).unwrap();
        assert_ne!(data, original);
        byteswap(&mut data, 4).unwrap();
        assert_eq!(data, original);
    }

    // A length that is not a multiple of the unit size is an error.
    #[test]
    fn test_byteswap_misaligned_returns_error() {
        let mut data = vec![1, 2, 3, 4, 5];
        let result = byteswap(&mut data, 4);
        assert!(result.is_err());
    }

    // Big-endian payload, no encoding: native decode swaps, wire decode does not.
    #[test]
    fn test_decode_native_byte_order_encoding_none() {
        let value: f32 = 42.0;
        let be_bytes = value.to_be_bytes();
        let config = raw_config(1, ByteOrder::Big, 4, 4);

        let result = encode_pipeline(&be_bytes, &config).unwrap();

        let native_decoded = decode_pipeline(&result.encoded_bytes, &config, true).unwrap();
        let ne_value = f32::from_ne_bytes(native_decoded[..4].try_into().unwrap());
        assert_eq!(ne_value, value);

        let wire_decoded = decode_pipeline(&result.encoded_bytes, &config, false).unwrap();
        let be_value = f32::from_be_bytes(wire_decoded[..4].try_into().unwrap());
        assert_eq!(be_value, value);
    }

    // Simple packing must honour the requested output byte order directly.
    #[test]
    fn test_decode_native_byte_order_simple_packing() {
        let values: Vec<f64> = vec![100.0, 200.0, 300.0, 400.0];
        let data: Vec<u8> = values.iter().flat_map(|v| v.to_be_bytes()).collect();
        let params = simple_packing::compute_params(&values, 24, 0).unwrap();

        let config = PipelineConfig {
            encoding: EncodingType::SimplePacking(params),
            ..raw_config(values.len(), ByteOrder::Big, 8, 8)
        };

        let result = encode_pipeline(&data, &config).unwrap();

        let native_decoded = decode_pipeline(&result.encoded_bytes, &config, true).unwrap();
        let decoded_values: Vec<f64> = native_decoded
            .chunks_exact(8)
            .map(|c| f64::from_ne_bytes(c.try_into().unwrap()))
            .collect();
        for (orig, dec) in values.iter().zip(decoded_values.iter()) {
            assert!((orig - dec).abs() < 1.0, "orig={orig}, dec={dec}");
        }

        let wire_decoded = decode_pipeline(&result.encoded_bytes, &config, false).unwrap();
        let wire_values: Vec<f64> = wire_decoded
            .chunks_exact(8)
            .map(|c| f64::from_be_bytes(c.try_into().unwrap()))
            .collect();
        for (orig, dec) in values.iter().zip(wire_values.iter()) {
            assert!((orig - dec).abs() < 1.0, "orig={orig}, dec={dec}");
        }
    }

    // When the wire order already is the native order, both decodes agree.
    #[test]
    fn test_native_byte_order_same_as_wire_is_noop() {
        let values: Vec<f32> = vec![1.0, 2.0, 3.0, 4.0];
        let data: Vec<u8> = values.iter().flat_map(|v| v.to_ne_bytes()).collect();

        let config = raw_config(values.len(), ByteOrder::native(), 4, 4);

        let result = encode_pipeline(&data, &config).unwrap();
        let native_decoded = decode_pipeline(&result.encoded_bytes, &config, true).unwrap();
        let wire_decoded = decode_pipeline(&result.encoded_bytes, &config, false).unwrap();
        assert_eq!(native_decoded, wire_decoded);
    }

    #[test]
    fn test_decode_native_byte_order_2byte_dtype() {
        let value: u16 = 0x0102;
        let be_bytes = value.to_be_bytes();
        let config = raw_config(1, ByteOrder::Big, 2, 2);

        let result = encode_pipeline(&be_bytes, &config).unwrap();
        let native = decode_pipeline(&result.encoded_bytes, &config, true).unwrap();
        assert_eq!(u16::from_ne_bytes(native[..2].try_into().unwrap()), value);
    }

    #[test]
    fn test_decode_native_byte_order_8byte_dtype() {
        let value: f64 = std::f64::consts::E;
        let be_bytes = value.to_be_bytes();
        let config = raw_config(1, ByteOrder::Big, 8, 8);

        let result = encode_pipeline(&be_bytes, &config).unwrap();
        let native = decode_pipeline(&result.encoded_bytes, &config, true).unwrap();
        assert_eq!(f64::from_ne_bytes(native[..8].try_into().unwrap()), value);
    }

    // complex64: one 8-byte element made of two f32 halves that are swapped
    // independently, hence swap_unit_size = 4 with dtype_byte_width = 8.
    #[test]
    fn test_decode_native_byte_order_complex64() {
        let real: f32 = 1.5;
        let imag: f32 = 2.5;
        let mut be_bytes = Vec::new();
        be_bytes.extend_from_slice(&real.to_be_bytes());
        be_bytes.extend_from_slice(&imag.to_be_bytes());
        let config = raw_config(1, ByteOrder::Big, 8, 4);

        let result = encode_pipeline(&be_bytes, &config).unwrap();
        let native = decode_pipeline(&result.encoded_bytes, &config, true).unwrap();
        let decoded_real = f32::from_ne_bytes(native[0..4].try_into().unwrap());
        let decoded_imag = f32::from_ne_bytes(native[4..8].try_into().unwrap());
        assert_eq!(decoded_real, real);
        assert_eq!(decoded_imag, imag);
    }

    // Single-byte elements have no byte order, so native decode is a no-op.
    #[test]
    fn test_decode_native_byte_order_uint8_noop() {
        let data = vec![1u8, 2, 3, 4, 5];
        let config = raw_config(5, ByteOrder::Big, 1, 1);

        let result = encode_pipeline(&data, &config).unwrap();
        let native = decode_pipeline(&result.encoded_bytes, &config, true).unwrap();
        assert_eq!(native, data);
    }
}