tensogram_wasm/streaming.rs
1// (C) Copyright 2026- ECMWF and individual contributors.
2//
3// This software is licensed under the terms of the Apache Licence Version 2.0
4// which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
5// In applying this licence, ECMWF does not waive the privileges and immunities
6// granted to it by virtue of its status as an intergovernmental organisation nor
7// does it submit to any jurisdiction.
8
//! Frame-by-frame streaming decoder for progressive chunk feeding.
//!
//! Accumulates bytes from a stream and decodes each message once all of
//! its bytes have arrived. Every data object of a decoded message is
//! emitted as a `DecodedFrame` that the JS caller can pull via
//! `next_frame()`.
//!
//! ```js
//! const decoder = new tensogram.StreamingDecoder();
//! const reader = response.body.getReader();
//! while (true) {
//!   const { done, value } = await reader.read();
//!   if (done) break;
//!   decoder.feed(value);
//!   let frame;
//!   while ((frame = decoder.next_frame())) {
//!     const data = frame.data_f32();
//!     renderToCanvas(frame.descriptor().shape, data);
//!     frame.free();
//!   }
//! }
//! decoder.free();
//! ```
31
32use crate::convert::*;
33use tensogram as core;
34use wasm_bindgen::prelude::*;
35
/// Buffer cap used until the caller overrides it: 256 MiB
/// (1 MiB = `1 << 20` bytes). The cap keeps memory bounded when the
/// stream carries garbage or a message header that never completes.
const DEFAULT_MAX_BUFFER: usize = 256 << 20;
40
41/// Frame-by-frame streaming decoder.
42///
43/// Accumulates bytes from progressive feeding and emits decoded data
44/// objects as complete messages arrive.
45///
46/// **Error visibility**: If a scanned message fails to decode (corrupt
47/// payload), the error is captured in `last_error()` and the decoder
48/// advances past the bad message. Call `last_error()` after each
49/// `feed()` to check for skipped messages.
50///
51/// **Memory limit**: The internal buffer is capped at 256 MiB by
52/// default. Call `set_max_buffer(n)` to change it. Exceeding the
53/// limit makes `feed()` return a thrown `JsValue` (a `js_sys::Error`).
54#[wasm_bindgen]
55pub struct StreamingDecoder {
56 buffer: Vec<u8>,
57 /// Byte offset of the next unprocessed region.
58 consumed: usize,
59 /// Global metadata from the most recently decoded message.
60 global_metadata: Option<core::GlobalMetadata>,
61 /// Queue of decoded frames ready for JS to consume.
62 ready_frames: std::collections::VecDeque<DecodedFrame>,
63 /// Last decode error (if a scanned message failed to decode).
64 last_decode_error: Option<String>,
65 /// Count of messages that were scanned but failed to decode.
66 skipped_messages: usize,
67 /// Maximum buffer size in bytes.
68 max_buffer: usize,
69}
70
71impl Default for StreamingDecoder {
72 fn default() -> Self {
73 Self::new()
74 }
75}
76
77#[wasm_bindgen]
78impl StreamingDecoder {
79 /// Create a new streaming decoder.
80 #[wasm_bindgen(constructor)]
81 pub fn new() -> Self {
82 Self {
83 buffer: Vec::new(),
84 consumed: 0,
85 global_metadata: None,
86 ready_frames: std::collections::VecDeque::new(),
87 last_decode_error: None,
88 skipped_messages: 0,
89 max_buffer: DEFAULT_MAX_BUFFER,
90 }
91 }
92
93 /// Feed a chunk of bytes into the decoder.
94 ///
95 /// Internally scans for complete messages and decodes each one,
96 /// emitting individual data objects as `DecodedFrame`s.
97 ///
98 /// Returns an error if the internal buffer exceeds `max_buffer` bytes.
99 /// Check `last_error()` after feeding to detect skipped corrupt messages.
100 pub fn feed(&mut self, chunk: &[u8]) -> Result<(), JsValue> {
101 let new_size = (self.buffer.len() - self.consumed)
102 .checked_add(chunk.len())
103 .ok_or_else(|| JsValue::from(js_sys::Error::new("buffer size overflow")))?;
104 if new_size > self.max_buffer {
105 return Err(JsValue::from(js_sys::Error::new(&format!(
106 "streaming buffer would grow to {} bytes (limit {})",
107 new_size, self.max_buffer
108 ))));
109 }
110 // Compact before extending so the actual Vec length (and WASM memory)
111 // stays close to the logical limit instead of growing by `consumed`.
112 if self.consumed > 0 {
113 self.buffer.drain(..self.consumed);
114 self.consumed = 0;
115 }
116 self.buffer.extend_from_slice(chunk);
117 self.last_decode_error = None; // clear previous error
118 self.try_decode_messages();
119 Ok(())
120 }
121
122 /// Pull the next decoded data object frame, or `undefined` if none ready.
123 ///
124 /// In JavaScript, `wasm-bindgen` maps Rust `None` to `undefined`.
125 /// Use a truthiness check: `while ((frame = decoder.next_frame()))`.
126 pub fn next_frame(&mut self) -> Option<DecodedFrame> {
127 self.ready_frames.pop_front()
128 }
129
130 /// Whether global metadata has been received from at least one message.
131 pub fn has_metadata(&self) -> bool {
132 self.global_metadata.is_some()
133 }
134
135 /// Get the global metadata from the most recently decoded message.
136 pub fn metadata(&self) -> Result<JsValue, JsValue> {
137 match &self.global_metadata {
138 // Use `metadata_to_js` so `version` is synthesised from
139 // the preamble for TypeScript ergonomics — the CBOR
140 // metadata frame no longer carries a `version` key in v3.
141 Some(meta) => crate::convert::metadata_to_js(meta),
142 None => Ok(JsValue::NULL),
143 }
144 }
145
146 /// Number of decoded frames ready to consume.
147 pub fn pending_count(&self) -> usize {
148 self.ready_frames.len()
149 }
150
151 /// Total bytes buffered but not yet decoded.
152 pub fn buffered_bytes(&self) -> usize {
153 self.buffer.len() - self.consumed
154 }
155
156 /// Error message from the last skipped (corrupt) message, or null.
157 ///
158 /// Cleared on each `feed()` call. If non-null, at least one message
159 /// found by the scanner failed to decode and was skipped.
160 pub fn last_error(&self) -> Option<String> {
161 self.last_decode_error.clone()
162 }
163
164 /// Total number of messages that were skipped due to decode errors
165 /// since the decoder was created or last reset.
166 pub fn skipped_count(&self) -> usize {
167 self.skipped_messages
168 }
169
170 /// Set the maximum internal buffer size in bytes (default: 256 MiB).
171 ///
172 /// The limit applies to the total *unprocessed* bytes (already-buffered
173 /// bytes plus the next incoming chunk). If adding a new chunk would
174 /// exceed this limit, `feed()` returns an error and the chunk is not
175 /// buffered. Reducing the limit below the current buffer size takes
176 /// effect on the next `feed()` call.
177 pub fn set_max_buffer(&mut self, max_bytes: usize) {
178 self.max_buffer = max_bytes;
179 }
180
181 /// Reset the decoder, clearing all buffered data and pending frames.
182 pub fn reset(&mut self) {
183 self.buffer.clear();
184 self.consumed = 0;
185 self.global_metadata = None;
186 self.ready_frames.clear();
187 self.last_decode_error = None;
188 self.skipped_messages = 0;
189 }
190}
191
192impl StreamingDecoder {
193 fn try_decode_messages(&mut self) {
194 debug_assert!(
195 self.consumed <= self.buffer.len(),
196 "consumed ({}) > buffer.len() ({})",
197 self.consumed,
198 self.buffer.len()
199 );
200
201 let remaining = &self.buffer[self.consumed..];
202 if remaining.is_empty() {
203 return;
204 }
205
206 // Scan once for ALL complete messages in the remaining buffer.
207 // This is O(n) instead of re-scanning after each decoded message.
208 let positions = core::scan(remaining);
209
210 let options = core::DecodeOptions {
211 ..Default::default()
212 };
213
214 // Track the furthest byte position we've successfully processed.
215 // `msg_end` values are relative to `remaining` (= buffer[consumed..]),
216 // so we record the max end seen and advance `consumed` by that amount
217 // once at the end.
218 let mut furthest = 0usize;
219
220 for (msg_start, msg_len) in positions {
221 let msg_end = msg_start + msg_len;
222
223 if msg_end > remaining.len() {
224 break; // Incomplete trailing message — wait for more data
225 }
226
227 let msg_bytes = &remaining[msg_start..msg_end];
228
229 match core::decode(msg_bytes, &options) {
230 Ok((metadata, objects)) => {
231 let base_entries = &metadata.base;
232
233 for (i, (descriptor, data)) in objects.into_iter().enumerate() {
234 let base_entry = base_entries.get(i).cloned();
235 self.ready_frames.push_back(DecodedFrame {
236 descriptor,
237 data,
238 base_entry,
239 });
240 }
241
242 self.global_metadata = Some(metadata);
243 }
244 Err(e) => {
245 // Record the error so JS callers can inspect it.
246 // We still advance past the bad message to avoid an
247 // infinite re-scan loop.
248 self.last_decode_error = Some(e.to_string());
249 self.skipped_messages += 1;
250 }
251 }
252
253 furthest = msg_end;
254 }
255
256 self.consumed += furthest;
257 }
258}
259
260// ── DecodedFrame ─────────────────────────────────────────────────────────────
261
262/// A single decoded data object from the streaming decoder.
263///
264/// Owns the decoded payload data. Use `data_f32()` etc. for zero-copy
265/// TypedArray views. Call `.free()` when done to release WASM memory.
266#[wasm_bindgen]
267pub struct DecodedFrame {
268 descriptor: core::DataObjectDescriptor,
269 data: Vec<u8>,
270 base_entry: Option<std::collections::BTreeMap<String, ciborium::Value>>,
271}
272
273#[wasm_bindgen]
274impl DecodedFrame {
275 /// Object descriptor (shape, dtype, encoding, etc.) as a JS object.
276 pub fn descriptor(&self) -> Result<JsValue, JsValue> {
277 to_js(&self.descriptor)
278 }
279
280 /// Per-object metadata entry from the base array (if available).
281 pub fn base_entry(&self) -> Result<JsValue, JsValue> {
282 match &self.base_entry {
283 Some(entry) => to_js(entry),
284 None => Ok(JsValue::NULL),
285 }
286 }
287
288 /// Zero-copy Float32Array view into decoded payload.
289 ///
290 /// **Warning**: invalidated if WASM memory grows.
291 pub fn data_f32(&self) -> Result<js_sys::Float32Array, JsValue> {
292 view_as_f32(&self.data)
293 }
294
295 /// Zero-copy Float64Array view.
296 pub fn data_f64(&self) -> Result<js_sys::Float64Array, JsValue> {
297 view_as_f64(&self.data)
298 }
299
300 /// Zero-copy Int32Array view.
301 pub fn data_i32(&self) -> Result<js_sys::Int32Array, JsValue> {
302 view_as_i32(&self.data)
303 }
304
305 /// Zero-copy Uint8Array view.
306 pub fn data_u8(&self) -> Result<js_sys::Uint8Array, JsValue> {
307 Ok(view_as_u8(&self.data))
308 }
309
310 /// Payload byte length.
311 pub fn byte_length(&self) -> usize {
312 self.data.len()
313 }
314}