use bytes::Bytes;
use rong::*;
use std::cell::RefCell;
use std::rc::Rc;
use std::sync::{Arc, Mutex as StdMutex};
use tokio::io::AsyncWrite;
use tokio::sync::{Mutex, mpsc, oneshot};
/// Sending half of the bounded channel that carries written chunks.
type ByteSender = mpsc::Sender<Bytes>;
/// Receiver for the one-shot completion result: `Ok(())` or an error message.
type DoneReceiver = oneshot::Receiver<Result<(), String>>;
/// Shared slot for a stream's sender; `None` while the sender has been taken out.
type SharedSenderSlot = Arc<StdMutex<Option<ByteSender>>>;
/// Shared slot for a stream's completion receiver, if it has one.
type SharedDoneSlot = Arc<StdMutex<Option<DoneReceiver>>>;
/// Shared slot for an error message recorded for the stream.
type SharedErrorSlot = Arc<StdMutex<Option<String>>>;
/// Slot for the JavaScript underlying-sink object; `Rc`-based, so not shareable across threads.
type SinkSlot = Rc<StdMutex<Option<JSObject>>>;
/// Sender plus optional completion receiver, as returned by `take_channel`.
type WriterChannel = (ByteSender, Option<DoneReceiver>);
/// Writable stream type, marked `#[js_export]` and registered as a class in `init`.
///
/// A stream is backed either by a byte channel (`tx_slot`, optionally paired with a
/// completion receiver in `done_slot`) or by a JavaScript underlying-sink object
/// (`sink_slot`). `get_writer` moves whichever backing is present into the writer.
#[js_export]
pub struct WritableStream {
/// Channel sender; `None` while a writer holds it, after `abort`, or for sink-backed streams.
pub(crate) tx_slot: SharedSenderSlot,
/// Completion receiver that `get_writer` hands to the writer together with the sender.
pub(crate) done_slot: SharedDoneSlot,
/// Error message recorded for the stream; `write` rejects once this is set.
pub(crate) err_slot: SharedErrorSlot,
/// JavaScript sink object passed to the constructor; `None` while a writer holds it.
sink_slot: SinkSlot,
}
/// Writer obtained from `WritableStream::get_writer`.
///
/// It owns the stream's backing (channel sender or JavaScript sink) until
/// `release_lock` hands it back, and keeps handles to the stream's slots for that purpose.
#[js_export]
pub struct WritableStreamDefaultWriter {
/// The owning stream's sender slot; `release_lock` puts the sender back here.
slot: SharedSenderSlot,
/// Sender held by this writer; `None` for sink-backed writers and after close/abort/release.
tx: Arc<Mutex<Option<ByteSender>>>,
/// Completion receiver taken from the stream; `close` takes it and awaits it.
done_rx: Arc<StdMutex<Option<DoneReceiver>>>,
/// The owning stream's completion slot; `release_lock` returns an unused receiver here.
done_slot_ref: SharedDoneSlot,
/// Error slot shared with the stream; checked at the start of `write`.
err_slot: SharedErrorSlot,
/// The owning stream's sink slot; `release_lock` returns the sink object here.
sink_slot_ref: SinkSlot,
/// JavaScript sink object for sink-backed writers; `None` for channel-backed writers.
sink_obj: RefCell<Option<JSObject>>,
}
impl WritableStream {
pub fn to_sender(tx: ByteSender) -> Self {
Self {
tx_slot: Arc::new(StdMutex::new(Some(tx))),
done_slot: Arc::new(StdMutex::new(None)),
err_slot: Arc::new(StdMutex::new(None)),
sink_slot: Rc::new(StdMutex::new(None)),
}
}
/// Builds a stream whose chunks are written to `writer` by a spawned Tokio task.
///
/// Chunks travel over a bounded channel (capacity 16). The task writes each chunk
/// with `write_all` and stops at the first write error. Once the channel is closed
/// or a write has failed, it flushes the writer. The resulting error message, if
/// any, is stored in the stream's `err_slot` (a write error takes precedence over a
/// flush error), and the final result is sent on the completion channel whose
/// receiver is kept in `done_slot`.
pub fn to_async_writer<W>(mut writer: W) -> Self
where
    W: AsyncWrite + Unpin + Send + 'static,
{
    let (chunk_tx, mut chunk_rx) = mpsc::channel::<Bytes>(16);
    let (done_tx, done_rx) = oneshot::channel::<Result<(), String>>();
    let err_slot: SharedErrorSlot = Arc::new(StdMutex::new(None));
    let task_err_slot = Arc::clone(&err_slot);

    tokio::task::spawn(async move {
        // Drain the channel until every sender is gone or a write fails.
        let mut outcome: Result<(), String> = Ok(());
        while let Some(chunk) = chunk_rx.recv().await {
            outcome = tokio::io::AsyncWriteExt::write_all(&mut writer, &chunk)
                .await
                .map_err(|e| e.to_string());
            if outcome.is_err() {
                break;
            }
        }

        // The flush always runs; its error only counts when no write failed.
        let flushed = tokio::io::AsyncWriteExt::flush(&mut writer)
            .await
            .map_err(|e| e.to_string());
        let outcome = outcome.and(flushed);

        if let Err(message) = &outcome {
            // A poisoned slot is skipped rather than treated as fatal.
            if let Ok(mut recorded) = task_err_slot.lock() {
                *recorded = Some(message.clone());
            }
        }

        // The receiver may already have been dropped; the send result is ignored.
        let _ = done_tx.send(outcome);
    });

    Self {
        tx_slot: Arc::new(StdMutex::new(Some(chunk_tx))),
        done_slot: Arc::new(StdMutex::new(Some(done_rx))),
        err_slot,
        sink_slot: Rc::new(StdMutex::new(None)),
    }
}
}
#[js_class]
impl WritableStream {
/// Constructor taking an optional underlying-sink value.
///
/// The argument is kept as the sink only when it is present and converts to an
/// object; otherwise the stream has neither a channel nor a sink.
#[js_method(constructor)]
fn new(underlying: function::Optional<JSValue>) -> JSResult<Self> {
    let sink = underlying.0.and_then(|value| value.into_object());
    Ok(Self {
        tx_slot: Arc::default(),
        done_slot: Arc::default(),
        err_slot: Arc::default(),
        sink_slot: Rc::new(StdMutex::new(sink)),
    })
}
#[js_method(rename = "getWriter")]
pub(crate) fn get_writer(&self) -> JSResult<WritableStreamDefaultWriter> {
let mut guard = self
.tx_slot
.lock()
.map_err(|_| HostError::new(rong::error::E_INTERNAL, "Stream is poisoned"))?;
match guard.take() {
Some(tx) => {
let done_rx = {
let mut d = self.done_slot.lock().map_err(|_| {
HostError::new(rong::error::E_INTERNAL, "Stream is poisoned")
})?;
d.take()
};
Ok(WritableStreamDefaultWriter {
slot: self.tx_slot.clone(),
tx: Arc::new(Mutex::new(Some(tx))),
done_rx: Arc::new(StdMutex::new(done_rx)),
done_slot_ref: self.done_slot.clone(),
err_slot: self.err_slot.clone(),
sink_slot_ref: self.sink_slot.clone(),
sink_obj: RefCell::new(None),
})
}
None => {
let mut sink_guard = self
.sink_slot
.lock()
.map_err(|_| HostError::new(rong::error::E_INTERNAL, "Stream is poisoned"))?;
if sink_guard.is_none() {
return Err(HostError::new(
rong::error::E_INVALID_STATE,
"WritableStream is locked",
)
.with_name("TypeError")
.into());
}
let obj = sink_guard.take().ok_or_else(|| {
HostError::new(rong::error::E_INTERNAL, "WritableStream sink missing")
})?;
Ok(WritableStreamDefaultWriter {
slot: self.tx_slot.clone(),
tx: Arc::new(Mutex::new(None)),
done_rx: Arc::new(StdMutex::new(None)),
done_slot_ref: self.done_slot.clone(),
err_slot: self.err_slot.clone(),
sink_slot_ref: self.sink_slot.clone(),
sink_obj: RefCell::new(Some(obj)),
})
}
}
}
/// Drops the sender stored on the stream, if any.
///
/// Only the stream's own slot is cleared; a sender that a writer has already taken
/// is not touched here.
///
/// # Errors
///
/// Returns an internal error when the sender slot's mutex is poisoned.
#[js_method]
fn abort(&self) -> JSResult<()> {
    let mut stored_tx = self
        .tx_slot
        .lock()
        .map_err(|_| HostError::new(rong::error::E_INTERNAL, "Stream is poisoned"))?;
    drop(stored_tx.take());
    Ok(())
}
}
#[js_class]
impl WritableStreamDefaultWriter {
#[js_method(constructor)]
fn new() -> JSResult<Self> {
Err(
HostError::new(rong::error::E_ILLEGAL_CONSTRUCTOR, "Illegal constructor")
.with_name("TypeError")
.into(),
)
}
/// Writes one chunk to the stream.
///
/// * If an error message has been recorded in the shared error slot, the write is
///   rejected with that message.
/// * For a sink-backed writer the chunk is passed unchanged to the sink's `write`
///   function. A sink without a callable `write` accepts the chunk as a no-op, in
///   the same way that `close` tolerates a sink without `close`.
/// * Otherwise the chunk must be a typed array or an `ArrayBuffer`; its bytes are
///   copied and sent over the channel, waiting for capacity if the channel is full.
///
/// # Errors
///
/// * A stream error when an error has already been recorded for the stream.
/// * Whatever error the sink's `write` call produces.
/// * A `TypeError` when a channel-backed writer receives a chunk that is not a
///   typed array or `ArrayBuffer`, or whose bytes cannot be accessed.
/// * An invalid-state error when the writer no longer holds a sender or the
///   receiving side of the channel is gone.
#[js_method]
pub(crate) async fn write(&self, chunk: JSValue) -> JSResult<()> {
    // A poisoned error slot is treated as "no error recorded".
    if let Ok(recorded) = self.err_slot.lock()
        && let Some(message) = recorded.as_ref()
    {
        return Err(HostError::new(
            rong::error::E_STREAM,
            format!("WritableStream error: {}", message),
        )
        .into());
    }

    // Clone the handle so the `RefCell` borrow is released before any await.
    let sink = self.sink_obj.borrow().clone();
    if let Some(sink) = sink {
        if let Ok(write_fn) = sink.get::<_, JSFunc>("write") {
            let _ignored: JSValue = write_fn.call_async(None, (chunk,)).await?;
        }
        // A sink-backed writer never owns a channel sender, so falling through to
        // the channel path could only fail; a missing `write` is a no-op instead.
        return Ok(());
    }

    let Some(obj) = chunk.into_object() else {
        return Err(HostError::new(
            rong::error::E_INVALID_ARG,
            "write expects a TypedArray or ArrayBuffer",
        )
        .with_name("TypeError")
        .into());
    };
    let bytes: Bytes = if let Some(typed) = JSTypedArray::from_object(obj.clone()) {
        match typed.as_bytes() {
            Some(view) => Bytes::copy_from_slice(view),
            None => {
                return Err(
                    HostError::new(rong::error::E_INVALID_ARG, "Invalid TypedArray")
                        .with_name("TypeError")
                        .into(),
                );
            }
        }
    } else if let Some(buffer) = JSArrayBuffer::<u8>::from_object(obj) {
        match buffer.as_bytes() {
            Some(view) => Bytes::copy_from_slice(view),
            None => {
                return Err(
                    HostError::new(rong::error::E_INVALID_ARG, "Invalid ArrayBuffer")
                        .with_name("TypeError")
                        .into(),
                );
            }
        }
    } else {
        return Err(HostError::new(
            rong::error::E_INVALID_ARG,
            "write expects Uint8Array or ArrayBuffer",
        )
        .with_name("TypeError")
        .into());
    };

    // The async mutex guard is held while sending, so concurrent writes queue up
    // behind this one.
    let slot = self.tx.lock().await;
    match slot.as_ref() {
        Some(tx) => tx.send(bytes).await.map_err(|_| {
            HostError::new(rong::error::E_INVALID_STATE, "WritableStream closed").into()
        }),
        None => Err(HostError::new(
            rong::error::E_INVALID_STATE,
            "Writer not acquired or closed",
        )
        .into()),
    }
}
#[js_method]
pub(crate) async fn close(&self) -> JSResult<()> {
let sink_obj = self.sink_obj.borrow().clone();
if let Some(sink_obj) = sink_obj
&& let Ok(close_fn) = sink_obj.get::<_, JSFunc>("close")
{
let _r: JSValue = close_fn.call_async(None, ()).await?;
}
{
let mut slot = self.tx.lock().await;
*slot = None;
}
let rx_opt = {
let mut d = self
.done_rx
.lock()
.map_err(|_| HostError::new(rong::error::E_INTERNAL, "Stream is poisoned"))?;
d.take()
};
if let Some(rx) = rx_opt {
match rx.await {
Ok(Ok(())) => Ok(()),
Ok(Err(e)) => Err(HostError::new(rong::error::E_STREAM, e).into()),
Err(_) => Ok(()),
}
} else {
Ok(())
}
}
/// Drops this writer's sender without waiting for any completion result.
///
/// The sink object and the completion receiver are left untouched. Always
/// returns `Ok(())`.
#[js_method]
pub(crate) async fn abort(&self) -> JSResult<()> {
    drop(self.tx.lock().await.take());
    Ok(())
}
#[js_method(rename = "releaseLock")]
pub(crate) async fn release_lock(&self) -> JSResult<()> {
let sink_opt = self.sink_obj.borrow_mut().take();
let tx_opt = {
let mut slot = self.tx.lock().await;
slot.take()
};
if let Some(tx) = tx_opt {
let mut guard = self
.slot
.lock()
.map_err(|_| HostError::new(rong::error::E_INTERNAL, "Stream is poisoned"))?;
if guard.is_none() {
*guard = Some(tx);
}
}
let done_opt = {
let mut d = self
.done_rx
.lock()
.map_err(|_| HostError::new(rong::error::E_INTERNAL, "Stream is poisoned"))?;
d.take()
};
if let Some(done) = done_opt {
let mut g = self
.done_slot_ref
.lock()
.map_err(|_| HostError::new(rong::error::E_INTERNAL, "Stream is poisoned"))?;
if g.is_none() {
*g = Some(done);
}
}
if let Some(sink_obj) = sink_opt {
let mut slot = self
.sink_slot_ref
.lock()
.map_err(|_| HostError::new(rong::error::E_INTERNAL, "Stream is poisoned"))?;
if slot.is_none() {
*slot = Some(sink_obj);
}
}
Ok(())
}
}
impl WritableStreamDefaultWriter {
pub(crate) fn take_channel(&mut self) -> Option<WriterChannel> {
if self.sink_obj.borrow().is_some() {
return None;
}
let tx_opt = futures::executor::block_on(async { self.tx.lock().await.take() });
if let Some(tx) = tx_opt {
let done_opt = self.done_rx.lock().ok().and_then(|mut g| g.take());
Some((tx, done_opt))
} else {
None
}
}
}
/// Free-function form of `WritableStream::to_sender`: builds a stream that
/// forwards written chunks into `tx`.
pub fn writable_stream_to_sender(tx: mpsc::Sender<Bytes>) -> WritableStream {
WritableStream::to_sender(tx)
}
pub fn writable_stream_to_sender_with_done(
tx: ByteSender,
done_rx: DoneReceiver,
) -> WritableStream {
WritableStream {
tx_slot: Arc::new(StdMutex::new(Some(tx))),
done_slot: Arc::new(StdMutex::new(Some(done_rx))),
err_slot: Arc::new(StdMutex::new(None)),
sink_slot: Rc::new(StdMutex::new(None)),
}
}
/// Free-function form of `WritableStream::to_async_writer`: builds a stream whose
/// chunks are written to `writer`.
pub fn writable_stream_to_async_write<W>(writer: W) -> WritableStream
where
W: AsyncWrite + Unpin + Send + 'static,
{
WritableStream::to_async_writer(writer)
}
/// Registers the `WritableStream` and `WritableStreamDefaultWriter` classes on `ctx`.
///
/// # Errors
///
/// Returns the first registration error; the writer class is not registered if
/// registering the stream class fails.
pub fn init(ctx: &JSContext) -> JSResult<()> {
ctx.register_class::<WritableStream>()?;
ctx.register_class::<WritableStreamDefaultWriter>()?;
Ok(())
}
/// Newtype around the `JSObject` produced by instantiating the `WritableStream` class.
#[derive(Clone)]
pub struct JSWritableStream(pub JSObject);
impl JSWritableStream {
    /// Wraps `stream` in an instance of the `WritableStream` class looked up on `ctx`.
    ///
    /// # Errors
    ///
    /// Returns the error from the class lookup.
    pub fn new(ctx: &JSContext, stream: WritableStream) -> JSResult<Self> {
        let class = rong::Class::get::<WritableStream>(ctx)?;
        Ok(Self(class.instance(stream)))
    }

    /// Creates a stream object whose chunks are written to `writer`.
    ///
    /// Combines `WritableStream::to_async_writer` with [`JSWritableStream::new`].
    pub fn from_async_writer<W>(ctx: &JSContext, writer: W) -> JSResult<Self>
    where
        W: AsyncWrite + Unpin + Send + 'static,
    {
        Self::new(ctx, WritableStream::to_async_writer(writer))
    }

    /// Consumes the wrapper and returns the underlying object.
    pub fn into_object(self) -> JSObject {
        let Self(object) = self;
        object
    }

    /// Returns a clone of the underlying object handle.
    pub fn object(&self) -> JSObject {
        let Self(object) = self;
        object.clone()
    }
}