1use ethrex_common::types::BlockNumber;
6use ethrex_storage::Store;
7use std::{
8 collections::HashMap,
9 sync::{Arc, Mutex},
10 time::{Duration, Instant},
11};
12use tracing::error;
13
14use crate::rpc::RpcHandler;
15use crate::{
16 types::block_identifier::{BlockIdentifier, BlockTag},
17 utils::{RpcErr, RpcRequest, parse_json_hex},
18};
19use serde_json::{Value, json};
20
21use super::logs::{LogsFilter, fetch_logs_with_filter};
22
23#[derive(Debug, Clone)]
24pub struct NewFilterRequest {
25 pub request_data: LogsFilter,
26}
27
28pub fn clean_outdated_filters(filters: ActiveFilters, filter_duration: Duration) {
34 let mut active_filters_guard = filters.lock().unwrap_or_else(|mut poisoned_guard| {
35 error!("THREAD CRASHED WITH MUTEX TAKEN; SYSTEM MIGHT BE UNSTABLE");
36 **poisoned_guard.get_mut() = HashMap::new();
37 filters.clear_poison();
38 poisoned_guard.into_inner()
39 });
40
41 active_filters_guard
43 .retain(|_, (filter_timestamp, _)| filter_timestamp.elapsed() <= filter_duration);
44}
45pub type ActiveFilters = Arc<Mutex<HashMap<u64, (Instant, PollableFilter)>>>;
47
48#[derive(Debug, Clone)]
49pub struct PollableFilter {
50 pub last_block_number: BlockNumber,
56 pub filter_data: LogsFilter,
57}
58
59impl NewFilterRequest {
60 pub fn parse(params: &Option<Vec<serde_json::Value>>) -> Result<Self, RpcErr> {
61 let filter = LogsFilter::parse(params)?;
62 Ok(NewFilterRequest {
63 request_data: filter,
64 })
65 }
66
67 pub async fn handle(
68 &self,
69 storage: ethrex_storage::Store,
70 filters: ActiveFilters,
71 ) -> Result<serde_json::Value, crate::utils::RpcErr> {
72 let from = self
73 .request_data
74 .from_block
75 .resolve_block_number(&storage)
76 .await?
77 .ok_or(RpcErr::WrongParam("fromBlock".to_string()))?;
78 let to = self
79 .request_data
80 .to_block
81 .resolve_block_number(&storage)
82 .await?
83 .ok_or(RpcErr::WrongParam("toBlock".to_string()))?;
84
85 if (from..=to).is_empty() {
86 return Err(RpcErr::BadParams("Invalid block range".to_string()));
87 }
88
89 let last_block_number = storage.get_latest_block_number().await?;
90 let id: u64 = rand::random();
91 let timestamp = Instant::now();
92 let mut active_filters_guard = filters.lock().unwrap_or_else(|mut poisoned_guard| {
93 error!("THREAD CRASHED WITH MUTEX TAKEN; SYSTEM MIGHT BE UNSTABLE");
94 **poisoned_guard.get_mut() = HashMap::new();
95 filters.clear_poison();
96 poisoned_guard.into_inner()
97 });
98 active_filters_guard.insert(
99 id,
100 (
101 timestamp,
102 PollableFilter {
103 last_block_number,
104 filter_data: self.request_data.clone(),
105 },
106 ),
107 );
108 let as_hex = json!(format!("0x{:x}", id));
109 Ok(as_hex)
110 }
111
112 pub async fn stateful_call(
113 req: &RpcRequest,
114 storage: Store,
115 state: ActiveFilters,
116 ) -> Result<Value, RpcErr> {
117 let request = Self::parse(&req.params)?;
118 request.handle(storage, state).await
119 }
120}
121
122pub struct DeleteFilterRequest {
123 pub id: u64,
124}
125
126impl DeleteFilterRequest {
127 pub fn parse(params: &Option<Vec<serde_json::Value>>) -> Result<Self, RpcErr> {
128 match params.as_deref() {
129 Some([param]) => {
130 let id = parse_json_hex(param).map_err(|_err| RpcErr::BadHexFormat(0))?;
131 Ok(DeleteFilterRequest { id })
132 }
133 Some(_) => Err(RpcErr::BadParams(
134 "Expected an array with a single hex encoded id".to_string(),
135 )),
136 None => Err(RpcErr::MissingParam("0".to_string())),
137 }
138 }
139
140 pub fn handle(
141 &self,
142 _storage: ethrex_storage::Store,
143 filters: ActiveFilters,
144 ) -> Result<serde_json::Value, crate::utils::RpcErr> {
145 let mut active_filters_guard = filters.lock().unwrap_or_else(|mut poisoned_guard| {
146 error!("THREAD CRASHED WITH MUTEX TAKEN; SYSTEM MIGHT BE UNSTABLE");
147 **poisoned_guard.get_mut() = HashMap::new();
148 filters.clear_poison();
149 poisoned_guard.into_inner()
150 });
151 match active_filters_guard.remove(&self.id) {
152 Some(_) => Ok(true.into()),
153 None => Ok(false.into()),
154 }
155 }
156
157 pub fn stateful_call(
158 req: &RpcRequest,
159 storage: ethrex_storage::Store,
160 filters: ActiveFilters,
161 ) -> Result<serde_json::Value, crate::utils::RpcErr> {
162 let request = Self::parse(&req.params)?;
163 request.handle(storage, filters)
164 }
165}
166
167pub struct FilterChangesRequest {
168 pub id: u64,
169}
170
171impl FilterChangesRequest {
172 pub fn parse(params: &Option<Vec<serde_json::Value>>) -> Result<Self, RpcErr> {
173 match params.as_deref() {
174 Some([param]) => {
175 let id = parse_json_hex(param).map_err(|_err| RpcErr::BadHexFormat(0))?;
176 Ok(FilterChangesRequest { id })
177 }
178 Some(_) => Err(RpcErr::BadParams(
179 "Expected an array with a single hex encoded id".to_string(),
180 )),
181 None => Err(RpcErr::MissingParam("0".to_string())),
182 }
183 }
184 pub async fn handle(
185 &self,
186 storage: ethrex_storage::Store,
187 filters: ActiveFilters,
188 ) -> Result<serde_json::Value, crate::utils::RpcErr> {
189 let latest_block_num = storage.get_latest_block_number().await?;
190 let mut active_filters_guard =
193 Box::new(filters.lock().unwrap_or_else(|mut poisoned_guard| {
194 error!("THREAD CRASHED WITH MUTEX TAKEN; SYSTEM MIGHT BE UNSTABLE");
195 **poisoned_guard.get_mut() = HashMap::new();
196 filters.clear_poison();
197 poisoned_guard.into_inner()
198 }));
199 if let Some((timestamp, filter)) = active_filters_guard.get_mut(&self.id) {
200 let valid_block_range = match filter.filter_data.to_block {
203 BlockIdentifier::Tag(BlockTag::Latest) => true,
204 BlockIdentifier::Number(block_num) if block_num >= latest_block_num => true,
205 _ => false,
206 };
207 if valid_block_range {
212 *timestamp = Instant::now();
215 filter.filter_data.from_block = BlockIdentifier::Number(filter.last_block_number);
218 filter.last_block_number = latest_block_num;
219 let mut filter = filter.clone();
220 filter.filter_data.to_block = BlockIdentifier::Number(latest_block_num);
221 drop(active_filters_guard);
224 let logs = fetch_logs_with_filter(&filter.filter_data, storage).await?;
225 serde_json::to_value(logs).map_err(|error| {
226 tracing::error!("Log filtering request failed with: {error}");
227 RpcErr::Internal("Failed to filter logs".to_string())
228 })
229 } else {
230 serde_json::to_value(Vec::<u8>::new()).map_err(|error| {
231 tracing::error!("Log filtering request failed with: {error}");
232 RpcErr::Internal("Failed to filter logs".to_string())
233 })
234 }
235 } else {
236 Err(RpcErr::BadParams(
237 "No matching filter for given id".to_string(),
238 ))
239 }
240 }
241 pub async fn stateful_call(
242 req: &RpcRequest,
243 storage: ethrex_storage::Store,
244 filters: ActiveFilters,
245 ) -> Result<serde_json::Value, crate::utils::RpcErr> {
246 let request = Self::parse(&req.params)?;
247 request.handle(storage, filters).await
248 }
249}
250
251#[cfg(test)]
252mod tests {
253 use std::{
254 collections::HashMap,
255 sync::{Arc, Mutex},
256 time::{Duration, Instant},
257 };
258
259 use super::ActiveFilters;
260 use crate::{
261 eth::{
262 filter::PollableFilter,
263 logs::{AddressFilter, LogsFilter, TopicFilter},
264 },
265 rpc::{FILTER_DURATION, map_http_requests},
266 test_utils::{TEST_GENESIS, default_context_with_storage, start_test_api},
267 };
268 use crate::{types::block_identifier::BlockIdentifier, utils::RpcRequest};
269 use ethrex_common::types::Genesis;
270 use ethrex_storage::{EngineType, Store};
271
272 use serde_json::{Value, json};
273
274 #[tokio::test]
275 async fn filter_request_smoke_test_valid_params() {
276 let filter_req_params = json!(
277 {
278 "fromBlock": "0x1",
279 "toBlock": "0x2",
280 "address": null,
281 "topics": ["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"]
282 }
283 );
284 let raw_json = json!(
285 {
286 "jsonrpc":"2.0",
287 "method":"eth_newFilter",
288 "params":
289 [
290 filter_req_params.clone()
291 ]
292 ,"id":1
293 });
294 let filters = Arc::new(Mutex::new(HashMap::new()));
295 let id = run_new_filter_request_test(raw_json.clone(), filters.clone()).await;
296 let filters = filters.lock().unwrap();
297 assert!(filters.len() == 1);
298 let (_, filter) = filters.clone().get(&id).unwrap().clone();
299 assert!(matches!(
300 filter.filter_data.from_block,
301 BlockIdentifier::Number(1)
302 ));
303 assert!(matches!(
304 filter.filter_data.to_block,
305 BlockIdentifier::Number(2)
306 ));
307 assert!(filter.filter_data.address_filters.is_none());
308 assert!(matches!(
309 &filter.filter_data.topics[..],
310 [TopicFilter::Topic(_)]
311 ));
312 }
313
314 #[tokio::test]
315 async fn filter_request_smoke_test_valid_null_topics_null_addr() {
316 let raw_json = json!(
317 {
318 "jsonrpc":"2.0",
319 "method":"eth_newFilter",
320 "params":
321 [
322 {
323 "fromBlock": "0x1",
324 "toBlock": "0xFF",
325 "topics": null,
326 "address": null
327 }
328 ]
329 ,"id":1
330 });
331 let filters = Arc::new(Mutex::new(HashMap::new()));
332 let id = run_new_filter_request_test(raw_json.clone(), filters.clone()).await;
333 let filters = filters.lock().unwrap();
334 assert!(filters.len() == 1);
335 let (_, filter) = filters.clone().get(&id).unwrap().clone();
336 assert!(matches!(
337 filter.filter_data.from_block,
338 BlockIdentifier::Number(1)
339 ));
340 assert!(matches!(
341 filter.filter_data.to_block,
342 BlockIdentifier::Number(255)
343 ));
344 assert!(filter.filter_data.address_filters.is_none());
345 assert!(matches!(&filter.filter_data.topics[..], []));
346 }
347
348 #[tokio::test]
349 async fn filter_request_smoke_test_valid_addr_topic_null() {
350 let raw_json = json!(
351 {
352 "jsonrpc":"2.0",
353 "method":"eth_newFilter",
354 "params":
355 [
356 {
357 "fromBlock": "0x1",
358 "toBlock": "0xFF",
359 "topics": null,
360 "address": [ "0xb794f5ea0ba39494ce839613fffba74279579268" ]
361 }
362 ]
363 ,"id":1
364 });
365 let filters = Arc::new(Mutex::new(HashMap::new()));
366 let id = run_new_filter_request_test(raw_json.clone(), filters.clone()).await;
367 let filters = filters.lock().unwrap();
368 assert!(filters.len() == 1);
369 let (_, filter) = filters.clone().get(&id).unwrap().clone();
370 assert!(matches!(
371 filter.filter_data.from_block,
372 BlockIdentifier::Number(1)
373 ));
374 assert!(matches!(
375 filter.filter_data.to_block,
376 BlockIdentifier::Number(255)
377 ));
378 assert!(matches!(
379 filter.filter_data.address_filters.unwrap(),
380 AddressFilter::Many(_)
381 ));
382 assert!(matches!(&filter.filter_data.topics[..], []));
383 }
384
385 #[tokio::test]
386 #[should_panic]
387 async fn filter_request_smoke_test_invalid_block_range() {
388 let raw_json = json!(
389 {
390 "jsonrpc":"2.0",
391 "method":"eth_newFilter",
392 "params":
393 [
394 {
395 "fromBlock": "0xFFF",
396 "toBlock": "0xA",
397 "topics": null,
398 "address": null
399 }
400 ]
401 ,"id":1
402 });
403 run_new_filter_request_test(raw_json.clone(), Default::default()).await;
404 }
405
406 #[tokio::test]
407 #[should_panic]
408 async fn filter_request_smoke_test_from_block_missing() {
409 let raw_json = json!(
410 {
411 "jsonrpc":"2.0",
412 "method":"eth_newFilter",
413 "params":
414 [
415 {
416 "fromBlock": null,
417 "toBlock": "0xA",
418 "topics": null,
419 "address": null
420 }
421 ]
422 ,"id":1
423 });
424 let filters = Arc::new(Mutex::new(HashMap::new()));
425 run_new_filter_request_test(raw_json.clone(), filters.clone()).await;
426 }
427
428 async fn run_new_filter_request_test(
429 json_req: serde_json::Value,
430 filters_pointer: ActiveFilters,
431 ) -> u64 {
432 let storage = Store::new("in-mem", EngineType::InMemory)
433 .expect("Fatal: could not create in memory test db");
434 let mut context = default_context_with_storage(storage).await;
435 context.active_filters = filters_pointer.clone();
436
437 let request: RpcRequest = serde_json::from_value(json_req).expect("Test json is incorrect");
438 let genesis_config: Genesis =
439 serde_json::from_str(TEST_GENESIS).expect("Fatal: non-valid genesis test config");
440
441 context
442 .storage
443 .add_initial_state(genesis_config)
444 .await
445 .expect("Fatal: could not add test genesis in test");
446 let response = map_http_requests(&request, context)
447 .await
448 .unwrap()
449 .to_string();
450 let trimmed_id = response.trim().trim_matches('"');
451 assert!(trimmed_id.starts_with("0x"));
452 let hex = trimmed_id.trim_start_matches("0x");
453 let parsed = u64::from_str_radix(hex, 16);
454 assert!(u64::from_str_radix(hex, 16).is_ok());
455 parsed.unwrap()
456 }
457
458 #[tokio::test]
459 async fn install_filter_removed_correctly_test() {
460 let uninstall_filter_req: RpcRequest = serde_json::from_value(json!(
461 {
462 "jsonrpc":"2.0",
463 "method":"eth_uninstallFilter",
464 "params":
465 [
466 "0xFF"
467 ]
468 ,"id":1
469 }))
470 .expect("Json for test is not a valid request");
471 let filter = (
472 0xFF,
473 (
474 Instant::now(),
475 PollableFilter {
476 last_block_number: 0,
477 filter_data: LogsFilter {
478 from_block: BlockIdentifier::Number(1),
479 to_block: BlockIdentifier::Number(2),
480 address_filters: None,
481 topics: vec![],
482 },
483 },
484 ),
485 );
486 let active_filters = Arc::new(Mutex::new(HashMap::from([filter])));
487
488 let storage = Store::new("in-mem", EngineType::InMemory)
489 .expect("Fatal: could not create in memory test db");
490
491 let mut context = default_context_with_storage(storage).await;
492 context.active_filters = active_filters.clone();
493
494 map_http_requests(&uninstall_filter_req, context)
495 .await
496 .unwrap();
497
498 assert!(
499 active_filters.clone().lock().unwrap().is_empty(),
500 "Expected filter map to be empty after request"
501 );
502 }
503
504 #[tokio::test]
505 async fn removing_non_existing_filter_returns_false() {
506 let active_filters = Arc::new(Mutex::new(HashMap::new()));
507
508 let storage = Store::new("in-mem", EngineType::InMemory)
509 .expect("Fatal: could not create in memory test db");
510 let mut context = default_context_with_storage(storage).await;
511 context.active_filters = active_filters.clone();
512
513 let uninstall_filter_req: RpcRequest = serde_json::from_value(json!(
514 {
515 "jsonrpc":"2.0",
516 "method":"eth_uninstallFilter",
517 "params":
518 [
519 "0xFF"
520 ]
521 ,"id":1
522 }))
523 .expect("Json for test is not a valid request");
524 let res = map_http_requests(&uninstall_filter_req, context)
525 .await
526 .unwrap();
527 assert!(matches!(res, serde_json::Value::Bool(false)));
528 }
529
530 #[tokio::test]
531 async fn background_job_removes_filter_smoke_test() {
532 let server_handle = start_test_api().await;
535
536 tokio::time::sleep(Duration::from_secs(1)).await;
538
539 let client = reqwest::Client::new();
541 let raw_json = json!(
542 {
543 "jsonrpc":"2.0",
544 "method":"eth_newFilter",
545 "params":
546 [
547 {
548 "fromBlock": "0x1",
549 "toBlock": "0xA",
550 "topics": null,
551 "address": null
552 }
553 ]
554 ,"id":1
555 });
556 let response: Value = client
557 .post("http://localhost:8500")
558 .json(&raw_json)
559 .send()
560 .await
561 .unwrap()
562 .json()
563 .await
564 .unwrap();
565
566 assert!(
567 response.get("result").is_some(),
568 "Response should have a 'result' field"
569 );
570
571 let raw_json = json!(
572 {
573 "jsonrpc":"2.0",
574 "method":"eth_uninstallFilter",
575 "params":
576 [
577 response.get("result").unwrap()
578 ]
579 ,"id":1
580 });
581
582 tokio::time::sleep(FILTER_DURATION).await;
583 tokio::time::sleep(FILTER_DURATION).await;
584
585 let response: serde_json::Value = client
586 .post("http://localhost:8500")
587 .json(&raw_json)
588 .send()
589 .await
590 .unwrap()
591 .json()
592 .await
593 .unwrap();
594
595 assert!(
596 matches!(
597 response.get("result").unwrap(),
598 serde_json::Value::Bool(false)
599 ),
600 "Filter was expected to be deleted by background job, but it still exists"
601 );
602
603 server_handle.abort();
604 }
605}