web3 0.3.1

Ethereum JSON-RPC client.
Documentation
//! Batching Transport

use std::mem;
use std::collections::BTreeMap;
use std::sync::Arc;
use futures::{self, future, Future};
use futures::sync::oneshot;
use parking_lot::Mutex;
use rpc;
use transports::Result;
use {BatchTransport, Error as RpcError, ErrorKind, RequestId, Transport};

type Pending = oneshot::Sender<Result<rpc::Value>>;
type PendingRequests = Arc<Mutex<BTreeMap<RequestId, Pending>>>;

/// Transport allowing to batch queries together.
#[derive(Debug, Clone)]
pub struct Batch<T> {
    transport: T,
    pending: PendingRequests,
    batch: Arc<Mutex<Vec<(RequestId, rpc::Call)>>>,
}

impl<T> Batch<T>
where
    T: BatchTransport,
{
    /// Creates new Batch transport given existing transport supporing batch requests.
    pub fn new(transport: T) -> Self {
        Batch {
            transport,
            pending: Default::default(),
            batch: Default::default(),
        }
    }

    /// Sends all requests as a batch.
    pub fn submit_batch(&self) -> BatchFuture<T::Batch> {
        let batch = mem::replace(&mut *self.batch.lock(), vec![]);
        let ids = batch.iter().map(|&(id, _)| id).collect::<Vec<_>>();

        let batch = self.transport.send_batch(batch);
        let pending = self.pending.clone();

        BatchFuture {
            state: BatchState::SendingBatch(batch, ids),
            pending,
        }
    }
}

impl<T> Transport for Batch<T>
where
    T: BatchTransport,
{
    type Out = SingleResult;

    fn prepare(&self, method: &str, params: Vec<rpc::Value>) -> (RequestId, rpc::Call) {
        self.transport.prepare(method, params)
    }

    fn send(&self, id: RequestId, request: rpc::Call) -> Self::Out {
        let (tx, rx) = futures::oneshot();
        self.pending.lock().insert(id, tx);
        self.batch.lock().push((id, request));

        SingleResult(rx)
    }
}

enum BatchState<T> {
    SendingBatch(T, Vec<RequestId>),
    Resolving(
        future::JoinAll<Vec<::std::result::Result<(), Result<rpc::Value>>>>,
        Result<Vec<Result<rpc::Value>>>,
    ),
    Done,
}

/// A result of submitting a batch request.
/// Returns the results of all requests within the batch.
pub struct BatchFuture<T> {
    state: BatchState<T>,
    pending: PendingRequests,
}

impl<T: Future<Item = Vec<Result<rpc::Value>>, Error = RpcError>> Future for BatchFuture<T> {
    type Item = Vec<Result<rpc::Value>>;
    type Error = RpcError;

    fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
        loop {
            match mem::replace(&mut self.state, BatchState::Done) {
                BatchState::SendingBatch(mut batch, ids) => {
                    let res = match batch.poll() {
                        Ok(futures::Async::NotReady) => {
                            self.state = BatchState::SendingBatch(batch, ids);
                            return Ok(futures::Async::NotReady);
                        }
                        Ok(futures::Async::Ready(v)) => Ok(v),
                        Err(err) => Err(err),
                    };

                    let mut pending = self.pending.lock();
                    let sending = ids.into_iter()
                        .enumerate()
                        .filter_map(|(idx, request_id)| {
                            pending.remove(&request_id).map(|rx| match res {
                                Ok(ref results) if results.len() > idx => rx.send(results[idx].clone()),
                                Err(ref err) => rx.send(Err(err.clone())),
                                _ => rx.send(Err(ErrorKind::Internal.into())),
                            })
                        })
                        .collect::<Vec<_>>();

                    self.state = BatchState::Resolving(future::join_all(sending), res);
                }
                BatchState::Resolving(mut all, res) => {
                    if let Ok(futures::Async::NotReady) = all.poll() {
                        self.state = BatchState::Resolving(all, res);
                        return Ok(futures::Async::NotReady);
                    }

                    return Ok(futures::Async::Ready(res?));
                }
                BatchState::Done => {
                    panic!("Poll after Ready.");
                }
            };
        }
    }
}

/// Result of calling a single method that will be part of the batch.
/// Converts `oneshot::Receiver` error into `RpcError::Internal`
pub struct SingleResult(oneshot::Receiver<Result<rpc::Value>>);

impl Future for SingleResult {
    type Item = rpc::Value;
    type Error = RpcError;

    fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
        let res = try_ready!(
            self.0
                .poll()
                .map_err(|_| RpcError::from(ErrorKind::Internal))
        );
        res.map(futures::Async::Ready)
    }
}