// tensogram_encodings/pipeline.rs
1// (C) Copyright 2026- ECMWF and individual contributors.
2//
3// This software is licensed under the terms of the Apache Licence Version 2.0
4// which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
5// In applying this licence, ECMWF does not waive the privileges and immunities
6// granted to it by virtue of its status as an intergovernmental organisation nor
7// does it submit to any jurisdiction.
8
9use std::borrow::Cow;
10
11#[cfg(feature = "blosc2")]
12use crate::compression::Blosc2Compressor;
13#[cfg(feature = "lz4")]
14use crate::compression::Lz4Compressor;
15#[cfg(feature = "sz3")]
16use crate::compression::Sz3Compressor;
17#[cfg(feature = "szip")]
18use crate::compression::SzipCompressor;
19#[cfg(feature = "szip-pure")]
20use crate::compression::SzipPureCompressor;
21#[cfg(feature = "zfp")]
22use crate::compression::ZfpCompressor;
23#[cfg(feature = "zstd")]
24use crate::compression::ZstdCompressor;
25#[cfg(feature = "zstd-pure")]
26use crate::compression::ZstdPureCompressor;
27use crate::compression::{CompressResult, CompressionError, Compressor};
28use crate::shuffle;
29use crate::simple_packing::{self, PackingError, SimplePackingParams};
30use serde::{Deserialize, Serialize};
31use std::sync::OnceLock;
32use thiserror::Error;
33
/// Wire-level byte order (endianness) of multi-byte values.
///
/// Serialises as lowercase `"big"` / `"little"` via serde.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ByteOrder {
    /// Most significant byte first (network order).
    Big,
    /// Least significant byte first.
    Little,
}
40
41impl ByteOrder {
42    /// Returns the byte order of the platform this code was compiled for.
43    #[inline]
44    pub fn native() -> Self {
45        #[cfg(target_endian = "little")]
46        {
47            ByteOrder::Little
48        }
49        #[cfg(target_endian = "big")]
50        {
51            ByteOrder::Big
52        }
53    }
54}
55
56/// Reverse bytes within each `unit_size`-byte chunk of `data`, converting
57/// between big-endian and little-endian representations in place.
58///
59/// No-op when `unit_size <= 1` (single-byte types have no byte order).
60/// Returns an error if `data.len()` is not a multiple of `unit_size`.
61/// For complex types, pass the scalar component size (e.g. 4 for complex64)
62/// so that each float32 component is swapped independently.
63pub fn byteswap(data: &mut [u8], unit_size: usize) -> Result<(), PipelineError> {
64    if unit_size <= 1 {
65        return Ok(());
66    }
67    if !data.len().is_multiple_of(unit_size) {
68        return Err(PipelineError::Range(format!(
69            "byteswap: data length {} is not a multiple of unit_size {}",
70            data.len(),
71            unit_size,
72        )));
73    }
74    for chunk in data.chunks_exact_mut(unit_size) {
75        chunk.reverse();
76    }
77    Ok(())
78}
79
/// Unified error type for every pipeline entry point.
///
/// Stage-specific errors (packing, compression) convert automatically via
/// `#[from]` so `?` propagates across stage boundaries; the remaining
/// variants carry descriptive string payloads.
#[derive(Debug, Error)]
pub enum PipelineError {
    /// Simple-packing encode/decode failed.
    #[error("encoding error: {0}")]
    Encoding(#[from] PackingError),
    /// A compressor backend reported a failure.
    #[error("compression error: {0}")]
    Compression(#[from] CompressionError),
    /// Shuffle/unshuffle failed, or shuffle was requested in a context
    /// that does not support it (e.g. partial range decode).
    #[error("shuffle error: {0}")]
    Shuffle(String),
    /// A requested byte/sample range is out of bounds or overflows.
    #[error("range error: {0}")]
    Range(String),
    /// An encoding name was not recognised (constructed by callers
    /// resolving textual configuration outside this module).
    #[error("unknown encoding: {0}")]
    UnknownEncoding(String),
    /// A filter name was not recognised.
    #[error("unknown filter: {0}")]
    UnknownFilter(String),
    /// A compression name was not recognised.
    #[error("unknown compression: {0}")]
    UnknownCompression(String),
}
97
/// First pipeline stage: how raw values are encoded into bytes.
#[derive(Debug, Clone)]
pub enum EncodingType {
    /// Pass bytes through unchanged.
    None,
    /// Fixed-width bit packing of f64 values, driven by the given
    /// parameters (see the `simple_packing` module).
    SimplePacking(SimplePackingParams),
}
103
104impl std::fmt::Display for EncodingType {
105    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
106        match self {
107            EncodingType::None => write!(f, "none"),
108            EncodingType::SimplePacking(_) => write!(f, "simple_packing"),
109        }
110    }
111}
112
/// Second pipeline stage: optional byte-level filter applied between
/// encoding and compression.
#[derive(Debug, Clone)]
pub enum FilterType {
    /// No filtering.
    None,
    /// Byte shuffle across `element_size`-byte elements (see the
    /// `shuffle` module), typically improving downstream compressibility.
    Shuffle { element_size: usize },
}
118
/// Inner codec selection for the blosc2 meta-compressor.
///
/// Serialises as lowercase names via serde.
#[cfg(feature = "blosc2")]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum Blosc2Codec {
    Blosclz,
    Lz4,
    Lz4hc,
    Zlib,
    Zstd,
}
129
/// Compression mode for the zfp lossy floating-point compressor.
#[cfg(feature = "zfp")]
#[derive(Debug, Clone)]
pub enum ZfpMode {
    /// Fixed output rate in bits per value.
    FixedRate { rate: f64 },
    /// Fixed number of retained bit planes per value.
    FixedPrecision { precision: u32 },
    /// Absolute error tolerance.
    FixedAccuracy { tolerance: f64 },
}
137
/// Error-bound mode for the SZ3 lossy floating-point compressor.
#[cfg(feature = "sz3")]
#[derive(Debug, Clone)]
pub enum Sz3ErrorBound {
    /// Absolute error bound.
    Absolute(f64),
    /// Error bound relative to the value range.
    Relative(f64),
    /// Target peak signal-to-noise ratio (dB).
    Psnr(f64),
}
145
/// Third pipeline stage: optional compression applied to the encoded and
/// filtered byte stream.
///
/// Every variant other than `None` is compiled in only when the
/// corresponding cargo feature is enabled.
#[derive(Debug, Clone)]
pub enum CompressionType {
    /// Store bytes uncompressed.
    None,
    /// szip / AEC entropy coding.  Backend (FFI vs pure-Rust) is chosen
    /// at runtime via `PipelineConfig::compression_backend`.
    #[cfg(any(feature = "szip", feature = "szip-pure"))]
    Szip {
        rsi: u32,
        block_size: u32,
        // NOTE: `build_compressor` ORs AEC_DATA_MSB (4) into these flags
        // when combined with simple packing.
        flags: u32,
        bits_per_sample: u32,
    },
    /// zstd at the given compression level.  Backend is chosen at runtime
    /// via `PipelineConfig::compression_backend`.
    #[cfg(any(feature = "zstd", feature = "zstd-pure"))]
    Zstd {
        level: i32,
    },
    /// LZ4 fast compression (no tunables).
    #[cfg(feature = "lz4")]
    Lz4,
    /// blosc2 meta-compressor.
    #[cfg(feature = "blosc2")]
    Blosc2 {
        codec: Blosc2Codec,
        clevel: i32,
        typesize: usize,
    },
    /// zfp lossy floating-point compression.
    #[cfg(feature = "zfp")]
    Zfp {
        mode: ZfpMode,
    },
    /// SZ3 lossy floating-point compression.
    #[cfg(feature = "sz3")]
    Sz3 {
        error_bound: Sz3ErrorBound,
    },
}
177
/// Selects which backend to use when both FFI and pure-Rust implementations
/// are compiled in for the same codec (szip or zstd).
///
/// When only one feature is enabled the backend field is ignored — the
/// available implementation is always used.
///
/// The default is resolved once from the `TENSOGRAM_COMPRESSION_BACKEND`
/// environment variable (values: `ffi` or `pure`).
/// On `wasm32` the default is always `Pure`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompressionBackend {
    /// Use the C FFI implementation (libaec for szip, libzstd for zstd).
    /// Falls back to pure-Rust if the FFI feature is not compiled in.
    Ffi,
    /// Use the pure-Rust implementation (tensogram-szip, ruzstd).
    /// Falls back to FFI if the pure feature is not compiled in.
    Pure,
}

impl Default for CompressionBackend {
    /// Delegates to the process-wide cached environment resolution.
    fn default() -> Self {
        default_compression_backend()
    }
}

/// Resolve the default compression backend from environment variables.
///
/// - `TENSOGRAM_COMPRESSION_BACKEND=pure|ffi` overrides the default for
///   both szip and zstd.
/// - On `wasm32` the default is always `Pure` (FFI backends cannot exist).
/// - On native the default is `Ffi` (faster, battle-tested).
///
/// The answer is computed once per process and cached in a `OnceLock`;
/// later changes to the environment variable have no effect.
pub fn default_compression_backend() -> CompressionBackend {
    static DEFAULT: OnceLock<CompressionBackend> = OnceLock::new();
    *DEFAULT.get_or_init(resolve_default_backend)
}

/// One-shot resolution logic behind [`default_compression_backend`].
fn resolve_default_backend() -> CompressionBackend {
    if cfg!(target_arch = "wasm32") {
        // FFI backends cannot exist on wasm; the env override is ignored.
        return CompressionBackend::Pure;
    }
    match std::env::var("TENSOGRAM_COMPRESSION_BACKEND") {
        Ok(val) => parse_backend(&val),
        // Native default: prefer FFI.
        Err(_) => CompressionBackend::Ffi,
    }
}

/// Parse a backend name: case-insensitive, surrounding whitespace ignored.
/// `"pure"` / `"rust"` select the pure backend; anything else means FFI.
fn parse_backend(val: &str) -> CompressionBackend {
    let normalised = val.trim().to_ascii_lowercase();
    if matches!(normalised.as_str(), "pure" | "rust") {
        CompressionBackend::Pure
    } else {
        CompressionBackend::Ffi
    }
}
230
/// Complete description of one encode/decode pipeline: which encoding,
/// filter and compression to apply, plus the dtype and byte-order metadata
/// needed to interpret the payload.
#[derive(Debug, Clone)]
pub struct PipelineConfig {
    /// Stage 1: value encoding (none or simple packing).
    pub encoding: EncodingType,
    /// Stage 2: optional byte-shuffle filter.
    pub filter: FilterType,
    /// Stage 3: optional compression.
    pub compression: CompressionType,
    /// Number of logical elements in the payload; used for decompressed
    /// size estimation and simple-packing decode.
    pub num_values: usize,
    /// Wire byte order of the raw (encoding=none) payload bytes.
    pub byte_order: ByteOrder,
    /// Width in bytes of one logical element of the dtype.
    pub dtype_byte_width: usize,
    /// The size of the fundamental scalar component for byte-order swapping.
    /// Equal to `dtype_byte_width` for simple types.  For complex types the
    /// swap must operate on each float component independently, so this is
    /// half of `dtype_byte_width` (e.g. 4 for complex64, 8 for complex128).
    pub swap_unit_size: usize,
    /// Which backend to use for szip / zstd when both are compiled in.
    pub compression_backend: CompressionBackend,
    /// Intra-codec thread budget for this object.
    ///
    /// - `0` — sequential (default; preserves pre-threads behaviour).
    /// - `N ≥ 1` — codec may use up to `N` threads internally.  Only a
    ///   subset of codecs honour this (currently: blosc2, zstd FFI,
    ///   simple_packing, shuffle); others ignore it.  Output bytes are
    ///   byte-identical regardless of `N`.
    ///
    /// Callers should expect this to be set by the `tensogram`
    /// layer after consulting
    /// [`EncodeOptions.threads`](../../../tensogram/encode/struct.EncodeOptions.html#structfield.threads)
    /// and the small-message threshold; direct pipeline callers may
    /// leave it at `0`.
    pub intra_codec_threads: u32,
    /// Opt-in: compute xxh3-64 over the final encoded bytes inline with
    /// encoding, exposing the digest via [`PipelineResult::hash`].
    ///
    /// - `false` (default) — the pipeline does no hashing; callers that
    ///   need a hash must walk the output buffer themselves with
    ///   `xxhash_rust::xxh3::xxh3_64`.
    /// - `true` — the pipeline drives an `Xxh3Default` hasher in lockstep
    ///   with the codec output, eliminating a second pass over the
    ///   encoded buffer.  The digest is bit-identical to
    ///   `xxhash_rust::xxh3::xxh3_64(&encoded_bytes)` by construction.
    ///
    /// Hashing always runs in the calling thread *after* any internal
    /// codec parallelism has joined; it never participates in the
    /// intra-codec thread budget.  For transparent codecs the hash is
    /// byte-identical across `intra_codec_threads` values; for opaque
    /// codecs the hash tracks the codec's output bytes (which may reorder
    /// by worker-completion order).
    pub compute_hash: bool,
}
279
/// Output of the forward pipeline ([`encode_pipeline`] / [`encode_pipeline_f64`]).
pub struct PipelineResult {
    /// Final encoded (and possibly compressed) byte stream.
    pub encoded_bytes: Vec<u8>,
    /// Block offsets produced by compressors that support random access (szip, blosc2).
    pub block_offsets: Option<Vec<u64>>,
    /// xxh3-64 digest over `encoded_bytes`, produced inline with encoding
    /// when [`PipelineConfig::compute_hash`] was `true`.  `None` otherwise.
    ///
    /// The digest is bit-identical to `xxhash_rust::xxh3::xxh3_64(&encoded_bytes)`
    /// by construction (both use seed 0 and the default xxh3 secret).
    pub hash: Option<u64>,
}
291
/// Build an szip compressor, dispatching between FFI and pure-Rust at runtime.
///
/// Feature matrix (realised by the `cfg` blocks below):
/// - both `szip` and `szip-pure` compiled: honour `backend` at runtime;
/// - only `szip` compiled: always FFI (`backend` is ignored);
/// - only `szip-pure` compiled: always pure-Rust (`backend` is ignored).
///
/// Exactly one of the two trailing `cfg` blocks survives expansion as the
/// function's tail expression, so every feature combination returns a value.
#[cfg(any(feature = "szip", feature = "szip-pure"))]
fn build_szip_compressor(
    #[allow(unused_variables)] backend: CompressionBackend,
    rsi: u32,
    block_size: u32,
    flags: u32,
    bits_per_sample: u32,
) -> Box<dyn Compressor> {
    // When both features are compiled in, dispatch at runtime.
    #[cfg(all(feature = "szip", feature = "szip-pure"))]
    if matches!(backend, CompressionBackend::Pure) {
        return Box::new(SzipPureCompressor {
            rsi,
            block_size,
            flags,
            bits_per_sample,
        });
    }

    // FFI path — used when szip feature is enabled and either:
    // (a) both features compiled but backend is Ffi, or
    // (b) only szip is compiled.
    #[cfg(feature = "szip")]
    {
        Box::new(SzipCompressor {
            rsi,
            block_size,
            flags,
            bits_per_sample,
        })
    }

    // Pure-only path — used when only szip-pure is compiled (no FFI).
    #[cfg(all(feature = "szip-pure", not(feature = "szip")))]
    {
        Box::new(SzipPureCompressor {
            rsi,
            block_size,
            flags,
            bits_per_sample,
        })
    }
}
336
/// Build a zstd compressor, dispatching between FFI and pure-Rust at runtime.
///
/// `nb_workers` is forwarded to the FFI path (libzstd `NbWorkers`
/// parameter) and ignored by the pure-Rust path (ruzstd is
/// single-threaded).
///
/// The `cfg` structure mirrors [`build_szip_compressor`]: the early-return
/// handles the both-features-compiled + `Pure` case, and exactly one of the
/// two trailing blocks survives `cfg` expansion as the tail expression.
#[cfg(any(feature = "zstd", feature = "zstd-pure"))]
fn build_zstd_compressor(
    #[allow(unused_variables)] backend: CompressionBackend,
    level: i32,
    #[allow(unused_variables)] nb_workers: u32,
) -> Box<dyn Compressor> {
    // Runtime dispatch when both backends are compiled in.
    #[cfg(all(feature = "zstd", feature = "zstd-pure"))]
    if matches!(backend, CompressionBackend::Pure) {
        return Box::new(ZstdPureCompressor { level });
    }

    // FFI path: both compiled with backend == Ffi, or FFI-only build.
    #[cfg(feature = "zstd")]
    {
        Box::new(ZstdCompressor { level, nb_workers })
    }

    // Pure-only build (no FFI compiled in).
    #[cfg(all(feature = "zstd-pure", not(feature = "zstd")))]
    {
        Box::new(ZstdPureCompressor { level })
    }
}
363
/// Build a boxed compressor from a CompressionType variant.
///
/// For szip and zstd, the `compression_backend` field in `PipelineConfig`
/// selects between FFI and pure-Rust implementations at **runtime**.  When
/// only one feature is compiled in, the backend field is ignored.
///
/// Returns `Ok(None)` for `CompressionType::None` (store uncompressed).
/// `config` also supplies `intra_codec_threads` (zstd, blosc2) and
/// `num_values`/`byte_order` (zfp, sz3).
fn build_compressor(
    compression: &CompressionType,
    #[allow(unused_variables)] config: &PipelineConfig,
) -> Result<Option<Box<dyn Compressor>>, CompressionError> {
    match compression {
        CompressionType::None => Ok(None),
        #[cfg(any(feature = "szip", feature = "szip-pure"))]
        CompressionType::Szip {
            rsi,
            block_size,
            flags,
            bits_per_sample,
        } => {
            let mut szip_flags = *flags;
            // simple_packing output is MSB-first; tell the szip codec so its
            // predictor sees bytes in the correct significance order.
            // AEC_DATA_MSB = 4 in both libaec-sys and tensogram-szip.
            if matches!(config.encoding, EncodingType::SimplePacking(_)) {
                szip_flags |= 4; // AEC_DATA_MSB
            }

            // Runtime backend selection.  The helper builds the right
            // Box<dyn Compressor> based on what features are compiled in
            // and what the caller requested.
            let compressor: Box<dyn Compressor> = build_szip_compressor(
                config.compression_backend,
                *rsi,
                *block_size,
                szip_flags,
                *bits_per_sample,
            );
            Ok(Some(compressor))
        }
        #[cfg(any(feature = "zstd", feature = "zstd-pure"))]
        CompressionType::Zstd { level } => {
            // intra_codec_threads maps to libzstd's NbWorkers on the FFI
            // path; the pure-Rust path ignores it.
            let compressor: Box<dyn Compressor> = build_zstd_compressor(
                config.compression_backend,
                *level,
                config.intra_codec_threads,
            );
            Ok(Some(compressor))
        }
        #[cfg(feature = "lz4")]
        CompressionType::Lz4 => Ok(Some(Box::new(Lz4Compressor))),
        #[cfg(feature = "blosc2")]
        CompressionType::Blosc2 {
            codec,
            clevel,
            typesize,
        } => Ok(Some(Box::new(Blosc2Compressor {
            codec: *codec,
            clevel: *clevel,
            typesize: *typesize,
            nthreads: config.intra_codec_threads,
        }))),
        #[cfg(feature = "zfp")]
        CompressionType::Zfp { mode } => Ok(Some(Box::new(ZfpCompressor {
            mode: mode.clone(),
            num_values: config.num_values,
            byte_order: config.byte_order,
        }))),
        #[cfg(feature = "sz3")]
        CompressionType::Sz3 { error_bound } => Ok(Some(Box::new(Sz3Compressor {
            error_bound: error_bound.clone(),
            num_values: config.num_values,
            byte_order: config.byte_order,
        }))),
    }
}
438
439/// Copy `src` into a freshly allocated `Vec<u8>` while updating `hasher` in
440/// lockstep — one pass over the data.
441///
442/// When `hasher` is `None` this is a plain `src.to_vec()`.  When `hasher` is
443/// `Some`, the function walks `src` in 64 KiB chunks: each chunk is first
444/// fed to the hasher (bringing the bytes into L1/L2) and then appended to
445/// the destination `Vec` (still cache-hot).  This avoids the second DRAM
446/// read that `src.to_vec()` followed by `xxh3_64(&dst)` would incur on
447/// buffers larger than L3.
448///
449/// The chunk size is a power-of-two that comfortably fits inside a typical
450/// L2 cache while amortising the per-chunk call overhead of
451/// `Xxh3Default::update`.
452#[inline]
453fn copy_and_hash(src: &[u8], hasher: Option<&mut xxhash_rust::xxh3::Xxh3Default>) -> Vec<u8> {
454    match hasher {
455        None => src.to_vec(),
456        Some(h) => {
457            const CHUNK: usize = 64 * 1024;
458            let mut dst = Vec::with_capacity(src.len());
459            let mut offset = 0;
460            while offset < src.len() {
461                let end = (offset + CHUNK).min(src.len());
462                let chunk = &src[offset..end];
463                h.update(chunk);
464                dst.extend_from_slice(chunk);
465                offset = end;
466            }
467            dst
468        }
469    }
470}
471
472/// Feed `bytes` to an optional hasher.  Used at each codec exit point in
473/// `encode_pipeline` — the codec just wrote those bytes, so they are
474/// maximally cache-hot; the hasher reads them from L2/L3 rather than DRAM.
475#[inline]
476fn update_hasher(bytes: &[u8], hasher: Option<&mut xxhash_rust::xxh3::Xxh3Default>) {
477    if let Some(h) = hasher {
478        h.update(bytes);
479    }
480}
481
/// Full forward pipeline: encode → filter → compress.
///
/// When `config.compute_hash` is `true`, the xxh3-64 digest of the final
/// encoded bytes is produced inline with the codec output (no second pass
/// over the buffer) and returned via `PipelineResult.hash`.  The digest is
/// bit-identical to what `xxhash_rust::xxh3::xxh3_64(&encoded_bytes)` would
/// return, by construction — both use seed 0 and the default secret.
///
/// Hashing runs entirely in the calling thread *after* any intra-codec
/// parallelism has joined.  The hasher is never shared across threads.
///
/// # Errors
///
/// Propagates packing, shuffle and compression failures as the
/// corresponding [`PipelineError`] variants.
#[tracing::instrument(skip(data, config), fields(data_len = data.len(), encoding = %config.encoding))]
pub fn encode_pipeline(
    data: &[u8],
    config: &PipelineConfig,
) -> Result<PipelineResult, PipelineError> {
    // Hasher exists only when the caller opted in; it is fed exactly once,
    // at whichever branch below produces the final bytes.
    let mut hasher = config
        .compute_hash
        .then(xxhash_rust::xxh3::Xxh3Default::new);

    // Step 1: Encoding — Cow avoids cloning when encoding is None
    let encoded: Cow<'_, [u8]> = match &config.encoding {
        EncodingType::None => Cow::Borrowed(data),
        EncodingType::SimplePacking(params) => {
            // Interprets `data` as f64s in the wire byte order; any trailing
            // bytes beyond the last full 8 are dropped by bytes_to_f64.
            let values = bytes_to_f64(data, config.byte_order);
            Cow::Owned(simple_packing::encode_with_threads(
                &values,
                params,
                config.intra_codec_threads,
            )?)
        }
    };

    // Step 2: Filter
    let filtered: Cow<'_, [u8]> = match &config.filter {
        FilterType::None => encoded,
        FilterType::Shuffle { element_size } => Cow::Owned(
            shuffle::shuffle_with_threads(&encoded, *element_size, config.intra_codec_threads)
                .map_err(|e| PipelineError::Shuffle(e.to_string()))?,
        ),
    };

    // Step 3: Compression
    let compressor = build_compressor(&config.compression, config)?;
    let (encoded_bytes, block_offsets) = match compressor {
        None => {
            // No compression: if `filtered` is still borrowed from `data`
            // (passthrough pipeline) we fuse the copy with hashing to
            // avoid a second walk over the source.  Otherwise it is
            // already owned and we just hash the Vec in place.
            let owned = match filtered {
                Cow::Borrowed(src) => copy_and_hash(src, hasher.as_mut()),
                Cow::Owned(buf) => {
                    update_hasher(&buf, hasher.as_mut());
                    buf
                }
            };
            (owned, None)
        }
        Some(compressor) => {
            let CompressResult {
                data: compressed,
                block_offsets,
            } = compressor.compress(&filtered)?;
            // Hash the codec's output immediately while it is cache-hot.
            update_hasher(&compressed, hasher.as_mut());
            (compressed, block_offsets)
        }
    };

    Ok(PipelineResult {
        encoded_bytes,
        block_offsets,
        hash: hasher.map(|h| h.digest()),
    })
}
556
/// Encode from f64 values directly, avoiding the bytes→f64 conversion overhead
/// that `encode_pipeline` pays when the caller already has typed values.
///
/// Hash handling is identical to [`encode_pipeline`] — see that function's
/// documentation for the `compute_hash` contract.
///
/// # Errors
///
/// Propagates packing, shuffle and compression failures as the
/// corresponding [`PipelineError`] variants.
#[tracing::instrument(skip(values, config), fields(num_values = values.len(), encoding = %config.encoding))]
pub fn encode_pipeline_f64(
    values: &[f64],
    config: &PipelineConfig,
) -> Result<PipelineResult, PipelineError> {
    let mut hasher = config
        .compute_hash
        .then(xxhash_rust::xxh3::Xxh3Default::new);

    // Step 1: Encoding.  Both arms serialise/pack into an owned buffer —
    // there is no borrowed passthrough in the f64 entry point.
    let encoded: Cow<'_, [u8]> = match &config.encoding {
        EncodingType::None => Cow::Owned(f64_to_bytes(values, config.byte_order)),
        EncodingType::SimplePacking(params) => Cow::Owned(simple_packing::encode_with_threads(
            values,
            params,
            config.intra_codec_threads,
        )?),
    };

    // Step 2: Filter.
    let filtered: Cow<'_, [u8]> = match &config.filter {
        FilterType::None => encoded,
        FilterType::Shuffle { element_size } => Cow::Owned(
            shuffle::shuffle_with_threads(&encoded, *element_size, config.intra_codec_threads)
                .map_err(|e| PipelineError::Shuffle(e.to_string()))?,
        ),
    };

    // Step 3: Compression.
    let compressor = build_compressor(&config.compression, config)?;
    let (encoded_bytes, block_offsets) = match compressor {
        None => {
            // `encoded` is always owned in this function (both branches of
            // the match above construct a Cow::Owned), so `into_owned` is
            // a zero-cost unwrap.
            let owned = filtered.into_owned();
            update_hasher(&owned, hasher.as_mut());
            (owned, None)
        }
        Some(compressor) => {
            let CompressResult {
                data: compressed,
                block_offsets,
            } = compressor.compress(&filtered)?;
            update_hasher(&compressed, hasher.as_mut());
            (compressed, block_offsets)
        }
    };

    Ok(PipelineResult {
        encoded_bytes,
        block_offsets,
        hash: hasher.map(|h| h.digest()),
    })
}
614
/// Full reverse pipeline: decompress → unshuffle → decode → native byteswap.
///
/// When `native_byte_order` is true (the default at the API level), the
/// output bytes are converted to the caller's native byte order so that a
/// simple `reinterpret_cast` or `from_ne_bytes` produces correct values.
/// When false, bytes are returned in the message's declared wire byte order.
///
/// # Errors
///
/// Propagates decompression, unshuffle and unpacking failures as the
/// corresponding [`PipelineError`] variants.
#[tracing::instrument(skip(encoded, config), fields(encoded_len = encoded.len()))]
pub fn decode_pipeline(
    encoded: &[u8],
    config: &PipelineConfig,
    native_byte_order: bool,
) -> Result<Vec<u8>, PipelineError> {
    // Step 1: Decompress — Cow avoids cloning when no compression
    let decompressed: Cow<'_, [u8]> = match build_compressor(&config.compression, config)? {
        None => Cow::Borrowed(encoded),
        Some(compressor) => {
            // Size hint lets the decompressor pre-size its output buffer.
            let expected_size = estimate_decompressed_size(config);
            Cow::Owned(compressor.decompress(encoded, expected_size)?)
        }
    };

    // Step 2: Unshuffle
    let unfiltered: Cow<'_, [u8]> = match &config.filter {
        FilterType::None => decompressed,
        FilterType::Shuffle { element_size } => Cow::Owned(
            shuffle::unshuffle_with_threads(
                &decompressed,
                *element_size,
                config.intra_codec_threads,
            )
            .map_err(|e| PipelineError::Shuffle(e.to_string()))?,
        ),
    };

    // Determine the target byte order for the output.  When the caller
    // requests native byte order, simple_packing can write directly in
    // native (avoiding a redundant write + swap).
    let target_byte_order = if native_byte_order {
        ByteOrder::native()
    } else {
        config.byte_order
    };

    // Step 3: Decode encoding
    let mut decoded = match &config.encoding {
        EncodingType::None => unfiltered.into_owned(),
        EncodingType::SimplePacking(params) => {
            // simple_packing decodes to Vec<f64> in-register values (no byte
            // order) then serialises directly to the target byte order.
            let values = simple_packing::decode_with_threads(
                &unfiltered,
                config.num_values,
                params,
                config.intra_codec_threads,
            )?;
            f64_to_bytes(&values, target_byte_order)
        }
    };

    // Step 4: Native-endian byteswap for encoding=none.
    // (simple_packing already wrote in target_byte_order above.)
    if native_byte_order
        && matches!(config.encoding, EncodingType::None)
        && config.byte_order != ByteOrder::native()
    {
        byteswap(&mut decoded, config.swap_unit_size)?;
    }

    Ok(decoded)
}
685
686/// Decode a partial sample range from a compressed+encoded pipeline.
687///
688/// Supports compressors with random access (szip, blosc2, zfp fixed-rate).
689/// Shuffle filter is not supported with range decode.
690///
691/// `sample_offset` and `sample_count` are in logical element units.
692/// `block_offsets` are block boundary offsets from encoding (compressor-specific).
693///
694/// When `native_byte_order` is true, the output bytes are in the caller's
695/// native byte order.
696pub fn decode_range_pipeline(
697    encoded: &[u8],
698    config: &PipelineConfig,
699    block_offsets: &[u64],
700    sample_offset: u64,
701    sample_count: u64,
702    native_byte_order: bool,
703) -> Result<Vec<u8>, PipelineError> {
704    if matches!(config.filter, FilterType::Shuffle { .. }) {
705        return Err(PipelineError::Shuffle(
706            "partial range decode is not supported with shuffle filter".to_string(),
707        ));
708    }
709
710    // Phase 1: Compute byte range needed from the (possibly compressed) stream
711    let (byte_start, byte_size, bit_offset_in_chunk) = match &config.encoding {
712        EncodingType::SimplePacking(params) => {
713            let bit_start = sample_offset * params.bits_per_value as u64;
714            let bit_count = sample_count * params.bits_per_value as u64;
715            let bs = (bit_start / 8) as usize;
716            let be = (bit_start + bit_count).div_ceil(8) as usize;
717            (bs, be - bs, Some((bit_start % 8) as usize))
718        }
719        EncodingType::None => {
720            let elem_size = config.dtype_byte_width;
721            let bs = (sample_offset as usize)
722                .checked_mul(elem_size)
723                .ok_or_else(|| PipelineError::Range("byte offset overflow".to_string()))?;
724            let sz = (sample_count as usize)
725                .checked_mul(elem_size)
726                .ok_or_else(|| PipelineError::Range("byte count overflow".to_string()))?;
727            (bs, sz, None)
728        }
729    };
730
731    // Phase 2: Get decompressed bytes for the range
732    let decompressed = match build_compressor(&config.compression, config)? {
733        None => {
734            // No compression: slice directly from encoded buffer
735            let byte_end = byte_start
736                .checked_add(byte_size)
737                .ok_or_else(|| PipelineError::Range("byte end overflow".to_string()))?;
738            if byte_end > encoded.len() {
739                return Err(PipelineError::Range(format!(
740                    "range ({sample_offset}, {sample_count}) exceeds payload size"
741                )));
742            }
743            encoded[byte_start..byte_end].to_vec()
744        }
745        Some(compressor) => {
746            compressor.decompress_range(encoded, block_offsets, byte_start, byte_size)?
747        }
748    };
749
750    let target_byte_order = if native_byte_order {
751        ByteOrder::native()
752    } else {
753        config.byte_order
754    };
755
756    // Phase 3: Decode encoding from decompressed bytes
757    match &config.encoding {
758        EncodingType::None => {
759            let mut result = decompressed;
760            if native_byte_order && config.byte_order != ByteOrder::native() {
761                byteswap(&mut result, config.swap_unit_size)?;
762            }
763            Ok(result)
764        }
765        EncodingType::SimplePacking(params) => {
766            let values = simple_packing::decode_range(
767                &decompressed,
768                bit_offset_in_chunk.unwrap_or(0),
769                sample_count as usize,
770                params,
771            )?;
772            Ok(f64_to_bytes(&values, target_byte_order))
773        }
774    }
775}
776
777fn estimate_decompressed_size(config: &PipelineConfig) -> usize {
778    match &config.encoding {
779        EncodingType::None => config.num_values.saturating_mul(config.dtype_byte_width),
780        EncodingType::SimplePacking(params) => {
781            let total_bits =
782                (config.num_values as u128).saturating_mul(params.bits_per_value as u128);
783            total_bits.div_ceil(8).min(usize::MAX as u128) as usize
784        }
785    }
786}
787
788fn bytes_to_f64(data: &[u8], byte_order: ByteOrder) -> Vec<f64> {
789    data.chunks_exact(8)
790        .map(|chunk| {
791            let mut arr = [0u8; 8];
792            arr.copy_from_slice(chunk);
793            match byte_order {
794                ByteOrder::Big => f64::from_be_bytes(arr),
795                ByteOrder::Little => f64::from_le_bytes(arr),
796            }
797        })
798        .collect()
799}
800
801fn f64_to_bytes(values: &[f64], byte_order: ByteOrder) -> Vec<u8> {
802    values
803        .iter()
804        .flat_map(|v| match byte_order {
805            ByteOrder::Big => v.to_be_bytes(),
806            ByteOrder::Little => v.to_le_bytes(),
807        })
808        .collect()
809}
810
811#[cfg(test)]
812mod tests {
813    use super::*;
814
    /// Round-trip with every stage disabled: encode must return the input
    /// bytes unchanged and decode must restore them exactly.
    #[test]
    fn test_passthrough_pipeline() {
        let data = vec![1u8, 2, 3, 4, 5, 6, 7, 8];
        let config = PipelineConfig {
            encoding: EncodingType::None,
            filter: FilterType::None,
            compression: CompressionType::None,
            num_values: 1,
            byte_order: ByteOrder::Little,
            dtype_byte_width: 8,
            swap_unit_size: 8,
            compression_backend: CompressionBackend::default(),
            intra_codec_threads: 0,
            compute_hash: false,
        };
        let result = encode_pipeline(&data, &config).unwrap();
        // Passthrough: encoded bytes are byte-identical to the input.
        assert_eq!(result.encoded_bytes, data);
        // Decode in wire byte order (native_byte_order = false).
        let decoded = decode_pipeline(&result.encoded_bytes, &config, false).unwrap();
        assert_eq!(decoded, data);
    }
835
836    #[test]
837    fn test_simple_packing_pipeline() {
838        let values: Vec<f64> = (0..50).map(|i| 200.0 + i as f64 * 0.1).collect();
839        let data: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();
840        let params = simple_packing::compute_params(&values, 16, 0).unwrap();
841
842        let config = PipelineConfig {
843            encoding: EncodingType::SimplePacking(params),
844            filter: FilterType::None,
845            compression: CompressionType::None,
846            num_values: values.len(),
847            byte_order: ByteOrder::Little,
848            dtype_byte_width: 8,
849            swap_unit_size: 8,
850            compression_backend: CompressionBackend::default(),
851            intra_codec_threads: 0,
852            compute_hash: false,
853        };
854
855        let result = encode_pipeline(&data, &config).unwrap();
856        let decoded = decode_pipeline(&result.encoded_bytes, &config, false).unwrap();
857        let decoded_values = bytes_to_f64(&decoded, ByteOrder::Little);
858
859        for (orig, dec) in values.iter().zip(decoded_values.iter()) {
860            assert!((orig - dec).abs() < 0.01, "orig={orig}, dec={dec}");
861        }
862    }
863
864    #[test]
865    fn test_shuffle_pipeline() {
866        let data: Vec<u8> = (0..16).collect();
867        let config = PipelineConfig {
868            encoding: EncodingType::None,
869            filter: FilterType::Shuffle { element_size: 4 },
870            compression: CompressionType::None,
871            num_values: 4,
872            byte_order: ByteOrder::Little,
873            dtype_byte_width: 4,
874            swap_unit_size: 4,
875            compression_backend: CompressionBackend::default(),
876            intra_codec_threads: 0,
877            compute_hash: false,
878        };
879
880        let result = encode_pipeline(&data, &config).unwrap();
881        assert_ne!(result.encoded_bytes, data); // shuffled should differ
882        let decoded = decode_pipeline(&result.encoded_bytes, &config, false).unwrap();
883        assert_eq!(decoded, data);
884    }
885
886    #[cfg(any(feature = "szip", feature = "szip-pure"))]
887    #[test]
888    fn test_szip_round_trip_pipeline() {
889        let data: Vec<u8> = (0..2048).map(|i| (i % 256) as u8).collect();
890
891        // AEC_DATA_PREPROCESS = 8 in both libaec-sys and tensogram-szip
892        let preprocess_flag = 8u32;
893
894        let config = PipelineConfig {
895            encoding: EncodingType::None,
896            filter: FilterType::None,
897            compression: CompressionType::Szip {
898                rsi: 128,
899                block_size: 16,
900                flags: preprocess_flag,
901                bits_per_sample: 8,
902            },
903            num_values: 2048,
904            byte_order: ByteOrder::Little,
905            dtype_byte_width: 1,
906            swap_unit_size: 1,
907            compression_backend: CompressionBackend::default(),
908            intra_codec_threads: 0,
909            compute_hash: false,
910        };
911
912        let result = encode_pipeline(&data, &config).unwrap();
913        assert!(result.block_offsets.is_some());
914
915        let decoded = decode_pipeline(&result.encoded_bytes, &config, false).unwrap();
916        assert_eq!(decoded, data);
917    }
918
919    // -----------------------------------------------------------------------
920    // byteswap utility tests
921    // -----------------------------------------------------------------------
922
923    #[test]
924    fn test_byteswap_noop_for_single_byte() {
925        let mut data = vec![1, 2, 3, 4];
926        let original = data.clone();
927        byteswap(&mut data, 1).unwrap();
928        assert_eq!(data, original);
929        byteswap(&mut data, 0).unwrap();
930        assert_eq!(data, original);
931    }
932
933    #[test]
934    fn test_byteswap_2_bytes() {
935        let mut data = vec![0xAA, 0xBB, 0xCC, 0xDD];
936        byteswap(&mut data, 2).unwrap();
937        assert_eq!(data, vec![0xBB, 0xAA, 0xDD, 0xCC]);
938    }
939
940    #[test]
941    fn test_byteswap_4_bytes() {
942        let mut data = vec![1, 2, 3, 4, 5, 6, 7, 8];
943        byteswap(&mut data, 4).unwrap();
944        assert_eq!(data, vec![4, 3, 2, 1, 8, 7, 6, 5]);
945    }
946
947    #[test]
948    fn test_byteswap_8_bytes() {
949        let mut data: Vec<u8> = (1..=16).collect();
950        byteswap(&mut data, 8).unwrap();
951        assert_eq!(
952            data,
953            vec![8, 7, 6, 5, 4, 3, 2, 1, 16, 15, 14, 13, 12, 11, 10, 9]
954        );
955    }
956
957    #[test]
958    fn test_byteswap_round_trip() {
959        let original = vec![1u8, 2, 3, 4, 5, 6, 7, 8];
960        let mut data = original.clone();
961        byteswap(&mut data, 4).unwrap();
962        assert_ne!(data, original);
963        byteswap(&mut data, 4).unwrap();
964        assert_eq!(data, original);
965    }
966
967    #[test]
968    fn test_byteswap_misaligned_returns_error() {
969        let mut data = vec![1, 2, 3, 4, 5]; // 5 bytes, not a multiple of 4
970        let result = byteswap(&mut data, 4);
971        assert!(result.is_err());
972    }
973
974    // -----------------------------------------------------------------------
975    // Native byte-order decode tests
976    // -----------------------------------------------------------------------
977
978    #[test]
979    fn test_decode_native_byte_order_encoding_none() {
980        // Encode as big-endian float32 on a (likely) little-endian machine.
981        let value: f32 = 42.0;
982        let be_bytes = value.to_be_bytes();
983        let config = PipelineConfig {
984            encoding: EncodingType::None,
985            filter: FilterType::None,
986            compression: CompressionType::None,
987            num_values: 1,
988            byte_order: ByteOrder::Big,
989            dtype_byte_width: 4,
990            swap_unit_size: 4,
991            compression_backend: CompressionBackend::default(),
992            intra_codec_threads: 0,
993            compute_hash: false,
994        };
995
996        let result = encode_pipeline(&be_bytes, &config).unwrap();
997
998        // Decode with native_byte_order=true: should get native-endian bytes.
999        let native_decoded = decode_pipeline(&result.encoded_bytes, &config, true).unwrap();
1000        let ne_value = f32::from_ne_bytes(native_decoded[..4].try_into().unwrap());
1001        assert_eq!(ne_value, value);
1002
1003        // Decode with native_byte_order=false: should get big-endian bytes.
1004        let wire_decoded = decode_pipeline(&result.encoded_bytes, &config, false).unwrap();
1005        let be_value = f32::from_be_bytes(wire_decoded[..4].try_into().unwrap());
1006        assert_eq!(be_value, value);
1007    }
1008
1009    #[test]
1010    fn test_decode_native_byte_order_simple_packing() {
1011        let values: Vec<f64> = vec![100.0, 200.0, 300.0, 400.0];
1012        // Encode with big-endian byte order.
1013        let data: Vec<u8> = values.iter().flat_map(|v| v.to_be_bytes()).collect();
1014        let params = simple_packing::compute_params(&values, 24, 0).unwrap();
1015
1016        let config = PipelineConfig {
1017            encoding: EncodingType::SimplePacking(params),
1018            filter: FilterType::None,
1019            compression: CompressionType::None,
1020            num_values: values.len(),
1021            byte_order: ByteOrder::Big,
1022            dtype_byte_width: 8,
1023            swap_unit_size: 8,
1024            compression_backend: CompressionBackend::default(),
1025            intra_codec_threads: 0,
1026            compute_hash: false,
1027        };
1028
1029        let result = encode_pipeline(&data, &config).unwrap();
1030
1031        // Decode with native_byte_order=true: result should be native f64.
1032        let native_decoded = decode_pipeline(&result.encoded_bytes, &config, true).unwrap();
1033        let decoded_values: Vec<f64> = native_decoded
1034            .chunks_exact(8)
1035            .map(|c| f64::from_ne_bytes(c.try_into().unwrap()))
1036            .collect();
1037        for (orig, dec) in values.iter().zip(decoded_values.iter()) {
1038            assert!((orig - dec).abs() < 1.0, "orig={orig}, dec={dec}");
1039        }
1040
1041        // Decode with native_byte_order=false: result should be big-endian f64.
1042        let wire_decoded = decode_pipeline(&result.encoded_bytes, &config, false).unwrap();
1043        let wire_values: Vec<f64> = wire_decoded
1044            .chunks_exact(8)
1045            .map(|c| f64::from_be_bytes(c.try_into().unwrap()))
1046            .collect();
1047        for (orig, dec) in values.iter().zip(wire_values.iter()) {
1048            assert!((orig - dec).abs() < 1.0, "orig={orig}, dec={dec}");
1049        }
1050    }
1051
1052    #[test]
1053    fn test_native_byte_order_same_as_wire_is_noop() {
1054        // When wire byte order == native, native_byte_order=true/false should
1055        // produce identical output (no swap needed either way).
1056        let values: Vec<f32> = vec![1.0, 2.0, 3.0, 4.0];
1057        let data: Vec<u8> = values.iter().flat_map(|v| v.to_ne_bytes()).collect();
1058
1059        let config = PipelineConfig {
1060            encoding: EncodingType::None,
1061            filter: FilterType::None,
1062            compression: CompressionType::None,
1063            num_values: values.len(),
1064            byte_order: ByteOrder::native(),
1065            dtype_byte_width: 4,
1066            swap_unit_size: 4,
1067            compression_backend: CompressionBackend::default(),
1068            intra_codec_threads: 0,
1069            compute_hash: false,
1070        };
1071
1072        let result = encode_pipeline(&data, &config).unwrap();
1073        let native_decoded = decode_pipeline(&result.encoded_bytes, &config, true).unwrap();
1074        let wire_decoded = decode_pipeline(&result.encoded_bytes, &config, false).unwrap();
1075        assert_eq!(native_decoded, wire_decoded);
1076    }
1077
1078    #[test]
1079    fn test_decode_native_byte_order_2byte_dtype() {
1080        // int16 / uint16 / float16 — 2-byte swap unit.
1081        let value: u16 = 0x0102;
1082        let be_bytes = value.to_be_bytes();
1083        let config = PipelineConfig {
1084            encoding: EncodingType::None,
1085            filter: FilterType::None,
1086            compression: CompressionType::None,
1087            num_values: 1,
1088            byte_order: ByteOrder::Big,
1089            dtype_byte_width: 2,
1090            swap_unit_size: 2,
1091            compression_backend: CompressionBackend::default(),
1092            intra_codec_threads: 0,
1093            compute_hash: false,
1094        };
1095        let result = encode_pipeline(&be_bytes, &config).unwrap();
1096        let native = decode_pipeline(&result.encoded_bytes, &config, true).unwrap();
1097        assert_eq!(u16::from_ne_bytes(native[..2].try_into().unwrap()), value);
1098    }
1099
1100    #[test]
1101    fn test_decode_native_byte_order_8byte_dtype() {
1102        // float64 / int64 / uint64 — 8-byte swap unit.
1103        let value: f64 = std::f64::consts::E;
1104        let be_bytes = value.to_be_bytes();
1105        let config = PipelineConfig {
1106            encoding: EncodingType::None,
1107            filter: FilterType::None,
1108            compression: CompressionType::None,
1109            num_values: 1,
1110            byte_order: ByteOrder::Big,
1111            dtype_byte_width: 8,
1112            swap_unit_size: 8,
1113            compression_backend: CompressionBackend::default(),
1114            intra_codec_threads: 0,
1115            compute_hash: false,
1116        };
1117        let result = encode_pipeline(&be_bytes, &config).unwrap();
1118        let native = decode_pipeline(&result.encoded_bytes, &config, true).unwrap();
1119        assert_eq!(f64::from_ne_bytes(native[..8].try_into().unwrap()), value);
1120    }
1121
1122    #[test]
1123    fn test_decode_native_byte_order_complex64() {
1124        // complex64 = two float32 — swap_unit_size=4, dtype_byte_width=8.
1125        // Each 4-byte component must be swapped independently.
1126        let real: f32 = 1.5;
1127        let imag: f32 = 2.5;
1128        let mut be_bytes = Vec::new();
1129        be_bytes.extend_from_slice(&real.to_be_bytes());
1130        be_bytes.extend_from_slice(&imag.to_be_bytes());
1131        let config = PipelineConfig {
1132            encoding: EncodingType::None,
1133            filter: FilterType::None,
1134            compression: CompressionType::None,
1135            num_values: 1,
1136            byte_order: ByteOrder::Big,
1137            dtype_byte_width: 8,
1138            swap_unit_size: 4, // complex64: swap each float32 component
1139            compression_backend: CompressionBackend::default(),
1140            intra_codec_threads: 0,
1141            compute_hash: false,
1142        };
1143        let result = encode_pipeline(&be_bytes, &config).unwrap();
1144        let native = decode_pipeline(&result.encoded_bytes, &config, true).unwrap();
1145        let decoded_real = f32::from_ne_bytes(native[0..4].try_into().unwrap());
1146        let decoded_imag = f32::from_ne_bytes(native[4..8].try_into().unwrap());
1147        assert_eq!(decoded_real, real);
1148        assert_eq!(decoded_imag, imag);
1149    }
1150
1151    #[test]
1152    fn test_decode_native_byte_order_uint8_noop() {
1153        // uint8 / int8 — swap_unit_size=1, byteswap should be a no-op.
1154        let data = vec![1u8, 2, 3, 4, 5];
1155        let config = PipelineConfig {
1156            encoding: EncodingType::None,
1157            filter: FilterType::None,
1158            compression: CompressionType::None,
1159            num_values: 5,
1160            byte_order: ByteOrder::Big, // cross-endian, but 1-byte → no-op
1161            dtype_byte_width: 1,
1162            swap_unit_size: 1,
1163            compression_backend: CompressionBackend::default(),
1164            intra_codec_threads: 0,
1165            compute_hash: false,
1166        };
1167        let result = encode_pipeline(&data, &config).unwrap();
1168        let native = decode_pipeline(&result.encoded_bytes, &config, true).unwrap();
1169        assert_eq!(native, data); // no swap for single-byte types
1170    }
1171
1172    // -----------------------------------------------------------------------
1173    // Hash-while-encoding tests — guard the invariant that
1174    // PipelineResult.hash (when compute_hash = true) is byte-equivalent to
1175    // xxh3_64(encoded_bytes) computed post-hoc.
1176    // -----------------------------------------------------------------------
1177
1178    /// Helper: minimal passthrough config with a flag for hash.
1179    fn passthrough_config(num_values: usize, compute_hash: bool) -> PipelineConfig {
1180        PipelineConfig {
1181            encoding: EncodingType::None,
1182            filter: FilterType::None,
1183            compression: CompressionType::None,
1184            num_values,
1185            byte_order: ByteOrder::Little,
1186            dtype_byte_width: 1,
1187            swap_unit_size: 1,
1188            compression_backend: CompressionBackend::default(),
1189            intra_codec_threads: 0,
1190            compute_hash,
1191        }
1192    }
1193
1194    #[test]
1195    fn streaming_and_oneshot_xxh3_agree() {
1196        // Regression guard against xxhash-rust API drift: our fused path
1197        // relies on `Xxh3Default::new().update(chunks).digest()` producing
1198        // bit-identical output to `xxh3_64(concat(chunks))`.  If this ever
1199        // diverges, the hash-while-encoding optimisation would silently
1200        // corrupt hash values.
1201        use xxhash_rust::xxh3::{Xxh3Default, xxh3_64};
1202
1203        for size in [0usize, 1, 239, 240, 1024 * 1024 + 17] {
1204            let data: Vec<u8> = (0..size).map(|i| (i * 31 + 7) as u8).collect();
1205
1206            // Full one-shot.
1207            let one_shot = xxh3_64(&data);
1208
1209            // Streaming, 64 KiB chunks (matches copy_and_hash).
1210            let mut h = Xxh3Default::new();
1211            for chunk in data.chunks(64 * 1024) {
1212                h.update(chunk);
1213            }
1214            assert_eq!(h.digest(), one_shot, "streaming vs one-shot at size {size}");
1215
1216            // Streaming, 1-byte chunks — worst case for internal buffering.
1217            let mut h = Xxh3Default::new();
1218            for chunk in data.chunks(1) {
1219                h.update(chunk);
1220            }
1221            assert_eq!(
1222                h.digest(),
1223                one_shot,
1224                "streaming 1-byte chunks vs one-shot at size {size}"
1225            );
1226        }
1227    }
1228
1229    #[test]
1230    fn pipeline_hash_none_when_disabled() {
1231        let data: Vec<u8> = (0..64).collect();
1232        let config = passthrough_config(data.len(), /* compute_hash = */ false);
1233        let result = encode_pipeline(&data, &config).unwrap();
1234        assert!(
1235            result.hash.is_none(),
1236            "compute_hash = false must leave PipelineResult.hash = None"
1237        );
1238    }
1239
1240    #[test]
1241    fn pipeline_hash_matches_post_hoc_for_passthrough() {
1242        use xxhash_rust::xxh3::xxh3_64;
1243
1244        // Exercise the sizes that hit each branch of `copy_and_hash` — below
1245        // one chunk, exactly one chunk, and multiple chunks.
1246        for size in [0usize, 1, 64 * 1024 - 1, 64 * 1024, 64 * 1024 + 1, 250_000] {
1247            let data: Vec<u8> = (0..size).map(|i| (i as u32 ^ 0xA5A5A5A5) as u8).collect();
1248            let config = passthrough_config(size, true);
1249            let result = encode_pipeline(&data, &config).unwrap();
1250            let expected = xxh3_64(&result.encoded_bytes);
1251            assert_eq!(
1252                result.hash,
1253                Some(expected),
1254                "passthrough hash-while-encoding mismatch at size {size}"
1255            );
1256            assert_eq!(
1257                result.encoded_bytes, data,
1258                "passthrough must still produce identical bytes at size {size}"
1259            );
1260        }
1261    }
1262
1263    #[test]
1264    fn pipeline_hash_matches_post_hoc_for_simple_packing() {
1265        use xxhash_rust::xxh3::xxh3_64;
1266
1267        let values: Vec<f64> = (0..10_000).map(|i| 200.0 + i as f64 * 0.1).collect();
1268        let data: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();
1269        let params = simple_packing::compute_params(&values, 16, 0).unwrap();
1270
1271        let config = PipelineConfig {
1272            encoding: EncodingType::SimplePacking(params),
1273            filter: FilterType::None,
1274            compression: CompressionType::None,
1275            num_values: values.len(),
1276            byte_order: ByteOrder::Little,
1277            dtype_byte_width: 8,
1278            swap_unit_size: 8,
1279            compression_backend: CompressionBackend::default(),
1280            intra_codec_threads: 0,
1281            compute_hash: true,
1282        };
1283        let result = encode_pipeline(&data, &config).unwrap();
1284        let expected = xxh3_64(&result.encoded_bytes);
1285        assert_eq!(result.hash, Some(expected));
1286    }
1287
1288    #[cfg(feature = "lz4")]
1289    #[test]
1290    fn pipeline_hash_matches_post_hoc_for_lz4() {
1291        use xxhash_rust::xxh3::xxh3_64;
1292
1293        let data: Vec<u8> = (0..16_000).map(|i| (i % 257) as u8).collect();
1294        let config = PipelineConfig {
1295            encoding: EncodingType::None,
1296            filter: FilterType::None,
1297            compression: CompressionType::Lz4,
1298            num_values: data.len(),
1299            byte_order: ByteOrder::Little,
1300            dtype_byte_width: 1,
1301            swap_unit_size: 1,
1302            compression_backend: CompressionBackend::default(),
1303            intra_codec_threads: 0,
1304            compute_hash: true,
1305        };
1306        let result = encode_pipeline(&data, &config).unwrap();
1307        let expected = xxh3_64(&result.encoded_bytes);
1308        assert_eq!(result.hash, Some(expected));
1309    }
1310
1311    #[test]
1312    fn pipeline_f64_hash_matches_post_hoc() {
1313        use xxhash_rust::xxh3::xxh3_64;
1314
1315        let values: Vec<f64> = (0..1_000).map(|i| (i as f64).sqrt()).collect();
1316        let config = PipelineConfig {
1317            encoding: EncodingType::None,
1318            filter: FilterType::None,
1319            compression: CompressionType::None,
1320            num_values: values.len(),
1321            byte_order: ByteOrder::Little,
1322            dtype_byte_width: 8,
1323            swap_unit_size: 8,
1324            compression_backend: CompressionBackend::default(),
1325            intra_codec_threads: 0,
1326            compute_hash: true,
1327        };
1328        let result = encode_pipeline_f64(&values, &config).unwrap();
1329        let expected = xxh3_64(&result.encoded_bytes);
1330        assert_eq!(result.hash, Some(expected));
1331    }
1332
1333    #[test]
1334    fn pipeline_hash_byte_identical_across_threads_transparent() {
1335        // Transparent codec (simple_packing): hash at threads=0 must equal
1336        // hash at threads=N for every N.  Opaque codecs are checked in the
1337        // integration suite (they allow per-run byte differences).
1338        let values: Vec<f64> = (0..50_000)
1339            .map(|i| 280.0 + (i as f64 * 0.001).sin())
1340            .collect();
1341        let data: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();
1342        let params = simple_packing::compute_params(&values, 24, 0).unwrap();
1343
1344        let mut hashes = Vec::new();
1345        for threads in [0u32, 1, 2, 4] {
1346            let config = PipelineConfig {
1347                encoding: EncodingType::SimplePacking(params.clone()),
1348                filter: FilterType::None,
1349                compression: CompressionType::None,
1350                num_values: values.len(),
1351                byte_order: ByteOrder::Little,
1352                dtype_byte_width: 8,
1353                swap_unit_size: 8,
1354                compression_backend: CompressionBackend::default(),
1355                intra_codec_threads: threads,
1356                compute_hash: true,
1357            };
1358            let result = encode_pipeline(&data, &config).unwrap();
1359            hashes.push(result.hash);
1360        }
1361        assert!(
1362            hashes.windows(2).all(|w| w[0] == w[1]),
1363            "transparent simple_packing must produce byte-identical hashes across thread counts: {hashes:?}"
1364        );
1365    }
1366}