tensogram 0.22.0

Fast binary N-tensor message format for scientific data — encode, decode, file I/O, streaming
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
// (C) Copyright 2026- ECMWF and individual contributors.
//
// This software is licensed under the terms of the Apache Licence Version 2.0
// which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
// In applying this licence, ECMWF does not waive the privileges and immunities
// granted to it by virtue of its status as an intergovernmental organisation nor
// does it submit to any jurisdiction.

//! Asynchronous streaming encoder — sibling of
//! [`crate::streaming::StreamingEncoder`] that writes to any
//! `tokio::io::AsyncWrite + Unpin` sink.
//!
//! Driver: the HPC producer scenario —
//! a producer job emits forecast steps as they are produced, writing to
//! a shared filesystem or object-store sink without blocking the
//! caller's compute thread.
//!
//! ## Wire-format compatibility
//!
//! The async encoder produces **byte-identical output** to the sync
//! [`crate::streaming::StreamingEncoder`] for the same logical sequence
//! of writes.  Frame layout, hash semantics, and footer-frame ordering
//! all share helpers in [`crate::streaming`].  The only difference is
//! the I/O surface (`AsyncWrite` vs `Write`).
//!
//! ## Threading model
//!
//! Like the sync encoder, the async encoder is single-task-owned.  The
//! `AsyncWrite` trait is naturally serial; concurrent
//! [`AsyncStreamingEncoder::write_object`] calls against the same
//! encoder are not permitted.  Hashing remains in the calling task
//! and never crosses thread boundaries — the transparent-codec
//! byte-identical-across-threads invariant is preserved.

use std::collections::BTreeMap;

use tokio::io::{AsyncSeekExt, AsyncWrite, AsyncWriteExt};

use crate::encode::{
    EncodeOptions, MaskMethod, build_pipeline_config, populate_base_entries,
    populate_reserved_provenance, validate_no_szip_offsets_for_non_szip, validate_object,
    validate_szip_block_offsets,
};
use crate::error::{Result, TensogramError};
use crate::framing::{EncodedObject, encode_data_object_frame};
use crate::metadata::{self, RESERVED_KEY};
use crate::streaming::{build_frame, build_preamble_and_header_bytes, padding_for};
use crate::substitute_and_mask;
use crate::types::{DataObjectDescriptor, GlobalMetadata, HashFrame, IndexFrame};
use crate::wire::{FrameType, POSTAMBLE_SIZE, Postamble};
use tensogram_encodings::pipeline;

/// Streams a Tensogram message to an [`AsyncWrite`] sink, one frame at a
/// time.
///
/// [`Self::new`] emits the preamble and header frame.  Each
/// [`Self::write_object`] / [`Self::write_object_pre_encoded`] call
/// appends one data-object frame, optionally announced by a
/// [`Self::write_preceder`] frame.  [`Self::finish`] appends the footer
/// frames and the postamble.  See the module docs for the full design
/// rationale.
///
/// # Example
///
/// ```no_run
/// # async fn run() -> tensogram::Result<()> {
/// use tokio::fs::File;
/// use tensogram::streaming_async::AsyncStreamingEncoder;
/// use tensogram::{GlobalMetadata, EncodeOptions};
///
/// let file = File::create("output.tgm").await?;
/// let meta = GlobalMetadata::default();
/// let mut enc = AsyncStreamingEncoder::new(file, &meta, &EncodeOptions::default()).await?;
/// // enc.write_object(&desc, &data).await?;
/// // enc.finish().await?;
/// # Ok(()) }
/// ```
pub struct AsyncStreamingEncoder<W: AsyncWrite + Unpin> {
    /// Destination sink; frames are written to it strictly in order.
    writer: W,
    /// Bytes emitted for this message so far, alignment padding included.
    bytes_written: u64,

    // ── Per-object bookkeeping consumed by the footer frames ──────────────
    /// Offset of each data-object frame, relative to the message start.
    object_offsets: Vec<u64>,
    /// Length of each data-object frame in bytes (trailing padding excluded).
    object_lengths: Vec<u64>,
    /// `(algorithm name, formatted digest)` per object; `None` when hashing is off.
    hash_entries: Vec<Option<(String, String)>>,
    /// Final descriptor of each object (payload bytes are not retained).
    completed_objects: Vec<EncodedObject>,
    /// Preceder metadata attached to each object, if one was written.
    preceder_payloads: Vec<Option<BTreeMap<String, ciborium::Value>>>,
    /// Set between `write_preceder` and the object write it announces.
    pending_preceder: bool,

    // ── Message-level settings ─────────────────────────────────────────────
    /// Caller-supplied global metadata, re-emitted (enriched) in the footer.
    global_meta: GlobalMetadata,
    /// Whether frames carry an inline hash slot.
    hashing: bool,
    /// Whether a `FooterHash` frame is emitted at finalisation.
    emit_footer_hash_frame: bool,

    // ── Encode settings captured from `EncodeOptions` ──────────────────────
    intra_codec_threads: u32,
    parallel_threshold_bytes: Option<usize>,
    allow_nan: bool,
    allow_inf: bool,
    nan_mask_method: MaskMethod,
    pos_inf_mask_method: MaskMethod,
    neg_inf_mask_method: MaskMethod,
    small_mask_threshold_bytes: usize,
}

impl<W: AsyncWrite + Unpin> AsyncStreamingEncoder<W> {
    /// Begin a new streaming message.
    ///
    /// Writes the preamble (with `total_length = 0` for streaming mode),
    /// a `HeaderMetadata` frame and any alignment padding to the
    /// underlying sink.  Mirrors
    /// [`crate::streaming::StreamingEncoder::new`] — the byte-level
    /// output is identical.
    ///
    /// # Errors
    ///
    /// Returns an error in any of these cases:
    ///
    /// * the aggregate-hash option cannot be resolved for streaming;
    /// * the thread budget in `options.threads` is rejected;
    /// * the preamble/header frame cannot be built;
    /// * writing to `writer` fails.
    ///
    /// Option validation runs before any byte is written, so a
    /// configuration error leaves `writer` untouched.
    pub async fn new(
        mut writer: W,
        global_meta: &GlobalMetadata,
        options: &EncodeOptions,
    ) -> Result<Self> {
        let resolved = options.aggregate_hash.resolved_streaming()?;
        let emit_footer_hash = options.hashing && resolved.emits_footer();
        // Resolved before touching the sink.  The budget does not affect
        // the header bytes, and an invalid value must not leave a
        // half-written preamble behind.
        let intra_codec_threads = crate::parallel::resolve_budget(options.threads)?;

        let (preamble_bytes, header_frame_bytes) =
            build_preamble_and_header_bytes(global_meta, options.hashing, emit_footer_hash)?;

        // `bytes_written` is the offset within this message.  It drives
        // padding and the object offsets recorded in the footer index.
        let mut bytes_written = 0u64;
        writer.write_all(&preamble_bytes).await?;
        bytes_written += preamble_bytes.len() as u64;
        writer.write_all(&header_frame_bytes).await?;
        bytes_written += header_frame_bytes.len() as u64;
        write_padding(&mut writer, &mut bytes_written).await?;

        Ok(Self {
            writer,
            object_offsets: Vec::new(),
            object_lengths: Vec::new(),
            hash_entries: Vec::new(),
            completed_objects: Vec::new(),
            bytes_written,
            hashing: options.hashing,
            emit_footer_hash_frame: emit_footer_hash,
            global_meta: global_meta.clone(),
            pending_preceder: false,
            preceder_payloads: Vec::new(),
            intra_codec_threads,
            parallel_threshold_bytes: options.parallel_threshold_bytes,
            allow_nan: options.allow_nan,
            allow_inf: options.allow_inf,
            nan_mask_method: options.nan_mask_method.clone(),
            pos_inf_mask_method: options.pos_inf_mask_method.clone(),
            neg_inf_mask_method: options.neg_inf_mask_method.clone(),
            small_mask_threshold_bytes: options.small_mask_threshold_bytes,
        })
    }

    /// Write a `PrecederMetadata` frame for the next data object.
    ///
    /// The metadata is also remembered, and merged into that object's
    /// entry in the footer metadata frame at finalisation.  Mirror of
    /// [`crate::streaming::StreamingEncoder::write_preceder`].
    ///
    /// # Errors
    ///
    /// * [`TensogramError::Framing`] if a preceder is already pending
    ///   (two calls without an intervening object write).
    /// * [`TensogramError::Metadata`] if `metadata` contains the
    ///   library-reserved key.
    /// * Any serialisation or I/O error from building/writing the frame.
    pub async fn write_preceder(
        &mut self,
        metadata: BTreeMap<String, ciborium::Value>,
    ) -> Result<()> {
        if self.pending_preceder {
            return Err(TensogramError::Framing(
                "write_preceder called twice without an intervening write_object/write_object_pre_encoded".to_string(),
            ));
        }
        if metadata.contains_key(RESERVED_KEY) {
            return Err(TensogramError::Metadata(format!(
                "client code must not write '{RESERVED_KEY}' in preceder metadata; \
                     this field is populated by the library"
            )));
        }

        // The preceder frame reuses the global-metadata CBOR layout with a
        // single `base` entry.
        let preceder_meta = GlobalMetadata {
            base: vec![metadata.clone()],
            ..Default::default()
        };
        let cbor = metadata::global_metadata_to_cbor(&preceder_meta)?;
        let frame_bytes = build_frame(FrameType::PrecederMetadata, 1, 0, &cbor, self.hashing);
        self.writer.write_all(&frame_bytes).await?;
        self.bytes_written += frame_bytes.len() as u64;

        write_padding(&mut self.writer, &mut self.bytes_written).await?;

        self.pending_preceder = true;
        self.preceder_payloads.push(Some(metadata));
        Ok(())
    }

    /// Encode and write a single data object frame.
    ///
    /// This runs the full encode path:
    ///
    /// 1. non-finite handling;
    /// 2. the encoding/filter/compression pipeline;
    /// 3. payload/mask composition.
    ///
    /// It then builds the frame in memory using the same helpers as the
    /// sync encoder, and issues a single async `write_all` followed by
    /// padding.  The produced bytes are identical to those of
    /// [`crate::streaming::StreamingEncoder::write_object`] for the
    /// same inputs.
    ///
    /// Buffering one frame in memory is acceptable for the v1 producer
    /// scenario where individual frames are bounded; true chunked
    /// AsyncWrite streaming is a follow-up optimisation.
    ///
    /// # Errors
    ///
    /// Returns an error in any of these cases:
    ///
    /// * `validate_object` rejects the descriptor / data-length pair;
    /// * the non-finite handling rejects `data` (e.g. NaN without
    ///   `allow_nan`);
    /// * the pipeline fails ([`TensogramError::Encoding`]);
    /// * writing to the sink fails.
    pub async fn write_object(&mut self, desc: &DataObjectDescriptor, data: &[u8]) -> Result<()> {
        validate_object(desc, data.len())?;
        let num_elements = desc.num_elements()?;

        let parallel = crate::parallel::should_parallelise(
            self.intra_codec_threads,
            data.len(),
            self.parallel_threshold_bytes,
        );

        // `pipeline_input` feeds the codec pipeline; `mask_set` is folded
        // into the payload by `compose_payload_region` below.
        let (pipeline_input, mask_set) = substitute_and_mask::substitute_and_mask(
            data,
            desc.dtype,
            desc.byte_order,
            self.allow_nan,
            self.allow_inf,
            parallel,
        )?;
        // The thread budget is only handed to the codec when this object
        // qualifies for parallel encoding; otherwise 0 is passed.
        let intra = if parallel {
            self.intra_codec_threads
        } else {
            0
        };

        let mut final_desc = desc.clone();
        crate::encode::resolve_simple_packing_params(&mut final_desc, data)?;

        let config = crate::encode::build_pipeline_config_with_backend(
            &final_desc,
            num_elements,
            desc.dtype,
            tensogram_encodings::pipeline::CompressionBackend::default(),
            intra,
        )?;

        let result =
            crate::parallel::run_maybe_pooled(self.intra_codec_threads, parallel, intra, || {
                pipeline::encode_pipeline(pipeline_input.as_ref(), &config)
            })
            .map_err(|e| TensogramError::Encoding(e.to_string()))?;

        // When the pipeline reports block offsets, record them in the
        // descriptor under `szip_block_offsets`.
        if let Some(offsets) = &result.block_offsets {
            final_desc.params.insert(
                "szip_block_offsets".to_string(),
                ciborium::Value::Array(
                    offsets
                        .iter()
                        .map(|&o| ciborium::Value::Integer(o.into()))
                        .collect(),
                ),
            );
        }

        let (payload_region, masks_metadata) = crate::encode::compose_payload_region(
            result.encoded_bytes,
            mask_set,
            &self.nan_mask_method,
            &self.pos_inf_mask_method,
            &self.neg_inf_mask_method,
            self.small_mask_threshold_bytes,
        )?;
        if let Some(m) = masks_metadata {
            final_desc.masks = Some(m);
        }

        self.write_object_inner(final_desc, &payload_region).await
    }

    /// Write a pre-encoded data object frame.
    ///
    /// `pre_encoded_bytes` is written verbatim as the object payload.
    /// The descriptor is validated, never used to re-encode: its pipeline
    /// settings are checked, and any `szip_block_offsets` are checked
    /// against the payload length.  Mirror of
    /// [`crate::streaming::StreamingEncoder::write_object_pre_encoded`].
    pub async fn write_object_pre_encoded(
        &mut self,
        descriptor: &DataObjectDescriptor,
        pre_encoded_bytes: &[u8],
    ) -> Result<()> {
        validate_object(descriptor, pre_encoded_bytes.len())?;
        let num_elements = descriptor.num_elements()?;
        // Built only to validate the descriptor; the config is discarded.
        build_pipeline_config(descriptor, num_elements, descriptor.dtype)?;
        validate_no_szip_offsets_for_non_szip(descriptor)?;
        if descriptor.compression == "szip" && descriptor.params.contains_key("szip_block_offsets")
        {
            validate_szip_block_offsets(&descriptor.params, pre_encoded_bytes.len())?;
        }
        self.write_object_inner(descriptor.clone(), pre_encoded_bytes)
            .await
    }

    /// Shared tail of both object writers.
    ///
    /// Frames `encoded_bytes` with `final_desc`, writes the frame plus
    /// padding, and records the offset, length, hash and descriptor that
    /// the footer frames need later.
    async fn write_object_inner(
        &mut self,
        final_desc: DataObjectDescriptor,
        encoded_bytes: &[u8],
    ) -> Result<()> {
        let start_offset = self.bytes_written;

        // Build the full data-object frame in memory using the shared
        // helper, then async-write the buffer in one shot.  Bytes are
        // identical to the sync encoder's chunk-streamed path.
        // `cbor_before = false` matches the sync streaming layout
        // (CBOR_AFTER_PAYLOAD).
        let frame_bytes =
            encode_data_object_frame(&final_desc, encoded_bytes, false, self.hashing)?;
        let frame_len = frame_bytes.len() as u64;

        // Extract the inline hash slot (8 bytes at offset frame_len-12)
        // when hashing is on, so we can populate the FooterHash frame
        // later without re-hashing the body.
        let inline_digest = if self.hashing {
            let end = frame_bytes.len();
            let mut buf = [0u8; 8];
            buf.copy_from_slice(&frame_bytes[end - 12..end - 4]);
            Some(u64::from_be_bytes(buf))
        } else {
            None
        };

        self.writer.write_all(&frame_bytes).await?;
        self.bytes_written += frame_len;

        let hash_entry = inline_digest.map(|d| {
            (
                crate::hash::HASH_ALGORITHM_NAME.to_string(),
                crate::hash::format_xxh3_digest(d),
            )
        });

        self.object_offsets.push(start_offset);
        self.object_lengths.push(frame_len);
        self.hash_entries.push(hash_entry);
        // Only the descriptor is needed for footer metadata; the payload
        // is not retained.
        self.completed_objects.push(EncodedObject {
            descriptor: final_desc,
            encoded_payload: Vec::new(),
        });

        // Keep `preceder_payloads` one-to-one with objects: a pending
        // preceder already pushed this object's slot.
        if self.pending_preceder {
            self.pending_preceder = false;
        } else {
            self.preceder_payloads.push(None);
        }

        write_padding(&mut self.writer, &mut self.bytes_written).await?;
        Ok(())
    }

    /// Finalise the streaming message.
    ///
    /// Writes the following, then flushes the sink and hands it back:
    ///
    /// * the footer metadata frame;
    /// * the footer hash frame, when enabled;
    /// * the footer index frame;
    /// * the postamble (`total_length = 0`).
    ///
    /// # Errors
    ///
    /// Returns [`TensogramError::Framing`] if a preceder was written
    /// without a following object, plus any serialisation or I/O error.
    pub async fn finish(mut self) -> Result<W> {
        self.write_footer_frames_and_postamble().await?;
        self.writer.flush().await?;
        Ok(self.writer)
    }

    /// Emit every footer frame and the streaming-mode postamble.
    ///
    /// Shared by [`Self::finish`] and the seekable back-fill variant.
    async fn write_footer_frames_and_postamble(&mut self) -> Result<()> {
        if self.pending_preceder {
            return Err(TensogramError::Framing(
                "dangling PrecederMetadata: finish called without a following write_object/write_object_pre_encoded"
                    .to_string(),
            ));
        }
        // Every completed object owns exactly one preceder slot.  A
        // mismatch means internal bookkeeping is broken.  This is checked
        // before any footer work, so nothing is written in that case.
        if self.preceder_payloads.len() != self.completed_objects.len() {
            return Err(TensogramError::Framing(format!(
                "internal: preceder_payloads ({}) out of sync with completed_objects ({})",
                self.preceder_payloads.len(),
                self.completed_objects.len()
            )));
        }

        // The postamble points back at the first footer frame.
        let footer_start = self.bytes_written;

        // Footer metadata frame (preceder payloads merged in).
        {
            let mut enriched_meta = self.global_meta.clone();
            populate_base_entries(&mut enriched_meta.base, &self.completed_objects);
            populate_reserved_provenance(&mut enriched_meta.reserved);
            for (i, prec) in self.preceder_payloads.iter().enumerate() {
                if let Some(prec_map) = prec
                    && i < enriched_meta.base.len()
                {
                    for (k, v) in prec_map {
                        enriched_meta.base[i].insert(k.clone(), v.clone());
                    }
                }
            }
            let meta_cbor = metadata::global_metadata_to_cbor(&enriched_meta)?;
            let frame_bytes =
                build_frame(FrameType::FooterMetadata, 1, 0, &meta_cbor, self.hashing);
            self.writer.write_all(&frame_bytes).await?;
            self.bytes_written += frame_bytes.len() as u64;
            write_padding(&mut self.writer, &mut self.bytes_written).await?;
        }

        // Footer hash frame (if opted in and at least one object was hashed).
        if self.emit_footer_hash_frame && self.hash_entries.iter().any(|e| e.is_some()) {
            let algorithm = if self.hashing {
                crate::hash::HASH_ALGORITHM_NAME.to_string()
            } else {
                String::new()
            };
            let hashes: Vec<String> = self
                .hash_entries
                .iter()
                .map(|e| e.as_ref().map(|(_, v)| v.clone()).unwrap_or_default())
                .collect();
            let hash_frame = HashFrame { algorithm, hashes };
            let hash_cbor = metadata::hash_frame_to_cbor(&hash_frame)?;
            let frame_bytes = build_frame(FrameType::FooterHash, 1, 0, &hash_cbor, self.hashing);
            self.writer.write_all(&frame_bytes).await?;
            self.bytes_written += frame_bytes.len() as u64;
            write_padding(&mut self.writer, &mut self.bytes_written).await?;
        }

        // Footer index frame.
        let index = IndexFrame {
            offsets: std::mem::take(&mut self.object_offsets),
            lengths: std::mem::take(&mut self.object_lengths),
        };
        let index_cbor = metadata::index_to_cbor(&index)?;
        let frame_bytes = build_frame(FrameType::FooterIndex, 1, 0, &index_cbor, self.hashing);
        self.writer.write_all(&frame_bytes).await?;
        self.bytes_written += frame_bytes.len() as u64;
        write_padding(&mut self.writer, &mut self.bytes_written).await?;

        // Postamble (streaming mode: total_length = 0).
        let postamble = Postamble {
            first_footer_offset: footer_start,
            total_length: 0,
        };
        let mut postamble_bytes = Vec::with_capacity(POSTAMBLE_SIZE);
        postamble.write_to(&mut postamble_bytes);
        self.writer.write_all(&postamble_bytes).await?;
        self.bytes_written += postamble_bytes.len() as u64;

        Ok(())
    }

    /// Number of data objects written so far.
    pub fn object_count(&self) -> usize {
        self.object_offsets.len()
    }

    /// Total bytes written to the sink for this message so far, alignment
    /// padding included.
    pub fn bytes_written(&self) -> u64 {
        self.bytes_written
    }
}

// ── Seekable-sink specialisation ─────────────────────────────────────────────

impl<W: AsyncWrite + tokio::io::AsyncSeek + Unpin> AsyncStreamingEncoder<W> {
    /// Finalise and back-fill the `total_length` field in both the
    /// preamble and postamble.
    ///
    /// Mirror of
    /// [`crate::streaming::StreamingEncoder::finish_with_backfill`]
    /// for seekable async sinks (e.g. `tokio::fs::File`).  Object-store
    /// multipart sinks cannot seek; for those use [`Self::finish`]
    /// instead and accept `total_length = 0` (forward-scan only).
    pub async fn finish_with_backfill(mut self) -> Result<W> {
        use std::io::SeekFrom;
        self.write_footer_frames_and_postamble().await?;

        let end_pos = self.writer.stream_position().await?;
        let total_length = end_pos;

        self.writer.seek(SeekFrom::Start(16)).await?;
        self.writer.write_all(&total_length.to_be_bytes()).await?;

        self.writer.seek(SeekFrom::Start(end_pos - 16)).await?;
        self.writer.write_all(&total_length.to_be_bytes()).await?;

        self.writer.seek(SeekFrom::Start(end_pos)).await?;
        self.writer.flush().await?;
        Ok(self.writer)
    }
}

// ── Async padding helper ────────────────────────────────────────────────────

/// Source of zero bytes for alignment padding.
const ZERO_PAD: [u8; 7] = [0; 7];

/// Pad the sink with zero bytes up to the alignment boundary that
/// `padding_for` computes for the current offset.
///
/// `bytes_written` is read to size the gap and is advanced by the number
/// of padding bytes emitted.  Nothing is written when already aligned.
async fn write_padding<W: AsyncWrite + Unpin>(
    writer: &mut W,
    bytes_written: &mut u64,
) -> std::io::Result<()> {
    let gap = padding_for(*bytes_written);
    if gap == 0 {
        return Ok(());
    }
    writer.write_all(&ZERO_PAD[..gap]).await?;
    *bytes_written += gap as u64;
    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::decode::{DecodeOptions, decode};
    use crate::dtype::Dtype;
    use crate::types::ByteOrder;

    fn make_descriptor(shape: Vec<u64>, dtype: Dtype) -> DataObjectDescriptor {
        let ndim = shape.len() as u64;
        let mut strides = vec![0u64; shape.len()];
        if !shape.is_empty() {
            strides[shape.len() - 1] = 1;
            for i in (0..shape.len() - 1).rev() {
                strides[i] = strides[i + 1] * shape[i + 1];
            }
        }
        DataObjectDescriptor {
            obj_type: "ntensor".to_string(),
            ndim,
            shape,
            strides,
            dtype,
            byte_order: ByteOrder::native(),
            encoding: "none".to_string(),
            filter: "none".to_string(),
            compression: "none".to_string(),
            params: BTreeMap::new(),
            masks: None,
        }
    }

    fn finite_f32_bytes(n: usize) -> Vec<u8> {
        (0..n)
            .flat_map(|i| (i as f32 * 0.5).to_ne_bytes())
            .collect()
    }

    /// `bytes_written()` getter must reflect progress as frames are
    /// emitted (covers the accessor).
    #[tokio::test]
    async fn bytes_written_tracks_progress() {
        let meta = GlobalMetadata::default();
        let desc = make_descriptor(vec![4], Dtype::Float32);
        let data = vec![0u8; 16];

        let mut enc = AsyncStreamingEncoder::new(Vec::new(), &meta, &EncodeOptions::default())
            .await
            .unwrap();
        let after_header = enc.bytes_written();
        assert!(after_header > 0, "header should advance byte counter");
        enc.write_object(&desc, &data).await.unwrap();
        assert!(
            enc.bytes_written() > after_header,
            "writing an object should advance byte counter"
        );
        let _ = enc.finish().await.unwrap();
    }

    /// `finish_with_backfill` on a seekable sink must patch the
    /// `total_length` field in both the preamble and the postamble with
    /// the real message length, and the message must still decode.
    #[tokio::test]
    async fn finish_with_backfill_patches_total_length() {
        let meta = GlobalMetadata::default();
        let desc = make_descriptor(vec![4], Dtype::Float32);
        let data = finite_f32_bytes(4);

        let sink = std::io::Cursor::new(Vec::new());
        let mut enc = AsyncStreamingEncoder::new(sink, &meta, &EncodeOptions::default())
            .await
            .unwrap();
        enc.write_object(&desc, &data).await.unwrap();
        let bytes = enc.finish_with_backfill().await.unwrap().into_inner();

        let total = bytes.len() as u64;
        let read_u64 = |at: usize| u64::from_be_bytes(bytes[at..at + 8].try_into().unwrap());
        assert_eq!(read_u64(16), total, "preamble total_length must be back-filled");
        assert_eq!(
            read_u64(bytes.len() - 16),
            total,
            "postamble total_length must be back-filled"
        );

        let (_, objects) = decode(&bytes, &DecodeOptions::default()).unwrap();
        assert_eq!(objects.len(), 1);
        assert_eq!(objects[0].1, data);
    }

    /// Drive the parallel codec path: with a non-zero thread budget and a
    /// zero parallel threshold, `should_parallelise` returns true so the
    /// `intra = self.intra_codec_threads` arm and the parallel-backend
    /// pipeline-config arm are taken.
    #[tokio::test]
    async fn parallel_intra_codec_path_round_trip() {
        let meta = GlobalMetadata::default();
        let mut desc = make_descriptor(vec![1024], Dtype::Float32);
        // zstd is axis-B-friendly so a non-zero intra budget is meaningful.
        desc.compression = "zstd".to_string();
        let data = finite_f32_bytes(1024);

        let options = EncodeOptions {
            threads: 2,
            parallel_threshold_bytes: Some(0),
            ..Default::default()
        };
        let mut enc = AsyncStreamingEncoder::new(Vec::new(), &meta, &options)
            .await
            .unwrap();
        enc.write_object(&desc, &data).await.unwrap();
        let result = enc.finish().await.unwrap();

        let (_, objects) = decode(&result, &DecodeOptions::default()).unwrap();
        assert_eq!(objects.len(), 1);
        assert_eq!(objects[0].1, data);
    }

    /// szip compression emits `block_offsets`, exercising the
    /// `result.block_offsets` insertion into the descriptor params.
    #[cfg(any(feature = "szip", feature = "szip-pure"))]
    #[tokio::test]
    async fn szip_block_offsets_are_recorded() {
        let meta = GlobalMetadata::default();
        let mut desc = make_descriptor(vec![256], Dtype::Float32);
        desc.compression = "szip".to_string();
        desc.params
            .insert("szip_rsi".to_string(), ciborium::Value::Integer(128.into()));
        desc.params.insert(
            "szip_block_size".to_string(),
            ciborium::Value::Integer(16.into()),
        );
        desc.params
            .insert("szip_flags".to_string(), ciborium::Value::Integer(0.into()));
        let data = finite_f32_bytes(256);

        let mut enc = AsyncStreamingEncoder::new(Vec::new(), &meta, &EncodeOptions::default())
            .await
            .unwrap();
        enc.write_object(&desc, &data).await.unwrap();
        let result = enc.finish().await.unwrap();

        let (_, objects) = decode(&result, &DecodeOptions::default()).unwrap();
        assert_eq!(objects.len(), 1);
        assert_eq!(objects[0].1, data);
        // The roundtripped descriptor should carry szip block offsets.
        assert!(
            objects[0].0.params.contains_key("szip_block_offsets"),
            "szip encode should record block offsets"
        );
    }

    /// Pre-encoded szip path: extract real szip payload + descriptor (which
    /// carries `szip_block_offsets`) from a sync-encoded message and feed it
    /// to `write_object_pre_encoded`, driving the szip-offset validation arm.
    ///
    /// Uses `simple_packing` as the encoding wrapper: `validate_object`
    /// enforces a raw-size check only when `encoding == "none"`, so a
    /// non-trivial-compression pre-encoded payload must wear a non-"none"
    /// encoding (same constraint the sync `encode_pre_encoded` tests rely on).
    #[cfg(any(feature = "szip", feature = "szip-pure"))]
    #[tokio::test]
    async fn pre_encoded_szip_validates_block_offsets() {
        use crate::encode::encode;
        use tensogram_encodings::simple_packing;

        let values: Vec<f64> = (0..4096).map(|i| 250.0 + i as f64 * 0.1).collect();
        let raw: Vec<u8> = values.iter().flat_map(|v| v.to_be_bytes()).collect();
        let p = simple_packing::compute_params(&values, 16, 0).unwrap();

        let mut desc = make_descriptor(vec![4096], Dtype::Float64);
        desc.byte_order = ByteOrder::Big;
        desc.encoding = "simple_packing".to_string();
        desc.compression = "szip".to_string();
        desc.params.insert(
            "sp_reference_value".to_string(),
            ciborium::Value::Float(p.reference_value),
        );
        desc.params.insert(
            "sp_binary_scale_factor".to_string(),
            ciborium::Value::Integer((p.binary_scale_factor as i64).into()),
        );
        desc.params.insert(
            "sp_decimal_scale_factor".to_string(),
            ciborium::Value::Integer((p.decimal_scale_factor as i64).into()),
        );
        desc.params.insert(
            "sp_bits_per_value".to_string(),
            ciborium::Value::Integer((p.bits_per_value as i64).into()),
        );
        desc.params
            .insert("szip_rsi".to_string(), ciborium::Value::Integer(128.into()));
        desc.params.insert(
            "szip_block_size".to_string(),
            ciborium::Value::Integer(16.into()),
        );
        desc.params.insert(
            "szip_flags".to_string(),
            ciborium::Value::Integer(8_i64.into()),
        );

        let meta = GlobalMetadata::default();

        // Encode once to obtain real szip payload + the descriptor with
        // `szip_block_offsets` populated by the encoder.
        let msg = encode(&meta, &[(&desc, &raw)], &EncodeOptions::default()).unwrap();
        let dec = crate::framing::decode_message(&msg).unwrap();
        assert_eq!(dec.objects.len(), 1);
        let (extracted_desc, payload_slice, _mask, _off) = &dec.objects[0];
        let extracted_desc = extracted_desc.clone();
        let payload = payload_slice.to_vec();
        assert!(
            extracted_desc.params.contains_key("szip_block_offsets"),
            "encoder should have populated szip_block_offsets"
        );

        let mut enc = AsyncStreamingEncoder::new(Vec::new(), &meta, &EncodeOptions::default())
            .await
            .unwrap();
        enc.write_object_pre_encoded(&extracted_desc, &payload)
            .await
            .unwrap();
        let result = enc.finish().await.unwrap();

        let (_, objects) = decode(&result, &DecodeOptions::default()).unwrap();
        assert_eq!(objects.len(), 1);
        assert!(objects[0].0.params.contains_key("szip_block_offsets"));
    }

    /// `compose_payload_region` / `substitute_and_mask` error propagation:
    /// a non-finite payload without an allow flag must surface as an error
    /// from `write_object` (covers the `?` on the mask/compose helpers).
    #[tokio::test]
    async fn write_object_rejects_non_finite_without_allow() {
        let meta = GlobalMetadata::default();
        let desc = make_descriptor(vec![4], Dtype::Float32);
        // NaN at index 0, default options forbid NaN without a mask method.
        let mut data = vec![0u8; 16];
        data[..4].copy_from_slice(&f32::NAN.to_ne_bytes());

        let mut enc = AsyncStreamingEncoder::new(Vec::new(), &meta, &EncodeOptions::default())
            .await
            .unwrap();
        let err = enc.write_object(&desc, &data).await;
        assert!(err.is_err(), "non-finite data must be rejected");
    }
}