Skip to main content

tensogram_encodings/
pipeline.rs

1// (C) Copyright 2026- ECMWF and individual contributors.
2//
3// This software is licensed under the terms of the Apache Licence Version 2.0
4// which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
5// In applying this licence, ECMWF does not waive the privileges and immunities
6// granted to it by virtue of its status as an intergovernmental organisation nor
7// does it submit to any jurisdiction.
8
9use std::borrow::Cow;
10
11#[cfg(feature = "blosc2")]
12use crate::compression::Blosc2Compressor;
13#[cfg(feature = "lz4")]
14use crate::compression::Lz4Compressor;
15#[cfg(feature = "sz3")]
16use crate::compression::Sz3Compressor;
17#[cfg(feature = "szip")]
18use crate::compression::SzipCompressor;
19#[cfg(feature = "szip-pure")]
20use crate::compression::SzipPureCompressor;
21#[cfg(feature = "zfp")]
22use crate::compression::ZfpCompressor;
23#[cfg(feature = "zstd")]
24use crate::compression::ZstdCompressor;
25#[cfg(feature = "zstd-pure")]
26use crate::compression::ZstdPureCompressor;
27use crate::compression::{CompressResult, CompressionError, Compressor};
28use crate::shuffle;
29use crate::simple_packing::{self, PackingError, SimplePackingParams};
30use serde::{Deserialize, Serialize};
31use std::sync::OnceLock;
32use thiserror::Error;
33
34#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
35#[serde(rename_all = "lowercase")]
36pub enum ByteOrder {
37    Big,
38    Little,
39}
40
41impl ByteOrder {
42    /// Returns the byte order of the platform this code was compiled for.
43    #[inline]
44    pub fn native() -> Self {
45        #[cfg(target_endian = "little")]
46        {
47            ByteOrder::Little
48        }
49        #[cfg(target_endian = "big")]
50        {
51            ByteOrder::Big
52        }
53    }
54}
55
56/// Reverse bytes within each `unit_size`-byte chunk of `data`, converting
57/// between big-endian and little-endian representations in place.
58///
59/// No-op when `unit_size <= 1` (single-byte types have no byte order).
60/// Returns an error if `data.len()` is not a multiple of `unit_size`.
61/// For complex types, pass the scalar component size (e.g. 4 for complex64)
62/// so that each float32 component is swapped independently.
63pub fn byteswap(data: &mut [u8], unit_size: usize) -> Result<(), PipelineError> {
64    if unit_size <= 1 {
65        return Ok(());
66    }
67    if !data.len().is_multiple_of(unit_size) {
68        return Err(PipelineError::Range(format!(
69            "byteswap: data length {} is not a multiple of unit_size {}",
70            data.len(),
71            unit_size,
72        )));
73    }
74    for chunk in data.chunks_exact_mut(unit_size) {
75        chunk.reverse();
76    }
77    Ok(())
78}
79
80#[derive(Debug, Error)]
81pub enum PipelineError {
82    #[error("encoding error: {0}")]
83    Encoding(#[from] PackingError),
84    #[error("compression error: {0}")]
85    Compression(#[from] CompressionError),
86    #[error("shuffle error: {0}")]
87    Shuffle(String),
88    #[error("range error: {0}")]
89    Range(String),
90    #[error("unknown encoding: {0}")]
91    UnknownEncoding(String),
92    #[error("unknown filter: {0}")]
93    UnknownFilter(String),
94    #[error("unknown compression: {0}")]
95    UnknownCompression(String),
96}
97
98#[derive(Debug, Clone)]
99pub enum EncodingType {
100    None,
101    SimplePacking(SimplePackingParams),
102}
103
104impl std::fmt::Display for EncodingType {
105    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
106        match self {
107            EncodingType::None => write!(f, "none"),
108            EncodingType::SimplePacking(_) => write!(f, "simple_packing"),
109        }
110    }
111}
112
/// Byte-level filter: step 2 of the forward pipeline.
///
/// Note that partial range decode rejects any non-`None` filter.
#[derive(Debug, Clone)]
pub enum FilterType {
    /// No filtering.
    None,
    /// Byte shuffle over elements that are `element_size` bytes wide.
    Shuffle { element_size: usize },
}
118
/// Inner codec used by the blosc2 compressor.
///
/// Serialised with serde as the lowercase variant name (e.g. `"lz4hc"`).
#[cfg(feature = "blosc2")]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum Blosc2Codec {
    /// BloscLZ.
    Blosclz,
    /// LZ4.
    Lz4,
    /// LZ4 high-compression mode.
    Lz4hc,
    /// zlib.
    Zlib,
    /// Zstandard.
    Zstd,
}
129
/// Operating mode forwarded to the ZFP compressor.
///
/// Only fixed-rate mode supports partial range decode.
#[cfg(feature = "zfp")]
#[derive(Debug, Clone)]
pub enum ZfpMode {
    /// Fixed-rate mode with the given `rate`.
    FixedRate { rate: f64 },
    /// Fixed-precision mode with the given `precision`.
    FixedPrecision { precision: u32 },
    /// Fixed-accuracy mode with the given absolute `tolerance`.
    FixedAccuracy { tolerance: f64 },
}
137
/// Error bound forwarded to the SZ3 lossy compressor.
#[cfg(feature = "sz3")]
#[derive(Debug, Clone)]
pub enum Sz3ErrorBound {
    /// Absolute error bound.
    Absolute(f64),
    /// Relative error bound.
    Relative(f64),
    /// Peak signal-to-noise-ratio bound.
    Psnr(f64),
}
145
/// Compression codec: step 3 of the forward pipeline, undone first on decode.
///
/// Each variant other than `None` only exists when its cargo feature is enabled.
#[derive(Debug, Clone)]
pub enum CompressionType {
    /// No compression; bytes are passed through.
    None,
    /// szip / CCSDS via libaec (`szip`) or tensogram-szip (`szip-pure`).
    ///
    /// `AEC_DATA_MSB` (4) is OR-ed into `flags` automatically when the
    /// encoding is simple_packing.
    #[cfg(any(feature = "szip", feature = "szip-pure"))]
    Szip {
        rsi: u32,
        block_size: u32,
        flags: u32,
        bits_per_sample: u32,
    },
    /// Zstandard at the given `level`, via libzstd (`zstd`) or ruzstd
    /// (`zstd-pure`).
    #[cfg(any(feature = "zstd", feature = "zstd-pure"))]
    Zstd { level: i32 },
    /// LZ4 (no parameters).
    #[cfg(feature = "lz4")]
    Lz4,
    /// Blosc2 with an inner `codec`, compression level `clevel` and element
    /// `typesize`.
    #[cfg(feature = "blosc2")]
    Blosc2 {
        codec: Blosc2Codec,
        clevel: i32,
        typesize: usize,
    },
    /// ZFP in the given `mode`.  Value count and byte order come from the
    /// pipeline configuration.
    #[cfg(feature = "zfp")]
    Zfp { mode: ZfpMode },
    /// SZ3 with the given `error_bound`.  Value count and byte order come from
    /// the pipeline configuration.
    #[cfg(feature = "sz3")]
    Sz3 { error_bound: Sz3ErrorBound },
}
177
/// Chooses between the FFI and pure-Rust implementations of a codec (szip or
/// zstd) when both are compiled in.
///
/// If only one of the two features is enabled, this choice is ignored and the
/// available implementation is used.
///
/// The default comes from [`default_compression_backend`]: it is read once from
/// the `TENSOGRAM_COMPRESSION_BACKEND` environment variable and is always
/// `Pure` on `wasm32`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompressionBackend {
    /// C FFI implementation (libaec for szip, libzstd for zstd).
    /// Falls back to pure-Rust if the FFI feature is not compiled in.
    Ffi,
    /// Pure-Rust implementation (tensogram-szip, ruzstd).
    /// Falls back to FFI if the pure feature is not compiled in.
    Pure,
}

impl Default for CompressionBackend {
    /// Same as [`default_compression_backend`].
    fn default() -> Self {
        default_compression_backend()
    }
}

/// Process-wide default compression backend, computed once and cached.
///
/// - On `wasm32` it is always `Pure`, because FFI backends cannot exist there.
///   The environment variable is not consulted on that target.
/// - Otherwise, if `TENSOGRAM_COMPRESSION_BACKEND` is set (to valid Unicode),
///   it is parsed case-insensitively: `pure` or `rust` select `Pure`, and any
///   other value selects `Ffi`.
/// - Otherwise the default is `Ffi` (faster, battle-tested).
pub fn default_compression_backend() -> CompressionBackend {
    static DEFAULT: OnceLock<CompressionBackend> = OnceLock::new();
    *DEFAULT.get_or_init(resolve_default_backend)
}

/// Uncached computation behind [`default_compression_backend`].
fn resolve_default_backend() -> CompressionBackend {
    if cfg!(target_arch = "wasm32") {
        CompressionBackend::Pure
    } else {
        std::env::var("TENSOGRAM_COMPRESSION_BACKEND")
            .map(|raw| parse_backend(&raw))
            .unwrap_or(CompressionBackend::Ffi)
    }
}

/// Parse a backend name: `pure` / `rust` (case-insensitive, surrounding
/// whitespace ignored) give `Pure`, and everything else gives `Ffi`.
fn parse_backend(val: &str) -> CompressionBackend {
    let normalized = val.trim().to_ascii_lowercase();
    if matches!(normalized.as_str(), "pure" | "rust") {
        CompressionBackend::Pure
    } else {
        CompressionBackend::Ffi
    }
}
230
231#[derive(Debug, Clone)]
232pub struct PipelineConfig {
233    pub encoding: EncodingType,
234    pub filter: FilterType,
235    pub compression: CompressionType,
236    pub num_values: usize,
237    pub byte_order: ByteOrder,
238    pub dtype_byte_width: usize,
239    /// The size of the fundamental scalar component for byte-order swapping.
240    /// Equal to `dtype_byte_width` for simple types.  For complex types the
241    /// swap must operate on each float component independently, so this is
242    /// half of `dtype_byte_width` (e.g. 4 for complex64, 8 for complex128).
243    pub swap_unit_size: usize,
244    /// Which backend to use for szip / zstd when both are compiled in.
245    pub compression_backend: CompressionBackend,
246    /// Intra-codec thread budget for this object.
247    ///
248    /// - `0` — sequential (default; preserves pre-threads behaviour).
249    /// - `N ≥ 1` — codec may use up to `N` threads internally.  Only a
250    ///   subset of codecs honour this (currently: blosc2, zstd FFI,
251    ///   simple_packing, shuffle); others ignore it.  Output bytes are
252    ///   byte-identical regardless of `N`.
253    ///
254    /// Callers should expect this to be set by the `tensogram-core`
255    /// layer after consulting
256    /// [`EncodeOptions.threads`](../../../tensogram_core/encode/struct.EncodeOptions.html#structfield.threads)
257    /// and the small-message threshold; direct pipeline callers may
258    /// leave it at `0`.
259    pub intra_codec_threads: u32,
260}
261
262pub struct PipelineResult {
263    pub encoded_bytes: Vec<u8>,
264    /// Block offsets produced by compressors that support random access (szip, blosc2).
265    pub block_offsets: Option<Vec<u64>>,
266}
267
/// Build an szip compressor, choosing between FFI and pure-Rust at runtime.
///
/// `backend` only matters when both `szip` and `szip-pure` are compiled in.
/// Otherwise the single available implementation is returned.
#[cfg(any(feature = "szip", feature = "szip-pure"))]
fn build_szip_compressor(
    #[allow(unused_variables)] backend: CompressionBackend,
    rsi: u32,
    block_size: u32,
    flags: u32,
    bits_per_sample: u32,
) -> Box<dyn Compressor> {
    // A real choice exists only with both features: honour a Pure request.
    #[cfg(all(feature = "szip", feature = "szip-pure"))]
    {
        if backend == CompressionBackend::Pure {
            return Box::new(SzipPureCompressor {
                rsi,
                block_size,
                flags,
                bits_per_sample,
            });
        }
    }

    // libaec (FFI) whenever it is compiled in: either it was requested, or it
    // is the only implementation available.
    #[cfg(feature = "szip")]
    let compressor: Box<dyn Compressor> = Box::new(SzipCompressor {
        rsi,
        block_size,
        flags,
        bits_per_sample,
    });

    // Pure-Rust only build: tensogram-szip is the sole option.
    #[cfg(all(feature = "szip-pure", not(feature = "szip")))]
    let compressor: Box<dyn Compressor> = Box::new(SzipPureCompressor {
        rsi,
        block_size,
        flags,
        bits_per_sample,
    });

    compressor
}
312
/// Build a zstd compressor, choosing between FFI and pure-Rust at runtime.
///
/// `nb_workers` is forwarded to the FFI path (libzstd's `NbWorkers`
/// parameter) and ignored by the pure-Rust path (ruzstd is single-threaded).
/// `backend` only matters when both `zstd` and `zstd-pure` are compiled in.
#[cfg(any(feature = "zstd", feature = "zstd-pure"))]
fn build_zstd_compressor(
    #[allow(unused_variables)] backend: CompressionBackend,
    level: i32,
    #[allow(unused_variables)] nb_workers: u32,
) -> Box<dyn Compressor> {
    // A real choice exists only with both features: honour a Pure request.
    #[cfg(all(feature = "zstd", feature = "zstd-pure"))]
    {
        if backend == CompressionBackend::Pure {
            return Box::new(ZstdPureCompressor { level });
        }
    }

    // libzstd (FFI) whenever it is compiled in.
    #[cfg(feature = "zstd")]
    let compressor: Box<dyn Compressor> = Box::new(ZstdCompressor { level, nb_workers });

    // Pure-Rust only build: ruzstd is the sole option.
    #[cfg(all(feature = "zstd-pure", not(feature = "zstd")))]
    let compressor: Box<dyn Compressor> = Box::new(ZstdPureCompressor { level });

    compressor
}
339
340/// Build a boxed compressor from a CompressionType variant.
341///
342/// For szip and zstd, the `compression_backend` field in `PipelineConfig`
343/// selects between FFI and pure-Rust implementations at **runtime**.  When
344/// only one feature is compiled in, the backend field is ignored.
345fn build_compressor(
346    compression: &CompressionType,
347    #[allow(unused_variables)] config: &PipelineConfig,
348) -> Result<Option<Box<dyn Compressor>>, CompressionError> {
349    match compression {
350        CompressionType::None => Ok(None),
351        #[cfg(any(feature = "szip", feature = "szip-pure"))]
352        CompressionType::Szip {
353            rsi,
354            block_size,
355            flags,
356            bits_per_sample,
357        } => {
358            let mut szip_flags = *flags;
359            // simple_packing output is MSB-first; tell the szip codec so its
360            // predictor sees bytes in the correct significance order.
361            // AEC_DATA_MSB = 4 in both libaec-sys and tensogram-szip.
362            if matches!(config.encoding, EncodingType::SimplePacking(_)) {
363                szip_flags |= 4; // AEC_DATA_MSB
364            }
365
366            // Runtime backend selection.  The helper builds the right
367            // Box<dyn Compressor> based on what features are compiled in
368            // and what the caller requested.
369            let compressor: Box<dyn Compressor> = build_szip_compressor(
370                config.compression_backend,
371                *rsi,
372                *block_size,
373                szip_flags,
374                *bits_per_sample,
375            );
376            Ok(Some(compressor))
377        }
378        #[cfg(any(feature = "zstd", feature = "zstd-pure"))]
379        CompressionType::Zstd { level } => {
380            let compressor: Box<dyn Compressor> = build_zstd_compressor(
381                config.compression_backend,
382                *level,
383                config.intra_codec_threads,
384            );
385            Ok(Some(compressor))
386        }
387        #[cfg(feature = "lz4")]
388        CompressionType::Lz4 => Ok(Some(Box::new(Lz4Compressor))),
389        #[cfg(feature = "blosc2")]
390        CompressionType::Blosc2 {
391            codec,
392            clevel,
393            typesize,
394        } => Ok(Some(Box::new(Blosc2Compressor {
395            codec: *codec,
396            clevel: *clevel,
397            typesize: *typesize,
398            nthreads: config.intra_codec_threads,
399        }))),
400        #[cfg(feature = "zfp")]
401        CompressionType::Zfp { mode } => Ok(Some(Box::new(ZfpCompressor {
402            mode: mode.clone(),
403            num_values: config.num_values,
404            byte_order: config.byte_order,
405        }))),
406        #[cfg(feature = "sz3")]
407        CompressionType::Sz3 { error_bound } => Ok(Some(Box::new(Sz3Compressor {
408            error_bound: error_bound.clone(),
409            num_values: config.num_values,
410            byte_order: config.byte_order,
411        }))),
412    }
413}
414
415/// Full forward pipeline: encode → filter → compress
416#[tracing::instrument(skip(data, config), fields(data_len = data.len(), encoding = %config.encoding))]
417pub fn encode_pipeline(
418    data: &[u8],
419    config: &PipelineConfig,
420) -> Result<PipelineResult, PipelineError> {
421    // Step 1: Encoding — Cow avoids cloning when encoding is None
422    let encoded: Cow<'_, [u8]> = match &config.encoding {
423        EncodingType::None => Cow::Borrowed(data),
424        EncodingType::SimplePacking(params) => {
425            let values = bytes_to_f64(data, config.byte_order);
426            Cow::Owned(simple_packing::encode_with_threads(
427                &values,
428                params,
429                config.intra_codec_threads,
430            )?)
431        }
432    };
433
434    // Step 2: Filter
435    let filtered: Cow<'_, [u8]> = match &config.filter {
436        FilterType::None => encoded,
437        FilterType::Shuffle { element_size } => Cow::Owned(
438            shuffle::shuffle_with_threads(&encoded, *element_size, config.intra_codec_threads)
439                .map_err(|e| PipelineError::Shuffle(e.to_string()))?,
440        ),
441    };
442
443    // Step 3: Compression
444    let compressor = build_compressor(&config.compression, config)?;
445    match compressor {
446        None => Ok(PipelineResult {
447            encoded_bytes: filtered.into_owned(),
448            block_offsets: None,
449        }),
450        Some(compressor) => {
451            let CompressResult {
452                data: compressed,
453                block_offsets,
454            } = compressor.compress(&filtered)?;
455            Ok(PipelineResult {
456                encoded_bytes: compressed,
457                block_offsets,
458            })
459        }
460    }
461}
462
463/// Encode from f64 values directly, avoiding the bytes→f64 conversion overhead
464/// that `encode_pipeline` pays when the caller already has typed values.
465#[tracing::instrument(skip(values, config), fields(num_values = values.len(), encoding = %config.encoding))]
466pub fn encode_pipeline_f64(
467    values: &[f64],
468    config: &PipelineConfig,
469) -> Result<PipelineResult, PipelineError> {
470    let encoded: Cow<'_, [u8]> = match &config.encoding {
471        EncodingType::None => Cow::Owned(f64_to_bytes(values, config.byte_order)),
472        EncodingType::SimplePacking(params) => Cow::Owned(simple_packing::encode_with_threads(
473            values,
474            params,
475            config.intra_codec_threads,
476        )?),
477    };
478
479    let filtered: Cow<'_, [u8]> = match &config.filter {
480        FilterType::None => encoded,
481        FilterType::Shuffle { element_size } => Cow::Owned(
482            shuffle::shuffle_with_threads(&encoded, *element_size, config.intra_codec_threads)
483                .map_err(|e| PipelineError::Shuffle(e.to_string()))?,
484        ),
485    };
486
487    let compressor = build_compressor(&config.compression, config)?;
488    match compressor {
489        None => Ok(PipelineResult {
490            encoded_bytes: filtered.into_owned(),
491            block_offsets: None,
492        }),
493        Some(compressor) => {
494            let CompressResult {
495                data: compressed,
496                block_offsets,
497            } = compressor.compress(&filtered)?;
498            Ok(PipelineResult {
499                encoded_bytes: compressed,
500                block_offsets,
501            })
502        }
503    }
504}
505
506/// Full reverse pipeline: decompress → unshuffle → decode → native byteswap.
507///
508/// When `native_byte_order` is true (the default at the API level), the
509/// output bytes are converted to the caller's native byte order so that a
510/// simple `reinterpret_cast` or `from_ne_bytes` produces correct values.
511/// When false, bytes are returned in the message's declared wire byte order.
512#[tracing::instrument(skip(encoded, config), fields(encoded_len = encoded.len()))]
513pub fn decode_pipeline(
514    encoded: &[u8],
515    config: &PipelineConfig,
516    native_byte_order: bool,
517) -> Result<Vec<u8>, PipelineError> {
518    // Step 1: Decompress — Cow avoids cloning when no compression
519    let decompressed: Cow<'_, [u8]> = match build_compressor(&config.compression, config)? {
520        None => Cow::Borrowed(encoded),
521        Some(compressor) => {
522            let expected_size = estimate_decompressed_size(config);
523            Cow::Owned(compressor.decompress(encoded, expected_size)?)
524        }
525    };
526
527    // Step 2: Unshuffle
528    let unfiltered: Cow<'_, [u8]> = match &config.filter {
529        FilterType::None => decompressed,
530        FilterType::Shuffle { element_size } => Cow::Owned(
531            shuffle::unshuffle_with_threads(
532                &decompressed,
533                *element_size,
534                config.intra_codec_threads,
535            )
536            .map_err(|e| PipelineError::Shuffle(e.to_string()))?,
537        ),
538    };
539
540    // Determine the target byte order for the output.  When the caller
541    // requests native byte order, simple_packing can write directly in
542    // native (avoiding a redundant write + swap).
543    let target_byte_order = if native_byte_order {
544        ByteOrder::native()
545    } else {
546        config.byte_order
547    };
548
549    // Step 3: Decode encoding
550    let mut decoded = match &config.encoding {
551        EncodingType::None => unfiltered.into_owned(),
552        EncodingType::SimplePacking(params) => {
553            // simple_packing decodes to Vec<f64> in-register values (no byte
554            // order) then serialises directly to the target byte order.
555            let values = simple_packing::decode_with_threads(
556                &unfiltered,
557                config.num_values,
558                params,
559                config.intra_codec_threads,
560            )?;
561            f64_to_bytes(&values, target_byte_order)
562        }
563    };
564
565    // Step 4: Native-endian byteswap for encoding=none.
566    // (simple_packing already wrote in target_byte_order above.)
567    if native_byte_order
568        && matches!(config.encoding, EncodingType::None)
569        && config.byte_order != ByteOrder::native()
570    {
571        byteswap(&mut decoded, config.swap_unit_size)?;
572    }
573
574    Ok(decoded)
575}
576
577/// Decode a partial sample range from a compressed+encoded pipeline.
578///
579/// Supports compressors with random access (szip, blosc2, zfp fixed-rate).
580/// Shuffle filter is not supported with range decode.
581///
582/// `sample_offset` and `sample_count` are in logical element units.
583/// `block_offsets` are block boundary offsets from encoding (compressor-specific).
584///
585/// When `native_byte_order` is true, the output bytes are in the caller's
586/// native byte order.
587pub fn decode_range_pipeline(
588    encoded: &[u8],
589    config: &PipelineConfig,
590    block_offsets: &[u64],
591    sample_offset: u64,
592    sample_count: u64,
593    native_byte_order: bool,
594) -> Result<Vec<u8>, PipelineError> {
595    if matches!(config.filter, FilterType::Shuffle { .. }) {
596        return Err(PipelineError::Shuffle(
597            "partial range decode is not supported with shuffle filter".to_string(),
598        ));
599    }
600
601    // Phase 1: Compute byte range needed from the (possibly compressed) stream
602    let (byte_start, byte_size, bit_offset_in_chunk) = match &config.encoding {
603        EncodingType::SimplePacking(params) => {
604            let bit_start = sample_offset * params.bits_per_value as u64;
605            let bit_count = sample_count * params.bits_per_value as u64;
606            let bs = (bit_start / 8) as usize;
607            let be = (bit_start + bit_count).div_ceil(8) as usize;
608            (bs, be - bs, Some((bit_start % 8) as usize))
609        }
610        EncodingType::None => {
611            let elem_size = config.dtype_byte_width;
612            let bs = (sample_offset as usize)
613                .checked_mul(elem_size)
614                .ok_or_else(|| PipelineError::Range("byte offset overflow".to_string()))?;
615            let sz = (sample_count as usize)
616                .checked_mul(elem_size)
617                .ok_or_else(|| PipelineError::Range("byte count overflow".to_string()))?;
618            (bs, sz, None)
619        }
620    };
621
622    // Phase 2: Get decompressed bytes for the range
623    let decompressed = match build_compressor(&config.compression, config)? {
624        None => {
625            // No compression: slice directly from encoded buffer
626            let byte_end = byte_start
627                .checked_add(byte_size)
628                .ok_or_else(|| PipelineError::Range("byte end overflow".to_string()))?;
629            if byte_end > encoded.len() {
630                return Err(PipelineError::Range(format!(
631                    "range ({sample_offset}, {sample_count}) exceeds payload size"
632                )));
633            }
634            encoded[byte_start..byte_end].to_vec()
635        }
636        Some(compressor) => {
637            compressor.decompress_range(encoded, block_offsets, byte_start, byte_size)?
638        }
639    };
640
641    let target_byte_order = if native_byte_order {
642        ByteOrder::native()
643    } else {
644        config.byte_order
645    };
646
647    // Phase 3: Decode encoding from decompressed bytes
648    match &config.encoding {
649        EncodingType::None => {
650            let mut result = decompressed;
651            if native_byte_order && config.byte_order != ByteOrder::native() {
652                byteswap(&mut result, config.swap_unit_size)?;
653            }
654            Ok(result)
655        }
656        EncodingType::SimplePacking(params) => {
657            let values = simple_packing::decode_range(
658                &decompressed,
659                bit_offset_in_chunk.unwrap_or(0),
660                sample_count as usize,
661                params,
662            )?;
663            Ok(f64_to_bytes(&values, target_byte_order))
664        }
665    }
666}
667
668fn estimate_decompressed_size(config: &PipelineConfig) -> usize {
669    match &config.encoding {
670        EncodingType::None => config.num_values.saturating_mul(config.dtype_byte_width),
671        EncodingType::SimplePacking(params) => {
672            let total_bits =
673                (config.num_values as u128).saturating_mul(params.bits_per_value as u128);
674            total_bits.div_ceil(8).min(usize::MAX as u128) as usize
675        }
676    }
677}
678
679fn bytes_to_f64(data: &[u8], byte_order: ByteOrder) -> Vec<f64> {
680    data.chunks_exact(8)
681        .map(|chunk| {
682            let mut arr = [0u8; 8];
683            arr.copy_from_slice(chunk);
684            match byte_order {
685                ByteOrder::Big => f64::from_be_bytes(arr),
686                ByteOrder::Little => f64::from_le_bytes(arr),
687            }
688        })
689        .collect()
690}
691
692fn f64_to_bytes(values: &[f64], byte_order: ByteOrder) -> Vec<u8> {
693    values
694        .iter()
695        .flat_map(|v| match byte_order {
696            ByteOrder::Big => v.to_be_bytes(),
697            ByteOrder::Little => v.to_le_bytes(),
698        })
699        .collect()
700}
701
702#[cfg(test)]
703mod tests {
704    use super::*;
705
706    #[test]
707    fn test_passthrough_pipeline() {
708        let data = vec![1u8, 2, 3, 4, 5, 6, 7, 8];
709        let config = PipelineConfig {
710            encoding: EncodingType::None,
711            filter: FilterType::None,
712            compression: CompressionType::None,
713            num_values: 1,
714            byte_order: ByteOrder::Little,
715            dtype_byte_width: 8,
716            swap_unit_size: 8,
717            compression_backend: CompressionBackend::default(),
718            intra_codec_threads: 0,
719        };
720        let result = encode_pipeline(&data, &config).unwrap();
721        assert_eq!(result.encoded_bytes, data);
722        let decoded = decode_pipeline(&result.encoded_bytes, &config, false).unwrap();
723        assert_eq!(decoded, data);
724    }
725
726    #[test]
727    fn test_simple_packing_pipeline() {
728        let values: Vec<f64> = (0..50).map(|i| 200.0 + i as f64 * 0.1).collect();
729        let data: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();
730        let params = simple_packing::compute_params(&values, 16, 0).unwrap();
731
732        let config = PipelineConfig {
733            encoding: EncodingType::SimplePacking(params),
734            filter: FilterType::None,
735            compression: CompressionType::None,
736            num_values: values.len(),
737            byte_order: ByteOrder::Little,
738            dtype_byte_width: 8,
739            swap_unit_size: 8,
740            compression_backend: CompressionBackend::default(),
741            intra_codec_threads: 0,
742        };
743
744        let result = encode_pipeline(&data, &config).unwrap();
745        let decoded = decode_pipeline(&result.encoded_bytes, &config, false).unwrap();
746        let decoded_values = bytes_to_f64(&decoded, ByteOrder::Little);
747
748        for (orig, dec) in values.iter().zip(decoded_values.iter()) {
749            assert!((orig - dec).abs() < 0.01, "orig={orig}, dec={dec}");
750        }
751    }
752
753    #[test]
754    fn test_shuffle_pipeline() {
755        let data: Vec<u8> = (0..16).collect();
756        let config = PipelineConfig {
757            encoding: EncodingType::None,
758            filter: FilterType::Shuffle { element_size: 4 },
759            compression: CompressionType::None,
760            num_values: 4,
761            byte_order: ByteOrder::Little,
762            dtype_byte_width: 4,
763            swap_unit_size: 4,
764            compression_backend: CompressionBackend::default(),
765            intra_codec_threads: 0,
766        };
767
768        let result = encode_pipeline(&data, &config).unwrap();
769        assert_ne!(result.encoded_bytes, data); // shuffled should differ
770        let decoded = decode_pipeline(&result.encoded_bytes, &config, false).unwrap();
771        assert_eq!(decoded, data);
772    }
773
774    #[cfg(any(feature = "szip", feature = "szip-pure"))]
775    #[test]
776    fn test_szip_round_trip_pipeline() {
777        let data: Vec<u8> = (0..2048).map(|i| (i % 256) as u8).collect();
778
779        // AEC_DATA_PREPROCESS = 8 in both libaec-sys and tensogram-szip
780        let preprocess_flag = 8u32;
781
782        let config = PipelineConfig {
783            encoding: EncodingType::None,
784            filter: FilterType::None,
785            compression: CompressionType::Szip {
786                rsi: 128,
787                block_size: 16,
788                flags: preprocess_flag,
789                bits_per_sample: 8,
790            },
791            num_values: 2048,
792            byte_order: ByteOrder::Little,
793            dtype_byte_width: 1,
794            swap_unit_size: 1,
795            compression_backend: CompressionBackend::default(),
796            intra_codec_threads: 0,
797        };
798
799        let result = encode_pipeline(&data, &config).unwrap();
800        assert!(result.block_offsets.is_some());
801
802        let decoded = decode_pipeline(&result.encoded_bytes, &config, false).unwrap();
803        assert_eq!(decoded, data);
804    }
805
806    // -----------------------------------------------------------------------
807    // byteswap utility tests
808    // -----------------------------------------------------------------------
809
810    #[test]
811    fn test_byteswap_noop_for_single_byte() {
812        let mut data = vec![1, 2, 3, 4];
813        let original = data.clone();
814        byteswap(&mut data, 1).unwrap();
815        assert_eq!(data, original);
816        byteswap(&mut data, 0).unwrap();
817        assert_eq!(data, original);
818    }
819
820    #[test]
821    fn test_byteswap_2_bytes() {
822        let mut data = vec![0xAA, 0xBB, 0xCC, 0xDD];
823        byteswap(&mut data, 2).unwrap();
824        assert_eq!(data, vec![0xBB, 0xAA, 0xDD, 0xCC]);
825    }
826
827    #[test]
828    fn test_byteswap_4_bytes() {
829        let mut data = vec![1, 2, 3, 4, 5, 6, 7, 8];
830        byteswap(&mut data, 4).unwrap();
831        assert_eq!(data, vec![4, 3, 2, 1, 8, 7, 6, 5]);
832    }
833
834    #[test]
835    fn test_byteswap_8_bytes() {
836        let mut data: Vec<u8> = (1..=16).collect();
837        byteswap(&mut data, 8).unwrap();
838        assert_eq!(
839            data,
840            vec![8, 7, 6, 5, 4, 3, 2, 1, 16, 15, 14, 13, 12, 11, 10, 9]
841        );
842    }
843
844    #[test]
845    fn test_byteswap_round_trip() {
846        let original = vec![1u8, 2, 3, 4, 5, 6, 7, 8];
847        let mut data = original.clone();
848        byteswap(&mut data, 4).unwrap();
849        assert_ne!(data, original);
850        byteswap(&mut data, 4).unwrap();
851        assert_eq!(data, original);
852    }
853
854    #[test]
855    fn test_byteswap_misaligned_returns_error() {
856        let mut data = vec![1, 2, 3, 4, 5]; // 5 bytes, not a multiple of 4
857        let result = byteswap(&mut data, 4);
858        assert!(result.is_err());
859    }
860
861    // -----------------------------------------------------------------------
862    // Native byte-order decode tests
863    // -----------------------------------------------------------------------
864
865    #[test]
866    fn test_decode_native_byte_order_encoding_none() {
867        // Encode as big-endian float32 on a (likely) little-endian machine.
868        let value: f32 = 42.0;
869        let be_bytes = value.to_be_bytes();
870        let config = PipelineConfig {
871            encoding: EncodingType::None,
872            filter: FilterType::None,
873            compression: CompressionType::None,
874            num_values: 1,
875            byte_order: ByteOrder::Big,
876            dtype_byte_width: 4,
877            swap_unit_size: 4,
878            compression_backend: CompressionBackend::default(),
879            intra_codec_threads: 0,
880        };
881
882        let result = encode_pipeline(&be_bytes, &config).unwrap();
883
884        // Decode with native_byte_order=true: should get native-endian bytes.
885        let native_decoded = decode_pipeline(&result.encoded_bytes, &config, true).unwrap();
886        let ne_value = f32::from_ne_bytes(native_decoded[..4].try_into().unwrap());
887        assert_eq!(ne_value, value);
888
889        // Decode with native_byte_order=false: should get big-endian bytes.
890        let wire_decoded = decode_pipeline(&result.encoded_bytes, &config, false).unwrap();
891        let be_value = f32::from_be_bytes(wire_decoded[..4].try_into().unwrap());
892        assert_eq!(be_value, value);
893    }
894
895    #[test]
896    fn test_decode_native_byte_order_simple_packing() {
897        let values: Vec<f64> = vec![100.0, 200.0, 300.0, 400.0];
898        // Encode with big-endian byte order.
899        let data: Vec<u8> = values.iter().flat_map(|v| v.to_be_bytes()).collect();
900        let params = simple_packing::compute_params(&values, 24, 0).unwrap();
901
902        let config = PipelineConfig {
903            encoding: EncodingType::SimplePacking(params),
904            filter: FilterType::None,
905            compression: CompressionType::None,
906            num_values: values.len(),
907            byte_order: ByteOrder::Big,
908            dtype_byte_width: 8,
909            swap_unit_size: 8,
910            compression_backend: CompressionBackend::default(),
911            intra_codec_threads: 0,
912        };
913
914        let result = encode_pipeline(&data, &config).unwrap();
915
916        // Decode with native_byte_order=true: result should be native f64.
917        let native_decoded = decode_pipeline(&result.encoded_bytes, &config, true).unwrap();
918        let decoded_values: Vec<f64> = native_decoded
919            .chunks_exact(8)
920            .map(|c| f64::from_ne_bytes(c.try_into().unwrap()))
921            .collect();
922        for (orig, dec) in values.iter().zip(decoded_values.iter()) {
923            assert!((orig - dec).abs() < 1.0, "orig={orig}, dec={dec}");
924        }
925
926        // Decode with native_byte_order=false: result should be big-endian f64.
927        let wire_decoded = decode_pipeline(&result.encoded_bytes, &config, false).unwrap();
928        let wire_values: Vec<f64> = wire_decoded
929            .chunks_exact(8)
930            .map(|c| f64::from_be_bytes(c.try_into().unwrap()))
931            .collect();
932        for (orig, dec) in values.iter().zip(wire_values.iter()) {
933            assert!((orig - dec).abs() < 1.0, "orig={orig}, dec={dec}");
934        }
935    }
936
937    #[test]
938    fn test_native_byte_order_same_as_wire_is_noop() {
939        // When wire byte order == native, native_byte_order=true/false should
940        // produce identical output (no swap needed either way).
941        let values: Vec<f32> = vec![1.0, 2.0, 3.0, 4.0];
942        let data: Vec<u8> = values.iter().flat_map(|v| v.to_ne_bytes()).collect();
943
944        let config = PipelineConfig {
945            encoding: EncodingType::None,
946            filter: FilterType::None,
947            compression: CompressionType::None,
948            num_values: values.len(),
949            byte_order: ByteOrder::native(),
950            dtype_byte_width: 4,
951            swap_unit_size: 4,
952            compression_backend: CompressionBackend::default(),
953            intra_codec_threads: 0,
954        };
955
956        let result = encode_pipeline(&data, &config).unwrap();
957        let native_decoded = decode_pipeline(&result.encoded_bytes, &config, true).unwrap();
958        let wire_decoded = decode_pipeline(&result.encoded_bytes, &config, false).unwrap();
959        assert_eq!(native_decoded, wire_decoded);
960    }
961
962    #[test]
963    fn test_decode_native_byte_order_2byte_dtype() {
964        // int16 / uint16 / float16 — 2-byte swap unit.
965        let value: u16 = 0x0102;
966        let be_bytes = value.to_be_bytes();
967        let config = PipelineConfig {
968            encoding: EncodingType::None,
969            filter: FilterType::None,
970            compression: CompressionType::None,
971            num_values: 1,
972            byte_order: ByteOrder::Big,
973            dtype_byte_width: 2,
974            swap_unit_size: 2,
975            compression_backend: CompressionBackend::default(),
976            intra_codec_threads: 0,
977        };
978        let result = encode_pipeline(&be_bytes, &config).unwrap();
979        let native = decode_pipeline(&result.encoded_bytes, &config, true).unwrap();
980        assert_eq!(u16::from_ne_bytes(native[..2].try_into().unwrap()), value);
981    }
982
983    #[test]
984    fn test_decode_native_byte_order_8byte_dtype() {
985        // float64 / int64 / uint64 — 8-byte swap unit.
986        let value: f64 = std::f64::consts::E;
987        let be_bytes = value.to_be_bytes();
988        let config = PipelineConfig {
989            encoding: EncodingType::None,
990            filter: FilterType::None,
991            compression: CompressionType::None,
992            num_values: 1,
993            byte_order: ByteOrder::Big,
994            dtype_byte_width: 8,
995            swap_unit_size: 8,
996            compression_backend: CompressionBackend::default(),
997            intra_codec_threads: 0,
998        };
999        let result = encode_pipeline(&be_bytes, &config).unwrap();
1000        let native = decode_pipeline(&result.encoded_bytes, &config, true).unwrap();
1001        assert_eq!(f64::from_ne_bytes(native[..8].try_into().unwrap()), value);
1002    }
1003
1004    #[test]
1005    fn test_decode_native_byte_order_complex64() {
1006        // complex64 = two float32 — swap_unit_size=4, dtype_byte_width=8.
1007        // Each 4-byte component must be swapped independently.
1008        let real: f32 = 1.5;
1009        let imag: f32 = 2.5;
1010        let mut be_bytes = Vec::new();
1011        be_bytes.extend_from_slice(&real.to_be_bytes());
1012        be_bytes.extend_from_slice(&imag.to_be_bytes());
1013        let config = PipelineConfig {
1014            encoding: EncodingType::None,
1015            filter: FilterType::None,
1016            compression: CompressionType::None,
1017            num_values: 1,
1018            byte_order: ByteOrder::Big,
1019            dtype_byte_width: 8,
1020            swap_unit_size: 4, // complex64: swap each float32 component
1021            compression_backend: CompressionBackend::default(),
1022            intra_codec_threads: 0,
1023        };
1024        let result = encode_pipeline(&be_bytes, &config).unwrap();
1025        let native = decode_pipeline(&result.encoded_bytes, &config, true).unwrap();
1026        let decoded_real = f32::from_ne_bytes(native[0..4].try_into().unwrap());
1027        let decoded_imag = f32::from_ne_bytes(native[4..8].try_into().unwrap());
1028        assert_eq!(decoded_real, real);
1029        assert_eq!(decoded_imag, imag);
1030    }
1031
1032    #[test]
1033    fn test_decode_native_byte_order_uint8_noop() {
1034        // uint8 / int8 — swap_unit_size=1, byteswap should be a no-op.
1035        let data = vec![1u8, 2, 3, 4, 5];
1036        let config = PipelineConfig {
1037            encoding: EncodingType::None,
1038            filter: FilterType::None,
1039            compression: CompressionType::None,
1040            num_values: 5,
1041            byte_order: ByteOrder::Big, // cross-endian, but 1-byte → no-op
1042            dtype_byte_width: 1,
1043            swap_unit_size: 1,
1044            compression_backend: CompressionBackend::default(),
1045            intra_codec_threads: 0,
1046        };
1047        let result = encode_pipeline(&data, &config).unwrap();
1048        let native = decode_pipeline(&result.encoded_bytes, &config, true).unwrap();
1049        assert_eq!(native, data); // no swap for single-byte types
1050    }
1051}