1use futures::{prelude::*, future::Ready};
36use tetsy_libp2p_core::{transport::ListenerEvent, transport::TransportError, Multiaddr, Transport};
37use tetsy_send_wrapper::SendWrapper;
38use std::{collections::VecDeque, error, fmt, io, mem, pin::Pin, task::Context, task::Poll};
39use wasm_bindgen::{JsCast, prelude::*};
40use wasm_bindgen_futures::JsFuture;
41
42pub mod ffi {
44 use wasm_bindgen::prelude::*;
45
46 #[wasm_bindgen]
47 extern "C" {
48 pub type Transport;
50 pub type Connection;
52 pub type ListenEvent;
54 pub type ConnectionEvent;
56
57 #[wasm_bindgen(method, catch)]
64 pub fn dial(this: &Transport, multiaddr: &str) -> Result<js_sys::Promise, JsValue>;
65
66 #[wasm_bindgen(method, catch)]
73 pub fn listen_on(this: &Transport, multiaddr: &str) -> Result<js_sys::Iterator, JsValue>;
74
75 #[wasm_bindgen(method, getter)]
80 pub fn read(this: &Connection) -> js_sys::Iterator;
81
82 #[wasm_bindgen(method, catch)]
90 pub fn write(this: &Connection, data: &[u8]) -> Result<js_sys::Promise, JsValue>;
91
92 #[wasm_bindgen(method, catch)]
95 pub fn shutdown(this: &Connection) -> Result<(), JsValue>;
96
97 #[wasm_bindgen(method)]
99 pub fn close(this: &Connection);
100
101 #[wasm_bindgen(method, getter)]
104 pub fn new_addrs(this: &ListenEvent) -> Option<Box<[JsValue]>>;
105
106 #[wasm_bindgen(method, getter)]
108 pub fn expired_addrs(this: &ListenEvent) -> Option<Box<[JsValue]>>;
109
110 #[wasm_bindgen(method, getter)]
112 pub fn new_connections(this: &ListenEvent) -> Option<Box<[JsValue]>>;
113
114 #[wasm_bindgen(method, getter)]
116 pub fn next_event(this: &ListenEvent) -> JsValue;
117
118 #[wasm_bindgen(method, getter)]
120 pub fn connection(this: &ConnectionEvent) -> Connection;
121
122 #[wasm_bindgen(method, getter)]
124 pub fn observed_addr(this: &ConnectionEvent) -> String;
125
126 #[wasm_bindgen(method, getter)]
128 pub fn local_addr(this: &ConnectionEvent) -> String;
129 }
130
131 #[cfg(feature = "websocket")]
132 #[wasm_bindgen(module = "/src/websockets.js")]
133 extern "C" {
134 pub fn websocket_transport() -> Transport;
136 }
137}
138
139pub struct ExtTransport {
141 inner: SendWrapper<ffi::Transport>,
142}
143
144impl ExtTransport {
145 pub fn new(transport: ffi::Transport) -> Self {
147 ExtTransport {
148 inner: SendWrapper::new(transport),
149 }
150 }
151}
152
153impl fmt::Debug for ExtTransport {
154 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
155 f.debug_tuple("ExtTransport").finish()
156 }
157}
158
159impl Clone for ExtTransport {
160 fn clone(&self) -> Self {
161 ExtTransport {
162 inner: SendWrapper::new(self.inner.clone().into()),
163 }
164 }
165}
166
167impl Transport for ExtTransport {
168 type Output = Connection;
169 type Error = JsErr;
170 type Listener = Listen;
171 type ListenerUpgrade = Ready<Result<Self::Output, Self::Error>>;
172 type Dial = Dial;
173
174 fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>> {
175 let iter = self
176 .inner
177 .listen_on(&addr.to_string())
178 .map_err(|err| {
179 if is_not_supported_error(&err) {
180 TransportError::MultiaddrNotSupported(addr)
181 } else {
182 TransportError::Other(JsErr::from(err))
183 }
184 })?;
185
186 Ok(Listen {
187 iterator: SendWrapper::new(iter),
188 next_event: None,
189 pending_events: VecDeque::new(),
190 })
191 }
192
193 fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
194 let promise = self
195 .inner
196 .dial(&addr.to_string())
197 .map_err(|err| {
198 if is_not_supported_error(&err) {
199 TransportError::MultiaddrNotSupported(addr)
200 } else {
201 TransportError::Other(JsErr::from(err))
202 }
203 })?;
204
205 Ok(Dial {
206 inner: SendWrapper::new(promise.into()),
207 })
208 }
209
210 fn address_translation(&self, _server: &Multiaddr, _observed: &Multiaddr) -> Option<Multiaddr> {
211 None
212 }
213}
214
215#[must_use = "futures do nothing unless polled"]
217pub struct Dial {
218 inner: SendWrapper<JsFuture>,
220}
221
222impl fmt::Debug for Dial {
223 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
224 f.debug_tuple("Dial").finish()
225 }
226}
227
228impl Future for Dial {
229 type Output = Result<Connection, JsErr>;
230
231 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
232 match Future::poll(Pin::new(&mut *self.inner), cx) {
233 Poll::Ready(Ok(connec)) => Poll::Ready(Ok(Connection::new(connec.into()))),
234 Poll::Pending => Poll::Pending,
235 Poll::Ready(Err(err)) => Poll::Ready(Err(JsErr::from(err))),
236 }
237 }
238}
239
240#[must_use = "futures do nothing unless polled"]
242pub struct Listen {
243 iterator: SendWrapper<js_sys::Iterator>,
245 next_event: Option<SendWrapper<JsFuture>>,
247 pending_events: VecDeque<ListenerEvent<Ready<Result<Connection, JsErr>>, JsErr>>,
249}
250
251impl fmt::Debug for Listen {
252 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
253 f.debug_tuple("Listen").finish()
254 }
255}
256
257impl Stream for Listen {
258 type Item = Result<ListenerEvent<Ready<Result<Connection, JsErr>>, JsErr>, JsErr>;
259
260 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
261 loop {
262 if let Some(ev) = self.pending_events.pop_front() {
263 return Poll::Ready(Some(Ok(ev)));
264 }
265
266 if self.next_event.is_none() {
269 if let Ok(ev) = self.iterator.next() {
270 if !ev.done() {
271 let promise: js_sys::Promise = ev.value().into();
272 self.next_event = Some(SendWrapper::new(promise.into()));
273 }
274 }
275 }
276
277 let event = if let Some(next_event) = self.next_event.as_mut() {
278 let e = match Future::poll(Pin::new(&mut **next_event), cx) {
279 Poll::Ready(Ok(ev)) => ffi::ListenEvent::from(ev),
280 Poll::Pending => return Poll::Pending,
281 Poll::Ready(Err(err)) => return Poll::Ready(Some(Err(err.into()))),
282 };
283 self.next_event = None;
284 e
285 } else {
286 return Poll::Ready(None);
287 };
288
289 for addr in event
290 .new_addrs()
291 .into_iter()
292 .flat_map(|e| e.to_vec().into_iter())
293 {
294 let addr = js_value_to_addr(&addr)?;
295 self.pending_events
296 .push_back(ListenerEvent::NewAddress(addr));
297 }
298
299 for upgrade in event
300 .new_connections()
301 .into_iter()
302 .flat_map(|e| e.to_vec().into_iter())
303 {
304 let upgrade: ffi::ConnectionEvent = upgrade.into();
305 self.pending_events.push_back(ListenerEvent::Upgrade {
306 local_addr: upgrade.local_addr().parse()?,
307 remote_addr: upgrade.observed_addr().parse()?,
308 upgrade: futures::future::ok(Connection::new(upgrade.connection())),
309 });
310 }
311
312 for addr in event
313 .expired_addrs()
314 .into_iter()
315 .flat_map(|e| e.to_vec().into_iter())
316 {
317 match js_value_to_addr(&addr) {
318 Ok(addr) => self.pending_events.push_back(ListenerEvent::NewAddress(addr)),
319 Err(err) => self.pending_events.push_back(ListenerEvent::Error(err)),
320 }
321 }
322 }
323 }
324}
325
326pub struct Connection {
332 inner: SendWrapper<ffi::Connection>,
334
335 read_iterator: SendWrapper<js_sys::Iterator>,
337
338 read_state: ConnectionReadState,
340
341 previous_write_promise: Option<SendWrapper<JsFuture>>,
345}
346
347impl Connection {
348 fn new(inner: ffi::Connection) -> Self {
350 let read_iterator = inner.read();
351
352 Connection {
353 inner: SendWrapper::new(inner),
354 read_iterator: SendWrapper::new(read_iterator),
355 read_state: ConnectionReadState::PendingData(Vec::new()),
356 previous_write_promise: None,
357 }
358 }
359}
360
361enum ConnectionReadState {
363 PendingData(Vec<u8>),
365 Waiting(SendWrapper<JsFuture>),
367 Finished,
369}
370
371impl fmt::Debug for Connection {
372 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
373 f.debug_tuple("Connection").finish()
374 }
375}
376
377impl AsyncRead for Connection {
378 fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize, io::Error>> {
379 loop {
380 match mem::replace(&mut self.read_state, ConnectionReadState::Finished) {
381 ConnectionReadState::Finished => break Poll::Ready(Err(io::ErrorKind::BrokenPipe.into())),
382
383 ConnectionReadState::PendingData(ref data) if data.is_empty() => {
384 let iter_next = self.read_iterator.next().map_err(JsErr::from)?;
385 if iter_next.done() {
386 self.read_state = ConnectionReadState::Finished;
387 } else {
388 let promise: js_sys::Promise = iter_next.value().into();
389 let promise = SendWrapper::new(promise.into());
390 self.read_state = ConnectionReadState::Waiting(promise);
391 }
392 continue;
393 }
394
395 ConnectionReadState::PendingData(mut data) => {
396 debug_assert!(!data.is_empty());
397 if buf.len() <= data.len() {
398 buf.copy_from_slice(&data[..buf.len()]);
399 self.read_state =
400 ConnectionReadState::PendingData(data.split_off(buf.len()));
401 break Poll::Ready(Ok(buf.len()));
402 } else {
403 let len = data.len();
404 buf[..len].copy_from_slice(&data);
405 self.read_state = ConnectionReadState::PendingData(Vec::new());
406 break Poll::Ready(Ok(len));
407 }
408 }
409
410 ConnectionReadState::Waiting(mut promise) => {
411 let data = match Future::poll(Pin::new(&mut *promise), cx) {
412 Poll::Ready(Ok(ref data)) if data.is_null() => break Poll::Ready(Ok(0)),
413 Poll::Ready(Ok(data)) => data,
414 Poll::Ready(Err(err)) => break Poll::Ready(Err(io::Error::from(JsErr::from(err)))),
415 Poll::Pending => {
416 self.read_state = ConnectionReadState::Waiting(promise);
417 break Poll::Pending;
418 }
419 };
420
421 let data = js_sys::Uint8Array::new(&data);
424 let data_len = data.length() as usize;
425 if data_len <= buf.len() {
426 data.copy_to(&mut buf[..data_len]);
427 self.read_state = ConnectionReadState::PendingData(Vec::new());
428 break Poll::Ready(Ok(data_len));
429 } else {
430 let mut tmp_buf = vec![0; data_len];
431 data.copy_to(&mut tmp_buf[..]);
432 self.read_state = ConnectionReadState::PendingData(tmp_buf);
433 continue;
434 }
435 }
436 }
437 }
438 }
439}
440
441impl AsyncWrite for Connection {
442 fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize, io::Error>> {
443 if let Some(mut promise) = self.previous_write_promise.take() {
447 match Future::poll(Pin::new(&mut *promise), cx) {
448 Poll::Ready(Ok(_)) => (),
449 Poll::Ready(Err(err)) => return Poll::Ready(Err(io::Error::from(JsErr::from(err)))),
450 Poll::Pending => {
451 self.previous_write_promise = Some(promise);
452 return Poll::Pending;
453 }
454 }
455 }
456
457 debug_assert!(self.previous_write_promise.is_none());
458 self.previous_write_promise = Some(SendWrapper::new(
459 self.inner.write(buf).map_err(JsErr::from)?.into(),
460 ));
461 Poll::Ready(Ok(buf.len()))
462 }
463
464 fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
465 Poll::Ready(Ok(()))
467 }
468
469 fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
470 match self.inner.shutdown() {
472 Ok(()) => Poll::Ready(Ok(())),
473 Err(err) => Poll::Ready(Err(io::Error::from(JsErr::from(err)))),
474 }
475 }
476}
477
478impl Drop for Connection {
479 fn drop(&mut self) {
480 self.inner.close();
481 }
482}
483
484fn is_not_supported_error(err: &JsValue) -> bool {
486 if let Some(err) = err.dyn_ref::<js_sys::Error>() {
487 err.name() == "NotSupportedError"
488 } else {
489 false
490 }
491}
492
493fn js_value_to_addr(addr: &JsValue) -> Result<Multiaddr, JsErr> {
495 if let Some(addr) = addr.as_string() {
496 Ok(addr.parse()?)
497 } else {
498 Err(JsValue::from_str("Element in new_addrs is not a string").into())
499 }
500}
501
502pub struct JsErr(SendWrapper<JsValue>);
504
505impl From<JsValue> for JsErr {
506 fn from(val: JsValue) -> JsErr {
507 JsErr(SendWrapper::new(val))
508 }
509}
510
511impl From<tetsy_libp2p_core::multiaddr::Error> for JsErr {
512 fn from(err: tetsy_libp2p_core::multiaddr::Error) -> JsErr {
513 JsValue::from_str(&err.to_string()).into()
514 }
515}
516
517impl From<JsErr> for io::Error {
518 fn from(err: JsErr) -> io::Error {
519 io::Error::new(io::ErrorKind::Other, err.to_string())
520 }
521}
522
523impl fmt::Debug for JsErr {
524 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
525 write!(f, "{}", self)
526 }
527}
528
529impl fmt::Display for JsErr {
530 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
531 if let Some(s) = self.0.as_string() {
532 write!(f, "{}", s)
533 } else if let Some(err) = self.0.dyn_ref::<js_sys::Error>() {
534 write!(f, "{}", String::from(err.message()))
535 } else if let Some(obj) = self.0.dyn_ref::<js_sys::Object>() {
536 write!(f, "{}", String::from(obj.to_string()))
537 } else {
538 write!(f, "{:?}", &*self.0)
539 }
540 }
541}
542
543impl error::Error for JsErr {}