Skip to main content

ethrex_rpc/eth/
filter.rs

1// The behaviour of the filtering endpoints is based on:
2// - Manually testing the behaviour deploying contracts on the Sepolia test network.
3// - Go-Ethereum, specifically: https://github.com/ethereum/go-ethereum/blob/368e16f39d6c7e5cce72a92ec289adbfbaed4854/eth/filters/filter.go
4// - Ethereum's reference: https://ethereum.org/en/developers/docs/apis/json-rpc/#eth_newfilter
5use ethrex_common::types::BlockNumber;
6use ethrex_storage::Store;
7use std::{
8    collections::HashMap,
9    sync::{Arc, Mutex},
10    time::{Duration, Instant},
11};
12use tracing::error;
13
14use crate::rpc::RpcHandler;
15use crate::{
16    types::block_identifier::{BlockIdentifier, BlockTag},
17    utils::{RpcErr, RpcRequest, parse_json_hex},
18};
19use serde_json::{Value, json};
20
21use super::logs::{LogsFilter, fetch_logs_with_filter};
22
23#[derive(Debug, Clone)]
24pub struct NewFilterRequest {
25    pub request_data: LogsFilter,
26}
27
28/// Used by the tokio runtime to clean outdated filters
29/// Takes 2 arguments:
30/// - filters: the filters to clean up.
31/// - filter_duration: represents how many *seconds* filter can last,
32///   if any filter is older than this, it will be removed.
33pub fn clean_outdated_filters(filters: ActiveFilters, filter_duration: Duration) {
34    let mut active_filters_guard = filters.lock().unwrap_or_else(|mut poisoned_guard| {
35        error!("THREAD CRASHED WITH MUTEX TAKEN; SYSTEM MIGHT BE UNSTABLE");
36        **poisoned_guard.get_mut() = HashMap::new();
37        filters.clear_poison();
38        poisoned_guard.into_inner()
39    });
40
41    // Keep only filters that have not expired.
42    active_filters_guard
43        .retain(|_, (filter_timestamp, _)| filter_timestamp.elapsed() <= filter_duration);
44}
45/// Maps IDs to active pollable filters and their timestamps.
46pub type ActiveFilters = Arc<Mutex<HashMap<u64, (Instant, PollableFilter)>>>;
47
48#[derive(Debug, Clone)]
49pub struct PollableFilter {
50    /// Last block number from when this
51    /// filter was requested or created.
52    /// i.e. if this filter is requested,
53    /// the log will be applied from this
54    /// block number up to the latest one.
55    pub last_block_number: BlockNumber,
56    pub filter_data: LogsFilter,
57}
58
59impl NewFilterRequest {
60    pub fn parse(params: &Option<Vec<serde_json::Value>>) -> Result<Self, RpcErr> {
61        let filter = LogsFilter::parse(params)?;
62        Ok(NewFilterRequest {
63            request_data: filter,
64        })
65    }
66
67    pub async fn handle(
68        &self,
69        storage: ethrex_storage::Store,
70        filters: ActiveFilters,
71    ) -> Result<serde_json::Value, crate::utils::RpcErr> {
72        let from = self
73            .request_data
74            .from_block
75            .resolve_block_number(&storage)
76            .await?
77            .ok_or(RpcErr::WrongParam("fromBlock".to_string()))?;
78        let to = self
79            .request_data
80            .to_block
81            .resolve_block_number(&storage)
82            .await?
83            .ok_or(RpcErr::WrongParam("toBlock".to_string()))?;
84
85        if (from..=to).is_empty() {
86            return Err(RpcErr::BadParams("Invalid block range".to_string()));
87        }
88
89        let last_block_number = storage.get_latest_block_number().await?;
90        let id: u64 = rand::random();
91        let timestamp = Instant::now();
92        let mut active_filters_guard = filters.lock().unwrap_or_else(|mut poisoned_guard| {
93            error!("THREAD CRASHED WITH MUTEX TAKEN; SYSTEM MIGHT BE UNSTABLE");
94            **poisoned_guard.get_mut() = HashMap::new();
95            filters.clear_poison();
96            poisoned_guard.into_inner()
97        });
98        active_filters_guard.insert(
99            id,
100            (
101                timestamp,
102                PollableFilter {
103                    last_block_number,
104                    filter_data: self.request_data.clone(),
105                },
106            ),
107        );
108        let as_hex = json!(format!("0x{:x}", id));
109        Ok(as_hex)
110    }
111
112    pub async fn stateful_call(
113        req: &RpcRequest,
114        storage: Store,
115        state: ActiveFilters,
116    ) -> Result<Value, RpcErr> {
117        let request = Self::parse(&req.params)?;
118        request.handle(storage, state).await
119    }
120}
121
122pub struct DeleteFilterRequest {
123    pub id: u64,
124}
125
126impl DeleteFilterRequest {
127    pub fn parse(params: &Option<Vec<serde_json::Value>>) -> Result<Self, RpcErr> {
128        match params.as_deref() {
129            Some([param]) => {
130                let id = parse_json_hex(param).map_err(|_err| RpcErr::BadHexFormat(0))?;
131                Ok(DeleteFilterRequest { id })
132            }
133            Some(_) => Err(RpcErr::BadParams(
134                "Expected an array with a single hex encoded id".to_string(),
135            )),
136            None => Err(RpcErr::MissingParam("0".to_string())),
137        }
138    }
139
140    pub fn handle(
141        &self,
142        _storage: ethrex_storage::Store,
143        filters: ActiveFilters,
144    ) -> Result<serde_json::Value, crate::utils::RpcErr> {
145        let mut active_filters_guard = filters.lock().unwrap_or_else(|mut poisoned_guard| {
146            error!("THREAD CRASHED WITH MUTEX TAKEN; SYSTEM MIGHT BE UNSTABLE");
147            **poisoned_guard.get_mut() = HashMap::new();
148            filters.clear_poison();
149            poisoned_guard.into_inner()
150        });
151        match active_filters_guard.remove(&self.id) {
152            Some(_) => Ok(true.into()),
153            None => Ok(false.into()),
154        }
155    }
156
157    pub fn stateful_call(
158        req: &RpcRequest,
159        storage: ethrex_storage::Store,
160        filters: ActiveFilters,
161    ) -> Result<serde_json::Value, crate::utils::RpcErr> {
162        let request = Self::parse(&req.params)?;
163        request.handle(storage, filters)
164    }
165}
166
167pub struct FilterChangesRequest {
168    pub id: u64,
169}
170
171impl FilterChangesRequest {
172    pub fn parse(params: &Option<Vec<serde_json::Value>>) -> Result<Self, RpcErr> {
173        match params.as_deref() {
174            Some([param]) => {
175                let id = parse_json_hex(param).map_err(|_err| RpcErr::BadHexFormat(0))?;
176                Ok(FilterChangesRequest { id })
177            }
178            Some(_) => Err(RpcErr::BadParams(
179                "Expected an array with a single hex encoded id".to_string(),
180            )),
181            None => Err(RpcErr::MissingParam("0".to_string())),
182        }
183    }
184    pub async fn handle(
185        &self,
186        storage: ethrex_storage::Store,
187        filters: ActiveFilters,
188    ) -> Result<serde_json::Value, crate::utils::RpcErr> {
189        let latest_block_num = storage.get_latest_block_number().await?;
190        // Box needed to keep the future Sync
191        // https://github.com/rust-lang/rust/issues/128095
192        let mut active_filters_guard =
193            Box::new(filters.lock().unwrap_or_else(|mut poisoned_guard| {
194                error!("THREAD CRASHED WITH MUTEX TAKEN; SYSTEM MIGHT BE UNSTABLE");
195                **poisoned_guard.get_mut() = HashMap::new();
196                filters.clear_poison();
197                poisoned_guard.into_inner()
198            }));
199        if let Some((timestamp, filter)) = active_filters_guard.get_mut(&self.id) {
200            // We'll only get changes for a filter that either has a block
201            // range for upcoming blocks, or for the 'latest' tag.
202            let valid_block_range = match filter.filter_data.to_block {
203                BlockIdentifier::Tag(BlockTag::Latest) => true,
204                BlockIdentifier::Number(block_num) if block_num >= latest_block_num => true,
205                _ => false,
206            };
207            // This filter has a valid block range, so here's what we'll do:
208            // - Update the filter's timestamp and block number from the last poll.
209            // - Do the query to fetch logs in range last_block_number..=to_block for
210            //   this filter.
211            if valid_block_range {
212                // Since the filter was polled, updated its timestamp, so
213                // it does not expire.
214                *timestamp = Instant::now();
215                // Update this filter so the current query
216                // starts from the last polled block.
217                filter.filter_data.from_block = BlockIdentifier::Number(filter.last_block_number);
218                filter.last_block_number = latest_block_num;
219                let mut filter = filter.clone();
220                filter.filter_data.to_block = BlockIdentifier::Number(latest_block_num);
221                // Drop the lock early to process this filter's query
222                // and not keep the lock more than we should.
223                drop(active_filters_guard);
224                let logs = fetch_logs_with_filter(&filter.filter_data, storage).await?;
225                serde_json::to_value(logs).map_err(|error| {
226                    tracing::error!("Log filtering request failed with: {error}");
227                    RpcErr::Internal("Failed to filter logs".to_string())
228                })
229            } else {
230                serde_json::to_value(Vec::<u8>::new()).map_err(|error| {
231                    tracing::error!("Log filtering request failed with: {error}");
232                    RpcErr::Internal("Failed to filter logs".to_string())
233                })
234            }
235        } else {
236            Err(RpcErr::BadParams(
237                "No matching filter for given id".to_string(),
238            ))
239        }
240    }
241    pub async fn stateful_call(
242        req: &RpcRequest,
243        storage: ethrex_storage::Store,
244        filters: ActiveFilters,
245    ) -> Result<serde_json::Value, crate::utils::RpcErr> {
246        let request = Self::parse(&req.params)?;
247        request.handle(storage, filters).await
248    }
249}
250
251#[cfg(test)]
252mod tests {
253    use std::{
254        collections::HashMap,
255        sync::{Arc, Mutex},
256        time::{Duration, Instant},
257    };
258
259    use super::ActiveFilters;
260    use crate::{
261        eth::{
262            filter::PollableFilter,
263            logs::{AddressFilter, LogsFilter, TopicFilter},
264        },
265        rpc::{FILTER_DURATION, map_http_requests},
266        test_utils::{TEST_GENESIS, default_context_with_storage, start_test_api},
267    };
268    use crate::{types::block_identifier::BlockIdentifier, utils::RpcRequest};
269    use ethrex_common::types::Genesis;
270    use ethrex_storage::{EngineType, Store};
271
272    use serde_json::{Value, json};
273
274    #[tokio::test]
275    async fn filter_request_smoke_test_valid_params() {
276        let filter_req_params = json!(
277                {
278                    "fromBlock": "0x1",
279                    "toBlock": "0x2",
280                    "address": null,
281                    "topics": ["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"]
282                }
283        );
284        let raw_json = json!(
285        {
286            "jsonrpc":"2.0",
287            "method":"eth_newFilter",
288            "params":
289            [
290                filter_req_params.clone()
291            ]
292                ,"id":1
293        });
294        let filters = Arc::new(Mutex::new(HashMap::new()));
295        let id = run_new_filter_request_test(raw_json.clone(), filters.clone()).await;
296        let filters = filters.lock().unwrap();
297        assert!(filters.len() == 1);
298        let (_, filter) = filters.clone().get(&id).unwrap().clone();
299        assert!(matches!(
300            filter.filter_data.from_block,
301            BlockIdentifier::Number(1)
302        ));
303        assert!(matches!(
304            filter.filter_data.to_block,
305            BlockIdentifier::Number(2)
306        ));
307        assert!(filter.filter_data.address_filters.is_none());
308        assert!(matches!(
309            &filter.filter_data.topics[..],
310            [TopicFilter::Topic(_)]
311        ));
312    }
313
314    #[tokio::test]
315    async fn filter_request_smoke_test_valid_null_topics_null_addr() {
316        let raw_json = json!(
317        {
318            "jsonrpc":"2.0",
319            "method":"eth_newFilter",
320            "params":
321            [
322                {
323                    "fromBlock": "0x1",
324                    "toBlock": "0xFF",
325                    "topics": null,
326                    "address": null
327                }
328            ]
329                ,"id":1
330        });
331        let filters = Arc::new(Mutex::new(HashMap::new()));
332        let id = run_new_filter_request_test(raw_json.clone(), filters.clone()).await;
333        let filters = filters.lock().unwrap();
334        assert!(filters.len() == 1);
335        let (_, filter) = filters.clone().get(&id).unwrap().clone();
336        assert!(matches!(
337            filter.filter_data.from_block,
338            BlockIdentifier::Number(1)
339        ));
340        assert!(matches!(
341            filter.filter_data.to_block,
342            BlockIdentifier::Number(255)
343        ));
344        assert!(filter.filter_data.address_filters.is_none());
345        assert!(matches!(&filter.filter_data.topics[..], []));
346    }
347
348    #[tokio::test]
349    async fn filter_request_smoke_test_valid_addr_topic_null() {
350        let raw_json = json!(
351        {
352            "jsonrpc":"2.0",
353            "method":"eth_newFilter",
354            "params":
355            [
356                {
357                    "fromBlock": "0x1",
358                    "toBlock": "0xFF",
359                    "topics": null,
360                    "address": [ "0xb794f5ea0ba39494ce839613fffba74279579268" ]
361                }
362            ]
363                ,"id":1
364        });
365        let filters = Arc::new(Mutex::new(HashMap::new()));
366        let id = run_new_filter_request_test(raw_json.clone(), filters.clone()).await;
367        let filters = filters.lock().unwrap();
368        assert!(filters.len() == 1);
369        let (_, filter) = filters.clone().get(&id).unwrap().clone();
370        assert!(matches!(
371            filter.filter_data.from_block,
372            BlockIdentifier::Number(1)
373        ));
374        assert!(matches!(
375            filter.filter_data.to_block,
376            BlockIdentifier::Number(255)
377        ));
378        assert!(matches!(
379            filter.filter_data.address_filters.unwrap(),
380            AddressFilter::Many(_)
381        ));
382        assert!(matches!(&filter.filter_data.topics[..], []));
383    }
384
385    #[tokio::test]
386    #[should_panic]
387    async fn filter_request_smoke_test_invalid_block_range() {
388        let raw_json = json!(
389        {
390            "jsonrpc":"2.0",
391            "method":"eth_newFilter",
392            "params":
393            [
394                {
395                    "fromBlock": "0xFFF",
396                    "toBlock": "0xA",
397                    "topics": null,
398                    "address": null
399                }
400            ]
401                ,"id":1
402        });
403        run_new_filter_request_test(raw_json.clone(), Default::default()).await;
404    }
405
406    #[tokio::test]
407    #[should_panic]
408    async fn filter_request_smoke_test_from_block_missing() {
409        let raw_json = json!(
410        {
411            "jsonrpc":"2.0",
412            "method":"eth_newFilter",
413            "params":
414            [
415                {
416                    "fromBlock": null,
417                    "toBlock": "0xA",
418                    "topics": null,
419                    "address": null
420                }
421            ]
422                ,"id":1
423        });
424        let filters = Arc::new(Mutex::new(HashMap::new()));
425        run_new_filter_request_test(raw_json.clone(), filters.clone()).await;
426    }
427
428    async fn run_new_filter_request_test(
429        json_req: serde_json::Value,
430        filters_pointer: ActiveFilters,
431    ) -> u64 {
432        let storage = Store::new("in-mem", EngineType::InMemory)
433            .expect("Fatal: could not create in memory test db");
434        let mut context = default_context_with_storage(storage).await;
435        context.active_filters = filters_pointer.clone();
436
437        let request: RpcRequest = serde_json::from_value(json_req).expect("Test json is incorrect");
438        let genesis_config: Genesis =
439            serde_json::from_str(TEST_GENESIS).expect("Fatal: non-valid genesis test config");
440
441        context
442            .storage
443            .add_initial_state(genesis_config)
444            .await
445            .expect("Fatal: could not add test genesis in test");
446        let response = map_http_requests(&request, context)
447            .await
448            .unwrap()
449            .to_string();
450        let trimmed_id = response.trim().trim_matches('"');
451        assert!(trimmed_id.starts_with("0x"));
452        let hex = trimmed_id.trim_start_matches("0x");
453        let parsed = u64::from_str_radix(hex, 16);
454        assert!(u64::from_str_radix(hex, 16).is_ok());
455        parsed.unwrap()
456    }
457
458    #[tokio::test]
459    async fn install_filter_removed_correctly_test() {
460        let uninstall_filter_req: RpcRequest = serde_json::from_value(json!(
461        {
462            "jsonrpc":"2.0",
463            "method":"eth_uninstallFilter",
464            "params":
465            [
466                "0xFF"
467            ]
468                ,"id":1
469        }))
470        .expect("Json for test is not a valid request");
471        let filter = (
472            0xFF,
473            (
474                Instant::now(),
475                PollableFilter {
476                    last_block_number: 0,
477                    filter_data: LogsFilter {
478                        from_block: BlockIdentifier::Number(1),
479                        to_block: BlockIdentifier::Number(2),
480                        address_filters: None,
481                        topics: vec![],
482                    },
483                },
484            ),
485        );
486        let active_filters = Arc::new(Mutex::new(HashMap::from([filter])));
487
488        let storage = Store::new("in-mem", EngineType::InMemory)
489            .expect("Fatal: could not create in memory test db");
490
491        let mut context = default_context_with_storage(storage).await;
492        context.active_filters = active_filters.clone();
493
494        map_http_requests(&uninstall_filter_req, context)
495            .await
496            .unwrap();
497
498        assert!(
499            active_filters.clone().lock().unwrap().is_empty(),
500            "Expected filter map to be empty after request"
501        );
502    }
503
504    #[tokio::test]
505    async fn removing_non_existing_filter_returns_false() {
506        let active_filters = Arc::new(Mutex::new(HashMap::new()));
507
508        let storage = Store::new("in-mem", EngineType::InMemory)
509            .expect("Fatal: could not create in memory test db");
510        let mut context = default_context_with_storage(storage).await;
511        context.active_filters = active_filters.clone();
512
513        let uninstall_filter_req: RpcRequest = serde_json::from_value(json!(
514        {
515            "jsonrpc":"2.0",
516            "method":"eth_uninstallFilter",
517            "params":
518            [
519                "0xFF"
520            ]
521                ,"id":1
522        }))
523        .expect("Json for test is not a valid request");
524        let res = map_http_requests(&uninstall_filter_req, context)
525            .await
526            .unwrap();
527        assert!(matches!(res, serde_json::Value::Bool(false)));
528    }
529
530    #[tokio::test]
531    async fn background_job_removes_filter_smoke_test() {
532        // Start a test server to start the cleanup
533        // task in the background
534        let server_handle = start_test_api().await;
535
536        // Give the server some time to start
537        tokio::time::sleep(Duration::from_secs(1)).await;
538
539        // Install a filter through the endpiont
540        let client = reqwest::Client::new();
541        let raw_json = json!(
542        {
543            "jsonrpc":"2.0",
544            "method":"eth_newFilter",
545            "params":
546            [
547                {
548                    "fromBlock": "0x1",
549                    "toBlock": "0xA",
550                    "topics": null,
551                    "address": null
552                }
553            ]
554                ,"id":1
555        });
556        let response: Value = client
557            .post("http://localhost:8500")
558            .json(&raw_json)
559            .send()
560            .await
561            .unwrap()
562            .json()
563            .await
564            .unwrap();
565
566        assert!(
567            response.get("result").is_some(),
568            "Response should have a 'result' field"
569        );
570
571        let raw_json = json!(
572        {
573            "jsonrpc":"2.0",
574            "method":"eth_uninstallFilter",
575            "params":
576            [
577                response.get("result").unwrap()
578            ]
579                ,"id":1
580        });
581
582        tokio::time::sleep(FILTER_DURATION).await;
583        tokio::time::sleep(FILTER_DURATION).await;
584
585        let response: serde_json::Value = client
586            .post("http://localhost:8500")
587            .json(&raw_json)
588            .send()
589            .await
590            .unwrap()
591            .json()
592            .await
593            .unwrap();
594
595        assert!(
596            matches!(
597                response.get("result").unwrap(),
598                serde_json::Value::Bool(false)
599            ),
600            "Filter was expected to be deleted by background job, but it still exists"
601        );
602
603        server_handle.abort();
604    }
605}