use crate::errors::ExecutionError;
use ethcontract_common::abi::{Topic, TopicFilter};
use futures::future::{self, TryFutureExt};
use futures::stream::{self, Stream, TryStreamExt};
use std::num::NonZeroU64;
use std::time::Duration;
use web3::api::Web3;
use web3::types::{Address, BlockNumber, Filter, FilterBuilder, Log, H256};
use web3::Transport;
#[cfg(not(test))]
pub const DEFAULT_POLL_INTERVAL: Duration = Duration::from_secs(5);
#[cfg(test)]
pub const DEFAULT_POLL_INTERVAL: Duration = Duration::from_secs(0);
pub const DEFAULT_BLOCK_PAGE_SIZE: u64 = 10_000;
#[derive(Debug)]
#[must_use = "log filter builders do nothing unless you query or stream them"]
pub struct LogFilterBuilder<T: Transport> {
web3: Web3<T>,
pub from_block: Option<BlockNumber>,
pub to_block: Option<BlockNumber>,
pub block_hash: Option<H256>,
pub address: Vec<Address>,
pub topics: TopicFilter,
pub limit: Option<usize>,
pub block_page_size: Option<NonZeroU64>,
pub poll_interval: Option<Duration>,
}
impl<T: Transport> LogFilterBuilder<T> {
pub fn new(web3: Web3<T>) -> Self {
LogFilterBuilder {
web3,
from_block: None,
to_block: None,
address: Vec::new(),
topics: TopicFilter::default(),
limit: None,
block_page_size: None,
poll_interval: None,
block_hash: None,
}
}
#[allow(clippy::wrong_self_convention)]
pub fn from_block(mut self, block: BlockNumber) -> Self {
self.from_block = Some(block);
self
}
#[allow(clippy::wrong_self_convention)]
pub fn to_block(mut self, block: BlockNumber) -> Self {
self.to_block = Some(block);
self
}
pub fn block_hash(mut self, hash: H256) -> Self {
self.block_hash = Some(hash);
self
}
pub fn address(mut self, address: Vec<Address>) -> Self {
self.address = address;
self
}
pub fn topic0(mut self, topic: Topic<H256>) -> Self {
self.topics.topic0 = topic;
self
}
pub fn topic1(mut self, topic: Topic<H256>) -> Self {
self.topics.topic1 = topic;
self
}
pub fn topic2(mut self, topic: Topic<H256>) -> Self {
self.topics.topic2 = topic;
self
}
pub fn topic3(mut self, topic: Topic<H256>) -> Self {
self.topics.topic3 = topic;
self
}
pub fn limit(mut self, value: usize) -> Self {
self.limit = Some(value);
self
}
pub fn block_page_size(mut self, value: u64) -> Self {
self.block_page_size = Some(NonZeroU64::new(value).expect("block page size cannot be 0"));
self
}
pub fn poll_interval(mut self, value: Duration) -> Self {
self.poll_interval = Some(value);
self
}
pub fn into_filter(self) -> FilterBuilder {
let mut filter = FilterBuilder::default();
if let Some(from_block) = self.from_block {
filter = filter.from_block(from_block);
}
if let Some(to_block) = self.to_block {
filter = filter.to_block(to_block);
}
if let Some(hash) = self.block_hash {
filter = filter.block_hash(hash);
}
if !self.address.is_empty() {
filter = filter.address(self.address);
}
if self.topics != TopicFilter::default() {
filter = filter.topics(
topic_to_option(self.topics.topic0),
topic_to_option(self.topics.topic1),
topic_to_option(self.topics.topic2),
topic_to_option(self.topics.topic3),
);
}
if let Some(limit) = self.limit {
filter = filter.limit(limit)
}
filter
}
pub async fn past_logs(self) -> Result<Vec<Log>, ExecutionError> {
let web3 = self.web3.clone();
let filter = self.into_filter();
let logs = web3.eth().logs(filter.build()).await?;
Ok(logs)
}
pub fn past_logs_pages(mut self) -> impl Stream<Item = Result<Vec<Log>, ExecutionError>> {
self.limit = None;
stream::try_unfold(PastLogsStream::Init(self), PastLogsStream::next)
.try_filter(|logs| future::ready(!logs.is_empty()))
}
pub fn stream(self) -> impl Stream<Item = Result<Log, ExecutionError>> {
let web3 = self.web3.clone();
let poll_interval = self.poll_interval.unwrap_or(DEFAULT_POLL_INTERVAL);
let filter = self.into_filter();
async move {
let eth_filter = web3
.eth_filter()
.create_logs_filter(filter.build())
.await
.map_err(ExecutionError::from)?;
let stream = eth_filter
.stream(poll_interval)
.map_err(ExecutionError::from);
Ok(stream)
}
.try_flatten_stream()
}
}
fn topic_to_option(topic: Topic<H256>) -> Option<Vec<H256>> {
match topic {
Topic::Any => None,
Topic::OneOf(v) => Some(v),
Topic::This(t) => Some(vec![t]),
}
}
enum PastLogsStream<T: Transport> {
Init(LogFilterBuilder<T>),
Done,
Paging(PastLogsPager<T>),
Querying(Web3<T>, Filter),
}
impl<T: Transport> PastLogsStream<T> {
async fn next(mut self) -> Result<Option<(Vec<Log>, Self)>, ExecutionError> {
loop {
let (logs, next) = match self {
PastLogsStream::Init(builder) => {
self = PastLogsStream::init(builder).await?;
continue;
}
PastLogsStream::Done => return Ok(None),
PastLogsStream::Paging(mut pager) => {
let logs = match pager.next_page().await? {
Some(logs) => logs,
None => return Ok(None),
};
(logs, PastLogsStream::Paging(pager))
}
PastLogsStream::Querying(web3, filter) => {
let logs = web3.eth().logs(filter.clone()).await?;
(logs, PastLogsStream::Done)
}
};
return Ok(Some((logs, next)));
}
}
async fn init(builder: LogFilterBuilder<T>) -> Result<Self, ExecutionError> {
let from_block = builder.from_block.unwrap_or(BlockNumber::Latest);
let to_block = builder.to_block.unwrap_or(BlockNumber::Latest);
let web3 = builder.web3.clone();
let block_page_size = builder
.block_page_size
.map(|size| size.get())
.unwrap_or(DEFAULT_BLOCK_PAGE_SIZE);
let filter = builder.into_filter();
let start_block = match from_block {
BlockNumber::Earliest => Some(0),
BlockNumber::Number(value) => Some(value.as_u64()),
BlockNumber::Latest | BlockNumber::Pending => None,
};
let end_block = match to_block {
BlockNumber::Earliest => None,
BlockNumber::Number(value) => Some(value.as_u64()),
BlockNumber::Latest | BlockNumber::Pending => {
let latest_block = web3.eth().block_number().await?;
Some(latest_block.as_u64())
}
};
let next = match (start_block, end_block) {
(Some(page_block), Some(end_block)) => PastLogsStream::Paging(PastLogsPager {
web3,
to_block,
block_page_size,
filter,
page_block,
end_block,
}),
_ => PastLogsStream::Querying(web3, filter.build()),
};
Ok(next)
}
}
struct PastLogsPager<T: Transport> {
web3: Web3<T>,
to_block: BlockNumber,
block_page_size: u64,
filter: FilterBuilder,
page_block: u64,
end_block: u64,
}
impl<T: Transport> PastLogsPager<T> {
async fn next_page(&mut self) -> Result<Option<Vec<Log>>, ExecutionError> {
debug_assert!(
self.block_page_size != 0,
"pager should never be constructed with 0 block page size",
);
while self.page_block <= self.end_block {
let page_end = self.page_block + self.block_page_size - 1;
let page_to_block = if page_end < self.end_block {
BlockNumber::Number(page_end.into())
} else {
self.to_block
};
let page = self
.web3
.eth()
.logs(
self.filter
.clone()
.from_block(self.page_block.into())
.to_block(page_to_block)
.build(),
)
.await?;
self.page_block = page_end + 1;
if page.is_empty() {
continue;
}
return Ok(Some(page));
}
Ok(None)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::test::prelude::*;
use futures::stream::StreamExt;
use serde_json::Value;
use web3::types::U64;
fn generate_log(kind: &str) -> Value {
json!({
"address": Address::zero(),
"topics": [],
"data": "0x",
"blockHash": H256::zero(),
"blockNumber": "0x0",
"transactionHash": H256::zero(),
"transactionIndex": "0x0",
"logIndex": "0x0",
"transactionLogIndex": "0x0",
"logType": kind,
"removed": false,
})
}
#[test]
fn past_logs_options() {
let mut transport = TestTransport::new();
let web3 = Web3::new(transport.clone());
let address = Address::repeat_byte(0x42);
let topics = (0..=3).map(H256::repeat_byte).collect::<Vec<_>>();
transport.add_response(json!([generate_log("awesome")]));
let logs = LogFilterBuilder::new(web3)
.from_block(66.into())
.to_block(BlockNumber::Pending)
.address(vec![address])
.topic0(Topic::This(topics[0]))
.topic1(Topic::This(topics[1]))
.topic2(Topic::This(topics[2]))
.topic3(Topic::OneOf(vec![topics[3]; 3]))
.limit(42)
.block_page_size(5) .poll_interval(Duration::from_secs(100)) .past_logs()
.immediate()
.expect("failed to get past logs");
assert_eq!(logs[0].log_type.as_deref(), Some("awesome"));
transport.assert_request(
"eth_getLogs",
&[json!({
"address": address,
"fromBlock": U64::from(66),
"toBlock": BlockNumber::Pending,
"topics": [
topics[0],
topics[1],
topics[2],
vec![topics[3]; 3],
],
"limit": 42,
})],
);
transport.assert_no_more_requests();
}
#[test]
fn past_log_stream_logs() {
let mut transport = TestTransport::new();
let web3 = Web3::new(transport.clone());
let address = Address::repeat_byte(0x42);
let topic = H256::repeat_byte(42);
let log = generate_log("awesome");
transport.add_response(json!(U64::from(20)));
transport.add_response(json!([log]));
transport.add_response(json!([]));
transport.add_response(json!([log, log]));
let mut raw_events = LogFilterBuilder::new(web3)
.from_block(10.into())
.to_block(BlockNumber::Pending)
.address(vec![address])
.topic0(Topic::This(topic))
.limit(42) .block_page_size(5)
.past_logs_pages()
.boxed();
let next = raw_events.next().immediate();
assert!(
matches!(&next, Some(Ok(logs)) if logs.len() == 1),
"expected page length of 1 but got {:?}",
next,
);
let next = raw_events.next().immediate();
assert!(
matches!(&next, Some(Ok(logs)) if logs.len() == 2),
"expected page length of 2 but got {:?}",
next,
);
let next = raw_events.next().immediate();
assert!(
next.is_none(),
"expected stream to be complete but got {:?}",
next,
);
transport.assert_request("eth_blockNumber", &[]);
transport.assert_request(
"eth_getLogs",
&[json!({
"address": address,
"fromBlock": U64::from(10),
"toBlock": U64::from(14),
"topics": [topic],
})],
);
transport.assert_request(
"eth_getLogs",
&[json!({
"address": address,
"fromBlock": U64::from(15),
"toBlock": U64::from(19),
"topics": [topic],
})],
);
transport.assert_request(
"eth_getLogs",
&[json!({
"address": address,
"fromBlock": U64::from(20),
"toBlock": "pending",
"topics": [topic],
})],
);
transport.assert_no_more_requests();
}
#[test]
fn log_stream_next_log() {
let mut transport = TestTransport::new();
let web3 = Web3::new(transport.clone());
transport.add_response(json!("0xf0"));
transport.add_response(json!([generate_log("awesome")]));
let log = LogFilterBuilder::new(web3)
.stream()
.boxed()
.next()
.wait()
.expect("log stream did not produce any logs")
.expect("failed to get log from log stream");
assert_eq!(log.log_type.as_deref(), Some("awesome"));
transport.assert_request("eth_newFilter", &[json!({})]);
transport.assert_request("eth_getFilterChanges", &[json!("0xf0")]);
transport.assert_no_more_requests();
}
}