use crate::{
ens,
pubsub::{PubsubClient, SubscriptionStream},
stream::{FilterWatcher, DEFAULT_POLL_INTERVAL},
FromErr, Http as HttpProvider, JsonRpcClient, MockProvider, PendingTransaction,
};
use ethers_core::{
abi::{self, Detokenize, ParamType},
types::{
Address, Block, BlockId, BlockNumber, BlockTrace, Bytes, Filter, Log, NameOrAddress,
Selector, Signature, Trace, TraceFilter, TraceType, Transaction, TransactionReceipt,
TransactionRequest, TxHash, TxpoolContent, TxpoolInspect, TxpoolStatus, H256, U256, U64,
},
utils,
};
use crate::Middleware;
use async_trait::async_trait;
use hex::FromHex;
use serde::{de::DeserializeOwned, Serialize};
use thiserror::Error;
use url::{ParseError, Url};
use std::{convert::TryFrom, fmt::Debug, time::Duration};
use tracing::trace;
use tracing_futures::Instrument;
/// JSON-RPC provider that wraps a transport client `P`.
///
/// Tuple fields, in order:
/// - `0`: the underlying client every request is issued through.
/// - `1`: optional ENS registry address set via `ens`; `ens::ENS_ADDRESS` is used when `None`.
/// - `2`: optional polling interval set via `interval`; `DEFAULT_POLL_INTERVAL` is used
///   when `None`.
/// - `3`: optional default sender set via `with_sender`; it fills a missing `from` in
///   `send_transaction` and is the account probed by `is_signer`.
#[derive(Clone, Debug)]
pub struct Provider<P>(P, Option<Address>, Option<Duration>, Option<Address>);
impl<P> AsRef<P> for Provider<P> {
fn as_ref(&self) -> &P {
&self.0
}
}
/// Identity conversion: a `ProviderError` converts into itself unchanged.
impl FromErr<ProviderError> for ProviderError {
    fn from(src: ProviderError) -> ProviderError {
        src
    }
}
/// Errors returned by `Provider` operations.
#[derive(Debug, Error)]
pub enum ProviderError {
/// A boxed error coming from the underlying JSON-RPC client; displayed transparently.
#[error(transparent)]
JsonRpcClientError(#[from] Box<dyn std::error::Error + Send + Sync>),
/// No resolver was found for the contained ENS name (the registry returned the zero address).
#[error("ens name not found: {0}")]
EnsError(String),
/// JSON (de)serialization failure; displayed transparently.
#[error(transparent)]
SerdeJson(#[from] serde_json::Error),
/// A hex string could not be decoded; displayed transparently.
#[error(transparent)]
HexError(#[from] hex::FromHexError),
/// Any other failure, carried as a free-form message.
#[error("custom error: {0}")]
CustomError(String),
}
/// The kinds of filter that `new_filter` can install on the node.
#[derive(Clone, Debug)]
pub enum FilterKind<'a> {
/// A log filter described by the borrowed `Filter`; installed with `eth_newFilter`.
Logs(&'a Filter),
/// Installed with `eth_newBlockFilter`.
NewBlocks,
/// Installed with `eth_newPendingTransactionFilter`.
PendingTransactions,
}
impl<P: JsonRpcClient> Provider<P> {
    /// Wraps `provider` with no ENS override, polling interval or default sender configured.
    pub fn new(provider: P) -> Self {
        Self(provider, None, None, None)
    }

    /// Sets the default sender address and returns the updated provider.
    ///
    /// The address is used as `from` by `send_transaction` when a request has none, and is
    /// the account checked by `is_signer`.
    pub fn with_sender(mut self, address: impl Into<Address>) -> Self {
        self.3 = Some(address.into());
        self
    }

    /// Issues a single JSON-RPC call through the wrapped client inside an `rpc` tracing span.
    ///
    /// `params` and the decoded response are JSON-encoded for the trace output, so a
    /// serialization failure of either surfaces as `ProviderError::SerdeJson`.
    ///
    /// # Errors
    ///
    /// Returns the client's error (converted into `ProviderError`) when the call fails.
    async fn request<T, R>(&self, method: &str, params: T) -> Result<R, ProviderError>
    where
        T: Debug + Serialize + Send + Sync,
        R: Serialize + DeserializeOwned + Debug,
    {
        // The span borrows `params` only to render it; ownership then moves into the future.
        let span =
            tracing::trace_span!("rpc", method = method, params = ?serde_json::to_string(&params)?);
        async move {
            trace!("tx");
            let res: R = self.0.request(method, params).await.map_err(Into::into)?;
            trace!(rx = ?serde_json::to_string(&res)?);
            Ok::<_, ProviderError>(res)
        }
        .instrument(span)
        .await
    }

    /// Fetches a block by hash (`eth_getBlockByHash`) or by number (`eth_getBlockByNumber`).
    ///
    /// `include_txs` is forwarded as the second RPC parameter; the caller picks `Tx` to match
    /// (transaction hashes vs. full transactions).
    async fn get_block_gen<Tx: Default + Serialize + DeserializeOwned + Debug>(
        &self,
        id: BlockId,
        include_txs: bool,
    ) -> Result<Option<Block<Tx>>, ProviderError> {
        let include_txs = utils::serialize(&include_txs);
        Ok(match id {
            BlockId::Hash(hash) => {
                let hash = utils::serialize(&hash);
                self.request("eth_getBlockByHash", [hash, include_txs])
                    .await?
            }
            BlockId::Number(num) => {
                let num = utils::serialize(&num);
                self.request("eth_getBlockByNumber", [num, include_txs])
                    .await?
            }
        })
    }
}
#[async_trait]
impl<P: JsonRpcClient> Middleware for Provider<P> {
    type Error = ProviderError;
    type Provider = P;
    type Inner = Self;

    /// There is no inner middleware to delegate to.
    ///
    /// # Panics
    ///
    /// Always panics.
    fn inner(&self) -> &Self::Inner {
        unreachable!("There is no inner provider here")
    }

    /// Returns this provider itself.
    fn provider(&self) -> &Provider<Self::Provider> {
        self
    }

    ////// Blockchain Status

    /// Calls `eth_blockNumber`.
    async fn get_block_number(&self) -> Result<U64, ProviderError> {
        self.request("eth_blockNumber", ()).await
    }

    /// Fetches the block identified by hash or number, with transaction hashes only.
    async fn get_block<T: Into<BlockId> + Send + Sync>(
        &self,
        block_hash_or_number: T,
    ) -> Result<Option<Block<TxHash>>, Self::Error> {
        self.get_block_gen(block_hash_or_number.into(), false).await
    }

    /// Fetches the block identified by hash or number, with full transaction objects.
    async fn get_block_with_txs<T: Into<BlockId> + Send + Sync>(
        &self,
        block_hash_or_number: T,
    ) -> Result<Option<Block<Transaction>>, ProviderError> {
        self.get_block_gen(block_hash_or_number.into(), true).await
    }

    /// Calls `eth_getTransactionByHash`.
    async fn get_transaction<T: Send + Sync + Into<TxHash>>(
        &self,
        transaction_hash: T,
    ) -> Result<Option<Transaction>, ProviderError> {
        let hash = transaction_hash.into();
        self.request("eth_getTransactionByHash", [hash]).await
    }

    /// Calls `eth_getTransactionReceipt`.
    async fn get_transaction_receipt<T: Send + Sync + Into<TxHash>>(
        &self,
        transaction_hash: T,
    ) -> Result<Option<TransactionReceipt>, ProviderError> {
        let hash = transaction_hash.into();
        self.request("eth_getTransactionReceipt", [hash]).await
    }

    /// Calls `eth_gasPrice`.
    async fn get_gas_price(&self) -> Result<U256, ProviderError> {
        self.request("eth_gasPrice", ()).await
    }

    /// Calls `eth_accounts`.
    async fn get_accounts(&self) -> Result<Vec<Address>, ProviderError> {
        self.request("eth_accounts", ()).await
    }

    /// Calls `eth_getTransactionCount` for `from` (ENS names are resolved first).
    ///
    /// `block` defaults to `BlockNumber::Latest`.
    async fn get_transaction_count<T: Into<NameOrAddress> + Send + Sync>(
        &self,
        from: T,
        block: Option<BlockNumber>,
    ) -> Result<U256, ProviderError> {
        let from = match from.into() {
            NameOrAddress::Name(ens_name) => self.resolve_name(&ens_name).await?,
            NameOrAddress::Address(addr) => addr,
        };
        let from = utils::serialize(&from);
        let block = utils::serialize(&block.unwrap_or(BlockNumber::Latest));
        self.request("eth_getTransactionCount", [from, block]).await
    }

    /// Calls `eth_getBalance` for `from` (ENS names are resolved first).
    ///
    /// `block` defaults to `BlockNumber::Latest`.
    async fn get_balance<T: Into<NameOrAddress> + Send + Sync>(
        &self,
        from: T,
        block: Option<BlockNumber>,
    ) -> Result<U256, ProviderError> {
        let from = match from.into() {
            NameOrAddress::Name(ens_name) => self.resolve_name(&ens_name).await?,
            NameOrAddress::Address(addr) => addr,
        };
        let from = utils::serialize(&from);
        let block = utils::serialize(&block.unwrap_or(BlockNumber::Latest));
        self.request("eth_getBalance", [from, block]).await
    }

    /// Calls `eth_chainId`.
    async fn get_chainid(&self) -> Result<U256, ProviderError> {
        self.request("eth_chainId", ()).await
    }

    ////// Contract Execution

    /// Calls `eth_call` with `tx`; `block` defaults to `BlockNumber::Latest`.
    async fn call(
        &self,
        tx: &TransactionRequest,
        block: Option<BlockNumber>,
    ) -> Result<Bytes, ProviderError> {
        let tx = utils::serialize(tx);
        let block = utils::serialize(&block.unwrap_or(BlockNumber::Latest));
        self.request("eth_call", [tx, block]).await
    }

    /// Calls `eth_estimateGas` with `tx`.
    async fn estimate_gas(&self, tx: &TransactionRequest) -> Result<U256, ProviderError> {
        self.request("eth_estimateGas", [tx]).await
    }

    /// Submits `tx` via `eth_sendTransaction` and returns a handle to the pending transaction.
    ///
    /// Missing fields are filled in before sending: `from` from the configured default
    /// sender, an ENS name in `to` is resolved to an address, and `gas` is estimated.
    /// The block argument is ignored.
    async fn send_transaction(
        &self,
        mut tx: TransactionRequest,
        _: Option<BlockNumber>,
    ) -> Result<PendingTransaction<'_, P>, ProviderError> {
        if tx.from.is_none() {
            tx.from = self.3;
        }

        // Resolve the ENS name *before* estimating gas, so the estimate request carries a
        // concrete recipient address rather than an unresolved name.
        if let Some(NameOrAddress::Name(ref ens_name)) = tx.to {
            let addr = self.resolve_name(ens_name).await?;
            tx.to = Some(addr.into())
        }

        if tx.gas.is_none() {
            tx.gas = Some(self.estimate_gas(&tx).await?);
        }

        let tx_hash = self.request("eth_sendTransaction", [tx]).await?;
        Ok(PendingTransaction::new(tx_hash, self).interval(self.get_interval()))
    }

    /// Submits the RLP encoding of an already signed `tx` via `eth_sendRawTransaction`.
    async fn send_raw_transaction<'a>(
        &'a self,
        tx: &Transaction,
    ) -> Result<PendingTransaction<'a, P>, ProviderError> {
        let rlp = utils::serialize(&tx.rlp());
        let tx_hash = self.request("eth_sendRawTransaction", [rlp]).await?;
        Ok(PendingTransaction::new(tx_hash, self).interval(self.get_interval()))
    }

    /// Reports whether the node can sign for the configured default sender.
    ///
    /// Probes by asking the node to sign an empty message; returns `false` when no sender is
    /// configured or when the signing request fails for any reason.
    async fn is_signer(&self) -> bool {
        match self.3 {
            Some(sender) => self.sign(vec![], &sender).await.is_ok(),
            None => false,
        }
    }

    /// Asks the node to sign `data` with the account `from` via `eth_sign`.
    ///
    /// # Errors
    ///
    /// Fails when the request fails, when the response is not valid hex
    /// (`ProviderError::HexError`), or when the decoded bytes are not a valid signature
    /// (`ProviderError::CustomError`).
    async fn sign<T: Into<Bytes> + Send + Sync>(
        &self,
        data: T,
        from: &Address,
    ) -> Result<Signature, ProviderError> {
        let data = utils::serialize(&data.into());
        let from = utils::serialize(from);
        let sig: String = self.request("eth_sign", [from, data]).await?;
        // The `0x` prefix is optional in the response.
        let sig = sig.strip_prefix("0x").unwrap_or(&sig);
        let sig = hex::decode(sig)?;
        Signature::try_from(sig.as_slice()).map_err(|e| ProviderError::CustomError(e.to_string()))
    }

    ////// Contract state

    /// Calls `eth_getLogs` with `filter`.
    async fn get_logs(&self, filter: &Filter) -> Result<Vec<Log>, ProviderError> {
        self.request("eth_getLogs", [filter]).await
    }

    /// Installs a log filter and returns a watcher polling it at the provider's interval.
    async fn watch<'a>(
        &'a self,
        filter: &Filter,
    ) -> Result<FilterWatcher<'a, P, Log>, ProviderError> {
        let id = self.new_filter(FilterKind::Logs(filter)).await?;
        let filter = FilterWatcher::new(id, self).interval(self.get_interval());
        Ok(filter)
    }

    /// Installs a new-block filter and returns a watcher polling it at the provider's interval.
    async fn watch_blocks(&self) -> Result<FilterWatcher<'_, P, H256>, ProviderError> {
        let id = self.new_filter(FilterKind::NewBlocks).await?;
        let filter = FilterWatcher::new(id, self).interval(self.get_interval());
        Ok(filter)
    }

    /// Installs a pending-transaction filter and returns a watcher polling it at the
    /// provider's interval.
    async fn watch_pending_transactions(
        &self,
    ) -> Result<FilterWatcher<'_, P, H256>, ProviderError> {
        let id = self.new_filter(FilterKind::PendingTransactions).await?;
        let filter = FilterWatcher::new(id, self).interval(self.get_interval());
        Ok(filter)
    }

    /// Installs a filter of the given kind on the node and returns its id.
    async fn new_filter(&self, filter: FilterKind<'_>) -> Result<U256, ProviderError> {
        let (method, args) = match filter {
            FilterKind::NewBlocks => ("eth_newBlockFilter", vec![]),
            FilterKind::PendingTransactions => ("eth_newPendingTransactionFilter", vec![]),
            FilterKind::Logs(filter) => ("eth_newFilter", vec![utils::serialize(&filter)]),
        };
        self.request(method, args).await
    }

    /// Calls `eth_uninstallFilter` for the filter `id`.
    async fn uninstall_filter<T: Into<U256> + Send + Sync>(
        &self,
        id: T,
    ) -> Result<bool, ProviderError> {
        let id = utils::serialize(&id.into());
        self.request("eth_uninstallFilter", [id]).await
    }

    /// Calls `eth_getFilterChanges` for the filter `id`, decoding each entry as `R`.
    async fn get_filter_changes<T, R>(&self, id: T) -> Result<Vec<R>, ProviderError>
    where
        T: Into<U256> + Send + Sync,
        R: Serialize + DeserializeOwned + Send + Sync + Debug,
    {
        let id = utils::serialize(&id.into());
        self.request("eth_getFilterChanges", [id]).await
    }

    /// Calls `eth_getStorageAt` for slot `location` of `from` (ENS names are resolved first).
    ///
    /// `block` defaults to `BlockNumber::Latest`. The hex response is left-padded with zeros
    /// to 32 bytes, so a node that trims leading zeros is handled.
    ///
    /// # Errors
    ///
    /// Besides request failures, returns `ProviderError::HexError` for a non-hex response and
    /// `ProviderError::CustomError` when the response decodes to more than 32 bytes.
    async fn get_storage_at<T: Into<NameOrAddress> + Send + Sync>(
        &self,
        from: T,
        location: H256,
        block: Option<BlockNumber>,
    ) -> Result<H256, ProviderError> {
        let from = match from.into() {
            NameOrAddress::Name(ens_name) => self.resolve_name(&ens_name).await?,
            NameOrAddress::Address(addr) => addr,
        };
        let from = utils::serialize(&from);
        let location = utils::serialize(&location);
        let block = utils::serialize(&block.unwrap_or(BlockNumber::Latest));
        let value: String = self
            .request("eth_getStorageAt", [from, location, block])
            .await?;

        // Only a leading `0x` is a prefix; strip just that instead of every occurrence.
        let digits = value.strip_prefix("0x").unwrap_or(&value);
        let padded = format!("{:0>64}", digits);
        let bytes: Vec<u8> = Vec::from_hex(padded)?;
        // `H256::from_slice` panics on a length mismatch; report it as an error instead.
        if bytes.len() != 32 {
            return Err(ProviderError::CustomError(format!(
                "eth_getStorageAt returned {} bytes, expected at most 32",
                bytes.len()
            )));
        }
        Ok(H256::from_slice(&bytes))
    }

    /// Calls `eth_getCode` for `at` (ENS names are resolved first).
    ///
    /// `block` defaults to `BlockNumber::Latest`.
    async fn get_code<T: Into<NameOrAddress> + Send + Sync>(
        &self,
        at: T,
        block: Option<BlockNumber>,
    ) -> Result<Bytes, ProviderError> {
        let at = match at.into() {
            NameOrAddress::Name(ens_name) => self.resolve_name(&ens_name).await?,
            NameOrAddress::Address(addr) => addr,
        };
        let at = utils::serialize(&at);
        let block = utils::serialize(&block.unwrap_or(BlockNumber::Latest));
        self.request("eth_getCode", [at, block]).await
    }

    ////// Ethereum Naming Service

    /// Resolves `ens_name` to an address by querying its resolver with `ens::ADDR_SELECTOR`.
    async fn resolve_name(&self, ens_name: &str) -> Result<Address, ProviderError> {
        self.query_resolver(ParamType::Address, ens_name, ens::ADDR_SELECTOR)
            .await
    }

    /// Looks up the name registered for `address` by querying the resolver of its reverse
    /// record with `ens::NAME_SELECTOR`.
    async fn lookup_address(&self, address: Address) -> Result<String, ProviderError> {
        let ens_name = ens::reverse_address(address);
        self.query_resolver(ParamType::String, &ens_name, ens::NAME_SELECTOR)
            .await
    }

    ////// Transaction pool

    /// Calls `txpool_content`.
    async fn txpool_content(&self) -> Result<TxpoolContent, ProviderError> {
        self.request("txpool_content", ()).await
    }

    /// Calls `txpool_inspect`.
    async fn txpool_inspect(&self) -> Result<TxpoolInspect, ProviderError> {
        self.request("txpool_inspect", ()).await
    }

    /// Calls `txpool_status`.
    async fn txpool_status(&self) -> Result<TxpoolStatus, ProviderError> {
        self.request("txpool_status", ()).await
    }

    ////// Tracing

    /// Calls `trace_call` with parameters `[req, trace_type, block]`.
    ///
    /// `block` defaults to `BlockNumber::Latest`.
    async fn trace_call(
        &self,
        req: TransactionRequest,
        trace_type: Vec<TraceType>,
        block: Option<BlockNumber>,
    ) -> Result<BlockTrace, ProviderError> {
        let req = utils::serialize(&req);
        let block = utils::serialize(&block.unwrap_or(BlockNumber::Latest));
        let trace_type = utils::serialize(&trace_type);
        self.request("trace_call", [req, trace_type, block]).await
    }

    /// Calls `trace_rawTransaction` with parameters `[data, trace_type]`.
    async fn trace_raw_transaction(
        &self,
        data: Bytes,
        trace_type: Vec<TraceType>,
    ) -> Result<BlockTrace, ProviderError> {
        let data = utils::serialize(&data);
        let trace_type = utils::serialize(&trace_type);
        self.request("trace_rawTransaction", [data, trace_type])
            .await
    }

    /// Calls `trace_replayTransaction` with parameters `[hash, trace_type]`.
    async fn trace_replay_transaction(
        &self,
        hash: H256,
        trace_type: Vec<TraceType>,
    ) -> Result<BlockTrace, ProviderError> {
        let hash = utils::serialize(&hash);
        let trace_type = utils::serialize(&trace_type);
        self.request("trace_replayTransaction", [hash, trace_type])
            .await
    }

    /// Calls `trace_replayBlockTransactions` with parameters `[block, trace_type]`.
    async fn trace_replay_block_transactions(
        &self,
        block: BlockNumber,
        trace_type: Vec<TraceType>,
    ) -> Result<Vec<BlockTrace>, ProviderError> {
        let block = utils::serialize(&block);
        let trace_type = utils::serialize(&trace_type);
        self.request("trace_replayBlockTransactions", [block, trace_type])
            .await
    }

    /// Calls `trace_block` for `block`.
    async fn trace_block(&self, block: BlockNumber) -> Result<Vec<Trace>, ProviderError> {
        let block = utils::serialize(&block);
        self.request("trace_block", [block]).await
    }

    /// Calls `trace_filter` with `filter`.
    async fn trace_filter(&self, filter: TraceFilter) -> Result<Vec<Trace>, ProviderError> {
        let filter = utils::serialize(&filter);
        self.request("trace_filter", vec![filter]).await
    }

    /// Calls `trace_get` with parameters `[hash, index]`, where every entry of `index` is
    /// converted to `U64`.
    async fn trace_get<T: Into<U64> + Send + Sync>(
        &self,
        hash: H256,
        index: Vec<T>,
    ) -> Result<Trace, ProviderError> {
        let hash = utils::serialize(&hash);
        let index: Vec<U64> = index.into_iter().map(|i| i.into()).collect();
        let index = utils::serialize(&index);
        self.request("trace_get", vec![hash, index]).await
    }

    /// Calls `trace_transaction` for `hash`.
    async fn trace_transaction(&self, hash: H256) -> Result<Vec<Trace>, ProviderError> {
        let hash = utils::serialize(&hash);
        self.request("trace_transaction", vec![hash]).await
    }

    /// Calls `parity_getBlockReceipts` for `block`.
    async fn parity_block_receipts<T: Into<BlockNumber> + Send + Sync>(
        &self,
        block: T,
    ) -> Result<Vec<TransactionReceipt>, Self::Error> {
        self.request("parity_getBlockReceipts", vec![block.into()])
            .await
    }

    ////// Pub/Sub

    /// Calls `eth_subscribe` with `params` and wraps the returned subscription id in a stream
    /// of `R` items.
    async fn subscribe<T, R>(
        &self,
        params: T,
    ) -> Result<SubscriptionStream<'_, P, R>, ProviderError>
    where
        T: Debug + Serialize + Send + Sync,
        R: DeserializeOwned + Send + Sync,
        P: PubsubClient,
    {
        let id: U256 = self.request("eth_subscribe", params).await?;
        SubscriptionStream::new(id, self).map_err(Into::into)
    }

    /// Calls `eth_unsubscribe` for the subscription `id`.
    async fn unsubscribe<T>(&self, id: T) -> Result<bool, ProviderError>
    where
        T: Into<U256> + Send + Sync,
        P: PubsubClient,
    {
        self.request("eth_unsubscribe", [id.into()]).await
    }

    /// Subscribes to `newHeads`.
    async fn subscribe_blocks(
        &self,
    ) -> Result<SubscriptionStream<'_, P, Block<TxHash>>, ProviderError>
    where
        P: PubsubClient,
    {
        self.subscribe(["newHeads"]).await
    }

    /// Subscribes to `newPendingTransactions`.
    async fn subscribe_pending_txs(
        &self,
    ) -> Result<SubscriptionStream<'_, P, TxHash>, ProviderError>
    where
        P: PubsubClient,
    {
        self.subscribe(["newPendingTransactions"]).await
    }

    /// Subscribes to `logs` matching `filter`.
    async fn subscribe_logs<'a>(
        &'a self,
        filter: &Filter,
    ) -> Result<SubscriptionStream<'a, P, Log>, ProviderError>
    where
        P: PubsubClient,
    {
        let logs = utils::serialize(&"logs");
        let filter = utils::serialize(filter);
        self.subscribe([logs, filter]).await
    }
}
impl<P: JsonRpcClient> Provider<P> {
    /// Runs a two-step ENS query: first asks the registry for the resolver of `ens_name`,
    /// then calls `selector` on that resolver and decodes the answer as `param`.
    ///
    /// The registry is the address configured through `ens`, or `ens::ENS_ADDRESS` otherwise.
    ///
    /// # Errors
    ///
    /// Returns `ProviderError::EnsError` when the registry reports the zero address as the
    /// resolver, and forwards any error from the underlying calls.
    ///
    /// # Panics
    ///
    /// Panics (inside `decode_bytes`) when a response cannot be ABI-decoded.
    async fn query_resolver<T: Detokenize>(
        &self,
        param: ParamType,
        ens_name: &str,
        selector: Selector,
    ) -> Result<T, ProviderError> {
        let registry = match self.1 {
            Some(custom) => custom,
            None => ens::ENS_ADDRESS,
        };

        // Step 1: which resolver is responsible for this name?
        let resolver_lookup = ens::get_resolver(registry, ens_name);
        let raw_resolver = self.call(&resolver_lookup, None).await?;
        let resolver_address: Address = decode_bytes(ParamType::Address, raw_resolver);
        if Address::zero() == resolver_address {
            return Err(ProviderError::EnsError(ens_name.to_owned()));
        }

        // Step 2: ask that resolver for the requested record.
        let record_lookup = ens::resolve(resolver_address, selector, ens_name);
        let raw_record = self.call(&record_lookup, None).await?;
        Ok(decode_bytes(param, raw_record))
    }

    /// Test helper: issues `evm_mine` once per requested block, stopping at the first error.
    #[cfg(test)]
    pub async fn mine(&self, num_blocks: usize) -> Result<(), ProviderError> {
        let mut remaining = num_blocks;
        while remaining > 0 {
            self.0
                .request::<_, U256>("evm_mine", None::<()>)
                .await
                .map_err(Into::into)?;
            remaining -= 1;
        }
        Ok(())
    }

    /// Overrides the ENS registry address used for name queries.
    pub fn ens<T: Into<Address>>(self, ens: T) -> Self {
        let Self(client, _, poll_interval, sender) = self;
        Self(client, Some(ens.into()), poll_interval, sender)
    }

    /// Overrides the polling interval handed to filter watchers and pending transactions.
    pub fn interval<T: Into<Duration>>(self, interval: T) -> Self {
        let Self(client, registry, _, sender) = self;
        Self(client, registry, Some(interval.into()), sender)
    }

    /// Returns the configured polling interval, or `DEFAULT_POLL_INTERVAL` when none is set.
    pub fn get_interval(&self) -> Duration {
        match self.2 {
            Some(configured) => configured,
            None => DEFAULT_POLL_INTERVAL,
        }
    }
}
#[cfg(feature = "ws")]
impl Provider<crate::Ws> {
    /// Opens a WebSocket connection to `url` and wraps the resulting client in a provider.
    ///
    /// # Errors
    ///
    /// Returns the connection error converted into `ProviderError`.
    pub async fn connect(
        url: impl tokio_tungstenite::tungstenite::client::IntoClientRequest + Unpin,
    ) -> Result<Self, ProviderError> {
        match crate::Ws::connect(url).await {
            Ok(client) => Ok(Self::new(client)),
            Err(err) => Err(err.into()),
        }
    }
}
impl Provider<MockProvider> {
pub fn mocked() -> (Self, MockProvider) {
let mock = MockProvider::new();
let mock_clone = mock.clone();
(Self::new(mock), mock_clone)
}
}
/// ABI-decodes `bytes` as a single value of type `param` and converts the tokens into `T`.
///
/// # Panics
///
/// Panics when the bytes cannot be ABI-decoded as `param`, or when the decoded tokens
/// cannot be converted into `T`.
fn decode_bytes<T: Detokenize>(param: ParamType, bytes: Bytes) -> T {
    let expected_types = [param];
    let raw: &[u8] = bytes.as_ref();
    let tokens = abi::decode(&expected_types, raw)
        .expect("could not abi-decode bytes to address tokens");
    T::from_tokens(tokens).expect("could not parse tokens as address")
}
impl TryFrom<&str> for Provider<HttpProvider> {
    type Error = ParseError;

    /// Parses `src` as a URL and builds an HTTP-backed provider with no ENS override,
    /// polling interval or default sender.
    fn try_from(src: &str) -> Result<Self, Self::Error> {
        Url::parse(src).map(|endpoint| Provider(HttpProvider::new(endpoint), None, None, None))
    }
}
impl TryFrom<String> for Provider<HttpProvider> {
type Error = ParseError;
fn try_from(src: String) -> Result<Self, Self::Error> {
Provider::try_from(src.as_str())
}
}
#[cfg(test)]
mod ens_tests {
    use super::*;

    // NOTE(review): these tests talk to a live mainnet endpoint with a hard-coded project id.
    const INFURA: &str = "https://mainnet.infura.io/v3/c60b0bb42f8a4c6481ecd229eddaca27";

    /// A known name resolves to its address; unknown names produce an error.
    #[tokio::test]
    async fn mainnet_resolve_name() {
        let provider = Provider::<HttpProvider>::try_from(INFURA).unwrap();

        let expected: Address = "6fC21092DA55B392b045eD78F4732bff3C580e2c".parse().unwrap();
        let resolved = provider
            .resolve_name("registrar.firefly.eth")
            .await
            .unwrap();
        assert_eq!(resolved, expected);

        // A name without a registrar, then a name with an unknown subdomain.
        for missing in &["asdfasdffads", "asdfasdf.registrar.firefly.eth"] {
            provider.resolve_name(missing).await.unwrap_err();
        }
    }

    /// A known address reverse-resolves to its name; an unknown address produces an error.
    #[tokio::test]
    async fn mainnet_lookup_address() {
        let provider = Provider::<HttpProvider>::try_from(INFURA).unwrap();

        let known: Address = "6fC21092DA55B392b045eD78F4732bff3C580e2c".parse().unwrap();
        let name = provider.lookup_address(known).await.unwrap();
        assert_eq!(name, "registrar.firefly.eth");

        let unknown: Address = "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA".parse().unwrap();
        provider.lookup_address(unknown).await.unwrap_err();
    }
}
// Integration-style tests for `Provider`. They need external processes or endpoints
// (a node on localhost:8545, Ganache, or a remote endpoint), and some are `#[ignore]`d.
#[cfg(test)]
mod tests {
use super::*;
use crate::Http;
use ethers_core::types::H256;
use futures_util::StreamExt;
// Watches for new blocks and checks that each streamed hash matches the block fetched by
// number, counting up from the block number observed before the watcher was installed.
// Ignored by default: it needs a node listening on localhost:8545.
#[tokio::test]
#[ignore]
async fn test_new_block_filter() {
let num_blocks = 3;
let provider = Provider::<HttpProvider>::try_from("http://localhost:8545")
.unwrap()
.interval(Duration::from_millis(1000));
let start_block = provider.get_block_number().await.unwrap();
let stream = provider.watch_blocks().await.unwrap().stream();
let hashes: Vec<H256> = stream.take(num_blocks).collect::<Vec<H256>>().await;
for (i, hash) in hashes.iter().enumerate() {
let block = provider
.get_block(start_block + i as u64 + 1)
.await
.unwrap()
.unwrap();
assert_eq!(*hash, block.hash.unwrap());
}
}
// `is_signer` is true for a Ganache account set as sender, false when no sender is set,
// and false for a sender configured against the remote endpoint below.
#[tokio::test]
async fn test_is_signer() {
use ethers_core::utils::Ganache;
use std::str::FromStr;
let ganache = Ganache::new().spawn();
let provider = Provider::<Http>::try_from(ganache.endpoint())
.unwrap()
.with_sender(ganache.addresses()[0]);
assert_eq!(provider.is_signer().await, true);
// No sender configured.
let provider = Provider::<Http>::try_from(ganache.endpoint()).unwrap();
assert_eq!(provider.is_signer().await, false);
// Sender configured, but the signing request against this endpoint is expected to fail.
let sender = Address::from_str("635B4764D1939DfAcD3a8014726159abC277BecC")
.expect("should be able to parse hex address");
let provider = Provider::<Http>::try_from(
"https://ropsten.infura.io/v3/fd8b88b56aa84f6da87b60f5441d6778",
)
.unwrap()
.with_sender(sender);
assert_eq!(provider.is_signer().await, false);
}
// Installs the pending-transaction watcher first, then sends transactions and checks that
// the streamed hashes equal the submitted ones, in order.
// Ignored by default: it needs a node listening on localhost:8545.
#[tokio::test]
#[ignore]
async fn test_new_pending_txs_filter() {
let num_txs = 5;
let provider = Provider::<HttpProvider>::try_from("http://localhost:8545")
.unwrap()
.interval(Duration::from_millis(1000));
let accounts = provider.get_accounts().await.unwrap();
// The stream must exist before any transaction is sent.
let stream = provider
.watch_pending_transactions()
.await
.unwrap()
.stream();
let mut tx_hashes = Vec::new();
let tx = TransactionRequest::new()
.from(accounts[0])
.to(accounts[0])
.value(1e18 as u64);
for _ in 0..num_txs {
tx_hashes.push(provider.send_transaction(tx.clone(), None).await.unwrap());
}
let hashes: Vec<H256> = stream.take(num_txs).collect::<Vec<H256>>().await;
assert_eq!(tx_hashes, hashes);
}
// With Ganache started with `block_time(2u64)`, a receipt requested right after sending is
// `None`; awaiting the pending transaction then yields the receipt for the same hash.
#[tokio::test]
async fn receipt_on_unmined_tx() {
use ethers_core::{
types::TransactionRequest,
utils::{parse_ether, Ganache},
};
let ganache = Ganache::new().block_time(2u64).spawn();
let provider = Provider::<Http>::try_from(ganache.endpoint()).unwrap();
let accounts = provider.get_accounts().await.unwrap();
let tx = TransactionRequest::pay(accounts[0], parse_ether(1u64).unwrap()).from(accounts[0]);
let pending_tx = provider.send_transaction(tx, None).await.unwrap();
assert!(provider
.get_transaction_receipt(*pending_tx)
.await
.unwrap()
.is_none());
// Copy the hash out before `pending_tx` is consumed by `.await`.
let hash = *pending_tx;
let receipt = pending_tx.await.unwrap();
assert_eq!(receipt.transaction_hash, hash);
}
// Runs only when the `PARITY` environment variable holds an endpoint URL; otherwise the
// test returns early and passes.
#[tokio::test]
async fn parity_block_receipts() {
let url = match std::env::var("PARITY") {
Ok(inner) => inner,
_ => return,
};
let provider = Provider::<Http>::try_from(url.as_str()).unwrap();
let receipts = provider.parity_block_receipts(10657200).await.unwrap();
assert!(!receipts.is_empty());
}
// Subscribes to new heads over Ganache's WebSocket endpoint and expects the first three
// block numbers to be 1, 2, 3. Compiled only when the `celo` feature is disabled.
#[tokio::test]
#[cfg(not(feature = "celo"))]
async fn block_subscribe() {
use ethers_core::utils::Ganache;
use futures_util::StreamExt;
let ganache = Ganache::new().block_time(2u64).spawn();
let provider = Provider::connect(ganache.ws_endpoint()).await.unwrap();
let stream = provider.subscribe_blocks().await.unwrap();
let blocks = stream
.take(3)
.map(|x| x.number.unwrap().as_u64())
.collect::<Vec<_>>()
.await;
assert_eq!(blocks, vec![1, 2, 3]);
}
}