use crate::writable::WritableStream;
use bytes::{Bytes, BytesMut};
use rong::function::This;
use rong::*;
use rong_abort::AbortSignal;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex as StdMutex};
use tokio::io::AsyncRead;
use tokio::sync::mpsc;
type StreamChunk = Result<Bytes, String>;
type StreamReceiver = mpsc::Receiver<StreamChunk>;
type ControlSender = mpsc::UnboundedSender<StreamChunk>;
type ControlReceiver = mpsc::UnboundedReceiver<StreamChunk>;
type SharedReceiverSlot = Arc<StdMutex<Option<StreamReceiver>>>;
type SharedReceiver = Arc<StdMutex<Option<StreamReceiver>>>;
type SharedSender = Arc<StdMutex<Option<ControlSender>>>;
#[js_export]
pub struct ReadableStream {
pub(crate) rx_slot: SharedReceiverSlot,
}
#[js_export]
pub struct ReadableStreamDefaultReader {
slot: SharedReceiverSlot,
rx: SharedReceiver,
canceled: Arc<AtomicBool>,
}
#[js_export]
pub struct ReadableStreamDefaultController {
tx: SharedSender,
}
impl ReadableStream {
pub fn from_receiver(rx: StreamReceiver) -> Self {
Self {
rx_slot: Arc::new(StdMutex::new(Some(rx))),
}
}
pub fn from_async_reader<R>(mut reader: R, chunk_size: usize) -> Self
where
R: AsyncRead + Unpin + Send + 'static,
{
let (tx, rx) = mpsc::channel::<StreamChunk>(16);
rong::spawn(async move {
let mut buf = BytesMut::with_capacity(chunk_size.max(1));
loop {
buf.clear();
match tokio::io::AsyncReadExt::read_buf(&mut reader, &mut buf).await {
Ok(0) => break,
Ok(_) => {
if tx.send(Ok(buf.split().freeze())).await.is_err() {
break;
}
}
Err(e) => {
let _ = tx.send(Err(e.to_string())).await;
break;
}
}
}
});
Self::from_receiver(rx)
}
}
#[derive(FromJSObj, Default)]
struct PipeToOptions {
#[rename = "preventClose"]
prevent_close: Option<bool>,
#[rename = "preventAbort"]
prevent_abort: Option<bool>,
#[rename = "preventCancel"]
prevent_cancel: Option<bool>,
signal: Option<AbortSignal>,
}
#[js_class]
impl ReadableStream {
#[js_method(constructor)]
fn constructor(ctx: JSContext, underlying: function::Optional<JSValue>) -> JSResult<JSObject> {
let (tx, rx) = mpsc::channel::<StreamChunk>(16);
let (control_tx, mut control_rx): (ControlSender, ControlReceiver) =
mpsc::unbounded_channel();
let stream = Self::from_receiver(rx);
let obj = JSReadableStream::new(&ctx, stream)?.into_object();
let tx_forwards = tx.clone();
rong::spawn(async move {
while let Some(item) = control_rx.recv().await {
if tx_forwards.send(item).await.is_err() {
break;
}
}
});
if let Some(v) = underlying.0
&& let Some(src) = v.into_object()
{
let controller = ReadableStreamDefaultController {
tx: Arc::new(StdMutex::new(Some(control_tx))),
};
if let Ok(start) = src.get::<_, JSFunc>("start") {
start.call::<_, JSValue>(Some(src.clone()), (controller,))?;
}
}
Ok(obj)
}
#[js_method(rename = "getReader")]
fn get_reader(&self) -> JSResult<ReadableStreamDefaultReader> {
let mut guard = self
.rx_slot
.lock()
.map_err(|_| HostError::new(rong::error::E_INTERNAL, "Stream is poisoned"))?;
match guard.take() {
Some(rx) => Ok(ReadableStreamDefaultReader {
slot: self.rx_slot.clone(),
rx: Arc::new(StdMutex::new(Some(rx))),
canceled: Arc::new(AtomicBool::new(false)),
}),
None => Err(
HostError::new(rong::error::E_INVALID_STATE, "ReadableStream is locked")
.with_name("TypeError")
.into(),
),
}
}
#[js_method]
fn cancel(&self) -> JSResult<()> {
let mut guard = self
.rx_slot
.lock()
.map_err(|_| HostError::new(rong::error::E_INTERNAL, "Stream is poisoned"))?;
*guard = None;
Ok(())
}
#[js_method(rename = "pipeTo")]
async fn pipe_to(
&self,
ctx: JSContext,
dest: WritableStream,
options: function::Optional<PipeToOptions>,
) -> JSResult<()> {
let opts = options.0.unwrap_or_default();
let prevent_close = opts.prevent_close.unwrap_or(false);
let prevent_abort = opts.prevent_abort.unwrap_or(false);
let prevent_cancel = opts.prevent_cancel.unwrap_or(false);
if let Some(sig) = &opts.signal
&& sig.aborted()
{
return Err(RongJSError::from_thrown_value(sig.get_reason()));
}
let reader = self.get_reader()?;
let mut writer = dest.get_writer()?;
if let Some(mut rx) = readable_stream_take_receiver(self)
&& let Some((tx, done_rx)) =
crate::writable::WritableStreamDefaultWriter::take_channel(&mut writer)
{
let mut abort_rx = opts.signal.as_ref().map(|s| s.subscribe());
let mut pipe_err: Option<RongJSError> = None;
loop {
if let Some(arx) = &mut abort_rx {
tokio::select! {
item = rx.recv() => {
match item {
Some(Ok(bytes)) => {
if tx.send(bytes).await.is_err() { break; }
}
Some(Err(e)) => {
pipe_err = Some(HostError::new(rong::error::E_STREAM, e).into());
break;
}
None => break,
}
}
reason = arx.recv() => {
pipe_err = Some(RongJSError::from_thrown_value(reason));
break;
}
}
} else {
match rx.recv().await {
Some(Ok(bytes)) => {
if tx.send(bytes).await.is_err() {
break;
}
}
Some(Err(e)) => {
pipe_err = Some(HostError::new(rong::error::E_STREAM, e).into());
break;
}
None => break,
}
}
}
if pipe_err.is_none() && !prevent_close {
drop(tx);
if let Some(rx) = done_rx {
let _ = rx.await;
}
}
let _ = reader.release_lock();
let _ = writer.release_lock().await;
if let Some(e) = pipe_err {
if !prevent_abort {
let _ = writer.abort().await;
}
return Err(e);
}
return Ok(());
}
let mut pipe_err: Option<RongJSError> = None;
let mut abort_rx = opts.signal.as_ref().map(|s| s.subscribe());
loop {
let res_obj = if let Some(rx) = &mut abort_rx {
tokio::select! {
r = reader.read(ctx.clone()) => { Some(r?) }
reason = rx.recv() => { pipe_err = Some(RongJSError::from_thrown_value(reason)); None }
}
} else {
Some(reader.read(ctx.clone()).await?)
};
if let Some(r) = res_obj {
let done = r.get::<_, bool>("done")?;
if done {
break;
}
let chunk = r.get::<_, JSValue>("value")?;
if let Some(sig) = &opts.signal
&& sig.aborted()
{
pipe_err = Some(RongJSError::from_thrown_value(sig.get_reason()));
break;
}
if let Err(e) = writer.write(chunk).await {
if !prevent_cancel {
let _ = reader.cancel().await;
}
pipe_err = Some(e);
break;
}
continue;
} else {
break;
}
}
if pipe_err.is_none() && !prevent_close {
let _ = writer.close().await;
}
let _ = reader.release_lock();
let _ = writer.release_lock().await;
if let Some(e) = pipe_err {
if !prevent_abort {
let _ = writer.abort().await;
}
return Err(e);
}
Ok(())
}
#[js_method]
fn tee(&self, ctx: JSContext) -> JSResult<JSArray> {
let rx = match readable_stream_take_receiver(self) {
Some(rx) => rx,
None => {
return Err(HostError::new(
rong::error::E_INVALID_STATE,
"ReadableStream is locked",
)
.with_name("TypeError")
.into());
}
};
let (tx1, rx1) = mpsc::channel::<StreamChunk>(16);
let (tx2, rx2) = mpsc::channel::<StreamChunk>(16);
rong::spawn(async move {
let mut src = rx;
let mut tx1 = Some(tx1);
let mut tx2 = Some(tx2);
while let Some(item) = src.recv().await {
match item {
Ok(bytes) => {
if let Some(t1) = tx1.as_mut()
&& t1.send(Ok(bytes.clone())).await.is_err()
{
tx1 = None;
}
if let Some(t2) = tx2.as_mut()
&& t2.send(Ok(bytes)).await.is_err()
{
tx2 = None;
}
if tx1.is_none() && tx2.is_none() {
break;
}
}
Err(e) => {
if let Some(t1) = tx1.as_mut() {
let _ = t1.send(Err(e.clone())).await;
}
if let Some(t2) = tx2.as_mut() {
let _ = t2.send(Err(e)).await;
}
break;
}
}
}
});
let b1 = JSReadableStream::from_receiver(&ctx, rx1)?.into_object();
let b2 = JSReadableStream::from_receiver(&ctx, rx2)?.into_object();
let arr = JSArray::new(&ctx)?;
arr.set(0, b1)?;
arr.set(1, b2)?;
Ok(arr)
}
}
#[js_class]
impl ReadableStreamDefaultReader {
#[js_method(constructor)]
fn new() -> JSResult<Self> {
Err(
HostError::new(rong::error::E_ILLEGAL_CONSTRUCTOR, "Illegal constructor")
.with_name("TypeError")
.into(),
)
}
#[js_method]
async fn read(&self, ctx: JSContext) -> JSResult<JSObject> {
let mut rx_opt = {
let mut slot = self
.rx
.lock()
.map_err(|_| HostError::new(rong::error::E_INTERNAL, "Reader is poisoned"))?;
slot.take()
};
let mut rx = match rx_opt.take() {
Some(rx) => rx,
None => {
let out = JSObject::new(&ctx);
out.set("done", true)?;
return Ok(out);
}
};
let next = rx.recv().await;
{
let mut slot = self
.rx
.lock()
.map_err(|_| HostError::new(rong::error::E_INTERNAL, "Reader is poisoned"))?;
if self.canceled.load(Ordering::Acquire) {
*slot = None;
} else {
*slot = Some(rx);
}
}
if self.canceled.load(Ordering::Acquire) {
let out = JSObject::new(&ctx);
out.set("done", true)?;
return Ok(out);
}
match next {
Some(Ok(bytes)) => {
let out = JSObject::new(&ctx);
out.set("done", false)?;
let ab = JSArrayBuffer::<u8>::from_bytes_owned(&ctx, bytes.to_vec())?;
out.set("value", ab)?;
Ok(out)
}
Some(Err(e)) => Err(HostError::new(rong::error::E_STREAM, e).into()),
None => {
let out = JSObject::new(&ctx);
out.set("done", true)?;
Ok(out)
}
}
}
#[js_method(rename = "releaseLock")]
fn release_lock(&self) -> JSResult<()> {
let rx_opt = {
let mut slot = self
.rx
.lock()
.map_err(|_| HostError::new(rong::error::E_INTERNAL, "Reader is poisoned"))?;
slot.take()
};
if let Some(rx) = rx_opt
&& !self.canceled.load(Ordering::Acquire)
{
let mut guard = self
.slot
.lock()
.map_err(|_| HostError::new(rong::error::E_INTERNAL, "Stream is poisoned"))?;
if guard.is_none() {
*guard = Some(rx);
}
}
Ok(())
}
#[js_method]
async fn cancel(&self) -> JSResult<()> {
self.canceled.store(true, Ordering::Release);
let mut slot = self
.rx
.lock()
.map_err(|_| HostError::new(rong::error::E_INTERNAL, "Reader is poisoned"))?;
*slot = None; Ok(())
}
}
#[js_class]
impl ReadableStreamDefaultController {
#[js_method(constructor)]
fn new() -> JSResult<Self> {
Err(
HostError::new(rong::error::E_ILLEGAL_CONSTRUCTOR, "Illegal constructor")
.with_name("TypeError")
.into(),
)
}
#[js_method]
fn enqueue(&mut self, chunk: JSValue) -> JSResult<()> {
let bytes: Bytes = if let Some(obj) = chunk.clone().into_object() {
if let Some(ta) = JSTypedArray::from_object(obj.clone()) {
if let Some(b) = ta.as_bytes() {
Bytes::copy_from_slice(b)
} else {
return Err(
HostError::new(rong::error::E_INVALID_ARG, "Invalid TypedArray")
.with_name("TypeError")
.into(),
);
}
} else if let Some(ab) = JSArrayBuffer::<u8>::from_object(obj) {
if let Some(b) = ab.as_bytes() {
Bytes::copy_from_slice(b)
} else {
return Err(
HostError::new(rong::error::E_INVALID_ARG, "Invalid ArrayBuffer")
.with_name("TypeError")
.into(),
);
}
} else {
return Err(HostError::new(
rong::error::E_INVALID_ARG,
"enqueue expects Uint8Array or ArrayBuffer",
)
.with_name("TypeError")
.into());
}
} else {
return Err(HostError::new(
rong::error::E_INVALID_ARG,
"enqueue expects a TypedArray or ArrayBuffer",
)
.with_name("TypeError")
.into());
};
let mut slot = self
.tx
.lock()
.map_err(|_| HostError::new(rong::error::E_INTERNAL, "Stream is poisoned"))?;
if let Some(tx) = slot.as_mut() {
tx.send(Ok(bytes)).map_err(|_| {
HostError::new(rong::error::E_INVALID_STATE, "ReadableStream is closed").into()
})
} else {
Err(HostError::new(rong::error::E_INVALID_STATE, "ReadableStream is closed").into())
}
}
#[js_method]
fn close(&mut self) -> JSResult<()> {
let mut slot = self
.tx
.lock()
.map_err(|_| HostError::new(rong::error::E_INTERNAL, "Stream is poisoned"))?;
*slot = None;
Ok(())
}
#[js_method]
fn error(&mut self, reason: function::Optional<JSValue>) -> JSResult<()> {
let msg = reason
.0
.map(|v| v.to_string())
.unwrap_or_else(|| "ReadableStream error".to_string());
let mut slot = self
.tx
.lock()
.map_err(|_| HostError::new(rong::error::E_INTERNAL, "Stream is poisoned"))?;
if let Some(tx) = slot.as_mut() {
let _ = tx.send(Err(msg));
}
*slot = None;
Ok(())
}
}
pub fn readable_stream_take_receiver(rs: &ReadableStream) -> Option<StreamReceiver> {
let mut guard = rs.rx_slot.lock().ok()?;
guard.take()
}
pub fn readable_stream_is_locked(rs: &ReadableStream) -> bool {
rs.rx_slot.lock().map(|g| g.is_none()).unwrap_or(true)
}
fn install_instance_async_iter(ctx: &JSContext, obj: &JSObject) -> JSResult<()> {
let next_fn = JSFunc::new(
ctx,
move |ctx: JSContext, this: This<JSObject>| async move {
let rx_slot = match (*this).borrow::<ReadableStream>() {
Ok(rs) => rs.rx_slot.clone(),
Err(_) => {
return Err(HostError::new(rong::error::E_TYPE, "Not ReadableStream")
.with_name("TypeError")
.into());
}
};
let mut rx = {
let mut guard = rx_slot
.lock()
.map_err(|_| HostError::new(rong::error::E_INTERNAL, "Stream is poisoned"))?;
guard.take()
};
let Some(mut rx) = rx.take() else {
let out = JSObject::new(&ctx);
out.set("done", true).ok();
out.set("value", JSValue::undefined(&ctx)).ok();
return Ok(out);
};
let item = rx.recv().await;
match item {
Some(Ok(bytes)) => {
if let Ok(mut slot) = rx_slot.lock()
&& slot.is_none()
{
*slot = Some(rx);
}
let out = JSObject::new(&ctx);
out.set("done", false).ok();
if let Ok(ab) = JSArrayBuffer::<u8>::from_bytes(&ctx, &bytes) {
out.set("value", ab).ok();
}
Ok(out)
}
Some(Err(e)) => Err(HostError::new(rong::error::E_STREAM, e).into()),
None => {
let out = JSObject::new(&ctx);
out.set("done", true).ok();
out.set("value", JSValue::undefined(&ctx)).ok();
Ok(out)
}
}
},
)?;
obj.set("next", next_fn)?;
let symbol = ctx
.global()
.get::<_, JSObject>("Symbol")?
.get::<_, JSSymbol>("asyncIterator")?;
let return_this = JSFunc::new(ctx, move |this: This<JSObject>| (*this).clone())?;
obj.set(symbol, return_this)?;
Ok(())
}
#[derive(Clone)]
pub struct JSReadableStream(pub JSObject);
impl JSReadableStream {
pub fn new(ctx: &JSContext, stream: ReadableStream) -> JSResult<Self> {
let obj = rong::Class::get::<ReadableStream>(ctx)?.instance(stream);
install_instance_async_iter(ctx, &obj)?;
Ok(Self(obj))
}
pub fn from_receiver(ctx: &JSContext, rx: StreamReceiver) -> JSResult<Self> {
let stream = ReadableStream::from_receiver(rx);
Self::new(ctx, stream)
}
pub fn from_shared_receiver(ctx: &JSContext, slot: SharedReceiverSlot) -> JSResult<Self> {
let stream = ReadableStream { rx_slot: slot };
Self::new(ctx, stream)
}
pub fn from_async_reader<R>(ctx: &JSContext, reader: R, chunk_size: usize) -> JSResult<Self>
where
R: AsyncRead + Unpin + Send + 'static,
{
let stream = ReadableStream::from_async_reader(reader, chunk_size);
Self::new(ctx, stream)
}
pub fn into_object(self) -> JSObject {
self.0
}
pub fn object(&self) -> JSObject {
self.0.clone()
}
}
pub fn init(ctx: &JSContext) -> JSResult<()> {
ctx.register_class::<ReadableStream>()?;
ctx.register_class::<ReadableStreamDefaultReader>()?;
ctx.register_class::<ReadableStreamDefaultController>()?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use rong_test::*;
#[test]
fn test_stream_js() {
async_run!(|ctx: JSContext| async move {
rong_assert::init(&ctx)?;
rong_console::init(&ctx)?;
rong_encoding::init(&ctx)?;
rong_timer::init(&ctx)?;
crate::init(&ctx)?;
let passed = UnitJSRunner::load_script(&ctx, "stream.js")
.await?
.run()
.await?;
assert!(passed);
Ok(())
});
}
}