ferridriver_script/bindings/
streams.rs1use std::cell::{Cell, RefCell};
21use std::collections::VecDeque;
22use std::rc::Rc;
23use std::sync::Arc;
24
25use ferridriver::http_client::HttpStreamResponse;
26use rquickjs::atom::PredefinedAtom;
27use rquickjs::function::{Opt, This};
28use rquickjs::{Class, Ctx, Object, TypedArray, Value, class::Trace};
29use tokio::sync::Mutex as AsyncMutex;
30
31#[derive(Default)]
32struct BufState {
33 chunks: VecDeque<Vec<u8>>,
34 closed: bool,
35 errored: Option<String>,
36}
37
38#[derive(Clone)]
41enum StreamSource {
42 Buffered(Rc<RefCell<BufState>>),
43 Net(Arc<AsyncMutex<Option<HttpStreamResponse>>>),
44}
45
46type Locked = Rc<Cell<bool>>;
49
50fn chunk_bytes(v: &Value<'_>) -> Vec<u8> {
51 if let Some(s) = v.as_string().and_then(|s| s.to_string().ok()) {
52 return s.into_bytes();
53 }
54 if let Ok(ta) = TypedArray::<u8>::from_value(v.clone()) {
55 let b: &[u8] = ta.as_ref();
56 return b.to_vec();
57 }
58 if let Some(ab) = rquickjs::ArrayBuffer::from_value(v.clone())
59 && let Some(b) = ab.as_bytes()
60 {
61 return b.to_vec();
62 }
63 Vec::new()
64}
65
66fn result_obj<'js>(ctx: &Ctx<'js>, value: Value<'js>, done: bool) -> rquickjs::Result<Object<'js>> {
67 let o = Object::new(ctx.clone())?;
68 o.set("value", value)?;
69 o.set("done", done)?;
70 Ok(o)
71}
72
73fn chunk_result<'js>(ctx: &Ctx<'js>, bytes: Vec<u8>) -> rquickjs::Result<Object<'js>> {
74 let ta = TypedArray::<u8>::new(ctx.clone(), bytes)?;
75 result_obj(ctx, ta.into_value(), false)
76}
77
78async fn pull<'js>(ctx: &Ctx<'js>, source: &StreamSource) -> rquickjs::Result<Object<'js>> {
82 match source {
83 StreamSource::Buffered(state) => {
84 std::future::ready(()).await;
85 let (chunk, errored) = {
86 let mut s = state.borrow_mut();
87 if let Some(e) = s.errored.clone() {
88 (None, Some(e))
89 } else {
90 (s.chunks.pop_front(), None)
91 }
92 };
93 if let Some(e) = errored {
94 return Err(rquickjs::Exception::throw_type(ctx, &e));
95 }
96 match chunk {
97 Some(b) => chunk_result(ctx, b),
98 None => result_obj(ctx, Value::new_undefined(ctx.clone()), true),
99 }
100 },
101 StreamSource::Net(resp) => {
102 let mut guard = resp.lock().await;
103 let Some(r) = guard.as_mut() else {
104 return result_obj(ctx, Value::new_undefined(ctx.clone()), true);
105 };
106 match r.chunk().await {
107 Ok(Some(bytes)) => chunk_result(ctx, bytes.to_vec()),
108 Ok(None) => {
109 *guard = None;
110 result_obj(ctx, Value::new_undefined(ctx.clone()), true)
111 },
112 Err(e) => {
113 *guard = None;
114 Err(rquickjs::Exception::throw_type(ctx, &e.to_string()))
115 },
116 }
117 },
118 }
119}
120
121fn cancel_source(source: &StreamSource) {
122 match source {
123 StreamSource::Buffered(state) => {
124 let mut s = state.borrow_mut();
125 s.chunks.clear();
126 s.closed = true;
127 },
128 StreamSource::Net(resp) => {
131 if let Ok(mut g) = resp.try_lock() {
132 *g = None;
133 }
134 },
135 }
136}
137
138#[derive(Trace)]
139#[rquickjs::class(rename = "ReadableStreamDefaultController")]
140pub struct ReadableStreamDefaultControllerJs {
141 #[qjs(skip_trace)]
142 buf: Rc<RefCell<BufState>>,
143}
144
145#[allow(unsafe_code)]
146unsafe impl rquickjs::JsLifetime<'_> for ReadableStreamDefaultControllerJs {
147 type Changed<'to> = ReadableStreamDefaultControllerJs;
148}
149
150#[rquickjs::methods(rename_all = "camelCase")]
151impl ReadableStreamDefaultControllerJs {
152 #[qjs(constructor)]
155 pub fn new(ctx: Ctx<'_>) -> rquickjs::Result<Self> {
156 Err(rquickjs::Exception::throw_type(&ctx, "Illegal constructor"))
157 }
158
159 #[qjs(rename = "enqueue")]
160 pub fn enqueue(&self, chunk: Value<'_>) {
161 self.buf.borrow_mut().chunks.push_back(chunk_bytes(&chunk));
162 }
163
164 #[qjs(rename = "close")]
165 pub fn close(&self) {
166 self.buf.borrow_mut().closed = true;
167 }
168
169 #[qjs(rename = "error")]
170 pub fn error(&self, reason: Opt<Value<'_>>) {
171 let msg = reason
172 .0
173 .and_then(|v| {
174 v.as_string()
175 .and_then(|s| s.to_string().ok())
176 .or_else(|| v.as_object().and_then(|o| o.get::<_, String>("message").ok()))
177 })
178 .unwrap_or_else(|| "stream errored".to_string());
179 let mut s = self.buf.borrow_mut();
180 s.errored = Some(msg);
181 s.closed = true;
182 }
183}
184
185#[derive(Trace)]
186#[rquickjs::class(rename = "ReadableStreamDefaultReader")]
187pub struct ReadableStreamDefaultReaderJs {
188 #[qjs(skip_trace)]
189 source: StreamSource,
190 #[qjs(skip_trace)]
191 locked: Locked,
192 #[qjs(skip_trace)]
193 released: bool,
194}
195
196#[allow(unsafe_code)]
197unsafe impl rquickjs::JsLifetime<'_> for ReadableStreamDefaultReaderJs {
198 type Changed<'to> = ReadableStreamDefaultReaderJs;
199}
200
201#[rquickjs::methods(rename_all = "camelCase")]
202impl ReadableStreamDefaultReaderJs {
203 #[qjs(constructor)]
204 pub fn new(ctx: Ctx<'_>) -> rquickjs::Result<Self> {
205 Err(rquickjs::Exception::throw_type(&ctx, "Illegal constructor"))
206 }
207
208 #[qjs(rename = "read")]
211 pub async fn read<'js>(&self, ctx: Ctx<'js>) -> rquickjs::Result<Object<'js>> {
212 if self.released {
213 return Err(rquickjs::Exception::throw_type(&ctx, "Reader has been released"));
214 }
215 pull(&ctx, &self.source).await
216 }
217
218 #[qjs(rename = "releaseLock")]
219 pub fn release_lock(&mut self) {
220 self.released = true;
221 self.locked.set(false);
222 }
223
224 #[qjs(rename = "cancel")]
225 pub fn cancel(&self, _reason: Opt<Value<'_>>) {
226 cancel_source(&self.source);
227 }
228
229 #[qjs(get, rename = "closed")]
231 pub fn closed<'js>(&self, ctx: Ctx<'js>) -> rquickjs::Result<Object<'js>> {
232 result_obj(&ctx, Value::new_undefined(ctx.clone()), true)
233 }
234
235 #[qjs(rename = PredefinedAtom::SymbolAsyncIterator)]
237 pub fn async_iter(this: This<Class<'_, ReadableStreamDefaultReaderJs>>) -> Class<'_, ReadableStreamDefaultReaderJs> {
238 this.0
239 }
240
241 #[qjs(rename = "next")]
242 pub async fn next<'js>(&self, ctx: Ctx<'js>) -> rquickjs::Result<Object<'js>> {
243 pull(&ctx, &self.source).await
244 }
245}
246
247#[derive(Trace)]
248#[rquickjs::class(rename = "ReadableStream")]
249pub struct ReadableStreamJs {
250 #[qjs(skip_trace)]
251 source: StreamSource,
252 #[qjs(skip_trace)]
253 locked: Locked,
254}
255
256#[allow(unsafe_code)]
257unsafe impl rquickjs::JsLifetime<'_> for ReadableStreamJs {
258 type Changed<'to> = ReadableStreamJs;
259}
260
261impl ReadableStreamJs {
262 pub fn from_bytes(bytes: Vec<u8>) -> Self {
265 let mut chunks = VecDeque::new();
266 if !bytes.is_empty() {
267 chunks.push_back(bytes);
268 }
269 Self {
270 source: StreamSource::Buffered(Rc::new(RefCell::new(BufState {
271 chunks,
272 closed: true,
273 errored: None,
274 }))),
275 locked: Rc::new(Cell::new(false)),
276 }
277 }
278
279 pub fn from_net(resp: Arc<AsyncMutex<Option<HttpStreamResponse>>>) -> Self {
282 Self {
283 source: StreamSource::Net(resp),
284 locked: Rc::new(Cell::new(false)),
285 }
286 }
287
288 fn reader(&self) -> ReadableStreamDefaultReaderJs {
289 ReadableStreamDefaultReaderJs {
290 source: self.source.clone(),
291 locked: self.locked.clone(),
292 released: false,
293 }
294 }
295}
296
297#[rquickjs::methods(rename_all = "camelCase")]
298impl ReadableStreamJs {
299 #[qjs(constructor)]
302 pub fn new<'js>(ctx: Ctx<'js>, source: Opt<Object<'js>>) -> rquickjs::Result<Self> {
303 let buf = Rc::new(RefCell::new(BufState::default()));
304 if let Some(src) = source.0
305 && let Ok(start) = src.get::<_, rquickjs::Function<'js>>("start")
306 {
307 let controller = Class::instance(ctx.clone(), ReadableStreamDefaultControllerJs { buf: buf.clone() })?;
308 start.call::<_, ()>((controller,))?;
309 }
310 Ok(Self {
311 source: StreamSource::Buffered(buf),
312 locked: Rc::new(Cell::new(false)),
313 })
314 }
315
316 #[qjs(get, rename = "locked")]
317 pub fn locked(&self) -> bool {
318 self.locked.get()
319 }
320
321 #[qjs(rename = "getReader")]
322 pub fn get_reader<'js>(&self, ctx: Ctx<'js>) -> rquickjs::Result<Class<'js, ReadableStreamDefaultReaderJs>> {
323 if self.locked.get() {
324 return Err(rquickjs::Exception::throw_type(
325 &ctx,
326 "ReadableStream is already locked to a reader",
327 ));
328 }
329 self.locked.set(true);
330 Class::instance(ctx, self.reader())
331 }
332
333 #[qjs(rename = "cancel")]
334 pub fn cancel(&self, _reason: Opt<Value<'_>>) {
335 cancel_source(&self.source);
336 }
337
338 #[qjs(rename = PredefinedAtom::SymbolAsyncIterator)]
340 pub fn async_iter<'js>(&self, ctx: Ctx<'js>) -> rquickjs::Result<Class<'js, ReadableStreamDefaultReaderJs>> {
341 self.locked.set(true);
342 Class::instance(ctx, self.reader())
343 }
344}