1use crate::{
4 api::Namespace,
5 error, helpers, rpc,
6 types::{Filter, Log, H256},
7 Transport,
8};
9use futures::{stream, Stream, TryStreamExt};
10use futures_timer::Delay;
11use serde::de::DeserializeOwned;
12use std::{fmt, marker::PhantomData, time::Duration, vec};
13
14fn filter_stream<T: Transport, I: DeserializeOwned>(
15 base: BaseFilter<T, I>,
16 poll_interval: Duration,
17) -> impl Stream<Item = error::Result<I>> {
18 let id = helpers::serialize(&base.id);
19 stream::unfold((base, id), move |state| async move {
20 let (base, id) = state;
21 Delay::new(poll_interval).await;
22 let response = base.transport.execute("eth_getFilterChanges", vec![id.clone()]).await;
23 let items: error::Result<Option<Vec<I>>> = response.and_then(helpers::decode);
24 let items = items.map(Option::unwrap_or_default);
25 Some((items, (base, id)))
26 })
27 .map_ok(|items| stream::iter(items.into_iter().map(Ok)))
29 .try_flatten()
30 .into_stream()
31}
32
33trait FilterInterface {
35 type Output;
37
38 fn constructor() -> &'static str;
40}
41
42#[derive(Debug)]
44struct LogsFilter;
45
46impl FilterInterface for LogsFilter {
47 type Output = Log;
48
49 fn constructor() -> &'static str {
50 "eth_newFilter"
51 }
52}
53
54#[derive(Debug)]
56struct BlocksFilter;
57
58impl FilterInterface for BlocksFilter {
59 type Output = H256;
60
61 fn constructor() -> &'static str {
62 "eth_newBlockFilter"
63 }
64}
65
66#[derive(Debug)]
68struct PendingTransactionsFilter;
69
70impl FilterInterface for PendingTransactionsFilter {
71 type Output = H256;
72
73 fn constructor() -> &'static str {
74 "eth_newPendingTransactionFilter"
75 }
76}
77
78pub struct BaseFilter<T: Transport, I> {
82 id: String,
84 transport: T,
85 item: PhantomData<I>,
86}
87
88impl<T: Transport, I: 'static> fmt::Debug for BaseFilter<T, I> {
89 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
90 fmt.debug_struct("BaseFilter")
91 .field("id", &self.id)
92 .field("transport", &self.transport)
93 .field("item", &std::any::TypeId::of::<I>())
94 .finish()
95 }
96}
97
98impl<T: Transport, I> Clone for BaseFilter<T, I> {
99 fn clone(&self) -> Self {
100 BaseFilter {
101 id: self.id.clone(),
102 transport: self.transport.clone(),
103 item: PhantomData::default(),
104 }
105 }
106}
107
108impl<T: Transport, I> BaseFilter<T, I> {
109 pub async fn uninstall(self) -> error::Result<bool>
111 where
112 Self: Sized,
113 {
114 let id = helpers::serialize(&self.id);
115 let response = self.transport.execute("eth_uninstallFilter", vec![id]).await?;
116 helpers::decode(response)
117 }
118
119 pub fn transport(&self) -> &T {
121 &self.transport
122 }
123}
124
125impl<T: Transport, I: DeserializeOwned> BaseFilter<T, I> {
126 pub async fn poll(&self) -> error::Result<Option<Vec<I>>> {
129 let id = helpers::serialize(&self.id);
130 let response = self.transport.execute("eth_getFilterChanges", vec![id]).await?;
131 helpers::decode(response)
132 }
133
134 pub fn stream(self, poll_interval: Duration) -> impl Stream<Item = error::Result<I>> {
136 filter_stream(self, poll_interval)
137 }
138}
139
140impl<T: Transport> BaseFilter<T, Log> {
141 pub async fn logs(&self) -> error::Result<Vec<Log>> {
143 let id = helpers::serialize(&self.id);
144 let response = self.transport.execute("eth_getFilterLogs", vec![id]).await?;
145 helpers::decode(response)
146 }
147}
148
149async fn create_filter<T: Transport, F: FilterInterface>(
151 transport: T,
152 arg: Vec<rpc::Value>,
153) -> error::Result<BaseFilter<T, F::Output>> {
154 let response = transport.execute(F::constructor(), arg).await?;
155 let id = helpers::decode(response)?;
156 Ok(BaseFilter {
157 id,
158 transport,
159 item: PhantomData,
160 })
161}
162
163#[derive(Debug, Clone)]
165pub struct EthFilter<T> {
166 transport: T,
167}
168
169impl<T: Transport> Namespace<T> for EthFilter<T> {
170 fn new(transport: T) -> Self
171 where
172 Self: Sized,
173 {
174 EthFilter { transport }
175 }
176
177 fn transport(&self) -> &T {
178 &self.transport
179 }
180}
181
182impl<T: Transport> EthFilter<T> {
183 pub async fn create_logs_filter(self, filter: Filter) -> error::Result<BaseFilter<T, Log>> {
185 let f = helpers::serialize(&filter);
186 create_filter::<_, LogsFilter>(self.transport, vec![f]).await
187 }
188
189 pub async fn create_blocks_filter(self) -> error::Result<BaseFilter<T, H256>> {
191 create_filter::<_, BlocksFilter>(self.transport, vec![]).await
192 }
193
194 pub async fn create_pending_transactions_filter(self) -> error::Result<BaseFilter<T, H256>> {
196 create_filter::<_, PendingTransactionsFilter>(self.transport, vec![]).await
197 }
198}
199
200#[cfg(test)]
201mod tests {
202 use super::EthFilter;
203 use crate::{
204 api::Namespace,
205 rpc::Value,
206 transports::test::TestTransport,
207 types::{Address, FilterBuilder, Log, H256},
208 };
209 use futures::stream::StreamExt;
210 use hex_literal::hex;
211 use std::time::Duration;
212
213 #[test]
214 fn logs_filter() {
215 let mut transport = TestTransport::default();
217 transport.set_response(Value::String("0x123".into()));
218 {
219 let eth = EthFilter::new(&transport);
220
221 let filter = FilterBuilder::default().limit(10).build();
223 let filter = futures::executor::block_on(eth.create_logs_filter(filter)).unwrap();
224 assert_eq!(filter.id, "0x123".to_owned());
225 };
226
227 transport.assert_request("eth_newFilter", &[r#"{"limit":10}"#.into()]);
229 transport.assert_no_more_requests();
230 }
231
232 #[test]
233 fn logs_filter_get_logs() {
234 let log = Log {
236 address: Address::from_low_u64_be(1),
237 topics: vec![],
238 data: hex!("").into(),
239 block_hash: Some(H256::from_low_u64_be(2)),
240 block_number: Some(1.into()),
241 transaction_hash: Some(H256::from_low_u64_be(3)),
242 transaction_index: Some(0.into()),
243 log_index: Some(0.into()),
244 transaction_log_index: Some(0.into()),
245 log_type: Some("mined".into()),
246 removed: None,
247 };
248
249 let mut transport = TestTransport::default();
250 transport.set_response(Value::String("0x123".into()));
251 transport.add_response(Value::Array(vec![serde_json::to_value(&log).unwrap()]));
252 let result = {
253 let eth = EthFilter::new(&transport);
254
255 let filter = FilterBuilder::default()
257 .topics(None, Some(vec![H256::from_low_u64_be(2)]), None, None)
258 .build();
259 let filter = futures::executor::block_on(eth.create_logs_filter(filter)).unwrap();
260 assert_eq!(filter.id, "0x123".to_owned());
261 futures::executor::block_on(filter.logs())
262 };
263
264 assert_eq!(result, Ok(vec![log]));
266 transport.assert_request(
267 "eth_newFilter",
268 &[r#"{"topics":[null,"0x0000000000000000000000000000000000000000000000000000000000000002"]}"#.into()],
269 );
270 transport.assert_request("eth_getFilterLogs", &[r#""0x123""#.into()]);
271 transport.assert_no_more_requests();
272 }
273
274 #[test]
275 fn logs_filter_poll() {
276 let log = Log {
278 address: Address::from_low_u64_be(1),
279 topics: vec![],
280 data: hex!("").into(),
281 block_hash: Some(H256::from_low_u64_be(2)),
282 block_number: Some(1.into()),
283 transaction_hash: Some(H256::from_low_u64_be(3)),
284 transaction_index: Some(0.into()),
285 log_index: Some(0.into()),
286 transaction_log_index: Some(0.into()),
287 log_type: Some("mined".into()),
288 removed: None,
289 };
290
291 let mut transport = TestTransport::default();
292 transport.set_response(Value::String("0x123".into()));
293 transport.add_response(Value::Array(vec![serde_json::to_value(&log).unwrap()]));
294 let result = {
295 let eth = EthFilter::new(&transport);
296
297 let filter = FilterBuilder::default()
299 .address(vec![Address::from_low_u64_be(2)])
300 .build();
301 let filter = futures::executor::block_on(eth.create_logs_filter(filter)).unwrap();
302 assert_eq!(filter.id, "0x123".to_owned());
303 futures::executor::block_on(filter.poll())
304 };
305
306 assert_eq!(result, Ok(Some(vec![log])));
308 transport.assert_request(
309 "eth_newFilter",
310 &[r#"{"address":"0x0000000000000000000000000000000000000002"}"#.into()],
311 );
312 transport.assert_request("eth_getFilterChanges", &[r#""0x123""#.into()]);
313 transport.assert_no_more_requests();
314 }
315
316 #[test]
317 fn blocks_filter() {
318 let mut transport = TestTransport::default();
320 transport.set_response(Value::String("0x123".into()));
321 {
322 let eth = EthFilter::new(&transport);
323
324 let filter = futures::executor::block_on(eth.create_blocks_filter()).unwrap();
326 assert_eq!(filter.id, "0x123".to_owned());
327 };
328
329 transport.assert_request("eth_newBlockFilter", &[]);
331 transport.assert_no_more_requests();
332 }
333
334 #[test]
335 fn blocks_filter_poll() {
336 let mut transport = TestTransport::default();
338 transport.set_response(Value::String("0x123".into()));
339 transport.add_response(Value::Array(vec![Value::String(
340 r#"0x0000000000000000000000000000000000000000000000000000000000000456"#.into(),
341 )]));
342 let result = {
343 let eth = EthFilter::new(&transport);
344
345 let filter = futures::executor::block_on(eth.create_blocks_filter()).unwrap();
347 assert_eq!(filter.id, "0x123".to_owned());
348 futures::executor::block_on(filter.poll())
349 };
350
351 assert_eq!(result, Ok(Some(vec![H256::from_low_u64_be(0x456)])));
353 transport.assert_request("eth_newBlockFilter", &[]);
354 transport.assert_request("eth_getFilterChanges", &[r#""0x123""#.into()]);
355 transport.assert_no_more_requests();
356 }
357
358 #[test]
359 fn blocks_filter_stream() {
360 let mut transport = TestTransport::default();
362 transport.set_response(Value::String("0x123".into()));
363 transport.add_response(Value::Array(vec![Value::String(
364 r#"0x0000000000000000000000000000000000000000000000000000000000000456"#.into(),
365 )]));
366 transport.add_response(Value::Array(vec![
367 Value::String(r#"0x0000000000000000000000000000000000000000000000000000000000000457"#.into()),
368 Value::String(r#"0x0000000000000000000000000000000000000000000000000000000000000458"#.into()),
369 ]));
370 transport.add_response(Value::Array(vec![Value::String(
371 r#"0x0000000000000000000000000000000000000000000000000000000000000459"#.into(),
372 )]));
373 let result: Vec<_> = {
374 let eth = EthFilter::new(&transport);
375
376 let filter = futures::executor::block_on(eth.create_blocks_filter()).unwrap();
378 futures::executor::block_on_stream(filter.stream(Duration::from_secs(0)).boxed_local())
379 .take(4)
380 .collect()
381 };
382
383 assert_eq!(
385 result,
386 [0x456, 0x457, 0x458, 0x459]
387 .iter()
388 .copied()
389 .map(H256::from_low_u64_be)
390 .map(Ok)
391 .collect::<Vec<_>>()
392 );
393 transport.assert_request("eth_newBlockFilter", &[]);
394 transport.assert_request("eth_getFilterChanges", &[r#""0x123""#.into()]);
395 transport.assert_request("eth_getFilterChanges", &[r#""0x123""#.into()]);
396 transport.assert_request("eth_getFilterChanges", &[r#""0x123""#.into()]);
397 }
398
399 #[test]
400 fn pending_transactions_filter() {
401 let mut transport = TestTransport::default();
403 transport.set_response(Value::String("0x123".into()));
404 {
405 let eth = EthFilter::new(&transport);
406
407 let filter = futures::executor::block_on(eth.create_pending_transactions_filter()).unwrap();
409 assert_eq!(filter.id, "0x123".to_owned());
410 };
411
412 transport.assert_request("eth_newPendingTransactionFilter", &[]);
414 transport.assert_no_more_requests();
415 }
416
417 #[test]
418 fn create_pending_transactions_filter_poll() {
419 let mut transport = TestTransport::default();
421 transport.set_response(Value::String("0x123".into()));
422 transport.add_response(Value::Array(vec![Value::String(
423 r#"0x0000000000000000000000000000000000000000000000000000000000000456"#.into(),
424 )]));
425 let result = {
426 let eth = EthFilter::new(&transport);
427
428 let filter = futures::executor::block_on(eth.create_pending_transactions_filter()).unwrap();
430 assert_eq!(filter.id, "0x123".to_owned());
431 futures::executor::block_on(filter.poll())
432 };
433
434 assert_eq!(result, Ok(Some(vec![H256::from_low_u64_be(0x456)])));
436 transport.assert_request("eth_newPendingTransactionFilter", &[]);
437 transport.assert_request("eth_getFilterChanges", &[r#""0x123""#.into()]);
438 transport.assert_no_more_requests();
439 }
440}