1use crate::{
4 api::Namespace,
5 error, helpers, rpc,
6 types::{Filter, Log, H256},
7 Transport,
8};
9use futures::{stream, Stream, TryStreamExt};
10use futures_timer::Delay;
11use serde::de::DeserializeOwned;
12use std::{fmt, marker::PhantomData, time::Duration, vec};
13
14fn filter_stream<T: Transport, I: DeserializeOwned>(
15 base: BaseFilter<T, I>,
16 poll_interval: Duration,
17) -> impl Stream<Item = error::Result<I>> {
18 let id = helpers::serialize(&base.id);
19 stream::unfold((base, id), move |state| async move {
20 let (base, id) = state;
21 Delay::new(poll_interval).await;
22 let response = base.transport.execute("eth_getFilterChanges", vec![id.clone()]).await;
23 let items: error::Result<Option<Vec<I>>> = response.and_then(helpers::decode);
24 let items = items.map(Option::unwrap_or_default);
25 Some((items, (base, id)))
26 })
27 .map_ok(|items| stream::iter(items.into_iter().map(Ok)))
29 .try_flatten()
30 .into_stream()
31}
32
/// Compile-time description of one kind of filter: which item type it yields
/// and which RPC method creates it.
trait FilterInterface {
    /// Item type carried by a filter of this kind.
    type Output;

    /// Name of the RPC method that is called to create the filter.
    fn constructor() -> &'static str;
}
41
42#[derive(Debug)]
44struct LogsFilter;
45
46impl FilterInterface for LogsFilter {
47 type Output = Log;
48
49 fn constructor() -> &'static str {
50 "eth_newFilter"
51 }
52}
53
54#[derive(Debug)]
56struct BlocksFilter;
57
58impl FilterInterface for BlocksFilter {
59 type Output = H256;
60
61 fn constructor() -> &'static str {
62 "eth_newBlockFilter"
63 }
64}
65
66#[derive(Debug)]
68struct PendingTransactionsFilter;
69
70impl FilterInterface for PendingTransactionsFilter {
71 type Output = H256;
72
73 fn constructor() -> &'static str {
74 "eth_newPendingTransactionFilter"
75 }
76}
77
78pub struct BaseFilter<T: Transport, I> {
84 id: String,
86 transport: T,
87 item: PhantomData<I>,
88}
89
90impl<T: Transport, I: 'static> fmt::Debug for BaseFilter<T, I> {
91 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
92 fmt.debug_struct("BaseFilter")
93 .field("id", &self.id)
94 .field("transport", &self.transport)
95 .field("item", &std::any::TypeId::of::<I>())
96 .finish()
97 }
98}
99
100impl<T: Transport, I> Clone for BaseFilter<T, I> {
101 fn clone(&self) -> Self {
102 BaseFilter {
103 id: self.id.clone(),
104 transport: self.transport.clone(),
105 item: PhantomData::default(),
106 }
107 }
108}
109
110impl<T: Transport, I> BaseFilter<T, I> {
111 pub async fn uninstall(self) -> error::Result<bool>
113 where
114 Self: Sized,
115 {
116 let id = helpers::serialize(&self.id);
117 let response = self.transport.execute("eth_uninstallFilter", vec![id]).await?;
118 helpers::decode(response)
119 }
120
121 pub fn transport(&self) -> &T {
123 &self.transport
124 }
125}
126
127impl<T: Transport, I: DeserializeOwned> BaseFilter<T, I> {
128 pub async fn poll(&self) -> error::Result<Option<Vec<I>>> {
131 let id = helpers::serialize(&self.id);
132 let response = self.transport.execute("eth_getFilterChanges", vec![id]).await?;
133 helpers::decode(response)
134 }
135
136 pub fn stream(self, poll_interval: Duration) -> impl Stream<Item = error::Result<I>> {
138 filter_stream(self, poll_interval)
139 }
140}
141
142impl<T: Transport> BaseFilter<T, Log> {
143 pub async fn logs(&self) -> error::Result<Vec<Log>> {
145 let id = helpers::serialize(&self.id);
146 let response = self.transport.execute("eth_getFilterLogs", vec![id]).await?;
147 helpers::decode(response)
148 }
149}
150
151async fn create_filter<T: Transport, F: FilterInterface>(
153 transport: T,
154 arg: Vec<rpc::Value>,
155) -> error::Result<BaseFilter<T, F::Output>> {
156 let response = transport.execute(F::constructor(), arg).await?;
157 let id = helpers::decode(response)?;
158 Ok(BaseFilter {
159 id,
160 transport,
161 item: PhantomData,
162 })
163}
164
/// Entry point for creating filters (logs, blocks, pending transactions)
/// over a transport.
#[derive(Debug, Clone)]
pub struct EthFilter<T> {
    /// Transport that the created filters take over.
    transport: T,
}
170
171impl<T: Transport> Namespace<T> for EthFilter<T> {
172 fn new(transport: T) -> Self
173 where
174 Self: Sized,
175 {
176 EthFilter { transport }
177 }
178
179 fn transport(&self) -> &T {
180 &self.transport
181 }
182}
183
184impl<T: Transport> EthFilter<T> {
185 pub async fn create_logs_filter(self, filter: Filter) -> error::Result<BaseFilter<T, Log>> {
187 let f = helpers::serialize(&filter);
188 create_filter::<_, LogsFilter>(self.transport, vec![f]).await
189 }
190
191 pub async fn create_blocks_filter(self) -> error::Result<BaseFilter<T, H256>> {
193 create_filter::<_, BlocksFilter>(self.transport, vec![]).await
194 }
195
196 pub async fn create_pending_transactions_filter(self) -> error::Result<BaseFilter<T, H256>> {
198 create_filter::<_, PendingTransactionsFilter>(self.transport, vec![]).await
199 }
200}
201
#[cfg(test)]
mod tests {
    use super::EthFilter;
    use crate::{
        api::Namespace,
        rpc::Value,
        transports::test::TestTransport,
        types::{Address, FilterBuilder, Log, H256},
    };
    use futures::{
        executor::{block_on, block_on_stream},
        stream::StreamExt,
    };
    use hex_literal::hex;
    use std::time::Duration;

    /// Filter id returned by the mocked node in every test.
    const FILTER_ID: &str = "0x123";
    /// The same id in the JSON form in which it appears as a request parameter.
    const FILTER_ID_PARAM: &str = r#""0x123""#;

    /// Response to a filter-creation request.
    fn filter_id_response() -> Value {
        Value::String(FILTER_ID.into())
    }

    /// Response consisting of an array of hash strings.
    fn hash_batch(hashes: &[&str]) -> Value {
        Value::Array(hashes.iter().map(|hash| Value::String((*hash).into())).collect())
    }

    /// Log fixture shared by the logs tests.
    fn sample_log() -> Log {
        Log {
            address: Address::from_low_u64_be(1),
            topics: vec![],
            data: hex!("").into(),
            block_hash: Some(H256::from_low_u64_be(2)),
            block_number: Some(1.into()),
            transaction_hash: Some(H256::from_low_u64_be(3)),
            transaction_index: Some(0.into()),
            log_index: Some(0.into()),
            transaction_log_index: Some(0.into()),
            log_type: Some("mined".into()),
            removed: None,
        }
    }

    #[test]
    fn logs_filter() {
        // given
        let mut transport = TestTransport::default();
        transport.set_response(filter_id_response());

        // when
        {
            let eth = EthFilter::new(&transport);
            let query = FilterBuilder::default().limit(10).build();
            let filter = block_on(eth.create_logs_filter(query)).unwrap();
            assert_eq!(filter.id, FILTER_ID.to_owned());
        }

        // then
        transport.assert_request("eth_newFilter", &[r#"{"limit":10}"#.into()]);
        transport.assert_no_more_requests();
    }

    #[test]
    fn logs_filter_get_logs() {
        // given
        let log = sample_log();
        let mut transport = TestTransport::default();
        transport.set_response(filter_id_response());
        transport.add_response(Value::Array(vec![serde_json::to_value(&log).unwrap()]));

        // when
        let result = {
            let eth = EthFilter::new(&transport);
            let query = FilterBuilder::default()
                .topics(None, Some(vec![H256::from_low_u64_be(2)]), None, None)
                .build();
            let filter = block_on(eth.create_logs_filter(query)).unwrap();
            assert_eq!(filter.id, FILTER_ID.to_owned());
            block_on(filter.logs())
        };

        // then
        assert_eq!(result, Ok(vec![log]));
        transport.assert_request(
            "eth_newFilter",
            &[r#"{"topics":[null,"0x0000000000000000000000000000000000000000000000000000000000000002"]}"#.into()],
        );
        transport.assert_request("eth_getFilterLogs", &[FILTER_ID_PARAM.into()]);
        transport.assert_no_more_requests();
    }

    #[test]
    fn logs_filter_poll() {
        // given
        let log = sample_log();
        let mut transport = TestTransport::default();
        transport.set_response(filter_id_response());
        transport.add_response(Value::Array(vec![serde_json::to_value(&log).unwrap()]));

        // when
        let result = {
            let eth = EthFilter::new(&transport);
            let query = FilterBuilder::default()
                .address(vec![Address::from_low_u64_be(2)])
                .build();
            let filter = block_on(eth.create_logs_filter(query)).unwrap();
            assert_eq!(filter.id, FILTER_ID.to_owned());
            block_on(filter.poll())
        };

        // then
        assert_eq!(result, Ok(Some(vec![log])));
        transport.assert_request(
            "eth_newFilter",
            &[r#"{"address":"0x0000000000000000000000000000000000000002"}"#.into()],
        );
        transport.assert_request("eth_getFilterChanges", &[FILTER_ID_PARAM.into()]);
        transport.assert_no_more_requests();
    }

    #[test]
    fn blocks_filter() {
        // given
        let mut transport = TestTransport::default();
        transport.set_response(filter_id_response());

        // when
        {
            let eth = EthFilter::new(&transport);
            let filter = block_on(eth.create_blocks_filter()).unwrap();
            assert_eq!(filter.id, FILTER_ID.to_owned());
        }

        // then
        transport.assert_request("eth_newBlockFilter", &[]);
        transport.assert_no_more_requests();
    }

    #[test]
    fn blocks_filter_poll() {
        // given
        let mut transport = TestTransport::default();
        transport.set_response(filter_id_response());
        transport.add_response(hash_batch(&[
            "0x0000000000000000000000000000000000000000000000000000000000000456",
        ]));

        // when
        let result = {
            let eth = EthFilter::new(&transport);
            let filter = block_on(eth.create_blocks_filter()).unwrap();
            assert_eq!(filter.id, FILTER_ID.to_owned());
            block_on(filter.poll())
        };

        // then
        assert_eq!(result, Ok(Some(vec![H256::from_low_u64_be(0x456)])));
        transport.assert_request("eth_newBlockFilter", &[]);
        transport.assert_request("eth_getFilterChanges", &[FILTER_ID_PARAM.into()]);
        transport.assert_no_more_requests();
    }

    #[test]
    fn blocks_filter_stream() {
        // given: three poll responses carrying one, two and one hashes
        let mut transport = TestTransport::default();
        transport.set_response(filter_id_response());
        transport.add_response(hash_batch(&[
            "0x0000000000000000000000000000000000000000000000000000000000000456",
        ]));
        transport.add_response(hash_batch(&[
            "0x0000000000000000000000000000000000000000000000000000000000000457",
            "0x0000000000000000000000000000000000000000000000000000000000000458",
        ]));
        transport.add_response(hash_batch(&[
            "0x0000000000000000000000000000000000000000000000000000000000000459",
        ]));

        // when: only the first four items are pulled from the endless stream
        let result: Vec<_> = {
            let eth = EthFilter::new(&transport);
            let filter = block_on(eth.create_blocks_filter()).unwrap();
            let items = filter.stream(Duration::from_secs(0)).boxed_local();
            block_on_stream(items).take(4).collect()
        };

        // then
        let expected = (0x456..=0x459)
            .map(H256::from_low_u64_be)
            .map(Ok)
            .collect::<Vec<_>>();
        assert_eq!(result, expected);
        transport.assert_request("eth_newBlockFilter", &[]);
        transport.assert_request("eth_getFilterChanges", &[FILTER_ID_PARAM.into()]);
        transport.assert_request("eth_getFilterChanges", &[FILTER_ID_PARAM.into()]);
        transport.assert_request("eth_getFilterChanges", &[FILTER_ID_PARAM.into()]);
    }

    #[test]
    fn pending_transactions_filter() {
        // given
        let mut transport = TestTransport::default();
        transport.set_response(filter_id_response());

        // when
        {
            let eth = EthFilter::new(&transport);
            let filter = block_on(eth.create_pending_transactions_filter()).unwrap();
            assert_eq!(filter.id, FILTER_ID.to_owned());
        }

        // then
        transport.assert_request("eth_newPendingTransactionFilter", &[]);
        transport.assert_no_more_requests();
    }

    #[test]
    fn create_pending_transactions_filter_poll() {
        // given
        let mut transport = TestTransport::default();
        transport.set_response(filter_id_response());
        transport.add_response(hash_batch(&[
            "0x0000000000000000000000000000000000000000000000000000000000000456",
        ]));

        // when
        let result = {
            let eth = EthFilter::new(&transport);
            let filter = block_on(eth.create_pending_transactions_filter()).unwrap();
            assert_eq!(filter.id, FILTER_ID.to_owned());
            block_on(filter.poll())
        };

        // then
        assert_eq!(result, Ok(Some(vec![H256::from_low_u64_be(0x456)])));
        transport.assert_request("eth_newPendingTransactionFilter", &[]);
        transport.assert_request("eth_getFilterChanges", &[FILTER_ID_PARAM.into()]);
        transport.assert_no_more_requests();
    }
}