erc20_rpc_pool/rpc_pool/
eth_generic_call.rs1use crate::rpc_pool::web3_error_list::check_if_proper_rpc_error;
2use crate::rpc_pool::VerifyEndpointResult;
3use crate::Web3RpcPool;
4use erc20_payment_lib_common::{
5 DriverEvent, DriverEventContent, Web3RpcPoolContent, Web3RpcPoolInfo,
6};
7use serde::de::DeserializeOwned;
8use std::sync::Arc;
9use std::time::Duration;
10use web3::{api::Eth, helpers::CallFuture};
11
12pub trait EthMethod<T: web3::Transport> {
13 const METHOD: &'static str;
14 type Args: Clone;
15 type Return: DeserializeOwned;
16
17 fn do_call(eth: Eth<T>, args: Self::Args) -> CallFuture<Self::Return, T::Out>;
18}
19
20impl Web3RpcPool {
21 pub async fn eth_generic_call<EthMethodCall: EthMethod<web3::transports::Http>>(
22 self: Arc<Self>,
23 args: EthMethodCall::Args,
24 ) -> Result<EthMethodCall::Return, web3::Error> {
25 let mut loop_no = 0;
26 const LOOP_COUNT: usize = 4;
27 loop {
28 let resp = self.clone().choose_best_endpoints().await;
29 if resp.allowed_endpoints.is_empty() && !resp.is_resolving {
30 log::warn!("No valid endpoints found for chain id {}, wait until next check. Call yagna payment driver rpc --verify for details", self.chain_id);
31 return Err(web3::Error::Unreachable);
32 }
33 let idx_vec = resp.allowed_endpoints;
34 if let Some(idx_chosen) = idx_vec.first() {
35 self.mark_rpc_chosen(*idx_chosen);
36 }
37
38 if idx_vec.is_empty() {
39 if loop_no >= LOOP_COUNT {
40 if let Some(event_sender) =
41 self.event_sender.clone().and_then(|es| es.upgrade())
42 {
43 let _ = event_sender
44 .send(DriverEvent {
45 create_date: chrono::Utc::now(),
46 content: DriverEventContent::Web3RpcMessage(Web3RpcPoolInfo {
47 chain_id: self.chain_id,
48 content: Web3RpcPoolContent::AllEndpointsFailed,
49 }),
50 })
51 .await;
52 }
53 log::warn!(
54 "Seems like all RPC endpoints failed - chain id: {}",
55 self.chain_id
56 );
57 return Err(web3::Error::Unreachable);
58 }
59 let sleep_times: [u64; LOOP_COUNT] = [800, 1200, 2000, 2800];
61 tokio::time::sleep(Duration::from_millis(sleep_times[loop_no])).await;
62 loop_no += 1;
63 continue;
64 }
65
66 for idx in idx_vec {
67 let res = match self.get_web3(idx) {
68 Some(web3) => tokio::time::timeout(
69 self.get_max_timeout(idx),
70 EthMethodCall::do_call(web3.eth(), args.clone()),
71 ),
72 None => {
73 log::warn!("No web3 instance found on specified index");
75 continue;
76 }
77 };
78
79 let err = match res.await {
80 Ok(Ok(balance)) => {
81 self.mark_rpc_success(idx, EthMethodCall::METHOD.to_string());
82 if let Some(event_sender) =
83 self.event_sender.clone().and_then(|es| es.upgrade())
84 {
85 let _ = event_sender
86 .send(DriverEvent {
87 create_date: chrono::Utc::now(),
88 content: DriverEventContent::Web3RpcMessage(Web3RpcPoolInfo {
89 chain_id: self.chain_id,
90 content: Web3RpcPoolContent::Success,
91 }),
92 })
93 .await;
94 }
95 return Ok(balance);
96 }
97 Ok(Err(e)) => match e {
98 web3::Error::Rpc(e) => {
99 let proper = check_if_proper_rpc_error(&e.to_string());
100 if proper {
101 self.mark_rpc_success(idx, EthMethodCall::METHOD.to_string());
102 if let Some(event_sender) =
103 self.event_sender.clone().and_then(|es| es.upgrade())
104 {
105 let _ = event_sender
106 .send(DriverEvent {
107 create_date: chrono::Utc::now(),
108 content: DriverEventContent::Web3RpcMessage(
109 Web3RpcPoolInfo {
110 chain_id: self.chain_id,
111 content: Web3RpcPoolContent::Success,
112 },
113 ),
114 })
115 .await;
116 }
117 return Err(web3::Error::Rpc(e));
118 } else {
119 log::warn!(
120 "Unknown RPC error when calling {} from endpoint {}: {}",
121 EthMethodCall::METHOD,
122 self.get_name(idx),
123 e
124 );
125 self.mark_rpc_error(
126 idx,
127 EthMethodCall::METHOD.to_string(),
128 VerifyEndpointResult::RpcWeb3Error(e.to_string()),
129 );
130 web3::Error::Rpc(e)
131 }
132 }
133 _ => {
134 log::warn!(
135 "Error doing call {} from endpoint {}: {}",
136 EthMethodCall::METHOD,
137 self.get_name(idx),
138 e
139 );
140 self.mark_rpc_error(
141 idx,
142 EthMethodCall::METHOD.to_string(),
143 VerifyEndpointResult::OtherNetworkError(e.to_string()),
144 );
145 e
146 }
147 },
148 Err(e) => {
149 log::warn!(
150 "Timeout when getting data from endpoint {}: {}",
151 self.get_name(idx),
152 e
153 );
154 self.mark_rpc_error(
155 idx,
156 EthMethodCall::METHOD.to_string(),
157 VerifyEndpointResult::Unreachable,
158 );
159 web3::Error::Unreachable
160 }
161 };
162 if loop_no >= LOOP_COUNT {
163 if let Some(event_sender) =
164 self.event_sender.clone().and_then(|es| es.upgrade())
165 {
166 let _ = event_sender
167 .send(DriverEvent {
168 create_date: chrono::Utc::now(),
169 content: DriverEventContent::Web3RpcMessage(Web3RpcPoolInfo {
170 chain_id: self.chain_id,
171 content: Web3RpcPoolContent::Error(format!(
172 "Web3 rpc call failed {}",
173 err
174 )),
175 }),
176 })
177 .await;
178 }
179 return Err(err);
180 }
181 let sleep_times: [u64; LOOP_COUNT] = [800, 1200, 2000, 2800];
183 tokio::time::sleep(Duration::from_millis(sleep_times[loop_no])).await;
184 loop_no += 1;
185 }
186 }
187 }
188}