use std::cell::{Cell, RefCell};
use std::collections::VecDeque;
use std::rc::Rc;
use std::sync::Arc;
use ferridriver::http_client::HttpStreamResponse;
use rquickjs::atom::PredefinedAtom;
use rquickjs::function::{Opt, This};
use rquickjs::{Class, Ctx, Object, TypedArray, Value, class::Trace};
use tokio::sync::Mutex as AsyncMutex;
#[derive(Default)]
struct BufState {
chunks: VecDeque<Vec<u8>>,
closed: bool,
errored: Option<String>,
}
#[derive(Clone)]
enum StreamSource {
Buffered(Rc<RefCell<BufState>>),
Net(Arc<AsyncMutex<Option<HttpStreamResponse>>>),
}
type Locked = Rc<Cell<bool>>;
fn chunk_bytes(v: &Value<'_>) -> Vec<u8> {
if let Some(s) = v.as_string().and_then(|s| s.to_string().ok()) {
return s.into_bytes();
}
if let Ok(ta) = TypedArray::<u8>::from_value(v.clone()) {
let b: &[u8] = ta.as_ref();
return b.to_vec();
}
if let Some(ab) = rquickjs::ArrayBuffer::from_value(v.clone())
&& let Some(b) = ab.as_bytes()
{
return b.to_vec();
}
Vec::new()
}
fn result_obj<'js>(ctx: &Ctx<'js>, value: Value<'js>, done: bool) -> rquickjs::Result<Object<'js>> {
let o = Object::new(ctx.clone())?;
o.set("value", value)?;
o.set("done", done)?;
Ok(o)
}
fn chunk_result<'js>(ctx: &Ctx<'js>, bytes: Vec<u8>) -> rquickjs::Result<Object<'js>> {
let ta = TypedArray::<u8>::new(ctx.clone(), bytes)?;
result_obj(ctx, ta.into_value(), false)
}
async fn pull<'js>(ctx: &Ctx<'js>, source: &StreamSource) -> rquickjs::Result<Object<'js>> {
match source {
StreamSource::Buffered(state) => {
std::future::ready(()).await;
let (chunk, errored) = {
let mut s = state.borrow_mut();
if let Some(e) = s.errored.clone() {
(None, Some(e))
} else {
(s.chunks.pop_front(), None)
}
};
if let Some(e) = errored {
return Err(rquickjs::Exception::throw_type(ctx, &e));
}
match chunk {
Some(b) => chunk_result(ctx, b),
None => result_obj(ctx, Value::new_undefined(ctx.clone()), true),
}
},
StreamSource::Net(resp) => {
let mut guard = resp.lock().await;
let Some(r) = guard.as_mut() else {
return result_obj(ctx, Value::new_undefined(ctx.clone()), true);
};
match r.chunk().await {
Ok(Some(bytes)) => chunk_result(ctx, bytes.to_vec()),
Ok(None) => {
*guard = None;
result_obj(ctx, Value::new_undefined(ctx.clone()), true)
},
Err(e) => {
*guard = None;
Err(rquickjs::Exception::throw_type(ctx, &e.to_string()))
},
}
},
}
}
fn cancel_source(source: &StreamSource) {
match source {
StreamSource::Buffered(state) => {
let mut s = state.borrow_mut();
s.chunks.clear();
s.closed = true;
},
StreamSource::Net(resp) => {
if let Ok(mut g) = resp.try_lock() {
*g = None;
}
},
}
}
#[derive(Trace)]
#[rquickjs::class(rename = "ReadableStreamDefaultController")]
pub struct ReadableStreamDefaultControllerJs {
#[qjs(skip_trace)]
buf: Rc<RefCell<BufState>>,
}
#[allow(unsafe_code)]
unsafe impl rquickjs::JsLifetime<'_> for ReadableStreamDefaultControllerJs {
type Changed<'to> = ReadableStreamDefaultControllerJs;
}
#[rquickjs::methods(rename_all = "camelCase")]
impl ReadableStreamDefaultControllerJs {
#[qjs(constructor)]
pub fn new(ctx: Ctx<'_>) -> rquickjs::Result<Self> {
Err(rquickjs::Exception::throw_type(&ctx, "Illegal constructor"))
}
#[qjs(rename = "enqueue")]
pub fn enqueue(&self, chunk: Value<'_>) {
self.buf.borrow_mut().chunks.push_back(chunk_bytes(&chunk));
}
#[qjs(rename = "close")]
pub fn close(&self) {
self.buf.borrow_mut().closed = true;
}
#[qjs(rename = "error")]
pub fn error(&self, reason: Opt<Value<'_>>) {
let msg = reason
.0
.and_then(|v| {
v.as_string()
.and_then(|s| s.to_string().ok())
.or_else(|| v.as_object().and_then(|o| o.get::<_, String>("message").ok()))
})
.unwrap_or_else(|| "stream errored".to_string());
let mut s = self.buf.borrow_mut();
s.errored = Some(msg);
s.closed = true;
}
}
#[derive(Trace)]
#[rquickjs::class(rename = "ReadableStreamDefaultReader")]
pub struct ReadableStreamDefaultReaderJs {
#[qjs(skip_trace)]
source: StreamSource,
#[qjs(skip_trace)]
locked: Locked,
#[qjs(skip_trace)]
released: bool,
}
#[allow(unsafe_code)]
unsafe impl rquickjs::JsLifetime<'_> for ReadableStreamDefaultReaderJs {
type Changed<'to> = ReadableStreamDefaultReaderJs;
}
#[rquickjs::methods(rename_all = "camelCase")]
impl ReadableStreamDefaultReaderJs {
#[qjs(constructor)]
pub fn new(ctx: Ctx<'_>) -> rquickjs::Result<Self> {
Err(rquickjs::Exception::throw_type(&ctx, "Illegal constructor"))
}
#[qjs(rename = "read")]
pub async fn read<'js>(&self, ctx: Ctx<'js>) -> rquickjs::Result<Object<'js>> {
if self.released {
return Err(rquickjs::Exception::throw_type(&ctx, "Reader has been released"));
}
pull(&ctx, &self.source).await
}
#[qjs(rename = "releaseLock")]
pub fn release_lock(&mut self) {
self.released = true;
self.locked.set(false);
}
#[qjs(rename = "cancel")]
pub fn cancel(&self, _reason: Opt<Value<'_>>) {
cancel_source(&self.source);
}
#[qjs(get, rename = "closed")]
pub fn closed<'js>(&self, ctx: Ctx<'js>) -> rquickjs::Result<Object<'js>> {
result_obj(&ctx, Value::new_undefined(ctx.clone()), true)
}
#[qjs(rename = PredefinedAtom::SymbolAsyncIterator)]
pub fn async_iter(this: This<Class<'_, ReadableStreamDefaultReaderJs>>) -> Class<'_, ReadableStreamDefaultReaderJs> {
this.0
}
#[qjs(rename = "next")]
pub async fn next<'js>(&self, ctx: Ctx<'js>) -> rquickjs::Result<Object<'js>> {
pull(&ctx, &self.source).await
}
}
#[derive(Trace)]
#[rquickjs::class(rename = "ReadableStream")]
pub struct ReadableStreamJs {
#[qjs(skip_trace)]
source: StreamSource,
#[qjs(skip_trace)]
locked: Locked,
}
#[allow(unsafe_code)]
unsafe impl rquickjs::JsLifetime<'_> for ReadableStreamJs {
type Changed<'to> = ReadableStreamJs;
}
impl ReadableStreamJs {
pub fn from_bytes(bytes: Vec<u8>) -> Self {
let mut chunks = VecDeque::new();
if !bytes.is_empty() {
chunks.push_back(bytes);
}
Self {
source: StreamSource::Buffered(Rc::new(RefCell::new(BufState {
chunks,
closed: true,
errored: None,
}))),
locked: Rc::new(Cell::new(false)),
}
}
pub fn from_net(resp: Arc<AsyncMutex<Option<HttpStreamResponse>>>) -> Self {
Self {
source: StreamSource::Net(resp),
locked: Rc::new(Cell::new(false)),
}
}
fn reader(&self) -> ReadableStreamDefaultReaderJs {
ReadableStreamDefaultReaderJs {
source: self.source.clone(),
locked: self.locked.clone(),
released: false,
}
}
}
#[rquickjs::methods(rename_all = "camelCase")]
impl ReadableStreamJs {
#[qjs(constructor)]
pub fn new<'js>(ctx: Ctx<'js>, source: Opt<Object<'js>>) -> rquickjs::Result<Self> {
let buf = Rc::new(RefCell::new(BufState::default()));
if let Some(src) = source.0
&& let Ok(start) = src.get::<_, rquickjs::Function<'js>>("start")
{
let controller = Class::instance(ctx.clone(), ReadableStreamDefaultControllerJs { buf: buf.clone() })?;
start.call::<_, ()>((controller,))?;
}
Ok(Self {
source: StreamSource::Buffered(buf),
locked: Rc::new(Cell::new(false)),
})
}
#[qjs(get, rename = "locked")]
pub fn locked(&self) -> bool {
self.locked.get()
}
#[qjs(rename = "getReader")]
pub fn get_reader<'js>(&self, ctx: Ctx<'js>) -> rquickjs::Result<Class<'js, ReadableStreamDefaultReaderJs>> {
if self.locked.get() {
return Err(rquickjs::Exception::throw_type(
&ctx,
"ReadableStream is already locked to a reader",
));
}
self.locked.set(true);
Class::instance(ctx, self.reader())
}
#[qjs(rename = "cancel")]
pub fn cancel(&self, _reason: Opt<Value<'_>>) {
cancel_source(&self.source);
}
#[qjs(rename = PredefinedAtom::SymbolAsyncIterator)]
pub fn async_iter<'js>(&self, ctx: Ctx<'js>) -> rquickjs::Result<Class<'js, ReadableStreamDefaultReaderJs>> {
self.locked.set(true);
Class::instance(ctx, self.reader())
}
}