pink_web3/api/
eth_filter.rs

1//! `Eth` namespace, filters.
2use crate::helpers::CallFuture;
3use crate::prelude::*;
4use crate::{
5    api::Namespace,
6    error, helpers,
7    types::{Filter, Log, H256},
8    Transport,
9};
10use futures::{stream, Stream, TryStreamExt};
11// use futures_timer::Delay;
12use core::{fmt, marker::PhantomData, time::Duration};
13use serde::de::DeserializeOwned;
14
15fn filter_stream<T: Transport, I: DeserializeOwned>(
16    base: BaseFilter<T, I>,
17    poll_interval: Duration,
18) -> impl Stream<Item = error::Result<I>> {
19    stream::unfold(base, move |base| async move {
20        // Delay::new(poll_interval).await;
21        let items = base.poll().await;
22        let items = items.map(Option::unwrap_or_default);
23        Some((items, base))
24    })
25    // map I to Result<I> even though it is always Ok so that try_flatten works
26    .map_ok(|items| stream::iter(items.into_iter().map(Ok)))
27    .try_flatten()
28    .into_stream()
29}
30
31/// Specifies filter items and constructor method.
32trait FilterInterface {
33    /// Filter item type
34    type Output;
35
36    /// Name of method used to construct the filter
37    fn constructor() -> &'static str;
38}
39
40/// Logs Filter
41#[derive(Debug)]
42struct LogsFilter;
43
44impl FilterInterface for LogsFilter {
45    type Output = Log;
46
47    fn constructor() -> &'static str {
48        "eth_newFilter"
49    }
50}
51
52/// New blocks hashes filter.
53#[derive(Debug)]
54struct BlocksFilter;
55
56impl FilterInterface for BlocksFilter {
57    type Output = H256;
58
59    fn constructor() -> &'static str {
60        "eth_newBlockFilter"
61    }
62}
63
64/// New Pending Transactions Filter
65#[derive(Debug)]
66struct PendingTransactionsFilter;
67
68impl FilterInterface for PendingTransactionsFilter {
69    type Output = H256;
70
71    fn constructor() -> &'static str {
72        "eth_newPendingTransactionFilter"
73    }
74}
75
76/// Base filter handle.
77/// Allows to poll the filter.
78///
79/// Note: because Rust currently doesn't support async drop, the filter has to be uninstalled manually.
80/// See [https://github.com/tomusdrw/rust-web3/issues/583](this tracking issue).
81pub struct BaseFilter<T: Transport, I> {
82    // TODO [ToDr] Workaround for ganache returning 0x03 instead of 0x3
83    id: String,
84    transport: T,
85    item: PhantomData<I>,
86}
87
88impl<T: Transport, I: 'static> fmt::Debug for BaseFilter<T, I> {
89    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
90        fmt.debug_struct("BaseFilter")
91            .field("id", &self.id)
92            .field("item", &std::any::TypeId::of::<I>())
93            .finish()
94    }
95}
96
97impl<T: Transport, I> Clone for BaseFilter<T, I> {
98    fn clone(&self) -> Self {
99        BaseFilter {
100            id: self.id.clone(),
101            transport: self.transport.clone(),
102            item: PhantomData::default(),
103        }
104    }
105}
106
107impl<T: Transport, I> BaseFilter<T, I> {
108    /// Uninstalls the filter
109    pub async fn uninstall(self) -> error::Result<bool>
110    where
111        Self: Sized,
112    {
113        let id = helpers::serialize(&self.id);
114        Ok(CallFuture::new(self.transport.execute("eth_uninstallFilter", vec![id])).await?)
115    }
116
117    /// Borrows the transport.
118    pub fn transport(&self) -> &T {
119        &self.transport
120    }
121}
122
123impl<T: Transport, I: DeserializeOwned> BaseFilter<T, I> {
124    /// Polls this filter for changes.
125    /// Will return logs that happened after previous poll.
126    pub async fn poll(&self) -> error::Result<Option<Vec<I>>> {
127        let id = helpers::serialize(&self.id);
128        Ok(CallFuture::new(self.transport.execute("eth_getFilterChanges", vec![id])).await?)
129    }
130
131    /// Returns the stream of items which automatically polls the server
132    pub fn stream(self, poll_interval: Duration) -> impl Stream<Item = error::Result<I>> {
133        filter_stream(self, poll_interval)
134    }
135}
136
137impl<T: Transport> BaseFilter<T, Log> {
138    /// Returns future with all logs matching given filter
139    pub async fn logs(&self) -> error::Result<Vec<Log>> {
140        let id = helpers::serialize(&self.id);
141        Ok(CallFuture::new(self.transport.execute("eth_getFilterLogs", vec![id])).await?)
142    }
143}
144
145/// Should be used to create new filter future
146async fn create_filter<'a, T: Transport, F: FilterInterface>(
147    transport: T,
148    arg: Vec<crate::Value<'a>>,
149) -> error::Result<BaseFilter<T, F::Output>> {
150    let id = CallFuture::new(transport.execute(F::constructor(), arg)).await?;
151    Ok(BaseFilter {
152        id,
153        transport,
154        item: PhantomData,
155    })
156}
157
158/// `Eth` namespace, filters
159#[derive(Debug, Clone)]
160pub struct EthFilter<T> {
161    transport: T,
162}
163
164impl<T: Transport> Namespace<T> for EthFilter<T> {
165    fn new(transport: T) -> Self
166    where
167        Self: Sized,
168    {
169        EthFilter { transport }
170    }
171
172    fn transport(&self) -> &T {
173        &self.transport
174    }
175}
176
177impl<T: Transport> EthFilter<T> {
178    /// Installs a new logs filter.
179    pub async fn create_logs_filter(self, filter: Filter) -> error::Result<BaseFilter<T, Log>> {
180        let f = helpers::serialize(&filter);
181        create_filter::<_, LogsFilter>(self.transport, vec![f]).await
182    }
183
184    /// Installs a new block filter.
185    pub async fn create_blocks_filter(self) -> error::Result<BaseFilter<T, H256>> {
186        create_filter::<_, BlocksFilter>(self.transport, vec![]).await
187    }
188
189    /// Installs a new pending transactions filter.
190    pub async fn create_pending_transactions_filter(self) -> error::Result<BaseFilter<T, H256>> {
191        create_filter::<_, PendingTransactionsFilter>(self.transport, vec![]).await
192    }
193}
194
195#[cfg(test)]
196mod tests {
197    use super::EthFilter;
198    use crate::{
199        api::Namespace,
200        rpc::Value,
201        transports::test::TestTransport,
202        types::{Address, FilterBuilder, Log, H256},
203    };
204    use futures::stream::StreamExt;
205    use hex_literal::hex;
206    use std::time::Duration;
207
208    #[test]
209    fn logs_filter() {
210        // given
211        let mut transport = TestTransport::default();
212        transport.set_response(Value::String("0x123".into()));
213        {
214            let eth = EthFilter::new(&transport);
215
216            // when
217            let filter = FilterBuilder::default().limit(10).build();
218            let filter = futures::executor::block_on(eth.create_logs_filter(filter)).unwrap();
219            assert_eq!(filter.id, "0x123".to_owned());
220        };
221
222        // then
223        transport.assert_request("eth_newFilter", &[r#"{"limit":10}"#.into()]);
224        transport.assert_no_more_requests();
225    }
226
227    #[test]
228    fn logs_filter_get_logs() {
229        // given
230        let log = Log {
231            address: Address::from_low_u64_be(1),
232            topics: vec![],
233            data: hex!("").into(),
234            block_hash: Some(H256::from_low_u64_be(2)),
235            block_number: Some(1.into()),
236            transaction_hash: Some(H256::from_low_u64_be(3)),
237            transaction_index: Some(0.into()),
238            log_index: Some(0.into()),
239            transaction_log_index: Some(0.into()),
240            log_type: Some("mined".into()),
241            removed: None,
242        };
243
244        let mut transport = TestTransport::default();
245        transport.set_response(Value::String("0x123".into()));
246        transport.add_response(Value::Array(vec![serde_json::to_value(&log).unwrap()]));
247        let result = {
248            let eth = EthFilter::new(&transport);
249
250            // when
251            let filter = FilterBuilder::default()
252                .topics(None, Some(vec![H256::from_low_u64_be(2)]), None, None)
253                .build();
254            let filter = futures::executor::block_on(eth.create_logs_filter(filter)).unwrap();
255            assert_eq!(filter.id, "0x123".to_owned());
256            futures::executor::block_on(filter.logs())
257        };
258
259        // then
260        assert_eq!(result, Ok(vec![log]));
261        transport.assert_request(
262            "eth_newFilter",
263            &[r#"{"topics":[null,"0x0000000000000000000000000000000000000000000000000000000000000002"]}"#.into()],
264        );
265        transport.assert_request("eth_getFilterLogs", &[r#""0x123""#.into()]);
266        transport.assert_no_more_requests();
267    }
268
269    #[test]
270    fn logs_filter_poll() {
271        // given
272        let log = Log {
273            address: Address::from_low_u64_be(1),
274            topics: vec![],
275            data: hex!("").into(),
276            block_hash: Some(H256::from_low_u64_be(2)),
277            block_number: Some(1.into()),
278            transaction_hash: Some(H256::from_low_u64_be(3)),
279            transaction_index: Some(0.into()),
280            log_index: Some(0.into()),
281            transaction_log_index: Some(0.into()),
282            log_type: Some("mined".into()),
283            removed: None,
284        };
285
286        let mut transport = TestTransport::default();
287        transport.set_response(Value::String("0x123".into()));
288        transport.add_response(Value::Array(vec![serde_json::to_value(&log).unwrap()]));
289        let result = {
290            let eth = EthFilter::new(&transport);
291
292            // when
293            let filter = FilterBuilder::default()
294                .address(vec![Address::from_low_u64_be(2)])
295                .build();
296            let filter = futures::executor::block_on(eth.create_logs_filter(filter)).unwrap();
297            assert_eq!(filter.id, "0x123".to_owned());
298            futures::executor::block_on(filter.poll())
299        };
300
301        // then
302        assert_eq!(result, Ok(Some(vec![log])));
303        transport.assert_request(
304            "eth_newFilter",
305            &[r#"{"address":"0x0000000000000000000000000000000000000002"}"#.into()],
306        );
307        transport.assert_request("eth_getFilterChanges", &[r#""0x123""#.into()]);
308        transport.assert_no_more_requests();
309    }
310
311    #[test]
312    fn blocks_filter() {
313        // given
314        let mut transport = TestTransport::default();
315        transport.set_response(Value::String("0x123".into()));
316        {
317            let eth = EthFilter::new(&transport);
318
319            // when
320            let filter = futures::executor::block_on(eth.create_blocks_filter()).unwrap();
321            assert_eq!(filter.id, "0x123".to_owned());
322        };
323
324        // then
325        transport.assert_request("eth_newBlockFilter", &[]);
326        transport.assert_no_more_requests();
327    }
328
329    #[test]
330    fn blocks_filter_poll() {
331        // given
332        let mut transport = TestTransport::default();
333        transport.set_response(Value::String("0x123".into()));
334        transport.add_response(Value::Array(vec![Value::String(
335            r#"0x0000000000000000000000000000000000000000000000000000000000000456"#.into(),
336        )]));
337        let result = {
338            let eth = EthFilter::new(&transport);
339
340            // when
341            let filter = futures::executor::block_on(eth.create_blocks_filter()).unwrap();
342            assert_eq!(filter.id, "0x123".to_owned());
343            futures::executor::block_on(filter.poll())
344        };
345
346        // then
347        assert_eq!(result, Ok(Some(vec![H256::from_low_u64_be(0x456)])));
348        transport.assert_request("eth_newBlockFilter", &[]);
349        transport.assert_request("eth_getFilterChanges", &[r#""0x123""#.into()]);
350        transport.assert_no_more_requests();
351    }
352
353    #[test]
354    fn blocks_filter_stream() {
355        // given
356        let mut transport = TestTransport::default();
357        transport.set_response(Value::String("0x123".into()));
358        transport.add_response(Value::Array(vec![Value::String(
359            r#"0x0000000000000000000000000000000000000000000000000000000000000456"#.into(),
360        )]));
361        transport.add_response(Value::Array(vec![
362            Value::String(r#"0x0000000000000000000000000000000000000000000000000000000000000457"#.into()),
363            Value::String(r#"0x0000000000000000000000000000000000000000000000000000000000000458"#.into()),
364        ]));
365        transport.add_response(Value::Array(vec![Value::String(
366            r#"0x0000000000000000000000000000000000000000000000000000000000000459"#.into(),
367        )]));
368        let result: Vec<_> = {
369            let eth = EthFilter::new(&transport);
370
371            // when
372            let filter = futures::executor::block_on(eth.create_blocks_filter()).unwrap();
373            futures::executor::block_on_stream(filter.stream(Duration::from_secs(0)).boxed_local())
374                .take(4)
375                .collect()
376        };
377
378        // then
379        assert_eq!(
380            result,
381            [0x456, 0x457, 0x458, 0x459]
382                .iter()
383                .copied()
384                .map(H256::from_low_u64_be)
385                .map(Ok)
386                .collect::<Vec<_>>()
387        );
388        transport.assert_request("eth_newBlockFilter", &[]);
389        transport.assert_request("eth_getFilterChanges", &[r#""0x123""#.into()]);
390        transport.assert_request("eth_getFilterChanges", &[r#""0x123""#.into()]);
391        transport.assert_request("eth_getFilterChanges", &[r#""0x123""#.into()]);
392    }
393
394    #[test]
395    fn pending_transactions_filter() {
396        // given
397        let mut transport = TestTransport::default();
398        transport.set_response(Value::String("0x123".into()));
399        {
400            let eth = EthFilter::new(&transport);
401
402            // when
403            let filter = futures::executor::block_on(eth.create_pending_transactions_filter()).unwrap();
404            assert_eq!(filter.id, "0x123".to_owned());
405        };
406
407        // then
408        transport.assert_request("eth_newPendingTransactionFilter", &[]);
409        transport.assert_no_more_requests();
410    }
411
412    #[test]
413    fn create_pending_transactions_filter_poll() {
414        // given
415        let mut transport = TestTransport::default();
416        transport.set_response(Value::String("0x123".into()));
417        transport.add_response(Value::Array(vec![Value::String(
418            r#"0x0000000000000000000000000000000000000000000000000000000000000456"#.into(),
419        )]));
420        let result = {
421            let eth = EthFilter::new(&transport);
422
423            // when
424            let filter = futures::executor::block_on(eth.create_pending_transactions_filter()).unwrap();
425            assert_eq!(filter.id, "0x123".to_owned());
426            futures::executor::block_on(filter.poll())
427        };
428
429        // then
430        assert_eq!(result, Ok(Some(vec![H256::from_low_u64_be(0x456)])));
431        transport.assert_request("eth_newPendingTransactionFilter", &[]);
432        transport.assert_request("eth_getFilterChanges", &[r#""0x123""#.into()]);
433        transport.assert_no_more_requests();
434    }
435}