Skip to main content

tensogram_wasm/
streaming.rs

1// (C) Copyright 2026- ECMWF and individual contributors.
2//
3// This software is licensed under the terms of the Apache Licence Version 2.0
4// which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
5// In applying this licence, ECMWF does not waive the privileges and immunities
6// granted to it by virtue of its status as an intergovernmental organisation nor
7// does it submit to any jurisdiction.
8
9//! Frame-by-frame streaming decoder for progressive chunk feeding.
10//!
11//! Accumulates bytes from a stream and decodes complete messages as
12//! they arrive. Each decoded data object is emitted as a `DecodedFrame`
13//! that the JS caller can pull via `next_frame()`.
14//!
15//! ```js
16//! const decoder = new tensogram.StreamingDecoder();
17//! const reader = response.body.getReader();
18//! while (true) {
19//!   const { done, value } = await reader.read();
20//!   if (done) break;
21//!   decoder.feed(value);
22//!   let frame;
23//!   while ((frame = decoder.next_frame())) {
24//!     const data = frame.data_f32();
25//!     renderToCanvas(frame.descriptor().shape, data);
26//!     frame.free();
27//!   }
28//! }
29//! decoder.free();
30//! ```
31
32use crate::convert::*;
33use tensogram as core;
34use wasm_bindgen::prelude::*;
35
/// Upper bound on buffered-but-undecoded bytes used by default: 256 MiB.
///
/// Keeps memory from growing without limit when the stream carries garbage
/// or a message header that never completes.
const DEFAULT_MAX_BUFFER: usize = 256 << 20;
40
41/// Frame-by-frame streaming decoder.
42///
43/// Accumulates bytes from progressive feeding and emits decoded data
44/// objects as complete messages arrive.
45///
46/// **Error visibility**: If a scanned message fails to decode (corrupt
47/// payload), the error is captured in `last_error()` and the decoder
48/// advances past the bad message.  Call `last_error()` after each
49/// `feed()` to check for skipped messages.
50///
51/// **Memory limit**: The internal buffer is capped at 256 MiB by
52/// default.  Call `set_max_buffer(n)` to change it.  Exceeding the
53/// limit makes `feed()` return a thrown `JsValue` (a `js_sys::Error`).
54#[wasm_bindgen]
55pub struct StreamingDecoder {
56    buffer: Vec<u8>,
57    /// Byte offset of the next unprocessed region.
58    consumed: usize,
59    /// Global metadata from the most recently decoded message.
60    global_metadata: Option<core::GlobalMetadata>,
61    /// Queue of decoded frames ready for JS to consume.
62    ready_frames: std::collections::VecDeque<DecodedFrame>,
63    /// Last decode error (if a scanned message failed to decode).
64    last_decode_error: Option<String>,
65    /// Count of messages that were scanned but failed to decode.
66    skipped_messages: usize,
67    /// Maximum buffer size in bytes.
68    max_buffer: usize,
69}
70
71impl Default for StreamingDecoder {
72    fn default() -> Self {
73        Self::new()
74    }
75}
76
77#[wasm_bindgen]
78impl StreamingDecoder {
79    /// Create a new streaming decoder.
80    #[wasm_bindgen(constructor)]
81    pub fn new() -> Self {
82        Self {
83            buffer: Vec::new(),
84            consumed: 0,
85            global_metadata: None,
86            ready_frames: std::collections::VecDeque::new(),
87            last_decode_error: None,
88            skipped_messages: 0,
89            max_buffer: DEFAULT_MAX_BUFFER,
90        }
91    }
92
93    /// Feed a chunk of bytes into the decoder.
94    ///
95    /// Internally scans for complete messages and decodes each one,
96    /// emitting individual data objects as `DecodedFrame`s.
97    ///
98    /// Returns an error if the internal buffer exceeds `max_buffer` bytes.
99    /// Check `last_error()` after feeding to detect skipped corrupt messages.
100    pub fn feed(&mut self, chunk: &[u8]) -> Result<(), JsValue> {
101        let new_size = (self.buffer.len() - self.consumed)
102            .checked_add(chunk.len())
103            .ok_or_else(|| JsValue::from(js_sys::Error::new("buffer size overflow")))?;
104        if new_size > self.max_buffer {
105            return Err(JsValue::from(js_sys::Error::new(&format!(
106                "streaming buffer would grow to {} bytes (limit {})",
107                new_size, self.max_buffer
108            ))));
109        }
110        // Compact before extending so the actual Vec length (and WASM memory)
111        // stays close to the logical limit instead of growing by `consumed`.
112        if self.consumed > 0 {
113            self.buffer.drain(..self.consumed);
114            self.consumed = 0;
115        }
116        self.buffer.extend_from_slice(chunk);
117        self.last_decode_error = None; // clear previous error
118        self.try_decode_messages();
119        Ok(())
120    }
121
122    /// Pull the next decoded data object frame, or `undefined` if none ready.
123    ///
124    /// In JavaScript, `wasm-bindgen` maps Rust `None` to `undefined`.
125    /// Use a truthiness check: `while ((frame = decoder.next_frame()))`.
126    pub fn next_frame(&mut self) -> Option<DecodedFrame> {
127        self.ready_frames.pop_front()
128    }
129
130    /// Whether global metadata has been received from at least one message.
131    pub fn has_metadata(&self) -> bool {
132        self.global_metadata.is_some()
133    }
134
135    /// Get the global metadata from the most recently decoded message.
136    pub fn metadata(&self) -> Result<JsValue, JsValue> {
137        match &self.global_metadata {
138            // Use `metadata_to_js` so `version` is synthesised from
139            // the preamble for TypeScript ergonomics — the CBOR
140            // metadata frame no longer carries a `version` key in v3.
141            Some(meta) => crate::convert::metadata_to_js(meta),
142            None => Ok(JsValue::NULL),
143        }
144    }
145
146    /// Number of decoded frames ready to consume.
147    pub fn pending_count(&self) -> usize {
148        self.ready_frames.len()
149    }
150
151    /// Total bytes buffered but not yet decoded.
152    pub fn buffered_bytes(&self) -> usize {
153        self.buffer.len() - self.consumed
154    }
155
156    /// Error message from the last skipped (corrupt) message, or null.
157    ///
158    /// Cleared on each `feed()` call.  If non-null, at least one message
159    /// found by the scanner failed to decode and was skipped.
160    pub fn last_error(&self) -> Option<String> {
161        self.last_decode_error.clone()
162    }
163
164    /// Total number of messages that were skipped due to decode errors
165    /// since the decoder was created or last reset.
166    pub fn skipped_count(&self) -> usize {
167        self.skipped_messages
168    }
169
170    /// Set the maximum internal buffer size in bytes (default: 256 MiB).
171    ///
172    /// The limit applies to the total *unprocessed* bytes (already-buffered
173    /// bytes plus the next incoming chunk).  If adding a new chunk would
174    /// exceed this limit, `feed()` returns an error and the chunk is not
175    /// buffered.  Reducing the limit below the current buffer size takes
176    /// effect on the next `feed()` call.
177    pub fn set_max_buffer(&mut self, max_bytes: usize) {
178        self.max_buffer = max_bytes;
179    }
180
181    /// Reset the decoder, clearing all buffered data and pending frames.
182    pub fn reset(&mut self) {
183        self.buffer.clear();
184        self.consumed = 0;
185        self.global_metadata = None;
186        self.ready_frames.clear();
187        self.last_decode_error = None;
188        self.skipped_messages = 0;
189    }
190}
191
192impl StreamingDecoder {
193    fn try_decode_messages(&mut self) {
194        debug_assert!(
195            self.consumed <= self.buffer.len(),
196            "consumed ({}) > buffer.len() ({})",
197            self.consumed,
198            self.buffer.len()
199        );
200
201        let remaining = &self.buffer[self.consumed..];
202        if remaining.is_empty() {
203            return;
204        }
205
206        // Scan once for ALL complete messages in the remaining buffer.
207        // This is O(n) instead of re-scanning after each decoded message.
208        let positions = core::scan(remaining);
209
210        let options = core::DecodeOptions {
211            ..Default::default()
212        };
213
214        // Track the furthest byte position we've successfully processed.
215        // `msg_end` values are relative to `remaining` (= buffer[consumed..]),
216        // so we record the max end seen and advance `consumed` by that amount
217        // once at the end.
218        let mut furthest = 0usize;
219
220        for (msg_start, msg_len) in positions {
221            let msg_end = msg_start + msg_len;
222
223            if msg_end > remaining.len() {
224                break; // Incomplete trailing message — wait for more data
225            }
226
227            let msg_bytes = &remaining[msg_start..msg_end];
228
229            match core::decode(msg_bytes, &options) {
230                Ok((metadata, objects)) => {
231                    let base_entries = &metadata.base;
232
233                    for (i, (descriptor, data)) in objects.into_iter().enumerate() {
234                        let base_entry = base_entries.get(i).cloned();
235                        self.ready_frames.push_back(DecodedFrame {
236                            descriptor,
237                            data,
238                            base_entry,
239                        });
240                    }
241
242                    self.global_metadata = Some(metadata);
243                }
244                Err(e) => {
245                    // Record the error so JS callers can inspect it.
246                    // We still advance past the bad message to avoid an
247                    // infinite re-scan loop.
248                    self.last_decode_error = Some(e.to_string());
249                    self.skipped_messages += 1;
250                }
251            }
252
253            furthest = msg_end;
254        }
255
256        self.consumed += furthest;
257    }
258}
259
260// ── DecodedFrame ─────────────────────────────────────────────────────────────
261
262/// A single decoded data object from the streaming decoder.
263///
264/// Owns the decoded payload data.  Use `data_f32()` etc. for zero-copy
265/// TypedArray views.  Call `.free()` when done to release WASM memory.
266#[wasm_bindgen]
267pub struct DecodedFrame {
268    descriptor: core::DataObjectDescriptor,
269    data: Vec<u8>,
270    base_entry: Option<std::collections::BTreeMap<String, ciborium::Value>>,
271}
272
273#[wasm_bindgen]
274impl DecodedFrame {
275    /// Object descriptor (shape, dtype, encoding, etc.) as a JS object.
276    pub fn descriptor(&self) -> Result<JsValue, JsValue> {
277        to_js(&self.descriptor)
278    }
279
280    /// Per-object metadata entry from the base array (if available).
281    pub fn base_entry(&self) -> Result<JsValue, JsValue> {
282        match &self.base_entry {
283            Some(entry) => to_js(entry),
284            None => Ok(JsValue::NULL),
285        }
286    }
287
288    /// Zero-copy Float32Array view into decoded payload.
289    ///
290    /// **Warning**: invalidated if WASM memory grows.
291    pub fn data_f32(&self) -> Result<js_sys::Float32Array, JsValue> {
292        view_as_f32(&self.data)
293    }
294
295    /// Zero-copy Float64Array view.
296    pub fn data_f64(&self) -> Result<js_sys::Float64Array, JsValue> {
297        view_as_f64(&self.data)
298    }
299
300    /// Zero-copy Int32Array view.
301    pub fn data_i32(&self) -> Result<js_sys::Int32Array, JsValue> {
302        view_as_i32(&self.data)
303    }
304
305    /// Zero-copy Uint8Array view.
306    pub fn data_u8(&self) -> Result<js_sys::Uint8Array, JsValue> {
307        Ok(view_as_u8(&self.data))
308    }
309
310    /// Payload byte length.
311    pub fn byte_length(&self) -> usize {
312        self.data.len()
313    }
314}