use bytes::BufMut;
use futures_core::Future;
use futures_util::io::{self, AsyncBufRead, Cursor};
use js_sys::{AsyncIterator, Boolean, Reflect, Uint8Array};
use std::{
    pin::Pin,
    task::{Context, Poll},
};
use wasm_bindgen::{prelude::*, JsCast};
use wasm_bindgen_futures::JsFuture;

/// Adapts a JavaScript async iterator that yields `Uint8Array` chunks
/// into an `AsyncRead`.
pub struct JsAsyncRead {
    /// The underlying JavaScript async iterator.
    inner: AsyncIterator,
    /// The pending promise for the next `{ done, value }` result.
    next: JsFuture,
    /// The most recently received chunk, drained by successive reads.
    data: Cursor<Vec<u8>>,
}

impl JsAsyncRead {
    pub fn new(inner: AsyncIterator) -> anyhow::Result<Self> {
        let next = JsFuture::from(inner.next().map_err(AsyncReadableError)?);
        let data = Default::default();
        Ok(Self { inner, next, data })
    }

    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context,
        mut buf: &mut [u8],
    ) -> Result<Poll<std::io::Result<usize>>, JsValue> {
        let this = self.get_mut();
        let inner_buf = match Pin::new(&mut this.data).poll_fill_buf(cx) {
            Poll::Ready(Ok(buf)) => buf,
            Poll::Ready(Err(err)) => return Ok(Poll::Ready(Err(err))),
            Poll::Pending => return Ok(Poll::Pending),
        };
        if inner_buf.is_empty() {
            // The current chunk is exhausted: poll the iterator for the next one.
            let next = Pin::new(&mut this.next);
            let status = next.poll(cx)?;
            match status {
                Poll::Ready(object) => {
                    let done = Reflect::get(&object, &"done".into())?;
                    let done = done.unchecked_into::<Boolean>().value_of();
                    if done {
                        // The iterator is finished: report end of stream.
                        Ok(Poll::Ready(Ok(0)))
                    } else {
                        let value = Reflect::get(&object, &"value".into())?;
                        let value = value.unchecked_into::<Uint8Array>().to_vec();
                        this.data = Cursor::new(value);
                        // Request the following chunk right away.
                        match this.inner.next() {
                            Ok(promise) => {
                                this.next = JsFuture::from(promise);
                            },
                            Err(error) => {
                                return Err(error);
                            },
                        }
                        // Ask to be polled again so the new chunk gets copied out.
                        cx.waker().clone().wake();
                        Ok(Poll::Pending)
                    }
                },
                Poll::Pending => {
                    // Requests an immediate re-poll while the promise is unresolved.
                    cx.waker().clone().wake();
                    Ok(Poll::Pending)
                },
            }
        } else {
            // Copy no more than the caller's buffer can hold; `put_slice`
            // panics if the source is longer than the remaining capacity.
            let amt = inner_buf.len().min(buf.len());
            buf.put_slice(&inner_buf[.. amt]);
            Pin::new(&mut this.data).consume(amt);
            Ok(Poll::Ready(Ok(amt)))
        }
    }
}

/// Wraps a JavaScript error value so it can be carried inside an `io::Error`.
#[derive(Clone, Debug)]
struct AsyncReadableError(JsValue);

// `JsValue` is not thread-safe; these impls are only sound as long as the
// error never actually crosses a thread boundary.
unsafe impl Send for AsyncReadableError {
}

unsafe impl Sync for AsyncReadableError {
}

impl std::fmt::Display for AsyncReadableError {
    fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(fmt, "{:?}", self.0)
    }
}

impl std::error::Error for AsyncReadableError {
}

impl io::AsyncRead for JsAsyncRead {
    fn poll_read(self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll<std::io::Result<usize>> {
        // Delegate to the inherent method and convert JavaScript errors into `io::Error`.
        match JsAsyncRead::poll_read(self, cx, buf) {
            Ok(success) => success,
            Err(error) => {
                let kind = io::ErrorKind::Other;
                let error = AsyncReadableError(error);
                Poll::Ready(Err(io::Error::new(kind, error)))
            },
        }
    }
}