web3 0.0.5

Ethereum JSON-RPC client.
Documentation
//! Easy to use utilities for confirmations.

use std::time::Duration;
use futures::{IntoFuture, Future, Stream, Poll};
use futures::stream::Skip;
use api::{Eth, EthFilter, Namespace, CreateFilter, FilterStream};
use types::{H256, U256, TransactionRequest, TransactionReceipt};
use helpers::CallResult;
use {Transport, Error};

/// Checks whether an event has been confirmed.
pub trait ConfirmationCheck {
  /// Future resolved when is known whether an event has been confirmed.
  type Check: IntoFuture<Item = Option<U256>, Error = Error>;

  /// Should be called to get future which resolves when confirmation state is known.
  fn check(&self) -> Self::Check;
}

impl<F, T> ConfirmationCheck for F where
  F: Fn() -> T,
  T: IntoFuture<Item = Option<U256>, Error = Error>,
{
  type Check = T;

  fn check(&self) -> Self::Check {
    (*self)()
  }
}

enum WaitForConfirmationsState<F, O> {
  WaitForNextBlock,
  CheckConfirmation(F),
  CompareConfirmations(u64, CallResult<U256, O>),
}

struct WaitForConfirmations<T, V, F> where T: Transport {
  transport: T,
  state: WaitForConfirmationsState<F, T::Out>,
  filter_stream: Skip<FilterStream<T, H256>>,
  confirmation_check: V,
  confirmations: u64,
}

impl<T, V, F> Future for WaitForConfirmations<T, V, F::Future> where
  T: Transport,
  V: ConfirmationCheck<Check = F>,
  F: IntoFuture<Item = Option<U256>, Error = Error>,
{

  type Item = ();
  type Error = Error;

  fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
    loop {
      let next_state = match self.state {
        WaitForConfirmationsState::WaitForNextBlock => {
          let _ = try_ready!(self.filter_stream.poll());
          WaitForConfirmationsState::CheckConfirmation(self.confirmation_check.check().into_future())
        },
        WaitForConfirmationsState::CheckConfirmation(ref mut future) => match try_ready!(future.poll()) {
          Some(confirmation_block_number) => {
            let future = Eth::new(&self.transport).block_number();
            WaitForConfirmationsState::CompareConfirmations(confirmation_block_number.low_u64(), future)
          },
          None => WaitForConfirmationsState::WaitForNextBlock,
        },
        WaitForConfirmationsState::CompareConfirmations(confirmation_block_number, ref mut block_number_future) => {
          let block_number = try_ready!(block_number_future.poll()).low_u64();
          if confirmation_block_number + self.confirmations <= block_number {
            return Ok(().into())
          } else {
            WaitForConfirmationsState::WaitForNextBlock
          }
        },
      };
      self.state = next_state;
    }
  }
}

struct CreateWaitForConfirmations<T: Transport, V> {
  create_filter: CreateFilter<T, H256>,
  poll_interval: Duration,
  transport: Option<T>,
  confirmation_check: Option<V>,
  confirmations: u64,
}

enum ConfirmationsState<T: Transport, V, F> {
  Create(CreateWaitForConfirmations<T, V>),
  Wait(WaitForConfirmations<T, V, F>),
}

/// On each new block checks confirmations.
pub struct Confirmations<T: Transport, V, F> {
  state: ConfirmationsState<T, V, F>,
}

impl<T: Transport + Clone, V, F> Confirmations<T, V, F> {
  fn new(transport: T, poll_interval: Duration, confirmations: u64, check: V) -> Self {
    let eth = EthFilter::new(transport.clone());
    Confirmations {
      state: ConfirmationsState::Create(CreateWaitForConfirmations {
        create_filter: eth.create_blocks_filter(),
        poll_interval,
        transport: Some(transport),
        confirmation_check: Some(check),
        confirmations,
      })
    }
  }
}

impl<T, V, F> Future for Confirmations<T, V, F::Future> where
  T: Transport,
  V: ConfirmationCheck<Check = F>,
  F: IntoFuture<Item = Option<U256>, Error = Error>,
{

  type Item = ();
  type Error = Error;

  fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
    loop {
      let next_state = match self.state {
        ConfirmationsState::Create(ref mut create) => {
          let filter = try_ready!(create.create_filter.poll());
          let future = WaitForConfirmations {
            transport: create.transport.take().expect("future polled after ready; qed"),
            state: WaitForConfirmationsState::WaitForNextBlock,
            filter_stream: filter.stream(create.poll_interval).skip(create.confirmations),
            confirmation_check: create.confirmation_check.take().expect("future polled after ready; qed"),
            confirmations: create.confirmations,
          };
          ConfirmationsState::Wait(future)
        },
        ConfirmationsState::Wait(ref mut wait) => return Future::poll(wait),
      };
      self.state = next_state;
    }
  }
}

/// Should be used to wait for confirmations
pub fn wait_for_confirmations<T, V, F>(transport: T, poll_interval: Duration, confirmations: u64, check: V) -> Confirmations<T, V, F::Future> where
  T: Transport + Clone,
  V: ConfirmationCheck<Check = F>,
  F: IntoFuture<Item = Option<U256>, Error = Error>,
{
  Confirmations::new(transport, poll_interval, confirmations, check)
}

struct TransactionReceiptBlockNumber<T: Transport> {
  future: CallResult<Option<TransactionReceipt>, T::Out>,
}

impl<T: Transport> Future for TransactionReceiptBlockNumber<T> {
  type Item = Option<U256>;
  type Error = Error;

  fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
    let receipt = try_ready!(self.future.poll());
    Ok(receipt.map(|receipt| receipt.block_number).into())
  }
}

struct TransactionReceiptBlockNumberCheck<T: Transport> {
  eth: Eth<T>,
  hash: H256,
}

impl<T: Transport> TransactionReceiptBlockNumberCheck<T> {
  fn new(eth: Eth<T>, hash: H256) -> Self {
    TransactionReceiptBlockNumberCheck {
      eth,
      hash,
    }
  }
}

impl<T: Transport> ConfirmationCheck for TransactionReceiptBlockNumberCheck<T> {
  type Check = TransactionReceiptBlockNumber<T>;

  fn check(&self) -> Self::Check {
    TransactionReceiptBlockNumber {
      future: self.eth.transaction_receipt(self.hash.clone())
    }
  }
}

enum SendTransactionWithConfirmationState<T: Transport> {
  SendTransaction(CallResult<H256, T::Out>),
  WaitForConfirmations(H256, Confirmations<T, TransactionReceiptBlockNumberCheck<T>, TransactionReceiptBlockNumber<T>>),
  GetTransactionReceipt(CallResult<Option<TransactionReceipt>, T::Out>),
}

/// Sends transaction and then checks if has been confirmed.
pub struct SendTransactionWithConfirmation<T: Transport> {
  state: SendTransactionWithConfirmationState<T>,
  eth: Eth<T>,
  transport: T,
  poll_interval: Duration,
  confirmations: u64,
}

impl<T: Transport + Clone> SendTransactionWithConfirmation<T> {
  fn new(transport: T, tx: TransactionRequest, poll_interval: Duration, confirmations: u64) -> Self {
    let eth = Eth::new(transport.clone());
    SendTransactionWithConfirmation {
      state: SendTransactionWithConfirmationState::SendTransaction(eth.send_transaction(tx)),
      eth,
      transport,
      poll_interval,
      confirmations,
    }
  }
}

impl<T: Transport + Clone> Future for SendTransactionWithConfirmation<T> {
  type Item = TransactionReceipt;
  type Error = Error;

  fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
    loop {
      let next_state = match self.state {
        SendTransactionWithConfirmationState::SendTransaction(ref mut future) => {
          let hash = try_ready!(future.poll());
          let confirmation_check = TransactionReceiptBlockNumberCheck::new(Eth::new(self.transport.clone()), hash.clone());
          let wait = wait_for_confirmations(self.transport.clone(), self.poll_interval, self.confirmations, confirmation_check);
          SendTransactionWithConfirmationState::WaitForConfirmations(hash, wait)
        },
        SendTransactionWithConfirmationState::WaitForConfirmations(hash, ref mut future) => {
          let _confirmed = try_ready!(Future::poll(future));
          let receipt_future = self.eth.transaction_receipt(hash);
          SendTransactionWithConfirmationState::GetTransactionReceipt(receipt_future)
        },
        SendTransactionWithConfirmationState::GetTransactionReceipt(ref mut future) => {
          let receipt = try_ready!(Future::poll(future)).expect("receipt can't be null after wait for confirmations; qed");
          return Ok(receipt.into());
        },
      };
      self.state = next_state;
    }
  }
}

/// Sends transaction and returns future resolved after transaction is confirmed
pub fn send_transaction_with_confirmation<T>(transport: T, tx: TransactionRequest, poll_interval: Duration, confirmations: u64) -> SendTransactionWithConfirmation<T> where T: Transport + Clone {
  SendTransactionWithConfirmation::new(transport, tx, poll_interval, confirmations)
}

#[cfg(test)]
mod tests {
  use std::time::Duration;
  use futures::Future;
  use helpers::tests::TestTransport;
  use types::{TransactionRequest, TransactionReceipt};
  use super::send_transaction_with_confirmation;
  use rpc::Value;

  #[test]
  fn test_send_transaction_with_confirmation() {
    let mut transport = TestTransport::default();
    let confirmations = 3;
    let transaction_request = TransactionRequest {
      from: 0x123.into(),
      to: Some(0x123.into()),
      gas: None,
      gas_price: Some(1.into()),
      value: Some(1.into()),
      data: None,
      nonce: None,
      condition: None,
    };

    let transaction_receipt = TransactionReceipt {
      transaction_hash: 0.into(),
      transaction_index: 0.into(),
      block_hash: 0.into(),
      block_number: 2.into(),
      cumulative_gas_used: 0.into(),
      gas_used: 0.into(),
      contract_address: None,
      logs: vec![],
    };

    let poll_interval = Duration::from_secs(0);
    transport.add_response(Value::String(r#"0x0000000000000000000000000000000000000000000000000000000000000111"#.into()));
    transport.add_response(Value::String("0x123".into()));
    transport.add_response(Value::Array(vec![
      Value::String(r#"0x0000000000000000000000000000000000000000000000000000000000000456"#.into()),
      Value::String(r#"0x0000000000000000000000000000000000000000000000000000000000000457"#.into()),
    ]));
    transport.add_response(Value::Array(vec![
      Value::String(r#"0x0000000000000000000000000000000000000000000000000000000000000458"#.into()),
    ]));
    transport.add_response(Value::Array(vec![
      Value::String(r#"0x0000000000000000000000000000000000000000000000000000000000000459"#.into()),
    ]));
    transport.add_response(Value::Null);
    transport.add_response(Value::Array(vec![
      Value::String(r#"0x0000000000000000000000000000000000000000000000000000000000000460"#.into()),
      Value::String(r#"0x0000000000000000000000000000000000000000000000000000000000000461"#.into()),
    ]));
    transport.add_response(Value::Null);
    transport.add_response(json!(transaction_receipt));
    transport.add_response(Value::String("0x6".into()));
    transport.add_response(json!(transaction_receipt));
    transport.add_response(Value::Bool(true));

    let confirmation = {
      let future = send_transaction_with_confirmation(&transport, transaction_request, poll_interval, confirmations);
      future.wait()
    };

    transport.assert_request("eth_sendTransaction", &[r#"{"from":"0x0000000000000000000000000000000000000123","gasPrice":"0x1","to":"0x0000000000000000000000000000000000000123","value":"0x1"}"#.into()]);
    transport.assert_request("eth_newBlockFilter", &[]);
    transport.assert_request("eth_getFilterChanges", &[r#""0x123""#.into()]);
    transport.assert_request("eth_getFilterChanges", &[r#""0x123""#.into()]);
    transport.assert_request("eth_getFilterChanges", &[r#""0x123""#.into()]);
    transport.assert_request("eth_getTransactionReceipt", &[r#""0x0000000000000000000000000000000000000000000000000000000000000111""#.into()]);
    transport.assert_request("eth_getFilterChanges", &[r#""0x123""#.into()]);
    transport.assert_request("eth_getTransactionReceipt", &[r#""0x0000000000000000000000000000000000000000000000000000000000000111""#.into()]);
    transport.assert_request("eth_getTransactionReceipt", &[r#""0x0000000000000000000000000000000000000000000000000000000000000111""#.into()]);
    transport.assert_request("eth_blockNumber", &[]);
    transport.assert_request("eth_getTransactionReceipt", &[r#""0x0000000000000000000000000000000000000000000000000000000000000111""#.into()]);
    transport.assert_no_more_requests();
    assert_eq!(confirmation, Ok(transaction_receipt));
  }
}