use crate::convert::*;
use std::collections::BTreeMap;
use std::io::Write;
use tensogram::{self as core, TensogramError};
use wasm_bindgen::prelude::*;
/// Core streaming encoder that accumulates its output in an in-memory `Vec<u8>`.
type BufferedEncoder = core::streaming::StreamingEncoder<Vec<u8>>;
/// Core streaming encoder whose output sink is a [`JsCallbackWriter`].
type StreamingCoreEncoder = core::streaming::StreamingEncoder<JsCallbackWriter>;
/// The two concrete core encoders the wasm-facing `StreamingEncoder` can wrap.
///
/// The variant is chosen once, in the `StreamingEncoder` constructor, depending on
/// whether an `on_bytes` callback was supplied.
enum Inner {
/// No callback was supplied: output is collected in a `Vec<u8>`.
Buffered(BufferedEncoder),
/// A callback was supplied: output is forwarded through a `JsCallbackWriter`.
Streaming(StreamingCoreEncoder),
}
impl Inner {
fn write_preceder(
&mut self,
map: BTreeMap<String, ciborium::Value>,
) -> Result<(), TensogramError> {
match self {
Inner::Buffered(e) => e.write_preceder(map),
Inner::Streaming(e) => e.write_preceder(map),
}
}
fn write_object(
&mut self,
desc: &core::DataObjectDescriptor,
bytes: &[u8],
) -> Result<(), TensogramError> {
match self {
Inner::Buffered(e) => e.write_object(desc, bytes),
Inner::Streaming(e) => e.write_object(desc, bytes),
}
}
fn write_object_pre_encoded(
&mut self,
desc: &core::DataObjectDescriptor,
bytes: &[u8],
) -> Result<(), TensogramError> {
match self {
Inner::Buffered(e) => e.write_object_pre_encoded(desc, bytes),
Inner::Streaming(e) => e.write_object_pre_encoded(desc, bytes),
}
}
fn object_count(&self) -> usize {
match self {
Inner::Buffered(e) => e.object_count(),
Inner::Streaming(e) => e.object_count(),
}
}
fn bytes_written(&self) -> u64 {
match self {
Inner::Buffered(e) => e.bytes_written(),
Inner::Streaming(e) => e.bytes_written(),
}
}
}
/// `std::io::Write` sink that hands every written chunk to a JavaScript function.
struct JsCallbackWriter {
/// JavaScript function invoked once per `write` call with the chunk as its only argument.
callback: js_sys::Function,
}
impl JsCallbackWriter {
fn new(callback: js_sys::Function) -> Self {
Self { callback }
}
}
impl Write for JsCallbackWriter {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
let chunk = js_sys::Uint8Array::from(buf);
let this = JsValue::NULL;
match self.callback.call1(&this, &chunk) {
Ok(_) => Ok(buf.len()),
Err(js_err) => {
let message = js_err.as_string().unwrap_or_else(|| format!("{js_err:?}"));
Err(std::io::Error::other(format!(
"streaming sink callback failed: {message}"
)))
}
}
}
fn flush(&mut self) -> std::io::Result<()> {
Ok(())
}
}
/// JavaScript-facing streaming encoder.
///
/// Wraps one of the core encoders behind an `Option` so that `finish` can
/// consume it; every method reports an error once the encoder has been finished.
#[wasm_bindgen]
pub struct StreamingEncoder {
/// `Some` while the encoder is live; taken (left as `None`) by `finish`.
inner: Option<Inner>,
}
#[wasm_bindgen]
impl StreamingEncoder {
#[wasm_bindgen(constructor)]
pub fn new(
metadata_js: JsValue,
hash: Option<bool>,
on_bytes: Option<js_sys::Function>,
reject_nan: Option<bool>,
reject_inf: Option<bool>,
) -> Result<StreamingEncoder, JsError> {
let metadata: core::GlobalMetadata =
serde_wasm_bindgen::from_value(metadata_js).map_err(js_err)?;
let options = build_encode_options(hash, reject_nan, reject_inf);
let inner = match on_bytes {
Some(cb) => {
let sink = JsCallbackWriter::new(cb);
Inner::Streaming(
StreamingCoreEncoder::new(sink, &metadata, &options).map_err(js_err)?,
)
}
None => Inner::Buffered(
BufferedEncoder::new(Vec::new(), &metadata, &options).map_err(js_err)?,
),
};
Ok(StreamingEncoder { inner: Some(inner) })
}
pub fn write_preceder(&mut self, metadata_js: JsValue) -> Result<(), JsError> {
let inner = self.inner.as_mut().ok_or_else(already_finished)?;
let map: BTreeMap<String, ciborium::Value> =
serde_wasm_bindgen::from_value(metadata_js).map_err(js_err)?;
inner.write_preceder(map).map_err(js_err)
}
pub fn write_object(&mut self, descriptor_js: JsValue, data: JsValue) -> Result<(), JsError> {
self.write_with(descriptor_js, data, |inner, desc, bytes| {
inner.write_object(desc, bytes)
})
}
pub fn write_object_pre_encoded(
&mut self,
descriptor_js: JsValue,
data: JsValue,
) -> Result<(), JsError> {
self.write_with(descriptor_js, data, |inner, desc, bytes| {
inner.write_object_pre_encoded(desc, bytes)
})
}
pub fn object_count(&self) -> Result<usize, JsError> {
Ok(self
.inner
.as_ref()
.ok_or_else(already_finished)?
.object_count())
}
pub fn bytes_written(&self) -> Result<f64, JsError> {
Ok(self
.inner
.as_ref()
.ok_or_else(already_finished)?
.bytes_written() as f64)
}
pub fn finish(&mut self) -> Result<js_sys::Uint8Array, JsError> {
let inner = self.inner.take().ok_or_else(already_finished)?;
match inner {
Inner::Buffered(e) => {
let buf = e.finish().map_err(js_err)?;
Ok(js_sys::Uint8Array::from(buf.as_slice()))
}
Inner::Streaming(e) => {
let _sink = e.finish().map_err(js_err)?;
Ok(js_sys::Uint8Array::new_with_length(0))
}
}
}
}
impl StreamingEncoder {
fn write_with(
&mut self,
descriptor_js: JsValue,
data: JsValue,
core_fn: impl FnOnce(
&mut Inner,
&core::DataObjectDescriptor,
&[u8],
) -> Result<(), TensogramError>,
) -> Result<(), JsError> {
let inner = self.inner.as_mut().ok_or_else(already_finished)?;
let desc: core::DataObjectDescriptor =
serde_wasm_bindgen::from_value(descriptor_js).map_err(js_err)?;
let bytes = typed_array_or_u8_to_bytes(&data)
.ok_or_else(|| JsError::new("data must be a TypedArray or Uint8Array"))?;
core_fn(inner, &desc, &bytes).map_err(js_err)
}
}
/// Builds the error returned by every method called after the encoder has been finished.
fn already_finished() -> JsError {
    const MESSAGE: &str = "StreamingEncoder already finished";
    JsError::new(MESSAGE)
}