mwc_web3/api/
eth_filter.rs

1//! `Eth` namespace, filters.
2
3use crate::{
4    api::Namespace,
5    error, helpers, rpc,
6    types::{Filter, Log, H256},
7    Transport,
8};
9use futures::{stream, Stream, TryStreamExt};
10use futures_timer::Delay;
11use serde::de::DeserializeOwned;
12use std::{fmt, marker::PhantomData, time::Duration, vec};
13
14fn filter_stream<T: Transport, I: DeserializeOwned>(
15    base: BaseFilter<T, I>,
16    poll_interval: Duration,
17) -> impl Stream<Item = error::Result<I>> {
18    let id = helpers::serialize(&base.id);
19    stream::unfold((base, id), move |state| async move {
20        let (base, id) = state;
21        Delay::new(poll_interval).await;
22        let response = base.transport.execute("eth_getFilterChanges", vec![id.clone()]).await;
23        let items: error::Result<Option<Vec<I>>> = response.and_then(helpers::decode);
24        let items = items.map(Option::unwrap_or_default);
25        Some((items, (base, id)))
26    })
27    // map I to Result<I> even though it is always Ok so that try_flatten works
28    .map_ok(|items| stream::iter(items.into_iter().map(Ok)))
29    .try_flatten()
30    .into_stream()
31}
32
/// Describes one kind of filter: what it yields and how it is installed.
trait FilterInterface {
    /// Type of a single item produced by the filter.
    type Output;

    /// RPC method name that installs a filter of this kind.
    fn constructor() -> &'static str;
}
41
42/// Logs Filter
43#[derive(Debug)]
44struct LogsFilter;
45
46impl FilterInterface for LogsFilter {
47    type Output = Log;
48
49    fn constructor() -> &'static str {
50        "eth_newFilter"
51    }
52}
53
54/// New blocks hashes filter.
55#[derive(Debug)]
56struct BlocksFilter;
57
58impl FilterInterface for BlocksFilter {
59    type Output = H256;
60
61    fn constructor() -> &'static str {
62        "eth_newBlockFilter"
63    }
64}
65
66/// New Pending Transactions Filter
67#[derive(Debug)]
68struct PendingTransactionsFilter;
69
70impl FilterInterface for PendingTransactionsFilter {
71    type Output = H256;
72
73    fn constructor() -> &'static str {
74        "eth_newPendingTransactionFilter"
75    }
76}
77
78/// Base filter handle.
79/// Uninstall filter on drop.
80/// Allows to poll the filter.
81pub struct BaseFilter<T: Transport, I> {
82    // TODO [ToDr] Workaround for ganache returning 0x03 instead of 0x3
83    id: String,
84    transport: T,
85    item: PhantomData<I>,
86}
87
88impl<T: Transport, I: 'static> fmt::Debug for BaseFilter<T, I> {
89    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
90        fmt.debug_struct("BaseFilter")
91            .field("id", &self.id)
92            .field("transport", &self.transport)
93            .field("item", &std::any::TypeId::of::<I>())
94            .finish()
95    }
96}
97
98impl<T: Transport, I> Clone for BaseFilter<T, I> {
99    fn clone(&self) -> Self {
100        BaseFilter {
101            id: self.id.clone(),
102            transport: self.transport.clone(),
103            item: PhantomData::default(),
104        }
105    }
106}
107
108impl<T: Transport, I> BaseFilter<T, I> {
109    /// Uninstalls the filter
110    pub async fn uninstall(self) -> error::Result<bool>
111    where
112        Self: Sized,
113    {
114        let id = helpers::serialize(&self.id);
115        let response = self.transport.execute("eth_uninstallFilter", vec![id]).await?;
116        helpers::decode(response)
117    }
118
119    /// Borrows the transport.
120    pub fn transport(&self) -> &T {
121        &self.transport
122    }
123}
124
125impl<T: Transport, I: DeserializeOwned> BaseFilter<T, I> {
126    /// Polls this filter for changes.
127    /// Will return logs that happened after previous poll.
128    pub async fn poll(&self) -> error::Result<Option<Vec<I>>> {
129        let id = helpers::serialize(&self.id);
130        let response = self.transport.execute("eth_getFilterChanges", vec![id]).await?;
131        helpers::decode(response)
132    }
133
134    /// Returns the stream of items which automatically polls the server
135    pub fn stream(self, poll_interval: Duration) -> impl Stream<Item = error::Result<I>> {
136        filter_stream(self, poll_interval)
137    }
138}
139
140impl<T: Transport> BaseFilter<T, Log> {
141    /// Returns future with all logs matching given filter
142    pub async fn logs(&self) -> error::Result<Vec<Log>> {
143        let id = helpers::serialize(&self.id);
144        let response = self.transport.execute("eth_getFilterLogs", vec![id]).await?;
145        helpers::decode(response)
146    }
147}
148
149/// Should be used to create new filter future
150async fn create_filter<T: Transport, F: FilterInterface>(
151    transport: T,
152    arg: Vec<rpc::Value>,
153) -> error::Result<BaseFilter<T, F::Output>> {
154    let response = transport.execute(F::constructor(), arg).await?;
155    let id = helpers::decode(response)?;
156    Ok(BaseFilter {
157        id,
158        transport,
159        item: PhantomData,
160    })
161}
162
/// Entry point for the filter-related part of the `Eth` namespace.
///
/// Wraps a transport and offers constructors for the individual filter kinds.
#[derive(Debug, Clone)]
pub struct EthFilter<T> {
    transport: T,
}
168
169impl<T: Transport> Namespace<T> for EthFilter<T> {
170    fn new(transport: T) -> Self
171    where
172        Self: Sized,
173    {
174        EthFilter { transport }
175    }
176
177    fn transport(&self) -> &T {
178        &self.transport
179    }
180}
181
182impl<T: Transport> EthFilter<T> {
183    /// Installs a new logs filter.
184    pub async fn create_logs_filter(self, filter: Filter) -> error::Result<BaseFilter<T, Log>> {
185        let f = helpers::serialize(&filter);
186        create_filter::<_, LogsFilter>(self.transport, vec![f]).await
187    }
188
189    /// Installs a new block filter.
190    pub async fn create_blocks_filter(self) -> error::Result<BaseFilter<T, H256>> {
191        create_filter::<_, BlocksFilter>(self.transport, vec![]).await
192    }
193
194    /// Installs a new pending transactions filter.
195    pub async fn create_pending_transactions_filter(self) -> error::Result<BaseFilter<T, H256>> {
196        create_filter::<_, PendingTransactionsFilter>(self.transport, vec![]).await
197    }
198}
199
#[cfg(test)]
mod tests {
    use super::EthFilter;
    use crate::{
        api::Namespace,
        rpc::Value,
        transports::test::TestTransport,
        types::{Address, FilterBuilder, Log, H256},
    };
    use futures::{
        executor::{block_on, block_on_stream},
        stream::StreamExt,
    };
    use hex_literal::hex;
    use std::time::Duration;

    /// Filter id the mocked node returns in every test.
    const FILTER_ID: &str = "0x123";
    /// `FILTER_ID` in the JSON-encoded form expected in follow-up request parameters.
    const FILTER_ID_PARAM: &str = r#""0x123""#;

    /// Log fixture shared by the log-filter tests.
    fn sample_log() -> Log {
        Log {
            address: Address::from_low_u64_be(1),
            topics: vec![],
            data: hex!("").into(),
            block_hash: Some(H256::from_low_u64_be(2)),
            block_number: Some(1.into()),
            transaction_hash: Some(H256::from_low_u64_be(3)),
            transaction_index: Some(0.into()),
            log_index: Some(0.into()),
            transaction_log_index: Some(0.into()),
            log_type: Some("mined".into()),
            removed: None,
        }
    }

    /// Builds the JSON array of hash strings used as a mocked filter-changes reply.
    fn hash_array(hashes: &[&str]) -> Value {
        Value::Array(hashes.iter().map(|hash| Value::String((*hash).to_owned())).collect())
    }

    #[test]
    fn logs_filter() {
        // given
        let mut transport = TestTransport::default();
        transport.set_response(Value::String(FILTER_ID.into()));

        // when
        let spec = FilterBuilder::default().limit(10).build();
        let filter = block_on(EthFilter::new(&transport).create_logs_filter(spec)).unwrap();
        assert_eq!(filter.id, FILTER_ID);

        // then
        transport.assert_request("eth_newFilter", &[r#"{"limit":10}"#.into()]);
        transport.assert_no_more_requests();
    }

    #[test]
    fn logs_filter_get_logs() {
        // given
        let log = sample_log();
        let mut transport = TestTransport::default();
        transport.set_response(Value::String(FILTER_ID.into()));
        transport.add_response(Value::Array(vec![serde_json::to_value(&log).unwrap()]));

        // when
        let spec = FilterBuilder::default()
            .topics(None, Some(vec![H256::from_low_u64_be(2)]), None, None)
            .build();
        let filter = block_on(EthFilter::new(&transport).create_logs_filter(spec)).unwrap();
        assert_eq!(filter.id, FILTER_ID);
        let result = block_on(filter.logs());

        // then
        assert_eq!(result, Ok(vec![log]));
        transport.assert_request(
            "eth_newFilter",
            &[r#"{"topics":[null,"0x0000000000000000000000000000000000000000000000000000000000000002"]}"#.into()],
        );
        transport.assert_request("eth_getFilterLogs", &[FILTER_ID_PARAM.into()]);
        transport.assert_no_more_requests();
    }

    #[test]
    fn logs_filter_poll() {
        // given
        let log = sample_log();
        let mut transport = TestTransport::default();
        transport.set_response(Value::String(FILTER_ID.into()));
        transport.add_response(Value::Array(vec![serde_json::to_value(&log).unwrap()]));

        // when
        let spec = FilterBuilder::default()
            .address(vec![Address::from_low_u64_be(2)])
            .build();
        let filter = block_on(EthFilter::new(&transport).create_logs_filter(spec)).unwrap();
        assert_eq!(filter.id, FILTER_ID);
        let result = block_on(filter.poll());

        // then
        assert_eq!(result, Ok(Some(vec![log])));
        transport.assert_request(
            "eth_newFilter",
            &[r#"{"address":"0x0000000000000000000000000000000000000002"}"#.into()],
        );
        transport.assert_request("eth_getFilterChanges", &[FILTER_ID_PARAM.into()]);
        transport.assert_no_more_requests();
    }

    #[test]
    fn blocks_filter() {
        // given
        let mut transport = TestTransport::default();
        transport.set_response(Value::String(FILTER_ID.into()));

        // when
        let filter = block_on(EthFilter::new(&transport).create_blocks_filter()).unwrap();
        assert_eq!(filter.id, FILTER_ID);

        // then
        transport.assert_request("eth_newBlockFilter", &[]);
        transport.assert_no_more_requests();
    }

    #[test]
    fn blocks_filter_poll() {
        // given
        let mut transport = TestTransport::default();
        transport.set_response(Value::String(FILTER_ID.into()));
        transport.add_response(hash_array(&[
            "0x0000000000000000000000000000000000000000000000000000000000000456",
        ]));

        // when
        let filter = block_on(EthFilter::new(&transport).create_blocks_filter()).unwrap();
        assert_eq!(filter.id, FILTER_ID);
        let result = block_on(filter.poll());

        // then
        assert_eq!(result, Ok(Some(vec![H256::from_low_u64_be(0x456)])));
        transport.assert_request("eth_newBlockFilter", &[]);
        transport.assert_request("eth_getFilterChanges", &[FILTER_ID_PARAM.into()]);
        transport.assert_no_more_requests();
    }

    #[test]
    fn blocks_filter_stream() {
        // given: three poll replies carrying four hashes in total
        let mut transport = TestTransport::default();
        transport.set_response(Value::String(FILTER_ID.into()));
        transport.add_response(hash_array(&[
            "0x0000000000000000000000000000000000000000000000000000000000000456",
        ]));
        transport.add_response(hash_array(&[
            "0x0000000000000000000000000000000000000000000000000000000000000457",
            "0x0000000000000000000000000000000000000000000000000000000000000458",
        ]));
        transport.add_response(hash_array(&[
            "0x0000000000000000000000000000000000000000000000000000000000000459",
        ]));

        // when
        let result: Vec<_> = {
            let filter = block_on(EthFilter::new(&transport).create_blocks_filter()).unwrap();
            let items = filter.stream(Duration::from_secs(0)).boxed_local();
            block_on_stream(items).take(4).collect()
        };

        // then
        let expected: Vec<crate::error::Result<H256>> = (0x456..=0x459)
            .map(|value| Ok(H256::from_low_u64_be(value)))
            .collect();
        assert_eq!(result, expected);
        transport.assert_request("eth_newBlockFilter", &[]);
        transport.assert_request("eth_getFilterChanges", &[FILTER_ID_PARAM.into()]);
        transport.assert_request("eth_getFilterChanges", &[FILTER_ID_PARAM.into()]);
        transport.assert_request("eth_getFilterChanges", &[FILTER_ID_PARAM.into()]);
    }

    #[test]
    fn pending_transactions_filter() {
        // given
        let mut transport = TestTransport::default();
        transport.set_response(Value::String(FILTER_ID.into()));

        // when
        let filter = block_on(EthFilter::new(&transport).create_pending_transactions_filter()).unwrap();
        assert_eq!(filter.id, FILTER_ID);

        // then
        transport.assert_request("eth_newPendingTransactionFilter", &[]);
        transport.assert_no_more_requests();
    }

    #[test]
    fn create_pending_transactions_filter_poll() {
        // given
        let mut transport = TestTransport::default();
        transport.set_response(Value::String(FILTER_ID.into()));
        transport.add_response(hash_array(&[
            "0x0000000000000000000000000000000000000000000000000000000000000456",
        ]));

        // when
        let filter = block_on(EthFilter::new(&transport).create_pending_transactions_filter()).unwrap();
        assert_eq!(filter.id, FILTER_ID);
        let result = block_on(filter.poll());

        // then
        assert_eq!(result, Ok(Some(vec![H256::from_low_u64_be(0x456)])));
        transport.assert_request("eth_newPendingTransactionFilter", &[]);
        transport.assert_request("eth_getFilterChanges", &[FILTER_ID_PARAM.into()]);
        transport.assert_no_more_requests();
    }
}