tensogram-encodings 0.21.0

Encoding pipeline and compression codec registry for the Tensogram message format
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
// (C) Copyright 2026- ECMWF and individual contributors.
//
// This software is licensed under the terms of the Apache Licence Version 2.0
// which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
// In applying this licence, ECMWF does not waive the privileges and immunities
// granted to it by virtue of its status as an intergovernmental organisation nor
// does it submit to any jurisdiction.

use std::sync::Once;

use blosc2::chunk::SChunk;
use blosc2::{CParams, CompressAlgo, DParams};

use super::{CompressResult, CompressionError, Compressor};
use crate::pipeline::Blosc2Codec;

/// Default upper bound on the size of one SChunk chunk: 256 MiB.
///
/// C-Blosc2 refuses any single compression call whose input exceeds
/// `BLOSC2_MAX_BUFFERSIZE = INT_MAX - 32` (just under 2 GiB; see
/// `c-blosc2/include/blosc2.h`), so a larger buffer handed to one
/// `SChunk::append()` fails with `MaxBufsizeExceeded`.  To handle arrays
/// beyond that size the compressor spreads its input over several SChunk
/// chunks, each holding at most `DEFAULT_BLOSC2_CHUNK_BYTES` bytes.
///
/// The value mirrors the upper cap python-blosc2 applies in
/// `SChunk.__init__` (`python-blosc2/src/blosc2/schunk.py`).  It keeps the
/// chunk count low for multi-GiB payloads while remaining far below the
/// 2 GiB per-call limit and avoiding the per-append working-set pressure
/// a bigger chunk would cause.
pub const DEFAULT_BLOSC2_CHUNK_BYTES: usize = 256 << 20;

/// Largest input, in bytes, that C-Blosc2 accepts in one compression call.
///
/// A single `SChunk::append()` with more input than this returns
/// `MaxBufsizeExceeded`.  `c-blosc2/include/blosc2.h` defines the limit as
/// `INT_MAX - BLOSC2_MAX_OVERHEAD`, where the overhead is the 32-byte
/// extended header.
///
/// Kept crate-private: the chunking logic behind
/// `compress_with_chunk_bytes` clamps every requested chunk size to this
/// value, so the multi-append path stays legal even when a caller (or a
/// future regression) asks for something larger.
const BLOSC2_MAX_BUFFERSIZE: usize = {
    // Size of blosc2's extended header (`BLOSC2_MAX_OVERHEAD`).
    const EXTENDED_HEADER_BYTES: usize = 32;
    i32::MAX as usize - EXTENDED_HEADER_BYTES
};

/// Initialise the blosc2 C library, at most once per process.
///
/// This works around the `blosc2` crate (v0.2.2) calling `blosc2_init()`
/// from `SChunk::new()` but not from `SChunk::from_buffer()`: a process
/// that only ever decodes would otherwise reach the C library before it
/// has been initialised and fail.
fn ensure_blosc2_init() {
    static BLOSC2_INIT: Once = Once::new();

    BLOSC2_INIT.call_once(|| {
        // SAFETY: blosc2_init() has no preconditions and is safe to call multiple times.
        unsafe {
            blosc2_sys::blosc2_init();
        }
    });
}

/// Convert a `blosc2` crate error into [`CompressionError::Blosc2`],
/// using the error's `Debug` rendering as the message text.
fn map_err(e: blosc2::Error) -> CompressionError {
    let detail = format!("{e:?}");
    CompressionError::Blosc2(detail)
}

/// Translate the pipeline-level [`Blosc2Codec`] selector into the
/// matching [`CompressAlgo`] of the `blosc2` crate.
///
/// The match is exhaustive on purpose: adding a codec variant must be
/// reflected here before the crate compiles again.
pub(crate) fn codec_to_algo(codec: &Blosc2Codec) -> CompressAlgo {
    match *codec {
        Blosc2Codec::Blosclz => CompressAlgo::Blosclz,
        Blosc2Codec::Lz4 => CompressAlgo::Lz4,
        Blosc2Codec::Lz4hc => CompressAlgo::Lz4hc,
        Blosc2Codec::Zlib => CompressAlgo::Zlib,
        Blosc2Codec::Zstd => CompressAlgo::Zstd,
    }
}

/// Blosc2-backed implementation of the [`Compressor`] trait.
///
/// Input is compressed into a blosc2 SChunk (split into several chunks
/// when it is larger than [`DEFAULT_BLOSC2_CHUNK_BYTES`]) and serialised
/// to a single byte buffer; decoding reads that buffer back chunk by
/// chunk.
pub struct Blosc2Compressor {
    /// Inner codec blosc2 should use; mapped to the `blosc2` crate's
    /// `CompressAlgo` by `codec_to_algo`.
    pub codec: Blosc2Codec,
    /// Compression level forwarded to `CParams::clevel`.
    pub clevel: i32,
    /// Element size in bytes, forwarded to `CParams::typesize`.  Also the
    /// unit to which chunk boundaries are aligned when the input is split
    /// across several SChunk chunks.
    pub typesize: usize,
    /// Number of threads blosc2 may use internally (forwarded to
    /// `CParams::nthreads` / `DParams::nthreads`).  `0` means
    /// "sequential" — blosc2 is told to use exactly 1 thread, giving
    /// byte-identical output to previous releases.  Values `>= 1` are
    /// passed through verbatim.
    pub nthreads: u32,
}

/// Translate tensogram's thread-count convention into the value handed
/// to blosc2.
///
/// Tensogram uses `0` for "sequential", whereas blosc2's canonical
/// single-threaded setting is an explicit `1`, so `0` is mapped to `1`.
/// Every value `>= 1` is returned unchanged.
#[inline]
fn blosc2_nthreads(n: u32) -> usize {
    n.max(1) as usize
}

/// Derive the effective per-chunk byte size from a caller's request.
///
/// - Floored to a multiple of `typesize` so non-tail SChunk appends
///   are aligned for blosc2's shuffle / bitshuffle filters.
/// - Clamped downward to [`BLOSC2_MAX_BUFFERSIZE`] so every
///   `SChunk::append()` call stays legal regardless of how the caller
///   (or a mis-validated `typesize`) arrived here.
/// - Clamped upward to at least one `typesize` so chunks always hold
///   at least one element; in the `requested < typesize` edge case
///   the effective size therefore exceeds the request.
/// - `typesize == 0` is treated as 1 for arithmetic well-definedness
///   (blosc2 itself rejects 0 in `CParams::typesize()` earlier, this
///   is defence in depth).
///
/// Extracted from `Blosc2Compressor::compress_with_chunk_bytes` so the
/// clamp properties can be unit-tested without allocating multi-GiB
/// buffers.
#[inline]
fn effective_chunk_bytes(requested: usize, typesize: usize) -> usize {
    let ts = typesize.max(1);
    let capped = requested.min(BLOSC2_MAX_BUFFERSIZE);
    ((capped / ts).max(1) * ts).min(BLOSC2_MAX_BUFFERSIZE)
}

impl Blosc2Compressor {
    fn build_cparams(&self) -> Result<CParams, CompressionError> {
        let algo = codec_to_algo(&self.codec);
        let mut cparams = CParams::default();
        cparams
            .compressor(algo)
            .clevel(self.clevel as u32)
            .typesize(self.typesize)
            .map_err(map_err)?;
        cparams.nthreads(blosc2_nthreads(self.nthreads));
        Ok(cparams)
    }

    fn build_dparams(&self) -> DParams {
        let mut dparams = DParams::default();
        dparams.nthreads(blosc2_nthreads(self.nthreads));
        dparams
    }

    /// Compress `data` into an SChunk.  The effective per-chunk byte size
    /// is `requested_chunk_bytes` floored to a multiple of `typesize` and
    /// clamped to [`BLOSC2_MAX_BUFFERSIZE`] at the top.  In the edge case
    /// `requested_chunk_bytes < typesize` it is rounded UP to exactly one
    /// `typesize`, so the effective size can exceed the requested value
    /// when the caller asks for less than a single element.  Production
    /// callers never hit that edge because [`DEFAULT_BLOSC2_CHUNK_BYTES`]
    /// (256 MiB) is always comfortably larger than blosc2's maximum
    /// typesize (255 bytes).
    ///
    /// Private test seam so that unit tests can exercise the multi-chunk
    /// path with small buffers without having to allocate
    /// [`DEFAULT_BLOSC2_CHUNK_BYTES`] worth of scratch memory.  Production
    /// callers reach this via [`Compressor::compress`], which passes
    /// [`DEFAULT_BLOSC2_CHUNK_BYTES`].
    fn compress_with_chunk_bytes(
        &self,
        data: &[u8],
        requested_chunk_bytes: usize,
    ) -> Result<CompressResult, CompressionError> {
        ensure_blosc2_init();
        let cparams = self.build_cparams()?;
        let dparams = self.build_dparams();

        let mut schunk = SChunk::new(cparams, dparams).map_err(map_err)?;

        let chunk_bytes = effective_chunk_bytes(requested_chunk_bytes, self.typesize);

        if data.len() <= chunk_bytes {
            // Fast path: inputs that fit in one chunk produce byte-identical
            // output to the pre-fix single-append codepath.
            schunk.append(data).map_err(map_err)?;
        } else {
            // Large input: split to stay below blosc2's per-call
            // `BLOSC2_MAX_BUFFERSIZE = INT_MAX - 32` limit.  The SChunk
            // format tolerates a single short trailing chunk (see
            // `blosc2_schunk_fill_special` in c-blosc2/blosc/schunk.c).
            for slice in data.chunks(chunk_bytes) {
                schunk.append(slice).map_err(map_err)?;
            }
        }

        let buf = schunk.to_buffer().map_err(map_err)?;
        Ok(CompressResult {
            data: buf.as_slice().to_vec(),
            block_offsets: None,
        })
    }
}

impl Compressor for Blosc2Compressor {
    /// Compress `data` using the production chunk size,
    /// [`DEFAULT_BLOSC2_CHUNK_BYTES`].
    fn compress(&self, data: &[u8]) -> Result<CompressResult, CompressionError> {
        self.compress_with_chunk_bytes(data, DEFAULT_BLOSC2_CHUNK_BYTES)
    }

    /// Decode a serialised SChunk buffer back into the original bytes by
    /// decompressing its chunks one after another.
    ///
    /// `_expected_size` is intentionally unused; see the comment in the
    /// body for the reasoning.
    fn decompress(&self, data: &[u8], _expected_size: usize) -> Result<Vec<u8>, CompressionError> {
        ensure_blosc2_init();
        // Why chunk-by-chunk rather than
        // `schunk.items(0..schunk.items_num())`: in blosc2-rs 0.2.x
        // `items_num()` is computed as
        // `num_chunks * chunksize / typesize`
        // (blosc2-0.2.2/src/chunk/schunk.rs:466-468), which OVER-REPORTS
        // as soon as the last chunk is shorter than `chunksize` — the
        // normal situation once `compress()` has split a large input
        // into several chunks.  `items(0..overcount)` would then ask for
        // phantom items beyond the real end and fail in the C layer.
        //
        // `Chunk::decompress()` relies on the chunk's own recorded
        // `nbytes` (Chunk::nbytes in blosc2-0.2.2/src/chunk/chunk.rs:141),
        // so a short tail chunk decodes correctly.
        //
        // Threading: the safe `SChunk::from_buffer` takes its dparams
        // from the buffer itself, and the high-level API offers no cheap
        // runtime override (compare `Chunk::set_dparams`, which works at
        // the single-chunk level).  Decompression therefore runs
        // sequentially and the axis-B wins come from the compress path.
        // Compress is the expensive direction; blosc2 decompress is
        // largely memory-bound anyway.
        //
        // Allocation: `_expected_size` is deliberately ignored.  It is
        // derived from caller-supplied tensor metadata (via the
        // pipeline's `estimate_decompressed_size`), so trusting it for an
        // infallible pre-allocation would turn a malformed `num_values`
        // field into a process abort.  The output instead grows from
        // empty with a `try_reserve` per chunk, sized from the
        // already-validated blosc2 frame trailer; an unreasonably large
        // per-chunk value then surfaces as a `CompressionError` rather
        // than aborting the process.
        let mut schunk = SChunk::from_buffer(data.into()).map_err(map_err)?;
        let chunk_count = schunk.num_chunks();

        // With zero chunks the loop body never runs and the result is an
        // empty vector.
        let mut decoded: Vec<u8> = Vec::new();
        for idx in 0..chunk_count {
            let chunk = schunk.get_chunk(idx).map_err(map_err)?;
            let piece = chunk.decompress().map_err(map_err)?;
            if let Err(e) = decoded.try_reserve(piece.len()) {
                return Err(CompressionError::Blosc2(format!(
                    "failed to reserve {} bytes for decompressed chunk {idx}: {e}",
                    piece.len(),
                )));
            }
            decoded.extend_from_slice(&piece);
        }
        Ok(decoded)
    }

    /// Decode only the bytes `byte_pos..byte_pos + byte_size` of the
    /// original (uncompressed) data.
    ///
    /// The byte range is widened to whole items of the SChunk's typesize,
    /// those items are fetched, and the result is trimmed back to the
    /// exact bytes requested.  `_block_offsets` is not used.
    ///
    /// # Errors
    ///
    /// Returns [`CompressionError::Blosc2`] when the buffer cannot be
    /// parsed, its typesize is 0, the range arithmetic overflows, blosc2
    /// fails to fetch the items, fewer bytes than requested come back,
    /// or the output allocation fails.
    fn decompress_range(
        &self,
        data: &[u8],
        _block_offsets: &[u64],
        byte_pos: usize,
        byte_size: usize,
    ) -> Result<Vec<u8>, CompressionError> {
        ensure_blosc2_init();
        // As in `decompress()`, the decode path runs single-threaded.
        let schunk = SChunk::from_buffer(data.into()).map_err(map_err)?;
        let ts = schunk.typesize();
        if ts == 0 {
            return Err(CompressionError::Blosc2("typesize is 0".to_string()));
        }

        // Widen the byte range to an item range.  The addition is checked
        // because `byte_pos + byte_size` can wrap on hostile input.
        let byte_end = match byte_pos.checked_add(byte_size) {
            Some(sum) => sum,
            None => return Err(CompressionError::Blosc2("byte range overflow".to_string())),
        };
        let first_item = byte_pos / ts;
        let past_last_item = byte_end.div_ceil(ts);

        let items = schunk.items(first_item..past_last_item).map_err(map_err)?;

        // Cut the item-aligned result back down to the requested bytes.
        let skip = byte_pos % ts;
        let end = match skip.checked_add(byte_size) {
            Some(sum) => sum,
            None => return Err(CompressionError::Blosc2("range overflow".to_string())),
        };
        if end > items.len() {
            return Err(CompressionError::Blosc2(format!(
                "range exceeds decompressed data: need {end} bytes, got {}",
                items.len()
            )));
        }

        let wanted = &items[skip..end];
        let mut out: Vec<u8> = Vec::new();
        if let Err(e) = out.try_reserve_exact(wanted.len()) {
            return Err(CompressionError::Blosc2(format!(
                "failed to reserve {} bytes for blosc2 range output: {e}",
                wanted.len()
            )));
        }
        out.extend_from_slice(wanted);
        Ok(out)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Chunk size used by the multi-chunk tests so they exercise the
    /// splitting logic without allocating anything close to
    /// `DEFAULT_BLOSC2_CHUNK_BYTES`.
    const TEST_CHUNK_BYTES: usize = 4096;

    /// Compressor with `nthreads = 0` (sequential) and the given settings.
    fn sequential(codec: Blosc2Codec, clevel: i32, typesize: usize) -> Blosc2Compressor {
        Blosc2Compressor {
            codec,
            clevel,
            typesize,
            nthreads: 0,
        }
    }

    /// Byte-oriented LZ4 compressor used by the chunking tests.
    fn small_chunk_compressor() -> Blosc2Compressor {
        sequential(Blosc2Codec::Lz4, 5, 1)
    }

    /// `len` bytes where byte `i` is `i % modulus`.
    fn ramp(len: usize, modulus: usize) -> Vec<u8> {
        (0..len).map(|i| (i % modulus) as u8).collect()
    }

    /// `len` bytes where byte `i` is `(i * stride) % 256`.
    fn strided(len: usize, stride: usize) -> Vec<u8> {
        (0..len).map(|i| ((i * stride) % 256) as u8).collect()
    }

    /// Native-endian bytes of the `u32` values `0..count`.
    fn u32_ramp(count: u32) -> Vec<u8> {
        (0..count).flat_map(|v: u32| v.to_ne_bytes()).collect()
    }

    /// Compress through the test seam with an explicit chunk size.
    #[track_caller]
    fn chunked(compressor: &Blosc2Compressor, data: &[u8], chunk_bytes: usize) -> Vec<u8> {
        compressor
            .compress_with_chunk_bytes(data, chunk_bytes)
            .unwrap()
            .data
    }

    /// Decompress `compressed` with `decoder` and require it to equal
    /// `expected`.
    #[track_caller]
    fn assert_decodes_to(decoder: &Blosc2Compressor, compressed: &[u8], expected: &[u8]) {
        let decoded = decoder.decompress(compressed, expected.len()).unwrap();
        assert_eq!(decoded, expected);
    }

    /// Compress via the public `compress()` entry point and require a
    /// lossless round trip.
    #[track_caller]
    fn assert_round_trip(compressor: &Blosc2Compressor, data: &[u8]) {
        let result = compressor.compress(data).unwrap();
        assert_decodes_to(compressor, &result.data, data);
    }

    #[test]
    fn blosc2_round_trip() {
        assert_round_trip(&sequential(Blosc2Codec::Lz4, 5, 1), &ramp(4096, 256));
    }

    #[test]
    fn blosc2_round_trip_4byte() {
        assert_round_trip(&sequential(Blosc2Codec::Blosclz, 5, 4), &u32_ramp(4000));
    }

    #[test]
    fn blosc2_range_decode() {
        let data = ramp(8192, 256);
        let compressor = sequential(Blosc2Codec::Lz4, 5, 1);
        let compressed = compressor.compress(&data).unwrap().data;

        let partial = compressor
            .decompress_range(&compressed, &[], 200, 500)
            .unwrap();
        assert_eq!(partial.len(), 500);
        assert_eq!(&partial[..], &data[200..700]);
    }

    #[test]
    fn blosc2_round_trip_zstd() {
        assert_round_trip(&sequential(Blosc2Codec::Zstd, 3, 1), &ramp(4096, 256));
    }

    #[test]
    fn blosc2_round_trip_zlib() {
        assert_round_trip(&sequential(Blosc2Codec::Zlib, 5, 1), &ramp(4096, 256));
    }

    /// Output produced with nthreads > 1 must decode losslessly back to
    /// the original bytes.
    ///
    /// Determinism contract for opaque codecs (blosc2, zstd with
    /// `nbWorkers > 0`): the compressed bytes MAY differ between
    /// parallel and sequential runs, because the codec orders blocks in
    /// the offset table by worker completion.  What MUST hold is that
    /// every parallel variant round-trips losslessly and that decoding
    /// works across thread counts (encode with nthreads=N, decompress
    /// with any M).
    ///
    /// That is the honest contract: callers who need byte-identical
    /// output across thread counts should use `threads = 0`.  The full
    /// policy is in docs/src/guide/multi-threaded-pipeline.md.
    #[test]
    fn blosc2_nthreads_round_trip_lossless() {
        let data = strided(256 * 1024, 31);
        let zstd_with_threads = |nthreads: u32| Blosc2Compressor {
            codec: Blosc2Codec::Zstd,
            clevel: 3,
            typesize: 4,
            nthreads,
        };

        let seq_compressor = zstd_with_threads(0);
        let seq_bytes = seq_compressor.compress(&data).unwrap().data;
        assert_decodes_to(&seq_compressor, &seq_bytes, &data);

        for n in [1u32, 2, 4, 8] {
            let par_compressor = zstd_with_threads(n);
            let par_bytes = par_compressor.compress(&data).unwrap().data;

            // The parallel encoder's own decode must be exact.
            let par_rt = par_compressor.decompress(&par_bytes, data.len()).unwrap();
            assert_eq!(
                par_rt, data,
                "blosc2 nthreads={n} round-trip must match original"
            );

            // Cross-thread-count decode: bytes encoded with N threads
            // must also decode with the sequential (0-thread) instance.
            assert_decodes_to(&seq_compressor, &par_bytes, &data);
        }
    }

    /// With `nthreads=0` (the default) blosc2 compress output is stable
    /// from run to run and matches the pre-0.13.0 byte layout.  Combined
    /// with the cross-language golden tests this pins the "sequential is
    /// byte-identical" guarantee.
    #[test]
    fn blosc2_nthreads_zero_is_deterministic_across_runs() {
        let data = strided(64 * 1024, 17);
        let compressor = sequential(Blosc2Codec::Zstd, 3, 4);

        let a = compressor.compress(&data).unwrap().data;
        let b = compressor.compress(&data).unwrap().data;
        assert_eq!(a, b, "blosc2 nthreads=0 must be deterministic");
    }

    /// Regression guard for the decompress path.
    ///
    /// The original bug (no blosc2_init in decode-only processes) cannot
    /// be reproduced faithfully in a unit test: blosc2_init is global
    /// state, so once compress() has triggered it the library stays
    /// initialised.  The test still protects the decompress wiring
    /// itself, using a separately configured decoder instance.
    #[test]
    fn blosc2_decompress_path() {
        let data = ramp(4096, 256);
        let encoder = sequential(Blosc2Codec::Lz4, 5, 1);
        let compressed = encoder.compress(&data).unwrap().data;

        let decoder = sequential(Blosc2Codec::Lz4, 0, 1);
        assert_decodes_to(&decoder, &compressed, &data);
    }

    /// Regression test for issue #68: a buffer longer than `chunk_bytes`
    /// whose size is not a multiple of it is split into N full chunks
    /// plus one short trailing chunk, and both encode and decode must
    /// cope with that short tail.
    ///
    /// The pre-fix `decompress()`, which used
    /// `schunk.items(0..schunk.items_num())`, fails here because
    /// `items_num()` in blosc2-0.2.2 over-reports when the last chunk is
    /// short — see the note in `Blosc2Compressor::decompress`.
    #[test]
    fn blosc2_multi_chunk_round_trip_short_tail() {
        let len = 3 * TEST_CHUNK_BYTES + 777;
        let data = ramp(len, 251);

        let compressor = small_chunk_compressor();
        let compressed = chunked(&compressor, &data, TEST_CHUNK_BYTES);

        let decompressed = compressor.decompress(&compressed, data.len()).unwrap();
        assert_eq!(decompressed.len(), data.len());
        assert_eq!(decompressed, data);

        let schunk = SChunk::from_buffer(compressed.as_slice().into()).unwrap();
        assert!(
            schunk.num_chunks() >= 2,
            "multi-chunk path not exercised: num_chunks = {}",
            schunk.num_chunks()
        );
    }

    /// Multi-chunk path where the input length is an exact multiple of
    /// `chunk_bytes`, i.e. there is no short trailing chunk.  Keeps the
    /// equal-size case covered next to the short-tail case.
    #[test]
    fn blosc2_multi_chunk_round_trip_exact_multiple() {
        let data = ramp(4 * TEST_CHUNK_BYTES, 251);

        let compressor = small_chunk_compressor();
        let compressed = chunked(&compressor, &data, TEST_CHUNK_BYTES);
        assert_decodes_to(&compressor, &compressed, &data);
    }

    /// Partial decode across a chunk boundary: the range begins in one
    /// SChunk chunk and finishes in the next, so the C
    /// `get_slice_buffer` path behind `decompress_range()` has to stitch
    /// the two halves together.
    #[test]
    fn blosc2_range_decode_spans_chunk_boundary() {
        let data = ramp(3 * TEST_CHUNK_BYTES + 500, 251);

        let compressor = small_chunk_compressor();
        let compressed = chunked(&compressor, &data, TEST_CHUNK_BYTES);

        let range_start = TEST_CHUNK_BYTES - 200;
        let range_len = 500;
        let partial = compressor
            .decompress_range(&compressed, &[], range_start, range_len)
            .unwrap();

        assert_eq!(partial.len(), range_len);
        assert_eq!(&partial[..], &data[range_start..range_start + range_len]);
    }

    /// A small input compressed with the production default
    /// `chunk_bytes` must remain on the single-append fast path and end
    /// up as exactly one SChunk chunk, preserving byte-level
    /// compatibility with the pre-fix output for everything that used to
    /// work.
    #[test]
    fn blosc2_small_input_is_single_chunk() {
        let data = ramp(8192, 251);
        let compressor = sequential(Blosc2Codec::Lz4, 5, 1);
        let compressed = compressor.compress(&data).unwrap().data;

        let schunk = SChunk::from_buffer(compressed.as_slice().into()).unwrap();
        assert_eq!(
            schunk.num_chunks(),
            1,
            "small input should stay on single-append fast path"
        );

        assert_decodes_to(&compressor, &compressed, &data);
    }

    /// An input whose length equals `chunk_bytes` exactly must still use
    /// the single-append path: the fast-path predicate is `<=`, not `<`.
    /// Protects against the off-by-one that would force a pointless
    /// multi-chunk encode for every N-aligned buffer.
    #[test]
    fn blosc2_input_equal_to_chunk_bytes_stays_single_chunk() {
        let data = ramp(TEST_CHUNK_BYTES, 251);

        let compressor = small_chunk_compressor();
        let compressed = chunked(&compressor, &data, TEST_CHUNK_BYTES);

        let schunk = SChunk::from_buffer(compressed.as_slice().into()).unwrap();
        assert_eq!(schunk.num_chunks(), 1);

        assert_decodes_to(&compressor, &compressed, &data);
    }

    /// Typesize-aligned chunking: with typesize = 4 and a `chunk_bytes`
    /// smaller than the input, every internal append other than a
    /// possible trailing one must get a buffer whose length is a
    /// multiple of typesize.  The check is indirect: blosc2's shuffle
    /// filter corrupts data when non-tail chunks are misaligned, so a
    /// successful round trip is the guard.
    #[test]
    fn blosc2_multi_chunk_typesize_alignment() {
        let num_values = 2 * TEST_CHUNK_BYTES + 37;
        let data = u32_ramp(num_values as u32);
        assert_eq!(data.len() % 4, 0);

        let compressor = sequential(Blosc2Codec::Blosclz, 5, 4);
        let compressed = chunked(&compressor, &data, TEST_CHUNK_BYTES);
        assert_decodes_to(&compressor, &compressed, &data);
    }

    /// Input whose total length is NOT a multiple of `typesize`.
    /// Production reaches this through `simple_packing` with a
    /// non-byte-aligned `bits_per_value` (packed length =
    /// `ceil(num_values * bpv / 8)`).  c-blosc2's shuffle/bitshuffle
    /// filters explicitly tolerate it by passing the leftover trailing
    /// bytes through untouched, and the round trip must stay lossless.
    #[test]
    fn blosc2_multi_chunk_non_typesize_aligned_tail() {
        let len = 2 * TEST_CHUNK_BYTES + 13;
        assert_ne!(len % 4, 0, "test setup: length must not be 4-aligned");
        let data = ramp(len, 251);

        let compressor = sequential(Blosc2Codec::Blosclz, 5, 4);
        let compressed = chunked(&compressor, &data, TEST_CHUNK_BYTES);
        assert_decodes_to(&compressor, &compressed, &data);
    }

    /// An oversized chunk-size request is accepted without error.  This
    /// is only a shallow integration check: the small input goes down
    /// the single-append fast path, so all it proves is that compression
    /// neither crashes nor errors when more than 2 GiB is requested.
    /// The clamp value itself is verified by the `effective_chunk_bytes`
    /// unit tests below, which need no > 2 GiB allocation.
    #[test]
    fn blosc2_oversized_chunk_bytes_request_round_trips() {
        let data = ramp(8192, 251);
        let compressor = small_chunk_compressor();
        let compressed = chunked(&compressor, &data, BLOSC2_MAX_BUFFERSIZE + (1 << 30));
        assert_decodes_to(&compressor, &compressed, &data);
    }

    /// Direct unit test of the clamp: no request, however large, may
    /// yield more than `BLOSC2_MAX_BUFFERSIZE`, so every
    /// `SChunk::append()` call stays legal.  Regression guard against
    /// issue #68 coming back through a mis-configured request.
    #[test]
    fn effective_chunk_bytes_never_exceeds_blosc2_max() {
        let requests = [
            0usize,
            1,
            BLOSC2_MAX_BUFFERSIZE / 2,
            BLOSC2_MAX_BUFFERSIZE - 1,
            BLOSC2_MAX_BUFFERSIZE,
            BLOSC2_MAX_BUFFERSIZE + 1,
            BLOSC2_MAX_BUFFERSIZE + (1 << 30),
            usize::MAX,
        ];
        for ts in [1usize, 2, 4, 8, 16, 32, 64, 128, 255] {
            for req in requests {
                let got = effective_chunk_bytes(req, ts);
                assert!(
                    got <= BLOSC2_MAX_BUFFERSIZE,
                    "effective_chunk_bytes({req}, {ts}) = {got} exceeds BLOSC2_MAX_BUFFERSIZE = {BLOSC2_MAX_BUFFERSIZE}",
                );
            }
        }
    }

    /// Every result is a multiple of `typesize` and at least one
    /// `typesize` long, which guarantees that non-tail SChunk appends
    /// are aligned for blosc2's shuffle / bitshuffle filters.
    #[test]
    fn effective_chunk_bytes_is_typesize_aligned() {
        let requests = [0usize, 1, 7, 8, 9, 4095, 4096, 4097, 1 << 20, 1 << 30];
        for ts in [1usize, 2, 3, 4, 7, 8, 16, 32, 64, 128, 255] {
            for req in requests {
                let got = effective_chunk_bytes(req, ts);
                assert_eq!(
                    got % ts,
                    0,
                    "effective_chunk_bytes({req}, {ts}) = {got} not a multiple of {ts}",
                );
                assert!(
                    got >= ts,
                    "effective_chunk_bytes({req}, {ts}) = {got} below typesize",
                );
            }
        }
    }

    /// For `requested < typesize` the helper rounds UP to exactly one
    /// `typesize`, so a chunk always holds at least one whole element.
    /// This is the edge case described in the docstring of
    /// `compress_with_chunk_bytes`.
    #[test]
    fn effective_chunk_bytes_rounds_up_below_typesize() {
        for req in [0usize, 1, 7, 8] {
            assert_eq!(effective_chunk_bytes(req, 8), 8);
        }
    }

    /// `typesize == 0` is rejected earlier by blosc2's
    /// `CParams::typesize()`, but the helper defensively treats it as
    /// `1` so its arithmetic stays well-defined should some future code
    /// path bypass that validation.
    #[test]
    fn effective_chunk_bytes_zero_typesize_is_defensive() {
        assert_eq!(effective_chunk_bytes(0, 0), 1);
        assert_eq!(effective_chunk_bytes(4096, 0), 4096);
        assert_eq!(
            effective_chunk_bytes(BLOSC2_MAX_BUFFERSIZE + 1, 0),
            BLOSC2_MAX_BUFFERSIZE
        );
    }
}