web3/api/
eth_filter.rs

1//! `Eth` namespace, filters.
2
3use crate::{
4    api::Namespace,
5    error, helpers, rpc,
6    types::{Filter, Log, H256},
7    Transport,
8};
9use futures::{stream, Stream, TryStreamExt};
10use futures_timer::Delay;
11use serde::de::DeserializeOwned;
12use std::{fmt, marker::PhantomData, time::Duration, vec};
13
14fn filter_stream<T: Transport, I: DeserializeOwned>(
15    base: BaseFilter<T, I>,
16    poll_interval: Duration,
17) -> impl Stream<Item = error::Result<I>> {
18    let id = helpers::serialize(&base.id);
19    stream::unfold((base, id), move |state| async move {
20        let (base, id) = state;
21        Delay::new(poll_interval).await;
22        let response = base.transport.execute("eth_getFilterChanges", vec![id.clone()]).await;
23        let items: error::Result<Option<Vec<I>>> = response.and_then(helpers::decode);
24        let items = items.map(Option::unwrap_or_default);
25        Some((items, (base, id)))
26    })
27    // map I to Result<I> even though it is always Ok so that try_flatten works
28    .map_ok(|items| stream::iter(items.into_iter().map(Ok)))
29    .try_flatten()
30    .into_stream()
31}
32
33/// Specifies filter items and constructor method.
34trait FilterInterface {
35    /// Filter item type
36    type Output;
37
38    /// Name of method used to construct the filter
39    fn constructor() -> &'static str;
40}
41
42/// Logs Filter
43#[derive(Debug)]
44struct LogsFilter;
45
46impl FilterInterface for LogsFilter {
47    type Output = Log;
48
49    fn constructor() -> &'static str {
50        "eth_newFilter"
51    }
52}
53
54/// New blocks hashes filter.
55#[derive(Debug)]
56struct BlocksFilter;
57
58impl FilterInterface for BlocksFilter {
59    type Output = H256;
60
61    fn constructor() -> &'static str {
62        "eth_newBlockFilter"
63    }
64}
65
66/// New Pending Transactions Filter
67#[derive(Debug)]
68struct PendingTransactionsFilter;
69
70impl FilterInterface for PendingTransactionsFilter {
71    type Output = H256;
72
73    fn constructor() -> &'static str {
74        "eth_newPendingTransactionFilter"
75    }
76}
77
78/// Base filter handle.
79/// Allows to poll the filter.
80///
81/// Note: because Rust currently doesn't support async drop, the filter has to be uninstalled manually.
82/// See [https://github.com/tomusdrw/rust-web3/issues/583](this tracking issue).
83pub struct BaseFilter<T: Transport, I> {
84    // TODO [ToDr] Workaround for ganache returning 0x03 instead of 0x3
85    id: String,
86    transport: T,
87    item: PhantomData<I>,
88}
89
90impl<T: Transport, I: 'static> fmt::Debug for BaseFilter<T, I> {
91    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
92        fmt.debug_struct("BaseFilter")
93            .field("id", &self.id)
94            .field("transport", &self.transport)
95            .field("item", &std::any::TypeId::of::<I>())
96            .finish()
97    }
98}
99
100impl<T: Transport, I> Clone for BaseFilter<T, I> {
101    fn clone(&self) -> Self {
102        BaseFilter {
103            id: self.id.clone(),
104            transport: self.transport.clone(),
105            item: PhantomData::default(),
106        }
107    }
108}
109
110impl<T: Transport, I> BaseFilter<T, I> {
111    /// Uninstalls the filter
112    pub async fn uninstall(self) -> error::Result<bool>
113    where
114        Self: Sized,
115    {
116        let id = helpers::serialize(&self.id);
117        let response = self.transport.execute("eth_uninstallFilter", vec![id]).await?;
118        helpers::decode(response)
119    }
120
121    /// Borrows the transport.
122    pub fn transport(&self) -> &T {
123        &self.transport
124    }
125}
126
127impl<T: Transport, I: DeserializeOwned> BaseFilter<T, I> {
128    /// Polls this filter for changes.
129    /// Will return logs that happened after previous poll.
130    pub async fn poll(&self) -> error::Result<Option<Vec<I>>> {
131        let id = helpers::serialize(&self.id);
132        let response = self.transport.execute("eth_getFilterChanges", vec![id]).await?;
133        helpers::decode(response)
134    }
135
136    /// Returns the stream of items which automatically polls the server
137    pub fn stream(self, poll_interval: Duration) -> impl Stream<Item = error::Result<I>> {
138        filter_stream(self, poll_interval)
139    }
140}
141
142impl<T: Transport> BaseFilter<T, Log> {
143    /// Returns future with all logs matching given filter
144    pub async fn logs(&self) -> error::Result<Vec<Log>> {
145        let id = helpers::serialize(&self.id);
146        let response = self.transport.execute("eth_getFilterLogs", vec![id]).await?;
147        helpers::decode(response)
148    }
149}
150
151/// Should be used to create new filter future
152async fn create_filter<T: Transport, F: FilterInterface>(
153    transport: T,
154    arg: Vec<rpc::Value>,
155) -> error::Result<BaseFilter<T, F::Output>> {
156    let response = transport.execute(F::constructor(), arg).await?;
157    let id = helpers::decode(response)?;
158    Ok(BaseFilter {
159        id,
160        transport,
161        item: PhantomData,
162    })
163}
164
165/// `Eth` namespace, filters
166#[derive(Debug, Clone)]
167pub struct EthFilter<T> {
168    transport: T,
169}
170
171impl<T: Transport> Namespace<T> for EthFilter<T> {
172    fn new(transport: T) -> Self
173    where
174        Self: Sized,
175    {
176        EthFilter { transport }
177    }
178
179    fn transport(&self) -> &T {
180        &self.transport
181    }
182}
183
184impl<T: Transport> EthFilter<T> {
185    /// Installs a new logs filter.
186    pub async fn create_logs_filter(self, filter: Filter) -> error::Result<BaseFilter<T, Log>> {
187        let f = helpers::serialize(&filter);
188        create_filter::<_, LogsFilter>(self.transport, vec![f]).await
189    }
190
191    /// Installs a new block filter.
192    pub async fn create_blocks_filter(self) -> error::Result<BaseFilter<T, H256>> {
193        create_filter::<_, BlocksFilter>(self.transport, vec![]).await
194    }
195
196    /// Installs a new pending transactions filter.
197    pub async fn create_pending_transactions_filter(self) -> error::Result<BaseFilter<T, H256>> {
198        create_filter::<_, PendingTransactionsFilter>(self.transport, vec![]).await
199    }
200}
201
202#[cfg(test)]
203mod tests {
204    use super::EthFilter;
205    use crate::{
206        api::Namespace,
207        rpc::Value,
208        transports::test::TestTransport,
209        types::{Address, FilterBuilder, Log, H256},
210    };
211    use futures::stream::StreamExt;
212    use hex_literal::hex;
213    use std::time::Duration;
214
215    #[test]
216    fn logs_filter() {
217        // given
218        let mut transport = TestTransport::default();
219        transport.set_response(Value::String("0x123".into()));
220        {
221            let eth = EthFilter::new(&transport);
222
223            // when
224            let filter = FilterBuilder::default().limit(10).build();
225            let filter = futures::executor::block_on(eth.create_logs_filter(filter)).unwrap();
226            assert_eq!(filter.id, "0x123".to_owned());
227        };
228
229        // then
230        transport.assert_request("eth_newFilter", &[r#"{"limit":10}"#.into()]);
231        transport.assert_no_more_requests();
232    }
233
234    #[test]
235    fn logs_filter_get_logs() {
236        // given
237        let log = Log {
238            address: Address::from_low_u64_be(1),
239            topics: vec![],
240            data: hex!("").into(),
241            block_hash: Some(H256::from_low_u64_be(2)),
242            block_number: Some(1.into()),
243            transaction_hash: Some(H256::from_low_u64_be(3)),
244            transaction_index: Some(0.into()),
245            log_index: Some(0.into()),
246            transaction_log_index: Some(0.into()),
247            log_type: Some("mined".into()),
248            removed: None,
249        };
250
251        let mut transport = TestTransport::default();
252        transport.set_response(Value::String("0x123".into()));
253        transport.add_response(Value::Array(vec![serde_json::to_value(&log).unwrap()]));
254        let result = {
255            let eth = EthFilter::new(&transport);
256
257            // when
258            let filter = FilterBuilder::default()
259                .topics(None, Some(vec![H256::from_low_u64_be(2)]), None, None)
260                .build();
261            let filter = futures::executor::block_on(eth.create_logs_filter(filter)).unwrap();
262            assert_eq!(filter.id, "0x123".to_owned());
263            futures::executor::block_on(filter.logs())
264        };
265
266        // then
267        assert_eq!(result, Ok(vec![log]));
268        transport.assert_request(
269            "eth_newFilter",
270            &[r#"{"topics":[null,"0x0000000000000000000000000000000000000000000000000000000000000002"]}"#.into()],
271        );
272        transport.assert_request("eth_getFilterLogs", &[r#""0x123""#.into()]);
273        transport.assert_no_more_requests();
274    }
275
276    #[test]
277    fn logs_filter_poll() {
278        // given
279        let log = Log {
280            address: Address::from_low_u64_be(1),
281            topics: vec![],
282            data: hex!("").into(),
283            block_hash: Some(H256::from_low_u64_be(2)),
284            block_number: Some(1.into()),
285            transaction_hash: Some(H256::from_low_u64_be(3)),
286            transaction_index: Some(0.into()),
287            log_index: Some(0.into()),
288            transaction_log_index: Some(0.into()),
289            log_type: Some("mined".into()),
290            removed: None,
291        };
292
293        let mut transport = TestTransport::default();
294        transport.set_response(Value::String("0x123".into()));
295        transport.add_response(Value::Array(vec![serde_json::to_value(&log).unwrap()]));
296        let result = {
297            let eth = EthFilter::new(&transport);
298
299            // when
300            let filter = FilterBuilder::default()
301                .address(vec![Address::from_low_u64_be(2)])
302                .build();
303            let filter = futures::executor::block_on(eth.create_logs_filter(filter)).unwrap();
304            assert_eq!(filter.id, "0x123".to_owned());
305            futures::executor::block_on(filter.poll())
306        };
307
308        // then
309        assert_eq!(result, Ok(Some(vec![log])));
310        transport.assert_request(
311            "eth_newFilter",
312            &[r#"{"address":"0x0000000000000000000000000000000000000002"}"#.into()],
313        );
314        transport.assert_request("eth_getFilterChanges", &[r#""0x123""#.into()]);
315        transport.assert_no_more_requests();
316    }
317
318    #[test]
319    fn blocks_filter() {
320        // given
321        let mut transport = TestTransport::default();
322        transport.set_response(Value::String("0x123".into()));
323        {
324            let eth = EthFilter::new(&transport);
325
326            // when
327            let filter = futures::executor::block_on(eth.create_blocks_filter()).unwrap();
328            assert_eq!(filter.id, "0x123".to_owned());
329        };
330
331        // then
332        transport.assert_request("eth_newBlockFilter", &[]);
333        transport.assert_no_more_requests();
334    }
335
336    #[test]
337    fn blocks_filter_poll() {
338        // given
339        let mut transport = TestTransport::default();
340        transport.set_response(Value::String("0x123".into()));
341        transport.add_response(Value::Array(vec![Value::String(
342            r#"0x0000000000000000000000000000000000000000000000000000000000000456"#.into(),
343        )]));
344        let result = {
345            let eth = EthFilter::new(&transport);
346
347            // when
348            let filter = futures::executor::block_on(eth.create_blocks_filter()).unwrap();
349            assert_eq!(filter.id, "0x123".to_owned());
350            futures::executor::block_on(filter.poll())
351        };
352
353        // then
354        assert_eq!(result, Ok(Some(vec![H256::from_low_u64_be(0x456)])));
355        transport.assert_request("eth_newBlockFilter", &[]);
356        transport.assert_request("eth_getFilterChanges", &[r#""0x123""#.into()]);
357        transport.assert_no_more_requests();
358    }
359
360    #[test]
361    fn blocks_filter_stream() {
362        // given
363        let mut transport = TestTransport::default();
364        transport.set_response(Value::String("0x123".into()));
365        transport.add_response(Value::Array(vec![Value::String(
366            r#"0x0000000000000000000000000000000000000000000000000000000000000456"#.into(),
367        )]));
368        transport.add_response(Value::Array(vec![
369            Value::String(r#"0x0000000000000000000000000000000000000000000000000000000000000457"#.into()),
370            Value::String(r#"0x0000000000000000000000000000000000000000000000000000000000000458"#.into()),
371        ]));
372        transport.add_response(Value::Array(vec![Value::String(
373            r#"0x0000000000000000000000000000000000000000000000000000000000000459"#.into(),
374        )]));
375        let result: Vec<_> = {
376            let eth = EthFilter::new(&transport);
377
378            // when
379            let filter = futures::executor::block_on(eth.create_blocks_filter()).unwrap();
380            futures::executor::block_on_stream(filter.stream(Duration::from_secs(0)).boxed_local())
381                .take(4)
382                .collect()
383        };
384
385        // then
386        assert_eq!(
387            result,
388            [0x456, 0x457, 0x458, 0x459]
389                .iter()
390                .copied()
391                .map(H256::from_low_u64_be)
392                .map(Ok)
393                .collect::<Vec<_>>()
394        );
395        transport.assert_request("eth_newBlockFilter", &[]);
396        transport.assert_request("eth_getFilterChanges", &[r#""0x123""#.into()]);
397        transport.assert_request("eth_getFilterChanges", &[r#""0x123""#.into()]);
398        transport.assert_request("eth_getFilterChanges", &[r#""0x123""#.into()]);
399    }
400
401    #[test]
402    fn pending_transactions_filter() {
403        // given
404        let mut transport = TestTransport::default();
405        transport.set_response(Value::String("0x123".into()));
406        {
407            let eth = EthFilter::new(&transport);
408
409            // when
410            let filter = futures::executor::block_on(eth.create_pending_transactions_filter()).unwrap();
411            assert_eq!(filter.id, "0x123".to_owned());
412        };
413
414        // then
415        transport.assert_request("eth_newPendingTransactionFilter", &[]);
416        transport.assert_no_more_requests();
417    }
418
419    #[test]
420    fn create_pending_transactions_filter_poll() {
421        // given
422        let mut transport = TestTransport::default();
423        transport.set_response(Value::String("0x123".into()));
424        transport.add_response(Value::Array(vec![Value::String(
425            r#"0x0000000000000000000000000000000000000000000000000000000000000456"#.into(),
426        )]));
427        let result = {
428            let eth = EthFilter::new(&transport);
429
430            // when
431            let filter = futures::executor::block_on(eth.create_pending_transactions_filter()).unwrap();
432            assert_eq!(filter.id, "0x123".to_owned());
433            futures::executor::block_on(filter.poll())
434        };
435
436        // then
437        assert_eq!(result, Ok(Some(vec![H256::from_low_u64_be(0x456)])));
438        transport.assert_request("eth_newPendingTransactionFilter", &[]);
439        transport.assert_request("eth_getFilterChanges", &[r#""0x123""#.into()]);
440        transport.assert_no_more_requests();
441    }
442}