use crate::convert::*;
use tensogram_core as core;
use wasm_bindgen::prelude::*;
const DEFAULT_MAX_BUFFER: usize = 256 * 1024 * 1024;
#[wasm_bindgen]
pub struct StreamingDecoder {
buffer: Vec<u8>,
consumed: usize,
global_metadata: Option<core::GlobalMetadata>,
ready_frames: std::collections::VecDeque<DecodedFrame>,
last_decode_error: Option<String>,
skipped_messages: usize,
max_buffer: usize,
}
#[wasm_bindgen]
impl StreamingDecoder {
#[wasm_bindgen(constructor)]
pub fn new() -> Self {
Self {
buffer: Vec::new(),
consumed: 0,
global_metadata: None,
ready_frames: std::collections::VecDeque::new(),
last_decode_error: None,
skipped_messages: 0,
max_buffer: DEFAULT_MAX_BUFFER,
}
}
pub fn feed(&mut self, chunk: &[u8]) -> Result<(), JsError> {
let new_size = (self.buffer.len() - self.consumed)
.checked_add(chunk.len())
.ok_or_else(|| JsError::new("buffer size overflow"))?;
if new_size > self.max_buffer {
return Err(JsError::new(&format!(
"streaming buffer would grow to {} bytes (limit {})",
new_size, self.max_buffer
)));
}
if self.consumed > 0 {
self.buffer.drain(..self.consumed);
self.consumed = 0;
}
self.buffer.extend_from_slice(chunk);
self.last_decode_error = None; self.try_decode_messages();
Ok(())
}
pub fn next_frame(&mut self) -> Option<DecodedFrame> {
self.ready_frames.pop_front()
}
pub fn has_metadata(&self) -> bool {
self.global_metadata.is_some()
}
pub fn metadata(&self) -> Result<JsValue, JsError> {
match &self.global_metadata {
Some(meta) => to_js(meta),
None => Ok(JsValue::NULL),
}
}
pub fn pending_count(&self) -> usize {
self.ready_frames.len()
}
pub fn buffered_bytes(&self) -> usize {
self.buffer.len() - self.consumed
}
pub fn last_error(&self) -> Option<String> {
self.last_decode_error.clone()
}
pub fn skipped_count(&self) -> usize {
self.skipped_messages
}
pub fn set_max_buffer(&mut self, max_bytes: usize) {
self.max_buffer = max_bytes;
}
pub fn reset(&mut self) {
self.buffer.clear();
self.consumed = 0;
self.global_metadata = None;
self.ready_frames.clear();
self.last_decode_error = None;
self.skipped_messages = 0;
}
}
impl StreamingDecoder {
fn try_decode_messages(&mut self) {
debug_assert!(
self.consumed <= self.buffer.len(),
"consumed ({}) > buffer.len() ({})",
self.consumed,
self.buffer.len()
);
let remaining = &self.buffer[self.consumed..];
if remaining.is_empty() {
return;
}
let positions = core::scan(remaining);
let options = core::DecodeOptions {
verify_hash: false,
..Default::default()
};
let mut furthest = 0usize;
for (msg_start, msg_len) in positions {
let msg_end = msg_start + msg_len;
if msg_end > remaining.len() {
break; }
let msg_bytes = &remaining[msg_start..msg_end];
match core::decode(msg_bytes, &options) {
Ok((metadata, objects)) => {
let base_entries = &metadata.base;
for (i, (descriptor, data)) in objects.into_iter().enumerate() {
let base_entry = base_entries.get(i).cloned();
self.ready_frames.push_back(DecodedFrame {
descriptor,
data,
base_entry,
});
}
self.global_metadata = Some(metadata);
}
Err(e) => {
self.last_decode_error = Some(e.to_string());
self.skipped_messages += 1;
}
}
furthest = msg_end;
}
self.consumed += furthest;
}
}
#[wasm_bindgen]
pub struct DecodedFrame {
descriptor: core::DataObjectDescriptor,
data: Vec<u8>,
base_entry: Option<std::collections::BTreeMap<String, ciborium::Value>>,
}
#[wasm_bindgen]
impl DecodedFrame {
pub fn descriptor(&self) -> Result<JsValue, JsError> {
to_js(&self.descriptor)
}
pub fn base_entry(&self) -> Result<JsValue, JsError> {
match &self.base_entry {
Some(entry) => to_js(entry),
None => Ok(JsValue::NULL),
}
}
pub fn data_f32(&self) -> Result<js_sys::Float32Array, JsError> {
view_as_f32(&self.data)
}
pub fn data_f64(&self) -> Result<js_sys::Float64Array, JsError> {
view_as_f64(&self.data)
}
pub fn data_i32(&self) -> Result<js_sys::Int32Array, JsError> {
view_as_i32(&self.data)
}
pub fn data_u8(&self) -> Result<js_sys::Uint8Array, JsError> {
Ok(view_as_u8(&self.data))
}
pub fn byte_length(&self) -> usize {
self.data.len()
}
}