web3 0.3.0

Ethereum JSON-RPC client.
Documentation
//! WebSocket Transport

extern crate websocket;

use std::collections::BTreeMap;
use std::sync::{atomic, Arc};

use api::{SubscriptionId};
use futures::{self, Future, Sink, Stream};
use futures::sync::{mpsc, oneshot};
use helpers;
use parking_lot::Mutex;
use rpc;
use self::websocket::{ClientBuilder, OwnedMessage};
use self::websocket::url::Url;
use transports::Result;
use transports::shared::{EventLoopHandle, Response};
use transports::tokio_core::reactor;
use {BatchTransport, DuplexTransport, Error, ErrorKind, RequestId, Transport};

impl From<websocket::WebSocketError> for Error {
  fn from(err: websocket::WebSocketError) -> Self {
    ErrorKind::Transport(format!("{:?}", err)).into()
  }
}

impl From<websocket::client::ParseError> for Error {
  fn from(err: websocket::client::ParseError) -> Self {
    ErrorKind::Transport(format!("{:?}", err)).into()
  }
}

/// Sending half of the one-shot channel used to hand the outputs of a response
/// (one entry per output, single or batch) back to the request that awaits them.
type Pending = oneshot::Sender<Result<Vec<Result<rpc::Value>>>>;

/// Sending half of the unbounded channel on which notification payloads for a
/// single subscription are delivered.
type Subscription = mpsc::UnboundedSender<rpc::Value>;

/// A future representing pending WebSocket request, resolves to a response.
pub type WsTask<F> = Response<F, Vec<Result<rpc::Value>>>;

/// WebSocket transport
///
/// Cloning is cheap: the id counter and both lookup tables are behind `Arc`s, so
/// all clones share them.
#[derive(Debug, Clone)]
pub struct WebSocket {
  /// Counter from which request ids are allocated (starts at 1).
  id: Arc<atomic::AtomicUsize>,
  /// Parsed URL this transport was created for.
  url: Url,
  /// Requests that have been sent and still await a response, keyed by request id.
  pending: Arc<Mutex<BTreeMap<RequestId, Pending>>>,
  /// Registered notification sinks, keyed by subscription id.
  subscriptions: Arc<Mutex<BTreeMap<SubscriptionId, Subscription>>>,
  /// Queue of outgoing frames, drained into the socket by the writer future.
  write_sender: mpsc::UnboundedSender<OwnedMessage>,
}

impl WebSocket {
  /// Create a new WebSocket transport running on its own, freshly spawned event loop.
  ///
  /// Returns the handle of that event loop together with the transport.
  /// NOTE: Dropping event loop handle will stop the transport layer!
  pub fn new(url: &str) -> Result<(EventLoopHandle, Self)> {
    // The closure is moved to the event loop, so it needs its own copy of the URL.
    let owned_url = String::from(url);
    EventLoopHandle::spawn(move |handle| {
      Self::with_event_loop(&owned_url, &handle).map_err(Into::into)
    })
  }

  /// Create new WebSocket transport within existing Event Loop.
  ///
  /// Parses `url` and spawns on `handle` a future that connects to it and then
  /// drives both halves of the connection:
  ///
  /// * a reader that echoes `Close` frames, answers `Ping` with `Pong`, routes
  ///   subscription notifications to their registered sinks and resolves
  ///   pending requests with the matching response;
  /// * a writer that forwards every frame queued on the internal channel to
  ///   the socket.
  ///
  /// The connection itself is established by the spawned future, so connection
  /// and I/O failures are logged by that task rather than returned from here.
  ///
  /// # Errors
  ///
  /// Returns an error if `url` cannot be parsed.
  pub fn with_event_loop(url: &str, handle: &reactor::Handle) -> Result<Self> {
    trace!("Connecting to: {:?}", url);

    let url: Url = url.parse()?;
    let pending: Arc<Mutex<BTreeMap<RequestId, Pending>>> = Default::default();
    let subscriptions: Arc<Mutex<BTreeMap<SubscriptionId, Subscription>>> = Default::default();
    let (write_sender, write_receiver) = mpsc::unbounded();

    let ws_future = {
      // Clones moved into the reader closure; the originals end up in `Self`.
      let pending_ = pending.clone();
      let subscriptions_ = subscriptions.clone();
      let write_sender_ = write_sender.clone();

      ClientBuilder::from_url(&url)
        .async_connect(None, handle)
        .from_err::<Error>()
        .map(|(duplex, _)| duplex.split())
        .and_then(move |(sink, stream)| {
          let reader = stream.from_err::<Error>().for_each(move |message| {
            trace!("Message received: {:?}", message);

            match message {
              // Control frames are answered through the same queue as regular requests.
              OwnedMessage::Close(e) => write_sender_
                .unbounded_send(OwnedMessage::Close(e))
                .map_err(|_| ErrorKind::Transport("Error sending close message".into()).into()),
              OwnedMessage::Ping(d) => write_sender_
                .unbounded_send(OwnedMessage::Pong(d))
                .map_err(|_| ErrorKind::Transport("Error sending pong message".into()).into()),
              OwnedMessage::Text(t) => {
                // Notifications are tried first; anything that does not parse as a
                // notification is handled as a response further down.
                if let Ok(notification) = helpers::to_notification_from_slice(t.as_bytes()) {
                  if let Some(rpc::Params::Map(params)) = notification.params {
                    let id = params.get("subscription");
                    let result = params.get("result");

                    if let (Some(&rpc::Value::String(ref id)), Some(result)) = (id, result) {
                      let id: SubscriptionId = id.clone().into();
                      let mut subscriptions = subscriptions_.lock();
                      let receiver_gone = match subscriptions.get(&id) {
                        Some(stream) => stream.unbounded_send(result.clone()).is_err(),
                        None => {
                          warn!("Got notification for unknown subscription (id: {:?})", id);
                          false
                        }
                      };
                      // A subscriber that dropped its stream without unsubscribing must
                      // not take the whole connection down: returning an error here
                      // would end the reader and, with it, every other request and
                      // subscription. Forget the stale sink and keep reading.
                      if receiver_gone {
                        warn!("Dropping subscription with closed receiver (id: {:?})", id);
                        subscriptions.remove(&id);
                      }
                    } else {
                      error!("Got unsupported notification (id: {:?})", id);
                    }
                  }

                  return Ok(());
                }

                // Unparseable responses yield no outputs and fall back to id 0 below.
                let response = helpers::to_response_from_slice(t.as_bytes());
                let outputs = match response {
                  Ok(rpc::Response::Single(output)) => vec![output],
                  Ok(rpc::Response::Batch(outputs)) => outputs,
                  _ => vec![],
                };

                // The id of the first output identifies the pending request; batches
                // are registered under the id of their first call.
                let id = match outputs.get(0) {
                  Some(&rpc::Output::Success(ref success)) => success.id.clone(),
                  Some(&rpc::Output::Failure(ref failure)) => failure.id.clone(),
                  None => rpc::Id::Num(0),
                };

                if let rpc::Id::Num(num) = id {
                  if let Some(request) = pending_.lock().remove(&(num as usize)) {
                    trace!("Responding to (id: {:?}) with {:?}", num, outputs);
                    if let Err(err) = request.send(helpers::to_results_from_outputs(outputs)) {
                      warn!("Sending a response to deallocated channel: {:?}", err);
                    }
                  } else {
                    warn!("Got response for unknown request (id: {:?})", num);
                  }
                } else {
                  warn!("Got unsupported response (id: {:?})", id);
                }

                Ok(())
              }
              // Remaining frame kinds are ignored.
              _ => Ok(()),
            }
          });

          let writer = sink
            .sink_from_err()
            .send_all(write_receiver.map_err(|_| websocket::WebSocketError::NoDataAvailable))
            .map(|_| ());

          reader.join(writer)
        })
    };

    handle.spawn(ws_future.map(|_| ()).map_err(|err| {
      error!("WebSocketError: {:?}", err);
    }));

    Ok(Self {
      id: Arc::new(atomic::AtomicUsize::new(1)),
      url: url,
      pending,
      subscriptions,
      write_sender,
    })
  }

  fn send_request<F, O>(&self, id: RequestId, request: rpc::Request, extract: F) -> WsTask<F>
  where
    F: Fn(Vec<Result<rpc::Value>>) -> O,
  {
    let request = helpers::to_string(&request);
    debug!("[{}] Calling: {}", id, request);
    let (tx, rx) = futures::oneshot();
    self.pending.lock().insert(id, tx);

    let result = self
      .write_sender
      .unbounded_send(OwnedMessage::Text(request))
      .map_err(|_| ErrorKind::Transport("Error sending request".into()).into());

    Response::new(id, result, rx, extract)
  }
}

impl Transport for WebSocket {
  type Out = WsTask<fn(Vec<Result<rpc::Value>>) -> Result<rpc::Value>>;

  fn prepare(&self, method: &str, params: Vec<rpc::Value>) -> (RequestId, rpc::Call) {
    let id = self.id.fetch_add(1, atomic::Ordering::AcqRel);
    let request = helpers::build_request(id, method, params);

    (id, request)
  }

  fn send(&self, id: RequestId, request: rpc::Call) -> Self::Out {
    self.send_request(id, rpc::Request::Single(request), |response| match response
      .into_iter()
      .next()
    {
      Some(res) => res,
      None => Err(ErrorKind::InvalidResponse("Expected single, got batch.".into()).into()),
    })
  }
}

impl BatchTransport for WebSocket {
  type Batch = WsTask<fn(Vec<Result<rpc::Value>>) -> Result<Vec<Result<rpc::Value>>>>;

  /// Sends all `requests` as one batch.
  ///
  /// The batch is tracked under the id of its first request; an empty batch is
  /// tracked under id `0`.
  fn send_batch<T>(&self, requests: T) -> Self::Batch
  where
    T: IntoIterator<Item = (RequestId, rpc::Call)>,
  {
    let mut remaining = requests.into_iter();
    let (id, calls) = match remaining.next() {
      Some((first_id, first_call)) => {
        let mut calls = vec![first_call];
        calls.extend(remaining.map(|(_, call)| call));
        (first_id, calls)
      }
      None => (0, Vec::new()),
    };

    self.send_request(id, rpc::Request::Batch(calls), Ok)
  }
}

impl DuplexTransport for WebSocket {
  type NotificationStream = Box<Stream<Item = rpc::Value, Error = Error> + Send + 'static>;

  /// Registers a notification sink for `id` and returns the stream fed by it.
  ///
  /// An already registered sink for the same id is replaced (and a warning logged).
  fn subscribe(&self, id: &SubscriptionId) -> Self::NotificationStream {
    let (sender, receiver) = mpsc::unbounded();
    let previous = self.subscriptions.lock().insert(id.clone(), sender);
    if previous.is_some() {
      warn!("Replacing already-registered subscription with id {:?}", id);
    }

    let notifications = receiver.map_err(|()| ErrorKind::Transport("No data available".into()).into());
    Box::new(notifications)
  }

  /// Removes the notification sink registered for `id`, if there is one.
  fn unsubscribe(&self, id: &SubscriptionId) {
    let mut subscriptions = self.subscriptions.lock();
    subscriptions.remove(id);
  }
}

#[cfg(test)]
mod tests {
  extern crate tokio_core;
  extern crate websocket;

  use super::WebSocket;
  use futures::{Future, Sink, Stream};
  use rpc;
  use self::websocket::async::Server;
  use self::websocket::message::OwnedMessage;
  use self::websocket::server::InvalidConnection;
  use Transport;

  /// Round-trips one `eth_accounts` call against an in-process WebSocket server
  /// and checks both the request seen by the server and the decoded result.
  #[test]
  fn should_send_a_request() {
    // given
    let mut eloop = tokio_core::reactor::Core::new().unwrap();
    let handle = eloop.handle();
    // NOTE(review): binds a fixed port, so the test fails if port 3000 is already in use.
    let server = Server::bind("localhost:3000", &handle).unwrap();
    let f = {
      let handle_ = handle.clone();
      server
        .incoming()
        // Only the single connection made by the transport below is served.
        .take(1)
        .map_err(|InvalidConnection { error, .. }| error)
        .for_each(move |(upgrade, addr)| {
          trace!("Got a connection from {}", addr);
          let f = upgrade.accept().and_then(|(s, _)| {
            let (sink, stream) = s.split();

            stream
              // Stop serving once the client sends a close frame.
              .take_while(|m| Ok(!m.is_close()))
              .filter_map(|m| match m {
                OwnedMessage::Ping(p) => Some(OwnedMessage::Pong(p)),
                OwnedMessage::Pong(_) => None,
                OwnedMessage::Text(t) => {
                  // The request must be serialized exactly like this, with id 1.
                  assert_eq!(
                    t,
                    r#"{"jsonrpc":"2.0","method":"eth_accounts","params":["1"],"id":1}"#
                  );
                  // Canned response carrying the same id.
                  Some(OwnedMessage::Text(
                    r#"{"jsonrpc":"2.0","id":1,"result":"x"}"#.to_owned(),
                  ))
                }
                _ => None,
              })
              .forward(sink)
              .and_then(|(_, sink)| sink.send(OwnedMessage::Close(None)))
          });

          // Serve the connection in the background; its outcome is not inspected.
          handle_.spawn(f.map(|_| ()).map_err(|_| ()));

          Ok(())
        })
    };
    handle.spawn(f.map_err(|_| ()));

    let ws = WebSocket::with_event_loop("ws://localhost:3000", &handle).unwrap();

    // when
    let res = ws.execute("eth_accounts", vec![rpc::Value::String("1".into())]);

    // then
    assert_eq!(eloop.run(res), Ok(rpc::Value::String("x".into())));
  }
}