1use crate::helpers::CallFuture;
3use crate::prelude::*;
4use crate::{
5 api::Namespace,
6 error, helpers,
7 types::{Filter, Log, H256},
8 Transport,
9};
10use futures::{stream, Stream, TryStreamExt};
11use core::{fmt, marker::PhantomData, time::Duration};
13use serde::de::DeserializeOwned;
14
15fn filter_stream<T: Transport, I: DeserializeOwned>(
16 base: BaseFilter<T, I>,
17 poll_interval: Duration,
18) -> impl Stream<Item = error::Result<I>> {
19 stream::unfold(base, move |base| async move {
20 let items = base.poll().await;
22 let items = items.map(Option::unwrap_or_default);
23 Some((items, base))
24 })
25 .map_ok(|items| stream::iter(items.into_iter().map(Ok)))
27 .try_flatten()
28 .into_stream()
29}
30
/// Compile-time description of one kind of filter.
///
/// `create_filter` is generic over this trait: it calls the RPC method named
/// by [`FilterInterface::constructor`] and types the resulting `BaseFilter`
/// with [`FilterInterface::Output`].
trait FilterInterface {
    /// Type of the items a filter of this kind yields.
    type Output;

    /// Name of the RPC method used to create a filter of this kind.
    fn constructor() -> &'static str;
}
39
40#[derive(Debug)]
42struct LogsFilter;
43
44impl FilterInterface for LogsFilter {
45 type Output = Log;
46
47 fn constructor() -> &'static str {
48 "eth_newFilter"
49 }
50}
51
52#[derive(Debug)]
54struct BlocksFilter;
55
56impl FilterInterface for BlocksFilter {
57 type Output = H256;
58
59 fn constructor() -> &'static str {
60 "eth_newBlockFilter"
61 }
62}
63
64#[derive(Debug)]
66struct PendingTransactionsFilter;
67
68impl FilterInterface for PendingTransactionsFilter {
69 type Output = H256;
70
71 fn constructor() -> &'static str {
72 "eth_newPendingTransactionFilter"
73 }
74}
75
76pub struct BaseFilter<T: Transport, I> {
82 id: String,
84 transport: T,
85 item: PhantomData<I>,
86}
87
88impl<T: Transport, I: 'static> fmt::Debug for BaseFilter<T, I> {
89 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
90 fmt.debug_struct("BaseFilter")
91 .field("id", &self.id)
92 .field("item", &std::any::TypeId::of::<I>())
93 .finish()
94 }
95}
96
97impl<T: Transport, I> Clone for BaseFilter<T, I> {
98 fn clone(&self) -> Self {
99 BaseFilter {
100 id: self.id.clone(),
101 transport: self.transport.clone(),
102 item: PhantomData::default(),
103 }
104 }
105}
106
107impl<T: Transport, I> BaseFilter<T, I> {
108 pub async fn uninstall(self) -> error::Result<bool>
110 where
111 Self: Sized,
112 {
113 let id = helpers::serialize(&self.id);
114 Ok(CallFuture::new(self.transport.execute("eth_uninstallFilter", vec![id])).await?)
115 }
116
117 pub fn transport(&self) -> &T {
119 &self.transport
120 }
121}
122
123impl<T: Transport, I: DeserializeOwned> BaseFilter<T, I> {
124 pub async fn poll(&self) -> error::Result<Option<Vec<I>>> {
127 let id = helpers::serialize(&self.id);
128 Ok(CallFuture::new(self.transport.execute("eth_getFilterChanges", vec![id])).await?)
129 }
130
131 pub fn stream(self, poll_interval: Duration) -> impl Stream<Item = error::Result<I>> {
133 filter_stream(self, poll_interval)
134 }
135}
136
137impl<T: Transport> BaseFilter<T, Log> {
138 pub async fn logs(&self) -> error::Result<Vec<Log>> {
140 let id = helpers::serialize(&self.id);
141 Ok(CallFuture::new(self.transport.execute("eth_getFilterLogs", vec![id])).await?)
142 }
143}
144
145async fn create_filter<'a, T: Transport, F: FilterInterface>(
147 transport: T,
148 arg: Vec<crate::Value<'a>>,
149) -> error::Result<BaseFilter<T, F::Output>> {
150 let id = CallFuture::new(transport.execute(F::constructor(), arg)).await?;
151 Ok(BaseFilter {
152 id,
153 transport,
154 item: PhantomData,
155 })
156}
157
/// Namespace for creating filters over a transport.
#[derive(Debug, Clone)]
pub struct EthFilter<T> {
    /// Transport handed to every filter created from this namespace.
    transport: T,
}
163
164impl<T: Transport> Namespace<T> for EthFilter<T> {
165 fn new(transport: T) -> Self
166 where
167 Self: Sized,
168 {
169 EthFilter { transport }
170 }
171
172 fn transport(&self) -> &T {
173 &self.transport
174 }
175}
176
177impl<T: Transport> EthFilter<T> {
178 pub async fn create_logs_filter(self, filter: Filter) -> error::Result<BaseFilter<T, Log>> {
180 let f = helpers::serialize(&filter);
181 create_filter::<_, LogsFilter>(self.transport, vec![f]).await
182 }
183
184 pub async fn create_blocks_filter(self) -> error::Result<BaseFilter<T, H256>> {
186 create_filter::<_, BlocksFilter>(self.transport, vec![]).await
187 }
188
189 pub async fn create_pending_transactions_filter(self) -> error::Result<BaseFilter<T, H256>> {
191 create_filter::<_, PendingTransactionsFilter>(self.transport, vec![]).await
192 }
193}
194
#[cfg(test)]
mod tests {
    use super::EthFilter;
    use crate::{
        api::Namespace,
        rpc::Value,
        transports::test::TestTransport,
        types::{Address, FilterBuilder, Log, H256},
    };
    use futures::executor::{block_on, block_on_stream};
    use futures::stream::StreamExt;
    use hex_literal::hex;
    use std::time::Duration;

    /// Filter id returned by the test transport for every filter creation.
    const FILTER_ID: &str = "0x123";
    /// `FILTER_ID` in the serialized form expected in request parameters.
    const FILTER_ID_PARAM: &str = r#""0x123""#;

    /// Log entry shared by the log-filter tests.
    fn sample_log() -> Log {
        Log {
            address: Address::from_low_u64_be(1),
            topics: vec![],
            data: hex!("").into(),
            block_hash: Some(H256::from_low_u64_be(2)),
            block_number: Some(1.into()),
            transaction_hash: Some(H256::from_low_u64_be(3)),
            transaction_index: Some(0.into()),
            log_index: Some(0.into()),
            transaction_log_index: Some(0.into()),
            log_type: Some("mined".into()),
            removed: None,
        }
    }

    /// Builds an array response out of the given hash strings.
    fn hashes(items: &[&str]) -> Value {
        Value::Array(items.iter().map(|hash| Value::String((*hash).into())).collect())
    }

    #[test]
    fn logs_filter() {
        // given
        let mut transport = TestTransport::default();
        transport.set_response(Value::String(FILTER_ID.into()));

        // when: the inner scope ends the borrow of `transport` before the
        // request assertions below.
        {
            let eth = EthFilter::new(&transport);
            let spec = FilterBuilder::default().limit(10).build();
            let filter = block_on(eth.create_logs_filter(spec)).unwrap();
            assert_eq!(filter.id, FILTER_ID);
        }

        // then
        transport.assert_request("eth_newFilter", &[r#"{"limit":10}"#.into()]);
        transport.assert_no_more_requests();
    }

    #[test]
    fn logs_filter_get_logs() {
        // given
        let log = sample_log();
        let mut transport = TestTransport::default();
        transport.set_response(Value::String(FILTER_ID.into()));
        transport.add_response(Value::Array(vec![serde_json::to_value(&log).unwrap()]));

        // when
        let result = {
            let eth = EthFilter::new(&transport);
            let spec = FilterBuilder::default()
                .topics(None, Some(vec![H256::from_low_u64_be(2)]), None, None)
                .build();
            let filter = block_on(eth.create_logs_filter(spec)).unwrap();
            assert_eq!(filter.id, FILTER_ID);
            block_on(filter.logs())
        };

        // then
        assert_eq!(result, Ok(vec![log]));
        transport.assert_request(
            "eth_newFilter",
            &[r#"{"topics":[null,"0x0000000000000000000000000000000000000000000000000000000000000002"]}"#.into()],
        );
        transport.assert_request("eth_getFilterLogs", &[FILTER_ID_PARAM.into()]);
        transport.assert_no_more_requests();
    }

    #[test]
    fn logs_filter_poll() {
        // given
        let log = sample_log();
        let mut transport = TestTransport::default();
        transport.set_response(Value::String(FILTER_ID.into()));
        transport.add_response(Value::Array(vec![serde_json::to_value(&log).unwrap()]));

        // when
        let result = {
            let eth = EthFilter::new(&transport);
            let spec = FilterBuilder::default()
                .address(vec![Address::from_low_u64_be(2)])
                .build();
            let filter = block_on(eth.create_logs_filter(spec)).unwrap();
            assert_eq!(filter.id, FILTER_ID);
            block_on(filter.poll())
        };

        // then
        assert_eq!(result, Ok(Some(vec![log])));
        transport.assert_request(
            "eth_newFilter",
            &[r#"{"address":"0x0000000000000000000000000000000000000002"}"#.into()],
        );
        transport.assert_request("eth_getFilterChanges", &[FILTER_ID_PARAM.into()]);
        transport.assert_no_more_requests();
    }

    #[test]
    fn blocks_filter() {
        // given
        let mut transport = TestTransport::default();
        transport.set_response(Value::String(FILTER_ID.into()));

        // when
        {
            let eth = EthFilter::new(&transport);
            let filter = block_on(eth.create_blocks_filter()).unwrap();
            assert_eq!(filter.id, FILTER_ID);
        }

        // then
        transport.assert_request("eth_newBlockFilter", &[]);
        transport.assert_no_more_requests();
    }

    #[test]
    fn blocks_filter_poll() {
        // given
        let mut transport = TestTransport::default();
        transport.set_response(Value::String(FILTER_ID.into()));
        transport.add_response(hashes(&[
            "0x0000000000000000000000000000000000000000000000000000000000000456",
        ]));

        // when
        let result = {
            let eth = EthFilter::new(&transport);
            let filter = block_on(eth.create_blocks_filter()).unwrap();
            assert_eq!(filter.id, FILTER_ID);
            block_on(filter.poll())
        };

        // then
        assert_eq!(result, Ok(Some(vec![H256::from_low_u64_be(0x456)])));
        transport.assert_request("eth_newBlockFilter", &[]);
        transport.assert_request("eth_getFilterChanges", &[FILTER_ID_PARAM.into()]);
        transport.assert_no_more_requests();
    }

    #[test]
    fn blocks_filter_stream() {
        // given: three poll responses carrying four hashes in total
        let mut transport = TestTransport::default();
        transport.set_response(Value::String(FILTER_ID.into()));
        transport.add_response(hashes(&[
            "0x0000000000000000000000000000000000000000000000000000000000000456",
        ]));
        transport.add_response(hashes(&[
            "0x0000000000000000000000000000000000000000000000000000000000000457",
            "0x0000000000000000000000000000000000000000000000000000000000000458",
        ]));
        transport.add_response(hashes(&[
            "0x0000000000000000000000000000000000000000000000000000000000000459",
        ]));

        // when
        let result: Vec<_> = {
            let eth = EthFilter::new(&transport);
            let filter = block_on(eth.create_blocks_filter()).unwrap();
            let items = filter.stream(Duration::from_secs(0)).boxed_local();
            block_on_stream(items).take(4).collect()
        };

        // then
        let expected: Vec<_> = vec![0x456, 0x457, 0x458, 0x459]
            .into_iter()
            .map(|low| Ok(H256::from_low_u64_be(low)))
            .collect();
        assert_eq!(result, expected);
        transport.assert_request("eth_newBlockFilter", &[]);
        transport.assert_request("eth_getFilterChanges", &[FILTER_ID_PARAM.into()]);
        transport.assert_request("eth_getFilterChanges", &[FILTER_ID_PARAM.into()]);
        transport.assert_request("eth_getFilterChanges", &[FILTER_ID_PARAM.into()]);
    }

    #[test]
    fn pending_transactions_filter() {
        // given
        let mut transport = TestTransport::default();
        transport.set_response(Value::String(FILTER_ID.into()));

        // when
        {
            let eth = EthFilter::new(&transport);
            let filter = block_on(eth.create_pending_transactions_filter()).unwrap();
            assert_eq!(filter.id, FILTER_ID);
        }

        // then
        transport.assert_request("eth_newPendingTransactionFilter", &[]);
        transport.assert_no_more_requests();
    }

    #[test]
    fn create_pending_transactions_filter_poll() {
        // given
        let mut transport = TestTransport::default();
        transport.set_response(Value::String(FILTER_ID.into()));
        transport.add_response(hashes(&[
            "0x0000000000000000000000000000000000000000000000000000000000000456",
        ]));

        // when
        let result = {
            let eth = EthFilter::new(&transport);
            let filter = block_on(eth.create_pending_transactions_filter()).unwrap();
            assert_eq!(filter.id, FILTER_ID);
            block_on(filter.poll())
        };

        // then
        assert_eq!(result, Ok(Some(vec![H256::from_low_u64_be(0x456)])));
        transport.assert_request("eth_newPendingTransactionFilter", &[]);
        transport.assert_request("eth_getFilterChanges", &[FILTER_ID_PARAM.into()]);
        transport.assert_no_more_requests();
    }
}