1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
//!
//! Conversion from Rust streams into async JavaScript generators.
//!
//! This module allows you to convert any `future::Stream<Item>` where `Item : Into<JsValue>`
//! into an async JavaScript generator.
//!
//! ```no_run
//! let js_value : JsValue = AsyncStream::new(stream).into();
//! ```
//! or
//! ```no_run
//! let js_value = create_async_stream_iterator(stream);
//! ```
//!
//! For example:
//! ```no_run
//! #[wasm_bindgen]
//! fn test() {
//! let iter = stream::iter(0..30);
//! AsyncStream::new(iter).into()
//! }
//! ```
//!
//! Then, on JavaScript side, you can can consume it as follows:
//! ```js
//! let iter = myFn(); // get the generator from Rust
//! for await (let item of iter) {
//! console.log("item ->",item);
//! }
//! ```
//!
use crate::error::Error;
use crate::extensions::object::*;
use futures::{Stream, StreamExt};
use js_sys::Object;
use std::pin::Pin;
use wasm_bindgen::{prelude::wasm_bindgen, JsValue};
#[wasm_bindgen]
struct AsyncStreamProxy(Pin<Box<dyn Stream<Item = JsValue>>>);
impl AsyncStreamProxy {
pub fn new<T>(source: impl Stream<Item = T> + Send + 'static) -> Self
where
T: Into<JsValue> + Send + 'static,
{
AsyncStreamProxy(Box::pin(source.map(|item| item.into())))
}
}
#[wasm_bindgen]
impl AsyncStreamProxy {
#[allow(dead_code)]
pub async fn next(&mut self) -> Result<JsValue, Error> {
let object = Object::new();
let result = match self.0.next().await {
Some(value) => {
object.set("value", &value)?;
object.into()
}
None => {
object.set("done", &JsValue::from(true))?;
object.into()
}
};
Ok(result)
}
}
///
/// `AsyncStream` is a helper that receives a stream that must correspond
/// to the following spec: `Stream<Item = T> where T : Into<JsValue> + Send + 'static`.
/// The stream must be supplied via the `AsyncStream::new` constructor.
///
/// You can then use `into()` to obtain a `JsValue` the represents a
/// JavaScript generator iterating this stream.
///
pub struct AsyncStream(AsyncStreamProxy);
impl AsyncStream {
pub fn new<T>(source: impl Stream<Item = T> + Send + 'static) -> Self
where
T: Into<JsValue> + Send + 'static,
{
Self(AsyncStreamProxy::new(source))
}
}
static mut ASYNC_ITER_PROXY_FN: Option<js_sys::Function> = None;
fn async_iter_proxy_fn() -> &'static js_sys::Function {
unsafe {
ASYNC_ITER_PROXY_FN.get_or_insert_with(|| {
js_sys::Function::new_with_args(
"iter",
"return (async function* () {
let done = false;
let item = await iter.next();
while (!item.done) {
yield item.value;
item = await iter.next();
}
})();
",
)
})
}
}
impl From<AsyncStream> for JsValue {
fn from(stream: AsyncStream) -> Self {
let proxy_fn = async_iter_proxy_fn();
proxy_fn
.call1(&wasm_bindgen::JsValue::undefined(), &stream.0.into())
.unwrap_or_else(|err| panic!("create_async_stream_iterator(): {:?}", err))
}
}
///
/// Helper function that receives a stream and returns a `JsValue` representing
/// the JavaScript generator iterating this stream. The function uses `AsyncStream`
/// internally as follows: `AsyncStream::new(stream).into()`
///
pub fn create_async_stream_iterator<T>(source: impl Stream<Item = T> + Send + 'static) -> JsValue
where
T: Into<JsValue> + Send + 'static,
{
AsyncStream::new(source).into()
}