tensogram_wasm/encoder.rs
1// (C) Copyright 2026- ECMWF and individual contributors.
2//
3// This software is licensed under the terms of the Apache Licence Version 2.0
4// which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
5// In applying this licence, ECMWF does not waive the privileges and immunities
6// granted to it by virtue of its status as an intergovernmental organisation nor
7// does it submit to any jurisdiction.
8
9//! Frame-at-a-time streaming encoder for JavaScript callers.
10//!
11//! Two operating modes, both driven by the Rust-core
12//! `StreamingEncoder<W: Write>` generic:
13//!
14//! - **Buffered (default).** The sink is an in-memory `Vec<u8>`. Every
15//! `write_object` / `write_preceder` appends to the buffer; `finish()`
16//! returns the complete wire-format message as a `Uint8Array`.
17//! Matches the Python `StreamingEncoder` model.
18//!
19//! - **Streaming.** The sink is a [`JsCallbackWriter`] that forwards
20//! every chunk of bytes the core encoder emits to a caller-provided
21//! `(chunk: Uint8Array) => void` JS callback. No full-message
22//! buffering — the callback is invoked during construction (preamble +
23//! header frames), during each `write_object` (one data-object frame's
24//! bytes), and during `finish()` (footer frames + postamble).
25//! `finish()` still returns a `Uint8Array`, but in streaming mode it
26//! is empty because every byte has already gone through the callback.
27//!
28//! The TypeScript wrapper selects the mode via
29//! `StreamingEncoderOptions.onBytes`; the mode is fixed at construction
30//! time and cannot be switched.
31//!
32//! ```js
33//! // Buffered (default):
34//! const enc = new StreamingEncoder({ version: 3 });
35//! enc.writeObject(descriptor, new Float32Array([1, 2, 3]));
36//! const bytes = enc.finish(); // full wire-format message
37//!
//! // Streaming:
//! const chunks = [];
//! const streamingEnc = new StreamingEncoder({ version: 3 }, {
//!     onBytes: (chunk) => chunks.push(new Uint8Array(chunk)),
//! });
//! streamingEnc.writeObject(descriptor, new Float32Array([1, 2, 3]));
//! streamingEnc.finish(); // returns an empty Uint8Array; the callback has received every chunk
//! const streamed = Uint8Array.from(chunks.flatMap((c) => Array.from(c)));
46//! ```
47
48use crate::convert::*;
49use std::collections::BTreeMap;
50use std::io::Write;
51use tensogram::{self as core, TensogramError};
52use wasm_bindgen::prelude::*;
53
54// ── Sinks ───────────────────────────────────────────────────────────────────
55
56type BufferedEncoder = core::streaming::StreamingEncoder<Vec<u8>>;
57type StreamingCoreEncoder = core::streaming::StreamingEncoder<JsCallbackWriter>;
58
59/// Internal sink selection — the mode is fixed at construction.
60enum Inner {
61 Buffered(BufferedEncoder),
62 Streaming(StreamingCoreEncoder),
63}
64
65impl Inner {
66 fn write_preceder(
67 &mut self,
68 map: BTreeMap<String, ciborium::Value>,
69 ) -> Result<(), TensogramError> {
70 match self {
71 Inner::Buffered(e) => e.write_preceder(map),
72 Inner::Streaming(e) => e.write_preceder(map),
73 }
74 }
75
76 fn write_object(
77 &mut self,
78 desc: &core::DataObjectDescriptor,
79 bytes: &[u8],
80 ) -> Result<(), TensogramError> {
81 match self {
82 Inner::Buffered(e) => e.write_object(desc, bytes),
83 Inner::Streaming(e) => e.write_object(desc, bytes),
84 }
85 }
86
87 fn write_object_pre_encoded(
88 &mut self,
89 desc: &core::DataObjectDescriptor,
90 bytes: &[u8],
91 ) -> Result<(), TensogramError> {
92 match self {
93 Inner::Buffered(e) => e.write_object_pre_encoded(desc, bytes),
94 Inner::Streaming(e) => e.write_object_pre_encoded(desc, bytes),
95 }
96 }
97
98 fn object_count(&self) -> usize {
99 match self {
100 Inner::Buffered(e) => e.object_count(),
101 Inner::Streaming(e) => e.object_count(),
102 }
103 }
104
105 fn bytes_written(&self) -> u64 {
106 match self {
107 Inner::Buffered(e) => e.bytes_written(),
108 Inner::Streaming(e) => e.bytes_written(),
109 }
110 }
111}
112
113/// `std::io::Write` sink that forwards every chunk of bytes to a
114/// synchronous JavaScript callback.
115///
116/// The callback must be synchronous — any `Promise` it returns is
117/// silently discarded because the Rust `Write::write` contract is
118/// synchronous. The TS wrapper documents this contract; callers that
119/// need async work (e.g. `fetch` upload) should either buffer
120/// internally first or use the buffered mode with a single `fetch`
121/// call.
122///
123/// Errors thrown by the callback surface as
124/// `std::io::Error::other(...)`, which the core encoder then wraps as
125/// `TensogramError::Io` — the TypeScript wrapper routes this to
126/// `IoError` via the standard `mapTensogramError` path.
127struct JsCallbackWriter {
128 callback: js_sys::Function,
129}
130
131impl JsCallbackWriter {
132 fn new(callback: js_sys::Function) -> Self {
133 Self { callback }
134 }
135}
136
137impl Write for JsCallbackWriter {
138 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
139 // `Uint8Array::from(&[u8])` copies into JS-heap memory — the
140 // callback receives a fresh, JS-owned view each time, so the
141 // caller can hold on to it past the Rust side of this call.
142 let chunk = js_sys::Uint8Array::from(buf);
143 let this = JsValue::NULL;
144 match self.callback.call1(&this, &chunk) {
145 Ok(_) => Ok(buf.len()),
146 Err(js_err) => {
147 let message = js_err.as_string().unwrap_or_else(|| format!("{js_err:?}"));
148 Err(std::io::Error::other(format!(
149 "streaming sink callback failed: {message}"
150 )))
151 }
152 }
153 }
154
155 fn flush(&mut self) -> std::io::Result<()> {
156 Ok(())
157 }
158}
159
160// ── Exported class ──────────────────────────────────────────────────────────
161
162/// Streaming encoder with a selectable sink: in-memory buffer (default)
163/// or caller-supplied JS callback.
164///
165/// Lifecycle:
166/// 1. `new(meta, hash?, on_bytes?)` — writes preamble + header metadata
167/// frame. In buffered mode these bytes accumulate internally; in
168/// streaming mode they flow to `on_bytes` immediately.
169/// 2. Zero or more `write_preceder(meta)` / `write_object(desc, data)` /
170/// `write_object_pre_encoded(desc, data)` calls.
171/// 3. `finish()` writes the footer + postamble. In buffered mode
172/// returns the complete `Uint8Array`; in streaming mode returns an
173/// empty `Uint8Array` (the callback has seen every byte).
174///
175/// After `finish()` the encoder is closed — every further method call
176/// throws "already finished". Callers must still invoke the
177/// wasm-bindgen `free()` method when done with the handle.
178#[wasm_bindgen]
179pub struct StreamingEncoder {
180 /// `None` once `finish()` has been called — every mutator checks for
181 /// this and raises a clean "already finished" error.
182 inner: Option<Inner>,
183}
184
185#[wasm_bindgen]
186impl StreamingEncoder {
187 /// Begin a new streaming message.
188 ///
189 /// @param metadata_js - GlobalMetadata (JS object). Must contain
190 /// `version`; `base` may carry per-object entries; `_reserved_` is
191 /// rejected (library-managed).
192 /// @param hash - When `true` (default), xxh3 hashes are computed
193 /// per object and stored in the footer hash frame. When `false`,
194 /// hashing is disabled entirely.
195 /// @param on_bytes - Optional synchronous callback invoked with
196 /// each chunk of wire-format bytes the encoder produces. When
197 /// present, no internal buffering is performed and `finish()`
198 /// returns an empty `Uint8Array`. When absent, the encoder
199 /// buffers to an internal `Vec<u8>` and `finish()` returns the
200 /// complete message.
201 #[wasm_bindgen(constructor)]
202 #[allow(clippy::too_many_arguments)]
203 pub fn new(
204 metadata_js: JsValue,
205 hash: Option<bool>,
206 on_bytes: Option<js_sys::Function>,
207 allow_nan: Option<bool>,
208 allow_inf: Option<bool>,
209 nan_mask_method: Option<String>,
210 pos_inf_mask_method: Option<String>,
211 neg_inf_mask_method: Option<String>,
212 small_mask_threshold_bytes: Option<usize>,
213 ) -> Result<StreamingEncoder, JsValue> {
214 let metadata = metadata_from_js(&metadata_js)?;
215 let options = build_encode_options_full(
216 hash,
217 allow_nan,
218 allow_inf,
219 nan_mask_method.as_deref(),
220 pos_inf_mask_method.as_deref(),
221 neg_inf_mask_method.as_deref(),
222 small_mask_threshold_bytes,
223 )?;
224 let inner = match on_bytes {
225 Some(cb) => {
226 let sink = JsCallbackWriter::new(cb);
227 Inner::Streaming(
228 StreamingCoreEncoder::new(sink, &metadata, &options).map_err(js_err)?,
229 )
230 }
231 None => Inner::Buffered(
232 BufferedEncoder::new(Vec::new(), &metadata, &options).map_err(js_err)?,
233 ),
234 };
235 Ok(StreamingEncoder { inner: Some(inner) })
236 }
237
238 /// Write a PrecederMetadata frame for the next data object.
239 ///
240 /// The provided object is merged into a GlobalMetadata with a
241 /// single-entry `base` array. Must be followed by exactly one
242 /// `write_object` / `write_object_pre_encoded` call before another
243 /// `write_preceder` or `finish`.
244 pub fn write_preceder(&mut self, metadata_js: JsValue) -> Result<(), JsValue> {
245 let inner = self.inner.as_mut().ok_or_else(already_finished)?;
246 let map: BTreeMap<String, ciborium::Value> =
247 serde_wasm_bindgen::from_value(metadata_js).map_err(js_err_display)?;
248 inner.write_preceder(map).map_err(js_err)
249 }
250
251 /// Encode and write a single data object.
252 ///
253 /// @param descriptor_js - `DataObjectDescriptor` as a plain JS object.
254 /// @param data - Raw native-endian payload as any TypedArray.
255 pub fn write_object(&mut self, descriptor_js: JsValue, data: JsValue) -> Result<(), JsValue> {
256 self.write_with(descriptor_js, data, |inner, desc, bytes| {
257 inner.write_object(desc, bytes)
258 })
259 }
260
261 /// Write a pre-encoded data object directly (no pipeline).
262 ///
263 /// `data` must already be encoded according to the descriptor's
264 /// `encoding` / `filter` / `compression`. The library does not run
265 /// the pipeline — it validates descriptor structure and the szip
266 /// block offsets (if any) and writes bytes verbatim. The hash is
267 /// recomputed from these bytes if the encoder was constructed with
268 /// `hash: true`.
269 pub fn write_object_pre_encoded(
270 &mut self,
271 descriptor_js: JsValue,
272 data: JsValue,
273 ) -> Result<(), JsValue> {
274 self.write_with(descriptor_js, data, |inner, desc, bytes| {
275 inner.write_object_pre_encoded(desc, bytes)
276 })
277 }
278
279 /// Number of data objects written so far. Zero after `new()`,
280 /// increments on every successful `write_object` /
281 /// `write_object_pre_encoded`.
282 pub fn object_count(&self) -> Result<usize, JsValue> {
283 Ok(self
284 .inner
285 .as_ref()
286 .ok_or_else(already_finished)?
287 .object_count())
288 }
289
290 /// Total bytes produced so far (preamble + header frames + all
291 /// completed data-object frames). In buffered mode this equals
292 /// the length of the internal buffer; in streaming mode it equals
293 /// the sum of byte-lengths passed to the callback.
294 ///
295 /// Returned as `f64` because JS numbers are the lingua-franca for
296 /// sizes on the wire boundary. `Number.MAX_SAFE_INTEGER` ≈ 9 PiB,
297 /// which is well beyond any realistic Tensogram message.
298 pub fn bytes_written(&self) -> Result<f64, JsValue> {
299 Ok(self
300 .inner
301 .as_ref()
302 .ok_or_else(already_finished)?
303 .bytes_written() as f64)
304 }
305
306 /// Finalise the encoder, writing footer frames + postamble.
307 ///
308 /// In buffered mode returns the complete wire-format `Uint8Array`.
309 /// In streaming mode the footer bytes flow through the callback
310 /// and the return value is an empty `Uint8Array` (zero-length
311 /// marker, not a failure — every byte has already been delivered).
312 ///
313 /// After this call the encoder is closed — any further method call
314 /// throws "already finished". Callers must still invoke the
315 /// wasm-bindgen `free()` method when done with the handle.
316 pub fn finish(&mut self) -> Result<js_sys::Uint8Array, JsValue> {
317 let inner = self.inner.take().ok_or_else(already_finished)?;
318 match inner {
319 Inner::Buffered(e) => {
320 let buf = e.finish().map_err(js_err)?;
321 Ok(js_sys::Uint8Array::from(buf.as_slice()))
322 }
323 Inner::Streaming(e) => {
324 // The core returns the sink (JsCallbackWriter); we
325 // discard it — every byte has already gone to the JS
326 // callback. A zero-length `Uint8Array` keeps the
327 // return type stable across both modes.
328 let _sink = e.finish().map_err(js_err)?;
329 Ok(js_sys::Uint8Array::new_with_length(0))
330 }
331 }
332 }
333}
334
335impl StreamingEncoder {
336 /// Shared dispatch for `write_object` / `write_object_pre_encoded`:
337 /// deserialise the descriptor, extract raw bytes from any
338 /// TypedArray, then hand off to the supplied core-level writer.
339 fn write_with(
340 &mut self,
341 descriptor_js: JsValue,
342 data: JsValue,
343 core_fn: impl FnOnce(
344 &mut Inner,
345 &core::DataObjectDescriptor,
346 &[u8],
347 ) -> Result<(), TensogramError>,
348 ) -> Result<(), JsValue> {
349 let inner = self.inner.as_mut().ok_or_else(already_finished)?;
350 let desc: core::DataObjectDescriptor =
351 serde_wasm_bindgen::from_value(descriptor_js).map_err(js_err_display)?;
352 let bytes = typed_array_or_u8_to_bytes(&data)
353 .ok_or_else(|| JsValue::from(js_sys::Error::new("data must be a TypedArray or Uint8Array")))?;
354 core_fn(inner, &desc, &bytes).map_err(js_err)
355 }
356}
357
358fn already_finished() -> JsValue {
359 js_sys::Error::new("StreamingEncoder already finished").into()
360}