Skip to main content

tensogram_wasm/
encoder.rs

1// (C) Copyright 2026- ECMWF and individual contributors.
2//
3// This software is licensed under the terms of the Apache Licence Version 2.0
4// which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
5// In applying this licence, ECMWF does not waive the privileges and immunities
6// granted to it by virtue of its status as an intergovernmental organisation nor
7// does it submit to any jurisdiction.
8
9//! Frame-at-a-time streaming encoder for JavaScript callers.
10//!
11//! Two operating modes, both driven by the Rust-core
12//! `StreamingEncoder<W: Write>` generic:
13//!
14//! - **Buffered (default).**  The sink is an in-memory `Vec<u8>`.  Every
15//!   `write_object` / `write_preceder` appends to the buffer; `finish()`
16//!   returns the complete wire-format message as a `Uint8Array`.
17//!   Matches the Python `StreamingEncoder` model.
18//!
19//! - **Streaming.**  The sink is a [`JsCallbackWriter`] that forwards
20//!   every chunk of bytes the core encoder emits to a caller-provided
21//!   `(chunk: Uint8Array) => void` JS callback.  No full-message
22//!   buffering — the callback is invoked during construction (preamble +
23//!   header frames), during each `write_object` (one data-object frame's
24//!   bytes), and during `finish()` (footer frames + postamble).
25//!   `finish()` still returns a `Uint8Array`, but in streaming mode it
26//!   is empty because every byte has already gone through the callback.
27//!
28//! The TypeScript wrapper selects the mode via
29//! `StreamingEncoderOptions.onBytes`; the mode is fixed at construction
30//! time and cannot be switched.
31//!
32//! ```js
33//! // Buffered (default):
34//! const enc = new StreamingEncoder({ version: 3 });
35//! enc.writeObject(descriptor, new Float32Array([1, 2, 3]));
36//! const bytes = enc.finish();          // full wire-format message
37//!
38//! // Streaming:
39//! const chunks = [];
40//! const enc = new StreamingEncoder({ version: 3 }, {
41//!   onBytes: (chunk) => chunks.push(new Uint8Array(chunk)),
42//! });
43//! enc.writeObject(descriptor, new Float32Array([1, 2, 3]));
44//! enc.finish();                        // callback has received every chunk
45//! const bytes = Uint8Array.from(chunks.flatMap((c) => Array.from(c)));
46//! ```
47
48use crate::convert::*;
49use std::collections::BTreeMap;
50use std::io::Write;
51use tensogram::{self as core, TensogramError};
52use wasm_bindgen::prelude::*;
53
54// ── Sinks ───────────────────────────────────────────────────────────────────
55
56type BufferedEncoder = core::streaming::StreamingEncoder<Vec<u8>>;
57type StreamingCoreEncoder = core::streaming::StreamingEncoder<JsCallbackWriter>;
58
59/// Internal sink selection — the mode is fixed at construction.
60enum Inner {
61    Buffered(BufferedEncoder),
62    Streaming(StreamingCoreEncoder),
63}
64
65impl Inner {
66    fn write_preceder(
67        &mut self,
68        map: BTreeMap<String, ciborium::Value>,
69    ) -> Result<(), TensogramError> {
70        match self {
71            Inner::Buffered(e) => e.write_preceder(map),
72            Inner::Streaming(e) => e.write_preceder(map),
73        }
74    }
75
76    fn write_object(
77        &mut self,
78        desc: &core::DataObjectDescriptor,
79        bytes: &[u8],
80    ) -> Result<(), TensogramError> {
81        match self {
82            Inner::Buffered(e) => e.write_object(desc, bytes),
83            Inner::Streaming(e) => e.write_object(desc, bytes),
84        }
85    }
86
87    fn write_object_pre_encoded(
88        &mut self,
89        desc: &core::DataObjectDescriptor,
90        bytes: &[u8],
91    ) -> Result<(), TensogramError> {
92        match self {
93            Inner::Buffered(e) => e.write_object_pre_encoded(desc, bytes),
94            Inner::Streaming(e) => e.write_object_pre_encoded(desc, bytes),
95        }
96    }
97
98    fn object_count(&self) -> usize {
99        match self {
100            Inner::Buffered(e) => e.object_count(),
101            Inner::Streaming(e) => e.object_count(),
102        }
103    }
104
105    fn bytes_written(&self) -> u64 {
106        match self {
107            Inner::Buffered(e) => e.bytes_written(),
108            Inner::Streaming(e) => e.bytes_written(),
109        }
110    }
111}
112
113/// `std::io::Write` sink that forwards every chunk of bytes to a
114/// synchronous JavaScript callback.
115///
116/// The callback must be synchronous — any `Promise` it returns is
117/// silently discarded because the Rust `Write::write` contract is
118/// synchronous.  The TS wrapper documents this contract; callers that
119/// need async work (e.g. `fetch` upload) should either buffer
120/// internally first or use the buffered mode with a single `fetch`
121/// call.
122///
123/// Errors thrown by the callback surface as
124/// `std::io::Error::other(...)`, which the core encoder then wraps as
125/// `TensogramError::Io` — the TypeScript wrapper routes this to
126/// `IoError` via the standard `mapTensogramError` path.
127struct JsCallbackWriter {
128    callback: js_sys::Function,
129}
130
131impl JsCallbackWriter {
132    fn new(callback: js_sys::Function) -> Self {
133        Self { callback }
134    }
135}
136
137impl Write for JsCallbackWriter {
138    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
139        // `Uint8Array::from(&[u8])` copies into JS-heap memory — the
140        // callback receives a fresh, JS-owned view each time, so the
141        // caller can hold on to it past the Rust side of this call.
142        let chunk = js_sys::Uint8Array::from(buf);
143        let this = JsValue::NULL;
144        match self.callback.call1(&this, &chunk) {
145            Ok(_) => Ok(buf.len()),
146            Err(js_err) => {
147                let message = js_err.as_string().unwrap_or_else(|| format!("{js_err:?}"));
148                Err(std::io::Error::other(format!(
149                    "streaming sink callback failed: {message}"
150                )))
151            }
152        }
153    }
154
155    fn flush(&mut self) -> std::io::Result<()> {
156        Ok(())
157    }
158}
159
160// ── Exported class ──────────────────────────────────────────────────────────
161
162/// Streaming encoder with a selectable sink: in-memory buffer (default)
163/// or caller-supplied JS callback.
164///
165/// Lifecycle:
166/// 1. `new(meta, hash?, on_bytes?)` — writes preamble + header metadata
167///    frame.  In buffered mode these bytes accumulate internally; in
168///    streaming mode they flow to `on_bytes` immediately.
169/// 2. Zero or more `write_preceder(meta)` / `write_object(desc, data)` /
170///    `write_object_pre_encoded(desc, data)` calls.
171/// 3. `finish()` writes the footer + postamble.  In buffered mode
172///    returns the complete `Uint8Array`; in streaming mode returns an
173///    empty `Uint8Array` (the callback has seen every byte).
174///
175/// After `finish()` the encoder is closed — every further method call
176/// throws "already finished".  Callers must still invoke the
177/// wasm-bindgen `free()` method when done with the handle.
178#[wasm_bindgen]
179pub struct StreamingEncoder {
180    /// `None` once `finish()` has been called — every mutator checks for
181    /// this and raises a clean "already finished" error.
182    inner: Option<Inner>,
183}
184
185#[wasm_bindgen]
186impl StreamingEncoder {
187    /// Begin a new streaming message.
188    ///
189    /// @param metadata_js - GlobalMetadata (JS object).  Must contain
190    ///   `version`; `base` may carry per-object entries; `_reserved_` is
191    ///   rejected (library-managed).
192    /// @param hash - When `true` (default), xxh3 hashes are computed
193    ///   per object and stored in the footer hash frame.  When `false`,
194    ///   hashing is disabled entirely.
195    /// @param on_bytes - Optional synchronous callback invoked with
196    ///   each chunk of wire-format bytes the encoder produces.  When
197    ///   present, no internal buffering is performed and `finish()`
198    ///   returns an empty `Uint8Array`.  When absent, the encoder
199    ///   buffers to an internal `Vec<u8>` and `finish()` returns the
200    ///   complete message.
201    #[wasm_bindgen(constructor)]
202    #[allow(clippy::too_many_arguments)]
203    pub fn new(
204        metadata_js: JsValue,
205        hash: Option<bool>,
206        on_bytes: Option<js_sys::Function>,
207        allow_nan: Option<bool>,
208        allow_inf: Option<bool>,
209        nan_mask_method: Option<String>,
210        pos_inf_mask_method: Option<String>,
211        neg_inf_mask_method: Option<String>,
212        small_mask_threshold_bytes: Option<usize>,
213    ) -> Result<StreamingEncoder, JsValue> {
214        let metadata = metadata_from_js(&metadata_js)?;
215        let options = build_encode_options_full(
216            hash,
217            allow_nan,
218            allow_inf,
219            nan_mask_method.as_deref(),
220            pos_inf_mask_method.as_deref(),
221            neg_inf_mask_method.as_deref(),
222            small_mask_threshold_bytes,
223        )?;
224        let inner = match on_bytes {
225            Some(cb) => {
226                let sink = JsCallbackWriter::new(cb);
227                Inner::Streaming(
228                    StreamingCoreEncoder::new(sink, &metadata, &options).map_err(js_err)?,
229                )
230            }
231            None => Inner::Buffered(
232                BufferedEncoder::new(Vec::new(), &metadata, &options).map_err(js_err)?,
233            ),
234        };
235        Ok(StreamingEncoder { inner: Some(inner) })
236    }
237
238    /// Write a PrecederMetadata frame for the next data object.
239    ///
240    /// The provided object is merged into a GlobalMetadata with a
241    /// single-entry `base` array.  Must be followed by exactly one
242    /// `write_object` / `write_object_pre_encoded` call before another
243    /// `write_preceder` or `finish`.
244    pub fn write_preceder(&mut self, metadata_js: JsValue) -> Result<(), JsValue> {
245        let inner = self.inner.as_mut().ok_or_else(already_finished)?;
246        let map: BTreeMap<String, ciborium::Value> =
247            serde_wasm_bindgen::from_value(metadata_js).map_err(js_err_display)?;
248        inner.write_preceder(map).map_err(js_err)
249    }
250
251    /// Encode and write a single data object.
252    ///
253    /// @param descriptor_js - `DataObjectDescriptor` as a plain JS object.
254    /// @param data - Raw native-endian payload as any TypedArray.
255    pub fn write_object(&mut self, descriptor_js: JsValue, data: JsValue) -> Result<(), JsValue> {
256        self.write_with(descriptor_js, data, |inner, desc, bytes| {
257            inner.write_object(desc, bytes)
258        })
259    }
260
261    /// Write a pre-encoded data object directly (no pipeline).
262    ///
263    /// `data` must already be encoded according to the descriptor's
264    /// `encoding` / `filter` / `compression`.  The library does not run
265    /// the pipeline — it validates descriptor structure and the szip
266    /// block offsets (if any) and writes bytes verbatim.  The hash is
267    /// recomputed from these bytes if the encoder was constructed with
268    /// `hash: true`.
269    pub fn write_object_pre_encoded(
270        &mut self,
271        descriptor_js: JsValue,
272        data: JsValue,
273    ) -> Result<(), JsValue> {
274        self.write_with(descriptor_js, data, |inner, desc, bytes| {
275            inner.write_object_pre_encoded(desc, bytes)
276        })
277    }
278
279    /// Number of data objects written so far.  Zero after `new()`,
280    /// increments on every successful `write_object` /
281    /// `write_object_pre_encoded`.
282    pub fn object_count(&self) -> Result<usize, JsValue> {
283        Ok(self
284            .inner
285            .as_ref()
286            .ok_or_else(already_finished)?
287            .object_count())
288    }
289
290    /// Total bytes produced so far (preamble + header frames + all
291    /// completed data-object frames).  In buffered mode this equals
292    /// the length of the internal buffer; in streaming mode it equals
293    /// the sum of byte-lengths passed to the callback.
294    ///
295    /// Returned as `f64` because JS numbers are the lingua-franca for
296    /// sizes on the wire boundary.  `Number.MAX_SAFE_INTEGER` ≈ 9 PiB,
297    /// which is well beyond any realistic Tensogram message.
298    pub fn bytes_written(&self) -> Result<f64, JsValue> {
299        Ok(self
300            .inner
301            .as_ref()
302            .ok_or_else(already_finished)?
303            .bytes_written() as f64)
304    }
305
306    /// Finalise the encoder, writing footer frames + postamble.
307    ///
308    /// In buffered mode returns the complete wire-format `Uint8Array`.
309    /// In streaming mode the footer bytes flow through the callback
310    /// and the return value is an empty `Uint8Array` (zero-length
311    /// marker, not a failure — every byte has already been delivered).
312    ///
313    /// After this call the encoder is closed — any further method call
314    /// throws "already finished".  Callers must still invoke the
315    /// wasm-bindgen `free()` method when done with the handle.
316    pub fn finish(&mut self) -> Result<js_sys::Uint8Array, JsValue> {
317        let inner = self.inner.take().ok_or_else(already_finished)?;
318        match inner {
319            Inner::Buffered(e) => {
320                let buf = e.finish().map_err(js_err)?;
321                Ok(js_sys::Uint8Array::from(buf.as_slice()))
322            }
323            Inner::Streaming(e) => {
324                // The core returns the sink (JsCallbackWriter); we
325                // discard it — every byte has already gone to the JS
326                // callback.  A zero-length `Uint8Array` keeps the
327                // return type stable across both modes.
328                let _sink = e.finish().map_err(js_err)?;
329                Ok(js_sys::Uint8Array::new_with_length(0))
330            }
331        }
332    }
333}
334
335impl StreamingEncoder {
336    /// Shared dispatch for `write_object` / `write_object_pre_encoded`:
337    /// deserialise the descriptor, extract raw bytes from any
338    /// TypedArray, then hand off to the supplied core-level writer.
339    fn write_with(
340        &mut self,
341        descriptor_js: JsValue,
342        data: JsValue,
343        core_fn: impl FnOnce(
344            &mut Inner,
345            &core::DataObjectDescriptor,
346            &[u8],
347        ) -> Result<(), TensogramError>,
348    ) -> Result<(), JsValue> {
349        let inner = self.inner.as_mut().ok_or_else(already_finished)?;
350        let desc: core::DataObjectDescriptor =
351            serde_wasm_bindgen::from_value(descriptor_js).map_err(js_err_display)?;
352        let bytes = typed_array_or_u8_to_bytes(&data)
353            .ok_or_else(|| JsValue::from(js_sys::Error::new("data must be a TypedArray or Uint8Array")))?;
354        core_fn(inner, &desc, &bytes).map_err(js_err)
355    }
356}
357
358fn already_finished() -> JsValue {
359    js_sys::Error::new("StreamingEncoder already finished").into()
360}