1use crate::cache::CacheManager;
20use crate::metrics::{ErrorType, MetricsCollector};
21use crate::providers::ProviderManager;
22use eyre::Result;
23use reqwest::StatusCode;
24use serde_json::Value;
25use std::collections::hash_map::DefaultHasher;
26use std::collections::{HashMap, HashSet};
27use std::hash::{Hash, Hasher};
28use std::sync::Arc;
29use std::time::Instant;
30use tracing::{debug, warn};
31
/// JSON-RPC methods whose successful responses `RpcHandler::handle_request` may cache.
///
/// Being listed here is necessary but not sufficient: a request for one of these
/// methods still bypasses the cache when its params reference a moving block tag
/// (see `RpcHandler::has_non_deterministic_block_params`).
const CACHEABLE_METHODS: &[&str] = &[
    "eth_getCode",
    "eth_getBalance",
    "eth_getStorageAt",
    "eth_getProof",
    "eth_getTransactionCount",
    "eth_call",
    "eth_getTransactionByHash",
    "eth_getRawTransactionByHash",
    "eth_getTransactionReceipt",
    "eth_getTransactionByBlockHashAndIndex",
    "eth_getTransactionByBlockNumberAndIndex",
    "eth_getBlockByNumber",
    "eth_getBlockByHash",
    "eth_getBlockReceipts",
    "eth_getBlockTransactionCountByHash",
    "eth_getBlockTransactionCountByNumber",
    "eth_getUncleByBlockHashAndIndex",
    "eth_getUncleByBlockNumberAndIndex",
    "eth_getUncleCountByBlockHash",
    "eth_getUncleCountByBlockNumber",
    "eth_getLogs",
    "eth_chainId",
    "net_version",
    "debug_traceTransaction",
    "debug_traceBlockByNumber",
    "debug_traceBlockByHash",
    "debug_traceCall",
    "trace_transaction",
    "trace_block",
    "trace_replayTransaction",
    "trace_replayBlockTransactions",
    "trace_call",
];
73
/// Substrings that identify a rate-limit condition.
///
/// Every entry is lower-case because callers lower-case the text (raw response
/// body or JSON-RPC error message) before running `contains` against it.
const RATE_LIMIT_PATTERNS: &[&str] = &[
    "rate limit",
    "rate-limit",
    "ratelimit",
    "too many requests",
    "cu limit exceeded",
    "compute units exceeded",
    "quota exceeded",
    "throttled",
    "exceeded the allowed rps",
    "request limit",
    "max requests",
];
88
/// Substrings that identify an error caused by the request itself rather than by
/// the provider.
///
/// Entries are lower-case; they are matched against the lower-cased JSON-RPC
/// error message.
const USER_ERROR_PATTERNS: &[&str] = &[
    "invalid method",
    "method not found",
    "invalid params",
    "missing param",
    "invalid argument",
    "parse error",
    "invalid request",
    "unsupported method",
];
100
/// Lower-case substrings that, when found in a string `result` of a debug/trace
/// call, mark the response as "method unsupported" so it is not cached.
const UNSUPPORTED_METHOD_PATTERNS: &[&str] =
    &["not supported", "not available", "method not found", "unsupported", "not implemented"];
104
/// Keys of which at least one must be present in an object `result` for a
/// debug/trace response to be considered valid (and therefore cacheable).
const TRACE_RESPONSE_FIELDS: &[&str] =
    &["output", "trace", "stateDiff", "result", "structLogs", "gas", "returnValue"];
108
/// JSON-RPC request handler that combines upstream forwarding with provider
/// failover, response caching and metrics recording.
pub struct RpcHandler {
    /// HTTP client used to POST requests to upstream providers.
    upstream_client: reqwest::Client,
    /// Supplies provider URLs and receives per-provider success/failure feedback.
    provider_manager: Arc<ProviderManager>,
    /// Response cache consulted for methods listed in `CACHEABLE_METHODS`.
    cache_manager: Arc<CacheManager>,
    /// Sink for cache hit/miss, per-request and error-type metrics.
    metrics_collector: Arc<MetricsCollector>,
}
120
121impl RpcHandler {
122 pub fn new(
132 provider_manager: Arc<ProviderManager>,
133 cache_manager: Arc<CacheManager>,
134 metrics_collector: Arc<MetricsCollector>,
135 ) -> Result<Self> {
136 let upstream_client =
137 reqwest::Client::builder().timeout(std::time::Duration::from_secs(10)).build()?;
138
139 Ok(Self { upstream_client, provider_manager, cache_manager, metrics_collector })
140 }
141
    /// Returns the shared cache manager this handler reads from and writes to.
    pub fn cache_manager(&self) -> &Arc<CacheManager> {
        &self.cache_manager
    }
149
    /// Returns the shared provider manager used to pick upstream providers.
    pub fn provider_manager(&self) -> &Arc<ProviderManager> {
        &self.provider_manager
    }
154
    /// Returns the shared metrics collector this handler reports to.
    // `dead_code` is allowed because nothing in this file calls the accessor.
    #[allow(dead_code)]
    pub fn metrics_collector(&self) -> &Arc<MetricsCollector> {
        &self.metrics_collector
    }
160
161 pub async fn handle_request(&self, request: Value) -> Result<Value> {
175 let start_time = Instant::now();
176 let method = request.get("method").and_then(|m| m.as_str()).unwrap_or("");
177
178 debug!("Handling RPC request: {}", method);
179
180 if CACHEABLE_METHODS.contains(&method) {
182 if self.has_non_deterministic_block_params(&request) {
185 debug!("Non-deterministic block params for {}, bypassing cache", method);
186 return self.forward_request(&request).await;
187 }
188
189 let cache_key = self.generate_cache_key(&request);
191
192 if let Some(cached_response) = self.cache_manager.get(&cache_key).await {
194 let response_time = start_time.elapsed().as_millis() as u64;
195 self.metrics_collector.record_cache_hit(method, response_time);
196 debug!("Cache hit for {}: {}", method, cache_key);
197 return Ok(cached_response);
198 }
199
200 debug!("Cache miss for {}: {}", method, cache_key);
201
202 let response = self.forward_request(&request).await;
204 self.metrics_collector.record_cache_miss();
205
206 if let Ok(resp) = &response {
207 let success = resp.get("error").is_none();
208
209 if success {
211 if (method.starts_with("debug_") || method.starts_with("trace_"))
213 && !self.is_valid_debug_trace_response(resp)
214 {
215 debug!("Invalid debug/trace response for {}, not caching", method);
216 return Ok(resp.clone());
217 }
218
219 self.cache_manager.set(cache_key, resp.clone()).await;
220 debug!("Cached response for {}", method);
221 } else {
222 debug!("Error response for {}, not caching", method);
223 if let Some(error_obj) = resp.get("error") {
225 if let Some(error_msg) = error_obj.get("message").and_then(|m| m.as_str()) {
226 let error_msg_lower = error_msg.to_lowercase();
227 if RATE_LIMIT_PATTERNS
228 .iter()
229 .any(|pattern| error_msg_lower.contains(pattern))
230 {
231 self.metrics_collector.record_error(ErrorType::RateLimit);
232 } else if USER_ERROR_PATTERNS
233 .iter()
234 .any(|pattern| error_msg_lower.contains(pattern))
235 {
236 self.metrics_collector.record_error(ErrorType::UserError);
237 } else {
238 self.metrics_collector.record_error(ErrorType::Other);
239 }
240 }
241 }
242 }
243 }
244
245 response
246 } else {
247 debug!("Non-cacheable request: {}", method);
249 self.forward_request(&request).await
250 }
251 }
252
253 fn is_rate_limit_response(
258 &self,
259 status: StatusCode,
260 response_text: &str,
261 json_response: Option<&Value>,
262 ) -> bool {
263 if status == StatusCode::TOO_MANY_REQUESTS || status == StatusCode::SERVICE_UNAVAILABLE {
265 debug!("Rate limit response detected by status: {}", status);
266 return true;
267 }
268
269 let text_lower = response_text.to_lowercase();
271 if RATE_LIMIT_PATTERNS.iter().any(|pattern| text_lower.contains(pattern)) {
272 debug!(
273 "Rate limit response detected by response text: {}",
274 &text_lower.chars().take(200).collect::<String>()
275 );
276 return true;
277 }
278
279 if let Some(json) = json_response {
281 if let Some(error) = json.get("error") {
282 if let Some(message) = error.get("message").and_then(|m| m.as_str()) {
284 let msg_lower = message.to_lowercase();
285 if RATE_LIMIT_PATTERNS.iter().any(|pattern| msg_lower.contains(pattern)) {
286 debug!(
287 "Rate limit response detected by error message: {}",
288 &msg_lower.chars().take(200).collect::<String>()
289 );
290 return true;
291 }
292 }
293
294 if let Some(code) = error.get("code").and_then(|c| c.as_i64()) {
296 match code {
297 429 | -32005 | -32098 | -32099 => {
298 debug!("Rate limit response detected by error code: {}", code);
299 return true;
300 }
301 _ => {}
302 }
303 }
304 }
305 }
306
307 false
308 }
309
310 fn is_user_error(&self, json_response: &Value) -> bool {
314 if let Some(error) = json_response.get("error") {
315 if let Some(code) = error.get("code").and_then(|c| c.as_i64()) {
316 match code {
317 -32600 | -32601 | -32602 | -32700 => {
318 debug!("Detected user error with code: {}", code);
319 return true;
320 }
321 _ => {}
322 }
323 }
324
325 if let Some(message) = error.get("message").and_then(|m| m.as_str()) {
327 let msg_lower = message.to_lowercase();
328
329 if USER_ERROR_PATTERNS.iter().any(|pattern| msg_lower.contains(pattern)) {
330 debug!("Detected user error from message: {}", message);
331 return true;
332 }
333 }
334 }
335
336 false
337 }
338
339 fn create_error_signature(&self, error: &Value) -> u64 {
343 let mut hasher = DefaultHasher::new();
344
345 if let Some(code) = error.get("code").and_then(|c| c.as_i64()) {
347 code.hash(&mut hasher);
348 }
349
350 if let Some(message) = error.get("message").and_then(|m| m.as_str()) {
352 message.to_lowercase().hash(&mut hasher);
353 }
354
355 if let Some(data) = error.get("data") {
357 match data {
358 Value::String(s) => s.hash(&mut hasher),
359 Value::Number(n) => n.to_string().hash(&mut hasher),
360 Value::Bool(b) => b.hash(&mut hasher),
361 _ => {} }
363 }
364
365 hasher.finish()
366 }
367
368 fn is_valid_debug_trace_response(&self, response: &Value) -> bool {
373 if let Some(result) = response.get("result") {
374 if let Value::String(s) = result {
376 let s_lower = s.to_lowercase();
377 if UNSUPPORTED_METHOD_PATTERNS.iter().any(|pattern| s_lower.contains(pattern)) {
378 return false;
379 }
380 }
381
382 if let Value::Object(obj) = result {
384 let has_trace_fields =
386 TRACE_RESPONSE_FIELDS.iter().any(|field| obj.contains_key(*field));
387
388 if !has_trace_fields {
389 return false;
391 }
392 }
393
394 if let Value::Array(arr) = result {
396 return !arr.is_empty();
398 }
399
400 if result.is_null() {
402 return false;
403 }
404
405 return true;
406 }
407
408 false
409 }
410
    /// Forwards a request to upstream providers, failing over between them.
    ///
    /// Makes up to `MAX_RETRIES` attempts, each against a provider not yet tried
    /// for this request. Per attempt:
    /// - rate-limited responses, transport failures, unreadable bodies and invalid
    ///   JSON mark the provider failed and move on to the next attempt;
    /// - a JSON-RPC error classified as a user error is returned immediately;
    /// - any other JSON-RPC error is tallied by signature; once the same signature
    ///   has been seen `MAX_MULTIPLE_SAME_ERROR` times it is returned as-is;
    /// - a response without an `error` member is returned as success.
    ///
    /// When attempts run out, the most frequently seen error response is returned
    /// as `Ok`; if there is none, the last transport/parse error (or a fallback
    /// message) is returned as `Err`.
    async fn forward_request(&self, request: &Value) -> Result<Value> {
        const MAX_RETRIES: usize = 5;
        const MAX_MULTIPLE_SAME_ERROR: usize = 3;

        // Providers already used for this request; excluded from selection.
        let mut tried_providers = HashSet::new();
        // Error signature -> (first response seen with that signature, occurrence count).
        let mut error_responses: HashMap<u64, (Value, usize)> = HashMap::new();
        let mut last_network_error: Option<eyre::Error> = None;
        // Counts attempts actually sent; unlike `tried_providers` it is never reset.
        let mut providers_tried = 0;

        for retry in 0..MAX_RETRIES {
            let provider_url =
                match self.provider_manager.get_weighted_provider_excluding(&tried_providers).await
                {
                    Some(url) => url,
                    None => {
                        debug!(
                            "All available providers have been tried for this request (attempt {})",
                            retry + 1
                        );

                        // Not the last attempt: re-check provider health, wait briefly,
                        // and allow every provider to be selected again.
                        if retry < MAX_RETRIES - 1 {
                            self.provider_manager.health_check_all().await;
                            tokio::time::sleep(std::time::Duration::from_millis(500)).await;
                            tried_providers.clear();
                            continue;
                        }

                        // Last attempt: prefer reporting an upstream error response
                        // over a transport error.
                        if let Some((error_response, _)) =
                            error_responses.values().max_by_key(|(_, count)| *count)
                        {
                            debug!("All providers exhausted, returning most common error");
                            return Ok(error_response.clone());
                        }

                        return Err(last_network_error
                            .unwrap_or_else(|| eyre::eyre!("No healthy RPC providers available")));
                    }
                };

            tried_providers.insert(provider_url.clone());

            providers_tried += 1;
            debug!(
                "Forwarding request to provider: {} (attempt {}/{}, {} providers tried)",
                provider_url,
                retry + 1,
                MAX_RETRIES,
                tried_providers.len()
            );

            let method = request.get("method").and_then(|m| m.as_str()).unwrap_or("unknown_method");

            let start = Instant::now();

            match self
                .upstream_client
                .post(&provider_url)
                .header("Content-Type", "application/json")
                .json(request)
                .send()
                .await
            {
                Ok(response) => {
                    let status = response.status();
                    // Measured when the response head arrives, before the body is read.
                    let response_time = start.elapsed().as_millis() as u64;

                    match response.text().await {
                        Ok(response_text) => {
                            let json_result = serde_json::from_str::<Value>(&response_text);

                            // Rate limiting is checked before JSON validity so that
                            // non-JSON throttling pages are recognised too.
                            if self.is_rate_limit_response(
                                status,
                                &response_text,
                                json_result.as_ref().ok(),
                            ) {
                                debug!(
                                    "Provider {} is rate limited (response: {}...)",
                                    provider_url,
                                    &response_text.chars().take(200).collect::<String>()
                                );

                                self.metrics_collector.record_request(
                                    method,
                                    &provider_url,
                                    response_time,
                                    false,
                                );
                                self.provider_manager.mark_provider_failed(&provider_url).await;

                                continue;
                            }

                            match json_result {
                                Ok(response_json) => {
                                    if let Some(error) = response_json.get("error") {
                                        self.metrics_collector.record_request(
                                            method,
                                            &provider_url,
                                            response_time,
                                            false,
                                        );

                                        // A user error is not the provider's fault:
                                        // mark it healthy and return without retrying.
                                        if self.is_user_error(&response_json) {
                                            debug!("Detected user error, returning immediately");

                                            self.provider_manager
                                                .mark_provider_success(&provider_url, response_time)
                                                .await;
                                            return Ok(response_json);
                                        }

                                        let error_hash = self.create_error_signature(error);

                                        error_responses
                                            .entry(error_hash)
                                            .and_modify(|(_, count)| *count += 1)
                                            .or_insert((response_json.clone(), 1));

                                        debug!(
                                            "Provider {} returned error (hash: {})",
                                            provider_url, error_hash
                                        );

                                        // Enough occurrences of the same error: return it
                                        // without marking this provider as failed.
                                        if let Some((_, count)) = error_responses.get(&error_hash) {
                                            if *count >= MAX_MULTIPLE_SAME_ERROR {
                                                debug!(
                                                    "Multiple providers ({}) returned same error, likely legitimate",
                                                    count
                                                );
                                                return Ok(response_json);
                                            }
                                        }

                                        self.provider_manager
                                            .mark_provider_failed(&provider_url)
                                            .await;
                                        continue;
                                    }

                                    self.metrics_collector.record_request(
                                        method,
                                        &provider_url,
                                        response_time,
                                        true,
                                    );

                                    self.provider_manager
                                        .mark_provider_success(&provider_url, response_time)
                                        .await;

                                    debug!(
                                        "Request successful via {} ({}ms)",
                                        provider_url, response_time
                                    );
                                    return Ok(response_json);
                                }
                                Err(parse_error) => {
                                    warn!(
                                        "Invalid JSON response from {} (first 200 chars): {}...",
                                        provider_url,
                                        &response_text.chars().take(200).collect::<String>()
                                    );

                                    self.metrics_collector.record_request(
                                        method,
                                        &provider_url,
                                        response_time,
                                        false,
                                    );
                                    self.provider_manager.mark_provider_failed(&provider_url).await;
                                    last_network_error = Some(eyre::eyre!(
                                        "Invalid JSON from provider: {}",
                                        parse_error
                                    ));
                                    continue;
                                }
                            }
                        }
                        Err(e) => {
                            warn!("Failed to read response body from {}: {}", provider_url, e);
                            self.metrics_collector.record_request(
                                method,
                                &provider_url,
                                response_time,
                                false,
                            );
                            self.provider_manager.mark_provider_failed(&provider_url).await;
                            last_network_error = Some(e.into());
                            continue;
                        }
                    }
                }
                Err(e) => {
                    warn!("Request failed to {}: {}", provider_url, e);

                    let response_time = start.elapsed().as_millis() as u64;
                    self.metrics_collector.record_request(
                        method,
                        &provider_url,
                        response_time,
                        false,
                    );

                    self.provider_manager.mark_provider_failed(&provider_url).await;
                    last_network_error = Some(e.into());
                    continue;
                }
            }
        }

        warn!(
            "All {} retries exhausted after trying {} providers for request: {}",
            MAX_RETRIES, providers_tried, request
        );

        if !error_responses.is_empty() {
            // `unwrap` cannot panic: the map was just checked to be non-empty.
            let (most_common_error, count) =
                error_responses.values().max_by_key(|(_, count)| *count).unwrap();

            debug!("Returning most common error (seen {} times)", count);
            return Ok(most_common_error.clone());
        }

        // Rate-limited attempts record neither an error response nor a network
        // error, so the fallback message reports throttling.
        Err(last_network_error.unwrap_or_else(|| eyre::eyre!("429 Too Many Requests")))
    }
659
660 fn has_non_deterministic_block_params(&self, request: &Value) -> bool {
661 let params = request.get("params").and_then(|p| p.as_array());
662
663 if let Some(params) = params {
664 for param in params {
665 if let Some("latest" | "pending" | "earliest" | "safe" | "finalized") =
666 param.as_str()
667 {
668 return true;
669 }
670 if let Some(param_obj) = param.as_object() {
672 if let Some(block_value) = param_obj
673 .get("blockNumber")
674 .or_else(|| param_obj.get("toBlock"))
675 .or_else(|| param_obj.get("fromBlock"))
676 {
677 if let Some("latest" | "pending" | "earliest" | "safe" | "finalized") =
678 block_value.as_str()
679 {
680 return true;
681 }
682 }
683 }
684 }
685 }
686
687 false
688 }
689
690 pub fn generate_cache_key(&self, request: &Value) -> String {
692 let method = request.get("method").and_then(|m| m.as_str()).unwrap_or("");
693 let params = request.get("params").unwrap_or(&Value::Null);
694
695 let mut hasher = DefaultHasher::new();
697 method.hash(&mut hasher);
698
699 Self::hash_json_value(&mut hasher, params);
701
702 format!("{}:{:x}", method, hasher.finish())
703 }
704
705 fn hash_json_value(hasher: &mut DefaultHasher, value: &Value) {
707 use std::collections::BTreeMap;
708
709 match value {
710 Value::Null => "null".hash(hasher),
711 Value::Bool(b) => b.hash(hasher),
712 Value::Number(n) => n.to_string().hash(hasher),
713 Value::String(s) => s.hash(hasher),
714 Value::Array(arr) => {
715 "array".hash(hasher);
716 arr.len().hash(hasher);
717 for elem in arr {
718 Self::hash_json_value(hasher, elem);
719 }
720 }
721 Value::Object(obj) => {
722 "object".hash(hasher);
723 let sorted: BTreeMap<_, _> = obj.iter().collect();
725 sorted.len().hash(hasher);
726 for (key, val) in sorted {
727 key.hash(hasher);
728 Self::hash_json_value(hasher, val);
729 }
730 }
731 }
732 }
733}
734
735#[cfg(test)]
736mod tests {
737 use super::*;
738 use crate::cache::CacheManager;
739 use edb_common::logging;
740 use tempfile::TempDir;
741 use tracing::{debug, info};
742 use wiremock::{
743 matchers::{method, path},
744 Mock, MockServer, ResponseTemplate,
745 };
746
    /// Builds an `RpcHandler` backed by a single wiremock server and a cache file
    /// inside a fresh temporary directory.
    ///
    /// The mock server and the temp dir are returned alongside the handler so the
    /// caller can mount further mocks and keep both alive for the test's duration.
    async fn create_test_rpc_handler() -> (RpcHandler, MockServer, TempDir) {
        let mock_server = MockServer::start().await;
        let temp_dir = TempDir::new().unwrap();
        let cache_path = temp_dir.path().join("test_cache.json");

        // One-shot response mounted before the provider manager is created.
        // NOTE(review): presumably consumed by a probe that `ProviderManager::new`
        // sends at construction — confirm against its implementation.
        Mock::given(method("POST"))
            .and(path("/"))
            .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
                "jsonrpc": "2.0",
                "id": 1,
                "result": "0x1"
            })))
            .up_to_n_times(1)
            .mount(&mock_server)
            .await;

        let cache_manager = Arc::new(CacheManager::new(100, cache_path).unwrap());
        let metrics_collector = Arc::new(MetricsCollector::new());

        let provider_manager = Arc::new(
            ProviderManager::new(vec![mock_server.uri()], 3)
                .await
                .expect("Failed to create provider manager"),
        );

        let handler = RpcHandler::new(provider_manager, cache_manager, metrics_collector).unwrap();

        (handler, mock_server, temp_dir)
    }
778
    /// A cacheable method with a concrete block number must reach upstream once;
    /// the repeated request must be answered from the cache.
    #[tokio::test]
    async fn test_cacheable_method_caching() {
        let (handler, mock_server, _temp_dir) = create_test_rpc_handler().await;

        let response_data = serde_json::json!({
            "jsonrpc": "2.0",
            "id": 1,
            "result": {
                "number": "0x1000000",
                "hash": "0x1234567890abcdef"
            }
        });

        // `expect(1)`: this mock may be hit exactly once across both requests.
        Mock::given(method("POST"))
            .and(path("/"))
            .respond_with(ResponseTemplate::new(200).set_body_json(&response_data))
            .expect(1)
            .mount(&mock_server)
            .await;

        let request = serde_json::json!({
            "jsonrpc": "2.0",
            "method": "eth_getBlockByNumber",
            "params": ["0x1000000", false],
            "id": 1
        });

        // First call: cache miss, forwarded upstream.
        let result1 = handler.handle_request(request.clone()).await.unwrap();
        assert_eq!(result1, response_data);

        // Second call: identical method and params, expected to hit the cache.
        let result2 = handler.handle_request(request).await.unwrap();
        assert_eq!(result2, response_data);

    }
816
    /// A method that is not in `CACHEABLE_METHODS` must be forwarded upstream on
    /// every call.
    #[tokio::test]
    async fn test_non_cacheable_method_passthrough() {
        let (handler, mock_server, _temp_dir) = create_test_rpc_handler().await;

        let response_data = serde_json::json!({
            "jsonrpc": "2.0",
            "id": 1,
            "result": "0x1000000"
        });

        // `expect(2)`: both requests below must reach the mock.
        Mock::given(method("POST"))
            .and(path("/"))
            .respond_with(ResponseTemplate::new(200).set_body_json(&response_data))
            .expect(2)
            .mount(&mock_server)
            .await;

        let request = serde_json::json!({
            "jsonrpc": "2.0",
            "method": "eth_blockNumber",
            "params": [],
            "id": 1
        });

        let result1 = handler.handle_request(request.clone()).await.unwrap();
        assert_eq!(result1, response_data);

        let result2 = handler.handle_request(request).await.unwrap();
        assert_eq!(result2, response_data);
    }
848
    /// A cacheable method called with the `latest` tag must bypass the cache, so
    /// both identical requests reach upstream.
    #[tokio::test]
    async fn test_non_deterministic_block_params_bypass_cache() {
        let (handler, mock_server, _temp_dir) = create_test_rpc_handler().await;

        let response_data = serde_json::json!({
            "jsonrpc": "2.0",
            "id": 1,
            "result": {
                "number": "0x1000000",
                "hash": "0x1234567890abcdef"
            }
        });

        // `expect(2)`: neither request may be served from the cache.
        Mock::given(method("POST"))
            .and(path("/"))
            .respond_with(ResponseTemplate::new(200).set_body_json(&response_data))
            .expect(2)
            .mount(&mock_server)
            .await;

        let request = serde_json::json!({
            "jsonrpc": "2.0",
            "method": "eth_getBlockByNumber",
            "params": ["latest", false],
            "id": 1
        });

        let result1 = handler.handle_request(request.clone()).await.unwrap();
        assert_eq!(result1, response_data);

        let result2 = handler.handle_request(request).await.unwrap();
        assert_eq!(result2, response_data);
    }
883
    /// Mixes tagged and concrete block parameters for the same method and checks
    /// the total number of upstream hits.
    #[tokio::test]
    async fn test_deterministic_vs_non_deterministic_params() {
        let (handler, mock_server, _temp_dir) = create_test_rpc_handler().await;

        let response_data = serde_json::json!({
            "jsonrpc": "2.0",
            "id": 1,
            "result": {
                "number": "0x1000000",
                "hash": "0x1234567890abcdef"
            }
        });

        // `expect(3)`: two uncached `latest` requests plus the first concrete-block
        // request; the second concrete-block request should come from the cache.
        Mock::given(method("POST"))
            .and(path("/"))
            .respond_with(ResponseTemplate::new(200).set_body_json(&response_data))
            .expect(3)
            .mount(&mock_server)
            .await;

        let latest_request = serde_json::json!({
            "jsonrpc": "2.0",
            "method": "eth_getBlockByNumber",
            "params": ["latest", false],
            "id": 1
        });

        let specific_request = serde_json::json!({
            "jsonrpc": "2.0",
            "method": "eth_getBlockByNumber",
            "params": ["0x1000000", false],
            "id": 1
        });

        // Tagged block: bypasses the cache both times.
        handler.handle_request(latest_request.clone()).await.unwrap();
        handler.handle_request(latest_request).await.unwrap();

        // Concrete block: miss, then hit.
        handler.handle_request(specific_request.clone()).await.unwrap();
        handler.handle_request(specific_request).await.unwrap();
    }
929
    /// Table-driven check of `has_non_deterministic_block_params` for positional
    /// block tags and for `fromBlock`/`toBlock` filter objects.
    #[tokio::test]
    async fn test_has_non_deterministic_block_params() {
        let (handler, _mock_server, _temp_dir) = create_test_rpc_handler().await;

        // Each case is (request, expected result).
        let test_cases = vec![
            (
                serde_json::json!({
                    "method": "eth_getBlockByNumber",
                    "params": ["latest", false]
                }),
                true,
            ),
            (
                serde_json::json!({
                    "method": "eth_getBlockByNumber",
                    "params": ["pending", false]
                }),
                true,
            ),
            (
                serde_json::json!({
                    "method": "eth_getBlockByNumber",
                    "params": ["0x1000000", false]
                }),
                false,
            ),
            (
                serde_json::json!({
                    "method": "eth_getLogs",
                    "params": [{
                        "fromBlock": "latest",
                        "toBlock": "latest"
                    }]
                }),
                true,
            ),
            (
                serde_json::json!({
                    "method": "eth_getLogs",
                    "params": [{
                        "fromBlock": "0x1000000",
                        "toBlock": "0x1000010"
                    }]
                }),
                false,
            ),
        ];

        for (request, expected) in test_cases {
            let result = handler.has_non_deterministic_block_params(&request);
            assert_eq!(result, expected, "Failed for request: {:?}", request);
        }
    }
984
    /// An error response to a cacheable method must not be cached: the repeated
    /// request has to reach upstream again.
    #[tokio::test]
    async fn test_error_response_not_cached() {
        let (handler, mock_server, _temp_dir) = create_test_rpc_handler().await;

        // Code -32602 is treated as a user error, so it is returned after a
        // single provider attempt.
        let error_response = serde_json::json!({
            "jsonrpc": "2.0",
            "id": 1,
            "error": {
                "code": -32602,
                "message": "Invalid params"
            }
        });

        // `expect(2)`: one upstream hit per request, i.e. nothing was cached.
        Mock::given(method("POST"))
            .and(path("/"))
            .respond_with(ResponseTemplate::new(200).set_body_json(&error_response))
            .expect(2)
            .mount(&mock_server)
            .await;

        let request = serde_json::json!({
            "jsonrpc": "2.0",
            "method": "eth_getBlockByNumber",
            "params": ["0x1000000", false],
            "id": 1
        });

        let result1 = handler.handle_request(request.clone()).await.unwrap();
        assert_eq!(result1, error_response);

        let result2 = handler.handle_request(request).await.unwrap();
        assert_eq!(result2, error_response);
    }
1019
    /// Sends nine non-cacheable requests through a handler with three providers
    /// and checks that every request is answered by one of them.
    ///
    /// The per-server split is only logged, not asserted.
    #[tokio::test]
    async fn test_multi_provider_weighted_distribution() {
        let mock1 = MockServer::start().await;
        let mock2 = MockServer::start().await;
        let mock3 = MockServer::start().await;

        // One-shot response on each server, mounted before the provider manager
        // is built.
        // NOTE(review): presumably consumed by a probe in `ProviderManager::new` — confirm.
        for mock_server in &[&mock1, &mock2, &mock3] {
            Mock::given(method("POST"))
                .and(path("/"))
                .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
                    "jsonrpc": "2.0",
                    "id": 1,
                    "result": "0x1234567"
                })))
                .up_to_n_times(1)
                .mount(mock_server)
                .await;
        }

        let temp_dir = TempDir::new().unwrap();
        let cache_path = temp_dir.path().join("test_cache.json");
        let cache_manager = Arc::new(CacheManager::new(100, cache_path).unwrap());

        let provider_manager = Arc::new(
            ProviderManager::new(vec![mock1.uri(), mock2.uri(), mock3.uri()], 3)
                .await
                .expect("Failed to create provider manager"),
        );

        let metrics_collector = Arc::new(MetricsCollector::new());
        let handler = RpcHandler::new(provider_manager, cache_manager, metrics_collector).unwrap();

        // Each server answers with a distinct result so responses can be attributed.
        let response1 = serde_json::json!({
            "jsonrpc": "2.0",
            "id": 1,
            "result": "response_from_server_1"
        });

        let response2 = serde_json::json!({
            "jsonrpc": "2.0",
            "id": 1,
            "result": "response_from_server_2"
        });

        let response3 = serde_json::json!({
            "jsonrpc": "2.0",
            "id": 1,
            "result": "response_from_server_3"
        });

        // `expect(0..=9)`: any split of the nine requests across servers is accepted.
        Mock::given(method("POST"))
            .and(path("/"))
            .respond_with(ResponseTemplate::new(200).set_body_json(&response1))
            .expect(0..=9)
            .mount(&mock1)
            .await;

        Mock::given(method("POST"))
            .and(path("/"))
            .respond_with(ResponseTemplate::new(200).set_body_json(&response2))
            .expect(0..=9)
            .mount(&mock2)
            .await;

        Mock::given(method("POST"))
            .and(path("/"))
            .respond_with(ResponseTemplate::new(200).set_body_json(&response3))
            .expect(0..=9)
            .mount(&mock3)
            .await;

        let mut responses = Vec::new();
        for i in 0..9 {
            // `eth_blockNumber` is not cacheable, so every request goes upstream.
            let request = serde_json::json!({
                "jsonrpc": "2.0",
                "method": "eth_blockNumber",
                "params": [],
                "id": i + 1
            });

            let response = handler.handle_request(request).await.unwrap();
            responses.push(response["result"].as_str().unwrap().to_string());
        }

        let server1_count = responses.iter().filter(|r| *r == "response_from_server_1").count();
        let server2_count = responses.iter().filter(|r| *r == "response_from_server_2").count();
        let server3_count = responses.iter().filter(|r| *r == "response_from_server_3").count();

        assert_eq!(
            server1_count + server2_count + server3_count,
            9,
            "All 9 requests should be completed"
        );

        debug!(
            "Server distribution - Server1: {}, Server2: {}, Server3: {}",
            server1_count, server2_count, server3_count
        );
    }
1130
    /// With three providers all returning the same non-user error, a single
    /// request must hit each provider exactly once and return that error.
    #[tokio::test]
    async fn test_provider_tried_once_per_request() {
        edb_common::logging::ensure_test_logging(None);
        info!("Testing that each provider is only tried once per request with Option 1");

        let mock1 = MockServer::start().await;
        let mock2 = MockServer::start().await;
        let mock3 = MockServer::start().await;

        // One-shot success on each server, mounted before the provider manager
        // is built.
        // NOTE(review): presumably consumed by a probe in `ProviderManager::new` — confirm.
        for mock_server in &[&mock1, &mock2, &mock3] {
            Mock::given(method("POST"))
                .and(path("/"))
                .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
                    "jsonrpc": "2.0",
                    "id": 1,
                    "result": "0x1"
                })))
                .up_to_n_times(1)
                .mount(mock_server)
                .await;
        }

        let temp_dir = TempDir::new().unwrap();
        let cache_path = temp_dir.path().join("test_cache.json");
        let cache_manager = Arc::new(CacheManager::new(100, cache_path).unwrap());

        let provider_manager = Arc::new(
            ProviderManager::new(vec![mock1.uri(), mock2.uri(), mock3.uri()], 3)
                .await
                .expect("Failed to create provider manager"),
        );

        let metrics_collector = Arc::new(MetricsCollector::new());
        let handler = RpcHandler::new(provider_manager, cache_manager, metrics_collector).unwrap();

        // Code -32000 with this message matches neither the user-error codes nor
        // the user-error/rate-limit patterns, so the handler fails over.
        let error_response = serde_json::json!({
            "jsonrpc": "2.0",
            "id": 1,
            "error": {
                "code": -32000,
                "message": "Execution reverted: insufficient balance"
            }
        });

        // `expect(1)` on every server: each provider is hit exactly once. The
        // third identical error reaches the same-error threshold and is returned.
        Mock::given(method("POST"))
            .and(path("/"))
            .respond_with(ResponseTemplate::new(200).set_body_json(&error_response))
            .expect(1)
            .mount(&mock1)
            .await;

        Mock::given(method("POST"))
            .and(path("/"))
            .respond_with(ResponseTemplate::new(200).set_body_json(&error_response))
            .expect(1)
            .mount(&mock2)
            .await;

        Mock::given(method("POST"))
            .and(path("/"))
            .respond_with(ResponseTemplate::new(200).set_body_json(&error_response))
            .expect(1)
            .mount(&mock3)
            .await;

        let request = serde_json::json!({
            "jsonrpc": "2.0",
            "method": "eth_call",
            "params": [{}, "0x1000000"],
            "id": 1
        });

        let result = handler.handle_request(request).await.unwrap();
        assert_eq!(result, error_response);

        debug!("All providers tried exactly once - Option 1 working correctly");
    }
1214
    /// With two providers, five identical cacheable requests must all return the
    /// same response even though each provider can serve it at most once.
    #[tokio::test]
    async fn test_multi_provider_caching_behavior() {
        logging::ensure_test_logging(None);

        let mock1 = MockServer::start().await;
        let mock2 = MockServer::start().await;

        // One-shot response on each server, mounted before the provider manager
        // is built.
        // NOTE(review): presumably consumed by a probe in `ProviderManager::new` — confirm.
        for mock_server in &[&mock1, &mock2] {
            Mock::given(method("POST"))
                .and(path("/"))
                .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
                    "jsonrpc": "2.0",
                    "id": 1,
                    "result": "0x1234567"
                })))
                .up_to_n_times(1)
                .mount(mock_server)
                .await;
        }

        let temp_dir = TempDir::new().unwrap();
        let cache_path = temp_dir.path().join("test_cache.json");
        let cache_manager = Arc::new(CacheManager::new(100, cache_path).unwrap());

        let provider_manager = Arc::new(
            ProviderManager::new(vec![mock1.uri(), mock2.uri()], 3)
                .await
                .expect("Failed to create provider manager"),
        );

        let metrics_collector = Arc::new(MetricsCollector::new());
        let handler = RpcHandler::new(provider_manager, cache_manager, metrics_collector).unwrap();

        let cacheable_response = serde_json::json!({
            "jsonrpc": "2.0",
            "id": 1,
            "result": {
                "number": "0x1000000",
                "hash": "0x1234567890abcdef"
            }
        });

        // `up_to_n_times(1)`: each server answers this body at most once, so the
        // remaining requests can only succeed through the cache.
        Mock::given(method("POST"))
            .and(path("/"))
            .respond_with(ResponseTemplate::new(200).set_body_json(&cacheable_response))
            .up_to_n_times(1)
            .mount(&mock1)
            .await;

        Mock::given(method("POST"))
            .and(path("/"))
            .respond_with(ResponseTemplate::new(200).set_body_json(&cacheable_response))
            .up_to_n_times(1)
            .mount(&mock2)
            .await;

        let request = serde_json::json!({
            "jsonrpc": "2.0",
            "method": "eth_getBlockByNumber",
            "params": ["0x1000000", false],
            "id": 1
        });

        for _ in 0..5 {
            let response = handler.handle_request(request.clone()).await.unwrap();
            assert_eq!(response, cacheable_response);
        }

    }
1291
    /// Table-driven check of `is_rate_limit_response` across HTTP status, raw body
    /// text, JSON-RPC error message and JSON-RPC error code.
    #[tokio::test]
    async fn test_rate_limit_detection() {
        let temp_dir = TempDir::new().unwrap();
        let cache_path = temp_dir.path().join("test_cache.json");
        let cache_manager = Arc::new(CacheManager::new(100, cache_path).unwrap());

        // The server only exists so a handler can be constructed; the function
        // under test performs no network I/O.
        let mock_server = MockServer::start().await;
        Mock::given(method("POST"))
            .and(path("/"))
            .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
                "jsonrpc": "2.0",
                "id": 1,
                "result": "0x1"
            })))
            .up_to_n_times(1)
            .mount(&mock_server)
            .await;

        let provider_manager = Arc::new(
            ProviderManager::new(vec![mock_server.uri()], 3)
                .await
                .expect("Failed to create provider manager"),
        );

        let metrics_collector = Arc::new(MetricsCollector::new());
        let handler = RpcHandler::new(provider_manager, cache_manager, metrics_collector).unwrap();

        // Each case is (HTTP status, raw body, parsed JSON if any, expected result).
        let test_cases = vec![
            // Detected by HTTP status alone.
            (StatusCode::TOO_MANY_REQUESTS, "{}", None, true),
            (StatusCode::SERVICE_UNAVAILABLE, "{}", None, true),
            // Detected by a pattern in the raw body.
            (StatusCode::OK, "rate limit exceeded", None, true),
            (StatusCode::OK, "cu limit exceeded; The current RPC traffic is too high", None, true),
            // Detected by JSON-RPC error message and code.
            (
                StatusCode::OK,
                "{}",
                Some(serde_json::json!({
                    "error": {
                        "code": -32005,
                        "message": "Too many requests"
                    }
                })),
                true,
            ),
            // Detected by JSON-RPC error code only.
            (
                StatusCode::OK,
                "{}",
                Some(serde_json::json!({
                    "error": {
                        "code": 429,
                        "message": "Some error"
                    }
                })),
                true,
            ),
            // Plain success: not a rate limit.
            (
                StatusCode::OK,
                "{}",
                Some(serde_json::json!({
                    "result": "0x1234"
                })),
                false,
            ),
            // Unrelated error: not a rate limit.
            (
                StatusCode::OK,
                "{}",
                Some(serde_json::json!({
                    "error": {
                        "code": -32000,
                        "message": "Execution reverted"
                    }
                })),
                false,
            ),
        ];

        for (status, text, json, expected) in test_cases {
            let result = handler.is_rate_limit_response(status, text, json.as_ref());
            assert_eq!(
                result, expected,
                "Failed for status: {:?}, text: {}, json: {:?}",
                status, text, json
            );
        }
    }
1387
1388 #[tokio::test]
1389 async fn test_user_error_detection() {
1390 let temp_dir = TempDir::new().unwrap();
1392 let cache_path = temp_dir.path().join("test_cache.json");
1393 let cache_manager = Arc::new(CacheManager::new(100, cache_path).unwrap());
1394
1395 let mock_server = MockServer::start().await;
1396 Mock::given(method("POST"))
1397 .and(path("/"))
1398 .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
1399 "jsonrpc": "2.0",
1400 "id": 1,
1401 "result": "0x1"
1402 })))
1403 .up_to_n_times(1)
1404 .mount(&mock_server)
1405 .await;
1406
1407 let provider_manager = Arc::new(
1408 ProviderManager::new(vec![mock_server.uri()], 3)
1409 .await
1410 .expect("Failed to create provider manager"),
1411 );
1412
1413 let metrics_collector = Arc::new(MetricsCollector::new());
1414 let handler = RpcHandler::new(provider_manager, cache_manager, metrics_collector).unwrap();
1415
1416 let test_cases = vec![
1418 (
1420 serde_json::json!({
1421 "error": {
1422 "code": -32600,
1423 "message": "Invalid Request"
1424 }
1425 }),
1426 true,
1427 ),
1428 (
1430 serde_json::json!({
1431 "error": {
1432 "code": -32601,
1433 "message": "Method not found"
1434 }
1435 }),
1436 true,
1437 ),
1438 (
1440 serde_json::json!({
1441 "error": {
1442 "code": -32602,
1443 "message": "Invalid params"
1444 }
1445 }),
1446 true,
1447 ),
1448 (
1450 serde_json::json!({
1451 "error": {
1452 "code": -32700,
1453 "message": "Parse error"
1454 }
1455 }),
1456 true,
1457 ),
1458 (
1460 serde_json::json!({
1461 "error": {
1462 "code": -32000,
1463 "message": "Invalid method eth_fooBar"
1464 }
1465 }),
1466 true,
1467 ),
1468 (
1470 serde_json::json!({
1471 "error": {
1472 "code": -32000,
1473 "message": "Execution reverted"
1474 }
1475 }),
1476 false,
1477 ),
1478 (
1480 serde_json::json!({
1481 "error": {
1482 "code": 429,
1483 "message": "Too many requests"
1484 }
1485 }),
1486 false,
1487 ),
1488 ];
1489
1490 for (response, expected) in test_cases {
1491 let result = handler.is_user_error(&response);
1492 assert_eq!(result, expected, "Failed for response: {:?}", response);
1493 }
1494 }
1495
1496 #[tokio::test]
1497 async fn test_rate_limit_fallback_to_healthy_provider() {
1498 let mock1 = MockServer::start().await;
1500 let mock2 = MockServer::start().await;
1501
1502 for mock_server in &[&mock1, &mock2] {
1504 Mock::given(method("POST"))
1505 .and(path("/"))
1506 .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
1507 "jsonrpc": "2.0",
1508 "id": 1,
1509 "result": "0x1"
1510 })))
1511 .up_to_n_times(1)
1512 .mount(mock_server)
1513 .await;
1514 }
1515
1516 let temp_dir = TempDir::new().unwrap();
1517 let cache_path = temp_dir.path().join("test_cache.json");
1518 let cache_manager = Arc::new(CacheManager::new(100, cache_path).unwrap());
1519
1520 let provider_manager = Arc::new(
1521 ProviderManager::new(vec![mock1.uri(), mock2.uri()], 3)
1522 .await
1523 .expect("Failed to create provider manager"),
1524 );
1525
1526 let metrics_collector = Arc::new(MetricsCollector::new());
1527 let handler = RpcHandler::new(provider_manager, cache_manager, metrics_collector).unwrap();
1528
1529 Mock::given(method("POST"))
1531 .and(path("/"))
1532 .respond_with(ResponseTemplate::new(429).set_body_string("Rate limit exceeded"))
1533 .up_to_n_times(1)
1534 .mount(&mock1)
1535 .await;
1536
1537 let success_response = serde_json::json!({
1539 "jsonrpc": "2.0",
1540 "id": 1,
1541 "result": "0x12345"
1542 });
1543
1544 Mock::given(method("POST"))
1545 .and(path("/"))
1546 .respond_with(ResponseTemplate::new(200).set_body_json(&success_response))
1547 .expect(1)
1548 .mount(&mock2)
1549 .await;
1550
1551 let request = serde_json::json!({
1552 "jsonrpc": "2.0",
1553 "method": "eth_blockNumber",
1554 "params": [],
1555 "id": 1
1556 });
1557
1558 let result = handler.handle_request(request).await.unwrap();
1560 assert_eq!(result, success_response);
1561 }
1562
1563 #[tokio::test]
1564 async fn test_error_deduplication() {
1565 let mock1 = MockServer::start().await;
1567 let mock2 = MockServer::start().await;
1568 let mock3 = MockServer::start().await;
1569
1570 for mock_server in &[&mock1, &mock2, &mock3] {
1572 Mock::given(method("POST"))
1573 .and(path("/"))
1574 .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
1575 "jsonrpc": "2.0",
1576 "id": 1,
1577 "result": "0x1"
1578 })))
1579 .up_to_n_times(1)
1580 .mount(mock_server)
1581 .await;
1582 }
1583
1584 let temp_dir = TempDir::new().unwrap();
1585 let cache_path = temp_dir.path().join("test_cache.json");
1586 let cache_manager = Arc::new(CacheManager::new(100, cache_path).unwrap());
1587
1588 let provider_manager = Arc::new(
1589 ProviderManager::new(vec![mock1.uri(), mock2.uri(), mock3.uri()], 3)
1590 .await
1591 .expect("Failed to create provider manager"),
1592 );
1593
1594 let metrics_collector = Arc::new(MetricsCollector::new());
1595 let handler = RpcHandler::new(provider_manager, cache_manager, metrics_collector).unwrap();
1596
1597 let error_response = serde_json::json!({
1599 "jsonrpc": "2.0",
1600 "id": 1,
1601 "error": {
1602 "code": -32000,
1603 "message": "Execution reverted: insufficient balance"
1604 }
1605 });
1606
1607 Mock::given(method("POST"))
1609 .and(path("/"))
1610 .respond_with(ResponseTemplate::new(200).set_body_json(&error_response))
1611 .expect(1)
1612 .mount(&mock1)
1613 .await;
1614
1615 Mock::given(method("POST"))
1617 .and(path("/"))
1618 .respond_with(ResponseTemplate::new(200).set_body_json(&error_response))
1619 .expect(1)
1620 .mount(&mock2)
1621 .await;
1622
1623 Mock::given(method("POST"))
1625 .and(path("/"))
1626 .respond_with(ResponseTemplate::new(200).set_body_json(&error_response))
1627 .expect(1)
1628 .mount(&mock3)
1629 .await;
1630
1631 let request = serde_json::json!({
1632 "jsonrpc": "2.0",
1633 "method": "eth_call",
1634 "params": [{}, "0x1000000"],
1635 "id": 1
1636 });
1637
1638 let result = handler.handle_request(request).await.unwrap();
1640 assert_eq!(result, error_response);
1641 }
1642
1643 #[tokio::test]
1644 async fn test_eth_call_caching_with_deterministic_block() {
1645 let (handler, mock_server, _temp_dir) = create_test_rpc_handler().await;
1646
1647 let response_data = serde_json::json!({
1648 "jsonrpc": "2.0",
1649 "id": 1,
1650 "result": "0x0000000000000000000000000000000000000000000000000000000000000001"
1651 });
1652
1653 Mock::given(method("POST"))
1654 .and(path("/"))
1655 .respond_with(ResponseTemplate::new(200).set_body_json(&response_data))
1656 .expect(1) .mount(&mock_server)
1658 .await;
1659
1660 let request = serde_json::json!({
1662 "jsonrpc": "2.0",
1663 "method": "eth_call",
1664 "params": [
1665 {
1666 "to": "0x1234567890123456789012345678901234567890",
1667 "data": "0x"
1668 },
1669 "0x1000000" ],
1671 "id": 1
1672 });
1673
1674 let result1 = handler.handle_request(request.clone()).await.unwrap();
1676 assert_eq!(result1, response_data);
1677
1678 let result2 = handler.handle_request(request).await.unwrap();
1680 assert_eq!(result2, response_data);
1681
1682 }
1684
1685 #[tokio::test]
1686 async fn test_eth_call_not_cached_with_latest() {
1687 let (handler, mock_server, _temp_dir) = create_test_rpc_handler().await;
1688
1689 let response_data = serde_json::json!({
1690 "jsonrpc": "2.0",
1691 "id": 1,
1692 "result": "0x0000000000000000000000000000000000000000000000000000000000000001"
1693 });
1694
1695 Mock::given(method("POST"))
1696 .and(path("/"))
1697 .respond_with(ResponseTemplate::new(200).set_body_json(&response_data))
1698 .expect(2) .mount(&mock_server)
1700 .await;
1701
1702 let request = serde_json::json!({
1704 "jsonrpc": "2.0",
1705 "method": "eth_call",
1706 "params": [
1707 {
1708 "to": "0x1234567890123456789012345678901234567890",
1709 "data": "0x"
1710 },
1711 "latest" ],
1713 "id": 1
1714 });
1715
1716 let result1 = handler.handle_request(request.clone()).await.unwrap();
1718 assert_eq!(result1, response_data);
1719
1720 let result2 = handler.handle_request(request).await.unwrap();
1721 assert_eq!(result2, response_data);
1722
1723 }
1725
1726 #[tokio::test]
1727 async fn test_debug_trace_validation_with_unsupported_response() {
1728 let temp_dir = TempDir::new().unwrap();
1729 let cache_path = temp_dir.path().join("test_cache.json");
1730 let cache_manager = Arc::new(CacheManager::new(100, cache_path).unwrap());
1731
1732 let mock_server = MockServer::start().await;
1733 Mock::given(method("POST"))
1734 .and(path("/"))
1735 .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
1736 "jsonrpc": "2.0",
1737 "id": 1,
1738 "result": "0x1"
1739 })))
1740 .up_to_n_times(1)
1741 .mount(&mock_server)
1742 .await;
1743
1744 let provider_manager = Arc::new(
1745 ProviderManager::new(vec![mock_server.uri()], 3)
1746 .await
1747 .expect("Failed to create provider manager"),
1748 );
1749
1750 let metrics_collector = Arc::new(MetricsCollector::new());
1751 let handler = RpcHandler::new(provider_manager, cache_manager, metrics_collector).unwrap();
1752
1753 let test_cases = vec![
1755 (
1757 serde_json::json!({
1758 "jsonrpc": "2.0",
1759 "id": 1,
1760 "result": "method not supported"
1761 }),
1762 false,
1763 ),
1764 (
1766 serde_json::json!({
1767 "jsonrpc": "2.0",
1768 "id": 1,
1769 "result": {
1770 "output": "0x1234",
1771 "gas": 21000
1772 }
1773 }),
1774 true,
1775 ),
1776 (
1778 serde_json::json!({
1779 "jsonrpc": "2.0",
1780 "id": 1,
1781 "result": {}
1782 }),
1783 false,
1784 ),
1785 (
1787 serde_json::json!({
1788 "jsonrpc": "2.0",
1789 "id": 1,
1790 "result": [{"action": "call"}]
1791 }),
1792 true,
1793 ),
1794 ];
1795
1796 for (response, expected) in test_cases {
1797 let result = handler.is_valid_debug_trace_response(&response);
1798 assert_eq!(result, expected, "Failed for response: {:?}", response);
1799 }
1800 }
1801}