Skip to main content

ferridriver_script/bindings/
streams.rs

1//! WHATWG `ReadableStream` — a deliberately small spec subset (the
2//! full llrt/stream-web machinery — BYOB, byte controllers, tee — is
3//! studied for behaviour only, not ported).
4//!
5//! Two body sources behind one class:
6//! - **Buffered**: `new ReadableStream({ start(controller) })` and
7//!   `Blob.stream()` — chunks held in memory.
8//! - **Net**: `Response.body` — pulls chunks directly off the live
9//!   `reqwest` response ([`ferridriver::http_client::HttpStreamResponse`])
10//!   on each `read()`, so a large/streamed body is NOT fully buffered;
11//!   the consumer's pull rate is the backpressure.
12//!
13//! `getReader()` (locks; second getReader -> TypeError), `read()`
14//! (`{value:Uint8Array,done}`), `releaseLock()`, `cancel()`, `locked`,
15//! async iteration. Reader/controller are not user-constructible
16//! (throw, per spec) but the global names + `instanceof` exist.
17//! `read()` is async (a Net pull awaits the socket). Subset: no
18//! `pull`/`tee`/BYOB underlying-source callbacks.
19
20use std::cell::{Cell, RefCell};
21use std::collections::VecDeque;
22use std::rc::Rc;
23use std::sync::Arc;
24
25use ferridriver::http_client::HttpStreamResponse;
26use rquickjs::atom::PredefinedAtom;
27use rquickjs::function::{Opt, This};
28use rquickjs::{Class, Ctx, Object, TypedArray, Value, class::Trace};
29use tokio::sync::Mutex as AsyncMutex;
30
31#[derive(Default)]
32struct BufState {
33  chunks: VecDeque<Vec<u8>>,
34  closed: bool,
35  errored: Option<String>,
36}
37
38/// The body behind a stream. `Net` is the live response; `Buffered` is
39/// an in-memory queue (constructed streams, `Blob.stream()`).
40#[derive(Clone)]
41enum StreamSource {
42  Buffered(Rc<RefCell<BufState>>),
43  Net(Arc<AsyncMutex<Option<HttpStreamResponse>>>),
44}
45
46/// `locked` is shared between a stream and the reader it hands out so
47/// `releaseLock()` is observable on the stream.
48type Locked = Rc<Cell<bool>>;
49
50fn chunk_bytes(v: &Value<'_>) -> Vec<u8> {
51  if let Some(s) = v.as_string().and_then(|s| s.to_string().ok()) {
52    return s.into_bytes();
53  }
54  if let Ok(ta) = TypedArray::<u8>::from_value(v.clone()) {
55    let b: &[u8] = ta.as_ref();
56    return b.to_vec();
57  }
58  if let Some(ab) = rquickjs::ArrayBuffer::from_value(v.clone())
59    && let Some(b) = ab.as_bytes()
60  {
61    return b.to_vec();
62  }
63  Vec::new()
64}
65
66fn result_obj<'js>(ctx: &Ctx<'js>, value: Value<'js>, done: bool) -> rquickjs::Result<Object<'js>> {
67  let o = Object::new(ctx.clone())?;
68  o.set("value", value)?;
69  o.set("done", done)?;
70  Ok(o)
71}
72
73fn chunk_result<'js>(ctx: &Ctx<'js>, bytes: Vec<u8>) -> rquickjs::Result<Object<'js>> {
74  let ta = TypedArray::<u8>::new(ctx.clone(), bytes)?;
75  result_obj(ctx, ta.into_value(), false)
76}
77
78/// One read step. Buffered is immediate; Net awaits the next socket
79/// chunk (the only `.await` for a buffered stream is the readiness
80/// yield, so `read()` is uniformly async without a no-op `async`).
81async fn pull<'js>(ctx: &Ctx<'js>, source: &StreamSource) -> rquickjs::Result<Object<'js>> {
82  match source {
83    StreamSource::Buffered(state) => {
84      std::future::ready(()).await;
85      let (chunk, errored) = {
86        let mut s = state.borrow_mut();
87        if let Some(e) = s.errored.clone() {
88          (None, Some(e))
89        } else {
90          (s.chunks.pop_front(), None)
91        }
92      };
93      if let Some(e) = errored {
94        return Err(rquickjs::Exception::throw_type(ctx, &e));
95      }
96      match chunk {
97        Some(b) => chunk_result(ctx, b),
98        None => result_obj(ctx, Value::new_undefined(ctx.clone()), true),
99      }
100    },
101    StreamSource::Net(resp) => {
102      let mut guard = resp.lock().await;
103      let Some(r) = guard.as_mut() else {
104        return result_obj(ctx, Value::new_undefined(ctx.clone()), true);
105      };
106      match r.chunk().await {
107        Ok(Some(bytes)) => chunk_result(ctx, bytes.to_vec()),
108        Ok(None) => {
109          *guard = None;
110          result_obj(ctx, Value::new_undefined(ctx.clone()), true)
111        },
112        Err(e) => {
113          *guard = None;
114          Err(rquickjs::Exception::throw_type(ctx, &e.to_string()))
115        },
116      }
117    },
118  }
119}
120
121fn cancel_source(source: &StreamSource) {
122  match source {
123    StreamSource::Buffered(state) => {
124      let mut s = state.borrow_mut();
125      s.chunks.clear();
126      s.closed = true;
127    },
128    // Drop the live response if not mid-read (best-effort; a concurrent
129    // read keeps the lock and finishes its chunk first).
130    StreamSource::Net(resp) => {
131      if let Ok(mut g) = resp.try_lock() {
132        *g = None;
133      }
134    },
135  }
136}
137
138#[derive(Trace)]
139#[rquickjs::class(rename = "ReadableStreamDefaultController")]
140pub struct ReadableStreamDefaultControllerJs {
141  #[qjs(skip_trace)]
142  buf: Rc<RefCell<BufState>>,
143}
144
145#[allow(unsafe_code)]
146unsafe impl rquickjs::JsLifetime<'_> for ReadableStreamDefaultControllerJs {
147  type Changed<'to> = ReadableStreamDefaultControllerJs;
148}
149
150#[rquickjs::methods(rename_all = "camelCase")]
151impl ReadableStreamDefaultControllerJs {
152  /// Not user-constructible (only handed to `start`); present so the
153  /// global name + `instanceof` exist.
154  #[qjs(constructor)]
155  pub fn new(ctx: Ctx<'_>) -> rquickjs::Result<Self> {
156    Err(rquickjs::Exception::throw_type(&ctx, "Illegal constructor"))
157  }
158
159  #[qjs(rename = "enqueue")]
160  pub fn enqueue(&self, chunk: Value<'_>) {
161    self.buf.borrow_mut().chunks.push_back(chunk_bytes(&chunk));
162  }
163
164  #[qjs(rename = "close")]
165  pub fn close(&self) {
166    self.buf.borrow_mut().closed = true;
167  }
168
169  #[qjs(rename = "error")]
170  pub fn error(&self, reason: Opt<Value<'_>>) {
171    let msg = reason
172      .0
173      .and_then(|v| {
174        v.as_string()
175          .and_then(|s| s.to_string().ok())
176          .or_else(|| v.as_object().and_then(|o| o.get::<_, String>("message").ok()))
177      })
178      .unwrap_or_else(|| "stream errored".to_string());
179    let mut s = self.buf.borrow_mut();
180    s.errored = Some(msg);
181    s.closed = true;
182  }
183}
184
185#[derive(Trace)]
186#[rquickjs::class(rename = "ReadableStreamDefaultReader")]
187pub struct ReadableStreamDefaultReaderJs {
188  #[qjs(skip_trace)]
189  source: StreamSource,
190  #[qjs(skip_trace)]
191  locked: Locked,
192  #[qjs(skip_trace)]
193  released: bool,
194}
195
196#[allow(unsafe_code)]
197unsafe impl rquickjs::JsLifetime<'_> for ReadableStreamDefaultReaderJs {
198  type Changed<'to> = ReadableStreamDefaultReaderJs;
199}
200
201#[rquickjs::methods(rename_all = "camelCase")]
202impl ReadableStreamDefaultReaderJs {
203  #[qjs(constructor)]
204  pub fn new(ctx: Ctx<'_>) -> rquickjs::Result<Self> {
205    Err(rquickjs::Exception::throw_type(&ctx, "Illegal constructor"))
206  }
207
208  /// `read()` -> `Promise<{ value: Uint8Array, done }>` (a Net pull
209  /// awaits the socket; buffered resolves immediately).
210  #[qjs(rename = "read")]
211  pub async fn read<'js>(&self, ctx: Ctx<'js>) -> rquickjs::Result<Object<'js>> {
212    if self.released {
213      return Err(rquickjs::Exception::throw_type(&ctx, "Reader has been released"));
214    }
215    pull(&ctx, &self.source).await
216  }
217
218  #[qjs(rename = "releaseLock")]
219  pub fn release_lock(&mut self) {
220    self.released = true;
221    self.locked.set(false);
222  }
223
224  #[qjs(rename = "cancel")]
225  pub fn cancel(&self, _reason: Opt<Value<'_>>) {
226    cancel_source(&self.source);
227  }
228
229  /// Spec `reader.closed` is a `Promise`; buffered-subset eager-resolve.
230  #[qjs(get, rename = "closed")]
231  pub fn closed<'js>(&self, ctx: Ctx<'js>) -> rquickjs::Result<Object<'js>> {
232    result_obj(&ctx, Value::new_undefined(ctx.clone()), true)
233  }
234
235  /// A reader is its own async iterator.
236  #[qjs(rename = PredefinedAtom::SymbolAsyncIterator)]
237  pub fn async_iter(this: This<Class<'_, ReadableStreamDefaultReaderJs>>) -> Class<'_, ReadableStreamDefaultReaderJs> {
238    this.0
239  }
240
241  #[qjs(rename = "next")]
242  pub async fn next<'js>(&self, ctx: Ctx<'js>) -> rquickjs::Result<Object<'js>> {
243    pull(&ctx, &self.source).await
244  }
245}
246
247#[derive(Trace)]
248#[rquickjs::class(rename = "ReadableStream")]
249pub struct ReadableStreamJs {
250  #[qjs(skip_trace)]
251  source: StreamSource,
252  #[qjs(skip_trace)]
253  locked: Locked,
254}
255
256#[allow(unsafe_code)]
257unsafe impl rquickjs::JsLifetime<'_> for ReadableStreamJs {
258  type Changed<'to> = ReadableStreamJs;
259}
260
261impl ReadableStreamJs {
262  /// A buffered stream pre-filled with `bytes` (one chunk, closed) —
263  /// `Blob.stream()` and a buffered `Response.body`.
264  pub fn from_bytes(bytes: Vec<u8>) -> Self {
265    let mut chunks = VecDeque::new();
266    if !bytes.is_empty() {
267      chunks.push_back(bytes);
268    }
269    Self {
270      source: StreamSource::Buffered(Rc::new(RefCell::new(BufState {
271        chunks,
272        closed: true,
273        errored: None,
274      }))),
275      locked: Rc::new(Cell::new(false)),
276    }
277  }
278
279  /// A live stream over a not-yet-read response — an incremental
280  /// `Response.body`.
281  pub fn from_net(resp: Arc<AsyncMutex<Option<HttpStreamResponse>>>) -> Self {
282    Self {
283      source: StreamSource::Net(resp),
284      locked: Rc::new(Cell::new(false)),
285    }
286  }
287
288  fn reader(&self) -> ReadableStreamDefaultReaderJs {
289    ReadableStreamDefaultReaderJs {
290      source: self.source.clone(),
291      locked: self.locked.clone(),
292      released: false,
293    }
294  }
295}
296
297#[rquickjs::methods(rename_all = "camelCase")]
298impl ReadableStreamJs {
299  /// `new ReadableStream(underlyingSource?)` — runs `start(controller)`
300  /// synchronously if present. `pull`/`cancel`/BYOB unsupported.
301  #[qjs(constructor)]
302  pub fn new<'js>(ctx: Ctx<'js>, source: Opt<Object<'js>>) -> rquickjs::Result<Self> {
303    let buf = Rc::new(RefCell::new(BufState::default()));
304    if let Some(src) = source.0
305      && let Ok(start) = src.get::<_, rquickjs::Function<'js>>("start")
306    {
307      let controller = Class::instance(ctx.clone(), ReadableStreamDefaultControllerJs { buf: buf.clone() })?;
308      start.call::<_, ()>((controller,))?;
309    }
310    Ok(Self {
311      source: StreamSource::Buffered(buf),
312      locked: Rc::new(Cell::new(false)),
313    })
314  }
315
316  #[qjs(get, rename = "locked")]
317  pub fn locked(&self) -> bool {
318    self.locked.get()
319  }
320
321  #[qjs(rename = "getReader")]
322  pub fn get_reader<'js>(&self, ctx: Ctx<'js>) -> rquickjs::Result<Class<'js, ReadableStreamDefaultReaderJs>> {
323    if self.locked.get() {
324      return Err(rquickjs::Exception::throw_type(
325        &ctx,
326        "ReadableStream is already locked to a reader",
327      ));
328    }
329    self.locked.set(true);
330    Class::instance(ctx, self.reader())
331  }
332
333  #[qjs(rename = "cancel")]
334  pub fn cancel(&self, _reason: Opt<Value<'_>>) {
335    cancel_source(&self.source);
336  }
337
338  /// `stream[Symbol.asyncIterator]()` — a reader (locks the stream).
339  #[qjs(rename = PredefinedAtom::SymbolAsyncIterator)]
340  pub fn async_iter<'js>(&self, ctx: Ctx<'js>) -> rquickjs::Result<Class<'js, ReadableStreamDefaultReaderJs>> {
341    self.locked.set(true);
342    Class::instance(ctx, self.reader())
343  }
344}