wasm_ext/
lib.rs

1// Copyright 2019 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! Implementation of the tetsy-libp2p `Transport` trait for external transports.
22//!
23//! This `Transport` is used in the context of WASM to allow delegating the transport mechanism
24//! to the code that uses tetsy-libp2p, as opposed to inside of tetsy-libp2p itself.
25//!
26//! > **Note**: This only allows transports that produce a raw stream with the remote. You
//! >           couldn't, for example, pass an implementation of QUIC.
28//!
29//! # Usage
30//!
31//! Call `new()` with a JavaScript object that implements the interface described in the `ffi`
32//! module.
33//!
34
35use futures::{prelude::*, future::Ready};
36use tetsy_libp2p_core::{transport::ListenerEvent, transport::TransportError, Multiaddr, Transport};
37use tetsy_send_wrapper::SendWrapper;
38use std::{collections::VecDeque, error, fmt, io, mem, pin::Pin, task::Context, task::Poll};
39use wasm_bindgen::{JsCast, prelude::*};
40use wasm_bindgen_futures::JsFuture;
41
/// Contains the definition that one must match on the JavaScript side.
///
/// Everything in this module is an import declaration: the types and methods are provided by
/// JavaScript objects, and the Rust code in this file only calls into them.
pub mod ffi {
    use wasm_bindgen::prelude::*;

    #[wasm_bindgen]
    extern "C" {
        /// Type of the object that allows opening connections.
        pub type Transport;
        /// Type of the object that represents an open connection with a remote.
        pub type Connection;
        /// Type of the object that represents an event generated by listening.
        pub type ListenEvent;
        /// Type of the object that represents an event containing a new connection with a remote.
        pub type ConnectionEvent;

        /// Start attempting to dial the given multiaddress.
        ///
        /// The multiaddress is passed in its string representation.
        ///
        /// The returned `Promise` must yield a [`Connection`] on success.
        ///
        /// If the multiaddress is not supported, you should return an instance of `Error` whose
        /// `name` property has been set to the string `"NotSupportedError"`.
        #[wasm_bindgen(method, catch)]
        pub fn dial(this: &Transport, multiaddr: &str) -> Result<js_sys::Promise, JsValue>;

        /// Start listening on the given multiaddress.
        ///
        /// The multiaddress is passed in its string representation.
        ///
        /// The returned `Iterator` must yield `Promise`s to [`ListenEvent`] events.
        ///
        /// If the multiaddress is not supported, you should return an instance of `Error` whose
        /// `name` property has been set to the string `"NotSupportedError"`.
        #[wasm_bindgen(method, catch)]
        pub fn listen_on(this: &Transport, multiaddr: &str) -> Result<js_sys::Iterator, JsValue>;

        /// Returns an iterator of JavaScript `Promise`s that resolve to `ArrayBuffer` objects
        /// (or resolve to null, see below). These `ArrayBuffer` objects contain the data that the
        /// remote has sent to us. If the remote closes the connection, the iterator must produce
        /// a `Promise` that resolves to `null`.
        ///
        /// Declared as a getter, i.e. `read` is a property of the JavaScript object rather than
        /// a method to call.
        #[wasm_bindgen(method, getter)]
        pub fn read(this: &Connection) -> js_sys::Iterator;

        /// Writes data to the connection. Returns a `Promise` that resolves when the connection is
        /// ready for writing again.
        ///
        /// If the `Promise` produces an error, the writing side of the connection is considered
        /// unrecoverable and the connection should be closed as soon as possible.
        ///
        /// Guaranteed to only be called after the previous write promise has resolved.
        #[wasm_bindgen(method, catch)]
        pub fn write(this: &Connection, data: &[u8]) -> Result<js_sys::Promise, JsValue>;

        /// Shuts down the writing side of the connection. After this has been called, the `write`
        /// method will no longer be called.
        #[wasm_bindgen(method, catch)]
        pub fn shutdown(this: &Connection) -> Result<(), JsValue>;

        /// Closes the connection. No other method will be called on this connection anymore.
        #[wasm_bindgen(method)]
        pub fn close(this: &Connection);

        /// List of addresses we have started listening on. Must be an array of strings of
        /// multiaddrs.
        ///
        /// The property may be absent (`None` on the Rust side).
        #[wasm_bindgen(method, getter)]
        pub fn new_addrs(this: &ListenEvent) -> Option<Box<[JsValue]>>;

        /// List of addresses that have expired. Must be an array of strings of multiaddrs.
        ///
        /// The property may be absent (`None` on the Rust side).
        #[wasm_bindgen(method, getter)]
        pub fn expired_addrs(this: &ListenEvent) -> Option<Box<[JsValue]>>;

        /// List of [`ConnectionEvent`] objects that have been received.
        ///
        /// The property may be absent (`None` on the Rust side).
        #[wasm_bindgen(method, getter)]
        pub fn new_connections(this: &ListenEvent) -> Option<Box<[JsValue]>>;

        /// Promise to the next event that the listener will generate.
        #[wasm_bindgen(method, getter)]
        pub fn next_event(this: &ListenEvent) -> JsValue;

        /// The [`Connection`] object for communication with the remote.
        #[wasm_bindgen(method, getter)]
        pub fn connection(this: &ConnectionEvent) -> Connection;

        /// The address we observe for the remote connection, as a multiaddr string.
        #[wasm_bindgen(method, getter)]
        pub fn observed_addr(this: &ConnectionEvent) -> String;

        /// The address we are listening on, that received the remote connection, as a multiaddr
        /// string.
        #[wasm_bindgen(method, getter)]
        pub fn local_addr(this: &ConnectionEvent) -> String;
    }

    // Only available when the `websocket` cargo feature is enabled; the implementation is
    // imported from the bundled `/src/websockets.js` module.
    #[cfg(feature = "websocket")]
    #[wasm_bindgen(module = "/src/websockets.js")]
    extern "C" {
        /// Returns a `Transport` implemented using websockets.
        pub fn websocket_transport() -> Transport;
    }
}
138
/// Implementation of `Transport` whose implementation is handled by some FFI.
///
/// Wraps an [`ffi::Transport`] object; listening and dialing are forwarded to it.
pub struct ExtTransport {
    /// The external transport object that the calls are forwarded to.
    // NOTE(review): presumably wrapped in `SendWrapper` so that this type can be `Send` even
    // though the underlying JS handle is not — confirm against `tetsy_send_wrapper`.
    inner: SendWrapper<ffi::Transport>,
}
143
144impl ExtTransport {
145    /// Creates a new `ExtTransport` that uses the given external `Transport`.
146    pub fn new(transport: ffi::Transport) -> Self {
147        ExtTransport {
148            inner: SendWrapper::new(transport),
149        }
150    }
151}
152
153impl fmt::Debug for ExtTransport {
154    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
155        f.debug_tuple("ExtTransport").finish()
156    }
157}
158
impl Clone for ExtTransport {
    /// Creates another `ExtTransport` built from a clone of the wrapped handle.
    fn clone(&self) -> Self {
        ExtTransport {
            // NOTE(review): the `.into()` suggests that `clone()` resolves (through deref) to a
            // clone of the underlying JS value, which is then converted back into an
            // `ffi::Transport` — confirm against the wasm-bindgen generated impls.
            inner: SendWrapper::new(self.inner.clone().into()),
        }
    }
}
166
167impl Transport for ExtTransport {
168    type Output = Connection;
169    type Error = JsErr;
170    type Listener = Listen;
171    type ListenerUpgrade = Ready<Result<Self::Output, Self::Error>>;
172    type Dial = Dial;
173
174    fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>> {
175        let iter = self
176            .inner
177            .listen_on(&addr.to_string())
178            .map_err(|err| {
179                if is_not_supported_error(&err) {
180                    TransportError::MultiaddrNotSupported(addr)
181                } else {
182                    TransportError::Other(JsErr::from(err))
183                }
184            })?;
185
186        Ok(Listen {
187            iterator: SendWrapper::new(iter),
188            next_event: None,
189            pending_events: VecDeque::new(),
190        })
191    }
192
193    fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
194        let promise = self
195            .inner
196            .dial(&addr.to_string())
197            .map_err(|err| {
198                if is_not_supported_error(&err) {
199                    TransportError::MultiaddrNotSupported(addr)
200                } else {
201                    TransportError::Other(JsErr::from(err))
202                }
203            })?;
204
205        Ok(Dial {
206            inner: SendWrapper::new(promise.into()),
207        })
208    }
209
210    fn address_translation(&self, _server: &Multiaddr, _observed: &Multiaddr) -> Option<Multiaddr> {
211        None
212    }
213}
214
/// Future that dials a remote through an external transport.
///
/// Resolves to a [`Connection`] on success, or to a [`JsErr`] if the underlying promise is
/// rejected.
#[must_use = "futures do nothing unless polled"]
pub struct Dial {
    /// A promise that will resolve to a `ffi::Connection` on success.
    inner: SendWrapper<JsFuture>,
}
221
222impl fmt::Debug for Dial {
223    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
224        f.debug_tuple("Dial").finish()
225    }
226}
227
228impl Future for Dial {
229    type Output = Result<Connection, JsErr>;
230
231    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
232        match Future::poll(Pin::new(&mut *self.inner), cx) {
233            Poll::Ready(Ok(connec)) => Poll::Ready(Ok(Connection::new(connec.into()))),
234            Poll::Pending => Poll::Pending,
235            Poll::Ready(Err(err)) => Poll::Ready(Err(JsErr::from(err))),
236        }
237    }
238}
239
/// Stream that listens for incoming connections through an external transport.
///
/// Each `ffi::ListenEvent` obtained from the external transport is expanded into zero or more
/// `ListenerEvent`s, which are buffered and then yielded one at a time.
#[must_use = "futures do nothing unless polled"]
pub struct Listen {
    /// Iterator of `ListenEvent`s.
    iterator: SendWrapper<js_sys::Iterator>,
    /// Promise that will yield the next `ListenEvent`. `None` until one has been pulled from
    /// `iterator`, and reset to `None` once it has resolved.
    next_event: Option<SendWrapper<JsFuture>>,
    /// List of events that we are waiting to propagate.
    pending_events: VecDeque<ListenerEvent<Ready<Result<Connection, JsErr>>, JsErr>>,
}
250
251impl fmt::Debug for Listen {
252    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
253        f.debug_tuple("Listen").finish()
254    }
255}
256
257impl Stream for Listen {
258    type Item = Result<ListenerEvent<Ready<Result<Connection, JsErr>>, JsErr>, JsErr>;
259
260    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
261        loop {
262            if let Some(ev) = self.pending_events.pop_front() {
263                return Poll::Ready(Some(Ok(ev)));
264            }
265
266            // Try to fill `self.next_event` if necessary and possible. If we fail, then
267            // `Ready(None)` is returned below.
268            if self.next_event.is_none() {
269                if let Ok(ev) = self.iterator.next() {
270                    if !ev.done() {
271                        let promise: js_sys::Promise = ev.value().into();
272                        self.next_event = Some(SendWrapper::new(promise.into()));
273                    }
274                }
275            }
276
277            let event = if let Some(next_event) = self.next_event.as_mut() {
278                let e = match Future::poll(Pin::new(&mut **next_event), cx) {
279                    Poll::Ready(Ok(ev)) => ffi::ListenEvent::from(ev),
280                    Poll::Pending => return Poll::Pending,
281                    Poll::Ready(Err(err)) => return Poll::Ready(Some(Err(err.into()))),
282                };
283                self.next_event = None;
284                e
285            } else {
286                return Poll::Ready(None);
287            };
288
289            for addr in event
290                .new_addrs()
291                .into_iter()
292                .flat_map(|e| e.to_vec().into_iter())
293            {
294                let addr = js_value_to_addr(&addr)?;
295                self.pending_events
296                    .push_back(ListenerEvent::NewAddress(addr));
297            }
298
299            for upgrade in event
300                .new_connections()
301                .into_iter()
302                .flat_map(|e| e.to_vec().into_iter())
303            {
304                let upgrade: ffi::ConnectionEvent = upgrade.into();
305                self.pending_events.push_back(ListenerEvent::Upgrade {
306                    local_addr: upgrade.local_addr().parse()?,
307                    remote_addr: upgrade.observed_addr().parse()?,
308                    upgrade: futures::future::ok(Connection::new(upgrade.connection())),
309                });
310            }
311
312            for addr in event
313                .expired_addrs()
314                .into_iter()
315                .flat_map(|e| e.to_vec().into_iter())
316            {
317                match js_value_to_addr(&addr) {
318                    Ok(addr) => self.pending_events.push_back(ListenerEvent::NewAddress(addr)),
319                    Err(err) => self.pending_events.push_back(ListenerEvent::Error(err)),
320                }
321            }
322        }
323    }
324}
325
/// Active stream of data with a remote.
///
/// It is guaranteed that each call to `AsyncWrite::poll_write` on this object that returns
/// `Poll::Ready(Ok(_))` maps to exactly one call to `write` on the FFI. In other words, no
/// internal buffering happens for writes, and data can't be split.
///
/// Dropping this object calls `close` on the FFI connection.
pub struct Connection {
    /// The FFI object.
    inner: SendWrapper<ffi::Connection>,

    /// The iterator that was returned by `read()`.
    read_iterator: SendWrapper<js_sys::Iterator>,

    /// Reading part of the connection.
    read_state: ConnectionReadState,

    /// When we write data using the FFI, a promise is returned containing the moment when the
    /// underlying transport is ready to accept data again. This promise is stored here.
    /// If this is `Some`, we must wait until the contained promise is resolved to write again.
    previous_write_promise: Option<SendWrapper<JsFuture>>,
}
346
347impl Connection {
348    /// Initializes a `Connection` object from the FFI connection.
349    fn new(inner: ffi::Connection) -> Self {
350        let read_iterator = inner.read();
351
352        Connection {
353            inner: SendWrapper::new(inner),
354            read_iterator: SendWrapper::new(read_iterator),
355            read_state: ConnectionReadState::PendingData(Vec::new()),
356            previous_write_promise: None,
357        }
358    }
359}
360
/// Reading side of the connection.
///
/// State machine driven by `AsyncRead::poll_read` on [`Connection`].
enum ConnectionReadState {
    /// Some data have been read and are waiting to be transferred. Can be empty.
    ///
    /// When empty, the next read pulls a new promise from the read iterator.
    PendingData(Vec<u8>),
    /// Waiting for a `Promise` containing the next data.
    Waiting(SendWrapper<JsFuture>),
    /// An error occurred or an earlier read yielded EOF.
    ///
    /// Reading in this state fails with `io::ErrorKind::BrokenPipe`.
    Finished,
}
370
371impl fmt::Debug for Connection {
372    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
373        f.debug_tuple("Connection").finish()
374    }
375}
376
impl AsyncRead for Connection {
    /// Reads data received from the remote into `buf`.
    ///
    /// Drives the `ConnectionReadState` state machine:
    ///
    /// - leftover data from a previous read is served first;
    /// - otherwise the next promise is pulled from the read iterator and polled;
    /// - a promise resolving to `null` yields `Ok(0)` (EOF);
    /// - once the state is `Finished`, every call fails with `io::ErrorKind::BrokenPipe`.
    ///
    /// JavaScript errors (from the iterator or from a rejected promise) are converted to
    /// `io::Error` through `JsErr`.
    fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize, io::Error>> {
        loop {
            // The state is taken out and `Finished` is left in its place. Every branch that does
            // not explicitly store a new state (early returns on error or EOF) therefore leaves
            // the connection in the `Finished` state.
            match mem::replace(&mut self.read_state, ConnectionReadState::Finished) {
                ConnectionReadState::Finished => break Poll::Ready(Err(io::ErrorKind::BrokenPipe.into())),

                // Nothing buffered: ask the iterator for the next promise.
                ConnectionReadState::PendingData(ref data) if data.is_empty() => {
                    let iter_next = self.read_iterator.next().map_err(JsErr::from)?;
                    if iter_next.done() {
                        // An exhausted iterator ends up in the `Finished` branch above on the
                        // next loop iteration, i.e. it is reported as `BrokenPipe`.
                        self.read_state = ConnectionReadState::Finished;
                    } else {
                        let promise: js_sys::Promise = iter_next.value().into();
                        let promise = SendWrapper::new(promise.into());
                        self.read_state = ConnectionReadState::Waiting(promise);
                    }
                    continue;
                }

                // Buffered data available: hand out as much as fits in `buf`.
                ConnectionReadState::PendingData(mut data) => {
                    debug_assert!(!data.is_empty());
                    if buf.len() <= data.len() {
                        // `buf` is filled entirely; whatever remains stays buffered.
                        buf.copy_from_slice(&data[..buf.len()]);
                        self.read_state =
                            ConnectionReadState::PendingData(data.split_off(buf.len()));
                        break Poll::Ready(Ok(buf.len()));
                    } else {
                        // All buffered data fits in `buf`.
                        let len = data.len();
                        buf[..len].copy_from_slice(&data);
                        self.read_state = ConnectionReadState::PendingData(Vec::new());
                        break Poll::Ready(Ok(len));
                    }
                }

                ConnectionReadState::Waiting(mut promise) => {
                    let data = match Future::poll(Pin::new(&mut *promise), cx) {
                        // `null` signals that the remote closed the connection.
                        Poll::Ready(Ok(ref data)) if data.is_null() => break Poll::Ready(Ok(0)),
                        Poll::Ready(Ok(data)) => data,
                        Poll::Ready(Err(err)) => break Poll::Ready(Err(io::Error::from(JsErr::from(err)))),
                        Poll::Pending => {
                            // Put the promise back so that it is polled again next time.
                            self.read_state = ConnectionReadState::Waiting(promise);
                            break Poll::Pending;
                        }
                    };

                    // Try to directly copy the data into `buf` if it is large enough, otherwise
                    // transition to `PendingData` and loop again.
                    let data = js_sys::Uint8Array::new(&data);
                    let data_len = data.length() as usize;
                    if data_len <= buf.len() {
                        data.copy_to(&mut buf[..data_len]);
                        self.read_state = ConnectionReadState::PendingData(Vec::new());
                        break Poll::Ready(Ok(data_len));
                    } else {
                        let mut tmp_buf = vec![0; data_len];
                        data.copy_to(&mut tmp_buf[..]);
                        self.read_state = ConnectionReadState::PendingData(tmp_buf);
                        continue;
                    }
                }
            }
        }
    }
}
440
441impl AsyncWrite for Connection {
442    fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize, io::Error>> {
443        // Note: as explained in the doc-comments of `Connection`, each call to this function must
444        // map to exactly one call to `self.inner.write()`.
445
446        if let Some(mut promise) = self.previous_write_promise.take() {
447            match Future::poll(Pin::new(&mut *promise), cx) {
448                Poll::Ready(Ok(_)) => (),
449                Poll::Ready(Err(err)) => return Poll::Ready(Err(io::Error::from(JsErr::from(err)))),
450                Poll::Pending => {
451                    self.previous_write_promise = Some(promise);
452                    return Poll::Pending;
453                }
454            }
455        }
456
457        debug_assert!(self.previous_write_promise.is_none());
458        self.previous_write_promise = Some(SendWrapper::new(
459            self.inner.write(buf).map_err(JsErr::from)?.into(),
460        ));
461        Poll::Ready(Ok(buf.len()))
462    }
463
464    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
465        // There's no flushing mechanism. In the FFI we consider that writing implicitly flushes.
466        Poll::Ready(Ok(()))
467    }
468
469    fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
470        // Shutting down is considered instantaneous.
471        match self.inner.shutdown() {
472            Ok(()) => Poll::Ready(Ok(())),
473            Err(err) => Poll::Ready(Err(io::Error::from(JsErr::from(err)))),
474        }
475    }
476}
477
478impl Drop for Connection {
479    fn drop(&mut self) {
480        self.inner.close();
481    }
482}
483
484/// Returns true if `err` is an error about an address not being supported.
485fn is_not_supported_error(err: &JsValue) -> bool {
486    if let Some(err) = err.dyn_ref::<js_sys::Error>() {
487        err.name() == "NotSupportedError"
488    } else {
489        false
490    }
491}
492
493/// Turns a `JsValue` containing a `String` into a `Multiaddr`, if possible.
494fn js_value_to_addr(addr: &JsValue) -> Result<Multiaddr, JsErr> {
495    if let Some(addr) = addr.as_string() {
496        Ok(addr.parse()?)
497    } else {
498        Err(JsValue::from_str("Element in new_addrs is not a string").into())
499    }
500}
501
/// Error that can be generated by the `ExtTransport`.
///
/// Wraps the JavaScript value describing the error. Its `Display` output is the value itself
/// if it is a string, the `message` of an `Error` object, the `toString()` of any other
/// object, or the debug representation of the value otherwise.
pub struct JsErr(SendWrapper<JsValue>);
504
505impl From<JsValue> for JsErr {
506    fn from(val: JsValue) -> JsErr {
507        JsErr(SendWrapper::new(val))
508    }
509}
510
511impl From<tetsy_libp2p_core::multiaddr::Error> for JsErr {
512    fn from(err: tetsy_libp2p_core::multiaddr::Error) -> JsErr {
513        JsValue::from_str(&err.to_string()).into()
514    }
515}
516
517impl From<JsErr> for io::Error {
518    fn from(err: JsErr) -> io::Error {
519        io::Error::new(io::ErrorKind::Other, err.to_string())
520    }
521}
522
523impl fmt::Debug for JsErr {
524    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
525        write!(f, "{}", self)
526    }
527}
528
529impl fmt::Display for JsErr {
530    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
531        if let Some(s) = self.0.as_string() {
532            write!(f, "{}", s)
533        } else if let Some(err) = self.0.dyn_ref::<js_sys::Error>() {
534            write!(f, "{}", String::from(err.message()))
535        } else if let Some(obj) = self.0.dyn_ref::<js_sys::Object>() {
536            write!(f, "{}", String::from(obj.to_string()))
537        } else {
538            write!(f, "{:?}", &*self.0)
539        }
540    }
541}
542
// Marker impl: `JsErr` relies on the default `std::error::Error` methods and exposes no source.
impl error::Error for JsErr {}