1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
use bytes::BufMut;
use futures_core::Future;
use futures_util::io::{self, AsyncBufRead, Cursor};
use js_sys::{AsyncIterator, IteratorNext, JsString, Uint8Array};
use std::{
convert::TryFrom,
pin::Pin,
task::{Context, Poll},
};
use wasm_bindgen::{prelude::*, JsCast};
use wasm_bindgen_futures::JsFuture;
pub struct JsAsyncRead {
inner: AsyncIterator,
next: JsFuture,
data: Cursor<Vec<u8>>,
}
impl JsAsyncRead {
pub fn new(inner: AsyncIterator) -> Result<Self, JsValue> {
let next = JsFuture::from(inner.next()?);
let data = Default::default();
Ok(Self { inner, next, data })
}
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context,
mut buf: &mut [u8],
) -> Result<Poll<std::io::Result<usize>>, JsValue> {
let this = self.get_mut();
let inner_buf = match Pin::new(&mut this.data).poll_fill_buf(cx) {
Poll::Ready(Ok(buf)) => buf,
Poll::Ready(Err(err)) => return Ok(Poll::Ready(Err(err))),
Poll::Pending => return Ok(Poll::Pending),
};
if inner_buf.is_empty() {
let next = Pin::new(&mut this.next);
let status = next.poll(cx)?;
match status {
Poll::Ready(object) => {
let iterator_next = object.unchecked_into::<IteratorNext>();
if iterator_next.done() {
Ok(Poll::Ready(Ok(0)))
} else {
let value = {
let next_value = iterator_next.value();
if Uint8Array::instanceof(&next_value) {
Ok(next_value.unchecked_into::<Uint8Array>().to_vec())
} else if next_value.is_string() {
if let Some(string) = next_value.unchecked_into::<JsString>().as_string() {
Ok(string.into_bytes())
} else {
Err(js_sys::Error::new("Error converting JsString to String"))
}
} else {
Err(js_sys::Error::new(
"Inner AsyncIterator must produce a JsString or Uint8Array",
))
}
}?;
this.data = Cursor::new(value);
match this.inner.next() {
Ok(promise) => {
this.next = JsFuture::from(promise);
},
Err(error) => {
return Err(error);
},
}
cx.waker().wake_by_ref();
Ok(Poll::Pending)
}
},
Poll::Pending => Ok(Poll::Pending),
}
} else {
let amt = std::cmp::min(inner_buf.len(), buf.len());
buf.put_slice(&inner_buf[.. amt]);
Pin::new(&mut this.data).consume(amt);
Ok(Poll::Ready(Ok(amt)))
}
}
}
impl TryFrom<AsyncIterator> for JsAsyncRead {
type Error = JsValue;
fn try_from(inner: AsyncIterator) -> Result<Self, JsValue> {
Self::new(inner)
}
}
#[derive(Clone, Debug)]
struct AsyncReadableError(JsValue);
unsafe impl Send for AsyncReadableError {
}
unsafe impl Sync for AsyncReadableError {
}
impl std::fmt::Display for AsyncReadableError {
fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(fmt, "{:?}", self.0)
}
}
impl std::error::Error for AsyncReadableError {
}
impl io::AsyncRead for JsAsyncRead {
fn poll_read(self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll<std::io::Result<usize>> {
match JsAsyncRead::poll_read(self, cx, buf) {
Ok(success) => success,
Err(error) => {
let kind = io::ErrorKind::Other;
let error = AsyncReadableError(error);
Poll::Ready(Err(io::Error::new(kind, error)))
},
}
}
}